SaFi Bank Space : SaFi PoC Temporal workflow to handle async process in sync call

Background

SM-5191 - PoC Temporal workflow to handle async process in sync call Done

SM-5188 - CTPM - Create new endpoints based on Euronet schema Done

Currently, Thought Machine (TM) processes transactions (Transfer, Pre-Auth, Settlement) asynchronously. However, Euronet requires us to return a SUCCESS or FAILED response synchronously.

There are two ways to find out whether a transaction has completed successfully:

  • Subscribe to the Kafka topic integration.postings_api.card_transaction_manager_postings_client.response

  • Call the Thought Machine endpoint GET /v1/posting-instruction-batches/{id} (requires polling, not recommended)

Our CTPM (card-transaction-processor-manager) runs in a Kubernetes cluster with multiple instances/pods, so the instance that receives the request from Euronet may not be the one that handles the TM transaction event.
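To make the problem concrete, the sketch below models two pods that each keep their waiting callers in local memory. `Pod`, `register`, `onEvent`, and `demoCrossPodDelivery` are hypothetical names used for illustration only; they are not part of CTPM.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical model of one CTPM pod that tracks in-flight requests in local memory only.
class Pod(val name: String) {
    private val pending = ConcurrentHashMap<String, CompletableFuture<String>>()

    // Called by the HTTP handler: remember that a caller is waiting for this transaction.
    fun register(transactionId: String): CompletableFuture<String> =
        pending.computeIfAbsent(transactionId) { CompletableFuture<String>() }

    // Called by the Kafka consumer: returns false when no local caller is waiting.
    fun onEvent(transactionId: String, status: String): Boolean =
        pending.remove(transactionId)?.complete(status) ?: false
}

// Returns (event delivered to a waiting caller, caller unblocked).
fun demoCrossPodDelivery(): Pair<Boolean, Boolean> {
    val podA = Pod("pod-a")
    val podB = Pod("pod-b")
    val waiting = podA.register("tx-1")             // Euronet request lands on pod A
    val delivered = podB.onEvent("tx-1", "SUCCESS") // TM event is consumed by pod B
    return delivered to waiting.isDone
}
```

In this model the event is dropped on pod B and the caller on pod A stays blocked, which is why the status has to travel through something shared between pods.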

Proposal

Capture transaction status in Database

In this approach, whichever instance/pod receives the TM transaction event updates the transaction status in our Postgres database. The instance/pod that is processing the Euronet request then polls the database until the transaction status is available.
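The polling loop described above could look roughly like this. It is a minimal sketch, not the PoC code: `TransactionStatusStore` is a hypothetical in-memory stand-in for the Postgres table, and `awaitFinalStatus`, the timeout, and the poll interval are illustrative names and values.

```kotlin
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap

enum class TransactionStatus { PENDING, FAILED, SUCCESS }

// Hypothetical stand-in for the Postgres table; a real implementation would run a query here.
class TransactionStatusStore {
    private val statuses = ConcurrentHashMap<String, TransactionStatus>()

    // Called by whichever pod consumes the TM event.
    fun save(transactionId: String, status: TransactionStatus) {
        statuses[transactionId] = status
    }

    // Unknown transactions are treated as still pending.
    fun find(transactionId: String): TransactionStatus =
        statuses[transactionId] ?: TransactionStatus.PENDING
}

// Polls until the status leaves PENDING or the timeout elapses; returns null on timeout.
fun awaitFinalStatus(
    store: TransactionStatusStore,
    transactionId: String,
    timeout: Duration,
    pollInterval: Duration,
): TransactionStatus? {
    val deadline = System.nanoTime() + timeout.toNanos()
    while (true) {
        val status = store.find(transactionId)
        if (status != TransactionStatus.PENDING) return status
        if (System.nanoTime() >= deadline) return null
        Thread.sleep(pollInterval.toMillis())
    }
}
```

Every waiting request issues one read per poll interval, which is where the extra database load comes from, and the caller still has to decide what a timeout means for Euronet.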

Pros:

  • Easy to implement, less effort

  • No need to introduce an external component

Cons:

  • Extra load on the database

  • Need to implement idempotency check, error handling, retry, and timeout

Use Temporal workflow

As Temporal is already widely used in our services, we can use a Temporal workflow to orchestrate the flow. There is a demo repository for the PoC.

Pros:

  • Out-of-the-box idempotency check, retry, and timeout

  • No need to poll

Cons:

  • More complex

Steps to implement workflow

Add dependencies

    //  Add temporal SDK as dependencies
    implementation("io.temporal:temporal-kotlin:$temporalVersion")
    implementation("io.temporal:temporal-sdk:$temporalVersion")

Register our CTPM service instance as a Temporal worker

@Context
class TemporalWorker(
    private val transactionActivity: TransactionActivity,
    private val serviceOptions: WorkflowServiceStubsOptions,
    private val clientOptions: WorkflowClientOptions,
) {
    private val logger = getLogger()

    @PostConstruct
    fun start() {
        logger.info("Starting Temporal worker")
        val workflowService = WorkflowServiceStubs.newServiceStubs(serviceOptions)
        val workflowClient = WorkflowClient.newInstance(workflowService, clientOptions)

        val workerFactory = WorkerFactory.newInstance(workflowClient)
        val worker = workerFactory.newWorker("transaction-task-queue")

        worker.run {
            registerWorkflowImplementationTypes(TransactionWorkflowImpl::class.java)
            registerActivitiesImplementations(transactionActivity)
        }

        workerFactory.start()
    }
}

Add workflow activity

@ActivityInterface
interface TransactionActivity {
    @ActivityMethod
    fun transfer(idempotencyKey: String, transactionId: String)
}

@Singleton
class TransactionActivityImpl(private val coreBankClient: CoreBankClient): TransactionActivity {
    override fun transfer(idempotencyKey: String, transactionId: String) {
        this.coreBankClient.transfer(Transaction(idempotencyKey, transactionId))
    }
}

Add workflow implementation

enum class TransactionStatus {
    PENDING, FAILED, SUCCESS
}

@WorkflowInterface
interface TransactionWorkflow {

    @WorkflowMethod
    fun start(idempotencyKey: String, transactionId: String): TransactionStatus

    @SignalMethod
    fun updateTransactionStatus(status: TransactionStatus)
}

class TransactionWorkflowImpl: TransactionWorkflow {
    private val logger = getWorkflowLogger()

    private var status: TransactionStatus = TransactionStatus.PENDING

    private val defaultActivityOptions = ActivityOptions {
        setStartToCloseTimeout(Duration.ofSeconds(10))
        setRetryOptions {
            setInitialInterval(Duration.ofSeconds(1))
            setMaximumInterval(Duration.ofSeconds(10))
            setBackoffCoefficient(2.0)
        }
    }
    private val transactionActivity: TransactionActivity = Workflow.newActivityStub(TransactionActivity::class.java, defaultActivityOptions)

    override fun start(idempotencyKey: String, transactionId: String): TransactionStatus {
        logger.info("Start workflow to process transaction $transactionId with idempotency key $idempotencyKey")
        transactionActivity.transfer(idempotencyKey, transactionId)
        logger.info("Transaction has been sent to core bank, waiting for core bank status")

        // Block until a signal moves the status out of PENDING
        Workflow.await {
            logger.info("Waiting for transaction status update, current status is $status")
            status != TransactionStatus.PENDING
        }

        return status
    }

    override fun updateTransactionStatus(status: TransactionStatus) {
        logger.info("Received transaction status update event, status is $status")

        this.status = status
    }
}

Update workflow status in the Kafka consumer

@KafkaListener
class TransactionListener(private val workflowClient: WorkflowClient) {
    private val logger = getLogger()

    @Topic("integration.postings_api.card_transaction_manager_postings_client.response")
    fun receive(@KafkaKey key: String, transactionEvent: TransactionEvent) {
        logger.info("Received transaction event $transactionEvent with idempotency key $key")
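        // The Kafka key is the idempotency key, which is assumed to also be the workflow ID
        val workflowStub: WorkflowStub = workflowClient.newUntypedWorkflowStub(key)
        // The signal name defaults to the @SignalMethod name on TransactionWorkflow
        workflowStub.signal("updateTransactionStatus", transactionEvent.status)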
    }
}
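The steps above do not show how the Euronet-facing request starts the workflow and waits for its result. The following is a sketch under stated assumptions rather than the PoC code: `TransactionService`, `transactionWorkflowOptions`, and the 30-second execution timeout are hypothetical, and the idempotency key is assumed to be the workflow ID so that the Kafka listener can address the same execution. The workflow types are repeated only so the snippet compiles on its own.

```kotlin
import io.temporal.client.WorkflowClient
import io.temporal.client.WorkflowException
import io.temporal.client.WorkflowOptions
import io.temporal.workflow.SignalMethod
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod
import java.time.Duration

// Same types as in "Add workflow implementation", repeated for self-containment.
enum class TransactionStatus { PENDING, FAILED, SUCCESS }

@WorkflowInterface
interface TransactionWorkflow {
    @WorkflowMethod
    fun start(idempotencyKey: String, transactionId: String): TransactionStatus

    @SignalMethod
    fun updateTransactionStatus(status: TransactionStatus)
}

// Assumption: workflow ID = idempotency key, matching newUntypedWorkflowStub(key) in the listener.
fun transactionWorkflowOptions(idempotencyKey: String): WorkflowOptions =
    WorkflowOptions.newBuilder()
        .setWorkflowId(idempotencyKey)
        .setTaskQueue("transaction-task-queue")
        // Assumed upper bound so the sync call cannot wait forever; not a value from the PoC.
        .setWorkflowExecutionTimeout(Duration.ofSeconds(30))
        .build()

// Hypothetical service invoked by the Euronet-facing controller.
class TransactionService(private val workflowClient: WorkflowClient) {
    fun process(idempotencyKey: String, transactionId: String): TransactionStatus {
        val workflow = workflowClient.newWorkflowStub(
            TransactionWorkflow::class.java,
            transactionWorkflowOptions(idempotencyKey),
        )
        return try {
            // Calling the workflow method on the stub blocks until the workflow returns.
            workflow.start(idempotencyKey, transactionId)
        } catch (e: WorkflowException) {
            // Covers workflow failure and execution timeout.
            TransactionStatus.FAILED
        }
    }
}
```

Because the workflow ID is the idempotency key, a duplicate request that arrives while the first execution is still running is rejected by Temporal. This sketch maps that rejection to FAILED as well; a real handler would probably treat it separately.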

Perf testing

I ran a perf test with Gatling, sending 100 requests over 20 seconds (5 requests per second) to the brave environment. The mean response time was 816 ms and the p99 was 2.5 s.

Conclusion

Use Temporal to handle the async-to-sync flow.