本文整理汇总了Python中virttest.utils_misc.get_image_info函数的典型用法代码示例。如果您正苦于以下问题:Python get_image_info函数的具体用法?Python get_image_info怎么用?Python get_image_info使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_image_info函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: copy_nfs_image
def copy_nfs_image(params, image_name, root_dir):
"""
copy image from image_path to nfs mount dir if image is not available
or corrupted.
:param params: Test dict params
:param image_name: Master image name.
:param root_dir: Base directory for relative filenames.
:raise: TestSetupFail if image is unavailable/corrupted
"""
image_format = params.get("image_format", "qcow2")
if params.get("setup_local_nfs", "no") == "yes":
# check for image availability in NFS shared path
base_dir = params.get("images_base_dir", data_dir.get_data_dir())
dst = get_image_filename(params, base_dir)
if(not os.path.isfile(dst) or
utils_misc.get_image_info(dst)['lcounts'].lower() == "true"):
source = os.path.join(root_dir, "images", image_name)
if image_format not in source:
source = "%s.%s" % (source, image_format)
logging.debug("Checking for image available in image data "
"path - %s", source)
# check for image availability in images data directory
if(os.path.isfile(source) and not
utils_misc.get_image_info(source)['lcounts'].lower() == "true"):
logging.debug("Copying guest image from %s to %s", source,
dst)
shutil.copy(source, dst)
else:
raise exceptions.TestSetupFail("Guest image is unavailable"
"/corrupted in %s and %s" %
(source, dst))
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:32,代码来源:storage.py
示例2: check_vol_info
def check_vol_info(pool_vol, vol_name, expect_info=None):
"""
Check the volume info, or/and compare with the expect_info.
:params pool_vol: Instance of PoolVolume.
:params vol_name: Name of the volume.
:params expect_info: Expect volume info for comparation.
"""
vol_info = pool_vol.volume_info(vol_name)
for key in vol_info:
logging.debug("Volume info: %s = %s", key, vol_info[key])
if not expect_info:
return True
else:
check_capacity_pass = True
check_allocation_pass = True
try:
# Get image info
vol_path = pool_vol.list_volumes()[vol_name]
img_info = utils_misc.get_image_info(vol_path)
if expect_info['Capacity'] != img_info['vsize']:
logging.debug("Capacity(Virtual size) is %s bytes",
img_info['vsize'])
logging.error("Volume capacity not equal to expect value %s",
expect_info['Capacity'])
check_capacity_pass = False
if expect_info['Allocation'] != img_info['dsize']:
logging.debug("Allocation(Disk size) is %s bytes",
img_info['dsize'])
logging.error("Volume Allocation not equal to expect value %s",
expect_info['Allocation'])
check_allocation_pass = False
return check_capacity_pass & check_allocation_pass
except KeyError, detail:
raise error.TestError("Fail to check volume info:\n%s" % detail)
开发者ID:uni-peter-zheng,项目名称:tp-libvirt,代码行数:35,代码来源:virsh_vol_resize.py
示例3: prepare_gluster_disk
def prepare_gluster_disk(disk_img, disk_format):
"""
Setup glusterfs and prepare disk image.
"""
# Get the image path and name from parameters
data_path = data_dir.get_data_dir()
image_name = params.get("image_name")
image_format = params.get("image_format")
image_source = os.path.join(data_path,
image_name + '.' + image_format)
# Setup gluster.
host_ip = libvirt.setup_or_cleanup_gluster(True, vol_name,
brick_path, pool_name)
logging.debug("host ip: %s ", host_ip)
image_info = utils_misc.get_image_info(image_source)
if image_info["format"] == disk_format:
disk_cmd = ("cp -f %s /mnt/%s" % (image_source, disk_img))
else:
# Convert the disk format
disk_cmd = ("qemu-img convert -f %s -O %s %s /mnt/%s" %
(image_info["format"], disk_format, image_source, disk_img))
# Mount the gluster disk and create the image.
utils.run("mount -t glusterfs %s:%s /mnt;"
" %s; chmod a+rw /mnt/%s; umount /mnt"
% (host_ip, vol_name, disk_cmd, disk_img))
return host_ip
开发者ID:noxdafox,项目名称:tp-libvirt,代码行数:29,代码来源:virtual_disks_gluster.py
示例4: prepare_gluster_disk
def prepare_gluster_disk(blk_source, test, **kwargs):
"""
Set up gluster disk device and replace the domain disk image
:param blk_source: The domain disk image path
:param test: Avocado test object
:param kwargs: Key words for gluster device setup
:return: host_ip
"""
vol_name = kwargs.get("vol_name")
brick_path = kwargs.get("brick_path")
disk_img = kwargs.get("disk_img")
disk_format = kwargs.get("disk_format")
host_ip = utlv.setup_or_cleanup_gluster(True, vol_name, brick_path)
logging.debug("host ip: %s ", host_ip)
# Copy the domain disk image to gluster disk path
image_info = utils_misc.get_image_info(blk_source)
dest_image = "/mnt/%s" % disk_img
if image_info["format"] == disk_format:
disk_cmd = ("cp -f %s %s" % (blk_source, dest_image))
else:
disk_cmd = ("qemu-img convert -f %s -O %s %s %s" %
(image_info["format"], disk_format,
blk_source, dest_image))
# Mount the gluster disk and create the image
src_mnt = "%s:%s" % (host_ip, vol_name)
if not utils_misc.mount(src_mnt, "/mnt", "glusterfs"):
test.error("glusterfs mount failed")
process.run("%s && chmod a+rw /mnt/%s && umount /mnt" %
(disk_cmd, disk_img), shell=True)
return host_ip
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:31,代码来源:virsh_boot.py
示例5: prepare_gluster_disk
def prepare_gluster_disk(disk_img, disk_format):
"""
Setup glusterfs and prepare disk image.
"""
# Get the image path
image_source = vm.get_first_disk_devices()['source']
# Setup gluster
host_ip = libvirt.setup_or_cleanup_gluster(True, vol_name,
brick_path, pool_name)
logging.debug("host ip: %s ", host_ip)
image_info = utils_misc.get_image_info(image_source)
image_dest = "/mnt/%s" % disk_img
if image_info["format"] == disk_format:
disk_cmd = ("cp -f %s %s" % (image_source, image_dest))
else:
# Convert the disk format
disk_cmd = ("qemu-img convert -f %s -O %s %s %s" %
(image_info["format"], disk_format,
image_source, image_dest))
# Mount the gluster disk and create the image.
process.run("mount -t glusterfs %s:%s /mnt && "
"%s && chmod a+rw /mnt/%s && umount /mnt"
% (host_ip, vol_name, disk_cmd, disk_img),
shell=True)
return host_ip
开发者ID:lento-sun,项目名称:tp-libvirt,代码行数:29,代码来源:virtual_disks_gluster.py
示例6: check_image
def check_image(output, check_point, expected_value):
"""
Verify converted image file allocation mode and format
"""
img_path = get_img_path(output)
if not img_path or not os.path.isfile(img_path):
logging.error("Fail to get image path: %s", img_path)
return
img_info = utils_misc.get_image_info(img_path)
logging.info("Image info after converted: %s", img_info)
if check_point == "allocation":
if expected_value == "sparse":
if img_info['vsize'] > img_info['dsize']:
logging.info("Image file is sparse")
else:
raise error.TestFail("Image allocation check fail")
elif expected_value == "preallocated":
if img_info['vsize'] <= img_info['dsize']:
logging.info("Image file is preallocated")
else:
raise error.TestFail("Image allocation check fail")
if check_point == "format":
if expected_value == img_info['format']:
logging.info("Image file format is %s", expected_value)
else:
raise error.TestFail("Image format check fail")
开发者ID:noxdafox,项目名称:tp-libvirt,代码行数:26,代码来源:v2v_options.py
示例7: check_image
def check_image(img_path, check_point, expected_value):
"""
Verify image file allocation mode and format
"""
if not img_path or not os.path.isfile(img_path):
raise exceptions.TestError("Image path: '%s' is invalid" % img_path)
img_info = utils_misc.get_image_info(img_path)
logging.debug("Image info: %s", img_info)
if check_point == "allocation":
if expected_value == "sparse":
if img_info['vsize'] > img_info['dsize']:
logging.info("%s is a sparse image", img_path)
else:
raise exceptions.TestFail("%s is not a sparse image" % img_path)
elif expected_value == "preallocated":
if img_info['vsize'] <= img_info['dsize']:
logging.info("%s is a preallocated image", img_path)
else:
raise exceptions.TestFail("%s is not a preallocated image"
% img_path)
if check_point == "format":
if expected_value == img_info['format']:
logging.info("%s format is %s", img_path, expected_value)
else:
raise exceptions.TestFail("%s format is not %s"
% (img_path, expected_value))
开发者ID:chloerh,项目名称:tp-libvirt,代码行数:26,代码来源:v2v_options.py
示例8: modify_source
def modify_source(vm_name, target, dst_image):
"""
Modify domain's configuration to change its disk source
"""
try:
virsh.detach_disk(vm_name, target, extra="--config",
ignore_status=False)
dst_image_format = utils_misc.get_image_info(dst_image)['format']
options = "--config --subdriver %s" % dst_image_format
virsh.attach_disk(vm_name, dst_image, target, extra=options,
ignore_status=False)
except (remote.LoginError, virt_vm.VMError,
aexpect.ShellError), detail:
raise error.TestFail("Modify guest source failed: %s" % detail)
开发者ID:FengYang,项目名称:tp-libvirt,代码行数:14,代码来源:virt_sysprep.py
示例9: __init__
def __init__(self, test, params):
self.td = None
self.cpu_num = int(params.get("cpu_num", "1"))
self.vm_name = params.get("main_vm")
self.vm_new_name = params.get("vm_new_name")
self.cgroup_name = params.get("cgroup_name")
self.cgroup_dir = params.get("cgroup_dir")
self.new_image_file = params.get("new_image_file")
if self.new_image_file:
self.new_image_file = os.path.join(test.virtdir,
self.new_image_file)
self.time_out = int(params.get("time_out", "600"))
self.cpu_status = utils_misc.get_cpu_status(self.cpu_num)
self.twice_execute = "yes" == params.get("twice_execute", "no")
self.kill_first = "yes" == params.get("kill_first", "no")
if params.get("abnormal_type") in ["disk_lack", ""]:
self.selinux_enforcing = utils_selinux.is_enforcing()
if self.selinux_enforcing:
utils_selinux.set_status("permissive")
self.fs_type = params.get("fs_type", "ext4")
xml_file = vm_xml.VMXML.new_from_inactive_dumpxml(self.vm_name)
disk_node = xml_file.get_disk_all()['vda']
source_file = disk_node.find('source').get('file')
self.image_size = utils_misc.get_image_info(source_file)['dsize']
# Set the size to be image_size
iscsi_size = "%sM" % (self.image_size / 1024 / 1024)
params['image_size'] = iscsi_size
self.iscsi_dev = qemu_storage.Iscsidev(params, test.virtdir,
"iscsi")
try:
device_source = self.iscsi_dev.setup()
except (exceptions.TestError, ValueError) as detail:
self.iscsi_dev.cleanup()
self.test.cancel("Cannot get iscsi device on this"
" host:%s\n" % detail)
libvirt.mk_label(device_source)
libvirt.mk_part(device_source, iscsi_size)
self.mount_dir = os.path.join(test.virtdir,
params.get('mount_dir'))
if not os.path.exists(self.mount_dir):
os.mkdir(self.mount_dir)
params['mount_dir'] = self.mount_dir
self.partition = device_source + "1"
libvirt.mkfs(self.partition, self.fs_type)
utils_misc.mount(self.partition, self.mount_dir, self.fs_type)
self.new_image_file = os.path.join(self.mount_dir, "new_file")
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:46,代码来源:resource_abnormal.py
示例10: __init__
def __init__(self, test, params):
self.cpu_num = int(params.get("cpu_num", "1"))
self.cgroup_name = params.get("cgroup_name")
self.cgroup_dir = params.get("cgroup_dir")
self.time_out = int(params.get("time_out", "600"))
self.vm_name = params.get("main_vm")
self.time_out = int(params.get("time_out", "600"))
self.twice_execute = "yes" == params.get("twice_execute", "no")
self.kill_first = "yes" == params.get("kill_first", "no")
xml_file = vm_xml.VMXML.new_from_inactive_dumpxml(self.vm_name)
disk_node = xml_file.get_disk_all()['vda']
source_file = disk_node.find('source').get('file')
image_type = utils_misc.get_image_info(source_file)['format']
if image_type != "qcow2":
raise error.TestNAError("Disk image format is not qcow2, "
"ignore snapshot test!")
self.cpu_status = utils_misc.get_cpu_status(self.cpu_num)
self.current_snp_list = []
self.snp_list = virsh.snapshot_list(self.vm_name)
env = params.get("env")
vm = env.get_vm(self.vm_name)
# This can add snapshot create time
vm.wait_for_login()
开发者ID:noxdafox,项目名称:tp-libvirt,代码行数:23,代码来源:resource_abnormal.py
示例11: run
def run(test, params, env):
"""
Test virsh blockresize command for block device of domain.
1) Init the variables from params.
2) Create an image with specified format.
3) Attach a disk image to vm.
4) Test blockresize for the disk
5) Detach the disk
"""
# MAIN TEST CODE ###
# Process cartesian parameters
vm_name = params.get("main_vm", "virt-tests-vm1")
image_format = params.get("disk_image_format", "qcow2")
initial_disk_size = params.get("initial_disk_size", "1M")
status_error = "yes" == params.get("status_error", "yes")
resize_value = params.get("resize_value")
virsh_dargs = {'debug': True}
# Create an image.
tmp_dir = data_dir.get_tmp_dir()
image_path = os.path.join(tmp_dir, "blockresize_test")
logging.info("Create image: %s, "
"size %s, "
"format %s", image_path, initial_disk_size, image_format)
cmd = "qemu-img create -f %s %s %s" % (image_format, image_path,
initial_disk_size)
status, output = commands.getstatusoutput(cmd)
if status:
raise error.TestError("Creating image file %s failed: %s" % \
(image_path, output))
# Hotplug the image as disk device
result = virsh.attach_disk(vm_name, source=image_path, target="vdd",
extra=" --subdriver %s" % image_format)
if result.exit_status:
raise error.TestError("Failed to attach disk %s to VM: %s." %
(image_path, result.stderr))
if resize_value == "over_size":
# Use byte unit for over_size test
resize_value = "%s" % OVER_SIZE + "b"
# Run the test
try:
result = virsh.blockresize(vm_name, image_path,
resize_value, **virsh_dargs)
status = result.exit_status
err = result.stderr.strip()
# Check status_error
if status_error:
if status == 0 or err == "":
raise error.TestFail("Expect failure, but run successfully!")
# No need to do more test
return
else:
if status != 0 or err != "":
raise error.TestFail("Run failed with right "
"virsh blockresize command")
if resize_value[-1] in "bkm":
expected_size = 1024*1024
elif resize_value[-1] == "g":
expected_size = 1024*1024*1024
else:
raise error.TestError("Unknown infomation of unit")
image_info = utils_misc.get_image_info(image_path)
actual_size = int(image_info['vsize'])
logging.info("The expected block size is %s bytes,"
"the actual block size is %s bytes",
expected_size, actual_size)
if int(actual_size) != int(expected_size):
raise error.TestFail("New blocksize set by blockresize is "
"different from actual size from "
"'qemu-img info'")
finally:
virsh.detach_disk(vm_name, target="vdd")
if os.path.exists(image_path):
os.remove(image_path)
开发者ID:Guannan-Ren,项目名称:tp-libvirt,代码行数:86,代码来源:virsh_blockresize.py
示例12:
"this may lead to migration problems. "
"Consider specifying vm.connect_uri using "
"fully-qualified network-based style.")
if srcuri.count('///') or srcuri.count('EXAMPLE'):
raise error.TestNAError(warning_text % ('source', srcuri))
if dsturi.count('///') or dsturi.count('EXAMPLE'):
raise error.TestNAError(warning_text % ('destination', dsturi))
# Config auto-login to remote host for migration
ssh_key.setup_ssh_key(remote_ip, username, host_pwd)
sys_image = vm.get_first_disk_devices()
sys_image_source = sys_image["source"]
sys_image_info = utils_misc.get_image_info(sys_image_source)
logging.debug("System image information:\n%s", sys_image_info)
sys_image_fmt = sys_image_info["format"]
created_img_path = os.path.join(os.path.dirname(sys_image_source),
"vsmimages")
migrate_in_advance = "yes" == params.get("migrate_in_advance", "no")
status_error = "yes" == params.get("status_error", "no")
if source_type == "file" and device_type == "lun":
status_error = True
try:
# For safety and easily reasons, we'd better define a new vm
new_vm_name = "%s_vsmtest" % vm.name
mig = utlv.MigrationTest()
开发者ID:Chenditang,项目名称:tp-libvirt,代码行数:31,代码来源:virsh_migrate_virtio_scsi.py
示例13: run
#.........这里部分代码省略.........
% detail)
# Use id to get same path on local and remote
block_device = get_disk_id(target)
if block_device is None:
rdm.iscsi_login_setup(local_host, target, is_login=False)
utlv.setup_or_cleanup_iscsi(is_setup=False)
test.error("Set iscsi device couldn't find id?")
srcuri = params.get("virsh_migrate_srcuri")
dsturi = params.get("virsh_migrate_dsturi")
remote_ip = params.get("remote_ip")
username = params.get("remote_user", "root")
host_pwd = params.get("remote_pwd")
# Connection to remote, init here for cleanup
runner = None
# Identify easy config. mistakes early
warning_text = ("Migration VM %s URI %s appears problematic "
"this may lead to migration problems. "
"Consider specifying vm.connect_uri using "
"fully-qualified network-based style.")
if srcuri.count('///') or srcuri.count('EXAMPLE'):
test.cancel(warning_text % ('source', srcuri))
if dsturi.count('///') or dsturi.count('EXAMPLE'):
test.cancel(warning_text % ('destination', dsturi))
# Config auto-login to remote host for migration
ssh_key.setup_ssh_key(remote_ip, username, host_pwd)
sys_image = vm.get_first_disk_devices()
sys_image_source = sys_image["source"]
sys_image_info = utils_misc.get_image_info(sys_image_source)
logging.debug("System image information:\n%s", sys_image_info)
sys_image_fmt = sys_image_info["format"]
created_img_path = os.path.join(os.path.dirname(sys_image_source),
"vsmimages")
migrate_in_advance = "yes" == params.get("migrate_in_advance", "no")
status_error = "yes" == params.get("status_error", "no")
if source_type == "file" and device_type == "lun":
status_error = True
try:
# For safety and easily reasons, we'd better define a new vm
new_vm_name = "%s_vsmtest" % vm.name
mig = utlv.MigrationTest()
if vm.is_alive():
vm.destroy()
utlv.define_new_vm(vm.name, new_vm_name)
vm = libvirt_vm.VM(new_vm_name, vm.params, vm.root_dir,
vm.address_cache)
# Change the disk of the vm to shared disk
# Detach exist devices
devices = vm.get_blk_devices()
for device in devices:
s_detach = virsh.detach_disk(vm.name, device, "--config",
debug=True)
if not s_detach:
test.error("Detach %s failed before test.", device)
# Attach system image as vda
# Then added scsi disks will be sda,sdb...
开发者ID:nasastry,项目名称:tp-libvirt,代码行数:67,代码来源:virsh_migrate_virtio_scsi.py
示例14: run
#.........这里部分代码省略.........
cmd = "qemu-img create -f %s %s %s" % (image_format, image_path,
initial_disk_size)
status, output = commands.getstatusoutput(cmd)
if status:
raise error.TestError("Creating image file %s failed: %s"
% (image_path, output))
# Hotplug the image as disk device
result = virsh.attach_disk(vm_name, source=image_path, target="vdd",
extra=" --subdriver %s" % image_format)
if result.exit_status:
raise error.TestError("Failed to attach disk %s to VM: %s."
% (image_path, result.stderr))
if resize_value == "over_size":
# Use byte unit for over_size test
resize_value = "%s" % OVER_SIZE + "b"
# Run the test
try:
result = virsh.blockresize(vm_name, image_path,
resize_value, **virsh_dargs)
status = result.exit_status
err = result.stderr.strip()
# Check status_error
if status_error:
if status == 0 or err == "":
raise error.TestFail("Expect failure, but run successfully!")
# No need to do more test
return
else:
if status != 0 or err != "":
# bz 1002813 will result in an error on this
err_str = "unable to execute QEMU command 'block_resize': Could not resize: Invalid argument"
if resize_value[-2] in "kb" and re.search(err_str, err):
raise error.TestNAError("BZ 1002813 not yet applied")
else:
raise error.TestFail("Run failed with right "
"virsh blockresize command")
# Although kb should not be used, libvirt/virsh will accept it and
# consider it as a 1000 bytes, which caused issues for qed & qcow2
# since they expect a value evenly divisible by 512 (hence bz 1002813).
if "kb" in resize_value:
value = int(resize_value[:-2])
if image_format in ["qed", "qcow2"]:
# qcow2 and qed want a VIR_ROUND_UP value based on 512 byte
# sectors - hence this less than visually appealing formula
expected_size = (((value * 1000) + 512 - 1) / 512) * 512
else:
# Raw images...
# Ugh - there's some rather ugly looking math when kb
# (or mb, gb, tb, etc.) are used as the scale for the
# value to create an image. The blockresize for the
# running VM uses a qemu json call which differs from
# qemu-img would do - resulting in (to say the least)
# awkward sizes. We'll just have to make sure we don't
# deviates more than a sector.
expected_size = value * 1000
elif "kib" in resize_value:
value = int(resize_value[:-3])
expected_size = value * 1024
elif resize_value[-1] in "b":
expected_size = int(resize_value[:-1])
elif resize_value[-1] in "k":
value = int(resize_value[:-1])
expected_size = value * 1024
elif resize_value[-1] == "m":
value = int(resize_value[:-1])
expected_size = value * 1024 * 1024
elif resize_value[-1] == "g":
value = int(resize_value[:-1])
expected_size = value * 1024 * 1024 * 1024
else:
raise error.TestError("Unknown scale value")
image_info = utils_misc.get_image_info(image_path)
actual_size = int(image_info['vsize'])
logging.info("The expected block size is %s bytes, "
"the actual block size is %s bytes",
expected_size, actual_size)
# See comment above regarding Raw images
if image_format == "raw" and resize_value[-2] in "kb":
if abs(int(actual_size) - int(expected_size)) > 512:
raise error.TestFail("New raw blocksize set by blockresize do "
"not match the expected value")
else:
if int(actual_size) != int(expected_size):
raise error.TestFail("New blocksize set by blockresize is "
"different from actual size from "
"'qemu-img info'")
finally:
virsh.detach_disk(vm_name, target="vdd")
if os.path.exists(image_path):
os.remove(image_path)
开发者ID:Antique,项目名称:tp-libvirt,代码行数:101,代码来源:virsh_blockresize.py
示例15: len
os.remove(clone_image)
except error.CmdError, detail:
raise error.TestFail("Clean clone guest failed!:%s" % detail)
sysprep_type = params.get("sysprep_type", 'clone')
sysprep_target = params.get("sysprep_target", 'guest')
sysprep_hostname = params.get("sysprep_hostname", 'sysprep_test')
vm_name = params.get("main_vm", "virt-tests-vm1")
file_system = params.get("sysprep_file_system", "ext3")
vm = env.get_vm(vm_name)
disks = vm.get_disk_devices()
if len(disks):
disk = disks.values()[0]
image = disk['source']
target = disks.keys()[0]
image_info_dict = utils_misc.get_image_info(image)
if sysprep_type == "sparsify" and image_info_dict['format'] != 'qcow2':
raise error.TestNAError("This test case needs qcow2 format image.")
else:
raise error.TestError("Can not get disk of %s" % vm_name)
vt = libguestfs.VirtTools(vm, params)
fs_type = vt.get_primary_disk_fs_type()
if fs_type != file_system:
raise error.TestNAError("This test case gets wrong disk file system."
"get: %s, expected: %s" % (fs_type,
file_system))
# Do some prepare action
vm_clone_name = "%s_clone" % vm_name
clone_image = "%s_clone.img" % image
vmxml = vm_xml.VMXML.new_from_dumpxml(vm_name)
开发者ID:FengYang,项目名称:tp-libvirt,代码行数:31,代码来源:virt_sysprep.py
示例16: run
#.........这里部分代码省略.........
del_pool = False
else:
# Create a new pool
disk_vol = []
if pool_type == "disk":
disk_vol.append(params.get("pre_vol", "10M"))
libv_pvt.pre_pool(
pool_name=pool_name,
pool_type=pool_type,
pool_target=pool_target,
emulated_image=emulated_image,
image_size=emulated_image_size,
pre_disk_vol=disk_vol,
)
libv_vol = libvirt_storage.PoolVolume(pool_name)
if libv_vol.volume_exists(vol_name):
logging.debug("Use exist volume '%s'", vol_name)
elif vol_format in ["raw", "qcow2", "qed", "vmdk"]:
# Create a new volume
libv_pvt.pre_vol(
vol_name=vol_name, vol_format=vol_format, capacity=vol_capability, allocation=None, pool_name=pool_name
)
elif vol_format == "partition":
vol_name = libv_vol.list_volumes().keys()[0]
logging.debug("Partition %s in disk pool is volume" % vol_name)
elif vol_format == "sparse":
# Create a sparse file in pool
sparse_file = pool_target + "/" + vol_name
cmd = "dd if=/dev/zero of=" + sparse_file
cmd += " bs=1 count=0 seek=" + vol_capability
utils.run(cmd)
else:
raise error.TestError("Unknown volume format %s" % vol_format)
# Refresh the pool
virsh.pool_refresh(pool_name)
vol_info = libv_vol.volume_info(vol_name)
for key in vol_info:
logging.debug("Original volume info: %s = %s", key, vol_info[key])
# Metadata preallocation is not support for block volume
if vol_info["Type"] == "block" and clone_option.count("prealloc-metadata"):
clone_status_error = True
if pool_type == "disk":
new_vol_name = libvirt.new_disk_vol_name(pool_name)
if new_vol_name is None:
raise error.TestError("Fail to generate volume name")
# update polkit rule as the volume name changed
if setup_libvirt_polkit:
vol_pat = r"lookup\('vol_name'\) == ('\S+')"
new_value = "lookup('vol_name') == '%s'" % new_vol_name
libvirt.update_polkit_rule(params, vol_pat, new_value)
# Clone volume
clone_result = virsh.vol_clone(vol_name, new_vol_name, pool_name, clone_option, debug=True)
if not clone_status_error:
if clone_result.exit_status != 0:
raise error.TestFail("Clone volume fail:\n%s" % clone_result.stderr.strip())
else:
vol_info = libv_vol.volume_info(new_vol_name)
for key in vol_info:
logging.debug("Cloned volume info: %s = %s", key, vol_info[key])
logging.debug("Clone volume successfully.")
# Wipe the new clone volume
if alg:
logging.debug("Wiping volume by '%s' algorithm", alg)
wipe_result = virsh.vol_wipe(
new_vol_name, pool_name, alg, unprivileged_user=unpri_user, uri=uri, debug=True
)
unsupported_err = ["Unsupported algorithm", "no such pattern sequence"]
if not wipe_status_error:
if wipe_result.exit_status != 0:
if any(err in wipe_result.stderr for err in unsupported_err):
raise error.TestNAError(wipe_result.stderr)
raise error.TestFail("Wipe volume fail:\n%s" % clone_result.stdout.strip())
else:
virsh_vol_info = libv_vol.volume_info(new_vol_name)
for key in virsh_vol_info:
logging.debug("Wiped volume info(virsh): %s = %s", key, virsh_vol_info[key])
vol_path = virsh.vol_path(new_vol_name, pool_name).stdout.strip()
qemu_vol_info = utils_misc.get_image_info(vol_path)
for key in qemu_vol_info:
logging.debug("Wiped volume info(qemu): %s = %s", key, qemu_vol_info[key])
if qemu_vol_info["format"] != "raw":
raise error.TestFail("Expect wiped volume " "format is raw")
elif wipe_status_error and wipe_result.exit_status == 0:
raise error.TestFail("Expect wipe volume fail, but run" " successfully.")
elif clone_status_error and clone_result.exit_status == 0:
raise error.TestFail("Expect clone volume fail, but run" " successfully.")
finally:
# Clean up
try:
if del_pool:
libv_pvt.cleanup_pool(pool_name, pool_type, pool_target, emulated_image)
else:
# Only delete the volumes
libv_vol = libvirt_storage.PoolVolume(pool_name)
for vol in [vol_name, new_vol_name]:
libv_vol.delete_volume(vol)
except error.TestFail, detail:
logging.error(str(detail))
开发者ID:uni-peter-zheng,项目名称:tp-libvirt,代码行数:101,代码来源:virsh_vol_clone_wipe.py
示例17: check_vol
#.........这里部分代码省略.........
expected['name'], expected['type'],
actual_list['type'])
error_count += 1
else:
logging.debug("Type of volume: %s from virsh vol-list "
"successfully checked against the created "
"volume type", expected['name'])
# Check type against virsh vol-info
if expected['type'] != actual_info['Type']:
logging.error("Volume type mismatch for volume: %s\n"
"Expected Type: %s\n Type from vol-info: %s",
expected['name'], expected['type'],
actual_info['Type'])
error_count += 1
else:
logging.debug("Type of volume: %s from virsh vol-info successfully"
" checked against the created volume type",
expected['name'])
# Check name against virsh vol-info
if expected['name'] != actual_info['Name']:
logging.error("Volume name mismatch for volume: %s\n"
"Expected name: %s\n Name from vol-info: %s",
expected['name'],
expected['name'], actual_info['Name'])
error_count += 1
else:
logging.debug("Name of volume: %s from virsh vol-info successfully"
" checked against the created volume name",
expected['name'])
# Check format from against qemu-img info
img_info = utils_misc.get_image_info(expected['path'])
if expected['format']:
if expected['format'] != img_info['format']:
logging.error("Volume format mismatch for volume: %s\n"
"Expected format: %s\n"
"Format from qemu-img info: %s",
expected['name'], expected['format'],
img_info['format'])
error_count += 1
else:
logging.debug("Format of volume: %s from qemu-img info "
"checked successfully against the created "
"volume format", expected['name'])
# Check format against vol-dumpxml
if expected['format']:
if expected['format'] != volume_xml.format:
logging.error("Volume format mismatch for volume: %s\n"
"Expected format: %s\n"
"Format from vol-dumpxml: %s",
expected['name'], expected['format'],
volume_xml.format)
error_count += 1
else:
logging.debug("Format of volume: %s from virsh vol-dumpxml "
"checked successfully against the created"
" volume format", expected['name'])
logging.info(expected['encrypt_format'])
# Check encrypt against vol-dumpxml
if expected['encrypt_format']:
# As the 'default' format will change to specific valut(qcow), so
# just output it here
开发者ID:Antique,项目名称:tp-libvirt,代码行数:67,代码来源:virsh_volume.py
示例18: run
#.........这里部分代码省略.........
bad_cloned_vol_name = params.get("bad_cloned_vol_name", "")
if bad_cloned_vol_name:
new_vol_name = bad_cloned_vol_name
# Clone volume
clone_result = virsh.vol_c
|
请发表评论