Python: Zero to Hero
Home/Advanced Projects
Share

Chapter 48: Build a Web Scraper and Data Pipeline

Data doesn't arrive clean. It's scattered across websites, in inconsistent formats, with missing values and duplicates. A data pipeline takes raw data from the source, cleans it, transforms it, stores it, and delivers insights.

In this chapter, you build a complete, end-to-end pipeline:

Scrape -> Clean -> Store -> Analyse -> Visualise -> Schedule

We'll scrape real book data from books.toscrape.com, clean it with pandas, store it in SQLite, analyse it, visualise the results, and schedule the whole thing to run automatically.

The Plan

pipeline/
├── pipeline/
│   ├── __init__.py
│   ├── scraper.py      <- fetch + parse HTML
│   ├── cleaner.py      <- validate + transform data
│   ├── storage.py      <- SQLite with SQLAlchemy
│   ├── analyser.py     <- pandas analysis
│   ├── visualiser.py   <- matplotlib + seaborn charts
│   └── runner.py       <- orchestrate + schedule
├── output/             <- charts and reports
├── data/               <- raw and cleaned CSVs
└── books.db            <- SQLite database

Step 1 — The Scraper (scraper.py)

# pipeline/scraper.py
"""
Scrape books.toscrape.com — a practice site built for scraping.
No robots.txt restrictions, no API keys needed.
"""
import time
import logging
import requests
from bs4 import BeautifulSoup
from dataclasses import dataclass, field
from datetime import datetime

log = logging.getLogger(__name__)

BASE_URL   = "https://books.toscrape.com"
RATING_MAP = {"One": 1, "Two": 2, "Three": 3, "Four": 4, "Five": 5}


@dataclass
class RawBook:
    """Raw data as scraped — strings only, no cleaning yet."""
    title:       str
    price_str:   str        # e.g. "£12.99" — raw, may have encoding issues
    rating_word: str        # e.g. "Three"
    in_stock:    bool
    url:         str
    scraped_at:  str = field(default_factory=lambda: datetime.utcnow().isoformat())


def fetch_html(url: str, retries: int = 3, delay: float = 1.0) -> str | None:
    """Fetch a URL with retries and exponential backoff."""
    for attempt in range(retries):
        try:
            response = requests.get(
                url,
                timeout=10,
                headers={"User-Agent": "DataPipelineBot/1.0 (educational)"},
            )
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            log.warning(f"Attempt {attempt+1}/{retries} failed for {url}: {e}")
            if attempt < retries - 1:
                time.sleep(delay * (2 ** attempt))   # exponential backoff
    return None


def parse_catalogue_page(html: str, base_url: str) -> tuple[list[RawBook], str | None]:
    """
    Parse one catalogue page.
    Returns (list_of_books, next_page_url_or_None).
    """
    soup  = BeautifulSoup(html, "lxml")
    books = []

    for article in soup.select("article.product_pod"):
        title       = article.find("h3").find("a")["title"]
        price_str   = article.find("p", class_="price_color").text.strip()
        rating_word = article.find("p", class_="star-rating")["class"][1]
        in_stock    = "In stock" in article.find("p", class_="instock").text
        href        = article.find("h3").find("a")["href"]
        url         = f"{base_url}/catalogue/{href.replace('../', '')}"

        books.append(RawBook(
            title       = title,
            price_str   = price_str,
            rating_word = rating_word,
            in_stock    = in_stock,
            url         = url,
        ))

    # Find next page link
    next_btn = soup.select_one("li.next a")
    next_url = None
    if next_btn:
        current_path = base_url + "/catalogue"
        next_url     = f"{current_path}/{next_btn['href']}"

    return books, next_url


def scrape_catalogue(max_pages: int = 50, delay: float = 1.0) -> list[RawBook]:
    """
    Scrape the entire catalogue (50 pages = 1000 books).
    Politely waits between requests.
    """
    all_books = []
    url       = f"{BASE_URL}/catalogue/page-1.html"
    page      = 1

    while url and page <= max_pages:
        log.info(f"Scraping page {page}: {url}")
        html = fetch_html(url)

        if html is None:
            log.error(f"Failed to fetch page {page}. Stopping.")
            break

        books, next_url = parse_catalogue_page(html, BASE_URL)
        all_books.extend(books)
        log.info(f"  Page {page}: {len(books)} books (total: {len(all_books)})")

        url   = next_url
        page += 1

        if url:
            time.sleep(delay)

    log.info(f"Scraping complete. Total books: {len(all_books)}")
    return all_books

Step 2 — The Cleaner (cleaner.py)

# pipeline/cleaner.py
"""
Transform raw scraped data into a clean, validated DataFrame.
"""
import re
import logging
import pandas as pd
from dataclasses import asdict
from scraper import RawBook

log = logging.getLogger(__name__)

RATING_MAP = {"One": 1, "Two": 2, "Three": 3, "Four": 4, "Five": 5}


def parse_price(price_str: str) -> float | None:
    """Extract float from a price string like '£12.99' or '£12.99'."""
    match = re.search(r"(\d+\.\d+)", price_str)
    if match:
        return float(match.group(1))
    return None


def clean_books(raw_books: list[RawBook]) -> pd.DataFrame:
    """
    Convert raw books to a clean DataFrame.

    Transformations:
    - Parse price string -> float
    - Map rating word -> int (1--5)
    - Normalise title (strip whitespace)
    - Drop rows that fail validation
    - Add derived columns
    """
    records = [asdict(b) for b in raw_books]
    df      = pd.DataFrame(records)

    initial_count = len(df)
    log.info(f"Cleaning {initial_count} raw records...")

    # Parse price
    df["price"] = df["price_str"].apply(parse_price)
    price_failures = df["price"].isna().sum()
    if price_failures:
        log.warning(f"  {price_failures} rows have unparseable prices — dropping")
    df = df.dropna(subset=["price"])

    # Map rating
    df["rating"] = df["rating_word"].map(RATING_MAP)
    rating_failures = df["rating"].isna().sum()
    if rating_failures:
        log.warning(f"  {rating_failures} rows have unknown rating — dropping")
    df = df.dropna(subset=["rating"])
    df["rating"] = df["rating"].astype(int)

    # Clean title
    df["title"] = df["title"].str.strip()

    # Drop duplicates by URL
    before_dedup = len(df)
    df = df.drop_duplicates(subset=["url"])
    deduped = before_dedup - len(df)
    if deduped:
        log.info(f"  Removed {deduped} duplicate URLs")

    # Derived columns
    df["price_band"] = pd.cut(
        df["price"],
        bins=[0, 10, 20, 35, 60],
        labels=["Budget", "Mid-range", "Premium", "Luxury"],
    )
    df["value_score"] = (df["rating"] / df["price"] * 10).round(2)

    # Parse scrape timestamp
    df["scraped_at"] = pd.to_datetime(df["scraped_at"])

    # Drop raw intermediate columns
    df = df.drop(columns=["price_str", "rating_word"])

    log.info(f"Cleaning done. {len(df)}/{initial_count} records kept.")
    return df


def validate_dataframe(df: pd.DataFrame) -> bool:
    """Assert data quality expectations."""
    checks = {
        "No null prices":   df["price"].isna().sum() == 0,
        "No null ratings":  df["rating"].isna().sum() == 0,
        "Price > 0":        (df["price"] > 0).all(),
        "Rating in 1-5":    df["rating"].between(1, 5).all(),
        "No empty titles":  (df["title"].str.len() > 0).all(),
    }
    all_passed = True
    for check, result in checks.items():
        status = "[x]" if result else "[-]"
        log.info(f"  [{status}] {check}")
        if not result:
            all_passed = False

    return all_passed

Step 3 — Storage (storage.py)

# pipeline/storage.py
"""
Persist cleaned data to SQLite via SQLAlchemy.
Also provides helpers to query the database.
"""
import logging
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Float, Integer, Boolean, DateTime
from datetime import datetime

log = logging.getLogger(__name__)

DB_PATH = "books.db"
engine  = create_engine(f"sqlite:///{DB_PATH}", echo=False)


class Base(DeclarativeBase):
    pass


class Book(Base):
    __tablename__ = "books"

    id:          Mapped[int]      = mapped_column(primary_key=True)
    title:       Mapped[str]      = mapped_column(String(500))
    price:       Mapped[float]    = mapped_column(Float)
    rating:      Mapped[int]      = mapped_column(Integer)
    in_stock:    Mapped[bool]     = mapped_column(Boolean)
    url:         Mapped[str]      = mapped_column(String(500), unique=True)
    price_band:  Mapped[str]      = mapped_column(String(50))
    value_score: Mapped[float]    = mapped_column(Float)
    scraped_at:  Mapped[datetime] = mapped_column(DateTime)


def init_db() -> None:
    Base.metadata.create_all(bind=engine)
    log.info("Database initialised.")


def save_books(df: pd.DataFrame) -> int:
    """
    Upsert books into the database.
    Returns the number of new rows inserted.
    """
    with engine.connect() as conn:
        existing_urls = set(
            row[0] for row in conn.execute(text("SELECT url FROM books")).fetchall()
        )

    new_books = df[~df["url"].isin(existing_urls)].copy()
    if new_books.empty:
        log.info("No new books to save.")
        return 0

    new_books.to_sql(
        "books",
        engine,
        if_exists="append",
        index=False,
        method="multi",
    )
    log.info(f"Saved {len(new_books)} new books to database.")
    return len(new_books)


def load_books() -> pd.DataFrame:
    """Load all books from the database into a DataFrame."""
    return pd.read_sql("SELECT * FROM books ORDER BY price", engine)


def get_stats() -> dict:
    """Return high-level stats from the database."""
    df = load_books()
    if df.empty:
        return {}
    return {
        "total_books":    len(df),
        "avg_price":      round(df["price"].mean(), 2),
        "avg_rating":     round(df["rating"].mean(), 2),
        "in_stock_count": int(df["in_stock"].sum()),
        "price_min":      df["price"].min(),
        "price_max":      df["price"].max(),
    }

Step 4 — Analysis (analyser.py)

# pipeline/analyser.py
"""
Analyse the book database and produce summary tables.
"""
import logging
import pandas as pd
from storage import load_books

log = logging.getLogger(__name__)


def analyse(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Return a dict of named summary DataFrames."""
    results = {}

    # Rating distribution
    results["rating_dist"] = (
        df.groupby("rating")
          .agg(count=("title", "count"), avg_price=("price", "mean"))
          .round(2)
    )

    # Price band breakdown
    results["price_bands"] = (
        df.groupby("price_band", observed=True)
          .agg(
              count      = ("title",       "count"),
              avg_rating = ("rating",      "mean"),
              avg_price  = ("price",       "mean"),
              in_stock   = ("in_stock",    "sum"),
          )
          .round(2)
    )

    # Top 10 best value books (high rating, low price)
    results["best_value"] = (
        df.sort_values("value_score", ascending=False)
          [["title", "price", "rating", "value_score"]]
          .head(10)
          .reset_index(drop=True)
    )

    # Top 10 most expensive
    results["most_expensive"] = (
        df.sort_values("price", ascending=False)
          [["title", "price", "rating"]]
          .head(10)
          .reset_index(drop=True)
    )

    # Stock availability by price band
    results["stock_by_band"] = (
        df.groupby("price_band", observed=True)["in_stock"]
          .agg(total="count", in_stock="sum")
          .assign(pct_in_stock=lambda x: (x["in_stock"] / x["total"] * 100).round(1))
    )

    return results


def print_report(results: dict[str, pd.DataFrame]) -> None:
    """Print a formatted text report to the console."""
    print("\n" + "=" * 60)
    print("  BOOK CATALOGUE ANALYSIS REPORT")
    print("=" * 60)

    print("\n[chart] Books by Rating:")
    print(results["rating_dist"].to_string())

    print("\n Price Band Breakdown:")
    print(results["price_bands"].to_string())

    print("\n[trophy] Top 10 Best Value Books:")
    for _, row in results["best_value"].iterrows():
        print(f"  {row['rating']}[*]  £{row['price']:.2f}  {row['title'][:50]}")

    print("\n[pkg] Stock Availability by Price Band:")
    print(results["stock_by_band"].to_string())

Step 5 — Visualisation (visualiser.py)

# pipeline/visualiser.py
"""
Generate and save charts from the analysed data.
"""
import logging
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

log = logging.getLogger(__name__)
OUTPUT_DIR = Path("output")
OUTPUT_DIR.mkdir(exist_ok=True)

sns.set_theme(style="whitegrid", palette="muted")


def _save(fig: plt.Figure, name: str) -> None:
    path = OUTPUT_DIR / name
    fig.savefig(path, dpi=150, bbox_inches="tight")
    plt.close(fig)
    log.info(f"Saved chart: {path}")


def plot_rating_distribution(df: pd.DataFrame) -> None:
    fig, ax = plt.subplots(figsize=(7, 4))
    counts = df["rating"].value_counts().sort_index()
    bars   = ax.bar(
        [f"{'[*]' * r}" for r in counts.index],
        counts.values,
        color=["#f44336", "#ff9800", "#ffc107", "#8bc34a", "#4caf50"],
        edgecolor="white",
    )
    for bar, count in zip(bars, counts.values):
        ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 5,
                str(count), ha="center", va="bottom", fontsize=10)
    ax.set_title("Books by Star Rating", fontsize=13)
    ax.set_xlabel("Rating")
    ax.set_ylabel("Number of Books")
    _save(fig, "rating_distribution.png")


def plot_price_distribution(df: pd.DataFrame) -> None:
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    # Histogram
    axes[0].hist(df["price"], bins=30, color="steelblue", edgecolor="white", alpha=0.85)
    axes[0].axvline(df["price"].mean(), color="red", linestyle="--",
                    label=f"Mean £{df['price'].mean():.2f}")
    axes[0].axvline(df["price"].median(), color="green", linestyle="--",
                    label=f"Median £{df['price'].median():.2f}")
    axes[0].set_title("Price Distribution")
    axes[0].set_xlabel("Price (£)")
    axes[0].set_ylabel("Count")
    axes[0].legend()

    # Box plot by rating
    sns.boxplot(data=df, x="rating", y="price", palette="muted", ax=axes[1])
    axes[1].set_title("Price Distribution by Rating")
    axes[1].set_xlabel("Rating (stars)")
    axes[1].set_ylabel("Price (£)")

    _save(fig, "price_distribution.png")


def plot_price_bands(df: pd.DataFrame) -> None:
    band_counts = df["price_band"].value_counts().sort_index()

    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    # Bar chart
    axes[0].bar(band_counts.index, band_counts.values,
                color=["#4caf50", "#2196f3", "#ff9800", "#f44336"],
                edgecolor="white")
    axes[0].set_title("Books by Price Band")
    axes[0].set_ylabel("Count")

    # Pie chart
    axes[1].pie(band_counts.values, labels=band_counts.index,
                autopct="%1.0f%%", startangle=90,
                colors=["#4caf50", "#2196f3", "#ff9800", "#f44336"])
    axes[1].set_title("Price Band Share")

    _save(fig, "price_bands.png")


def plot_value_vs_price(df: pd.DataFrame) -> None:
    fig, ax = plt.subplots(figsize=(9, 5))

    scatter = ax.scatter(
        df["price"],
        df["rating"],
        c=df["value_score"],
        cmap="RdYlGn",
        alpha=0.6,
        s=30,
        edgecolors="none",
    )
    plt.colorbar(scatter, ax=ax, label="Value Score")

    ax.set_title("Price vs Rating (colour = value score)")
    ax.set_xlabel("Price (£)")
    ax.set_ylabel("Rating (stars)")
    ax.set_yticks([1, 2, 3, 4, 5])

    _save(fig, "value_vs_price.png")


def generate_all_charts(df: pd.DataFrame) -> None:
    log.info("Generating charts...")
    plot_rating_distribution(df)
    plot_price_distribution(df)
    plot_price_bands(df)
    plot_value_vs_price(df)
    log.info(f"All charts saved to {OUTPUT_DIR}/")

Step 6 — The Runner (runner.py)

# pipeline/runner.py
"""
Orchestrates the full pipeline:
  scrape -> clean -> store -> analyse -> visualise

Optionally schedules it to run daily.
"""
import logging
import time
from pathlib import Path
from datetime import datetime

import schedule

from scraper    import scrape_catalogue
from cleaner    import clean_books, validate_dataframe
from storage    import init_db, save_books, load_books, get_stats
from analyser   import analyse, print_report
from visualiser import generate_all_charts

# ── Logging setup ──────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(levelname)-8s  %(name)s — %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("pipeline.log"),
    ],
)
log = logging.getLogger("runner")

# ── Pipeline ───────────────────────────────────────────────────────────────────

def run_pipeline(max_pages: int = 5) -> None:
    """Full pipeline: scrape -> clean -> store -> analyse -> visualise."""
    start = time.time()
    log.info("=" * 50)
    log.info(f"Pipeline started at {datetime.now():%Y-%m-%d %H:%M:%S}")

    # 1. Scrape
    log.info("STEP 1/5 — Scraping...")
    raw_books = scrape_catalogue(max_pages=max_pages, delay=1.0)
    if not raw_books:
        log.error("No books scraped. Aborting.")
        return

    # 2. Clean
    log.info("STEP 2/5 — Cleaning...")
    df = clean_books(raw_books)

    log.info("STEP 2b — Validating...")
    if not validate_dataframe(df):
        log.error("Data validation failed. Aborting.")
        return

    # Save raw CSV for audit trail
    raw_path = Path("data") / f"raw_{datetime.now():%Y%m%d_%H%M}.csv"
    raw_path.parent.mkdir(exist_ok=True)
    df.to_csv(raw_path, index=False)
    log.info(f"Raw data saved to {raw_path}")

    # 3. Store
    log.info("STEP 3/5 — Storing...")
    init_db()
    new_count = save_books(df)

    # 4. Analyse
    log.info("STEP 4/5 — Analysing...")
    all_books = load_books()
    results   = analyse(all_books)
    print_report(results)

    stats = get_stats()
    log.info(f"Database stats: {stats}")

    # 5. Visualise
    log.info("STEP 5/5 — Visualising...")
    generate_all_charts(all_books)

    elapsed = time.time() - start
    log.info(f"Pipeline completed in {elapsed:.1f}s. {new_count} new books added.")
    log.info("=" * 50)


# ── Scheduler ─────────────────────────────────────────────────────────────────

def schedule_pipeline(run_at: str = "06:00") -> None:
    """Run the pipeline every day at the specified time."""
    log.info(f"Scheduler started. Pipeline will run daily at {run_at}.")
    schedule.every().day.at(run_at).do(run_pipeline)

    # Run immediately on first launch
    run_pipeline()

    while True:
        schedule.run_pending()
        time.sleep(60)


# ── Entry point ───────────────────────────────────────────────────────────────

if __name__ == "__main__":
    import sys
    if "--schedule" in sys.argv:
        schedule_pipeline()
    else:
        # Single run
        pages = int(sys.argv[1]) if len(sys.argv) > 1 else 5
        run_pipeline(max_pages=pages)

Running the Pipeline

# Install dependencies
pip install requests beautifulsoup4 lxml pandas sqlalchemy matplotlib seaborn schedule

# Run once (scrape 5 pages = 100 books)
python pipeline/runner.py 5

# Run all 50 pages (1000 books)
python pipeline/runner.py 50

# Run continuously, daily at 6am
python pipeline/runner.py --schedule

After running, you'll have:

  • books.db — SQLite database with all books
  • data/raw_YYYYMMDD_HHMM.csv — raw CSV snapshot
  • output/rating_distribution.png
  • output/price_distribution.png
  • output/price_bands.png
  • output/value_vs_price.png
  • pipeline.log — full run log

Sample Output

==================================================
  BOOK CATALOGUE ANALYSIS REPORT
==================================================

[chart] Books by Rating:
        count  avg_price
rating
1          20      35.12
2          20      34.78
3          20      36.02
4          20      33.94
5          20      34.45

 Price Band Breakdown:
           count  avg_rating  avg_price  in_stock
price_band
Budget        18        2.89      10.62        18
Mid-range     29        3.10      15.43        29
Premium       39        3.21      27.81        39
Luxury        14        2.93      54.20        14

[trophy] Top 10 Best Value Books:
  1[*]  £10.00  A Light in the Attic
  5[*]  £10.00  Sharp Objects
  5[*]  £13.99  It's Only the Himalayas
  ...

[pkg] Stock Availability by Price Band:
           total  in_stock  pct_in_stock
price_band
Budget        18        18         100.0
Mid-range     29        29         100.0
Premium       39        39         100.0
Luxury        14        14         100.0

What You Built

A production-grade data pipeline with:

  • Scraper — polite HTTP requests, retry logic with exponential backoff, HTML parsing, pagination
  • Cleaner — regex price parsing, rating mapping, deduplication, derived columns (value_score, price_band), data quality validation with named checks
  • Storage — SQLAlchemy ORM, upsert logic (skip existing URLs), pandas -> SQL in one call
  • Analyser — rating distribution, price band breakdown, best value ranking, stock analysis
  • Visualiser — 4 charts (rating bar, price histogram + box plot, price band pie, value scatter)
  • Runner — end-to-end orchestration, structured logging to file and console, audit CSV snapshots, schedule for daily automation

What's Next?

Chapter 49 covers Building a Desktop App with Tkinter — you'll build a graphical user interface with windows, buttons, forms, and menus, turning your Python scripts into apps that anyone can use without a terminal.

© 2026 Abhilash Sahoo. Python: Zero to Hero.