This is a work in progress, and anyone is welcome to contribute. It is not a strict guide, but a set of things to consider when developing apps with Kafka.
The topics in this article are annotated with symbols to mark the importance:
🟢 Nice to have
🟠 Needs to be considered
🔴 Important
🔴 Idempotence
Since Apache Kafka 3.x (Confluent Platform 7.0.0), the producer is idempotent by default (`enable.idempotence=true`):
Transient errors such as network connection issues can mean that the Kafka broker fails to acknowledge whether a message has been successfully written, leaving the Producer unsure whether that message was written. To ensure that no message is lost if the write did fail, the Producer must retry. Configuring the Producer to be idempotent guarantees that the message is written only once, with no duplicate writes.
Enabling idempotence enforces specific configuration:
Property name | Value | Explanation |
---|---|---|
`acks` | `all` | The producer waits for acknowledgments from all in-sync replicas before it marks a record as successfully sent. |
`max.in.flight.requests.per.connection` | `<= 5` | The maximum number of unacknowledged requests (batches) sent to a broker must be no more than 5 to keep idempotence and the ordering guarantee. |
`retries` | `> 0` | The number of retries made for an unacknowledged batch. It makes sense to limit this property to lower number than MAX_VALUE in some cases, since |
It is recommended to leave idempotence enabled (or to enable it explicitly when using a kafka-clients version lower than 3), even though throughput might decrease, since `acks=all` itself increases latency.
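As a rough illustration of the settings described above, the following sketch builds a producer configuration with idempotence switched on explicitly. The class name, method name and the `localhost:9092` address are placeholders chosen for this example; the property keys are standard Kafka producer settings. The resulting `Properties` object is the kind of configuration a `KafkaProducer` is constructed from.

```java
import java.util.Properties;

final class IdempotentProducerConfig {

    // Builds producer settings with idempotence enabled explicitly.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Already the default in kafka-clients 3.x; set explicitly for older clients.
        props.setProperty("enable.idempotence", "true");
        // Settings that idempotence depends on:
        props.setProperty("acks", "all");
        props.setProperty("max.in.flight.requests.per.connection", "5");
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        build("localhost:9092").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Setting the dependent properties explicitly is not strictly needed on recent clients, but it makes the intent visible to anyone reading the configuration.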
🟢 Batching and compression
To increase overall producer throughput when many records are sent, it can help to improve batching. By default, `linger.ms`, which defines how long the producer waits for a batch to fill before sending it, is `0`. Adding a short delay improves the chance that records are batched together. For larger messages it also makes sense to increase `batch.size` (default is `16384` bytes).
Another property that improves throughput is `compression.type` (`none` by default). Compression adds latency on the producer side (CPU usage) but makes requests smaller. Supported compression methods are `gzip`, `snappy`, `lz4` and `zstd`.
Example of settings for improving throughput of producer:
Property name | Example value |
---|---|
`linger.ms` | |
`batch.size` | |
`compression.type` | |
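A minimal sketch of the batching and compression settings discussed in this section might look as follows. The concrete values (20 ms, 64 KiB, `lz4`) are assumptions picked purely for illustration and should be tuned against real traffic; the class and method names are hypothetical as well.

```java
import java.util.Properties;

final class ThroughputProducerConfig {

    // Applies batching and compression settings to an existing producer config.
    // All three values are illustrative assumptions, not recommendations.
    static Properties applyThroughputSettings(Properties props) {
        props.setProperty("linger.ms", "20");         // wait up to 20 ms for a batch to fill (default 0)
        props.setProperty("batch.size", "65536");     // 64 KiB batches (default 16384 bytes)
        props.setProperty("compression.type", "lz4"); // one of gzip, snappy, lz4, zstd (default none)
        return props;
    }

    public static void main(String[] args) {
        Properties props = applyThroughputSettings(new Properties());
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```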
Consumer configuration
🔴 Offset committing
For consumer applications it is essential to mark the records read from partitions as “received” on the broker. This is done by committing the offset that was read back to the broker. By default, this happens automatically via `enable.auto.commit = true`. In this case, offsets are not committed after each message is processed but once every `auto.commit.interval.ms`, which defaults to 5 seconds. If the application crashes before offsets are committed, the next consumer in the group that is assigned the crashed consumer's partitions will start reading from the last committed offset, which means it can process some records twice.
The safest choice is to commit synchronously after each record. In `micronaut-kafka` this can be achieved by creating the Kafka listener with `@KafkaListener(offsetStrategy = OffsetStrategy.SYNC_PER_RECORD)`. This, of course, decreases throughput, since it performs a synchronous (blocking) offset commit after each record, so it might be worth examining other options based on the application's needs (e.g. async per record).
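To make the duplicate-processing window concrete, here is a small simulation that does not use the Kafka client at all. It assumes a simplified model in which commits happen after every `commitEvery` processed records (real auto-commit is time-based, so this is only an approximation), and it reports which offsets a replacement consumer would process again after a crash. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class OffsetCommitSimulation {

    // Processes offsets 0..crashAfter-1, committing after every `commitEvery`
    // processed records, then "crashes". Returns the offsets that a replacement
    // consumer processes again when it resumes from the last committed offset.
    static List<Long> reprocessedAfterCrash(int crashAfter, int commitEvery) {
        long committed = 0; // next offset to read, which is what a commit stores
        for (long offset = 0; offset < crashAfter; offset++) {
            // ... the record at `offset` is processed here ...
            if ((offset + 1) % commitEvery == 0) {
                committed = offset + 1;
            }
        }
        List<Long> reprocessed = new ArrayList<>();
        for (long offset = committed; offset < crashAfter; offset++) {
            reprocessed.add(offset);
        }
        return reprocessed;
    }

    public static void main(String[] args) {
        System.out.println(reprocessedAfterCrash(7, 5)); // commit every 5 records
        System.out.println(reprocessedAfterCrash(7, 1)); // commit after each record
    }
}
```

With a commit after each record nothing is reprocessed in this model, which is the trade-off described above: fewer duplicates in exchange for one blocking commit per record.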
🟠 Offset reset policy
When a new consumer group is created and its consumers start reading records from partitions, they need to determine which offset to start from. The default policy is `auto.offset.reset=latest`, which means the consumer starts reading from the end of the partition once it is assigned, so it only sees records produced after that point (not the last record already in the partition). Setting `auto.offset.reset=earliest` means it starts reading from the first available offset. If the consumer group name has changed (e.g. due to a naming convention update or a major version upgrade), the group counts as new, so with `earliest` it would re-read records that might have already been processed.
For instance, snapshot topics that contain compacted data might need to be re-read when a consumer group is created, so `earliest` makes sense here. On the other hand, event topics that contain short-lived records might be read with the `latest` offset reset policy.
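The rule of thumb above could be captured in a small helper like the one below. The `TopicKind` enum and the group id are hypothetical names introduced for this sketch; only `group.id` and `auto.offset.reset` are actual Kafka consumer properties.

```java
import java.util.Properties;

final class OffsetResetConfig {

    // Hypothetical classification of topics, used only for this sketch.
    enum TopicKind { SNAPSHOT, EVENT }

    // Compacted snapshot topics are read from the start, short-lived events from the end.
    static String resetPolicyFor(TopicKind kind) {
        return kind == TopicKind.SNAPSHOT ? "earliest" : "latest";
    }

    static Properties consumerProps(String groupId, TopicKind kind) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", resetPolicyFor(kind));
        return props;
    }

    public static void main(String[] args) {
        // "inventory-service-v2" is a placeholder group id.
        System.out.println(consumerProps("inventory-service-v2", TopicKind.SNAPSHOT));
        System.out.println(consumerProps("inventory-service-v2", TopicKind.EVENT));
    }
}
```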
🟢 Static group membership
In case of a network failure or an application crash, the consumer might be dropped from the group (after `session.timeout.ms` has passed; the default is 45 seconds in Kafka 3.x), which triggers rebalancing and partition reassignment.
(Eager) rebalancing is usually a stop-the-world process, which should be avoided.
Setting `group.instance.id` to a specific identifier guarantees that if the consumer drops out and returns within `session.timeout.ms`, it continues processing with the same partition assignment.
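A sketch of a consumer configuration with static membership follows. It assumes each application instance can supply an identifier that stays the same across restarts (for example a stable pod or host name); the identifier format, class name and group id here are placeholders.

```java
import java.util.Properties;

final class StaticMembershipConfig {

    // Builds consumer settings with a fixed group.instance.id.
    // `instanceId` must be unique within the group and stable across restarts.
    static Properties build(String groupId, String instanceId) {
        if (instanceId == null || instanceId.trim().isEmpty()) {
            throw new IllegalArgumentException("instanceId must be a stable, non-empty identifier");
        }
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty("group.instance.id", instanceId);
        // The Kafka 3.x default, spelled out: the window in which a restarted
        // instance can rejoin and keep its partition assignment.
        props.setProperty("session.timeout.ms", "45000");
        return props;
    }

    public static void main(String[] args) {
        // "orders-consumer-0" is a placeholder for a stable per-instance name.
        System.out.println(build("orders", "orders-consumer-0"));
    }
}
```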
🟠 Polling optimisation
When a consumer is created in `micronaut-kafka` via the `@KafkaListener` annotation, it uses a default poll timeout of 100ms. This can be overridden via the `pollTimeout` property, which defines how long the consumer waits for incoming records. If the timeout passes, the poll returns an empty record set (NB: this is basically a long-polling mechanism).
It seems that Micronaut’s `@KafkaListener` will not execute the method subscribed via `@Topic` when the poll returns an empty record set.
If the consumer cannot keep up with incoming records and consumer lag builds up (the difference between the last produced and the last consumed offset in a partition), it might be useful to optimise the polling mechanism:
Property name | Example value | Explanation |
---|---|---|
|
| With default value of |
|
| Default is |
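For reference, consumer lag as defined above is plain offset arithmetic. The helper below is a hypothetical illustration of that definition for a single partition, not part of any Kafka API.

```java
final class ConsumerLag {

    // Lag for one partition: records that have been produced but not yet consumed.
    static long lag(long lastProducedOffset, long lastConsumedOffset) {
        return Math.max(0L, lastProducedOffset - lastConsumedOffset);
    }

    public static void main(String[] args) {
        // Example offsets are made up for illustration.
        System.out.println(lag(1500L, 1200L));
    }
}
```

A lag that keeps growing between measurements is the signal that the consumer is not keeping up and that polling settings are worth revisiting.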
Topic configuration
🔴 Retention
Default topic configurations are quite generous when it comes to how much space is available for records. By default, topics often retain data for longer than desired, so it makes sense to set these properties for specific topics:
NB: It is advised to create topics manually rather than letting consumers/producers create them, which may result in invalid topic configuration.
NB2: Most of these settings are defined per broker, but can be overridden in topic configuration.
Property name | Example value(s) | Explanation |
---|---|---|
`cleanup.policy` | | By default records are deleted from partitions, but for “fact” topics the records could be compacted based on key (keeping the latest record per key). |
`delete.retention.ms` | | The amount of time to retain delete tombstone markers (records with a null value for a specific key) for log-compacted topics. |
`retention.bytes` | | For some topics with the delete retention policy we would like to retain only a certain amount of data. Once a partition grows beyond this value, the retention process starts on its segments. Default is -1 (no limit). |
`retention.ms` | | Similarly to the previous one, this limits the retained segments based on time. Default is 7 days. |
`segment.bytes` | | This controls the size of a segment. To make retention meaningful, this must be set to value lower than |
`segment.ms` | | This value specifies when to roll a new log segment, so that old segments can later be removed by retention. In other words - to make time-based retention work, this must be set to value lower than |
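As a sketch of how such topic-level settings could be assembled in code, the helper below builds two configuration maps: one for a time-limited event topic and one for a compacted snapshot topic. The class and method names and the example durations are assumptions made for illustration. The check that `segment.ms` is lower than `retention.ms` is this sketch's reading of the guidance that segments need to roll for time-based retention to take effect. A map like this could, for example, be handed to the admin client when creating a topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TopicConfigSketch {

    // Config for a topic whose records should expire after `retentionMs`.
    static Map<String, String> eventTopicConfig(long retentionMs, long segmentMs) {
        if (segmentMs >= retentionMs) {
            // Assumption of this sketch: segments must roll more often than the retention window.
            throw new IllegalArgumentException("segment.ms should be lower than retention.ms");
        }
        Map<String, String> config = new LinkedHashMap<>();
        config.put("cleanup.policy", "delete");
        config.put("retention.ms", Long.toString(retentionMs));
        config.put("segment.ms", Long.toString(segmentMs));
        return config;
    }

    // Config for a compacted topic that keeps the latest record per key.
    static Map<String, String> snapshotTopicConfig(long deleteRetentionMs) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("cleanup.policy", "compact");
        config.put("delete.retention.ms", Long.toString(deleteRetentionMs));
        return config;
    }

    public static void main(String[] args) {
        // Example durations (1 day retention, 1 hour segments) are illustrative only.
        System.out.println(eventTopicConfig(86_400_000L, 3_600_000L));
        System.out.println(snapshotTopicConfig(86_400_000L));
    }
}
```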
Broker configuration
This is more relevant to DevOps team or Kafka administrators.
🔴 Topic creation
In production environments, it is not advised to let topics be created automatically (by either producers or consumers). The client might create the topic with default properties and insufficient partitions. Therefore, `auto.create.topics.enable` should be set to `false`.
🔴 Partition replication
In a production environment, a Kafka cluster should consist of at least 3 brokers, with each partition replicated to each of them (`default.replication.factor=3`).
Also, `min.insync.replicas` defines the minimum number of in-sync replicas that must be available for a producer using `acks=all` to successfully send messages to the partition. If `min.insync.replicas` is set to `2` and `acks` is set to `all`, each message must be written successfully to at least two replicas. This means that messages won't be lost unless both of those brokers fail (unlikely). With a replication factor of 3, the partition stays writable if one broker fails; once fewer than two in-sync replicas remain, the partition is no longer available for writes.
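The relationship between replication factor, `min.insync.replicas` and write availability can be checked with simple arithmetic. The sketch below assumes that every failed broker hosts a replica of the partition and that all surviving replicas are in sync, which is a simplification; the class and method names are hypothetical.

```java
final class WriteAvailability {

    // True if a producer using acks=all can still write to the partition.
    // Assumes each failed broker held one replica and all others are in sync.
    static boolean writableWithAcksAll(int replicationFactor, int minInSyncReplicas, int failedBrokers) {
        int inSyncReplicas = Math.max(0, replicationFactor - failedBrokers);
        return inSyncReplicas >= minInSyncReplicas;
    }

    // Number of broker failures the partition tolerates while staying writable.
    static int toleratedFailures(int replicationFactor, int minInSyncReplicas) {
        return Math.max(0, replicationFactor - minInSyncReplicas);
    }

    public static void main(String[] args) {
        System.out.println(writableWithAcksAll(3, 2, 1)); // one of three brokers down
        System.out.println(writableWithAcksAll(3, 2, 2)); // two of three brokers down
        System.out.println(toleratedFailures(3, 2));
    }
}
```

Under these assumptions, a replication factor of 3 with `min.insync.replicas=2` tolerates one broker failure for writes, while a replication factor of 2 with the same setting tolerates none.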