本文整理汇总了Python中zstackwoodpecker.test_lib.lib_find_host_by_vm函数的典型用法代码示例。如果您正苦于以下问题:Python lib_find_host_by_vm函数的具体用法?Python lib_find_host_by_vm怎么用?Python lib_find_host_by_vm使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了lib_find_host_by_vm函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test
# Test: migrate vCenter-backed VMs (one plain, one with an extra data disk) to
# the first migration candidate host, then cross-check the host name reported
# by ZStack and by vCenter.
# NOTE(review): the indentation of this snippet was lost during extraction, so
# the extent of each for/if/else body below cannot be confirmed from here.
def test():
global test_obj_dict
# Connect to the vCenter named by the 'vcenter' environment variable and
# enable vMotion on the hosts it reports.
SI = vct_ops.connect_vcenter(os.environ['vcenter'])
content = SI.RetrieveContent()
hosts = vct_ops.get_host(content)
for host in hosts:
vct_ops.enable_vmotion(host)
# Prefer the L3 network derived from 'dportgroup'; fall back to the one
# derived from 'portgroup0' when the first lookup returns nothing.
network_pattern = 'L3-%s'%os.environ['dportgroup']
if not vct_ops.lib_get_vcenter_l3_by_name(network_pattern):
network_pattern = 'L3-%s'%os.environ['portgroup0']
ova_image_name = os.environ['vcenterDefaultmplate']
disk_offering = test_lib.lib_get_disk_offering_by_name(os.environ.get('largeDiskOfferingName'))
# Create the first VM (no extra disk) and register it for cleanup.
vm = test_stub.create_vm_in_vcenter(vm_name = 'migrate_vm', image_name = ova_image_name, l3_name = network_pattern)
vm.check()
test_obj_dict.add_vm(vm)
# Ask for migration candidate hosts; migration is only attempted when the
# returned list is not empty.
candidate_hosts = vm_ops.get_vm_migration_candidate_hosts(vm.vm.uuid).inventories
if candidate_hosts == []:
test_util.test_logger('Not find vm migration candidate hosts, skip test migrate vm')
else:
test_util.test_dsc('Migrate vm to the specified host')
host_uuid = candidate_hosts[0].uuid
vm_ops.migrate_vm(vm.vm.uuid, host_uuid)
vm.update()
vm.check()
# The host ZStack reports for the VM must be the requested candidate.
assert candidate_hosts[0].name == test_lib.lib_find_host_by_vm(vm.vm).name
# The host vCenter reports for the VM must be the same candidate.
assert candidate_hosts[0].name == vct_ops.find_host_by_vm(content, vm.vm.name)
# NOTE(review): cannot tell from here whether the suspend check below belongs
# to the else branch above or runs unconditionally - confirm against the
# original file.
test_util.test_dsc('vm in suspended state does not allow migration')
vm.suspend()
candidate_host = vm_ops.get_vm_migration_candidate_hosts(vm.vm.uuid).inventories
assert candidate_host == []
# Create the second VM, this time with a data disk from the large disk offering.
vm1 = test_stub.create_vm_in_vcenter(vm_name = 'migrate_vm_with_disk', image_name = ova_image_name, l3_name = network_pattern, disk_offering_uuids = [disk_offering.uuid])
vm1.check()
test_obj_dict.add_vm(vm1)
# Same candidate lookup / migrate / host-name cross-check as for the first VM.
candidate_hosts = vm_ops.get_vm_migration_candidate_hosts(vm1.vm.uuid).inventories
if candidate_hosts == []:
test_util.test_logger('Not find vm migration candidate hosts, skip test migrate vm with disk')
else:
test_util.test_dsc('Migrate vm with disk to the specified host')
host_uuid = candidate_hosts[0].uuid
vm_ops.migrate_vm(vm1.vm.uuid, host_uuid)
vm1.update()
vm1.check()
assert candidate_hosts[0].name == test_lib.lib_find_host_by_vm(vm1.vm).name
assert candidate_hosts[0].name == vct_ops.find_host_by_vm(content, vm1.vm.name)
# Clean up everything registered in test_obj_dict, then report success.
test_lib.lib_error_cleanup(test_obj_dict)
test_util.test_pass("Migrate vm test passed.")
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:60,代码来源:test_migrate_vm_with_shared_storage.py
示例2: test
def test():
    """Check that a VM's hostname and IP survive host reconnects and VM reboots.

    Three rounds are run. Each round assigns a new hostname to the VM,
    disturbs it (host reconnect, VM reboot, or both) and then verifies that
    the hostname returned by the API is the one just set and that the first
    NIC's IP from a fresh query still equals the IP held by the test VM object.
    """
    global vm
    vm = test_stub.create_vm()

    def verify_hostname_and_ip(expected_hostname, ip_mismatch_message):
        # Fail the test when the hostname or the first NIC's IP is not as expected.
        reported_hostname = vm_ops.get_vm_hostname(vm.get_vm().uuid)
        if reported_hostname != expected_hostname:
            test_util.test_fail('can not get the vm hostname after set vm hostname')
        queried_vm = res_ops.get_resource(res_ops.VM_INSTANCE, uuid=vm.get_vm().uuid)[0]
        if queried_vm.vmNics[0].ip != vm.get_vm().vmNics[0].ip:
            test_util.test_fail(ip_mismatch_message)

    # Round 1: set hostname, then reconnect the host the VM runs on.
    round_one_name = 'vm123.zstack.org'
    vm_ops.set_vm_hostname(vm.get_vm().uuid, round_one_name)
    vm_host = test_lib.lib_find_host_by_vm(vm.get_vm())
    host_ops.reconnect_host(vm_host.uuid)
    verify_hostname_and_ip(
        round_one_name,
        'can not get the correct ip address after set vm hostname and reconnected host')

    # Round 2: set hostname, then reboot the VM.
    round_two_name = 'vm1234.zstack.org'
    vm_ops.set_vm_hostname(vm.get_vm().uuid, round_two_name)
    # The host lookup result is not used in this round; the call is kept as-is.
    vm_host = test_lib.lib_find_host_by_vm(vm.get_vm())
    vm_ops.reboot_vm(vm.get_vm().uuid)
    verify_hostname_and_ip(
        round_two_name,
        'can not get the correct ip address after set vm hostname and reboot vm')

    # Round 3: set hostname, reconnect the host and reboot the VM.
    round_three_name = 'vm12345.zstack.org'
    vm_ops.set_vm_hostname(vm.get_vm().uuid, round_three_name)
    vm_host = test_lib.lib_find_host_by_vm(vm.get_vm())
    host_ops.reconnect_host(vm_host.uuid)
    vm_ops.reboot_vm(vm.get_vm().uuid)
    verify_hostname_and_ip(
        round_three_name,
        'can not get the correct ip address after set vm hostname and reboot vm and reconnect host')

    test_util.test_pass('SetVMHostname and get vm\'s correct ip')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:48,代码来源:test_set_vm_hostname.py
示例3: test
# Test: create a series of snapshots on a VM's root volume, restart libvirtd
# on the VM's host over SSH, wait, and then re-check the VM and the snapshots
# before deleting them.
# NOTE(review): the indentation of this snippet was lost during extraction;
# "count += 1" must be inside the while loop, but whether the statements after
# it are also inside the loop cannot be confirmed from here.
def test():
global vm
# With "count <= loop" and count starting at 0, the loop condition allows
# loop + 1 iterations.
loop = 20
count = 0
# NOTE(review): vm_creation_option is not referenced again in this snippet.
vm_creation_option = test_util.VmOption()
image_name = os.environ.get('imageName_net')
l3_name = os.environ.get('l3VlanNetworkName1')
vm = test_stub.create_vm("test_libvirt_snapshot_vm", image_name, l3_name)
test_obj_dict.add_vm(vm)
vm.check()
# Snapshots are taken on the VM's root volume; the VM itself is set as the
# utility VM of the snapshot object.
volume_uuid = test_lib.lib_get_root_volume(vm.get_vm()).uuid
snapshots = test_obj_dict.get_volume_snapshot(volume_uuid)
snapshots.set_utility_vm(vm)
while count <= loop:
sp_name = "create_snapshot" + str(count)
snapshots.create_snapshot(sp_name)
snapshots.check()
count += 1
# Restart libvirtd on the host that runs the VM (root/password over SSH).
# NOTE(review): the command result is stored but not inspected.
host = test_lib.lib_find_host_by_vm(vm.get_vm())
result = test_lib.lib_execute_ssh_cmd(host.managementIp, 'root', 'password', 'service libvirtd restart')
# Sleep for 600 seconds before re-checking the VM and the snapshots.
time.sleep(600)
vm.check()
snapshots.check()
snapshots.delete()
test_obj_dict.rm_volume_snapshot(snapshots)
test_lib.lib_error_cleanup(test_obj_dict)
test_util.test_pass('Libvirt restart with snapshot passed')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:33,代码来源:test_vm_snapshot_libvirt_restart.py
示例4: compare
# Pull the image referenced by `backup` from the first backup storage onto the
# VM's host and run "qemu-img compare" between it and the current install path
# of the volume `dvol`; fail the test unless the output is exactly
# "Images are identical.\n".
#   ps     - primary storage inventory; only ps.type is read here
#   vm     - test VM object; vm.vm is used to locate the host
#   dvol   - test volume object; dvol.volume.uuid selects the volume to compare
#   backup - backup inventory; backupStorageRefs[0].installPath is parsed
# NOTE(review): the indentation of this snippet was lost during extraction, so
# the extent of the "SharedBlock" branch cannot be confirmed from here.
def compare(ps, vm, dvol, backup):
test_util.test_logger("-----------------compare----------------")
# The comparison command is executed on the host that runs the VM.
host = test_lib.lib_find_host_by_vm(vm.vm)
bs = res_ops.query_resource(res_ops.BACKUP_STORAGE)[0]
# Re-query the volume by uuid to read its current installPath.
cond = res_ops.gen_query_conditions("uuid", '=', dvol.volume.uuid)
current_volume = res_ops.query_resource(res_ops.VOLUME, cond)[0]
vol_path = current_volume.installPath
# For SharedBlock the path is rebuilt as /dev/<part 2>/<part 3> of the
# "/"-split installPath.
if ps.type == "SharedBlock":
vol_path = "/dev/" + current_volume.installPath.split("/")[2] + "/" + current_volume.installPath.split("/")[3]
test_util.test_logger(vol_path)
# Parts 2 and 3 of the backup's installPath are passed to zstcli as
# "<name>:<id>"; `id` also names the scratch directory under /root.
# NOTE(review): `id` shadows the Python builtin of the same name.
name = backup.backupStorageRefs[0].installPath.split("/")[2]
id = backup.backupStorageRefs[0].installPath.split("/")[3]
# Shell pipeline: mkdir scratch dir; pull the image to old.qcow2; compare it
# with the volume path.
cmd = "mkdir /root/%s;" \
"/usr/local/zstack/imagestore/bin/zstcli " \
"-rootca=/var/lib/zstack/imagestorebackupstorage/package/certs/ca.pem " \
"-url=%s:8000 " \
"pull -installpath /root/%s/old.qcow2 %s:%s;" \
"qemu-img compare %s /root/%s/old.qcow2;" % (id, bs.hostname, id, name, id, vol_path, id)
# NOTE(review): no cleanup of /root/<id> is visible in this snippet.
result = test_lib.lib_execute_ssh_cmd(host.managementIp, "root", "password", cmd, timeout=300)
if result != "Images are identical.\n":
test_util.test_fail("compare vm_root_volume & image created by backup")
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:27,代码来源:test_backup_replay_volume.py
示例5: test
def test():
    """Create a NeverStop HA VM on NFS storage, take a host NIC down, destroy the VM.

    The test is skipped unless the primary storage type is NFS, the storage
    network is separate and VM HA is enabled. The VM is created on the first
    enabled, connected host whose management IP differs from the management
    node's, and must not sit on a host that has a VR, the MN or the NFS server.
    """
    global vm, host_uuid, host_ip, max_attempts, storagechecker_timeout

    test_lib.skip_test_when_ps_type_not_in_list([inventory.NFS_PRIMARY_STORAGE_TYPE])
    test_stub.skip_if_not_storage_network_separate(test_lib.all_scenario_config)
    if test_lib.lib_get_ha_enable() != 'true':
        test_util.test_skip("vm ha not enabled. Skip test")

    vm_creation_option = test_util.VmOption()
    image_uuid = test_lib.lib_get_image_by_name(os.environ.get('imageName_net')).uuid
    l3_net_uuid = test_lib.lib_get_l3_by_name(os.environ.get('l3VlanNetworkName1')).uuid

    test_lib.clean_up_all_vr()
    mn_ip = res_ops.query_resource(res_ops.MANAGEMENT_NODE)[0].hostName

    offering_cond = res_ops.gen_query_conditions('type', '=', 'UserVm')
    instance_offering_uuid = res_ops.query_resource(res_ops.INSTANCE_OFFERING, offering_cond)[0].uuid

    # Host filter: enabled, connected, and not the management node's IP.
    host_cond = res_ops.gen_query_conditions('state', '=', 'Enabled')
    host_cond = res_ops.gen_query_conditions('status', '=', 'Connected', host_cond)
    host_cond = res_ops.gen_query_conditions('managementIp', '!=', mn_ip, host_cond)
    host_uuid = res_ops.query_resource(res_ops.HOST, host_cond)[0].uuid

    vm_creation_option.set_host_uuid(host_uuid)
    vm_creation_option.set_l3_uuids([l3_net_uuid])
    vm_creation_option.set_image_uuid(image_uuid)
    vm_creation_option.set_instance_offering_uuid(instance_offering_uuid)
    vm_creation_option.set_name('multihost_basic_vm')

    vm = test_vm_header.ZstackTestVm()
    vm.set_creation_option(vm_creation_option)
    vm.create()

    # Hosts carrying a VR, the MN or the NFS server must not run the test VM.
    vr_hosts = test_stub.get_host_has_vr()
    mn_hosts = test_stub.get_host_has_mn()
    nfs_hosts = test_stub.get_host_has_nfs()
    excluded_hosts = vr_hosts + mn_hosts + nfs_hosts
    vm_is_placed = test_stub.ensure_vm_not_on(vm.get_vm().uuid, vm.get_vm().hostUuid, excluded_hosts)
    if not vm_is_placed:
        test_util.test_fail("Not find out a suitable host")

    host_ip = test_lib.lib_find_host_by_vm(vm.get_vm()).managementIp
    test_util.test_logger("host %s is disconnecting" %(host_ip))
    ha_ops.set_vm_instance_ha_level(vm.get_vm().uuid, "NeverStop")

    ssh_user = os.environ.get('hostUsername')
    ssh_password = os.environ.get('hostPassword')
    # Asynchronously bounce the "zsn1" NIC on the host; the handle is not used afterwards.
    t = test_stub.async_exec_ifconfig_nic_down_up(1200, host_ip, ssh_user, ssh_password, "zsn1")

    vm.destroy()
    test_util.test_pass('Test VM ha change to running within 300s Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:58,代码来源:test_stor_sep_nfs_vm_ha_nfs_dis_destroy_vm.py
示例6: test
# Test: create several NeverStop HA VMs (each with data volumes) on one
# randomly chosen host, take that host's L2 NIC down for a while, and verify
# that no VM is still reported on that host afterwards; finally recover and
# reconnect the host.
# NOTE(review): the indentation of this snippet was lost during extraction, so
# the extent of the if/else/for bodies below cannot be confirmed from here.
# `record`, `VM_COUNT`, `DATA_VOLUME_NUMBER` and `test_obj_dict` are defined
# outside this snippet.
def test():
ps_env = test_stub.PSEnvChecker()
# Pick a random enabled, connected host whose management IP differs from
# the management node's hostName.
mn_ip = res_ops.query_resource(res_ops.MANAGEMENT_NODE)[0].hostName
conditions = res_ops.gen_query_conditions('state', '=', 'Enabled')
conditions = res_ops.gen_query_conditions('status', '=', 'Connected', conditions)
conditions = res_ops.gen_query_conditions('managementIp', '!=', mn_ip, conditions)
host = random.choice(res_ops.query_resource(res_ops.HOST, conditions))
# Store the chosen host's ip/port/uuid in `record`.
# NOTE(review): presumably read by an error-recovery hook outside this
# snippet - confirm.
record['host_ip'] = host.managementIp
record['host_port'] = host.sshPort
record['host_uuid'] = host.uuid
test_util.test_dsc("Create {0} vm each with {1} datavolume".format(VM_COUNT, DATA_VOLUME_NUMBER))
# When ps_env.is_sb_ceph_env is true, a random primary storage is chosen and
# the backup storage type passed along depends on whether it is SharedBlock.
if ps_env.is_sb_ceph_env:
ps_list = res_ops.query_resource(res_ops.PRIMARY_STORAGE)
ps = random.choice(ps_list)
ps_uuid = ps.uuid
vm_list = test_stub.create_multi_vms(name_prefix='test-', count=VM_COUNT,
data_volume_number=DATA_VOLUME_NUMBER, host_uuid=host.uuid, ps_uuid=ps_uuid, timeout=1800000, bs_type="ImageStoreBackupStorage" if ps.type == "SharedBlock" else "Ceph")
else:
vm_list = test_stub.create_multi_vms(name_prefix='test-', count=VM_COUNT,
data_volume_number=DATA_VOLUME_NUMBER, host_uuid=host.uuid, timeout=1800000)
for vm in vm_list:
test_obj_dict.add_vm(vm)
test_util.test_logger("host %s is disconnecting" % host.managementIp)
for vm in vm_list:
ha_ops.set_vm_instance_ha_level(vm.get_vm().uuid, "NeverStop")
# Resolve the host NIC name behind the first L2 network of `vm`.
# NOTE(review): `vm` here is the loop variable of the loop above; whether this
# line runs inside that loop or after it cannot be told from here.
l2interface = test_lib.lib_get_l2s_by_vm(vm.get_vm())[0].physicalInterface
l2_network_interface = test_stub.get_host_l2_nic_name(l2interface)
# Take the NIC down, sleep 180 seconds on the host, then bring it back up.
cmd = "ifdown %s && sleep 180 && ifup %s" % (l2_network_interface, l2_network_interface)
host_username = os.environ.get('hostUsername')
host_password = os.environ.get('hostPassword')
rsp = test_lib.lib_execute_ssh_cmd(host.managementIp, host_username, host_password, cmd, 240)
if not rsp:
test_util.test_logger("host is expected to shutdown after its network down for a while")
test_util.test_logger("wait for 180 seconds")
time.sleep(180)
# Refresh each VM and fail if it is still reported on the original host.
for vm in vm_list:
vm.update()
if test_lib.lib_find_host_by_vm(vm.get_vm()).managementIp == host.managementIp:
test_util.test_fail("VM is expected to start running on another host")
vm.set_state(vm_header.RUNNING)
vm.check()
# Run the script named by 'hostRecoverScript' locally with PORT set to the
# host's ssh port, then reconnect the host.
cmd = 'PORT=%s bash -ex %s %s' % (host.sshPort, os.environ.get('hostRecoverScript'),host.managementIp)
test_util.test_logger(cmd)
os.system(cmd)
host_ops.reconnect_host(host.uuid)
test_util.test_pass('Multi PrimaryStorage Vm HA Test Pass')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:56,代码来源:test_vm_ha_multi_ps.py
示例7: test
def test():
    """Check a NeverStop HA VM leaves its host when the host's network goes down.

    The test is skipped unless MiniStorage is deployed, the storage network is
    separate and VM HA is enabled. A "ttylinux" VM is created on the first
    enabled, connected host; that host's network is brought down and the VM
    must no longer be running there. The network is restored at the end.
    """
    global vm, host_uuid, host_ip, max_attempts, storagechecker_timeout

    test_lib.skip_test_if_any_ps_not_deployed(['MiniStorage'])
    test_stub.skip_if_not_storage_network_separate(test_lib.all_scenario_config)
    if test_lib.lib_get_ha_enable() != 'true':
        test_util.test_skip("vm ha not enabled. Skip test")

    vm_creation_option = test_util.VmOption()
    image_uuid = test_lib.lib_get_image_by_name("ttylinux").uuid
    l3_net_uuid = test_lib.lib_get_l3_by_name(os.environ.get('l3VlanNetworkName1')).uuid

    test_lib.clean_up_all_vr()
    # Queried but not used for filtering in this test.
    mn_ip = res_ops.query_resource(res_ops.MANAGEMENT_NODE)[0].hostName

    offering_cond = res_ops.gen_query_conditions('type', '=', 'UserVm')
    instance_offering_uuid = res_ops.query_resource(res_ops.INSTANCE_OFFERING, offering_cond)[0].uuid

    # Host filter: enabled and connected.
    host_cond = res_ops.gen_query_conditions('state', '=', 'Enabled')
    host_cond = res_ops.gen_query_conditions('status', '=', 'Connected', host_cond)
    host_uuid = res_ops.query_resource(res_ops.HOST, host_cond)[0].uuid

    vm_creation_option.set_host_uuid(host_uuid)
    vm_creation_option.set_l3_uuids([l3_net_uuid])
    vm_creation_option.set_image_uuid(image_uuid)
    vm_creation_option.set_instance_offering_uuid(instance_offering_uuid)
    vm_creation_option.set_name('multihost_basic_vm')

    vm = test_vm_header.ZstackTestVm()
    vm.set_creation_option(vm_creation_option)
    vm.create()

    host_ip = test_lib.lib_find_host_by_vm(vm.get_vm()).managementIp
    test_util.test_logger("host %s is disconnecting" %(host_ip))
    ha_ops.set_vm_instance_ha_level(vm.get_vm().uuid, "NeverStop")

    test_stub.down_host_network(host_ip, test_lib.all_scenario_config, "managment_net")
    still_on_original_host = test_stub.check_vm_running_on_host(vm.vm.uuid, host_ip)
    if still_on_original_host:
        test_util.test_fail("VM is expected to start running on another host")

    vm.set_state(vm_header.RUNNING)
    vm.check()
    vm.destroy()

    test_stub.up_host_network(host_ip, test_lib.all_scenario_config, "managment_net")
    test_util.test_pass('Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:55,代码来源:test_minips_net_sep_vm_ha_mag_dis_vm_run.py
示例8: test
def test():
    """Verify image cache cleanup removes a deleted image's cache file.

    A new root-volume template is added, a VM is created from it and then
    destroyed (and expunged unless the delete policy is 'Direct'); the image
    is deleted the same way. After cleaning the image cache on the VM's
    primary storage, the cached qcow2 must no longer exist on the host for
    local and NFS storage. SharedMountPoint and Ceph storage skip the test.
    """
    connected_bs_cond = res_ops.gen_query_conditions("status", '=', "Connected")
    backup_storages = res_ops.query_resource_fields(
        res_ops.BACKUP_STORAGE, connected_bs_cond, None, fields=['uuid'])
    if not backup_storages:
        test_util.test_skip("not find available backup storage. Skip test")

    image_option = test_util.ImageOption()
    image_option.set_name('test_image_cache_cleanup')
    image_option.set_format('qcow2')
    image_option.set_mediaType('RootVolumeTemplate')
    image_option.set_url(os.environ.get('imageUrl_s'))
    image_option.set_backup_storage_uuid_list([backup_storages[0].uuid])

    new_image = zstack_image_header.ZstackTestImage()
    new_image.set_creation_option(image_option)
    new_image.add_root_volume_template()

    l3_net_uuid = test_lib.lib_get_l3_by_name(os.environ.get('l3VlanNetworkName1')).uuid
    vm = test_stub.create_vm(
        [l3_net_uuid], new_image.image.uuid, 'imagecache_vm',
        default_l3_uuid = l3_net_uuid)
    test_obj_dict.add_vm(vm)
    vm.check()

    # Remember where the VM lived before it is removed.
    host = test_lib.lib_find_host_by_vm(vm.get_vm())
    ps = test_lib.lib_get_primary_storage_by_vm(vm.get_vm())

    vm.destroy()
    if test_lib.lib_get_vm_delete_policy() != 'Direct':
        vm.expunge()

    new_image.delete()
    if test_lib.lib_get_image_delete_policy() != 'Direct':
        new_image.expunge()

    if ps.type == 'SharedMountPoint':
        test_util.test_skip('CleanUpImageCacheOnPrimaryStorage not supported on SMP storage, skip test.')
    elif ps.type == inventory.CEPH_PRIMARY_STORAGE_TYPE:
        test_util.test_skip('ceph is not directly using image cache, skip test.')

    ps_ops.cleanup_imagecache_on_primary_storage(ps.uuid)

    # Local and NFS storage keep the cache file at the same relative location.
    if ps.type in (inventory.LOCAL_STORAGE_TYPE, inventory.NFS_PRIMARY_STORAGE_TYPE):
        image_uuid = new_image.image.uuid
        image_cache_path = "%s/imagecache/template/%s/%s.qcow2" % (ps.mountPath, image_uuid, image_uuid)
        if test_lib.lib_check_file_exist(host, image_cache_path):
            test_util.test_fail('image cache is expected to be deleted')

    test_util.test_pass('imagecache cleanup Pass.')
开发者ID:TinaL3,项目名称:zstack-woodpecker,代码行数:54,代码来源:test_imagecache_cleanup.py
示例9: test
def test():
    """Snapshot a running VM's root volume and one attached data volume.

    The test is skipped (after destroying the first VM) when the VM's host
    does not report live snapshot capability. Two data volumes are attached
    to the first VM; a second VM serves as the snapshot utility VM.
    """
    test_util.test_dsc('Create test vm as utility vm')
    vm1 = test_stub.create_vlan_vm()
    test_obj_dict.add_vm(vm1)

    # The test depends on live snapshot support of the VM's host.
    host_inv = test_lib.lib_find_host_by_vm(vm1.get_vm())
    supports_live_snapshot = test_lib.lib_check_live_snapshot_cap(host_inv)
    if not supports_live_snapshot:
        vm1.destroy()
        test_obj_dict.rm_vm(vm1)
        test_util.test_skip('Skip test, since [host:] %s does not support live snapshot.' % host_inv.uuid)

    # The capability is queried a second time, exactly as the test always did.
    live_snapshot = test_lib.lib_check_live_snapshot_cap(host_inv)
    if not live_snapshot:
        vm1.destroy()
        test_obj_dict.rm_vm(vm1)
        test_util.test_skip("Skip test, since [host:] %s doesn't support live snapshot " % host_inv.uuid)

    vm = test_stub.create_vlan_vm()
    test_obj_dict.add_vm(vm)

    test_util.test_dsc('Create volume for snapshot testing')
    small_offering_name = os.environ.get('smallDiskOfferingName')
    disk_offering = test_lib.lib_get_disk_offering_by_name(small_offering_name)
    volume_creation_option = test_util.VolumeOption()
    volume_creation_option.set_name('volume for snapshot testing')
    volume_creation_option.set_disk_offering_uuid(disk_offering.uuid)

    volume1 = test_stub.create_volume(volume_creation_option)
    test_obj_dict.add_volume(volume1)
    volume2 = test_stub.create_volume(volume_creation_option)
    test_obj_dict.add_volume(volume2)

    # Make sure the utility vm is up before attaching volumes.
    vm.check()
    volume1.attach(vm1)
    volume2.attach(vm1)

    test_util.test_dsc('create snapshot for root')
    root_volume_inv = test_lib.lib_get_root_volume(vm1.get_vm())
    root_snapshots = test_obj_dict.get_volume_snapshot(root_volume_inv.uuid)
    root_snapshots.set_utility_vm(vm)
    root_snapshots.create_snapshot('create_root_volume_snapshot1')
    volume1.check()
    volume2.check()

    data_snapshots = test_obj_dict.get_volume_snapshot(volume1.get_volume().uuid)
    data_snapshots.set_utility_vm(vm)
    data_snapshots.create_snapshot('create_data_volume_snapshot1')
    root_snapshots.check()
    volume1.check()
    volume2.check()

    test_lib.lib_robot_cleanup(test_obj_dict)
    test_util.test_pass('Create root Snapshot and test data volume status test Success')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:53,代码来源:test_crt_root_sp_in_live_vm_and_data.py
示例10: check_vm_internal_cpu_mem
def check_vm_internal_cpu_mem(vm):
    """Return ``(cpu_count, memory)`` as reported from inside the VM.

    Both values are read by running shell commands in the guest through the
    agent on the VM's host: the CPU count is the number of 'processor' lines
    in /proc/cpuinfo, the memory is the second field of the "Mem" line of
    ``free -m``.
    """
    host_management_ip = test_lib.lib_find_host_by_vm(vm.get_vm()).managementIp
    guest_ip = vm.get_vm().vmNics[0].ip

    cpu_reply = test_lib.lib_ssh_vm_cmd_by_agent(
        host_management_ip, guest_ip, 'root', 'password',
        "cat /proc/cpuinfo| grep 'processor'| wc -l")
    vm_cpu = int(cpu_reply.result.strip())

    mem_reply = test_lib.lib_ssh_vm_cmd_by_agent(
        host_management_ip, guest_ip, 'root', 'password',
        "free -m |grep Mem")
    mem_fields = mem_reply.result.split()
    vm_mem = int(mem_fields[1])

    return vm_cpu, vm_mem
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:12,代码来源:Hot_plug_common.py
示例11: migrate_vm_to_differnt_cluster
def migrate_vm_to_differnt_cluster(vm):
    """Migrate *vm* to the first host found outside its current cluster.

    Skips the test when the VM does not support live migration and fails it
    when no host in another cluster is found.
    """
    test_util.test_dsc("migrate vm to different cluster")

    can_live_migrate = test_lib.lib_check_vm_live_migration_cap(vm.vm)
    if not can_live_migrate:
        test_util.test_skip('skip migrate if live migrate not supported')

    # Look for hosts whose cluster differs from the current host's cluster.
    source_host = test_lib.lib_find_host_by_vm(vm.vm)
    other_cluster_cond = res_ops.gen_query_conditions('clusterUuid', '!=', source_host.clusterUuid)
    other_cluster_hosts = res_ops.query_resource(res_ops.HOST, other_cluster_cond, None)
    if len(other_cluster_hosts) == 0:
        test_util.test_fail('Not find available Hosts to do migration')

    destination = other_cluster_hosts[0]
    vm.migrate(destination.uuid)
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:12,代码来源:test_stub.py
示例12: test
def test():
    """Delete a chain of volume snapshots while the volume is attached to a running VM.

    The test is skipped (after destroying the utility VM) when the VM's host
    does not support live snapshots, or when its libvirt version is unknown or
    lower than 1.2.7. Otherwise three snapshots are created on a data volume,
    the volume is attached to a second VM and all snapshots are deleted.
    """
    test_util.test_dsc('Create test vm as utility vm')
    vm = test_stub.create_vlan_vm()
    test_obj_dict.add_vm(vm)

    # This test relies on the host supporting live snapshots.
    host_inv = test_lib.lib_find_host_by_vm(vm.get_vm())
    if not test_lib.lib_check_live_snapshot_cap(host_inv):
        vm.destroy()
        test_obj_dict.rm_vm(vm)
        # The host uuid fills the %s placeholder so the skip reason names the host.
        test_util.test_skip('Skip test, since [host:] %s does not support live snapshot.' % host_inv.uuid)

    # Deleting snapshots of an attached volume needs live merge (libvirt >= 1.2.7).
    libvirt_ver = test_lib.lib_get_host_libvirt_tag(host_inv)
    if not libvirt_ver or LooseVersion(libvirt_ver) < LooseVersion('1.2.7'):
        vm.destroy()
        test_obj_dict.rm_vm(vm)
        test_util.test_skip("Skip test, since [host:] %s libvert version: %s is lower than 1.2.7, which doesn't support live merge, when doing snapshot deleting." % (host_inv.uuid, libvirt_ver))

    vm1 = test_stub.create_vlan_vm()
    test_obj_dict.add_vm(vm1)

    test_util.test_dsc('Create volume for snapshot testing')
    disk_offering = test_lib.lib_get_disk_offering_by_name(os.environ.get('smallDiskOfferingName'))
    volume_creation_option = test_util.VolumeOption()
    volume_creation_option.set_name('volume for snapshot testing')
    volume_creation_option.set_disk_offering_uuid(disk_offering.uuid)
    volume = test_stub.create_volume(volume_creation_option)
    test_obj_dict.add_volume(volume)

    # Make sure the utility vm is up before taking snapshots.
    vm.check()

    test_util.test_dsc('create snapshot and check')
    snapshots = test_obj_dict.get_volume_snapshot(volume.get_volume().uuid)
    snapshots.set_utility_vm(vm)
    snapshots.create_snapshot('create_snapshot1')
    snapshots.create_snapshot('create_snapshot2')
    snapshots.create_snapshot('create_snapshot3')

    # Attach the volume to the running vm, then delete the whole snapshot chain.
    volume.attach(vm1)
    snapshots.delete()
    test_obj_dict.rm_volume_snapshot(snapshots)
    volume.check()

    vm.destroy()
    test_obj_dict.rm_vm(vm)
    test_lib.lib_robot_cleanup(test_obj_dict)
    test_util.test_pass('Delete Snapshot on running vm test Success')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:50,代码来源:test_delete_snapshot_on_running_vm.py
示例13: migrate_vm_to_random_host
def migrate_vm_to_random_host(vm, timeout = None):
    """Migrate *vm* to a randomly selected host and verify where it landed.

    :param vm: test VM object; ``vm.vm`` is the VM inventory and
        ``vm.migrate`` performs the migration.
    :param timeout: forwarded unchanged to ``vm.migrate``.

    Fails the test when no host is found for the VM after the migration, or
    when that host is not the selected target.
    """
    test_util.test_dsc("migrate vm to random host")
    target_host = test_lib.lib_find_random_host(vm.vm)
    current_host = test_lib.lib_find_host_by_vm(vm.vm)
    vm.migrate(target_host.uuid, timeout)

    new_host = test_lib.lib_get_vm_host(vm.vm)
    if not new_host:
        test_util.test_fail('Not find available Hosts to do migration')

    if new_host.uuid != target_host.uuid:
        # Four values need four placeholders: the message previously had only
        # three, so building it raised TypeError instead of reporting the failure.
        test_util.test_fail('[vm:] %s did not migrate from [host:] %s to target [host:] %s, but to [host:] %s' % (vm.vm.uuid, current_host.uuid, target_host.uuid, new_host.uuid))
    else:
        test_util.test_logger('[vm:] %s has been migrated from [host:] %s to [host:] %s' % (vm.vm.uuid, current_host.uuid, target_host.uuid))
开发者ID:TinaL3,项目名称:zstack-woodpecker,代码行数:14,代码来源:test_stub.py
示例14: compare
# For the VM's root volume and its data volume named "DATA-for-test_vm", pull
# the matching backup image from the first ImageStoreBackupStorage onto the
# VM's host and run "qemu-img compare" against the volume's install path; fail
# the test unless each output is exactly "Images are identical.\n".
#   ps     - primary storage inventory; only ps.type is read here
#   vm     - test VM object; vm.vm / vm.get_vm() give the VM inventory
#   backup - iterable of backup inventories (type, metadata and
#            backupStorageRefs[0].installPath are read)
# NOTE(review): the indentation of this snippet was lost during extraction, so
# the extent of the loop and "SharedBlock" bodies cannot be confirmed from here.
def compare(ps, vm, backup):
test_util.test_logger("-----------------compare----------------")
# The comparison commands are executed on the host that runs the VM.
host = test_lib.lib_find_host_by_vm(vm.vm)
cond = res_ops.gen_query_conditions("type", '=', "ImageStoreBackupStorage")
bs = res_ops.query_resource(res_ops.BACKUP_STORAGE, cond)[0]
root_volume = test_lib.lib_get_root_volume(vm.get_vm())
# Select the data volume by its fixed name.
# NOTE(review): data_volume stays unbound if no volume has that name.
for i in test_lib.lib_get_data_volumes(vm.get_vm()):
if i.name == "DATA-for-test_vm":
data_volume = i
vm_path = root_volume.installPath
data_path = data_volume.installPath
# For SharedBlock the paths are rebuilt as /dev/<part 2>/<part 3> of the
# "/"-split installPath.
if ps.type == "SharedBlock":
vm_path = "/dev/" + root_volume.installPath.split("/")[2] + "/" + root_volume.installPath.split("/")[3]
data_path = "/dev/" + data_volume.installPath.split("/")[2] + "/" + data_volume.installPath.split("/")[3]
test_util.test_logger(vm_path)
test_util.test_logger(data_path)
# Take <name>/<id> (parts 2 and 3 of installPath) from the "Root" backup, and
# <name1>/<id1> from the "Data" backup whose metadata mentions the data volume.
# NOTE(review): `id` shadows the Python builtin of the same name.
for i in backup:
if i.type == "Root":
name = i.backupStorageRefs[0].installPath.split("/")[2]
id = i.backupStorageRefs[0].installPath.split("/")[3]
if i.type == "Data" and "DATA-for-test_vm" in i.metadata:
name1 = i.backupStorageRefs[0].installPath.split("/")[2]
id1 = i.backupStorageRefs[0].installPath.split("/")[3]
# Root volume pipeline: mkdir scratch dir; pull the image to old.qcow2;
# compare it with the root volume path.
cmd = "mkdir /root/%s;" \
"/usr/local/zstack/imagestore/bin/zstcli " \
"-rootca=/var/lib/zstack/imagestorebackupstorage/package/certs/ca.pem " \
"-url=%s:8000 " \
"pull -installpath /root/%s/old.qcow2 %s:%s;" \
"qemu-img compare %s /root/%s/old.qcow2;" % (id, bs.hostname, id, name, id, vm_path, id)
# Same pipeline for the data volume.
cmd1 = "mkdir /root/%s;" \
"/usr/local/zstack/imagestore/bin/zstcli " \
"-rootca=/var/lib/zstack/imagestorebackupstorage/package/certs/ca.pem " \
"-url=%s:8000 " \
"pull -installpath /root/%s/old.qcow2 %s:%s;" \
"qemu-img compare %s /root/%s/old.qcow2;" % (id1, bs.hostname, id1, name1, id1, data_path, id1)
# NOTE(review): no cleanup of the /root scratch directories is visible in this
# snippet.
result = test_lib.lib_execute_ssh_cmd(host.managementIp, "root", "password", cmd, timeout=300)
if result != "Images are identical.\n":
test_util.test_fail("compare vm_root_volume & image created by backup")
result = test_lib.lib_execute_ssh_cmd(host.managementIp, "root", "password", cmd1, timeout=300)
if result != "Images are identical.\n":
test_util.test_fail("compare vm_data_volume & image created by backup")
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:48,代码来源:test_del_backup_full.py
示例15: check_vm_internal_cpu_mem
def check_vm_internal_cpu_mem(vm, shutdown, window):
    """Return ``(cpu_count, memory)`` as reported from inside the VM.

    When *window* is true the work is delegated to
    ``check_window_vm_internal_cpu_mem``. Otherwise shell commands are run in
    the guest through the agent on the VM's host: the CPU count is the number
    of 'processor' lines in /proc/cpuinfo; the memory is the second
    whitespace-separated field of either ``free -m`` (Mem line) or, when
    *shutdown* is true, the 'Size:' lines of ``dmidecode -t 17``.
    """
    if window:
        return check_window_vm_internal_cpu_mem(vm)

    host_management_ip = test_lib.lib_find_host_by_vm(vm.get_vm()).managementIp
    guest_ip = vm.get_vm().vmNics[0].ip

    cpu_query = "cat /proc/cpuinfo| grep 'processor'| wc -l"
    mem_query = "dmidecode -t 17 | grep 'Size:'" if shutdown else "free -m |grep Mem"

    cpu_reply = test_lib.lib_ssh_vm_cmd_by_agent(
        host_management_ip, guest_ip, 'root', 'password', cpu_query)
    vm_cpu = int(cpu_reply.result.strip())

    mem_reply = test_lib.lib_ssh_vm_cmd_by_agent(
        host_management_ip, guest_ip, 'root', 'password', mem_query)
    mem_fields = mem_reply.result.split()
    vm_mem = int(mem_fields[1])

    return vm_cpu, vm_mem
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:17,代码来源:test_stub.py
示例16: test
# Test: create eight NeverStop-HA VMs covering the local/shared root and data
# volume combinations on one randomly chosen host, take that host's L2 NIC
# down, and then verify that the two all-shared VMs are no longer reported on
# that host while the other six are in the STOPPED state; finally recover and
# reconnect the host.
# NOTE(review): the indentation of this snippet was lost during extraction, so
# the extent of the if/for bodies below cannot be confirmed from here.
# `record` and `test_obj_dict` are defined outside this snippet.
def test():
# Pick a random enabled, connected host whose management IP differs from
# the management node's hostName.
mn_ip = res_ops.query_resource(res_ops.MANAGEMENT_NODE)[0].hostName
conditions = res_ops.gen_query_conditions('state', '=', 'Enabled')
conditions = res_ops.gen_query_conditions('status', '=', 'Connected', conditions)
conditions = res_ops.gen_query_conditions('managementIp', '!=', mn_ip, conditions)
host = random.choice(res_ops.query_resource(res_ops.HOST, conditions))
# Store the chosen host's ip/port/uuid in `record`.
# NOTE(review): presumably read by an error-recovery hook outside this
# snippet - confirm.
record['host_ip'] = host.managementIp
record['host_port'] = host.sshPort
record['host_uuid'] = host.uuid
# The generator must yield exactly eight VMs, unpacked by name below.
vm_list=list(test_stub.generate_local_shared_test_vms(test_obj_dict, vm_ha=True, host_uuid=host.uuid))
(vm_root_local, vm_root_local_data_local,
vm_root_local_data_shared, vm_root_local_data_mixed,
vm_root_shared, vm_root_shared_data_local,
vm_root_shared_data_shared, vm_root_shared_data_mixed) = vm_list
# Resolve the host NIC name behind the first L2 network of the first VM.
l2interface = test_lib.lib_get_l2s_by_vm(vm_list[0].get_vm())[0].physicalInterface
l2_network_interface = test_stub.get_host_l2_nic_name(l2interface)
# Take the NIC down, sleep 180 seconds on the host, then bring it back up.
cmd = "ifdown %s && sleep 180 && ifup %s" % (l2_network_interface, l2_network_interface)
host_username = os.environ.get('hostUsername')
host_password = os.environ.get('hostPassword')
rsp = test_lib.lib_execute_ssh_cmd(host.managementIp, host_username, host_password, cmd, 240)
if not rsp:
test_util.test_logger("host is expected to shutdown after its network down for a while")
test_util.test_logger("wait for 180 seconds")
time.sleep(180)
# Refresh every VM's inventory before checking placement and state.
for vm in vm_list:
vm.update()
# VMs with shared root and shared-only data must have left the original host.
for vm in (vm_root_shared,vm_root_shared_data_shared):
if test_lib.lib_find_host_by_vm(vm.get_vm()).managementIp == host.managementIp:
test_util.test_fail("VM is expected to start running on another host")
vm.set_state(vm_header.RUNNING)
vm.check()
# Every VM with a local root or local/mixed data volume must be STOPPED.
for vm in (vm_root_local, vm_root_local_data_local, vm_root_local_data_shared,
vm_root_local_data_mixed, vm_root_shared_data_local, vm_root_shared_data_mixed):
assert vm.get_vm().state == inventory.STOPPED
# Run the script named by 'hostRecoverScript' locally with PORT set to the
# host's ssh port, then reconnect the host.
cmd = 'PORT=%s bash -ex %s %s' % (host.sshPort, os.environ.get('hostRecoverScript'),host.managementIp)
test_util.test_logger(cmd)
os.system(cmd)
host_ops.reconnect_host(host.uuid)
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:45,代码来源:test_local_shared_vm_ha.py
示例17: test
def test():
    """Stop a NeverStop HA VM while its host is disabled, then destroy it.

    Skipped when VM HA is not enabled. The host is re-enabled at the end.
    """
    global vm

    ha_enabled = test_lib.lib_get_ha_enable()
    if ha_enabled != 'true':
        test_util.test_skip("vm ha not enabled. Skip test")

    vm = test_stub.create_vm()
    vm.check()

    # Disable the VM's host, give the change 10 seconds, then set the HA level.
    vm_host_uuid = test_lib.lib_find_host_by_vm(vm.get_vm()).uuid
    host_ops.change_host_state(vm_host_uuid, "disable")
    time.sleep(10)
    ha_ops.set_vm_instance_ha_level(vm.get_vm().uuid, "NeverStop")

    # Stop the VM and check it again after a 60 second wait.
    vm.stop()
    time.sleep(60)
    vm.check()

    vm.destroy()
    vm.check()

    host_ops.change_host_state(vm_host_uuid, "enable")
    test_util.test_pass('VM ha never stop Test Success')
开发者ID:dennis-sun-chao,项目名称:zstack-woodpecker,代码行数:18,代码来源:test_vm_ha_never_stop_disable_host.py
示例18: print_iptables
def print_iptables(vm):
    """Log the output of ``iptables-save`` from the host that runs *vm*.

    :param vm: VM inventory; passed to ``test_lib.lib_find_host_by_vm`` and
        its ``uuid`` is used in log messages.

    This is a best-effort diagnostic helper: when no host (or no management
    IP) can be determined for the VM, a message is logged and the function
    returns without raising.
    """
    host = test_lib.lib_find_host_by_vm(vm)

    # Only the attribute access is guarded. `except Exception` keeps the
    # best-effort behaviour for a missing host, but lets KeyboardInterrupt and
    # SystemExit propagate, which the former bare `except:` swallowed.
    try:
        management_ip = host.managementIp
    except Exception:
        management_ip = None

    if not management_ip:
        test_util.test_logger('did not find host for vm: %s ' % vm.uuid)
        return

    # Ask the test agent on the host to run iptables-save and return its output.
    host_cmd = host_plugin.HostShellCmd()
    host_cmd.command = 'iptables-save'
    rspstr = http.json_dump_post(testagent.build_http_path(management_ip, host_plugin.HOST_SHELL_CMD_PATH), host_cmd)
    rsp = jsonobject.loads(rspstr)

    if rsp.return_code != 0:
        test_util.test_logger('can not dump iptables on host: %s; reason: %s' % (management_ip, rsp.stderr))
    else:
        test_util.test_logger('iptables-save result on %s: %s' % (management_ip, rsp.stdout))
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:18,代码来源:zstack_kvm_security_group_checker.py
示例19: test
# Test: create a VM with the spice console and verify, via "virsh dumpxml" on
# the VM's host and via the console-address API, that the console protocol is
# "spice" and that the streaming mode is "off".
# NOTE(review): the indentation of this snippet was lost during extraction.
# The code uses Python 2 print statements.
def test():
global vm
vm = test_stub.create_spice_vm()
vm.check()
vm_inv=vm.get_vm()
vm_uuid=vm_inv.uuid
# NOTE(review): vm_state is not referenced again in this snippet.
vm_state = vm.get_vm().state
# SSH credentials and address are taken from the host inventory.
host_inv = test_lib.lib_find_host_by_vm(vm_inv)
host_ip= host_inv.managementIp
host_username=host_inv.username
host_password=host_inv.password
test_util.test_dsc('check the vm console protocol is spice via read xml')
# Extract the value after "=" of the second field of the libvirt XML line
# mentioning spice (spicevmc and com.redhat.spice.0 lines are filtered out).
cmd='''virsh dumpxml %s |grep spice | grep -v spicevmc | grep -v com.redhat.spice.0 |awk '{print $2}' |awk -F= '{print $2}' '''% (vm_uuid)
result=test_lib.lib_execute_ssh_cmd(host_ip, host_username, host_password, cmd, 180)
# NOTE(review): eval() is applied to text returned over SSH, presumably to
# strip the quotes around the XML attribute value. eval on external data can
# execute arbitrary expressions - consider stripping the quotes instead.
if eval(result) == "spice" :
print "Vm console protocol is spice, test success"
else :
print "Vm console protocol is %s " % eval(result)
print "test is fail"
test_util.test_fail('Create VM with spice Test Failed')
test_util.test_dsc('check the spiceStreamingMode is off via read xml')
# Extract the value after "=" from the XML line mentioning streaming.
cmd='''virsh dumpxml %s |grep streaming|awk -F/ '{print $1}' |awk -F= '{print $2}' '''% (vm_uuid)
result=test_lib.lib_execute_ssh_cmd(host_ip, host_username, host_password, cmd, 180)
# NOTE(review): same eval()-on-SSH-output concern as above.
if eval(result) == "off" :
print "Vm spiceStreamingMode is off, test success"
else :
print "Vm spiceStreamingModeis %s " % eval(result)
print "test is fail"
test_util.test_fail('Create VM with spice and spiceStreamingMode is off Test Failed')
test_util.test_dsc('check the vm console protocol is spice via GetVmConsoleAddressAction')
# Cross-check the protocol through test_stub.get_vm_console_protocol.
if test_stub.get_vm_console_protocol(vm.vm.uuid).protocol == "spice" :
print "Vm console protocol is spice, test success"
else :
print "Vm console protocol is %s " % test_stub.get_vm_console_protocol(vm.vm.uuid).protocol
print "test is fail"
test_util.test_fail('Create VM with spice Test Failed')
vm.destroy()
vm.check()
test_util.test_pass('Create VM with spice Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:44,代码来源:test_create_spice_vm.py
示例20: test
def test():
    """Check the image cache size on the host is within 1% of the image's actual size.

    Runs only for local, NFS and SharedMountPoint primary storage; any other
    storage type skips the test.
    """
    vm = test_stub.create_vr_vm('vm_imagecache', 'imageName_net', 'l3VlanNetwork3')
    test_obj_dict.add_vm(vm)
    vm.check()

    host = test_lib.lib_find_host_by_vm(vm.get_vm())
    ps = test_lib.lib_get_primary_storage_by_vm(vm.get_vm())
    image = test_lib.lib_get_image_by_name(os.environ.get('imageName_net'))

    uses_file_cache = (
        ps.type == inventory.LOCAL_STORAGE_TYPE
        or ps.type == inventory.NFS_PRIMARY_STORAGE_TYPE
        or ps.type == 'SharedMountPoint')
    if not uses_file_cache:
        test_util.test_skip("Skip test when primary storage is not local or NFS")
    else:
        image_cache_path = "%s/imagecache/template/%s/" % (ps.mountPath, image.uuid)
        imagecache_file_size = int(test_lib.lib_get_file_size(host, image_cache_path))
        image_actual_size = int(image.actualSize)
        # Accept a deviation of at most 1% in either direction.
        too_small = imagecache_file_size < image.actualSize*0.99
        if too_small or imagecache_file_size > image.actualSize*1.01:
            test_util.test_fail('image cache size (%s) not match image actual size(%s)' % (imagecache_file_size, image_actual_size))

    test_lib.lib_error_cleanup(test_obj_dict)
    test_util.test_pass('imagecache cleanup Pass.')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:20,代码来源:test_imagecache_size.py
注:本文中的zstackwoodpecker.test_lib.lib_find_host_by_vm函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论