This article collects typical usage examples of the Java class org.springframework.boot.autoconfigure.kafka.KafkaProperties. If you are wondering how the KafkaProperties class is used in practice, the examples selected here may help.
The KafkaProperties class belongs to the org.springframework.boot.autoconfigure.kafka package. Nine code examples are shown below, sorted by popularity by default.
Example 1: KafkaTopicProvisioner
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
        KafkaProperties kafkaProperties) {
    Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
    Map<String, Object> adminClientProperties = kafkaProperties.buildAdminProperties();
    String kafkaConnectionString = kafkaBinderConfigurationProperties.getKafkaConnectionString();
    if (ObjectUtils.isEmpty(adminClientProperties.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG))
            || !kafkaConnectionString.equals(kafkaBinderConfigurationProperties.getDefaultKafkaConnectionString())) {
        adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnectionString);
    }
    this.configurationProperties = kafkaBinderConfigurationProperties;
    this.adminClient = AdminClient.create(adminClientProperties);
}
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 14, Source: KafkaTopicProvisioner.java
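The constructor above decides which bootstrap servers the admin client ends up with: the binder's connection string wins when Spring Boot supplies no value, or when the binder's value differs from its default. The rule can be isolated as a minimal sketch on plain maps. The class `BootstrapServersResolver` and its `resolve` method are hypothetical names introduced only for illustration, and the emptiness check is a simplified stand-in for `ObjectUtils.isEmpty`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of the binder.
final class BootstrapServersResolver {

    // String value of AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG.
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    // Returns a copy of adminProps in which the binder's connection string
    // replaces the Boot-provided servers when Boot has none, or when the
    // binder's connection string was changed from its default.
    static Map<String, Object> resolve(Map<String, Object> adminProps,
            String binderConnection, String binderDefaultConnection) {
        Map<String, Object> result = new HashMap<>(adminProps);
        boolean bootValueMissing = isEmpty(result.get(BOOTSTRAP_SERVERS));
        boolean binderValueCustomized = !binderConnection.equals(binderDefaultConnection);
        if (bootValueMissing || binderValueCustomized) {
            result.put(BOOTSTRAP_SERVERS, binderConnection);
        }
        return result;
    }

    // Simplified emptiness check (assumption): null, empty string, empty collection.
    private static boolean isEmpty(Object value) {
        if (value == null) {
            return true;
        }
        if (value instanceof CharSequence) {
            return ((CharSequence) value).length() == 0;
        }
        if (value instanceof Collection) {
            return ((Collection<?>) value).isEmpty();
        }
        return false;
    }
}
```

Under this reading, a Boot-configured server list survives only while the binder's connection string is left at its default.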
Example 2: testPropertyOverrides
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
@Test
public void testPropertyOverrides() throws Exception {
    KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties();
    KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(binderConfigurationProperties, new KafkaProperties());
    KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfigurationProperties,
            provisioningProvider);
    KafkaConsumerProperties consumerProps = new KafkaConsumerProperties();
    ExtendedConsumerProperties<KafkaConsumerProperties> ecp =
            new ExtendedConsumerProperties<KafkaConsumerProperties>(consumerProps);
    Method method = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class,
            String.class, ExtendedConsumerProperties.class);
    method.setAccessible(true);
    // test default for anon
    Object factory = method.invoke(binder, true, "foo", ecp);
    Map<?, ?> configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
    assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");
    // test default for named
    factory = method.invoke(binder, false, "foo", ecp);
    configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
    assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
    // binder level setting
    binderConfigurationProperties.setConfiguration(
            Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
    factory = method.invoke(binder, false, "foo", ecp);
    configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
    assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");
    // consumer level setting
    consumerProps.setConfiguration(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
    factory = method.invoke(binder, false, "foo", ecp);
    configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
    assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
}
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 37, Source: KafkaBinderUnitTests.java
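The four assertions in the test above imply a precedence order for `auto.offset.reset`: a built-in default ("latest" for anonymous consumers, "earliest" for named ones), then binder-level configuration, then consumer-level configuration. One way to model that order is a layered map merge, sketched below. `OffsetResetPrecedence` and `merge` are hypothetical names, and this is only a reading of what the test checks, not the binder's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the precedence the test asserts; not the binder's code.
final class OffsetResetPrecedence {

    // String value of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    // Later layers overwrite earlier ones: default, then binder, then consumer.
    static Map<String, Object> merge(boolean anonymous,
            Map<String, Object> binderLevel, Map<String, Object> consumerLevel) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AUTO_OFFSET_RESET, anonymous ? "latest" : "earliest");
        configs.putAll(binderLevel);
        configs.putAll(consumerLevel);
        return configs;
    }
}
```

Calling `merge(false, binderMap, consumerMap)` with both maps setting the key yields the consumer-level value, which matches the last assertion in the test.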
Example 3: testProducerRunsInTx
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerRunsInTx() {
    KafkaProperties kafkaProperties = new KafkaProperties();
    kafkaProperties.setBootstrapServers(Collections.singletonList(embeddedKafka.getBrokersAsString()));
    KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties();
    configurationProperties.getTransaction().setTransactionIdPrefix("foo-");
    KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(configurationProperties, kafkaProperties);
    provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
    final Producer mockProducer = mock(Producer.class);
    willReturn(Collections.singletonList(new TopicPartition("foo", 0))).given(mockProducer).partitionsFor(anyString());
    KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties, provisioningProvider) {
        @Override
        protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String transactionIdPrefix,
                ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
            DefaultKafkaProducerFactory<byte[], byte[]> producerFactory =
                    spy(super.getProducerFactory(transactionIdPrefix, producerProperties));
            willReturn(mockProducer).given(producerFactory).createProducer();
            return producerFactory;
        }
    };
    GenericApplicationContext applicationContext = new GenericApplicationContext();
    applicationContext.refresh();
    binder.setApplicationContext(applicationContext);
    DirectChannel channel = new DirectChannel();
    KafkaProducerProperties extension = new KafkaProducerProperties();
    ExtendedProducerProperties<KafkaProducerProperties> properties = new ExtendedProducerProperties<>(extension);
    binder.bindProducer("foo", channel, properties);
    channel.send(new GenericMessage<>("foo".getBytes()));
    InOrder inOrder = inOrder(mockProducer);
    inOrder.verify(mockProducer).beginTransaction();
    inOrder.verify(mockProducer).send(any(ProducerRecord.class), any(Callback.class));
    inOrder.verify(mockProducer).commitTransaction();
    inOrder.verify(mockProducer).close();
    inOrder.verifyNoMoreInteractions();
}
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 39, Source: KafkaTransactionTests.java
Example 4: getBinder
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
@Override
protected KafkaTestBinder getBinder() {
    if (binder == null) {
        KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
        KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(binderConfiguration, new KafkaProperties());
        try {
            kafkaTopicProvisioner.afterPropertiesSet();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
    }
    return binder;
}
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 16, Source: KafkaBinderTests.java
Example 5: kafkaSender
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
@Bean Sender kafkaSender(KafkaProperties config) {
    Map<String, Object> properties = config.buildProducerProperties();
    properties.put("key.serializer", ByteArraySerializer.class.getName());
    properties.put("value.serializer", ByteArraySerializer.class.getName());
    // Kafka expects the input to be a String, but KafkaProperties returns a list
    Object bootstrapServers = properties.get("bootstrap.servers");
    if (bootstrapServers instanceof List) {
        properties.put("bootstrap.servers", join((List) bootstrapServers));
    }
    return KafkaSender.newBuilder()
            .topic(this.topic)
            .overrides(properties)
            .build();
}
Developer: spring-cloud, Project: spring-cloud-sleuth, Lines of code: 15, Source: ZipkinKafkaSenderConfiguration.java
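The `join` helper called in the snippet above is not shown in the excerpt. A minimal sketch of what such a helper might look like follows, assuming it simply concatenates the list entries with commas, which is the form Kafka accepts for `bootstrap.servers`. The class name `BootstrapServerLists` is hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the join(...) helper that the excerpt does not show.
final class BootstrapServerLists {

    // Turns a list such as ["a:9092", "b:9092"] into "a:9092,b:9092".
    static String join(List<?> servers) {
        return servers.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
    }
}
```

Accepting `List<?>` rather than `List<String>` keeps the helper usable with the unchecked value read back from the properties map.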
Example 6: kafkaProducerFactory
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
/**
 * Customized ProducerFactory bean.
 * @param properties the kafka properties.
 * @return the bean.
 */
@Bean("kafkaProducerFactory")
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
    Map<String, Object> producerProperties = properties.buildProducerProperties();
    return new DefaultKafkaProducerFactory<>(producerProperties);
}
Developer: Paleozoic, Project: storm_spring_boot_demo, Lines of code: 11, Source: KafkaConfig.java
Example 7: kafkaProperties
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
@Bean
KafkaProperties kafkaProperties() {
    return new KafkaProperties();
}
Developer: spring-cloud, Project: spring-cloud-stream-binder-kafka, Lines of code: 5, Source: KafkaBinderAutoConfigurationPropertiesTest.java
Example 8: kafkaPropertiesMap
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
/**
 * TODO (annoying): Kryo serialization needs a default constructor and 'implements Serializable';
 * KafkaProperties and DefaultKafkaProducerFactory cannot be serialized.
 * @param properties the kafka properties.
 * @return the producer properties as a plain map.
 */
@Bean("kafkaPropertiesMap")
public Map<String, Object> kafkaPropertiesMap(KafkaProperties properties) {
    Map<String, Object> producerProperties = properties.buildProducerProperties();
    return producerProperties;
}
Developer: Paleozoic, Project: storm_spring_boot_demo, Lines of code: 12, Source: KafkaConfig.java
Example 9: kafkaConsumerFactory
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; // import the required package/class
/**
 * Note: Spring Boot auto-configuration currently only supports creating consumers for a single group-id;
 * if several are needed, create several different DefaultKafkaConsumerFactory instances: properties.getConsumer().setGroupId(groupId);
 * Customized ConsumerFactory bean.
 * @param properties the kafka properties.
 * @return the bean.
 */
@Bean("kafkaConsumerFactory")
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProperties = properties.buildConsumerProperties();
    return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
Developer: Paleozoic, Project: storm_spring_boot_demo, Lines of code: 13, Source: KafkaConfig.java
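The Javadoc note above says that several consumer groups call for several consumer factories. One way to prepare the per-group configuration without mutating the shared `KafkaProperties` bean is to copy the map returned by `buildConsumerProperties()` and override `group.id` in each copy. The sketch below works on plain maps; `ConsumerGroupConfigs` and `forGroup` are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; it only manipulates plain property maps.
final class ConsumerGroupConfigs {

    // String value of ConsumerConfig.GROUP_ID_CONFIG.
    static final String GROUP_ID = "group.id";

    // Returns an independent copy of the base properties with its own group id.
    static Map<String, Object> forGroup(Map<String, Object> base, String groupId) {
        Map<String, Object> copy = new HashMap<>(base);
        copy.put(GROUP_ID, groupId);
        return copy;
    }
}
```

Each resulting map could then be passed to its own `new DefaultKafkaConsumerFactory<>(...)`, the same constructor used in the example above, leaving the base properties untouched.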
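Note: The org.springframework.boot.autoconfigure.kafka.KafkaProperties examples in this article were compiled from source code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their respective developers. Copyright of the source code remains with the original authors; distribution and use should follow the License of the corresponding project. Do not reproduce without permission.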