Background
This page describes the process of extracting data from the Genesys Cloud API and loading it into the Data Lake.
Workflow
There are three (3) data sources that are extracted:
Conversation - detailed data on Genesys conversations/interactions. It includes the media type (call, message, etc.), conversation start and end times, participants, direction, metrics, and segments.
Evaluation - detailed data on an evaluator's evaluation of an interaction. It includes the scores, comments, and event time.
User Details - provides a low-level view of all the status changes a user went through during a given interval.
Ingestion
Each data source below is extracted from the API, uploaded to Cloud Storage, and then loaded into the target system.
Conversation
Endpoint: /api/v2/analytics/conversations/details/query
Configuration
Prepare the configuration file:
{
    "conversations_url": "/analytics/conversations/details/query",
    "request_body": {
        "interval": "",
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {
            "pageSize": 25,
            "pageNumber": 1
        }
    },
    "gcs_report": "conversations",
    "gcs_filename": "genesys_conversations_<date>_<page>.json",
    "bq_dataset": "Genesys",
    "bq_table": "raw_genesys_conversations_v1",
    "schema_filename": "schema_conversation_full.json",
    "start_date": 2,
    "end_date": 1,
    "start_time": "T16:00:00.000Z",
    "end_time": "T15:59:59.000Z"
}
conversations_url - The endpoint URL of the Conversation API.
request_body - The request body sent in the API POST request. It includes the query parameters for filtering the necessary fields.
gcs_* - These fields are used for the Cloud Storage bucket.
bq_* - These fields are used for the BigQuery load.
schema_filename - The full path of the schema file used for configuring the BigQuery table. Sample schema is located in the GitHub repository.
start_date - The start date input for time delta.
end_date - The end date input for time delta.
start_time - The start time defined for data coverage.
end_time - The end time defined for data coverage.
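The four date/time fields together define the query interval: start_date and end_date are day offsets counted back from today, and start_time/end_time are appended to the resulting dates. A minimal sketch of how a helper like gen_util.get_interval might build the interval string (the helper's exact implementation is an assumption; only its inputs and output format are taken from this page):

```python
from datetime import date, timedelta

def get_interval(start_time, end_time, start_date_offset, end_date_offset):
    """Build the ISO-8601 interval string expected by the Genesys analytics
    APIs, e.g. '2023-02-13T16:00:00.000Z/2023-02-14T15:59:59.000Z'.
    The offsets count days back from today (2 = two days ago)."""
    start = date.today() - timedelta(days=start_date_offset)
    end = date.today() - timedelta(days=end_date_offset)
    return f"{start.isoformat()}{start_time}/{end.isoformat()}{end_time}"
```

With start_date=2 and end_date=1, this yields a one-day window ending at 15:59:59 UTC yesterday, matching a 00:00-23:59 local day in a UTC+8 timezone.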
Python Script
The code for the Genesys Conversation API ingestion is contained in main.py. When the Cloud Function runs, it passes data and context as parameters to the genesys_conversation function.
def genesys_conversation(data, context):
    sm = SecretManagerHelper(os.environ['SECRET_ID_GENESYS'])
    secret_in_json = sm.get_secret()
    api_conv_config = gen_util.get_api_config('config_conversation.json')
    api_gen_config = gen_util.get_api_config('config_general.json')
    api_gen_config = gen_util.get_oauth_token(api_gen_config,
                                              secret_in_json['clientid'],
                                              secret_in_json['clientsecret'])
    interval = gen_util.get_interval(api_conv_config['start_time'],
                                     api_conv_config['end_time'],
                                     api_conv_config['start_date'],
                                     api_conv_config['end_date'])
    api_conv_config['request_body']['interval'] = interval
    gcs_filename = api_conv_config['gcs_filename']
    gcs_file_ext = gcs_filename.split('.')[1]
    yesterday = date.today() - timedelta(1)
    date_dct = gen_util.get_year_month_day(yesterday)
    gcs_blob_path = f'{api_gen_config["gcs_source"]}' \
                    f'/{api_conv_config["gcs_report"]}' \
                    f'/{date_dct["year"]}' \
                    f'/{date_dct["month"]}' \
                    f'/{date_dct["day"]}'
    api_client = RequestsApi(api_gen_config['api_base_url'])
    conversations_response = api_client.post(api_conv_config['conversations_url'],
                                             json=api_conv_config['request_body'],
                                             headers=api_gen_config['request_headers'])
    data = json.loads(conversations_response.text)
    total_pages = gen_util.get_total_pages(data['totalHits'],
                                           api_conv_config['request_body']['paging']['pageSize'])
    gcs_file_page = 1
    for page in range(0, total_pages):
        response = api_client.post(api_conv_config['conversations_url'],
                                   json=api_conv_config['request_body'],
                                   headers=api_gen_config['request_headers'])
        data = response.json()
        conversations = data['conversations']
        api_conv_config['request_body']['paging']['pageNumber'] += 1
        full_filename = gcs_filename.replace('<date>', yesterday.strftime(api_gen_config['gcs_date_format'])) \
                                    .replace('<page>', str(gcs_file_page))
        results = '\n'.join(map(json.dumps, conversations))
        gcs_client = GcsHelper(api_gen_config['gcs_bucket_name'])
        gcs_client.blob = f'{gcs_blob_path}/{full_filename}'
        try:
            gcs_client.upload_from_string(results, gcs_client.blob, gcs_file_ext)
        except GoogleAPIError as e:
            logger.error(e)
        gcs_file_page += 1
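The script pages through results by deriving a page count from totalHits in the first response. A minimal sketch of what a helper like gen_util.get_total_pages presumably does (the actual implementation is an assumption):

```python
import math

def get_total_pages(total_hits, page_size):
    """Number of pages needed to fetch total_hits records
    when the API returns at most page_size records per request."""
    return math.ceil(total_hits / page_size)
```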
Evaluation
Endpoint: /api/v2/quality/evaluations/query
Configuration
Prepare the configuration file:
{
    "conversations_url": "/analytics/conversations/details/query",
    "users_url": "/users",
    "auth_roles_url": "/authorization/roles",
    "auth_roles_users_url": "/authorization/roles/<roleId>/users",
    "evaluation_url": "/quality/evaluations/query",
    "evaluation_parameters": {
        "startTime": "2022-09-01T16:00:00.000Z",
        "endTime": "2023-02-15T15:59:59.000Z",
        "evaluatorUserId": "<evaluatorUserId>",
        "expandAnswerTotalScores": true
    },
    "users_parameters": {
        "expand": "authorization",
        "pageSize": 2,
        "pageNumber": 1
    },
    "get_auth_roles_parameters": {
        "name": "Quality Evaluator"
    },
    "get_user_ids_from_auth_role_parameters": {
        "pageSize": 2,
        "pageNumber": 1
    },
    "gcs_report": "evaluations_refactored",
    "report_version": 1,
    "schema_filename": "schema_genesys_evaluation.json",
    "start_date_minus": 2,
    "end_date_minus": 1,
    "start_time": "T16:00:00.000Z",
    "end_time": "T15:59:59.000Z"
}
conversations_url - The endpoint URL of the Conversations API.
users_url - The endpoint URL of the Users API.
evaluation_url - The endpoint URL of Evaluations API.
evaluation_parameters - The query parameters sent in the API GET request, used for filtering the necessary fields.
gcs_* - These fields are used for the Cloud Storage bucket.
bq_* - These fields are used for the BigQuery load.
schema_filename - The full path of the schema file used for configuring the BigQuery table. Sample schema is located in the GitHub repository.
start_date_minus - The start date input for time delta.
end_date_minus - The end date input for time delta.
start_time - The start time defined for data coverage.
end_time - The end time defined for data coverage.
Python Script
The code for the Genesys Evaluation API ingestion is contained in main.py. This Cloud Function is HTTP-triggered: the runtime passes a request object to the genesys_evaluation function when it is invoked.
def genesys_evaluation(request):
    sm = SecretManagerHelper(os.environ['SECRET_ID_GENESYS'])
    secret_in_json = sm.get_secret()
    api_eval_config = gen_util.get_api_config('config_genesys_evaluation.json')
    api_gen_config = gen_util.get_api_config('config_genesys_general.json')
    api_gen_config = gen_util.get_oauth_token(api_gen_config,
                                              secret_in_json['clientid'],
                                              secret_in_json['clientsecret'])
    gcs_bucket = api_gen_config['gcs_bucket_name']
    gcs_file_ext = api_gen_config['gcs_format']
    gcs_report = api_eval_config['gcs_report']
    gcs_source = api_gen_config['gcs_source']
    gcs_blob_path = gen_util.generate_gcs_uri(api_gen_config["gcs_source"],
                                              api_eval_config["gcs_report"])
    request_headers = api_gen_config['request_headers']
    yesterday = date.today() - timedelta(1)
    date_interval = gen_util.get_interval(api_eval_config['start_time'],
                                          api_eval_config['end_time'],
                                          api_eval_config['start_date_minus'],
                                          api_eval_config['end_date_minus']).split('/')
    api_eval_config['evaluation_parameters']['startTime'] = date_interval[0]
    api_eval_config['evaluation_parameters']['endTime'] = date_interval[1]
    api_client = RequestsApi(api_gen_config['api_base_url'])

    # Get the Quality Evaluator role
    quality_evaluator_role_id = gen_util.get_auth_role(api_client,
                                                       api_eval_config,
                                                       request_headers)['entities'][0]['id']

    # Get the users holding the Quality Evaluator role
    user_ids = gen_util.get_user_ids_from_role_id(api_client,
                                                  quality_evaluator_role_id,
                                                  api_eval_config,
                                                  request_headers)

    # API request to get evaluations per evaluator user id
    evaluations_response_list = []
    for user_id in user_ids:
        api_eval_config['evaluation_parameters']['evaluatorUserId'] = user_id
        evaluations_response = api_client.get(api_eval_config['evaluation_url'],
                                              params=api_eval_config['evaluation_parameters'],
                                              headers=request_headers)
        evaluations_entity = evaluations_response.json()['entities']
        if len(evaluations_entity) != 0:
            evaluations_response_list.append(evaluations_entity)

    # Upload evaluation data to GCS
    gen_util.upload_list_data_to_gcs(evaluations_response_list, yesterday,
                                     gcs_source, gcs_report, gcs_file_ext,
                                     api_gen_config['gcs_date_format'],
                                     gcs_bucket, gcs_blob_path)

    # Load data to BQ
    gen_util.load_data_to_bq(api_eval_config['schema_filename'], gcs_source,
                             gcs_report, api_eval_config['report_version'],
                             gcs_bucket, gcs_file_ext)
    return "The Job run was successfully executed."
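Before upload, the per-evaluator entity lists collected above need to be flattened into newline-delimited JSON, the file format BigQuery accepts when loading JSON from GCS. A sketch of that serialization step, assuming upload_list_data_to_gcs flattens the nested lists (the helper's internals are an assumption):

```python
import json
from itertools import chain

def to_ndjson(evaluation_lists):
    """Flatten a list of per-evaluator entity lists into one
    newline-delimited JSON string, one evaluation per line."""
    return "\n".join(json.dumps(entity)
                     for entity in chain.from_iterable(evaluation_lists))
```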
User Details
Endpoint: /api/v2/analytics/users/details/query
Lookup
Agent Details : /api/v2/users
Published Evaluation : /api/v2/quality/forms/evaluations
/api/v2/quality/forms/evaluations/{formId}
Queues : /api/v2/routing/queues
Configuration
Prepare the configuration file:
{
    "users_url": "/analytics/users/details/query",
    "request_body": {
        "interval": "2023-01-31T16:00:00.000Z/2023-02-01T15:59:59.000Z",
        "order": "asc",
        "paging": {
            "pageSize": 100,
            "pageNumber": 1
        }
    },
    "gcs_report": "user_details",
    "report_version": 1,
    "schema_filename": "schema_user.json",
    "start_date": 2,
    "end_date": 1,
    "start_time": "T16:00:00.000Z",
    "end_time": "T15:59:59.000Z"
}
users_url - The endpoint URL of the User Details API.
request_body - The request body sent in the API POST request. It includes the query parameters for filtering the necessary fields.
gcs_* - These fields are used for the Cloud Storage bucket.
bq_* - These fields are used for the BigQuery load.
schema_filename - The full path of the schema file used for configuring the BigQuery table. Sample schema is located in the GitHub repository.
start_date - The start date input for time delta.
end_date - The end date input for time delta.
start_time - The start time defined for data coverage.
end_time - The end time defined for data coverage.
Python Script
The code for the Genesys User Details API ingestion is contained in main.py. When the Cloud Function runs, it passes data and context as parameters to the genesys_user_details function.
def genesys_user_details(data, context):
    sm = SecretManagerHelper(os.environ['SECRET_ID_GENESYS'])
    secret_in_json = sm.get_secret()
    api_user_config = gen_util.get_api_config('config_user.json')
    api_gen_config = gen_util.get_api_config('config_general.json')
    api_gen_config = gen_util.get_oauth_token(api_gen_config,
                                              secret_in_json['clientid'],
                                              secret_in_json['clientsecret'])
    interval = gen_util.get_interval(api_user_config['start_time'],
                                     api_user_config['end_time'],
                                     api_user_config['start_date'],
                                     api_user_config['end_date'])
    api_user_config['request_body']['interval'] = interval
    request_headers = api_gen_config['request_headers']
    base_url = api_gen_config['api_base_url']
    yesterday = date.today() - timedelta(1)
    gcs_blob_path = gen_util.generate_gcs_uri(api_gen_config["gcs_source"],
                                              api_user_config["gcs_report"])
    gcs_file_ext = api_gen_config['gcs_format']
    client = RequestsApi(base_url)

    # Page through the results; the total page count is only known
    # after the first response comes back.
    total_pages = 1
    current_page = 1
    gcs_file_page = 1
    while total_pages >= current_page:
        req = client.post(api_user_config['users_url'],
                          json=api_user_config['request_body'],
                          headers=request_headers)
        response = req.json()
        total_pages = gen_util.get_total_pages(response['totalHits'],
                                               api_user_config['request_body']['paging']['pageSize'])
        current_page += 1
        api_user_config['request_body']['paging']['pageNumber'] = current_page
        data = response['userDetails']
        full_filename = gen_util.generate_gcs_filename(api_gen_config['gcs_source'],
                                                       api_user_config['gcs_report'],
                                                       gcs_file_ext, yesterday,
                                                       api_gen_config['gcs_date_format'],
                                                       gcs_file_page)
        results = '\n'.join(map(json.dumps, data))
        gcs_client = GcsHelper(api_gen_config['gcs_bucket_name'])
        gcs_client.blob = f'{gcs_blob_path}/{full_filename}'
        gcs_client.upload_from_string(results, gcs_client.blob, gcs_file_ext)
        gcs_file_page += 1

    bq_table_name = gen_util.generate_bq_table_name(api_gen_config['gcs_source'],
                                                    api_user_config['gcs_report'],
                                                    api_user_config['report_version'])
    bq = BqHelper(api_gen_config['gcs_source'].capitalize(), bq_table_name)
    job = bq.insert_to_bq_from_gcs(api_user_config['schema_filename'],
                                   f"{api_gen_config['gcs_bucket_uri']}/{gcs_blob_path}/*.json")
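The target table name is derived from the source, report, and version. A sketch of what gen_util.generate_bq_table_name might do, assuming the raw_<source>_<report>_v<version> pattern visible in the configuration files (the helper's actual implementation is an assumption):

```python
def generate_bq_table_name(source, report, version):
    """Compose the raw-layer BigQuery table name,
    e.g. 'raw_genesys_user_details_v1'."""
    return f"raw_{source}_{report}_v{version}"
```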
Transformation
Transform the source data into a common format that can be ingested into the target system.
Structuring Models
Within the dbt project, the parent "models" directory contains one sub-directory per data source; each of these contains two sub-directories, staging and marts, representing the two types of models.
models
|---genesys
    |---conversations
    |   |---marts
    |   |   |---prd_genesys_conversations_v1.sql
    |   |---staging
    |       |---stg_genesys_conversations_v1.sql
    |       |---schema_genesys_conversations.yml
    |---evaluations
    |   |---marts
    |   |   |---prd_genesys_evaluations_v1.sql
    |   |---staging
    |       |---stg_genesys_evaluations_v1.sql
    |       |---schema_genesys_evaluations.yml
    |---user_details
        |---marts
        |   |---prd_genesys_user_details_v1.sql
        |---staging
            |---stg_genesys_user_details_v1.sql
            |---schema_genesys_user_details.yml
dbt_project.yml
prd_genesys_<endpoint name>_v1.sql - contains the "final" version of the data, and is updated by transforming and merging data from the source.
stg_genesys_<endpoint name>_v1.sql - a temporary area where data is tested and cleaned up before being loaded into the production table.
dbt_project.yml - a configuration file that contains settings for the dbt project. It is used to define settings such as the project name, the target database, and the schema to which the models should be deployed.
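A minimal dbt_project.yml for this layout might look as follows; the project name, profile, and materialization choices here are illustrative assumptions, not the project's actual settings:

```yaml
name: 'genesys'
version: '1.0.0'
profile: 'genesys'          # must match an entry in profiles.yml
model-paths: ["models"]

models:
  genesys:
    conversations:
      staging:
        +materialized: view
      marts:
        +materialized: table
    evaluations:
      staging:
        +materialized: view
      marts:
        +materialized: table
    user_details:
      staging:
        +materialized: view
      marts:
        +materialized: table
```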
Lineage Graph
Schema
For the associated schema of each of the following tables, visit the linked repository.
Table | GitHub Link
---|---
prd_genesys_conversations |
prd_genesys_evaluations |
prd_genesys_user_details |
Macros
Macros are defined in .sql files within the macros directory of the dbt project.
generate_schema_name.sql - this macro is used to generate a schema name dynamically.
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}
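This override replaces dbt's built-in behavior of prefixing a custom schema with the target schema: with the macro above, a model configured with a custom schema is built in exactly that schema. For example (model path and schema name are illustrative):

```sql
-- models/genesys/conversations/staging/stg_genesys_conversations_v1.sql
-- With the macro above, this model lands in schema "genesys"
-- instead of dbt's default "<target.schema>_genesys".
{{ config(schema='genesys') }}

select *
from {{ source('genesys', 'raw_genesys_conversations_v1') }}
```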
Data Quality
To monitor the data quality of the tables, the following tests are executed:
Test | Table | Field
---|---|---
unique | |
not_null | |
not_null | |
unique | |
not_null | |
unique | |
not_null | |
To execute data quality testing, run dbt test --select genesys.