本文整理汇总了Java中org.apache.qpid.proton.amqp.messaging.TerminusDurability类的典型用法代码示例。如果您正苦于以下问题:Java TerminusDurability类的具体用法?Java TerminusDurability怎么用?Java TerminusDurability使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
TerminusDurability类属于org.apache.qpid.proton.amqp.messaging包,在下文中一共展示了TerminusDurability类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: size
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Returns the 1-based position of the last target field that holds a
 * non-default value, or 0 when every checked field is at its default.
 *
 * Fields are checked from highest to lowest position: capabilities (7),
 * dynamic node properties (6), dynamic (5), timeout (4), expiry policy (3),
 * durable (2), address (1).
 * NOTE(review): presumably used to drop trailing default fields when the
 * target is encoded as a list - confirm against the enclosing class.
 *
 * The expiry policy and durability are compared by identity (as the sibling
 * SourceType.size() does), so a null value cannot raise a
 * NullPointerException here.
 *
 * @return a value between 0 and 7 inclusive
 */
public int size()
{
    return _impl.getCapabilities() != null
              ? 7
              : _impl.getDynamicNodeProperties() != null
              ? 6
              : _impl.getDynamic()
              ? 5
              : (_impl.getTimeout() != null && !_impl.getTimeout().equals(UnsignedInteger.ZERO))
              ? 4
              : _impl.getExpiryPolicy() != TerminusExpiryPolicy.SESSION_END
              ? 3
              : _impl.getDurable() != TerminusDurability.NONE
              ? 2
              : _impl.getAddress() != null
              ? 1
              : 0;
}
开发者ID:apache,项目名称:qpid-proton-j,代码行数:20,代码来源:TargetType.java
示例2: doTestCreateDynamicSender
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Creates a sender on a dynamic target and verifies the target the remote
 * peer reports: it is dynamic, non-durable, expires on link detach, carries
 * a delete-on-close lifetime policy, and a queue proxy can be obtained for
 * its address.
 *
 * @param topic passed to {@code createDynamicTarget} to select the node type
 * @throws Exception if connecting or creating the link fails
 */
@SuppressWarnings("unchecked")
protected void doTestCreateDynamicSender(boolean topic) throws Exception {
    Target target = createDynamicTarget(topic);

    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect());
    try {
        AmqpSession session = connection.createSession();

        AmqpSender sender = session.createSender(target);
        assertNotNull(sender);

        Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
        assertTrue(remoteTarget.getDynamic());
        // assertEquals reports expected/actual values when the check fails.
        assertEquals(TerminusDurability.NONE, remoteTarget.getDurable());
        assertEquals(TerminusExpiryPolicy.LINK_DETACH, remoteTarget.getExpiryPolicy());

        // Check the dynamic node lifetime-policy
        Map<Symbol, Object> dynamicNodeProperties = remoteTarget.getDynamicNodeProperties();
        assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
        assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));

        Queue queueView = getProxyToQueue(remoteTarget.getAddress());
        assertNotNull(queueView);
    } finally {
        // Close the connection even when an assertion above fails.
        connection.close();
    }
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:27,代码来源:AmqpTempDestinationTest.java
示例3: doTestCreateDynamicReceiver
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Creates a receiver on a dynamic source and verifies the source the remote
 * peer reports: it is dynamic, non-durable, expires on link detach, carries
 * a delete-on-close lifetime policy, and a queue proxy can be obtained for
 * its address.
 *
 * @param topic passed to {@code createDynamicSource} to select the node type
 * @throws Exception if connecting or creating the link fails
 */
@SuppressWarnings("unchecked")
protected void doTestCreateDynamicReceiver(boolean topic) throws Exception {
    Source source = createDynamicSource(topic);

    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect());
    try {
        AmqpSession session = connection.createSession();

        AmqpReceiver receiver = session.createReceiver(source);
        assertNotNull(receiver);

        Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
        assertTrue(remoteSource.getDynamic());
        // assertEquals reports expected/actual values when the check fails.
        assertEquals(TerminusDurability.NONE, remoteSource.getDurable());
        assertEquals(TerminusExpiryPolicy.LINK_DETACH, remoteSource.getExpiryPolicy());

        // Check the dynamic node lifetime-policy
        Map<Symbol, Object> dynamicNodeProperties = remoteSource.getDynamicNodeProperties();
        assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
        assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));

        Queue queueView = getProxyToQueue(remoteSource.getAddress());
        assertNotNull(queueView);
    } finally {
        // Close the connection even when an assertion above fails.
        connection.close();
    }
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:27,代码来源:AmqpTempDestinationTest.java
示例4: createDynamicSource
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Builds a dynamic, non-durable source that expires on link detach and
 * carries a delete-on-close lifetime policy.
 *
 * @param topic {@code true} to tag the source with the temporary-topic
 *              capability, {@code false} for the temporary-queue capability
 * @return the configured source
 */
protected Source createDynamicSource(boolean topic) {
    Source dynamicSource = new Source();
    dynamicSource.setDynamic(true);
    dynamicSource.setDurable(TerminusDurability.NONE);
    dynamicSource.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);

    // The dynamic node is removed once the link that created it closes.
    Map<Symbol, Object> nodeProperties = new HashMap<>();
    nodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
    dynamicSource.setDynamicNodeProperties(nodeProperties);

    // The capability tells the peer which kind of node to create.
    if (topic) {
        dynamicSource.setCapabilities(TEMP_TOPIC_CAPABILITY);
    } else {
        dynamicSource.setCapabilities(TEMP_QUEUE_CAPABILITY);
    }

    return dynamicSource;
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:22,代码来源:AmqpTempDestinationTest.java
示例5: createDynamicTarget
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Builds a dynamic, non-durable target that expires on link detach and
 * carries a delete-on-close lifetime policy.
 *
 * @param topic {@code true} to tag the target with the temporary-topic
 *              capability, {@code false} for the temporary-queue capability
 * @return the configured target
 */
protected Target createDynamicTarget(boolean topic) {
    Target dynamicTarget = new Target();
    dynamicTarget.setDynamic(true);
    dynamicTarget.setDurable(TerminusDurability.NONE);
    dynamicTarget.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);

    // The dynamic node is removed once the link that created it closes.
    Map<Symbol, Object> nodeProperties = new HashMap<>();
    nodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
    dynamicTarget.setDynamicNodeProperties(nodeProperties);

    // The capability tells the peer which kind of node to create.
    if (topic) {
        dynamicTarget.setCapabilities(TEMP_TOPIC_CAPABILITY);
    } else {
        dynamicTarget.setCapabilities(TEMP_QUEUE_CAPABILITY);
    }

    return dynamicTarget;
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:22,代码来源:AmqpTempDestinationTest.java
示例6: test2ConsumersOnNonSharedDurableAddress
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * With a multicast address registered, attaches a receiver using a
 * non-shared durable source and expects a second receiver that uses a
 * shared source on the same subscription to be rejected.
 */
@Test(timeout = 60000)
public void test2ConsumersOnNonSharedDurableAddress() throws Exception {
    AddressInfo multicastAddress = new AddressInfo(address);
    multicastAddress.getRoutingTypes().add(RoutingType.MULTICAST);
    server.addAddressInfo(multicastAddress);

    AmqpClient amqpClient = createAmqpClient();
    AmqpConnection conn = addConnection(amqpClient.connect("myClientId"));
    AmqpSession amqpSession = conn.createSession();

    Source nonSharedSource = createNonSharedSource(TerminusDurability.CONFIGURATION);
    Source sharedSource = createSharedSource(TerminusDurability.CONFIGURATION);

    AmqpReceiver firstReceiver = amqpSession.createMulticastReceiver(nonSharedSource, "myReceiverID", "mySub");
    try {
        amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub|2");
        fail("Exception expected");
    } catch (Exception expected) {
        // The second attach must be refused; nothing further to verify.
    } finally {
        firstReceiver.close();
    }

    conn.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:24,代码来源:ClientDefinedMultiConsumerTest.java
示例7: testAddressDoesntExist
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Without registering the address beforehand, attaches a receiver using a
 * non-shared durable source and expects a second receiver that uses a
 * shared source on the same subscription to be rejected.
 */
@Test(timeout = 60000)
public void testAddressDoesntExist() throws Exception {
    AmqpClient amqpClient = createAmqpClient();
    AmqpConnection conn = addConnection(amqpClient.connect("myClientId"));
    AmqpSession amqpSession = conn.createSession();

    Source nonSharedSource = createNonSharedSource(TerminusDurability.CONFIGURATION);
    Source sharedSource = createSharedSource(TerminusDurability.CONFIGURATION);

    AmqpReceiver firstReceiver = amqpSession.createMulticastReceiver(nonSharedSource, "myReceiverID", "mySub");
    try {
        amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub|2");
        fail("Exception expected");
    } catch (Exception expected) {
        // The second attach must be refused; nothing further to verify.
    } finally {
        firstReceiver.close();
    }

    conn.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:21,代码来源:ClientDefinedMultiConsumerTest.java
示例8: configureSource
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Applies durability, expiry and filter settings to the given source based
 * on the consumer info held by this object.
 *
 * A non-empty subscription name yields a never-expiring source with
 * unsettled-state durability and COPY distribution; otherwise the source
 * is non-durable and expires on link detach. No-local and selector filters
 * are attached only when configured.
 *
 * @param source the source to configure
 */
protected void configureSource(Source source) {
    final String subscriptionName = info.getSubscriptionName();
    final boolean hasSubscription = subscriptionName != null && !subscriptionName.isEmpty();

    if (!hasSubscription) {
        source.setDurable(TerminusDurability.NONE);
        source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
    } else {
        source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
        source.setDurable(TerminusDurability.UNSETTLED_STATE);
        source.setDistributionMode(COPY);
    }

    Map<Symbol, DescribedType> filterMap = new HashMap<Symbol, DescribedType>();

    if (info.isNoLocal()) {
        filterMap.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
    }

    final String selector = info.getSelector();
    if (selector != null && !selector.trim().isEmpty()) {
        filterMap.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(selector));
    }

    // Only set a filter map when there is at least one entry.
    if (!filterMap.isEmpty()) {
        source.setFilter(filterMap);
    }
}
开发者ID:fusesource,项目名称:hawtjms,代码行数:25,代码来源:AmqpConsumer.java
示例9: publish
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Connects to the endpoint, opens a sender on {@code address} with a
 * "topic" capability and unsettled-state durability, sends one message
 * whose body is {@code content}, and blocks until the delivery callback
 * fires, a failure is reported, or one minute elapses.
 *
 * Every failure path releases the latch, so the caller is not left
 * waiting for the full timeout when the connection or sender cannot open.
 *
 * @param endpoint host and port to connect to
 * @param address  address used for the sender, its target and the message
 * @param content  value wrapped in the message body
 * @throws InterruptedException if interrupted while waiting
 */
public void publish(Endpoint endpoint, String address, String content) throws InterruptedException {
    String containerId = "test-publisher";
    ProtonClient client = ProtonClient.create(vertx);
    CountDownLatch latch = new CountDownLatch(1);
    client.connect(endpoint.hostname(), endpoint.port(), connection -> {
        if (connection.succeeded()) {
            ProtonConnection conn = connection.result();
            conn.setContainer(containerId);
            conn.openHandler(result -> {
                if (!result.succeeded()) {
                    // Avoid dereferencing a null result when the open failed.
                    System.out.println("Connection open failed: " + result.cause().getMessage());
                    latch.countDown();
                    return;
                }
                System.out.println("Connected: " + result.result().getRemoteContainer());
                Target target = new Target();
                target.setAddress(address);
                target.setCapabilities(Symbol.getSymbol("topic"));
                target.setDurable(TerminusDurability.UNSETTLED_STATE);
                ProtonSender sender = conn.createSender(address);
                sender.setTarget(target);
                sender.openHandler(res -> {
                    if (res.succeeded()) {
                        System.out.println("Opened sender");
                        Message message = Message.Factory.create();
                        message.setAddress(address);
                        message.setBody(new AmqpValue(content));
                        sender.send(message, protonDelivery -> latch.countDown());
                    } else {
                        System.out.println("Failed opening sender: " + res.cause().getMessage());
                        // Nothing will be sent; stop the caller from waiting.
                        latch.countDown();
                    }
                });
                sender.open();
            });
            conn.open();
        } else {
            System.out.println("Connection failed: " + connection.cause().getMessage());
            // Nothing will be sent; stop the caller from waiting.
            latch.countDown();
        }
    });
    latch.await(1, TimeUnit.MINUTES);
}
开发者ID:EnMasseProject,项目名称:enmasse,代码行数:37,代码来源:TestPublisher.java
示例10: createSender
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Opens a sender link named after the subscriber's client id. Both its
 * target and its source point at the subscriber's client address, and the
 * source is marked with unsettled-state durability.
 *
 * @param session the session on which the sender link is created
 */
private void createSender(org.apache.qpid.proton.engine.Session session) throws Exception {
    Target linkTarget = new Target();
    linkTarget.setAddress(subscriberInfo.getClientAddress());

    Source linkSource = new Source();
    linkSource.setAddress(subscriberInfo.getClientAddress());
    linkSource.setDurable(TerminusDurability.UNSETTLED_STATE);

    Sender link = session.sender(subscriberInfo.getClientId());
    link.setTarget(linkTarget);
    link.setSource(linkSource);
    link.open();
}
开发者ID:EnMasseProject,项目名称:enmasse,代码行数:14,代码来源:LinkInitiator.java
示例11: size
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Returns the 1-based position of the last source field that holds a
 * non-default value, or 0 when every checked field is at its default.
 *
 * Fields are checked from the highest position (capabilities, 11) down to
 * the lowest (address, 1); the first one found to be set decides the result.
 *
 * @return a value between 0 and 11 inclusive
 */
public int size()
{
    if (_impl.getCapabilities() != null)
    {
        return 11;
    }
    if (_impl.getOutcomes() != null)
    {
        return 10;
    }
    if (_impl.getDefaultOutcome() != null)
    {
        return 9;
    }
    if (_impl.getFilter() != null)
    {
        return 8;
    }
    if (_impl.getDistributionMode() != null)
    {
        return 7;
    }
    if (_impl.getDynamicNodeProperties() != null)
    {
        return 6;
    }
    if (_impl.getDynamic())
    {
        return 5;
    }
    // A timeout of zero counts as the default value.
    if (_impl.getTimeout() != null && !_impl.getTimeout().equals(UnsignedInteger.ZERO))
    {
        return 4;
    }
    if (_impl.getExpiryPolicy() != TerminusExpiryPolicy.SESSION_END)
    {
        return 3;
    }
    if (_impl.getDurable() != TerminusDurability.NONE)
    {
        return 2;
    }
    if (_impl.getAddress() != null)
    {
        return 1;
    }
    return 0;
}
开发者ID:apache,项目名称:qpid-proton-j,代码行数:28,代码来源:SourceType.java
示例12: testClientIdIsSetInSubscriptionList
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Creates a durable "topic" receiver named "testSub" on a connection whose
 * container id is "testClient" and verifies that the server holds a queue
 * named "testClient.testSub:mytopic".
 *
 * Exceptions raised while creating the session or receiver propagate and
 * fail the test rather than being printed and ignored.
 */
@Test(timeout = 60000)
public void testClientIdIsSetInSubscriptionList() throws Exception {
    server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));

    AmqpClient client = createAmqpClient();
    // NOTE(review): the container id is set after client.connect() and connect()
    // is then called again - confirm this ordering is what the client expects.
    AmqpConnection connection = addConnection(client.connect());
    connection.setContainerId("testClient");
    connection.connect();

    try {
        AmqpSession session = connection.createSession();

        Source source = new Source();
        source.setDurable(TerminusDurability.UNSETTLED_STATE);
        source.setCapabilities(Symbol.getSymbol("topic"));
        source.setAddress("mytopic");
        session.createReceiver(source, "testSub");

        SimpleString fo = new SimpleString("testClient.testSub:mytopic");
        assertNotNull(server.locateQueue(fo));
    } finally {
        connection.close();
    }
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:28,代码来源:AmqpReceiverTest.java
示例13: testLookupExistingSubscription
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Creates a durable receiver, detaches it, then looks the subscription up
 * again and checks the remote source: no no-local or selector filter,
 * expiry NEVER, durability UNSETTLED_STATE, distribution mode COPY. After
 * the receiver is closed, a further lookup is expected to fail.
 */
@Test(timeout = 60000)
public void testLookupExistingSubscription() throws Exception {
    AmqpClient amqpClient = createAmqpClient();
    AmqpConnection conn = addConnection(amqpClient.createConnection());
    conn.setContainerId(getContainerID());
    conn.connect();

    AmqpSession amqpSession = conn.createSession();

    AmqpReceiver durableReceiver = amqpSession.createDurableReceiver(getTopicName(), getSubscriptionName());
    durableReceiver.detach();

    AmqpReceiver lookedUp = amqpSession.lookupSubscription(getSubscriptionName());
    assertNotNull(lookedUp);

    Source remoteSource = (Source) lookedUp.getReceiver().getRemoteSource();
    assertNotNull(remoteSource);

    // A filter map is optional here; when present it must not carry either filter.
    if (remoteSource.getFilter() != null) {
        assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
        assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
    }

    assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
    assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
    assertEquals(COPY, remoteSource.getDistributionMode());

    lookedUp.close();

    try {
        amqpSession.lookupSubscription(getSubscriptionName());
        fail("Should not be able to lookup the subscription");
    } catch (Exception expected) {
        // The lookup is expected to fail once the receiver has been closed.
    }

    conn.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:41,代码来源:AmqpDurableReceiverTest.java
示例14: testLookupExistingSubscriptionWithNoLocal
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Creates a durable no-local receiver, detaches it, then looks the
 * subscription up again and checks the remote source: a no-local filter is
 * present, no selector filter, expiry NEVER, durability UNSETTLED_STATE,
 * distribution mode COPY. After the receiver is closed, a further lookup
 * is expected to fail.
 */
@Test(timeout = 60000)
public void testLookupExistingSubscriptionWithNoLocal() throws Exception {
    AmqpClient amqpClient = createAmqpClient();
    AmqpConnection conn = addConnection(amqpClient.createConnection());
    conn.setContainerId(getContainerID());
    conn.connect();

    AmqpSession amqpSession = conn.createSession();

    AmqpReceiver durableReceiver = amqpSession.createDurableReceiver(getTopicName(), getSubscriptionName(), null, true);
    durableReceiver.detach();

    AmqpReceiver lookedUp = amqpSession.lookupSubscription(getSubscriptionName());
    assertNotNull(lookedUp);

    Source remoteSource = (Source) lookedUp.getReceiver().getRemoteSource();
    assertNotNull(remoteSource);

    assertNotNull(remoteSource.getFilter());
    assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
    assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));

    assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
    assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
    assertEquals(COPY, remoteSource.getDistributionMode());

    lookedUp.close();

    try {
        amqpSession.lookupSubscription(getSubscriptionName());
        fail("Should not be able to lookup the subscription");
    } catch (Exception expected) {
        // The lookup is expected to fail once the receiver has been closed.
    }

    conn.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:40,代码来源:AmqpDurableReceiverTest.java
示例15: test2ConsumersOnSharedVolatileAddress
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Attaches two receivers to one shared volatile subscription, checks that
 * each receives a message and that the backing queue reports two
 * consumers, then verifies the binding survives the first close and is
 * removed within one second of the second close.
 */
@Test(timeout = 60000)
public void test2ConsumersOnSharedVolatileAddress() throws Exception {
    final SimpleString queueName = SimpleString.toSimpleString("myClientId.mySub:shared-volatile");

    AddressInfo addressInfo = new AddressInfo(address);
    addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
    server.addAddressInfo(addressInfo);

    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect("myClientId"));
    AmqpSession session = connection.createSession();

    Source source = createSharedSource(TerminusDurability.NONE);
    AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
    AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
    receiver.flow(1);
    receiver2.flow(1);
    sendMessages(address.toString(), 2);

    AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
    assertNotNull(amqpMessage);
    amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
    assertNotNull(amqpMessage);

    assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(queueName).getBindable()).getConsumerCount());

    receiver.close();
    // One consumer is still attached, so the binding must remain.
    assertNotNull(server.getPostOffice().getBinding(queueName));

    receiver2.close();
    // Check it has been deleted; assert the wait result so a binding that
    // is never removed fails the test instead of passing silently.
    assertTrue(Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return server.getPostOffice().getBinding(queueName) == null;
        }
    }, 1000));

    connection.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:33,代码来源:ClientDefinedMultiConsumerTest.java
示例16: test2ConsumersOnSharedVolatileAddressBrokerDefined
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Pre-creates the shared volatile queue on the server, attaches two
 * receivers to it, checks that each receives a message and that the queue
 * reports two consumers, then verifies the binding is still present after
 * both receivers have closed.
 */
@Test(timeout = 60000)
public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception {
    AddressInfo multicastAddress = new AddressInfo(address);
    multicastAddress.getRoutingTypes().add(RoutingType.MULTICAST);
    server.addAddressInfo(multicastAddress);

    final SimpleString queueName = SimpleString.toSimpleString("myClientId.mySub:shared-volatile");
    server.createQueue(address, RoutingType.MULTICAST, queueName, null, true, false, -1, false, false);

    AmqpClient amqpClient = createAmqpClient();
    AmqpConnection conn = addConnection(amqpClient.connect("myClientId"));
    AmqpSession amqpSession = conn.createSession();

    Source sharedSource = createSharedSource(TerminusDurability.NONE);
    AmqpReceiver first = amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub");
    AmqpReceiver second = amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub|1");
    first.flow(1);
    second.flow(1);
    sendMessages(address.toString(), 2);

    assertNotNull(first.receive(5, TimeUnit.SECONDS));
    assertNotNull(second.receive(5, TimeUnit.SECONDS));

    QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(queueName).getBindable();
    assertEquals(2, queue.getConsumerCount());

    first.close();
    assertNotNull(server.getPostOffice().getBinding(queueName));

    second.close();
    // The queue was created on the server up front, so it must NOT be removed.
    assertNotNull(server.getPostOffice().getBinding(queueName));

    conn.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:29,代码来源:ClientDefinedMultiConsumerTest.java
示例17: test2ConsumersOnSharedVolatileAddressNoReceiverClose
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Attaches two receivers to one shared volatile subscription, checks that
 * each receives a message and that the backing queue reports two
 * consumers, then closes the connection without closing the receivers and
 * verifies the binding is removed within one second.
 */
@Test(timeout = 60000)
public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception {
    final SimpleString queueName = SimpleString.toSimpleString("myClientId.mySub:shared-volatile");

    AddressInfo addressInfo = new AddressInfo(address);
    addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
    server.addAddressInfo(addressInfo);

    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect("myClientId"));
    AmqpSession session = connection.createSession();

    Source source = createSharedSource(TerminusDurability.NONE);
    AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
    AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
    receiver.flow(1);
    receiver2.flow(1);
    sendMessages(address.toString(), 2);

    AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
    assertNotNull(amqpMessage);
    amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
    assertNotNull(amqpMessage);

    assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(queueName).getBindable()).getConsumerCount());
    assertNotNull(server.getPostOffice().getBinding(queueName));

    // Close the connection without closing the receivers, then check the
    // binding has been deleted; assert the wait result so a binding that is
    // never removed fails the test instead of passing silently.
    connection.close();
    assertTrue(Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return server.getPostOffice().getBinding(queueName) == null;
        }
    }, 1000));
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:31,代码来源:ClientDefinedMultiConsumerTest.java
示例18: test2ConsumersOnSharedVolatileAddressGlobal
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Attaches two receivers to one global shared volatile subscription,
 * checks that each receives a message and that the backing queue reports
 * two consumers, then verifies the binding survives the first close and is
 * removed within one second of the second close.
 */
@Test(timeout = 60000)
public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception {
    final SimpleString queueName = SimpleString.toSimpleString("mySub:shared-volatile:global");

    AddressInfo addressInfo = new AddressInfo(address);
    addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
    server.addAddressInfo(addressInfo);

    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect(false));
    AmqpSession session = connection.createSession();

    Source source = createSharedGlobalSource(TerminusDurability.NONE);
    AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
    AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
    receiver.flow(1);
    receiver2.flow(1);
    sendMessages(address.toString(), 2);

    AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
    assertNotNull(amqpMessage);
    amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
    assertNotNull(amqpMessage);

    assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(queueName).getBindable()).getConsumerCount());

    receiver.close();
    // One consumer is still attached, so the binding must remain.
    assertNotNull(server.getPostOffice().getBinding(queueName));

    receiver2.close();
    // Check it has been deleted; assert the wait result so a binding that
    // is never removed fails the test instead of passing silently.
    assertTrue(Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return server.getPostOffice().getBinding(queueName) == null;
        }
    }, 1000));

    connection.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:33,代码来源:ClientDefinedMultiConsumerTest.java
示例19: test2ConsumersOnSharedDurableAddress
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Attaches two receivers to one shared durable subscription, checks that
 * each receives a message and that the backing queue reports two
 * consumers, then verifies the binding survives the first close and is
 * gone after the second close.
 */
@Test(timeout = 60000)
public void test2ConsumersOnSharedDurableAddress() throws Exception {
    final SimpleString queueName = SimpleString.toSimpleString("myClientId.mySub");

    AddressInfo multicastAddress = new AddressInfo(address);
    multicastAddress.getRoutingTypes().add(RoutingType.MULTICAST);
    server.addAddressInfo(multicastAddress);

    AmqpClient amqpClient = createAmqpClient();
    AmqpConnection conn = addConnection(amqpClient.connect("myClientId"));
    AmqpSession amqpSession = conn.createSession();

    Source sharedSource = createSharedSource(TerminusDurability.CONFIGURATION);
    AmqpReceiver first = amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub");
    AmqpReceiver second = amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub|2");
    first.flow(1);
    second.flow(1);
    sendMessages(address.toString(), 2);

    assertNotNull(first.receive(5, TimeUnit.SECONDS));
    assertNotNull(second.receive(5, TimeUnit.SECONDS));

    QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(queueName).getBindable();
    assertEquals(2, queue.getConsumerCount());

    first.close();
    // One consumer is still attached, so the binding must remain.
    assertNotNull(server.getPostOffice().getBinding(queueName));

    second.close();
    // With the last receiver closed the binding must have been deleted.
    assertNull(server.getPostOffice().getBinding(queueName));

    conn.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:28,代码来源:ClientDefinedMultiConsumerTest.java
示例20: test2ConsumersOnSharedDurableAddressReconnect
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; //导入依赖的package包/类
/**
 * Attaches two receivers to one shared durable subscription and checks
 * delivery and consumer count, then drops the connection, reconnects and
 * verifies the binding is still there. After re-attaching both receivers,
 * the binding must survive the first close and be gone after the second.
 */
@Test(timeout = 60000)
public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception {
    final SimpleString queueName = SimpleString.toSimpleString("myClientId.mySub");

    AddressInfo multicastAddress = new AddressInfo(address);
    multicastAddress.getRoutingTypes().add(RoutingType.MULTICAST);
    server.addAddressInfo(multicastAddress);

    AmqpClient amqpClient = createAmqpClient();
    AmqpConnection conn = addConnection(amqpClient.connect("myClientId"));
    AmqpSession amqpSession = conn.createSession();

    Source sharedSource = createSharedSource(TerminusDurability.CONFIGURATION);
    AmqpReceiver first = amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub");
    AmqpReceiver second = amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub|2");
    first.flow(1);
    second.flow(1);
    sendMessages(address.toString(), 2);

    assertNotNull(first.receive(5, TimeUnit.SECONDS));
    assertNotNull(second.receive(5, TimeUnit.SECONDS));

    QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(queueName).getBindable();
    assertEquals(2, queue.getConsumerCount());

    // Drop the connection without closing the receivers, then reconnect.
    conn.close();
    conn = addConnection(amqpClient.connect("myClientId"));
    amqpSession = conn.createSession();

    // The durable subscription must still be bound after the reconnect.
    assertNotNull(server.getPostOffice().getBinding(queueName));

    first = amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub");
    second = amqpSession.createMulticastReceiver(sharedSource, "myReceiverID", "mySub|2");

    first.close();
    // One consumer is still attached, so the binding must remain.
    assertNotNull(server.getPostOffice().getBinding(queueName));

    second.close();
    // With the last receiver closed the binding must have been deleted.
    assertNull(server.getPostOffice().getBinding(queueName));

    conn.close();
}
开发者ID:apache,项目名称:activemq-artemis,代码行数:38,代码来源:ClientDefinedMultiConsumerTest.java
注:本文中的org.apache.qpid.proton.amqp.messaging.TerminusDurability类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论