Pipelines ETL — Extract, Transform, Load para Ingenieros de Datos
Guía práctica sobre pipelines ETL: extraer datos de múltiples fuentes, transformar con validación y lógica de negocio, y cargar en data warehouses. Cubre programación de batches, manejo de errores y monitoreo con Python, dbt y Airflow.
Nota para desarrolladores hispanohablantes: Esta guía incluye ejemplos y convenciones de nomenclatura adaptadas a equipos que trabajan en español. Cuando existen diferencias significativas en terminología técnica entre el inglés y el español, se indican explícitamente para facilitar la comunicación en equipos multiculturales.
Descripción General
ETL (Extract, Transform, Load) mueve datos desde sistemas operacionales hacia sistemas analíticos donde pueden ser consultados, reportados y usados para toma de decisiones. A diferencia del procesamiento de streams, que maneja eventos a medida que llegan, ETL procesa datos en batches programados, haciéndolo más simple de implementar y razonar para muchos casos de uso de análisis de negocio.
Esta guía cubre arquitectura de pipelines, estrategias de extracción de datos, patrones de transformación, técnicas de carga y consideraciones operativas de producción.
Cuándo Usar
- Necesitas consolidar datos de múltiples fuentes en una base de datos analítica única
- Tus consultas analíticas son demasiado lentas o disruptivas para correr en bases de datos de producción
- Necesitas snapshots históricos de datos que cambian con el tiempo
- Tus datos requieren limpieza, enriquecimiento o agregación antes del análisis
- Quieres separar cargas de trabajo operacionales y analíticos
- Necesitas procesamiento de datos programado y repetible (por hora, diario, semanal)
Cuándo NO Usar
- Necesitas latencia sub-segundo de evento a insight — usa procesamiento de streams
- Tu volumen de datos es lo suficientemente pequeño para consultar directamente en las bases de datos fuente
- Necesitas detección de fraude o alertas en tiempo real — usa streaming de eventos
- Tus cambios de datos son eventos discretos que deberían disparar acciones inmediatas
Conceptos Clave
| Concepto | Descripción |
|---|---|
| Extract | Leer datos de sistemas fuente (bases de datos, APIs, archivos) |
| Transform | Limpiar, validar, enriquecer y reestructurar datos |
| Load | Escribir datos procesados al destino (data warehouse, data lake) |
| Staging | Almacenamiento intermedio para datos crudos antes de transformación |
| Carga Incremental | Procesar solo registros nuevos o cambiados desde la última ejecución |
| Full Refresh | Reprocesar todos los datos desde cero |
| SCD | Slowly Changing Dimension — rastrear cambios históricos |
Arquitectura ETL
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Fuente │ │ Fuente │ │ Fuente │
│ CRM │ │ Pedidos │ │ Logs │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└───────────────┼───────────────┘
│ Extract
▼
┌────────────────┐
│ Área de Staging │ (Datos crudos, schema-on-read)
│ (S3 / GCS / │
│ Tablas temp) │
└───────┬────────┘
│ Transform
▼
┌────────────────┐
│ Data Warehouse │ (Datos limpios, modelados)
│ (Snowflake / │
│ BigQuery / │
│ PostgreSQL) │
└────────────────┘
Implementación ETL Paso a Paso
1. Extraer Datos de Fuentes
Extrae datos de forma confiable sin impactar sistemas fuente:
# Ejemplo: Extraer desde PostgreSQL con carga incremental
import psycopg2
import pandas as pd
from datetime import datetime
class PostgresExtractor:
def __init__(self, connection_string, watermark_table='etl_watermarks'):
self.conn = psycopg2.connect(connection_string)
self.watermark_table = watermark_table
self._ensure_watermark_table()
def _ensure_watermark_table(self):
"""Rastrear última hora de extracción por tabla."""
cursor = self.conn.cursor()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {self.watermark_table} (
table_name VARCHAR(255) PRIMARY KEY,
last_extracted TIMESTAMP,
last_id BIGINT,
record_count BIGINT
)
""")
self.conn.commit()
def extract_incremental(self, table, timestamp_column='updated_at',
id_column='id', batch_size=10000):
"""Extraer solo registros cambiados desde la última ejecución."""
cursor = self.conn.cursor()
# Obtener watermark
cursor.execute(f"""
SELECT last_extracted FROM {self.watermark_table}
WHERE table_name = %s
""", (table,))
row = cursor.fetchone()
last_extracted = row[0] if row else datetime.min
# Extraer registros nuevos/cambiados
cursor.execute(f"""
SELECT * FROM {table}
WHERE {timestamp_column} > %s
ORDER BY {id_column}
LIMIT %s
""", (last_extracted, batch_size))
columns = [desc[0] for desc in cursor.description]
data = cursor.fetchall()
df = pd.DataFrame(data, columns=columns)
# Actualizar watermark
if not df.empty:
max_timestamp = df[timestamp_column].max()
cursor.execute(f"""
INSERT INTO {self.watermark_table} (table_name, last_extracted)
VALUES (%s, %s)
ON CONFLICT (table_name) DO UPDATE SET last_extracted = EXCLUDED.last_extracted
""", (table, max_timestamp))
self.conn.commit()
return df
def extract_full(self, table):
"""Extraer tabla completa (para datos de referencia pequeños)."""
return pd.read_sql(f"SELECT * FROM {table}", self.conn)
# Ejemplo: Extraer desde API REST con paginación
import requests
import json
class APIExtractor:
def __init__(self, base_url, api_key):
self.base_url = base_url
self.headers = {'Authorization': f'Bearer {api_key}'}
def extract_paginated(self, endpoint, params=None):
"""Extraer todas las páginas de una API paginada."""
all_data = []
page = 1
while True:
response = requests.get(
f"{self.base_url}/{endpoint}",
headers=self.headers,
params={**(params or {}), 'page': page, 'per_page': 100}
)
response.raise_for_status()
data = response.json()
if not data.get('results'):
break
all_data.extend(data['results'])
if not data.get('has_more', False):
break
page += 1
return pd.DataFrame(all_data)
def extract_with_backoff(self, endpoint, max_retries=3):
"""Extraer con backoff exponencial para rate limiting."""
for attempt in range(max_retries):
try:
response = requests.get(
f"{self.base_url}/{endpoint}",
headers=self.headers
)
if response.status_code == 429: # Rate limited
wait = 2 ** attempt
time.sleep(wait)
continue
response.raise_for_status()
return response.json()
except requests.RequestException as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
Estrategias de extracción:
| Estrategia | Caso de Uso | Trade-off |
|---|---|---|
| Extracción completa | Tablas pequeñas (<1M filas), datos de referencia | Simple, pero lento para tablas grandes |
| Incremental (timestamp) | Tablas con columna updated_at | Rápido, pero pierde deletes duros |
| Incremental (ID) | Tablas con ID autoincremental | Captura inserts, pierde updates |
| CDC (Change Data Capture) | Todo tipo de cambios, extracción en tiempo real | Requiere Debezium o triggers de base de datos |
| Polling de API | Datos de SaaS externos | Rate limits, consistencia eventual |
2. Transforma Datos
Limpia, valida y remodela datos extraídos:
# Ejemplo: Pipeline de transformación con validación
import pandas as pd
from typing import Dict, List, Callable
class DataTransformer:
def __init__(self):
self.validators: List[Callable] = []
self.transformations: List[Callable] = []
def add_validator(self, validator: Callable):
self.validators.append(validator)
def add_transformation(self, transform: Callable):
self.transformations.append(transform)
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Ejecutar todas las validaciones luego transformaciones."""
# Validación
errors = []
for validator in self.validators:
result = validator(df)
if result:
errors.extend(result)
if errors:
raise ValidationError(f"Validación falló con {len(errors)} errores: {errors[:5]}")
# Transformación
for transform in self.transformations:
df = transform(df)
return df
# Definir validadores
def validate_no_null_ids(df):
null_count = df['customer_id'].isnull().sum()
if null_count > 0:
return [f"{null_count} filas con customer_id nulo"]
return []
def validate_email_format(df):
invalid = df[~df['email'].str.contains(r'^[^@]+@[^@]+\.[^@]+$', na=False)]
if len(invalid) > 0:
return [f"{len(invalid)} filas con formato de email inválido"]
return []
# Definir transformaciones
def normalize_emails(df):
df['email'] = df['email'].str.lower().str.strip()
return df
def calculate_order_totals(df):
df['order_total'] = df['quantity'] * df['unit_price'] * (1 - df['discount'])
return df
def add_derived_columns(df):
df['order_year'] = pd.to_datetime(df['order_date']).dt.year
df['order_month'] = pd.to_datetime(df['order_date']).dt.month
df['customer_segment'] = pd.cut(
df['lifetime_value'],
bins=[0, 100, 500, 1000, float('inf')],
labels=['Bronze', 'Silver', 'Gold', 'Platinum']
)
return df
# Construir pipeline
transformer = DataTransformer()
transformer.add_validator(validate_no_null_ids)
transformer.add_validator(validate_email_format)
transformer.add_transformation(normalize_emails)
transformer.add_transformation(calculate_order_totals)
transformer.add_transformation(add_derived_columns)
# Ejecutar pipeline
clean_data = transformer.transform(raw_data)
Patrones de transformación:
| Patrón | Descripción | Ejemplo |
|---|---|---|
| Limpieza | Remover/corregir datos inválidos | Manejo de nulos, deduplicación |
| Normalización | Estandarizar formatos | Fechas, monedas, unidades |
| Enriquecimiento | Agregar datos derivados | Geo-ubicación desde IP, segmento de cliente |
| Agregación | Resumir datos granulares | Ventas diarias desde líneas de pedido |
| Joining | Combinar múltiples fuentes | Pedidos + Clientes + Productos |
| Type casting | Convertir tipos de datos | String → Date, String → Numeric |
| Filtrado | Excluir filas irrelevantes | Datos de prueba, pedidos cancelados |
3. Carga en Data Warehouse
Carga datos transformados eficientemente:
# Ejemplo: Cargar en PostgreSQL con upsert (merge)
import psycopg2
from psycopg2.extras import execute_values
class PostgresLoader:
def __init__(self, connection_string):
self.conn = psycopg2.connect(connection_string)
def upsert(self, df, table, key_columns, batch_size=1000):
"""Insertar o actualizar registros usando ON CONFLICT."""
cursor = self.conn.cursor()
columns = list(df.columns)
column_str = ', '.join(columns)
# Construir cláusula update para columnas no-clave
update_columns = [c for c in columns if c not in key_columns]
update_clause = ', '.join([f"{c} = EXCLUDED.{c}" for c in update_columns])
# Insert en batch con upsert
for i in range(0, len(df), batch_size):
batch = df.iloc[i:i+batch_size]
values = [tuple(row) for _, row in batch.iterrows()]
query = f"""
INSERT INTO {table} ({column_str})
VALUES %s
ON CONFLICT ({', '.join(key_columns)}) DO UPDATE SET {update_clause}
"""
execute_values(cursor, query, values)
self.conn.commit()
def bulk_load(self, df, table, staging_table=None):
"""Carga rápida usando comando COPY vía tabla staging."""
staging = staging_table or f"{table}_staging"
cursor = self.conn.cursor()
# Crear tabla staging como target
cursor.execute(f"DROP TABLE IF EXISTS {staging}")
cursor.execute(f"CREATE TABLE {staging} (LIKE {table} INCLUDING ALL)")
# COPY datos a staging
from io import StringIO
buffer = StringIO()
df.to_csv(buffer, index=False, header=False, sep='\t', na_rep='\\N')
buffer.seek(0)
cursor.copy_from(buffer, staging, columns=list(df.columns), sep='\t', null='\\N')
# Merge staging a target
cursor.execute(f"""
INSERT INTO {table}
SELECT * FROM {staging}
ON CONFLICT DO NOTHING
""")
cursor.execute(f"DROP TABLE {staging}")
self.conn.commit()
# Ejemplo: Cargar en Snowflake usando Snowpark
from snowflake.snowpark import Session
class SnowflakeLoader:
def __init__(self, account, user, password, database, schema):
self.session = Session.builder.configs({
"account": account,
"user": user,
"password": password,
"database": database,
"schema": schema
}).create()
def load_dataframe(self, df, table):
"""Cargar DataFrame pandas a tabla Snowflake."""
snowpark_df = self.session.create_dataframe(df)
snowpark_df.write.mode("overwrite").save_as_table(table)
def merge_dataframe(self, df, table, key_columns):
"""Merge (upsert) DataFrame en tabla existente."""
temp_table = f"{table}_temp"
# Crear tabla temporal desde DataFrame
snowpark_df = self.session.create_dataframe(df)
snowpark_df.write.mode("overwrite").save_as_table(temp_table)
# Ejecutar MERGE
key_match = " AND ".join([f"t.{k} = s.{k}" for k in key_columns])
update_set = ", ".join([f"t.{c} = s.{c}" for c in df.columns if c not in key_columns])
insert_cols = ", ".join(df.columns)
insert_vals = ", ".join([f"s.{c}" for c in df.columns])
merge_sql = f"""
MERGE INTO {table} t
USING {temp_table} s
ON ({key_match})
WHEN MATCHED THEN UPDATE SET {update_set}
WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
"""
self.session.sql(merge_sql).collect()
self.session.sql(f"DROP TABLE IF EXISTS {temp_table}").collect()
Estrategias de carga:
| Estrategia | Mejor Para | Trade-off |
|---|---|---|
| Full refresh (TRUNCATE + INSERT) | Tablas pequeñas, data marts | Simple, pero downtime para tablas grandes |
| Upsert/Merge | Cargas incrementales, tablas de dimensión | Preserva historia, complejo |
| Staging + swap | Tablas de hechos grandes | Cero downtime, requiere 2× espacio temporalmente |
| Reemplazo de partición | Tablas particionadas (fecha) | Rápido, pero requiere alineación de partición |
| Insert stream | Micro-batches casi en tiempo real | Mayor complejidad, menor latencia |
4. Orquesta con Apache Airflow
Programa y monitorea flujos de trabajo ETL:
# Ejemplo: DAG de Airflow para ETL diario
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email': ['data-alerts@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'daily_sales_etl',
default_args=default_args,
description='ETL diario de datos de ventas desde producción a warehouse',
schedule_interval='0 2 * * *', # Correr a las 2 AM diario
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['sales', 'etl'],
) as dag:
def extract_orders(**context):
pg_hook = PostgresHook(postgres_conn_id='production_db')
sql = """
SELECT * FROM orders
WHERE created_at >= %s AND created_at < %s
"""
execution_date = context['ds']
next_date = (datetime.strptime(execution_date, '%Y-%m-%d') +
timedelta(days=1)).strftime('%Y-%m-%d')
df = pg_hook.get_pandas_df(sql, parameters=(execution_date, next_date))
# Guardar en staging S3
s3_hook = S3Hook(aws_conn_id='aws_default')
s3_hook.load_string(
df.to_csv(index=False),
key=f"staging/orders/{execution_date}.csv",
bucket_name='data-lake',
replace=True
)
return f"Extraídos {len(df)} pedidos"
def transform_orders(**context):
s3_hook = S3Hook(aws_conn_id='aws_default')
execution_date = context['ds']
# Leer desde staging
csv_data = s3_hook.read_key(
key=f"staging/orders/{execution_date}.csv",
bucket_name='data-lake'
)
df = pd.read_csv(pd.io.common.StringIO(csv_data))
# Transformar
transformer = DataTransformer()
transformer.add_transformation(calculate_order_totals)
transformer.add_transformation(add_derived_columns)
clean_df = transformer.transform(df)
# Escribir a procesados
s3_hook.load_string(
clean_df.to_csv(index=False),
key=f"processed/orders/{execution_date}.csv",
bucket_name='data-lake',
replace=True
)
return f"Transformados {len(clean_df)} pedidos"
def load_to_warehouse(**context):
execution_date = context['ds']
# Leer datos procesados
s3_hook = S3Hook(aws_conn_id='aws_default')
csv_data = s3_hook.read_key(
key=f"processed/orders/{execution_date}.csv",
bucket_name='data-lake'
)
df = pd.read_csv(pd.io.common.StringIO(csv_data))
# Cargar en Snowflake
loader = SnowflakeLoader(...)
loader.merge_dataframe(df, 'fact_orders', ['order_id'])
return f"Cargados {len(df)} pedidos al warehouse"
extract_task = PythonOperator(
task_id='extract_orders',
python_callable=extract_orders,
)
transform_task = PythonOperator(
task_id='transform_orders',
python_callable=transform_orders,
)
load_task = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_to_warehouse,
)
extract_task >> transform_task >> load_task
Slowly Changing Dimensions (SCD)
Rastrea cómo cambian los datos de dimensión con el tiempo:
-- SCD Tipo 2: Mantener historia con fechas efectivas
CREATE TABLE dim_customers (
customer_sk BIGINT PRIMARY KEY, -- Surrogate key
customer_id BIGINT NOT NULL, -- Natural key
name VARCHAR(255),
email VARCHAR(255),
segment VARCHAR(50),
effective_date DATE NOT NULL,
expiration_date DATE,
is_current BOOLEAN DEFAULT TRUE,
UNIQUE(customer_id, effective_date)
);
-- Insertar nueva versión cuando cambia cliente
INSERT INTO dim_customers (customer_id, name, email, segment, effective_date)
SELECT
s.customer_id,
s.name,
s.email,
s.segment,
CURRENT_DATE
FROM staging_customers s
LEFT JOIN dim_customers d ON s.customer_id = d.customer_id AND d.is_current = TRUE
WHERE d.customer_sk IS NULL -- Cliente nuevo
OR (d.name <> s.name OR d.email <> s.email OR d.segment <> s.segment); -- Cambiado
-- Expirar versión vieja
UPDATE dim_customers d
SET expiration_date = CURRENT_DATE - 1,
is_current = FALSE
FROM staging_customers s
WHERE d.customer_id = s.customer_id
AND d.is_current = TRUE
AND (d.name <> s.name OR d.email <> s.email OR d.segment <> s.segment);
Mejores Prácticas
- Usa tablas de staging. Nunca transformes datos directamente en tablas de producción. Stage, valida, luego carga.
- Haz pipelines idempotentes. Correr el mismo DAG dos veces debería producir el mismo resultado.
- Valida temprano, valida frecuentemente. Atrapa problemas de calidad de datos en staging, no en el warehouse.
- Particiona tablas grandes. Carga datos por partición para habilitar reemplazo rápido y pruning.
- Monitorea frescura de datos. Alerta cuando tablas no han sido actualizadas dentro del SLA.
- Documenta lineage. Rastrea qué tablas fuente alimentan qué tablas del warehouse.
- Prueba transformaciones. Prueba lógica de negocio como código de aplicación.
Errores Comunes
- Sin validación de datos. Datos malos corrompen silenciosamente reportes y dashboards.
- Transformar en producción. Correr UPDATE directamente en el warehouse es riesgoso y difícil de revertir.
- Sin carga incremental. Full refreshes de tablas grandes toman horas y desperdician recursos.
- Sin monitoreo de SLA. Los stakeholders no saben que el pipeline falló hasta que ven dashboards obsoletos.
- Credenciales hard-coded. Usa manejadores de conexión (Airflow, AWS Secrets Manager) en su lugar.
- Sin lógica de reintento. Fallos transitorios de red no deberían hacer fallar todo el pipeline.
Variantes
- ELT (Extract, Load, Transform): Cargar datos crudos al warehouse primero, luego transformar con SQL (dbt, Snowflake) — más simple para equipos nativos de SQL
- Reverse ETL: Empujar datos del warehouse de vuelta a sistemas operacionales (CRM, herramientas de marketing)
- Zero-ETL: Consulta federada directa sin mover datos (BigQuery Federated Queries, Snowflake External Tables)
- Change Data Capture (CDC): Extracción en tiempo real usando logs de base de datos en lugar de polling por batch
FAQ
P: ¿Debería usar ETL o ELT? Usa ETL cuando las transformaciones son complejas (Python, APIs externos) o cuando necesitas limpiar datos antes de que lleguen al warehouse. Usa ELT cuando las transformaciones son basadas en SQL y tu warehouse es lo suficientemente potente para manejarlas (Snowflake, BigQuery, Redshift).
P: ¿Cómo manejo datos que llegan tarde? Implementa un proceso de “late arriving data” que reprocesa particiones pasadas cuando los datos llegan después de la carga inicial. O usa ingestión streaming que maneja eventos fuera de orden.
P: ¿Cómo hago backfill de datos históricos? Corre tu pipeline en un loop sobre rangos de fechas históricas, o usa un DAG parametrizado que acepta un rango de fechas y lo procesa en chunks.
P: ¿Cuál es la diferencia entre un data lake y un data warehouse? Un data lake almacena datos crudos y no procesados en archivos (S3, GCS) con schema-on-read. Un data warehouse almacena datos estructurados y procesados en tablas con schema-on-write. ETL típicamente mueve datos de fuentes → lake → warehouse.
Conclusión
Los pipelines ETL son la columna vertebral de la inteligencia de negocios y el análisis. Al extraer datos de forma confiable, transformarlos con validación y cargarlos eficientemente, creas una base de datos confiable para reportes, dashboards y machine learning.