This article collects typical usage examples of the Python class pyon.net.endpoint.Subscriber. If you are wondering what the Subscriber class is for, how to use it, or where to find working examples, the curated snippets below may help.
The following sections present 20 code examples of the Subscriber class, sorted by popularity by default. You can upvote the examples you find useful; your feedback helps the system recommend better Python code samples.
Example 1: __init__
def __init__(self, xp_name=None, event_type=None, origin=None, queue_name=None, callback=None,
             sub_type=None, origin_type=None, pattern=None, auto_delete=None, *args, **kwargs):
    """
    Initializer.

    If the queue_name is specified here, the sysname is prefixed automatically to it. This is because
    named queues are not namespaced to their exchanges, so two different systems on the same broker
    can cross-pollute messages if a named queue is used.

    Note: an EventSubscriber needs to be closed to free broker resources
    """
    self._cbthread = None

    # sets self._ev_recv_name, self.binding
    BaseEventSubscriberMixin.__init__(self, xp_name=xp_name, event_type=event_type, origin=origin,
                                      queue_name=queue_name, sub_type=sub_type, origin_type=origin_type,
                                      pattern=pattern, auto_delete=auto_delete)

    log.debug("EventPublisher events pattern %s", self.binding)

    from_name = self._get_from_name()
    binding = self._get_binding()

    Subscriber.__init__(self, from_name=from_name, binding=binding, callback=callback,
                        auto_delete=self._auto_delete, **kwargs)
Author: edwardhunter | Project: scioncc | Lines: 25 | Source file: event.py
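For orientation, the initializer above ultimately hands a from_name, a binding, and a callback to the plain Subscriber constructor. Below is a minimal sketch of that underlying pattern, using only calls that appear elsewhere on this page; the exchange/queue names and the binding string are hypothetical placeholders standing in for what BaseEventSubscriberMixin would compute.
from pyon.net.endpoint import Subscriber

def on_event(msg, headers):
    # placeholder handler - a real handler would inspect the event body
    print "received event: %s" % (msg,)

# from_name may be given as an (exchange, queue) pair, as Example 12 below does;
# both names and the binding pattern here are made-up stand-ins.
sub = Subscriber(from_name=("mysys.pyon.events", "mysys.my_event_queue"),
                 binding="SomeEventType.#",
                 callback=on_event)
sub.initialize()   # declare the queue/binding on the broker
# ... consume via listen() or get_all_msgs(), then free broker resources:
sub.close()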
Example 2: get_realtime_visualization_data
def get_realtime_visualization_data(self, query_token=''):
    """This operation returns a block of visualization data for displaying data product in real time. This operation requires a
    user specific token which was provided from a previous request to the init_realtime_visualization operation.

    @param query_token    str
    @retval datatable    str
    @throws NotFound    Throws if specified query_token or its visualization product does not exist
    """
    log.debug("get_realtime_visualization_data Vis worker: %s", self.id)

    ret_val = []
    if not query_token:
        raise BadRequest("The query_token parameter is missing")

    # Taking advantage of idempotency
    xq = self.container.ex_manager.create_xn_queue(query_token)

    subscriber = Subscriber(from_name=xq)
    subscriber.initialize()

    msgs = subscriber.get_all_msgs(timeout=2)
    for x in range(len(msgs)):
        msgs[x].ack()

    # Different messages should get processed differently. Ret val will be decided by the viz product type
    ret_val = self._process_visualization_message(msgs)

    # TODO - replace as need be to return valid GDT data
    # return {'viz_data': ret_val}

    return ret_val
Author: shenrie | Project: coi-services | Lines: 34 | Source file: visualization_service.py
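The method above follows a simple poll-and-ack pattern. Reduced to its essentials, and assuming a running pyon container is available as container (the queue name is a placeholder for a token issued by init_realtime_visualization), it looks like this:
# Idempotent: returns the existing queue if it was already declared
xq = container.ex_manager.create_xn_queue("some_query_token")

subscriber = Subscriber(from_name=xq)
subscriber.initialize()
try:
    msgs = subscriber.get_all_msgs(timeout=2)   # drain whatever is currently queued
    for msg in msgs:
        msg.ack()                               # acknowledge so the broker can discard it
finally:
    subscriber.close()                          # free broker resources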
Example 3: test_create_endpoint
def test_create_endpoint(self):
    def mycb(msg, headers):
        return "test"

    sub = Subscriber(node=self._node, from_name="testsub", callback=mycb)
    e = sub.create_endpoint()
    self.assertEquals(e._callback, mycb)
Author: daf | Project: pyon | Lines: 8 | Source file: test_endpoint.py
Example 4: __init__
def __init__(self, callback=None, pattern='#', *args, **kwargs):
    """
    Note: a ConversationSubscriber needs to be closed to free broker resources
    """
    self._cbthread = None
    self.binding = pattern

    log.debug("ConversationSubscriber pattern %s", self.binding)

    Subscriber.__init__(self, binding=self.binding, callback=callback, **kwargs)
Author: ateranishi | Project: pyon | Lines: 10 | Source file: conversation_log.py
Example 5: get_realtime_visualization_data
def get_realtime_visualization_data(self, query_token='', callback='', tqx=""):
    """This operation returns a block of visualization data for displaying data product in real time. This operation requires a
    user specific token which was provided from a previous request to the init_realtime_visualization operation.

    @param query_token    str
    @retval datatable    str
    @throws NotFound    Throws if specified query_token or its visualization product does not exist
    """

    print " >>>>>>>>>>>>>>> QUERY TOKEN : ", query_token
    print " >>>>>>>>>>>>>>> callback : ", callback
    print ">>>>>>>>>>>>>>> TQX : ", tqx

    reqId = 0
    # If a reqId was passed in tqx, extract it
    if tqx:
        tqx_param_list = tqx.split(";")
        for param in tqx_param_list:
            key, value = param.split(":")
            if key == 'reqId':
                reqId = value

    ret_val = []
    if not query_token:
        raise BadRequest("The query_token parameter is missing")

    #try:
    # Taking advantage of idempotency
    xq = self.container.ex_manager.create_xn_queue(query_token)

    subscriber = Subscriber(from_name=xq)
    subscriber.initialize()

    msgs = subscriber.get_all_msgs(timeout=2)
    for x in range(len(msgs)):
        msgs[x].ack()

    # Different messages should get processed differently. Ret val will be decided by the viz product type
    ret_val = self._process_visualization_message(msgs, callback, reqId)

    #except Exception, e:
    #    raise e
    #finally:
    #    subscriber.close()

    # TODO - replace as need be to return valid GDT data
    # return {'viz_data': ret_val}

    return ret_val
Author: blazetopher | Project: coi-services | Lines: 52 | Source file: visualization_service.py
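The tqx argument in this variant follows the Google Visualization API convention of semicolon-separated key:value pairs. The extraction loop above can be exercised on its own; the sample tqx string below is made up for illustration:
tqx = "reqId:7;out:json;responseHandler:cb"   # hypothetical tqx value
reqId = 0
if tqx:
    for param in tqx.split(";"):
        key, value = param.split(":")
        if key == 'reqId':
            reqId = value
print reqId   # -> '7' (it stays a string, exactly as in the service code)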
Example 6: test_subscribe
def test_subscribe(self):
    # Test Subscriber.
    # The goal of this test is to get messages routed to the callback mock.
    cbmock = Mock()
    sub = Subscriber(node=self._node, from_name="testsub", callback=cbmock)

    # tell the subscriber to create this as the main listening channel
    listen_channel_mock = self._setup_mock_channel(ch_type=SubscriberChannel, value="subbed", error_message="")
    sub.node.channel.return_value = listen_channel_mock

    # tell our channel to return itself when accepted
    listen_channel_mock.accept.return_value = listen_channel_mock

    # we're ready! call listen
    sub.listen()

    # make sure we got our message
    cbmock.assert_called_once_with('subbed', {'conv-id': sentinel.conv_id, 'status_code': 200, 'error_message': '', 'op': None})
Author: daf | Project: pyon | Lines: 18 | Source file: test_endpoint.py
Example 7: _create_channel
def _create_channel(self, **kwargs):
    """
    Override to set the channel's queue_auto_delete property.
    """
    ch = Subscriber._create_channel(self, **kwargs)
    if self._auto_delete is not None:
        ch.queue_auto_delete = self._auto_delete
    return ch
Author: caseybryant | Project: pyon | Lines: 9 | Source file: event.py
Example 8: test_subscribe
def test_subscribe(self):
    """
    Test Subscriber.
    The goal of this test is to get messages routed to the callback mock.
    """
    cbmock = Mock()
    sub = Subscriber(node=self._node, from_name="testsub", callback=cbmock)

    # tell the subscriber to create this as the main listening channel
    listen_channel_mock = self._setup_mock_channel(ch_type=SubscriberChannel, value="subbed", error_message="")
    sub.node.channel.return_value = listen_channel_mock

    # tell our channel to return itself when accepted
    listen_channel_mock.accept.return_value.__enter__.return_value = listen_channel_mock

    # we're ready! call listen
    sub.listen()

    # make sure we got our message
    cbmock.assert_called_once_with("subbed", {"status_code": 200, "error_message": "", "op": None})
Author: ooici-dm | Project: pyon | Lines: 20 | Source file: test_endpoint.py
Example 9: _build_header
def _build_header(self, raw_msg):
    """
    Override to direct the calls in _build_header - first the Subscriber, then the Process mixin.
    """
    header1 = Subscriber._build_header(self, raw_msg)
    header2 = ProcessEndpointUnitMixin._build_header(self, raw_msg)
    header1.update(header2)
    return header1
Author: swarbhanu | Project: pyon | Lines: 11 | Source file: endpoint.py
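Because dict.update overwrites keys that already exist, the Process mixin's header values win whenever the two headers overlap. A tiny self-contained illustration of that precedence (the header keys are invented):
header1 = {'op': None, 'sender': 'subscriber-endpoint'}        # as if from Subscriber._build_header
header2 = {'sender': 'process-endpoint', 'sender-name': 'p1'}  # as if from ProcessEndpointUnitMixin._build_header
header1.update(header2)
print header1
# {'op': None, 'sender': 'process-endpoint', 'sender-name': 'p1'}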
Example 10: on_start
def on_start(self):
    TransformDataProcess.on_start(self)

    # set up subscriber to *
    self._bt_sub = Subscriber(callback=lambda m, h: self.call_process(m),
                              from_name=NameTrio(get_sys_name(), 'bench_queue', '*'))

    # spawn listener
    self._sub_gl = spawn(self._bt_sub.listen)

    # set up publisher to anything!
    self._bt_pub = Publisher(to_name=NameTrio(get_sys_name(), str(uuid.uuid4())[0:6]))
Author: swarbhanu | Project: pyon | Lines: 12 | Source file: transform.py
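The same wiring, reduced to a standalone sketch: subscribe with a wildcard binding, run the blocking listen() in a greenlet, and publish to a throwaway exchange point. It assumes a running pyon container/broker; gevent.spawn is used here in place of pyon's spawn helper, the NameTrio and Publisher imports are taken from Example 15 below, and the import path for get_sys_name is an assumption.
import uuid
import gevent
from pyon.net.endpoint import Subscriber, Publisher
from pyon.net.transport import NameTrio
from pyon.core.bootstrap import get_sys_name   # assumed import path

def handle(msg, headers):
    print "got %s" % (msg,)

# subscribe to everything routed to 'bench_queue' (binding '*')
sub = Subscriber(callback=handle,
                 from_name=NameTrio(get_sys_name(), 'bench_queue', '*'))
sub_gl = gevent.spawn(sub.listen)   # listen() blocks, so run it in a greenlet

# publish to a randomly named exchange point
pub = Publisher(to_name=NameTrio(get_sys_name(), str(uuid.uuid4())[0:6]))
pub.publish("hello")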
Example 11: __init__
def __init__(self, xp_name=None, event_name=None, origin=None, queue_name=None, callback=None, *args, **kwargs):
    """
    Initializer.

    If the queue_name is specified here, the sysname is prefixed automatically to it. This is because
    named queues are not namespaced to their exchanges, so two different systems on the same broker
    can cross-pollute messages if a named queue is used.
    """
    self._event_name = event_name or self.event_name

    xp_name = xp_name or get_events_exchange_point()
    binding = self._topic(origin)

    # prefix the queue_name, if specified, with the sysname
    # this is because queue names transcend xp boundaries (see R1 OOIION-477)
    if queue_name is not None:
        if not queue_name.startswith(bootstrap.sys_name):
            queue_name = "%s.%s" % (bootstrap.sys_name, queue_name)
            log.warn("queue_name specified, prepending sys_name to it: %s" % queue_name)

    name = (xp_name, queue_name)

    Subscriber.__init__(self, name=name, binding=binding, callback=callback, **kwargs)
Author: blazetopher | Project: pyon | Lines: 23 | Source file: event.py
Example 12: __init__
def __init__(self, xp_name=None, event_type=None, origin=None, queue_name=None, callback=None,
             sub_type=None, origin_type=None, *args, **kwargs):
    """
    Initializer.

    If the queue_name is specified here, the sysname is prefixed automatically to it. This is because
    named queues are not namespaced to their exchanges, so two different systems on the same broker
    can cross-pollute messages if a named queue is used.

    Note: an EventSubscriber needs to be closed to free broker resources
    """
    self.callback = callback
    self._cbthread = None
    self.event_type = event_type
    self.sub_type = sub_type
    self.origin_type = origin_type
    self.origin = origin

    xp_name = xp_name or get_events_exchange_point()
    binding = self._topic(event_type, origin, sub_type, origin_type)
    self.binding = binding

    # TODO: Provide a case where we can have multiple bindings (e.g. different event_types)

    # prefix the queue_name, if specified, with the sysname
    # this is because queue names transcend xp boundaries (see R1 OOIION-477)
    if queue_name is not None:
        if not queue_name.startswith(bootstrap.get_sys_name()):
            queue_name = "%s.%s" % (bootstrap.get_sys_name(), queue_name)
            log.warn("queue_name specified, prepending sys_name to it: %s" % queue_name)

    name = (xp_name, queue_name)

    log.debug("EventPublisher events pattern %s", binding)

    Subscriber.__init__(self, from_name=name, binding=binding, callback=self._callback, **kwargs)
Author: oldpatricka | Project: pyon | Lines: 37 | Source file: event.py
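The queue-name handling in both event-subscriber initializers is a plain string check: if a caller supplies a queue name that is not already prefixed with the system name, the system name is prepended so the queue cannot collide with a queue of the same name from another system on the broker. As standalone Python (the sys_name value is made up):
sys_name = "mysys"                  # stands in for bootstrap.get_sys_name()
queue_name = "my_event_queue"
if not queue_name.startswith(sys_name):
    queue_name = "%s.%s" % (sys_name, queue_name)
print queue_name                    # -> 'mysys.my_event_queue'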
Example 13: get_realtime_visualization_data
def get_realtime_visualization_data(self, query_token=''):
    """This operation returns a block of visualization data for displaying data product in real time. This operation requires a
    user specific token which was provided from a previous request to the init_realtime_visualization operation.

    @param query_token    str
    @retval datatable    str
    @throws NotFound    Throws if specified query_token or its visualization product does not exist
    """
    log.debug("get_realtime_visualization_data Vis worker: %s", self.id)

    ret_val = []
    if not query_token:
        raise BadRequest("The query_token parameter is missing")

    try:
        # Taking advantage of idempotency
        queue_name = '-'.join([USER_VISUALIZATION_QUEUE, query_token])
        xq = self.container.ex_manager.create_xn_queue(queue_name)

        subscriber = Subscriber(from_name=xq)
        subscriber.initialize()
    except:
        # Close the subscriber if it exists
        if subscriber:
            subscriber.close()
        raise BadRequest("Could not subscribe to the real-time queue")

    msgs = subscriber.get_all_msgs(timeout=2)
    for x in range(len(msgs)):
        msgs[x].ack()
    subscriber.close()

    # Different messages should get processed differently. Ret val will be decided by the viz product type
    ret_val = self._process_visualization_message(msgs)

    return ret_val
Author: ednad | Project: coi-services | Lines: 39 | Source file: visualization_service.py
Example 14: get_realtime_visualization_data
def get_realtime_visualization_data(self, query_token=''):
    """This operation returns a block of visualization data for displaying data product in real time. This operation requires a
    user specific token which was provided from a previous request to the init_realtime_visualization operation.

    @param query_token    str
    @retval datatable    str
    @throws NotFound    Throws if specified query_token or its visualization product does not exist
    """

    if not query_token:
        raise BadRequest("The query_token parameter is missing")

    try:
        # Taking advantage of idempotency
        xq = self.container.ex_manager.create_xn_queue(query_token)

        subscriber = Subscriber(from_name=xq)
        subscriber.initialize()

        msg_count, _ = subscriber.get_stats()
        log.info('Messages in user queue 1: %s ' % msg_count)

        ret_val = []
        msgs = subscriber.get_all_msgs(timeout=2)
        for x in range(len(msgs)):
            msgs[x].ack()

        # Different messages should get processed differently. Ret val will be decided by the viz product type
        ret_val = self._process_visualization_message(msgs)

        msg_count, _ = subscriber.get_stats()
        log.info('Messages in user queue 2: %s ' % msg_count)

    except Exception, e:
        raise e
Author: pombredanne | Project: coi-services | Lines: 36 | Source file: visualization_service.py
Example 15: TransformBenchTesting
class TransformBenchTesting(TransformDataProcess):
    """
    Easiest way to run:

    from pyon.util.containers import DotDict
    tbt=cc.proc_manager._create_service_instance('55', 'tbt', 'pyon.ion.transform', 'TransformBenchTesting', DotDict({'process':{'name':'tbt', 'transform_id':'55'}}))
    tbt.init()
    tbt.start()
    """
    transform_number = 0
    message_length = 0

    def __init__(self):
        super(TransformBenchTesting, self).__init__()
        self.count = 0
        TransformBenchTesting.transform_number += 1

    def perf(self):
        with open('/tmp/pyon_performance.dat', 'a') as f:
            then = time.time()
            ocount = self.count
            while True:
                gevent.sleep(2.)
                now = time.time()
                count = self.count
                delta_t = now - then
                delta_c = count - ocount

                f.write('%s|%s\t%s\t%s\t%3.3f\n' % (get_sys_name(), time.strftime("%H:%M:%S", time.gmtime()), TransformBenchTesting.message_length, TransformBenchTesting.transform_number, float(delta_c) / delta_t))
                then = now
                ocount = count
                f.flush()

    @staticmethod
    def launch_benchmark(transform_number=1, primer=1, message_length=4):
        import gevent
        from gevent.greenlet import Greenlet
        from pyon.util.containers import DotDict
        from pyon.net.transport import NameTrio
        from pyon.net.endpoint import Publisher
        import uuid

        num = transform_number
        msg_len = message_length
        transforms = list()
        pids = 1
        TransformBenchTesting.message_length = message_length
        cc = Container.instance
        pub = Publisher(to_name=NameTrio(get_sys_name(), str(uuid.uuid4())[0:6]))
        for i in xrange(num):
            tbt = cc.proc_manager._create_service_instance(str(pids), 'tbt', 'prototype.transforms.linear', 'TransformInPlace', DotDict({'process': {'name': 'tbt%d' % pids, 'transform_id': pids}}))
            tbt.init()
            tbt.start()
            gevent.sleep(0.2)
            for i in xrange(primer):
                pub.publish(list(xrange(msg_len)))

            g = Greenlet(tbt.perf)
            g.start()
            transforms.append(tbt)
            pids += 1

    def on_start(self):
        TransformDataProcess.on_start(self)

        # set up subscriber to *
        self._bt_sub = Subscriber(callback=lambda m, h: self.call_process(m),
                                  from_name=NameTrio(get_sys_name(), 'bench_queue', '*'))

        # spawn listener
        self._sub_gl = spawn(self._bt_sub.listen)

        # set up publisher to anything!
        self._bt_pub = Publisher(to_name=NameTrio(get_sys_name(), str(uuid.uuid4())[0:6]))

    def publish(self, msg):
        self._bt_pub.publish(msg)
        self.count += 1

    def _stop_listener(self):
        self._bt_sub.close()
        self._sub_gl.join(timeout=2)
        self._sub_gl.kill()

    def on_stop(self):
        TransformDataProcess.on_stop(self)
        self._stop_listener()

    def on_quit(self):
        TransformDataProcess.on_quit(self)
        self._stop_listener()
Author: swarbhanu | Project: pyon | Lines: 92 | Source file: transform.py
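The perf() loop above is just a rate calculation: messages handled between two samples divided by the elapsed wall-clock time. Stripped of the file handling and the greenlet, the arithmetic looks like this (the message counts are fabricated for illustration):
import time

count = 0                       # would be incremented by publish() on each message
then, ocount = time.time(), count

time.sleep(2.0)                 # the original sleeps with gevent.sleep inside a greenlet
count += 1000                   # pretend 1000 messages were published in the interval

now = time.time()
rate = float(count - ocount) / (now - then)
print "%3.3f msgs/sec" % rate   # roughly 500 msgs/sec for this fake sample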
Example 16: create_endpoint
def create_endpoint(self, **kwargs):
    newkwargs = kwargs.copy()
    newkwargs['process'] = self._process
    newkwargs['routing_call'] = self._routing_call
    return Subscriber.create_endpoint(self, **newkwargs)
Author: j2project | Project: pyon | Lines: 5 | Source file: endpoint.py
Example 17: __init__
def __init__(self, process=None, routing_call=None, **kwargs):
    assert process
    self._process = process
    self._routing_call = routing_call

    Subscriber.__init__(self, **kwargs)
Author: j2project | Project: pyon | Lines: 5 | Source file: endpoint.py
Example 18: test_consume_one_message_at_a_time
def test_consume_one_message_at_a_time(self):
    # see also pyon.net.test.test_channel:TestChannelInt.test_consume_one_message_at_a_time

    pub3 = Publisher(to_name=(self.container.ex_manager.default_xs.exchange, 'routed.3'))
    pub5 = Publisher(to_name=(self.container.ex_manager.default_xs.exchange, 'routed.5'))

    #
    # SETUP COMPLETE, BEGIN TESTING OF EXCHANGE OBJECTS
    #

    xq = self.container.ex_manager.create_xn_queue('random_queue')
    self.addCleanup(xq.delete)

    # recv'd messages from the subscriber
    self.recv_queue = Queue()

    def cb(m, h):
        raise StandardError("Subscriber callback never gets called back!")

    sub = Subscriber(from_name=xq, callback=cb)
    sub.initialize()

    # publish 10 messages - we're not bound yet, so they'll just disappear
    for x in xrange(10):
        pub3.publish("3,%s" % str(x))

    # allow time for routing
    time.sleep(2)

    # no messages yet
    self.assertRaises(Timeout, sub.get_one_msg, timeout=0)

    # now, we'll bind the xq
    xq.bind('routed.3')

    # even tho we are consuming, there are no messages - the previously published ones all disappeared
    self.assertRaises(Timeout, sub.get_one_msg, timeout=0)

    # publish those messages again
    for x in xrange(10):
        pub3.publish("3,%s" % str(x))

    # allow time for routing
    time.sleep(2)

    # NOW we have messages!
    for x in xrange(10):
        mo = sub.get_one_msg(timeout=10)
        self.assertEquals(mo.body, "3,%s" % str(x))
        mo.ack()

    # we've cleared it all
    self.assertRaises(Timeout, sub.get_one_msg, timeout=0)

    # bind a wildcard and publish on both
    xq.bind('routed.*')

    for x in xrange(10):
        time.sleep(0.3)
        pub3.publish("3,%s" % str(x))
        time.sleep(0.3)
        pub5.publish("5,%s" % str(x))

    # allow time for routing
    time.sleep(2)

    # should get all 20, interleaved
    for x in xrange(10):
        mo = sub.get_one_msg(timeout=1)
        self.assertEquals(mo.body, "3,%s" % str(x))
        mo.ack()

        mo = sub.get_one_msg(timeout=1)
        self.assertEquals(mo.body, "5,%s" % str(x))
        mo.ack()

    # add 5 binding, remove all other bindings
    xq.bind('routed.5')
    xq.unbind('routed.3')
    xq.unbind('routed.*')

    # try publishing to 3, shouldn't arrive anymore
    pub3.publish("3")
    self.assertRaises(Timeout, sub.get_one_msg, timeout=0)

    # let's turn off the consumer and let things build up a bit
    sub._chan.stop_consume()
    for x in xrange(10):
        pub5.publish("5,%s" % str(x))

    # allow time for routing
    time.sleep(2)

    # 10 messages in the queue, no consumers
    self.assertTupleEqual((10, 0), sub._chan.get_stats())

    # drain queue
    sub._chan.start_consume()
    # ......... (remainder of this method omitted in the original listing) .........
Author: j2project | Project: pyon | Lines: 101 | Source file: test_exchange.py
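The heart of this test is the bind-then-consume cycle on an exchange-name queue. A condensed sketch of one round of that cycle, assuming a running container (the queue name and routing keys match the test above):
xq = container.ex_manager.create_xn_queue('random_queue')
sub = Subscriber(from_name=xq, callback=lambda m, h: None)   # callback unused; we poll instead
sub.initialize()

xq.bind('routed.3')     # start receiving messages routed with key 'routed.3'
pub = Publisher(to_name=(container.ex_manager.default_xs.exchange, 'routed.3'))
pub.publish("3,0")

mo = sub.get_one_msg(timeout=10)   # blocks until a single message arrives
print mo.body                      # -> '3,0'
mo.ack()

xq.unbind('routed.3')   # stop receiving on that key; the queue itself remains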
Example 19: test_multiple_visualization_queue
def test_multiple_visualization_queue(self):
    # set up a workflow with the salinity transform and the doubler. We will direct the original stream and the doubled stream to queues
    # and test to make sure the subscription to the queues is working correctly

    assertions = self.assertTrue

    # Build the workflow definition
    workflow_def_obj = IonObject(RT.WorkflowDefinition, name='Viz_Test_Workflow', description='A workflow to test collection of multiple data products in queues')

    workflow_data_product_name = 'TEST-Workflow_Output_Product'  # Set a specific output product name

    #-------------------------------------------------------------------------------------------------------------------------
    # Add a transformation process definition for salinity
    #-------------------------------------------------------------------------------------------------------------------------

    ctd_L2_salinity_dprocdef_id = self.create_salinity_data_process_definition()
    workflow_step_obj = IonObject('DataProcessWorkflowStep', data_process_definition_id=ctd_L2_salinity_dprocdef_id, persist_process_output_data=False)  # Don't persist the intermediate data product
    configuration = {'stream_name': 'salinity'}
    workflow_step_obj.configuration = configuration
    workflow_def_obj.workflow_steps.append(workflow_step_obj)

    # Create it in the resource registry
    workflow_def_id = self.workflowclient.create_workflow_definition(workflow_def_obj)

    aids = self.rrclient.find_associations(workflow_def_id, PRED.hasDataProcessDefinition)
    assertions(len(aids) == 1)

    # The list of data product streams to monitor
    data_product_stream_ids = list()

    # Create the input data product
    ctd_stream_id, ctd_parsed_data_product_id = self.create_ctd_input_stream_and_data_product()
    data_product_stream_ids.append(ctd_stream_id)

    # Create and start the workflow
    workflow_id, workflow_product_id = self.workflowclient.create_data_process_workflow(workflow_def_id, ctd_parsed_data_product_id, timeout=30)

    workflow_output_ids, _ = self.rrclient.find_subjects(RT.Workflow, PRED.hasOutputProduct, workflow_product_id, True)
    assertions(len(workflow_output_ids) == 1)

    # Walk the associations to find the appropriate output data streams to validate the messages
    workflow_dp_ids, _ = self.rrclient.find_objects(workflow_id, PRED.hasDataProduct, RT.DataProduct, True)
    assertions(len(workflow_dp_ids) == 1)

    for dp_id in workflow_dp_ids:
        stream_ids, _ = self.rrclient.find_objects(dp_id, PRED.hasStream, None, True)
        assertions(len(stream_ids) == 1)
        data_product_stream_ids.append(stream_ids[0])

    # Now for each of the data_product_stream_ids create a queue and pipe their data to the queue
    user_queue_name1 = USER_VISUALIZATION_QUEUE + '1'
    user_queue_name2 = USER_VISUALIZATION_QUEUE + '2'

    # use idempotency to create queues
    xq1 = self.container.ex_manager.create_xn_queue(user_queue_name1)
    self.addCleanup(xq1.delete)
    xq2 = self.container.ex_manager.create_xn_queue(user_queue_name2)
    self.addCleanup(xq2.delete)

    xq1.purge()
    xq2.purge()

    # the create_subscription call takes a list of stream_ids so create temp ones
    dp_stream_id1 = list()
    dp_stream_id1.append(data_product_stream_ids[0])
    dp_stream_id2 = list()
    dp_stream_id2.append(data_product_stream_ids[1])

    salinity_subscription_id1 = self.pubsubclient.create_subscription(stream_ids=dp_stream_id1,
                                                                      exchange_name=user_queue_name1, name="user visualization queue1")

    salinity_subscription_id2 = self.pubsubclient.create_subscription(stream_ids=dp_stream_id2,
                                                                      exchange_name=user_queue_name2, name="user visualization queue2")

    # Create subscribers for the output of the queue
    subscriber1 = Subscriber(from_name=xq1)
    subscriber1.initialize()

    subscriber2 = Subscriber(from_name=xq2)
    subscriber2.initialize()

    # after the queue has been created it is safe to activate the subscription
    self.pubsubclient.activate_subscription(subscription_id=salinity_subscription_id1)
    self.pubsubclient.activate_subscription(subscription_id=salinity_subscription_id2)

    # Start input stream and wait for some time
    ctd_sim_pid = self.start_simple_input_stream_process(ctd_stream_id)
    gevent.sleep(5.0)  # Send some messages - don't care how many

    msg_count, _ = xq1.get_stats()
    log.info('Messages in user queue 1: %s ' % msg_count)
    msg_count, _ = xq2.get_stats()
    log.info('Messages in user queue 2: %s ' % msg_count)

    msgs1 = subscriber1.get_all_msgs(timeout=2)
    msgs2 = subscriber2.get_all_msgs(timeout=2)

    for x in range(min(len(msgs1), len(msgs2))):
        msgs1[x].ack()
        msgs2[x].ack()
    # ......... (remainder of this method omitted in the original listing) .........
Author: Bobfrat | Project: coi-services | Lines: 101 | Source file: test_visualization_service.py
Example 20: test_visualization_queue
def test_visualization_queue(self):
    # The list of data product streams to monitor
    data_product_stream_ids = list()

    # Create the input data product
    ctd_stream_id, ctd_parsed_data_product_id = self.create_ctd_input_stream_and_data_product()
    data_product_stream_ids.append(ctd_stream_id)

    user_queue_name = USER_VISUALIZATION_QUEUE

    xq = self.container.ex_manager.create_xn_queue(user_queue_name)

    salinity_subscription_id = self.pubsubclient.create_subscription(
        stream_ids=data_product_stream_ids,
        exchange_name=user_queue_name,
        name="user visualization queue"
    )

    subscriber = Subscriber(from_name=xq)
    subscriber.initialize()

    # after the queue has been created it is safe to activate the subscription
    self.pubsubclient.activate_subscription(subscription_id=salinity_subscription_id)

    # Start the output stream listener to monitor and collect messages
    # results = self.start_output_stream_and_listen(None, data_product_stream_ids)

    # Not sure why this is needed - but it is
    # subscriber._chan.stop_consume()

    ctd_sim_pid = self.start_simple_input_stream_process(ctd_stream_id)
    gevent.sleep(10.0)  # Send some messages - don't care how many

    msg_count, _ = xq.get_stats()
    log.info('Messages in user queue 1: %s ' % msg_count)

    # Validate the data from each of the messages along the way
    # self.validate_messages(results)

    # for x in range(msg_count):
    #     mo = subscriber.get_one_msg(timeout=1)
    #     print mo.body
    #     mo.ack()

    msgs = subscriber.get_all_msgs(timeout=2)
    for x in range(len(msgs)):
        msgs[x].ack()
        self.validate_messages(msgs[x])
        # print msgs[x].body

    # Should be zero after pulling all of the messages.
    msg_count, _ = xq.get_stats()
    log.info('Messages in user queue 2: %s ' % msg_count)

    # Trying to continue to receive messages in the queue
    gevent.sleep(5.0)  # Send some messages - don't care how many

    # Turning off after everything - since it is more representative of an always on stream of data!
    self.process_dispatcher.cancel_process(ctd_sim_pid)  # kill the ctd simulator process - that is enough data

    # Should see more messages in the queue
    msg_count, _ = xq.get_stats()
    log.info('Messages in user queue 3: %s ' % msg_count)

    msgs = subscriber.get_all_msgs(timeout=2)
    for x in range(len(msgs)):
        msgs[x].ack()
        self.validate_messages(msgs[x])

    # Should be zero after pulling all of the messages.
    msg_count, _ = xq.get_stats()
    log.info('Messages in user queue 4: %s ' % msg_count)

    subscriber.close()
    self.container.ex_manager.delete_xn(xq)
Author: ednad | Project: coi-services | Lines: 82 | Source file: test_visualization_service.py
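The bookkeeping in this test reduces to: check the queue depth with get_stats, drain with get_all_msgs, ack each message, and confirm the depth drops back to zero. A condensed sketch, assuming a running container and an already-activated subscription feeding the queue (the queue name is a placeholder):
xq = container.ex_manager.create_xn_queue("UserVisQueue")   # placeholder name
subscriber = Subscriber(from_name=xq)
subscriber.initialize()

msg_count, _ = xq.get_stats()
print "Messages queued before drain: %s" % msg_count

for msg in subscriber.get_all_msgs(timeout=2):
    msg.ack()

msg_count, _ = xq.get_stats()
print "Messages queued after drain: %s" % msg_count   # expected to be 0

subscriber.close()
container.ex_manager.delete_xn(xq)   # remove the queue when finished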
Note: the pyon.net.endpoint.Subscriber examples in this article were collected by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by various developers; copyright of the source code remains with the original authors. For redistribution and use, please refer to the corresponding project's license; do not republish without permission.