• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python netutil.add_accept_handler函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tornado.netutil.add_accept_handler 函数的典型用法代码示例。如果您正苦于以下问题:Python add_accept_handler 函数的具体用法?Python add_accept_handler 怎么用?Python add_accept_handler 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 add_accept_handler 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_chunked_close

    def test_chunked_close(self):
        # test case in which chunks spread read-callback processing
        # over several ioloop iterations, but the connection is already closed.
        port = get_unused_port()
        (sock,) = netutil.bind_sockets(port, address="127.0.0.1")
        def accept_callback(conn, address):
            # fake an HTTP server using chunked encoding where the final chunks
            # and connection close all happen at once
            stream = IOStream(conn, io_loop=self.io_loop)
            stream.write(b("""\
HTTP/1.1 200 OK
Transfer-Encoding: chunked

1
1
1
2
0

""").replace(b("\n"), b("\r\n")), callback=stream.close)
        netutil.add_accept_handler(sock, accept_callback, self.io_loop)
        self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
        resp = self.wait()
        resp.rethrow()
        self.assertEqual(resp.body, b("12"))
开发者ID:aifreedom,项目名称:tornado,代码行数:25,代码来源:httpclient_test.py


示例2: test_multi_line_headers

    def test_multi_line_headers(self):
        # Multi-line http headers are rare but rfc-allowed
        # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
        #
        # Canned raw response: the X-XSS-Protection value is folded onto a
        # second line that starts with a tab.
        folded_response = b"\r\n".join(
            [
                b"HTTP/1.1 200 OK",
                b"X-XSS-Protection: 1;",
                b"\tmode=block",
                b"",
                b"",
            ]
        )
        listener, port = bind_unused_port()
        with closing(listener):

            @gen.coroutine
            def accept_callback(conn, address):
                # Read the request head, then answer with the canned bytes.
                server_stream = IOStream(conn)
                request_head = yield server_stream.read_until(b"\r\n\r\n")
                if b"HTTP/1." not in request_head:
                    self.skipTest("requires HTTP/1.x")
                yield server_stream.write(folded_response)
                server_stream.close()

            netutil.add_accept_handler(listener, accept_callback)  # type: ignore
            response = self.fetch("http://127.0.0.1:%d/" % port)
            response.rethrow()
            # The folded header must come back joined with a single space.
            self.assertEqual(
                response.headers["X-XSS-Protection"], "1; mode=block"
            )
            self.io_loop.remove_handler(listener.fileno())
开发者ID:rgbkrk,项目名称:tornado,代码行数:29,代码来源:httpclient_test.py


示例3: test_chunked_close

    def test_chunked_close(self):
        # test case in which chunks spread read-callback processing
        # over several ioloop iterations, but the connection is already closed.
        #
        # Canned raw response: two one-byte chunks ("1" and "2") followed by
        # the terminating zero-length chunk.
        chunked_response = b"\r\n".join([
            b"HTTP/1.1 200 OK",
            b"Transfer-Encoding: chunked",
            b"",
            b"1",
            b"1",
            b"1",
            b"2",
            b"0",
            b"",
            b"",
        ])
        listener, port = bind_unused_port()
        with closing(listener):
            @gen.coroutine
            def accept_callback(conn, address):
                # fake an HTTP server using chunked encoding where the final
                # chunks and connection close all happen at once
                server_stream = IOStream(conn)
                request_head = yield server_stream.read_until(b"\r\n\r\n")
                if b"HTTP/1." not in request_head:
                    self.skipTest("requires HTTP/1.x")
                yield server_stream.write(chunked_response)
                server_stream.close()

            netutil.add_accept_handler(listener, accept_callback)
            response = self.fetch("http://127.0.0.1:%d/" % port)
            response.rethrow()
            self.assertEqual(response.body, b"12")
            self.io_loop.remove_handler(listener.fileno())
开发者ID:dkdenza,项目名称:tornado,代码行数:30,代码来源:httpclient_test.py


