Pular para o conteúdo principal

Pipelines de dados

Definição

Uma pipeline de dados é uma sequência automatizada de etapas que move dados brutos de uma ou mais fontes para um destino onde podem ser consumidos por analistas, dashboards ou modelos de machine learning. No contexto de ML, as pipelines não se tratam apenas de mover dados: elas garantem que os dados cheguem na forma certa, no momento certo e com qualidade verificável para que os modelos treinem e sirvam de forma previsível. Sem pipelines confiáveis, cada artefato subsequente — features, modelos treinados, previsões — é suspeito.

As pipelines de dados estão na base de todo sistema MLOps. Elas abrangem a ingestão de fontes heterogêneas (bancos de dados, APIs, streams de eventos, arquivos), a transformação para produzir conjuntos de dados limpos e estruturados ou vetores de features, o armazenamento em data warehouses ou feature stores, e o serving para jobs de treinamento ou endpoints de inferência online. As escolhas de design na camada de pipeline — batch vs. streaming, push vs. pull, schema-on-read vs. schema-on-write — se propagam até a latência, frescor e confiabilidade do modelo.

A qualidade dos dados é o contrato oculto entre engenheiros de dados e equipes de modelos. Desvio de esquema, explosões de nulos, mudança de distribuição e registros duplicados estão entre as causas mais comuns de degradação silenciosa do modelo. As pipelines modernas incorporam pontos de verificação de validação (usando ferramentas como Great Expectations ou testes dbt) para detectar esses problemas antes que dados ruins cheguem ao treinamento ou serving.

Como funciona

Batch vs. Streaming

Pipelines batch processam dados em pedaços delimitados em um cronograma — de hora em hora, diariamente ou acionados por chegada de arquivos. São mais simples de construir e raciocinar e são o padrão certo quando o consumidor downstream (um job de treinamento noturno, um dashboard BI) não requer frescor abaixo de um minuto. Pipelines de streaming processam registros à medida que chegam, permitindo features quase em tempo real para modelos online. A compensação é a complexidade operacional: você precisa lidar com chegadas tardias, eventos fora de ordem e semântica exactly-once.

ETL vs. ELT

Extract-Transform-Load (ETL) aplica transformações antes que os dados pousem no armazenamento de destino. Extract-Load-Transform (ELT) carrega dados brutos primeiro e depois os transforma dentro de um warehouse ou lakehouse poderoso (por exemplo, BigQuery, Snowflake, Databricks). ELT preserva o histórico bruto e permite exploração ad hoc sem re-ingestão — uma grande vantagem em cargas de trabalho ML onde a engenharia de features evolui constantemente.

Qualidade de dados e validação de esquema

Verificações de qualidade de dados devem ser incorporadas em cada estágio da pipeline, não acrescentadas no final. Na ingestão, as verificações confirmam que os dados de origem estão conformes com o esquema esperado. Na transformação, verificações em nível de linha afirmam regras de negócio. Na camada de serving, verificações estatísticas detectam desvio de distribuição — o assassino silencioso de modelos implantados.

Quando usar / Quando NÃO usar

Usar quandoEvitar quando
Várias fontes de dados precisam ser consolidadas para treinamento MLOs dados já existem em uma única tabela limpa pronta para uso direto
Os dados precisam ser atualizados em um cronograma ou em tempo realSua análise é uma exploração pontual que não será repetida
Os modelos downstream requerem garantias de qualidadeO overhead de uma pipeline completa excede o valor para um protótipo rápido
As transformações precisam ser versionadas, testadas e reproduzíveisO volume de dados é trivial e um script simples em um notebook é suficiente
Vários consumidores compartilham os mesmos dados processadosO sistema de origem já fornece uma API limpa e com contrato

Comparações

CritérioPipeline batchPipeline de streaming
Frescor dos dadosMinutos a horas (baseado em cronograma)Sub-segundo a segundos
ComplexidadeBaixa — conjuntos de dados delimitados, retentativas simplesAlta — dados tardios, janelamento, estado
CustoPrevisível, computação em rajadasComputação contínua, baseline frequentemente maior
Tolerância a falhasReexecutar o batch com falhaSemântica exactly-once ou at-least-once necessária
Caso de uso ML típicoTreinamento offline, atualização noturna de featuresFeature store online, pontuação em tempo real

Prós e contras

PrósContras
Centraliza e padroniza o acesso a dados entre equipesInvestimento inicial não trivial para construir e manter
Permite transformações de dados reproduzíveis e testadasFalhas de pipeline se propagam para todos os consumidores downstream
Incorpora verificações de qualidade antes que dados ruins cheguem a modelosDepurar pipelines distribuídas é complexo
Suporta versionamento e rastreamento de linhagemStreaming adiciona sobrecarga operacional significativa
Desacopla produtores de consumidoresRequer disciplina de governança e propriedade de dados

Exemplos de código

import pandas as pd
import pandera as pa
from pandera import Column, DataFrameSchema, Check
from pathlib import Path

raw_schema = DataFrameSchema({
"user_id": Column(int, nullable=False),
"event_ts": Column(str, nullable=False),
"amount": Column(float, Check(lambda x: x >= 0), nullable=False),
"category": Column(str, nullable=True),
})

def extract(source_path: str) -> pd.DataFrame:
return pd.read_csv(source_path)

def validate(df: pd.DataFrame, schema: DataFrameSchema) -> pd.DataFrame:
return schema.validate(df)

def transform(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df["event_date"] = pd.to_datetime(df["event_ts"])
df.drop(columns=["event_ts"], inplace=True)
df["category"] = df["category"].fillna("unknown")
import numpy as np
df["log_amount"] = np.log1p(df["amount"])
df.drop_duplicates(subset=["user_id", "event_date"], inplace=True)
return df

def load(df: pd.DataFrame, dest_path: str) -> None:
Path(dest_path).parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(dest_path, index=False)

def run_pipeline(source: str, destination: str) -> None:
raw = extract(source)
validated = validate(raw, raw_schema)
clean = transform(validated)
load(clean, destination)
print("[pipeline] completed successfully")

if __name__ == "__main__":
run_pipeline("data/raw/events.csv", "data/processed/events.parquet")

Recursos práticos

Veja também