• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python event.EventPublisher类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyon.event.event.EventPublisher的典型用法代码示例。如果您正苦于以下问题：Python EventPublisher类的具体用法？Python EventPublisher怎么用？Python EventPublisher使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了EventPublisher类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: process_oms_event

def process_oms_event():
    """Relay an OMS event received in the current request into the system.

    The JSON body of ``request.data`` (when present) is decoded and published
    as an ``OMSDeviceStatusEvent``: the payload's ``platform_id`` becomes the
    event origin, its ``message`` becomes the description, and the whole
    payload is attached as ``status_details``.

    A payload missing ``platform_id`` or ``message`` is only logged as a
    warning; the event is still published with ``'NOT PROVIDED'`` / ``''``
    standing in for the missing values.  Failures while publishing are logged
    and swallowed.  Nothing is returned.
    """
    json_params = {}

    # oms direct request
    if request.data:
        json_params = json_loads(str(request.data))
        log.debug('ServiceGatewayService:process_oms_event request.data:  %s', json_params)

    # validate payload
    if 'platform_id' not in json_params or 'message' not in json_params:
        log.warning('Invalid OMS event format. payload_data: %s', json_params)
        # NOTE(review): rejecting the request is disabled here, so an invalid
        # payload still falls through to publication below.
        #return gateway_json_response(OMS_BAD_REQUEST_RESPONSE)

    # prepare the event information
    try:
        # create a publisher to relay OMS events into the system as DeviceEvents
        event_publisher = EventPublisher()

        event_publisher.publish_event(
            event_type='OMSDeviceStatusEvent',
            origin_type='OMS Platform',
            origin=json_params.get('platform_id', 'NOT PROVIDED'),
            sub_type='',
            description=json_params.get('message', ''),
            status_details=json_params)
    except Exception as e:
        # ``except X as e`` parses on Python 2.6+ and 3; the exception itself
        # is logged because ``e.message`` is deprecated and absent on Python 3.
        log.error('Could not publish OMS  event: %s. Event data: %s', e, json_params)
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:28,代码来源:service_gateway_service.py


示例2: publish_event_for_diagnostics

def publish_event_for_diagnostics():  # pragma: no cover
    """
    Publish the event that asks running platform agents to report the
    statuses they keep, for diagnostic purposes.

    Typical use from an interactive session:

    ><> from ion.agents.platform.status_manager import publish_event_for_diagnostics
    ><> publish_event_for_diagnostics()

    after which output along these lines is logged:

2013-05-17 17:25:16,076 INFO Dummy-247 ion.agents.platform.status_manager:760 'MJ01C': (99cb3e71302a4e5ca0c137292103e357) statuses:
                                           AGGREGATE_COMMS     AGGREGATE_DATA      AGGREGATE_LOCATION  AGGREGATE_POWER
        d231ccba8d674b4691b039ceecec8d95 : STATUS_UNKNOWN      STATUS_UNKNOWN      STATUS_UNKNOWN      STATUS_UNKNOWN
        40c787fc727a4734b219fde7c8df7543 : STATUS_UNKNOWN      STATUS_UNKNOWN      STATUS_UNKNOWN      STATUS_UNKNOWN
        55ee7225435444e3a862d7ceaa9d1875 : STATUS_OK           STATUS_OK           STATUS_OK           STATUS_OK
        1d27e0c2723149cc9692488dced7dd95 : STATUS_UNKNOWN      STATUS_UNKNOWN      STATUS_UNKNOWN      STATUS_UNKNOWN
                               aggstatus : STATUS_OK           STATUS_OK           STATUS_OK           STATUS_OK
                           rollup_status : STATUS_OK           STATUS_OK           STATUS_OK           STATUS_OK
    """

    # Imported locally so the helper can be called straight from a shell.
    from pyon.event.event import EventPublisher

    publisher = EventPublisher()
    diagnostic_event = dict(
        event_type='DeviceStatusEvent',
        sub_type='diagnoser',
        origin='command_line',
    )
    print("publishing: %s" % str(diagnostic_event))
    publisher.publish_event(**diagnostic_event)
开发者ID:ednad,项目名称:coi-services,代码行数:25,代码来源:status_manager.py


示例3: test_base_subscriber_as_catchall

    def test_base_subscriber_as_catchall(self):
        """A subscriber created without an event type sees events from any publisher.

        Two different publishers each publish one event; the test waits (up to
        5 seconds) for both to reach the callback and then checks that they
        were delivered in publication order.
        """
        ar = event.AsyncResult()
        gq = queue.Queue()
        self.count = 0

        def cb(*args, **kwargs):
            # Queue every delivered event and release the waiter after the second.
            self.count += 1
            gq.put(args[0])
            if self.count == 2:
                ar.set()

        sub = EventSubscriber(node=self.container.node, callback=cb)
        pub1 = self.TestEventPublisher(node=self.container.node)
        pub2 = EventPublisher(node=self.container.node)

        self._listen(sub)

        pub1.create_and_publish_event(origin="some", description="1")
        pub2.create_and_publish_event(origin="other", description="2")

        ar.get(timeout=5)

        # Drain exactly as many events as the callback counted.
        res = [gq.get(timeout=5) for _ in range(self.count)]

        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(len(res), 2)
        self.assertEqual(res[0].description, "1")
        self.assertEqual(res[1].description, "2")
开发者ID:dstuebe,项目名称:pyon,代码行数:29,代码来源:test_event.py


示例4: publish_alert

 def publish_alert(self):
     """
     Build this object's event data and publish it as an event.

     The ``sub_type`` of the event data is echoed to stdout before publishing.
     """
     event_data = self.make_event_data()
     # A single parenthesized argument prints the same text under the
     # Python 2 print statement and the Python 3 print function.
     print('########## publishing: ' + event_data['sub_type'])
     pub = EventPublisher()
     pub.publish_event(**event_data)
开发者ID:mbarry02,项目名称:coi-services,代码行数:7,代码来源:alerts.py


示例5: test_pub_on_different_origins

    def test_pub_on_different_origins(self):
        """A subscriber with no origin filter receives events from every origin.

        Three ``ResourceEvent`` events are published with different origins;
        the test waits (up to 5 seconds) for all three and checks that they
        arrive in publication order.
        """
        ar = event.AsyncResult()
        gq = queue.Queue()
        self.count = 0

        def cb(*args, **kwargs):
            # Queue every delivered event and release the waiter after the third.
            self.count += 1
            gq.put(args[0])
            if self.count == 3:
                ar.set()

        sub = EventSubscriber(event_type="ResourceEvent", callback=cb)
        pub = EventPublisher(event_type="ResourceEvent")

        self._listen(sub)

        pub.publish_event(origin="one", description="1")
        pub.publish_event(origin="two", description="2")
        pub.publish_event(origin="three", description="3")

        ar.get(timeout=5)

        # Drain exactly as many events as the callback counted.
        res = [gq.get(timeout=5) for _ in range(self.count)]

        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(len(res), 3)
        self.assertEqual(res[0].description, "1")
        self.assertEqual(res[1].description, "2")
        self.assertEqual(res[2].description, "3")
开发者ID:ooici-dm,项目名称:pyon,代码行数:30,代码来源:test_event.py


示例6: test_pub_on_different_subtypes

    def test_pub_on_different_subtypes(self):
        """A subscriber filtered on event type and sub_type only sees matching events.

        The subscriber listens for ``ResourceModifiedEvent`` with sub_type
        ``st1``.  Four events are published; the one with a different sub_type
        and the one with a different event type must not be delivered, so
        exactly two events are expected, the first described as "1".
        """
        ar = event.AsyncResult()
        gq = queue.Queue()
        self.count = 0

        # NOTE: the ``event`` parameter shadows the ``event`` module used
        # above, but only inside the callback.
        def cb(event, *args, **kwargs):
            self.count += 1
            gq.put(event)
            if event.description == "end":
                ar.set()

        sub = EventSubscriber(event_type="ResourceModifiedEvent", sub_type="st1", callback=cb)
        sub.activate()

        pub1 = EventPublisher(event_type="ResourceModifiedEvent")
        pub2 = EventPublisher(event_type="ContainerLifecycleEvent")

        pub1.publish_event(origin="two", sub_type="st2", description="2")
        pub2.publish_event(origin="three", sub_type="st1", description="3")
        pub1.publish_event(origin="one", sub_type="st1", description="1")
        pub1.publish_event(origin="four", sub_type="st1", description="end")

        try:
            ar.get(timeout=5)
        finally:
            # Deactivate even when the wait times out so the subscriber is
            # not left active after a failed run.
            sub.deactivate()

        # Drain exactly as many events as the callback counted.
        res = [gq.get(timeout=5) for _ in range(self.count)]

        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(len(res), 2)
        self.assertEqual(res[0].description, "1")
开发者ID:ooici-dm,项目名称:pyon,代码行数:31,代码来源:test_event.py


示例7: _acquire_data

    def _acquire_data(cls, config, publisher, unlock_new_data_callback):
        """
        Run one acquisition cycle: initialise it, make sure ``config`` carries
        constraints, then publish whatever ``cls._get_data(config)`` yields.

        When ``config`` has no (truthy) 'constraints' entry, the current
        greenlet is linked to ``unlock_new_data_callback`` and the constraints
        are obtained from ``cls._new_data_constraints(config)`` and stored back
        into ``config``.  If ``config`` has a truthy 'TESTING' entry, a
        'TestingFinished' DeviceCommonLifecycleEvent is published at the end.

        @param config Dict containing configuration parameters, may include constraints, formatters, etc
        @param publisher Passed through unchanged to cls._publish_data
        @param unlock_new_data_callback Callback linked to the current greenlet when new constraints have to be computed
        @throws InstrumentParameterException if _new_data_constraints returns None
        """
        log.debug('start _acquire_data: config={0}'.format(config))

        cls._init_acquisition_cycle(config)

        if not get_safe(config, 'constraints'):
            # No constraints supplied: derive them for this cycle.
            gevent.getcurrent().link(unlock_new_data_callback)
            new_constraints = cls._new_data_constraints(config)
            if new_constraints is None:
                raise InstrumentParameterException("Data constraints returned from _new_data_constraints cannot be None")
            config['constraints'] = new_constraints

        cls._publish_data(publisher, cls._get_data(config))

        # Signal the end of the cycle when running under test.
        if get_safe(config, 'TESTING'):
            log.debug('Publish TestingFinished event')
            finished_pub = EventPublisher('DeviceCommonLifecycleEvent')
            finished_pub.publish_event(
                origin='BaseDataHandler._acquire_data',
                description='TestingFinished',
            )
开发者ID:ooici-eoi,项目名称:coi-services,代码行数:27,代码来源:base_data_handler.py


示例8: _acquire_data

    def _acquire_data(cls, config, unlock_new_data_callback):
        """
        Check that ``config`` names a stream, make sure it carries constraints,
        then publish whatever ``cls._get_data(config)`` yields.

        When ``config`` has no (truthy) 'constraints' entry, the current
        greenlet is linked to ``unlock_new_data_callback`` and the result of
        ``cls._new_data_constraints(config)`` is stored into ``config``.  If
        ``config`` has a truthy 'TESTING' entry, a 'TestingFinished'
        DeviceCommonLifecycleEvent is published at the end.

        @param config Dict containing configuration parameters, may include constraints, formatters, etc
        @param unlock_new_data_callback Callback linked to the current greenlet when new constraints have to be computed
        @throws ConfigurationError if config has no 'stream_id'
        """
        if not get_safe(config, 'stream_id'):
            raise ConfigurationError('Configuration does not contain required \'stream_id\' key')

        #TODO: Configure the publisher
        publisher = None

        if not get_safe(config, 'constraints'):
            # No constraints supplied: derive them for this cycle.
            gevent.getcurrent().link(unlock_new_data_callback)
            config['constraints'] = cls._new_data_constraints(config)

        cls._publish_data(publisher, config, cls._get_data(config))

        # Signal the end of the cycle when running under test.
        if get_safe(config, 'TESTING'):
            log.debug('Publish TestingFinished event')
            finished_pub = EventPublisher('DeviceCommonLifecycleEvent')
            finished_pub.publish_event(
                origin='BaseDataHandler._acquire_data',
                description='TestingFinished',
            )
开发者ID:seman,项目名称:coi-services,代码行数:27,代码来源:base_data_handler.py


示例9: test_sub

    def test_sub(self):
        """The interaction observer records both a published event and a plain message.

        One ``ResourceEvent`` and one ordinary message are published; after a
        two second wait the observer's ``msg_log`` must hold exactly two
        entries, and every entry flagged as an event must carry the origin
        that was sent.
        """
        # start interaction observer
        io = InteractionObserver()
        io.start()

        # publish an event
        ev_pub = EventPublisher(event_type="ResourceEvent")
        ev_pub.publish_event(origin="specific", description="event")

        # publish a message
        msg_pub = Publisher()
        msg_pub.publish(to_name='anyone', msg="msg")

        # give 2 seconds for the messages to arrive
        time.sleep(2)

        # verify that two messages (an event and a message) are seen
        self.assertEqual(len(io.msg_log), 2)

        # iterate through the messages observed
        for item in io.msg_log:
            # item[2] is truthy for events; item[1] is then the event content
            if item[2]:
                # verify that the origin is what we sent
                self.assertEqual(item[1]['origin'], 'specific')

        # Exercise _get_data on the collected log; the result is not checked,
        # the call only has to complete without raising.
        io._get_data(io.msg_log, {})
开发者ID:ednad,项目名称:coi-services,代码行数:29,代码来源:test_interaction_observer.py


示例10: SystemManagementService

class SystemManagementService(BaseSystemManagementService):
    """Publishes container management requests as events.

    The requests are handled by the event listener
    ion.processes.event.container_manager.ContainerManager,
    which must be running on each container.
    """

    def on_start(self, *a, **b):
        """Start the base service and create the publisher used for requests."""
        super(SystemManagementService, self).on_start(*a, **b)
        self.sender = EventPublisher()

    def on_quit(self, *a, **b):
        """Close the publisher created in on_start."""
        self.sender.close()

    def perform_action(self, predicate, action):
        """Publish a ContainerManagementRequest carrying predicate and action."""
        userid = None  # get from context
        self.sender.publish_event(
            event_type=OT.ContainerManagementRequest,
            origin=userid,
            predicate=predicate,
            action=action,
        )

    def set_log_level(self, logger='', level='', recursive=False):
        """Request a log level change in all containers."""
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.ChangeLogLevel, logger=logger, level=level, recursive=recursive),
        )

    def reset_policy_cache(self, headers=None, timeout=None):
        """Clears and reloads the policy caches in all of the containers.

        @throws BadRequest    None
        """
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.ResetPolicyCache),
        )

    def trigger_garbage_collection(self):
        """Triggers a garbage collection in all containers

        @throws BadRequest    None
        """
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.TriggerGarbageCollection),
        )
开发者ID:MauriceManning,项目名称:coi-services,代码行数:30,代码来源:system_management_service.py


示例11: SystemManagementService

class SystemManagementService(BaseSystemManagementService):
    """Publishes container management requests as events.

    The requests are handled by the event listener
    ion.processes.event.container_manager.ContainerManager,
    which must be running on each container.
    """

    def on_start(self, *a, **b):
        """Start the base service and create the publisher used for requests."""
        super(SystemManagementService, self).on_start(*a, **b)
        self.sender = EventPublisher()

    def on_quit(self, *a, **b):
        """Close the publisher created in on_start."""
        self.sender.close()

    def perform_action(self, predicate, action):
        """Publish a ContainerManagementRequest carrying predicate and action."""
        userid = None  # get from context
        self.sender.publish_event(
            event_type=OT.ContainerManagementRequest,
            origin=userid,
            predicate=predicate,
            action=action,
        )

    def set_log_level(self, logger='', level='', recursive=False):
        """Request a log level change in all containers."""
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.ChangeLogLevel, logger=logger, level=level, recursive=recursive),
        )

    def reset_policy_cache(self, headers=None, timeout=None):
        """Clears and reloads the policy caches in all of the containers.

        @throws BadRequest    None
        """
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.ResetPolicyCache),
        )

    def trigger_garbage_collection(self):
        """Triggers a garbage collection in all containers

        @throws BadRequest    None
        """
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.TriggerGarbageCollection),
        )

    def trigger_container_snapshot(self, snapshot_id='', include_snapshots=None, exclude_snapshots=None,
                                   take_at_time='', clear_all=False, persist_snapshot=True, snapshot_kwargs=None):
        """Request a container snapshot in all containers.

        A falsy snapshot_id is replaced by the current get_ion_ts() value and
        a falsy snapshot_kwargs by an empty dict before the request is sent.
        """
        snapshot_id = snapshot_id or get_ion_ts()
        snapshot_kwargs = snapshot_kwargs or {}

        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(
                OT.TriggerContainerSnapshot,
                snapshot_id=snapshot_id,
                include_snapshots=include_snapshots,
                exclude_snapshots=exclude_snapshots,
                take_at_time=take_at_time,
                clear_all=clear_all,
                persist_snapshot=persist_snapshot,
                snapshot_kwargs=snapshot_kwargs,
            ),
        )
        log.info("Event to trigger container snapshots sent. snapshot_id=%s" % snapshot_id)

    def start_gevent_block(self, alarm_mode=False):
        """Send a StartGeventBlock request to all containers."""
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.StartGeventBlock, alarm_mode=alarm_mode),
        )

    def stop_gevent_block(self):
        """Send a StopGeventBlock request to all containers."""
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.StopGeventBlock),
        )

    def prepare_system_shutdown(self, mode=''):
        """Send a PrepareSystemShutdown request with the given mode to all containers."""
        self.perform_action(
            ALL_CONTAINERS_INSTANCE,
            IonObject(OT.PrepareSystemShutdown, mode=mode),
        )
开发者ID:edwardhunter,项目名称:coi-services,代码行数:60,代码来源:system_management_service.py


示例12: test_event_in_stream_out_transform

    def test_event_in_stream_out_transform(self):
        """
        Test the event-in/stream-out transform.

        Schedules an EventToStreamTransform process configured to listen for
        'ExampleDetectableEvent' and to publish on a freshly created stream,
        subscribes to that stream, publishes one matching event, and passes
        when a truthy message arrives on the stream within 10 seconds.
        """

        # Output stream the transform will publish on; the exchange point is
        # registered for cleanup.
        stream_id, _ = self.pubsub.create_stream('test_stream', exchange_point='science_data')
        self.exchange_cleanup.append('science_data')

        #---------------------------------------------------------------------------------------------
        # Launch the event-to-stream transform
        #---------------------------------------------------------------------------------------------
        # Create the process definition
        process_definition = ProcessDefinition(
            name='EventToStreamTransform',
            description='For testing an event-in/stream-out transform')
        process_definition.executable['module']= 'ion.processes.data.transforms.event_in_stream_out_transform'
        process_definition.executable['class'] = 'EventToStreamTransform'
        proc_def_id = self.process_dispatcher.create_process_definition(process_definition=process_definition)

        # Build the config
        config = DotDict()
        config.process.queue_name = 'test_queue'
        config.process.exchange_point = 'science_data'
        config.process.publish_streams.output = stream_id
        config.process.event_type = 'ExampleDetectableEvent'
        config.process.variables = ['voltage', 'temperature' ]

        # Schedule the process
        pid = self.process_dispatcher.schedule_process(process_definition_id=proc_def_id, configuration=config)
        self.addCleanup(self.process_dispatcher.cancel_process,pid)

        #---------------------------------------------------------------------------------------------
        # Create a subscriber for testing
        #---------------------------------------------------------------------------------------------

        # The callback hands the first argument it receives to the waiting test.
        ar_cond = gevent.event.AsyncResult()
        def subscriber_callback(m, r, s):
            ar_cond.set(m)
        sub = StandaloneStreamSubscriber('sub', subscriber_callback)
        self.addCleanup(sub.stop)
        sub_id = self.pubsub.create_subscription('subscription_cond',
            stream_ids=[stream_id],
            exchange_name='sub')
        self.pubsub.activate_subscription(sub_id)
        self.queue_cleanup.append(sub.xn.queue)
        sub.start()

        # NOTE(review): presumably gives the scheduled process and the
        # subscription time to come up before the event is sent - confirm.
        gevent.sleep(4)

        #---------------------------------------------------------------------------------------------
        # Publish an event. The transform has been configured to receive this event
        #---------------------------------------------------------------------------------------------

        event_publisher = EventPublisher("ExampleDetectableEvent")
        event_publisher.publish_event(origin = 'fake_origin', voltage = '5', temperature = '273')

        # Assert that the transform processed the event and published data on the output stream
        result_cond = ar_cond.get(timeout=10)
        self.assertTrue(result_cond)
开发者ID:Bobfrat,项目名称:coi-services,代码行数:59,代码来源:event_management_test.py


示例13: publish_alert

 def publish_alert(self):
     """
     Build the event data for this alert and publish it to ION.
     """
     alert_fields = self.make_event_data()
     log.trace("publishing alert: %s", alert_fields)
     EventPublisher().publish_event(**alert_fields)
开发者ID:ednad,项目名称:coi-services,代码行数:8,代码来源:alerts.py


示例14: call_process

 def call_process(self, packet, headers=None):
     """
     Run ``self.process(packet)`` and report any failure as an event.

     An exception raised by ``process`` is logged with its traceback and
     published as an ``ExceptionEvent`` whose origin is ``self._transform_id``;
     it is not re-raised.  ``headers`` is accepted but not used here.
     """
     try:
         self.process(packet)
     except Exception as e:
         log.exception('Unhandled caught in transform process')
         # ``e.message`` is deprecated, empty for multi-argument exceptions
         # and missing on Python 3; fall back to the exception's string form.
         message = getattr(e, 'message', None) or str(e)
         event_publisher = EventPublisher()
         event_publisher.publish_event(origin=self._transform_id, event_type='ExceptionEvent',
             exception_type=str(type(e)), exception_message=message)
开发者ID:swarbhanu,项目名称:pyon,代码行数:8,代码来源:transform.py


示例15: publish_link_event

def publish_link_event(up_down, terrestrial_remote=2):
    """
    Publish a PlatformTelemetryEvent announcing that the link is up or down.

    @param up_down Truthy publishes TelemetryStatusType.AVAILABLE, falsy
        publishes TelemetryStatusType.UNAVAILABLE.
    @param terrestrial_remote Selects the platform(s) used as event origin:
        0 - tcaa_args['terrestrial_platform_id'] only,
        1 - tcaa_args['remote_platform_id'] only,
        2 - both, terrestrial first (default).
    @throws ValueError if terrestrial_remote is not 0, 1 or 2.
    """
    status = TelemetryStatusType.AVAILABLE if up_down \
        else TelemetryStatusType.UNAVAILABLE

    # Validate the selector before creating a publisher, so a bad argument
    # does not leave an unused publisher behind.
    if terrestrial_remote == 0:
        origin_keys = ['terrestrial_platform_id']
    elif terrestrial_remote == 1:
        origin_keys = ['remote_platform_id']
    elif terrestrial_remote == 2:
        origin_keys = ['terrestrial_platform_id', 'remote_platform_id']
    else:
        raise ValueError('terrestrial_remote must be in range [0,2].')

    pub = EventPublisher()
    for key in origin_keys:
        pub.publish_event(
            event_type='PlatformTelemetryEvent',
            origin=tcaa_args[key],
            status=status)
开发者ID:ednad,项目名称:coi-services,代码行数:33,代码来源:shell_utils.py


示例16: validate_salinity_array

def validate_salinity_array(a, context=None):
    """Publish a DeviceStatusAlertEvent reporting an invalid salinity value.

    ``a`` is accepted but not inspected here; every call publishes the alert.

    @param a The salinity array (unused in this function).
    @param context Dict that must provide 'stream_id' (used as the event
        origin) and 'dataprocess_id' (sent as the only entry of ``values``).
    @throws KeyError if context lacks 'stream_id' or 'dataprocess_id'.
    """
    # NOTE(review): ResourceAgentState is not used below; the import is kept
    # in case callers rely on it being loaded - confirm before removing.
    from pyon.agent.agent import ResourceAgentState
    from pyon.event.event import EventPublisher
    from pyon.public import OT

    # A None default avoids sharing one mutable dict between calls.
    if context is None:
        context = {}

    stream_id = context['stream_id']
    dataprocess_id = context['dataprocess_id']

    event_publisher = EventPublisher(OT.DeviceStatusAlertEvent)

    event_publisher.publish_event(origin=stream_id, values=[dataprocess_id], description="Invalid value for salinity")
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:11,代码来源:test_transform_worker.py


示例17: ProcessDispatcherSimpleAPIClient

class ProcessDispatcherSimpleAPIClient(object):
    """Thin adapter over ProcessDispatcherServiceClient that reports
    processes as plain ``{'upid': ..., 'state': ...}`` dicts."""

    # Fallback state string for any process state missing from state_map
    unknown_state = "400-PENDING"

    state_map = {
        ProcessStateEnum.SPAWN: '500-RUNNING',
        ProcessStateEnum.TERMINATE: '700-TERMINATED',
        ProcessStateEnum.ERROR: '850-FAILED'
    }

    def __init__(self, name, **kwargs):
        self.real_client = ProcessDispatcherServiceClient(to_name=name, **kwargs)
        self.event_pub = EventPublisher()

    def _as_dict(self, proc):
        """Reduce a process record to its id and mapped state string."""
        return {
            'upid': proc.process_id,
            'state': self.state_map.get(proc.process_state, self.unknown_state),
        }

    def dispatch_process(self, upid, spec, subscribers, constraints=None,
                         immediate=False):
        """Announce, define, create and schedule the process described by spec.

        Only ``spec`` is read ('name', 'module', 'class'); the remaining
        arguments are accepted but not used.  Returns the dict form of the
        scheduled process as read back from the dispatcher.
        """
        proc_name = spec.get('name')
        self.event_pub.publish_event(
            event_type="ProcessLifecycleEvent",
            origin=proc_name,
            origin_type="DispatchedHAProcess",
            state=ProcessStateEnum.SPAWN)

        definition = ProcessDefinition(name=proc_name)
        definition.executable = {
            'module': spec.get('module'),
            'class': spec.get('class'),
        }
        definition_id = self.real_client.create_process_definition(definition)

        new_pid = self.real_client.create_process(definition_id)

        schedule = ProcessSchedule()
        scheduled_pid = self.real_client.schedule_process(
            definition_id, schedule, configuration={}, process_id=new_pid)

        return self._as_dict(self.real_client.read_process(scheduled_pid))

    def terminate_process(self, pid):
        """Cancel the process with the given id via the real client."""
        return self.real_client.cancel_process(pid)

    def describe_processes(self):
        """Return the dict form of every process the dispatcher lists."""
        return [self._as_dict(proc) for proc in self.real_client.list_processes()]
开发者ID:pombredanne,项目名称:coi-services,代码行数:53,代码来源:high_availability_agent.py


示例18: TransformEventPublisher

class TransformEventPublisher(TransformEventProcess):
    """Transform process that owns an EventPublisher for a configured event type.

    Subclasses are expected to supply publish_event; the version here only
    raises NotImplementedError.
    """

    def on_start(self):
        # The event type comes from the process configuration and defaults
        # to an empty string.
        self.publisher = EventPublisher(
            event_type=self.CFG.get_safe('process.event_type', ''))

    def publish_event(self, *args, **kwargs):
        raise NotImplementedError('Method publish_event not implemented')

    def on_quit(self):
        # Release the publisher created in on_start.
        self.publisher.close()
开发者ID:swarbhanu,项目名称:pyon,代码行数:12,代码来源:transforma.py


示例19: test_pub_and_sub

    def test_pub_and_sub(self):
        """An event published with a matching type and origin reaches the subscriber.

        The callback's positional arguments (event message and headers) are
        handed to the test, which checks the description and that the event's
        ``ts_created`` is within 5000 of the current ``get_ion_ts()`` value.
        """
        ar = event.AsyncResult()

        def cb(*args, **kwargs):
            ar.set(args)

        sub = EventSubscriber(event_type="ResourceEvent", callback=cb, origin="specific")
        pub = EventPublisher(event_type="ResourceEvent")

        self._listen(sub)
        pub.publish_event(origin="specific", description="hello")

        evmsg, evheaders = ar.get(timeout=5)

        # assertEqual / assertAlmostEqual replace the deprecated
        # assertEquals / assertAlmostEquals aliases.
        self.assertEqual(evmsg.description, "hello")
        self.assertAlmostEqual(int(evmsg.ts_created), int(get_ion_ts()), delta=5000)
开发者ID:oldpatricka,项目名称:pyon,代码行数:14,代码来源:test_event.py


示例20: test_greater_than_interval

    def test_greater_than_interval(self):
        """
        test_greater_than_interval
        Test interval alarm and alarm event publishing for a greater than
        inteval.
        """

        kwargs = {
            'name' : 'current_warning_interval',
            'stream_name' : 'fakestreamname',
            'value_id' : 'port_current',
            'message' : 'Current is above normal range.',
            'type' : StreamAlarmType.WARNING,
            'lower_bound' : 10.5,
            'lower_rel_op' : '<'
        }

        if TEST_ION_OBJECTS:
            # Create alarm object.
            alarm = IonObject('IntervalAlarmDef', **kwargs)
            alarm = construct_alarm_expression(alarm)
        else:
            alarm = IntervalAlarm(**kwargs)

        # This sequence will produce 5 alarms:
        # All clear on the first value,
        # Warning on the first 30,
        # All clear on the following 5.5,
        # Warning on the 15.1,
        # All clear on the following 3.3.
        self._event_count = 5
        test_vals = [5.5, 5.4, 5.5, 5.6, 30, 30.4, 5.5, 5.6, 15.1, 15.2,
                     15.3, 3.3, 3.4]

        pub = EventPublisher(event_type="StreamAlarmEvent",
            node=self.container.node)

        for x in test_vals:
            if TEST_ION_OBJECTS:
                (alarm, event_data) = eval_alarm(alarm, x)
                
            else:
                event_data = alarm.eval_alarm(x)

            if event_data:
                pub.publish_event(origin=self._resource_id, **event_data)
        
        self._async_event_result.get(timeout=30)
        
        """
开发者ID:swarbhanu,项目名称:coi-services,代码行数:50,代码来源:test_alarms.py



注:本文中的pyon.event.event.EventPublisher类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python event.EventSubscriber类代码示例发布时间:2022-05-27
下一篇:
Python datastore.DatastoreManager类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap