GitHub: https://github.com/SafiBank/SaFiMono/tree/main/common/schema
Overview
A common module that helps with Avro schemas used for communication over Kafka.
To get a good understanding of the message types, naming conventions, and how to make schema upgrades, consult Kafka message types and Kafka topic schema management.
Structure
The list of Kafka topics can be found in https://github.com/SafiBank/SaFiMono/blob/main/common/schema/topicSchemasDefinitions.json.
Each topic entry has the following structure:
```json
{
  "name": "transactions.airtimeload-transaction-occurred.event.v1",
  "schema": "transactions.AirtimeLoadTransactionOccurredEventV1",
  "oldName": "transaction.airtime_load",
  "oldSchema": "transaction.AirtimeLoadTransactionAvro",
  "ablyChannel": "",
  "messageKey": "UUID",
  "type": "EVENT",
  "schemaCompatLevel": "FORWARD_TRANSITIVE"
},
```
The meaning of each field is explained here: https://safibank.atlassian.net/wiki/spaces/ITArch/pages/23199927/Kafka+topic+schema+management#Topic-schema-management
Each Avro schema file has to be placed in a folder named after the domain it belongs to, with a file name consisting of the schema name and a version suffix (an increment, starting at 1):
schemas/<DOMAIN>/<SCHEMA_NAME>V<VERSION>.avsc
For example:
schemas/transactions/IntrabankTransactionOccurredEventV1.avsc
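As an illustration, a minimal schema file for the example above might look like the sketch below. The fields shown are hypothetical and are not the actual contract of this event; only the `name`/`namespace` convention (class name with version suffix, namespace matching the domain package) is the point.

```json
{
  "type": "record",
  "name": "IntrabankTransactionOccurredEventV1",
  "namespace": "ph.safibank.transactions",
  "fields": [
    { "name": "id", "type": { "type": "string", "logicalType": "uuid" } },
    { "name": "amount", "type": "string" },
    { "name": "occurredAt", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
```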
Usage
Validation phase
Validation occurs automatically when a pull request containing new files under common/schema/schemas is created. The GH Action avro-schema-validator is triggered in that case. For each new file, the following checks are executed:
syntax check - whether the file contains a valid Avro schema
if the check fails, the committer should revise and fix the schema file
schema compatibility check - asks the schema registry whether the schema is compatible with its previous versions (if any) for the given subject (subject is schema-registry terminology; for our use cases we use it interchangeably with topic from Kafka terminology)
if the check fails, the committer should either modify the change so that it is compatible or create a new topic for the incompatible schema
For the whole schemas folder, the following check is executed:
namespace collision check (whether the same class+namespace is defined in multiple schemas)
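For example, a namespace collision would be reported if two schema files (hypothetical names below) both declared the same full class name, since both would generate the class `ph.safibank.transactions.PaymentAvro`:

```jsonc
// schemas/transactions/PaymentOccurredEventV1.avsc
{ "type": "record", "namespace": "ph.safibank.transactions", "name": "PaymentAvro", "fields": [ /* ... */ ] }

// schemas/payments/PaymentRequestedEventV1.avsc
{ "type": "record", "namespace": "ph.safibank.transactions", "name": "PaymentAvro", "fields": [ /* ... */ ] }
```

Note that the generated class name comes from the `name` and `namespace` inside the schema, not from the file path, which is why schemas in different folders can still collide.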
Code generation, topic creation & schema registration phase
Topic creation and schema registration logic was moved from avro-model to argocd/helm. For more info, refer to Kafka topic schema management.
Code generation is meant to ease development related to producing and consuming messages to/from Kafka. The Avro schema files are the source of truth for the generated classes.
The GH Action avro-schema-generator is triggered whenever anything changes under the common/schema folder. It invokes the avroClassGenerator.sh script, which generates Java classes from all existing Avro files, packages them as a jar library, and publishes this artifact to the Safi Maven repository as ph.safibank.common:avro-model (microservices can then depend on it and use the generated classes).
We use gradle-avro-plugin to generate the Java classes from the Avro files; it enhances the Gradle build task with the desired functionality (the plugin is configured in the Gradle build under common/schema).
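Assuming a standard Gradle Kotlin DSL setup, a microservice would depend on the published artifact roughly like this (the version string is illustrative, not a real release):

```kotlin
dependencies {
    // generated Avro classes and Kafka helper wrappers
    implementation("ph.safibank.common:avro-model:1.0.0")
}
```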
Local development
When developing a new feature that depends on classes generated from an Avro schema, it is inconvenient to run the full cycle (pull request with the new schema into main, merge to main). It is advised to validate a newly added schema locally by running the following script from the common/schema folder:
./localAvroValidator.sh /path/to/avro/file
This validator both validates the syntax of the schema file and runs the namespace collision check (whether two classes with the same package would be generated).
If validation passes, classes can be generated locally by running the following script from the common/schema folder:
./localAvroGenerator.sh
which generates the artifact and stores it in the local .m2 repository. In the module that should depend on this new schema, simply add
mavenLocal()
as the first entry of the repositories section, since Gradle checks repositories in sequence (do not commit this to the remote repo).
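With the Gradle Kotlin DSL, the repositories section would then look something like this sketch (your module may list different remote repositories):

```kotlin
repositories {
    // first, so the locally generated avro-model artifact is found before any remote copy
    mavenLocal()
    mavenCentral()
    // ...the Safi Maven repository and any other remote repositories
}
```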
Generation of Kafka producer and listeners for micronaut
In addition to generating Java classes for data serialization, we also want to provide helpers for listeners and producers. The design considerations for what they should address:
currently, there is no validation that you use the correct topic with the correct message key type and the correct data object
there is no uniform out-of-the-box logging across services
currently, almost no service (except statement manager) properly sends an idempotency key (some services call the Kafka message key an idempotency key, which is not always correct), and we would like to move away from this state
Helper classes are generated by the Gradle task generateMicronautKafkaWrapers, which takes the templates ListenerTemplate.txt and ProducerTemplate.txt and, for each topic in topicSchemasDefinitions.json, generates a helper class into the library containing the Avro wrappers.
This process is skipped for topics that have the flag "delete": true.
Helpers are also not generated for topics whose "messageKey" is neither "String" nor "UUID".
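For instance, a topic entry like the following (entirely hypothetical) would get no generated helpers, both because of the delete flag and because its message key type is neither String nor UUID:

```json
{
  "name": "examples.some-deprecated-topic.event.v1",
  "schema": "examples.SomeDeprecatedEventV1",
  "messageKey": "Long",
  "type": "EVENT",
  "delete": true
}
```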
How it looks if you want to use the producer helper:

```kotlin
import ph.safibank.kafka.pesonetgateway.TransactionsPesonetgatewayCreateOutwardTransactionTempProducer

@KafkaClient("some_kafka_setting_if_necessary")
abstract class OutwardTransactionProducer : TransactionsPesonetgatewayCreateOutwardTransactionTempProducer()

// in another file
class ServiceName(
    private val producer: OutwardTransactionProducer,
) {
    fun sendOutwardTransaction() {
        // ...
        producer.send(idempotencyKey, messageKey, messageAvro)
    }
}
```
How it looks if you want to use the listener helper:

```kotlin
import ph.safibank.kafka.pesonetgateway.TransactionsPesonetgatewayCreateOutwardTransactionTempListener

@KafkaListener("some_kafka_setting_if_necessary")
class OutwardTransactionListener1(
    private val service: Service,
) : TransactionsPesonetgatewayCreateOutwardTransactionTempListener() {
    override fun process(
        _idempotencyKey: String?,
        _messageKey: UUID,
        msg: OutwardTransactionAvro,
    ) = service.processCreateOutwardTransactionCommand(msg)
}
```
What we achieved with this approach:
you don't specify topics in the listener or the producer
when implementing a listener you have to override the process function, whose signature has the correct Avro object tied to the concrete topic, so there is no room for mistakes there
when sending a message you may provide an idempotency key (or keep it null), but at least you have to think about it
when sending a message you have to provide a messageKey
there is proper logging when a message is sent/received
Logging payload of messages
For ease of debugging, the wrapper logs the payload of each sent/received Kafka message, currently with DEBUG severity. By the end of November at the latest we should remove this and prepare for production, since payloads must not be logged in production because they may contain sensitive data. Maybe we can keep it in tests.
If you want to enable it in your microservice, put this line in your logback.xml file:

```xml
<logger name="ph.safibank.kafka" level="DEBUG"/>
```
List of available topic names
If for any reason you want to use a topic name in your code (for example, you don't use the wrapper, or you need it in tests), this library exposes all topic names in the class KafkaTopicName. Usage:

```kotlin
import ph.safibank.kafka.KafkaTopicName

val topic = KafkaTopicName.ACCOUNT_MAIN_ACTIVATE_TEMP // or whatever topic you want
```