SaFi Bank Space : Deploy Kafka connectors for Confluent Cloud using Kafka Connect deployed in GKE

Kafka connectors deployed in the dev cluster are listed here: https://kafka-connect.apps.dev.safibank.online/connectors

The connector plugins installed in the dev cluster are listed here: https://kafka-connect.apps.dev.safibank.online/connector-plugins

Deploy Google BigQuery sink connector:

Step1:

Get the installation command from Confluent Hub (https://www.confluent.io/hub/wepay/kafka-connect-bigquery), i.e. confluent-hub install wepay/kafka-connect-bigquery:2.4.0. With the operator-managed Connect cluster the plugin is not installed by running this command by hand; it is declared in the kafka-connect configuration in Step2.

Step2:

Update the kafka-connect configuration to install the plugin for the Google BigQuery connector, as sketched below (see also: Install plugins for BigQuery connector).
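
A minimal sketch of what the plugin declaration can look like, assuming the Connect cluster is managed by Confluent for Kubernetes (the platform.confluent.io CRDs used below suggest this) and that its Connect resource is named connect in the safi-confluent-operator namespace; the replica count is illustrative:

apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect
  namespace: safi-confluent-operator
spec:
  replicas: 2
  build:
    type: onDemand
    onDemand:
      plugins:
        locationType: confluentHub
        confluentHub:
          # BigQuery sink connector plugin from Confluent Hub (Step1)
          - name: kafka-connect-bigquery
            owner: wepay
            version: "2.4.0"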

Step3:

Get the Google BigQuery sink connector configuration from the official Confluent page. Add the necessary secrets to Vault for the target GCP project, then create a Kubernetes Secret for the GCP credentials and reference it from the kafka-connect configuration:

apiVersion: v1
kind: Secret
metadata:
  name: bq-sinkconnector-creds
  namespace: safi-confluent-operator
  labels:
    app.kubernetes.io/name: bq-sinkconnector-dev
type: Opaque
data:
  # The <secret:...~key|base64> placeholder is resolved from Vault at deploy time
  credentials.json: <secret:secret/data/dev/confluent/gcp/bqsa~bigquery_sa_key|base64>

Then mount the Secret into the Connect cluster, e.g. via mountedSecrets in the Connect resource, as sketched below.
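
A sketch of the mount, assuming CFK's mountedSecrets mechanism, which mounts each referenced Secret under /mnt/secrets/<secret-name> (matching the keyfile path used in Step4):

apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect
  namespace: safi-confluent-operator
spec:
  mountedSecrets:
    # Available to the workers at /mnt/secrets/bq-sinkconnector-creds/credentials.json
    - secretRef: bq-sinkconnector-creds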

Step4:

Get the Google BigQuery sink connector configuration from the official Confluent page, update the configs as below, and deploy with ArgoCD:

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: bigquery-sink-connector
  namespace: safi-confluent-operator
spec:
  class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
  taskMax: 4
  connectClusterRef:
    name: connect
  configs:
    topics: "bq-quickstart1"
    sanitizeTopics: "true"
    autoCreateTables: "true"
    autoUpdateSchemas: "true"
    schemaRetriever: "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever"
    schemaRegistryLocation: "<secret:secret/data/dev/confluent/schema-registry~endpoint>"
    bufferSize: "100000"
    maxWriteSize: "10000"
    tableWriteWait: "1000"
    project: "<secret:secret/data/dev/confluent/gcp/bqsa~project_id>"
    defaultDataset: "device_fingerprint_data"  # plain dataset name; the ".*=" regex form belongs to the removed 1.x "datasets" option
    keyfile: "/mnt/secrets/bq-sinkconnector-creds/credentials.json"
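
Once ArgoCD has synced the manifest, the connector should show up in the connectors list linked at the top of this page; Kafka Connect's REST API also reports its state under /connectors/bigquery-sink-connector/status on the same endpoint.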

Deploy Google Pub/Sub source connector:

Step1:

Get the installation command from Confluent Hub (https://www.confluent.io/hub/confluentinc/kafka-connect-gcp-pubsub), i.e. confluent-hub install confluentinc/kafka-connect-gcp-pubsub:1.2.0. As with BigQuery, the plugin is declared in the kafka-connect configuration rather than installed by hand.

Step2:

Update the kafka-connect configuration to install the plugin for the Google Pub/Sub source connector, as sketched below (see also: Install plugins for Pubsub connector).
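
Assuming the same Connect resource as in the BigQuery section, the Pub/Sub plugin is simply appended to the existing confluentHub list; a sketch of the build section of the Connect spec:

  build:
    type: onDemand
    onDemand:
      plugins:
        locationType: confluentHub
        confluentHub:
          - name: kafka-connect-bigquery
            owner: wepay
            version: "2.4.0"
          # Pub/Sub source connector plugin from Confluent Hub (Step1)
          - name: kafka-connect-gcp-pubsub
            owner: confluentinc
            version: "1.2.0"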

Step3:

Get the Google Pub/Sub source connector configuration from the official Confluent page. Add the necessary secrets to Vault for the target GCP project, then create a Kubernetes Secret for the GCP credentials and reference it from the kafka-connect configuration:

apiVersion: v1
kind: Secret
metadata:
  name: pubsub-sourceconnector-creds
  namespace: safi-confluent-operator
  labels:
    app.kubernetes.io/name: pubsub-sourceconnector-dev
type: Opaque
data:
  credentials.json: <secret:secret/data/dev/confluent/gcp/pubsubsa~pubsub_sa_key|base64>

Then mount the Secret into the Connect cluster alongside the BigQuery one, as sketched below.
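
Both credential Secrets can be mounted side by side in the same Connect spec (again assuming CFK's mountedSecrets; the paths match the credentials paths used by the two connectors):

  mountedSecrets:
    # /mnt/secrets/bq-sinkconnector-creds/credentials.json
    - secretRef: bq-sinkconnector-creds
    # /mnt/secrets/pubsub-sourceconnector-creds/credentials.json
    - secretRef: pubsub-sourceconnector-creds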

Step4:

Get the Google Pub/Sub source connector configuration from the official Confluent page, update the configs as below, and deploy with ArgoCD:

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: pubsub-source-connector
  namespace: safi-confluent-operator
spec:
  class: "io.confluent.connect.gcp.pubsub.PubSubSourceConnector"
  taskMax: 4
  connectClusterRef:
    name: connect
  configs:
    kafka.topic: "test-pubsub-topic"
    gcp.pubsub.subscription.id: "subscription-1"
    gcp.pubsub.topic.id: "topic-1"
    gcp.pubsub.project.id: "<secret:secret/data/dev/confluent/gcp/pubsubsa~project_id>"
    gcp.pubsub.credentials.path: "/mnt/secrets/pubsub-sourceconnector-creds/credentials.json"
    confluent.topic.bootstrap.servers: "<secret:secret/data/dev/confluent/kafka~bootstrap-endpoint>"
    confluent.topic.replication.factor: "3"  # Confluent Cloud requires replication factor 3
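
As with the BigQuery connector, the deployment can be verified via the connectors list linked above or via /connectors/pubsub-source-connector/status on the kafka-connect endpoint.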

We found an issue with HA for kafka-connect: the Connect cluster goes down whenever a new connector is deployed through the Kafka Connect instance running in GKE.
