Kafka Messaging with Spring 101

Written by Ketone Maniac on Mar 13th, 2021

My first serious application using Kafka was Spring Cloud Function with Webflux. While exploring its (lack of) documentation and related tutorials, I ended up finding lots of stuff along the way on different flavors of messaging with Spring. In this article I will cover, in the sequence below:

  • spring-kafka
  • spring-cloud-stream
  • spring-cloud-function
  • spring-cloud-function on Webflux (reactive)
  • spring-cloud-bus

Documentation becomes scarcer as we go down the list, but I was still fortunate enough to find a bunch of online resources for each of the topics above, though some don't work off the shelf. I will supply links to credit the original sources and will probably try to notify the authors about the fixes needed once I complete this article.

Code and Kafka Docker Image

The working code for this article is on GitHub. It contains one module for each of the Spring messaging modules listed above, plus a docker-compose file to boot Kafka up. While there is Embedded Kafka for testing, I prefer booting up a real one to better visualize how the topics/consumer groups are actually created in a real Kafka cluster.

I've borrowed the docker-compose file from this tutorial, which I highly recommend going through. It doesn't just boot up the Kafka cluster but also introduces a lot of goodies around it (one you would definitely want to use as you go along is the control-center, which gives you a nice Web UI at http://localhost:9021). I got a PRIMARY KEY error along the way; you can refer to the README if you encounter the same.

spring-kafka

If you want simple pub/sub without any fancy stuff, this is the way to go. Thanks to this tutorial, here's what a typical Producer looks like:

@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

and the Consumer:

@Slf4j
@Component
@Data
public class KafkaConsumer {

    private CountDownLatch latch = new CountDownLatch(1);

    private String payload = null;

    @KafkaListener(topics = "${test.topic}", groupId = "spring-kafka-consumer")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        log.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());
        latch.countDown();
    }

}
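
The latch and payload fields exist so that a test can block until a message actually arrives. Here's a minimal sketch of such a test against the dockerized broker (the test class is my own illustration, not from the tutorial):

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class KafkaSendReceiveTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumer consumer;

    @Value("${test.topic}")
    private String topic;

    @Test
    void sendAndReceive() throws Exception {
        producer.send(topic, "hello");
        // blocks until the listener counts the latch down, or gives up after 10 seconds
        assertThat(consumer.getLatch().await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(consumer.getPayload()).contains("hello");
    }
}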

You see, so simple. No BS. Just call kafkaTemplate.send() on one side and put @KafkaListener on the other. How can it be simpler than that! The only catch is in the configuration, which somehow I wasn't able to run correctly without the value serializer/deserializer, and test.topic needs to be created in Kafka manually first (or declared in code -- see the sketch at the end of this section):

spring:
  kafka:
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

test:
  topic: embedded-test-topic

Other than that, it's all good.
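
If you'd rather not create the topic by hand, spring-kafka can also declare it for you at startup via the auto-configured KafkaAdmin. A minimal sketch (assuming spring-kafka 2.3+, where TopicBuilder was introduced):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // every NewTopic bean in the context is created on the broker at startup (if it doesn't exist yet)
    @Bean
    public NewTopic testTopic(@Value("${test.topic}") String topic) {
        return TopicBuilder.name(topic).partitions(1).replicas(1).build();
    }
}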

spring-cloud-stream

Documentation for this and spring-kafka is still pretty abundant, so it's easy to find examples out there. The example here follows this tutorial, which is pretty decent and works without any modification. Things start to get not so intuitive, but it's still acceptable by my standards. In essence, you create a Producer with @EnableBinding:

@Data
@EnableBinding(Source.class)
public class Producer {

    private Source mySource;

    public Producer(final Source source) {
        super();
        this.mySource = source;
    }

}

Then whoever wants to send gets the MessageChannel from the Source above and sends to it:

    // get the String message via HTTP, publish it to broker using spring cloud stream
    @RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
    public String publishMessageString(@RequestBody String payload) {

        // send message to channel
        producer.getMySource()
                .output()
                .send(MessageBuilder.withPayload(payload)
                        .setHeader("type", "string")
                        .build());

        return "success";
    }

The consumer is somewhat like the spring-kafka one, except that it uses @StreamListener now, with the channel definitions moved to the configuration.

@Slf4j
@EnableBinding(Sink.class)
public class Consumer {

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        log.info("received a string message : " + message);
    }

}
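
For reference, the Source and Sink used above are the built-in single-channel bindings that ship with Spring Cloud Stream. Simplified from the actual framework interfaces (each lives in its own file), they look roughly like this -- which is also why the bindings in the config below are named output and input:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

public interface Sink {
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}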

Configuration still feels normal:

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        input:
          binder: kafka
          destination: spring-cloud-stream-test
          content-type: text/plain
          group: input-group-1
        output:
          binder: kafka
          destination: spring-cloud-stream-test
          group: output-group-1
          content-type: text/plain

spring-cloud-function

Firstly, Spring Cloud Function is not a standalone messaging package; we are still using Spring Cloud Stream, just with functions. It's starting to get difficult to find good docs for this one -- this is what I base my findings on. Now, first just look at the configuration:

spring:
  cloud:
    stream:
      function:
        definition: send;accumulate;receive
      bindings:
        send-out-0:
          destination: ints
        accumulate-in-0:
          destination: ints
        accumulate-out-0:
          destination: total
        receive-in-0:
          destination: total

Now, you need to know that the strings in spring.cloud.stream.function.definition are Spring beans of type java.util.function.Supplier, java.util.function.Function or java.util.function.Consumer. They must correspond to the first "-"-delimited segment of the keys under spring.cloud.stream.bindings, with the topic name as destination regardless of input/output. There's more to spring.cloud.stream.bindings -- the 2nd segment, in/out, determines whether the binding is an input (Consumer side) or an output (Producer side). The 3rd segment is the argument index, which only matters for functions with multiple inputs or outputs, which is why I've never seen anything other than 0 there. That's a lot of cryptic information, isn't it? At first I was surprised to see that the whole Producer/Consumer are just functions like the ones below:

    @Bean
    public Supplier<Integer> send() {
        return () -> 1;
    }

    private AtomicInteger count = new AtomicInteger(0);

    @Bean
    public Function<Integer, String> accumulate() {
        return payload -> "Current value: " + payload + ", Total: " + this.count.addAndGet(payload);
    }

    @Bean
    public Consumer<String> receive() {
        return payload -> {
            log.info(payload);
        };
    }

What Spring Cloud Stream actually does is all the magic behind the scenes -- it automatically marshals/unmarshals the messages and calls the send/accumulate/receive functions for you, so all you see are the message bodies.

Think that's enough magic? Look more closely at the code above. How many messages do you think this setup would send (I've already pasted all the meaningful code)? Just one, then stop? I would have guessed so, since the Supplier just gives a constant 1. Turns out it emits that 1 every second. Jeeess, that really blew my mind! I haven't dug real deep into the configurations, but if you don't want the 1-second default behavior, my personal recommendation is to use Webflux as described below.
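
If you just want to slow that default poller down rather than switch to Webflux, Spring Cloud Stream exposes poller properties for it. Something like this should do (based on the documented defaults; I haven't tried it in this repo):

spring:
  cloud:
    stream:
      poller:
        fixed-delay: 5000 # milliseconds between calls to the Supplier; the default is 1000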

spring-cloud-function with Webflux

So you could actually replace all the parameters in the functions with Fluxes. The Supplier would hence become this:

        @Bean
        public Supplier<Flux<Integer>> sendFlux() {
            return () -> Flux.interval(Duration.ofSeconds(5))
                    .map(Long::intValue);
        }

Now this gives an Integer from an ascending sequence starting at 0, one every 5 seconds. If you do Flux.just(1) instead, it gives a single 1 and that's it. No more magic 1-second trick.
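
The Function and Consumer sides take the same treatment. A minimal sketch of a Flux-based version of the earlier accumulate function (my own illustration, same bindings assumed):

    @Bean
    public Function<Flux<Integer>, Flux<String>> accumulateFlux() {
        final AtomicInteger total = new AtomicInteger(0);
        // the framework hands you the whole stream once; each element is mapped as it arrives
        return flux -> flux.map(payload ->
                "Current value: " + payload + ", Total: " + total.addAndGet(payload));
    }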

However, from my working experience, using Flux with messaging streams is pretty dangerous, since it requires very careful error handling. If the Flux errors out without anyone catching the error (e.g. with onErrorContinue() or onErrorResume()), the stream breaks but the program will not exit. There is also another deadly failure mode, where the Flux reads the messages and buffers them, then the program dies: those messages will be considered "processed" by Kafka and will not be replayed unless you explicitly move the offset back. I should cover this in another article later.
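
To illustrate the first pitfall, here's a minimal sketch of a reactive consumer that survives a bad message (handleMessage is a hypothetical processing step that may throw):

    @Bean
    public Function<Flux<String>, Mono<Void>> receiveFlux() {
        return flux -> flux
                .map(this::handleMessage) // any exception thrown here would otherwise break the stream for good
                .onErrorContinue((err, bad) ->
                        log.warn("skipping message {} due to {}", bad, err.toString()))
                .then();
    }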

spring-cloud-bus

This is the latest addition to the suite of Spring messaging packages, and you're really hard-pressed to find examples of it now. I've only found a Youtube video for it, and even that has its problems (see the README for fixes).

Essentially, spring-cloud-bus multicasts events via Spring's ApplicationEvent mechanism, extending it into its own RemoteApplicationEvent. So firstly you create the event:

@Data
public class MyCustomRemoteEvent extends RemoteApplicationEvent {

    private String message;

    // a no-arg constructor is required so the event can be deserialized on the receiving side
    public MyCustomRemoteEvent() {}

    public MyCustomRemoteEvent(Object source, String originService, String message) {
        super(source, originService);
        this.message = message;
    }

}

Now the sender publishes the event via the ApplicationContext:

    @Autowired
    private ApplicationContext context;

    @Autowired
    private BusProperties busProperties;

    @PutMapping(value="/publish")
    public String publishMessage() {
        final MyCustomRemoteEvent event = new MyCustomRemoteEvent(this, busProperties.getId(), "Hello World!");
        context.publishEvent(event);
        return "Event Published.";
    }

and the receiver side does two things. First, a special scan for the MyCustomRemoteEvent:

@SpringBootApplication
// registers custom RemoteApplicationEvent types from these packages so the bus can (de)serialize them
@RemoteApplicationEventScan(basePackages = {"net.ketone.kafka.scb.model"})
public class SpringCloudBusReceiverApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringCloudBusReceiverApplication.class, args);
	}

}

Then a normal ApplicationListener to listen to events of such type:

@Slf4j
@Component
public class MyCustomRemoteEventListener implements ApplicationListener<MyCustomRemoteEvent>{

    private AtomicInteger count = new AtomicInteger();

    @Override
    public void onApplicationEvent(MyCustomRemoteEvent myCustomRemoteEvent) {
        log.info("Received Message: {} {}", myCustomRemoteEvent.getMessage(), count.incrementAndGet());
    }
}

Configuration is as concise as can be; the only thing that matters is the topic:

spring:
  application:
    index: ${random.uuid}
  cloud:
    bus:
      destination: spring-cloud-bus-test

To be frank, I haven't really used this with any real project code, but my gut feel is that this has gone way too cryptic. ApplicationEvents don't even look anything like messaging until you finally realize there's a spring-cloud-bus dependency in your project!

Anyway, this ends my journey into finding different flavors of Kafka messaging with Spring. Definitely more to come on messaging -- till then, happy coding!