Overview

The Report Generation Service creates comprehensive AI-powered reports with real-time streaming delivery to the frontend.

Primary files:
  • scripts/report_generation_service.py
  • api/app/routers/reports.py

Report Types

  • Due Diligence - Comprehensive DD reports with all sections
  • Financial Analysis - Deep dive into financial metrics and trends
  • Market Analysis - Market size, competition, positioning
  • Executive Summary - High-level overview for executives
  • Investment Memo - Investment committee memorandum
  • Custom Reports - User-defined sections and structure
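
For validation at the API boundary, these types could be modeled as a simple enum. A sketch; the identifier strings below are illustrative, not necessarily the service's actual values:

```python
from enum import Enum

class ReportType(str, Enum):
    """Hypothetical enum mirroring the report types above."""
    DUE_DILIGENCE = "due_diligence"
    FINANCIAL_ANALYSIS = "financial_analysis"
    MARKET_ANALYSIS = "market_analysis"
    EXECUTIVE_SUMMARY = "executive_summary"
    INVESTMENT_MEMO = "investment_memo"
    CUSTOM = "custom"
```

Subclassing `str` lets the enum round-trip cleanly through JSON request bodies and Pydantic models.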

Architecture

Report Generation Pipeline

1. Request Received

2. Data Collection (Parallel)
   ├─> Company data
   ├─> Financial records
   ├─> Contact information
   ├─> Deal history
   ├─> Uploaded documents
   └─> External research

3. Agent Pool Assignment

4. Multi-Agent Processing
   ├─> Data Analysis Agent
   ├─> Financial Analysis Agent
   ├─> Market Research Agent
   └─> Report Writing Agent

5. Report Synthesis (Streaming)
   ├─> Stream section by section
   ├─> Real-time delivery to frontend
   └─> Progress updates

6. Storage & Delivery
   ├─> Save to database
   ├─> Generate PDF (optional)
   └─> Return download link
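
Condensed into code, the pipeline above might look like the following sketch, where stub fetchers and a dict comprehension stand in for the real data layer, agent pool, and storage:

```python
import asyncio

async def run_pipeline(company_id: str, sections: list[str]) -> dict:
    """Hypothetical end-to-end sketch of steps 2-6; stubs stand in
    for the real fetchers, agent pool, and storage layer."""
    # 2. Data collection runs in parallel
    async def fetch(source: str) -> tuple[str, str]:
        return source, f"{source} data for {company_id}"

    sources = ["company", "financials", "contacts",
               "deals", "files", "research"]
    data = dict(await asyncio.gather(*(fetch(s) for s in sources)))

    # 3-5. Agents draft each section; the real service streams these out
    report = {s: f"drafted {s} from {len(data)} sources" for s in sections}

    # 6. Storage & delivery would persist the report and return a link
    return report
```

The key structural point is that step 2 fans out with `asyncio.gather` while steps 4-5 proceed section by section so content can be streamed as it is produced.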

Streaming Implementation

import asyncio
import json
import time
from datetime import datetime

from fastapi.responses import StreamingResponse

@router.post("/reports/generate")
async def generate_report(request: ReportRequest):
    """
    Generate a report and stream it as Server-Sent Events
    """
    async def generate():
        start = time.monotonic()

        # Start event
        yield f"data: {json.dumps({'type': 'start', 'timestamp': datetime.now().isoformat()})}\n\n"

        # Collect data
        data = await collect_report_data(request.company_id)

        # Process sections
        for section in request.sections:
            # Section start
            yield f"data: {json.dumps({'type': 'section_start', 'section': section})}\n\n"

            # Generate section content
            async for chunk in generate_section(section, data):
                yield f"data: {json.dumps({'type': 'chunk', 'content': chunk})}\n\n"

            # Section complete
            yield f"data: {json.dumps({'type': 'section_complete', 'section': section})}\n\n"

        # Complete event
        elapsed = round(time.monotonic() - start, 2)
        yield f"data: {json.dumps({'type': 'complete', 'total_time': elapsed})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
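
The generator above builds each Server-Sent Events frame inline. The framing can be factored into a tiny helper (the helper name is ours, not the service's):

```python
import json

def sse_event(payload: dict) -> str:
    """Frame a payload as one Server-Sent Events `data:` message,
    mirroring what the streaming generator does inline."""
    return f"data: {json.dumps(payload)}\n\n"
```

Each SSE message is the `data:` prefix, a JSON payload, and a blank line; the double newline is what terminates the event on the wire.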

Report Sections

Due Diligence Report

Sections:
  1. Executive Summary - High-level overview
  2. Company Overview - Business model, products, history
  3. Financial Analysis - Revenue, profitability, growth
  4. Market Analysis - TAM, competition, positioning
  5. Management Team - Key executives and experience
  6. Operations - Business processes, scalability
  7. Technology - Tech stack, IP, infrastructure
  8. Risk Assessment - Key risks and mitigation
  9. Valuation - Valuation multiples and comparisons
  10. Recommendations - Investment decision and terms

Financial Analysis Report

Sections:
  1. Revenue Analysis - Trends, growth, breakdown
  2. Profitability - EBITDA, margins, efficiency
  3. Cash Flow - Operating, investing, financing
  4. Balance Sheet - Assets, liabilities, equity
  5. Key Metrics - KPIs, unit economics
  6. Projections - Forward-looking estimates
  7. Benchmarking - Industry comparisons
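
These per-type section lists could live in a registry so that custom reports can override them while the standard types keep their defaults. A sketch; the registry name and section identifiers are assumptions mirroring the lists above:

```python
# Hypothetical default-section registry keyed by report type
DEFAULT_SECTIONS = {
    "due_diligence": [
        "executive_summary", "company_overview", "financial_analysis",
        "market_analysis", "management_team", "operations", "technology",
        "risk_assessment", "valuation", "recommendations",
    ],
    "financial_analysis": [
        "revenue_analysis", "profitability", "cash_flow", "balance_sheet",
        "key_metrics", "projections", "benchmarking",
    ],
}

def sections_for(report_type: str, overrides=None) -> list[str]:
    """Return user-supplied sections if given, else the defaults."""
    return overrides or DEFAULT_SECTIONS.get(report_type, [])
```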

Data Collection

Multi-Source Aggregation

async def collect_report_data(company_id: str) -> dict:
    """
    Collect all data needed for report generation
    """
    # Parallel data collection
    company, financials, contacts, deals, files = await asyncio.gather(
        fetch_company(company_id),
        fetch_financials(company_id),
        fetch_contacts(company_id),
        fetch_deals(company_id),
        fetch_files(company_id)
    )

    # External data (if needed)
    market_data = await fetch_market_intelligence(company.industry)

    return {
        "company": company,
        "financials": financials,
        "contacts": contacts,
        "deals": deals,
        "files": files,
        "market": market_data
    }
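
As written, one failing fetcher aborts the whole `asyncio.gather` call. If the service should degrade gracefully when a single source (say, external market data) is down, `return_exceptions=True` can map failures to `None` instead. A sketch under that assumption; the helper name is ours:

```python
import asyncio

async def gather_tolerant(**tasks):
    """Run fetchers concurrently; map failures to None instead of
    failing the whole collection step."""
    results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    return {name: (None if isinstance(r, Exception) else r)
            for name, r in zip(tasks, results)}

async def demo():
    async def ok():
        return {"name": "Acme"}
    async def boom():
        raise RuntimeError("market API down")
    # One source fails; the other still comes back
    return await gather_tolerant(company=ok(), market=boom())
```

Downstream section generators then need to handle a `None` source, typically by omitting that part of the prompt.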

AI Integration

Claude for Report Writing

import json
import os
from typing import AsyncGenerator

from anthropic import AsyncAnthropic

# The async client is required for the `async with` / `async for` below
anthropic = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

async def generate_section(section: str, data: dict) -> AsyncGenerator[str, None]:
    """
    Generate a report section, streaming text as it is produced
    """
    prompt = f"""
    Generate a {section} section for a due diligence report.

    Company Data:
    {json.dumps(data, indent=2, default=str)}

    Requirements:
    - Professional tone
    - Data-driven insights
    - Actionable recommendations
    - 2-3 pages of content
    """

    async with anthropic.messages.stream(
        model="claude-3-opus-20240229",
        max_tokens=4000,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text

API Endpoints

Generate Report

@router.post("/reports/generate")
async def generate_report(
    request: ReportRequest,
    user: User = Depends(get_current_user)
):
    """
    Generate report with streaming
    """
    # Implementation shown above

List Reports

@router.get("/reports")
async def list_reports(
    company_id: Optional[str] = None,
    user: User = Depends(get_current_user)
):
    """
    List generated reports
    """
    query = supabase.table("reports") \
        .select("*") \
        .eq("firm_id", user.firm_id)

    if company_id:
        query = query.eq("company_id", company_id)

    reports = query.order("created_at", desc=True).execute()
    return reports.data

Export Report

@router.get("/reports/{report_id}/export")
async def export_report(
    report_id: str,
    format: str = "pdf",
    user: User = Depends(get_current_user)
):
    """
    Export report in requested format
    """
    report = get_report(report_id)

    if format == "pdf":
        pdf = generate_pdf(report.content)
        return FileResponse(pdf, media_type="application/pdf")
    elif format == "docx":
        docx = generate_docx(report.content)
        return FileResponse(docx, media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document")
    elif format == "html":
        html = markdown_to_html(report.content)
        return HTMLResponse(content=html)
    else:
        raise HTTPException(status_code=400, detail=f"Unsupported format: {format}")
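
The per-format branching can also be table-driven, which keeps the media types in one place when new export formats are added. A sketch; the mapping and function names are ours:

```python
# Hypothetical format-to-media-type table for the export endpoint
EXPORT_MEDIA_TYPES = {
    "pdf": "application/pdf",
    "docx": ("application/vnd.openxmlformats-officedocument"
             ".wordprocessingml.document"),
    "html": "text/html",
}

def media_type_for(fmt: str) -> str:
    """Resolve a media type, rejecting unknown formats up front."""
    try:
        return EXPORT_MEDIA_TYPES[fmt]
    except KeyError:
        raise ValueError(f"Unsupported format: {fmt}") from None
```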

Performance

With Agent Pool

  • First report (cold firm): ~15-20 seconds
  • Subsequent reports (warm pool): ~10-12 seconds
  • Average chunk delivery: 50-100ms
  • Total sections: 8-10 sections per report

Optimization Techniques

  1. Parallel data collection: Fetch all data concurrently
  2. Agent pools: Pre-warmed agents eliminate cold start
  3. Streaming: Start delivering content immediately
  4. Caching: Cache company data during report generation
  5. Batch processing: Generate multiple reports in parallel
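
Technique 4 (caching company data during generation) could be as simple as a small in-memory cache with a time-to-live. A minimal sketch, assuming a single-process service; the real implementation may use Redis or similar:

```python
import time

class TTLCache:
    """Minimal in-memory cache with per-entry expiry (illustrative)."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store: dict = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires = entry
        if time.monotonic() > expires:
            del self._store[key]  # lazily evict stale entries
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)
```

With a 5-minute TTL, every section generated for the same report reuses one data-collection pass instead of re-querying the database.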

Configuration

# AI Models
REPORT_AI_MODEL=claude-3-opus-20240229
REPORT_FAST_MODEL=claude-3-haiku-20240307

# Generation settings
REPORT_MAX_TOKENS=4000
REPORT_TEMPERATURE=0.7
REPORT_STREAM_CHUNK_SIZE=100

# Export settings
REPORT_PDF_FONT=Helvetica
REPORT_PDF_PAGE_SIZE=Letter
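
Reading these settings with typed fallbacks might look like the following; the variable names come from the block above, and the defaults shown are illustrative:

```python
import os

def load_report_config(env=os.environ) -> dict:
    """Read report-generation settings with typed fallbacks."""
    return {
        "model": env.get("REPORT_AI_MODEL", "claude-3-opus-20240229"),
        "fast_model": env.get("REPORT_FAST_MODEL", "claude-3-haiku-20240307"),
        "max_tokens": int(env.get("REPORT_MAX_TOKENS", "4000")),
        "temperature": float(env.get("REPORT_TEMPERATURE", "0.7")),
        "chunk_size": int(env.get("REPORT_STREAM_CHUNK_SIZE", "100")),
    }
```

Accepting the environment as a parameter keeps the loader easy to test with a plain dict.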

Examples

Frontend Streaming

async function generateReport(companyId: string, reportType: string) {
  const response = await fetch('/api/reports/generate', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json',
      'Accept': 'text/event-stream'
    },
    body: JSON.stringify({
      company_id: companyId,
      report_type: reportType,
      sections: ['overview', 'financial', 'market', 'risks']
    })
  })

  const reader = response.body!.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    // SSE frames can be split across network chunks, so buffer partial lines
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? ''

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6))

        if (data.type === 'chunk') {
          // Append to report display
          setReportContent(prev => prev + data.content)
        } else if (data.type === 'complete') {
          console.log(`Report completed in ${data.total_time}s`)
        }
      }
    }
  }
}

Next Steps