GitHub: https://github.com/SafiBank/SaFiMono/tree/main/common/schema

Overview

A common module for working with Avro schemas used in communication with Kafka.

For a good understanding of the message types, naming conventions, and how to perform schema upgrades, consult Kafka message types and Kafka topic schema management.

Structure

The list of Kafka topics can be found in https://github.com/SafiBank/SaFiMono/blob/main/common/schema/topicSchemasDefinitions.json.

Each topic entry has the following structure:

{
      "name": "transactions.airtimeload-transaction-occurred.event.v1",
      "schema": "transactions.AirtimeLoadTransactionOccurredEventV1",
      "oldName": "transaction.airtime_load",
      "oldSchema": "transaction.AirtimeLoadTransactionAvro",
      "ablyChannel": "",
      "messageKey": "UUID",
      "type": "EVENT",
      "schemaCompatLevel": "FORWARD_TRANSITIVE"
},

The meaning of each field is explained here: https://safibank.atlassian.net/wiki/spaces/ITArch/pages/23199927/Kafka+topic+schema+management#Topic-schema-management

Each Avro schema file must be placed in a folder named after the domain it belongs to; the file name consists of the schema name and a version suffix (an increment starting at 1):

schemas/<DOMAIN>/<SCHEMA_NAME>V<VERSION>.avsc

For example:

schemas/transactions/IntrabankTransactionOccurredEventV1.avsc
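
For illustration, a minimal record schema in such a file might look like this (the field names and the exact namespace convention are assumptions, not taken from the repository):

{
  "type": "record",
  "name": "IntrabankTransactionOccurredEventV1",
  "namespace": "transactions",
  "fields": [
    { "name": "transactionId", "type": { "type": "string", "logicalType": "uuid" } },
    { "name": "amount", "type": "string" },
    { "name": "occurredAt", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}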

Usage

Validation phase

Validation occurs automatically when a pull request containing new files under common/schema/schemas is created. The GH Action avro-schema-validator is triggered in that case. For each new file, the following scripts are executed:

  • avroSchemaSyntaxValidator

    • syntax check - whether the file contains a valid Avro schema

    • if the check fails, the committer should revise and fix the schema file

  • avroSchemaCompatibilityCheck

    • schema compatibility check - asks the schema registry whether the schema is compatible with the previous versions (if any) for the given subject ("subject" is schema registry terminology; for our use cases we use it interchangeably with "topic" from Kafka terminology)

    • if the check fails, the committer should either modify the change so that it is compatible (see the sketch below) or create a new topic for the non-compatible schema
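
To make compatibility concrete: under a FORWARD compatibility level, adding a field is a compatible change, because consumers still reading with the old schema simply ignore it, while removing a field that the old schema declared without a default is not. A minimal sketch of a compatible evolution (field names are illustrative):

// fields in the previous schema version
"fields": [
  { "name": "transactionId", "type": "string" }
]

// fields in the new version; the added optional field is forward-compatible
"fields": [
  { "name": "transactionId", "type": "string" },
  { "name": "channel", "type": ["null", "string"], "default": null }
]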

For the whole schemas folder, the following script is executed:

Code generation, topic creation & schema registration phase

Topic creation and schema registration logic was moved from avro-model to argocd/helm responsibility. For more info, refer to Kafka topic schema management.

Code generation is meant to ease development related to producing and consuming messages to/from Kafka. The Avro schema files are the source of truth for the generated classes.

The GH action avro-schema-generator is triggered whenever anything changes under the common/schema folder. It invokes the avroClassGenerator.sh script, which creates Java classes from all existing Avro files, packages them as a jar library, and publishes the result to the SaFi Maven repository as the ph.safibank.common:avro-model artifact (which microservices can then depend on to use the generated classes).
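
A consuming microservice can then declare the dependency as usual; a minimal sketch (the version is a placeholder):

dependencies {
    implementation("ph.safibank.common:avro-model:<version>")
}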

We use gradle-avro-plugin to generate Java classes from Avro files; it enhances the Gradle build task with the desired functionality (the plugin is applied in the Gradle build under common/schema).
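
For reference, applying the plugin in a Gradle Kotlin DSL build script looks roughly like this (assuming the davidmc24 distribution of gradle-avro-plugin; the version shown is illustrative):

plugins {
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

// the plugin adds tasks such as generateAvroJava and hooks them into the build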

Local development

When developing a new feature that depends on classes generated from an Avro schema, it is inconvenient to run the full circle (pull request for the new schema into main, merge to main). It is advised to validate a newly added schema locally by running the following script from the common/schema folder:

./localAvroValidator.sh /path/to/avro/file

This validator both validates the syntax of the schema file and runs a namespace collision check (whether two classes with the same package and name would be generated).

If validation passes, classes can be generated locally by running the following script from the common/schema folder:

./localAvroGenerator.sh

which generates the artifact and stores it in the local .m2 repository. In the module that should depend on this new schema, simply add (as the first entry, since Gradle checks repositories in sequence)

mavenLocal()

to the repositories section (do not commit this to the remote repo).
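
A minimal sketch of the resulting repositories section (the second entry stands in for whatever remote repositories the module already uses):

repositories {
    mavenLocal() // must be first: Gradle searches repositories in order
    mavenCentral()
}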

Generation of Kafka producer and listeners for micronaut

In addition to generating Java classes for data serialization, we also want to provide helpers for listeners and producers. Design considerations for what they should address:

  • currently, there is no validation that you use the correct topic with the correct message key type and the correct data object

  • there is no uniform logging across services provided out of the box

  • currently, no service (except the statement manager) properly sends an idempotency key (some services call the Kafka message key an idempotency key, which is not always correct), and we would like to move away from this state

The helper classes are generated by the Gradle task generateMicronautKafkaWrapers, which takes the templates ListenerTemplate.txt and ProducerTemplate.txt and, for each topic in topicSchemasDefinitions.json, generates a helper class into the library that contains the Avro wrappers.

For topics that have the flag "delete": true, this process is skipped.

Also, for topics whose "messageKey" is neither "String" nor "UUID", we don't generate helpers.
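
For illustration, a topic entry carrying the "delete" flag (and therefore skipped by wrapper generation) might look like this; all values here are hypothetical:

{
      "name": "transactions.some-deprecated-topic.event.v1",
      "schema": "transactions.SomeDeprecatedEventV1",
      "messageKey": "UUID",
      "type": "EVENT",
      "schemaCompatLevel": "FORWARD_TRANSITIVE",
      "delete": true
},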

This is how it looks if you want to use the producer helper:

import ph.safibank.kafka.pesonetgateway.TransactionsPesonetgatewayCreateOutwardTransactionTempProducer

@KafkaClient("some_kafka_setting_if_necessary")
abstract class OutwardTransactionProducer : TransactionsPesonetgatewayCreateOutwardTransactionTempProducer()

// in another file
class ServiceName(
    private val producer: OutwardTransactionProducer,
) {
    // ... somewhere in the service logic:
    fun sendOutwardTransaction(idempotencyKey: String?, messageKey: UUID, messageAvro: OutwardTransactionAvro) =
        producer.send(idempotencyKey, messageKey, messageAvro)
}

This is how it looks if you want to use the listener helper:

import ph.safibank.kafka.pesonetgateway.TransactionsPesonetgatewayCreateOutwardTransactionTempListener

@KafkaListener("some_kafka_setting_if_necessary")
class OutwardTransactionListener1(
    private val service: Service,
) : TransactionsPesonetgatewayCreateOutwardTransactionTempListener() {
    override fun process(
        _idempotencyKey: String?, 
        _messageKey: UUID,  
        msg: OutwardTransactionAvro) =
        service.processCreateOutwardTransactionCommand(msg)
}

What we achieved with this approach:

  • you don't specify topics in the listener, nor in the producer

  • when implementing a listener you have to override the process function, which takes the correct Avro object tied to the concrete topic, so no mistake is possible there

  • when sending a message you may provide an idempotency key (or keep it null), but at least you have to think about it

  • when sending a message you have to provide a messageKey

  • there is proper logging when we send/receive messages

Logging payload of messages

For ease of debugging, the wrapper logs the payload of every sent/received Kafka message, currently with DEBUG severity. By the end of November at the latest we should remove this and prepare for production, since payloads must not be logged in production because they may contain sensitive data. We may keep it in tests.

If you want to enable it in your microservice, add this line to your logback.xml file:

<logger name="ph.safibank.kafka" level="DEBUG"/>
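
If you are unsure where it belongs, the logger element goes directly inside the top-level configuration element:

<configuration>
    <!-- existing appenders and other loggers ... -->
    <logger name="ph.safibank.kafka" level="DEBUG"/>
</configuration>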

List of available topic names

If for any reason you want to use a topic name in your code, for example because you don't use the wrappers or you need it in tests, this library exposes all topic names in the KafkaTopicName class. Usage:

import ph.safibank.kafka.KafkaTopicName

val topic = KafkaTopicName.ACCOUNT_MAIN_ACTIVATE_TEMP // or whatever topic you want