Skip to main content

Pipeline

sklearn Pipeline은 전처리 단계들을 하나의 객체로 연결하여, 학습과 예측 시 동일한 변환을 자동으로 적용합니다. 데이터 누수(Data Leakage)를 방지하고, 코드의 재현성과 유지보수성을 높입니다.

학습 목표

  • Pipeline으로 여러 전처리 단계를 연결할 수 있다
  • ColumnTransformer로 컬럼별 다른 전처리를 적용할 수 있다
  • 데이터 누수의 위험과 방지 방법을 이해한다
  • Pipeline을 교차검증과 함께 사용할 수 있다

왜 중요한가

전처리를 수동으로 수행하면 학습 데이터의 통계값이 테스트 데이터에 누수되기 쉽습니다. Pipeline은 이를 자동으로 방지하고, 전체 전처리 과정을 하나의 객체로 관리할 수 있게 합니다.

데이터 누수란?

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

np.random.seed(42)
n = 500
df = pd.DataFrame({
    'age': np.random.normal(35, 10, n),
    'salary': np.random.normal(4500, 800, n),
    'experience': np.random.uniform(1, 20, n),
    'department': np.random.choice(['개발', '영업', '마케팅'], n),
    'education': np.random.choice(['학사', '석사', '박사'], n),
    'turnover': np.random.choice([0, 1], n, p=[0.8, 0.2])
})

# 결측치 삽입
df.loc[np.random.choice(n, 30), 'salary'] = np.nan
df.loc[np.random.choice(n, 20), 'age'] = np.nan
스케일링, 인코딩, 결측치 대체를 학습/테스트 분할 전에 전체 데이터에 적용하면 데이터 누수가 발생합니다. 테스트 데이터의 정보가 학습에 포함되어 성능이 실제보다 높게 측정됩니다.

기본 Pipeline

# 수치형 파이프라인: 결측치 대체 → 스케일링
numeric_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# 범주형 파이프라인: 결측치 대체 → 원핫인코딩
categorical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(drop='first', sparse_output=False))
])

print("수치형 파이프라인 단계:", [name for name, _ in numeric_pipeline.steps])
print("범주형 파이프라인 단계:", [name for name, _ in categorical_pipeline.steps])

ColumnTransformer — 컬럼별 전처리

# 컬럼 유형 정의
numeric_features = ['age', 'salary', 'experience']
categorical_features = ['department', 'education']

# ColumnTransformer: 컬럼별 다른 전처리 적용
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_pipeline, numeric_features),
        ('cat', categorical_pipeline, categorical_features)
    ],
    remainder='drop'  # 지정하지 않은 컬럼은 제거
)

# 전처리 + 모델을 하나의 Pipeline으로
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(random_state=42))
])

print("전체 파이프라인 구조:")
print(full_pipeline)

Pipeline으로 학습과 예측

# 데이터 분할
X = df.drop('turnover', axis=1)
y = df['turnover']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# 학습: fit()은 학습 데이터의 통계만 사용
full_pipeline.fit(X_train, y_train)

# 예측: 학습 데이터의 통계로 변환 후 예측
y_pred = full_pipeline.predict(X_test)
accuracy = full_pipeline.score(X_test, y_test)
print(f"정확도: {accuracy:.3f}")

# 전처리된 데이터 확인
X_transformed = preprocessor.transform(X_test)
feature_names = (numeric_features +
                 preprocessor.named_transformers_['cat']
                 .named_steps['encoder']
                 .get_feature_names_out(categorical_features).tolist())
print(f"\n전처리 후 피처: {feature_names}")
print(f"전처리 후 shape: {X_transformed.shape}")

Pipeline + 교차검증

# 교차검증: 각 fold에서 독립적으로 fit/transform
scores = cross_val_score(full_pipeline, X, y, cv=5, scoring='accuracy')
print(f"교차검증 정확도: {scores.mean():.3f} (+/- {scores.std():.3f})")
print(f"개별 fold: {scores.round(3)}")
Pipeline과 교차검증을 함께 사용하면, 각 fold에서 학습 데이터만으로 전처리 통계를 계산합니다. 이것이 데이터 누수를 완벽하게 방지하는 올바른 방법입니다.

Pipeline + GridSearchCV

from sklearn.model_selection import GridSearchCV

# 파이프라인 내 하이퍼파라미터 탐색
# 파라미터 이름: 'step_name__parameter_name'
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'classifier__C': [0.1, 1.0, 10.0],
    'classifier__max_iter': [200]
}

grid_search = GridSearchCV(
    full_pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)
grid_search.fit(X_train, y_train)

print(f"최적 파라미터: {grid_search.best_params_}")
print(f"최적 정확도: {grid_search.best_score_:.3f}")
print(f"테스트 정확도: {grid_search.score(X_test, y_test):.3f}")

모델 교체가 쉬운 Pipeline

# 모델만 교체하여 비교
from sklearn.svm import SVC
from sklearn.ensemble import GradientBoostingClassifier

models = {
    'Logistic Regression': LogisticRegression(random_state=42, max_iter=200),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42)
}

results = {}
for name, model in models.items():
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipeline, X, y, cv=5, scoring='accuracy')
    results[name] = scores.mean()
    print(f"{name}: {scores.mean():.3f} (+/- {scores.std():.3f})")

# 최적 모델
best_model = max(results, key=results.get)
print(f"\n최적 모델: {best_model} ({results[best_model]:.3f})")

Pipeline 저장과 로드

import joblib

# Pipeline 저장
joblib.dump(full_pipeline, 'trained_pipeline.joblib')

# Pipeline 로드
loaded_pipeline = joblib.load('trained_pipeline.joblib')

# 새 데이터에 바로 예측
new_data = pd.DataFrame({
    'age': [30], 'salary': [5000], 'experience': [5],
    'department': ['개발'], 'education': ['석사']
})
prediction = loaded_pipeline.predict(new_data)
print(f"예측: {'이직' if prediction[0] == 1 else '잔류'}")

AI/ML에서의 활용

  • 프로덕션 배포: Pipeline 하나로 전처리 + 예측을 완결하여 서빙이 간단합니다
  • 재현성: 동일한 Pipeline으로 동일한 결과를 보장합니다
  • 실험 관리: 모델과 전처리를 함께 버전 관리할 수 있습니다
  • AutoML 통합: Pipeline 구조가 AutoML 도구와 호환됩니다
make_pipeline()은 단계 이름을 자동으로 생성하고, Pipeline()은 이름을 직접 지정합니다. GridSearchCV에서 파라미터를 지정할 때 이름이 필요하므로, 명시적으로 Pipeline을 사용하는 것이 권장됩니다.

체크리스트

  • Pipeline으로 여러 전처리 단계를 연결할 수 있다
  • ColumnTransformer로 컬럼별 다른 전처리를 적용할 수 있다
  • 데이터 누수의 위험과 Pipeline의 방지 원리를 이해한다
  • Pipeline과 교차검증/GridSearchCV를 함께 사용할 수 있다
  • 학습된 Pipeline을 저장하고 로드할 수 있다

다음 문서