scientific-skills/scikit-learn/references/pipelines_and_composition.md
Pipelines chain multiple processing steps into a single estimator, preventing data leakage and simplifying code. They enable reproducible workflows and seamless integration with cross-validation and hyperparameter tuning.
Pipeline (sklearn.pipeline.Pipeline)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
pipeline = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('classifier', LogisticRegression())
])
# Fit the entire pipeline
pipeline.fit(X_train, y_train)
# Predict using the pipeline
y_pred = pipeline.predict(X_test)
y_proba = pipeline.predict_proba(X_test)
make_pipeline
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
pipeline = make_pipeline(
StandardScaler(),
PCA(n_components=10),
SVC(kernel='rbf')
)
pipeline.fit(X_train, y_train)
# By index
scaler = pipeline.steps[0][1]
# By name
scaler = pipeline.named_steps['scaler']
pca = pipeline.named_steps['pca']
# Using indexing syntax
scaler = pipeline['scaler']
pca = pipeline['pca']
# Get all step names
print(pipeline.named_steps.keys())
# Set parameters using double underscore notation
pipeline.set_params(
pca__n_components=15,
classifier__C=0.1
)
# Or during creation
pipeline = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('classifier', LogisticRegression(C=1.0))
])
# Access fitted attributes
pca_components = pipeline.named_steps['pca'].components_
explained_variance = pipeline.named_steps['pca'].explained_variance_ratio_
# Access intermediate transformations
X_scaled = pipeline.named_steps['scaler'].transform(X_test)
X_pca = pipeline.named_steps['pca'].transform(X_scaled)
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
pipeline = Pipeline([
('scaler', StandardScaler()),
('classifier', SVC())
])
param_grid = {
'classifier__C': [0.1, 1, 10, 100],
'classifier__gamma': ['scale', 'auto', 0.001, 0.01],
'classifier__kernel': ['rbf', 'linear']
}
grid_search = GridSearchCV(pipeline, param_grid, cv=5, n_jobs=-1)
grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_:.3f}")
param_grid = {
# PCA parameters
'pca__n_components': [5, 10, 20, 50],
# Classifier parameters
'classifier__C': [0.1, 1, 10],
'classifier__kernel': ['rbf', 'linear']
}
grid_search = GridSearchCV(pipeline, param_grid, cv=5)
grid_search.fit(X_train, y_train)
ColumnTransformer (sklearn.compose.ColumnTransformer)
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
# Define column groups
numeric_features = ['age', 'income', 'hours_per_week']
categorical_features = ['gender', 'occupation', 'native_country']
# Create preprocessor
preprocessor = ColumnTransformer(
transformers=[
('num', StandardScaler(), numeric_features),
('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
],
remainder='passthrough' # Keep other columns unchanged
)
X_transformed = preprocessor.fit_transform(X)
from sklearn.pipeline import Pipeline
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
# Full pipeline with model
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', LogisticRegression())
])
full_pipeline.fit(X_train, y_train)
from sklearn.compose import make_column_transformer
preprocessor = make_column_transformer(
(StandardScaler(), numeric_features),
(OneHotEncoder(), categorical_features),
remainder='passthrough'
)
# By column names (if X is DataFrame)
preprocessor = ColumnTransformer([
('num', StandardScaler(), ['age', 'income']),
('cat', OneHotEncoder(), ['gender', 'occupation'])
])
# By column indices
preprocessor = ColumnTransformer([
('num', StandardScaler(), [0, 1, 2]),
('cat', OneHotEncoder(), [3, 4])
])
# By boolean mask
numeric_mask = [True, True, True, False, False]
categorical_mask = [False, False, False, True, True]
preprocessor = ColumnTransformer([
('num', StandardScaler(), numeric_mask),
('cat', OneHotEncoder(), categorical_mask)
])
# By callable
def is_numeric(X):
return X.select_dtypes(include=['number']).columns.tolist()
preprocessor = ColumnTransformer([
('num', StandardScaler(), is_numeric)
])
# Get output feature names
feature_names = preprocessor.get_feature_names_out()
# After fitting
preprocessor.fit(X_train)
output_features = preprocessor.get_feature_names_out()
print(f"Input features: {X_train.columns.tolist()}")
print(f"Output features: {output_features}")
# Drop unspecified columns (default)
preprocessor = ColumnTransformer([...], remainder='drop')
# Pass through unchanged
preprocessor = ColumnTransformer([...], remainder='passthrough')
# Apply transformer to remaining columns
preprocessor = ColumnTransformer([...], remainder=StandardScaler())
FeatureUnion (sklearn.pipeline.FeatureUnion)
from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
# Combine PCA and feature selection
feature_union = FeatureUnion([
('pca', PCA(n_components=10)),
('select_best', SelectKBest(k=20))
])
X_combined = feature_union.fit_transform(X_train, y_train)
print(f"Combined features: {X_combined.shape[1]}") # 10 + 20 = 30
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA, TruncatedSVD
# Create feature union
feature_union = FeatureUnion([
('pca', PCA(n_components=10)),
('svd', TruncatedSVD(n_components=10))
])
# Full pipeline
pipeline = Pipeline([
('scaler', StandardScaler()),
('features', feature_union),
('classifier', LogisticRegression())
])
pipeline.fit(X_train, y_train)
# Apply weights to transformers
feature_union = FeatureUnion(
transformer_list=[
('pca', PCA(n_components=10)),
('select_best', SelectKBest(k=20))
],
transformer_weights={
'pca': 2.0, # Give PCA features double weight
'select_best': 1.0
}
)
from sklearn.pipeline import Pipeline
from tempfile import mkdtemp
from shutil import rmtree
# Cache intermediate results
cachedir = mkdtemp()
pipeline = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=50)),
('classifier', LogisticRegression())
], memory=cachedir)
pipeline.fit(X_train, y_train)
# Clean up cache
rmtree(cachedir)
from sklearn.pipeline import Pipeline
# Inner pipeline for text processing
text_pipeline = Pipeline([
('vect', CountVectorizer()),
('tfidf', TfidfTransformer())
])
# Outer pipeline combining text and numeric features
full_pipeline = Pipeline([
('features', FeatureUnion([
('text', text_pipeline),
('numeric', StandardScaler())
])),
('classifier', LogisticRegression())
])
from sklearn.base import BaseEstimator, TransformerMixin
class TextLengthExtractor(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X):
return [[len(text)] for text in X]
pipeline = Pipeline([
('length', TextLengthExtractor()),
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
# Get sub-pipeline
sub_pipeline = pipeline[:2] # First two steps
# Get specific range
middle_steps = pipeline[1:3]
TransformedTargetRegressor
from sklearn.compose import TransformedTargetRegressor
from sklearn.preprocessing import QuantileTransformer
from sklearn.linear_model import LinearRegression
model = TransformedTargetRegressor(
regressor=LinearRegression(),
transformer=QuantileTransformer(output_distribution='normal')
)
model.fit(X_train, y_train)
y_pred = model.predict(X_test) # Automatically inverse-transformed
import numpy as np
model = TransformedTargetRegressor(
regressor=LinearRegression(),
func=np.log1p,
inverse_func=np.expm1
)
model.fit(X_train, y_train)
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
# Define feature types
numeric_features = ['age', 'income', 'hours_per_week']
categorical_features = ['gender', 'occupation', 'education']
# Numeric preprocessing pipeline
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# Categorical preprocessing pipeline
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Combine preprocessing
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
# Full pipeline
pipeline = Pipeline([
('preprocessor', preprocessor),
('pca', PCA(n_components=0.95)), # Keep 95% variance
('classifier', RandomForestClassifier(random_state=42))
])
# Hyperparameter tuning
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'pca__n_components': [0.90, 0.95, 0.99],
'classifier__n_estimators': [100, 200],
'classifier__max_depth': [10, 20, None]
}
grid_search = GridSearchCV(
pipeline, param_grid,
cv=5, scoring='accuracy',
n_jobs=-1, verbose=1
)
grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.3f}")
print(f"Test score: {grid_search.score(X_test, y_test):.3f}")
# Make predictions
best_pipeline = grid_search.best_estimator_
y_pred = best_pipeline.predict(X_test)
y_proba = best_pipeline.predict_proba(X_test)
# In Jupyter notebooks, pipelines display as diagrams
from sklearn import set_config
set_config(display='diagram')
pipeline # Displays visual diagram
# Print pipeline structure
print(pipeline)
# Get detailed parameters
print(pipeline.get_params())
# Good: Preprocessing inside pipeline
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', LogisticRegression())
])
pipeline.fit(X_train, y_train)
# Bad: Preprocessing outside pipeline (can cause leakage)
X_train_scaled = StandardScaler().fit_transform(X_train)
model = LogisticRegression()
model.fit(X_train_scaled, y_train)
Always use ColumnTransformer when you have both numerical and categorical features:
preprocessor = ColumnTransformer([
('num', StandardScaler(), numeric_features),
('cat', OneHotEncoder(), categorical_features)
])
# Good
pipeline = Pipeline([
('imputer', SimpleImputer()),
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('rf_classifier', RandomForestClassifier())
])
# Bad
pipeline = Pipeline([
('step1', SimpleImputer()),
('step2', StandardScaler()),
('step3', PCA(n_components=10)),
('step4', RandomForestClassifier())
])
For repeated fitting (e.g., during grid search), cache expensive steps:
from tempfile import mkdtemp
cachedir = mkdtemp()
pipeline = Pipeline([
('expensive_preprocessing', ExpensiveTransformer()),
('classifier', LogisticRegression())
], memory=cachedir)
Ensure all steps are compatible:
pipeline.set_output(transform='pandas')
X_transformed = pipeline.transform(X) # Returns DataFrame