本文整理汇总了Python中pyon.ion.stream.StandaloneStreamPublisher类的典型用法代码示例。如果您正苦于以下问题：Python StandaloneStreamPublisher类的具体用法？Python StandaloneStreamPublisher怎么用？Python StandaloneStreamPublisher使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了StandaloneStreamPublisher类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_data_product_subscription
def test_data_product_subscription(self):
    """Check that a subscription on a data product receives published messages.

    Builds a stream definition and a data product, subscribes the
    ``validator`` queue to that data product, publishes one message on the
    data product's stream and waits up to 10 seconds for the callback.
    """
    pubsub = self.pubsub_management

    param_dict_id = self.dataset_management.read_parameter_dictionary_by_name(
        'ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub.create_stream_definition(
        'ctd parsed', parameter_dictionary_id=param_dict_id)
    self.addCleanup(pubsub.delete_stream_definition, stream_def_id)

    # Describe the data product.
    tdom, sdom = time_series_domain()
    product = DataProduct(name='ctd parsed')
    product.spatial_domain = sdom.dump()
    product.temporal_domain = tdom.dump()

    data_product_id = self.data_product_management.create_data_product(
        data_product=product, stream_definition_id=stream_def_id)
    self.addCleanup(self.data_product_management.delete_data_product, data_product_id)

    subscription_id = pubsub.create_subscription(
        'validator', data_product_ids=[data_product_id])
    self.addCleanup(pubsub.delete_subscription, subscription_id)

    validated = Event()

    def validation(msg, route, stream_id):
        # Any delivered message counts as success.
        validated.set()

    found_streams, _ = self.resource_registry.find_objects(
        subject=data_product_id, predicate=PRED.hasStream, id_only=True)
    dp_stream_id = found_streams.pop()

    listener = StandaloneStreamSubscriber('validator', callback=validation)
    listener.start()
    self.addCleanup(listener.stop)

    pubsub.activate_subscription(subscription_id)
    self.addCleanup(pubsub.deactivate_subscription, subscription_id)

    stream_route = pubsub.read_stream_route(dp_stream_id)
    StandaloneStreamPublisher(dp_stream_id, stream_route).publish('hi')

    self.assertTrue(validated.wait(10))
开发者ID:mbarry02,项目名称:coi-services,代码行数:35,代码来源:test_pubsub.py
示例2: test_granule_publish
def test_granule_publish(self):
    """Publish one CTD sample as a granule on a data product's stream.

    Creates a stream definition and a data product, starts a logger process
    on the product's first stream, copies the sample values that exist in
    the record dictionary into it, publishes the resulting granule, then
    cancels the logger processes and deletes every DataProduct resource.
    """
    log.debug("test_granule_publish ")
    self.loggerpids = []

    # retrieve the param dict from the repository
    pdict_id = self.dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_definition_id = self.pubsubclient.create_stream_definition('parsed stream', parameter_dictionary_id=pdict_id)

    tdom, sdom = time_series_domain()
    dp_obj = IonObject(RT.DataProduct,
                       name=str(uuid.uuid4()),
                       description='ctd stream test',
                       temporal_domain=tdom.dump(),
                       spatial_domain=sdom.dump())
    data_product_id1 = self.dpclient.create_data_product(data_product=dp_obj, stream_definition_id=stream_definition_id)

    # Retrieve the id of the output stream of the out Data Product
    stream_ids, _ = self.rrclient.find_objects(data_product_id1, PRED.hasStream, None, True)
    log.debug('test_granule_publish: Data product streams1 = %s', stream_ids)

    pid = self.create_logger('ctd_parsed', stream_ids[0])
    self.loggerpids.append(pid)

    rdt = RecordDictionaryTool(stream_definition_id=stream_definition_id)

    # create the publisher from the stream route
    stream_route = self.pubsubclient.read_stream_route(stream_ids[0])
    publisher = StandaloneStreamPublisher(stream_ids[0], stream_route)

    # this is one sample from the ctd driver
    tomato = {"driver_timestamp": 3555971105.1268806, "instrument_id": "ABC-123", "pkt_format_id": "JSON_Data", "pkt_version": 1, "preferred_timestamp": "driver_timestamp", "quality_flag": "ok", "stream_name": "parsed", "values": [{"value": 22.9304, "value_id": "temp"}, {"value": 51.57381, "value_id": "conductivity"}, {"value": 915.551, "value_id": "pressure"}]}

    for value in tomato['values']:
        # The arguments follow the "key ... val ..." order of the format
        # string: parameter name first, then its value.
        log.debug("test_granule_publish: Looping tomato values key: %s val: %s ", str(value['value_id']), str(value['value']))
        # Only parameters present in the record dictionary are copied.
        if value['value_id'] in rdt:
            rdt[value['value_id']] = numpy.array([value['value']])
            log.debug("test_granule_publish: Added data item %s val: %s ", str(value['value_id']), str(value['value']))

    g = rdt.to_granule()
    publisher.publish(g)

    gevent.sleep(3)

    for pid in self.loggerpids:
        self.processdispatchclient.cancel_process(pid)

    #--------------------------------------------------------------------------------
    # Cleanup data products
    #--------------------------------------------------------------------------------
    dp_ids, _ = self.rrclient.find_resources(restype=RT.DataProduct, id_only=True)
    for dp_id in dp_ids:
        self.dataproductclient.delete_data_product(dp_id)
开发者ID:Bobfrat,项目名称:coi-services,代码行数:60,代码来源:test_granule_publish.py
示例3: test_serialize_compatability
def test_serialize_compatability(self):
    """Verify that published granule values reach a subscriber as numpy arrays.

    A granule built from an extended parsed parameter dictionary is
    published on a new stream; the subscriber callback checks the raw record
    dictionary and the reloaded RecordDictionaryTool, then sets an event the
    test waits on for up to 60 seconds.
    """
    pubsub = self.pubsub_management
    helper = ParameterHelper(self.dataset_management, self.addCleanup)
    pdict_id = helper.create_extended_parsed()

    stream_def_id = pubsub.create_stream_definition('ctd extended', parameter_dictionary_id=pdict_id)
    self.addCleanup(pubsub.delete_stream_definition, stream_def_id)

    stream_id, route = pubsub.create_stream('ctd1', 'xp1', stream_definition_id=stream_def_id)
    self.addCleanup(pubsub.delete_stream, stream_id)

    sub_id = pubsub.create_subscription('sub1', stream_ids=[stream_id])
    self.addCleanup(pubsub.delete_subscription, sub_id)
    pubsub.activate_subscription(sub_id)
    self.addCleanup(pubsub.deactivate_subscription, sub_id)

    verified = Event()

    def verifier(msg, route, stream_id):
        # Raw record dictionary: every populated entry must be an ndarray.
        for _key, raw_value in msg.record_dictionary.iteritems():
            if v_is_set(raw_value):
                self.assertIsInstance(raw_value, np.ndarray)
        # Reloaded tool: both item access and iteration must yield ndarrays.
        loaded = RecordDictionaryTool.load_from_granule(msg)
        for name, value in loaded.iteritems():
            self.assertIsInstance(loaded[name], np.ndarray)
            self.assertIsInstance(value, np.ndarray)
        verified.set()

    def v_is_set(candidate):
        # Identity check against None, exactly as the inline test would do.
        return candidate is not None

    subscriber = StandaloneStreamSubscriber('sub1', callback=verifier)
    subscriber.start()
    self.addCleanup(subscriber.stop)

    publisher = StandaloneStreamPublisher(stream_id, route)
    outgoing = RecordDictionaryTool(stream_definition_id=stream_def_id)
    helper.fill_rdt(outgoing, 10)
    publisher.publish(outgoing.to_granule())

    self.assertTrue(verified.wait(60))
开发者ID:ednad,项目名称:coi-services,代码行数:35,代码来源:test_granule.py
示例4: test_qc_events
def test_qc_events(self):
    """Publish a granule on a persisted QC stream and expect a ParameterQCEvent.

    The event callback asserts that the event names the ``temp_qc``
    parameter with a temporal value of 7; the test waits up to 10 seconds
    for that callback to fire.
    """
    pubsub = self.pubsub_management
    helper = ParameterHelper(self.dataset_management, self.addCleanup)
    pdict_id = helper.create_qc_pdict()

    stream_def_id = pubsub.create_stream_definition('qc stream def', parameter_dictionary_id=pdict_id)
    self.addCleanup(pubsub.delete_stream_definition, stream_def_id)

    stream_id, route = pubsub.create_stream(
        'qc stream',
        exchange_point=self.exchange_point_name,
        stream_definition_id=stream_def_id)
    self.addCleanup(pubsub.delete_stream, stream_id)

    # Persist the stream into a fresh dataset.
    ingestion_config_id = self.get_ingestion_config()
    dataset_id = self.create_dataset(pdict_id)
    self.ingestion_management.persist_data_stream(
        stream_id=stream_id,
        ingestion_configuration_id=ingestion_config_id,
        dataset_id=dataset_id,
        config=DotDict())
    self.addCleanup(self.ingestion_management.unpersist_data_stream, stream_id, ingestion_config_id)

    publisher = StandaloneStreamPublisher(stream_id, route)

    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    rdt['time'] = np.arange(10)
    rdt['temp'] = np.arange(10) * 3

    verified = Event()

    def verification(event, *args, **kwargs):
        self.assertEquals(event.qc_parameter, 'temp_qc')
        self.assertEquals(event.temporal_value, 7)
        verified.set()

    qc_listener = EventSubscriber(
        event_type=OT.ParameterQCEvent,
        origin=dataset_id,
        callback=verification,
        auto_delete=True)
    qc_listener.start()
    self.addCleanup(qc_listener.stop)

    publisher.publish(rdt.to_granule())
    self.assertTrue(verified.wait(10))
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:33,代码来源:test_dm_end_2_end.py
示例5: publish_to_data_product
def publish_to_data_product(self, data_product_id):
    """Publish a single hard-coded sample on the data product's stream.

    Looks up a stream attached to ``data_product_id`` (asserting that at
    least one exists), builds a one-record granule from that stream's
    definition and publishes it.
    """
    stream_ids, _ = self.resource_registry.find_objects(
        subject=data_product_id, predicate=PRED.hasStream, id_only=True)
    self.assertTrue(len(stream_ids))
    stream_id = stream_ids.pop()

    route = self.pubsub_management.read_stream_route(stream_id)
    stream_def_id = self.pubsub_management.read_stream_definition(stream_id=stream_id)._id
    publisher = StandaloneStreamPublisher(stream_id, route)
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)

    # Do not use in production, this is a loose translation
    ntp_now = time.time() + 2208988800

    # Assigned one at a time, in this exact order.
    sample = (
        ('internal_timestamp', [ntp_now]),
        ('temp', [300000]),
        ('preferred_timestamp', ['driver_timestamp']),
        ('time', [ntp_now]),
        ('port_timestamp', [ntp_now]),
        ('quality_flag', [None]),
        ('lat', [45]),
        ('conductivity', [4341400]),
        ('driver_timestamp', [ntp_now]),
        ('lon', [-71]),
        ('pressure', [256.8]),
    )
    for field_name, field_values in sample:
        rdt[field_name] = field_values

    publisher.publish(rdt.to_granule())
开发者ID:mbarry02,项目名称:coi-services,代码行数:26,代码来源:test_int_data_process_management_service.py
示例6: test_ingestion_pause
def test_ingestion_pause(self):
    """Check that pausing ingestion holds data back until it is resumed.

    Publishes ten records and waits for ingestion, pauses the stream,
    publishes ten more and expects no dataset event within one second, then
    resumes and expects the retrieved ``time`` values to equal 0..19.
    """
    ctd_stream_id, route, stream_def_id, dataset_id = self.make_simple_dataset()
    ingestion_config_id = self.get_ingestion_config()
    self.start_ingestion(ctd_stream_id, dataset_id)
    self.addCleanup(self.stop_ingestion, ctd_stream_id)

    outgoing = RecordDictionaryTool(stream_definition_id=stream_def_id)
    outgoing['time'] = np.arange(10)

    publisher = StandaloneStreamPublisher(ctd_stream_id, route)
    monitor = DatasetMonitor(dataset_id)
    self.addCleanup(monitor.stop)

    # First batch: ingested normally.
    publisher.publish(outgoing.to_granule())
    self.assertTrue(monitor.wait())
    self.data_retriever.retrieve(dataset_id)

    # Second batch: published while ingestion is paused.
    self.ingestion_management.pause_data_stream(ctd_stream_id, ingestion_config_id)
    monitor.event.clear()
    outgoing['time'] = np.arange(10, 20)
    publisher.publish(outgoing.to_granule())
    self.assertFalse(monitor.event.wait(1))

    # Resume and wait for the monitor again.
    self.ingestion_management.resume_data_stream(ctd_stream_id, ingestion_config_id)
    self.assertTrue(monitor.wait())

    retrieved = RecordDictionaryTool.load_from_granule(
        self.data_retriever.retrieve(dataset_id))
    np.testing.assert_array_almost_equal(retrieved['time'], np.arange(20))
开发者ID:ednad,项目名称:coi-services,代码行数:31,代码来源:test_dm_end_2_end.py
示例7: test_execute_advanced_transform
def test_execute_advanced_transform(self):
    """Run a transform across L0-L2 with stream definitions including available fields.

    Publishes a single L0 record on the input stream and waits up to two
    seconds for a granule whose ``rho`` is close to 1001.0055034.
    """
    streams = self.setup_advanced_transform()
    in_stream_id, in_stream_def_id = streams[0]
    out_stream_id, out_stream_defs_id = streams[1]

    validation_event = Event()

    def validator(msg, route, stream_id):
        received = RecordDictionaryTool.load_from_granule(msg)
        # Only a granule carrying the expected density marks success.
        if np.allclose(received['rho'], np.array([1001.0055034])):
            validation_event.set()

    self.setup_validator(validator)

    in_route = self.pubsub_management.read_stream_route(in_stream_id)
    publisher = StandaloneStreamPublisher(in_stream_id, in_route)

    outbound_rdt = RecordDictionaryTool(stream_definition_id=in_stream_def_id)
    for field_name, field_values in (
            ('time', [0]),
            ('TEMPWAT_L0', [280000]),
            ('CONDWAT_L0', [100000]),
            ('PRESWAT_L0', [2789]),
            ('lat', [45]),
            ('lon', [-71])):
        outbound_rdt[field_name] = field_values

    publisher.publish(outbound_rdt.to_granule())

    self.assertTrue(validation_event.wait(2))
开发者ID:newbrough,项目名称:coi-services,代码行数:32,代码来源:test_transform_prime.py
示例8: _publish_to_transform
def _publish_to_transform(self, stream_id = '', stream_route = None):
    """Publish one new CTD L0 packet of length 5 on the given stream and log it."""
    publish_granule = self._get_new_ctd_L0_packet(
        stream_definition_id=self.in_stream_def_id_for_L0, length=5)
    StandaloneStreamPublisher(stream_id, stream_route).publish(publish_granule)
    log.debug("Published the following granule: %s", publish_granule)
开发者ID:Bobfrat,项目名称:coi-services,代码行数:7,代码来源:test_ctdbp_chain_L0_L1_L2.py
示例9: test_derived_data_product
def test_derived_data_product(self):
    """Check that a derived data product exposes only its available fields.

    Creates a parent data product and a derived ``TEMPWAT`` product limited
    to ``time`` and ``temp``, activates persistence for both, publishes 20
    records on the parent stream, and verifies the granule retrieved from
    the derived dataset.

    Raises NotFound if the parent data product has no dataset.
    """
    pdict_id = self.dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    ctd_stream_def_id = self.pubsubcli.create_stream_definition(name='ctd parsed', parameter_dictionary_id=pdict_id)
    self.addCleanup(self.pubsubcli.delete_stream_definition, ctd_stream_def_id)

    # Parent (instrument) data product.
    tdom, sdom = time_series_domain()
    parent_dp = DataProduct(name='Instrument DP', temporal_domain=tdom.dump(), spatial_domain=sdom.dump())
    dp_id = self.dpsc_cli.create_data_product(parent_dp, stream_definition_id=ctd_stream_def_id)
    self.addCleanup(self.dpsc_cli.force_delete_data_product, dp_id)

    self.dpsc_cli.activate_data_product_persistence(dp_id)
    self.addCleanup(self.dpsc_cli.suspend_data_product_persistence, dp_id)

    dataset_ids, _ = self.rrclient.find_objects(subject=dp_id, predicate=PRED.hasDataset, id_only=True)
    if not dataset_ids:
        raise NotFound("Data Product %s dataset does not exist" % str(dp_id))
    dataset_id = dataset_ids[0]

    # Make the derived data product
    simple_stream_def_id = self.pubsubcli.create_stream_definition(
        name='TEMPWAT stream def',
        parameter_dictionary_id=pdict_id,
        available_fields=['time', 'temp'])
    tempwat_dp_id = self.dpsc_cli.create_data_product(
        DataProduct(name='TEMPWAT'),
        stream_definition_id=simple_stream_def_id,
        parent_data_product_id=dp_id)
    self.addCleanup(self.dpsc_cli.delete_data_product, tempwat_dp_id)

    self.dpsc_cli.activate_data_product_persistence(tempwat_dp_id)
    self.addCleanup(self.dpsc_cli.suspend_data_product_persistence, tempwat_dp_id)

    # Check that the streams associated with the data product are persisted with
    stream_ids, _ = self.rrclient.find_objects(dp_id, PRED.hasStream, RT.Stream, True)
    for candidate in stream_ids:
        self.assertTrue(self.ingestclient.is_persisted(candidate))

    stream_id = stream_ids[0]
    route = self.pubsubcli.read_stream_route(stream_id=stream_id)

    outgoing = RecordDictionaryTool(stream_definition_id=ctd_stream_def_id)
    outgoing['time'] = np.arange(20)
    outgoing['temp'] = np.arange(20)
    outgoing['pressure'] = np.arange(20)

    publisher = StandaloneStreamPublisher(stream_id, route)

    dataset_modified = Event()

    def cb(*args, **kwargs):
        dataset_modified.set()

    modified_listener = EventSubscriber(
        event_type=OT.DatasetModified, callback=cb, origin=dataset_id, auto_delete=True)
    modified_listener.start()
    self.addCleanup(modified_listener.stop)

    publisher.publish(outgoing.to_granule())
    self.assertTrue(dataset_modified.wait(30))

    # Retrieve through the derived product's dataset and stream definition.
    tempwat_dataset_ids, _ = self.rrclient.find_objects(tempwat_dp_id, PRED.hasDataset, id_only=True)
    tempwat_dataset_id = tempwat_dataset_ids[0]
    granule = self.data_retriever.retrieve(tempwat_dataset_id, delivery_format=simple_stream_def_id)
    derived = RecordDictionaryTool.load_from_granule(granule)

    np.testing.assert_array_equal(derived['time'], np.arange(20))
    self.assertEquals(set(derived.fields), set(['time', 'temp']))
开发者ID:MauriceManning,项目名称:coi-services,代码行数:59,代码来源:test_data_product_management_service_integration.py
示例10: publish_and_wait
def publish_and_wait(self, dataset_id, granule):
    """Publish ``granule`` on the dataset's stream and wait for the monitor event.

    The granule goes to the first stream associated with ``dataset_id``.
    The test fails if the dataset monitor's event is not set within
    10 seconds.
    """
    stream_ids, _ = self.resource_registry.find_objects(dataset_id, PRED.hasStream, id_only=True)
    stream_id = stream_ids[0]
    route = self.pubsub_management.read_stream_route(stream_id)
    publisher = StandaloneStreamPublisher(stream_id, route)

    dataset_monitor = DatasetMonitor(dataset_id)
    # Stop the monitor when the test ends, even if the assertion below
    # fails; otherwise each call leaves a monitor running.
    self.addCleanup(dataset_monitor.stop)

    publisher.publish(granule)
    self.assertTrue(dataset_monitor.event.wait(10))
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:8,代码来源:test_dm_end_2_end.py
示例11: publish_rdt_to_data_product
def publish_rdt_to_data_product(cls,data_product_id, rdt, connection_id='', connection_index=''):
    """Publish ``rdt`` as a granule on the first stream of a data product.

    ``connection_id`` and ``connection_index`` are forwarded unchanged to
    ``rdt.to_granule``.
    """
    registry = Container.instance.resource_registry
    pubsub = PubsubManagementServiceClient()

    found, _ = registry.find_objects(data_product_id, 'hasStream', id_only=True)
    stream_id = found[0]
    stream_route = pubsub.read_stream_route(stream_id)

    granule = rdt.to_granule(connection_id=connection_id, connection_index=connection_index)
    StandaloneStreamPublisher(stream_id, stream_route).publish(granule)
开发者ID:MauriceManning,项目名称:coi-services,代码行数:8,代码来源:parameter_helper.py
示例12: test_granule
def test_granule(self):
    """Exercise granule publication and RecordDictionaryTool round trips.

    Covers three things: publishing a granule to a subscriber whose callback
    is ``self.verify_incoming``, fill-value handling for ``None`` entries in
    ``time``, and a to_granule / load_from_granule round trip using a stream
    definition object.
    """
    pubsub = self.pubsub_management

    pdict_id = self.dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub.create_stream_definition(
        'ctd',
        parameter_dictionary_id=pdict_id,
        stream_configuration={'reference_designator':"GA03FLMA-RI001-13-CTDMOG999"})
    pdict = DatasetManagementService.get_parameter_dictionary_by_name('ctd_parsed_param_dict')
    self.addCleanup(pubsub.delete_stream_definition, stream_def_id)

    stream_id, route = pubsub.create_stream('ctd_stream', 'xp1', stream_definition_id=stream_def_id)
    self.addCleanup(pubsub.delete_stream, stream_id)
    publisher = StandaloneStreamPublisher(stream_id, route)

    subscriber = StandaloneStreamSubscriber('sub', self.verify_incoming)
    subscriber.start()
    self.addCleanup(subscriber.stop)

    subscription_id = pubsub.create_subscription('sub', stream_ids=[stream_id])
    pubsub.activate_subscription(subscription_id)

    # --- Part 1: publish a granule and wait for self.event ---
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    rdt['time'] = np.arange(10)
    rdt['temp'] = np.random.randn(10) * 10 + 30
    rdt['pressure'] = [20] * 10

    self.assertEquals(set(pdict.keys()), set(rdt.fields))
    self.assertEquals(pdict.temporal_parameter_name, rdt.temporal_parameter)
    self.assertEquals(rdt._stream_config['reference_designator'],"GA03FLMA-RI001-13-CTDMOG999")

    # Stored on the test instance before publishing.
    self.rdt = rdt
    self.data_producer_id = 'data_producer'
    self.provider_metadata_update = {1:1}

    publisher.publish(rdt.to_granule(data_producer_id='data_producer', provider_metadata_update={1:1}))
    self.assertTrue(self.event.wait(10))

    pubsub.deactivate_subscription(subscription_id)
    pubsub.delete_subscription(subscription_id)

    # --- Part 2: None handling ---
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    rdt['time'] = np.array([None,None,None])
    self.assertTrue(rdt['time'] is None)

    rdt['time'] = np.array([None, 1, 2])
    self.assertEquals(rdt['time'][0], rdt.fill_value('time'))

    # --- Part 3: round trip built from a stream definition object ---
    stream_def_obj = pubsub.read_stream_definition(stream_def_id)
    rdt = RecordDictionaryTool(stream_definition=stream_def_obj)
    rdt['time'] = np.arange(20)
    rdt['temp'] = np.arange(20)

    reloaded = RecordDictionaryTool.load_from_granule(rdt.to_granule())
    np.testing.assert_array_equal(reloaded['time'], np.arange(20))
    np.testing.assert_array_equal(reloaded['temp'], np.arange(20))
开发者ID:ednad,项目名称:coi-services,代码行数:58,代码来源:test_granule.py
示例13: publish_hifi
def publish_hifi(self,stream_id,stream_route,offset=0):
    """Publish ten ``time``/``temp`` records starting at ``offset * 10``.

    The record dictionary is built from the stream definition read for
    ``stream_id``.
    """
    publisher = StandaloneStreamPublisher(stream_id, stream_route)

    stream_def_id = self.pubsub_management.read_stream_definition(stream_id=stream_id)._id
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)

    # Both fields carry the same shifted range; each gets its own array.
    for field_name in ('time', 'temp'):
        rdt[field_name] = np.arange(10) + (offset * 10)

    publisher.publish(rdt.to_granule())
开发者ID:tomoreilly,项目名称:coi-services,代码行数:9,代码来源:test_dm_end_2_end.py
示例14: _publish_granules
def _publish_granules(self, stream_id=None, stream_route=None, values = None,number=None, length=None):
    """Publish ``number`` granules on the given stream.

    Each granule carries ``values`` as ``input_voltage`` and ``length``
    random numbers in [0, 1000] as ``preferred_timestamp``.
    """
    publisher = StandaloneStreamPublisher(stream_id, stream_route)

    stream_def_id = self.pubsub_management.read_stream_definition(stream_id=stream_id)._id
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)

    for _granule_index in xrange(number):
        rdt['input_voltage'] = values
        # Fresh random timestamps for every granule.
        timestamps = [random.uniform(0,1000) for _sample in xrange(length)]
        rdt['preferred_timestamp'] = numpy.array(timestamps)
        publisher.publish(rdt.to_granule())
开发者ID:kerfoot,项目名称:coi-services,代码行数:13,代码来源:test_transform_prototype.py
示例15: test_transform_prime_no_available_fields
def test_transform_prime_no_available_fields(self):
    """Run TransformPrime with empty available-field lists on both streams.

    Spawns the transform process, binds its subscriber to the input route,
    publishes a 20-record granule and waits up to 4 seconds for the output
    callback, which checks the stream id, the set of output field names and
    that every output value is None.
    """
    available_fields_in = []
    available_fields_out = []
    exchange_pt1 = 'xp1'
    exchange_pt2 = 'xp2'
    (stream_id_in, stream_id_out,
     stream_route_in, stream_route_out,
     stream_def_in_id, stream_def_out_id) = self._setup_streams(
        exchange_pt1, exchange_pt2, available_fields_in, available_fields_out)

    #launch transform
    config = {
        'process': {
            'routes': {(stream_id_in, stream_id_out): None},
            'queue_name': exchange_pt1,
            'publish_streams': {str(stream_id_out): stream_id_out},
            'process_type': 'stream_process',
        }
    }
    pid = self.container.spawn_process(
        'transform_stream',
        'ion.processes.data.transforms.transform_prime',
        'TransformPrime',
        config)

    #create publish
    publisher = StandaloneStreamPublisher(stream_id_in, stream_route_in)
    self.container.proc_manager.procs[pid].subscriber.xn.bind(stream_route_in.routing_key, publisher.xp)

    #data
    rdt_in = RecordDictionaryTool(stream_definition_id=stream_def_in_id)
    dt = 20
    rdt_in['time'] = np.arange(dt)
    rdt_in['lat'] = [40.992469] * dt
    rdt_in['lon'] = [-71.727069] * dt
    for l0_name in ('TEMPWAT_L0', 'CONDWAT_L0', 'PRESWAT_L0'):
        rdt_in[l0_name] = self._get_param_vals(l0_name, slice(None), (dt,))

    msg = rdt_in.to_granule()

    #publish granule to transform and have transform publish it to subsciber
    #validate transformed data
    e = gevent.event.Event()

    def cb(msg, sr, sid):
        self.assertEqual(sid, stream_id_out)
        rdt_out = RecordDictionaryTool.load_from_granule(msg)
        out_names = set(name for name, _value in rdt_out.iteritems())
        self.assertEquals(out_names, set(available_fields_out))
        for name, _value in rdt_out.iteritems():
            self.assertEquals(rdt_out[name], None)
        e.set()

    sub = StandaloneStreamSubscriber('stream_subscriber', cb)
    transform_proc = self.container.proc_manager.procs[pid]
    sub.xn.bind(stream_route_out.routing_key, getattr(transform_proc, stream_id_out).xp)
    self.addCleanup(sub.stop)
    sub.start()

    #publish msg to transform
    publisher.publish(msg)

    #wait to receive msg
    self.assertTrue(e.wait(4))
开发者ID:blazetopher,项目名称:coi-services,代码行数:47,代码来源:test_transform_prime.py
示例16: test_move_activated_subscription
def test_move_activated_subscription(self):
    """Move an activated subscription to another queue and verify delivery.

    Creates and activates a subscription on ``first_queue``, moves it to
    ``second_queue``, checks the ExchangeName association before and after
    the move, then publishes a message and waits up to 2 seconds for the
    subscriber on ``second_queue`` to receive it.
    """
    stream_id, route = self.pubsub_management.create_stream(name="test_stream", exchange_point="test_xp")

    # --------------------------------------------------------------------------------
    # Test moving after activate
    # --------------------------------------------------------------------------------
    subscription_id = self.pubsub_management.create_subscription("first_queue", stream_ids=[stream_id])
    self.pubsub_management.activate_subscription(subscription_id)

    xn_ids, _ = self.resource_registry.find_resources(restype=RT.ExchangeName, name="first_queue", id_only=True)
    subjects, _ = self.resource_registry.find_subjects(
        object=subscription_id, predicate=PRED.hasSubscription, id_only=True
    )
    self.assertEquals(xn_ids[0], subjects[0])

    self.verified = Event()

    def verify(m, r, s):
        self.assertEquals(m, "verified")
        self.verified.set()

    subscriber = StandaloneStreamSubscriber("second_queue", verify)
    subscriber.start()
    # Stop the subscriber when the test ends, whether it passes or fails;
    # without this the started subscriber is never stopped.
    self.addCleanup(subscriber.stop)

    self.pubsub_management.move_subscription(subscription_id, exchange_name="second_queue")

    xn_ids, _ = self.resource_registry.find_resources(restype=RT.ExchangeName, name="second_queue", id_only=True)
    subjects, _ = self.resource_registry.find_subjects(
        object=subscription_id, predicate=PRED.hasSubscription, id_only=True
    )
    # After the move exactly one subject is expected, and it must be the
    # second queue's ExchangeName.
    self.assertEquals(len(subjects), 1)
    self.assertEquals(subjects[0], xn_ids[0])

    publisher = StandaloneStreamPublisher(stream_id, route)
    publisher.publish("verified")

    self.assertTrue(self.verified.wait(2))

    self.pubsub_management.deactivate_subscription(subscription_id)
    self.pubsub_management.delete_subscription(subscription_id)
    self.pubsub_management.delete_stream(stream_id)
