• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python support.find_unused_port函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.support.find_unused_port函数的典型用法代码示例。如果您正苦于以下问题：Python find_unused_port函数的具体用法？Python find_unused_port怎么用？Python find_unused_port使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了find_unused_port函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_start_serving_dual_stack

    def test_start_serving_dual_stack(self):
        """Serve with ``host=None`` and connect over both IPv4 and IPv6.

        A server is started on a port obtained from ``find_unused_port()``;
        one client connects to ``127.0.0.1`` and a second one to ``::1`` on
        that same port. Each connection must resolve ``f_proto`` with a
        ``MyProto`` built from the accepted transport.

        Client sockets and the serving sockets are released in ``finally``
        blocks so a failing step does not leak them into later tests.
        """
        f_proto = futures.Future()

        def connection_handler(transport):
            # ``f_proto`` is looked up when the handler runs, so rebinding it
            # below makes the second connection resolve the second future.
            f_proto.set_result(MyProto(transport))

        port = find_unused_port()
        f = self.loop.start_serving(connection_handler, host=None, port=port)
        socks = self.loop.run_until_complete(f)
        try:
            # IPv4 client.
            client = socket.socket()
            try:
                client.connect(('127.0.0.1', port))
                client.send(b'xxx')
                proto = self.loop.run_until_complete(f_proto)
                proto.transport.close()
            finally:
                client.close()

            # IPv6 client, using a fresh future for the new connection.
            f_proto = futures.Future()
            client = socket.socket(socket.AF_INET6)
            try:
                client.connect(('::1', port))
                client.send(b'xxx')
                proto = self.loop.run_until_complete(f_proto)
                proto.transport.close()
            finally:
                client.close()
        finally:
            for s in socks:
                self.loop.stop_serving(s)
开发者ID:sah,项目名称:tulip,代码行数:26,代码来源:events_test.py


示例2: prepare_socksendfile

    def prepare_socksendfile(self):
        """Build a connected (client socket, server protocol) pair.

        A listening socket is bound to ``support.HOST`` on a port from
        ``support.find_unused_port()`` and handed to ``loop.create_server``;
        a second socket is then connected to it through ``loop.sock_connect``.
        A cleanup callback that closes the protocol transport (if any) and
        the server is registered with ``addCleanup``.

        Returns:
            tuple: ``(client_socket, server_protocol)``.
        """
        server_proto = MyProto(self.loop)
        free_port = support.find_unused_port()

        listener = self.make_socket(cleanup=False)
        listener.bind((support.HOST, free_port))
        serving = self.loop.create_server(lambda: server_proto, sock=listener)
        srv = self.run_loop(serving)
        self.reduce_receive_buffer_size(listener)

        client = self.make_socket()
        connecting = self.loop.sock_connect(client, ('127.0.0.1', free_port))
        self.run_loop(connecting)
        self.reduce_send_buffer_size(client)

        def cleanup():
            transport = server_proto.transport
            # The transport stays None when the task was cancelled before
            # the connection_made callback ran.
            if transport is not None:
                transport.close()
                self.run_loop(server_proto.wait_closed())

            srv.close()
            self.run_loop(srv.wait_closed())

        self.addCleanup(cleanup)

        return client, server_proto
开发者ID:adrian17,项目名称:cpython,代码行数:26,代码来源:test_sendfile.py


示例3: __init__

 def __init__(self, certfile):
     """Initialise the daemon thread and its echo server.

     Args:
         certfile: passed through to ``self.EchoServer`` together with a
             port obtained from ``support.find_unused_port()``.
     """
     threading.Thread.__init__(self)
     self.daemon = True
     self.flag = None
     self.active = False
     chosen_port = support.find_unused_port()
     self.port = chosen_port
     self.server = self.EchoServer(chosen_port, certfile)
开发者ID:Kanma,项目名称:Athena-Dependencies-Python,代码行数:7,代码来源:test_ssl.py


