本文整理汇总了Python中virttest.utils_misc.get_qemu_img_binary函数的典型用法代码示例。如果您正苦于以下问题:Python get_qemu_img_binary函数的具体用法?Python get_qemu_img_binary怎么用?Python get_qemu_img_binary使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_qemu_img_binary函数的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run_test
def run_test(qemu_src_dir):
    """
    Run the QEMU I/O test suite from a QEMU source tree.

    The binaries under test are exported through the QEMU_* environment
    variables, the working directory is changed to the iotests directory
    and ``./check`` is run for the configured image format.  If the
    output matches ``iotests_result_pattern`` it is written to a log file
    under the test debug directory and the test fails.

    :param qemu_src_dir: path of qemu source code
    :raise exceptions.TestFail: when the result pattern matches the output
    """
    tests_subdir = params.get("iotests_root", "tests/qemu-iotests")
    check_options = params.get("qemu_io_extra_options", "")
    image_format = params.get("qemu_io_image_format")
    failure_regex = params.get("iotests_result_pattern")
    error_context.context("running qemu-iotests for image format %s"
                          % image_format, logging.info)

    # Tell the suite which binaries to exercise.
    os.environ["QEMU_PROG"] = utils_misc.get_qemu_binary(params)
    os.environ["QEMU_IMG_PROG"] = utils_misc.get_qemu_img_binary(params)
    os.environ["QEMU_IO_PROG"] = utils_misc.get_qemu_io_binary(params)
    os.environ["QEMU_NBD_PROG"] = utils_misc.get_binary('qemu-nbd', params)
    os.chdir(os.path.join(qemu_src_dir, tests_subdir))

    # Assemble "./check [extra options] -<format>".
    cmd_parts = ['./check']
    if check_options:
        cmd_parts.append("%s" % check_options)
    cmd_parts.append("-%s" % image_format)
    output = process.system_output(" ".join(cmd_parts), ignore_status=True,
                                   shell=True)

    match = re.search(failure_regex, output, re.I | re.M)
    if not match:
        return

    # Keep the complete output next to the other debug artifacts.
    log_name = "qemu_iotests_%s.log" % image_format
    log_path = utils_misc.get_path(test.debugdir, log_name)
    with open(log_path, 'w+') as log:
        log.write(output)
        log.flush()
    raise exceptions.TestFail("Total test %s cases, %s failed"
                              % (match.group(2), match.group(1)))
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:31,代码来源:rh_qemu_iotests.py
示例2: get_image_size
def get_image_size(self, image_file):
    """
    Return the byte count that ``qemu-img info`` prints for an image.

    :param image_file: image passed to ``qemu-img info``
    :return: the first "<N> bytes" figure in the command output as an
             int, or 0 when the output contains no such figure
    """
    qemu_img = utils_misc.get_qemu_img_binary(self.params)
    cmd = "%s info %s" % (qemu_img, image_file)
    info = utils.system_output(cmd)
    # Raw string: "\d" inside a plain literal is an invalid escape sequence.
    match = re.search(r"(\d+) bytes", info)
    if match:
        return int(match.group(1))
    return 0
开发者ID:LeiCui,项目名称:virt-test,代码行数:8,代码来源:block_stream.py
示例3: __init__
def __init__(self, params, root_dir, tag):
    """
    Initialise the image object and record the qemu-img help output.

    :param params: Dictionary containing the test parameters.
    :param root_dir: Base directory for relative filenames.
    :param tag: Image tag defined in parameter images
    """
    storage.QemuImg.__init__(self, params, root_dir, tag)
    self.image_cmd = utils_misc.get_qemu_img_binary(params)

    # Capture the usage text of the binary (run with ignore_status=True).
    help_cmd = self.image_cmd + ' -h'
    help_result = process.run(help_cmd, ignore_status=True, shell=True,
                              verbose=False)
    self.help_text = results_stdout_52lts(help_result)

    # The '-U' option showing up in the help text is used as the
    # indicator for the force-share capability.
    self.cap_force_share = '-U' in self.help_text
开发者ID:jcfaracco,项目名称:avocado-vt,代码行数:14,代码来源:qemu_storage.py
示例4: get_backingfile
def get_backingfile(self, method="monitor"):
    """
    Return the backing file of the device, or None if there is none.

    :param method: "monitor" asks the VM monitor; any other value parses
                   the output of ``qemu-img info`` for the image file.
    :return: the backing file reported by the chosen method; None when
             the ``qemu-img info`` output has no "backing file:" line
    """
    if method == "monitor":
        return self.vm.monitor.get_backingfile(self.device)

    qemu_img = utils_misc.get_qemu_img_binary(self.params)
    cmd = "%s info %s " % (qemu_img, self.get_image_file())
    info = utils.system_output(cmd)
    matched = re.search(r"backing file: +(.*)", info, re.M)
    if matched is None:
        # logging.warn is a deprecated alias; pass args lazily as well.
        logging.warning("No backingfile found, cmd output: %s", info)
        return None
    return matched.group(1)
开发者ID:Chenditang,项目名称:tp-qemu,代码行数:15,代码来源:block_copy.py
示例5: __init__
def __init__(self, test, params):
    """
    Populate the shared configuration used by the qemu-io test.

    :param test: test object; its ``tmpdir`` is where the raw files live
    :param params: Dictionary with the test parameters.
    """
    # Every instance uses the same attribute dictionary.
    self.__dict__ = self.__shared_state
    self.tmpdir = test.tmpdir
    self.qemu_img_binary = utils_misc.get_qemu_img_binary(params)
    self.raw_files = ["stg1.raw", "stg2.raw"]
    self.raw_files = [os.path.join(self.tmpdir, name)
                      for name in self.raw_files]

    # Fairly explanatory names are chosen on purpose so a clash with
    # other devices in the system is less likely.
    self.vgtest_name = params.get("vgtest_name", "vg_kvm_test_qemu_io")
    self.lvtest_name = params.get("lvtest_name", "lv_kvm_test_qemu_io")
    self.lvtest_device = "/dev/%s/%s" % (self.vgtest_name,
                                         self.lvtest_name)

    # Only create the loopback list if the shared state has none yet.
    try:
        self.loopback
    except AttributeError:
        self.loopback = []
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:17,代码来源:qemu_io.py
示例6: get_backingfile
def get_backingfile(self, method="monitor"):
    """
    Look up the backing file of the device.

    :param method: "monitor" queries the VM monitor; anything else parses
                   ``qemu-img info`` output for the image file.
    :return: absolute path of the backing file, or the falsy lookup
             result unchanged when no backing file was found
    """
    if method == "monitor":
        found = self.vm.monitor.get_backingfile(self.device)
    else:
        qemu_img = utils_misc.get_qemu_img_binary(self.params)
        info_cmd = "%s info %s " % (qemu_img, self.get_image_file())
        info = utils.system_output(info_cmd)
        hit = re.search(r"backing file: +(.*)", info, re.M)
        found = hit.group(1) if hit else None

    if not found:
        return found
    return os.path.abspath(found)
开发者ID:sibiaoluo,项目名称:tp-qemu,代码行数:18,代码来源:block_copy.py
示例7: clean
def clean(self):
    """
    Clean up after the drive-mirror test.

    After the parent cleanup, an iscsi target image is re-created with
    ``qemu-img create`` and the iscsi device is cleaned up; for nfs the
    nfs object is cleaned up.  Other image types need nothing extra.
    """
    super(DriveMirror, self).clean()
    params = self.parser_test_args()
    image_type = params.get("image_type")

    if image_type == "iscsi":
        params["host_setup_flag"] = int(params["host_setup_flag"])
        qemu_img = utils_misc.get_qemu_img_binary(self.params)
        # Reformat the target so later tests are not affected by it.
        create_args = (qemu_img, params["image_format"],
                       self.target_image, params["image_size"])
        process.system("%s create -f %s %s %s" % create_args)
        iscsi_image = qemu_storage.Iscsidev(params, self.data_dir,
                                            params["target_image"])
        iscsi_image.cleanup()
        return

    if image_type == "nfs":
        nfs_image = nfs.Nfs(params)
        nfs_image.cleanup()
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:18,代码来源:drive_mirror.py
示例8: __init__
def __init__(self, test, params, env, tag):
    """
    Set up the default attributes of a live backup object.

    :param test: Kvm test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    :param tag: Image tag defined in parameter images
    """
    super(LiveBackup, self).__init__(test, params, env, tag)
    cfg = self.params

    self.image_chain = cfg.get("image_chain").split()
    self.image_cmd = utils_misc.get_qemu_img_binary(params)
    self.source_image = cfg.get("source_image")
    self.speed = int(cfg.get("speed", 0))

    # Fixed starting values for the bitmap name and backup counter.
    self.bitmap_name = "bitmap0"
    self.backup_index = 1
    self.backup_format = cfg.get("backup_format")

    self.generate_backup_params()
开发者ID:suqinhuang,项目名称:tp-qemu,代码行数:18,代码来源:live_backup_base.py
示例9: __init__
def __init__(self, test, params):
    """
    Populate the shared configuration used by the ENOSPC test.

    :param test: test object; its ``tmpdir`` holds the raw backing file
    :param params: Dictionary with the test parameters; must contain
                   "vgtest_name", "lvtest_name" and "image_name".
    """
    # Every instance uses the same attribute dictionary.
    self.__dict__ = self.__shared_state
    self.tmpdir = test.tmpdir
    # Use the constructor argument: nothing in this method assigns
    # self.params, so reading it here depended on earlier shared state.
    self.qemu_img_binary = utils_misc.get_qemu_img_binary(params)
    self.raw_file_path = os.path.join(self.tmpdir, 'enospc.raw')
    # Here we're trying to choose fairly explanatory names so it's less
    # likely that we run in conflict with other devices in the system
    self.vgtest_name = params["vgtest_name"]
    self.lvtest_name = params["lvtest_name"]
    self.lvtest_device = "/dev/%s/%s" % (
        self.vgtest_name, self.lvtest_name)
    image_dir = os.path.join(data_dir.get_data_dir(),
                             os.path.dirname(params["image_name"]))
    self.qcow_file_path = os.path.join(image_dir, 'enospc.qcow2')
    # Only set a default loopback if the shared state has none yet.
    try:
        getattr(self, 'loopback')
    except AttributeError:
        self.loopback = ''
开发者ID:arges,项目名称:tp-qemu,代码行数:19,代码来源:enospc.py
示例10: get_image_size
def get_image_size(self, image_file):
    """
    Return the size of an image, preferring ``qemu-img info``.

    If the qemu-img command fails with ``process.CmdError`` the size is
    taken from the monitor's block information instead ("virtual-size"
    of the entry whose file equals ``image_file``).

    :param image_file: image file to look up
    :return: the size found, or 0 when neither source provides one
    """
    # Start from a defined value so every path below can return safely.
    size = 0
    try:
        qemu_img = utils_misc.get_qemu_img_binary(self.params)
        cmd = "%s info %s" % (qemu_img, image_file)
        logging.info("Try to get image size via qemu-img info")
        info = process.system_output(cmd)
        sizes = re.findall(r"(\d+) bytes", info)
        if sizes:
            size = int(sizes[0])
    except process.CmdError:
        logging.info("qemu-img info failed(it happens because later qemu"
                     " distributions prevent it access a running image.)."
                     " Now get image size via qmp interface 'query-block'")
        for block in self.vm.monitor.info("block"):
            # Skip entries that carry no "inserted" information.
            inserted = block.get("inserted")
            if inserted and image_file == inserted.get("file"):
                size = inserted["image"]["virtual-size"]
    return size or 0
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:19,代码来源:block_stream.py
示例11: backup_img_chain
def backup_img_chain(image_file):
    """
    Backup whole image in a image chain;

    Copies ``image_file`` into ``test.resultsdir`` and then recurses into
    its backing file, as reported by ``_info``.  With gluster enabled the
    image is read through a glusterfs mount on a temporary directory.

    :param image_file: image to back up
    :return: None
    """
    mount_point = tempfile.mkdtemp(dir=test.resultsdir)
    qemu_img = utils_misc.get_qemu_img_binary(params)
    if enable_gluster:
        g_uri = gluster.create_gluster_uri(params)
        gluster.glusterfs_mount(g_uri, mount_point)
        image_name = os.path.basename(image_file)
        image_file = os.path.join(mount_point, image_name)
    # logging.warn is a deprecated alias; pass the arguments lazily too.
    logging.warning("backup %s to %s", image_file, test.resultsdir)
    shutil.copy(image_file, test.resultsdir)
    backing_file = _info(qemu_img, image_file, "backing file", None)
    if backing_file:
        backup_img_chain(backing_file)
    elif enable_gluster:
        # NOTE(review): the mount is only released (and the directory
        # removed) at the end of the chain; mount points created for
        # images that have a backing file are left behind - confirm
        # whether that is intended.
        utils_misc.umount(g_uri, mount_point, "glusterfs", False,
                          "fuse.glusterfs")
        shutil.rmtree(mount_point)
    return None
开发者ID:uni-peter-zheng,项目名称:tp-qemu,代码行数:20,代码来源:qemu_img.py
示例12: run_qemu_iotests
def run_qemu_iotests(test, params, env):
    """
    Fetch from git and run qemu-iotests using the qemu binaries under test.

    1) Fetch qemu-io from git
    2) Run test for the file format detected
    3) Report any errors found to autotest

    :param test: QEMU test object.
    :param params: Dictionary with the test parameters.
    :param env: Dictionary with test environment.
    """
    # First, let's get qemu-io
    std = "http://git.kernel.org/pub/scm/virt/kvm/qemu-kvm.git"
    uri = params.get("qemu_io_uri", std)
    branch = params.get("qemu_io_branch", 'master')
    lbranch = params.get("qemu_io_lbranch", 'master')
    commit = params.get("qemu_io_commit", None)
    base_uri = params.get("qemu_io_base_uri", None)
    iotests_dir = params.get("qemu_iotests_dir", "tests/qemu-iotests")
    destination_dir = os.path.join(test.srcdir, "qemu_io_tests")
    git.get_repo(uri=uri, branch=branch, lbranch=lbranch, commit=commit,
                 destination_dir=destination_dir, base_uri=base_uri)

    # Then, set the qemu paths for the use of the testsuite
    os.environ["QEMU_PROG"] = utils_misc.get_qemu_binary(params)
    os.environ["QEMU_IMG_PROG"] = utils_misc.get_qemu_img_binary(params)
    os.environ["QEMU_IO_PROG"] = utils_misc.get_qemu_io_binary(params)

    # qemu-iotests has merged into tests/qemu_iotests folder
    os.chdir(os.path.join(destination_dir, iotests_dir))
    image_format = params["qemu_io_image_format"]
    extra_options = params.get("qemu_io_extra_options", "")

    cmd = './check'
    if extra_options:
        # Separate the options from the script name; plain concatenation
        # would yield "./check<options>", which is not a valid command.
        cmd += " %s" % extra_options

    error.context("running qemu-iotests for image format %s" % image_format)
    utils.system("%s -%s" % (cmd, image_format))
开发者ID:Antique,项目名称:virt-test,代码行数:40,代码来源:qemu_iotests.py
示例13: run_qemu_img
def run_qemu_img(test, params, env):
    """
    'qemu-img' functions test:
    1) Judge what subcommand is going to be tested
    2) Run subcommand test

    @param test: QEMU test object
    @param params: Dictionary with the test parameters
    @param env: Dictionary with test environment.
    @raise error.TestError: if the qemu-img binary path does not exist
    """
    qemu_img_binary = utils_misc.get_qemu_img_binary(params)
    cmd = qemu_img_binary
    if not os.path.exists(cmd):
        raise error.TestError("Binary of 'qemu-img' not found")
    # NOTE(review): the three values below are not used in the visible
    # part of this function; presumably later sub-tests rely on them.
    image_format = params["image_format"]
    image_size = params.get("image_size", "10G")
    image_name = storage.get_image_filename(params, data_dir.get_data_dir())

    def _check(cmd, img):
        """
        Simple 'qemu-img check' function implementation.

        @param cmd: qemu-img base command.
        @param img: image to be checked
        @return: tuple (ok, text): (True, output) on success, (True, "")
                 when the error says the format does not support checks,
                 (False, error text) for any other command error
        """
        cmd += " check %s" % img
        error.context("Checking image '%s' by command '%s'" % (img, cmd),
                      logging.info)
        try:
            output = utils.system_output(cmd, verbose=False)
        # "except X as err" is accepted by Python 2.6+ and Python 3,
        # unlike the comma form.
        except error.CmdError as err:
            if "does not support checks" in str(err):
                return (True, "")
            return (False, str(err))
        return (True, output)
