Overview
Kafka Connect is the pluggable, declarative data integration framework for Kafka: it either writes data into or reads data out of the Kafka cluster. Kafka Connect connectors come in two forms:
Source - reads data from an external system such as MySQL, S3, etc., then pushes the data to the cluster
Sink - reads data from the cluster, then sends the data to another data store such as Elasticsearch, Ably, etc.
Connect vs Connector vs Sink
Kafka Connect can be fully managed by Confluent, or self-managed (if the connector is not supported) and deployed either as a VM or as a container. It is a framework for connecting Kafka with external systems, such as databases, key-value stores, search indexes, and file systems. It allows you to import data from external systems into Kafka, or export data from Kafka to external systems.
A Connector, on the other hand, is a specific implementation of the Kafka Connect framework that provides a specific source or sink functionality. For example, there are connectors that allow you to import data from databases like MySQL or PostgreSQL, or export data to file systems like HDFS or S3.
A Sink Connector is a specific type of connector used to export data from Kafka to external systems. It receives messages from one or more Kafka topics and writes them to the target system.
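As an illustrative sketch in the Connector CRD style used later in this page (the name, topic, and connection details below are hypothetical, not our actual settings), a Sink Connector exporting a topic to Elasticsearch might look like:

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: example-elasticsearch-sink                 # hypothetical name
spec:
  taskMax: 1
  class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
  connectClusterRef:
    name: connect-example                          # hypothetical Connect server
  configs:
    topics: "orders"                               # topic(s) read from the cluster
    connection.url: "http://elasticsearch:9200"    # hypothetical endpoint
    key.ignore: "true"
```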
Standalone Mode vs Distributed Mode
Standalone mode is the simplest mode of operation, where a single instance of the Kafka Connect framework runs on a single machine. This mode is well-suited for small-scale or testing environments where you don't need the high availability or scalability features offered by the distributed mode.
Distributed mode allows you to run multiple instances of the Kafka Connect framework in a cluster, providing high availability and scalability. This mode is well-suited for large-scale or production environments where you need the reliability and resiliency offered by a distributed architecture.
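For reference, a distributed-mode worker is configured with a shared `group.id` and internal storage topics, whereas standalone mode keeps offsets in a local file. The values below are an illustrative sketch, not our actual settings:

```properties
# connect-distributed.properties (illustrative values)
bootstrap.servers=broker-1:9092
group.id=connect-cluster               # workers sharing a group.id form one Connect cluster
config.storage.topic=connect-configs   # internal topics shared across the cluster
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
```

A standalone worker instead sets `offset.storage.file.filename` to a local file, which is why it cannot provide failover across machines.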
Implementation in ArgoCD
For our implementation of Connect and Connectors in ArgoCD, we use the Confluent Operator on Kubernetes. For more on the Kubernetes context, read Deploy Kafka-Connect in GKE Cluster for Confluent-Cloud
Create a Connect Server
Path: SaFiMono/devops/argocd/environments/{env}/confluent-kafka
Follow the usual convention for every Connect server:
```
kafka-connect-{app}                         # create a folder kafka-connect-{my app}
├── confluent-cluster-secret.yaml           # secrets api for confluent cloud bootstrap
├── confluent-schema-registry-secret.yaml   # secrets api for confluent cloud schema
├── ingress.yaml                            # ingress for the server cloud schema {optional?}
└── kafka-connect.yaml                      # the yaml manifest of the server itself
```
For the Connect manifest, almost all configuration is the same for every Connect server, so follow the current setup and pay attention only to which plugins
to use. In this case, the Connect server downloads the BigQuery plugin by WePay (see link)
https://www.confluent.io/hub/wepay/kafka-connect-bigquery
```yaml
---
apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect-accounts
  labels:
    app.kubernetes.io/name: kafka-connect-brave
spec:
  configOverrides:
    server:
      - value.converter=io.confluent.connect.avro.AvroConverter
      - key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
      - value.converter.schemas.enable=true
      # ....
  #############
  build:
    type: onDemand
    onDemand:
      plugins:
        locationType: confluentHub
        confluentHub:
          - name: kafka-connect-bigquery
            owner: wepay
            version: 2.3.4
  #############
  dependencies:
    kafka:
      # .....
    schemaRegistry:
      # ...
  mountedSecrets:
    - secretRef: kafka-connect-brave-accounts
```
Configure a Connector
Path: SaFiMono/devops/argocd/environments/{env}/confluent-kafka/connectors/templates
Connectors are configuration as code, deployed using Helm templates.
Every connector is different since not all use the same plugin; pay attention to the following parameters:
1. class:
always use the right class based on the plugin installed in the Connect server
2. configs:
every connector has different configuration parameters, so read the official documentation of the connector plugin
3. topics:
the topics in Confluent Cloud to push or sink data with, comma-separated for multiple topics, or use a Helm template with conditional parameters for dynamic generation
4. transforms:
used to specify one or more transforms that should be applied to the data as it passes through the connector
Note: Not all setups are straightforward. When transferring data from one platform to another, some data types are not supported for conversion. Check the logs for errors and look for a transformation function in the plugin documentation to smooth the data transition from Confluent Cloud to the new platform, or vice versa.
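For example, a built-in single message transform (SMT) such as TimestampConverter can coerce a timestamp the target system cannot ingest into a plain string before it is written. This is only a sketch; the field name and format below are hypothetical:

```yaml
configs:
  transforms: "tsToString"
  transforms.tsToString.type: "org.apache.kafka.connect.transforms.TimestampConverter$Value"
  transforms.tsToString.field: "created_at"             # hypothetical record field
  transforms.tsToString.target.type: "string"
  transforms.tsToString.format: "yyyy-MM-dd'T'HH:mm:ss"
```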
```yaml
{{- $channels := list }}
{{- range $topics := .Values.topics }}
{{- if and (ne ($topics.ablyChannel) "") (eq ($topics.ablyChannel) "ably-channel-sink") }}
{{ $channels = append $channels $topics.name }}
{{- end}}
{{- end}}
kind: Connector
metadata:
  name: ably-channel-sink-connector
  namespace: safi-confluent-operator
spec:
  taskMax: 1
  class: "com.ably.kafka.connect.ChannelSinkConnector"
  # Default values
  restartPolicy:
    type: OnFailure
    maxRetry: 10
  connectClusterRef:
    name: connect-ably
  configs:
    # ....
    topics: {{ join ", " $channels | quote }}
    transforms.headerToField.header.mappings: "header_key:STRING:header_key"
    # ....
```
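The template ranges over `.Values.topics` and keeps only topics whose `ablyChannel` equals `ably-channel-sink`. A hypothetical values file (topic names invented for illustration) that would route one topic to the sink and skip another might look like:

```yaml
topics:
  - name: transactions.events.v1     # hypothetical topic, included in the joined topics list
    ablyChannel: ably-channel-sink
  - name: audit.events.v1            # hypothetical topic, excluded by the template condition
    ablyChannel: ""
```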