Embedded Kafka tests work for me with the configuration below.
Annotations on the test class:
@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify the @KafkaListener class if it is not this class or is not loaded by the test config
@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333",
        "port=3333"
    })
public class KafkaConsumerTest {

    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
A @Before setup method that waits until every listener container has been assigned its partitions:
    @Before
    public void setUp() throws Exception {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbeded.getPartitionsPerTopic());
        }
    }
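To illustrate why the setup method waits: a listener container only receives records once its consumer has been assigned partitions, so sending before that point can make the test miss the message. The plain-Java sketch below shows the general "poll until a condition holds or a timeout elapses" idea. It is not Spring Kafka's implementation of ContainerTestUtils.waitForAssignment; the class AssignmentWait, the method waitUntil, and the simulated assignment counter are all hypothetical names made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class AssignmentWait {

    // Polls the condition every pollMillis until it returns true
    // or timeoutMillis has elapsed. Returns whether the condition held.
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long start = System.nanoTime();
        long timeoutNanos = timeoutMillis * 1_000_000L;
        while (System.nanoTime() - start < timeoutNanos) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollMillis);
        }
        // One last check after the deadline has passed.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for "number of partitions assigned to the container".
        AtomicInteger assignedPartitions = new AtomicInteger(0);

        // Simulate an assignment that completes a little later on another thread.
        Thread rebalance = new Thread(() -> {
            try {
                Thread.sleep(100);
                assignedPartitions.set(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        rebalance.start();

        boolean ready = waitUntil(() -> assignedPartitions.get() == 1, 2000, 10);
        System.out.println("assigned in time: " + ready);
    }
}
```

In the real test, the condition is the container's assigned partition count reaching the value passed as the second argument, which is why the setup method passes getPartitionsPerTopic().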
Note: I am not using @ClassRule to create the embedded Kafka broker; instead, I autowire it with @Autowired.
    @Test
    public void testReceive() throws Exception {
        kafkaTemplate.send(topic, data);
    }
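The test above only sends a record; it does not check that the listener received it. One common approach, which is an assumption on my part and not something from the setup above, is to have the listener count down a CountDownLatch and let the test await it with a timeout. The sketch below is plain Java: an in-memory BlockingQueue stands in for the topic, and Receiver, sendAndAwait, and LatchReceiveSketch are hypothetical names. In a real test, the @KafkaListener method would be the one calling countDown().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LatchReceiveSketch {

    // Hypothetical stand-in for a bean with a @KafkaListener method.
    public static class Receiver {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String lastPayload;

        // In a real application this would be the listener method.
        public void receive(String payload) {
            lastPayload = payload;
            latch.countDown();
        }

        public CountDownLatch getLatch() {
            return latch;
        }

        public String getLastPayload() {
            return lastPayload;
        }
    }

    // Sends one message through an in-memory queue and reports whether
    // the receiver saw it before the timeout.
    public static boolean sendAndAwait(Receiver receiver, String data, long timeoutMillis)
            throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();

        // Consumer thread: takes one message and hands it to the receiver.
        Thread consumer = new Thread(() -> {
            try {
                receiver.receive(topic.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        topic.put(data);
        return receiver.getLatch().await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Receiver receiver = new Receiver();
        boolean received = sendAndAwait(receiver, "hello", 2000);
        System.out.println("received=" + received + ", payload=" + receiver.getLastPayload());
    }
}
```

Awaiting with a timeout keeps the test from hanging forever if the record never arrives; the test can then assert that await returned true.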
Hope this helps!
Edit: the test configuration class, marked with @TestConfiguration:
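@TestConfiguration
public class TestConfig {

    // The embedded broker is injected so its connection properties can be reused.
    @Autowired
    private KafkaEmbedded kafkaEmbedded;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        // topic is assumed to be a String holding the topic name, defined elsewhere
        kafkaTemplate.setDefaultTopic(topic);
        return kafkaTemplate;
    }
}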
Now the @Test method can autowire KafkaTemplate and use it to send a message:
kafkaTemplate.send(topic, data);
I have updated the code block in the answer with the line above.