Chapter 48: Build a Web Scraper and Data Pipeline
Data doesn't arrive clean. It's scattered across websites, in inconsistent formats, with missing values and duplicates. A data pipeline takes raw data from the source, cleans it, transforms it, stores it, and delivers insights.
In this chapter, you build a complete, end-to-end pipeline:
Scrape -> Clean -> Store -> Analyse -> Visualise -> Schedule
We'll scrape real book data from books.toscrape.com, clean it with pandas, store it in SQLite, analyse it, visualise the results, and schedule the whole thing to run automatically.
The Plan
pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── scraper.py <- fetch + parse HTML
│ ├── cleaner.py <- validate + transform data
│ ├── storage.py <- SQLite with SQLAlchemy
│ ├── analyser.py <- pandas analysis
│ ├── visualiser.py <- matplotlib + seaborn charts
│ └── runner.py <- orchestrate + schedule
├── output/ <- charts and reports
├── data/ <- raw and cleaned CSVs
└── books.db <- SQLite database
Step 1 — The Scraper (scraper.py)
# pipeline/scraper.py
"""
Scrape books.toscrape.com — a practice site built for scraping.
No robots.txt restrictions, no API keys needed.
"""
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone

import requests
from bs4 import BeautifulSoup
log = logging.getLogger(__name__)
BASE_URL = "https://books.toscrape.com"
RATING_MAP = {"One": 1, "Two": 2, "Three": 3, "Four": 4, "Five": 5}
@dataclass
class RawBook:
    """Raw data as scraped — strings only, no cleaning yet."""
    title: str
    price_str: str    # e.g. "£12.99" — raw, may have encoding issues
    rating_word: str  # e.g. "Three"
    in_stock: bool
    url: str
    # Naive-UTC ISO timestamp of when the record was scraped.
    # datetime.utcnow() is deprecated since Python 3.12;
    # now(timezone.utc).replace(tzinfo=None) produces the identical naive
    # value (and the identical isoformat string) without the warning.
    scraped_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc)
        .replace(tzinfo=None)
        .isoformat()
    )
def fetch_html(url: str, retries: int = 3, delay: float = 1.0) -> str | None:
    """Fetch *url*, retrying with exponential backoff.

    Returns the response body on success, or None once every attempt fails.
    """
    headers = {"User-Agent": "DataPipelineBot/1.0 (educational)"}
    for attempt in range(1, retries + 1):
        try:
            resp = requests.get(url, timeout=10, headers=headers)
            resp.raise_for_status()
        except requests.RequestException as exc:
            log.warning(f"Attempt {attempt}/{retries} failed for {url}: {exc}")
            if attempt < retries:
                # Wait delay, 2*delay, 4*delay, ... between attempts.
                time.sleep(delay * (2 ** (attempt - 1)))
        else:
            return resp.text
    return None
def parse_catalogue_page(html: str, base_url: str) -> tuple[list[RawBook], str | None]:
    """Extract the books listed on one catalogue page.

    Returns a (books, next_page_url) pair; next_page_url is None on the
    last page.
    """
    soup = BeautifulSoup(html, "lxml")
    books: list[RawBook] = []
    for pod in soup.select("article.product_pod"):
        link = pod.find("h3").find("a")
        relative = link["href"].replace("../", "")
        books.append(
            RawBook(
                title=link["title"],
                price_str=pod.find("p", class_="price_color").text.strip(),
                rating_word=pod.find("p", class_="star-rating")["class"][1],
                in_stock="In stock" in pod.find("p", class_="instock").text,
                url=f"{base_url}/catalogue/{relative}",
            )
        )
    # The pagination widget only renders a "next" item before the last page.
    next_anchor = soup.select_one("li.next a")
    if next_anchor is None:
        return books, None
    return books, f"{base_url}/catalogue/{next_anchor['href']}"
def scrape_catalogue(max_pages: int = 50, delay: float = 1.0) -> list[RawBook]:
    """Walk the catalogue page by page, collecting every book.

    Sleeps *delay* seconds between requests to stay polite; stops after
    *max_pages* pages, when pagination runs out, or on a fetch failure.
    """
    collected: list[RawBook] = []
    url: str | None = f"{BASE_URL}/catalogue/page-1.html"
    for page in range(1, max_pages + 1):
        if not url:
            break
        log.info(f"Scraping page {page}: {url}")
        html = fetch_html(url)
        if html is None:
            log.error(f"Failed to fetch page {page}. Stopping.")
            break
        page_books, url = parse_catalogue_page(html, BASE_URL)
        collected.extend(page_books)
        log.info(f" Page {page}: {len(page_books)} books (total: {len(collected)})")
        if url:
            # Only pause when there is another page to fetch.
            time.sleep(delay)
    log.info(f"Scraping complete. Total books: {len(collected)}")
    return collected
Step 2 — The Cleaner (cleaner.py)
# pipeline/cleaner.py
"""
Transform raw scraped data into a clean, validated DataFrame.
"""
import re
import logging
import pandas as pd
from dataclasses import asdict
from scraper import RawBook
log = logging.getLogger(__name__)
RATING_MAP = {"One": 1, "Two": 2, "Three": 3, "Four": 4, "Five": 5}
def parse_price(price_str: str) -> float | None:
"""Extract float from a price string like '£12.99' or '£12.99'."""
match = re.search(r"(\d+\.\d+)", price_str)
if match:
return float(match.group(1))
return None
def clean_books(raw_books: list[RawBook]) -> pd.DataFrame:
    """Convert raw books to a clean DataFrame.

    Transformations:
    - Parse price string -> float (unparseable rows dropped)
    - Map rating word -> int (1--5; unknown words dropped)
    - Normalise title (strip whitespace)
    - Deduplicate by URL
    - Add derived columns: price_band, value_score

    Returns a DataFrame with columns: title, in_stock, url, scraped_at,
    price, rating, price_band, value_score.
    """
    if not raw_books:
        # Guard: an empty input frame has no columns, so the .apply/.map
        # calls below would raise KeyError. Return an empty, well-formed
        # frame instead.
        return pd.DataFrame(
            columns=["title", "in_stock", "url", "scraped_at",
                     "price", "rating", "price_band", "value_score"]
        )
    records = [asdict(b) for b in raw_books]
    df = pd.DataFrame(records)
    initial_count = len(df)
    log.info(f"Cleaning {initial_count} raw records...")
    # Parse price
    df["price"] = df["price_str"].apply(parse_price)
    price_failures = df["price"].isna().sum()
    if price_failures:
        log.warning(f" {price_failures} rows have unparseable prices — dropping")
        df = df.dropna(subset=["price"])
    # Map rating
    df["rating"] = df["rating_word"].map(RATING_MAP)
    rating_failures = df["rating"].isna().sum()
    if rating_failures:
        log.warning(f" {rating_failures} rows have unknown rating — dropping")
        df = df.dropna(subset=["rating"])
    df["rating"] = df["rating"].astype(int)
    # Clean title
    df["title"] = df["title"].str.strip()
    # Drop duplicates by URL
    before_dedup = len(df)
    df = df.drop_duplicates(subset=["url"])
    deduped = before_dedup - len(df)
    if deduped:
        log.info(f" Removed {deduped} duplicate URLs")
    # Derived columns. The top band is open-ended: the previous hard cap
    # of 60 silently binned any price above £60 as NaN.
    df["price_band"] = pd.cut(
        df["price"],
        bins=[0, 10, 20, 35, float("inf")],
        labels=["Budget", "Mid-range", "Premium", "Luxury"],
    )
    df["value_score"] = (df["rating"] / df["price"] * 10).round(2)
    # Parse scrape timestamp
    df["scraped_at"] = pd.to_datetime(df["scraped_at"])
    # Drop raw intermediate columns
    df = df.drop(columns=["price_str", "rating_word"])
    log.info(f"Cleaning done. {len(df)}/{initial_count} records kept.")
    return df
def validate_dataframe(df: pd.DataFrame) -> bool:
    """Run named data-quality checks and log a pass/fail line for each.

    Returns True only if every check passes.
    """
    checks = {
        "No null prices": df["price"].isna().sum() == 0,
        "No null ratings": df["rating"].isna().sum() == 0,
        "Price > 0": (df["price"] > 0).all(),
        "Rating in 1-5": df["rating"].between(1, 5).all(),
        "No empty titles": (df["title"].str.len() > 0).all(),
    }
    all_passed = True
    for check, result in checks.items():
        status = "[x]" if result else "[-]"
        # status already carries its own brackets — wrapping it again
        # (as the previous code did) logged "[[x]]" / "[[-]]".
        log.info(f" {status} {check}")
        if not result:
            all_passed = False
    return all_passed
Step 3 — Storage (storage.py)
# pipeline/storage.py
"""
Persist cleaned data to SQLite via SQLAlchemy.
Also provides helpers to query the database.
"""
import logging
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Float, Integer, Boolean, DateTime
from datetime import datetime
log = logging.getLogger(__name__)
DB_PATH = "books.db"
engine = create_engine(f"sqlite:///{DB_PATH}", echo=False)
class Base(DeclarativeBase):
    # Shared SQLAlchemy 2.0-style declarative base for all ORM models here.
    pass
class Book(Base):
    """ORM model for one cleaned book record (one row per unique URL)."""
    __tablename__ = "books"
    id: Mapped[int] = mapped_column(primary_key=True)  # surrogate key
    title: Mapped[str] = mapped_column(String(500))
    price: Mapped[float] = mapped_column(Float)  # parsed price (GBP per scraper)
    rating: Mapped[int] = mapped_column(Integer)  # 1-5 stars
    in_stock: Mapped[bool] = mapped_column(Boolean)
    url: Mapped[str] = mapped_column(String(500), unique=True)  # identity key for upserts
    price_band: Mapped[str] = mapped_column(String(50))  # Budget/Mid-range/Premium/Luxury
    value_score: Mapped[float] = mapped_column(Float)  # rating / price * 10
    scraped_at: Mapped[datetime] = mapped_column(DateTime)  # when the row was scraped
def init_db() -> None:
    """Create the books table (and any other mapped tables) if absent.

    Idempotent — safe to call on every pipeline run.
    """
    Base.metadata.create_all(bind=engine)
    log.info("Database initialised.")
def save_books(df: pd.DataFrame) -> int:
    """Insert books whose URL is not yet in the database (poor-man's upsert).

    Existing rows are never updated — url is the identity key.
    Assumes init_db() has been called so the `books` table exists.
    Returns the number of new rows inserted.
    """
    with engine.connect() as conn:
        existing_urls = {
            row[0] for row in conn.execute(text("SELECT url FROM books"))
        }
    new_books = df[~df["url"].isin(existing_urls)].copy()
    if new_books.empty:
        log.info("No new books to save.")
        return 0
    new_books.to_sql(
        "books",
        engine,
        if_exists="append",
        index=False,
        method="multi",
        # Without a chunksize, method="multi" emits ONE insert carrying
        # rows*columns bound parameters, which can exceed SQLite's
        # variable limit (999 in older builds) on a full 1000-book run.
        chunksize=100,
    )
    log.info(f"Saved {len(new_books)} new books to database.")
    return len(new_books)
def load_books() -> pd.DataFrame:
    """Read every stored book, cheapest first, into a DataFrame."""
    query = "SELECT * FROM books ORDER BY price"
    return pd.read_sql(query, engine)
def get_stats() -> dict:
    """Summarise the whole database; empty dict when no books are stored."""
    df = load_books()
    if df.empty:
        return {}
    prices = df["price"]
    return {
        "total_books": len(df),
        "avg_price": round(prices.mean(), 2),
        "avg_rating": round(df["rating"].mean(), 2),
        "in_stock_count": int(df["in_stock"].sum()),
        "price_min": prices.min(),
        "price_max": prices.max(),
    }
Step 4 — Analysis (analyser.py)
# pipeline/analyser.py
"""
Analyse the book database and produce summary tables.
"""
import logging
import pandas as pd
from storage import load_books
log = logging.getLogger(__name__)
def analyse(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Build the named summary tables used by the report and the charts."""
    # How many books per star rating, and what they cost on average.
    by_rating = df.groupby("rating").agg(
        count=("title", "count"),
        avg_price=("price", "mean"),
    )
    # Per-band counts, averages and stock totals.
    by_band = df.groupby("price_band", observed=True).agg(
        count=("title", "count"),
        avg_rating=("rating", "mean"),
        avg_price=("price", "mean"),
        in_stock=("in_stock", "sum"),
    )
    # Top 10 by value (high rating relative to price).
    best_value = (
        df.sort_values("value_score", ascending=False)
        .loc[:, ["title", "price", "rating", "value_score"]]
        .head(10)
        .reset_index(drop=True)
    )
    # Top 10 priciest books.
    most_expensive = (
        df.sort_values("price", ascending=False)
        .loc[:, ["title", "price", "rating"]]
        .head(10)
        .reset_index(drop=True)
    )
    # Availability percentage per band.
    stock = df.groupby("price_band", observed=True)["in_stock"].agg(
        total="count", in_stock="sum"
    )
    stock = stock.assign(
        pct_in_stock=(stock["in_stock"] / stock["total"] * 100).round(1)
    )
    return {
        "rating_dist": by_rating.round(2),
        "price_bands": by_band.round(2),
        "best_value": best_value,
        "most_expensive": most_expensive,
        "stock_by_band": stock,
    }
def print_report(results: dict[str, pd.DataFrame]) -> None:
    """Dump the analysis tables to stdout as a formatted text report."""
    rule = "=" * 60
    print("\n" + rule)
    print(" BOOK CATALOGUE ANALYSIS REPORT")
    print(rule)
    print("\n[chart] Books by Rating:")
    print(results["rating_dist"].to_string())
    print("\n Price Band Breakdown:")
    print(results["price_bands"].to_string())
    print("\n[trophy] Top 10 Best Value Books:")
    for _, book in results["best_value"].iterrows():
        line = f" {book['rating']}[*] £{book['price']:.2f} {book['title'][:50]}"
        print(line)
    print("\n[pkg] Stock Availability by Price Band:")
    print(results["stock_by_band"].to_string())
Step 5 — Visualisation (visualiser.py)
# pipeline/visualiser.py
"""
Generate and save charts from the analysed data.
"""
import logging
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
log = logging.getLogger(__name__)
OUTPUT_DIR = Path("output")
OUTPUT_DIR.mkdir(exist_ok=True)
sns.set_theme(style="whitegrid", palette="muted")
def _save(fig: plt.Figure, name: str) -> None:
    """Write *fig* to output/<name> at 150 dpi, then release its memory."""
    destination = OUTPUT_DIR / name
    fig.savefig(destination, dpi=150, bbox_inches="tight")
    plt.close(fig)  # free the figure so repeated runs don't accumulate memory
    log.info(f"Saved chart: {destination}")
def plot_rating_distribution(df: pd.DataFrame) -> None:
    """Bar chart of how many books fall in each star rating."""
    counts = df["rating"].value_counts().sort_index()
    labels = ["[*]" * r for r in counts.index]
    palette = ["#f44336", "#ff9800", "#ffc107", "#8bc34a", "#4caf50"]
    fig, ax = plt.subplots(figsize=(7, 4))
    bars = ax.bar(labels, counts.values, color=palette, edgecolor="white")
    # Annotate each bar with its raw count just above the top edge.
    for rect, n in zip(bars, counts.values):
        centre = rect.get_x() + rect.get_width() / 2
        ax.text(centre, rect.get_height() + 5, str(n),
                ha="center", va="bottom", fontsize=10)
    ax.set_title("Books by Star Rating", fontsize=13)
    ax.set_xlabel("Rating")
    ax.set_ylabel("Number of Books")
    _save(fig, "rating_distribution.png")
def plot_price_distribution(df: pd.DataFrame) -> None:
    """Two-panel figure: price histogram and per-rating price box plot."""
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    # Histogram with mean/median reference lines
    axes[0].hist(df["price"], bins=30, color="steelblue", edgecolor="white", alpha=0.85)
    axes[0].axvline(df["price"].mean(), color="red", linestyle="--",
                    label=f"Mean £{df['price'].mean():.2f}")
    axes[0].axvline(df["price"].median(), color="green", linestyle="--",
                    label=f"Median £{df['price'].median():.2f}")
    axes[0].set_title("Price Distribution")
    axes[0].set_xlabel("Price (£)")
    axes[0].set_ylabel("Count")
    axes[0].legend()
    # Box plot by rating. seaborn >= 0.13 deprecates passing `palette`
    # without `hue`; mapping hue to the x variable with legend=False
    # keeps the previous appearance without the FutureWarning.
    sns.boxplot(data=df, x="rating", y="price", hue="rating",
                palette="muted", legend=False, ax=axes[1])
    axes[1].set_title("Price Distribution by Rating")
    axes[1].set_xlabel("Rating (stars)")
    axes[1].set_ylabel("Price (£)")
    _save(fig, "price_distribution.png")
def plot_price_bands(df: pd.DataFrame) -> None:
    """Bar + pie view of how many books sit in each price band."""
    counts = df["price_band"].value_counts().sort_index()
    palette = ["#4caf50", "#2196f3", "#ff9800", "#f44336"]
    fig, (bar_ax, pie_ax) = plt.subplots(1, 2, figsize=(12, 4))
    # Absolute counts on the left...
    bar_ax.bar(counts.index, counts.values, color=palette, edgecolor="white")
    bar_ax.set_title("Books by Price Band")
    bar_ax.set_ylabel("Count")
    # ...relative share on the right.
    pie_ax.pie(counts.values, labels=counts.index, autopct="%1.0f%%",
               startangle=90, colors=palette)
    pie_ax.set_title("Price Band Share")
    _save(fig, "price_bands.png")
def plot_value_vs_price(df: pd.DataFrame) -> None:
    """Scatter of price against rating, coloured by value score."""
    fig, ax = plt.subplots(figsize=(9, 5))
    points = ax.scatter(
        df["price"],
        df["rating"],
        c=df["value_score"],
        cmap="RdYlGn",
        alpha=0.6,
        s=30,
        edgecolors="none",
    )
    fig.colorbar(points, ax=ax, label="Value Score")
    ax.set_title("Price vs Rating (colour = value score)")
    ax.set_xlabel("Price (£)")
    ax.set_ylabel("Rating (stars)")
    ax.set_yticks([1, 2, 3, 4, 5])
    _save(fig, "value_vs_price.png")
def generate_all_charts(df: pd.DataFrame) -> None:
    """Render every chart in sequence and report where they landed."""
    log.info("Generating charts...")
    for plot in (
        plot_rating_distribution,
        plot_price_distribution,
        plot_price_bands,
        plot_value_vs_price,
    ):
        plot(df)
    log.info(f"All charts saved to {OUTPUT_DIR}/")
Step 6 — The Runner (runner.py)
# pipeline/runner.py
"""
Orchestrates the full pipeline:
scrape -> clean -> store -> analyse -> visualise
Optionally schedules it to run daily.
"""
import logging
import time
from pathlib import Path
from datetime import datetime
import schedule
from scraper import scrape_catalogue
from cleaner import clean_books, validate_dataframe
from storage import init_db, save_books, load_books, get_stats
from analyser import analyse, print_report
from visualiser import generate_all_charts
# ── Logging setup ──────────────────────────────────────────────────────────────
# Log to both the console and pipeline.log. The file handler is opened
# with UTF-8 explicitly so £ signs and em dashes in log lines survive on
# platforms whose default encoding is not UTF-8 (e.g. Windows cp1252,
# where the previous default-encoding handler could raise on write).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(name)s — %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("pipeline.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("runner")
# ── Pipeline ───────────────────────────────────────────────────────────────────
def run_pipeline(max_pages: int = 5) -> None:
    """Full pipeline: scrape -> clean -> store -> analyse -> visualise.

    max_pages caps the scrape (each catalogue page holds 20 books).
    Aborts early if nothing was scraped or data validation fails.
    """
    start = time.time()
    log.info("=" * 50)
    log.info(f"Pipeline started at {datetime.now():%Y-%m-%d %H:%M:%S}")
    # 1. Scrape
    log.info("STEP 1/5 — Scraping...")
    raw_books = scrape_catalogue(max_pages=max_pages, delay=1.0)
    if not raw_books:
        log.error("No books scraped. Aborting.")
        return
    # 2. Clean
    log.info("STEP 2/5 — Cleaning...")
    df = clean_books(raw_books)
    log.info("STEP 2b — Validating...")
    if not validate_dataframe(df):
        log.error("Data validation failed. Aborting.")
        return
    # Audit-trail CSV snapshot. NOTE(review): despite the raw_ filename
    # and log text, this is the CLEANED frame — the truly raw records were
    # consumed by clean_books above. Name kept for path compatibility.
    raw_path = Path("data") / f"raw_{datetime.now():%Y%m%d_%H%M}.csv"
    raw_path.parent.mkdir(exist_ok=True)
    df.to_csv(raw_path, index=False)
    log.info(f"Raw data saved to {raw_path}")
    # 3. Store (idempotent: only URLs not already in books.db are inserted)
    log.info("STEP 3/5 — Storing...")
    init_db()
    new_count = save_books(df)
    # 4. Analyse — over the FULL database, not just this run's scrape
    log.info("STEP 4/5 — Analysing...")
    all_books = load_books()
    results = analyse(all_books)
    print_report(results)
    stats = get_stats()
    log.info(f"Database stats: {stats}")
    # 5. Visualise — charts also use the full database
    log.info("STEP 5/5 — Visualising...")
    generate_all_charts(all_books)
    elapsed = time.time() - start
    log.info(f"Pipeline completed in {elapsed:.1f}s. {new_count} new books added.")
    log.info("=" * 50)
# ── Scheduler ─────────────────────────────────────────────────────────────────
def schedule_pipeline(run_at: str = "06:00") -> None:
    """Run the pipeline every day at the specified time ("HH:MM").

    Blocks forever: registers the daily job, runs the pipeline once
    immediately, then polls the scheduler once a minute.
    """
    log.info(f"Scheduler started. Pipeline will run daily at {run_at}.")
    schedule.every().day.at(run_at).do(run_pipeline)
    # Run immediately on first launch
    run_pipeline()
    # A 60-second poll is plenty for a minute-granularity schedule.
    while True:
        schedule.run_pending()
        time.sleep(60)
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import sys

    args = sys.argv[1:]
    if "--schedule" in args:
        # Long-running mode: run now, then daily at the default time.
        schedule_pipeline()
    else:
        # One-shot mode; optional positional argument = pages to scrape.
        pages = int(args[0]) if args else 5
        run_pipeline(max_pages=pages)
Running the Pipeline
# Install dependencies
pip install requests beautifulsoup4 lxml pandas sqlalchemy matplotlib seaborn schedule
# Run once (scrape 5 pages = 100 books)
python pipeline/runner.py 5
# Run all 50 pages (1000 books)
python pipeline/runner.py 50
# Run continuously, daily at 6am
python pipeline/runner.py --schedule
After running, you'll have:
- books.db — SQLite database with all books
- data/raw_YYYYMMDD_HHMM.csv — raw CSV snapshot
- output/rating_distribution.png
- output/price_distribution.png
- output/price_bands.png
- output/value_vs_price.png
- pipeline.log — full run log
Sample Output
==================================================
BOOK CATALOGUE ANALYSIS REPORT
==================================================
[chart] Books by Rating:
count avg_price
rating
1 20 35.12
2 20 34.78
3 20 36.02
4 20 33.94
5 20 34.45
Price Band Breakdown:
count avg_rating avg_price in_stock
price_band
Budget 18 2.89 10.62 18
Mid-range 29 3.10 15.43 29
Premium 39 3.21 27.81 39
Luxury 14 2.93 54.20 14
[trophy] Top 10 Best Value Books:
1[*] £10.00 A Light in the Attic
5[*] £10.00 Sharp Objects
5[*] £13.99 It's Only the Himalayas
...
[pkg] Stock Availability by Price Band:
total in_stock pct_in_stock
price_band
Budget 18 18 100.0
Mid-range 29 29 100.0
Premium 39 39 100.0
Luxury 14 14 100.0
What You Built
A production-grade data pipeline with:
- Scraper — polite HTTP requests, retry logic with exponential backoff, HTML parsing, pagination
- Cleaner — regex price parsing, rating mapping, deduplication, derived columns (value_score, price_band), data quality validation with named checks
- Storage — SQLAlchemy ORM, upsert logic (skip existing URLs), pandas -> SQL in one call
- Analyser — rating distribution, price band breakdown, best value ranking, stock analysis
- Visualiser — 4 charts (rating bar, price histogram + box plot, price band pie, value scatter)
- Runner — end-to-end orchestration, structured logging to file and console, audit CSV snapshots, schedule for daily automation
What's Next?
Chapter 49 covers Building a Desktop App with Tkinter — you'll build a graphical user interface with windows, buttons, forms, and menus, turning your Python scripts into apps that anyone can use without a terminal.