示例4: make_iostream_pair

    def make_iostream_pair(self, **kwargs):
        """Return ``[server_stream, client_stream]`` connected over loopback.

        ``kwargs`` are forwarded unchanged to ``_make_server_iostream`` and
        ``_make_client_iostream``.  The temporary listening socket is
        unregistered from the IOLoop and closed before returning.
        """
        listener, port = bind_unused_port()
        pair = [None, None]

        def on_accept(connection, address):
            server_stream = self._make_server_iostream(connection, **kwargs)
            pair[0] = server_stream
            if isinstance(server_stream, SSLIOStream):
                # HACK: The SSL handshake won't complete (and therefore the
                # client connect callback won't be run) until the server
                # side has tried to do something with the connection.  For
                # these tests we want both sides to connect before we do
                # anything else with the connection, so we must cause some
                # dummy activity on the server.  If this turns out to be
                # useful for real apps it should have a cleaner interface.
                server_stream._add_io_state(IOLoop.READ)
            self.stop()

        def on_connect():
            pair[1] = client_stream
            self.stop()

        def both_sides_ready():
            return all(pair)

        netutil.add_accept_handler(listener, on_accept, io_loop=self.io_loop)
        client_stream = self._make_client_iostream(socket.socket(), **kwargs)
        client_stream.connect(('127.0.0.1', port), callback=on_connect)
        # Each callback calls stop(); keep waiting until both slots are set.
        self.wait(condition=both_sides_ready)
        self.io_loop.remove_handler(listener.fileno())
        listener.close()
        return pair
开发者ID:RyanWarm,项目名称:ZUtils,代码行数:30,代码来源:iostream_test.py


示例5: test_multi_line_headers

    def test_multi_line_headers(self):
        # Multi-line http headers are rare but rfc-allowed
        # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
        #
        # Canned raw response: the X-XSS-Protection value is folded onto a
        # second line that starts with a tab.
        folded_response = b"\r\n".join([
            b"HTTP/1.1 200 OK",
            b"X-XSS-Protection: 1;",
            b"\tmode=block",
            b"",
            b"",
        ])
        listener, port = bind_unused_port()
        with closing(listener):
            def accept_callback(conn, address):
                server_stream = IOStream(conn, io_loop=self.io_loop)

                def on_request_head(request_data):
                    # Reply once the full request head has arrived, closing
                    # the stream as soon as the write completes.
                    if b"HTTP/1." not in request_data:
                        self.skipTest("requires HTTP/1.x")
                    server_stream.write(folded_response,
                                        callback=server_stream.close)

                server_stream.read_until(b"\r\n\r\n", on_request_head)

            netutil.add_accept_handler(listener, accept_callback, self.io_loop)
            self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
            response = self.wait()
            response.rethrow()
            # The folded header must come back joined with a single space.
            self.assertEqual(response.headers['X-XSS-Protection'],
                             "1; mode=block")
            self.io_loop.remove_handler(listener.fileno())
开发者ID:437049211,项目名称:PyQYT,代码行数:25,代码来源:httpclient_test.py


示例6: test_100_continue

    def test_100_continue(self):
        # testing if httpclient is able to skip 100 continue responses.
        # to test without httpserver implementation, using
        # raw response as same as httpclient_test.test_chunked_close.
        port = get_unused_port()
        (sock,) = netutil.bind_sockets(port, address="127.0.0.1")
        with closing(sock):
            def write_response(stream, request_data):
                stream.write(b("""\
HTTP/1.1 100 Continue

HTTP/1.1 200 OK
Content-Length: 6

hjkl
""").replace(b("\n"), b("\r\n")), callback=stream.close)
            def accept_callback(conn, address):
                # fake an HTTP server using chunked encoding where the final chunks
                # and connection close all happen at once
                stream = IOStream(conn, io_loop=self.io_loop)
                stream.read_until(b("\r\n\r\n"),
                                  functools.partial(write_response, stream))
            netutil.add_accept_handler(sock, accept_callback, self.io_loop)
            self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop,
                                   headers={"Expect": "100-continue"})
            resp = self.wait()
            resp.rethrow()
            self.assertEqual(resp.body, b("hjkl\r\n"))
开发者ID:yyuu,项目名称:tornado,代码行数:28,代码来源:simple_httpclient_test.py


示例7: test_chunked_close

    def test_chunked_close(self):
        # test case in which chunks spread read-callback processing
        # over several ioloop iterations, but the connection is already closed.
        sock, port = bind_unused_port()
        with closing(sock):
            def write_response(stream, request_data):
                stream.write(b("""\
HTTP/1.1 200 OK
Transfer-Encoding: chunked

1
1
1
2
0

""").replace(b("\n"), b("\r\n")), callback=stream.close)

            def accept_callback(conn, address):
                # fake an HTTP server using chunked encoding where the final chunks
                # and connection close all happen at once
                stream = IOStream(conn, io_loop=self.io_loop)
                stream.read_until(b("\r\n\r\n"),
                                  functools.partial(write_response, stream))
            netutil.add_accept_handler(sock, accept_callback, self.io_loop)
            self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
            resp = self.wait()
            resp.rethrow()
            self.assertEqual(resp.body, b("12"))
            self.io_loop.remove_handler(sock.fileno())
开发者ID:CNCBASHER,项目名称:tornado,代码行数:30,代码来源:httpclient_test.py


示例8: add_sockets

    def add_sockets(self, sockets, callback=None):
        r"""Start accepting connections on every socket in ``sockets``.

        ``sockets`` is a list of socket objects such as those returned by
        `tornado.netutil.bind_sockets`.  ``callback`` is handed to
        ``netutil.add_accept_handler`` for each socket.  When the server has
        no IOLoop yet, the current one is adopted first.
        """
        if self.io_loop is None:
            self.io_loop = ioloop.IOLoop.current()
        for listener in sockets:
            # Remember the socket under its file descriptor.
            fd = listener.fileno()
            self._sockets[fd] = listener
            netutil.add_accept_handler(
                listener, callback, io_loop=self.io_loop)
开发者ID:denself,项目名称:queclink_handler,代码行数:12,代码来源:server.py


示例9: add_sockets

    def add_sockets(self, sockets):
        """Start accepting connections on every socket in ``sockets``.

        ``sockets`` is a list of socket objects such as those returned by
        `~tornado.netutil.bind_sockets`.  `add_sockets` is typically paired
        with that function and `tornado.process.fork_processes` when finer
        control over multi-process server start-up is needed.
        """
        for listener in sockets:
            # Remember the socket under its file descriptor, then route its
            # incoming connections to ``_handle_connection``.
            fd = listener.fileno()
            self._sockets[fd] = listener
            add_accept_handler(listener, self._handle_connection)
开发者ID:ajdavis,项目名称:tornado,代码行数:12,代码来源:tcpserver.py


示例10: __init__

    def __init__(self, usocket, starting_point, allow_design):
        """Load the installed plugins and start watching a Unix socket.

        @param usocket Unix socket to watch; passed to
            ``netutil.bind_unix_socket`` (presumably a filesystem path,
            to be confirmed against the caller)
        @param starting_point The starting point of the plugins
        @param allow_design Stored as ``_allow_design``; not interpreted here

        """
        self.queue = sundaytasks.utils.get_plugins()
        self.extensions = sundaytasks.utils.get_extensions()
        self.starting_point = starting_point
        self.instance = IOLoop.instance()
        self._allow_design = allow_design
        # Bind the socket and let ``self.accept`` handle each new connection.
        listener = netutil.bind_unix_socket(usocket)
        netutil.add_accept_handler(listener, self.accept)
开发者ID:olafura,项目名称:sundaytasks-py,代码行数:13,代码来源:main.py


示例11: make_iostream_pair

    def make_iostream_pair(self, **kwargs):
        """Produce a connected ``(server_stream, client_stream)`` pair.

        Written as a generator that finishes with ``gen.Return``; the
        coroutine decorator, if any, is outside this view.  ``kwargs`` are
        forwarded to ``_make_server_iostream`` and ``_make_client_iostream``.
        The temporary listener is unregistered and closed before returning.
        """
        listener, port = bind_unused_port()
        accepted = Future()

        def on_accept(connection, address):
            server_side = self._make_server_iostream(connection, **kwargs)
            accepted.set_result(server_side)

        netutil.add_accept_handler(listener, on_accept)
        client_side = self._make_client_iostream(socket.socket(), **kwargs)
        connected = client_side.connect(('127.0.0.1', port))
        # Wait for the accept and the connect together.
        results = yield [accepted, connected]
        server_stream, client_stream = results
        self.io_loop.remove_handler(listener.fileno())
        listener.close()
        raise gen.Return((server_stream, client_stream))
开发者ID:leeclemens,项目名称:tornado,代码行数:14,代码来源:iostream_test.py


示例12: setUp

 def setUp(self):
     # Open a listener, connect a client stream to it, and wait first for
     # the connect and then for ``self.server_accepted`` to complete.
     # Any failure is printed and then re-raised unchanged.
     try:
         super(TestIOStreamStartTLS, self).setUp()
         self.listener, self.port = bind_unused_port()
         self.server_stream = None
         self.server_accepted = Future()
         netutil.add_accept_handler(self.listener, self.accept)
         self.client_stream = IOStream(socket.socket())
         connecting = self.client_stream.connect(("127.0.0.1", self.port))
         self.io_loop.add_future(connecting, self.stop)
         self.wait()
         self.io_loop.add_future(self.server_accepted, self.stop)
         self.wait()
     except Exception as exc:
         print(exc)
         raise
开发者ID:tomjpsun,项目名称:Blend4Web,代码行数:15,代码来源:iostream_test.py


示例13: asyncSetUp

    def asyncSetUp(self):
        # Build the client/source stream pair, wrap the source side in a
        # socks-mode LayerContext and SocksLayer, then open a listener that
        # stands in for the destination server.
        stream_pair = yield self.create_iostream_pair()
        self.client_stream, self.src_stream = stream_pair

        self.context = LayerContext(mode="socks", src_stream=self.src_stream)
        self.layer = SocksLayer(self.context)

        self.listener, self.port = bind_unused_port()

        def dest_accept_callback(conn, addr):
            # Keep the accepted stream on the test case and close it during
            # cleanup.
            accepted = MicroProxyIOStream(conn)
            self.dest_server_stream = accepted
            self.addCleanup(accepted.close)

        add_accept_handler(self.listener, dest_accept_callback)
        self.addCleanup(self.listener.close)
开发者ID:mike820324,项目名称:microProxy,代码行数:15,代码来源:test_socks.py


示例14: asyncSetUp

    def asyncSetUp(self):
        # Connect ``self.client_stream`` to ``self.server_stream`` through a
        # temporary loopback listener; both streams are closed on cleanup.
        listener, port = bind_unused_port()
        accepted = Event()

        def on_accept(conn, addr):
            server_side = IOStream(conn)
            self.server_stream = server_side
            self.addCleanup(server_side.close)
            accepted.set()

        add_accept_handler(listener, on_accept)
        client_side = IOStream(socket.socket())
        self.client_stream = client_side
        self.addCleanup(client_side.close)
        # Wait for the client connect and the server accept together.
        pending = [client_side.connect(('127.0.0.1', port)), accepted.wait()]
        yield pending
        self.io_loop.remove_handler(listener)
        listener.close()
开发者ID:Agnewee,项目名称:tornado,代码行数:16,代码来源:http1connection_test.py


示例15: make_iostream_pair

 def make_iostream_pair(self):
     """Return ``[server_stream, client_stream]`` connected over loopback.

     A temporary IPv4 listener is bound on 127.0.0.1, a client stream
     connects to it, and the loop runs until both callbacks have filled
     their slot.  The listener is unregistered from the IOLoop and closed
     before returning, whether or not the wait succeeds.
     """
     port = get_unused_port()
     [listener] = netutil.bind_sockets(port, '127.0.0.1',
                                       family=socket.AF_INET)
     streams = [None, None]

     def accept_callback(connection, address):
         streams[0] = IOStream(connection, io_loop=self.io_loop)
         self.stop()

     def connect_callback():
         streams[1] = client_stream
         self.stop()

     netutil.add_accept_handler(listener, accept_callback,
                                io_loop=self.io_loop)
     try:
         client_stream = IOStream(socket.socket(), io_loop=self.io_loop)
         client_stream.connect(('127.0.0.1', port),
                               callback=connect_callback)
         # Each callback calls stop(); keep waiting until both are set.
         self.wait(condition=lambda: all(streams))
     finally:
         # The listener is only needed for the handshake; release it so the
         # socket and its IOLoop registration do not leak.
         self.io_loop.remove_handler(listener.fileno())
         listener.close()
     return streams
开发者ID:GALI472,项目名称:zhidaobot,代码行数:18,代码来源:iostream_test.py


示例16: start

    def start(self):
        """Open the connection-check listening socket of the Pytest engine.

        When the configured role is ``minion`` the generator first yields
        ``listen_to_minion_connected_event()``.  It then binds a
        non-blocking TCP socket on ``localhost`` at the port taken from
        ``opts['runtests_conn_check_port']`` and registers
        ``handle_connection`` as its accept handler on ``self.io_loop``.
        """
        if self.opts['__role'] == 'minion':
            yield self.listen_to_minion_connected_event()

        listen_port = int(self.opts['runtests_conn_check_port'])
        log.info(
            'Starting Pytest Engine(role=%s) on port %s',
            self.opts['__role'],
            listen_port,
        )
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.setblocking(0)
        # Bind to localhost on the configured port and start listening.
        listen_address = ('localhost', listen_port)
        self.sock.bind(listen_address)
        self.sock.listen(5)
        netutil.add_accept_handler(
            self.sock, self.handle_connection, io_loop=self.io_loop)
开发者ID:bryson,项目名称:salt,代码行数:18,代码来源:runtests_engine.py


示例17: make_iostream_pair

    def make_iostream_pair(self, **kwargs):
        """Return ``[server_stream, client_stream]`` connected over loopback.

        ``kwargs`` are forwarded unchanged to ``_make_server_iostream`` and
        ``_make_client_iostream``.  The temporary listening socket is
        unregistered from the IOLoop and closed before returning.
        """
        listener, port = bind_unused_port()
        pair = [None, None]

        def on_accept(connection, address):
            pair[0] = self._make_server_iostream(connection, **kwargs)
            self.stop()

        def on_connect():
            pair[1] = client_stream
            self.stop()

        def both_sides_ready():
            return all(pair)

        netutil.add_accept_handler(
            listener, on_accept, io_loop=self.io_loop)
        client_stream = self._make_client_iostream(socket.socket(), **kwargs)
        client_stream.connect(("127.0.0.1", port), callback=on_connect)
        # Each callback calls stop(); keep waiting until both slots are set.
        self.wait(condition=both_sides_ready)
        self.io_loop.remove_handler(listener.fileno())
        listener.close()
        return pair
开发者ID:tomjpsun,项目名称:Blend4Web,代码行数:19,代码来源:iostream_test.py


示例18: create_iostream_pair

    def create_iostream_pair(self):
        """Produce a connected ``(client_stream, server_stream)`` pair.

        Written as a generator that finishes with ``Return``; the coroutine
        decorator, if any, is outside this view.  No cleanup is registered
        for either stream here; both are handed back to the caller.
        """
        accepted = Event()
        accepted_streams = []

        def on_accept(conn, addr):
            accepted_streams.append(MicroProxyIOStream(conn))
            accepted.set()

        listener, port = bind_unused_port()
        add_accept_handler(listener, on_accept)
        client_stream = MicroProxyIOStream(socket.socket())
        # Wait for the client connect and the server accept together.
        pending = [client_stream.connect(('127.0.0.1', port)), accepted.wait()]
        yield pending
        self.io_loop.remove_handler(listener)
        listener.close()

        server_stream = accepted_streams[0]
        raise Return((client_stream, server_stream))
开发者ID:mike820324,项目名称:microProxy,代码行数:19,代码来源:utils.py


示例19: add_sockets

    def add_sockets(self, sockets):
        """Start accepting connections on every socket in ``sockets``.

        ``sockets`` is a list of socket objects such as those returned by
        `~tornado.netutil.bind_sockets`.  `add_sockets` is typically paired
        with that function and `tornado.process.fork_processes` when finer
        control over multi-process server start-up is needed.
        """
        if self.io_loop is None:
            # No loop was supplied, so fall back to the current IOLoop.
            self.io_loop = IOLoop.current()

        for listener in sockets:
            # fileno() returns the file descriptor, an integer; the socket
            # is recorded under it in the table of watched sockets.
            fd = listener.fileno()
            self._sockets[fd] = listener

            # The watched sockets come in different kinds; this one only
            # accepts new connections, hence add_accept_handler.
            # TODO: study how IOLoop tells the two kinds of socket apart and
            # how it actually watches them.
            add_accept_handler(
                listener, self._handle_connection, io_loop=self.io_loop)
开发者ID:ColorFuzzy,项目名称:tornado_code,代码行数:20,代码来源:tcpserver.py


示例20: bind_sockets

else:
    log.setLevel(WARNING)


# Module-level start-up: obtain the listening sockets, register
# handle_connection as their accept callback, start the HTTP server and
# run the IOLoop.
ioloop = IOLoop.instance()

# A set LISTEN_PID environment variable is taken to mean that the socket is
# supplied by systemd (see the log message); otherwise sockets are bound here.
if os.getenv('LISTEN_PID'):
    log.info('Getting socket from systemd')
    # Wrap an inherited file descriptor in a socket object.
    # NOTE(review): SYSTEMD_SOCKET_FD is defined outside this view --
    # presumably the first descriptor handed over by systemd; confirm.
    sck = socket.fromfd(
        SYSTEMD_SOCKET_FD + 1,  # Second socket in .socket file
        socket.AF_INET6 if socket.has_ipv6 else socket.AF_INET,
        socket.SOCK_STREAM)
    # Non-blocking mode, then listen with a backlog of 128.
    sck.setblocking(0)
    sck.listen(128)
    sockets = [sck]
else:
    log.info('Binding sockets')
    sockets = bind_sockets(options.socket_port)

# Route incoming connections on every socket to handle_connection.
log.info('Accepting')
for sck in sockets:
    add_accept_handler(sck, handle_connection, ioloop)


# The HTTP server listens separately on options.server_port.
log.info('Listening')
http_server = SystemdHTTPServer(server)
http_server.listen(options.server_port)

log.info('Starting loop')
ioloop.start()
开发者ID:JacekPliszka,项目名称:wdb,代码行数:30,代码来源:wdb.server.py



注:本文中的tornado.netutil.add_accept_handler函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python netutil.bind_sockets函数代码示例发布时间:2022-05-27
下一篇:
Python gen_log.warning函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap