Building with AI – A Developer's Diary
Starlette Day 5 — Background Tasks, WebSockets, and Async HTTP
Add post-response background work, async outbound HTTP calls with HTTPX, and live WebSocket updates — the patterns that move a backend from handling requests to doing real work.
After four days you have a real backend: structured routes, a persistent database, authentication, and logging middleware. The one thing it still does synchronously is everything. A client makes a request, waits, gets a response. Today you break that model. You'll add background tasks that run after the response is sent, async outbound HTTP calls using HTTPX, and WebSocket support so connected clients get live updates. These are the three features that move a backend from "handles requests" to "does work."
All three are first-class features in Starlette—no extra framework required. You already have everything you need installed except HTTPX.
Before you start — activate your virtual environment
```shell
cd starlette-day1
source venv/bin/activate   # Mac / Linux
# venv\Scripts\activate    # Windows
```
Then install the two new packages for today:
```shell
pip install httpx "uvicorn[standard]"
```
HTTPX is an async-capable HTTP client. Its docs recommend using the async client when working inside an async web framework—which Starlette is. That's why it fits here rather than the requests library, which is synchronous and would block the event loop on every outbound call.
uvicorn[standard] upgrades the Uvicorn installation you've been using since Day 1 with full WebSocket support via the websockets library. The base uvicorn package you installed initially is minimal by design—it deliberately omits WebSocket support to keep the dependency footprint small. Without this upgrade, Uvicorn cannot perform the HTTP → WebSocket upgrade handshake, and any request to a WebSocket endpoint falls through as a plain HTTP GET—returning a 404 even though the route is correctly registered.
What you're building today
The complete request flow for a new task will be:
```
Client sends POST /tasks
        ↓
Auth middleware verifies token
        ↓
Task saved to SQLite
        ↓
API returns 201 immediately   ← client is done waiting
        ↓
Background task starts
        ↓
Async outbound HTTP call simulates external sync
        ↓
WebSocket clients receive live update
```
The key line is "client is done waiting." Starlette background tasks are specifically designed for work that should happen after the response is sent. Your database write is fast and reliable. The external sync may be slow or fail. Keeping them separate means neither one holds up the other.
Update the project structure
Add a WebSocket connection manager and a sync service:
```shell
mkdir app/ws
touch app/ws/__init__.py
touch app/ws/manager.py
touch app/routes/ws.py
touch app/services/sync_service.py
```
Your full structure now looks like this:
```
starlette-day1/
├── venv/
├── alembic/
├── alembic.ini
├── app.db
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── database.py
│   ├── auth/
│   ├── middleware/
│   ├── models/
│   ├── ws/                  ← new
│   │   ├── __init__.py
│   │   └── manager.py
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── tasks.py
│   │   └── ws.py            ← new
│   └── services/
│       ├── __init__.py
│       ├── task_service.py
│       └── sync_service.py  ← new
```
Build the WebSocket connection manager
Create app/ws/manager.py. This module maintains a list of active WebSocket connections and provides a way to broadcast messages to all of them:
```python
from starlette.websockets import WebSocket


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast_json(self, data: dict):
        disconnected = []
        for connection in self.active_connections:
            try:
                await connection.send_json(data)
            except Exception:
                disconnected.append(connection)
        for connection in disconnected:
            self.disconnect(connection)


manager = ConnectionManager()
```
The module-level manager instance is intentional. Because it's a module-level singleton, every part of the app that imports it gets the same object with the same connection list. When the sync service broadcasts a message, it reaches every connected WebSocket client regardless of where in the codebase the broadcast is called from.
The broadcast_json method collects any failed sends rather than raising immediately. A broken connection shouldn't stop the broadcast from reaching other connected clients. Disconnected connections are removed after the loop finishes to avoid mutating the list while iterating over it.
Create the WebSocket route
Create app/routes/ws.py:
```python
from starlette.routing import WebSocketRoute
from starlette.websockets import WebSocket, WebSocketDisconnect

from app.ws.manager import manager


async def task_updates_socket(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_json({
                "type": "echo",
                "message": message,
            })
    except WebSocketDisconnect:
        manager.disconnect(websocket)


routes = [
    WebSocketRoute("/ws/tasks", task_updates_socket),
]
```
WebSockets have a different lifecycle than HTTP routes. An HTTP handler runs once and returns a response. A WebSocket handler runs in a loop—it calls receive_text() (or receive_json()) which blocks until the client sends a message, then handles it, then loops back to wait. When the client disconnects, Starlette raises WebSocketDisconnect, which you catch to clean up.
The echo behavior here (send back whatever the client sends) is mostly for testing the connection. The more interesting direction is server-to-client: the sync service broadcasting task updates to connected clients after a background job completes.
Create the sync service
Create app/services/sync_service.py. This function accepts a task dictionary, makes an outbound async HTTP call simulating an external sync, and broadcasts the result to WebSocket clients:
```python
import httpx

from app.ws.manager import manager


async def sync_task_to_external_service(task: dict):
    payload = {
        "task_id": task["id"],
        "title": task["title"],
        "source": "starlette-demo",
    }
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.post(
                "https://httpbin.org/post",
                json=payload,
            )
            response.raise_for_status()
        await manager.broadcast_json({
            "type": "task_synced",
            "task": task,
        })
        return {"ok": True, "detail": "Task synced successfully"}
    except httpx.HTTPError as exc:
        await manager.broadcast_json({
            "type": "task_sync_failed",
            "task": task,
            "error": str(exc),
        })
        return {"ok": False, "detail": str(exc)}
```
A few things worth understanding here:
- `httpx.AsyncClient(timeout=5.0)` — HTTPX applies a default timeout, but setting it explicitly is clearer. Five seconds is reasonable for a background sync; you don't want a slow external service holding your background worker indefinitely.
- `response.raise_for_status()` — turns 4xx and 5xx responses into exceptions. Without this, a 500 from the external service would look like success.
- `except httpx.HTTPError` — catches both network errors (connection refused, timeout) and HTTP errors raised by `raise_for_status()`. The sync failure is broadcast to WebSocket clients so a connected frontend can surface it without the request caller ever knowing.
- The function accepts a plain `dict`, not a SQLAlchemy model object. This matters — covered in the next section.
The target URL is httpbin.org/post, a free public endpoint that accepts any POST body and echoes it back. It's a convenient stand-in for a real external API while you're learning.
Update the POST route with a background task
Replace app/routes/tasks.py with this version. The key change is in create_task—it attaches a BackgroundTask to the response:
```python
from starlette.background import BackgroundTask
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from app.database import SessionLocal
from app.services import task_service
from app.services.sync_service import sync_task_to_external_service


def require_authentication(request):
    if not request.user.is_authenticated:
        return JSONResponse({"error": "Authentication required"}, status_code=401)
    return None


async def health_check(request):
    return JSONResponse({"message": "API is running"})


async def get_tasks(request):
    auth_error = require_authentication(request)
    if auth_error:
        return auth_error
    with SessionLocal() as db:
        tasks = task_service.get_all_tasks(db)
    return JSONResponse({
        "user": request.user.display_name,
        "items": tasks,
    })


async def get_task(request):
    auth_error = require_authentication(request)
    if auth_error:
        return auth_error
    task_id = int(request.path_params["id"])
    with SessionLocal() as db:
        task = task_service.get_task(db, task_id)
    if not task:
        return JSONResponse({"error": "Task not found"}, status_code=404)
    return JSONResponse(task)


async def create_task(request: Request):
    auth_error = require_authentication(request)
    if auth_error:
        return auth_error
    data = await request.json()
    title = data.get("title")
    if not title:
        return JSONResponse({"error": "Title is required"}, status_code=400)
    with SessionLocal() as db:
        task = task_service.create_task(db, title)
    background = BackgroundTask(sync_task_to_external_service, task)
    return JSONResponse(
        {
            "message": "Task created",
            "task": task,
            "sync": "scheduled",
        },
        status_code=201,
        background=background,
    )


async def update_task(request: Request):
    auth_error = require_authentication(request)
    if auth_error:
        return auth_error
    task_id = int(request.path_params["id"])
    data = await request.json()
    title = data.get("title")
    if not title:
        return JSONResponse({"error": "Title is required"}, status_code=400)
    with SessionLocal() as db:
        task = task_service.update_task(db, task_id, title)
    if not task:
        return JSONResponse({"error": "Task not found"}, status_code=404)
    return JSONResponse(task)


async def delete_task(request):
    auth_error = require_authentication(request)
    if auth_error:
        return auth_error
    task_id = int(request.path_params["id"])
    with SessionLocal() as db:
        deleted = task_service.delete_task(db, task_id)
    if not deleted:
        return JSONResponse({"error": "Task not found"}, status_code=404)
    return JSONResponse({"message": "Deleted"})


routes = [
    Route("/", health_check),
    Route("/tasks", get_tasks),
    Route("/tasks", create_task, methods=["POST"]),
    Route("/tasks/{id:int}", get_task),
    Route("/tasks/{id:int}", update_task, methods=["PUT"]),
    Route("/tasks/{id:int}", delete_task, methods=["DELETE"]),
]
```
Notice that the with SessionLocal() as db: block in create_task closes before the background task is attached to the response. That's intentional and important.
task_service.create_task() returns a plain Python dictionary—the serialized task—not a SQLAlchemy model object. If you passed the model object itself to the background function, it would be detached from the closed session by the time the background task ran, and accessing any of its attributes would either return stale data or raise a DetachedInstanceError. Passing a plain dict avoids this entirely. Your service layer's serialize_task() function is what makes this clean—the route never holds a live ORM object past the end of the session block.
How `BackgroundTask` works: `BackgroundTask(func, arg)` schedules `func(arg)` to run after the response is fully sent to the client. You attach it to the `JSONResponse` via the `background` argument. Starlette handles the rest—the client receives the 201 immediately, then the background function runs.
Wire the WebSocket route into main.py
Update app/main.py to include both route lists. Python list concatenation with + works here because both task_routes and ws_routes are plain lists:
```python
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse

from app.auth.backend import BasicTokenAuthBackend
from app.middleware.logging import LoggingMiddleware
from app.routes.tasks import routes as task_routes
from app.routes.ws import routes as ws_routes


def on_auth_error(request, exc):
    return JSONResponse({"error": str(exc)}, status_code=401)


middleware = [
    Middleware(
        CORSMiddleware,
        allow_origins=["http://localhost:3000", "http://127.0.0.1:3000"],
        allow_methods=["*"],
        allow_headers=["*"],
    ),
    Middleware(LoggingMiddleware),
    Middleware(
        AuthenticationMiddleware,
        backend=BasicTokenAuthBackend(),
        on_error=on_auth_error,
    ),
]

app = Starlette(
    debug=True,
    routes=task_routes + ws_routes,
    middleware=middleware,
)
```
Confirm the task service
Since you added the status field on Day 3, your app/services/task_service.py should already include it in the serializer. Confirm your file looks like this—pay particular attention to create_task passing status="open" explicitly and serialize_task including the field:
```python
from sqlalchemy.orm import Session

from app.models.task import Task


def serialize_task(task: Task) -> dict:
    return {
        "id": task.id,
        "title": task.title,
        "status": task.status,
    }


def get_all_tasks(db: Session) -> list[dict]:
    tasks = db.query(Task).order_by(Task.id.asc()).all()
    return [serialize_task(task) for task in tasks]


def get_task(db: Session, task_id: int) -> dict | None:
    task = db.query(Task).filter(Task.id == task_id).first()
    if not task:
        return None
    return serialize_task(task)


def create_task(db: Session, title: str) -> dict:
    task = Task(title=title, status="open")
    db.add(task)
    db.commit()
    db.refresh(task)
    return serialize_task(task)


def update_task(db: Session, task_id: int, title: str) -> dict | None:
    task = db.query(Task).filter(Task.id == task_id).first()
    if not task:
        return None
    task.title = title
    db.commit()
    db.refresh(task)
    return serialize_task(task)


def delete_task(db: Session, task_id: int) -> bool:
    task = db.query(Task).filter(Task.id == task_id).first()
    if not task:
        return False
    db.delete(task)
    db.commit()
    return True
```
The explicit status="open" in Task(title=title, status="open") is technically redundant—the model column has server_default="open" at the database level. Being explicit here makes the intent clear in code and means the field is populated on the Python object before db.commit(), so serialize_task sees it without needing the db.refresh() call to load it from the DB.
Run the app
```shell
uvicorn app.main:app --reload
```
Test the HTTP and background flow
Create a task with auth:
```shell
curl -X POST http://127.0.0.1:8000/tasks \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer dev-secret-token" \
  -d '{"title": "Task with background sync"}'
```
You should get an immediate 201 response:
```json
{
  "message": "Task created",
  "task": {
    "id": 1,
    "title": "Task with background sync",
    "status": "open"
  },
  "sync": "scheduled"
}
```
Look at your Uvicorn terminal after the response. A moment later you'll see the logging middleware output for the request, and if echo=True is still on in database.py you'll see the INSERT statement. The background sync runs after all of that—the outbound call to httpbin.org happens entirely after the client got its response.
Test the WebSocket connection
You have a few options for testing WebSockets from the terminal. The most straightforward is websocat, a command-line WebSocket client. Install it with Homebrew on Mac:
```shell
brew install websocat
```
One troubleshooting note before you connect: if the WebSocket endpoint responds with a 404 instead of upgrading, the most likely cause is that uvicorn[standard] wasn't installed (or the install ran outside the virtual environment). Without the WebSocket library, Uvicorn cannot perform the HTTP → WebSocket upgrade handshake—it falls through and treats the request as a plain HTTP GET, which has no matching route, so it returns a 404. Stop the server, run pip install "uvicorn[standard]" with your venv active, and restart.
Then connect to your WebSocket endpoint in one terminal window:
```shell
websocat ws://127.0.0.1:8000/ws/tasks
```
The connection opens and waits. Type any message and press Enter—you'll see the echo response come back. Then, in a second terminal window, create a task via the API:
```shell
curl -X POST http://127.0.0.1:8000/tasks \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer dev-secret-token" \
  -d '{"title": "WebSocket test task"}'
```
After the 201 response is sent, the background sync runs. When it completes (or fails), your WebSocket terminal receives a broadcast:
```json
{"type": "task_synced", "task": {"id": 2, "title": "WebSocket test task", "status": "open"}}
```
If you prefer to test from a browser, open the browser console on any page and run:
```javascript
const socket = new WebSocket("ws://127.0.0.1:8000/ws/tasks");
socket.onopen = () => console.log("connected");
socket.onmessage = (event) => console.log("received:", event.data);
```
Create another task via curl and watch the console. The message appears after the background sync completes, not when the task is created—that's the point.
Why this pattern matters
The "write fast, respond fast, sync after" pattern is one of the most common real-world backend architectures:
- Your database is fast and under your control. Save to it immediately.
- External services may be slow, unreliable, or rate-limited. Don't make the client wait on them.
- If the sync fails, the record is already in your database. You can retry later. If you did the sync synchronously and it timed out, you'd have a failed request and a frustrated user.
This pattern scales naturally to webhooks, push notifications, email delivery, third-party integrations, and analytics events—anything where you need to "do something else" after saving, without the client waiting on it.
An important caveat about in-process background tasks
Starlette's BackgroundTask runs inside the same process as your web server. That means:
- If the server process restarts or crashes while a background task is running, that task is lost.
- There's no retry mechanism—a failed sync stays failed until something else triggers it again.
- Tasks don't survive a deploy.
For the sync pattern you built today—a best-effort notification to an external service—this is fine. For anything that absolutely must complete (payment processing, critical notifications, audit logging), you'd use a proper job queue with durable storage and retry logic, such as Celery with Redis or a cloud-native queue service.
The Day 5 approach is the right concept to learn first. The queue-backed version uses the same mental model; it just adds durability guarantees around it.
Common mistakes
Doing slow work before returning the response. If the sync call happens before JSONResponse() is returned, the client waits for the full round trip to httpbin.org. The entire point of BackgroundTask is that the work happens after. If your API feels slow, check that your background work isn't accidentally inline.
Passing a SQLAlchemy model object to a background task. The session is closed when the with SessionLocal() block exits. Any model object from that session is detached—accessing its attributes after the session closes raises DetachedInstanceError. Always serialize to a plain dict before the session closes, then pass the dict to the background function.
Using a synchronous HTTP library inside async code. The requests library blocks the event loop. Inside an async Starlette app, use httpx.AsyncClient with await for any outbound calls. The event loop stays responsive while the outbound call is in flight.
Treating WebSocket handlers like HTTP handlers. WebSocket handlers run in a loop. They don't return a response—they stay open until the client disconnects. If you forget the while True: loop, the handler will run once and close the connection immediately.
Treating in-process background tasks like a durable job queue. They're lightweight and convenient, but they carry no durability guarantees. Know the difference before putting critical work behind them.
What you actually learned
- Background tasks in Starlette run after the response is sent. Attach one to a `JSONResponse` using the `background` argument. The client is unblocked; the work happens asynchronously.
- The HTTPX async client makes outbound HTTP calls without blocking the event loop. Use the context manager form (`async with httpx.AsyncClient() as client`) for clean resource handling. `raise_for_status()` and `except httpx.HTTPError` give you a solid failure-handling baseline.
- WebSocket routes use `WebSocketRoute` and follow an accept → loop → disconnect lifecycle. Server-to-client broadcasting through a shared connection manager is the pattern for pushing live updates.
- Pass plain dicts, not ORM objects, to background functions. The session is closed before the background task runs. Serialized dicts are safe to use anywhere; detached model objects are not.
What you've built across all five days
Looking back at the full series:
- Day 1 — Starlette basics: routes, JSON responses, POST handling, async fundamentals
- Day 2 — Project structure: routes, services, and a clean entry point that scales
- Day 3 — Real persistence: SQLAlchemy models, SQLite, Alembic migrations
- Day 4 — Auth and middleware: token auth backend, CORS, request logging
- Day 5 — Async patterns: background tasks, outbound HTTP, WebSocket updates
Each layer was added to something that already worked. The structure you set up on Day 2 made Day 3 easy. The service/route separation made Day 5's background task pattern clean. That's the real lesson of the series: early decisions about structure compound over time.