本文整理汇总了Python中virttest.utils_misc.get_path函数的典型用法代码示例。如果您正苦于以下问题:Python get_path函数的具体用法?Python get_path怎么用?Python get_path使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_path函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: start
def start(self):
"""
start block device committing job;
"""
base_image = self.params["base_image"]
base_image = utils_misc.get_path(self.data_dir, base_image)
top_image = self.params["top_image"]
top_image = utils_misc.get_path(self.data_dir, top_image)
default_speed = self.params.get("default_speed")
backing_file = self.params.get("backing_file", None)
if backing_file is not None:
backing_file = utils_misc.get_path(self.data_dir, backing_file)
if self.vm.monitor.protocol == "qmp":
self.vm.monitor.clear_event("BLOCK_JOB_READY")
self.vm.monitor.clear_event("BLOCK_JOB_COMPLETED")
else:
self.test.cancel("hmp command is not supportable.")
logging.info("start to commit block device")
self.vm.block_commit(self.device, default_speed, base_image, top_image,
backing_file)
status = self.get_status()
if not status:
self.test.fail("no active job found")
logging.info("block commit job running, with limited speed {0} B/s".format(default_speed))
开发者ID:Zhengtong,项目名称:tp-qemu,代码行数:25,代码来源:blk_commit.py
示例2: create_iso_image
def create_iso_image(params, name, prepare=True, file_size=None):
"""
Creates 'new' iso image with one file on it
:param params: parameters for test
:param name: name of new iso image file
:param preapre: if True then it prepare cd images.
:param file_size: Size of iso image in MB
:return: path to new iso image file.
"""
error.context("Creating test iso image '%s'" % name, logging.info)
cdrom_cd = params["target_cdrom"]
cdrom_cd = params[cdrom_cd]
if not os.path.isabs(cdrom_cd):
cdrom_cd = utils_misc.get_path(data_dir.get_data_dir(), cdrom_cd)
iso_image_dir = os.path.dirname(cdrom_cd)
if file_size is None:
file_size = 10
file_name = utils_misc.get_path(iso_image_dir, "%s.iso" % (name))
if prepare:
cmd = "dd if=/dev/urandom of=%s bs=1M count=%d"
utils.run(cmd % (name, file_size))
utils.run("mkisofs -o %s %s" % (file_name, name))
utils.run("rm -rf %s" % (name))
return file_name
开发者ID:QiuMike,项目名称:tp-qemu,代码行数:27,代码来源:cdrom.py
示例3: save_image
def save_image(self, params, filename, root_dir=None):
"""
Save images to a path for later debugging.
:param params: Dictionary containing the test parameters.
:param filename: new filename for saved images.
:param root_dir: directory for saved images.
"""
src = self.image_filename
if root_dir is None:
root_dir = os.path.dirname(src)
backup_func = self.copy_data_file
if params.get('image_raw_device') == 'yes':
ifmt = params.get("image_format", "qcow2")
src = utils_misc.get_path(root_dir, ("raw_device.%s" % ifmt))
backup_func = self.copy_data_raw
backup_size = 0
if os.path.isfile(src):
backup_size = os.path.getsize(src)
s = os.statvfs(root_dir)
image_dir_free_disk_size = s.f_bavail * s.f_bsize
logging.info("Checking disk size on %s.", root_dir)
if not self.is_disk_size_enough(backup_size,
image_dir_free_disk_size):
return
backup_func(src, utils_misc.get_path(root_dir, filename))
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:29,代码来源:storage.py
示例4: rebase_test
def rebase_test(cmd):
"""
Subcommand 'qemu-img rebase' test
Change the backing file of a snapshot image in "unsafe mode":
Assume the previous backing file had missed and we just have to change
reference of snapshot to new one. After change the backing file of a
snapshot image in unsafe mode, the snapshot should work still.
:param cmd: qemu-img base command.
"""
if not 'rebase' in utils.system_output(cmd + ' --help',
ignore_status=True):
raise error.TestNAError("Current kvm user space version does not"
" support 'rebase' subcommand")
sn_fmt = params.get("snapshot_format", "qcow2")
sn1 = params["image_name_snapshot1"]
sn1 = utils_misc.get_path(
data_dir.get_data_dir(), sn1) + ".%s" % sn_fmt
base_img = storage.get_image_filename(params, data_dir.get_data_dir())
_create(cmd, sn1, sn_fmt, base_img=base_img, base_img_fmt=image_format)
# Create snapshot2 based on snapshot1
sn2 = params["image_name_snapshot2"]
sn2 = utils_misc.get_path(
data_dir.get_data_dir(), sn2) + ".%s" % sn_fmt
_create(cmd, sn2, sn_fmt, base_img=sn1, base_img_fmt=sn_fmt)
rebase_mode = params.get("rebase_mode")
if rebase_mode == "unsafe":
os.remove(sn1)
_rebase(cmd, sn2, base_img, image_format, mode=rebase_mode)
# Boot snapshot image after rebase
img_name, img_format = sn2.split('.')
_boot(img_name, img_format)
# Check sn2's format and backing_file
actual_base_img = _info(cmd, sn2, "backing file")
base_img_name = os.path.basename(base_img)
if not base_img_name in actual_base_img:
raise error.TestFail("After rebase the backing_file of 'sn2' is "
"'%s' which is not expected as '%s'"
% (actual_base_img, base_img_name))
status, output = _check(cmd, sn2)
if not status:
raise error.TestFail("Check image '%s' failed after rebase;"
"got error: %s" % (sn2, output))
try:
os.remove(sn2)
os.remove(sn1)
except Exception:
pass
开发者ID:QiuMike,项目名称:tp-qemu,代码行数:53,代码来源:qemu_img.py
示例5: create_iso_image
def create_iso_image(params, name, prepare=True, file_size=None):
"""
Creates 'new' iso image with one file on it
:param params: parameters for test
:param name: name of new iso image file
:param preapre: if True then it prepare cd images.
:param file_size: Size of iso image in MB
:return: path to new iso image file.
"""
error_context.context("Creating test iso image '%s'" % name,
logging.info)
cdrom_cd = params["target_cdrom"]
cdrom_cd = params[cdrom_cd]
if not os.path.isabs(cdrom_cd):
cdrom_cd = utils_misc.get_path(data_dir.get_data_dir(), cdrom_cd)
iso_image_dir = os.path.dirname(cdrom_cd)
if file_size is None:
file_size = 10
g_mount_point = tempfile.mkdtemp("gluster")
image_params = params.object_params(name)
if image_params.get("enable_gluster") == "yes":
if params.get("gluster_server"):
gluster_server = params.get("gluster_server")
else:
gluster_server = "localhost"
volume_name = params["gluster_volume_name"]
g_mount_link = "%s:/%s" % (gluster_server, volume_name)
mount_cmd = "mount -t glusterfs %s %s" % (g_mount_link, g_mount_point)
process.system(mount_cmd, timeout=60)
file_name = os.path.join(g_mount_point, "%s.iso" % name)
else:
file_name = utils_misc.get_path(iso_image_dir, "%s.iso" % name)
if prepare:
cmd = "dd if=/dev/urandom of=%s bs=1M count=%d"
process.run(cmd % (name, file_size))
process.run("mkisofs -o %s %s" % (file_name, name))
process.run("rm -rf %s" % (name))
if image_params.get("enable_gluster") == "yes":
gluster_uri = gluster.create_gluster_uri(image_params)
file_name = "%s%s.iso" % (gluster_uri, name)
try:
umount_cmd = "umount %s" % g_mount_point
process.system(umount_cmd, timeout=60)
os.rmdir(g_mount_point)
except Exception as err:
msg = "Fail to clean up %s" % g_mount_point
msg += "Error message %s" % err
logging.warn(msg)
return file_name
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:51,代码来源:cdrom.py
示例6: check_test
def check_test(cmd):
"""
Subcommand 'qemu-img check' test.
This tests will 'dd' to create a specified size file, and check it.
Then convert it to supported image_format in each loop and check again.
@param cmd: qemu-img base command.
"""
test_image = utils_misc.get_path(test.bindir,
params.get("image_name_dd"))
print "test_image = %s" % test_image
create_image_cmd = params.get("create_image_cmd")
create_image_cmd = create_image_cmd % test_image
print "create_image_cmd = %s" % create_image_cmd
utils.system(create_image_cmd)
s, o = _check(cmd, test_image)
if not s:
raise error.TestFail("Check image '%s' failed with error: %s" %
(test_image, o))
for fmt in params.get("supported_image_formats").split():
output_image = test_image + ".%s" % fmt
_convert(cmd, fmt, test_image, output_image)
s, o = _check(cmd, output_image)
if not s:
raise error.TestFail("Check image '%s' got error: %s" %
(output_image, o))
os.remove(output_image)
os.remove(test_image)
开发者ID:kongove,项目名称:virt-test,代码行数:29,代码来源:qemu_img.py
示例7: copy_file_from_nfs
def copy_file_from_nfs(src, dst, mount_point, image_name):
logging.info("Test failed before the install process start."
" So just copy a good image from nfs for following tests.")
utils_misc.mount(src, mount_point, "nfs", perm="ro")
image_src = utils_misc.get_path(mount_point, image_name)
shutil.copy(image_src, dst)
utils_misc.umount(src, mount_point, "nfs")
开发者ID:aginies,项目名称:virt-test,代码行数:7,代码来源:unattended_install.py
示例8: check_tray_status_test
def check_tray_status_test(vm, qemu_cdrom_device, guest_cdrom_device,
max_times):
"""
Test cdrom tray status reporting function.
"""
error.context("Copy test script to guest")
tray_check_src = params.get("tray_check_src")
if tray_check_src:
tray_check_src = utils_misc.get_path(test.virtdir,
"deps/%s" % tray_check_src)
vm.copy_files_to(tray_check_src, "/tmp")
if is_tray_opened(vm, qemu_cdrom_device) is None:
logging.warn("Tray status reporting is not supported by qemu!")
logging.warn("cdrom_test_tray_status test is skipped...")
return
error.context("Eject the cdrom in guest %s times" % max_times,
logging.info)
session = vm.wait_for_login(timeout=login_timeout)
for i in range(1, max_times):
session.cmd('eject %s' % guest_cdrom_device)
if not is_tray_opened(vm, qemu_cdrom_device):
raise error.TestFail("Monitor reports tray closed"
" when ejecting (round %s)" % i)
session.cmd('dd if=%s of=/dev/null count=1' % guest_cdrom_device)
if is_tray_opened(vm, qemu_cdrom_device):
raise error.TestFail("Monitor reports tray opened when reading"
" cdrom in guest (round %s)" % i)
time.sleep(workaround_eject_time)
开发者ID:WenliQuan,项目名称:virt-test,代码行数:30,代码来源:cdrom.py
示例9: get_snapshot_file
def get_snapshot_file(self):
"""
Get path of snapshot file.
"""
image_format = self.params["image_format"]
snapshot_file = "images/%s.%s" % (self.snapshot_file, image_format)
return utils_misc.get_path(data_dir.get_data_dir(), snapshot_file)
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:7,代码来源:live_snapshot_basic.py
示例10: check_test
def check_test(cmd):
"""
Subcommand 'qemu-img check' test.
This tests will 'dd' to create a specified size file, and check it.
Then convert it to supported image_format in each loop and check again.
@param cmd: qemu-img base command.
"""
test_image = utils_misc.get_path(data_dir.get_data_dir(),
params["image_name_dd"])
create_image_cmd = params["create_image_cmd"]
create_image_cmd = create_image_cmd % test_image
msg = " Create image %s by command %s" % (test_image, create_image_cmd)
error.context(msg, logging.info)
utils.system(create_image_cmd, verbose=False)
status, output = _check(cmd, test_image)
if not status:
raise error.TestFail("Check image '%s' failed with error: %s" %
(test_image, output))
for fmt in params["supported_image_formats"].split():
output_image = test_image + ".%s" % fmt
_convert(cmd, fmt, test_image, output_image)
status, output = _check(cmd, output_image)
if not status:
raise error.TestFail("Check image '%s' got error: %s" %
(output_image, output))
os.remove(output_image)
os.remove(test_image)
开发者ID:LeiCui,项目名称:virt-test,代码行数:29,代码来源:qemu_img.py
示例11: run_qemu_img
def run_qemu_img(test, params, env):
"""
'qemu-img' functions test:
1) Judge what subcommand is going to be tested
2) Run subcommand test
@param test: kvm test object
@param params: Dictionary with the test parameters
@param env: Dictionary with test environment.
"""
cmd = utils_misc.get_path(test.bindir, params.get("qemu_img_binary"))
if not os.path.exists(cmd):
raise error.TestError("Binary of 'qemu-img' not found")
image_format = params.get("image_format")
image_size = params.get("image_size", "10G")
image_name = storage.get_image_filename(params, test.bindir)
def _check(cmd, img):
"""
Simple 'qemu-img check' function implementation.
@param cmd: qemu-img base command.
@param img: image to be checked
"""
cmd += " check %s" % img
logging.info("Checking image '%s'...", img)
try:
output = utils.system_output(cmd)
except error.CmdError, e:
if "does not support checks" in str(e):
return (True, "")
else:
return (False, str(e))
return (True, output)
开发者ID:kongove,项目名称:virt-test,代码行数:35,代码来源:qemu_img.py
示例12: get_image_info
def get_image_info(image_name):
"""
Get the details of the volume(image) from qemu-img info
"""
qemu_img = utils_misc.get_path(test.bindir,
params.get("qemu_img_binary"))
if not os.path.exists(qemu_img):
raise error.TestError("Binary of 'qemu-img' not found")
cmd = "%s info %s" % (qemu_img, image_name)
format_vol = utils.system_output(cmd)
reg1 = re.compile(r'image:\s+(\S+)')
reg2 = re.compile(r'file format:\s+(\S+)')
reg3 = re.compile(r'virtual size:\s+(\S+.*)')
reg4 = re.compile(r'disk size:\s+(\S+.*)')
image_info = {}
for line in format_vol.splitlines():
match1 = re.search(reg1, line)
match2 = re.search(reg2, line)
match3 = re.search(reg3, line)
match4 = re.search(reg4, line)
if match1 is not None:
image_info['name'] = match1.group(1)
if match2 is not None:
image_info['format'] = match2.group(1)
if match3 is not None:
image_info['capacity'] = match3.group(1)
if match4 is not None:
image_info['allocation'] = match4.group(1)
return image_info
开发者ID:LeiCui,项目名称:virt-test,代码行数:30,代码来源:virsh_volume.py
示例13: create_iso_image
def create_iso_image(params, name, prepare=True, file_size=None):
"""
Creates 'new' iso image with one file on it
:param params: parameters for test
:param name: name of new iso image file. It could be the full path
of cdrom.
:param preapre: if True then it prepare cd images.
:param file_size: Size of iso image in MB
:return: path to new iso image file.
"""
error.context("Creating test iso image '%s'" % name, logging.info)
if not os.path.isabs(name):
cdrom_path = utils_misc.get_path(data_dir.get_data_dir(), name)
else:
cdrom_path = name
if not cdrom_path.endswith(".iso"):
cdrom_path = "%s.iso" % cdrom_path
name = os.path.basename(cdrom_path)
if file_size is None:
file_size = 10
if prepare:
cmd = "dd if=/dev/urandom of=%s bs=1M count=%d"
utils.run(cmd % (name, file_size))
utils.run("mkisofs -o %s %s" % (cdrom_path, name))
utils.run("rm -rf %s" % (name))
return cdrom_path
开发者ID:Chenditang,项目名称:tp-qemu,代码行数:30,代码来源:cdrom_block_size_check.py
示例14: run_test
def run_test(qemu_src_dir):
"""
run QEMU I/O test suite
:qemu_src_dir: path of qemu source code
"""
iotests_root = params.get("iotests_root", "tests/qemu-iotests")
extra_options = params.get("qemu_io_extra_options", "")
image_format = params.get("qemu_io_image_format")
result_pattern = params.get("iotests_result_pattern")
error_context.context("running qemu-iotests for image format %s"
% image_format, logging.info)
os.environ["QEMU_PROG"] = utils_misc.get_qemu_binary(params)
os.environ["QEMU_IMG_PROG"] = utils_misc.get_qemu_img_binary(params)
os.environ["QEMU_IO_PROG"] = utils_misc.get_qemu_io_binary(params)
os.environ["QEMU_NBD_PROG"] = utils_misc.get_binary('qemu-nbd', params)
os.chdir(os.path.join(qemu_src_dir, iotests_root))
cmd = './check'
if extra_options:
cmd += " %s" % extra_options
cmd += " -%s" % image_format
output = process.system_output(cmd, ignore_status=True, shell=True)
match = re.search(result_pattern, output, re.I | re.M)
if match:
iotests_log_file = "qemu_iotests_%s.log" % image_format
iotests_log_file = utils_misc.get_path(test.debugdir, iotests_log_file)
with open(iotests_log_file, 'w+') as log:
log.write(output)
log.flush()
msg = "Total test %s cases, %s failed"
raise exceptions.TestFail(msg % (match.group(2), match.group(1)))
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:31,代码来源:rh_qemu_iotests.py
示例15: coredump_exists
def coredump_exists(mntpnt, files, out_dir):
"""
Check if there is specified file in the image
If hit the files, copy them to output_dir
The out_dir is the directory contains debug log
:param mntpnt: The mountpoint on the host
:param files: The files pattern need be checked
:param out_dir: If found coredump files, copy them
to the directory
:return: Format as Bool, List which contains tuples
If the checked file exists,
return True, [("checked_file_name", "file_created_time")]
If not, return False, []
"""
file_exists = False
msgs_return = []
for chk_file in files:
file_need_check = utils_misc.get_path(mntpnt, chk_file)
files_glob = glob.glob(file_need_check)
if files_glob:
file_exists = True
for item in files_glob:
file_ctime = time.ctime(os.path.getctime(item))
msgs_return.append((os.path.basename(item),
file_ctime))
error.context("copy files %s %s" %
(item, out_dir), logging.info)
os.system("cp -rf %s %s" % (item, out_dir))
return file_exists, msgs_return
开发者ID:CongLi,项目名称:tp-qemu,代码行数:32,代码来源:check_coredump.py
示例16: run
def run(test, params, env):
vm = env.get_vm(params.get("vms", "main_vm").split(" ")[0])
vm.verify_alive()
steps_filename = params.get("steps")
if not steps_filename:
image_name = os.path.basename(params["image_name"])
steps_filename = "steps/%s.steps" % image_name
steps_filename = utils_misc.get_path(test.virtdir, steps_filename)
if not os.path.exists(steps_filename):
raise error.TestError("Steps file not found: %s" % steps_filename)
sf = open(steps_filename, "r")
lines = sf.readlines()
sf.close()
vm.resume()
current_step_num = 0
current_screendump = None
skip_current_step = False
# Iterate over the lines in the file
for line in lines:
line = line.strip()
if not line:
continue
logging.info(line)
if line.startswith("#"):
continue
words = line.split()
if words[0] == "step":
current_step_num += 1
current_screendump = None
skip_current_step = False
elif words[0] == "screendump":
current_screendump = words[1]
elif skip_current_step:
continue
elif words[0] == "sleep":
timeout_multiplier = float(params.get("timeout_multiplier") or 1)
time.sleep(float(words[1]) * timeout_multiplier)
elif words[0] == "key":
vm.send_key(words[1])
elif words[0] == "var":
if not handle_var(vm, params, words[1]):
logging.error("Variable not defined: %s", words[1])
elif words[0] == "barrier_2":
if current_screendump:
scrdump_filename = os.path.join(ppm_utils.get_data_dir(steps_filename), current_screendump)
else:
scrdump_filename = None
if not barrier_2(vm, words, params, test.debugdir, scrdump_filename, current_step_num):
skip_current_step = True
else:
vm.send_key(words[0])
开发者ID:MiriamDeng,项目名称:tp-qemu,代码行数:59,代码来源:steps.py
示例17: get_images
def get_images():
"""
Find the image names under the image directory
:return: image names
"""
return glob.glob(utils_misc.get_path(data_dir.get_data_dir(),
"images/*.*"))
开发者ID:CongLi,项目名称:tp-qemu,代码行数:8,代码来源:check_coredump.py
示例18: cleanup_cdroms
def cleanup_cdroms():
"""
Removes created cdrom
"""
logging.info("cleaning up temp cdrom images")
cdrom_test = utils_misc.get_path(data_dir.get_data_dir(), params.get("cdrom_test"))
os.remove(cdrom_test)
开发者ID:FengYang,项目名称:virt-test,代码行数:8,代码来源:boot_from_device.py
示例19: copy_uefishell
def copy_uefishell(self):
"""
Copy uefishell.iso
:return uefishell.iso path
"""
ovmf_path = self.params["ovmf_path"]
uefishell_filename = "UefiShell.iso"
uefishell_dst_path = "images/%s" % uefishell_filename
uefishell_src_path = utils_misc.get_path(
ovmf_path, uefishell_filename)
uefishell_dst_path = utils_misc.get_path(
data_dir.get_data_dir(), uefishell_dst_path)
if not os.path.exists(uefishell_dst_path):
cp_command = "cp -f %s %s" % (
uefishell_src_path, uefishell_dst_path)
process.system(cp_command)
return uefishell_dst_path
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:17,代码来源:uefishell.py
示例20: run_iometer_windows
def run_iometer_windows(test, params, env):
"""
Run Iometer for Windows on a Windows guest:
1) Boot guest with additional disk
2) Format the additional disk
3) Install and register Iometer
4) Perpare icf to Iometer.exe
5) Run Iometer.exe with icf
6) Copy result to host
:param test: kvm test object
:param params: Dictionary with the test parameters
:param env: Dictionary with test environment.
"""
vm = env.get_vm(params["main_vm"])
vm.verify_alive()
timeout = int(params.get("login_timeout", 360))
session = vm.wait_for_login(timeout=timeout)
# format the target disk
utils_test.run_virt_sub_test(test, params, env, "format_disk")
error.context("Install Iometer", logging.info)
cmd_timeout = int(params.get("cmd_timeout", 360))
ins_cmd = params["install_cmd"]
vol_utils = utils_misc.get_winutils_vol(session)
if not vol_utils:
raise error.TestError("WIN_UTILS CDROM not found")
ins_cmd = re.sub("WIN_UTILS", vol_utils, ins_cmd)
session.cmd(cmd=ins_cmd, timeout=cmd_timeout)
time.sleep(0.5)
error.context("Register Iometer", logging.info)
reg_cmd = params["register_cmd"]
reg_cmd = re.sub("WIN_UTILS", vol_utils, reg_cmd)
session.cmd_output(cmd=reg_cmd, timeout=cmd_timeout)
error.context("Prepare icf for Iometer", logging.info)
icf_name = params["icf_name"]
ins_path = params["install_path"]
res_file = params["result_file"]
icf_file = utils_misc.get_path(test.virtdir, "deps/%s" % icf_name)
vm.copy_files_to(icf_file, "%s\\%s" % (ins_path, icf_name))
# Run Iometer
error.context("Start Iometer", logging.info)
session.cmd("cd %s" % ins_path)
logging.info("Change dir to: %s" % ins_path)
run_cmd = params["run_cmd"]
run_timeout = int(params.get("run_timeout", 1000))
logging.info("Set Timeout: %ss" % run_timeout)
run_cmd = run_cmd % (icf_name, res_file)
logging.info("Execute Command: %s" % run_cmd)
session.cmd(cmd=run_cmd, timeout=run_timeout)
error.context("Copy result '%s' to host" % res_file, logging.info)
vm.copy_files_from(res_file, test.resultsdir)
开发者ID:Antique,项目名称:virt-test,代码行数:58,代码来源:iometer_windows.py
注:本文中的virttest.utils_misc.get_path函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论