Skip to main content

PyTorch로 Transformer 구현

이 튜토리얼에서는 PyTorch를 사용하여 Transformer의 핵심 구성요소를 밑바닥부터 직접 구현합니다. 최종적으로 완성된 모델을 시퀀스 복사(Sequence Copy) 과제에서 학습시켜 정상 동작을 확인합니다.
이 구현은 교육 목적으로 작성되었습니다. 프로덕션 환경에서는 PyTorch의 nn.Transformer 또는 Hugging Face Transformers 라이브러리를 사용하는 것을 권장합니다.
1

위치 인코딩 구현

Transformer는 순서 정보가 없으므로, 사인/코사인 함수를 사용하여 위치 정보를 주입합니다.
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    """사인/코사인 위치 인코딩"""
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 위치 인코딩 행렬 사전 계산
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = torch.sin(position * div_term)  # 짝수 차원: sin
        pe[:, 1::2] = torch.cos(position * div_term)  # 홀수 차원: cos
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        """
        x: (batch_size, seq_len, d_model)
        """
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)
register_buffer를 사용하면 위치 인코딩이 학습 파라미터가 아닌 상수 버퍼로 등록되어, 모델 저장/로드 시 함께 관리됩니다.
2

Multi-Head Attention 구현

Scaled Dot-Product Attention과 Multi-Head Attention을 구현합니다.
class MultiHeadAttention(nn.Module):
    """Multi-Head Attention"""
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model은 num_heads로 나누어떨어져야 합니다"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Q, K, V, Output 투영 행렬
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(p=dropout)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        Scaled Dot-Product Attention
        Q, K, V: (batch, heads, seq_len, d_k)
        mask: (batch, 1, 1, seq_len) 또는 (batch, 1, seq_len, seq_len)
        """
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, float("-inf"))

        attn_weights = torch.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)
        output = torch.matmul(attn_weights, V)
        return output, attn_weights

    def forward(self, Q, K, V, mask=None):
        """
        Q, K, V: (batch, seq_len, d_model)
        반환: (batch, seq_len, d_model)
        """
        batch_size = Q.size(0)

        # 선형 투영 + 헤드 분리
        # (batch, seq_len, d_model) → (batch, num_heads, seq_len, d_k)
        Q = self.W_q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # Attention 계산
        output, attn_weights = self.scaled_dot_product_attention(Q, K, V, mask)

        # 헤드 병합: (batch, num_heads, seq_len, d_k) → (batch, seq_len, d_model)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        # 출력 투영
        return self.W_o(output)
3

Feed-Forward Network 구현

각 위치에 독립적으로 적용되는 2층 완전연결 네트워크입니다.
class PositionWiseFeedForward(nn.Module):
    """Position-wise Feed-Forward Network"""
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        """
        x: (batch, seq_len, d_model)
        """
        return self.linear2(self.dropout(self.relu(self.linear1(x))))
4

인코더 레이어 구현

Self-Attention + FFN에 잔차 연결과 레이어 정규화를 추가합니다.
class EncoderLayer(nn.Module):
    """Transformer 인코더 레이어"""
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = PositionWiseFeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(p=dropout)
        self.dropout2 = nn.Dropout(p=dropout)

    def forward(self, x, src_mask=None):
        """
        x: (batch, src_len, d_model)
        src_mask: (batch, 1, 1, src_len) 패딩 마스크
        """
        # 서브레이어 1: Multi-Head Self-Attention + Add & Norm
        attn_output = self.self_attn(x, x, x, src_mask)
        x = self.norm1(x + self.dropout1(attn_output))

        # 서브레이어 2: FFN + Add & Norm
        ffn_output = self.ffn(x)
        x = self.norm2(x + self.dropout2(ffn_output))
        return x
5

디코더 레이어 구현

인코더 레이어에 Masked Self-Attention과 Cross-Attention을 추가합니다.
class DecoderLayer(nn.Module):
    """Transformer 디코더 레이어"""
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.cross_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = PositionWiseFeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(p=dropout)
        self.dropout2 = nn.Dropout(p=dropout)
        self.dropout3 = nn.Dropout(p=dropout)

    def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
        """
        x: (batch, tgt_len, d_model)
        encoder_output: (batch, src_len, d_model)
        src_mask: (batch, 1, 1, src_len) 패딩 마스크
        tgt_mask: (batch, 1, tgt_len, tgt_len) 인과적 마스크
        """
        # 서브레이어 1: Masked Self-Attention
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout1(attn_output))

        # 서브레이어 2: Cross-Attention (Q=디코더, K/V=인코더)
        attn_output = self.cross_attn(x, encoder_output, encoder_output, src_mask)
        x = self.norm2(x + self.dropout2(attn_output))

        # 서브레이어 3: FFN
        ffn_output = self.ffn(x)
        x = self.norm3(x + self.dropout3(ffn_output))
        return x
6

전체 Transformer 조립

인코더, 디코더, 임베딩, 출력 레이어를 조합하여 전체 Transformer를 완성합니다.
class Transformer(nn.Module):
    """Transformer 전체 모델"""
    def __init__(
        self,
        src_vocab_size,
        tgt_vocab_size,
        d_model=512,
        num_heads=8,
        num_layers=6,
        d_ff=2048,
        max_len=5000,
        dropout=0.1,
    ):
        super().__init__()
        self.d_model = d_model

        # 임베딩 + 위치 인코딩
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len, dropout)

        # 인코더 스택
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        # 디코더 스택
        self.decoder_layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        # 출력 레이어
        self.output_linear = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(p=dropout)

    def encode(self, src, src_mask=None):
        """인코더: 소스 시퀀스 인코딩"""
        # 임베딩 + 스케일링 + 위치 인코딩
        x = self.src_embedding(src) * math.sqrt(self.d_model)
        x = self.positional_encoding(x)

        for layer in self.encoder_layers:
            x = layer(x, src_mask)
        return x

    def decode(self, tgt, encoder_output, src_mask=None, tgt_mask=None):
        """디코더: 타겟 시퀀스 디코딩"""
        x = self.tgt_embedding(tgt) * math.sqrt(self.d_model)
        x = self.positional_encoding(x)

        for layer in self.decoder_layers:
            x = layer(x, encoder_output, src_mask, tgt_mask)
        return x

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        """
        src: (batch, src_len) - 소스 토큰 ID
        tgt: (batch, tgt_len) - 타겟 토큰 ID
        """
        encoder_output = self.encode(src, src_mask)
        decoder_output = self.decode(tgt, encoder_output, src_mask, tgt_mask)
        logits = self.output_linear(decoder_output)  # (batch, tgt_len, tgt_vocab_size)
        return logits
7

마스크 유틸리티 함수

패딩 마스크와 인과적 마스크를 생성하는 유틸리티입니다.
def create_padding_mask(seq, pad_idx=0):
    """패딩 토큰(pad_idx)을 마스킹합니다."""
    # (batch, seq_len) → (batch, 1, 1, seq_len)
    return (seq != pad_idx).unsqueeze(1).unsqueeze(2)

def create_causal_mask(size):
    """인과적 마스크: 미래 토큰 참조를 차단합니다."""
    # (1, 1, size, size)
    mask = torch.tril(torch.ones(size, size)).unsqueeze(0).unsqueeze(0)
    return mask

def create_tgt_mask(tgt, pad_idx=0):
    """타겟 시퀀스용 결합 마스크 (패딩 + 인과적)"""
    tgt_pad_mask = create_padding_mask(tgt, pad_idx)  # (batch, 1, 1, tgt_len)
    tgt_causal_mask = create_causal_mask(tgt.size(1)).to(tgt.device)  # (1, 1, tgt_len, tgt_len)
    # 두 마스크를 결합 (둘 다 1인 위치만 참조 가능)
    return tgt_pad_mask & tgt_causal_mask
8

시퀀스 복사 과제로 학습

구현한 Transformer가 정상 동작하는지 검증하기 위해, 입력 시퀀스를 그대로 출력하는 간단한 복사 과제를 학습합니다.
import torch.optim as optim

# === 데이터 생성 ===
def generate_copy_data(batch_size, seq_len, vocab_size, pad_idx=0, sos_idx=1):
    """시퀀스 복사 과제 데이터 생성
    입력: [3, 5, 7, 2]
    출력: [SOS, 3, 5, 7, 2]  (SOS 토큰으로 시작)
    """
    # 2부터 시작 (0=PAD, 1=SOS 예약)
    src = torch.randint(2, vocab_size, (batch_size, seq_len))

    # 타겟: SOS + 소스 시퀀스
    tgt_input = torch.cat([
        torch.full((batch_size, 1), sos_idx),
        src
    ], dim=1)  # (batch, seq_len + 1)

    # 정답: 소스 시퀀스 + EOS (여기서는 PAD로 대체)
    tgt_output = torch.cat([
        src,
        torch.full((batch_size, 1), pad_idx)
    ], dim=1)  # (batch, seq_len + 1)

    return src, tgt_input, tgt_output


# === 하이퍼파라미터 ===
VOCAB_SIZE = 20    # 어휘 크기
D_MODEL = 64       # 모델 차원 (교육용으로 축소)
NUM_HEADS = 4      # 어텐션 헤드 수
NUM_LAYERS = 2     # 레이어 수
D_FF = 128         # FFN 내부 차원
SEQ_LEN = 8        # 시퀀스 길이
BATCH_SIZE = 32
EPOCHS = 50
LR = 1e-3
PAD_IDX = 0
SOS_IDX = 1

# === 모델 초기화 ===
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Transformer(
    src_vocab_size=VOCAB_SIZE,
    tgt_vocab_size=VOCAB_SIZE,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    num_layers=NUM_LAYERS,
    d_ff=D_FF,
    dropout=0.1,
).to(device)

optimizer = optim.Adam(model.parameters(), lr=LR, betas=(0.9, 0.98), eps=1e-9)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

print(f"모델 파라미터 수: {sum(p.numel() for p in model.parameters()):,}")

# === 학습 루프 ===
model.train()
for epoch in range(EPOCHS):
    src, tgt_input, tgt_output = generate_copy_data(
        BATCH_SIZE, SEQ_LEN, VOCAB_SIZE, PAD_IDX, SOS_IDX
    )
    src = src.to(device)
    tgt_input = tgt_input.to(device)
    tgt_output = tgt_output.to(device)

    # 마스크 생성
    src_mask = create_padding_mask(src, PAD_IDX).to(device)
    tgt_mask = create_tgt_mask(tgt_input, PAD_IDX).to(device)

    # 순전파
    logits = model(src, tgt_input, src_mask, tgt_mask)
    # logits: (batch, tgt_len, vocab_size)
    # 손실 계산을 위해 reshape
    loss = criterion(
        logits.view(-1, VOCAB_SIZE),
        tgt_output.view(-1)
    )

    # 역전파
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        # 정확도 계산
        preds = logits.argmax(dim=-1)
        correct = (preds == tgt_output).float()
        # PAD 위치 제외
        non_pad = (tgt_output != PAD_IDX).float()
        accuracy = (correct * non_pad).sum() / non_pad.sum()
        print(f"Epoch {epoch+1:3d} | Loss: {loss.item():.4f} | Accuracy: {accuracy:.4f}")
학습이 잘 진행되면 Accuracy가 1.0에 수렴해야 합니다. 시퀀스 복사는 단순한 과제이므로, 올바르게 구현되었다면 50 에포크 이내에 거의 완벽한 정확도에 도달합니다.
9

추론 (Greedy Decoding)

학습된 모델로 새로운 시퀀스를 복사해 봅니다.
@torch.no_grad()
def greedy_decode(model, src, max_len, sos_idx=1, pad_idx=0):
    """Greedy Decoding으로 출력 시퀀스를 생성합니다."""
    model.eval()
    device = src.device

    # 인코더 출력 계산 (한 번만)
    src_mask = create_padding_mask(src, pad_idx).to(device)
    encoder_output = model.encode(src, src_mask)

    # SOS 토큰으로 시작
    tgt_ids = torch.full((src.size(0), 1), sos_idx, dtype=torch.long, device=device)

    for _ in range(max_len):
        tgt_mask = create_tgt_mask(tgt_ids, pad_idx).to(device)
        decoder_output = model.decode(tgt_ids, encoder_output, src_mask, tgt_mask)
        logits = model.output_linear(decoder_output[:, -1:, :])  # 마지막 위치만
        next_token = logits.argmax(dim=-1)  # (batch, 1)
        tgt_ids = torch.cat([tgt_ids, next_token], dim=1)

    return tgt_ids[:, 1:]  # SOS 제거


# === 추론 테스트 ===
model.eval()
test_src = torch.tensor([[3, 5, 7, 2, 11, 15, 8, 4]], device=device)
predicted = greedy_decode(model, test_src, max_len=SEQ_LEN)

print(f"입력:   {test_src[0].tolist()}")
print(f"예측:   {predicted[0].tolist()}")
# 올바르게 학습되었다면 입력과 예측이 동일해야 합니다
다음 사항을 점검해 보세요.
  • 학습률: 1e-3이 너무 크거나 작을 수 있습니다. 1e-4~3e-4 범위를 시도해 보세요
  • 마스크 검증: tgt_mask가 인과적 마스크 + 패딩 마스크를 올바르게 결합하고 있는지 확인하세요
  • 임베딩 스케일링: sqrt(d_model)을 곱하는 부분이 빠지면 학습 초기에 그래디언트가 불안정해질 수 있습니다
  • 그래디언트 클리핑: clip_grad_norm_이 없으면 그래디언트 폭발이 발생할 수 있습니다
  • 에포크 수: 최소 30~50 에포크는 학습해야 합니다
시퀀스 복사와 비교하여 다음이 추가로 필요합니다.
  • 토크나이저: BPE(Byte Pair Encoding) 또는 SentencePiece로 텍스트를 토큰화
  • 대규모 데이터: 수백만 문장 쌍 (WMT 데이터셋 등)
  • 학습률 스케줄러: Warmup + Inverse Square Root Decay
  • Label Smoothing: 일반화 성능 향상을 위한 정규화
  • Beam Search: Greedy Decoding 대신 빔 서치로 더 나은 출력 생성
  • BLEU 평가: 번역 품질 평가 메트릭
  • 모델 크기: d_model=512, num_layers=6, num_heads=8 (원 논문 설정)
PyTorch의 nn.Transformer는 이 구현과 동일한 아키텍처를 최적화된 형태로 제공합니다.
# PyTorch 내장 Transformer 사용 예
model = nn.Transformer(
    d_model=512,
    nhead=8,
    num_encoder_layers=6,
    num_decoder_layers=6,
    dim_feedforward=2048,
)
차이점: 내장 구현은 F.multi_head_attention_forward를 사용하여 C++/CUDA 수준에서 최적화되어 있어 속도가 더 빠릅니다. 마스크 형식도 약간 다릅니다 (additive mask 사용). 교육 목적으로는 직접 구현이 이해에 유리하고, 실무에서는 내장 또는 Hugging Face 구현을 사용하는 것이 좋습니다.

다음 문서