Threading vs multiprocessing в Python: что выбрать и почему

Глубокий разбор threading и multiprocessing в Python: GIL, CPU-bound vs I/O-bound, concurrent.futures, синхронизация, IPC и реальные примеры кода для собеседования.

02 марта 2026 г.20 минLexicon Team

В Python три инструмента для конкурентности: threading, multiprocessing и asyncio. Каждый решает свой класс задач, и неправильный выбор приводит либо к коду, который не ускоряется вообще, либо к лишней сложности там, где хватило бы простого решения. На собеседовании вопрос «когда threading, а когда multiprocessing?» звучит почти гарантированно — и поверхностного «threading для I/O, multiprocessing для CPU» обычно недостаточно.

В этой статье разберём оба модуля глубоко: механику работы, примитивы синхронизации, межпроцессное взаимодействие, единый API через concurrent.futures и практические ловушки. По пути посмотрим на измеримые примеры — не просто объяснения, а код, который можно запустить и увидеть разницу своими глазами.

Если вы ещё не читали про GIL — стоит начать с GIL в Python: как работает и что спрашивают на собеседовании. Там объяснено, почему потоки в Python не дают параллелизма для CPU-задач, — здесь мы опираемся на это понимание. А про asyncio как третью модель конкурентности — в Asyncio в Python: event loop, async/await и задачи.

Python на собеседовании — без стресса

50 вопросов с разбором: GIL, threading, multiprocessing и asyncio.

Начать тренировку

1. Почему выбор важен: GIL и природа задачи

1.1 Краткий recap: что такое GIL

GIL — Global Interpreter Lock — мьютекс внутри CPython, который разрешает выполнять Python-байткод только одному потоку в каждый момент времени. Он защищает внутренние структуры интерпретатора: прежде всего счётчики ссылок объектов (ob_refcnt), через которые CPython управляет памятью.

Следствие прямое: даже если вы создадите 8 потоков на 8-ядерном процессоре, Python-байткод будет выполняться на одном ядре в каждый момент. Остальные потоки ждут своей очереди на GIL.

Важный нюанс: GIL освобождается при I/O-операциях. Пока один поток ждёт ответа от сети или диска, другой поток захватывает GIL и выполняет код. Именно поэтому threading полезен для I/O-bound задач и бесполезен для CPU-bound.

Подробнее о механике GIL — в статье GIL в Python. Здесь важен практический вывод: перед выбором инструмента нужно понять природу своей задачи.

1.2 CPU-bound vs I/O-bound: измеряем разницу

CPU-bound задача — вычисления, где процессор занят, а не ждёт. Чистый Python, без сетевых запросов и файловых операций. Посмотрим, что даёт threading на такой задаче:

import threading
import time

def cpu_task(n):
    """Чистые вычисления — GIL держим почти всё время"""
    total = 0
    for i in range(n):
        total += i * i
    return total

N = 10_000_000

# Последовательно
start = time.perf_counter()
cpu_task(N)
cpu_task(N)
print(f"Sequential:  {time.perf_counter() - start:.2f}s")

# Два потока
start = time.perf_counter()
t1 = threading.Thread(target=cpu_task, args=(N,))
t2 = threading.Thread(target=cpu_task, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"2 threads:   {time.perf_counter() - start:.2f}s")
# Результат: потоки работают примерно так же или медленнее

Почему потоки не ускоряют? Они не работают параллельно — GIL пропускает только один. Плюс добавляются накладные расходы на переключение контекста и постоянный захват/освобождение GIL.

I/O-bound задача — другая история. Пока поток ждёт ответа сервера, GIL свободен, и другие потоки могут работать:

import threading
import time
import urllib.request

def fetch(url):
    urllib.request.urlopen(url)  # GIL освобождается на время ожидания

urls = ["https://httpbin.org/delay/1"] * 4

# Последовательно: ~4 секунды
start = time.perf_counter()
for url in urls:
    fetch(url)
print(f"Sequential:  {time.perf_counter() - start:.1f}s")

# 4 потока: ~1 секунда
start = time.perf_counter()
threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads: t.start()
for t in threads: t.join()
print(f"4 threads:   {time.perf_counter() - start:.1f}s")

Четыре запроса по одной секунде выполняются параллельно — итого около секунды вместо четырёх. Это и есть конкурентность через threading: не параллелизм по CPU, а параллельное ожидание.

2. Модуль threading

2.1 Thread, start(), join(), daemon

Базовый способ создать поток — инстанцировать threading.Thread с target-функцией:

import threading
import time

def worker(name: str, delay: float):
    print(f"[{name}] started")
    time.sleep(delay)
    print(f"[{name}] done after {delay}s")

# Создаём потоки
t1 = threading.Thread(target=worker, args=("Alpha", 1.0))
t2 = threading.Thread(target=worker, args=("Beta", 0.5))

# start() — запускает поток
t1.start()
t2.start()

# join() — ждёт завершения потока
t2.join()
print("Beta finished")
t1.join()
print("Alpha finished")

Альтернатива — наследоваться от Thread и переопределить run(). Это удобно, когда поток несёт состояние:

class WorkerThread(threading.Thread):
    def __init__(self, name: str, delay: float):
        super().__init__()
        self.name = name
        self.delay = delay
        self.result = None

    def run(self):
        time.sleep(self.delay)
        self.result = f"{self.name} computed"

t = WorkerThread("Compute", 0.5)
t.start()
t.join()
print(t.result)  # Compute computed

Daemon-потоки. По умолчанию основная программа ждёт завершения всех не-daemon потоков перед выходом. Daemon-поток (daemon=True) убивается принудительно, когда завершается основной поток — используется для фоновых задач, которые не нужно завершать «чисто»:

def background_monitor():
    while True:
        print("monitoring...")
        time.sleep(1)

monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
# Когда main завершится — monitor убьётся автоматически
time.sleep(3)
print("main done")

Важно: daemon-поток не получает возможности освободить ресурсы (закрыть файл, завершить транзакцию). Для таких задач нужен обычный поток с явной логикой завершения.

2.2 ThreadLocal — изолированное состояние

Когда потоки разделяют глобальное состояние, возникают гонки. Иногда нужна обратная задача: чтобы у каждого потока было своё изолированное состояние — и threading.local() решает именно её.

import threading

# local() создаёт объект, у которого для каждого потока своя копия атрибутов
thread_local = threading.local()

def process_request(user_id: int):
    # Каждый поток пишет в свою копию thread_local.user
    thread_local.user = user_id
    time.sleep(0.01)  # имитация работы
    # Читаем из своей же копии — не пересекаемся с другими потоками
    print(f"Thread {threading.current_thread().name}: user={thread_local.user}")

threads = [
    threading.Thread(target=process_request, args=(i,), name=f"T{i}")
    for i in range(5)
]
for t in threads: t.start()
for t in threads: t.join()

Типичный use-case — хранение соединения с базой данных или сессии для каждого потока. Flask, Django ORM и многие другие фреймворки используют local() внутри для request-scoped состояния.

2.3 Примитивы синхронизации

Lock — базовый мьютекс. Самый простой примитив: захвати перед изменением, освободи после.

import threading

counter = 0
lock = threading.Lock()

def safe_increment(n: int):
    global counter
    for _ in range(n):
        with lock:  # acquire() + release() автоматически
            counter += 1

threads = [threading.Thread(target=safe_increment, args=(100_000,)) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # Всегда 1_000_000 — гонок нет

Всегда используйте with lock вместо ручного lock.acquire() / lock.release() — защищает от забытого release() при исключении.

RLock — реентерабельный лок. Позволяет одному потоку захватить лок несколько раз (нужно столько же раз освободить). Полезен для рекурсивных функций или методов, которые вызывают друг друга:

rlock = threading.RLock()

def outer():
    with rlock:
        print("outer acquired")
        inner()  # тот же поток захватывает rlock снова — ок

def inner():
    with rlock:  # обычный Lock дедлокнул бы здесь
        print("inner acquired")

outer()

Обычный Lock заблокировался бы при повторном захвате тем же потоком — поток ждал бы сам себя бесконечно.

Semaphore — ограничение параллелизма. Позволяет одновременно работать не более N потокам. Используется для rate limiting, ограничения числа одновременных соединений:

# Максимум 3 потока одновременно делают запросы
semaphore = threading.Semaphore(3)

def fetch_with_limit(url: str):
    with semaphore:
        print(f"Fetching {url}")
        time.sleep(0.5)  # имитация запроса
        print(f"Done {url}")

threads = [
    threading.Thread(target=fetch_with_limit, args=(f"url-{i}",))
    for i in range(10)
]
for t in threads: t.start()
for t in threads: t.join()
# Видим, что в каждый момент активны максимум 3 потока

Event — сигнал между потоками. Один поток ждёт события, другой его сигнализирует:

event = threading.Event()

def waiter():
    print("Waiter: ожидаю сигнала...")
    event.wait()  # блокируется до event.set()
    print("Waiter: получил сигнал!")

def signaler():
    time.sleep(1)
    print("Signaler: отправляю сигнал")
    event.set()

threading.Thread(target=waiter).start()
threading.Thread(target=signaler).start()

Condition — ожидание с условием. Комбинирует Lock и Event: поток ждёт, пока не выполнится условие, другой поток уведомляет:

condition = threading.Condition()
items = []

def producer():
    for i in range(5):
        time.sleep(0.3)
        with condition:
            items.append(i)
            condition.notify()  # будим одного ждущего

def consumer():
    while True:
        with condition:
            while not items:
                condition.wait()  # ждём, пока появится элемент
            item = items.pop(0)
        print(f"Consumed: {item}")
        if item == 4:
            break

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()

2.4 Queue — потокобезопасная очередь

queue.Queue — главный инструмент для передачи данных между потоками. Она потокобезопасна по дизайну: внутри использует Condition, и вам не нужны явные локи.

import queue
import threading
import time

q = queue.Queue(maxsize=10)  # ограниченный буфер

def producer(q: queue.Queue, n: int):
    for i in range(n):
        item = f"item-{i}"
        q.put(item)  # блокируется, если очередь полна
        print(f"Produced: {item}")
    q.put(None)  # sentinel — сигнал завершения

def consumer(q: queue.Queue):
    while True:
        item = q.get()  # блокируется, если очередь пуста
        if item is None:
            break
        print(f"Consumed: {item}")
        q.task_done()  # сигнал, что элемент обработан

p = threading.Thread(target=producer, args=(q, 5))
c = threading.Thread(target=consumer, args=(q,))

p.start(); c.start()
p.join(); c.join()

q.join()  # ждём, пока все task_done() не будут вызваны
print("All done")

Помимо Queue, есть queue.LifoQueue (стек) и queue.PriorityQueue. Для большинства задач типичен паттерн producer-consumer с обычным Queue.

2.5 Типичные ловушки: race condition и deadlock

Race condition возникает, когда несколько потоков изменяют общее состояние без синхронизации. GIL не защищает: он гарантирует атомарность только на уровне одной байткод-инструкции, а counter += 1 — это несколько инструкций:

counter = 0

def unsafe_increment():
    global counter
    for _ in range(100_000):
        counter += 1  # LOAD_GLOBAL, LOAD_CONST, BINARY_OP, STORE_GLOBAL — 4 инструкции
        # GIL может переключиться после любой из них

threads = [threading.Thread(target=unsafe_increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # Почти всегда меньше 1_000_000

Deadlock — взаимная блокировка: поток A держит lock1 и ждёт lock2, поток B держит lock2 и ждёт lock1. Оба ждут вечно.

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread_a():
    with lock1:
        time.sleep(0.1)
        with lock2:   # ждёт, пока thread_b освободит lock2
            print("A done")

def thread_b():
    with lock2:
        time.sleep(0.1)
        with lock1:   # ждёт, пока thread_a освободит lock1
            print("B done")

# Эти два потока дедлокнут
t1 = threading.Thread(target=thread_a)
t2 = threading.Thread(target=thread_b)
t1.start(); t2.start()
# Программа зависнет

Как избежать дедлока: всегда захватывать локи в одном и том же порядке во всех потоках, использовать lock.acquire(timeout=...) с таймаутом, или применять threading.RLock для реентерабельных случаев.

3. Модуль multiprocessing

3.1 Process, start(), join(), terminate()

multiprocessing.Process — аналог threading.Thread, только запускает отдельный процесс операционной системы:

from multiprocessing import Process
import os
import time

def worker(name: str):
    print(f"[{name}] PID={os.getpid()}, PPID={os.getppid()}")
    time.sleep(1)
    print(f"[{name}] done")

if __name__ == "__main__":
    p1 = Process(target=worker, args=("Alpha",))
    p2 = Process(target=worker, args=("Beta",))

    p1.start()
    p2.start()

    p1.join()
    p2.join()
    print("All processes done")

Защита if __name__ == "__main__" обязательна на Windows и macOS (режим spawn): без неё каждый дочерний процесс будет пытаться запустить родительский код при старте, что приведёт к бесконечной рекурсии.

terminate() посылает процессу SIGTERM — принудительное завершение. kill() посылает SIGKILL. В отличие от потоков, процессы можно убивать «жёстко», но лучше использовать явный sentinel-паттерн для аккуратного завершения.

p = Process(target=long_task)
p.start()
time.sleep(2)

if p.is_alive():
    p.terminate()      # мягкое завершение
    p.join(timeout=3)  # ждём до 3 секунд

if p.is_alive():
    p.kill()           # жёсткое завершение
    p.join()

3.2 Pool — пул процессов

multiprocessing.Pool — высокоуровневый способ распараллелить функцию на набор данных. Создаёт пул из N процессов и распределяет задачи между ними:

from multiprocessing import Pool
import time

def compute(n: int) -> int:
    """CPU-bound задача"""
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    data = [5_000_000] * 8

    # Последовательно
    start = time.perf_counter()
    results = [compute(n) for n in data]
    print(f"Sequential:        {time.perf_counter() - start:.2f}s")

    # Параллельно с Pool
    start = time.perf_counter()
    with Pool(processes=4) as pool:
        results = pool.map(compute, data)
    print(f"Pool(4 processes): {time.perf_counter() - start:.2f}s")
    # На 4-ядерном CPU даёт ~4x ускорение

Основные методы Pool:

  • pool.map(func, iterable) — применяет функцию к каждому элементу, возвращает список результатов в исходном порядке. Блокирует до завершения всех.
  • pool.starmap(func, iterable) — аналог map, но каждый элемент — кортеж аргументов: pool.starmap(func, [(a1, b1), (a2, b2), ...]).
  • pool.apply_async(func, args) — неблокирующий вызов, возвращает AsyncResult. Позволяет запускать задачи без ожидания каждой.
with Pool(4) as pool:
    # apply_async: запускаем задачи и собираем результаты потом
    futures = [pool.apply_async(compute, (n,)) for n in data]
    results = [f.get() for f in futures]  # get() блокирует до готовности

3.3 IPC: Queue и Pipe

Процессы не разделяют память — данные нужно явно передавать через механизмы IPC (Inter-Process Communication).

multiprocessing.Queue — потокобезопасная и процессобезопасная очередь. Работает через os pipe под капотом:

from multiprocessing import Process, Queue

def producer(q: Queue, items: list):
    for item in items:
        q.put(item)
    q.put(None)  # sentinel

def consumer(q: Queue):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processed: {item ** 2}")

if __name__ == "__main__":
    q = Queue()
    data = list(range(10))

    p = Process(target=producer, args=(q, data))
    c = Process(target=consumer, args=(q,))

    p.start(); c.start()
    p.join(); c.join()

Pipe — двусторонний канал связи между двумя процессами. Быстрее Queue для простых случаев «один-к-одному»:

from multiprocessing import Process, Pipe

def child(conn):
    msg = conn.recv()
    print(f"Child received: {msg}")
    conn.send(f"Echo: {msg}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    p = Process(target=child, args=(child_conn,))
    p.start()

    parent_conn.send("Hello from parent")
    response = parent_conn.recv()
    print(f"Parent received: {response}")
    p.join()

Pipe() возвращает два Connection-объекта: у каждого есть send() и recv(). По умолчанию Pipe двусторонний — оба конца могут и отправлять, и получать.

3.4 Разделяемая память: Value, Array, Manager

Иногда нужно, чтобы несколько процессов читали и изменяли одни данные. multiprocessing.Value и Array дают доступ к разделяемой памяти через ctypes:

from multiprocessing import Process, Value, Array
import ctypes

def increment(shared_counter, n: int):
    for _ in range(n):
        with shared_counter.get_lock():  # нужен явный лок!
            shared_counter.value += 1

if __name__ == "__main__":
    counter = Value(ctypes.c_int, 0)
    processes = [
        Process(target=increment, args=(counter, 100_000))
        for _ in range(4)
    ]
    for p in processes: p.start()
    for p in processes: p.join()
    print(counter.value)  # 400_000

Value сам по себе не атомарен — нужен явный get_lock(). Array работает аналогично, но для последовательности элементов одного типа.

Manager — более гибкий вариант: сервер-процесс с прокси-объектами, которые поддерживают привычные Python-типы (dict, list, Namespace):

from multiprocessing import Process, Manager

def worker(shared_dict, key: str, value: int):
    shared_dict[key] = value

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        processes = [
            Process(target=worker, args=(d, f"key{i}", i))
            for i in range(5)
        ]
        for p in processes: p.start()
        for p in processes: p.join()
        print(dict(d))  # {'key0': 0, 'key1': 1, ...}

Manager удобен, но медленнее Value/Array — каждое обращение проходит через IPC к серверному процессу. Для высоконагруженных сценариев предпочтительнее Value/Array или архитектура без разделяемого состояния.

3.5 Overhead процессов vs потоков

Создание процесса значительно дороже создания потока. Поток — несколько мегабайт стека и структура данных ОС. Процесс — полная копия адресного пространства, инициализация нового интерпретатора Python, загрузка модулей.

import threading
import multiprocessing
import time

def empty_task():
    pass

N = 100

# Потоки
start = time.perf_counter()
threads = [threading.Thread(target=empty_task) for _ in range(N)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threads:   {time.perf_counter() - start:.3f}s")

# Процессы
if __name__ == "__main__":
    start = time.perf_counter()
    processes = [multiprocessing.Process(target=empty_task) for _ in range(N)]
    for p in processes: p.start()
    for p in processes: p.join()
    print(f"Processes: {time.perf_counter() - start:.3f}s")
    # Процессы на порядок медленнее при старте

Практическое правило: multiprocessing оправдан, когда время выполнения задачи на процесс — хотя бы десятки миллисекунд, а объём передаваемых данных (через pickle) небольшой. Для коротких задач с большими данными overhead полностью съедает выигрыш.

Python-вопросы в Telegram

Ежедневные разборы GIL, asyncio и паттернов.

Подписаться

4. concurrent.futures: единый высокоуровневый API

4.1 ThreadPoolExecutor vs ProcessPoolExecutor

Модуль concurrent.futures предоставляет единый интерфейс для пулов потоков и процессов. Вместо ручного управления жизненным циклом Thread/Process — просто передаём задачи в executor:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# ThreadPoolExecutor — пул потоков (для I/O-bound)
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fetch_url, urls))

# ProcessPoolExecutor — пул процессов (для CPU-bound)
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(compute, data))

with-блок гарантирует корректное завершение: при выходе executor вызывает shutdown(wait=True) — дожидается всех запущенных задач. Это важно: утечки потоков/процессов — распространённая ошибка.

max_workers по умолчанию: для ThreadPoolExecutormin(32, cpu_count + 4), для ProcessPoolExecutorcpu_count(). Для I/O-bound задач разумно выставлять больше, чем число ядер — потоки большую часть времени ждут.

4.2 submit() + Future

submit() — неблокирующий запуск задачи. Возвращает объект Future — обещание результата:

from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n: int) -> str:
    time.sleep(n)
    return f"task-{n} done"

with ThreadPoolExecutor(max_workers=3) as executor:
    # submit() не блокирует — задача поставлена в пул
    future1 = executor.submit(slow_task, 2)
    future2 = executor.submit(slow_task, 1)
    future3 = executor.submit(slow_task, 3)

    # Можно делать другую работу пока задачи выполняются
    print("Tasks submitted, doing other work...")

    # result() блокирует до готовности конкретного future
    print(future2.result())   # возвращает первым — он самый быстрый
    print(future1.result())
    print(future3.result())

Future поддерживает несколько полезных методов:

  • result(timeout=None) — возвращает результат или бросает исключение
  • exception() — возвращает исключение (не бросает)
  • done()True, если задача завершена
  • add_done_callback(fn) — регистрирует callback, который вызовется при завершении

4.3 map() — параллельный итератор

executor.map(func, iterable) — самый простой способ применить функцию к набору данных параллельно:

from concurrent.futures import ProcessPoolExecutor

def square(n: int) -> int:
    return n * n

with ProcessPoolExecutor() as executor:
    # map() возвращает итератор с результатами в исходном порядке
    squares = list(executor.map(square, range(10)))

print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

В отличие от Pool.map, executor.map() можно вызывать с chunksize — батчингом задач, что снижает overhead IPC при большом числе мелких задач:

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(square, range(10_000), chunksize=100))

Без chunksize каждый элемент — отдельный IPC-запрос. С chunksize=100 данные передаются пачками по 100 элементов — значительно быстрее для маленьких задач.

4.4 as_completed() — обработка по мере готовности

as_completed() позволяет обрабатывать результаты в порядке завершения, а не в исходном порядке:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def fetch_data(url: str) -> str:
    delay = random.uniform(0.5, 2.0)
    time.sleep(delay)
    return f"{url} -> {delay:.2f}s"

urls = [f"https://api.example.com/endpoint/{i}" for i in range(8)]

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(fetch_data, url): url for url in urls}

    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(f"Done: {result}")
        except Exception as e:
            print(f"Failed {url}: {e}")

Паттерн {future: original_data} — стандартный способ связать future с исходными данными. as_completed() возвращает Future-объекты в порядке их завершения — самые быстрые приходят первыми.

4.5 Пример: I/O-bound vs CPU-bound

Финальное сравнение: один и тот же код, разный executor — и совершенно разная производительность в зависимости от природы задачи:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import urllib.request

# I/O-BOUND: загрузка URL
def fetch(url: str) -> int:
    with urllib.request.urlopen(url, timeout=10) as r:
        return len(r.read())

urls = ["https://httpbin.org/delay/1"] * 8

# Плохо: ProcessPoolExecutor для I/O — избыточный overhead
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as ex:
    list(ex.map(fetch, urls))
print(f"ProcessPool (I/O): {time.perf_counter() - start:.1f}s")

# Хорошо: ThreadPoolExecutor для I/O
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as ex:
    list(ex.map(fetch, urls))
print(f"ThreadPool (I/O):  {time.perf_counter() - start:.1f}s")

# CPU-BOUND: факториал
def compute(n: int) -> int:
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

data = [200_000] * 8

# Плохо: ThreadPoolExecutor для CPU — GIL не даёт параллелизма
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as ex:
    list(ex.map(compute, data))
print(f"ThreadPool (CPU):   {time.perf_counter() - start:.1f}s")

# Хорошо: ProcessPoolExecutor для CPU
if __name__ == "__main__":
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as ex:
        list(ex.map(compute, data))
    print(f"ProcessPool (CPU): {time.perf_counter() - start:.1f}s")

5. Когда что выбирать: практическая шпаргалка

Таблица сравнения

Параметрthreadingmultiprocessingasyncio
МодельПотоки в одном процессеОтдельные процессыКорутины в одном потоке
Параллелизм по CPUНет (GIL)Да (отдельные GIL)Нет (однопоточный)
I/O-конкурентностьДаИзбыточноДа (лучший вариант)
Разделение памятиОбщаяИзолированная (pickle)Общая
Накладные расходыНизкиеВысокие (fork + pickle)Минимальные
СинхронизацияLock, Semaphore, EventQueue, Pipe, Valueasyncio.Lock, asyncio.Queue
Подходит дляI/O-bound: сеть, файлы, БДCPU-bound: вычисленияI/O-bound с высокой нагрузкой

asyncio vs threading vs multiprocessing

Выбор зависит от двух вопросов: где тратится время и какая нагрузка?

threading — разумный выбор для I/O-bound задач с умеренным числом параллельных операций (десятки). Синхронные библиотеки (requests, psycopg2) работают в потоках без переписывания. Модель знакома и проще asyncio для начинающих.

asyncio — лучший выбор для I/O-bound задач с высокой нагрузкой (тысячи соединений). Меньше overhead, чем потоки, без переключений контекста ОС. Требует асинхронных библиотек (aiohttp, asyncpg). Подробнее — в Asyncio в Python: event loop, async/await и задачи.

multiprocessing — единственный способ получить настоящий CPU-параллелизм в Python. Для числодробилок, ML-препроцессинга, обработки данных. ProcessPoolExecutor через concurrent.futures — предпочтительный API.

Комбинация: asyncio + ProcessPoolExecutor через loop.run_in_executor() — стандартный паттерн для смешанных задач: I/O через event loop, тяжёлые вычисления offload в процессы.

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_compute(data):
    return sum(x ** 2 for x in data)

async def handle_request(data):
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_compute, data)
    return result

6. Производительность и подводные камни

6.1 Pickling — что нельзя передать в Process

Данные между процессами передаются через pickle — сериализацию Python. Не всё можно сериализовать, и это частый источник ошибок:

from multiprocessing import Pool
import pickle

# Что нельзя pickle-ить:
# - lambda функции
# - вложенные функции (closures с захваченными переменными)
# - некоторые объекты с состоянием (сокеты, файловые дескрипторы)
# - генераторы

# ОШИБКА: lambda не pickle-ится
with Pool(4) as pool:
    # pool.map(lambda x: x**2, range(10))  # AttributeError!
    pass

# РАБОТАЕТ: обычная функция на верхнем уровне модуля
def square(x):
    return x ** 2

with Pool(4) as pool:
    print(pool.map(square, range(10)))

# Проверить, можно ли передать объект:
def check_picklable(obj):
    try:
        pickle.dumps(obj)
        return True
    except Exception as e:
        print(f"Not picklable: {e}")
        return False

Правило: функции, передаваемые в Pool/ProcessPoolExecutor, должны быть объявлены на верхнем уровне модуля (не как lambda и не как вложенные функции). Аргументы — сериализуемые объекты.

6.2 Утечки ресурсов: всегда используйте with / shutdown

Пул потоков или процессов — ресурс, который нужно явно закрывать. Незакрытый пул продолжает удерживать потоки/процессы:

from concurrent.futures import ThreadPoolExecutor

# ПЛОХО: создаём executor без with — утечка потоков
executor = ThreadPoolExecutor(max_workers=10)
results = executor.map(some_func, data)
# executor никогда не закроется явно!

# ХОРОШО: with гарантирует shutdown(wait=True)
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(some_func, data))

# Или явно:
executor = ThreadPoolExecutor(max_workers=10)
try:
    results = list(executor.map(some_func, data))
finally:
    executor.shutdown(wait=True)

То же касается Pool из multiprocessing: всегда используйте with Pool(...) as pool или явный pool.close() + pool.join().

6.3 fork vs spawn vs forkserver

При создании нового процесса Python использует один из трёх методов запуска — start method. Это важно знать, особенно при отладке странного поведения на разных платформах.

fork (по умолчанию на Linux): дочерний процесс — полная копия родительского через os.fork(). Быстрый старт, но копирует всё состояние родителя, включая открытые сокеты и блокировки. Может быть небезопасен в многопоточных программах: если в момент fork один из потоков держал мьютекс, дочерний процесс унаследует его в залоченном состоянии.

spawn (по умолчанию на Windows и macOS с Python 3.8+): запускает свежий интерпретатор Python, импортирует модуль и вызывает целевую функцию. Чисто, безопасно, но медленнее — нужно заново импортировать всё.

forkserver: запускает отдельный сервер-процесс при старте программы; новые процессы создаются через него. Компромисс между fork и spawn.

import multiprocessing

# Изменить метод запуска (делать в if __name__ == "__main__")
if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    # или "fork", "forkserver"

    p = multiprocessing.Process(target=some_func)
    p.start()
    p.join()

На практике: если ваш код работает на Linux через Docker — используется fork. Если разрабатываете на macOS и деплоите на Linux — поведение может отличаться. spawn — наиболее предсказуемый вариант на всех платформах.

7. Итоги и что спросят на собеседовании

В чём главное отличие threading от multiprocessing?

threading создаёт потоки в одном процессе с общей памятью. GIL разрешает выполнять Python-байткод только одному потоку в каждый момент — настоящего параллелизма по CPU нет. Зато I/O-операции освобождают GIL, потоки дёшевы в создании и могут легко обмениваться данными через общие объекты.

multiprocessing создаёт отдельные процессы с изолированной памятью. Каждый процесс — свой GIL, настоящий параллелизм. Дорог в создании, данные нужно передавать через IPC (Queue, Pipe, pickle). Это выбор для CPU-bound задач.

Почему threading не ускоряет CPU-bound задачи?

Из-за GIL. В каждый момент только один поток выполняет Python-байткод. CPU-bound задача постоянно держит GIL — другим потокам ничего не достаётся. В итоге потоки работают последовательно, добавляя лишь overhead на переключение контекста.

Как правильно передать данные между процессами?

Через multiprocessing.Queue для очереди задач, multiprocessing.Pipe для двустороннего канала двух процессов, Value/Array для разделяемой памяти примитивных типов, Manager() для Python-словарей и списков. Функции и аргументы для Pool/ProcessPoolExecutor сериализуются через pickle автоматически — убедитесь, что объекты picklable.

Что такое concurrent.futures и когда его использовать?

Высокоуровневый API с единым интерфейсом для потоков и процессов. ThreadPoolExecutor для I/O-bound, ProcessPoolExecutor для CPU-bound. Предпочтительнее низкоуровневых Thread/Process для большинства задач: не нужно управлять жизненным циклом, есть удобный API (submit, map, as_completed).

Как происходит deadlock и как его предотвратить?

Deadlock — взаимная блокировка: поток A держит lock1 и ждёт lock2, поток B держит lock2 и ждёт lock1. Оба ждут вечно. Предотвращение: всегда захватывать несколько локов в одном фиксированном порядке во всех потоках; использовать lock.acquire(timeout=N) с явной обработкой неудачи; по возможности избегать нескольких локов одновременно.

Чем fork отличается от spawn?

fork — создаёт дочерний процесс как копию родительского через os.fork(). Быстро, но наследует всё состояние, включая открытые файловые дескрипторы и блокировки. spawn — запускает свежий интерпретатор Python. Медленнее, зато чисто. По умолчанию на Linux — fork, на Windows/macOS — spawn. Для максимальной переносимости используйте spawn.

Когда asyncio лучше threading?

Для I/O-bound задач с высокой нагрузкой (тысячи параллельных соединений). Потоки потребляют память (стек каждого потока — мегабайты), и переключение контекста между ними имеет overhead. asyncio работает в одном потоке, корутины — легковесны, тысячи конкурентных операций — норма. Недостаток: требует асинхронных библиотек.

Можно ли комбинировать asyncio и multiprocessing?

Да, и это стандартный паттерн. loop.run_in_executor(ProcessPoolExecutor(), func, *args) запускает CPU-heavy функцию в отдельном процессе, возвращая asyncio.Future. Event loop продолжает обрабатывать другие корутины — не блокируется. Так строят высоконагруженные сервисы: I/O через asyncio, тяжёлые вычисления offload в процессы.

Что такое race condition и как её поймать?

Race condition — состояние гонки, когда результат зависит от порядка выполнения потоков. Типичный пример: counter += 1 в нескольких потоках без лока. Поймать сложно — баги проявляются нестабильно. Инструменты: запуск с -X dev (включает дополнительные проверки), использование threading.Lock для всех shared-объектов, предпочтение queue.Queue и immutable данных.

Когда стоит использовать multiprocessing.Pool вместо ProcessPoolExecutor?

Pool имеет дополнительные методы (imap, imap_unordered, starmap, apply) и более гибкое управление пулом. ProcessPoolExecutor — стандартизированный API через concurrent.futures, лучше интегрируется с asyncio через run_in_executor. Для нового кода предпочтительнее ProcessPoolExecutor; Pool оправдан, если нужен imap_unordered или другие специфические методы.

Почему нельзя использовать lambda в ProcessPoolExecutor?

ProcessPoolExecutor сериализует функции через pickle, а lambda-функции не поддерживают pickle (они анонимны и не имеют привязки к модулю). Вместо lambda используйте обычные функции на верхнем уровне модуля или классы с __call__. Если нужна логика с параметрами — functools.partial тоже не всегда pickle-ится; лучше написать явную функцию.

Python-вопросы в Telegram

Ежедневные разборы GIL, asyncio и паттернов.

Подписаться

FAQ

Автор

Lexicon Team

Читайте также