本文整理汇总了Python中st2common.persistence.action.ActionExecution类的典型用法代码示例。如果您正苦于以下问题：Python ActionExecution类的具体用法？Python ActionExecution怎么用？Python ActionExecution使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ActionExecution类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_no_retry_on_workflow_task
def test_no_retry_on_workflow_task(self):
    # Scenario: a timed-out live action whose context carries a workflow parent
    # must not produce any retry objects.

    # Precondition: both collections start out empty.
    self.assertSequenceEqual(LiveAction.get_all(), [])
    self.assertSequenceEqual(ActionExecution.get_all(), [])

    # Request a mock action whose context references a parent execution.
    request_db = LiveActionDB(
        action='wolfpack.action-1',
        parameters={'actionstr': 'foo'},
        context={'parent': {'execution_id': 'abcde'}},
    )
    request_db, execution_db = action_service.request(request_db)
    stored_liveaction = LiveAction.get_by_id(str(request_db.id))
    self.assertEqual(stored_liveaction.status, LIVEACTION_STATUS_REQUESTED)

    # Mark both records as timed out and persist them.
    stored_liveaction.status = LIVEACTION_STATUS_TIMED_OUT
    stored_liveaction.context['policies'] = {}
    execution_db.status = LIVEACTION_STATUS_TIMED_OUT
    LiveAction.add_or_update(stored_liveaction)
    ActionExecution.add_or_update(execution_db)

    # Run the policy hook the way it would run after the action finishes.
    self.policy.apply_after(target=stored_liveaction)

    # Still exactly one object of each kind: nothing was retried.
    all_liveactions = LiveAction.get_all()
    all_executions = ActionExecution.get_all()
    self.assertEqual(len(all_liveactions), 1)
    self.assertEqual(len(all_executions), 1)
    self.assertEqual(all_executions[0].status, LIVEACTION_STATUS_TIMED_OUT)
开发者ID:nzlosh,项目名称:st2,代码行数:32,代码来源:test_retry_policy.py
示例2: test_no_retry_on_non_applicable_statuses
def test_no_retry_on_non_applicable_statuses(self):
    # Scenario: for each status listed below, applying the policy must not
    # create any additional live action or execution.

    # Precondition: both collections start out empty.
    self.assertSequenceEqual(LiveAction.get_all(), [])
    self.assertSequenceEqual(ActionExecution.get_all(), [])

    statuses_without_retry = (
        LIVEACTION_STATUS_REQUESTED,
        LIVEACTION_STATUS_SCHEDULED,
        LIVEACTION_STATUS_DELAYED,
        LIVEACTION_STATUS_CANCELING,
        LIVEACTION_STATUS_CANCELED,
    )

    for current_status in statuses_without_retry:
        # Request a fresh mock action and force it into the status under test.
        request_db = LiveActionDB(action='wolfpack.action-1',
                                  parameters={'actionstr': 'foo'})
        live_action_db, execution_db = action_service.request(request_db)
        live_action_db.status = current_status
        execution_db.status = current_status
        LiveAction.add_or_update(live_action_db)
        ActionExecution.add_or_update(execution_db)

        # Run the policy hook for this live action.
        self.policy.apply_after(target=live_action_db)

    # One object per requested action, i.e. no retries were added.
    expected_count = len(statuses_without_retry)
    all_liveactions = LiveAction.get_all()
    all_executions = ActionExecution.get_all()
    self.assertEqual(len(all_liveactions), expected_count)
    self.assertEqual(len(all_executions), expected_count)
开发者ID:nzlosh,项目名称:st2,代码行数:33,代码来源:test_retry_policy.py
示例3: test_retry_on_timeout_max_retries_reached
def test_retry_on_timeout_max_retries_reached(self):
    # Scenario: the timed-out live action already records retry_count == 2 in its
    # context, and the policy is expected not to retry it again.

    # Precondition: both collections start out empty.
    self.assertSequenceEqual(LiveAction.get_all(), [])
    self.assertSequenceEqual(ActionExecution.get_all(), [])

    # Request a mock action, then mark it as timed out with two retries recorded.
    request_db = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo'})
    live_action_db, execution_db = action_service.request(request_db)
    live_action_db.status = LIVEACTION_STATUS_TIMED_OUT
    live_action_db.context['policies'] = {'retry': {'retry_count': 2}}
    execution_db.status = LIVEACTION_STATUS_TIMED_OUT
    LiveAction.add_or_update(live_action_db)
    ActionExecution.add_or_update(execution_db)

    # Run the policy hook the way it would run after the action finishes.
    self.policy.apply_after(target=live_action_db)

    # No new objects: the retry limit has been reached.
    all_liveactions = LiveAction.get_all()
    all_executions = ActionExecution.get_all()
    self.assertEqual(len(all_liveactions), 1)
    self.assertEqual(len(all_executions), 1)
    self.assertEqual(all_executions[0].status, LIVEACTION_STATUS_TIMED_OUT)
开发者ID:Plexxi,项目名称:st2,代码行数:25,代码来源:test_retry_policy.py
示例4: test_dispatch_runner_failure
def test_dispatch_runner_failure(self):
    # Scenario: dispatching a failing action still returns a truthy value, and the
    # stored execution result carries 'message' and 'traceback' entries.
    runner_container = get_runner_container()
    params = {'actionstr': 'bar'}
    actionexec_db = self._get_failingaction_exec_db_model(params)
    actionexec_db = ActionExecution.add_or_update(actionexec_db)
    self.assertTrue(runner_container.dispatch(actionexec_db))

    # Reload the execution so the assertions see what dispatch stored.
    actionexec_db = ActionExecution.get_by_id(actionexec_db.id)

    # assertIn reports the actual result contents on failure, unlike
    # assertTrue(x in y), which only reports "False is not true".
    self.assertIn('message', actionexec_db.result)
    self.assertIn('traceback', actionexec_db.result)
开发者ID:bjoernbessert,项目名称:st2,代码行数:12,代码来源:test_runner_container.py
示例5: test_dispatch
def test_dispatch(self):
    # Scenario: dispatching an action with only 'actionstr' supplied; the stored
    # result is expected to report actionint == 10 and actionstr == 'bar'.
    runner_container = get_runner_container()
    params = {'actionstr': 'bar'}
    actionexec_db = self._get_action_exec_db_model(params)
    actionexec_db = ActionExecution.add_or_update(actionexec_db)

    # The dispatch call itself must report success.
    self.assertTrue(runner_container.dispatch(actionexec_db))

    # Reload the execution so the assertions see what dispatch stored.
    actionexec_db = ActionExecution.get_by_id(actionexec_db.id)
    action_params = actionexec_db.result.get('action_params')

    # assertEqual shows both values on failure; assertTrue(a == b) does not.
    self.assertEqual(action_params.get('actionint'), 10)
    self.assertEqual(action_params.get('actionstr'), 'bar')
开发者ID:bjoernbessert,项目名称:st2,代码行数:13,代码来源:test_runner_container.py
示例6: test_retry_on_timeout_first_retry_is_successful
def test_retry_on_timeout_first_retry_is_successful(self):
    # Scenario: the original execution times out and is retried once; the retry
    # then succeeds, so no further retry is created.

    # Precondition: both collections start out empty.
    self.assertSequenceEqual(LiveAction.get_all(), [])
    self.assertSequenceEqual(ActionExecution.get_all(), [])

    # Request a mock action and mark it (and its execution) as timed out.
    request_db = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo'})
    live_action_db, execution_db = action_service.request(request_db)
    live_action_db.status = LIVEACTION_STATUS_TIMED_OUT
    execution_db.status = LIVEACTION_STATUS_TIMED_OUT
    LiveAction.add_or_update(live_action_db)
    ActionExecution.add_or_update(execution_db)

    # First policy run: expected to create the retry.
    self.policy.apply_after(target=live_action_db)

    # Two objects of each kind now exist: the original and the retry.
    stored_liveactions = LiveAction.get_all()
    stored_executions = ActionExecution.get_all()
    self.assertEqual(len(stored_liveactions), 2)
    self.assertEqual(len(stored_executions), 2)
    self.assertEqual(stored_executions[0].status, LIVEACTION_STATUS_TIMED_OUT)
    self.assertEqual(stored_executions[1].status, LIVEACTION_STATUS_REQUESTED)

    # The retried execution must carry the retry policy bookkeeping.
    original_liveaction_id = stored_executions[0].liveaction['id']
    retry_context = stored_executions[1].context
    self.assertTrue('policies' in retry_context)
    retry_info = retry_context['policies']['retry']
    self.assertEqual(retry_info['retry_count'], 1)
    self.assertEqual(retry_info['applied_policy'], 'test_policy')
    self.assertEqual(retry_info['retried_liveaction_id'], original_liveaction_id)

    # Mark the retried live action as succeeded so it should not be retried again.
    retried_liveaction = stored_liveactions[1]
    retried_liveaction.status = LIVEACTION_STATUS_SUCCEEDED
    LiveAction.add_or_update(retried_liveaction)

    # Second policy run: nothing new should be created.
    self.policy.apply_after(target=retried_liveaction)

    stored_liveactions = LiveAction.get_all()
    stored_executions = ActionExecution.get_all()
    self.assertEqual(len(stored_liveactions), 2)
    self.assertEqual(len(stored_executions), 2)
    self.assertEqual(stored_liveactions[0].status, LIVEACTION_STATUS_TIMED_OUT)
    self.assertEqual(stored_liveactions[1].status, LIVEACTION_STATUS_SUCCEEDED)
