本文整理汇总了Java中com.alibaba.rocketmq.common.TopicConfig类的典型用法代码示例。如果您正苦于以下问题:Java TopicConfig类的具体用法?Java TopicConfig怎么用?Java TopicConfig使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
TopicConfig类属于com.alibaba.rocketmq.common包,在下文中一共展示了TopicConfig类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: createTopic
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:27,代码来源:MQClientAPIImpl.java
示例2: updateTopicUnitFlag
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public void updateTopicUnitFlag(final String topic, final boolean unit) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null) {
int oldTopicSysFlag = topicConfig.getTopicSysFlag();
if (unit) {
topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
}
else {
topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
}
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true);
}
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:24,代码来源:TopicConfigManager.java
示例3: updateTopicUnitSubFlag
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null) {
int oldTopicSysFlag = topicConfig.getTopicSysFlag();
if (hasUnitSub) {
topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
}
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true);
}
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:20,代码来源:TopicConfigManager.java
示例4: examineTopicConfig
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
@Override
public TopicConfig examineTopicConfig(String addr, String topic) {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = null;
try {
response = remotingClient.invokeSync(addr, request, 3000);
} catch (Exception err) {
throw Throwables.propagate(err);
}
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class);
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
}
default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
}
}
开发者ID:didapinchegit,项目名称:rocket-console,代码行数:20,代码来源:MQAdminExtImpl.java
示例5: examineTopicConfig
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
@Override
public List<TopicConfigInfo> examineTopicConfig(String topic) {
List<TopicConfigInfo> topicConfigInfoList = Lists.newArrayList();
TopicRouteData topicRouteData = route(topic);
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName());
BeanUtils.copyProperties(topicConfig, topicConfigInfo);
boolean hasSameTopicConfig = false;
// for (TopicConfigInfo topicConfigInfoExist : topicConfigInfoList) {
// if (topicConfigInfoExist.equals(topicConfigInfo)) {
// topicConfigInfoExist.getBrokerNameList().add(brokerData.getBrokerName());
// hasSameTopicConfig = true;
// break;
// }
// } //每一个broker的配置单独展示 变更 交互可以优化下
if (!hasSameTopicConfig) {
topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
topicConfigInfoList.add(topicConfigInfo);
}
}
return topicConfigInfoList;
}
开发者ID:didapinchegit,项目名称:rocket-console,代码行数:24,代码来源:TopicServiceImpl.java
示例6: createTopic
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:27,代码来源:MQClientAPIImpl.java
示例7: updateTopicUnitFlag
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public void updateTopicUnitFlag(final String topic, final boolean unit) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null) {
int oldTopicSysFlag = topicConfig.getTopicSysFlag();
if (unit) {
topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
} else {
topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
}
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true);
}
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:23,代码来源:TopicConfigManager.java
示例8: updateTopicUnitSubFlag
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null) {
int oldTopicSysFlag = topicConfig.getTopicSysFlag();
if (hasUnitSub) {
topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
}
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true);
}
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:20,代码来源:TopicConfigManager.java
示例9: createAndUpdateQueueData
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
/**
*
*
* 注意topicQueueTable<String,Liat<QueueData>>这个比较难理解的数据结构 。
* key是topic, value是topic下面的所有队列配置信息。
*
* 这个List<QueueData> 可以这么理解, 其中的每一个QueueData是topic在某一个brokername下面的队列配置元数据。
* QueueData List 就是topic在所有brokername下面的配置信息。
*
*
* @param brokerName
* @param topicConfig
*/
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
}
else {
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) { //如果是同一个brokername的broker上报的queuedata 。
if (qd.equals(queueData)) { //如果队列信息完全一致,则不用新增。
addNewOne = false;
}
else { //如果队列信息不一致, 则把老的queuedata移除掉
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}
if (addNewOne) { //326行把brokername中老的queuedata移除掉以后 ,把当前这个queuedata加上。 这个操作保证了topic在一个brokername下只会有一个queuedata.
queueDataList.add(queueData);
}
}
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:52,代码来源:RouteInfoManager.java
示例10: updateAndCreateTopic
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
//写入topicConfigTable,同时持久化到topics.json文件
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
/* BrokerController.registerBrokerAll */
this.brokerController.registerBrokerAll(false, true);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:31,代码来源:AdminBrokerProcessor.java
示例11: cloneGroupOffset
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CloneGroupOffsetRequestHeader requestHeader =
(CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
Set<String> topics;
if (UtilAll.isBlank(requestHeader.getTopic())) {
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup());
}
else {
topics = new HashSet<String>();
topics.add(requestHeader.getTopic());
}
for (String topic : topics) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
log.warn("[cloneGroupOffset], topic config not exist, {}", topic);
continue;
}
if (!requestHeader.isOffline()) {
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0
&& findSubscriptionData == null) {
log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);
continue;
}
}
this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),
requestHeader.getTopic());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:40,代码来源:AdminBrokerProcessor.java
示例12: buildInnerMsg
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
int queueIdInt = requestHeader.getQueueId();
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MultiTagsFlag;
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner,
MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),
msgInner.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(sysFlag);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader
.getReconsumeTimes());
return msgInner;
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:32,代码来源:AbstractSendMessageProcessor.java
示例13: updateTopicConfig
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public void updateTopicConfig(final TopicConfig topicConfig) {
TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
if (old != null) {
log.info("update topic config, old: " + old + " new: " + topicConfig);
}
else {
log.info("create new topic, " + topicConfig);
}
this.dataVersion.nextVersion();
this.persist(); //把该topic信息持久化到topics.json文件
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:14,代码来源:TopicConfigManager.java
示例14: isOrderTopic
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public boolean isOrderTopic(final String topic) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig == null) {
return false;
}
else {
return topicConfig.isOrder();
}
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:10,代码来源:TopicConfigManager.java
示例15: deleteTopicConfig
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
public void deleteTopicConfig(final String topic) {
TopicConfig old = this.topicConfigTable.remove(topic);
if (old != null) {
log.info("delete topic config OK, topic: " + old);
this.dataVersion.nextVersion();
this.persist();
}
else {
log.warn("delete topic config failed, topic: " + topic + " not exist");
}
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:12,代码来源:TopicConfigManager.java
示例16: printLoadDataWhenFirstBoot
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {
Iterator<Entry<String, TopicConfig>> it = tcs.getTopicConfigTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, TopicConfig> next = it.next();
log.info("load exist local topic, {}", next.getValue().toString());
}
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:8,代码来源:TopicConfigManager.java
示例17: test_flushTopicConfig
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
@Test
public void test_flushTopicConfig() throws Exception {
BrokerController brokerController = new BrokerController(//
new BrokerConfig(), //
new NettyServerConfig(), //
new NettyClientConfig(), //
new MessageStoreConfig());
boolean initResult = brokerController.initialize();
System.out.println("initialize " + initResult);
brokerController.start();
TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
TopicConfig topicConfig =
topicConfigManager.createTopicInSendMessageMethod("TestTopic_SEND", MixAll.DEFAULT_TOPIC,
null, 4, 0);
assertTrue(topicConfig != null);
System.out.println(topicConfig);
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
topicConfig =
topicConfigManager
.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
assertTrue(topicConfig != null);
}
topicConfigManager.persist();
brokerController.shutdown();
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:33,代码来源:TopicConfigManagerTest.java
示例18: createOrUpdate
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
@Override
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
TopicConfig topicConfig = new TopicConfig();
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (String brokerName : topicCreateOrUpdateRequest.getBrokerNameList()) {
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig);
}
} catch (Exception err) {
throw Throwables.propagate(err);
}
}
开发者ID:didapinchegit,项目名称:rocket-console,代码行数:14,代码来源:TopicServiceImpl.java
示例19: updateAndCreateTopic
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
/**
* 更新、创建Topic
*
* @param ctx
* @param request
* @return
* @throws RemotingCommandException
*/
private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
/**
* 更新或新增Topic
*/
TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
/**
* Broker信息重新注册到NameServer
*/
this.brokerController.registerBrokerAll(false, true);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:44,代码来源:AdminBrokerProcessor.java
示例20: cloneGroupOffset
import com.alibaba.rocketmq.common.TopicConfig; //导入依赖的package包/类
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CloneGroupOffsetRequestHeader requestHeader =
(CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
Set<String> topics;
if (UtilAll.isBlank(requestHeader.getTopic())) {
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup());
} else {
topics = new HashSet<String>();
topics.add(requestHeader.getTopic());
}
for (String topic : topics) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
log.warn("[cloneGroupOffset], topic config not exist, {}", topic);
continue;
}
if (!requestHeader.isOffline()) {
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0
&& findSubscriptionData == null) {
log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);
continue;
}
}
this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),
requestHeader.getTopic());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:39,代码来源:AdminBrokerProcessor.java
注:本文中的com.alibaba.rocketmq.common.TopicConfig类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论