本文整理汇总了Python中salt.ext.six.moves.range函数的典型用法代码示例。如果您正苦于以下问题:Python range函数的具体用法?Python range怎么用?Python range使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了range函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _xfs_inventory_output
def _xfs_inventory_output(out):
'''
Transform xfsrestore inventory data output to a Python dict source and evaluate it.
'''
data = []
out = [line for line in out.split("\n") if line.strip()]
# No inventory yet
if len(out) == 1 and 'restore status' in out[0].lower():
return {'restore_status': out[0]}
ident = 0
data.append("{")
for line in out[:-1]:
if len([elm for elm in line.strip().split(":") if elm]) == 1:
n_ident = len(re.sub("[^\t]", "", line))
if ident > n_ident:
for step in range(ident):
data.append("},")
ident = n_ident
data.append(_xr_to_keyset(line))
data.append("{")
else:
data.append(_xr_to_keyset(line))
for step in range(ident + 1):
data.append("},")
data.append("},")
# We are evaling into a python dict, a json load
# would be safer
data = eval('\n'.join(data))[0] # pylint: disable=W0123
data['restore_status'] = out[-1]
return data
开发者ID:DavideyLee,项目名称:salt,代码行数:34,代码来源:xfs.py
示例2: hex2ip
def hex2ip(hex_ip, invert=False):
'''
Convert a hex string to an ip, if a failure occurs the original hex is
returned. If 'invert=True' assume that ip from /proc/net/<proto>
'''
if len(hex_ip) == 32: # ipv6
ip = []
for i in range(0, 32, 8):
ip_part = hex_ip[i:i + 8]
ip_part = [ip_part[x:x + 2] for x in range(0, 8, 2)]
if invert:
ip.append("{0[3]}{0[2]}:{0[1]}{0[0]}".format(ip_part))
else:
ip.append("{0[0]}{0[1]}:{0[2]}{0[3]}".format(ip_part))
try:
return ipaddress.IPv6Address(":".join(ip)).compressed
except ipaddress.AddressValueError as ex:
log.error('hex2ip - ipv6 address error: {0}'.format(ex))
return hex_ip
try:
hip = int(hex_ip, 16)
except ValueError:
return hex_ip
if invert:
return '{3}.{2}.{1}.{0}'.format(hip >> 24 & 255,
hip >> 16 & 255,
hip >> 8 & 255,
hip & 255)
return '{0}.{1}.{2}.{3}'.format(hip >> 24 & 255,
hip >> 16 & 255,
hip >> 8 & 255,
hip & 255)
开发者ID:bryson,项目名称:salt,代码行数:33,代码来源:network.py
示例3: plugins
def plugins():
'''
Fetches the plugins added to the database server
CLI Example::
salt '*' drizzle.plugins
'''
# Initializing the required variables
ret_val = {}
count = 1
drizzle_db = _connect()
cursor = drizzle_db.cursor()
# Fetching the plugins
query = 'SELECT PLUGIN_NAME FROM DATA_DICTIONARY.PLUGINS WHERE IS_ACTIVE LIKE "YES"'
cursor.execute(query)
for iter, count in zip(list(range(cursor.rowcount)), list(range(1, cursor.rowcount+1))):
table = cursor.fetchone()
ret_val[count] = table[0]
cursor.close()
drizzle_db.close()
return ret_val
开发者ID:beornf,项目名称:salt-contrib,代码行数:25,代码来源:drizzle.py
示例4: tables
def tables(schema):
'''
Displays all the tables that are
present in the given schema
CLI Example::
salt '*' drizzle.tables schema_name
'''
# Initializing the required variables
ret_val = {}
drizzle_db = _connect()
cursor = drizzle_db.cursor()
# Fetching tables
try:
cursor.execute('SHOW TABLES IN {0}'.format(schema))
except MySQLdb.OperationalError:
return 'Unknown Schema'
for iter, count in zip(list(range(cursor.rowcount)), list(range(1, cursor.rowcount+1))):
table = cursor.fetchone()
ret_val[count] = table[0]
cursor.close()
drizzle_db.close()
return ret_val
开发者ID:beornf,项目名称:salt-contrib,代码行数:28,代码来源:drizzle.py
示例5: test_reload
def test_reload(self):
# ensure it doesn't exist
self.assertNotIn(self.module_key, self.loader)
# update both the module and the lib
for x in range(1, 3):
self.update_module()
self.update_lib()
self.loader.clear()
self.assertEqual(self.loader[self.module_key](), (self.count, self.lib_count))
# update just the module
for x in range(1, 3):
self.update_module()
self.loader.clear()
self.assertEqual(self.loader[self.module_key](), (self.count, self.lib_count))
# update just the lib
for x in range(1, 3):
self.update_lib()
self.loader.clear()
self.assertEqual(self.loader[self.module_key](), (self.count, self.lib_count))
self.rm_module()
# make sure that even if we remove the module, its still loaded until a clear
self.assertEqual(self.loader[self.module_key](), (self.count, self.lib_count))
self.loader.clear()
self.assertNotIn(self.module_key, self.loader)
开发者ID:bryson,项目名称:salt,代码行数:28,代码来源:loader.py
示例6: sunos_netdev
def sunos_netdev():
'''
sunos specific implementation of netdev
'''
ret = {}
##NOTE: we cannot use hwaddr_interfaces here, so we grab both ip4 and ip6
for dev in __grains__['ip4_interfaces'].keys() + __grains__['ip6_interfaces'].keys():
# fetch device info
netstat_ipv4 = __salt__['cmd.run']('netstat -i -I {dev} -n -f inet'.format(dev=dev)).splitlines()
netstat_ipv6 = __salt__['cmd.run']('netstat -i -I {dev} -n -f inet6'.format(dev=dev)).splitlines()
# prepare data
netstat_ipv4[0] = netstat_ipv4[0].split()
netstat_ipv4[1] = netstat_ipv4[1].split()
netstat_ipv6[0] = netstat_ipv6[0].split()
netstat_ipv6[1] = netstat_ipv6[1].split()
# add data
ret[dev] = {}
for i in range(len(netstat_ipv4[0])-1):
if netstat_ipv4[0][i] == 'Name':
continue
if netstat_ipv4[0][i] in ['Address', 'Net/Dest']:
ret[dev]['IPv4 {field}'.format(field=netstat_ipv4[0][i])] = netstat_ipv4[1][i]
else:
ret[dev][netstat_ipv4[0][i]] = _number(netstat_ipv4[1][i])
for i in range(len(netstat_ipv6[0])-1):
if netstat_ipv6[0][i] == 'Name':
continue
if netstat_ipv6[0][i] in ['Address', 'Net/Dest']:
ret[dev]['IPv6 {field}'.format(field=netstat_ipv6[0][i])] = netstat_ipv6[1][i]
else:
ret[dev][netstat_ipv6[0][i]] = _number(netstat_ipv6[1][i])
return ret
开发者ID:bryson,项目名称:salt,代码行数:35,代码来源:status.py
示例7: _gather_update_categories
def _gather_update_categories(updateCollection):
categories = []
for i in range(updateCollection.Count):
update = updateCollection.Item(i)
for j in range(update.Categories.Count):
name = update.Categories.Item(j).Name
if name not in categories:
log.debug('found category: {0}'.format(name))
categories.append(name)
return categories
开发者ID:beornf,项目名称:salt-contrib,代码行数:10,代码来源:win_update.py
示例8: test_event_many_backlog
def test_event_many_backlog(self):
'''Test a large number of events, send all then recv all'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR, listen=True)
# Must not exceed zmq HWM
for i in range(500):
me.fire_event({'data': '{0}'.format(i)}, 'testevents')
for i in range(500):
evt = me.get_event(tag='testevents')
self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
开发者ID:mahak,项目名称:salt,代码行数:10,代码来源:event_test.py
示例9: rm_special
def rm_special(user, special, cmd):
'''
Remove a special cron job for a specified user.
CLI Example:
.. code-block:: bash
salt '*' cron.rm_job root @hourly /usr/bin/foo
'''
lst = list_tab(user)
ret = 'absent'
rm_ = None
for ind in range(len(lst['special'])):
if lst['special'][ind]['cmd'] == cmd and \
lst['special'][ind]['spec'] == special:
lst['special'].pop(ind)
rm_ = ind
if rm_ is not None:
ret = 'removed'
comdat = _write_cron_lines(user, _render_tab(lst))
if comdat['retcode']:
# Failed to commit
return comdat['stderr']
return ret
开发者ID:bryson,项目名称:salt,代码行数:25,代码来源:cron.py
示例10: _retry_get_url
def _retry_get_url(url, num_retries=10, timeout=5):
'''
Retry grabbing a URL.
Based heavily on boto.utils.retry_url
'''
for i in range(0, num_retries):
try:
result = requests.get(url, timeout=timeout, proxies={'http': ''})
if hasattr(result, 'text'):
return result.text
elif hasattr(result, 'content'):
return result.content
else:
return ''
except requests.exceptions.HTTPError as exc:
return ''
except Exception as exc:
pass
log.warning(
'Caught exception reading from URL. Retry no. {0}'.format(i)
)
log.warning(pprint.pformat(exc))
time.sleep(2 ** i)
log.error(
'Failed to read from URL for {0} times. Giving up.'.format(num_retries)
)
return ''
开发者ID:shineforever,项目名称:ops,代码行数:28,代码来源:iam.py
示例11: check_inheritance
def check_inheritance(path, objectType):
'''
check a specified path to verify if inheritance is enabled
returns 'Inheritance' of True/False
hkey: HKEY_LOCAL_MACHINE, HKEY_CURRENT_USER, etc
path: path of the registry key to check
'''
ret = {'result': False,
'Inheritance': False,
'comment': []}
dc = daclConstants()
objectType = dc.getObjectTypeBit(objectType)
path = dc.processPath(path, objectType)
try:
sd = win32security.GetNamedSecurityInfo(path, objectType, win32security.DACL_SECURITY_INFORMATION)
dacls = sd.GetSecurityDescriptorDacl()
except Exception as e:
ret['result'] = False
ret['comment'].append((
'Error obtaining the Security Descriptor or DACL of the path: {0}'
).format(e))
return ret
for counter in range(0, dacls.GetAceCount()):
ace = dacls.GetAce(counter)
if (ace[0][1] & win32security.INHERITED_ACE) == win32security.INHERITED_ACE:
ret['Inheritance'] = True
ret['result'] = True
return ret
开发者ID:mahak,项目名称:salt,代码行数:32,代码来源:win_dacl.py
示例12: generate_selectors
def generate_selectors(labels=None, *fields, **kwargs):
'''
Create an element list based in another objects labels that will create
a value of True in the corresponding element if in either selectors
or kwargs, otherwise False.
Args:
labels:
Example:
>>> labels = ['one', 'two', 'three', 'four']
>>> fields = ['two', 'three']
>>> generate_selectors(labels, fields)
[False, True, True, False]
'''
if not labels:
return []
enabled = True if 'all' in fields or 'all' in kwargs else False
selectors = [enabled for i in range(len(labels))] # pylint: disable=W0612
if enabled:
return selectors
for index, selector in enumerate(labels):
if selector in fields or selector in kwargs:
selectors[index] = True
return selectors
开发者ID:joesaland,项目名称:qubes-mgmt-salt-base-topd,代码行数:28,代码来源:matcher.py
示例13: nameservers
def nameservers(ns,
host=None,
admin_username=None,
admin_password=None,
module=None):
'''
Configure the nameservers on the DRAC
CLI Example:
.. code-block:: bash
salt dell dracr.nameservers [NAMESERVERS]
salt dell dracr.nameservers ns1.example.com ns2.example.com
admin_username=root admin_password=calvin module=server-1
host=192.168.1.1
'''
if len(ns) > 2:
log.warning('racadm only supports two nameservers')
return False
for i in range(1, len(ns) + 1):
if not __execute_cmd('config -g cfgLanNetworking -o '
'cfgDNSServer{0} {1}'.format(i, ns[i - 1]),
host=host,
admin_username=admin_username,
admin_password=admin_password,
module=module):
return False
return True
开发者ID:mahak,项目名称:salt,代码行数:31,代码来源:dracr.py
示例14: GetInstallationResults
def GetInstallationResults(self):
'''
this gets results of installation process.
'''
# if the blugger is empty, the results are nil.
log.debug('blugger has {0} updates in it'.format(self.install_collection.Count))
if self.install_collection.Count == 0:
return {}
updates = []
log.debug('repairing update list')
for i in range(self.install_collection.Count):
# this gets the result from install_results, but the title comes from the update
# collection install_collection.
updates.append('{0}: {1}'.format(
self.install_results.GetUpdateResult(i).ResultCode,
self.install_collection.Item(i).Title))
log.debug('Update results enumerated, now making a library to pass back')
results = {}
# translates the list of update results into a library that salt expects.
for i, update in enumerate(updates):
results['update {0}'.format(i)] = update
log.debug('Update information complied. returning')
return results
开发者ID:bryson,项目名称:salt,代码行数:27,代码来源:win_update.py
示例15: stop
def stop(name):
'''
Stop the specified service
CLI Example:
.. code-block:: bash
salt '*' service.stop <service name>
'''
# net stop issues a stop command and waits briefly (~30s), but will give
# up if the service takes too long to stop with a misleading
# "service could not be stopped" message and RC 0.
cmd = ['net', 'stop', name]
res = __salt__['cmd.run'](cmd, python_shell=False)
if 'service was stopped' in res:
return True
# we requested a stop, but the service is still thinking about it.
# poll for the real status
for attempt in range(SERVICE_STOP_POLL_MAX_ATTEMPTS):
if not status(name):
return True
log.debug('Waiting for %s to stop', name)
time.sleep(SERVICE_STOP_DELAY_SECONDS)
log.warning('Giving up on waiting for service `%s` to stop', name)
return False
开发者ID:DaveQB,项目名称:salt,代码行数:29,代码来源:win_service.py
示例16: waitfor_job
def waitfor_job(conn=None, LinodeID=None, JobID=None, timeout=300, quiet=True):
if not conn:
conn = get_conn()
interval = 5
iterations = int(timeout / interval)
for i in range(0, iterations):
try:
result = conn.linode_job_list(LinodeID=LinodeID, JobID=JobID)
except linode.ApiError as exc:
log.info('Waiting for job {0} on host {1} returned {2}'.
format(LinodeID, JobID, exc))
return False
if result[0]['HOST_SUCCESS'] == 1:
return True
time.sleep(interval)
if not quiet:
log.info('Still waiting on Job {0} for {1}'.format(JobID,
LinodeID))
else:
log.debug('Still waiting on Job {0} for {1}'.format(JobID,
LinodeID))
return False
开发者ID:dmyerscough,项目名称:salt,代码行数:27,代码来源:linode.py
示例17: test_mutually_exclusive_list_options
def test_mutually_exclusive_list_options(self):
test_options = ['--list-locations', '--list-images', '--list-sizes']
while True:
for idx in range(1, len(test_options)):
output = self.run_cloud(
'{0} ec2 {1} ec2'.format(
test_options[0], test_options[idx]
), catch_stderr=True
)
try:
self.assertIn(
'salt-cloud: error: The options {0}/{1} are mutually '
'exclusive. Please only choose one of them'.format(
test_options[0], test_options[idx]
),
output[1]
)
except AssertionError:
print(output)
raise
# Remove the first option from the list
test_options.pop(0)
if len(test_options) <= 1:
# Only one left? Stop iterating
break
开发者ID:DaveQB,项目名称:salt,代码行数:25,代码来源:cloud.py
示例18: find_room
def find_room(name, api_key=None):
'''
Find a room by name and return it.
:param name: The room name.
:param api_key: The Slack admin api key.
:return: The room object.
CLI Example:
.. code-block:: bash
salt '*' slack.find_room name="random"
salt '*' slack.find_room name="random" api_key=peWcBiMOS9HrZG15peWcBiMOS9HrZG15
'''
if not api_key:
api_key = _get_api_key()
# search results don't include the name of the
# channel with a hash, if the passed channel name
# has a hash we remove it.
if name.startswith('#'):
name = name[1:]
ret = list_rooms(api_key)
if ret['res']:
rooms = ret['message']
if rooms:
for room in range(0, len(rooms)):
if rooms[room]['name'] == name:
return rooms[room]
return False
开发者ID:bryson,项目名称:salt,代码行数:32,代码来源:slack_notify.py
示例19: find_user
def find_user(name, api_key=None):
'''
Find a user by name and return it.
:param name: The user name.
:param api_key: The Slack admin api key.
:return: The user object.
CLI Example:
.. code-block:: bash
salt '*' slack.find_user name="ThomasHatch"
salt '*' slack.find_user name="ThomasHatch" api_key=peWcBiMOS9HrZG15peWcBiMOS9HrZG15
'''
if not api_key:
api_key = _get_api_key()
ret = list_users(api_key)
if ret['res']:
users = ret['message']
if users:
for user in range(0, len(users)):
if users[user]['name'] == name:
return users[user]
return False
开发者ID:bryson,项目名称:salt,代码行数:27,代码来源:slack_notify.py
示例20: test_update_secret
def test_update_secret(self):
name = self.name
filename = "/tmp/{0}.json".format(name)
with open(filename, 'w') as f:
json.dump(self.request, f)
create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.1)
expected_data = {}
names = []
for i in range(3):
names.append("/tmp/{0}-{1}-updated".format(name, i))
with open("/tmp/{0}-{1}-updated".format(name, i), 'w') as f:
expected_data["{0}-{1}-updated".format(name, i)] = base64.b64encode("{0}{1}-updated".format(name, i))
f.write("{0}{1}-updated".format(name, i))
res = k8s.update_secret("default", name, names, apiserver_url="http://127.0.0.1:8080")
# if creation is failed, kubernetes return non json error message
proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
# if creation is failed, kubernetes return non json error message
b = kubectl_out.get("data", {})
self.assertTrue(isinstance(kubectl_out, dict))
self.assertEqual(expected_data, b)
开发者ID:bryson,项目名称:salt,代码行数:25,代码来源:k8s_test.py
注:本文中的salt.ext.six.moves.range函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论