• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python stream.StandaloneStreamSubscriber类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyon.ion.stream.StandaloneStreamSubscriber的典型用法代码示例。如果您正苦于以下问题:Python StandaloneStreamSubscriber类的具体用法?Python StandaloneStreamSubscriber怎么用?Python StandaloneStreamSubscriber使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了StandaloneStreamSubscriber类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _start_data_subscribers

    def _start_data_subscribers(self):
        """
        """
        # Create a pubsub client to create streams.
        pubsub_client = PubsubManagementServiceClient(node=self.container.node)

        # Create streams and subscriptions for each stream named in driver.
        self._data_subscribers = []
        self._samples_received = []
        self._raw_samples_received = []
        self._async_sample_result = AsyncResult()
        self._async_raw_sample_result = AsyncResult()

        # A callback for processing subscribed-to data.
        def recv_data(message, stream_route, stream_id):
            log.info('Received parsed data on %s (%s,%s)', stream_id, stream_route.exchange_point, stream_route.routing_key)
            self._samples_received.append(message)

        from pyon.util.containers import create_unique_identifier

        stream_name = 'ctdpf_parsed'
        parsed_config = self._stream_config[stream_name]
        stream_id = parsed_config['stream_id']
        exchange_name = create_unique_identifier("%s_queue" %
                    stream_name)
        self._purge_queue(exchange_name)
        sub = StandaloneStreamSubscriber(exchange_name, recv_data)
        sub.start()
        self._data_subscribers.append(sub)
        sub_id = pubsub_client.create_subscription(name=exchange_name, stream_ids=[stream_id])
        pubsub_client.activate_subscription(sub_id)
        sub.subscription_id = sub_id # Bind the subscription to the standalone subscriber (easier cleanup, not good in real practice)
开发者ID:ateranishi,项目名称:coi-services,代码行数:32,代码来源:dataset_test.py


示例2: test_serialize_compatability

    def test_serialize_compatability(self):
        ph = ParameterHelper(self.dataset_management, self.addCleanup)
        pdict_id = ph.create_extended_parsed()

        stream_def_id = self.pubsub_management.create_stream_definition('ctd extended', parameter_dictionary_id=pdict_id)
        self.addCleanup(self.pubsub_management.delete_stream_definition, stream_def_id)

        stream_id, route = self.pubsub_management.create_stream('ctd1', 'xp1', stream_definition_id=stream_def_id)
        self.addCleanup(self.pubsub_management.delete_stream, stream_id)

        sub_id = self.pubsub_management.create_subscription('sub1', stream_ids=[stream_id])
        self.addCleanup(self.pubsub_management.delete_subscription, sub_id)
        self.pubsub_management.activate_subscription(sub_id)
        self.addCleanup(self.pubsub_management.deactivate_subscription, sub_id)

        verified = Event()
        def verifier(msg, route, stream_id):
            for k,v in msg.record_dictionary.iteritems():
                if v is not None:
                    self.assertIsInstance(v, np.ndarray)
            rdt = RecordDictionaryTool.load_from_granule(msg)
            for k,v in rdt.iteritems():
                self.assertIsInstance(rdt[k], np.ndarray)
                self.assertIsInstance(v, np.ndarray)
            verified.set()

        subscriber = StandaloneStreamSubscriber('sub1', callback=verifier)
        subscriber.start()
        self.addCleanup(subscriber.stop)

        publisher = StandaloneStreamPublisher(stream_id,route)
        rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
        ph.fill_rdt(rdt,10)
        publisher.publish(rdt.to_granule())
        self.assertTrue(verified.wait(60))
开发者ID:ednad,项目名称:coi-services,代码行数:35,代码来源:test_granule.py


示例3: create_output_data_product

    def create_output_data_product(self):
        dp1_outgoing_stream_id = self.pubsub_client.create_stream_definition(name='dp1_stream', parameter_dictionary_id=self.parameter_dict_id)

        dp1_output_dp_obj = IonObject(  RT.DataProduct,
            name='data_process1_data_product',
            description='output of add array func',
            temporal_domain = self.time_dom.dump(),
            spatial_domain = self.spatial_dom.dump())

        dp1_func_output_dp_id = self.dataproductclient.create_data_product(dp1_output_dp_obj,  dp1_outgoing_stream_id)
        self.addCleanup(self.dataproductclient.delete_data_product, dp1_func_output_dp_id)
        # retrieve the id of the OUTPUT stream from the out Data Product and add to granule logger
        stream_ids, _ = self.rrclient.find_objects(dp1_func_output_dp_id, PRED.hasStream, None, True)
        self._output_stream_ids.append(stream_ids[0])

        subscription_id = self.pubsub_client.create_subscription('validator', data_product_ids=[dp1_func_output_dp_id])
        self.addCleanup(self.pubsub_client.delete_subscription, subscription_id)

        def on_granule(msg, route, stream_id):
            log.debug('recv_packet stream_id: %s route: %s   msg: %s', stream_id, route, msg)
            self.validate_output_granule(msg, route, stream_id)
            self.granule_verified.set()

        validator = StandaloneStreamSubscriber('validator', callback=on_granule)
        validator.start()
        self.addCleanup(validator.stop)

        self.pubsub_client.activate_subscription(subscription_id)
        self.addCleanup(self.pubsub_client.deactivate_subscription, subscription_id)

        return dp1_func_output_dp_id
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:31,代码来源:test_transform_worker.py


示例4: test_data_product_subscription

    def test_data_product_subscription(self):
        pdict_id = self.dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
        stream_def_id = self.pubsub_management.create_stream_definition('ctd parsed', parameter_dictionary_id=pdict_id)
        self.addCleanup(self.pubsub_management.delete_stream_definition, stream_def_id)

        tdom, sdom = time_series_domain()
        dp = DataProduct(name='ctd parsed')
        dp.spatial_domain = sdom.dump()
        dp.temporal_domain = tdom.dump()

        data_product_id = self.data_product_management.create_data_product(data_product=dp, stream_definition_id=stream_def_id)
        self.addCleanup(self.data_product_management.delete_data_product, data_product_id)

        subscription_id = self.pubsub_management.create_subscription('validator', data_product_ids=[data_product_id])
        self.addCleanup(self.pubsub_management.delete_subscription, subscription_id)

        validated = Event()
        def validation(msg, route, stream_id):
            validated.set()

        stream_ids, _ = self.resource_registry.find_objects(subject=data_product_id, predicate=PRED.hasStream, id_only=True)
        dp_stream_id = stream_ids.pop()

        validator = StandaloneStreamSubscriber('validator', callback=validation)
        validator.start()
        self.addCleanup(validator.stop)

        self.pubsub_management.activate_subscription(subscription_id)
        self.addCleanup(self.pubsub_management.deactivate_subscription, subscription_id)

        route = self.pubsub_management.read_stream_route(dp_stream_id)

        publisher = StandaloneStreamPublisher(dp_stream_id, route)
        publisher.publish('hi')
        self.assertTrue(validated.wait(10))
开发者ID:mbarry02,项目名称:coi-services,代码行数:35,代码来源:test_pubsub.py


示例5: test_event_in_stream_out_transform

    def test_event_in_stream_out_transform(self):
        """
        Test the event-in/stream-out transform
        """

        stream_id, _ = self.pubsub.create_stream('test_stream', exchange_point='science_data')
        self.exchange_cleanup.append('science_data')

        #---------------------------------------------------------------------------------------------
        # Launch a ctd transform
        #---------------------------------------------------------------------------------------------
        # Create the process definition
        process_definition = ProcessDefinition(
            name='EventToStreamTransform',
            description='For testing an event-in/stream-out transform')
        process_definition.executable['module']= 'ion.processes.data.transforms.event_in_stream_out_transform'
        process_definition.executable['class'] = 'EventToStreamTransform'
        proc_def_id = self.process_dispatcher.create_process_definition(process_definition=process_definition)

        # Build the config
        config = DotDict()
        config.process.queue_name = 'test_queue'
        config.process.exchange_point = 'science_data'
        config.process.publish_streams.output = stream_id
        config.process.event_type = 'ExampleDetectableEvent'
        config.process.variables = ['voltage', 'temperature' ]

        # Schedule the process
        pid = self.process_dispatcher.schedule_process(process_definition_id=proc_def_id, configuration=config)
        self.addCleanup(self.process_dispatcher.cancel_process,pid)

        #---------------------------------------------------------------------------------------------
        # Create a subscriber for testing
        #---------------------------------------------------------------------------------------------

        ar_cond = gevent.event.AsyncResult()
        def subscriber_callback(m, r, s):
            ar_cond.set(m)
        sub = StandaloneStreamSubscriber('sub', subscriber_callback)
        self.addCleanup(sub.stop)
        sub_id = self.pubsub.create_subscription('subscription_cond',
            stream_ids=[stream_id],
            exchange_name='sub')
        self.pubsub.activate_subscription(sub_id)
        self.queue_cleanup.append(sub.xn.queue)
        sub.start()

        gevent.sleep(4)

        #---------------------------------------------------------------------------------------------
        # Publish an event. The transform has been configured to receive this event
        #---------------------------------------------------------------------------------------------

        event_publisher = EventPublisher("ExampleDetectableEvent")
        event_publisher.publish_event(origin = 'fake_origin', voltage = '5', temperature = '273')

        # Assert that the transform processed the event and published data on the output stream
        result_cond = ar_cond.get(timeout=10)
        self.assertTrue(result_cond)
开发者ID:Bobfrat,项目名称:coi-services,代码行数:59,代码来源:event_management_test.py


示例6: test_granule

    def test_granule(self):
        
        pdict_id = self.dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
        stream_def_id = self.pubsub_management.create_stream_definition('ctd', parameter_dictionary_id=pdict_id, stream_configuration={'reference_designator':"GA03FLMA-RI001-13-CTDMOG999"})
        pdict = DatasetManagementService.get_parameter_dictionary_by_name('ctd_parsed_param_dict')
        self.addCleanup(self.pubsub_management.delete_stream_definition,stream_def_id)

        stream_id, route = self.pubsub_management.create_stream('ctd_stream', 'xp1', stream_definition_id=stream_def_id)
        self.addCleanup(self.pubsub_management.delete_stream,stream_id)
        publisher = StandaloneStreamPublisher(stream_id, route)

        subscriber = StandaloneStreamSubscriber('sub', self.verify_incoming)
        subscriber.start()
        self.addCleanup(subscriber.stop)

        subscription_id = self.pubsub_management.create_subscription('sub', stream_ids=[stream_id])
        self.pubsub_management.activate_subscription(subscription_id)


        rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
        rdt['time'] = np.arange(10)
        rdt['temp'] = np.random.randn(10) * 10 + 30
        rdt['pressure'] = [20] * 10

        self.assertEquals(set(pdict.keys()), set(rdt.fields))
        self.assertEquals(pdict.temporal_parameter_name, rdt.temporal_parameter)

        self.assertEquals(rdt._stream_config['reference_designator'],"GA03FLMA-RI001-13-CTDMOG999")

        self.rdt = rdt
        self.data_producer_id = 'data_producer'
        self.provider_metadata_update = {1:1}

        publisher.publish(rdt.to_granule(data_producer_id='data_producer', provider_metadata_update={1:1}))

        self.assertTrue(self.event.wait(10))
        
        self.pubsub_management.deactivate_subscription(subscription_id)
        self.pubsub_management.delete_subscription(subscription_id)
        
        rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
        rdt['time'] = np.array([None,None,None])
        self.assertTrue(rdt['time'] is None)
        
        rdt['time'] = np.array([None, 1, 2])
        self.assertEquals(rdt['time'][0], rdt.fill_value('time'))


        stream_def_obj = self.pubsub_management.read_stream_definition(stream_def_id)
        rdt = RecordDictionaryTool(stream_definition=stream_def_obj)
        rdt['time'] = np.arange(20)
        rdt['temp'] = np.arange(20)


        granule = rdt.to_granule()
        rdt = RecordDictionaryTool.load_from_granule(granule)
        np.testing.assert_array_equal(rdt['time'], np.arange(20))
        np.testing.assert_array_equal(rdt['temp'], np.arange(20))
开发者ID:ednad,项目名称:coi-services,代码行数:58,代码来源:test_granule.py


示例7: test_full_pubsub

    def test_full_pubsub(self):

        self.sub1_sat = Event()
        self.sub2_sat = Event()

        def subscriber1(m, r, s):
            self.sub1_sat.set()

        def subscriber2(m, r, s):
            self.sub2_sat.set()

        sub1 = StandaloneStreamSubscriber("sub1", subscriber1)
        self.queue_cleanup.append(sub1.xn.queue)
        sub1.start()

        sub2 = StandaloneStreamSubscriber("sub2", subscriber2)
        self.queue_cleanup.append(sub2.xn.queue)
        sub2.start()

        log_topic = self.pubsub_management.create_topic("instrument_logs", exchange_point="instruments")
        science_topic = self.pubsub_management.create_topic("science_data", exchange_point="instruments")
        events_topic = self.pubsub_management.create_topic("notifications", exchange_point="events")

        log_stream, route = self.pubsub_management.create_stream(
            "instrument1-logs", topic_ids=[log_topic], exchange_point="instruments"
        )
        ctd_stream, route = self.pubsub_management.create_stream(
            "instrument1-ctd", topic_ids=[science_topic], exchange_point="instruments"
        )
        event_stream, route = self.pubsub_management.create_stream(
            "notifications", topic_ids=[events_topic], exchange_point="events"
        )
        raw_stream, route = self.pubsub_management.create_stream("temp", exchange_point="global.data")
        self.exchange_cleanup.extend(["instruments", "events", "global.data"])

        subscription1 = self.pubsub_management.create_subscription(
            "subscription1", stream_ids=[log_stream, event_stream], exchange_name="sub1"
        )
        subscription2 = self.pubsub_management.create_subscription(
            "subscription2", exchange_points=["global.data"], stream_ids=[ctd_stream], exchange_name="sub2"
        )

        self.pubsub_management.activate_subscription(subscription1)
        self.pubsub_management.activate_subscription(subscription2)

        self.publish_on_stream(log_stream, 1)
        self.assertTrue(self.sub1_sat.wait(4))
        self.assertFalse(self.sub2_sat.is_set())

        self.publish_on_stream(raw_stream, 1)
        self.assertTrue(self.sub1_sat.wait(4))

        sub1.stop()
        sub2.stop()
开发者ID:blazetopher,项目名称:coi-services,代码行数:54,代码来源:test_pubsub.py


示例8: _start_data_subscriber

    def _start_data_subscriber(self, config, callback):
        """
        Setup and start a data subscriber
        """
        exchange_point = config['exchange_point']
        stream_id = config['stream_id']

        sub = StandaloneStreamSubscriber(exchange_point, callback)
        sub.start()
        self._data_subscribers.append(sub)

        pubsub_client = PubsubManagementServiceClient(node=self.container.node)
        sub_id = pubsub_client.create_subscription(name=exchange_point, stream_ids=[stream_id])
        pubsub_client.activate_subscription(sub_id)
        sub.subscription_id = sub_id # Bind the subscription to the standalone subscriber (easier cleanup, not good in real practice)
开发者ID:edwardhunter,项目名称:coi-services,代码行数:15,代码来源:dataset_test.py


示例9: start_output_stream_and_listen

    def start_output_stream_and_listen(self, ctd_stream_id, data_product_stream_ids, message_count_per_stream=10):
        assertions = self.assertTrue
        exchange_name = 'workflow_test'

        ###
        ### Make a subscriber in the test to listen for transformed data
        ###

        salinity_subscription_id = self.pubsubclient.create_subscription(
                name = 'test workflow transformations',
                exchange_name = exchange_name,
                stream_ids = data_product_stream_ids
                )

        result = gevent.event.AsyncResult()
        results = []
        message_count = len(data_product_stream_ids) * message_count_per_stream

        def message_received(message, stream_route, stream_id):
            # Heads
            results.append(message)
            if len(results) >= message_count:   #Only wait for so many messages - per stream
                result.set(True)

        subscriber = StandaloneStreamSubscriber(exchange_name='workflow_test', callback=message_received)
        subscriber.xn.purge()
        self.addCleanup(subscriber.xn.delete)
        subscriber.start()

        # after the queue has been created it is safe to activate the subscription
        self.pubsubclient.activate_subscription(subscription_id=salinity_subscription_id)

        #Start the input stream process
        if ctd_stream_id is not None:
            ctd_sim_pid = self.start_simple_input_stream_process(ctd_stream_id)

        # Assert that we have received data
        assertions(result.get(timeout=30))

        # stop the flow parse the messages...
        if ctd_stream_id is not None:
            self.process_dispatcher.cancel_process(ctd_sim_pid) # kill the ctd simulator process - that is enough data

        self.pubsubclient.deactivate_subscription(subscription_id=salinity_subscription_id)

        subscriber.stop()

        return results
开发者ID:blazetopher,项目名称:coi-services,代码行数:48,代码来源:test_helper.py


示例10: test_transform_prime_no_available_fields

    def test_transform_prime_no_available_fields(self):
        available_fields_in = []
        available_fields_out = []
        exchange_pt1 = 'xp1'
        exchange_pt2 = 'xp2'
        stream_id_in,stream_id_out,stream_route_in,stream_route_out,stream_def_in_id,stream_def_out_id = self._setup_streams(exchange_pt1, exchange_pt2, available_fields_in, available_fields_out)
        
        #launch transform
        config = {'process':{'routes':{(stream_id_in, stream_id_out):None},'queue_name':exchange_pt1, 'publish_streams':{str(stream_id_out):stream_id_out}, 'process_type':'stream_process'}}
        pid = self.container.spawn_process('transform_stream','ion.processes.data.transforms.transform_prime','TransformPrime',config)
        
        #create publish
        publisher = StandaloneStreamPublisher(stream_id_in, stream_route_in)
        self.container.proc_manager.procs[pid].subscriber.xn.bind(stream_route_in.routing_key, publisher.xp)

        #data
        rdt_in = RecordDictionaryTool(stream_definition_id=stream_def_in_id)
        dt = 20
        rdt_in['time'] = np.arange(dt)
        rdt_in['lat'] = [40.992469] * dt
        rdt_in['lon'] = [-71.727069] * dt
        rdt_in['TEMPWAT_L0'] = self._get_param_vals('TEMPWAT_L0', slice(None), (dt,))
        rdt_in['CONDWAT_L0'] = self._get_param_vals('CONDWAT_L0', slice(None), (dt,))
        rdt_in['PRESWAT_L0'] = self._get_param_vals('PRESWAT_L0', slice(None), (dt,))
        msg = rdt_in.to_granule()
        #publish granule to transform and have transform publish it to subsciber
        
        #validate transformed data
        e = gevent.event.Event()
        def cb(msg, sr, sid):
            self.assertEqual(sid, stream_id_out)
            rdt_out = RecordDictionaryTool.load_from_granule(msg)
            self.assertEquals(set([k for k,v in rdt_out.iteritems()]), set(available_fields_out))
            for k,v in rdt_out.iteritems():
                self.assertEquals(rdt_out[k], None)
            e.set()

        sub = StandaloneStreamSubscriber('stream_subscriber', cb)
        sub.xn.bind(stream_route_out.routing_key, getattr(self.container.proc_manager.procs[pid], stream_id_out).xp)
        self.addCleanup(sub.stop)
        sub.start()
        
        #publish msg to transform
        publisher.publish(msg)
        
        #wait to receive msg
        self.assertTrue(e.wait(4))
开发者ID:blazetopher,项目名称:coi-services,代码行数:47,代码来源:test_transform_prime.py


示例11: _start_data_subscribers

    def _start_data_subscribers(self, count):
        """
        """        
        # Create a pubsub client to create streams.
        pubsub_client = PubsubManagementServiceClient(node=self.container.node)
                
        # Create streams and subscriptions for each stream named in driver.
        self._data_subscribers = []
        self._samples_received = []
        #self._async_data_result = AsyncResult()

        strXterm = "xterm -T InstrumentScienceData -sb -rightbar "
        pOpenString = strXterm + " -e tail -f " + PIPE_PATH
        subprocess.Popen(['xterm', '-T', 'InstrumentScienceData', '-e', 'tail', '-f', PIPE_PATH])        
        #subprocess.Popen(pOpenString)
                
        #self.pipeData = open(PIPE_PATH, "w", 1) 

        # A callback for processing subscribed-to data.
        def recv_data(message, stream_route, stream_id):
            print 'Received message on ' + str(stream_id) + ' (' + str(stream_route.exchange_point) + ',' + str(stream_route.routing_key) + ')'
            log.info('Received message on %s (%s,%s)', stream_id, stream_route.exchange_point, stream_route.routing_key)
             
            self.pipeData = open(PIPE_PATH, "w", 1) 
            self.pipeData.write(str(message))
            self.pipeData.flush()
            self.pipeData.close()      

            self._samples_received.append(message)
            #if len(self._samples_received) == count:
                #self._async_data_result.set()

        for (stream_name, stream_config) in self._stream_config.iteritems():
            
            stream_id = stream_config['stream_id']
            
            # Create subscriptions for each stream.

            exchange_name = '%s_queue' % stream_name
            self._purge_queue(exchange_name)
            sub = StandaloneStreamSubscriber(exchange_name, recv_data)
            sub.start()
            self._data_subscribers.append(sub)
            print 'stream_id: %s' % stream_id
            sub_id = pubsub_client.create_subscription(name=exchange_name, stream_ids=[stream_id])
            pubsub_client.activate_subscription(sub_id)
            sub.subscription_id = sub_id # Bind the subscription to the standalone subscriber (easier cleanup, not good in real practice)
开发者ID:ccenter,项目名称:marine-integrations,代码行数:47,代码来源:run_instrument.py


示例12: _start_data_subscribers

    def _start_data_subscribers(self):
        """
        """

        # Create streams and subscriptions for each stream named in driver.
        self._stream_config = {}
        self._data_subscribers = []

        #
        # TODO retrieve appropriate stream definitions; for the moment, using
        # adhoc_get_stream_names
        #

        # A callback for processing subscribed-to data.
        def consume_data(message, stream_route, stream_id):
            log.info('Subscriber received data message: %s.' % str(message))
            self._samples_received.append(message)
            self._async_data_result.set()

        for stream_name in adhoc_get_stream_names():
            log.info('creating stream %r ...', stream_name)

            # TODO use appropriate exchange_point
            stream_id, stream_route = self._pubsub_client.create_stream(
                name=stream_name, exchange_point='science_data')

            log.info('create_stream(%r): stream_id=%r, stream_route=%s',
                     stream_name, stream_id, str(stream_route))

            pdict = adhoc_get_parameter_dictionary(stream_name)
            stream_config = dict(stream_route=stream_route.routing_key,
                                 stream_id=stream_id,
                                 parameter_dictionary=pdict.dump())

            self._stream_config[stream_name] = stream_config
            log.info('_stream_config[%r]= %r', stream_name, stream_config)

            # Create subscriptions for each stream.
            exchange_name = '%s_queue' % stream_name
            self._purge_queue(exchange_name)
            sub = StandaloneStreamSubscriber(exchange_name, consume_data)
            sub.start()
            self._data_subscribers.append(sub)
            sub_id = self._pubsub_client.create_subscription(name=exchange_name, stream_ids=[stream_id])
            self._pubsub_client.activate_subscription(sub_id)
            sub.subscription_id = sub_id
开发者ID:kerfoot,项目名称:coi-services,代码行数:46,代码来源:test_platform_agent_with_oms.py


示例13: test_full_pubsub

    def test_full_pubsub(self):

        self.sub1_sat = Event()
        self.sub2_sat = Event()

        def subscriber1(m,r,s):
            self.sub1_sat.set()

        def subscriber2(m,r,s):
            self.sub2_sat.set()

        sub1 = StandaloneStreamSubscriber('sub1', subscriber1)
        self.queue_cleanup.append(sub1.xn.queue)
        sub1.start()

        sub2 = StandaloneStreamSubscriber('sub2', subscriber2)
        self.queue_cleanup.append(sub2.xn.queue)
        sub2.start()

        log_topic = self.pubsub_management.create_topic('instrument_logs', exchange_point='instruments')
        science_topic = self.pubsub_management.create_topic('science_data', exchange_point='instruments')
        events_topic = self.pubsub_management.create_topic('notifications', exchange_point='events')


        log_stream, route = self.pubsub_management.create_stream('instrument1-logs', topic_ids=[log_topic], exchange_point='instruments')
        ctd_stream, route = self.pubsub_management.create_stream('instrument1-ctd', topic_ids=[science_topic], exchange_point='instruments')
        event_stream, route = self.pubsub_management.create_stream('notifications', topic_ids=[events_topic], exchange_point='events')
        raw_stream, route = self.pubsub_management.create_stream('temp', exchange_point='global.data')
        self.exchange_cleanup.extend(['instruments','events','global.data'])


        subscription1 = self.pubsub_management.create_subscription('subscription1', stream_ids=[log_stream,event_stream], exchange_name='sub1')
        subscription2 = self.pubsub_management.create_subscription('subscription2', exchange_points=['global.data'], stream_ids=[ctd_stream], exchange_name='sub2')

        self.pubsub_management.activate_subscription(subscription1)
        self.pubsub_management.activate_subscription(subscription2)

        self.publish_on_stream(log_stream, 1)
        self.assertTrue(self.sub1_sat.wait(4))
        self.assertFalse(self.sub2_sat.is_set())

        self.publish_on_stream(raw_stream,1)
        self.assertTrue(self.sub1_sat.wait(4))

        sub1.stop()
        sub2.stop()
开发者ID:swarbhanu,项目名称:coi-services,代码行数:46,代码来源:test_pubsub.py


示例14: setup_subscriber

    def setup_subscriber(self, data_product_id, callback):
        stream_ids, _ = self.resource_registry.find_objects(subject=data_product_id, predicate=PRED.hasStream, id_only=True)
        self.assertTrue(len(stream_ids))
        stream_id = stream_ids.pop()

        sub_id = self.pubsub_management.create_subscription('validator_%s'%self.validators, stream_ids=[stream_id])
        self.addCleanup(self.pubsub_management.delete_subscription, sub_id)


        self.pubsub_management.activate_subscription(sub_id)
        self.addCleanup(self.pubsub_management.deactivate_subscription, sub_id)

        subscriber = StandaloneStreamSubscriber('validator_%s' % self.validators, callback=callback)
        subscriber.start()
        self.addCleanup(subscriber.stop)
        self.validators+=1

        return subscriber
开发者ID:mbarry02,项目名称:coi-services,代码行数:18,代码来源:test_int_data_process_management_service.py


示例15: test_move_activated_subscription

    def test_move_activated_subscription(self):

        stream_id, route = self.pubsub_management.create_stream(name="test_stream", exchange_point="test_xp")
        # --------------------------------------------------------------------------------
        # Test moving after activate
        # --------------------------------------------------------------------------------

        subscription_id = self.pubsub_management.create_subscription("first_queue", stream_ids=[stream_id])
        self.pubsub_management.activate_subscription(subscription_id)

        xn_ids, _ = self.resource_registry.find_resources(restype=RT.ExchangeName, name="first_queue", id_only=True)
        subjects, _ = self.resource_registry.find_subjects(
            object=subscription_id, predicate=PRED.hasSubscription, id_only=True
        )
        self.assertEquals(xn_ids[0], subjects[0])

        self.verified = Event()

        def verify(m, r, s):
            self.assertEquals(m, "verified")
            self.verified.set()

        subscriber = StandaloneStreamSubscriber("second_queue", verify)
        subscriber.start()

        self.pubsub_management.move_subscription(subscription_id, exchange_name="second_queue")

        xn_ids, _ = self.resource_registry.find_resources(restype=RT.ExchangeName, name="second_queue", id_only=True)
        subjects, _ = self.resource_registry.find_subjects(
            object=subscription_id, predicate=PRED.hasSubscription, id_only=True
        )

        self.assertEquals(len(subjects), 1)
        self.assertEquals(subjects[0], xn_ids[0])

        publisher = StandaloneStreamPublisher(stream_id, route)
        publisher.publish("verified")

        self.assertTrue(self.verified.wait(2))

        self.pubsub_management.deactivate_subscription(subscription_id)

        self.pubsub_management.delete_subscription(subscription_id)
        self.pubsub_management.delete_stream(stream_id)
开发者ID:blazetopher,项目名称:coi-services,代码行数:44,代码来源:test_pubsub.py


示例16: _start_data_subscribers

    def _start_data_subscribers(self, count):
        """
        """
        # Create a pubsub client to create streams.
        pubsub_client = PubsubManagementServiceClient(node=self.container.node)

        # Create streams and subscriptions for each stream named in driver.
        self._data_subscribers = []
        self._samples_received = []
        self._async_sample_result = AsyncResult()

        # A callback for processing subscribed-to data.
        def recv_data(message, stream_route, stream_id):
            log.info('Received message on %s (%s,%s)', stream_id,
                     stream_route.exchange_point, stream_route.routing_key)
            #print '######################## stream message:'
            #print str(message)
            self._samples_received.append(message)
            if len(self._samples_received) == count:
                self._async_sample_result.set()

        for (stream_name, stream_config) in self._stream_config.iteritems():

            if stream_name == 'parsed':

                # Create subscription for parsed stream only.

                stream_id = stream_config['stream_id']

                from pyon.util.containers import create_unique_identifier
                # exchange_name = '%s_queue' % stream_name
                exchange_name = create_unique_identifier(
                    "%s_queue" % stream_name)
                self._purge_queue(exchange_name)
                sub = StandaloneStreamSubscriber(exchange_name, recv_data)
                sub.start()
                self._data_subscribers.append(sub)
                print 'stream_id: %s' % stream_id
                sub_id = pubsub_client.create_subscription(
                    name=exchange_name, stream_ids=[stream_id])
                pubsub_client.activate_subscription(sub_id)
                sub.subscription_id = sub_id  # Bind the subscription to the standalone subscriber (easier cleanup, not good in real practice)
开发者ID:newbrough,项目名称:coi-services,代码行数:42,代码来源:test_ia_alarms.py


示例17: test_activation_and_deactivation

    def test_activation_and_deactivation(self):
        stream_id, route = self.pubsub_management.create_stream('stream1','xp1')
        subscription_id = self.pubsub_management.create_subscription('sub1', stream_ids=[stream_id])

        self.check1 = Event()

        def verifier(m,r,s):
            self.check1.set()


        subscriber = StandaloneStreamSubscriber('sub1',verifier)
        subscriber.start()

        publisher = StandaloneStreamPublisher(stream_id, route)
        publisher.publish('should not receive')

        self.assertFalse(self.check1.wait(0.25))

        self.pubsub_management.activate_subscription(subscription_id)

        publisher.publish('should receive')
        self.assertTrue(self.check1.wait(2))

        self.check1.clear()
        self.assertFalse(self.check1.is_set())

        self.pubsub_management.deactivate_subscription(subscription_id)

        publisher.publish('should not receive')
        self.assertFalse(self.check1.wait(0.5))

        self.pubsub_management.activate_subscription(subscription_id)

        publisher.publish('should receive')
        self.assertTrue(self.check1.wait(2))

        subscriber.stop()

        self.pubsub_management.deactivate_subscription(subscription_id)
        self.pubsub_management.delete_subscription(subscription_id)
        self.pubsub_management.delete_stream(stream_id)
开发者ID:mbarry02,项目名称:coi-services,代码行数:41,代码来源:test_pubsub.py


示例18: start_data_subscribers

    def start_data_subscribers(self):
        """
        """
        # Create a pubsub client to create streams.
        pubsub_client = PubsubManagementServiceClient(node=self.container.node)

        # Create streams and subscriptions for each stream named in driver.
        self.data_subscribers = {}
        self.samples_received = {}

        # A callback for processing subscribed-to data.
        def recv_data(message, stream_route, stream_id):
            log.info("Received message on %s (%s,%s)", stream_id, stream_route.exchange_point, stream_route.routing_key)

            name = None

            for (stream_name, stream_config) in self.stream_config.iteritems():
                if stream_route.routing_key == stream_config["routing_key"]:
                    log.debug(
                        "NAME: %s %s == %s" % (stream_name, stream_route.routing_key, stream_config["routing_key"])
                    )
                    name = stream_name
                    if not self.samples_received.get(stream_name):
                        self.samples_received[stream_name] = []
                    self.samples_received[stream_name].append(message)
                    log.debug("Add message to stream '%s' value: %s" % (stream_name, message))

        for (stream_name, stream_config) in self.stream_config.iteritems():
            stream_id = stream_config["stream_id"]

            # Create subscriptions for each stream.
            exchange_name = "%s_queue" % stream_name
            self._purge_queue(exchange_name)
            sub = StandaloneStreamSubscriber(exchange_name, recv_data)
            sub.start()
            self.data_subscribers[stream_name] = sub
            sub_id = pubsub_client.create_subscription(name=exchange_name, stream_ids=[stream_id])
            pubsub_client.activate_subscription(sub_id)
            sub.subscription_id = (
                sub_id
            )  # Bind the subscription to the standalone subscriber (easier cleanup, not good in real practice)
开发者ID:jdosher,项目名称:marine-integrations,代码行数:41,代码来源:instrument_agent_client.py


示例19: _start_subscriber_to_transform

    def _start_subscriber_to_transform(self,  name_of_transform = '', stream_id = ''):

        ar = gevent.event.AsyncResult()
        def subscriber(m,r,s):
            ar.set(m)

        sub = StandaloneStreamSubscriber(exchange_name='sub_%s' % name_of_transform, callback=subscriber)

        # Note that this running the below line creates an exchange since none of that name exists before
        sub_id = self.pubsub.create_subscription('subscriber_to_transform_%s' % name_of_transform,
            stream_ids=[stream_id],
            exchange_name='sub_%s' % name_of_transform)
        self.addCleanup(self.pubsub.delete_subscription, sub_id)

        self.pubsub.activate_subscription(sub_id)
        self.addCleanup(self.pubsub.deactivate_subscription, sub_id)

        sub.start()
        self.addCleanup(sub.stop)

        return ar
开发者ID:Bobfrat,项目名称:coi-services,代码行数:21,代码来源:test_ctdbp_chain_L0_L1_L2.py


示例20: test_demux

    def test_demux(self):
        self.stream0, self.route0 = self.pubsub_client.create_stream('stream0', exchange_point='test')
        self.stream1, self.route1 = self.pubsub_client.create_stream('stream1', exchange_point='main_data')
        self.stream2, self.route2 = self.pubsub_client.create_stream('stream2', exchange_point='alt_data')
        
        self.r_stream1 = gevent.event.Event()
        self.r_stream2 = gevent.event.Event()

        def process(msg, stream_route, stream_id):
            if stream_id == self.stream1:
                self.r_stream1.set()
            elif stream_id == self.stream2:
                self.r_stream2.set()
        
        se 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python channel.BaseChannel类代码示例发布时间:2022-05-27
下一篇:
Python stream.StandaloneStreamPublisher类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap