Wick Technology Blog

Producing messages from Spring Cloud Stream v2 which v1.3 can understand

May 07, 2019

When using microservices, each service is standalone. It can use whichever technology, language or version it needs in order to fulfil its purpose. In practice, it’s easier and quicker for deployment, management and support to use a similar pattern across most microservices.

However, even when following a similar microservice pattern, using different major versions of dependencies can cause problems, for example when Spring Cloud Stream (SCS) 2.0 applications exchange messages with Spring Cloud Stream 1.3 applications.

Spring Cloud Stream 2.0 came with some major changes, which meant it would no longer work out of the box when sending messages to Spring Cloud Stream applications running any version before 2.0. The changes revolved around the way content types are handled. When using Avro schemas, which serialise the payload into bytes, Spring Cloud Stream 1.3 puts the content type (application/avro*) into an originalContentType header and sets the contentType header to application/octet-stream. Spring Cloud Stream 2.0 instead leaves the Avro content type in the contentType header. So when a 2.0 application produces messages which are consumed by a Spring Cloud Stream 1.3 application, the 1.3 consumer is unsure how to deserialise the application/avro* content type and it can’t find an originalContentType header.
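To make the mismatch concrete, here is a rough sketch of the two header shapes described above, modelled with plain Java maps rather than Spring's Message type. The class and method names are hypothetical, and "application/avro" is only an illustrative stand-in for whichever application/avro* content type your converter produces.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: models message headers as plain maps.
public class HeaderShapes {

    static final String CONTENT_TYPE = "contentType";
    static final String ORIGINAL_CONTENT_TYPE = "originalContentType";

    // What a 2.0 producer sends: the Avro type sits directly in contentType.
    static Map<String, Object> producedByV2(String avroType) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put(CONTENT_TYPE, avroType);
        return headers;
    }

    // What a 1.3 consumer expects: the Avro type in originalContentType,
    // and contentType set to application/octet-stream.
    static Map<String, Object> expectedByV13(String avroType) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put(ORIGINAL_CONTENT_TYPE, avroType);
        headers.put(CONTENT_TYPE, "application/octet-stream");
        return headers;
    }

    public static void main(String[] args) {
        // "application/avro" is an assumed example value.
        System.out.println("2.0 sends:   " + producedByV2("application/avro"));
        System.out.println("1.3 expects: " + expectedByV13("application/avro"));
    }
}
```

The payload bytes are the same in both cases; only the headers differ, which is why fixing the headers is enough.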

One solution is to use a global channel interceptor to change the headers after Spring has serialised the message but before sending it.

For producing messages from a 2.0 application

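import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

// Only applied to channels whose names match the pattern (here, output channels).
@Component
@GlobalChannelInterceptor(patterns = "*-out")
public class ContentTypeSwappingInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // Move the real content type to originalContentType, as 1.3 consumers expect.
        Object originalContentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
        return MessageBuilder.fromMessage(message)
                .setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, originalContentType)
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/octet-stream").build();
    }

}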

This interceptor simply swaps headers using a MessageBuilder to match what Spring Cloud Stream v1.3 applications expect.
It’s important to filter which channels this gets applied to, because Spring Cloud Stream has its own InboundContentTypeEnhancingInterceptor class which deals with 1.3 headers on incoming messages, that is, messages which a 2.0 application is consuming. Change the value in the GlobalChannelInterceptor annotation to make sure this interceptor only applies to output channels; otherwise it will be applied to input channels too and deserialisation will not work. Because version 2.0 deals with 1.3 headers via InboundContentTypeEnhancingInterceptor, this solution allows both versions of Spring Cloud Stream to consume messages from the same topic.
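The swap and the channel filter can be tried out without a Spring context using a small stand-in. This is only a sketch under stated assumptions: headers are modelled as a plain map, the channel names "orders-out" and "orders-in" are made up, and the "*-out" pattern is approximated with a suffix check rather than Spring's own pattern matching.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the interceptor; it does not use any Spring classes.
public class OutboundHeaderSwapSketch {

    // Approximates the "*-out" pattern: any channel name ending in "-out".
    static boolean appliesTo(String channelName) {
        return channelName.endsWith("-out");
    }

    // Returns a copy of the headers in the shape a 1.3 consumer expects.
    static Map<String, Object> swap(Map<String, Object> headers) {
        Map<String, Object> swapped = new HashMap<>(headers);
        Object original = swapped.get("contentType");
        if (original != null) {
            swapped.put("originalContentType", original);
        }
        swapped.put("contentType", "application/octet-stream");
        return swapped;
    }

    // Output channels get swapped headers; all other channels are left untouched.
    static Map<String, Object> preSend(String channelName, Map<String, Object> headers) {
        return appliesTo(channelName) ? swap(headers) : headers;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("contentType", "application/avro"); // assumed example value
        System.out.println("orders-out: " + preSend("orders-out", headers));
        System.out.println("orders-in:  " + preSend("orders-in", headers));
    }
}
```

Leaving input channels untouched matters because, as described above, the 2.0 application's inbound handling already copes with 1.3-style headers, and rewriting them there would break deserialisation.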

Further information can be found in this GitHub issue, GH-1106.

Written by Phil Hardwick