We’ll use Avro to define the shape of messages in our Kafka deployment by default. There will be some exceptions, especially when interfacing with other services (e.g. ThoughtMachine supports only JSON and protobuf, so we’ll use JSON for topics related to TM).
The schema files should live in subfolders (based on domain) of the common/schema/schemas folder in the monorepo.
The list of all available domains is defined in topicSchemasDefinitions.json.
Avro schema rules:
The owner of a schema depends on the message type:
Commands are owned by their consumer
Events and snapshots are owned by their producer
The common/schema/schemas folder contains one subfolder per domain, with the name of the subfolder corresponding to the domain name: common/schema/schemas/<domain>. This subfolder should contain all major versions of each schema within the domain.
Schema naming:
Schema name - the “name” property of the schema. It has to follow the naming conventions for the given message type (snapshot, command or event), which are defined in Kafka message types.
Filename - the filename has to match the schema name.
Namespace - the “namespace” property of the schema is ph.safibank.avro.<domain>, e.g. ph.safibank.avro.transactions.
A schema can be evolved through minor upgrades by modifying the schema file, preserving the compatibility rules specified for each message type. The rules will be enforced in Kafka’s Schema Registry once SM-5427 is done. As of now, the FULL_TRANSITIVE compatibility level is set for all schemas.
Every field should have documentation associated with it, even if trivial.
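As a sketch of the rules above, a schema file at common/schema/schemas/transactions/TransactionSnapshotV1.avsc might look like the following (the fields shown are purely illustrative, not the real transaction schema):

```json
{
  "type": "record",
  "name": "TransactionSnapshotV1",
  "namespace": "ph.safibank.avro.transactions",
  "doc": "Snapshot of the current state of a transaction.",
  "fields": [
    {
      "name": "id",
      "type": { "type": "string", "logicalType": "uuid" },
      "doc": "Unique identifier of the transaction."
    },
    {
      "name": "status",
      "type": { "type": "enum", "name": "TransactionStatusV1", "symbols": ["PENDING", "SETTLED"] },
      "doc": "Current status of the transaction."
    }
  ]
}
```

Note how the “name” matches the filename, the “namespace” follows the ph.safibank.avro.<domain> convention, and every field carries a “doc” string.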
Topic schema management
Assigning schemas to topics happens in the common/schema/topicSchemasDefinitions.json configuration:
“topics” - an array containing the full list of topic configurations, each with:
“name” - (required) the name of the topic. The name has to follow a pattern: TODO
“schema” - (required) the name of the schema assigned to the topic, prefixed with the domain: {domain}.{schemaName}
“type” - (required) the type of the message. Must be one of EVENT | SNAPSHOT | COMMAND.
“schemaCompatLevel” - (required in some cases) the compatibility level of the schema. Must be FORWARD_TRANSITIVE for the EVENT type and BACKWARD_TRANSITIVE for the COMMAND type.
“messageKey” - (required) the type of the message key. Can be either UUID | String | null. Used by the generated Kafka listeners and producers to provide the proper type for the key. null is currently accepted only for backward compatibility; eventually all messages should specify UUID or String.
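Putting the attributes together, a hypothetical entry in topicSchemasDefinitions.json could look like this (topic and schema names are illustrative, and the surrounding file layout may differ):

```json
{
  "topics": [
    {
      "name": "transactions.internal-transaction-created.event.v1",
      "schema": "transactions.InternalTransactionCreatedEventV1",
      "type": "EVENT",
      "schemaCompatLevel": "FORWARD_TRANSITIVE",
      "messageKey": "UUID"
    }
  ]
}
```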
Topic naming
The topic name has to follow the pattern {domain}.{descriptor}.{type}.{version}, where:
everything is lowercase
{domain} - the domain (from the available list) which owns the topic
{descriptor} - a description of the topic; use kebab-case (only lowercase letters separated with dashes are allowed). Ideally, descriptor = schema name (exception: when the same schema is used for multiple topics), but this is not enforced.
{type} - the type of the topic, one of event | snapshot | command. Has to match the “type” in the topic definition.
{version} - the major version, following the pattern v{integer}, starting from v1.
If a new schema version is created due to breaking changes, messages with the new schema should be published to a new topic with an increased {version}. The old topic should be maintained for a while to give consumers time to migrate to the new topic. See the major schema upgrade section for the particular Kafka message type at Kafka message types for guidelines.
There will be a different process for promoting topics and schemas to other environments once SM-6066 is done (until then, SM-6063 takes place).
Deployment flow
A PR should be submitted with the proposed changes to a schema or a new topic definition. A dedicated GitHub workflow (avro-schema-validator) makes sure that the schema and the definition in topicSchemasDefinitions.json are correct and abide by the compatibility rules. In particular, it checks:
proper naming in the definition JSON
each topic name has exactly 3 dots
the version part of the topic name follows the v{integer} pattern
the descriptor part of the topic name consists only of lowercase letters and dashes
the domain part of the topic name matches one of the domains in the definition JSON
the type part of the topic name is one of event/command/snapshot
the type part of the topic name matches the “type” attribute converted to lowercase
the domain part of the topic name matches the domain part of the schema name
the schema name suffix matches the type of the topic + version
Avro file syntax
each schema is assigned only to topics of the same type (see Kafka message types)
schemaCompatLevel is properly set for EVENTs and COMMANDs
schema compatibility against previous schema versions
schema namespace collisions - the schema name is not already used by other Avro files
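The naming checks above amount to matching the topic name against the {domain}.{descriptor}.{type}.{version} pattern. A minimal sketch of such a check follows (not the validator’s actual implementation; the real workflow additionally consults the definition JSON for the list of valid domains):

```java
import java.util.regex.Pattern;

public class TopicNameCheck {
    // {domain}.{descriptor}.{type}.{version}: all lowercase, exactly 3 dots,
    // kebab-case descriptor, type in event|snapshot|command, version v{integer}
    private static final Pattern TOPIC_PATTERN = Pattern.compile(
            "[a-z]+\\.[a-z]+(?:-[a-z]+)*\\.(?:event|snapshot|command)\\.v[1-9][0-9]*");

    public static boolean isValid(String topicName) {
        return TOPIC_PATTERN.matcher(topicName).matches();
    }
}
```

For example, `transactions.internal-transaction-created.event.v1` passes, while a camel-cased descriptor or a version of `v0` does not.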
After a PR is merged, a dedicated GitHub workflow (avro-schema-generator) takes care of creating a new jar file containing the latest versions of every schema. This can be picked up by services through Maven.
The creation of topics and registration of schemas is set up by SRE and done via ArgoCD - https://github.com/SafiBank/SaFiMono/blob/main/devops/argocd/environments/brave/confluent-kafka/topics.
Promotion to higher environment
There will be no actual promotion to a higher environment. Instead, each environment will have its own dedicated topicSchemasDefinitions.json file, corresponding to the desired state of that environment.
Usage
Using generated wrappers
The recommended way to interact with Kafka is to use the listeners/producers automatically generated from the topic & schema definitions, as described in https://github.com/SafiBank/SaFiMono/tree/main/common/schema.
This approach has multiple benefits, for example:
type safety on avro objects
prevention of typos in topic names
common configuration of kafka connection
Using custom clients
In addition to specifying the Kafka server location, clients need to configure additional schema-related properties (URLs should be taken from environment variables):
kafka:
  bootstrap:
    servers: ${KAFKA_URL}
  basic.auth.credentials.source: USER_INFO
  basic.auth.user.info: ${KAFKA_SCHEMA_REGISTRY_CRED}
  producers:
    default:
      key.serializer: org.apache.kafka.common.serialization.UUIDSerializer
      value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
  consumers:
    default:
      key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
      specific.avro.reader: true
default - denotes the default configuration for producers/consumers; if a consumer group is used on the consumer side, the consumer group can be configured instead of (or in addition to) default
key.serializer / key.deserializer - the (de)serializer to be used for the topic key (a primitive value, no schema)
value.serializer / value.deserializer - the KafkaAvro(De)Serializer specified above has to be used in order to benefit from the schema & generated-class capabilities
(for consumers) specific.avro.reader - needs to be set to true to deserialize messages into the specific generated Java classes (instead of the generic Avro record class)
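For illustration, the same consumer settings could be assembled programmatically when building a custom Java client. This is a sketch only: the property keys mirror the YAML above, while the fallback values are placeholders for local development.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    /** Builds schema-registry-aware consumer properties from environment variables. */
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                System.getenv().getOrDefault("KAFKA_URL", "localhost:9092"));
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info",
                System.getenv().getOrDefault("KAFKA_SCHEMA_REGISTRY_CRED", "user:password"));
        props.put("key.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url",
                System.getenv().getOrDefault("SCHEMA_REGISTRY_URL", "http://localhost:8081"));
        // Deserialize into the specific generated classes instead of GenericRecord:
        props.put("specific.avro.reader", "true");
        return props;
    }
}
```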
Folder structure example:
common
└── schema
    └── schemas
        ├── transactions
        │   ├── TransactionSnapshotV1.avsc
        │   ├── InternalTransactionCreatedEventV1.avsc
        │   └── InternalTransactionCreatedEventV2.avsc
        └── messaging
            └── InternalMessageCreatedV1.avsc