开发者ID:Plexxi,项目名称:st2,代码行数:50,代码来源:test_retry_policy.py
示例7: put
def put(self, id, actionexecution):
    """
    Update the status and result of a stored ActionExecution.

    Only ``status`` and ``result`` are copied from the incoming object, and only
    when they differ from the stored values.

    :param id: Identifier of the ActionExecution to update.
    :param actionexecution: API object carrying the new status and result.
    :return: API representation of the ActionExecution after it has been saved.

    Aborts the request with HTTP 404 when the lookup by ``id`` fails.
    """
    try:
        actionexec_db = ActionExecution.get_by_id(id)
    except Exception:
        msg = 'ActionExecution by id: %s not found.' % id
        # pecan.abort expects a numeric status code; the original passed the
        # http_client module itself.
        # NOTE(review): assumes http_client is six.moves.http_client (httplib) - confirm.
        pecan.abort(http_client.NOT_FOUND, msg)

    new_actionexec_db = ActionExecutionAPI.to_model(actionexecution)

    # Copy over only the fields that actually changed.
    if actionexec_db.status != new_actionexec_db.status:
        actionexec_db.status = new_actionexec_db.status
    if actionexec_db.result != new_actionexec_db.result:
        actionexec_db.result = new_actionexec_db.result

    actionexec_db = ActionExecution.add_or_update(actionexec_db)
    return ActionExecutionAPI.from_model(actionexec_db)
开发者ID:bjoernbessert,项目名称:st2,代码行数:14,代码来源:actionexecutions.py
示例8: update_actionexecution_status
def update_actionexecution_status(new_status, actionexec_id=None, actionexec_db=None):
    """
    Update the status of the specified ActionExecution to the value provided in
    new_status.

    The ActionExecution may be specified using either actionexec_id, or as an
    actionexec_db instance. When both are given, actionexec_db is used.

    :param new_status: Status string; must be one of ACTIONEXEC_STATUSES.
    :param actionexec_id: Id used to look the ActionExecution up when no
                          actionexec_db is supplied.
    :param actionexec_db: ActionExecution object to update.
    :return: The object returned by ActionExecution.add_or_update.
    :raises ValueError: If neither identifier is given, or new_status is unknown.
    """
    if (actionexec_id is None) and (actionexec_db is None):
        raise ValueError('Must specify an actionexec_id or an actionexec_db when '
                         'calling update_actionexecution_status')

    if actionexec_db is None:
        actionexec_db = get_actionexec_by_id(actionexec_id)

    if new_status not in ACTIONEXEC_STATUSES:
        # Interpolate explicitly: exceptions do not apply logging-style lazy
        # formatting, so passing the arguments separately left the "%s"
        # placeholders unformatted in the error message.
        raise ValueError('Attempting to set status for ActionExecution "%s" '
                         'to unknown status string. Unknown status is "%s"'
                         % (actionexec_db, new_status))

    LOG.debug('Updating ActionExection: "%s" with status="%s"',
              actionexec_db, new_status)

    actionexec_db.status = new_status
    actionexec_db = ActionExecution.add_or_update(actionexec_db)

    LOG.debug('Updated status for ActionExecution object: %s', actionexec_db)
    return actionexec_db
开发者ID:bjoernbessert,项目名称:st2,代码行数:27,代码来源:action_db.py
示例9: test_dispatch_override_default_action_params
def test_dispatch_override_default_action_params(self):
    # Scenario: both 'actionstr' and 'actionint' are supplied explicitly, and the
    # stored result is expected to echo exactly those values.
    runner_container = get_runner_container()
    params = {
        'actionstr': 'foo',
        'actionint': 20,
    }
    actionexec_db = self._get_action_exec_db_model(params)
    actionexec_db = ActionExecution.add_or_update(actionexec_db)

    # The dispatch call itself must report success.
    dispatched = runner_container.dispatch(actionexec_db)
    self.assertTrue(dispatched)

    # Reload the execution so the assertions see what dispatch stored.
    actionexec_db = ActionExecution.get_by_id(actionexec_db.id)
    action_params = actionexec_db.result.get('action_params')

    # assertEqual shows both values on failure; assertTrue(a == b) does not.
    self.assertEqual(action_params.get('actionint'), 20)
    self.assertEqual(action_params.get('actionstr'), 'foo')
开发者ID:bjoernbessert,项目名称:st2,代码行数:16,代码来源:test_runner_container.py
示例10: test_basic_execution
def test_basic_execution(self):
    # Scenario: schedule core.local and check that the history record mirrors the
    # execution, its action and its runner, with no trigger/rule information.
    local_ref = ResourceReference(name='local', pack='core')
    request = ActionExecutionDB(action=local_ref.ref, parameters={'cmd': 'uname -a'})
    scheduled = action_service.schedule(request)
    execution_id = str(scheduled.id)

    execution = ActionExecution.get_by_id(execution_id)
    self.assertEqual(execution.status, ACTIONEXEC_STATUS_SUCCEEDED)

    history = ActionExecutionHistory.get(execution__id=execution_id, raise_exception=True)

    # The execution was not started by a rule, so these sections stay empty.
    self.assertDictEqual(history.trigger, {})
    self.assertDictEqual(history.trigger_type, {})
    self.assertDictEqual(history.trigger_instance, {})
    self.assertDictEqual(history.rule, {})

    # Action and runner snapshots must match the current models.
    action_db, _ = action_utils.get_action_by_dict(
        {'name': local_ref.name, 'pack': local_ref.pack})
    self.assertDictEqual(history.action, vars(ActionAPI.from_model(action_db)))
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])
    self.assertDictEqual(history.runner, vars(RunnerTypeAPI.from_model(runner_db)))

    # Compare the execution snapshot against a freshly loaded execution.
    execution = ActionExecution.get_by_id(execution_id)
    expected_execution = vars(ActionExecutionAPI.from_model(execution))
    self.assertDictEqual(history.execution, expected_execution)
开发者ID:bjoernbessert,项目名称:st2,代码行数:18,代码来源:test_history.py
示例11: test_callback
def test_callback(self):
    # Scenario: schedule core.local with a mistral callback and check the PUT
    # request issued to the callback URL.
    # NOTE(review): requests.request is presumably patched with a mock outside
    # this method - confirm.
    callback = {'source': 'mistral', 'url': 'http://localhost:8989/v2/tasks/12345'}
    request = ActionExecutionDB(
        action='core.local', parameters={'cmd': 'uname -a'}, callback=callback)
    scheduled = action_service.schedule(request)

    execution = ActionExecution.get_by_id(str(scheduled.id))
    self.assertEqual(execution.status, ACTIONEXEC_STATUS_SUCCEEDED)

    expected_payload = json.dumps({'state': 'SUCCESS', 'result': '{}'})
    expected_headers = {'content-type': 'application/json'}
    requests.request.assert_called_with(
        'PUT', execution.callback['url'],
        data=expected_payload,
        headers=expected_headers)
开发者ID:bjoernbessert,项目名称:st2,代码行数:10,代码来源:test_mistral_v2.py
示例12: test_chained_executions
def test_chained_executions(self):
    # Scenario: schedule core.chain and check the parent history record plus the
    # history records of its child executions.
    chain_ref = ResourceReference(name='chain', pack='core')
    request = ActionExecutionDB(action=chain_ref.ref)
    scheduled = action_service.schedule(request)
    execution_id = str(scheduled.id)

    execution = ActionExecution.get_by_id(execution_id)
    self.assertEqual(execution.status, ACTIONEXEC_STATUS_SUCCEEDED)

    history = ActionExecutionHistory.get(execution__id=execution_id, raise_exception=True)

    # Action and runner snapshots must match the current models.
    action_db, _ = action_utils.get_action_by_dict(
        {'name': chain_ref.name, 'pack': chain_ref.pack})
    self.assertDictEqual(history.action, vars(ActionAPI.from_model(action_db)))
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])
    self.assertDictEqual(history.runner, vars(RunnerTypeAPI.from_model(runner_db)))

    # Compare the execution snapshot against a freshly loaded execution.
    execution = ActionExecution.get_by_id(execution_id)
    expected_execution = vars(ActionExecutionAPI.from_model(execution))
    self.assertDictEqual(history.execution, expected_execution)

    # Every child record must point back at this history record and describe a
    # 'local' action run by the 'run-local' runner.
    self.assertGreater(len(history.children), 0)
    for child_id in history.children:
        child_record = ActionExecutionHistory.get(id=child_id, raise_exception=True)
        self.assertEqual(child_record.parent, str(history.id))
        self.assertEqual(child_record.action['name'], 'local')
        self.assertEqual(child_record.runner['name'], 'run-local')
开发者ID:bjoernbessert,项目名称:st2,代码行数:20,代码来源:test_history.py
示例13: schedule
def schedule(execution):
    """
    Validate an action execution request, mark it as scheduled and persist it.

    :param execution: Execution object to schedule; its ``context``,
                      ``parameters``, ``status`` and ``start_timestamp`` may be
                      modified.
    :return: The object returned by ActionExecution.add_or_update.
    :raises ValueError: If the action cannot be found, is disabled, or an
                        immutable parameter is overridden.
    """
    # Subtasks of a workflow can be invoked by a system user, so when a parent
    # execution is referenced, reuse the user recorded on that parent.
    context = getattr(execution, 'context', None)
    if context and 'parent' in context:
        parent = ActionExecution.get_by_id(execution.context['parent'])
        parent_context = getattr(parent, 'context', dict())
        execution.context['user'] = parent_context.get('user')

    # The action must exist and be enabled.
    action_db = action_utils.get_action_by_ref(execution.action)
    if not action_db:
        raise ValueError('Action "%s" cannot be found.' % execution.action)
    if not action_db.enabled:
        raise ValueError('Unable to execute. Action "%s" is disabled.' % execution.action)

    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

    if not hasattr(execution, 'parameters'):
        execution.parameters = dict()

    # Check the supplied parameters against the action's parameter schema.
    jsonschema.validate(
        execution.parameters,
        util_schema.get_parameter_schema(action_db),
        util_schema.get_validator())

    # Overriding an immutable parameter is rejected rather than silently ignored,
    # so the user is not surprised by the outcome.
    immutables = _get_immutable_params(action_db.parameters)
    immutables.extend(_get_immutable_params(runnertype_db.runner_parameters))
    overridden_immutables = []
    for param_name in six.iterkeys(execution.parameters):
        if param_name in immutables:
            overridden_immutables.append(param_name)
    if overridden_immutables:
        raise ValueError('Override of immutable parameter(s) %s is unsupported.'
                         % str(overridden_immutables))

    # Persist the request in the scheduled state.
    execution.status = ACTIONEXEC_STATUS_SCHEDULED
    execution.start_timestamp = isotime.add_utc_tz(datetime.datetime.utcnow())
    execution = ActionExecution.add_or_update(execution)
    LOG.audit('Action execution scheduled. ActionExecution=%s.', execution)
    return execution
开发者ID:bjoernbessert,项目名称:st2,代码行数:41,代码来源:action.py
示例14: test_triggered_execution
def test_triggered_execution(self):
    # Scenario: enforce a rule for a trigger instance and check that the history
    # record captures the trigger, rule, action, runner and execution.

    # Work on deep copies so the shared fixture artifacts stay untouched.
    docs = {}
    for artifact in ('trigger_type', 'trigger', 'rule', 'trigger_instance'):
        docs[artifact] = copy.deepcopy(fixture.ARTIFACTS[artifact])

    # Store the trigger type, trigger, rule and trigger instance.
    trigger_type_api = TriggerTypeAPI(**docs['trigger_type'])
    trigger_type = TriggerType.add_or_update(TriggerTypeAPI.to_model(trigger_type_api))
    trigger_api = TriggerAPI(**docs['trigger'])
    trigger = Trigger.add_or_update(TriggerAPI.to_model(trigger_api))
    rule = RuleAPI.to_model(RuleAPI(**docs['rule']))
    rule.trigger = reference.get_str_resource_ref_from_model(trigger)
    rule = Rule.add_or_update(rule)
    trigger_instance_api = TriggerInstanceAPI(**docs['trigger_instance'])
    trigger_instance = TriggerInstance.add_or_update(
        TriggerInstanceAPI.to_model(trigger_instance_api))

    # Enforce the rule, which is what starts the action execution.
    RuleEnforcer(trigger_instance, rule).enforce()

    # Find the execution created for this trigger instance and check its outcome.
    execution = ActionExecution.get(context__trigger_instance__id=str(trigger_instance.id))
    self.assertIsNotNone(execution)
    execution_id = str(execution.id)
    execution = ActionExecution.get_by_id(execution_id)
    self.assertEqual(execution.status, ACTIONEXEC_STATUS_SUCCEEDED)

    history = ActionExecutionHistory.get(execution__id=execution_id, raise_exception=True)

    # Trigger-related snapshots must match the stored models.
    self.assertDictEqual(history.trigger, vars(TriggerAPI.from_model(trigger)))
    self.assertDictEqual(history.trigger_type, vars(TriggerTypeAPI.from_model(trigger_type)))
    self.assertDictEqual(history.trigger_instance,
                         vars(TriggerInstanceAPI.from_model(trigger_instance)))
    self.assertDictEqual(history.rule, vars(RuleAPI.from_model(rule)))

    # Action and runner snapshots must match the current models.
    action_ref = ResourceReference.from_string_reference(ref=execution.action)
    action_db, _ = action_utils.get_action_by_dict(
        {'name': action_ref.name, 'pack': action_ref.pack})
    self.assertDictEqual(history.action, vars(ActionAPI.from_model(action_db)))
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])
    self.assertDictEqual(history.runner, vars(RunnerTypeAPI.from_model(runner_db)))

    # Compare the execution snapshot against a freshly loaded execution.
    execution = ActionExecution.get_by_id(execution_id)
    expected_execution = vars(ActionExecutionAPI.from_model(execution))
    self.assertDictEqual(history.execution, expected_execution)
开发者ID:bjoernbessert,项目名称:st2,代码行数:38,代码来源:test_history.py
示例15: test_retry_on_timeout_no_retry_since_no_timeout_reached
def test_retry_on_timeout_no_retry_since_no_timeout_reached(self):
    # Scenario: the action succeeds instead of timing out, so applying the policy
    # must not create a retry.

    # Precondition: both collections start out empty.
    self.assertSequenceEqual(LiveAction.get_all(), [])
    self.assertSequenceEqual(ActionExecution.get_all(), [])

    # Request a mock action and mark it (and its execution) as succeeded.
    request_db = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo'})
    live_action_db, execution_db = action_service.request(request_db)
    live_action_db.status = LIVEACTION_STATUS_SUCCEEDED
    execution_db.status = LIVEACTION_STATUS_SUCCEEDED
    LiveAction.add_or_update(live_action_db)
    ActionExecution.add_or_update(execution_db)

    # Run the policy hook the way it would run after the action finishes.
    self.policy.apply_after(target=live_action_db)

    # Still exactly one object of each kind: nothing was retried.
    all_liveactions = LiveAction.get_all()
    all_executions = ActionExecution.get_all()
    self.assertEqual(len(all_liveactions), 1)
    self.assertEqual(len(all_executions), 1)
    self.assertEqual(all_executions[0].status, LIVEACTION_STATUS_SUCCEEDED)
开发者ID:Plexxi,项目名称:st2,代码行数:24,代码来源:test_retry_policy.py
示例16: record_action_execution
def record_action_execution(self, body):
    """
    Create the history record for the action execution identified by ``body.id``.

    The record snapshots the execution, its action and its runner. When the
    execution context names a rule or a trigger instance, those (plus the
    trigger and trigger type) are snapshotted too. When a history record exists
    for the parent execution, the new record is linked to it as a child.

    :param body: Object whose ``id`` identifies the ActionExecution.

    Any exception is logged and re-raised.
    """
    try:
        new_history_id = bson.ObjectId()
        execution = ActionExecution.get_by_id(str(body.id))

        # Resolve the action and runner behind this execution.
        ref = ResourceReference.from_string_reference(ref=execution.action)
        action_db, _ = action_utils.get_action_by_dict(
            {'name': ref.name, 'pack': ref.pack})
        runner_db = RunnerType.get_by_name(action_db.runner_type['name'])

        attrs = {
            'id': new_history_id,
            'action': vars(ActionAPI.from_model(action_db)),
            'runner': vars(RunnerTypeAPI.from_model(runner_db)),
            'execution': vars(ActionExecutionAPI.from_model(execution)),
        }

        # Optional: the rule recorded in the execution context.
        if 'rule' in execution.context:
            rule_db = reference.get_model_from_ref(Rule, execution.context.get('rule', {}))
            attrs['rule'] = vars(RuleAPI.from_model(rule_db))

        # Optional: the trigger instance and the trigger / trigger type behind it.
        if 'trigger_instance' in execution.context:
            instance_id = execution.context.get('trigger_instance', {}).get('id', None)
            instance_db = TriggerInstance.get_by_id(instance_id)
            trigger_db = reference.get_model_by_resource_ref(db_api=Trigger,
                                                             ref=instance_db.trigger)
            trigger_type_db = reference.get_model_by_resource_ref(db_api=TriggerType,
                                                                  ref=trigger_db.type)
            # The instance is looked up a second time through the reference helper,
            # and that result is the one stored in the record.
            instance_db = reference.get_model_from_ref(
                TriggerInstance, execution.context.get('trigger_instance', {}))
            attrs['trigger_instance'] = vars(TriggerInstanceAPI.from_model(instance_db))
            attrs['trigger'] = vars(TriggerAPI.from_model(trigger_db))
            attrs['trigger_type'] = vars(TriggerTypeAPI.from_model(trigger_type_db))

        # Link to the parent's history record when one exists.
        parent_history = ActionExecutionHistory.get(
            execution__id=execution.context.get('parent', ''))
        if parent_history:
            attrs['parent'] = str(parent_history.id)
            child_key = str(new_history_id)
            if child_key not in parent_history.children:
                parent_history.children.append(child_key)
                ActionExecutionHistory.add_or_update(parent_history)

        ActionExecutionHistory.add_or_update(ActionExecutionHistoryDB(**attrs))
    except:
        LOG.exception('An unexpected error occurred while creating the '
                      'action execution history.')
        raise
开发者ID:bjoernbessert,项目名称:st2,代码行数:48,代码来源:history.py
示例17: test_schedule
def test_schedule(self):
    # Scenario: schedule a request and check that the stored execution matches
    # the request and is in the scheduled state.
    request = ActionExecutionDB(
        action=ACTION_REF,
        context={'user': USERNAME},
        parameters={'hosts': 'localhost', 'cmd': 'uname -a'})
    request = action_service.schedule(request)

    execution = ActionExecution.get_by_id(str(request.id))
    self.assertIsNotNone(execution)
    self.assertEqual(execution.id, request.id)

    # The stored action reference is "<pack>.<name>" of the fixture action.
    expected_action = '.'.join([self.actiondb.pack, self.actiondb.name])
    self.assertEqual(execution.action, expected_action)

    self.assertEqual(execution.context['user'], request.context['user'])
    self.assertDictEqual(execution.parameters, request.parameters)
    self.assertEqual(execution.status, ACTIONEXEC_STATUS_SCHEDULED)

    # mongoengine DateTimeField keeps only millisecond precision, so compare the
    # timestamps with microseconds dropped.
    stored_start = isotime.format(execution.start_timestamp, usec=False)
    requested_start = isotime.format(request.start_timestamp, usec=False)
    self.assertEqual(stored_start, requested_start)
开发者ID:bjoernbessert,项目名称:st2,代码行数:17,代码来源:test_action.py
示例18: get_actionexec_by_id
def get_actionexec_by_id(actionexec_id):
    """
    Look up an ActionExecution by its id.

    :param actionexec_id: Id passed to ActionExecution.get_by_id.
    :return: The object returned by ActionExecution.get_by_id.
    :raises StackStormDBObjectNotFoundError: If the lookup raises ValidationError
                                             or ValueError; the failure is logged
                                             first.
    """
    try:
        return ActionExecution.get_by_id(actionexec_id)
    except (ValidationError, ValueError) as e:
        LOG.error('Database lookup for actionexecution with id="%s" resulted in '
                  'exception: %s', actionexec_id, e)
        raise StackStormDBObjectNotFoundError('Unable to find actionexecution with '
                                              'id="%s"' % actionexec_id)
开发者ID:bjoernbessert,项目名称:st2,代码行数:17,代码来源:action_db.py
示例19: update_action_execution_history
def update_action_execution_history(self, body):
    """
    Refresh the execution snapshot in the history record for ``body.id``.

    The history record is polled up to ``self.timeout // self.wait`` times,
    sleeping ``self.wait`` between attempts. If it never shows up, the record is
    created through ``self.record_action_execution`` instead.

    :param body: Object whose ``id`` identifies the ActionExecution.

    Any exception is logged and re-raised.
    """
    try:
        # Floor division keeps the attempt count an integer; plain "/" yields a
        # float on Python 3, which range() rejects.
        attempts = int(self.timeout // self.wait)

        # Give the post event time to create the history record.
        for attempt in range(attempts):
            history = ActionExecutionHistory.get(execution__id=str(body.id))
            if history:
                execution = ActionExecution.get_by_id(str(body.id))
                history.execution = vars(ActionExecutionAPI.from_model(execution))
                ActionExecutionHistory.add_or_update(history)
                return
            # Sleep only when another attempt follows.
            if attempt < attempts - 1:
                eventlet.sleep(self.wait)

        # The wait failed: create the history record regardless. The original
        # guarded this with "i >= count" inside range(count), which can never be
        # true, so the fallback was unreachable.
        self.record_action_execution(body)
    except Exception:
        LOG.exception('An unexpected error occurred while updating the '
                      'action execution history.')
        raise
开发者ID:bjoernbessert,项目名称:st2,代码行数:20,代码来源:history.py
示例20: test_retry_on_timeout_policy_is_retried_twice
def test_retry_on_timeout_policy_is_retried_twice(self):
    # Scenario: the original execution times out and is retried; the retry times
    # out as well and is retried a second time.

    # Precondition: both collections start out empty.
    self.assertSequenceEqual(LiveAction.get_all(), [])
    self.assertSequenceEqual(ActionExecution.get_all(), [])

    # Request a mock action and mark it (and its execution) as timed out.
    request_db = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo'})
    live_action_db, execution_db = action_service.request(request_db)
    live_action_db.status = LIVEACTION_STATUS_TIMED_OUT
    execution_db.status = LIVEACTION_STATUS_TIMED_OUT
    LiveAction.add_or_update(live_action_db)
    ActionExecution.add_or_update(execution_db)

    # First policy run: expected to create the first retry.
    self.policy.apply_after(target=live_action_db)

    # Two objects of each kind now exist: the original and the first retry.
    stored_liveactions = LiveAction.get_all()
    stored_executions = ActionExecution.get_all()
    self.assertEqual(len(stored_liveactions), 2)
    self.assertEqual(len(stored_executions), 2)
    self.assertEqual(stored_executions[0].status, LIVEACTION_STATUS_TIMED_OUT)
    self.assertEqual(stored_executions[1].status, LIVEACTION_STATUS_REQUESTED)

    # The first retry must carry the retry policy bookkeeping.
    original_liveaction_id = stored_executions[0].liveaction['id']
    retry_context = stored_executions[1].context
    self.assertTrue('policies' in retry_context)
    retry_info = retry_context['policies']['retry']
    self.assertEqual(retry_info['retry_count'], 1)
    self.assertEqual(retry_info['applied_policy'], 'test_policy')
    self.assertEqual(retry_info['retried_liveaction_id'], original_liveaction_id)

    # Time out the first retry as well, which should trigger another retry.
    live_action_db = stored_liveactions[1]
    live_action_db.status = LIVEACTION_STATUS_TIMED_OUT
    LiveAction.add_or_update(live_action_db)
    execution_db = stored_executions[1]
    execution_db.status = LIVEACTION_STATUS_TIMED_OUT
    ActionExecution.add_or_update(execution_db)

    # Second policy run: expected to create the second retry.
    self.policy.apply_after(target=live_action_db)

    # Three objects of each kind: the original plus two retries.
    stored_liveactions = LiveAction.get_all()
    stored_executions = ActionExecution.get_all()
    self.assertEqual(len(stored_liveactions), 3)
    self.assertEqual(len(stored_executions), 3)
    self.assertEqual(stored_executions[0].status, LIVEACTION_STATUS_TIMED_OUT)
    self.assertEqual(stored_executions[1].status, LIVEACTION_STATUS_TIMED_OUT)
    self.assertEqual(stored_executions[2].status, LIVEACTION_STATUS_REQUESTED)

    # The second retry must point at the first retry and count two retries.
    original_liveaction_id = stored_executions[1].liveaction['id']
    retry_context = stored_executions[2].context
    self.assertTrue('policies' in retry_context)
    retry_info = retry_context['policies']['retry']
    self.assertEqual(retry_info['retry_count'], 2)
    self.assertEqual(retry_info['applied_policy'], 'test_policy')
    self.assertEqual(retry_info['retried_liveaction_id'], original_liveaction_id)
开发者ID:Plexxi,项目名称:st2,代码行数:65,代码来源:test_retry_policy.py
注：本文中的st2common.persistence.action.ActionExecution类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论