🔌CRM Integration: Universal Questions Inference Results

This guide describes how to extract evaluation results for universal questions from the Trusst platform database and integrate them with backend systems such as Salesforce or Microsoft Dynamics.

CRM Integration: Universal Questions Inference Results (Direct Database Access)

Overview

This guide describes how to extract inference results for universal questions from the Trusst platform database using direct database access, and how to integrate them with Customer Relationship Management (CRM) systems such as Salesforce and Microsoft Dynamics. It covers GenesysOnPremise and PremierContactPoint (S3-based) streams.

Universal questions are evaluation questions that apply to all contacts regardless of which criteria are deduced during inference. These questions typically represent:

  • Core compliance requirements (e.g., "Did agent identify themselves?")

  • Standard quality metrics across all conversation types

  • Universal performance indicators

Since universal questions are evaluated on every contact, their inference results provide consistent, comparable metrics that can be synchronized to backend systems for reporting, analytics, and compliance tracking.


⚠️ Important Notices

Schema Stability Warning

This documentation reflects the database schema as of November 2025. Schema changes may occur in future Trusst platform updates. We recommend:

  • Using the provided SQL queries as templates only

  • Testing queries after platform upgrades before production use

  • Implementing robust error handling for schema changes

  • Contacting Trusst support before major platform upgrades

  • Monitoring the Trusst platform release notes for schema changes

For production deployments requiring long-term stability and security, consider using the REST API endpoint approach instead of direct database access. See the separate document: crm-integration-universal-questions-api.md for the recommended API-based integration approach.

Direct database access is suitable for:

  • Development and testing environments

  • Custom analytics requiring complex joins

  • One-time data migrations or backfills

  • Organizations with dedicated database administrators

Supported Stream Types

This document covers GenesysOnPremise and PremierContactPoint (S3-based) streams only. For other stream types (BrightPattern, Genesys Cloud, ContactSpace), please use the API endpoint approach.


Database Connection

Production Database Access

Recommended Approach: Use an Aurora PostgreSQL read replica to avoid impacting production performance.

Connection Details

  • Database Type: PostgreSQL (Aurora)

  • Network Access: Database is in a private VPC subnet. Requires VPN connection or VPC peering

  • Credentials: Read-only database credentials are stored in AWS Secrets Manager

    • Secret Path: trusst/{customer_name}/db-readonly-user

    • Contains: username, password, host, port, database

  • SSL/TLS: Required for all database connections

Retrieving Credentials from AWS Secrets Manager

import boto3
import json

def get_database_credentials(customer_name: str):
    """Retrieve read-only database credentials from AWS Secrets Manager."""
    client = boto3.client('secretsmanager', region_name='ap-southeast-2')

    secret_name = f"trusst/{customer_name}/db-readonly-user"
    response = client.get_secret_value(SecretId=secret_name)

    credentials = json.loads(response['SecretString'])
    return credentials

# Usage
creds = get_database_credentials('your-organization')
DATABASE_URL = f"postgresql://{creds['username']}:{creds['password']}@{creds['host']}:{creds['port']}/{creds['database']}?sslmode=require"
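
Generated passwords may contain characters such as "@", "/" or ":" that break a connection URL when interpolated directly. The following is a minimal sketch of percent-encoding the credentials first, assuming the secret holds the five keys listed above. The helper name build_database_url and the example values are illustrative assumptions, not part of the Trusst platform.

```python
from urllib.parse import quote


def build_database_url(creds: dict) -> str:
    """Build a PostgreSQL URL, percent-encoding the username and password."""
    # safe="" forces every reserved character (including "/") to be encoded.
    user = quote(str(creds["username"]), safe="")
    password = quote(str(creds["password"]), safe="")
    return (
        f"postgresql://{user}:{password}"
        f"@{creds['host']}:{creds['port']}/{creds['database']}"
        "?sslmode=require"
    )


# Illustrative values only; real values come from get_database_credentials().
example_creds = {
    "username": "readonly",
    "password": "p@ss/word:1",
    "host": "db.example.internal",
    "port": 5432,
    "database": "trusstai",
}
print(build_database_url(example_creds))
```

The result can be used wherever DATABASE_URL is used in this guide.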

Network Access Requirements

Contact Trusst Support to configure:

  • VPN connection to customer VPC, OR

  • VPC peering between customer AWS account and Trusst deployment VPC

  • Security group rules to allow inbound PostgreSQL traffic (port 5432) from integration server


Database Schema

Relevant Tables

questions Table

Stores question definitions including universal questions.

Key Columns:

  • question_id (String, PK): Unique question identifier

  • key (String): Question key (e.g., "agent_identified", "compliance_check")

  • text (String): The question text shown to evaluators

  • result_type (String): Type of answer - Boolean, Rating, Paragraph, Label, Summary

  • is_universal (Boolean): True for universal questions

  • is_scored (Boolean): Whether this question contributes to scoring

  • group (String): Question group (e.g., Compliance, Quality)

  • created_at (Timestamp): When the question was created

inferences Table

Stores inference results linking questions to contacts.

Key Columns:

  • inference_id (String, PK): Unique inference identifier

  • contact_id (String, FK): Foreign key to contacts table

  • question_id (String, FK): Foreign key to questions table

  • result (String): The inference result value

  • created_at (Timestamp): When the inference was generated

  • updated_at (Timestamp): Last update timestamp

contacts Table

Stores conversation contact records.

Key Columns:

  • contact_id (String, PK): Unique contact identifier

  • stream_id (String, FK): Foreign key to stream (data source)

  • external_id (String): External system identifier (e.g., Genesys conversation ID)

  • start_time (Timestamp): When the contact started

  • direction (String): Call direction - "inbound" or "outbound"

  • current_stage (String): Processing stage - e.g., "COMPLETED", "TRANSCRIPTION", "INFERENCE"

  • agent_id (String): Agent who handled the contact

  • created_at (Timestamp): Record creation timestamp

s3_conversations Table

Stores metadata for S3-sourced conversations (GenesysOnPremise, PremierContactPoint streams).

Key Columns:

  • conversation_id (String, PK): Unique conversation identifier

  • contact_id (String, FK): Foreign key to contacts table

  • stream_id (String, FK): Foreign key to stream

  • conn_id (String): Connection ID from source system (e.g., Genesys recording ID)

  • agent_id (String): Agent identifier from source system

  • call_time (Timestamp): When the call occurred

  • audio_s3_key (String): S3 key for audio file

  • metadata_s3_key (String): S3 key for metadata file

Note: The s3_conversations table contains additional agent and connection metadata specific to S3-sourced conversations (GenesysOnPremise, PremierContactPoint). This table is essential for the queries in this document.


SQL Query: Fetch Universal Question Inference Results

Basic Query for Genesys On-Premise / PremierContactPoint

This query retrieves universal question inference results for GenesysOnPremise and PremierContactPoint (S3-based) stream types:

SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    q.group AS question_group,
    i.result,
    i.created_at AS inference_created_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
ORDER BY s3c.call_time DESC, c.contact_id, q.key;

Field Notes:

  • call_time: Call time from s3_conversations table (when the call occurred in source system)

  • contact_agent_id: Agent ID from contacts table (always populated)

  • s3_agent_id: Agent ID from s3_conversations table (should always be populated for S3-based streams)

  • conn_id: Connection/recording ID from GenesysOnPremise/PremierContactPoint source system

  • direction: Call direction ("inbound" or "outbound")

  • start_time: Contact start timestamp from contacts table

  • Filter: Only includes contacts where current_stage = 'COMPLETED' (fully processed)


Sample Result

| contact_id | external_id | call_time | start_time | direction | contact_agent_id | s3_agent_id | conn_id | question_key | result |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound | agent_john | john.smith | REC-001 | agent_identified | true |
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound | agent_john | john.smith | REC-001 | compliance_check | true |
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound | agent_john | john.smith | REC-001 | quality_score | 8 |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound | agent_sarah | sarah.jones | REC-002 | agent_identified | false |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound | agent_sarah | sarah.jones | REC-002 | compliance_check | true |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound | agent_sarah | sarah.jones | REC-002 | quality_score | 7 |

Note: Each contact has multiple rows (one per universal question). Only contacts with current_stage = 'COMPLETED' are included.
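
Because the result set is one row per question, most consumers first collapse it to one record per contact. The following is a minimal sketch of that grouping, assuming each row is available as a dict keyed by the column aliases used in the query above. The function name group_results_by_contact is a hypothetical helper, and the sample rows are taken from the sample result.

```python
from typing import Dict, Iterable, List


def group_results_by_contact(rows: Iterable[dict]) -> List[dict]:
    """Collapse one-row-per-question results into one dict per contact."""
    contacts: Dict[str, dict] = {}
    for row in rows:
        # Create the contact entry the first time its contact_id is seen.
        contact = contacts.setdefault(
            row["contact_id"],
            {
                "contact_id": row["contact_id"],
                "external_id": row["external_id"],
                "answers": {},
            },
        )
        contact["answers"][row["question_key"]] = row["result"]
    # Dicts keep insertion order, so contacts come back in first-seen order.
    return list(contacts.values())


sample_rows = [
    {"contact_id": "c1a2b3c4-...", "external_id": "550e8400-...",
     "question_key": "agent_identified", "result": "true"},
    {"contact_id": "c1a2b3c4-...", "external_id": "550e8400-...",
     "question_key": "quality_score", "result": "8"},
    {"contact_id": "d5e6f7g8-...", "external_id": "6ba7b810-...",
     "question_key": "agent_identified", "result": "false"},
]
grouped = group_results_by_contact(sample_rows)
print(len(grouped), "contacts")
```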


Time-Range Batching Query

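For efficient processing, fetch results in time-based batches. Because the range filter is applied to s3c.call_time, contacts without a matching s3_conversations row are excluded from the results: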

SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    c.stream_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    q.group AS question_group,
    i.result,
    i.created_at AS inference_created_at,
    i.updated_at AS inference_updated_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
  AND s3c.call_time >= :start_time
  AND s3c.call_time < :end_time
ORDER BY s3c.call_time DESC, c.contact_id, q.key;

Parameters:

  • :start_time - Start of time range (e.g., 2025-01-01 00:00:00)

  • :end_time - End of time range (e.g., 2025-01-02 00:00:00)

Example Usage (Python with SQLAlchemy):

from datetime import datetime, timedelta
from sqlalchemy import text

# Fetch results for last 24 hours
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)

query = text("""
    SELECT
        c.contact_id,
        c.external_id,
        s3c.call_time,
        c.start_time,
        c.direction,
        c.agent_id AS contact_agent_id,
        s3c.agent_id AS s3_agent_id,
        s3c.conn_id,
        q.key AS question_key,
        q.result_type,
        i.result
    FROM contacts c
    INNER JOIN inferences i ON c.contact_id = i.contact_id
    INNER JOIN questions q ON i.question_id = q.question_id
    LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
    WHERE q.is_universal = TRUE
      AND c.current_stage = 'COMPLETED'
      AND s3c.call_time >= :start_time
      AND s3c.call_time < :end_time
    ORDER BY s3c.call_time DESC, c.contact_id, q.key
""")

# `session` is assumed to be an existing SQLAlchemy Session connected to the read replica
results = session.execute(query, {
    'start_time': start_time,
    'end_time': end_time
}).fetchall()

Incremental Sync Query (Delta)

For ongoing synchronization, fetch only new or updated inferences since last sync:

SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    i.result,
    i.created_at AS inference_created_at,
    i.updated_at AS inference_updated_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
  AND i.updated_at > :last_sync_time
ORDER BY i.updated_at ASC, c.contact_id, q.key;

Parameters:

  • :last_sync_time - Timestamp of last successful CRM sync

Use Case: Scheduled sync jobs that run every 15 minutes to push only new results to CRM.
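
One detail the incremental query leaves open is how to choose the next :last_sync_time. A common approach is to advance it to the largest inference_updated_at actually returned, rather than to the wall-clock time of the job, so that rows written while the job was running are picked up on the next run. The following is a minimal sketch of that idea, assuming rows are dicts keyed by the aliases in the query above. The function name next_sync_watermark is hypothetical.

```python
from datetime import datetime
from typing import Iterable, Mapping


def next_sync_watermark(rows: Iterable[Mapping], previous: datetime) -> datetime:
    """Return the latest inference_updated_at seen, or `previous` if no rows."""
    latest = previous
    for row in rows:
        updated = row["inference_updated_at"]
        # Skip rows without a timestamp; only move the watermark forward.
        if updated is not None and updated > latest:
            latest = updated
    return latest


previous = datetime(2025, 1, 15, 14, 0, 0)
rows = [
    {"inference_updated_at": datetime(2025, 1, 15, 14, 5, 0)},
    {"inference_updated_at": datetime(2025, 1, 15, 14, 12, 30)},
    {"inference_updated_at": None},
]
print(next_sync_watermark(rows, previous))
```

Persist the returned value only after the CRM write has succeeded, so a failed run is retried from the same point.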


Pivoted Query: One Row Per Contact

For easier CRM mapping, pivot results so each contact has one row with columns for each universal question:

WITH universal_questions AS (
    SELECT question_id, key
    FROM questions
    WHERE is_universal = TRUE
),
inference_results AS (
    SELECT
        c.contact_id,
        c.external_id,
        s3c.call_time,
        c.agent_id,
        q.key AS question_key,
        i.result
    FROM contacts c
    INNER JOIN inferences i ON c.contact_id = i.contact_id
    INNER JOIN questions q ON i.question_id = q.question_id
    LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
    WHERE q.is_universal = TRUE
      AND c.current_stage = 'COMPLETED'
      AND s3c.call_time >= :start_time
      AND s3c.call_time < :end_time
)
SELECT
    contact_id,
    external_id,
    call_time,
    agent_id,
    MAX(CASE WHEN question_key = 'agent_identified' THEN result END) AS agent_identified,
    MAX(CASE WHEN question_key = 'compliance_check' THEN result END) AS compliance_check,
    MAX(CASE WHEN question_key = 'quality_score' THEN result END) AS quality_score
    -- Add additional CASE statements for each universal question key
FROM inference_results
GROUP BY contact_id, external_id, call_time, agent_id
ORDER BY call_time DESC;

Note: Replace 'agent_identified', 'compliance_check', etc. with your actual universal question keys.
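
When the set of universal questions changes over time, the CASE expressions can be generated from the keys returned by the questions table instead of being maintained by hand. The following is a minimal sketch, assuming question keys are lowercase snake_case like the examples in this guide. The function name build_pivot_columns and the validation pattern are assumptions. Keys are validated before being placed in SQL text because they cannot be passed as bind parameters when used as column aliases.

```python
import re
from typing import Iterable

# Assumed key format: lowercase letters, digits and underscores only.
_KEY_PATTERN = re.compile(r"[a-z][a-z0-9_]*")


def build_pivot_columns(question_keys: Iterable[str]) -> str:
    """Return the MAX(CASE ...) select-list entries for the given keys."""
    columns = []
    for key in question_keys:
        # Reject anything that is not a plain identifier to avoid SQL injection.
        if not _KEY_PATTERN.fullmatch(key):
            raise ValueError(f"Unsafe question key: {key!r}")
        # The alias is double-quoted so reserved words such as "group" still work.
        columns.append(
            f"MAX(CASE WHEN question_key = '{key}' THEN result END) AS \"{key}\""
        )
    return ",\n    ".join(columns)


print(build_pivot_columns(["agent_identified", "compliance_check", "quality_score"]))
```

The returned text is intended to replace the three hand-written MAX(CASE ...) lines in the pivoted query.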


Integration Process

Step 1: Identify Universal Questions

Query the questions table to get all universal question keys:

SELECT question_id, key, text, result_type, "group"
FROM questions
WHERE is_universal = TRUE
ORDER BY "group", "order";

Use this to understand which fields you'll need to map to your CRM.


Step 2: Batch Extraction Strategy

Recommended Approach: Time-based batching with incremental sync

  1. Initial Sync: Fetch all historical data in daily or weekly batches

  2. Ongoing Sync: Run incremental sync every 15 minutes using updated_at filter

Example Batch Strategy:

from datetime import datetime, timedelta

def sync_universal_questions_to_crm(start_date, end_date, batch_days=1):
    """
    Sync universal question results to CRM in daily batches.

    Args:
        start_date: Start date for sync (datetime)
        end_date: End date for sync (datetime)
        batch_days: Number of days per batch (default: 1)
    """
    current_start = start_date

    while current_start < end_date:
        current_end = min(current_start + timedelta(days=batch_days), end_date)

        # Fetch batch
        results = fetch_universal_question_results(current_start, current_end)

        # Transform and write to CRM
        write_to_crm(results)

        # Move to next batch
        current_start = current_end

Step 3: CRM Field Mapping

Map Trusst inference results to CRM custom fields:

| Trusst Field | CRM Field (Salesforce Example) | CRM Field (Dynamics Example) |
| --- | --- | --- |
| contact_id | Trusst_Contact_ID__c | trusst_contact_id |
| external_id | External ID field (match key) | Match key field |
| call_time | Call_Date__c | calldate |
| start_time | Call_Start_Time__c | call_start_time |
| direction | Call_Direction__c (Picklist) | call_direction (Option Set) |
| contact_agent_id | Agent_ID__c | agentid |
| S3-based stream fields (GenesysOnPremise/PremierContactPoint) | | |
| s3_agent_id | S3_Agent_ID__c | s3_agent_id |
| conn_id | Connection_ID__c | connection_id |
| Universal question results | | |
| Question results | Custom fields per question | Custom fields per question |

Field Notes:

  • direction: Create as Picklist/Option Set with values: "inbound", "outbound"

  • contact_agent_id: Always populated, represents the primary agent identifier

  • s3_agent_id and conn_id: Should always be populated for GenesysOnPremise/PremierContactPoint streams

Example: If you have a universal question with key "agent_identified":

  • Salesforce: Create custom field Agent_Identified__c (Checkbox)

  • Dynamics: Create custom field agent_identified (Two Options: Yes/No)


Step 4: Upsert to CRM

Use the CRM's bulk API to efficiently write data:

Salesforce Example (using simple-salesforce):

from simple_salesforce import Salesforce

sf = Salesforce(username='...', password='...', security_token='...')

# Prepare records for upsert
records = [
    {
        'Trusst_Contact_ID__c': row.contact_id,
        'Call_Date__c': row.call_time.isoformat() if row.call_time else None,
        'Call_Start_Time__c': row.start_time.isoformat() if row.start_time else None,
        'Call_Direction__c': row.direction,
        'Agent_ID__c': row.contact_agent_id,
        # S3-based stream fields (GenesysOnPremise/PremierContactPoint)
        'S3_Agent_ID__c': row.s3_agent_id,
        'Connection_ID__c': row.conn_id,
        # Universal question results
        'Agent_Identified__c': row.agent_identified == 'true',
        'Quality_Score__c': int(row.quality_score) if row.quality_score else None,
        # ... other universal question fields
    }
    for row in results
]

# Bulk upsert using external ID
sf.bulk.Contact.upsert(records, 'Trusst_Contact_ID__c', batch_size=200)

MuleSoft API Integration Example (for customers using MuleSoft as intermediary):

For organizations using MuleSoft as an integration layer between Trusst and Salesforce, you'll send data to a MuleSoft API endpoint instead of directly to Salesforce:

import requests
from datetime import datetime
from typing import List, Dict

# MuleSoft API configuration
MULESOFT_API_URL = "https://your-org.us-e1.cloudhub.io/api/trusst-crm-sync"
MULESOFT_CLIENT_ID = "your_client_id"
MULESOFT_CLIENT_SECRET = "your_client_secret"

def get_mulesoft_access_token():
    """Obtain OAuth 2.0 access token from MuleSoft."""
    token_url = "https://your-org.anypoint.mulesoft.com/accounts/oauth2/token"

    payload = {
        "grant_type": "client_credentials",
        "client_id": MULESOFT_CLIENT_ID,
        "client_secret": MULESOFT_CLIENT_SECRET
    }

    response = requests.post(token_url, data=payload)
    response.raise_for_status()

    return response.json()["access_token"]

def sync_to_salesforce_via_mulesoft(results: List[Dict]):
    """Send universal question results to Salesforce via MuleSoft API."""

    # Get access token
    access_token = get_mulesoft_access_token()

    # Prepare headers
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "X-Client-Id": MULESOFT_CLIENT_ID
    }

    # Transform results to MuleSoft expected format
    payload = {
        "source": "trusst-platform",
        "sync_timestamp": datetime.utcnow().isoformat(),
        "contacts": []
    }

    # Group by contact_id
    contacts_map = {}
    for row in results:
        contact_id = row['contact_id']
        if contact_id not in contacts_map:
            contacts_map[contact_id] = {
                "trusst_contact_id": contact_id,
                "external_id": row['external_id'],
                "call_date": row['call_time'].isoformat() if row['call_time'] else None,
                "call_start_time": row['start_time'].isoformat() if row['start_time'] else None,
                "direction": row['direction'],
                "contact_agent_id": row['contact_agent_id'],
                # S3-based stream fields (GenesysOnPremise/PremierContactPoint)
                "s3_agent_id": row['s3_agent_id'],
                "conn_id": row['conn_id'],
                "universal_questions": []
            }

        # Add question result
        contacts_map[contact_id]["universal_questions"].append({
            "question_key": row['question_key'],
            "question_text": row['question_text'],
            "result_type": row['result_type'],
            "result": row['result']
        })

    payload["contacts"] = list(contacts_map.values())

    # Send to MuleSoft API
    response = requests.post(
        MULESOFT_API_URL,
        headers=headers,
        json=payload,
        timeout=30
    )

    response.raise_for_status()

    # Process response
    result = response.json()

    return {
        "success_count": result.get("records_processed", 0),
        "failed_count": result.get("records_failed", 0),
        "errors": result.get("errors", [])
    }

# Example usage
try:
    sync_result = sync_to_salesforce_via_mulesoft(results)
    print(f"Synced {sync_result['success_count']} records successfully")

    if sync_result['failed_count'] > 0:
        print(f"Failed to sync {sync_result['failed_count']} records")
        for error in sync_result['errors']:
            print(f"  - {error}")

except requests.exceptions.HTTPError as e:
    print(f"MuleSoft API error: {e.response.status_code} - {e.response.text}")
except Exception as e:
    print(f"Sync failed: {str(e)}")

MuleSoft Payload Structure:

The MuleSoft API expects a structured JSON payload with grouped contact data:

{
  "source": "trusst-platform",
  "sync_timestamp": "2025-01-15T14:30:00Z",
  "contacts": [
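    {
      "trusst_contact_id": "c1a2b3c4-...",
      "external_id": "550e8400-...",
      "call_date": "2025-01-15T14:23:10Z",
      "call_start_time": "2025-01-15T14:23:05Z",
      "direction": "inbound",
      "contact_agent_id": "agent_john",
      "s3_agent_id": "john.smith",
      "conn_id": "REC-001",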
      "universal_questions": [
        {
          "question_key": "agent_identified",
          "question_text": "Did the agent properly identify themselves?",
          "result_type": "Boolean",
          "result": "true"
        },
        {
          "question_key": "compliance_check",
          "question_text": "Were all compliance requirements met?",
          "result_type": "Boolean",
          "result": "true"
        }
      ]
    }
  ]
}

MuleSoft Integration Benefits:

  • Centralized Integration Logic: Business rules and transformations managed in MuleSoft

  • Error Handling: MuleSoft handles retries, logging, and error notifications

  • Multiple Target Systems: Single API can update Salesforce, data warehouses, and other systems

  • Validation: MuleSoft can validate and enrich data before writing to Salesforce

  • Audit Trail: Comprehensive logging of all integration activities

Microsoft Dynamics Example (using REST API):

import requests

# access_token is assumed to be an OAuth 2.0 bearer token obtained beforehand
dynamics_url = "https://yourorg.crm.dynamics.com/api/data/v9.2"
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
    "OData-MaxVersion": "4.0",
    "OData-Version": "4.0"
}

for row in results:
    payload = {
        "trusst_contact_id": row.contact_id,
        "calldate": row.call_time.isoformat() if row.call_time else None,
        "agent_identified": row.agent_identified == 'true',
        "quality_score": int(row.quality_score) if row.quality_score else None,
    }

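    # PATCH acts as an upsert: it updates the record if it exists, otherwise creates it.
    # The key in parentheses must be the record's GUID (or an alternate-key expression).
    response = requests.patch(
        f"{dynamics_url}/contacts({row.external_id})",
        headers=headers,
        json=payload,
        timeout=30
    )
    response.raise_for_status()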

Best Practices

1. Use External IDs for Matching

  • Store the Trusst contact_id as a custom field in your CRM

  • Use CRM's native external ID or unique identifier for upsert operations

  • This ensures idempotent writes (re-running sync won't create duplicates)

2. Handle Result Type Conversion

Universal questions can have different result types:

| Result Type | SQL Value | CRM Field Type | Conversion |
| --- | --- | --- | --- |
| Boolean | 'true' / 'false' | Checkbox / Two Options | Parse string to boolean |
| Rating | '1' to '10' | Number | Parse string to integer |
| Paragraph | Long text | Text Area / Memo | Use as-is |
| Label | 'Excellent', 'Good', etc. | Picklist / Option Set | Map to CRM picklist values |

Example Conversion:

def convert_inference_result(result_type, result_value):
    """Convert Trusst inference result to CRM-friendly format."""
    if result_type == 'Boolean':
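        # A missing result stays None instead of raising AttributeError
        return result_value.lower() == 'true' if result_value is not None else None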
    elif result_type == 'Rating':
        return int(result_value) if result_value else None
    elif result_type in ('Paragraph', 'Summary'):
        return result_value
    elif result_type == 'Label':
        # Map to CRM picklist value
        return result_value
    else:
        return result_value
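
The Label branch above returns the value unchanged, which only works if the CRM picklist uses exactly the same strings. The following is a minimal sketch of an explicit mapping that fails loudly on unknown labels, so unexpected values are caught before they reach the CRM. The mapping contents and the function name map_label_to_picklist are hypothetical and should be replaced with the values defined in your CRM.

```python
from typing import Mapping, Optional

# Hypothetical mapping; replace with the picklist API values defined in your CRM.
LABEL_TO_PICKLIST = {
    "Excellent": "excellent",
    "Good": "good",
}


def map_label_to_picklist(
    label: Optional[str],
    mapping: Mapping[str, str] = LABEL_TO_PICKLIST,
) -> Optional[str]:
    """Translate a Label result into a CRM picklist value."""
    if label is None:
        return None
    cleaned = label.strip()
    if cleaned not in mapping:
        # Surface unmapped labels instead of writing an invalid picklist value.
        raise ValueError(f"No picklist mapping for label {cleaned!r}")
    return mapping[cleaned]


print(map_label_to_picklist("Good"))
```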

3. Error Handling and Retry Logic

  • Implement exponential backoff for CRM API rate limits

  • Log failed records for manual review

  • Use database transactions to track sync state

Example:

import logging
import time
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(max_retries=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def write_batch_to_crm(records):
    # CRM write logic here
    pass

4. Track Sync State (Optional)

Create a sync status table to track synchronization:

CREATE TABLE crm_sync_status (
    sync_id VARCHAR PRIMARY KEY,
    crm_system VARCHAR NOT NULL,
    last_sync_time TIMESTAMP NOT NULL,
    records_synced INTEGER,
    records_failed INTEGER,
    status VARCHAR NOT NULL,  -- 'success', 'partial', 'failed'
    error_message TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);
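
The scheduled job in the next section relies on helpers that read and write this table. The following is a minimal, self-contained sketch of how such helpers could look. It uses SQLite from the Python standard library purely so the example runs anywhere, with a simplified schema adapted from the table above (integer key, ISO 8601 text timestamps). The function names and the simplified columns are assumptions, and a real deployment would keep this table in its own PostgreSQL database.

```python
import sqlite3
from datetime import datetime
from typing import Optional

# SQLite adaptation of crm_sync_status, for illustration only.
SCHEMA = """
CREATE TABLE IF NOT EXISTS crm_sync_status (
    sync_id INTEGER PRIMARY KEY AUTOINCREMENT,
    crm_system TEXT NOT NULL,
    last_sync_time TEXT NOT NULL,
    records_synced INTEGER,
    status TEXT NOT NULL
)
"""


def record_sync(conn: sqlite3.Connection, crm_system: str,
                sync_time: datetime, records_synced: int, status: str) -> None:
    """Insert one row describing a finished sync run."""
    conn.execute(
        "INSERT INTO crm_sync_status "
        "(crm_system, last_sync_time, records_synced, status) VALUES (?, ?, ?, ?)",
        # Fixed-width ISO strings sort chronologically, so MAX() works on text.
        (crm_system, sync_time.isoformat(timespec="microseconds"),
         records_synced, status),
    )
    conn.commit()


def get_last_successful_sync_time(conn: sqlite3.Connection,
                                  crm_system: str) -> Optional[datetime]:
    """Return the most recent successful sync time, or None if there is none."""
    row = conn.execute(
        "SELECT MAX(last_sync_time) FROM crm_sync_status "
        "WHERE crm_system = ? AND status = 'success'",
        (crm_system,),
    ).fetchone()
    return datetime.fromisoformat(row[0]) if row[0] else None


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
record_sync(conn, "salesforce", datetime(2025, 1, 15, 14, 0, 0), 120, "success")
record_sync(conn, "salesforce", datetime(2025, 1, 15, 14, 15, 0), 0, "failed")
print(get_last_successful_sync_time(conn, "salesforce"))
```

Failed runs are recorded but ignored when reading the last sync time, so the next run retries from the last successful point.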

5. Scheduled Sync Job

Run a scheduled job (cron, Airflow, etc.) for continuous synchronization:

# Example: 15-minute sync of universal questions to CRM
def scheduled_sync_job():
    """Sync universal questions updated in last 15 minutes."""
    # Get last successful sync time
    last_sync = get_last_successful_sync_time('salesforce')

    # Fetch updated inferences
    results = fetch_universal_question_results_since(last_sync)

    # Write to CRM with error handling
    try:
        write_to_crm(results)
        record_sync_success('salesforce', len(results))
    except Exception as e:
        record_sync_failure('salesforce', str(e))
        raise

Performance Considerations

Batch Size Recommendations

| CRM System | Recommended Batch Size | Max API Calls/Day |
| --- | --- | --- |
| Salesforce | 200 records/batch | 15,000 (varies by edition) |
| Microsoft Dynamics | 100-1000 records/batch | Unlimited (rate limited) |
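
To respect these batch sizes when a client library does not batch for you, the record list can be split before sending. The following is a minimal sketch. The helper name chunked is an assumption, and the size of 200 simply mirrors the Salesforce row in the table above.

```python
from typing import Iterator, List, Sequence


def chunked(records: Sequence[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of `records` with at most `size` items each."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(records), size):
        yield list(records[start:start + size])


records = [{"Trusst_Contact_ID__c": f"contact-{n}"} for n in range(450)]
for batch in chunked(records, 200):
    # Replace this print with the CRM write call for one batch.
    print(f"would send {len(batch)} records")
```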

Query Optimization

  1. Use Pagination: For large result sets, use LIMIT/OFFSET:

    SELECT ...
    FROM contacts c
    INNER JOIN inferences i ON c.contact_id = i.contact_id
    INNER JOIN questions q ON i.question_id = q.question_id
    LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
    WHERE q.is_universal = TRUE
    ORDER BY s3c.call_time DESC, c.contact_id, q.key
    LIMIT 1000 OFFSET 0;
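
OFFSET pagination rescans all skipped rows on every page and can skip or repeat rows if data changes between pages. Keyset pagination is a common alternative (not the method shown above): each page starts after the last sort key of the previous page. The following is a minimal sketch of the technique using SQLite and a stand-in table named calls so that it runs without a Trusst database. The table, column set, and function name are assumptions. Against the real query, the key would also need to include the question key, because there is one row per question.

```python
import sqlite3
from typing import Iterator, List, Optional, Tuple

Row = Tuple[str, str]  # (call_time, contact_id)


def iter_pages(conn: sqlite3.Connection, page_size: int) -> Iterator[List[Row]]:
    """Yield pages ordered by call_time DESC, contact_id DESC."""
    last_key: Optional[Row] = None
    while True:
        if last_key is None:
            rows = conn.execute(
                "SELECT call_time, contact_id FROM calls "
                "ORDER BY call_time DESC, contact_id DESC LIMIT ?",
                (page_size,),
            ).fetchall()
        else:
            # Continue strictly after the last row of the previous page.
            rows = conn.execute(
                "SELECT call_time, contact_id FROM calls "
                "WHERE call_time < ? OR (call_time = ? AND contact_id < ?) "
                "ORDER BY call_time DESC, contact_id DESC LIMIT ?",
                (last_key[0], last_key[0], last_key[1], page_size),
            ).fetchall()
        if not rows:
            return
        yield rows
        last_key = rows[-1]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE calls (call_time TEXT, contact_id TEXT)")
conn.executemany(
    "INSERT INTO calls VALUES (?, ?)",
    [
        ("2025-01-15 14:23:10", "c1"),
        ("2025-01-15 14:23:10", "c2"),
        ("2025-01-15 13:45:22", "d5"),
        ("2025-01-14 09:00:00", "e1"),
        ("2025-01-13 09:00:00", "f1"),
    ],
)
for page in iter_pages(conn, page_size=2):
    print(page)
```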

Complete Integration Example

Python Script: Sync Universal Questions to Salesforce

#!/usr/bin/env python3
"""
Sync universal question inference results from Trusst to Salesforce.

Usage:
    python sync_to_salesforce.py --start-date 2025-01-01 --end-date 2025-01-02
"""

import argparse
from datetime import datetime, timedelta
from typing import List, Dict
import logging
from sqlalchemy import create_engine, text
from simple_salesforce import Salesforce

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Database connection
DATABASE_URL = "postgresql://user:password@localhost/trusstai"
engine = create_engine(DATABASE_URL)

# Salesforce connection
sf = Salesforce(
    username='your_username',
    password='your_password',
    security_token='your_token'
)

def fetch_universal_question_results(start_time: datetime, end_time: datetime) -> List[Dict]:
    """Fetch universal question inference results for a time range."""
    query = text("""
        SELECT
            c.contact_id,
            c.external_id,
            s3c.call_time,
            c.agent_id,
            q.key AS question_key,
            q.result_type,
            i.result
        FROM contacts c
        INNER JOIN inferences i ON c.contact_id = i.contact_id
        INNER JOIN questions q ON i.question_id = q.question_id
        LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
        WHERE q.is_universal = TRUE
          AND c.current_stage = 'COMPLETED'
          AND s3c.call_time >= :start_time
          AND s3c.call_time < :end_time
        ORDER BY s3c.call_time DESC, c.contact_id, q.key
    """)

    with engine.connect() as conn:
        results = conn.execute(query, {
            'start_time': start_time,
            'end_time': end_time
        }).fetchall()

    return [dict(row._mapping) for row in results]

def transform_to_salesforce_format(results: List[Dict]) -> List[Dict]:
    """Transform Trusst results to Salesforce record format."""
    # Group by contact_id to create one record per contact
    contacts = {}

    for row in results:
        contact_id = row['contact_id']
        if contact_id not in contacts:
            contacts[contact_id] = {
                'Trusst_Contact_ID__c': contact_id,
                'External_ID__c': row['external_id'],
                'Call_Date__c': row['call_time'].isoformat() if row['call_time'] else None,
                'Agent_ID__c': row['agent_id']
            }

        # Add question result as custom field
        field_name = f"{row['question_key'].replace('_', ' ').title().replace(' ', '_')}__c"

        # Convert based on result type
        if row['result_type'] == 'Boolean':
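            # A missing result stays None instead of raising AttributeError
            contacts[contact_id][field_name] = row['result'].lower() == 'true' if row['result'] is not None else None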
        elif row['result_type'] == 'Rating':
            contacts[contact_id][field_name] = int(row['result']) if row['result'] else None
        else:
            contacts[contact_id][field_name] = row['result']

    return list(contacts.values())

def sync_to_salesforce(records: List[Dict]):
    """Upsert records to Salesforce."""
    if not records:
        logger.info("No records to sync")
        return

    logger.info(f"Syncing {len(records)} records to Salesforce...")

    try:
        result = sf.bulk.Contact.upsert(records, 'Trusst_Contact_ID__c', batch_size=200)

        success_count = sum(1 for r in result if r['success'])
        logger.info(f"Successfully synced {success_count}/{len(records)} records")

        # Log failures
        failures = [r for r in result if not r['success']]
        if failures:
            logger.error(f"Failed to sync {len(failures)} records:")
            for fail in failures[:10]:  # Log first 10 failures
                logger.error(f"  - {fail['id']}: {fail['errors']}")

    except Exception as e:
        logger.error(f"Salesforce sync failed: {e}")
        raise

def main():
    parser = argparse.ArgumentParser(description='Sync universal questions to Salesforce')
    parser.add_argument('--start-date', required=True, help='Start date (YYYY-MM-DD)')
    parser.add_argument('--end-date', required=True, help='End date (YYYY-MM-DD)')
    parser.add_argument('--batch-days', type=int, default=1, help='Days per batch')

    args = parser.parse_args()

    start_date = datetime.strptime(args.start_date, '%Y-%m-%d')
    end_date = datetime.strptime(args.end_date, '%Y-%m-%d')

    current_start = start_date

    while current_start < end_date:
        current_end = min(current_start + timedelta(days=args.batch_days), end_date)

        logger.info(f"Processing batch: {current_start} to {current_end}")

        # Fetch results
        results = fetch_universal_question_results(current_start, current_end)

        # Transform to Salesforce format
        sf_records = transform_to_salesforce_format(results)

        # Sync to Salesforce
        sync_to_salesforce(sf_records)

        # Move to next batch
        current_start = current_end

    logger.info("Sync complete!")

if __name__ == '__main__':
    main()

Run Example:

# Sync last 7 days of data
python sync_to_salesforce.py --start-date 2025-01-15 --end-date 2025-01-22 --batch-days 1

Summary

This guide provides the SQL queries and integration process for extracting universal question inference results from the Trusst platform and synchronizing them with CRM systems.

Key Takeaways:

  1. Universal questions (identified by is_universal = TRUE) are evaluated on all contacts

  2. Use time-based batching for efficient extraction

  3. Implement incremental sync using updated_at for ongoing synchronization

  4. Map Trusst result types to appropriate CRM field types

  5. Use bulk APIs for efficient CRM writes

  6. Track sync state and implement error handling for production reliability

For questions or support, please refer to the Trusst platform documentation or contact the engineering team.
