本文整理汇总了Java中org.springframework.cloud.stream.binder.Binder类的典型用法代码示例。如果您正苦于以下问题:Java Binder类的具体用法?Java Binder怎么用?Java Binder使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Binder类属于org.springframework.cloud.stream.binder包,在下文中一共展示了Binder类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testParentConnectionFactoryInheritedByDefault
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
public void testParentConnectionFactoryInheritedByDefault() {
context = SpringApplication.run(SimpleProcessor.class, "--server.port=0");
BinderFactory<?> binderFactory = context.getBean(BinderFactory.class);
Binder binder = binderFactory.getBinder(null);
assertThat(binder, instanceOf(RedisMessageChannelBinder.class));
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
RedisConnectionFactory binderConnectionFactory =
(RedisConnectionFactory) binderFieldAccessor.getPropertyValue("connectionFactory");
assertThat(binderConnectionFactory, instanceOf(RedisConnectionFactory.class));
RedisConnectionFactory connectionFactory = context.getBean(RedisConnectionFactory.class);
assertThat(binderConnectionFactory, is(connectionFactory));
CompositeHealthIndicator bindersHealthIndicator =
context.getBean("bindersHealthIndicator", CompositeHealthIndicator.class);
assertNotNull(bindersHealthIndicator);
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(bindersHealthIndicator);
@SuppressWarnings("unchecked")
Map<String,HealthIndicator> healthIndicators =
(Map<String, HealthIndicator>) directFieldAccessor.getPropertyValue("indicators");
assertThat(healthIndicators, hasKey("redis"));
assertThat(healthIndicators.get("redis").health().getStatus(), equalTo(Status.UP));
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:23,代码来源:RedisBinderModuleTests.java
示例2: testParentConnectionFactoryInheritedIfOverridden
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
public void testParentConnectionFactoryInheritedIfOverridden() {
context = new SpringApplication(SimpleProcessor.class, ConnectionFactoryConfiguration.class).run("--server.port=0");
BinderFactory<?> binderFactory = context.getBean(BinderFactory.class);
Binder binder = binderFactory.getBinder(null);
assertThat(binder, instanceOf(RedisMessageChannelBinder.class));
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
RedisConnectionFactory binderConnectionFactory =
(RedisConnectionFactory) binderFieldAccessor.getPropertyValue("connectionFactory");
assertThat(binderConnectionFactory, is(MOCK_CONNECTION_FACTORY));
RedisConnectionFactory connectionFactory = context.getBean(RedisConnectionFactory.class);
assertThat(binderConnectionFactory, is(connectionFactory));
CompositeHealthIndicator bindersHealthIndicator =
context.getBean("bindersHealthIndicator", CompositeHealthIndicator.class);
assertNotNull(bindersHealthIndicator);
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(bindersHealthIndicator);
@SuppressWarnings("unchecked")
Map<String,HealthIndicator> healthIndicators =
(Map<String, HealthIndicator>) directFieldAccessor.getPropertyValue("indicators");
assertThat(healthIndicators, hasKey("redis"));
assertThat(healthIndicators.get("redis").health().getStatus(), equalTo(Status.UP));
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:23,代码来源:RedisBinderModuleTests.java
示例3: testAutoConfigureTopicsDisabledSucceedsIfTopicExisting
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting() throws Throwable {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(testTopicName, 5, 1);
configurationProperties.setAutoCreateTopics(false);
Binder binder = getBinder(configurationProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
Binding<MessageChannel> binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
binding.unbind();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:17,代码来源:KafkaBinderTests.java
示例4: testPartitionCountIncreasedIfAutoAddPartitionsSet
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testPartitionCountIncreasedIfAutoAddPartitionsSet() throws Throwable {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
String testTopicName = "existing" + System.currentTimeMillis();
configurationProperties.setMinPartitionCount(6);
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
Binding<?> binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
binding.unbind();
assertThat(invokePartitionSize(testTopicName)).isEqualTo(6);
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:17,代码来源:KafkaBinderTests.java
示例5: testAutoAddPartitionsDisabledSucceedsIfTopicUnderPartitionedAndAutoRebalanceEnabled
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testAutoAddPartitionsDisabledSucceedsIfTopicUnderPartitionedAndAutoRebalanceEnabled() throws Throwable {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(testTopicName, 1, 1);
configurationProperties.setAutoAddPartitions(false);
Binder binder = getBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
// this consumer must consume from partition 2
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
Binding binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
binding.unbind();
assertThat(invokePartitionSize(testTopicName)).isEqualTo(1);
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:24,代码来源:KafkaBinderTests.java
示例6: testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled() throws Throwable {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(testTopicName, 1, 1);
configurationProperties.setAutoAddPartitions(false);
Binder binder = getBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel output = createBindableChannel("output", createConsumerBindingProperties(consumerProperties));
// this consumer must consume from partition 2
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
expectedProvisioningException.expect(ProvisioningException.class);
expectedProvisioningException
.expectMessage("The number of expected partitions was: 3, but 1 has been found instead");
Binding binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
if (binding != null) {
binding.unbind();
}
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:27,代码来源:KafkaBinderTests.java
示例7: testPartitionCountNotReduced
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testPartitionCountNotReduced() throws Throwable {
String testTopicName = "existing" + System.currentTimeMillis();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
invokeCreateTopic(testTopicName, 6, 1);
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
Binding<?> binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
binding.unbind();
assertThat(partitionSize(testTopicName)).isEqualTo(6);
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:22,代码来源:KafkaBinderTests.java
示例8: testConsumerDefaultDeserializer
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testConsumerDefaultDeserializer() throws Throwable {
Binding<?> binding = null;
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(testTopicName, 5, 1);
configurationProperties.setAutoCreateTopics(false);
Binder binder = getBinder(configurationProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
assertTrue(consumerAccessor.getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer);
assertTrue(consumerAccessor.getPropertyValue("valueDeserializer") instanceof ByteArrayDeserializer);
}
finally {
if (binding != null) {
binding.unbind();
}
}
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:26,代码来源:KafkaBinderTests.java
示例9: contextLoads
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
public void contextLoads() {
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1", MessageChannel.class);
KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1;
DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1);
KafkaBinderConfigurationProperties configuration1 =
(KafkaBinderConfigurationProperties) directFieldAccessor1.getPropertyValue("configurationProperties");
Assert.assertThat(configuration1.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration1.getBrokers()[0], equalTo(kafkaTestSupport1.getBrokersAsString()));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2", MessageChannel.class);
KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2;
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2);
KafkaBinderConfigurationProperties configuration2 =
(KafkaBinderConfigurationProperties) directFieldAccessor2.getPropertyValue("configurationProperties");
Assert.assertThat(configuration2.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration2.getBrokers()[0], equalTo(kafkaTestSupport2.getBrokersAsString()));
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-samples,代码行数:19,代码来源:TwoKafkaBindersApplicationTest.java
示例10: testParentConnectionFactoryInheritedIfOverridden
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
public void testParentConnectionFactoryInheritedIfOverridden() {
context = new SpringApplicationBuilder(SimpleProcessor.class, ConnectionFactoryConfiguration.class)
.web(WebApplicationType.NONE)
.run("--server.port=0");
BinderFactory binderFactory = context.getBean(BinderFactory.class);
Binder<?, ?, ?> binder = binderFactory.getBinder(null, MessageChannel.class);
assertThat(binder).isInstanceOf(RabbitMessageChannelBinder.class);
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
.getPropertyValue("connectionFactory");
assertThat(binderConnectionFactory).isSameAs(MOCK_CONNECTION_FACTORY);
ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
assertThat(binderConnectionFactory).isSameAs(connectionFactory);
CompositeHealthIndicator bindersHealthIndicator = context.getBean("bindersHealthIndicator",
CompositeHealthIndicator.class);
assertThat(bindersHealthIndicator).isNotNull();
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(bindersHealthIndicator);
@SuppressWarnings("unchecked")
Map<String, HealthIndicator> healthIndicators = (Map<String, HealthIndicator>) directFieldAccessor
.getPropertyValue("indicators");
assertThat(healthIndicators).containsKey("rabbit");
// mock connection factory behaves as if down
assertThat(healthIndicators.get("rabbit").health().getStatus()).isEqualTo(Status.DOWN);
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-rabbit,代码行数:26,代码来源:RabbitBinderModuleTests.java
示例11: checkCompatiblePollableBinder
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
/**
* Return the generic type of PollableSource to determine if it is appropriate
* for the binder.
* e.g., with PollableMessageSource extends PollableSource<MessageHandler>
* and AbstractMessageChannelBinder
* implements PollableConsumerBinder<MessageHandler, C>
* We're checking that the the generic type (MessageHandler) matches.
*
* @param binderInstance the binder.
* @param bindingTargetType the binding target type.
* @return
*/
@SuppressWarnings("rawtypes")
public static boolean checkCompatiblePollableBinder(Binder binderInstance, Class<?> bindingTargetType) {
Class<?>[] binderInterfaces = ClassUtils.getAllInterfaces(binderInstance);
for (Class<?> intf : binderInterfaces) {
if (PollableConsumerBinder.class.isAssignableFrom(intf)) {
Class<?>[] targetInterfaces = ClassUtils.getAllInterfacesForClass(bindingTargetType);
Class<?> psType = findPollableSourceType(targetInterfaces);
if (psType != null) {
return getParameterType(binderInstance.getClass(), intf, 0)
.isAssignableFrom(psType);
}
}
}
return false;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:28,代码来源:GenericsUtils.java
示例12: doBindConsumer
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
public <T> Binding<T> doBindConsumer(T input, String inputName, Binder<T, ConsumerProperties, ?> binder,
ConsumerProperties consumerProperties, String target) {
if (this.taskScheduler == null || this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
return binder.bindConsumer(target,
this.bindingServiceProperties.getGroup(inputName), input,
consumerProperties);
}
else {
try {
return binder.bindConsumer(target,
this.bindingServiceProperties.getGroup(inputName), input,
consumerProperties);
}
catch (RuntimeException e) {
LateBinding<T> late = new LateBinding<T>();
rescheduleConsumerBinding(input, inputName, binder, consumerProperties, target, late, e);
return late;
}
}
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:21,代码来源:BindingService.java
示例13: rescheduleConsumerBinding
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
public <T> void rescheduleConsumerBinding(final T input, final String inputName,
final Binder<T, ConsumerProperties, ?> binder, final ConsumerProperties consumerProperties,
final String target, final LateBinding<T> late, RuntimeException exception) {
assertNotIllegalException(exception);
this.log.error("Failed to create consumer binding; retrying in " +
this.bindingServiceProperties.getBindingRetryInterval() + " seconds", exception);
this.scheduleTask(() -> {
try {
late.setDelegate(binder.bindConsumer(target,
this.bindingServiceProperties.getGroup(inputName), input, consumerProperties));
}
catch (RuntimeException e) {
rescheduleConsumerBinding(input, inputName, binder, consumerProperties, target, late, e);
}
});
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:17,代码来源:BindingService.java
示例14: doBindPollableConsumer
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> Binding<T> doBindPollableConsumer(T input, String inputName, Binder<T, ConsumerProperties, ?> binder,
ConsumerProperties consumerProperties, String target) {
if (this.taskScheduler == null || this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
return ((PollableConsumerBinder) binder).bindPollableConsumer(target,
this.bindingServiceProperties.getGroup(inputName), (PollableSource) input,
consumerProperties);
}
else {
try {
return ((PollableConsumerBinder) binder).bindPollableConsumer(target,
this.bindingServiceProperties.getGroup(inputName), (PollableSource) input,
consumerProperties);
}
catch (RuntimeException e) {
LateBinding<T> late = new LateBinding<T>();
reschedulePollableConsumerBinding(input, inputName, binder, consumerProperties, target, late, e);
return late;
}
}
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:22,代码来源:BindingService.java
示例15: reschedulePollableConsumerBinding
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> void reschedulePollableConsumerBinding(final T input, final String inputName,
final Binder<T, ConsumerProperties, ?> binder, final ConsumerProperties consumerProperties,
final String target, final LateBinding<T> late, RuntimeException exception) {
assertNotIllegalException(exception);
this.log.error("Failed to create consumer binding; retrying in " +
this.bindingServiceProperties.getBindingRetryInterval() + " seconds", exception);
this.scheduleTask(() -> {
try {
late.setDelegate(((PollableConsumerBinder) binder).bindPollableConsumer(target,
this.bindingServiceProperties.getGroup(inputName), (PollableSource) input,
consumerProperties));
}
catch (RuntimeException e) {
reschedulePollableConsumerBinding(input, inputName, binder, consumerProperties, target, late, e);
}
});
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:19,代码来源:BindingService.java
示例16: bindProducer
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@SuppressWarnings({ "unchecked", "rawtypes" })
public <T> Binding<T> bindProducer(T output, String outputName) {
String bindingTarget = this.bindingServiceProperties
.getBindingDestination(outputName);
Binder<T, ?, ProducerProperties> binder = (Binder<T, ?, ProducerProperties>) getBinder(
outputName, output.getClass());
ProducerProperties producerProperties = this.bindingServiceProperties
.getProducerProperties(outputName);
if (binder instanceof ExtendedPropertiesBinder) {
Object extension = ((ExtendedPropertiesBinder) binder)
.getExtendedProducerProperties(outputName);
ExtendedProducerProperties extendedProducerProperties = new ExtendedProducerProperties<>(
extension);
BeanUtils.copyProperties(producerProperties, extendedProducerProperties);
producerProperties = extendedProducerProperties;
}
validate(producerProperties);
Binding<T> binding = doBindProducer(output, bindingTarget, binder, producerProperties);
this.producerBindings.put(outputName, binding);
return binding;
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:22,代码来源:BindingService.java
示例17: doBindProducer
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
public <T> Binding<T> doBindProducer(T output, String bindingTarget, Binder<T, ?, ProducerProperties> binder,
ProducerProperties producerProperties) {
if (this.taskScheduler == null || this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
return binder.bindProducer(bindingTarget, output, producerProperties);
}
else {
try {
return binder.bindProducer(bindingTarget, output, producerProperties);
}
catch (RuntimeException e) {
LateBinding<T> late = new LateBinding<T>();
rescheduleProducerBinding(output, bindingTarget, binder, producerProperties, late, e);
return late;
}
}
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:17,代码来源:BindingService.java
示例18: testParentConnectionFactoryNotInheritedByCustomizedBinders
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
public void testParentConnectionFactoryNotInheritedByCustomizedBinders() {
List<String> params = new ArrayList<>();
params.add("--spring.cloud.stream.input.binder=custom");
params.add("--spring.cloud.stream.output.binder=custom");
params.add("--spring.cloud.stream.binders.custom.type=redis");
params.add("--spring.cloud.stream.binders.custom.environment.foo=bar");
params.add("--server.port=0");
context = SpringApplication.run(SimpleProcessor.class, params.toArray(new String[]{}));
BinderFactory<?> binderFactory = context.getBean(BinderFactory.class);
Binder binder = binderFactory.getBinder(null);
assertThat(binder, instanceOf(RedisMessageChannelBinder.class));
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
RedisConnectionFactory binderConnectionFactory =
(RedisConnectionFactory) binderFieldAccessor.getPropertyValue("connectionFactory");
RedisConnectionFactory connectionFactory = context.getBean(RedisConnectionFactory.class);
assertThat(binderConnectionFactory, not(is(connectionFactory)));
CompositeHealthIndicator bindersHealthIndicator =
context.getBean("bindersHealthIndicator", CompositeHealthIndicator.class);
assertNotNull(bindersHealthIndicator);
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(bindersHealthIndicator);
@SuppressWarnings("unchecked")
Map<String,HealthIndicator> healthIndicators =
(Map<String, HealthIndicator>) directFieldAccessor.getPropertyValue("indicators");
assertThat(healthIndicators, hasKey("custom"));
assertThat(healthIndicators.get("custom").health().getStatus(), equalTo(Status.UP));
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:28,代码来源:RedisBinderModuleTests.java
示例19: getBinder
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
private Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
KafkaTopicProvisioner provisioningProvider =
new KafkaTopicProvisioner(kafkaBinderConfigurationProperties, new KafkaProperties());
try {
provisioningProvider.afterPropertiesSet();
}
catch (Exception e) {
throw new RuntimeException(e);
}
return new KafkaTestBinder(kafkaBinderConfigurationProperties, provisioningProvider);
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:12,代码来源:KafkaBinderTests.java
示例20: testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder
import org.springframework.cloud.stream.binder.Binder; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder() throws Exception {
Binder binder = getBinder();
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(createProducerProperties()));
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> producerBinding = binder.bindProducer(
"testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder", moduleOutputChannel,
createProducerProperties());
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<MessageChannel> consumerBinding = binder.bindConsumer(
"testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder", "test", moduleInputChannel,
consumerProperties);
String testPayload1 = "foo" + UUID.randomUUID().toString();
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
testPayload1.getBytes()).build();
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message1);
Message<?> receivedMessage = receive(moduleInputChannel);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT)).isNull();
producerBinding.unbind();
consumerBinding.unbind();
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:35,代码来源:KafkaBinderTests.java
注:本文中的org.springframework.cloud.stream.binder.Binder类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论