• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python pytest.wait_for函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pytest.wait_for函数的典型用法代码示例。如果您正苦于以下问题:Python wait_for函数的具体用法?Python wait_for怎么用?Python wait_for使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了wait_for函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_close

def test_close(Poller, ipv6):
    m = Manager() + Poller()
    server = Server() + UDPServer(0)
    server.register(m)
    m.start()

    try:
        assert pytest.wait_for(server, "ready")
        wait_host(server)

        host, port = server.host, server.port

        server.fire(close())
        assert pytest.wait_for(server, "disconnected")

        server.unregister()

        def test(obj, attr):
            return attr not in obj.components
        assert pytest.wait_for(m, server, value=test)

        server = Server() + UDPServer((host, port))
        server.register(m)

        assert pytest.wait_for(server, "ready", timeout=30.0)
    finally:
        m.stop()
开发者ID:ke4roh,项目名称:circuits,代码行数:27,代码来源:test_udp.py


示例2: test_basic

def test_basic(Poller, ipv6):
    m = Manager() + Poller()

    if ipv6:
        udp_server = UDP6Server(("::1", 0))
        udp_client = UDP6Client(("::1", 0), channel="client")
    else:
        udp_server = UDPServer(0)
        udp_client = UDPClient(0, channel="client")
    server = Server() + udp_server
    client = Client() + udp_client

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(server, "ready")
        assert pytest.wait_for(client, "ready")
        wait_host(server)

        client.fire(write((server.host, server.port), b"foo"))
        assert pytest.wait_for(server, "data", b"foo")

        client.fire(close())
        assert pytest.wait_for(client, "closed")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        m.stop()
开发者ID:ke4roh,项目名称:circuits,代码行数:32,代码来源:test_udp.py


示例3: test_main

def test_main():
    id = "%s:%s" % (os.getpid(), current_thread().getName())

    m = Manager()
    assert repr(m) == "<Manager/ %s (queued=0) [S]>" % id

    app = App()
    app.register(m)
    s = repr(m)
    assert s == "<Manager/ %s (queued=1) [S]>" % id

    m.start()

    pytest.wait_for(m, "_running", True)
    sleep(0.1)

    s = repr(m)
    assert s == "<Manager/ %s (queued=0) [R]>" % id

    m.stop()

    pytest.wait_for(m, "_Manager__thread", None)

    s = repr(m)
    assert s == "<Manager/ %s (queued=0) [S]>" % id
开发者ID:AdricEpic,项目名称:circuits,代码行数:25,代码来源:test_manager_repr.py


示例4: test_tcp_lookup_failure

def test_tcp_lookup_failure(Poller, ipv6):
    m = Manager() + Poller()

    if ipv6:
        tcp_client = TCP6Client()
    else:
        tcp_client = TCPClient()
    client = Client() + tcp_client

    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(client, "ready")

        client.fire(connect("foo", 1234))
        assert pytest.wait_for(
            client, "error", lambda obj, attr: isinstance(getattr(obj, attr), SocketError))
        if pytest.PLATFORM == "win32":
            assert client.error.errno == 11004
        else:
            assert client.error.errno in (EAI_NODATA, EAI_NONAME,)
    finally:
        m.stop()
开发者ID:eriol,项目名称:circuits,代码行数:25,代码来源:test_tcp.py


示例5: test_unix

def test_unix(tmpdir, Poller):
    m = Manager() + Poller()

    sockpath = tmpdir.ensure("test.sock")
    filename = str(sockpath)

    server = Server() + UNIXServer(filename)
    client = Client() + UNIXClient()

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(server, "ready")
        assert pytest.wait_for(client, "ready")

        client.fire(connect(filename))
        assert pytest.wait_for(client, "connected")
        assert pytest.wait_for(server, "connected")
        assert pytest.wait_for(client, "data", b"Ready")

        client.fire(write(b"foo"))
        assert pytest.wait_for(server, "data", b"foo")

        client.fire(close())
        assert pytest.wait_for(client, "disconnected")
        assert pytest.wait_for(server, "disconnected")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        m.stop()
        os.remove(filename)
开发者ID:billtsay,项目名称:win-demo-opcua,代码行数:35,代码来源:test_unix.py


示例6: test_unix

def test_unix(tmpfile, Poller):
    m = Manager() + Poller()

    server = Server() + UNIXServer(tmpfile)
    client = Client() + UNIXClient()

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(server, "ready")
        assert pytest.wait_for(client, "ready")

        client.fire(connect(tmpfile))
        assert pytest.wait_for(client, "connected")
        assert pytest.wait_for(server, "connected")
        assert pytest.wait_for(client, "data", b"Ready")

        client.fire(write(b"foo"))
        assert pytest.wait_for(server, "data", b"foo")

        client.fire(close())
        assert pytest.wait_for(client, "disconnected")
        assert pytest.wait_for(server, "disconnected")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        m.stop()
开发者ID:spaceone,项目名称:circuits,代码行数:31,代码来源:test_unix.py


示例7: test

def test(worker):
    x = worker.fire(task(f))

    assert pytest.wait_for(x, "result")

    assert x.result
    assert x.value == 1000000
开发者ID:carriercomm,项目名称:circuits,代码行数:7,代码来源:test_worker_thread.py


示例8: test_args

def test_args(worker):
    x = worker.fire(task(add, 1, 2))

    assert pytest.wait_for(x, "result")

    assert x.result
    assert x.value == 3
开发者ID:carriercomm,项目名称:circuits,代码行数:7,代码来源:test_worker_thread.py


示例9: test_datetime

def test_datetime(app):
    now = datetime.now()
    d = now + timedelta(seconds=0.1)
    timer = Timer(d, test(), "timer")
    timer.register(app)
    assert pytest.wait_for(app, "flag")
    app.reset()
开发者ID:billtsay,项目名称:win-demo-opcua,代码行数:7,代码来源:test_timers.py


示例10: test_http_1_1_no_host_headers

def test_http_1_1_no_host_headers(webapp):
    transport = TCPClient()
    client = Client()
    client += transport
    client.start()

    host, port, resource, secure = parse_url(webapp.server.http.base)
    client.fire(connect(host, port))
    assert pytest.wait_for(transport, "connected")

    client.fire(write(b"GET / HTTP/1.1\r\n\r\n"))
    assert pytest.wait_for(client, "done")

    client.stop()

    s = client.buffer().decode('utf-8').split('\r\n')[0]
    assert s == "HTTP/1.1 400 Bad Request"
开发者ID:spaceone,项目名称:circuits,代码行数:17,代码来源:test_http.py


示例11: test_tcp_connect_closed_port

def test_tcp_connect_closed_port(Poller, ipv6):
    ### FIXME: This test is wrong.
    ### We need to figure out the sequence of events on Windows
    ### for this scenario. I think if you attempt to connect to
    ### a shutdown listening socket (tcp server) you should get
    ### an error event as response.

    if pytest.PLATFORM == "win32":
        pytest.skip("Broken on Windows")

    m = Manager() + Poller()
    if ipv6:
        tcp_server = TCP6Server(("::1", 0))
        tcp_client = TCP6Client()
    else:
        tcp_server = TCPServer(0)
        tcp_client = TCPClient()
    server = Server() + tcp_server
    client = Client() + tcp_client

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(client, "ready")
        assert pytest.wait_for(server, "ready")
        wait_host(server)

        host, port = server.host, server.port
        tcp_server._sock.close()

        # 1st connect
        client.fire(connect(host, port))
        assert pytest.wait_for(client, "connected")
        assert isinstance(client.error, SocketError)

        client.fire(write(b"foo"))
        assert pytest.wait_for(client, "disconnected")

        client.disconnected = False
        client.fire(write(b"foo"))
        assert pytest.wait_for(client, "disconnected", timeout=1.0) is None
    finally:
        m.stop()
