本文整理汇总了Python中st2actions.runners.pythonrunner.get_runner函数的典型用法代码示例。如果您正苦于以下问题：Python get_runner函数的具体用法？Python get_runner怎么用？Python get_runner使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_runner函数的15个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_simple_action_fail
def test_simple_action_fail(self):
    """Run the action at PACAL_ROW_ACTION_PATH with a string ``row_index``.

    The run must return a truthy value while the container service reports
    ACTIONEXEC_STATUS_FAILED.
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.entry_point = PACAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()

    # row_index is deliberately the string '4', not an integer.
    run_outcome = py_runner.run({'row_index': '4'})

    self.assertTrue(run_outcome)
    reported_status = py_runner.container_service.get_status()
    self.assertEqual(reported_status, ACTIONEXEC_STATUS_FAILED)
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:8,代码来源:test_pythonrunner.py
示例2: test_simple_action_no_file
def test_simple_action_no_file(self):
    """Run a runner whose ``entry_point`` is the empty string.

    The run must return a truthy value while the container service reports
    ACTIONEXEC_STATUS_FAILED.
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.entry_point = ''
    py_runner.container_service = service.RunnerContainerService()

    # No action parameters are supplied.
    run_outcome = py_runner.run({})

    self.assertTrue(run_outcome)
    reported_status = py_runner.container_service.get_status()
    self.assertEqual(reported_status, ACTIONEXEC_STATUS_FAILED)
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:8,代码来源:test_pythonrunner.py
示例3: test_simple_action_no_entry_point
def test_simple_action_no_entry_point(self):
    """Expect ``run`` to raise when the runner's ``entry_point`` is empty.

    The raised exception's message must match the "missing entry_point
    attribute" pattern.
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {}
    py_runner.entry_point = ''
    py_runner.container_service = service.RunnerContainerService()

    # NOTE(review): assertRaisesRegexp is the older spelling of
    # assertRaisesRegex; kept unchanged so the test behaves as before.
    with self.assertRaisesRegexp(Exception, 'Action .*? is missing entry_point attribute'):
        py_runner.run({})
开发者ID:maniacs-ops,项目名称:st2,代码行数:9,代码来源:test_pythonrunner.py
示例4: test_simple_action
def test_simple_action(self):
    """Run the action at PACAL_ROW_ACTION_PATH with ``row_index`` 4.

    The run must return a truthy value, the container service must report
    ACTIONEXEC_STATUS_SUCCEEDED, and the stored result must be the string
    '[1, 4, 6, 4, 1]'.
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.entry_point = PACAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()

    run_outcome = py_runner.run({'row_index': 4})

    self.assertTrue(run_outcome)
    reported_status = py_runner.container_service.get_status()
    self.assertEqual(reported_status, ACTIONEXEC_STATUS_SUCCEEDED)
    # The result is compared as a string here, not as a list.
    stored_result = py_runner.container_service.get_result()['result']
    self.assertEqual(stored_result, '[1, 4, 6, 4, 1]')
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:9,代码来源:test_pythonrunner.py
示例5: test_exception_in_simple_action_with_invalid_status
def test_exception_in_simple_action_with_invalid_status(self):
    """Expect ``run`` to raise ValueError when ``row_index`` is 'd'."""
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {}
    py_runner.entry_point = PASCAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    # Parameters are passed by keyword, exactly as in the callable form.
    with self.assertRaises(ValueError):
        py_runner.run(action_parameters={'row_index': 'd'})
开发者ID:maniacs-ops,项目名称:st2,代码行数:9,代码来源:test_pythonrunner.py
示例6: test_stdout_interception_and_parsing
def test_stdout_interception_and_parsing(self, mock_popen):
    """Check how the runner splits process stdout into ``stdout`` and ``result``.

    ``mock_popen`` (presumably a patched process constructor; confirm against
    the decorator, which is not visible here) is configured to return a fake
    process. Its stdout embeds the result between two
    ACTION_OUTPUT_RESULT_DELIMITER markers. Two scenarios run in order, each
    with a fresh runner.
    """
    fmt_args = {'delimiter': ACTION_OUTPUT_RESULT_DELIMITER}
    scenarios = (
        # No output to stdout and no result (implicit None)
        ('%(delimiter)sNone%(delimiter)s', ''),
        # Output to stdout and no result (implicit None)
        ('pre result%(delimiter)sNone%(delimiter)spost result', 'pre resultpost result'),
    )

    for stdout_template, expected_stdout in scenarios:
        fake_stdout = stdout_template % fmt_args
        fake_stderr = 'foo stderr'
        fake_process = mock.Mock()
        fake_process.communicate.return_value = (fake_stdout, fake_stderr)
        fake_process.returncode = 0
        mock_popen.return_value = fake_process

        py_runner = pythonrunner.get_runner()
        py_runner.action = self._get_mock_action_obj()
        py_runner.runner_parameters = {}
        py_runner.entry_point = PACAL_ROW_ACTION_PATH
        py_runner.container_service = service.RunnerContainerService()
        py_runner.pre_run()

        # run() is unpacked as a 3-tuple; only the middle element is checked.
        _, parsed_output, _ = py_runner.run({'row_index': 4})

        self.assertEqual(parsed_output['stdout'], expected_stdout)
        self.assertEqual(parsed_output['stderr'], fake_stderr)
        self.assertEqual(parsed_output['result'], 'None')
        self.assertEqual(parsed_output['exit_code'], 0)
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:44,代码来源:test_pythonrunner.py
示例7: test_simple_action_no_file
def test_simple_action_no_file(self):
    """Run a runner whose ``entry_point`` is 'foo.py'.

    The run must yield a non-None result and LIVEACTION_STATUS_FAILED.
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {}
    py_runner.entry_point = 'foo.py'
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    # run() is unpacked as a 3-tuple; the third element is ignored.
    run_status, run_result, _ = py_runner.run({})

    self.assertIsNotNone(run_result)
    self.assertEqual(run_status, LIVEACTION_STATUS_FAILED)
开发者ID:maniacs-ops,项目名称:st2,代码行数:10,代码来源:test_pythonrunner.py
示例8: test_simple_action_fail
def test_simple_action_fail(self):
    """Run the action at PASCAL_ROW_ACTION_PATH with a string ``row_index``.

    The run must yield a non-None result and LIVEACTION_STATUS_FAILED.
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {}
    py_runner.entry_point = PASCAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    # row_index is deliberately the string '4', not an integer.
    run_status, run_result, _ = py_runner.run({'row_index': '4'})

    self.assertIsNotNone(run_result)
    self.assertEqual(run_status, LIVEACTION_STATUS_FAILED)
开发者ID:maniacs-ops,项目名称:st2,代码行数:10,代码来源:test_pythonrunner.py
示例9: test_simple_action_with_status_succeeded
def test_simple_action_with_status_succeeded(self):
    """Run the action at PASCAL_ROW_ACTION_PATH with ``row_index`` 4.

    The run must yield LIVEACTION_STATUS_SUCCEEDED and an output whose
    'result' entry is the list [1, 4, 6, 4, 1].
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {}
    py_runner.entry_point = PASCAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    run_status, run_output, _ = py_runner.run({'row_index': 4})

    self.assertEqual(run_status, LIVEACTION_STATUS_SUCCEEDED)
    self.assertIsNotNone(run_output)
    self.assertEqual(run_output['result'], [1, 4, 6, 4, 1])
开发者ID:maniacs-ops,项目名称:st2,代码行数:11,代码来源:test_pythonrunner.py
示例10: test_action_with_same_module_name_as_module_in_stdlib
def test_action_with_same_module_name_as_module_in_stdlib(self):
    """Run the action at TEST_ACTION_PATH with no parameters.

    The run must yield LIVEACTION_STATUS_SUCCEEDED and a 'result' of
    'test action'. Per the test name, the action module presumably shares
    its name with a standard-library module; that cannot be confirmed from
    this snippet.
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {}
    py_runner.entry_point = TEST_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    run_status, run_output, _ = py_runner.run({})

    self.assertEqual(run_status, LIVEACTION_STATUS_SUCCEEDED)
    self.assertIsNotNone(run_output)
    self.assertEqual(run_output['result'], 'test action')
开发者ID:maniacs-ops,项目名称:st2,代码行数:11,代码来源:test_pythonrunner.py
示例11: test_simple_action_with_status_failed
def test_simple_action_with_status_failed(self):
    """Run the action at PASCAL_ROW_ACTION_PATH with ``row_index`` 'a'.

    The run must yield LIVEACTION_STATUS_FAILED and an output whose 'result'
    entry carries the expected failure message.
    """
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {}
    py_runner.entry_point = PASCAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    run_status, run_output, _ = py_runner.run({'row_index': 'a'})

    self.assertEqual(run_status, LIVEACTION_STATUS_FAILED)
    self.assertIsNotNone(run_output)
    self.assertEqual(run_output['result'], "This is suppose to fail don't worry!!")
开发者ID:maniacs-ops,项目名称:st2,代码行数:11,代码来源:test_pythonrunner.py
示例12: test_simple_action_timeout
def test_simple_action_timeout(self):
    """Run the action with the runner timeout parameter set to 0.

    The run must yield LIVEACTION_STATUS_TIMED_OUT, a 'result' of 'None',
    the "failed to complete in 0 seconds" error text and exit code -9.
    """
    timeout_seconds = 0
    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {pythonrunner.RUNNER_TIMEOUT: timeout_seconds}
    py_runner.entry_point = PASCAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    run_status, run_output, _ = py_runner.run({'row_index': 4})

    self.assertEqual(run_status, LIVEACTION_STATUS_TIMED_OUT)
    self.assertIsNotNone(run_output)
    self.assertEqual(run_output['result'], 'None')
    self.assertEqual(run_output['error'], 'Action failed to complete in 0 seconds')
    self.assertEqual(run_output['exit_code'], -9)
开发者ID:maniacs-ops,项目名称:st2,代码行数:14,代码来源:test_pythonrunner.py
示例13: test_common_st2_env_vars_are_available_to_the_action
def test_common_st2_env_vars_are_available_to_the_action(self, mock_popen):
    """Check the ``env`` keyword passed to ``mock_popen`` during a run.

    The environment captured from the most recent ``mock_popen`` call is
    handed to ``assertCommonSt2EnvVarsAvailableInEnv`` for verification.
    """
    fake_process = mock.Mock()
    fake_process.communicate.return_value = ('', '')
    mock_popen.return_value = fake_process

    py_runner = pythonrunner.get_runner()
    # The runner is given a mock auth token whose value is 'ponies'.
    py_runner.auth_token = mock.Mock()
    py_runner.auth_token.token = 'ponies'
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {}
    py_runner.entry_point = PASCAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    # The return value is unused, but still unpacked as a 3-tuple.
    _, _, _ = py_runner.run({'row_index': 4})

    _popen_args, popen_kwargs = mock_popen.call_args
    env_passed_to_process = popen_kwargs['env']
    self.assertCommonSt2EnvVarsAvailableInEnv(env=env_passed_to_process)
开发者ID:maniacs-ops,项目名称:st2,代码行数:18,代码来源:test_pythonrunner.py
示例14: test_action_with_user_supplied_env_vars
def test_action_with_user_supplied_env_vars(self, mock_popen):
    """Check how user-supplied ``env`` runner parameters reach ``mock_popen``.

    Every supplied variable except PYTHONPATH must appear unchanged in the
    ``env`` keyword of the ``mock_popen`` call. PYTHONPATH must be present
    but must differ from the supplied value.
    """
    supplied_env = {'key1': 'val1', 'key2': 'val2', 'PYTHONPATH': 'foobar'}

    fake_process = mock.Mock()
    fake_process.communicate.return_value = ('', '')
    mock_popen.return_value = fake_process

    py_runner = pythonrunner.get_runner()
    py_runner.action = self._get_mock_action_obj()
    py_runner.runner_parameters = {'env': supplied_env}
    py_runner.entry_point = PASCAL_ROW_ACTION_PATH
    py_runner.container_service = service.RunnerContainerService()
    py_runner.pre_run()

    # The return value is unused, but still unpacked as a 3-tuple.
    _, _, _ = py_runner.run({'row_index': 4})

    _popen_args, popen_kwargs = mock_popen.call_args
    env_passed_to_process = popen_kwargs['env']

    for name, supplied_value in supplied_env.items():
        if name != 'PYTHONPATH':
            self.assertEqual(env_passed_to_process[name], supplied_value)
            continue
        # The user-supplied PYTHONPATH must not be passed through verbatim.
        self.assertNotEqual(env_passed_to_process[name], supplied_value)
开发者ID:maniacs-ops,项目名称:st2,代码行数:24,代码来源:test_pythonrunner.py
示例15: test_runner_creation
def test_runner_creation(self):
    """Check that ``get_runner`` returns an instance of exactly ``PythonRunner``.

    The two assertions carry distinct failure messages so that a wrong-type
    failure is not reported as a missing instance.
    """
    runner = pythonrunner.get_runner()
    self.assertIsNotNone(runner, 'Creation failed. No instance.')
    # Exact type comparison is kept on purpose; a subclass would fail here.
    self.assertEqual(type(runner), pythonrunner.PythonRunner,
                     'Creation failed. Unexpected runner type.')
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:4,代码来源:test_pythonrunner.py
注：本文中的st2actions.runners.pythonrunner.get_runner函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论