• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python loopback.loopbackAsync函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twisted.protocols.loopback.loopbackAsync函数的典型用法代码示例。如果您正苦于以下问题:Python loopbackAsync函数的具体用法?Python loopbackAsync怎么用?Python loopbackAsync使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了loopbackAsync函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _greetingtest

    def _greetingtest(self, write, testServer):
        """
        Test one of the permutations of write/writeSequence client/server.
        """
        class GreeteeProtocol(Protocol):
            bytes = ""
            def dataReceived(self, bytes):
                self.bytes += bytes
                if self.bytes == "bytes":
                    self.received.callback(None)

        class GreeterProtocol(Protocol):
            def connectionMade(self):
                getattr(self.transport, write)("bytes")

        if testServer:
            server = GreeterProtocol()
            client = GreeteeProtocol()
            d = client.received = Deferred()
        else:
            server = GreeteeProtocol()
            d = server.received = Deferred()
            client = GreeterProtocol()

        loopback.loopbackAsync(server, client)
        return d
开发者ID:BillAndersan,项目名称:twisted,代码行数:26,代码来源:test_loopback.py


示例2: _greetingtest

    def _greetingtest(self, write, testServer):
        """
        Test one of the permutations of write/writeSequence client/server.

        @param write: The name of the method to test, C{"write"} or
            C{"writeSequence"}.
        """
        class GreeteeProtocol(Protocol):
            bytes = b""
            def dataReceived(self, bytes):
                self.bytes += bytes
                if self.bytes == b"bytes":
                    self.received.callback(None)

        class GreeterProtocol(Protocol):
            def connectionMade(self):
                if write == "write":
                    self.transport.write(b"bytes")
                else:
                    self.transport.writeSequence([b"byt", b"es"])

        if testServer:
            server = GreeterProtocol()
            client = GreeteeProtocol()
            d = client.received = Deferred()
        else:
            server = GreeteeProtocol()
            d = server.received = Deferred()
            client = GreeterProtocol()

        loopback.loopbackAsync(server, client)
        return d
开发者ID:0004c,项目名称:VTK,代码行数:32,代码来源:test_loopback.py


示例3: _hostpeertest

    def _hostpeertest(self, get, testServer):
        """
        Test one of the permutations of client/server host/peer.
        """
        class TestProtocol(Protocol):
            def makeConnection(self, transport):
                Protocol.makeConnection(self, transport)
                self.onConnection.callback(transport)

        if testServer:
            server = TestProtocol()
            d = server.onConnection = Deferred()
            client = Protocol()
        else:
            server = Protocol()
            client = TestProtocol()
            d = client.onConnection = Deferred()

        loopback.loopbackAsync(server, client)

        def connected(transport):
            host = getattr(transport, get)()
            self.failUnless(IAddress.providedBy(host))

        return d.addCallback(connected)
开发者ID:0004c,项目名称:VTK,代码行数:25,代码来源:test_loopback.py


示例4: setUp

    def setUp(self):
        MorbidTestCase.setUp(self)
        self.serverProtocol2 = self.factory.buildProtocol(None)
        self.serverProtocol2.log = self.log
        self.client2 = StompClientProtocol('user2','user2')
        self.client2.log = self.log
        self.loopbackBody = loopback.loopbackAsync(self.serverProtocol2, 
                                                   self.client2)

        self.serverProtocol3 = self.factory.buildProtocol(None)
        self.serverProtocol3.log = self.log
        self.client3 = StompClientProtocol('user3','user3')
        self.client3.log = self.log
        self.loopbackBody = loopback.loopbackAsync(self.serverProtocol3, 
                                                   self.client3)
开发者ID:NicolaeNMV,项目名称:Tsunami,代码行数:15,代码来源:test_morbid.py


示例5: test_makeConnection

    def test_makeConnection(self):
        """
        Test that the client and server protocol both have makeConnection
        invoked on them by loopbackAsync.
        """
        class TestProtocol(Protocol):
            transport = None
            def makeConnection(self, transport):
                self.transport = transport

        server = TestProtocol()
        client = TestProtocol()
        loopback.loopbackAsync(server, client)
        self.failIfEqual(client.transport, None)
        self.failIfEqual(server.transport, None)
开发者ID:0004c,项目名称:VTK,代码行数:15,代码来源:test_loopback.py


示例6: test_List

    def test_List(self):
        """
        You can get a listing
        """
        server = Server(FakePlumber({
            'ls': [
                ('foo', 'bar', 12, True),
            ]
        }))
        client = SingleCommandClient(List)

        from twisted.protocols.loopback import loopbackAsync
        def check(response):
            self.assertEqual(server.plumber.called, ['ls'])
            self.assertEqual(client.response, {
                'pipes': [
                    {
                        'src': 'foo',
                        'dst': 'bar',
                        'conns': 12,
                        'active': True,
                    },
                ]
            })
        r = loopbackAsync(server, client)
        return r.addCallback(check)