示例4: test_source_address

 def test_source_address(self):
     """Reconnect the client from an explicit source port and verify it.

     The source port comes from ``support.find_unused_port()``, which cannot
     reserve it; another process may grab the port before ``connect`` binds
     it. That race surfaces as ``EADDRINUSE`` and is reported as a skip
     rather than a failure. Any other ``OSError`` is re-raised.
     """
     # Local import keeps this method self-contained regardless of the
     # module's import block.
     import errno

     self.client.quit()
     port = support.find_unused_port()
     try:
         self.client.connect(self.server.host, self.server.port,
                             source_address=(HOST, port))
         self.assertEqual(self.client.sock.getsockname()[1], port)
         self.client.quit()
     except OSError as e:
         if e.errno == errno.EADDRINUSE:
             self.skipTest("couldn't bind to port %d" % port)
         raise
开发者ID:pombredanne,项目名称:cpython,代码行数:7,代码来源:test_ftplib.py


示例5: test_create_connection_local_addr

 def test_create_connection_local_addr(self):
     """Connect with an explicit ``local_addr`` and check the bound port.

     The local address reuses the test server's host with a port from
     ``find_unused_port()``; the port reported by the connection's socket
     must equal the requested one.
     """
     with test_utils.run_test_server(self.loop) as httpd:
         local_port = find_unused_port()
         connecting = self.loop.create_connection(
             *httpd.address, local_addr=(httpd.address[0], local_port))
         transport = self.loop.run_until_complete(connecting)
         protocol = MyProto(transport, create_future=True)
         bound_sock = protocol.transport.get_extra_info('socket')
         bound_port = bound_sock.getsockname()[1]
         self.assertEqual(local_port, bound_port)
         transport.close()
开发者ID:sah,项目名称:tulip,代码行数:10,代码来源:events_test.py


示例6: test_source_address_passive_connection

 def test_source_address_passive_connection(self):
     """Check that a data connection is bound to the client's source port.

     ``source_address`` is set to ``(HOST, port)`` with a port from
     ``support.find_unused_port()``. The test is skipped when the bind
     fails with ``EADDRINUSE``; other I/O errors propagate.
     """
     source_port = support.find_unused_port()
     self.client.source_address = (HOST, source_port)
     try:
         with self.client.transfercmd('list') as data_sock:
             bound_port = data_sock.getsockname()[1]
             self.assertEqual(bound_port, source_port)
     except IOError as exc:
         # Anything other than "address in use" is a genuine error.
         if exc.errno != errno.EADDRINUSE:
             raise
         self.skipTest("couldn't bind to port %d" % source_port)
开发者ID:alfonsodiecko,项目名称:PYTHON_DIST,代码行数:10,代码来源:test_ftplib.py


示例7: test_source_address

 def test_source_address(self):
     """Reconnect from an explicit source port and verify the bound port.

     The test is skipped when the chosen port turns out to be in use
     (``EADDRINUSE``); any other ``OSError`` is re-raised.
     """
     self.client.quit()
     source_port = support.find_unused_port()
     try:
         self.client.connect(
             self.server.host,
             self.server.port,
             source_address=(HOST, source_port),
         )
         bound_port = self.client.sock.getsockname()[1]
         self.assertEqual(bound_port, source_port)
         self.client.quit()
     except OSError as exc:
         if exc.errno != errno.EADDRINUSE:
             raise
         self.skipTest("couldn't bind to port %d" % source_port)
开发者ID:collinanderson,项目名称:ensurepip,代码行数:11,代码来源:test_ftplib.py


示例8: setUp

    def setUp(self):
        """Start a ``SimSMTPServer`` driven by ``debugging_server`` in a thread.

        The server is created on ``(HOST, self.port)`` with a port from
        ``support.find_unused_port()``. The method returns once the server
        thread has signalled ``self.serv_evt``, which is then cleared.
        """
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        self.port = support.find_unused_port()
        self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1))

        worker = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt),
        )
        worker.start()

        # Block until the server thread sets serv_evt (presumably once it is
        # ready to serve -- see debugging_server), then re-arm the event.
        self.serv_evt.wait()
        self.serv_evt.clear()
开发者ID:LinkedModernismProject,项目名称:web_code,代码行数:11,代码来源:test_smtplib.py


示例9: test_create_connection_local_addr

 def test_create_connection_local_addr(self):
     """Generator test: connect with ``local_addr`` and check the bound port.

     After the echo server starts serving, a connection is created from
     the server's host and a port from ``find_unused_port()``; the port
     reported by the connection's socket must match.
     """
     from test.support import find_unused_port
     loop = get_event_loop()
     with run_test_server(loop, EchoServerProtocol) as server:
         yield server.start_serving()
         source_host = server.address[0]
         source_port = find_unused_port()
         connecting = loop.create_connection(
             SimpleProtocol,
             *server.address,
             local_addr=(source_host, source_port))
         transport, protocol = yield connecting
         bound_sock = protocol.transport.get_extra_info('socket')
         bound_port = bound_sock.getsockname()[1]
         self.assertEqual(source_port, bound_port)
         transport.close()
开发者ID:elimisteve,项目名称:pulsar,代码行数:13,代码来源:eventloop.py


示例10: testSourceAddress

 def testSourceAddress(self):
     """Open an SMTP session from an explicit source address.

     Verifies that ``source_address`` and ``local_hostname`` are stored on
     the ``SMTP`` object. Skips when the source port is already in use
     (``EADDRINUSE``); other ``OSError`` instances propagate.
     """
     src_port = support.find_unused_port()
     try:
         smtp = smtplib.SMTP(
             self.host,
             self.port,
             local_hostname='localhost',
             timeout=3,
             source_address=(self.host, src_port),
         )
         self.assertEqual(smtp.source_address, (self.host, src_port))
         self.assertEqual(smtp.local_hostname, 'localhost')
         smtp.quit()
     except OSError as exc:
         if exc.errno != errno.EADDRINUSE:
             raise
         self.skipTest("couldn't bind to source port %d" % src_port)
开发者ID:CCNITSilchar,项目名称:cpython,代码行数:13,代码来源:test_smtplib.py


示例11: testSourceAddress

 def testSourceAddress(self):
     """Open an SMTP session bound to ``127.0.0.1`` and a chosen port.

     Verifies that ``source_address`` and ``local_hostname`` are stored on
     the ``SMTP`` object. Skips when the port is already in use
     (``EADDRINUSE``); other I/O errors propagate.
     """
     source_port = support.find_unused_port()
     source = ("127.0.0.1", source_port)
     try:
         smtp = smtplib.SMTP(
             HOST,
             self.port,
             local_hostname="localhost",
             timeout=3,
             source_address=source,
         )
         self.assertEqual(smtp.source_address, ("127.0.0.1", source_port))
         self.assertEqual(smtp.local_hostname, "localhost")
         smtp.quit()
     except IOError as exc:
         if exc.errno != errno.EADDRINUSE:
             raise
         self.skipTest("couldn't bind to port %d" % source_port)
开发者ID:pykomke,项目名称:Kurz_Python_KE,代码行数:14,代码来源:test_smtplib.py


示例12: test_create_server_sock

 def test_create_server_sock(self):
     """Check the socket returned by ``create_server_sock((None, port))``.

     The socket must be bound to the requested port, be a stream socket,
     and use AF_INET6 when ``has_dual_stack()`` is true (AF_INET
     otherwise). A ``b'foo'`` round trip through ``self.echo_server`` must
     return the same bytes.
     """
     port = find_unused_port()
     with contextlib.closing(create_server_sock((None, port))) as listener:
         self.assertEqual(listener.getsockname()[1], port)
         self.assertEqual(listener.type, socket.SOCK_STREAM)
         if has_dual_stack():
             expected_family = socket.AF_INET6
         else:
             expected_family = socket.AF_INET
         self.assertEqual(listener.family, expected_family)
         self.echo_server(listener)
         conn = socket.create_connection(('localhost', port), timeout=2)
         with contextlib.closing(conn) as client:
             client.sendall(b'foo')
             self.assertEqual(client.recv(1024), b'foo')
开发者ID:jacob-carrier,项目名称:code,代码行数:15,代码来源:recipe-578504.py


示例13: prepare_sendfile

    def prepare_sendfile(self, *, is_ssl=False, close_after=0):
        """Create a connected server/client pair of ``MySendfileProto``.

        A TCP server is created on ``support.HOST`` using a port from
        ``support.find_unused_port()``, and a client socket is connected to
        it and wrapped with ``loop.create_connection``. A cleanup callback
        that closes both transports and the server is registered via
        ``addCleanup``.

        Args:
            is_ssl: when true, server and client SSL contexts from
                ``test_utils`` are used, and the test is skipped if the
                ``ssl`` name is falsy.
            close_after: forwarded to the server-side ``MySendfileProto``
                only.

        Returns:
            tuple: ``(srv_proto, cli_proto)``.
        """
        port = support.find_unused_port()
        srv_proto = MySendfileProto(loop=self.loop,
                                    close_after=close_after)
        if is_ssl:
            if not ssl:
                self.skipTest("No ssl module")
            srv_ctx = test_utils.simple_server_sslcontext()
            cli_ctx = test_utils.simple_client_sslcontext()
        else:
            srv_ctx = None
            cli_ctx = None
        # The server socket is created and bound by hand, then handed to
        # create_server through ``sock=``.
        srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        srv_sock.bind((support.HOST, port))
        server = self.run_loop(self.loop.create_server(
            lambda: srv_proto, sock=srv_sock, ssl=srv_ctx))
        self.reduce_receive_buffer_size(srv_sock)

        # server_hostname is only supplied together with an SSL context.
        if is_ssl:
            server_hostname = support.HOST
        else:
            server_hostname = None
        # The client socket is connected (blocking) before the loop takes
        # ownership of it via ``sock=``.
        cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        cli_sock.connect((support.HOST, port))

        cli_proto = MySendfileProto(loop=self.loop)
        # NOTE(review): ``pr`` is not used afterwards; ``cli_proto`` is the
        # object returned to the caller.
        tr, pr = self.run_loop(self.loop.create_connection(
            lambda: cli_proto, sock=cli_sock,
            ssl=cli_ctx, server_hostname=server_hostname))
        self.reduce_send_buffer_size(cli_sock, transport=tr)

        def cleanup():
            # Close both transports first, then wait on each protocol's
            # ``done`` before shutting the server down.
            srv_proto.transport.close()
            cli_proto.transport.close()
            self.run_loop(srv_proto.done)
            self.run_loop(cli_proto.done)

            server.close()
            self.run_loop(server.wait_closed())

        self.addCleanup(cleanup)
        return srv_proto, cli_proto
开发者ID:adrian17,项目名称:cpython,代码行数:42,代码来源:test_sendfile.py


