December 29, 2019
Update - 18th June 2020
As of Confluent version 5.4.0 it’s best to use Confluent’s built in mocking system. Simply define your schema registry url with a mock scheme e.g. mock://test-schema-registry
and the serialisers/deserialisers will use a mocked schema registry client.
Here’s a guide to using Avro message formats when testing a Micronaut service. Most of this code is not specific to Micronaut but about how to use the Confluent libraries correctly. Micronaut provides a simple approach to testing which gives you the framework and it’s our job to use the framework to integrate libraries like Avro.
The first step is to add a schema registry client as a singleton bean in the main application. This will allow us to override the bean with a mock version provided by the Confluent library.
@Factory
public class SchemaRegistryClientFactory {
@Property(name = "kafka.schema.registry.url")
private String schemaRegistryUrl;
@Value("${kafka.schema.registry.schema-versions:1000}")
private int maxSchemaVersions;
@Singleton
public SchemaRegistryClient schemaRegistryClient() {
return new CachedSchemaRegistryClient(schemaRegistryUrl, maxSchemaVersions);
}
}
Now you have a schema registry client as a bean, it needs to be used by the Avro Serdes (Serde is simply short for Serialiser/Deserialiser, and although Serdes are usually associated with Kafka Streams we can use them here as a simple way to configure a normal Kafka application). Register an Avro Serde registry implementation in the Micronaut CompositeSerdeRegistry by creating the class below:
public class AvroSerdeRegistry implements SerdeRegistry {
@Inject
private SchemaRegistryClient schemaRegistryClient;
@SuppressWarnings("unchecked")
@Override
public <T> Serde<T> getSerde(Class<T> type) {
if (Arrays.asList(type.getInterfaces()).contains(SpecificRecord.class)) {
SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient);
Map<String, String> config = Map.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true",
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "");
serde.configure(config, false);
return (Serde<T>) serde;
}
return null;
}
@Override
public int getOrder() {
// Before JSON Serde
return -1;
}
}
Now Avro is being used by your application we can simply override the schema registry client with a mock implementation provided by Confluent. Place the below class in your test classes so it’s only registered in your tests:
@Factory
@Replaces(factory = SchemaRegistryClientFactory.class)
public class MockSchemaRegistryFactory {
@Singleton
SchemaRegistryClient schemaRegistryClient() {
return new MockSchemaRegistryClient();
}
}
This approach allows you to simply test your Kafka application whilst using the Avro message format. All of the code in this post is on Github.
Written by Phil Hardwick