Skip to main content

Overview

The Agentic Chat Service, known as Zarna AI, provides a natural language interface to query and interact with CRM data using a multi-agent AI system with persistent agent pools. Zarna AI is the intelligent chatbot that understands your questions and retrieves relevant CRM data. Location: scripts/agentic_chat/

Architecture

Directory Structure

agentic_chat/
├── core/
│   ├── agent_pool.py         # Agent pool management (566 lines)
│   ├── orchestrator.py       # Multi-agent orchestration
│   ├── team.py               # Agent team creation
│   └── query_understanding.py

├── agents/
│   ├── manager.py            # Manager agent
│   ├── data_retrieval.py     # Data fetching agent
│   ├── analysis.py           # Data analysis agent
│   └── web_search.py         # External research agent

└── tools/
    ├── crm_tool.py           # CRM database access
    ├── calculation_tool.py   # Math and analytics
    └── search_tool.py        # Web search via Exa

Agent Types

Manager Agent

Role: Coordinates other agents and routes queries Capabilities:
  • Parse user queries
  • Determine which agents to invoke
  • Aggregate results from multiple agents
  • Format final response
Example:
User: "Show me all tech companies with revenue over $10M"

Manager thinks:
1. This needs data retrieval (CRM database)
2. Assign to Data Retrieval Agent
3. Wait for results
4. Format and return to user

Data Retrieval Agent

Role: Fetch data from CRM database Capabilities:
  • Translate natural language to SQL
  • Execute database queries
  • Return structured data
  • Handle complex joins
Example:
Manager request: "Get tech companies with revenue > $10M"

Agent generates SQL:
SELECT * FROM companies
WHERE industry = 'Technology'
AND revenue > 10000000
ORDER BY revenue DESC

Returns: List of matching companies

Analysis Agent

Role: Perform calculations and analytics Capabilities:
  • Calculate metrics (growth rates, averages, etc.)
  • Compare data points
  • Identify trends
  • Generate insights
Example:
Manager request: "Calculate average deal size for Q4"

Agent:
1. Gets deal data
2. Filters by Q4 2024
3. Calculates average
4. Returns: "$127,500 average deal size"

Web Search Agent

Role: Research using external sources Capabilities:
  • Search the web with Exa
  • Find company information
  • Research industries
  • Gather market intelligence
Example:
Manager request: "Find market size for healthcare SaaS"

Agent:
1. Searches with Exa
2. Aggregates findings
3. Returns: "$15B market, 23% CAGR"

Multi-Agent Orchestration

Sequential Execution

# Query requires multiple steps
"Find all tech companies, then calculate their average revenue"

Step 1: Data Retrieval Agent
  └─> Fetches tech companies

Step 2: Analysis Agent
  └─> Calculates average revenue

Step 3: Manager Agent
  └─> Formats final response

Parallel Execution

# Query can be parallelized
"Get company overview and market intelligence for Acme Corp"

Parallel:
├─> Data Retrieval Agent: Fetch company data
└─> Web Search Agent: Research market

Manager: Combine results

Agent Pool System

Why Agent Pools?

Problem: Creating agents has 5-11s overhead Solution: Maintain pool of warm, pre-initialized agents Benefits:
  • Zero cold start: Agents ready immediately
  • Consistent performance: Predictable response times
  • Better resource usage: Reuse connections
  • Scalability: Handle concurrent requests

Pool Management

from scripts.agentic_chat.core.agent_pool import AgentPoolManager

# Global singleton
pool_manager = AgentPoolManager()

# Get pool for firm
pool = await pool_manager.get_pool(firm_id)

# Execute query with warm agents
result = await pool.execute_query(query, chat_id)

Agent Pool Details

Deep dive into agent pool architecture

Tool System

Available Tools

# CRM Tool
crm_tool = {
    "name": "query_crm",
    "description": "Query the CRM database for companies, contacts, deals",
    "parameters": {
        "entity_type": "companies | contacts | deals",
        "filters": {}, "limit": 100
    }
}

# Calculation Tool
calc_tool = {
    "name": "calculate",
    "description": "Perform mathematical calculations and aggregations",
    "parameters": {
        "operation": "sum | avg | count | growth_rate",
        "data": []
    }
}

# Search Tool
search_tool = {
    "name": "web_search",
    "description": "Search the web for information using Exa",
    "parameters": {
        "query": str,
        "num_results": 10
    }
}

Tool Execution

# Agent uses tools
async def data_retrieval_agent(query: str):
    """
    Data retrieval agent with CRM tool
    """
    # Agent decides to use CRM tool
    result = await execute_tool(
        tool_name="query_crm",
        parameters={
            "entity_type": "companies",
            "filters": {"industry": "Technology"},
            "limit": 50
        }
    )

    # Process and return results
    return format_results(result)

Conversation Memory

Context Management

class ConversationMemory:
    """
    Maintain conversation history for context
    """

    def __init__(self, chat_id: str):
        self.chat_id = chat_id
        self.messages = []

    def add_message(self, role: str, content: str):
        """
        Add message to history
        """
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now()
        })

    def get_context(self, max_messages: int = 10) -> list:
        """
        Get recent conversation context
        """
        return self.messages[-max_messages:]

Context-Aware Queries

Query 1: "Show me all tech companies"
Response: "Found 42 tech companies..."

Query 2: "Which ones have revenue over $10M?"
         (Refers back to tech companies from Query 1)
Response: "Of the 42 tech companies, 15 have revenue over $10M..."

Query 3: "Show me their deal pipelines"
         (Refers to the 15 companies from Query 2)
Response: "Here are the deal pipelines for those 15 companies..."

Error Handling

Graceful Degradation

try:
    # Try using agentic system
    result = await agentic_chat.query(user_query)
except AgentPoolError:
    # Fall back to direct CRM query
    result = await crm_agent.query(user_query)
except Exception as e:
    # Return helpful error message
    return "I encountered an error processing your query. Could you try rephrasing it?"

Query Clarification

# Ambiguous query handling
if query_is_ambiguous(query):
    return {
        "type": "clarification_needed",
        "message": "I found multiple interpretations:",
        "options": [
            "Show all companies (across all industries)",
            "Show companies in your most active industry",
            "Show recently created companies"
        ]
    }

Performance Metrics

Typical Query Times

Query TypeCold StartWarm Pool
Simple data retrieval12-15s3-4s
Complex analytics20-25s8-10s
Multi-step queries30-40s15-18s
With web search40-50s20-25s

Optimization Impact

  • Cold start elimination: 5-11s saved per query
  • Connection reuse: 1-2s saved on DB queries
  • Parallel execution: 3-5s saved on multi-step queries
  • Total improvement: 15-20s per query (20-25%)

Configuration

# AI Models
AGENTIC_MANAGER_MODEL=claude-3-opus-20240229
AGENTIC_AGENT_MODEL=claude-3-sonnet-20240229
AGENTIC_FAST_MODEL=claude-3-haiku-20240307

# Pool Settings
AGENT_POOL_MAX_INSTANCES=3
AGENT_POOL_CLEANUP_TIMEOUT_MIN=30
AGENT_POOL_MAX_TOTAL_POOLS=20

# Performance
AGENTIC_MAX_CONCURRENT_QUERIES=10
AGENTIC_QUERY_TIMEOUT_SEC=120

# Search
EXA_API_KEY=your-exa-key

Examples

Frontend Chat Interface

import { useState } from 'react'

function AgenticChat({ chatId }: Props) {
  const [messages, setMessages] = useState<Message[]>([])
  const [input, setInput] = useState('')
  const [isStreaming, setIsStreaming] = useState(false)

  const sendQuery = async () => {
    setIsStreaming(true)

    const response = await fetch('/agentic-chat/query', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${token}`,
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream'
      },
      body: JSON.stringify({
        query: input,
        chat_id: chatId
      })
    })

    const reader = response.body!.getReader()
    const decoder = new TextDecoder()
    let currentContent = ''

    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      const chunk = decoder.decode(value)
      // Process SSE events
      // Update UI in real-time
    }

    setIsStreaming(false)
  }

  return (
    <div>
      {messages.map(msg => <Message key={msg.id} {...msg} />)}
      <Input value={input} onChange={e => setInput(e.target.value)} />
      <Button onClick={sendQuery} disabled={isStreaming}>
        {isStreaming ? 'Thinking...' : 'Send'}
      </Button>
    </div>
  )
}

Next Steps