跳到主要内容

Apache Spark

定义

Apache Spark 是一个专为大规模数据处理而设计的开源分布式计算引擎。它于 2009 年在 UC Berkeley 的 AMPLab 创建,并捐赠给 Apache 软件基金会,在那里它成为大数据分析的主导框架。Spark 在机器集群上以内存方式执行计算,对于迭代工作负载,其性能远超基于磁盘的前身(如 Hadoop MapReduce)——这一特性使其特别适合机器学习。

Spark 为四种主要工作负载提供统一的 API:批处理、SQL 分析、流处理(Structured Streaming)和机器学习(MLlib)。

工作原理

RDD 和 DataFrame

弹性分布式数据集(RDD)是 Spark 的低级抽象。DataFrame(以及 Scala/Java 中的类型化对应物 Dataset)在 RDD 之上添加了命名模式并公开了类似 SQL 的 API。Catalyst 优化器可以对 DataFrame 查询应用谓词下推、列修剪和 join 重排序。

Spark SQL

Spark SQL 允许使用标准 SQL 语法查询 DataFrame,在同一程序中混合 SQL 和 DataFrame API 调用。

MLlib

MLlib 是 Spark 的分布式机器学习库。Pipeline API(pyspark.ml)反映了 scikit-learn 的设计:Transformer 步骤(缩放器、编码器)和 Estimator 步骤(模型拟合)被链接成可以拟合和序列化的 Pipeline 对象。

Driver 和 Executor 架构

Spark 应用程序有一个 driver 进程和一个或多个 executor 进程。Driver 运行用户程序,构建逻辑计划,并将工作划分为任务

何时使用 / 何时不使用

使用时机避免时机
数据集无法放入单台机器的 RAM(> ~50 GB)数据可以舒适地放在内存中——pandas 或 Polars 会更快
特征工程需要在数十亿行上进行 join 或聚合您的团队缺乏分布式系统和集群管理经验
您需要分布式 ML 训练(MLlib)或超参数搜索Job 启动开销对延迟敏感型任务不可接受
您已经有 Spark 集群(Databricks、EMR、Dataproc)您需要实时亚秒级处理

对比

标准Apache SparkPandas
数据规模拍字节——跨集群分区吉字节——受单机 RAM 限制
执行模型分布式、并行、惰性评估进程内、即时、默认单线程
配置复杂性
小数据性能由于序列化和任务调度开销而较慢快速——最小开销

优缺点

优点缺点
无需更改代码即可扩展到拍字节显著的基础设施和运营复杂性
批处理、SQL 和流式处理的统一引擎JVM 启动和任务调度增加延迟——不适合小型作业
内存处理比基于磁盘的 MapReduce 快得多内存管理(溢出、GC 压力)需要仔细调优
丰富的生态系统:Delta Lake、Iceberg、Hudi、MLflowPySpark 通过 Py4J 序列化 Python UDF——自定义逻辑可能较慢

代码示例

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = (
SparkSession.builder
.appName("MLPipelineExample")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate()
)

raw_df = spark.createDataFrame(
[(1, 25.0, "engineer", 55000.0, 0), (2, 32.0, "manager", 85000.0, 1),
(3, 28.0, "engineer", 62000.0, 0), (4, 45.0, "manager", 110000.0, 1),
(5, 38.0, "analyst", 72000.0, 1), (6, 22.0, "analyst", 48000.0, 0)],
schema=["id", "age", "role", "salary", "high_earner"],
)

feature_df = raw_df.withColumn("log_salary", F.log1p(F.col("salary"))).drop("salary")

role_indexer = StringIndexer(inputCol="role", outputCol="role_idx")
assembler = VectorAssembler(inputCols=["age", "log_salary", "role_idx"], outputCol="raw_features")
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)
lr = LogisticRegression(featuresCol="features", labelCol="high_earner", maxIter=100, regParam=0.01)

pipeline = Pipeline(stages=[role_indexer, assembler, scaler, lr])
train_df, test_df = raw_df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_df)

predictions = model.transform(test_df)
evaluator = BinaryClassificationEvaluator(labelCol="high_earner")
auc = evaluator.evaluate(predictions)
print(f"=== AUC on test set: {auc:.4f} ===")

model.write().overwrite().save("/tmp/spark_lr_model")
spark.stop()

实用资源

另请参阅