Background

This page describes the process of extracting data from Genesys cloud API and loading it to Data Lake.

Workflow

There are three (3) data sources that are extracted:

  • Conversation - detailed data of genesys conversations/interactions. It includes media type (call, message, etc), conversation start time, conversation end time, participants, direction, metrics, segments.

  • Evaluation - detailed data of evaluators evaluation of an interaction. It include the scores, comments, event time.

  • User Details - provides a low-level view into all the status changes a user went through during a given interval

Ingestion

Upload the data sources that need to be ingested into the target system.

Conversation

Endpoint : /api/v2/analytics/conversations/details/query

Configuration

Preparing the configuration files.

{
    "conversations_url": "/analytics/conversations/details/query",
    "request_body": {
        "interval": "",
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {
          "pageSize": 25,
          "pageNumber": 1
        }
    },
    "gcs_report": "conversations",
    "gcs_filename": "genesys_conversations_<date>_<page>.json",
    "bq_dataset": "Genesys",
    "bq_table": "raw_genesys_conversations_v1",
    "schema_filename": "schema_conversation_full.json",
    "start_date": 2,
    "end_date": 1,
    "start_time": "T16:00:00.000Z",
    "end_time": "T15:59:59.000Z"
}
  • conversations_url - The endpoint URL of the Conversation API.

  • request_body - The values passed in the API GET request. It includes the query parameters for filtering the necessary fields.

  • gcs_* - These fields are used for the Cloud Storage bucket.

  • bq_* - These fields are used for the BigQuery load.

  • schema_filename - The full path of the schema file used for configuring the BigQuery table. Sample schema is located in the GitHub repository.

  • start_date - The start date input for time delta.

  • end_date - The end date input for time delta.

  • start_time - The start time defined for data coverage.

  • end_time - The end time defined for data coverage.

Python Script

The code for the Genesys Conversation API ingestion is contained inside the main.py. The Cloud Function will pass data and context as a parameter to the genesys_conversation function when it is run.

def genesys_conversation(data, context):
    sm = SecretManagerHelper(os.environ['SECRET_ID_GENESYS'])
    secret_in_json = sm.get_secret()

    api_conv_config = gen_util.get_api_config('config_conversation.json')
    api_gen_config = gen_util.get_api_config('config_general.json')
    api_gen_config = gen_util.get_oauth_token(api_gen_config, secret_in_json['clientid'], secret_in_json['clientsecret'])

    interval = gen_util.get_interval(api_conv_config['start_time'], 
                                        api_conv_config['end_time'], 
                                        api_conv_config['start_date'],
                                        api_conv_config['end_date'])

    api_conv_config['request_body']['interval'] = interval    

    gcs_filename = api_conv_config['gcs_filename']
    gcs_file_ext = gcs_filename.split('.')[1]

    yesterday = date.today() - timedelta(1)
    date_dct = gen_util.get_year_month_day(yesterday)

    gcs_blob_path = f'{api_gen_config["gcs_source"]}' \
                    f'/{api_conv_config["gcs_report"]}' \
                    f'/{date_dct["year"]}' \
                    f'/{date_dct["month"]}' \
                    f'/{date_dct["day"]}'
    
    api_client = RequestsApi(api_gen_config['api_base_url'])
    conversations_response = api_client.post(api_conv_config['conversations_url'],
                                                json=api_conv_config['request_body'],
                                                headers=api_gen_config['request_headers'])
    data = json.loads(conversations_response.text)

    total_pages = gen_util.get_total_pages(data['totalHits'], api_conv_config['request_body']['paging']['pageSize'])
    gcs_file_page = 1

    for page in range(0, total_pages):
        response = api_client.post(api_conv_config['conversations_url'],
                                json=api_conv_config['request_body'],
                                headers=api_gen_config['request_headers'])

        data = response.json()
        conversations = data['conversations']

        api_conv_config['request_body']['paging']['pageNumber'] += 1

        full_filename = gcs_filename.replace('<date>', yesterday.strftime(api_gen_config['gcs_date_format'])) \
            .replace('<page>', str(gcs_file_page))
        
        results = '\n'.join(map(json.dumps, conversations))
        gcs_client = GcsHelper(api_gen_config['gcs_bucket_name'])
        gcs_client.blob = f'{gcs_blob_path}/{full_filename}'
        try:
            gcs_client.upload_from_string(results, gcs_client.blob, gcs_file_ext)
        except GoogleAPIError as e:
            logger.error(e)

        gcs_file_page += 1

Evaluation

Endpoint : /api/v2/quality/evaluations/query

Configuration

Preparing the configuration files.

{
    "conversations_url": "/analytics/conversations/details/query",
    "users_url": "/users",
    "auth_roles_url": "/authorization/roles",
    "auth_roles_users_url": "/authorization/roles/<roleId>/users",
    "evaluation_url": "/quality/evaluations/query",
    "evaluation_parameters": {
        "startTime": "2022-09-01T16:00:00.000Z",
        "endTime": "2023-02-15T15:59:59.000Z",
        "evaluatorUserId": "<evaluatorUserId>",
        "expandAnswerTotalScores": true
    },
    "users_parameters": {
        "expand": "authorization",
        "pageSize": 2,
        "pageNumber": 1
    },
    "get_auth_roles_parameters": {
        "name": "Quality Evaluator"
    },
    "get_user_ids_from_auth_role_parameters": {
        "pageSize": 2,
        "pageNumber": 1
    },
    "gcs_report": "evaluations_refactored",
    "report_version": 1,
    "schema_filename": "schema_genesys_evaluation.json",
    "start_date_minus": 2,
    "end_date_minus": 1,
    "start_time": "T16:00:00.000Z",
    "end_time": "T15:59:59.000Z"
}
  • conversations_url - The endpoint URL of the Conversations API.

  • users_url - The endpoint URL of the Users API.

  • evaluation_url - The endpoint URL of Evaluations API.

  • evaluation_parameters - The values passed in the API GET request. It includes the query parameters for filtering the necessary fields.

  • gcs_* - These fields are used for the Cloud Storage bucket.

  • bq_* - These fields are used for the BigQuery load.

  • schema_filename - The full path of the schema file used for configuring the BigQuery table. Sample schema is located in the GitHub repository.

  • start_date_minus - The start date input for time delta.

  • end_date_minus - The end date input for time delta.

  • start_time - The start time defined for data coverage.

  • end_time - The end time defined for data coverage.

Python Script

The code for the Genesys Evaluation API ingestion is contained inside the main.py. The Cloud Function will pass data and context as a parameter to the genesys_evaluation function when it is run.

def genesys_evaluation(request):
    sm = SecretManagerHelper(os.environ['SECRET_ID_GENESYS'])
    secret_in_json = sm.get_secret()

    api_eval_config = gen_util.get_api_config('config_genesys_evaluation.json')
    api_gen_config = gen_util.get_api_config('config_genesys_general.json')
    api_gen_config = gen_util.get_oauth_token(api_gen_config, secret_in_json['clientid'],
                                              secret_in_json['clientsecret'])

    gcs_bucket = api_gen_config['gcs_bucket_name']
    gcs_file_ext = api_gen_config['gcs_format']
    gcs_report = api_eval_config['gcs_report']
    gcs_source = api_gen_config['gcs_source']
    gcs_blob_path = gen_util.generate_gcs_uri(api_gen_config["gcs_source"], api_eval_config["gcs_report"])
    request_headers = api_gen_config['request_headers']
    yesterday = date.today() - timedelta(1)
    date_interval = gen_util.get_interval(api_eval_config['start_time'],
                                          api_eval_config['end_time'],
                                          api_eval_config['start_date_minus'],
                                          api_eval_config['end_date_minus']).split('/')
    api_eval_config['evaluation_parameters']['startTime'] = date_interval[0]
    api_eval_config['evaluation_parameters']['endTime'] = date_interval[1]

    api_client = RequestsApi(api_gen_config['api_base_url'])
    # Get the Quality Evaluator role
    quality_evaluator_role_id = gen_util.get_auth_role(api_client, api_eval_config,
                                                       request_headers)['entities'][0]['id']
    # Get the users using the Quality Evaluator role's id
    user_ids = gen_util.get_user_ids_from_role_id(api_client, quality_evaluator_role_id,
                                                  api_eval_config, request_headers)

    # API Request to get evaluations using evaluator user id
    evaluations_response_list = []
    for user_id in user_ids:
        api_eval_config['evaluation_parameters']['evaluatorUserId'] = user_id
        evaluations_response = api_client.get(api_eval_config['evaluation_url'],
                                              params=api_eval_config['evaluation_parameters'],
                                              headers=request_headers)
        evaluations_entity = evaluations_response.json()['entities']

        if len(evaluations_entity) != 0:
            evaluations_response_list.append(evaluations_entity)

    # Upload evaluation data to GCS
    gen_util.upload_list_data_to_gcs(evaluations_response_list, yesterday, gcs_source, gcs_report, gcs_file_ext,
                                     api_gen_config['gcs_date_format'], gcs_bucket, gcs_blob_path)

    # Load data to BQ
    gen_util.load_data_to_bq(api_eval_config['schema_filename'], gcs_source, gcs_report,
                             api_eval_config['report_version'], gcs_bucket, gcs_file_ext)

    return "The Job run was successfully executed."

Users Details

Endpoint : /api/v2/analytics/users/details/query

Lookup

Agent Details : /api/v2/users

Published Evaluation : /api/v2/quality/forms/evaluations

/api/v2/quality/forms/evaluations/{formId}

Queues : /api/v2/routing/queues

Configuration

Preparing the configuration files.

{
    "users_url":"/analytics/users/details/query",
    "request_body": {
        "interval": "2023-01-31T16:00:00.000Z/2023-02-01T15:59:59.000Z",
        "order": "asc",
        "paging": {
         "pageSize": 100,
         "pageNumber": 1
        }
       },
    "gcs_report": "user_details",
    "report_version": 1,
    "schema_filename": "schema_user.json",
    "start_date": 2,
    "end_date": 1,
    "start_time": "T16:00:00.000Z",
    "end_time": "T15:59:59.000Z"
}
  • users_url - The endpoint URL of the Conversation API.

  • request_body - The values passed in the API GET request. It includes the query parameters for filtering the necessary fields.

  • gcs_* - These fields are used for the Cloud Storage bucket.

  • bq_* - These fields are used for the BigQuery load.

  • schema_filename - The full path of the schema file used for configuring the BigQuery table. Sample schema is located in the GitHub repository.

  • start_date - The start date input for time delta.

  • end_date - The end date input for time delta.

  • start_time - The start time defined for data coverage.

  • end_time - The end time defined for data coverage.

Python Script

The code for the Genesys Users Details API ingestion is contained inside the main.py. The Cloud Function will pass data and context as a parameter to the genesys_evaluation function when it is run.

def genesys_user_details(data, context):
    sm = SecretManagerHelper(os.environ['SECRET_ID_GENESYS'])
    secret_in_json = sm.get_secret()

    api_user_config = gen_util.get_api_config('config_user.json')
    api_gen_config = gen_util.get_api_config('config_general.json')
    api_gen_config = gen_util.get_oauth_token(api_gen_config, secret_in_json['clientid'], secret_in_json['clientsecret'])

    interval = gen_util.get_interval(api_user_config['start_time'], 
                                        api_user_config['end_time'], 
                                        api_user_config['start_date'],
                                        api_user_config['end_date'])

    api_user_config['request_body']['interval'] = interval

    request_headers = api_gen_config['request_headers']
    base_url = (api_gen_config['api_base_url'])
    yesterday = date.today() - timedelta(1)

    gcs_blob_path = gen_util.generate_gcs_uri(api_gen_config["gcs_source"], api_user_config["gcs_report"])
    gcs_file_ext = api_gen_config['gcs_format']

    client = RequestsApi(base_url)

    total_hits = 1
    total_page = 1
    gcs_file_page = 1

    while total_page >= total_hits:
        req = client.post(api_user_config['users_url'],
                          json=api_user_config['request_body'],
                          headers=request_headers)
        response = req.json()
        total_page = gen_util.get_total_pages(response['totalHits'],
                                              api_user_config['request_body']['paging']['pageSize'])
        total_hits += 1
        api_user_config['request_body']['paging']['pageNumber'] = total_hits
        data = response['userDetails']
        full_filename = gen_util.generate_gcs_filename(api_gen_config['gcs_source'],
                                                       api_user_config['gcs_report'],
                                                       gcs_file_ext,
                                                       yesterday,
                                                       api_gen_config['gcs_date_format'],
                                                       gcs_file_page)   

        results = '\n'.join(map(json.dumps, data))
        gcs_client = GcsHelper(api_gen_config['gcs_bucket_name'])
        gcs_client.blob = f'{gcs_blob_path}/{full_filename}'
        gcs_client.upload_from_string(results, gcs_client.blob, gcs_file_ext)
        gcs_file_page += 1

    bq_table_name = gen_util.generate_bq_table_name(api_gen_config['gcs_source'],
                                                    api_user_config['gcs_report'],
                                                    api_user_config['report_version'])
    bq = BqHelper(api_gen_config['gcs_source'].capitalize(), bq_table_name)
    job = bq.insert_to_bq_from_gcs(api_user_config['schema_filename'], f"{api_gen_config['gcs_bucket_uri']}/{gcs_blob_path}/*.json")

Transformation

Transform the source data into a common format that can be ingested into the target system.

Structuring Models

Within the dbt project, there is a parent directory called "models" that contains two sub-directories, each representing one of the types of models.

models
|---genesys
  |---conversations
    |---marts
      |---prd_genesys_conversations_v1.sql
    |---staging
      |---stg_genesys_conversations_v1.sql
    |---schema_genesys_conversations.yml
  |---evaluations
    |---marts
      |---prd_genesys_evaluations_v1.sql
    |---staging
      |---stg_genesys_evaluations_v1.sql
    |---schema_genesys_evaluations.yml
  |---user_details
    |---marts
      |---prd_genesys_user_details_v1.sql
    |---staging
      |---stg_genesys_user_details_v1.sql
    |---schema_genesys_user_details.yml
dbt.project.yml
  • prd_genesys_<endpoint name>_v1.sql - contains the "final" version of the data, and is updated by transforming and merging data from the source.

  • stg_genesys_<endpoint name>_v1.sql - temporary area where data is tested and cleaned up before being loaded into the production table.

  • dbt_project.yml - a configuration file that contains settings for the dbt project. It is used to define settings such as the project name, the target database, and the schema to which the models should be deployed.

Lineage Graph

Schema

For the associated schema of the following tables, visit the link to repo.

Table

GitHub Link

prd_genesys_conversations

Conversation

prd_genesys_evaluations

Evaluation

prd_genesys_user_details

User Details

Macros

Macros are defined in .sql files within the macros directory of the dbt project.

  • generate_schema_name.sql - this macro is used to generate a schema name dynamically.

{% macro generate_schema_name(custom_schema_name, node) -%}
  {%- set default_schema = target.schema -%}
  {%- if custom_schema_name is none -%}
    {{ default_schema }}
  {%- else -%}
    {{ custom_schema_name | trim }}
  {%- endif -%}
{%- endmacro %}

Data Quality

To monitor data quality of the tables, the following tests are executed:

Test

Table

Field

unique

raw_genesys_conversations_v1

conversationId

not_null

raw_genesys_conversations_v1

conversationId

not_null

raw_genesys_conversations_v1

conversationEnd

unique

raw_genesys_evaluations_refactored_v1

id

not_null

raw_genesys_evaluations_refactored_v1

id

unique

raw_genesys_evaluation_form_v1

id

not_null

raw_genesys_evaluation_form_v1

id

To execute data quality testing, run dbt test --select genesys.

Attachments:

Flowchart-3.jpg (image/jpeg)
Flowchart-5.jpg (image/jpeg)
Flowchart-6.jpg (image/jpeg)
image-20230220-032011.png (image/png)
image-20230220-032218.png (image/png)
image-20230220-032249.png (image/png)