Pipeline
sklearn Pipeline은 전처리 단계들을 하나의 객체로 연결하여, 학습과 예측 시 동일한 변환을 자동으로 적용합니다. 데이터 누수(Data Leakage)를 방지하고, 코드의 재현성과 유지보수성을 높입니다.
학습 목표
Pipeline으로 여러 전처리 단계를 연결할 수 있다
ColumnTransformer로 컬럼별 다른 전처리를 적용할 수 있다
데이터 누수의 위험과 방지 방법을 이해한다
Pipeline을 교차검증과 함께 사용할 수 있다
왜 중요한가
전처리를 수동으로 수행하면 학습 데이터의 통계값이 테스트 데이터에 누수되기 쉽습니다. Pipeline은 이를 자동으로 방지하고, 전체 전처리 과정을 하나의 객체로 관리할 수 있게 합니다.
데이터 누수란?
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
np.random.seed( 42 )
n = 500
df = pd.DataFrame({
'age' : np.random.normal( 35 , 10 , n),
'salary' : np.random.normal( 4500 , 800 , n),
'experience' : np.random.uniform( 1 , 20 , n),
'department' : np.random.choice([ '개발' , '영업' , '마케팅' ], n),
'education' : np.random.choice([ '학사' , '석사' , '박사' ], n),
'turnover' : np.random.choice([ 0 , 1 ], n, p = [ 0.8 , 0.2 ])
})
# 결측치 삽입
df.loc[np.random.choice(n, 30 ), 'salary' ] = np.nan
df.loc[np.random.choice(n, 20 ), 'age' ] = np.nan
스케일링, 인코딩, 결측치 대체를 학습/테스트 분할 전에 전체 데이터에 적용하면 데이터 누수가 발생합니다. 테스트 데이터의 정보가 학습에 포함되어 성능이 실제보다 높게 측정됩니다.
기본 Pipeline
# 수치형 파이프라인: 결측치 대체 → 스케일링
numeric_pipeline = Pipeline([
( 'imputer' , SimpleImputer( strategy = 'median' )),
( 'scaler' , StandardScaler())
])
# 범주형 파이프라인: 결측치 대체 → 원핫인코딩
categorical_pipeline = Pipeline([
( 'imputer' , SimpleImputer( strategy = 'most_frequent' )),
( 'encoder' , OneHotEncoder( drop = 'first' , sparse_output = False ))
])
print ( "수치형 파이프라인 단계:" , [name for name, _ in numeric_pipeline.steps])
print ( "범주형 파이프라인 단계:" , [name for name, _ in categorical_pipeline.steps])
# 컬럼 유형 정의
numeric_features = [ 'age' , 'salary' , 'experience' ]
categorical_features = [ 'department' , 'education' ]
# ColumnTransformer: 컬럼별 다른 전처리 적용
preprocessor = ColumnTransformer(
transformers = [
( 'num' , numeric_pipeline, numeric_features),
( 'cat' , categorical_pipeline, categorical_features)
],
remainder = 'drop' # 지정하지 않은 컬럼은 제거
)
# 전처리 + 모델을 하나의 Pipeline으로
full_pipeline = Pipeline([
( 'preprocessor' , preprocessor),
( 'classifier' , LogisticRegression( random_state = 42 ))
])
print ( "전체 파이프라인 구조:" )
print (full_pipeline)
Pipeline으로 학습과 예측
# 데이터 분할
X = df.drop( 'turnover' , axis = 1 )
y = df[ 'turnover' ]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size = 0.2 , random_state = 42 , stratify = y
)
# 학습: fit()은 학습 데이터의 통계만 사용
full_pipeline.fit(X_train, y_train)
# 예측: 학습 데이터의 통계로 변환 후 예측
y_pred = full_pipeline.predict(X_test)
accuracy = full_pipeline.score(X_test, y_test)
print ( f "정확도: { accuracy :.3f} " )
# 전처리된 데이터 확인
X_transformed = preprocessor.transform(X_test)
feature_names = (numeric_features +
preprocessor.named_transformers_[ 'cat' ]
.named_steps[ 'encoder' ]
.get_feature_names_out(categorical_features).tolist())
print ( f " \n 전처리 후 피처: { feature_names } " )
print ( f "전처리 후 shape: { X_transformed.shape } " )
Pipeline + 교차검증
# 교차검증: 각 fold에서 독립적으로 fit/transform
scores = cross_val_score(full_pipeline, X, y, cv = 5 , scoring = 'accuracy' )
print ( f "교차검증 정확도: { scores.mean() :.3f} (+/- { scores.std() :.3f} )" )
print ( f "개별 fold: { scores.round( 3 ) } " )
Pipeline과 교차검증을 함께 사용하면, 각 fold에서 학습 데이터만으로 전처리 통계를 계산합니다. 이것이 데이터 누수를 완벽하게 방지하는 올바른 방법입니다.
Pipeline + GridSearchCV
from sklearn.model_selection import GridSearchCV
# 파이프라인 내 하이퍼파라미터 탐색
# 파라미터 이름: 'step_name__parameter_name'
param_grid = {
'preprocessor__num__imputer__strategy' : [ 'mean' , 'median' ],
'classifier__C' : [ 0.1 , 1.0 , 10.0 ],
'classifier__max_iter' : [ 200 ]
}
grid_search = GridSearchCV(
full_pipeline,
param_grid,
cv = 5 ,
scoring = 'accuracy' ,
n_jobs =- 1
)
grid_search.fit(X_train, y_train)
print ( f "최적 파라미터: { grid_search.best_params_ } " )
print ( f "최적 정확도: { grid_search.best_score_ :.3f} " )
print ( f "테스트 정확도: { grid_search.score(X_test, y_test) :.3f} " )
모델 교체가 쉬운 Pipeline
# 모델만 교체하여 비교
from sklearn.svm import SVC
from sklearn.ensemble import GradientBoostingClassifier
models = {
'Logistic Regression' : LogisticRegression( random_state = 42 , max_iter = 200 ),
'Random Forest' : RandomForestClassifier( n_estimators = 100 , random_state = 42 ),
'Gradient Boosting' : GradientBoostingClassifier( n_estimators = 100 , random_state = 42 )
}
results = {}
for name, model in models.items():
pipeline = Pipeline([
( 'preprocessor' , preprocessor),
( 'classifier' , model)
])
scores = cross_val_score(pipeline, X, y, cv = 5 , scoring = 'accuracy' )
results[name] = scores.mean()
print ( f " { name } : { scores.mean() :.3f} (+/- { scores.std() :.3f} )" )
# 최적 모델
best_model = max (results, key = results.get)
print ( f " \n 최적 모델: { best_model } ( { results[best_model] :.3f} )" )
Pipeline 저장과 로드
import joblib
# Pipeline 저장
joblib.dump(full_pipeline, 'trained_pipeline.joblib' )
# Pipeline 로드
loaded_pipeline = joblib.load( 'trained_pipeline.joblib' )
# 새 데이터에 바로 예측
new_data = pd.DataFrame({
'age' : [ 30 ], 'salary' : [ 5000 ], 'experience' : [ 5 ],
'department' : [ '개발' ], 'education' : [ '석사' ]
})
prediction = loaded_pipeline.predict(new_data)
print ( f "예측: { '이직' if prediction[ 0 ] == 1 else '잔류' } " )
AI/ML에서의 활용
프로덕션 배포 : Pipeline 하나로 전처리 + 예측을 완결하여 서빙이 간단합니다
재현성 : 동일한 Pipeline으로 동일한 결과를 보장합니다
실험 관리 : 모델과 전처리를 함께 버전 관리할 수 있습니다
AutoML 통합 : Pipeline 구조가 AutoML 도구와 호환됩니다
make_pipeline과 Pipeline의 차이는?
make_pipeline()은 단계 이름을 자동으로 생성하고, Pipeline()은 이름을 직접 지정합니다. GridSearchCV에서 파라미터를 지정할 때 이름이 필요하므로, 명시적으로 Pipeline을 사용하는 것이 권장됩니다.
체크리스트
다음 문서