GitHub: https://github.com/SafiBank/SaFiMono/tree/main/common/idempotency-lib

Overview

Library that enables idempotent behavior.

General Concepts

Idempotent Phase

The core concept of the library is the idempotent phase: an executable block of code that, for a given set of inputs, executes its body only once, stores the result, and returns the stored (cached) value on subsequent calls.

fun handleSomething(): Response {
    val result = phase {
        executeSideEffects()
    }
    return Response(result)
}

fun main() {
    handleSomething() // Executes the side effects
    handleSomething() // Does not execute the side effects
}

As is common with caches, each stored value needs an identifier. This identifier is called phaseId, and it is a required parameter of every phase.

fun handleSomething(): Response {
    val fooResult = phase("foo") {
        executeFooSideEffects()
    }
    val barResult = phase("bar") {
        executeBarSideEffects()
    }
    return Response(fooResult, barResult)
}
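To make the caching semantics concrete, here is a minimal in-memory sketch of what a phase does. It is not the library's implementation: ToyPhases and countExecutions are hypothetical names used only for illustration, and the sketch keeps results in a map rather than in persistent storage.

```kotlin
// Toy model of phase semantics; NOT the library's implementation.
// ToyPhases is a hypothetical name used only for this illustration.
class ToyPhases {
    // Stored results, keyed by phaseId.
    private val results = mutableMapOf<String, Any?>()

    @Suppress("UNCHECKED_CAST")
    fun <T> phase(phaseId: String, body: () -> T): T {
        if (results.containsKey(phaseId)) {
            // A result is already stored: skip the body and return it.
            return results[phaseId] as T
        }
        val result = body()
        results[phaseId] = result
        return result
    }
}

// Calls the same phase twice and returns how many times the body actually ran.
fun countExecutions(): Int {
    val phases = ToyPhases()
    var executions = 0
    repeat(2) {
        phases.phase("foo") { executions++ }
    }
    return executions
}
```

In this sketch the second call finds a stored result under "foo" and never invokes the body, which is the behaviour the phaseId exists to support.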

Sometimes a phase only needs to execute side effects, without returning any result. In this case, the result is represented as Unit. You can specify this explicitly; however, Kotlin will infer it for you in most cases.

fun handleSomething(entity: Entity): Response {
    val id = phase("storeExternally") {
        externalService.post(entity)
    }
    phase<Unit>("saveInDb") {
        database.store(id, entity)
    }
    return Response(id)
}

Flows

To use the phase function anywhere in your code, wrap it in a flow. A flow can contain multiple phases and must have a flowId (a unique identifier of the flow) and an idempotencyKey (a unique identifier of a particular run of that flow). This ensures that when you implement idempotency in multiple places, the cached phase results do not collide. Grouping phases into flows also helps with logging and troubleshooting. Choose a flowId that is unique within the service.

fun handleSomething(entity: Entity, idempotencyKey: String) =
    idempotencyService.runFlow("SomethingHandler", idempotencyKey) { ctx ->
        val id = ctx.phase("storeExternally") {
            externalService.post(entity)
        }
        ctx.phase<Unit>("saveInDb") {
            database.store(id, entity)
        }
        Response(id)
    }
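The reason flows prevent collisions can be illustrated with a toy store that scopes each cached result by flowId, idempotencyKey and phaseId together. This is an assumption about the general idea, not a description of the library's actual storage schema; PhaseKey and ToyFlowStore are hypothetical names.

```kotlin
// Toy model of how flowId and idempotencyKey scope cached phase results.
// PhaseKey and ToyFlowStore are hypothetical; the real storage layout may differ.
data class PhaseKey(val flowId: String, val idempotencyKey: String, val phaseId: String)

class ToyFlowStore {
    private val results = mutableMapOf<PhaseKey, Any?>()

    @Suppress("UNCHECKED_CAST")
    fun <T> phase(flowId: String, idempotencyKey: String, phaseId: String, body: () -> T): T {
        val key = PhaseKey(flowId, idempotencyKey, phaseId)
        // Only a result stored under the very same flow, run and phase is reused.
        if (results.containsKey(key)) return results[key] as T
        return body().also { results[key] = it }
    }
}
```

With such a key, a retry of the same run (same flowId and idempotencyKey) reuses the stored result, while a different run or a different flow that happens to use the same phaseId executes its own body.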

Phase Modes

Each phase runs in one of four configurations, called modes. Choose the mode carefully, depending on what the phase body does. The mode is the second argument to the phase function. The enum of modes is defined in ph.safibank.common.idempotency.core.IdempotencyPhaseMode. Here's an example of specifying the phase mode:

ctx.phase("storeExternally", IdempotencyPhaseMode.DEFAULT) {
    externalService.post(entity)
}

DEFAULT

This is the default mode, applied when you don't specify a mode explicitly. It is useful when the phase body can be executed twice without any issues. No lock is taken before the phase starts, so when two runs of the same flow execute concurrently, the body can be executed more than once. Even if both runs execute the body, only one of them succeeds at the end of the phase; the other throws an exception. If you need to prevent a second run from starting the body at all, use LOCK mode. DEFAULT is cheaper, however, because no lock has to be written to the database before execution starts. Use it for side effects that are idempotent by design, or where you can ensure that nothing goes wrong if they run twice. It is also suitable for reading from an external API or from the database.
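The "both runs execute the body, but only one succeeds" behaviour can be sketched with an atomic store-if-absent operation. This is only a toy model under that assumption: ToyResultStore, ResultAlreadyStored and simulateTwoOverlappingRuns are hypothetical names, and the library's real exception types and storage are not shown here.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Toy model of DEFAULT mode: no lock up front, the first stored result wins.
// All names are hypothetical and do not come from the library.
class ResultAlreadyStored(phaseId: String) :
    RuntimeException("A result is already stored for phase $phaseId")

class ToyResultStore {
    private val results = ConcurrentHashMap<String, String>()

    // Atomically stores the result unless another run stored one first.
    fun storeOnce(phaseId: String, result: String) {
        val previous = results.putIfAbsent(phaseId, result)
        if (previous != null) throw ResultAlreadyStored(phaseId)
    }

    fun stored(phaseId: String): String? = results[phaseId]
}

// Simulates two runs that both start before either has stored a result.
// Returns (number of body executions, number of runs that failed to store).
fun simulateTwoOverlappingRuns(): Pair<Int, Int> {
    val store = ToyResultStore()
    var bodyExecutions = 0
    var failedRuns = 0
    // Neither run sees a stored result yet, so both execute the body.
    val resultA = "run-A".also { bodyExecutions++ }
    val resultB = "run-B".also { bodyExecutions++ }
    for (result in listOf(resultA, resultB)) {
        try {
            store.storeOnce("storeExternally", result)
        } catch (e: ResultAlreadyStored) {
            failedRuns++
        }
    }
    return bodyExecutions to failedRuns
}
```

The side effect happened twice in this simulation even though only one result was kept, which is why DEFAULT is appropriate only for bodies that tolerate repeated execution.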

LOCK

Before the phase body starts executing, a lock is written to the database. This prevents any other run with the same phase key from executing the body. LOCK is more costly than DEFAULT; use it when the side effects in the body do not have idempotent semantics. Be aware that if the whole service crashes, the lock may remain in the database until its timeout expires. You can set this timeout per phase with the failsafeTimeout argument of the phase function. If the body throws an exception, the lock is released and a second run of the flow can execute the phase again. This is in contrast to ONE_TIME, where the lock is never released once the body has run at least once.
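The interaction between the lock, exceptions and the failsafe timeout can be sketched as follows. This is a simplified in-memory model, not the library's code: ToyLocks, ToyPhaseLocked and lockedPhase are hypothetical names, the clock is injected so that expiry can be demonstrated, and result caching is left out to keep the focus on locking.

```kotlin
// Toy model of LOCK mode; hypothetical names, in-memory instead of a database.
class ToyPhaseLocked(phaseId: String) : RuntimeException("Phase $phaseId is locked")

class ToyLocks(private val clock: () -> Long) {
    // phaseId -> time (in ms) at which the lock stops being honoured.
    private val expiry = mutableMapOf<String, Long>()

    fun acquire(phaseId: String, failsafeTimeoutMs: Long) {
        val lockedUntil = expiry[phaseId]
        // A lock that has not yet expired blocks every other run.
        if (lockedUntil != null && lockedUntil > clock()) throw ToyPhaseLocked(phaseId)
        expiry[phaseId] = clock() + failsafeTimeoutMs
    }

    fun release(phaseId: String) {
        expiry.remove(phaseId)
    }
}

fun <T> ToyLocks.lockedPhase(phaseId: String, failsafeTimeoutMs: Long, body: () -> T): T {
    acquire(phaseId, failsafeTimeoutMs)
    try {
        return body()
    } finally {
        // Runs on success and on exception. A crashed process never gets here,
        // which is the situation the failsafe timeout is meant to cover.
        release(phaseId)
    }
}
```

Calling acquire without a matching release models a crashed service: further attempts are rejected until the injected clock passes the failsafe timeout, after which the phase can be started again.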

ONE_TIME

This mode is for the very special case where the side effect executed in the body is poorly designed (usually an API call). It acquires a lock before the phase starts and never releases it, which means that you have to handle manually the cases where the phase throws. For example, an external API may offer a POST request to create a resource, but if the response is lost you have to call another API to find out whether the resource has already been created. A phase with ONE_TIME semantics should always be wrapped in a try-catch block that catches the PhaseIsLocked exception:

try {
    ctx.phase("createInExternalAPI", IdempotencyPhaseMode.ONE_TIME) {
        API.createResource(id)
    }
} catch (e: PhaseIsLocked) {
    // A previous run already started this phase; check whether it created the resource.
    val existingResource = API.findIfResouceExist(id)
    if (existingResource != null) {
        // The resource was created in a previous run, so just return it.
        existingResource
    } else {
        API.createResource(id)
    }
}

TRANSACTIONAL

Use this mode when dealing with database transactions. If the phase fails, the transaction is rolled back, so no locks need to be acquired or released. As a result, a failed phase is simply executed again on the next run. Use it only for side effects that support Micronaut transactions, such as writes to the database.

How it works

How to use

TL;DR - quick steps for consuming the library

  1. Copy migrations from this folder

  2. Set common.idempotency.postgres.autoconfigure property to true

  3. Look at examples (see below)

Configuration

Most of the library components are autoconfigured.

Phase result and lock storage

Storage of phase results and phase locks is currently coupled. You can choose between two implementations: Postgres and Mock.

Postgres stores the data in your application database. It should be the default setup at runtime and in all tests that have a database available. While the models and repositories are part of the library, you need to set up the migrations yourself:

  1. Copy migrations from this folder.

  2. Set common.idempotency.postgres.autoconfigure property to true.

Mock stores the data in application memory. It should be used only for experiments and in tests that have no database available.

  • With Micronaut injection (@MicronautTest)

    • Set common.idempotency.mock.autoconfigure property to true.

  • Without Micronaut injection (e.g. unit tests)

    • Populate IdempotencyService using mockIdempotencyService()

Note that IdempotencyPhaseMode.TRANSACTIONAL does not work with the Mock setup.

Serialisation

In order to be stored, phase results need to be serialised. By default, the library uses Jackson. It uses your ObjectMapper singleton if present or creates its own.

You can specify a custom object mapper:

@Factory
class IdempotencyBeansFactory {

    @Singleton
    fun idempotencySerializationBackend() =
        JacksonSerializationBackend(yourFancyObjectMapper)
}

There is also the possibility of implementing IdempotencySerializationBackend, for special cases that require it.

Other properties

  • common.idempotency.autoconfigure (default true)

    • Enables autoconfiguration of IdempotencyService singleton and Jackson serialisation.

  • common.idempotency.failsafe_timeout_seconds (default 60)

    • Failsafe timeout used for IdempotencyPhaseMode.LOCK if not specified in phase.

  • common.idempotency.infinity_timeout_days (default 365)

    • Timeout used for IdempotencyPhaseMode.ONE_TIME.

Design Patterns

  • Phase names should be unique within a flow.

  • Prefer DEFAULT mode over LOCK mode where possible, because it is cheaper.

  • For reads from the database, use DEFAULT mode.

  • For writes to the database, use TRANSACTIONAL mode.

  • For reads from an internal or external API, you can use DEFAULT mode.

  • For writes to an internal API that is idempotent, you can use DEFAULT mode.

  • For writes to an external API that is idempotent, or that returns an error when the same resource is created twice, you can use DEFAULT mode.

  • For writes to an external API that is not idempotent, use LOCK mode.

  • As a last resort, use ONE_TIME mode when repeating a write to an external API could have bad consequences, even if the previous write did not succeed.

  • For publishing to a Kafka topic that does not have idempotent behaviour, use LOCK mode.

  • For publishing to a Kafka topic that has idempotent behaviour, you can use DEFAULT mode.
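The guidelines above can be condensed into a small decision function. This is only a sketch of the rules of thumb: Mode mirrors the library's mode names locally so the example is self-contained, while Target, SideEffect and recommendMode are hypothetical and not part of the library. The fallback to LOCK for cases the list does not mention (such as a non-idempotent write to an internal API) is an assumption.

```kotlin
// Local mirror of the library's mode names so the sketch is self-contained.
enum class Mode { DEFAULT, LOCK, ONE_TIME, TRANSACTIONAL }

enum class Target { DATABASE, INTERNAL_API, EXTERNAL_API, KAFKA }

// Hypothetical description of a side effect, used only by this sketch.
data class SideEffect(
    val target: Target,
    val isWrite: Boolean,
    val idempotent: Boolean = false,
    // True when repeating the write could have bad consequences even if
    // the previous attempt appeared to fail.
    val retryAfterFailureIsDangerous: Boolean = false,
)

fun recommendMode(effect: SideEffect): Mode = when {
    // Reads can safely run more than once.
    !effect.isWrite -> Mode.DEFAULT
    // Database writes rely on transaction rollback instead of locks.
    effect.target == Target.DATABASE -> Mode.TRANSACTIONAL
    // Idempotent writes (APIs or Kafka) tolerate repeated execution.
    effect.idempotent -> Mode.DEFAULT
    // Last resort for external writes that must never be blindly repeated.
    effect.target == Target.EXTERNAL_API && effect.retryAfterFailureIsDangerous -> Mode.ONE_TIME
    // Assumed fallback for every other non-idempotent write.
    else -> Mode.LOCK
}
```

For example, recommendMode(SideEffect(Target.KAFKA, isWrite = true)) yields Mode.LOCK, matching the guideline for topics without idempotent behaviour.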

How to delete, add, or change phases in existing flows

Code evolution with the idempotency library is a bit trickier than without it. Always remember that partial runs of flows are stored in the database.

Deleting a phase from a flow is a safe operation. Adding a phase to a flow is also a safe operation.

Editing a phase or changing its behaviour is trickier. The main problem is that there may be flow runs that have already executed this phase. First, use a different name for the edited phase, e.g. by adding the suffix "V2". Then, inside the newly named phase, load the result of the previously run phase. Example:

// PREVIOUS PHASE
val id = ctx.phase("Call3rdParty", mode = IdempotencyPhaseMode.LOCK) {
    ExternalService.storeSomething(request)
}
// EVOLVED PHASE
val id = ctx.phase("Call3rdPartyV2", mode = IdempotencyPhaseMode.LOCK) {
    val previousResult = ctx.storedOrNull<Int>("Call3rdParty")
    if (previousResult != null && previousResult > 47) {
        // if previous run exist we need to migrate to new version
        ExternalService.updateSomethingFromV1ToV2(request)
    } else {
        // otherwise just create resource with new version
        ExternalService.storeSomethingV2(request)
    }
}

Examples

Package structure - for library authors

Packages are structured to allow a potential split if multiple implementations of individual backends become available:

  • core: Idempotency logic and backend interfaces.

  • jackson: Specific dependency on Jackson.

  • postgres: Specific dependency on Micronaut Data and PostgreSQL.

  • mock: Alternative to postgres.

  • autoconfiguration: Ties it all together.
