• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java ContainerProperties类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.springframework.kafka.listener.config.ContainerProperties的典型用法代码示例。如果您正苦于以下问题:Java ContainerProperties类的具体用法?Java ContainerProperties怎么用?Java ContainerProperties使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ContainerProperties类属于org.springframework.kafka.listener.config包,在下文中一共展示了ContainerProperties类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: createSystemConsumer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
private void createSystemConsumer(String name, MessageListener<String, String> consumeEvent) {
    log.info("Creating kafka consumer for topic {}", name);
    ContainerProperties containerProps = new ContainerProperties(name);

    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    if (name.equals(applicationProperties.getKafkaSystemTopic())) {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    }
    ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

    ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(factory, containerProps);
    container.setupMessageListener(consumeEvent);
    container.start();
    log.info("Successfully created kafka consumer for topic {}", name);
}
 
开发者ID:xm-online,项目名称:xm-uaa,代码行数:17,代码来源:ApplicationStartup.java


示例2: createKafkaConsumer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
/**
 * Create topic consumer.
 * @param tenant the kafka topic
 */
public void createKafkaConsumer(String tenant) {
    StopWatch stopWatch = StopWatch.createStarted();
    try {
        log.info("START - SETUP:CreateTenant:kafka consumer tenantKey: {}", tenant);
        ConcurrentMessageListenerContainer<String, String> container = consumers.get(tenant);
        if (container != null) {
            if (!container.isRunning()) {
                container.start();
            }
        } else {
            ContainerProperties containerProps = new ContainerProperties(tenant);
            container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
            container.setupMessageListener((MessageListener<String, String>) consumer::consumeEvent);
            container.setBeanName(tenant);
            container.start();
            consumers.put(tenant, container);
        }
        log.info("STOP  - SETUP:CreateTenant:kafka consumer tenantKey: {}, result: OK, time = {} ms",
            tenant, stopWatch.getTime());
    } catch (Exception e) {
        log.error("STOP  - SETUP:CreateTenant:kafka consumer tenantKey: {}, result: FAIL, error: {}, time = {} ms",
            tenant, e.getMessage(), stopWatch.getTime(), e);
    }
}
 
开发者ID:xm-online,项目名称:xm-ms-timeline,代码行数:29,代码来源:KafkaService.java


示例3: messageListenerContainer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Bean
public MessageListenerContainer messageListenerContainer(
        ConsumerFactory<String, DefaultAvMessage> consumerFactory,
        MessageListener<String, AvMessage> messageListener,
        ThreadPoolTaskScheduler kafkaClientThreadPoolTaskScheduler
) {
    ContainerProperties props = new ContainerProperties(resultTopic);
    // shouldn't be necessary but the default scheduler is not destroyed after shutdown
    props.setScheduler(kafkaClientThreadPoolTaskScheduler);

    MessageListenerContainer container = new ConcurrentMessageListenerContainer<>(
            consumerFactory,
            props
    );
    container.setupMessageListener(messageListener);

    return container;
}
 
开发者ID:dvoraka,项目名称:av-service,代码行数:19,代码来源:KafkaFileClientConfig.java


示例4: testAutoCommit

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Test
public void testAutoCommit() throws Exception {
    LOG.info("Start testAutoCommit");
    ContainerProperties containerProps = new ContainerProperties("topic3", "topic4");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        LOG.info("received: " + message);
        latch.countDown();
    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps,
            IntegerDeserializer.class, StringDeserializer.class);
    container.setBeanName("testAutoCommit");
    container.start();
    Thread.sleep(5000); // wait a bit for the container to start
    KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class);
    template.setDefaultTopic("topic3");
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();
    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    LOG.info("Stop testAutoCommit");
}
 
开发者ID:rmap-project,项目名称:rmap,代码行数:26,代码来源:SimpleKafkaIT.java


示例5: fileServerMessageListenerContainer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Bean
public MessageListenerContainer fileServerMessageListenerContainer(
        ConsumerFactory<String, DefaultAvMessage> consumerFactory,
        MessageListener<String, AvMessage> fileServerMessageListener,
        ThreadPoolTaskScheduler kafkaServerThreadPoolTaskScheduler
) {
    ContainerProperties props = new ContainerProperties(fileTopic);
    // shouldn't be necessary but the default scheduler is not destroyed after shutdown
    props.setScheduler(kafkaServerThreadPoolTaskScheduler);

    MessageListenerContainer container = new ConcurrentMessageListenerContainer<>(
            consumerFactory,
            props
    );
    container.setupMessageListener(fileServerMessageListener);

    return container;
}
 
开发者ID:dvoraka,项目名称:av-service,代码行数:19,代码来源:KafkaServerConfig.java


示例6: createConsumer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
private void createConsumer(String name) {
    log.info("Creating kafka consumer for tenant {}", name);
    ContainerProperties containerProps = new ContainerProperties(name);
    ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
    container.setupMessageListener((MessageListener<String, String>) timelineConsumer::consumeEvent);
    container.start();
    log.info("Successfully created kafka consumer for tenant {}", name);
}
 
开发者ID:xm-online,项目名称:xm-ms-timeline,代码行数:10,代码来源:ApplicationStartup.java


示例7: createCommandConsumer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
private void createCommandConsumer(String name) {
    log.info("Creating kafka command consumer for topic {}", name);
    ContainerProperties containerProps = new ContainerProperties(name);

    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

    ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(factory, containerProps);
    container.setupMessageListener((MessageListener<String, String>) commandConsumer::consumeEvent);
    container.start();
    log.info("Successfully created kafka command consumer for topic {}", name);
}
 
开发者ID:xm-online,项目名称:xm-ms-timeline,代码行数:15,代码来源:ApplicationStartup.java


示例8: setup

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Before
public void setup() throws Exception {
	Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
	DefaultKafkaConsumerFactory<String, String> cf =
		new DefaultKafkaConsumerFactory<>(consumerProps);
	ContainerProperties containerProperties = new ContainerProperties(TEST_TOPIC);
	container = new KafkaMessageListenerContainer<>(cf, containerProperties);
	final BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
	container.setupMessageListener((MessageListener<String, String>) record -> {
           log.error("Message received: " + record);
           records.add(record);
       });
	container.start();
	ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
	Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
	ProducerFactory<String, String> pf =
		new DefaultKafkaProducerFactory<>(senderProps);
	template = new KafkaTemplate<>(pf);
	template.setDefaultTopic(TEST_TOPIC);
}
 
开发者ID:underscorenico,项目名称:skeleton-oms-java,代码行数:21,代码来源:HelloProcessTest.java


示例9: createContainerForDto

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
private KafkaMessageListenerContainer<Integer, IndexDTO> createContainerForDto(
        ContainerProperties containerProps, Object keyDeser, Object valDeser) {
    Map<String, Object> props = consumerProps(keyDeser, valDeser);
    DefaultKafkaConsumerFactory<Integer, IndexDTO> cf =
            new DefaultKafkaConsumerFactory<>(props);
    KafkaMessageListenerContainer<Integer, IndexDTO> container =
            new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
}
 
开发者ID:rmap-project,项目名称:rmap,代码行数:10,代码来源:SimpleKafkaIT.java


示例10: createContainer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
private KafkaMessageListenerContainer<Integer, String> createContainer(
        ContainerProperties containerProps, Object keyDeser, Object valDeser) {
    Map<String, Object> props = consumerProps(keyDeser, valDeser);
    DefaultKafkaConsumerFactory<Integer, String> cf =
            new DefaultKafkaConsumerFactory<>(props);
    KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
}
 
开发者ID:rmap-project,项目名称:rmap,代码行数:10,代码来源:SimpleKafkaIT.java


示例11: KafkaMessageConsumer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
private KafkaMessageConsumer(String bootstrapServers,
                             String[] topics,
                             MessageTypeParameter messageType,
                             OffsetResetParameter offsetReset) {
    final Map<String, Object> consumerConfig =
            consumerConfig(bootstrapServers, UUID.randomUUID().toString(), messageType, offsetReset);
    final ConsumerFactory<String, String> consumerFactory = createConsumerFactory(consumerConfig);

    final ContainerProperties containerProperties = new ContainerProperties(topics);
    listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

    messageListener = createMessageListener(messageType);
    listenerContainer.setupMessageListener(messageListener);
}
 
开发者ID:rkluszczynski,项目名称:avro-cli,代码行数:15,代码来源:KafkaMessageConsumer.java


示例12: container

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Bean
public KafkaMessageListenerContainer<String, String> container(
		ConsumerFactory<String, String> consumerFactory,
		ConfigProperties config) {
	ContainerProperties containerProperties = new ContainerProperties(config.getTopic());
	containerProperties.setMessageListener(listener());
	containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
	return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
 
开发者ID:SpringOnePlatform2016,项目名称:grussell-spring-kafka,代码行数:10,代码来源:S1pKafkaApplication.java


示例13: container

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Bean
public KafkaMessageListenerContainer<String, String> container(
		ConsumerFactory<String, String> consumerFactory,
		ConfigProperties config) {
	ContainerProperties containerProperties = new ContainerProperties(config.getTopic());
	containerProperties.setMessageListener(listener());
	return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
 
开发者ID:SpringOnePlatform2016,项目名称:grussell-spring-kafka,代码行数:9,代码来源:S1pKafkaApplication.java


示例14: testReceive

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Test
public void testReceive() throws Exception {
  String bootstrapServers = embeddedKafka.getBrokersAsString();
  LOGGER.info("bootstrapServers='{}'", bootstrapServers);

  ContainerProperties containerProperties =
      new ContainerProperties(JAVA_TOPIC);
  Receiver receiver = new Receiver();

  KafkaMessageListenerContainer<String, String> messageListenerContainer =
      ReceiverConfig.createMessageListenerContainer(
          containerProperties, bootstrapServers);
  messageListenerContainer.setupMessageListener(receiver);
  messageListenerContainer.start();

  // wait a bit for the container to start
  Thread.sleep(1000);

  KafkaTemplate<String, String> kafkaTemplate =
      SenderConfig.createKafkaTemplate(bootstrapServers);
  kafkaTemplate.send(JAVA_TOPIC, "Hello Java!");

  receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
  assertThat(receiver.getLatch().getCount()).isEqualTo(0);

  messageListenerContainer.stop();
}
 
开发者ID:code-not-found,项目名称:spring-kafka,代码行数:28,代码来源:SpringKafkaMainTest.java


示例15: setUp

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
  // set up the Kafka consumer properties
  Map<String, Object> consumerProperties =
      KafkaTestUtils.consumerProps("sender", "false", embeddedKafka);

  // create a Kafka consumer factory
  DefaultKafkaConsumerFactory<String, String> consumerFactory =
      new DefaultKafkaConsumerFactory<String, String>(consumerProperties);

  // set the topic that needs to be consumed
  ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

  // create a Kafka MessageListenerContainer
  container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

  // create a thread safe queue to store the received message
  records = new LinkedBlockingQueue<>();

  // setup a Kafka message listener
  container.setupMessageListener(new MessageListener<String, String>() {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
      LOGGER.debug("test-listener received message='{}'", record.toString());
      records.add(record);
    }
  });

  // start the container and underlying message listener
  container.start();

  // wait until the container has the required number of assigned partitions
  ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}
 
开发者ID:code-not-found,项目名称:spring-kafka,代码行数:35,代码来源:SpringKafkaSenderTest.java


示例16: kafkaListenerContainer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
  ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);

  return (ConcurrentMessageListenerContainer<String, String>) new ConcurrentMessageListenerContainer<>(
      consumerFactory(), containerProps);
}
 
开发者ID:code-not-found,项目名称:spring-kafka,代码行数:9,代码来源:ConsumingChannelConfig.java


示例17: testSendIndexDTO

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Test
public void testSendIndexDTO() throws Exception {
    LOG.info("Start testSendIndexDTO");
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");

    List<IndexDTO> dtos = prepareIndexableDtos(rdfHandler, "/data/discos/rmd18mddcw", null)
            .collect(Collectors.toList());

    Queue<IndexDTO> expectedDtos = new ArrayDeque<>(dtos);
    Queue<ExpectedActualDTOPair> receivedDtos = new ArrayDeque<>();

    final CountDownLatch latch = new CountDownLatch(3);

    containerProps.setMessageListener((MessageListener<Integer, IndexDTO>) message -> {
        LOG.info("received: " + message);
        IndexDTO expected = expectedDtos.remove();
        LOG.debug("expected: " + expected);
        IndexDTO actual = message.value();
        LOG.debug("actual: " + actual);
        receivedDtos.add(new ExpectedActualDTOPair(expected, actual));
        LOG.debug("Decrementing latch.");
        latch.countDown();
    });

    KafkaMessageListenerContainer<Integer, IndexDTO> container = createContainerForDto(containerProps, IntegerDeserializer.class, GenericJvmObjectDeserializer.class);
    container.setBeanName("testSendIndexDTO");
    container.start();
    Thread.sleep(5000); // wait a bit for the container to start
    KafkaTemplate<Integer, IndexDTO> template = createTemplate(IntegerSerializer.class, GenericJvmObjectSerializer.class);
    template.setDefaultTopic("topic1");

    prepareIndexableDtos(rdfHandler, "/data/discos/rmd18mddcw", null)
            .peek(dto -> LOG.debug("Prepared DTO {}", dto))
            .forEach(template::sendDefault);

    // do anything with the completablefuture returned by the template?

    template.flush();

    assertTrue(latch.await(120, TimeUnit.SECONDS));
    container.stop();

    LOG.info("Stop testSendIndexDTO");

    receivedDtos.forEach(pair -> assertEquals(pair.expected, pair.actual));
}
 
开发者ID:rmap-project,项目名称:rmap,代码行数:47,代码来源:SimpleKafkaIT.java


示例18: createMessageListenerContainer

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
public static KafkaMessageListenerContainer<String, String> createMessageListenerContainer(
    ContainerProperties containerProperties,
    String bootstrapServers) {
  return new KafkaMessageListenerContainer<>(
      createConsumerFactory(bootstrapServers), containerProperties);
}
 
开发者ID:code-not-found,项目名称:spring-kafka,代码行数:7,代码来源:ReceiverConfig.java


示例19: setUp

import org.springframework.kafka.listener.config.ContainerProperties; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
  // set up the Kafka consumer properties
  Map<String, Object> consumerProperties =
      KafkaTestUtils.consumerProps("sender_group", "false", AllSpringKafkaTests.embeddedKafka);

  // create a Kafka consumer factory
  DefaultKafkaConsumerFactory<String, String> consumerFactory =
      new DefaultKafkaConsumerFactory<String, String>(consumerProperties);

  // set the topic that needs to be consumed
  ContainerProperties containerProperties =
      new ContainerProperties(AllSpringKafkaTests.SENDER_TOPIC);

  // create a Kafka MessageListenerContainer
  container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

  // create a thread safe queue to store the received message
  records = new LinkedBlockingQueue<>();

  // setup a Kafka message listener
  container.setupMessageListener(new MessageListener<String, String>() {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
      LOGGER.debug("test-listener received message='{}'", record.toString());
      records.add(record);
    }
  });

  // start the container and underlying message listener
  container.start();
  // wait until the container has the required number of assigned partitions
  ContainerTestUtils.waitForAssignment(container,
      AllSpringKafkaTests.embeddedKafka.getPartitionsPerTopic());
}
 
开发者ID:code-not-found,项目名称:spring-kafka,代码行数:36,代码来源:SpringKafkaSenderTest.java



注:本文中的org.springframework.kafka.listener.config.ContainerProperties类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java TcpPeerServer类代码示例发布时间:2022-05-22
下一篇:
Java DexBackedReference类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap