本文整理汇总了Python中salt.utils.dictupdate.update函数的典型用法代码示例。如果您正苦于以下问题：Python update函数的具体用法？Python update怎么用？Python update使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了update函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: ext_pillar
def ext_pillar(self, pillar):
    '''
    Merge the output of every configured external pillar into ``pillar``.

    ``self.opts['ext_pillar']`` must be a list of single-level dicts that map
    an ext_pillar interface name to its argument (a dict is passed as keyword
    arguments, a list as positional arguments, anything else as one
    positional argument).

    Returns an empty dict when the option is absent or malformed, otherwise
    the (mutated) ``pillar`` dict.
    '''
    if 'ext_pillar' not in self.opts:
        return {}
    configured = self.opts['ext_pillar']
    if not isinstance(configured, list):
        log.critical('The "ext_pillar" option is malformed')
        return {}
    for entry in configured:
        if not isinstance(entry, dict):
            log.critical('The "ext_pillar" option is malformed')
            return {}
        for key, val in entry.items():
            if key not in self.ext_pillars:
                log.critical(
                    ('Specified ext_pillar interface {0} is '
                     'unavailable').format(key)
                )
                continue
            try:
                loader = self.ext_pillars[key]
                if isinstance(val, dict):
                    ext = loader(pillar, **val)
                elif isinstance(val, list):
                    ext = loader(pillar, *val)
                else:
                    ext = loader(pillar, val)
                update(pillar, ext)
            except Exception as exc:
                # A failing ext_pillar is logged and skipped so the remaining
                # interfaces still get a chance to run.
                log.exception('Failed to load ext_pillar {0}: {1}'.format(key, exc))
    return pillar
开发者ID:gspradeep,项目名称:salt,代码行数:30,代码来源:__init__.py
示例2: render_pillar
def render_pillar(self, matches):
    '''
    Render every matched pillar sls file and merge the results.

    ``matches`` maps a saltenv to the sls names to render for it. Each sls is
    rendered through ``self.render_pstate``; dict results are merged into the
    pillar, non-dict results are logged and skipped.

    Returns a ``(pillar, errors)`` tuple.
    '''
    merged = {}
    problems = []
    for env, sls_names in matches.items():
        seen_mods = set()
        for sls in sls_names:
            rendered, seen_mods, err = self.render_pstate(sls, env, seen_mods)
            if err:
                problems += err
            if rendered is None:
                continue
            if isinstance(rendered, dict):
                update(merged, rendered)
                continue
            # Anything other than a dict cannot be merged into the pillar.
            log.error(
                'The rendered pillar sls file, {0!r} state did '
                'not return the expected data format. This is '
                'a sign of a malformed pillar sls file. Returned '
                'errors: {1}'.format(
                    sls,
                    ', '.join(['{0!r}'.format(e) for e in problems])
                )
            )
    return merged, problems
开发者ID:AccelerationNet,项目名称:salt,代码行数:30,代码来源:__init__.py
示例3: absent
def absent(
        name,
        region=None,
        key=None,
        keyid=None,
        profile=None):
    '''
    Ensure the IAM role is deleted.

    name
        Name of the IAM role.

    region
        Region to connect to.

    key
        Secret key to be used.

    keyid
        Access key to be used.

    profile
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.
    '''
    ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
    # The helpers run in this fixed order; processing stops at the first one
    # that reports a hard failure (``result is False``).
    steps = (
        _policies_absent,
        _policies_detached,
        _instance_profile_disassociated,
        _instance_profile_absent,
        _role_absent,
    )
    for step in steps:
        _ret = step(name, region, key, keyid, profile)
        # Merge instead of assigning: a plain assignment here would throw away
        # the changes already reported by the preceding steps.
        ret['changes'] = dictupdate.update(ret['changes'], _ret['changes'])
        ret['comment'] = ' '.join([ret['comment'], _ret['comment']])
        if not _ret['result']:
            ret['result'] = _ret['result']
        if ret['result'] is False:
            return ret
    return ret
开发者ID:bryson,项目名称:salt,代码行数:60,代码来源:boto_iam_role.py
示例4: ext_pillar
def ext_pillar(self, pillar):
    '''
    Render the external pillar data

    Each entry of ``self.opts['ext_pillar']`` maps an ext_pillar interface
    name to its argument (a dict is passed as keyword arguments, a list as
    positional arguments, anything else as one positional argument). The
    interface is first called with the minion ID as leading argument; when it
    rejects that call signature the legacy call without the minion ID is used
    and a deprecation warning is logged.

    Returns an empty dict when the option is absent or malformed, otherwise
    the (mutated) ``pillar`` dict.
    '''
    if 'ext_pillar' not in self.opts:
        return {}
    if not isinstance(self.opts['ext_pillar'], list):
        log.critical('The "ext_pillar" option is malformed')
        return {}
    for run in self.opts['ext_pillar']:
        if not isinstance(run, dict):
            log.critical('The "ext_pillar" option is malformed')
            return {}
        for key, val in run.items():
            if key not in self.ext_pillars:
                err = ('Specified ext_pillar interface {0} is '
                       'unavailable').format(key)
                log.critical(err)
                continue
            try:
                try:
                    # try the new interface, which includes the minion ID
                    # as first argument
                    if isinstance(val, dict):
                        ext = self.ext_pillars[key](self.opts['id'], pillar, **val)
                    elif isinstance(val, list):
                        ext = self.ext_pillars[key](self.opts['id'], pillar, *val)
                    else:
                        ext = self.ext_pillars[key](self.opts['id'], pillar, val)
                    update(pillar, ext)
                except TypeError as e:
                    # ``e.message`` only exists on Python 2 exceptions, so use
                    # str(e). The shorter prefix matches both the Python 2
                    # wording ("takes exactly N arguments") and the Python 3
                    # wording ("takes N positional arguments").
                    if not str(e).startswith('ext_pillar() takes '):
                        raise
                    log.warning('Deprecation warning: ext_pillar "{0}"'
                                ' needs to accept minion_id as first'
                                ' argument'.format(key))
                    # legacy interface: no minion ID argument
                    if isinstance(val, dict):
                        ext = self.ext_pillars[key](pillar, **val)
                    elif isinstance(val, list):
                        ext = self.ext_pillars[key](pillar, *val)
                    else:
                        ext = self.ext_pillars[key](pillar, val)
                    update(pillar, ext)
            except Exception as exc:
                # A failing ext_pillar is logged and skipped so the remaining
                # interfaces still run.
                log.exception(
                    'Failed to load ext_pillar {0}: {1}'.format(
                        key,
                        exc
                    )
                )
    return pillar
开发者ID:AccelerationNet,项目名称:salt,代码行数:55,代码来源:__init__.py
示例5: _alarms_present
def _alarms_present(name, alarms, alarms_from_pillar, region, key, keyid, profile):
    '''
    helper method for present. ensure that cloudwatch_alarms are set

    Alarm definitions are read from the ``alarms_from_pillar`` config option,
    merged with ``alarms`` (which win on conflict), and each one is applied
    through the ``boto_cloudwatch_alarm.present`` state. Returns a single
    state-style return dict that aggregates the individual results.
    '''
    # Local import keeps this block self-contained.
    import copy

    # load data from alarms_from_pillar; work on a deep copy because the
    # definitions are modified below (name/description prefixing) and those
    # edits must not leak back into the shared config/pillar data
    tmp = copy.deepcopy(__salt__['config.option'](alarms_from_pillar, {}))
    # merge with data from alarms
    if alarms:
        tmp = dictupdate.update(tmp, alarms)
    # set alarms, using boto_cloudwatch_alarm.present
    merged_return_value = {'name': name, 'result': True, 'comment': '', 'changes': {}}
    for info in tmp.values():
        # add elb to name and description
        info["name"] = name + " " + info["name"]
        info["attributes"]["description"] = name + " " + info["attributes"]["description"]
        # add dimension attribute
        info["attributes"]["dimensions"] = {"LoadBalancerName": [name]}
        # set alarm
        kwargs = {
            "name": info["name"],
            "attributes": info["attributes"],
            "region": region,
            "key": key,
            "keyid": keyid,
            "profile": profile,
        }
        ret = __salt__["state.single"]('boto_cloudwatch_alarm.present', **kwargs)
        # state.single returns a one-entry dict; dict views are not indexable
        # on Python 3, so take the first value through an iterator
        results = next(iter(ret.values()))
        if not results["result"]:
            merged_return_value["result"] = results["result"]
        if results.get("changes", {}) != {}:
            merged_return_value["changes"][info["name"]] = results["changes"]
        if "comment" in results:
            merged_return_value["comment"] += results["comment"]
    return merged_return_value
开发者ID:qzchenwl,项目名称:saltstack-verify,代码行数:33,代码来源:boto_elb.py
示例6: present
def present(
        name,
        description,
        vpc_id=None,
        rules=None,
        rules_egress=None,
        region=None,
        key=None,
        keyid=None,
        profile=None):
    '''
    Ensure the security group exists with the specified rules.

    name
        Name of the security group.

    description
        A description of this security group.

    vpc_id
        The ID of the VPC to create the security group in, if any.

    rules
        A list of ingress rule dicts.

    rules_egress
        A list of egress rule dicts.

    region
        Region to connect to.

    key
        Secret key to be used.

    keyid
        Access key to be used.

    profile
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.
    '''
    ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}

    # Step 1: the group itself. A hard failure here ends the run.
    group_ret = _security_group_present(
        name, description, vpc_id, region, key, keyid, profile
    )
    ret['changes'] = group_ret['changes']
    ret['comment'] = ' '.join([ret['comment'], group_ret['comment']])
    if not group_ret['result']:
        ret['result'] = group_ret['result']
        if ret['result'] is False:
            return ret

    # Step 2: the rules; falsy rule arguments are treated as empty lists.
    rules = rules or []
    rules_egress = rules_egress or []
    rules_ret = _rules_present(
        name, rules, rules_egress, vpc_id, region, key, keyid, profile
    )
    ret['changes'] = dictupdate.update(ret['changes'], rules_ret['changes'])
    ret['comment'] = ' '.join([ret['comment'], rules_ret['comment']])
    if not rules_ret['result']:
        ret['result'] = rules_ret['result']
    return ret
开发者ID:DaveQB,项目名称:salt,代码行数:60,代码来源:boto_secgroup.py
示例7: _alarms_present
def _alarms_present(name, alarms, alarms_from_pillar, region, key, keyid, profile):
    '''
    helper method for present. ensure that cloudwatch_alarms are set

    Alarm definitions are read from the ``alarms_from_pillar`` config option,
    merged with ``alarms`` (which win on conflict), and each one is applied
    through the ``boto_cloudwatch_alarm.present`` state. Returns a single
    state-style return dict that aggregates the individual results.
    '''
    # Local import keeps this block self-contained.
    import copy

    # load data from alarms_from_pillar; work on a deep copy because the
    # definitions are modified below (name/description prefixing) and those
    # edits must not leak back into the shared config/pillar data
    tmp = copy.deepcopy(__salt__['config.option'](alarms_from_pillar, {}))
    # merge with data from alarms
    if alarms:
        tmp = dictupdate.update(tmp, alarms)
    # set alarms, using boto_cloudwatch_alarm.present
    merged_return_value = {'name': name, 'result': True, 'comment': '', 'changes': {}}
    for _, info in six.iteritems(tmp):
        # add asg to name and description
        info['name'] = name + ' ' + info['name']
        info['attributes']['description'] = name + ' ' + info['attributes']['description']
        # add dimension attribute
        info['attributes']['dimensions'] = {'AutoScalingGroupName': [name]}
        # set alarm
        kwargs = {
            'name': info['name'],
            'attributes': info['attributes'],
            'region': region,
            'key': key,
            'keyid': keyid,
            'profile': profile,
        }
        ret = __salt__['state.single']('boto_cloudwatch_alarm.present', **kwargs)
        # state.single returns a one-entry dict; take its only value
        results = next(six.itervalues(ret))
        if not results['result']:
            merged_return_value['result'] = False
        if results.get('changes', {}) != {}:
            merged_return_value['changes'][info['name']] = results['changes']
        if 'comment' in results:
            merged_return_value['comment'] += results['comment']
    return merged_return_value
开发者ID:DaveQB,项目名称:salt,代码行数:33,代码来源:boto_asg.py
示例8: _determine_scheduled_actions
def _determine_scheduled_actions(scheduled_actions, scheduled_actions_from_pillar):
    '''
    helper method for present: build the effective scheduled actions.

    Starts from a deep copy of the ``scheduled_actions_from_pillar`` config
    option and, when ``scheduled_actions`` is given, merges it on top.
    '''
    from_config = __salt__['config.option'](scheduled_actions_from_pillar, {})
    merged = copy.deepcopy(from_config)
    if not scheduled_actions:
        return merged
    # values passed to the state override the configured ones
    return dictupdate.update(merged, scheduled_actions)
开发者ID:bryson,项目名称:salt,代码行数:11,代码来源:boto_asg.py
示例9: _alarms_present
def _alarms_present(name, alarms, alarms_from_pillar,
                    write_capacity_units, read_capacity_units,
                    region, key, keyid, profile):
    '''
    helper method for present. ensure that cloudwatch_alarms are set

    Alarm definitions come from a deep copy of the ``alarms_from_pillar``
    config option merged with ``alarms``. For consumed read/write capacity
    alarms without an explicit ``threshold``, the threshold is derived from
    ``threshold_percent`` and the table capacity. Returns one state-style
    return dict aggregating the individual alarm results.
    '''
    configured = __salt__['config.option'](alarms_from_pillar, {})
    definitions = copy.deepcopy(configured)
    if alarms:
        definitions = dictupdate.update(definitions, alarms)

    # metric name -> provisioned capacity it is measured against
    capacity_metrics = (
        ("ConsumedWriteCapacityUnits", write_capacity_units),
        ("ConsumedReadCapacityUnits", read_capacity_units),
    )
    summary = {'name': name, 'result': True, 'comment': '', 'changes': {}}
    for info in six.itervalues(definitions):
        # prefix name and description with the dynamodb table name
        info["name"] = name + " " + info["name"]
        attrs = info["attributes"]
        attrs["description"] = name + " " + attrs["description"]
        attrs["dimensions"] = {"TableName": [name]}
        for metric_name, capacity_units in capacity_metrics:
            if attrs["metric"] == metric_name and "threshold" not in attrs:
                attrs["threshold"] = math.ceil(capacity_units * attrs["threshold_percent"])
                del attrs["threshold_percent"]
                # capacity units are given in unit / second, so multiply by
                # the period to get the proper threshold.
                # http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/MonitoringDynamoDB.html
                attrs["threshold"] *= attrs["period"]
        # apply the alarm through boto_cloudwatch_alarm.present
        results = __states__['boto_cloudwatch_alarm.present'](
            name=info["name"],
            attributes=attrs,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
        if not results["result"]:
            summary["result"] = results["result"]
        if results.get("changes", {}) != {}:
            summary["changes"][info["name"]] = results["changes"]
        if "comment" in results:
            summary["comment"] += results["comment"]
    return summary
开发者ID:bryson,项目名称:salt,代码行数:52,代码来源:boto_dynamodb.py
示例10: merge
def merge(dest, upd):
    '''
    defaults.merge
        Deep-merge ``upd`` into ``dest`` for use in formulas.

    CLI Example:

    .. code-block:: bash

        salt '*' defaults.merge a=b d=e

    This is normally called from a templating language inside a formula
    rather than directly on the command-line.
    '''
    merged = dictupdate.update(dest, upd)
    return merged
开发者ID:HowardMei,项目名称:saltstack,代码行数:14,代码来源:defaults.py
示例11: _alarms_present
def _alarms_present(name, min_size_equals_max_size, alarms, alarms_from_pillar, region, key, keyid, profile):
    '''
    helper method for present. ensure that cloudwatch_alarms are set

    Alarm definitions come from a deep copy of the ``alarms_from_pillar``
    config option merged with ``alarms``. Alarms whose actions all target a
    scaling policy are skipped when ``min_size_equals_max_size`` is true.
    Returns one state-style return dict aggregating the individual results.
    '''
    definitions = copy.deepcopy(__salt__['config.option'](alarms_from_pillar, {}))
    if alarms:
        definitions = dictupdate.update(definitions, alarms)

    summary = {'name': name, 'result': True, 'comment': '', 'changes': {}}
    action_types = ('alarm_actions', 'insufficient_data_actions', 'ok_actions')
    for info in six.itervalues(definitions):
        # prefix name and description with the asg name
        info['name'] = name + ' ' + info['name']
        attrs = info['attributes']
        attrs['description'] = name + ' ' + attrs['description']
        # only supply the default dimension when none was configured
        if 'dimensions' not in attrs:
            attrs['dimensions'] = {'AutoScalingGroupName': [name]}

        only_scaling_policy_actions = True
        for action_type in action_types:
            if action_type not in attrs:
                continue
            rewritten = []
            for action in attrs[action_type]:
                if 'scaling_policy' not in action:
                    only_scaling_policy_actions = False
                # ":self:" is a placeholder for this asg's name
                if ':self:' in action:
                    action = action.replace(':self:', ':{0}:'.format(name))
                rewritten.append(action)
            attrs[action_type] = rewritten

        # skip alarms that only have actions for scaling policy, if
        # min_size == max_size for this ASG
        if only_scaling_policy_actions and min_size_equals_max_size:
            continue

        results = __states__['boto_cloudwatch_alarm.present'](
            name=info['name'],
            attributes=attrs,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
        if not results['result']:
            summary['result'] = False
        if results.get('changes', {}) != {}:
            summary['changes'][info['name']] = results['changes']
        if 'comment' in results:
            summary['comment'] += results['comment']
    return summary
开发者ID:bryson,项目名称:salt,代码行数:50,代码来源:boto_asg.py
示例12: test_update
def test_update(self):
    '''
    dictupdate.update() must give the same result as setting the value by
    hand on a copy of ``self.dict1``.
    '''
    # (path of keys to set by hand, value to set, update dict to apply)
    cases = [
        # level 1 value changes
        (('A',), 'Z', {'A': 'Z'}),
        # level 2 value changes
        (('C', 'D'), 'Z', {'C': {'D': 'Z'}}),
        # level 3 value changes
        (('C', 'F', 'G'), 'Z', {'C': {'F': {'G': 'Z'}}}),
        # replace a sub-dictionary
        (('C',), 'Z', {'C': 'Z'}),
        # add a new scalar value
        (('Z',), 'Y', {'Z': 'Y'}),
        # add a dictionary
        (('Z',), {'Y': 'X'}, {'Z': {'Y': 'X'}}),
        # add a nested dictionary
        (('Z',), {'Y': {'X': 'W'}}, {'Z': {'Y': {'X': 'W'}}}),
    ]
    for path, value, upd in cases:
        mdict = copy.deepcopy(self.dict1)
        node = mdict
        for step in path[:-1]:
            node = node[step]
        node[path[-1]] = value
        res = dictupdate.update(copy.deepcopy(self.dict1), upd)
        self.assertEqual(res, mdict)
开发者ID:DaveQB,项目名称:salt,代码行数:49,代码来源:dictupdate_test.py
示例13: function_present
#.........这里部分代码省略.........
'''
ret = {'name': FunctionName,
'result': True,
'comment': '',
'changes': {}
}
if Permissions is not None:
if isinstance(Permissions, string_types):
Permissions = json.loads(Permissions)
required_keys = set(('Action', 'Principal'))
optional_keys = set(('SourceArn', 'SourceAccount'))
for sid, permission in Permissions.iteritems():
keyset = set(permission.keys())
if not keyset.issuperset(required_keys):
raise SaltInvocationError('{0} are required for each permission '
'specification'.format(', '.join(required_keys)))
keyset = keyset - required_keys
keyset = keyset - optional_keys
if bool(keyset):
raise SaltInvocationError('Invalid permission value {0}'.format(', '.join(keyset)))
r = __salt__['boto_lambda.function_exists'](FunctionName=FunctionName, region=region,
key=key, keyid=keyid, profile=profile)
if 'error' in r:
ret['result'] = False
ret['comment'] = 'Failed to create function: {0}.'.format(r['error']['message'])
return ret
if not r.get('exists'):
if __opts__['test']:
ret['comment'] = 'Function {0} is set to be created.'.format(FunctionName)
ret['result'] = None
return ret
r = __salt__['boto_lambda.create_function'](FunctionName=FunctionName, Runtime=Runtime,
Role=Role, Handler=Handler,
ZipFile=ZipFile, S3Bucket=S3Bucket,
S3Key=S3Key,
S3ObjectVersion=S3ObjectVersion,
Description=Description,
Timeout=Timeout, MemorySize=MemorySize,
WaitForRole=True,
RoleRetries=RoleRetries,
region=region, key=key,
keyid=keyid, profile=profile)
if not r.get('created'):
ret['result'] = False
ret['comment'] = 'Failed to create function: {0}.'.format(r['error']['message'])
return ret
if Permissions:
for sid, permission in Permissions.iteritems():
r = __salt__['boto_lambda.add_permission'](FunctionName=FunctionName,
StatementId=sid,
**permission)
if not r.get('updated'):
ret['result'] = False
ret['comment'] = 'Failed to create function: {0}.'.format(r['error']['message'])
_describe = __salt__['boto_lambda.describe_function'](FunctionName,
region=region, key=key, keyid=keyid, profile=profile)
_describe['function']['Permissions'] = __salt__['boto_lambda.get_permissions'](FunctionName,
region=region, key=key, keyid=keyid, profile=profile)['permissions']
ret['changes']['old'] = {'function': None}
ret['changes']['new'] = _describe
ret['comment'] = 'Function {0} created.'.format(FunctionName)
return ret
ret['comment'] = os.linesep.join([ret['comment'], 'Function {0} is present.'.format(FunctionName)])
ret['changes'] = {}
# function exists, ensure config matches
_ret = _function_config_present(FunctionName, Role, Handler, Description, Timeout,
MemorySize, region, key, keyid, profile)
if not _ret.get('result'):
ret['result'] = False
ret['comment'] = _ret['comment']
ret['changes'] = {}
return ret
ret['changes'] = dictupdate.update(ret['changes'], _ret['changes'])
ret['comment'] = ' '.join([ret['comment'], _ret['comment']])
_ret = _function_code_present(FunctionName, ZipFile, S3Bucket, S3Key, S3ObjectVersion,
region, key, keyid, profile)
if not _ret.get('result'):
ret['result'] = False
ret['comment'] = _ret['comment']
ret['changes'] = {}
return ret
ret['changes'] = dictupdate.update(ret['changes'], _ret['changes'])
ret['comment'] = ' '.join([ret['comment'], _ret['comment']])
_ret = _function_permissions_present(FunctionName, Permissions,
region, key, keyid, profile)
if not _ret.get('result'):
ret['result'] = False
ret['comment'] = _ret['comment']
ret['changes'] = {}
return ret
ret['changes'] = dictupdate.update(ret['changes'], _ret['changes'])
ret['comment'] = ' '.join([ret['comment'], _ret['comment']])
return ret
开发者ID:HowardMei,项目名称:saltstack,代码行数:101,代码来源:boto_lambda.py
示例14: present
def present(
name,
policy_document=None,
policy_document_from_pillars=None,
path=None,
policies=None,
policies_from_pillars=None,
managed_policies=None,
create_instance_profile=True,
region=None,
key=None,
keyid=None,
profile=None,
delete_policies=True):
'''
Ensure the IAM role exists.
name
Name of the IAM role.
policy_document
The policy that grants an entity permission to assume the role. (See https://boto.readthedocs.io/en/latest/ref/iam.html#boto.iam.connection.IAMConnection.create_role)
policy_document_from_pillars
A pillar key that contains a role policy document. The statements
defined here will be appended with the policy document statements
defined in the policy_document argument.
.. versionadded:: Nitrogen
path
The path to the role/instance profile. (See https://boto.readthedocs.io/en/latest/ref/iam.html#boto.iam.connection.IAMConnection.create_role)
policies
A dict of IAM role policies.
policies_from_pillars
A list of pillars that contain role policy dicts. Policies in the
pillars will be merged in the order defined in the list and key
conflicts will be handled by later defined keys overriding earlier
defined keys. The policies defined here will be merged with the
policies defined in the policies argument. If keys conflict, the keys
in the policies argument will override the keys defined in
policies_from_pillars.
managed_policies
A list of (AWS or Customer) managed policies to be attached to the role.
create_instance_profile
A boolean of whether or not to create an instance profile and associate
it with this role.
region
Region to connect to.
key
Secret key to be used.
keyid
Access key to be used.
profile
A dict with region, key and keyid, or a pillar key (string)
that contains a dict with region, key and keyid.
delete_policies
Deletes existing policies that are not in the given list of policies. Default
value is ``True``. If ``False`` is specified, existing policies will not be deleted
allowing manual modifications on the IAM role to be persistent.
.. versionadded:: 2015.8.0
'''
ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
# Build up _policy_document
_policy_document = {}
if policy_document_from_pillars:
from_pillars = __salt__['pillar.get'](policy_document_from_pillars)
if from_pillars:
_policy_document['Version'] = from_pillars['Version']
_policy_document.setdefault('Statement', [])
_policy_document['Statement'].extend(from_pillars['Statement'])
if policy_document:
_policy_document['Version'] = policy_document['Version']
_policy_document.setdefault('Statement', [])
_policy_document['Statement'].extend(policy_document['Statement'])
_ret = _role_present(name, _policy_document, path, region, key, keyid,
profile)
# Build up _policies
if not policies:
policies = {}
if not policies_from_pillars:
policies_from_pillars = []
if not managed_policies:
managed_policies = []
_policies = {}
for policy in policies_from_pillars:
_policy = __salt__['pillar.get'](policy)
_policies.update(_policy)
_policies.update(policies)
#.........这里部分代码省略.........
开发者ID:bryson,项目名称:salt,代码行数:101,代码来源:boto_iam_role.py
示例15: test_filter_by
#.........这里部分代码省略.........
res = grainsmod.filter_by(dict1, grain='xxx', default='Z')
self.assertIs(res, None)
#test giving a list as merge argument raise exception
self.assertRaises(
SaltException,
grainsmod.filter_by,
dict1,
'xxx',
['foo'],
'C'
)
#Now, re-test with an existing grain (os_family), but with no match.
res = grainsmod.filter_by(dict1)
self.assertIs(res, None)
res = grainsmod.filter_by(dict1, default='C')
self.assertEqual(res, {'D': {'E': 'F', 'G': 'H'}})
res = grainsmod.filter_by(dict1, merge=mdict1, default='C')
self.assertEqual(res, {'D': {'E': 'I', 'G': 'H'}, 'J': 'K'})
# dict1 was altered, reestablish
dict1 = {'A': 'B', 'C': {'D': {'E': 'F', 'G': 'H'}}}
res = grainsmod.filter_by(dict1, merge=mdict1, default='Z')
self.assertEqual(res, mdict1)
res = grainsmod.filter_by(dict1, default='Z')
self.assertIs(res, None)
# this one is in fact a traceback in updatedict, merging a string with a dictionary
self.assertRaises(
TypeError,
grainsmod.filter_by,
dict1,
merge=mdict1,
default='A'
)
#Now, re-test with a matching grain.
dict1 = {'A': 'B', 'MockedOS': {'D': {'E': 'F', 'G': 'H'}}}
res = grainsmod.filter_by(dict1)
self.assertEqual(res, {'D': {'E': 'F', 'G': 'H'}})
res = grainsmod.filter_by(dict1, default='A')
self.assertEqual(res, {'D': {'E': 'F', 'G': 'H'}})
res = grainsmod.filter_by(dict1, merge=mdict1, default='A')
self.assertEqual(res, {'D': {'E': 'I', 'G': 'H'}, 'J': 'K'})
# dict1 was altered, reestablish
dict1 = {'A': 'B', 'MockedOS': {'D': {'E': 'F', 'G': 'H'}}}
res = grainsmod.filter_by(dict1, merge=mdict1, default='Z')
self.assertEqual(res, {'D': {'E': 'I', 'G': 'H'}, 'J': 'K'})
# dict1 was altered, reestablish
dict1 = {'A': 'B', 'MockedOS': {'D': {'E': 'F', 'G': 'H'}}}
res = grainsmod.filter_by(dict1, default='Z')
self.assertEqual(res, {'D': {'E': 'F', 'G': 'H'}})
# Base tests
# NOTE: these may fail to detect errors if dictupdate.update() is broken
# but then the unit test for dictupdate.update() should fail and expose
# that. The purpose of these tests is it validate the logic of how
# in filter_by() processes its arguments.
# Test with just the base
res = grainsmod.filter_by(dict2, grain='xxx', default='xxx', base='default')
self.assertEqual(res, dict2['default'])
# Test the base with the OS grain look-up
res = grainsmod.filter_by(dict2, default='xxx', base='default')
self.assertEqual(
res,
dictupdate.update(copy.deepcopy(dict2['default']), dict2['MockedOS'])
)
# Test the base with default
res = grainsmod.filter_by(dict2, grain='xxx', base='default')
self.assertEqual(res, dict2['default'])
res = grainsmod.filter_by(dict2, grain='1', base='default')
self.assertEqual(
res,
dictupdate.update(copy.deepcopy(dict2['default']), dict2['1'])
)
res = grainsmod.filter_by(dict2, base='default', merge=mdict2)
self.assertEqual(
res,
dictupdate.update(
dictupdate.update(
copy.deepcopy(dict2['default']),
dict2['MockedOS']),
mdict2
)
)
res = grainsmod.filter_by(dict2, base='default', merge=mdict3)
self.assertEqual(
res,
dictupdate.update(
dictupdate.update(
copy.deepcopy(dict2['default']),
dict2['MockedOS']),
mdict3
)
)
开发者ID:DaveQB,项目名称:salt,代码行数:101,代码来源:grains_test.py
示例16: group_present
def group_present(name, policies=None, policies_from_pillars=None, users=None, region=None, key=None, keyid=None, profile=None):
    '''
    Ensure the IAM group is present

    name (string)
        The name of the new group.

    policies (dict)
        A dict of IAM group policy documents.

    policies_from_pillars (list)
        A list of pillars that contain policy dicts. Policies in the pillars
        are merged in list order, later keys overriding earlier ones. The
        result is then merged with the ``policies`` argument, whose keys
        override the keys coming from ``policies_from_pillars``.

    users (list)
        A list of users to be added to the group.

    region (string)
        Region to connect to.

    key (string)
        Secret key to be used.

    keyid (string)
        Access key to be used.

    profile (dict)
        A dict with region, key and keyid, or a pillar key (string) that
        contains a dict with region, key and keyid.
    '''
    ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
    policies = policies or {}
    policies_from_pillars = policies_from_pillars or []

    # pillar policies first (in order), explicit policies last so they win
    _policies = {}
    for pillar_key in policies_from_pillars:
        _policies.update(__salt__['pillar.get'](pillar_key))
    _policies.update(policies)

    conn_args = {'region': region, 'key': key, 'keyid': keyid, 'profile': profile}
    exists = __salt__['boto_iam.get_group'](group_name=name, **conn_args)
    if exists:
        ret['comment'] = os.linesep.join([ret['comment'], 'Group {0} is present.'.format(name)])
    else:
        if __opts__['test']:
            ret['comment'] = 'IAM group {0} is set to be created.'.format(name)
            ret['result'] = None
            return ret
        created = __salt__['boto_iam.create_group'](group_name=name, **conn_args)
        if not created:
            ret['comment'] = 'Failed to create IAM group {0}.'.format(name)
            ret['result'] = False
            return ret
        ret['changes']['group'] = created
        ret['comment'] = os.linesep.join([ret['comment'], 'Group {0} has been created.'.format(name)])

    # Group exists, ensure group policies and users are set.
    policies_ret = _group_policies_present(
        name, _policies, region, key, keyid, profile
    )
    ret['changes'] = dictupdate.update(ret['changes'], policies_ret['changes'])
    ret['comment'] = ' '.join([ret['comment'], policies_ret['comment']])
    if not policies_ret['result']:
        ret['result'] = policies_ret['result']
        return ret
    if users:
        log.debug('Users are : {0}.'.format(users))
        group_result = __salt__['boto_iam.get_group'](group_name=name, **conn_args)
        ret = _case_group(ret, users, name, group_result, region, key, keyid, profile)
    return ret
开发者ID:DaveQB,项目名称:salt,代码行数:74,代码来源:boto_iam.py
示例17: present
def present(name=None,
table_name=None,
region=None,
key=None,
keyid=None,
profile=None,
read_capacity_units=None,
write_capacity_units=None,
alarms=None,
alarms_from_pillar="boto_dynamodb_alarms",
hash_key=None,
hash_key_data_type=None,
range_key=None,
range_key_data_type=None,
local_indexes=None,
global_indexes=None,
backup_configs_from_pillars='boto_dynamodb_backup_configs'):
'''
Ensure the DynamoDB table exists. Table throughput can be updated after
table creation.
Global secondary indexes (GSIs) are managed with some exceptions:
* If a GSI deletion is detected, a failure will occur (deletes should be
done manually in the AWS console).
* If multiple GSIs are added in a single Salt call, a failure will occur
(boto supports one creation at a time). Note that this only applies after
table creation; multiple GSIs can be created during table creation.
* Updates to existing GSIs are limited to read/write capacity only
(DynamoDB limitation).
name
Name of the DynamoDB table
table_name
Name of the DynamoDB table (deprecated)
region
Region to connect to.
key
Secret key to be used.
keyid
Access key to be used.
profile
A dict with region, key and keyid, or a pillar key (string)
that contains a dict with region, key and keyid.
read_capacity_units
The read throughput for this table
write_capacity_units
The write throughput for this table
hash_key
The name of the attribute that will be used as the hash key
for this table
hash_key_data_type
The DynamoDB datatype of the hash key
range_key
The name of the attribute that will be used as the range key
for this table
range_key_data_type
The DynamoDB datatype of the range key
local_indexes
The local indexes you would like to create
global_indexes
The global indexes you would like to create
backup_configs_from_pillars
Pillars to use to configure DataPipeline backups
'''
ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
if table_name:
ret['warnings'] = ['boto_dynamodb.present: `table_name` is deprecated.'
' Please use `name` instead.']
ret['name'] = table_name
name = table_name
comments = []
changes_old = {}
changes_new = {}
# Ensure DynamoDB table exists
table_exists = __salt__['boto_dynamodb.exists'](
name,
region,
key,
keyid,
profile
)
if not table_exists:
if __opts__['test']:
ret['result'] = None
#.........这里部分代码省略.........
开发者ID:bryson,项目名称:salt,代码行数:101,代码来源:boto_dynamodb.py
示例18: present
def present(
name,
description,
vpc_id=None,
vpc_name=None,
rules=None,
rules_egress=None,
region=None,
key=None,
keyid=None,
profile=None,
tags=None):
'''
Ensure the security group exists with the specified rules.
name
Name of the security group.
description
A description of this security group.
vpc_id
The ID of the VPC to create the security group in, if any. Exclusive with vpc_name.
vpc_name
The name of the VPC to create the security group in, if any. Exlusive with vpc_id.
.. versionadded:: Boron
|
请发表评论