Prerequisites

  • First, install httpie on your local machine

brew install httpie

  • Create a connector configuration file (the create step below reads it from cps-connector.json)

topics: the source topic in our Kafka cluster (dev: http://172.16.48.7/, stage: http://172.17.48.3/, brave: http://172.21.48.3/)

cps.topic: the destination Google Pub/Sub topic

cps.project: the Google Cloud project that owns the Pub/Sub topic

{
    "name": "CPSSinkConnector",
    "config": {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "value.converter": "ph.safibank.common.utils.kafka.connect.converter.AvroToJsonConverter",
        "tasks.max": "1",
        "topics": "kafka-avro-stage",
        "cps.topic": "kafka-connect-test",
        "cps.project": "datatest-348502"
    }
}
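Before posting the file, it can help to confirm that none of the keys was dropped while editing. The following is a minimal sketch, assuming the file only needs the keys that the example above sets; check_connector_config is a hypothetical helper, and it only looks for the key names with grep rather than parsing the JSON.

```shell
# Hypothetical helper: verify that a connector config file mentions every key
# used in the example above. This is a plain text search, not a JSON parser.
check_connector_config() {
  file="$1"
  [ -f "$file" ] || { echo "no such file: $file" >&2; return 1; }
  status=0
  for key in name connector.class topics cps.topic cps.project; do
    # -F matches the key literally, so the dots are not regex wildcards
    if ! grep -qF "\"$key\"" "$file"; then
      echo "missing key: $key" >&2
      status=1
    fi
  done
  return "$status"
}

# Example (not run here):
#   check_connector_config cps-connector.json && echo "config looks complete"
```

The function returns a non-zero status and names each missing key on stderr, so it can be chained in front of the create request.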

How to create Kafka connectors

dev Kafka Connect endpoint: http://172.16.48.7:8083/connectors

stage Kafka Connect endpoint: http://172.17.48.3:8083/connectors

brave Kafka Connect endpoint: http://172.21.48.3:8083/connectors
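To avoid retyping these addresses, the three environments can be wrapped in a small shell function. This is only a sketch: connect_url is a hypothetical helper name, and the addresses are copied from the list above.

```shell
# Hypothetical helper: print the Kafka Connect base URL for an environment.
connect_url() {
  case "$1" in
    dev)   echo "http://172.16.48.7:8083" ;;
    stage) echo "http://172.17.48.3:8083" ;;
    brave) echo "http://172.21.48.3:8083" ;;
    *)     echo "unknown environment: $1" >&2; return 1 ;;
  esac
}

# Example (not run here): list the connectors on stage
#   http "$(connect_url stage)/connectors"
```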

  • List connectors

http http://34.87.120.51:8083/connectors
HTTP/1.1 200 OK
Content-Length: 140
Content-Type: application/json
Date: Fri, 30 Sep 2022 09:30:55 GMT
Server: Jetty(9.4.48.v20220622)

[
    "CustomerSnapshot",
    "IdCardSnapshot",
    "InterbankTransaction",
    "FacialVerificationSnapshot",
    "DeviceFingerprintSnapshot",
    "IntrabankTransaction"
]
  • Create a connector

http POST http://34.87.120.51:8083/connectors < cps-connector.json
HTTP/1.1 201 Created
Content-Length: 361
Content-Type: application/json
Date: Fri, 30 Sep 2022 07:44:26 GMT
Location: http://34.87.120.51:8083/connectors/CPSSinkConnector
Server: Jetty(9.4.48.v20220622)

{
    "config": {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "cps.project": "datatest-348502",
        "cps.topic": "kafka-connect-test",
        "name": "CPSSinkConnector",
        "tasks.max": "1",
        "topics": "kafka-avro-stage",
        "value.converter": "ph.safibank.common.utils.kafka.connect.converter.AvroToJsonConverter"
    },
    "name": "CPSSinkConnector",
    "tasks": [],
    "type": "sink"
}
  • Query a connector
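
A minimal sketch of querying a connector, assuming the standard Kafka Connect REST endpoints GET /connectors/{name} (name and config) and GET /connectors/{name}/status (connector and task state). The function names and the CONNECT_URL variable are hypothetical; the default host is the one used in the other examples on this page.

```shell
# Base URL of the Kafka Connect REST API; override it for another environment.
CONNECT_URL="${CONNECT_URL:-http://34.87.120.51:8083}"

# Hypothetical helpers built on httpie.
connector_endpoint() {
  printf '%s/connectors/%s' "$CONNECT_URL" "$1"
}

# Show the connector's name, config and task list.
query_connector() {
  http --ignore-stdin GET "$(connector_endpoint "$1")"
}

# Show whether the connector and its tasks are running or failed.
query_connector_status() {
  http --ignore-stdin GET "$(connector_endpoint "$1")/status"
}

# Example (not run here):
#   query_connector CPSSinkConnector
#   query_connector_status CPSSinkConnector
```

The status call is the more useful one right after creating a connector, since the create response above still shows an empty task list.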


  • Delete a connector

http DELETE http://34.87.120.51:8083/connectors/CPSSinkConnector
HTTP/1.1 204 No Content
Date: Fri, 30 Sep 2022 09:30:52 GMT
Server: Jetty(9.4.48.v20220622)

Deploy Kafka Connect

docker-compose.yml

version: '2'
services:
  kafka-connect-stage:
    image: confluentinc/cp-kafka-connect-base:6.2.7
    container_name: kafka-connect-stage
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "34.87.21.207:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-stage
      CONNECT_CONFIG_STORAGE_TOPIC: _stage_connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _stage_connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _stage_connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: ph.safibank.common.utils.kafka.connect.converter.AvroToJsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://34.87.21.207:8081"

      CONNECT_REST_ADVERTISED_HOST_NAME: "34.87.21.207"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      GOOGLE_APPLICATION_CREDENTIALS: /data/credentials.json
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jar
     
    volumes:
      - $PWD/data:/data
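
Once the file is in place, the worker can be started with docker-compose and checked through the REST port that the compose file publishes. The sketch below assumes it is run on the Docker host, so the API is reachable at http://localhost:8083; wait_for_connect is a hypothetical helper, and the retry count and delay are arbitrary defaults.

```shell
# Hypothetical helper: poll the Kafka Connect REST API until it answers.
# Arguments: base URL, number of attempts (default 30), seconds between attempts (default 5).
wait_for_connect() {
  url="$1"
  attempts="${2:-30}"
  delay="${3:-5}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # --check-status makes httpie exit non-zero on 4xx/5xx responses
    if http --ignore-stdin --check-status --timeout=5 GET "$url/connectors" >/dev/null 2>&1; then
      echo "Kafka Connect is up at $url"
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  echo "Kafka Connect did not respond at $url" >&2
  return 1
}

# Example (not run here):
#   docker-compose up -d
#   wait_for_connect http://localhost:8083
```

The worker usually needs some time to load its plugins before the REST API answers, which is why the sketch polls instead of issuing a single request.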

The data folder holds two JAR files under connect-jar, plus the credentials file. Its structure is shown below.

data
├── connect-jar
│   ├── pubsub-kafka-connector.jar
│   └── utils.jar
└── credentials.json
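
Before starting the container, the layout above can be checked with a short script. This is a sketch under the assumption that the folder is named data and sits next to docker-compose.yml; check_data_dir is a hypothetical helper.

```shell
# Hypothetical helper: confirm the data folder contains the three expected files.
check_data_dir() {
  dir="${1:-data}"
  missing=0
  for f in connect-jar/pubsub-kafka-connector.jar connect-jar/utils.jar credentials.json; do
    if [ ! -f "$dir/$f" ]; then
      echo "missing: $dir/$f" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example (not run here):
#   check_data_dir data && docker-compose up -d
```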

pubsub-kafka-connector.jar: download it from https://github.com/googleapis/java-pubsub-group-kafka-connector/releases or from the attachments below.

utils.jar: build it from our source code at https://github.com/SafiBank/SaFiMono/tree/main/common/utils or download it from the attachments below.

credentials.json: download it from Google Cloud at https://console.cloud.google.com/iam-admin/serviceaccounts/details/114525919296248372049/keys?project=datatest-348502

The service account must be slacker-test@datatest-348502.iam.gserviceaccount.com; the other service accounts do not have permission to publish messages.

Attachments:

pubsub-kafka-connector.jar (application/java-archive)
utils.jar (application/java-archive)