# CRM Integration: Universal Questions Inference Results

## CRM Integration: Universal Questions Inference Results (Direct Database Access)

### Overview

This guide describes how to extract inference results for universal questions from the Trusst platform database using **direct database access** for **GenesysOnPremise and PremierContactPoint (S3-based) streams** and integrate them with Customer Relationship Management (CRM) systems such as Salesforce and Microsoft Dynamics.

**Universal questions** are evaluation questions that apply to every contact, regardless of which criteria are deduced during inference. These questions typically represent:

* Core compliance requirements (e.g., "Did the agent identify themselves?")
* Standard quality metrics across all conversation types
* Universal performance indicators

Since universal questions are evaluated on every contact, their inference results provide consistent, comparable metrics that can be synchronized to backend systems for reporting, analytics, and compliance tracking.

***

### ⚠️ Important Notices

#### Schema Stability Warning

**This documentation reflects the database schema as of November 2025.** Schema changes may occur in future Trusst platform updates. We recommend:

* Using the provided SQL queries as templates only
* Testing queries after platform upgrades before production use
* Implementing robust error handling for schema changes
* Contacting Trusst support before major platform upgrades
* Monitoring the Trusst platform release notes for schema changes

#### Alternative: API Endpoint (Recommended)

For production deployments requiring long-term stability and security, consider using the **REST API endpoint approach** instead of direct database access. See the separate document: `crm-integration-universal-questions-api.md` for the recommended API-based integration approach.

Direct database access is suitable for:

* Development and testing environments
* Custom analytics requiring complex joins
* One-time data migrations or backfills
* Organizations with dedicated database administrators

#### Supported Stream Types

This document covers **GenesysOnPremise and PremierContactPoint (S3-based) streams only**. For other stream types (BrightPattern, Genesys Cloud, ContactSpace), please use the API endpoint approach.

***

### Database Connection

#### Production Database Access

**Recommended Approach**: Use an **Aurora PostgreSQL read replica** to avoid impacting production performance.

#### Connection Details

* **Database Type**: PostgreSQL (Aurora)
* **Network Access**: Database is in a private VPC subnet. Requires VPN connection or VPC peering
* **Credentials**: Read-only database credentials are stored in AWS Secrets Manager
  * Secret Path: `trusst/{customer_name}/db-readonly-user`
  * Contains: `username`, `password`, `host`, `port`, `database`
* **SSL/TLS**: Required for all database connections

#### Retrieving Credentials from AWS Secrets Manager

```python
import json
from urllib.parse import quote_plus

import boto3

def get_database_credentials(customer_name: str) -> dict:
    """Retrieve read-only database credentials from AWS Secrets Manager."""
    client = boto3.client('secretsmanager', region_name='ap-southeast-2')

    secret_name = f"trusst/{customer_name}/db-readonly-user"
    response = client.get_secret_value(SecretId=secret_name)

    credentials = json.loads(response['SecretString'])
    return credentials

# Usage
creds = get_database_credentials('your-organization')

# URL-encode the username and password so characters such as '@', ':' or '/'
# in the credentials do not break the connection URL
DATABASE_URL = (
    f"postgresql://{quote_plus(creds['username'])}:{quote_plus(creds['password'])}"
    f"@{creds['host']}:{creds['port']}/{creds['database']}?sslmode=require"
)
```

#### Network Access Requirements

Contact Trusst Support to configure:

* VPN connection to customer VPC, OR
* VPC peering between customer AWS account and Trusst deployment VPC
* Security group rules allowing inbound PostgreSQL traffic (port 5432) from the integration server

***

### Database Schema

#### Relevant Tables

**`questions` Table**

Stores question definitions including universal questions.

**Key Columns**:

* `question_id` (String, PK): Unique question identifier
* `key` (String): Question key (e.g., "agent\_identified", "compliance\_check")
* `text` (String): The question text shown to evaluators
* `result_type` (String): Type of answer - Boolean, Rating, Paragraph, Label, Summary
* `is_universal` (Boolean): **True for universal questions**
* `is_scored` (Boolean): Whether this question contributes to scoring
* `group` (String): Question group (e.g., Compliance, Quality)
* `created_at` (Timestamp): When the question was created

**`inferences` Table**

Stores inference results linking questions to contacts.

**Key Columns**:

* `inference_id` (String, PK): Unique inference identifier
* `contact_id` (String, FK): Foreign key to contacts table
* `question_id` (String, FK): Foreign key to questions table
* `result` (String): The inference result value
* `created_at` (Timestamp): When the inference was generated
* `updated_at` (Timestamp): Last update timestamp

**`contacts` Table**

Stores conversation contact records.

**Key Columns**:

* `contact_id` (String, PK): Unique contact identifier
* `stream_id` (String, FK): Foreign key to stream (data source)
* `external_id` (String): External system identifier (e.g., Genesys conversation ID)
* `start_time` (Timestamp): When the contact started
* `direction` (String): Call direction - "inbound" or "outbound"
* `current_stage` (String): Processing stage - e.g., "COMPLETED", "TRANSCRIPTION", "INFERENCE"
* `agent_id` (String): Agent who handled the contact
* `created_at` (Timestamp): Record creation timestamp

**`s3_conversations` Table**

Stores metadata for S3-sourced conversations (GenesysOnPremise, PremierContactPoint streams).

**Key Columns**:

* `conversation_id` (String, PK): Unique conversation identifier
* `contact_id` (String, FK): Foreign key to contacts table
* `stream_id` (String, FK): Foreign key to stream
* `conn_id` (String): Connection ID from source system (e.g., Genesys recording ID)
* `agent_id` (String): Agent identifier from source system
* `call_time` (Timestamp): When the call occurred
* `audio_s3_key` (String): S3 key for audio file
* `metadata_s3_key` (String): S3 key for metadata file

**Note**: The `s3_conversations` table contains additional agent and connection metadata specific to S3-sourced conversations (GenesysOnPremise, PremierContactPoint). This table is essential for the queries in this document.

***

### SQL Query: Fetch Universal Question Inference Results

#### Basic Query for Genesys On-Premise / PremierContactPoint

This query retrieves universal question inference results for GenesysOnPremise and PremierContactPoint (S3-based) stream types:

```sql
SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    q.group AS question_group,
    i.result,
    i.created_at AS inference_created_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
ORDER BY s3c.call_time DESC, c.contact_id, q.key;
```

**Field Notes**:

* `call_time`: Call time from s3\_conversations table (when the call occurred in source system)
* `contact_agent_id`: Agent ID from contacts table (always populated)
* `s3_agent_id`: Agent ID from s3\_conversations table (should always be populated for S3-based streams)
* `conn_id`: Connection/recording ID from GenesysOnPremise/PremierContactPoint source system
* `direction`: Call direction ("inbound" or "outbound")
* `start_time`: Contact start timestamp from contacts table
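* **Filter**: Only includes contacts where `current_stage = 'COMPLETED'` (fully processed)
* **Join type**: `s3_conversations` is joined with a LEFT JOIN, so contacts that have no `s3_conversations` row (for example, contacts from other stream types) are still returned, with NULL `call_time`, `s3_agent_id`, and `conn_id`. PostgreSQL sorts NULLs first in a descending sort, so these rows appear at the top. Use an INNER JOIN to restrict results to S3-based streams; the time-range queries below already exclude such rows through their `s3c.call_time` filter.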

***

#### Sample Result

| contact\_id  | external\_id | call\_time          | start\_time         | direction | contact\_agent\_id | s3\_agent\_id | conn\_id | question\_key     | result |
| ------------ | ------------ | ------------------- | ------------------- | --------- | ------------------ | ------------- | -------- | ----------------- | ------ |
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound   | agent\_john        | john.smith    | REC-001  | agent\_identified | true   |
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound   | agent\_john        | john.smith    | REC-001  | compliance\_check | true   |
| c1a2b3c4-... | 550e8400-... | 2025-01-15 14:23:10 | 2025-01-15 14:23:05 | inbound   | agent\_john        | john.smith    | REC-001  | quality\_score    | 8      |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound  | agent\_sarah       | sarah.jones   | REC-002  | agent\_identified | false  |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound  | agent\_sarah       | sarah.jones   | REC-002  | compliance\_check | true   |
| d5e6f7g8-... | 6ba7b810-... | 2025-01-15 13:45:22 | 2025-01-15 13:45:18 | outbound  | agent\_sarah       | sarah.jones   | REC-002  | quality\_score    | 7      |

**Note**: Each contact has multiple rows (one per universal question). Only contacts with `current_stage = 'COMPLETED'` are included.
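
If you prefer to pivot in application code rather than in SQL, the per-question rows can be folded into one record per contact. The following is a minimal sketch, assuming each row is a dict keyed by the column aliases of the basic query (for example, produced with `dict(row._mapping)` in SQLAlchemy) and that no question key collides with a contact column name; `group_by_contact` and `CONTACT_FIELDS` are illustrative names, not part of the Trusst platform:

```python
from typing import Any, Dict, Iterable, List

# Contact-level columns that repeat on every question row (illustrative list)
CONTACT_FIELDS = (
    "contact_id", "external_id", "call_time", "start_time", "direction",
    "contact_agent_id", "s3_agent_id", "conn_id",
)


def group_by_contact(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Fold one-row-per-question results into one dict per contact.

    Contact-level columns are copied from the first row seen for each contact;
    each question's result is stored under its question_key.
    """
    contacts: Dict[str, Dict[str, Any]] = {}
    for row in rows:
        record = contacts.setdefault(
            row["contact_id"],
            {field: row.get(field) for field in CONTACT_FIELDS},
        )
        record[row["question_key"]] = row["result"]
    # Dicts preserve insertion order, so the query's ORDER BY is kept
    return list(contacts.values())


rows = [
    {"contact_id": "c1", "external_id": "e1", "question_key": "agent_identified", "result": "true"},
    {"contact_id": "c1", "external_id": "e1", "question_key": "quality_score", "result": "8"},
    {"contact_id": "c2", "external_id": "e2", "question_key": "agent_identified", "result": "false"},
]
grouped = group_by_contact(rows)
```

Values are left as the raw result strings; type conversion is covered under Best Practices.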

***

#### Time-Range Batching Query

For efficient processing, fetch results in time-based batches:

```sql
SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    c.stream_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    q.group AS question_group,
    i.result,
    i.created_at AS inference_created_at,
    i.updated_at AS inference_updated_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
  AND s3c.call_time >= :start_time
  AND s3c.call_time < :end_time
ORDER BY s3c.call_time DESC, c.contact_id, q.key;
```

**Parameters**:

* `:start_time` - Start of time range (e.g., `2025-01-01 00:00:00`)
* `:end_time` - End of time range (e.g., `2025-01-02 00:00:00`)

**Example Usage** (Python with SQLAlchemy):

```python
from datetime import datetime, timedelta
from sqlalchemy import text

# Fetch results for last 24 hours
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)

query = text("""
    SELECT
        c.contact_id,
        c.external_id,
        s3c.call_time,
        c.start_time,
        c.direction,
        c.agent_id AS contact_agent_id,
        s3c.agent_id AS s3_agent_id,
        s3c.conn_id,
        q.key AS question_key,
        q.result_type,
        i.result
    FROM contacts c
    INNER JOIN inferences i ON c.contact_id = i.contact_id
    INNER JOIN questions q ON i.question_id = q.question_id
    LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
    WHERE q.is_universal = TRUE
      AND c.current_stage = 'COMPLETED'
      AND s3c.call_time >= :start_time
      AND s3c.call_time < :end_time
    ORDER BY s3c.call_time DESC, c.contact_id, q.key
""")

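# Assumes `session` is a SQLAlchemy Session bound to an engine created from
# DATABASE_URL (see "Retrieving Credentials from AWS Secrets Manager")
results = session.execute(query, {
    'start_time': start_time,
    'end_time': end_time
}).fetchall()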
```

***

#### Incremental Sync Query (Delta)

For ongoing synchronization, fetch only new or updated inferences since last sync:

```sql
SELECT
    c.contact_id,
    c.external_id,
    s3c.call_time,
    c.start_time,
    c.direction,
    c.agent_id AS contact_agent_id,
    s3c.agent_id AS s3_agent_id,
    s3c.conn_id,
    q.question_id,
    q.key AS question_key,
    q.text AS question_text,
    q.result_type,
    i.result,
    i.created_at AS inference_created_at,
    i.updated_at AS inference_updated_at
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
  AND i.updated_at > :last_sync_time
ORDER BY i.updated_at ASC, c.contact_id, q.key;
```

**Parameters**:

* `:last_sync_time` - Timestamp of last successful CRM sync

**Use Case**: Scheduled sync jobs that run every 15 minutes to push only new results to CRM.
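
When scheduling the delta query, it is generally safer to take the next `:last_sync_time` from the data itself than from the wall clock, so that rows committed while the job is running are not skipped. The following is a minimal sketch, assuming each fetched row exposes `inference_updated_at` as a naive UTC `datetime`; `next_watermark` is a hypothetical helper name:

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Optional


def next_watermark(
    rows: Iterable[Dict[str, Any]], previous: Optional[datetime]
) -> Optional[datetime]:
    """Return the :last_sync_time to use for the next incremental run.

    Uses the newest inference_updated_at among the rows just synced.
    If the batch was empty, the previous watermark is kept unchanged.
    """
    timestamps = [r["inference_updated_at"] for r in rows if r.get("inference_updated_at")]
    if not timestamps:
        return previous
    newest = max(timestamps)
    # Never move the watermark backwards
    return newest if previous is None or newest > previous else previous
```

Because the query uses a strict `>` comparison, a row that shares the watermark's exact timestamp but is committed after the fetch could still be missed. A common mitigation is to re-read a small overlap (for example with `>=`) and rely on idempotent upserts in the CRM.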

***

#### Pivoted Query: One Row Per Contact

For easier CRM mapping, pivot results so each contact has one row with columns for each universal question:

```sql
WITH inference_results AS (
    SELECT
        c.contact_id,
        c.external_id,
        s3c.call_time,
        c.agent_id,
        q.key AS question_key,
        i.result
    FROM contacts c
    INNER JOIN inferences i ON c.contact_id = i.contact_id
    INNER JOIN questions q ON i.question_id = q.question_id
    LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
    WHERE q.is_universal = TRUE
      AND c.current_stage = 'COMPLETED'
      AND s3c.call_time >= :start_time
      AND s3c.call_time < :end_time
)
SELECT
    contact_id,
    external_id,
    call_time,
    agent_id,
    MAX(CASE WHEN question_key = 'agent_identified' THEN result END) AS agent_identified,
    MAX(CASE WHEN question_key = 'compliance_check' THEN result END) AS compliance_check,
    MAX(CASE WHEN question_key = 'quality_score' THEN result END) AS quality_score
    -- Add additional CASE statements for each universal question key
FROM inference_results
GROUP BY contact_id, external_id, call_time, agent_id
ORDER BY call_time DESC;
```

**Note**: Replace `'agent_identified'`, `'compliance_check'`, etc. with your actual universal question keys.
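
Rather than hand-writing one `CASE` expression per key, the column list can be generated from the keys returned by the Step 1 query below. The following is a minimal sketch using a hypothetical helper, `build_pivot_columns`. It doubles single quotes inside the string literal and double-quotes the output column name, so keys containing unusual characters still produce valid PostgreSQL:

```python
from typing import Iterable


def _quote_literal(value: str) -> str:
    """Quote a value as a PostgreSQL string literal (single quotes doubled)."""
    return "'" + value.replace("'", "''") + "'"


def _quote_ident(name: str) -> str:
    """Quote a value as a PostgreSQL identifier (double quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def build_pivot_columns(question_keys: Iterable[str]) -> str:
    """Build the MAX(CASE ...) column list for the pivoted query."""
    return ",\n".join(
        f"    MAX(CASE WHEN question_key = {_quote_literal(k)} THEN result END) AS {_quote_ident(k)}"
        for k in question_keys
    )


print(build_pivot_columns(["agent_identified", "quality_score"]))
```

Splice the generated text into the outer `SELECT` after `agent_id,`, and keep `:start_time` and `:end_time` as bound parameters rather than interpolating them.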

***

### Integration Process

#### Step 1: Identify Universal Questions

Query the questions table to get all universal question keys:

```sql
SELECT question_id, key, text, result_type, "group"
FROM questions
WHERE is_universal = TRUE
ORDER BY "group", "order";
```

Use this to understand which fields you'll need to map to your CRM.

***

#### Step 2: Batch Extraction Strategy

**Recommended Approach**: Time-based batching with incremental sync

1. **Initial Sync**: Fetch all historical data in daily or weekly batches
2. **Ongoing Sync**: Run incremental sync every 15 minutes using `updated_at` filter

**Example Batch Strategy**:

```python
from datetime import datetime, timedelta

def sync_universal_questions_to_crm(start_date, end_date, batch_days=1):
    """
    Sync universal question results to CRM in fixed-size time batches.

    Args:
        start_date: Start date for sync (datetime)
        end_date: End date for sync (datetime)
        batch_days: Number of days per batch (default: 1)
    """
    current_start = start_date

    while current_start < end_date:
        current_end = min(current_start + timedelta(days=batch_days), end_date)

        # Fetch batch; fetch_universal_question_results is defined in the
        # complete integration example at the end of this guide
        results = fetch_universal_question_results(current_start, current_end)

        # Transform and write to CRM; write_to_crm is a placeholder for your
        # CRM-specific writer (see Step 4)
        write_to_crm(results)

        # Move to next batch (windows are half-open: [current_start, current_end))
        current_start = current_end
```

***

#### Step 3: CRM Field Mapping

Map Trusst inference results to CRM custom fields:

| Trusst Field                                                      | CRM Field (Salesforce Example) | CRM Field (Dynamics Example)  |
| ----------------------------------------------------------------- | ------------------------------ | ----------------------------- |
| `contact_id`                                                      | `Trusst_Contact_ID__c`         | `trusst_contact_id`           |
| `external_id`                                                     | External ID field (match key)  | Match key field               |
| `call_time`                                                       | `Call_Date__c`                 | `calldate`                    |
| `start_time`                                                      | `Call_Start_Time__c`           | `call_start_time`             |
| `direction`                                                       | `Call_Direction__c` (Picklist) | `call_direction` (Option Set) |
| `contact_agent_id`                                                | `Agent_ID__c`                  | `agentid`                     |
| **S3-based stream fields (GenesysOnPremise/PremierContactPoint)** |                                |                               |
| `s3_agent_id`                                                     | `S3_Agent_ID__c`               | `s3_agent_id`                 |
| `conn_id`                                                         | `Connection_ID__c`             | `connection_id`               |
| **Universal question results**                                    |                                |                               |
| Question results                                                  | Custom fields per question     | Custom fields per question    |

**Field Notes**:

* `direction`: Create as Picklist/Option Set with values: "inbound", "outbound"
* `contact_agent_id`: Always populated, represents the primary agent identifier
* `s3_agent_id` and `conn_id`: Should always be populated for GenesysOnPremise/PremierContactPoint streams

**Example**: If you have a universal question with key `"agent_identified"`:

* Salesforce: Create custom field `Agent_Identified__c` (Checkbox)
* Dynamics: Create custom field `agent_identified` (Two Options: Yes/No)
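
The following is a standalone sketch of this naming convention, using a hypothetical helper, `salesforce_field_name`. It also enforces Salesforce's rules for custom field names: the name must start with a letter; it may contain only letters, digits, and single underscores; it must not end with an underscore; and it may be at most 40 characters before `__c`. The simpler `title()`-based mapping in the complete integration example does not apply these rules:

```python
import re


def salesforce_field_name(question_key: str) -> str:
    """Map a Trusst question key to a Salesforce custom field API name.

    'agent_identified' -> 'Agent_Identified__c'
    """
    # Keep letters and digits; treat every other character as a word separator
    words = [w for w in re.split(r"[^A-Za-z0-9]+", question_key) if w]
    name = "_".join(w[:1].upper() + w[1:].lower() for w in words)
    if not name or not name[0].isalpha():
        raise ValueError(f"Cannot derive a Salesforce field name from {question_key!r}")
    # Custom field names are limited to 40 characters before the __c suffix
    return name[:40].rstrip("_") + "__c"
```

Keys that cannot produce a valid name (for example, a key starting with a digit) raise an error, so they can be mapped manually rather than silently rejected by Salesforce.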

***

#### Step 4: Upsert to CRM

Use the CRM's bulk API to efficiently write data:

**Salesforce Example** (using `simple-salesforce`):

```python
from simple_salesforce import Salesforce

sf = Salesforce(username='...', password='...', security_token='...')

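# Prepare records for upsert.
# Assumes `results` holds one row per contact with one attribute per universal
# question (for example, the pivoted query extended with start_time, direction,
# contact_agent_id, s3_agent_id and conn_id).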
records = [
    {
        'Trusst_Contact_ID__c': row.contact_id,
        'Call_Date__c': row.call_time.isoformat() if row.call_time else None,
        'Call_Start_Time__c': row.start_time.isoformat() if row.start_time else None,
        'Call_Direction__c': row.direction,
        'Agent_ID__c': row.contact_agent_id,
        # S3-based stream fields (GenesysOnPremise/PremierContactPoint)
        'S3_Agent_ID__c': row.s3_agent_id,
        'Connection_ID__c': row.conn_id,
        # Universal question results
        'Agent_Identified__c': row.agent_identified == 'true',
        'Quality_Score__c': int(row.quality_score) if row.quality_score else None,
        # ... other universal question fields
    }
    for row in results
]

# Bulk upsert keyed on Trusst_Contact_ID__c, which must be defined as an
# External ID field on the target object
sf.bulk.Contact.upsert(records, 'Trusst_Contact_ID__c', batch_size=200)
```

**MuleSoft API Integration Example** (for customers using MuleSoft as intermediary):

For organizations using MuleSoft as an integration layer between Trusst and Salesforce, you'll send data to a MuleSoft API endpoint instead of directly to Salesforce:

```python
from datetime import datetime
from typing import Dict, List

import requests

# MuleSoft API configuration (placeholders; load secrets from a secret store in production)
MULESOFT_API_URL = "https://your-org.us-e1.cloudhub.io/api/trusst-crm-sync"
MULESOFT_CLIENT_ID = "your_client_id"
MULESOFT_CLIENT_SECRET = "your_client_secret"

def get_mulesoft_access_token():
    """Obtain an OAuth 2.0 access token using the client credentials grant."""
    # Placeholder: use the token endpoint of the OAuth provider configured for your API
    token_url = "https://your-org.anypoint.mulesoft.com/accounts/oauth2/token"

    payload = {
        "grant_type": "client_credentials",
        "client_id": MULESOFT_CLIENT_ID,
        "client_secret": MULESOFT_CLIENT_SECRET
    }

    response = requests.post(token_url, data=payload, timeout=30)
    response.raise_for_status()

    return response.json()["access_token"]

def sync_to_salesforce_via_mulesoft(results: List[Dict]):
    """Send universal question results to Salesforce via MuleSoft API."""

    # Get access token
    access_token = get_mulesoft_access_token()

    # Prepare headers
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "X-Client-Id": MULESOFT_CLIENT_ID
    }

    # Transform results to MuleSoft expected format
    payload = {
        "source": "trusst-platform",
        "sync_timestamp": datetime.utcnow().isoformat(),
        "contacts": []
    }

    # Group by contact_id
    contacts_map = {}
    for row in results:
        contact_id = row['contact_id']
        if contact_id not in contacts_map:
            contacts_map[contact_id] = {
                "trusst_contact_id": contact_id,
                "external_id": row['external_id'],
                "call_date": row['call_time'].isoformat() if row['call_time'] else None,
                "call_start_time": row['start_time'].isoformat() if row['start_time'] else None,
                "direction": row['direction'],
                "contact_agent_id": row['contact_agent_id'],
                # S3-based stream fields (GenesysOnPremise/PremierContactPoint)
                "s3_agent_id": row['s3_agent_id'],
                "conn_id": row['conn_id'],
                "universal_questions": []
            }

        # Add question result
        contacts_map[contact_id]["universal_questions"].append({
            "question_key": row['question_key'],
            "question_text": row['question_text'],
            "result_type": row['result_type'],
            "result": row['result']
        })

    payload["contacts"] = list(contacts_map.values())

    # Send to MuleSoft API
    response = requests.post(
        MULESOFT_API_URL,
        headers=headers,
        json=payload,
        timeout=30
    )

    response.raise_for_status()

    # Process response
    result = response.json()

    return {
        "success_count": result.get("records_processed", 0),
        "failed_count": result.get("records_failed", 0),
        "errors": result.get("errors", [])
    }

# Example usage
try:
    sync_result = sync_to_salesforce_via_mulesoft(results)
    print(f"Synced {sync_result['success_count']} records successfully")

    if sync_result['failed_count'] > 0:
        print(f"Failed to sync {sync_result['failed_count']} records")
        for error in sync_result['errors']:
            print(f"  - {error}")

except requests.exceptions.HTTPError as e:
    print(f"MuleSoft API error: {e.response.status_code} - {e.response.text}")
except Exception as e:
    print(f"Sync failed: {str(e)}")
```

**MuleSoft Payload Structure**:

The example code above sends the following JSON structure, with question results grouped under each contact. Adapt it to the contract defined by your MuleSoft API:

```json
{
  "source": "trusst-platform",
  "sync_timestamp": "2025-01-15T14:30:00Z",
  "contacts": [
    {
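      "trusst_contact_id": "c1a2b3c4-...",
      "external_id": "550e8400-...",
      "call_date": "2025-01-15T14:23:10Z",
      "call_start_time": "2025-01-15T14:23:05Z",
      "direction": "inbound",
      "contact_agent_id": "agent_john",
      "s3_agent_id": "john.smith",
      "conn_id": "REC-001",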
      "universal_questions": [
        {
          "question_key": "agent_identified",
          "question_text": "Did the agent properly identify themselves?",
          "result_type": "Boolean",
          "result": "true"
        },
        {
          "question_key": "compliance_check",
          "question_text": "Were all compliance requirements met?",
          "result_type": "Boolean",
          "result": "true"
        }
      ]
    }
  ]
}
```

**MuleSoft Integration Benefits**:

* **Centralized Integration Logic**: Business rules and transformations managed in MuleSoft
* **Error Handling**: MuleSoft handles retries, logging, and error notifications
* **Multiple Target Systems**: Single API can update Salesforce, data warehouses, and other systems
* **Validation**: MuleSoft can validate and enrich data before writing to Salesforce
* **Audit Trail**: Comprehensive logging of all integration activities

**Microsoft Dynamics Example** (using REST API):

```python
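import requests

dynamics_url = "https://yourorg.crm.dynamics.com/api/data/v9.2"

# access_token: an OAuth 2.0 bearer token for your Dataverse environment,
# obtained from Microsoft Entra ID (for example with the MSAL library)
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
    "OData-MaxVersion": "4.0",
    "OData-Version": "4.0"
}

# Assumes `results` holds one row per contact (e.g. from the pivoted query)
for row in results:
    payload = {
        "trusst_contact_id": row.contact_id,
        "calldate": row.call_time.isoformat() if row.call_time else None,
        "agent_identified": row.agent_identified == 'true',
        "quality_score": int(row.quality_score) if row.quality_score else None,
    }

    # A PATCH addressed to a record key acts as an upsert in the Dataverse
    # Web API: the row is updated if it exists and created otherwise.
    # The key used here must be the Dataverse row GUID.
    response = requests.patch(
        f"{dynamics_url}/contacts({row.external_id})",
        headers=headers,
        json=payload,
        timeout=30
    )
    response.raise_for_status()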
```
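
If the target table has an alternate key defined on the Trusst contact ID, you can upsert against that key instead of a row GUID; the Dataverse Web API addresses such rows as `contacts(trusst_contact_id='...')`. The following is a minimal sketch of building that URL, assuming an alternate key exists on a custom `trusst_contact_id` column (in practice, custom columns carry your publisher prefix). `alternate_key_url` is a hypothetical helper. It doubles single quotes inside the key value, following OData string-literal syntax, and percent-encodes other reserved characters:

```python
from urllib.parse import quote


def alternate_key_url(base_url: str, entity_set: str, key_column: str, key_value: str) -> str:
    """Build a Dataverse Web API URL that addresses a row by a string alternate key."""
    # OData string literals escape an embedded single quote by doubling it
    literal = key_value.replace("'", "''")
    # Percent-encode reserved characters such as '/' so they stay inside the key
    return f"{base_url.rstrip('/')}/{entity_set}({key_column}='{quote(literal, safe=chr(39))}')"
```

Pass the returned URL to the same `requests.patch(...)` call in place of `f"{dynamics_url}/contacts({row.external_id})"`.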

***

### Best Practices

#### 1. Use External IDs for Matching

* Store the Trusst `contact_id` as a custom field in your CRM
* Use CRM's native external ID or unique identifier for upsert operations
* This ensures idempotent writes (re-running sync won't create duplicates)

#### 2. Handle Result Type Conversion

Universal questions can have different result types:

| Result Type | SQL Value                     | CRM Field Type         | Conversion                 |
| ----------- | ----------------------------- | ---------------------- | -------------------------- |
| Boolean     | `'true'` / `'false'`          | Checkbox / Two Options | Parse string to boolean    |
| Rating      | `'1'` to `'10'`               | Number                 | Parse string to integer    |
| Paragraph   | Long text                     | Text Area / Memo       | Use as-is                  |
| Summary     | Long text                     | Text Area / Memo       | Use as-is                  |
| Label       | `'Excellent'`, `'Good'`, etc. | Picklist / Option Set  | Map to CRM picklist values |

**Example Conversion**:

```python
def convert_inference_result(result_type, result_value):
    """Convert a Trusst inference result string to a CRM-friendly value."""
    if result_value is None:
        return None
    if result_type == 'Boolean':
        return result_value.strip().lower() == 'true'
    if result_type == 'Rating':
        return int(result_value) if result_value.strip() else None
    if result_type == 'Label':
        # Map to your CRM picklist value here if the labels differ
        return result_value
    # Paragraph, Summary and any other result types pass through unchanged
    return result_value
```
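
For `Label` results, the "map to CRM picklist values" step is usually a lookup table. The following is a minimal sketch: the label names and numeric option values are hypothetical placeholders for your own option set definition, and unmapped labels return `None` so they can be logged instead of being rejected by the CRM. For Salesforce picklists, the mapped value would be the picklist's API value string instead of a number.

```python
from typing import Dict, Optional

# Hypothetical mapping from Trusst label text to Dynamics option set values
LABEL_TO_OPTION: Dict[str, int] = {
    "excellent": 100000000,
    "good": 100000001,
    "fair": 100000002,
    "poor": 100000003,
}


def map_label(label: Optional[str], mapping: Dict[str, int] = LABEL_TO_OPTION) -> Optional[int]:
    """Return the CRM option value for a label, or None if it has no mapping."""
    if label is None:
        return None
    # Normalise case and whitespace before the lookup
    return mapping.get(label.strip().lower())
```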

#### 3. Error Handling and Retry Logic

* Implement exponential backoff for CRM API rate limits
* Log failed records for manual review
* Use database transactions to track sync state

**Example**:

```python
import logging
import time
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(max_retries=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def write_batch_to_crm(records):
    # CRM write logic here
    pass
```
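
Retrying every exception also retries errors that can never succeed, such as bad credentials or invalid field names. The following variant retries only when a caller-supplied predicate classifies the error as transient, and it adds random "full jitter" so that parallel workers do not retry in lockstep. `retry_if` and `TransientError` are hypothetical names for this sketch; with `requests`, the predicate might instead check for HTTP 429 or 5xx responses:

```python
import logging
import random
import time
from functools import wraps
from typing import Callable

logger = logging.getLogger(__name__)


def retry_if(is_retryable: Callable[[Exception], bool], max_retries: int = 3,
             base_delay: float = 1.0, sleep: Callable[[float], None] = time.sleep):
    """Retry with exponential backoff and full jitter, only for retryable errors."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_retries - 1 or not is_retryable(exc):
                        raise
                    # Full jitter: wait a random time up to the exponential cap
                    delay = random.uniform(0, base_delay * (2 ** attempt))
                    logger.warning("Attempt %d failed: %s. Retrying in %.1fs", attempt + 1, exc, delay)
                    sleep(delay)
        return wrapper
    return decorator


class TransientError(Exception):
    """Hypothetical error type for failures worth retrying (e.g. rate limits)."""
```

The `sleep` parameter is injectable, so tests can run without real delays.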

#### 4. Track Sync State (Optional)

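Create a sync status table in a database that your integration controls (the read-only Trusst credentials are not intended for writing) to track each synchronization run: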

```sql
CREATE TABLE crm_sync_status (
    sync_id VARCHAR PRIMARY KEY,
    crm_system VARCHAR NOT NULL,
    last_sync_time TIMESTAMP NOT NULL,
    records_synced INTEGER,
    records_failed INTEGER,
    status VARCHAR NOT NULL,  -- 'success', 'partial', 'failed'
    error_message TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);
```
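
Helpers such as `get_last_successful_sync_time` can be thin wrappers over this table. The following is a minimal sketch that uses Python's built-in `sqlite3` module as a stand-in for your integration database. With PostgreSQL, you would run the same SQL through your driver and its parameter style, and `CURRENT_TIMESTAMP` replaces `NOW()` only because this sketch uses SQLite. The function names and `uuid4` sync IDs are illustrative. The stored `last_sync_time` is assumed to be the data-derived watermark (a naive UTC `datetime`), not the wall-clock time of the run.

```python
import sqlite3
import uuid
from datetime import datetime
from typing import Optional

DDL = """
CREATE TABLE IF NOT EXISTS crm_sync_status (
    sync_id VARCHAR PRIMARY KEY,
    crm_system VARCHAR NOT NULL,
    last_sync_time TIMESTAMP NOT NULL,
    records_synced INTEGER,
    records_failed INTEGER,
    status VARCHAR NOT NULL,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""


def record_sync(conn: sqlite3.Connection, crm_system: str, watermark: datetime,
                synced: int, failed: int, status: str, error: Optional[str] = None) -> None:
    """Insert one row describing a finished sync run."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO crm_sync_status (sync_id, crm_system, last_sync_time, "
            "records_synced, records_failed, status, error_message) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (str(uuid.uuid4()), crm_system, watermark.isoformat(), synced, failed, status, error),
        )


def get_last_successful_sync_time(conn: sqlite3.Connection, crm_system: str) -> Optional[datetime]:
    """Return the newest watermark from a successful run, or None on the first run."""
    row = conn.execute(
        "SELECT MAX(last_sync_time) FROM crm_sync_status "
        "WHERE crm_system = ? AND status = 'success'",
        (crm_system,),
    ).fetchone()
    return datetime.fromisoformat(row[0]) if row[0] else None
```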

#### 5. Scheduled Sync Job

Run a scheduled job (cron, Airflow, etc.) for continuous synchronization:

```python
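# Example: sync job scheduled to run every 15 minutes.
# get_last_successful_sync_time, fetch_universal_question_results_since,
# write_to_crm, record_sync_success and record_sync_failure are placeholders
# for your own implementations (for example, backed by crm_sync_status).
def scheduled_sync_job():
    """Sync universal question results updated since the last successful sync."""
    # Get last successful sync time (None on the very first run)
    last_sync = get_last_successful_sync_time('salesforce')

    # Fetch inferences updated after last_sync (the incremental sync query)
    results = fetch_universal_question_results_since(last_sync)

    # Write to CRM with error handling
    try:
        write_to_crm(results)
        record_sync_success('salesforce', len(results))
    except Exception as e:
        record_sync_failure('salesforce', str(e))
        raise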

***

### Performance Considerations

#### Batch Size Recommendations

| CRM System         | Recommended Batch Size | API Call Limits                                      |
| ------------------ | ---------------------- | ---------------------------------------------------- |
| Salesforce         | 200 records/batch      | 15,000 per day (varies by edition)                   |
| Microsoft Dynamics | 100-1000 records/batch | Service protection and Power Platform request limits |
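
When your client does not batch for you (for example, when posting to a MuleSoft endpoint or sending individual Dataverse requests), a small helper can split records into the sizes in the table. simple-salesforce's `batch_size` argument already does this for the Salesforce Bulk API. The following is a minimal sketch; `chunked` is an illustrative name (Python 3.12 also provides `itertools.batched`, which yields tuples):

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    # Take `size` items at a time until the iterator is exhausted
    while batch := list(islice(iterator, size)):
        yield batch
```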

#### Query Optimization

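1. **Use Pagination**: For large result sets, page with LIMIT/OFFSET and a deterministic sort order. Include a unique tie-breaker such as `i.inference_id`; otherwise, rows that share a `call_time` can be skipped or repeated between pages:

   ```sql
   SELECT ...
   FROM contacts c
   INNER JOIN inferences i ON c.contact_id = i.contact_id
   INNER JOIN questions q ON i.question_id = q.question_id
   LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
   WHERE q.is_universal = TRUE
     AND c.current_stage = 'COMPLETED'
   ORDER BY s3c.call_time DESC, i.inference_id
   LIMIT 1000 OFFSET 0;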
   ```
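
OFFSET pagination slows down as the offset grows, because PostgreSQL still reads and discards the skipped rows. Keyset (seek) pagination avoids this by resuming after the last key seen. The following is a minimal sketch that pages by the unique `inference_id` (so rows come back in ID order, not call-time order). `KEYSET_SQL` shows the query each page would run, and `fetch_page` is a hypothetical callable that would execute it and return dict rows:

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

# Query each page would run; :after_id is NULL for the first page
KEYSET_SQL = """
SELECT i.inference_id, c.contact_id, q.key AS question_key, i.result
FROM contacts c
INNER JOIN inferences i ON c.contact_id = i.contact_id
INNER JOIN questions q ON i.question_id = q.question_id
WHERE q.is_universal = TRUE
  AND c.current_stage = 'COMPLETED'
  AND (CAST(:after_id AS VARCHAR) IS NULL OR i.inference_id > :after_id)
ORDER BY i.inference_id
LIMIT :page_size
"""

Row = Dict[str, Any]


def iter_keyset_pages(fetch_page: Callable[[Optional[str], int], List[Row]],
                      page_size: int = 1000) -> Iterator[List[Row]]:
    """Yield pages until a short (or empty) page signals the end of the results."""
    after_id: Optional[str] = None
    while True:
        page = fetch_page(after_id, page_size)
        if page:
            yield page
        if len(page) < page_size:
            return
        # Resume strictly after the last key of this page
        after_id = page[-1]["inference_id"]
```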

***

### Complete Integration Example

#### Python Script: Sync Universal Questions to Salesforce

```python
#!/usr/bin/env python3
"""
Sync universal question inference results from Trusst to Salesforce.

Usage:
    python sync_to_salesforce.py --start-date 2025-01-01 --end-date 2025-01-02
"""

import argparse
from datetime import datetime, timedelta
from typing import List, Dict
import logging
from sqlalchemy import create_engine, text
from simple_salesforce import Salesforce

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

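# Database connection (placeholder URL; in production, build it from the
# Secrets Manager credentials and point it at the read replica)
DATABASE_URL = "postgresql://user:password@localhost/trusstai"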
engine = create_engine(DATABASE_URL)

# Salesforce connection
sf = Salesforce(
    username='your_username',
    password='your_password',
    security_token='your_token'
)

def fetch_universal_question_results(start_time: datetime, end_time: datetime) -> List[Dict]:
    """Fetch universal question inference results for a time range."""
    query = text("""
        SELECT
            c.contact_id,
            c.external_id,
            s3c.call_time,
            c.agent_id,
            q.key AS question_key,
            q.result_type,
            i.result
        FROM contacts c
        INNER JOIN inferences i ON c.contact_id = i.contact_id
        INNER JOIN questions q ON i.question_id = q.question_id
        LEFT JOIN s3_conversations s3c ON c.contact_id = s3c.contact_id
        WHERE q.is_universal = TRUE
          AND c.current_stage = 'COMPLETED'
          AND s3c.call_time >= :start_time
          AND s3c.call_time < :end_time
        ORDER BY s3c.call_time DESC, c.contact_id, q.key
    """)

    with engine.connect() as conn:
        results = conn.execute(query, {
            'start_time': start_time,
            'end_time': end_time
        }).fetchall()

    return [dict(row._mapping) for row in results]

def transform_to_salesforce_format(results: List[Dict]) -> List[Dict]:
    """Transform Trusst results to Salesforce record format."""
    # Group by contact_id to create one record per contact
    contacts = {}

    for row in results:
        contact_id = row['contact_id']
        if contact_id not in contacts:
            contacts[contact_id] = {
                'Trusst_Contact_ID__c': contact_id,
                'External_ID__c': row['external_id'],
                'Call_Date__c': row['call_time'].isoformat() if row['call_time'] else None,
                'Agent_ID__c': row['agent_id']
            }

        # Add question result as custom field
        field_name = f"{row['question_key'].replace('_', ' ').title().replace(' ', '_')}__c"

        # Convert based on result type (NULL results stay None)
        result = row['result']
        if result is None:
            contacts[contact_id][field_name] = None
        elif row['result_type'] == 'Boolean':
            contacts[contact_id][field_name] = result.lower() == 'true'
        elif row['result_type'] == 'Rating':
            contacts[contact_id][field_name] = int(result) if result.strip() else None
        else:
            contacts[contact_id][field_name] = result

    return list(contacts.values())

def sync_to_salesforce(records: List[Dict]):
    """Upsert records to Salesforce."""
    if not records:
        logger.info("No records to sync")
        return

    logger.info(f"Syncing {len(records)} records to Salesforce...")

    try:
        result = sf.bulk.Contact.upsert(records, 'Trusst_Contact_ID__c', batch_size=200)

        success_count = sum(1 for r in result if r['success'])
        logger.info(f"Successfully synced {success_count}/{len(records)} records")

        # Log failures
        failures = [r for r in result if not r['success']]
        if failures:
            logger.error(f"Failed to sync {len(failures)} records:")
            for fail in failures[:10]:  # Log first 10 failures
                logger.error(f"  - {fail['id']}: {fail['errors']}")

    except Exception as e:
        logger.error(f"Salesforce sync failed: {e}")
        raise

def main():
    parser = argparse.ArgumentParser(description='Sync universal questions to Salesforce')
    parser.add_argument('--start-date', required=True, help='Start date (YYYY-MM-DD)')
    parser.add_argument('--end-date', required=True, help='End date (YYYY-MM-DD)')
    parser.add_argument('--batch-days', type=int, default=1, help='Days per batch')

    args = parser.parse_args()

    start_date = datetime.strptime(args.start_date, '%Y-%m-%d')
    end_date = datetime.strptime(args.end_date, '%Y-%m-%d')

    current_start = start_date

    while current_start < end_date:
        current_end = min(current_start + timedelta(days=args.batch_days), end_date)

        logger.info(f"Processing batch: {current_start} to {current_end}")

        # Fetch results
        results = fetch_universal_question_results(current_start, current_end)

        # Transform to Salesforce format
        sf_records = transform_to_salesforce_format(results)

        # Sync to Salesforce
        sync_to_salesforce(sf_records)

        # Move to next batch
        current_start = current_end

    logger.info("Sync complete!")

if __name__ == '__main__':
    main()
```

**Run Example**:

```bash
# Sync last 7 days of data
python sync_to_salesforce.py --start-date 2025-01-15 --end-date 2025-01-22 --batch-days 1
```

***

### Summary

This guide provides the SQL queries and integration process for extracting universal question inference results from the Trusst platform and synchronizing them with CRM systems.

**Key Takeaways**:

1. Universal questions (identified by `is_universal = TRUE`) are evaluated on all contacts
2. Use time-based batching for efficient extraction
3. Implement incremental sync using `updated_at` for ongoing synchronization
4. Map Trusst result types to appropriate CRM field types
5. Use bulk APIs for efficient CRM writes
6. Track sync state and implement error handling for production reliability

For questions or support, please refer to the Trusst platform documentation or contact the engineering team.


