December 28, 2019
When testing Kafka in Micronaut you can use embedded Kafka or Testcontainers (a sketch of the Testcontainers route is shown near the end of this post).
To use embedded Kafka, set these properties in a src/test/resources/application-kafka.yaml file:
kafka:
  bootstrap:
    servers: localhost:${random.port}
  embedded:
    enabled: true
    topics:
      - topic1
      - topic2
When you set up a test with @MicronautTest(environments = "kafka"), the environment is set to both “kafka” and “test”, which enables all the properties needed to start the embedded Kafka.
It’s best to start the embedded Kafka on a random port to avoid clashing with another Kafka broker which may be running locally (such as in Docker).
All the topics required by your application also need to be specified under the kafka.embedded.topics key so they’re created in the embedded Kafka.
Make sure these dependencies are also in your build.gradle (or pom.xml):
testImplementation 'org.apache.kafka:kafka-clients:2.3.1:test'
testImplementation 'org.apache.kafka:kafka_2.12:2.3.1'
testImplementation 'org.apache.kafka:kafka_2.12:2.3.1:test'
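If you use Maven instead, the equivalent dependencies in pom.xml would look roughly like this (same artifacts and versions as the Gradle lines above, with the fourth coordinate mapped to a classifier and testImplementation mapped to test scope):

<!-- Maven equivalent of the Gradle test dependencies above -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.3.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.3.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>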
To set up a producer to send test messages to your application, create a file in src/test/java/ called TestProducer.java containing an interface with the needed annotations:
@KafkaClient
public interface TestProducer {

    @Topic("topic1")
    void sendTestMessage(String body);
}
Then use the test producer by injecting it into your test:
@MicronautTest(environments = "kafka")
public class TestAKafkaApplication {

    @Inject
    public TestProducer topicClient;

    @Test
    public void listensToMessages() {
        // When
        topicClient.sendTestMessage("Hello");

        // Then assertions...
        // assertThat()...
    }
}
Set up a consumer called TestListener in your test classes with the following code:
@Singleton
@KafkaListener(groupId = "test", clientId = "test-consumer")
public class TestListener {

    private BlockingQueue<String> messages = new LinkedBlockingDeque<>();

    @Topic("topic1")
    public void eventOccurred(String body) {
        messages.add(body);
    }

    public BlockingQueue<String> getMessages() {
        return messages;
    }
}
This allows you to get the messages that were sent to the topic and then check the contents. It also copies the approach taken by Spring Cloud Stream of storing the messages in a BlockingQueue so you can easily wait for a message to arrive and then remove it from the data structure when you retrieve it.
Using it in your test requires you to inject it:
import io.micronaut.test.annotation.MicronautTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import technology.wick.blog.consumer.TestListener;
import technology.wick.blog.producer.TestProducer;

import javax.inject.Inject;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

@MicronautTest(environments = "kafka")
public class TestAKafkaApplication {

    @Inject
    public TestListener topicListener;

    @Inject
    public TestProducer topicClient;

    @BeforeEach
    void setUp() {
        // Given no messages exist
        topicListener.getMessages().clear();
    }

    @Test
    public void producesMessages() throws InterruptedException {
        // When
        topicClient.sendTestMessage("Hello");

        // Then
        String bodyOfMessage = topicListener.getMessages().poll(2, TimeUnit.SECONDS);
        assertThat(bodyOfMessage).isEqualTo("Hello");
    }
}
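As an alternative to embedded Kafka, the Testcontainers route mentioned at the start points your tests at a real broker running in Docker. Below is a minimal sketch, assuming the org.testcontainers:kafka dependency and Micronaut Test's TestPropertyProvider; the class name and wiring are illustrative, not taken from the working example:

import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.KafkaContainer;

import java.util.Collections;
import java.util.Map;

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestWithTestcontainers implements TestPropertyProvider {

    // Started once for the test class; requires Docker to be available
    private static final KafkaContainer KAFKA = new KafkaContainer();

    @Override
    public Map<String, String> getProperties() {
        // Start the container before the application context is created,
        // then point Micronaut's Kafka client at it instead of the embedded broker
        KAFKA.start();
        return Collections.singletonMap("kafka.bootstrap.servers", KAFKA.getBootstrapServers());
    }

    // Inject TestProducer/TestListener and write tests as above...
}

Note the plain @MicronautTest here: the "kafka" environment is deliberately not activated, so the embedded broker configuration in application-kafka.yaml is not picked up.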
Micronaut’s approach to testing is simple but functional: there aren’t many test-only features, so it requires you to use the same code you would in the real application. It’s best to approach building a test the same way you would code features in the full application.
A working example of using this code in tests can be found on GitHub.
Written by Phil Hardwick