Below are the steps to set up a local machine to demo Ably realtime messaging with the Ably Kafka Connector.
Instructions
Adjust docker-compose-worker-distributed.properties: set the connector class, converters, plugin path, etc.

bootstrap.servers=kafka:9092
connector.class=com.ably.kafka.connect.ChannelSinkConnector
group.id=connect-cluster
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
plugin.path=/plugins,/usr/share/confluent-hub-components
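Before starting the stack, the properties file above can be sanity-checked with a small script. This is just a sketch, not part of the official setup; the file name mirrors this document and the key list is an assumption about which settings matter most here.

```python
# Sketch: validate that the distributed-worker properties file contains the
# keys this demo relies on. The path and REQUIRED list mirror this document.

def load_properties(path):
    """Parse a simple key=value Java-style properties file into a dict."""
    props = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip blanks and comments
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

REQUIRED = [
    "bootstrap.servers",
    "key.converter",
    "value.converter",
    "plugin.path",
]

# Usage (run from the directory containing the properties file):
# props = load_properties("docker-compose-worker-distributed.properties")
# missing = [k for k in REQUIRED if k not in props]
# print("missing keys:", missing)
```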
Run the container in distributed mode:

docker-compose -f docker-compose-distributed.yml up -d

If you encounter a "no matching manifest for linux/arm64/v8 in the manifest list entries" error on an M1 chip when executing the command above, you may need to adjust the docker-compose service's platform: setting, or run on a non-M1 chip.
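On Apple Silicon, one common workaround is to pin the affected services to the amd64 platform so Docker runs them under emulation. This is a sketch; the service name below is an assumption and should be matched to the services actually defined in docker-compose-distributed.yml.

```yaml
# Sketch: force an amd64 image under emulation on an M1/arm64 host.
# "connect" is an illustrative service name; adjust to your compose file.
services:
  connect:
    platform: linux/amd64
```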
Only add or adjust the following environment variables in docker-compose-distributed.yml if the replication factor defaults to more than the available brokers:

kafka:
  # omitted others for brevity
  environment:
    KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
Set up the Kafka connector and add the Ably sink. Use a dynamic channel with the format customer:#{key}. The topic name (#{topic}) will be used as the event type on the front-end side.

payload:

{
  "name": "ably-channel-sink",
  "config": {
    "connector.class": "com.ably.kafka.connect.ChannelSinkConnector",
    "tasks.max": "2",
    "group.id": "ably-connect-cluster",
    "topics": "card_updated,account_updated",
    "client.id": "Ably-Kafka-Connector",
    "channel": "customer:#{key}",
    "message.name": "#{topic}",
    "client.key": ""
  }
}
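The payload above is submitted to the Kafka Connect REST API by POSTing it to the /connectors endpoint. A minimal sketch follows; the localhost:8083 address assumes the worker from this local setup, and the empty client.key must be filled in with your own Ably API key.

```python
# Sketch: register the Ably sink via the Kafka Connect REST API.
# CONNECT_URL assumes the distributed worker from this demo on localhost:8083.
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"

payload = {
    "name": "ably-channel-sink",
    "config": {
        "connector.class": "com.ably.kafka.connect.ChannelSinkConnector",
        "tasks.max": "2",
        "group.id": "ably-connect-cluster",
        "topics": "card_updated,account_updated",
        "client.id": "Ably-Kafka-Connector",
        "channel": "customer:#{key}",
        "message.name": "#{topic}",
        "client.key": "",  # fill in your Ably API key
    },
}

def create_connector(url=CONNECT_URL, body=payload):
    """POST the connector definition and return the parsed JSON response."""
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# create_connector()  # requires the stack from the previous steps to be running
```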
Set up the schema registry.

Topic account_updated:

{
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "balance", "type": "int" }
  ],
  "name": "AccountUpdated",
  "namespace": "com.safi.accountUpdated.v1",
  "type": "record"
}

Topic card_updated:

{
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "limit", "type": "int" }
  ],
  "name": "CardUpdated",
  "namespace": "com.safi.cardUpdated.v1",
  "type": "record"
}
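The schemas can also be registered through the Schema Registry REST API (POST /subjects/<subject>/versions, with the Avro schema passed as a JSON-encoded string). The sketch below assumes the registry from this demo on localhost:8081, and the "<topic>-value" subject naming assumes the default TopicNameStrategy.

```python
# Sketch: register a demo Avro schema with Schema Registry over HTTP.
# REGISTRY_URL assumes the schema-registry container mapped to localhost:8081.
import json
import urllib.request

REGISTRY_URL = "http://localhost:8081"

ACCOUNT_UPDATED = {
    "type": "record",
    "name": "AccountUpdated",
    "namespace": "com.safi.accountUpdated.v1",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "balance", "type": "int"},
    ],
}

def register(subject, schema, base_url=REGISTRY_URL):
    """Register a schema under a subject and return the assigned schema id."""
    # Schema Registry expects the schema itself as a JSON *string* field.
    body = json.dumps({"schema": json.dumps(schema)}).encode()
    req = urllib.request.Request(
        f"{base_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["id"]

# register("account_updated-value", ACCOUNT_UPDATED)  # needs the registry running
```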
Add kafka-connect-transform-common. Since the official message transforms do not support inserting a field from a header, we use an additional plugin. The use case here is to add a requestId from the header to a message field.

Plugin: https://github.com/jcustenborder/kafka-connect-transform-common

Please choose option 2 in the prompt below:

The component can be installed in any of the following Confluent Platform installations:
  1. / (installed rpm/deb package)
  2. / (where this tool is installed)
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /usr/share/confluent-hub-components? (yN) y
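For reference, the prompt above comes from installing the plugin with the confluent-hub CLI inside the Connect container. This is a sketch: the component coordinates are taken from the plugin's Confluent Hub listing, and the "connect" service name is an assumption to be matched to your compose file.

```shell
# Sketch: install the transform plugin inside the running Connect container.
# "connect" is an assumed service name; adjust to docker-compose-distributed.yml.
docker-compose -f docker-compose-distributed.yml exec connect \
  confluent-hub install jcustenborder/kafka-connect-transform-common:latest
# Restart the Connect worker afterwards so it picks up the new plugin.
```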
Update the connector config to use the message transform.

endpoint: http://localhost:8083/connectors/ably-channel-sink/config

{
  "connector.class": "com.ably.kafka.connect.ChannelSinkConnector",
  "tasks.max": "2",
  "group.id": "ably-connect-cluster",
  "topics": "card_updated,account_updated",
  "client.id": "Ably-Kafka-Connector",
  "channel": "customer:#{key}",
  "message.name": "#{topic}",
  "client.key": "",
  "transforms": "headerToField",
  "transforms.headerToField.type": "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
  "transforms.headerToField.header.mappings": "header_key:STRING:header_key"
}
Produce a message. Avoid using Confluent Control Center to produce Avro messages.
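One way to produce a properly serialized Avro message instead is kafka-avro-console-producer, which encodes input against the registered schema. This is a sketch assuming the Confluent CLI tools are available (e.g. inside the schema-registry container) and uses the hostnames from this demo's compose network.

```shell
# Sketch: produce an Avro record to the account_updated topic.
# Hostnames assume the docker-compose network from this demo.
kafka-avro-console-producer \
  --bootstrap-server kafka:9092 \
  --topic account_updated \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type":"record","name":"AccountUpdated","namespace":"com.safi.accountUpdated.v1","fields":[{"name":"name","type":"string"},{"name":"balance","type":"int"}]}'
# Then type one JSON record per line, e.g.:
# {"name": "alice", "balance": 100}
```

Note that the connector's customer:#{key} channel pattern relies on record keys, so for an end-to-end test you may also need keyed records (e.g. via the parse.key and key.separator properties).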