Przejdź do głównej zawartości

pipeline

czym są pipeline'y? ⭐

  • Pipeline'y to narzędzia w scikit-learn, które łączą wiele kroków przetwarzania danych w jeden obiekt.
  • Automatyzują proces transformacji danych i trenowania modelu.
  • Zapewniają, że te same transformacje są zastosowane do danych treningowych i testowych.
  • Ułatwiają walidację krzyżową i tuning hiperparametrów.
  • Zapobiegają data leakage (przeciekaniu danych).

zalety używania pipeline'ów ⭐

  1. Automatyzacja: Wszystkie kroki są wykonywane automatycznie.
  2. Spójność: Te same transformacje dla danych treningowych i testowych.
  3. Walidacja: Łatwa walidacja krzyżowa całego procesu.
  4. Tuning: Możliwość tuningu hiperparametrów wszystkich kroków.
  5. Reprodukowalność: Łatwiejsze odtwarzanie eksperymentów.

przykładowy kod (scikit-learn)

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score
import pandas as pd
import numpy as np

# Generowanie syntetycznych danych
np.random.seed(42)
n_samples = 1000

# Dane numeryczne
age = np.random.normal(45, 15, n_samples)
income = np.random.normal(50000, 20000, n_samples)
credit_score = np.random.normal(700, 100, n_samples)

# Dane kategoryczne
education = np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n_samples)
employment = np.random.choice(['Full-time', 'Part-time', 'Unemployed'], n_samples)

# Dodanie brakujących wartości
age[np.random.choice(n_samples, 50, replace=False)] = np.nan
income[np.random.choice(n_samples, 30, replace=False)] = np.nan

# Target
purchase = (age/100 + income/100000 + credit_score/1000) / 3 + np.random.normal(0, 0.1, n_samples) > 0.5

# Tworzenie DataFrame
data = pd.DataFrame({
'age': age,
'income': income,
'credit_score': credit_score,
'education': education,
'employment': employment,
'purchase': purchase
})

print("Przykładowe dane:")
print(data.head())
print(f"\nBrakujące wartości:\n{data.isnull().sum()}")

tworzenie pipeline'u

# Podział danych
X = data.drop('purchase', axis=1)
y = data['purchase']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Definicja kolumn numerycznych i kategorycznych
numeric_features = ['age', 'income', 'credit_score']
categorical_features = ['education', 'employment']

# Preprocessing dla kolumn numerycznych
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])

# Preprocessing dla kolumn kategorycznych
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('onehot', OneHotEncoder(drop='first', sparse=False))
])

# Łączenie transformacji
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)

# Pełny pipeline
pipeline = Pipeline(steps=[
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(random_state=42))
])

print("Pipeline:")
print(pipeline)

trenowanie i ocena pipeline'u

# Trenowanie pipeline'u
pipeline.fit(X_train, y_train)

# Przewidywanie
y_pred = pipeline.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred):.3f}")

# Walidacja krzyżowa
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV score: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")

# Dostęp do kroków pipeline'u
print(f"\nNazwy kroków: {pipeline.named_steps.keys()}")
print(f"Preprocessor: {pipeline.named_steps['preprocessor']}")
print(f"Classifier: {pipeline.named_steps['classifier']}")

tuning hiperparametrów z pipeline'em

from sklearn.model_selection import GridSearchCV

# Definicja przestrzeni parametrów
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'classifier__n_estimators': [50, 100, 200],
'classifier__max_depth': [5, 10, None]
}

# Grid search z pipeline'em
grid_search = GridSearchCV(
pipeline,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1
)

grid_search.fit(X_train, y_train)

print(f"Najlepsze parametry: {grid_search.best_params_}")
print(f"Najlepszy score: {grid_search.best_score_:.3f}")

# Ocena najlepszego modelu
best_pipeline = grid_search.best_estimator_
y_pred_best = best_pipeline.predict(X_test)
print(f"Test accuracy (best model): {accuracy_score(y_test, y_pred_best):.3f}")

praktyczne ćwiczenia

  1. Stwórz pipeline z różnymi modelami - przetestuj różne klasyfikatory w pipeline'ie.

  2. Dodaj więcej kroków preprocessingu - dodaj feature selection, PCA, lub inne transformacje.

  3. Eksperymentuj z różnymi strategiami imputacji - porównaj mean, median, constant.

  4. Użyj pipeline'u z regresją - dostosuj pipeline do problemu regresyjnego.

  5. Zapisz i załaduj pipeline - użyj pickle do zapisania wytrenowanego pipeline'u.

dobre praktyki

  • Kolejność kroków: Najpierw preprocessing, potem model.
  • Walidacja: Zawsze używaj cross-validation z pipeline'em.
  • Nazewnictwo: Używaj opisowych nazw dla kroków.
  • Testowanie: Testuj pipeline na małym zbiorze danych przed pełnym trenowaniem.

zaawansowane techniki

Feature Union

from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest

# Łączenie różnych transformacji cech
feature_union = FeatureUnion([
('pca', PCA(n_components=2)),
('select', SelectKBest(k=3))
])

Custom Transformers

from sklearn.base import BaseEstimator, TransformerMixin

class CustomTransformer(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self

def transform(self, X):
# Twoja logika transformacji
return X

polecane źródła