• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.asyncio_patch函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tests.utils.asyncio_patch 函数的典型用法代码示例。如果您正苦于以下问题:Python asyncio_patch 函数的具体用法?Python asyncio_patch 怎么用?Python asyncio_patch 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 asyncio_patch 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_reload

def test_reload(loop, vm, async_run):
    """
    Reloading a started TraceNG VM terminates the mocked process and the
    VM reports itself as running again afterwards.
    """
    # wait() hands back an already resolved future, so waiting for the
    # process kill completes immediately
    terminated = asyncio.Future()
    terminated.set_result(True)
    process = MagicMock()
    process.returncode = None
    process.wait.return_value = terminated

    requirements = "gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements"
    nio_settings = {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}}

    vm.ip_address = "192.168.1.1"
    # NOTE(review): return_value= configures the MagicMock that replaces
    # sys.platform; it does not set sys.platform to the string "win" --
    # confirm this is what the test intends
    with patch("sys.platform", return_value="win"), \
            asyncio_patch(requirements, return_value=True), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
        udp_nio = TraceNG.instance().create_nio(nio_settings)
        async_run(vm.port_add_nio_binding(0, udp_nio))

        vm._ubridge_send = AsyncioMagicMock()
        async_run(vm.start("192.168.1.2"))
        assert vm.is_running()

        with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
            async_run(vm.reload())
        assert vm.is_running() is True

        # A platform-dependent check (send_signal(1) on Windows) used to
        # live here but is disabled; terminate() is asserted unconditionally
        process.terminate.assert_called_with()
开发者ID:GNS3,项目名称:gns3-server,代码行数:28,代码来源:test_traceng_vm.py


示例2: test_look_for_interface

def test_look_for_interface(gns3vm, async_run):
    """
    Check _look_for_interface() against canned "showvminfo" output: it
    yields 2 for "nat" (listed as nic2 below) and -1 for a network type
    that no adapter uses.
    """
    execute_target = "gns3server.controller.gns3vm.virtualbox_gns3_vm.VirtualBoxGNS3VM._execute"
    # Output handed back by the patched _execute()
    vbox_output = """
nic1="hostonly"
nictype1="82540EM"
nicspeed1="0"
nic2="nat"
nictype2="82540EM"
nicspeed2="0"
nic3="none"
nic4="none"
nic5="none"
nic6="none"
nic7="none"
nic8="none"
vcpwidth=1024
vcpheight=768
vcprate=512
vcpfps=25
GuestMemoryBalloon=0
    """

    with asyncio_patch(execute_target, return_value=vbox_output) as execute_mock:
        nat_index = async_run(gns3vm._look_for_interface("nat"))
    execute_mock.assert_called_with('showvminfo', ['GNS3 VM', '--machinereadable'])
    assert nat_index == 2

    with asyncio_patch(execute_target, return_value=vbox_output):
        missing_index = async_run(gns3vm._look_for_interface("dummy"))
    assert missing_index == -1
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:29,代码来源:test_virtualbox_gns3_vm.py


示例3: test_reload

def test_reload(loop, vm, async_run):
    """
    Reloading a started VPCS VM stops the mocked process (send_signal(1)
    on Windows, terminate() elsewhere) and the VM is running afterwards.
    """
    # wait() hands back an already resolved future, so waiting for the
    # process kill completes immediately
    killed = asyncio.Future()
    killed.set_result(True)
    process = MagicMock()
    process.returncode = None
    process.wait.return_value = killed

    requirements = "gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements"
    wrap_console = "gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"
    nio_settings = {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}

    with asyncio_patch(requirements, return_value=True), \
            asyncio_patch(wrap_console), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
        udp_nio = VPCS.instance().create_nio(nio_settings)
        async_run(vm.port_add_nio_binding(0, udp_nio))
        async_run(vm.start())
        assert vm.is_running()

        with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
            async_run(vm.reload())
        assert vm.is_running() is True

        on_windows = sys.platform.startswith("win")
        if not on_windows:
            process.terminate.assert_called_with()
        else:
            process.send_signal.assert_called_with(1)
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:25,代码来源:test_vpcs_vm.py


示例4: test_pull_image

def test_pull_image(loop):
    """
    When Docker.query fails with a 404, pull_image() must issue a POST to
    "images/create" through Docker.http_query.
    """
    class Response:
        """
        Simulate a response split across multiple packets
        """

        def __init__(self):
            # Index of the last packet served; -1 until read() is called
            self._read = -1

        async def read(self, size):
            """
            Return the next packet as bytes, or None once both are served.
            """
            self._read += 1
            if self._read == 0:
                return b'{"progress": "0/100",'
            if self._read == 1:
                # bytes like the first packet: the chunks of one body must
                # not mix bytes and str
                return b'"id": 42}'
            return None

    mock_query = MagicMock()
    # NOTE(review): this sets the return value of content() rather than of
    # content.read() -- confirm which one Docker.pull_image() really uses
    mock_query.content.return_value = Response()

    with asyncio_patch("gns3server.compute.docker.Docker.query", side_effect=DockerHttp404Error("404")):
        with asyncio_patch("gns3server.compute.docker.Docker.http_query", return_value=mock_query) as mock:
            loop.run_until_complete(asyncio.ensure_future(Docker.instance().pull_image("ubuntu")))
            mock.assert_called_with("POST", "images/create", params={"fromImage": "ubuntu"}, timeout=None)
开发者ID:GNS3,项目名称:gns3-server,代码行数:25,代码来源:test_docker.py


示例5: test_start

def test_start(loop, vm, async_run):
    """
    Starting a VPCS VM must spawn vpcs with the expected command line,
    leave the VM running and push a "node.updated" notification.
    """
    process = MagicMock()
    process.returncode = None

    with NotificationManager.instance().queue() as queue:
        async_run(queue.get(0))  # Ping

        with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
            with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
                with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"):
                    nio = VPCS.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
                    async_run(vm.port_add_nio_binding(0, nio))
                    # asyncio.async() no longer parses ("async" is a keyword
                    # since Python 3.7); ensure_future() is its replacement
                    loop.run_until_complete(asyncio.ensure_future(vm.start()))
                    # Port values depend on the fixtures, hence ANY
                    assert mock_exec.call_args[0] == (vm._vpcs_path(),
                                                      '-p',
                                                      str(vm._internal_console_port),
                                                      '-m', '1',
                                                      '-i',
                                                      '1',
                                                      '-F',
                                                      '-R',
                                                      '-s',
                                                      ANY,
                                                      '-c',
                                                      ANY,
                                                      '-t',
                                                      '127.0.0.1')
                assert vm.is_running()
                assert vm.command_line == ' '.join(mock_exec.call_args[0])
        (action, event, kwargs) = async_run(queue.get(0))
        assert action == "node.updated"
        assert event == vm
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:32,代码来源:test_vpcs_vm.py


示例6: test_start

def test_start(loop, vm, async_run):
    """
    Starting a TraceNG VM spawns traceng with the expected command line,
    leaves the VM running and pushes a "node.updated" notification.
    """
    process = MagicMock()
    process.returncode = None
    requirements = "gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements"

    with NotificationManager.instance().queue() as notifications:
        async_run(notifications.get(1))  # Ping

        vm.ip_address = "192.168.1.1"
        # NOTE(review): return_value= configures the MagicMock that replaces
        # sys.platform; it does not set sys.platform to the string "win" --
        # confirm this is what the test intends
        with patch("sys.platform", return_value="win"), \
                asyncio_patch(requirements, return_value=True), \
                asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as exec_mock:
            start_task = asyncio.ensure_future(vm.start("192.168.1.2"))
            loop.run_until_complete(start_task)

            spawn_args = exec_mock.call_args[0]
            expected_args = (
                vm._traceng_path(),
                '-u',
                '-c', ANY,
                '-v', ANY,
                '-b', '127.0.0.1',
                '-s', 'ICMP',
                '-f', '192.168.1.1',
                '192.168.1.2',
            )
            assert spawn_args == expected_args
            assert vm.is_running()
            assert vm.command_line == ' '.join(exec_mock.call_args[0])
        action, event, kwargs = async_run(notifications.get(1))
        assert action == "node.updated"
        assert event == vm
开发者ID:GNS3,项目名称:gns3-server,代码行数:30,代码来源:test_traceng_vm.py


示例7: test_stop

def test_stop(loop, vm, async_run):
    """
    Stopping a started VPCS VM must stop the mocked process (send_signal(1)
    on Windows, terminate() elsewhere), leave the VM not running and make
    the third notification read from the queue a "node.updated" one.
    """
    process = MagicMock()

    # Wait process kill success
    future = asyncio.Future()
    future.set_result(True)
    process.wait.return_value = future
    process.returncode = None

    with NotificationManager.instance().queue() as queue:
        with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
            with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"):
                with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
                    nio = VPCS.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
                    async_run(vm.port_add_nio_binding(0, nio))

                    async_run(vm.start())
                    assert vm.is_running()

                    with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
                        # asyncio.async() no longer parses ("async" is a
                        # keyword since Python 3.7); use ensure_future()
                        loop.run_until_complete(asyncio.ensure_future(vm.stop()))
                    assert vm.is_running() is False

                    if sys.platform.startswith("win"):
                        process.send_signal.assert_called_with(1)
                    else:
                        process.terminate.assert_called_with()

                    async_run(queue.get(0))  #  Ping
                    async_run(queue.get(0))  #  Started

                    (action, event, kwargs) = async_run(queue.get(0))
                    assert action == "node.updated"
                    assert event == vm
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:34,代码来源:test_vpcs_vm.py


示例8: test_start

def test_start(loop, vm):
    """
    Starting a VPCS VM must spawn vpcs with the expected command line,
    leave the VM running and queue a "vm.started" event on the project.
    """
    process = MagicMock()
    process.returncode = None
    # Subscribe before starting so the event emitted by start() is captured
    queue = vm.project.get_listen_queue()

    with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
        with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
            nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
            vm.port_add_nio_binding(0, nio)
            # asyncio.async() no longer parses ("async" is a keyword since
            # Python 3.7); ensure_future() is its replacement
            loop.run_until_complete(asyncio.ensure_future(vm.start()))
            assert mock_exec.call_args[0] == (vm.vpcs_path,
                                              '-p',
                                              str(vm.console),
                                              '-m', '1',
                                              '-i',
                                              '1',
                                              '-F',
                                              '-R',
                                              '-s',
                                              '4242',
                                              '-c',
                                              '4243',
                                              '-t',
                                              '127.0.0.1')
            assert vm.is_running()
            assert vm.command_line == ' '.join(mock_exec.call_args[0])
    (action, event) = queue.get_nowait()
    assert action == "vm.started"
    assert event == vm
开发者ID:ravirajsdeshmukh,项目名称:gns3-server,代码行数:29,代码来源:test_vpcs_vm.py


示例9: test_start_0_6_1

def test_start_0_6_1(loop, vm, async_run):
    """
    VPCS 0.6.1 has no -R option, so it must not appear on the command
    line. The option is not required there because GNS3 provides a patch
    for it.
    """
    process = MagicMock()
    process.returncode = None
    vm._vpcs_version = parse_version("0.6.1")

    requirements = "gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements"
    wrap_console = "gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"
    nio_settings = {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}

    with asyncio_patch(requirements, return_value=True), \
            asyncio_patch(wrap_console), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as exec_mock:
        udp_nio = VPCS.instance().create_nio(nio_settings)
        async_run(vm.port_add_nio_binding(0, udp_nio))
        async_run(vm.start())

        spawn_args = exec_mock.call_args[0]
        # Same as the regular command line, minus '-R'
        expected_args = (
            vm._vpcs_path(),
            '-p', str(vm._internal_console_port),
            '-m', '1',
            '-i', '1',
            '-F',
            '-s', ANY,
            '-c', ANY,
            '-t', '127.0.0.1',
        )
        assert spawn_args == expected_args
        assert vm.is_running()
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:29,代码来源:test_vpcs_vm.py


