This page covers the integration with Genesys for downloading call recordings.

Context

Jira tickets for download recordings:

  • SAF-1253

  • SAF-1735

  • SAF-1736

  • SAF-1737

  • SAF-1738

  • SAF-1739

  • SAF-1773

  • SAF-1803

Implementation

  1. Airflow integration: manage the cron job for downloading call recordings;

  2. Check previous recordings: list the GCS bucket and check the tag file for the previous day's recordings;

  3. Download recordings: download the recordings and their metadata from Genesys Cloud;

  4. Persist recordings to GCS: upload the recordings to GCS (Google Cloud Storage).

Airflow integration:

We use Airflow to trigger the cron job for downloading recordings.

The genesys-recording-migrator service defines the DAG file for Airflow, and Airflow triggers the cron job:

  • add a /dags folder to the genesys-recording-migrator service

  • add the Airflow Python file "genesys_recording_migrator_download_recordings.py":

    from datetime import timedelta

    import pendulum
    import requests
    from airflow.decorators import dag, task
    from airflow.operators.bash import BashOperator

    @dag(
        dag_id='genesys_recording_migrator_download_recordings',
        description='Trigger for downloading recordings with Genesys.',
        schedule_interval='0 0 * * *',
        catchup=False,
        start_date=pendulum.datetime(2022, 10, 21, tz='UTC'),
        dagrun_timeout=timedelta(minutes=20),
        tags=["genesys-gateway"]
    )
    def genesys_recording_migrator_download_recordings_dag():
        start = BashOperator(task_id="start", bash_command="echo parameter")

        @task(task_id='download_recordings')
        def download_recordings(**context):
            # POST to the migrator service to kick off the download job.
            url = 'https://genesys-recording-migrator.apps.brave.safibank.online/genesys/scheduled-jobs/download_recordings'
            response = requests.post(url, json={})
            response.raise_for_status()

        start >> download_recordings()

    dag = genesys_recording_migrator_download_recordings_dag()

Note: The Airflow setup still needs testing; in the local environment, POSTing a request to the genesys-recording-migrator service fails with a connection error.
Note: The cron job's schedule: to prevent recording loss and overlapping date intervals, the downloaded interval lags the execution time.

eg:

Cron job execution time 2023-02-04 00:00:00: the conversation recording date interval ranges from 2023-02-01 00:00:00 to 2023-02-02 00:00:00.

Cron job execution time 2023-02-05 00:00:00: the conversation recording date interval ranges from 2023-02-02 00:00:00 to 2023-02-03 00:00:00.
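
For illustration, a minimal Kotlin sketch of how the job could derive the interval from its execution time; the helper name is hypothetical, and the three-day lag matches the examples above:

    import java.time.LocalDate

    // Hypothetical helper: derive the one-day recording interval from the
    // cron job's execution date, lagging by three days as in the examples.
    fun recordingInterval(executionDate: LocalDate): Pair<LocalDate, LocalDate> {
        val start = executionDate.minusDays(3)  // 2023-02-04 -> 2023-02-01
        return start to start.plusDays(1)       // end: 2023-02-02
    }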

  • three tasks in the Airflow workflow:
    (1) start_task: a start node that records the execution time, making problems easier to follow up on;
    (2) check_prev_task: triggers the job that checks the previous day's recordings. Because HTTP connections have a timeout limit, the download/upload job also writes a '_SUCCESS' or '_FAILURE' tag file to GCS when it finishes, and this task checks for the '_SUCCESS' file. (If there is no _SUCCESS file under the previous day's path, send an email to the developers and rerun the previous task.)
    (3) download_recording: triggers the job that downloads the recordings.

Check prev recordings tag:

We will use the GCS client library to implement this requirement (the check only lists the bucket and inspects the tag file).

  • add a new endpoint for the Airflow cron job: /genesys/scheduled-jobs/check_recordings
    request parameter: recordingDay (the date whose recordings are checked):

    fun checkRecordings(
      @NotNull recordingDay: String,
      httpRequest: HttpRequest<String>
    ) {
      // TODO: list the GCS path for recordingDay and look for the _SUCCESS tag file
      val succeeded = listGCS(recordingDay)
      if (!succeeded) {
        // No _SUCCESS tag: email the developers so the previous task can be rerun
        sendEmailToDev(recordingDay)
      }
    }
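
For reference, a sketch of what the tag check could look like with the google-cloud-storage client; the function name and parameters are assumptions, and the path layout follows the directory scheme described later on this page:

    import com.google.cloud.storage.Storage

    // A sketch, not the final implementation: list the previous day's prefix
    // in GCS and check whether the _SUCCESS tag file exists there.
    fun hasSuccessTag(storage: Storage, bucket: String, env: String, day: String): Boolean {
        val prefix = "$env/$day/"
        val blobs = storage.list(bucket, Storage.BlobListOption.prefix(prefix))
        return blobs.iterateAll().any { it.name == "${prefix}_SUCCESS" }
    }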

Download recordings:

We will use the Genesys Cloud SDK to implement this requirement.

As the docs mention, once the cron job is triggered we can use the SDK inside the job to download the recordings: https://developer.genesys.cloud/analyticsdatamanagement/recording/recordings-downloader

  • add a new endpoint for the Airflow cron job: /genesys/scheduled-jobs/download_recordings
    request parameters defining the recordings' date interval: startTime, endTime (a wiring sketch follows at the end of this section):

    fun downloadRecordings(
      @NotNull startTime: String,
      @NotNull endTime: String,
      httpRequest: HttpRequest<String>
    )
  • use the Genesys Cloud recordings SDK to download the matching recordings.

Recordings Type:

  1. Call Recordings: *.ogg

  2. Chat Recordings: *.json (only fetch messages whose bodyType is "STANDARD" or "NOTICE"; see the filter sketch below)

  3. Email Recordings: *.zip

  • use the Genesys Cloud recordings SDK to download the matching recordings' metadata.

Recordings Metadata: *.json
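
To make the chat filter concrete, a small sketch; the ChatMessage model is a stand-in for the actual Genesys chat message structure, not the SDK type:

    // Hypothetical model for a Genesys chat message.
    data class ChatMessage(val bodyType: String, val body: String)

    // Keep only standard content and notices, per the filter described above.
    fun filterChatMessages(messages: List<ChatMessage>): List<ChatMessage> =
        messages.filter { it.bodyType == "STANDARD" || it.bodyType == "NOTICE" }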

Note: for more detail about the date interval, see https://developer.genesys.cloud/analyticsdatamanagement/analytics/detail/#aggregations
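
To show how the download endpoint above could be wired up, a sketch assuming a Micronaut controller; the controller class and RecordingDownloadService are illustrative, not existing code:

    import io.micronaut.http.annotation.Controller
    import io.micronaut.http.annotation.Post
    import io.micronaut.http.annotation.QueryValue
    import javax.validation.constraints.NotNull

    // Hypothetical service interface wrapping the Genesys Cloud SDK calls.
    interface RecordingDownloadService {
        fun download(startTime: String, endTime: String)
    }

    @Controller("/genesys/scheduled-jobs")
    class ScheduledJobsController(private val downloadService: RecordingDownloadService) {

        // Endpoint triggered by the Airflow cron job.
        @Post("/download_recordings")
        fun downloadRecordings(
            @QueryValue @NotNull startTime: String,
            @QueryValue @NotNull endTime: String
        ) {
            downloadService.download(startTime, endTime)
        }
    }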

Persist recording to GCS:

After downloading the recordings from Genesys Cloud, we upload them to GCS (Google Cloud Storage):

  • Connect to the GCS

  • Get the right bucket for conversation recordings.

We need to discuss with the Ops team whether we need different buckets for the dev and prod environments.

GCS bucket directory layout: design the paths so recordings can be queried quickly. Paths follow the pattern bucket/env/day/recording_type/<specific conversation file>:

eg:

bucket/brave/2023-02-03/call/conversation1.ogg
bucket/brave/2023-02-03/chat/conversation1.json
bucket/brave/2023-02-03/email/conversation1.zip
bucket/brave/2023-02-03/metadata/conversation1.json
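
A small path-building helper (hypothetical) matching this layout; the bucket itself is separate, so the object name starts at the env segment:

    // Build object names under the layout above: env/day/recording_type/<file>.
    fun objectPath(env: String, day: String, recordingType: String, fileName: String): String =
        "$env/$day/$recordingType/$fileName"

    // objectPath("brave", "2023-02-03", "call", "conversation1.ogg")
    //   -> "brave/2023-02-03/call/conversation1.ogg"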

  • Upload the file to GCS

    log.info("Uploading object $objectId of ${content.size} Bytes to bucket $bucket.")
    
    val blobInfo = BlobInfo.newBuilder(bucket, objectId).also { builder ->
      contentType?.let { builder.setContentType(it) }
     }.build()
    val result = storage.service.createFrom(blobInfo, ByteArrayInputStream(content))
    return result.name
  • Create tag file in GCS: when the downloading and uploading succeed, create a '_SUCCESS' tag file in GCS; otherwise create a '_FAILURE' tag file:

bucket/brave/2023-02-03/_SUCCESS
bucket/brave/2023-02-03/_FAILURE
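
A sketch of the tag-file creation, assuming the same google-cloud-storage client as the upload snippet; the function name and parameters are assumptions:

    import com.google.cloud.storage.BlobInfo
    import com.google.cloud.storage.Storage

    // Write a zero-byte _SUCCESS or _FAILURE tag object for the day's path;
    // only the object's existence is checked by check_prev_task.
    fun writeTagFile(storage: Storage, bucket: String, env: String, day: String, success: Boolean) {
        val tag = if (success) "_SUCCESS" else "_FAILURE"
        val blobInfo = BlobInfo.newBuilder(bucket, "$env/$day/$tag").build()
        storage.create(blobInfo, ByteArray(0))
    }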