开发者ID:billtsay,项目名称:win-demo-opcua,代码行数:46,代码来源:test_tcp.py


示例12: test

def test(webapp):
    transport = TCPClient()
    client = Client()
    client += transport
    client.start()

    host, port, resource, secure = parse_url(webapp.server.http.base)
    client.fire(connect(host, port))
    assert pytest.wait_for(transport, "connected")

    client.fire(write(b"GET / HTTP/1.1\r\n"))
    client.fire(write(b"Content-Type: text/plain\r\n\r\n"))
    assert pytest.wait_for(client, "done")

    client.stop()

    s = client.buffer().decode('utf-8').split('\r\n')[0]
    assert s == "HTTP/1.1 200 OK"
开发者ID:AdricEpic,项目名称:circuits,代码行数:18,代码来源:test_http.py


示例13: test_inheritence

def test_inheritence():
    app = App1()
    app.start()

    x = app.fire(test())
    assert pytest.wait_for(x, "result")
    v = x.value
    assert v == ["Hello World!", "Foobar"]

    app.stop()
开发者ID:ke4roh,项目名称:circuits,代码行数:10,代码来源:test_inheritence.py


示例14: test_override

def test_override():
    app = App2()
    app.start()

    x = app.fire(test())
    assert pytest.wait_for(x, "result")
    v = x.value
    assert v == "Foobar"

    app.stop()
开发者ID:ke4roh,项目名称:circuits,代码行数:10,代码来源:test_inheritence.py


示例15: test_tcp_bind

def test_tcp_bind(Poller, ipv6):
    m = Manager() + Poller()

    if ipv6:
        sock = socket(AF_INET6, SOCK_STREAM)
        sock.bind(("::1", 0))
        sock.listen(5)
        _, bind_port, _, _ = sock.getsockname()
        sock.close()
        server = Server() + TCP6Server(("::1", 0))
        client = Client() + TCP6Client()
    else:
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(("", 0))
        sock.listen(5)
        _, bind_port = sock.getsockname()
        sock.close()
        server = Server() + TCPServer(0)
        client = Client() + TCPClient()

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(client, "ready")
        assert pytest.wait_for(server, "ready")
        wait_host(server)

        client.fire(connect(server.host, server.port))
        assert pytest.wait_for(client, "connected")
        assert pytest.wait_for(server, "connected")
        assert pytest.wait_for(client, "data", b"Ready")

        # assert server.client[1] == bind_port

        client.fire(write(b"foo"))
        assert pytest.wait_for(server, "data", b"foo")

        client.fire(close())
        assert pytest.wait_for(client, "disconnected")
        assert pytest.wait_for(server, "disconnected")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        m.stop()
开发者ID:ke4roh,项目名称:circuits,代码行数:48,代码来源:test_tcp.py


示例16: test_tcps_basic

def test_tcps_basic(Poller, ipv6):
    from circuits import Debugger
    m = Manager() + Debugger() + Poller()

    if ipv6:
        tcp_server = TCP6Server(("::1", 0), secure=True, certfile=CERT_FILE)
        tcp_client = TCP6Client()
    else:
        tcp_server = TCPServer(0, secure=True, certfile=CERT_FILE)
        tcp_client = TCPClient()
    server = Server() + tcp_server
    client = Client() + tcp_client

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(client, "ready")
        assert pytest.wait_for(server, "ready")
        wait_host(server)

        client.fire(connect(server.host, server.port, secure=True))
        assert pytest.wait_for(client, "connected")
        assert pytest.wait_for(server, "connected")
        assert pytest.wait_for(client, "data", b"Ready")

        client.fire(write(b"foo"))
        assert pytest.wait_for(server, "data", b"foo")
        assert pytest.wait_for(client, "data", b"foo")

        client.fire(close())
        assert pytest.wait_for(client, "disconnected")
        assert pytest.wait_for(server, "disconnected")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        m.stop()
