• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java ConsumeStats类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中com.alibaba.rocketmq.common.admin.ConsumeStats的典型用法代码示例。如果您正苦于以下问题:Java ConsumeStats类的具体用法?Java ConsumeStats怎么用?Java ConsumeStats使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ConsumeStats类属于com.alibaba.rocketmq.common.admin包,在下文中一共展示了ConsumeStats类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: getConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
        MQBrokerException {
    GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    requestHeader.setTopic(topic);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class);
        return consumeStats;
    }
    default:
        break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:22,代码来源:MQClientAPIImpl.java


示例2: queryConsumeStatsList

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
@Override
public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) {
    ConsumeStats consumeStats = null;
    try {
        consumeStats = mqAdminExt.examineConsumeStats(groupName); // todo  ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) can use
    } catch (Exception e) {
        throw propagate(e);
    }
    List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
        @Override
        public boolean apply(MessageQueue o) {
            return StringUtils.isBlank(topic) || o.getTopic().equals(topic);
        }
    }));
    Collections.sort(mqList);
    List<TopicConsumerInfo> topicConsumerInfoList = Lists.newArrayList();
    TopicConsumerInfo nowTopicConsumerInfo = null;
    for (MessageQueue mq : mqList) {
        if (nowTopicConsumerInfo == null || (!StringUtils.equals(mq.getTopic(), nowTopicConsumerInfo.getTopic()))) {
            nowTopicConsumerInfo = new TopicConsumerInfo(mq.getTopic());
            topicConsumerInfoList.add(nowTopicConsumerInfo);
        }
        nowTopicConsumerInfo.appendQueueStatInfo(QueueStatInfo.fromOffsetTableEntry(mq, consumeStats.getOffsetTable().get(mq)));
    }
    return topicConsumerInfoList;
}
 
开发者ID:didapinchegit,项目名称:rocket-console,代码行数:27,代码来源:ConsumerServiceImpl.java


示例3: getConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
        MQBrokerException {
    GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    requestHeader.setTopic(topic);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class);
            return consumeStats;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:22,代码来源:MQClientAPIImpl.java


示例4: fetchBrokerNameSetBySubscriptionGroup

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
@Override
public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group){
    Set<String> brokerNameSet = Sets.newHashSet();
    ConsumeStats consumeStats = null;
    try {
        consumeStats = mqAdminExt.examineConsumeStats(group);
    } catch (Exception err){
        throw propagate(err);
    }
    for(MessageQueue messageQueue : consumeStats.getOffsetTable().keySet()){
        brokerNameSet.add(messageQueue.getBrokerName());
    }
    return brokerNameSet;

}
 
开发者ID:didapinchegit,项目名称:rocket-console,代码行数:16,代码来源:ConsumerServiceImpl.java


示例5: examineConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
@Override
public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException,
        MQClientException, InterruptedException, MQBrokerException {
    String retryTopic = MixAll.getRetryTopic(consumerGroup);
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
    ConsumeStats result = new ConsumeStats();

    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        String addr = bd.selectBrokerAddr();
        if (addr != null) {
            // 由于查询时间戳会产生IO操作,可能会耗时较长,所以超时时间设置为15s
            ConsumeStats consumeStats =
                    this.mQClientFactory.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, 15000);
            result.getOffsetTable().putAll(consumeStats.getOffsetTable());
            long value = result.getConsumeTps() + consumeStats.getConsumeTps();
            result.setConsumeTps(value);
        }
    }

    if (result.getOffsetTable().isEmpty()) {
        throw new MQClientException(
            "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message",
            null);
    }

    return result;
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:28,代码来源:DefaultMQAdminExtImpl.java


示例6: getConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup,
        final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    // 添加虚拟运行环境相关的projectGroupPrefix
    String consumerGroupWithProjectGroup = consumerGroup;
    if (!UtilAll.isBlank(projectGroupPrefix)) {
        consumerGroupWithProjectGroup =
                VirtualEnvUtil.buildWithProjectGroup(consumerGroup, projectGroupPrefix);
    }

    GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
    requestHeader.setConsumerGroup(consumerGroupWithProjectGroup);

    RemotingCommand request =
            RemotingCommand.createRequestCommand(MQRequestCode.GET_CONSUME_STATS_VALUE, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    switch (response.getCode()) {
    case ResponseCode.SUCCESS_VALUE: {
        ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class);
        // 清除虚拟运行环境相关的projectGroupPrefix
        if (!UtilAll.isBlank(projectGroupPrefix)) {
            HashMap<MessageQueue, OffsetWrapper> newTopicOffsetMap =
                    new HashMap<MessageQueue, OffsetWrapper>();
            for (Map.Entry<MessageQueue, OffsetWrapper> messageQueue : consumeStats.getOffsetTable()
                .entrySet()) {
                MessageQueue key = messageQueue.getKey();
                key.setTopic(VirtualEnvUtil.clearProjectGroup(key.getTopic(), projectGroupPrefix));
                newTopicOffsetMap.put(key, messageQueue.getValue());
            }
            consumeStats.setOffsetTable(newTopicOffsetMap);
        }

        return consumeStats;
    }
    default:
        break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
开发者ID:brucechan0921,项目名称:RocketMQ-3.0.8,代码行数:41,代码来源:MQClientAPIImpl.java


示例7: examineConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
@Override
public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
        MQBrokerException {
    return examineConsumeStats(consumerGroup, null);
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:6,代码来源:DefaultMQAdminExt.java


示例8: examineConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
ConsumeStats examineConsumeStats(final String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:3,代码来源:MQAdminExt.java


示例9: computeUndoneMsgs

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
private void computeUndoneMsgs(final UndoneMsgs undoneMsgs, final ConsumeStats consumeStats) {
    long total = 0;
    long singleMax = 0;
    long delayMax = 0;
    Iterator<Entry<MessageQueue, OffsetWrapper>> it = consumeStats.getOffsetTable().entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, OffsetWrapper> next = it.next();
        MessageQueue mq = next.getKey();
        OffsetWrapper ow = next.getValue();
        long diff = ow.getBrokerOffset() - ow.getConsumerOffset();

        if (diff > singleMax) {
            singleMax = diff;
        }

        if (diff > 0) {
            total += diff;
        }

        // Delay
        if (ow.getLastTimestamp() > 0) {
            try {
                long maxOffset = this.defaultMQPullConsumer.maxOffset(mq);
                if (maxOffset > 0) {
                    PullResult pull = this.defaultMQPullConsumer.pull(mq, "*", maxOffset - 1, 1);
                    switch (pull.getPullStatus()) {
                    case FOUND:
                        long delay =
                                pull.getMsgFoundList().get(0).getStoreTimestamp() - ow.getLastTimestamp();
                        if (delay > delayMax) {
                            delayMax = delay;
                        }
                        break;
                    case NO_MATCHED_MSG:
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
            }
            catch (Exception e) {
            }
        }
    }

    undoneMsgs.setUndoneMsgsTotal(total);
    undoneMsgs.setUndoneMsgsSingleMQ(singleMax);
    undoneMsgs.setUndoneMsgsDelayTimeMills(delayMax);
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:52,代码来源:MonitorService.java


示例10: getConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetConsumeStatsRequestHeader requestHeader =
            (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);

    ConsumeStats consumeStats = new ConsumeStats();

    Set<String> topics = new HashSet<String>();
    if (UtilAll.isBlank(requestHeader.getTopic())) {
        topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
    }
    else {
        topics.add(requestHeader.getTopic());
    }

    for (String topic : topics) {
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.warn("consumeStats, topic config not exist, {}", topic);
            continue;
        }
        {
            SubscriptionData findSubscriptionData =
                    this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
            if (null == findSubscriptionData //
                    && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
                log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
                continue;
            }
        }

        for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setTopic(topic);
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setQueueId(i);

            OffsetWrapper offsetWrapper = new OffsetWrapper();

            long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
            if (brokerOffset < 0)
                brokerOffset = 0;

            long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
                requestHeader.getConsumerGroup(),//
                topic,//
                i);
            if (consumerOffset < 0)
                consumerOffset = 0;

            offsetWrapper.setBrokerOffset(brokerOffset);
            offsetWrapper.setConsumerOffset(consumerOffset);

            long timeOffset = consumerOffset - 1;
            if (timeOffset >= 0) {
                long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
                if (lastTimestamp > 0) {
                    offsetWrapper.setLastTimestamp(lastTimestamp);
                }
            }

            consumeStats.getOffsetTable().put(mq, offsetWrapper);
        }

        double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic);

        consumeTps += consumeStats.getConsumeTps();
        consumeStats.setConsumeTps(consumeTps);
    }

    byte[] body = consumeStats.encode();
    response.setBody(body);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:77,代码来源:AdminBrokerProcessor.java


示例11: getConsumeStatsList

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
public List<Map<String, List<ConsumeStats>>> getConsumeStatsList() {
    return consumeStatsList;
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:4,代码来源:ConsumeStatsList.java


示例12: setConsumeStatsList

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
public void setConsumeStatsList(List<Map<String, List<ConsumeStats>>> consumeStatsList) {
    this.consumeStatsList = consumeStatsList;
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:4,代码来源:ConsumeStatsList.java


示例13: examineConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
@Override
public ConsumeStats examineConsumeStats(String consumerGroup)
        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
    return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
}
 
开发者ID:didapinchegit,项目名称:rocket-console,代码行数:6,代码来源:MQAdminExtImpl.java


示例14: getConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetConsumeStatsRequestHeader requestHeader =
            (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);

    ConsumeStats consumeStats = new ConsumeStats();

    Set<String> topics = new HashSet<String>();
    if (UtilAll.isBlank(requestHeader.getTopic())) {
        topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
    } else {
        topics.add(requestHeader.getTopic());
    }

    for (String topic : topics) {
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.warn("consumeStats, topic config not exist, {}", topic);
            continue;
        }
        {
            SubscriptionData findSubscriptionData =
                    this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
            if (null == findSubscriptionData //
                    && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
                log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
                continue;
            }
        }

        for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setTopic(topic);
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setQueueId(i);

            OffsetWrapper offsetWrapper = new OffsetWrapper();

            long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
            if (brokerOffset < 0)
                brokerOffset = 0;

            long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
                    requestHeader.getConsumerGroup(),//
                    topic,//
                    i);
            if (consumerOffset < 0)
                consumerOffset = 0;

            offsetWrapper.setBrokerOffset(brokerOffset);
            offsetWrapper.setConsumerOffset(consumerOffset);

            long timeOffset = consumerOffset - 1;
            if (timeOffset >= 0) {
                long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
                if (lastTimestamp > 0) {
                    offsetWrapper.setLastTimestamp(lastTimestamp);
                }
            }

            consumeStats.getOffsetTable().put(mq, offsetWrapper);
        }

        double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic);

        consumeTps += consumeStats.getConsumeTps();
        consumeStats.setConsumeTps(consumeTps);
    }

    byte[] body = consumeStats.encode();
    response.setBody(body);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:76,代码来源:AdminBrokerProcessor.java


示例15: getConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup,
        final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    // 添加虚拟运行环境相关的projectGroupPrefix
    String consumerGroupWithProjectGroup = consumerGroup;
    if (!UtilAll.isBlank(projectGroupPrefix)) {
        consumerGroupWithProjectGroup =
                VirtualEnvUtil.buildWithProjectGroup(consumerGroup, projectGroupPrefix);
    }

    GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
    requestHeader.setConsumerGroup(consumerGroupWithProjectGroup);

    RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class);
        // 清除虚拟运行环境相关的projectGroupPrefix
        if (!UtilAll.isBlank(projectGroupPrefix)) {
            HashMap<MessageQueue, OffsetWrapper> newTopicOffsetMap =
                    new HashMap<MessageQueue, OffsetWrapper>();
            for (Map.Entry<MessageQueue, OffsetWrapper> messageQueue : consumeStats.getOffsetTable()
                .entrySet()) {
                MessageQueue key = messageQueue.getKey();
                key.setTopic(VirtualEnvUtil.clearProjectGroup(key.getTopic(), projectGroupPrefix));
                newTopicOffsetMap.put(key, messageQueue.getValue());
            }
            consumeStats.setOffsetTable(newTopicOffsetMap);
        }

        return consumeStats;
    }
    default:
        break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:42,代码来源:MQClientAPIImpl.java


示例16: examineConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
@Override
public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException,
        MQClientException, InterruptedException, MQBrokerException {
    return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup);
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:6,代码来源:DefaultMQAdminExt.java


示例17: resetOffsetByTimestampOld

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
@Override
public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
        boolean force) throws RemotingException, MQBrokerException, InterruptedException,
        MQClientException {
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();
    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        String addr = bd.selectBrokerAddr();
        if (addr != null) {
            // 根据 consumerGroup 查找对应的 mq
            ConsumeStats consumeStats =
                    this.mQClientFactory.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, 3000);

            // 根据 topic 过滤不需要的 mq
            for (Map.Entry<MessageQueue, OffsetWrapper> entry : consumeStats.getOffsetTable().entrySet()) {
                MessageQueue queue = entry.getKey();
                OffsetWrapper offsetWrapper = entry.getValue();
                if (topic.equals(queue.getTopic())) {
                    // 根据 timestamp 查找对应的offset
                    long offset =
                            this.mQClientFactory.getMQClientAPIImpl().searchOffset(addr, topic,
                                queue.getQueueId(), timestamp, 3000);
                    // 构建按时间回溯消费进度
                    RollbackStats rollbackStats = new RollbackStats();
                    rollbackStats.setBrokerName(bd.getBrokerName());
                    rollbackStats.setQueueId(queue.getQueueId());
                    rollbackStats.setBrokerOffset(offsetWrapper.getBrokerOffset());
                    rollbackStats.setConsumerOffset(offsetWrapper.getConsumerOffset());
                    rollbackStats.setTimestampOffset(offset);
                    rollbackStats.setRollbackOffset(offsetWrapper.getConsumerOffset());
                    // 更新 offset
                    if (force || offset <= offsetWrapper.getConsumerOffset()) {
                        rollbackStats.setRollbackOffset(offset);
                        UpdateConsumerOffsetRequestHeader requestHeader =
                                new UpdateConsumerOffsetRequestHeader();
                        requestHeader.setConsumerGroup(consumerGroup);
                        requestHeader.setTopic(topic);
                        requestHeader.setQueueId(queue.getQueueId());
                        requestHeader.setCommitOffset(offset);
                        this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(addr,
                            requestHeader, 3000);
                    }
                    rollbackStatsList.add(rollbackStats);
                }
            }
        }
    }
    return rollbackStatsList;
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:50,代码来源:DefaultMQAdminExtImpl.java


示例18: getConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetConsumeStatsRequestHeader requestHeader =
            (GetConsumeStatsRequestHeader) request
                .decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);

    ConsumeStats consumeStats = new ConsumeStats();

    Set<String> topics =
            this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(
                requestHeader.getConsumerGroup());

    for (String topic : topics) {
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.warn("consumeStats, topic config not exist, {}", topic);
            continue;
        }

        SubscriptionData findSubscriptionData =
                this.brokerController.getConsumerManager().findSubscriptionData(
                    requestHeader.getConsumerGroup(), topic);
        if (null == findSubscriptionData) {
            log.warn("consumeStats, the consumer group[{}], topic[{}] not exist",
                requestHeader.getConsumerGroup(), topic);
            continue;
        }

        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setTopic(topic);
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setQueueId(i);

            OffsetWrapper offsetWrapper = new OffsetWrapper();

            long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
            if (brokerOffset < 0)
                brokerOffset = 0;

            long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
                requestHeader.getConsumerGroup(),//
                topic,//
                i);
            if (consumerOffset < 0)
                consumerOffset = 0;

            offsetWrapper.setBrokerOffset(brokerOffset);
            offsetWrapper.setConsumerOffset(consumerOffset);

            // 查询消费者最后一条消息对应的时间戳
            long timeOffset = consumerOffset - 1;
            if (timeOffset >= 0) {
                long lastTimestamp =
                        this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i,
                            timeOffset);
                if (lastTimestamp > 0) {
                    offsetWrapper.setLastTimestamp(lastTimestamp);
                }
            }

            consumeStats.getOffsetTable().put(mq, offsetWrapper);
        }

        long consumeTps =
                this.brokerController.getConsumerOffsetManager().computePullTPS(topic,
                    requestHeader.getConsumerGroup());

        consumeTps += consumeStats.getConsumeTps();
        consumeStats.setConsumeTps(consumeTps);
    }

    byte[] body = consumeStats.encode();
    response.setBody(body);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:80,代码来源:AdminBrokerProcessor.java


示例19: getConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetConsumeStatsRequestHeader requestHeader =
            (GetConsumeStatsRequestHeader) request
                .decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);

    ConsumeStats consumeStats = new ConsumeStats();

    Set<String> topics =
            this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(
                requestHeader.getConsumerGroup());

    for (String topic : topics) {
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.warn("consumeStats, topic config not exist, {}", topic);
            continue;
        }

        SubscriptionData findSubscriptionData =
                this.brokerController.getConsumerManager().findSubscriptionData(
                    requestHeader.getConsumerGroup(), topic);
        if (null == findSubscriptionData) {
            log.warn("consumeStats, the consumer group[{}], topic[{}] not exist",
                requestHeader.getConsumerGroup(), topic);
            continue;
        }

        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setTopic(topic);
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setQueueId(i);

            OffsetWrapper offsetWrapper = new OffsetWrapper();

            long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
            if (brokerOffset < 0)
                brokerOffset = 0;

            long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
                requestHeader.getConsumerGroup(),//
                topic,//
                i);
            if (consumerOffset < 0)
                consumerOffset = 0;

            offsetWrapper.setBrokerOffset(brokerOffset);
            offsetWrapper.setConsumerOffset(consumerOffset);

            // 查询消费者最后一条消息对应的时间戳
            long timeOffset = consumerOffset - 1;
            if (timeOffset >= 0) {
                long lastTimestamp =
                        this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i,
                            timeOffset);
                if (lastTimestamp > 0) {
                    offsetWrapper.setLastTimestamp(lastTimestamp);
                }
            }

            consumeStats.getOffsetTable().put(mq, offsetWrapper);
        }

        long consumeTps =
                this.brokerController.getConsumerOffsetManager().computePullTPS(topic,
                    requestHeader.getConsumerGroup());

        consumeTps += consumeStats.getConsumeTps();
        consumeStats.setConsumeTps(consumeTps);
    }

    byte[] body = consumeStats.encode();
    response.setBody(body);
    response.setCode(ResponseCode.SUCCESS_VALUE);
    response.setRemark(null);
    return response;
}
 
开发者ID:brucechan0921,项目名称:RocketMQ-3.0.8,代码行数:80,代码来源:AdminBrokerProcessor.java


示例20: examineConsumeStats

import com.alibaba.rocketmq.common.admin.ConsumeStats; //导入依赖的package包/类
/**
 * 查询消费进度
 * 
 * @param consumerGroup
 * @return
 * @throws InterruptedException
 * @throws MQClientException
 * @throws RemotingException
 * @throws MQBrokerException
 */
public ConsumeStats examineConsumeStats(final String consumerGroup) throws RemotingException,
        MQClientException, InterruptedException, MQBrokerException;
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:13,代码来源:MQAdminExt.java



注:本文中的com.alibaba.rocketmq.common.admin.ConsumeStats类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java MBeanModule类代码示例发布时间:1970-01-01
下一篇:
Java MatterProperty类代码示例发布时间:1970-01-01
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap