SQLAlchemy 기초
SQLAlchemy는 Python의 대표적인 데이터베이스 도구입니다. 데이터베이스 연결을 관리하고, SQL을 안전하게 실행하며, 필요시 ORM(Object-Relational Mapping)으로 Python 객체와 데이터베이스 테이블을 매핑합니다.
학습 목표
SQLAlchemy 엔진을 생성하고 연결을 관리할 수 있다
Core API로 SQL 쿼리를 안전하게 실행할 수 있다
트랜잭션을 관리하고 에러 처리를 할 수 있다
ORM의 기본 개념을 이해한다
왜 중요한가
pandas의 read_sql()도 내부적으로 SQLAlchemy 엔진을 사용합니다. SQLAlchemy를 직접 사용하면 연결 풀링, 트랜잭션 관리, 다양한 데이터베이스 지원 등 더 세밀한 제어가 가능합니다.
엔진 생성
from sqlalchemy import create_engine, text, inspect
# SQLite (개발/테스트용)
engine = create_engine( 'sqlite:///analysis.db' , echo = False )
# PostgreSQL
# engine = create_engine(
# 'postgresql://user:password@localhost:5432/mydb',
# pool_size=5, # 연결 풀 크기
# max_overflow=10, # 추가 연결 허용 수
# pool_timeout=30, # 연결 대기 시간(초)
# pool_recycle=1800 # 연결 재사용 주기(초)
# )
# MySQL
# engine = create_engine(
# 'mysql+pymysql://user:password@localhost:3306/mydb',
# pool_size=5
# )
print ( f "엔진: { engine } " )
print ( f "데이터베이스: { engine.url.database } " )
연결 문자열 형식
데이터베이스 연결 문자열 드라이버 SQLite sqlite:///파일경로내장 PostgreSQL postgresql://user:pass@host:5432/dbpsycopg2 MySQL mysql+pymysql://user:pass@host:3306/dbPyMySQL SQL Server mssql+pyodbc://user:pass@host/dbpyodbc
데이터베이스 탐색
import pandas as pd
import numpy as np
# 예시 데이터 생성
np.random.seed( 42 )
employees = pd.DataFrame({
'id' : range ( 1 , 101 ),
'name' : [ f '직원_ { i } ' for i in range ( 1 , 101 )],
'department' : np.random.choice([ '개발' , '영업' , '마케팅' ], 100 ),
'salary' : np.random.normal( 4500 , 800 , 100 ).round( 0 ),
'hire_date' : pd.date_range( '2018-01-01' , periods = 100 , freq = '25D' )
})
employees.to_sql( 'employees' , engine, if_exists = 'replace' , index = False )
# 테이블 목록 확인
inspector = inspect(engine)
tables = inspector.get_table_names()
print ( f "테이블: { tables } " )
# 컬럼 정보 확인
for col in inspector.get_columns( 'employees' ):
print ( f " { col[ 'name' ] } : { col[ 'type' ] } " )
SQL 실행 — Core API
# 기본 쿼리 실행
with engine.connect() as conn:
# SELECT
result = conn.execute(text( "SELECT * FROM employees LIMIT 5" ))
for row in result:
print (row)
# 파라미터 바인딩
result = conn.execute(
text( "SELECT * FROM employees WHERE department = :dept AND salary > :min_sal" ),
{ 'dept' : '개발' , 'min_sal' : 5000 }
)
rows = result.fetchall()
print ( f " \n 개발팀 고연봉: { len (rows) } 명" )
INSERT/UPDATE/DELETE
with engine.begin() as conn: # 자동 커밋/롤백
# INSERT
conn.execute(
text( "INSERT INTO employees (id, name, department, salary) VALUES (:id, :name, :dept, :sal)" ),
{ 'id' : 101 , 'name' : '신입_1' , 'dept' : '개발' , 'sal' : 3500 }
)
# UPDATE
conn.execute(
text( "UPDATE employees SET salary = salary * 1.1 WHERE department = :dept" ),
{ 'dept' : '개발' }
)
# DELETE
conn.execute(
text( "DELETE FROM employees WHERE id = :id" ),
{ 'id' : 101 }
)
# begin() 블록을 벗어나면 자동 커밋
print ( "INSERT/UPDATE/DELETE 완료" )
트랜잭션 관리
# 명시적 트랜잭션
from sqlalchemy.exc import SQLAlchemyError
with engine.connect() as conn:
trans = conn.begin()
try :
conn.execute(
text( "UPDATE employees SET salary = salary * 1.05 WHERE department = :dept" ),
{ 'dept' : '영업' }
)
conn.execute(
text( "UPDATE employees SET salary = salary * 1.03 WHERE department = :dept" ),
{ 'dept' : '마케팅' }
)
trans.commit()
print ( "트랜잭션 성공: 커밋 완료" )
except SQLAlchemyError as e:
trans.rollback()
print ( f "트랜잭션 실패: 롤백 — { e } " )
engine.begin()을 사용하면 블록 종료 시 자동으로 커밋되고, 예외 발생 시 자동으로 롤백됩니다. 대부분의 경우 이 패턴이 더 안전합니다.
ORM 기초
ORM은 Python 클래스와 데이터베이스 테이블을 매핑합니다.
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column
from sqlalchemy import String, Integer, Float
# 베이스 클래스 정의
class Base ( DeclarativeBase ):
pass
# 모델 정의
class Employee ( Base ):
"""직원 테이블 ORM 모델."""
__tablename__ = 'employees_orm'
id : Mapped[ int ] = mapped_column(Integer, primary_key = True )
name: Mapped[ str ] = mapped_column(String( 50 ))
department: Mapped[ str ] = mapped_column(String( 20 ))
salary: Mapped[ float ] = mapped_column(Float)
# 테이블 생성
Base.metadata.create_all(engine)
# ORM으로 데이터 조회
with Session(engine) as session:
# 전체 조회
all_employees = session.query(Employee).all()
# 조건 조회
dev_team = session.query(Employee).filter(
Employee.department == '개발' ,
Employee.salary > 5000
).all()
# 집계
from sqlalchemy import func
avg_salary = session.query(
Employee.department,
func.avg(Employee.salary).label( 'avg_salary' ),
func.count(Employee.id).label( 'count' )
).group_by(Employee.department).all()
for dept, avg_sal, count in avg_salary:
print ( f " { dept } : 평균 { avg_sal :.0f} 만원 ( { count } 명)" )
실무 패턴
Pandas와 결합
# SQLAlchemy 엔진 + Pandas: 가장 흔한 패턴
def get_department_report ( engine , department ):
"""부서별 보고서를 DataFrame으로 반환합니다."""
query = text( """
SELECT name, salary, hire_date
FROM employees
WHERE department = :dept
ORDER BY salary DESC
""" )
return pd.read_sql(query, engine, params = { 'dept' : department})
report = get_department_report(engine, '개발' )
print ( f "개발팀 보고서: { len (report) } 명" )
print (report.describe())
연결 관리 패턴
# Context Manager 패턴 (권장)
def safe_query ( engine , query_text , params = None ):
"""안전한 쿼리 실행 함수."""
try :
return pd.read_sql(text(query_text), engine, params = params)
except SQLAlchemyError as e:
print ( f "쿼리 오류: { e } " )
return pd.DataFrame()
# 사용
result = safe_query(engine, "SELECT * FROM employees WHERE salary > :min" , { 'min' : 5000 })
print ( f "결과: { len (result) } 행" )
AI/ML에서의 활용
데이터 파이프라인 : SQLAlchemy로 데이터 추출 → Pandas로 전처리 → sklearn으로 모델링
피처 스토어 : 전처리된 피처를 데이터베이스에 저장하여 팀 공유
모델 레지스트리 : 모델 메타데이터와 성능 지표를 데이터베이스에 기록
예측 결과 저장 : 배치 예측 결과를 데이터베이스에 저장하여 비즈니스 시스템과 연동
ORM과 Core API 중 무엇을 사용해야 하나요?
데이터 분석에서는 Core API (raw SQL + Pandas)가 더 직관적이고 효율적입니다. ORM은 웹 애플리케이션처럼 CRUD 작업이 많은 경우에 유용합니다. 둘을 혼합해서 사용해도 됩니다.
매번 새 연결을 만들면 오버헤드가 큽니다. 연결 풀(Connection Pool)은 미리 여러 연결을 만들어 두고 재사용합니다. pool_size=5면 최대 5개의 동시 연결을 유지합니다.
체크리스트
다음 문서