本文整理汇总了Python中simpleflow.swf.executor.Executor类的典型用法代码示例。如果您正苦于以下问题：Python Executor类的具体用法？Python Executor怎么用？Python Executor使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Executor类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_activity_task_timeout
def test_activity_task_timeout():
    """A timed-out activity without retry must fail the workflow.

    The history holds a single ``increment`` activity whose last state is
    ``timed_out`` with timeout type ``START_TO_CLOSE``. Replaying it must
    produce exactly one ``FailWorkflowExecution`` decision whose reason
    mentions the timeout.
    """
    workflow = TestDefinition
    executor = Executor(DOMAIN, workflow)
    history = builder.History(workflow)
    decision_id = history.last_id
    history.add_activity_task(
        increment,
        activity_id='activity-tests.test_dataflow.increment-1',
        decision_id=decision_id,
        last_state='timed_out',
        timeout_type='START_TO_CLOSE')

    decisions, _ = executor.replay(history)

    # The task timed out and there is no retry.
    assert len(decisions) == 1

    expected_reason = (
        "Cannot replay the workflow: MultipleExceptions("
        "('futures failed', [TimeoutError(START_TO_CLOSE)]))"
    )
    # Only the decision type and reason are checked; the previously built
    # (and never used) expected decision object was dead code.
    decision = decisions[0]
    assert decision.type == 'FailWorkflowExecution'
    assert decision['failWorkflowExecutionDecisionAttributes']['reason'] == expected_reason
开发者ID:someboredkiddo,项目名称:simpleflow,代码行数:28,代码来源:test_dataflow.py
示例2: test_workflow_failed_from_definition
def test_workflow_failed_from_definition():
    """Replay a history with a failed activity and expect a failed workflow.

    The single decision returned must equal a ``fail`` decision whose reason
    is ``'Workflow execution failed: error'``.
    """
    definition = TestDefinitionFailWorkflow
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    # Record the activity directly in the ``failed`` state so that the
    # executor fails the workflow on the next replay.
    history.add_activity_task(
        raise_error,
        decision_id=history.last_id,
        activity_id='activity-tests.test_dataflow.raise_error-1',
        last_state='failed',
        result=json.dumps(None),
    )
    history.add_decision_task_scheduled().add_decision_task_started()

    # The workflow definition calls ``Workflow.fail('error')``, which should
    # fail the whole workflow.
    decisions, _ = executor.replay(history)
    assert executor._workflow.failed is True

    expected = swf.models.decision.WorkflowExecutionDecision()
    expected.fail(reason='Workflow execution failed: error')
    assert decisions[0] == expected
开发者ID:darkjh,项目名称:simpleflow,代码行数:28,代码来源:test_dataflow.py
示例3: test_more_than_1000_open_activities_scheduled_and_running
def test_more_than_1000_open_activities_scheduled_and_running():
    """No new decision is emitted while the open-activity limit is reached.

    The executor first schedules activities in batches of
    ``constants.MAX_DECISIONS``. Once ``constants.MAX_OPEN_ACTIVITY_COUNT``
    activities are recorded as either ``scheduled`` or ``started``, a further
    replay must return an empty decision list.
    """
    def get_random_state():
        # Local import kept so the helper stays self-contained.
        import random
        return random.choice(['scheduled', 'started'])

    workflow = TestDefinitionMoreThanMaxOpenActivities
    executor = Executor(DOMAIN, workflow)
    history = builder.History(workflow)

    # The first time, the executor should schedule
    # ``constants.MAX_OPEN_ACTIVITY_COUNT`` decisions.
    # No timer because we wait for at least an activity to complete.
    # ``//`` keeps the batch count an integer on Python 3 as well as Python 2,
    # and ``range`` exists on both (``xrange`` is Python 2 only).
    batch_count = constants.MAX_OPEN_ACTIVITY_COUNT // constants.MAX_DECISIONS
    for _batch in range(batch_count):
        decisions, _ = executor.replay(history)
        assert len(decisions) == constants.MAX_DECISIONS

    decision_id = history.last_id
    for i in range(constants.MAX_OPEN_ACTIVITY_COUNT):
        history.add_activity_task(
            increment,
            decision_id=decision_id,
            activity_id='activity-tests.test_dataflow.increment-{}'.format(
                i + 1),
            last_state=get_random_state(),
            result=i + 1)
    (history
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # Every slot is taken by an open activity: nothing more can be scheduled.
    decisions, _ = executor.replay(history)
    assert len(decisions) == 0
开发者ID:someboredkiddo,项目名称:simpleflow,代码行数:31,代码来源:test_dataflow.py
示例4: test_workflow_activity_raises_on_failure
def test_workflow_activity_raises_on_failure():
    """A failed ``raise_on_failure`` activity must fail the workflow.

    The returned decision must equal a ``fail`` decision carrying the
    ``'DETAILS'`` details and a reason naming the failing task.
    """
    definition = TestDefinitionActivityRaisesOnFailure
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    history.add_activity_task(
        raise_on_failure,
        decision_id=history.last_id,
        activity_id='activity-tests.data.activities.raise_on_failure-1',
        last_state='failed',
        reason='error',
    )
    history.add_decision_task_scheduled().add_decision_task_started()

    # The executor should fail the workflow and extract the reason from the
    # exception raised in the workflow definition.
    decisions, _ = executor.replay(history)
    assert executor._workflow.failed is True

    expected = swf.models.decision.WorkflowExecutionDecision()
    expected.fail(
        details='DETAILS',
        reason='Workflow execution error in task '
               'activity-tests.data.activities.raise_on_failure: '
               '"error"',
    )
    assert decisions[0] == expected
开发者ID:zenefits,项目名称:simpleflow,代码行数:30,代码来源:test_dataflow.py
示例5: test_multiple_scheduled_activities
def test_multiple_scheduled_activities():
    """
    Regression test for a blocking ``Future.exception``.

    When ``Future.exception`` was made blocking for unfinished futures,
    :py:meth:`swf.executor.Executor.resume` did not check ``future.finished``
    before testing ``future.exception is None``. As a result ``.resume()``
    blocked on the first scheduled task it met instead of returning it.
    This issue was fixed in commit 6398aa8.

    With the wrong behaviour, ``executor.replay()`` would not schedule the
    ``double`` task even after the task represented by *b*
    (``self.submit(increment, 2)``) has completed.
    """
    definition = TestMultipleScheduledActivitiesDefinition
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)
    first_decision_id = history.last_id

    (history
     # First ``increment`` is only scheduled, i.e. still pending.
     .add_activity_task_scheduled(
         increment,
         decision_id=first_decision_id,
         activity_id='activity-tests.data.activities.increment-1',
         input={'args': 1})
     # The right behaviour is to schedule the ``double`` task when *b* is in
     # state finished.
     .add_activity_task(
         increment,
         decision_id=first_decision_id,
         activity_id='activity-tests.data.activities.increment-2',
         last_state='completed',
         input={'args': 2},
         result='3'))

    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], double)
开发者ID:zenefits,项目名称:simpleflow,代码行数:34,代码来源:test_dataflow.py
示例6: test_workflow_with_input
def test_workflow_with_input():
    """Run a one-task workflow started with input ``(4,)`` to completion.

    First replay: the *increment* task is scheduled. After the task is
    recorded as completed, the second replay must complete the workflow with
    the JSON-encoded task result.
    """
    definition = TestDefinitionWithInput
    executor = Executor(DOMAIN, definition)
    result = 5
    history = builder.History(definition, input={'args': (4,)})

    # The executor should only schedule the *increment* task.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment)

    # Simulate the completion of the task by adding it to the history.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         increment,
         decision_id=last_decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.increment-1',
         input={'args': 1},
         result=result)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # There is only a single task, so the executor should now complete the
    # workflow and set its result accordingly.
    decisions, _ = executor.replay(history)
    expected = swf.models.decision.WorkflowExecutionDecision()
    expected.complete(result=json.dumps(result))
    assert decisions[0] == expected
开发者ID:darkjh,项目名称:simpleflow,代码行数:31,代码来源:test_dataflow.py
示例7: test_on_failure_callback
def test_on_failure_callback():
    """Replaying a failed ``raise_error`` activity must fail the workflow.

    The expected decision is a ``fail`` decision whose reason is
    ``'Workflow execution failed: FAIL'``.
    """
    definition = TestOnFailureDefinition
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    history.add_activity_task(
        raise_error,
        decision_id=history.last_id,
        activity_id='activity-tests.test_dataflow.raise_error-1',
        last_state='failed',
        reason='error',
    )
    history.add_decision_task_scheduled().add_decision_task_started()

    # The executor should fail the workflow and extract the reason from the
    # exception raised in the workflow definition.
    decisions, _ = executor.replay(history)
    assert executor._workflow.failed is True

    expected = swf.models.decision.WorkflowExecutionDecision()
    expected.fail(reason='Workflow execution failed: FAIL')
    assert decisions[0] == expected
开发者ID:darkjh,项目名称:simpleflow,代码行数:27,代码来源:test_dataflow.py
示例8: test_workflow_with_two_tasks_same_future
def test_workflow_with_two_tasks_same_future():
    """Two tasks depending on the same future are scheduled together.

    Replay 1 schedules ``increment``; replay 2 schedules ``double`` and
    ``increment``; replay 3 completes the workflow with ``(4, 3)``.
    """
    definition = TestDefinitionTwoTasksSameFuture
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    # ``b.result`` and ``c.result`` require the execution of ``double(a)`` and
    # ``increment(a)``. Both depend on ``increment(1)``, so the executor
    # should schedule ``increment(1)`` first.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment)

    # Simulate the completion of ``increment(1)``.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         increment,
         decision_id=last_decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.increment-1',
         input={'args': 1},
         result=2)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # ``a.result`` is now available: ``double(a)`` and ``increment(a)`` should
    # be scheduled at the same time.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], double)
    check_task_scheduled_decision(decisions[1], increment)

    # Simulate the completion of both tasks.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         double,
         decision_id=last_decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.double-1',
         input={'args': 2},
         result=4)
     .add_activity_task(
         increment,
         decision_id=last_decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.increment-2',
         input={'args': 2},
         result=3)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # Both tasks completed, hence the executor should complete the workflow.
    decisions, _ = executor.replay(history)
    expected = swf.models.decision.WorkflowExecutionDecision()
    expected.complete(result=json.dumps((4, 3)))
    assert decisions[0] == expected
开发者ID:darkjh,项目名称:simpleflow,代码行数:54,代码来源:test_dataflow.py
示例9: test_workflow_with_after_run
def test_workflow_with_after_run():
    """Replaying the workflow sets attribute ``b`` on the workflow instance.

    NOTE(review): judging by the test name, ``b`` is presumably assigned by
    the workflow's after-run hook -- confirm against the definition.
    """
    definition = TestDefinitionWithAfterRun
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition, input={'args': (4,)})

    # Before the replay the workflow instance has no ``b`` attribute.
    assert not hasattr(executor._workflow, 'b')

    decisions, _ = executor.replay(history)

    # After the replay ``b`` holds 5.
    assert executor._workflow.b == 5
开发者ID:someboredkiddo,项目名称:simpleflow,代码行数:11,代码来源:test_dataflow.py
示例10: test_workflow_with_timeout_override
def test_workflow_with_timeout_override():
    """A ``task_start_to_close_timeout`` workflow input overrides the timeout.

    The workflow is started with ``task_start_to_close_timeout='100'``; the
    scheduled activity's ``startToCloseTimeout`` must equal 100 plus the soft
    and hard timeout buffers from ``default``, rendered as a string.
    """
    workflow = TestDefinitionWithInputWithTaskTimeoutOverride
    executor = Executor(DOMAIN, workflow)
    history = builder.History(
        workflow,
        input={
            'args': (4,),
            'kwargs': {'task_start_to_close_timeout': '100'},
        })

    # The first decision is inspected as a schedule-activity decision.
    decisions, _ = executor.replay(history)

    expected_timeout = str(
        100
        + int(default.ACTIVITY_SOFT_TIMEOUT_BUFFER)
        + int(default.ACTIVITY_HARD_TIMEOUT_BUFFER))
    attributes = decisions[0]['scheduleActivityTaskDecisionAttributes']
    assert attributes['startToCloseTimeout'] == expected_timeout
开发者ID:zenefits,项目名称:simpleflow,代码行数:12,代码来源:test_dataflow.py
示例11: test_workflow_retry_activity_failed_again
def test_workflow_retry_activity_failed_again():
    """An activity that fails again after its retry fails the workflow.

    Replay 1 schedules ``increment_retry``; after a first failure, replay 2
    schedules it again; after a second failure, replay 3 must return a
    ``FailWorkflowExecution`` decision whose reason reports ``TaskFailed``.
    """
    workflow = TestDefinitionRetryActivity
    executor = Executor(DOMAIN, workflow)
    history = builder.History(workflow)

    # There is a single task, hence the executor should schedule it first.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment_retry)

    # Let's add the task in ``failed`` state.
    decision_id = history.last_id
    (history
     .add_activity_task(
         increment_retry,
         decision_id=decision_id,
         last_state='failed',
         activity_id='activity-tests.test_dataflow.increment_retry-1')
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # As the retry value is one, the executor should retry i.e. schedule the
    # task again.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment_retry)

    # Let's add the task in ``failed`` state again.
    decision_id = history.last_id
    (history
     .add_activity_task(
         increment_retry,
         decision_id=decision_id,
         last_state='failed',
         activity_id='activity-tests.test_dataflow.increment_retry-1')
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # There is no more retry: the executor should fail the workflow and
    # report the task failure as the reason.
    decisions, _ = executor.replay(history)
    expected_reason = (
        "Cannot replay the workflow: TaskFailed("
        "('activity-tests.test_dataflow.increment_retry-1', 'REASON', 'DETAILS'))"
    )
    decision = decisions[0]
    assert decision.type == 'FailWorkflowExecution'
    assert decision['failWorkflowExecutionDecisionAttributes']['reason'] == expected_reason
开发者ID:someboredkiddo,项目名称:simpleflow,代码行数:52,代码来源:test_dataflow.py
示例12: test_workflow_with_same_task_called_two_times
def test_workflow_with_same_task_called_two_times():
    """
    Check how the executor behaves when the same task is executed two times
    with a different argument.

    Replay 1 schedules the first ``increment``; replay 2 schedules the second
    one; replay 3 completes the workflow with result ``3``.
    """
    workflow = TestDefinitionSameTask
    executor = Executor(DOMAIN, workflow)
    history = builder.History(workflow)

    # As the second task depends on the first, the executor should only
    # schedule the first task.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment)

    # Let's add the task to the history to simulate its completion.
    decision_id = history.last_id
    (history
     .add_activity_task(
         increment,
         decision_id=decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.increment-1',
         input={'args': 1},
         result=2)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # The first task is finished, the executor should schedule the second one.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment)

    # Let's add the task to the history to simulate its completion.
    decision_id = history.last_id
    (history
     .add_activity_task(
         increment,
         decision_id=decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.increment-2',
         input={'args': 2},
         result=3)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # The executor should now complete the workflow.
    decisions, _ = executor.replay(history)
    workflow_completed = swf.models.decision.WorkflowExecutionDecision()
    workflow_completed.complete(result=json.dumps(3))
    assert decisions[0] == workflow_completed
开发者ID:darkjh,项目名称:simpleflow,代码行数:51,代码来源:test_dataflow.py
示例13: test_workflow_with_two_tasks_not_completed
def test_workflow_with_two_tasks_not_completed():
    """
    Check how the executor behaves when a task is still running.

    While the ``increment`` task is only ``started``, a replay must return no
    decision. Once the task is recorded as completed, the next replay must
    complete the workflow with the task result.
    """
    workflow = TestDefinitionWithInput
    executor = Executor(DOMAIN, workflow)
    arg = 4
    result = 5
    history = builder.History(workflow, input={'args': (arg,)})

    # The executor should schedule *increment*.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment)

    # Let's add the task in state ``started`` to the history.
    decision_id = history.last_id
    # The completion event added below refers to ``decision_id + 1`` as the
    # scheduled event and the id after it as the started event.
    scheduled_id = decision_id + 1
    (history
     .add_activity_task(
         increment,
         decision_id=decision_id,
         last_state='started',
         activity_id='activity-tests.test_dataflow.increment-1',
         input={'args': 1},
         result=result)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # The executor cannot schedule any other task, it returns an empty
    # decision.
    decisions, _ = executor.replay(history)
    assert len(decisions) == 0

    # Let's now set the task as ``completed`` in the history.
    (history
     .add_activity_task_completed(
         scheduled=scheduled_id,
         started=scheduled_id + 1,
         result=result)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # As there is a single task and it is now finished, the executor should
    # complete the workflow.
    decisions, _ = executor.replay(history)
    workflow_completed = swf.models.decision.WorkflowExecutionDecision()
    workflow_completed.complete(result=json.dumps(result))
    assert decisions[0] == workflow_completed
开发者ID:darkjh,项目名称:simpleflow,代码行数:51,代码来源:test_dataflow.py
示例14: test_get_event_details
def test_get_event_details(self):
    """``get_event_details`` returns per-event dicts for known names only.

    A signal, a marker and a started-then-fired timer are added to the
    history. After a replay, their details (timestamps removed) must match
    the expected dicts, and lookups of unknown names must yield ``None``.
    """
    history = builder.History(ExampleWorkflow, input={})
    signal_input = {'x': 42, 'foo': 'bar', '__propagate': False}
    marker_details = {'baz': 'bae'}
    history.add_signal('a_signal', signal_input)
    history.add_marker('a_marker', marker_details)
    history.add_timer_started('a_timer', 1, decision_id=2)
    history.add_timer_fired('a_timer')

    executor = Executor(DOMAIN, ExampleWorkflow)
    executor.replay(Response(history=history, execution=None))

    # --- signal -----------------------------------------------------------
    details = executor.get_event_details('signal', 'a_signal')
    del details['timestamp']
    expect(details).to.equal({
        'type': 'signal',
        'state': 'signaled',
        'name': 'a_signal',
        'input': signal_input,
        'event_id': 4,
        'external_initiated_event_id': 0,
        'external_run_id': None,
        'external_workflow_id': None,
    })
    details = executor.get_event_details('signal', 'another_signal')
    expect(details).to.be.none

    # --- marker -----------------------------------------------------------
    details = executor.get_event_details('marker', 'a_marker')
    del details['timestamp']
    expect(details).to.equal({
        'type': 'marker',
        'state': 'recorded',
        'name': 'a_marker',
        'details': marker_details,
        'event_id': 5,
    })
    details = executor.get_event_details('marker', 'another_marker')
    expect(details).to.be.none

    # --- timer ------------------------------------------------------------
    details = executor.get_event_details('timer', 'a_timer')
    del details['started_event_timestamp']
    del details['fired_event_timestamp']
    expect(details).to.equal({
        'type': 'timer',
        'state': 'fired',
        'id': 'a_timer',
        'decision_task_completed_event_id': 2,
        'start_to_fire_timeout': 1,
        'started_event_id': 6,
        'fired_event_id': 7,
        'control': None,
    })
    details = executor.get_event_details('timer', 'another_timer')
    expect(details).to.be.none
开发者ID:botify-labs,项目名称:simpleflow,代码行数:55,代码来源:test_executor.py
示例15: test_workflow_with_two_tasks
def test_workflow_with_two_tasks():
    """Two chained tasks are scheduled one after the other.

    Replay 1 schedules *increment*; replay 2 schedules *double*; replay 3
    completes the workflow with result ``4``.
    """
    definition = TestDefinition
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    # *double* requires the result of *increment*, held by the *a* future.
    # Hence the executor schedules *increment*.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment)

    # Simulate the completion of *increment*.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         increment,
         decision_id=last_decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.increment-1',
         input={'args': 1},
         result=2)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # ``a.result`` now contains the result of the finished *increment*.
    # ``return b.result`` requires computing *double* with ``a.result``, so
    # the executor should schedule *double*.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], double)

    # Simulate the completion of *double*.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         double,
         decision_id=last_decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.double-1',
         input={'args': 2},
         result=4)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # *double* has completed and ``b.result`` is available: the executor
    # should complete the workflow and set its result to ``b.result``.
    decisions, _ = executor.replay(history)
    expected = swf.models.decision.WorkflowExecutionDecision()
    expected.complete(result=json.dumps(4))
    assert decisions[0] == expected
开发者ID:darkjh,项目名称:simpleflow,代码行数:48,代码来源:test_dataflow.py
示例16: test_workflow_with_more_than_max_decisions
def test_workflow_with_more_than_max_decisions():
    """Decisions beyond ``constants.MAX_DECISIONS`` are deferred to later replays.

    Replay 1 returns ``constants.MAX_DECISIONS`` decisions, the last one being
    a ``StartTimer``. After the first batch completes, replay 2 returns the
    20 remaining decisions. After those complete, replay 3 completes the
    workflow with result ``'null'``.
    """
    workflow = TestDefinitionMoreThanMaxDecisions
    executor = Executor(DOMAIN, workflow)
    history = builder.History(workflow)

    # The first time, the executor should schedule ``constants.MAX_DECISIONS``
    # decisions and a timer to force the scheduling of the remaining tasks.
    decisions, _ = executor.replay(history)
    assert len(decisions) == constants.MAX_DECISIONS
    assert decisions[-1].type == 'StartTimer'

    decision_id = history.last_id
    # ``range`` works on both Python 2 and 3 (``xrange`` is Python 2 only).
    for i in range(constants.MAX_DECISIONS):
        history.add_activity_task(
            increment,
            decision_id=decision_id,
            activity_id='activity-tests.test_dataflow.increment-{}'.format(
                i + 1),
            last_state='completed',
            result=i + 1)
    (history
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # Once the first batch of ``constants.MAX_DECISIONS`` tasks is finished,
    # the executor should schedule the 20 remaining ones.
    decisions, _ = executor.replay(history)
    assert len(decisions) == 20

    for i in range(constants.MAX_DECISIONS - 1, constants.MAX_DECISIONS + 20):
        history.add_activity_task(
            increment,
            decision_id=decision_id,
            activity_id='activity-tests.test_dataflow.increment-{}'.format(
                i + 1),
            last_state='completed',
            result=i + 1)
    (history
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # All tasks are finished, the executor should complete the workflow.
    decisions, _ = executor.replay(history)
    workflow_completed = swf.models.decision.WorkflowExecutionDecision()
    workflow_completed.complete(result='null')
    assert decisions[0] == workflow_completed
开发者ID:darkjh,项目名称:simpleflow,代码行数:47,代码来源:test_dataflow.py
示例17: test_activity_task_timeout_retry
def test_activity_task_timeout_retry():
    """A timed-out ``increment_retry`` activity is scheduled again.

    The history holds one ``increment_retry`` activity in state ``timed_out``
    (``START_TO_CLOSE``); the replay must return a single decision that
    schedules ``increment_retry``.
    """
    definition = TestDefinitionRetryActivity
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    last_decision_id = history.last_id
    history.add_activity_task(
        increment_retry,
        activity_id='activity-tests.test_dataflow.increment_retry-1',
        decision_id=last_decision_id,
        last_state='timed_out',
        timeout_type='START_TO_CLOSE',
    )

    decisions, _ = executor.replay(history)
    assert len(decisions) == 1
    check_task_scheduled_decision(decisions[0], increment_retry)
开发者ID:darkjh,项目名称:simpleflow,代码行数:17,代码来源:test_dataflow.py
示例18: test_workflow_retry_activity_failed_again
def test_workflow_retry_activity_failed_again():
    """An activity failing again after its retry leaves a ``None`` result.

    Replay 1 schedules ``increment_retry``; after a first failure, replay 2
    schedules it again; after a second failure, replay 3 must complete the
    workflow with the JSON encoding of ``None``.
    """
    definition = TestDefinitionRetryActivity
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    # There is a single task, hence the executor should schedule it first.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment_retry)

    # Record the task in the ``failed`` state.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         increment_retry,
         decision_id=last_decision_id,
         last_state='failed',
         activity_id='activity-tests.test_dataflow.increment_retry-1')
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # As the retry value is one, the executor should retry, i.e. schedule the
    # task again.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment_retry)

    # Record the task in the ``failed`` state a second time.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         increment_retry,
         decision_id=last_decision_id,
         last_state='failed',
         activity_id='activity-tests.test_dataflow.increment_retry-1')
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # There is no more retry. The executor should set `Future.exception` and
    # complete the workflow as there is no further task.
    decisions, _ = executor.replay(history)
    expected = swf.models.decision.WorkflowExecutionDecision()
    # ``a.result`` is ``None`` because it was not set.
    expected.complete(result=json.dumps(None))
    assert decisions[0] == expected
开发者ID:darkjh,项目名称:simpleflow,代码行数:46,代码来源:test_dataflow.py
示例19: test_workflow_reuse_same_future
def test_workflow_reuse_same_future():
    """Walk a workflow that reuses one future through three replays.

    Replay 1 schedules *increment*; replay 2 schedules *double*; replay 3
    completes the workflow with result ``4``.
    """
    definition = TestDefinitionSameFuture
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    # *double* depends on *increment*, so the executor should only schedule
    # *increment* at first.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment)

    # Simulate the completion of *increment*.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         increment,
         decision_id=last_decision_id,
         last_state='completed',
         input={'args': 1},
         activity_id='activity-tests.test_dataflow.increment-1',
         result=2)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # *increment* is finished, the executor should schedule *double*.
    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], double)

    # Simulate the completion of *double*.
    last_decision_id = history.last_id
    (history
     .add_activity_task(
         double,
         decision_id=last_decision_id,
         last_state='completed',
         activity_id='activity-tests.test_dataflow.double-1',
         input={'args': 2},
         result=4)
     .add_decision_task_scheduled()
     .add_decision_task_started())

    # The executor should now complete the workflow.
    decisions, _ = executor.replay(history)
    expected = swf.models.decision.WorkflowExecutionDecision()
    expected.complete(result=json.dumps(4))
    assert decisions[0] == expected
开发者ID:darkjh,项目名称:simpleflow,代码行数:45,代码来源:test_dataflow.py
示例20: test_activity_not_found_schedule_failed
def test_activity_not_found_schedule_failed():
    """A schedule failure for a missing activity type leads to rescheduling.

    The history records a schedule-failed event for ``increment`` with cause
    ``ACTIVITY_TYPE_DOES_NOT_EXIST``; the replay's first decision must
    schedule ``increment``.
    """
    definition = TestDefinition
    executor = Executor(DOMAIN, definition)
    history = builder.History(definition)

    last_decision_id = history.last_id
    history.add_activity_task_schedule_failed(
        activity_id='activity-tests.test_dataflow.increment-1',
        decision_id=last_decision_id,
        activity_type={
            'name': increment.name,
            'version': increment.version,
        },
        cause='ACTIVITY_TYPE_DOES_NOT_EXIST',
    )

    decisions, _ = executor.replay(history)
    check_task_scheduled_decision(decisions[0], increment)
开发者ID:darkjh,项目名称:simpleflow,代码行数:18,代码来源:test_dataflow.py
注：本文中的simpleflow.swf.executor.Executor类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论