SaFi Bank Space : Ably Kafka Connector

Overview

The Ably Kafka Connector is a sink connector used to publish data from Kafka into Ably and is available to install from GitHub or Confluent Hub.

The connector will publish data from one or more Kafka topics into a single Ably channel, or into dynamically named channels derived from the Kafka message key or topic name.

The connector is built on top of Kafka Connect and can be run locally with Docker, or installed into an instance of Confluent Platform.

The connector's development status is currently Preview, so it is not yet ready for Production use. https://ably.com/docs/general/kafka-connector
(UPDATE 28 Apr: Ably connector development status is no longer in Preview)

Installation, Initial Configuration and Usage

In this guide, we will run the connector locally with Docker.

  1. Clone the connector repository from GitHub

  2. Configure the connector. An example file is already provided at config/example-connector.properties; rename it to docker-compose-connector.properties to store the connector configuration.

  3. Specify the Ably channel you want to publish to and the API key from your Ably dashboard. This API key needs to have the publish capability for the specified Ably channel. In this tutorial, we will publish to a dynamic channel, using the Kafka message key as the customerId and the Kafka topic name as the Ably message.name. We also set the connector to consume from two Kafka topics, listed as a comma-separated value of the topics property.
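As a sketch, the relevant properties might look like the following. The property names are taken from the connector's example configuration; the topic names and client.id below are illustrative, and the #{key}/#{topic} placeholders are the connector's dynamic interpolation syntax, so verify them against the example file in the repository.

```properties
# Sink connector class and the Kafka topics to consume from (comma-separated)
connector.class=com.ably.kafka.connect.ChannelSinkConnector
topics=orders,payments

# Ably credentials: the key must have publish capability on the target channels
client.key=<ably-api-key>
client.id=kafka-connect-ably-example

# Dynamic mapping: channel name from the Kafka message key,
# Ably message.name from the Kafka topic name
channel=customer:#{key}
message.name=#{topic}
```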

  4. Start the cluster with docker-compose up -d.

  5. Once the containers have started, you can test the connector by subscribing to your Ably channel using SSE in a new terminal window. Replace <channel-name> with the channel set in your configuration file and <ably-api-key> with an API key with the capability to subscribe to the channel.
    curl -s -u "<ably-api-key>" "https://realtime.ably.io/sse?channel=<channel-name>&v=1.1"

    Note: SSE is only used as an example. An Ably SDK can also be used to subscribe to the channel.

  6. Connect to the local Kafka broker at localhost:29092 and produce a message to the topic kafka-connect-ably-example with 001 as the message key; the message will then be published to the channel customer:001.
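As a sketch, a keyed record can be produced with the Confluent console producer (the tool and flag names are standard Kafka CLI options; if the tools are not installed locally, run the same command inside the Kafka container via docker exec):

```shell
# Produce one keyed record to the topic the connector consumes from.
# parse.key/key.separator make everything before ":" the record key.
kafka-console-producer \
  --bootstrap-server localhost:29092 \
  --topic kafka-connect-ably-example \
  --property parse.key=true \
  --property key.separator=:
# Then type a record and press Enter, e.g.:
# 001:hello from Kafka
```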

  7. Go to the terminal window where we subscribed to the Ably channel. Messages will arrive base64-encoded by default, because we have not yet set a secret key for channel encryption. We can also see message.name containing our Kafka topic name.
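To verify the round trip, the base64 payload can be decoded by hand. A minimal sketch in Node.js (the sample string below is just an illustration, not real connector output):

```javascript
// Decode a base64-encoded Ably message payload (the default encoding
// when no channel cipher is configured) back into a UTF-8 string.
const decodePayload = (data) => Buffer.from(data, "base64").toString("utf8");

console.log(decodePayload("aGVsbG8gZnJvbSBLYWZrYQ==")); // hello from Kafka
```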


Channel Encryption and Usage

Currently the connector supports channel encryption, with a single secret key applied globally to every Ably channel the connector publishes to. In this tutorial, we'll set up encryption and then decrypt the messages using a simple Node.js client.

  1. Configure encryption in the connector configuration file by setting client.channel.cipher.key to a base64-encoded cipher key.

  2. Save the configuration file, and then restart the connector Docker container to apply updated configuration.

  3. Set up a simple Ably client, written in Node.js, that subscribes to the channel customer:001 and decrypts the messages.

    const Ably = require("ably");

    const ably = new Ably.Realtime({
      key: "<ably-api-key>", // Replace with your Ably API key
    });

    ably.connection.on("connected", () => {
      console.log("Connected to Ably!");
    });

    const subscribeAbly = async () => {
      // The cipher key must match client.channel.cipher.key in the
      // connector configuration, otherwise decryption will fail.
      const channel = ably.channels.get("customer:001", {
        cipher: {
          key: "G98udOf2gFPJd0ITsIng8DdQJ32yhAjtRdTsMnCqkmw=",
        },
      });
      channel.attach((err) => {
        if (err) {
          console.error("Channel attach failed:", err);
          return;
        }
        channel.subscribe((message) => {
          console.log(
            `Received an Ably message in realtime!\nmessage.name: ${message.name}\nmessage.data: ${message.data}`
          );
        });
      });
    };

    subscribeAbly();
    

  4. Run the client, then send a message through one of our Kafka topics. The encrypted payload can be seen in the subscriber terminal, and the Ably client decrypts it.

    Because we configured the connector to consume from two topics, messages from both topics will be sent to Ably, with the Kafka topic name used as the Ably message name.

Kafka Message Filtering

Since SMT (Single Message Transforms) comes with Kafka Connect, we can filter every message that goes through the Kafka topics before further processing (in this case, before sending the message to Ably). Filter (Apache Kafka) is one of the SMTs that can be used with the Ably Kafka Connector.

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Apache Kafka® Filter, predicates can conditionally filter out specific records.

Docs: https://docs.confluent.io/platform/current/connect/transforms/filter-ak.html

The following predicates can be used to filter messages: TopicNameMatches (matches records by topic name using a regex), HasHeaderKey (matches records that carry a given header key), and RecordIsTombstone (matches records with a null value).

In this tutorial, we will try to filter a message with a header key test from being processed.

  1. Add the new configuration to the connector configuration file.

    This filter uses HasHeaderKey as the predicate, with test as the header key, so every Kafka message that contains the header key test will not be processed further.
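A sketch of the filter configuration (the property names follow the standard Kafka Connect transforms/predicates scheme; the aliases filterTest and hasTestHeader are arbitrary names chosen here):

```properties
# Drop every record that carries a header named "test".
transforms=filterTest
transforms.filterTest.type=org.apache.kafka.connect.transforms.Filter
transforms.filterTest.predicate=hasTestHeader

predicates=hasTestHeader
predicates.hasTestHeader.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasTestHeader.name=test
```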

  2. Send a Kafka message with the header key test and verify that it does not reach the Ably channel; messages without that header should still arrive.

Pros and Cons

Pros

  - Easy to set up
  - Can send Ably messages to dynamic channels, using the key (from the Kafka message) and the topic (from the Kafka topic name)

Cons

  - Development status is still in Preview, not yet ready to be used in a Production environment
    (UPDATE 28 Apr: Ably connector development status is no longer in Preview)
  - Channel encryption only works globally: one secret key for all channels and clients
    (UPDATE 28 Apr: The Ably team is already aware of this use case, and they are working on per-client encryption)