Operator Installation:

# Create the namespace that the operator and all later resources live in.
kubectl create namespace confluent
# Register the Confluent Helm chart repository.
helm repo add confluentinc https://packages.confluent.io/helm
# Install the operator release, or upgrade it in place if it already exists.
helm upgrade --install operator confluentinc/confluent-for-kubernetes --namespace confluent
# List the pods in the namespace to check on the operator.
kubectl get pods --namespace confluent

Connect Installation:

# Credentials file referenced by the Connect resource's Kafka dependency
# (the two lines after `cat` are the expected file contents).
cat ccloud-credentials.txt
username=<<api key>>
password=<<api Secret>>
# Credentials file referenced by the Connect resource's Schema Registry
# dependency (same username/password layout).
cat ccloud-sr-credentials.txt
username=<<api key>>
password=<<api Secret>>
# Create both secrets in the "confluent" namespace: the Connect resource is
# applied there and refers to these secrets by name.
kubectl create secret generic ccloud-credentials --from-file=plain.txt=ccloud-credentials.txt -n confluent
kubectl create secret generic ccloud-sr-credentials --from-file=basic.txt=ccloud-sr-credentials.txt -n confluent
---
# Connect cluster managed by the operator. It downloads the listed
# Confluent Hub plugins and uses the Kafka and Schema Registry endpoints
# given under `dependencies`.
apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect
  # Declared explicitly, like the other manifests in this runbook, so the
  # resource does not depend on the `-n` flag of the apply command.
  namespace: confluent
spec:
  replicas: 1
  image:
    application: confluentinc/cp-server-connect:7.2.0
    init: confluentinc/confluent-init-container:2.4.0
  build:
    # Plugins are fetched from Confluent Hub rather than baked into the image.
    type: onDemand
    onDemand:
      plugins:
        locationType: confluentHub
        confluentHub:
          - name: kafka-connect-jdbc
            owner: confluentinc
            version: 10.2.5
          - name: kafka-connect-ably
            owner: ably
            version: 2.0.1
  dependencies:
    kafka:
      bootstrapEndpoint: SASL_SSL://pkc-ldvr1.asia-southeast1.gcp.confluent.cloud:9092
      authentication:
        type: plain
        jaasConfig:
          # Secret created from ccloud-credentials.txt (key: plain.txt).
          secretRef: ccloud-credentials
      tls:
        enabled: true
        # NOTE(review): presumably makes Connect rely on the default JVM
        # truststore instead of a mounted one; confirm against the CFK docs.
        ignoreTrustStoreConfig: true
    schemaRegistry:
      url: https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud
      authentication:
        type: basic
        basic:
          # Secret created from ccloud-sr-credentials.txt (key: basic.txt).
          secretRef: ccloud-sr-credentials
# Apply the Connect manifest above (saved as kafka-connect.yaml).
kubectl -n confluent apply -f kafka-connect.yaml

Connector Creation:

---
# Ably sink connector, deployed onto the Connect cluster named "connect".
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: ably-confluent-sink-distributed
  namespace: confluent
spec:
  class: "com.ably.kafka.connect.ChannelSinkConnector"
  # NOTE(review): taskMax (4) disagrees with configs."tasks.max" ("3")
  # below; decide which task count is intended and make the two match.
  taskMax: 4
  connectClusterRef:
    name: connect
  configs:
    connector.class: "com.ably.kafka.connect.ChannelSinkConnector"
    tasks.max: "3"
    topics: "card_updated,account_updated,account_modified"
    client.id: "Ably-Kafka-Connector-Distributed"
    channel: "kafka-channel"
    # Placeholder: replace with the real key; avoid committing it.
    client.key: "<<client key>>"
    # Quoted so the leading "#" is not parsed as a YAML comment.
    message.name: "#{topic}"

# Apply the Connector manifest above (saved as ably-connector.yaml).
kubectl -n confluent apply -f ably-connector.yaml
# Forward the Connect REST port from the pod in the "confluent" namespace.
# This command stays in the foreground; run the curl from a second terminal.
kubectl -n confluent port-forward connect-0 8083
# List the connectors registered with the Connect REST API.
curl localhost:8083/connectors

Topic Creation:

# Secret referenced by the KafkaTopic resource for Kafka REST basic auth;
# it reuses the Kafka credentials file under the key basic.txt.
kubectl --namespace confluent create secret generic cloud-rest-access \
  --from-file=basic.txt=ccloud-credentials.txt
---
# Topic declared through the Kafka REST endpoint configured below.
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: cloud-demo-topic
  namespace: confluent
spec:
  replicas: 3
  partitionCount: 4
  configs:
    message.timestamp.type: "LogAppendTime"
  kafkaRest:
    endpoint: https://pkc-ldvr1.asia-southeast1.gcp.confluent.cloud:443
    kafkaClusterID: lkc-zmwjv7
    authentication:
      type: basic
      basic:
        # Secret created from ccloud-credentials.txt (key: basic.txt).
        secretRef: cloud-rest-access
# Apply the KafkaTopic manifest above (saved as topic.yaml).
kubectl -n confluent apply -f topic.yaml

Schema Creation:

---
# ConfigMap holding the Avro schema text; the Schema resource points at it
# through spec.data.configRef.
apiVersion: v1
kind: ConfigMap
metadata:
  name: schema-config
  namespace: confluent
data:
  # Literal block scalar: everything indented below is the schema itself,
  # so no comments may be placed inside it.
  schema: |
    {
      "namespace": "io.confluent.examples.clients.basicavro",
      "type": "record",
      "name": "Payment",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "email", "type": "string"}
      ]
    }
# Apply the ConfigMap manifest above (saved as schema-config.yaml).
kubectl -n confluent apply -f schema-config.yaml
---
# Registers the schema stored in the "schema-config" ConfigMap with the
# Schema Registry endpoint configured below.
apiVersion: platform.confluent.io/v1beta1
kind: Schema
metadata:
  name: confluent-demo-schema
  namespace: confluent
spec:
  data:
    configRef: schema-config
    format: avro
  schemaRegistryRest:
    endpoint: https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud
    authentication:
      type: basic
      basic:
        # Secret created from ccloud-sr-credentials.txt (key: basic.txt).
        secretRef: ccloud-sr-credentials
  # NOTE(review): presumably the subject name used in Schema Registry;
  # confirm against the Schema CRD reference.
  name: confluent-demo-schema
# Apply the Schema manifest above (saved as schema.yaml).
kubectl -n confluent apply -f schema.yaml

Results:

Reference:

  1. https://github.com/confluentinc/confluent-kubernetes-examples/tree/master/hybrid

Attachments: