本文整理汇总了Python中st2common.models.api.notification.NotificationsHelper类的典型用法代码示例。如果您正苦于以下问题:Python NotificationsHelper类的具体用法?Python NotificationsHelper怎么用?Python NotificationsHelper使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了NotificationsHelper类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_model_transformations
def test_model_transformations(self):
notify = {}
notify['on_complete'] = {
'message': 'Action completed.',
'data': {
'foo': '{{foo}}',
'bar': 1,
'baz': [1, 2, 3]
}
}
notify['on_success'] = {
'message': 'Action succeeded.',
'data': {
'foo': '{{foo}}',
'bar': 1,
}
}
notify_model = NotificationsHelper.to_model(notify)
self.assertEqual(notify['on_complete']['message'], notify_model.on_complete.message)
self.assertDictEqual(notify['on_complete']['data'], notify_model.on_complete.data)
self.assertEqual(notify['on_success']['message'], notify_model.on_success.message)
self.assertDictEqual(notify['on_success']['data'], notify_model.on_success.data)
notify_api = NotificationsHelper.from_model(notify_model)
self.assertEqual(notify['on_complete']['message'], notify_api['on_complete']['message'])
self.assertDictEqual(notify['on_complete']['data'], notify_api['on_complete']['data'])
self.assertEqual(notify['on_success']['message'], notify_api['on_success']['message'])
self.assertDictEqual(notify['on_success']['data'], notify_api['on_success']['data'])
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:28,代码来源:test_notification_helper.py
示例2: to_model
def to_model(cls, action):
name = getattr(action, 'name', None)
description = getattr(action, 'description', None)
enabled = bool(getattr(action, 'enabled', True))
entry_point = str(action.entry_point)
pack = str(action.pack)
runner_type = {'name': str(action.runner_type)}
parameters = getattr(action, 'parameters', dict())
tags = TagsHelper.to_model(getattr(action, 'tags', []))
ref = ResourceReference.to_string_reference(pack=pack, name=name)
if getattr(action, 'notify', None):
notify = NotificationsHelper.to_model(action.notify)
else:
# We use embedded document model for ``notify`` in action model. If notify is
# set notify to None, Mongoengine interprets ``None`` as unmodified
# field therefore doesn't delete the embedded document. Therefore, we need
# to use an empty document.
notify = NotificationsHelper.to_model({})
model = cls.model(name=name, description=description, enabled=enabled,
entry_point=entry_point, pack=pack, runner_type=runner_type,
tags=tags, parameters=parameters, notify=notify,
ref=ref)
return model
开发者ID:Bala96,项目名称:st2,代码行数:26,代码来源:action.py
示例3: to_model
def to_model(cls, action):
name = getattr(action, "name", None)
description = getattr(action, "description", None)
enabled = bool(getattr(action, "enabled", True))
entry_point = str(action.entry_point)
pack = str(action.pack)
runner_type = {"name": str(action.runner_type)}
parameters = getattr(action, "parameters", dict())
tags = TagsHelper.to_model(getattr(action, "tags", []))
ref = ResourceReference.to_string_reference(pack=pack, name=name)
if getattr(action, "notify", None):
notify = NotificationsHelper.to_model(action.notify)
else:
notify = None
model = cls.model(
name=name,
description=description,
enable=enabled,
enabled=enabled,
entry_point=entry_point,
pack=pack,
runner_type=runner_type,
tags=tags,
parameters=parameters,
notify=notify,
ref=ref,
)
return model
开发者ID:jspittman,项目名称:st2,代码行数:31,代码来源:action.py
示例4: test_launch_workflow_with_notifications
def test_launch_workflow_with_notifications(self):
notify_data = {'on_complete': {'channels': ['slack'],
'message': '"@channel: Action succeeded."', 'data': {}}}
MistralRunner.entry_point = mock.PropertyMock(return_value=WF1_YAML_FILE_PATH)
liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS, notify=notify_data)
liveaction, execution = action_service.request(liveaction)
liveaction = LiveAction.get_by_id(str(liveaction.id))
self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
mistral_context = liveaction.context.get('mistral', None)
self.assertIsNotNone(mistral_context)
self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id'))
self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name'))
workflow_input = copy.deepcopy(ACTION_PARAMS)
workflow_input.update({'count': '3'})
env = {
'st2_execution_id': str(execution.id),
'st2_liveaction_id': str(liveaction.id),
'__actions': {
'st2.action': {
'st2_context': {
'endpoint': 'http://0.0.0.0:9101/v1/actionexecutions',
'parent': str(liveaction.id),
'notify': NotificationsHelper.from_model(liveaction.notify),
'skip_notify_tasks': []
}
}
}
}
executions.ExecutionManager.create.assert_called_with(
WF1_NAME, workflow_input=workflow_input, env=env)
开发者ID:emptywee,项目名称:st2,代码行数:35,代码来源:test_mistral_v2.py
示例5: to_model
def to_model(cls, live_action):
name = getattr(live_action, 'name', None)
description = getattr(live_action, 'description', None)
action = live_action.action
if getattr(live_action, 'start_timestamp', None):
start_timestamp = isotime.parse(live_action.start_timestamp)
else:
start_timestamp = None
if getattr(live_action, 'end_timestamp', None):
end_timestamp = isotime.parse(live_action.end_timestamp)
else:
end_timestamp = None
status = getattr(live_action, 'status', None)
parameters = getattr(live_action, 'parameters', dict())
context = getattr(live_action, 'context', dict())
callback = getattr(live_action, 'callback', dict())
result = getattr(live_action, 'result', None)
if getattr(live_action, 'notify', None):
notify = NotificationsHelper.to_model(live_action.notify)
else:
notify = None
model = cls.model(name=name, description=description, action=action,
start_timestamp=start_timestamp, end_timestamp=end_timestamp,
status=status, parameters=parameters, context=context,
callback=callback, result=result, notify=notify)
return model
开发者ID:hejin,项目名称:st2,代码行数:32,代码来源:action.py
示例6: test_model_transformations
def test_model_transformations(self):
notify = {}
notify_model = NotificationsHelper.to_model(notify)
self.assertEqual(notify_model.on_success, None)
self.assertEqual(notify_model.on_failure, None)
self.assertEqual(notify_model.on_complete, None)
notify_api = NotificationsHelper.from_model(notify_model)
self.assertEqual(notify_api, {})
notify['on-complete'] = {
'message': 'Action completed.',
'routes': [
'66'
],
'data': {
'foo': '{{foo}}',
'bar': 1,
'baz': [1, 2, 3]
}
}
notify['on-success'] = {
'message': 'Action succeeded.',
'routes': [
'100'
],
'data': {
'foo': '{{foo}}',
'bar': 1,
}
}
notify_model = NotificationsHelper.to_model(notify)
self.assertEqual(notify['on-complete']['message'], notify_model.on_complete.message)
self.assertDictEqual(notify['on-complete']['data'], notify_model.on_complete.data)
self.assertListEqual(notify['on-complete']['routes'], notify_model.on_complete.routes)
self.assertEqual(notify['on-success']['message'], notify_model.on_success.message)
self.assertDictEqual(notify['on-success']['data'], notify_model.on_success.data)
self.assertListEqual(notify['on-success']['routes'], notify_model.on_success.routes)
notify_api = NotificationsHelper.from_model(notify_model)
self.assertEqual(notify['on-complete']['message'], notify_api['on-complete']['message'])
self.assertDictEqual(notify['on-complete']['data'], notify_api['on-complete']['data'])
self.assertListEqual(notify['on-complete']['routes'], notify_api['on-complete']['routes'])
self.assertEqual(notify['on-success']['message'], notify_api['on-success']['message'])
self.assertDictEqual(notify['on-success']['data'], notify_api['on-success']['data'])
self.assertListEqual(notify['on-success']['routes'], notify_api['on-success']['routes'])
开发者ID:Bala96,项目名称:st2,代码行数:46,代码来源:test_notification_helper.py
示例7: _get_notify
def _get_notify(self, action_node):
if action_node.name not in self._skip_notify_tasks:
if action_node.notify:
task_notify = NotificationsHelper.to_model(action_node.notify)
return task_notify
elif self._chain_notify:
return self._chain_notify
return None
开发者ID:LindsayHill,项目名称:st2,代码行数:9,代码来源:action_chain_runner.py
示例8: from_model
def from_model(cls, model, mask_secrets=False):
action = cls._from_model(model)
action["runner_type"] = action["runner_type"]["name"]
action["tags"] = TagsHelper.from_model(model.tags)
if getattr(model, "notify", None):
action["notify"] = NotificationsHelper.from_model(model.notify)
return cls(**action)
开发者ID:jspittman,项目名称:st2,代码行数:9,代码来源:action.py
示例9: to_model
def to_model(cls, action):
model = super(cls, cls).to_model(action)
model.enabled = bool(action.enabled)
model.entry_point = str(action.entry_point)
model.pack = str(action.pack)
model.runner_type = {'name': str(action.runner_type)}
model.parameters = getattr(action, 'parameters', dict())
model.tags = TagsHelper.to_model(getattr(action, 'tags', []))
model.ref = ResourceReference.to_string_reference(pack=model.pack, name=model.name)
if getattr(action, 'notify', None):
model.notify = NotificationsHelper.to_model(action.notify)
return model
开发者ID:rgaertner,项目名称:st2,代码行数:13,代码来源:action.py
示例10: test_model_transformations
def test_model_transformations(self):
notify = {}
notify_model = NotificationsHelper.to_model(notify)
self.assertEqual(notify_model.on_success, None)
self.assertEqual(notify_model.on_failure, None)
self.assertEqual(notify_model.on_complete, None)
notify_api = NotificationsHelper.from_model(notify_model)
self.assertEqual(notify_api, {})
notify["on-complete"] = {"message": "Action completed.", "data": {"foo": "{{foo}}", "bar": 1, "baz": [1, 2, 3]}}
notify["on-success"] = {"message": "Action succeeded.", "data": {"foo": "{{foo}}", "bar": 1}}
notify_model = NotificationsHelper.to_model(notify)
self.assertEqual(notify["on-complete"]["message"], notify_model.on_complete.message)
self.assertDictEqual(notify["on-complete"]["data"], notify_model.on_complete.data)
self.assertEqual(notify["on-success"]["message"], notify_model.on_success.message)
self.assertDictEqual(notify["on-success"]["data"], notify_model.on_success.data)
notify_api = NotificationsHelper.from_model(notify_model)
self.assertEqual(notify["on-complete"]["message"], notify_api["on-complete"]["message"])
self.assertDictEqual(notify["on-complete"]["data"], notify_api["on-complete"]["data"])
self.assertEqual(notify["on-success"]["message"], notify_api["on-success"]["message"])
self.assertDictEqual(notify["on-success"]["data"], notify_api["on-success"]["data"])
开发者ID:suqi,项目名称:st2,代码行数:23,代码来源:test_notification_helper.py
示例11: _construct_workflow_execution_options
def _construct_workflow_execution_options(self):
# This URL is used by Mistral to talk back to the API
api_url = get_mistral_api_url()
endpoint = api_url + '/actionexecutions'
# This URL is available in the context and can be used by the users inside a workflow,
# similar to "ST2_ACTION_API_URL" environment variable available to actions
public_api_url = get_full_public_api_url()
# Build context with additional information
parent_context = {
'execution_id': self.execution_id
}
if getattr(self.liveaction, 'context', None):
parent_context.update(self.liveaction.context)
st2_execution_context = {
'api_url': api_url,
'endpoint': endpoint,
'parent': parent_context,
'notify': {},
'skip_notify_tasks': self._skip_notify_tasks
}
# Include notification information
if self._notify:
notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
st2_execution_context['notify'] = notify_dict
if self.auth_token:
st2_execution_context['auth_token'] = self.auth_token.token
options = {
'env': {
'st2_execution_id': self.execution_id,
'st2_liveaction_id': self.liveaction_id,
'st2_action_api_url': public_api_url,
'__actions': {
'st2.action': {
'st2_context': st2_execution_context
}
}
}
}
return options
开发者ID:Plexxi,项目名称:st2,代码行数:46,代码来源:mistral_v2.py
示例12: _run_action
def _run_action(action_node, parent_execution_id, params, wait_for_completion=True):
execution = LiveActionDB(action=action_node.ref)
execution.parameters = action_param_utils.cast_params(action_ref=action_node.ref,
params=params)
if action_node.notify:
execution.notify = NotificationsHelper.to_model(action_node.notify)
execution.context = {
'parent': str(parent_execution_id),
'chain': vars(action_node)
}
liveaction, _ = action_service.schedule(execution)
while (wait_for_completion and
liveaction.status != LIVEACTION_STATUS_SUCCEEDED and
liveaction.status != LIVEACTION_STATUS_FAILED):
eventlet.sleep(1)
liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
return liveaction
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:18,代码来源:actionchainrunner.py
示例13: _construct_workflow_execution_options
def _construct_workflow_execution_options(self):
# This URL is used by Mistral to talk back to the API
api_url = get_mistral_api_url()
endpoint = api_url + "/actionexecutions"
# This URL is available in the context and can be used by the users inside a workflow,
# similar to "ST2_ACTION_API_URL" environment variable available to actions
public_api_url = get_full_public_api_url()
# Build context with additional information
parent_context = {"execution_id": self.execution_id}
if getattr(self.liveaction, "context", None):
parent_context.update(self.liveaction.context)
st2_execution_context = {
"endpoint": endpoint,
"parent": parent_context,
"notify": {},
"skip_notify_tasks": self._skip_notify_tasks,
}
# Include notification information
if self._notify:
notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
st2_execution_context["notify"] = notify_dict
if self.auth_token:
st2_execution_context["auth_token"] = self.auth_token.token
options = {
"env": {
"st2_execution_id": self.execution_id,
"st2_liveaction_id": self.liveaction_id,
"st2_action_api_url": public_api_url,
"__actions": {"st2.action": {"st2_context": st2_execution_context}},
}
}
return options
开发者ID:enykeev,项目名称:st2,代码行数:39,代码来源:v2.py
示例14: test_launch_workflow_with_notifications
def test_launch_workflow_with_notifications(self):
notify_data = {'on_complete': {'routes': ['slack'],
'message': '"@channel: Action succeeded."', 'data': {}}}
liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS, notify=notify_data)
liveaction, execution = action_service.request(liveaction)
liveaction = self._wait_on_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
mistral_context = liveaction.context.get('mistral', None)
self.assertIsNotNone(mistral_context)
self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id'))
self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name'))
workflow_input = copy.deepcopy(ACTION_PARAMS)
workflow_input.update({'count': '3'})
env = {
'st2_execution_id': str(execution.id),
'st2_liveaction_id': str(liveaction.id),
'st2_action_api_url': 'http://0.0.0.0:9101/v1',
'__actions': {
'st2.action': {
'st2_context': {
'api_url': 'http://0.0.0.0:9101/v1',
'endpoint': 'http://0.0.0.0:9101/v1/actionexecutions',
'parent': {
'pack': 'mistral_tests',
'execution_id': str(execution.id)
},
'notify': NotificationsHelper.from_model(liveaction.notify),
'skip_notify_tasks': []
}
}
}
}
executions.ExecutionManager.create.assert_called_with(
WF1_NAME, workflow_input=workflow_input, env=env, notify=NOTIFY)
开发者ID:nzlosh,项目名称:st2,代码行数:38,代码来源:test_mistral_v2.py
示例15: try_run
def try_run(self, action_parameters):
# Test connection
self._client.workflows.list()
# Setup inputs for the workflow execution.
inputs = self.runner_parameters.get("context", dict())
inputs.update(action_parameters)
endpoint = "http://%s:%s/v1/actionexecutions" % (cfg.CONF.api.host, cfg.CONF.api.port)
# Build context with additional information
parent_context = {"execution_id": self.execution_id}
if getattr(self.liveaction, "context", None):
parent_context.update(self.liveaction.context)
st2_execution_context = {
"endpoint": endpoint,
"parent": parent_context,
"notify": {},
"skip_notify_tasks": self._skip_notify_tasks,
}
# Include notification information
if self._notify:
notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
st2_execution_context["notify"] = notify_dict
if self.auth_token:
st2_execution_context["auth_token"] = self.auth_token.token
options = {
"env": {
"st2_execution_id": self.execution_id,
"st2_liveaction_id": self.liveaction_id,
"__actions": {"st2.action": {"st2_context": st2_execution_context}},
}
}
# Get workbook/workflow definition from file.
with open(self.entry_point, "r") as def_file:
def_yaml = def_file.read()
def_dict = yaml.safe_load(def_yaml)
is_workbook = "workflows" in def_dict
if not is_workbook:
# Non-workbook definition containing multiple workflows is not supported.
if len([k for k, _ in six.iteritems(def_dict) if k != "version"]) != 1:
raise Exception(
"Workflow (not workbook) definition is detected. " "Multiple workflows is not supported."
)
action_ref = "%s.%s" % (self.action.pack, self.action.name)
self._check_name(action_ref, is_workbook, def_dict)
def_dict_xformed = utils.transform_definition(def_dict)
def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
# Save workbook/workflow definition.
if is_workbook:
self._save_workbook(action_ref, def_yaml_xformed)
default_workflow = self._find_default_workflow(def_dict_xformed)
execution = self._client.executions.create(default_workflow, workflow_input=inputs, **options)
else:
self._save_workflow(action_ref, def_yaml_xformed)
execution = self._client.executions.create(action_ref, workflow_input=inputs, **options)
status = LIVEACTION_STATUS_RUNNING
partial_results = {"tasks": []}
# pylint: disable=no-member
current_context = {"execution_id": str(execution.id), "workflow_name": execution.workflow_name}
exec_context = self.context
exec_context = self._build_mistral_context(exec_context, current_context)
LOG.info("Mistral query context is %s" % exec_context)
return (status, partial_results, exec_context)
开发者ID:jonico,项目名称:st2,代码行数:77,代码来源:v2.py
示例16: _construct_workflow_execution_options
def _construct_workflow_execution_options(self):
# This URL is used by Mistral to talk back to the API
api_url = get_mistral_api_url()
endpoint = api_url + '/actionexecutions'
# This URL is available in the context and can be used by the users inside a workflow,
# similar to "ST2_ACTION_API_URL" environment variable available to actions
public_api_url = get_full_public_api_url()
# Build context with additional information
parent_context = {
'execution_id': self.execution_id
}
if getattr(self.liveaction, 'context', None):
parent_context.update(self.liveaction.context)
# Convert jinja expressions in the params of Action Chain under the parent context
# into raw block. If there is any jinja expressions, Mistral will try to evaulate
# the expression. If there is a local context reference, the evaluation will fail
# because the local context reference is out of scope.
chain_ctx = parent_context.get('chain') or {}
for attr in ['params', 'parameters']:
chain_params_ctx = chain_ctx.get(attr) or {}
for k, v in six.iteritems(chain_params_ctx):
parent_context['chain'][attr][k] = jinja.convert_jinja_to_raw_block(v)
st2_execution_context = {
'api_url': api_url,
'endpoint': endpoint,
'parent': parent_context,
'notify': {},
'skip_notify_tasks': self._skip_notify_tasks
}
# Include notification information
if self._notify:
notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
st2_execution_context['notify'] = notify_dict
if self.auth_token:
st2_execution_context['auth_token'] = self.auth_token.token
options = {
'env': {
'st2_execution_id': self.execution_id,
'st2_liveaction_id': self.liveaction_id,
'st2_action_api_url': public_api_url,
'__actions': {
'st2.action': {
'st2_context': st2_execution_context
}
}
}
}
if not self.is_polling_enabled():
options['notify'] = [{'type': 'st2'}]
# Only used on reverse type workflows
task_name = self.runner_parameters.get('task_name', None)
if task_name is not None:
options['task_name'] = task_name
return options
开发者ID:nzlosh,项目名称:st2,代码行数:67,代码来源:mistral_v2.py
示例17: run
def run(self, action_parameters):
# Test connection
self._client.workflows.list()
# Setup inputs for the workflow execution.
inputs = self.runner_parameters.get('context', dict())
inputs.update(action_parameters)
# This URL is used by Mistral to talk back to the API
api_url = get_mistral_api_url()
endpoint = api_url + '/actionexecutions'
# This URL is available in the context and can be used by the users inside a workflow,
# similar to "ST2_ACTION_API_URL" environment variable available to actions
public_api_url = get_full_public_api_url()
# Build context with additional information
parent_context = {
'execution_id': self.execution_id
}
if getattr(self.liveaction, 'context', None):
parent_context.update(self.liveaction.context)
st2_execution_context = {
'endpoint': endpoint,
'parent': parent_context,
'notify': {},
'skip_notify_tasks': self._skip_notify_tasks
}
# Include notification information
if self._notify:
notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
st2_execution_context['notify'] = notify_dict
if self.auth_token:
st2_execution_context['auth_token'] = self.auth_token.token
options = {
'env': {
'st2_execution_id': self.execution_id,
'st2_liveaction_id': self.liveaction_id,
'st2_action_api_url': public_api_url,
'__actions': {
'st2.action': {
'st2_context': st2_execution_context
}
}
}
}
# Get workbook/workflow definition from file.
with open(self.entry_point, 'r') as def_file:
def_yaml = def_file.read()
def_dict = yaml.safe_load(def_yaml)
is_workbook = ('workflows' in def_dict)
if not is_workbook:
# Non-workbook definition containing multiple workflows is not supported.
if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
raise Exception('Workflow (not workbook) definition is detected. '
'Multiple workflows is not supported.')
action_ref = '%s.%s' % (self.action.pack, self.action.name)
self._check_name(action_ref, is_workbook, def_dict)
def_dict_xformed = utils.transform_definition(def_dict)
def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
# Save workbook/workflow definition.
if is_workbook:
self._save_workbook(action_ref, def_yaml_xformed)
default_workflow = self._find_default_workflow(def_dict_xformed)
execution = self._client.executions.create(default_workflow,
workflow_input=inputs,
**options)
else:
self._save_workflow(action_ref, def_yaml_xformed)
execution = self._client.executions.create(action_ref,
workflow_input=inputs,
**options)
status = LIVEACTION_STATUS_RUNNING
partial_results = {'tasks': []}
# pylint: disable=no-member
current_context = {
'execution_id': str(execution.id),
'workflow_name': execution.workflow_name
}
exec_context = self.context
exec_context = self._build_mistral_context(exec_context, current_context)
LOG.info('Mistral query context is %s' % exec_context)
return (status, partial_results, exec_context)
开发者ID:azamsheriff,项目名称:st2,代码行数:96,代码来源:v2.py
示例18: try_run
def try_run(self, action_parameters):
# Test connection
self._client.workflows.list()
# Setup inputs for the workflow execution.
inputs = self.runner_parameters.get('context', dict())
inputs.update(action_parameters)
endpoint = 'http://%s:%s/v1/actionexecutions' % (cfg.CONF.api.host, cfg.CONF.api.port)
# Build context with additional information
st2_execution_context = {
'endpoint': endpoint,
'parent': self.liveaction_id,
'notify': {},
'skip_notify_tasks': self._skip_notify_tasks
}
# Include notification information
if self._notify:
notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
st2_execution_context['notify'] = notify_dict
if self.auth_token:
st2_execution_context['auth_token'] = self.auth_token.token
options = {
'env': {
'st2_execution_id': self.execution_id,
'st2_liveaction_id': self.liveaction_id,
'__actions': {
'st2.action': {
'st2_context': st2_execution_context
}
}
}
}
# Get workbook/workflow definition from file.
with open(self.entry_point, 'r') as def_file:
def_yaml = def_file.read()
def_dict = yaml.safe_load(def_yaml)
is_workbook = ('workflows' in def_dict)
if not is_workbook:
# Non-workbook definition containing multiple workflows is not supported.
if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
raise Exception('Workflow (not workbook) definition is detected. '
'Multiple workflows is not supported.')
action_ref = '%s.%s' % (self.action.pack, self.action.name)
self._check_name(action_ref, is_workbook, def_dict)
def_dict_xformed = utils.transform_definition(def_dict)
def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
# Save workbook/workflow definition.
if is_workbook:
self._save_workbook(action_ref, def_yaml_xformed)
default_workflow = self._find_default_workflow(def_dict_xformed)
execution = self._client.executions.create(default_workflow,
workflow_input=inputs,
**options)
else:
self._save_workflow(action_ref, def_yaml_xformed)
execution = self._client.executions.create(action_ref,
workflow_input=inputs,
**options)
status = LIVEACTION_STATUS_RUNNING
partial_results = {'tasks': []}
# pylint: disable=no-member
current_context = {
'execution_id': str(execution.id),
'workflow_name': execution.workflow_name
}
exec_context = self.context
exec_context = self._build_mistral_context(exec_context, current_context)
LOG.info('Mistral query context is %s' % exec_context)
return (status, partial_results, exec_context)
开发者ID:emptywee,项目名称:st2,代码行数:83,代码来源:v2.py
示例19: FixturesLoader
FIXTURES_PACK, 'actionchains', 'malformedchain.yaml')
CHAIN_TYPED_PARAMS = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain_typed_params.yaml')
CHAIN_SYSTEM_PARAMS = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain_typed_system_params.yaml')
CHAIN_WITH_ACTIONPARAM_VARS = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain_with_actionparam_vars.yaml')
CHAIN_WITH_SYSTEM_VARS = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain_with_system_vars.yaml')
CHAIN_WITH_PUBLISH = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain_with_publish.yaml')
CHAIN_WITH_INVALID_ACTION = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain_with_invalid_action.yaml')
CHAIN_NOTIFY_API = {'notify': {'on-complete': {'message': 'foo happened.'}}}
CHAIN_NOTIFY_DB = NotificationsHelper.to_model(CHAIN_NOTIFY_API)
@mock.patch.object(action_db_util, 'get_runnertype_by_name',
mock.MagicMock(return_value=RUNNER))
class TestActionChainRunner(DbTestCase):
def test_runner_creation(self):
runner = acr.get_runner()
self.assertTrue(runner)
self.assertTrue(runner.runner_id)
def test_malformed_chain(self):
try:
chain_runner = acr.get_runner()
chain_runner.entry_point = MALFORMED_CHAIN_PATH
开发者ID:amaline,项目名称:st2,代码行数:31,代码来源:test_actionchain.py
注:本文中的st2common.models.api.notification.NotificationsHelper类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论