We are using Kafka-Connect to convert Kafka messages emitted by our existing services into Pub/Sub messages that GCP Cloud Functions can process.
How we deliver Kafka messages to GCP Pub/Sub
Here is a general diagram illustrating how data is transferred between our Kafka and GCP Pub/Sub:
We deployed Kafka-Connect as a single process that subscribes to specific Kafka topics. A message is delivered from Kafka to GCP Pub/Sub in three steps:
1. Kafka-Connect reads Avro messages from the subscribed Kafka topics and passes them to the Converter (AvroToJsonConverter).
2. The Converter converts each Avro message to JSON according to its Avro schema and passes the result to the Connector (CloudPubSubSinkConnector).
3. The Connector publishes the JSON message directly to the pre-configured GCP Pub/Sub topic.
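The three steps can be sketched as a small in-memory pipeline. This is only an illustration of the data flow, not how Kafka-Connect is implemented: it assumes each record has already been deserialized from Avro into a Python dict, and FakePubSubTopic, to_json_bytes, deliver and the sample record fields are all hypothetical.

```python
import json
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class FakePubSubTopic:
    """Hypothetical in-memory stand-in for the pre-configured Pub/Sub topic."""
    name: str
    messages: List[bytes] = field(default_factory=list)

    def publish(self, data: bytes) -> None:
        self.messages.append(data)


def to_json_bytes(record: dict) -> bytes:
    """Step 2 (simplified): serialize an already-decoded Avro record as JSON.

    The real AvroToJsonConverter works from the Avro schema; this sketch
    assumes the record is already a plain dict.
    """
    return json.dumps(record, sort_keys=True).encode("utf-8")


def deliver(records: Iterable[dict], topic: FakePubSubTopic) -> None:
    for record in records:               # Step 1: records consumed from a Kafka topic
        payload = to_json_bytes(record)  # Step 2: Avro -> JSON conversion
        topic.publish(payload)           # Step 3: publish to the Pub/Sub topic


topic = FakePubSubTopic("overdraft-test-json")
deliver([{"accountId": "A1", "limit": 500}], topic)
print(topic.messages)
```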
Implementation Details
Kafka-Connect
For the current PoC period, we have created a VM instance in the GCP Data Test project to host the Kafka-Connect process, which runs in Docker.
Here is the docker-compose file we currently use to deploy Kafka-Connect:
version: '2'
services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "34.124.153.44:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: ph.safibank.common.utils.kafka.connect.converter.AvroToJsonConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://34.124.153.44:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      GOOGLE_APPLICATION_CREDENTIALS: /data/simbo-76741-dbc8e3a4940f.json
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    volumes:
      - $PWD/data:/data
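The Confluent Kafka-Connect image turns each CONNECT_-prefixed environment variable into a worker property. The sketch below applies the basic rule (drop the prefix, lowercase, replace "_" with ".") so that, for example, CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL reads as value.converter.schema.registry.url. It is a simplification for reading the file above: the image also has escaping conventions for property names that themselves contain underscores or dashes, which are not handled here, and treating CONNECT_LOG4J_* variables as logging settings rather than worker properties is our assumption. env_to_property and worker_properties are hypothetical helpers.

```python
from typing import Dict, Optional

PREFIX = "CONNECT_"


def env_to_property(env_name: str) -> Optional[str]:
    """Map a CONNECT_* variable to a worker property name (simplified rule).

    Assumption: only the basic rule is implemented; the image's escaping for
    property names containing '_' or '-' is not handled.
    """
    if not env_name.startswith(PREFIX):
        # e.g. GOOGLE_APPLICATION_CREDENTIALS is read by the Google Cloud
        # client library (Application Default Credentials), not the worker.
        return None
    if env_name.startswith(PREFIX + "LOG4J_"):
        return None  # assumed to feed the log4j configuration instead
    return env_name[len(PREFIX):].lower().replace("_", ".")


def worker_properties(env: Dict[str, str]) -> Dict[str, str]:
    """Collect the worker properties implied by a set of environment variables."""
    props = {}
    for name, value in env.items():
        prop = env_to_property(name)
        if prop is not None:
            props[prop] = value
    return props


env = {
    "CONNECT_BOOTSTRAP_SERVERS": "34.124.153.44:9092",
    "CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL": "http://34.124.153.44:8081",
    "CONNECT_PLUGIN_PATH": "/usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars",
    "GOOGLE_APPLICATION_CREDENTIALS": "/data/simbo-76741-dbc8e3a4940f.json",
}
for key, value in sorted(worker_properties(env).items()):
    print(f"{key}={value}")
```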
We currently deploy the Pub/Sub sink connector as a jar, which can be downloaded from https://github.com/GoogleCloudPlatform/pubsub/releases
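Kafka-Connect only loads connector jars found in a directory on its plugin path (CONNECT_PLUGIN_PATH above). One way to confirm the worker picked up the Pub/Sub sink is the standard REST endpoint GET /connector-plugins, which lists the installed connector classes. A minimal standard-library sketch; fetch_plugins and has_plugin are hypothetical names, and the default http://localhost:8083 assumes the call runs on the VM hosting the container (port 8083 is published in the docker-compose file).

```python
import json
import urllib.request
from typing import Iterable, List

PUBSUB_SINK = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"


def fetch_plugins(base_url: str = "http://localhost:8083") -> List[dict]:
    """Call GET /connector-plugins on the Kafka-Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as resp:
        return json.load(resp)


def has_plugin(plugins: Iterable[dict], class_name: str = PUBSUB_SINK) -> bool:
    """True if a plugin entry with the given class name is installed."""
    return any(p.get("class") == class_name for p in plugins)
```

Run from the VM, has_plugin(fetch_plugins()) should return True once the jar is in place and the worker has been restarted.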
We also created a custom Converter (AvroToJsonConverter) that converts messages from Avro to JSON. It is located in the SaFi mono repo at: common/utils/src/main/kotlin/ph/safibank/common/utils/kafka/connect/converter/AvroToJsonConverter.kt
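Assuming the services produce Avro with Confluent's serializer (the Schema Registry URL in the docker-compose file suggests so), each Kafka message starts with a 5-byte header: a magic byte of 0, then the 4-byte big-endian schema ID from Schema Registry, followed by the Avro binary payload. An Avro-to-JSON converter therefore needs to strip this header and look up the writer schema by ID, either directly or by delegating to Confluent's deserializer, before it can decode the payload. The sketch covers only the header step and is not the code in AvroToJsonConverter.kt; split_confluent_header is a hypothetical name, and decoding the payload itself would additionally need an Avro library plus the schema fetched from the registry (GET /schemas/ids/{id}).

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0


def split_confluent_header(raw: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed Avro message into (schema_id, avro_payload)."""
    if len(raw) < 5:
        raise ValueError("message too short for Confluent wire format")
    # '>' = big-endian, no padding; 'b' = 1-byte magic, 'I' = 4-byte schema ID
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[5:]


# A made-up message: magic byte 0, schema ID 42, followed by some Avro bytes.
schema_id, payload = split_confluent_header(b"\x00\x00\x00\x00\x2a" + b"\x02\x06abc")
print(schema_id, payload)
```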
CloudPubSubSinkConnector
Once Kafka-Connect is deployed, we manage the connectors through the REST API that Kafka-Connect provides (exposed on port 8083).
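For reference, the day-to-day connector operations map onto the standard Kafka-Connect REST endpoints below. The endpoint helper is a hypothetical convenience, not part of our tooling; it returns the HTTP method and path for an operation, to be appended to the worker's REST address.

```python
from typing import Tuple
from urllib.parse import quote

# Standard Kafka-Connect REST endpoints for managing connectors.
_OPERATIONS = {
    "create": ("POST", "/connectors"),
    "list": ("GET", "/connectors"),
    "config": ("GET", "/connectors/{name}/config"),
    "update": ("PUT", "/connectors/{name}/config"),
    "status": ("GET", "/connectors/{name}/status"),
    "pause": ("PUT", "/connectors/{name}/pause"),
    "resume": ("PUT", "/connectors/{name}/resume"),
    "restart": ("POST", "/connectors/{name}/restart"),
    "delete": ("DELETE", "/connectors/{name}"),
}


def endpoint(operation: str, name: str = "") -> Tuple[str, str]:
    """Return (HTTP method, path) for a connector operation."""
    method, template = _OPERATIONS[operation]
    # URL-encode the connector name so it is safe inside the path.
    return method, template.format(name=quote(name, safe=""))
```

For example, endpoint("status", "test-connector-json") gives the request for checking whether the sample connector and its tasks are RUNNING.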
Here is a sample config for creating a connector in Kafka-Connect:
{
  "name": "test-connector-json",
  "config": {
    "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max": "1",
    "topics": "loans.overdraft.updates.temp",
    "cps.topic": "overdraft-test-json",
    "cps.project": "datatest-348502"
  }
}
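The sample has a fixed shape, so a config for any Kafka-topic-to-Pub/Sub-topic pair can be generated from four values. The function below is a hypothetical helper that reproduces the sample exactly; config values are kept as strings, matching the sample.

```python
import json
from typing import Dict

SINK_CLASS = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"


def pubsub_sink_connector(name: str, kafka_topic: str, cps_topic: str,
                          cps_project: str, tasks_max: int = 1) -> Dict:
    """Build a CloudPubSubSinkConnector creation request body."""
    return {
        "name": name,
        "config": {
            "connector.class": SINK_CLASS,
            "tasks.max": str(tasks_max),  # kept as a string, as in the sample
            "topics": kafka_topic,        # Kafka topic(s) to read from
            "cps.topic": cps_topic,       # target Pub/Sub topic
            "cps.project": cps_project,   # GCP project that owns the topic
        },
    }


body = pubsub_sink_connector("test-connector-json", "loans.overdraft.updates.temp",
                             "overdraft-test-json", "datatest-348502")
print(json.dumps(body, indent=2))
```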
Note that cps.topic and cps.project are the GCP Pub/Sub topic and project that messages are published to.
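Together the two settings identify the destination in the same form as Pub/Sub's own resource names, projects/{project}/topics/{topic}. A small hypothetical helper that derives that name from a connector config, which can be useful when cross-checking the target topic in the Cloud Console:

```python
from typing import Dict


def pubsub_topic_path(config: Dict[str, str]) -> str:
    """Full Pub/Sub resource name targeted by a CloudPubSubSinkConnector config."""
    return f"projects/{config['cps.project']}/topics/{config['cps.topic']}"


sample = {"cps.topic": "overdraft-test-json", "cps.project": "datatest-348502"}
print(pubsub_topic_path(sample))  # projects/datatest-348502/topics/overdraft-test-json
```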
To simplify connector creation, we wrote a Python script that calls the REST API. For details, see services/slacker-manager/tools/kafka-connect-tool.py.
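The script itself is not reproduced here. As a rough, standard-library-only sketch of the kind of call such a script can make: PUT /connectors/{name}/config with the inner config object creates the connector if it does not exist and updates it otherwise, whereas POST /connectors fails when the name is already taken. build_upsert_request and upsert_connector are hypothetical names and may not match what kafka-connect-tool.py actually does.

```python
import json
import urllib.request
from typing import Dict
from urllib.parse import quote


def build_upsert_request(base_url: str, name: str,
                         config: Dict[str, str]) -> urllib.request.Request:
    """PUT /connectors/{name}/config: create the connector, or update it if it exists."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{quote(name, safe='')}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def upsert_connector(base_url: str, name: str, config: Dict[str, str]) -> int:
    """Send the request and return the HTTP status (201 created, 200 updated).

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    """
    with urllib.request.urlopen(build_upsert_request(base_url, name, config)) as resp:
        return resp.status
```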
| Kafka Schema | Kafka Topic | Pub/Sub Topic |
| DeviceFingerprintSnapshotAvro-v1 | | |
| FacialVerificationSnapshotAvro-v1 | | |
| IdCardSnapshotAvro-v1 | | |
| CustomerSnapshotAvro-v1 | | |
| InterbankTransactionAvro-v1 | | |
| IntrabankTransactionAvro-v1 | | |
Attachments:
~drawio~557058:229b5867-a6cd-46a1-9572-1eb4b6e6294b~Kafka-connect to PubSub.tmp (application/vnd.jgraph.mxfile)
Kafka-connect to PubSub (application/vnd.jgraph.mxfile)
Kafka-connect to PubSub.png (image/png)