Python: Zero to Hero

Chapter 38: The Capstone Project — Build a Production Task Manager API

You've traveled a long way: from variables and loops in Chapter 1 to metaclasses, async programming, type hints, databases, and deployment. This final chapter ties everything together.

We're building TaskFlow — a production-quality task management REST API. Not a toy. A real application you could deploy today.

Here's what it includes:

  • FastAPI REST API with full CRUD
  • SQLAlchemy ORM with Alembic migrations
  • JWT authentication with bcrypt password hashing
  • Background task processing with a simple job queue
  • Structured logging with rotation
  • pytest test suite with 90%+ coverage
  • Docker containerization
  • GitHub Actions CI/CD pipeline

This project draws on techniques from across the book. Let's build it.

Project Structure

taskflow/
├── src/
│   └── taskflow/
│       ├── __init__.py
│       ├── main.py          <- FastAPI app entry point
│       ├── config.py        <- Settings from environment
│       ├── database.py      <- SQLAlchemy setup
│       ├── logging_config.py<- Structured logging
│       ├── models/
│       │   ├── __init__.py
│       │   ├── user.py
│       │   └── task.py
│       ├── schemas/
│       │   ├── __init__.py
│       │   ├── user.py
│       │   └── task.py
│       ├── routers/
│       │   ├── __init__.py
│       │   ├── auth.py
│       │   ├── users.py
│       │   └── tasks.py
│       ├── services/
│       │   ├── __init__.py
│       │   ├── auth_service.py
│       │   ├── task_service.py
│       │   └── queue.py
│       └── dependencies.py
├── tests/
│   ├── conftest.py
│   ├── test_auth.py
│   ├── test_tasks.py
│   └── test_users.py
├── alembic/
│   ├── env.py
│   └── versions/
├── pyproject.toml
├── .env.example
├── Dockerfile
├── docker-compose.yml
└── .github/
    └── workflows/
        └── ci.yml

Configuration

# src/taskflow/config.py
from __future__ import annotations
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file        = ".env",
        env_file_encoding = "utf-8",
        case_sensitive  = False,
    )

    # Application
    app_name:        str  = "TaskFlow"
    app_version:     str  = "1.0.0"
    debug:           bool = False

    # Database
    database_url:    str  = "sqlite:///./taskflow.db"

    # Security
    secret_key:      str  = "change-me-in-production-use-a-long-random-string"
    algorithm:       str  = "HS256"
    access_token_expire_minutes: int = 30

    # Logging
    log_level:       str  = "INFO"
    log_file:        str  = "logs/taskflow.log"

    # Queue
    max_workers:     int  = 4

    def is_production(self) -> bool:
        return not self.debug

    def validate_secrets(self) -> None:
        if self.is_production() and self.secret_key == "change-me-in-production-use-a-long-random-string":
            raise RuntimeError("SECRET_KEY must be changed in production.")


@lru_cache
def get_settings() -> Settings:
    """Return cached settings — called once, reused everywhere."""
    return Settings()
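
The caching behaviour of get_settings() can be seen in isolation with a small standard-library sketch. DemoSettings, get_demo_settings and the DEMO_APP_NAME variable are hypothetical stand-ins; in TaskFlow the real parsing is done by pydantic-settings.

```python
import os
from dataclasses import dataclass, field
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    # Hypothetical stand-in for Settings: one value read from the environment.
    app_name: str = field(
        default_factory=lambda: os.environ.get("DEMO_APP_NAME", "TaskFlow")
    )


@lru_cache
def get_demo_settings() -> DemoSettings:
    # The body runs once; later calls return the cached instance.
    return DemoSettings()


os.environ["DEMO_APP_NAME"] = "first"
first = get_demo_settings()

os.environ["DEMO_APP_NAME"] = "second"
second = get_demo_settings()      # still the cached object, env change ignored

get_demo_settings.cache_clear()   # e.g. from a test fixture
third = get_demo_settings()       # re-reads the environment

print(first is second, first.app_name, third.app_name)
```

One consequence of this pattern: a test that changes environment variables probably needs to call cache_clear() before the new values are visible.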

Logging

# src/taskflow/logging_config.py
import logging
import logging.handlers
import sys
from pathlib import Path
from taskflow.config import get_settings


def setup_logging() -> None:
    settings = get_settings()
    log_path = Path(settings.log_file)
    log_path.parent.mkdir(parents=True, exist_ok=True)

    fmt = "%(asctime)s %(levelname)-8s %(name)-30s %(message)s"

    handlers: list[logging.Handler] = [
        logging.StreamHandler(sys.stdout),
        logging.handlers.RotatingFileHandler(
            log_path,
            maxBytes    = 10 * 1024 * 1024,  # 10 MB
            backupCount = 5,
            encoding    = "utf-8",
        ),
    ]

    logging.basicConfig(
        level    = getattr(logging, settings.log_level.upper()),
        format   = fmt,
        datefmt  = "%Y-%m-%d %H:%M:%S",
        handlers = handlers,
        force    = True,
    )

    # Reduce noise from libraries
    for noisy in ("uvicorn.access", "sqlalchemy.engine", "passlib"):
        logging.getLogger(noisy).setLevel(logging.WARNING)


log = logging.getLogger(__name__)
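
To see what maxBytes and backupCount do in practice, here is a minimal sketch that forces rotation with a deliberately tiny size limit in a temporary directory. The logger name, file name and limits are illustrative assumptions, not TaskFlow's values.

```python
import logging
import logging.handlers
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    handler = logging.handlers.RotatingFileHandler(
        Path(tmp) / "demo.log",
        maxBytes=200,      # tiny limit so rotation happens quickly
        backupCount=2,     # keep demo.log.1 and demo.log.2, drop older files
        encoding="utf-8",
    )
    demo_log = logging.getLogger("rotation_demo")
    demo_log.setLevel(logging.INFO)
    demo_log.propagate = False   # keep the demo out of the root handlers
    demo_log.addHandler(handler)

    for i in range(50):
        demo_log.info("message number %03d", i)

    # Close the handler before the temporary directory is removed.
    demo_log.removeHandler(handler)
    handler.close()
    log_files = sorted(p.name for p in Path(tmp).iterdir())

print(log_files)
```

However many messages are written, the directory never holds more than the active file plus backupCount backups.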

Database

# src/taskflow/database.py
from collections.abc import Iterator

from sqlalchemy import create_engine, event
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Session
from taskflow.config import get_settings

settings = get_settings()

# SQLite connections are tied to the thread that created them by default.
# A request may touch the session from more than one thread, so the check
# is disabled for SQLite only.
connect_args: dict[str, bool] = {}
if settings.database_url.startswith("sqlite"):
    connect_args["check_same_thread"] = False

engine = create_engine(
    settings.database_url,
    connect_args = connect_args,
    pool_pre_ping = True,   # detect stale connections
    echo          = settings.debug,
)

# For SQLite: enable WAL mode (better concurrent reads) and turn on
# foreign-key enforcement for every new connection
if settings.database_url.startswith("sqlite"):
    @event.listens_for(engine, "connect")
    def set_sqlite_pragma(dbapi_conn, _):
        cursor = dbapi_conn.cursor()
        cursor.execute("PRAGMA journal_mode=WAL")
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()

SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)


class Base(DeclarativeBase):
    pass


def get_db() -> Iterator[Session]:
    """FastAPI dependency — yields a database session."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
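
The foreign_keys pragma matters because SQLite only enforces foreign-key constraints when it is switched on for the connection. The sketch below uses the standard-library sqlite3 module directly; orphan_insert_allowed and the two-table schema are hypothetical simplifications of the users and tasks tables.

```python
import sqlite3


def orphan_insert_allowed(enforce_foreign_keys: bool) -> bool:
    """Return True if a task row pointing at a missing user can be inserted."""
    conn = sqlite3.connect(":memory:")
    # Set the pragma explicitly in both cases so the demo does not depend
    # on how the local SQLite library was compiled.
    state = "ON" if enforce_foreign_keys else "OFF"
    conn.execute(f"PRAGMA foreign_keys={state}")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE tasks ("
        "id INTEGER PRIMARY KEY, "
        "owner_id INTEGER NOT NULL REFERENCES users(id))"
    )
    try:
        conn.execute("INSERT INTO tasks (owner_id) VALUES (999)")
        return True
    except sqlite3.IntegrityError:
        return False
    finally:
        conn.close()


print(orphan_insert_allowed(False), orphan_insert_allowed(True))
```

With enforcement off, the orphan row is accepted without complaint, which is why the pragma is set in a connect listener rather than once at startup.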

Models

# src/taskflow/models/user.py
from __future__ import annotations
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.orm import relationship
from taskflow.database import Base


class User(Base):
    __tablename__ = "users"

    id           = Column(Integer, primary_key=True, index=True)
    username     = Column(String(50),  unique=True, nullable=False, index=True)
    email        = Column(String(200), unique=True, nullable=False, index=True)
    hashed_password = Column(String(200), nullable=False)
    full_name    = Column(String(200), default="")
    is_active    = Column(Boolean, default=True)
    is_admin     = Column(Boolean, default=False)
    created_at   = Column(DateTime, default=datetime.utcnow)
    updated_at   = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    tasks = relationship("Task", back_populates="owner",
                         cascade="all, delete-orphan")

    def __repr__(self) -> str:
        return f"User(id={self.id}, username={self.username!r})"

# src/taskflow/models/task.py
from __future__ import annotations
from datetime import datetime
from sqlalchemy import (
    Column, DateTime, Enum, ForeignKey,
    Integer, String, Text
)
from sqlalchemy.orm import relationship
import enum
from taskflow.database import Base


class Priority(str, enum.Enum):
    low    = "low"
    medium = "medium"
    high   = "high"
    urgent = "urgent"


class Status(str, enum.Enum):
    todo        = "todo"
    in_progress = "in_progress"
    done        = "done"
    cancelled   = "cancelled"


class Task(Base):
    __tablename__ = "tasks"

    id          = Column(Integer, primary_key=True, index=True)
    title       = Column(String(200), nullable=False)
    description = Column(Text, default="")
    priority    = Column(Enum(Priority), default=Priority.medium, nullable=False)
    status      = Column(Enum(Status),   default=Status.todo,    nullable=False)
    due_date    = Column(DateTime, nullable=True)
    completed_at= Column(DateTime, nullable=True)
    owner_id    = Column(Integer, ForeignKey("users.id"), nullable=False)
    created_at  = Column(DateTime, default=datetime.utcnow)
    updated_at  = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    owner = relationship("User", back_populates="tasks")

    def __repr__(self) -> str:
        return f"Task(id={self.id}, title={self.title!r}, status={self.status})"

Schemas (Pydantic)

# src/taskflow/schemas/user.py
from __future__ import annotations
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field, field_validator


class UserCreate(BaseModel):
    username:  str = Field(..., min_length=3, max_length=50,
                           pattern=r"^[a-zA-Z0-9_]+$")
    email:     EmailStr
    password:  str = Field(..., min_length=8)
    full_name: str = Field("", max_length=200)

    @field_validator("password")
    @classmethod
    def password_strength(cls, v: str) -> str:
        if not any(c.isupper() for c in v):
            raise ValueError("Password must contain an uppercase letter.")
        if not any(c.isdigit() for c in v):
            raise ValueError("Password must contain a digit.")
        return v


class UserUpdate(BaseModel):
    full_name: str | None = Field(None, max_length=200)
    email:     EmailStr | None = None


class UserOut(BaseModel):
    id:         int
    username:   str
    email:      str
    full_name:  str
    is_active:  bool
    created_at: datetime

    model_config = {"from_attributes": True}


class UserOutWithStats(UserOut):
    task_count: int = 0

# src/taskflow/schemas/task.py
from __future__ import annotations
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
from taskflow.models.task import Priority, Status


class TaskCreate(BaseModel):
    title:       str      = Field(..., min_length=1, max_length=200)
    description: str      = Field("", max_length=5000)
    priority:    Priority = Priority.medium
    due_date:    Optional[datetime] = None


class TaskUpdate(BaseModel):
    title:       Optional[str]      = Field(None, min_length=1, max_length=200)
    description: Optional[str]      = Field(None, max_length=5000)
    priority:    Optional[Priority] = None
    status:      Optional[Status]   = None
    due_date:    Optional[datetime] = None


class TaskOut(BaseModel):
    id:           int
    title:        str
    description:  str
    priority:     Priority
    status:       Status
    due_date:     Optional[datetime]
    completed_at: Optional[datetime]
    owner_id:     int
    created_at:   datetime
    updated_at:   datetime

    model_config = {"from_attributes": True}

Services

# src/taskflow/services/auth_service.py
from __future__ import annotations
from datetime import datetime, timedelta
import logging
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from taskflow.config import get_settings
from taskflow.models.user import User

log      = logging.getLogger(__name__)
settings = get_settings()
pwd_ctx  = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_ctx.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_ctx.verify(plain, hashed)


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    payload = data.copy()
    expire  = datetime.utcnow() + (
        expires_delta or timedelta(minutes=settings.access_token_expire_minutes)
    )
    payload["exp"] = expire
    return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)


def decode_token(token: str) -> dict:
    try:
        return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError as e:
        log.warning("Token decode failed: %s", e)
        raise


def authenticate_user(db: Session, username: str, password: str) -> User | None:
    user = db.query(User).filter(User.username == username).first()
    if not user or not verify_password(password, user.hashed_password):
        log.warning("Failed login attempt for username=%r", username)
        return None
    log.info("User authenticated: %s", username)
    return user


def create_user(db: Session, username: str, email: str,
                password: str, full_name: str = "") -> User:
    user = User(
        username        = username,
        email           = email,
        hashed_password = hash_password(password),
        full_name       = full_name,
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    log.info("User created: %s (id=%d)", username, user.id)
    return user
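
For readers curious about what an HS256 token contains, here is a minimal standard-library sketch of signing and verifying one. It is an illustration of the token format only, not python-jose's implementation; sign_token, verify_token and the demo secret are hypothetical names, and TaskFlow should keep using the library.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without the trailing "=" padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> str:
    digest = hmac.new(
        secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    return _b64url(digest)


def sign_token(claims: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_signature(signing_input, secret)}"


def verify_token(token: str, secret: str) -> dict:
    signing_input, _, signature = token.rpartition(".")
    expected = _signature(signing_input, secret)
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8")):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if claims.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return claims


token = sign_token({"sub": "alice", "exp": int(time.time()) + 60}, "demo-secret")
claims = verify_token(token, "demo-secret")
print(claims["sub"])
```

The payload is only encoded, not encrypted, so anything placed in the claims is readable by whoever holds the token.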

# src/taskflow/services/task_service.py
from __future__ import annotations
from datetime import datetime
import logging
from typing import Optional
from sqlalchemy.orm import Session
from taskflow.models.task import Task, Status
from taskflow.schemas.task import TaskCreate, TaskUpdate

log = logging.getLogger(__name__)


def create_task(db: Session, payload: TaskCreate, owner_id: int) -> Task:
    task = Task(**payload.model_dump(), owner_id=owner_id)
    db.add(task)
    db.commit()
    db.refresh(task)
    log.info("Task created: id=%d owner=%d", task.id, owner_id)
    return task


def get_task(db: Session, task_id: int, owner_id: int) -> Optional[Task]:
    return db.query(Task).filter(
        Task.id == task_id,
        Task.owner_id == owner_id,
    ).first()


def list_tasks(
    db:       Session,
    owner_id: int,
    status:   Optional[Status] = None,
    priority: Optional[str]    = None,
    skip:     int = 0,
    limit:    int = 50,
) -> list[Task]:
    q = db.query(Task).filter(Task.owner_id == owner_id)
    if status:
        q = q.filter(Task.status == status)
    if priority:
        q = q.filter(Task.priority == priority)
    return q.order_by(Task.created_at.desc()).offset(skip).limit(limit).all()


def update_task(db: Session, task: Task, payload: TaskUpdate) -> Task:
    updates = payload.model_dump(exclude_unset=True)
    for field, value in updates.items():
        setattr(task, field, value)
    # Auto-set completed_at when status becomes "done"
    if updates.get("status") == Status.done and task.completed_at is None:
        task.completed_at = datetime.utcnow()
    elif updates.get("status") and updates["status"] != Status.done:
        task.completed_at = None
    task.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(task)
    log.info("Task updated: id=%d", task.id)
    return task


def delete_task(db: Session, task: Task) -> None:
    task_id = task.id  # read the id before the instance is deleted and detached
    db.delete(task)
    db.commit()
    log.info("Task deleted: id=%d", task_id)


def get_stats(db: Session, owner_id: int) -> dict:
    from sqlalchemy import func

    rows = (
        db.query(Task.status, Task.priority, func.count(Task.id))
        .filter(Task.owner_id == owner_id)
        .group_by(Task.status, Task.priority)
        .all()
    )
    stats: dict = {"total": 0, "by_status": {}, "by_priority": {}}
    for status, priority, count in rows:
        stats["total"] += count
        stats["by_status"][status]     = stats["by_status"].get(status, 0) + count
        stats["by_priority"][priority] = stats["by_priority"].get(priority, 0) + count
    return stats
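
The completed_at rule in update_task is easier to follow with the database stripped away. This sketch applies the same three-way decision to plain dictionaries; apply_update is a hypothetical helper, and it uses a timezone-aware timestamp where the service uses datetime.utcnow().

```python
from datetime import datetime, timezone


def apply_update(task: dict, updates: dict) -> dict:
    """Return a copy of task with updates applied and completed_at adjusted."""
    updated = {**task, **updates}
    new_status = updates.get("status")
    if new_status == "done" and updated.get("completed_at") is None:
        # Status just became "done": stamp the completion time once.
        updated["completed_at"] = datetime.now(timezone.utc)
    elif new_status is not None and new_status != "done":
        # Status moved away from "done": clear the stamp.
        updated["completed_at"] = None
    # No status key in the update: completed_at is left untouched.
    return updated


task = {"title": "Write tests", "status": "todo", "completed_at": None}
done = apply_update(task, {"status": "done"})
renamed = apply_update(done, {"title": "Write more tests"})
reopened = apply_update(done, {"status": "in_progress"})

print(done["completed_at"] is not None, reopened["completed_at"])
```

The middle case is the reason the service dumps the payload with exclude_unset=True: a PATCH that only renames a task must not look like a status change.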

Dependencies

# src/taskflow/dependencies.py
from __future__ import annotations
import logging
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from sqlalchemy.orm import Session
from taskflow.database import get_db
from taskflow.models.user import User
from taskflow.services.auth_service import decode_token

log           = logging.getLogger(__name__)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


def get_current_user(
    token: str     = Depends(oauth2_scheme),
    db:    Session = Depends(get_db),
) -> User:
    credentials_exception = HTTPException(
        status_code = status.HTTP_401_UNAUTHORIZED,
        detail      = "Could not validate credentials",
        headers     = {"WWW-Authenticate": "Bearer"},
    )
    try:
        payload  = decode_token(token)
        username = payload.get("sub")
        if not username:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise credentials_exception
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user


def get_admin_user(current_user: User = Depends(get_current_user)) -> User:
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")
    return current_user

Routers

# src/taskflow/routers/auth.py
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from taskflow.database import get_db
from taskflow.schemas.user import UserCreate, UserOut
from taskflow.services import auth_service

router = APIRouter(prefix="/auth", tags=["Authentication"])


@router.post("/register", response_model=UserOut, status_code=201)
def register(payload: UserCreate, db: Session = Depends(get_db)):
    """Register a new user account."""
    if db.query(auth_service.User).filter_by(username=payload.username).first():
        raise HTTPException(status_code=400, detail="Username already taken")
    if db.query(auth_service.User).filter_by(email=payload.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")
    user = auth_service.create_user(
        db, payload.username, payload.email,
        payload.password, payload.full_name
    )
    return user


@router.post("/token")
def login(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Obtain a JWT access token."""
    user = auth_service.authenticate_user(db, form.username, form.password)
    if not user:
        raise HTTPException(
            status_code = status.HTTP_401_UNAUTHORIZED,
            detail      = "Incorrect username or password",
            headers     = {"WWW-Authenticate": "Bearer"},
        )
    token = auth_service.create_access_token({"sub": user.username})
    return {"access_token": token, "token_type": "bearer"}

# src/taskflow/routers/tasks.py
from __future__ import annotations
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from taskflow.database import get_db
from taskflow.dependencies import get_current_user
from taskflow.models.task import Priority, Status
from taskflow.models.user import User
from taskflow.schemas.task import TaskCreate, TaskOut, TaskUpdate
from taskflow.services import task_service

router = APIRouter(prefix="/tasks", tags=["Tasks"])


@router.get("/", response_model=list[TaskOut])
def list_tasks(
    task_status: Optional[Status]   = Query(None, alias="status"),
    priority:    Optional[Priority] = None,
    skip:        int                = Query(0, ge=0),
    limit:       int                = Query(50, ge=1, le=200),
    current_user: User              = Depends(get_current_user),
    db:           Session           = Depends(get_db),
):
    """List all tasks for the current user."""
    return task_service.list_tasks(
        db, current_user.id, task_status, priority, skip, limit
    )


@router.get("/stats")
def task_stats(
    current_user: User    = Depends(get_current_user),
    db:           Session = Depends(get_db),
):
    """Return task statistics for the current user."""
    return task_service.get_stats(db, current_user.id)


@router.get("/{task_id}", response_model=TaskOut)
def get_task(
    task_id:      int,
    current_user: User    = Depends(get_current_user),
    db:           Session = Depends(get_db),
):
    """Get a specific task by ID."""
    task = task_service.get_task(db, task_id, current_user.id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


@router.post("/", response_model=TaskOut, status_code=201)
def create_task(
    payload:      TaskCreate,
    current_user: User    = Depends(get_current_user),
    db:           Session = Depends(get_db),
):
    """Create a new task."""
    return task_service.create_task(db, payload, current_user.id)


@router.patch("/{task_id}", response_model=TaskOut)
def update_task(
    task_id:      int,
    payload:      TaskUpdate,
    current_user: User    = Depends(get_current_user),
    db:           Session = Depends(get_db),
):
    """Partially update a task."""
    task = task_service.get_task(db, task_id, current_user.id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_service.update_task(db, task, payload)


@router.delete("/{task_id}", status_code=204)
def delete_task(
    task_id:      int,
    current_user: User    = Depends(get_current_user),
    db:           Session = Depends(get_db),
):
    """Delete a task."""
    task = task_service.get_task(db, task_id, current_user.id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    task_service.delete_task(db, task)

Background Queue

# src/taskflow/services/queue.py
"""
A simple in-process background task queue using threading.
For production, replace with Celery + Redis or RQ.
"""
from __future__ import annotations
import logging
import queue
import threading
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable
from uuid import uuid4

log = logging.getLogger(__name__)


@dataclass
class Job:
    # func has no default value, so it must be declared before the fields
    # that do; otherwise @dataclass raises TypeError at class definition.
    func:       Callable = field(repr=False)
    id:         str      = field(default_factory=lambda: str(uuid4()))
    args:       tuple    = ()
    kwargs:     dict     = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    status:     str      = "pending"
    result:     Any      = None
    error:      str | None = None


class TaskQueue:
    """Thread-pool backed task queue."""

    def __init__(self, workers: int = 4):
        self._queue   = queue.Queue()
        self._jobs: dict[str, Job] = {}
        self._lock    = threading.Lock()
        self._workers = [
            threading.Thread(target=self._worker, daemon=True, name=f"worker-{i}")
            for i in range(workers)
        ]
        for w in self._workers:
            w.start()
        log.info("TaskQueue started with %d workers", workers)

    def enqueue(self, func: Callable, *args, **kwargs) -> str:
        job = Job(func=func, args=args, kwargs=kwargs)
        with self._lock:
            self._jobs[job.id] = job
        self._queue.put(job)
        log.debug("Enqueued job %s: %s", job.id, func.__name__)
        return job.id

    def get_job(self, job_id: str) -> Job | None:
        with self._lock:
            return self._jobs.get(job_id)

    def _worker(self) -> None:
        while True:
            job: Job = self._queue.get()
            log.debug("Worker %s processing job %s", threading.current_thread().name, job.id)
            try:
                with self._lock:
                    job.status = "running"
                result = job.func(*job.args, **job.kwargs)
                with self._lock:
                    job.status = "done"
                    job.result = result
                log.info("Job %s completed", job.id)
            except Exception as e:
                with self._lock:
                    job.status = "failed"
                    job.error  = str(e)
                log.error("Job %s failed: %s", job.id, e)
            finally:
                self._queue.task_done()


# Singleton queue — shared across the app
_queue: TaskQueue | None = None


def get_queue() -> TaskQueue:
    global _queue
    if _queue is None:
        from taskflow.config import get_settings
        _queue = TaskQueue(workers=get_settings().max_workers)
    return _queue
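
The worker loop above calls task_done() in its finally block, which is what lets a caller wait for outstanding jobs with Queue.join(). Here is a stripped-down, standard-library sketch of that handshake; the jobs queue, results dictionary and worker function are illustrative and much simpler than TaskQueue.

```python
import queue
import threading

jobs: queue.Queue = queue.Queue()
results: dict[str, int] = {}
lock = threading.Lock()


def worker() -> None:
    while True:
        name, func, args = jobs.get()
        try:
            value = func(*args)
            with lock:
                results[name] = value
        finally:
            # Always mark the job finished, even on failure, or join() hangs.
            jobs.task_done()


# Daemon threads exit with the interpreter, as in TaskQueue.
for _ in range(2):
    threading.Thread(target=worker, daemon=True).start()

jobs.put(("sum", sum, ([1, 2, 3],)))
jobs.put(("pow", pow, (2, 10)))
jobs.join()   # blocks until task_done() has been called for every job

print(sorted(results.items()))
```

Because the threads are daemons, any job still running at interpreter exit is abandoned, which is one reason the module docstring points to Celery or RQ for production use.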

Main Application

# src/taskflow/main.py
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from taskflow.config import get_settings
from taskflow.database import Base, engine
from taskflow.logging_config import setup_logging
from taskflow.routers import auth, tasks
from taskflow.services.queue import get_queue

setup_logging()
log      = logging.getLogger(__name__)
settings = get_settings()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run on startup and shutdown."""
    log.info("Starting %s v%s", settings.app_name, settings.app_version)
    Base.metadata.create_all(bind=engine)
    get_queue()   # initialize background queue
    yield
    log.info("Shutting down %s", settings.app_name)


app = FastAPI(
    title       = settings.app_name,
    version     = settings.app_version,
    description = "A production-ready task management API.",
    lifespan    = lifespan,
    docs_url    = "/docs",
    redoc_url   = "/redoc",
)

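# CORS
# Browsers reject a wildcard origin on credentialed requests. TaskFlow sends
# its JWT in the Authorization header rather than a cookie, so credentials
# stay off; list explicit origins if cookie-based auth is added later.
app.add_middleware(
    CORSMiddleware,
    allow_origins     = ["*"],
    allow_credentials = False,
    allow_methods     = ["*"],
    allow_headers     = ["*"],
)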

# Routers
app.include_router(auth.router)
app.include_router(tasks.router)


@app.get("/", tags=["Health"])
def root():
    return {"name": settings.app_name, "version": settings.app_version, "status": "ok"}


@app.get("/health", tags=["Health"])
def health():
    return {"status": "healthy"}


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    log.error("Unhandled exception on %s: %s", request.url, exc, exc_info=True)
    return JSONResponse(status_code=500, content={"detail": "Internal server error"})

Tests

# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from taskflow.database import Base, get_db
from taskflow.main import app
from taskflow.services.auth_service import create_user

TEST_DB_URL = "sqlite:///./test.db"
engine      = create_engine(TEST_DB_URL, connect_args={"check_same_thread": False})
TestSession = sessionmaker(bind=engine)


@pytest.fixture(autouse=True)
def setup_db():
    Base.metadata.create_all(engine)
    yield
    Base.metadata.drop_all(engine)


@pytest.fixture
def db():
    session = TestSession()
    try:
        yield session
    finally:
        session.close()


@pytest.fixture
def client(db):
    def override_get_db():
        try:
            yield db
        finally:
            pass
    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()


@pytest.fixture
def test_user(db):
    return create_user(db, "testuser", "test@example.com", "Password1", "Test User")


@pytest.fixture
def auth_headers(client, test_user):
    response = client.post("/auth/token", data={
        "username": "testuser",
        "password": "Password1",
    })
    token = response.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}

# tests/test_auth.py


def test_register_success(client):
    r = client.post("/auth/register", json={
        "username": "alice",
        "email":    "alice@example.com",
        "password": "Password1",
    })
    assert r.status_code == 201
    data = r.json()
    assert data["username"] == "alice"
    assert "hashed_password" not in data


def test_register_duplicate_username(client, test_user):
    r = client.post("/auth/register", json={
        "username": "testuser",
        "email":    "other@example.com",
        "password": "Password1",
    })
    assert r.status_code == 400


def test_login_success(client, test_user):
    r = client.post("/auth/token", data={
        "username": "testuser",
        "password": "Password1",
    })
    assert r.status_code == 200
    assert "access_token" in r.json()
    assert r.json()["token_type"] == "bearer"


def test_login_wrong_password(client, test_user):
    r = client.post("/auth/token", data={
        "username": "testuser",
        "password": "WrongPassword1",
    })
    assert r.status_code == 401


def test_protected_route_without_token(client):
    r = client.get("/tasks/")
    assert r.status_code == 401


def test_protected_route_with_token(client, auth_headers):
    r = client.get("/tasks/", headers=auth_headers)
    assert r.status_code == 200

# tests/test_tasks.py
import pytest


@pytest.fixture
def created_task(client, auth_headers):
    r = client.post("/tasks/", headers=auth_headers, json={
        "title":    "Write tests",
        "priority": "high",
    })
    assert r.status_code == 201
    return r.json()


def test_create_task(client, auth_headers):
    r = client.post("/tasks/", headers=auth_headers, json={
        "title":       "Learn Python",
        "description": "Read the book",
        "priority":    "high",
    })
    assert r.status_code == 201
    data = r.json()
    assert data["title"]    == "Learn Python"
    assert data["priority"] == "high"
    assert data["status"]   == "todo"


def test_list_tasks_empty(client, auth_headers):
    r = client.get("/tasks/", headers=auth_headers)
    assert r.status_code == 200
    assert r.json() == []


def test_list_tasks_with_filter(client, auth_headers, created_task):
    r = client.get("/tasks/?status=todo", headers=auth_headers)
    assert r.status_code == 200
    assert len(r.json()) == 1

    r = client.get("/tasks/?status=done", headers=auth_headers)
    assert r.status_code == 200
    assert len(r.json()) == 0


def test_get_task(client, auth_headers, created_task):
    r = client.get(f"/tasks/{created_task['id']}", headers=auth_headers)
    assert r.status_code == 200
    assert r.json()["id"] == created_task["id"]


def test_get_task_not_found(client, auth_headers):
    r = client.get("/tasks/99999", headers=auth_headers)
    assert r.status_code == 404


def test_update_task_status(client, auth_headers, created_task):
    r = client.patch(
        f"/tasks/{created_task['id']}",
        headers=auth_headers,
        json={"status": "done"},
    )
    assert r.status_code == 200
    data = r.json()
    assert data["status"]       == "done"
    assert data["completed_at"] is not None


def test_delete_task(client, auth_headers, created_task):
    r = client.delete(f"/tasks/{created_task['id']}", headers=auth_headers)
    assert r.status_code == 204

    r = client.get(f"/tasks/{created_task['id']}", headers=auth_headers)
    assert r.status_code == 404


def test_task_stats(client, auth_headers):
    client.post("/tasks/", headers=auth_headers, json={"title": "Task A", "priority": "high"})
    client.post("/tasks/", headers=auth_headers, json={"title": "Task B", "priority": "low"})
    client.post("/tasks/", headers=auth_headers, json={"title": "Task C", "priority": "high"})

    r = client.get("/tasks/stats", headers=auth_headers)
    assert r.status_code == 200
    stats = r.json()
    assert stats["total"] == 3
    assert stats["by_status"]["todo"] == 3
    assert stats["by_priority"]["high"] == 2

Dockerfile

# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Copy the project metadata and source, then install the package.
# The build backend needs src/ to exist, so both are copied before pip runs.
COPY pyproject.toml .
COPY src/ src/
RUN pip install --no-cache-dir ".[prod]"

# Non-root user for security
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 8000

CMD ["uvicorn", "taskflow.main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
version: "3.9"
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=sqlite:////app/data/taskflow.db
      - SECRET_KEY=${SECRET_KEY}
      - LOG_LEVEL=INFO
    volumes:
      - ./data:/app/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 10s
      retries: 3

GitHub Actions CI/CD

# .github/workflows/ci.yml
name: CI

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]

    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: pip install -e ".[dev]"

      - name: Lint
        run: ruff check src/ tests/

      - name: Type check
        run: mypy src/taskflow

      - name: Test
        run: pytest tests/ --cov=src/taskflow --cov-report=xml --cov-fail-under=85

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          file: ./coverage.xml

  docker:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
      - uses: actions/checkout@v4

      - name: Build Docker image
        run: docker build -t taskflow:latest .

      - name: Smoke test
        run: |
          docker run -d -p 8000:8000 --name taskflow_test \
            -e SECRET_KEY=test-secret-key taskflow:latest
          sleep 5
          curl -f http://localhost:8000/health
          docker stop taskflow_test
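
The smoke test above waits a fixed five seconds before probing /health, which can be too short on a slow runner and wasteful on a fast one. One alternative is to poll until the endpoint answers. The sketch below uses only the standard library; the names http_ok and wait_for_health are hypothetical helpers, not part of TaskFlow.

```python
import http.client
import time
import urllib.request
from typing import Callable


def http_ok(url: str) -> bool:
    """Return True if a GET to `url` answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=2) as resp:
            return resp.status == 200
    except (OSError, http.client.HTTPException):
        # Connection refused, timeouts, and HTTP errors all count as "not ready".
        return False


def wait_for_health(
    url: str,
    timeout: float = 30.0,
    interval: float = 0.5,
    probe: Callable[[str], bool] = http_ok,
) -> bool:
    """Poll `url` until `probe` succeeds or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if probe(url):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Example call (needs a running container, so it is left commented out):
# ok = wait_for_health("http://localhost:8000/health")
```

The probe parameter exists so the polling loop can be exercised in tests without a live server.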

pyproject.toml

[build-system]
requires      = ["hatchling"]
build-backend = "hatchling.build"

[project]
name            = "taskflow"
version         = "1.0.0"
description     = "Production-ready task management API"
requires-python = ">=3.11"
dependencies    = [
    "fastapi>=0.110",
    "uvicorn[standard]>=0.29",
    "sqlalchemy>=2.0",
    "alembic>=1.13",
    "pydantic>=2.6",
    "pydantic-settings>=2.2",
    "python-jose[cryptography]>=3.3",
    "passlib[bcrypt]>=1.7",
    "python-multipart>=0.0.9",
]

[project.optional-dependencies]
dev = [
    "pytest>=8",
    "pytest-cov>=5",
    "httpx>=0.27",
    "mypy>=1.9",
    "ruff>=0.3",
]
prod = [
    "gunicorn>=22",
]

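[project.scripts]
# A console script must point at a zero-argument callable, not the ASGI app object.
# This assumes main.py defines a run() function that calls uvicorn.run("taskflow.main:app").
taskflow = "taskflow.main:run"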

[tool.hatch.build.targets.wheel]
packages = ["src/taskflow"]

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts   = "-v --tb=short"

[tool.mypy]
python_version         = "3.12"
strict                 = true
ignore_missing_imports = true

[tool.ruff.lint]
select = ["E", "F", "I", "N", "UP"]

Running the Application

# 1. Clone and set up
git clone https://github.com/you/taskflow
cd taskflow
python -m venv .venv
source .venv/bin/activate          # Windows: .venv\Scripts\activate
pip install -e ".[dev]"

# 2. Configure
cp .env.example .env
# Edit .env — set SECRET_KEY to a long random string

# 3. Run
uvicorn taskflow.main:app --reload
# -> http://localhost:8000/docs  (Swagger UI)
# -> http://localhost:8000/redoc (ReDoc)

# 4. Test
pytest tests/ -v --cov=src/taskflow

# 5. Docker
docker-compose up --build
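
Step 2 above asks for a long random SECRET_KEY. One way to generate it is Python's standard secrets module; this is a minimal sketch, and the 32-byte length is a common choice rather than a TaskFlow requirement.

```python
import secrets

# 32 random bytes, URL-safe base64 encoded (43 characters, no padding).
secret_key = secrets.token_urlsafe(32)

# Paste the printed line into .env
print(f"SECRET_KEY={secret_key}")
```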

Visit http://localhost:8000/docs — you'll see the full interactive API:

  1. Click POST /auth/register -> register a user
  2. Click POST /auth/token -> log in, copy the token
  3. Click the Authorize button -> paste the token
  4. Now all task endpoints are accessible
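
The Authorize step can also be scripted. The sketch below builds (but does not send) an authenticated request to create a task, using only the standard library. It assumes the token is passed as a Bearer token in the Authorization header, which is the usual FastAPI convention but is not confirmed by this section. The helper name build_create_task_request is hypothetical; the payload fields match the ones used in the test suite.

```python
import json
import urllib.request


def build_create_task_request(
    base_url: str, token: str, title: str, priority: str
) -> urllib.request.Request:
    """Build a POST /tasks/ request carrying the JWT (assumed Bearer scheme)."""
    body = json.dumps({"title": title, "priority": priority}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/tasks/",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


req = build_create_task_request(
    "http://localhost:8000", "<paste-token-here>", "Task A", "high"
)
print(req.get_method(), req.full_url)
```

With the server running and a real token in place, urllib.request.urlopen(req) would send the request.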

What This Project Used

Every major topic from this book appeared in TaskFlow:

Chapter                     | Used in TaskFlow
Variables, types, f-strings | Throughout
Conditionals, loops         | Services, queue worker
Functions, scope            | All service functions
Classes, OOP                | Models, schemas, services
Inheritance, polymorphism   | SQLAlchemy model hierarchy
Special methods             | __repr__ on all models
Error handling              | All routers, auth service
Comprehensions              | Stats aggregation, list filtering
Modules, packages           | Full package structure
Standard library            | logging, threading, queue, datetime, uuid
Type hints                  | Every function signature
Dataclasses                 | Job dataclass in queue
Decorators                  | @lru_cache, @field_validator, @asynccontextmanager
Generators                  | SQLAlchemy yield sessions
Context managers            | get_db() yield, lifespan()
Functional programming      | model_dump(exclude_unset=True)
Concurrency                 | threading.Thread worker pool
Testing                     | Full pytest suite with fixtures
Databases                   | SQLAlchemy ORM + SQLite
Web development             | FastAPI with Pydantic
Packaging                   | pyproject.toml
Professional practices      | Logging, env vars, clean code

Where to Go From Here

You've finished the book. Here's what to do next:

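Deploy TaskFlow. Take this project, add a PostgreSQL driver such as psycopg to the dependencies (pyproject.toml lists none, and SQLAlchemy needs one to reach PostgreSQL), set DATABASE_URL to a real PostgreSQL instance (free tier on Supabase or Railway), set a strong SECRET_KEY, and run docker-compose up. You can have a live API in about 10 minutes.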
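
A SQLAlchemy database URL for PostgreSQL has the form dialect+driver://user:password@host:port/database. The sketch below assembles one with the standard library, percent-encoding the credentials so that characters such as @ or / do not break parsing. The psycopg driver, the helper name postgres_url, and all credentials shown are placeholders for illustration.

```python
from urllib.parse import quote


def postgres_url(
    user: str,
    password: str,
    host: str,
    database: str,
    port: int = 5432,
    driver: str = "psycopg",  # assumed driver; must be installed separately
) -> str:
    """Build a SQLAlchemy-style PostgreSQL URL with escaped credentials."""
    # safe="" forces every reserved character (including "/") to be encoded.
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"postgresql+{driver}://{user_enc}:{password_enc}@{host}:{port}/{database}"


url = postgres_url("taskflow", "p@ss/word", "db.example.com", "taskflow")
print(url)
# postgresql+psycopg://taskflow:p%40ss%2Fword@db.example.com:5432/taskflow
```

The resulting string is what would go into DATABASE_URL in .env or in the compose file's environment section.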

Add features. Tags on tasks. Task assignments (multiple users). Email notifications when tasks are due. A dashboard page with charts using Chart.js. All of these use skills from this book.

Read excellent code. The best way to improve is to read code written by expert Python developers. Study the source of fastapi, pydantic, httpx, and rich. Each one teaches you something new.

Contribute to open source. Find a project you use, fix a bug, improve the docs, add a test. Your first PR is the hardest. After that, it becomes normal.

Build something real. Every skill in this book was taught through building — and building is how you retain it. Pick one idea you genuinely care about and build it completely. Not a tutorial project. Something you'd actually use.

Python is a tool. What matters is what you build with it.

Go build something worth building.

Final Summary: What You Now Know

You started with print("Hello, World!"). You ended with a production API, complete with authentication, a background queue, structured logging, a test suite, and Docker deployment.

Along the way you learned:

  • The fundamentals — variables, operators, conditionals, loops, functions, files, error handling
  • Data structures — lists, tuples, sets, dictionaries, strings — and when to use each
  • Object-oriented programming — classes, inheritance, polymorphism, encapsulation, special methods, properties, dataclasses
  • Functional programming — pure functions, closures, decorators, generators, context managers, map/filter/reduce, functools
  • Advanced types — type hints, TypeVar, Generic, Protocol, NamedTuple, TypedDict
  • Concurrency — threading, multiprocessing, asyncio — and when to use each
  • Testing — pytest, fixtures, parametrize, mocking, coverage
  • Debugging — pdb, breakpoint(), logging, tracebacks, profiling
  • Performance — timeit, cProfile, algorithmic complexity, numpy
  • Databases — SQLite, PostgreSQL, SQLAlchemy, migrations
  • Web development — Flask, FastAPI, REST, JWT authentication
  • Data science — numpy, pandas, matplotlib, scikit-learn
  • Packaging — pyproject.toml, building, publishing to PyPI
  • Professional practices — style, docstrings, code review, git, environment variables
  • Internals — bytecode, the GIL, memory management, descriptors, metaclasses
  • Metaprogramming — class factories, exec/eval, dynamic attribute generation

You are no longer a beginner. You are a Python developer.

The language is the same. What you do with it — that's up to you.

© 2026 Abhilash Sahoo. Python: Zero to Hero.