• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python tests.s2b函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.s2b函数的典型用法代码示例。如果您正苦于以下问题:Python s2b函数的具体用法?Python s2b怎么用?Python s2b使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了s2b函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: sender

            def sender (listener):
                (sock, addr) = listener.accept()
                sock = bufsized(sock, size = bufsize)
                sock.sendall(s2b('x') * many_bytes)
                sock.sendall(s2b('y') * second_bytes)

                received.wait()
开发者ID:inercia,项目名称:evy,代码行数:7,代码来源:test_green_socket_send.py


示例2: accept_close_late

 def accept_close_late(listener):
     # verify that the makefile and the socket are truly independent
     # by closing the made file and then sending a character
     try:
         conn, addr = listener.accept()
         fd = conn.makefile('w')
         fd.write('hello')
         fd.close()
         conn.send(s2b('\n'))
         conn.close()
         self.assertWriteToClosedFileRaises(fd)
         self.assertRaises(socket.error, conn.send, s2b('b'))
     finally:
         listener.close()
开发者ID:Mawaheb,项目名称:Vesper-crm,代码行数:14,代码来源:greenio_test.py


示例3: test_sendall_timeout

    def test_sendall_timeout(self):
        listener = greenio.GreenSocket(socket.socket())
        listener.bind(('', 0))
        listener.listen(50)

        evt = event.Event()

        def server():
            # accept the connection in another greenlet
            sock, addr = listener.accept()
            evt.wait()

        gt = eventlet.spawn(server)

        addr = listener.getsockname()

        client = greenio.GreenSocket(socket.socket())
        client.settimeout(0.1)
        client.connect(addr)

        try:
            msg = s2b("A") * (8 << 20)

            # want to exceed the size of the OS buffer so it'll block
            client.sendall(msg)
            self.fail("socket.timeout not raised")
        except socket.timeout, e:
            self.assert_(hasattr(e, 'args'))
            self.assertEqual(e.args[0], 'timed out')
开发者ID:Mawaheb,项目名称:Vesper-crm,代码行数:29,代码来源:greenio_test.py


示例4: test_send_timeout

    def test_send_timeout(self):
        self.reset_timeout(2)
        listener = bufsized(eventlet.listen(('', 0)))

        evt = event.Event()

        def server():
            # accept the connection in another greenlet
            sock, addr = listener.accept()
            sock = bufsized(sock)
            evt.wait()

        gt = eventlet.spawn(server)

        addr = listener.getsockname()

        client = bufsized(greenio.GreenSocket(socket.socket()))
        client.connect(addr)
        try:
            client.settimeout(0.00001)
            msg = s2b("A") * 100000  # large enough number to overwhelm most buffers

            total_sent = 0
            # want to exceed the size of the OS buffer so it'll block in a
            # single send
            for x in range(10):
                total_sent += client.send(msg)
            self.fail("socket.timeout not raised")
        except socket.timeout, e:
            self.assert_(hasattr(e, 'args'))
            self.assertEqual(e.args[0], 'timed out')
开发者ID:Mawaheb,项目名称:Vesper-crm,代码行数:31,代码来源:greenio_test.py


示例5: test_hub_exceptions

    def test_hub_exceptions (self):
        debug.hub_exceptions(True)
        server = convenience.listen(('0.0.0.0', 0))
        client = convenience.connect(('127.0.0.1', server.getsockname()[1]))
        client_2, addr = server.accept()

        def hurl (s):
            s.recv(1)
            {}[1]  # keyerror

        fake = StringIO()
        orig = sys.stderr
        sys.stderr = fake
        try:
            gt = evy.spawn(hurl, client_2)
            sleep(0)
            client.send(s2b(' '))
            sleep(0)
            # allow the "hurl" greenlet to trigger the KeyError
            # not sure why the extra context switch is needed
            sleep(0)
        finally:
            sys.stderr = orig
            self.assertRaises(KeyError, gt.wait)
            debug.hub_exceptions(False)
            # look for the KeyError exception in the traceback
        self.assert_('KeyError: 1' in fake.getvalue(),
                     "Traceback not in:\n" + fake.getvalue())
开发者ID:inercia,项目名称:evy,代码行数:28,代码来源:test_debug.py


示例6: client

        def client ():
            client = sockets.GreenSocket()
            client.connect(('127.0.0.1', listener_port))
            msg = s2b("A") * (10000)  # large enough number to overwhelm most buffers

            # want to exceed the size of the OS buffer so it'll block in a single send
            total_sent = 0
            for x in range(10):
                total_sent += client.send(msg)
            return total_sent
开发者ID:inercia,项目名称:evy,代码行数:10,代码来源:test_green_socket_send.py


示例7: server

 def server():
     (sock, addr) = listener.accept()
     sock = bufsized(sock)
     send_large_coro = eventlet.spawn(send_large, sock)
     eventlet.sleep(0)
     result = sock.recv(10)
     expected = s2b('hello world')
     while len(result) < len(expected):
         result += sock.recv(10)
     self.assertEquals(result, expected)
     send_large_coro.wait()
开发者ID:Mawaheb,项目名称:Vesper-crm,代码行数:11,代码来源:greenio_test.py


示例8: test_exiting_server

 def test_exiting_server(self):
     # tests that the server closes the client sock on handle() exit
     def closer(sock,addr):
         pass
         
     l = eventlet.listen(('localhost', 0))
     gt = eventlet.spawn(eventlet.serve, l, closer)
     client = eventlet.connect(('localhost', l.getsockname()[1]))
     client.sendall(s2b('a'))
     self.assertFalse(client.recv(100))
     gt.kill()
开发者ID:2013Commons,项目名称:HUE-SHARK,代码行数:11,代码来源:convenience_test.py


示例9: test_full_duplex

    def test_full_duplex (self):
        large_data = s2b('*') * 10 * min_buf_size()
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(('127.0.0.1', 0))
        listener.listen(50)

        def send_large (sock):
            sock.sendall(large_data)

        def read_large (sock):
            result = sock.recv(len(large_data))
            while len(result) < len(large_data):
                result += sock.recv(len(large_data))
                if result == '':
                    break
            self.assertEquals(result, large_data)

        def server ():
            (sock, addr) = listener.accept()
            sock = bufsized(sock)
            send_large_coro = spawn(send_large, sock)
            sleep(0)
            result = sock.recv(10)
            expected = s2b('hello world')
            while len(result) < len(expected) and result is not '':
                result += sock.recv(10)
            self.assertEquals(result, expected)
            send_large_coro.wait()

        server_evt = spawn(server)
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect(('127.0.0.1', listener.getsockname()[1]))
        bufsized(client)
        large_evt = spawn(read_large, client)
        sleep(0)
        client.sendall(s2b('hello world'))

        server_evt.wait()
        large_evt.wait()
        client.close()
开发者ID:inercia,项目名称:evy,代码行数:41,代码来源:test_green_socket_send.py


示例10: test_excepting_server

 def test_excepting_server(self):
     # tests that the server closes the client sock on handle() exception
     def crasher(sock,addr):
         sock.recv(1024)
         0//0
         
     l = eventlet.listen(('localhost', 0))
     gt = eventlet.spawn(eventlet.serve, l, crasher)
     client = eventlet.connect(('localhost', l.getsockname()[1]))
     client.sendall(s2b('a'))
     self.assertRaises(ZeroDivisionError, gt.wait)
     self.assertFalse(client.recv(100))
开发者ID:2013Commons,项目名称:HUE-SHARK,代码行数:12,代码来源:convenience_test.py


示例11: test_excepting_server_already_closed

 def test_excepting_server_already_closed(self):
     # same as above but with explicit clsoe before crash
     def crasher(sock,addr):
         sock.recv(1024)
         sock.close()
         0//0
         
     l = eventlet.listen(('localhost', 0))
     gt = eventlet.spawn(eventlet.serve, l, crasher)
     client = eventlet.connect(('localhost', l.getsockname()[1]))
     client.sendall(s2b('a'))
     self.assertRaises(ZeroDivisionError, gt.wait)
     self.assertFalse(client.recv(100))
开发者ID:2013Commons,项目名称:HUE-SHARK,代码行数:13,代码来源:convenience_test.py


示例12: test_multiple_readers

    def test_multiple_readers(self, clibufsize=False):
        debug.hub_prevent_multiple_readers(False)
        recvsize = 2 * min_buf_size()
        sendsize = 10 * recvsize

        # test that we can have multiple coroutines reading
        # from the same fd.  We make no guarantees about which one gets which
        # bytes, but they should both get at least some
        def reader(sock, results):
            while True:
                data = sock.recv(recvsize)
                if not data:
                    break
                results.append(data)

        results1 = []
        results2 = []
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(('127.0.0.1', 0))
        listener.listen(50)

        def server():
            (sock, addr) = listener.accept()
            sock = bufsized(sock)
            try:
                c1 = eventlet.spawn(reader, sock, results1)
                c2 = eventlet.spawn(reader, sock, results2)
                try:
                    c1.wait()
                    c2.wait()
                finally:
                    c1.kill()
                    c2.kill()
            finally:
                sock.close()

        server_coro = eventlet.spawn(server)
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect(('127.0.0.1', listener.getsockname()[1]))
        if clibufsize:
            bufsized(client, size=sendsize)
        else:
            bufsized(client)
        client.sendall(s2b('*') * sendsize)
        client.close()
        server_coro.wait()
        listener.close()
        self.assert_(len(results1) > 0)
        self.assert_(len(results2) > 0)
        debug.hub_prevent_multiple_readers()
开发者ID:Mawaheb,项目名称:Vesper-crm,代码行数:51,代码来源:greenio_test.py


示例13: client

        def client():
            pid = os.fork()
            if pid:
                return pid

            client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.connect(('127.0.0.1', port))

            bufsized(client, size=sendsize)

            for i in range(sendloops):
                client.sendall(s2b('*') * sendsize)
            client.close()
            os._exit(0)
开发者ID:Mawaheb,项目名称:Vesper-crm,代码行数:14,代码来源:greenio_test.py


示例14: test_client

 def test_client():
     c = eventlet.connect(('localhost', l.getsockname()[1]))
     # verify the client is connected by getting data
     self.assertEquals(s2b('hi'), c.recv(2))
     return c
开发者ID:2013Commons,项目名称:HUE-SHARK,代码行数:5,代码来源:convenience_test.py


示例15: waiter

 def waiter(sock, addr):
     sock.sendall(s2b('hi'))
     evt.wait()
开发者ID:2013Commons,项目名称:HUE-SHARK,代码行数:3,代码来源:convenience_test.py



注:本文中的tests.s2b函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python tests.strict_eq函数代码示例发布时间:2022-05-27
下一篇:
Python tests.run_test函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap