• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python runnersregistrar.register_runners函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中st2common.bootstrap.runnersregistrar.register_runners函数的典型用法代码示例。如果您正苦于以下问题:Python register_runners函数的具体用法?Python register_runners怎么用?Python register_runners使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了register_runners函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUpClass

    def setUpClass(cls):
        super(MistralRunnerCallbackTest, cls).setUpClass()

        # Override the retry configuration here otherwise st2tests.config.parse_args
        # in ExecutionDbTestCase.setUpClass will reset these overrides.
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

        # Get an instance of the callback module and reference to mistral status map
        cls.callback_module = runner_base.get_callback_module(MISTRAL_RUNNER_NAME)
        cls.callback_class = cls.callback_module.get_instance()
        cls.status_map = cls.callback_module.STATUS_MAP
开发者ID:StackStorm,项目名称:st2,代码行数:26,代码来源:test_mistral_v2_callback.py


示例2: setUpClass

    def setUpClass(cls):
        super(ExecutionCancellationTestCase, cls).setUpClass()
        for _, fixture in six.iteritems(FIXTURES['actions']):
            instance = ActionAPI(**fixture)
            Action.add_or_update(ActionAPI.to_model(instance))

        runners_registrar.register_runners()
开发者ID:nzlosh,项目名称:st2,代码行数:7,代码来源:test_execution_cancellation.py


示例3: _do_setUpClass

    def _do_setUpClass(cls):
        tests_config.parse_args()

        cfg.CONF.set_default('enable', cls.enable_auth, group='auth')

        cfg.CONF.set_override(name='enable', override=False, group='rbac')

        opts = cfg.CONF.api_pecan
        cfg_dict = {
            'app': {
                'root': opts.root,
                'template_path': opts.template_path,
                'modules': opts.modules,
                'debug': opts.debug,
                'auth_enable': opts.auth_enable,
                'errors': {'__force_dict__': True},
                'guess_content_type_from_ext': False
            }
        }

        # TODO(manas) : register action types here for now. RunnerType registration can be moved
        # to posting to /runnertypes but that implies implementing POST.
        runners_registrar.register_runners()

        cls.app = load_test_app(config=cfg_dict)
开发者ID:LindsayHill,项目名称:st2,代码行数:25,代码来源:base.py


示例4: setUp

    def setUp(self):
        super(MistralRunnerPolicyTest, self).setUp()

        # Start with a clean database for each test.
        self._establish_connection_and_re_create_db()

        # Register runners.
        runnersregistrar.register_runners()

        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

        # Register policies required for the tests.
        policiesregistrar.register_policy_types(st2common)

        policies_registrar = policiesregistrar.PolicyRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            policies_registrar.register_from_pack(pack)
开发者ID:nzlosh,项目名称:st2,代码行数:27,代码来源:test_mistral_v2_policy.py


示例5: setUpClass

    def setUpClass(cls):
        super(WorkerTestCase, cls).setUpClass()

        runners_registrar.register_runners()

        models = WorkerTestCase.fixtures_loader.save_fixtures_to_db(
            fixtures_pack=FIXTURES_PACK, fixtures_dict=TEST_FIXTURES)
        WorkerTestCase.local_action_db = models['actions']['local.yaml']
开发者ID:nzlosh,项目名称:st2,代码行数:8,代码来源:test_worker.py


示例6: setUpClass

 def setUpClass(cls):
     super(TestActionExecutionHistoryWorker, cls).setUpClass()
     runners_registrar.register_runners()
     action_local = ActionAPI(**copy.deepcopy(fixture.ARTIFACTS['actions']['local']))
     Action.add_or_update(ActionAPI.to_model(action_local))
     action_chain = ActionAPI(**copy.deepcopy(fixture.ARTIFACTS['actions']['chain']))
     action_chain.entry_point = fixture.PATH + '/chain.yaml'
     Action.add_or_update(ActionAPI.to_model(action_chain))
开发者ID:StackStorm,项目名称:st2,代码行数:8,代码来源:test_executions.py


示例7: setUpClass

    def setUpClass(cls):
        super(PolicyServiceTestCase, cls).setUpClass()

        # Register runners
        runners_registrar.register_runners()

        # Register common policy types
        policies_registrar.register_policy_types(st2common)

        loader = fixtures.FixturesLoader()
        loader.save_fixtures_to_db(fixtures_pack=PACK,
                                   fixtures_dict=TEST_FIXTURES)
开发者ID:StackStorm,项目名称:st2,代码行数:12,代码来源:test_policy.py


示例8: setUpClass

    def setUpClass(cls):
        ExecutionDbTestCase.setUpClass()

        # Register runners
        runners_registrar.register_runners()

        # Register common policy types
        register_policy_types(st2common)

        loader = FixturesLoader()
        loader.save_fixtures_to_db(fixtures_pack=PACK,
                                   fixtures_dict=TEST_FIXTURES)
开发者ID:StackStorm,项目名称:st2,代码行数:12,代码来源:test_scheduler.py


示例9: setUpClass

    def setUpClass(cls):
        super(BaseRuleEnforcerTestCase, cls).setUpClass()

        runners_registrar.register_runners()

        # Create TriggerTypes before creation of Rule to avoid failure. Rule requires the
        # Trigger and therefore TriggerType to be created prior to rule creation.
        cls.models = FixturesLoader().save_fixtures_to_db(
            fixtures_pack=PACK, fixtures_dict=FIXTURES_1)
        cls.models.update(FixturesLoader().save_fixtures_to_db(
            fixtures_pack=PACK, fixtures_dict=FIXTURES_2))
        MOCK_TRIGGER_INSTANCE.trigger = reference.get_ref_from_model(
            cls.models['triggers']['trigger1.yaml'])
开发者ID:StackStorm,项目名称:st2,代码行数:13,代码来源:test_enforce.py


示例10: _do_setUpClass

    def _do_setUpClass(cls):
        tests_config.parse_args()

        cfg.CONF.set_default('enable', cls.enable_auth, group='auth')

        cfg.CONF.set_override(name='enable', override=False, group='rbac')

        # TODO(manas) : register action types here for now. RunnerType registration can be moved
        # to posting to /runnertypes but that implies implementing POST.
        if cls.register_runners:
            runners_registrar.register_runners()

        cls.app = TestApp(cls.app_module.setup_app())
开发者ID:StackStorm,项目名称:st2,代码行数:13,代码来源:api.py


示例11: setUpClass

    def setUpClass(cls):
        super(OrquestaRunnerTest, cls).setUpClass()

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)
开发者ID:nzlosh,项目名称:st2,代码行数:14,代码来源:test_output_schema.py


示例12: setUpClass

    def setUpClass(cls):
        super(WorkflowExecutionWriteConflictTest, cls).setUpClass()

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)
开发者ID:nzlosh,项目名称:st2,代码行数:14,代码来源:test_workflow_write_conflict.py


示例13: setUpClass

    def setUpClass(cls):
        super(OrquestaRunnerTest, cls).setUpClass()

        # Register runners and policy types.
        runnersregistrar.register_runners()
        policiesregistrar.register_policy_types(st2common)

        # Register test pack(s).
        registrar_options = {'use_pack_cache': False, 'fail_on_failure': True}
        actions_registrar = actionsregistrar.ActionsRegistrar(**registrar_options)
        policies_registrar = policiesregistrar.PolicyRegistrar(**registrar_options)

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)
            policies_registrar.register_from_pack(pack)
开发者ID:nzlosh,项目名称:st2,代码行数:15,代码来源:test_policies.py


示例14: setUpClass

    def setUpClass(cls):
        super(WorkflowInspectionControllerTest, cls).setUpClass()
        st2tests.WorkflowTestCase.setUpClass()

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)
开发者ID:nzlosh,项目名称:st2,代码行数:15,代码来源:test_workflow_inspection.py


示例15: setUp

    def setUp(self):
        super(SchedulerPoliciesTestCase, self).setUp()

        # Register runners
        runners_registrar.register_runners()

        # Register common policy types
        register_policy_types(st2common)

        loader = FixturesLoader()
        models = loader.save_fixtures_to_db(fixtures_pack=PACK,
                                            fixtures_dict=TEST_FIXTURES_2)

        # Policy with "post_run" application
        self.policy_db = models['policies']['policy_1.yaml']
开发者ID:StackStorm,项目名称:st2,代码行数:15,代码来源:test_base.py


示例16: setUpClass

    def setUpClass(cls):
        super(MistralRunnerTest, cls).setUpClass()

        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)
开发者ID:nzlosh,项目名称:st2,代码行数:16,代码来源:test_mistral_v2.py


示例17: register_runners

def register_runners():
    # Register runners
    runner_dir = cfg.CONF.register.runner_dir
    if runner_dir:
        runner_dir = [runner_dir]
    registered_count = 0
    fail_on_failure = cfg.CONF.register.fail_on_failure

    # 1. Register runner types
    try:
        LOG.info('=========================================================')
        LOG.info('############## Registering runners ######################')
        LOG.info('=========================================================')
        registered_count = runners_registrar.register_runners(runner_dirs=runner_dir,
                                                              fail_on_failure=fail_on_failure,
                                                              experimental=False)
    except Exception as error:
        exc_info = not fail_on_failure

        # TODO: Narrow exception window
        LOG.warning('Failed to register runners: %s', error, exc_info=exc_info)

        if fail_on_failure:
            raise error

    LOG.info('Registered %s runners.', registered_count)
开发者ID:lyandut,项目名称:st2,代码行数:26,代码来源:bootstrap.py


示例18: setUpClass

    def setUpClass(cls):
        EventletTestCase.setUpClass()
        DbTestCase.setUpClass()

        # Override the coordinator to use the noop driver otherwise the tests will be blocked.
        tests_config.parse_args(coordinator_noop=True)
        coordination.COORDINATOR = None

        # Register runners
        runners_registrar.register_runners()

        # Register common policy types
        register_policy_types(st2common)

        loader = FixturesLoader()
        loader.save_fixtures_to_db(fixtures_pack=PACK,
                                   fixtures_dict=TEST_FIXTURES)
开发者ID:StackStorm,项目名称:st2,代码行数:17,代码来源:test_concurrency.py


示例19: setUpClass

    def setUpClass(cls):
        super(ActionParamsUtilsTest, cls).setUpClass()

        runners_registrar.register_runners()

        cls.runnertype_dbs = {}
        cls.action_dbs = {}

        for _, fixture in six.iteritems(FIXTURES['runners']):
            instance = RunnerTypeAPI(**fixture)
            runnertype_db = RunnerType.add_or_update(RunnerTypeAPI.to_model(instance))
            cls.runnertype_dbs[runnertype_db.name] = runnertype_db

        for _, fixture in six.iteritems(FIXTURES['actions']):
            instance = ActionAPI(**fixture)
            action_db = Action.add_or_update(ActionAPI.to_model(instance))
            cls.action_dbs[action_db.name] = action_db
开发者ID:StackStorm,项目名称:st2,代码行数:17,代码来源:test_action_param_utils.py


示例20: setUpClass

    def setUpClass(cls):
        super(SchedulingPolicyTest, cls).setUpClass()

        # Register runners
        runners_registrar.register_runners()

        for _, fixture in six.iteritems(FIXTURES['actions']):
            instance = ActionAPI(**fixture)
            Action.add_or_update(ActionAPI.to_model(instance))

        for _, fixture in six.iteritems(FIXTURES['policytypes']):
            instance = PolicyTypeAPI(**fixture)
            PolicyType.add_or_update(PolicyTypeAPI.to_model(instance))

        for _, fixture in six.iteritems(FIXTURES['policies']):
            instance = PolicyAPI(**fixture)
            Policy.add_or_update(PolicyAPI.to_model(instance))
开发者ID:StackStorm,项目名称:st2,代码行数:17,代码来源:test_policies.py



注:本文中的st2common.bootstrap.runnersregistrar.register_runners函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python config.parse_args函数代码示例发布时间:2022-05-27
下一篇:
Python policiesregistrar.register_policy_types函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap