Kafka Connect/Connectors

Overview

Kafka Connect is the pluggable, declarative data integration framework for Kafka; it either reads data into or pushes data out of the Kafka cluster. Kafka Connect connectors come in two forms:

  1. Source - reads data from an external system such as MySQL, S3, etc., then pushes the data into the cluster

  2. Sink - reads data from the cluster, then sends the data to another data store like Elasticsearch, Ably, etc.

Connect vs Connector vs Sink

Kafka Connect can be fully managed by Confluent or self-managed (if the connector is not supported in Confluent Cloud), deployed either on a VM or as a container. It is a framework for connecting Kafka with external systems, such as databases, key-value stores, search indexes, and file systems. It allows you to import data from external systems into Kafka, or export data from Kafka to external systems.

A Connector, on the other hand, is a specific implementation of the Kafka Connect framework that provides a specific source or sink functionality. For example, there are connectors that allow you to import data from databases like MySQL or PostgreSQL, or export data to file systems like HDFS or S3.

A Sink Connector is a specific type of connector that is used to export data from Kafka to external systems. It receives messages from one or more Kafka topics and writes them to the target system.
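To make the distinction concrete, the sketch below shows how these concepts map onto the resources used later on this page: a Connector resource describes one specific source or sink (its class is the plugin implementation), and it runs on a Connect cluster that it points to via connectClusterRef. All names here are placeholders; the real manifests appear in the sections below.

apiVersion: platform.confluent.io/v1beta1
kind: Connector                       # one connector instance (here, a sink)
metadata:
  name: example-sink                  # placeholder name
spec:
  class: "com.example.SinkConnector"  # the plugin class implementing the connector
  connectClusterRef:
    name: connect-example             # the Connect (framework) cluster hosting it
  configs:
    topics: "example-topic"           # the Kafka topic(s) the sink reads from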

Standalone Mode vs Distributed Mode

Standalone mode is the simplest mode of operation, where a single instance of the Kafka Connect framework runs on a single machine. This mode is well-suited for small-scale or testing environments where you don't need the high availability or scalability features offered by the distributed mode.

Distributed mode allows you to run multiple instances of the Kafka Connect framework in a cluster, providing high availability and scalability. This mode is well-suited for large-scale or production environments where you need the reliability and resiliency offered by a distributed architecture.
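The Confluent Operator used below runs Connect workers in distributed mode; the number of workers in the cluster is set by spec.replicas on the Connect resource. A minimal sketch (the name and replica count are illustrative, not values from our manifests):

apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect-example   # placeholder name
spec:
  replicas: 2             # two workers form one distributed Connect cluster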

Implementation in ArgoCD

For our implementation of Connect and Connectors in ArgoCD, we use the Confluent Operator in Kubernetes as the solution. For more Kubernetes context, read Deploy Kafka-Connect in GKE Cluster for Confluent-Cloud

Create a Connect Server

Path: SaFiMono/devops/argocd/environments/{env}/confluent-kafka

Follow the usual convention for every Connect server:

kafka-connect-{app}                        # create a folder named kafka-connect-{app}
├── confluent-cluster-secret.yaml          # API secret for the Confluent Cloud bootstrap server
├── confluent-schema-registry-secret.yaml  # API secret for the Confluent Cloud schema registry
├── ingress.yaml                           # ingress for the Connect server (optional)
└── kafka-connect.yaml                     # the YAML manifest of the Connect server itself
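For reference, the two secret manifests usually look like the sketch below when authenticating to Confluent Cloud with API keys: the Confluent Operator expects a plain.txt key for the Kafka bootstrap credentials and a basic.txt key for the schema registry credentials. The secret names and credential placeholders here are illustrative; use the exact names referenced by the Connect manifest in your environment.

---
apiVersion: v1
kind: Secret
metadata:
  name: confluent-cluster-secret            # placeholder name
type: Opaque
stringData:
  plain.txt: |
    username=<confluent-cloud-api-key>
    password=<confluent-cloud-api-secret>
---
apiVersion: v1
kind: Secret
metadata:
  name: confluent-schema-registry-secret    # placeholder name
type: Opaque
stringData:
  basic.txt: |
    username=<schema-registry-api-key>
    password=<schema-registry-api-secret>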

For the Connect manifest, almost all configurations are the same for every Connect server, so follow the current setup and pay attention only to which plugins to use. In this case, the Connect server downloads the BigQuery plugin by WePay (see link):

https://www.confluent.io/hub/wepay/kafka-connect-bigquery

---
apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect-accounts
  labels:
    app.kubernetes.io/name: kafka-connect-brave
spec:
  configOverrides:
    server:
      - value.converter=io.confluent.connect.avro.AvroConverter
      - key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
      - value.converter.schemas.enable=true
....

#############
  build:
    type: onDemand
    onDemand:
      plugins:
        locationType: confluentHub
        confluentHub:
          - name: kafka-connect-bigquery
            owner: wepay
            version: 2.3.4
        
#############
  dependencies:
    kafka:
.....
    schemaRegistry:
...
  mountedSecrets:
    - secretRef: kafka-connect-brave-accounts

Configure a Connector

Path: SaFiMono/devops/argocd/environments/{env}/confluent-kafka/connectors/templates

Connectors are configuration as code, deployed using a Helm template.

Every connector is different since not all use the same plugin; pay attention to the following parameters:

1. class: always use the right class based on the plugin installed in the Connect server.
2. configs: every connector has different configuration parameters, so read the official documentation of the connector plugin.
3. topics: the Confluent Cloud topic(s) to push or sink data with, comma-separated for multiple topics, or use a Helm template with conditional parameters for dynamic generation.

4. transforms: used to specify one or more transforms (Single Message Transforms) to apply to the data as it passes through the connector; see the example after the note below.

Note: Not all cases are a straightforward setup. When transferring data from one platform to another, some data types are not supported for conversion; check the logs for errors and look for a transformation function in the plugin documentation to smooth the data transition from Confluent Cloud to the new platform or vice versa.
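For illustration only (this is not taken from one of our connectors), a Single Message Transform is declared by naming it in transforms and then configuring it with transforms.<name>.* keys. The sketch below uses the stock Apache Kafka TimestampConverter to turn a hypothetical epoch field into a formatted string:

  configs:
    transforms: "tsToString"
    transforms.tsToString.type: "org.apache.kafka.connect.transforms.TimestampConverter$Value"
    transforms.tsToString.field: "created_at"            # hypothetical field name
    transforms.tsToString.target.type: "string"
    transforms.tsToString.format: "yyyy-MM-dd HH:mm:ss"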

{{- $channels := list }}
{{- range $topics := .Values.topics }}
{{- if and (ne ($topics.ablyChannel) "") (eq ($topics.ablyChannel) "ably-channel-sink") }} 
{{ $channels = append $channels $topics.name }}

{{- end}}
{{- end}}
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: ably-channel-sink-connector
  namespace: safi-confluent-operator
spec:
  taskMax: 1
  class: "com.ably.kafka.connect.ChannelSinkConnector"
  # Default values
  restartPolicy:
    type: OnFailure
    maxRetry: 10
  connectClusterRef:
    name: connect-ably
  configs:
....
    topics: {{ join ", " $channels | quote }}
    transforms.headerToField.header.mappings: "header_key:STRING:header_key"
....
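The template above expects a topics list in the chart values, where each entry has a name and an ablyChannel field; only topics whose ablyChannel equals ably-channel-sink end up in the rendered topics string. A minimal sketch of the corresponding values.yaml, with placeholder topic names rather than our actual topics:

topics:
  - name: transaction-notifications     # placeholder topic name, included by the condition above
    ablyChannel: ably-channel-sink
  - name: audit-events                  # placeholder topic name, excluded (ablyChannel does not match)
    ablyChannel: ""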