This article collects typical usage examples of the Java class org.apache.kafka.common.config.ConfigResource. If you are wondering how ConfigResource is used in practice, the selected examples below may help.
The ConfigResource class belongs to the org.apache.kafka.common.config package. 15 code examples of the class are shown below, sorted by popularity by default.
Example 1: describeResource
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
private List<ConfigItem> describeResource(final ConfigResource configResource) {
    final DescribeConfigsResult result = adminClient.describeConfigs(Collections.singleton(configResource));
    final List<ConfigItem> configItems = new ArrayList<>();
    try {
        final Map<ConfigResource, Config> configMap = result.all().get();
        final Config config = configMap.get(configResource);
        for (final ConfigEntry configEntry : config.entries()) {
            // Skip sensitive entries
            if (configEntry.isSensitive()) {
                continue;
            }
            configItems.add(
                new ConfigItem(configEntry.name(), configEntry.value(), configEntry.isDefault())
            );
        }
        return configItems;
    } catch (InterruptedException | ExecutionException e) {
        // TODO Handle this
        throw new RuntimeException(e.getMessage(), e);
    }
}
Developer: SourceLabOrg, Project: kafka-webview, Lines of code: 25, Source: KafkaOperations.java
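The catch block in Example 1 is marked TODO. One possible way to handle it, shown as a minimal sketch rather than the project's actual approach, is to restore the thread's interrupt flag and rethrow the underlying cause. The class name FutureResultSketch and the method getOrThrow are hypothetical. The sketch uses only java.util.concurrent.Future, which KafkaFuture implements, so it needs no Kafka dependency.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper, not part of Kafka or kafka-webview.
final class FutureResultSketch {
    private FutureResultSketch() {
    }

    // Waits for the future and converts checked failures into unchecked ones.
    static <T> T getOrThrow(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for result", e);
        } catch (ExecutionException e) {
            // Surface the real failure instead of the ExecutionException wrapper.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new IllegalStateException(cause.getMessage(), cause);
        }
    }
}
```

Under this assumption, the body of describeResource could call getOrThrow(result.all()) instead of catching both exceptions inline.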
Example 2: all
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
/**
 * Return a future which succeeds only if all the config descriptions succeed.
 */
public KafkaFuture<Map<ConfigResource, Config>> all() {
    return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
        thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
            @Override
            public Map<ConfigResource, Config> apply(Void v) {
                Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
                for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
                    try {
                        configs.put(entry.getKey(), entry.getValue().get());
                    } catch (InterruptedException | ExecutionException e) {
                        // This should be unreachable, because allOf ensured that all the futures
                        // completed successfully.
                        throw new RuntimeException(e);
                    }
                }
                return configs;
            }
        });
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: DescribeConfigsResult.java
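The pattern in Example 2 (wait for every per-resource future, then collect the results into one map) can be tried without a Kafka dependency. The following is a sketch only: it swaps in the JDK's CompletableFuture for KafkaFuture, and the class name AllOfSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only analogue of DescribeConfigsResult.all().
final class AllOfSketch {
    private AllOfSketch() {
    }

    // Returns a future that succeeds only if every input future succeeds.
    static <K, V> CompletableFuture<Map<K, V>> all(Map<K, CompletableFuture<V>> futures) {
        CompletableFuture<?>[] pending = futures.values().toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(pending).thenApply(ignored -> {
            Map<K, V> results = new HashMap<>(futures.size());
            for (Map.Entry<K, CompletableFuture<V>> entry : futures.entrySet()) {
                // This callback runs only after allOf completed normally,
                // so join() neither blocks nor throws here.
                results.put(entry.getKey(), entry.getValue().join());
            }
            return results;
        });
    }
}
```

If any input future fails, the combined future completes exceptionally and the mapping function is never invoked, which is why the catch block in Example 2 is described as unreachable.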
Example 3: configResourceToResource
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
private Resource configResourceToResource(ConfigResource configResource) {
    ResourceType resourceType;
    switch (configResource.type()) {
        case TOPIC:
            resourceType = ResourceType.TOPIC;
            break;
        case BROKER:
            resourceType = ResourceType.BROKER;
            break;
        default:
            throw new IllegalArgumentException("Unexpected resource type " + configResource.type());
    }
    return new Resource(resourceType, configResource.name());
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: KafkaAdminClient.java
Example 4: init
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
private void init() {
    try {
        DescribeClusterResult describeClusterResult = adminClient.describeCluster();
        List<Node> nodes = new ArrayList<>(describeClusterResult.nodes().get());
        if (!nodes.isEmpty()) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER,
                String.valueOf(nodes.get(0).id()));
            DescribeConfigsResult describeConfigsResult =
                adminClient.describeConfigs(Collections.singleton(resource));
            Map<ConfigResource, Config> config = describeConfigsResult.all().get();
            // Compare from the literal so a null config value cannot cause a NullPointerException
            this.isDeleteTopicEnabled = config.get(resource)
                .entries()
                .stream()
                .anyMatch(configEntry -> configEntry.name().equalsIgnoreCase("delete.topic.enable")
                    && "true".equalsIgnoreCase(configEntry.value()));
        } else {
            log.warn("No available broker found to fetch config info.");
            throw new KsqlException("Could not fetch broker information. KSQL cannot initialize "
                + "AdminClient.");
        }
    } catch (InterruptedException | ExecutionException ex) {
        log.error("Failed to initialize TopicClient: {}", ex.getMessage());
        throw new KsqlException("Could not fetch broker information. KSQL cannot initialize "
            + "AdminClient.");
    }
}
Developer: confluentinc, Project: ksql, Lines of code: 30, Source: KafkaTopicClientImpl.java
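The check in Example 4 reads a boolean setting out of a list of config entries. As a hedged illustration, the sketch below models the entries as a plain name-to-value map instead of Kafka's Config type. The class BooleanConfigSketch and the method isEnabled are hypothetical. It assumes a value may be null (for example, for an unset or sensitive entry) and treats that as "not enabled".

```java
import java.util.Map;

// Hypothetical stand-in: a Map<String, String> plays the role of Config.entries().
final class BooleanConfigSketch {
    private BooleanConfigSketch() {
    }

    // Returns true only if the key is present (case-insensitively) and its value is "true".
    static boolean isEnabled(Map<String, String> entries, String key) {
        for (Map.Entry<String, String> entry : entries.entrySet()) {
            if (entry.getKey().equalsIgnoreCase(key)) {
                // Calling equalsIgnoreCase on the literal is safe when the value is null.
                return "true".equalsIgnoreCase(entry.getValue());
            }
        }
        return false;
    }
}
```

With this sketch, isEnabled(entries, "delete.topic.enable") returns false rather than throwing when the entry is missing or its value is null.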
Example 5: getDescribeConfigsResult
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
private DescribeConfigsResult getDescribeConfigsResult() {
    // Build a mocked DescribeConfigsResult whose all() returns one broker with delete.topic.enable=true
    DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
    ConfigEntry configEntry = new ConfigEntry("delete.topic.enable", "true");
    Map<ConfigResource, Config> config = new HashMap<>();
    config.put(new ConfigResource(ConfigResource.Type.BROKER, "1"),
        new Config(Collections.singletonList(configEntry)));
    expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config));
    replay(describeConfigsResult);
    return describeConfigsResult;
}
Developer: confluentinc, Project: ksql, Lines of code: 11, Source: KafkaTopicClientTest.java
Example 6: getTopicConfig
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
/**
 * Get the configuration for a topic.
 */
public TopicConfig getTopicConfig(final String topic) {
    final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    return new TopicConfig(describeResource(configResource));
}
Developer: SourceLabOrg, Project: kafka-webview, Lines of code: 8, Source: KafkaOperations.java
Example 7: getBrokerConfig
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
/**
 * Get the configuration for a broker.
 */
public BrokerConfig getBrokerConfig(final String brokerId) {
    final ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
    return new BrokerConfig(describeResource(configResource));
}
Developer: SourceLabOrg, Project: kafka-webview, Lines of code: 8, Source: KafkaOperations.java
Example 8: AlterConfigsResult
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
AlterConfigsResult(Map<ConfigResource, KafkaFuture<Void>> futures) {
    this.futures = futures;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 4, Source: AlterConfigsResult.java
Example 9: values
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
/**
 * Return a map from resources to futures which can be used to check the status of the operation on each resource.
 */
public Map<ConfigResource, KafkaFuture<Void>> values() {
    return futures;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 7, Source: AlterConfigsResult.java
Example 10: DescribeConfigsResult
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
DescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> futures) {
    this.futures = futures;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 4, Source: DescribeConfigsResult.java
Example 11: resource
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
public ConfigResource resource() {
    return resource;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 4, Source: AlterConfigPolicy.java
Example 12: describeConfigs
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
/**
 * Get the configuration for the specified resources with the default options.
 *
 * See {@link #describeConfigs(Collection, DescribeConfigsOptions)} for more details.
 *
 * This operation is supported by brokers with version 0.11.0.0 or higher.
 *
 * @param resources The resources (topic and broker resource types are currently supported)
 * @return The DescribeConfigsResult
 */
public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
    return describeConfigs(resources, new DescribeConfigsOptions());
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 14, Source: AdminClient.java
Example 13: alterConfigs
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
/**
 * Update the configuration for the specified resources with the default options.
 *
 * See {@link #alterConfigs(Map, AlterConfigsOptions)} for more details.
 *
 * This operation is supported by brokers with version 0.11.0.0 or higher.
 *
 * @param configs The resources with their configs (topic is the only resource type with configs that can
 *                be updated currently)
 * @return The AlterConfigsResult
 */
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
    return alterConfigs(configs, new AlterConfigsOptions());
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: AdminClient.java
Example 14: values
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
/**
 * Return a map from resources to futures which can be used to check the status of the configuration for each
 * resource.
 */
public Map<ConfigResource, KafkaFuture<Config>> values() {
    return futures;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source: DescribeConfigsResult.java
Example 15: RequestMetadata
import org.apache.kafka.common.config.ConfigResource; // import the required package/class
/**
 * Create an instance of this class with the provided parameters.
 *
 * This constructor is public to make testing of <code>AlterConfigPolicy</code> implementations easier.
 */
public RequestMetadata(ConfigResource resource, Map<String, String> configs) {
    this.resource = resource;
    this.configs = configs;
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 10, Source: AlterConfigPolicy.java
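Note: The org.apache.kafka.common.config.ConfigResource examples in this article were collected from source code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects. Copyright in the source code belongs to the original authors, and distribution and use are subject to each project's License. Do not reproduce without permission.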