
Python packet.decode Function Code Examples


This article collects typical usage examples of the socketio.packet.decode function in Python. If you are wondering how decode is called in practice or what it returns for each packet type, the examples selected here may help.



The 16 code examples of the decode function shown below are sorted by popularity by default.
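The packets decoded in the examples below share a colon-separated layout, `type:id:endpoint:data`, as test strings such as `'3:5:/tobi'` and `'4:1+::{"a":"b"}'` suggest. The following is a minimal sketch of splitting such a frame. `split_frame` is a hypothetical helper written for illustration; it is not part of socketio.packet and does not reproduce what decode does internally.

```python
def split_frame(raw):
    """Split a raw frame into (type, id, endpoint, data) strings.

    Hypothetical helper: only the first three colons act as separators,
    so colons inside the data part are preserved.
    """
    parts = raw.split(':', 3)
    # Pad short frames such as '8::' so that four fields always come back.
    parts += [''] * (4 - len(parts))
    msg_type, msg_id, endpoint, data = parts
    return msg_type, msg_id, endpoint, data


print(split_frame('3:5:/tobi'))
print(split_frame('4:1+::{"a":"b"}'))
print(split_frame('1::/test:?test=1'))
```

Limiting the split to three separators is what keeps JSON payloads, which usually contain colons, in one piece.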

Example 1: test_decode_error

    def test_decode_error(self):
        """decoding error packet """
        decoded_message = decode('7:::')
        self.assertEqual(decoded_message, {'type': 'error',
                                           'reason': '',
                                           'advice': '',
                                           'endpoint': ''})

        decoded_message = decode('7:::0')
        self.assertEqual(decoded_message, {'type': 'error',
                                           'reason': 'transport not supported',
                                           'advice': '',
                                           'endpoint': ''})

        # decoding error packet with reason and advice
        decoded_message = decode('7:::2+0')
        self.assertEqual(decoded_message, {'type': 'error',
                                           'reason': 'unauthorized',
                                           'advice': 'reconnect',
                                           'endpoint': ''})

        # decoding error packet with endpoint
        decoded_message = decode('7::/woot')
        self.assertEqual(decoded_message, {'type': 'error',
                                           'reason': '',
                                           'advice': '',
                                           'endpoint': '/woot'})
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 27, Source: test_packet.py
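In the error packets above, the data part carries numeric codes: `'0'` decodes to the reason 'transport not supported', and `'2+0'` decodes to the reason 'unauthorized' with the advice 'reconnect'. A minimal sketch of that lookup follows. The tables contain only the codes that appear in this test, and `parse_error_data`, `REASONS` and `ADVICES` are hypothetical names, not the library's own.

```python
# Only the codes exercised by the test above; other codes are left out on purpose.
REASONS = {'0': 'transport not supported', '2': 'unauthorized'}
ADVICES = {'0': 'reconnect'}


def parse_error_data(data):
    """Turn 'reason+advice' codes into strings; unknown or missing codes give ''."""
    reason_code, _, advice_code = data.partition('+')
    return {'reason': REASONS.get(reason_code, ''),
            'advice': ADVICES.get(advice_code, '')}


print(parse_error_data('2+0'))
```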


Example 2: test_decode_message

    def test_decode_message(self):
        """decoding a message packet """
        decoded_message = decode('3:::woot')
        self.assertEqual(decoded_message, {'type': 'message',
                                           'endpoint': '',
                                           'data': 'woot'})

        # decoding a message packet with id and endpoint
        decoded_message = decode('3:5:/tobi')
        self.assertEqual(decoded_message, {'type': 'message',
                                           'id': 5,
                                           'ack': True,
                                           'endpoint': '/tobi',
                                           'data': ''})
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 14, Source: test_packet.py


Example 3: test_decode_json

    def test_decode_json(self):
        """decoding json packet """
        decoded_message = decode('4:::"2"')
        self.assertEqual(decoded_message, {'type': 'json',
                                           'endpoint': '',
                                           'data': '2'})

        # decoding json packet with message id and ack data
        decoded_message = decode('4:1+::{"a":"b"}')
        self.assertEqual(decoded_message, {'type': 'json',
                                           'id': 1,
                                           'endpoint': '',
                                           'ack': 'data',
                                           'data': {u'a': u'b'}})
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 14, Source: test_packet.py


Example 4: test_decode_connect

    def test_decode_connect(self):
        """decoding a connection packet """
        decoded_message = decode('1::/tobi')
        self.assertEqual(decoded_message, {'type': 'connect',
                                           'endpoint': '/tobi',
                                           'qs': ''
                                           })

        # decoding a connection packet with query string
        decoded_message = decode('1::/test:?test=1')
        self.assertEqual(decoded_message, {'type': 'connect',
                                           'endpoint': '/test',
                                           'qs': '?test=1'
                                           })
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 14, Source: test_packet.py


Example 5: _receiver_loop

    def _receiver_loop(self):
        """This is the loop that takes messages from the queue for the server
        to consume, decodes them and dispatches them.
        """

        while True:
            rawdata = self.get_server_msg()

            if not rawdata:
                continue  # or close the connection ?
            try:
                pkt = packet.decode(rawdata, self.json_loads)
            except Exception as e:  # ValueError and KeyError are subclasses
                self.error('invalid_packet',
                    "There was a decoding error when dealing with packet "
                    "with event: %s... (%s)" % (rawdata[:20], e))
                continue

            if pkt['type'] == 'heartbeat':
                # This is already dealt with in put_server_msg() when
                # any incoming raw data arrives.
                continue

            if pkt['type'] == 'disconnect' and pkt['endpoint'] == '':
                # On global namespace, we kill everything.
                self.kill()
                continue

            endpoint = pkt['endpoint']

            if endpoint not in self.namespaces:
                self.error("no_such_namespace",
                    "The endpoint you tried to connect to "
                    "doesn't exist: %s" % endpoint, endpoint=endpoint)
                continue
            elif endpoint in self.active_ns:
                pkt_ns = self.active_ns[endpoint]
            else:
                new_ns_class = self.namespaces[endpoint]
                pkt_ns = new_ns_class(self.environ, endpoint,
                                        request=self.request)
                pkt_ns.initialize()  # use this instead of __init__,
                                     # for less confusion

                self.active_ns[endpoint] = pkt_ns

            retval = pkt_ns.process_packet(pkt)

            # Has the client requested an 'ack' with the reply parameters ?
            if pkt.get('ack') == "data" and pkt.get('id'):
                returning_ack = dict(type='ack', ackId=pkt['id'],
                                     args=retval,
                                     endpoint=pkt.get('endpoint', ''))
                self.send_packet(returning_ack)

            # Now, are we still connected ?
            if not self.connected:
                self.kill()  # ?? what's the best clean-up when it's not a
                             # user-initiated disconnect
                return
Developer ID: ianjuma, Project: gevent-socketio, Lines of code: 60, Source: virtsocket.py


Example 6: test_decode_event_error

    def test_decode_event_error(self):
        """decoding an event packet """
        decoded_message = decode('5:::')
        self.assertEqual(decoded_message, {'args': [],
                                           'type': 'event',
                                           'endpoint': '',
                                           })
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 7, Source: test_packet.py


Example 7: test_decode_event

    def test_decode_event(self):
        """decoding an event packet """
        decoded_message = decode('5:::{"name":"woot"}')
        self.assertEqual(decoded_message, {'type': 'event',
                                           'name': 'woot',
                                           'endpoint': '',
                                           'args': []})

        # decoding an event packet with message id and ack
        decoded_message = decode('5:1+::{"name":"tobi"}')
        self.assertEqual(decoded_message, {'type': 'event',
                                           'id': 1,
                                           'ack': 'data',
                                           'name': 'tobi',
                                           'endpoint': '',
                                           'args': []})
Developer ID: dhepper, Project: gevent-socketio, Lines of code: 16, Source: test_packet.py


Example 8: test_decode_ack

    def test_decode_ack(self):
        """decoding an ack packet """
        decoded_message = decode('6:::140')
        self.assertEqual(decoded_message, {'type': 'ack',
                                           'ackId': 140,
                                           'endpoint': '',
                                           'args': []})
Developer ID: comel, Project: gevent-socketio, Lines of code: 7, Source: test_packet.py


Example 9: test_except_on_invalid_message_type

    def test_except_on_invalid_message_type(self):
        """decoding a packet with an unknown message type """
        try:
            decoded_message = decode('99::')
        except Exception as e:
            # str(e) works on Python 2 and 3; e.message exists only on Python 2
            self.assertEqual(str(e), "Unknown message type: 99")
        else:
            self.assertEqual(decoded_message, None,
                             "We should not get a valid message")
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 9, Source: test_packet.py


Example 10: test_decode_ack

    def test_decode_ack(self):
        """decoding an ack packet """
        decoded_message = decode('6:::140')
        self.assertEqual(decoded_message, {'type': 'ack',
                                           'ackId': 140,
                                           'endpoint': '',
                                           'args': []})
        
        # Decode with endpoint
        decoded_message = decode('6::/chat:140')
        self.assertEqual(decoded_message, {'type': 'ack',
                                           'ackId': 140,
                                           'endpoint': '/chat',
                                           'args': []})

        # With args
        decoded_message = decode('6::/chat:140+["bob", "bob2"]')
        self.assertEqual(decoded_message, {'type': 'ack',
                                           'ackId': 140,
                                           'endpoint': '/chat',
                                           'args': [u"bob", u"bob2"]})
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 21, Source: test_packet.py
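The ack test above shows that the data part of an ack packet is the numeric ack id, optionally followed by `+` and a JSON array of arguments. Below is a minimal sketch of parsing that part on its own. `parse_ack_data` is a hypothetical helper, and using the standard `json` module is an assumption; the library allows a custom loader, as `packet.decode(rawdata, self.json_loads)` in the other examples indicates.

```python
import json


def parse_ack_data(data):
    """Split 'ackId+json_args' into an integer id and a list of args."""
    ack_id, _, raw_args = data.partition('+')
    # No '+' part means the ack carries no arguments.
    args = json.loads(raw_args) if raw_args else []
    return {'ackId': int(ack_id), 'args': args}


print(parse_ack_data('140+["bob", "bob2"]'))
```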


Example 11: _receiver_loop

    def _receiver_loop(self):
        """This is the loop that takes messages from the queue for the server
        to consume, decodes them and dispatches them.
        """

        while True:
            rawdata = self.get_server_msg()

            if not rawdata:
                continue  # or close the connection ?
            try:
                pkt = packet.decode(rawdata)
            except Exception as e:  # ValueError and KeyError are subclasses
                self.error('invalid_packet', "There was a decoding error when dealing with packet with event: %s... (%s)" % (rawdata[:20], e))
                continue

            if pkt['type'] == 'heartbeat':
                # This is already dealt with in put_server_msg() when
                # any incoming raw data arrives.
                continue

            endpoint = pkt['endpoint']

            if endpoint not in self.namespaces:
                self.error("no_such_namespace", "The endpoint you tried to connect to doesn't exist: %s" % endpoint, endpoint=endpoint)
                continue
            elif endpoint in self.active_ns:
                pkt_ns = self.active_ns[endpoint]
            else:
                new_ns_class = self.namespaces[endpoint]
                pkt_ns = new_ns_class(self.environ, endpoint,
                                        request=self.request)

                # Fire the initialize function for the namespace
                # This call ISN'T protected by ACLs! Beware!
                pkt_ns.call_method('initialize', pkt, pkt)

                self.active_ns[endpoint] = pkt_ns

            pkt_ns.process_packet(pkt)

            # Now, are we still connected ?
            if not self.connected:
                self.kill()  # ?? what's the best clean-up when it's not a
                             # user-initiated disconnect
                return
Developer ID: ojii, Project: gevent-socketio, Lines of code: 46, Source: virtsocket.py


Example 12: test_decode_noop

    def test_decode_noop(self):
        """decoding a noop packet """
        decoded_message = decode('8::')
        self.assertEqual(decoded_message, {'type': 'noop',
                                           'endpoint': ''
                                           })
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 6, Source: test_packet.py


Example 13: test_decode_new_line

    def test_decode_new_line(self):
        """test decoding newline """
        decoded_message = decode('3:::\n')
        self.assertEqual(decoded_message, {'type': 'message',
                                           'data': '\n',
                                           'endpoint': ''})
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 6, Source: test_packet.py


Example 14: test_decode_heartbeat

    def test_decode_heartbeat(self):
        """decoding a heartbeat packet """
        decoded_message = decode('2:::')
        self.assertEqual(decoded_message, {'type': 'heartbeat',
                                           'endpoint': ''
                                           })
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 6, Source: test_packet.py


Example 15: test_decode_deconnect

    def test_decode_deconnect(self):
        """decoding a disconnection packet """
        decoded_message = decode('0::/woot')
        self.assertEqual(decoded_message, {'type': 'disconnect',
                                           'endpoint': '/woot'
                                           })
Developer ID: BenoitNorrin, Project: gevent-socketio, Lines of code: 6, Source: test_packet.py


Example 16: _receiver_loop

    def _receiver_loop(self):
        """This is the loop that takes messages from the queue for the server
        to consume, decodes them and dispatches them.

        It is the main loop for a socket.  We join on this process before
        returning control to the web framework.

        This process is not tracked by the socket itself, it is not going
        to be killed by the ``gevent.killall(socket.jobs)``, so it must
        exit gracefully itself.
        """

        while True:
            rawdata = self.get_server_msg()

            if not rawdata:
                continue  # or close the connection ?
            try:
                pkt = packet.decode(rawdata, self.json_loads)
            except Exception as e:  # ValueError and KeyError are subclasses
                self.error('invalid_packet',
                    "There was a decoding error when dealing with packet "
                    "with event: %s... (%s)" % (rawdata[:20], e))
                continue

            if pkt['type'] == 'heartbeat':
                # This is already dealt with in put_server_msg() when
                # any incoming raw data arrives.
                continue

            if pkt['type'] == 'disconnect' and pkt['endpoint'] == '':
                # On global namespace, we kill everything.
                self.kill(detach=True)
                continue
            
            with self.manager.lock_socket(self.sessid):
                endpoint = pkt['endpoint']
    
                if endpoint not in self.namespaces:
                    self.error("no_such_namespace",
                        "The endpoint you tried to connect to "
                        "doesn't exist: %s" % endpoint, endpoint=endpoint)
                    continue
                elif endpoint in self.active_ns:
                    pkt_ns = self.active_ns[endpoint]
                else:
                    pkt_ns = self.add_namespace(endpoint)
                    
                retval = pkt_ns.process_packet(pkt)
    
                # Has the client requested an 'ack' with the reply parameters ?
                if pkt.get('ack') == "data" and pkt.get('id'):
                    if type(retval) is tuple:
                        args = list(retval)
                    else:
                        args = [retval]
                    returning_ack = dict(type='ack', ackId=pkt['id'],
                                         args=args,
                                         endpoint=pkt.get('endpoint', ''))
                    self.send_packet(returning_ack)
    
                # Now, are we still connected ?
                if not self.connected:
                    self.kill(detach=True)  # ?? what's the best clean-up
                                            # when it's not a
                                            # user-initiated disconnect
                    return
Developer ID: quyetnd-parlayz, Project: gevent-socketio, Lines of code: 67, Source: virtsocket.py
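The ack branch of the receiver loop above can be read in isolation: when a decoded packet has `'ack': 'data'` and an `id`, the handler's return value is wrapped into an `ack` packet, with a tuple spread into several arguments and anything else sent as a single argument. The sketch below restates that logic as a standalone function. `build_ack` is a hypothetical name used for illustration; in the source the logic is inline and the result is passed to `self.send_packet`.

```python
def build_ack(pkt, retval):
    """Return an ack packet dict for pkt, or None if no data ack was requested."""
    if pkt.get('ack') != 'data' or not pkt.get('id'):
        return None
    # A tuple return value is spread into several args; anything else is one arg.
    args = list(retval) if type(retval) is tuple else [retval]
    return dict(type='ack', ackId=pkt['id'], args=args,
                endpoint=pkt.get('endpoint', ''))


request = {'type': 'event', 'id': 1, 'ack': 'data',
           'name': 'tobi', 'endpoint': '', 'args': []}
print(build_ack(request, ('ok', 42)))
```

Note that a handler returning `None` still produces an ack with `args` set to `[None]`, which mirrors the `else` branch in the example.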



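Note: The socketio.packet.decode examples in this article were compiled by 纯净天空 from source code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.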