开发者ID:giuse88,项目名称:circuits,代码行数:40,代码来源:test_tcp.py


示例17: test_pipe

def test_pipe(Poller):
    m = Manager() + Poller()

    a, b = Pipe("a", "b")
    a.register(m)
    b.register(m)

    a = Client(channel=a.channel).register(m)
    b = Client(channel=b.channel).register(m)

    m.start()

    try:
        assert pytest.wait_for(a, "ready")
        assert pytest.wait_for(b, "ready")

        a.fire(write(b"foo"))
        assert pytest.wait_for(b, "data", b"foo")

        b.fire(write(b"foo"))
        assert pytest.wait_for(a, "data", b"foo")

        a.fire(close())
        assert pytest.wait_for(a, "disconnected")

        b.fire(close())
        assert pytest.wait_for(b, "disconnected")
    finally:
        m.stop()
开发者ID:spaceone,项目名称:circuits,代码行数:29,代码来源:test_pipe.py


示例18: test_tcp_basic

def test_tcp_basic(Poller, ipv6):
    m = Manager() + Poller()

    if ipv6:
        tcp_server = TCP6Server(("::1", 0))
        tcp_client = TCP6Client()
    else:
        tcp_server = TCPServer(0)
        tcp_client = TCPClient()
    server = Server() + tcp_server
    client = Client() + tcp_client

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(client, "ready")
        assert pytest.wait_for(server, "ready")
        wait_host(server)

        client.fire(connect(server.host, server.port))
        assert pytest.wait_for(client, "connected")
        assert pytest.wait_for(server, "connected")
        assert pytest.wait_for(client, "data", b"Ready")

        client.fire(write(b"foo"))
        assert pytest.wait_for(server, "data", b"foo")
        assert pytest.wait_for(client, "data", b"foo")

        client.fire(close())
        assert pytest.wait_for(client, "disconnected")
        assert pytest.wait_for(server, "disconnected")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        m.stop()
开发者ID:ke4roh,项目名称:circuits,代码行数:39,代码来源:test_tcp.py


示例19: test_tcp_connect_closed_port

def test_tcp_connect_closed_port(Poller, ipv6):

    if pytest.PLATFORM == "win32":
        pytest.skip("Broken on Windows")

    m = Manager() + Poller() + Debugger()

    if ipv6:
        tcp_server = TCP6Server(("::1", 0))
        tcp_client = TCP6Client(connect_timeout=1)
    else:
        tcp_server = TCPServer(0)
        tcp_client = TCPClient(connect_timeout=1)
    server = Server() + tcp_server
    client = Client() + tcp_client

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(client, "ready")
        assert pytest.wait_for(server, "ready")
        wait_host(server)

        host, port = server.host, server.port
        tcp_server._sock.close()

        # 1st connect
        client.fire(connect(host, port))
        waiter = WaitEvent(m, "unreachable", channel='client')
        assert waiter.wait()
    finally:
        server.unregister()
        client.unregister()
        m.stop()
开发者ID:ke4roh,项目名称:circuits,代码行数:37,代码来源:test_tcp.py


示例20: test

def test(manager, watcher):
    app = App()
    process, bridge = app.start(process=True, link=manager)
    assert watcher.wait("ready", timeout=30)

    x = manager.fire(hello())

    assert pytest.wait_for(x, "result")

    assert x.value == "Hello from {0:d}".format(app.pid)

    app.stop()
    app.join()

    bridge.unregister()
    watcher.wait("unregistered")
开发者ID:eriol,项目名称:circuits,代码行数:16,代码来源:test_bridge.py



注:本文中的pytest.wait_for函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pytest.warns函数代码示例发布时间:2022-05-27
下一篇:
Python pytest.skip函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap