• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python events.subscribe函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中supervisor.events.subscribe函数的典型用法代码示例。如果您正苦于以下问题：Python subscribe函数的具体用法？Python subscribe怎么用？Python subscribe使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了subscribe函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_toggle_capturemode_sends_event

    def test_toggle_capturemode_sends_event(self):
        """Toggling capture mode publishes one process-communication event.

        The dispatcher starts with ``capturemode`` set and ``["hallooo"]``
        stored as its capture log data. After ``toggle_capturemode()`` the
        PROCESS_COMMUNICATION subscriber must have seen exactly one event
        carrying the owning process, its pid and the captured text.
        """
        from supervisor import events

        options = DummyOptions()
        config = DummyPConfig(
            options, "process1", "/bin/process1", stdout_logfile="/tmp/foo", stdout_capture_maxbytes=500
        )
        process = DummyProcess(config)
        process.pid = 4000
        dispatcher = self._makeOne(process)
        dispatcher.capturemode = True
        dispatcher.capturelog.data = ["hallooo"]

        received = []

        def record(event):
            received.append(event)

        events.subscribe(events.EventTypes.PROCESS_COMMUNICATION, record)
        dispatcher.toggle_capturemode()

        self.assertEqual(len(received), 1)
        event = received[0]
        self.assertEqual(event.process, process)
        self.assertEqual(event.pid, 4000)
        self.assertEqual(event.data, "hallooo")
开发者ID:Jude7,项目名称:minos,代码行数:25,代码来源:test_dispatchers.py


示例2: test_handle_result_exception

 def test_handle_result_exception(self):
     """A result handler raising ValueError sends the listener to UNKNOWN.

     With the process in the BUSY listener state, ``handle_result('foo')``
     is expected to publish a single EventRejectedEvent, switch the
     listener state to UNKNOWN and write three specific log lines.
     """
     from supervisor import events
     from supervisor.dispatchers import EventListenerStates

     class Dummy:
         pass

     def exception(event, result):
         raise ValueError

     rejections = []
     def record(event):
         rejections.append(event)

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     events.subscribe(events.EventRejectedEvent, record)
     dispatcher = self._makeOne(process)

     # The handler is installed both on the group config and on the group.
     process.group = Dummy()
     process.group.config = Dummy()
     process.group.config.result_handler = exception
     process.group.result_handler = exception
     process.listener_state = EventListenerStates.BUSY

     dispatcher.handle_result('foo')

     self.assertEqual(len(rejections), 1)
     self.assertEqual(rejections[0].__class__, events.EventRejectedEvent)
     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
     expected_log = (
         'process1: event caused an error',
         'process1: BUSY -> UNKNOWN',
         'process1: has entered the UNKNOWN state and will '
         'no longer receive events, this usually indicates '
         'the process violated the eventlistener protocol',
     )
     for index, message in enumerate(expected_log):
         self.assertEqual(options.logger.data[index], message)
开发者ID:1T,项目名称:supervisor,代码行数:34,代码来源:test_dispatchers.py


示例3: test_handle_result_rejectevent

 def test_handle_result_rejectevent(self):
     """A result handler raising RejectEvent rejects the event.

     Starting from the BUSY listener state, ``handle_result('foo')`` must
     publish one EventRejectedEvent, move the listener to ACKNOWLEDGED and
     log the rejection followed by the state transition.
     """
     from supervisor import events
     from supervisor.dispatchers import EventListenerStates
     from supervisor.dispatchers import RejectEvent

     class Dummy:
         pass

     def rejected(event, result):
         raise RejectEvent(result)

     rejections = []
     def record(event):
         rejections.append(event)

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     events.subscribe(events.EventRejectedEvent, record)
     dispatcher = self._makeOne(process)

     process.group = Dummy()
     process.group.config = Dummy()
     process.group.config.result_handler = rejected
     process.listener_state = EventListenerStates.BUSY

     dispatcher.handle_result('foo')

     self.assertEqual(len(rejections), 1)
     self.assertEqual(rejections[0].__class__, events.EventRejectedEvent)
     self.assertEqual(process.listener_state, EventListenerStates.ACKNOWLEDGED)
     expected_log = (
         'process1: event was rejected',
         'process1: BUSY -> ACKNOWLEDGED',
     )
     for index, message in enumerate(expected_log):
         self.assertEqual(options.logger.data[index], message)
开发者ID:1T,项目名称:supervisor,代码行数:30,代码来源:test_dispatchers.py


示例4: test_handle_result_accept

 def test_handle_result_accept(self):
     """A result handler that returns normally accepts the event.

     Starting from the BUSY listener state, ``handle_result('foo')`` must
     publish no EventRejectedEvent, move the listener to ACKNOWLEDGED and
     log a line ending in ``'BUSY -> ACKNOWLEDGED (processed)'``.
     """
     from supervisor import events
     from supervisor.dispatchers import EventListenerStates

     class Dummy:
         pass

     def handle(event, result):
         pass

     rejections = []
     def record(event):
         rejections.append(event)

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     events.subscribe(events.EventRejectedEvent, record)
     dispatcher = self._makeOne(process)

     process.group = Dummy()
     process.group.config = Dummy()
     process.group.config.result_handler = handle
     process.listener_state = EventListenerStates.BUSY

     dispatcher.handle_result('foo')

     self.assertEqual(len(rejections), 0)
     self.assertEqual(process.listener_state, EventListenerStates.ACKNOWLEDGED)
     first_log_line = options.logger.data[0]
     self.assertTrue(
         first_log_line.endswith('BUSY -> ACKNOWLEDGED (processed)'))
开发者ID:lmcdonough,项目名称:supervisor,代码行数:26,代码来源:test_dispatchers.py


示例5: test_handle_result_exception

 def test_handle_result_exception(self):
     """A result handler raising ValueError sends the listener to UNKNOWN.

     Starting from the BUSY listener state, ``handle_result('foo')`` must
     publish one EventRejectedEvent, switch the listener to UNKNOWN and
     log a first line ending in ``'BUSY -> UNKNOWN'``.
     """
     from supervisor import events
     from supervisor.dispatchers import EventListenerStates

     class Dummy:
         pass

     def exception(event, result):
         raise ValueError

     rejections = []
     def record(event):
         rejections.append(event)

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     events.subscribe(events.EventRejectedEvent, record)
     dispatcher = self._makeOne(process)

     # The handler is installed both on the group config and on the group.
     process.group = Dummy()
     process.group.config = Dummy()
     process.group.config.result_handler = exception
     process.group.result_handler = exception
     process.listener_state = EventListenerStates.BUSY

     dispatcher.handle_result('foo')

     self.assertEqual(len(rejections), 1)
     self.assertEqual(rejections[0].__class__, events.EventRejectedEvent)
     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
     first_log_line = options.logger.data[0]
     self.assertTrue(first_log_line.endswith('BUSY -> UNKNOWN'))
开发者ID:lmcdonough,项目名称:supervisor,代码行数:28,代码来源:test_dispatchers.py


示例6: test_handle_listener_state_change_busy_to_unknown

 def test_handle_listener_state_change_busy_to_unknown(self):
     """An unparseable result line moves a BUSY listener to UNKNOWN.

     With ``'bogus data\\n'`` in the state buffer,
     ``handle_listener_state_change()`` must return None, empty the
     buffer, write three specific log lines, set the listener state to
     UNKNOWN and publish an EventRejectedEvent that references the
     process and the event it was handling.
     """
     from supervisor.dispatchers import EventListenerStates
     from supervisor.events import EventRejectedEvent
     from supervisor.events import subscribe

     rejections = []
     def record(event):
         rejections.append(event)
     subscribe(EventRejectedEvent, record)

     options = DummyOptions()
     process = DummyProcess(DummyPConfig(options, 'process1', '/bin/process1'))
     dispatcher = self._makeOne(process)
     process.listener_state = EventListenerStates.BUSY
     current_event = DummyEvent()
     process.event = current_event
     dispatcher.state_buffer = 'bogus data\n'

     self.assertEqual(dispatcher.handle_listener_state_change(), None)
     self.assertEqual(dispatcher.state_buffer, '')
     expected_log = (
         "process1: bad result line: 'bogus data'",
         'process1: BUSY -> UNKNOWN',
         'process1: has entered the UNKNOWN state and will '
         'no longer receive events, this usually indicates '
         'the process violated the eventlistener protocol',
     )
     for index, message in enumerate(expected_log):
         self.assertEqual(options.logger.data[index], message)
     self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
     self.assertEqual(rejections[0].process, process)
     self.assertEqual(rejections[0].event, current_event)
开发者ID:1T,项目名称:supervisor,代码行数:30,代码来源:test_dispatchers.py


示例7: test_handle_listener_state_change_busy_to_unknown

    def test_handle_listener_state_change_busy_to_unknown(self):
        """An unparseable result line moves a BUSY listener to UNKNOWN.

        With ``'bogus data\\n'`` in the state buffer,
        ``handle_listener_state_change()`` must return None, empty the
        buffer, log a single combined transition message, set the listener
        state to UNKNOWN and publish an EventRejectedEvent that references
        the process and the event it was handling.
        """
        from supervisor.dispatchers import EventListenerStates
        from supervisor.events import EventRejectedEvent
        from supervisor.events import subscribe

        rejections = []

        def record(event):
            rejections.append(event)

        subscribe(EventRejectedEvent, record)

        options = DummyOptions()
        process = DummyProcess(
            DummyPConfig(options, 'process1', '/bin/process1'))
        dispatcher = self._makeOne(process)
        process.listener_state = EventListenerStates.BUSY
        current_event = DummyEvent()
        process.event = current_event
        dispatcher.state_buffer = 'bogus data\n'

        outcome = dispatcher.handle_listener_state_change()

        self.assertEqual(outcome, None)
        self.assertEqual(dispatcher.state_buffer, '')
        first_log_line = options.logger.data[0]
        self.assertEqual(
            first_log_line,
            'process1: BUSY -> UNKNOWN (bad result line \'bogus data\')')
        self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
        self.assertEqual(rejections[0].process, process)
        self.assertEqual(rejections[0].event, current_event)
开发者ID:alexsilva,项目名称:supervisor,代码行数:26,代码来源:test_dispatchers.py


示例8: test_tick

    def test_tick(self):
        """``tick()`` updates the 5/60/3600 counters and publishes tick events.

        ``tick`` is called with ``now`` = 0, 6, 61 and 3601. After each call
        the values stored in ``supervisord.ticks`` for keys 5, 60 and 3600
        are checked, together with the cumulative number of TickEvent
        notifications (0, 1, 3, 6) and the class of the most recent one.
        """
        from supervisor import events

        published = []

        def callback(event):
            published.append(event)

        events.subscribe(events.TickEvent, callback)
        supervisord = self._makeOne(DummyOptions())

        def check_ticks(five, sixty, hour):
            # Same three assertions, in the same order, after every tick.
            self.assertEqual(supervisord.ticks[5], five)
            self.assertEqual(supervisord.ticks[60], sixty)
            self.assertEqual(supervisord.ticks[3600], hour)

        supervisord.tick(now=0)
        check_ticks(0, 0, 0)
        self.assertEqual(len(published), 0)

        supervisord.tick(now=6)
        check_ticks(5, 0, 0)
        self.assertEqual(len(published), 1)
        self.assertEqual(published[-1].__class__, events.Tick5Event)

        supervisord.tick(now=61)
        check_ticks(60, 60, 0)
        self.assertEqual(len(published), 3)
        self.assertEqual(published[-1].__class__, events.Tick60Event)

        supervisord.tick(now=3601)
        check_ticks(3600, 3600, 3600)
        self.assertEqual(len(published), 6)
        self.assertEqual(published[-1].__class__, events.Tick3600Event)
开发者ID:GulsahKose,项目名称:supervisor,代码行数:35,代码来源:test_supervisord.py


示例9: __init__

 def __init__(self, config):
     """Initialise buffer/dispatch state and register event callbacks.

     ``_acceptEvent`` is subscribed to every event type listed in
     ``self.config.pool_events`` and ``handle_rejected`` is subscribed to
     ``EventRejectedEvent``.
     """
     ProcessGroupBase.__init__(self, config)

     # Plain state first; none of it is read by the subscriptions below.
     self.event_buffer = []
     self.serial = -1
     self.last_dispatch = 0
     self.dispatch_throttle = 0 # in seconds: .00195 is an interesting one

     for pool_event_type in self.config.pool_events:
         events.subscribe(pool_event_type, self._acceptEvent)
     events.subscribe(events.EventRejectedEvent, self.handle_rejected)
开发者ID:JeremyGrosser,项目名称:supervisor,代码行数:9,代码来源:process.py


示例10: test_runforever_emits_generic_startup_event

 def test_runforever_emits_generic_startup_event(self):
     """``runforever()`` notifies SupervisorStateChangeEvent subscribers once.

     The callback appends ``1`` per notification; after ``runforever()``
     with ``options.test`` set, the list must equal ``[1]``.
     """
     from supervisor import events

     notifications = []
     def callback(event):
         notifications.append(1)
     events.subscribe(events.SupervisorStateChangeEvent, callback)

     options = DummyOptions()
     supervisord = self._makeOne(options)
     # NOTE(review): ``test`` presumably makes runforever() return after a
     # single pass -- confirm against supervisord.runforever.
     options.test = True

     supervisord.runforever()

     self.assertEqual(notifications, [1])
开发者ID:GulsahKose,项目名称:supervisor,代码行数:11,代码来源:test_supervisord.py


示例11: test_runforever_emits_generic_specific_event

 def test_runforever_emits_generic_specific_event(self):
     """``runforever()`` notifies SupervisorRunningEvent subscribers once.

     The callback appends ``2`` per notification; after ``runforever()``
     with ``options.test`` set, the list must equal ``[2]``.
     """
     from supervisor import events

     notifications = []
     def callback(event):
         notifications.append(2)
     events.subscribe(events.SupervisorRunningEvent, callback)

     options = DummyOptions()
     # NOTE(review): ``test`` presumably makes runforever() return after a
     # single pass -- confirm against supervisord.runforever.
     options.test = True
     supervisord = self._makeOne(options)

     supervisord.runforever()

     self.assertEqual(notifications, [2])
开发者ID:GulsahKose,项目名称:supervisor,代码行数:11,代码来源:test_supervisord.py


示例12: test_stdout_capturemode_single_buffer

    def test_stdout_capturemode_single_buffer(self):
        """A BEGIN/END token pair inside one buffer yields one stdout event.

        Regression test: mike reported (8/20/2007) that communication
        events that took place within a single output buffer were broken.

        The output buffer is loaded with ``BEGIN_TOKEN + "hello" +
        END_TOKEN``. After ``record_output()`` the log file must be empty,
        the buffer drained, and exactly one ProcessCommunicationStdoutEvent
        published with channel ``"stdout"`` and data ``"hello"``.
        """
        from supervisor.events import ProcessCommunicationEvent
        from supervisor.events import subscribe

        events = []

        def doit(event):
            events.append(event)

        subscribe(ProcessCommunicationEvent, doit)
        BEGIN_TOKEN = ProcessCommunicationEvent.BEGIN_TOKEN
        END_TOKEN = ProcessCommunicationEvent.END_TOKEN
        data = BEGIN_TOKEN + "hello" + END_TOKEN
        options = DummyOptions()
        from supervisor.loggers import getLogger

        options.getLogger = getLogger  # actually use real logger
        logfile = "/tmp/log"
        config = DummyPConfig(
            options, "process1", "/bin/process1", stdout_logfile=logfile, stdout_capture_maxbytes=1000
        )
        process = DummyProcess(config)
        dispatcher = self._makeOne(process)

        try:
            dispatcher.output_buffer = data
            dispatcher.record_output()
            # Read through a context manager so the handle is closed right
            # away rather than whenever the file object gets collected.
            with open(logfile, "r") as log:
                logged = log.read()
            self.assertEqual(logged, "")
            self.assertEqual(dispatcher.output_buffer, "")
            self.assertEqual(len(events), 1)

            event = events[0]
            from supervisor.events import ProcessCommunicationStdoutEvent

            self.assertEqual(event.__class__, ProcessCommunicationStdoutEvent)
            self.assertEqual(event.process, process)
            self.assertEqual(event.channel, "stdout")
            self.assertEqual(event.data, "hello")

        finally:
            # Best-effort cleanup: a failure here must not mask the result
            # of the assertions above.
            try:
                dispatcher.capturelog.close()
                dispatcher.childlog.close()
                os.remove(logfile)
            except (OSError, IOError):
                pass
开发者ID:whenjonny,项目名称:tupppai-tools,代码行数:48,代码来源:test_dispatchers.py


示例13: test_record_output_does_not_emit_stderr_event_when_disabled

    def test_record_output_does_not_emit_stderr_event_when_disabled(self):
        """No stderr log event is published when stderr events are disabled.

        The process config is built with ``stderr_events_enabled=False``;
        after ``record_output()`` on a ``'stderr'`` dispatcher holding
        buffered output, the PROCESS_LOG_STDERR subscriber must have
        received nothing.
        """
        from supervisor import events

        received = []

        def record(event):
            received.append(event)

        options = DummyOptions()
        config = DummyPConfig(options, 'process1', '/bin/process1',
                              stderr_events_enabled=False)
        process = DummyProcess(config)
        dispatcher = self._makeOne(process, 'stderr')
        dispatcher.output_buffer = 'hello from stderr'

        events.subscribe(events.EventTypes.PROCESS_LOG_STDERR, record)
        dispatcher.record_output()

        self.assertEqual(len(received), 0)
开发者ID:1T,项目名称:supervisor,代码行数:16,代码来源:test_dispatchers.py


示例14: test_add_process_group_event

    def test_add_process_group_event(self):
        """Adding a process group results in one ProcessGroupAddedEvent.

        The callback appends ``1`` per notification. After
        ``add_process_group(gconfig)`` followed by ``runforever()`` with
        ``options.test`` set, the list must equal ``[1]``.
        """
        from supervisor import events

        notifications = []

        def callback(event):
            notifications.append(1)

        events.subscribe(events.ProcessGroupAddedEvent, callback)

        options = DummyOptions()
        pconfig = DummyPConfig(options, 'foo', 'foo', '/bin/foo')
        gconfig = DummyPGroupConfig(options,'foo', pconfigs=[pconfig])
        options.process_group_configs = [gconfig]
        supervisord = self._makeOne(options)

        supervisord.add_process_group(gconfig)
        options.test = True
        supervisord.runforever()

        self.assertEqual(notifications, [1])
开发者ID:GulsahKose,项目名称:supervisor,代码行数:17,代码来源:test_supervisord.py


示例15: test_record_output_emits_stdout_event_when_enabled

    def test_record_output_emits_stdout_event_when_enabled(self):
        """One stdout log event is published when stdout events are enabled.

        The process config is built with ``stdout_events_enabled=True``;
        after ``record_output()`` on a ``'stdout'`` dispatcher, the
        PROCESS_LOG_STDOUT subscriber must have received a single event
        carrying the process and the buffered text.
        """
        from supervisor import events

        received = []

        def record(event):
            received.append(event)

        options = DummyOptions()
        config = DummyPConfig(options, 'process1', '/bin/process1',
                              stdout_events_enabled=True)
        process = DummyProcess(config)
        dispatcher = self._makeOne(process, 'stdout')
        dispatcher.output_buffer = 'hello from stdout'

        events.subscribe(events.EventTypes.PROCESS_LOG_STDOUT, record)
        dispatcher.record_output()

        self.assertEqual(len(received), 1)
        logged = received[0]
        self.assertEqual(logged.process, process)
        self.assertEqual(logged.data, 'hello from stdout')
开发者ID:1T,项目名称:supervisor,代码行数:19,代码来源:test_dispatchers.py


示例16: test_runforever_stopping_emits_events

 def test_runforever_stopping_emits_events(self):
     """``runforever()`` with mood -1 publishes running then stopping events.

     The call is expected to raise ``asyncore.ExitNow`` and to leave the
     process group with ``all_stopped`` true. The first event received
     must be a SupervisorRunningEvent and the second a
     SupervisorStoppingEvent; both must also be
     SupervisorStateChangeEvent instances.
     """
     from supervisor import events
     from supervisor.medusa import asyncore_25 as asyncore

     options = DummyOptions()
     supervisord = self._makeOne(options)
     pgroup = DummyProcessGroup(DummyPGroupConfig(options))
     supervisord.process_groups = {'foo': pgroup}
     supervisord.options.mood = -1

     state_changes = []
     def callback(event):
         state_changes.append(event)
     events.subscribe(events.SupervisorStateChangeEvent, callback)
     options.test = True

     self.assertRaises(asyncore.ExitNow, supervisord.runforever)
     self.assertTrue(pgroup.all_stopped)

     expected_order = (
         events.SupervisorRunningEvent,
         events.SupervisorStoppingEvent,
     )
     for position, specific_type in enumerate(expected_order):
         self.assertTrue(isinstance(state_changes[position], specific_type))
         self.assertTrue(isinstance(state_changes[position],
                                    events.SupervisorStateChangeEvent))
开发者ID:GulsahKose,项目名称:supervisor,代码行数:20,代码来源:test_supervisord.py


示例17: test_toggle_capturemode_sends_event

    def test_toggle_capturemode_sends_event(self):
        """Toggling capture mode publishes one process-communication event.

        The dispatcher starts with ``capturemode`` set and its capture
        log's ``getvalue`` stubbed to return ``'hallooo'``. After
        ``toggle_capturemode()`` the PROCESS_COMMUNICATION subscriber must
        have seen exactly one event carrying the process, its pid and that
        text.
        """
        from supervisor import events

        received = []

        def record(event):
            received.append(event)

        options = DummyOptions()
        capture_path = os.path.join(tempfile.gettempdir(), 'foo.txt')
        config = DummyPConfig(options, 'process1', '/bin/process1',
                              stdout_logfile=capture_path,
                              stdout_capture_maxbytes=500)
        process = DummyProcess(config)
        process.pid = 4000
        dispatcher = self._makeOne(process)
        dispatcher.capturemode = True
        dispatcher.capturelog.getvalue = lambda: 'hallooo'

        events.subscribe(events.EventTypes.PROCESS_COMMUNICATION, record)
        dispatcher.toggle_capturemode()

        self.assertEqual(len(received), 1)
        communication = received[0]
        self.assertEqual(communication.process, process)
        self.assertEqual(communication.pid, 4000)
        self.assertEqual(communication.data, 'hallooo')
开发者ID:alexsilva,项目名称:supervisor,代码行数:23,代码来源:test_dispatchers.py


示例18: __init__

 def __init__(self, supvisors):
     """ Store the Supvisors context and subscribe to supervisor events.

     If ``instant_statistics`` cannot be imported from
     ``supvisors.statscollector``, two warnings are logged and
     ``self.collector`` is set to None.
     """
     self.supvisors = supvisors
     # shortcuts for source code readability
     # NOTE(review): presumably binds the listed supvisors attributes onto
     # self (self.logger is used below) -- confirm in supvisors_short_cuts.
     short_cut_names = ['fsm', 'info_source', 'logger', 'statistician']
     supvisors_short_cuts(self, short_cut_names)
     # the statistics collector is optional: fall back to None when its
     # import fails
     try:
         from supvisors.statscollector import instant_statistics
     except ImportError:
         self.logger.warn('psutil not installed')
         self.logger.warn('this Supvisors will not publish statistics')
         self.collector = None
     else:
         self.collector = instant_statistics
     # remaining attributes
     self.address = self.supvisors.address_mapper.local_address
     self.publisher = None
     self.main_loop = None
     # register the handlers for supervisor's internal events
     subscribe = events.subscribe
     subscribe(events.SupervisorRunningEvent, self.on_running)
     subscribe(events.SupervisorStoppingEvent, self.on_stopping)
     subscribe(events.ProcessStateEvent, self.on_process)
     subscribe(events.Tick5Event, self.on_tick)
     subscribe(events.RemoteCommunicationEvent, self.on_remote_event)
开发者ID:julien6387,项目名称:supervisors,代码行数:24,代码来源:listener.py


示例19: test_stdout_capturemode_multiple_buffers

    def test_stdout_capturemode_multiple_buffers(self):
        """Tokens split across several buffers still yield one stdout event.

        The payload ``letters + BEGIN_TOKEN + digits + END_TOKEN + letters``
        is cut at each ``":"`` and fed to the dispatcher in three pieces.
        After the first two pieces the log file must hold only the leading
        letters and no event may have been published. After the third, the
        log must hold the letters twice and exactly one
        ProcessCommunicationStdoutEvent must carry the digits.
        """
        from supervisor.events import ProcessCommunicationEvent
        from supervisor.events import subscribe

        events = []

        def doit(event):
            events.append(event)

        subscribe(ProcessCommunicationEvent, doit)
        import string

        # ascii_letters rather than string.letters: the latter is
        # locale-dependent and no longer exists on Python 3.
        letters = string.ascii_letters
        digits = string.digits * 4
        BEGIN_TOKEN = ProcessCommunicationEvent.BEGIN_TOKEN
        END_TOKEN = ProcessCommunicationEvent.END_TOKEN
        data = letters + BEGIN_TOKEN + digits + END_TOKEN + letters

        # boundaries that split tokens
        broken = data.split(":")
        first = broken[0] + ":"
        second = broken[1] + ":"
        third = broken[2]

        options = DummyOptions()
        from supervisor.loggers import getLogger

        options.getLogger = getLogger  # actually use real logger
        logfile = "/tmp/log"
        config = DummyPConfig(
            options, "process1", "/bin/process1", stdout_logfile=logfile, stdout_capture_maxbytes=10000
        )
        process = DummyProcess(config)
        dispatcher = self._makeOne(process)

        def flush_childlog():
            # Push buffered log output to disk before the file is inspected.
            for handler in dispatcher.childlog.handlers:
                handler.flush()

        def read_logfile():
            # Close the handle immediately instead of leaking it.
            with open(logfile, "r") as log:
                return log.read()

        try:
            dispatcher.output_buffer = first
            dispatcher.record_output()
            flush_childlog()
            self.assertEqual(read_logfile(), letters)
            self.assertEqual(dispatcher.output_buffer, first[len(letters) :])
            self.assertEqual(len(events), 0)

            dispatcher.output_buffer += second
            dispatcher.record_output()
            self.assertEqual(len(events), 0)
            flush_childlog()
            self.assertEqual(read_logfile(), letters)
            self.assertEqual(dispatcher.output_buffer, first[len(letters) :])
            self.assertEqual(len(events), 0)

            dispatcher.output_buffer += third
            dispatcher.record_output()
            flush_childlog()
            self.assertEqual(read_logfile(), letters * 2)
            self.assertEqual(len(events), 1)
            event = events[0]
            from supervisor.events import ProcessCommunicationStdoutEvent

            self.assertEqual(event.__class__, ProcessCommunicationStdoutEvent)
            self.assertEqual(event.process, process)
            self.assertEqual(event.channel, "stdout")
            self.assertEqual(event.data, digits)

        finally:
            # Best-effort cleanup: a failure here must not mask the result
            # of the assertions above.
            try:
                os.remove(logfile)
            except (OSError, IOError):
                pass
开发者ID:Jude7,项目名称:minos,代码行数:68,代码来源:test_dispatchers.py


示例20: _subscribe

 def _subscribe(self):
     """Register this object's callbacks with the ``events`` module.

     ``_acceptEvent`` is subscribed to every type listed in
     ``self.config.pool_events``; ``handle_rejected`` is subscribed to
     ``EventRejectedEvent``.
     """
     for pool_event_type in self.config.pool_events:
         events.subscribe(pool_event_type, self._acceptEvent)

     events.subscribe(events.EventRejectedEvent, self.handle_rejected)
开发者ID:the5fire,项目名称:supervisor,代码行数:4,代码来源:process.py



注:本文中的supervisor.events.subscribe函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python asyncore_25.compact_traceback函数代码示例发布时间:2022-05-27
下一篇:
Python events.notify函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap