Python: Zero to Hero
Home/Data and Files
Share

Chapter 27: Concurrency — Threading, Multiprocessing, and asyncio

Your computer has multiple cores. Most Python programs use exactly one of them. This chapter shows you how to use the rest — and how to handle situations where your program spends most of its time waiting rather than computing.

There are three tools:

  • Threading — multiple threads in one process, good for I/O-bound work
  • Multiprocessing — multiple processes with separate memory, good for CPU-bound work
  • asyncio — a single thread handling thousands of I/O operations concurrently

Understanding which tool to reach for is more than half the battle. Let's start there.

The Two Types of Bottleneck

I/O-bound work spends most of its time waiting — for network responses, disk reads, database queries, user input. The CPU is mostly idle.

CPU-bound work spends most of its time computing — parsing files, running algorithms, processing images, training models. The CPU is pegged at 100%.

I/O-bound:  |──wait──|compute|──wait──|──wait──|compute|──wait──|
CPU-bound:  |compute|compute|compute|compute|compute|compute|

Solutions:
  I/O-bound  -> threading or asyncio  (overlap the waiting)
  CPU-bound  -> multiprocessing       (use multiple cores)

Threading does NOT help with CPU-bound work in Python — because of the Global Interpreter Lock (GIL).

The GIL — Why Threads Don't Parallelize CPU Work

Python's interpreter uses a lock called the GIL (Global Interpreter Lock). Only one thread can execute Python bytecode at a time. For CPU-bound work, threads take turns — you get no speedup.

For I/O-bound work, the GIL is released during I/O operations (network calls, file reads, time.sleep). While one thread waits, another runs. This is why threading helps for I/O.

Without concurrency:
  request 1: |──wait 2s──|
  request 2:              |──wait 2s──|
  request 3:                          |──wait 2s──|
  Total: 6 seconds

With threading (I/O releases GIL):
  request 1: |──wait 2s──|
  request 2: |──wait 2s──|  (started immediately)
  request 3: |──wait 2s──|  (started immediately)
  Total: ~2 seconds

Threading

threading.Thread runs a function in a separate thread:

import threading
import time

def download(url, results, index):
    """Simulate downloading a URL."""
    print(f"  Starting: {url}")
    time.sleep(2)   # simulate network wait
    results[index] = f"Content of {url}"
    print(f"  Done:     {url}")


urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
]

results = [None] * len(urls)

# Sequential — 6 seconds
start = time.perf_counter()
for i, url in enumerate(urls):
    download(url, results, i)
print(f"Sequential: {time.perf_counter() - start:.1f}s")

# Threaded — ~2 seconds
results = [None] * len(urls)
threads = []

start = time.perf_counter()
for i, url in enumerate(urls):
    t = threading.Thread(target=download, args=(url, results, i))
    threads.append(t)
    t.start()

for t in threads:
    t.join()   # wait for all threads to finish

print(f"Threaded:   {time.perf_counter() - start:.1f}s")
print(results)

Output:

Sequential: 6.0s

  Starting: https://example.com/page1
  Starting: https://example.com/page2
  Starting: https://example.com/page3
  Done:     https://example.com/page1
  Done:     https://example.com/page2
  Done:     https://example.com/page3
Threaded:   2.0s

ThreadPoolExecutor — the cleaner way

concurrent.futures.ThreadPoolExecutor manages a pool of threads for you:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch(url):
    """Simulate fetching a URL. Returns (url, content)."""
    time.sleep(1.5)
    return url, f"<html>Content from {url}</html>"


urls = [f"https://example.com/page{i}" for i in range(8)]

start = time.perf_counter()

with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all tasks
    futures = {executor.submit(fetch, url): url for url in urls}

    # Process results as they complete
    for future in as_completed(futures):
        url, content = future.result()
        print(f"  Got {len(content)} bytes from {url}")

print(f"Total: {time.perf_counter() - start:.1f}s")
# 8 requests, 4 at a time, each takes 1.5s -> ~3s total

as_completed yields futures in the order they finish — not the order they were submitted. This is useful when you want to process results as soon as they're ready.

If you just want results in submission order:

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fetch, urls))
    # blocks until all are done, returns in order

Thread safety and locks

Threads share memory. When multiple threads modify the same data at the same time, you get race conditions — unpredictable, hard-to-reproduce bugs.

import threading

counter = 0