示例14: test_start_serving_dual_stack

    def test_start_serving_dual_stack(self):
        """Serve with ``host=None`` and connect over both IPv4 and IPv6.

        ``find_unused_port()`` cannot reserve the port, so starting the
        server is retried when it fails with ``EADDRINUSE``; the assertion
        on ``try_count`` bounds the number of retries. One client then
        connects to ``127.0.0.1`` and another to ``::1`` on the same port,
        and each connection must resolve ``f_proto`` with the protocol
        instance.

        Client sockets and the serving sockets are released in ``finally``
        blocks so a failing step does not leak them into later tests.
        """
        f_proto = futures.Future(loop=self.loop)

        class TestMyProto(MyProto):
            def connection_made(self, transport):
                super().connection_made(transport)
                # ``f_proto`` is looked up at call time, so the rebinding
                # below is seen by the second connection.
                f_proto.set_result(self)

        try_count = 0
        while True:
            try:
                port = find_unused_port()
                f = self.loop.start_serving(TestMyProto, host=None, port=port)
                socks = self.loop.run_until_complete(f)
            except OSError as ex:
                if ex.errno != errno.EADDRINUSE:
                    raise
                # Port was taken between discovery and bind: retry with a
                # new one, failing the test after too many attempts.
                try_count += 1
                self.assertGreaterEqual(5, try_count)
            else:
                break

        try:
            # IPv4 client.
            client = socket.socket()
            try:
                client.connect(('127.0.0.1', port))
                client.send(b'xxx')
                proto = self.loop.run_until_complete(f_proto)
                proto.transport.close()
            finally:
                client.close()

            # IPv6 client, using a fresh future for the new connection.
            f_proto = futures.Future(loop=self.loop)
            client = socket.socket(socket.AF_INET6)
            try:
                client.connect(('::1', port))
                client.send(b'xxx')
                proto = self.loop.run_until_complete(f_proto)
                proto.transport.close()
            finally:
                client.close()
        finally:
            for s in socks:
                self.loop.stop_serving(s)
开发者ID:johnnoone,项目名称:pyzmqtulip,代码行数:40,代码来源:events_test.py


示例15: test_mlistener

 def test_mlistener(self):
     """Echo ``b'foo'`` through a ``MultipleSocketsListener`` on v4 and v6.

     For each of ``127.0.0.1`` and ``::1`` a fresh listener is created on
     both addresses, an echo server is attached, and a client connecting to
     that address must receive back what it sent.
     """
     port = find_unused_port()
     # First pass exercises the IPv4 address, second pass the IPv6 one.
     for target_host in ("127.0.0.1", "::1"):
         listener = MultipleSocketsListener(
             [('127.0.0.1', port), ('::1', port)])
         with contextlib.closing(listener):
             self.echo_server(listener)
             port = listener.getsockname()[1]
             client = socket.create_connection((target_host, port), timeout=2)
             with contextlib.closing(client):
                 client.sendall(b'foo')
                 self.assertEqual(client.recv(1024), b'foo')
开发者ID:jacob-carrier,项目名称:code,代码行数:22,代码来源:recipe-578504.py


示例16: prepare

    def prepare(self):
        """Build a connected (client socket, server protocol) pair.

        A listening socket is bound to ``127.0.0.1`` on a port from
        ``support.find_unused_port()`` and given to ``loop.create_server``;
        the client socket is connected to the listener's own address. A
        cleanup callback that closes the protocol transport (if any) and
        the server is registered with ``addCleanup``.

        Returns:
            tuple: ``(client_socket, server_protocol)``.
        """
        client = self.make_socket()
        server_proto = self.MyProto(self.loop)
        free_port = support.find_unused_port()

        listener = self.make_socket(cleanup=False)
        listener.bind(('127.0.0.1', free_port))
        serving = self.loop.create_server(lambda: server_proto, sock=listener)
        srv = self.run_loop(serving)

        connecting = self.loop.sock_connect(client, listener.getsockname())
        self.run_loop(connecting)

        def cleanup():
            transport = server_proto.transport
            # The transport stays None when the task was cancelled before
            # the connection_made callback ran.
            if transport is not None:
                transport.close()
                self.run_loop(server_proto.wait_closed())

            srv.close()
            self.run_loop(srv.wait_closed())

        self.addCleanup(cleanup)

        return client, server_proto
开发者ID:FFMG,项目名称:myoddweb.piger,代码行数:23,代码来源:test_proactor_events.py


示例17: testRudeShutdown

        def testRudeShutdown(self):
            """Wrapping a socket whose peer has gone away must fail.

            A listener thread accepts one connection and immediately closes
            both the accepted connection and the listening socket. Only then
            does the main thread try ``ssl.wrap_socket`` on its client
            socket, which is expected to raise ``IOError``.

            Raises:
                support.TestFailed: if ``ssl.wrap_socket`` succeeds.
            """

            listener_ready = threading.Event()
            listener_gone = threading.Event()
            port = support.find_unused_port()

            # `listener` runs in a thread.  It opens a socket listening on
            # PORT, and sits in an accept() until the main thread connects.
            # Then it rudely closes the sockets, and sets Event
            # `listener_gone` to let the main thread know they are gone.
            def listener():
                s = socket.socket()
                try:
                    s.bind((HOST, port))
                    s.listen(5)
                    listener_ready.set()
                    conn, _ = s.accept()
                    # Close explicitly instead of relying on reference
                    # counting to reclaim the accepted connection.
                    conn.close()
                finally:
                    s.close()
                listener_gone.set()

            def connector():
                listener_ready.wait()
                s = socket.socket()
                try:
                    s.connect((HOST, port))
                    listener_gone.wait()
                    try:
                        ssl.wrap_socket(s)
                    except IOError:
                        # Expected: the peer is already closed.
                        pass
                    else:
                        raise support.TestFailed(
                              'connecting to closed SSL socket should have failed')
                finally:
                    # Closing is harmless even if the wrap attempt already
                    # released the underlying descriptor.
                    s.close()

            t = threading.Thread(target=listener)
            t.start()
            connector()
            t.join()
开发者ID:Kanma,项目名称:Athena-Dependencies-Python,代码行数:36,代码来源:test_ssl.py


示例18: setUp

 def setUp(self):
     """Create a listening TCP server socket and pick a client source port.

     ``self.port`` is the port chosen by ``support.bind_port`` for the
     server socket; ``self.source_port`` comes from
     ``support.find_unused_port()``.
     """
     self.conn = None
     listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     self.serv = listener
     self.port = support.bind_port(listener)
     # Chosen after the server socket is bound; presumably so the two
     # ports differ -- verify against test.support.
     self.source_port = support.find_unused_port()
     listener.listen(5)
开发者ID:chidea,项目名称:GoPythonDLLWrapper,代码行数:6,代码来源:test_httplib.py


示例19: test_source_address_passive_connection

 def test_source_address_passive_connection(self):
     """Check that a data connection is bound to the client's source port.

     The port comes from ``support.find_unused_port()``, which cannot
     reserve it; if something else takes the port before the data
     connection binds, the resulting ``EADDRINUSE`` is reported as a skip
     rather than a failure. Any other ``OSError`` is re-raised.
     """
     # Local import keeps this method self-contained regardless of the
     # module's import block.
     import errno

     port = support.find_unused_port()
     self.client.source_address = (HOST, port)
     try:
         with self.client.transfercmd('list') as sock:
             self.assertEqual(sock.getsockname()[1], port)
     except OSError as e:
         if e.errno == errno.EADDRINUSE:
             self.skipTest("couldn't bind to port %d" % port)
         raise
开发者ID:pombredanne,项目名称:cpython,代码行数:5,代码来源:test_ftplib.py


示例20: test_find_unused_port

 def test_find_unused_port(self):
     """A port returned by ``support.find_unused_port()`` must be bindable.

     The socket is closed in a ``finally`` block so it is released even
     when ``bind`` raises.
     """
     port = support.find_unused_port()
     s = socket.socket()
     try:
         s.bind((support.HOST, port))
     finally:
         s.close()
开发者ID:MYSHLIFE,项目名称:cpython-1,代码行数:5,代码来源:test_support.py



注：本文中的test.support.find_unused_port函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python support.findfile函数代码示例发布时间:2022-05-27
下一篇:
Python support.create_empty_file函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap