Wick Technology Blog

Kafka Avro message format in Micronaut Tests

December 29, 2019

Update (18th June 2020): As of Confluent version 5.4.0, it’s best to use Confluent’s built-in mocking support. Define your schema registry URL with the mock scheme, e.g. mock://test-schema-registry, and the serialisers/deserialisers will use a mocked schema registry client.
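As a rough illustration of that update, the sketch below builds the properties that would be handed to the Confluent Avro serialisers/deserialisers in a test. The class and method names are hypothetical. The property keys are the standard `schema.registry.url` and `specific.avro.reader` settings. In a Micronaut application the same URL would more likely be set through configuration (the `kafka.schema.registry.url` property used later in this post) than built by hand.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects the serde settings a test might use.
final class MockRegistryConfigSketch {

    // The part after "mock://" is an arbitrary name; the assumption here is that
    // the serialiser and deserialiser must be given the same name to share schemas.
    static final String MOCK_REGISTRY_URL = "mock://test-schema-registry";

    static Map<String, Object> avroProperties() {
        Map<String, Object> props = new HashMap<>();
        // The mock scheme tells the Confluent serdes to use an in-memory registry client
        props.put("schema.registry.url", MOCK_REGISTRY_URL);
        // Deserialise into generated SpecificRecord classes rather than GenericRecord
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        avroProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map like this could be passed to a serialiser's `configure(props, false)` call, so no schema registry needs to be running during the test.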

Here’s a guide to using the Avro message format when testing a Micronaut service. Most of this code is not specific to Micronaut; it is about using the Confluent libraries correctly. Micronaut provides a simple testing framework, and it’s our job to integrate libraries like Avro into it.

Schema Registry Client Bean

The first step is to add a schema registry client as a singleton bean in the main application. This will allow us to override the bean with a mock version provided by the Confluent library.

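import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Value;

import javax.inject.Singleton;

@Factory
public class SchemaRegistryClientFactory {

    @Property(name = "kafka.schema.registry.url")
    private String schemaRegistryUrl;

    // Cache capacity passed to the client; defaults to 1000 when the property is not set
    @Value("${kafka.schema.registry.schema-versions:1000}")
    private int maxSchemaVersions;

    @Singleton
    public SchemaRegistryClient schemaRegistryClient() {
        return new CachedSchemaRegistryClient(schemaRegistryUrl, maxSchemaVersions);
    }

}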

Serde Registry

Now that you have a schema registry client as a bean, the Avro Serdes need to use it. Serde is short for Serialiser/Deserialiser; although Serdes are usually associated with Kafka Streams, they are also a simple way to configure a normal Kafka application. Register an Avro Serde registry implementation in the Micronaut CompositeSerdeRegistry by creating the class below:

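import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Map;

// Declared as a bean so that Micronaut can collect it alongside its other Serde registries
@Singleton
public class AvroSerdeRegistry implements SerdeRegistry {

    @Inject
    private SchemaRegistryClient schemaRegistryClient;

    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public <T> Serde<T> getSerde(Class<T> type) {
        // Only handle classes that directly declare Avro's SpecificRecord interface
        if (Arrays.asList(type.getInterfaces()).contains(SpecificRecord.class)) {
            SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient);
            // The URL setting is required, but the injected client is the one that gets used
            Map<String, String> config = Map.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true",
                    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "");
            // false = configure as a value Serde rather than a key Serde
            serde.configure(config, false);
            return (Serde<T>) serde;
        }
        // Not an Avro type: let another registry supply the Serde
        return null;
    }

    @Override
    public int getOrder() {
        // Before JSON Serde
        return -1;
    }
}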
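The way the `return null` and `getOrder()` pieces fit together can be sketched without any Kafka dependencies. This is a minimal sketch, assuming the composite registry asks each registry in ascending order and takes the first non-null answer, which is what the class above suggests. `FakeSpecificRecord`, `Registry`, `UserRecord` and the order value of the JSON stand-in are hypothetical, and a string label stands in for a real Serde.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

final class SerdeLookupSketch {

    // Hypothetical stand-in for Avro's SpecificRecord interface
    interface FakeSpecificRecord {}

    // Hypothetical stand-in for a generated Avro class
    static class UserRecord implements FakeSpecificRecord {}

    // Simplified stand-in for SerdeRegistry: returns a label instead of a Serde
    interface Registry {
        String serdeFor(Class<?> type); // null means "not handled by this registry"
        int order();
    }

    static final Registry AVRO = new Registry() {
        @Override
        public String serdeFor(Class<?> type) {
            // Same style of check as getSerde: looks at directly declared interfaces only
            boolean isRecord = Arrays.asList(type.getInterfaces()).contains(FakeSpecificRecord.class);
            return isRecord ? "avro" : null;
        }

        @Override
        public int order() {
            return -1; // consulted before the JSON stand-in
        }
    };

    static final Registry JSON = new Registry() {
        @Override
        public String serdeFor(Class<?> type) {
            return "json"; // catch-all
        }

        @Override
        public int order() {
            return 0; // assumed value, only needs to be greater than -1 here
        }
    };

    // Ask registries in ascending order and keep the first non-null answer
    static String lookup(List<Registry> registries, Class<?> type) {
        return registries.stream()
                .sorted(Comparator.comparingInt(Registry::order))
                .map(registry -> registry.serdeFor(type))
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        List<Registry> registries = List.of(JSON, AVRO);
        System.out.println(lookup(registries, UserRecord.class));
        System.out.println(lookup(registries, String.class));
    }
}
```

With this ordering, Avro record types are claimed by the Avro registry first, and everything else falls through to the JSON one. Because the check uses `getInterfaces()`, it only matches classes that declare the interface themselves; `isAssignableFrom` would be the broader alternative if subclasses also needed to match.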

Mock Schema Registry Client

Now that your application uses Avro, we can override the schema registry client with a mock implementation provided by Confluent. Place the class below in your test sources so it’s only registered in your tests:

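import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Replaces;

import javax.inject.Singleton;

@Factory
@Replaces(factory = SchemaRegistryClientFactory.class)
public class MockSchemaRegistryFactory {

    // In-memory client, so tests do not need a running schema registry
    @Singleton
    SchemaRegistryClient schemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

}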

This approach lets you test your Kafka application while still using the Avro message format. All of the code in this post is on GitHub.


Written by Phil Hardwick