开发者ID:iffy,项目名称:grace,代码行数:26,代码来源:test_control.py


示例7: test_disorderlyShutdown

    def test_disorderlyShutdown(self):
        """
        If a L{TLSMemoryBIOProtocol} loses its connection unexpectedly, this is
        reported to the application.
        """
        clientConnectionLost = Deferred()
        clientFactory = ClientFactory()
        clientFactory.protocol = (
            lambda: ConnectionLostNotifyingProtocol(
                clientConnectionLost))

        clientContextFactory = HandshakeCallbackContextFactory()
        wrapperFactory = TLSMemoryBIOFactory(
            clientContextFactory, True, clientFactory)
        sslClientProtocol = wrapperFactory.buildProtocol(None)

        # Client speaks first, so the server can be dumb.
        serverProtocol = Protocol()

        connectionDeferred = loopbackAsync(serverProtocol, sslClientProtocol)

        # Now destroy the connection.
        serverProtocol.transport.loseConnection()

        # And when the connection completely dies, check the reason.
        def cbDisconnected(clientProtocol):
            clientProtocol.lostConnectionReason.trap(Error)
        clientConnectionLost.addCallback(cbDisconnected)
        return clientConnectionLost
开发者ID:Almad,项目名称:twisted,代码行数:29,代码来源:test_tls.py


示例8: test_pumpPolicy

    def test_pumpPolicy(self):
        """
        The callable passed as the value for the C{pumpPolicy} parameter to
        L{loopbackAsync} is called with a L{_LoopbackQueue} of pending bytes
        and a protocol to which they should be delivered.
        """
        pumpCalls = []
        def dummyPolicy(queue, target):
            bytes = []
            while queue:
                bytes.append(queue.get())
            pumpCalls.append((target, bytes))

        client = Protocol()
        server = Protocol()

        finished = loopback.loopbackAsync(server, client, dummyPolicy)
        self.assertEqual(pumpCalls, [])

        client.transport.write(b"foo")
        client.transport.write(b"bar")
        server.transport.write(b"baz")
        server.transport.write(b"quux")
        server.transport.loseConnection()

        def cbComplete(ignored):
            self.assertEqual(
                pumpCalls,
                # The order here is somewhat arbitrary.  The implementation
                # happens to always deliver data to the client first.
                [(client, [b"baz", b"quux", None]),
                 (server, [b"foo", b"bar"])])
        finished.addCallback(cbComplete)
        return finished
开发者ID:0004c,项目名称:VTK,代码行数:34,代码来源:test_loopback.py


示例9: test_writeSequence

    def test_writeSequence(self):
        """
        Bytes written to L{TLSMemoryBIOProtocol} with C{writeSequence} are
        received by the protocol on the other side of the connection.
        """
        bytes = "some bytes"
        class SimpleSendingProtocol(Protocol):
            def connectionMade(self):
                self.transport.writeSequence(list(bytes))

        clientFactory = ClientFactory()
        clientFactory.protocol = SimpleSendingProtocol

        clientContextFactory = HandshakeCallbackContextFactory()
        wrapperFactory = TLSMemoryBIOFactory(
            clientContextFactory, True, clientFactory)
        sslClientProtocol = wrapperFactory.buildProtocol(None)

        serverProtocol = AccumulatingProtocol(len(bytes))
        serverFactory = ServerFactory()
        serverFactory.protocol = lambda: serverProtocol

        serverContextFactory = DefaultOpenSSLContextFactory(certPath, certPath)
        wrapperFactory = TLSMemoryBIOFactory(
            serverContextFactory, False, serverFactory)
        sslServerProtocol = wrapperFactory.buildProtocol(None)

        connectionDeferred = loopbackAsync(sslServerProtocol, sslClientProtocol)

        # Wait for the connection to end, then make sure the server received
        # the bytes sent by the client.
        def cbConnectionDone(ignored):
            self.assertEquals("".join(serverProtocol.received), bytes)
        connectionDeferred.addCallback(cbConnectionDone)
        return connectionDeferred
开发者ID:Almad,项目名称:twisted,代码行数:35,代码来源:test_tls.py


示例10: writeBeforeHandshakeTest

    def writeBeforeHandshakeTest(self, sendingProtocol, bytes):
        """
        Run test where client sends data before handshake, given the sending
        protocol and expected bytes.
        """
        clientFactory = ClientFactory()
        clientFactory.protocol = sendingProtocol

        clientContextFactory, handshakeDeferred = (
            HandshakeCallbackContextFactory.factoryAndDeferred())
        wrapperFactory = TLSMemoryBIOFactory(
            clientContextFactory, True, clientFactory)
        sslClientProtocol = wrapperFactory.buildProtocol(None)

        serverProtocol = AccumulatingProtocol(len(bytes))
        serverFactory = ServerFactory()
        serverFactory.protocol = lambda: serverProtocol

        serverContextFactory = DefaultOpenSSLContextFactory(certPath, certPath)
        wrapperFactory = TLSMemoryBIOFactory(
            serverContextFactory, False, serverFactory)
        sslServerProtocol = wrapperFactory.buildProtocol(None)

        connectionDeferred = loopbackAsync(sslServerProtocol, sslClientProtocol)

        # Wait for the connection to end, then make sure the server received
        # the bytes sent by the client.
        def cbConnectionDone(ignored):
            self.assertEqual("".join(serverProtocol.received), bytes)
        connectionDeferred.addCallback(cbConnectionDone)
        return connectionDeferred
开发者ID:bluemutedwisdom,项目名称:twisted,代码行数:31,代码来源:test_tls.py


示例11: test_handshake

    def test_handshake(self):
        """
        The TLS handshake is performed when L{TLSMemoryBIOProtocol} is
        connected to a transport.
        """
        clientFactory = ClientFactory()
        clientFactory.protocol = Protocol

        clientContextFactory, handshakeDeferred = (
            HandshakeCallbackContextFactory.factoryAndDeferred())
        wrapperFactory = TLSMemoryBIOFactory(
            clientContextFactory, True, clientFactory)
        sslClientProtocol = wrapperFactory.buildProtocol(None)

        serverFactory = ServerFactory()
        serverFactory.protocol = Protocol

        serverContextFactory = DefaultOpenSSLContextFactory(certPath, certPath)
        wrapperFactory = TLSMemoryBIOFactory(
            serverContextFactory, False, serverFactory)
        sslServerProtocol = wrapperFactory.buildProtocol(None)

        connectionDeferred = loopbackAsync(sslServerProtocol, sslClientProtocol)

        # Only wait for the handshake to complete.  Anything after that isn't
        # important here.
        return handshakeDeferred
开发者ID:Almad,项目名称:twisted,代码行数:27,代码来源:test_tls.py


示例12: test_writeNotReentrant

    def test_writeNotReentrant(self):
        """
        L{loopback.loopbackAsync} does not call a protocol's C{dataReceived}
        method while that protocol's transport's C{write} method is higher up
        on the stack.
        """
        class Server(Protocol):
            def dataReceived(self, bytes):
                self.transport.write(b"bytes")

        class Client(Protocol):
            ready = False

            def connectionMade(self):
                reactor.callLater(0, self.go)

            def go(self):
                self.transport.write(b"foo")
                self.ready = True

            def dataReceived(self, bytes):
                self.wasReady = self.ready
                self.transport.loseConnection()


        server = Server()
        client = Client()
        d = loopback.loopbackAsync(client, server)
        def cbFinished(ignored):
            self.assertTrue(client.wasReady)
        d.addCallback(cbFinished)
        return d
开发者ID:0004c,项目名称:VTK,代码行数:32,代码来源:test_loopback.py


示例13: testEmptyPASS

 def testEmptyPASS(self):
     dummy = DummyPOP3()
     client = LineSendingProtocol([
         "PASS ",
         "QUIT"
     ])
     d = loopback.loopbackAsync(dummy, client)
     return d.addCallback(self._cbTestEmptyPASS, client, dummy)
开发者ID:GunioRobot,项目名称:twisted,代码行数:8,代码来源:test_pop3.py


示例14: testAuthListing

    def testAuthListing(self):
        p = DummyPOP3()
        p.factory = internet.protocol.Factory()
        p.factory.challengers = {"Auth1": None, "secondAuth": None, "authLast": None}
        client = LineSendingProtocol(["AUTH", "QUIT"])

        d = loopback.loopbackAsync(p, client)
        return d.addCallback(self._cbTestAuthListing, client)
开发者ID:Bobboya,项目名称:vizitown_plugin,代码行数:8,代码来源:test_pop3.py


示例15: testProtocol

 def testProtocol(self):
     from twisted.protocols import loopback
     server = flow.Protocol()
     server.controller = echoServer
     client = flow.makeProtocol(echoClient)()
     client.factory = protocol.ClientFactory()
     client.factory.d = defer.Deferred()
     d2 = loopback.loopbackAsync(server, client)
     client.factory.d.addCallback(self.assertEquals, 'testing')
     return defer.gatherResults([client.factory.d, d2])
开发者ID:AnthonyNystrom,项目名称:YoGoMee,代码行数:10,代码来源:test_flow.py


示例16: testLoopback

 def testLoopback(self):
     protocol =  MyVirtualPOP3()
     protocol.service = self.factory
     clientProtocol = MyPOP3Downloader()
     def check(ignored):
         self.assertEqual(clientProtocol.message, self.message)
         protocol.connectionLost(
             failure.Failure(Exception("Test harness disconnect")))
     d = loopback.loopbackAsync(protocol, clientProtocol)
     return d.addCallback(check)
开发者ID:GunioRobot,项目名称:twisted,代码行数:10,代码来源:test_pop3.py


示例17: testLoopback

 def testLoopback(self):
     server = http.HTTPChannel()
     server.requestFactory = DummyHTTPHandler
     client = LoopbackHTTPClient()
     client.handleResponse = self._handleResponse
     client.handleHeader = self._handleHeader
     client.handleEndHeaders = self._handleEndHeaders
     client.handleStatus = self._handleStatus
     d = loopback.loopbackAsync(server, client)
     d.addCallback(self._cbTestLoopback)
     return d
开发者ID:MatthewTurk,项目名称:codenode,代码行数:11,代码来源:test_http.py


示例18: test_Stop

    def test_Stop(self):
        """
        You can stop the whole server.
        """
        server = Server(FakePlumber())
        client = SingleCommandClient(Stop)

        from twisted.protocols.loopback import loopbackAsync
        def check(response):
            self.assertEqual(server.plumber.called, ['stop'])
        r = loopbackAsync(server, client)
        return r.addCallback(check)
开发者ID:iffy,项目名称:grace,代码行数:12,代码来源:test_control.py


示例19: _producertest

    def _producertest(self, producerClass):
        toProduce = list(map(intToBytes, range(0, 10)))

        class ProducingProtocol(Protocol):
            def connectionMade(self):
                self.producer = producerClass(list(toProduce))
                self.producer.start(self.transport)

        class ReceivingProtocol(Protocol):
            bytes = b""
            def dataReceived(self, data):
                self.bytes += data
                if self.bytes == b''.join(toProduce):
                    self.received.callback((client, server))

        server = ProducingProtocol()
        client = ReceivingProtocol()
        client.received = Deferred()

        loopback.loopbackAsync(server, client)
        return client.received
开发者ID:0004c,项目名称:VTK,代码行数:21,代码来源:test_loopback.py


示例20: test_loseConnectionAfterHandshake

    def test_loseConnectionAfterHandshake(self):
        """
        L{TLSMemoryBIOProtocol.loseConnection} sends a TLS close alert and
        shuts down the underlying connection.
        """
        clientConnectionLost = Deferred()
        clientFactory = ClientFactory()
        clientFactory.protocol = (
            lambda: ConnectionLostNotifyingProtocol(
                clientConnectionLost))

        clientContextFactory, handshakeDeferred = (
            HandshakeCallbackContextFactory.factoryAndDeferred())
        wrapperFactory = TLSMemoryBIOFactory(
            clientContextFactory, True, clientFactory)
        sslClientProtocol = wrapperFactory.buildProtocol(None)

        serverProtocol = Protocol()
        serverFactory = ServerFactory()
        serverFactory.protocol = lambda: serverProtocol

        serverContextFactory = DefaultOpenSSLContextFactory(certPath, certPath)
        wrapperFactory = TLSMemoryBIOFactory(
            serverContextFactory, False, serverFactory)
        sslServerProtocol = wrapperFactory.buildProtocol(None)

        connectionDeferred = loopbackAsync(sslServerProtocol, sslClientProtocol)

        # Wait for the handshake before dropping the connection.
        def cbHandshake(ignored):
            serverProtocol.transport.loseConnection()

            # Now wait for the client to notice.
            return clientConnectionLost
        handshakeDeferred.addCallback(cbHandshake)

        # Wait for the connection to end, then make sure the client was
        # notified of a handshake failure.
        def cbConnectionDone(clientProtocol):
            clientProtocol.lostConnectionReason.trap(ConnectionDone)

            # The server should have closed its underlying transport, in
            # addition to whatever it did to shut down the TLS layer.
            self.assertTrue(serverProtocol.transport.q.disconnect)

            # The client should also have closed its underlying transport once
            # it saw the server shut down the TLS layer, so as to avoid relying
            # on the server to close the underlying connection.
            self.assertTrue(clientProtocol.transport.q.disconnect)
        handshakeDeferred.addCallback(cbConnectionDone)
        return handshakeDeferred
开发者ID:Almad,项目名称:twisted,代码行数:51,代码来源:test_tls.py



注:本文中的twisted.protocols.loopback.loopbackAsync函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python policies.ProtocolWrapper类代码示例发布时间:2022-05-27
下一篇:
Python basic.LineReceiver类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap