• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python base.DummyProcessGroup类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中supervisor.tests.base.DummyProcessGroup的典型用法代码示例。如果您正苦于以下问题:Python DummyProcessGroup类的具体用法?Python DummyProcessGroup怎么用?Python DummyProcessGroup使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DummyProcessGroup类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_removeProcessFromGroup_raises_bad_name_when_process_does_not_exist

    def test_removeProcessFromGroup_raises_bad_name_when_process_does_not_exist(self):
        pconfig = DummyPConfig(None, 'foo', '/bin/foo')
        gconfig = DummyPGroupConfig(None, pconfigs=[pconfig])
        pgroup = DummyProcessGroup(gconfig)
        pgroup.processes = {}

        supervisord = DummySupervisor(process_groups = {'group_name': pgroup})
        interface = self.makeOne(supervisord)

        self.assertRPCError(SupervisorFaults.BAD_NAME,
                            interface.removeProcessFromGroup,
                            'group_name', 'nonexistant_process_name')
开发者ID:cjrh-gps,项目名称:supervisor_twiddler,代码行数:12,代码来源:test_rpcinterface.py


示例2: test_runforever_select_dispatcher_exitnow

 def test_runforever_select_dispatcher_exitnow(self):
     options = DummyOptions()
     supervisord = self._makeOne(options)
     pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
     gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
     pgroup = DummyProcessGroup(gconfig)
     from supervisor.medusa import asyncore_25 as asyncore
     exitnow = DummyDispatcher(readable=True, error=asyncore.ExitNow)
     pgroup.dispatchers = {6:exitnow}
     supervisord.process_groups = {'foo': pgroup}
     options.select_result = [6], [], []
     options.test = True
     self.assertRaises(asyncore.ExitNow, supervisord.runforever)
开发者ID:GulsahKose,项目名称:supervisor,代码行数:13,代码来源:test_supervisord.py


示例3: test_runforever_select_dispatcher_handle_error_via_write

 def test_runforever_select_dispatcher_handle_error_via_write(self):
     options = DummyOptions()
     options.poller.result = [], [sys.stdout]
     supervisord = self._makeOne(options)
     pconfig = DummyPConfig(options, 'foo', '/bin/foo', )
     gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
     pgroup = DummyProcessGroup(gconfig)
     notimpl = DummyDispatcher(readable=True, error=NotImplementedError)
     pgroup.dispatchers = {sys.stdout: notimpl}
     supervisord.process_groups = {'foo': pgroup}
     options.test = True
     supervisord.runforever()
     self.assertEqual(notimpl.error_handled, True)
开发者ID:alexsilva,项目名称:supervisor,代码行数:13,代码来源:test_supervisord.py


示例4: test_removeProcessFromGroup_transitions_process_group

    def test_removeProcessFromGroup_transitions_process_group(self):
        pconfig = DummyPConfig(None, 'foo', '/bin/foo')
        process = DummyProcess(pconfig, ProcessStates.EXITED)

        gconfig = DummyPGroupConfig(None, pconfigs=[pconfig])
        pgroup = DummyProcessGroup(gconfig)
        pgroup.processes = { 'process_name': process }

        supervisord = DummySupervisor(process_groups = {'group_name': pgroup})
        interface = self.makeOne(supervisord)

        result = interface.removeProcessFromGroup('group_name', 'process_name')
        self.assertTrue(result)
        self.assertTrue(pgroup.transitioned)
开发者ID:cjrh-gps,项目名称:supervisor_twiddler,代码行数:14,代码来源:test_rpcinterface.py


示例5: test_removeProcessFromGroup_deletes_the_process

    def test_removeProcessFromGroup_deletes_the_process(self):
        pconfig = DummyPConfig(None, 'foo', '/bin/foo')
        process = DummyProcess(pconfig, ProcessStates.STOPPED)

        gconfig = DummyPGroupConfig(None, pconfigs=[pconfig])
        pgroup = DummyProcessGroup(gconfig)
        pgroup.processes = { 'process_name': process }

        supervisord = DummySupervisor(process_groups = {'group_name': pgroup})
        interface = self.makeOne(supervisord)

        result = interface.removeProcessFromGroup('group_name', 'process_name')
        self.assertTrue(result)
        self.assertTrue(pgroup.processes.get('process_name') is None)
        self.assertEqual('removeProcessFromGroup', interface.update_text)
开发者ID:cjrh-gps,项目名称:supervisor_twiddler,代码行数:15,代码来源:test_rpcinterface.py


示例6: test_removeProcessFromGroup_raises_still_running_when_process_has_pid

    def test_removeProcessFromGroup_raises_still_running_when_process_has_pid(self):
        pconfig = DummyPConfig(None, 'foo', '/bin/foo')
        process = DummyProcess(pconfig)
        process.pid = 42

        gconfig = DummyPGroupConfig(None, pconfigs=[pconfig])
        pgroup = DummyProcessGroup(gconfig)
        pgroup.processes = { 'process_with_pid': process }

        supervisord = DummySupervisor(process_groups = {'group_name': pgroup})
        interface = self.makeOne(supervisord)

        self.assertRPCError(SupervisorFaults.STILL_RUNNING,
                            interface.removeProcessFromGroup,
                            'group_name', 'process_with_pid')
开发者ID:cjrh-gps,项目名称:supervisor_twiddler,代码行数:15,代码来源:test_rpcinterface.py


示例7: test_exit_delayed

 def test_exit_delayed(self):
     options = DummyOptions()
     supervisord = self._makeOne(options)
     pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
     process = DummyProcess(pconfig)
     gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
     pgroup = DummyProcessGroup(gconfig)
     pgroup.unstopped_processes = [process]
     L = []
     def callback():
         L.append(1)
     supervisord.process_groups = {'foo': pgroup}
     supervisord.options.mood = 0
     supervisord.options.test = True
     supervisord.runforever()
     self.assertNotEqual(supervisord.lastshutdownreport, 0)
开发者ID:GulsahKose,项目名称:supervisor,代码行数:16,代码来源:test_supervisord.py


示例8: test_runforever_select_dispatchers

 def test_runforever_select_dispatchers(self):
     options = DummyOptions()
     supervisord = self._makeOne(options)
     pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
     gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
     pgroup = DummyProcessGroup(gconfig)
     readable = DummyDispatcher(readable=True)
     writable = DummyDispatcher(writable=True)
     error = DummyDispatcher(writable=True, error=OSError)
     pgroup.dispatchers = {6:readable, 7:writable, 8:error}
     supervisord.process_groups = {'foo': pgroup}
     options.select_result = [6], [7, 8], []
     options.test = True
     supervisord.runforever()
     self.assertEqual(pgroup.transitioned, True)
     self.assertEqual(readable.read_event_handled, True)
     self.assertEqual(writable.write_event_handled, True)
     self.assertEqual(error.error_handled, True)
开发者ID:GulsahKose,项目名称:supervisor,代码行数:18,代码来源:test_supervisord.py


示例9: test_addProgramToGroup_adds_all_processes_resulting_from_program_options

    def test_addProgramToGroup_adds_all_processes_resulting_from_program_options(self):
        gconfig = DummyPGroupConfig(None, pconfigs=[])
        pgroup = DummyProcessGroup(gconfig)
        pgroup.processes = {}

        supervisord = DummySupervisor(process_groups = {'group_name': pgroup})
        supervisord.options = supervisor.options.ServerOptions()

        interface = self.makeOne(supervisord)

        poptions = {'command': '/usr/bin/find /',
                    'process_name': 'find_%(process_num)d',
                    'numprocs': 3}
        self.assertTrue(interface.addProgramToGroup('group_name', 'new_process', poptions))
        self.assertEqual('addProgramToGroup', interface.update_text)

        self.assertEqual(3, len(pgroup.config.process_configs))
        self.assertEqual(3, len(pgroup.processes))
开发者ID:cjrh-gps,项目名称:supervisor_twiddler,代码行数:18,代码来源:test_rpcinterface.py


示例10: test_addProgramToGroup_adds_new_process_config_to_group

    def test_addProgramToGroup_adds_new_process_config_to_group(self):
        pconfig = DummyPConfig(None, 'foo', '/bin/foo')
        gconfig = DummyPGroupConfig(None, pconfigs=[pconfig])
        pgroup = DummyProcessGroup(gconfig)
        pgroup.processes = {}

        supervisord = DummySupervisor(process_groups = {'group_name': pgroup})
        supervisord.options = supervisor.options.ServerOptions()

        interface = self.makeOne(supervisord)

        poptions = {'command': '/usr/bin/find /'}
        self.assertTrue(interface.addProgramToGroup('group_name', 'new_process', poptions))
        self.assertEqual('addProgramToGroup', interface.update_text)

        config = pgroup.config.process_configs[1]
        self.assertEqual('new_process', config.name)
        self.assertTrue(isinstance(config, supervisor.options.ProcessConfig))
开发者ID:cjrh-gps,项目名称:supervisor_twiddler,代码行数:18,代码来源:test_rpcinterface.py


示例11: test_addProgramToGroup_uses_process_name_from_options

    def test_addProgramToGroup_uses_process_name_from_options(self):
        gconfig = DummyPGroupConfig(None, pconfigs=[])
        pgroup = DummyProcessGroup(gconfig)
        pgroup.processes = {}

        supervisord = DummySupervisor(process_groups = {'group_name': pgroup})
        supervisord.options = supervisor.options.ServerOptions()

        interface = self.makeOne(supervisord)

        poptions = {'process_name': 'renamed', 'command': '/usr/bin/find /'}
        self.assertTrue(interface.addProgramToGroup('group_name', 'new_process', poptions))
        self.assertEqual('addProgramToGroup', interface.update_text)

        config = pgroup.config.process_configs[0]
        self.assertEqual('renamed', config.name)
        self.assertTrue(pgroup.processes.get('new_process') is None)
        self.assertTrue(isinstance(pgroup.processes.get('renamed'),
          supervisor.process.Subprocess))
开发者ID:cjrh-gps,项目名称:supervisor_twiddler,代码行数:19,代码来源:test_rpcinterface.py



注:本文中的supervisor.tests.base.DummyProcessGroup类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python base.DummyRequest类代码示例发布时间:2022-05-27
下一篇:
Python base.DummyProcess类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap