
Java MetadataResponse Class Code Examples


This article collects typical usage examples of org.apache.kafka.common.requests.MetadataResponse in Java. If you are wondering how the MetadataResponse class is used in practice, the selected examples below may help.



The MetadataResponse class belongs to the org.apache.kafka.common.requests package. The 19 code examples below are sorted by popularity by default.

Example 1: makeReady

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * Prepares a set of given internal topics.
 *
 * If a topic does not exist creates a new topic.
 * If a topic with the correct number of partitions exists ignores it.
 * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
 */
public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
    for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
        try {
            final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
            final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
            final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
            if (metadata.brokers().size() < replicationFactor) {
                throw new StreamsException("Found only " + metadata.brokers().size() + " brokers," +
                    " but replication factor is " + replicationFactor + "." +
                    " Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\""  +
                    " or add more brokers to your cluster.");
            }
            streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
            return;
        } catch (StreamsException ex) {
            log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i);
        }
        // backoff
        time.sleep(100L);
    }
    throw new StreamsException("Could not create internal topics.");
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 30, source file: InternalTopicManager.java
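
The three rules in the Javadoc of Example 1 (create a missing topic, skip a topic whose partition count matches, fail on a mismatch) can be sketched without any Kafka classes. This is a minimal sketch, not the actual validateTopicPartitions implementation: the class TopicValidationSketch, the method topicsToCreate, and the use of plain topic names instead of InternalTopicConfig are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the validation step described in the Javadoc of Example 1.
// Plain topic names replace InternalTopicConfig so the sketch needs no Kafka classes.
final class TopicValidationSketch {

    /**
     * Returns the requested topics that do not exist yet.
     * Topics with a matching partition count are skipped;
     * a partition-count mismatch raises an exception.
     */
    static Map<String, Integer> topicsToCreate(Map<String, Integer> requested,
                                               Map<String, Integer> existing) {
        Map<String, Integer> toCreate = new HashMap<>();
        for (Map.Entry<String, Integer> entry : requested.entrySet()) {
            Integer existingCount = existing.get(entry.getKey());
            if (existingCount == null) {
                // Topic is missing: schedule it for creation
                toCreate.put(entry.getKey(), entry.getValue());
            } else if (!existingCount.equals(entry.getValue())) {
                // Topic exists with a different partition count: fail loudly
                throw new IllegalStateException("Topic " + entry.getKey() + " has " + existingCount
                    + " partitions but " + entry.getValue()
                    + " were requested; reset the application before restarting.");
            }
            // Otherwise the topic already exists with the right partition count: nothing to do
        }
        return toCreate;
    }

    public static void main(String[] args) {
        Map<String, Integer> requested = new HashMap<>();
        requested.put("app-changelog", 4);
        requested.put("app-repartition", 2);
        Map<String, Integer> existing = new HashMap<>();
        existing.put("app-changelog", 4);
        System.out.println(topicsToCreate(requested, existing));
    }
}
```

The sketch throws IllegalStateException where the original code throws StreamsException, purely to stay free of Kafka dependencies.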


Example 2: getNumPartitions

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * Get the number of partitions for the given topics
 */
public Map<String, Integer> getNumPartitions(final Set<String> topics) {
    for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
        try {
            final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
            final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
            existingTopicPartitions.keySet().retainAll(topics);

            return existingTopicPartitions;
        } catch (StreamsException ex) {
            log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry #" + i);
        }
        // backoff
        time.sleep(100L);
    }
    throw new StreamsException("Could not get number of partitions.");
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 20, source file: InternalTopicManager.java
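
Examples 1 and 2 share the same shape: try a fixed number of times, sleep for a fixed backoff after each failure, and throw once the attempts run out. A minimal sketch of that pattern as a reusable helper might look like the following. RetrySketch and retry are hypothetical names, and the helper catches any RuntimeException, whereas the original code catches only StreamsException.

```java
import java.util.function.Supplier;

// Hypothetical helper that mirrors the retry loop used in Examples 1 and 2.
final class RetrySketch {

    /**
     * Runs the action up to maxAttempts times, sleeping backoffMs after each failure.
     * Throws IllegalStateException (with the last failure as cause) when all attempts fail.
     */
    static <T> T retry(int maxAttempts, long backoffMs, Supplier<T> action) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException ex) {
                // Remember the failure and fall through to the backoff
                last = ex;
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop retrying
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while backing off", ie);
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        String value = retry(5, 10L, () -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("not ready yet");
            }
            return "ready";
        });
        System.out.println(value + " after " + calls[0] + " calls");
    }
}
```

Like the original loops, this sketch also sleeps after the final failed attempt before giving up.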


Example 3: fetchMetadata

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * Fetch the metadata for all topics
 */
public MetadataResponse fetchMetadata() {

    final ClientRequest clientRequest = kafkaClient.newClientRequest(
        getAnyReadyBrokerId(),
        MetadataRequest.Builder.allTopics(),
        Time.SYSTEM.milliseconds(),
        true);
    final ClientResponse clientResponse = sendRequest(clientRequest);

    if (!clientResponse.hasResponse()) {
        throw new StreamsException("Empty response for client request.");
    }
    if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
        throw new StreamsException("Inconsistent response type for internal topic metadata request. " +
            "Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
    }
    final MetadataResponse metadataResponse = (MetadataResponse) clientResponse.responseBody();
    return metadataResponse;
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 23, source file: StreamsKafkaClient.java


Example 4: handleCompletedMetadataResponse

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
// Handle a MetadataResponse
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    // Mark the in-progress metadata fetch as finished
    this.metadataFetchInProgress = false;
    // Build the Cluster from the response
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);

    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        // update() first notifies the listeners registered on Metadata, then updates the cluster and wakes up threads waiting for the metadata update to complete
        this.metadata.update(cluster, response.unavailableTopics(), now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
        // The update failed; only the lastRefreshMs field is updated
        this.metadata.failedUpdate(now);
    }
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 24, source file: NetworkClient.java


Example 5: newMetadataResponse

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
private MetadataResponse newMetadataResponse(String topic, Errors error) {
    List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
    if (error == Errors.NONE) {
        for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
            partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
                    Errors.NONE,
                    partitionInfo.partition(),
                    partitionInfo.leader(),
                    Arrays.asList(partitionInfo.replicas()),
                    Arrays.asList(partitionInfo.inSyncReplicas())));
        }
    }

    MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
    return new MetadataResponse(cluster.nodes(), null, MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 17, source file: FetcherTest.java


Example 6: handleResponse

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false;
    MetadataResponse response = new MetadataResponse(body);
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);

    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}
 
Developer: txazo, project: kafka, lines of code: 19, source file: NetworkClient.java


Example 7: newMetadataResponse

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
private MetadataResponse newMetadataResponse(String topic, Errors error) {
    List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
    if (error == Errors.NONE) {
        for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
            partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
                    Errors.NONE,
                    partitionInfo.partition(),
                    partitionInfo.leader(),
                    Arrays.asList(partitionInfo.replicas()),
                    Arrays.asList(partitionInfo.inSyncReplicas())));
        }
    }

    MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
    return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
}
 
Developer: txazo, project: kafka, lines of code: 17, source file: FetcherTest.java


Example 8: getLeaderToShutDown

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public int getLeaderToShutDown(String topic) throws Exception {
	ZkUtils zkUtils = getZkUtils();
	try {
		MetadataResponse.PartitionMetadata firstPart = null;
		do {
			if (firstPart != null) {
				LOG.info("Unable to find leader. error code {}", firstPart.error().code());
				// not the first try. Sleep a bit
				Thread.sleep(150);
			}

			List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
			firstPart = partitionMetadata.get(0);
		}
		while (firstPart.error().code() != 0);

		return firstPart.leader().id();
	} finally {
		zkUtils.close();
	}
}
 
Developer: axbaretto, project: flink, lines of code: 23, source file: KafkaTestEnvironmentImpl.java


Example 9: fetchMetadata

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public MetadataResponse fetchMetadata() {
    Node node = new Node(1, "host1", 1001);
    MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
    MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
    MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
        Collections.singletonList(topicMetadata));
    return response;
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 10, source file: InternalTopicManagerTest.java


Example 10: handleCompletedReceives

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * Handle any completed receives and update the response list with the responses received.
 *
 * @param responses The list of responses to update
 * @param now The current time
 */
// Iterate over completedReceives, remove the corresponding ClientRequest from InFlightRequests, and add the corresponding ClientResponse to the responses list
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        // A complete response has been received, so the ClientRequest can be removed from inFlightRequests
        InFlightRequest req = inFlightRequests.completeNext(source);
        // Parse the response
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        if (log.isTraceEnabled()) {
            log.trace("Completed receive from node {}, for key {}, received {}", req.destination,
                req.header.apiKey(), responseStruct.toString());
        }
        AbstractResponse body = createResponse(responseStruct, req.header);

        // The response is a MetadataResponse
        if (req.isInternalRequest && body instanceof MetadataResponse)
            // Handle the MetadataResponse and update the cluster metadata
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // Otherwise, create a ClientResponse and add it to the responses list
            responses.add(req.completed(body, now));
    }
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 33, source file: NetworkClient.java


Example 11: getTopic

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public Topic getTopic(final String topicName) {
    if (AdminUtils.topicExists(zkUtils, topicName)) {
        final MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
        final Topic topic = new Topic();
        topic.setName(topicMetadata.topic());
        topic.setPartitions(topicMetadata.partitionMetadata().size());
        final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e -> e.replicas().size()).sum();
        topic.setReplications(replicas);
        topic.setProperties(getTopicProperties(topicName));
        return topic;
    }
    throw new UnknownTopicException(topicName);
}
 
Developer: craftsmenlabs, project: kafka-admin-rest-api, lines of code: 15, source file: TopicServiceImpl.java
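
Note that Example 11 sums the replica list sizes over all partitions, so the value passed to setReplications is the total number of replica assignments, not the per-partition replication factor. A minimal sketch of that arithmetic, using plain lists of broker ids in place of PartitionMetadata (ReplicaCountSketch and totalReplicas are hypothetical names):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the replica sum computed in Example 11.
final class ReplicaCountSketch {

    /** Sums the number of replicas over all partitions, as the stream pipeline in Example 11 does. */
    static int totalReplicas(List<List<Integer>> replicasPerPartition) {
        return replicasPerPartition.stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        // Three partitions, each replicated on two brokers
        List<List<Integer>> partitions = Arrays.asList(
            Arrays.asList(1, 2),
            Arrays.asList(2, 3),
            Arrays.asList(3, 1));
        System.out.println(totalReplicas(partitions));
    }
}
```

For a topic with three partitions and two replicas each, the sum is 6 while the replication factor is 2.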


Example 12: startEmbeddedKafka

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
    //Start mock Kafka
    String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath;
    System.out.println("zkConnectionStr" + zkConnectionStr);
    zkConnection = new ZkConnection(zkConnectionStr);
    // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
    kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
    kafkaServer.start();

    kafkaServer.createTopic(topicName, 3, 1);
    kafkaServer.waitTopicUntilReady(topicName);

    MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
    Assert.assertEquals(topicName, topicMetadata.topic());
}
 
Developer: apache, project: kylin, lines of code: 16, source file: BuildCubeWithStream.java


Example 13: fetchTopicMeta

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) {
    ZkClient zkClient = new ZkClient(zkConnection);
    ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
    zkClient.setZkSerializer(new ZKStringSerializer());
    MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
    zkClient.close();
    return topicMetadata;
}
 
Developer: apache, project: kylin, lines of code: 9, source file: MockKafka.java


Example 14: waitTopicUntilReady

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) {
    boolean isReady = false;
    MetadataResponse.TopicMetadata topicMeta = null;
    while (!isReady) {
        Random random = new Random();
        topicMeta = this.fetchTopicMeta(topic);
        List<MetadataResponse.PartitionMetadata> partitionsMetadata = topicMeta.partitionMetadata();
        Iterator<MetadataResponse.PartitionMetadata> iterator = partitionsMetadata.iterator();
        boolean hasGotLeader = true;
        boolean hasGotReplica = true;
        while (iterator.hasNext()) {
            MetadataResponse.PartitionMetadata partitionMeta = iterator.next();
            hasGotLeader &= (!partitionMeta.leader().isEmpty());
            if (partitionMeta.leader().isEmpty()) {
                System.out.println("Partition leader is not ready, wait 1s.");
                break;
            }
            hasGotReplica &= (!partitionMeta.replicas().isEmpty());
            if (partitionMeta.replicas().isEmpty()) {
                System.out.println("Partition replica is not ready, wait 1s.");
                break;
            }
        }
        isReady = hasGotLeader & hasGotReplica;
        if (!isReady) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Deliberately ignored: keep polling until the topic is ready
            }
        }
    }
    return topicMeta;
}
 
Developer: apache, project: kylin, lines of code: 34, source file: MockKafka.java
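
The readiness test inside the loop of Example 14 reduces to one condition: every partition must have a leader and at least one replica. A minimal sketch of that check, free of Kafka classes, could look like this. TopicReadinessSketch, PartitionState, and isReady are hypothetical names, and a null leaderId stands in for an empty leader Node.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, Kafka-free version of the readiness check in Example 14.
final class TopicReadinessSketch {

    /** Simplified stand-in for MetadataResponse.PartitionMetadata. */
    static final class PartitionState {
        final Integer leaderId;          // null when no leader has been elected
        final List<Integer> replicaIds;  // broker ids hosting a replica

        PartitionState(Integer leaderId, List<Integer> replicaIds) {
            this.leaderId = leaderId;
            this.replicaIds = replicaIds;
        }
    }

    /** A topic is ready when every partition has a leader and a non-empty replica list. */
    static boolean isReady(List<PartitionState> partitions) {
        for (PartitionState partition : partitions) {
            if (partition.leaderId == null || partition.replicaIds.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<PartitionState> partitions = Arrays.asList(
            new PartitionState(1, Arrays.asList(1, 2)),
            new PartitionState(null, Collections.<Integer>emptyList()));
        System.out.println(isReady(partitions));
    }
}
```

As in the original loop, a topic with no partitions at all is reported as ready, so a caller that needs a minimum partition count would have to check it separately.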


Example 15: createTopics

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor,
                         final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
    // do nothing
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 6, source file: InternalTopicManagerTest.java


Example 16: handleCompletedMetadataResponse

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
@Override
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    // Do nothing
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 5, source file: ManualMetadataUpdater.java


Example 17: getPartitions

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
public int getPartitions(String topicName) {
    MetadataResponse.TopicMetadata metaData = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
    return metaData.partitionMetadata().size();
}
 
Developer: Stratio, project: bdt, lines of code: 5, source file: KafkaUtils.java


Example 18: getControllerReadyBrokerId

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 *
 * @return the id of the controller node; an exception is thrown if no controller is found or
 * the controller is not ready
 */
private String getControllerReadyBrokerId(final MetadataResponse metadata) {
    return ensureOneNodeIsReady(Collections.singletonList(metadata.controller()));
}
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 9, source file: StreamsKafkaClient.java


Example 19: handleCompletedMetadataResponse

import org.apache.kafka.common.requests.MetadataResponse; // import the required package/class
/**
 * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
 *
 * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
 * requests with special handling for completed receives of such requests.
 */
void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse);
 
Developer: YMCoding, project: kafka-0.11.0.0-src-with-comment, lines of code: 8, source file: MetadataUpdater.java



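Note: The org.apache.kafka.common.requests.MetadataResponse examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs. The snippets come from open-source projects contributed by their respective developers, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.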

