Apache Kafka 3.2.1 defines 744 different configuration parameters spread across brokers, producers, consumers and other components. Most of these have sane defaults, which is what makes onboarding to the technology possible at all. You probably don’t even need to know about most of them, but a few can be downright dangerous for your solution, and for your vacation plans.
“We can’t see any updates, we think it has to do with Kafka”
That was the first thing I heard as I was about to have a nice lunch with some friends during the summer holidays.
“Looks like the consumers are processing the same message over and over again…” they continued.
After ditching my lunch plans, opening the laptop and scrolling through logs from a number of applications, I could conclude that their initial analysis seemed correct: for a certain topic, the consumers were stuck processing the same records over and over again, which meant that the target system was never updated as expected. Strange, since this flow had never had this kind of problem in the few years it had been in production.
However, I soon realized that this was just another case of a “slow consumer”, something we had run into a couple of times before. The solution, like before, was to tweak the default settings for max.poll.records and/or max.poll.interval.ms.
For reference, this is what showed up in our logs. Note that Kafka logs this at INFO (!) level:
[Consumer clientId=status-listener, groupId=status-groupId] Member my-member sending LeaveGroup request to coordinator hostname.net:9093 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
Kafka is not configured for slow consumers by default
For a consumer, the interface between our code and the Kafka client library boils down to the poll() method. This method fetches records from the brokers and returns zero or more of them to our code, depending on whether there are any unconsumed records on the topic. This is where the configuration values above come into play:
- Each call returns at most max.poll.records records
- We have to call the method at least once every max.poll.interval.ms. Otherwise the client library assumes that our code is no longer interested in consuming (or has died) and leaves the consumer group. This behavior lets another consumer instance pick up the work and continue.
(In our case we use Spring Kafka, which abstracts away the poll() method and just gives us a callback handler)
Together, these two configuration properties put the following requirement on our code: it needs to be able to process max.poll.records records within max.poll.interval.ms. The default values are 500 and 300000 ms (5 minutes) respectively, giving the code 0.6 seconds per record on average. Now, 600 ms might be a pretty tough limit if you have some processing to do for each record (in our case, a couple of database operations). As you might have guessed, on this occasion our consumer was slower than that. What happened then was that the timed-out consumer left the group, the records stayed uncommitted, and another instance picked up the same batch of records, only to run into the exact same problem after a while. So the consumers just reprocessed the same batch over and over and never made any progress.
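The arithmetic behind that limit can be sketched in a few lines of Java. This is only an illustration: the class and method names (PollBudget, perRecordBudgetMs) are made up for this example and are not part of the Kafka client library. The two numbers are the defaults quoted above.

```java
// Hypothetical helper for illustration; not part of the Kafka client library.
final class PollBudget {

    /** Average time in ms available per record if a full batch must finish before the next poll(). */
    static double perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return (double) maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // default max.poll.interval.ms (5 minutes)
        int maxPollRecords = 500;         // default max.poll.records
        // prints "600.0 ms per record"
        System.out.println(perRecordBudgetMs(maxPollIntervalMs, maxPollRecords) + " ms per record");
    }
}
```

Plugging in your own measured per-record processing time next to this number is a quick way to see how much headroom a consumer really has.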
A tricky thing with this is that the issue only manifests when the batches become large enough. This can happen, for example, if the consumer has been shut off for some time or if there is a sudden spike in produced records. As long as the consumer doesn’t lag significantly, the batches are small enough for it to complete within max.poll.interval.ms. In our case, it took about 2 years until this problem showed up.
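A rough model of this effect, as a sketch under simplifying assumptions: it treats the batch size as the smaller of the backlog and max.poll.records (ignoring fetch-size limits and partitioning), and it assumes a fixed processing time of 800 ms per record, which is a made-up figure. All names in the code are hypothetical.

```java
// Hypothetical model of a slow consumer; none of these names come from the Kafka client.
final class LagScenario {

    /** Records returned by one poll(): never more than max.poll.records, never more than the backlog. */
    static long batchSize(long lag, int maxPollRecords) {
        return Math.min(lag, maxPollRecords);
    }

    /** True if processing one batch would take longer than max.poll.interval.ms. */
    static boolean exceedsPollInterval(long lag, int maxPollRecords,
                                       long perRecordMs, long maxPollIntervalMs) {
        return batchSize(lag, maxPollRecords) * perRecordMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        int maxPollRecords = 500;         // Kafka default
        long maxPollIntervalMs = 300_000; // Kafka default
        long perRecordMs = 800;           // assumed processing time, for illustration only

        // Consumer keeps up: 100 records * 800 ms = 80 s, well within 5 minutes.
        System.out.println(exceedsPollInterval(100, maxPollRecords, perRecordMs, maxPollIntervalMs));
        // Consumer has fallen behind: the batch is capped at 500 records * 800 ms = 400 s, over the limit.
        System.out.println(exceedsPollInterval(10_000, maxPollRecords, perRecordMs, maxPollIntervalMs));
    }
}
```

With these numbers the same consumer is fine at a backlog of 100 records and times out at a backlog of 10,000, which matches the pattern of a flow that runs for years and then suddenly gets stuck.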
Take careful note that the default consumer configuration assumes a pretty fast consumer. If this isn’t the case, you should consider reconfiguring either max.poll.records or max.poll.interval.ms (or both).
- Decreasing max.poll.records doesn’t affect how quickly certain fault scenarios are handled, but has the drawback of reducing throughput, since overall overhead increases (less batching)
- Increasing max.poll.interval.ms doesn’t affect throughput, but can make consumer rebalances slower to respond.
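As a minimal sketch, the two overrides could be expressed as plain consumer properties. The property keys are the real Kafka consumer settings discussed above. The class name and the chosen values (100 records, 10 minutes) are assumptions for illustration, not recommendations, and should be derived from your own measured processing time.

```java
import java.util.Properties;

// Hypothetical helper; only the two property keys are real Kafka consumer settings.
final class SlowConsumerConfig {

    /** Builds consumer overrides for a slow consumer. The values are illustrative only. */
    static Properties overrides() {
        Properties props = new Properties();
        props.setProperty("max.poll.records", "100");        // smaller batches (default: 500)
        props.setProperty("max.poll.interval.ms", "600000"); // 10 minutes between polls (default: 300000)
        return props;
    }

    public static void main(String[] args) {
        Properties props = overrides();
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        int records = Integer.parseInt(props.getProperty("max.poll.records"));
        // Average budget per record with these overrides: 600000 / 100 = 6000 ms.
        System.out.println((intervalMs / records) + " ms per record");
    }
}
```

These overrides would be merged with the rest of the consumer configuration (bootstrap servers, group id, deserializers and so on). How they are passed in depends on your setup, for example through whatever consumer configuration your framework exposes.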
Solution Architect and Kafka Expert