本文整理汇总了Python中st2common.persistence.policy.PolicyType类的典型用法代码示例。如果您正苦于以下问题:Python PolicyType类的具体用法?Python PolicyType怎么用?Python PolicyType使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了PolicyType类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: register_policy_types
def register_policy_types(module):
registered_count = 0
mod_path = os.path.dirname(os.path.realpath(sys.modules[module.__name__].__file__))
path = '%s/policies/meta' % mod_path
files = []
for ext in ALLOWED_EXTS:
exp = '%s/*%s' % (path, ext)
files += glob.glob(exp)
for f in files:
try:
LOG.debug('Loading policy type from "%s".', f)
content = loader.load_meta_file(f)
policy_type_api = PolicyTypeAPI(**content)
policy_type_db = PolicyTypeAPI.to_model(policy_type_api)
try:
existing_entry = PolicyType.get_by_ref(policy_type_db.ref)
if existing_entry:
policy_type_db.id = existing_entry.id
except ValueError:
LOG.debug('Policy type "%s" is not found. Creating new entry.',
policy_type_db.ref)
policy_type_db = PolicyType.add_or_update(policy_type_db)
extra = {'policy_type_db': policy_type_db}
LOG.audit('Policy type "%s" is updated.', policy_type_db.ref, extra=extra)
registered_count += 1
except:
LOG.exception('Unable to register policy type from "%s".', f)
return registered_count
开发者ID:emptywee,项目名称:st2,代码行数:34,代码来源:policiesregistrar.py
示例2: test_register_policy_types
def test_register_policy_types(self):
self.assertEqual(register_policy_types(st2tests), 2)
type1 = PolicyType.get_by_ref("action.concurrency")
self.assertEqual(type1.name, "concurrency")
self.assertEqual(type1.resource_type, "action")
type2 = PolicyType.get_by_ref("action.mock_policy_error")
self.assertEqual(type2.name, "mock_policy_error")
self.assertEqual(type2.resource_type, "action")
开发者ID:azamsheriff,项目名称:st2,代码行数:10,代码来源:test_policies.py
示例3: setUpClass
def setUpClass(cls):
super(PolicyControllerTest, cls).setUpClass()
for _, fixture in six.iteritems(FIXTURES['policytypes']):
instance = PolicyTypeAPI(**fixture)
PolicyType.add_or_update(PolicyTypeAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES['policies']):
instance = PolicyAPI(**fixture)
Policy.add_or_update(PolicyAPI.to_model(instance))
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:10,代码来源:test_policies.py
示例4: setUpClass
def setUpClass(cls):
super(SchedulingPolicyTest, cls).setUpClass()
# Register runners
runners_registrar.register_runners()
for _, fixture in six.iteritems(FIXTURES['actions']):
instance = ActionAPI(**fixture)
Action.add_or_update(ActionAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES['policytypes']):
instance = PolicyTypeAPI(**fixture)
PolicyType.add_or_update(PolicyTypeAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES['policies']):
instance = PolicyAPI(**fixture)
Policy.add_or_update(PolicyAPI.to_model(instance))
开发者ID:StackStorm,项目名称:st2,代码行数:17,代码来源:test_policies.py
示例5: setUpClass
def setUpClass(cls):
super(PolicyTest, cls).setUpClass()
for _, fixture in six.iteritems(FIXTURES['runners']):
instance = RunnerTypeAPI(**fixture)
RunnerType.add_or_update(RunnerTypeAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES['actions']):
instance = ActionAPI(**fixture)
Action.add_or_update(ActionAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES['policytypes']):
instance = PolicyTypeAPI(**fixture)
PolicyType.add_or_update(PolicyTypeAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES['policies']):
instance = PolicyAPI(**fixture)
Policy.add_or_update(PolicyAPI.to_model(instance))
开发者ID:hejin,项目名称:st2,代码行数:18,代码来源:test_policies.py
示例6: setUpClass
def setUpClass(cls):
super(PolicyTypeControllerTestCase, cls).setUpClass()
cls.policy_type_dbs = []
for _, fixture in six.iteritems(FIXTURES['policytypes']):
instance = PolicyTypeAPI(**fixture)
policy_type_db = PolicyType.add_or_update(PolicyTypeAPI.to_model(instance))
cls.policy_type_dbs.append(policy_type_db)
开发者ID:StackStorm,项目名称:st2,代码行数:9,代码来源:test_policies.py
示例7: test_get_by_ref
def test_get_by_ref(self):
policy_db = Policy.get_by_ref("wolfpack.action-1.concurrency")
self.assertIsNotNone(policy_db)
self.assertEqual(policy_db.pack, "wolfpack")
self.assertEqual(policy_db.name, "action-1.concurrency")
policy_type_db = PolicyType.get_by_ref(policy_db.policy_type)
self.assertIsNotNone(policy_type_db)
self.assertEqual(policy_type_db.resource_type, "action")
self.assertEqual(policy_type_db.name, "concurrency")
开发者ID:azamsheriff,项目名称:st2,代码行数:10,代码来源:test_policies.py
示例8: setUp
def setUp(self):
EventletTestCase.setUpClass()
DbTestCase.setUpClass()
for _, fixture in six.iteritems(FIXTURES["runners"]):
instance = RunnerTypeAPI(**fixture)
RunnerType.add_or_update(RunnerTypeAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES["actions"]):
instance = ActionAPI(**fixture)
Action.add_or_update(ActionAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES["policytypes"]):
instance = PolicyTypeAPI(**fixture)
PolicyType.add_or_update(PolicyTypeAPI.to_model(instance))
for _, fixture in six.iteritems(FIXTURES["policies"]):
instance = PolicyAPI(**fixture)
Policy.add_or_update(PolicyAPI.to_model(instance))
开发者ID:ipv1337,项目名称:st2,代码行数:19,代码来源:test_concurrency_by_attr.py
示例9: validate
def validate(self):
# Validate policy itself
cleaned = super(PolicyAPI, self).validate()
# Validate policy parameters
policy_type_db = PolicyType.get_by_ref(cleaned.policy_type)
if not policy_type_db:
raise ValueError('Referenced policy_type "%s" doesnt exist' % (cleaned.policy_type))
parameters_schema = policy_type_db.parameters
parameters = getattr(cleaned, 'parameters', {})
schema = util_schema.get_schema_for_resource_parameters(
parameters_schema=parameters_schema)
validator = util_schema.get_validator()
cleaned_parameters = util_schema.validate(parameters, schema, validator, use_default=True,
allow_default_none=True)
cleaned.parameters = cleaned_parameters
return cleaned
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:20,代码来源:policy.py
注:本文中的st2common.persistence.policy.PolicyType类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论