• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java AdminClient类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中kafka.admin.AdminClient的典型用法代码示例。如果您正苦于以下问题:Java AdminClient类的具体用法?Java AdminClient怎么用?Java AdminClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



AdminClient类属于kafka.admin包,在下文中一共展示了AdminClient类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: ensureStreamsApplicationDown

import kafka.admin.AdminClient; //导入依赖的package包/类
private static void ensureStreamsApplicationDown(final String kafka) {
    AdminClient adminClient = null;
    try {
        adminClient = AdminClient.createSimplePlaintext(kafka);

        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
            if (System.currentTimeMillis() > maxWaitTime) {
                throw new RuntimeException("Streams application not down after 30 seconds.");
            }
            sleep(1000);
        }
    } finally {
        if (adminClient != null) {
            adminClient.close();
        }
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:EosTestDriver.java


示例2: KafkaAdminClient

import kafka.admin.AdminClient; //导入依赖的package包/类
/**
 * Creates and connects a new KafkaAdminClient
 * @param zkConnect The zk connect string
 * @param sessionTimeout The session timeout
 * @param connectionTimeout The connection timeout
 */
private KafkaAdminClient(final String zkConnect, final int sessionTimeout, final int connectionTimeout) {
	this.zkConnect = zkConnect;
	this.sessionTimeout = sessionTimeout;
	this.connectionTimeout = connectionTimeout;
	zkClient = new ZkClient(
			zkConnect,
			this.sessionTimeout,
			this.connectionTimeout,
		    ZKStringSerializer$.MODULE$);
	zkConnection = new ZkConnection(zkConnect, DEFAULT_SESSION_TIMEOUT);
	zkConnection.connect(this);
	zkUtils = new ZkUtils(zkClient, zkConnection, false);
	final String endPoint = findEndpoint(zkConnection);
	adminClient = endPoint==null ? null : AdminClient.createSimplePlaintext(endPoint);
	
}
 
开发者ID:nickman,项目名称:HeliosStreams,代码行数:23,代码来源:KafkaAdminClient.java


示例3: getAllConsumers

import kafka.admin.AdminClient; //导入依赖的package包/类
public ImmutableList<KafkaConsumerInfo> getAllConsumers() {
    ImmutableList<GroupOverview> consumerGroups =
            ImmutableList.copyOf(JavaConversions.asJavaCollection(adminClient.listAllConsumerGroupsFlattened()));

    ImmutableList.Builder<KafkaConsumerInfo> consumers = ImmutableList.builder();

    consumerGroups.stream()
            .filter(groupOverview -> !groupOverview.groupId().equals(KafkaStatics.GROUP_ID))
            .forEach(consumerGroup -> {
                AdminClient.ConsumerGroupSummary consumerGroupSummary =
                        adminClient.describeConsumerGroup(consumerGroup.groupId(), Duration.ofSeconds(10).toMillis());

                if (!consumerGroupSummary.consumers().isDefined()) {
                    return;
                }

                ImmutableList<AdminClient.ConsumerSummary> consumerSummaries = ImmutableList.copyOf(
                        JavaConversions.asJavaCollection(consumerGroupSummary.consumers().get()));

                if (consumerSummaries.isEmpty()) {
                    return;
                }

                consumers.addAll(convertToKafkaConsumerInfos(consumerSummaries, consumerGroup.groupId()));
            });

    return consumers.build();
}
 
开发者ID:enthusiast94,项目名称:kafka-visualizer,代码行数:29,代码来源:KafkaUtils.java


示例4: convertToKafkaConsumerInfos

import kafka.admin.AdminClient; //导入依赖的package包/类
private ImmutableList<KafkaConsumerInfo> convertToKafkaConsumerInfos(ImmutableList<AdminClient.ConsumerSummary> consumerSummaries, String groupId) {
    return consumerSummaries.stream()
            .map(consumerSummary -> {
                ImmutableList<KafkaConsumerInfo.Assignment> assignments = JavaConversions.asJavaCollection(consumerSummary.assignment())
                        .stream()
                        .map(assignment -> new KafkaConsumerInfo.Assignment(assignment.topic(), assignment.partition()))
                        .collect(ImmutableList.toImmutableList());

                return new KafkaConsumerInfo(consumerSummary.consumerId(), consumerSummary.clientId(), assignments, groupId);
            })
            .collect(ImmutableList.toImmutableList());
}
 
开发者ID:enthusiast94,项目名称:kafka-visualizer,代码行数:13,代码来源:KafkaUtils.java


示例5: cleanup

import kafka.admin.AdminClient; //导入依赖的package包/类
@Before
public void cleanup() throws Exception {
    ++testNo;

    if (adminClient == null) {
        adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
    }

    // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
    while (true) {
        Thread.sleep(50);

        try {
            TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
                    "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
        } catch (final TimeoutException e) {
            continue;
        }
        break;
    }

    prepareInputData();
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:ResetIntegrationTest.java


示例6: adminClient

import kafka.admin.AdminClient; //导入依赖的package包/类
@Bean(destroyMethod = "close")
public AdminClient adminClient(CommandLineArgs commandLineArgs) {
    return AdminClient.createSimplePlaintext(commandLineArgs.kafkaServers);
}
 
开发者ID:enthusiast94,项目名称:kafka-visualizer,代码行数:5,代码来源:AppBootstrapper.java


示例7: kafkaAdmin

import kafka.admin.AdminClient; //导入依赖的package包/类
@Bean
public KafkaUtils kafkaAdmin(ZkClient zkClient, AdminClient adminClient) {
    return new KafkaUtils(zkClient, adminClient);
}
 
开发者ID:enthusiast94,项目名称:kafka-visualizer,代码行数:5,代码来源:AppBootstrapper.java


示例8: KafkaUtils

import kafka.admin.AdminClient; //导入依赖的package包/类
public KafkaUtils(ZkClient zkClient, AdminClient adminClient) {
    this.zkClient = zkClient;
    this.adminClient = adminClient;
}
 
开发者ID:enthusiast94,项目名称:kafka-visualizer,代码行数:5,代码来源:KafkaUtils.java


示例9: KafkaConsumerCommand

import kafka.admin.AdminClient; //导入依赖的package包/类
public KafkaConsumerCommand(String bootstrapServer) {
    this.bootstrapServer = bootstrapServer;
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    adminClient = AdminClient.create(props);
}
 
开发者ID:warlock-china,项目名称:azeroth,代码行数:7,代码来源:KafkaConsumerCommand.java


示例10: run

import kafka.admin.AdminClient; //导入依赖的package包/类
public int run(final String[] args, final Properties config) {
    consumerConfig.clear();
    consumerConfig.putAll(config);

    int exitCode = EXIT_CODE_SUCCESS;

    AdminClient adminClient = null;
    ZkUtils zkUtils = null;
    try {
        parseArguments(args);
        dryRun = options.has(dryRunOption);

        adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
        final String groupId = options.valueOf(applicationIdOption);


        zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
            30000,
            30000,
            JaasUtils.isZkSecurityEnabled());

        allTopics.clear();
        allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));


        if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
            throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
                        "Make sure to stop all running application instances before running the reset tool.");
        }

        if (dryRun) {
            System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
        }
        maybeResetInputAndSeekToEndIntermediateTopicOffsets();
        maybeDeleteInternalTopics(zkUtils);

    } catch (final Throwable e) {
        exitCode = EXIT_CODE_ERROR;
        System.err.println("ERROR: " + e.getMessage());
    } finally {
        if (adminClient != null) {
            adminClient.close();
        }
        if (zkUtils != null) {
            zkUtils.close();
        }
    }

    return exitCode;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:51,代码来源:StreamsResetter.java


示例11: KafkaConsumerCommand

import kafka.admin.AdminClient; //导入依赖的package包/类
public KafkaConsumerCommand(String bootstrapServer) {
	this.bootstrapServer = bootstrapServer;
	Properties props = new Properties();
	props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
	adminClient = AdminClient.create(props);
}
 
开发者ID:vakinge,项目名称:jeesuite-libs,代码行数:7,代码来源:KafkaConsumerCommand.java



注:本文中的kafka.admin.AdminClient类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java DescribeInternetGatewaysRequest类代码示例发布时间:2022-05-22
下一篇:
Java JobEntryDeleteResultFilenames类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap