• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python compat.iterbytes函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twisted.python.compat.iterbytes函数的典型用法代码示例。如果您正苦于以下问题:Python iterbytes函数的具体用法?Python iterbytes怎么用?Python iterbytes使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了iterbytes函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: verifyResults

    def verifyResults(self, transport, proto, parser):
        """
        Check the keystrokes recorded on C{proto} after the shared
        L{ByteGroupingsMixin} checks have run.

        The unmodified characters are expected first, each reported with a
        modifier of L{None}; then C{abc123}, each reported with
        C{parser.ALT}; after that no recorded calls may remain.
        """
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)

        # NOTE(review): this literal had been replaced by the e-mail
        # obfuscation placeholder "[email protected]"; it is restored here to
        # the printable run used by upstream Twisted -- confirm that it
        # matches the unmodified prefix of this class's TEST_BYTES.
        for char in iterbytes(b'abc123ABC!@#'):
            result = self.assertCall(
                occurrences(proto).pop(0), "keystrokeReceived", (char, None))
            self.assertEqual(occurrences(result), [])

        for char in iterbytes(b'abc123'):
            result = self.assertCall(
                occurrences(proto).pop(0), "keystrokeReceived",
                (char, parser.ALT))
            self.assertEqual(occurrences(result), [])

        occs = occurrences(proto)
        self.assertFalse(occs, "%r should have been []" % (occs,))
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:13,代码来源:test_insults.py


示例2: testCharacterSet

    def testCharacterSet(self):
        """
        Deliver every C{ESC <group> <code>} combination built from the group
        bytes C{()} and the code bytes C{AB012}, then check that one
        C{selectCharacterSet} call was recorded per combination, in order,
        and that nothing else was recorded.
        """
        sequences = [
            b'\x1b' + group + code
            for group in iterbytes(b'()')
            for code in iterbytes(b'AB012')]
        self.parser.dataReceived(b''.join(sequences))
        recorded = occurrences(self.proto)

        expectedCharsets = (
            CS_UK, CS_US, CS_DRAWING, CS_ALTERNATE, CS_ALTERNATE_SPECIAL)
        for which in (G0, G1):
            for charset in expectedCharsets:
                call = self.assertCall(
                    recorded.pop(0), "selectCharacterSet", (charset, which))
                self.assertFalse(occurrences(call))
        self.assertFalse(recorded)
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:13,代码来源:test_insults.py


示例3: dataReceived

 def dataReceived(self, data):
     """
     Run C{data} through the escape-sequence state machine, one byte at a
     time.

     C{self.state} selects how each byte is handled; bytes that are not part
     of an escape sequence are passed to
     C{self.terminalProtocol.keystrokeReceived} with a modifier of L{None}.

     @param data: The bytes to process.

     @raise ValueError: If C{self.state} holds none of the known states.
     """
     for ch in iterbytes(data):
         current = self.state

         if current == b'data':
             if ch != b'\x1b':
                 self.terminalProtocol.keystrokeReceived(ch, None)
             else:
                 self.state = b'escaped'
             continue

         if current == b'escaped':
             if ch == b'[':
                 self.state = b'bracket-escaped'
                 self.escBuf = []
             elif ch == b'O':
                 self.state = b'low-function-escaped'
             else:
                 # Leave the escaped state before dispatching.
                 self.state = b'data'
                 self._handleShortControlSequence(ch)
             continue

         if current == b'bracket-escaped':
             if ch == b'O':
                 self.state = b'low-function-escaped'
             elif ch.isalpha() or ch == b'~':
                 # A letter or '~' ends the sequence: dispatch the buffered
                 # bytes plus this terminator, then drop the buffer.
                 sequence = b''.join(self.escBuf) + ch
                 self._handleControlSequence(sequence)
                 del self.escBuf
                 self.state = b'data'
             else:
                 self.escBuf.append(ch)
             continue

         if current == b'low-function-escaped':
             self._handleLowFunctionControlSequence(ch)
             self.state = b'data'
             continue

         raise ValueError("Illegal state")
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:30,代码来源:insults.py


示例4: __repr__

 def __repr__(self):
     """
     Build a multi-line, human-readable description of this key.

     The first line names the key type, whether it is public or private and
     its size in bits.  Each entry of C{self.data()} follows in sorted order
     as an C{attr <name>:} line and then tab-indented rows of at most 15
     colon-separated hex octets.  The last line is closed with C{>}.
     """
     typeName = nativeString(self.type())
     role = 'Public Key' if self.isPublic() else 'Private Key'
     out = ['<%s %s (%s bits)' % (typeName, role, self._keyObject.key_size)]

     for name, value in sorted(self.data().items()):
         if _PY3 and isinstance(name, bytes):
             name = name.decode('ascii')
         out.append('attr %s:' % (name,))

         # The first four bytes of the encoded value are skipped
         # (presumably a length prefix -- confirm against common.MP).
         remaining = common.MP(value)[4:]
         while remaining:
             chunk, remaining = remaining[:15], remaining[15:]
             row = ''.join(['%02x:' % (ord(c),) for c in iterbytes(chunk)])
             if len(chunk) < 15:
                 # Only a short row loses its trailing colon.
                 row = row[:-1]
             out.append('\t' + row)

     out[-1] += '>'
     return '\n'.join(out)
开发者ID:daweasel27,项目名称:PhobiaEnemy,代码行数:25,代码来源:keys.py


示例5: test_delete

    def test_delete(self):
        """
        When L{HistoricRecvLine} receives a DELETE keystroke, it deletes the
        character immediately after the cursor.
        """
        def press(key):
            return self.p.keystrokeReceived(key, None)

        for key in iterbytes(b'xyz'):
            press(key)
        self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))

        # DELETE straight after typing leaves the buffer unchanged.
        press(self.pt.DELETE)
        self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))

        # Each LEFT_ARROW + DELETE pair removes one more character.
        for remaining in (b'xy', b'x', b''):
            press(self.pt.LEFT_ARROW)
            press(self.pt.DELETE)
            self.assertEqual(self.p.currentLineBuffer(), (remaining, b''))

        # DELETE on an empty buffer changes nothing.
        press(self.pt.DELETE)
        self.assertEqual(self.p.currentLineBuffer(), (b'', b''))
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:28,代码来源:test_recvline.py


示例6: test_socks4aFailedResolution

    def test_socks4aFailedResolution(self):
        """
        Failed hostname resolution on a SOCKSv4a packet results in a 91 error
        response and the connection getting closed.
        """
        # The request asks for the domain name "failinghost" to be resolved.
        header = struct.pack('!BBH', 4, 2, 34) + socket.inet_aton('0.0.0.1')
        clientRequest = header + b'fooBAZ\0' + b'failinghost\0'

        # Feed the request a single byte at a time so that the protocol's
        # buffering logic is exercised.  FakeResolverReactor's resolve method
        # is invoked to "resolve" the hostname.
        for octet in iterbytes(clientRequest):
            self.sock.dataReceived(octet)

        # The server must answer with a 91 error.
        reply = self.sock.transport.value()
        expectedReply = (
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
        self.assertEqual(reply, expectedReply)

        # A failed resolution causes the transport to drop the connection.
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNone(self.sock.driver_outgoing)
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:27,代码来源:test_socks.py


示例7: _toString_OPENSSH

    def _toString_OPENSSH(self, extra):
        """
        Return a public or private OpenSSH string.  See
        _fromString_PUBLIC_OPENSSH and _fromString_PRIVATE_OPENSSH for the
        string formats.  If extra is present, it represents a comment for a
        public key, or a passphrase for a private key.

        A private key given a passphrase is padded to a multiple of 8 bytes
        and encrypted with Triple DES in CBC mode; the key is taken from two
        chained MD5 digests of the passphrase and a random 8-byte IV.

        @param extra: Comment for a public key or passphrase for a
            private key
        @type extra: L{bytes}

        @rtype: L{bytes}
        """
        data = self.data()
        if self.isPublic():
            # base64.encodestring was removed in Python 3.9.  b64encode
            # yields the same bytes as encodestring() with its newlines
            # stripped and exists on every supported Python.
            b64Data = base64.b64encode(self.blob())
            if not extra:
                extra = b''
            return (self.sshType() + b' ' + b64Data + b' ' + extra).strip()
        else:
            lines = [b''.join((b'-----BEGIN ', self.type().encode('ascii'),
                               b' PRIVATE KEY-----'))]
            if self.type() == 'RSA':
                p, q = data['p'], data['q']
                objData = (0, data['n'], data['e'], data['d'], q, p,
                           data['d'] % (q - 1), data['d'] % (p - 1),
                           data['u'])
            else:
                objData = (0, data['p'], data['q'], data['g'], data['y'],
                           data['x'])
            asn1Sequence = univ.Sequence()
            for index, value in enumerate(objData):
                asn1Sequence.setComponentByPosition(index, univ.Integer(value))
            asn1Data = berEncoder.encode(asn1Sequence)
            if extra:
                iv = randbytes.secureRandom(8)
                hexiv = ''.join(['%02X' % (ord(x),) for x in iterbytes(iv)])
                hexiv = hexiv.encode('ascii')
                lines.append(b'Proc-Type: 4,ENCRYPTED')
                # The trailing newline leaves a blank line after the headers
                # once the lines are joined.
                lines.append(b'DEK-Info: DES-EDE3-CBC,' + hexiv + b'\n')
                ba = md5(extra + iv).digest()
                bb = md5(ba + extra + iv).digest()
                encKey = (ba + bb)[:24]
                # Always pads: 8 bytes are added when already block-aligned.
                padLen = 8 - (len(asn1Data) % 8)
                asn1Data += (chr(padLen) * padLen).encode('ascii')

                encryptor = Cipher(
                    algorithms.TripleDES(encKey),
                    modes.CBC(iv),
                    backend=default_backend()
                ).encryptor()

                asn1Data = encryptor.update(asn1Data) + encryptor.finalize()

            # Unbroken base64 text, re-wrapped below at 64 characters a line.
            b64Data = base64.b64encode(asn1Data)
            lines += [b64Data[i:i + 64] for i in range(0, len(b64Data), 64)]
            lines.append(b''.join((b'-----END ', self.type().encode('ascii'),
                                   b' PRIVATE KEY-----')))
            return b'\n'.join(lines)
开发者ID:daweasel27,项目名称:PhobiaEnemy,代码行数:59,代码来源:keys.py


示例8: test_socks4a

    def test_socks4a(self):
        """
        If the destination IP address has zeros for the first three octets and
        non-zero for the fourth octet, the client is attempting a v4a
        connection.  A hostname is specified after the user ID string and the
        server connects to the address that hostname resolves to.

        @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
        """
        # The request asks for the domain name "localhost" to be resolved.
        header = struct.pack('!BBH', 4, 2, 34) + socket.inet_aton('0.0.0.1')
        clientRequest = header + b'fooBAZ\0' + b'localhost\0'

        # Feed the request a single byte at a time so that the protocol's
        # buffering logic is exercised.  FakeResolverReactor's resolve method
        # is invoked to "resolve" the hostname.
        for octet in iterbytes(clientRequest):
            self.sock.dataReceived(octet)

        firstReply = self.sock.transport.value()
        self.sock.transport.clear()

        # The first reply carries the address which will be connected to.
        self.assertEqual(
            firstReply,
            struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNotNone(self.sock.driver_listen)

        # Make the incoming connection.
        incoming = self.sock.driver_listen.buildProtocol(('127.0.0.1', 5345))
        self.assertIsNotNone(incoming)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # The second reply packet should now be available.
        secondReply = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(
            secondReply,
            struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0'))
        self.assertIsNot(
            self.sock.transport.stringTCPTransport_closing, None)

        # Data delivered to the SOCKS side must reach the incoming side ...
        self.sock.dataReceived(b'hi there')
        self.assertEqual(incoming.transport.value(), b'hi there')

        # ... and data delivered to the incoming side must come back.
        incoming.dataReceived(b'hi there')
        self.assertEqual(self.sock.transport.value(), b'hi there')

        self.sock.connectionLost('fake reason')
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:58,代码来源:test_socks.py


示例9: feed

    def feed(self, data):
        """
        Hand C{data} to C{self.enc.dataReceived} one byte per call.

        @param data: The bytes to deliver.
        @type data: L{bytes}
        """
        singleBytes = iterbytes(data)
        for octet in singleBytes:
            self.enc.dataReceived(octet)
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:9,代码来源:test_banana.py


示例10: test_partial

 def test_partial(self):
     """
     Delivering each incomplete string byte by byte to a fresh protocol must
     leave that protocol's C{received} list empty.
     """
     for partial in self.partialStrings:
         protocol = self.getProtocol()
         for octet in iterbytes(partial):
             protocol.dataReceived(octet)
         self.assertEqual(protocol.received, [])
开发者ID:shelmesky,项目名称:twisted,代码行数:9,代码来源:test_basic.py


示例11: test_illegal

 def test_illegal(self):
     """
     Delivering each illegal string byte by byte to a fresh protocol must
     leave that protocol's transport disconnecting.
     """
     for illegal in self.illegalStrings:
         protocol = self.getProtocol()
         for octet in iterbytes(illegal):
             protocol.dataReceived(octet)
         self.assertTrue(protocol.transport.disconnecting)
开发者ID:shelmesky,项目名称:twisted,代码行数:9,代码来源:test_basic.py


示例12: test_iteration

 def test_iteration(self):
     """
     Iterating over the object returned by L{iterbytes} for a bytestring
     yields that bytestring's individual bytes, each as a length-one
     bytestring.
     """
     individualBytes = list(iterbytes(b"abcd"))
     self.assertEqual(individualBytes, [b'a', b'b', b'c', b'd'])
开发者ID:esabelhaus,项目名称:secret-octo-dubstep,代码行数:9,代码来源:test_compat.py


示例13: write

    def write(self, data):
        """
        Add the given printable bytes to the terminal.

        Every line feed in C{data} is first expanded to a carriage return /
        line feed pair; the result is then inserted at the cursor one byte at
        a time.
        """
        expanded = data.replace(b'\n', b'\r\n')
        for octet in iterbytes(expanded):
            self.insertAtCursor(octet)
开发者ID:alfonsjose,项目名称:international-orders-app,代码行数:9,代码来源:helper.py


示例14: test_receive

 def test_receive(self):
     """
     Strings delivered byte by byte, each preceded by its length packed with
     the protocol's C{structFormat}, are all received intact and in order.
     """
     protocol = self.getProtocol()
     for payload in self.strings:
         framed = struct.pack(protocol.structFormat, len(payload)) + payload
         for octet in iterbytes(framed):
             protocol.dataReceived(octet)
     self.assertEqual(protocol.received, self.strings)
开发者ID:shelmesky,项目名称:twisted,代码行数:9,代码来源:test_basic.py


示例15: test_buffer

 def test_buffer(self):
     """
     Deliver C{self.buffer} byte by byte to a L{LineOnlyTester}; the received
     lines must equal the buffer split on newlines, minus the final piece.
     """
     transport = proto_helpers.StringTransport()
     tester = LineOnlyTester()
     tester.makeConnection(transport)
     for octet in iterbytes(self.buffer):
         tester.dataReceived(octet)
     self.assertEqual(
         tester.received,
         self.buffer.split(b"\n")[:-1])
开发者ID:shelmesky,项目名称:twisted,代码行数:10,代码来源:test_basic.py


示例16: test_dataNotFullyReceived

 def test_dataNotFullyReceived(self):
     """
     Since the initial rule inside the grammar is not matched, the receiver
     shouldn't receive any byte.
     """
     receiver = TrampolinedReceiver()
     parser = TrampolinedParser(self.grammar, receiver, {})
     # This input contains no delimiter, so the rule is never matched.
     for octet in iterbytes(b'foobarandnotreachdelimiter'):
         parser.receive(octet)
     self.assertEqual(receiver.received, [])
开发者ID:bjonnh,项目名称:parsley,代码行数:11,代码来源:test_tube.py


示例17: test_currentRuleWithArgs

 def test_currentRuleWithArgs(self):
     """
     TrampolinedParser should be able to invoke the current rule with
     arguments.
     """
     receiver = TrampolinedReceiver()
     # The rule name is followed by the arguments to pass to it.
     receiver.currentRule = ("witharg", "nice ", "day")
     parser = TrampolinedParser(self.grammar, receiver, {})
     for octet in iterbytes(b' oh yes\r\n'):
         parser.receive(octet)
     self.assertEqual(receiver.received, ["nice day oh yes"])
开发者ID:bjonnh,项目名称:parsley,代码行数:11,代码来源:test_tube.py


示例18: test_dataFullyReceived

 def test_dataFullyReceived(self):
     """
     The receiver should receive the data according to the grammar.

     Four words joined by CRLF are delivered byte by byte.  Only the first
     three are expected at that point; the fourth is expected once a final
     CRLF has been delivered.
     """
     receiver = TrampolinedReceiver()
     trampolinedParser = TrampolinedParser(self.grammar, receiver, {})
     buf = b'\r\n'.join((b'foo', b'bar', b'foo', b'bar'))
     for c in iterbytes(buf):
         trampolinedParser.receive(c)
     self.assertEqual(receiver.received, [b'foo', b'bar', b'foo'])
     # Deliver the closing delimiter as bytes, like every other input in
     # this test, instead of as a text string.
     trampolinedParser.receive(b'\r\n')
     self.assertEqual(receiver.received, [b'foo', b'bar', b'foo', b'bar'])
开发者ID:bjonnh,项目名称:parsley,代码行数:12,代码来源:test_tube.py


示例19: test_home

    def test_home(self):
        """
        When L{HistoricRecvLine} receives a HOME keystroke it moves the
        cursor to the beginning of the current line buffer.
        """
        def press(key):
            return self.p.keystrokeReceived(key, None)

        for key in iterbytes(b'hello, world'):
            press(key)
        self.assertEqual(self.p.currentLineBuffer(), (b'hello, world', b''))

        # After HOME the whole line sits in the second half of the buffer.
        press(self.pt.HOME)
        self.assertEqual(self.p.currentLineBuffer(), (b'', b'hello, world'))
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:13,代码来源:test_recvline.py


示例20: testSubnegotiation

    def testSubnegotiation(self):
        """
        Send a subnegotiation command and make sure it gets parsed and that
        the correct method is called.
        """
        handler = self.p.protocol

        cmd = telnet.IAC + telnet.SB + b"\x12hello world" + telnet.IAC + telnet.SE
        chunks = [
            b"These are some bytes but soon" + cmd,
            b"there will be some more",
        ]

        for chunk in chunks:
            self.p.dataReceived(chunk)

        # The handler's bytes are everything delivered except the command.
        self.assertEqual(handler.bytes, b"".join(chunks).replace(cmd, b""))
        self.assertEqual(handler.subcmd, list(iterbytes(b"hello world")))
开发者ID:dumengnan,项目名称:ohmydata_spider,代码行数:13,代码来源:test_telnet.py



注:本文中的twisted.python.compat.iterbytes函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python compat.nativeString函数代码示例发布时间:2022-05-27
下一篇:
Python compat.ioType函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap