• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python pyamf.get_decoder函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyamf.get_decoder函数的典型用法代码示例。如果您正苦于以下问题:Python get_decoder函数的具体用法?Python get_decoder怎么用?Python get_decoder使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_decoder函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_decode

    def test_decode(self):
        # Decode the same "list-model" object from an AMF0 and an AMF3 payload
        # and check that each yields a ListModel with numbers == [2, 4, 6, 8, 10].
        pyamf.register_class(ListModel, "list-model")

        decoder = pyamf.get_decoder(pyamf.AMF0)
        # Every AMF0 number is a 0x00 marker followed by an 8-byte big-endian
        # double, which is why each array entry below starts with "\x00@".
        decoder.stream.write(
            "\x10\x00\nlist-model"                 # typed object + class alias
            "\x00\x07numbers\n\x00\x00\x00\x05"    # property name, strict array of 5
            "\x00@\x00\x00\x00\x00\x00\x00\x00"    # 2.0
            "\x00@\x10\x00\x00\x00\x00\x00\x00"    # 4.0
            "\x00@\x18\x00\x00\x00\x00\x00\x00"    # 6.0
            "\x00@ \x00\x00\x00\x00\x00\x00"       # 8.0
            "\x00@$\x00\x00\x00\x00\x00\x00"       # 10.0
            "\x00\x00\t"                           # empty name + object end
        )
        decoder.stream.seek(0)

        x = decoder.readElement()

        self.assertTrue(isinstance(x, ListModel))
        self.assertTrue(hasattr(x, "numbers"))
        self.assertEquals(x.numbers, [2, 4, 6, 8, 10])

        decoder = pyamf.get_decoder(pyamf.AMF3)
        decoder.stream.write("\n\x0b\x15list-model\x0fnumbers\t\x0b\x01\x04\x02\x04" "\x04\x04\x06\x04\x08\x04\n\x01")
        decoder.stream.seek(0)

        x = decoder.readElement()

        self.assertTrue(isinstance(x, ListModel))
        self.assertTrue(hasattr(x, "numbers"))
        self.assertEquals(x.numbers, [2, 4, 6, 8, 10])
开发者ID:jrolfs,项目名称:google-calendar-amf,代码行数:27,代码来源:test_google.py


示例2: test_get_decoder

    def test_get_decoder(self):
        # An unknown encoding is rejected; each known encoding returns a strict
        # decoder whose stream holds exactly the data that was passed in.
        self.assertRaises(ValueError, pyamf.get_decoder, 'spam')

        for encoding, payload in ((pyamf.AMF0, '123'), (pyamf.AMF3, '456')):
            decoder = pyamf.get_decoder(encoding, stream=payload, strict=True)
            self.assertEqual(decoder.stream.getvalue(), payload)
            self.assertTrue(decoder.strict)
开发者ID:0xmilk,项目名称:appscale,代码行数:10,代码来源:test_basic.py


示例3: __init__

 def __init__( self ):
     """Create the encoder/decoder pair for ``self.encoding`` and expose their streams."""
     # Outgoing side: the encoder and the stream it writes to.
     self.encoder = pyamf.get_encoder( self.encoding )
     self.ostream = self.encoder.stream

     # Incoming side: the decoder and the stream it reads from.
     self.decoder = pyamf.get_decoder( self.encoding )
     self.istream = self.decoder.stream

     # Position counter for the input stream, starting at zero.
     self.ipos = 0
开发者ID:doublecluepon,项目名称:SwfConduit,代码行数:7,代码来源:protocol.py


