Chapter 31: Working with Databases
Files store data. But as soon as you need to search, filter, sort, or relate data across multiple collections, files become painful. Databases are purpose-built for exactly that.
This chapter covers three layers of database access in Python:
- SQLite — built-in, zero setup, perfect for learning and small apps
- PostgreSQL with psycopg2 — production-grade relational database
- SQLAlchemy — the standard Python ORM, lets you work with databases using Python objects instead of raw SQL
You'll understand SQL well enough to write real queries, and know when to use raw SQL vs an ORM.
Why Databases?
Consider storing 100,000 users in a CSV file. Finding a user by email means reading every line — O(n). Sorting by last name means loading everything into memory. Joining user data with their orders means matching records manually in Python.
A database does all of this efficiently:
- Indexes make lookups O(log n) or O(1)
- Queries happen in the database engine, not Python memory
- Relationships are enforced by the schema
- Multiple programs can read and write safely at the same time
- Transactions ensure your data is always consistent
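The index speed-up is easy to see for yourself with SQLite's EXPLAIN QUERY PLAN. A quick sketch using the built-in sqlite3 module (the table and index names here are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [(f"user{i}@example.com",) for i in range(1_000)],
)

def lookup_plan(conn):
    # The last column of an EXPLAIN QUERY PLAN row describes the strategy
    row = conn.execute(
        "EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?",
        ("user500@example.com",),
    ).fetchone()
    return row[-1]

before = lookup_plan(conn)  # no index yet: a full table scan
conn.execute("CREATE INDEX idx_users_email ON users(email)")
after = lookup_plan(conn)   # now a logarithmic-time index search

print(before)  # e.g. "SCAN users"
print(after)   # e.g. "SEARCH users USING INDEX idx_users_email (email=?)"
```

The exact wording of the plan varies between SQLite versions, but the shift from SCAN to SEARCH is the whole story: the engine stops reading every row.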
SQL Basics
SQL (Structured Query Language) is the language all relational databases use. The same core syntax works in SQLite, PostgreSQL, MySQL, and others.
The four fundamental operations — CRUD:
-- Create
INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
-- Read
SELECT name, email FROM users WHERE age > 25 ORDER BY name;
-- Update
UPDATE users SET age = 31 WHERE email = 'alice@example.com';
-- Delete
DELETE FROM users WHERE email = 'alice@example.com';
Other essential SQL:
-- Create a table
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    email TEXT UNIQUE NOT NULL,
    age INTEGER DEFAULT 0,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
-- Count rows
SELECT COUNT(*) FROM users;
-- Aggregate functions
SELECT AVG(age), MIN(age), MAX(age) FROM users;
-- Group and aggregate
SELECT age, COUNT(*) AS count FROM users GROUP BY age ORDER BY age;
-- Filter groups
SELECT age, COUNT(*) AS count FROM users
GROUP BY age
HAVING COUNT(*) > 1;
-- Join two tables
SELECT users.name, orders.total
FROM users
JOIN orders ON users.id = orders.user_id
WHERE orders.total > 100;
-- Left join — include users with no orders
SELECT users.name, orders.total
FROM users
LEFT JOIN orders ON users.id = orders.user_id;
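You can try any of these statements from Python against a throwaway in-memory SQLite database. A small sketch (the sample rows are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        age INTEGER DEFAULT 0
    )
""")
conn.executemany(
    "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
    [
        ("Alice", "alice@example.com", 30),
        ("Bob", "bob@example.com", 25),
        ("Carol", "carol@example.com", 30),
    ],
)

total = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(total)  # → 3

# GROUP BY + HAVING: ages shared by more than one user
shared = conn.execute("""
    SELECT age, COUNT(*) AS count FROM users
    GROUP BY age
    HAVING COUNT(*) > 1
""").fetchall()
print(shared)  # → [(30, 2)]
```

Because the database lives in memory, it vanishes when the program exits, which makes it ideal for experimenting with SQL.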
SQLite — The Built-in Database
SQLite is included with Python — no installation needed. Data lives in a single .db file. Perfect for prototypes, local apps, and learning.
Connecting and creating tables
import sqlite3

# Connect — creates the file if it doesn't exist
conn = sqlite3.connect("myapp.db")

# Use a context manager for automatic commit/rollback
with conn:
    conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            age INTEGER DEFAULT 0,
            created_at TEXT DEFAULT (datetime('now'))
        )
    """)

conn.close()
Inserting data safely
Always use parameterized queries — never format user input directly into SQL. Direct formatting opens the door to SQL injection attacks.
conn = sqlite3.connect("myapp.db")

# WRONG — SQL injection vulnerability
# (sqlite3 happens to reject multi-statement strings, but other drivers may not)
name = "Alice'; DROP TABLE users; --"
conn.execute(f"INSERT INTO users (name, email) VALUES ('{name}', 'a@b.com')")

# RIGHT — parameterized query (? placeholders)
conn.execute(
    "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
    ("Alice", "alice@example.com", 30)
)
conn.commit()

# Insert many rows at once
users = [
    ("Bob", "bob@example.com", 25),
    ("Carlos", "carlos@example.com", 35),
    ("Diana", "diana@example.com", 28),
]
conn.executemany(
    "INSERT OR IGNORE INTO users (name, email, age) VALUES (?, ?, ?)",
    users
)
conn.commit()
conn.close()
Querying data
conn = sqlite3.connect("myapp.db")

# Make rows behave like dicts
conn.row_factory = sqlite3.Row

cursor = conn.execute("SELECT * FROM users ORDER BY name")
for row in cursor:
    print(f"{row['name']:<15} {row['email']:<30} age={row['age']}")

# Fetch methods
cursor = conn.execute("SELECT * FROM users WHERE age > ?", (27,))
one = cursor.fetchone()     # first row or None
some = cursor.fetchmany(3)  # list of up to 3 rows
all_ = cursor.fetchall()    # list of all remaining rows

# Parameterized query with named placeholders
cursor = conn.execute(
    "SELECT * FROM users WHERE name = :name AND age >= :min_age",
    {"name": "Alice", "min_age": 18}
)
conn.close()
Transactions
A transaction groups multiple statements into an atomic unit. Either all succeed or none do. SQLite uses transactions automatically, but you control them explicitly with commit() and rollback():
conn = sqlite3.connect("myapp.db")
try:
    conn.execute("INSERT INTO accounts (user_id, balance) VALUES (1, 1000)")
    conn.execute("INSERT INTO accounts (user_id, balance) VALUES (2, 500)")

    # Transfer $200 from account 1 to account 2
    conn.execute("UPDATE accounts SET balance = balance - 200 WHERE user_id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 200 WHERE user_id = 2")

    conn.commit()    # both updates succeed together
except Exception:
    conn.rollback()  # both updates undone if anything fails
    raise
finally:
    conn.close()
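The same guarantee takes less ceremony if you use the connection as a context manager: sqlite3 commits when the block succeeds and rolls back when it raises. A sketch with an in-memory database and a simulated failure between the two updates:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (user_id INTEGER PRIMARY KEY, balance INTEGER)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 1000), (2, 500)])
conn.commit()

try:
    with conn:  # BEGIN ... COMMIT on success, ROLLBACK on exception
        conn.execute("UPDATE accounts SET balance = balance - 200 WHERE user_id = 1")
        raise RuntimeError("simulated crash between debit and credit")
except RuntimeError:
    pass

# The half-finished transfer was rolled back, so balances are untouched
balances = conn.execute("SELECT balance FROM accounts ORDER BY user_id").fetchall()
print(balances)  # → [(1000,), (500,)]
conn.close()
```

Note that `with conn:` manages the transaction only; it does not close the connection.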
A full SQLite example
"""
contacts.py — A simple contact book backed by SQLite.
"""
import sqlite3
from pathlib import Path
from typing import Optional

DB_PATH = Path("contacts.db")

def get_connection() -> sqlite3.Connection:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")   # enforce FK constraints
    conn.execute("PRAGMA journal_mode = WAL")  # better concurrent access
    return conn

def setup_schema(conn: sqlite3.Connection) -> None:
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS contacts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE,
            phone TEXT,
            notes TEXT DEFAULT '',
            created_at TEXT DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS idx_contacts_name
            ON contacts(name COLLATE NOCASE);
    """)
    conn.commit()

def add_contact(conn, name: str, email: Optional[str] = None,
                phone: Optional[str] = None, notes: str = "") -> int:
    cursor = conn.execute(
        "INSERT INTO contacts (name, email, phone, notes) VALUES (?, ?, ?, ?)",
        (name, email, phone, notes)
    )
    conn.commit()
    return cursor.lastrowid  # ID of the newly inserted row

def find_by_name(conn, query: str) -> list:
    return conn.execute(
        "SELECT * FROM contacts WHERE name LIKE ? ORDER BY name",
        (f"%{query}%",)
    ).fetchall()

def find_by_id(conn, contact_id: int) -> Optional[sqlite3.Row]:
    return conn.execute(
        "SELECT * FROM contacts WHERE id = ?", (contact_id,)
    ).fetchone()

def update_contact(conn, contact_id: int, **fields) -> bool:
    allowed = {"name", "email", "phone", "notes"}
    updates = {k: v for k, v in fields.items() if k in allowed}
    if not updates:
        return False
    set_clause = ", ".join(f"{k} = ?" for k in updates)
    cursor = conn.execute(
        f"UPDATE contacts SET {set_clause} WHERE id = ?",
        [*updates.values(), contact_id]
    )
    conn.commit()
    return cursor.rowcount > 0

def delete_contact(conn, contact_id: int) -> bool:
    cursor = conn.execute("DELETE FROM contacts WHERE id = ?", (contact_id,))
    conn.commit()
    return cursor.rowcount > 0

def list_all(conn) -> list:
    return conn.execute("SELECT * FROM contacts ORDER BY name").fetchall()

def count(conn) -> int:
    return conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]

# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    conn = get_connection()
    setup_schema(conn)

    # Add contacts
    id1 = add_contact(conn, "Alice Smith", "alice@example.com", "555-0101")
    id2 = add_contact(conn, "Bob Jones", "bob@example.com", "555-0102")
    id3 = add_contact(conn, "Alice Walker", "awalker@example.com")
    print(f"Added {count(conn)} contacts\n")

    # Search
    results = find_by_name(conn, "alice")
    print(f"Search 'alice': {len(results)} results")
    for r in results:
        print(f"  [{r['id']}] {r['name']} — {r['email']}")

    # Update
    update_contact(conn, id1, phone="555-9999", notes="Updated phone")
    updated = find_by_id(conn, id1)
    print(f"\nUpdated: {updated['name']} — {updated['phone']}")

    # Delete
    delete_contact(conn, id2)
    print(f"\nAfter delete: {count(conn)} contacts")

    conn.close()
PostgreSQL with psycopg2
For production applications, you want a full-featured database server. PostgreSQL is one of the most capable open-source relational databases, handling large datasets, many concurrent users, and advanced queries with full ACID compliance.
pip install psycopg2-binary
import psycopg2
from psycopg2.extras import RealDictCursor

# Connect
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="myapp",
    user="myuser",
    password="mypassword",
)

# Use RealDictCursor so rows behave like dicts
with conn.cursor(cursor_factory=RealDictCursor) as cur:
    # Create table
    cur.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL,
            price NUMERIC(10, 2) NOT NULL,
            stock INTEGER DEFAULT 0,
            category TEXT,
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)

    # Insert — psycopg2 uses %s placeholders
    cur.execute(
        "INSERT INTO products (name, price, stock, category) VALUES (%s, %s, %s, %s)",
        ("Python Book", 29.99, 100, "Education")
    )

    # Insert many
    products = [
        ("Laptop", 999.99, 50, "Electronics"),
        ("Keyboard", 49.99, 200, "Electronics"),
        ("Notebook", 4.99, 500, "Stationery"),
    ]
    cur.executemany(
        "INSERT INTO products (name, price, stock, category) VALUES (%s, %s, %s, %s)",
        products
    )

    # Query
    cur.execute("SELECT * FROM products WHERE price < %s ORDER BY price", (100,))
    for row in cur.fetchall():
        print(f"{row['name']}: ${row['price']}")

conn.commit()
conn.close()
Connection pooling with psycopg2.pool
Opening a new database connection for every request is expensive; establishing the connection, authenticating, and setting up server-side state can take tens of milliseconds each time. Connection pooling keeps a set of open connections and reuses them:
from psycopg2 import pool
from psycopg2.extras import RealDictCursor

connection_pool = pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    host="localhost",
    dbname="myapp",
    user="myuser",
    password="mypassword",
)

def get_user(user_id: int) -> dict:
    conn = connection_pool.getconn()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return cur.fetchone()
    finally:
        connection_pool.putconn(conn)  # return to pool, not close
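The getconn()/putconn() pairing is easy to forget, so many codebases wrap it in a context manager that guarantees the connection goes back to the pool. Below is a sketch of that pattern; `pooled` is a hypothetical helper name, and the stand-in pool class exists only so the example runs without a database server:

```python
from contextlib import contextmanager

@contextmanager
def pooled(pool):
    """Borrow a connection from any pool exposing getconn()/putconn()."""
    conn = pool.getconn()
    try:
        yield conn
    finally:
        pool.putconn(conn)  # returned even if the body raises

# Stand-in pool so the pattern can be demonstrated without PostgreSQL
class FakePool:
    def __init__(self, size: int = 2):
        self._free = [f"conn-{i}" for i in range(size)]

    def getconn(self):
        return self._free.pop()

    def putconn(self, conn):
        self._free.append(conn)

pool = FakePool()
with pooled(pool) as conn:
    print(f"borrowed {conn}")  # one connection checked out
print(len(pool._free))         # → 2  (back in the pool afterwards)
```

With psycopg2 you would pass the real ThreadedConnectionPool instead of FakePool; the wrapper is identical.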
SQLAlchemy — The Python ORM
Writing raw SQL for every query works, but it's verbose and error-prone for complex apps. SQLAlchemy is Python's most popular ORM (Object-Relational Mapper). It lets you define your data model as Python classes and query using Python syntax.
pip install sqlalchemy
Defining models
from sqlalchemy import (
    create_engine, Column, Integer, String, Float, Boolean,
    ForeignKey, DateTime, Text, func
)
from sqlalchemy.orm import DeclarativeBase, relationship, Session
from datetime import datetime

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), nullable=False)
    email = Column(String(200), unique=True, nullable=False)
    age = Column(Integer, default=0)
    active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationship — one user has many orders
    orders = relationship("Order", back_populates="user",
                          cascade="all, delete-orphan")

    def __repr__(self):
        return f"User(id={self.id}, name={self.name!r}, email={self.email!r})"

class Order(Base):
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    total = Column(Float, nullable=False)
    status = Column(String(20), default="pending")
    created_at = Column(DateTime, default=datetime.utcnow)
    notes = Column(Text, default="")

    # Relationship — each order belongs to one user
    user = relationship("User", back_populates="orders")

    def __repr__(self):
        return f"Order(id={self.id}, total={self.total}, status={self.status!r})"
Creating the database and session
# SQLite for this example (change the URL for PostgreSQL)
engine = create_engine(
    "sqlite:///shop.db",
    echo=False,  # set True to print all SQL
)

# Create all tables
Base.metadata.create_all(engine)

# Session factory
from sqlalchemy.orm import sessionmaker
SessionLocal = sessionmaker(bind=engine)
CRUD with SQLAlchemy
# ── Create ─────────────────────────────────────────────────────────────────────
with SessionLocal() as session:
    alice = User(name="Alice", email="alice@example.com", age=30)
    bob = User(name="Bob", email="bob@example.com", age=25)
    session.add_all([alice, bob])
    session.flush()  # assigns IDs without committing

    # Add some orders for Alice and Bob
    session.add(Order(user_id=alice.id, total=59.98, status="completed"))
    session.add(Order(user_id=alice.id, total=129.99, status="pending"))
    session.add(Order(user_id=bob.id, total=19.99, status="completed"))
    session.commit()
    print(f"Created: {alice}, {bob}")

# ── Read ───────────────────────────────────────────────────────────────────────
with SessionLocal() as session:
    # Get by primary key
    user = session.get(User, 1)
    print(user)

    # Filter
    active_users = session.query(User).filter(User.active == True).all()
    young_users = session.query(User).filter(User.age < 30).all()
    alice = session.query(User).filter_by(email="alice@example.com").first()

    # Order and limit
    top_users = session.query(User).order_by(User.name).limit(10).all()

    # Access related objects (lazy loaded by default)
    for user in session.query(User).all():
        print(f"{user.name}: {len(user.orders)} orders")

# ── Update ─────────────────────────────────────────────────────────────────────
with SessionLocal() as session:
    user = session.query(User).filter_by(email="alice@example.com").first()
    if user:
        user.age = 31
        user.name = "Alice Smith"
        session.commit()
        print(f"Updated: {user}")

# ── Delete ─────────────────────────────────────────────────────────────────────
with SessionLocal() as session:
    user = session.get(User, 2)
    if user:
        session.delete(user)  # cascade deletes orders too
        session.commit()
        print("Deleted Bob and all his orders")
Querying with joins and aggregates
from sqlalchemy import func, and_, or_

with SessionLocal() as session:
    # Join users and orders
    results = (
        session.query(User.name, Order.total, Order.status)
        .join(Order, User.id == Order.user_id)
        .filter(Order.total > 50)
        .order_by(Order.total.desc())
        .all()
    )
    for name, total, status in results:
        print(f"{name}: ${total:.2f} ({status})")

    # Aggregate — total spent per user
    spending = (
        session.query(
            User.name,
            func.count(Order.id).label("order_count"),
            func.sum(Order.total).label("total_spent"),
            func.avg(Order.total).label("avg_order"),
        )
        .join(Order, isouter=True)  # LEFT JOIN — include users with no orders
        .group_by(User.id)
        .order_by(func.sum(Order.total).desc())
        .all()
    )
    print("\nSpending by user:")
    for row in spending:
        print(f"  {row.name}: {row.order_count} orders, "
              f"${row.total_spent or 0:.2f} total")

    # Complex filter
    high_value_pending = (
        session.query(Order)
        .filter(
            and_(
                Order.status == "pending",
                Order.total > 100,
            )
        )
        .all()
    )
SQLAlchemy 2.0 style (the modern way)
SQLAlchemy 2.0 introduced a cleaner query syntax using select():
from sqlalchemy import select, update

with SessionLocal() as session:
    # Simple select
    stmt = select(User).where(User.age > 25).order_by(User.name)
    users = session.scalars(stmt).all()

    # Join
    stmt = (
        select(User, Order)
        .join(Order, User.id == Order.user_id)
        .where(Order.status == "pending")
    )
    rows = session.execute(stmt).all()

    # Update in bulk
    session.execute(
        update(Order)
        .where(Order.status == "pending")
        .values(status="processing")
    )
    session.commit()
Database Migrations with Alembic
When your database schema needs to change — adding a column, renaming a table, creating an index — you need migrations. Migrations are version-controlled SQL scripts that upgrade and downgrade your schema safely.
pip install alembic
alembic init migrations # creates alembic.ini and migrations/ folder
Configure alembic.ini:
sqlalchemy.url = sqlite:///myapp.db
Configure migrations/env.py to point to your models:
from myapp.models import Base
target_metadata = Base.metadata
Generate a migration automatically from model changes:
alembic revision --autogenerate -m "add phone to users"
This produces a file like migrations/versions/abc123_add_phone_to_users.py:
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column("users", sa.Column("phone", sa.String(20), nullable=True))

def downgrade():
    op.drop_column("users", "phone")
Apply all pending migrations:
alembic upgrade head
Roll back one migration:
alembic downgrade -1
Check current version:
alembic current
alembic history
Project: A Complete User + Order System
"""
shop_db.py — A complete shop database with SQLAlchemy.
"""
from __future__ import annotations

from datetime import datetime
from typing import Optional

from sqlalchemy import (
    Boolean, Column, DateTime, Float, ForeignKey,
    Integer, String, Text, create_engine, func
)
from sqlalchemy.orm import (
    DeclarativeBase, Session, relationship, sessionmaker
)
# ── Models ────────────────────────────────────────────────────────────────────
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(200), unique=True, nullable=False)
    active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    orders = relationship("Order", back_populates="user",
                          cascade="all, delete-orphan")

    @property
    def total_spent(self) -> float:
        return sum(o.total for o in self.orders if o.status == "completed")

    def __repr__(self):
        return f"User({self.id}, {self.name!r})"

class Product(Base):
    __tablename__ = "products"

    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    price = Column(Float, nullable=False)
    stock = Column(Integer, default=0)
    category = Column(String(50))

    order_items = relationship("OrderItem", back_populates="product")

    def __repr__(self):
        return f"Product({self.id}, {self.name!r}, ${self.price:.2f})"
class Order(Base):
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    status = Column(String(20), default="pending")
    created_at = Column(DateTime, default=datetime.utcnow)
    notes = Column(Text, default="")

    user = relationship("User", back_populates="orders")
    items = relationship("OrderItem", back_populates="order",
                         cascade="all, delete-orphan")

    @property
    def total(self) -> float:
        return sum(item.subtotal for item in self.items)

    def __repr__(self):
        return f"Order({self.id}, user={self.user_id}, ${self.total:.2f})"

class OrderItem(Base):
    __tablename__ = "order_items"

    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey("orders.id"), nullable=False)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
    quantity = Column(Integer, default=1)
    unit_price = Column(Float, nullable=False)

    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")

    @property
    def subtotal(self) -> float:
        return self.unit_price * self.quantity
# ── Database setup ────────────────────────────────────────────────────────────
engine = create_engine("sqlite:///shop.db", echo=False)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)
# ── Service layer ─────────────────────────────────────────────────────────────
class ShopService:
    def __init__(self, session: Session):
        self.session = session

    # Users
    def create_user(self, name: str, email: str) -> User:
        user = User(name=name, email=email)
        self.session.add(user)
        self.session.commit()
        return user

    def get_user(self, user_id: int) -> Optional[User]:
        return self.session.get(User, user_id)

    def list_users(self, active_only: bool = True) -> list[User]:
        q = self.session.query(User)
        if active_only:
            q = q.filter(User.active == True)
        return q.order_by(User.name).all()

    # Products
    def create_product(self, name: str, price: float,
                       stock: int, category: str = "") -> Product:
        p = Product(name=name, price=price, stock=stock, category=category)
        self.session.add(p)
        self.session.commit()
        return p

    def list_products(self, category: Optional[str] = None) -> list[Product]:
        q = self.session.query(Product).filter(Product.stock > 0)
        if category:
            q = q.filter(Product.category == category)
        return q.order_by(Product.name).all()
    # Orders
    def create_order(self, user_id: int, items: list[dict]) -> Order:
        """
        items: list of {"product_id": int, "quantity": int}
        """
        order = Order(user_id=user_id)
        self.session.add(order)
        self.session.flush()  # get order.id

        for item_data in items:
            product = self.session.get(Product, item_data["product_id"])
            if not product:
                raise ValueError(f"Product {item_data['product_id']} not found.")

            qty = item_data["quantity"]
            if product.stock < qty:
                raise ValueError(
                    f"Insufficient stock for {product.name}: "
                    f"{product.stock} available, {qty} requested."
                )

            product.stock -= qty
            order_item = OrderItem(
                order_id=order.id,
                product_id=product.id,
                quantity=qty,
                unit_price=product.price,
            )
            self.session.add(order_item)

        self.session.commit()
        return order

    def complete_order(self, order_id: int) -> Order:
        order = self.session.get(Order, order_id)
        if not order:
            raise ValueError(f"Order {order_id} not found.")
        order.status = "completed"
        self.session.commit()
        return order
    # Reports
    def sales_report(self) -> list[dict]:
        rows = (
            self.session.query(
                User.name.label("customer"),
                # DISTINCT — the join to order_items repeats each order once per item
                func.count(func.distinct(Order.id)).label("orders"),
                func.sum(OrderItem.unit_price * OrderItem.quantity).label("spent"),
            )
            .join(Order, User.id == Order.user_id)
            .join(OrderItem, Order.id == OrderItem.order_id)
            .filter(Order.status == "completed")
            .group_by(User.id)
            .order_by(func.sum(OrderItem.unit_price * OrderItem.quantity).desc())
            .all()
        )
        return [{"customer": r.customer, "orders": r.orders,
                 "spent": float(r.spent)} for r in rows]
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    with SessionLocal() as session:
        svc = ShopService(session)

        # Seed
        alice = svc.create_user("Alice", "alice@example.com")
        bob = svc.create_user("Bob", "bob@example.com")
        book = svc.create_product("Python Book", 29.99, 100, "Education")
        laptop = svc.create_product("Laptop", 999.99, 20, "Electronics")
        pen = svc.create_product("Pen", 2.50, 500, "Stationery")

        # Place orders
        order1 = svc.create_order(alice.id, [
            {"product_id": book.id, "quantity": 2},
            {"product_id": pen.id, "quantity": 5},
        ])
        order2 = svc.create_order(bob.id, [
            {"product_id": laptop.id, "quantity": 1},
        ])
        svc.complete_order(order1.id)
        svc.complete_order(order2.id)

        # Report
        print("\n── Sales Report ──")
        for row in svc.sales_report():
            print(f"  {row['customer']:<15} {row['orders']} orders  "
                  f"${row['spent']:.2f} spent")

        # Product stock
        print("\n── Current Stock ──")
        for p in svc.list_products():
            print(f"  {p.name:<20} ${p.price:>8.2f}  stock={p.stock}")
SQLite vs PostgreSQL — When to Use Which
| Feature | SQLite | PostgreSQL |
|---|---|---|
| Setup | Zero — built into Python | Server required |
| Concurrent writes | Limited (one writer at a time) | Full concurrency |
| Data size | Up to ~140TB | Effectively unlimited |
| Performance | Excellent for reads | Excellent for reads and writes |
| Data types | Flexible (dynamic types) | Strict (typed) |
| JSON support | Basic | Full jsonb with indexing |
| Full-text search | Basic | Built-in, powerful |
| Best for | Local apps, prototyping, testing | Production apps, multi-user |
Rule of thumb: Start with SQLite. Switch to PostgreSQL when you need multiple concurrent writers, a separate database server, or advanced SQL features.
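With SQLAlchemy, that switch is mostly a connection-URL change, which is why many applications read the URL from the environment rather than hard-coding it. A sketch of the pattern; DATABASE_URL is an illustrative variable name, and the PostgreSQL URL reuses the example credentials from earlier in the chapter:

```python
import os

def database_url() -> str:
    """Pick the database from the environment, defaulting to local SQLite."""
    # In production you would set, for example:
    #   DATABASE_URL=postgresql+psycopg2://myuser:mypassword@localhost:5432/myapp
    # and the same create_engine(database_url()) call works unchanged.
    return os.environ.get("DATABASE_URL", "sqlite:///myapp.db")

engine_url = database_url()
print(engine_url)  # "sqlite:///myapp.db" unless DATABASE_URL is set
```

Keeping the URL out of source code also keeps passwords out of version control.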
What You Learned in This Chapter
- Databases provide efficient search, sort, filter, and relate operations that files cannot.
- SQL CRUD: INSERT, SELECT, UPDATE, DELETE. Always use parameterized queries — never format user input into SQL.
- Transactions group statements atomically — all succeed or all roll back.
- SQLite is built into Python, stores data in a single file, and is perfect for learning and small apps.
- conn.row_factory = sqlite3.Row makes rows behave like dicts. cursor.lastrowid gives the ID of the last inserted row.
- PostgreSQL handles production workloads. Use psycopg2 for raw access, connection pooling for performance.
- SQLAlchemy ORM defines data models as Python classes, generates SQL for you, and manages relationships with relationship(). session.add(), session.commit(), session.query(), session.get() are the core ORM operations.
- Alembic manages schema migrations — version-controlled changes to your database structure.
- Use SQLite for prototypes, PostgreSQL for production.
What's Next?
Chapter 32 covers Web Development with Flask and FastAPI — building HTTP APIs, handling requests and responses, authentication, and serving data from your database over the web. Everything you've learned about Python, OOP, databases, and type hints comes together.