作者:陈玲琳2013 | 来源:互联网 | 2022-12-02 13:50
试图弄清楚我是否可以使用spring-kafka和spring-kafka-test为@KafkaListener编写单元测试。
我的侦听器类。
public class MyKafkaListener {
@Autowired
private MyMessageProcessor myMessageProcessor;
@KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", cOntainerFactory= "myMessageListenerContainerFactory")
public void myMessageListener(MyMessage message) {
myMessageProcessor.process(message);
log.info("MyMessage processed");
}}
我的测试班:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitiOns= 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {
@Autowired
private MyMessageProcessor myMessageProcessor;
@Value("${kafka.topic.01}")
private String TOPIC_01;
@Autowired
private KafkaTemplate messageProducer;
@Test
public void testSalesforceMessageListner() {
MyMessageConsumers myMessageCOnsumers= new MyMessageConsumers(mockService);
messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
}}
我的测试配置类:
@Configuration
@EnableKafka
public class TestKafkaConfig {
@Bean
public MyMessageProcessor myMessageProcessor() {
return mock(MyMessageProcessor.class);
}
@Bean
public KafkaEmbedded kafkaEmbedded() {
return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
}
//Consumer
@Bean
public ConsumerFactory myMessageConsumerFactory() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory myMessageListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(myMessageConsumerFactory());
return factory;
}
//Producer
@Bean
public ProducerFactory producerFactory() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate messageProducer() {
return new KafkaTemplate<>(producerFactory());
}
}
有什么简单的方法可以使这项工作吗?
还是应该以其他方式进行@KafkaListener的测试?在单元测试中,如何确保在Kafka中收到新消息时调用@KafkaListener。
1> Gary Russell..:
您可以将监听器包装在测试用例中。
给定
@SpringBootApplication
public class So52783066Application {
public static void main(String[] args) {
SpringApplication.run(So52783066Application.class, args);
}
@KafkaListener(id = "so52783066", topics = "so52783066")
public void listen(String in) {
System.out.println(in);
}
}
然后
@RunWith(SpringRunner.class)
@SpringBootTest
public class So52783066ApplicationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "so52783066");
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private KafkaTemplate template;
@Before
public void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}
@Test
public void test() throws Exception {
ConcurrentMessageListenerContainer, ?> cOntainer= (ConcurrentMessageListenerContainer, ?>) registry
.getListenerContainer("so52783066");
container.stop();
@SuppressWarnings("unchecked")
AcknowledgingConsumerAwareMessageListener messageListener = (AcknowledgingConsumerAwareMessageListener) container
.getContainerProperties().getMessageListener();
CountDownLatch latch = new CountDownLatch(1);
container.getContainerProperties()
.setMessageListener(new AcknowledgingConsumerAwareMessageListener() {
@Override
public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment,
Consumer, ?> consumer) {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
}
});
container.start();
template.send("so52783066", "foo");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
}