• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python actions.start_one函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中servicemanager.actions.actions.start_one函数的典型用法代码示例。如果您正苦于以下问题：Python start_one函数的具体用法？Python start_one怎么用？Python start_one使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了start_one函数的17个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_start_and_stop_one_with_append_args

 def test_start_and_stop_one_with_append_args(self):
     """Start TEST_ONE with an appended shell fragment, then kill it.

     The service is started with ``["; echo 'Fin du sleep!!'"]`` as the
     append-args value. The test expects two status entries while it runs
     and an empty status list after ``context.kill``.
     """
     config_dir_override = os.path.join(os.path.dirname(__file__), "conf")
     context = SmContext(SmApplication(config_dir_override), None, False, False)
     actions.start_one(context, "TEST_ONE", True, False, None, None, ["; echo 'Fin du sleep!!'"])
     # Two entries are expected here because the appended command creates a forked process.
     # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
     self.assertEqual(len(context.get_service("TEST_ONE").status()), 2)
     context.kill("TEST_ONE")
     self.assertEqual(context.get_service("TEST_ONE").status(), [])
开发者ID:liquidarmour,项目名称:service-manager,代码行数:7,代码来源:tests.py


示例2: test_dropwizard_from_jar

    def test_dropwizard_from_jar(self):
        """Start FAKE_NEXUS, then a Dropwizard service via start_and_wait, and kill both.

        After both kills and a five second pause, each service must report an
        empty status list.
        """
        conf_dir = os.path.join(os.path.dirname(__file__), "conf")
        application = SmApplication(conf_dir)
        context = SmContext(application, None, False, False)
        resolver = ServiceResolver(application)
        nexus = "FAKE_NEXUS"
        dropwizard = "DROPWIZARD_NEXUS_END_TO_END_TEST"

        # The fake nexus goes up first and gets five seconds before the next start.
        actions.start_one(context, nexus, True, False, None, port=None)
        self.assertIsNotNone(context.get_service(nexus).status())
        time.sleep(5)

        actions.start_and_wait(
            resolver, context, [dropwizard], True, False, None,
            port=None, seconds_to_wait=90, append_args=None,
        )
        self.assertIsNotNone(context.get_service(dropwizard).status())

        # Kill in the same order as they are later checked: service first, nexus second.
        for service_name in (dropwizard, nexus):
            context.kill(service_name)
        time.sleep(5)
        for service_name in (dropwizard, nexus):
            self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:Jagordon,项目名称:service-manager,代码行数:29,代码来源:tests.py


示例3: test_start_and_stop_one

 def test_start_and_stop_one(self):
     """Start TEST_ONE, check a status is reported, kill it and expect an empty status."""
     service_name = "TEST_ONE"
     conf_dir = os.path.join(os.path.dirname(__file__), "conf")
     application = smcontext.SmApplication(conf_dir)
     context = smcontext.SmContext(application, None, False, False)

     actions.start_one(context, service_name, True, False, None, port=None)
     self.assertIsNotNone(context.get_service(service_name).status())

     context.kill(service_name)
     self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:carlosroman,项目名称:service-manager,代码行数:7,代码来源:tests.py


示例4: test_python_server_offline

 def test_python_server_offline(self):
     """Start and kill PYTHON_SIMPLE_SERVER_ASSETS_FRONTEND using a context built with a True third argument.

     NOTE(review): the True passed to SmContext presumably selects offline
     mode, judging by the test name -- confirm against SmContext.
     """
     service_name = "PYTHON_SIMPLE_SERVER_ASSETS_FRONTEND"
     conf_dir = os.path.join(os.path.dirname(__file__), "conf")
     application = smcontext.SmApplication(conf_dir)
     context = smcontext.SmContext(application, None, True, False)

     actions.start_one(context, service_name, True, False, None, port=None)
     self.assertIsNotNone(context.get_service(service_name).status())

     context.kill(service_name)
     # Pause for five seconds before checking that nothing is left running.
     time.sleep(5)
     self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:carlosroman,项目名称:service-manager,代码行数:8,代码来源:tests.py


示例5: test_python_server_offline

 def test_python_server_offline(self):
     """Start and kill PYTHON_SIMPLE_SERVER_ASSETS_FRONTEND using a context built with a True third argument.

     NOTE(review): the True passed to SmContext presumably selects offline
     mode, judging by the test name -- confirm against SmContext.
     """
     service_name = "PYTHON_SIMPLE_SERVER_ASSETS_FRONTEND"
     application = SmApplication(self.config_dir_override)
     context = SmContext(application, None, True, False)

     # The last two positional values are the port and the append args, both None.
     actions.start_one(context, service_name, False, True, False, None, None, None)
     self.assertIsNotNone(context.get_service(service_name).status())

     context.kill(service_name, True)
     self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:elabeca,项目名称:service-manager,代码行数:8,代码来源:test_actions.py


示例6: test_assets_server

    def test_assets_server(self):
        """Start PYTHON_SIMPLE_SERVER_ASSETS_FRONTEND behind a fake nexus, then kill it.

        Everything is killed up front, the fake nexus is started through the
        ``startFakeNexus`` helper on this test case, and the service must
        report an empty status list after being killed with ``wait=True``.
        """
        service_name = "PYTHON_SIMPLE_SERVER_ASSETS_FRONTEND"
        application = SmApplication(self.config_dir_override)
        context = SmContext(application, None, False, False)

        context.kill_everything(True)
        self.startFakeNexus()

        actions.start_one(context, service_name, False, True, False, None, port=None)
        self.assertIsNotNone(context.get_service(service_name).status())

        context.kill(service_name, wait=True)
        self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:elabeca,项目名称:service-manager,代码行数:11,代码来源:test_actions.py


示例7: test_successful_play_from_jar_without_waiting_with_append_args

    def test_successful_play_from_jar_without_waiting_with_append_args(self):
        """Start a Play service with an appended ``-DFoo=Bar`` and check the process args.

        FAKE_NEXUS is started first. The Play service is then started through
        ``start_and_wait`` with no wait time, and exactly one matching process
        carrying the appended flag is expected. Everything is killed at the
        end, whether or not the assertions pass.
        """
        conf_dir = os.path.join(os.path.dirname(__file__), "conf")
        application = SmApplication(conf_dir)
        context = SmContext(application, None, False, False)
        resolver = ServiceResolver(application)

        context.kill_everything()
        time.sleep(5)

        nexus_started = actions.start_one(context, "FAKE_NEXUS", True, False, None, None, None)
        self.assertTrue(nexus_started)
        self.assertIsNotNone(context.get_service("FAKE_NEXUS").status())
        time.sleep(5)

        play_service = "PLAY_NEXUS_END_TO_END_TEST"
        extra_flag = "-DFoo=Bar"
        append_args = {play_service: [extra_flag]}

        try:
            # Positional values after the service list, in order:
            # fat jar, release, proxy, port, seconds to wait, append args.
            actions.start_and_wait(
                resolver, context, [play_service], True, False, None, None, None, append_args
            )
            time.sleep(5)
            play = SmPlayService(context, play_service)
            matching = SmProcess.processes_matching(play.pattern)
            self.assertEqual(len(matching), 1)
            self.assertTrue(extra_flag in matching[0].args)
        finally:
            context.kill_everything()
开发者ID:liquidarmour,项目名称:service-manager,代码行数:31,代码来源:tests.py


示例8: test_successful_play_from_jar_without_waiting

    def test_successful_play_from_jar_without_waiting(self):
        """Start FAKE_NEXUS and then a Play service via start_and_wait with no wait time.

        The test only requires that ``start_and_wait`` does not raise.
        Everything is killed afterwards regardless of the outcome.
        """
        conf_dir = os.path.join(os.path.dirname(__file__), "conf")
        application = SmApplication(conf_dir)
        context = SmContext(application, None, False, False)
        resolver = ServiceResolver(application)

        context.kill_everything()
        time.sleep(5)

        nexus_started = actions.start_one(context, "FAKE_NEXUS", True, False, None, port=None)
        self.assertTrue(nexus_started)
        self.assertIsNotNone(context.get_service("FAKE_NEXUS").status())
        time.sleep(5)

        try:
            # Positional values after the service list, in order:
            # fat jar, release, proxy, port, seconds to wait, append args.
            actions.start_and_wait(
                resolver, context, ["PLAY_NEXUS_END_TO_END_TEST"], True, False, None, None, None, None
            )
        finally:
            context.kill_everything()
开发者ID:liquidarmour,项目名称:service-manager,代码行数:26,代码来源:tests.py


示例9: test_play_with_append_args

    def test_play_with_append_args(self):
        """Start a Play service through an SmStartRequest carrying ``appendArgs``.

        FAKE_NEXUS is started first. The request is then processed, and
        exactly one status entry and one matching process whose args contain
        ``-Dfoo=bar`` are expected. Everything is killed at the end.
        """
        conf_dir = os.path.join(os.path.dirname(__file__), "conf")
        context = SmContext(SmApplication(conf_dir), None, False, False)
        context.kill_everything()

        # The fake nexus has to be up before the Play service is requested.
        nexus_started = actions.start_one(context, "FAKE_NEXUS", True, False, None, port=None)
        self.assertTrue(nexus_started)
        self.assertIsNotNone(context.get_service("FAKE_NEXUS").status())
        time.sleep(5)

        play_service = "PLAY_NEXUS_END_TO_END_TEST"
        extra_flag = "-Dfoo=bar"
        server = smserverlogic.SmServer(SmApplication(conf_dir, None))
        request = {
            "testId": "foo",
            "services": [
                {"serviceName": play_service, "runFrom": "SNAPSHOT", "appendArgs": [extra_flag]},
            ],
        }
        smserverlogic.SmStartRequest(server, request, True, False).process_request()
        time.sleep(5)

        self.assertEqual(len(context.get_service(play_service).status()), 1)
        play = SmPlayService(context, play_service)
        matching = SmProcess.processes_matching(play.pattern)
        self.assertEqual(len(matching), 1)
        self.assertTrue(extra_flag in matching[0].args)

        context.kill_everything()
        # NOTE(review): this checks TEST_ONE, which this test never starts; it may
        # have been meant to check the Play service -- confirm with the authors.
        self.assertEqual(context.get_service("TEST_ONE").status(), [])
开发者ID:Jagordon,项目名称:service-manager,代码行数:26,代码来源:tests.py


示例10: test_failing_play_from_jar

    def test_failing_play_from_jar(self):
        """Expect start_and_wait to time out when asked to start BROKEN_PLAY_PROJECT.

        FAKE_NEXUS is started first. The broken project is then started with a
        two second wait, and a ServiceManagerException with the timeout
        message is expected. Everything is killed at the end.
        """
        conf_dir = os.path.join(os.path.dirname(__file__), "conf")
        application = SmApplication(conf_dir)
        context = SmContext(application, None, False, False)
        resolver = ServiceResolver(application)

        context.kill_everything()
        time.sleep(5)

        nexus_started = actions.start_one(context, "FAKE_NEXUS", True, False, None, port=None)
        self.assertTrue(nexus_started)
        self.assertIsNotNone(context.get_service("FAKE_NEXUS").status())
        time.sleep(5)

        broken_services = ["BROKEN_PLAY_PROJECT"]
        expected_message = "Timed out starting service(s): BROKEN_PLAY_PROJECT"
        try:
            actions.start_and_wait(
                resolver, context, broken_services,
                fatjar=True, release=False, proxy=None, port=None,
                seconds_to_wait=2, append_args=None,
            )
            self.fail("Did not expect the project to startup.")
        except ServiceManagerException as sme:
            # NOTE(review): relies on ServiceManagerException exposing a ``message``
            # attribute; plain Python 3 exceptions do not have one -- confirm.
            self.assertEqual(expected_message, sme.message)
        finally:
            context.kill_everything()
开发者ID:Jagordon,项目名称:service-manager,代码行数:33,代码来源:tests.py


示例11: test_file_server

    def test_file_server(self):
        """Start FAKE_NEXUS twice and expect only the first start_one call to return truthy.

        The service is killed before and after, and must report an empty
        status list both times.
        """
        service_name = "FAKE_NEXUS"
        conf_dir = os.path.join(os.path.dirname(__file__), "conf")
        application = smcontext.SmApplication(conf_dir)
        context = smcontext.SmContext(application, None, False, False)

        # Start from a known state with no instance running.
        context.kill(service_name)
        self.assertEqual(context.get_service(service_name).status(), [])
        time.sleep(2)

        first_start = actions.start_one(context, service_name, True, False, None, port=None)
        self.assertTrue(first_start)
        self.assertIsNotNone(context.get_service(service_name).status())

        # A second start of the same service is expected to return a falsy value.
        second_start = actions.start_one(context, service_name, True, False, None, port=None)
        self.assertFalse(second_start)

        context.kill(service_name)
        self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:carlosroman,项目名称:service-manager,代码行数:16,代码来源:tests.py


示例12: test_wait_on_assets_server

    def test_wait_on_assets_server(self):
        """Start the assets frontend via start_and_wait behind FAKE_NEXUS, then kill both.

        After both kills and a fifteen second pause, each service must report
        an empty status list.
        """
        nexus = "FAKE_NEXUS"
        assets = "PYTHON_SIMPLE_SERVER_ASSETS_FRONTEND"
        conf_dir = os.path.join(os.path.dirname(__file__), "conf")
        application = smcontext.SmApplication(conf_dir)
        context = smcontext.SmContext(application, None, False, False)
        resolver = ServiceResolver(application)
        context.kill_everything()

        context.kill(nexus)
        self.assertEqual(context.get_service(nexus).status(), [])
        time.sleep(2)

        # The status is checked once more right before the fake nexus is started.
        self.assertEqual(context.get_service(nexus).status(), [])
        nexus_started = actions.start_one(context, nexus, True, False, None, port=None)
        self.assertTrue(nexus_started)
        self.assertIsNotNone(context.get_service(nexus).status())
        time.sleep(5)

        actions.start_and_wait(
            resolver, context, [assets], True, False, None, port=None, seconds_to_wait=5
        )
        self.assertIsNotNone(context.get_service(assets).status())

        for service_name in (assets, nexus):
            context.kill(service_name)
        time.sleep(15)

        for service_name in (assets, nexus):
            self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:carlosroman,项目名称:service-manager,代码行数:26,代码来源:tests.py


示例13: test_start_and_stop_one

    def test_start_and_stop_one(self):
        """Start TEST_ONE, wait until one status entry is reported, then kill it."""
        service_name = "TEST_ONE"
        application = SmApplication(self.config_dir_override)
        context = SmContext(application, None, False, False)

        started = actions.start_one(context, service_name, False, True, False, None, port=None)
        self.assertTrue(started)

        def status_count():
            # Number of status entries currently reported for the service.
            return len(context.get_service(service_name).status())

        self.waitForCondition(status_count, 1)
        context.kill(service_name, True)
        self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:elabeca,项目名称:service-manager,代码行数:8,代码来源:test_actions.py


示例14: test_nexus

    def test_nexus(self):
        """Start FAKE_NEXUS and then PLAY_NEXUS_END_TO_END_TEST, kill both and expect empty statuses.

        A fresh SmContext is built for the second service. Both kills and the
        final status checks go through that second context.
        """
        config_dir_override = os.path.join(os.path.dirname(__file__), "conf")

        # start fake nexus
        context = smcontext.SmContext(smcontext.SmApplication(config_dir_override), None, False, False)
        # The return value was previously bound to an unused local; it is not needed here.
        actions.start_one(context, "FAKE_NEXUS", True, False, None, port=None)
        self.assertIsNotNone(context.get_service("FAKE_NEXUS").status())
        time.sleep(5)

        context = smcontext.SmContext(smcontext.SmApplication(config_dir_override), None, False, False)
        servicetostart = "PLAY_NEXUS_END_TO_END_TEST"
        actions.start_one(context, servicetostart, True, False, None, port=None)
        self.assertIsNotNone(context.get_service(servicetostart).status())
        context.kill(servicetostart)

        context.kill("FAKE_NEXUS")

        self.assertEqual(context.get_service(servicetostart).status(), [])
        self.assertEqual(context.get_service("FAKE_NEXUS").status(), [])
开发者ID:carlosroman,项目名称:service-manager,代码行数:19,代码来源:tests.py


示例15: test_start_and_stop_one_duplicate

    def test_start_and_stop_one_duplicate(self):
        """Start TEST_ONE via start_and_wait, then expect a second start_one to return falsy.

        The service is killed afterwards and must report an empty status list.
        """
        service_name = "TEST_ONE"
        application = SmApplication(self.config_dir_override)
        context = SmContext(application, None, False, False)
        resolver = ServiceResolver(application)

        actions.start_and_wait(
            resolver, context, [service_name], False, False, False, None,
            port=None, seconds_to_wait=90, append_args=None,
        )
        self.assertIsNotNone(context.get_service(service_name).status())

        # Starting the already-started service again is expected to return a falsy value.
        second_start = actions.start_one(context, service_name, False, True, False, None, port=None)
        self.assertFalse(second_start)

        context.kill(service_name, True)
        self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:elabeca,项目名称:service-manager,代码行数:12,代码来源:test_actions.py


示例16: test_ensure_multiple_instances_of_a_service_can_be_started_from_server

    def test_ensure_multiple_instances_of_a_service_can_be_started_from_server(self):
        """Process two identical start requests and expect two instances of each service.

        FAKE_NEXUS is started first. After the first request each of the three
        services must report one status entry, and after the second request
        two. Everything is killed at the end.
        """
        conf_dir = os.path.join(os.path.dirname(__file__), "conf")
        context = SmContext(SmApplication(conf_dir), None, False, False)
        context.kill_everything()
        server = smserverlogic.SmServer(SmApplication(conf_dir, None))
        service_names = ("TEST_ONE", "DROPWIZARD_NEXUS_END_TO_END_TEST", "PLAY_NEXUS_END_TO_END_TEST")

        # The fake nexus must not be running yet, and is then started.
        self.assertEqual(context.get_service("FAKE_NEXUS").status(), [])
        nexus_started = actions.start_one(context, "FAKE_NEXUS", True, False, None, port=None)
        self.assertTrue(nexus_started)
        self.assertIsNotNone(context.get_service("FAKE_NEXUS").status())
        time.sleep(5)

        first_request = {
            "testId": "multiple-instance-unit-test-1",
            "services": [{"serviceName": name, "runFrom": "SNAPSHOT"} for name in service_names],
        }
        smserverlogic.SmStartRequest(server, first_request, True, False).process_request()
        time.sleep(5)
        for name in service_names:
            self.assertEqual(len(context.get_service(name).status()), 1)

        second_request = {
            "testId": "multiple-instance-unit-test-2",
            "services": [{"serviceName": name, "runFrom": "SNAPSHOT"} for name in service_names],
        }
        smserverlogic.SmStartRequest(server, second_request, True, False).process_request()
        time.sleep(5)
        for name in service_names:
            self.assertEqual(len(context.get_service(name).status()), 2)

        # Per the original author's note, stop does not currently work for extern,
        # so no SmStopRequest is issued and everything is killed directly instead.
        context.kill_everything()
        self.assertEqual(context.get_service("TEST_ONE").status(), [])
        context.kill("FAKE_NEXUS")
开发者ID:Jagordon,项目名称:service-manager,代码行数:46,代码来源:tests.py


示例17: test_start_and_stop_one_with_append_args

 def test_start_and_stop_one_with_append_args(self):
     """Start TEST_FOUR with ``["2"]`` as append args, wait for one status entry, then kill it."""
     service_name = "TEST_FOUR"
     application = SmApplication(self.config_dir_override)
     context = SmContext(application, None, False, False)

     actions.start_one(context, service_name, False, True, False, None, None, ["2"])

     def status_count():
         # Number of status entries currently reported for the service.
         return len(context.get_service(service_name).status())

     self.waitForCondition(status_count, 1)
     context.kill(service_name, True)
     self.assertEqual(context.get_service(service_name).status(), [])
开发者ID:elabeca,项目名称:service-manager,代码行数:6,代码来源:test_actions.py



注:本文中的servicemanager.actions.actions.start_one函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python smcontext.SmContext类代码示例发布时间:2022-05-27
下一篇:
Python servicelogger.log函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap