This article collects typical usage examples of the Java class org.apache.kafka.common.requests.MetadataResponse. If you are wondering how the MetadataResponse class is used in practice, the selected examples below may help.
The MetadataResponse class belongs to the org.apache.kafka.common.requests package. 19 code examples of the class are shown below, sorted by popularity by default.
Example 1: makeReady
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * Prepares a set of given internal topics.
 *
 * If a topic does not exist creates a new topic.
 * If a topic with the correct number of partitions exists ignores it.
 * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
 */
public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
    for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
        try {
            final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
            final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
            final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
            if (metadata.brokers().size() < replicationFactor) {
                throw new StreamsException("Found only " + metadata.brokers().size() + " brokers, " +
                    " but replication factor is " + replicationFactor + "." +
                    " Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\"" +
                    " or add more brokers to your cluster.");
            }
            streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
            return;
        } catch (StreamsException ex) {
            log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i);
        }
        // backoff
        time.sleep(100L);
    }
    throw new StreamsException("Could not create internal topics.");
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 30, Source: InternalTopicManager.java
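The loop in Example 1 is a bounded retry with a fixed backoff. The pattern can be sketched on its own, without Kafka on the classpath. This is a minimal illustration, not code from the Kafka project: the class `BoundedRetry`, its `Sleeper` hook, and the parameter names are hypothetical. Unlike the original loop, the sketch keeps the last failure as the cause of the final exception.

```java
import java.util.function.Supplier;

/** Hypothetical helper (not part of Kafka): bounded retry with a fixed backoff. */
final class BoundedRetry {
    /** Hook that lets callers and tests replace Thread.sleep. */
    interface Sleeper {
        void sleep(long millis);
    }

    private BoundedRetry() { }

    static <T> T call(Supplier<T> action, int maxAttempts, long backoffMs, Sleeper sleeper) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.get();       // success: stop retrying
            } catch (RuntimeException ex) {
                last = ex;                 // remember the cause for the final error
            }
            sleeper.sleep(backoffMs);      // back off before the next attempt
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
    }
}
```

Passing the sleeper in as a parameter plays the same role as the `time` field in Example 1: a test can substitute a fake that records the requested delay instead of blocking.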
Example 2: getNumPartitions
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * Get the number of partitions for the given topics
 */
public Map<String, Integer> getNumPartitions(final Set<String> topics) {
    for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
        try {
            final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
            final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
            existingTopicPartitions.keySet().retainAll(topics);
            return existingTopicPartitions;
        } catch (StreamsException ex) {
            log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry #" + i);
        }
        // backoff
        time.sleep(100L);
    }
    throw new StreamsException("Could not get number of partitions.");
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: InternalTopicManager.java
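Example 2 filters the map through `keySet().retainAll(topics)`. This works because `Map.keySet()` returns a view backed by the map, so removing keys from the view removes the entries themselves. The sketch below shows that behaviour with plain JDK collections. `KeySetViewDemo` and `partitionsFor` are hypothetical names, and the defensive copy is an addition that Example 2 does not make.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical demo (not Kafka code): filtering a map through its key-set view. */
final class KeySetViewDemo {
    private KeySetViewDemo() { }

    /** Returns a filtered copy so the caller's map is left untouched. */
    static Map<String, Integer> partitionsFor(Map<String, Integer> allTopics, Set<String> wanted) {
        Map<String, Integer> copy = new HashMap<>(allTopics);
        // keySet() is a live view: retainAll drops the matching entries from `copy`.
        copy.keySet().retainAll(wanted);
        return copy;
    }
}
```

Topics that were requested but are missing from the metadata simply do not appear in the result, so a caller has to compare the key set against the requested set to detect them.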
Example 3: fetchMetadata
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * Fetch the metadata for all topics
 */
public MetadataResponse fetchMetadata() {
    final ClientRequest clientRequest = kafkaClient.newClientRequest(
        getAnyReadyBrokerId(),
        MetadataRequest.Builder.allTopics(),
        Time.SYSTEM.milliseconds(),
        true);
    final ClientResponse clientResponse = sendRequest(clientRequest);
    if (!clientResponse.hasResponse()) {
        throw new StreamsException("Empty response for client request.");
    }
    if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
        throw new StreamsException("Inconsistent response type for internal topic metadata request. " +
            "Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
    }
    final MetadataResponse metadataResponse = (MetadataResponse) clientResponse.responseBody();
    return metadataResponse;
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: StreamsKafkaClient.java
Example 4: handleCompletedMetadataResponse
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
// Handle the MetadataResponse
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    // Mark the metadata fetch as no longer in progress
    this.metadataFetchInProgress = false;
    // Build the Cluster from the response
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        // update() first notifies the listeners registered on Metadata, then updates the cluster,
        // and finally wakes up the threads waiting for the metadata update to complete
        this.metadata.update(cluster, response.unavailableTopics(), now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
        // The update failed, so only the lastRefreshMs field is updated
        this.metadata.failedUpdate(now);
    }
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: NetworkClient.java
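The update-or-skip rule in Example 4 can be modelled without Kafka on the classpath: a response that carries nodes replaces the cached view, while an empty response only records that a refresh was attempted. The following is a minimal sketch under those assumptions. `MetadataCacheSketch`, its methods, and the use of plain strings for nodes are hypothetical stand-ins and do not reproduce Kafka's `Metadata` class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a client-side metadata cache; not Kafka's Metadata class. */
final class MetadataCacheSketch {
    private List<String> nodes = Collections.emptyList();
    private long lastRefreshMs = -1L;
    private long lastSuccessfulRefreshMs = -1L;

    /** A non-empty node list replaces the cache; an empty one only records the attempt. */
    void onResponse(List<String> responseNodes, long nowMs) {
        lastRefreshMs = nowMs;
        if (!responseNodes.isEmpty()) {
            nodes = Collections.unmodifiableList(new ArrayList<>(responseNodes));
            lastSuccessfulRefreshMs = nowMs;
        }
    }

    List<String> nodes() {
        return nodes;
    }

    long lastRefreshMs() {
        return lastRefreshMs;
    }

    long lastSuccessfulRefreshMs() {
        return lastSuccessfulRefreshMs;
    }
}
```

Keeping the previous node list on an empty response means the client can still reach the brokers it already knows about while a topic is being created.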
Example 5: newMetadataResponse
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
private MetadataResponse newMetadataResponse(String topic, Errors error) {
    List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
    if (error == Errors.NONE) {
        for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
            partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
                Errors.NONE,
                partitionInfo.partition(),
                partitionInfo.leader(),
                Arrays.asList(partitionInfo.replicas()),
                Arrays.asList(partitionInfo.inSyncReplicas())));
        }
    }
    MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
    return new MetadataResponse(cluster.nodes(), null, MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: FetcherTest.java
Example 6: handleResponse
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false;
    MetadataResponse response = new MetadataResponse(body);
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}
Developer ID: txazo, Project: kafka, Lines of code: 19, Source: NetworkClient.java
Example 7: newMetadataResponse
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
private MetadataResponse newMetadataResponse(String topic, Errors error) {
    List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
    if (error == Errors.NONE) {
        for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
            partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
                Errors.NONE,
                partitionInfo.partition(),
                partitionInfo.leader(),
                Arrays.asList(partitionInfo.replicas()),
                Arrays.asList(partitionInfo.inSyncReplicas())));
        }
    }
    MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
    return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
}
Developer ID: txazo, Project: kafka, Lines of code: 17, Source: FetcherTest.java
Example 8: getLeaderToShutDown
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public int getLeaderToShutDown(String topic) throws Exception {
    ZkUtils zkUtils = getZkUtils();
    try {
        MetadataResponse.PartitionMetadata firstPart = null;
        do {
            if (firstPart != null) {
                LOG.info("Unable to find leader. error code {}", firstPart.error().code());
                // not the first try. Sleep a bit
                Thread.sleep(150);
            }
            List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
            firstPart = partitionMetadata.get(0);
        }
        while (firstPart.error().code() != 0);
        return firstPart.leader().id();
    } finally {
        zkUtils.close();
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 23, Source: KafkaTestEnvironmentImpl.java
Example 9: fetchMetadata
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public MetadataResponse fetchMetadata() {
    Node node = new Node(1, "host1", 1001);
    MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
    MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
    MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
        Collections.singletonList(topicMetadata));
    return response;
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 10, Source: InternalTopicManagerTest.java
Example 10: handleCompletedReceives
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * Handle any completed receives and update the response list with the responses received.
 *
 * @param responses The list of responses to update
 * @param now The current time
 */
// Iterate over completedReceives, remove the matching ClientRequest from InFlightRequests,
// and add the corresponding ClientResponse to the responses list
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        // A complete response has arrived, so the ClientRequest can be removed from inFlightRequests
        InFlightRequest req = inFlightRequests.completeNext(source);
        // Parse the response
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        if (log.isTraceEnabled()) {
            log.trace("Completed receive from node {}, for key {}, received {}", req.destination,
                req.header.apiKey(), responseStruct.toString());
        }
        AbstractResponse body = createResponse(responseStruct, req.header);
        // The response is a MetadataResponse for an internal request
        if (req.isInternalRequest && body instanceof MetadataResponse)
            // Handle the MetadataResponse and update the cluster metadata
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // Any other response: create a ClientResponse and add it to the responses list
            responses.add(req.completed(body, now));
    }
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 33, Source: NetworkClient.java
Example 11: getTopic
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public Topic getTopic(final String topicName) {
    if (AdminUtils.topicExists(zkUtils, topicName)) {
        final MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
        final Topic topic = new Topic();
        topic.setName(topicMetadata.topic());
        topic.setPartitions(topicMetadata.partitionMetadata().size());
        final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e -> e.replicas().size()).sum();
        topic.setReplications(replicas);
        topic.setProperties(getTopicProperties(topicName));
        return topic;
    }
    throw new UnknownTopicException(topicName);
}
Developer ID: craftsmenlabs, Project: kafka-admin-rest-api, Lines of code: 15, Source: TopicServiceImpl.java
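Example 11 sums the replica counts over all partitions, so a topic with 3 partitions and 2 replicas each reports 6. The snippet does not say whether `setReplications` is meant to hold that total or the per-partition replication factor. The sketch below shows the difference between the two using plain lists of broker ids. `ReplicaCounts` and both method names are hypothetical, and `replicationFactor` assumes every partition has the same number of replicas.

```java
import java.util.List;

/** Hypothetical helper (not Kafka code): two ways to count replicas for a topic. */
final class ReplicaCounts {
    private ReplicaCounts() { }

    /** Total number of replicas across all partitions, as computed in Example 11. */
    static int totalReplicas(List<List<Integer>> replicasPerPartition) {
        return replicasPerPartition.stream().mapToInt(List::size).sum();
    }

    /** Replicas of the first partition; assumes a uniform assignment. Returns 0 if there are no partitions. */
    static int replicationFactor(List<List<Integer>> replicasPerPartition) {
        return replicasPerPartition.isEmpty() ? 0 : replicasPerPartition.get(0).size();
    }
}
```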
Example 12: startEmbeddedKafka
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
    // Start mock Kafka
    String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath;
    System.out.println("zkConnectionStr: " + zkConnectionStr);
    zkConnection = new ZkConnection(zkConnectionStr);
    // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
    kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
    kafkaServer.start();
    kafkaServer.createTopic(topicName, 3, 1);
    kafkaServer.waitTopicUntilReady(topicName);
    MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
    Assert.assertEquals(topicName, topicMetadata.topic());
}
Developer ID: apache, Project: kylin, Lines of code: 16, Source: BuildCubeWithStream.java
Example 13: fetchTopicMeta
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) {
    ZkClient zkClient = new ZkClient(zkConnection);
    try {
        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
        zkClient.setZkSerializer(new ZKStringSerializer());
        return AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
    } finally {
        // Close the client even if the metadata lookup throws
        zkClient.close();
    }
}
Developer ID: apache, Project: kylin, Lines of code: 9, Source: MockKafka.java
Example 14: waitTopicUntilReady
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) {
    boolean isReady = false;
    MetadataResponse.TopicMetadata topicMeta = null;
    while (!isReady) {
        topicMeta = this.fetchTopicMeta(topic);
        List<MetadataResponse.PartitionMetadata> partitionsMetadata = topicMeta.partitionMetadata();
        Iterator<MetadataResponse.PartitionMetadata> iterator = partitionsMetadata.iterator();
        boolean hasGotLeader = true;
        boolean hasGotReplica = true;
        while (iterator.hasNext()) {
            MetadataResponse.PartitionMetadata partitionMeta = iterator.next();
            hasGotLeader &= (!partitionMeta.leader().isEmpty());
            if (partitionMeta.leader().isEmpty()) {
                System.out.println("Partition leader is not ready, wait 1s.");
                break;
            }
            hasGotReplica &= (!partitionMeta.replicas().isEmpty());
            if (partitionMeta.replicas().isEmpty()) {
                System.out.println("Partition replica is not ready, wait 1s.");
                break;
            }
        }
        isReady = hasGotLeader & hasGotReplica;
        if (!isReady) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting instead of swallowing the interruption
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for topic " + topic, e);
            }
        }
    }
    return topicMeta;
}
Developer ID: apache, Project: kylin, Lines of code: 34, Source: MockKafka.java
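The loop in Example 14 polls until the topic is ready and has no upper bound on how long it waits. A bounded variant of the same polling idea can be sketched with the JDK alone. This is an illustration under stated assumptions, not code from Kylin or Kafka: `PollUntil`, `await`, and the timeout and interval parameters are hypothetical.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not Kafka or Kylin code): poll a condition until it holds or a deadline passes. */
final class PollUntil {
    private PollUntil() { }

    /** Returns true once the condition holds, false on timeout or interruption. */
    static boolean await(BooleanSupplier condition, long timeoutMs, long intervalMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;                       // deadline reached without success
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
    }
}
```

In a test environment the condition would wrap the readiness check, for example a lambda that fetches the topic metadata and verifies that every partition has a leader and at least one replica.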
Example 15: createTopics
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor,
                         final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
    // do nothing
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 6, Source: InternalTopicManagerTest.java
Example 16: handleCompletedMetadataResponse
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    // Do nothing
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 5, Source: ManualMetadataUpdater.java
Example 17: getPartitions
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
public int getPartitions(String topicName) {
    MetadataResponse.TopicMetadata metaData = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
    return metaData.partitionMetadata().size();
}
Developer ID: Stratio, Project: bdt, Lines of code: 5, Source: KafkaUtils.java
Example 18: getControllerReadyBrokerId
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * @return the id of the controller node; throws an exception if no controller is found or
 * the controller is not ready
 */
private String getControllerReadyBrokerId(final MetadataResponse metadata) {
    return ensureOneNodeIsReady(Collections.singletonList(metadata.controller()));
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: StreamsKafkaClient.java
Example 19: handleCompletedMetadataResponse
import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
 *
 * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
 * requests with special handling for completed receives of such requests.
 */
void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse);
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source: MetadataUpdater.java
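Note: The org.apache.kafka.common.requests.MetadataResponse examples in this article were collected from GitHub, MSDocs, and similar source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.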