Skip to content
SP StackPractices
intermediate Por StackPractices

Pipelines ETL — Extract, Transform, Load para Ingenieros de Datos

Guía práctica sobre pipelines ETL: extraer datos de múltiples fuentes, transformar con validación y lógica de negocio, y cargar en data warehouses. Cubre programación de batches, manejo de errores y monitoreo con Python, dbt y Airflow.

Nota para desarrolladores hispanohablantes: Esta guía incluye ejemplos y convenciones de nomenclatura adaptadas a equipos que trabajan en español. Cuando existen diferencias significativas en terminología técnica entre el inglés y el español, se indican explícitamente para facilitar la comunicación en equipos multiculturales.

Descripción General

ETL (Extract, Transform, Load) mueve datos desde sistemas operacionales hacia sistemas analíticos donde pueden ser consultados, reportados y usados para toma de decisiones. A diferencia del procesamiento de streams, que maneja eventos a medida que llegan, ETL procesa datos en batches programados, haciéndolo más simple de implementar y razonar para muchos casos de uso de análisis de negocio.

Esta guía cubre arquitectura de pipelines, estrategias de extracción de datos, patrones de transformación, técnicas de carga y consideraciones operativas de producción.

Cuándo Usar

  • Necesitas consolidar datos de múltiples fuentes en una base de datos analítica única
  • Tus consultas analíticas son demasiado lentas o disruptivas para correr en bases de datos de producción
  • Necesitas snapshots históricos de datos que cambian con el tiempo
  • Tus datos requieren limpieza, enriquecimiento o agregación antes del análisis
  • Quieres separar cargas de trabajo operacionales y analíticos
  • Necesitas procesamiento de datos programado y repetible (por hora, diario, semanal)

Cuándo NO Usar

  • Necesitas latencia sub-segundo de evento a insight — usa procesamiento de streams
  • Tu volumen de datos es lo suficientemente pequeño para consultar directamente en las bases de datos fuente
  • Necesitas detección de fraude o alertas en tiempo real — usa streaming de eventos
  • Tus cambios de datos son eventos discretos que deberían disparar acciones inmediatas

Conceptos Clave

ConceptoDescripción
ExtractLeer datos de sistemas fuente (bases de datos, APIs, archivos)
TransformLimpiar, validar, enriquecer y reestructurar datos
LoadEscribir datos procesados al destino (data warehouse, data lake)
StagingAlmacenamiento intermedio para datos crudos antes de transformación
Carga IncrementalProcesar solo registros nuevos o cambiados desde la última ejecución
Full RefreshReprocesar todos los datos desde cero
SCDSlowly Changing Dimension — rastrear cambios históricos

Arquitectura ETL

┌──────────┐    ┌──────────┐    ┌──────────┐
│  Fuente  │    │  Fuente  │    │  Fuente  │
│   CRM    │    │  Pedidos │    │   Logs   │
└────┬─────┘    └────┬─────┘    └────┬─────┘
     │               │               │
     └───────────────┼───────────────┘
                     │ Extract

            ┌────────────────┐
            │   Área de Staging │   (Datos crudos, schema-on-read)
            │  (S3 / GCS /   │
            │   Tablas temp)  │
            └───────┬────────┘
                    │ Transform

            ┌────────────────┐
            │  Data Warehouse │   (Datos limpios, modelados)
            │  (Snowflake /   │
            │   BigQuery /     │
            │   PostgreSQL)    │
            └────────────────┘

Implementación ETL Paso a Paso

1. Extraer Datos de Fuentes

Extrae datos de forma confiable sin impactar sistemas fuente:

# Ejemplo: Extraer desde PostgreSQL con carga incremental
import psycopg2
import pandas as pd
from datetime import datetime

class PostgresExtractor:
    def __init__(self, connection_string, watermark_table='etl_watermarks'):
        self.conn = psycopg2.connect(connection_string)
        self.watermark_table = watermark_table
        self._ensure_watermark_table()
    
    def _ensure_watermark_table(self):
        """Rastrear última hora de extracción por tabla."""
        cursor = self.conn.cursor()
        cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {self.watermark_table} (
                table_name VARCHAR(255) PRIMARY KEY,
                last_extracted TIMESTAMP,
                last_id BIGINT,
                record_count BIGINT
            )
        """)
        self.conn.commit()
    
    def extract_incremental(self, table, timestamp_column='updated_at', 
                           id_column='id', batch_size=10000):
        """Extraer solo registros cambiados desde la última ejecución."""
        cursor = self.conn.cursor()
        
        # Obtener watermark
        cursor.execute(f"""
            SELECT last_extracted FROM {self.watermark_table} 
            WHERE table_name = %s
        """, (table,))
        row = cursor.fetchone()
        last_extracted = row[0] if row else datetime.min
        
        # Extraer registros nuevos/cambiados
        cursor.execute(f"""
            SELECT * FROM {table} 
            WHERE {timestamp_column} > %s 
            ORDER BY {id_column}
            LIMIT %s
        """, (last_extracted, batch_size))
        
        columns = [desc[0] for desc in cursor.description]
        data = cursor.fetchall()
        df = pd.DataFrame(data, columns=columns)
        
        # Actualizar watermark
        if not df.empty:
            max_timestamp = df[timestamp_column].max()
            cursor.execute(f"""
                INSERT INTO {self.watermark_table} (table_name, last_extracted)
                VALUES (%s, %s)
                ON CONFLICT (table_name) DO UPDATE SET last_extracted = EXCLUDED.last_extracted
            """, (table, max_timestamp))
            self.conn.commit()
        
        return df
    
    def extract_full(self, table):
        """Extraer tabla completa (para datos de referencia pequeños)."""
        return pd.read_sql(f"SELECT * FROM {table}", self.conn)
# Ejemplo: Extraer desde API REST con paginación
import requests
import json

class APIExtractor:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.headers = {'Authorization': f'Bearer {api_key}'}
    
    def extract_paginated(self, endpoint, params=None):
        """Extraer todas las páginas de una API paginada."""
        all_data = []
        page = 1
        
        while True:
            response = requests.get(
                f"{self.base_url}/{endpoint}",
                headers=self.headers,
                params={**(params or {}), 'page': page, 'per_page': 100}
            )
            response.raise_for_status()
            
            data = response.json()
            if not data.get('results'):
                break
            
            all_data.extend(data['results'])
            
            if not data.get('has_more', False):
                break
            
            page += 1
        
        return pd.DataFrame(all_data)
    
    def extract_with_backoff(self, endpoint, max_retries=3):
        """Extraer con backoff exponencial para rate limiting."""
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    f"{self.base_url}/{endpoint}",
                    headers=self.headers
                )
                
                if response.status_code == 429:  # Rate limited
                    wait = 2 ** attempt
                    time.sleep(wait)
                    continue
                
                response.raise_for_status()
                return response.json()
            
            except requests.RequestException as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)

Estrategias de extracción:

EstrategiaCaso de UsoTrade-off
Extracción completaTablas pequeñas (<1M filas), datos de referenciaSimple, pero lento para tablas grandes
Incremental (timestamp)Tablas con columna updated_atRápido, pero pierde deletes duros
Incremental (ID)Tablas con ID autoincrementalCaptura inserts, pierde updates
CDC (Change Data Capture)Todo tipo de cambios, extracción en tiempo realRequiere Debezium o triggers de base de datos
Polling de APIDatos de SaaS externosRate limits, consistencia eventual

2. Transforma Datos

Limpia, valida y remodela datos extraídos:

# Ejemplo: Pipeline de transformación con validación
import pandas as pd
from typing import Dict, List, Callable

class DataTransformer:
    def __init__(self):
        self.validators: List[Callable] = []
        self.transformations: List[Callable] = []
    
    def add_validator(self, validator: Callable):
        self.validators.append(validator)
    
    def add_transformation(self, transform: Callable):
        self.transformations.append(transform)
    
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Ejecutar todas las validaciones luego transformaciones."""
        # Validación
        errors = []
        for validator in self.validators:
            result = validator(df)
            if result:
                errors.extend(result)
        
        if errors:
            raise ValidationError(f"Validación falló con {len(errors)} errores: {errors[:5]}")
        
        # Transformación
        for transform in self.transformations:
            df = transform(df)
        
        return df

# Definir validadores
def validate_no_null_ids(df):
    null_count = df['customer_id'].isnull().sum()
    if null_count > 0:
        return [f"{null_count} filas con customer_id nulo"]
    return []

def validate_email_format(df):
    invalid = df[~df['email'].str.contains(r'^[^@]+@[^@]+\.[^@]+$', na=False)]
    if len(invalid) > 0:
        return [f"{len(invalid)} filas con formato de email inválido"]
    return []

# Definir transformaciones
def normalize_emails(df):
    df['email'] = df['email'].str.lower().str.strip()
    return df

def calculate_order_totals(df):
    df['order_total'] = df['quantity'] * df['unit_price'] * (1 - df['discount'])
    return df

def add_derived_columns(df):
    df['order_year'] = pd.to_datetime(df['order_date']).dt.year
    df['order_month'] = pd.to_datetime(df['order_date']).dt.month
    df['customer_segment'] = pd.cut(
        df['lifetime_value'],
        bins=[0, 100, 500, 1000, float('inf')],
        labels=['Bronze', 'Silver', 'Gold', 'Platinum']
    )
    return df

# Construir pipeline
transformer = DataTransformer()
transformer.add_validator(validate_no_null_ids)
transformer.add_validator(validate_email_format)
transformer.add_transformation(normalize_emails)
transformer.add_transformation(calculate_order_totals)
transformer.add_transformation(add_derived_columns)

# Ejecutar pipeline
clean_data = transformer.transform(raw_data)

Patrones de transformación:

PatrónDescripciónEjemplo
LimpiezaRemover/corregir datos inválidosManejo de nulos, deduplicación
NormalizaciónEstandarizar formatosFechas, monedas, unidades
EnriquecimientoAgregar datos derivadosGeo-ubicación desde IP, segmento de cliente
AgregaciónResumir datos granularesVentas diarias desde líneas de pedido
JoiningCombinar múltiples fuentesPedidos + Clientes + Productos
Type castingConvertir tipos de datosString → Date, String → Numeric
FiltradoExcluir filas irrelevantesDatos de prueba, pedidos cancelados

3. Carga en Data Warehouse

Carga datos transformados eficientemente:

# Ejemplo: Cargar en PostgreSQL con upsert (merge)
import psycopg2
from psycopg2.extras import execute_values

class PostgresLoader:
    def __init__(self, connection_string):
        self.conn = psycopg2.connect(connection_string)
    
    def upsert(self, df, table, key_columns, batch_size=1000):
        """Insertar o actualizar registros usando ON CONFLICT."""
        cursor = self.conn.cursor()
        
        columns = list(df.columns)
        column_str = ', '.join(columns)
        
        # Construir cláusula update para columnas no-clave
        update_columns = [c for c in columns if c not in key_columns]
        update_clause = ', '.join([f"{c} = EXCLUDED.{c}" for c in update_columns])
        
        # Insert en batch con upsert
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i:i+batch_size]
            values = [tuple(row) for _, row in batch.iterrows()]
            
            query = f"""
                INSERT INTO {table} ({column_str})
                VALUES %s
                ON CONFLICT ({', '.join(key_columns)}) DO UPDATE SET {update_clause}
            """
            
            execute_values(cursor, query, values)
            self.conn.commit()
    
    def bulk_load(self, df, table, staging_table=None):
        """Carga rápida usando comando COPY vía tabla staging."""
        staging = staging_table or f"{table}_staging"
        cursor = self.conn.cursor()
        
        # Crear tabla staging como target
        cursor.execute(f"DROP TABLE IF EXISTS {staging}")
        cursor.execute(f"CREATE TABLE {staging} (LIKE {table} INCLUDING ALL)")
        
        # COPY datos a staging
        from io import StringIO
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False, sep='\t', na_rep='\\N')
        buffer.seek(0)
        
        cursor.copy_from(buffer, staging, columns=list(df.columns), sep='\t', null='\\N')
        
        # Merge staging a target
        cursor.execute(f"""
            INSERT INTO {table}
            SELECT * FROM {staging}
            ON CONFLICT DO NOTHING
        """)
        
        cursor.execute(f"DROP TABLE {staging}")
        self.conn.commit()
# Ejemplo: Cargar en Snowflake usando Snowpark
from snowflake.snowpark import Session

class SnowflakeLoader:
    def __init__(self, account, user, password, database, schema):
        self.session = Session.builder.configs({
            "account": account,
            "user": user,
            "password": password,
            "database": database,
            "schema": schema
        }).create()
    
    def load_dataframe(self, df, table):
        """Cargar DataFrame pandas a tabla Snowflake."""
        snowpark_df = self.session.create_dataframe(df)
        snowpark_df.write.mode("overwrite").save_as_table(table)
    
    def merge_dataframe(self, df, table, key_columns):
        """Merge (upsert) DataFrame en tabla existente."""
        temp_table = f"{table}_temp"
        
        # Crear tabla temporal desde DataFrame
        snowpark_df = self.session.create_dataframe(df)
        snowpark_df.write.mode("overwrite").save_as_table(temp_table)
        
        # Ejecutar MERGE
        key_match = " AND ".join([f"t.{k} = s.{k}" for k in key_columns])
        update_set = ", ".join([f"t.{c} = s.{c}" for c in df.columns if c not in key_columns])
        insert_cols = ", ".join(df.columns)
        insert_vals = ", ".join([f"s.{c}" for c in df.columns])
        
        merge_sql = f"""
            MERGE INTO {table} t
            USING {temp_table} s
            ON ({key_match})
            WHEN MATCHED THEN UPDATE SET {update_set}
            WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
        """
        
        self.session.sql(merge_sql).collect()
        self.session.sql(f"DROP TABLE IF EXISTS {temp_table}").collect()

Estrategias de carga:

EstrategiaMejor ParaTrade-off
Full refresh (TRUNCATE + INSERT)Tablas pequeñas, data martsSimple, pero downtime para tablas grandes
Upsert/MergeCargas incrementales, tablas de dimensiónPreserva historia, complejo
Staging + swapTablas de hechos grandesCero downtime, requiere 2× espacio temporalmente
Reemplazo de particiónTablas particionadas (fecha)Rápido, pero requiere alineación de partición
Insert streamMicro-batches casi en tiempo realMayor complejidad, menor latencia

4. Orquesta con Apache Airflow

Programa y monitorea flujos de trabajo ETL:

# Ejemplo: DAG de Airflow para ETL diario
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email': ['data-alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_sales_etl',
    default_args=default_args,
    description='ETL diario de datos de ventas desde producción a warehouse',
    schedule_interval='0 2 * * *',  # Correr a las 2 AM diario
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'etl'],
) as dag:
    
    def extract_orders(**context):
        pg_hook = PostgresHook(postgres_conn_id='production_db')
        sql = """
            SELECT * FROM orders 
            WHERE created_at >= %s AND created_at < %s
        """
        execution_date = context['ds']
        next_date = (datetime.strptime(execution_date, '%Y-%m-%d') + 
                     timedelta(days=1)).strftime('%Y-%m-%d')
        
        df = pg_hook.get_pandas_df(sql, parameters=(execution_date, next_date))
        
        # Guardar en staging S3
        s3_hook = S3Hook(aws_conn_id='aws_default')
        s3_hook.load_string(
            df.to_csv(index=False),
            key=f"staging/orders/{execution_date}.csv",
            bucket_name='data-lake',
            replace=True
        )
        
        return f"Extraídos {len(df)} pedidos"
    
    def transform_orders(**context):
        s3_hook = S3Hook(aws_conn_id='aws_default')
        execution_date = context['ds']
        
        # Leer desde staging
        csv_data = s3_hook.read_key(
            key=f"staging/orders/{execution_date}.csv",
            bucket_name='data-lake'
        )
        df = pd.read_csv(pd.io.common.StringIO(csv_data))
        
        # Transformar
        transformer = DataTransformer()
        transformer.add_transformation(calculate_order_totals)
        transformer.add_transformation(add_derived_columns)
        clean_df = transformer.transform(df)
        
        # Escribir a procesados
        s3_hook.load_string(
            clean_df.to_csv(index=False),
            key=f"processed/orders/{execution_date}.csv",
            bucket_name='data-lake',
            replace=True
        )
        
        return f"Transformados {len(clean_df)} pedidos"
    
    def load_to_warehouse(**context):
        execution_date = context['ds']
        
        # Leer datos procesados
        s3_hook = S3Hook(aws_conn_id='aws_default')
        csv_data = s3_hook.read_key(
            key=f"processed/orders/{execution_date}.csv",
            bucket_name='data-lake'
        )
        df = pd.read_csv(pd.io.common.StringIO(csv_data))
        
        # Cargar en Snowflake
        loader = SnowflakeLoader(...)
        loader.merge_dataframe(df, 'fact_orders', ['order_id'])
        
        return f"Cargados {len(df)} pedidos al warehouse"
    
    extract_task = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,
    )
    
    transform_task = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders,
    )
    
    load_task = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse,
    )
    
    extract_task >> transform_task >> load_task

Slowly Changing Dimensions (SCD)

Rastrea cómo cambian los datos de dimensión con el tiempo:

-- SCD Tipo 2: Mantener historia con fechas efectivas
CREATE TABLE dim_customers (
    customer_sk BIGINT PRIMARY KEY,        -- Surrogate key
    customer_id BIGINT NOT NULL,            -- Natural key
    name VARCHAR(255),
    email VARCHAR(255),
    segment VARCHAR(50),
    effective_date DATE NOT NULL,
    expiration_date DATE,
    is_current BOOLEAN DEFAULT TRUE,
    UNIQUE(customer_id, effective_date)
);

-- Insertar nueva versión cuando cambia cliente
INSERT INTO dim_customers (customer_id, name, email, segment, effective_date)
SELECT 
    s.customer_id,
    s.name,
    s.email,
    s.segment,
    CURRENT_DATE
FROM staging_customers s
LEFT JOIN dim_customers d ON s.customer_id = d.customer_id AND d.is_current = TRUE
WHERE d.customer_sk IS NULL  -- Cliente nuevo
   OR (d.name <> s.name OR d.email <> s.email OR d.segment <> s.segment);  -- Cambiado

-- Expirar versión vieja
UPDATE dim_customers d
SET expiration_date = CURRENT_DATE - 1,
    is_current = FALSE
FROM staging_customers s
WHERE d.customer_id = s.customer_id 
  AND d.is_current = TRUE
  AND (d.name <> s.name OR d.email <> s.email OR d.segment <> s.segment);

Mejores Prácticas

  • Usa tablas de staging. Nunca transformes datos directamente en tablas de producción. Stage, valida, luego carga.
  • Haz pipelines idempotentes. Correr el mismo DAG dos veces debería producir el mismo resultado.
  • Valida temprano, valida frecuentemente. Atrapa problemas de calidad de datos en staging, no en el warehouse.
  • Particiona tablas grandes. Carga datos por partición para habilitar reemplazo rápido y pruning.
  • Monitorea frescura de datos. Alerta cuando tablas no han sido actualizadas dentro del SLA.
  • Documenta lineage. Rastrea qué tablas fuente alimentan qué tablas del warehouse.
  • Prueba transformaciones. Prueba lógica de negocio como código de aplicación.

Errores Comunes

  • Sin validación de datos. Datos malos corrompen silenciosamente reportes y dashboards.
  • Transformar en producción. Correr UPDATE directamente en el warehouse es riesgoso y difícil de revertir.
  • Sin carga incremental. Full refreshes de tablas grandes toman horas y desperdician recursos.
  • Sin monitoreo de SLA. Los stakeholders no saben que el pipeline falló hasta que ven dashboards obsoletos.
  • Credenciales hard-coded. Usa manejadores de conexión (Airflow, AWS Secrets Manager) en su lugar.
  • Sin lógica de reintento. Fallos transitorios de red no deberían hacer fallar todo el pipeline.

Variantes

  • ELT (Extract, Load, Transform): Cargar datos crudos al warehouse primero, luego transformar con SQL (dbt, Snowflake) — más simple para equipos nativos de SQL
  • Reverse ETL: Empujar datos del warehouse de vuelta a sistemas operacionales (CRM, herramientas de marketing)
  • Zero-ETL: Consulta federada directa sin mover datos (BigQuery Federated Queries, Snowflake External Tables)
  • Change Data Capture (CDC): Extracción en tiempo real usando logs de base de datos en lugar de polling por batch

FAQ

P: ¿Debería usar ETL o ELT? Usa ETL cuando las transformaciones son complejas (Python, APIs externos) o cuando necesitas limpiar datos antes de que lleguen al warehouse. Usa ELT cuando las transformaciones son basadas en SQL y tu warehouse es lo suficientemente potente para manejarlas (Snowflake, BigQuery, Redshift).

P: ¿Cómo manejo datos que llegan tarde? Implementa un proceso de “late arriving data” que reprocesa particiones pasadas cuando los datos llegan después de la carga inicial. O usa ingestión streaming que maneja eventos fuera de orden.

P: ¿Cómo hago backfill de datos históricos? Corre tu pipeline en un loop sobre rangos de fechas históricas, o usa un DAG parametrizado que acepta un rango de fechas y lo procesa en chunks.

P: ¿Cuál es la diferencia entre un data lake y un data warehouse? Un data lake almacena datos crudos y no procesados en archivos (S3, GCS) con schema-on-read. Un data warehouse almacena datos estructurados y procesados en tablas con schema-on-write. ETL típicamente mueve datos de fuentes → lake → warehouse.

Conclusión

Los pipelines ETL son la columna vertebral de la inteligencia de negocios y el análisis. Al extraer datos de forma confiable, transformarlos con validación y cargarlos eficientemente, creas una base de datos confiable para reportes, dashboards y machine learning.

Recursos Relacionados