SaFi Bank Space : Set up Ably connector in distributed mode

Below are the steps to set up a local machine to demo Ably real-time messaging with the Kafka Ably connector.

Instructions

  1. Clone repo
    https://github.com/ably/kafka-connect-ably
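    The clone is a single git command (the directory name follows the repository name):

```shell
# Clone the Ably Kafka connector repository and enter it
git clone https://github.com/ably/kafka-connect-ably.git
cd kafka-connect-ably
```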

  2. Adjust docker-compose-worker-distributed.properties
    Set the connector class, converters, plugin path, etc.:

    bootstrap.servers=kafka:9092
    connector.class=com.ably.kafka.connect.ChannelSinkConnector
    group.id=connect-cluster
    
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    key.converter.schema.registry.url=http://schema-registry:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://schema-registry:8081
    
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-statuses
    
    config.storage.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    plugin.path=/plugins,/usr/share/confluent-hub-components

  3. Run the containers in distributed mode.
    (warning) If you encounter a "no matching manifest for linux/arm64/v8 in the manifest list entries"
    error on an Apple M1 chip when executing the command below, you may need to set platform: on the docker-compose services or run on a non-M1 machine.

    docker-compose -f docker-compose-distributed.yml up -d
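    Once the containers are up, the worker's Connect REST API (port 8083, the same endpoint used in step 8) confirms the worker started and shows which plugins it loaded; a quick check, assuming the default port mapping:

```shell
# Check the Connect worker is up and list the connector plugins it loaded
curl -s http://localhost:8083/ | python3 -m json.tool
curl -s http://localhost:8083/connector-plugins | python3 -m json.tool
```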

  4. (warning) Add or adjust the following environment variables in docker-compose-distributed.yml only if the default replication factor is higher than the number of available brokers.

      kafka:
        # omitted others for brevity
        environment:
          KAFKA_DEFAULT_REPLICATION_FACTOR: 1
          KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1


  5. Set up the Kafka connector for Ably.
    Use a dynamic channel with the format customer:#{key}.
    The topic name (#{topic}) will be used as the event type on the front-end side.

    payload:

    {
      "name": "ably-channel-sink",
      "config": {
        "connector.class": "com.ably.kafka.connect.ChannelSinkConnector",
        "tasks.max": "2",
        "group.id": "ably-connect-cluster",
        "topics": "card_updated,account_updated",
        "client.id": "Ably-Kafka-Connector",
        "channel": "customer:#{key}",
        "message.name": "#{topic}",
        "client.key": ""
      }
    }
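    The payload can be registered through the standard Connect REST API (client.key is left blank as in the payload above; fill it with your Ably API key):

```shell
# Create the Ably sink connector by POSTing the payload from step 5
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "ably-channel-sink",
    "config": {
      "connector.class": "com.ably.kafka.connect.ChannelSinkConnector",
      "tasks.max": "2",
      "group.id": "ably-connect-cluster",
      "topics": "card_updated,account_updated",
      "client.id": "Ably-Kafka-Connector",
      "channel": "customer:#{key}",
      "message.name": "#{topic}",
      "client.key": ""
    }
  }'
```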


  6. Set up the schema registry.
    Topic account_updated:

    {
      "fields": [
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "balance",
          "type": "int"
        }
      ],
      "name": "AccountUpdated",
      "namespace": "com.safi.accountUpdated.v1",
      "type": "record"
    }

    Topic card_updated:

    {
      "fields": [
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "limit",
          "type": "int"
        }
      ],
      "name": "CardUpdated",
      "namespace": "com.safi.cardUpdated.v1",
      "type": "record"
    }
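    The two schemas above can be registered through the Schema Registry REST API; a sketch, assuming the registry is reachable on localhost:8081 from the host and the default TopicNameStrategy subject naming (topic name plus -value):

```shell
# Register the AccountUpdated value schema for topic account_updated
curl -s -X POST http://localhost:8081/subjects/account_updated-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"AccountUpdated\",\"namespace\":\"com.safi.accountUpdated.v1\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"balance\",\"type\":\"int\"}]}"}'

# Register the CardUpdated value schema for topic card_updated
curl -s -X POST http://localhost:8081/subjects/card_updated-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"CardUpdated\",\"namespace\":\"com.safi.cardUpdated.v1\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"limit\",\"type\":\"int\"}]}"}'
```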


  7. Add kafka-connect-transform-common.
    Since the official message transforms do not support inserting a field from a header, we use an additional plugin.
    The use case is adding a requestId from the header to a message field.
    Plugin: https://github.com/jcustenborder/kafka-connect-transform-common

    Choose option 2 at the prompt below:

    The component can be installed in any of the following Confluent Platform installations: 
      1. / (installed rpm/deb package) 
      2. / (where this tool is installed) 
    Choose one of these to continue the installation (1-2): 2
    Do you want to install this into /usr/share/confluent-hub-components? (yN) y
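    The prompt above is produced by running confluent-hub inside the Connect worker container; a sketch, assuming the worker service in docker-compose-distributed.yml is named connect (adjust the service name to match your compose file):

```shell
# Install the transform plugin inside the running Connect worker container
docker-compose -f docker-compose-distributed.yml exec connect \
  confluent-hub install jcustenborder/kafka-connect-transform-common:latest

# Restart the worker so it picks up the newly installed plugin
docker-compose -f docker-compose-distributed.yml restart connect
```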


  8. Update the connector config to use the message transform.
    Endpoint: PUT http://localhost:8083/connectors/ably-channel-sink/config

    {
      "connector.class": "com.ably.kafka.connect.ChannelSinkConnector",
      "tasks.max": "2",
      "group.id": "ably-connect-cluster",
      "topics": "card_updated,account_updated",
      "client.id": "Ably-Kafka-Connector",
      "channel": "customer:#{key}",
      "message.name": "#{topic}",
      "client.key": "",
      "transforms": "headerToField",
      "transforms.headerToField.type": "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
      "transforms.headerToField.header.mappings": "header_key:STRING:header_key"
    }
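    The updated config is applied with an HTTP PUT; note that /connectors/&lt;name&gt;/config takes the bare config object, without the outer name/config wrapper used when creating the connector:

```shell
# Replace the connector configuration, enabling the header-to-field transform
curl -s -X PUT http://localhost:8083/connectors/ably-channel-sink/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "com.ably.kafka.connect.ChannelSinkConnector",
    "tasks.max": "2",
    "group.id": "ably-connect-cluster",
    "topics": "card_updated,account_updated",
    "client.id": "Ably-Kafka-Connector",
    "channel": "customer:#{key}",
    "message.name": "#{topic}",
    "client.key": "",
    "transforms": "headerToField",
    "transforms.headerToField.type": "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
    "transforms.headerToField.header.mappings": "header_key:STRING:header_key"
  }'
```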

  9. Produce a message.
    Avoid using Confluent Control Center to produce Avro messages.
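    One alternative is kafka-avro-console-producer; a sketch, assuming it is run from the schema-registry container and that the record key (needed for the customer:#{key} channel) is supplied as an Avro string — the key encoding may need adjusting to match the connector's ByteArrayConverter key setup:

```shell
# Produce an Avro record to account_updated from the schema-registry container
docker-compose -f docker-compose-distributed.yml exec schema-registry \
  kafka-avro-console-producer \
    --bootstrap-server kafka:9092 \
    --topic account_updated \
    --property schema.registry.url=http://schema-registry:8081 \
    --property value.schema='{"type":"record","name":"AccountUpdated","namespace":"com.safi.accountUpdated.v1","fields":[{"name":"name","type":"string"},{"name":"balance","type":"int"}]}' \
    --property parse.key=true \
    --property key.separator=: \
    --property key.schema='"string"'
# Then type one key:value record per line, e.g.:
# "customer-1":{"name": "alice", "balance": 100}
```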
