• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils_misc.wait_for函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中virttest.utils_misc.wait_for函数的典型用法代码示例。如果您正苦于以下问题:Python wait_for函数的具体用法?Python wait_for怎么用?Python wait_for使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了wait_for函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: check_guest_meminfo

    def check_guest_meminfo(old_mem, check_option):
        """
        Check meminfo on guest.
        """
        assert old_mem is not None
        session = vm.wait_for_login()
        # Hot-plugged memory should be online by udev rules
        udev_file = "/lib/udev/rules.d/80-hotplug-cpu-mem.rules"
        udev_rules = ('SUBSYSTEM=="memory", ACTION=="add", TEST=="state",'
                      ' ATTR{state}=="offline", ATTR{state}="online"')
        cmd = ("grep memory %s || echo '%s' >> %s"
               % (udev_file, udev_rules, udev_file))
        session.cmd(cmd)
        # Wait a while for new memory to be detected.
        utils_misc.wait_for(
            lambda: vm.get_totalmem_sys(online) != int(old_mem), 30, first=20.0)
        new_mem = vm.get_totalmem_sys(online)
        session.close()
        logging.debug("Memtotal on guest: %s", new_mem)
        no_of_times = 1
        if at_times:
            no_of_times = at_times
        if check_option == "attach":
            if new_mem != int(old_mem) + (int(tg_size) * no_of_times):
                test.fail("Total memory on guest couldn't changed after "
                          "attach memory device")

        if check_option == "detach":
            if new_mem != int(old_mem) - (int(tg_size) * no_of_times):
                test.fail("Total memory on guest couldn't changed after "
                          "detach memory device")
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:31,代码来源:libvirt_mem.py


示例2: check_memory

    def check_memory(self, vm=None):
        """
        Check is guest memory is really match assgined to VM.

        :param vm: VM object, get VM object from env if vm is None.
        """
        error_context.context("Verify memory info", logging.info)
        if not vm:
            vm = self.env.get_vm(self.params["main_vm"])
        vm.verify_alive()
        threshold = float(self.params.get("threshold", 0.10))
        timeout = float(self.params.get("wait_resume_timeout", 60))
        # Notes:
        #    some sub test will pause VM, here need to wait VM resume
        # then check memory info in guest.
        utils_misc.wait_for(lambda: not vm.is_paused(), timeout=timeout)
        utils_misc.verify_host_dmesg()
        self.os_type = self.params.get("os_type")
        guest_mem_size = super(MemoryHotplugTest, self).get_guest_total_mem(vm)
        vm_mem_size = self.get_vm_mem(vm)
        if abs(guest_mem_size - vm_mem_size) > vm_mem_size * threshold:
            msg = ("Assigned '%s MB' memory to '%s'"
                   "but, '%s MB' memory detect by OS" %
                   (vm_mem_size, vm.name, guest_mem_size))
            raise exceptions.TestFail(msg)
开发者ID:ngu-niny,项目名称:avocado-vt,代码行数:25,代码来源:__init__.py


示例3: run

def run(test, params, env):
    """
    drive_mirror_stress test:
    1). guest installation
    2). start mirror during guest installation
    3). after installation complete, reboot guest verfiy guest reboot correctly.

    :param test: QEMU test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    """

    args = (test, params, env)
    bg = utils_misc.InterruptedThread(utils_test.run_virt_sub_test, args,
                                      {"sub_type": "unattended_install"})
    bg.start()
    utils_misc.wait_for(bg.is_alive, timeout=10)
    time.sleep(random.uniform(60, 200))
    tag = params["source_image"]
    mirror_test = drive_mirror.DriveMirror(test, params, env, tag)
    mirror_test.trash_files.append(mirror_test.image_file)
    try:
        mirror_test.start()
        mirror_test.wait_for_steady()
        mirror_test.reopen()
        bg.join()
    finally:
        mirror_test.clean()
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:28,代码来源:drive_mirror_installation.py


示例4: guest_netwok_connecting_check

    def guest_netwok_connecting_check(guest_ip, link_up, change_queues=False):
        """
        Check whether guest network is connective by ping
        """
        if link_up:
            vm.wait_for_login()
            guest_ip = vm.get_address()
        if change_queues:
            env["run_change_queues"] = False
            bg_thread = utils.InterruptedThread(change_queues_number_repeatly,
                                                (guest_ifname,))
            bg_thread.start()

            utils_misc.wait_for(lambda: env["run_change_queues"], 30, 0, 2,
                                "wait queues change start")

        _, output = utils_test.ping(guest_ip, count=10, timeout=20)
        if not link_up and utils_test.get_loss_ratio(output) != 100:
            err_msg = "guest network still connecting after down the link"
            raise error.TestFail(err_msg)
        elif link_up and utils_test.get_loss_ratio(output) == 100:
            err_msg = "All packets lost during ping guest ip after link up"
            raise error.TestFail(err_msg)
        else:
            logging.info("Guest network connecting is exactly as expected")

        if change_queues:
            env["run_change_queues"] = False
            bg_thread.join()
开发者ID:Xiangmin,项目名称:tp-qemu,代码行数:29,代码来源:set_link.py


示例5: run

def run(test, params, env):
    """
    block_stream_installation test:
    1). guest installation
    2). live snapshot during guest installation
    3). block stream afterwards
    :param test: QEMU test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    """

    args = (test, params, env)
    bg = utils_misc.InterruptedThread(utils_test.run_virt_sub_test, args,
                                      {"sub_type": "unattended_install"})
    bg.start()
    utils_misc.wait_for(bg.is_alive, timeout=10)
    time.sleep(random.uniform(60, 200))
    tag = params["source_image"]
    stream_test = blk_stream.BlockStream(test, params, env, tag)
    stream_test.trash_files.append(stream_test.image_file)
    try:
        stream_test.create_snapshots()
        stream_test.start()
        stream_test.wait_for_finished()
        bg.join()
    finally:
        stream_test.clean()
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:27,代码来源:block_stream_installation.py


示例6: get_dirty

 def get_dirty(session, frozen=False):
     """
     Get dirty data of guest
     """
     try:
         data_cmd = "cat /proc/meminfo | grep Dirty"
         if not frozen:
             result = utils_misc.wait_for(lambda:
                                          int(session.
                                              cmd_output(data_cmd).
                                              strip().split()[1]) != 0,
                                          60)
             if result:
                 return int(session.cmd_output(data_cmd).strip().
                            split()[1])
             else:
                 return 0
             dirty_info = session.cmd_output(data_cmd).strip()
             return int(dirty_info.split()[1])
         else:
             result = utils_misc.wait_for(lambda:
                                          int(session.
                                              cmd_output(data_cmd).
                                              strip().split()[1]) == 0,
                                          60)
             if result:
                 return 0
             else:
                 return int(session.cmd_output(data_cmd).strip().
                            split()[1])
     except (IndexError, ValueError), details:
         raise error.TestFail("Get dirty info failed: %s" % details)
开发者ID:PandaWei,项目名称:tp-libvirt,代码行数:32,代码来源:virsh_qemu_agent_command_fs.py


示例7: get_ip_by_mac

    def get_ip_by_mac(mac_addr, try_dhclint=False):
        """
        Get interface IP address by given MAC addrss. If try_dhclint is
        True, then try to allocate IP addrss for the interface.
        """
        session = vm.wait_for_login()

        def f():
            return utils_net.get_guest_ip_addr(session, mac_addr)

        try:
            ip_addr = utils_misc.wait_for(f, 10)
            if ip_addr is None:
                iface_name = utils_net.get_linux_ifname(session, mac_addr)
                if try_dhclint:
                    session.cmd("dhclient %s" % iface_name)
                    ip_addr = utils_misc.wait_for(f, 10)
                else:
                    # No IP for the interface, just print the interface name
                    logging.warn("Find '%s' with MAC address '%s', "
                                 "but which has no IP address", iface_name,
                                 mac_addr)
        finally:
            session.close()
        return ip_addr
开发者ID:Hao-Liu,项目名称:tp-libvirt,代码行数:25,代码来源:virsh_net_dhcp_leases.py


示例8: clear_interface_linux

def clear_interface_linux(vm, login_timeout, timeout):
    """
    Clears user interface of a vm without reboot

    :param vm:      VM where cleaning is required
    """
    logging.info("restarting X/gdm on: %s", vm.name)
    session = vm.wait_for_login(username="root", password="123456",
                                timeout=login_timeout)

    if "release 7" in session.cmd('cat /etc/redhat-release'):
        command = "gdm"
        pgrep_process = "'^gdm$'"
    else:
        command = "Xorg"
        pgrep_process = "Xorg"

    try:
        pid = session.cmd("pgrep %s" % pgrep_process)
        session.cmd("killall %s" % command)
        utils_misc.wait_for(lambda: _is_pid_alive(session, pid), 10,
                            timeout, 0.2)
    except:
        pass

    try:
        session.cmd("ps -C %s" % command)
    except ShellCmdError:
        raise error.TestFail("X/gdm not running")
开发者ID:Andrei-Stepanov,项目名称:virt-test,代码行数:29,代码来源:utils_spice.py


示例9: check_boot_result

    def check_boot_result(boot_fail_info, device_name):
        """
        Check boot result, and logout from iscsi device if boot from iscsi.
        """

        logging.info("Wait for display and check boot info.")
        infos = boot_fail_info.split(';')
        f = lambda: re.search(infos[0], vm.serial_console.get_output())
        utils_misc.wait_for(f, timeout, 1)

        logging.info("Try to boot from '%s'" % device_name)
        try:
            if dev_name == "hard-drive" or (dev_name == "scsi-hd" and not
                                            params.get("image_name_stg")):
                error.context("Log into the guest to verify it's up",
                              logging.info)
                session = vm.wait_for_login(timeout=timeout)
                session.close()
                vm.destroy()
                return

            output = vm.serial_console.get_output()

            for i in infos:
                if not re.search(i, output):
                    raise error.TestFail("Could not boot from"
                                         " '%s'" % device_name)
        finally:
            cleanup(device_name)
开发者ID:NixSilva,项目名称:virt-test,代码行数:29,代码来源:boot_from_device.py


示例10: receiver

 def receiver():
     """ Receive side """
     logging.info("Starting receiver process on %s", receiver_addr)
     if vm_receiver:
         session = vm_receiver.wait_for_login(timeout=login_timeout)
     else:
         username = params.get("username", "")
         password = params.get("password", "")
         prompt = params.get("shell_prompt", "[\#\$]")
         linesep = eval("'%s'" % params.get("shell_linesep", r"\n"))
         client = params.get("shell_client")
         port = int(params.get("shell_port"))
         log_filename = ("session-%s-%s.log" % (receiver_addr,
                         utils_misc.generate_random_string(4)))
         session = remote.remote_login(client, receiver_addr, port,
                                       username, password, prompt,
                                       linesep, log_filename, timeout)
         session.set_status_test_command("echo %errorlevel%")
     install_ntttcp(session)
     ntttcp_receiver_cmd = params.get("ntttcp_receiver_cmd")
     global _receiver_ready
     f = open(results_path + ".receiver", 'a')
     for b in buffers:
         utils_misc.wait_for(lambda: not _wait(), timeout)
         _receiver_ready = True
         rbuf = params.get("fixed_rbuf", b)
         cmd = ntttcp_receiver_cmd % (
             session_num, receiver_addr, rbuf, buf_num)
         r = session.cmd_output(cmd, timeout=timeout,
                                print_func=logging.debug)
         f.write("Send buffer size: %s\n%s\n%s" % (b, cmd, r))
     f.close()
     session.close()
开发者ID:ayiyaliing,项目名称:virt-test,代码行数:33,代码来源:ntttcp.py


示例11: add_device

    def add_device(pci_num, queues=1):
        info_pci_ref = vm.monitor.info("pci")
        reference = session.cmd_output(reference_cmd)

        try:
            # get function for adding device.
            add_fuction = local_functions["%s_%s" % (cmd_type, pci_type)]
        except Exception:
            raise error.TestError("No function for adding '%s' dev with '%s'" %
                                  (pci_type, cmd_type))
        after_add = None
        if add_fuction:
            # Do add pci device.
            after_add = add_fuction(pci_num, queues)

        try:
            # Define a helper function to compare the output
            def _new_shown():
                o = session.cmd_output(reference_cmd)
                return o != reference

            # Define a helper function to catch PCI device string
            def _find_pci():
                output = session.cmd_output(params.get("find_pci_cmd"))
                output = map(string.strip, output.splitlines())
                ref = map(string.strip, reference.splitlines())
                output = [_ for _ in output if _ not in ref]
                output = "\n".join(output)
                if re.search(params.get("match_string"), output, re.I):
                    return True
                return False

            error.context("Start checking new added device")
            # Compare the output of 'info pci'
            if after_add == info_pci_ref:
                raise error.TestFail("No new PCI device shown after executing "
                                     "monitor command: 'info pci'")

            secs = int(params.get("wait_secs_for_hook_up"))
            if not utils_misc.wait_for(_new_shown, test_timeout, secs, 3):
                raise error.TestFail("No new device shown in output of command "
                                     "executed inside the guest: %s" %
                                     reference_cmd)

            if not utils_misc.wait_for(_find_pci, test_timeout, 3, 3):
                raise error.TestFail("PCI %s %s device not found in guest. "
                                     "Command was: %s" %
                                     (pci_model, pci_type,
                                      params.get("find_pci_cmd")))

            # Test the newly added device
            try:
                session.cmd(params.get("pci_test_cmd") % (pci_num + 1))
            except aexpect.ShellError, e:
                raise error.TestFail("Check for %s device failed after PCI "
                                     "hotplug. Output: %r" % (pci_type, e.output))

        except Exception:
            pci_del(pci_num, ignore_failure=True)
            raise
开发者ID:FT4VT,项目名称:FT4VM-L1_test,代码行数:60,代码来源:pci_hotplug.py


示例12: run_ip_test

    def run_ip_test(session, ip_ver):
        """
        Check iptables on host and ipv6 address on guest
        """
        if ip_ver == "ipv6":
            # Clean up iptables rules for guest to get ipv6 address
            session.cmd_status("ip6tables -F")
        utils_net.restart_guest_network(session, iface_mac,
                                        ip_version=ip_ver)

        # It may take some time to get the ip address
        def get_ip_func():
            return utils_net.get_guest_ip_addr(session, iface_mac,
                                               ip_version=ip_ver)

        utils_misc.wait_for(get_ip_func, 10)
        vm_ip = get_ip_func()
        logging.debug("Guest has ip: %s", vm_ip)
        if not vm_ip:
            raise error.TestFail("Can't find ip address on guest")
        ping_cmd = "ping -c 5"
        ip_gateway = net_ip_address
        if ip_ver == "ipv6":
            ping_cmd = "ping6 -c 5"
            ip_gateway = net_ipv6_address
        if ip_gateway:
            if utils.system("%s %s" % (ping_cmd, ip_gateway),
                            ignore_status=True):
                raise error.TestFail("Failed to ping gateway address: %s"
                                     % ip_gateway)
开发者ID:Antique,项目名称:tp-libvirt,代码行数:30,代码来源:iface_network.py


示例13: wait_for_balloon_complete

 def wait_for_balloon_complete(self, timeout):
     """
     Wait until guest memory don't change
     """
     logging.info("Wait until guest memory don't change")
     is_stable = self._mem_state()
     utils_misc.wait_for(is_stable.next, timeout, step=10.0)
开发者ID:Zhengtong,项目名称:tp-qemu,代码行数:7,代码来源:balloon_check.py


示例14: run_bg_stress_test

    def run_bg_stress_test(bg_stress_test):
        """
        Run backgroud test.

        :param bg_stress_test: Background test.
        :return: return the background case thread if it's successful;
                 else raise error.
        """
        error_context.context("Run test %s background" % bg_stress_test,
                              logging.info)
        stress_thread = None
        wait_time = float(params.get("wait_bg_time", 60))
        target_process = params.get("target_process", "")
        bg_stress_run_flag = params.get("bg_stress_run_flag")
        # Need to set bg_stress_run_flag in some cases to make sure all
        # necessary steps are active
        env[bg_stress_run_flag] = False
        stress_thread = utils.InterruptedThread(
            utils_test.run_virt_sub_test, (test, params, env),
            {"sub_type": bg_stress_test})
        stress_thread.start()
        if not utils_misc.wait_for(lambda: check_bg_running(target_process),
                                   120, 0, 1):
            raise exceptions.TestFail("Backgroud test %s is not "
                                      "alive!" % bg_stress_test)
        if params.get("set_bg_stress_flag", "no") == "yes":
            logging.info("Wait %s test start" % bg_stress_test)
            if not utils_misc.wait_for(lambda: env.get(bg_stress_run_flag),
                                       wait_time, 0, 0.5):
                err = "Fail to start %s test" % bg_stress_test
                raise exceptions.TestError(err)
        env["bg_status"] = 1
        return stress_thread
开发者ID:PyLearner,项目名称:tp-qemu,代码行数:33,代码来源:driver_in_use.py


示例15: run_ip_test

    def run_ip_test(session, ip_ver):
        """
        Check iptables on host and ipv6 address on guest
        """
        if ip_ver == "ipv6":
            # Clean up iptables rules for guest to get ipv6 address
            session.cmd_status("ip6tables -F")

        # It may take some time to get the ip address
        def get_ip_func():
            return utils_net.get_guest_ip_addr(session, iface_mac,
                                               ip_version=ip_ver)
        utils_misc.wait_for(get_ip_func, 5)
        if not get_ip_func():
            utils_net.restart_guest_network(session, iface_mac,
                                            ip_version=ip_ver)
            utils_misc.wait_for(get_ip_func, 5)
        vm_ip = get_ip_func()
        logging.debug("Guest has ip: %s", vm_ip)
        if not vm_ip:
            test.fail("Can't find ip address on guest")
        ip_gateway = net_ip_address
        if ip_ver == "ipv6":
            ip_gateway = net_ipv6_address
            # Cleanup ip6talbes on host for ping6 test
            process.system("ip6tables -F")
        if ip_gateway and not routes:
            ping_s, _ = ping(dest=ip_gateway, count=5,
                             timeout=10, session=session)
            if ping_s:
                test.fail("Failed to ping gateway address: %s" % ip_gateway)
开发者ID:lento-sun,项目名称:tp-libvirt,代码行数:31,代码来源:iface_network.py


示例16: launch_netperf_client

def launch_netperf_client(test, server_ips, netperf_clients, test_option,
                          test_duration, netperf_para_sess,
                          netperf_cmd_prefix):
    """
    start netperf client in guest.
    """
    start_time = time.time()
    stop_time = start_time + test_duration * 1.5
    logging.info("server_ips = %s", server_ips)
    for s_ip in server_ips:
        for n_client in netperf_clients:
            n_client.bg_start(s_ip, test_option,
                              netperf_para_sess, netperf_cmd_prefix)
            if utils_misc.wait_for(n_client.is_netperf_running, 30, 0, 1,
                                   "Wait netperf test start"):
                logging.info("Netperf test start successfully.")
            else:
                test.error("Can not start netperf client.")

    for n_client in netperf_clients:
        if n_client.is_netperf_running():
            left_time = stop_time - time.time()
            utils_misc.wait_for(lambda: not
                                n_client.is_netperf_running(),
                                left_time, 0, 5,
                                "Wait netperf test finish %ss" % left_time)
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:26,代码来源:multi_nics_stress.py


示例17: guest_netwok_connecting_check

    def guest_netwok_connecting_check(guest_ip, link_up, change_queues=False):
        """
        Check whether guest network is connective by ping
        """
        if change_queues:
            env["run_change_queues"] = False
            bg_thread = utils_misc.InterruptedThread(
                change_queues_number_repeatly, (guest_ifname,))
            bg_thread.start()

            utils_misc.wait_for(lambda: env["run_change_queues"], 30, 0, 2,
                                "wait queues change start")
        time.sleep(0.5)
        output = utils_test.ping(guest_ip, 10, interface=host_interface,
                                 timeout=20, session=None)[1]
        if not link_up and utils_test.get_loss_ratio(output) < 80:
            err_msg = "guest network still connecting after down the link"
            test.fail(err_msg)
        elif link_up and utils_test.get_loss_ratio(output) > 20:
            err_msg = "All packets lost during ping guest ip after link up"
            test.fail(err_msg)

        if change_queues:
            env["run_change_queues"] = False
            bg_thread.join()
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:25,代码来源:set_link.py


示例18: do_fstrim

def do_fstrim(fstrim_type, vm, status_error=False):
    """
    Execute fstrim in different ways, and check its result.
    """
    # ensure that volume capacity won't change before trimming disk.
    if not _sync_finished():
        utils_misc.wait_for(_sync_finished, timeout=300)
    if fstrim_type == "fstrim_cmd":
        session = vm.wait_for_login()
        output = session.cmd_output("fstrim -v /mnt", timeout=240)
        session.close()
        logging.debug(output)
        if re.search("Operation not supported", output):
            if status_error:
                # virtio is not supported in unmap operations
                logging.debug("Expected failure: virtio do not support fstrim")
                return
            else:
                raise error.TestFail("Not supported fstrim on supported "
                                     "envrionment.Bug?")
        try:
            trimmed_bytes = re.search("\d+\sbytes",
                                      output).group(0).split()[0]
            trimmed = int(trimmed_bytes)
            logging.debug("Trimmed size is:%s bytes", trimmed)
        except (AttributeError, IndexError), detail:
            raise error.TestFail("Do fstrim failed:%s" % detail)
        if trimmed == 0:
            raise error.TestFail("Trimmed size is 0.")
开发者ID:noxdafox,项目名称:tp-libvirt,代码行数:29,代码来源:storage_discard.py


示例19: check_guest_flags

    def check_guest_flags(bash_cmd, flags):
        """
        Check bypass_cache option for single guest.
        """
        # Drop caches.
        drop_caches()
        virsh_cmd = "service libvirt-guests stop"
        check_flags_parallel(virsh_cmd, bash_cmd %
                             (managed_save_file, managed_save_file,
                              "1", flags), flags)
        ret = utils.run("service libvirt-guests status",
                        ignore_status=True)
        logging.info("status output: %s", ret.stdout)
        if not re.findall(r"Suspending %s" % vm_name,
                          ret.stdout, re.M):
            raise error.TestFail("Can't see messages of suspending vm")
        # status command should return 3.
        if ret.exit_status != 3:
            raise error.TestFail("The exit code %s for libvirt-guests"
                                 " status is not correct" % ret)

        # Wait for VM in shut off state
        utils_misc.wait_for(lambda: vm.state() == "shut off", 10)
        virsh_cmd = "service libvirt-guests start"
        check_flags_parallel(virsh_cmd, bash_cmd %
                             (managed_save_file, managed_save_file,
                              "0", flags), flags)
开发者ID:Antique,项目名称:tp-libvirt,代码行数:27,代码来源:virsh_managedsave.py


示例20: add_device

    def add_device(pci_num):
        reference_cmd = params["reference_cmd"]
        find_pci_cmd = params["find_pci_cmd"]
        info_pci_ref = vm.monitor.info("pci")
        reference = session.cmd_output(reference_cmd)

        try:
            # get function for adding device.
            add_fuction = local_functions["%s_iov" % cmd_type]
        except Exception:
            raise error.TestError(
                "No function for adding sr-iov dev with '%s'" %
                cmd_type)
        after_add = None
        if add_fuction:
            # Do add pci device.
            after_add = add_fuction(pci_num)

        try:
            # Define a helper function to compare the output
            def _new_shown():
                o = session.cmd_output(reference_cmd)
                return o != reference

            # Define a helper function to catch PCI device string
            def _find_pci():
                o = session.cmd_output(find_pci_cmd)
                if re.search(match_string, o, re.IGNORECASE):
                    return True
                else:
                    return False

            error.context("Start checking new added device")
            # Compare the output of 'info pci'
            if after_add == info_pci_ref:
                raise error.TestFail("No new PCI device shown after executing "
                                     "monitor command: 'info pci'")

            secs = int(params["wait_secs_for_hook_up"])
            if not utils_misc.wait_for(_new_shown, test_timeout, secs, 3):
                raise error.TestFail("No new device shown in output of command "
                                     "executed inside the guest: %s" %
                                     reference_cmd)

            if not utils_misc.wait_for(_find_pci, test_timeout, 3, 3):
                raise error.TestFail("New add sr-iov device not found in guest. "
                                     "Command was: %s" % find_pci_cmd)

            # Test the newly added device
            try:
                session.cmd(params["pci_test_cmd"] % (pci_num + 1))
            except aexpect.ShellError, e:
                raise error.TestFail("Check for sr-iov device failed after PCI "
                                     "hotplug. Output: %r" % e.output)

        except Exception:
            pci_del(pci_num, ignore_failure=True)
            raise
开发者ID:FT4VT,项目名称:FT4VM-L1_test,代码行数:58,代码来源:sr_iov_hotplug.py



注:本文中的virttest.utils_misc.wait_for函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils_net.generate_mac_address_simple函数代码示例发布时间:2022-05-26
下一篇:
Python utils_misc.umount函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap