Background
SM-5191 - PoC Temporal workflow to handle async process in sync call Done
SM-5188 - CTPM - Create new endpoints based on Euronet schema Done
Currently, Thought Machine (TM) processes transactions (Transfer, Pre-Auth, Settlement) asynchronously. However, Euronet requires us to return a SUCCESS or FAILED response synchronously.
There are two ways to find out whether a transaction has completed and whether it succeeded:
Subscribe to the Kafka topic integration.postings_api.card_transaction_manager_postings_client.response
Call the Thought Machine endpoint GET /v1/posting-instruction-batches/{id} (requires polling, not recommended)
Our CTPM (card-transaction-processor-manager) runs in a Kubernetes cluster with multiple instances/pods, so the instance that receives a request from Euronet may not be the one that handles the corresponding TM transaction event.
Proposal
Capture transaction status in the database
This approach updates the transaction status in our Postgres database whenever any instance/pod receives a TM transaction event. The instance/pod that is processing the Euronet request then polls the database until the transaction status is available.
Pros | Cons |
---|---|
Easy to implement, less effort | Extra load on the database |
No external component needed | Need to implement idempotency checks, error handling, retries and timeouts |
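As a rough illustration of the polling loop and the idempotency check this approach would need, here is a minimal sketch. It assumes an in-memory map as a stand-in for the Postgres table; `TransactionStatusStore`, `recordFinalStatus` and `awaitFinalStatus` are hypothetical names, not part of CTPM.

```kotlin
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap

enum class TransactionStatus { PENDING, FAILED, SUCCESS }

// Hypothetical stand-in for the Postgres table. A real implementation would
// issue an INSERT / SELECT against the database instead of using a map.
class TransactionStatusStore {
    private val statuses = ConcurrentHashMap<String, TransactionStatus>()

    // Called by whichever pod consumes the TM event. Only the first status
    // recorded for a transaction is kept, so duplicate events are ignored.
    // Returns true if this call stored the status, false for a duplicate.
    fun recordFinalStatus(transactionId: String, status: TransactionStatus): Boolean =
        statuses.putIfAbsent(transactionId, status) == null

    fun find(transactionId: String): TransactionStatus? = statuses[transactionId]
}

// Called by the pod that is serving the Euronet request. Polls the store until
// a final status is available or the timeout elapses; returns PENDING on timeout.
fun awaitFinalStatus(
    store: TransactionStatusStore,
    transactionId: String,
    timeout: Duration,
    pollInterval: Duration,
): TransactionStatus {
    val deadline = System.nanoTime() + timeout.toNanos()
    while (true) {
        val status = store.find(transactionId)
        if (status != null && status != TransactionStatus.PENDING) return status
        if (System.nanoTime() >= deadline) return TransactionStatus.PENDING
        Thread.sleep(pollInterval.toMillis())
    }
}
```

Every in-flight request repeats the lookup once per poll interval, which is where the extra database load listed above comes from.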
Use a Temporal workflow
As Temporal is widely used in our services, we can use a Temporal workflow to orchestrate the flow. There is a demo repository for the PoC.
Pros | Cons |
---|---|
Out-of-the-box idempotency check, retry and timeout | More complex |
No need to poll | |
Steps to implement the workflow
Add dependencies
    // Add the Temporal SDK as dependencies
    implementation("io.temporal:temporal-kotlin:$temporalVersion")
    implementation("io.temporal:temporal-sdk:$temporalVersion")
Register our CTPM service instance as a Temporal worker
    @Context
    class TemporalWorker(
        private val transactionActivity: TransactionActivity,
        private val serviceOptions: WorkflowServiceStubsOptions,
        private val clientOptions: WorkflowClientOptions,
    ) {
        private val logger = getLogger()

        @PostConstruct
        fun start() {
            logger.info("Starting workflow service on ")
            val workflowService = WorkflowServiceStubs.newServiceStubs(serviceOptions)
            val workflowClient = WorkflowClient.newInstance(workflowService, clientOptions)
            val workerFactory = WorkerFactory.newInstance(workflowClient)

            // The worker polls this task queue for workflow and activity tasks
            val worker = workerFactory.newWorker("transaction-task-queue")
            worker.run {
                registerWorkflowImplementationTypes(TransactionWorkflowImpl::class.java)
                registerActivitiesImplementations(transactionActivity)
            }
            workerFactory.start()
        }
    }
Add workflow activity
    @ActivityInterface
    interface TransactionActivity {
        @ActivityMethod
        fun transfer(idempotencyKey: String, transactionId: String)
    }

    @Singleton
    class TransactionActivityImpl(private val coreBankClient: CoreBankClient) : TransactionActivity {
        // Submits the transaction to the core bank; the result arrives later via Kafka
        override fun transfer(idempotencyKey: String, transactionId: String) {
            this.coreBankClient.transfer(Transaction(idempotencyKey, transactionId))
        }
    }
Add workflow implementation
    enum class TransactionStatus { PENDING, FAILED, SUCCESS }

    @WorkflowInterface
    interface TransactionWorkflow {
        @WorkflowMethod
        fun start(idempotencyKey: String, transactionId: String): TransactionStatus

        @SignalMethod
        fun updateTransactionStatus(status: TransactionStatus)
    }

    class TransactionWorkflowImpl : TransactionWorkflow {
        private val logger = getWorkflowLogger()
        private var status: TransactionStatus = TransactionStatus.PENDING

        private val defaultActivityOptions = ActivityOptions {
            setStartToCloseTimeout(Duration.ofSeconds(10))
            setRetryOptions {
                setInitialInterval(Duration.ofSeconds(1))
                setMaximumInterval(Duration.ofSeconds(10))
                setBackoffCoefficient(2.0)
            }
        }

        private val transactionActivity: TransactionActivity =
            Workflow.newActivityStub(TransactionActivity::class.java, defaultActivityOptions)

        override fun start(idempotencyKey: String, transactionId: String): TransactionStatus {
            logger.info("Start workflow to process transaction $transactionId with idempotency key $idempotencyKey")
            transactionActivity.transfer(idempotencyKey, transactionId)
            logger.info("Transaction has been sent to core bank, waiting for core bank status")

            // Block the workflow until a signal moves the status out of PENDING
            Workflow.await {
                logger.info("Waiting for transaction status update, current status is $status")
                status != TransactionStatus.PENDING
            }
            return status
        }

        // Signal handler, invoked when the Kafka consumer reports the TM result
        override fun updateTransactionStatus(status: TransactionStatus) {
            logger.info("Received transaction status update event, status is $status")
            this.status = status
        }
    }
Update workflow status in the Kafka consumer
    @KafkaListener
    class TransactionListener(private val workflowClient: WorkflowClient) {
        private val logger = getLogger()

        @Topic("integration.postings_api.card_transaction_manager_postings_client.response")
        fun receive(@KafkaKey key: String, transactionEvent: TransactionEvent) {
            logger.info("Received transaction event $transactionEvent with idempotency key $key")

            // The Kafka key is used as the workflow ID to address the running workflow
            val workflowStub: WorkflowStub = workflowClient.newUntypedWorkflowStub(key)
            workflowStub.signal("updateTransactionStatus", transactionEvent.status)
        }
    }
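The steps above do not show the request-handling side, so the following is a hedged sketch of how the instance serving the Euronet call might start the workflow and block on its result. It assumes the workflow ID is set to the idempotency key (which is what lets the Kafka listener address the workflow by key) and it needs the Temporal SDK on the classpath; `transactionWorkflowOptions`, `processSynchronously` and the 30-second timeout are hypothetical, not taken from the PoC.

```kotlin
import io.temporal.client.WorkflowClient
import io.temporal.client.WorkflowOptions
import io.temporal.workflow.SignalMethod
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod
import java.time.Duration

enum class TransactionStatus { PENDING, FAILED, SUCCESS }

// Same contract as the workflow interface in the steps above, repeated so the
// sketch is self-contained.
@WorkflowInterface
interface TransactionWorkflow {
    @WorkflowMethod
    fun start(idempotencyKey: String, transactionId: String): TransactionStatus

    @SignalMethod
    fun updateTransactionStatus(status: TransactionStatus)
}

// Assumption: the workflow ID equals the idempotency key, so the Kafka consumer
// can signal the right workflow using the message key.
fun transactionWorkflowOptions(idempotencyKey: String): WorkflowOptions =
    WorkflowOptions.newBuilder()
        .setWorkflowId(idempotencyKey)
        .setTaskQueue("transaction-task-queue")
        // Hypothetical upper bound so the synchronous call cannot wait forever.
        .setWorkflowExecutionTimeout(Duration.ofSeconds(30))
        .build()

fun processSynchronously(
    client: WorkflowClient,
    idempotencyKey: String,
    transactionId: String,
): TransactionStatus {
    val workflow = client.newWorkflowStub(
        TransactionWorkflow::class.java,
        transactionWorkflowOptions(idempotencyKey),
    )
    // Invoking the workflow method on a typed stub starts the workflow and
    // blocks the calling thread until the workflow returns its final status.
    return workflow.start(idempotencyKey, transactionId)
}
```

If the execution timeout elapses before a status signal arrives, the blocking call fails with an exception instead of returning, so the endpoint would need to map that case to a response for Euronet.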
Perf testing
I ran a performance test with Gatling, sending 100 requests over 20 seconds (5 requests per second) to the brave environment. The mean response time was 816 ms and the p99 was 2.5 s.
Conclusion
Use Temporal to handle the async-to-sync flow.
Attachments:
Screen Shot 2022-11-09 at 12.43.11.png (image/png)
Async to Sync.drawio (2).png (image/png)