本文整理汇总了Python中st2common.models.api.action.LiveActionAPI类的典型用法代码示例。如果您正苦于以下问题:Python LiveActionAPI类的具体用法?Python LiveActionAPI怎么用?Python LiveActionAPI使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了LiveActionAPI类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _schedule_execution
def _schedule_execution(self, execution):
    """
    Populate the execution context from the current request and submit the
    execution to the action service.

    :param execution: API object for the execution being requested. Its
                      ``context`` attribute is created if missing and is
                      updated in place.
    :return: Result of ``ActionExecutionAPI.from_model`` for the action
             execution model returned by ``action_service.request``.
    :raises ValueError: If the ``st2-context`` header value does not load
                        into a dict.
    """
    # Guarantee there is a context to write into.
    if not hasattr(execution, 'context'):
        execution.context = {}

    # When auth is disabled no token is attached to the request, so the
    # configured system user is recorded instead.
    token = pecan.request.context.get('token', None)
    execution.context['user'] = token.user if token else cfg.CONF.system_user.user

    # Merge additional st2 context passed through the request header, if any.
    headers = pecan.request.headers
    if 'st2-context' in headers and headers['st2-context']:
        extra_context = jsonify.try_loads(headers['st2-context'])
        if not isinstance(extra_context, dict):
            raise ValueError('Unable to convert st2-context from the headers into JSON.')
        execution.context.update(extra_context)

    # Convert to the DB model and hand it to the action service.
    liveaction_db = LiveActionAPI.to_model(execution)
    _, actionexecution_db = action_service.request(liveaction_db)
    return ActionExecutionAPI.from_model(actionexecution_db)
开发者ID:SamMarkowitz,项目名称:st2,代码行数:27,代码来源:actionexecutions.py
示例2: test_execution_creation_action_triggered_by_rule
def test_execution_creation_action_triggered_by_rule(self):
    """
    Check the execution object created for a rule-triggered liveaction.

    A liveaction fixture is linked to a rule and a trigger instance,
    persisted, and passed to ``executions_util.create_execution_object``.
    The stored execution is then compared against the API form of the
    related trigger, trigger type, trigger instance, rule, action and runner.
    """
    trigger_type = self.MODELS['triggertypes']['triggertype2.yaml']
    trigger = self.MODELS['triggers']['trigger2.yaml']
    trigger_instance = self.MODELS['triggerinstances']['trigger_instance_1.yaml']
    test_liveaction = self.FIXTURES['liveactions']['liveaction3.yaml']
    rule = self.MODELS['rules']['rule3.yaml']

    # Setup LiveAction to point to right rule and trigger_instance.
    # XXX: We need support for dynamic fixtures.
    test_liveaction['context']['rule']['id'] = str(rule.id)
    test_liveaction['context']['trigger_instance']['id'] = str(trigger_instance.id)
    test_liveaction_api = LiveActionAPI(**test_liveaction)
    # The stored object is re-fetched below, so the return value is not kept.
    LiveAction.add_or_update(LiveActionAPI.to_model(test_liveaction_api))

    liveaction = LiveAction.get(context__trigger_instance__id=str(trigger_instance.id))
    self.assertIsNotNone(liveaction)
    self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_REQUESTED)

    executions_util.create_execution_object(liveaction)
    execution = self._get_action_execution(liveaction__id=str(liveaction.id),
                                           raise_exception=True)

    # The execution is expected to embed the API representation of each
    # related model.
    self.assertDictEqual(execution.trigger, vars(TriggerAPI.from_model(trigger)))
    self.assertDictEqual(execution.trigger_type, vars(TriggerTypeAPI.from_model(trigger_type)))
    self.assertDictEqual(execution.trigger_instance,
                         vars(TriggerInstanceAPI.from_model(trigger_instance)))
    self.assertDictEqual(execution.rule, vars(RuleAPI.from_model(rule)))

    action = action_utils.get_action_by_ref(liveaction.action)
    self.assertDictEqual(execution.action, vars(ActionAPI.from_model(action)))

    runner = RunnerType.get_by_name(action.runner_type['name'])
    self.assertDictEqual(execution.runner, vars(RunnerTypeAPI.from_model(runner)))

    liveaction = LiveAction.get_by_id(str(liveaction.id))
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed in Python 3.12.
    self.assertEqual(execution.liveaction['id'], str(liveaction.id))
开发者ID:ruslantum,项目名称:st2,代码行数:30,代码来源:test_executions_util.py
示例3: test_process_post_generic_notify_trigger_on_custom_emit_when_states
def test_process_post_generic_notify_trigger_on_custom_emit_when_states(self,
        mock_LiveAction, mock_dispatch):
    """
    Run ``Notifier.process`` once per status in ``LIVEACTION_STATUSES`` and
    check the generic action trigger dispatches.

    For the 'scheduled', 'pending' and 'abandoned' statuses the most recent
    dispatch must carry the expected payload; across the whole loop exactly
    three dispatches are expected.

    :param mock_LiveAction: Mock whose ``get_by_id`` is set to return the
                            liveaction built for the current status.
    :param mock_dispatch: Mock used to observe trigger dispatches.
    """
    emitting_statuses = ('scheduled', 'pending', 'abandoned')

    for status in LIVEACTION_STATUSES:
        liveaction_db = LiveActionDB(id=bson.ObjectId(), action='core.local')
        liveaction_db.status = status

        # MOCK_EXECUTION is a shared object which is updated in place on each
        # iteration.
        execution = MOCK_EXECUTION
        execution.liveaction = vars(LiveActionAPI.from_model(liveaction_db))
        execution.status = liveaction_db.status

        mock_LiveAction.get_by_id.return_value = liveaction_db

        # A single notifier per iteration is enough; the original built a
        # second, unused instance before this point.
        notifier = Notifier(connection=None, queues=[])
        notifier.process(execution)

        if status in emitting_statuses:
            exp = {'status': status,
                   'start_timestamp': str(liveaction_db.start_timestamp),
                   'result': {}, 'parameters': {},
                   'action_ref': u'core.local',
                   'runner_ref': 'local-shell-cmd',
                   'execution_id': str(MOCK_EXECUTION.id),
                   'action_name': u'core.local'}
            mock_dispatch.assert_called_with('core.st2.generic.actiontrigger',
                                             payload=exp, trace_context={})

    # One dispatch for each of the three emitting statuses.
    self.assertEqual(mock_dispatch.call_count, 3)
开发者ID:nzlosh,项目名称:st2,代码行数:30,代码来源:test_notifier.py
示例4: test_notify_triggers_jinja_patterns
def test_notify_triggers_jinja_patterns(self, dispatch):
    """
    Check that Jinja placeholders in the ``on_success`` notification are
    rendered in the payload dispatched by ``Notifier.process``.

    :param dispatch: Mock used to observe the notify trigger dispatch.
    """
    success_notification = NotificationSubSchema(
        message='Command {{action_parameters.cmd}} succeeded.',
        data={'stdout': '{{action_results.stdout}}'})

    liveaction_db = LiveActionDB(action='core.local')
    liveaction_db.id = bson.ObjectId()
    liveaction_db.description = ''
    liveaction_db.status = 'succeeded'
    liveaction_db.parameters = {'cmd': 'mamma mia', 'runner_foo': 'foo'}
    liveaction_db.notify = NotificationSchema(on_success=success_notification)
    liveaction_db.start_timestamp = date_utils.get_datetime_utc_now()
    # The end time is derived from the value read back from the model.
    run_time = datetime.timedelta(seconds=50)
    liveaction_db.end_timestamp = liveaction_db.start_timestamp + run_time

    LiveAction.add_or_update(liveaction_db)

    # MOCK_EXECUTION is a shared object which is updated in place.
    mock_execution = MOCK_EXECUTION
    mock_execution.liveaction = vars(LiveActionAPI.from_model(liveaction_db))
    mock_execution.status = liveaction_db.status

    notifier = Notifier(connection=None, queues=[])
    notifier.process(mock_execution)

    expected_payload = {
        'status': 'succeeded',
        'start_timestamp': isotime.format(liveaction_db.start_timestamp),
        'route': 'notify.default', 'runner_ref': 'local-shell-cmd',
        'channel': 'notify.default', 'message': u'Command mamma mia succeeded.',
        'data': {'result': '{}', 'stdout': 'stuff happens'},
        'action_ref': u'core.local',
        'execution_id': str(MOCK_EXECUTION.id),
        'end_timestamp': isotime.format(liveaction_db.end_timestamp)
    }
    dispatch.assert_called_once_with('core.st2.generic.notifytrigger',
                                     payload=expected_payload,
                                     trace_context={})

    # The same execution is processed a second time after the assertion.
    notifier.process(mock_execution)
开发者ID:nzlosh,项目名称:st2,代码行数:32,代码来源:test_notifier.py
示例5: _schedule_execution
def _schedule_execution(self, liveaction):
    """
    Create, parameterize and publish an execution request for ``liveaction``.

    The requester and any ``st2-context`` header payload are stored in the
    liveaction context, the request is created through ``action_service``,
    live parameters are rendered, and the request is then published.

    :param liveaction: API object for the liveaction being requested. Its
                       ``context`` attribute is created if missing and is
                       updated in place.
    :return: Result of ``ActionExecutionAPI.from_model`` for the published
             action execution.
    :raises ValueError: If the ``st2-context`` header value does not load
                        into a dict.
    :raises ValueValidationException: If rendering the live parameters raises
                                      ``ParamException``.
    """
    # Initialize execution context if it does not exist.
    if not hasattr(liveaction, 'context'):
        liveaction.context = {}

    liveaction.context['user'] = get_requester()
    # Pass the value as a logging argument so the message is only formatted
    # when debug logging is actually emitted.
    LOG.debug('User is: %s', liveaction.context['user'])

    # Retrieve other st2 context from request header.
    headers = pecan.request.headers
    if 'st2-context' in headers and headers['st2-context']:
        context = jsonify.try_loads(headers['st2-context'])
        if not isinstance(context, dict):
            raise ValueError('Unable to convert st2-context from the headers into JSON.')
        liveaction.context.update(context)

    # Schedule the action execution.
    liveaction_db = LiveActionAPI.to_model(liveaction)
    liveaction_db, actionexecution_db = action_service.create_request(liveaction_db)

    action_db = action_utils.get_action_by_ref(liveaction_db.action)
    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

    # NOTE(review): the request has already been created at this point; on a
    # rendering failure nothing here updates its status - confirm whether the
    # caller handles that.
    try:
        liveaction_db.parameters = param_utils.render_live_params(
            runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
            liveaction_db.context)
    except ParamException as e:
        raise ValueValidationException(str(e))

    # Store the rendered parameters without publishing, then publish the request.
    liveaction_db = LiveAction.add_or_update(liveaction_db, publish=False)
    _, actionexecution_db = action_service.publish_request(liveaction_db, actionexecution_db)

    from_model_kwargs = self._get_from_model_kwargs_for_request(request=pecan.request)
    return ActionExecutionAPI.from_model(actionexecution_db, from_model_kwargs)
开发者ID:azamsheriff,项目名称:st2,代码行数:34,代码来源:actionexecutions.py
示例6: test_notify_triggers_end_timestamp_none
def test_notify_triggers_end_timestamp_none(self):
    """
    Process an execution whose liveaction has ``end_timestamp`` set to None.

    The test body holds no assertions of its own; per the original comments
    the checks live in ``MockDispatcher.dispatch``, so the call to
    ``notifier.process`` is what exercises them.
    """
    notify_schema = NotificationSchema(
        on_success=NotificationSubSchema(message='Action succeeded.'),
        on_failure=NotificationSubSchema(message='Action failed.'))

    liveaction_db = LiveActionDB(action='core.local')
    liveaction_db.id = bson.ObjectId()
    liveaction_db.description = ''
    liveaction_db.status = 'succeeded'
    liveaction_db.parameters = {}
    liveaction_db.notify = notify_schema
    liveaction_db.start_timestamp = date_utils.get_datetime_utc_now()

    # A None end_timestamp can happen when a policy cancels a request; the
    # notifier is expected to cope with it.
    liveaction_db.end_timestamp = None

    LiveAction.add_or_update(liveaction_db)

    # MOCK_EXECUTION is a shared object which is updated in place.
    mock_execution = MOCK_EXECUTION
    mock_execution.liveaction = vars(LiveActionAPI.from_model(liveaction_db))
    mock_execution.status = liveaction_db.status

    notifier = Notifier(connection=None, queues=[],
                        trigger_dispatcher=NotifierTestCase.MockDispatcher(self))
    notifier.process(mock_execution)
开发者ID:nzlosh,项目名称:st2,代码行数:26,代码来源:test_notifier.py
示例7: _schedule_execution
def _schedule_execution(self, liveaction):
    """
    Attach request context to ``liveaction`` and submit it to the action
    service.

    :param liveaction: API object for the liveaction being requested. Its
                       ``context`` attribute is created if missing and is
                       updated in place.
    :return: Result of ``ActionExecutionAPI.from_model`` for the action
             execution model returned by ``action_service.request``.
    :raises ValueError: If the ``st2-context`` header value does not load
                        into a dict.
    """
    # Initialize execution context if it does not exist.
    if not hasattr(liveaction, "context"):
        liveaction.context = {}

    # Retrieve username of the authed user (note - if auth is disabled, user will not be
    # set so we fall back to the system user name)
    request_token = pecan.request.context.get("token", None)
    user = request_token.user if request_token else cfg.CONF.system_user.user
    liveaction.context["user"] = user
    # Pass the value as a logging argument so the message is only formatted
    # when debug logging is actually emitted.
    LOG.debug("User is: %s", user)

    # Retrieve other st2 context from request header.
    headers = pecan.request.headers
    if "st2-context" in headers and headers["st2-context"]:
        context = jsonify.try_loads(headers["st2-context"])
        if not isinstance(context, dict):
            raise ValueError("Unable to convert st2-context from the headers into JSON.")
        liveaction.context.update(context)

    # Schedule the action execution.
    liveactiondb = LiveActionAPI.to_model(liveaction)
    _, actionexecutiondb = action_service.request(liveactiondb)

    from_model_kwargs = self._get_from_model_kwargs_for_request(request=pecan.request)
    return ActionExecutionAPI.from_model(actionexecutiondb, from_model_kwargs)
开发者ID:jonico,项目名称:st2,代码行数:29,代码来源:actionexecutions.py
示例8: _decompose_liveaction
def _decompose_liveaction(liveaction_db):
    """
    Build an ActionExecution compatible dict from a liveaction model.

    Keys listed in ``SKIPPED`` are copied from the API representation into
    the nested ``liveaction`` dict. Every other key becomes a top-level entry
    whose value is read from the matching attribute of ``liveaction_db``.

    :param liveaction_db: Liveaction model to split.
    :return: Dict with a ``liveaction`` sub-dict plus the top-level entries.
    """
    api_fields = vars(LiveActionAPI.from_model(liveaction_db))
    decomposed = {'liveaction': {}}

    for field, api_value in api_fields.items():
        if field not in SKIPPED:
            # Top-level values come from the DB model, not the API object.
            decomposed[field] = getattr(liveaction_db, field)
            continue
        decomposed['liveaction'][field] = api_value

    return decomposed
开发者ID:ruslantum,项目名称:st2,代码行数:12,代码来源:executions.py
示例9: _schedule_execution
def _schedule_execution(self, liveaction, user=None):
    """
    Create, parameterize and publish an execution request for ``liveaction``.

    If rendering the live parameters fails, the already-created liveaction is
    marked as failed before the error is re-raised as a validation error.

    :param liveaction: API object for the liveaction being requested. Its
                       ``context`` attribute is created if missing and is
                       updated in place.
    :param user: Value stored under the ``user`` key of the context.
    :return: Result of ``ActionExecutionAPI.from_model`` for the published
             action execution.
    :raises ValueError: If the ``st2-context`` header value does not load
                        into a dict.
    :raises ValueValidationException: If rendering the live parameters raises
                                      ``ParamException``.
    """
    # Initialize execution context if it does not exist.
    if not hasattr(liveaction, 'context'):
        liveaction.context = {}

    liveaction.context['user'] = user
    # Pass the value as a logging argument so the message is only formatted
    # when debug logging is actually emitted.
    LOG.debug('User is: %s', liveaction.context['user'])

    # Retrieve other st2 context from request header.
    headers = pecan.request.headers
    if 'st2-context' in headers and headers['st2-context']:
        context = jsonify.try_loads(headers['st2-context'])
        if not isinstance(context, dict):
            raise ValueError('Unable to convert st2-context from the headers into JSON.')
        liveaction.context.update(context)

    # Schedule the action execution.
    liveaction_db = LiveActionAPI.to_model(liveaction)
    liveaction_db, actionexecution_db = action_service.create_request(liveaction_db)

    action_db = action_utils.get_action_by_ref(liveaction_db.action)
    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

    try:
        liveaction_db.parameters = param_utils.render_live_params(
            runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
            liveaction_db.context)
    except ParamException:
        # By this point the execution is already in the DB therefore need to mark it failed.
        _, e, tb = sys.exc_info()
        action_service.update_status(
            liveaction=liveaction_db,
            new_status=LIVEACTION_STATUS_FAILED,
            result={'error': str(e), 'traceback': ''.join(traceback.format_tb(tb, 20))})
        # Might be a good idea to return the actual ActionExecution rather than bubble up
        # the exception.
        raise ValueValidationException(str(e))

    # Store the rendered parameters without publishing, then publish the request.
    liveaction_db = LiveAction.add_or_update(liveaction_db, publish=False)
    _, actionexecution_db = action_service.publish_request(liveaction_db, actionexecution_db)

    from_model_kwargs = self._get_from_model_kwargs_for_request(request=pecan.request)
    return ActionExecutionAPI.from_model(actionexecution_db, from_model_kwargs)
开发者ID:Bala96,项目名称:st2,代码行数:42,代码来源:actionexecutions.py
示例10: _schedule_execution
def _schedule_execution(self, liveaction):
    """
    Attach request context to ``liveaction`` and submit it to the action
    service.

    :param liveaction: API object for the liveaction being requested. Its
                       ``context`` attribute is created if missing and is
                       updated in place.
    :return: Result of ``ActionExecutionAPI.from_model`` for the action
             execution model returned by ``action_service.request``.
    :raises ValueError: If the ``st2-context`` header value does not load
                        into a dict.
    """
    # Initialize execution context if it does not exist.
    if not hasattr(liveaction, 'context'):
        liveaction.context = {}

    liveaction.context['user'] = self._get_requester()
    # Pass the value as a logging argument so the message is only formatted
    # when debug logging is actually emitted.
    LOG.debug('User is: %s', liveaction.context['user'])

    # Retrieve other st2 context from request header.
    headers = pecan.request.headers
    if 'st2-context' in headers and headers['st2-context']:
        context = jsonify.try_loads(headers['st2-context'])
        if not isinstance(context, dict):
            raise ValueError('Unable to convert st2-context from the headers into JSON.')
        liveaction.context.update(context)

    # Schedule the action execution.
    liveactiondb = LiveActionAPI.to_model(liveaction)
    _, actionexecutiondb = action_service.request(liveactiondb)

    from_model_kwargs = self._get_from_model_kwargs_for_request(request=pecan.request)
    return ActionExecutionAPI.from_model(actionexecutiondb, from_model_kwargs)
开发者ID:langelee,项目名称:st2,代码行数:20,代码来源:actionexecutions.py
示例11: test_notify_triggers
def test_notify_triggers(self):
    """
    Process an execution for a succeeded liveaction that has both
    ``on_success`` and ``on_failure`` notifications configured.

    The test body holds no assertions of its own; the notifier is given a
    ``NotifierTestCase.MockDispatcher`` built around this test case.
    """
    notify_schema = NotificationSchema(
        on_success=NotificationSubSchema(message='Action succeeded.'),
        on_failure=NotificationSubSchema(message='Action failed.'))

    liveaction_db = LiveActionDB(action='core.local')
    liveaction_db.id = bson.ObjectId()
    liveaction_db.description = ''
    liveaction_db.status = 'succeeded'
    liveaction_db.parameters = {}
    liveaction_db.notify = notify_schema
    liveaction_db.start_timestamp = date_utils.get_datetime_utc_now()
    # The end time is derived from the value read back from the model.
    run_time = datetime.timedelta(seconds=50)
    liveaction_db.end_timestamp = liveaction_db.start_timestamp + run_time

    LiveAction.add_or_update(liveaction_db)

    # MOCK_EXECUTION is a shared object which is updated in place.
    mock_execution = MOCK_EXECUTION
    mock_execution.liveaction = vars(LiveActionAPI.from_model(liveaction_db))
    mock_execution.status = liveaction_db.status

    notifier = Notifier(connection=None, queues=[],
                        trigger_dispatcher=NotifierTestCase.MockDispatcher(self))
    notifier.process(mock_execution)
开发者ID:nzlosh,项目名称:st2,代码行数:22,代码来源:test_notifier.py
示例12: test_post_generic_trigger_with_emit_condition
def test_post_generic_trigger_with_emit_condition(self, dispatch):
    """
    Call ``Notifier._post_generic_trigger`` once per status in
    ``LIVEACTION_STATUSES`` and check the resulting dispatches.

    For the 'scheduled', 'pending' and 'abandoned' statuses the most recent
    dispatch must carry the expected payload; across the whole loop exactly
    three dispatches are expected.

    :param dispatch: Mock used to observe trigger dispatches.
    """
    for current_status in LIVEACTION_STATUSES:
        liveaction_db = LiveActionDB(action='core.local')
        liveaction_db.status = current_status

        # MOCK_EXECUTION is a shared object which is updated in place on each
        # iteration.
        mock_execution = MOCK_EXECUTION
        mock_execution.liveaction = vars(LiveActionAPI.from_model(liveaction_db))
        mock_execution.status = liveaction_db.status

        notifier = Notifier(connection=None, queues=[])
        notifier._post_generic_trigger(liveaction_db, mock_execution)

        if current_status not in ['scheduled', 'pending', 'abandoned']:
            continue

        expected_payload = {
            'status': current_status,
            'start_timestamp': str(liveaction_db.start_timestamp),
            'result': {}, 'parameters': {},
            'action_ref': u'core.local',
            'runner_ref': 'local-shell-cmd',
            'execution_id': str(MOCK_EXECUTION.id),
            'action_name': u'core.local'
        }
        dispatch.assert_called_with('core.st2.generic.actiontrigger',
                                    payload=expected_payload, trace_context={})

    # One dispatch for each of the three emitting statuses.
    self.assertEqual(dispatch.call_count, 3)
