Chapter 27: Concurrency — Threading, Multiprocessing, and asyncio
Your computer has multiple cores. Most Python programs use exactly one of them. This chapter shows you how to use the rest — and how to handle situations where your program spends most of its time waiting rather than computing.
There are three tools:
- Threading — multiple threads in one process, good for I/O-bound work
- Multiprocessing — multiple processes with separate memory, good for CPU-bound work
- asyncio — a single thread handling thousands of I/O operations concurrently
Understanding which tool to reach for is more than half the battle. Let's start there.
The Two Types of Bottleneck
I/O-bound work spends most of its time waiting — for network responses, disk reads, database queries, user input. The CPU is mostly idle.
CPU-bound work spends most of its time computing — parsing files, running algorithms, processing images, training models. The CPU is pegged at 100%.
I/O-bound: |──wait──|compute|──wait──|──wait──|compute|──wait──|
CPU-bound: |compute|compute|compute|compute|compute|compute|
Solutions:
I/O-bound -> threading or asyncio (overlap the waiting)
CPU-bound -> multiprocessing (use multiple cores)
Threading does NOT help with CPU-bound work in Python — because of the Global Interpreter Lock (GIL).
The GIL — Why Threads Don't Parallelize CPU Work
CPython, the standard Python interpreter, uses a lock called the GIL (Global Interpreter Lock). Only one thread can execute Python bytecode at a time, so for CPU-bound work threads merely take turns holding the lock — you get no speedup.
For I/O-bound work, the GIL is released during I/O operations (network calls, file reads, time.sleep). While one thread waits, another runs. This is why threading helps for I/O.
Without concurrency:
request 1: |──wait 2s──|
request 2: |──wait 2s──|
request 3: |──wait 2s──|
Total: 6 seconds
With threading (I/O releases GIL):
request 1: |──wait 2s──|
request 2: |──wait 2s──| (started immediately)
request 3: |──wait 2s──| (started immediately)
Total: ~2 seconds
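You can see the GIL's effect directly with a classic countdown benchmark (a sketch; the iteration count and exact timings are assumptions that vary by machine):

```python
import threading
import time

N = 2_000_000  # loop iterations per task (adjust for your machine)

def count_down(n):
    """Pure-Python busy loop — CPU-bound, holds the GIL almost constantly."""
    while n > 0:
        n -= 1

# Two runs back to back on one thread
start = time.perf_counter()
count_down(N)
count_down(N)
sequential = time.perf_counter() - start

# The same two runs on two threads
start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
threaded = time.perf_counter() - start

# On CPython with the GIL, the threaded version is typically no faster,
# and often slightly slower due to lock contention
print(f"Sequential: {sequential:.2f}s  Threaded: {threaded:.2f}s")
```

Swap count_down for time.sleep and the threaded version wins easily — that is the whole I/O-bound vs CPU-bound distinction in one experiment.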
Threading
threading.Thread runs a function in a separate thread:
import threading
import time

def download(url, results, index):
    """Simulate downloading a URL."""
    print(f"  Starting: {url}")
    time.sleep(2)  # simulate network wait
    results[index] = f"Content of {url}"
    print(f"  Done: {url}")

urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
]
results = [None] * len(urls)

# Sequential — 6 seconds
start = time.perf_counter()
for i, url in enumerate(urls):
    download(url, results, i)
print(f"Sequential: {time.perf_counter() - start:.1f}s")

# Threaded — ~2 seconds
results = [None] * len(urls)
threads = []
start = time.perf_counter()
for i, url in enumerate(urls):
    t = threading.Thread(target=download, args=(url, results, i))
    threads.append(t)
    t.start()
for t in threads:
    t.join()  # wait for all threads to finish
print(f"Threaded: {time.perf_counter() - start:.1f}s")
print(results)
Output (the per-URL lines from the sequential run are omitted):
Sequential: 6.0s
  Starting: https://example.com/page1
  Starting: https://example.com/page2
  Starting: https://example.com/page3
  Done: https://example.com/page1
  Done: https://example.com/page2
  Done: https://example.com/page3
Threaded: 2.0s
ThreadPoolExecutor — the cleaner way
concurrent.futures.ThreadPoolExecutor manages a pool of threads for you:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch(url):
    """Simulate fetching a URL. Returns (url, content)."""
    time.sleep(1.5)
    return url, f"<html>Content from {url}</html>"

urls = [f"https://example.com/page{i}" for i in range(8)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all tasks
    futures = {executor.submit(fetch, url): url for url in urls}
    # Process results as they complete
    for future in as_completed(futures):
        url, content = future.result()
        print(f"  Got {len(content)} bytes from {url}")
print(f"Total: {time.perf_counter() - start:.1f}s")
# 8 requests, 4 at a time, each takes 1.5s -> ~3s total
as_completed yields futures in the order they finish — not the order they were submitted. This is useful when you want to process results as soon as they're ready.
If you just want results in submission order:
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fetch, urls))
    # blocks until all are done, returns in order
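One detail worth knowing: an exception raised inside a worker does not surface until you call future.result(), which re-raises it in the calling thread. A sketch with a hypothetical may_fail worker:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def may_fail(n):
    """Hypothetical worker: fails for one input, squares the rest."""
    if n == 3:
        raise ValueError(f"bad input: {n}")
    return n * n

results, errors = [], []
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(may_fail, n): n for n in range(6)}
    for future in as_completed(futures):
        n = futures[future]
        try:
            results.append(future.result())  # re-raises the worker's exception here
        except ValueError as e:
            errors.append((n, str(e)))

print(sorted(results))  # [0, 1, 4, 16, 25]
print(errors)           # [(3, 'bad input: 3')]
```

The same pattern works unchanged with ProcessPoolExecutor. executor.map behaves differently: it re-raises the first exception while you iterate the results, so you lose the remaining ones unless you wrap each call.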
Thread safety and locks
Threads share memory. When multiple threads modify the same data at the same time, you get race conditions — unpredictable, hard-to-reproduce bugs.
import threading

counter = 0

def increment(n):
    global counter
    for _ in range(n):
        counter += 1  # NOT thread-safe: read-modify-write

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Expected: 500000, Got: {counter}")
# Expected: 500000, Got: 423891 (varies each run — race condition)
Fix with a Lock:
import threading

counter = 0
lock = threading.Lock()

def increment_safe(n):
    global counter
    for _ in range(n):
        with lock:  # only one thread at a time
            counter += 1

threads = [threading.Thread(target=increment_safe, args=(100_000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Expected: 500000, Got: {counter}")  # Expected: 500000, Got: 500000
Other synchronization primitives:
import threading

# RLock — re-entrant lock (same thread can acquire multiple times)
rlock = threading.RLock()

# Semaphore — limit concurrent access to N threads
sem = threading.Semaphore(3)  # max 3 threads at a time
with sem:
    pass  # do work

# Event — signal between threads
event = threading.Event()
event.set()    # signal
event.clear()  # unsignal
event.wait()   # block until signaled

# Barrier — wait for N threads to arrive before any proceed
barrier = threading.Barrier(4)
barrier.wait()  # block until 4 threads reach this line
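To make the Semaphore concrete, here is a small runnable sketch (the workload is assumed: ten threads, 0.1 seconds of simulated work each) that records how many threads are ever inside the guarded section at once:

```python
import threading
import time

sem = threading.Semaphore(3)   # at most 3 workers in the guarded section
state_lock = threading.Lock()  # protects the two counters below
active = 0
peak = 0

def worker(i):
    global active, peak
    with sem:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.1)  # simulate work while holding the semaphore
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Peak concurrent workers: {peak}")  # never exceeds 3
```

This is the thread-world equivalent of the asyncio.Semaphore used in the scraper project at the end of the chapter.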
Multiprocessing
For CPU-bound work, use multiprocessing. Each process has its own Python interpreter and its own GIL — true parallelism.
import multiprocessing
import time

def cpu_work(n):
    """A CPU-bound computation."""
    return sum(i * i for i in range(n))

data = [5_000_000] * 8  # 8 chunks of work

# Sequential
start = time.perf_counter()
results = [cpu_work(n) for n in data]
print(f"Sequential: {time.perf_counter() - start:.2f}s")

# Multiprocessing
start = time.perf_counter()
with multiprocessing.Pool() as pool:  # uses all CPU cores
    results = pool.map(cpu_work, data)
print(f"Parallel: {time.perf_counter() - start:.2f}s")
On a 4-core machine, the parallel version is roughly 4x faster.
ProcessPoolExecutor — mirrors ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import time

def process_chunk(chunk):
    """CPU-intensive processing of a data chunk."""
    return sum(x ** 2 for x in chunk)

# Split work into chunks
data = list(range(10_000_000))
chunk_size = len(data) // 8
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

start = time.perf_counter()
with ProcessPoolExecutor() as executor:
    results = list(executor.map(process_chunk, chunks))
total = sum(results)
print(f"Total: {total}")
print(f"Time: {time.perf_counter() - start:.2f}s")
Important rules for multiprocessing:
- Arguments and return values must be picklable (basic Python types, dataclasses, named tuples — but not lambdas, file handles, or locks).
- Always protect the entry point with if __name__ == "__main__":. This is required on Windows and macOS, where child processes start by re-importing your main module.
- Processes have separate memory — no shared state without explicit shared memory or queues.
# Always do this on Windows when using multiprocessing
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_work, data))
Sharing data between processes
from multiprocessing import Process, Queue, Value, Array
import multiprocessing

# Queue — safe for inter-process communication
def producer(queue, items):
    for item in items:
        queue.put(item)
    queue.put(None)  # sentinel value

def consumer(in_queue, out_queue):
    while True:
        item = in_queue.get()
        if item is None:
            break
        out_queue.put(item * 2)  # a plain list would not be visible to the parent process

# Shared memory — for simple types
counter = multiprocessing.Value("i", 0)  # shared integer
array = multiprocessing.Array("d", [1.0, 2.0, 3.0])  # shared float array

def increment_shared(counter, n):
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1
For most use cases, prefer passing data via pool.map() arguments and return values, and avoid shared state entirely.
asyncio — Async/Await
asyncio is a fundamentally different model. Instead of multiple threads or processes, one thread manages many tasks by switching between them at await points.
When a task hits an await, it says: "I'm waiting for something. Run something else while I wait." The event loop picks another task and runs it. No extra OS threads, and usually no locks: tasks switch only at await points, so code between awaits runs without interruption.
import asyncio
import time

async def fetch(url, delay):
    """Simulate an async HTTP request."""
    print(f"  Fetching {url}...")
    await asyncio.sleep(delay)  # yields control to the event loop
    print(f"  Done: {url}")
    return f"Content of {url}"

async def main():
    start = time.perf_counter()
    # Run all fetches concurrently
    results = await asyncio.gather(
        fetch("https://example.com/page1", 1.5),
        fetch("https://example.com/page2", 2.0),
        fetch("https://example.com/page3", 1.0),
    )
    elapsed = time.perf_counter() - start
    print(f"\nAll done in {elapsed:.2f}s")
    for r in results:
        print(f"  {r}")

asyncio.run(main())
Output:
Fetching https://example.com/page1...
Fetching https://example.com/page2...
Fetching https://example.com/page3...
Done: https://example.com/page3
Done: https://example.com/page1
Done: https://example.com/page2
All done in 2.00s <- the longest task, not the sum
Three 1-2 second fetches complete in 2 seconds total — the same wall time as threading, but all on a single thread.
async and await — the rules
An async function is declared with async def. It returns a coroutine when called — not the result. The result comes when you await it.
async def greet(name: str) -> str:
    await asyncio.sleep(0)  # yield to the event loop (trivial here)
    return f"Hello, {name}!"

# This does NOT run greet — it creates a coroutine object
coro = greet("Alice")

# This runs it and gets the result
result = await greet("Alice")  # only valid inside an async function

# Or run it as the top-level entry point
asyncio.run(greet("Alice"))
You can only await inside an async def function. And asyncio.run() cannot be called from inside a running event loop; use it at the top level as your program's entry point.
asyncio.gather — run tasks concurrently
async def main():
    # All three run concurrently — total time = max(individual times)
    a, b, c = await asyncio.gather(
        task_one(),
        task_two(),
        task_three(),
    )
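By default, gather propagates the first exception and the other results are lost. Pass return_exceptions=True to get exceptions back as ordinary result values instead. A sketch with a hypothetical task coroutine:

```python
import asyncio

async def task(n):
    """Hypothetical task: fails for n == 2, otherwise returns n * 10."""
    await asyncio.sleep(0.01)
    if n == 2:
        raise RuntimeError(f"task {n} failed")
    return n * 10

async def main():
    return await asyncio.gather(
        task(1), task(2), task(3),
        return_exceptions=True,  # exceptions become results, not crashes
    )

results = asyncio.run(main())
for r in results:
    print("failed:" if isinstance(r, Exception) else "ok:", r)
# results keeps submission order: [10, RuntimeError(...), 30]
```

This is handy for batch jobs where one bad URL should not sink the whole run — the scraper project below achieves the same effect by catching exceptions inside each task instead.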
asyncio.create_task — start now, await later
async def main():
    # Start a task without immediately waiting for it
    task = asyncio.create_task(background_work())
    # Do other work while background_work runs
    await do_something_else()
    # Now wait for the background task
    result = await task
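An awaited coroutine can also be given a deadline with asyncio.wait_for, which cancels it if it does not finish in time. A sketch with a hypothetical slow_operation:

```python
import asyncio

async def slow_operation():
    """Hypothetical operation that takes far too long."""
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        return await asyncio.wait_for(slow_operation(), timeout=0.1)
    except asyncio.TimeoutError:  # the inner coroutine was cancelled
        return "timed out"

outcome = asyncio.run(main())
print(outcome)  # timed out
```

The scraper project at the end of this chapter uses exactly this call to bound each request.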
Async context managers and iterators
import asyncio

class AsyncTimer:
    async def __aenter__(self):
        self.start = asyncio.get_running_loop().time()
        return self

    async def __aexit__(self, *args):
        elapsed = asyncio.get_running_loop().time() - self.start
        print(f"Elapsed: {elapsed:.3f}s")

async def async_range(n):
    for i in range(n):
        await asyncio.sleep(0)  # yield between each item
        yield i

async def main():
    # Async context manager
    async with AsyncTimer():
        await asyncio.sleep(0.5)
    # Async iterator
    async for i in async_range(5):
        print(i, end=" ")
    print()

asyncio.run(main())
Async with aiohttp — real HTTP requests
For real async HTTP in production, use aiohttp or httpx:
# pip install aiohttp
import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls: list[str]) -> list[str]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]
results = asyncio.run(fetch_all(urls))
# 3 requests x 1s each -> ~1s total (not 3s)
Choosing the Right Tool
| Situation | Tool | Why |
|---|---|---|
| Making many HTTP requests | asyncio + aiohttp | Most efficient for I/O |
| Reading many files at once | ThreadPoolExecutor | Simple, I/O releases GIL |
| Compressing images | ProcessPoolExecutor | CPU-bound, needs real parallelism |
| Parsing large CSV files | ProcessPoolExecutor | CPU-bound |
| Running a web server | asyncio (FastAPI/aiohttp) | Handles thousands of connections |
| Scraping websites | asyncio or threads | I/O-bound |
| Data science computation | multiprocessing or numpy | CPU-bound |
| Simple background task | threading.Thread | Simplest option |
Quick decision tree:
Is your bottleneck I/O (network, disk)?
├── Yes, and you want maximum concurrency -> asyncio
├── Yes, and you want simplicity -> ThreadPoolExecutor
└── No (CPU is the bottleneck) -> ProcessPoolExecutor
Project: Async Web Scraper with Rate Limiting
"""
scraper.py — Async web scraper with rate limiting, retries, and timeout.
"""
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional
import random
@dataclass
class ScrapeResult:
url: str
status: int
content: Optional[str]
elapsed: float
error: Optional[str] = None
class RateLimiter:
"""Allow at most `rate` operations per second."""
def __init__(self, rate: float):
self.rate = rate
self.min_gap = 1.0 / rate
self._last = 0.0
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = asyncio.get_event_loop().time()
wait = self.min_gap - (now - self._last)
if wait > 0:
await asyncio.sleep(wait)
self._last = asyncio.get_event_loop().time()
async def fetch_with_retry(
url: str,
limiter: RateLimiter,
retries: int = 3,
timeout: float = 5.0,
) -> ScrapeResult:
"""Fetch url respecting rate limit, with retries on failure."""
await limiter.acquire()
start = time.perf_counter()
for attempt in range(1, retries + 1):
try:
# Simulate HTTP request (replace with aiohttp in production)
await asyncio.wait_for(simulate_request(url), timeout=timeout)
elapsed = time.perf_counter() - start
return ScrapeResult(
url = url,
status = 200,
content = f"<html>Content from {url}</html>",
elapsed = elapsed,
)
except asyncio.TimeoutError:
if attempt == retries:
elapsed = time.perf_counter() - start
return ScrapeResult(
url=url, status=408, content=None,
elapsed=elapsed, error="Timeout"
)
await asyncio.sleep(0.5 * attempt) # exponential back-off
except Exception as e:
if attempt == retries:
elapsed = time.perf_counter() - start
return ScrapeResult(
url=url, status=500, content=None,
elapsed=elapsed, error=str(e)
)
await asyncio.sleep(0.5 * attempt)
# Should not reach here
return ScrapeResult(url=url, status=500, content=None, elapsed=0, error="Unknown")
async def simulate_request(url: str) -> None:
"""Simulate a network request (0.1--1.5s, 15% failure rate)."""
delay = random.uniform(0.1, 1.5)
await asyncio.sleep(delay)
if random.random() < 0.15:
raise ConnectionError("Simulated connection error")
async def scrape_all(
urls: list[str],
max_concurrent: int = 10,
rate_per_second: float = 5.0,
) -> list[ScrapeResult]:
"""Scrape all URLs with concurrency and rate limiting."""
limiter = RateLimiter(rate_per_second)
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_fetch(url):
async with semaphore:
return await fetch_with_retry(url, limiter)
tasks = [bounded_fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
return list(results)
def print_summary(results: list[ScrapeResult]) -> None:
ok = [r for r in results if r.error is None]
failed = [r for r in results if r.error is not None]
avg_ms = sum(r.elapsed for r in ok) / len(ok) * 1000 if ok else 0
print(f"\n{'URL':<35} {'Status':>6} {'Time':>8} {'Error'}")
print("-" * 65)
for r in sorted(results, key=lambda r: r.url):
status = str(r.status) if r.error is None else f"ERR"
time_s = f"{r.elapsed*1000:.0f}ms"
error = r.error or ""
print(f" {r.url:<33} {status:>6} {time_s:>8} {error}")
print(f"\nSummary: {len(ok)}/{len(results)} succeeded, avg {avg_ms:.0f}ms per request")
if failed:
print(f"Failed: {[r.url for r in failed]}")
if __name__ == "__main__":
urls = [f"https://example.com/page{i:02d}" for i in range(20)]
start = time.perf_counter()
results = asyncio.run(scrape_all(urls, max_concurrent=5, rate_per_second=4.0))
total = time.perf_counter() - start
print_summary(results)
print(f"\nTotal wall time: {total:.2f}s")
This scraper uses:
- asyncio.Semaphore to limit concurrency (max 5 at a time)
- RateLimiter with an asyncio.Lock to cap requests per second
- asyncio.wait_for for timeouts
- Exponential back-off on retries
- asyncio.gather to run all tasks concurrently
What You Learned in This Chapter
- I/O-bound programs wait for network/disk — threads and asyncio help by overlapping the waiting.
- CPU-bound programs compute constantly — only multiprocessing gives real speedup in Python.
- The GIL prevents threads from running Python code truly in parallel, but releases during I/O.
- threading.Thread and ThreadPoolExecutor run functions in threads — use for I/O-bound work.
- Use a Lock to protect shared data from race conditions.
- ProcessPoolExecutor and multiprocessing.Pool run functions in separate processes — use for CPU-bound work. Arguments must be picklable. Guard with if __name__ == "__main__":.
- async def and await write cooperative coroutines. The event loop switches between them at every await.
- asyncio.gather runs multiple coroutines concurrently. asyncio.create_task starts a coroutine without waiting for it immediately. asyncio.Semaphore limits how many coroutines run at once.
- Use aiohttp or httpx for real async HTTP requests.
What's Next?
Chapter 28 covers Testing — unittest, pytest, fixtures, parametrize, mocking, and test-driven development. Testing is how you prove your code works, catch regressions, and refactor with confidence. If you write tests, you can change anything. If you don't, every change is a gamble.