Overview
The Ably Kafka Connector is a sink connector used to publish data from Kafka into Ably and is available to install from GitHub or Confluent Hub.
The connector will publish data from one or more Kafka topics into a single Ably channel.
The connector is built on top of Kafka Connect and can be run locally with Docker, or installed into an instance of Confluent Platform.
This connector development status is currently in Preview, so it’s not yet ready for Production use. https://ably.com/docs/general/kafka-connector
(UPDATE 28 Apr: Ably connector development status is no longer in Preview)
Installation, Initial Configuration and Usage
For this guide, we will try to run the connector locally with Docker.
Clone the connector repository from GitHub
Configure the connector, an example file already provided in
/config/example-connector.properties
. Rename it todocker-compose-connector.properties
to store the connector configuration.Specify the Ably channel you want to publish to, and API key provided from your Ably dashboard. This client key need to have publish capability for the specified Ably channel. In this tutorial, we will try to send to dynamic channel with Kafka message key as the customerId, and Kafka topic name as the Ably
message.name
. We also set the connector to consume from two Kafka topics, separated by comma (line 9).Start the cluster with
docker-compose up -d
.Once the containers have started, you can test the connector by subscribing to your Ably channel using SSE in a new terminal window. Replace
<channel-name>
with the channel set in your configuration file and<ably-api-key>
with an API key with the capability to subscribe to the channel.curl -s -u "<ably-api-key>" "https://realtime.ably.io/sse?channel=<channel-name>&v=1.1"
Note: SSE is only used as an example. An Ably SDK can also be used to subscribe to the channel.
Connect to the local Kafka at
localhost:29092
, and produce a message to topickafka-connect-ably-example
with001
as the message key, so the message will go to channelcustomer:001
.Go to the terminal window where we subscribed to the Ably channel. Messaged will be received with base64 encoding by default, because we haven’t set up the secret key for channel encryption. We can also see the
message.name
containing our Kafka topic name.
Channel Encryption and Usage
Currently the connector supports channel encryption, with one secret key applied globally to each Ably channel(s) that the connector is sending the message to. In this tutorial, we’ll try to set up encryption, and then decrypt the message using simple Node.js client.
Configure the encryption in connector configuration file by filling
client.channel.cipher.key
with a sample base64 cipher key.Save the configuration file, and then restart the connector Docker container to apply updated configuration.
Set up simple Ably client for subscribing to channel
customer:001
and decrypting the message, written in Node.js.const Ably = require("ably"); const ably = new Ably.Realtime({ key: "<ably-api-key>", // Replace with your Ably API Key }); ably.connection.on("connected", () => { console.log("Connected to Ably!"); }); const subscribeAbly = async () => { const channel = ably.channels.get("customer:001", { cipher: { key: "G98udOf2gFPJd0ITsIng8DdQJ32yhAjtRdTsMnCqkmw=", }, }); channel.attach((err) => { console.log("Channel attached with error", err); if (!err) { channel.subscribe((message) => { console.log( `Received an Ably message in realtime!\nmessage.name: ${message.name}\nmessage.data: ${message.data}` ); }); } }); }; subscribeAbly();
Run the client, and try sending the message through our Kafka topic. We can see the encrypted message in terminal, and it will be decrypted with our Ably client.
Because we already configured in the connector to consume from two topics, messages from the two topics will be sent to Ably, with Kafka topic name configured as Ably message name.
Kafka Message Filtering
Since SMT (Single Message Transforms) comes with Kafka Connect, we can do filtering for every messages that goes through Kafka topics before further processing (for this case, sending the message to Ably). Filter (Apache Kafka) is one of the SMTs that we can use within this Ably Kafka Connector.
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Apache Kafka® Filter, predicates can conditionally filter out specific records.
Docs: https://docs.confluent.io/platform/current/connect/transforms/filter-ak.html
Here below is the predicate properties that can be used to filter the messages. We can match by topic name using regex, match by header key, or match by messages with null value.
In this tutorial, we will try to filter a message with a header key test
from being processed.
Add new configuration in connector configuration file
This
Filter
useHasHeaderKey
as Predicate, andtest
as the header key. So every Kafka message containing header keytest
will not be processed further.Try sending the Kafka message.
Pros and Cons
Pros | Cons |
---|---|
Easy to set up |
|
Can send Ably message to dynamic channels, with | Channel encryption only works globally, one secret key for all channels and clients. |
Attachments:
Screen Shot 2022-04-20 at 5.13.57 PM.png (image/png)
Screen Shot 2022-04-20 at 10.42.50 PM.png (image/png)
Screen Shot 2022-04-20 at 10.59.05 PM.png (image/png)
Screen Shot 2022-04-20 at 11.03.12 PM.png (image/png)
Screen Shot 2022-04-20 at 11.06.58 PM.png (image/png)
Screen Shot 2022-04-20 at 11.20.34 PM.png (image/png)
Screen Shot 2022-04-20 at 11.27.50 PM.png (image/png)
Screen Shot 2022-04-20 at 11.24.35 PM.png (image/png)
Screen Shot 2022-04-22 at 4.54.59 PM.png (image/png)
Screen Shot 2022-04-22 at 5.06.57 PM.png (image/png)
Screen Shot 2022-04-22 at 5.14.43 PM.png (image/png)
Screen Shot 2022-04-22 at 5.19.46 PM.png (image/png)
Screen Shot 2022-04-22 at 5.25.45 PM.png (image/png)
Screen Shot 2022-04-22 at 5.25.12 PM.png (image/png)
Screen Shot 2022-04-22 at 5.25.34 PM.png (image/png)
Screen Shot 2022-04-29 at 10.21.28.png (image/png)
Screen Shot 2022-04-29 at 10.21.12.png (image/png)
Ably peak.png (image/png)
Screen Shot 2022-04-29 at 10.22.01.png (image/png)
image-20220429-024348.png (image/png)