开发者ID:nzlosh,项目名称:st2,代码行数:23,代码来源:test_notifier.py
示例13: post
def post(self, execution):
    """
    Schedule ``execution`` and translate failures into HTTP error responses.

    ``ValueError`` and ``jsonschema.ValidationError`` are reported through
    ``abort`` with ``http_client.BAD_REQUEST``; any other exception is
    reported with ``http_client.INTERNAL_SERVER_ERROR``.

    :param execution: API object for the execution being requested. Its
                      ``context`` attribute is created if missing and is
                      updated in place.
    :return: Result of ``ActionExecutionAPI.from_model`` for the action
             execution model returned by ``action_service.schedule``.
    """
    try:
        # Guarantee there is a context to write into.
        if not hasattr(execution, 'context'):
            execution.context = {}

        # When auth is disabled no token is attached to the request, so the
        # configured system user is recorded instead.
        token = pecan.request.context.get('token', None)
        execution.context['user'] = token.user if token else cfg.CONF.system_user.user

        # Merge additional st2 context passed through the request header.
        headers = pecan.request.headers
        if 'st2-context' in headers and headers['st2-context']:
            extra_context = jsonify.try_loads(headers['st2-context'])
            if not isinstance(extra_context, dict):
                raise ValueError('Unable to convert st2-context from the headers into JSON.')
            execution.context.update(extra_context)

        # Convert to the DB model and hand it to the action service.
        liveaction_db = LiveActionAPI.to_model(execution)
        _, actionexecution_db = action_service.schedule(liveaction_db)
        return ActionExecutionAPI.from_model(actionexecution_db)
    except ValueError as err:
        LOG.exception('Unable to execute action.')
        abort(http_client.BAD_REQUEST, str(err))
    except jsonschema.ValidationError as err:
        LOG.exception('Unable to execute action. Parameter validation failed.')
        abort(http_client.BAD_REQUEST, str(err))
    except Exception as err:
        LOG.exception('Unable to execute action. Unexpected error encountered.')
        abort(http_client.INTERNAL_SERVER_ERROR, str(err))
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:37,代码来源:actionexecutions.py
示例14: _schedule_execution
def _schedule_execution(self, liveaction, requester_user, action_db, user=None,
                        context_string=None, show_secrets=False):
    """
    Render live parameters, create and publish an execution request, and
    return it as an HTTP response.

    If rendering the live parameters fails, a request is still created so the
    liveaction gets an ID, it is marked as failed, and a validation error is
    raised.

    :param liveaction: API object for the liveaction being requested. Its
                       ``context`` attribute is created if missing and is
                       updated in place.
    :param requester_user: Passed to ``self._get_mask_secrets`` to decide
                           whether secrets are masked in the response.
    :param action_db: Action model; supplies the pack, runner type name and
                      action parameters.
    :param user: Value stored under the ``user`` key of the context and used
                 for the RBAC role lookup.
    :param context_string: Optional serialized st2 context to merge into the
                           liveaction context.
    :param show_secrets: Forwarded to ``self._get_mask_secrets``.
    :return: ``Response`` carrying the execution API object with status
             ``http_client.CREATED``.
    :raises ValueError: If ``context_string`` does not load into a dict.
    :raises validation_exc.ValueValidationException: If rendering the live
        parameters raises ``param_exc.ParamException``.
    """
    # Initialize execution context if it does not exist.
    if not hasattr(liveaction, 'context'):
        liveaction.context = {}

    liveaction.context['user'] = user
    liveaction.context['pack'] = action_db.pack
    # Pass the value as a logging argument so the message is only formatted
    # when debug logging is actually emitted.
    LOG.debug('User is: %s', liveaction.context['user'])

    # Retrieve other st2 context from request header.
    if context_string:
        context = try_loads(context_string)
        if not isinstance(context, dict):
            raise ValueError('Unable to convert st2-context from the headers into JSON.')
        liveaction.context.update(context)

    # Include RBAC context (if RBAC is available and enabled)
    if cfg.CONF.rbac.enable:
        user_db = UserDB(name=user)
        role_dbs = rbac_service.get_roles_for_user(user_db=user_db, include_remote=True)
        roles = [role_db.name for role_db in role_dbs]
        liveaction.context['rbac'] = {
            'user': user,
            'roles': roles
        }

    # Schedule the action execution.
    liveaction_db = LiveActionAPI.to_model(liveaction)
    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

    try:
        liveaction_db.parameters = param_utils.render_live_params(
            runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
            liveaction_db.context)
    except param_exc.ParamException:
        # We still need to create a request, so liveaction_db is assigned an ID
        liveaction_db, actionexecution_db = action_service.create_request(
            liveaction=liveaction_db,
            action_db=action_db,
            runnertype_db=runnertype_db)

        # By this point the execution is already in the DB therefore need to mark it failed.
        _, e, tb = sys.exc_info()
        action_service.update_status(
            liveaction=liveaction_db,
            new_status=action_constants.LIVEACTION_STATUS_FAILED,
            result={'error': six.text_type(e),
                    'traceback': ''.join(traceback.format_tb(tb, 20))})

        # Might be a good idea to return the actual ActionExecution rather than bubble up
        # the exception.
        raise validation_exc.ValueValidationException(six.text_type(e))

    # The request should be created after the above call to render_live_params
    # so any templates in live parameters have a chance to render.
    liveaction_db, actionexecution_db = action_service.create_request(liveaction=liveaction_db,
                                                                      action_db=action_db,
                                                                      runnertype_db=runnertype_db)

    _, actionexecution_db = action_service.publish_request(liveaction_db, actionexecution_db)
    mask_secrets = self._get_mask_secrets(requester_user, show_secrets=show_secrets)
    execution_api = ActionExecutionAPI.from_model(actionexecution_db, mask_secrets=mask_secrets)

    return Response(json=execution_api, status=http_client.CREATED)
开发者ID:nzlosh,项目名称:st2,代码行数:65,代码来源:actionexecutions.py
注:本文中的st2common.models.api.action.LiveActionAPI类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论