SaFi Bank Space : Confluent Kafka DR & Failover with Cluster Link

Deploying a disaster recovery strategy with Cluster Linking can increase availability and reliability of your mission-critical applications by minimizing data loss and downtime during unexpected disasters, like public cloud provider outages. This document will explain how to design your disaster recovery strategy and execute a failover.

You start with a “primary” cluster that contains data in its topics and metadata used for operations, like consumer offsets and ACLs. Your applications are powered by producers that send data into those topics, and consumers that read data out of those topics.

You can use Cluster Linking to create a disaster recovery (DR) cluster that is in a different region or cloud than the primary cluster. When an outage hits the primary cluster, the DR cluster will have an up-to-date copy of your data and metadata. The producers and consumers can switch over to the DR cluster, allowing them to continue running with low downtime and minimal data loss. Thus, your applications can continue to serve your business and customers during a disaster.

A crucial input informing the design of your disaster recovery plan is the Recovery Time Objective(s) (RTO) and Recovery Point Objective(s) (RPO) that you hope to achieve. An RTO is the maximum amount of downtime that your system can have during an outage, measured as the difference between the time the outage occurs and the time your system is back up and running. An RPO is the maximum amount of data you are willing to risk losing because of an outage, measured as the difference between the last message produced to the failed cluster and the last message replicated to the DR cluster from the failed cluster.
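The two definitions above reduce to simple timestamp arithmetic. The sketch below shows one way to compare the values measured in a failover drill against your targets. All timestamps and targets are made-up illustration values, and the function names are hypothetical.

```python
from datetime import datetime, timedelta


def recovery_time(outage_start: datetime, service_restored: datetime) -> timedelta:
    """Downtime experienced: from the start of the outage until the system is back up."""
    return service_restored - outage_start


def recovery_point(last_produced: datetime, last_replicated: datetime) -> timedelta:
    """Data-loss window: last message produced to the failed cluster
    minus the last message replicated to the DR cluster."""
    return last_produced - last_replicated


# Hypothetical measurements from a failover drill.
outage_start = datetime(2023, 1, 10, 12, 0, 0)
service_restored = datetime(2023, 1, 10, 12, 9, 30)
last_produced = datetime(2023, 1, 10, 11, 59, 58)
last_replicated = datetime(2023, 1, 10, 11, 59, 55)

# Hypothetical objectives agreed with the business.
rto_target = timedelta(minutes=15)
rpo_target = timedelta(seconds=5)

actual_rt = recovery_time(outage_start, service_restored)
actual_rp = recovery_point(last_produced, last_replicated)

print("RTO met:", actual_rt <= rto_target)
print("RPO met:", actual_rp <= rpo_target)
```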

The RTO you can achieve with Cluster Linking depends on your tooling and failover procedures, since failing over your applications is your responsibility. There’s no set minimum or maximum RTO.

The RPO you can achieve with Cluster Linking is determined by the mirroring lag between the DR cluster and the primary cluster. Mirroring lag is exposed via the Metrics API, the REST API, and the Confluent Cloud Console.

When implementing a disaster recovery (“DR”) plan to fail over from a primary cluster to a DR cluster, there are several aspects you must design into your Kafka clients so that they can fail over smoothly:

  • Clients must bootstrap to the DR cluster once a failover is triggered

  • Consumers must be able to tolerate a small number of duplicate messages (“idempotency”)

  • Consumers and producers must be tolerant of an RPO (“Recovery-Point Objective”)

When you detect an outage and decide to fail over to the DR cluster, your clients must all switch over to the DR cluster to produce and consume data. To do this, your clients must do two things:

  • When the clients start, they must use the bootstrap server and security credentials of the DR cluster. It is not best practice to hardcode the bootstrap servers and security credentials of your primary and DR clusters into your clients’ code. Instead, you should store the bootstrap server of the active cluster in a service discovery tool (like HashiCorp Consul) and the security credentials in a key manager (like HashiCorp Vault or AWS Secrets Manager). When a client starts up, it fetches its bootstrap server and security credentials from these tools. To trigger a failover, you change the active bootstrap server and security credentials in these tools to those of the DR cluster. Then, when your clients restart, they will bootstrap to the DR cluster.

  • Any clients that are still running need to stop and restart. If your primary cluster has an outage but your clients do not (for example, if you have clients in a different region than the primary cluster that were unaffected by the regional cloud service provider outage), these clients will still be running and attempting to connect to the primary cluster. When you decide to fail over, these clients need to stop running and restart, so that they will bootstrap to the DR cluster. Kafka has no built-in mechanism to do this. There are several approaches you may take to achieve this behavior:

    • If a central operator manages all Kafka clients, such as in a Kubernetes cluster, the operator can order all clients to shut down until the count of running clients is down to 0, and then scale the clients back up.

    • You can add code wrapping your clients that polls the service discovery tool to check for a change in the bootstrap server. If the bootstrap server changes, the wrapping code restarts the clients.

    • Each team with Kafka clients can be paged and ordered to restart their clients.
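The second approach above (wrapping code that polls the service discovery tool) could look roughly like the following. This is a minimal sketch, not a prescribed implementation: `fetch_bootstrap` and `restart_client` are hypothetical callables that you would supply, for example a lookup against your service discovery tool and a function that closes the current Kafka client and creates a new one against the given address.

```python
import time
from typing import Callable, Optional


class BootstrapWatcher:
    """Restarts a client whenever the active bootstrap server changes."""

    def __init__(
        self,
        fetch_bootstrap: Callable[[], str],      # hypothetical: reads service discovery
        restart_client: Callable[[str], None],   # hypothetical: stops and recreates the client
    ) -> None:
        self._fetch = fetch_bootstrap
        self._restart = restart_client
        self._current: Optional[str] = None

    def check_once(self) -> bool:
        """Poll once. Returns True if the client was (re)started."""
        latest = self._fetch()
        if latest == self._current:
            return False
        # The first call starts the client; later changes restart it.
        self._current = latest
        self._restart(latest)
        return True

    def run(self, poll_seconds: float = 5.0) -> None:
        """Poll forever. The interval adds directly to your recovery time."""
        while True:
            self.check_once()
            time.sleep(poll_seconds)
```

In a real deployment the same wrapper would also need to fetch the DR cluster's credentials from the key manager and handle lookup failures; both are left out here to keep the sketch short.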

How quickly your clients are able to bootstrap to the DR cluster will determine a large part of your recovery time after an outage. It is recommended that you practice the failover process so you can be sure that you can hit your RTOs (“Recovery Time Objectives”).

Cluster Linking consumer offset sync gives your applications a low RTO by enabling your consumers to fail over and restart very close to the point where they left off, without missing any new messages that were produced on the DR cluster. However, consumer offset sync is an asynchronous process. When consumers commit their offsets to the source cluster, the commit completes without also committing the offsets to the destination cluster. The offsets are written to the destination cluster every consumer.offset.sync.ms milliseconds (a cluster link configuration that you can set as low as 1 second).

Because consumer offset sync is asynchronous, when an outage or disaster occurs, the most recent consumer offsets may not yet have been committed to the destination cluster. So, when your consumer applications begin consuming on the destination cluster, the first few messages they receive may be messages they have already received. Consumer applications must be able to tolerate these duplicates on a failover.
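One common way to tolerate such duplicates is to make message handling idempotent by skipping events that were already applied. The sketch below assumes each event carries a unique `event_id` set by the producer; that field name, the class, and the in-memory set are illustrative assumptions. In practice the set of seen IDs would live in a durable store (for example, a unique constraint in the application database), since the consumer process restarts during a failover.

```python
from typing import Callable, Set


class IdempotentHandler:
    """Applies each event at most once, keyed on a producer-assigned ID."""

    def __init__(self, apply: Callable[[dict], None]) -> None:
        self._apply = apply
        # Stand-in for a durable store of processed IDs (assumption).
        self._seen_ids: Set[str] = set()

    def handle(self, event: dict) -> bool:
        """Returns True if the event was applied, False if it was a duplicate."""
        event_id = event["event_id"]  # hypothetical field name
        if event_id in self._seen_ids:
            return False
        self._apply(event)
        self._seen_ids.add(event_id)
        return True
```

Keying on a business-level ID rather than on the Kafka offset is a deliberate choice here: after a failover, new messages produced to the DR cluster can occupy offsets that held different data on the source, so an offset-based check could wrongly skip new messages.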

Cluster Linking enables you to have a copy of your topic data on a second cluster, so that you don’t lose all of your business-critical event data should a regional outage or disaster occur. However, Cluster Linking is an asynchronous process. When producers produce messages to the source cluster, they get an acknowledgement (also known as an “ack”) from the source cluster independent of when, and often before, the cluster link replicates those messages to the DR cluster.

Therefore, when an outage occurs and a failover is triggered, there may be a small number of messages that have not been replicated to the DR cluster. Producer and consumer applications must be tolerant of this.

You can monitor your exposure to this via the mirroring lag in the Metrics API, CLI, REST API, and Confluent Cloud Console.

For the Disaster Recovery (DR) cluster to be ready to use when disaster strikes, it will need to have an up-to-date copy of the primary cluster’s topic data, consumer group offsets, and ACLs:

  • The DR cluster needs up-to-date topic data so that consumers can process messages that they haven’t yet consumed. Consumers that are lagging can continue to process topic data while missing as few messages as possible. Any future consumers you create can process historical data without missing any data that was produced before the disaster. This helps you achieve a low Recovery Point Objective (RPO) when a disaster happens.

  • The DR cluster needs up-to-date consumer group offsets so that when the consumers switch over to the DR cluster, they can continue processing messages from the point where they left off. This minimizes the number of duplicate messages the consumers read, which helps you minimize application downtime. This helps you achieve a low Recovery Time Objective (RTO).

  • The DR cluster needs up-to-date ACLs so that the producers and consumers can already be authorized to connect to it when they switch over. Having these ACLs already set and up-to-date also helps you achieve a low RTO.

Warning: When using Schema Linking, to use a mirror topic that has a schema with Confluent Cloud Connect, ksqlDB, broker-side schema validation, or the topic viewer, make sure that Schema Linking puts the schema in the default context of the Confluent Cloud Schema Registry. To learn more, see How Schemas work with Mirror Topics.

Set up a Disaster Recovery cluster to use with Cluster Linking:

  1. If needed, create a new Dedicated Confluent Cloud cluster with public internet in a different region or cloud provider to use as the DR cluster.

  2. Create a cluster link from the primary cluster to the DR cluster. The cluster link should have these configurations:

    1. Enable consumer offset sync. If you plan to fail over only a subset of the consumer groups to the DR cluster, then use a filter to select only those consumer group names. Otherwise, sync all consumer group names.

    2. Enable ACL sync. If you plan to fail over only a subset of the Kafka clients to the DR cluster, then use a filter to select only those clients. Otherwise, sync all ACLs.

  3. Using the cluster link, create a mirror topic on the DR cluster for each of the primary cluster’s topics. If you only want DR for a subset of the topics, then only create mirror topics for that subset.

  4. Enable auto-create mirror topics on the cluster link, which will automatically create DR mirror topics for the topics that exist on the source cluster. As new topics are created on your source cluster over time, auto-create mirror topics will automatically mirror them to the DR cluster.

    • If you only need DR for a subset of topics, you can scope auto-create mirror topics by topic prefixes or specific topic names.

    • For some use cases, it may be better to create mirror topics from the API call (POST /clusters/{cluster_id}/links/{link_name}/mirrors), CLI command (confluent kafka mirror create), or Confluent Cloud Console instead of enabling auto-create mirror topics. For example, this may be preferable if your architecture has an onboarding process that topics and clients must follow in order to opt-in to DR. If you choose this option, whenever a new topic is created on the primary cluster that needs DR, you must explicitly create a mirror topic on the DR cluster.

A Disaster Recovery cluster created this way stays up to date as the primary cluster’s data and metadata change.
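As an illustration of the link settings described in the steps above, the following sketch renders a properties-style configuration for the cluster link. The property names and filter JSON shapes are written from the Cluster Linking configuration reference as best understood and should be verified against the documentation for your version; the function and its parameters are hypothetical. When ACL sync is enabled, an `acl.filters` JSON value selecting which ACLs to sync also belongs in this file; it is left out of this sketch.

```python
import json
from typing import Iterable, Optional


def _include_filters(names: Optional[Iterable[str]], pattern_type: str) -> list:
    """Build INCLUDE filters; with no names given, match everything via '*'."""
    if not names:
        return [{"name": "*", "patternType": "LITERAL", "filterType": "INCLUDE"}]
    return [
        {"name": name, "patternType": pattern_type, "filterType": "INCLUDE"}
        for name in names
    ]


def build_link_config(
    group_names: Optional[Iterable[str]] = None,     # consumer groups to fail over
    topic_prefixes: Optional[Iterable[str]] = None,  # topics that need DR
    offset_sync_ms: int = 30000,
) -> str:
    """Return cluster link settings as key=value lines (one per line)."""
    settings = {
        "consumer.offset.sync.enable": "true",
        "consumer.offset.sync.ms": str(offset_sync_ms),
        "consumer.offset.group.filters": json.dumps(
            {"groupFilters": _include_filters(group_names, "LITERAL")}
        ),
        "acl.sync.enable": "true",  # acl.filters intentionally omitted, see lead-in
        "auto.create.mirror.topics.enable": "true",
        "auto.create.mirror.topics.filters": json.dumps(
            {"topicFilters": _include_filters(topic_prefixes, "PREFIXED")}
        ),
    }
    return "\n".join(f"{key}={value}" for key, value in settings.items())


print(build_link_config(group_names=["payments-app"], topic_prefixes=["payments."]))
```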

Whenever you create a new topic on the primary cluster that you want to have DR, create a mirror topic for it on the DR cluster.
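If you create mirror topics explicitly through the REST API, the request could be assembled as in the sketch below. It only builds the request and does not send it. The endpoint path follows the one quoted in this document; the `source_topic_name` body field and Basic authentication with an API key and secret are assumptions to confirm against the REST API reference, and the endpoint, IDs, and credentials shown are placeholders.

```python
import base64
import json
import urllib.request


def build_create_mirror_request(
    rest_endpoint: str,   # placeholder: REST endpoint of the DR (destination) cluster
    cluster_id: str,      # placeholder: DR cluster ID
    link_name: str,
    source_topic: str,
    api_key: str,
    api_secret: str,
) -> urllib.request.Request:
    """Build (but do not send) a POST that creates a mirror topic on the DR cluster."""
    url = f"{rest_endpoint}/kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors"
    body = json.dumps({"source_topic_name": source_topic}).encode("utf-8")
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


req = build_create_mirror_request(
    "https://dr-cluster.example.com", "lkc-dr123", "dr-link", "payments.orders",
    "EXAMPLE_KEY", "EXAMPLE_SECRET",
)
print(req.get_method(), req.full_url)
# To send it: urllib.request.urlopen(req)
```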

Warning: Each Kafka client needs an API key and secret for each cluster that it connects to. To achieve a low RTO, create API keys on the DR cluster ahead of time, and store them in a vault where your Kafka clients can retrieve them when they connect to the DR cluster.

The Disaster Recovery (DR) cluster needs to stay up-to-date with the primary cluster so you can minimize data loss when a disaster hits the primary cluster. Because Cluster Linking is an asynchronous process, there may be “lag”: messages that exist on the primary cluster but haven’t yet been mirrored to the DR cluster. Lagged data is at risk of being lost when a disaster strikes.

You can monitor the DR cluster’s lag using built-in metrics to see how much data any mirror topic risks losing during a disaster. The Metrics API’s mirror lag metric reports an estimate of the maximum number of lagged messages on a mirror topic’s partitions.

In the Confluent CLI, there are two ways to see a mirror topic’s lag at that point in time:

  • confluent kafka mirror list lists all mirror topics on the destination cluster, and includes a column called Max Per Partition Mirror Lag, which shows the maximum lag among each mirror topic’s partitions. You can filter on a specific cluster link or mirror topic status with the --link and --mirror-status flags.

  • confluent kafka mirror describe <mirror-topic> --link <link-name> shows detailed information about each of that mirror topic’s partitions, including a column called Partition Mirror Lag, which shows each partition’s estimated lag.

The Confluent Cloud REST API returns a list of a mirror topic’s partitions and lag at these endpoints:

  • /kafka/v3/clusters/<destination-cluster-id>/links/<link-name>/mirrors returns all mirror topics for the cluster link.

  • /kafka/v3/clusters/<destination-cluster-id>/links/<link-name>/mirrors/<mirror-topic> returns only the specified mirror topic.

The list of partitions and lags takes this format:

"mirror_lags": [
  {
    "partition": 0,
    "lag": 24
  },
  {
    "partition": 1,
    "lag": 42
  },
  {
    "partition": 2,
    "lag": 15
  },
  ...
],
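A monitoring script could reduce this list to the numbers that matter for your RPO. The sketch below assumes the response for a single mirror topic has already been fetched and parsed, and that it contains the `mirror_lags` list in the format shown above; the function names and the threshold are hypothetical.

```python
import json
from typing import List


def max_mirror_lag(mirror_topic: dict) -> int:
    """Largest per-partition lag for one mirror topic (0 if no partitions listed)."""
    return max((p["lag"] for p in mirror_topic.get("mirror_lags", [])), default=0)


def partitions_over(mirror_topic: dict, threshold: int) -> List[int]:
    """Partitions whose lag exceeds the threshold, for alerting."""
    return [
        p["partition"]
        for p in mirror_topic.get("mirror_lags", [])
        if p["lag"] > threshold
    ]


# Sample payload using the values from the format shown above.
sample = json.loads(
    '{"mirror_lags": ['
    '{"partition": 0, "lag": 24},'
    '{"partition": 1, "lag": 42},'
    '{"partition": 2, "lag": 15}]}'
)

print(max_mirror_lag(sample))       # 42
print(partitions_over(sample, 20))  # [0, 1]
```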

When a consumer group first consumes from a mirror topic on your DR cluster, or when you run the failover command, please note the following considerations.

You should first stop mirror topics, and then move all of your producers and consumers over to the destination cluster. The destination cluster should become your new active cluster, at least for the duration of the disaster and the recovery. If it works for your use case, a suggested strategy is to make the Destination cluster your new, permanent active cluster.

There may be lagged data that did not make it to the destination before the disaster occurred. When you move your consumers, any of this data they had not already read on the source will not be available to them on the destination. If and when your source cluster recovers from the disaster, that lagged data will still be there, so you are free to consume or handle it as fits your use case.

For example, if the Source was up to offset 105, but the Destination was only up to offset 100, then the source data from offsets 101-105 will not be present on the Destination. The Destination will get new, fresh data from the producers that will go into its offsets 101-105. When the disaster resolves, the Source will still have its data from offsets 101-105 available to consume manually.

There may be lagged consumer offsets that did not make it to the destination before the disaster occurred. If this is the case, then when you move your consumers to the destination, they may read duplicate data.

For example, if at the time that you stop your mirroring:

  • Consumer A had read up to offset 100 on the Source

  • Cluster Linking had mirrored the data through offset 100 to the Destination

  • Cluster Linking had last mirrored consumer offsets that showed Consumer A was only at offset 95

Then when you move Consumer A to the Destination, it may read offsets 96-100 again, resulting in duplicate reads.
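The two examples above (lagged data and lagged consumer offsets) come down to offset-range arithmetic. The following sketch reproduces both, using the same convention as the text, where an offset means the last message written or read. The function names are hypothetical.

```python
def unmirrored_offsets(source_last: int, dest_last: int) -> range:
    """Offsets present on the source that never reached the destination."""
    return range(dest_last + 1, source_last + 1)


def reread_offsets(consumer_last_read: int, synced_consumer_offset: int, dest_last: int) -> range:
    """Offsets a consumer reads a second time after moving to the destination.

    The consumer resumes after the last synced offset, so everything between
    that point and what it had really read (and that exists on the
    destination) is delivered again.
    """
    resume_from = synced_consumer_offset + 1
    return range(resume_from, min(consumer_last_read, dest_last) + 1)


# Lagged data: source at 105, destination at 100.
print(list(unmirrored_offsets(105, 100)))   # [101, 102, 103, 104, 105]

# Lagged consumer offsets: read to 100, synced offset 95, data mirrored to 100.
print(list(reread_offsets(100, 95, 100)))   # [96, 97, 98, 99, 100]
```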

The failover command “clamps” consumer offsets.

This means that, when you run mirror failover, if the following two conditions are true:

  • Consumer A was on source offset 105, and that was successfully mirrored to the Destination

  • The data on the Destination was lagging and was only up to offset 100 (so it did not have offsets 101-105)

then, when you call failover, Consumer A’s offset on the Destination will be “clamped” down to offset 100, since that is the highest available offset on the Destination.

Note that this will cause Consumer A to “re-consume” offsets 101-105. If your producers send new, fresh data to the Destination, then Consumer A will not read duplicate data. (However, if you had custom-coded your producers to re-send offsets 101-105 with the same data, then your consumers could read the same data twice. This is a rare case, and is likely not how you have designed your system.)
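The clamping rule described above can be modeled as taking the smaller of the synced consumer offset and the highest offset available on the destination, per partition. This is only an illustration of the behavior as described in this document, not the failover command's actual implementation; the function name and data layout are hypothetical.

```python
from typing import Dict


def clamp_offsets(synced: Dict[int, int], dest_last: Dict[int, int]) -> Dict[int, int]:
    """Per partition, cap the synced consumer offset at the destination's highest offset."""
    return {
        partition: min(offset, dest_last[partition])
        for partition, offset in synced.items()
    }


# Partition 0: consumer offset 105 was synced, but data only reached offset 100.
# Partition 1: consumer offset 95 is already within the mirrored data.
clamped = clamp_offsets({0: 105, 1: 95}, {0: 100, 1: 100})
print(clamped)  # {0: 100, 1: 95}
```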

To learn more, see Syncing consumer group offsets in “Mirror Topics”.

Keep in mind that you can configure consumer.offset.sync.ms to suit your needs (default is 30 seconds). A more frequent sync might give you a better failover point for your consumer offsets, at the cost of bandwidth and throughput during normal operation.

For a walkthrough, see the tutorial on using Cluster Linking to fail over a topic, along with a simple command-line consumer and producer, from an original cluster to a Disaster Recovery (DR) cluster: LINK