🔌 CRM Integration: Universal Questions Inference Results (Direct Database Access)
Overview
This guide describes how to extract inference results for universal questions from the Trusst platform database using direct database access for GenesysOnPremise and PremierContactPoint (S3-based) streams and integrate them with Customer Relationship Management (CRM) systems such as Salesforce and Microsoft Dynamics.
Universal questions are evaluation questions that apply to all contacts, regardless of which criteria are identified during inference. These questions typically represent:
- Core compliance requirements (e.g., "Did agent identify themselves?")
- Standard quality metrics across all conversation types
- Universal performance indicators
Since universal questions are evaluated on every contact, their inference results provide consistent, comparable metrics that can be synchronized to backend systems for reporting, analytics, and compliance tracking.
⚠️ Important Notices
Schema Stability Warning
This documentation reflects the database schema as of November 2025. Schema changes may occur in future Trusst platform updates. We recommend:
- Using the provided SQL queries as templates only
- Testing queries after platform upgrades before production use
- Implementing robust error handling for schema changes
- Contacting Trusst support before major platform upgrades
- Monitoring the Trusst platform release notes for schema changes
Alternative: API Endpoint (Recommended)
For production deployments requiring long-term stability and security, consider using the REST API endpoint approach instead of direct database access. See the separate document: crm-integration-universal-questions-api.md for the recommended API-based integration approach.
Direct database access is suitable for:
- Development and testing environments
- Custom analytics requiring complex joins
- One-time data migrations or backfills
- Organizations with dedicated database administrators
Supported Stream Types
This document covers GenesysOnPremise and PremierContactPoint (S3-based) streams only. For other stream types (BrightPattern, Genesys Cloud, ContactSpace), please use the API endpoint approach.
Database Connection
Production Database Access
Recommended Approach: Use an Aurora PostgreSQL read replica to avoid impacting production performance.
Connection Details
- Database Type: PostgreSQL (Aurora)
- Network Access: Database is in a private VPC subnet; requires a VPN connection or VPC peering
- Credentials: Read-only database credentials are stored in AWS Secrets Manager
- Secret Path: `trusst/{customer_name}/db-readonly-user`, containing `username`, `password`, `host`, `port`, `database`
- SSL/TLS: Required for all database connections
Retrieving Credentials from AWS Secrets Manager
```python
import boto3
import json

def get_database_credentials(customer_name: str):
    """Retrieve read-only database credentials from AWS Secrets Manager."""
    client = boto3.client('secretsmanager', region_name='ap-southeast-2')
    secret_name = f"trusst/{customer_name}/db-readonly-user"
    response = client.get_secret_value(SecretId=secret_name)
    credentials = json.loads(response['SecretString'])
    return credentials

# Usage
creds = get_database_credentials('your-organization')
DATABASE_URL = f"postgresql://{creds['username']}:{creds['password']}@{creds['host']}:{creds['port']}/{creds['database']}?sslmode=require"
```

Network Access Requirements
Contact Trusst Support to configure:
- VPN connection to the customer VPC, OR
- VPC peering between the customer AWS account and the Trusst deployment VPC
- Security group rules to allow inbound PostgreSQL traffic (port 5432) from the integration server
Database Schema
Relevant Tables
questions Table
Stores question definitions including universal questions.
Key Columns:
- `question_id` (String, PK): Unique question identifier
- `key` (String): Question key (e.g., "agent_identified", "compliance_check")
- `text` (String): The question text shown to evaluators
- `result_type` (String): Type of answer: Boolean, Rating, Paragraph, Label, Summary
- `is_universal` (Boolean): True for universal questions
- `is_scored` (Boolean): Whether this question contributes to scoring
- `group` (String): Question group (e.g., Compliance, Quality)
- `created_at` (Timestamp): When the question was created
inferences Table
Stores inference results linking questions to contacts.
Key Columns:
- `inference_id` (String, PK): Unique inference identifier
- `contact_id` (String, FK): Foreign key to contacts table
- `question_id` (String, FK): Foreign key to questions table
- `result` (String): The inference result value
- `created_at` (Timestamp): When the inference was generated
- `updated_at` (Timestamp): Last update timestamp
contacts Table
Stores conversation contact records.
Key Columns:
- `contact_id` (String, PK): Unique contact identifier
- `stream_id` (String, FK): Foreign key to stream (data source)
- `external_id` (String): External system identifier (e.g., Genesys conversation ID)
- `start_time` (Timestamp): When the contact started
- `direction` (String): Call direction: "inbound" or "outbound"
- `current_stage` (String): Processing stage, e.g., "COMPLETED", "TRANSCRIPTION", "INFERENCE"
- `agent_id` (String): Agent who handled the contact
- `created_at` (Timestamp): Record creation timestamp
s3_conversations Table
Stores metadata for S3-sourced conversations (GenesysOnPremise, PremierContactPoint streams).
Key Columns:
- `conversation_id` (String, PK): Unique conversation identifier
- `contact_id` (String, FK): Foreign key to contacts table
- `stream_id` (String, FK): Foreign key to stream
- `conn_id` (String): Connection ID from source system (e.g., Genesys recording ID)
- `agent_id` (String): Agent identifier from source system
- `call_time` (Timestamp): When the call occurred
- `audio_s3_key` (String): S3 key for audio file
- `metadata_s3_key` (String): S3 key for metadata file
Note: The s3_conversations table contains additional agent and connection metadata specific to S3-sourced conversations (GenesysOnPremise, PremierContactPoint). This table is essential for the queries in this document.
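Because the queries below join through s3_conversations, it can be worth confirming that your completed contacts actually have matching metadata rows before building a sync on top of them. A minimal sketch, assuming the `DATABASE_URL` built from Secrets Manager earlier:

```python
from sqlalchemy import create_engine, text

# DATABASE_URL: connection string assembled from Secrets Manager (see above)
engine = create_engine(DATABASE_URL)

# Completed contacts with no s3_conversations row will have NULL call_time
# in the queries below and may be dropped by time-range filters.
orphan_check = text("""
    SELECT COUNT(*) AS missing_s3_rows
    FROM contacts c
    LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
    WHERE c.current_stage = 'COMPLETED'
      AND s3c.contact_id IS NULL
""")

with engine.connect() as conn:
    missing = conn.execute(orphan_check).scalar()
    print(f"Completed contacts without s3_conversations metadata: {missing}")
```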
SQL Query: Fetch Universal Question Inference Results
Basic Query for Genesys On-Premise / PremierContactPoint
This query retrieves universal question inference results for GenesysOnPremise and PremierContactPoint (S3-based) stream types:
```sql
SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    q."group" AS question_group,
    i.result,
    i.created_at AS inference_created_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
ORDER BY s3c.call_time DESC, c.contact_id, q.key;
```

Field Notes:
- `call_time`: Call time from the s3_conversations table (when the call occurred in the source system)
- `contact_agent_id`: Agent ID from the contacts table (always populated)
- `s3_agent_id`: Agent ID from the s3_conversations table (should always be populated for S3-based streams)
- `conn_id`: Connection/recording ID from the GenesysOnPremise/PremierContactPoint source system
- `direction`: Call direction ("inbound" or "outbound")
- `start_time`: Contact start timestamp from the contacts table
- Filter: Only includes contacts where `current_stage = 'COMPLETED'` (fully processed)

Note: `group` is a reserved word in PostgreSQL, so the column must be quoted as `q."group"`.
Sample Result
| contact_id | external_id | call_time | start_time | direction | contact_agent_id | s3_agent_id | conn_id | question_key | result |
|---|---|---|---|---|---|---|---|---|---|
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound | agent_john | john.smith | REC-001 | agent_identified | true |
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound | agent_john | john.smith | REC-001 | compliance_check | true |
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound | agent_john | john.smith | REC-001 | quality_score | 8 |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound | agent_sarah | sarah.jones | REC-002 | agent_identified | false |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound | agent_sarah | sarah.jones | REC-002 | compliance_check | true |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound | agent_sarah | sarah.jones | REC-002 | quality_score | 7 |

Note: Each contact has multiple rows (one per universal question). Only contacts with `current_stage = 'COMPLETED'` are included.
Time-Range Batching Query
For efficient processing, fetch results in time-based batches:
```sql
SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    c.stream_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    q."group" AS question_group,
    i.result,
    i.created_at AS inference_created_at,
    i.updated_at AS inference_updated_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
  AND s3c.call_time >= :start_time
  AND s3c.call_time < :end_time
ORDER BY s3c.call_time DESC, c.contact_id, q.key;
```

Parameters:
- `:start_time`: Start of time range (e.g., `2025-01-01 00:00:00`)
- `:end_time`: End of time range (e.g., `2025-01-02 00:00:00`)
Example Usage (Python with SQLAlchemy):
```python
from datetime import datetime, timedelta
from sqlalchemy import text

# Fetch results for the last 24 hours
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)

query = text("""
    SELECT
        c.contact_id,
        c.external_id,
        s3c.call_time,
        c.start_time,
        c.direction,
        c.agent_id AS contact_agent_id,
        s3c.agent_id AS s3_agent_id,
        s3c.conn_id,
        q.key AS question_key,
        q.result_type,
        i.result
    FROM contacts c
    INNER JOIN inferences i ON c.contact_id = i.contact_id
    INNER JOIN questions q ON i.question_id = q.question_id
    LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
    WHERE q.is_universal = TRUE
      AND c.current_stage = 'COMPLETED'
      AND s3c.call_time >= :start_time
      AND s3c.call_time < :end_time
    ORDER BY s3c.call_time DESC, c.contact_id, q.key
""")

# `session` is an active SQLAlchemy session; the complete example later in
# this guide shows the equivalent engine.connect() pattern
results = session.execute(query, {
    'start_time': start_time,
    'end_time': end_time
}).fetchall()
```

Incremental Sync Query (Delta)
For ongoing synchronization, fetch only new or updated inferences since last sync:
```sql
SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    i.result,
    i.created_at AS inference_created_at,
    i.updated_at AS inference_updated_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
  AND i.updated_at > :last_sync_time
ORDER BY i.updated_at ASC, c.contact_id, q.key;
```

Parameters:
- `:last_sync_time`: Timestamp of the last successful CRM sync
Use Case: Scheduled sync jobs that run every 15 minutes to push only new results to CRM.
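One detail worth handling in a delta sync is the watermark: rather than trusting the wall clock, advance `last_sync_time` to the newest `inference_updated_at` actually returned, so no update is skipped between runs. A minimal sketch, assuming `DELTA_QUERY` holds the `text()` version of the query above:

```python
def fetch_updated_inferences(conn, last_sync_time):
    """Fetch universal-question inferences updated since the last sync.

    `conn` is an open SQLAlchemy connection; DELTA_QUERY is the delta query
    above wrapped in sqlalchemy.text(). Returns the rows plus the new
    high-water mark to persist for the next run.
    """
    rows = conn.execute(DELTA_QUERY, {'last_sync_time': last_sync_time}).fetchall()

    # Advance the watermark only as far as the newest updated_at we actually
    # saw, so the next run resumes exactly where this one left off.
    new_watermark = max(
        (r.inference_updated_at for r in rows),
        default=last_sync_time
    )
    return rows, new_watermark
```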
Pivoted Query: One Row Per Contact
For easier CRM mapping, pivot results so each contact has one row with columns for each universal question:
```sql
WITH universal_questions AS (
    SELECT question_id, key
    FROM questions
    WHERE is_universal = TRUE
),
inference_results AS (
    SELECT
        c.contact_id,
        c.external_id,
        s3c.call_time,
        c.agent_id,
        q.key AS question_key,
        i.result
    FROM contacts c
    INNER JOIN inferences i ON c.contact_id = i.contact_id
    INNER JOIN questions q ON i.question_id = q.question_id
    LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
    WHERE q.is_universal = TRUE
      AND s3c.call_time >= :start_time
      AND s3c.call_time < :end_time
)
SELECT
    contact_id,
    external_id,
    call_time,
    agent_id,
    MAX(CASE WHEN question_key = 'agent_identified' THEN result END) AS agent_identified,
    MAX(CASE WHEN question_key = 'compliance_check' THEN result END) AS compliance_check,
    MAX(CASE WHEN question_key = 'quality_score' THEN result END) AS quality_score
    -- Add additional CASE statements for each universal question key
FROM inference_results
GROUP BY contact_id, external_id, call_time, agent_id
ORDER BY call_time DESC;
```

Note: Replace 'agent_identified', 'compliance_check', etc. with your actual universal question keys.
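Because question keys differ per deployment, you may prefer to generate the CASE expressions from the questions table instead of hard-coding them. A minimal sketch (the helper name is illustrative):

```python
from sqlalchemy import text

def build_pivot_cases(conn):
    """Build the CASE expression list for the pivoted query from live question keys."""
    keys = [row.key for row in conn.execute(
        text("SELECT key FROM questions WHERE is_universal = TRUE ORDER BY key")
    )]
    # Keys come from your own database; validate them before interpolation
    # if the questions table is user-editable.
    return ",\n    ".join(
        f"MAX(CASE WHEN question_key = '{k}' THEN result END) AS {k}"
        for k in keys
    )
```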
Integration Process
Step 1: Identify Universal Questions
Query the questions table to get all universal question keys:
```sql
SELECT question_id, key, text, result_type, "group"
FROM questions
WHERE is_universal = TRUE
ORDER BY "group", "order";
```

Use this to understand which fields you'll need to map to your CRM.
Step 2: Batch Extraction Strategy
Recommended Approach: Time-based batching with incremental sync
- Initial Sync: Fetch all historical data in daily or weekly batches
- Ongoing Sync: Run an incremental sync every 15 minutes using the `updated_at` filter
Example Batch Strategy:
```python
from datetime import datetime, timedelta

def sync_universal_questions_to_crm(start_date, end_date, batch_days=1):
    """
    Sync universal question results to CRM in daily batches.

    Args:
        start_date: Start date for sync (datetime)
        end_date: End date for sync (datetime)
        batch_days: Number of days per batch (default: 1)
    """
    current_start = start_date
    while current_start < end_date:
        current_end = min(current_start + timedelta(days=batch_days), end_date)

        # Fetch batch
        results = fetch_universal_question_results(current_start, current_end)

        # Transform and write to CRM
        write_to_crm(results)

        # Move to next batch
        current_start = current_end
```

Step 3: CRM Field Mapping
Map Trusst inference results to CRM custom fields:
| Trusst Field | Salesforce Field | Dynamics Field |
|---|---|---|
| contact_id | Trusst_Contact_ID__c | trusst_contact_id |
| external_id | External ID field (match key) | Match key field |
| call_time | Call_Date__c | calldate |
| start_time | Call_Start_Time__c | call_start_time |
| direction | Call_Direction__c (Picklist) | call_direction (Option Set) |
| contact_agent_id | Agent_ID__c | agentid |
| **S3-based stream fields (GenesysOnPremise/PremierContactPoint)** | | |
| s3_agent_id | S3_Agent_ID__c | s3_agent_id |
| conn_id | Connection_ID__c | connection_id |
| **Universal question results** | | |
| Question results | Custom fields per question | Custom fields per question |
Field Notes:
- `direction`: Create as a Picklist/Option Set with values "inbound" and "outbound"
- `contact_agent_id`: Always populated; represents the primary agent identifier
- `s3_agent_id` and `conn_id`: Should always be populated for GenesysOnPremise/PremierContactPoint streams

Example: If you have a universal question with key "agent_identified":
- Salesforce: Create custom field `Agent_Identified__c` (Checkbox)
- Dynamics: Create custom field `agent_identified` (Two Options: Yes/No)
Step 4: Upsert to CRM
Use the CRM's bulk API to efficiently write data:
Salesforce Example (using simple-salesforce):
```python
from simple_salesforce import Salesforce

sf = Salesforce(username='...', password='...', security_token='...')

# Prepare records for upsert
records = [
    {
        'Trusst_Contact_ID__c': row.contact_id,
        'Call_Date__c': row.call_time.isoformat() if row.call_time else None,
        'Call_Start_Time__c': row.start_time.isoformat() if row.start_time else None,
        'Call_Direction__c': row.direction,
        'Agent_ID__c': row.contact_agent_id,
        # S3-based stream fields (GenesysOnPremise/PremierContactPoint)
        'S3_Agent_ID__c': row.s3_agent_id,
        'Connection_ID__c': row.conn_id,
        # Universal question results
        'Agent_Identified__c': row.agent_identified == 'true',
        'Quality_Score__c': int(row.quality_score) if row.quality_score else None,
        # ... other universal question fields
    }
    for row in results
]

# Bulk upsert using external ID
sf.bulk.Contact.upsert(records, 'Trusst_Contact_ID__c', batch_size=200)
```

MuleSoft API Integration Example (for customers using MuleSoft as an intermediary):
For organizations using MuleSoft as an integration layer between Trusst and Salesforce, you'll send data to a MuleSoft API endpoint instead of directly to Salesforce:
```python
import requests
from datetime import datetime
from typing import List, Dict

# MuleSoft API configuration
MULESOFT_API_URL = "https://your-org.us-e1.cloudhub.io/api/trusst-crm-sync"
MULESOFT_CLIENT_ID = "your_client_id"
MULESOFT_CLIENT_SECRET = "your_client_secret"

def get_mulesoft_access_token():
    """Obtain OAuth 2.0 access token from MuleSoft."""
    token_url = "https://your-org.anypoint.mulesoft.com/accounts/oauth2/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": MULESOFT_CLIENT_ID,
        "client_secret": MULESOFT_CLIENT_SECRET
    }
    response = requests.post(token_url, data=payload)
    response.raise_for_status()
    return response.json()["access_token"]

def sync_to_salesforce_via_mulesoft(results: List[Dict]):
    """Send universal question results to Salesforce via MuleSoft API."""
    # Get access token
    access_token = get_mulesoft_access_token()

    # Prepare headers
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "X-Client-Id": MULESOFT_CLIENT_ID
    }

    # Transform results to MuleSoft expected format
    payload = {
        "source": "trusst-platform",
        "sync_timestamp": datetime.utcnow().isoformat(),
        "contacts": []
    }

    # Group by contact_id
    contacts_map = {}
    for row in results:
        contact_id = row['contact_id']
        if contact_id not in contacts_map:
            contacts_map[contact_id] = {
                "trusst_contact_id": contact_id,
                "external_id": row['external_id'],
                "call_date": row['call_time'].isoformat() if row['call_time'] else None,
                "call_start_time": row['start_time'].isoformat() if row['start_time'] else None,
                "direction": row['direction'],
                "contact_agent_id": row['contact_agent_id'],
                # S3-based stream fields (GenesysOnPremise/PremierContactPoint)
                "s3_agent_id": row['s3_agent_id'],
                "conn_id": row['conn_id'],
                "universal_questions": []
            }

        # Add question result
        contacts_map[contact_id]["universal_questions"].append({
            "question_key": row['question_key'],
            "question_text": row['question_text'],
            "result_type": row['result_type'],
            "result": row['result']
        })

    payload["contacts"] = list(contacts_map.values())

    # Send to MuleSoft API
    response = requests.post(
        MULESOFT_API_URL,
        headers=headers,
        json=payload,
        timeout=30
    )
    response.raise_for_status()

    # Process response
    result = response.json()
    return {
        "success_count": result.get("records_processed", 0),
        "failed_count": result.get("records_failed", 0),
        "errors": result.get("errors", [])
    }

# Example usage
try:
    sync_result = sync_to_salesforce_via_mulesoft(results)
    print(f"Synced {sync_result['success_count']} records successfully")
    if sync_result['failed_count'] > 0:
        print(f"Failed to sync {sync_result['failed_count']} records")
        for error in sync_result['errors']:
            print(f"  - {error}")
except requests.exceptions.HTTPError as e:
    print(f"MuleSoft API error: {e.response.status_code} - {e.response.text}")
except Exception as e:
    print(f"Sync failed: {str(e)}")
```

MuleSoft Payload Structure:
The MuleSoft API expects a structured JSON payload with grouped contact data:
```json
{
  "source": "trusst-platform",
  "sync_timestamp": "2025-01-15T14:30:00Z",
  "contacts": [
    {
      "trusst_contact_id": "c1a2b3c4-...",
      "external_id": "550e8400-...",
      "call_date": "2025-01-15T14:23:10Z",
      "agent_id": "agent_john",
      "universal_questions": [
        {
          "question_key": "agent_identified",
          "question_text": "Did the agent properly identify themselves?",
          "result_type": "Boolean",
          "result": "true"
        },
        {
          "question_key": "compliance_check",
          "question_text": "Were all compliance requirements met?",
          "result_type": "Boolean",
          "result": "true"
        }
      ]
    }
  ]
}
```

MuleSoft Integration Benefits:
- Centralized Integration Logic: Business rules and transformations managed in MuleSoft
- Error Handling: MuleSoft handles retries, logging, and error notifications
- Multiple Target Systems: A single API can update Salesforce, data warehouses, and other systems
- Validation: MuleSoft can validate and enrich data before writing to Salesforce
- Audit Trail: Comprehensive logging of all integration activities
Microsoft Dynamics Example (using REST API):
```python
import requests

dynamics_url = "https://yourorg.crm.dynamics.com/api/data/v9.2"

# access_token: obtained via your Azure AD / Entra ID OAuth flow (not shown)
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
    "OData-MaxVersion": "4.0",
    "OData-Version": "4.0"
}

for row in results:
    payload = {
        "trusst_contact_id": row.contact_id,
        "calldate": row.call_time.isoformat() if row.call_time else None,
        "agent_identified": row.agent_identified == 'true',
        "quality_score": int(row.quality_score) if row.quality_score else None,
    }

    # Upsert using PATCH (add "Prefer": "return=representation" to the
    # headers if you want the updated record echoed back)
    response = requests.patch(
        f"{dynamics_url}/contacts({row.external_id})",
        headers=headers,
        json=payload
    )
    response.raise_for_status()
```

Best Practices
1. Use External IDs for Matching
- Store the Trusst `contact_id` as a custom field in your CRM
- Use the CRM's native external ID or unique identifier for upsert operations
- This ensures idempotent writes (re-running a sync won't create duplicates)
2. Handle Result Type Conversion
Universal questions can have different result types:
| Result Type | Example Values | CRM Field Type | Conversion |
|---|---|---|---|
| Boolean | 'true' / 'false' | Checkbox / Two Options | Parse string to boolean |
| Rating | '1' to '10' | Number | Parse string to integer |
| Paragraph | Long text | Text Area / Memo | Use as-is |
| Label | 'Excellent', 'Good', etc. | Picklist / Option Set | Map to CRM picklist values |
Example Conversion:
```python
def convert_inference_result(result_type, result_value):
    """Convert a Trusst inference result to a CRM-friendly format."""
    if result_type == 'Boolean':
        return result_value.lower() == 'true'
    elif result_type == 'Rating':
        return int(result_value) if result_value else None
    elif result_type in ('Paragraph', 'Summary'):
        return result_value
    elif result_type == 'Label':
        # Map to CRM picklist value
        return result_value
    else:
        return result_value
```

3. Error Handling and Retry Logic
- Implement exponential backoff for CRM API rate limits
- Log failed records for manual review
- Use database transactions to track sync state
Example:
```python
import time
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(max_retries=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def write_batch_to_crm(records):
    # CRM write logic here
    pass
```

4. Track Sync State (Optional)
Create a sync status table to track synchronization:
```sql
CREATE TABLE crm_sync_status (
    sync_id VARCHAR PRIMARY KEY,
    crm_system VARCHAR NOT NULL,
    last_sync_time TIMESTAMP NOT NULL,
    records_synced INTEGER,
    records_failed INTEGER,
    status VARCHAR NOT NULL,  -- 'success', 'partial', 'failed'
    error_message TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);
```
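The scheduled job in the next section calls helper functions that read and write this table; they are not defined elsewhere in this guide, so here is a minimal sketch against the schema above. Note it assumes a writable connection: the read-only Trusst credentials cannot write, so many teams keep this table in their own integration database.

```python
import uuid
from sqlalchemy import text

# `engine` points at the database holding crm_sync_status (must be writable)

def get_last_successful_sync_time(crm_system):
    """Read the timestamp of the last successful sync for a CRM system."""
    with engine.connect() as conn:
        return conn.execute(text("""
            SELECT MAX(last_sync_time) FROM crm_sync_status
            WHERE crm_system = :crm AND status = 'success'
        """), {'crm': crm_system}).scalar()

def record_sync_success(crm_system, records_synced):
    """Record a successful sync run, advancing the watermark to now."""
    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO crm_sync_status
                (sync_id, crm_system, last_sync_time, records_synced, records_failed, status)
            VALUES (:id, :crm, NOW(), :synced, 0, 'success')
        """), {'id': str(uuid.uuid4()), 'crm': crm_system, 'synced': records_synced})

def record_sync_failure(crm_system, error_message):
    """Record a failed run; the 'failed' status leaves the watermark untouched."""
    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO crm_sync_status
                (sync_id, crm_system, last_sync_time, records_synced, records_failed, status, error_message)
            VALUES (:id, :crm, NOW(), 0, 0, 'failed', :err)
        """), {'id': str(uuid.uuid4()), 'crm': crm_system, 'err': error_message})
```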
5. Scheduled Sync Job
Run a scheduled job (cron, Airflow, etc.) for continuous synchronization:
```python
# Example: 15-minute sync of universal questions to CRM
def scheduled_sync_job():
    """Sync universal questions updated in the last 15 minutes."""
    # Get last successful sync time
    last_sync = get_last_successful_sync_time('salesforce')

    # Fetch updated inferences
    results = fetch_universal_question_results_since(last_sync)

    # Write to CRM with error handling
    try:
        write_to_crm(results)
        record_sync_success('salesforce', len(results))
    except Exception as e:
        record_sync_failure('salesforce', str(e))
        raise
```

Performance Considerations
Batch Size Recommendations
| CRM | Recommended Batch Size | Daily API Limit |
|---|---|---|
| Salesforce | 200 records/batch | 15,000 (varies by edition) |
| Microsoft Dynamics | 100-1000 records/batch | Unlimited (rate limited) |
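For CRMs where you manage batching yourself (the Dynamics example above writes one record per request), a small chunking helper keeps request sizes within these limits. A generic sketch; `send_batch_to_dynamics` is a hypothetical placeholder:

```python
from typing import Dict, Iterator, List

def chunked(records: List[Dict], size: int) -> Iterator[List[Dict]]:
    """Yield successive fixed-size batches from a record list."""
    for i in range(0, len(records), size):
        yield records[i:i + size]

# Usage: send Dynamics payloads in batches of 500
# for batch in chunked(records, 500):
#     send_batch_to_dynamics(batch)  # hypothetical helper
```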
Query Optimization
Use Pagination: For large result sets, use LIMIT/OFFSET:

```sql
SELECT ...
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
ORDER BY s3c.call_time DESC
LIMIT 1000 OFFSET 0;
```
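A minimal paging loop over this query might look like the following. Note that OFFSET gets more expensive as it grows; this sketch assumes the result set is bounded by the time-range filters shown earlier:

```python
from sqlalchemy import text

PAGE_SIZE = 1000

def fetch_all_pages(conn, base_query: str):
    """Iterate a paginated query until an empty page is returned.

    `base_query` is the query above without its LIMIT/OFFSET clause.
    """
    offset = 0
    while True:
        page = conn.execute(
            text(f"{base_query} LIMIT :limit OFFSET :offset"),
            {'limit': PAGE_SIZE, 'offset': offset}
        ).fetchall()
        if not page:
            break
        yield from page
        offset += PAGE_SIZE
```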
Complete Integration Example
Python Script: Sync Universal Questions to Salesforce
```python
#!/usr/bin/env python3
"""
Sync universal question inference results from Trusst to Salesforce.

Usage:
    python sync_to_salesforce.py --start-date 2025-01-01 --end-date 2025-01-02
"""
import argparse
from datetime import datetime, timedelta
from typing import List, Dict
import logging

from sqlalchemy import create_engine, text
from simple_salesforce import Salesforce

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Database connection
DATABASE_URL = "postgresql://user:password@localhost/trusstai"
engine = create_engine(DATABASE_URL)

# Salesforce connection
sf = Salesforce(
    username='your_username',
    password='your_password',
    security_token='your_token'
)

def fetch_universal_question_results(start_time: datetime, end_time: datetime) -> List[Dict]:
    """Fetch universal question inference results for a time range."""
    query = text("""
        SELECT
            c.contact_id,
            c.external_id,
            s3c.call_time,
            c.agent_id,
            q.key AS question_key,
            q.result_type,
            i.result
        FROM contacts c
        INNER JOIN inferences i ON c.contact_id = i.contact_id
        INNER JOIN questions q ON i.question_id = q.question_id
        LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
        WHERE q.is_universal = TRUE
          AND s3c.call_time >= :start_time
          AND s3c.call_time < :end_time
        ORDER BY s3c.call_time DESC, c.contact_id, q.key
    """)

    with engine.connect() as conn:
        results = conn.execute(query, {
            'start_time': start_time,
            'end_time': end_time
        }).fetchall()

    return [dict(row._mapping) for row in results]

def transform_to_salesforce_format(results: List[Dict]) -> List[Dict]:
    """Transform Trusst results to Salesforce record format."""
    # Group by contact_id to create one record per contact
    contacts = {}

    for row in results:
        contact_id = row['contact_id']

        if contact_id not in contacts:
            contacts[contact_id] = {
                'Trusst_Contact_ID__c': contact_id,
                'External_ID__c': row['external_id'],
                'Call_Date__c': row['call_time'].isoformat() if row['call_time'] else None,
                'Agent_ID__c': row['agent_id']
            }

        # Add question result as custom field
        field_name = f"{row['question_key'].replace('_', ' ').title().replace(' ', '_')}__c"

        # Convert based on result type
        if row['result_type'] == 'Boolean':
            contacts[contact_id][field_name] = row['result'].lower() == 'true'
        elif row['result_type'] == 'Rating':
            contacts[contact_id][field_name] = int(row['result']) if row['result'] else None
        else:
            contacts[contact_id][field_name] = row['result']

    return list(contacts.values())

def sync_to_salesforce(records: List[Dict]):
    """Upsert records to Salesforce."""
    if not records:
        logger.info("No records to sync")
        return

    logger.info(f"Syncing {len(records)} records to Salesforce...")

    try:
        result = sf.bulk.Contact.upsert(records, 'Trusst_Contact_ID__c', batch_size=200)
        success_count = sum(1 for r in result if r['success'])
        logger.info(f"Successfully synced {success_count}/{len(records)} records")

        # Log failures
        failures = [r for r in result if not r['success']]
        if failures:
            logger.error(f"Failed to sync {len(failures)} records:")
            for fail in failures[:10]:  # Log first 10 failures
                logger.error(f"  - {fail['id']}: {fail['errors']}")
    except Exception as e:
        logger.error(f"Salesforce sync failed: {e}")
        raise

def main():
    parser = argparse.ArgumentParser(description='Sync universal questions to Salesforce')
    parser.add_argument('--start-date', required=True, help='Start date (YYYY-MM-DD)')
    parser.add_argument('--end-date', required=True, help='End date (YYYY-MM-DD)')
    parser.add_argument('--batch-days', type=int, default=1, help='Days per batch')
    args = parser.parse_args()

    start_date = datetime.strptime(args.start_date, '%Y-%m-%d')
    end_date = datetime.strptime(args.end_date, '%Y-%m-%d')

    current_start = start_date
    while current_start < end_date:
        current_end = min(current_start + timedelta(days=args.batch_days), end_date)
        logger.info(f"Processing batch: {current_start} to {current_end}")

        # Fetch results
        results = fetch_universal_question_results(current_start, current_end)

        # Transform to Salesforce format
        sf_records = transform_to_salesforce_format(results)

        # Sync to Salesforce
        sync_to_salesforce(sf_records)

        # Move to next batch
        current_start = current_end

    logger.info("Sync complete!")

if __name__ == '__main__':
    main()
```

Run Example:

```bash
# Sync last 7 days of data
python sync_to_salesforce.py --start-date 2025-01-15 --end-date 2025-01-22 --batch-days 1
```

Summary
This guide provides the SQL queries and integration process for extracting universal question inference results from the Trusst platform and synchronizing them with CRM systems.
Key Takeaways:
- Universal questions (identified by `is_universal = TRUE`) are evaluated on all contacts
- Use time-based batching for efficient extraction
- Implement incremental sync using `updated_at` for ongoing synchronization
- Map Trusst result types to appropriate CRM field types
- Use bulk APIs for efficient CRM writes
- Track sync state and implement error handling for production reliability
For questions or support, please refer to the Trusst platform documentation or contact the engineering team.