开发者ID:blazetopher,项目名称:coi-services,代码行数:44,代码来源:test_pubsub.py
示例17: test_retrieve_and_transform
def test_retrieve_and_transform(self):
    """Retrieve persisted CTD data through the L2 salinity transform.

    Publishes two granules of ten records on a persisted stream, waits for
    them to be ingested, retrieves the dataset through
    ``CTDL2SalinityTransformAlgorithm`` and asserts that no salinity value
    equals 0.
    """
    pubsub = self.pubsub_management
    datasets = self.dataset_management

    # Stream definition for the CTD data
    pdict_id = datasets.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub.create_stream_definition('ctd data', parameter_dictionary_id=pdict_id)
    ctd_stream_id, route = pubsub.create_stream('ctd stream', 'xp1', stream_definition_id=stream_def_id)

    # Stream definition for the salinity data
    salinity_pdict_id = datasets.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    sal_stream_def_id = pubsub.create_stream_definition('sal data', parameter_dictionary_id=salinity_pdict_id)

    ingest_config_id = self.get_ingestion_config()
    dataset_id = self.create_dataset(pdict_id)

    #--------------------------------------------------------------------------------
    # Again with this ridiculous problem
    #--------------------------------------------------------------------------------
    self.get_datastore(dataset_id)
    self.ingestion_management.persist_data_stream(
        stream_id=ctd_stream_id,
        ingestion_configuration_id=ingest_config_id,
        dataset_id=dataset_id)

    outgoing = RecordDictionaryTool(stream_definition_id=stream_def_id)
    outgoing['time'] = np.arange(10)
    outgoing['temp'] = np.random.randn(10) * 10 + 30
    outgoing['conductivity'] = np.random.randn(10) * 2 + 10

    publisher = StandaloneStreamPublisher(ctd_stream_id, route)
    publisher.publish(outgoing.to_granule())

    # Second granule: same values, later timestamps.
    outgoing['time'] = np.arange(10,20)
    publisher.publish(outgoing.to_granule())

    self.wait_until_we_have_enough_granules(dataset_id, 2)

    granule = self.data_retriever.retrieve(
        dataset_id,
        None,
        None,
        'ion.processes.data.transforms.ctd.ctd_L2_salinity',
        'CTDL2SalinityTransformAlgorithm',
        kwargs=dict(params=sal_stream_def_id))
    transformed = RecordDictionaryTool.load_from_granule(granule)
    for salinity in transformed['salinity']:
        self.assertNotEquals(salinity,0)
开发者ID:tomoreilly,项目名称:coi-services,代码行数:44,代码来源:test_dm_end_2_end.py
示例18: write_to_data_product
def write_to_data_product(self,data_product_id):
    """Publish 40 generated records to a data product and compare the round trip.

    Every field except the temporal parameter is filled through
    ``self.fill_values``. After ingestion the dataset is retrieved and
    compared field by field with what was published.

    Returns the list of field names whose retrieved values differ, or None
    (after printing a message) when the parameter dictionary has no
    temporal parameter.
    """
    registry = self.resource_registry

    dataset_ids, _ = registry.find_objects(data_product_id, 'hasDataset', id_only=True)
    dataset_id = dataset_ids.pop()
    stream_ids , _ = registry.find_objects(data_product_id, 'hasStream', id_only=True)
    stream_id = stream_ids.pop()
    stream_def_ids, _ = registry.find_objects(stream_id, 'hasStreamDefinition', id_only=True)
    stream_def_id = stream_def_ids.pop()

    route = self.pubsub_management.read_stream_route(stream_id)

    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)

    time_param = rdt._pdict.temporal_parameter_name
    if time_param is None:
        print('%s has no temporal parameter' % self.resource_registry.read(data_product_id).name)
        return

    rdt[time_param] = np.arange(40)

    # Fill every non-temporal field according to its parameter type.
    for field in rdt.fields:
        if field != rdt._pdict.temporal_parameter_name:
            rdt[field] = self.fill_values(rdt._pdict.get_context(field).param_type,40)

    StandaloneStreamPublisher(stream_id, route).publish(rdt.to_granule())

    self.wait_until_we_have_enough_granules(dataset_id,40)

    rdt_out = RecordDictionaryTool.load_from_granule(self.data_retriever.retrieve(dataset_id))

    mismatched = []
    for field in rdt.fields:
        if np.array_equal(rdt[field], rdt_out[field]):
            continue
        print('%s' % field)
        print('%s != %s' % (rdt[field], rdt_out[field]))
        mismatched.append(field)

    return mismatched
开发者ID:ednad,项目名称:coi-services,代码行数:44,代码来源:test_types.py
示例19: test_event_transform_worker
def test_event_transform_worker(self):
    """Launch a data-product-in / event-out data process and verify its event.

    Tests that a data process (type: data-product-in / event-out) can be
    defined and launched, then publishes one record on the input data
    product's stream and waits ``self.wait_time`` for ``self.event_verified``.
    """
    self.data_process_objs = []
    self._output_stream_ids = []
    self.event_verified = Event()

    pubsub = self.pubsub_client

    # verify that event fields are correctly populated
    self.parameter_dict_id = self.dataset_management_client.read_parameter_dictionary_by_name(
        name='ctd_parsed_param_dict', id_only=True)

    # create the StreamDefinition
    self.stream_def_id = pubsub.create_stream_definition(
        name='stream_def', parameter_dictionary_id=self.parameter_dict_id)
    self.addCleanup(pubsub.delete_stream_definition, self.stream_def_id)

    # create the DataProduct
    input_dp_obj = IonObject(
        RT.DataProduct,
        name='input_data_product',
        description='input test stream',
        temporal_domain=self.time_dom.dump(),
        spatial_domain=self.spatial_dom.dump())
    self.input_dp_id = self.dataproductclient.create_data_product(
        data_product=input_dp_obj, stream_definition_id=self.stream_def_id)

    # retrieve the Stream for this data product
    stream_ids, _assoc_ids = self.rrclient.find_objects(self.input_dp_id, PRED.hasStream, RT.Stream, True)
    self.stream_id = stream_ids[0]

    # create the DPD and two DPs
    self.event_data_process_id = self.create_event_data_processes()

    # retrieve subscription from data process
    subscription_objs, _ = self.rrclient.find_objects(
        subject=self.event_data_process_id,
        predicate=PRED.hasSubscription,
        object_type=RT.Subscription,
        id_only=False)
    log.debug('test_event_transform_worker subscription_obj: %s', subscription_objs[0])

    # create a queue to catch the published granules
    self.subscription_id = pubsub.create_subscription(
        name='parsed_subscription',
        stream_ids=[self.stream_id],
        exchange_name=subscription_objs[0].exchange_name)
    self.addCleanup(pubsub.delete_subscription, self.subscription_id)

    pubsub.activate_subscription(self.subscription_id)
    self.addCleanup(pubsub.deactivate_subscription, self.subscription_id)

    stream_route = pubsub.read_stream_route(self.stream_id)
    self.publisher = StandaloneStreamPublisher(stream_id=self.stream_id, stream_route=stream_route)

    self.start_event_transform_listener()

    self.data_modified = Event()

    rdt = RecordDictionaryTool(stream_definition_id=self.stream_def_id)
    # time should always come first
    for field_name, field_values in (
            ('time', [0]),
            ('conductivity', [1]),
            ('pressure', [2]),
            ('salinity', [8])):
        rdt[field_name] = field_values

    self.publisher.publish(rdt.to_granule())

    self.assertTrue(self.event_verified.wait(self.wait_time))
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:55,代码来源:test_transform_worker.py
示例20: test_retrieve_and_transform
def test_retrieve_and_transform(self):
    """Retrieve ingested CTD data through the L2 salinity transform.

    Makes a simple dataset, starts ingestion, publishes two granules of ten
    records, retrieves the dataset through
    ``CTDL2SalinityTransformAlgorithm`` and asserts that no salinity value
    equals 0. Finally records the stream in ``self.streams`` and stops
    ingestion.
    """
    # Make a simple dataset and start ingestion, pretty standard stuff.
    ctd_stream_id, route, stream_def_id, dataset_id = self.make_simple_dataset()
    self.start_ingestion(ctd_stream_id, dataset_id)

    # Stream definition for the salinity data
    salinity_pdict_id = self.dataset_management.read_parameter_dictionary_by_name(
        "ctd_parsed_param_dict", id_only=True)
    sal_stream_def_id = self.pubsub_management.create_stream_definition(
        "sal data", parameter_dictionary_id=salinity_pdict_id)

    outgoing = RecordDictionaryTool(stream_definition_id=stream_def_id)
    outgoing["time"] = np.arange(10)
    outgoing["temp"] = np.random.randn(10) * 10 + 30
    outgoing["conductivity"] = np.random.randn(10) * 2 + 10
    outgoing["pressure"] = np.random.randn(10) * 1 + 12

    publisher = StandaloneStreamPublisher(ctd_stream_id, route)
    publisher.publish(outgoing.to_granule())

    # Second granule: same values, later timestamps.
    outgoing["time"] = np.arange(10, 20)
    publisher.publish(outgoing.to_granule())

    self.wait_until_we_have_enough_granules(dataset_id, 20)

    transform_module = "ion.processes.data.transforms.ctd.ctd_L2_salinity"
    transform_class = "CTDL2SalinityTransformAlgorithm"
    granule = self.data_retriever.retrieve(
        dataset_id,
        None,
        None,
        transform_module,
        transform_class,
        kwargs=dict(params=sal_stream_def_id),
    )
    transformed = RecordDictionaryTool.load_from_granule(granule)
    for salinity in transformed["salinity"]:
        self.assertNotEquals(salinity, 0)

    self.streams.append(ctd_stream_id)
    self.stop_ingestion(ctd_stream_id)
开发者ID:blazetopher,项目名称:coi-services,代码行数:41,代码来源:test_dm_end_2_end.py
注：本文中的pyon.ion.stream.StandaloneStreamPublisher类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论