Pular para o conteúdo principal

Apache Kafka

Definição

Apache Kafka é uma plataforma de streaming de eventos distribuída desenvolvida originalmente no LinkedIn e lançada como código aberto em 2011. É projetada para lidar com streams de eventos de alto throughput, baixa latência e duráveis — mensagens de log, eventos de atividade de usuários, leituras de sensores, transações — em um cluster de hardware commodity. Kafka atua como um log persistente e reproduzível: produtores escrevem eventos em tópicos nomeados, e consumidores leem desses tópicos no seu próprio ritmo, independentemente uns dos outros.

No contexto de ML, Kafka ocupa dois papéis críticos. Primeiro, serve como a espinha dorsal de dados para pipelines de features em tempo real. Segundo, é usado para pipelines de serving de modelos: as requisições de previsão chegam como mensagens Kafka, um consumidor aplica o modelo e produz eventos de previsão para um tópico de resultados.

Como funciona

Tópicos e partições

Um tópico é um log nomeado, ordenado e imutável de eventos. Os tópicos são divididos em partições — a unidade de paralelismo no Kafka.

Produtores

Um produtor é um cliente que publica registros em um ou mais tópicos. Quando uma chave está presente, o Kafka usa hashing consistente para rotear todos os registros com a mesma chave para a mesma partição.

Consumidores e grupos de consumidores

Um consumidor lê registros de uma ou mais partições. Os consumidores se organizam em grupos de consumidores: cada registro é entregue a exatamente um consumidor dentro de um grupo.

Quando usar / Quando NÃO usar

Usar quandoEvitar quando
Você precisa de streaming de eventos de alto throughput e durávelSeu caso de uso é enfileiramento de tarefas simples com taxa modesta de mensagens
Vários grupos de consumidores independentes precisam ler o mesmo stream de eventosVocê precisa de lógica de roteamento complexa ou filas de mensagens mortas
Você precisa reproduzir eventos históricos para preencher feature storesA complexidade operacional de um cluster Kafka não é justificada pela carga de trabalho
Computação de features em tempo real é necessária para serving de ML onlineAs mensagens são grandes (> alguns MB)
A ordenação de eventos por entidade deve ser preservadaSua equipe precisa de um message broker totalmente gerenciado

Comparações

CritérioApache KafkaRabbitMQ
ThroughputExtremamente alto — milhões de mensagens/seg.Moderado — centenas de milhares/seg.
LatênciaBaixa (ms de um dígito)Muito baixa — sub-milissegundo
Persistência de mensagensMensagens retidas por período configurável; totalmente reproduzíveisMensagens excluídas após confirmação por padrão
Modelo de consumidorBaseado em pullBaseado em push
ComplexidadeAltaModerada
Melhor caso de uso MLPipelines de features em tempo real, event sourcingFilas de tarefas para jobs ML assíncronos

Prós e contras

PrósContras
Throughput extremamente alto e escalabilidade horizontalComplexidade operacional significativa
Log durável e reproduzível permite backfills e auditabilidadeNão adequado para cargas de trabalho com mensagens muito pequenas ou infrequentes
Desacopla produtores e consumidoresA evolução do esquema requer um schema registry
Vários grupos de consumidores podem ler o mesmo tópico independentementeAjustar contagens de partições requer experiência
Forte ecossistema: Kafka Connect, Kafka Streams, ksqlDBOverhead operacional historicamente maior que alternativas hospedadas

Exemplo de código

import json, time, random, threading
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic

BROKER = "localhost:9092"
EVENTS_TOPIC = "user-events"
PREDICTIONS_TOPIC = "predictions"

def ensure_topics() -> None:
admin = KafkaAdminClient(bootstrap_servers=BROKER)
existing = admin.list_topics()
topics_to_create = [
NewTopic(name=t, num_partitions=3, replication_factor=1)
for t in [EVENTS_TOPIC, PREDICTIONS_TOPIC] if t not in existing
]
if topics_to_create:
admin.create_topics(new_topics=topics_to_create, validate_only=False)
admin.close()

def run_producer(num_events: int = 20, delay: float = 0.5) -> None:
producer = KafkaProducer(
bootstrap_servers=BROKER,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8"),
acks="all", retries=3,
)
for i in range(num_events):
user_id = f"user_{random.randint(1, 5)}"
event = {
"event_id": i, "user_id": user_id,
"page_id": random.choice(["home", "product", "checkout", "search"]),
"dwell_time_sec": round(random.uniform(1.0, 120.0), 2),
"timestamp": datetime.utcnow().isoformat(),
}
producer.send(topic=EVENTS_TOPIC, key=user_id, value=event)
time.sleep(delay)
producer.flush()
producer.close()

def score_event(event: dict) -> float:
base = event["dwell_time_sec"] / 120.0
multiplier = {"checkout": 1.5, "product": 1.2, "search": 0.9, "home": 0.7}.get(event["page_id"], 1.0)
return min(round(base * multiplier, 4), 1.0)

def run_consumer(max_messages: int = 20) -> None:
consumer = KafkaConsumer(
EVENTS_TOPIC, bootstrap_servers=BROKER,
group_id="ml-feature-pipeline",
value_deserializer=lambda b: json.loads(b.decode("utf-8")),
auto_offset_reset="earliest", enable_auto_commit=True, consumer_timeout_ms=5000,
)
prediction_producer = KafkaProducer(
bootstrap_servers=BROKER,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8"),
)
count = 0
for message in consumer:
if count >= max_messages:
break
event = message.value
score = score_event(event)
prediction = {
"event_id": event["event_id"], "user_id": event["user_id"],
"purchase_probability": score, "scored_at": datetime.utcnow().isoformat(),
}
prediction_producer.send(topic=PREDICTIONS_TOPIC, key=event["user_id"], value=prediction)
count += 1
consumer.close()
prediction_producer.flush()
prediction_producer.close()

if __name__ == "__main__":
ensure_topics()
consumer_thread = threading.Thread(target=run_consumer, args=(20,))
consumer_thread.start()
time.sleep(1.0)
run_producer(num_events=20, delay=0.3)
consumer_thread.join()

Recursos práticos

Veja também