Environments

For scheduled jobs, we’re using the Airflow environments created in Cloud Composer.

The Brave environment can be found here:

https://console.cloud.google.com/composer/environments?orgonly=true&project=safi-env-brave-cloudcomposer&supportedpurview=organizationId&pli=1

For the Stage environment, navigate here:

https://console.cloud.google.com/composer/environments?orgonly=true&project=safi-env-stage-cloudcomposer&supportedpurview=organizationId&pli=1

The rest of the documentation reflects the current Brave setup; for Stage, use the links on the page above.

Access

To gain access, developers need to be added to this configuration file:

https://github.com/SafiBank/SaFiMono/blob/8f9f50205911a523a799195c726b5d5e9c4cc7a9/devops/terraform/_files/users_composer.yaml

DAGs

Each Job has to be created as a separate and independent DAG. All DAGs can be monitored here:

https://013817fadf5e44919e3601866c75370d-dot-asia-southeast1.composer.googleusercontent.com/home

Uploading a DAG

As of now the process of managing Jobs is manual; in the future it would be beneficial to use GitHub Actions or some other automation to add/update Jobs.

To add or update a DAG, upload it to the DAGs folder: https://console.cloud.google.com/storage/browser/asia-southeast1-safi-brave--9478b5d3-bucket/dags;tab=objects?orgonly=true&project=safi-env-brave-cloudcomposer&supportedpurview=organizationId&prefix=&forceOnObjectsSortingFiltering=false
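
The upload can also be scripted, which would make later automation easier. Below is a minimal sketch using the google-cloud-storage client (assumptions: the library is installed, your credentials have write access to the bucket, and the local path and file name are placeholders based on the subscriptions renewal example later in this page):

from google.cloud import storage

def upload_dag(local_path: str, dag_file_name: str) -> None:
    # Bucket name taken from the Brave DAGs folder URL above
    client = storage.Client(project="safi-env-brave-cloudcomposer")
    bucket = client.bucket("asia-southeast1-safi-brave--9478b5d3-bucket")
    # DAG files must land under the "dags/" prefix to be picked up by Composer
    blob = bucket.blob(f"dags/{dag_file_name}")
    blob.upload_from_filename(local_path)

upload_dag("services/product-manager/dags/subscriptions_renewal.py", "subscriptions_renewal.py")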

Versioning of Jobs

By agreement, DAG scripts should reside in a dags folder under the corresponding manager/service, e.g. for product-manager here:

https://github.com/SafiBank/SaFiMono/tree/main/services/product-manager/dags
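
For example, the subscriptions renewal DAG shown later on this page would live at a path like the following (the file name is an assumption, chosen to match the dag_id):

services/
  product-manager/
    dags/
      subscriptions_renewal.py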

DAG definition

DAG definitions are Python scripts that define the DAG’s functionality and configuration.

Example

Example of a DAG for renewing subscriptions:

"""A simple dag for calling subscription manager"""
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import timedelta

default_args = {
    'start_date': pendulum.datetime(2022, 10, 28, tz='UTC'),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


@dag(
    dag_id='subscriptions_renewal',
    default_args=default_args,
    description='subscriptions renewal dag',
    schedule_interval="0 * * * *",
    dagrun_timeout=timedelta(minutes=20)
)
def subscriptions_renewal():

    @task(task_id="renewal")
    def execute_renewals():
        environment = Variable.get('SAFI_ENVIRONMENT')
        url = f"https://product-manager.apps.{environment}.safibank.online/development/renew-subscriptions"

        response = requests.request("POST", url)

        response.raise_for_status()

    execute_renewals()

dag = subscriptions_renewal()

In this example, we’re creating a simple DAG that starts running on October 28th, 2022 (start_date); if a run fails for some reason, it is retried once after 5 minutes.

The @dag decorator defines the configuration for the DAG

dag_id needs to be unique

schedule_interval uses standard cron format (check e.g. https://crontab.guru/); here "0 * * * *" runs the DAG at the start of every hour

dagrun_timeout defines the time limit for the run (here 20 minutes)

Notice the usage of the custom SAFI_ENVIRONMENT Airflow Variable, which allows the same script to run in all environments we have (currently BRAVE and STAGE)

The variable is interpolated into the URL; the main purpose of the whole script is to call an endpoint that is exposed through the service’s API.
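
For illustration, a minimal sketch of how such a Variable could be set programmatically (this is an assumption for illustration only; in practice the variable is typically defined once per Composer environment through the Airflow UI or CLI rather than inside DAG code):

from airflow.models import Variable

# Set once per Composer environment, e.g. "brave" or "stage"
Variable.set("SAFI_ENVIRONMENT", "brave")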

raise_for_status - the response is expected to come back with a successful status code (200); otherwise the task fails (and is retried as mentioned above)

For further reading, follow the link to the Airflow documentation: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html