Dealing with large messages in Kafka

Dealing with large messages in Kafka
20 June, 2022 lmg

If you are running many data flows on a Kafka platform, you have probably at some point thought: will the messages in this flow fit in Kafka? Or should this integration really be done in another way?

The default Kafka broker configuration specifies a default limit around 1Mb for message batches, after any applied compression. It seems to be theoretically possible to increase this limit in your cluster, but it involves changing default settings both between client/broker and for broker/broker replication. In several discussions, arguments are raised against tweaking the default configurations, to maintain the stability of the platform and predictable operational behavior (memory use, CPU load etc).

At Irori, we have developed the open source Claim Check Interceptors library to help with large messages, follow along for some background, motivation and details.

Analyzing Kafka message sizing for your flow

So where does this 1Mb limit leave us? Either you make a serious investigation up-front to determine if your flow is sure to stay at all times under the 1Mb limit, or you design up-front for alternative integration patterns when messages are large. When discussing this issue with developers at some Irori clients, a fairly common result of the initial analysis is:

  • All current observed messages are well within the limit
  • The messages contain some dynamic portion, so we can’t rule out that they will grow in the future
  • The flow is too important to suddenly start failing in case some messages grow too large

In this scenario you are really in a gray-area: is it really worth it to look into alternative integration patterns? Everything seems to be working fine today…

Thoughts about compression in Kafka

Now a quick side-track about compression: can’t you solve many cases by applying compression (e.g set gzip as the compression.type in the producer config)? This approach is nice because it is seamlessly supported by all major Kafka clients. In most cases this will significantly reduce your risk of exceeding the 1Mb limit. However, this also makes the reasoning a bit harder about how close you are to the limit. It now depends on the compression ratio on your particular payload. This can be all over the place:

  • Repetitive XML: quite good compression ratio (maybe 1:5, 1:10)
  • Unique XML, eg a list of unique identifiers: decent compression ratio (maybe 1:2)
  • Similar results for JSON but it is slightly less verbose than XML
  • A JPEG image: almost no compression (1:1), for certain payloads the compression could actually add size

In general, compression is likely to give you more buffer before hitting the size limit, but probably does not once and for all solve the problem and any uncertainties of the original analysis.

Introducing the Claim Check Interceptors library for Kafka Java clients

We realized this was a common problem and set out to implement the Claim Check integration pattern for Kafka Java clients. The what now?!, you might ask:

The analogy is here from the world of airline travel – if you can remember that from pre-pandemic times 😉. The Sender (passenger) wants to send a large message (transport heavy and bulky luggage) over a message bus (airplane). Instead of sending your message over the normal message bus (carry-on luggage), you store your message in a separate datastore (check in your luggage for transport in the storage compartment). The consumer (passenger at the destination) fetches the message form the separate datastore (retrieves the luggage from the carousel).

In some ways, this analogy breaks down. Because in this scenario the sender and receiver are the same passenger, while in the realm of IT systems they are typically two different systems. In terms of dealing with the luggage the analogy is quite clear, and particularly fitting for the implementation we went with, since smaller messages (carry-on) can still be transported as-is.

In our solution the messages are stored in Azure Blob Storage:

We implemented the library using the concepts of (De-)Serializers and Interceptors in the standard Kafka Java client. This means that any existing software that uses the Java Kafka client and allows you to add config settings dynamically to the Kafka Producer and Consumer can keep working as is with two small changes:

An example from the world of Spring Boot, say your config looks like this today (simplified):

spring:
  kafka:
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

The simple modification will then be:

spring:
  kafka:
    properties:
      claimcheck.backend.class: se.irori.kafka.claimcheck.azure.AzureBlobStorageClaimCheckBackend
      azure.blob.storage.account.endpoint: https://<your-storage-account>.blob.core.windows.net
      azure.blob.storage.account.sastoken.from: file:/path/to/my-token.sastoken
    producer:
      value-serializer: se.irori.kafka.claimcheck.ClaimCheckSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
    interceptor.classes: se.irori.kafka.claimcheck.ClaimCheckProducerInterceptor
    value.serializer.wrapped.serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
      value-deserializer: se.irori.kafka.claimcheck.ClaimCheckDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
    value.deserializer.wrapped.deserializer: org.apache.kafka.common.serialization.StringDeserializer

And then add the library dependency in your Maven pom/Gradle build file or how you are managing your dependencies. Simple as Pie! One gotcha that should be mentioned here however is that we depend on the Azure Java SDK, which has quite a bit of dependency surface that could in theory collide with some dependencies in your application. See this comment about potential mitigations.

Enough with the Spring Boot YAML! What does this actually do?

  • The Kafka Java ecosystem is now “Claim Check Ready” 😀
    • Kafka Connect, Spring Kafka, or most places you can use the standard Java Kafka client (even kafka-console-consumer.sh if you put your JARs in the right spot)
  • By default messages that uncompressed are larger than the default limit will get checked in to the Blob store, all other messages will be processed only via Kafka
  • In the current implementation all message batches still need to be able to be kept in memory for the systems running the producer/consumer.

The main design goal for the library was to support the flows that might be “just slightly too big” for Kafka. The Azure Blob size limit is around 4TB, so we we don’t know for sure where the sweet spot for using this library lies, but somewhere between 1Mb and 4TB 😏 .

We are pushing this library to more and more production flows with Irori clients so hopefully we will be able to report back on best practices in a future blog post.

Feel free to start using the library, but we recommend carefully testing your use case and potentially tweaking timeout settings, batch sizes and what size limit to start checking in messages. We have written it with pluggable backends in mind, so going forward we aim to support further storage solutions, such as S3 and GCS.

Also check out the Spring Boot example project to easily get started.

Irori has a lot of experience setting up Kafka clusters and designing applications that play to the strengths in Kafka. Feel free to contact us if you want to learn more. We are also working on a batteries-included Kafka platform, see our previous blog post.

Author:
Björn Löfroth
Senior Software Engineer and Kafka Expert


References

https://github.com/irori-ab/claim-check-interceptors