Wick Technology Blog

Configuring Micronaut Kafka with Serialisers and Deserialisers

April 04, 2020

Here are all the ways you can configure Micronaut Kafka, both regular applications and streams, to use particular serialisers and deserialisers.

A key thing to remember is that properties are used first. Only if no serialiser or deserialiser is configured in properties are the configured serde registries used.
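That precedence rule can be modelled in a few lines of plain Java. This is a simplified sketch, not Micronaut's code. The map stands in for the configured properties and the function stands in for the serde registries. The names `SerdePrecedenceSketch` and `resolve` are hypothetical.

```java
import java.util.Map;
import java.util.function.Function;

public class SerdePrecedenceSketch {

    // Hypothetical model: an explicitly configured class name wins; the registry is only
    // consulted when no property is set for this key.
    static String resolve(Map<String, String> properties, String propertyKey,
                          Function<Class<?>, String> registry, Class<?> type) {
        String configured = properties.get(propertyKey);
        return configured != null ? configured : registry.apply(type);
    }

    public static void main(String[] args) {
        Function<Class<?>, String> registry = type -> "JsonSerde"; // stand-in registry
        Map<String, String> withProperty =
                Map.of("value.deserializer", "SpecificAvroDeserializer");

        // Property present: the registry is never asked
        System.out.println(resolve(withProperty, "value.deserializer", registry, Object.class));
        // No property: fall back to the registry
        System.out.println(resolve(Map.of(), "value.deserializer", registry, Object.class));
    }
}
```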

(As an aside, I found it helpful to know that a serde is simply an object with both a serialiser and a deserialiser.)
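In Kafka itself, that pairing is the `org.apache.kafka.common.serialization.Serde` interface. Its `serializer()` and `deserializer()` methods return the two halves, and `Serdes.serdeFrom(serializer, deserializer)` builds a serde from a pair. The sketch below models the same idea with plain functions so it runs without Kafka. `MiniSerde` is a hypothetical name, and encoding the UUID as its UTF-8 string form is a choice made only for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.function.Function;

public class MiniSerdeDemo {

    // Hypothetical minimal serde: just a serialiser and a deserialiser bundled together.
    static final class MiniSerde<T> {
        private final Function<T, byte[]> serializer;
        private final Function<byte[], T> deserializer;

        MiniSerde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        Function<T, byte[]> serializer() { return serializer; }
        Function<byte[], T> deserializer() { return deserializer; }
    }

    // Encodes a UUID as the UTF-8 bytes of its 36-character string form.
    static final MiniSerde<UUID> UUID_SERDE = new MiniSerde<>(
            uuid -> uuid.toString().getBytes(StandardCharsets.UTF_8),
            bytes -> UUID.fromString(new String(bytes, StandardCharsets.UTF_8)));

    // Serialise then deserialise: a correct serde gives back an equal value.
    static UUID roundTrip(UUID id) {
        byte[] wire = UUID_SERDE.serializer().apply(id);
        return UUID_SERDE.deserializer().apply(wire);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(roundTrip(id).equals(id)); // true
    }
}
```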

For regular Kafka

If you want all consumers and producers to have the same config

kafka:
  key:
    serializer: org.apache.kafka.common.serialization.UUIDSerializer
    deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
  value:
    serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
    deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer

If you want all producers to have the same config

kafka:
  producers:
    default:
      key.serializer: org.apache.kafka.common.serialization.UUIDSerializer
      value.serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer

Documentation

If you want just consumers to have the same config

kafka:
  consumers:
    default:
      key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer

Documentation

If you want just one consumer (with group id “my-consumer-group”) to be configured

kafka:
  consumers:
    my-consumer-group:
      key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
      

The same goes for producers: use the producer's id in place of default under kafka.producers to configure that producer specifically.
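One way to picture this is as layering: shared settings first, then the entries for one id on top. The sketch below is a simplified model of that idea, not Micronaut's resolution code. It assumes the id-specific entries override the shared ones key by key. The id `my-producer` and the method name `effectiveConfig` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class ClientConfigSketch {

    // Hypothetical model: start from the shared settings, then let the entries for this
    // client id (producer id or consumer group id) override them key by key.
    static Map<String, String> effectiveConfig(Map<String, String> shared,
                                               Map<String, Map<String, String>> byId,
                                               String id) {
        Map<String, String> result = new HashMap<>(shared);
        result.putAll(byId.getOrDefault(id, Map.of()));
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> shared = Map.of(
                "key.serializer", "org.apache.kafka.common.serialization.UUIDSerializer");
        Map<String, Map<String, String>> byId = Map.of(
                "my-producer",
                Map.of("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"));

        // The configured id gets its override; any other id keeps the shared value
        System.out.println(effectiveConfig(shared, byId, "my-producer").get("key.serializer"));
        System.out.println(effectiveConfig(shared, byId, "other-producer").get("key.serializer"));
    }
}
```

This is also the shape the conclusion recommends: one shared default, plus a small override only where a client genuinely differs.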

If you want to configure the serialisers and deserialisers in code

Create an implementation of io.micronaut.configuration.kafka.serde.SerdeRegistry:

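import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;

import javax.inject.Singleton;
import java.util.Map;

@Singleton
public class AvroSerdeRegistry implements SerdeRegistry {

    private final SchemaRegistryClient schemaRegistryClient;

    // Constructor injection; a SchemaRegistryClient bean must exist in the context
    public AvroSerdeRegistry(SchemaRegistryClient schemaRegistryClient) {
        this.schemaRegistryClient = schemaRegistryClient;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public <T> Serde<T> getSerde(Class<T> type) {
        // isAssignableFrom also matches classes that implement SpecificRecord indirectly
        if (SpecificRecord.class.isAssignableFrom(type)) {
            SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient);
            // The injected client is used for lookups, so the URL is left empty here
            Map<String, String> config = Map.of(
                    KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true",
                    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "");
            serde.configure(config, false); // false: configure as a value serde, not a key serde
            return (Serde<T>) serde;
        }
        // null tells the CompositeSerdeRegistry to try the next registry
        return null;
    }

    @Override
    public int getOrder() {
        // Lower values are tried first; -1 puts this ahead of the JSON serde registry
        return -1;
    }
}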

The order is important: this class will be picked up by the CompositeSerdeRegistry, which tries every SerdeRegistry in order (lowest getOrder() value first) until one returns a non-null Serde.
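The selection rule can be modelled in plain Java. This is a hypothetical sketch of the behaviour described above (sort by order, first non-null answer wins), not the CompositeSerdeRegistry source. `Registry`, `AvroRecord`, `Transfer` and the order value 0 for the catch-all JSON stand-in are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public class CompositeRegistrySketch {

    // Hypothetical stand-in for a SerdeRegistry: an order plus a lookup that may return null.
    static final class Registry {
        final int order;                         // lower values are consulted first
        final Function<Class<?>, String> lookup; // returns a serde name, or null if unsupported

        Registry(int order, Function<Class<?>, String> lookup) {
            this.order = order;
            this.lookup = lookup;
        }
    }

    // Hypothetical marker interface standing in for Avro's SpecificRecord.
    interface AvroRecord {}

    static final class Transfer implements AvroRecord {}

    // Tries each registry in ascending order and returns the first non-null serde (or null).
    static String firstMatch(List<Registry> registries, Class<?> type) {
        List<Registry> sorted = new ArrayList<>(registries);
        sorted.sort(Comparator.comparingInt((Registry r) -> r.order));
        for (Registry registry : sorted) {
            String serde = registry.lookup.apply(type);
            if (serde != null) {
                return serde;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Registry json = new Registry(0, type -> "JsonSerde"); // catch-all stand-in
        Registry avro = new Registry(-1,
                type -> AvroRecord.class.isAssignableFrom(type) ? "SpecificAvroSerde" : null);

        // Listed in the "wrong" order on purpose: the order value decides, not list position
        List<Registry> registries = List.of(json, avro);
        System.out.println(firstMatch(registries, Transfer.class)); // SpecificAvroSerde
        System.out.println(firstMatch(registries, String.class));   // JsonSerde
    }
}
```

Returning null rather than throwing is what lets a specialised registry stay narrow. It only answers for the types it understands and leaves everything else to the next registry in line.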

Option 2 for code configuration

If you want more control than just ordered Serde Registries, or if you want to make sure JsonSerde is never used, replace the CompositeSerdeRegistry:

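import io.micronaut.configuration.kafka.serde.CompositeSerdeRegistry;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.annotation.Replaces;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import javax.inject.Singleton;

@Singleton
@Replaces(CompositeSerdeRegistry.class)
public class StringSerdeRegistry implements SerdeRegistry {
    @SuppressWarnings("unchecked")
    @Override
    public <T> Serde<T> getSerde(Class<T> type) {
        return (Serde<T>) Serdes.String();
    }
}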

This will only ever serialise or deserialise Strings.
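Because of the unchecked cast, asking this registry for any other type still appears to succeed. The problem only shows up when a non-String value actually reaches the serialiser. The sketch below reproduces that behaviour with hypothetical names, using `java.util.function.Function` in place of Kafka's `Serializer`. Treat it as an illustration of how Java generics behave here rather than a model of Micronaut itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class UncheckedSerdeSketch {

    // A serialiser that only understands Strings (stand-in for a String serialiser).
    static final Function<String, byte[]> STRING_SERIALIZER =
            s -> s.getBytes(StandardCharsets.UTF_8);

    // Same shape as StringSerdeRegistry.getSerde: every requested type gets the String
    // serialiser behind an unchecked cast, so the compiler cannot catch a mismatch.
    @SuppressWarnings("unchecked")
    static <T> Function<T, byte[]> serializerFor(Class<T> type) {
        return (Function<T, byte[]>) STRING_SERIALIZER;
    }

    public static void main(String[] args) {
        System.out.println(serializerFor(String.class).apply("hello").length); // 5

        Function<Integer, byte[]> forIntegers = serializerFor(Integer.class); // no error yet
        try {
            forIntegers.apply(42); // fails here: the lambda expects a String argument
        } catch (ClassCastException e) {
            System.out.println("ClassCastException when serialising an Integer");
        }
    }
}
```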

For Kafka Streams

Official Documentation

By properties, for all streams

kafka:
  streams:
    # Micronaut default: applies this config to all streams
    default:
      # Prefix of the Kafka Streams properties default.key.serde and default.value.serde
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
        value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
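The two `default` keys mean different things. The outer one is Micronaut's name for the stream configuration applied to all streams. The inner one is the first segment of the Kafka Streams property names `default.key.serde` and `default.value.serde` (`StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG` and `DEFAULT_VALUE_SERDE_CLASS_CONFIG`). The sketch below shows how the nested keys collapse into those dotted names. `flatten` is a hypothetical helper written only to illustrate the mapping, not Micronaut's property loader.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StreamsPropertySketch {

    // Flattens nested maps into dotted keys, mirroring how nested YAML becomes property names.
    static Map<String, Object> flatten(String prefix, Map<String, ?> tree) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : tree.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flat.putAll(flatten(key, child));
            } else {
                flat.put(key, value);
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        // Everything under kafka.streams.default, i.e. the Micronaut "default" stream config
        Map<String, ?> defaultStream = Map.of("default", Map.of(
                "key.serde", "org.apache.kafka.common.serialization.Serdes$UUIDSerde",
                "value.serde", "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde"));

        Map<String, Object> props = flatten("", defaultStream);
        System.out.println(props.get("default.key.serde"));
        // org.apache.kafka.common.serialization.Serdes$UUIDSerde
        System.out.println(props.get("default.value.serde"));
        // io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    }
}
```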

By properties, for just one stream called bank-transfer

kafka:
  streams:
    bank-transfer:
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
        value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        

In code as properties for just one stream

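import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Properties;
import java.util.UUID;

// MakeTransfer and TransferEvent are assumed to be Avro-generated classes in the project
@Factory
public class TransferStreamFactory {
    public static final String BANK_TRANSFER = "bank-transfer";
    public static final String INPUT = "transfer-commands";
    public static final String OUTPUT = "transfer-events";

    @Singleton
    @Named(BANK_TRANSFER)
    KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
        // Default serdes for this stream only
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.UUID().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());

        // No serdes passed here, so the defaults above apply to both input and output
        KStream<UUID, MakeTransfer> source = builder.stream(INPUT);
        source.mapValues(value -> TransferEvent.newBuilder()
                        .setTransferId(UUID.randomUUID())
                        .setSrcAccountId(value.getSrcAccountId())
                        .setDestAccountId(value.getDestAccountId())
                        .setAmount(value.getAmount())
                        .build())
                .to(OUTPUT);
        return source;
    }
}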
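The YAML value `Serdes$UUIDSerde` and the result of `Serdes.UUID().getClass().getName()` are both the binary name of a nested class, where a `$` separates the outer and inner class. That matters because a class name given in config is loaded reflectively, and `Class.forName` expects the binary name rather than the dotted canonical name. The sketch below demonstrates this with hypothetical `FakeSerdes`/`UUIDSerde` classes standing in for Kafka's.

```java
public class BinaryNameSketch {

    // Hypothetical stand-ins shaped like Kafka's Serdes class and its nested UUIDSerde.
    static final class FakeSerdes {
        static final class UUIDSerde {}
    }

    // Returns true if the class name can be loaded reflectively.
    static boolean loadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Binary name, '$' before each nested class (e.g. BinaryNameSketch$FakeSerdes$UUIDSerde)
        String binaryName = FakeSerdes.UUIDSerde.class.getName();
        // Canonical name, dots only (e.g. BinaryNameSketch.FakeSerdes.UUIDSerde)
        String canonicalName = FakeSerdes.UUIDSerde.class.getCanonicalName();

        System.out.println(loadable(binaryName));    // true
        System.out.println(loadable(canonicalName)); // false
    }
}
```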
  

In code for just one stream, with different configuration for input and output

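import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;

@Factory
public class TransferStreamFactory {
    public static final String BANK_TRANSFER = "bank-transfer";
    public static final String INPUT = "transfer-commands";
    public static final String OUTPUT = "transfer-events";

    @Singleton
    @Named(BANK_TRANSFER)
    KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
        Properties props = builder.getConfiguration();
        // Reuse the stream's schema.registry.url (Map.of throws a NullPointerException if it is not set)
        Map<String, Object> serdeConfig = Map.of(SCHEMA_REGISTRY_URL_CONFIG, props.get(SCHEMA_REGISTRY_URL_CONFIG));

        // One serde per value type; false: these are value serdes, not key serdes
        SpecificAvroSerde<MakeTransfer> inputValueSerde = new SpecificAvroSerde<>();
        inputValueSerde.configure(serdeConfig, false);
        SpecificAvroSerde<TransferEvent> outputValueSerde = new SpecificAvroSerde<>();
        outputValueSerde.configure(serdeConfig, false);

        // Serdes passed explicitly here take precedence over any default serdes
        KStream<UUID, MakeTransfer> source = builder.stream(INPUT, Consumed.with(Serdes.UUID(), inputValueSerde));
        source.mapValues(value -> TransferEvent.newBuilder()
                        .setTransferId(UUID.randomUUID())
                        .setSrcAccountId(value.getSrcAccountId())
                        .setDestAccountId(value.getDestAccountId())
                        .setAmount(value.getAmount())
                        .build())
                .to(OUTPUT, Produced.with(Serdes.UUID(), outputValueSerde));
        return source;
    }
}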

In code, via serde registries

You can also leave serdes out of both the properties and the stream code, in which case Micronaut picks the serde from the SerdeRegistry. That means the advice above about adding new serde registries (or replacing existing ones) applies to Kafka Streams too!

Conclusion

There's a lot of flexibility here. The best approach is to configure everything the same way: use the default properties, then reach for more specific approaches as the need arises. For example, if all messages have UUID keys apart from one, configure all producers and consumers to use UUID serdes and then configure a String serde for the one consumer that needs it. This keeps configuration and repetition low. I would also favour properties over class and code configuration, as it means fewer classes to understand when others open the project and less clutter around the business logic.


Written by Phil Hardwick