本文整理汇总了Python中qpid_dispatch.management.client.Node类的典型用法代码示例。如果您正苦于以下问题：Python Node类的具体用法？Python Node怎么用？Python Node使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Node类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: on_link_opened
def on_link_opened(self, event):
    """Inspect the router's link table when one of the two receivers attaches.

    For the receiver the event belongs to, the matching host is queried for
    its ``router.link`` entities. If an outgoing endpoint link with the
    expected link name ('AAA' for receiver1, 'BBB' for receiver2) owned by
    'M10.0.0.0/queue.ext' is listed, that receiver's phase flag is set.
    When both phase flags are set, both connections are closed and the
    timer is cancelled.
    """
    if event.receiver == self.receiver1:
        host, expected_name = self.first_host, 'AAA'
    elif event.receiver == self.receiver2:
        host, expected_name = self.second_host, 'BBB'
    else:
        # The event is for neither receiver: nothing to query.
        expected_name = None

    if expected_name is not None:
        reply = Node.connect(host, timeout=TIMEOUT).query(type='org.apache.qpid.dispatch.router.link')
        # Result rows are positional; find the columns of interest by name.
        type_col = reply.attribute_names.index('linkType')
        dir_col = reply.attribute_names.index('linkDir')
        addr_col = reply.attribute_names.index('owningAddr')
        name_col = reply.attribute_names.index('linkName')
        for row in reply.results:
            if (row[type_col] == "endpoint" and row[dir_col] == "out"
                    and row[name_col] == expected_name
                    and row[addr_col] == 'M10.0.0.0/queue.ext'):
                if expected_name == 'AAA':
                    self.receiver1_phase = True
                else:
                    self.receiver2_phase = True

    if not (self.receiver1_phase and self.receiver2_phase):
        return
    self.first_conn.close()
    self.second_conn.close()
    self.timer.cancel()
开发者ID:apache,项目名称:qpid-dispatch,代码行数:28,代码来源:system_tests_autolinks.py
示例2: test_remote_node
def test_remote_node(self):
    """Remote routers' management nodes are reachable via get_mgmt_nodes addresses."""
    # Connect to every router first, then collect the addresses each one advertises.
    nodes = []
    for router in self.routers:
        nodes.append(self.cleanup(Node.connect(Url(router.addresses[0]))))
    remotes = []
    for node in nodes:
        remotes = remotes + node.get_mgmt_nodes()

    expected = [u'amqp:/_topo/0/router2/$management', u'amqp:/_topo/0/router1/$management']
    self.assertEqual(expected, remotes)

    # Query router2 indirectly via router1
    indirect_url = Url(self.routers[0].addresses[0], path=Url(remotes[0]).path)
    remote = self.cleanup(Node.connect(indirect_url))
    router_ids = [entity.id for entity in remote.query(type=ROUTER).get_entities()]
    self.assertEqual(["router2"], router_ids)
开发者ID:ChugR,项目名称:qpid-dispatch,代码行数:9,代码来源:system_tests_management.py
示例3: test_two_router_ingress_egress_counts
def test_two_router_ingress_egress_counts(self):
    """Check router-wide delivery counters around a two-router transfer.

    Reads deliveriesIngress on the first router and deliveriesEgress /
    acceptedDeliveries on the second router before and after running
    IngressEgressTwoRouterTest, then asserts on the differences.
    """
    address1 = self.routers[0].addresses[0]
    address2 = self.routers[1].addresses[0]

    def read_counters(address, *names):
        """Return the named attributes of the first router entity at address."""
        outs = Node.connect(address, timeout=TIMEOUT).query(type='org.apache.qpid.dispatch.router')
        # Rows are positional, so resolve each attribute's column by name.
        columns = [outs.attribute_names.index(name) for name in names]
        row = outs.results[0]
        return [row[column] for column in columns]

    # Gather the values for deliveries_ingress and deliveries_egress before running the test.
    pre_ingress, = read_counters(address1, 'deliveriesIngress')
    pre_egress, pre_accepted = read_counters(address2, 'deliveriesEgress', 'acceptedDeliveries')

    # Now run the test.
    test = IngressEgressTwoRouterTest(address1, address2)
    test.run()

    # Gather the values for deliveries_ingress and deliveries_egress after running the test.
    post_ingress, = read_counters(address1, 'deliveriesIngress')
    post_egress, post_accepted = read_counters(address2, 'deliveriesEgress', 'acceptedDeliveries')

    self.assertEqual(post_ingress - pre_ingress, 11)
    self.assertEqual(post_egress - pre_egress, 11)

    # The management requests are counted in the acceptedDeliveries, so it is difficult to measure the
    # exact number of accepted deliveries at this point in time. But it must at least be 10 since
    # we know for sure from the test that the 10 dispositions related to the 10 sent messages
    # were definitely received
    self.assertGreaterEqual(post_accepted - pre_accepted, 10)
开发者ID:kgiusti,项目名称:dispatch,代码行数:53,代码来源:system_tests_global_delivery_counts.py
示例4: test_zzz_delete_create_ssl_profile
def test_zzz_delete_create_ssl_profile(self):
    """
    Deletes a connector and its corresponding ssl profile and recreates both
    """
    ssl_profile_name = 'client-ssl-profile'
    router_address = self.routers[1].addresses[0]

    # Deleting the connector first and then its SSL profile must work.
    self.run_qdmanage('DELETE --type=connector --name=connectorToX', address=router_address)

    # Delete the connector's associated ssl profile
    self.run_qdmanage('DELETE --type=sslProfile --name=' + ssl_profile_name, address=router_address)

    local_node = Node.connect(router_address, timeout=TIMEOUT)
    results = local_node.query(type='org.apache.qpid.dispatch.connection').results
    # No remaining connection row may carry "QDR.X" in its first column.
    # Scan every row: a fixed range(0, 3) fails with IndexError on fewer
    # than three rows and misses any row beyond the third.
    self.assertFalse(any(row[0] == "QDR.X" for row in results))

    # re-create the ssl profile
    long_type = 'org.apache.qpid.dispatch.sslProfile'
    ssl_create_command = ('CREATE --type=' + long_type +
                          ' certFile=' + self.ssl_file('client-certificate.pem') +
                          ' keyFile=' + self.ssl_file('client-private-key.pem') +
                          ' password=client-password' +
                          ' name=' + ssl_profile_name +
                          ' certDb=' + self.ssl_file('ca-certificate.pem'))
    output = json.loads(self.run_qdmanage(ssl_create_command, address=router_address))
    self.assertEqual(output['name'], ssl_profile_name)

    # Re-add connector
    # NOTE(review): the user name literal was mangled to '[email protected]' by
    # email obfuscation in the copy this was taken from; restored to the
    # saslUsername used by the upstream test - confirm against the project.
    connector_create_command = ('CREATE --type=connector name=connectorToX host=127.0.0.1 port=' +
                                str(RouterTestVerifyHostNameNo.x_listener_port) +
                                ' saslMechanisms=PLAIN sslProfile=' + ssl_profile_name +
                                ' role=inter-router verifyHostName=no saslUsername=test@domain.com'
                                ' saslPassword=password')
    # The parsed value is not used; parsing fails loudly if the output is not JSON.
    json.loads(self.run_qdmanage(connector_create_command, address=router_address))

    # Pause before querying the connection table again.
    sleep(1)
    local_node = Node.connect(router_address, timeout=TIMEOUT)
    results = local_node.query(type='org.apache.qpid.dispatch.connection').results
    self.common_asserts(results)
