Asyncio в Python: event loop, async/await и задачи

Полный разбор asyncio: как работает event loop, coroutines и async/await, создание задач через asyncio.Task и gather — и что точно спросят на собеседовании.

01 марта 2026 г.20 минLexicon Team

Когда веб-сервер обрабатывает тысячи одновременных HTTP-запросов, большую часть времени он проводит в ожидании: ответа от базы данных, результата внешнего API, завершения записи в файл. Создавать отдельный поток на каждое соединение — дорого: потоки потребляют память, а переключение контекста между ними нагружает планировщик ОС. Именно здесь asyncio показывает свою силу: один поток, один event loop, тысячи конкурентных операций без блокировки.

asyncio появился в Python 3.4 (PEP 3156), а синтаксис async/await — в Python 3.5 (PEP 492). С тех пор он стал стандартным инструментом для I/O-конкурентности в Python: FastAPI, aiohttp, SQLAlchemy async, asyncpg — все крупные современные библиотеки работают поверх него. Понять, как asyncio устроен изнутри, важно не только для правильного использования, но и для диагностики типичных ошибок — блокирующих вызовов внутри корутин, забытых await, утечек задач.

Если вы ещё не читали про GIL, загляните сначала в GIL в Python: как работает и что спрашивают на собеседовании — там объясняется, почему asyncio не взаимодействует с GIL вообще. А обзор других тем интервью — в Python: 50 вопросов на собеседовании junior–middle.

Python для собеседований: 50 реальных вопросов

GIL, asyncio, ООП и память — с разборами и примерами кода.

Начать тренировку

1. Конкурентность, параллелизм и asyncio

1.1 Три модели: threading, multiprocessing, asyncio

В Python есть три инструмента для работы с конкурентностью, и каждый решает свою задачу. Их часто путают, потому что все три позволяют делать «несколько вещей одновременно» — но механизмы принципиально разные.

threading создаёт потоки внутри одного процесса. Потоки разделяют память, но GIL гарантирует, что Python-байткод выполняет только один поток в каждый момент. Переключение управляется планировщиком ОС — вытесняющее (preemptive): поток может быть прерван в любой момент. Хорошо для I/O-задач, бесполезно для CPU-задач.

multiprocessing создаёт отдельные процессы, каждый со своей памятью и своим GIL. Настоящий параллелизм на нескольких ядрах. Нужна сериализация данных между процессами (pickle). Хорошо для CPU-bound задач.

asyncio — кооперативная конкурентность в одном потоке. Нет ОС-потоков, нет переключения контекста, нет GIL-проблем. Корутины сами явно передают управление через await. Event loop — простой однопоточный диспетчер, выбирающий следующую готовую задачу. Хорошо для I/O-bound задач с высокой нагрузкой.

import asyncio
import threading
import time

# asyncio: один поток, тысячи конкурентных операций
async def async_worker(name, delay):
    await asyncio.sleep(delay)  # не блокирует event loop
    return f"{name} done"

async def main():
    start = time.time()
    results = await asyncio.gather(
        *[async_worker(f"task-{i}", 1) for i in range(1000)]
    )
    print(f"asyncio: {time.time() - start:.2f}s, {len(results)} задач")

asyncio.run(main())
# asyncio: ~1.00s, 1000 задач

Тысяча «одновременных» ожиданий по одной секунде — и всё занимает около секунды. Никаких потоков, никакого параллелизма — только event loop, кооперативно переключающий корутины.

1.2 I/O-bound vs CPU-bound: когда что выбирать

Выбор инструмента начинается с вопроса: где тратится время вашей программы?

I/O-bound задача — программа ждёт внешнего события: ответа от базы данных, HTTP-ответа, чтения файла. Процессор простаивает. Здесь asyncio оптимален: вместо блокировки потока корутина приостанавливается, event loop берёт следующую задачу. threading тоже работает, но с накладными расходами на потоки.

CPU-bound задача — программа активно вычисляет: парсит, сжимает, обрабатывает изображения. Процессор занят. asyncio здесь не поможет — event loop однопоточный, и тяжёлые вычисления заблокируют его целиком. Нужен ProcessPoolExecutor или run_in_executor для offload в отдельный процесс.

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    """CPU-bound: выполняется в отдельном процессе"""
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_event_loop()

    # Запускаем CPU-задачу в ProcessPoolExecutor,
    # не блокируя event loop
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy, 10_000_000)

    print(f"Result: {result}")

asyncio.run(main())

run_in_executor — мост между asyncio и пулом процессов. Event loop продолжает обрабатывать другие корутины, пока cpu_heavy выполняется в отдельном процессе.

2. Event loop: сердце asyncio

2.1 Как устроен event loop изнутри

Event loop — это бесконечный цикл, который на каждой итерации опрашивает систему о готовности I/O, запускает готовые callback-и и продвигает корутины до следующего await.

В основе event loop — системный механизм мультиплексирования I/O: на Linux это epoll, на macOS — kqueue, на Windows — IOCP. Python абстрагирует их через selectors.DefaultSelector. Принцип один: ядро ОС следит за файловыми дескрипторами и уведомляет программу, когда данные готовы к чтению или записи. Никакого busy-waiting — поток действительно спит, пока нет работы.

                         ┌─────────────────────────────────────┐
                         │           Event Loop                │
                         │                                     │
  ┌──────────┐           │   ┌─────────────┐                  │
  │ Coroutine│──create── ▶   │  Task Queue │                  │
  │ (async fn│           │   │  (ready)    │                  │
  └──────────┘           │   └──────┬──────┘                  │
                         │          │ run step                 │
  ┌──────────┐           │   ┌──────▼──────┐                  │
  │ I/O event│──notify── ▶   │  Coroutine  │──await I/O──►    │
  │ (epoll)  │           │   │  execution  │     suspend       │
  └──────────┘           │   └─────────────┘                  │
                         │                                     │
  ┌──────────┐           │   ┌─────────────┐                  │
  │ Timer    │──fire──── ▶   │  Callbacks  │                  │
  │ (sleep)  │           │   │  (scheduled)│                  │
  └──────────┘           │   └─────────────┘                  │
                         └─────────────────────────────────────┘

Когда корутина встречает await asyncio.sleep(1) или await aiohttp.get(url), она не блокирует поток. Вместо этого: регистрирует callback на нужное событие (таймер или готовность I/O), приостанавливается, передаёт управление event loop. Loop берёт следующую готовую задачу. Через секунду (или когда данные готовы) callback срабатывает, корутина попадает обратно в очередь и продолжается с того места, где остановилась.

2.2 asyncio.run() и жизненный цикл loop

До Python 3.7 управление event loop было ручным: loop = asyncio.get_event_loop(), loop.run_until_complete(...), loop.close(). В 3.7 появился asyncio.run() — единственная точка входа для большинства программ:

import asyncio

async def main():
    print("Hello from asyncio")
    await asyncio.sleep(0.1)
    print("Done")

# asyncio.run() делает три вещи:
# 1. Создаёт новый event loop
# 2. Запускает переданную корутину до завершения
# 3. Закрывает loop и освобождает ресурсы
asyncio.run(main())

Под капотом asyncio.run() примерно эквивалентен:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    try:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.run_until_complete(loop.shutdown_default_executor())
    finally:
        asyncio.set_event_loop(None)
        loop.close()

Важный момент: asyncio.run() нельзя вызвать из уже работающего event loop. В Jupyter Notebook loop уже запущен — там нужен await напрямую или nest_asyncio.

2.3 Фазы одной итерации loop

Каждая итерация event loop проходит через несколько фаз. Понимание этого помогает объяснить, почему asyncio.sleep(0) передаёт управление, а синхронный тяжёлый код блокирует всё:

Фаза 1: Ready callbacks. Выполняет все callbacks, которые уже готовы к исполнению: завершённые I/O-операции, callbacks от call_soon(), только что созданные задачи.

Фаза 2: I/O polling. Вызывает select()/epoll_wait() с таймаутом, равным ближайшему запланированному событию (или 0, если есть готовые задачи). Ядро ОС блокирует поток ровно настолько, насколько нужно — не дольше.

Фаза 3: Scheduled callbacks. Срабатывают таймеры, время которых наступило: asyncio.sleep(), call_later().

Фаза 4: Исполнение задач. Каждая Task.__step__() продвигает корутину до следующего await.

import asyncio

async def demonstrate_loop():
    print("Step 1: начали корутину")
    await asyncio.sleep(0)   # передаём управление loop — "yield to loop"
    print("Step 2: вернулись после sleep(0)")
    await asyncio.sleep(0.1) # уходим на 100мс
    print("Step 3: вернулись после sleep(0.1)")

asyncio.run(demonstrate_loop())

asyncio.sleep(0) — не «ничего не делать». Это явная передача управления event loop: текущая корутина приостанавливается, loop может выполнить другие задачи, затем возвращается к нашей. Это эквивалент yield в генераторах.

3. Coroutines и async/await

3.1 Что такое coroutine-объект

async def определяет не обычную функцию, а coroutine function. Её вызов не выполняет тело функции — он создаёт coroutine object:

import asyncio
import inspect

async def greet(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"Hello, {name}"

# Вызов async def создаёт объект корутины, не выполняет её
coro = greet("World")
print(type(coro))     # <class 'coroutine'>
print(inspect.iscoroutine(coro))  # True

# Корутина ничего не сделала — нужен await или asyncio.run()
result = asyncio.run(coro)
print(result)  # Hello, World

Coroutine object реализует протокол __await__: при каждом шаге event loop вызывает coro.send(None) (или coro.throw(exception)), пока не получит StopIteration — это сигнал о завершении. Возвращаемое значение корутины — атрибут value у StopIteration.

Это прямое наследие генераторов Python. До появления async/await корутины писались через @asyncio.coroutine и yield from. async def / await — синтаксический сахар поверх той же механики.

3.2 await: три типа awaitables

await работает не только с корутинами. Объект awaitable — это всё, у чего есть метод __await__, возвращающий итератор. Три типа:

Coroutines — объекты от async def. Самый распространённый случай:

async def fetch_data():
    return {"key": "value"}

async def main():
    data = await fetch_data()  # ждём завершения корутины

Tasksasyncio.Task, обёртка над корутиной, запланированная на event loop:

async def main():
    task = asyncio.create_task(fetch_data())
    # task уже выполняется! await просто ждёт результата
    data = await task

Futures — низкоуровневый примитив: объект, который будет содержать результат в будущем. Task наследуется от Future. Большинство I/O библиотек возвращают Future под капотом:

import asyncio

async def main():
    loop = asyncio.get_event_loop()
    future = loop.create_future()

    # Устанавливаем результат через секунду
    loop.call_later(1.0, future.set_result, "ready")

    result = await future  # ждём, пока future не получит результат
    print(result)  # ready

asyncio.run(main())

Своя реализация __await__ нужна редко, но полезна для библиотек:

class MyAwaitable:
    def __await__(self):
        yield  # приостановить на одну итерацию loop
        return "result"

async def main():
    result = await MyAwaitable()
    print(result)  # result

3.3 Цепочки coroutines и стек вызовов

Корутины можно вызывать из корутин, создавая цепочки вызовов. await в корутине работает аналогично обычному вызову функции — приостанавливает текущую и ждёт результата вложенной:

import asyncio
import time

async def level_3():
    await asyncio.sleep(0.1)
    return "deep result"

async def level_2():
    print(f"  level_2 started at {time.time():.3f}")
    result = await level_3()
    print(f"  level_2 got: {result}")
    return result.upper()

async def level_1():
    print(f"level_1 started at {time.time():.3f}")
    result = await level_2()
    print(f"level_1 got: {result}")
    return result

asyncio.run(level_1())

Важно: такая цепочка выполняется последовательно внутри одной задачи. level_1 ждёт level_2, level_2 ждёт level_3. Конкурентности нет — чтобы её добавить, нужны Task или gather, о которых дальше.

4. asyncio.Task: задачи и планировщик

4.1 asyncio.create_task()

create_task() — главный способ запустить корутину конкурентно. В отличие от await coroutine() (который выполняется последовательно), create_task() регистрирует корутину в event loop и возвращает управление немедленно:

import asyncio
import time

async def worker(name: str, delay: float):
    print(f"{name}: start")
    await asyncio.sleep(delay)
    print(f"{name}: done")
    return f"{name} result"

async def sequential():
    """Последовательно: ~3 секунды"""
    start = time.time()
    await worker("A", 1.0)
    await worker("B", 1.0)
    await worker("C", 1.0)
    print(f"sequential: {time.time() - start:.1f}s")

async def concurrent():
    """Конкурентно: ~1 секунда"""
    start = time.time()
    task_a = asyncio.create_task(worker("A", 1.0))
    task_b = asyncio.create_task(worker("B", 1.0))
    task_c = asyncio.create_task(worker("C", 1.0))

    # Ждём все три задачи
    results = await asyncio.gather(task_a, task_b, task_c)
    print(f"concurrent: {time.time() - start:.1f}s")
    print(f"results: {results}")

asyncio.run(concurrent())

Ключевой момент: задача начинает выполняться сразу при create_task(), не при await. Фактически она встаёт в очередь event loop и начнёт работу при следующей итерации loop — когда текущая корутина дойдёт до своего await.

async def main():
    task = asyncio.create_task(worker("background", 0.1))
    # В этот момент task УЖЕ поставлена в очередь

    print("doing other work...")
    await asyncio.sleep(0.2)  # здесь task выполняется фоново

    result = await task  # task уже завершилась — await вернётся сразу
    print(result)

4.2 Отмена задач и CancelledError

Task.cancel() — способ остановить выполняющуюся задачу. Это не немедленное прерывание: метод cancel() вбрасывает CancelledError в корутину при следующей итерации event loop, в точку ожидания (await):

import asyncio

async def long_operation():
    try:
        print("Starting long operation...")
        await asyncio.sleep(10)  # долгое ожидание
        return "completed"
    except asyncio.CancelledError:
        print("Operation was cancelled — cleanup...")
        # Важно: пробросить CancelledError или дать задаче завершиться
        raise  # правильно: сигнализируем, что задача отменена

async def main():
    task = asyncio.create_task(long_operation())

    await asyncio.sleep(1)  # даём задаче поработать секунду

    task.cancel()  # запрашиваем отмену

    try:
        await task  # ждём фактического завершения
    except asyncio.CancelledError:
        print("Task was successfully cancelled")

    print(f"Task cancelled: {task.cancelled()}")  # True

asyncio.run(main())

Если CancelledError поглощается внутри корутины (не пробрасывается), задача завершается нормально — не как отменённая. Это ломает семантику отмены и нарушает ожидания вызывающего кода. Поэтому правило простое: CancelledError можно перехватить для cleanup, но нужно обязательно пробросить дальше.

Timeout через asyncio.wait_for() — удобная обёртка над отменой:

async def main():
    try:
        result = await asyncio.wait_for(long_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Timed out after 2 seconds")

wait_for() отменяет корутину при истечении таймаута и преобразует CancelledError в TimeoutError.

4.3 Обработка исключений в задачах

Исключения в Task не всплывают немедленно — они сохраняются в объекте задачи до момента, когда задача await-нута:

import asyncio

async def failing_task():
    await asyncio.sleep(0.1)
    raise ValueError("something went wrong")

async def main():
    task = asyncio.create_task(failing_task())

    # Если не await-нуть task — исключение проглотится молча
    # (с предупреждением "Task exception was never retrieved")
    await asyncio.sleep(0.5)

    try:
        await task  # здесь исключение всплывает
    except ValueError as e:
        print(f"Caught: {e}")

asyncio.run(main())

Если задача завершилась с исключением, а её никто не await-нул — Python выведет предупреждение при сборке мусора: Task exception was never retrieved. Это распространённая ошибка — "fire and forget" без обработки исключений.

Безопасный паттерн "fire and forget" с обработкой исключений:

def handle_exception(task: asyncio.Task):
    try:
        task.result()  # вызовет исключение, если оно есть
    except asyncio.CancelledError:
        pass  # отмена — нормально
    except Exception as e:
        print(f"Task failed: {e}")

async def main():
    task = asyncio.create_task(failing_task())
    task.add_done_callback(handle_exception)

    # Задача выполняется фоново, исключение обработается через callback
    await asyncio.sleep(1)

Python-вопросы в Telegram

Ежедневные разборы GIL, asyncio и паттернов.

Подписаться

5. asyncio.gather() и asyncio.wait()

5.1 gather: параллельный запуск и сбор результатов

asyncio.gather() — самый удобный способ запустить несколько корутин конкурентно и дождаться всех результатов:

import asyncio
import aiohttp  # pip install aiohttp

async def fetch(session, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        "https://api.example.com/users/1",
        "https://api.example.com/users/2",
        "https://api.example.com/users/3",
    ]

    async with aiohttp.ClientSession() as session:
        # Все три запроса выполняются конкурентно
        results = await asyncio.gather(
            *[fetch(session, url) for url in urls]
        )

    # results — список в том же порядке, что и входные корутины
    for url, result in zip(urls, results):
        print(f"{url}: {result}")

asyncio.run(main())

По умолчанию, если одна из корутин бросает исключение, gather() немедленно пробрасывает его вызывающему коду, отменяя остальные задачи. Параметр return_exceptions=True меняет поведение: исключения возвращаются как результаты, не останавливая другие задачи:

async def maybe_fail(n: int):
    await asyncio.sleep(0.1)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return f"result {n}"

async def main():
    results = await asyncio.gather(
        maybe_fail(1),
        maybe_fail(2),
        maybe_fail(3),
        return_exceptions=True,
    )

    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i}: {result}")

asyncio.run(main())
# Task 1: result 1
# Task 2 failed: task 2 failed
# Task 3: result 3

Важное свойство gather(): результаты возвращаются в том же порядке, что и входные корутины, независимо от порядка завершения. Это удобно, когда нужно сопоставить результат с входными данными.

5.2 wait: гибкое ожидание с FIRST_COMPLETED / FIRST_EXCEPTION

asyncio.wait() — более гибкий, но менее удобный инструмент. Он возвращает два set-а: done (завершённые) и pending (ещё выполняющиеся):

import asyncio

async def task_with_delay(name: str, delay: float):
    await asyncio.sleep(delay)
    return f"{name} completed in {delay}s"

async def main():
    tasks = {
        asyncio.create_task(task_with_delay("fast", 0.5)),
        asyncio.create_task(task_with_delay("medium", 1.0)),
        asyncio.create_task(task_with_delay("slow", 2.0)),
    }

    # FIRST_COMPLETED: вернуться, как только хоть одна задача завершится
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    print(f"First done: {len(done)}, still pending: {len(pending)}")
    for task in done:
        print(f"  Result: {task.result()}")

    # Отменяем оставшиеся задачи
    for task in pending:
        task.cancel()

    # Ждём отмены
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())

Три режима ожидания через return_when:

asyncio.ALL_COMPLETED (по умолчанию) — ждать, пока все задачи завершатся или будут отменены.

asyncio.FIRST_COMPLETED — вернуться сразу, как только хоть одна задача завершится. Удобно для гонки: «кто ответит первым из нескольких серверов».

asyncio.FIRST_EXCEPTION — вернуться при первом исключении (или при завершении всех, если исключений нет).

Ключевое отличие от gather(): wait() принимает только Task-объекты (не корутины напрямую), возвращает set-ы (порядок не сохраняется), и не отменяет pending-задачи автоматически — это нужно делать вручную.

5.3 TaskGroup (Python 3.11+)

Python 3.11 добавил asyncio.TaskGroup — структурированный способ управления группой задач, вдохновлённый концепцией structured concurrency. Он безопаснее gather() в плане отмены и обработки ошибок:

import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # имитация I/O
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    results = []

    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(fetch_user(i))
            for i in range(1, 6)
        ]

    # После выхода из with-блока все задачи гарантированно завершены
    for task in tasks:
        results.append(task.result())

    print(results)

asyncio.run(main())

Если одна задача внутри TaskGroup бросает исключение, все остальные задачи группы автоматически отменяются. Это поведение structured concurrency: группа задач — единая единица работы, и частичный успех не допускается. При множественных ошибках TaskGroup оборачивает их в ExceptionGroup, с которым работает синтаксис except*:

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(failing_task(1))
            tg.create_task(failing_task(2))
            tg.create_task(success_task(3))
    except* ValueError as eg:
        print(f"Got {len(eg.exceptions)} ValueError(s)")
    except* TypeError as eg:
        print(f"Got {len(eg.exceptions)} TypeError(s)")

TaskGroup — предпочтительный инструмент для нового кода на Python 3.11+, когда нужен структурированный жизненный цикл группы задач. Для простых случаев gather() остаётся удобнее.

6. Типичные ошибки и вопросы на собеседовании

Блокирующий вызов внутри coroutine

Самая распространённая ошибка в asyncio-коде — использовать синхронную блокирующую функцию там, где нужна асинхронная:

import asyncio
import time

# НЕПРАВИЛЬНО: time.sleep блокирует весь event loop
async def bad_worker(name: str):
    print(f"{name}: start")
    time.sleep(1)   # блокирует поток — все корутины заморожены
    print(f"{name}: done")

# ПРАВИЛЬНО: asyncio.sleep приостанавливает только эту корутину
async def good_worker(name: str):
    print(f"{name}: start")
    await asyncio.sleep(1)   # event loop продолжает работу
    print(f"{name}: done")

async def main():
    start = time.time()
    # bad_worker: занимает ~3 секунды (все последовательно из-за блокировки)
    # good_worker: занимает ~1 секунду (конкурентно)
    await asyncio.gather(
        good_worker("A"),
        good_worker("B"),
        good_worker("C"),
    )
    print(f"Time: {time.time() - start:.1f}s")

asyncio.run(main())

То же правило применяется к любым блокирующим операциям: requests.get() вместо aiohttp, синхронные драйверы БД (psycopg2), блокирующие файловые операции. Для таких случаев — run_in_executor:

import asyncio
import requests  # синхронная библиотека

async def fetch_sync_in_executor(url: str):
    loop = asyncio.get_event_loop()
    # Запускаем блокирующий вызов в thread pool, не блокируя event loop
    response = await loop.run_in_executor(None, requests.get, url)
    return response.json()

Забытый await (coroutine never awaited)

Python выдаст предупреждение RuntimeWarning: coroutine 'foo' was never awaited, если вызвать async def функцию без await:

import asyncio

async def save_to_db(data):
    await asyncio.sleep(0.1)
    print(f"Saved: {data}")

async def main():
    # НЕПРАВИЛЬНО: создаём coroutine object, но не запускаем его
    save_to_db({"key": "value"})  # ничего не сохранится!

    # ПРАВИЛЬНО: await запускает корутину
    await save_to_db({"key": "value"})

asyncio.run(main())

Это тихая ошибка в Python: код не упадёт, просто ничего не произойдёт. Предупреждение появится только при сборке мусора. В продакшн-коде включают PYTHONASYNCIODEBUG=1 или используют asyncio.run(..., debug=True) — debug-режим превращает предупреждение в ошибку.

Вложенный event loop (RuntimeError)

asyncio.run() нельзя вызвать из уже работающего event loop:

import asyncio

async def inner():
    return "result"

async def outer():
    # RuntimeError: This event loop is already running
    result = asyncio.run(inner())   # НЕПРАВИЛЬНО

    # ПРАВИЛЬНО: await напрямую
    result = await inner()
    return result

В Jupyter Notebook проблема возникает постоянно, потому что там постоянно работает event loop. Решение — либо await напрямую в ячейке (Jupyter поддерживает await на верхнем уровне), либо nest_asyncio.apply().

Утечка задач (task not awaited)

Создать задачу через create_task() и не ждать её завершения — утечка ресурсов:

import asyncio

async def background_work():
    await asyncio.sleep(10)
    print("background done")

async def main():
    # Задача создана, но мы не ждём её завершения
    asyncio.create_task(background_work())

    # main завершается — event loop останавливается —
    # задача убивается на полуслове
    await asyncio.sleep(0.1)
    print("main done")

Правильный паттерн: либо await задачу, либо хранить ссылку и явно дожидаться в конце:

async def main():
    tasks = set()
    task = asyncio.create_task(background_work())
    tasks.add(task)
    task.add_done_callback(tasks.discard)   # авто-cleanup при завершении

    await asyncio.sleep(0.1)

    # Дожидаемся всех фоновых задач перед выходом
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)

7. Что точно спросят на собеседовании

Что такое event loop и как он работает?

Event loop — однопоточный цикл, управляющий выполнением корутин и I/O-callback-ов в asyncio. На каждой итерации он: исполняет готовые callback-и, вызывает epoll/kqueue для получения готовых I/O-событий, срабатывает по таймерам. Когда корутина встречает await, она приостанавливается и отдаёт управление loop-у — он берёт следующую готовую задачу.

В чём разница между coroutine, Task и Future?

Coroutine — объект от async def, не выполняется без await или Task. Task — обёртка над корoutine, поставленная в очередь event loop через create_task(): начинает выполняться сразу, конкурентно. Future — низкоуровневый примитив, объект-обещание с результатом; Task наследует Future. В повседневном коде работают с корутинами и Task-ами, Future-ы появляются при интеграции с низкоуровневым API.

Чем asyncio.gather() отличается от asyncio.wait()?

gather() запускает корутины/задачи конкурентно, возвращает список результатов в исходном порядке, при ошибке по умолчанию отменяет остальные. wait() возвращает два set(done, pending), поддерживает режимы FIRST_COMPLETED/FIRST_EXCEPTION, порядок не гарантирован, pending-задачи нужно отменять вручную. Для большинства задач gather() удобнее; wait() нужен, когда важна реакция на первое завершение.

Почему time.sleep() нельзя использовать в async-функции?

time.sleep() — блокирующий системный вызов, останавливающий весь поток. Event loop работает в этом же потоке — он тоже заморозится. Все остальные корутины не смогут выполниться на всё время sleep. Нужен await asyncio.sleep() — он приостанавливает только текущую корутину, возвращая управление event loop.

Как отменить asyncio.Task?

task.cancel() вбрасывает CancelledError в корутину при следующей итерации loop, в точку await. Корутина может перехватить CancelledError для cleanup (закрыть соединение, освободить ресурсы), но должна пробросить его дальше — иначе задача не будет помечена как отменённая. await task после cancel() поднимет CancelledError. asyncio.wait_for() — удобная обёртка: отменяет задачу по таймауту, преобразует CancelledError в TimeoutError.

Можно ли запустить несколько event loop'ов в одном потоке?

Нет. В одном потоке может быть только один активный event loop. asyncio.run() внутри работающего loop вызовет RuntimeError: This event loop is already running. Это базовое ограничение дизайна: кооперативная конкурентность предполагает единый диспетчер. Обход — nest_asyncio (патчит asyncio для nested-запуска, используется в Jupyter) или создание нового потока с отдельным event loop.

Как запустить CPU-bound задачу внутри asyncio?

loop.run_in_executor(executor, func, *args) — оборачивает синхронную функцию в asyncio.Future и выполняет её в пуле. None как executor использует ThreadPoolExecutor по умолчанию (подходит для I/O-bound кода с блокирующим API). Для CPU-bound нужен ProcessPoolExecutor: он создаёт отдельный процесс, обходя GIL.

Что такое structured concurrency и TaskGroup?

Structured concurrency — подход, при котором группа задач живёт строго внутри своего контекста: все задачи гарантированно завершены при выходе из блока. asyncio.TaskGroup (Python 3.11+) реализует это через async with: если одна задача падает с исключением, все остальные в группе автоматически отменяются. Это надёжнее gather(), потому что нет случайных утечек задач.

Прокачай Python для собеседований

50 вопросов с разбором: asyncio, GIL, декораторы, ООП и память.

Начать тренировку

Python-вопросы в Telegram

Ежедневные разборы GIL, asyncio и паттернов.

Подписаться

Автор

Lexicon Team

Читайте также