开发者ID:LeiCui,项目名称:virt-test,代码行数:37,代码来源:qemu_img.py
示例14: run_boot_savevm
def run_boot_savevm(test, params, env):
    """
    libvirt boot savevm test:

    1) Start guest booting
    2) Record origin informations of snapshot list for floppy(optional).
    3) Periodically savevm/loadvm while guest booting
    4) Stop test when able to login, or fail after timeout seconds.
    5) Check snapshot list for floppy and compare with the origin
       one(optional).

    @param test: test object
    @param params: Dictionary with the test parameters
    @param env: Dictionary with test environment.
    @raise error.TestFail: if no login succeeds before the timeout, or if
            the floppy snapshot list is unchanged after the test
    """
    vm = env.get_vm(params["main_vm"])
    with_floppy = params.get("with_floppy") == "yes"
    if with_floppy:
        floppy_name = params.get("floppies", "fl")
        floppy_params = {"image_format": params.get("floppy_format", "qcow2"),
                         "image_size": params.get("floppy_size", "1.4M"),
                         "image_name": params.get("%s_name" % floppy_name,
                                                  "images/test"),
                         "vm_type": params.get("vm_type"),
                         "qemu_img_binary": utils_misc.get_qemu_img_binary(params)}
        floppy = qemu_storage.QemuImg(floppy_params,
                                      data_dir.get_data_dir(), floppy_name)
        floppy.create(floppy_params)
        floppy_orig_info = floppy.snapshot_list()
        vm.create(params=params)

    vm.verify_alive()  # This shouldn't require logging in to guest
    savevm_delay = float(params["savevm_delay"])
    savevm_login_delay = float(params["savevm_login_delay"])
    savevm_login_timeout = float(params["savevm_timeout"])
    savevm_statedir = params.get("savevm_statedir", tempfile.gettempdir())
    fd, savevm_statefile = tempfile.mkstemp(suffix='.img', prefix=vm.name+'-',
                                            dir=savevm_statedir)
    os.close(fd)  # save_to_file doesn't need the file open

    start_time = time.time()
    cycles = 0  # number of save/restore cycles started so far
    successful_login = False
    while (time.time() - start_time) < savevm_login_timeout:
        cycles += 1
        logging.info("Save/Restore cycle %d", cycles)
        time.sleep(savevm_delay)
        vm.pause()
        if params['save_method'] == 'save_to_file':
            vm.save_to_file(savevm_statefile)  # Re-use same filename
            vm.restore_from_file(savevm_statefile)
        else:
            vm.savevm("1")
            vm.loadvm("1")
        vm.resume()  # doesn't matter if already running or not
        vm.verify_kernel_crash()  # just in case
        try:
            vm.wait_for_login(timeout=savevm_login_delay)
        except Exception:
            # Best effort: keep cycling until a login succeeds or time
            # runs out.  Unlike a bare "except:", this lets
            # KeyboardInterrupt and SystemExit through.
            continue
        successful_login = True
        try:
            os.unlink(savevm_statefile)  # don't let these clutter disk
        except OSError as details:
            logging.warning("Could not remove %s: %s", savevm_statefile,
                            details)
        break

    time_elapsed = int(time.time() - start_time)
    # 'cycles' already counts every cycle started, so no "+ 1" here (the
    # old count was one too high when the loop ended by timeout).
    info = "after %s s, %d load/save cycles" % (time_elapsed, cycles)
    if not successful_login:
        raise error.TestFail("Can't log on '%s' %s" % (vm.name, info))
    else:
        logging.info("Test ended %s", info)

    if with_floppy:
        vm.destroy()
        floppy_info = floppy.snapshot_list()
        # An unchanged list means savevm stored nothing in the floppy.
        if floppy_info == floppy_orig_info:
            raise error.TestFail("savevm didn't create snapshot in floppy."
                                 " original snapshot list is: %s"
                                 " now snapshot list is: %s"
                                 % (floppy_orig_info, floppy_info))
开发者ID:LeiCui,项目名称:virt-test,代码行数:78,代码来源:boot_savevm.py
示例15: run
def run(test, params, env):
    """
    'qemu-img' functions test:
    1) Judge what subcommand is going to be tested
    2) Run subcommand test

    :param test: QEMU test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    :raise error.TestError: if the qemu-img binary path does not exist
    """
    qemu_img_binary = utils_misc.get_qemu_img_binary(params)
    cmd = qemu_img_binary
    if not os.path.exists(cmd):
        raise error.TestError("Binary of 'qemu-img' not found")
    # NOTE(review): the values below are not used in the visible part of
    # this function; presumably later sub-tests rely on them.
    image_format = params["image_format"]
    image_size = params.get("image_size", "10G")
    enable_gluster = params.get("enable_gluster", "no") == "yes"
    image_name = storage.get_image_filename(params, data_dir.get_data_dir())

    def remove(path):
        """
        Delete a file, ignoring any OSError raised while doing so.

        :param path: file to delete
        """
        try:
            os.remove(path)
        except OSError:
            pass

    def _get_image_filename(img_name, enable_gluster=False, img_fmt=None):
        """
        Generate an image path.

        :param img_name: Force name of image.
        :param enable_gluster: Enable gluster or not.
        :param img_fmt: Format for image; appended as ".<fmt>" when set.
        :return: gluster URI followed by the name when gluster is
                 enabled, otherwise a path under the data directory
        """
        if enable_gluster:
            gluster_uri = gluster.create_gluster_uri(params)
            image_filename = "%s%s" % (gluster_uri, img_name)
            if img_fmt:
                image_filename += ".%s" % img_fmt
        else:
            if img_fmt:
                img_name = "%s.%s" % (img_name, img_fmt)
            image_filename = utils_misc.get_path(data_dir.get_data_dir(),
                                                 img_name)
        return image_filename

    def _check(cmd, img):
        """
        Simple 'qemu-img check' function implementation.

        :param cmd: qemu-img base command.
        :param img: image to be checked
        :return: tuple (ok, text): (True, output) on success, (True, "")
                 when the error says the format does not support checks,
                 (False, error text) for any other command error
        """
        cmd += " check %s" % img
        error.context("Checking image '%s' by command '%s'" % (img, cmd),
                      logging.info)
        try:
            output = utils.system_output(cmd, verbose=False)
        # "except X as err" is accepted by Python 2.6+ and Python 3,
        # unlike the comma form.
        except error.CmdError as err:
            if "does not support checks" in str(err):
                return (True, "")
            return (False, str(err))
        return (True, output)
开发者ID:gogoxiaoxiao,项目名称:tp-qemu,代码行数:63,代码来源:qemu_img.py
示例16: __init__
def __init__(self, test, params, vm):
"""
Sets class attributes from test parameters.
:param test: QEMU test object.
:param params: Dictionary with test parameters.
"""
root_dir = data_dir.get_data_dir()
self.deps_dir = os.path.join(test.virtdir, 'deps')
self.unattended_dir = os.path.join(test.virtdir, 'unattended')
self.results_dir = test.debugdir
self.params = params
self.attributes = ['kernel_args', 'finish_program', 'cdrom_cd1',
'unattended_file', 'medium', 'url', 'kernel',
'initrd', 'nfs_server', 'nfs_dir', 'install_virtio',
'floppy_name', 'cdrom_unattended', 'boot_path',
'kernel_params', 'extra_params', 'qemu_img_binary',
'cdkey', 'finish_program', 'vm_type',
'process_check', 'vfd_size', 'cdrom_mount_point',
'floppy_mount_point', 'cdrom_virtio',
'virtio_floppy', 're_driver_match',
're_hardware_id', 'driver_in_floppy']
for a in self.attributes:
setattr(self, a, params.get(a, ''))
# Will setup the virtio attributes
v_attributes = ['virtio_floppy', 'virtio_scsi_path', 'virtio_storage_path',
'virtio_network_path', 'virtio_oemsetup_id',
'virtio_network_installer_path',
'virtio_balloon_installer_path',
'virtio_qxl_installer_path']
for va in v_attributes:
setattr(self, va, params.get(va, ''))
self.tmpdir = test.tmpdir
self.qemu_img_binary = utils_misc.get_qemu_img_binary(params)
if getattr(self, 'unattended_file'):
self.unattended_file = os.path.join(test.virtdir,
self.unattended_file)
if getattr(self, 'finish_program'):
self.finish_program = os.path.join(test.virtdir,
self.finish_program)
if getattr(self, 'cdrom_cd1'):
self.cdrom_cd1 = os.path.join(root_dir, self.cdrom_cd1)
self.cdrom_cd1_mount = tempfile.mkdtemp(prefix='cdrom_cd1_',
dir=self.tmpdir)
if getattr(self, 'cdrom_unattended'):
self.cdrom_unattended = os.path.join(root_dir,
self.cdrom_unattended)
if getattr(self, 'virtio_floppy'):
self.virtio_floppy = os.path.join(root_dir, self.virtio_floppy)
if getattr(self, 'cdrom_virtio'):
self.cdrom_virtio = os.path.join(root_dir, self.cdrom_virtio)
if getattr(self, 'kernel'):
self.kernel = os.path.join(root_dir, self.kernel)
if getattr(self, 'initrd'):
self.initrd = os.path.join(root_dir, self.initrd)
if self.medium == 'nfs':
self.nfs_mount = tempfile.mkdtemp(prefix='nfs_',
dir=self.tmpdir)
setattr(self, 'floppy', self.floppy_name)
if getattr(self, 'floppy'):
self.floppy = os.path.join(root_dir, self.floppy)
if not os.path.isdir(os.path.dirname(self.floppy)):
os.makedirs(os.path.dirname(self.floppy))
self.image_path = os.path.dirname(self.kernel)
# Content server params
# lookup host ip address for first nic by interface name
try:
auto_ip = utils_net.get_ip_address_by_interface(
vm.virtnet[0].netdst)
except utils_net.NetError:
auto_ip = None
self.url_auto_content_ip = params.get('url_auto_ip', auto_ip)
self.url_auto_content_port = None
# Kickstart server params
# use the same IP as url_auto_content_ip, but a different port
self.unattended_server_port = None
# Embedded Syslog Server
self.syslog_server_enabled = params.get('syslog_server_enabled', 'no')
self.syslog_server_ip = params.get('syslog_server_ip', auto_ip)
self.syslog_server_port = int(params.get('syslog_server_port', 5140))
self.syslog_server_tcp = params.get('syslog_server_proto',
'tcp') == 'tcp'
#.........这里部分代码省略.........
开发者ID:aginies,项目名称:virt-test,代码行数:101,代码来源:unattended_install.py
示例17: run
def run(test, params, env):
    """
    block_stream_without_backingfile test:

    1). bootup guest
    2). create snapshots chain(base->sn1->sn2), verify backingfile should sn1
    3). merge sn1 to sn2 (sn1->sn2) aka block stream with special base, after
        job done, then check backingfile is base and sn1 not opening by qemu
    4). merge base to sn2(base->sn2) after this step sn2 should no backingfile
        and sn1 and base should not opening by qemu
    5). reboot guest verify it works correctly
    6). verify not backingfile with qemu-img command too;

    :param test: Qemu test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    :raise error.TestFail: when any of the verification steps fails
    """
    vm = env.get_vm(params["main_vm"])
    vm.verify_alive()
    timeout = int(params.get("login_timeout", 360))
    session = vm.wait_for_login(timeout=timeout)
    alive_check_cmd = params.get("alive_check_cmd", "dir")
    image_file = storage.get_image_filename(params, data_dir.get_data_dir())
    image_dir = os.path.dirname(image_file)
    qemu_img = utils_misc.get_qemu_img_binary(params)
    speed = int(params.get("limited_speed", 0))
    wait_timeout = int(params.get("wait_timeout", 3600))

    def wait_job_done(timeout=3600):
        """
        Wait for job on the device done, raise TestFail exception if timeout;
        """
        if utils_misc.wait_for(lambda:
                               not vm.monitor.query_block_job(device_id),
                               timeout, first=0.2, step=2.0,
                               text="Wait for canceling block job") is None:
            raise error.TestFail("Wait job finish timeout in %ss" % timeout)

    def verify_backingfile(expect_backingfile):
        """
        Got backingfile from monitor then verify it with expect_backingfile,
        if not raise TestFail exception;
        """
        backing_file = vm.monitor.get_backingfile(device_id)
        if backing_file != expect_backingfile:
            raise error.TestFail("Unexpect backingfile(%s)" % backing_file)

    def get_openingfiles():
        """
        Return files which opening by qemu process;
        """
        pid = vm.get_pid()
        cmd = params.get("snapshot_check_cmd") % pid
        return set(utils.system_output(cmd, ignore_status=True).splitlines())

    # A real list: it is indexed and iterated several times below, which
    # a lazily evaluated map() result would not support.
    snapshots = [os.path.join(image_dir, name) for name in ("sn1", "sn2")]
    try:
        error.context("Create snapshots-chain(base->sn1->sn2)", logging.info)
        for index, snapshot in enumerate(snapshots):
            # The first snapshot sits on the image, later ones on the
            # previous snapshot.
            base_file = snapshots[index - 1] if index else image_file
            device_id = vm.live_snapshot(base_file, snapshot)
            if not device_id:
                raise error.TestFail("Fail to create %s" % snapshot)
        error.context("Check backing-file of sn2", logging.info)
        verify_backingfile(snapshots[0])

        error.context("Merge sn1 to sn2", logging.info)
        vm.monitor.block_stream(device_id, base=image_file, speed=speed)
        wait_job_done(wait_timeout)
        error.context("Check backing-file of sn2", logging.info)
        verify_backingfile(image_file)
        error.context("Check sn1 is not opening by qemu process",
                      logging.info)
        if snapshots[0] in get_openingfiles():
            raise error.TestFail("sn1 (%s) is opening by qemu" % snapshots[0])

        error.context("Merge base to sn2", logging.info)
        vm.monitor.block_stream(device_id)
        wait_job_done(wait_timeout)
        error.context("Check backing-file of sn2", logging.info)
        verify_backingfile(None)
        error.context("check sn1 and base are not opening by qemu process",
                      logging.info)
        # Fail if either file is still open; a subset test would only
        # trigger when both of them are.
        still_open = set([snapshots[0], image_file]) & get_openingfiles()
        if still_open:
            raise error.TestFail("%s is opening by qemu" % still_open)

        error.context("Check backing-file of sn2 by qemu-img", logging.info)
        cmd = "%s info %s" % (qemu_img, snapshots[1])
        if re.search("backing file",
                     utils.system_output(cmd, ignore_status=True)):
            raise error.TestFail("should no backing-file in this step")

        error.context("Reboot VM to check it works fine", logging.info)
        session = vm.reboot(session=session, timeout=timeout)
        session.cmd(alive_check_cmd)
    finally:
        # Explicit loop: the removal must run for its side effect.
        for snapshot in snapshots:
            utils.system("rm -rf %s" % snapshot)
开发者ID:CongLi,项目名称:tp-qemu,代码行数:96,代码来源:block_stream_drop_backingfile.py
注:本文中的virttest.utils_misc.get_qemu_img_binary函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论