Wick Technology Blog

Testing Micronaut Kafka

December 28, 2019

When testing Kafka in Micronaut you can use embedded Kafka or use Testcontainers.

Configuration

To use embedded Kafka set these properties in a src/test/resources/application-kafka.yaml file:

kafka:
  bootstrap:
    servers: localhost:${random.port}
  embedded:
    enabled: true
    topics:
    - topic1
    - topic2

When you set up a test with @MicronautTest(environments = "kafka") it will set the environment as both “kafka” and “test” which will enable all the properties needed to start the embedded Kafka.

It’s best to start the embedded Kafka on a random port to avoid clashing with another Kafka which may be running locally (such as in docker).

All the topics required by your application also need to be specified under the kafka.embedded.topics key so they’re created in the embedded Kafka.

Make sure these dependencies are also in your gradle.build (or pom.xml):

testImplementation 'org.apache.kafka:kafka-clients:2.3.1:test'
testImplementation 'org.apache.kafka:kafka_2.12:2.3.1'
testImplementation 'org.apache.kafka:kafka_2.12:2.3.1:test' 

Producing test messages

To set up a producer to send test messages to your application, create a file in src/test/java/ called TestProducer.java and create an interface inside with the needed annotations:

@KafkaClient
public interface TestProducer {
    @Topic("topic1")
    void sendTestMessage(String body);
}

Then use the test producer by injecting it into your test:

@MicronautTest(environments = "kafka")
public class TestAKafkaApplication {

    @Inject
    public TestProducer topicClient;

    @Test
    public void listensToMessages() {
        // When
        topicClient.sendTestMessage("Hello");
        // Then assertions...
        // assertThat()...
    }
}

Consuming messages from the application under test

Set up a consumer in your test classes called TestListener with the following code:

@Singleton
@KafkaListener(groupId = "test", clientId = "test-consumer")
public class TestListener {

    private BlockingQueue<String> messages = new LinkedBlockingDeque<>();

    @Topic("topic1")
    public void eventOccurred(String body) {
        messages.add(body);
    }

    public BlockingQueue<String> getMessages() {
        return messages;
    }
}

This allows you to get the messages that were sent to the topic and then check the contents. It also copies the approach taken by Spring Cloud Stream of storing the messages in a BlockingQueue so you can easily wait for a message to arrive and then remove it from the data structure when you retrieve it.

Using it in your test requires you to inject it:

import io.micronaut.test.annotation.MicronautTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import technology.wick.blog.consumer.TestListener;
import technology.wick.blog.producer.TestProducer;

import javax.inject.Inject;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

@MicronautTest(environments = "kafka")
public class TestAKafkaApplication {

    @Inject
    public TestListener topicListener;
    @Inject
    public TestProducer topicClient;

    @BeforeEach
    void setUp() {
        // Given no messages exist
        topicListener.getMessages().clear();
    }

    @Test
    public void producesMessages() throws InterruptedException {
        // When
        topicClient.sendTestMessage("Hello");
        // Then
        String bodyOfMessage = topicListener.getMessages().poll(2, TimeUnit.SECONDS);
        assertThat(bodyOfMessage).isEqualTo("Hello");
    }
}

Conclusion

Micronaut’s approach to testing is simple but functional, there aren’t many test-only features - it requires you to use the same code you would in the real application, so it’s best to approach building a test like you would coding features in the full application.

A working example of using this code in tests can be found on Github.


Phil Hardwick

Written by Phil Hardwick