SaFi Bank Space : Kafka topic schema management

By default, we use Avro to define the shape of messages in our Kafka deployment. There are some exceptions, especially when interfacing with other services (e.g. ThoughtMachine supports only JSON and protobuf, so we use JSON for topics related to TM).

The schema files should live in per-domain subfolders of the common/schema/schemas folder in the monorepo.

The list of all available domains is in topicSchemasDefinitions.json.

Avro schema rules:

  • The owner of the schema depends on the message type:

    • Commands are owned by their consumer

    • Events and Snapshots are owned by their producer

  • The common/schema/schemas folder contains one subfolder per domain, named after the domain: common/schema/schemas/<domain>. This subfolder should contain all major versions of each schema within the domain.

  • Schema naming:

    • Schema name - the “name” property of the schema. It has to follow the naming conventions for the given message type (snapshot, command or event), which are defined in Kafka message types.

    • Filename - the filename has to match the schema name.

    • Namespace - the “namespace” property of the schema is ph.safibank.avro.<domain>, e.g. ph.safibank.avro.transactions

  • A schema can be evolved through minor upgrades by modifying the schema file, as long as the change preserves the compatibility rules specified for its message type. The rules will be enforced by Kafka’s Schema Registry.

    • This will be in place once SM-5427 is done. As of now, the FULL_TRANSITIVE compatibility level is set for all schemas.

  • Every field should have documentation associated with it, even if trivial
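The naming and documentation rules above can be checked mechanically. The following is a minimal sketch, not the actual validator: the function name check_schema_file and the example field names are hypothetical, and only top-level fields are inspected for documentation.

```python
import json
import os

NAMESPACE_PREFIX = "ph.safibank.avro."


def check_schema_file(domain, filename, schema_text):
    """Return a list of rule violations for one .avsc file (empty if none)."""
    schema = json.loads(schema_text)
    problems = []

    # The filename (without extension) has to match the schema "name".
    expected_name = os.path.splitext(filename)[0]
    if schema.get("name") != expected_name:
        problems.append(f"name does not match filename {filename!r}")

    # The namespace is ph.safibank.avro.<domain>.
    expected_namespace = NAMESPACE_PREFIX + domain
    if schema.get("namespace") != expected_namespace:
        problems.append(f"namespace should be {expected_namespace!r}")

    # Every field needs a non-empty "doc" (this sketch checks top-level fields only).
    for field in schema.get("fields", []):
        if not field.get("doc"):
            problems.append(f"field {field.get('name')!r} has no doc")

    return problems


# Hypothetical schema content, for illustration only.
EXAMPLE = """
{
  "type": "record",
  "name": "TransactionSnapshotV1",
  "namespace": "ph.safibank.avro.transactions",
  "fields": [
    {"name": "id", "type": "string", "doc": "Unique transaction identifier"},
    {"name": "amount", "type": "string"}
  ]
}
"""

print(check_schema_file("transactions", "TransactionSnapshotV1.avsc", EXAMPLE))
# prints ["field 'amount' has no doc"]
```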

Topic schema management

Schemas are assigned to topics in the common/schema/topicSchemasDefinitions.json configuration file:

  • “topics” - array containing the full list of topic configurations, each containing:

    • “name” - (required) the name of the topic. The name has to follow the pattern described in the Topic naming section.

    • “schema” - (required) the name of the schema to be assigned to the topic, prefixed with the domain: {domain}.{schemaName}

    • “type” - (required) the type of the message. Must be one of EVENT | SNAPSHOT | COMMAND

    • “schemaCompatLevel” - (required for EVENT and COMMAND types) compatibility level of the schema. Must be FORWARD_TRANSITIVE for EVENT type and BACKWARD_TRANSITIVE for COMMAND type.

    • “messageKey” - (required) the type of the message key. One of UUID | String | null. Used by the generated Kafka listeners and producers to provide the proper type for the key. null is currently accepted only for backward compatibility; eventually all messages should have UUID or String specified.
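As an illustration of the attributes above, here is a rough sketch of one topic entry and a check of its required attributes. The entry is written as a Python dict mirroring the JSON; the function name check_topic_entry is hypothetical, the example topic is made up from schema names used elsewhere on this page, and treating null as JSON null (None) is an assumption.

```python
REQUIRED_KEYS = ("name", "schema", "type", "messageKey")
MESSAGE_TYPES = {"EVENT", "SNAPSHOT", "COMMAND"}
# Compatibility level required per message type; no level is listed for SNAPSHOT.
REQUIRED_COMPAT = {"EVENT": "FORWARD_TRANSITIVE", "COMMAND": "BACKWARD_TRANSITIVE"}
# ASSUMPTION: "null" in the definition file is a JSON null, i.e. None in Python.
MESSAGE_KEYS = {"UUID", "String", None}


def check_topic_entry(entry):
    """Return a list of problems found in one element of the "topics" array."""
    problems = [f"missing required key {key!r}" for key in REQUIRED_KEYS if key not in entry]

    message_type = entry.get("type")
    if "type" in entry and message_type not in MESSAGE_TYPES:
        problems.append("type must be one of EVENT, SNAPSHOT, COMMAND")

    expected_compat = REQUIRED_COMPAT.get(message_type)
    if expected_compat and entry.get("schemaCompatLevel") != expected_compat:
        problems.append(f"schemaCompatLevel must be {expected_compat} for {message_type}")

    # The schema reference is {domain}.{schemaName}, so it contains exactly one dot.
    if "schema" in entry and entry["schema"].count(".") != 1:
        problems.append("schema must look like {domain}.{schemaName}")

    if "messageKey" in entry and entry["messageKey"] not in MESSAGE_KEYS:
        problems.append("messageKey must be UUID, String or null")

    return problems


# Hypothetical entry, for illustration only.
entry = {
    "name": "transactions.internal-transaction-created.event.v1",
    "schema": "transactions.InternalTransactionCreatedEventV1",
    "type": "EVENT",
    "schemaCompatLevel": "FORWARD_TRANSITIVE",
    "messageKey": "UUID",
}
print(check_topic_entry(entry))  # prints []
```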

Topic naming

  • Topic name has to follow the pattern {domain}.{descriptor}.{type}.{version}, where:

    • everything is lowercase

    • {domain} - the domain (from the available list) which owns the topic.

    • {descriptor} - description of the topic.

      • use kebab-case. Only lowercase letters separated with dashes are allowed

      • ideally, descriptor = schema (exception: when the same schema is used for multiple topics), but this is not enforced

    • {type} - type of the topic, one of event | snapshot | command. Has to match the “type” in the topic definition, converted to lowercase

    • {version} - major version, following the pattern v{integer} starting from v1

  • If a new schema version is created due to breaking changes, messages with the new schema should be published to a new topic with an increased {version}. The old topic should be maintained for a while to give consumers time to move to the new topic. For guidelines, see the major schema upgrade section for the particular message type in Kafka message types.

  • There will be a different process for promoting topics and schemas to other environments once SM-6066 is done (until then, SM-6063 applies).
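The naming pattern above maps naturally onto a regular expression. This is only a sketch: the function names are hypothetical, and the character set allowed in {domain} (lowercase letters and dashes) is an assumption, since the real validator checks the domain against the list in the definitions file.

```python
import re

# {domain}.{descriptor}.{type}.{version}
# ASSUMPTION: a domain consists of lowercase letters and dashes only.
TOPIC_NAME = re.compile(
    r"(?P<domain>[a-z-]+)"
    r"\.(?P<descriptor>[a-z]+(?:-[a-z]+)*)"  # kebab-case, lowercase letters only
    r"\.(?P<type>event|snapshot|command)"
    r"\.v(?P<version>[1-9][0-9]*)"  # v{integer}, starting from v1
)


def parse_topic_name(name):
    """Split a topic name into its parts, or return None if it breaks the pattern."""
    match = TOPIC_NAME.fullmatch(name)
    if match is None:
        return None
    parts = match.groupdict()
    parts["version"] = int(parts["version"])
    return parts


def next_major_topic(name):
    """Name of the topic to publish to after a breaking schema change."""
    parts = parse_topic_name(name)
    if parts is None:
        raise ValueError(f"not a valid topic name: {name!r}")
    return f"{parts['domain']}.{parts['descriptor']}.{parts['type']}.v{parts['version'] + 1}"


print(next_major_topic("transactions.internal-transaction-created.event.v1"))
# prints transactions.internal-transaction-created.event.v2
```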

Deployment flow

  1. A PR should be submitted with the proposed changes to a schema or a new topic definition. A dedicated GitHub workflow (avro-schema-validator) makes sure that the schema and the definition in topicSchemasDefinitions.json are correct and abide by the compatibility rules. In particular, it checks:

    1. proper naming in definition json

      1. each topic name has exactly 3 dots

      2. version part of topic name follows the v{integer} pattern

      3. descriptor part of topic name consists only of lowercase letters and dashes

      4. domain part of topic name matches one of domains in definition json

      5. type part of topic name is one of event/command/snapshot

      6. type part of topic name matches type attribute converted to lowercase

      7. domain part of topic name matches domain part of schema name

      8. schema name suffix matches type of topic + version

    2. avro file syntax

    3. whether each schema is assigned only to topics of the same type (see Kafka message types)

    4. whether schemaCompatLevel is properly set for EVENTs and COMMANDs

    5. schema compatibility against previous schema versions

    6. schema name collision - whether the schema name is already used by other Avro files

  2. After a PR is merged, a dedicated GitHub workflow (avro-schema-generator) creates a new jar file containing the latest versions of every schema. Services can pick it up through Maven.

  3. Creation of topics and registration of schemas is set up by SRE and done via argocd - https://github.com/SafiBank/SaFiMono/blob/main/devops/argocd/environments/brave/confluent-kafka/topics.
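To make the cross-checks in step 1 more concrete, here is a rough sketch of how the checks that compare the topic name with the “type” and “schema” attributes, and the same-type check for shared schemas, could look. This is not the avro-schema-validator implementation. The function names are hypothetical, and the expected schema name suffix (for example EventV1 for an event topic at v1) is an assumption inferred from the schema filenames used on this page.

```python
from collections import defaultdict


def cross_check_topic(topic):
    """Compare the parts of a topic name with its "type" and "schema" attributes."""
    parts = topic["name"].split(".")
    if len(parts) != 4:  # a valid name has exactly 3 dots
        return ["topic name must contain exactly 3 dots"]
    name_domain, _descriptor, name_type, name_version = parts
    schema_domain, _, schema_name = topic["schema"].partition(".")

    problems = []
    if name_type != topic["type"].lower():
        problems.append("type in topic name does not match the type attribute")
    if name_domain != schema_domain:
        problems.append("domain in topic name does not match the schema domain")
    # ASSUMPTION: the suffix is the capitalised type plus the upper-cased version.
    expected_suffix = name_type.capitalize() + name_version.upper()
    if not schema_name.endswith(expected_suffix):
        problems.append(f"schema name should end with {expected_suffix}")
    return problems


def schemas_with_mixed_types(topics):
    """Schemas that are assigned to topics of more than one message type."""
    types_by_schema = defaultdict(set)
    for topic in topics:
        types_by_schema[topic["schema"]].add(topic["type"])
    return sorted(schema for schema, types in types_by_schema.items() if len(types) > 1)


# Hypothetical entry, for illustration only.
good = {
    "name": "transactions.internal-transaction-created.event.v1",
    "schema": "transactions.InternalTransactionCreatedEventV1",
    "type": "EVENT",
}
print(cross_check_topic(good))  # prints []
```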

Promotion to higher environment

There will be no actual promotion to a higher environment. Instead, each environment will have its own dedicated topicSchemasDefinitions.json file, which corresponds to the desired state of that environment.

Usage

Using generated wrappers

The recommended way to interact with Kafka is to use the listeners/producers automatically generated from the topic and schema definitions, as described in https://github.com/SafiBank/SaFiMono/tree/main/common/schema .

This approach has multiple benefits, for example:

  • type safety on avro objects

  • prevention of typos in topic names

  • common configuration of kafka connection

Using custom clients

In addition to specifying the Kafka server location, clients need to configure additional schema-related properties (URLs should be taken from env vars):

kafka:
  bootstrap:
    servers: ${KAFKA_URL}
  # credentials for the Schema Registry
  basic.auth.credentials.source: USER_INFO
  basic.auth.user.info: ${KAFKA_SCHEMA_REGISTRY_CRED}
  producers:
    default:
      # the key is a primitive value without a schema
      key.serializer: org.apache.kafka.common.serialization.UUIDSerializer
      value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
  consumers:
    default:
      key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
      # deserialize into the generated class instead of a generic record
      specific.avro.reader: true

  • default - denotes the default configuration for producers/consumers; if a consumer group is used on the consumer side, the consumer group can be configured instead of or in addition to it

  • key.serializer / key.deserializer - (de)serializer to be used for topic key (primitive value, no schema)

  • value.serializer / value.deserializer - the KafkaAvro(De)Serializer specified above has to be used in order to use the schema and generated-class capabilities

  • (for consumer) specific.avro.reader - needs to be set to true to deserialize messages into the specific generated Java class (instead of the generic Avro message class)

Folder structure example:

common
  schema
    schemas
      transactions
        TransactionSnapshotV1.avsc
        InternalTransactionCreatedEventV1.avsc
        InternalTransactionCreatedEventV2.avsc
      messaging
        InternalMessageCreatedV1.avsc
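
Since each domain folder keeps every major version of a schema, it can be handy to find the latest major version per schema. The sketch below does this from file paths alone. It assumes, based on the filenames in the example above, that the major version is the trailing V{integer} of the filename; the function name latest_major_versions is hypothetical.

```python
import re
from pathlib import PurePosixPath

# ASSUMPTION: schema files are named {SchemaBaseName}V{major}.avsc.
SCHEMA_FILE = re.compile(r"(?P<base>.+)V(?P<major>[1-9][0-9]*)\.avsc")


def latest_major_versions(paths):
    """Map (domain, schema base name) to the highest major version found."""
    latest = {}
    for path in map(PurePosixPath, paths):
        match = SCHEMA_FILE.fullmatch(path.name)
        if match is None:
            continue  # not a versioned schema file
        key = (path.parent.name, match["base"])  # parent folder is the domain
        latest[key] = max(latest.get(key, 0), int(match["major"]))
    return latest


paths = [
    "common/schema/schemas/transactions/TransactionSnapshotV1.avsc",
    "common/schema/schemas/transactions/InternalTransactionCreatedEventV1.avsc",
    "common/schema/schemas/transactions/InternalTransactionCreatedEventV2.avsc",
    "common/schema/schemas/messaging/InternalMessageCreatedV1.avsc",
]
for (domain, base), major in sorted(latest_major_versions(paths).items()):
    print(domain, base, major)
```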