• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python Action.Action类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 MilkCheck.Engine.Action.Action 的典型用法代码示例。如果您正苦于以下问题:Python Action 类的具体用法?Python Action 怎么用?Python Action 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Action类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUp

    def setUp(self):
        '''
        Build the service graph registered in the manager for each test.

        Graph
                              _ start
           group --> service /
                             `- stop

        A third action named 'timeout' is also added to the service.
        '''
        CLICommon.setUp(self)

        # Top-level group and the single service it depends on
        top = ServiceGroup('ServiceGroup')
        svc = Service('service')
        self.service = svc
        svc.desc = 'I am the service'

        # Actions are kept on the test case so that tests can reach them
        self.start_action = Action('start', command='/bin/true')
        self.stop_action = Action('stop', command='/bin/false')
        self.timeout_action = Action('timeout', command='sleep 1',
                                     timeout=0.1)

        # Only 'start' and 'stop' explicitly inherit from the service
        for act in (self.start_action, self.stop_action):
            act.inherits_from(svc)
        for act in (self.start_action, self.stop_action,
                    self.timeout_action):
            svc.add_action(act)

        # Wire the dependency, then register the group in the manager
        top.add_inter_dep(target=svc)
        self.manager.add_service(top)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:31,代码来源:CliTest.py


示例2: test_create_action1

 def test_create_action1(self):
     """Test that Action.fromdict() sets attributes from a dictionary"""
     action = Action("start")
     settings = {
         "target": "localhost",
         "fanout": 4,
         "retry": 5,
         "delay": 2,
         "errors": 3,
         "timeout": 4,
         "cmd": "/bin/True",
         "desc": "my desc",
         "mode": "delegate",
     }
     action.fromdict(settings)

     self.assertTrue(action)
     # The name comes from the constructor, not from the dictionary
     self.assertEqual(action.name, "start")
     # 'target' is compared against a NodeSet, 'retry' is read back as
     # maxretry and 'cmd' as command
     self.assertEqual(action.target, NodeSet("localhost"))
     self.assertEqual(action.fanout, 4)
     self.assertEqual(action.maxretry, 5)
     self.assertEqual(action.errors, 3)
     self.assertEqual(action.delay, 2)
     self.assertEqual(action.timeout, 4)
     self.assertEqual(action.command, "/bin/True")
     self.assertEqual(action.desc, "my desc")
     self.assertEqual(action.mode, "delegate")
开发者ID:mdlx,项目名称:milkcheck,代码行数:27,代码来源:ActionTest.py


示例3: test_create_action1

 def test_create_action1(self):
     '''Test that Action.fromdict() sets attributes from a dictionary'''
     action = Action('start')
     params = {
         'target': 'localhost',
         'fanout': 4,
         'retry': 5,
         'delay': 2,
         'errors': 3,
         'timeout': 4,
         'cmd': '/bin/True',
         'desc': 'my desc',
         'mode': 'delegate',
     }
     action.fromdict(params)

     self.assertTrue(action)
     # The name comes from the constructor, not from the dictionary
     self.assertEqual(action.name, 'start')
     # 'target' is compared against a NodeSet, 'retry' is read back as
     # maxretry and 'cmd' as command
     self.assertEqual(action.target, NodeSet('localhost'))
     self.assertEqual(action.fanout, 4)
     self.assertEqual(action.maxretry, 5)
     self.assertEqual(action.errors, 3)
     self.assertEqual(action.delay, 2)
     self.assertEqual(action.timeout, 4)
     self.assertEqual(action.command, '/bin/True')
     self.assertEqual(action.desc, 'my desc')
     self.assertEqual(action.mode, 'delegate')
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:27,代码来源:ActionTest.py


示例4: test_local_variables

    def test_local_variables(self):
        '''Test resolution of the %ACTION and %SERVICE local variables'''
        act = Action('bar')
        resolved = act._resolve("I'm %ACTION")
        self.assertEqual(resolved, "I'm bar")

        # Once the action is added to a service, %SERVICE is expected to
        # resolve to that service's name
        owner = Service('foo')
        owner.add_action(act)
        resolved = act._resolve("I'm %SERVICE.%ACTION")
        self.assertEqual(resolved, "I'm foo.bar")
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:8,代码来源:ActionTest.py


示例5: test_nb_errors_remote2

 def test_nb_errors_remote2(self):
     """Test nb_errors() when the command succeeds on HOSTNAME (remote)."""
     act = Action(name="test", target=HOSTNAME, command="/bin/true")
     svc = Service("test_service")
     svc.add_action(act)
     svc.run("test")

     # No error is expected and the action must end up DONE
     error_count = act.nb_errors()
     self.assertEqual(error_count, 0)
     self.assertEqual(act.status, DONE)
开发者ID:mdlx,项目名称:milkcheck,代码行数:8,代码来源:ActionTest.py


示例6: test_local_variables

    def test_local_variables(self):
        """Test resolution of the %ACTION and %SERVICE local variables"""
        act = Action("bar")
        resolved = act._resolve("I'm %ACTION")
        self.assertEqual(resolved, "I'm bar")

        # Once the action is added to a service, %SERVICE is expected to
        # resolve to that service's name
        owner = Service("foo")
        owner.add_action(act)
        resolved = act._resolve("I'm %SERVICE.%ACTION")
        self.assertEqual(resolved, "I'm foo.bar")
开发者ID:mdlx,项目名称:milkcheck,代码行数:8,代码来源:ActionTest.py


示例7: test_nb_timeout_remote

 def test_nb_timeout_remote(self):
     """Test nb_timeout() when the command outlives its timeout (remote)"""
     act = Action(
         name="start",
         target=HOSTNAME,
         command="sleep 3",
         timeout=0.5,
     )
     svc = Service("test_service")
     svc.add_action(act)
     svc.run("start")

     # One timeout is expected and the action must end up TIMEOUT
     timeout_count = act.nb_timeout()
     self.assertEqual(timeout_count, 1)
     self.assertEqual(act.status, TIMEOUT)
开发者ID:mdlx,项目名称:milkcheck,代码行数:8,代码来源:ActionTest.py


示例8: test_action_with_variables

 def test_action_with_variables(self):
     """Test that %VAR1 inside a %(...) expression is resolved in the command"""
     template = 'echo %([ "%VAR1" != "" ] && echo "-x %VAR1")'
     act = Action("start", command=template)
     svc = Service("TEST")
     svc.add_actions(act)
     svc.add_var("VAR1", "foo")
     act.run()

     # The worker must have received the fully resolved command line
     resolved = act.worker.command
     self.assertEqual(resolved, "echo -x foo")
开发者ID:mdlx,项目名称:milkcheck,代码行数:9,代码来源:ActionTest.py


示例9: test_nb_errors_remote

 def test_nb_errors_remote(self):
     """Test nb_errors() when /bin/false targets three nodes (remote)."""
     act = Action(
         name="start",
         target="aury[12,13,21]",
         command="/bin/false",
     )
     act.errors = 2
     svc = Service("test_service")
     svc.add_action(act)
     svc.run("start")

     # Three errors are expected and the action must end up ERROR
     error_count = act.nb_errors()
     self.assertEqual(error_count, 3)
     self.assertEqual(act.status, ERROR)
开发者ID:mdlx,项目名称:milkcheck,代码行数:9,代码来源:ActionTest.py


示例10: test_nb_timeout_local

 def test_nb_timeout_local(self):
     """Test nb_timeout() when the command outlives its timeout (local)"""
     act = Action(name="start", command="sleep 3", timeout=0.3)
     svc = Service("test_service")
     svc.add_action(act)
     svc.run("start")

     # A timeout is not counted as an error
     error_count = act.nb_errors()
     self.assertEqual(error_count, 0)
     timeout_count = act.nb_timeout()
     self.assertEqual(timeout_count, 1)
     self.assertEqual(act.status, TIMEOUT)
开发者ID:mdlx,项目名称:milkcheck,代码行数:9,代码来源:ActionTest.py


示例11: test_nb_errors_remote2

 def test_nb_errors_remote2(self):
     """Test nb_errors() when the command succeeds on HOSTNAME (remote)."""
     act = Action(name='test', target=HOSTNAME, command='/bin/true')
     svc = Service('test_service')
     svc.add_action(act)
     svc.run('test')

     # No node in error, no error counted, and the action ends up DONE
     failed = act.nodes_error()
     self.assertEqual(failed, NodeSet())
     error_count = act.nb_errors()
     self.assertEqual(error_count, 0)
     self.assertEqual(act.status, DONE)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:9,代码来源:ActionTest.py


示例12: test_run_skipped

    def test_run_skipped(self):
        """Running a service whose action has an empty target is SKIPPED"""
        service = Service('test_service')
        act = Action('start', target="TEMPNODE", command=":")
        service.add_action(act)
        # NOTE(review): 'DIF' presumably removes TEMPNODE from the target,
        # leaving it empty -- confirm against Action.update_target()
        act.update_target("TEMPNODE", 'DIF')
        service.run('start')

        # Both the action and its service must report SKIPPED
        for entity in (act, service):
            self.assertEqual(entity.status, SKIPPED)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:10,代码来源:ServiceTest.py


示例13: test_command_output_warning_status

    def test_command_output_warning_status(self):
        '''Test command line output with one action WARNING'''
        svc = Service('warn')
        act = Action('go', command='/bin/false')
        act.errors = 1
        svc.add_action(act)
        self.manager.register_service(svc)
        self._output_check(['warn', 'go', '-q'], RC_WARNING,
"""warn                                                              [ WARNING ]
""")
开发者ID:mdlx,项目名称:milkcheck,代码行数:10,代码来源:CliTest.py


示例14: test_failed_nodes

 def test_failed_nodes(self):
     """Nodes where the command failed are saved in failed_nodes"""
     act = Action('start', command='/bin/false', target=HOSTNAME)
     svc = Service('test')
     svc.add_actions(act)
     act.run()

     expected_nodes = NodeSet(HOSTNAME)
     self.assertEqual(act.failed_nodes, expected_nodes)
     self.assertEqual(act.status, ERROR)
     # The failed nodes must also be visible on the owning service
     self.assertEqual(svc.failed_nodes, act.failed_nodes)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:10,代码来源:ActionTest.py


示例15: test_nb_timeout_remote

 def test_nb_timeout_remote(self):
     """Test nb_timeout() when the command outlives its timeout (remote)"""
     act = Action(
         name='start',
         target=HOSTNAME,
         command='sleep 3',
         timeout=0.5,
     )
     svc = Service('test_service')
     svc.add_action(act)
     svc.run('start')

     # HOSTNAME must be the only node reported in timeout
     timed_out = act.nodes_timeout()
     self.assertEqual(timed_out, NodeSet(HOSTNAME))
     timeout_count = act.nb_timeout()
     self.assertEqual(timeout_count, 1)
     self.assertEqual(act.status, TIMEOUT)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:10,代码来源:ActionTest.py


示例16: test_action_with_variables

 def test_action_with_variables(self):
     """Test that %VAR1 inside a %(...) expression is resolved in the command"""
     template = 'echo %([ "%VAR1" != "" ] && echo "-x %VAR1")'
     act = Action('start', command=template)
     svc = Service('TEST')
     svc.add_actions(act)
     svc.add_var('VAR1', 'foo')
     svc.resolve_all()
     act.run()

     # The worker must have received the fully resolved command line
     resolved = act.worker.command
     self.assertEqual(resolved, 'echo -x foo')
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:10,代码来源:ActionTest.py


示例17: test_retry_timeout

 def test_retry_timeout(self):
     """Test that a timed-out action is retried up to maxretry times"""
     act = Action("start", command="/bin/sleep 0.5", timeout=0.1)
     act.delay = 0.1
     act.maxretry = 2
     svc = Service("retry")
     svc.add_action(act)
     svc.run("start")

     # With maxretry set to 2, three tries are expected in total
     self.assertEqual(act.tries, 3)
     self.assertEqual(act.status, TIMEOUT)
     # Timing window check on the total duration
     in_window = 0.6 < act.duration < 0.8
     self.assertTrue(
         in_window,
         "%.3f is not between 0.6 and 0.8" % act.duration,
     )
开发者ID:mdlx,项目名称:milkcheck,代码行数:11,代码来源:ActionTest.py


示例18: test_nb_timeout_local

 def test_nb_timeout_local(self):
     """Test nb_timeout() when the command outlives its timeout (local)"""
     act = Action(name='start', command='sleep 3', timeout=0.3)
     svc = Service('test_service')
     svc.add_action(act)
     svc.run('start')

     # A timeout is not counted as an error
     failed = act.nodes_error()
     self.assertEqual(failed, NodeSet())
     error_count = act.nb_errors()
     self.assertEqual(error_count, 0)

     # With no explicit target, the timeout is reported on localhost
     timed_out = act.nodes_timeout()
     self.assertEqual(timed_out, NodeSet("localhost"))
     timeout_count = act.nb_timeout()
     self.assertEqual(timeout_count, 1)
     self.assertEqual(act.status, TIMEOUT)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:11,代码来源:ActionTest.py


示例19: test_retry_error

 def test_retry_error(self):
     """Test that a failing action is retried up to maxretry times"""
     act = Action("start", command="/bin/false")
     act.delay = 0.1
     act.maxretry = 3
     svc = Service("retry")
     svc.add_action(act)
     svc.run("start")

     # With maxretry set to 3, four tries are expected in total
     self.assertEqual(act.tries, 4)
     self.assertEqual(act.status, ERROR)
     # Timing window check on the total duration
     in_window = 0.3 < act.duration < 0.5
     self.assertTrue(
         in_window,
         "%.3f is not between 0.3 and 0.5" % act.duration,
     )
开发者ID:mdlx,项目名称:milkcheck,代码行数:11,代码来源:ActionTest.py


示例20: test_nb_errors_remote

 def test_nb_errors_remote(self):
     """Test nb_errors() when /bin/false targets three nodes (remote)."""
     act = Action(
         name='start',
         target='badname[12,13,21]',
         command='/bin/false',
     )
     act.errors = 2
     svc = Service('test_service')
     svc.add_action(act)
     svc.run('start')

     # All three target nodes must be reported in error
     failed = act.nodes_error()
     self.assertEqual(failed, NodeSet("badname[12,13,21]"))
     error_count = act.nb_errors()
     self.assertEqual(error_count, 3)
     self.assertEqual(act.status, ERROR)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:11,代码来源:ActionTest.py



注:本文中的MilkCheck.Engine.Action.Action类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python BaseEntity.BaseEntity类代码示例发布时间:2022-05-24
下一篇:
Python MessageParser.MessageParser类代码示例发布时间:2022-05-24
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap