Errores en código asíncrono: captura y propagación con asyncio
Los errores en corrutinas se propagan igual que en código síncrono, con try/except. El reto es cuando varios errores ocurren en paralelo — ahí entra ExceptionGroup (Python 3.11+).
try/except dentro de corrutinas
Las excepciones en corrutinas se comportan exactamente igual que en funciones síncronas: si no se capturan, suben por el stack hasta que alguien las atrapa o el programa termina. Puedes usar try/except/finally con total normalidad.
import asyncio
async def consultar_db(query: str) -> list[dict]:
await asyncio.sleep(0.1) # simula latencia
if "DROP" in query.upper():
raise PermissionError(f"Query peligrosa rechazada: {query}")
return [{"id": 1, "nombre": "resultado"}]
async def obtener_usuarios() -> list[dict]:
try:
usuarios = await consultar_db("SELECT * FROM usuarios")
return usuarios
except PermissionError as e:
print(f" Error de permisos: {e}")
return []
except Exception as e:
print(f" Error inesperado: {e}")
raise # re-lanza si no sabemos manejarlo
async def main() -> None:
# Consulta segura ✅
resultado = await obtener_usuarios()
print(f"Usuarios: {resultado}")
# Consulta peligrosa ❌
async def peligrosa() -> None:
await consultar_db("DROP TABLE usuarios")
try:
await peligrosa()
except PermissionError as e:
print(f"Capturado en main: {e}")
asyncio.run(main())Usuarios: [{'id': 1, 'nombre': 'resultado'}]
Error de permisos: Query peligrosa rechazada: DROP TABLE usuarios
Capturado en main: Query peligrosa rechazada: DROP TABLE usuariosEl bloque finally funciona igual — se ejecuta siempre, incluso si la corrutina es cancelada:
import asyncio
async def con_recurso() -> None:
print("Abriendo conexión...")
try:
await asyncio.sleep(5) # puede cancelarse aquí
print("Operación completada")
finally:
print("Cerrando conexión (siempre se ejecuta)")
async def main() -> None:
tarea = asyncio.create_task(con_recurso())
await asyncio.sleep(0.1)
tarea.cancel()
try:
await tarea
except asyncio.CancelledError:
print("Tarea cancelada")
asyncio.run(main())Abriendo conexión...
Cerrando conexión (siempre se ejecuta)
Tarea canceladaErrores en gather (return_exceptions)
Cuando usas gather sin return_exceptions=True, la primera excepción cancela el gather y se propaga. Las demás tareas siguen ejecutándose en segundo plano pero sus resultados se descartan.
import asyncio
async def tarea(nombre: str, falla: bool = False) -> str:
await asyncio.sleep(0.2)
if falla:
raise RuntimeError(f"Fallo en {nombre}")
return f"{nombre}: ok"
async def main() -> None:
# Sin return_exceptions: el error se propaga inmediatamente
print("=== Sin return_exceptions ===")
try:
resultados = await asyncio.gather(
tarea("A"),
tarea("B", falla=True), # lanza error
tarea("C"),
)
except RuntimeError as e:
print(f"Error: {e}")
# Resultado de A y C se pierde ❌
# Con return_exceptions: recoge todo
print("=== Con return_exceptions=True ===")
resultados = await asyncio.gather(
tarea("A"),
tarea("B", falla=True),
tarea("C"),
return_exceptions=True,
)
for r in resultados:
if isinstance(r, Exception):
print(f" ❌ {type(r).__name__}: {r}")
else:
print(f" ✅ {r}")
asyncio.run(main())=== Sin return_exceptions ===
Error: Fallo en B
=== Con return_exceptions=True ===
✅ A: ok
❌ RuntimeError: Fallo en B
✅ C: okTaskGroup y ExceptionGroup (Python 3.11+)
asyncio.TaskGroup es la forma moderna (Python 3.11+) de gestionar múltiples tareas. Si alguna falla, cancela automáticamente las demás y agrupa todos los errores en un ExceptionGroup.
import asyncio
async def servicio(nombre: str, demora: float, falla: bool = False) -> str:
await asyncio.sleep(demora)
if falla:
raise ConnectionError(f"No se pudo conectar a {nombre}")
return f"{nombre}: disponible"
async def main() -> None:
resultados: list[str] = []
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(servicio("auth", 0.1))
t2 = tg.create_task(servicio("db", 0.3, falla=True))
t3 = tg.create_task(servicio("cache", 0.2))
# Si alguna falla, las demás se cancelan automáticamente
except* ConnectionError as eg:
# except* maneja ExceptionGroup (sintaxis Python 3.11+)
for exc in eg.exceptions:
print(f" Error de conexión: {exc}")
# Las tareas completadas antes del fallo tienen su resultado
if t1.done() and not t1.cancelled():
print(f"Auth: {t1.result()}")
asyncio.run(main()) Error de conexión: No se pudo conectar a db
Auth: auth: disponibleLa sintaxis except* TipoError as eg captura un ExceptionGroup. Es diferente de except normal. Si usas Python 3.10 o anterior, necesitas capturar BaseExceptionGroup manualmente o usar gather(return_exceptions=True).
Timeouts con asyncio.timeout()
Siempre debes poner un límite de tiempo a las operaciones de red. asyncio.timeout() (Python 3.11+) es la forma limpia de hacerlo.
import asyncio
async def llamada_lenta(segundos: float) -> str:
await asyncio.sleep(segundos)
return "respuesta"
async def main() -> None:
# asyncio.timeout() — Python 3.11+
print("=== asyncio.timeout() ===")
try:
async with asyncio.timeout(1.0): # máximo 1 segundo
resultado = await llamada_lenta(0.5)
print(f"Éxito: {resultado}")
except TimeoutError:
print("Timeout alcanzado")
try:
async with asyncio.timeout(1.0):
resultado = await llamada_lenta(2.0) # tarda demasiado
print(f"Esto no se imprime")
except TimeoutError:
print("Timeout: la llamada tardó más de 1 segundo")
# asyncio.wait_for() — compatible con Python 3.7+
print("=== asyncio.wait_for() ===")
try:
resultado = await asyncio.wait_for(
llamada_lenta(0.5),
timeout=1.0,
)
print(f"Éxito: {resultado}")
except asyncio.TimeoutError:
print("Timeout")
asyncio.run(main())=== asyncio.timeout() ===
Éxito: respuesta
Timeout: la llamada tardó más de 1 segundo
=== asyncio.wait_for() ===
Éxito: respuestaSin timeout, una corrutina puede quedar esperando indefinidamente si el servidor no responde. Esto bloquea recursos del sistema y puede dejar el programa «colgado» sin error visible. Define siempre un timeout razonable para llamadas HTTP, consultas a DB y cualquier I/O externo.