开发者ID:ChugR,项目名称:qpid-dispatch,代码行数:52,代码来源:system_tests_sasl_plain.py
示例5: test_remote_node
def test_remote_node(self):
    """Test that we can access management info of remote nodes using get_mgmt_nodes addresses"""
    nodes = [self.cleanup(Node.connect(Url(r.addresses[0]))) for r in self.routers]
    # Flatten the per-node address lists without the quadratic sum(..., []) idiom.
    remotes = [address for node in nodes for address in node.get_mgmt_nodes()]
    expected = {u'amqp:/_topo/0/router%s/$management' % i for i in (0, 1, 2)}
    self.assertEqual(expected, set(remotes))
    self.assertEqual(9, len(remotes))

    # Query the first advertised remote router indirectly through the first router's listener.
    remote_url = Url(self.routers[0].addresses[0], path=Url(remotes[0]).path)
    remote = self.cleanup(Node.connect(remote_url))
    # 'amqp:/_topo/0/<router id>/$management' -> the router id is the fourth path segment.
    router_id = remotes[0].split("/")[3]
    # assertIn rather than a bare assert: bare asserts are stripped under python -O.
    self.assertIn(router_id, ['router0', 'router1', 'router2'])
    self.assertEqual([router_id], [r.id for r in remote.query(type=ROUTER).get_entities()])
开发者ID:lulf,项目名称:qpid-dispatch,代码行数:13,代码来源:system_tests_management.py
示例6: on_connection_remote_open
def on_connection_remote_open(self, event):
    """Once the receiver connection is open, wait for 'Dpulp.task' and attach the links.

    Polls the router at ``query_address_sending`` until the address entity
    'Dpulp.task' reports remoteCount == 1, then opens the sender connection
    and creates the receiver and sender links. If the count is still not 1
    after the read has been issued more than a hundred times, the receiver
    connection is closed and the handler returns.
    """
    if not (event.connection == self.receiver_connection):
        return

    # The following loop introduces a wait. It gives time to the
    # router so that the address Dpulp.task can show up on the remoteCount
    reads_done = 0
    while True:
        if reads_done > 100:
            # Give up instead of looping forever.
            self.receiver_connection.close()
            return
        router = Node.connect(self.query_address_sending, timeout=TIMEOUT)
        remote_count = router.read(type='org.apache.qpid.dispatch.router.address',
                                   name='Dpulp.task').remoteCount
        reads_done += 1
        sleep(0.25)
        if remote_count == 1:
            break

    self.sender_connection = event.container.connect(self.sender_address)

    # Notice here that the receiver and sender are listening on different addresses. Receiver on
    # pulp.task.terminusTestReceiver and the sender on pulp.task.terminusTestSender
    self.receiver = event.container.create_receiver(self.receiver_connection,
                                                    "pulp.task.terminusTestReceiver")
    self.sender = event.container.create_sender(self.sender_connection,
                                                "pulp.task.terminusTestSender",
                                                options=AtMostOnce())
开发者ID:scholzj,项目名称:qpid-dispatch,代码行数:25,代码来源:system_tests_link_routes.py
示例7: test_inter_router_plain_over_ssl_exists
def test_inter_router_plain_over_ssl_exists(self):
    """The setUpClass sets up two routers with SASL PLAIN enabled over TLS/SSLv3.

    This test executes a query for type='org.apache.qpid.dispatch.connection'
    over an unauthenticated listener to QDR.X and makes sure that the output
    has an "inter-router" connection to QDR.Y whose authentication is PLAIN.
    This ensures that QDR.Y did not somehow use SASL ANONYMOUS to connect to
    QDR.X. Also makes sure that TLSv1/SSLv3 was used as sslProto.
    """
    if not SASL.extended():
        self.skipTest("Cyrus library not available. skipping test")

    local_node = Node.connect(self.routers[0].addresses[1], timeout=TIMEOUT)
    results = local_node.query(type='org.apache.qpid.dispatch.connection').results
    # Only the first connection row is inspected; columns are addressed by position.
    first_connection = results[0]

    # sslProto should be TLSv1/SSLv3
    self.assertEqual(u'TLSv1/SSLv3', first_connection[10])

    # role should be inter-router
    self.assertEqual(u'inter-router', first_connection[3])

    # sasl must be plain
    self.assertEqual(u'PLAIN', first_connection[6])

    # user must be test@domain.com
    # NOTE(review): this literal was mangled to '[email protected]' by email
    # obfuscation in the copy this was taken from; restored to the user name
    # used by the upstream test - confirm against the project.
    self.assertEqual(u'test@domain.com', first_connection[8])
开发者ID:scholzj,项目名称:qpid-dispatch,代码行数:28,代码来源:system_tests_sasl_plain.py
示例8: test_000_wait_for_link_route_up
def test_000_wait_for_link_route_up(self):
    """Poll the second router until address 'Corg.apache' reports containerCount == 1.

    Fails the test if a query raises, or if the count is not reached within
    the polling budget.
    """
    # wait up to 60 seconds for link route to get set up
    # The name of this test must dictate that it runs first
    max_polls = 600
    poll_interval_s = 0.1
    total_wait_s = float(max_polls) * poll_interval_s

    router = Node.connect(self.routers[1].addresses[0], timeout=TIMEOUT)
    counted = False
    for _ in range(max_polls):
        try:
            rows = router.query(type='org.apache.qpid.dispatch.router.address',
                                attribute_names=[u'name', u'containerCount']).results
            # Each row is [name, containerCount]; stop scanning at the first matching name.
            for row in rows:
                if row[0] == 'Corg.apache':
                    if row[1] == 1:
                        counted = True
                    break
            if counted:
                break
            sleep(poll_interval_s)
        except Exception as e:
            self.fail("Exception: " + str(e))

    if not counted:
        self.fail("Interrouter link route failed to connect after %f seconds" % total_wait_s)
开发者ID:apache,项目名称:qpid-dispatch,代码行数:25,代码来源:system_tests_disallow_link_resumable_link_route.py
示例9: test_inter_router_plain_over_ssl_exists
def test_inter_router_plain_over_ssl_exists(self):
    """
    Tests to make sure that an inter-router connection exists between the routers since verifyHostName is 'no'.
    """
    router = Node.connect(self.routers[1].addresses[0], timeout=TIMEOUT)
    connections = router.query(type='org.apache.qpid.dispatch.connection')
    # The shared helper performs the actual checks on the connection rows.
    self.common_asserts(connections.results)
开发者ID:ChugR,项目名称:qpid-dispatch,代码行数:9,代码来源:system_tests_sasl_plain.py
示例10: test_deprecated
def test_deprecated(self):
    """
    Tests deprecated attributes like linkRoutePattern, container, fixedAddress etc.

    Queries the router behind the second listener of the first router and checks:
    the first connection row (role inter-router, sasl PLAIN, expected user),
    the two configured linkRoute entities ("in" then "out"), and that the
    'closest', 'spread' and 'multicast' address configurations each exist
    with the expected value in the following column.
    """
    if not SASL.extended():
        self.skipTest("Cyrus library not available. skipping test")

    local_node = Node.connect(self.routers[0].addresses[1], timeout=TIMEOUT)

    # saslConfigName and saslConfigPath were set in the ContainerEntity. This tests makes sure that the
    # saslConfigName and saslConfigPath were loaded properly from the ContainerEntity.
    # ContainerEntity has been deprecated.

    # Query once and check all three columns on the same row, rather than
    # re-issuing the identical query for every assertion.
    connection = local_node.query(type='org.apache.qpid.dispatch.connection').results[0]

    # role should be inter-router
    self.assertEqual(u'inter-router', connection[3])

    # sasl must be plain
    self.assertEqual(u'PLAIN', connection[6])

    # user must be test@domain.com
    # NOTE(review): this literal was mangled to '[email protected]' by email
    # obfuscation in the copy this was taken from; restored to the user name
    # used by the upstream test - confirm against the project.
    self.assertEqual(u'test@domain.com', connection[8])

    # Make sure that the deprecated linkRoutePattern is set up correctly
    query_response = local_node.query(type='org.apache.qpid.dispatch.router.config.linkRoute')
    self.assertEqual(2, len(query_response.results))
    self.assertEqual("in", query_response.results[0][7])
    self.assertEqual("out", query_response.results[1][7])

    results = local_node.query(type='org.apache.qpid.dispatch.router.config.address').results

    # Value in column 3 -> value required in column 4 of the same row.
    expected = {'closest': 'closest', 'spread': 'balanced', 'multicast': 'multicast'}
    found = set()
    for result in results:
        if result[3] in expected:
            found.add(result[3])
            self.assertEqual(result[4], expected[result[3]])

    self.assertIn('multicast', found)
    self.assertIn('spread', found)
    self.assertIn('closest', found)
开发者ID:scholzj,项目名称:qpid-dispatch,代码行数:56,代码来源:system_tests_deprecated.py
示例11: on_link_opened
def on_link_opened(self, event):
    """Record which router links exist when the receiver or sender link opens.

    For the receiver link, the router at ``query_address_listening`` is
    checked for 'in' and 'out' links owned by
    'M0pulp.task.terminusTestReceiver'; for the sender link, the router at
    ``query_address_sending`` is checked for 'M0pulp.task.terminusTestSender'.
    Found directions set the corresponding ``*_found`` flags. After both
    links have opened, the links and connections are closed.
    """
    def link_directions(router_address, owning_addr):
        """Return (has_in, has_out) for router links owned by owning_addr."""
        out = Node.connect(router_address, timeout=TIMEOUT).query(type='org.apache.qpid.dispatch.router.link')
        dir_col = out.attribute_names.index("linkDir")
        addr_col = out.attribute_names.index("owningAddr")
        owned = [row for row in out.results if row[addr_col] == owning_addr]
        has_in = any(row[dir_col] == 'in' for row in owned)
        has_out = any(row[dir_col] == 'out' for row in owned)
        return has_in, has_out

    if event.receiver == self.receiver:
        self.receiver_link_opened = True
        # Make sure that the owningAddr M0pulp.task.terminusTestReceiver shows up on both in and out.
        # The 'out' link is on address M0pulp.task.terminusTestReceiver outgoing from the router B to the receiver
        # The 'in' link is on address M0pulp.task.terminusTestReceiver incoming from router C to router B
        has_in, has_out = link_directions(self.query_address_listening,
                                          'M0pulp.task.terminusTestReceiver')
        if has_in:
            self.in_receiver_found = True
        if has_out:
            self.out_receiver_found = True

    if event.sender == self.sender:
        self.sender_link_opened = True
        # Make sure that the owningAddr M0pulp.task.terminusTestSender shows up on both in and out.
        # The 'in' link is on address M0pulp.task.terminusTestSender incoming from sender to router
        # The 'out' link is on address M0pulp.task.terminusTestSender outgoing from router C to router B
        has_in, has_out = link_directions(self.query_address_sending,
                                          'M0pulp.task.terminusTestSender')
        if has_in:
            self.in_sender_found = True
        if has_out:
            self.out_sender_found = True

    # Shutdown the connections only if the on_link_opened has been called for sender and receiver links.
    if not (self.sender_link_opened and self.receiver_link_opened):
        return
    self.sender.close()
    self.receiver.close()
    self.sender_connection.close()
    self.receiver_connection.close()
开发者ID:scholzj,项目名称:qpid-dispatch,代码行数:43,代码来源:system_tests_link_routes.py
示例12: test_entity_names
def test_entity_names(self):
    """Check entity identities: 'self' for the management entity, '<short type>/...' otherwise."""
    nodes = [self.cleanup(Node.connect(Url(r.addresses[0]))) for r in self.routers]

    # Test that all entities have a consistent identity format: type/name
    # Issue every query first, then drain the entity iterators in order.
    entity_iterators = [
        node.query(attribute_names=['type', 'identity', 'name']).iter_entities()
        for node in nodes
    ]
    entities = [entity for iterator in entity_iterators for entity in iterator]

    for entity in entities:
        if entity.type == MANAGEMENT:
            self.assertEqual(entity.identity, "self")
            continue
        self.assertRegexpMatches(entity.identity, "^%s/" % short_name(entity.type), entity)
开发者ID:ajssmith,项目名称:qpid-dispatch,代码行数:10,代码来源:system_tests_management.py
示例13: test_deprecated
def test_deprecated(self):
    """
    Tests deprecated attributes like linkRoutePattern, container, fixedAddress etc.

    Queries the router behind the second listener of the first router and checks:
    the first connection row (role inter-router, sasl PLAIN, expected user),
    the two configured linkRoute entities ("in" then "out"), and that the
    "closest", "spread" and "multicast" address configurations each exist
    with the expected value in the following column.
    """
    local_node = Node.connect(self.routers[0].addresses[1], timeout=TIMEOUT)

    # saslConfigName and saslConfigPath were set in the ContainerEntity. This tests makes sure that the
    # saslConfigName and saslConfigPath were loaded properly from the ContainerEntity.
    # ContainerEntity has been deprecated.

    # Query once and check all three columns on the same row, rather than
    # re-issuing the identical query for every assertion.
    connection = local_node.query(type="org.apache.qpid.dispatch.connection").results[0]

    # role should be inter-router
    self.assertEqual(u"inter-router", connection[9])

    # sasl must be plain
    self.assertEqual(u"PLAIN", connection[12])

    # user must be test@domain.com
    # NOTE(review): this literal was mangled to "[email protected]" by email
    # obfuscation in the copy this was taken from; restored to the user name
    # used by the upstream test - confirm against the project.
    self.assertEqual(u"test@domain.com", connection[16])

    # Make sure that the deprecated linkRoutePattern is set up correctly
    query_response = local_node.query(type="org.apache.qpid.dispatch.router.config.linkRoute")
    self.assertEqual(2, len(query_response.results))
    self.assertEqual("in", query_response.results[0][7])
    self.assertEqual("out", query_response.results[1][7])

    results = local_node.query(type="org.apache.qpid.dispatch.router.config.address").results

    # Value in column 3 -> value required in column 4 of the same row.
    expected = {"closest": "closest", "spread": "balanced", "multicast": "multicast"}
    found = set()
    for result in results:
        if result[3] in expected:
            found.add(result[3])
            self.assertEqual(result[4], expected[result[3]])

    self.assertIn("multicast", found)
    self.assertIn("spread", found)
    self.assertIn("closest", found)
开发者ID:ChugR,项目名称:qpid-dispatch,代码行数:55,代码来源:system_tests_deprecated.py
示例14: test_link_route_ingress_egress_transit_counts
def test_link_route_ingress_egress_transit_counts(self):
    """Compare the router's ingress/egress/transit delivery counters before and after a link-route test."""
    address1 = self.routers[2].addresses[0]
    address2 = self.routers[2].addresses[0]

    def delivery_counts():
        """Return (ingress, egress, transit) from the first router entity at address1."""
        outs = Node.connect(address1, timeout=TIMEOUT).query(type='org.apache.qpid.dispatch.router')
        ingress_col = outs.attribute_names.index('deliveriesIngress')
        egress_col = outs.attribute_names.index('deliveriesEgress')
        transit_col = outs.attribute_names.index('deliveriesTransit')
        row = outs.results[0]
        return row[ingress_col], row[egress_col], row[transit_col]

    ingress_before, egress_before, transit_before = delivery_counts()

    # Send and receive on the same router, router C
    test = IngressEgressTransitLinkRouteTest(address1, address2)
    test.run()

    ingress_after, egress_after, transit_after = delivery_counts()

    # 10 messages entered the router, and 10 messages were echoed by router A and one mgmt request
    self.assertEqual(ingress_after - ingress_before, 21)
    # 10 messages + 1 mgmt request
    self.assertEqual(egress_after - egress_before, 11)
    # 10 messages went out this router
    self.assertEqual(transit_after - transit_before, 10)
开发者ID:kgiusti,项目名称:dispatch,代码行数:41,代码来源:system_tests_global_delivery_counts.py
示例15: is_router_connected
def is_router_connected(self, router_id, **retry_kwargs):
    """Report whether the remote router ``router_id`` is known and answers a query.

    First reads the 'router.node/<router_id>' management entity, then
    connects to this router's first address targeting ``router_id`` and
    retries a router query through ``retry_exception``.

    Returns the value returned by ``retry_exception`` on success, or False
    if any step raises an ``Exception``. KeyboardInterrupt and SystemExit
    are not swallowed, so a polling caller can still be interrupted.

    NOTE(review): ``retry_kwargs`` is accepted but not forwarded to
    ``retry_exception`` - confirm whether that is intended.
    """
    try:
        self.management.read(identity="router.node/%s" % router_id)
        # TODO aconway 2015-01-29: The above check should be enough, we
        # should not advertise a remote router in management till it is fully
        # connected. However we still get a race where the router is not
        # actually ready for traffic. Investigate.
        # Meantime the following actually tests send-thru to the router.
        node = Node.connect(self.addresses[0], router_id, timeout=1)
        return retry_exception(lambda: node.query('org.apache.qpid.dispatch.router'))
    except Exception:
        return False
开发者ID:ChugR,项目名称:qpid-dispatch,代码行数:12,代码来源:system_test.py
示例16: on_start
def on_start(self, event):
    """Schedule the timeout, attach the normal sender and check the router's autoLinkCount.

    ``autolink_count_ok`` is set to True only when the first router entity
    reports an autoLinkCount of 8.
    """
    self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
    self.normal_conn = event.container.connect(self.normal_address)
    self.sender = event.container.create_sender(self.normal_conn, self.dest)
    self.last_action = "Attached normal sender"

    reply = Node.connect(self.normal_address, timeout=TIMEOUT).query(type='org.apache.qpid.dispatch.router')
    router_row = reply.results[0]
    # Rows are positional; look up the autoLinkCount column by name.
    auto_link_count = router_row[reply.attribute_names.index('autoLinkCount')]
    if auto_link_count == 8:
        self.autolink_count_ok = True
开发者ID:apache,项目名称:qpid-dispatch,代码行数:12,代码来源:system_tests_autolinks.py
示例17: test_aaa_partial_link_route_match
def test_aaa_partial_link_route_match(self):
    """
    The linkRoutePattern on Routers C and B is set to org.apache.
    Creates a receiver listening on the address 'org.apache.dev' and a sender that sends to address 'org.apache.dev'.
    Sends a message to org.apache.dev via router QDR.C and makes sure that the message was successfully
    routed (using partial address matching) and received using pre-created links that were created as a
    result of specifying addresses in the linkRoutePattern attribute('org.apache.').
    """
    hello_world_1 = "Hello World_1!"

    # Connects to listener #2 on QDR.C
    addr = self.routers[2].addresses[1]

    blocking_connection = BlockingConnection(addr)
    # Close the connection even when an assertion below fails.
    try:
        # Receive on org.apache.dev
        blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev")

        apply_options = AtMostOnce()

        # Sender to org.apache.dev
        blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options)
        msg = Message(body=hello_world_1)
        # Send a message
        blocking_sender.send(msg)

        received_message = blocking_receiver.receive()

        self.assertEqual(hello_world_1, received_message.body)

        # Connect to the router acting like the broker (QDR.A) and check the deliveriesIngress and deliveriesEgress
        local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
        self.assertEqual(u'QDR.A', local_node.query(type='org.apache.qpid.dispatch.router',
                                                    attribute_names=['routerId']).results[0][0])

        # Read the address entity once and check both counters on it.
        address_stats = local_node.read(type='org.apache.qpid.dispatch.router.address',
                                        name='router.address/M0org.apache.dev')
        self.assertEqual(1, address_stats.deliveriesEgress, "deliveriesEgress is wrong")
        self.assertEqual(1, address_stats.deliveriesIngress, "deliveriesIngress is wrong")

        # There should be 4 links -
        # 1. outbound receiver link on org.apache.dev
        # 2. inbound sender link on blocking_sender
        # 3. inbound link to the $management
        # 4. outbound link to $management
        self.assertEqual(4, len(local_node.query(type='org.apache.qpid.dispatch.router.link').results))
    finally:
        blocking_connection.close()
开发者ID:ErnieAllen,项目名称:qpid-dispatch,代码行数:52,代码来源:system_tests_link_routes.py
示例18: test_route_container_ingress
def test_route_container_ingress(self):
    """Run RouteContainerIngressTest and expect deliveriesIngressRouteContainer to be 20."""
    regular_addr = self.router.addresses[0]
    route_container_addr = self.router.addresses[1]

    RouteContainerIngressTest(route_container_addr, regular_addr).run()

    reply = Node.connect(regular_addr, timeout=TIMEOUT).query(type='org.apache.qpid.dispatch.router')
    # Rows are positional; look up the counter's column by name.
    counter_col = reply.attribute_names.index('deliveriesIngressRouteContainer')
    router_row = reply.results[0]
    self.assertEqual(router_row[counter_col], 20)
开发者ID:kgiusti,项目名称:dispatch,代码行数:13,代码来源:system_tests_global_delivery_counts.py
示例19: test_entity_names
def test_entity_names(self):
    """Check the identity format of every entity reported by every router."""
    nodes = [self.cleanup(Node.connect(Url(r.addresses[0]))) for r in self.routers]

    # Test that all entities have a consistent identity format: type/name
    # Issue every query first, then drain the entity iterators in order.
    entity_iterators = [
        node.query(attribute_names=['type', 'identity', 'name']).iter_entities()
        for node in nodes
    ]
    entities = [entity for iterator in entity_iterators for entity in iterator]

    for entity in entities:
        if entity.type == MANAGEMENT:
            self.assertEqual(entity.identity, "self")
        elif entity.type == 'org.apache.qpid.dispatch.connection':
            # Connection identities are expected to be numeric; this pattern
            # checks that the identity contains at least one digit from 1 to 9.
            self.assertRegexpMatches(str(entity.identity), "[1-9]+", entity)
        else:
            self.assertRegexpMatches(entity.identity, "^%s/" % short_name(entity.type), entity)
开发者ID:lulf,项目名称:qpid-dispatch,代码行数:14,代码来源:system_tests_management.py
示例20: test_create_listener
def test_create_listener(self):
    """Create a new listener on a running router"""
    port = self.get_port()
    # Note qdrouter schema defines port as string not int, since it can be a service name.
    attributes = dict(
        name='foo',
        port=str(port),
        role='normal',
        saslMechanisms='ANONYMOUS',
        authenticatePeer=False,
    )
    created = self.assert_create_ok(LISTENER, 'foo', attributes)
    self.assertEqual(created['name'], 'foo')
    self.assertEqual(created['host'], '')

    # Connect via the new listener
    via_new_listener = self.cleanup(Node.connect(Url(port=port)))
    routers = via_new_listener.query(type=ROUTER).get_entities()
    self.assertEqual(self.router.name, routers[0]['id'])
开发者ID:lulf,项目名称:qpid-dispatch,代码行数:14,代码来源:system_tests_management.py
注：本文中的qpid_dispatch.management.client.Node类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论