Kafka – Listening to topics dynamically

I’ve been working with Kafka for quite some time now, using it as a message queue between numerous microservices, which in turn are Spring Boot Java applications. Recently I ran into a use case that needed two specific capabilities from Kafka:

  1. Listening to topics dynamically
  2. Flow control on the Kafka queue, or consumer pausing.

I searched all over the Internet for ways to achieve these but couldn’t find a proper solution. So it was time to sit down with the fundamentals and work out a solution myself. With the help of Gary Russell, the lead developer of the Spring for Apache Kafka project, I came up with an approach that works. I would like to share it here so that it comes in handy for anyone facing the same problem, and so that other developers can contribute, criticise, and improve it.

1) Reading topics dynamically:

At a high level, Kafka’s design discourages anything like this: adding topics to a running consumer at will forces a hard rebalance of the consumer group, which has to be avoided. But if we restart the consumer group every time a new topic is added to the list, we can avoid this hazard gracefully.

The recent Spring Kafka integration provides the ‘@KafkaListener’ annotation, which turns your POJO listener into a Kafka consumer by creating a KafkaListenerContainer from the container factory passed to it. This consumer listens to the topics hardcoded into the annotation as a string array of topic names, topic expressions, topic patterns, and so on. At best, this lets us fetch the topic names as property keys through a Java DSL instead of hardcoding them directly; however, the keys themselves are still hardcoded inside the array passed to @KafkaListener.

Example: `@KafkaListener(topics = {"${kafka.topics.receipt.cancel.name}"}, containerFactory = "kafkaContainerFactory")`

Note: the value of the annotation attribute KafkaListener.topics must be an array initializer, so the hardcoding is mandatory.
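For contrast, here is a sketch of the conventional annotation-driven listener this article sets out to replace (the property key and container-factory bean name are illustrative, taken from the example above):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiptCancelListener {

    // The topics attribute must be an array initializer of constants (or
    // property placeholders resolving to constants), so adding a topic
    // always means a code or property change plus a redeploy.
    @KafkaListener(topics = {"${kafka.topics.receipt.cancel.name}"},
                   containerFactory = "kafkaContainerFactory")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("Received: " + record.value());
    }
}
```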

At best, this hardcoding means every new topic requires a code change, or forces the topic names to conform to a fixed pattern. Both are poor design choices for large-scale applications.

So what do we do? The solution is to go one level lower: replicate what @KafkaListener does in code, and use that instead.

To do this we need to understand what the annotation does. @KafkaListener builds a KafkaListenerContainer from the container factory provided to it (falling back to the default container factory if none is given). It injects a MessageListener into this container, which is the POJO listener the annotation is placed on. The listener thereby becomes a Kafka consumer subscribed to the topics declared in @KafkaListener.

We will do exactly the same thing ourselves. First, I created a consumer configuration class in which I built a KafkaListenerContainer factory, and a KafkaListenerContainer from it. Next, I wrote a plain Java MessageListener to receive and process the messages from the queue. In the configuration, I injected this MessageListener into my KafkaListenerContainer, and there it is: my own container whose properties I control. To this container we pass container properties based on our requirements, and this is where we define the array of topics the listener will subscribe to. That array is constructed dynamically; nothing here forces the topics to be hardcoded the way @KafkaListener does.

The next part is fetching these topics dynamically at runtime. This can be done by putting the topic names in a yaml file (or any DSL, for that matter) that can be reloaded at any time while the application is running. A REST endpoint can do this job for us: every time a new topic is added to the file, all we need to do is hit a reload API that re-reads the yaml file, reinitializes the relevant global state, and thereby rebuilds the topics array. A non-void static method in the Spring Boot application class can be used to do this. The rebuilt array is injected into the container properties, and once the subscribed consumers are restarted, the listener listens to the new topic as well; there is no hard-rebalancing problem here. The container is a Spring bean created at application start, alongside another bean whose job is to start the container when the application boots.
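A hedged sketch of such a reload endpoint follows. The controller class, the `/topics/reload` path, and the `loadTopicsFromYaml` helper are hypothetical names introduced for illustration; the restart itself uses Spring Kafka’s standard container `stop()`/`start()` lifecycle:

```java
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TopicReloadController {

    private final ConsumerFactory<String, String> consumerFactory;
    private final MessageListener<String, String> messageListener;
    private ConcurrentMessageListenerContainer<String, String> container;

    public TopicReloadController(ConsumerFactory<String, String> consumerFactory,
                                 MessageListener<String, String> messageListener,
                                 ConcurrentMessageListenerContainer<String, String> container) {
        this.consumerFactory = consumerFactory;
        this.messageListener = messageListener;
        this.container = container;
    }

    @PostMapping("/topics/reload")
    public String reloadTopics() {
        // Re-read the topic list from the yaml file (hypothetical helper).
        String[] topics = loadTopicsFromYaml();
        // Stop the old consumers gracefully, rebuild the container with the
        // new topic array, and start it again: a clean group restart rather
        // than a hard rebalance mid-flight.
        container.stop();
        ContainerProperties props = new ContainerProperties(topics);
        props.setMessageListener(messageListener);
        container = new ConcurrentMessageListenerContainer<>(consumerFactory, props);
        container.start();
        return "Now listening to " + topics.length + " topics";
    }

    private String[] loadTopicsFromYaml() {
        // Placeholder for the yaml reload; return the refreshed topic names.
        return new String[] { "topic-a", "topic-b", "topic-c" };
    }
}
```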

This implementation enables us to pick up new topics on the fly, without a code change or a server restart, and have our listener subscribe to them gracefully.

It would be very helpful if readers could shed more light on this and point out flaws in the solution, so we can collectively arrive at a better one.

I will discuss the second problem and its proposed solution in the next article.
