本文整理汇总了 Python 中 tests.utils.asyncio_patch 函数的典型用法代码示例。如果您正苦于以下问题：Python asyncio_patch 函数的具体用法？Python asyncio_patch 怎么用？Python asyncio_patch 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 asyncio_patch 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_reload
def test_reload(loop, vm, async_run):
    """Reload a started TraceNG node and check it is still running afterwards.

    The subprocess is replaced by a ``MagicMock`` whose ``wait()`` returns an
    already-resolved future, and the test finally asserts that ``terminate()``
    was called on that mock.
    """
    fake_process = MagicMock()

    # wait() hands back a future that is already done, so waiting for the
    # process to die completes immediately.
    already_done = asyncio.Future()
    already_done.set_result(True)
    fake_process.wait.return_value = already_done
    fake_process.returncode = None

    vm.ip_address = "192.168.1.1"
    nio_settings = {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}}

    # NOTE(review): patch(..., return_value="win") swaps sys.platform for a
    # MagicMock (not the string "win") -- confirm this is intended.
    with patch("sys.platform", return_value="win"), \
            asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=fake_process):
        udp_nio = TraceNG.instance().create_nio(nio_settings)
        async_run(vm.port_add_nio_binding(0, udp_nio))

        vm._ubridge_send = AsyncioMagicMock()
        async_run(vm.start("192.168.1.2"))
        assert vm.is_running()

        with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
            async_run(vm.reload())
        assert vm.is_running() is True

        # A Windows-specific send_signal(1) assertion was previously commented
        # out here; only terminate() is checked.
        fake_process.terminate.assert_called_with()
开发者ID:GNS3,项目名称:gns3-server,代码行数:28,代码来源:test_traceng_vm.py
示例2: test_look_for_interface
def test_look_for_interface(gns3vm, async_run):
    """Check ``_look_for_interface`` against canned ``showvminfo`` output.

    With the canned output below, looking up "nat" is expected to yield 2 and
    an unknown network type ("dummy") is expected to yield -1.
    """
    showvminfo = """
nic1="hostonly"
nictype1="82540EM"
nicspeed1="0"
nic2="nat"
nictype2="82540EM"
nicspeed2="0"
nic3="none"
nic4="none"
nic5="none"
nic6="none"
nic7="none"
nic8="none"
vcpwidth=1024
vcpheight=768
vcprate=512
vcpfps=25
GuestMemoryBalloon=0
"""
    execute_target = "gns3server.controller.gns3vm.virtualbox_gns3_vm.VirtualBoxGNS3VM._execute"

    # Known interface type: the patched _execute must have been asked for the
    # machine-readable VM info.
    with asyncio_patch(execute_target, return_value=showvminfo) as execute_mock:
        nat_index = async_run(gns3vm._look_for_interface("nat"))
        execute_mock.assert_called_with('showvminfo', ['GNS3 VM', '--machinereadable'])
    assert nat_index == 2

    # Unknown interface type.
    with asyncio_patch(execute_target, return_value=showvminfo):
        missing_index = async_run(gns3vm._look_for_interface("dummy"))
    assert missing_index == -1
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:29,代码来源:test_virtualbox_gns3_vm.py
示例3: test_reload
def test_reload(loop, vm, async_run):
    """Reload a started VPCS node and check it is still running afterwards.

    The subprocess is a ``MagicMock``; depending on ``sys.platform`` the test
    expects either ``send_signal(1)`` or ``terminate()`` to have been called.
    """
    fake_process = MagicMock()

    # wait() returns a future that is already resolved, so waiting for the
    # process to be killed finishes at once.
    already_done = asyncio.Future()
    already_done.set_result(True)
    fake_process.wait.return_value = already_done
    fake_process.returncode = None

    nio_settings = {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}

    with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True), \
            asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=fake_process):
        udp_nio = VPCS.instance().create_nio(nio_settings)
        async_run(vm.port_add_nio_binding(0, udp_nio))
        async_run(vm.start())
        assert vm.is_running()

        with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
            async_run(vm.reload())
        assert vm.is_running() is True

        on_windows = sys.platform.startswith("win")
        if not on_windows:
            fake_process.terminate.assert_called_with()
        else:
            fake_process.send_signal.assert_called_with(1)
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:25,代码来源:test_vpcs_vm.py
示例4: test_pull_image
def test_pull_image(loop):
    """Pull an image while ``Docker.query`` fails with a 404.

    ``Docker.query`` is patched to raise ``DockerHttp404Error`` and
    ``Docker.http_query`` is patched to return a mock; the test asserts that
    ``http_query`` was called with the ``images/create`` POST request.
    """

    class Response:
        """
        Simulate a response split into multiple packets
        """

        # Chunks handed out by successive read() calls.
        # NOTE(review): the second chunk is a str while the first is bytes --
        # confirm whether that is intentional.
        _CHUNKS = (b'{"progress": "0/100",', '"id": 42}')

        def __init__(self):
            self._read = -1

        async def read(self, size):
            self._read += 1
            if self._read < len(self._CHUNKS):
                return self._CHUNKS[self._read]
            # Past the last chunk: fall through and return None.

    # NOTE(review): Response is installed as content.return_value (the result
    # of calling content()), not as content itself -- verify against pull_image.
    http_response = MagicMock()
    http_response.content.return_value = Response()

    with asyncio_patch("gns3server.compute.docker.Docker.query", side_effect=DockerHttp404Error("404")), \
            asyncio_patch("gns3server.compute.docker.Docker.http_query", return_value=http_response) as http_query_mock:
        pull_task = asyncio.ensure_future(Docker.instance().pull_image("ubuntu"))
        loop.run_until_complete(pull_task)
        http_query_mock.assert_called_with("POST", "images/create", params={"fromImage": "ubuntu"}, timeout=None)
开发者ID:GNS3,项目名称:gns3-server,代码行数:25,代码来源:test_docker.py
示例5: test_start
def test_start(loop, vm, async_run):
    """Start a VPCS node and check the spawned command line and notification.

    The subprocess creation is patched; the test asserts the positional
    arguments passed to ``asyncio.create_subprocess_exec``, that the node
    reports running, that ``vm.command_line`` is those arguments joined by
    spaces, and that a "node.updated" event for ``vm`` is queued.
    """
    process = MagicMock()
    process.returncode = None

    with NotificationManager.instance().queue() as queue:
        async_run(queue.get(0))  # Ping

        with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
            with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
                with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"):
                    nio = VPCS.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
                    async_run(vm.port_add_nio_binding(0, nio))
                    # asyncio.async() cannot even be parsed on Python 3.7+
                    # ("async" is a keyword); ensure_future is its replacement.
                    loop.run_until_complete(asyncio.ensure_future(vm.start()))
                    assert mock_exec.call_args[0] == (vm._vpcs_path(),
                                                      '-p',
                                                      str(vm._internal_console_port),
                                                      '-m', '1',
                                                      '-i',
                                                      '1',
                                                      '-F',
                                                      '-R',
                                                      '-s',
                                                      ANY,
                                                      '-c',
                                                      ANY,
                                                      '-t',
                                                      '127.0.0.1')
                assert vm.is_running()
                assert vm.command_line == ' '.join(mock_exec.call_args[0])
        (action, event, kwargs) = async_run(queue.get(0))
        assert action == "node.updated"
        assert event == vm
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:32,代码来源:test_vpcs_vm.py
示例6: test_start
def test_start(loop, vm, async_run):
    """Start a TraceNG node and check the spawned command line and notification.

    Asserts the positional arguments given to the patched
    ``asyncio.create_subprocess_exec``, that the node reports running, that
    ``vm.command_line`` equals those arguments joined by spaces, and that a
    "node.updated" event for ``vm`` is queued.
    """
    fake_process = MagicMock()
    fake_process.returncode = None

    with NotificationManager.instance().queue() as notifications:
        async_run(notifications.get(1))  # Ping

        vm.ip_address = "192.168.1.1"
        # NOTE(review): patch(..., return_value="win") swaps sys.platform for
        # a MagicMock (not the string "win") -- confirm this is intended.
        with patch("sys.platform", return_value="win"), \
                asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True), \
                asyncio_patch("asyncio.create_subprocess_exec", return_value=fake_process) as exec_mock:
            start_task = asyncio.ensure_future(vm.start("192.168.1.2"))
            loop.run_until_complete(start_task)

            spawned_argv = exec_mock.call_args[0]
            assert spawned_argv == (vm._traceng_path(),
                                    '-u',
                                    '-c',
                                    ANY,
                                    '-v',
                                    ANY,
                                    '-b',
                                    '127.0.0.1',
                                    '-s',
                                    'ICMP',
                                    '-f',
                                    '192.168.1.1',
                                    '192.168.1.2')
            assert vm.is_running()
            assert vm.command_line == ' '.join(exec_mock.call_args[0])

        (action, event, kwargs) = async_run(notifications.get(1))
        assert action == "node.updated"
        assert event == vm
开发者ID:GNS3,项目名称:gns3-server,代码行数:30,代码来源:test_traceng_vm.py
示例7: test_stop
def test_stop(loop, vm, async_run):
    """Start then stop a VPCS node and check process shutdown and notification.

    After ``vm.stop()`` the node must report not running, the mocked process
    must have received ``send_signal(1)`` (when ``sys.platform`` starts with
    "win") or ``terminate()`` (otherwise), and the third queued notification
    must be "node.updated" for ``vm``.
    """
    process = MagicMock()

    # Wait process kill success
    future = asyncio.Future()
    future.set_result(True)
    process.wait.return_value = future
    process.returncode = None

    with NotificationManager.instance().queue() as queue:
        with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
            with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"):
                with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
                    nio = VPCS.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
                    async_run(vm.port_add_nio_binding(0, nio))

                    async_run(vm.start())
                    assert vm.is_running()

                    with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
                        # asyncio.async() cannot be parsed on Python 3.7+
                        # ("async" is a keyword); use ensure_future instead.
                        loop.run_until_complete(asyncio.ensure_future(vm.stop()))
                    assert vm.is_running() is False

                    if sys.platform.startswith("win"):
                        process.send_signal.assert_called_with(1)
                    else:
                        process.terminate.assert_called_with()

                    async_run(queue.get(0))  # Ping
                    async_run(queue.get(0))  # Started

                    (action, event, kwargs) = async_run(queue.get(0))
                    assert action == "node.updated"
                    assert event == vm
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:34,代码来源:test_vpcs_vm.py
示例8: test_start
def test_start(loop, vm):
    """Start a VPCS node (legacy ``modules`` API) and check the command line.

    Asserts the positional arguments given to the patched
    ``asyncio.create_subprocess_exec``, that the node reports running, that
    ``vm.command_line`` equals those arguments joined by spaces, and that a
    "vm.started" event for ``vm`` is on the project's listen queue.
    """
    process = MagicMock()
    process.returncode = None
    queue = vm.project.get_listen_queue()

    with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
        with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
            nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
            vm.port_add_nio_binding(0, nio)
            # asyncio.async() cannot be parsed on Python 3.7+ ("async" is a
            # keyword); ensure_future is the drop-in replacement.
            loop.run_until_complete(asyncio.ensure_future(vm.start()))
            assert mock_exec.call_args[0] == (vm.vpcs_path,
                                              '-p',
                                              str(vm.console),
                                              '-m', '1',
                                              '-i',
                                              '1',
                                              '-F',
                                              '-R',
                                              '-s',
                                              '4242',
                                              '-c',
                                              '4243',
                                              '-t',
                                              '127.0.0.1')
            assert vm.is_running()
            assert vm.command_line == ' '.join(mock_exec.call_args[0])
            (action, event) = queue.get_nowait()
            assert action == "vm.started"
            assert event == vm
开发者ID:ravirajsdeshmukh,项目名称:gns3-server,代码行数:29,代码来源:test_vpcs_vm.py
示例9: test_start_0_6_1
def test_start_0_6_1(loop, vm, async_run):
    """
    Version 0.6.1 doesn't have the -R option. It's not required
    because GNS3 provides a patch for this.
    """
    fake_process = MagicMock()
    fake_process.returncode = None
    vm._vpcs_version = parse_version("0.6.1")

    nio_settings = {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}

    with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True), \
            asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=fake_process) as exec_mock:
        udp_nio = VPCS.instance().create_nio(nio_settings)
        async_run(vm.port_add_nio_binding(0, udp_nio))
        async_run(vm.start())

        # Same argument list as a regular start, minus the '-R' flag.
        spawned_argv = exec_mock.call_args[0]
        assert spawned_argv == (vm._vpcs_path(),
                                '-p',
                                str(vm._internal_console_port),
                                '-m', '1',
                                '-i',
                                '1',
                                '-F',
                                '-s',
                                ANY,
                                '-c',
                                ANY,
                                '-t',
                                '127.0.0.1')
        assert vm.is_running()
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:29,代码来源:test_vpcs_vm.py
示例10: test_stop
def test_stop(loop, vm):
    """Start then stop a VPCS node (legacy ``modules`` API).

    After ``vm.stop()`` the node must report not running, the mocked process
    must have received ``send_signal(1)`` (when ``sys.platform`` starts with
    "win") or ``terminate()`` (otherwise), and a "vm.stopped" event for ``vm``
    must be on the project's listen queue.
    """
    process = MagicMock()

    # Wait process kill success
    future = asyncio.Future()
    future.set_result(True)
    process.wait.return_value = future
    process.returncode = None

    with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
        with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
            nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
            vm.port_add_nio_binding(0, nio)

            # asyncio.async() cannot be parsed on Python 3.7+ ("async" is a
            # keyword); ensure_future is the drop-in replacement.
            loop.run_until_complete(asyncio.ensure_future(vm.start()))
            assert vm.is_running()

            # The queue is obtained only after start, so the first event read
            # below is the one asserted as "vm.stopped".
            queue = vm.project.get_listen_queue()

            with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
                loop.run_until_complete(asyncio.ensure_future(vm.stop()))
            assert vm.is_running() is False

            if sys.platform.startswith("win"):
                process.send_signal.assert_called_with(1)
            else:
                process.terminate.assert_called_with()

            (action, event) = queue.get_nowait()
            assert action == "vm.stopped"
            assert event == vm
开发者ID:ravirajsdeshmukh,项目名称:gns3-server,代码行数:31,代码来源:test_vpcs_vm.py
示例11: test_start_0_6_1
def test_start_0_6_1(loop, vm):
    """
    Version 0.6.1 doesn't have the -R option. It's not required
    because GNS3 provides a patch for this.
    """
    process = MagicMock()
    process.returncode = None
    queue = vm.project.get_listen_queue()
    vm._vpcs_version = parse_version("0.6.1")

    with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
        with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
            nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
            vm.port_add_nio_binding(0, nio)
            # asyncio.async() cannot be parsed on Python 3.7+ ("async" is a
            # keyword); ensure_future is the drop-in replacement.
            loop.run_until_complete(asyncio.ensure_future(vm.start()))
            # Same argument list as a regular start, minus the '-R' flag.
            assert mock_exec.call_args[0] == (vm.vpcs_path,
                                              '-p',
                                              str(vm.console),
                                              '-m', '1',
                                              '-i',
                                              '1',
                                              '-F',
                                              '-s',
                                              '4242',
                                              '-c',
                                              '4243',
                                              '-t',
                                              '127.0.0.1')
            assert vm.is_running()
            (action, event) = queue.get_nowait()
            assert action == "vm.started"
            assert event == vm
开发者ID:ravirajsdeshmukh,项目名称:gns3-server,代码行数:32,代码来源:test_vpcs_vm.py
示例12: test_stop
def test_stop(loop, vm, async_run):
    """Start then stop a TraceNG node and check shutdown and notification.

    After ``vm.stop()`` the node must report not running, ``terminate()`` must
    have been called on the mocked process, and the third queued notification
    must be "node.updated" for ``vm``.
    """
    fake_process = MagicMock()

    # wait() returns a future that is already resolved, so waiting for the
    # process to be killed finishes at once.
    already_done = asyncio.Future()
    already_done.set_result(True)
    fake_process.wait.return_value = already_done
    fake_process.returncode = None

    vm.ip_address = "192.168.1.1"
    nio_settings = {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}}

    with NotificationManager.instance().queue() as notifications:
        # NOTE(review): patch(..., return_value="win") swaps sys.platform for
        # a MagicMock (not the string "win") -- confirm this is intended.
        with patch("sys.platform", return_value="win"), \
                asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True), \
                asyncio_patch("asyncio.create_subprocess_exec", return_value=fake_process):
            udp_nio = TraceNG.instance().create_nio(nio_settings)
            async_run(vm.port_add_nio_binding(0, udp_nio))

            vm._ubridge_send = AsyncioMagicMock()
            async_run(vm.start("192.168.1.2"))
            assert vm.is_running()

            with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
                stop_task = asyncio.ensure_future(vm.stop())
                loop.run_until_complete(stop_task)
            assert vm.is_running() is False

            fake_process.terminate.assert_called_with()

            async_run(notifications.get(1))  # Ping
            async_run(notifications.get(1))  # Started

            (action, event, kwargs) = async_run(notifications.get(1))
            assert action == "node.updated"
            assert event == vm
开发者ID:GNS3,项目名称:gns3-server,代码行数:33,代码来源:test_traceng_vm.py
示例13: test_close
def test_close(vm, port_manager, loop):
    """Start a VPCS node, close it, and check it no longer reports running."""
    with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()), \
            asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"):
        # Drive start() and close() to completion one after the other.
        start_task = asyncio.ensure_future(vm.start())
        loop.run_until_complete(start_task)
        close_task = asyncio.ensure_future(vm.close())
        loop.run_until_complete(close_task)
        assert vm.is_running() is False
开发者ID:GNS3,项目名称:gns3-server,代码行数:8,代码来源:test_vpcs_vm.py
示例14: mock_ubridge
def mock_ubridge():
    """
    Patch out the Nat node's uBridge hooks (_start_ubridge,
    _add_ubridge_connection and _delete_ubridge_connection) while the
    generator is suspended at its yield.
    """
    nat_class = "gns3server.compute.builtin.nodes.nat.Nat"
    with asyncio_patch(nat_class + "._start_ubridge"), \
            asyncio_patch(nat_class + "._add_ubridge_connection"), \
            asyncio_patch(nat_class + "._delete_ubridge_connection"):
        yield
开发者ID:GNS3,项目名称:gns3-server,代码行数:8,代码来源:test_nat.py
示例15: vm
def vm(server, project, base_params):
    """Create a Docker VM through the HTTP API and return the response JSON.

    ``Docker.list_images`` and ``Docker.query`` are patched while the POST is
    issued. The response body is printed when the status is not 201, and the
    status is then asserted to be 201.
    """
    # The patch handles were bound to names that were never used; the
    # bindings are dropped.
    with asyncio_patch("gns3server.modules.docker.Docker.list_images", return_value=[{"image": "nginx"}]):
        with asyncio_patch("gns3server.modules.docker.Docker.query", return_value={"Id": "8bd8153ea8f5"}):
            response = server.post("/projects/{project_id}/docker/vms".format(project_id=project.id), base_params)
    if response.status != 201:
        # Surface the server's error body before the assertion fails.
        print(response.body)
    assert response.status == 201
    return response.json
开发者ID:ravirajsdeshmukh,项目名称:gns3-server,代码行数:8,代码来源:test_docker.py
示例16: test_close
def test_close(vm, port_manager, loop):
    """Start a TraceNG node, close it, and check it no longer reports running."""
    vm.ip_address = "192.168.1.1"
    # NOTE(review): patch(..., return_value="win") swaps sys.platform for a
    # MagicMock (not the string "win") -- confirm this is intended.
    with patch("sys.platform", return_value="win"), \
            asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True), \
            asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
        # Drive start() and close() to completion one after the other.
        start_task = asyncio.ensure_future(vm.start("192.168.1.2"))
        loop.run_until_complete(start_task)
        close_task = asyncio.ensure_future(vm.close())
        loop.run_until_complete(close_task)
        assert vm.is_running() is False
开发者ID:GNS3,项目名称:gns3-server,代码行数:8,代码来源:test_traceng_vm.py
示例17: test_start_vnc
def test_start_vnc(vm, loop):
    """Run ``_start_vnc`` and check the Xtigervnc command line.

    ``shutil.which`` is patched to report ``/bin/Xtigervnc``; the test asserts
    that a display number was assigned, the positional arguments passed to the
    patched ``asyncio.create_subprocess_exec``, and that the patched
    ``wait_for_file_creation`` was called with the display's X11 socket path.
    """
    vm.console_resolution = "1280x1024"
    with patch("shutil.which", return_value="/bin/Xtigervnc"), \
            asyncio_patch("gns3server.compute.docker.docker_vm.wait_for_file_creation") as wait_mock, \
            asyncio_patch("asyncio.create_subprocess_exec") as exec_mock:
        vnc_task = asyncio.ensure_future(vm._start_vnc())
        loop.run_until_complete(vnc_task)
        assert vm._display is not None

        spawned_argv = exec_mock.call_args[0]
        assert spawned_argv == (
            "Xtigervnc",
            "-geometry", vm.console_resolution,
            "-depth", "16",
            "-interface", "127.0.0.1",
            "-rfbport", str(vm.console),
            "-AlwaysShared",
            "-SecurityTypes", "None",
            ":{}".format(vm._display),
        )
        wait_mock.assert_called_with("/tmp/.X11-unix/X{}".format(vm._display))
开发者ID:GNS3,项目名称:gns3-server,代码行数:9,代码来源:test_docker_vm.py
示例18: vm
def vm(http_compute, project, base_params):
    """Create a Docker node through the compute HTTP API and return its JSON.

    ``Docker.list_images``, ``Docker.query`` and
    ``DockerVM._get_container_state`` are patched while the POST is issued.
    The response body is printed when the status is not 201, and the status is
    then asserted to be 201.
    """
    # The patch handles were bound to unused names ("mock" was bound twice,
    # the second shadowing the first); the bindings are dropped.
    with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "nginx"}]):
        with asyncio_patch("gns3server.compute.docker.Docker.query", return_value={"Id": "8bd8153ea8f5"}):
            with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="exited"):
                response = http_compute.post("/projects/{project_id}/docker/nodes".format(project_id=project.id), base_params)
    if response.status != 201:
        # Surface the server's error body before the assertion fails.
        print(response.body)
    assert response.status == 201
    return response.json
开发者ID:AJNOURI,项目名称:gns3-server,代码行数:9,代码来源:test_docker.py
示例19: test_close
def test_close(vm, port_manager, loop):
    """Close an IOU VM and check its console port can be reserved again.

    After ``vm.close()`` the previously used console port is reserved through
    ``port_manager`` and the VM must report not running.
    """
    with asyncio_patch("gns3server.modules.iou.iou_vm.IOUVM._check_requirements", return_value=True):
        with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
            # NOTE(review): start() is called without being run on the loop;
            # if it is a coroutine this does not actually start the VM --
            # verify against IOUVM.start.
            vm.start()
            port = vm.console
            # asyncio.async() cannot be parsed on Python 3.7+ ("async" is a
            # keyword); ensure_future is the drop-in replacement.
            loop.run_until_complete(asyncio.ensure_future(vm.close()))
            # Raise an exception if the port is not free
            port_manager.reserve_tcp_port(port, vm.project)
            assert vm.is_running() is False
开发者ID:bluca,项目名称:gns3-server,代码行数:9,代码来源:test_iou_vm.py
示例20: test_library_check
def test_library_check(loop, vm):
    """Check ``_library_check`` against patched ``subprocess_check_output``.

    With empty output the check must complete without raising; with output
    "libssl => not found" it must raise ``IOUError``.
    """
    # asyncio.async() cannot be parsed on Python 3.7+ ("async" is a keyword);
    # ensure_future is the drop-in replacement. The unused "as mock" bindings
    # are dropped.
    with asyncio_patch("gns3server.utils.asyncio.subprocess_check_output", return_value=""):
        loop.run_until_complete(asyncio.ensure_future(vm._library_check()))

    with asyncio_patch("gns3server.utils.asyncio.subprocess_check_output", return_value="libssl => not found"):
        with pytest.raises(IOUError):
            loop.run_until_complete(asyncio.ensure_future(vm._library_check()))
开发者ID:bluca,项目名称:gns3-server,代码行数:9,代码来源:test_iou_vm.py
注：本文中的 tests.utils.asyncio_patch 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论