SaFi Bank Space : Kafka configuration

This is a work in progress, anyone is welcome to contribute. It is not a strict guide, but something to consider when developing apps with Kafka.

The topics in this article are annotated with symbols to mark the importance:

🟢 Nice to have

🟠 Needs to be considered

🔴 Important

🔴 Idempotence

Since Apache Kafka 3.x (Confluent Platform 7.0.0) the producer is by default idempotent:

Transient errors such as network connection issues can mean that the Kafka broker fails to acknowledge whether a message has been successfully written, leaving the Producer unsure if that message has been emitted. To ensure no message loss occurs if the write did fail the Producer must retry. By configuring the Producer to be idempotent, it is guaranteed that the message will only be written once, with no duplicate writes occurring.

Enabling idempotence enforces specific configuration:

Property name

Value

Explanation

acks

-1 (all)

Producer will wait for acknowledgments from all brokers in cluster until it marks record as successfully sent.

max.in.flight.requests.per.connection

5

The maximum values of requests (batches) sent to broker must be no more than 5 to enable idempotence and order guarantee.

retries

Integer.MAX_VALUE

The amount of retries made for un-acked batch. It makes sense to limit this property to lower number than MAX_VALUE in some cases, since retry.backoff.ms is 100 by default.

It is recommended to leave idempotence enabled (or enable it when using kafka-clients version lower than 3), even though the throughput might decrease, since acks=all itself increases latency.

🟢 Batching and compression

To increase overall producer throughput when multiple records are sent, it might be sensible to improve batching. By default, the linger.ms property which defines how long producer waits until batch is filled is 0. Adding a short delay would improve chance of batching. For larger messages it would also make sense to increase batch.size (default is 16384 bytes).

Another property that improves throughput is compression.type (by default none). The compression adds latency in producer (CPU usage), but makes requests smaller. Supported compression methods are gzip, snappy, lz4 or zstd.

Example of settings for improving throughput of producer:

Property name

Example value

linger.ms

200

batch.size

65536

compression.type

gzip

Consumer configuration

🔴 Offset committing

For consumer applications it is essential to mark the records that were read from partitions as “received” in broker. This is handled by committing offset that was read to the broker. By default, this is done automatically via enable.auto.commit = true. In this case, offsets are not committed after processing the message but after auto.commit.interval.ms period, which is by default 5 seconds. In case when application crashes and offsets are not committed, the next consumer in group that is assigned partition from the crashed consumer will start reading from offset that was last committed - which means it can process some offsets twice.

The safest choice is to enable synchronous committing after each records. This can be achieved in micronaut-kafka by creating Kafka listener with @KafkaListener(offsetStrategy = OffsetStrategy.SYNC_PER_RECORD). This, of course, decreases the throughput, since it performs synchronous (blocking) offset committing after each record, so it might be worth examining other options based on the application needs (e.g. async per record).

🟠 Offset reset policy

When new consumer group is created and consumers start reading records from partitions they need to determine what offset to start with. Default policy is auto.offset.reset=latest, which means it will start reading offset after partitions are assigned (meaning from next produced offset, not the last offset in partition). Setting auto.offset.reset=earliest means it will start reading from the first available offset. If the consumer group has changed (e.g. due to naming convention update or major version upgrade) it means it would re-read records that might have already been processed.

For instance the snapshots topics that contain compacted data might need to be re-read when consumer group is created, thus earliest makes sense here. On the other hand, events topics that contain short-lived records might be read with latest offset reset policy.

🟢 Static group membership

In case of network failure or application crash the consumer might be dropped out of group (after session.timeout.ms has passed - default is 45 seconds in Kafka 3.x), which would trigger rebalancing and partition reassignment.

(eager) Rebalancing is usually stop-the-world process, which should be avoided.

Setting group.instance.id to specific identifier would guarantee that after consumer is dropped out and returns within session.timeout.ms it will continue processing with same partition assignment.

🟠 Polling optimisation

When creating consumer in micronaut-kafka via @KafkaListener annotation it will create consumer with default poll timeout of 100ms. This can be overridden via pollTimeout property and it defines how long will consumer wait for incoming records. If timeout passes, the polling will return empty record set (NB: This is basically a long polling mechanism).

It seems that Micronaut’s @KafkaListener will not execute method that is defined via @Topic subscription when poll returns empty record set.

If consumer is not able to process records without creating consumer lag (diff between last produced and last consumed offset in partition), it might be useful to optimise polling mechanism:

Property name

Example value

Explanation

fetch.min.bytes

512

With default value of 1, consumer process batch immediately, which - depending on how long the processing takes - might block other records. Increasing this value would create a buffer and together with fetch.max.wait.ms optimise efficiency.

fetch.max.wait.ms

250

Default is 500, but this indicates how much time will consumer wait until amount of fetch.min.bytes is received.

Topic configuration

🔴 Retention

Default topic configurations are quite generous when it comes to how much space will be available for records. By default, topics retain data for longer period of time than desired, so it makes sense to specify these properties for specific topics:

NB: It is advised to create topics manually and not via consumers/producers, which may result in invalid topic configuration.

NB2: Most of these settings are defined per broker, but can be overridden in topic configuration.

Property name

Example value(s)

Explanation

cleanup.policy

delete / compact

By default records are deleted from partitions, but for “fact” topics the records could be compacted based on key (the latest record per key)

delete.retention.ms

86400000 (1 day)

The amount of time to retain delete tombstone markers (records with null value for specific key) for log compacted topics.

retention.bytes

1073741824 (1gb)

For some topics with delete retention policy we would like to retain only certain amount of data. After partition grows up beyond this value, it will start retention process on segments. Default is -1 (no limit).

retention.ms

86400000 (1 day)

Similarly to previous, this limits the amount of segments based on time. Default is 7 days.

segment.bytes

104857600 (100mb)

This controls the size of segment. To make retention meaningful, this must be set to value lower than retention.bytes

segment.ms

3600000 (1 hour)

This value specifies when to roll the new log segment, so that the old segments can be later retained. In other words - to make time-based retention work, this must be set to value lower than retention.ms

Broker configuration

This is more relevant to DevOps team or Kafka administrators.

🔴 Topic creation

In production environments, it is not advised that topics are automatically created (either by producers or consumers). It might be that the client will create topic with default properties and insufficient partitions. Therefore, auto.create.topics.enable should be set to false.

🔴 Partition replication

In production environment, Kafka cluster should consist of at least 3 brokers, where it is expected that partitions are replicated on each of them (The default.replication.factor=3).

Also min.insync.replicas defines the minimum number o in-sync replicas that must be available for the producer in order to successfully send the messages to the partition.If min.insync.replicas is set to 2 and acks is set to all, each message must be written successfully to at least two replicas. This means that the messages won't be lost, unless both brokers fail (unlikely). If one of the brokers fails, the partition will no longer be available for writes.

Attachments: