• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python pythonrunner.get_runner函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中st2actions.runners.pythonrunner.get_runner函数的典型用法代码示例。如果您正苦于以下问题:Python get_runner函数的具体用法?Python get_runner怎么用?Python get_runner使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_runner函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_simple_action_fail

 def test_simple_action_fail(self):
     # The row index is passed as a string here; the test expects the
     # container service to end up reporting a FAILED status.
     action_params = {'row_index': '4'}

     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.entry_point = PACAL_ROW_ACTION_PATH
     py_runner.container_service = service.RunnerContainerService()

     outcome = py_runner.run(action_params)
     self.assertTrue(outcome)

     final_status = py_runner.container_service.get_status()
     self.assertEqual(final_status, ACTIONEXEC_STATUS_FAILED)
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:8,代码来源:test_pythonrunner.py


示例2: test_simple_action_no_file

 def test_simple_action_no_file(self):
     # With an empty entry point and no action parameters, run() must still
     # return a truthy value while the recorded status is FAILED.
     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.entry_point = ''
     py_runner.container_service = service.RunnerContainerService()

     outcome = py_runner.run({})
     self.assertTrue(outcome)

     final_status = py_runner.container_service.get_status()
     self.assertEqual(final_status, ACTIONEXEC_STATUS_FAILED)
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:8,代码来源:test_pythonrunner.py


示例3: test_simple_action_no_entry_point

    def test_simple_action_no_entry_point(self):
        # An empty entry_point must make run() raise an exception whose
        # message matches the "missing entry_point" pattern below.
        runner = pythonrunner.get_runner()
        runner.action = self._get_mock_action_obj()
        runner.runner_parameters = {}
        runner.entry_point = ''
        runner.container_service = service.RunnerContainerService()

        expected_msg = 'Action .*? is missing entry_point attribute'

        # assertRaisesRegexp is a deprecated alias that was removed in
        # Python 3.12. Prefer assertRaisesRegex where it exists and fall back
        # to the old spelling on interpreters that only provide that one.
        assert_raises_regex = getattr(self, 'assertRaisesRegex', None)
        if assert_raises_regex is None:
            assert_raises_regex = self.assertRaisesRegexp
        assert_raises_regex(Exception, expected_msg, runner.run, {})
开发者ID:maniacs-ops,项目名称:st2,代码行数:9,代码来源:test_pythonrunner.py


示例4: test_simple_action

 def test_simple_action(self):
     # Happy path: an integer row index of 4 should succeed and the stored
     # result is compared against the string form of the expected row.
     action_params = {'row_index': 4}

     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.entry_point = PACAL_ROW_ACTION_PATH
     py_runner.container_service = service.RunnerContainerService()

     outcome = py_runner.run(action_params)
     self.assertTrue(outcome)

     final_status = py_runner.container_service.get_status()
     self.assertEqual(final_status, ACTIONEXEC_STATUS_SUCCEEDED)

     stored_result = py_runner.container_service.get_result()['result']
     self.assertEqual(stored_result, '[1, 4, 6, 4, 1]')
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:9,代码来源:test_pythonrunner.py


示例5: test_exception_in_simple_action_with_invalid_status

 def test_exception_in_simple_action_with_invalid_status(self):
     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.runner_parameters = {}
     py_runner.entry_point = PASCAL_ROW_ACTION_PATH
     py_runner.container_service = service.RunnerContainerService()
     py_runner.pre_run()

     # 'd' is passed as row_index; run() is expected to raise ValueError.
     with self.assertRaises(ValueError):
         py_runner.run(action_parameters={'row_index': 'd'})
开发者ID:maniacs-ops,项目名称:st2,代码行数:9,代码来源:test_pythonrunner.py


示例6: test_stdout_interception_and_parsing

    def test_stdout_interception_and_parsing(self, mock_popen):
        # Each scenario feeds a canned stdout through the mocked process and
        # checks how the runner splits it into 'stdout' and 'result'.
        values = {'delimiter': ACTION_OUTPUT_RESULT_DELIMITER}
        mock_stderr = 'foo stderr'

        # (raw stdout returned by the process, stdout expected in the output)
        scenarios = (
            # No output to stdout and no result (implicit None)
            ('%(delimiter)sNone%(delimiter)s' % values,
             ''),
            # Output to stdout and no result (implicit None)
            ('pre result%(delimiter)sNone%(delimiter)spost result' % values,
             'pre resultpost result'),
        )

        for mock_stdout, expected_stdout in scenarios:
            fake_process = mock.Mock()
            fake_process.communicate.return_value = (mock_stdout, mock_stderr)
            fake_process.returncode = 0
            mock_popen.return_value = fake_process

            # A fresh runner is built for every scenario.
            py_runner = pythonrunner.get_runner()
            py_runner.action = self._get_mock_action_obj()
            py_runner.runner_parameters = {}
            py_runner.entry_point = PACAL_ROW_ACTION_PATH
            py_runner.container_service = service.RunnerContainerService()
            py_runner.pre_run()
            _, output, _ = py_runner.run({'row_index': 4})

            self.assertEqual(output['stdout'], expected_stdout)
            self.assertEqual(output['stderr'], mock_stderr)
            self.assertEqual(output['result'], 'None')
            self.assertEqual(output['exit_code'], 0)
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:44,代码来源:test_pythonrunner.py


示例7: test_simple_action_no_file

 def test_simple_action_no_file(self):
     # Entry point is 'foo.py' (presumably a file that does not exist, going
     # by the test name); the run must report FAILED with a non-None result.
     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.runner_parameters = {}
     py_runner.entry_point = 'foo.py'
     py_runner.container_service = service.RunnerContainerService()
     py_runner.pre_run()

     status, outcome, _ = py_runner.run({})

     self.assertIsNotNone(outcome)
     self.assertEqual(status, LIVEACTION_STATUS_FAILED)
开发者ID:maniacs-ops,项目名称:st2,代码行数:10,代码来源:test_pythonrunner.py


示例8: test_simple_action_fail

 def test_simple_action_fail(self):
     # The row index is supplied as the string '4'; the run is expected to
     # come back FAILED while still producing a non-None result.
     action_params = {'row_index': '4'}

     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.runner_parameters = {}
     py_runner.entry_point = PASCAL_ROW_ACTION_PATH
     py_runner.container_service = service.RunnerContainerService()
     py_runner.pre_run()

     status, outcome, _ = py_runner.run(action_params)

     self.assertIsNotNone(outcome)
     self.assertEqual(status, LIVEACTION_STATUS_FAILED)
开发者ID:maniacs-ops,项目名称:st2,代码行数:10,代码来源:test_pythonrunner.py


示例9: test_simple_action_with_status_succeeded

 def test_simple_action_with_status_succeeded(self):
     # Integer row index 4 should succeed and yield the list below as the
     # 'result' entry of the output.
     expected_row = [1, 4, 6, 4, 1]

     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.runner_parameters = {}
     py_runner.entry_point = PASCAL_ROW_ACTION_PATH
     py_runner.container_service = service.RunnerContainerService()
     py_runner.pre_run()

     status, output, _ = py_runner.run({'row_index': 4})

     self.assertEqual(status, LIVEACTION_STATUS_SUCCEEDED)
     self.assertIsNotNone(output)
     self.assertEqual(output['result'], expected_row)
开发者ID:maniacs-ops,项目名称:st2,代码行数:11,代码来源:test_pythonrunner.py


示例10: test_action_with_same_module_name_as_module_in_stdlib

 def test_action_with_same_module_name_as_module_in_stdlib(self):
     # Runs the action at TEST_ACTION_PATH with no parameters and expects it
     # to succeed with the literal result 'test action'.
     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.runner_parameters = {}
     py_runner.entry_point = TEST_ACTION_PATH
     py_runner.container_service = service.RunnerContainerService()
     py_runner.pre_run()

     status, output, _ = py_runner.run({})

     self.assertEqual(status, LIVEACTION_STATUS_SUCCEEDED)
     self.assertIsNotNone(output)
     self.assertEqual(output['result'], 'test action')
开发者ID:maniacs-ops,项目名称:st2,代码行数:11,代码来源:test_pythonrunner.py


示例11: test_simple_action_with_status_failed

 def test_simple_action_with_status_failed(self):
     # Row index 'a' should produce a FAILED status together with the
     # action's own failure message as the result.
     expected_message = "This is suppose to fail don't worry!!"

     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.runner_parameters = {}
     py_runner.entry_point = PASCAL_ROW_ACTION_PATH
     py_runner.container_service = service.RunnerContainerService()
     py_runner.pre_run()

     status, output, _ = py_runner.run({'row_index': 'a'})

     self.assertEqual(status, LIVEACTION_STATUS_FAILED)
     self.assertIsNotNone(output)
     self.assertEqual(output['result'], expected_message)
开发者ID:maniacs-ops,项目名称:st2,代码行数:11,代码来源:test_pythonrunner.py


示例12: test_simple_action_timeout

 def test_simple_action_timeout(self):
     # A timeout of 0 is configured through the runner parameters; the run is
     # expected to be reported as TIMED_OUT with exit code -9.
     timeout = 0
     runner_params = {pythonrunner.RUNNER_TIMEOUT: timeout}

     py_runner = pythonrunner.get_runner()
     py_runner.action = self._get_mock_action_obj()
     py_runner.runner_parameters = runner_params
     py_runner.entry_point = PASCAL_ROW_ACTION_PATH
     py_runner.container_service = service.RunnerContainerService()
     py_runner.pre_run()

     status, output, _ = py_runner.run({'row_index': 4})

     self.assertEqual(status, LIVEACTION_STATUS_TIMED_OUT)
     self.assertIsNotNone(output)
     self.assertEqual(output['result'], 'None')
     self.assertEqual(output['error'], 'Action failed to complete in 0 seconds')
     self.assertEqual(output['exit_code'], -9)
开发者ID:maniacs-ops,项目名称:st2,代码行数:14,代码来源:test_pythonrunner.py


示例13: test_common_st2_env_vars_are_available_to_the_action

    def test_common_st2_env_vars_are_available_to_the_action(self, mock_popen):
        # mock_popen is presumably the patched process constructor; it is set
        # up to return a process whose communicate() yields empty output.
        fake_process = mock.Mock()
        fake_process.communicate.return_value = ('', '')
        mock_popen.return_value = fake_process

        py_runner = pythonrunner.get_runner()
        py_runner.auth_token = mock.Mock()
        py_runner.auth_token.token = 'ponies'
        py_runner.action = self._get_mock_action_obj()
        py_runner.runner_parameters = {}
        py_runner.entry_point = PASCAL_ROW_ACTION_PATH
        py_runner.container_service = service.RunnerContainerService()
        py_runner.pre_run()
        _, _, _ = py_runner.run({'row_index': 4})

        # Inspect the keyword arguments of the most recent mock_popen call and
        # verify the environment that was handed to it.
        popen_kwargs = mock_popen.call_args[1]
        self.assertCommonSt2EnvVarsAvailableInEnv(env=popen_kwargs['env'])
开发者ID:maniacs-ops,项目名称:st2,代码行数:18,代码来源:test_pythonrunner.py


示例14: test_action_with_user_supplied_env_vars

    def test_action_with_user_supplied_env_vars(self, mock_popen):
        # User-supplied variables are passed via the 'env' runner parameter.
        env_vars = {'key1': 'val1', 'key2': 'val2', 'PYTHONPATH': 'foobar'}

        fake_process = mock.Mock()
        fake_process.communicate.return_value = ('', '')
        mock_popen.return_value = fake_process

        py_runner = pythonrunner.get_runner()
        py_runner.action = self._get_mock_action_obj()
        py_runner.runner_parameters = {'env': env_vars}
        py_runner.entry_point = PASCAL_ROW_ACTION_PATH
        py_runner.container_service = service.RunnerContainerService()
        py_runner.pre_run()
        _, _, _ = py_runner.run({'row_index': 4})

        # Environment given to the most recent mock_popen call.
        actual_env = mock_popen.call_args[1]['env']

        for name, supplied in env_vars.items():
            if name != 'PYTHONPATH':
                self.assertEqual(actual_env[name], supplied)
                continue
            # Verify that a blacklisted PYTHONPATH has been filtered out
            self.assertNotEqual(actual_env[name], supplied)
开发者ID:maniacs-ops,项目名称:st2,代码行数:24,代码来源:test_pythonrunner.py


示例15: test_runner_creation

 def test_runner_creation(self):
     # get_runner() must return an object whose type is exactly PythonRunner.
     runner = pythonrunner.get_runner()
     self.assertIsNotNone(runner, 'Creation failed. No instance.')
     # Exact type comparison: a subclass instance would not satisfy this.
     # The failure message is distinct from the None check above so a
     # wrong-type failure is not reported as a missing instance.
     self.assertIs(type(runner), pythonrunner.PythonRunner,
                   'Creation failed. Unexpected runner type.')
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:4,代码来源:test_pythonrunner.py



注:本文中的st2actions.runners.pythonrunner.get_runner函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python parallel_ssh.ParallelSSHClient类代码示例发布时间:2022-05-27
下一篇:
Python actionchainrunner.get_runner函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap