Prerequisites
First, install HTTPie on your local machine:

brew install httpie
Create a connector configuration file
topics: the topic to read from on one of our Kafka clusters (dev: http://172.16.48.7/, stage: http://172.17.48.3/, brave: http://172.21.48.3/)
cps.topic: the Google Pub/Sub topic to publish to
{ "name": "CPSSinkConnector", "config": { "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector", "value.convertor": "ph.safibank.common.utils.kafka.connect.converter.AvroToJsonConverter", "tasks.max": "1", "topics": "kafka-avro-stage", "cps.topic": "kafka-connect-test", "cps.project": "datatest-348502" } }
How to create Kafka connectors
dev kafka connector: http://172.16.48.7:8083/connectors
stage kafka connector: http://172.17.48.3:8083/connectors
brave kafka connector: http://172.21.48.3:8083/connectors
List connectors
http http://34.87.120.51:8083/connectors

HTTP/1.1 200 OK
Content-Length: 140
Content-Type: application/json
Date: Fri, 30 Sep 2022 09:30:55 GMT
Server: Jetty(9.4.48.v20220622)

[
    "CustomerSnapshot",
    "IdCardSnapshot",
    "InterbankTransaction",
    "FacialVerificationSnapshot",
    "DeviceFingerprintSnapshot",
    "IntrabankTransaction"
]
Create a connector
http POST http://34.87.120.51:8083/connectors < cps-connector.json

HTTP/1.1 201 Created
Content-Length: 361
Content-Type: application/json
Date: Fri, 30 Sep 2022 07:44:26 GMT
Location: http://34.87.120.51:8083/connectors/CPSSinkConnector
Server: Jetty(9.4.48.v20220622)

{
    "config": {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "cps.project": "datatest-348502",
        "cps.topic": "kafka-connect-test",
        "name": "CPSSinkConnector",
        "tasks.max": "1",
        "topics": "kafka-avro-stage",
        "value.convertor": "ph.safibank.common.utils.kafka.connect.converter.AvroToJsonConverter"
    },
    "name": "CPSSinkConnector",
    "tasks": [],
    "type": "sink"
}
Query a connector
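The example for this step appears to be missing. Assuming the standard Kafka Connect REST API (the same worker and connector name as in the create step), a query would look like the following; the output depends on the live cluster, so none is shown:

```shell
# Fetch the stored configuration of a single connector.
http http://34.87.120.51:8083/connectors/CPSSinkConnector

# Check whether the connector and its tasks are RUNNING or FAILED.
http http://34.87.120.51:8083/connectors/CPSSinkConnector/status
```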
Delete a connector
http DELETE http://34.87.120.51:8083/connectors/CPSSinkConnector

HTTP/1.1 204 No Content
Date: Fri, 30 Sep 2022 09:30:52 GMT
Server: Jetty(9.4.48.v20220622)
Deploy the Kafka Connect worker
docker-compose.yml
version: '2'
services:
  kafka-connect-stage:
    image: confluentinc/cp-kafka-connect-base:6.2.7
    container_name: kafka-connect-stage
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "34.87.21.207:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-stage
      CONNECT_CONFIG_STORAGE_TOPIC: _stage_connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _stage_connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _stage_connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: ph.safibank.common.utils.kafka.connect.converter.AvroToJsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://34.87.21.207:8081"
      CONNECT_REST_ADVERTISED_HOST_NAME: "34.87.21.207"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      GOOGLE_APPLICATION_CREDENTIALS: /data/credentials.json
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jar
    volumes:
      - $PWD/data:/data
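With docker-compose.yml and the data folder in place, the worker can be started and checked; a sketch using standard docker-compose commands (the service name and port come from the compose file above, output depends on your environment):

```shell
# Start the Connect worker in the background.
docker-compose up -d kafka-connect-stage

# Follow the worker logs until startup completes.
docker logs -f kafka-connect-stage

# Once up, the REST interface from the compose file answers on port 8083.
http http://localhost:8083/connectors
```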
There are two JAR files in the data folder; see the structure below.
data
├── connect-jar
│   ├── pubsub-kafka-connector.jar
│   └── utils.jar
└── credentials.json
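The layout above can be scaffolded before the real files are dropped in; a sketch (the touch calls create empty placeholders, to be replaced by the real JARs and key described below):

```shell
# Scaffold the data folder with the layout shown above.
mkdir -p data/connect-jar
touch data/connect-jar/pubsub-kafka-connector.jar
touch data/connect-jar/utils.jar
touch data/credentials.json

# List the resulting files to confirm the structure.
find data -type f | sort
# → data/connect-jar/pubsub-kafka-connector.jar
# → data/connect-jar/utils.jar
# → data/credentials.json
```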
pubsub-kafka-connector.jar can be downloaded from https://github.com/googleapis/java-pubsub-group-kafka-connector/releases
utils.jar can be built from our source code: https://github.com/SafiBank/SaFiMono/tree/main/common/utils
credentials.json: download a key from Google Cloud: https://console.cloud.google.com/iam-admin/serviceaccounts/details/114525919296248372049/keys?project=datatest-348502
The key must belong to the service account slacker-test@datatest-348502.iam.gserviceaccount.com; the other accounts have no privilege to publish messages.
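Since only that one account can publish, it is worth confirming which account a downloaded key belongs to. A sketch, assuming the standard GCP service-account key format (which stores the account in a client_email field); the key written here is a minimal stand-in for illustration only — a real key from the console has many more fields:

```shell
# Illustration only: a minimal stand-in for a downloaded key file.
cat > credentials.json <<'EOF'
{"type": "service_account", "client_email": "slacker-test@datatest-348502.iam.gserviceaccount.com"}
EOF

# Print which service account the key belongs to; for publishing to
# work this must be slacker-test@datatest-348502.iam.gserviceaccount.com.
python3 -c "import json; print(json.load(open('credentials.json'))['client_email'])"
# → slacker-test@datatest-348502.iam.gserviceaccount.com
```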