Jira Cloud Data Pipeline

Background

This document describes the Data team's process for extracting data from the Jira Cloud API. Currently, the team extracts data from the Jira Service Management product, which contains the list of issues/tickets per project.

Report | Endpoint | Description
Servicedesk Issues | /rest/api/2/search | Searches issues using JQL.
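
For illustration, the endpoint can be queried directly with basic authentication and a JQL filter. This is a minimal sketch only; the credentials and JQL below are placeholders, and the productionised call is shown in the Python Script section.

import requests

# Query the Jira search endpoint directly (credentials and JQL are placeholders)
response = requests.get(
    'https://safibank.atlassian.net/rest/api/2/search',
    headers={'Accept': 'application/json'},
    params={'jql': 'project IN (DT, BOSD)', 'startAt': 0, 'maxResults': 100},
    auth=('<username>', '<api_token>'),
)
issues = response.json()['issues']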

Workflow

Ingestion

Preparing the configuration files:

{
    "base_url": "https://safibank.atlassian.net/rest/api/2",
    "url": "/search",
    "headers": {
        "Accept": "application/json"
    },
    "parameters": {
        "query": {
            "jql": "project IN (DT, BOSD) and (updated >= '<start>' and updated < '<end>')"
        },
        "startAt": 0,
        "maxResults": 100
    },
    "gcs_bucket_uri": "gs://data-automation-raw-data",
    "gcs_bucket_name": "data-automation-raw-data",
    "gcs_source": "jira",
    "gcs_format": "json",
    "gcs_report": "issues",
    "gcs_date_format": "%Y%m%d",
    "gcs_filename": "jira_servicedesk-issues_<date>_<page>.json",
    "bq_dataset": "jira",
    "bq_table": "issues_v7",
    "schema_filename": "schema/jira_servicedesk_issues.json"
}

base_url - The main URL of the Jira API.

url - The specific endpoint that the Cloud Function will invoke.

headers - The metadata associated with the API request and response.

parameters - The values passed in the API GET request, including the JQL query that filters on the project and updated fields.

gcs_* - These fields configure the Cloud Storage (GCS) destination.

bq_* - These fields configure the BigQuery load.

schema_filename - The full path of the schema file used for configuring the BigQuery table. A sample schema is located in the GitHub repository.
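
For reference, this configuration is read by gen_util.get_api_config (used in main.py below). Its exact implementation lives in the repository; a minimal sketch, assuming it is a thin JSON loader:

import json


def get_api_config(config_path):
    # Read the endpoint configuration (base URL, JQL, GCS and BigQuery settings) from a JSON file
    with open(config_path) as config_file:
        return json.load(config_file)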

Python Script

The code for the Jira data ingestion is contained in main.py. When the Cloud Function runs, it passes data and context as parameters to the jira_api function.

# Standard-library imports; the helper classes (SecretManagerHelper, RequestsApi,
# GcsHelper, BqHelper) and gen_util are imported from the repository's helper modules.
import json
import os
from datetime import date, timedelta


def jira_api(data, context):
    sm = SecretManagerHelper(os.environ['SECRET_ID_JIRA'])
    secret_in_json = sm.get_secret()
    api_config = gen_util.get_api_config('config/jira/servicedesk_issues.json')

    auth = (secret_in_json['username'], secret_in_json['password'])
    # Extraction window: issues updated from yesterday (inclusive) up to today (exclusive)
    end_date = date.today()
    start_date = end_date - timedelta(1)

    gcs_filename = api_config['gcs_filename']
    gcs_file_ext = gcs_filename.split('.')[1]
    date_dct = gen_util.get_year_month_day(start_date)
    gcs_blob_path = f'{api_config["gcs_source"]}/' \
                    f'{api_config["gcs_report"]}/' \
                    f'{date_dct["year"]}/' \
                    f'{date_dct["month"]}/' \
                    f'{date_dct["day"]}'

    page_size = api_config['parameters']['maxResults']
    start_at = api_config['parameters']['startAt']

    more_results_flag = True
    page = 1

    jql_query = api_config['parameters']['query']['jql']

    # Substitute the <start>/<end> placeholders in the JQL with the extraction window
    if 'updated' in jql_query:
        parameters = {
            'jql':
                jql_query
                .replace('<start>', str(start_date))
                .replace('<end>', str(end_date))
        }
    else:
        parameters = api_config['parameters']['query']

    # Loop for fetching paginated data
    while more_results_flag:
        url = f"{api_config['url']}?startAt={start_at}&maxResults={page_size}"
        jira_api_client = RequestsApi(api_config['base_url'], headers=api_config['headers'])
        response = jira_api_client.get(url, params=parameters, auth=auth)
        full_filename = gen_util.generate_gcs_filename(gcs_filename, start_date,
                                                       api_config['gcs_date_format'], page)

        if response.status_code == 200:
            # Parse the response only on success; a non-200 response ends the pagination loop
            data = json.loads(response.text)
            start_at += page_size
            final_data = gen_util.add_string_to_key_names(data['issues'])
            results = '\n'.join(map(json.dumps, final_data))

            # Upload files to GCS
            gcs_client = GcsHelper(api_config['gcs_bucket_name'])
            gcs_client.blob = f'{gcs_blob_path}/{full_filename}'
            gcs_client.upload_from_string(results, gcs_client.blob, gcs_file_ext)

            more_results_flag = start_at < data["total"]
            page += 1

        else:
            more_results_flag = False

    # Load all of the extracted JSON files for the day from GCS into BigQuery
    bq = BqHelper(api_config['bq_dataset'], api_config['bq_table'])
    job = bq.insert_to_bq_from_gcs(api_config['schema_filename'],
                                   f"{api_config['gcs_bucket_uri']}/{gcs_blob_path}/*.json")

Load

To store the data in BigQuery, the load configuration must be set accordingly. It can be found in the bq_helper file in GitHub:

@staticmethod
def get_load_job_config(table_schema):
    return bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        schema=table_schema,
        ignore_unknown_values=True,
        time_partitioning=bigquery.table.TimePartitioning()
    )

write_disposition=bigquery.WriteDisposition.WRITE_APPEND - If the table already exists, BigQuery appends the data to the table.

source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON - The format of the source is newline-delimited JSON.

schema=table_schema - The destination table's schema, loaded from the JSON schema file.

ignore_unknown_values=True - Ignore extra values not represented in the table schema.

time_partitioning=bigquery.table.TimePartitioning() - Specifies time-based partitioning for the destination table.

The BigQuery table will be populated when the function insert_to_bq_from_gcs() is called.
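
For reference, a minimal sketch of what insert_to_bq_from_gcs() might look like, assuming it wraps the BigQuery client's load_table_from_uri call and loads the table schema from the JSON schema file (the actual implementation is in the bq_helper file in GitHub):

from google.cloud import bigquery


class BqHelper:
    def __init__(self, dataset, table):
        self.client = bigquery.Client()
        self.table_id = f'{dataset}.{table}'

    @staticmethod
    def get_load_job_config(table_schema):
        return bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema=table_schema,
            ignore_unknown_values=True,
            time_partitioning=bigquery.table.TimePartitioning()
        )

    def insert_to_bq_from_gcs(self, schema_filename, gcs_uri):
        # Load the destination table schema from the JSON schema file
        table_schema = self.client.schema_from_json(schema_filename)
        job_config = self.get_load_job_config(table_schema)
        # Load every file matching the GCS wildcard URI into the partitioned table
        load_job = self.client.load_table_from_uri(gcs_uri, self.table_id,
                                                   job_config=job_config)
        return load_job.result()  # block until the load job completes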

Transformation

Structuring Models

Within the dbt project, the parent directory "models" contains a jira directory with two sub-directories, marts and staging, one for each type of model.

models
|---jira
  |---marts
    |---prd_jira_serviceproject_issues_v1.sql
  |---staging
    |---stg_jira_serviceproject_issues_v1.sql
dbt_project.yml

  • prd_jira_serviceproject_issues_v1.sql - contains the "final" version of the data and is updated by transforming and merging data from the source.

  • stg_jira_serviceproject_issues_v1.sql - the staging model, where data is tested and cleaned up before being loaded into the production table.

  • dbt_project.yml - a configuration file that contains settings for the dbt project. It is used to define settings such as the project name, the target database, and the schema to which the models should be deployed.

Lineage Graph

Schema

fullname | mode | type | description | testing
key | NULLABLE | STRING | |
id | NULLABLE | INTEGER | | not_null, is unique
PROJECT | NULLABLE | STRING | |
reporter_name | NULLABLE | STRING | |
creator_name | NULLABLE | STRING | |
department | NULLABLE | STRING | |
watch_count | NULLABLE | INTEGER | |
description | NULLABLE | STRING | |
priority | NULLABLE | STRING | |
issue_type | NULLABLE | STRING | |
status | NULLABLE | STRING | |
resolution | NULLABLE | STRING | |
assignee | NULLABLE | STRING | |
request_type | NULLABLE | STRING | |
resposible_team | NULLABLE | STRING | |
summary | NULLABLE | STRING | |
back_office_option | NULLABLE | STRING | |
customer_channel | NULLABLE | STRING | |
customer_type | NULLABLE | STRING | |
product_type | NULLABLE | STRING | |
created | NULLABLE | TIMESTAMP | |
updated | NULLABLE | TIMESTAMP | | is_updated
time_to_first_response_ongoing_start | NULLABLE | TIMESTAMP | |
time_to_first_response_completed_start | NULLABLE | STRING | |
time_to_first_response_completed_breached | NULLABLE | BOOLEAN | |
time_to_first_resolution_ongoing_start | NULLABLE | TIMESTAMP | |
time_to_first_resolution_completed_start | NULLABLE | STRING | |
time_to_first_resolution_completed_breached | NULLABLE | BOOLEAN | |
due_date | NULLABLE | DATE | |

Macros

Macros are defined in .sql files within the macros directory of the dbt project.

  • generate_schema_name.sql - this macro is used to generate a schema name dynamically.

{% macro generate_schema_name(custom_schema_name, node) -%}
  {%- set default_schema = target.schema -%}
  {%- if custom_schema_name is none -%}
    {{ default_schema }}
  {%- else -%}
    {{ custom_schema_name | trim }}
  {%- endif -%}
{%- endmacro %}

Source Freshness

Source freshness refers to the timeliness of the data in the source tables that are used to build the dbt models.

Here's the .yml configuration that determines the source freshness of the source table jira.issues_v7:

version: 2
sources:
  - name: jira_src
    description: "Loading dataset for staging"
    database: datatest-348502
    schema: jira
    tables:
      - name: issues_v7
    freshness:
      warn_after:
        count: 1
        period: day
      error_after:
        count: 2
        period: day
    loaded_at_field: _PARTITIONTIME

To check the source freshness, run dbt source freshness --select models/jira.

Data Quality

To monitor data quality of the tables, the following tests are executed:

Generic Tests

  • source_not_null_jira_prd_prd_jira_serviceproject_issues_v1_id - check if the "id" column contains any null values.

  • source_unique_jira_prd_prd_jira_serviceproject_issues_v1_id - check if the "id" column contains any duplicate values.

Singular Tests

  • assert_singular_prd_jira_issues_is_updated - compares the "updated" column in the prod table against the source table to check whether the latest "updated" date has been carried over to the prod table.

To execute data quality testing, run dbt test --select jira.

Deployment

Running dbt in production means setting up a system to run a dbt job on a schedule, rather than running dbt commands manually from the command line. Here's the job configuration:

Job Name: jira_serviceproject_issues_daily_run

Environment: Deployment

Target Name: default

Run Timeout: Never

Generate docs on run: True

Run source freshness: True

Commands: dbt build --select jira

Custom Cron Schedule (UTC): At 01:15 AM

dbt build --select jira - this command builds all the jira models (views and tables) and runs their associated tests.