Kafka – Listening to topics dynamically

I’ve been working with Kafka for quite sometime now, using it as a Messaging queue between numerous Micro Services which in turn are Spring-Boot Java applications. Recently I got caught up with a use case which needed 2 specific functionalities from Kafka:

  1. Listening to topics dynamically
  2. Flow control on the Kafka queue or Consumer Pausing.

I searched all around the Internet to find out ways to achieve these but couldn’t find a proper solution at all. Now it was time I sat down with the root levels and figured out a solution by myself. With the help of Gary Russell, One of the developers of Kafka, I came up with a solution to achieve it. I would like to propose this solution so that it comes handy in future for someone having the same problem and also to showcase it to other developers who can contribute/criticise and make it better.

1) Reading topics dynamically:

Speaking at top level, The design philosophy of Kafka doesn’t allow anything like this because adding topics at runtime causes hard rebalance on the brokers and hence it has to be avoided, But if we manage to restart the consumer groups every time a new topic is added to the list, We can gracefully avoid this hazard.

The most recent Spring-Kafka integration and its usage includes the annotation ‘@KafkaListener’ which turns your POJO listener to a Kafka consumer by creating a KafkaListenerContainer using the container factory passed to it. This consumer listens to the topics hardcoded as a string array of topics, topic expressions, topic patterns etc. This limits our design to fetching these topics as keys through a Java DSL at max instead of directly hardcoding. However the keys are still hardcoded within an array passed as a parameter to @KafkaListener.

Example: @KafkaListener(topics = {“${kafka.topics.receipt.cancel.name}”}, containerFactory = “kafkaContainerFactory”)

Note: The value for annotation attribute KafkaListener.topics must be an array initializer, Therefore the hardcoding is mandatory.

This hardcoding at best will always need a code change for addition of any new topic or confining the topic names to abide by a specific pattern. Both of these are bad design solutions for large scale applications.

So what do we do? The solution is to go one step below and break this @KafkaListener into a program and use that instead.

To do this we need to understand what does this annotation do. @KafkaListener builds a KafkaListenerContainer out of the container factory provided to it, If there’s no one provided it always takes the default container factory. It injects a MessageListener to this container which is actually your POJO listener where the annotation is being used. With this the listener now becomes a Kafka consumer subscribed to the queue as informed to @KafkaListener by the container factory. We will do exactly the same. So I first created a Consumer Configuration files where I built a KafkaListenerContainer Factory and a KafkaListenerContainer out of it. Next, I created a simple Java MessageListener to listen and process the message I receive on the queue. As a config, I injected this MessageListener to my KafkaListenerContainer and there it is, My own container whose properties I can decide. To this container, We pass container properties based on our requirement. Here is where you define a array of topics as the topics this listener will be listening to and this array is constructed dynamically, Here we are not bound by any rule that says these topics that the listener listens to has to be hardcoded like in @KafkaListener.

The next part is to get these topics dynamically at runtime. This can be done by putting these topics in a yaml file or any DSL for that matter which can be reloaded at anytime while the application is running. A REST endpoint can do this job for us. Everytime a new topic is added to the file, all we need to do is to hit a reload API that reloads the yaml file to the application, reinitializes all the global variables and thereby reinitializes the topics array too. A non void static method in the Spring-Boot Class can be used to do this. This topics array gets injected into the container properties and the listener now listens to the new topic as well after a restart of the subscribed consumers. There’s no hard rebalancing problem here. The container is a spring bean, which gets created right at the application start, So is another bean that has to be created which starts this container at the application start.

This implementation enables us to read topics on the fly without code change or any server restart and have our listener listen to them gracefully.

It would be very helpful if people can throw more light on this and point out flaws in this solution to collectively bring out a better one for the problem.

I will discuss the second problem and its proposed solution in the next article.

Recent Blogs ///////////////////////

Article on ‘Kerala needs to raise its own bar of transparency in service delivery’ | eGov Foundation and CPPR

eGovernance in Property tax | Lessons from Andhra Pradesh

Societal Dialogues: Restoring Agency | eGov Foundation, Aapti Institute & Societal Platform

Not a Zero-Sum Game: How to Simultaneously Maximize Efficiency and Privacy in Data-driven Urban Governance

Balancing Efficiency And Citizen Privacy In Urban Governance

The success of this initiative is driven by how the citizens of the state are enabled to access the services of the Planning Authorities anytime, anywhere. These services will also have a direct impact on the Ease of Doing Business ranking for the state which has been one of our priority focus areas. We believe that DIGIT open platform will be a key enabler in the areas of online building permission system and help Planning Authorities to create a citizen-centric urban governance

Shri.A.Namassivayam, Hon’ble Minister for Town Planning, Govt of Puducherry

During mSeva WhatsApp Launch- Public grievance redressal in a time bound manner is of paramount importance for the Govt of Punjab. As a part of the “Digital Citizen Services First” approach, we digitized citizen-centric municipal services since 2018 and more than 8 modules can be accessed via web portal and mobile app. Extending the services , we now aim at the resolution of civic complaints with the widely used messaging platform, WhatsApp. Driving a paradigm shift from clicks to personalized seamless conversations will result in improved citizen experience and foster belongingness among citizens with speedy resolution of issues

Shri Ajoy Sharma, CEO, PMIDC

eChhawani is inaugurated today with 8 municipal services and I’m confident that this will continue to enhance citizen centric services for residents of our Cantonment Boards. This is a great step towards ‘good governance’ and is a shining example of India’s emergence as a leader in various sectors offering ease of doing business and ease of living for citizens. I congratulate all stakeholders involved in this initiative and I expect that officers will continue to take feedback from the citizens on whether this initiative is fulfilling the aim of citizen centric governance.

Shri Rajnath Singh, Defence Minister, Government of India

eChhawani is an effort towards offering multiple citizen services in a simple and straightforward manner to citizens across all Cantonment Boards and is the result of efforts of eGov Foundation, BEL, NIC and 62 Cantt Board Administrators supervised by DGDE and MoD. It is a new start and a digital milestone for 62 Cantonment Boards across the country.

Smt Deepa Bajwa, Director General, DGDE

eChhawani is an effort towards offering multiple citizen services in a simple and straightforward manner to citizens across all Cantonment Boards and is the result of efforts of eGov Foundation, BEL, NIC and 62 Cantt Board Administrators supervised by DGDE and MoD. It is a new start and a digital milestone for 62 Cantonment Boards across the country.

Smt Deepa Bajwa, Director General, DGDE

Till date we have seen that citizen services are made online either in a single ULB or at district, or state level. This is a beginning where multiple citizen services are simplified, standardised and launched at once pan-country across all 62 Cantonment Boards and eGov Foundation, BEL, NIC, DGDE and MoD teams have played a major role in the speedy implementation of this initiative. I congratulate all of them.

Shri Ajay Kumar, Defense Secretary, Government of India

Ministry Of Defence

Till date we have seen that citizen services are made online either in a single ULB or at district, or state level. This is a beginning where multiple citizen services are simplified, standardised and launched at once pan-country across all 62 Cantonment Boards and eGov Foundation, BEL, NIC, DGDE and MoD teams have played a major role in the speedy implementation of this initiative. I congratulate all of them.

Shri Ajay Kumar, Defense Secretary, Government of India

Ministry Of Defence

Till date we have seen that citizen services are made online either in a single ULB or at district, or state level. This is a beginning where multiple citizen services are simplified, standardised and launched at once pan-country across all 62 Cantonment Boards and eGov Foundation, BEL, NIC, DGDE and MoD teams have played a major role in the speedy implementation of this initiative. I congratulate all of them.

Shri Ajay Kumar, Defense Secretary, Government of India

Play Video

“Digitisation of Andhra Pradesh through eGov’s platform has really benefited govt employees by saving almost 19hrs of their time every week. Earlier the citizens used to run around to access municipal services. Today the ULB officers run around to deliver services to the citizens”

-Shri Kanna Babu, Ex. Director, CDMA

Government Of Andhra Pradesh

The AP govt recently rolled out the prestigious Ward Secretariat Program for doorstep delivery of 24 urban services. This was rapidly enabled on the Core Municipal Platform, ensuring easy roll out and consistency of operations.”

Shri GSRKR Vijay Kumar, IAS Commissioner, and Director – Municipal Administration, Andhra Pradesh

Nagpur Municipal Corporation

In 2008, as a part of the comprehensive eGovernance initiative that was meant to provide efficient services to citizens, administrators and corporators by implementing integrated governance solutions, NMC revamped the entire legacy system with eGov’s integrated e-governance system based on open technologies for over 60 functions of the ULB.