• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java ConsumerProperties类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.springframework.cloud.stream.binder.ConsumerProperties的典型用法代码示例。如果您正苦于以下问题:Java ConsumerProperties类的具体用法?Java ConsumerProperties怎么用?Java ConsumerProperties使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ConsumerProperties类属于org.springframework.cloud.stream.binder包,在下文中一共展示了ConsumerProperties类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: run

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Override
public void run(ApplicationArguments args) throws Exception {
	logger.info("Consumer running with binder {}", binder);
	SubscribableChannel consumerChannel = new ExecutorSubscribableChannel();
	consumerChannel.subscribe(new MessageHandler() {
		@Override
		public void handleMessage(Message<?> message) throws MessagingException {
			messagePayload = (String) message.getPayload();
			logger.info("Received message: {}", messagePayload);
		}
	});
	String group = null;

	if (args.containsOption("group")) {
		group = args.getOptionValues("group").get(0);
	}

	binder.bindConsumer(ConsulBinderTests.BINDING_NAME, group, consumerChannel,
			new ConsumerProperties());
	isBound = true;
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-consul,代码行数:22,代码来源:TestConsumer.java


示例2: createInboundAdapter

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
private MessageProducerSupport createInboundAdapter(ConsumerProperties accessor, String queueName) {
	MessageProducerSupport adapter;
	int concurrency = accessor.getConcurrency();
	concurrency = concurrency > 0 ? concurrency : 1;
	if (concurrency == 1) {
		RedisQueueMessageDrivenEndpoint single = new RedisQueueMessageDrivenEndpoint(queueName,
				this.connectionFactory);
		single.setBeanFactory(getBeanFactory());
		single.setSerializer(null);
		adapter = single;
	}
	else {
		adapter = new CompositeRedisQueueMessageDrivenEndpoint(queueName, concurrency);
	}
	return adapter;
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:17,代码来源:RedisMessageChannelBinder.java


示例3: doRegisterConsumer

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
private Binding<MessageChannel> doRegisterConsumer(String bindingName, String group, String channelName, MessageChannel moduleInputChannel,
		MessageProducerSupport adapter, final ConsumerProperties properties) {
	DirectChannel bridgeToModuleChannel = new DirectChannel();
	bridgeToModuleChannel.setBeanFactory(this.getBeanFactory());
	bridgeToModuleChannel.setBeanName(channelName + ".bridge");
	MessageChannel bridgeInputChannel = addRetryIfNeeded(channelName, bridgeToModuleChannel, properties);
	adapter.setOutputChannel(bridgeInputChannel);
	adapter.setBeanName("inbound." + channelName);
	adapter.afterPropertiesSet();
	DefaultBinding<MessageChannel> consumerBinding = new DefaultBinding<MessageChannel>(bindingName, group, moduleInputChannel, adapter) {

		@Override
		protected void afterUnbind() {
			String key = RedisMessageChannelBinder.CONSUMER_GROUPS_KEY_PREFIX + getName();
			RedisMessageChannelBinder.this.redisOperations.boundZSetOps(key).incrementScore(getGroup(), -1);
		}
	};
	ReceivingHandler convertingBridge = new ReceivingHandler(properties);
	convertingBridge.setOutputChannel(moduleInputChannel);
	convertingBridge.setBeanName(channelName + ".bridge.handler");
	convertingBridge.afterPropertiesSet();
	bridgeToModuleChannel.subscribe(convertingBridge);
	this.redisOperations.boundZSetOps(CONSUMER_GROUPS_KEY_PREFIX + bindingName).incrementScore(group, 1);
	adapter.start();
	return consumerBinding;
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:27,代码来源:RedisMessageChannelBinder.java


示例4: testRetryFail

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Test
public void testRetryFail() {
	RedisTestBinder binder = getBinder();
	DirectChannel channel = new DirectChannel();
	binder.bindProducer("retry.0", channel, createProducerProperties());
	ConsumerProperties consumerProperties = new ConsumerProperties();
	consumerProperties.setMaxAttempts(2);
	consumerProperties.setBackOffInitialInterval(100);
	consumerProperties.setBackOffMultiplier(1.0);
	Binding<MessageChannel> consumerBinding = binder.bindConsumer("retry.0", "test", new DirectChannel(), consumerProperties); // no subscriber
	channel.send(new GenericMessage<>("foo"));
	RedisTemplate<String, Object> template = createTemplate();
	Object rightPop = template.boundListOps("ERRORS:retry.0.test").rightPop(5, TimeUnit.SECONDS);
	assertNotNull(rightPop);
	assertThat(new String((byte[]) rightPop), containsString("foo"));
	consumerBinding.unbind();
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:18,代码来源:RedisBinderTests.java


示例5: testSendAndReceive

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Test
@Override
public void testSendAndReceive() throws Exception {
	RedisTestBinder binder = getBinder();
	DirectChannel moduleOutputChannel = new DirectChannel();
	QueueChannel moduleInputChannel = new QueueChannel();
	ProducerProperties producerProperties = createProducerProperties();
	producerProperties.setHeaderMode(HeaderMode.raw);
	Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel, producerProperties);
	ConsumerProperties consumerProperties = createConsumerProperties();
	consumerProperties.setHeaderMode(HeaderMode.raw);
	Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0", "test", moduleInputChannel, consumerProperties);
	Message<?> message = MessageBuilder.withPayload("foo".getBytes()).build();
	// Let the consumer actually bind to the producer before sending a msg
	binderBindUnbindLatency();
	moduleOutputChannel.send(message);
	Message<?> inbound = receive(moduleInputChannel);
	assertNotNull(inbound);
	assertEquals("foo", new String((byte[])inbound.getPayload()));
	producerBinding.unbind();
	consumerBinding.unbind();
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:23,代码来源:RawModeRedisBinderTests.java


示例6: doBindConsumer

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
public <T> Binding<T> doBindConsumer(T input, String inputName, Binder<T, ConsumerProperties, ?> binder,
		ConsumerProperties consumerProperties, String target) {
	if (this.taskScheduler == null || this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
		return binder.bindConsumer(target,
				this.bindingServiceProperties.getGroup(inputName), input,
				consumerProperties);
	}
	else {
		try {
			return binder.bindConsumer(target,
					this.bindingServiceProperties.getGroup(inputName), input,
					consumerProperties);
		}
		catch (RuntimeException e) {
			LateBinding<T> late = new LateBinding<T>();
			rescheduleConsumerBinding(input, inputName, binder, consumerProperties, target, late, e);
			return late;
		}
	}
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:21,代码来源:BindingService.java


示例7: rescheduleConsumerBinding

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
public <T> void rescheduleConsumerBinding(final T input, final String inputName,
		final Binder<T, ConsumerProperties, ?> binder, final ConsumerProperties consumerProperties,
		final String target, final LateBinding<T> late, RuntimeException exception) {
	assertNotIllegalException(exception);
	this.log.error("Failed to create consumer binding; retrying in " +
		this.bindingServiceProperties.getBindingRetryInterval() + " seconds", exception);
	this.scheduleTask(() -> {
		try {
			late.setDelegate(binder.bindConsumer(target,
					this.bindingServiceProperties.getGroup(inputName), input, consumerProperties));
		}
		catch (RuntimeException e) {
			rescheduleConsumerBinding(input, inputName, binder, consumerProperties, target, late, e);
		}
	});
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:17,代码来源:BindingService.java


示例8: doBindPollableConsumer

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> Binding<T> doBindPollableConsumer(T input, String inputName, Binder<T, ConsumerProperties, ?> binder,
		ConsumerProperties consumerProperties, String target) {
	if (this.taskScheduler == null || this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
		return ((PollableConsumerBinder) binder).bindPollableConsumer(target,
				this.bindingServiceProperties.getGroup(inputName), (PollableSource) input,
				consumerProperties);
	}
	else {
		try {
			return ((PollableConsumerBinder) binder).bindPollableConsumer(target,
					this.bindingServiceProperties.getGroup(inputName), (PollableSource) input,
					consumerProperties);
		}
		catch (RuntimeException e) {
			LateBinding<T> late = new LateBinding<T>();
			reschedulePollableConsumerBinding(input, inputName, binder, consumerProperties, target, late, e);
			return late;
		}
	}
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:22,代码来源:BindingService.java


示例9: reschedulePollableConsumerBinding

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> void reschedulePollableConsumerBinding(final T input, final String inputName,
		final Binder<T, ConsumerProperties, ?> binder, final ConsumerProperties consumerProperties,
		final String target, final LateBinding<T> late, RuntimeException exception) {
	assertNotIllegalException(exception);
	this.log.error("Failed to create consumer binding; retrying in " +
		this.bindingServiceProperties.getBindingRetryInterval() + " seconds", exception);
	this.scheduleTask(() -> {
		try {
			late.setDelegate(((PollableConsumerBinder) binder).bindPollableConsumer(target,
					this.bindingServiceProperties.getGroup(inputName), (PollableSource) input,
					consumerProperties));
		}
		catch (RuntimeException e) {
			reschedulePollableConsumerBinding(input, inputName, binder, consumerProperties, target, late, e);
		}
	});
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:19,代码来源:BindingService.java


示例10: configureMessageChannel

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
/**
 * Setup data-type and message converters for the given message channel.
 *
 * @param channel message channel to set the data-type and message converters
 * @param channelName the channel name
 * @param inbound inbound (i.e., "input") or outbound channel
 */
private void configureMessageChannel(MessageChannel channel, String channelName, boolean inbound) {
	Assert.isAssignable(AbstractMessageChannel.class, channel.getClass());
	AbstractMessageChannel messageChannel = (AbstractMessageChannel) channel;
	BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(channelName);
	String contentType = bindingProperties.getContentType();
	ProducerProperties producerProperties = bindingProperties.getProducer();
	if (!inbound && producerProperties != null && producerProperties.isPartitioned()) {
		messageChannel.addInterceptor(new PartitioningInterceptor(bindingProperties,
				getPartitionKeyExtractorStrategy(producerProperties),
				getPartitionSelectorStrategy(producerProperties)));
	}

	ConsumerProperties consumerProperties = bindingProperties.getConsumer();
	if (this.isNativeEncodingNotSet(producerProperties, consumerProperties, inbound)) {
		if (inbound) {
			messageChannel.addInterceptor(new InboundContentTypeConvertingInterceptor(contentType, this.compositeMessageConverterFactory));
		}
		else {
			messageChannel.addInterceptor(new OutboundContentTypeConvertingInterceptor(contentType, this.compositeMessageConverterFactory
					.getMessageConverterForAllRegistered()));
		}
	}
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:31,代码来源:MessageConverterConfigurer.java


示例11: testConsumerPropertiesValidation

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Test
public void testConsumerPropertiesValidation() {
	BindingServiceProperties serviceProperties = new BindingServiceProperties();
	Map<String, BindingProperties> bindingProperties = new HashMap<>();
	BindingProperties props = new BindingProperties();
	ConsumerProperties consumerProperties = new ConsumerProperties();
	consumerProperties.setConcurrency(0);
	props.setDestination("foo");
	props.setConsumer(consumerProperties);
	final String inputChannelName = "input";
	bindingProperties.put(inputChannelName, props);
	serviceProperties.setBindings(bindingProperties);
	DefaultBinderFactory binderFactory = createMockBinderFactory();
	BindingService service = new BindingService(serviceProperties,
			binderFactory);
	MessageChannel inputChannel = new DirectChannel();
	try {
		service.bindConsumer(inputChannel, inputChannelName);
		fail("Consumer properties should be validated.");
	}
	catch (IllegalStateException e) {
		assertThat(e).hasMessageContaining("Concurrency should be greater than zero.");
	}
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:25,代码来源:BindingServiceTests.java


示例12: createConsumerEndpoint

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ConsumerProperties properties)
		throws Exception {
	ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
	SubscribableChannel siBinderInputChannel = ((SpringIntegrationConsumerDestination)destination).getChannel();

	IntegrationMessageListeningContainer messageListenerContainer = new IntegrationMessageListeningContainer();
	IntegrationBinderInboundChannelAdapter adapter = new IntegrationBinderInboundChannelAdapter(messageListenerContainer);

	String groupName = StringUtils.hasText(group) ? group : "anonymous";
	ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, groupName, properties);
	if (properties.getMaxAttempts() > 1) {
		adapter.setRetryTemplate(buildRetryTemplate(properties));
		adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
	}
	else {
		adapter.setErrorMessageStrategy(errorMessageStrategy);
		adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
	}

	siBinderInputChannel.subscribe(messageListenerContainer);

	return adapter;
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:25,代码来源:SpringIntegrationChannelBinder.java


示例13: doBindConsumer

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Override
protected Binding<MessageChannel> doBindConsumer(final String name, String group, MessageChannel moduleInputChannel, ConsumerProperties properties) {
	if (!StringUtils.hasText(group)) {
		group = "anonymous." + UUID.randomUUID().toString();
	}
	String queueName = groupedName(name, group);
	if (properties.isPartitioned()) {
		queueName += "-" + properties.getInstanceIndex();
	}
	MessageProducerSupport adapter = createInboundAdapter(properties, queueName);
	return doRegisterConsumer(name, group, queueName, moduleInputChannel, adapter, properties);
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:13,代码来源:RedisMessageChannelBinder.java


示例14: testConsumerProperties

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Test
public void testConsumerProperties() throws Exception {
	RedisTestBinder binder = getBinder();
	ConsumerProperties properties1 = new ConsumerProperties();
	properties1.setMaxAttempts(1);
	Binding<MessageChannel> binding = binder.bindConsumer("props.0", "test", new DirectChannel(), properties1);
	AbstractEndpoint endpoint = extractEndpoint(binding);
	assertThat(endpoint, instanceOf(RedisQueueMessageDrivenEndpoint.class));
	assertSame(DirectChannel.class, TestUtils.getPropertyValue(endpoint, "outputChannel").getClass());
	binding.unbind();
	assertFalse(endpoint.isRunning());

	ConsumerProperties properties2 = new ConsumerProperties();
	properties2.setBackOffInitialInterval(2000);
	properties2.setBackOffMaxInterval(20000);
	properties2.setBackOffMultiplier(5.0);
	properties2.setConcurrency(2);
	properties2.setMaxAttempts(23);
	properties2.setInstanceIndex(0);

	binding = binder.bindConsumer("props.0", "test", new DirectChannel(), properties2);
	endpoint = extractEndpoint(binding);
	verifyConsumer(endpoint);

	binding.unbind();
	assertFalse(endpoint.isRunning());
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-redis,代码行数:28,代码来源:RedisBinderTests.java


示例15: bindConsumer

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@SuppressWarnings({ "unchecked", "rawtypes" })
public <T> Collection<Binding<T>> bindConsumer(T input, String inputName) {
	String bindingTarget = this.bindingServiceProperties
			.getBindingDestination(inputName);
	String[] bindingTargets = StringUtils
			.commaDelimitedListToStringArray(bindingTarget);
	Collection<Binding<T>> bindings = new ArrayList<>();
	Binder<T, ConsumerProperties, ?> binder = (Binder<T, ConsumerProperties, ?>) getBinder(
			inputName, input.getClass());
	ConsumerProperties consumerProperties = this.bindingServiceProperties
			.getConsumerProperties(inputName);
	if (binder instanceof ExtendedPropertiesBinder) {
		Object extension = ((ExtendedPropertiesBinder) binder)
				.getExtendedConsumerProperties(inputName);
		ExtendedConsumerProperties extendedConsumerProperties = new ExtendedConsumerProperties(
				extension);
		BeanUtils.copyProperties(consumerProperties, extendedConsumerProperties);
		consumerProperties = extendedConsumerProperties;
	}
	validate(consumerProperties);
	for (String target : bindingTargets) {
		Binding<T> binding;
		if (input instanceof PollableSource) {
			binding = doBindPollableConsumer(input, inputName, binder, consumerProperties, target);
		}
		else {
			binding = doBindConsumer(input, inputName, binder, consumerProperties, target);
		}
		bindings.add(binding);
	}
	bindings = Collections.unmodifiableCollection(bindings);
	this.consumerBindings.put(inputName, new ArrayList<Binding<?>>(bindings));
	return bindings;
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:35,代码来源:BindingService.java


示例16: configurePolledMessageSource

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Override
public void configurePolledMessageSource(PollableMessageSource binding, String name) {
	BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(name);
	String contentType = bindingProperties.getContentType();
	ConsumerProperties consumerProperties = bindingProperties.getConsumer();
	if ((consumerProperties == null || !consumerProperties.isUseNativeDecoding())
			&& binding instanceof DefaultPollableMessageSource) {
		((DefaultPollableMessageSource) binding).addInterceptor(
				new InboundContentTypeConvertingInterceptor(contentType, this.compositeMessageConverterFactory));
	}
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:12,代码来源:MessageConverterConfigurer.java


示例17: isNativeEncodingNotSet

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
private boolean isNativeEncodingNotSet(ProducerProperties producerProperties, ConsumerProperties consumerProperties, boolean input) {
	if (input) {
		return consumerProperties == null || !consumerProperties.isUseNativeDecoding();
	}
	else {
		return producerProperties == null || !producerProperties.isUseNativeEncoding();
	}
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:9,代码来源:MessageConverterConfigurer.java


示例18: testBindingPartitionedConsumer

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@Test
@SuppressWarnings("rawtypes")
public void testBindingPartitionedConsumer() {
	Binder binder = this.binderFactory.getBinder(null, MessageChannel.class);
	ArgumentCaptor<ConsumerProperties> argumentCaptor = ArgumentCaptor.forClass(ConsumerProperties.class);
	verify(binder).bindConsumer(eq("partIn"), isNull(), eq(this.testSink.input()),
			argumentCaptor.capture());
	Assert.assertThat(argumentCaptor.getValue().getInstanceIndex(), equalTo(0));
	Assert.assertThat(argumentCaptor.getValue().getInstanceCount(), equalTo(2));
	verifyNoMoreInteractions(binder);
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:12,代码来源:PartitionedConsumerTest.java


示例19: testDefaultGroup

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testDefaultGroup() throws Exception {
	BindingServiceProperties properties = new BindingServiceProperties();
	Map<String, BindingProperties> bindingProperties = new HashMap<>();
	BindingProperties props = new BindingProperties();
	props.setDestination("foo");
	final String inputChannelName = "input";
	bindingProperties.put(inputChannelName, props);
	properties.setBindings(bindingProperties);
	DefaultBinderFactory binderFactory = createMockBinderFactory();
	Binder binder = binderFactory.getBinder("mock", MessageChannel.class);
	BindingService service = new BindingService(properties, binderFactory);
	MessageChannel inputChannel = new DirectChannel();
	Binding<MessageChannel> mockBinding = Mockito.mock(Binding.class);
	when(binder.bindConsumer(eq("foo"), isNull(), same(inputChannel),
			any(ConsumerProperties.class))).thenReturn(mockBinding);
	Collection<Binding<MessageChannel>> bindings = service.bindConsumer(inputChannel,
			inputChannelName);
	assertThat(bindings).hasSize(1);
	Binding<MessageChannel> binding = bindings.iterator().next();
	assertThat(binding).isSameAs(mockBinding);
	service.unbindConsumers(inputChannelName);
	verify(binder).bindConsumer(eq("foo"), isNull(), same(inputChannel),
			any(ConsumerProperties.class));
	verify(binding).unbind();
	binderFactory.destroy();
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:29,代码来源:BindingServiceTests.java


示例20: testExplicitGroup

import org.springframework.cloud.stream.binder.ConsumerProperties; //导入依赖的package包/类
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testExplicitGroup() throws Exception {
	BindingServiceProperties properties = new BindingServiceProperties();
	Map<String, BindingProperties> bindingProperties = new HashMap<>();
	BindingProperties props = new BindingProperties();
	props.setDestination("foo");
	props.setGroup("fooGroup");
	final String inputChannelName = "input";
	bindingProperties.put(inputChannelName, props);
	properties.setBindings(bindingProperties);
	DefaultBinderFactory binderFactory = createMockBinderFactory();
	Binder binder = binderFactory.getBinder("mock", MessageChannel.class);
	BindingService service = new BindingService(properties,
			binderFactory);
	MessageChannel inputChannel = new DirectChannel();
	Binding<MessageChannel> mockBinding = Mockito.mock(Binding.class);
	when(binder.bindConsumer(eq("foo"), eq("fooGroup"), same(inputChannel),
			any(ConsumerProperties.class))).thenReturn(mockBinding);
	Collection<Binding<MessageChannel>> bindings = service.bindConsumer(inputChannel,
			inputChannelName);
	assertThat(bindings).hasSize(1);
	Binding<MessageChannel> binding = bindings.iterator().next();
	assertThat(binding).isSameAs(mockBinding);

	service.unbindConsumers(inputChannelName);
	verify(binder).bindConsumer(eq("foo"), eq(props.getGroup()), same(inputChannel),
			any(ConsumerProperties.class));
	verify(binding).unbind();
	binderFactory.destroy();
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-stream,代码行数:32,代码来源:BindingServiceTests.java



注:本文中的org.springframework.cloud.stream.binder.ConsumerProperties类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java MutableIssue类代码示例发布时间:2022-05-22
下一篇:
Java DefaultBitmapLoadCallBack类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap