• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python base.DummyProcess类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中supervisor.tests.base.DummyProcess的典型用法代码示例。如果您正苦于以下问题:Python DummyProcess类的具体用法?Python DummyProcess怎么用?Python DummyProcess使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DummyProcess类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_process_state_events_starting_and_backoff

    def test_process_state_events_starting_and_backoff(self):
        """Check the serialized form of the STARTING and BACKOFF state events.

        For each of the two event classes the serialized event must carry
        exactly four headers and an empty payload.  The ``tries`` header is
        expected to be ``"0"`` at first and then to follow the process's
        ``backoff`` attribute once the test sets it to 1 and 2.
        """
        from supervisor.states import ProcessStates
        from supervisor import events

        event_classes = (
            events.ProcessStateStartingEvent,
            events.ProcessStateBackoffEvent,
        )
        initial_headers = (
            ("processname", "process1"),
            ("groupname", "process1"),
            ("from_state", "STARTING"),
            ("tries", "0"),
        )

        for event_class in event_classes:
            pconfig1 = DummyPConfig(
                DummyOptions(), "process1", "process1", "/bin/process1"
            )

            class DummyGroup:
                config = pconfig1

            proc = DummyProcess(pconfig1)
            proc.group = DummyGroup

            # First serialization, before the test touches ``backoff``.
            serialized = str(event_class(proc, ProcessStates.STARTING))
            headers, payload = self._deserialize(serialized)
            self.assertEqual(len(headers), 4)
            for header_name, expected in initial_headers:
                self.assertEqual(headers[header_name], expected)
            self.assertEqual(payload, "")

            # Every new ``backoff`` value must show up as the ``tries`` header.
            for backoff in (1, 2):
                proc.backoff = backoff
                serialized = str(event_class(proc, ProcessStates.STARTING))
                headers, payload = self._deserialize(serialized)
                self.assertEqual(headers["tries"], str(backoff))
开发者ID:GulsahKose,项目名称:supervisor,代码行数:29,代码来源:test_events.py


示例2: test_process_state_events_with_pid

    def test_process_state_events_with_pid(self):
        """Check the serialized form of the RUNNING, STOPPED and STOPPING events.

        For each event class, built for a process whose ``pid`` is 1, the
        serialized event must carry exactly four headers (``processname``,
        ``groupname``, ``from_state`` and ``pid``) and an empty payload.
        """
        from supervisor.states import ProcessStates
        from supervisor import events

        expected_headers = (
            ("processname", "process1"),
            ("groupname", "process1"),
            ("from_state", "STARTING"),
            ("pid", "1"),
        )
        event_classes = (
            events.ProcessStateRunningEvent,
            events.ProcessStateStoppedEvent,
            events.ProcessStateStoppingEvent,
        )

        for event_class in event_classes:
            pconfig1 = DummyPConfig(
                DummyOptions(), "process1", "process1", "/bin/process1"
            )

            class DummyGroup:
                config = pconfig1

            proc = DummyProcess(pconfig1)
            proc.group = DummyGroup
            proc.pid = 1

            serialized = str(event_class(proc, ProcessStates.STARTING))
            headers, payload = self._deserialize(serialized)

            self.assertEqual(len(headers), 4)
            for header_name, expected in expected_headers:
                self.assertEqual(headers[header_name], expected)
            self.assertEqual(payload, "")
开发者ID:GulsahKose,项目名称:supervisor,代码行数:26,代码来源:test_events.py


示例3: test_handle_result_exception

 def test_handle_result_exception(self):
     """A result handler raising ``ValueError`` moves the listener to UNKNOWN.

     With the listener BUSY and a raising ``result_handler`` installed on
     the group (and its config), ``handle_result('foo')`` must emit one
     ``EventRejectedEvent``, set the listener state to UNKNOWN and log
     the three messages checked below, in that order.
     """
     from supervisor import events
     from supervisor.dispatchers import EventListenerStates

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))

     rejections = []

     def collect(event):
         rejections.append(event)

     events.subscribe(events.EventRejectedEvent, collect)
     dispatcher = self._makeOne(process)

     def exception(event, result):
         raise ValueError

     class Dummy:
         pass

     group = Dummy()
     group.config = Dummy()
     group.config.result_handler = exception
     group.result_handler = exception
     process.group = group
     process.listener_state = EventListenerStates.BUSY

     dispatcher.handle_result('foo')

     self.assertEqual(len(rejections), 1)
     self.assertEqual(rejections[0].__class__, events.EventRejectedEvent)
     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)

     expected_log = (
         'process1: event caused an error',
         'process1: BUSY -> UNKNOWN',
         'process1: has entered the UNKNOWN state and will '
         'no longer receive events, this usually indicates '
         'the process violated the eventlistener protocol',
     )
     for position, message in enumerate(expected_log):
         self.assertEqual(options.logger.data[position], message)
开发者ID:1T,项目名称:supervisor,代码行数:34,代码来源:test_dispatchers.py


示例4: test_toggle_capturemode_sends_event

    def test_toggle_capturemode_sends_event(self):
        """Toggling capture mode off publishes the captured data as an event.

        The dispatcher starts in capture mode with ``["hallooo"]`` in its
        capture log.  After ``toggle_capturemode()`` exactly one
        ``PROCESS_COMMUNICATION`` event must have been delivered, carrying
        the process, its pid (4000) and the captured data ``"hallooo"``.
        """
        from supervisor import events

        options = DummyOptions()
        config = DummyPConfig(
            options,
            "process1",
            "/bin/process1",
            stdout_logfile="/tmp/foo",
            stdout_capture_maxbytes=500,
        )
        process = DummyProcess(config)
        process.pid = 4000

        dispatcher = self._makeOne(process)
        dispatcher.capturemode = True
        dispatcher.capturelog.data = ["hallooo"]

        received = []

        def collect(event):
            received.append(event)

        events.subscribe(events.EventTypes.PROCESS_COMMUNICATION, collect)
        dispatcher.toggle_capturemode()

        self.assertEqual(len(received), 1)
        event = received[0]
        self.assertEqual(event.process, process)
        self.assertEqual(event.pid, 4000)
        self.assertEqual(event.data, "hallooo")
开发者ID:Jude7,项目名称:minos,代码行数:25,代码来源:test_dispatchers.py


示例5: test_handle_result_accept

 def test_handle_result_accept(self):
     """A result handler that returns normally acknowledges the event.

     With the listener BUSY and a no-op ``result_handler`` on the group
     config, ``handle_result('foo')`` must emit no ``EventRejectedEvent``,
     leave the listener ACKNOWLEDGED, and log a first message ending in
     ``'BUSY -> ACKNOWLEDGED (processed)'``.
     """
     from supervisor import events
     from supervisor.dispatchers import EventListenerStates

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))

     rejections = []

     def collect(event):
         rejections.append(event)

     events.subscribe(events.EventRejectedEvent, collect)
     dispatcher = self._makeOne(process)

     def handle(event, result):
         pass

     class Dummy:
         pass

     group = Dummy()
     group.config = Dummy()
     group.config.result_handler = handle
     process.group = group
     process.listener_state = EventListenerStates.BUSY

     dispatcher.handle_result('foo')

     self.assertEqual(len(rejections), 0)
     self.assertEqual(process.listener_state,
                      EventListenerStates.ACKNOWLEDGED)
     first_message = options.logger.data[0]
     self.assertTrue(
         first_message.endswith('BUSY -> ACKNOWLEDGED (processed)'))
开发者ID:lmcdonough,项目名称:supervisor,代码行数:26,代码来源:test_dispatchers.py


示例6: test_handle_result_rejectevent

 def test_handle_result_rejectevent(self):
     """A result handler raising ``RejectEvent`` rejects but acknowledges.

     With the listener BUSY and a ``result_handler`` that raises
     ``RejectEvent``, ``handle_result('foo')`` must emit one
     ``EventRejectedEvent``, leave the listener ACKNOWLEDGED and log the
     two messages checked below, in that order.
     """
     from supervisor import events
     from supervisor.dispatchers import EventListenerStates
     from supervisor.dispatchers import RejectEvent

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))

     rejections = []

     def collect(event):
         rejections.append(event)

     events.subscribe(events.EventRejectedEvent, collect)
     dispatcher = self._makeOne(process)

     def rejected(event, result):
         raise RejectEvent(result)

     class Dummy:
         pass

     group = Dummy()
     group.config = Dummy()
     group.config.result_handler = rejected
     process.group = group
     process.listener_state = EventListenerStates.BUSY

     dispatcher.handle_result('foo')

     self.assertEqual(len(rejections), 1)
     self.assertEqual(rejections[0].__class__, events.EventRejectedEvent)
     self.assertEqual(process.listener_state,
                      EventListenerStates.ACKNOWLEDGED)

     expected_log = (
         'process1: event was rejected',
         'process1: BUSY -> ACKNOWLEDGED',
     )
     for position, message in enumerate(expected_log):
         self.assertEqual(options.logger.data[position], message)
开发者ID:1T,项目名称:supervisor,代码行数:30,代码来源:test_dispatchers.py


示例7: test_process_state_events_starting_and_backoff

 def test_process_state_events_starting_and_backoff(self):
     """Check the serialized form of the STARTING and BACKOFF state events.

     Each event class must serialize to exactly four headers and an empty
     payload.  The ``tries`` header is expected to be ``'0'`` at first and
     to mirror ``backoff`` after the test sets it to 1 and then 2.
     """
     from supervisor.states import ProcessStates
     from supervisor import events

     for event_class in (events.ProcessStateStartingEvent,
                         events.ProcessStateBackoffEvent):
         pconfig1 = DummyPConfig(DummyOptions(), 'process1', 'process1',
                                 '/bin/process1')

         class DummyGroup:
             config = pconfig1

         process1 = DummyProcess(pconfig1)
         process1.group = DummyGroup

         def serialized_parts():
             # Build a fresh event from the process's current attributes
             # and split its string form into (headers, payload).
             event = event_class(process1, ProcessStates.STARTING)
             return self._deserialize(str(event))

         headers, payload = serialized_parts()
         self.assertEqual(len(headers), 4)
         self.assertEqual(headers['processname'], 'process1')
         self.assertEqual(headers['groupname'], 'process1')
         self.assertEqual(headers['from_state'], 'STARTING')
         self.assertEqual(headers['tries'], '0')
         self.assertEqual(payload, '')

         for tries in (1, 2):
             process1.backoff = tries
             headers, payload = serialized_parts()
             self.assertEqual(headers['tries'], str(tries))
开发者ID:alexsilva,项目名称:supervisor,代码行数:30,代码来源:test_events.py


示例8: test_handle_result_exception

 def test_handle_result_exception(self):
     """A result handler raising ``ValueError`` moves the listener to UNKNOWN.

     With the listener BUSY and a raising ``result_handler`` installed on
     the group (and its config), ``handle_result('foo')`` must emit one
     ``EventRejectedEvent``, set the listener state to UNKNOWN and log a
     first message ending in ``'BUSY -> UNKNOWN'``.
     """
     from supervisor import events
     from supervisor.dispatchers import EventListenerStates

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))

     rejections = []

     def collect(event):
         rejections.append(event)

     events.subscribe(events.EventRejectedEvent, collect)
     dispatcher = self._makeOne(process)

     def exception(event, result):
         raise ValueError

     class Dummy:
         pass

     group = Dummy()
     group.config = Dummy()
     group.config.result_handler = exception
     group.result_handler = exception
     process.group = group
     process.listener_state = EventListenerStates.BUSY

     dispatcher.handle_result('foo')

     self.assertEqual(len(rejections), 1)
     self.assertEqual(rejections[0].__class__, events.EventRejectedEvent)
     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
     first_message = options.logger.data[0]
     self.assertTrue(first_message.endswith('BUSY -> UNKNOWN'))
开发者ID:lmcdonough,项目名称:supervisor,代码行数:28,代码来源:test_dispatchers.py


示例9: test_handle_listener_state_busy_gobbles

 def test_handle_listener_state_busy_gobbles(self):
     """A BUSY listener consumes a result and the trailing bogus data.

     The state buffer holds ``'RESULT 2\\nOK'`` immediately followed by
     ``'bogus data\\n'``.  After ``handle_listener_state_change()`` the
     buffer must be empty, the listener must be UNKNOWN, and the four log
     messages checked below must have been written in that order.
     """
     from supervisor.dispatchers import EventListenerStates
     from supervisor.dispatchers import default_handler

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     dispatcher = self._makeOne(process)
     process.listener_state = EventListenerStates.BUSY

     class Dummy:
         pass

     group = Dummy()
     group.config = Dummy()
     group.config.result_handler = default_handler
     process.group = group

     dispatcher.state_buffer = 'RESULT 2\nOKbogus data\n'
     self.assertEqual(dispatcher.handle_listener_state_change(), None)
     self.assertEqual(dispatcher.state_buffer, '')

     expected_log = (
         'process1: event was processed',
         'process1: BUSY -> ACKNOWLEDGED',
         'process1: ACKNOWLEDGED -> UNKNOWN',
         'process1: has entered the UNKNOWN state and will '
         'no longer receive events, this usually indicates '
         'the process violated the eventlistener protocol',
     )
     for position, message in enumerate(expected_log):
         self.assertEqual(options.logger.data[position], message)

     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
开发者ID:1T,项目名称:supervisor,代码行数:28,代码来源:test_dispatchers.py


示例10: test_handle_listener_state_change_busy_to_unknown

    def test_handle_listener_state_change_busy_to_unknown(self):
        """A bad result line from a BUSY listener moves it to UNKNOWN.

        With ``'bogus data\\n'`` in the state buffer,
        ``handle_listener_state_change()`` must empty the buffer, log the
        bad result line, set the listener state to UNKNOWN and publish an
        ``EventRejectedEvent`` that references the process and the event it
        was handling.
        """
        from supervisor.events import EventRejectedEvent
        from supervisor.events import subscribe
        from supervisor.dispatchers import EventListenerStates

        rejections = []

        def collect(event):
            rejections.append(event)

        subscribe(EventRejectedEvent, collect)

        options = DummyOptions()
        process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
        dispatcher = self._makeOne(process)
        process.listener_state = EventListenerStates.BUSY
        in_flight = DummyEvent()
        process.event = in_flight

        dispatcher.state_buffer = 'bogus data\n'
        self.assertEqual(dispatcher.handle_listener_state_change(), None)
        self.assertEqual(dispatcher.state_buffer, '')
        self.assertEqual(
            options.logger.data[0],
            "process1: BUSY -> UNKNOWN (bad result line 'bogus data')",
        )
        self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)

        first_rejection = rejections[0]
        self.assertEqual(first_rejection.process, process)
        self.assertEqual(first_rejection.event, in_flight)
开发者ID:alexsilva,项目名称:supervisor,代码行数:26,代码来源:test_dispatchers.py


示例11: test_handle_listener_state_change_busy_to_unknown

 def test_handle_listener_state_change_busy_to_unknown(self):
     """A bad result line from a BUSY listener moves it to UNKNOWN.

     With ``'bogus data\\n'`` in the state buffer,
     ``handle_listener_state_change()`` must empty the buffer, write the
     three log messages checked below in order, set the listener state to
     UNKNOWN and publish an ``EventRejectedEvent`` that references the
     process and the event it was handling.
     """
     from supervisor.events import EventRejectedEvent
     from supervisor.events import subscribe
     from supervisor.dispatchers import EventListenerStates

     rejections = []

     def collect(event):
         rejections.append(event)

     subscribe(EventRejectedEvent, collect)

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     dispatcher = self._makeOne(process)
     process.listener_state = EventListenerStates.BUSY
     in_flight = DummyEvent()
     process.event = in_flight

     dispatcher.state_buffer = 'bogus data\n'
     self.assertEqual(dispatcher.handle_listener_state_change(), None)
     self.assertEqual(dispatcher.state_buffer, '')

     expected_log = (
         "process1: bad result line: 'bogus data'",
         'process1: BUSY -> UNKNOWN',
         'process1: has entered the UNKNOWN state and will '
         'no longer receive events, this usually indicates '
         'the process violated the eventlistener protocol',
     )
     for position, message in enumerate(expected_log):
         self.assertEqual(options.logger.data[position], message)

     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
     first_rejection = rejections[0]
     self.assertEqual(first_rejection.process, process)
     self.assertEqual(first_rejection.event, in_flight)
开发者ID:1T,项目名称:supervisor,代码行数:30,代码来源:test_dispatchers.py


示例12: test_handle_listener_state_change_busy_to_insufficient

 def test_handle_listener_state_change_busy_to_insufficient(self):
     """A BUSY listener with ``'bogus data yo'`` buffered is left untouched.

     The buffered text has no trailing newline; after
     ``handle_listener_state_change()`` the buffer must be unchanged and
     the listener must still be BUSY.
     """
     from supervisor.dispatchers import EventListenerStates

     process = DummyProcess(
         DummyPConfig(DummyOptions(), 'process1', '/bin/process1'))
     dispatcher = self._makeOne(process)
     process.listener_state = EventListenerStates.BUSY

     buffered = 'bogus data yo'
     dispatcher.state_buffer = buffered
     self.assertEqual(dispatcher.handle_listener_state_change(), None)
     self.assertEqual(dispatcher.state_buffer, buffered)
     self.assertEqual(process.listener_state, EventListenerStates.BUSY)
开发者ID:1T,项目名称:supervisor,代码行数:11,代码来源:test_dispatchers.py


示例13: test_handle_listener_state_change_from_unknown

 def test_handle_listener_state_change_from_unknown(self):
     """An UNKNOWN listener discards buffered data silently.

     After ``handle_listener_state_change()`` the state buffer must be
     empty, nothing must have been logged, and the listener must still be
     UNKNOWN.
     """
     from supervisor.dispatchers import EventListenerStates

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     dispatcher = self._makeOne(process)
     process.listener_state = EventListenerStates.UNKNOWN

     dispatcher.state_buffer = 'whatever'
     self.assertEqual(dispatcher.handle_listener_state_change(), None)
     self.assertEqual(dispatcher.state_buffer, '')
     self.assertEqual(options.logger.data, [])
     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
开发者ID:1T,项目名称:supervisor,代码行数:12,代码来源:test_dispatchers.py


示例14: test_handle_listener_state_change_acknowledged_to_insufficient

 def test_handle_listener_state_change_acknowledged_to_insufficient(self):
     """An ACKNOWLEDGED listener with only ``'RE'`` buffered is left untouched.

     After ``handle_listener_state_change()`` the buffer must still hold
     ``'RE'``, nothing must have been logged, and the listener must still
     be ACKNOWLEDGED.
     """
     from supervisor.dispatchers import EventListenerStates

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     dispatcher = self._makeOne(process)
     process.listener_state = EventListenerStates.ACKNOWLEDGED

     partial = 'RE'
     dispatcher.state_buffer = partial
     self.assertEqual(dispatcher.handle_listener_state_change(), None)
     self.assertEqual(dispatcher.state_buffer, partial)
     self.assertEqual(options.logger.data, [])
     self.assertEqual(process.listener_state,
                      EventListenerStates.ACKNOWLEDGED)
开发者ID:1T,项目名称:supervisor,代码行数:13,代码来源:test_dispatchers.py


示例15: test_handle_listener_state_change_ready_to_unknown

 def test_handle_listener_state_change_ready_to_unknown(self):
     """Data arriving from a READY listener moves it to UNKNOWN.

     With ``'bogus data yo'`` buffered, ``handle_listener_state_change()``
     must empty the buffer, log ``'process1: READY -> UNKNOWN'`` first,
     and set the listener state to UNKNOWN.
     """
     from supervisor.dispatchers import EventListenerStates

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     dispatcher = self._makeOne(process)
     process.listener_state = EventListenerStates.READY

     dispatcher.state_buffer = 'bogus data yo'
     self.assertEqual(dispatcher.handle_listener_state_change(), None)
     self.assertEqual(dispatcher.state_buffer, '')

     first_message = options.logger.data[0]
     self.assertEqual(first_message, 'process1: READY -> UNKNOWN')
     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
开发者ID:lmcdonough,项目名称:supervisor,代码行数:13,代码来源:test_dispatchers.py


示例16: test_handle_listener_state_change_acknowledged_to_ready

    def test_handle_listener_state_change_acknowledged_to_ready(self):
        """``"READY\\n"`` from an ACKNOWLEDGED listener moves it to READY.

        After ``handle_listener_state_change()`` the state buffer must be
        empty, the first log message must be
        ``"process1: ACKNOWLEDGED -> READY"``, and the listener must be READY.
        """
        from supervisor.dispatchers import EventListenerStates

        options = DummyOptions()
        process = DummyProcess(DummyPConfig(options, "process1", "/bin/process1"))
        dispatcher = self._makeOne(process)
        process.listener_state = EventListenerStates.ACKNOWLEDGED

        dispatcher.state_buffer = "READY\n"
        self.assertEqual(dispatcher.handle_listener_state_change(), None)
        self.assertEqual(dispatcher.state_buffer, "")

        first_message = options.logger.data[0]
        self.assertEqual(first_message, "process1: ACKNOWLEDGED -> READY")
        self.assertEqual(process.listener_state, EventListenerStates.READY)
开发者ID:whenjonny,项目名称:tupppai-tools,代码行数:13,代码来源:test_dispatchers.py


示例17: test_pcomm_stderr_event

 def test_pcomm_stderr_event(self):
     """Check the serialized form of ``ProcessCommunicationStderrEvent``.

     An event built with pid 1 and data ``'yo'`` must serialize to headers
     naming the process, its group and the pid, with ``'yo'`` as payload.
     The headers are passed as the assertion message to aid debugging.
     """
     from supervisor.events import ProcessCommunicationStderrEvent

     pconfig1 = DummyPConfig(DummyOptions(), 'process1', 'process1',
                             '/bin/process1')
     process1 = DummyProcess(pconfig1)

     class DummyGroup:
         config = pconfig1

     process1.group = DummyGroup

     serialized = str(ProcessCommunicationStderrEvent(process1, 1, 'yo'))
     headers, payload = self._deserialize(serialized)

     expected_headers = (
         ('processname', 'process1'),
         ('groupname', 'process1'),
         ('pid', '1'),
     )
     for header_name, expected in expected_headers:
         self.assertEqual(headers[header_name], expected, headers)
     self.assertEqual(payload, 'yo')
开发者ID:alexsilva,项目名称:supervisor,代码行数:14,代码来源:test_events.py


示例18: test_write_encodes_unicode_as_utf8

    def test_write_encodes_unicode_as_utf8(self):
        """``stdin_write_handler`` writes the ``STDIN:`` payload to the process.

        The response is ``'STDIN:foobar'`` passed through ``_b`` and then
        ``_u``; after the handler runs, the process's ``stdin_buffer`` must
        equal ``'foobar'``.
        """
        target = DummyProcess(DummyPConfig(DummyOptions(), 'cat', 'bin/cat'))
        target.pid = 42
        target.killing = False

        event = DummyEvent()
        event.process = target

        response = _u(_b('STDIN:foobar'))
        handler = supervisor_twiddler.resulthandler.stdin_write_handler
        handler(event, response)

        self.assertEqual(target.stdin_buffer, 'foobar')
开发者ID:cjrh-gps,项目名称:supervisor_twiddler,代码行数:14,代码来源:test_resulthandler.py


示例19: test_reap

    def test_reap(self):
        """``reap(once=True)`` finishes the process recorded under the reaped pid.

        ``options.waitpid_return`` is ``(1, 1)`` and the process is stored
        in ``options.pidhistory`` under pid 1; after reaping, the process's
        ``finished`` attribute must equal ``(1, 1)``.
        """
        options = DummyOptions()
        options.waitpid_return = (1, 1)

        process = DummyProcess(
            DummyPConfig(options, 'process', 'process', '/bin/process1')
        )
        initial_state = (
            ('drained', False),
            ('killing', 1),
            ('laststop', None),
            ('waitstatus', (None, None)),
        )
        for attribute, value in initial_state:
            setattr(process, attribute, value)

        options.pidhistory = {1: process}
        supervisord = self._makeOne(options)

        supervisord.reap(once=True)
        self.assertEqual(process.finished, (1, 1))
开发者ID:GulsahKose,项目名称:supervisor,代码行数:14,代码来源:test_supervisord.py


示例20: test_handle_read_event_calls_handle_listener_state_change

    def test_handle_read_event_calls_handle_listener_state_change(self):
        """Reading the ready token moves an ACKNOWLEDGED listener to READY.

        ``options.readfd_result`` is set to the dispatcher's
        ``READY_FOR_EVENTS_TOKEN``.  After ``handle_read_event()`` the
        listener must be READY, the state buffer empty, and the child log
        must hold exactly one entry equal to that token.
        """
        from supervisor.dispatchers import EventListenerStates

        options = DummyOptions()
        process = DummyProcess(
            DummyPConfig(
                options, "process1", "/bin/process1", stdout_logfile="/tmp/foo"
            )
        )
        process.listener_state = EventListenerStates.ACKNOWLEDGED

        dispatcher = self._makeOne(process)
        options.readfd_result = dispatcher.READY_FOR_EVENTS_TOKEN

        self.assertEqual(dispatcher.handle_read_event(), None)
        self.assertEqual(process.listener_state, EventListenerStates.READY)
        self.assertEqual(dispatcher.state_buffer, "")

        logged = dispatcher.childlog.data
        self.assertEqual(len(logged), 1)
        self.assertEqual(logged[0], dispatcher.READY_FOR_EVENTS_TOKEN)
开发者ID:whenjonny,项目名称:tupppai-tools,代码行数:14,代码来源:test_dispatchers.py



注:本文中的supervisor.tests.base.DummyProcess类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python base.DummyProcessGroup类代码示例发布时间:2022-05-27
下一篇:
Python base.DummyOptions类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap