本文整理汇总了Java中org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly类的典型用法代码示例。如果您正苦于以下问题:Java MessageListenerOrderly类的具体用法?Java MessageListenerOrderly怎么用?Java MessageListenerOrderly使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
MessageListenerOrderly类属于org.apache.rocketmq.client.consumer.listener包,在下文中一共展示了MessageListenerOrderly类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: ConsumeMessageOrderlyService
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(//
this.defaultMQPushConsumer.getConsumeThreadMin(), //
this.defaultMQPushConsumer.getConsumeThreadMax(), //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.consumeRequestQueue, //
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
开发者ID:lirenzuo,项目名称:rocketmq-rocketmq-all-4.1.0-incubating,代码行数:19,代码来源:ConsumeMessageOrderlyService.java
示例2: testPullMessage_SuccessWithOrderlyService
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Checks that a pulled message reaches a {@link MessageListenerOrderly} when the push
 * consumer is switched to a {@code ConsumeMessageOrderlyService}.
 *
 * <p>The listener captures the first delivered message and releases a latch; the test then
 * compares the captured topic and body with the expected values.
 *
 * @throws Exception if the test setup or the wait is interrupted or fails
 */
@Test
public void testPullMessage_SuccessWithOrderlyService() throws Exception {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    final MessageExt[] messageExts = new MessageExt[1];
    MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            messageExts[0] = msgs.get(0);
            countDownLatch.countDown();
            // Report an explicit status instead of null so the service does not have to guess.
            return ConsumeOrderlyStatus.SUCCESS;
        }
    };
    pushConsumer.registerMessageListener(listenerOrderly);
    pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), listenerOrderly));
    pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true);
    pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();

    PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
    pullMessageService.executePullRequestLater(createPullRequest(), 100);

    // await() returns false on timeout; fail here rather than with an NPE on messageExts[0] below.
    assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
    assertThat(messageExts[0]).isNotNull();
    assertThat(messageExts[0].getTopic()).isEqualTo(topic);
    assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
开发者ID:lirenzuo,项目名称:rocketmq-rocketmq-all-4.1.0-incubating,代码行数:25,代码来源:DefaultMQPushConsumerTest.java
示例3: main
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Starts a push consumer that consumes {@code TopicTest} (tags TagA, TagC, TagD) in order
 * and prints every received batch.
 *
 * @param args unused
 * @throws MQClientException if subscribing or starting the consumer fails
 */
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_message_consumer_group");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    // Orderly consumption: no two consumers ever share a queue, and when there are fewer
    // consumers than queues a single consumer handles several queues.
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            //context.setAutoCommit(false);
            // Pass runtime values as arguments: concatenating them into the format string
            // makes printf throw as soon as a message contains a '%' character.
            System.out.printf("%s,%s Receive New Messages: %s%n",
                System.currentTimeMillis(), Thread.currentThread().getName(), msgs);
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
开发者ID:javahongxi,项目名称:whatsmars,代码行数:23,代码来源:Consumer.java
示例4: ConsumeMessageOrderlyService
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Wires the orderly consume service to its push consumer and creates its executors.
 *
 * @param defaultMQPushConsumerImpl push consumer implementation that owns this service
 * @param messageListener orderly listener retained by this service
 */
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerOrderly messageListener) {
    // Collaborators handed in by the caller.
    this.messageListener = messageListener;
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;

    // Values derived from the push consumer.
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();

    // Consume pool: min/max threads come from the consumer settings; keep-alive is 1000 * 60 ms.
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60, TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));

    // Single-threaded scheduler with its own thread-name prefix.
    this.scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
开发者ID:apache,项目名称:rocketmq,代码行数:20,代码来源:ConsumeMessageOrderlyService.java
示例5: getMessageListener
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Exercises {@code RocketmqMessageConsumerListener#getMessageListener()}.
 *
 * <p>A freshly constructed listener must expose non-null batch/model values, no consumer or
 * decoder, and an orderly message listener. After batch, model, consumer and decoder are
 * set, the returned message listener must be a concurrent one.
 *
 * @throws Exception if any call on the listener fails
 */
@Test
public void getMessageListener() throws Exception {
    final RocketmqMessageConsumerListener<String> consumerListener = new RocketmqMessageConsumerListener<String>();

    // State right after construction.
    Assert.assertNotNull(consumerListener.getBatch());
    Assert.assertNotNull(consumerListener.getModel());
    Assert.assertNull(consumerListener.getConsumer());
    Assert.assertNull(consumerListener.getMessageDecoder());
    final boolean orderlyAtStart = consumerListener.getMessageListener() instanceof MessageListenerOrderly;
    Assert.assertTrue(orderlyAtStart);

    // Reconfigure, then ask for the message listener again.
    consumerListener.setBatch(BATCH.NON_BATCH.name());
    consumerListener.setModel(MODEL.MODEL_2.name());
    consumerListener.setConsumer(new MessageConsumer<String>());
    consumerListener.setMessageDecoder(new RocketmqMessageDecoderDemo());
    final boolean concurrentAfterSetup = consumerListener.getMessageListener() instanceof MessageListenerConcurrently;
    Assert.assertTrue(concurrentAfterSetup);
}
开发者ID:DarkPhoenixs,项目名称:message-queue-client-framework,代码行数:20,代码来源:RocketmqMessageConsumerListenerTest.java
示例6: receiveOrderMessage
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Starts a push consumer that receives messages of the configured topic in order.
 *
 * <p>The registered listener disables auto-commit, prints the body of the first message of
 * each batch plus a running invocation count, and returns a status chosen from that count.
 * An {@link MQClientException} raised while subscribing or starting is printed, not rethrown.
 */
public void receiveOrderMessage(){
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMQConfiguration.ROCKETMQ_GROUP);
    consumer.setNamesrvAddr(RocketMQConfiguration.ROCKETMQ_NAMESRV);
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    try {
        consumer.subscribe(RocketMQConfiguration.ROCKETMQ_TOPIC, "*");
        // Listen for incoming messages.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            private final AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                consumeOrderlyContext.setAutoCommit(false);
                // The message body is data, not a format string: a '%' in it would make
                // printf throw if it were concatenated into the format.
                System.out.printf("%s Receive New Messages: %s%n",
                    Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                // Read the counter exactly once so every branch below tests the same value,
                // even if another thread increments it in the meantime.
                final long times = this.consumeTimes.incrementAndGet();
                System.out.println(times);
                if ((times % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((times % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((times % 4) == 0) {
                    // NOTE(review): never reached as written, multiples of 4 already match the % 2 check.
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((times % 5) == 0) {
                    consumeOrderlyContext.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    } catch (MQClientException e) {
        e.printStackTrace();
    }
}
开发者ID:mumudemo,项目名称:mumu-rocketmq,代码行数:33,代码来源:RocketMQOrderConsumer.java
示例7: main
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Starts a push consumer that consumes {@code TopicTest} (tags TagA, TagC, TagD) in order.
 *
 * <p>The listener disables auto-commit, prints each received batch and returns a status
 * chosen from a running invocation count.
 *
 * @param args unused
 * @throws MQClientException if subscribing or starting the consumer fails
 */
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        private final AtomicLong consumeTimes = new AtomicLong(0);

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            context.setAutoCommit(false);
            // Pass runtime values as arguments: concatenating them into the format string
            // makes printf throw as soon as a message contains a '%' character.
            System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);

            // Read the counter exactly once so every branch below tests the same value,
            // even if another thread increments it in the meantime.
            final long times = this.consumeTimes.incrementAndGet();
            if ((times % 2) == 0) {
                return ConsumeOrderlyStatus.SUCCESS;
            } else if ((times % 3) == 0) {
                return ConsumeOrderlyStatus.ROLLBACK;
            } else if ((times % 4) == 0) {
                // NOTE(review): never reached as written, multiples of 4 already match the % 2 check.
                return ConsumeOrderlyStatus.COMMIT;
            } else if ((times % 5) == 0) {
                context.setSuspendCurrentQueueTimeMillis(3000);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
开发者ID:lirenzuo,项目名称:rocketmq-rocketmq-all-4.1.0-incubating,代码行数:34,代码来源:Consumer.java
示例8: main
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Starts a push consumer that consumes {@code TopicTest} (tags TagA, TagC, TagD) in order.
 *
 * <p>The listener disables auto-commit, prints each received batch and returns a status
 * chosen from a running invocation count.
 *
 * @param args unused
 * @throws MQClientException if subscribing or starting the consumer fails
 */
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        private final AtomicLong consumeTimes = new AtomicLong(0);

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            context.setAutoCommit(false);
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

            // Read the counter exactly once so every branch below tests the same value,
            // even if another thread increments it in the meantime.
            final long times = this.consumeTimes.incrementAndGet();
            if ((times % 2) == 0) {
                return ConsumeOrderlyStatus.SUCCESS;
            } else if ((times % 3) == 0) {
                return ConsumeOrderlyStatus.ROLLBACK;
            } else if ((times % 4) == 0) {
                // NOTE(review): never reached as written, multiples of 4 already match the % 2 check.
                return ConsumeOrderlyStatus.COMMIT;
            } else if ((times % 5) == 0) {
                context.setSuspendCurrentQueueTimeMillis(3000);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
开发者ID:apache,项目名称:rocketmq,代码行数:34,代码来源:Consumer.java
示例9: registerMessageListener
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Registers the callback that is executed on message arrival when consuming in order.
 *
 * <p>The listener is stored on this consumer and also passed on to the underlying
 * push consumer implementation.
 *
 * @param messageListener message handling callback.
 */
@Override
public void registerMessageListener(MessageListenerOrderly messageListener) {
    // Keep a reference here, then hand the same listener to the implementation.
    this.messageListener = messageListener;
    defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
开发者ID:lirenzuo,项目名称:rocketmq-rocketmq-all-4.1.0-incubating,代码行数:11,代码来源:DefaultMQPushConsumer.java
示例10: registerMessageListener
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; //导入依赖的package包/类
/**
 * Registers a listener for orderly message consumption.
 *
 * @param messageListener the orderly message listener to register
 */
void registerMessageListener(final MessageListenerOrderly messageListener);
开发者ID:lirenzuo,项目名称:rocketmq-rocketmq-all-4.1.0-incubating,代码行数:2,代码来源:MQPushConsumer.java
注:本文中的org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论