We use Kafka-Connect to convert existing Kafka messages emitted by our services into Pub/Sub messages that GCP Cloud Functions can process.

How we deliver Kafka messages to GCP Pub/Sub

The attached diagram (Kafka-connect to PubSub.png) illustrates how data is transferred between our Kafka cluster and GCP Pub/Sub.

We deployed Kafka-Connect as a single process that subscribes to specific Kafka topics. Delivering a message from Kafka to GCP Pub/Sub takes three steps:

  1. Kafka receives Avro messages from the services, and Kafka-Connect passes them to the Converter (AvroToJsonConverter).

  2. The Converter converts each Avro message to JSON according to its Avro schema and passes the result to the Connector (CloudPubSubSinkConnector).

  3. The Connector publishes the JSON message directly to the pre-configured GCP Pub/Sub topic.
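
The conversion and delivery steps above can be sketched roughly as follows. This is only an illustration in Python, not the actual Kotlin converter or the Java connector. It assumes the Avro message has already been decoded into a plain dict. The names `avro_record_to_json` and `deliver` are hypothetical, the handling of bytes and decimals is an assumption, and `publish` stands in for whatever client actually sends the payload to Pub/Sub.

```python
import base64
import json
from decimal import Decimal
from typing import Any, Callable, Dict


def _json_default(value: Any) -> Any:
    """Map values that json cannot encode natively to JSON-friendly ones."""
    if isinstance(value, bytes):
        # Assumption: raw bytes are represented as base64 text.
        return base64.b64encode(value).decode("ascii")
    if isinstance(value, Decimal):
        # Assumption: decimals are kept as strings to avoid losing precision.
        return str(value)
    raise TypeError(f"Cannot serialise {type(value).__name__}")


def avro_record_to_json(record: Dict[str, Any]) -> bytes:
    """Step 2: turn an already-decoded Avro record into JSON bytes."""
    return json.dumps(record, default=_json_default, sort_keys=True).encode("utf-8")


def deliver(record: Dict[str, Any], publish: Callable[[bytes], None]) -> None:
    """Steps 2 and 3: convert the record, then hand the payload to the publisher."""
    publish(avro_record_to_json(record))


if __name__ == "__main__":
    sent = []  # in-memory stand-in for the Pub/Sub topic
    deliver({"customerId": "c-1", "amount": Decimal("10.50")}, sent.append)
    print(sent[0].decode("utf-8"))
```

Passing the publisher in as a callable keeps the sketch testable without any GCP credentials.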

Implementation Details

Kafka-Connect

For the PoC period, we have created a VM instance in the GCP Data Test project to host the Kafka-Connect process, which we run with Docker.

Here is the docker-compose file we currently use to deploy Kafka-Connect:

version: '2'
services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "34.124.153.44:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: ph.safibank.common.utils.kafka.connect.converter.AvroToJsonConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://34.124.153.44:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      GOOGLE_APPLICATION_CREDENTIALS: /data/simbo-76741-dbc8e3a4940f.json
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    volumes:
      - $PWD/data:/data

We currently deploy the Pub/Sub sink connector as a jar, which you can download here: https://github.com/GoogleCloudPlatform/pubsub/releases
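
Once the jar is on the plugin path, one way to confirm that the worker picked it up is the Kafka-Connect REST endpoint `GET /connector-plugins`, which lists the installed connector classes. The sketch below is a minimal check under stated assumptions: the helper names `has_plugin` and `fetch_plugins` are hypothetical, and the base URL passed to `fetch_plugins` is whatever address the worker is reachable on (port 8083 in the compose file above).

```python
import json
import urllib.request
from typing import Iterable, Mapping

SINK_CLASS = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"


def has_plugin(plugins: Iterable[Mapping[str, str]], class_name: str = SINK_CLASS) -> bool:
    """Return True if a GET /connector-plugins payload lists class_name."""
    return any(plugin.get("class") == class_name for plugin in plugins)


def fetch_plugins(base_url: str) -> list:
    """Call GET /connector-plugins on a Kafka-Connect worker and parse the JSON."""
    url = f"{base_url.rstrip('/')}/connector-plugins"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)
```

For example, `has_plugin(fetch_plugins("http://localhost:8083"))` should be true when run on the VM, assuming the port mapping from the compose file is in effect.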

We also created a custom Converter (AvroToJsonConverter) to convert messages from Avro to JSON. It is located in the SaFi mono repo at: common/utils/src/main/kotlin/ph/safibank/common/utils/kafka/connect/converter/AvroToJsonConverter.kt

CloudPubSubSinkConnector

After deploying Kafka-Connect, we use the REST API it provides to manage the connectors.

Here is a sample config for creating a connector in Kafka-Connect:

{
    "name": "test-connector-json",
    "config": {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "tasks.max": "1",
        "topics": "loans.overdraft.updates.temp",
        "cps.topic": "overdraft-test-json",
        "cps.project": "datatest-348502"
    }
}

Note that cps.topic and cps.project are the GCP Pub/Sub topic and project, respectively, while topics is the Kafka topic the connector reads from.
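
A config like the sample above is registered by sending it to the Kafka-Connect REST endpoint `POST /connectors`. The following is a minimal sketch using only the Python standard library. The function names are hypothetical, and the base URL is an assumption that depends on where the worker is reachable.

```python
import json
import urllib.request
from typing import Dict


def build_create_request(base_url: str, name: str, config: Dict[str, str]) -> urllib.request.Request:
    """Build the POST /connectors request that registers a new connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(base_url: str, name: str, config: Dict[str, str]) -> dict:
    """Send the request and return the worker's JSON response."""
    request = build_create_request(base_url, name, config)
    with urllib.request.urlopen(request, timeout=10) as resp:
        return json.load(resp)


# The sample connector config from this page, expressed as a Python dict.
SAMPLE_CONFIG = {
    "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max": "1",
    "topics": "loans.overdraft.updates.temp",
    "cps.topic": "overdraft-test-json",
    "cps.project": "datatest-348502",
}
```

Calling `create_connector("http://localhost:8083", "test-connector-json", SAMPLE_CONFIG)` would then submit the sample, assuming the worker listens on that address.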

To simplify connector creation, we wrote a Python script that calls the REST API. For details, see: services/slacker-manager/tools/kafka-connect-tool.py
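
After a connector is created, its health can be read from `GET /connectors/{name}/status`, which reports a state for the connector and for each task. The sketch below shows how such a response might be interpreted. It is not taken from kafka-connect-tool.py, and the helper names `is_healthy` and `failed_task_ids` are hypothetical.

```python
from typing import Any, Dict, List


def is_healthy(status: Dict[str, Any]) -> bool:
    """True when the connector and all of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and all(task.get("state") == "RUNNING" for task in tasks)


def failed_task_ids(status: Dict[str, Any]) -> List[int]:
    """Ids of the tasks whose state is FAILED."""
    return [task["id"] for task in status.get("tasks", []) if task.get("state") == "FAILED"]
```

Both functions take the already-parsed JSON body, so they can be combined with any HTTP client.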

| Kafka Schema | Kafka Topic | Pub/Sub Topic |
|---|---|---|
| DeviceFingerprintSnapshotAvro-v1 | customer.device-fingerprint.snapshots.temp | customer-device-fingerprint-snapshot-json |
| FacialVerificationSnapshotAvro-v1 | customer.snapshots.facial_verification | customer-FacialVerificationSnapshotAvro-json |
| IdCardSnapshotAvro-v1 | customer.snapshots.id_card | customer-IdCardSnapshotAvro-json |
| CustomerSnapshotAvro-v1 | customer.snapshots.temp | customer-snapshot-json |
| InterbankTransactionAvro-v1 | transaction.interbank.temp | transaction-interbank-temp-json |
| IntrabankTransactionAvro-v1 | transaction.intrabank.temp | transaction-intrabank-temp-json |
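
The topic mapping above could also drive connector creation, producing one REST payload per Kafka topic. The sketch below makes several assumptions: the connector naming scheme (`<pubsub-topic>-connector`) is hypothetical, `tasks.max` is copied from the sample config, and the caller supplies the GCP project because this page does not state which project each Pub/Sub topic lives in.

```python
from typing import Dict, List

# Kafka topic -> Pub/Sub topic, copied from the mapping on this page.
TOPIC_MAPPING: Dict[str, str] = {
    "customer.device-fingerprint.snapshots.temp": "customer-device-fingerprint-snapshot-json",
    "customer.snapshots.facial_verification": "customer-FacialVerificationSnapshotAvro-json",
    "customer.snapshots.id_card": "customer-IdCardSnapshotAvro-json",
    "customer.snapshots.temp": "customer-snapshot-json",
    "transaction.interbank.temp": "transaction-interbank-temp-json",
    "transaction.intrabank.temp": "transaction-intrabank-temp-json",
}


def connector_payloads(mapping: Dict[str, str], project: str) -> List[dict]:
    """Build one connector creation payload per Kafka topic."""
    payloads = []
    for kafka_topic, pubsub_topic in mapping.items():
        payloads.append({
            # Hypothetical naming scheme, chosen only for this illustration.
            "name": f"{pubsub_topic}-connector",
            "config": {
                "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
                "tasks.max": "1",
                "topics": kafka_topic,
                "cps.topic": pubsub_topic,
                "cps.project": project,
            },
        })
    return payloads
```

Keeping the mapping in one dict means a new topic only needs one new entry before the payloads are regenerated.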

Attachments:

Kafka-connect to PubSub (application/vnd.jgraph.mxfile)
Kafka-connect to PubSub.png (image/png)