Kafka message types

Microservices generate Kafka messages, but not all messages are created equal.

There are three categories of messages: Commands, Events and Snapshots. Each of them is described in more detail below.

Commands

Commands are messages owned by the consumer: much like REST APIs, they offer producers a way to invoke a specific action on the service (consumer). Hence, the relationship is many (producers) to one (consumer). They imperatively state what should happen and carry the data needed to execute the action, e.g. “send_transaction” with data about the sender and the recipient.

Typically, the topic should be either:

  • a private topic for the service, where only that single service publishes and consumes. The service would expose a REST API where it validates the authenticity of the request and then pushes it to this command topic. The topic would be used only for the benefit of the consumer to achieve:

    • retryability of the request on the consumer side

    • decoupling between the client and service from the performance perspective

  • a topic exposed to other services for publishing, but with the owning service being a single consumer

Message key

Commands should use a key that reflects the order in which the caller (producer) expects the consumer to process the requests. This could be an entity key if the command changes a specific entity, or a key of the owner of the process (e.g. customerId or accountId).

Naming

Pattern: {ImperativeVerb}{Object}Command{Version} where:

  • {ImperativeVerb}{Object} is a short description in imperative form describing the action that should be taken on the Object

  • {Version} is a major version of the schema, following the pattern V{integer}

Examples: CreateCardCommandV1, BlockCardCommandV1, SubmitTransactionCommandV2, …

Data privacy implications

The topic is not log compacted and has a fairly short retention period (e.g. 7-15 days). All data published to the topic (including PII data) is subject to deletion after the retention period.

Schema evolution

Minor upgrades - BACKWARD_TRANSITIVE

Since the schema is owned by the consumer, it needs to evolve much like the request schema of a REST API endpoint. The consumer upgrades first, as it first needs to start offering the new possibilities to the producers. Because it does not want to break its old producers, the new reader schema (consumer) needs to understand messages written by older producers, yielding the BACKWARD_TRANSITIVE schema registry compatibility type.

How to do minor, compatible upgrades in commands

As mentioned in the previous paragraph, commands are similar to REST API requests in that they are owned by the consumer and provide a means of triggering the consumer’s API.

Hence, the backward compatible changes are:

  • Add a new optional field - in this case, the consumer starts offering new functionality (for this field) but does not force all producers to use it (since it’s optional) - hence, it’s a change compatible with previous versions. Just keep in mind that since the field is optional, messages from producers which haven’t migrated to the latest version will be read with the DEFAULT value.

    • Example: Consider an OnboardCustomerCommand which contains only customerId. The service currently always creates a card, but going forward it would like to make the card creation optional and receive the choice as part of the command. The schema owner can add a new optional field createCard with "type": "boolean" and "default": true. This means that if the message was produced by a producer using the old schema, the consuming service reads the field as “true” and creates the card as before. New producers can leverage the new schema and optionally disable the card creation (see the schema sketch after this list).

  • Remove an existing required/optional field - in such case, the consumer will simply stop considering the field, regardless of whether it was published by the producer or not - hence, it’s a compatible change with previous versions.

    • Example: Let’s use an example similar to the one for adding an optional field. Consider that the schema already contains both the customerId and the createCard boolean (this time even as a required field, i.e. with no default). If the schema owners wish to remove this functionality and create cards for everyone, they can simply remove the createCard field. Producers using the old schema will still send the field, but the consumer using the new schema will just ignore it - the new functionality on the consumer side works the same regardless of what is set in this field.
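
A minimal sketch of how the hypothetical createCard field from the example above could be added to the command schema (the field name and doc are illustrative, not an actual SaFi schema):

{
  "name": "createCard",
  "doc": "Whether a card should be created during onboarding; old producers fall back to the previous behaviour.",
  "type": "boolean",
  "default": true
},

Messages produced with the old writer schema do not contain the field, so the consumer’s new reader schema fills in the default true and keeps creating cards as before.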

Note that a non-breaking (minor) change in commands schema should follow rules very similar to those when making a non-breaking change in a REST API request structure of an endpoint.

Major schema upgrades

As per the decision in Avro major version upgrade strategy, commands should follow a consumer-first upgrade in separate topics. A high level description of the process follows (very similar to REST API major upgrades):

  1. The owner of the schema (the consumer) creates a new schema and topic.

  2. The consumer creates a new listener on the V2 topic and supports consuming both V1 and V2 versions of the command.

  3. The clients (producers) can switch one by one and start sending V2 commands during a transition period.

  4. After the clients are switched to V2, the consumer can stop accepting V1 messages and remove the V1 topic.

Idempotency

Executing a command is an operation which is not idempotent by nature. It can create/modify a resource, or start a process. Hence, commands should be processed by the consumer at most once with respect to the idempotency key. The idempotency key should be provided in a message header named X-Idempotency-Key with a UUID value.

Events

Events are messages about “something that happened”. They would be emitted every time something (semantically significant, from a business perspective) happens in a service, for instance “CustomerProfileUpdated” or “UserLoginSuccessful”, including some relevant data. Consumers can choose to listen to these events and kick off their business logic - essentially leveraging a decoupled choreography of events. Note that the originating service does not necessarily need to know about its consumers, and the retry responsibility is not on the service but on Kafka. Once the event is published, it’s fire-and-forget for the publisher.

The schemas of events are owned by the producer, as there is only a single producer for each event. Hence, the relationship is one (producer) to many (consumers).

Message key

Much like commands, events should use a key based on which consumers expect the stream of events to be ordered. For example, if the events are related to an entity, the key should be the entity key. Most of the time, customerId is a good fit if available.

If the relation between entities is A 1-* B 1-* C, then using the C entity key as the message key does not guarantee ordering of all events related to A. Conversely, using the A entity key as the message key also guarantees that events for C entities transitively related to that A are ordered.

However, the more granular the key (closer to C), the more even the distribution of messages across partitions. So it is a tradeoff between performance and consistency.

Naming

Pattern: {Object}{PastTenseVerb}Event{Version} where:

  • {Object}{PastTenseVerb} is a short description, in past tense, of an action that happened to the Object

  • {Version} is a major version of the schema, following the pattern V{integer}

Examples: CustomerProfileUpdatedEventV1, OverdraftCreatedEventV2, TransactionCreatedEventV4, CardBlockedEventV1, …

Data privacy implications

The topic is not log compacted and has a fairly short retention period (e.g. 7-15 days). All data published to the topic (including PII data) is subject to deletion after the retention period.

Schema evolution

Minor upgrades - FORWARD_TRANSITIVE

Since the schema is owned by the producer, and we want to keep the producer of the event as decoupled from the consumers as possible, the producer (as owner of the schema) needs to be able to evolve independently. And as it is the owner, it needs to evolve first. Hence, the old reader schemas in consumers need to be able to consume messages published with new writer schemas, which means that changes in the schema need the FORWARD_TRANSITIVE schema compatibility type.

How to do minor, compatible upgrades in events

As mentioned in the previous paragraph, the consumers having older reader schemas need to be able to understand the new writer schema. Hence a forward transitive compatible change is:

  • Add a new required/optional field - in this case, a consumer with an older reader schema simply won’t consider the new field added to the event, hence the change is forward compatible. Once the consumer upgrades, it can start considering the newly added field.

    • Example: Consider a CustomerStatusUpdatedEvent which contains information about the previousStatus and newStatus for a customerId. Let’s assume that the schema owners want to add a new field statusUpdatedAt going forward. The owners will enhance the schema with this new (required) field, upgrade the producer to use it, and then allow the consumers to start using it as well. Old consumers reading messages written with the new schema simply don’t have the field defined, hence they ignore it when processing. Consumers which upgrade to the new schema can leverage the new functionality of the event (see the schema sketch after this list).

  • Remove an old optional field - in this case, once the producer stops sending the optional field, the consumer will use its DEFAULT value instead. The DEFAULT needs to be sensible for this to work properly (since it will actually be used by the consumers if the field is removed).

    • Example: TODO
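
A minimal sketch of how the hypothetical statusUpdatedAt field from the add-field example above could look in the event schema (the field name and doc are illustrative, not an actual SaFi schema):

{
  "name": "statusUpdatedAt",
  "doc": "When the status change happened.",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
},

Old reader schemas simply skip fields they don’t know, so adding this required field is forward compatible; only consumers that upgrade their reader schema start using it.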

Note that a non-breaking (minor) change in event schema should follow rules very similar to those when making a non-breaking change in a REST API response structure of an endpoint.

Major schema upgrades

As per the decision in Avro major version upgrade strategy, the major schema upgrades in events should follow the producer-first upgrade in separate topics. A high level description of the process follows:

  1. The owners of the schema (producer) will create a new schema and register a new topic.

  2. The producer (owner) will first start publishing both the V1 and V2 events to the separate topics.

  3. The clients (consumers) can start accepting V2 instead of V1 during the transition period.

  4. Once all consumers accept the new V2, the producer can stop sending V1 events and remove the V1 topic.

Idempotency

An event which states that “something has happened” is not idempotent by nature - the same effect could have happened twice. Hence, to identify the same event being processed twice, events have to contain an idempotency key, and the consumer should ensure that such an event is processed at most once with respect to that key. The idempotency key should be provided in a message header named X-Idempotency-Key with a UUID value.

Snapshots

Snapshots are messages containing a view of a specific entity, published to a Kafka topic upon every change of that entity. A snapshot doesn’t state what happened, but provides an up-to-date view of what the entity looks like.

Snapshots would be used:

  • to create materialized views by different services wishing to query such entities

  • to build the Datalake to serve reporting

It also allows us to create a service (or an ETL job in the datalake) which provides a full history of changes for any entity that we maintain in the bank.

Consuming this message should not start any new business process; it should rather be used to build the materialized view. It is very similar to a distributed read-only DB replica.

Message key

Snapshot messages are required to use their primary entity key as the message key, as they represent the evolution of the specific entity (not its owner, nor any related entity) over time.

Naming

Pattern: {Object}Snapshot{Version} where:

  • {Object} is the entity (a noun) that the owning service provides to the rest of the system to materialize

  • {Version} is a major version of the schema, following the pattern V{integer}

Examples: CustomerProfileSnapshotV1, OverdraftSnapshotV2, TransactionSnapshotV2, CardSnapshotV1, …

A Snapshot entity does not need to map 1-to-1 to a DB model. The Snapshot entity should provide a business representation of the full entity and does not necessarily need to respect the split into multiple tables in the DB.

For instance, if CustomerProfile is split into:

  • Profile

  • Address

  • PhoneNumber

It makes sense for the Snapshot of CustomerProfile to represent a joined view across the models which extend the “Profile”, especially if it’s possible to change fields in two of the models at once. A sketch of such a joined snapshot follows.
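
A minimal sketch of what such a joined snapshot record could look like (the record name, namespace and fields are illustrative, not an actual SaFi schema):

{
  "type": "record",
  "name": "CustomerProfileSnapshotV1",
  "namespace": "example.snapshots",
  "fields": [
    { "name": "customerId", "type": "string" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "AddressV1",
        "fields": [
          { "name": "street", "type": "string" },
          { "name": "city", "type": "string" }
        ]
      }
    },
    {
      "name": "phoneNumbers",
      "type": { "type": "array", "items": "string" },
      "default": []
    }
  ]
}

Every change in any of the underlying Profile, Address or PhoneNumber models would result in one new snapshot message of this joined shape, keyed by customerId.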

Topic configuration

In order to allow new consumers to create a full materialized view of all entities within the snapshot topic, the topic needs to have log compaction enabled. This saves data storage and aids performance of the initial sync when a new consumer is added.

Log compacted topics retain only the latest message per key. Hence, the snapshot message should use its primary entity key as the message key when being published to the topic.

Schema evolution

Minor upgrades - FULL_TRANSITIVE

A new consumer with the latest reader schema should be able to read all messages published in the topic, for the reason mentioned above. These messages were potentially published with historical writer schemas - hence, the latest reader schema needs to understand all previous schemas. This implies backward transitive compatibility. Additionally, as the producer of snapshots should be decoupled from its consumers, we want the producer to upgrade independently. And as it owns the schema, it is expected to upgrade first. This implies forward transitive compatibility. As the evolution needs to be both backward and forward compatible transitively, the compatibility setting for snapshot topics should be the FULL_TRANSITIVE schema compatibility type.

How to do minor, compatible upgrades in snapshots

Unlike the other types of messages, the snapshots need to be compatible both ways as mentioned in the previous paragraph. Hence, a full transitive compatible change is:

  • Add a new optional field - since the field is added with a DEFAULT value, new consumers reading older messages will just use the DEFAULT when reading it. Old consumers reading new messages won’t even consider the field.

    • Example: Consider an AccountSnapshot containing some list of fields that are currently being sent, and the owners wanting to add a new timestamp, for instance accountClosedAt. The new field can be freely added as long as it contains a DEFAULT value - the only question is what a viable default is. As the snapshots should represent an entity in the DB, the new field’s default should be the “same” as was used when creating the DB migration. Let’s assume it’s a newly added field which will be filled only for newly closed accounts. Hence, the field should be nullable - have a union type of timestamp and null as possible values (because the old accounts have null in the DB) and a DEFAULT value of null. A new consumer reading the old messages will read accountClosedAt as null, because that’s what is also in the DB.

  • Remove an old optional field - since the field had a DEFAULT value defined, old consumers reading a message from the new publisher will simply use the DEFAULT. New consumers don’t have the field defined and will ignore it in the old messages.

    • Example: Consider the AccountSnapshot again, and let’s assume we want to remove the optional accountClosedAt field, most probably because we have also removed that field from the DB. Now we want to create the same effect for the consumers of snapshots. Simply removing the field would mean that old consumers reading new messages use the DEFAULT instead - if a sensible DEFAULT value was chosen when the field (the column) was added, such as null, then old consumers would read null for all newly published messages. Which is exactly what happened in the DB - the column was removed and is now (basically) null. New consumers reading old messages don’t have the field defined in their schema and hence will simply ignore the field in the message.

Major schema upgrades

As per the decision in Avro major version upgrade strategy, major upgrades in schemas for snapshots should follow the producer-first upgrade in separate topics strategy. A high level description of the process follows:

  1. The owner of the snapshot schema (producer) will create a new major version of snapshot schema and a new topic.

  2. The producer of the snapshot starts publishing the snapshots of both versions, V1 and V2, to two separate topics.

  3. During a transition period, the consumers upgrade their listeners to listen on V2 instead of V1. The switch may be atomic, but it’s not required. A consumer can listen on both V1 and V2 until it is fully up to date with V2, to avoid any lag.

  4. After all consumers have transitioned, the producer will “republish” the snapshots of entities into the new V2 topic so that the consumers of the new topic have full information.

  5. The producer can stop sending V1.

TODO - provide more details on how to do the “republish” job.

Data privacy implications

Since the topic retains the latest snapshot of every entity and the entities may contain PII data, it stores such PII data indefinitely if left untouched. Based on the data privacy requirements for data storage https://safibank.atlassian.net/wiki/spaces/ITArch/pages/116359472/Data+privacy#Storage , the topic needs to support PII data deletion.

Effective deletion of such data should be possible by sending a “tombstone”, which is a message with the given key but an empty payload (or a payload with the specific fields removed). As log compaction is set up, Kafka also deletes the previously compacted data for that key. Thus, the customer data associated with their key (customerId) is cleaned from the snapshot topic.

Consumers which are building a materialized view should react to a tombstone message by deleting their record of that entity within their materialized view.

Idempotency

A snapshot states that “This is what the entity looked like at some updatedAt time”. It does not state anything about what has happened to get the entity into that state. Processing the information twice should make no difference as:

  • the publisher does not intend to initiate any business process by publishing a snapshot. It only aims to provide a read-only view on the data it maintains.

  • the consumer should not initiate any business processes, as the snapshot does not signal that “something has happened” - that’s what the events are there for.

Hence, snapshots do not need to contain an idempotency key. The publisher may (or may not) include one, but it should be safe for the consumer to ignore it.

Hints for non-breaking upgrades

How to add a nullable, optional field

Let’s assume the field is a timestamp. First, the field’s type has to be a union of null and the actual type (timestamp). Then, on the same level of the JSON as the union type, define a "default": null value.

{
  "name": "onboardedAt",
  "doc": "The time customer has been moved to INACTIVE state",
  "default": null,
  "type": [
    "null",
    {
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
},

The same union pattern can be used for any other type, as sketched below.
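
For instance, an optional nullable string field (the field name and doc are illustrative) would follow the same pattern:

{
  "name": "middleName",
  "doc": "Optional middle name of the customer.",
  "default": null,
  "type": [
    "null",
    "string"
  ]
},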

How to extend an enum with new value

The issue

Extending an enum in a producer-first upgrade is usually considered a breaking change, simply because the consumers won’t be able to parse the value once it is published by the new producer. However, since Avro 1.9.0 there is a way to specify a symbol default, which lets old consumers parse the message even if some fields contain the extended enum values.

Avoiding syntactical breaking change with Default

An enum is essentially a union of values. It can contain a default value as well, and can be extended “non-breakingly” from a syntactic perspective, in the sense that an old reader will be able to read the message even if a new value is published. Consider the following:

{
  "name": "status",
  "type": [
    "null",
    {
      "type": "enum",
      "name": "StatusV1",
      "symbols": [
        "PROSPECT",
        "REJECTED",
        "SUBMITTED",
        "ACTIVE",
        "UNKNOWN"
      ],
      "default": "UNKNOWN"
    }
  ],
  "default": null,
},

The status field is an optional, nullable field (because of the "default": null, which is the field default) and contains an enhanceable enum (because of "default": "UNKNOWN", which is the symbol default). If this enum was enhanced with a new value, such as OFFBOARDED, then:

  • old reader schemas reading a new message containing the OFFBOARDED value would read it as UNKNOWN. The reader (consumer) has properly parsed the message and now has a chance to react. It may not need the value at all, but if it needs it, it can react by throwing an exception - in such case, this upgrade was breaking for the consumer.

  • new reader schemas would read the enum value as OFFBOARDED

As mentioned above, extending enums in messages can still be a breaking change from the semantic perspective for the consumers, even if the symbol default is used. The default gives consumers which are interested in the message, but not necessarily in the specific field, the chance to update a little later.
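
For illustration, the extended enum from the example above could look as follows (only the symbols list changes; the symbol default stays in place):

{
  "type": "enum",
  "name": "StatusV1",
  "symbols": [
    "PROSPECT",
    "REJECTED",
    "SUBMITTED",
    "ACTIVE",
    "OFFBOARDED",
    "UNKNOWN"
  ],
  "default": "UNKNOWN"
}

Old readers map the unknown OFFBOARDED symbol to UNKNOWN; readers with the extended schema see OFFBOARDED.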

Avoiding semantical breaking change with consumer-first upgrade

Having the default value specified for the symbol and parsing “UNKNOWN” instead doesn’t help if the consumer is actually interested in the value. One alternative is to go through the actual major upgrade process, but as these are tedious and lengthy if done right, we would like to avoid them as much as possible.

If there is a need to enhance an enum and there are consumers which use it, then the most viable approach is to leverage a consumer-first upgrade, even for events and snapshots where the producer-first upgrade is usually recommended and preferred.
