本文整理汇总了Python中st2common.content.loader.MetaLoader类的典型用法代码示例。如果您正苦于以下问题:Python MetaLoader类的具体用法?Python MetaLoader怎么用?Python MetaLoader使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MetaLoader类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _get_action_alias_db_by_name
def _get_action_alias_db_by_name(self, name):
"""
Retrieve ActionAlias DB object for the provided alias name.
"""
base_pack_path = self._get_base_pack_path()
pack_yaml_path = os.path.join(base_pack_path, MANIFEST_FILE_NAME)
if os.path.isfile(pack_yaml_path):
# 1. 1st try to infer pack name from pack metadata file
meta_loader = MetaLoader()
pack_metadata = meta_loader.load(pack_yaml_path)
pack = get_pack_ref_from_metadata(metadata=pack_metadata)
else:
# 2. If pack.yaml is not available, fail back to directory name
# Note: For this to work, directory name needs to match pack name
_, pack = os.path.split(base_pack_path)
pack_loader = ContentPackLoader()
registrar = AliasesRegistrar(use_pack_cache=False)
aliases_path = pack_loader.get_content_from_pack(pack_dir=base_pack_path,
content_type='aliases')
aliases = registrar._get_aliases_from_pack(aliases_dir=aliases_path)
for alias_path in aliases:
action_alias_db = registrar._get_action_alias_db(pack=pack,
action_alias=alias_path,
ignore_metadata_file_error=True)
if action_alias_db.name == name:
return action_alias_db
raise ValueError('Alias with name "%s" not found' % (name))
开发者ID:nzlosh,项目名称:st2,代码行数:32,代码来源:action_aliases.py
示例2: register_runners
def register_runners(runner_dir=None, experimental=False, fail_on_failure=True):
""" Register runners
"""
LOG.debug('Start : register runners')
runner_count = 0
runner_loader = RunnersLoader()
if runner_dir:
assert isinstance(runner_dir, list)
if not runner_dir:
runner_dirs = content_utils.get_runners_base_paths()
runners = runner_loader.get_runners(runner_dirs)
for runner, path in runners.iteritems():
LOG.info('Runner "%s"' % (runner))
runner_manifest = os.path.join(path, MANIFEST_FILE_NAME)
meta_loader = MetaLoader()
runner_types = meta_loader.load(runner_manifest)
for runner_type in runner_types:
runner_count += register_runner(runner_type, experimental)
LOG.debug('End : register runners')
return runner_count
开发者ID:punalpatel,项目名称:st2,代码行数:26,代码来源:runnersregistrar.py
示例3: RuleTester
class RuleTester(object):
def __init__(self, rule_file_path, trigger_instance_file_path):
"""
:param rule_file_path: Path to the file containing rule definition.
:type rule_file_path: ``str``
:param trigger_instance_file_path: Path to the file containg trigger instance definition.
:type trigger_instance_file_path: ``str``
"""
self._rule_file_path = rule_file_path
self._trigger_instance_file_path = trigger_instance_file_path
self._meta_loader = MetaLoader()
def evaluate(self):
"""
Evaluate trigger instance against the rule.
:return: ``True`` if the rule matches, ``False`` otherwise.
:rtype: ``boolean``
"""
rule_db = self._get_rule_db_from_file(file_path=self._rule_file_path)
trigger_instance_db = \
self._get_trigger_instance_db_from_file(file_path=self._trigger_instance_file_path)
trigger_ref = ResourceReference.from_string_reference(trigger_instance_db['trigger'])
trigger_db = TriggerDB()
trigger_db.pack = trigger_ref.pack
trigger_db.name = trigger_ref.name
trigger_db.type = trigger_ref.ref
matcher = RulesMatcher(trigger_instance=trigger_instance_db, trigger=trigger_db,
rules=[rule_db])
matching_rules = matcher.get_matching_rules()
return len(matching_rules) >= 1
def _get_rule_db_from_file(self, file_path):
data = self._meta_loader.load(file_path=file_path)
rule_db = RuleDB()
rule_db.trigger = data['trigger']['type']
rule_db.criteria = data.get('criteria', None)
rule_db.action = {}
rule_db.enabled = True
return rule_db
def _get_trigger_instance_db_from_file(self, file_path):
data = self._meta_loader.load(file_path=file_path)
instance = TriggerInstanceDB(**data)
return instance
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:49,代码来源:tester.py
示例4: __init__
def __init__(self, runner_id):
super(ActionChainRunner, self).__init__(runner_id=runner_id)
self.chain_holder = None
self._meta_loader = MetaLoader()
self._skip_notify_tasks = []
self._display_published = True
self._chain_notify = None
开发者ID:nzlosh,项目名称:st2,代码行数:7,代码来源:action_chain_runner.py
示例5: __init__
def __init__(self, runner_id):
super(ActionChainRunner, self).__init__(runner_id=runner_id)
self.chain_holder = None
self._meta_loader = MetaLoader()
self._stopped = False
self._skip_notify_tasks = []
self._chain_notify = None
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:7,代码来源:actionchainrunner.py
示例6: __init__
def __init__(self):
base_path = cfg.CONF.system.base_path
rbac_definitions_path = os.path.join(base_path, 'rbac/')
self._role_definitions_path = os.path.join(rbac_definitions_path, 'roles/')
self._role_assignments_path = os.path.join(rbac_definitions_path, 'assignments/')
self._meta_loader = MetaLoader()
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:7,代码来源:loader.py
示例7: __init__
def __init__(self, use_pack_cache=True):
"""
:param use_pack_cache: True to cache which packs have been registered in memory and making
sure packs are only registered once.
:type use_pack_cache: ``bool``
"""
self._use_pack_cache = use_pack_cache
self._meta_loader = MetaLoader()
self._pack_loader = ContentPackLoader()
开发者ID:hejin,项目名称:st2,代码行数:9,代码来源:base.py
示例8: __init__
def __init__(self, rule_file_path, trigger_instance_file_path):
"""
:param rule_file_path: Path to the file containing rule definition.
:type rule_file_path: ``str``
:param trigger_instance_file_path: Path to the file containg trigger instance definition.
:type trigger_instance_file_path: ``str``
"""
self._rule_file_path = rule_file_path
self._trigger_instance_file_path = trigger_instance_file_path
self._meta_loader = MetaLoader()
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:11,代码来源:tester.py
示例9: __init__
def __init__(self, use_pack_cache=True, fail_on_failure=False):
"""
:param use_pack_cache: True to cache which packs have been registered in memory and making
sure packs are only registered once.
:type use_pack_cache: ``bool``
:param fail_on_failure: Throw an exception if resource registration fails.
:type fail_on_failure: ``bool``
"""
self._use_pack_cache = use_pack_cache
self._fail_on_failure = fail_on_failure
self._meta_loader = MetaLoader()
self._pack_loader = ContentPackLoader()
开发者ID:Bala96,项目名称:st2,代码行数:14,代码来源:base.py
示例10: RulesRegistrar
class RulesRegistrar(object):
def __init__(self):
self._meta_loader = MetaLoader()
def _get_json_rules_from_pack(self, rules_dir):
return glob.glob(rules_dir + '/*.json')
def _get_yaml_rules_from_pack(self, rules_dir):
rules = glob.glob(rules_dir + '/*.yaml')
rules.extend(glob.glob(rules_dir + '*.yml'))
return rules
def _get_rules_from_pack(self, rules_dir):
rules = self._get_json_rules_from_pack(rules_dir) or []
rules.extend(self._get_yaml_rules_from_pack(rules_dir) or [])
return rules
def _register_rules_from_pack(self, pack, rules):
for rule in rules:
LOG.debug('Loading rule from %s.', rule)
try:
content = self._meta_loader.load(rule)
rule_api = RuleAPI(**content)
rule_db = RuleAPI.to_model(rule_api)
try:
rule_db.id = Rule.get_by_name(rule_api.name).id
except ValueError:
LOG.info('Rule %s not found. Creating new one.', rule)
try:
rule_db = Rule.add_or_update(rule_db)
LOG.audit('Rule updated. Rule %s from %s.', rule_db, rule)
except Exception:
LOG.exception('Failed to create rule %s.', rule_api.name)
except:
LOG.exception('Failed registering rule from %s.', rule)
def register_rules_from_packs(self, base_dir):
pack_loader = ContentPackLoader()
dirs = pack_loader.get_content(base_dir=base_dir,
content_type='rules')
for pack, rules_dir in six.iteritems(dirs):
try:
LOG.info('Registering rules from pack: %s', pack)
rules = self._get_rules_from_pack(rules_dir)
self._register_rules_from_pack(pack, rules)
except:
LOG.exception('Failed registering all rules from pack: %s', rules_dir)
开发者ID:bjoernbessert,项目名称:st2,代码行数:47,代码来源:rulesregistrar.py
示例11: __init__
def __init__(self, use_pack_cache=True, use_runners_cache=False, fail_on_failure=False):
"""
:param use_pack_cache: True to cache which packs have been registered in memory and making
sure packs are only registered once.
:type use_pack_cache: ``bool``
:param use_runners_cache: True to cache RunnerTypeDB objects in memory to reduce load on
the database.
:type use_runners_cache: ``bool``
:param fail_on_failure: Throw an exception if resource registration fails.
:type fail_on_failure: ``bool``
"""
self._use_pack_cache = use_pack_cache
self._use_runners_cache = use_runners_cache
self._fail_on_failure = fail_on_failure
self._meta_loader = MetaLoader()
self._pack_loader = ContentPackLoader()
# Maps runner name -> RunnerTypeDB
self._runner_type_db_cache = {}
开发者ID:nzlosh,项目名称:st2,代码行数:22,代码来源:base.py
示例12: ResourceRegistrar
class ResourceRegistrar(object):
ALLOWED_EXTENSIONS = []
def __init__(self):
self._meta_loader = MetaLoader()
self._pack_loader = ContentPackLoader()
def get_resources_from_pack(self, resources_dir):
resources = []
for ext in self.ALLOWED_EXTENSIONS:
resources_glob = resources_dir
if resources_dir.endswith('/'):
resources_glob = resources_dir + ext
else:
resources_glob = resources_dir + '/*' + ext
resource_files = glob.glob(resources_glob)
resources.extend(resource_files)
resources = sorted(resources)
return resources
def register_packs(self, base_dirs):
"""
Register packs in all the provided directories.
"""
packs = self._pack_loader.get_packs(base_dirs=base_dirs)
registered_count = 0
for pack_name, pack_path in six.iteritems(packs):
self.register_pack(pack_name=pack_name, pack_dir=pack_path)
registered_count += 1
return registered_count
def register_pack(self, pack_name, pack_dir):
"""
Register pack in the provided directory.
"""
if pack_name in REGISTERED_PACKS_CACHE:
# This pack has already been registered during this register content run
return
LOG.debug('Registering pack: %s' % (pack_name))
REGISTERED_PACKS_CACHE[pack_name] = True
try:
pack_db = self._register_pack(pack_name=pack_name, pack_dir=pack_dir)
except Exception:
LOG.exception('Failed to register pack "%s"' % (pack_name))
return None
return pack_db
def _register_pack(self, pack_name, pack_dir):
"""
Register a pack (create a DB object in the system).
Note: Pack registration now happens when registering the content and not when installing
a pack using packs.install. Eventually this will be moved to the pack management API.
"""
manifest_path = os.path.join(pack_dir, MANIFEST_FILE_NAME)
if not os.path.isfile(manifest_path):
raise ValueError('Pack "%s" is missing %s file' % (pack_name, MANIFEST_FILE_NAME))
content = self._meta_loader.load(manifest_path)
if not content:
raise ValueError('Pack "%s" metadata file is empty' % (pack_name))
content['ref'] = pack_name
pack_api = PackAPI(**content)
pack_db = PackAPI.to_model(pack_api)
try:
pack_db.id = Pack.get_by_ref(pack_name).id
except ValueError:
LOG.debug('Pack %s not found. Creating new one.', pack_name)
pack_db = Pack.add_or_update(pack_db)
LOG.debug('Pack %s registered.' % (pack_name))
return pack_db
开发者ID:ruslantum,项目名称:st2,代码行数:83,代码来源:base.py
示例13: __init__
def __init__(self):
self.meta_loader = MetaLoader()
开发者ID:ruslantum,项目名称:st2,代码行数:2,代码来源:fixturesloader.py
示例14: __init__
def __init__(self, runner_id):
super(ActionChainRunner, self).__init__(runner_id=runner_id)
self.chain_holder = None
self._meta_loader = MetaLoader()
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:4,代码来源:actionchainrunner.py
示例15: ActionsRegistrar
class ActionsRegistrar(object):
def __init__(self):
self._meta_loader = MetaLoader()
def _get_actions_from_pack(self, actions_dir):
actions = []
for ext in ALLOWED_EXTS:
actions_ext = glob.glob(actions_dir + '/*' + ext)
# Exclude global actions configuration file
config_file = 'actions/config' + ext
actions_ext = [file_path for file_path in actions_ext if
config_file not in file_path] or []
actions.extend(actions_ext)
return actions
def _register_action(self, pack, action):
content = self._meta_loader.load(action)
action_ref = ResourceReference(pack=pack, name=str(content['name']))
model = action_utils.get_action_by_ref(action_ref)
if not model:
model = ActionDB()
model.name = content['name']
model.description = content['description']
model.enabled = content['enabled']
model.pack = pack
model.entry_point = content['entry_point']
model.parameters = content.get('parameters', {})
runner_type = str(content['runner_type'])
valid_runner_type, runner_type_db = self._has_valid_runner_type(runner_type)
if valid_runner_type:
model.runner_type = {'name': runner_type_db.name}
else:
LOG.exception('Runner type %s doesn\'t exist.', runner_type)
raise
try:
model = Action.add_or_update(model)
LOG.audit('Action created. Action %s from %s.', model, action)
except Exception:
LOG.exception('Failed to write action to db %s.', model.name)
raise
def _has_valid_runner_type(self, runner_type):
try:
return True, action_utils.get_runnertype_by_name(runner_type)
except:
return False, None
def _register_actions_from_pack(self, pack, actions):
for action in actions:
try:
LOG.debug('Loading action from %s.', action)
self._register_action(pack, action)
except Exception:
LOG.exception('Unable to register action: %s', action)
continue
# XXX: Requirements for actions is tricky because actions can execute remotely.
# Currently, this method is unused.
def _is_requirements_ok(self, actions_dir):
rqmnts_file = os.path.join(actions_dir, 'requirements.txt')
if not os.path.exists(rqmnts_file):
return True
missing = RequirementsValidator.validate(rqmnts_file)
if missing:
LOG.warning('Actions in %s missing dependencies: %s', actions_dir, ','.join(missing))
return False
return True
def register_actions_from_packs(self, base_dir):
pack_loader = ContentPackLoader()
dirs = pack_loader.get_content(base_dir=base_dir,
content_type='actions')
for pack, actions_dir in six.iteritems(dirs):
try:
LOG.debug('Registering actions from pack %s:, dir: %s', pack, actions_dir)
actions = self._get_actions_from_pack(actions_dir)
self._register_actions_from_pack(pack, actions)
except:
LOG.exception('Failed registering all actions from pack: %s', actions_dir)
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:82,代码来源:actionsregistrar.py
示例16: FixturesLoader
class FixturesLoader(object):
def __init__(self):
self.meta_loader = MetaLoader()
def save_fixtures_to_db(self, fixtures_pack='generic', fixtures_dict=None):
"""
Loads fixtures specified in fixtures_dict into the database
and returns DB models for the fixtures.
fixtures_dict should be of the form:
{
'actions': ['action-1.yaml', 'action-2.yaml'],
'rules': ['rule-1.yaml'],
'liveactions': ['execution-1.yaml']
}
:param fixtures_pack: Name of the pack to load fixtures from.
:type fixtures_pack: ``str``
:param fixtures_dict: Dictionary specifying the fixtures to load for each type.
:type fixtures_dict: ``dict``
:rtype: ``dict``
"""
if fixtures_dict is None:
fixtures_dict = {}
fixtures_pack_path = self._validate_fixtures_pack(fixtures_pack)
self._validate_fixture_dict(fixtures_dict, allowed=ALLOWED_DB_FIXTURES)
db_models = {}
for fixture_type, fixtures in six.iteritems(fixtures_dict):
API_MODEL = FIXTURE_API_MODEL.get(fixture_type, None)
PERSISTENCE_MODEL = FIXTURE_PERSISTENCE_MODEL.get(fixture_type, None)
loaded_fixtures = {}
for fixture in fixtures:
fixture_dict = self.meta_loader.load(
self._get_fixture_file_path_abs(fixtures_pack_path, fixture_type, fixture))
api_model = API_MODEL(**fixture_dict)
db_model = API_MODEL.to_model(api_model)
db_model = PERSISTENCE_MODEL.add_or_update(db_model)
loaded_fixtures[fixture] = db_model
db_models[fixture_type] = loaded_fixtures
return db_models
def load_fixtures(self, fixtures_pack='generic', fixtures_dict=None):
"""
Loads fixtures specified in fixtures_dict. We
simply want to load the meta into dict objects.
fixtures_dict should be of the form:
{
'actionchains': ['actionchain1.yaml', 'actionchain2.yaml'],
'workflows': ['workflow.yaml']
}
:param fixtures_pack: Name of the pack to load fixtures from.
:type fixtures_pack: ``str``
:param fixtures_dict: Dictionary specifying the fixtures to load for each type.
:type fixtures_dict: ``dict``
:rtype: ``dict``
"""
if not fixtures_dict:
return {}
fixtures_pack_path = self._validate_fixtures_pack(fixtures_pack)
self._validate_fixture_dict(fixtures_dict)
all_fixtures = {}
for fixture_type, fixtures in six.iteritems(fixtures_dict):
loaded_fixtures = {}
for fixture in fixtures:
fixture_dict = self.meta_loader.load(
self._get_fixture_file_path_abs(fixtures_pack_path, fixture_type, fixture))
loaded_fixtures[fixture] = fixture_dict
all_fixtures[fixture_type] = loaded_fixtures
return all_fixtures
def load_models(self, fixtures_pack='generic', fixtures_dict=None):
"""
Loads fixtures specified in fixtures_dict as db models. This method must be
used for fixtures that have associated DB models. We simply want to load the
meta as DB models but don't want to save them to db.
fixtures_dict should be of the form:
{
'actions': ['action-1.yaml', 'action-2.yaml'],
'rules': ['rule-1.yaml'],
'liveactions': ['execution-1.yaml']
}
:param fixtures_pack: Name of the pack to load fixtures from.
:type fixtures_pack: ``str``
:param fixtures_dict: Dictionary specifying the fixtures to load for each type.
#.........这里部分代码省略.........
开发者ID:ruslantum,项目名称:st2,代码行数:101,代码来源:fixturesloader.py
示例17: main
def main(metadata_path, output_path, print_source=False):
metadata_path = os.path.abspath(metadata_path)
metadata_dir = os.path.dirname(metadata_path)
meta_loader = MetaLoader()
data = meta_loader.load(metadata_path)
action_name = data['name']
entry_point = data['entry_point']
workflow_metadata_path = os.path.join(metadata_dir, entry_point)
chainspec = meta_loader.load(workflow_metadata_path)
chain_holder = ChainHolder(chainspec, 'workflow')
graph_label = '%s action-chain workflow visualization' % (action_name)
graph_attr = {
'rankdir': 'TD',
'labelloc': 't',
'fontsize': '15',
'label': graph_label
}
node_attr = {}
dot = Digraph(comment='Action chain work-flow visualization',
node_attr=node_attr, graph_attr=graph_attr, format='png')
# dot.body.extend(['rankdir=TD', 'size="10,5"'])
# Add all nodes
node = chain_holder.get_next_node()
while node:
dot.node(node.name, node.name)
node = chain_holder.get_next_node(curr_node_name=node.name)
# Add connections
node = chain_holder.get_next_node()
processed_nodes = sets.Set([node.name])
nodes = [node]
while nodes:
previous_node = nodes.pop()
success_node = chain_holder.get_next_node(curr_node_name=previous_node.name,
condition='on-success')
failure_node = chain_holder.get_next_node(curr_node_name=previous_node.name,
condition='on-failure')
# Add success node (if any)
if success_node:
dot.edge(previous_node.name, success_node.name, constraint='true',
color='green', label='on success')
if success_node.name not in processed_nodes:
nodes.append(success_node)
processed_nodes.add(success_node.name)
# Add failure node (if any)
if failure_node:
dot.edge(previous_node.name, failure_node.name, constraint='true',
color='red', label='on failure')
if failure_node.name not in processed_nodes:
nodes.append(failure_node)
processed_nodes.add(failure_node.name)
if print_source:
print(dot.source)
if output_path:
output_path = os.path.join(output_path, action_name)
else:
output_path = output_path or os.path.join(os.getcwd(), action_name)
dot.format = 'png'
dot.render(output_path)
print('Graph saved at %s' % (output_path + '.png'))
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:73,代码来源:visualize_action_chain.py
示例18: RBACDefinitionsLoader
class RBACDefinitionsLoader(object):
"""
A class which loads role definitions and user role assignments from files on
disk.
"""
def __init__(self):
base_path = cfg.CONF.system.base_path
rbac_definitions_path = os.path.join(base_path, 'rbac/')
self._role_definitions_path = os.path.join(rbac_definitions_path, 'roles/')
self._role_assignments_path = os.path.join(rbac_definitions_path, 'assignments/')
self._meta_loader = MetaLoader()
def load(self):
"""
:return: Dict with the following keys: roles, role_assiginments
:rtype: ``dict``
"""
result = {}
result['roles'] = self.load_role_definitions()
result['role_assignments'] = self.load_user_role_assignments()
return result
def load_role_definitions(self):
"""
Load all the role definitions.
:rtype: ``dict``
"""
LOG.info('Loading role definitions from "%s"' % (self._role_definitions_path))
file_paths = self._get_role_definitions_file_paths()
result = {}
for file_path in file_paths:
LOG.debug('Loading role definition from: %s' % (file_path))
role_definition_api = self.load_role_definition_from_file(file_path=file_path)
role_name = role_definition_api.name
if role_name in result:
raise ValueError('Duplicate definition file found for role "%s"' % (role_name))
result[role_name] = role_definition_api
return result
def load_user_role_assignments(self):
"""
Load all the user role assignments.
:rtype: ``dict``
"""
LOG.info('Loading user role assignments from "%s"' % (self._role_assignments_path))
file_paths = self._get_role_assiginments_file_paths()
result = {}
for file_path in file_paths:
LOG.debug('Loading user role assignments from: %s' % (file_path))
role_assignment_api = self.load_user_role_assignments_from_file(file_path=file_path)
username = role_assignment_api.username
if username in result:
raise ValueError('Duplicate definition file found for user "%s"' % (username))
result[username] = role_assignment_api
return result
def load_role_definition_from_file(self, file_path):
"""
Load role definition from file.
:param file_path: Path to the role definition file.
:type file_path: ``str``
:return: Role definition.
:rtype: :class:`RoleDefinitionFileFormatAPI`
"""
content = self._meta_loader.load(file_path)
role_definition_api = RoleDefinitionFileFormatAPI(**content)
role_definition_api.validate()
return role_definition_api
def load_user_role_assignments_from_file(self, file_path):
"""
Load user role assignments from file.
:param file_path: Path to the user role assignment file.
:type file_path: ``str``
:return: User role assignments.
:rtype: :class:`UserRoleAssignmentFileFormatAPI`
"""
content = self._meta_loader.load(file_path)
user_role_assignment_api = UserRoleAssignmentFileFormatAPI(**content)
user_role_assignment_api.validate()
#.........这里部分代码省略.........
开发者ID:alexmakarski,项目名称:st2,代码行数:101,代码来源:loader.py
示例19: RuleTester
class RuleTester(object):
def __init__(self, rule_file_path=None, rule_ref=None, trigger_instance_file_path=None,
trigger_instance_id=None):
"""
:param rule_file_path: Path to the file containing rule definition.
:type rule_file_path: ``str``
:param trigger_instance_file_path: Path to the file containg trigger instance definition.
:type trigger_instance_file_path: ``str``
"""
self._rule_file_path = rule_file_path
self._rule_ref = rule_ref
self._trigger_instance_file_path = trigger_instance_file_path
self._trigger_instance_id = trigger_instance_id
self._meta_loader = MetaLoader()
def evaluate(self):
"""
Evaluate trigger instance against the rule.
:return: ``True`` if the rule matches, ``False`` otherwise.
:rtype: ``boolean``
"""
rule_db = self._get_rule_db()
trigger_instance_db, trigger_db = self._get_trigger_instance_db()
# The trigger check needs to be performed here as that is not performed
# by RulesMatcher.
if rule_db.trigger != trigger_db.ref:
LOG.info('rule.trigger "%s" and trigger.ref "%s" do not match.',
rule_db.trigger, trigger_db.ref)
return False
matcher = RulesMatcher(trigger_instance=trigger_instance_db, trigger=trigger_db,
rules=[rule_db], extra_info=True)
matching_rules = matcher.get_matching_rules()
return len(matching_rules) >= 1
def _get_rule_db(self):
if self._rule_file_path:
return self._get_rule_db_from_file(
file_path=os.path.realpath(self._rule_file_path))
elif self._rule_ref:
return Rule.get_by_ref(self._rule_ref)
raise ValueError('One of _rule_file_path or _rule_ref should be specified.')
def _get_trigger_instance_db(self):
if self._trigger_instance_file_path:
return self._get_trigger_instance_db_from_file(
file_path=os.path.realpath(self._trigger_instance_file_path))
elif self._trigger_instance_id:
trigger_instance_db = TriggerInstance.get_by_id(self._trigger_instance_id)
trigger_db = Trigger.get_by_ref(trigger_instance_db.trigger)
return trigger_instance_db, trigger_db
raise ValueError('One of _trigger_instance_file_path or'
'_trigger_instance_id should be specified.')
def _get_rule_db_from_file(self, file_path):
data = self._meta_loader.load(file_path=file_path)
pack = data.get('pack', 'unknown')
name = data.get('name', 'unknown')
trigger = data['trigger']['type']
criteria = data.get('criteria', None)
rule_db = RuleDB(pack=pack, name=name, trigger=trigger, criteria=criteria, action={},
enabled=True)
return rule_db
def _get_trigger_instance_db_from_file(self, file_path):
data = self._meta_loader.load(file_path=file_path)
instance = TriggerInstanceDB(**data)
trigger_ref = ResourceReference.from_string_reference(instance['trigger'])
trigger_db = TriggerDB(pack=trigger_ref.pack, name=trigger_ref.name, type=trigger_ref.ref)
return instance, trigger_db
开发者ID:automotola,项目名称:st2,代码行数:77,代码来源:tester.py
示例20: SensorsRegistrar
class SensorsRegistrar(object):
def __init__(self):
self._meta_loader = MetaLoader()
def _get_sensors_from_pack(self, sensors_dir):
sensors = glob.glob(sensors_dir + '/*.yaml')
sensors.extend(glob.glob(sensors_dir + '*.yml'))
return sensors
def _register_sensors_from_pack(self, pack, sensors):
for sensor in sensors:
try:
self._register_sensor_from_pack(pack=pack, sensor=sensor)
except Exception as e:
LOG.debug('Failed to register sensor "%s": %s', sensor, str(e))
else:
LOG.debug('Sensor "%s" successfully registered', sensor)
def _register_sensor_from_pack(self, pack, sensor):
sensor_metadata_file_path = sensor
LOG.debug('Loading sensor from %s.', sensor_metadata_file_path)
metadata = self._meta_loader.load(file_path=sensor_metadata_file_path)
class_name = metadata.get('class_name', None)
entry_point = metadata.get('entry_point', None)
description = metadata.get('description', None)
trigger_types = metadata.get('trigger_types', [])
poll_interval = metadata.get('poll_interval', None)
# Add TrigerType models to the DB
trigger_type_dbs = container_utils.add_trigger_models(pack=pack,
trigger_types=trigger_types)
# Populate a list of references belonging to this sensor
trigger_type_refs = []
for trigger_type_db, _ in trigger_type_dbs:
ref_obj = trigger_type_db.get_reference()
trigger_type_ref = ref_obj.ref
trigger_type_refs.append(trigger_type_ref)
if entry_point and class_name:
sensors_dir = os.path.dirname(sensor_metadata_file_path)
sensor_file_path = os.path.join(sensors_dir, entry_point)
# Add Sensor model to the DB
sensor_obj = {
'name': class_name,
'description': description,
'class_name': class_name,
'file_path': sensor_file_path,
'trigger_types': trigger_type_refs,
'poll_interval': poll_interval
}
container_utils.add_sensor_model(pack=pack, sensor=sensor_obj)
def register_sensors_from_packs(self, base_dir):
pack_loader = ContentPackLoader()
dirs = pack_loader.get_content(base_dir=base_dir, content_type='sensors')
# Add system sensors to the core pack
dirs['core'] = {}
dirs['core'] = SYSTEM_SENSORS_PATH
for pack, sensors_dir in six.iteritems(dirs):
try:
LOG.info('Registering sensors from pack: %s', pack)
sensors = self._get_sensors_from_pack(sensors_dir)
self._register_sensors_from_pack(pack=pack, sensors=sensors)
except Exception as e:
LOG.exception('Failed registering all sensors from pack "%s": %s', sensors_dir,
str(e))
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:71,代码来源:sensorsregistrar.py
注:本文中的st2common.content.loader.MetaLoader类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论