示例10: test_stop

def test_stop(loop, vm):
    """
    Stopping a started VPCS VM must stop the mocked process (send_signal(1)
    on Windows, terminate() elsewhere), leave the VM not running and queue
    a "vm.stopped" event on the project.
    """
    process = MagicMock()

    # Wait process kill success
    future = asyncio.Future()
    future.set_result(True)
    process.wait.return_value = future
    process.returncode = None

    with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
        with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
            nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
            vm.port_add_nio_binding(0, nio)

            # asyncio.async() no longer parses ("async" is a keyword since
            # Python 3.7); ensure_future() is its replacement
            loop.run_until_complete(asyncio.ensure_future(vm.start()))
            assert vm.is_running()

            # Subscribe only after start() so the first event read below is
            # the one emitted by stop()
            queue = vm.project.get_listen_queue()

            with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
                loop.run_until_complete(asyncio.ensure_future(vm.stop()))
            assert vm.is_running() is False

            if sys.platform.startswith("win"):
                process.send_signal.assert_called_with(1)
            else:
                process.terminate.assert_called_with()

            (action, event) = queue.get_nowait()
            assert action == "vm.stopped"
            assert event == vm
开发者ID:ravirajsdeshmukh,项目名称:gns3-server,代码行数:31,代码来源:test_vpcs_vm.py


示例11: test_start_0_6_1

def test_start_0_6_1(loop, vm):
    """
    Version 0.6.1 doesn't have the -R option, so it must be absent from
    the command line. It is not required because GNS3 provides a patch
    for this.
    """
    process = MagicMock()
    process.returncode = None
    # Subscribe before starting so the event emitted by start() is captured
    queue = vm.project.get_listen_queue()
    vm._vpcs_version = parse_version("0.6.1")

    with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
        with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
            nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
            vm.port_add_nio_binding(0, nio)
            # asyncio.async() no longer parses ("async" is a keyword since
            # Python 3.7); ensure_future() is its replacement
            loop.run_until_complete(asyncio.ensure_future(vm.start()))
            assert mock_exec.call_args[0] == (vm.vpcs_path,
                                              '-p',
                                              str(vm.console),
                                              '-m', '1',
                                              '-i',
                                              '1',
                                              '-F',
                                              '-s',
                                              '4242',
                                              '-c',
                                              '4243',
                                              '-t',
                                              '127.0.0.1')
            assert vm.is_running()
    (action, event) = queue.get_nowait()
    assert action == "vm.started"
    assert event == vm
开发者ID:ravirajsdeshmukh,项目名称:gns3-server,代码行数:32,代码来源:test_vpcs_vm.py


示例12: test_stop

def test_stop(loop, vm, async_run):
    """
    Stopping a started TraceNG VM terminates the mocked process, leaves
    the VM not running, and the third notification read from the queue is
    a "node.updated" one.
    """
    # wait() hands back an already resolved future, so waiting for the
    # process kill completes immediately
    terminated = asyncio.Future()
    terminated.set_result(True)
    process = MagicMock()
    process.returncode = None
    process.wait.return_value = terminated

    requirements = "gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements"
    nio_settings = {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}}

    vm.ip_address = "192.168.1.1"
    with NotificationManager.instance().queue() as notifications:
        # NOTE(review): return_value= configures the MagicMock that replaces
        # sys.platform; it does not set sys.platform to the string "win" --
        # confirm this is what the test intends
        with patch("sys.platform", return_value="win"), \
                asyncio_patch(requirements, return_value=True), \
                asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
            udp_nio = TraceNG.instance().create_nio(nio_settings)
            async_run(vm.port_add_nio_binding(0, udp_nio))

            vm._ubridge_send = AsyncioMagicMock()
            async_run(vm.start("192.168.1.2"))
            assert vm.is_running()

            with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
                stop_task = asyncio.ensure_future(vm.stop())
                loop.run_until_complete(stop_task)
            assert vm.is_running() is False

            process.terminate.assert_called_with()

            # Drain the two notifications that precede the one under test
            async_run(notifications.get(1))  # Ping
            async_run(notifications.get(1))  # Started

            action, event, kwargs = async_run(notifications.get(1))
            assert action == "node.updated"
            assert event == vm
开发者ID:GNS3,项目名称:gns3-server,代码行数:33,代码来源:test_traceng_vm.py


示例13: test_close

def test_close(vm, port_manager, loop):
    """
    A VPCS VM that was started is no longer running once it is closed.
    """
    requirements = "gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements"
    wrap_console = "gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"

    with asyncio_patch(requirements, return_value=True), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()), \
            asyncio_patch(wrap_console):
        start_task = asyncio.ensure_future(vm.start())
        loop.run_until_complete(start_task)
        close_task = asyncio.ensure_future(vm.close())
        loop.run_until_complete(close_task)
        assert vm.is_running() is False
开发者ID:GNS3,项目名称:gns3-server,代码行数:8,代码来源:test_vpcs_vm.py


示例14: mock_ubridge

def mock_ubridge():
    """
    Keep the Nat node away from ubridge: its _start_ubridge,
    _add_ubridge_connection and _delete_ubridge_connection methods stay
    patched while this generator is suspended at the yield.
    """
    start_ubridge = "gns3server.compute.builtin.nodes.nat.Nat._start_ubridge"
    add_connection = "gns3server.compute.builtin.nodes.nat.Nat._add_ubridge_connection"
    delete_connection = "gns3server.compute.builtin.nodes.nat.Nat._delete_ubridge_connection"

    with asyncio_patch(start_ubridge), \
            asyncio_patch(add_connection), \
            asyncio_patch(delete_connection):
        yield
开发者ID:GNS3,项目名称:gns3-server,代码行数:8,代码来源:test_nat.py


示例15: vm

def vm(server, project, base_params):
    """
    POST base_params to the project's docker/vms endpoint with
    Docker.list_images and Docker.query patched, require a 201 status and
    return the JSON of the response.
    """
    list_images = "gns3server.modules.docker.Docker.list_images"
    query = "gns3server.modules.docker.Docker.query"

    with asyncio_patch(list_images, return_value=[{"image": "nginx"}]), \
            asyncio_patch(query, return_value={"Id": "8bd8153ea8f5"}):
        url = "/projects/{project_id}/docker/vms".format(project_id=project.id)
        response = server.post(url, base_params)

    # Show the body before the assertion fails, to ease debugging
    if response.status != 201:
        print(response.body)
    assert response.status == 201
    return response.json
开发者ID:ravirajsdeshmukh,项目名称:gns3-server,代码行数:8,代码来源:test_docker.py


示例16: test_close

def test_close(vm, port_manager, loop):
    """
    A TraceNG VM that was started is no longer running once it is closed.
    """
    requirements = "gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements"

    vm.ip_address = "192.168.1.1"
    # NOTE(review): return_value= configures the MagicMock that replaces
    # sys.platform; it does not set sys.platform to the string "win" --
    # confirm this is what the test intends
    with patch("sys.platform", return_value="win"), \
            asyncio_patch(requirements, return_value=True), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
        start_task = asyncio.ensure_future(vm.start("192.168.1.2"))
        loop.run_until_complete(start_task)
        close_task = asyncio.ensure_future(vm.close())
        loop.run_until_complete(close_task)
        assert vm.is_running() is False
开发者ID:GNS3,项目名称:gns3-server,代码行数:8,代码来源:test_traceng_vm.py


示例17: test_start_vnc

def test_start_vnc(vm, loop):
    """
    _start_vnc() picks a display, spawns Xtigervnc with the configured
    resolution and console port, and waits for the display's X11 socket
    file.
    """
    wait_target = "gns3server.compute.docker.docker_vm.wait_for_file_creation"

    vm.console_resolution = "1280x1024"
    with patch("shutil.which", return_value="/bin/Xtigervnc"), \
            asyncio_patch(wait_target) as wait_mock, \
            asyncio_patch("asyncio.create_subprocess_exec") as exec_mock:
        vnc_task = asyncio.ensure_future(vm._start_vnc())
        loop.run_until_complete(vnc_task)

    assert vm._display is not None
    spawn_args = exec_mock.call_args[0]
    expected_args = (
        "Xtigervnc",
        "-geometry", vm.console_resolution,
        "-depth", "16",
        "-interface", "127.0.0.1",
        "-rfbport", str(vm.console),
        "-AlwaysShared",
        "-SecurityTypes", "None",
        ":{}".format(vm._display),
    )
    assert spawn_args == expected_args
    wait_mock.assert_called_with("/tmp/.X11-unix/X{}".format(vm._display))
开发者ID:GNS3,项目名称:gns3-server,代码行数:9,代码来源:test_docker_vm.py


示例18: vm

def vm(http_compute, project, base_params):
    """
    POST base_params to the project's docker/nodes endpoint with
    Docker.list_images, Docker.query and DockerVM._get_container_state
    patched, require a 201 status and return the JSON of the response.
    """
    list_images = "gns3server.compute.docker.Docker.list_images"
    query = "gns3server.compute.docker.Docker.query"
    container_state = "gns3server.compute.docker.DockerVM._get_container_state"

    with asyncio_patch(list_images, return_value=[{"image": "nginx"}]), \
            asyncio_patch(query, return_value={"Id": "8bd8153ea8f5"}), \
            asyncio_patch(container_state, return_value="exited"):
        url = "/projects/{project_id}/docker/nodes".format(project_id=project.id)
        response = http_compute.post(url, base_params)

    # Show the body before the assertion fails, to ease debugging
    if response.status != 201:
        print(response.body)
    assert response.status == 201
    return response.json
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:9,代码来源:test_docker.py


示例19: test_close

def test_close(vm, port_manager, loop):
    """
    Closing an IOU VM must release its console port and leave the VM not
    running.
    """
    with asyncio_patch("gns3server.modules.iou.iou_vm.IOUVM._check_requirements", return_value=True):
        with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
            # NOTE(review): the result of start() is discarded; if start()
            # is a coroutine it never runs here -- confirm whether it should
            # be driven through the loop like close() below
            vm.start()
            port = vm.console
            # asyncio.async() no longer parses ("async" is a keyword since
            # Python 3.7); ensure_future() is its replacement
            loop.run_until_complete(asyncio.ensure_future(vm.close()))
            # Raise an exception if the port is not free
            port_manager.reserve_tcp_port(port, vm.project)
            assert vm.is_running() is False
开发者ID:bluca,项目名称:gns3-server,代码行数:9,代码来源:test_iou_vm.py


示例20: test_library_check

def test_library_check(loop, vm):
    """
    _library_check() must pass when subprocess_check_output returns an
    empty string and raise IOUError when it reports "libssl => not found".
    """
    # asyncio.async() no longer parses ("async" is a keyword since Python
    # 3.7); ensure_future() is its replacement
    with asyncio_patch("gns3server.utils.asyncio.subprocess_check_output", return_value=""):
        loop.run_until_complete(asyncio.ensure_future(vm._library_check()))

    with asyncio_patch("gns3server.utils.asyncio.subprocess_check_output", return_value="libssl => not found"):
        with pytest.raises(IOUError):
            loop.run_until_complete(asyncio.ensure_future(vm._library_check()))
开发者ID:bluca,项目名称:gns3-server,代码行数:9,代码来源:test_iou_vm.py



注:本文中的 tests.utils.asyncio_patch 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.test_create函数代码示例发布时间:2022-05-27
下一篇:
Python utils.assert_path_not_in_errors函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap