Skip to main content

Overview

The Zarna backend uses JWT (JSON Web Tokens) for authentication with middleware that protects all API routes. Middleware Location: api/app/middleware/JWTAuthMiddleware

JWT Authentication Middleware

Implementation

from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from jose import jwt, JWTError
import os

class JWTAuthMiddleware(BaseHTTPMiddleware):
    """
    Middleware to validate JWT tokens on all requests
    """

    # Public routes that don't require authentication
    PUBLIC_ROUTES = [
        "/",
        "/health",
        "/docs",
        "/redoc",
        "/openapi.json",
        "/auth/login",
        "/auth/register"
    ]

    async def dispatch(self, request: Request, call_next):
        """
        Intercept all requests and validate JWT
        """
        # Skip authentication for public routes
        if request.url.path in self.PUBLIC_ROUTES:
            return await call_next(request)

        # Extract token from Authorization header
        auth_header = request.headers.get("Authorization")

        if not auth_header or not auth_header.startswith("Bearer "):
            return JSONResponse(
                status_code=401,
                content={"detail": "Missing or invalid authorization header"}
            )

        token = auth_header.split(" ")[1]

        try:
            # Decode and validate token
            payload = jwt.decode(
                token,
                os.getenv("SUPABASE_JWT_SECRET"),
                algorithms=["HS256"]
            )

            # Attach user info to request state
            request.state.user_id = payload.get("sub")
            request.state.firm_id = payload.get("firm_id")
            request.state.user_email = payload.get("email")
            request.state.user_role = payload.get("role")

        except JWTError as e:
            return JSONResponse(
                status_code=401,
                content={"detail": "Invalid or expired token"}
            )

        # Continue to route handler
        response = await call_next(request)
        return response

Middleware Registration

# api/app/main.py
from app.middleware import JWTAuthMiddleware

app = FastAPI()

# Add JWT middleware
app.add_middleware(JWTAuthMiddleware)

# All routes after this are protected

Using Authentication in Routes

Accessing User Info

from fastapi import Request

@router.get("/companies")
async def get_companies(request: Request):
    """
    Get companies for authenticated user
    """
    # Access user info from request.state (set by middleware)
    user_id = request.state.user_id
    firm_id = request.state.firm_id
    user_email = request.state.user_email
    user_role = request.state.user_role

    # Use firm_id to filter data
    companies = supabase.table("companies") \
        .select("*") \
        .eq("firm_id", firm_id) \
        .execute()

    return companies.data

Dependency Injection (Alternative)

from fastapi import Depends
from app.auth import get_current_user

@router.get("/companies")
async def get_companies(current_user: User = Depends(get_current_user)):
    """
    Get companies using dependency injection
    """
    companies = supabase.table("companies") \
        .select("*") \
        .eq("firm_id", current_user.firm_id) \
        .execute()

    return companies.data

Token Generation

Login Endpoint

from datetime import datetime, timedelta
from jose import jwt

@router.post("/auth/login")
async def login(credentials: LoginCredentials):
    """
    Authenticate user and return JWT token
    """
    # Validate credentials with Supabase
    auth_response = supabase.auth.sign_in_with_password({
        "email": credentials.email,
        "password": credentials.password
    })

    user = auth_response.user

    # Get user's firm_id
    user_data = supabase.table("users") \
        .select("firm_id, role") \
        .eq("id", user.id) \
        .single() \
        .execute()

    # Create JWT payload
    payload = {
        "sub": user.id,  # Subject (user ID)
        "email": user.email,
        "firm_id": user_data.data["firm_id"],
        "role": user_data.data["role"],
        "iat": datetime.utcnow(),  # Issued at
        "exp": datetime.utcnow() + timedelta(hours=24)  # Expires in 24h
    }

    # Generate token
    token = jwt.encode(
        payload,
        os.getenv("SUPABASE_JWT_SECRET"),
        algorithm="HS256"
    )

    return {
        "access_token": token,
        "token_type": "bearer",
        "expires_in": 86400,  # 24 hours in seconds
        "user": {
            "id": user.id,
            "email": user.email,
            "firm_id": user_data.data["firm_id"],
            "role": user_data.data["role"]
        }
    }

Token Refresh

@router.post("/auth/refresh")
async def refresh_token(request: Request):
    """
    Refresh an expired access token
    """
    # Get current token from header
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(status_code=401, detail="Missing token")

    old_token = auth_header.split(" ")[1]

    try:
        # Decode without verification (to get payload even if expired)
        payload = jwt.decode(
            old_token,
            options={"verify_signature": False}
        )

        # Verify user still exists and is active
        user = supabase.table("users") \
            .select("*") \
            .eq("id", payload["sub"]) \
            .eq("status", "active") \
            .single() \
            .execute()

        # Generate new token
        new_payload = {
            "sub": user.data["id"],
            "email": user.data["email"],
            "firm_id": user.data["firm_id"],
            "role": user.data["role"],
            "iat": datetime.utcnow(),
            "exp": datetime.utcnow() + timedelta(hours=24)
        }

        new_token = jwt.encode(
            new_payload,
            os.getenv("SUPABASE_JWT_SECRET"),
            algorithm="HS256"
        )

        return {
            "access_token": new_token,
            "token_type": "bearer",
            "expires_in": 86400
        }

    except Exception as e:
        raise HTTPException(status_code=401, detail="Failed to refresh token")

Security Best Practices

# Generate strong secret
openssl rand -base64 64

# Add to .env
SUPABASE_JWT_SECRET=your-very-long-random-string
  • Access tokens: 24 hours (balances security and UX)
  • Refresh tokens: 7 days
  • Session tokens: 30 days for “remember me”
# Check required claims exist
if not payload.get("sub"):
    raise HTTPException(status_code=401, detail="Invalid token structure")

# Verify expiration
if datetime.fromtimestamp(payload["exp"]) < datetime.now():
    raise HTTPException(status_code=401, detail="Token expired")
Tokens are sensitive and must be transmitted over HTTPS only.
Issue new token on each refresh and invalidate old ones.

Error Handling

Custom Exception Handlers

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()

@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """
    Custom handler for HTTP exceptions
    """
    if exc.status_code in [401, 403]:
        # Log auth errors simply
        print(f"Auth error: {exc.status_code} - {exc.detail}")
    else:
        # Log other errors with more detail
        print(f"HTTP {exc.status_code}: {exc.detail}")

    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail}
    )

Configuration

Environment Variables

# JWT Configuration
SUPABASE_JWT_SECRET=your-jwt-secret-here
TOKEN_EXPIRATION_HOURS=24

# Supabase Auth
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_KEY=your-service-role-key

Middleware Order

# Middleware is applied in reverse order
app.add_middleware(CORSMiddleware)  # Applied last (outer)
app.add_middleware(JWTAuthMiddleware)  # Applied first (inner)

# Request flow:
# 1. CORS checks origin
# 2. JWT validates token
# 3. Route handler executes

Testing

Test with cURL

# Get token
TOKEN=$(curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"email":"user@example.com","password":"password"}' \
  http://localhost:8000/auth/login | jq -r '.access_token')

# Use token
curl -H "Authorization: Bearer $TOKEN" \
  http://localhost:8000/api/companies

Test with Python

import requests

# Login
response = requests.post(
    'http://localhost:8000/auth/login',
    json={
        'email': 'user@example.com',
        'password': 'password'
    }
)
token = response.json()['access_token']

# Use token
companies = requests.get(
    'http://localhost:8000/api/companies',
    headers={'Authorization': f'Bearer {token}'}
).json()

Troubleshooting

Causes:
  • Token expired (> 24 hours)
  • Invalid token signature
  • Missing Authorization header
  • Incorrect token format
Solution:
  • Check token expiration
  • Verify SUPABASE_JWT_SECRET matches
  • Ensure header format: Authorization: Bearer {token}
  • Try refreshing token
Cause: Origin not allowedSolution:
# Add origin to CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True
)

Next Steps