Python: Zero to Hero

Chapter 31: Working with Databases

Files store data. But as soon as you need to search, filter, sort, or relate data across multiple collections, files become painful. Databases are purpose-built for exactly that.

This chapter covers three layers of database access in Python:

  1. SQLite — built-in, zero setup, perfect for learning and small apps
  2. PostgreSQL with psycopg2 — production-grade relational database
  3. SQLAlchemy — the standard Python ORM, lets you work with databases using Python objects instead of raw SQL

You'll understand SQL well enough to write real queries, and know when to use raw SQL vs an ORM.

Why Databases?

Consider storing 100,000 users in a CSV file. Finding a user by email means reading every line — O(n). Sorting by last name means loading everything into memory. Joining user data with their orders means matching records manually in Python.

A database does all of this efficiently:

  • Indexes make lookups O(log n) with a B-tree index, or O(1) on average with a hash index
  • Queries happen in the database engine, not Python memory
  • Relationships are enforced by the schema
  • Multiple programs can read and write safely at the same time
  • Transactions ensure your data is always consistent
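
To make the first point concrete, here is a minimal sketch using the built-in sqlite3 module and an in-memory database. The table contents, the index name idx_users_email, and the helper query_plan are assumptions made up for this illustration. It asks SQLite how it would run the same lookup before and after an index exists.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    [(f"user{i}", f"user{i}@example.com") for i in range(1000)],
)

def query_plan(sql: str) -> str:
    """Return SQLite's plan for a query as one string (hypothetical helper)."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return "; ".join(row[-1] for row in rows)   # last column is the description

lookup = "SELECT * FROM users WHERE email = 'user500@example.com'"

plan_without_index = query_plan(lookup)   # reports a scan of the whole table
conn.execute("CREATE INDEX idx_users_email ON users(email)")
plan_with_index = query_plan(lookup)      # reports a search using the index

print(plan_without_index)
print(plan_with_index)
```

The exact wording of the plan varies between SQLite versions, but the first plan should mention a scan and the second should name the index.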

SQL Basics

SQL (Structured Query Language) is the language all relational databases use. The same core syntax works in SQLite, PostgreSQL, MySQL, and others.

The four fundamental operations — CRUD:

-- Create
INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);

-- Read
SELECT name, email FROM users WHERE age > 25 ORDER BY name;

-- Update
UPDATE users SET age = 31 WHERE email = 'alice@example.com';

-- Delete
DELETE FROM users WHERE email = 'alice@example.com';

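Other essential SQL (the CREATE TABLE statement below uses SQLite's dialect; other databases spell the auto-incrementing key differently):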

-- Create a table
CREATE TABLE users (
    id    INTEGER PRIMARY KEY AUTOINCREMENT,
    name  TEXT    NOT NULL,
    email TEXT    UNIQUE NOT NULL,
    age   INTEGER DEFAULT 0,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP
);

-- Count rows
SELECT COUNT(*) FROM users;

-- Aggregate functions
SELECT AVG(age), MIN(age), MAX(age) FROM users;

-- Group and aggregate
SELECT age, COUNT(*) AS count FROM users GROUP BY age ORDER BY age;

-- Filter groups
SELECT age, COUNT(*) AS count FROM users
GROUP BY age
HAVING COUNT(*) > 1;

-- Join two tables
SELECT users.name, orders.total
FROM users
JOIN orders ON users.id = orders.user_id
WHERE orders.total > 100;

-- Left join — include users with no orders
SELECT users.name, orders.total
FROM users
LEFT JOIN orders ON users.id = orders.user_id;
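
The difference between the two joins is easiest to see with data. The following sketch runs both against an in-memory SQLite database from Python; the two sample users and their orders are invented for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users  (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE orders (id INTEGER PRIMARY KEY,
                         user_id INTEGER REFERENCES users(id),
                         total REAL NOT NULL);
    INSERT INTO users  (id, name) VALUES (1, 'Alice'), (2, 'Bob');
    INSERT INTO orders (user_id, total) VALUES (1, 150.0), (1, 40.0);
""")

inner = conn.execute("""
    SELECT users.name, orders.total
    FROM users JOIN orders ON users.id = orders.user_id
    ORDER BY users.name, orders.total
""").fetchall()

left = conn.execute("""
    SELECT users.name, orders.total
    FROM users LEFT JOIN orders ON users.id = orders.user_id
    ORDER BY users.name, orders.total
""").fetchall()

print(inner)  # only Alice's rows: Bob has no orders
print(left)   # Bob appears once, with total = None
conn.close()
```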

SQLite — The Built-in Database

SQLite is included with Python — no installation needed. Data lives in a single .db file. Perfect for prototypes, local apps, and learning.

Connecting and creating tables

import sqlite3

# Connect — creates the file if it doesn't exist
conn = sqlite3.connect("myapp.db")

# Use a context manager for automatic commit/rollback
with conn:
    conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            name       TEXT    NOT NULL,
            email      TEXT    UNIQUE NOT NULL,
            age        INTEGER DEFAULT 0,
            created_at TEXT    DEFAULT (datetime('now'))
        )
    """)

conn.close()

Inserting data safely

Always use parameterized queries — never format user input directly into SQL. Direct formatting opens the door to SQL injection attacks.

conn = sqlite3.connect("myapp.db")

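# WRONG: SQL injection vulnerability (shown for illustration; do not run)
name = "Alice'; DROP TABLE users; --"
query = f"INSERT INTO users (name, email) VALUES ('{name}', 'a@b.com')"
# conn.execute(query)   # the attacker's text would become part of the SQL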

# RIGHT — parameterized query (? placeholders)
conn.execute(
    "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
    ("Alice", "alice@example.com", 30)
)
conn.commit()

# Insert many rows at once
users = [
    ("Bob",    "bob@example.com",    25),
    ("Carlos", "carlos@example.com", 35),
    ("Diana",  "diana@example.com",  28),
]
conn.executemany(
    "INSERT OR IGNORE INTO users (name, email, age) VALUES (?, ?, ?)",
    users
)
conn.commit()
conn.close()
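
As a quick check of why placeholders are safe, the sketch below stores a hostile string through a ? placeholder in an in-memory database. The single-column table is an assumption made to keep the example short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

hostile = "Alice'; DROP TABLE users; --"

# The value travels separately from the SQL text, so the quote and the
# DROP TABLE inside it are never parsed as SQL.
conn.execute("INSERT INTO users (name) VALUES (?)", (hostile,))
conn.commit()

stored = conn.execute("SELECT name FROM users").fetchone()[0]
table_count = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'users'"
).fetchone()[0]

print(stored == hostile)  # the text round-trips unchanged
print(table_count)        # the users table still exists
conn.close()
```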

Querying data

conn = sqlite3.connect("myapp.db")

# Make rows behave like dicts
conn.row_factory = sqlite3.Row

cursor = conn.execute("SELECT * FROM users ORDER BY name")

for row in cursor:
    print(f"{row['name']:<15} {row['email']:<30} age={row['age']}")

# Fetch methods
cursor = conn.execute("SELECT * FROM users WHERE age > ?", (27,))
one    = cursor.fetchone()    # first row or None
some   = cursor.fetchmany(3)  # list of up to 3 rows
all_   = cursor.fetchall()    # list of all remaining rows

# Parameterized query with named placeholders
cursor = conn.execute(
    "SELECT * FROM users WHERE name = :name AND age >= :min_age",
    {"name": "Alice", "min_age": 18}
)

conn.close()
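
sqlite3.Row is dict-like rather than an actual dict. A small sketch of what it supports, using an in-memory table invented for the example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
conn.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("Alice", 30))

row = conn.execute("SELECT id, name, age FROM users").fetchone()

by_name  = row["name"]    # access by column name
by_index = row[1]         # access by position still works
columns  = row.keys()     # column names, in SELECT order
as_dict  = dict(row)      # copy into a real dict, e.g. for JSON output

print(by_name, by_index, columns, as_dict)
conn.close()
```

Converting with dict(row) is useful when a function downstream expects a plain dictionary.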

Transactions

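A transaction groups multiple statements into an atomic unit: either all succeed or none do. By default, Python's sqlite3 module opens a transaction implicitly before statements that modify data, and you end it explicitly with commit() or rollback(). The example assumes an accounts table with user_id and balance columns already exists: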

conn = sqlite3.connect("myapp.db")

try:
    conn.execute("INSERT INTO accounts (user_id, balance) VALUES (1, 1000)")
    conn.execute("INSERT INTO accounts (user_id, balance) VALUES (2, 500)")

    # Transfer $200 from account 1 to account 2
    conn.execute("UPDATE accounts SET balance = balance - 200 WHERE user_id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 200 WHERE user_id = 2")

    conn.commit()   # both updates succeed together
except Exception:
    conn.rollback()  # both updates undone if anything fails
    raise
finally:
    conn.close()
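
The same all-or-nothing behavior can be observed in a self-contained sketch. It uses the connection as a context manager, which commits on success and rolls back if an exception escapes the block. The CHECK constraint and the transfer and balances helpers are assumptions added for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE accounts (
        user_id INTEGER PRIMARY KEY,
        balance INTEGER NOT NULL CHECK (balance >= 0)
    )
""")
with conn:
    conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 1000), (2, 500)])

def transfer(amount: int) -> bool:
    """Move `amount` from account 1 to account 2; return False if it failed."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE user_id = 2",
                (amount,))
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE user_id = 1",
                (amount,))
        return True
    except sqlite3.IntegrityError:
        return False

def balances() -> list:
    return conn.execute(
        "SELECT user_id, balance FROM accounts ORDER BY user_id").fetchall()

ok = transfer(200)        # both updates are committed together
failed = transfer(5000)   # the debit violates the CHECK, so the credit is undone
print(ok, failed, balances())
```

In the failed transfer the credit to account 2 had already run, yet the final balances show no trace of it.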

A full SQLite example

"""
contacts.py — A simple contact book backed by SQLite.
"""
import sqlite3
from pathlib import Path
from typing import Optional


DB_PATH = Path("contacts.db")


def get_connection() -> sqlite3.Connection:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")   # enforce FK constraints
    conn.execute("PRAGMA journal_mode = WAL")  # better concurrent access
    return conn


def setup_schema(conn: sqlite3.Connection) -> None:
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS contacts (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            name       TEXT    NOT NULL,
            email      TEXT    UNIQUE,
            phone      TEXT,
            notes      TEXT    DEFAULT '',
            created_at TEXT    DEFAULT (datetime('now'))
        );

        CREATE INDEX IF NOT EXISTS idx_contacts_name
            ON contacts(name COLLATE NOCASE);
    """)
    conn.commit()


def add_contact(conn, name: str, email: Optional[str] = None,
                phone: Optional[str] = None, notes: str = "") -> int:
    cursor = conn.execute(
        "INSERT INTO contacts (name, email, phone, notes) VALUES (?, ?, ?, ?)",
        (name, email, phone, notes)
    )
    conn.commit()
    return cursor.lastrowid   # ID of the newly inserted row


def find_by_name(conn, query: str) -> list:
    return conn.execute(
        "SELECT * FROM contacts WHERE name LIKE ? ORDER BY name",
        (f"%{query}%",)
    ).fetchall()


def find_by_id(conn, contact_id: int) -> Optional[sqlite3.Row]:
    return conn.execute(
        "SELECT * FROM contacts WHERE id = ?", (contact_id,)
    ).fetchone()


def update_contact(conn, contact_id: int, **fields) -> bool:
    if not fields:
        return False
    allowed = {"name", "email", "phone", "notes"}
    updates = {k: v for k, v in fields.items() if k in allowed}
    if not updates:
        return False
    set_clause = ", ".join(f"{k} = ?" for k in updates)
    cursor = conn.execute(
        f"UPDATE contacts SET {set_clause} WHERE id = ?",
        [*updates.values(), contact_id]
    )
    conn.commit()
    return cursor.rowcount > 0


def delete_contact(conn, contact_id: int) -> bool:
    cursor = conn.execute("DELETE FROM contacts WHERE id = ?", (contact_id,))
    conn.commit()
    return cursor.rowcount > 0


def list_all(conn) -> list:
    return conn.execute("SELECT * FROM contacts ORDER BY name").fetchall()


def count(conn) -> int:
    return conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]


# ── Demo ──────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    conn = get_connection()
    setup_schema(conn)

    # Add contacts
    id1 = add_contact(conn, "Alice Smith",  "alice@example.com", "555-0101")
    id2 = add_contact(conn, "Bob Jones",    "bob@example.com",   "555-0102")
    id3 = add_contact(conn, "Alice Walker", "awalker@example.com")

    print(f"Added {count(conn)} contacts\n")

    # Search
    results = find_by_name(conn, "alice")
    print(f"Search 'alice': {len(results)} results")
    for r in results:
        print(f"  [{r['id']}] {r['name']} - {r['email']}")

    # Update
    update_contact(conn, id1, phone="555-9999", notes="Updated phone")
    updated = find_by_id(conn, id1)
    print(f"\nUpdated: {updated['name']} - {updated['phone']}")

    # Delete
    delete_contact(conn, id2)
    print(f"\nAfter delete: {count(conn)} contacts")

    conn.close()

PostgreSQL with psycopg2

For production applications, you want a full-featured database server. PostgreSQL is a mature open-source relational database that handles large datasets, many concurrent users, and advanced queries, with full ACID compliance.

pip install psycopg2-binary

import psycopg2
from psycopg2.extras import RealDictCursor

# Connect
conn = psycopg2.connect(
    host     = "localhost",
    port     = 5432,
    dbname   = "myapp",
    user     = "myuser",
    password = "mypassword",
)

# Use RealDictCursor so rows behave like dicts
with conn.cursor(cursor_factory=RealDictCursor) as cur:
    # Create table
    cur.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id          SERIAL PRIMARY KEY,
            name        TEXT    NOT NULL,
            price       NUMERIC(10, 2) NOT NULL,
            stock       INTEGER DEFAULT 0,
            category    TEXT,
            created_at  TIMESTAMPTZ DEFAULT NOW()
        )
    """)

    # Insert — psycopg2 uses %s placeholders
    cur.execute(
        "INSERT INTO products (name, price, stock, category) VALUES (%s, %s, %s, %s)",
        ("Python Book", 29.99, 100, "Education")
    )

    # Insert many
    products = [
        ("Laptop",   999.99, 50,  "Electronics"),
        ("Keyboard",  49.99, 200, "Electronics"),
        ("Notebook",   4.99, 500, "Stationery"),
    ]
    cur.executemany(
        "INSERT INTO products (name, price, stock, category) VALUES (%s, %s, %s, %s)",
        products
    )

    # Query
    cur.execute("SELECT * FROM products WHERE price < %s ORDER BY price", (100,))
    for row in cur.fetchall():
        print(f"{row['name']}: ${row['price']}")

    conn.commit()

conn.close()

Connection pooling with psycopg2.pool

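Opening a new database connection for every request is expensive: each one pays for a TCP handshake, authentication, and a new server process, which can add tens of milliseconds or more depending on network and configuration. Connection pooling keeps a set of open connections and reuses them: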

from typing import Optional

from psycopg2 import pool
from psycopg2.extras import RealDictCursor

connection_pool = pool.ThreadedConnectionPool(
    minconn = 2,
    maxconn = 10,
    host    = "localhost",
    dbname  = "myapp",
    user    = "myuser",
    password= "mypassword",
)

def get_user(user_id: int) -> Optional[dict]:
    conn = connection_pool.getconn()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return cur.fetchone()
    finally:
        connection_pool.putconn(conn)   # return to pool, not close
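
To see the reuse idea without a running PostgreSQL server, here is a toy pool built on queue.Queue and sqlite3. The class SimplePool and its connection() method are hypothetical names for this sketch, and this is not how psycopg2 implements its pools. It only shows the borrow-and-return pattern.

```python
import queue
import sqlite3
from contextlib import contextmanager

class SimplePool:
    """Toy connection pool (hypothetical): hands out pre-opened connections."""

    def __init__(self, size: int, database: str = ":memory:"):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            # check_same_thread=False lets any thread borrow a connection
            self._idle.put(sqlite3.connect(database, check_same_thread=False))

    @contextmanager
    def connection(self):
        conn = self._idle.get()      # blocks if every connection is in use
        try:
            yield conn
        finally:
            self._idle.put(conn)     # hand it back instead of closing it

pool = SimplePool(size=1)

with pool.connection() as first:
    first_id = id(first)
    answer = first.execute("SELECT 1 + 1").fetchone()[0]

with pool.connection() as second:
    reused = id(second) == first_id   # same object: nothing was reopened

print(answer, reused)
```

Wrapping getconn() and putconn() in a context manager like this is one way to make sure a connection always goes back to the pool, even when the query raises.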

SQLAlchemy — The Python ORM

Writing raw SQL for every query works, but it's verbose and error-prone for complex apps. SQLAlchemy is Python's most popular ORM (Object-Relational Mapper). It lets you define your data model as Python classes and query using Python syntax.

pip install sqlalchemy

Defining models

from sqlalchemy import (
    create_engine, Column, Integer, String, Float, Boolean,
    ForeignKey, DateTime, Text, func
)
from sqlalchemy.orm import DeclarativeBase, relationship, Session
from datetime import datetime


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id         = Column(Integer, primary_key=True, autoincrement=True)
    name       = Column(String(100), nullable=False)
    email      = Column(String(200), unique=True, nullable=False)
    age        = Column(Integer, default=0)
    active     = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationship — one user has many orders
    orders = relationship("Order", back_populates="user", cascade="all, delete-orphan")

    def __repr__(self):
        return f"User(id={self.id}, name={self.name!r}, email={self.email!r})"


class Order(Base):
    __tablename__ = "orders"

    id         = Column(Integer, primary_key=True, autoincrement=True)
    user_id    = Column(Integer, ForeignKey("users.id"), nullable=False)
    total      = Column(Float,   nullable=False)
    status     = Column(String(20), default="pending")
    created_at = Column(DateTime, default=datetime.utcnow)
    notes      = Column(Text, default="")

    # Relationship — each order belongs to one user
    user  = relationship("User", back_populates="orders")

    def __repr__(self):
        return f"Order(id={self.id}, total={self.total}, status={self.status!r})"

Creating the database and session

# SQLite for this example (change the URL for PostgreSQL)
engine = create_engine(
    "sqlite:///shop.db",
    echo=False,   # set True to print all SQL
)

# Create all tables
Base.metadata.create_all(engine)

# Session factory
from sqlalchemy.orm import sessionmaker
SessionLocal = sessionmaker(bind=engine)

CRUD with SQLAlchemy

# ── Create ─────────────────────────────────────────────────────────────────────

with SessionLocal() as session:
    alice = User(name="Alice", email="alice@example.com", age=30)
    bob   = User(name="Bob",   email="bob@example.com",   age=25)
    session.add_all([alice, bob])
    session.flush()   # assigns IDs without committing

    # Add orders for Alice
    session.add(Order(user_id=alice.id, total=59.98, status="completed"))
    session.add(Order(user_id=alice.id, total=129.99, status="pending"))
    session.add(Order(user_id=bob.id,   total=19.99,  status="completed"))

    session.commit()
    print(f"Created: {alice}, {bob}")


# ── Read ───────────────────────────────────────────────────────────────────────

with SessionLocal() as session:
    # Get by primary key
    user = session.get(User, 1)
    print(user)

    # Filter
    active_users = session.query(User).filter(User.active == True).all()
    young_users  = session.query(User).filter(User.age < 30).all()
    alice        = session.query(User).filter_by(email="alice@example.com").first()

    # Order and limit
    top_users = session.query(User).order_by(User.name).limit(10).all()

    # Access related objects (lazy loaded by default)
    for user in session.query(User).all():
        print(f"{user.name}: {len(user.orders)} orders")


# ── Update ─────────────────────────────────────────────────────────────────────

with SessionLocal() as session:
    user = session.query(User).filter_by(email="alice@example.com").first()
    if user:
        user.age = 31
        user.name = "Alice Smith"
        session.commit()
        print(f"Updated: {user}")


# ── Delete ─────────────────────────────────────────────────────────────────────

with SessionLocal() as session:
    user = session.get(User, 2)
    if user:
        session.delete(user)   # cascade deletes orders too
        session.commit()
        print("Deleted Bob and all his orders")

Querying with joins and aggregates

from sqlalchemy import func, and_, or_

with SessionLocal() as session:
    # Join users and orders
    results = (
        session.query(User.name, Order.total, Order.status)
        .join(Order, User.id == Order.user_id)
        .filter(Order.total > 50)
        .order_by(Order.total.desc())
        .all()
    )
    for name, total, status in results:
        print(f"{name}: ${total:.2f} ({status})")

    # Aggregate — total spent per user
    spending = (
        session.query(
            User.name,
            func.count(Order.id).label("order_count"),
            func.sum(Order.total).label("total_spent"),
            func.avg(Order.total).label("avg_order"),
        )
        .join(Order, isouter=True)   # LEFT JOIN — include users with no orders
        .group_by(User.id)
        .order_by(func.sum(Order.total).desc())
        .all()
    )
    print("\nSpending by user:")
    for row in spending:
        print(f"  {row.name}: {row.order_count} orders, "
              f"${row.total_spent or 0:.2f} total")

    # Complex filter
    high_value_pending = (
        session.query(Order)
        .filter(
            and_(
                Order.status == "pending",
                Order.total > 100,
            )
        )
        .all()
    )

SQLAlchemy 2.0 style (the modern way)

SQLAlchemy 2.0 makes select() the standard way to build ORM queries. The session.query() API used above still works but is treated as legacy:

from sqlalchemy import select

with SessionLocal() as session:
    # Simple select
    stmt = select(User).where(User.age > 25).order_by(User.name)
    users = session.scalars(stmt).all()

    # Join
    stmt = (
        select(User, Order)
        .join(Order, User.id == Order.user_id)
        .where(Order.status == "pending")
    )
    rows = session.execute(stmt).all()

    # Update in bulk
    from sqlalchemy import update
    session.execute(
        update(Order)
        .where(Order.status == "pending")
        .values(status="processing")
    )
    session.commit()

Database Migrations with Alembic

When your database schema needs to change (adding a column, renaming a table, creating an index), you need migrations. Migrations are version-controlled scripts that upgrade and downgrade your schema safely. With Alembic, each migration is a Python file.

pip install alembic
alembic init migrations   # creates alembic.ini and migrations/ folder

Configure alembic.ini:

sqlalchemy.url = sqlite:///myapp.db

Configure migrations/env.py to point to your models:

from myapp.models import Base
target_metadata = Base.metadata

Generate a migration automatically from model changes:

alembic revision --autogenerate -m "add phone to users"

This produces a file like migrations/versions/abc123_add_phone_to_users.py (revision identifiers omitted here):

from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column("users", sa.Column("phone", sa.String(20), nullable=True))

def downgrade():
    op.drop_column("users", "phone")

Apply all pending migrations:

alembic upgrade head

Roll back one migration:

alembic downgrade -1

Check the current version and the migration history:

alembic current
alembic history
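
The idea behind "upgrade head" can be sketched in a few lines of plain sqlite3. This is a toy illustration of versioned migrations, not how Alembic works internally. The MIGRATIONS list and both helper functions are hypothetical, and the version number is kept in SQLite's built-in user_version pragma.

```python
import sqlite3

# Each entry upgrades the schema by one version (hypothetical migrations).
MIGRATIONS = [
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)",
    "ALTER TABLE users ADD COLUMN phone TEXT",
]

def current_version(conn: sqlite3.Connection) -> int:
    return conn.execute("PRAGMA user_version").fetchone()[0]

def upgrade_to_head(conn: sqlite3.Connection) -> int:
    """Apply every migration newer than the stored version; return the new version."""
    for version in range(current_version(conn), len(MIGRATIONS)):
        conn.execute(MIGRATIONS[version])
        # PRAGMA values cannot be bound as parameters; version is always an int here
        conn.execute(f"PRAGMA user_version = {version + 1}")
        conn.commit()
    return current_version(conn)

conn = sqlite3.connect(":memory:")
first_run  = upgrade_to_head(conn)   # applies both migrations
second_run = upgrade_to_head(conn)   # nothing left to apply
columns = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
print(first_run, second_run, columns)
```

Alembic adds what this sketch lacks: downgrades, autogeneration from models, and a history of named revisions.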

Project: A Complete User + Order System

"""
shop_db.py — A complete shop database with SQLAlchemy.
"""
from __future__ import annotations

from datetime import datetime
from typing import Optional

from sqlalchemy import (
    Boolean, Column, DateTime, Float, ForeignKey,
    Integer, String, Text, create_engine, func
)
from sqlalchemy.orm import (
    DeclarativeBase, Session, relationship, sessionmaker
)


# ── Models ────────────────────────────────────────────────────────────────────

class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id         = Column(Integer, primary_key=True)
    name       = Column(String(100), nullable=False)
    email      = Column(String(200), unique=True, nullable=False)
    active     = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    orders = relationship("Order", back_populates="user",
                          cascade="all, delete-orphan")

    @property
    def total_spent(self) -> float:
        return sum(o.total for o in self.orders if o.status == "completed")

    def __repr__(self):
        return f"User({self.id}, {self.name!r})"


class Product(Base):
    __tablename__ = "products"

    id       = Column(Integer, primary_key=True)
    name     = Column(String(200), nullable=False)
    price    = Column(Float, nullable=False)
    stock    = Column(Integer, default=0)
    category = Column(String(50))

    order_items = relationship("OrderItem", back_populates="product")

    def __repr__(self):
        return f"Product({self.id}, {self.name!r}, ${self.price:.2f})"


class Order(Base):
    __tablename__ = "orders"

    id         = Column(Integer, primary_key=True)
    user_id    = Column(Integer, ForeignKey("users.id"), nullable=False)
    status     = Column(String(20), default="pending")
    created_at = Column(DateTime, default=datetime.utcnow)
    notes      = Column(Text, default="")

    user  = relationship("User",      back_populates="orders")
    items = relationship("OrderItem", back_populates="order",
                         cascade="all, delete-orphan")

    @property
    def total(self) -> float:
        return sum(item.subtotal for item in self.items)

    def __repr__(self):
        return f"Order({self.id}, user={self.user_id}, ${self.total:.2f})"


class OrderItem(Base):
    __tablename__ = "order_items"

    id         = Column(Integer, primary_key=True)
    order_id   = Column(Integer, ForeignKey("orders.id"), nullable=False)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
    quantity   = Column(Integer, default=1)
    unit_price = Column(Float, nullable=False)

    order   = relationship("Order",   back_populates="items")
    product = relationship("Product", back_populates="order_items")

    @property
    def subtotal(self) -> float:
        return self.unit_price * self.quantity


# ── Database setup ────────────────────────────────────────────────────────────

engine       = create_engine("sqlite:///shop.db", echo=False)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)


# ── Service layer ─────────────────────────────────────────────────────────────

class ShopService:
    def __init__(self, session: Session):
        self.session = session

    # Users
    def create_user(self, name: str, email: str) -> User:
        user = User(name=name, email=email)
        self.session.add(user)
        self.session.commit()
        return user

    def get_user(self, user_id: int) -> Optional[User]:
        return self.session.get(User, user_id)

    def list_users(self, active_only: bool = True) -> list[User]:
        q = self.session.query(User)
        if active_only:
            q = q.filter(User.active == True)
        return q.order_by(User.name).all()

    # Products
    def create_product(self, name: str, price: float,
                       stock: int, category: str = "") -> Product:
        p = Product(name=name, price=price, stock=stock, category=category)
        self.session.add(p)
        self.session.commit()
        return p

    def list_products(self, category: Optional[str] = None) -> list[Product]:
        q = self.session.query(Product).filter(Product.stock > 0)
        if category:
            q = q.filter(Product.category == category)
        return q.order_by(Product.name).all()

    # Orders
    def create_order(self, user_id: int, items: list[dict]) -> Order:
        """
        items: list of {"product_id": int, "quantity": int}
        """
        try:
            order = Order(user_id=user_id)
            self.session.add(order)
            self.session.flush()   # get order.id

            for item_data in items:
                product = self.session.get(Product, item_data["product_id"])
                if not product:
                    raise ValueError(f"Product {item_data['product_id']} not found.")
                qty = item_data["quantity"]
                if product.stock < qty:
                    raise ValueError(
                        f"Insufficient stock for {product.name}: "
                        f"{product.stock} available, {qty} requested."
                    )
                product.stock -= qty
                order_item = OrderItem(
                    order_id   = order.id,
                    product_id = product.id,
                    quantity   = qty,
                    unit_price = product.price,
                )
                self.session.add(order_item)

            self.session.commit()
        except Exception:
            # Undo the half-built order and any stock already decremented
            self.session.rollback()
            raise
        return order

    def complete_order(self, order_id: int) -> Order:
        order = self.session.get(Order, order_id)
        if not order:
            raise ValueError(f"Order {order_id} not found.")
        order.status = "completed"
        self.session.commit()
        return order

    # Reports
    def sales_report(self) -> list[dict]:
        rows = (
            self.session.query(
                User.name.label("customer"),
                # DISTINCT: the join yields one row per item, not per order
                func.count(Order.id.distinct()).label("orders"),
                func.sum(OrderItem.unit_price * OrderItem.quantity).label("spent"),
            )
            .join(Order, User.id == Order.user_id)
            .join(OrderItem, Order.id == OrderItem.order_id)
            .filter(Order.status == "completed")
            .group_by(User.id)
            .order_by(func.sum(OrderItem.unit_price * OrderItem.quantity).desc())
            .all()
        )
        return [{"customer": r.customer, "orders": r.orders,
                 "spent": float(r.spent)} for r in rows]


# ── Demo ──────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    with SessionLocal() as session:
        svc = ShopService(session)

        # Seed
        alice   = svc.create_user("Alice", "alice@example.com")
        bob     = svc.create_user("Bob",   "bob@example.com")

        book    = svc.create_product("Python Book",  29.99, 100, "Education")
        laptop  = svc.create_product("Laptop",       999.99, 20, "Electronics")
        pen     = svc.create_product("Pen",            2.50, 500, "Stationery")

        # Place orders
        order1 = svc.create_order(alice.id, [
            {"product_id": book.id,   "quantity": 2},
            {"product_id": pen.id,    "quantity": 5},
        ])
        order2 = svc.create_order(bob.id, [
            {"product_id": laptop.id, "quantity": 1},
        ])

        svc.complete_order(order1.id)
        svc.complete_order(order2.id)

        # Report
        print("\n── Sales Report ──")
        for row in svc.sales_report():
            print(f"  {row['customer']:<15} {row['orders']} orders  "
                  f"${row['spent']:.2f} spent")

        # Product stock
        print("\n── Current Stock ──")
        for p in svc.list_products():
            print(f"  {p.name:<20} ${p.price:>8.2f}  stock={p.stock}")

SQLite vs PostgreSQL — When to Use Which

Feature | SQLite | PostgreSQL
Setup | None (bundled with Python) | Server required
Concurrent writes | Limited (one writer at a time) | Full concurrency
Data size | Up to ~281 TB (theoretical limit) | Effectively unlimited
Performance | Excellent for reads | Excellent for reads and writes
Data types | Flexible (dynamic types) | Strict (typed)
JSON support | Basic | Full jsonb with indexing
Full-text search | Basic | Built-in, powerful
Best for | Local apps, prototyping, testing | Production apps, multi-user

Rule of thumb: Start with SQLite. Switch to PostgreSQL when you need multiple concurrent writers, a separate database server, or advanced SQL features.
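
The "Data types" row is worth seeing in action, because it affects what bad data can slip into a table. In this sketch (the readings table is invented for the example), SQLite accepts a non-numeric string in an INTEGER column, where PostgreSQL would reject the insert with an error.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (value INTEGER)")

# SQLite converts values when it can and stores them unchanged when it can't.
conn.executemany(
    "INSERT INTO readings (value) VALUES (?)",
    [(42,), ("43",), ("not a number",)],
)

stored_types = [
    row[0]
    for row in conn.execute("SELECT typeof(value) FROM readings ORDER BY rowid")
]
print(stored_types)  # the string "43" became an integer; the last value stayed text
conn.close()
```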

What You Learned in This Chapter

  • Databases provide efficient search, sort, filter, and relate operations that files cannot.
  • SQL CRUD: INSERT, SELECT, UPDATE, DELETE. Always use parameterized queries — never format user input into SQL.
  • Transactions group statements atomically — all succeed or all roll back.
  • SQLite is built into Python, stores data in a single file, and is perfect for learning and small apps. conn.row_factory = sqlite3.Row makes rows behave like dicts.
  • cursor.lastrowid gives the ID of the last inserted row.
  • PostgreSQL handles production workloads. Use psycopg2 for raw access, connection pooling for performance.
  • SQLAlchemy ORM defines data models as Python classes, generates SQL for you, and manages relationships with relationship().
  • session.add(), session.commit(), session.query(), session.get() are the core ORM operations.
  • Alembic manages schema migrations — version-controlled changes to your database structure.
  • Use SQLite for prototypes, PostgreSQL for production.

What's Next?

Chapter 32 covers Web Development with Flask and FastAPI — building HTTP APIs, handling requests and responses, authentication, and serving data from your database over the web. Everything you've learned about Python, OOP, databases, and type hints comes together.

© 2026 Abhilash Sahoo. Python: Zero to Hero.