Overview
The Zarna backend uses JWT (JSON Web Tokens) for authentication with middleware that protects all API routes. Middleware Location:api/app/middleware/JWTAuthMiddleware
JWT Authentication Middleware
Implementation
Copy
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from jose import jwt, JWTError
import os
class JWTAuthMiddleware(BaseHTTPMiddleware):
"""
Middleware to validate JWT tokens on all requests
"""
# Public routes that don't require authentication
PUBLIC_ROUTES = [
"/",
"/health",
"/docs",
"/redoc",
"/openapi.json",
"/auth/login",
"/auth/register"
]
async def dispatch(self, request: Request, call_next):
"""
Intercept all requests and validate JWT
"""
# Skip authentication for public routes
if request.url.path in self.PUBLIC_ROUTES:
return await call_next(request)
# Extract token from Authorization header
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return JSONResponse(
status_code=401,
content={"detail": "Missing or invalid authorization header"}
)
token = auth_header.split(" ")[1]
try:
# Decode and validate token
payload = jwt.decode(
token,
os.getenv("SUPABASE_JWT_SECRET"),
algorithms=["HS256"]
)
# Attach user info to request state
request.state.user_id = payload.get("sub")
request.state.firm_id = payload.get("firm_id")
request.state.user_email = payload.get("email")
request.state.user_role = payload.get("role")
except JWTError as e:
return JSONResponse(
status_code=401,
content={"detail": "Invalid or expired token"}
)
# Continue to route handler
response = await call_next(request)
return response
Middleware Registration
Copy
# api/app/main.py
from app.middleware import JWTAuthMiddleware
app = FastAPI()
# Add JWT middleware
app.add_middleware(JWTAuthMiddleware)
# All routes after this are protected
Using Authentication in Routes
Accessing User Info
Copy
from fastapi import Request
@router.get("/companies")
async def get_companies(request: Request):
"""
Get companies for authenticated user
"""
# Access user info from request.state (set by middleware)
user_id = request.state.user_id
firm_id = request.state.firm_id
user_email = request.state.user_email
user_role = request.state.user_role
# Use firm_id to filter data
companies = supabase.table("companies") \
.select("*") \
.eq("firm_id", firm_id) \
.execute()
return companies.data
Dependency Injection (Alternative)
Copy
from fastapi import Depends
from app.auth import get_current_user
@router.get("/companies")
async def get_companies(current_user: User = Depends(get_current_user)):
"""
Get companies using dependency injection
"""
companies = supabase.table("companies") \
.select("*") \
.eq("firm_id", current_user.firm_id) \
.execute()
return companies.data
Token Generation
Login Endpoint
Copy
from datetime import datetime, timedelta
from jose import jwt
@router.post("/auth/login")
async def login(credentials: LoginCredentials):
"""
Authenticate user and return JWT token
"""
# Validate credentials with Supabase
auth_response = supabase.auth.sign_in_with_password({
"email": credentials.email,
"password": credentials.password
})
user = auth_response.user
# Get user's firm_id
user_data = supabase.table("users") \
.select("firm_id, role") \
.eq("id", user.id) \
.single() \
.execute()
# Create JWT payload
payload = {
"sub": user.id, # Subject (user ID)
"email": user.email,
"firm_id": user_data.data["firm_id"],
"role": user_data.data["role"],
"iat": datetime.utcnow(), # Issued at
"exp": datetime.utcnow() + timedelta(hours=24) # Expires in 24h
}
# Generate token
token = jwt.encode(
payload,
os.getenv("SUPABASE_JWT_SECRET"),
algorithm="HS256"
)
return {
"access_token": token,
"token_type": "bearer",
"expires_in": 86400, # 24 hours in seconds
"user": {
"id": user.id,
"email": user.email,
"firm_id": user_data.data["firm_id"],
"role": user_data.data["role"]
}
}
Token Refresh
Copy
@router.post("/auth/refresh")
async def refresh_token(request: Request):
"""
Refresh an expired access token
"""
# Get current token from header
auth_header = request.headers.get("Authorization")
if not auth_header:
raise HTTPException(status_code=401, detail="Missing token")
old_token = auth_header.split(" ")[1]
try:
# Decode without verification (to get payload even if expired)
payload = jwt.decode(
old_token,
options={"verify_signature": False}
)
# Verify user still exists and is active
user = supabase.table("users") \
.select("*") \
.eq("id", payload["sub"]) \
.eq("status", "active") \
.single() \
.execute()
# Generate new token
new_payload = {
"sub": user.data["id"],
"email": user.data["email"],
"firm_id": user.data["firm_id"],
"role": user.data["role"],
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(hours=24)
}
new_token = jwt.encode(
new_payload,
os.getenv("SUPABASE_JWT_SECRET"),
algorithm="HS256"
)
return {
"access_token": new_token,
"token_type": "bearer",
"expires_in": 86400
}
except Exception as e:
raise HTTPException(status_code=401, detail="Failed to refresh token")
Security Best Practices
Use strong JWT secrets
Use strong JWT secrets
Copy
# Generate strong secret
openssl rand -base64 64
# Add to .env
SUPABASE_JWT_SECRET=your-very-long-random-string
Set appropriate expiration
Set appropriate expiration
- Access tokens: 24 hours (balances security and UX)
- Refresh tokens: 7 days
- Session tokens: 30 days for “remember me”
Validate all claims
Validate all claims
Copy
# Check required claims exist
if not payload.get("sub"):
raise HTTPException(status_code=401, detail="Invalid token structure")
# Verify expiration
if datetime.fromtimestamp(payload["exp"]) < datetime.now():
raise HTTPException(status_code=401, detail="Token expired")
Use HTTPS in production
Use HTTPS in production
Tokens are sensitive and must be transmitted over HTTPS only.
Implement token rotation
Implement token rotation
Issue new token on each refresh and invalidate old ones.
Error Handling
Custom Exception Handlers
Copy
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""
Custom handler for HTTP exceptions
"""
if exc.status_code in [401, 403]:
# Log auth errors simply
print(f"Auth error: {exc.status_code} - {exc.detail}")
else:
# Log other errors with more detail
print(f"HTTP {exc.status_code}: {exc.detail}")
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail}
)
Configuration
Environment Variables
Copy
# JWT Configuration
SUPABASE_JWT_SECRET=your-jwt-secret-here
TOKEN_EXPIRATION_HOURS=24
# Supabase Auth
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_KEY=your-service-role-key
Middleware Order
Copy
# Middleware is applied in reverse order
app.add_middleware(CORSMiddleware) # Applied last (outer)
app.add_middleware(JWTAuthMiddleware) # Applied first (inner)
# Request flow:
# 1. CORS checks origin
# 2. JWT validates token
# 3. Route handler executes
Testing
Test with cURL
Copy
# Get token
TOKEN=$(curl -X POST \
-H "Content-Type: application/json" \
-d '{"email":"user@example.com","password":"password"}' \
http://localhost:8000/auth/login | jq -r '.access_token')
# Use token
curl -H "Authorization: Bearer $TOKEN" \
http://localhost:8000/api/companies
Test with Python
Copy
import requests
# Login
response = requests.post(
'http://localhost:8000/auth/login',
json={
'email': 'user@example.com',
'password': 'password'
}
)
token = response.json()['access_token']
# Use token
companies = requests.get(
'http://localhost:8000/api/companies',
headers={'Authorization': f'Bearer {token}'}
).json()
Troubleshooting
401 Unauthorized
401 Unauthorized
Causes:
- Token expired (> 24 hours)
- Invalid token signature
- Missing Authorization header
- Incorrect token format
- Check token expiration
- Verify SUPABASE_JWT_SECRET matches
- Ensure header format:
Authorization: Bearer {token} - Try refreshing token
CORS errors
CORS errors
Cause: Origin not allowedSolution:
Copy
# Add origin to CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_credentials=True
)
