My first serious application using Kafka was Spring Cloud Function with Webflux. While exploring its (lack of) documentation and related tutorials, I ended up finding lots of material along the way on different flavors of messaging with Spring. In this article I will cover, in the sequence below:
- spring-kafka
- spring-cloud-stream
- spring-cloud-function
- spring-cloud-function on Webflux (reactive)
- spring-cloud-bus
Documentation becomes scarcer as we go down the list, but I was still fortunate enough to find a handful of online resources for each of the topics above, though some don't work off the shelf. I will supply links to credit the original sources and will probably try to notify the authors of the fixes needed once I complete this article.
Code and Kafka Docker Image
The working code for this article is on GitHub. It contains one module for each of the Spring messaging modules listed above, plus a docker-compose file to boot Kafka up. While there is Embedded Kafka for testing, I prefer booting up a real cluster to better visualize how the topics and consumer groups are actually created.
I've borrowed the docker-compose file from this tutorial, which I highly recommend going through. It doesn't just boot up the Kafka cluster but also introduces a lot of goodies around it (one you will definitely want to use as you go along is the control-center, which gives you a nice Web UI at http://localhost:9021). I got a PRIMARY KEY error on the way; you can refer to the README if you encounter the same.
spring-kafka
If you want simple pub/sub without any fancy stuff, this is the way to go. Thanks to this tutorial, here's what a typical Producer looks like:
@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
and the Consumer:
@Slf4j
@Component
@Data
public class KafkaConsumer {

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}", groupId = "spring-kafka-consumer")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        log.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());
        latch.countDown();
    }
}
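The latch and payload fields on the consumer are there so a test can block until a message actually arrives. A minimal test sketch under that assumption (the @SpringBootTest wiring and assertions are mine, not from the original repo):

@Slf4j
@SpringBootTest
class KafkaSendReceiveTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumer consumer;

    @Test
    void sendAndReceive() throws Exception {
        producer.send("embedded-test-topic", "hello");
        // block until the listener counts down the latch, or give up after 10 seconds
        boolean received = consumer.getLatch().await(10, TimeUnit.SECONDS);
        assertTrue(received);
        assertTrue(consumer.getPayload().contains("hello"));
    }
}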
You see, so simple. No BS. Just call kafkaTemplate.send() on one side and put @KafkaListener on the other. How can it be simpler than that! The only catch is in the configuration, which somehow I wasn't able to run correctly without the explicit value-serializer (and value-deserializer on the consumer side), and the test.topic needs to be created in Kafka manually first:
spring:
  kafka:
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
test:
  topic: embedded-test-topic
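Alternatively, if you'd rather not create the topic by hand, spring-kafka (2.3+) can create it for you at startup: declare a NewTopic bean and the auto-configured KafkaAdmin picks it up. A minimal sketch (this bean is my addition, not part of the tutorial's setup):

@Bean
public NewTopic testTopic(@Value("${test.topic}") String topicName) {
    // Spring Boot's auto-configured KafkaAdmin creates this topic at startup if it doesn't exist
    return TopicBuilder.name(topicName)
            .partitions(1)
            .replicas(1)
            .build();
}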
Other than that, it's all good.
spring-cloud-stream
Documentation for this one, like spring-kafka's, is still pretty abundant, so it's easy to find examples out there. The example here follows this tutorial, which is pretty decent and works without any modification. Things start to get less intuitive, but it's still acceptable by my standards. In essence you create a Producer with @EnableBinding:
@Data
@EnableBinding(Source.class)
public class Producer {

    private Source mySource;

    public Producer(final Source source) {
        this.mySource = source;
    }
}
Then whoever wants to send a message gets a MessageChannel from the Source above and sends through it:
// get the String message via HTTP, publish it to the broker using Spring Cloud Stream
@RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
public String publishMessageString(@RequestBody String payload) {
    // send the message to the output channel
    producer.getMySource()
            .output()
            .send(MessageBuilder.withPayload(payload)
                    .setHeader("type", "string")
                    .build());
    return "success";
}
The consumer is much like the spring-kafka one, except that it becomes a StreamListener now, with the channel definitions moved to the configs.
@Slf4j
@EnableBinding(Sink.class)
public class Consumer {

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        log.info("received a string message: " + message);
    }
}
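For context, Sink (and its sibling Source) are tiny interfaces that ship with Spring Cloud Stream; each is essentially just a named channel. From memory, Sink boils down to this:

public interface Sink {

    String INPUT = "input";

    // the binding named "input" in the configuration below maps to this channel
    @Input(Sink.INPUT)
    SubscribableChannel input();
}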
Configuration still feels normal:
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers:
- localhost:9092
bindings:
input:
binder: kafka
destination: spring-cloud-stream-test
content-type: text/plain
group: input-group-1
output:
binder: kafka
destination: spring-cloud-stream-test
group: output-group-1
content-type: text/plain
spring-cloud-function
Firstly, Spring Cloud Function is not a standalone messaging package; we are still using Spring Cloud Stream, just with functions. It's starting to get difficult to find good docs for this one; this is where I base my findings. Now, first just look at the configuration:
spring:
cloud:
stream:
function:
definition: send;accumulate;receive
bindings:
send-out-0:
destination: ints
accumulate-in-0:
destination: ints
accumulate-out-0:
destination: total
receive-in-0:
destination: total
Now you need to know that the strings in spring.cloud.stream.function.definition are Spring beans of type java.util.function.Supplier, java.util.function.Function or java.util.function.Consumer. They must correspond to the first "-"-delimited segment of the binding names under spring.cloud.stream.bindings, with the topic name as the destination regardless of input/output. There's more to the binding names: the second segment, in or out, determines whether the binding is an input or an output of that function. The third segment is the argument index, which only matters for functions with multiple inputs or outputs, so in practice I've never seen anything other than 0 there. That's a lot of cryptic information, isn't it? At first I was surprised to see that the whole Producer/Consumer setup is just functions like the below:
@Bean
public Supplier<Integer> send() {
    return () -> 1;
}

private AtomicInteger count = new AtomicInteger(0);

@Bean
public Function<Integer, String> accumulate() {
    return payload -> "Current value: " + payload + ", Total: " + this.count.addAndGet(payload);
}

@Bean
public Consumer<String> receive() {
    return payload -> log.info(payload);
}
Spring Cloud Stream does all the magic behind the scenes: it automatically marshalls/unmarshalls the messages and calls the send/receive functions for you, so all you see are the message bodies. Here, send publishes to the ints topic, accumulate reads from ints and publishes its result to total, and receive reads from total and logs it.
Think that's enough magic? Look more closely at the code above. How many messages do you think this setup would send (I've already pasted all the meaningful code)? Just one, then stop? I would have guessed so, since the Supplier just gives a constant 1. Turns out it emits that 1 every second. Jeez, that really blew my mind! I haven't dug deep into the configurations, but that interval comes from the default poller; if you don't want the 1-second default behavior, my personal recommendation is to use Webflux as described below.
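That said, if you just want to slow the poller down, something like the snippet below should do it (the property name is from memory and has moved between Spring Cloud Stream versions, so treat this as a sketch):

spring:
  cloud:
    stream:
      poller:
        # default is 1000 (ms) -- the "one message per second" behavior above
        fixed-delay: 5000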
spring-cloud-function with Webflux
So you can actually replace all the parameters in the functions with Fluxes. The Supplier would hence become this:
@Bean
public Supplier<Flux<Integer>> sendFlux() {
    return () -> Flux.interval(Duration.ofSeconds(5))
            .map(Long::intValue);
}
This now emits an Integer from an ascending sequence starting at 0, one every 5 seconds. If you do Flux.just(1) instead, it gives a single 1 and it's over. No more magic 1-second trick.
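In other words, a one-shot Supplier is as simple as this (sendOnce being a hypothetical bean name):

@Bean
public Supplier<Flux<Integer>> sendOnce() {
    // emits a single 1, then completes -- no per-second polling involved
    return () -> Flux.just(1);
}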
However, from my working experience, mixing Flux into messaging streams is pretty dangerous, since it requires very careful error handling. If the Flux errors out without anything catching it (e.g. onErrorContinue() or onErrorResume()), the stream breaks but the program will not exit. There is another deadly failure mode where the Flux reads messages and buffers them, and then the program dies: those messages are considered "processed" by Kafka and will not be replayed unless you explicitly move the offset back. I should cover this in another article later.
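To make that concrete, here's a sketch of what guarding a reactive consumer might look like (receiveFlux is a hypothetical bean; note that with a Flux-based Consumer you are responsible for subscribing):

@Bean
public Consumer<Flux<String>> receiveFlux() {
    return flux -> flux
            .doOnNext(log::info)
            // skip the offending element and keep the subscription alive;
            // without this, one bad message silently kills the stream
            .onErrorContinue((ex, msg) ->
                    log.warn("skipping message {} due to {}", msg, ex.toString()))
            .subscribe();
}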
spring-cloud-bus
This is the latest addition to the suite of Spring messaging packages, and you'll be really hard-pressed to find examples of it. I've only found a Youtube video for it, and even that has its problems (see the README for fixes).
Essentially, spring-cloud-bus tries to multicast events via Spring's ApplicationEvent, extended into its own RemoteApplicationEvent. So first you create the event:
@Data
public class MyCustomRemoteEvent extends RemoteApplicationEvent {

    private String message;

    // the no-arg constructor is needed for JSON deserialization on the receiving side
    public MyCustomRemoteEvent() {}

    public MyCustomRemoteEvent(Object source, String originService, String message) {
        super(source, originService);
        this.message = message;
    }
}
Now the sender publishes the event via the ApplicationContext
:
@Autowired
private ApplicationContext context;
@Autowired
private BusProperties busProperties;
@PutMapping(value="/publish")
public String publishMessage() {
final String uniqueId = context.getId();
final MyCustomRemoteEvent event = new MyCustomRemoteEvent(this, busProperties.getId(), "Hello World!");
context.publishEvent(event);
return "Event Published.";
}
and the receiver side does two things. First, a special scan for the MyCustomRemoteEvent:
@SpringBootApplication
@RemoteApplicationEventScan(basePackages = {"net.ketone.kafka.scb.model"})
public class SpringCloudBusReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudBusReceiverApplication.class, args);
    }
}
Then a normal ApplicationListener to listen for events of that type:
@Slf4j
@Component
public class MyCustomRemoteEventListener implements ApplicationListener<MyCustomRemoteEvent> {

    private AtomicInteger count = new AtomicInteger();

    @Override
    public void onApplicationEvent(MyCustomRemoteEvent myCustomRemoteEvent) {
        log.info("Received Message: {} {}", myCustomRemoteEvent.getMessage(), count.incrementAndGet());
    }
}
Configuration is as concise as can be; the only thing that matters is the topic:
spring:
application:
index: ${random.uuid}
cloud:
bus:
destination: spring-cloud-bus-test
To be frank, I haven't really used this in any real project code, but my gut feeling is that this has gone way too cryptic. ApplicationEvents don't even look messaging-related at all, until you finally realize there's a spring-cloud-bus dependency (spring-cloud-starter-bus-kafka, in this case) in your project!
Anyway, this ends my journey through the different flavors of Kafka messaging with Spring. Definitely more to come on messaging -- till then, happy coding!