SaFi Bank Space : Confluent Kafka backup and restore

  1. Backup and Restore Google Cloud Storage Source Connector for Confluent Platform

  2. Cluster Linking for Confluent Cloud

Backup and Restore Google Cloud Storage Source Connector for Confluent Platform

The Kafka Connect Backup and Restore Google Cloud Storage (GCS) Source connector provides the capability to read data exported to GCS by the Kafka Connect GCS Sink connector and publish it back to a Kafka topic.

Depending on the format and partitioner used to write the data to GCS, this connector can write to the destination topic using the same partitions as the original messages exported to GCS and maintain the same message order. The connector selects folders based on the partitioner configuration and reads each folder's GCS objects in alphabetical order. Each record is read based on the format selected. The configuration is designed to mirror that of the Kafka Connect GCS Sink connector, so it should be possible to create source connector configurations with only minor changes to the original sink configuration.
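
As a minimal configuration sketch (property names follow the GCS Source connector documentation; the bucket name, credentials path, and bootstrap servers are placeholders):

name=gcs-backup-restore-source
connector.class=io.confluent.connect.gcs.GcsSourceConnector
tasks.max=1
# read from the bucket the GCS Sink connector exported to
gcs.bucket.name=<your-gcs-bucket>
gcs.credentials.path=/path/to/gcp-credentials.json
# must match the format the sink used when exporting
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092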


(warning) The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector. If the topics do not exist, Connect relies on auto topic creation for source connectors, and the number of partitions is based upon the Kafka broker defaults. If there are more partitions in the destination cluster, the extra partitions are not used. If there are fewer partitions in the destination cluster, the connector task throws an exception and stops the moment it tries to write to a Kafka partition that does not exist. (warning)
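
For example, you can pre-create a destination topic with a partition count matching the original topic using the standard Kafka CLI (topic name and counts are illustrative):

kafka-topics --bootstrap-server localhost:9092 --create \
  --topic clickstream --partitions 6 --replication-factor 3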


Be aware of the following connector actions:

  • The connector ignores any GCS object with a name that does not start with the configured topics directory. By default, this directory is /topics/.

  • The connector ignores any GCS object that is below the topics directory but has an extension that does not match the configured format. For example, a JSON file is ignored when format.class is set for Avro files.

  • The connector stops and fails if the GCS object’s name does not match the expected format or is in an unexpected location.

  • The connector ignores any GCS object that is below the topics directory but has a partitioner structure that does not match the configured class. For example, DefaultPartitioner structure is ignored when partitioner.class is set for TimeBasedPartitioner.

Avoid the following configuration issues:

  • A file with the correct extension and a valid name format, e.g. <topic>+<partition>+<offset>.<extension>, placed in a folder of a different topic will still be read normally and written to the topic defined by its filename.

  • Do not configure a field partitioner to match the expected folder. Doing so can break the ordering that was reflected when a GCS sink exported the records using a deterministic sink partitioner.

Features

  • At least once delivery

    • In the event of a task failure, the connector guarantees no messages are lost, although the last few messages may be processed again.

  • Multiple tasks

    • The Backup and Restore GCS Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.

  • Pluggable data format with or without schema

    • Out of the box, the connector supports reading data from GCS in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.

  • Matching source partitioning

    • Messages are put back onto the same Kafka partition of the topic they were originally written from.

  • Source partition ordering

    • The connector will read records back in time order in each topic-source partition if the DefaultPartitioner or a TimeBasedPartitioner is used. If a FieldPartitioner is used, it isn't possible to guarantee the order of these messages.

  • Pluggable partitioner

    • The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time based partitioning by extending the TimeBasedPartitioner class.
      All partitioners will notice new topic folders with the inbuilt task reconfiguration thread. The DefaultPartitioner detects new partition folders. The FieldPartitioner notices new folders for the fields specified. However, the TimeBasedPartitioner does not currently detect new files for a new time period.
      Be careful when both the Connect GCS Sink connector and the GCS Source connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and GCS objects. You can avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector. Use the RegexRouter with the source connector to change the names of the topics where the records are written (see the sketch after the warning below). Or, use the Extract topic SMT with the source connector to change the topic name based upon a field in each message.

      (warning) By default, connectors inherit the partitioner used for the Kafka topic. You can create a custom partitioner for a connector which you must place in the connector’s /lib folder.

      You can also put partitioners in a common location of choice. If you choose this option, you must add a symlink to the location from each connector’s /lib folder. For example, you would place a custom partitioner in the path share/confluent-hub-components/partitioners and then add the symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners. (warning)
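
      A minimal sketch of the RegexRouter approach mentioned above, added to the source connector configuration (the restored_ prefix is illustrative):

      # rename destination topics to avoid the sink/source feedback loop
      transforms=AddPrefix
      transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
      transforms.AddPrefix.regex=.*
      transforms.AddPrefix.replacement=restored_$0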

Requirements to run the Backup and Restore GCS Source connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above

  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above

  • Java 1.8

Installation of the Backup and Restore GCS Source Connector

(warning) You must install the connector on every machine where Connect will run. (warning)

Configuration Properties

For the complete list of configuration properties, see Google Cloud Storage Source Connector Configuration Properties.

Quick start and examples

  1. Follow the GCS Sink connector quick start first to export data to GCS.

  2. Install the connector through the Confluent Hub Client:

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-gcs-source:latest

    (!) By default, the plugin is installed into share/confluent-hub-components and the directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the Connect worker for the plugin path change to take effect.

Backup and Restore GCS Source Connector Partitions

The Backup and Restore GCS Source connector supports a variety of partitioners. The connector's partitioner determines how records read from blob items are partitioned into Kafka topics. Messages are put back onto the same Kafka partition of the topic they were originally written from.

The partitioner is specified in the connector configuration with the partitioner.class configuration property. The Backup and Restore GCS Source connector comes with the following partitioners:

  • DefaultPartitioner: To use the DefaultPartitioner → set the partitioner.class property to io.confluent.connect.storage.partitioner.DefaultPartitioner.

  • FieldPartitioner: A partitioner that uses record values of the configured partition.field.name to determine partitions. To use the FieldPartitioner → set the partitioner.class property to io.confluent.connect.storage.partitioner.FieldPartitioner.

  • TimeBasedPartitioner: To use the TimeBasedPartitioner → set the partitioner.class property to io.confluent.connect.storage.partitioner.TimeBasedPartitioner.

  • DailyPartitioner: A subclass of the TimeBasedPartitioner. To use the DailyPartitioner → set the partitioner.class property to io.confluent.connect.storage.partitioner.DailyPartitioner.

  • HourlyPartitioner: A subclass of the TimeBasedPartitioner. To use the HourlyPartitioner → set the partitioner.class property to io.confluent.connect.storage.partitioner.HourlyPartitioner.
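
For example, to read back objects written by an hourly time-based sink layout, the source partitioner settings must match what the sink used. A sketch with illustrative values:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
# one folder per hour, matching the sink's layout
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC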

Backup and Restore GCS Source Connector Data Formats

The Backup and Restore GCS Source connector supports several data formats:

  • Avro: For supporting Avro format. To use Avro format → set the format.class property to io.confluent.connect.gcs.format.avro.AvroFormat.

  • JSON: For supporting JSON format. To use JSON format → set the format.class property to io.confluent.connect.gcs.format.json.JsonFormat.

  • Raw Bytes: For supporting Raw Bytes format. To use the Raw Bytes format → set the format.class property to io.confluent.connect.gcs.format.bytearray.ByteArrayFormat.
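
Whichever format you choose must match the format the GCS Sink connector used when it exported the data. For example, for JSON:

# must match the format.class of the original sink export
format.class=io.confluent.connect.gcs.format.json.JsonFormat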

Pricing

Backup pricing is based on Cloud Storage Bucket pricing.

See the Cloud Storage pricing tables for more information.

Cluster Linking for Confluent Cloud

Cluster Linking on Confluent Cloud is a fully-managed service for moving data from one Confluent cluster to another. Programmatically, it creates perfect copies of your topics and keeps data in sync across clusters.

Cluster Linking is a powerful geo-replication technology for:

  • Multi-cloud and global architectures powered by real-time data in motion

  • Data sharing between different teams, lines of business, or organizations

  • High Availability (HA)/Disaster Recovery (DR) during an outage of a cloud provider’s region

  • Data and workload migration from an Apache Kafka® cluster to Confluent Cloud, or from a Confluent Platform cluster to Confluent Cloud

  • Protecting Tier 1, customer-facing applications and workloads from disruption by creating a read-replica cluster for lower-priority applications and workloads

  • Hybrid cloud architectures that supply real-time data to applications across on-premises datacenters and the cloud

  • Syncing data between production environments and staging or development environments

Cluster Linking is fully-managed in Confluent Cloud, so you don’t need to manage or tune data flows. Its usage-based pricing puts multi-cloud and multi-region costs into your control. Cluster Linking reduces operational burden and cloud egress fees, while improving the performance and reliability of your cloud data pipelines.

How it Works

Cluster Linking allows one Confluent cluster to mirror data directly from another. You can establish a cluster link between a source cluster and a destination cluster in a different region, cloud, line of business, or organization. You choose which topics to replicate from the source cluster to the destination. You can even mirror consumer offsets and ACLs, making it straightforward to move Kafka consumers from one cluster to another.

In one command or API call, you can create a cluster link from one cluster to another. A cluster link acts as a persistent bridge between the two clusters.

confluent kafka link create tokyo-sydney \
  --source-bootstrap-server pkc-867530.ap-northeast-1.aws.confluent.cloud:9092 \
  --source-cluster-id lkc-42492 \
  --api-key AP1K3Y \
  --api-secret ********

To mirror data across the cluster link, you create mirror topics on your destination cluster.

confluent kafka mirror create clickstream.tokyo \
   --link tokyo-sydney

Mirror topics are a special kind of topic: they are read-only copies of their source topic. Any messages produced to the source topic are mirrored to the mirror topic “byte-for-byte,” meaning that the same messages go to the same partition and same offset on the mirror topic. Mirror topics can be consumed from just the same as any other topic.
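
For example, a consumer can read the mirror topic with the standard CLI (topic name reuses the example above):

confluent kafka topic consume clickstream.tokyo --from-beginning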

Cluster links and mirror topics are the building blocks you can use to create scalable, consistent architectures across regions, clouds, teams, and organizations.

Cluster Linking replicates essential metadata:

  • Cluster Linking applies the best practice of syncing topic configurations between the source and mirror topics. (Certain configurations are synced, others are not.)

  • You can enable consumer offset sync, which will sync consumer offsets from the source topic to the mirror topic (only for mirror topics), and you can filter to specific consumer groups if desired.

  • You can enable ACL sync, which will sync all ACLs on the cluster (not just for mirror topics). You can filter based on the topic name or the principal name, as needed.

These features are covered in the various Tutorials.
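
A sketch of a cluster link configuration file that enables both syncs (assuming these property names from the Cluster Linking docs; the filter and interval values are illustrative):

consumer.offset.sync.enable=true
consumer.offset.group.filters={"groupFilters": [{"name": "*", "patternType": "LITERAL", "filterType": "INCLUDE"}]}
acl.sync.enable=true
acl.sync.ms=5000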

Supported Cluster Types

A cluster link sends data from a “source cluster” to a “destination cluster”. The supported cluster types are shown in the table below.

Unsupported cluster types and other limits are described in Limitations.

Source Cluster Options → Destination Cluster Options

  • Any Dedicated Confluent Cloud cluster → Dedicated Confluent Cloud cluster with private networking, or a Dedicated Confluent Cloud cluster under certain networking circumstances, see Cluster Linking with Private Networking.

  • Apache Kafka® 2.4+ or Confluent Platform 5.4+ with public internet IP addresses on all brokers → Any Dedicated Confluent Cloud cluster

  • Confluent Platform 7.1+ (even behind a firewall) → Any Dedicated Confluent Cloud cluster under certain networking circumstances, see Cluster Linking with Private Networking.

  • Any Dedicated Confluent Cloud cluster under certain networking circumstances, see Cluster Linking with Private Networking. → Confluent Platform 7.0+ (even behind a firewall)

If the source cluster is not a Confluent Cloud cluster, then it must be running version 2.4 of the inter.broker.protocol (IBP) or later.

Tip

  • The source cluster and destination cluster can be in different regions, cloud providers, Confluent Cloud environments, or Confluent Cloud organizations.

  • Dedicated legacy clusters are not supported as a destination with Cluster Linking. You should seamlessly migrate to Dedicated clusters to use Cluster Linking.

How to Check the Cluster Type

To check a Confluent Cloud cluster’s type and endpoint type:

  1. Log on to Confluent Cloud.

  2. Select an environment.

  3. Select a cluster.

    The cluster type is shown on the summary card for the cluster.

    Alternatively, click into the cluster, and select Cluster settings from the left menu. The cluster type is shown on the summary card for “Cluster type”.

    From the left menu under Cluster overview for a dedicated cluster, click the Networking menu item to view the endpoint type. Only Dedicated clusters have the Networking tab; Basic and Standard clusters always have Internet networking. Networking is defined when you first create the Dedicated cluster.
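
Alternatively, you can check from the CLI; confluent kafka cluster describe prints the cluster type (the cluster ID is illustrative):

confluent kafka cluster describe lkc-qxxw7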

Pricing

Confluent Cloud clusters that use Cluster Linking are charged based on the number of cluster links and the volume of mirroring throughput to or from the cluster.

For a detailed breakdown of how Cluster Linking is billed, including guidelines for using metrics to track your costs, see Cluster Linking in Confluent Cloud Billing.

Commands and Prerequisites

The destination cluster can use the confluent kafka link command to create a link from the source cluster. The following prerequisite steps are needed to run the tutorials during the Preview.

To try out Cluster Linking on Confluent Cloud:

  1. Install the Confluent Cloud CLI if you do not already have it.

    • As a shortcut alternative to installing from the web, you can get the CLI in two commands in your terminal window. (Replace ~/.local/bin in both commands with a different directory, if you wish.)

      curl -L --http1.1 https://cnfl.io/ccloud-cli | sh -s -- -b ~/.local/bin
      
      export PATH=~/.local/bin:$PATH;
      
    • To learn more about Confluent Cloud in general, see Quick Start for Confluent Cloud.

  2. Log on to Confluent Cloud.

  3. Update your Confluent CLI to be sure you have an up-to-date version of the Cluster Linking commands. See Get the latest version of the Confluent CLI in the quick start for details.

    The confluent kafka link command has the following subcommands.

    • create — Create a new cluster link.
    • delete — Delete a previously created cluster link.
    • describe — Describe an existing cluster link.
    • list — List existing cluster links.
    • update — Update a property for an existing cluster link.

    The confluent kafka mirror command has the following subcommands.

    • describe — Describe a mirror topic.
    • failover — Failover the mirror topics.
    • list — List all mirror topics in the cluster or under the given cluster link.
    • pause — Pause the mirror topics.
    • promote — Promote the mirror topics.
    • resume — Resume the mirror topics.
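
    For example, during a planned cutover you would promote the mirror topic, which stops mirroring and makes it writable on the destination (topic and link names reuse the earlier example):

    confluent kafka mirror promote clickstream.tokyo --link tokyo-sydney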

Pro Tips for the CLI

A list of Confluent CLI commands is available in the Confluent CLI command reference. Following are some generic strategies for saving time on command line workflows.

Save command output to a text file

To keep track of information, save the output of the Confluent Cloud commands to a text file. If you do so, be sure to safeguard API keys and secrets afterwards by deleting the file or moving the credentials to safer storage. To redirect command output to a file, you can use either of these methods and manually add in headings for organization:

  • To redirect output to a file, use Linux syntax such as <command> > notes.txt to run the first command and create the notes file, and then <command> >> notes.txt to append further output.

  • To send output to a file and also view it on-screen (recommended), use <command> | tee notes.txt to run the first command and create the file. Thereafter, use the tee command with the -a flag to append; for example, <command> | tee -a notes.txt.

Use configuration files to store data you will use in commands

Create configuration files to store API keys and secrets, detailed configurations on cluster links, or security credentials for clusters external to Confluent Cloud. Examples of this are provided in (Usually Optional) Use a config File in the topic data sharing tutorial and in Create the cluster link for the disaster recovery tutorial.

Use environment variables to store resource information

You can streamline your command line workflows by saving permissions and cluster data in shell environment variables. Save API keys and secrets, resources such as IDs for environments, clusters, or service accounts, and bootstrap servers, then use the variables in Confluent commands.

For example, create variables for an environment and clusters:

export CLINK_ENV=env-200py
export USA_EAST=lkc-qxxw7
export USA_WEST=lkc-1xx66

Then use these in commands:

$ confluent environment use $CLINK_ENV
Now using "env-200py" as the default (active) environment.

$ confluent kafka cluster use $USA_EAST
Set Kafka cluster "lkc-qxxw7" as the active cluster for environment "env-200py".

Put it all together in commands

Assuming you’ve created environment variables for your clusters, API keys, and secrets, and have cluster link configuration details in a file called link.config, here is an example of creating a cluster link named “east-west-link” using variables and your configuration file.

confluent kafka link create east-west-link \
  --cluster $DESTINATION_ID  \
  --source-cluster-id $ORIG_ID \
  --source-bootstrap-server $ORIG_BOOT  \
  --config-file link.config

Scaling Cluster Linking

Because Cluster Linking fetches data from source topics, the first scaling unit to inspect is the number of partitions in the source topics. Having enough partitions lets Cluster Linking mirror data in parallel. Having too few partitions can cause Cluster Linking to bottleneck on partitions that are more heavily used.

In Confluent Cloud, Cluster Linking scales with the ingress and egress quotas of your cluster. Cluster Linking can use all remaining bandwidth in a cluster's throughput quota: 150 MB/s per CKU egress on a Confluent Cloud source cluster or 50 MB/s per CKU ingress on a Confluent Cloud destination cluster, whichever is hit first. For example, a 2-CKU destination cluster has a 100 MB/s ingress quota; if clients are producing at 40 MB/s, a cluster link can mirror at up to roughly 60 MB/s. Therefore, to scale Cluster Linking throughput, adjust the number of CKUs on the source, the destination, or both.

(warning) On the destination cluster, Cluster Linking writes take lower priority than Kafka clients producing to that cluster; Cluster Linking will be throttled first. (warning)

Confluent proactively monitors all cluster links in Confluent Cloud and will perform tuning when necessary. If you find that your cluster link is not hitting these limits even after a full day of sustained traffic, contact Confluent Support.

In a Confluent Platform or Apache Kafka® cluster, you can also scale Cluster Linking throughput; see the Confluent Platform Cluster Linking documentation for the broker-side tuning options.

Limitations

This section details support and known limitations in terms of cluster types, cluster management, and performance.

  • Cluster Types and Networking

    • Currently supported cluster types are described in Supported Cluster Types.

    • A given cluster can only be the destination for five cluster links. Cluster Linking does not currently support aggregating data from more than five sources.

  • Security

    • OAuth is not supported for the Cluster Linking credential on Confluent Cloud clusters.

    • To learn more, see information about the Security Model for Cluster Linking.

  • ACL Syncing
    A key feature of Cluster Linking is the capability to sync ACLs between clusters. This is useful when moving clients between clusters for a migration or failover. However, in Confluent Cloud, ACLs on a cluster can only be created for service accounts that are in the same Confluent Cloud organization as the cluster itself. Therefore, in practice, ACL sync is only useful between two Confluent Cloud clusters that are in the same Confluent Cloud organization.
    ACL sync is not useful between two Confluent Cloud clusters in different organizations, between Confluent Platform and Confluent Cloud, or between Apache Kafka® and Confluent Cloud.
    As a general rule, do not include in the sync filter ACLs that are managed independently on the destination cluster. This is to prevent cluster link migration from deleting ACLs that were added specifically on the destination and should not be deleted. See also, Configuring Cluster Link Behavior and Syncing ACLs from Source to Destination Cluster.
    (warning) Temporary known limitation: When using ACL sync, you must manually verify that ACLs deleted on the source cluster were also deleted on the destination cluster. When ACLs are created or deleted on the source cluster, the ACL sync feature propagates these changes to the destination cluster. However, in rare circumstances, the deletion of ACLs on the source cluster is not synced to the destination cluster. Therefore, for the time being, if you are using the ACL sync feature, whenever you delete ACLs on the source cluster, you should verify that the ACLs were also deleted on the destination cluster. If they have not been deleted by the cluster link, then you should manually delete the ACLs on the destination cluster. (warning)
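
    A quick spot-check sketch from the CLI (assuming the $DESTINATION_ID variable from the examples above; flags vary by CLI version):

    confluent kafka acl list --cluster $DESTINATION_ID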

  • Management Limitations

    • Cluster links must be created and managed on the destination cluster.

    • Cluster links can only be created with destination clusters that are Dedicated Confluent Cloud clusters.

    • In Confluent Platform 7.0.0, source initiated cluster links (recommended for hybrid cloud) cannot be created with the REST API; they must be created with the kafka-cluster-links CLI. Regular, destination initiated cluster links can be created with either the REST API or kafka-cluster-links CLI.

    • In Confluent Platform 7.0.x, REST API calls to list and get source-initiated cluster links will have their destination cluster IDs returned under the parameter source_cluster_id.

    • Mirror topics count against a cluster’s topic limits, partition limits, and/or storage limits; just like other topics.

    • There is no limit to the number of topics or partitions a cluster link can have, up to the destination cluster’s maximum number of topics and partitions.

    • A cluster can have at most five cluster links targeting it as the destination; that is, not more than five cluster links that are replicating data to it. If you require more than five cluster links on one cluster, contact Confluent Support.

    • By definition, a mirror topic can only have one cluster link and one source topic replicating data to it. Conversely, a single topic can be the source topic for an unlimited number of mirror topics.

    • The frequency of the sync processes for consumer group offset sync, ACL sync, and topic configuration sync is user-configurable. These syncs are limited to at most once per second (that is, 1000 ms, since the setting is in milliseconds). You can set these syncs to occur less frequently, but not more frequently than once every 1000 ms.

  • Kafka transactions and “exactly once” semantics are not supported on mirror topics

    • Cluster Linking is not integrated with Kafka transactions, including “exactly once” semantics. Using Cluster Linking to mirror topics that contain transactions or exactly once semantics is not supported and not recommended.

  • Feature Support and Permissions Requirements for Cloud Console

    • You must be a CloudClusterAdmin, EnvironmentAdmin, or OrgAdmin in order to create a cluster link and/or mirror topics in the Confluent Cloud Console. Using the UI without these roles will yield a permissions error when attempting to create a cluster link. (See also, Security for Cluster Linking on Confluent Cloud.)

    • You must be an OrgAdmin to use the first source-cluster option: “Confluent Cloud (in my org)” because you must be authorized to create a Service Account. Using that option without the OrgAdmin role will yield a permissions error when attempting to create a cluster link. (See also, Security for Cluster Linking on Confluent Cloud.)

    • At this time, you cannot add mirror topics to a cluster link that has a prefix set (cluster.link.prefix). You also cannot create a cluster link with a prefix in the Cloud Console, and you cannot view the prefix of an existing cluster link. To learn more about prefixing, see Prefixing Mirror Topics and Consumer Group Names.

  • Performance Limits

    • Throughput
      For Cluster Linking, throughput indicates bytes-per-second of data replication. The following performance factors and limitations apply.

      • Cluster Linking throughput (bytes-per-second of data replication) counts towards the destination cluster’s produce limits (also known as “ingress” or “write” limits). However, production from Kafka clients is prioritized over Cluster Linking writes; therefore, these are exposed as separate metrics in the Metrics API: Kafka client writes are received_bytes and Cluster Linking writes are cluster_link_destination_response_bytes

      • Cluster Linking consumes from the source cluster similar to Kafka consumers. Throughput (bytes-per-second of data replication) is treated the same as consumer throughput. Cluster Linking will contribute to any quotas and hard or soft limits on your source cluster. The Kafka client reads and Cluster Linking reads are therefore included in the same metric in the Metrics API: sent_bytes

      • Cluster Linking is able to max out the throughput of your CKUs. The physical distance between clusters is a factor in Cluster Linking performance. Confluent monitors cluster links and optimizes their performance. Unlike Replicator and Kafka MirrorMaker 2, Cluster Linking does not have its own scaling unit (such as tasks). You do not need to scale up or scale down your cluster links to increase performance.

    • Connections

      • Cluster Linking connections count towards any connection limits on your clusters.

    • Request rate

      • Cluster Linking contributes requests which count towards your source cluster’s request rate limits.