本文整理汇总了Python中st2common.runners.base.get_runner函数的典型用法代码示例。如果您正苦于以下问题：Python get_runner函数的具体用法？Python get_runner怎么用？Python get_runner使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_runner函数的17个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _get_runner
def _get_runner(self, runnertype_db, action_db, liveaction_db):
    """Create a runner for ``runnertype_db`` and attach execution state to it.

    The runner is obtained from ``get_runner`` using the runner type's
    ``runner_module`` and is then populated, by attribute assignment, with
    the action, the live action, the matching ``ActionExecution`` record,
    the resolved entry point / libs paths, and the live action's context
    and callback.

    :param runnertype_db: Runner type object; ``runner_module`` is read.
    :param action_db: Action object; ``pack``, ``entry_point`` and ``name``
                      are read.
    :param liveaction_db: Live action object; ``id`` is read, and
                          ``context`` / ``callback`` when present.
    :return: The populated runner instance.
    """
    runner = get_runner(runnertype_db.runner_module)
    abs_entry_point = self._get_entry_point_abs_path(action_db.pack,
                                                     action_db.entry_point)

    runner.runner_type_db = runnertype_db
    runner.container_service = RunnerContainerService()

    # Action and live action details.
    runner.action = action_db
    runner.action_name = action_db.name
    runner.liveaction = liveaction_db
    liveaction_id = str(liveaction_db.id)
    runner.liveaction_id = liveaction_id

    # Execution record that belongs to this live action.
    execution_db = ActionExecution.get(liveaction__id=liveaction_id)
    runner.execution = execution_db
    runner.execution_id = str(execution_db.id)

    runner.entry_point = abs_entry_point

    # Fall back to empty dicts when the live action lacks these attributes.
    context = getattr(liveaction_db, 'context', dict())
    runner.context = context
    runner.callback = getattr(liveaction_db, 'callback', dict())

    runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
                                                          action_db.entry_point)

    # For a re-run, look up the ActionExecution the re-run is based on.
    rerun_ref_id = context.get('re-run', {}).get('ref')
    if rerun_ref_id:
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id)
    else:
        runner.rerun_ex_ref = None

    return runner
开发者ID:Pulsant,项目名称:st2,代码行数:25,代码来源:base.py
示例2: setUp
def setUp(self):
    """Run the parent ``setUp`` and stub ``run`` on the local shell runner class."""
    super(MistralAuthTest, self).setUp()

    # The stub is installed on the class (not an instance), so it applies to
    # every instance of that runner class.
    runner_cls = runners.get_runner('local-shell-cmd').__class__
    canned_result = (action_constants.LIVEACTION_STATUS_SUCCEEDED, NON_EMPTY_RESULT, None)
    runner_cls.run = mock.Mock(return_value=canned_result)
开发者ID:StackStorm,项目名称:st2,代码行数:7,代码来源:test_mistral_v2_auth.py
示例3: _invoke_post_run
def _invoke_post_run(self, actionexec_db, action_db):
    """Build a runner for ``action_db`` and call its ``post_run`` hook.

    :param actionexec_db: Execution object; ``id``, ``status`` and ``result``
                          are read, plus ``context`` / ``callback`` when
                          present.
    :param action_db: Action object; ``name``, ``pack``, ``entry_point`` and
                      ``runner_type["name"]`` are read.
    """
    runner_name = action_db.runner_type["name"]
    LOG.info(
        "Invoking post run for action execution %s. Action=%s; Runner=%s",
        actionexec_db.id,
        action_db.name,
        runner_name,
    )

    # Instantiate the action runner for this runner type.
    runnertype_db = get_runnertype_by_name(runner_name)
    runner = get_runner(runnertype_db.runner_module)

    # Populate the runner with everything it needs.
    runner.container_service = RunnerContainerService()
    runner.action = action_db
    runner.action_name = action_db.name
    runner.action_execution_id = str(actionexec_db.id)

    entry_point_path = RunnerContainerService.get_entry_point_abs_path(
        pack=action_db.pack, entry_point=action_db.entry_point
    )
    runner.entry_point = entry_point_path

    # Fall back to empty dicts when the execution lacks these attributes.
    runner.context = getattr(actionexec_db, "context", dict())
    runner.callback = getattr(actionexec_db, "callback", dict())

    libs_path = RunnerContainerService.get_action_libs_abs_path(
        pack=action_db.pack, entry_point=action_db.entry_point
    )
    runner.libs_dir_path = libs_path

    # Hand the final status and result to the runner's post_run hook.
    runner.post_run(actionexec_db.status, actionexec_db.result)
开发者ID:Pulsant,项目名称:st2,代码行数:28,代码来源:base.py
示例4: test_get_runner_module_fail
def test_get_runner_module_fail(self):
    """Check that no runner is produced for a runner module that is absent.

    ``get_runner`` may raise ``ActionRunnerCreateError``; that error is
    tolerated here, and the test then asserts that ``runner`` is falsy.
    """
    runnertype_db = RunnerTypeDB(name='dummy', runner_module='absent.module')
    runner = None
    try:
        runner = get_runner(runnertype_db.runner_module, runnertype_db.runner_module)
    except ActionRunnerCreateError:
        # Deliberately tolerated: ``runner`` keeps its initial None value.
        pass
    # The previous message ('TestRunner must be valid.') was copied from the
    # success test and contradicted what this assertion checks.
    self.assertFalse(runner, 'No runner must be created for an absent runner module.')
开发者ID:lyandut,项目名称:st2,代码行数:8,代码来源:test_runner_container.py
示例5: test_callback_incomplete_state
def test_callback_incomplete_state(self):
    """With the runner reporting RUNNING, the execution update must not be called."""
    running_status = action_constants.LIVEACTION_STATUS_RUNNING

    # Stub ``run`` on the local shell runner class to report RUNNING.
    runner_cls = runners.get_runner('local-shell-cmd').__class__
    stub_result = (running_status, NON_EMPTY_RESULT, None)
    runner_cls.run = mock.Mock(return_value=stub_result)

    requested = self.get_liveaction_instance()
    requested, _execution = action_service.request(requested)
    self._wait_on_status(requested, stub_result[0])

    update_mock = action_executions.ActionExecutionManager.update
    self.assertFalse(update_mock.called)
开发者ID:StackStorm,项目名称:st2,代码行数:8,代码来源:test_mistral_v2_callback.py
示例6: test_pre_run_runner_is_disabled
def test_pre_run_runner_is_disabled(self):
    """``pre_run`` must raise ``ValueError`` when the runner type is disabled.

    The runner type object is the class-level
    ``RunnerContainerTest.runnertype_db``, so its ``enabled`` flag is put
    back afterwards to stop the change leaking into other tests.
    """
    runnertype_db = RunnerContainerTest.runnertype_db
    runner = get_runner(runnertype_db.runner_module, runnertype_db.runner_module)
    runner.runner_type = runnertype_db

    # Remember the flag so the shared object can be restored.
    previous_enabled = runnertype_db.enabled
    runner.runner_type.enabled = False
    try:
        expected_msg = 'Runner "test-runner-1" has been disabled by the administrator'
        self.assertRaisesRegexp(ValueError, expected_msg, runner.pre_run)
    finally:
        runnertype_db.enabled = previous_enabled
开发者ID:lyandut,项目名称:st2,代码行数:9,代码来源:test_runner_container.py
示例7: test_callback_retry
def test_callback_retry(self):
    """After a SUCCEEDED run, the execution update mock must show two SUCCESS calls."""
    succeeded = action_constants.LIVEACTION_STATUS_SUCCEEDED

    # Stub ``run`` on the local shell runner class to report SUCCEEDED.
    runner_cls = runners.get_runner('local-shell-cmd').__class__
    runner_cls.run = mock.Mock(return_value=(succeeded, NON_EMPTY_RESULT, None))

    requested = self.get_liveaction_instance()
    requested, _execution = action_service.request(requested)
    self._wait_on_status(requested, action_constants.LIVEACTION_STATUS_SUCCEEDED)

    # Two identical update calls are expected.
    expected_call = call('12345', state='SUCCESS', output=NON_EMPTY_RESULT)
    expected_calls = [expected_call] * 2
    action_executions.ActionExecutionManager.update.assert_has_calls(expected_calls)
开发者ID:StackStorm,项目名称:st2,代码行数:10,代码来源:test_mistral_v2_callback.py
示例8: test_cancel_on_task_action_concurrency_by_attr
def test_cancel_on_task_action_concurrency_by_attr(self):
    """One workflow past the policy threshold must trigger the runner's cancel.

    Launches ``threshold`` workflow instances and waits for each to reach
    RUNNING. It then launches one more with a mistral callback and asserts
    that the (patched) mistral runner ``cancel`` was called exactly once.
    """
    # Remove the other policies in the test pack to avoid conflicts.
    policy_ref = 'mistral_tests.cancel_on_concurrency_by_attr'
    self._drop_all_other_policies(policy_ref)

    # Read the threshold from the policy.
    policy_db = Policy.get_by_ref(policy_ref)
    threshold = policy_db.parameters.get('threshold', 0)
    self.assertGreater(threshold, 0)

    params = {'friend': 'grande animalerie'}
    running_status = action_constants.LIVEACTION_STATUS_RUNNING

    # Launch workflow instances up to the threshold.
    for _ in range(threshold):
        liveaction = LiveActionDB(action=WF1_NAME, parameters=params)
        liveaction, _execution = action_service.request(liveaction)
        liveaction = LiveAction.get_by_id(str(liveaction.id))
        liveaction = self._wait_on_status(liveaction, running_status)

    # The number of running instances must equal the threshold.
    running_count = LiveAction.count(
        action=WF1_NAME,
        status=action_constants.LIVEACTION_STATUS_RUNNING,
        parameters__friend=params['friend'],
    )
    self.assertEqual(running_count, threshold)

    # Patch the mistral runner's cancel method so the call can be asserted.
    mistral_runner_cls = runners.get_runner('mistral-v2').__class__
    canceling_result = (action_constants.LIVEACTION_STATUS_CANCELING, None, None)
    cancel_mock = mock.MagicMock(return_value=canceling_result)

    with mock.patch.object(mistral_runner_cls, 'cancel', cancel_mock):
        # Launch one more instance with a mistral callback defined, to
        # indicate that it is executed under a workflow.
        callback = {
            'source': MISTRAL_RUNNER_NAME,
            'url': 'http://127.0.0.1:8989/v2/action_executions/12345',
        }
        extra_liveaction = LiveActionDB(action=WF1_NAME, parameters=params,
                                        callback=callback)
        extra_liveaction, _extra_execution = action_service.request(extra_liveaction)
        extra_liveaction = LiveAction.get_by_id(str(extra_liveaction.id))

        # Wait for CANCELING, then check cancel was invoked.
        extra_liveaction = self._wait_on_status(
            extra_liveaction,
            action_constants.LIVEACTION_STATUS_CANCELING,
        )
        mistral_runner_cls.cancel.assert_called_once_with()
开发者ID:nzlosh,项目名称:st2,代码行数:54,代码来源:test_mistral_v2_policy.py
示例9: test_callback_paused_state
def test_callback_paused_state(self):
    """A PAUSED run must be reported through the update mock with the mapped state."""
    paused_status = action_constants.LIVEACTION_STATUS_PAUSED

    # Stub ``run`` on the local shell runner class to report PAUSED.
    runner_cls = runners.get_runner('local-shell-cmd').__class__
    stub_result = (paused_status, NON_EMPTY_RESULT, None)
    runner_cls.run = mock.Mock(return_value=stub_result)

    # State expected in the update call, looked up via ``self.status_map``.
    expected_state = self.status_map[stub_result[0]]

    requested = self.get_liveaction_instance()
    requested, _execution = action_service.request(requested)
    self._wait_on_status(requested, stub_result[0])

    update_mock = action_executions.ActionExecutionManager.update
    update_mock.assert_called_with('12345', state=expected_state, output=NON_EMPTY_RESULT)
开发者ID:StackStorm,项目名称:st2,代码行数:11,代码来源:test_mistral_v2_callback.py
示例10: test_callback_retry_exhausted
def test_callback_retry_exhausted(self):
    """After a SUCCEEDED run, check the SUCCESS calls recorded on the update mock."""
    succeeded = action_constants.LIVEACTION_STATUS_SUCCEEDED

    # Stub ``run`` on the local shell runner class to report SUCCEEDED.
    runner_cls = runners.get_runner('local-shell-cmd').__class__
    runner_cls.run = mock.Mock(return_value=(succeeded, NON_EMPTY_RESULT, None))

    requested = self.get_liveaction_instance()
    requested, _execution = action_service.request(requested)
    self._wait_on_status(requested, action_constants.LIVEACTION_STATUS_SUCCEEDED)

    # Per the original note: the mock for
    # action_executions.ActionExecutionManager.update is set up elsewhere to
    # fail the first 4 times and succeed on the 5th, with max attempts set
    # to 3, so only 3 calls are expected to reach the update method.
    # NOTE(review): only 2 calls are asserted below although the note says 3;
    # confirm which number is intended.
    expected_call = call('12345', state='SUCCESS', output=NON_EMPTY_RESULT)
    expected_calls = [expected_call] * 2
    action_executions.ActionExecutionManager.update.assert_has_calls(expected_calls)
开发者ID:StackStorm,项目名称:st2,代码行数:13,代码来源:test_mistral_v2_callback.py
示例11: test_resume_option_reset_tasks
def test_resume_option_reset_tasks(self):
    """A re-run with ``tasks`` and ``reset`` must pass per-task reset flags on resume.

    ``resume_workflow`` on the mistral runner class is patched. The first
    request must not call it. The re-run request must call it with the first
    execution as ``ex_ref`` and a ``task_specs`` dict with ``reset`` True
    for 'y' and False for 'x'.
    """
    mistral_runner_cls = runners.get_runner('mistral-v2').__class__

    resume_result = (
        action_constants.LIVEACTION_STATUS_RUNNING,
        {'tasks': []},
        {'execution_id': str(uuid.uuid4())},
    )
    resume_mock = mock.MagicMock(return_value=resume_result)

    with mock.patch.object(mistral_runner_cls, 'resume_workflow', resume_mock):
        # Initial run: resume must not be involved.
        first_liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS)
        first_liveaction, first_execution = action_service.request(first_liveaction)
        self.assertFalse(mistral_runner_cls.resume_workflow.called)

        # Rerun the execution.
        rerun_spec = {
            'ref': first_execution.id,
            'tasks': ['x', 'y'],
            'reset': ['y'],
        }
        context = {'re-run': rerun_spec}

        rerun_liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS,
                                        context=context)
        rerun_liveaction, _rerun_execution = action_service.request(rerun_liveaction)
        rerun_liveaction = self._wait_on_status(
            rerun_liveaction,
            action_constants.LIVEACTION_STATUS_RUNNING,
        )

        expected_task_specs = {
            'x': {'reset': False},
            'y': {'reset': True},
        }
        mistral_runner_cls.resume_workflow.assert_called_with(
            ex_ref=first_execution,
            task_specs=expected_task_specs,
        )
开发者ID:StackStorm,项目名称:st2,代码行数:46,代码来源:test_mistral_v2_rerun.py
示例12: _get_runner
def _get_runner(self, runner_type_db, action_db, liveaction_db):
    """Create a runner by name and attach execution state to it.

    For the Python runner (``name == 'python-script'`` or
    ``runner_module == 'python_runner'``) the pack config is loaded for the
    user found in the live action context and passed to ``get_runner``;
    otherwise ``config`` is ``None``.

    :param runner_type_db: Runner type object; ``name`` and
                           ``runner_module`` are read.
    :param action_db: Action object; ``pack``, ``entry_point`` and ``name``
                      are read.
    :param liveaction_db: Live action object; ``id`` is read, and
                          ``context`` / ``callback`` when present.
    :return: The populated runner instance.
    """
    abs_entry_point = self._get_entry_point_abs_path(action_db.pack, action_db.entry_point)

    # Fall back to an empty context when the live action has none.
    context = getattr(liveaction_db, 'context', dict())
    user = context.get('user', cfg.CONF.system_user.user)

    # Note: Right now configs are only supported by the Python runner actions
    uses_python_runner = (
        runner_type_db.name == 'python-script' or
        runner_type_db.runner_module == 'python_runner'
    )
    config = None
    if uses_python_runner:
        LOG.debug('Loading config from pack for python runner.')
        loader = ContentPackConfigLoader(pack_name=action_db.pack, user=user)
        config = loader.get_config()

    runner = get_runner(name=runner_type_db.name, config=config)

    # TODO: Pass those arguments to the constructor instead of late
    # assignment, late assignment is awful
    runner.runner_type = runner_type_db
    runner.action = action_db
    runner.action_name = action_db.name
    runner.liveaction = liveaction_db
    liveaction_id = str(liveaction_db.id)
    runner.liveaction_id = liveaction_id

    # Execution record that belongs to this live action.
    execution_db = ActionExecution.get(liveaction__id=liveaction_id)
    runner.execution = execution_db
    runner.execution_id = str(execution_db.id)

    runner.entry_point = abs_entry_point
    runner.context = context
    runner.callback = getattr(liveaction_db, 'callback', dict())
    runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
                                                          action_db.entry_point)

    # For a re-run, look up the ActionExecution the re-run is based on.
    rerun_ref_id = context.get('re-run', {}).get('ref')
    if rerun_ref_id:
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id)
    else:
        runner.rerun_ex_ref = None

    return runner
开发者ID:nzlosh,项目名称:st2,代码行数:37,代码来源:base.py
示例13: test_get_runner_module
def test_get_runner_module(self):
    """``get_runner`` must return a runner for the test runner type's module."""
    runnertype_db = RunnerContainerTest.runnertype_db
    runner = get_runner(runnertype_db.runner_module, runnertype_db.runner_module)
    # assertIsNotNone states the intent directly instead of wrapping an
    # ``is not None`` comparison in assertTrue.
    self.assertIsNotNone(runner, 'TestRunner must be valid.')
开发者ID:lyandut,项目名称:st2,代码行数:4,代码来源:test_runner_container.py
示例14: test_get_runner_module
def test_get_runner_module(self):
    """``get_runner`` must return a runner for the name 'local-shell-script'."""
    runner = get_runner(name='local-shell-script')
    # assertIsNotNone states the intent directly instead of wrapping an
    # ``is not None`` comparison in assertTrue.
    self.assertIsNotNone(runner, 'TestRunner must be valid.')
开发者ID:StackStorm,项目名称:st2,代码行数:3,代码来源:test_runner_container.py
示例15: get_runner_class
def get_runner_class(cls, package_name, module_name):
    """Return the class of the runner that ``runners.get_runner`` creates.

    :param package_name: First positional argument forwarded to ``get_runner``.
    :param module_name: Second positional argument forwarded to ``get_runner``.
    """
    runner = runners.get_runner(package_name, module_name)
    return runner.__class__
开发者ID:lyandut,项目名称:st2,代码行数:2,代码来源:test_mistral_v2_auth.py
示例16: get_runner_class
def get_runner_class(cls, runner_name):
    """Return the class of the runner created for ``runner_name``.

    ``runner_name`` is passed to ``runners.get_runner`` as both positional
    arguments.
    """
    runner = runners.get_runner(runner_name, runner_name)
    return runner.__class__
开发者ID:nzlosh,项目名称:st2,代码行数:2,代码来源:test_output_schema.py
示例17: test_get_runner_success
def test_get_runner_success(self):
    """``get_runner('local-shell-cmd')`` must yield a ``LocalShellCommandRunner``."""
    runner = get_runner('local-shell-cmd')
    self.assertTrue(runner)

    runner_cls_name = runner.__class__.__name__
    self.assertEqual(runner_cls_name, 'LocalShellCommandRunner')
开发者ID:nzlosh,项目名称:st2,代码行数:4,代码来源:test_runners_base.py
注：本文中的st2common.runners.base.get_runner函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论