Aller au contenu principal

Apache Kafka

Définition

Apache Kafka est une plateforme de streaming d'événements distribuée développée à l'origine chez LinkedIn et publiée en open source en 2011. Elle est conçue pour gérer des flux d'événements à haut débit, faible latence et durables — messages de log, événements d'activité utilisateur, lectures de capteurs, transactions — sur un cluster de matériel standard. Kafka agit comme un log persistant et reproductible : les producteurs écrivent des événements dans des topics nommés, et les consommateurs lisent ces topics à leur propre rythme, indépendamment les uns des autres.

Dans le contexte ML, Kafka occupe deux rôles critiques. Premièrement, il sert de colonne vertébrale de données pour les pipelines de features en temps réel. Deuxièmement, il est utilisé pour les pipelines de serving de modèles : les demandes de prédiction arrivent comme des messages Kafka, un consommateur applique le modèle et produit des événements de prédiction vers un topic de résultats.

Fonctionnement

Topics et partitions

Un topic est un log nommé, ordonné et immuable d'événements. Les topics sont divisés en partitions — l'unité de parallélisme dans Kafka.

Producteurs

Un producteur est un client qui publie des enregistrements dans un ou plusieurs topics. Lorsqu'une clé est présente, Kafka utilise le hachage cohérent pour router tous les enregistrements avec la même clé vers la même partition.

Consommateurs et groupes de consommateurs

Un consommateur lit les enregistrements d'une ou plusieurs partitions. Les consommateurs s'organisent en groupes de consommateurs : chaque enregistrement est livré exactement à un consommateur au sein d'un groupe.

Quand utiliser / Quand NE PAS utiliser

Utiliser quandÉviter quand
Vous avez besoin de streaming d'événements haut débit et durableVotre cas d'usage est une file d'attente simple avec un taux de messages modeste
Plusieurs groupes de consommateurs indépendants doivent lire le même fluxVous avez besoin d'une logique de routage complexe ou de files de lettres mortes
Vous devez rejouer des événements historiques pour remplir des feature storesLa complexité opérationnelle d'un cluster Kafka n'est pas justifiée
Le calcul de features en temps réel est requisLes messages sont volumineux (> quelques Mo)
L'ordre des événements par entité doit être préservéVotre équipe a besoin d'un message broker entièrement géré

Comparaisons

CritèreApache KafkaRabbitMQ
DébitExtrêmement élevéModéré
LatenceFaible (ms à un chiffre)Très faible — sous-milliseconde
Persistance des messagesMessages conservés pour une période configurable ; entièrement reproductiblesMessages supprimés après acquittement par défaut
Modèle de consommateurBasé sur pullBasé sur push
ComplexitéHauteModérée
Meilleur cas d'usage MLPipelines de features temps réel, event sourcingFiles d'attente pour jobs ML asynchrones

Avantages et inconvénients

AvantagesInconvénients
Débit extrêmement élevé et scalabilité horizontaleComplexité opérationnelle significative
Log durable et reproductibleNon adapté aux charges de travail à faible volume
Découple producteurs et consommateursL'évolution du schéma nécessite un schema registry
Plusieurs groupes de consommateurs peuvent lire le même topic indépendammentRégler les comptes de partitions nécessite de l'expertise
Fort écosystème : Kafka Connect, Kafka Streams, ksqlDBOverhead opérationnel historiquement plus élevé

Exemple de code

import json, time, random, threading
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic

BROKER = "localhost:9092"
EVENTS_TOPIC = "user-events"
PREDICTIONS_TOPIC = "predictions"

def ensure_topics() -> None:
admin = KafkaAdminClient(bootstrap_servers=BROKER)
existing = admin.list_topics()
topics_to_create = [
NewTopic(name=t, num_partitions=3, replication_factor=1)
for t in [EVENTS_TOPIC, PREDICTIONS_TOPIC] if t not in existing
]
if topics_to_create:
admin.create_topics(new_topics=topics_to_create, validate_only=False)
admin.close()

def run_producer(num_events: int = 20, delay: float = 0.5) -> None:
producer = KafkaProducer(
bootstrap_servers=BROKER,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8"),
acks="all", retries=3,
)
for i in range(num_events):
user_id = f"user_{random.randint(1, 5)}"
event = {
"event_id": i, "user_id": user_id,
"page_id": random.choice(["home", "product", "checkout", "search"]),
"dwell_time_sec": round(random.uniform(1.0, 120.0), 2),
"timestamp": datetime.utcnow().isoformat(),
}
producer.send(topic=EVENTS_TOPIC, key=user_id, value=event)
time.sleep(delay)
producer.flush()
producer.close()

def score_event(event: dict) -> float:
base = event["dwell_time_sec"] / 120.0
multiplier = {"checkout": 1.5, "product": 1.2, "search": 0.9, "home": 0.7}.get(event["page_id"], 1.0)
return min(round(base * multiplier, 4), 1.0)

def run_consumer(max_messages: int = 20) -> None:
consumer = KafkaConsumer(
EVENTS_TOPIC, bootstrap_servers=BROKER,
group_id="ml-feature-pipeline",
value_deserializer=lambda b: json.loads(b.decode("utf-8")),
auto_offset_reset="earliest", enable_auto_commit=True, consumer_timeout_ms=5000,
)
prediction_producer = KafkaProducer(
bootstrap_servers=BROKER,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8"),
)
count = 0
for message in consumer:
if count >= max_messages:
break
event = message.value
score = score_event(event)
prediction = {
"event_id": event["event_id"], "user_id": event["user_id"],
"purchase_probability": score, "scored_at": datetime.utcnow().isoformat(),
}
prediction_producer.send(topic=PREDICTIONS_TOPIC, key=event["user_id"], value=prediction)
count += 1
consumer.close()
prediction_producer.flush()
prediction_producer.close()

if __name__ == "__main__":
ensure_topics()
consumer_thread = threading.Thread(target=run_consumer, args=(20,))
consumer_thread.start()
time.sleep(1.0)
run_producer(num_events=20, delay=0.3)
consumer_thread.join()

Ressources pratiques

Voir aussi