Zum Hauptinhalt springen

Apache Spark

Definition

Apache Spark ist eine Open-Source-Engine für verteiltes Computing, die für die groß angelegte Datenverarbeitung entwickelt wurde. Sie wurde 2009 am AMPLab der UC Berkeley erstellt und an die Apache Software Foundation gespendet, wo sie zum dominierenden Framework für Big-Data-Analysen wurde. Spark führt Berechnungen im Speicher über ein Cluster von Maschinen aus, was disk-basierte Vorgänger wie Hadoop MapReduce für iterative Workloads deutlich übertrifft — eine Eigenschaft, die es besonders gut für maschinelles Lernen geeignet macht.

Spark bietet eine einheitliche API über vier primäre Workloads: Batch-Verarbeitung, SQL-Analysen, Streaming (Structured Streaming) und maschinelles Lernen (MLlib). Diese Vereinheitlichung bedeutet, dass Teams ein einzelnes Cluster und ein einzelnes Programmiermodell für die gesamte ML-Datenpipeline verwenden können — von der Rohdaten-Ingestion und Feature-Engineering im Petabyte-Maßstab bis hin zum verteilten Modelltraining mit MLlib.

Funktionsweise

RDDs und DataFrames

Resilient Distributed Datasets (RDDs) sind Sparks Low-Level-Abstraktion: unveränderliche, fehlertolerante, partitionierte Datensatzsammlungen, die über ein Cluster verteilt sind. DataFrames fügen ein benanntes Schema auf RDDs hinzu und exponieren eine SQL-ähnliche API. Der Catalyst-Optimizer kann Predicate-Pushdown, Column-Pruning und Join-Reordering anwenden.

Spark SQL

Spark SQL ermöglicht es, DataFrames mit standardmäßiger SQL-Syntax abzufragen und SQL- und DataFrame-API-Aufrufe im selben Programm zu mischen.

MLlib

MLlib ist Sparks verteilte Machine-Learning-Bibliothek. Die Pipeline-API (pyspark.ml) spiegelt das Design von scikit-learn wider: Transformer-Schritte und Estimator-Schritte werden in ein Pipeline-Objekt gekettet.

Treiber- und Executor-Architektur

Eine Spark-Anwendung hat einen Treiber-Prozess und einen oder mehrere Executor-Prozesse. Der Treiber führt das Programm des Benutzers aus, konstruiert den logischen Plan und teilt die Arbeit in Tasks auf.

Wann verwenden / Wann NICHT verwenden

Verwenden wennVermeiden wenn
Datensatz passt nicht in den RAM einer einzelnen Maschine (> ~50 GB)Daten passen komfortabel in den Speicher — pandas oder Polars sind schneller
Feature Engineering erfordert Joins oder Aggregationen über Milliarden von ZeilenIhrem Team fehlt Erfahrung mit verteilten Systemen
Sie verteiltes ML-Training (MLlib) oder Hyperparameter-Suche benötigenJob-Startup-Overhead für latenzsensitive Aufgaben unakzeptabel ist
Sie bereits ein Spark-Cluster haben (Databricks, EMR, Dataproc)Sie Echtzeit-Sub-Sekunden-Verarbeitung benötigen

Vergleiche

KriteriumApache SparkPandas
DatenskalierungPetabytes — partitioniert über ein ClusterGigabytes — begrenzt durch RAM einer Maschine
AusführungsmodellVerteilt, parallel, verzögerte AuswertungIn-Process, eager, standardmäßig single-threaded
Setup-KomplexitätHochNiedrig
Leistung bei kleinen DatenLangsam aufgrund Serialisierungs-OverheadSchnell
API-VertrautheitPySpark ähnelt pandas, aber mit verteilter SemantikWeit bekannt

Vor- und Nachteile

VorteileNachteile
Skaliert auf Petabytes ohne Code-ÄnderungenErhebliche Infrastruktur- und operative Komplexität
Einheitliche Engine für Batch, SQL und StreamingJVM-Startup und Task-Scheduling fügen Latenz hinzu
In-Memory-Verarbeitung deutlich schneller als disk-basiertes MapReduceSpeicherverwaltung erfordert sorgfältige Abstimmung
Reiches Ökosystem: Delta Lake, Iceberg, Hudi, MLflowPySpark serialisiert Python-UDFs über Py4J

Codebeispiel

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = (
SparkSession.builder
.appName("MLPipelineExample")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate()
)

raw_df = spark.createDataFrame(
[(1, 25.0, "engineer", 55000.0, 0), (2, 32.0, "manager", 85000.0, 1),
(3, 28.0, "engineer", 62000.0, 0), (4, 45.0, "manager", 110000.0, 1),
(5, 38.0, "analyst", 72000.0, 1), (6, 22.0, "analyst", 48000.0, 0)],
schema=["id", "age", "role", "salary", "high_earner"],
)

feature_df = raw_df.withColumn("log_salary", F.log1p(F.col("salary"))).drop("salary")

role_indexer = StringIndexer(inputCol="role", outputCol="role_idx")
assembler = VectorAssembler(inputCols=["age", "log_salary", "role_idx"], outputCol="raw_features")
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)
lr = LogisticRegression(featuresCol="features", labelCol="high_earner", maxIter=100, regParam=0.01)

pipeline = Pipeline(stages=[role_indexer, assembler, scaler, lr])
train_df, test_df = raw_df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_df)

predictions = model.transform(test_df)
evaluator = BinaryClassificationEvaluator(labelCol="high_earner")
auc = evaluator.evaluate(predictions)
print(f"=== AUC on test set: {auc:.4f} ===")

model.write().overwrite().save("/tmp/spark_lr_model")
spark.stop()

Praktische Ressourcen

Siehe auch