CAP 06 · LEC 04·Asincronía

El event loop de Python: cómo asyncio maneja la concurrencia

El event loop es el corazón de asyncio. Un solo hilo, una cola de tareas, y un mecanismo de callbacks que permite manejar miles de conexiones simultáneas sin hilos.

● AVANZADO12 min lectura3 ejerciciospor Fernando Herrera · actualizado mayo de 2026
¿Encontraste un error o algo que mejorar?Editá esta lección en GitHub →

¿Qué es el event loop?

El event loop es un bucle infinito que mantiene una cola de tareas pendientes. En cada iteración del bucle selecciona las tareas que están listas para continuar (porque su await terminó) y las reanuda. Todo esto ocurre en un solo hilo.

import asyncio # Pseudocódigo de cómo funciona el event loop internamente: # # while tareas_pendientes: # for tarea in tareas_listas_para_continuar(): # resultado = tarea.enviar(None) # reanuda la corrutina # if resultado es "awaiting IO": # registrar_callback(IO_listo → reanudar_tarea) # elif resultado es "completado": # marcar_tarea_como_lista(resultado.valor) # # En la práctica, el event loop usa selectores del SO (select/epoll/kqueue) # para saber qué operaciones de I/O están listas. async def demostrar_concurrencia() -> None: # Cuando esta corrutina hace await, el event loop # puede ejecutar OTRAS corrutinas mientras tanto print("antes del await") await asyncio.sleep(0) # cede al event loop (duración 0) print("después del await — el event loop nos devolvió el control") async def main() -> None: # Creamos múltiples tareas para el event loop t1 = asyncio.create_task(demostrar_concurrencia()) t2 = asyncio.create_task(demostrar_concurrencia()) await t1 await t2 asyncio.run(main())
Salidaantes del await antes del await después del await — el event loop nos devolvió el control después del await — el event loop nos devolvió el control
Un hilo, no varios

asyncio no usa múltiples hilos. La concurrencia viene de que las corrutinas ceden voluntariamente el control cuando hacen await. Si una corrutina nunca hace await, bloquea al event loop completo — nadie más puede ejecutarse.

El ciclo de vida de una corrutina

Una corrutina pasa por varios estados desde que se crea hasta que termina. Entender este ciclo ayuda a depurar problemas de tareas que «nunca se ejecutaron» o que quedan «colgadas».

import asyncio async def mi_corrutina(nombre: str) -> str: print(f"{nombre}: RUNNING — empezando") await asyncio.sleep(0.5) print(f"{nombre}: RUNNING — reanudando tras await") return f"{nombre}: completado" async def main() -> None: # Estado 1: PENDING — la tarea existe pero no ha empezado tarea = asyncio.create_task(mi_corrutina("trabajo")) print(f"¿Hecha? {tarea.done()}") # False — aún PENDING # Ceder al event loop para que la tarea empiece await asyncio.sleep(0) print(f"¿Hecha? {tarea.done()}") # False — RUNNING (en asyncio.sleep) # Estado 3: DONE — cuando terminó sin error resultado = await tarea print(f"¿Hecha? {tarea.done()}") # True — DONE print(f"Resultado: {tarea.result()}") # Estado especial: CANCELLED tarea_a_cancelar = asyncio.create_task(mi_corrutina("cancelable")) tarea_a_cancelar.cancel() try: await tarea_a_cancelar except asyncio.CancelledError: print(f"¿Cancelada? {tarea_a_cancelar.cancelled()}") # True asyncio.run(main())
Salida¿Hecha? False trabajo: RUNNING — empezando ¿Hecha? False trabajo: RUNNING — reanudando tras await ¿Hecha? True Resultado: trabajo: completado ¿Cancelada? True

asyncio.get_event_loop() y asyncio.run()

asyncio.run() es la API de alto nivel. Para casos avanzados donde necesitas acceder al loop directamente, usa asyncio.get_event_loop() o asyncio.get_running_loop() (más seguro, falla si no hay loop activo).

import asyncio async def inspeccion() -> None: # get_running_loop(): solo funciona dentro de async, más seguro loop = asyncio.get_running_loop() print(f"Loop: {loop}") print(f"¿Corriendo? {loop.is_running()}") # True print(f"¿Cerrado? {loop.is_closed()}") # False # El tiempo del loop (monotónico, en segundos) print(f"Tiempo: {loop.time():.3f}s") # Ejecutar un callable en el hilo del loop def callback() -> None: print("Callback ejecutado desde el loop") loop.call_soon(callback) # próxima iteración loop.call_later(0.1, callback) # en 0.1 segundos await asyncio.sleep(0.2) # damos tiempo a que los callbacks se ejecuten asyncio.run(inspeccion())
SalidaLoop: <_UnixSelectorEventLoop running=True closed=False> ¿Corriendo? True ¿Cerrado? False Tiempo: 0.001s Callback ejecutado desde el loop Callback ejecutado desde el loop
get_event_loop() está deprecado en 3.10+

asyncio.get_event_loop() crea un nuevo loop si no hay ninguno activo, lo cual puede causar bugs sutiles. Usa asyncio.get_running_loop() dentro de corrutinas, y asyncio.run() como punto de entrada. Si necesitas el loop antes de asyncio.run(), usa asyncio.new_event_loop().

Integrar código síncrono con run_in_executor

El mayor riesgo con asyncio es llamar código bloqueante (una función síncrona que tarda) dentro de una corrutina — bloquea el event loop entero. run_in_executor delega ese código a un hilo o proceso separado.

import asyncio import time import concurrent.futures # Función síncrona bloqueante (CPU-bound o librería sin async) def calcular_hash(datos: bytes) -> str: import hashlib time.sleep(0.5) # simula operación costosa return hashlib.sha256(datos).hexdigest()[:16] def leer_archivo_grande(ruta: str) -> str: time.sleep(1.0) # simula I/O bloqueante return f"contenido de {ruta}" async def main() -> None: loop = asyncio.get_running_loop() # ThreadPoolExecutor: para I/O bloqueante y código con GIL with concurrent.futures.ThreadPoolExecutor() as pool: hash_resultado = await loop.run_in_executor( pool, calcular_hash, b"datos importantes", ) print(f"Hash: {hash_resultado}") # ProcessPoolExecutor: para cálculos CPU-bound reales with concurrent.futures.ProcessPoolExecutor() as pool: contenido = await loop.run_in_executor( pool, leer_archivo_grande, "datos.csv", ) print(f"Archivo: {contenido}") asyncio.run(main())
SalidaHash: a1b2c3d4e5f67890 Archivo: contenido de datos.csv
asyncio.to_thread() — la forma moderna

Desde Python 3.9, asyncio.to_thread(func, *args) es el atajo para run_in_executor con ThreadPoolExecutor. Úsalo en lugar de la API más verbosa cuando solo necesitas delegar a un hilo:

resultado = await asyncio.to_thread(calcular_hash, b"datos")

Practica