Beyond the Prototype
It’s easy to build an AI wrapper in a weekend. You throw together a Streamlit frontend, import the OpenAI SDK, add an API key to an environment variable, and you have a working prototype.
But what happens when you decide to take it to production?
Suddenly, you need to handle asynchronous token streaming so users aren't staring at a loading spinner for 10 seconds. You need persistent conversation histories. You need Retrieval-Augmented Generation (RAG) that scales beyond a local JSON file. You need connection pooling and rate limiting.
In this post, we'll architect a production-grade backend for an AI application using FastAPI, PostgreSQL (with pgvector), and Server-Sent Events (SSE) for streaming.
Architecture Overview
Our stack looks like this:
- FastAPI (Python): Provides high-performance async routing and straightforward SSE streaming.
- PostgreSQL + pgvector: Acts as both our primary relational database (users, chats, messages) and our vector database (document embeddings). Consolidating these saves massive operational overhead.
- Async SQLAlchemy: For non-blocking database queries.
- LiteLLM: A lightweight abstraction layer allowing us to seamlessly swap between OpenAI, Anthropic, or local models.
Setting up PostgreSQL and pgvector
Using PostgreSQL for vector similarity search is a superpower. You don’t need to manage a separate vector database cluster. We just need to enable the extension:
-- Step 1: Enable the extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Step 2: Create our documents table
CREATE TABLE document_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(), -- built into Postgres 13+
    document_id UUID NOT NULL,
    content TEXT NOT NULL,
    metadata JSONB DEFAULT '{}',
    embedding vector(1536) -- dimension matches OpenAI's text-embedding-3-small
);

-- Step 3: Create an HNSW index for fast Approximate Nearest Neighbor search
CREATE INDEX ON document_chunks USING hnsw (embedding vector_cosine_ops);
Using an HNSW (Hierarchical Navigable Small World) index is critical here. Without it, exact nearest-neighbor search requires a full table scan, computing the distance against every single row, which grinds your database to a halt at a million rows. HNSW trades exact results for approximate ones, keeping query times in the low-millisecond range.
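To make concrete what that per-row cost is, here is what the <=> operator computes, sketched in plain Python (pgvector implements this in C; during a sequential scan it runs once for every row):

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """What pgvector's <=> operator returns: 1 minus cosine similarity."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)

# Vectors pointing the same way have distance 0; orthogonal vectors have distance 1
print(cosine_distance([1.0, 0.0], [2.0, 0.0]))  # 0.0
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))  # 1.0
```

An HNSW index avoids evaluating this against every row by navigating a graph of neighbors, which is why it returns approximate rather than exact results.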
Implementing the RAG Pipeline
Before generating an answer, we need to find relevant context. Using async SQLAlchemy, querying our database using cosine distance looks like this:
from sqlalchemy import text

from core.database import get_session

async def get_relevant_context(query_embedding: list[float], limit: int = 5) -> str:
    async with get_session() as session:
        # pgvector's <=> operator computes cosine distance
        stmt = text("""
            SELECT content, metadata
            FROM document_chunks
            ORDER BY embedding <=> :embedding::vector
            LIMIT :limit
        """).bindparams(
            # Bind the embedding as pgvector's bracketed text literal,
            # e.g. '[0.1, 0.2, ...]'; the ::vector cast parses it server-side
            embedding=str(query_embedding),
            limit=limit,
        )
        result = await session.execute(stmt)
        chunks = result.fetchall()

    # Combine chunks into a readable context string
    context = "\n\n---\n\n".join(row.content for row in chunks)
    return context
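One subtlety worth calling out: asyncpg cannot serialize a plain Python list as a Postgres vector unless you register pgvector's codec, so it is typically safest to bind the embedding as a string in pgvector's bracketed text format and cast it server-side. A minimal serializer sketch (the function name is ours; str(list) happens to produce an accepted literal too):

```python
def to_pgvector_literal(embedding: list[float]) -> str:
    # pgvector parses the same text format it prints: '[0.25,-1.0,3.5]'
    # repr() keeps full float precision, unlike '%g'-style formatting
    return "[" + ",".join(repr(x) for x in embedding) + "]"

print(to_pgvector_literal([0.25, -1.0, 3.5]))  # [0.25,-1.0,3.5]
```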
Bridging FastAPI and LLM Streaming
The most critical part of an AI UX is Time-To-First-Token (TTFT). You cannot wait for the LLM to finish generating before returning an HTTP response. We must stream tokens as they arrive over a single persistent HTTP connection using Server-Sent Events (SSE).
FastAPI makes this elegant via StreamingResponse. We'll write an async generator that yields chunks.
import json

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from litellm import acompletion

app = FastAPI()

async def generate_chat_stream(messages: list[dict], context: str):
    # System prompt injecting the RAG context
    system_message = {
        "role": "system",
        "content": f"You are a helpful assistant. Answer questions based on this context:\n{context}",
    }
    full_messages = [system_message] + messages

    # litellm returns an async generator when stream=True
    response = await acompletion(
        model="claude-3-5-sonnet-20241022",
        messages=full_messages,
        stream=True,
    )

    full_response_text = ""
    # Iterate over the async stream
    async for chunk in response:
        delta = chunk.choices[0].delta.content
        if delta:
            full_response_text += delta
            # Format according to the SSE text/event-stream spec
            yield f"data: {json.dumps({'text': delta})}\n\n"

    yield "data: [DONE]\n\n"

    # Post-processing (save to the DB here without blocking the stream)
    # await save_message_to_db(messages[-1], full_response_text)

@app.post("/api/chat")
async def chat_endpoint(request: Request):
    body = await request.json()
    user_message = body.get("message")

    # 1. Embed the query
    query_embed = await generate_embedding(user_message)

    # 2. Retrieve relevant chunks
    context = await get_relevant_context(query_embed)

    # 3. Stream the response
    return StreamingResponse(
        generate_chat_stream([{"role": "user", "content": user_message}], context),
        media_type="text/event-stream",
    )
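On the consuming side, each SSE event is a data: line terminated by a blank line. A browser frontend would typically read these with the fetch streaming API; as a sketch of the wire format, here is a hypothetical parser (the function name is ours) for a fully buffered stream:

```python
import json

def parse_sse_events(raw: str) -> list[str]:
    """Extract the streamed text fragments from a raw SSE payload."""
    texts = []
    for line in raw.splitlines():
        if line.startswith("data: "):
            payload = line[len("data: "):]
            if payload == "[DONE]":  # our endpoint's end-of-stream sentinel
                break
            texts.append(json.loads(payload)["text"])
    return texts

raw = 'data: {"text": "Hel"}\n\ndata: {"text": "lo"}\n\ndata: [DONE]\n\n'
print(parse_sse_events(raw))  # ['Hel', 'lo']
```

Joining the fragments in order reconstructs the full assistant message.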
Preventing DB Connection Exhaustion
A major failure point for AI backends is connection limits. Postgres defaults to 100 max connections. If 100 users each hold a database connection open for the duration of a 15-second generation just to save the final message, your API goes down for the 101st user.
The solution is decoupling generation from DB sessions.
Notice that our stream generator does not hold an open async with session block while yielding to the client. The session should be opened only briefly before the stream to grab context, and again briefly after the stream concludes to persist the chat history.
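The effect of this decoupling can be demonstrated with a self-contained sketch. A semaphore stands in for the connection pool, and the slow generation happens with no connection checked out (all names here are illustrative, not from the code above):

```python
import asyncio
from contextlib import asynccontextmanager

POOL_SIZE = 5
_pool = asyncio.Semaphore(POOL_SIZE)
checked_out = 0
max_checked_out = 0

@asynccontextmanager
async def get_session():
    # Stand-in for checking a connection out of an async SQLAlchemy pool
    global checked_out, max_checked_out
    await _pool.acquire()
    checked_out += 1
    max_checked_out = max(max_checked_out, checked_out)
    try:
        yield "session"
    finally:
        checked_out -= 1
        _pool.release()

async def handle_chat():
    async with get_session():
        pass  # fetch RAG context, then release immediately
    await asyncio.sleep(0.05)  # simulate the long LLM stream: no session held
    async with get_session():
        pass  # persist the finished message, then release

async def main():
    # 50 concurrent chats comfortably share a 5-connection pool
    await asyncio.gather(*(handle_chat() for _ in range(50)))
    return max_checked_out

peak = asyncio.run(main())
print(peak)  # never exceeds POOL_SIZE
```

Hold the session across the whole generation instead, and the same 50 chats would queue behind 5 connections for the full duration of every stream.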
The Payoff
This setup scales beautifully.
- Fast TTFT because the web framework handles async I/O elegantly.
- No massive operational overhead from managing detached vector databases alongside your relational data.
- LiteLLM gives you resilience (automatic fallbacks and retries) out of the box.
Building scalable AI isn't about using the flashiest new vector startup; it's about mature software engineering principles applied to probabilistic data. If you leverage Postgres and async Python effectively, you can scale to millions of requests a month on a single $20 VPS.