Chapter 38: The Capstone Project — Build a Production Task Manager API
You've traveled a long way. Variables and loops in Chapter 1. Now metaclasses, async programming, type hints, databases, and deployment. This final chapter ties everything together.
We're building TaskFlow — a production-quality task management REST API. Not a toy. A real application you could deploy today.
Here's what it includes:
- FastAPI REST API with full CRUD
- SQLAlchemy ORM with Alembic migrations
- JWT authentication with bcrypt password hashing
- Background task processing with a simple job queue
- Structured logging with rotation
- pytest test suite with 90%+ coverage
- Docker containerization
- GitHub Actions CI/CD pipeline
Every technique from this book appears in this project. Let's build it.
Project Structure
```text
taskflow/
├── src/
│   └── taskflow/
│       ├── __init__.py
│       ├── main.py            <- FastAPI app entry point
│       ├── config.py          <- Settings from environment
│       ├── database.py        <- SQLAlchemy setup
│       ├── logging_config.py  <- Structured logging
│       ├── models/
│       │   ├── __init__.py
│       │   ├── user.py
│       │   └── task.py
│       ├── schemas/
│       │   ├── __init__.py
│       │   ├── user.py
│       │   └── task.py
│       ├── routers/
│       │   ├── __init__.py
│       │   ├── auth.py
│       │   ├── users.py
│       │   └── tasks.py
│       ├── services/
│       │   ├── __init__.py
│       │   ├── auth_service.py
│       │   ├── task_service.py
│       │   └── queue.py
│       └── dependencies.py
├── tests/
│   ├── conftest.py
│   ├── test_auth.py
│   ├── test_tasks.py
│   └── test_users.py
├── alembic/
│   ├── env.py
│   └── versions/
├── pyproject.toml
├── .env.example
├── Dockerfile
├── docker-compose.yml
└── .github/
    └── workflows/
        └── ci.yml
```
Configuration
```python
# src/taskflow/config.py
from __future__ import annotations

from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # Application
    app_name: str = "TaskFlow"
    app_version: str = "1.0.0"
    debug: bool = False

    # Database
    database_url: str = "sqlite:///./taskflow.db"

    # Security
    secret_key: str = "change-me-in-production-use-a-long-random-string"
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # Logging
    log_level: str = "INFO"
    log_file: str = "logs/taskflow.log"

    # Queue
    max_workers: int = 4

    def is_production(self) -> bool:
        return not self.debug

    def validate_secrets(self) -> None:
        if self.is_production() and self.secret_key == "change-me-in-production-use-a-long-random-string":
            raise RuntimeError("SECRET_KEY must be changed in production.")


@lru_cache
def get_settings() -> Settings:
    """Return cached settings — called once, reused everywhere."""
    return Settings()
```
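The caching trick behind `get_settings` is worth seeing in isolation: `lru_cache` on a zero-argument factory turns it into a lazy singleton. A minimal standalone sketch (the `get_config` function here is hypothetical, standing in for `get_settings`):

```python
from functools import lru_cache


@lru_cache
def get_config() -> dict:
    print("loading config...")  # runs only once, on the first call
    return {"app_name": "TaskFlow"}


a = get_config()  # prints "loading config..."
b = get_config()  # cache hit: no print, no re-parsing of .env
assert a is b     # same object every call
```

Every dependency and module that calls `get_settings()` therefore shares one `Settings` instance, and the `.env` file is read exactly once per process.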
Logging
```python
# src/taskflow/logging_config.py
import logging
import logging.handlers
import sys
from pathlib import Path

from taskflow.config import get_settings


def setup_logging() -> None:
    settings = get_settings()
    log_path = Path(settings.log_file)
    log_path.parent.mkdir(parents=True, exist_ok=True)

    fmt = "%(asctime)s %(levelname)-8s %(name)-30s %(message)s"
    handlers: list[logging.Handler] = [
        logging.StreamHandler(sys.stdout),
        logging.handlers.RotatingFileHandler(
            log_path,
            maxBytes=10 * 1024 * 1024,  # 10 MB
            backupCount=5,
            encoding="utf-8",
        ),
    ]
    logging.basicConfig(
        level=getattr(logging, settings.log_level.upper()),
        format=fmt,
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=handlers,
        force=True,
    )
    # Reduce noise from libraries
    for noisy in ("uvicorn.access", "sqlalchemy.engine", "passlib"):
        logging.getLogger(noisy).setLevel(logging.WARNING)


log = logging.getLogger(__name__)
```
Database
```python
# src/taskflow/database.py
from collections.abc import Iterator

from sqlalchemy import create_engine, event
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker

from taskflow.config import get_settings

settings = get_settings()

connect_args = {}
if settings.database_url.startswith("sqlite"):
    connect_args["check_same_thread"] = False

engine = create_engine(
    settings.database_url,
    connect_args=connect_args,
    pool_pre_ping=True,  # detect stale connections
    echo=settings.debug,
)

# Enable WAL mode for SQLite (better concurrent reads)
if settings.database_url.startswith("sqlite"):
    @event.listens_for(engine, "connect")
    def set_sqlite_pragma(dbapi_conn, _):
        cursor = dbapi_conn.cursor()
        cursor.execute("PRAGMA journal_mode=WAL")
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()

# Note: SQLAlchemy 2.0 removed the legacy autocommit mode, so we only
# configure autoflush here.
SessionLocal = sessionmaker(bind=engine, autoflush=False)


class Base(DeclarativeBase):
    pass


def get_db() -> Iterator[Session]:
    """FastAPI dependency — yields a database session."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```
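The yield-based dependency deserves a closer look: FastAPI drives the generator, advancing it to get the session and resuming it after the response, so the `finally` block always runs. A standalone sketch with a hypothetical `FakeSession` (not taskflow code):

```python
class FakeSession:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def get_session():
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()  # guaranteed to run, even if the request raised


gen = get_session()
db = next(gen)              # dependency resolution: hand the session to the route
assert db.closed is False   # open while the request runs
gen.close()                 # after the response: resume -> finally -> close()
assert db.closed is True
```

This is the same guarantee a context manager gives you, which is why FastAPI's docs call these "dependencies with yield".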
Models
```python
# src/taskflow/models/user.py
from __future__ import annotations

from datetime import datetime

from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.orm import relationship

from taskflow.database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(200), unique=True, nullable=False, index=True)
    hashed_password = Column(String(200), nullable=False)
    full_name = Column(String(200), default="")
    is_active = Column(Boolean, default=True)
    is_admin = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    tasks = relationship("Task", back_populates="owner",
                         cascade="all, delete-orphan")

    def __repr__(self) -> str:
        return f"User(id={self.id}, username={self.username!r})"
```
```python
# src/taskflow/models/task.py
from __future__ import annotations

import enum
from datetime import datetime

from sqlalchemy import Column, DateTime, Enum, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship

from taskflow.database import Base


class Priority(str, enum.Enum):
    low = "low"
    medium = "medium"
    high = "high"
    urgent = "urgent"


class Status(str, enum.Enum):
    todo = "todo"
    in_progress = "in_progress"
    done = "done"
    cancelled = "cancelled"


class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, default="")
    priority = Column(Enum(Priority), default=Priority.medium, nullable=False)
    status = Column(Enum(Status), default=Status.todo, nullable=False)
    due_date = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    owner = relationship("User", back_populates="tasks")

    def __repr__(self) -> str:
        return f"Task(id={self.id}, title={self.title!r}, status={self.status})"
```
Schemas (Pydantic)
```python
# src/taskflow/schemas/user.py
from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel, EmailStr, Field, field_validator


class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50,
                          pattern=r"^[a-zA-Z0-9_]+$")
    email: EmailStr
    password: str = Field(..., min_length=8)
    full_name: str = Field("", max_length=200)

    @field_validator("password")
    @classmethod
    def password_strength(cls, v: str) -> str:
        if not any(c.isupper() for c in v):
            raise ValueError("Password must contain an uppercase letter.")
        if not any(c.isdigit() for c in v):
            raise ValueError("Password must contain a digit.")
        return v


class UserUpdate(BaseModel):
    full_name: str | None = Field(None, max_length=200)
    email: EmailStr | None = None


class UserOut(BaseModel):
    id: int
    username: str
    email: str
    full_name: str
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class UserOutWithStats(UserOut):
    task_count: int = 0
```
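The validator logic itself is plain Python, which makes it easy to reason about on its own. A sketch with a hypothetical `check_password` helper mirroring the same checks (this helper is illustrative only, not part of TaskFlow):

```python
def check_password(v: str) -> list[str]:
    """Return a list of problems; empty list means the password passes."""
    problems: list[str] = []
    if len(v) < 8:
        problems.append("too short")
    if not any(c.isupper() for c in v):
        problems.append("needs an uppercase letter")
    if not any(c.isdigit() for c in v):
        problems.append("needs a digit")
    return problems


assert check_password("Password1") == []
assert check_password("weak") == [
    "too short", "needs an uppercase letter", "needs a digit"
]
```

Pydantic wires the same idea into validation: `password_strength` raises `ValueError`, and FastAPI turns that into a 422 response automatically.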
```python
# src/taskflow/schemas/task.py
from __future__ import annotations

from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field

from taskflow.models.task import Priority, Status


class TaskCreate(BaseModel):
    title: str = Field(..., min_length=1, max_length=200)
    description: str = Field("", max_length=5000)
    priority: Priority = Priority.medium
    due_date: Optional[datetime] = None


class TaskUpdate(BaseModel):
    title: Optional[str] = Field(None, min_length=1, max_length=200)
    description: Optional[str] = Field(None, max_length=5000)
    priority: Optional[Priority] = None
    status: Optional[Status] = None
    due_date: Optional[datetime] = None


class TaskOut(BaseModel):
    id: int
    title: str
    description: str
    priority: Priority
    status: Status
    due_date: Optional[datetime]
    completed_at: Optional[datetime]
    owner_id: int
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}
```
Services
```python
# src/taskflow/services/auth_service.py
from __future__ import annotations

import logging
from datetime import datetime, timedelta

from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from taskflow.config import get_settings
from taskflow.models.user import User

log = logging.getLogger(__name__)
settings = get_settings()

pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_ctx.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_ctx.verify(plain, hashed)


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    payload = data.copy()
    expire = datetime.utcnow() + (
        expires_delta or timedelta(minutes=settings.access_token_expire_minutes)
    )
    payload["exp"] = expire
    return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)


def decode_token(token: str) -> dict:
    try:
        return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError as e:
        log.warning("Token decode failed: %s", e)
        raise


def authenticate_user(db: Session, username: str, password: str) -> User | None:
    user = db.query(User).filter(User.username == username).first()
    if not user or not verify_password(password, user.hashed_password):
        log.warning("Failed login attempt for username=%r", username)
        return None
    log.info("User authenticated: %s", username)
    return user


def create_user(db: Session, username: str, email: str,
                password: str, full_name: str = "") -> User:
    user = User(
        username=username,
        email=email,
        hashed_password=hash_password(password),
        full_name=full_name,
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    log.info("User created: %s (id=%d)", username, user.id)
    return user
```
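If you are curious what `jwt.encode` actually produces, here is a simplified, stdlib-only sketch of HS256 signing. It is illustrative only: keep using python-jose, which also handles claim validation, expiry checks, and secure comparison. The `sign_hs256` and `b64url` names are hypothetical helpers, not part of the jose API.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    # JWT uses URL-safe base64 with the '=' padding stripped
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def sign_hs256(payload: dict, secret: str) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode()
    sig = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{b64url(sig)}"


token = sign_hs256({"sub": "alice"}, "dev-secret")
assert token.count(".") == 2  # header.payload.signature
```

The server never stores the token; it just re-computes the HMAC on each request and compares. That is why `secret_key` must stay secret: anyone who has it can mint valid tokens.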
```python
# src/taskflow/services/task_service.py
from __future__ import annotations

import logging
from datetime import datetime
from typing import Optional

from sqlalchemy import func
from sqlalchemy.orm import Session

from taskflow.models.task import Status, Task
from taskflow.schemas.task import TaskCreate, TaskUpdate

log = logging.getLogger(__name__)


def create_task(db: Session, payload: TaskCreate, owner_id: int) -> Task:
    task = Task(**payload.model_dump(), owner_id=owner_id)
    db.add(task)
    db.commit()
    db.refresh(task)
    log.info("Task created: id=%d owner=%d", task.id, owner_id)
    return task


def get_task(db: Session, task_id: int, owner_id: int) -> Optional[Task]:
    return db.query(Task).filter(
        Task.id == task_id,
        Task.owner_id == owner_id,
    ).first()


def list_tasks(
    db: Session,
    owner_id: int,
    status: Optional[Status] = None,
    priority: Optional[str] = None,
    skip: int = 0,
    limit: int = 50,
) -> list[Task]:
    q = db.query(Task).filter(Task.owner_id == owner_id)
    if status:
        q = q.filter(Task.status == status)
    if priority:
        q = q.filter(Task.priority == priority)
    return q.order_by(Task.created_at.desc()).offset(skip).limit(limit).all()


def update_task(db: Session, task: Task, payload: TaskUpdate) -> Task:
    updates = payload.model_dump(exclude_unset=True)
    for field, value in updates.items():
        setattr(task, field, value)

    # Auto-set completed_at when status becomes "done"
    if updates.get("status") == Status.done and task.completed_at is None:
        task.completed_at = datetime.utcnow()
    elif updates.get("status") and updates["status"] != Status.done:
        task.completed_at = None

    task.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(task)
    log.info("Task updated: id=%d", task.id)
    return task


def delete_task(db: Session, task: Task) -> None:
    db.delete(task)
    db.commit()
    log.info("Task deleted: id=%d", task.id)


def get_stats(db: Session, owner_id: int) -> dict:
    rows = (
        db.query(Task.status, Task.priority, func.count(Task.id))
        .filter(Task.owner_id == owner_id)
        .group_by(Task.status, Task.priority)
        .all()
    )
    stats: dict = {"total": 0, "by_status": {}, "by_priority": {}}
    for status, priority, count in rows:
        stats["total"] += count
        stats["by_status"][status] = stats["by_status"].get(status, 0) + count
        stats["by_priority"][priority] = stats["by_priority"].get(priority, 0) + count
    return stats
```
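The aggregation in `get_stats` is independent of SQLAlchemy; the database just hands back `(status, priority, count)` rows, and plain Python folds them into nested dicts. The same loop on hard-coded tuples:

```python
# Each row is (status, priority, count), as returned by the GROUP BY query
rows = [("todo", "high", 2), ("todo", "low", 1), ("done", "high", 1)]

stats: dict = {"total": 0, "by_status": {}, "by_priority": {}}
for status, priority, count in rows:
    stats["total"] += count
    stats["by_status"][status] = stats["by_status"].get(status, 0) + count
    stats["by_priority"][priority] = stats["by_priority"].get(priority, 0) + count

assert stats["total"] == 4
assert stats["by_status"] == {"todo": 3, "done": 1}
assert stats["by_priority"] == {"high": 3, "low": 1}
```

Letting the database do the counting with `GROUP BY` keeps the Python side to a single small merge loop, instead of loading every task row into memory.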
Dependencies
```python
# src/taskflow/dependencies.py
from __future__ import annotations

import logging

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from sqlalchemy.orm import Session

from taskflow.database import get_db
from taskflow.models.user import User
from taskflow.services.auth_service import decode_token

log = logging.getLogger(__name__)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_token(token)
        username = payload.get("sub")
        if not username:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise credentials_exception
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user


def get_admin_user(current_user: User = Depends(get_current_user)) -> User:
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")
    return current_user
```
Routers
```python
# src/taskflow/routers/auth.py
from __future__ import annotations

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from taskflow.database import get_db
from taskflow.models.user import User
from taskflow.schemas.user import UserCreate, UserOut
from taskflow.services import auth_service

router = APIRouter(prefix="/auth", tags=["Authentication"])


@router.post("/register", response_model=UserOut, status_code=201)
def register(payload: UserCreate, db: Session = Depends(get_db)):
    """Register a new user account."""
    if db.query(User).filter_by(username=payload.username).first():
        raise HTTPException(status_code=400, detail="Username already taken")
    if db.query(User).filter_by(email=payload.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")
    return auth_service.create_user(
        db, payload.username, payload.email,
        payload.password, payload.full_name,
    )


@router.post("/token")
def login(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Obtain a JWT access token."""
    user = auth_service.authenticate_user(db, form.username, form.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = auth_service.create_access_token({"sub": user.username})
    return {"access_token": token, "token_type": "bearer"}
```
```python
# src/taskflow/routers/tasks.py
from __future__ import annotations

from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from taskflow.database import get_db
from taskflow.dependencies import get_current_user
from taskflow.models.task import Priority, Status
from taskflow.models.user import User
from taskflow.schemas.task import TaskCreate, TaskOut, TaskUpdate
from taskflow.services import task_service

router = APIRouter(prefix="/tasks", tags=["Tasks"])


@router.get("/", response_model=list[TaskOut])
def list_tasks(
    task_status: Optional[Status] = Query(None, alias="status"),
    priority: Optional[Priority] = None,
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List all tasks for the current user."""
    return task_service.list_tasks(
        db, current_user.id, task_status, priority, skip, limit
    )


@router.get("/stats")
def task_stats(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return task statistics for the current user."""
    return task_service.get_stats(db, current_user.id)


@router.get("/{task_id}", response_model=TaskOut)
def get_task(
    task_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Get a specific task by ID."""
    task = task_service.get_task(db, task_id, current_user.id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


@router.post("/", response_model=TaskOut, status_code=201)
def create_task(
    payload: TaskCreate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Create a new task."""
    return task_service.create_task(db, payload, current_user.id)


@router.patch("/{task_id}", response_model=TaskOut)
def update_task(
    task_id: int,
    payload: TaskUpdate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Partially update a task."""
    task = task_service.get_task(db, task_id, current_user.id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_service.update_task(db, task, payload)


@router.delete("/{task_id}", status_code=204)
def delete_task(
    task_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Delete a task."""
    task = task_service.get_task(db, task_id, current_user.id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    task_service.delete_task(db, task)
```
Background Queue
```python
# src/taskflow/services/queue.py
"""
A simple in-process background task queue using threading.
For production, replace with Celery + Redis or RQ.
"""
from __future__ import annotations

import logging
import queue
import threading
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable
from uuid import uuid4

log = logging.getLogger(__name__)


@dataclass
class Job:
    # Fields without defaults must come before fields with defaults,
    # so func/args/kwargs lead and the auto-generated fields follow.
    func: Callable = field(repr=False)
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    id: str = field(default_factory=lambda: str(uuid4()))
    created_at: datetime = field(default_factory=datetime.utcnow)
    status: str = "pending"
    result: Any = None
    error: str | None = None


class TaskQueue:
    """Thread-pool backed task queue."""

    def __init__(self, workers: int = 4):
        self._queue = queue.Queue()
        self._jobs: dict[str, Job] = {}
        self._lock = threading.Lock()
        self._workers = [
            threading.Thread(target=self._worker, daemon=True, name=f"worker-{i}")
            for i in range(workers)
        ]
        for w in self._workers:
            w.start()
        log.info("TaskQueue started with %d workers", workers)

    def enqueue(self, func: Callable, *args, **kwargs) -> str:
        job = Job(func=func, args=args, kwargs=kwargs)
        with self._lock:
            self._jobs[job.id] = job
        self._queue.put(job)
        log.debug("Enqueued job %s: %s", job.id, func.__name__)
        return job.id

    def get_job(self, job_id: str) -> Job | None:
        with self._lock:
            return self._jobs.get(job_id)

    def _worker(self) -> None:
        while True:
            job: Job = self._queue.get()
            log.debug("Worker %s processing job %s",
                      threading.current_thread().name, job.id)
            try:
                with self._lock:
                    job.status = "running"
                result = job.func(*job.args, **job.kwargs)
                with self._lock:
                    job.status = "done"
                    job.result = result
                log.info("Job %s completed", job.id)
            except Exception as e:
                with self._lock:
                    job.status = "failed"
                    job.error = str(e)
                log.error("Job %s failed: %s", job.id, e)
            finally:
                self._queue.task_done()


# Singleton queue — shared across the app
_queue: TaskQueue | None = None


def get_queue() -> TaskQueue:
    global _queue
    if _queue is None:
        from taskflow.config import get_settings
        _queue = TaskQueue(workers=get_settings().max_workers)
    return _queue
```
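The worker-pool pattern behind `TaskQueue`, reduced to its essentials: producers put work on a `queue.Queue`, daemon threads drain it, and `join()` blocks until every item has been marked done. A standalone sketch (not taskflow code):

```python
import queue
import threading

q = queue.Queue()
results: list[int] = []
lock = threading.Lock()


def worker() -> None:
    while True:
        n = q.get()
        with lock:            # protect shared state, like TaskQueue._jobs
            results.append(n * n)
        q.task_done()         # tells join() this item is finished


for i in range(4):
    threading.Thread(target=worker, daemon=True).start()

for n in range(10):
    q.put(n)
q.join()                      # blocks until every task_done() has been called

assert sorted(results) == [n * n for n in range(10)]
```

`queue.Queue` handles the locking around `put`/`get` itself; the explicit lock is only for our own shared list, just as `TaskQueue` locks its `_jobs` dict.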
Main Application
```python
# src/taskflow/main.py
from __future__ import annotations

import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from taskflow.config import get_settings
from taskflow.database import Base, engine
from taskflow.logging_config import setup_logging
from taskflow.routers import auth, tasks
from taskflow.services.queue import get_queue

setup_logging()
log = logging.getLogger(__name__)
settings = get_settings()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run on startup and shutdown."""
    log.info("Starting %s v%s", settings.app_name, settings.app_version)
    Base.metadata.create_all(bind=engine)
    get_queue()  # initialize background queue
    yield
    log.info("Shutting down %s", settings.app_name)


app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description="A production-ready task management API.",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS — wide open for development; restrict origins in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routers
app.include_router(auth.router)
app.include_router(tasks.router)


@app.get("/", tags=["Health"])
def root():
    return {"name": settings.app_name, "version": settings.app_version, "status": "ok"}


@app.get("/health", tags=["Health"])
def health():
    return {"status": "healthy"}


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    log.error("Unhandled exception on %s: %s", request.url, exc, exc_info=True)
    return JSONResponse(status_code=500, content={"detail": "Internal server error"})
```
Tests
```python
# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from taskflow.database import Base, get_db
from taskflow.main import app
from taskflow.services.auth_service import create_user

TEST_DB_URL = "sqlite:///./test.db"
engine = create_engine(TEST_DB_URL, connect_args={"check_same_thread": False})
TestSession = sessionmaker(bind=engine)


@pytest.fixture(autouse=True)
def setup_db():
    Base.metadata.create_all(engine)
    yield
    Base.metadata.drop_all(engine)


@pytest.fixture
def db():
    session = TestSession()
    try:
        yield session
    finally:
        session.close()


@pytest.fixture
def client(db):
    def override_get_db():
        try:
            yield db
        finally:
            pass  # the db fixture owns the session's lifecycle

    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()


@pytest.fixture
def test_user(db):
    return create_user(db, "testuser", "test@example.com", "Password1", "Test User")


@pytest.fixture
def auth_headers(client, test_user):
    response = client.post("/auth/token", data={
        "username": "testuser",
        "password": "Password1",
    })
    token = response.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}
```
```python
# tests/test_auth.py


def test_register_success(client):
    r = client.post("/auth/register", json={
        "username": "alice",
        "email": "alice@example.com",
        "password": "Password1",
    })
    assert r.status_code == 201
    data = r.json()
    assert data["username"] == "alice"
    assert "hashed_password" not in data


def test_register_duplicate_username(client, test_user):
    r = client.post("/auth/register", json={
        "username": "testuser",
        "email": "other@example.com",
        "password": "Password1",
    })
    assert r.status_code == 400


def test_login_success(client, test_user):
    r = client.post("/auth/token", data={
        "username": "testuser",
        "password": "Password1",
    })
    assert r.status_code == 200
    assert "access_token" in r.json()
    assert r.json()["token_type"] == "bearer"


def test_login_wrong_password(client, test_user):
    r = client.post("/auth/token", data={
        "username": "testuser",
        "password": "WrongPassword1",
    })
    assert r.status_code == 401


def test_protected_route_without_token(client):
    r = client.get("/tasks/")
    assert r.status_code == 401


def test_protected_route_with_token(client, auth_headers):
    r = client.get("/tasks/", headers=auth_headers)
    assert r.status_code == 200
```
```python
# tests/test_tasks.py
import pytest


@pytest.fixture
def created_task(client, auth_headers):
    r = client.post("/tasks/", headers=auth_headers, json={
        "title": "Write tests",
        "priority": "high",
    })
    assert r.status_code == 201
    return r.json()


def test_create_task(client, auth_headers):
    r = client.post("/tasks/", headers=auth_headers, json={
        "title": "Learn Python",
        "description": "Read the book",
        "priority": "high",
    })
    assert r.status_code == 201
    data = r.json()
    assert data["title"] == "Learn Python"
    assert data["priority"] == "high"
    assert data["status"] == "todo"


def test_list_tasks_empty(client, auth_headers):
    r = client.get("/tasks/", headers=auth_headers)
    assert r.status_code == 200
    assert r.json() == []


def test_list_tasks_with_filter(client, auth_headers, created_task):
    r = client.get("/tasks/?status=todo", headers=auth_headers)
    assert r.status_code == 200
    assert len(r.json()) == 1

    r = client.get("/tasks/?status=done", headers=auth_headers)
    assert r.status_code == 200
    assert len(r.json()) == 0


def test_get_task(client, auth_headers, created_task):
    r = client.get(f"/tasks/{created_task['id']}", headers=auth_headers)
    assert r.status_code == 200
    assert r.json()["id"] == created_task["id"]


def test_get_task_not_found(client, auth_headers):
    r = client.get("/tasks/99999", headers=auth_headers)
    assert r.status_code == 404


def test_update_task_status(client, auth_headers, created_task):
    r = client.patch(
        f"/tasks/{created_task['id']}",
        headers=auth_headers,
        json={"status": "done"},
    )
    assert r.status_code == 200
    data = r.json()
    assert data["status"] == "done"
    assert data["completed_at"] is not None


def test_delete_task(client, auth_headers, created_task):
    r = client.delete(f"/tasks/{created_task['id']}", headers=auth_headers)
    assert r.status_code == 204

    r = client.get(f"/tasks/{created_task['id']}", headers=auth_headers)
    assert r.status_code == 404


def test_task_stats(client, auth_headers):
    client.post("/tasks/", headers=auth_headers, json={"title": "Task A", "priority": "high"})
    client.post("/tasks/", headers=auth_headers, json={"title": "Task B", "priority": "low"})
    client.post("/tasks/", headers=auth_headers, json={"title": "Task C", "priority": "high"})

    r = client.get("/tasks/stats", headers=auth_headers)
    assert r.status_code == 200
    stats = r.json()
    assert stats["total"] == 3
    assert stats["by_status"]["todo"] == 3
    assert stats["by_priority"]["high"] == 2
```
Dockerfile
```dockerfile
# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Copy metadata and source, then install.
# (An editable install needs src/ present, so copy it before pip runs.)
COPY pyproject.toml .
COPY src/ src/
RUN pip install --no-cache-dir ".[prod]"

# Non-root user for security
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 8000
CMD ["uvicorn", "taskflow.main:app", "--host", "0.0.0.0", "--port", "8000"]
```
```yaml
# docker-compose.yml
version: "3.9"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Keep the SQLite file inside the mounted volume so it persists
      - DATABASE_URL=sqlite:///./data/taskflow.db
      - SECRET_KEY=${SECRET_KEY}
      - LOG_LEVEL=INFO
    volumes:
      - ./data:/app/data
    restart: unless-stopped
    healthcheck:
      # python:3.12-slim ships without curl, so probe with the stdlib
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
```
GitHub Actions CI/CD
```yaml
# .github/workflows/ci.yml
name: CI

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: pip install -e ".[dev]"
      - name: Lint
        run: ruff check src/ tests/
      - name: Type check
        run: mypy src/taskflow
      - name: Test
        run: pytest tests/ --cov=src/taskflow --cov-report=xml --cov-fail-under=85
      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          file: ./coverage.xml

  docker:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        run: docker build -t taskflow:latest .
      - name: Smoke test
        run: |
          docker run -d -p 8000:8000 --name taskflow_test \
            -e SECRET_KEY=test-secret-key taskflow:latest
          sleep 5
          curl -f http://localhost:8000/health
          docker stop taskflow_test
```
pyproject.toml
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "taskflow"
version = "1.0.0"
description = "Production-ready task management API"
requires-python = ">=3.11"
dependencies = [
    "fastapi>=0.110",
    "uvicorn[standard]>=0.29",
    "sqlalchemy>=2.0",
    "alembic>=1.13",
    "pydantic[email]>=2.6",
    "pydantic-settings>=2.2",
    "python-jose[cryptography]>=3.3",
    "passlib[bcrypt]>=1.7",
    "python-multipart>=0.0.9",
]

[project.optional-dependencies]
dev = [
    "pytest>=8",
    "pytest-cov>=5",
    "httpx>=0.27",
    "mypy>=1.9",
    "ruff>=0.3",
]
prod = [
    "gunicorn>=22",
]

[tool.hatch.build.targets.wheel]
packages = ["src/taskflow"]

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --tb=short"

[tool.mypy]
python_version = "3.12"
strict = true
ignore_missing_imports = true

[tool.ruff.lint]
select = ["E", "F", "I", "N", "UP"]
```
Running the Application
```bash
# 1. Clone and set up
git clone https://github.com/you/taskflow
cd taskflow
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install -e ".[dev]"

# 2. Configure
cp .env.example .env
# Edit .env — set SECRET_KEY to a long random string

# 3. Run
uvicorn taskflow.main:app --reload
# -> http://localhost:8000/docs  (Swagger UI)
# -> http://localhost:8000/redoc (ReDoc)

# 4. Test
pytest tests/ -v --cov=src/taskflow

# 5. Docker
docker-compose up --build
```
Visit http://localhost:8000/docs — you'll see the full interactive API:
- Click `POST /auth/register` -> register a user
- Click `POST /auth/token` -> log in, copy the token
- Click the Authorize button -> paste the token
- Now all task endpoints are accessible
What This Project Used
Every major topic from this book appeared in TaskFlow:
| Chapter | Used in TaskFlow |
|---|---|
| Variables, types, f-strings | Throughout |
| Conditionals, loops | Services, queue worker |
| Functions, scope | All service functions |
| Classes, OOP | Models, schemas, services |
| Inheritance, polymorphism | SQLAlchemy model hierarchy |
| Special methods | `__repr__` on all models |
| Error handling | All routers, auth service |
| Comprehensions | Stats aggregation, list filtering |
| Modules, packages | Full package structure |
| Standard library | `logging`, `threading`, `queue`, `datetime`, `uuid` |
| Type hints | Every function signature |
| Dataclasses | `Job` dataclass in queue |
| Decorators | `@lru_cache`, `@field_validator`, `@asynccontextmanager` |
| Generators | `yield`-based database sessions |
| Context managers | `get_db()`, `lifespan()` |
| Functional programming | `model_dump(exclude_unset=True)` |
| Concurrency | `threading.Thread` worker pool |
| Testing | Full pytest suite with fixtures |
| Databases | SQLAlchemy ORM + SQLite |
| Web development | FastAPI with Pydantic |
| Packaging | `pyproject.toml` |
| Professional practices | Logging, env vars, clean code |
Where to Go From Here
You've finished the book. Here's what to do next:
Deploy TaskFlow. Take this exact project, point `DATABASE_URL` at a real PostgreSQL instance (free tier on Supabase or Railway), set a strong `SECRET_KEY`, and run `docker-compose up`. You'll have a live API in 10 minutes.
Add features. Tags on tasks. Task assignments (multiple users). Email notifications when tasks are due. A dashboard page with charts using Chart.js. All of these use skills from this book.
Read excellent code. The best way to improve is to read code written by expert Python developers. Study the source of fastapi, pydantic, httpx, and rich. Each one teaches you something new.
Contribute to open source. Find a project you use, fix a bug, improve the docs, add a test. Your first PR is the hardest. After that, it becomes normal.
Build something real. Every skill in this book was taught through building — and building is how you retain it. Pick one idea you genuinely care about and build it completely. Not a tutorial project. Something you'd actually use.
Python is a tool. What matters is what you build with it.
Go build something worth building.
Final Summary: What You Now Know
You started with `print("Hello, World!")`. You ended with a production API, complete with authentication, a background queue, structured logging, a test suite, and Docker deployment.
Along the way you learned:
- The fundamentals — variables, operators, conditionals, loops, functions, files, error handling
- Data structures — lists, tuples, sets, dictionaries, strings — and when to use each
- Object-oriented programming — classes, inheritance, polymorphism, encapsulation, special methods, properties, dataclasses
- Functional programming — pure functions, closures, decorators, generators, context managers, `map`/`filter`/`reduce`, `functools`
- Advanced types — type hints, `TypeVar`, `Generic`, `Protocol`, `NamedTuple`, `TypedDict`
- Concurrency — threading, multiprocessing, asyncio — and when to use each
- Testing — pytest, fixtures, parametrize, mocking, coverage
- Debugging — `pdb`, `breakpoint()`, `logging`, tracebacks, profiling
- Performance — `timeit`, `cProfile`, algorithmic complexity, numpy
- Databases — SQLite, PostgreSQL, SQLAlchemy, migrations
- Web development — Flask, FastAPI, REST, JWT authentication
- Data science — numpy, pandas, matplotlib, scikit-learn
- Packaging — `pyproject.toml`, building, publishing to PyPI
- Professional practices — style, docstrings, code review, git, environment variables
- Internals — bytecode, the GIL, memory management, descriptors, metaclasses
- Metaprogramming — class factories, `exec`/`eval`, dynamic attribute generation
You are no longer a beginner. You are a Python developer.
The language is the same. What you do with it — that's up to you.