Operator Installation:
kubectl create ns confluent helm repo add confluentinc https://packages.confluent.io/helm helm upgrade --install operator confluentinc/confluent-for-kubernetes -n confluent kubectl get pods -n confluent
Connect Installation:
cat ccloud-credentials.txt username=<<api key>> password=<<api Secret>>
cat ccloud-sr-credentials.txt username=<<api key>> password=<<api Secret>>
kubectl create secret generic ccloud-credentials --from-file=plain.txt=ccloud-credentials.txt kubectl create secret generic ccloud-sr-credentials --from-file=basic.txt=ccloud-sr-credentials.txt
--- apiVersion: platform.confluent.io/v1beta1 kind: Connect metadata: name: connect spec: replicas: 1 image: application: confluentinc/cp-server-connect:7.2.0 init: confluentinc/confluent-init-container:2.4.0 build: type: onDemand onDemand: plugins: locationType: confluentHub confluentHub: - name: kafka-connect-jdbc owner: confluentinc version: 10.2.5 - name: kafka-connect-ably owner: ably version: 2.0.1 dependencies: kafka: bootstrapEndpoint: SASL_SSL://pkc-ldvr1.asia-southeast1.gcp.confluent.cloud:9092 authentication: type: plain jaasConfig: secretRef: ccloud-credentials tls: enabled: true ignoreTrustStoreConfig: true schemaRegistry: url: https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud authentication: type: basic basic: secretRef: ccloud-sr-credentials
k -n confluent apply -f kafka-connect.yaml
Connectors Creation:
--- apiVersion: platform.confluent.io/v1beta1 kind: Connector metadata: name: ably-confluent-sink-distributed namespace: confluent spec: class: "com.ably.kafka.connect.ChannelSinkConnector" taskMax: 4 connectClusterRef: name: connect configs: connector.class: "com.ably.kafka.connect.ChannelSinkConnector" tasks.max: "3" topics: "card_updated,account_updated,account_modified" client.id: "Ably-Kafka-Connector-Distributed" channel: "kafka-channel" client.key: "<<client key>>" message.name": "#{topic}"%
k -n confluent apply -f ably-connector.yaml kubectl port-forward connect-0 8083 curl localhost:8083/connectors
Topic Creation:
kubectl create secret generic cloud-rest-access \ --from-file=basic.txt=ccloud-credentials.txt --namespace confluent
apiVersion: platform.confluent.io/v1beta1 kind: KafkaTopic metadata: name: cloud-demo-topic namespace: confluent spec: replicas: 3 partitionCount: 4 configs: message.timestamp.type: "LogAppendTime" kafkaRest: endpoint: https://pkc-ldvr1.asia-southeast1.gcp.confluent.cloud:443 kafkaClusterID: lkc-zmwjv7 authentication: type: basic basic: secretRef: cloud-rest-access
k -n confluent apply -f topic.yaml
Schema Creation:
apiVersion: v1 kind: ConfigMap metadata: name: schema-config namespace: confluent data: schema: | { "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"}, {"name": "email", "type": "string"} ] }
k apply -f schema-config.yaml
apiVersion: platform.confluent.io/v1beta1 kind: Schema metadata: name: confluent-demo-schema namespace: confluent spec: data: configRef: schema-config format: avro schemaRegistryRest: endpoint: https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud authentication: type: basic basic: secretRef: ccloud-sr-credentials name: confluent-demo-schema%
k apply -f schema.yaml
Results:
Reference:
Attachments:
image-20220812-053023.png (image/png)