Saltar al contenido principal

Apache Airflow

Definición

Apache Airflow es una plataforma de código abierto para la creación, programación y monitoreo programático de workflows. Los workflows se expresan como Grafos Acíclicos Dirigidos (DAGs) escritos en Python, lo que brinda a los ingenieros la expresividad completa de un lenguaje de programación para definir dependencias complejas, lógica de ramificación, generación dinámica de tareas y políticas de reintento. Airflow fue creado originalmente en Airbnb en 2014 y luego donado a la Apache Software Foundation; se ha convertido en el estándar de facto para la orquestación de workflows batch en ingeniería de datos y MLOps.

En el contexto ML, Airflow orquesta el ciclo de vida completo del modelo: ingesta de datos, preprocesamiento, ingeniería de características, entrenamiento del modelo, evaluación, registro de artefactos y despliegue. No ejecuta el cómputo por sí mismo — en cambio, delega a sistemas especializados (Spark, dbt, SageMaker, Kubernetes) a través de su rico ecosistema de operadores.

Cómo funciona

DAGs y dependencias de tareas

Un DAG es un archivo Python que instancia un objeto airflow.DAG y define tareas usando operadores. Las dependencias entre tareas se declaran con el operador de desplazamiento de bits >> o llamadas set_downstream/set_upstream. El programador lee estos archivos desde la carpeta de DAGs y desencadena instancias de tareas cuando todas las dependencias upstream están en estado success.

Operadores, sensores y hooks

Los operadores son las unidades atómicas de trabajo en Airflow. Los sensores son una clase especial de operador que bloquea hasta que se cumple una condición. Los hooks proporcionan conexiones reutilizables a sistemas externos.

XComs y comunicación entre tareas

Los XComs permiten a las tareas empujar y extraer pequeños valores entre instancias de tareas dentro del mismo DAG run. Son ideales para pasar métricas de evaluación del modelo, rutas de artefactos o flags de decisión.

Arquitectura del scheduler

Con KubernetesExecutor, cada instancia de tarea obtiene su propio pod de Kubernetes aislado, eliminando la contención de recursos de trabajadores compartidos.

Cuándo usar / Cuándo NO usar

Usar cuandoEvitar cuando
Necesita orquestación de workflows batch con dependencias complejasSu carga de trabajo requiere latencia sub-minuto o es dirigida por eventos
Su equipo se siente cómodo escribiendo workflows en PythonQuiere un constructor de workflows low-code o UI-first
Necesita integración rica con servicios cloud (AWS, GCP, Azure)Sus DAGs son extremadamente simples y un cron job sería suficiente
Requiere registros de auditoría detallados, reintentos y alertasNecesita un servicio de orquestación gestionado y sin operaciones

Comparaciones

CriterioApache AirflowPrefect
Facilidad de usoModeradaAlta
EscalabilidadAltaAlta
Calidad de UIBuenaExcelente
Curva de aprendizajePronunciadaSuave

Pros y contras

ProsContras
Ecosistema maduro con cientos de integraciones de proveedoresOverhead operacional significativo
Expresividad completa de Python para generación dinámica de DAGsLos errores de análisis de DAG pueden romper el scheduler silenciosamente
Soporte sólido de comunidad y empresaNo apto para streaming o programación sub-minuto
KubernetesExecutor permite aislamiento de recursos por tareaXComs son limitados en tamaño

Ejemplo de código

from __future__ import annotations
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
"owner": "ml-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["ml-alerts@example.com"],
}

def extract_data(**context) -> None:
import pandas as pd, pathlib
df = pd.DataFrame({"feature_a": [1.0, 2.0], "feature_b": [0.1, 0.4], "label": [0, 1]})
output_path = "/tmp/airflow/training_data.parquet"
pathlib.Path(output_path).parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(output_path, index=False)
context["ti"].xcom_push(key="data_path", value=output_path)

def train_model(**context) -> None:
import pandas as pd, mlflow, mlflow.sklearn
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

data_path = context["ti"].xcom_pull(task_ids="extract_data", key="data_path")
df = pd.read_parquet(data_path)
X = df[["feature_a", "feature_b"]].values
y = df["label"].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
with mlflow.start_run(run_name="airflow-logistic-regression"):
model = LogisticRegression()
model.fit(X_train, y_train)
accuracy = accuracy_score(y_test, model.predict(X_test))
mlflow.log_metric("accuracy", accuracy)
mlflow.sklearn.log_model(model, artifact_path="model")

with DAG(
dag_id="ml_training_pipeline",
description="Extract -> Train pipeline for nightly model refresh",
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule="0 2 * * *",
catchup=False,
tags=["ml", "training"],
) as dag:
extract = PythonOperator(task_id="extract_data", python_callable=extract_data)
train = PythonOperator(task_id="train_model", python_callable=train_model)
extract >> train

Recursos prácticos

Ver también