本文整理汇总了Python中zstackwoodpecker.test_lib.lib_set_delete_policy函数的典型用法代码示例。如果您正苦于以下问题:Python lib_set_delete_policy函数的具体用法?Python lib_set_delete_policy怎么用?Python lib_set_delete_policy使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了lib_set_delete_policy函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test
def test():
    """Verify that a recovered VM whose NIC lost its L3 network fails to start.

    Steps: set the 'vm' delete policy to 'Delay', create and check a VM,
    detach the L3 network from its first NIC, destroy and recover the VM
    (checking it in the DESTROYED and then STOPPED state), hand the value
    returned by the first ``lib_set_delete_policy`` call back to it
    (presumably restoring the previous policy -- confirm against test_lib),
    and finally attempt ``vm.start()``. ``test_util.test_pass`` is called
    only when that start attempt raises.
    """
    global test_obj_dict
    global delete_policy

    delete_policy = test_lib.lib_set_delete_policy('vm', 'Delay')

    l3_name = os.environ.get('l3VlanNetworkName1')
    image_name = os.environ.get('imageName_net')
    vm = test_stub.create_vm(l3_name=l3_name, image_name=image_name, vm_name='basic-test-vm')
    test_obj_dict.add_vm(vm)
    vm.check()

    vm_nic_uuid = vm.vm.vmNics[0].uuid
    net_ops.detach_l3(vm_nic_uuid)

    vm.destroy()
    vm.set_state(vm_header.DESTROYED)
    vm.check()

    vm.recover()
    vm.set_state(vm_header.STOPPED)
    vm.check()

    test_lib.lib_set_delete_policy('vm', delete_policy)

    try:
        vm.start()
    except Exception:
        # Any failure to start counts as success. A stricter check for the
        # message "please attach a nic and try again" was left disabled by
        # the original author.
        test_util.test_pass('test detach l3 check vm passed.')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:28,代码来源:test_detach_vm_net_check_vm_ops.py
示例2: test
def test():
    """Suite setup: prepare VLAN devices, deploy the plan and set defaults.

    Creates VLAN devices 10 and 11 on eth0 locally, deploys the test agent,
    posts a CreateVlanDeviceCmd for VLAN 10 and 11 to every host in the plan,
    executes the rest of the setup plan, deploys the initial database,
    runs EXTRA_SUITE_SETUP_SCRIPT when it exists, and then sets the 'vm',
    'volume' and 'image' delete policies to 'Direct' plus a few HA and
    image-cache settings before reporting success.
    """
    # Creating the VLAN devices is not mandatory in a nested-virtualization
    # environment, but it is required on a physical host that lacks enough
    # physical network devices when the test execution machine is not the
    # host machine. Two VLAN devices are created regardless, so that later
    # no-VLAN test cases can connect.
    linux.create_vlan_eth("eth0", 10)
    linux.create_vlan_eth("eth0", 11)

    # When the test execution machine differs from the host machine, the
    # deployment must be split into two steps (deploy_test_agent, then
    # execute_plan_without_deploy_test_agent); SetupAction.run() cannot be
    # called directly.
    test_lib.setup_plan.deploy_test_agent()

    cmd = host_plugin.CreateVlanDeviceCmd()
    hosts = test_lib.lib_get_all_hosts_from_plan()
    # A single host may come back as a bare object rather than a list.
    if not isinstance(hosts, list):
        hosts = [hosts]

    for host in hosts:
        cmd.ethname = 'eth0'
        cmd.vlan = 10
        http.json_dump_post(testagent.build_http_path(host.managementIp_, host_plugin.CREATE_VLAN_DEVICE_PATH), cmd)
        cmd.vlan = 11
        http.json_dump_post(testagent.build_http_path(host.managementIp_, host_plugin.CREATE_VLAN_DEVICE_PATH), cmd)

    test_lib.setup_plan.execute_plan_without_deploy_test_agent()
    deploy_operations.deploy_initial_database(test_lib.deploy_config)

    if os.path.exists(EXTRA_SUITE_SETUP_SCRIPT):
        os.system("bash %s" % EXTRA_SUITE_SETUP_SCRIPT)

    # The returned values are not needed here; nothing restores them later.
    test_lib.lib_set_delete_policy('vm', 'Direct')
    test_lib.lib_set_delete_policy('volume', 'Direct')
    test_lib.lib_set_delete_policy('image', 'Direct')

    # NOTE(review): indentation was lost in the extracted listing; both HA
    # setters are assumed to be guarded by this check -- confirm upstream.
    if test_lib.lib_get_ha_selffencer_maxattempts() is not None:
        test_lib.lib_set_ha_selffencer_maxattempts('60')
        test_lib.lib_set_ha_selffencer_storagechecker_timeout('60')
    test_lib.lib_set_primary_storage_imagecache_gc_interval(1)

    test_util.test_pass('Suite Setup Success')
开发者ID:TinaL3,项目名称:zstack-woodpecker,代码行数:35,代码来源:suite_setup.py
示例3: test
def test():
    """Suite setup with optional scenario deployment.

    If a scenario is configured and its scenario file does not exist yet,
    the scenario is deployed and ``test_util.test_skip`` is called. If a
    scenario destroy target is configured, the scenario is destroyed. Then
    the setup plan runs, EXTRA_SUITE_SETUP_SCRIPT is executed when present
    (receiving the management node IPs when a scenario file exists), the
    initial database is deployed, and the delete policies and HA / image
    cache settings are applied.
    """
    if (test_lib.scenario_config is not None
            and test_lib.scenario_file is not None
            and not os.path.exists(test_lib.scenario_file)):
        scenario_operations.deploy_scenario(test_lib.all_scenario_config, test_lib.scenario_file, test_lib.deploy_config)
        test_util.test_skip('Suite Setup Success')

    if test_lib.scenario_config is not None and test_lib.scenario_destroy is not None:
        scenario_operations.destroy_scenario(test_lib.all_scenario_config, test_lib.scenario_destroy)

    setup = setup_actions.SetupAction()
    setup.plan = test_lib.all_config
    setup.run()

    # The existence of the scenario file is deliberately re-checked here
    # rather than cached from the first check above.
    if (test_lib.scenario_config is not None
            and test_lib.scenario_file is not None
            and os.path.exists(test_lib.scenario_file)):
        mn_ips = deploy_operations.get_nodes_from_scenario_file(test_lib.all_scenario_config, test_lib.scenario_file, test_lib.deploy_config)
        if os.path.exists(EXTRA_SUITE_SETUP_SCRIPT):
            os.system("bash %s '%s'" % (EXTRA_SUITE_SETUP_SCRIPT, mn_ips))
    elif os.path.exists(EXTRA_SUITE_SETUP_SCRIPT):
        # This branch pairs with the scenario check: attached to the inner
        # "script exists" test it would be unreachable.
        os.system("bash %s" % (EXTRA_SUITE_SETUP_SCRIPT))

    deploy_operations.deploy_initial_database(test_lib.deploy_config, test_lib.all_scenario_config, test_lib.scenario_file)

    # The returned values are not needed here; nothing restores them later.
    test_lib.lib_set_delete_policy('vm', 'Direct')
    test_lib.lib_set_delete_policy('volume', 'Direct')
    test_lib.lib_set_delete_policy('image', 'Direct')

    # NOTE(review): indentation was lost in the extracted listing; both HA
    # setters are assumed to be guarded by this check -- confirm upstream.
    if test_lib.lib_get_ha_selffencer_maxattempts() is not None:
        test_lib.lib_set_ha_selffencer_maxattempts('60')
        test_lib.lib_set_ha_selffencer_storagechecker_timeout('60')
    test_lib.lib_set_primary_storage_imagecache_gc_interval(1)

    test_util.test_pass('Suite Setup Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:27,代码来源:suite_setup.py
示例4: delete_all_volumes
def delete_all_volumes(thread_threshold = 1000):
    """Delete every volume whose status is not 'Deleted'.

    Logs in as admin, raises the session timeout / concurrency global
    configs, switches the 'volume' delete policy to 'Direct' and the expunge
    time to 1, queries the matching volumes (in pages when there are more
    than ``thread_threshold``) and hands them to ``do_delete_volumes``.
    The values returned by the two setters are passed back to them
    afterwards, even when the deletion raises.

    :param thread_threshold: passed to ``do_delete_volumes`` and used to
        decide when (and with which page size) the query is paged.
    """
    # NOTE(review): the admin session returned here is never logged out in
    # this block -- confirm whether the caller is expected to do so.
    acc_ops.login_as_admin()
    con_ops.change_global_config('identity', 'session.timeout', '720000')
    con_ops.change_global_config('identity', 'session.maxConcurrent', '10000')

    delete_policy = test_lib.lib_set_delete_policy('volume', 'Direct')
    expunge_time = test_lib.lib_set_expunge_time('volume', 1)
    try:
        cond = res_ops.gen_query_conditions('status', '!=', 'Deleted')
        num = res_ops.query_resource_count(res_ops.VOLUME, cond)

        if num <= thread_threshold:
            volumes = res_ops.query_resource(res_ops.VOLUME, cond)
        else:
            # A page size of 0 (or less) would never advance the offset and
            # loop forever, so clamp it to at least one volume per page.
            limit = max(thread_threshold - 1, 1)
            volumes = []
            for start in range(0, num, limit):
                volumes.extend(res_ops.query_resource_fields(
                    res_ops.VOLUME, cond, None, ['uuid'], start, limit))

        do_delete_volumes(volumes, thread_threshold)
    finally:
        # Put the global settings back even if the deletion failed.
        test_lib.lib_set_delete_policy('volume', delete_policy)
        test_lib.lib_set_expunge_time('volume', expunge_time)

    test_util.test_logger('Volumes destroy Success. Destroy %d Volumes.' % num)
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:28,代码来源:clean_util.py
示例5: delete_all_volumes
def delete_all_volumes(thread_threshold=1000):
    """Delete every volume whose status is not "Deleted".

    Logs in as admin, raises the session timeout / concurrency global
    configs, sets the "volume" delete policy to "Direct" and the expunge
    time to 1, collects the matching volumes (paging the query when there
    are more than ``thread_threshold``) and passes them to
    ``do_delete_volumes``. The values returned by the two setters are handed
    back to them in a ``finally`` clause so a failed deletion does not leave
    the temporary settings in place.

    :param thread_threshold: forwarded to ``do_delete_volumes``; also the
        cut-over point and page-size basis for the paged query.
    """
    # NOTE(review): the admin session returned here is never logged out in
    # this block -- confirm whether the caller is expected to do so.
    acc_ops.login_as_admin()
    con_ops.change_global_config("identity", "session.timeout", "720000")
    con_ops.change_global_config("identity", "session.maxConcurrent", "10000")

    delete_policy = test_lib.lib_set_delete_policy("volume", "Direct")
    expunge_time = test_lib.lib_set_expunge_time("volume", 1)
    try:
        cond = res_ops.gen_query_conditions("status", "!=", "Deleted")
        num = res_ops.query_resource_count(res_ops.VOLUME, cond)

        if num <= thread_threshold:
            volumes = res_ops.query_resource(res_ops.VOLUME, cond)
        else:
            # Guard against a non-positive page size, which would keep the
            # offset at the same place and never terminate.
            limit = max(thread_threshold - 1, 1)
            volumes = []
            start = 0
            while start < num:
                page = res_ops.query_resource_fields(res_ops.VOLUME, cond, None, ["uuid"], start, limit)
                volumes.extend(page)
                start += limit

        do_delete_volumes(volumes, thread_threshold)
    finally:
        test_lib.lib_set_delete_policy("volume", delete_policy)
        test_lib.lib_set_expunge_time("volume", expunge_time)

    test_util.test_logger("Volumes destroy Success. Destroy %d Volumes." % num)
开发者ID:lstfyt,项目名称:zstack-woodpecker,代码行数:27,代码来源:clean_util.py
示例6: test
def test():
    """Create a VM, then destroy and expunge it under the 'Delay' policy.

    The value returned when switching the 'vm' delete policy to 'Delay' is
    stored in the module-level ``delete_policy`` and passed back to
    ``lib_set_delete_policy`` once the VM has been expunged.
    """
    global delete_policy

    delete_policy = test_lib.lib_set_delete_policy('vm', 'Delay')

    test_vm = test_stub.create_vm(vm_name = 'basic-test-vm')
    test_obj_dict.add_vm(test_vm)

    time.sleep(1)
    test_vm.destroy()
    test_vm.expunge()

    test_lib.lib_set_delete_policy('vm', delete_policy)
    test_util.test_pass('Expunge VM Test Success')
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:10,代码来源:test_expunge_vm_chg_del_policy.py
示例7: test
def test():
    """Suite setup: run the setup plan and make deletions 'Direct'.

    Runs ``SetupAction`` with ``test_lib.all_config``, executes
    EXTRA_SUITE_SETUP_SCRIPT if that path exists, deploys the initial
    database and sets the delete policy of 'vm', 'volume' and 'image' to
    'Direct' before reporting success.
    """
    action = setup_actions.SetupAction()
    action.plan = test_lib.all_config
    action.run()

    if os.path.exists(EXTRA_SUITE_SETUP_SCRIPT):
        os.system("bash %s" % EXTRA_SUITE_SETUP_SCRIPT)

    deploy_operations.deploy_initial_database(test_lib.deploy_config)

    for resource_type in ('vm', 'volume', 'image'):
        test_lib.lib_set_delete_policy(resource_type, 'Direct')

    test_util.test_pass('Suite Setup Success')
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:12,代码来源:suite_setup.py
示例8: test
def test():
    """Check backup-storage capacity accounting across image add and delete.

    Creates a 300 MB file with ``dd``, adds it as a raw root-volume template
    to the first backup storage, and verifies that total capacity is
    unchanged while available capacity drops. After deleting the image it
    verifies that total capacity is still unchanged and that available
    capacity is back within 1024000 of (and not above) its starting value.
    The 'image' delete policy is 'Direct' for the duration of the test.
    """
    global delete_policy
    delete_policy = test_lib.lib_set_delete_policy('image', 'Direct')

    # Build the local image file and reconnect the backup storage.
    os.system('dd if=/dev/zero of=%s bs=1M count=300' % test_image)
    time.sleep(1)
    backup_storage = res_ops.query_resource(res_ops.BACKUP_STORAGE)[0]
    img_ops.reconnect_sftp_backup_storage(backup_storage.uuid)
    time.sleep(1)

    image_name = 'test-image-%s' % time.time()
    image_option = test_util.ImageOption()
    image_option.set_name(image_name)
    image_option.set_description('test image which is upload from local filesystem.')
    image_option.set_url('file://%s' % test_image)

    # Baseline capacity, read just before the image is added.
    backup_storage = res_ops.query_resource(res_ops.BACKUP_STORAGE)[0]
    avail_before = backup_storage.availableCapacity
    total_before = backup_storage.totalCapacity

    image_option.set_backup_storage_uuid_list([backup_storage.uuid])
    image_option.set_format('raw')
    image_option.set_mediaType('RootVolumeTemplate')
    image_inv = img_ops.add_root_volume_template(image_option)
    time.sleep(10)

    image = zstack_image_header.ZstackTestImage()
    image.set_creation_option(image_option)
    image.set_image(image_inv)
    test_obj_dict.add_image(image)

    # Capacity after the image was added.
    backup_storage = res_ops.query_resource(res_ops.BACKUP_STORAGE)[0]
    avail_added = backup_storage.availableCapacity
    total_added = backup_storage.totalCapacity

    if total_before != total_added:
        test_util.test_fail('Backup storage total capacity is not same, after adding new image: %s. The previous value: %s, the current value: %s' % (image_inv.uuid, total_before, total_added))

    if avail_before <= avail_added:
        test_util.test_fail('Backup storage available capacity is not correct, after adding new image: %s. The previous value: %s, the current value: %s' % (image_inv.uuid, avail_before, avail_added))

    image.delete()

    # Capacity after the image was deleted.
    backup_storage = res_ops.query_resource(res_ops.BACKUP_STORAGE)[0]
    avail_deleted = backup_storage.availableCapacity
    total_deleted = backup_storage.totalCapacity

    if total_before != total_deleted:
        test_util.test_fail('Backup storage total capacity is not same, after deleting new image: %s. The previous value: %s, the current value: %s' % (image_inv.uuid, total_before, total_deleted))

    if avail_before > (avail_deleted + 1024000) or avail_before < avail_deleted:
        test_util.test_fail('Backup storage available capacity is not correct, after adding and deleting new image: %s. The previous value: %s, the current value: %s' % (image_inv.uuid, avail_before, avail_deleted))

    os.system('rm -f %s' % test_image)
    test_lib.lib_set_delete_policy('image', delete_policy)
    test_util.test_pass('Test backup storage capacity for adding/deleting image pass.')
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:52,代码来源:test_delete_image_bs_capacity.py
示例9: error_cleanup
def error_cleanup():
    """Undo the test's environment changes after a failure.

    Sets the 'vm' and 'volume' delete policies to 'Direct', re-enables the
    primary storage, reconnects the host and the virtual router when their
    UUIDs were recorded in the module globals, and finally runs
    ``test_lib.lib_error_cleanup`` on the tracked test objects.
    """
    global ps_uuid
    global host_uuid
    global vr_uuid
    global test_obj_dict

    test_lib.lib_set_delete_policy('vm', 'Direct')
    test_lib.lib_set_delete_policy('volume', 'Direct')

    # Each UUID is only acted on if it is not None.
    if ps_uuid is not None:
        ps_ops.change_primary_storage_state(ps_uuid, 'enable')
    if host_uuid is not None:
        host_ops.reconnect_host(host_uuid)
    if vr_uuid is not None:
        vm_ops.reconnect_vr(vr_uuid)

    test_lib.lib_error_cleanup(test_obj_dict)
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:14,代码来源:test_disable_ps_expunge_vol.py
示例10: test
def test():
    """Delete and expunge an attached shareable volume while PS is disabled.

    Calls ``skip_test_when_ps_type_not_in_list`` with Ceph and "SharedBlock"
    and skips when no backup storage is in "Connected" status. With the
    'vm' and 'volume' delete policies set to 'Delay', it creates a VLAN VM
    and a shareable virtio-scsi data volume, attaches the volume, disables
    all primary storages, checks that the VM is still reachable on port 22,
    then deletes and expunges the volume. Afterwards the primary storages
    are re-enabled, the host and virtual router are reconnected, the VM is
    destroyed and both delete policies are set to 'Direct'.
    """
    global test_obj_dict
    global ps_uuid
    global host_uuid
    global vr_uuid

    supported_ps_types = [inventory.CEPH_PRIMARY_STORAGE_TYPE, "SharedBlock"]
    test_lib.skip_test_when_ps_type_not_in_list(supported_ps_types)

    test_util.test_dsc('Create test vm and check')
    connected_cond = res_ops.gen_query_conditions("status", '=', "Connected")
    backup_storages = res_ops.query_resource_fields(res_ops.BACKUP_STORAGE, connected_cond, None)
    if not backup_storages:
        test_util.test_skip("not find available backup storage. Skip test")

    test_lib.lib_set_delete_policy('vm', 'Delay')
    test_lib.lib_set_delete_policy('volume', 'Delay')

    l3_1_name = os.environ.get('l3VlanNetworkName1')
    vm = test_stub.create_vlan_vm(l3_name=l3_1_name)

    # Record the virtual router and host so they can be reconnected later.
    l3_1 = test_lib.lib_get_l3_by_name(l3_1_name)
    vr_uuid = test_lib.lib_find_vr_by_l3_uuid(l3_1.uuid)[0].uuid
    host_uuid = test_lib.lib_get_vm_host(vm.get_vm()).uuid

    test_obj_dict.add_vm(vm)
    vm.check()

    disk_offering = test_lib.lib_get_disk_offering_by_name(os.environ.get('rootDiskOfferingName'))
    volume_option = test_util.VolumeOption()
    volume_option.set_disk_offering_uuid(disk_offering.uuid)
    volume_option.set_system_tags(['ephemeral::shareable', 'capability::virtio-scsi'])

    volume = test_stub.create_volume(volume_option)
    test_obj_dict.add_volume(volume)
    volume.check()
    volume.attach(vm)

    # All primary storages are disabled rather than only the VM's own one.
    test_stub.disable_all_pss()
    vm_ip = vm.get_vm().vmNics[0].ip
    if not test_lib.lib_wait_target_up(vm_ip, '22', 90):
        test_util.test_fail('VM is expected to running when PS change to disable state')

    vm.set_state(vm_header.RUNNING)
    vm.check()

    volume.delete()
    volume.check()
    volume.expunge()
    volume.check()

    test_stub.enable_all_pss()
    host_ops.reconnect_host(host_uuid)
    vm_ops.reconnect_vr(vr_uuid)

    vm.destroy()
    test_lib.lib_set_delete_policy('vm', 'Direct')
    test_lib.lib_set_delete_policy('volume', 'Direct')
    test_util.test_pass('Delete volume under PS disable mode Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:60,代码来源:test_disable_ps_expunge_attached_share_vol.py
示例11: test
def test():
    """Check a destroyed "NeverStop" VM is running again after recovery.

    Creates a VM, sets the 'vm' delete policy to 'Delay', destroys the VM,
    sets its HA level to "NeverStop", recovers it, waits 60 seconds and
    then checks it in the RUNNING state. The VM is destroyed again and the
    value returned by the first ``lib_set_delete_policy`` call is passed
    back to it.
    """
    global vm

    vm = test_stub.create_vm()
    vm.check()

    previous_policy = test_lib.lib_set_delete_policy('vm', 'Delay')

    vm.destroy()
    ha_ops.set_vm_instance_ha_level(vm.get_vm().uuid, "NeverStop")
    vm.recover()

    # Fixed wait before the VM is expected to be running again.
    time.sleep(60)
    vm.set_state(vm_header.RUNNING)
    vm.check()

    vm.destroy()
    test_lib.lib_set_delete_policy('vm', previous_policy)
    test_util.test_pass('VM ha never stop auto start with recover Test Success')
开发者ID:wolfhml,项目名称:zstack-woodpecker,代码行数:14,代码来源:test_vm_del_ha_never_stop_recover_auto_start.py
示例12: test
def test():
    """Suite setup: run the plan, deploy the database and apply defaults.

    Runs ``SetupAction`` with ``test_lib.all_config``, executes
    EXTRA_SUITE_SETUP_SCRIPT when it exists, deploys the initial database,
    sets the 'vm', 'volume' and 'image' delete policies to 'Direct', and
    sets the HA self-fencer values to '60' when
    ``lib_get_ha_selffencer_maxattempts`` returns something other than None.
    """
    setup = setup_actions.SetupAction()
    setup.plan = test_lib.all_config
    setup.run()

    if os.path.exists(EXTRA_SUITE_SETUP_SCRIPT):
        os.system("bash %s" % EXTRA_SUITE_SETUP_SCRIPT)

    deploy_operations.deploy_initial_database(test_lib.deploy_config)

    # The returned values are not needed here; nothing restores them later.
    test_lib.lib_set_delete_policy('vm', 'Direct')
    test_lib.lib_set_delete_policy('volume', 'Direct')
    test_lib.lib_set_delete_policy('image', 'Direct')

    # NOTE(review): indentation was lost in the extracted listing; both HA
    # setters are assumed to be guarded by this check -- confirm upstream.
    if test_lib.lib_get_ha_selffencer_maxattempts() is not None:
        test_lib.lib_set_ha_selffencer_maxattempts('60')
        test_lib.lib_set_ha_selffencer_storagechecker_timeout('60')

    test_util.test_pass('Suite Setup Success')
开发者ID:dennis-sun-chao,项目名称:zstack-woodpecker,代码行数:15,代码来源:suite_setup.py
示例13: test
def test():
    """Recover a deleted shareable volume while primary storage is in maintain.

    With the 'vm' and 'volume' delete policies set to 'Delay', creates a
    VLAN VM and a shareable virtio-scsi volume, deletes the volume, puts the
    VM's primary storage into 'maintain', waits for the VM to become
    unreachable on port 22, and recovers the volume. It then re-enables the
    primary storage, reconnects the host, starts every virtual router and
    the VM, deletes the volume again, destroys the VM and sets both delete
    policies to 'Direct'.
    """
    global test_obj_dict
    global ps_uuid
    global host_uuid
    global vr_uuid

    test_util.test_dsc('Create test vm and check')
    test_lib.lib_set_delete_policy('vm', 'Delay')
    test_lib.lib_set_delete_policy('volume', 'Delay')

    l3_1_name = os.environ.get('l3VlanNetworkName1')
    vm = test_stub.create_vlan_vm(l3_name=l3_1_name)

    host_uuid = test_lib.lib_get_vm_host(vm.get_vm()).uuid
    test_obj_dict.add_vm(vm)
    vm.check()

    disk_offering = test_lib.lib_get_disk_offering_by_name(os.environ.get('rootDiskOfferingName'))
    volume_option = test_util.VolumeOption()
    volume_option.set_disk_offering_uuid(disk_offering.uuid)
    volume_option.set_system_tags(['ephemeral::shareable', 'capability::virtio-scsi'])

    volume = test_stub.create_volume(volume_option)
    test_obj_dict.add_volume(volume)
    volume.check()
    volume.delete()
    volume.check()

    ps_uuid = test_lib.lib_get_primary_storage_by_vm(vm.get_vm()).uuid
    ps_ops.change_primary_storage_state(ps_uuid, 'maintain')
    vm_ip = vm.get_vm().vmNics[0].ip
    if not test_lib.lib_wait_target_down(vm_ip, '22', 90):
        test_util.test_fail('VM is expected to stop when PS change to maintain state')

    vm.set_state(vm_header.STOPPED)
    vm.check()
    volume.recover()
    volume.check()

    ps_ops.change_primary_storage_state(ps_uuid, 'enable')
    host_ops.reconnect_host(host_uuid)

    # Every virtual router is started instead of reconnecting a single one.
    for router in test_lib.lib_get_all_vrs():
        vm_ops.start_vm(router.uuid)

    vm.start()
    vm.check()

    volume.delete()
    volume.check()

    vm.destroy()
    test_lib.lib_set_delete_policy('vm', 'Direct')
    test_lib.lib_set_delete_policy('volume', 'Direct')
    test_util.test_pass('Delete volume under PS maintain mode Test Success')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:58,代码来源:test_maintain_ps_recover_share_vol.py
示例14: error_cleanup
def error_cleanup():
    """Clean up images and vCenters after a failed test run.

    Passes ``delete_policy1`` back to the 'image' delete policy, deletes and
    expunges the image when ``img_uuid`` is set, and deletes the first
    vCenter when ``vcenter_uuid1`` is set. When ``vcenter_uuid2`` is set it
    points ZSTACK_BUILT_IN_HTTP_SERVER_IP at ``mevoco2_ip``, applies
    ``delete_policy2`` to the 'image' delete policy and deletes the second
    vCenter.
    """
    global vcenter_uuid1
    global vcenter_uuid2
    global mevoco2_ip
    global img_uuid
    global delete_policy1
    global delete_policy2

    test_lib.lib_set_delete_policy('image', delete_policy1)

    if img_uuid:
        img_ops.delete_image(img_uuid)
        img_ops.expunge_image(img_uuid)

    if vcenter_uuid1:
        vct_ops.delete_vcenter(vcenter_uuid1)

    if vcenter_uuid2:
        # The environment variable is changed before the remaining calls.
        os.environ['ZSTACK_BUILT_IN_HTTP_SERVER_IP'] = mevoco2_ip
        test_lib.lib_set_delete_policy('image', delete_policy2)
        vct_ops.delete_vcenter(vcenter_uuid2)
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:20,代码来源:test_vcenter_sync_image_add_delete.py
示例15: test
def test():
    """Check local-storage capacity is returned after a VM is expunged.

    Picks one Enabled/Connected host and primary storage (skipping the test
    when either is missing), records the host's available local-storage
    capacity, creates a VM on that host and fails if the capacity did not
    change. It then destroys and expunges the VM and fails unless the
    available capacity equals the initial value. The 'vm' delete policy is
    'Delay' during the test.
    """
    global delete_policy
    delete_policy = test_lib.lib_set_delete_policy('vm', 'Delay')

    test_util.test_dsc('Test storage capacity when using expunge vm')
    zone_uuid = res_ops.query_resource(res_ops.ZONE)[0].uuid

    cond = res_ops.gen_query_conditions('state', '=', 'Enabled')
    cond = res_ops.gen_query_conditions('status', '=', 'Connected', cond)

    hosts = res_ops.query_resource_with_num(res_ops.HOST, cond, limit = 1)
    if not hosts:
        test_util.test_skip('No Enabled/Connected host was found, skip test.' )
        return True

    storages = res_ops.query_resource_with_num(res_ops.PRIMARY_STORAGE, cond, limit = 1)
    if not storages:
        test_util.test_skip('No Enabled/Connected primary storage was found, skip test.' )
        return True

    host = hosts[0]
    ps = storages[0]

    # Baseline capacity before the VM exists.
    avail_cap = vol_ops.get_local_storage_capacity(host.uuid, ps.uuid)[0].availableCapacity

    vm = test_stub.create_vm(vm_name = 'basic-test-vm', host_uuid = host.uuid)
    test_obj_dict.add_vm(vm)

    avail_cap1 = vol_ops.get_local_storage_capacity(host.uuid, ps.uuid)[0].availableCapacity
    if avail_cap == avail_cap1:
        test_util.test_fail('PS capacity is same after create vm on host: %s. Capacity before create vm: %s, after create vm: %s ' % (host.uuid, avail_cap, avail_cap1))

    time.sleep(1)
    vm.destroy()
    vm.expunge()

    avail_cap2 = vol_ops.get_local_storage_capacity(host.uuid, ps.uuid)[0].availableCapacity
    if avail_cap != avail_cap2:
        test_util.test_fail('PS capacity is not same after create/expunge vm on host: %s. Capacity before create vm: %s, after expunge vm: %s ' % (host.uuid, avail_cap, avail_cap2))

    test_lib.lib_set_delete_policy('vm', delete_policy)
    test_util.test_pass('Expunge VM Test Success')
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:38,代码来源:test_expunge_vm_ps_capacity_by_get.py
示例16: test
def test():
    """Check local-storage capacity after expunging a VM and a data volume.

    Picks one Enabled/Connected host and primary storage (skipping the test
    when either is missing), records the host's available local-storage
    capacity, creates a VM plus a 10 MiB data volume attached to it, then
    destroys/expunges the VM and deletes/expunges the volume. The test fails
    unless the available capacity queried afterwards equals the initial
    value. The 'vm' and 'volume' delete policies are 'Delay' during the
    test.
    """
    global delete_policy1
    global delete_policy2
    delete_policy1 = test_lib.lib_set_delete_policy('vm', 'Delay')
    delete_policy2 = test_lib.lib_set_delete_policy('volume', 'Delay')

    test_util.test_dsc('Test storage capacity when using expunge vm')
    zone_uuid = res_ops.query_resource(res_ops.ZONE)[0].uuid

    cond = res_ops.gen_query_conditions('state', '=', 'Enabled')
    cond = res_ops.gen_query_conditions('status', '=', 'Connected', cond)

    host = res_ops.query_resource_with_num(res_ops.HOST, cond, limit = 1)
    if not host:
        test_util.test_skip('No Enabled/Connected host was found, skip test.' )
        return True

    ps = res_ops.query_resource_with_num(res_ops.PRIMARY_STORAGE, cond, limit = 1)
    if not ps:
        test_util.test_skip('No Enabled/Connected primary storage was found, skip test.' )
        return True

    host = host[0]
    ps = ps[0]

    # Baseline capacity before anything is created.
    host_res = vol_ops.get_local_storage_capacity(host.uuid, ps.uuid)[0]
    avail_cap = host_res.availableCapacity

    vm = test_stub.create_vm(vm_name = 'basic-test-vm', host_uuid = host.uuid)
    test_obj_dict.add_vm(vm)

    data_volume_size = 1024 * 1024 * 10
    disk_offering_option = test_util.DiskOfferingOption()
    disk_offering_option.set_name('test-expunge-data-volume')
    disk_offering_option.set_diskSize(data_volume_size)
    data_volume_offering = vol_ops.create_volume_offering(disk_offering_option)
    test_obj_dict.add_disk_offering(data_volume_offering)

    volume_creation_option = test_util.VolumeOption()
    volume_creation_option.set_disk_offering_uuid(data_volume_offering.uuid)
    volume_creation_option.set_name('volume-1')
    volume = test_stub.create_volume(volume_creation_option)
    test_obj_dict.add_volume(volume)
    volume.attach(vm)

    time.sleep(1)
    vm.destroy()
    vm.expunge()
    test_obj_dict.rm_vm(vm)

    volume.delete()
    volume.expunge()
    test_obj_dict.rm_volume(volume)

    # Compare against the freshly queried capacity. Reading the stale
    # pre-creation object here would compare the baseline with itself and
    # the check could never fail.
    host_res2 = vol_ops.get_local_storage_capacity(host.uuid, ps.uuid)[0]
    avail_cap2 = host_res2.availableCapacity
    if avail_cap != avail_cap2:
        test_util.test_fail('PS capacity is not same after create/expunge vm/volume on host: %s. Capacity before create vm: %s, after expunge vm: %s ' % (host.uuid, avail_cap, avail_cap2))

    test_lib.lib_set_delete_policy('vm', delete_policy1)
    test_lib.lib_set_delete_policy('volume', delete_policy2)
    test_lib.lib_robot_cleanup(test_obj_dict)
    test_util.test_pass('Expunge VM Test Success')
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:57,代码来源:test_expunge_vm_volume_ps_capacity.py
示例17: test
def test():
    """Destroy and expunge a VM while all primary storages are disabled.

    With the 'vm' and 'volume' delete policies set to 'Delay', creates a
    VLAN VM, records its virtual router and host, disables all primary
    storages and checks that the VM is still reachable on port 22. The VM
    is then destroyed and expunged (checked after each step), the primary
    storages are re-enabled, the host and virtual router are reconnected,
    and both delete policies are set to 'Direct'.
    """
    global test_obj_dict
    global ps_uuid
    global host_uuid
    global vr_uuid

    test_util.test_dsc('Create test vm and check')
    test_lib.lib_set_delete_policy('vm', 'Delay')
    test_lib.lib_set_delete_policy('volume', 'Delay')

    l3_1_name = os.environ.get('l3VlanNetworkName1')
    vm = test_stub.create_vlan_vm(l3_name=l3_1_name)

    # The L3 network is looked up once; it is only needed to find the
    # virtual router.
    l3_1 = test_lib.lib_get_l3_by_name(l3_1_name)
    vr = test_lib.lib_find_vr_by_l3_uuid(l3_1.uuid)[0]
    vr_uuid = vr.uuid

    host = test_lib.lib_get_vm_host(vm.get_vm())
    host_uuid = host.uuid
    test_obj_dict.add_vm(vm)
    vm.check()

    # All primary storages are disabled rather than only the VM's own one.
    test_stub.disable_all_pss()
    if not test_lib.lib_wait_target_up(vm.get_vm().vmNics[0].ip, '22', 90):
        test_util.test_fail('VM is expected to running when PS change to disable state')

    vm.set_state(vm_header.RUNNING)
    vm.check()

    vm.destroy()
    vm.check()
    vm.expunge()
    vm.check()

    test_stub.enable_all_pss()
    host_ops.reconnect_host(host_uuid)
    vm_ops.reconnect_vr(vr_uuid)

    test_lib.lib_set_delete_policy('vm', 'Direct')
    test_lib.lib_set_delete_policy('volume', 'Direct')
    test_util.test_pass('PS disable mode Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:42,代码来源:test_disable_ps_expunge_vm.py
示例18: error_cleanup
def error_cleanup():
    """Clean up after a failed run of the image capacity test.

    Runs ``lib_error_cleanup`` on the tracked test objects, removes the
    local test image file with ``rm -f`` and passes the module-level
    ``delete_policy`` back to the 'image' delete policy.
    """
    test_lib.lib_error_cleanup(test_obj_dict)

    remove_image_cmd = 'rm -f %s' % test_image
    os.system(remove_image_cmd)

    test_lib.lib_set_delete_policy('image', delete_policy)
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:4,代码来源:test_delete_image_bs_capacity.py
示例19: test
def test():
    """Recover a deleted volume while all primary storages are in maintain.

    With the 'vm' and 'volume' delete policies set to 'Delay', creates a
    VLAN VM and a data volume, deletes the volume, puts all primary storages
    into maintain, waits for the VM to become unreachable on port 22 and
    recovers the volume. It then re-enables the primary storages, reconnects
    the host, starts or waits for every virtual router, starts the VM,
    deletes the volume again, destroys the VM and sets both delete policies
    to 'Direct'.
    """
    global test_obj_dict
    global ps_uuid
    global host_uuid
    global vr_uuid

    test_util.test_dsc('Create test vm and check')
    test_lib.lib_set_delete_policy('vm', 'Delay')
    test_lib.lib_set_delete_policy('volume', 'Delay')

    l3_1_name = os.environ.get('l3VlanNetworkName1')
    vm = test_stub.create_vlan_vm(l3_name=l3_1_name)

    host_uuid = test_lib.lib_get_vm_host(vm.get_vm()).uuid
    test_obj_dict.add_vm(vm)
    vm.check()

    disk_offering = test_lib.lib_get_disk_offering_by_name(os.environ.get('rootDiskOfferingName'))
    volume_option = test_util.VolumeOption()
    volume_option.set_disk_offering_uuid(disk_offering.uuid)

    volume = test_stub.create_volume(volume_option)
    test_obj_dict.add_volume(volume)
    volume.check()
    volume.delete()
    volume.check()

    # All primary storages are switched rather than only the VM's own one.
    test_stub.maintain_all_pss()
    vm_ip = vm.get_vm().vmNics[0].ip
    if not test_lib.lib_wait_target_down(vm_ip, '22', 90):
        test_util.test_fail('VM is expected to stop when PS change to maintain state')

    vm.set_state(vm_header.STOPPED)
    vm.check()
    volume.recover()
    volume.check()

    test_stub.enable_all_pss()
    host_ops.reconnect_host(host_uuid)
    test_stub.ensure_pss_connected()

    routers = test_lib.lib_get_all_vrs()
    time.sleep(5)
    for router in routers:
        router_cond = res_ops.gen_query_conditions('uuid', '=', router.uuid)
        router_inv = res_ops.query_resource(res_ops.VM_INSTANCE, router_cond)[0]
        if router_inv.state == 'Stopped':
            vm_ops.start_vm(router.uuid)
        else:
            test_lib.lib_wait_target_up(router_inv.vmNics[0].ip, '22', 360)
            # NOTE(review): indentation was lost in the extracted listing;
            # this polling loop is assumed to belong to the else branch --
            # confirm against the upstream file.
            # Poll up to 100 times, 3 seconds apart, until it is 'Running'.
            for _ in xrange(100):
                current_state = res_ops.query_resource(res_ops.VM_INSTANCE, router_cond)[0].state
                if current_state == 'Running':
                    break
                time.sleep(3)

    vm.start()
    vm.check()

    volume.delete()
    volume.check()

    vm.destroy()
    test_lib.lib_set_delete_policy('vm', 'Direct')
    test_lib.lib_set_delete_policy('volume', 'Direct')
    test_util.test_pass('Delete volume under PS maintain mode Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:73,代码来源:test_maintain_ps_recover_vol.py
示例20: error_cleanup
def error_cleanup():
    """Clean up after a failed run of the expunge-VM capacity test.

    Runs ``lib_error_cleanup`` on the tracked test objects and then passes
    the module-level ``delete_policy`` back to the 'vm' delete policy.
    """
    tracked_objects = test_obj_dict
    test_lib.lib_error_cleanup(tracked_objects)

    saved_policy = delete_policy
    test_lib.lib_set_delete_policy('vm', saved_policy)
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:3,代码来源:test_expunge_vm_ps_capacity_by_query.py
注:本文中的zstackwoodpecker.test_lib.lib_set_delete_policy函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论