MLlib Basics
TL;DR: Pipeline + CrossValidator = 95% of all MLlib jobs
2025 LAW
Never write MLlib without Pipeline + CrossValidator
Everything else is amateur hour.
The Golden Pipeline Pattern
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assumes cat_cols (categorical column names) and num_cols (numeric column names) are defined

# 1. Feature engineering
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in cat_cols]
encoders = [OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_vec") for c in cat_cols]
assembler = VectorAssembler(inputCols=num_cols + [c + "_vec" for c in cat_cols], outputCol="features")

# 2. Model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 3. Pipeline: fit() runs every stage in order, on the training data only
pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

# 4. Hyperparameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 1.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=5,
                    parallelism=4)

cvModel = cv.fit(train_df)
best_model = cvModel.bestModel
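CrossValidator trains numFolds × (grid size) models before the final refit, so the grid above is not free. A quick pure-Python sanity check of that cost (plain itertools, no Spark needed):

```python
from itertools import product

# Mirrors the grid built above: 3 regParam values x 3 elasticNetParam values
reg_params = [0.01, 0.1, 1.0]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 5

grid_size = len(list(product(reg_params, elastic_net_params)))
total_fits = grid_size * num_folds  # models trained during search (plus one final refit)
print(grid_size, total_fits)  # 9 45
```

Forty-five fits is fine for LogisticRegression; for heavier estimators, trim the grid or raise parallelism before reaching for a bigger cluster.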
Must-Know Transformers (2025)
- StringIndexer: String → Int (unseen labels handled via handleInvalid="keep")
- OneHotEncoder: Index → Sparse Vector
- VectorAssembler: All cols → single "features" vector
- StandardScaler: Scale features (withStd=True, withMean=True)
- Bucketizer / QuantileDiscretizer: Continuous → bins
- SQLTransformer: Raw SQL feature engineering
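Bucketizer's bucket assignment is easy to reason about: each value lands in the interval [splits[i], splits[i+1]). Here is what it computes, sketched in plain Python with the stdlib bisect module (a toy analogy, not the MLlib API):

```python
from bisect import bisect_right

# Equivalent of Bucketizer(splits=[-inf, 0.0, 10.0, inf]):
# list the inner split points only; -inf/inf are the implicit outer edges
inner_splits = [0.0, 10.0]

def bucketize(x):
    # Bucket i covers [splits[i], splits[i+1]); bisect_right yields that index
    return bisect_right(inner_splits, x)

print([bucketize(v) for v in [-3.0, 5.0, 15.0]])  # [0, 1, 2]
```

QuantileDiscretizer does the same thing, except it learns the split points from the data's quantiles at fit time.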
Evaluation — Never Guess
from pyspark.ml.evaluation import (BinaryClassificationEvaluator,
                                   MulticlassClassificationEvaluator,
                                   RegressionEvaluator)

# Binary
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

# Multiclass
evaluator = MulticlassClassificationEvaluator(metricName="f1")

# Regression
evaluator = RegressionEvaluator(metricName="rmse")

score = evaluator.evaluate(predictions)
Best Metrics 2025
- Binary → areaUnderROC / areaUnderPR
- Multiclass → f1 (the evaluator's default, weighted by class support)
- Regression → RMSE + R²
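areaUnderROC has a simple rank interpretation: the probability that a random positive is scored above a random negative. A minimal pure-Python version of that computation, for intuition only (not what BinaryClassificationEvaluator actually runs at scale):

```python
def area_under_roc(scores, labels):
    """AUC as P(score of random positive > score of random negative); ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

print(area_under_roc([0.9, 0.8, 0.3, 0.1], [1, 1, 0, 0]))  # 1.0 (perfect ranking)
print(area_under_roc([0.9, 0.2, 0.3, 0.1], [1, 1, 0, 0]))  # 0.75 (one inverted pair)
```

This is also why AUC ignores the classification threshold: it only depends on how the scores rank, not where you cut them.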
Production: Save & Load
from pyspark.ml import PipelineModel

# Save the entire fitted pipeline (transformers + model) as one artifact
best_model.write().overwrite().save("/models/churn_pipeline")

# Load in production -- same feature transforms, no train/serve skew
loaded_pipeline = PipelineModel.load("/models/churn_pipeline")
predictions = loaded_pipeline.transform(new_data)
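Saving the whole PipelineModel matters because fitted transformers carry learned state that serving must reuse exactly. A toy plain-Python analogy of the failure mode (not the MLlib API): a StringIndexer-like mapping silently shifts if it is rebuilt on serving data instead of loaded from the training artifact.

```python
from collections import Counter

def fit_indexer(values):
    # Like StringIndexer's default "frequencyDesc": most frequent label -> index 0
    ordered = [v for v, _ in Counter(values).most_common()]
    return {v: i for i, v in enumerate(ordered)}

train_mapping = fit_indexer(["US", "US", "UK", "DE"])
serve_mapping = fit_indexer(["DE", "DE", "US", "UK"])  # refit on serving data: wrong!

print(train_mapping["US"], serve_mapping["US"])  # 0 1 -- same label, different index
```

Loading the saved PipelineModel sidesteps this entirely: the indices, scaler means, and encoder sizes all come from fit time.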
FINAL ANSWER:
Pipeline + CrossValidator
= Production-grade ML in Spark
No exceptions. Ever.