• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python remoting.decode函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 pyamf.remoting.decode 函数的典型用法代码示例。如果您正苦于以下问题:remoting.decode 函数的具体用法是什么?remoting.decode 怎么用?有哪些 remoting.decode 的使用例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



下文一共展示了 remoting.decode 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_invalid_header_data_length

    def test_invalid_header_data_length(self):
        """
        The same header payload must decode without error by default and
        raise ``pyamf.DecodeError`` when decoded with ``strict=True``.
        """
        payload = ('\x00\x00\x00\x01\x00\x04name\x00\x00\x00\x00\x06\x0a'
            '\x00\x00\x00\x00\x00\x00')

        # Default (non-strict) decoding: the call is expected not to raise.
        remoting.decode(payload)

        # ``assertRaises`` replaces the deprecated ``failUnlessRaises`` alias.
        self.assertRaises(pyamf.DecodeError, remoting.decode, payload,
            strict=True)
开发者ID:cardmagic,项目名称:PyAMF,代码行数:7,代码来源:test_remoting.py


示例2: test_amf_version

    def test_amf_version(self):
        """
        Decode single-byte inputs: ``EOFError`` is tolerated for the bytes
        0x00 and 0x03, while 0x10 must raise ``pyamf.DecodeError``.
        """
        for version_byte in ("\x00", "\x03"):
            try:
                remoting.decode(version_byte)
            except EOFError:
                # Only one byte is supplied, so running out of input is
                # acceptable here; any other exception fails the test.
                pass

        # ``assertRaises`` replaces the deprecated ``failUnlessRaises`` alias.
        self.assertRaises(pyamf.DecodeError, remoting.decode, "\x10")
开发者ID:kruser,项目名称:zualoo,代码行数:8,代码来源:test_remoting.py


示例3: test_amf_version

    def test_amf_version(self):
        """
        Decode single-byte inputs: ``IOError`` is tolerated for the bytes
        0x00 and 0x03, while 0x10 must raise ``pyamf.DecodeError``.
        """
        for version_byte in ('\x00', '\x03'):
            try:
                remoting.decode(version_byte)
            except IOError:
                # Only one byte is supplied, so running out of input is
                # acceptable here; any other exception fails the test.
                pass

        # ``assertRaises`` replaces the deprecated ``failUnlessRaises`` alias.
        self.assertRaises(pyamf.DecodeError, remoting.decode, '\x10')
开发者ID:danielor,项目名称:TowerSaint,代码行数:8,代码来源:test_remoting.py


示例4: test_invalid_body_data_length

    def test_invalid_body_data_length(self):
        """
        The same body payload must decode without error by default and
        raise ``pyamf.DecodeError`` when decoded with ``strict=True``.
        """
        payload = ('\x00\x00\x00\x00\x00\x01\x00\x09test.test\x00\x02/1'
            '\x00\x00\x00\x13\x0a\x00\x00\x00\x01\x08\x00\x00\x00\x00\x00\x01'
            '\x61\x02\x00\x01\x61\x00\x00\x09')

        # Default (non-strict) decoding: the call is expected not to raise.
        remoting.decode(payload)

        # ``assertRaises`` replaces the deprecated ``failUnlessRaises`` alias.
        self.assertRaises(pyamf.DecodeError, remoting.decode, payload,
            strict=True)
开发者ID:cardmagic,项目名称:PyAMF,代码行数:9,代码来源:test_remoting.py


示例5: test_client_version

 def test_client_version(self):
     """
     Decode a two-byte input for each client-version byte under test,
     tolerating only ``IOError``.
     """
     client_bytes = ('\x00', '\x01', '\x03')

     for client_byte in client_bytes:
         # First byte is fixed at 0x00; the second byte varies.
         payload = '\x00' + client_byte

         try:
             remoting.decode(payload)
         except IOError:
             pass
开发者ID:cardmagic,项目名称:PyAMF,代码行数:9,代码来源:test_remoting.py


示例6: test_invalid_header_data_length

    def test_invalid_header_data_length(self):
        """
        The same header payload must decode without error by default and
        raise ``pyamf.DecodeError`` when decoded with ``strict=True``.
        """
        payload = "\x00\x00\x00\x01\x00\x04name\x00\x00\x00\x00\x06\x0a" "\x00\x00\x00\x00\x00\x00"

        # Default (non-strict) decoding: the call is expected not to raise.
        remoting.decode(payload)

        # ``assertRaises`` replaces the deprecated ``failUnlessRaises`` alias.
        self.assertRaises(
            pyamf.DecodeError,
            remoting.decode,
            payload,
            strict=True,
        )
开发者ID:kruser,项目名称:zualoo,代码行数:9,代码来源:test_remoting.py


示例7: test_client_version

 def test_client_version(self):
     """
     Decode a two-byte input for each client-version byte under test,
     tolerating only ``EOFError``.
     """
     client_bytes = ("\x00", "\x01", "\x03")

     for client_byte in client_bytes:
         # First byte is fixed at 0x00; the second byte varies.
         payload = "\x00" + client_byte

         try:
             remoting.decode(payload)
         except EOFError:
             pass
开发者ID:kruser,项目名称:zualoo,代码行数:9,代码来源:test_remoting.py


示例8: test_invalid_body_data_length

    def test_invalid_body_data_length(self):
        """
        The same body payload must decode without error by default and
        raise ``pyamf.DecodeError`` when decoded with ``strict=True``.
        """
        payload = (
            "\x00\x00\x00\x00\x00\x01\x00\x09test.test\x00\x02/1"
            "\x00\x00\x00\x13\x0a\x00\x00\x00\x01\x08\x00\x00\x00\x00\x00\x01"
            "\x61\x02\x00\x01\x61\x00\x00\x09"
        )

        # Default (non-strict) decoding: the call is expected not to raise.
        remoting.decode(payload)

        # ``assertRaises`` replaces the deprecated ``failUnlessRaises`` alias.
        self.assertRaises(
            pyamf.DecodeError,
            remoting.decode,
            payload,
            strict=True,
        )
开发者ID:kruser,项目名称:zualoo,代码行数:15,代码来源:test_remoting.py


示例9: main

def main():
    """
    Run the AMF remoting decoder over every matching input file.

    Each entry of ``args`` returned by ``parse_options()`` is expanded with
    ``glob.glob``; every resulting file whose name matches ``*.amf*`` is read
    with ``read_file`` and passed to ``remoting.decode``.  When
    ``options.debug`` is set, the decoded (name, message) pairs and any
    ``UnknownClassAlias`` / ``DecodeError`` warnings are printed.
    """
    (options, args) = parse_options()

    # Report which pyamf module is in use and whether strict decoding is on.
    print 'Using pyamf from: %s' % (pyamf,)
    print 'Strict = ' + str(options.strict)

    for arg in args:
        # Each argument is treated as a glob pattern.
        for fname in glob.glob(arg):
            # Only files whose name contains ".amf" are decoded.
            if fnmatch(fname, '*.amf*'):
                body = read_file(fname)

                try:
                    print "\nDecoding file:", fname
                    # Second positional argument is passed as None; the third
                    # carries the strict flag from the parsed options.
                    request = remoting.decode(body, None, options.strict)

                    if options.debug:
                        for name, message in request:
                            print "  %s: %s" % (name, message)
                except pyamf.UnknownClassAlias, c:
                    # Reported only in debug mode; processing continues.
                    if options.debug:
                        print '\n    Warning: %s' % c
                except pyamf.DecodeError, c:
                    # Reported only in debug mode; processing continues.
                    if options.debug:
                        print '\n    Warning: %s' % c
                # NOTE(review): the body of this bare except is cut off in
                # this excerpt, so its behavior cannot be documented here.
                except:
开发者ID:thijstriemstra,项目名称:pyamf-dumps,代码行数:28,代码来源:parse_dump.py


示例10: get_clip_info

def get_clip_info(const, playerID, videoPlayer, publisherID):
	"""
	POST an AMF request to c.brightcove.com and return the decoded body.

	The envelope is built by ``build_amf_request`` from the four arguments,
	encoded with ``remoting.encode`` and sent to the message broker
	endpoint.  Returns ``bodies[0][1].body`` of the decoded response.
	"""
	envelope = build_amf_request(const, playerID, videoPlayer, publisherID)
	payload = str(remoting.encode(envelope).read())

	conn = httplib.HTTPConnection("c.brightcove.com")
	try:
		conn.request("POST", "/services/messagebroker/amf?playerKey=AQ~~,AAAAC_GBGZE~,q40QbnxHunHkwKuAvWxESNjERBgcAQY8", payload, {'content-type': 'application/x-amf'})
		raw_response = conn.getresponse().read()
	finally:
		# Release the socket even if the request or the read fails.
		conn.close()

	return remoting.decode(raw_response).bodies[0][1].body
开发者ID:slices81,项目名称:xbmcplus-xbmc-plugins-beta,代码行数:7,代码来源:fox.py


示例11: __GetBrightCoveData

    def __GetBrightCoveData(self):
        """ Retrieves the media data of a brightcove stream

        Takes no arguments; the request is built from these instance
        attributes:
        playerKey    - appended to the request path as the playerKey
        contentId    - passed to the AMF request builder
        url          - passed to the AMF request builder
        experienceId - passed to the AMF request builder
        seed         - passed to the AMF request builder

        Returns response["programmedContent"]["videoPlayer"]["mediaDTO"]
        taken from the decoded AMF response body.

        """

        # Seed = 61773bc7479ab4e69a5214f17fd4afd21fe1987a
        envelope = self.__BuildBrightCoveAmfRequest(
            self.playerKey, self.contentId, self.url, self.experienceId, self.seed
        )
        payload = str(remoting.encode(envelope).read())

        connection = httplib.HTTPConnection("c.brightcove.com")
        try:
            connection.request(
                "POST",
                "/services/messagebroker/amf?playerKey=" + self.playerKey,
                payload,
                {"content-type": "application/x-amf"},
            )
            response = connection.getresponse().read()
        finally:
            # Release the socket even if the request or the read fails.
            connection.close()

        response = remoting.decode(response).bodies[0][1].body

        return response["programmedContent"]["videoPlayer"]["mediaDTO"]
开发者ID:bethanie,项目名称:bartsidee-boxee,代码行数:33,代码来源:veronica.py


示例12: test_timezone

    def test_timezone(self):
        """
        Send a datetime through a gateway configured with
        ``timezone_offset=-18000`` and check that the service sees the
        value shifted by -5 hours while the decoded response carries the
        original value.
        """
        import datetime

        self.executed = False

        now = datetime.datetime.utcnow()
        shift = datetime.timedelta(hours=-5)

        def echo(d):
            # The service must receive the offset-adjusted datetime.
            self.assertEqual(d, now + shift)
            self.executed = True

            return d

        gateway = django.DjangoGateway(
            {'test.test': echo},
            timezone_offset=-18000,
            expose_request=False
        )

        envelope = remoting.Envelope(amfVersion=pyamf.AMF0)
        envelope['/1'] = remoting.Request(target='test.test', body=[now])

        http_request = http.HttpRequest()
        http_request.method = 'POST'
        http_request.raw_post_data = remoting.encode(envelope).getvalue()

        http_response = gateway(http_request)
        decoded = remoting.decode(http_response.content)
        self.assertTrue(self.executed)

        self.assertEqual(decoded['/1'].body, now)
开发者ID:0xmilk,项目名称:appscale,代码行数:28,代码来源:test_django.py


示例13: get_clip_info

def get_clip_info(const, playerID, videoPlayer, publisherID):
    """
    POST an AMF request to c.brightcove.com and return the decoded body.

    The envelope is built by ``build_amf_request`` from the four arguments,
    encoded with ``remoting.encode`` and sent to the message broker
    endpoint.  Returns ``bodies[0][1].body`` of the decoded response.
    """
    envelope = build_amf_request(const, playerID, videoPlayer, publisherID)
    payload = str(remoting.encode(envelope).read())

    conn = httplib.HTTPConnection("c.brightcove.com")
    try:
        conn.request("POST", "/services/messagebroker/amf?playerKey=AQ~~,AAAAFR9Ptpk~,qrsh31CHJoFjltWH9CfvxE3UxqGVBf9B", payload, {'content-type': 'application/x-amf'})
        raw_response = conn.getresponse().read()
    finally:
        # Release the socket even if the request or the read fails.
        conn.close()

    return remoting.decode(raw_response).bodies[0][1].body
开发者ID:AbsMate,项目名称:bluecop-xbmc-repo,代码行数:7,代码来源:hub.py


示例14: get

    def get(self, url):
        """
        Fetch the rendition list for ``self.other`` from the Brightcove AMF
        endpoint on host ``url`` and hand the selected stream to
        ``download_rtmp``.

        Logs an error and exits with status 2 when pyamf cannot be imported.
        """
        try:
            from pyamf import remoting
        except ImportError:
            log.error("You need to install pyamf to download content from kanal5 and kanal9")
            log.error("In debian the package is called python-pyamf")
            sys.exit(2)

        player_id = 811317479001
        publisher_id = 22710239001
        const = "9f79dd85c3703b8674de883265d8c9e606360c2e"

        envelope = remoting.Envelope(amfVersion=3)
        envelope.bodies.append(("/1", remoting.Request(target="com.brightcove.player.runtime.PlayerMediaFacade.findMediaById", body=[const, player_id, self.other, publisher_id], envelope=envelope)))
        payload = str(remoting.encode(envelope).read())

        endpoint = "http://" + url + "/services/messagebroker/amf?playerKey=AQ~~,AAAABUmivxk~,SnCsFJuhbr0vfwrPJJSL03znlhz-e9bk"
        header = "application/x-amf"
        data = get_http_data(endpoint, "POST", header, payload)

        # Map each rendition's encoding rate to its stream description.
        streams = {}
        for rendition in remoting.decode(data).bodies[0][1].body['renditions']:
            streams[rendition["encodingRate"]] = {"uri": rendition["defaultURL"]}

        selected = select_quality(self.options, streams)

        filename = selected["uri"]
        # Raw string: the pattern is unchanged, but "\&" is no longer an
        # invalid escape sequence in a plain string literal.
        # NOTE(review): ``match`` is None when the URI has no "&" part, in
        # which case the ``group`` calls below raise AttributeError.
        match = re.search(r"(rtmp[e]{0,1}://.*)\&(.*)$", filename)
        other = "-W %s -y %s " % ("http://admin.brightcove.com/viewer/us1.25.04.01.2011-05-24182704/connection/ExternalConnection_2.swf", match.group(2))
        download_rtmp(self.options, match.group(1), self.output, self.live, other, self.resume)
开发者ID:akeks,项目名称:svtplay-dl,代码行数:30,代码来源:svtplay_dl.py


示例15: amf_parse

def amf_parse(string, environ):
    """
    Decode an AMF remoting payload and return selected fields of its first
    message as a URL-encoded query string.

    The ``body``, ``source`` and ``operation`` attributes of the first body
    element are included (under the keys ``body``, ``source`` and ``op``)
    when present.  If ``environ`` is not None, the decoded envelope is
    stored under ``'pywb.inputdata'``.  On any exception the traceback and
    the exception are printed and None is returned.
    """
    # (attribute on the decoded message, key in the resulting query)
    wanted = (
        ('body', 'body'),
        ('source', 'source'),
        ('operation', 'op'),
    )

    try:
        from pyamf import remoting

        envelope = remoting.decode(BytesIO(string))
        first = envelope.bodies[0][1].body[0]

        values = {}
        for attr_name, query_key in wanted:
            if hasattr(first, attr_name):
                values[query_key] = getattr(first, attr_name)

        if environ is not None:
            environ['pywb.inputdata'] = envelope

        return urlencode(values)

    except Exception as e:
        import traceback
        traceback.print_exc()
        print(e)
        return None
开发者ID:gwu-libraries,项目名称:pywb,代码行数:32,代码来源:loaders.py


示例16: test_unknown_request

    def test_unknown_request(self):
        """
        A request for ``test.test`` sent to a gateway created without any
        services must come back as an error response whose body is an
        ``ErrorFault`` with code ``Service.ResourceNotFound``.
        """
        gw = _django.DjangoGateway()

        request = util.BufferedByteStream()
        request.write(
            "\x00\x00\x00\x00\x00\x01\x00\x09test.test\x00"
            "\x02/1\x00\x00\x00\x14\x0a\x00\x00\x00\x01\x08\x00\x00\x00\x00"
            "\x00\x01\x61\x02\x00\x01\x61\x00\x00\x09"
        )
        request.seek(0, 0)

        http_request = HttpRequest()
        http_request.method = "POST"
        http_request.raw_post_data = request.getvalue()

        http_response = gw(http_request)
        envelope = remoting.decode(http_response.content)

        message = envelope["/1"]

        # ``assertEqual`` replaces the deprecated ``assertEquals`` alias.
        self.assertEqual(message.status, remoting.STATUS_ERROR)
        body = message.body

        self.assertTrue(isinstance(body, remoting.ErrorFault))
        self.assertEqual(body.code, "Service.ResourceNotFound")
开发者ID:wogooo,项目名称:TowerSaint,代码行数:25,代码来源:test_django.py


示例17: test_simple_body

    def test_simple_body(self):
        """
        A truncated envelope must raise ``IOError``; a complete one-message
        envelope must decode to the expected version, client type, headers,
        target and body, and iterate as a single ``('/1', message)`` pair.
        """
        # ``assertRaises`` / ``assertEqual`` replace the deprecated
        # ``failUnlessRaises`` / ``assertEquals`` aliases throughout.
        self.assertRaises(IOError, remoting.decode,
            '\x00\x00\x00\x00\x00\x01')

        msg = remoting.decode('\x00\x00\x00\x00\x00\x01\x00\x09test.test\x00'
            '\x02/1\x00\x00\x00\x14\x0a\x00\x00\x00\x01\x08\x00\x00\x00\x00'
            '\x00\x01\x61\x02\x00\x01\x61\x00\x00\x09')

        self.assertEqual(msg.amfVersion, 0)
        self.assertEqual(msg.clientType, 0)
        self.assertEqual(len(msg.headers), 0)
        self.assertEqual(len(msg), 1)
        self.assertTrue('/1' in msg)

        m = msg['/1']

        self.assertEqual(m.target, 'test.test')
        self.assertEqual(m.body, [{'a': 'a'}])

        # Iterating the envelope must yield (name, message) pairs.
        y = [x for x in msg]

        self.assertEqual(len(y), 1)

        x = y[0]
        self.assertEqual(('/1', m), x)
开发者ID:danielor,项目名称:TowerSaint,代码行数:25,代码来源:test_remoting.py


示例18: __GetBrightCoveData

	def __GetBrightCoveData(self):
		""" Retrieves the Url's from a brightcove stream

		Arguments:
		playerKey : string - Key identifying the current request
		contentId : int    - ID of the content to retrieve
		url       : string - Url of the page that calls the video SWF
		seed      : string - Constant which depends on the website

		Keyword Arguments:
		experienceId : id     - <unknown parameter>

		Returns a dictionary with the data

		"""

		# Seed = 61773bc7479ab4e69a5214f17fd4afd21fe1987a
		envelope = self.__BuildBrightCoveAmfRequest(self.playerKey, self.contentId, self.url, self.experienceId, self.seed, self.contentRefId)

		if self.proxy:
			connection = httplib.HTTPConnection(self.proxy.Proxy, self.proxy.Port)
		else:
			connection = httplib.HTTPConnection("c.brightcove.com")
		print envelope
		jj = remoting.encode(envelope).read()			

		connection.request("POST", "http://c.brightcove.com/services/messagebroker/amf?playerKey=" + str(self.playerKey), str(remoting.encode(envelope).read()), {'content-type': 'application/x-amf'})
		response = connection.getresponse().read()
		#print 	response	
		response = remoting.decode(response).bodies[0][1].body
		print response
		if self.logger:
			self.logger.Trace(response)
		self.full_response = response
		return response['programmedContent']['videoPlayer']['mediaDTO']
开发者ID:boggob,项目名称:bogs-xbmc-addons,代码行数:35,代码来源:brightcovehelper.py


示例19: test_unknown_request

    def test_unknown_request(self):
        """
        A WSGI request for ``test.test`` must produce a ``200 OK`` AMF
        response whose ``/1`` message is an error carrying an ``ErrorFault``
        with code ``Service.ResourceNotFound``.
        """
        request = util.BufferedByteStream()
        request.write('\x00\x00\x00\x00\x00\x01\x00\x09test.test\x00'
            '\x02/1\x00\x00\x00\x14\x0a\x00\x00\x00\x01\x08\x00\x00\x00\x00'
            '\x00\x01\x61\x02\x00\x01\x61\x00\x00\x09')
        request.seek(0, 0)

        env = {
            'REQUEST_METHOD': 'POST',
            'CONTENT_LENGTH': str(len(request)),
            'wsgi.input': request
        }

        # ``assertEqual`` replaces the deprecated ``assertEquals`` alias.
        def start_response(status, headers):
            # Records that the gateway invoked the WSGI callback.
            self.executed = True
            self.assertEqual(status, '200 OK')
            self.assertTrue(('Content-Type', 'application/x-amf') in headers)

        response = self.gw(env, start_response)
        envelope = remoting.decode(''.join(response))

        message = envelope['/1']

        self.assertEqual(message.status, remoting.STATUS_ERROR)
        body = message.body

        self.assertTrue(isinstance(body, remoting.ErrorFault))
        self.assertEqual(body.code, 'Service.ResourceNotFound')
        self.assertTrue(self.executed)
开发者ID:cardmagic,项目名称:PyAMF,代码行数:29,代码来源:test_wsgi.py


示例20: test_timezone

    def test_timezone(self):
        """
        Send a datetime through the WSGI gateway with
        ``timezone_offset = -18000`` and check that the service sees the
        value shifted by -5 hours while the decoded response carries the
        original value.
        """
        import datetime

        self.executed = False

        td = datetime.timedelta(hours=-5)
        now = datetime.datetime.utcnow()

        # ``assertEqual`` replaces the deprecated ``assertEquals`` alias.
        def echo(d):
            # The service must receive the offset-adjusted datetime.
            self.assertEqual(d, now + td)
            self.executed = True

            return d

        self.gw.addService(echo)
        self.gw.timezone_offset = -18000

        msg = remoting.Envelope(amfVersion=pyamf.AMF0)
        msg['/1'] = remoting.Request(target='echo', body=[now])

        stream = remoting.encode(msg)

        env = {
            'REQUEST_METHOD': 'POST',
            'CONTENT_LENGTH': str(len(stream)),
            'wsgi.input': stream
        }

        # The start_response callback is irrelevant here, so a no-op is used.
        response = self.gw(env, lambda *args: None)
        envelope = remoting.decode(''.join(response))
        message = envelope['/1']

        self.assertEqual(message.body, now)
开发者ID:cardmagic,项目名称:PyAMF,代码行数:33,代码来源:test_wsgi.py



注:本文中的 pyamf.remoting.decode 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python remoting.encode函数代码示例发布时间:2022-05-25
下一篇:
Python pyamf.ClassAlias类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap