Threading vs multiprocessing в Python: что выбрать и почему
Глубокий разбор threading и multiprocessing в Python: GIL, CPU-bound vs I/O-bound, concurrent.futures, синхронизация, IPC и реальные примеры кода для собеседования.
- 1. Почему выбор важен: GIL и природа задачи
- 1.1 Краткий recap: что такое GIL
- 1.2 CPU-bound vs I/O-bound: измеряем разницу
- 2. Модуль threading
- 2.1 Thread, start(), join(), daemon
- 2.2 ThreadLocal — изолированное состояние
- 2.3 Примитивы синхронизации
- 2.4 Queue — потокобезопасная очередь
- 2.5 Типичные ловушки: race condition и deadlock
- 3. Модуль multiprocessing
- 3.1 Process, start(), join(), terminate()
- 3.2 Pool — пул процессов
- 3.3 IPC: Queue и Pipe
- 3.4 Разделяемая память: Value, Array, Manager
- 3.5 Overhead процессов vs потоков
- 4. concurrent.futures: единый высокоуровневый API
- 4.1 ThreadPoolExecutor vs ProcessPoolExecutor
- 4.2 submit() + Future
- 4.3 map() — параллельный итератор
- 4.4 as_completed() — обработка по мере готовности
- 4.5 Пример: I/O-bound vs CPU-bound
- 5. Когда что выбирать: практическая шпаргалка
- Таблица сравнения
- asyncio vs threading vs multiprocessing
- 6. Производительность и подводные камни
- 6.1 Pickling — что нельзя передать в Process
- 6.2 Утечки ресурсов: всегда используйте with / shutdown
- 6.3 fork vs spawn vs forkserver
- 7. Итоги и что спросят на собеседовании
- В чём главное отличие threading от multiprocessing?
- Почему threading не ускоряет CPU-bound задачи?
- Как правильно передать данные между процессами?
- Что такое concurrent.futures и когда его использовать?
- Как происходит deadlock и как его предотвратить?
- Чем fork отличается от spawn?
- Когда asyncio лучше threading?
- Можно ли комбинировать asyncio и multiprocessing?
- Что такое race condition и как её поймать?
- Когда стоит использовать multiprocessing.Pool вместо ProcessPoolExecutor?
- Почему нельзя использовать lambda в ProcessPoolExecutor?
- FAQ
В Python три инструмента для конкурентности: threading, multiprocessing и asyncio. Каждый решает свой класс задач, и неправильный выбор приводит либо к коду, который не ускоряется вообще, либо к лишней сложности там, где хватило бы простого решения. На собеседовании вопрос «когда threading, а когда multiprocessing?» звучит почти гарантированно — и поверхностного «threading для I/O, multiprocessing для CPU» обычно недостаточно.
В этой статье разберём оба модуля глубоко: механику работы, примитивы синхронизации, межпроцессное взаимодействие, единый API через concurrent.futures и практические ловушки. По пути посмотрим на измеримые примеры — не просто объяснения, а код, который можно запустить и увидеть разницу своими глазами.
Если вы ещё не читали про GIL — стоит начать с GIL в Python: как работает и что спрашивают на собеседовании. Там объяснено, почему потоки в Python не дают параллелизма для CPU-задач, — здесь мы опираемся на это понимание. А про asyncio как третью модель конкурентности — в Asyncio в Python: event loop, async/await и задачи.
Python на собеседовании — без стресса
50 вопросов с разбором: GIL, threading, multiprocessing и asyncio.
1. Почему выбор важен: GIL и природа задачи
1.1 Краткий recap: что такое GIL
GIL — Global Interpreter Lock — мьютекс внутри CPython, который разрешает выполнять Python-байткод только одному потоку в каждый момент времени. Он защищает внутренние структуры интерпретатора: прежде всего счётчики ссылок объектов (ob_refcnt), через которые CPython управляет памятью.
Следствие прямое: даже если вы создадите 8 потоков на 8-ядерном процессоре, Python-байткод будет выполняться на одном ядре в каждый момент. Остальные потоки ждут своей очереди на GIL.
Важный нюанс: GIL освобождается при I/O-операциях. Пока один поток ждёт ответа от сети или диска, другой поток захватывает GIL и выполняет код. Именно поэтому threading полезен для I/O-bound задач и бесполезен для CPU-bound.
Подробнее о механике GIL — в статье GIL в Python. Здесь важен практический вывод: перед выбором инструмента нужно понять природу своей задачи.
1.2 CPU-bound vs I/O-bound: измеряем разницу
CPU-bound задача — вычисления, где процессор занят, а не ждёт. Чистый Python, без сетевых запросов и файловых операций. Посмотрим, что даёт threading на такой задаче:
import threading
import time
def cpu_task(n):
"""Чистые вычисления — GIL держим почти всё время"""
total = 0
for i in range(n):
total += i * i
return total
N = 10_000_000
# Последовательно
start = time.perf_counter()
cpu_task(N)
cpu_task(N)
print(f"Sequential: {time.perf_counter() - start:.2f}s")
# Два потока
start = time.perf_counter()
t1 = threading.Thread(target=cpu_task, args=(N,))
t2 = threading.Thread(target=cpu_task, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"2 threads: {time.perf_counter() - start:.2f}s")
# Результат: потоки работают примерно так же или медленнее
Почему потоки не ускоряют? Они не работают параллельно — GIL пропускает только один. Плюс добавляются накладные расходы на переключение контекста и постоянный захват/освобождение GIL.
I/O-bound задача — другая история. Пока поток ждёт ответа сервера, GIL свободен, и другие потоки могут работать:
import threading
import time
import urllib.request
def fetch(url):
urllib.request.urlopen(url) # GIL освобождается на время ожидания
urls = ["https://httpbin.org/delay/1"] * 4
# Последовательно: ~4 секунды
start = time.perf_counter()
for url in urls:
fetch(url)
print(f"Sequential: {time.perf_counter() - start:.1f}s")
# 4 потока: ~1 секунда
start = time.perf_counter()
threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads: t.start()
for t in threads: t.join()
print(f"4 threads: {time.perf_counter() - start:.1f}s")
Четыре запроса по одной секунде выполняются параллельно — итого около секунды вместо четырёх. Это и есть конкурентность через threading: не параллелизм по CPU, а параллельное ожидание.
2. Модуль threading
2.1 Thread, start(), join(), daemon
Базовый способ создать поток — инстанцировать threading.Thread с target-функцией:
import threading
import time
def worker(name: str, delay: float):
print(f"[{name}] started")
time.sleep(delay)
print(f"[{name}] done after {delay}s")
# Создаём потоки
t1 = threading.Thread(target=worker, args=("Alpha", 1.0))
t2 = threading.Thread(target=worker, args=("Beta", 0.5))
# start() — запускает поток
t1.start()
t2.start()
# join() — ждёт завершения потока
t2.join()
print("Beta finished")
t1.join()
print("Alpha finished")
Альтернатива — наследоваться от Thread и переопределить run(). Это удобно, когда поток несёт состояние:
class WorkerThread(threading.Thread):
def __init__(self, name: str, delay: float):
super().__init__()
self.name = name
self.delay = delay
self.result = None
def run(self):
time.sleep(self.delay)
self.result = f"{self.name} computed"
t = WorkerThread("Compute", 0.5)
t.start()
t.join()
print(t.result) # Compute computed
Daemon-потоки. По умолчанию основная программа ждёт завершения всех не-daemon потоков перед выходом. Daemon-поток (daemon=True) убивается принудительно, когда завершается основной поток — используется для фоновых задач, которые не нужно завершать «чисто»:
def background_monitor():
while True:
print("monitoring...")
time.sleep(1)
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
# Когда main завершится — monitor убьётся автоматически
time.sleep(3)
print("main done")
Важно: daemon-поток не получает возможности освободить ресурсы (закрыть файл, завершить транзакцию). Для таких задач нужен обычный поток с явной логикой завершения.
2.2 ThreadLocal — изолированное состояние
Когда потоки разделяют глобальное состояние, возникают гонки. Иногда нужна обратная задача: чтобы у каждого потока было своё изолированное состояние — и threading.local() решает именно её.
import threading
# local() создаёт объект, у которого для каждого потока своя копия атрибутов
thread_local = threading.local()
def process_request(user_id: int):
# Каждый поток пишет в свою копию thread_local.user
thread_local.user = user_id
time.sleep(0.01) # имитация работы
# Читаем из своей же копии — не пересекаемся с другими потоками
print(f"Thread {threading.current_thread().name}: user={thread_local.user}")
threads = [
threading.Thread(target=process_request, args=(i,), name=f"T{i}")
for i in range(5)
]
for t in threads: t.start()
for t in threads: t.join()
Типичный use-case — хранение соединения с базой данных или сессии для каждого потока. Flask, Django ORM и многие другие фреймворки используют local() внутри для request-scoped состояния.
2.3 Примитивы синхронизации
Lock — базовый мьютекс. Самый простой примитив: захвати перед изменением, освободи после.
import threading
counter = 0
lock = threading.Lock()
def safe_increment(n: int):
global counter
for _ in range(n):
with lock: # acquire() + release() автоматически
counter += 1
threads = [threading.Thread(target=safe_increment, args=(100_000,)) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # Всегда 1_000_000 — гонок нет
Всегда используйте with lock вместо ручного lock.acquire() / lock.release() — защищает от забытого release() при исключении.
RLock — реентерабельный лок. Позволяет одному потоку захватить лок несколько раз (нужно столько же раз освободить). Полезен для рекурсивных функций или методов, которые вызывают друг друга:
rlock = threading.RLock()
def outer():
with rlock:
print("outer acquired")
inner() # тот же поток захватывает rlock снова — ок
def inner():
with rlock: # обычный Lock дедлокнул бы здесь
print("inner acquired")
outer()
Обычный Lock заблокировался бы при повторном захвате тем же потоком — поток ждал бы сам себя бесконечно.
Semaphore — ограничение параллелизма. Позволяет одновременно работать не более N потокам. Используется для rate limiting, ограничения числа одновременных соединений:
# Максимум 3 потока одновременно делают запросы
semaphore = threading.Semaphore(3)
def fetch_with_limit(url: str):
with semaphore:
print(f"Fetching {url}")
time.sleep(0.5) # имитация запроса
print(f"Done {url}")
threads = [
threading.Thread(target=fetch_with_limit, args=(f"url-{i}",))
for i in range(10)
]
for t in threads: t.start()
for t in threads: t.join()
# Видим, что в каждый момент активны максимум 3 потока
Event — сигнал между потоками. Один поток ждёт события, другой его сигнализирует:
event = threading.Event()
def waiter():
print("Waiter: ожидаю сигнала...")
event.wait() # блокируется до event.set()
print("Waiter: получил сигнал!")
def signaler():
time.sleep(1)
print("Signaler: отправляю сигнал")
event.set()
threading.Thread(target=waiter).start()
threading.Thread(target=signaler).start()
Condition — ожидание с условием. Комбинирует Lock и Event: поток ждёт, пока не выполнится условие, другой поток уведомляет:
condition = threading.Condition()
items = []
def producer():
for i in range(5):
time.sleep(0.3)
with condition:
items.append(i)
condition.notify() # будим одного ждущего
def consumer():
while True:
with condition:
while not items:
condition.wait() # ждём, пока появится элемент
item = items.pop(0)
print(f"Consumed: {item}")
if item == 4:
break
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()
2.4 Queue — потокобезопасная очередь
queue.Queue — главный инструмент для передачи данных между потоками. Она потокобезопасна по дизайну: внутри использует Condition, и вам не нужны явные локи.
import queue
import threading
import time
q = queue.Queue(maxsize=10) # ограниченный буфер
def producer(q: queue.Queue, n: int):
for i in range(n):
item = f"item-{i}"
q.put(item) # блокируется, если очередь полна
print(f"Produced: {item}")
q.put(None) # sentinel — сигнал завершения
def consumer(q: queue.Queue):
while True:
item = q.get() # блокируется, если очередь пуста
if item is None:
break
print(f"Consumed: {item}")
q.task_done() # сигнал, что элемент обработан
p = threading.Thread(target=producer, args=(q, 5))
c = threading.Thread(target=consumer, args=(q,))
p.start(); c.start()
p.join(); c.join()
q.join() # ждём, пока все task_done() не будут вызваны
print("All done")
Помимо Queue, есть queue.LifoQueue (стек) и queue.PriorityQueue. Для большинства задач типичен паттерн producer-consumer с обычным Queue.
2.5 Типичные ловушки: race condition и deadlock
Race condition возникает, когда несколько потоков изменяют общее состояние без синхронизации. GIL не защищает: он гарантирует атомарность только на уровне одной байткод-инструкции, а counter += 1 — это несколько инструкций:
counter = 0
def unsafe_increment():
global counter
for _ in range(100_000):
counter += 1 # LOAD_GLOBAL, LOAD_CONST, BINARY_OP, STORE_GLOBAL — 4 инструкции
# GIL может переключиться после любой из них
threads = [threading.Thread(target=unsafe_increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # Почти всегда меньше 1_000_000
Deadlock — взаимная блокировка: поток A держит lock1 и ждёт lock2, поток B держит lock2 и ждёт lock1. Оба ждут вечно.
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread_a():
with lock1:
time.sleep(0.1)
with lock2: # ждёт, пока thread_b освободит lock2
print("A done")
def thread_b():
with lock2:
time.sleep(0.1)
with lock1: # ждёт, пока thread_a освободит lock1
print("B done")
# Эти два потока дедлокнут
t1 = threading.Thread(target=thread_a)
t2 = threading.Thread(target=thread_b)
t1.start(); t2.start()
# Программа зависнет
Как избежать дедлока: всегда захватывать локи в одном и том же порядке во всех потоках, использовать lock.acquire(timeout=...) с таймаутом, или применять threading.RLock для реентерабельных случаев.
3. Модуль multiprocessing
3.1 Process, start(), join(), terminate()
multiprocessing.Process — аналог threading.Thread, только запускает отдельный процесс операционной системы:
from multiprocessing import Process
import os
import time
def worker(name: str):
print(f"[{name}] PID={os.getpid()}, PPID={os.getppid()}")
time.sleep(1)
print(f"[{name}] done")
if __name__ == "__main__":
p1 = Process(target=worker, args=("Alpha",))
p2 = Process(target=worker, args=("Beta",))
p1.start()
p2.start()
p1.join()
p2.join()
print("All processes done")
Защита if __name__ == "__main__" обязательна на Windows и macOS (режим spawn): без неё каждый дочерний процесс будет пытаться запустить родительский код при старте, что приведёт к бесконечной рекурсии.
terminate() посылает процессу SIGTERM — принудительное завершение. kill() посылает SIGKILL. В отличие от потоков, процессы можно убивать «жёстко», но лучше использовать явный sentinel-паттерн для аккуратного завершения.
p = Process(target=long_task)
p.start()
time.sleep(2)
if p.is_alive():
p.terminate() # мягкое завершение
p.join(timeout=3) # ждём до 3 секунд
if p.is_alive():
p.kill() # жёсткое завершение
p.join()
3.2 Pool — пул процессов
multiprocessing.Pool — высокоуровневый способ распараллелить функцию на набор данных. Создаёт пул из N процессов и распределяет задачи между ними:
from multiprocessing import Pool
import time
def compute(n: int) -> int:
"""CPU-bound задача"""
return sum(i * i for i in range(n))
if __name__ == "__main__":
data = [5_000_000] * 8
# Последовательно
start = time.perf_counter()
results = [compute(n) for n in data]
print(f"Sequential: {time.perf_counter() - start:.2f}s")
# Параллельно с Pool
start = time.perf_counter()
with Pool(processes=4) as pool:
results = pool.map(compute, data)
print(f"Pool(4 processes): {time.perf_counter() - start:.2f}s")
# На 4-ядерном CPU даёт ~4x ускорение
Основные методы Pool:
pool.map(func, iterable)— применяет функцию к каждому элементу, возвращает список результатов в исходном порядке. Блокирует до завершения всех.pool.starmap(func, iterable)— аналогmap, но каждый элемент — кортеж аргументов:pool.starmap(func, [(a1, b1), (a2, b2), ...]).pool.apply_async(func, args)— неблокирующий вызов, возвращаетAsyncResult. Позволяет запускать задачи без ожидания каждой.
with Pool(4) as pool:
# apply_async: запускаем задачи и собираем результаты потом
futures = [pool.apply_async(compute, (n,)) for n in data]
results = [f.get() for f in futures] # get() блокирует до готовности
3.3 IPC: Queue и Pipe
Процессы не разделяют память — данные нужно явно передавать через механизмы IPC (Inter-Process Communication).
multiprocessing.Queue — потокобезопасная и процессобезопасная очередь. Работает через os pipe под капотом:
from multiprocessing import Process, Queue
def producer(q: Queue, items: list):
for item in items:
q.put(item)
q.put(None) # sentinel
def consumer(q: Queue):
while True:
item = q.get()
if item is None:
break
print(f"Processed: {item ** 2}")
if __name__ == "__main__":
q = Queue()
data = list(range(10))
p = Process(target=producer, args=(q, data))
c = Process(target=consumer, args=(q,))
p.start(); c.start()
p.join(); c.join()
Pipe — двусторонний канал связи между двумя процессами. Быстрее Queue для простых случаев «один-к-одному»:
from multiprocessing import Process, Pipe
def child(conn):
msg = conn.recv()
print(f"Child received: {msg}")
conn.send(f"Echo: {msg}")
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p = Process(target=child, args=(child_conn,))
p.start()
parent_conn.send("Hello from parent")
response = parent_conn.recv()
print(f"Parent received: {response}")
p.join()
Pipe() возвращает два Connection-объекта: у каждого есть send() и recv(). По умолчанию Pipe двусторонний — оба конца могут и отправлять, и получать.
3.4 Разделяемая память: Value, Array, Manager
Иногда нужно, чтобы несколько процессов читали и изменяли одни данные. multiprocessing.Value и Array дают доступ к разделяемой памяти через ctypes:
from multiprocessing import Process, Value, Array
import ctypes
def increment(shared_counter, n: int):
for _ in range(n):
with shared_counter.get_lock(): # нужен явный лок!
shared_counter.value += 1
if __name__ == "__main__":
counter = Value(ctypes.c_int, 0)
processes = [
Process(target=increment, args=(counter, 100_000))
for _ in range(4)
]
for p in processes: p.start()
for p in processes: p.join()
print(counter.value) # 400_000
Value сам по себе не атомарен — нужен явный get_lock(). Array работает аналогично, но для последовательности элементов одного типа.
Manager — более гибкий вариант: сервер-процесс с прокси-объектами, которые поддерживают привычные Python-типы (dict, list, Namespace):
from multiprocessing import Process, Manager
def worker(shared_dict, key: str, value: int):
shared_dict[key] = value
if __name__ == "__main__":
with Manager() as manager:
d = manager.dict()
processes = [
Process(target=worker, args=(d, f"key{i}", i))
for i in range(5)
]
for p in processes: p.start()
for p in processes: p.join()
print(dict(d)) # {'key0': 0, 'key1': 1, ...}
Manager удобен, но медленнее Value/Array — каждое обращение проходит через IPC к серверному процессу. Для высоконагруженных сценариев предпочтительнее Value/Array или архитектура без разделяемого состояния.
3.5 Overhead процессов vs потоков
Создание процесса значительно дороже создания потока. Поток — несколько мегабайт стека и структура данных ОС. Процесс — полная копия адресного пространства, инициализация нового интерпретатора Python, загрузка модулей.
import threading
import multiprocessing
import time
def empty_task():
pass
N = 100
# Потоки
start = time.perf_counter()
threads = [threading.Thread(target=empty_task) for _ in range(N)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threads: {time.perf_counter() - start:.3f}s")
# Процессы
if __name__ == "__main__":
start = time.perf_counter()
processes = [multiprocessing.Process(target=empty_task) for _ in range(N)]
for p in processes: p.start()
for p in processes: p.join()
print(f"Processes: {time.perf_counter() - start:.3f}s")
# Процессы на порядок медленнее при старте
Практическое правило: multiprocessing оправдан, когда время выполнения задачи на процесс — хотя бы десятки миллисекунд, а объём передаваемых данных (через pickle) небольшой. Для коротких задач с большими данными overhead полностью съедает выигрыш.
Python-вопросы в Telegram
Ежедневные разборы GIL, asyncio и паттернов.
4. concurrent.futures: единый высокоуровневый API
4.1 ThreadPoolExecutor vs ProcessPoolExecutor
Модуль concurrent.futures предоставляет единый интерфейс для пулов потоков и процессов. Вместо ручного управления жизненным циклом Thread/Process — просто передаём задачи в executor:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# ThreadPoolExecutor — пул потоков (для I/O-bound)
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(fetch_url, urls))
# ProcessPoolExecutor — пул процессов (для CPU-bound)
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(compute, data))
with-блок гарантирует корректное завершение: при выходе executor вызывает shutdown(wait=True) — дожидается всех запущенных задач. Это важно: утечки потоков/процессов — распространённая ошибка.
max_workers по умолчанию: для ThreadPoolExecutor — min(32, cpu_count + 4), для ProcessPoolExecutor — cpu_count(). Для I/O-bound задач разумно выставлять больше, чем число ядер — потоки большую часть времени ждут.
4.2 submit() + Future
submit() — неблокирующий запуск задачи. Возвращает объект Future — обещание результата:
from concurrent.futures import ThreadPoolExecutor
import time
def slow_task(n: int) -> str:
time.sleep(n)
return f"task-{n} done"
with ThreadPoolExecutor(max_workers=3) as executor:
# submit() не блокирует — задача поставлена в пул
future1 = executor.submit(slow_task, 2)
future2 = executor.submit(slow_task, 1)
future3 = executor.submit(slow_task, 3)
# Можно делать другую работу пока задачи выполняются
print("Tasks submitted, doing other work...")
# result() блокирует до готовности конкретного future
print(future2.result()) # возвращает первым — он самый быстрый
print(future1.result())
print(future3.result())
Future поддерживает несколько полезных методов:
result(timeout=None)— возвращает результат или бросает исключениеexception()— возвращает исключение (не бросает)done()—True, если задача завершенаadd_done_callback(fn)— регистрирует callback, который вызовется при завершении
4.3 map() — параллельный итератор
executor.map(func, iterable) — самый простой способ применить функцию к набору данных параллельно:
from concurrent.futures import ProcessPoolExecutor
def square(n: int) -> int:
return n * n
with ProcessPoolExecutor() as executor:
# map() возвращает итератор с результатами в исходном порядке
squares = list(executor.map(square, range(10)))
print(squares) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
В отличие от Pool.map, executor.map() можно вызывать с chunksize — батчингом задач, что снижает overhead IPC при большом числе мелких задач:
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(square, range(10_000), chunksize=100))
Без chunksize каждый элемент — отдельный IPC-запрос. С chunksize=100 данные передаются пачками по 100 элементов — значительно быстрее для маленьких задач.
4.4 as_completed() — обработка по мере готовности
as_completed() позволяет обрабатывать результаты в порядке завершения, а не в исходном порядке:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def fetch_data(url: str) -> str:
delay = random.uniform(0.5, 2.0)
time.sleep(delay)
return f"{url} -> {delay:.2f}s"
urls = [f"https://api.example.com/endpoint/{i}" for i in range(8)]
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(fetch_data, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
result = future.result()
print(f"Done: {result}")
except Exception as e:
print(f"Failed {url}: {e}")
Паттерн {future: original_data} — стандартный способ связать future с исходными данными. as_completed() возвращает Future-объекты в порядке их завершения — самые быстрые приходят первыми.
4.5 Пример: I/O-bound vs CPU-bound
Финальное сравнение: один и тот же код, разный executor — и совершенно разная производительность в зависимости от природы задачи:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import urllib.request
# I/O-BOUND: загрузка URL
def fetch(url: str) -> int:
with urllib.request.urlopen(url, timeout=10) as r:
return len(r.read())
urls = ["https://httpbin.org/delay/1"] * 8
# Плохо: ProcessPoolExecutor для I/O — избыточный overhead
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as ex:
list(ex.map(fetch, urls))
print(f"ProcessPool (I/O): {time.perf_counter() - start:.1f}s")
# Хорошо: ThreadPoolExecutor для I/O
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as ex:
list(ex.map(fetch, urls))
print(f"ThreadPool (I/O): {time.perf_counter() - start:.1f}s")
# CPU-BOUND: факториал
def compute(n: int) -> int:
result = 1
for i in range(2, n + 1):
result *= i
return result
data = [200_000] * 8
# Плохо: ThreadPoolExecutor для CPU — GIL не даёт параллелизма
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as ex:
list(ex.map(compute, data))
print(f"ThreadPool (CPU): {time.perf_counter() - start:.1f}s")
# Хорошо: ProcessPoolExecutor для CPU
if __name__ == "__main__":
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as ex:
list(ex.map(compute, data))
print(f"ProcessPool (CPU): {time.perf_counter() - start:.1f}s")
5. Когда что выбирать: практическая шпаргалка
Таблица сравнения
| Параметр | threading | multiprocessing | asyncio |
|---|---|---|---|
| Модель | Потоки в одном процессе | Отдельные процессы | Корутины в одном потоке |
| Параллелизм по CPU | Нет (GIL) | Да (отдельные GIL) | Нет (однопоточный) |
| I/O-конкурентность | Да | Избыточно | Да (лучший вариант) |
| Разделение памяти | Общая | Изолированная (pickle) | Общая |
| Накладные расходы | Низкие | Высокие (fork + pickle) | Минимальные |
| Синхронизация | Lock, Semaphore, Event | Queue, Pipe, Value | asyncio.Lock, asyncio.Queue |
| Подходит для | I/O-bound: сеть, файлы, БД | CPU-bound: вычисления | I/O-bound с высокой нагрузкой |
asyncio vs threading vs multiprocessing
Выбор зависит от двух вопросов: где тратится время и какая нагрузка?
threading — разумный выбор для I/O-bound задач с умеренным числом параллельных операций (десятки). Синхронные библиотеки (requests, psycopg2) работают в потоках без переписывания. Модель знакома и проще asyncio для начинающих.
asyncio — лучший выбор для I/O-bound задач с высокой нагрузкой (тысячи соединений). Меньше overhead, чем потоки, без переключений контекста ОС. Требует асинхронных библиотек (aiohttp, asyncpg). Подробнее — в Asyncio в Python: event loop, async/await и задачи.
multiprocessing — единственный способ получить настоящий CPU-параллелизм в Python. Для числодробилок, ML-препроцессинга, обработки данных. ProcessPoolExecutor через concurrent.futures — предпочтительный API.
Комбинация: asyncio + ProcessPoolExecutor через loop.run_in_executor() — стандартный паттерн для смешанных задач: I/O через event loop, тяжёлые вычисления offload в процессы.
import asyncio
from concurrent.futures import ProcessPoolExecutor
def heavy_compute(data):
return sum(x ** 2 for x in data)
async def handle_request(data):
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, heavy_compute, data)
return result
6. Производительность и подводные камни
6.1 Pickling — что нельзя передать в Process
Данные между процессами передаются через pickle — сериализацию Python. Не всё можно сериализовать, и это частый источник ошибок:
from multiprocessing import Pool
import pickle
# Что нельзя pickle-ить:
# - lambda функции
# - вложенные функции (closures с захваченными переменными)
# - некоторые объекты с состоянием (сокеты, файловые дескрипторы)
# - генераторы
# ОШИБКА: lambda не pickle-ится
with Pool(4) as pool:
# pool.map(lambda x: x**2, range(10)) # AttributeError!
pass
# РАБОТАЕТ: обычная функция на верхнем уровне модуля
def square(x):
return x ** 2
with Pool(4) as pool:
print(pool.map(square, range(10)))
# Проверить, можно ли передать объект:
def check_picklable(obj):
try:
pickle.dumps(obj)
return True
except Exception as e:
print(f"Not picklable: {e}")
return False
Правило: функции, передаваемые в Pool/ProcessPoolExecutor, должны быть объявлены на верхнем уровне модуля (не как lambda и не как вложенные функции). Аргументы — сериализуемые объекты.
6.2 Утечки ресурсов: всегда используйте with / shutdown
Пул потоков или процессов — ресурс, который нужно явно закрывать. Незакрытый пул продолжает удерживать потоки/процессы:
from concurrent.futures import ThreadPoolExecutor
# ПЛОХО: создаём executor без with — утечка потоков
executor = ThreadPoolExecutor(max_workers=10)
results = executor.map(some_func, data)
# executor никогда не закроется явно!
# ХОРОШО: with гарантирует shutdown(wait=True)
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(some_func, data))
# Или явно:
executor = ThreadPoolExecutor(max_workers=10)
try:
results = list(executor.map(some_func, data))
finally:
executor.shutdown(wait=True)
То же касается Pool из multiprocessing: всегда используйте with Pool(...) as pool или явный pool.close() + pool.join().
6.3 fork vs spawn vs forkserver
При создании нового процесса Python использует один из трёх методов запуска — start method. Это важно знать, особенно при отладке странного поведения на разных платформах.
fork (по умолчанию на Linux): дочерний процесс — полная копия родительского через os.fork(). Быстрый старт, но копирует всё состояние родителя, включая открытые сокеты и блокировки. Может быть небезопасен в многопоточных программах: если в момент fork один из потоков держал мьютекс, дочерний процесс унаследует его в залоченном состоянии.
spawn (по умолчанию на Windows и macOS с Python 3.8+): запускает свежий интерпретатор Python, импортирует модуль и вызывает целевую функцию. Чисто, безопасно, но медленнее — нужно заново импортировать всё.
forkserver: запускает отдельный сервер-процесс при старте программы; новые процессы создаются через него. Компромисс между fork и spawn.
import multiprocessing
# Изменить метод запуска (делать в if __name__ == "__main__")
if __name__ == "__main__":
multiprocessing.set_start_method("spawn")
# или "fork", "forkserver"
p = multiprocessing.Process(target=some_func)
p.start()
p.join()
На практике: если ваш код работает на Linux через Docker — используется fork. Если разрабатываете на macOS и деплоите на Linux — поведение может отличаться. spawn — наиболее предсказуемый вариант на всех платформах.
7. Итоги и что спросят на собеседовании
В чём главное отличие threading от multiprocessing?
threading создаёт потоки в одном процессе с общей памятью. GIL разрешает выполнять Python-байткод только одному потоку в каждый момент — настоящего параллелизма по CPU нет. Зато I/O-операции освобождают GIL, потоки дёшевы в создании и могут легко обмениваться данными через общие объекты.
multiprocessing создаёт отдельные процессы с изолированной памятью. Каждый процесс — свой GIL, настоящий параллелизм. Дорог в создании, данные нужно передавать через IPC (Queue, Pipe, pickle). Это выбор для CPU-bound задач.
Почему threading не ускоряет CPU-bound задачи?
Из-за GIL. В каждый момент только один поток выполняет Python-байткод. CPU-bound задача постоянно держит GIL — другим потокам ничего не достаётся. В итоге потоки работают последовательно, добавляя лишь overhead на переключение контекста.
Как правильно передать данные между процессами?
Через multiprocessing.Queue для очереди задач, multiprocessing.Pipe для двустороннего канала двух процессов, Value/Array для разделяемой памяти примитивных типов, Manager() для Python-словарей и списков. Функции и аргументы для Pool/ProcessPoolExecutor сериализуются через pickle автоматически — убедитесь, что объекты picklable.
Что такое concurrent.futures и когда его использовать?
Высокоуровневый API с единым интерфейсом для потоков и процессов. ThreadPoolExecutor для I/O-bound, ProcessPoolExecutor для CPU-bound. Предпочтительнее низкоуровневых Thread/Process для большинства задач: не нужно управлять жизненным циклом, есть удобный API (submit, map, as_completed).
Как происходит deadlock и как его предотвратить?
Deadlock — взаимная блокировка: поток A держит lock1 и ждёт lock2, поток B держит lock2 и ждёт lock1. Оба ждут вечно. Предотвращение: всегда захватывать несколько локов в одном фиксированном порядке во всех потоках; использовать lock.acquire(timeout=N) с явной обработкой неудачи; по возможности избегать нескольких локов одновременно.
Чем fork отличается от spawn?
fork — создаёт дочерний процесс как копию родительского через os.fork(). Быстро, но наследует всё состояние, включая открытые файловые дескрипторы и блокировки. spawn — запускает свежий интерпретатор Python. Медленнее, зато чисто. По умолчанию на Linux — fork, на Windows/macOS — spawn. Для максимальной переносимости используйте spawn.
Когда asyncio лучше threading?
Для I/O-bound задач с высокой нагрузкой (тысячи параллельных соединений). Потоки потребляют память (стек каждого потока — мегабайты), и переключение контекста между ними имеет overhead. asyncio работает в одном потоке, корутины — легковесны, тысячи конкурентных операций — норма. Недостаток: требует асинхронных библиотек.
Можно ли комбинировать asyncio и multiprocessing?
Да, и это стандартный паттерн. loop.run_in_executor(ProcessPoolExecutor(), func, *args) запускает CPU-heavy функцию в отдельном процессе, возвращая asyncio.Future. Event loop продолжает обрабатывать другие корутины — не блокируется. Так строят высоконагруженные сервисы: I/O через asyncio, тяжёлые вычисления offload в процессы.
Что такое race condition и как её поймать?
Race condition — состояние гонки, когда результат зависит от порядка выполнения потоков. Типичный пример: counter += 1 в нескольких потоках без лока. Поймать сложно — баги проявляются нестабильно. Инструменты: запуск с -X dev (включает дополнительные проверки), использование threading.Lock для всех shared-объектов, предпочтение queue.Queue и immutable данных.
Когда стоит использовать multiprocessing.Pool вместо ProcessPoolExecutor?
Pool имеет дополнительные методы (imap, imap_unordered, starmap, apply) и более гибкое управление пулом. ProcessPoolExecutor — стандартизированный API через concurrent.futures, лучше интегрируется с asyncio через run_in_executor. Для нового кода предпочтительнее ProcessPoolExecutor; Pool оправдан, если нужен imap_unordered или другие специфические методы.
Почему нельзя использовать lambda в ProcessPoolExecutor?
ProcessPoolExecutor сериализует функции через pickle, а lambda-функции не поддерживают pickle (они анонимны и не имеют привязки к модулю). Вместо lambda используйте обычные функции на верхнем уровне модуля или классы с __call__. Если нужна логика с параметрами — functools.partial тоже не всегда pickle-ится; лучше написать явную функцию.
Python-вопросы в Telegram
Ежедневные разборы GIL, asyncio и паттернов.
FAQ
Автор
Lexicon Team
Читайте также
backend
GIL в Python простыми словами: как работает и что спрашивают на собеседовании
Разбираем Global Interpreter Lock с нуля: история появления, bytecode-переключение, влияние на threading/multiprocessing, обход GIL и free-threaded Python 3.13.
backend
Asyncio в Python: event loop, async/await и задачи
Полный разбор asyncio: как работает event loop, coroutines и async/await, создание задач через asyncio.Task и gather — и что точно спросят на собеседовании.
backend
Channels в Go: буферизированные и небуферизированные
Полный разбор каналов в Go: небуферизированные vs буферизированные, направленные каналы, select, закрытие, паттерны pipeline/fan-out/fan-in и частые ошибки на собеседовании.