## Overview

The Report Generation Service creates comprehensive AI-powered reports with real-time streaming delivery to the frontend.

**Primary files:**

- `scripts/report_generation_service.py`
- `api/app/routers/reports.py`
## Report Types

- **Due Diligence** - Comprehensive DD reports with all sections
- **Financial Analysis** - Deep dive into financial metrics and trends
- **Market Analysis** - Market size, competition, positioning
- **Executive Summary** - High-level overview for executives
- **Investment Memo** - Investment committee memorandum
- **Custom Reports** - User-defined sections and structure
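The endpoints below accept a `ReportRequest` body. The model itself is not shown in this doc; a minimal sketch consistent with the fields used in the examples (`company_id`, `report_type`, `sections`) might look like:

```python
from pydantic import BaseModel


class ReportRequest(BaseModel):
    """Sketch of the request body for POST /reports/generate.

    Only the fields used in the examples on this page are confirmed;
    anything else is an assumption.
    """
    company_id: str
    report_type: str      # e.g. "due_diligence", "financial_analysis"
    sections: list[str]   # e.g. ["overview", "financial", "market", "risks"]
```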
## Architecture

### Report Generation Pipeline

```text
1. Request Received
   ↓
2. Data Collection (Parallel)
   ├─> Company data
   ├─> Financial records
   ├─> Contact information
   ├─> Deal history
   ├─> Uploaded documents
   └─> External research
   ↓
3. Agent Pool Assignment
   ↓
4. Multi-Agent Processing
   ├─> Data Analysis Agent
   ├─> Financial Analysis Agent
   ├─> Market Research Agent
   └─> Report Writing Agent
   ↓
5. Report Synthesis (Streaming)
   ├─> Stream section by section
   ├─> Real-time delivery to frontend
   └─> Progress updates
   ↓
6. Storage & Delivery
   ├─> Save to database
   ├─> Generate PDF (optional)
   └─> Return download link
```
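Step 4 fans the collected data out to the specialised agents in parallel before synthesis. The agent classes are not shown in this doc; a minimal sketch of the fan-out, assuming each agent exposes an async `run(data)` method, might be:

```python
import asyncio


async def run_agent_pool(data: dict) -> dict:
    """Fan collected data out to the specialised agents concurrently.

    Sketch only: the agent names mirror the pipeline diagram above,
    but their interfaces are assumptions.
    """
    analysis, financial, market = await asyncio.gather(
        data_analysis_agent.run(data),
        financial_analysis_agent.run(data),
        market_research_agent.run(data),
    )
    # The Report Writing Agent consumes the other agents' outputs
    # during the synthesis step.
    return {"analysis": analysis, "financial": financial, "market": market}
```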
### Streaming Implementation

```python
import asyncio
import json
import time
from datetime import datetime

from fastapi.responses import StreamingResponse


@router.post("/reports/generate")
async def generate_report(request: ReportRequest):
    """Generate a report with streaming delivery."""

    async def generate():
        started = time.monotonic()

        # Start event
        yield f"data: {json.dumps({'type': 'start', 'timestamp': datetime.now().isoformat()})}\n\n"

        # Collect data
        data = await collect_report_data(request.company_id)

        # Process sections
        for section in request.sections:
            # Section start
            yield f"data: {json.dumps({'type': 'section_start', 'section': section})}\n\n"

            # Generate section content
            async for chunk in generate_section(section, data):
                yield f"data: {json.dumps({'type': 'chunk', 'content': chunk})}\n\n"

            # Section complete
            yield f"data: {json.dumps({'type': 'section_complete', 'section': section})}\n\n"

        # Complete event
        elapsed = round(time.monotonic() - started, 2)
        yield f"data: {json.dumps({'type': 'complete', 'total_time': elapsed})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
```
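For quick testing outside the frontend, the stream can be consumed with any SSE-capable HTTP client. A sketch using `httpx`, assuming the API runs on localhost (the URL and payload values are illustrative):

```python
import json

import httpx


async def consume_report_stream(company_id: str) -> None:
    """Print streamed report events as they arrive (local-testing sketch)."""
    payload = {
        "company_id": company_id,
        "report_type": "due_diligence",
        "sections": ["overview", "financial"],
    }
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/reports/generate", json=payload
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[len("data: "):])
                    print(event["type"], event.get("content", ""))
```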
## Report Sections

### Due Diligence Report

**Sections:**

1. **Executive Summary** - High-level overview
2. **Company Overview** - Business model, products, history
3. **Financial Analysis** - Revenue, profitability, growth
4. **Market Analysis** - TAM, competition, positioning
5. **Management Team** - Key executives and experience
6. **Operations** - Business processes, scalability
7. **Technology** - Tech stack, IP, infrastructure
8. **Risk Assessment** - Key risks and mitigation
9. **Valuation** - Valuation multiples and comparisons
10. **Recommendations** - Investment decision and terms

### Financial Analysis Report

**Sections:**

1. **Revenue Analysis** - Trends, growth, breakdown
2. **Profitability** - EBITDA, margins, efficiency
3. **Cash Flow** - Operating, investing, financing
4. **Balance Sheet** - Assets, liabilities, equity
5. **Key Metrics** - KPIs, unit economics
6. **Projections** - Forward-looking estimates
7. **Benchmarking** - Industry comparisons
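One plausible way to represent these section lists in code (a sketch; the actual constant is not shown in this doc):

```python
# Sketch: section lists per report type, mirroring the lists above.
REPORT_SECTIONS: dict[str, list[str]] = {
    "due_diligence": [
        "executive_summary", "company_overview", "financial_analysis",
        "market_analysis", "management_team", "operations", "technology",
        "risk_assessment", "valuation", "recommendations",
    ],
    "financial_analysis": [
        "revenue_analysis", "profitability", "cash_flow", "balance_sheet",
        "key_metrics", "projections", "benchmarking",
    ],
}
```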
## Data Collection

### Multi-Source Aggregation

```python
import asyncio


async def collect_report_data(company_id: str) -> dict:
    """Collect all data needed for report generation."""
    # Parallel data collection
    company, financials, contacts, deals, files = await asyncio.gather(
        fetch_company(company_id),
        fetch_financials(company_id),
        fetch_contacts(company_id),
        fetch_deals(company_id),
        fetch_files(company_id),
    )

    # External data (if needed)
    market_data = await fetch_market_intelligence(company.industry)

    return {
        "company": company,
        "financials": financials,
        "contacts": contacts,
        "deals": deals,
        "files": files,
        "market": market_data,
    }
```
## AI Integration

### Claude for Report Writing

```python
import json
import os
from collections.abc import AsyncGenerator

from anthropic import AsyncAnthropic

# The async client is required here: the generator is consumed
# with `async for`, which the sync Anthropic client does not support.
anthropic = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))


async def generate_section(section: str, data: dict) -> AsyncGenerator[str, None]:
    """Generate a report section with streaming."""
    prompt = f"""
    Generate a {section} section for a due diligence report.

    Company Data:
    {json.dumps(data, indent=2)}

    Requirements:
    - Professional tone
    - Data-driven insights
    - Actionable recommendations
    - 2-3 pages of content
    """

    # Stream tokens from Claude as they are generated
    async with anthropic.messages.stream(
        model="claude-3-opus-20240229",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=4000,
    ) as stream:
        async for text in stream.text_stream:
            yield text
```
## API Endpoints

### Generate Report

```python
@router.post("/reports/generate")
async def generate_report(
    request: ReportRequest,
    user: User = Depends(get_current_user),
):
    """Generate a report with streaming."""
    # Implementation shown above
    ...
```
### List Reports

```python
@router.get("/reports")
async def list_reports(
    company_id: Optional[str] = None,
    user: User = Depends(get_current_user),
):
    """List generated reports."""
    query = (
        supabase.table("reports")
        .select("*")
        .eq("firm_id", user.firm_id)
    )

    if company_id:
        query = query.eq("company_id", company_id)

    reports = query.order("created_at", desc=True).execute()
    return reports.data
```
### Export Report

```python
from fastapi import HTTPException
from fastapi.responses import FileResponse, HTMLResponse


@router.get("/reports/{report_id}/export")
async def export_report(
    report_id: str,
    format: str = "pdf",
    user: User = Depends(get_current_user),
):
    """Export a report in the requested format."""
    report = get_report(report_id)

    if format == "pdf":
        pdf = generate_pdf(report.content)
        return FileResponse(pdf, media_type="application/pdf")
    elif format == "docx":
        docx = generate_docx(report.content)
        return FileResponse(
            docx,
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        )
    elif format == "html":
        html = markdown_to_html(report.content)
        return HTMLResponse(content=html)
    else:
        raise HTTPException(status_code=400, detail="Unsupported format")
```
## Performance

### With Agent Pool

- **First report (cold firm)**: ~15-20 seconds
- **Subsequent reports (warm pool)**: ~10-12 seconds
- **Average chunk delivery**: 50-100 ms
- **Total sections**: 8-10 per report
## Optimization Techniques

- **Parallel data collection**: Fetch all data sources concurrently
- **Agent pools**: Pre-warmed agents eliminate cold-start latency
- **Streaming**: Start delivering content immediately
- **Caching**: Cache company data for the duration of report generation
- **Batch processing**: Generate multiple reports in parallel (see the sketch after this list)
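A minimal sketch of bounded-concurrency batch generation, reusing `collect_report_data` and `generate_section` from above (the batch function and its concurrency limit are assumptions, not shown in this doc):

```python
import asyncio


async def generate_reports_batch(
    requests: list[ReportRequest], concurrency: int = 3
) -> list[str]:
    """Generate several reports in parallel with bounded concurrency (sketch)."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(req: ReportRequest) -> str:
        async with semaphore:
            data = await collect_report_data(req.company_id)
            parts: list[str] = []
            for section in req.sections:
                async for chunk in generate_section(section, data):
                    parts.append(chunk)
            return "".join(parts)

    return list(await asyncio.gather(*(run_one(r) for r in requests)))
```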
## Configuration

```bash
# AI models
REPORT_AI_MODEL=claude-3-opus-20240229
REPORT_FAST_MODEL=claude-3-haiku-20240307

# Generation settings
REPORT_MAX_TOKENS=4000
REPORT_TEMPERATURE=0.7
REPORT_STREAM_CHUNK_SIZE=100

# Export settings
REPORT_PDF_FONT=Helvetica
REPORT_PDF_PAGE_SIZE=Letter
```
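These settings would typically be read from the environment at service startup; a minimal sketch in Python, using the same defaults as above:

```python
import os

# Sketch: reading the settings above at startup (loading code not shown in this doc)
REPORT_AI_MODEL = os.getenv("REPORT_AI_MODEL", "claude-3-opus-20240229")
REPORT_FAST_MODEL = os.getenv("REPORT_FAST_MODEL", "claude-3-haiku-20240307")
REPORT_MAX_TOKENS = int(os.getenv("REPORT_MAX_TOKENS", "4000"))
REPORT_TEMPERATURE = float(os.getenv("REPORT_TEMPERATURE", "0.7"))
REPORT_STREAM_CHUNK_SIZE = int(os.getenv("REPORT_STREAM_CHUNK_SIZE", "100"))
```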
## Examples

### Frontend Streaming

```typescript
async function generateReport(companyId: string, reportType: string) {
  const response = await fetch('/api/reports/generate', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json',
      'Accept': 'text/event-stream'
    },
    body: JSON.stringify({
      company_id: companyId,
      report_type: reportType,
      sections: ['overview', 'financial', 'market', 'risks']
    })
  })

  const reader = response.body!.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    // stream: true keeps multi-byte characters intact across chunk boundaries
    const chunk = decoder.decode(value, { stream: true })
    const lines = chunk.split('\n')

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6))

        if (data.type === 'chunk') {
          // Append to report display
          setReportContent(prev => prev + data.content)
        } else if (data.type === 'complete') {
          console.log(`Report completed in ${data.total_time}s`)
        }
      }
    }
  }
}
```
## Next Steps

- **Reports API** - API endpoint documentation
- **Agentic System** - Multi-agent architecture
- **Agent Pool** - Performance optimization
- **CRM Agent** - CRM data operations