Asyncio в Python: event loop, async/await и задачи
Полный разбор asyncio: как работает event loop, coroutines и async/await, создание задач через asyncio.Task и gather — и что точно спросят на собеседовании.
- 1. Конкурентность, параллелизм и asyncio
- 1.1 Три модели: threading, multiprocessing, asyncio
- 1.2 I/O-bound vs CPU-bound: когда что выбирать
- 2. Event loop: сердце asyncio
- 2.1 Как устроен event loop изнутри
- 2.2 asyncio.run() и жизненный цикл loop
- 2.3 Фазы одной итерации loop
- 3. Coroutines и async/await
- 3.1 Что такое coroutine-объект
- 3.2 await: три типа awaitables
- 3.3 Цепочки coroutines и стек вызовов
- 4. asyncio.Task: задачи и планировщик
- 4.1 asyncio.create_task()
- 4.2 Отмена задач и CancelledError
- 4.3 Обработка исключений в задачах
- 5. asyncio.gather() и asyncio.wait()
- 5.1 gather: параллельный запуск и сбор результатов
- 5.2 wait: гибкое ожидание с FIRST_COMPLETED / FIRST_EXCEPTION
- 5.3 TaskGroup (Python 3.11+)
- 6. Типичные ошибки и вопросы на собеседовании
- Блокирующий вызов внутри coroutine
- Забытый await (coroutine never awaited)
- Вложенный event loop (RuntimeError)
- Утечка задач (task not awaited)
- 7. Что точно спросят на собеседовании
- Что такое event loop и как он работает?
- В чём разница между coroutine, Task и Future?
- Чем asyncio.gather() отличается от asyncio.wait()?
- Почему time.sleep() нельзя использовать в async-функции?
- Как отменить asyncio.Task?
- Можно ли запустить несколько event loop'ов в одном потоке?
- Как запустить CPU-bound задачу внутри asyncio?
- Что такое structured concurrency и TaskGroup?
Когда веб-сервер обрабатывает тысячи одновременных HTTP-запросов, большую часть времени он проводит в ожидании: ответа от базы данных, результата внешнего API, завершения записи в файл. Создавать отдельный поток на каждое соединение — дорого: потоки потребляют память, а переключение контекста между ними нагружает планировщик ОС. Именно здесь asyncio показывает свою силу: один поток, один event loop, тысячи конкурентных операций без блокировки.
asyncio появился в Python 3.4 (PEP 3156), а синтаксис async/await — в Python 3.5 (PEP 492). С тех пор он стал стандартным инструментом для I/O-конкурентности в Python: FastAPI, aiohttp, SQLAlchemy async, asyncpg — все крупные современные библиотеки работают поверх него. Понять, как asyncio устроен изнутри, важно не только для правильного использования, но и для диагностики типичных ошибок — блокирующих вызовов внутри корутин, забытых await, утечек задач.
Если вы ещё не читали про GIL, загляните сначала в GIL в Python: как работает и что спрашивают на собеседовании — там объясняется, почему asyncio не взаимодействует с GIL вообще. А обзор других тем интервью — в Python: 50 вопросов на собеседовании junior–middle.
Python для собеседований: 50 реальных вопросов
GIL, asyncio, ООП и память — с разборами и примерами кода.
1. Конкурентность, параллелизм и asyncio
1.1 Три модели: threading, multiprocessing, asyncio
В Python есть три инструмента для работы с конкурентностью, и каждый решает свою задачу. Их часто путают, потому что все три позволяют делать «несколько вещей одновременно» — но механизмы принципиально разные.
threading создаёт потоки внутри одного процесса. Потоки разделяют память, но GIL гарантирует, что Python-байткод выполняет только один поток в каждый момент. Переключение управляется планировщиком ОС — вытесняющее (preemptive): поток может быть прерван в любой момент. Хорошо для I/O-задач, бесполезно для CPU-задач.
multiprocessing создаёт отдельные процессы, каждый со своей памятью и своим GIL. Настоящий параллелизм на нескольких ядрах. Нужна сериализация данных между процессами (pickle). Хорошо для CPU-bound задач.
asyncio — кооперативная конкурентность в одном потоке. Нет ОС-потоков, нет переключения контекста, нет GIL-проблем. Корутины сами явно передают управление через await. Event loop — простой однопоточный диспетчер, выбирающий следующую готовую задачу. Хорошо для I/O-bound задач с высокой нагрузкой.
import asyncio
import threading
import time
# asyncio: один поток, тысячи конкурентных операций
async def async_worker(name, delay):
await asyncio.sleep(delay) # не блокирует event loop
return f"{name} done"
async def main():
start = time.time()
results = await asyncio.gather(
*[async_worker(f"task-{i}", 1) for i in range(1000)]
)
print(f"asyncio: {time.time() - start:.2f}s, {len(results)} задач")
asyncio.run(main())
# asyncio: ~1.00s, 1000 задач
Тысяча «одновременных» ожиданий по одной секунде — и всё занимает около секунды. Никаких потоков, никакого параллелизма — только event loop, кооперативно переключающий корутины.
1.2 I/O-bound vs CPU-bound: когда что выбирать
Выбор инструмента начинается с вопроса: где тратится время вашей программы?
I/O-bound задача — программа ждёт внешнего события: ответа от базы данных, HTTP-ответа, чтения файла. Процессор простаивает. Здесь asyncio оптимален: вместо блокировки потока корутина приостанавливается, event loop берёт следующую задачу. threading тоже работает, но с накладными расходами на потоки.
CPU-bound задача — программа активно вычисляет: парсит, сжимает, обрабатывает изображения. Процессор занят. asyncio здесь не поможет — event loop однопоточный, и тяжёлые вычисления заблокируют его целиком. Нужен ProcessPoolExecutor или run_in_executor для offload в отдельный процесс.
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_heavy(n):
"""CPU-bound: выполняется в отдельном процессе"""
return sum(i * i for i in range(n))
async def main():
loop = asyncio.get_event_loop()
# Запускаем CPU-задачу в ProcessPoolExecutor,
# не блокируя event loop
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy, 10_000_000)
print(f"Result: {result}")
asyncio.run(main())
run_in_executor — мост между asyncio и пулом процессов. Event loop продолжает обрабатывать другие корутины, пока cpu_heavy выполняется в отдельном процессе.
2. Event loop: сердце asyncio
2.1 Как устроен event loop изнутри
Event loop — это бесконечный цикл, который на каждой итерации опрашивает систему о готовности I/O, запускает готовые callback-и и продвигает корутины до следующего await.
В основе event loop — системный механизм мультиплексирования I/O: на Linux это epoll, на macOS — kqueue, на Windows — IOCP. Python абстрагирует их через selectors.DefaultSelector. Принцип один: ядро ОС следит за файловыми дескрипторами и уведомляет программу, когда данные готовы к чтению или записи. Никакого busy-waiting — поток действительно спит, пока нет работы.
┌─────────────────────────────────────┐
│ Event Loop │
│ │
┌──────────┐ │ ┌─────────────┐ │
│ Coroutine│──create── ▶ │ Task Queue │ │
│ (async fn│ │ │ (ready) │ │
└──────────┘ │ └──────┬──────┘ │
│ │ run step │
┌──────────┐ │ ┌──────▼──────┐ │
│ I/O event│──notify── ▶ │ Coroutine │──await I/O──► │
│ (epoll) │ │ │ execution │ suspend │
└──────────┘ │ └─────────────┘ │
│ │
┌──────────┐ │ ┌─────────────┐ │
│ Timer │──fire──── ▶ │ Callbacks │ │
│ (sleep) │ │ │ (scheduled)│ │
└──────────┘ │ └─────────────┘ │
└─────────────────────────────────────┘
Когда корутина встречает await asyncio.sleep(1) или await aiohttp.get(url), она не блокирует поток. Вместо этого: регистрирует callback на нужное событие (таймер или готовность I/O), приостанавливается, передаёт управление event loop. Loop берёт следующую готовую задачу. Через секунду (или когда данные готовы) callback срабатывает, корутина попадает обратно в очередь и продолжается с того места, где остановилась.
2.2 asyncio.run() и жизненный цикл loop
До Python 3.7 управление event loop было ручным: loop = asyncio.get_event_loop(), loop.run_until_complete(...), loop.close(). В 3.7 появился asyncio.run() — единственная точка входа для большинства программ:
import asyncio
async def main():
print("Hello from asyncio")
await asyncio.sleep(0.1)
print("Done")
# asyncio.run() делает три вещи:
# 1. Создаёт новый event loop
# 2. Запускает переданную корутину до завершения
# 3. Закрывает loop и освобождает ресурсы
asyncio.run(main())
Под капотом asyncio.run() примерно эквивалентен:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
asyncio.set_event_loop(None)
loop.close()
Важный момент: asyncio.run() нельзя вызвать из уже работающего event loop. В Jupyter Notebook loop уже запущен — там нужен await напрямую или nest_asyncio.
2.3 Фазы одной итерации loop
Каждая итерация event loop проходит через несколько фаз. Понимание этого помогает объяснить, почему asyncio.sleep(0) передаёт управление, а синхронный тяжёлый код блокирует всё:
Фаза 1: Ready callbacks. Выполняет все callbacks, которые уже готовы к исполнению: завершённые I/O-операции, callbacks от call_soon(), только что созданные задачи.
Фаза 2: I/O polling. Вызывает select()/epoll_wait() с таймаутом, равным ближайшему запланированному событию (или 0, если есть готовые задачи). Ядро ОС блокирует поток ровно настолько, насколько нужно — не дольше.
Фаза 3: Scheduled callbacks. Срабатывают таймеры, время которых наступило: asyncio.sleep(), call_later().
Фаза 4: Исполнение задач. Каждая Task.__step__() продвигает корутину до следующего await.
import asyncio
async def demonstrate_loop():
print("Step 1: начали корутину")
await asyncio.sleep(0) # передаём управление loop — "yield to loop"
print("Step 2: вернулись после sleep(0)")
await asyncio.sleep(0.1) # уходим на 100мс
print("Step 3: вернулись после sleep(0.1)")
asyncio.run(demonstrate_loop())
asyncio.sleep(0) — не «ничего не делать». Это явная передача управления event loop: текущая корутина приостанавливается, loop может выполнить другие задачи, затем возвращается к нашей. Это эквивалент yield в генераторах.
3. Coroutines и async/await
3.1 Что такое coroutine-объект
async def определяет не обычную функцию, а coroutine function. Её вызов не выполняет тело функции — он создаёт coroutine object:
import asyncio
import inspect
async def greet(name: str) -> str:
await asyncio.sleep(0.1)
return f"Hello, {name}"
# Вызов async def создаёт объект корутины, не выполняет её
coro = greet("World")
print(type(coro)) # <class 'coroutine'>
print(inspect.iscoroutine(coro)) # True
# Корутина ничего не сделала — нужен await или asyncio.run()
result = asyncio.run(coro)
print(result) # Hello, World
Coroutine object реализует протокол __await__: при каждом шаге event loop вызывает coro.send(None) (или coro.throw(exception)), пока не получит StopIteration — это сигнал о завершении. Возвращаемое значение корутины — атрибут value у StopIteration.
Это прямое наследие генераторов Python. До появления async/await корутины писались через @asyncio.coroutine и yield from. async def / await — синтаксический сахар поверх той же механики.
3.2 await: три типа awaitables
await работает не только с корутинами. Объект awaitable — это всё, у чего есть метод __await__, возвращающий итератор. Три типа:
Coroutines — объекты от async def. Самый распространённый случай:
async def fetch_data():
return {"key": "value"}
async def main():
data = await fetch_data() # ждём завершения корутины
Tasks — asyncio.Task, обёртка над корутиной, запланированная на event loop:
async def main():
task = asyncio.create_task(fetch_data())
# task уже выполняется! await просто ждёт результата
data = await task
Futures — низкоуровневый примитив: объект, который будет содержать результат в будущем. Task наследуется от Future. Большинство I/O библиотек возвращают Future под капотом:
import asyncio
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
# Устанавливаем результат через секунду
loop.call_later(1.0, future.set_result, "ready")
result = await future # ждём, пока future не получит результат
print(result) # ready
asyncio.run(main())
Своя реализация __await__ нужна редко, но полезна для библиотек:
class MyAwaitable:
def __await__(self):
yield # приостановить на одну итерацию loop
return "result"
async def main():
result = await MyAwaitable()
print(result) # result
3.3 Цепочки coroutines и стек вызовов
Корутины можно вызывать из корутин, создавая цепочки вызовов. await в корутине работает аналогично обычному вызову функции — приостанавливает текущую и ждёт результата вложенной:
import asyncio
import time
async def level_3():
await asyncio.sleep(0.1)
return "deep result"
async def level_2():
print(f" level_2 started at {time.time():.3f}")
result = await level_3()
print(f" level_2 got: {result}")
return result.upper()
async def level_1():
print(f"level_1 started at {time.time():.3f}")
result = await level_2()
print(f"level_1 got: {result}")
return result
asyncio.run(level_1())
Важно: такая цепочка выполняется последовательно внутри одной задачи. level_1 ждёт level_2, level_2 ждёт level_3. Конкурентности нет — чтобы её добавить, нужны Task или gather, о которых дальше.
4. asyncio.Task: задачи и планировщик
4.1 asyncio.create_task()
create_task() — главный способ запустить корутину конкурентно. В отличие от await coroutine() (который выполняется последовательно), create_task() регистрирует корутину в event loop и возвращает управление немедленно:
import asyncio
import time
async def worker(name: str, delay: float):
print(f"{name}: start")
await asyncio.sleep(delay)
print(f"{name}: done")
return f"{name} result"
async def sequential():
"""Последовательно: ~3 секунды"""
start = time.time()
await worker("A", 1.0)
await worker("B", 1.0)
await worker("C", 1.0)
print(f"sequential: {time.time() - start:.1f}s")
async def concurrent():
"""Конкурентно: ~1 секунда"""
start = time.time()
task_a = asyncio.create_task(worker("A", 1.0))
task_b = asyncio.create_task(worker("B", 1.0))
task_c = asyncio.create_task(worker("C", 1.0))
# Ждём все три задачи
results = await asyncio.gather(task_a, task_b, task_c)
print(f"concurrent: {time.time() - start:.1f}s")
print(f"results: {results}")
asyncio.run(concurrent())
Ключевой момент: задача начинает выполняться сразу при create_task(), не при await. Фактически она встаёт в очередь event loop и начнёт работу при следующей итерации loop — когда текущая корутина дойдёт до своего await.
async def main():
task = asyncio.create_task(worker("background", 0.1))
# В этот момент task УЖЕ поставлена в очередь
print("doing other work...")
await asyncio.sleep(0.2) # здесь task выполняется фоново
result = await task # task уже завершилась — await вернётся сразу
print(result)
4.2 Отмена задач и CancelledError
Task.cancel() — способ остановить выполняющуюся задачу. Это не немедленное прерывание: метод cancel() вбрасывает CancelledError в корутину при следующей итерации event loop, в точку ожидания (await):
import asyncio
async def long_operation():
try:
print("Starting long operation...")
await asyncio.sleep(10) # долгое ожидание
return "completed"
except asyncio.CancelledError:
print("Operation was cancelled — cleanup...")
# Важно: пробросить CancelledError или дать задаче завершиться
raise # правильно: сигнализируем, что задача отменена
async def main():
task = asyncio.create_task(long_operation())
await asyncio.sleep(1) # даём задаче поработать секунду
task.cancel() # запрашиваем отмену
try:
await task # ждём фактического завершения
except asyncio.CancelledError:
print("Task was successfully cancelled")
print(f"Task cancelled: {task.cancelled()}") # True
asyncio.run(main())
Если CancelledError поглощается внутри корутины (не пробрасывается), задача завершается нормально — не как отменённая. Это ломает семантику отмены и нарушает ожидания вызывающего кода. Поэтому правило простое: CancelledError можно перехватить для cleanup, но нужно обязательно пробросить дальше.
Timeout через asyncio.wait_for() — удобная обёртка над отменой:
async def main():
try:
result = await asyncio.wait_for(long_operation(), timeout=2.0)
except asyncio.TimeoutError:
print("Timed out after 2 seconds")
wait_for() отменяет корутину при истечении таймаута и преобразует CancelledError в TimeoutError.
4.3 Обработка исключений в задачах
Исключения в Task не всплывают немедленно — они сохраняются в объекте задачи до момента, когда задача await-нута:
import asyncio
async def failing_task():
await asyncio.sleep(0.1)
raise ValueError("something went wrong")
async def main():
task = asyncio.create_task(failing_task())
# Если не await-нуть task — исключение проглотится молча
# (с предупреждением "Task exception was never retrieved")
await asyncio.sleep(0.5)
try:
await task # здесь исключение всплывает
except ValueError as e:
print(f"Caught: {e}")
asyncio.run(main())
Если задача завершилась с исключением, а её никто не await-нул — Python выведет предупреждение при сборке мусора: Task exception was never retrieved. Это распространённая ошибка — "fire and forget" без обработки исключений.
Безопасный паттерн "fire and forget" с обработкой исключений:
def handle_exception(task: asyncio.Task):
try:
task.result() # вызовет исключение, если оно есть
except asyncio.CancelledError:
pass # отмена — нормально
except Exception as e:
print(f"Task failed: {e}")
async def main():
task = asyncio.create_task(failing_task())
task.add_done_callback(handle_exception)
# Задача выполняется фоново, исключение обработается через callback
await asyncio.sleep(1)
Python-вопросы в Telegram
Ежедневные разборы GIL, asyncio и паттернов.
5. asyncio.gather() и asyncio.wait()
5.1 gather: параллельный запуск и сбор результатов
asyncio.gather() — самый удобный способ запустить несколько корутин конкурентно и дождаться всех результатов:
import asyncio
import aiohttp # pip install aiohttp
async def fetch(session, url: str) -> dict:
async with session.get(url) as response:
return await response.json()
async def main():
urls = [
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3",
]
async with aiohttp.ClientSession() as session:
# Все три запроса выполняются конкурентно
results = await asyncio.gather(
*[fetch(session, url) for url in urls]
)
# results — список в том же порядке, что и входные корутины
for url, result in zip(urls, results):
print(f"{url}: {result}")
asyncio.run(main())
По умолчанию, если одна из корутин бросает исключение, gather() немедленно пробрасывает его вызывающему коду, отменяя остальные задачи. Параметр return_exceptions=True меняет поведение: исключения возвращаются как результаты, не останавливая другие задачи:
async def maybe_fail(n: int):
await asyncio.sleep(0.1)
if n == 2:
raise ValueError(f"task {n} failed")
return f"result {n}"
async def main():
results = await asyncio.gather(
maybe_fail(1),
maybe_fail(2),
maybe_fail(3),
return_exceptions=True,
)
for i, result in enumerate(results, 1):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i}: {result}")
asyncio.run(main())
# Task 1: result 1
# Task 2 failed: task 2 failed
# Task 3: result 3
Важное свойство gather(): результаты возвращаются в том же порядке, что и входные корутины, независимо от порядка завершения. Это удобно, когда нужно сопоставить результат с входными данными.
5.2 wait: гибкое ожидание с FIRST_COMPLETED / FIRST_EXCEPTION
asyncio.wait() — более гибкий, но менее удобный инструмент. Он возвращает два set-а: done (завершённые) и pending (ещё выполняющиеся):
import asyncio
async def task_with_delay(name: str, delay: float):
await asyncio.sleep(delay)
return f"{name} completed in {delay}s"
async def main():
tasks = {
asyncio.create_task(task_with_delay("fast", 0.5)),
asyncio.create_task(task_with_delay("medium", 1.0)),
asyncio.create_task(task_with_delay("slow", 2.0)),
}
# FIRST_COMPLETED: вернуться, как только хоть одна задача завершится
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"First done: {len(done)}, still pending: {len(pending)}")
for task in done:
print(f" Result: {task.result()}")
# Отменяем оставшиеся задачи
for task in pending:
task.cancel()
# Ждём отмены
await asyncio.gather(*pending, return_exceptions=True)
asyncio.run(main())
Три режима ожидания через return_when:
asyncio.ALL_COMPLETED (по умолчанию) — ждать, пока все задачи завершатся или будут отменены.
asyncio.FIRST_COMPLETED — вернуться сразу, как только хоть одна задача завершится. Удобно для гонки: «кто ответит первым из нескольких серверов».
asyncio.FIRST_EXCEPTION — вернуться при первом исключении (или при завершении всех, если исключений нет).
Ключевое отличие от gather(): wait() принимает только Task-объекты (не корутины напрямую), возвращает set-ы (порядок не сохраняется), и не отменяет pending-задачи автоматически — это нужно делать вручную.
5.3 TaskGroup (Python 3.11+)
Python 3.11 добавил asyncio.TaskGroup — структурированный способ управления группой задач, вдохновлённый концепцией structured concurrency. Он безопаснее gather() в плане отмены и обработки ошибок:
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.1) # имитация I/O
return {"id": user_id, "name": f"User {user_id}"}
async def main():
results = []
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(fetch_user(i))
for i in range(1, 6)
]
# После выхода из with-блока все задачи гарантированно завершены
for task in tasks:
results.append(task.result())
print(results)
asyncio.run(main())
Если одна задача внутри TaskGroup бросает исключение, все остальные задачи группы автоматически отменяются. Это поведение structured concurrency: группа задач — единая единица работы, и частичный успех не допускается. При множественных ошибках TaskGroup оборачивает их в ExceptionGroup, с которым работает синтаксис except*:
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(failing_task(1))
tg.create_task(failing_task(2))
tg.create_task(success_task(3))
except* ValueError as eg:
print(f"Got {len(eg.exceptions)} ValueError(s)")
except* TypeError as eg:
print(f"Got {len(eg.exceptions)} TypeError(s)")
TaskGroup — предпочтительный инструмент для нового кода на Python 3.11+, когда нужен структурированный жизненный цикл группы задач. Для простых случаев gather() остаётся удобнее.
6. Типичные ошибки и вопросы на собеседовании
Блокирующий вызов внутри coroutine
Самая распространённая ошибка в asyncio-коде — использовать синхронную блокирующую функцию там, где нужна асинхронная:
import asyncio
import time
# НЕПРАВИЛЬНО: time.sleep блокирует весь event loop
async def bad_worker(name: str):
print(f"{name}: start")
time.sleep(1) # блокирует поток — все корутины заморожены
print(f"{name}: done")
# ПРАВИЛЬНО: asyncio.sleep приостанавливает только эту корутину
async def good_worker(name: str):
print(f"{name}: start")
await asyncio.sleep(1) # event loop продолжает работу
print(f"{name}: done")
async def main():
start = time.time()
# bad_worker: занимает ~3 секунды (все последовательно из-за блокировки)
# good_worker: занимает ~1 секунду (конкурентно)
await asyncio.gather(
good_worker("A"),
good_worker("B"),
good_worker("C"),
)
print(f"Time: {time.time() - start:.1f}s")
asyncio.run(main())
То же правило применяется к любым блокирующим операциям: requests.get() вместо aiohttp, синхронные драйверы БД (psycopg2), блокирующие файловые операции. Для таких случаев — run_in_executor:
import asyncio
import requests # синхронная библиотека
async def fetch_sync_in_executor(url: str):
loop = asyncio.get_event_loop()
# Запускаем блокирующий вызов в thread pool, не блокируя event loop
response = await loop.run_in_executor(None, requests.get, url)
return response.json()
Забытый await (coroutine never awaited)
Python выдаст предупреждение RuntimeWarning: coroutine 'foo' was never awaited, если вызвать async def функцию без await:
import asyncio
async def save_to_db(data):
await asyncio.sleep(0.1)
print(f"Saved: {data}")
async def main():
# НЕПРАВИЛЬНО: создаём coroutine object, но не запускаем его
save_to_db({"key": "value"}) # ничего не сохранится!
# ПРАВИЛЬНО: await запускает корутину
await save_to_db({"key": "value"})
asyncio.run(main())
Это тихая ошибка в Python: код не упадёт, просто ничего не произойдёт. Предупреждение появится только при сборке мусора. В продакшн-коде включают PYTHONASYNCIODEBUG=1 или используют asyncio.run(..., debug=True) — debug-режим превращает предупреждение в ошибку.
Вложенный event loop (RuntimeError)
asyncio.run() нельзя вызвать из уже работающего event loop:
import asyncio
async def inner():
return "result"
async def outer():
# RuntimeError: This event loop is already running
result = asyncio.run(inner()) # НЕПРАВИЛЬНО
# ПРАВИЛЬНО: await напрямую
result = await inner()
return result
В Jupyter Notebook проблема возникает постоянно, потому что там постоянно работает event loop. Решение — либо await напрямую в ячейке (Jupyter поддерживает await на верхнем уровне), либо nest_asyncio.apply().
Утечка задач (task not awaited)
Создать задачу через create_task() и не ждать её завершения — утечка ресурсов:
import asyncio
async def background_work():
await asyncio.sleep(10)
print("background done")
async def main():
# Задача создана, но мы не ждём её завершения
asyncio.create_task(background_work())
# main завершается — event loop останавливается —
# задача убивается на полуслове
await asyncio.sleep(0.1)
print("main done")
Правильный паттерн: либо await задачу, либо хранить ссылку и явно дожидаться в конце:
async def main():
tasks = set()
task = asyncio.create_task(background_work())
tasks.add(task)
task.add_done_callback(tasks.discard) # авто-cleanup при завершении
await asyncio.sleep(0.1)
# Дожидаемся всех фоновых задач перед выходом
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
7. Что точно спросят на собеседовании
Что такое event loop и как он работает?
Event loop — однопоточный цикл, управляющий выполнением корутин и I/O-callback-ов в asyncio. На каждой итерации он: исполняет готовые callback-и, вызывает epoll/kqueue для получения готовых I/O-событий, срабатывает по таймерам. Когда корутина встречает await, она приостанавливается и отдаёт управление loop-у — он берёт следующую готовую задачу.
В чём разница между coroutine, Task и Future?
Coroutine — объект от async def, не выполняется без await или Task. Task — обёртка над корoutine, поставленная в очередь event loop через create_task(): начинает выполняться сразу, конкурентно. Future — низкоуровневый примитив, объект-обещание с результатом; Task наследует Future. В повседневном коде работают с корутинами и Task-ами, Future-ы появляются при интеграции с низкоуровневым API.
Чем asyncio.gather() отличается от asyncio.wait()?
gather() запускает корутины/задачи конкурентно, возвращает список результатов в исходном порядке, при ошибке по умолчанию отменяет остальные. wait() возвращает два set-а (done, pending), поддерживает режимы FIRST_COMPLETED/FIRST_EXCEPTION, порядок не гарантирован, pending-задачи нужно отменять вручную. Для большинства задач gather() удобнее; wait() нужен, когда важна реакция на первое завершение.
Почему time.sleep() нельзя использовать в async-функции?
time.sleep() — блокирующий системный вызов, останавливающий весь поток. Event loop работает в этом же потоке — он тоже заморозится. Все остальные корутины не смогут выполниться на всё время sleep. Нужен await asyncio.sleep() — он приостанавливает только текущую корутину, возвращая управление event loop.
Как отменить asyncio.Task?
task.cancel() вбрасывает CancelledError в корутину при следующей итерации loop, в точку await. Корутина может перехватить CancelledError для cleanup (закрыть соединение, освободить ресурсы), но должна пробросить его дальше — иначе задача не будет помечена как отменённая. await task после cancel() поднимет CancelledError. asyncio.wait_for() — удобная обёртка: отменяет задачу по таймауту, преобразует CancelledError в TimeoutError.
Можно ли запустить несколько event loop'ов в одном потоке?
Нет. В одном потоке может быть только один активный event loop. asyncio.run() внутри работающего loop вызовет RuntimeError: This event loop is already running. Это базовое ограничение дизайна: кооперативная конкурентность предполагает единый диспетчер. Обход — nest_asyncio (патчит asyncio для nested-запуска, используется в Jupyter) или создание нового потока с отдельным event loop.
Как запустить CPU-bound задачу внутри asyncio?
loop.run_in_executor(executor, func, *args) — оборачивает синхронную функцию в asyncio.Future и выполняет её в пуле. None как executor использует ThreadPoolExecutor по умолчанию (подходит для I/O-bound кода с блокирующим API). Для CPU-bound нужен ProcessPoolExecutor: он создаёт отдельный процесс, обходя GIL.
Что такое structured concurrency и TaskGroup?
Structured concurrency — подход, при котором группа задач живёт строго внутри своего контекста: все задачи гарантированно завершены при выходе из блока. asyncio.TaskGroup (Python 3.11+) реализует это через async with: если одна задача падает с исключением, все остальные в группе автоматически отменяются. Это надёжнее gather(), потому что нет случайных утечек задач.
Прокачай Python для собеседований
50 вопросов с разбором: asyncio, GIL, декораторы, ООП и память.
Python-вопросы в Telegram
Ежедневные разборы GIL, asyncio и паттернов.
Автор
Lexicon Team
Читайте также
backend
Threading vs multiprocessing в Python: что выбрать и почему
Глубокий разбор threading и multiprocessing в Python: GIL, CPU-bound vs I/O-bound, concurrent.futures, синхронизация, IPC и реальные примеры кода для собеседования.
backend
Channels в Go: буферизированные и небуферизированные
Полный разбор каналов в Go: небуферизированные vs буферизированные, направленные каналы, select, закрытие, паттерны pipeline/fan-out/fan-in и частые ошибки на собеседовании.
backend
Go memory model простыми словами: happens-before, гонки и sync
Разбираем модель памяти Go: happens-before, data races, гарантии каналов, Mutex, Once и атомики — с примерами кода и реальными вопросами с интервью.