def increment(n):
    global counter
    for _ in range(n):
        counter += 1   # NOT thread-safe: read-modify-write


threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected: 500000, Got: {counter}")
# Expected: 500000, Got: 423891  (varies each run — race condition)

Fix with a Lock:

import threading

counter = 0
lock    = threading.Lock()

def increment_safe(n):
    global counter
    for _ in range(n):
        with lock:          # only one thread at a time
            counter += 1


threads = [threading.Thread(target=increment_safe, args=(100_000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected: 500000, Got: {counter}")   # Expected: 500000, Got: 500000

Other synchronization primitives:

import threading

# RLock — re-entrant lock (same thread can acquire multiple times)
rlock = threading.RLock()

# Semaphore — limit concurrent access to N threads
sem = threading.Semaphore(3)   # max 3 threads at a time
with sem:
    pass   # do work

# Event — signal between threads
event = threading.Event()
event.set()      # signal
event.clear()    # unsignal
event.wait()     # block until signaled

# Barrier — wait for N threads to arrive before any proceed
barrier = threading.Barrier(4)
barrier.wait()   # block until 4 threads reach this line

Multiprocessing

For CPU-bound work, use multiprocessing. Each process has its own Python interpreter and its own GIL — true parallelism.

import multiprocessing
import time

def cpu_work(n):
    """A CPU-bound computation."""
    return sum(i * i for i in range(n))


data = [5_000_000] * 8   # 8 chunks of work

# Sequential
start = time.perf_counter()
results = [cpu_work(n) for n in data]
print(f"Sequential: {time.perf_counter() - start:.2f}s")

# Multiprocessing
start = time.perf_counter()
with multiprocessing.Pool() as pool:    # uses all CPU cores
    results = pool.map(cpu_work, data)
print(f"Parallel:   {time.perf_counter() - start:.2f}s")

On a 4-core machine, the parallel version is roughly 4x faster.

ProcessPoolExecutor — mirrors ThreadPoolExecutor

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def process_chunk(chunk):
    """CPU-intensive processing of a data chunk."""
    return sum(x ** 2 for x in chunk)


# Split work into chunks
data = list(range(10_000_000))
chunk_size = len(data) // 8
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

start = time.perf_counter()

with ProcessPoolExecutor() as executor:
    results = list(executor.map(process_chunk, chunks))

total = sum(results)
print(f"Total: {total}")
print(f"Time:  {time.perf_counter() - start:.2f}s")

Important rules for multiprocessing:

  • Arguments and return values must be picklable (basic Python types, dataclasses, named tuples — but not lambdas, file handles, or locks).
  • Always protect the entry point with if __name__ == "__main__": on Windows.
  • Processes have separate memory — no shared state without explicit shared memory or queues.
# Always do this on Windows when using multiprocessing
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_work, data))

Sharing data between processes

from multiprocessing import Process, Queue, Value, Array
import multiprocessing

# Queue — safe for inter-process communication
def producer(queue, items):
    for item in items:
        queue.put(item)
    queue.put(None)   # sentinel value

def consumer(queue, results):
    while True:
        item = queue.get()
        if item is None:
            break
        results.append(item * 2)


# Shared memory — for simple types
counter = multiprocessing.Value("i", 0)   # shared integer
array   = multiprocessing.Array("d", [1.0, 2.0, 3.0])  # shared float array

def increment_shared(counter, n):
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1

For most use cases, prefer passing data via pool.map() arguments and return values, and avoid shared state entirely.

asyncio — Async/Await

asyncio is a fundamentally different model. Instead of multiple threads or processes, one thread manages many tasks by switching between them at await points.

When a task hits an await, it says: "I'm waiting for something. Run something else while I wait." The event loop picks another task and runs it. No OS threads, no locking, no race conditions on shared data (within a single async program).

import asyncio
import time

async def fetch(url, delay):
    """Simulate an async HTTP request."""
    print(f"  Fetching {url}...")
    await asyncio.sleep(delay)   # yields control to the event loop
    print(f"  Done:     {url}")
    return f"Content of {url}"


async def main():
    start = time.perf_counter()

    # Run all fetches concurrently
    results = await asyncio.gather(
        fetch("https://example.com/page1", 1.5),
        fetch("https://example.com/page2", 2.0),
        fetch("https://example.com/page3", 1.0),
    )

    elapsed = time.perf_counter() - start
    print(f"\nAll done in {elapsed:.2f}s")
    for r in results:
        print(f"  {r}")


asyncio.run(main())

Output:

  Fetching https://example.com/page1...
  Fetching https://example.com/page2...
  Fetching https://example.com/page3...
  Done:     https://example.com/page3
  Done:     https://example.com/page1
  Done:     https://example.com/page2

All done in 2.00s   <- the longest task, not the sum

Three 1-2 second fetches complete in 2 seconds total — the same as threading, but with zero threads.

async and await — the rules

An async function is declared with async def. It returns a coroutine when called — not the result. The result comes when you await it.

async def greet(name: str) -> str:
    await asyncio.sleep(0)   # yield to the event loop (trivial here)
    return f"Hello, {name}!"

# This does NOT call greet — it creates a coroutine object
coro = greet("Alice")

# This runs it and gets the result
result = await greet("Alice")   # only valid inside an async function

# Or run it as the top-level entry point
asyncio.run(greet("Alice"))

You can only await inside an async def function. You can only call asyncio.run() once, at the top level.

asyncio.gather — run tasks concurrently

async def main():
    # All three run concurrently — total time = max(individual times)
    a, b, c = await asyncio.gather(
        task_one(),
        task_two(),
        task_three(),
    )

asyncio.create_task — fire and forget

async def main():
    # Start a task without immediately waiting for it
    task = asyncio.create_task(background_work())

    # Do other work while background_work runs
    await do_something_else()

    # Now wait for the background task
    result = await task

Async context managers and iterators

import asyncio

class AsyncTimer:
    async def __aenter__(self):
        self.start = asyncio.get_event_loop().time()
        return self

    async def __aexit__(self, *args):
        elapsed = asyncio.get_event_loop().time() - self.start
        print(f"Elapsed: {elapsed:.3f}s")


async def async_range(n):
    for i in range(n):
        await asyncio.sleep(0)   # yield between each item
        yield i


async def main():
    # Async context manager
    async with AsyncTimer():
        await asyncio.sleep(0.5)

    # Async iterator
    async for i in async_range(5):
        print(i, end=" ")
    print()


asyncio.run(main())

Async with aiohttp — real HTTP requests

For real async HTTP in production, use aiohttp or httpx:

# pip install aiohttp
import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()


async def fetch_all(urls: list[str]) -> list[str]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)


urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

results = asyncio.run(fetch_all(urls))
# 3 requests x 1s each -> ~1s total (not 3s)

Choosing the Right Tool

Situation Tool Why
Making many HTTP requests asyncio + aiohttp Most efficient for I/O
Reading many files at once ThreadPoolExecutor Simple, I/O releases GIL
Compressing images ProcessPoolExecutor CPU-bound, needs real parallelism
Parsing large CSV files ProcessPoolExecutor CPU-bound
Running a web server asyncio (FastAPI/aiohttp) Handles thousands of connections
Scraping websites asyncio or threads I/O-bound
Data science computation multiprocessing or numpy CPU-bound
Simple background task threading.Thread Simplest option

Quick decision tree:

Is your bottleneck I/O (network, disk)?
  ├── Yes, and you want maximum concurrency -> asyncio
  ├── Yes, and you want simplicity          -> ThreadPoolExecutor
  └── No (CPU is the bottleneck)            -> ProcessPoolExecutor

Project: Async Web Scraper with Rate Limiting

"""
scraper.py — Async web scraper with rate limiting, retries, and timeout.
"""
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional
import random


@dataclass
class ScrapeResult:
    url:     str
    status:  int
    content: Optional[str]
    elapsed: float
    error:   Optional[str] = None


class RateLimiter:
    """Allow at most `rate` operations per second."""
    def __init__(self, rate: float):
        self.rate     = rate
        self.min_gap  = 1.0 / rate
        self._last    = 0.0
        self._lock    = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now  = asyncio.get_event_loop().time()
            wait = self.min_gap - (now - self._last)
            if wait > 0:
                await asyncio.sleep(wait)
            self._last = asyncio.get_event_loop().time()


async def fetch_with_retry(
    url: str,
    limiter: RateLimiter,
    retries: int = 3,
    timeout: float = 5.0,
) -> ScrapeResult:
    """Fetch url respecting rate limit, with retries on failure."""
    await limiter.acquire()

    start = time.perf_counter()

    for attempt in range(1, retries + 1):
        try:
            # Simulate HTTP request (replace with aiohttp in production)
            await asyncio.wait_for(simulate_request(url), timeout=timeout)
            elapsed = time.perf_counter() - start
            return ScrapeResult(
                url     = url,
                status  = 200,
                content = f"<html>Content from {url}</html>",
                elapsed = elapsed,
            )
        except asyncio.TimeoutError:
            if attempt == retries:
                elapsed = time.perf_counter() - start
                return ScrapeResult(
                    url=url, status=408, content=None,
                    elapsed=elapsed, error="Timeout"
                )
            await asyncio.sleep(0.5 * attempt)   # exponential back-off
        except Exception as e:
            if attempt == retries:
                elapsed = time.perf_counter() - start
                return ScrapeResult(
                    url=url, status=500, content=None,
                    elapsed=elapsed, error=str(e)
                )
            await asyncio.sleep(0.5 * attempt)

    # Should not reach here
    return ScrapeResult(url=url, status=500, content=None, elapsed=0, error="Unknown")


async def simulate_request(url: str) -> None:
    """Simulate a network request (0.1--1.5s, 15% failure rate)."""
    delay = random.uniform(0.1, 1.5)
    await asyncio.sleep(delay)
    if random.random() < 0.15:
        raise ConnectionError("Simulated connection error")


async def scrape_all(
    urls: list[str],
    max_concurrent: int = 10,
    rate_per_second: float = 5.0,
) -> list[ScrapeResult]:
    """Scrape all URLs with concurrency and rate limiting."""
    limiter   = RateLimiter(rate_per_second)
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_fetch(url):
        async with semaphore:
            return await fetch_with_retry(url, limiter)

    tasks   = [bounded_fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return list(results)


def print_summary(results: list[ScrapeResult]) -> None:
    ok      = [r for r in results if r.error is None]
    failed  = [r for r in results if r.error is not None]
    avg_ms  = sum(r.elapsed for r in ok) / len(ok) * 1000 if ok else 0

    print(f"\n{'URL':<35} {'Status':>6} {'Time':>8}  {'Error'}")
    print("-" * 65)
    for r in sorted(results, key=lambda r: r.url):
        status = str(r.status) if r.error is None else f"ERR"
        time_s = f"{r.elapsed*1000:.0f}ms"
        error  = r.error or ""
        print(f"  {r.url:<33} {status:>6} {time_s:>8}  {error}")

    print(f"\nSummary: {len(ok)}/{len(results)} succeeded, avg {avg_ms:.0f}ms per request")
    if failed:
        print(f"Failed:  {[r.url for r in failed]}")


if __name__ == "__main__":
    urls = [f"https://example.com/page{i:02d}" for i in range(20)]

    start = time.perf_counter()
    results = asyncio.run(scrape_all(urls, max_concurrent=5, rate_per_second=4.0))
    total = time.perf_counter() - start

    print_summary(results)
    print(f"\nTotal wall time: {total:.2f}s")

This scraper uses:

  • asyncio.Semaphore to limit concurrency (max 5 at a time)
  • RateLimiter with an asyncio.Lock to cap requests per second
  • asyncio.wait_for for timeouts
  • Exponential back-off on retries
  • asyncio.gather to run all tasks concurrently

What You Learned in This Chapter

  • I/O-bound programs wait for network/disk — threads and asyncio help by overlapping the waiting.
  • CPU-bound programs compute constantly — only multiprocessing gives real speedup in Python.
  • The GIL prevents threads from running Python code truly in parallel, but releases during I/O.
  • threading.Thread and ThreadPoolExecutor run functions in threads — use for I/O-bound work.
  • Use a Lock to protect shared data from race conditions.
  • ProcessPoolExecutor and multiprocessing.Pool run functions in separate processes — use for CPU-bound work. Arguments must be picklable. Guard with if __name__ == "__main__":.
  • async def and await write cooperative coroutines. The event loop switches between them at every await.
  • asyncio.gather runs multiple coroutines concurrently.
  • asyncio.create_task starts a coroutine without waiting for it immediately.
  • asyncio.Semaphore limits how many coroutines run at once.
  • Use aiohttp or httpx for real async HTTP requests.

What's Next?

Chapter 28 covers Testingunittest, pytest, fixtures, parametrize, mocking, and test-driven development. Testing is how you prove your code works, catch regressions, and refactor with confidence. If you write tests, you can change anything. If you don't, every change is a gamble.

© 2026 Abhilash Sahoo. Python: Zero to Hero.