本文整理汇总了Java中com.alibaba.rocketmq.common.message.MessageExt类的典型用法代码示例。如果您正苦于以下问题:Java MessageExt类的具体用法?Java MessageExt怎么用?Java MessageExt使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
MessageExt类属于com.alibaba.rocketmq.common.message包,在下文中一共展示了MessageExt类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public static void main(String[] args){
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PushTopic", "push");
//程序第一次启动从消息队列头取数据
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println(msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
开发者ID:youngMen1,项目名称:-Spring-SpringMVC-Mybatis-,代码行数:27,代码来源:Consumer.java
示例2: makeMessageToCosumeAgain
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
this.msgTreeMapTemp.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
}
finally {
this.lockTreeMap.writeLock().unlock();
}
}
catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);
}
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:18,代码来源:ProcessQueue.java
示例3: consumeMessageDirectly
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumeMessageDirectlyResultRequestHeader requestHeader =
(ConsumeMessageDirectlyResultRequestHeader) request
.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
ConsumeMessageDirectlyResult result =
this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());
if (null != result) {
response.setCode(ResponseCode.SUCCESS);
response.setBody(result.encode());
}
else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
}
return response;
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:23,代码来源:ClientRemotingProcessor.java
示例4: viewMessage
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis) throws RemotingException,
MQBrokerException, InterruptedException {
ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
requestHeader.setOffset(phyoffset);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
MessageExt messageExt = MessageDecoder.decode(byteBuffer);
return messageExt;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:21,代码来源:MQClientAPIImpl.java
示例5: checkLocalTransactionState
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("server checking TrMsg " + msg.toString());
int value = transactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
}
else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:18,代码来源:TransactionCheckListenerImpl.java
示例6: main
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:22,代码来源:Consumer.java
示例7: main
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"key113",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
QueryResult queryMessage =
producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
for (MessageExt m : queryMessage.getMessageList()) {
System.out.println(m);
}
}
}
catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:27,代码来源:TestProducer.java
示例8: main
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
filterCode);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:21,代码来源:Consumer.java
示例9: preProcess
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void preProcess(MessageListenerOrderly t, Object proxy, Method method, Object[] args) {
List<MessageExt> msgs = (List<MessageExt>) args[0];
String url = address + "/" + msgs.get(0).getTopic();
Map<String, Object> params = new HashMap<String, Object>();
params.put(CaptureConstants.INFO_CLIENT_REQUEST_URL, url);
params.put(CaptureConstants.INFO_CLIENT_REQUEST_ACTION, "Consumer." + method.getName());
params.put(CaptureConstants.INFO_CLIENT_APPID, applicationId);
params.put(CaptureConstants.INFO_CLIENT_TYPE, "rabbitmq.client");
params.put(CaptureConstants.INFO_CAPCONTEXT_TAG, method.getName());
if (logger.isDebugable()) {
logger.debug("Invoke START:" + url + ",op=Consumer." + method.getName(), null);
}
UAVServer.instance().runMonitorCaptureOnServerCapPoint(CaptureConstants.CAPPOINT_APP_CLIENT,
Monitor.CapturePhase.PRECAP, params);
}
开发者ID:uavorg,项目名称:uavstack,代码行数:22,代码来源:RocketmqIT.java
示例10: handleRocketMqMessage
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
private void handleRocketMqMessage(List<MessageExt> msgs, MQMessageListener messageListner) {
for (MessageExt msg : msgs) {
MQMessage ceMessage = new MQMessage();
byte[] msgBody = msg.getBody();
ceMessage.setMessage(msgBody);
try {
messageListner.handle(ceMessage);
}
catch (Exception e) {
log.err(this, "MsgId=" + msg.getMsgId() + ",Topic=" + msg.getTopic() + ",MsgBornTimeStamp="
+ msg.getBornTimestamp() + "处理异常:" + e.getMessage(), e);
}
}
}
开发者ID:uavorg,项目名称:uavstack,代码行数:18,代码来源:RocketMQConsumer.java
示例11: consume
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public void consume(String topic, String subExpression) throws MQClientException {
consumer.subscribe(topic, "*");
// consumer.subscribe(topic, "TagA || TagB");
// consumer.subscribe(topic, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for(MessageExt msg : msgs) {
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
}
else if (msg.getTags() != null && msg.getTags().equals("TagB")) {
// 执行TagB的消费
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
开发者ID:TFdream,项目名称:mq-in-action,代码行数:27,代码来源:MqPushConsumer.java
示例12: consumeMessage
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// TODO Auto-generated method stub
for (MessageExt msg : msgs) {
byte[] body = msg.getBody();
if (body.length == 2 && body[0] == 0 && body[1] == 0) {
LOG.error("Young:Got the end signal");
_collector.emit("stop",new Values("stop"));
continue;
}
if (msg.getTopic().equals(RaceConfig.MqPayTopic)) {
return doPayTopic(body);
}else if (msg.getTopic().equals(RaceConfig.MqTaobaoTradeTopic)) {
putTaobaoTradeToTair(body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else if (msg.getTopic().equals(RaceConfig.MqTmallTradeTopic)) {
putTmallTradeToTair(body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
开发者ID:yangliguang,项目名称:preliminary.demo,代码行数:27,代码来源:RaceSentenceSpout.java
示例13: consumeMessage
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
byte[] body = msg.getBody();
if (body.length == 2 && body[0] == 0 && body[1] == 0) {
LOG.error("Young:Got the end signal");
_collector.emit("stop",new Values("stop"));
continue;
}
if (msg.getTopic().equals(RaceConfig.MqPayTopic)) {
return doPayTopic(body);
}else if (msg.getTopic().equals(RaceConfig.MqTaobaoTradeTopic)) {
putTaobaoTradeToTair(body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else if (msg.getTopic().equals(RaceConfig.MqTmallTradeTopic)) {
putTmallTradeToTair(body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
开发者ID:yangliguang,项目名称:preliminary.demo,代码行数:25,代码来源:SpoutLocal.java
示例14: testScheduledMessageConsumer
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
@Before
public void testScheduledMessageConsumer() throws Exception {
// Instantiate message consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// Subscribe topics
consumer.subscribe("TestTopic", "*");
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch consumer
consumer.start();
}
开发者ID:dzh,项目名称:coca,代码行数:22,代码来源:TestScheduledMessageProducer.java
示例15: testBroadcastConsumer
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
@Before
public void testBroadcastConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// set to broadcast mode
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
开发者ID:dzh,项目名称:coca,代码行数:24,代码来源:TestBroadcastProducer.java
示例16: consumeMessage
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
@Override
public ConsumeConcurrentlyStatus consumeMessage(String strBody, MessageExt msg,
ConsumeConcurrentlyContext context)
{
// TODO 待完善 日志系统
for (Map<String, String> map : matching)
{
Map<String, String> params = new HashMap<String, String>();
params.put("Topic", msg.getTopic());
params.put("Tags", msg.getTags());
if (externalCall == null) params.put("Body", strBody);
else params.put("Body", externalCall.MessageConsumer(strBody, msg, context));
return sendMqTags(map, msg.getTags(), params);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
开发者ID:atliwen,项目名称:rocketMqCurrency,代码行数:21,代码来源:ExternalCallConcurrentlyStatus.java
示例17: viewMessage
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis) throws RemotingException,
MQBrokerException, InterruptedException {
ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
requestHeader.setOffset(phyoffset);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
MessageExt messageExt = MessageDecoder.decode(byteBuffer);
return messageExt;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:21,代码来源:MQClientAPIImpl.java
示例18: testStartupTwice
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
@Test
public void testStartupTwice() throws MQClientException {
DefaultMQPushConsumer consumer = getConsumer("S_fundmng_demo_producer", "TopicTest-fundmng");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context) {
System.out.println("Consumer1:" + JSON.toJSONString(msgs));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("start 1");
consumer.start();
System.out.println("start 2");
LockSupport.park();
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:18,代码来源:DoubleConsumerTest.java
示例19: consumed
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
public boolean consumed(final MessageExt msg, final String group) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
ConsumeStats cstats = this.examineConsumeStats(group);
ClusterInfo ci = this.examineBrokerClusterInfo();
Iterator<Entry<MessageQueue, OffsetWrapper>> it = cstats.getOffsetTable().entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, OffsetWrapper> next = it.next();
MessageQueue mq = next.getKey();
if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
if (brokerData != null) {
String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
return true;
}
}
}
}
}
return false;
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:26,代码来源:DefaultMQAdminExtImpl.java
示例20: createBodyFile
import com.alibaba.rocketmq.common.message.MessageExt; //导入依赖的package包/类
private static String createBodyFile(MessageExt msg) throws IOException {
DataOutputStream dos = null;
try {
String bodyTmpFilePath = "/tmp/rocketmq/msgbodys";
File file = new File(bodyTmpFilePath);
if (!file.exists()) {
file.mkdirs();
}
bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId();
dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath));
dos.write(msg.getBody());
return bodyTmpFilePath;
}
finally {
if (dos != null)
dos.close();
}
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:20,代码来源:QueryMsgByIdSubCommand.java
注:本文中的com.alibaba.rocketmq.common.message.MessageExt类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论