本文整理汇总了Python中utils.log.logger.warning函数的典型用法代码示例。如果您正苦于以下问题:Python warning函数的具体用法?Python warning怎么用?Python warning使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了warning函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: cleanup_vm
def cleanup_vm(vm_name, provider_key, provider_mgmt):
try:
logger.info('Cleaning up VM {} on provider {}'.format(vm_name, provider_key))
provider_mgmt.delete_vm(vm_name)
except Exception as e:
logger.warning('Failed to clean up VM {} on provider {}: {}'.format(vm_name,
provider_key, str(e)))
开发者ID:petrblaho,项目名称:cfme_tests,代码行数:7,代码来源:provisioning.py
示例2: run
def run(self):
global workercinfo
global workerinfo
cnt = 0
while not self.thread_stop:
self.collect_net_stats()
containers = self.list_container()
countR = 0
conlist = []
for container in containers:
# collect data of each container
if not container == '':
conlist.append(container)
if not container in workercinfo.keys():
workercinfo[container] = {}
try:
success= self.collect_containerinfo(container)
if(success):
countR += 1
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
containers_num = len(containers)-1
concnt = {}
concnt['total'] = containers_num
concnt['running'] = countR
workerinfo['containers'] = concnt
time.sleep(self.interval)
if cnt == 0:
# update containers list on the worker each 5 times
workerinfo['containerslist'] = conlist
cnt = (cnt+1)%5
if self.test:
break
return
开发者ID:FirmlyReality,项目名称:docklet,代码行数:35,代码来源:monitor.py
示例3: template
def template(provider, provisioning, dialog_name):
template_type = provisioning['stack_provisioning']['template_type']
if provider.type == 'azure':
template_name = 'azure-single-vm-from-user-image'
else:
template_name = fauxfactory.gen_alphanumeric()
template = OrchestrationTemplate(template_type=template_type,
template_name=template_name)
if provider.type == "ec2":
method = AWS_TEMPLATE.replace('CloudFormation', random_desc())
elif provider.type == "openstack":
method = HEAT_TEMPLATE.replace('Simple', random_desc())
template.create(method)
if provider.type != "azure":
template.create_service_dialog_from_template(dialog_name, template.template_name)
yield template
try:
template.delete()
except CandidateNotFound as ex:
logger.warning('Exception deleting template fixture, continuing: {}'.format(ex.message))
pass
开发者ID:ManageIQ,项目名称:integration_tests,代码行数:27,代码来源:test_provision_stack.py
示例4: _cleanup_templates
def _cleanup_templates():
try:
stack_data['vm_name'].delete_from_provider()
except Exception as ex:
logger.warning('Exception while checking/deleting stack, continuing: {}'
.format(ex.message))
pass
开发者ID:ManageIQ,项目名称:integration_tests,代码行数:7,代码来源:test_provision_stack.py
示例5: test_suspend
def test_suspend(
self,
provider,
vm_name,
verify_vm_running,
mgmt_sys_api_clients,
register_event,
load_vm_details):
""" Test suspend operation from a vm details page. Verify vm
transitions to suspended. """
vm_details = load_vm_details
vm_details.wait_for_vm_state_change('on', 10)
state_chg_time = vm_details.last_pwr_state_change
register_event(get_sys_type(provider), "vm", vm_name, ["vm_suspend_req", "vm_suspend"])
vm_details.power_button.suspend()
try:
vm_details.wait_for_vm_state_change('suspended', 10)
except TimedOutError:
logger.warning('working around bz977489 by clicking the refresh button')
vm_details.config_button.refresh_relationships()
vm_details.wait_for_vm_state_change('suspended', 5)
Assert.equal(vm_details.power_state, 'suspended',
"power state incorrect")
Assert.not_equal(vm_details.last_pwr_state_change, state_chg_time,
"last state chg time failed to update")
Assert.true(mgmt_sys_api_clients[provider].is_vm_suspended(vm_name),
"vm not suspended")
开发者ID:sshveta,项目名称:cfme_tests,代码行数:27,代码来源:test_vm_power_control.py
示例6: test_start_from_suspend
def test_start_from_suspend(
self, testing_vm, verify_vm_suspended, soft_assert):
"""Tests start from suspend
Metadata:
test_flag: power_control, provision
"""
try:
testing_vm.provider.refresh_provider_relationships()
testing_vm.wait_for_vm_state_change(
desired_state=testing_vm.STATE_SUSPENDED, timeout=450, from_details=True)
except TimedOutError:
if testing_vm.provider.one_of(RHEVMProvider):
logger.warning('working around bz1174858, ignoring timeout')
else:
raise
last_boot_time = testing_vm.get_detail(properties=("Power Management", "Last Boot Time"))
testing_vm.power_control_from_cfme(option=testing_vm.POWER_ON, cancel=False,
from_details=True)
flash.assert_message_contain("Start initiated")
if_scvmm_refresh_provider(testing_vm.provider)
testing_vm.wait_for_vm_state_change(
desired_state=testing_vm.STATE_ON, timeout=720, from_details=True)
wait_for_last_boot_timestamp_refresh(testing_vm, last_boot_time, timeout=600)
soft_assert(
testing_vm.provider.mgmt.is_vm_running(testing_vm.name), "vm not running")
开发者ID:dajohnso,项目名称:cfme_tests,代码行数:26,代码来源:test_vm_power_control.py
示例7: checkskip
def checkskip(metafunc, argvalues):
"""Helper function to check if parametrizing yielded results
If argvalues is empty, the test being represented by metafunc will be skipped in collection.
Args:
metafunc: metafunc objects from pytest_generate_tests
argvalues: argvalues list for use in metafunc.parametrize
Returns:
True if this test should be skipped due to empty argvalues
"""
if not argvalues:
# module and class are optional, but function isn't
modname = getattr(metafunc.module, '__name__', None)
classname = getattr(metafunc.cls, '__name__', None)
funcname = metafunc.function.__name__
test_name = '.'.join(filter(None, (modname, classname, funcname)))
logger.warning('Parametrization for %s yielded no values, skipping' %
test_name)
# Raising pytest.skip in collection halts future fixture evaluation,
# and preemptively filters this test out of the test results
pytest.skip(msg="Parametrize yielded no values")(metafunc.function)
return True
开发者ID:sshveta,项目名称:cfme_tests,代码行数:26,代码来源:testgen.py
示例8: get_search_filter_spec
def get_search_filter_spec(self, *args, **kwargs):
# A datastore traversal spec is missing from this method in psphere.
# psav has opened a PR to add it, but until it gets merged we'll need to come behind
# psphere and add it in just like his PR does
# https://github.com/jkinred/psphere/pull/18/files
pfs = super(_PsphereClient, self).get_search_filter_spec(*args, **kwargs)
select_sets = pfs.objectSet[0].selectSet
missing_ss = 'datacenter_datastore_traversal_spec'
ss_names = [ss.name for ss in select_sets]
if missing_ss not in ss_names:
logger.trace('Injecting %s into psphere search filter spec', missing_ss)
# pull out the folder traversal spec traversal specs
fts_ts = pfs.objectSet[0].selectSet[0]
# and get the select set from the traversal spec
fts_ss = fts_ts.selectSet[0]
# add ds selection spec to folder traversal spec
dsss = self.create('SelectionSpec', name=missing_ss)
fts_ts.selectSet.append(dsss)
# add ds traversal spec to search filter object set select spec
dsts = self.create('TraversalSpec')
dsts.name = 'datacenter_datastore_traversal_spec'
dsts.type = 'Datacenter'
dsts.path = 'datastoreFolder'
dsts.selectSet = [fts_ss]
select_sets.append(dsts)
else:
logger.warning('%s already in psphere search filer spec, not adding it', missing_ss)
return pfs
开发者ID:richardfontana,项目名称:cfme_tests,代码行数:32,代码来源:virtualcenter.py
示例9: test_contents
def test_contents(appliance, soft_assert):
"""Test title of each document."""
view = navigate_to(appliance.server, 'Documentation')
cur_ver = appliance.version
for doc_type, title in doc_titles.items():
doc_widget = getattr(view.links, doc_type, None)
if not doc_widget:
logger.warning('Skipping contents check for document: "{}: {}", no widget to read'
.format(doc_type, title))
href = view.browser.get_attribute(attr='href',
locator=doc_widget.link.locator)
data = requests.get(href, verify=False)
pdf_titlepage_text_low = pdf_get_text(StringIO(data.content), [0]).lower()
# don't include the word 'guide'
expected = [title]
if cur_ver == version.LATEST:
expected.append('manageiq')
else:
expected.append('cloudforms')
maj_min = '{}.{}'.format(cur_ver.version[0], cur_ver.version[1])
expected.append(version.get_product_version(maj_min))
for exp_str in expected:
soft_assert(exp_str in pdf_titlepage_text_low, "{} not in {}"
.format(exp_str, pdf_titlepage_text_low))
开发者ID:dajohnso,项目名称:cfme_tests,代码行数:26,代码来源:test_docs.py
示例10: test_suspend
def test_suspend(self, provider_init, test_vm, verify_vm_running, soft_assert, register_event):
test_vm.wait_for_vm_state_change(
desired_state=Vm.STATE_ON, timeout=720, from_details=True)
last_boot_time = test_vm.get_detail(properties=("Power Management", "Last Boot Time"))
register_event(
test_vm.provider_crud.get_yaml_data()['type'],
"vm", test_vm.name, ["vm_suspend_req", "vm_suspend"])
test_vm.power_control_from_cfme(option=Vm.SUSPEND, cancel=False, from_details=True)
flash.assert_message_contain("Suspend initiated")
pytest.sel.force_navigate(
'infrastructure_provider', context={'provider': test_vm.provider_crud})
try:
test_vm.wait_for_vm_state_change(
desired_state='suspended', timeout=600, from_details=True)
except TimedOutError:
logger.warning('working around bz977489 by clicking the refresh button')
test_vm.refresh_relationships()
test_vm.wait_for_vm_state_change(
desired_state=Vm.STATE_SUSPENDED, timeout=300, from_details=True)
soft_assert(
test_vm.provider_crud.get_mgmt_system().is_vm_suspended(
test_vm.name), "vm not suspended")
# BUG - https://bugzilla.redhat.com/show_bug.cgi?id=1101604
if not isinstance(test_vm.provider_crud, RHEVMProvider):
new_last_boot_time = test_vm.get_detail(
properties=("Power Management", "Last Boot Time"))
soft_assert(new_last_boot_time == last_boot_time,
"ui: " + new_last_boot_time + " should == orig: " + last_boot_time)
开发者ID:slouderm,项目名称:cfme_tests,代码行数:28,代码来源:test_vm_power_control.py
示例11: dialog
def dialog():
dialog_name = "dialog_" + fauxfactory.gen_alphanumeric()
element_data = dict(
ele_label="ele_" + fauxfactory.gen_alphanumeric(),
ele_name=fauxfactory.gen_alphanumeric(),
ele_desc="my ele desc",
choose_type="Text Box",
default_text_box="default value"
)
service_dialog = ServiceDialog(label=dialog_name, description="my dialog",
submit=True, cancel=True,
tab_label="tab_" + fauxfactory.gen_alphanumeric(),
tab_desc="my tab desc",
box_label="box_" + fauxfactory.gen_alphanumeric(),
box_desc="my box desc")
service_dialog.create(element_data)
flash.assert_success_message('Dialog "{}" was added'.format(dialog_name))
yield service_dialog
# fixture cleanup
try:
service_dialog.delete()
except NoSuchElementException or TimeoutException:
logger.warning('test_catalog_item: dialog yield fixture cleanup, dialog "{}" not '
'found'.format(dialog_name))
开发者ID:RonnyPfannschmidt,项目名称:cfme_tests,代码行数:27,代码来源:test_catalog_item.py
示例12: remove_label
def remove_label(self, name, silent_failure=False):
"""Remove label by name.
:var: name: name of label
:var: silent_failure: whether to raise an error or not in case of failure.
Returns:
:py:type:`bool` pass or fail
Raises:
:py:class:`LabelNotFoundException`.
"""
json_content = self._get_json()
if name not in json_content['metadata'].get('labels', {}).keys():
failure_signature = 'Could not find label "{}", labels: {}' \
.format(name, json_content['metadata']['labels'])
if silent_failure:
logger.warning(failure_signature)
return False
else:
raise exceptions.LabelNotFoundException(failure_signature)
self.provider.cli.run_command(
'oc label {} {} {}-'.format(
self._cli_resource_type,
('sha256:{}'.format(self.sha256)
if (self.__class__.__name__ == 'Image') else self.name),
name
)
)
return True
开发者ID:dajohnso,项目名称:cfme_tests,代码行数:28,代码来源:__init__.py
示例13: collect_diskinfo
def collect_diskinfo(self):
global workercinfo
parts = psutil.disk_partitions()
setval = []
devices = {}
for part in parts:
# deal with each partition
if not part.device in devices:
devices[part.device] = 1
diskval = {}
diskval['device'] = part.device
diskval['mountpoint'] = part.mountpoint
try:
usage = psutil.disk_usage(part.mountpoint)
diskval['total'] = usage.total
diskval['used'] = usage.used
diskval['free'] = usage.free
diskval['percent'] = usage.percent
if(part.mountpoint.startswith('/opt/docklet/local/volume')):
# the mountpoint indicate that the data is the disk used information of a container
names = re.split('/',part.mountpoint)
container = names[len(names)-1]
if not container in workercinfo.keys():
workercinfo[container] = {}
workercinfo[container]['disk_use'] = diskval
setval.append(diskval) # make a list
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
#print(output)
#print(diskparts)
return setval
开发者ID:FirmlyReality,项目名称:docklet,代码行数:32,代码来源:monitor.py
示例14: cleanup_host
def cleanup_host():
try:
logger.info('Cleaning up host %s on provider %s' % (prov_host_name, provider_crud.key))
mgmt_system = provider_crud.get_mgmt_system()
host_list = mgmt_system.list_host()
if host_provisioning['ip_addr'] in host_list:
wait_for(mgmt_system.is_host_connected, [host_provisioning['ip_addr']])
mgmt_system.remove_host_from_cluster(host_provisioning['ip_addr'])
ipmi = test_host.get_ipmi()
ipmi.power_off()
# During host provisioning,the host name gets changed from what's specified at creation
# time.If host provisioning succeeds,the original name is reverted to,otherwise the
# changed names are retained upon failure
renamed_host_name1 = "{} ({})".format('IPMI', host_provisioning['ipmi_address'])
renamed_host_name2 = "{} ({})".format('VMware ESXi', host_provisioning['ip_addr'])
host_list_ui = host.get_all_hosts()
if host_provisioning['hostname'] in host_list_ui:
test_host.delete(cancel=False)
host.wait_for_host_delete(test_host)
elif renamed_host_name1 in host_list_ui:
host_renamed_obj1 = host.Host(name=renamed_host_name1)
host_renamed_obj1.delete(cancel=False)
host.wait_for_host_delete(host_renamed_obj1)
elif renamed_host_name2 in host_list_ui:
host_renamed_obj2 = host.Host(name=renamed_host_name2)
host_renamed_obj2.delete(cancel=False)
host.wait_for_host_delete(host_renamed_obj2)
except:
# The mgmt_sys classes raise Exception :\
logger.warning('Failed to clean up host %s on provider %s' %
(prov_host_name, provider_crud.key))
开发者ID:seandst,项目名称:cfme_tests,代码行数:34,代码来源:test_host_provisioning.py
示例15: instance
def instance(setup_providers, provider_key, provider_mgmt, provisioning, provider_crud):
# tries to delete the VM that gets created here
vm_name = 'test_image_prov_%s' % generate_random_string()
image = provisioning['image']['name']
note = ('Testing provisioning from image %s to vm %s on provider %s' %
(image, vm_name, provider_crud.key))
instance = prov.Instance(
name=vm_name,
email='[email protected]',
first_name='Image',
last_name='Provisioner',
notes=note,
instance_type=provisioning['instance_type'],
availability_zone=provisioning['availability_zone'],
security_groups=[provisioning['security_group']],
provider_mgmt=provider_mgmt,
provider=provider_crud,
guest_keypair="shared",
template=prov.Template(image))
instance.create()
yield instance
try:
logger.info('Cleaning up VM %s on provider %s' % (vm_name, provider_key))
provider_mgmt.delete_vm(vm_name)
except:
# The mgmt_sys classes raise Exception :\
logger.warning('Failed to clean up VM %s on provider %s' % (vm_name, provider_key))
开发者ID:jkrocil,项目名称:cfme_tests,代码行数:28,代码来源:test_provisioning.py
示例16: cleanup_vm
def cleanup_vm(vm_name, provider):
try:
logger.info('Cleaning up VM %s on provider %s', vm_name, provider.key)
provider.mgmt.delete_vm(vm_name)
except:
# The mgmt_sys classes raise Exception :\
logger.warning('Failed to clean up VM %s on provider %s', vm_name, provider.key)
开发者ID:rananda,项目名称:cfme_tests,代码行数:7,代码来源:provider.py
示例17: cleanup_vm
def cleanup_vm(vm_name, provider_key, provider_mgmt):
try:
logger.info('Cleaning up VM %s on provider %s' % (vm_name, provider_key))
provider_mgmt.delete_vm(vm_name + "_0001")
except:
# The mgmt_sys classes raise Exception :\
logger.warning('Failed to clean up VM %s on provider %s' % (vm_name, provider_key))
开发者ID:seandst,项目名称:cfme_tests,代码行数:7,代码来源:test_iso_service_catalogs.py
示例18: test_suspend
def test_suspend(self, test_vm, verify_vm_running, soft_assert, register_event, bug):
"""Tests suspend
Metadata:
test_flag: power_control, provision
"""
test_vm.wait_for_vm_state_change(
desired_state=test_vm.STATE_ON, timeout=720, from_details=True)
last_boot_time = test_vm.get_detail(properties=("Power Management", "Last Boot Time"))
register_event(
test_vm.provider.type,
"vm", test_vm.name, ["vm_suspend_req", "vm_suspend"])
test_vm.power_control_from_cfme(option=test_vm.SUSPEND, cancel=False, from_details=True)
flash.assert_message_contain("Suspend initiated")
pytest.sel.force_navigate(
'infrastructure_provider', context={'provider': test_vm.provider})
if_scvmm_refresh_provider(test_vm.provider)
try:
test_vm.wait_for_vm_state_change(
desired_state=test_vm.STATE_SUSPENDED, timeout=450, from_details=True)
except TimedOutError as e:
if test_vm.provider.type == "rhevm":
logger.warning('working around bz1174858, ignoring timeout')
else:
raise e
soft_assert(
test_vm.provider.mgmt.is_vm_suspended(
test_vm.name), "vm not suspended")
# BUG - https://bugzilla.redhat.com/show_bug.cgi?id=1101604
if test_vm.provider.type != "rhevm":
new_last_boot_time = test_vm.get_detail(
properties=("Power Management", "Last Boot Time"))
soft_assert(new_last_boot_time == last_boot_time,
"ui: {} should == orig: {}".format(new_last_boot_time, last_boot_time))
开发者ID:MattLombana,项目名称:cfme_tests,代码行数:34,代码来源:test_vm_power_control.py
示例19: test_labels_remove
def test_labels_remove(provider, soft_assert, random_labels):
# Removing the labels
for instance, label_name, label_value, results_status, _ in random_labels:
if results_status:
instance.remove_label(label_name)
else:
logger.warning('Cannot remove label ({} = {}) for {} {}. (failed to add it previously)'
.format(label_name, label_value,
instance.__class__.__name__, instance.name))
provider.refresh_provider_relationships()
# Verify that the labels removed successfully from UI:
for instance, label_name, label_value, results_status, _ in random_labels:
if results_status:
soft_assert(
wait_for(
lambda: not check_labels_in_ui(instance, label_name, label_value),
num_sec=120, delay=10,
fail_func=instance.summary.reload,
message='Verifying label ({} = {}) for {} {} removed'
.format(label_name, label_value,
instance.__class__.__name__, instance.name),
silent_failure=True),
'Label ({} = {}) for {} {} found in UI (but should be removed).'
.format(label_name, label_value, instance.__class__.__name__, instance.name)
)
开发者ID:dajohnso,项目名称:cfme_tests,代码行数:26,代码来源:test_labels.py
示例20: patch_file
def patch_file(self, local_path, remote_path, md5=None):
""" Patches a single file on the appliance
Args:
local_path: Path to patch (diff) file
remote_path: Path to file to be patched (on the appliance)
md5: MD5 checksum of the original file to check if it has changed
Returns:
True if changes were applied, False if patching was not necessary
Note:
If there is a .bak file present and the file-to-be-patched was
not patched by the current patch-file, it will be used to restore it first.
Recompiling assets and restarting appropriate services might be required.
"""
logger.info('Patching {remote_path}'.format(remote_path=remote_path))
# Upload diff to the appliance
diff_remote_path = os_path.join('/tmp/', os_path.basename(remote_path))
self.put_file(local_path, diff_remote_path)
# If already patched with current file, exit
logger.info('Checking if already patched')
rc, out = self.run_command(
'patch {} {} -f --dry-run -R'.format(remote_path, diff_remote_path))
if rc == 0:
return False
# If we have a .bak file available, it means the file is already patched
# by some older patch; in that case, replace the file-to-be-patched by the .bak first
logger.info("Checking if {}.bak is available".format(remote_path))
rc, out = self.run_command('test -e {}.bak'.format(remote_path))
if rc == 0:
logger.info(
"{}.bak found; using it to replace {}".format(remote_path, remote_path))
rc, out = self.run_command('mv {}.bak {}'.format(remote_path, remote_path))
if rc != 0:
raise Exception(
"Unable to replace {} with {}.bak".format(remote_path, remote_path))
else:
logger.info("{}.bak not found".format(remote_path))
# If not patched and there's MD5 checksum available, check it
if md5:
logger.info("MD5 sum check in progress for {remote_path}".format(
remote_path=remote_path))
rc, out = self.run_command('md5sum -c - <<< "{} {}"'.format(md5, remote_path))
if rc == 0:
logger.info('MD5 sum check result: file not changed')
else:
logger.warning('MD5 sum check result: file has been changed!')
# Create the backup and patch
rc, out = self.run_command(
'patch {} {} -f -b -z .bak'.format(remote_path, diff_remote_path))
if rc != 0:
raise Exception("Unable to patch file {}: {}".format(remote_path, out))
return True
开发者ID:rananda,项目名称:cfme_tests,代码行数:59,代码来源:ssh.py
注:本文中的utils.log.logger.warning函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论