示例4: test_amf0

    def test_amf0(self):
        # Decode an AMF0 typed "Pet" object carrying self.key and compare the
        # result with the self.jessica fixture.
        d = pyamf.get_decoder(pyamf.AMF0)
        b = d.stream

        b.write(
            "\x10\x00\x03Pet\x00\x04_key\x02%s%s\x00\x04type\x02\x00\x03"
            # weight_in_pounds is an AMF0 number: a 0x00 marker followed by the
            # big-endian double 0x4014000000000000 (5.0), i.e. "\x00@\x14" + zeros.
            "cat\x00\x10weight_in_pounds\x00@\x14\x00\x00\x00\x00\x00\x00\x00"
            "\x04name\x02\x00\x07Jessica\x00\tbirthdate\x0bB^\xc4\xae\xaa\x00"
            "\x00\x00\x00\x00\x00\x12spayed_or_neutered\x01\x00\x00\x00\t"
            # The two %s slots take the 16-bit big-endian key length and the key.
            % (struct.pack(">H", len(self.key)), self.key)
        )

        b.seek(0)
        x = d.readElement()

        self.assertTrue(isinstance(x, PetExpando))
        self.assertEquals(x.__class__, PetExpando)

        self.assertEquals(x.type, self.jessica.type)
        self.assertEquals(x.weight_in_pounds, self.jessica.weight_in_pounds)
        self.assertEquals(x.birthdate, datetime.date(1986, 10, 2))
        self.assertEquals(x.spayed_or_neutered, self.jessica.spayed_or_neutered)

        # now check db.Expando internals
        self.assertEquals(x.key(), self.jessica.key())
        self.assertEquals(x.kind(), self.jessica.kind())
        self.assertEquals(x.parent(), self.jessica.parent())
        self.assertEquals(x.parent_key(), self.jessica.parent_key())
        self.assertTrue(x.is_saved())
开发者ID:jrolfs,项目名称:google-calendar-amf,代码行数:29,代码来源:test_google.py


示例5: test_amf3

    def test_amf3(self):
        # Decode an AMF3 "Pet" object carrying self.key and compare the result
        # with the self.jessica fixture.
        decoder = pyamf.get_decoder(pyamf.AMF3)
        stream = decoder.stream

        # Header written in front of the key, computed from the key's length.
        key_header = amf3._encode_int(len(self.key) << 1 | amf3.REFERENCE_BIT)
        payload = (
            "\n\x0b\x07Pet\tname\x06\x0fJessica\t_key\x06%s%s\x13birthdate"
            "\x08\x01B^\xc4\xae\xaa\x00\x00\x00!weight_in_pounds\x04\x05\x07"
            "foo\x06\x07bar\ttype\x06\x07cat%%spayed_or_neutered\x02\x01"
            % (key_header, self.key)
        )
        stream.write(payload)
        stream.seek(0)

        pet = decoder.readElement()
        expected = self.jessica

        self.assertTrue(isinstance(pet, PetExpando))
        self.assertEquals(pet.__class__, PetExpando)

        self.assertEquals(pet.type, expected.type)
        self.assertEquals(pet.weight_in_pounds, expected.weight_in_pounds)
        self.assertEquals(pet.birthdate, datetime.date(1986, 10, 2))
        self.assertEquals(pet.spayed_or_neutered, expected.spayed_or_neutered)

        # now check db.Expando internals
        for accessor in ("key", "kind", "parent", "parent_key"):
            self.assertEquals(getattr(pet, accessor)(), getattr(expected, accessor)())
        self.assertTrue(pet.is_saved())
开发者ID:jrolfs,项目名称:google-calendar-amf,代码行数:28,代码来源:test_google.py


示例6: send

 def send(self, message):
     """Encode ``message``, check that it decodes again, and send it on ``self.sock``.

     The message is recorded in ``self.sends`` and encoded with the
     module-level ``object_encoding``. The encoded bytes are decoded once
     before sending, so an undecodable payload fails here rather than on
     the wire.

     Raises:
         RuntimeError: if the socket reports that zero bytes were sent.
         Exception: wrapping any ``socket.error`` raised while sending.
     """
     self.sends.append(message)
     # tell server we started listening
     logging.debug('send:  request: %r', message)
     encoder = pyamf.get_encoder(object_encoding)
     encoder.writeElement(message)
     message = encoder.stream.getvalue()
     encoder.stream.truncate()
     logging.debug('send:  encoded request: %r', message)
     logging.debug('send; %r%r', encoder.stream.tell(), encoder.stream)

     # Round-trip the encoded bytes through a decoder as a sanity check.
     decoder = pyamf.get_decoder(object_encoding, message)
     logging.debug('send: %r%r', decoder.stream.tell(), decoder.stream)
     data = decoder.readElement()
     logging.debug('send:  decoded %r', data)

     try:
         # socket.send may accept only part of the buffer, so keep sending
         # the unsent tail until the whole message has gone out.
         total_sent = 0
         while total_sent < len(message):
             sent = self.sock.send(message[total_sent:])
             if sent == 0:
                 raise RuntimeError("socket connection broken")
             total_sent += sent
     except socket.error as e:
         # args is normally (errno, message); fall back to the exception
         # itself when there is no second element to report.
         reason = e.args[1] if len(e.args) > 1 else e
         raise Exception("Can't connect: %s" % reason)
开发者ID:ethankennerly,项目名称:hotel-vs-gozilla,代码行数:30,代码来源:go_board_client.py


示例7: listen

def listen(envoy):
    """Receive one chunk (up to 1024 bytes) from ``envoy`` and decode one element.

    Args:
        envoy: object with a socket-style ``recv(size)`` method.

    Returns:
        The decoded element on success, the string ``'timeout'`` if the
        receive timed out, or a formatted error-message string for any
        other socket error.
    """
    response = ''
    # http://bytes.com/topic/python/answers/22953-how-catch-socket-timeout
    try:
        chunk = envoy.recv(1024)
        if not chunk:
            # An empty read is only logged; decoding is still attempted below.
            error_message = 'RuntimeError socket connection broken'
            logging.error('listen %s', error_message)
        response = chunk
    except socket.timeout:
        logging.error('listen timeout')
        logging.error('listen %r', response)
        return 'timeout'
    except socket.error as exc:
        # Report the errno and message carried by the exception instance;
        # fall back to the exception text when no strerror is set.
        detail = exc.strerror if exc.strerror is not None else exc
        error_message = 'socket error %s:  "%s"' % (exc.errno, detail)
        logging.error('listen %s', error_message)
        return error_message
    decoder = pyamf.get_decoder(object_encoding, response)
    logging.debug('listen: response: %r', response)
    logging.debug('listen: stream: %r', decoder.stream)
    data = decoder.readElement()
    logging.debug('listen decoded %r', data)
    return data
开发者ID:ethankennerly,项目名称:hotel-vs-gozilla,代码行数:28,代码来源:go_board_client.py


示例8: decode

    def decode(self, buf):
        """
        Populate this notification from an AMF0-encoded buffer.

        The first decoded element is stored as C{self.name}; every remaining
        element is collected, in order, into C{self.argv}.
        """
        elements = pyamf.get_decoder(pyamf.AMF0, stream=buf)

        self.name = elements.next()
        self.argv = list(elements)
开发者ID:Arlex,项目名称:rtmpy,代码行数:8,代码来源:message.py


示例9: _amf3_decoder

    def _amf3_decoder(self):
        """
        Return the AMF3 decoder for this object's stream, creating it on
        first use and reusing it afterwards.

        The decoder is built with C{self.stream} and C{self.timezone_offset}.
        """
        # Read the cache with the same attribute expression that stores it.
        # Inside a class body `self.__amf3_decoder` is name-mangled, so a
        # getattr() with the literal string "__amf3_decoder" would never
        # find the value saved below and a new decoder would be built on
        # every call.
        try:
            decoder = self.__amf3_decoder
        except AttributeError:
            decoder = None

        if not decoder:
            decoder = pyamf.get_decoder(pyamf.AMF3, stream=self.stream, timezone_offset=self.timezone_offset)
            self.__amf3_decoder = decoder

        return decoder
开发者ID:Armedite,项目名称:xbmc-catchuptv-au,代码行数:8,代码来源:amf0.py


示例10: test_pure_decoder

    def test_pure_decoder(self):
        """
        Asking for a decoder with `use_ext=False` must return the
        `pyamf.amf3` decoder, never the extension.
        """
        from pyamf import amf3 as pure_amf3

        self.assertIsInstance(
            pyamf.get_decoder(pyamf.AMF3, use_ext=False),
            pure_amf3.Decoder,
        )
开发者ID:nervatura,项目名称:nerva2py,代码行数:9,代码来源:test_basic.py


示例11: test_encode

    def test_encode(self):
        # Build a fault from a live TypeError and check that the AMF0 encoder
        # can write it; the decoder shares the encoder's stream.
        encoder = pyamf.get_encoder(pyamf.AMF0)
        decoder = pyamf.get_decoder(pyamf.AMF0)
        decoder.stream = encoder.stream

        try:
            raise TypeError("unknown type")
        except TypeError:
            # sys.exc_info() supplies (type, value, traceback) for the fault.
            encoder.writeElement(amf0.build_fault(*sys.exc_info()))
开发者ID:wayne-abarquez,项目名称:vizzuality,代码行数:9,代码来源:test_gateway.py


示例12: test_encode_decode_transient

    def test_encode_decode_transient(self):
        # Round-trip a freshly built object (never handed to a session in this
        # test) through AMF3 and compare the decoded copy with the original.
        original = self._build_obj()

        encoder = pyamf.get_encoder(pyamf.AMF3)
        encoder.writeElement(original)

        decoder = pyamf.get_decoder(pyamf.AMF3, encoder.stream.getvalue())
        self._test_obj(original, decoder.readElement())
开发者ID:kruser,项目名称:zualoo,代码行数:9,代码来源:test_sqlalchemy.py


示例13: decode

def decode(stream, strict=True):
    """
    Decodes a SOL stream. L{strict} mode ensures that the sol stream is as spec
    compatible as possible.

    @param stream: The SOL data. It is wrapped in a
        L{util.BufferedByteStream} if it is not one already.
    @param strict: When true, a header length that does not match the number
        of bytes remaining in the stream is rejected.
    @return: A C{tuple} containing the C{root_name} and a C{dict} of name,
        value pairs.
    @raise pyamf.DecodeError: Unknown header version, inconsistent header
        length (strict mode only), invalid signature or invalid padding after
        the root name.
    """
    if not isinstance(stream, util.BufferedByteStream):
        stream = util.BufferedByteStream(stream)

    # read the version
    version = stream.read(2)
    if version != HEADER_VERSION:
        raise pyamf.DecodeError('Unknown SOL version in header')

    # read the length
    length = stream.read_ulong()

    if strict and stream.remaining() != length:
        raise pyamf.DecodeError('Inconsistent stream header length')

    # read the signature
    signature = stream.read(10)

    if signature != HEADER_SIGNATURE:
        raise pyamf.DecodeError('Invalid signature')

    name_length = stream.read_ushort()
    root_name = stream.read_utf8_string(name_length)

    # read padding
    if stream.read(3) != PADDING_BYTE * 3:
        raise pyamf.DecodeError('Invalid padding read')

    # The next byte selects the AMF encoding used for the body.
    decoder = pyamf.get_decoder(stream.read_uchar())
    decoder.stream = stream

    values = {}

    while not stream.at_eof():
        name = decoder.readString()
        value = decoder.readElement()

        # read the padding: an unexpected byte is reported on stderr and
        # skipped (rather than raising) until a padding byte is found.
        t = stream.read(1)
        while t != PADDING_BYTE:
            sys.stderr.write('Bad padding byte: 0x%02x\n' % ord(t))
            t = stream.read(1)

        values[name] = value

    return (root_name, values)
开发者ID:Averroes,项目名称:raft,代码行数:57,代码来源:sol.py


示例14: setUp

    def setUp(self):
        # Shared fixtures: a class alias for PetModel, one PetModel and one
        # PetExpando built from the same fields, and an AMF3 decoder.
        BaseTestCase.setUp(self)

        pet_fields = {"name": "Jessica", "type": "cat"}

        self.alias = adapter.DataStoreClassAlias(models.PetModel, "foo.bar")

        self.jessica = models.PetModel(**pet_fields)
        self.jessica_expando = models.PetExpando(**pet_fields)
        # Extra attribute set only on the expando instance.
        self.jessica_expando.foo = "bar"

        self.decoder = pyamf.get_decoder(pyamf.AMF3)
开发者ID:Poorvak,项目名称:twitter_clone,代码行数:10,代码来源:test_xdb.py


示例15: test_none

    def test_none(self):
        # A "list-model" object whose "numbers" property is encoded as AMF0
        # null (0x05 marker) must decode with numbers == [].
        pyamf.register_class(ListModel, "list-model")

        decoder = pyamf.get_decoder(pyamf.AMF0)
        stream = decoder.stream
        stream.write("\x10\x00\nlist-model\x00\x07numbers\x05\x00\x00\t")
        stream.seek(0)

        decoded = decoder.readElement()

        self.assertEquals(decoded.numbers, [])
开发者ID:jrolfs,项目名称:google-calendar-amf,代码行数:10,代码来源:test_google.py


示例16: setUp

    def setUp(self):
        # Fixtures for the float-property tests: a throwaway model class with
        # a single FloatProperty, one instance, its class alias and a decoder.
        BaseTestCase.setUp(self)

        class FloatModel(db.Model):
            f = db.FloatProperty()

        self.klass = FloatModel
        self.f = self.klass()
        self.alias = adapter.DataStoreClassAlias(self.klass, None)
        self.decoder = pyamf.get_decoder(pyamf.AMF3)
开发者ID:nervatura,项目名称:nerva2py,代码行数:10,代码来源:test_xdb.py


示例17: test_get_decoder

    def test_get_decoder(self):
        # Check class lookup, instance creation and the data/context keyword
        # arguments for both encodings, plus rejection of an unknown encoding.
        from pyamf import amf0, amf3

        codecs = ((pyamf.AMF0, amf0), (pyamf.AMF3, amf3))

        for encoding, module in codecs:
            self.assertEquals(pyamf._get_decoder_class(encoding), module.Decoder)
        self.assertRaises(ValueError, pyamf._get_decoder_class, 'spam')

        for encoding, module in codecs:
            self.assertTrue(isinstance(pyamf.get_decoder(encoding), module.Decoder))
        self.assertRaises(ValueError, pyamf.get_decoder, 'spam')

        for (encoding, module), payload in zip(codecs, ('123', '456')):
            context = module.Context()
            decoder = pyamf.get_decoder(encoding, data=payload, context=context)
            self.assertEquals(decoder.stream.getvalue(), payload)
            self.assertEquals(decoder.context, context)
开发者ID:fernandoacorreia,项目名称:flex-and-python-test,代码行数:20,代码来源:test_basic.py


示例18: getAMF3Decoder

    def getAMF3Decoder(self, amf0_decoder):
        """
        Return the AMF3 decoder cached under C{self.extra['amf3_decoder']},
        building and caching it on first use.

        A new decoder shares C{amf0_decoder}'s stream and timezone offset.
        """
        cached = self.extra.get('amf3_decoder', None)

        if not cached:
            cached = pyamf.get_decoder(
                pyamf.AMF3,
                stream=amf0_decoder.stream,
                timezone_offset=amf0_decoder.timezone_offset,
            )
            self.extra['amf3_decoder'] = cached

        return cached
开发者ID:Medisan,项目名称:TVWeb,代码行数:11,代码来源:amf0.py


示例19: test_encode_decode_persistent

    def test_encode_decode_persistent(self):
        # Save, commit and refresh the object through the session first, then
        # round-trip it through AMF3 and compare the decoded copy with it.
        original = self._build_obj()

        session = self.session
        session.save(original)
        session.commit()
        session.refresh(original)

        encoder = pyamf.get_encoder(pyamf.AMF3)
        encoder.writeElement(original)

        decoder = pyamf.get_decoder(pyamf.AMF3, encoder.stream.getvalue())
        self._test_obj(original, decoder.readElement())
开发者ID:kruser,项目名称:zualoo,代码行数:12,代码来源:test_sqlalchemy.py


示例20: test_ext_decoder

    def test_ext_decoder(self):
        """
        Asking for a decoder with `use_ext=True` must return the `cpyamf`
        extension decoder; the test is skipped when it cannot be imported.
        """
        try:
            from cpyamf import amf3 as ext_amf3
        except ImportError:
            self.skipTest('amf3 extension not available')
        else:
            self.assertIsInstance(
                pyamf.get_decoder(pyamf.AMF3, use_ext=True),
                ext_amf3.Decoder,
            )
开发者ID:nervatura,项目名称:nerva2py,代码行数:12,代码来源:test_basic.py



注:本文中的pyamf.get_decoder函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pyamf.get_encoder函数代码示例发布时间:2022-05-25
下一篇:
Python pyamf.get_class_alias函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap