asyncio.gather, wait y as_completed: tareas en paralelo
gather ejecuta múltiples corrutinas a la vez y espera a que todas terminen. wait y as_completed ofrecen más control cuando necesitas gestionar los resultados a medida que llegan.
asyncio.gather() — el más común
asyncio.gather(*corrutinas) recibe múltiples corrutinas (o tareas), las ejecuta concurrentemente y retorna una lista con los resultados en el mismo orden en que las pasaste — independientemente de cuál terminó primero.
import asyncio
import time
async def consultar_api(endpoint: str, demora: float) -> dict:
await asyncio.sleep(demora)
return {"endpoint": endpoint, "datos": f"resultado de {endpoint}"}
async def main() -> None:
inicio = time.perf_counter()
# gather: todas las corrutinas se ejecutan concurrentemente
resultados = await asyncio.gather(
consultar_api("/usuarios", 1.5),
consultar_api("/productos", 1.0),
consultar_api("/pedidos", 2.0),
)
# resultados está en el mismo orden que las corrutinas
for r in resultados:
print(r["endpoint"], "→", r["datos"])
print(f"Tiempo total: {time.perf_counter() - inicio:.1f}s")
# Tiempo total: ~2.0s (el más lento), no 4.5s
asyncio.run(main())/usuarios → resultado de /usuarios
/productos → resultado de /productos
/pedidos → resultado de /pedidos
Tiempo total: 2.0sAunque /productos termina antes que /usuarios, resultados[0] siempre será el resultado de la primera corrutina que pasaste. gather mantiene el orden de la llamada.
También puedes pasar tareas ya creadas con create_task:
import asyncio
async def tarea(n: int) -> int:
await asyncio.sleep(n * 0.1)
return n * 10
async def main() -> None:
# Puedes mezclar corrutinas y tareas
t1 = asyncio.create_task(tarea(1))
t2 = asyncio.create_task(tarea(2))
# gather acepta tanto corrutinas como tasks
r1, r2, r3 = await asyncio.gather(t1, t2, tarea(3))
print(r1, r2, r3) # 10 20 30
asyncio.run(main())10 20 30asyncio.wait() — control granular
asyncio.wait() devuelve dos conjuntos: las tareas completadas (done) y las que aún están pendientes (pending). Es útil cuando quieres reaccionar a la primera tarea que termina, o cuando necesitas procesar completadas y cancelar pendientes.
import asyncio
async def tarea_variable(nombre: str, segundos: float) -> str:
await asyncio.sleep(segundos)
return f"{nombre} completado en {segundos}s"
async def main() -> None:
tareas = [
asyncio.create_task(tarea_variable("A", 1.0)),
asyncio.create_task(tarea_variable("B", 3.0)),
asyncio.create_task(tarea_variable("C", 2.0)),
]
# FIRST_COMPLETED: retorna cuando termina la primera
done, pending = await asyncio.wait(
tareas,
return_when=asyncio.FIRST_COMPLETED,
)
print(f"Primera en terminar: {len(done)} tarea(s)")
for tarea in done:
print(" →", tarea.result())
print(f"Aún pendientes: {len(pending)} tarea(s)")
# Cancela las restantes si ya no las necesitas
for tarea in pending:
tarea.cancel()
asyncio.run(main())Primera en terminar: 1 tarea(s)
→ A completado en 1.0s
Aún pendientes: 2 tarea(s)Las opciones de return_when:
asyncio.FIRST_COMPLETED— retorna cuando termina (o falla) cualquier tareaasyncio.FIRST_EXCEPTION— retorna cuando alguna tarea lanza una excepciónasyncio.ALL_COMPLETED— equivale agather, espera a que todas terminen
asyncio.as_completed() — por orden de llegada
as_completed() retorna un iterador de futuros que se van completando. Cada iteración te da la siguiente tarea que terminó — en orden de finalización, no de creación.
import asyncio
import time
async def procesar_item(item_id: int, demora: float) -> dict:
await asyncio.sleep(demora)
return {"id": item_id, "resultado": item_id * 2}
async def main() -> None:
inicio = time.perf_counter()
corrutinas = [
procesar_item(1, 2.0), # lenta
procesar_item(2, 0.5), # rápida
procesar_item(3, 1.0), # media
]
# as_completed: procesa cada resultado en cuanto llega
for futuro in asyncio.as_completed(corrutinas):
resultado = await futuro
transcurrido = time.perf_counter() - inicio
print(f" {transcurrido:.1f}s — item {resultado['id']}: {resultado['resultado']}")
asyncio.run(main()) 0.5s — item 2: 4
1.0s — item 3: 6
2.0s — item 1: 2Usa gather cuando necesitas todos los resultados y el orden importa. Usa as_completed cuando quieres procesar o mostrar resultados en tiempo real — como una barra de progreso, logs de operaciones, o cuando quieres mostrar resultados al usuario conforme llegan.
return_exceptions en gather
Por defecto, si cualquier corrutina en gather lanza una excepción, la excepción se propaga inmediatamente y las demás tareas continúan ejecutándose pero sus resultados se pierden. Con return_exceptions=True, los errores se devuelven como valores — puedes inspeccionarlos sin que interrumpan el resto.
import asyncio
async def operacion(n: int) -> float:
await asyncio.sleep(0.1)
if n == 0:
raise ValueError("División por cero no permitida")
return 100.0 / n
async def main() -> None:
# Sin return_exceptions: el primer error cancela todo
try:
await asyncio.gather(
operacion(5),
operacion(0), # ❌ lanza ValueError
operacion(2),
)
except ValueError as e:
print(f"Error sin tolerancia: {e}")
# Con return_exceptions=True: recoge todo, errores incluidos
resultados = await asyncio.gather(
operacion(5),
operacion(0), # retorna el error como valor
operacion(2),
return_exceptions=True,
)
for i, r in enumerate(resultados):
if isinstance(r, Exception):
print(f" operacion({i+1 if i < 2 else 2}): ERROR — {r}")
else:
print(f" operacion({i+1 if i < 2 else 2}): {r:.1f}")
asyncio.run(main())Error sin tolerancia: División por cero no permitida
operacion(1): 20.0
operacion(2): ERROR — División por cero no permitida
operacion(2): 50.0