April 04, 2020
Here are all the ways you can configure Micronaut Kafka, for both regular applications and streams, to use particular serialisers and deserialisers.
A key thing to remember is that properties are used first, and only then are the configured serde registries consulted.
(As an aside, I found it helpful to know that a serde is simply an object holding both a serialiser and a deserialiser.)
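For instance, Kafka's own Serdes.serdeFrom helper shows that pairing explicitly (a purely illustrative snippet, not part of any configuration below):

// A serde simply packages a serialiser and a deserialiser for the same type
Serde<UUID> uuidSerde = Serdes.serdeFrom(new UUIDSerializer(), new UUIDDeserializer());
uuidSerde.serializer();   // the UUIDSerializer
uuidSerde.deserializer(); // the UUIDDeserializer

To configure the serialisers and deserialisers for all producers and consumers through properties: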
kafka:
  key:
    serializer: org.apache.kafka.common.serialization.UUIDSerializer
    deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
  value:
    serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
    deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
Or only for producers:
kafka:
  producers:
    default:
      key.serializer: org.apache.kafka.common.serialization.UUIDSerializer
      value.serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
And only for consumers:
kafka:
  consumers:
    default:
      key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
You can also target a specific consumer group by its id:
kafka:
  consumers:
    my-consumer-group:
      key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
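For example, a listener whose group id matches that key picks this configuration up (the listener class below is hypothetical; the topic and types are just borrowed from the streams example further down):

@KafkaListener(groupId = "my-consumer-group")
public class TransferCommandListener {

    // Key and value types line up with the configured UUID and Avro deserialisers
    @Topic("transfer-commands")
    void receive(@KafkaKey UUID key, MakeTransfer command) {
        // handle the command
    }
}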
The same goes for producers: you can use their client id to configure them specifically.
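As an illustration (the client interface and its id are hypothetical), a client declared with the id transfer-producer would read its serialisers from kafka.producers.transfer-producer:

@KafkaClient("transfer-producer")
public interface TransferCommandProducer {

    // Uses the serialisers configured under kafka.producers.transfer-producer
    @Topic("transfer-commands")
    void send(@KafkaKey UUID key, MakeTransfer command);
}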
The other option is a serde registry: create an implementation of io.micronaut.configuration.kafka.serde.SerdeRegistry:
@Singleton
public class AvroSerdeRegistry implements SerdeRegistry {

    @Inject
    private SchemaRegistryClient schemaRegistryClient;

    @SuppressWarnings("unchecked")
    @Override
    public <T> Serde<T> getSerde(Class<T> type) {
        if (Arrays.asList(type.getInterfaces()).contains(SpecificRecord.class)) {
            SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient);
            // The registry URL only has to be present - the injected client is what actually gets used
            Map<String, String> config = Map.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true",
                    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "");
            serde.configure(config, false);
            return (Serde<T>) serde;
        }
        return null;
    }

    @Override
    public int getOrder() {
        // Before the JSON Serde
        return -1;
    }
}
The order is important: this class will be picked up by the CompositeSerdeRegistry, and all the SerdeRegistry beans will be tried in order until one returns a non-null Serde.
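The SchemaRegistryClient injected into the registry also needs to be provided as a bean. A minimal sketch of a factory for it, assuming a hypothetical kafka.schema.registry.url property:

@Factory
public class SchemaRegistryClientFactory {

    // Builds a cached Confluent schema registry client; the property name and cache size are assumptions
    @Singleton
    SchemaRegistryClient schemaRegistryClient(@Value("${kafka.schema.registry.url}") String url) {
        return new CachedSchemaRegistryClient(url, 100);
    }
}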
If you want more control than just ordered Serde Registries, or if you want to make sure JsonSerde is never used, replace the CompositeSerdeRegistry:
@Singleton
@Replaces(CompositeSerdeRegistry.class)
public class StringSerdeRegistry implements SerdeRegistry {

    @SuppressWarnings("unchecked")
    @Override
    public <T> Serde<T> getSerde(Class<T> type) {
        return (Serde<T>) Serdes.String();
    }
}
This will only ever serialise or deserialise Strings.
The same ideas apply to Kafka Streams. To configure serdes through properties for all streams:
kafka:
  streams:
    # Micronaut default - applies this config to all streams
    default:
      # Kafka Streams default
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
        value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
Or for a specific stream, using its name:
kafka:
  streams:
    bank-transfer:
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
        value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
Serdes can also be set in the stream code. One way is to set the Kafka Streams default serde classes on the builder's configuration:
@Factory
public class TransferStreamFactory {

    public static final String BANK_TRANSFER = "bank-transfer";
    public static final String INPUT = "transfer-commands";
    public static final String OUTPUT = "transfer-events";

    @Singleton
    @Named(BANK_TRANSFER)
    KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.UUID().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());

        KStream<UUID, MakeTransfer> source = builder.stream(INPUT);
        source.mapValues(value -> TransferEvent.newBuilder()
                .setTransferId(UUID.randomUUID())
                .setSrcAccountId(value.getSrcAccountId())
                .setDestAccountId(value.getDestAccountId())
                .setAmount(value.getAmount())
                .build())
            .to(OUTPUT);
        return source;
    }
}
Alternatively, configure serde instances directly and pass them explicitly with Consumed and Produced:
@Factory
public class TransferStreamFactory {

    public static final String BANK_TRANSFER = "bank-transfer";
    public static final String INPUT = "transfer-commands";
    public static final String OUTPUT = "transfer-events";

    @Singleton
    @Named(BANK_TRANSFER)
    KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
        Properties props = builder.getConfiguration();
        Map<String, Object> serdeConfig = Map.of(SCHEMA_REGISTRY_URL_CONFIG, props.get(SCHEMA_REGISTRY_URL_CONFIG));

        SpecificAvroSerde<MakeTransfer> inputValueSerde = new SpecificAvroSerde<>();
        inputValueSerde.configure(serdeConfig, false);
        SpecificAvroSerde<TransferEvent> outputValueSerde = new SpecificAvroSerde<>();
        outputValueSerde.configure(serdeConfig, false);

        KStream<UUID, MakeTransfer> source = builder.stream(INPUT, Consumed.with(Serdes.UUID(), inputValueSerde));
        source.mapValues(value -> TransferEvent.newBuilder()
                .setTransferId(UUID.randomUUID())
                .setSrcAccountId(value.getSrcAccountId())
                .setDestAccountId(value.getDestAccountId())
                .setAmount(value.getAmount())
                .build())
            .to(OUTPUT, Produced.with(Serdes.UUID(), outputValueSerde));
        return source;
    }
}
You can also, by not configuring any serdes in properties or the stream code, allow Micronaut to pick the serde from the SerdeRegistry, so the same advice above about adding new serde registries (or replacing existing ones) applies to Kafka Streams too!
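As a rough sketch of that (assuming a registry such as the AvroSerdeRegistry above can resolve serdes for MakeTransfer and TransferEvent), the stream method then needs no serde configuration at all:

@Singleton
@Named(BANK_TRANSFER)
KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
    // No serdes in properties or code: Micronaut falls back to the SerdeRegistry
    KStream<UUID, MakeTransfer> source = builder.stream(INPUT);
    source.mapValues(value -> TransferEvent.newBuilder()
            .setTransferId(UUID.randomUUID())
            .setSrcAccountId(value.getSrcAccountId())
            .setDestAccountId(value.getDestAccountId())
            .setAmount(value.getAmount())
            .build())
        .to(OUTPUT);
    return source;
}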
There’s lots of flexibility here. The best approach is to configure everything to be the same: use the default properties first, then reach for the more specific approaches as the need arises. For example, if all messages have UUID keys apart from one, configure all producers and consumers to use UUID serdes and configure a String serde only for the one consumer that needs it. This keeps configuration and repetition low. I would also favour properties over class and code configuration, as it means fewer classes to understand when others open the project and less clutter around the business logic.
Written by Phil Hardwick