+ 개발

파이토치(Pytorch): 다중 선형 회귀 모델 구현

AI.Logger 2024. 9. 20. 00:03
  • 수업 내용 리마인드 및 아카이빙 목적의 업로드


 

이번 글에서는 다중 선형 회귀를 파이토치로 직접 구현해보는 과정을 소개할게요. 다층 퍼셉트론(MLP) 모델을 사용해서 비선형 데이터에 대한 예측을 해볼 거예요. 처음부터 끝까지 함께 진행하면서 학습 과정과 평가 방법까지 차근차근 살펴봐요.

 

1. 필요 모듈 로드

먼저, 필요한 라이브러리를 가져오고 GPU가 사용 가능한지 확인해볼게요. 파이토치, 데이터 전처리 및 시각화를 위한 다양한 모듈들을 사용합니다.

!pip install torchinfo

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchinfo import summary
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.metrics import r2_score

# 시드 설정 (재현 가능성을 위해)
torch.manual_seed(10)
torch.cuda.manual_seed(10)

# CUDA 사용 여부 확인
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"사용 중인 장치: {device}")

 

2. 데이터 생성 및 로드

이번에는 직접 데이터를 만들어볼 거예요. 3개의 특성을 가진 1000개의 샘플을 생성하고, 각 특성 간의 비선형 관계를 설정해보겠습니다.

# 비선형 데이터 생성
X = torch.randn(1000, 3)  # 3개의 특성을 가진 1000개의 샘플
y = 2 * X[:, 0]**2 + 3 * torch.sin(X[:, 1]) + 4 * X[:, 2]**3 + torch.randn(1000)  # 비선형 관계를 포함한 데이터

# 데이터의 크기 확인
X.shape, y.shape

# y 데이터 범위 확인
print("y 데이터 범위:")
print(f"최소값: {y.min()}, 최대값: {y.max()}")

 

3. 커스텀 데이터셋 정의 및 데이터로더 생성

torch.utils.data.Dataset 클래스를 상속받아 직접 커스텀 데이터셋을 정의한 후, 이를 훈련, 검증, 테스트 세트로 나눌 거예요. 그리고 데이터로더를 사용해서 배치 단위로 데이터를 준비해볼게요.

# 커스텀 데이터셋 클래스 정의
class CustomDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 데이터셋 나누기 (train, validation, test)
custom_dataset = CustomDataset(X, y)
train_size = int(0.7 * len(custom_dataset))
val_size = int(0.15 * len(custom_dataset))
test_size = len(custom_dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(custom_dataset, [train_size, val_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# 데이터 확인
for batch_x, batch_y in train_loader:
    print(batch_x.shape, batch_y.shape)
    break

 

4. 모델 정의

다층 퍼셉트론(MLP) 모델을 정의합니다. 이번 모델은 2개의 히든 레이어와 ReLU 활성화 함수를 사용합니다. 입력은 3개의 특성을 받고, 출력은 1개의 값을 예측해요.

# MLP 모델 정의
class MLPModel(nn.Module):
    def __init__(self, input_size, hidden_units, output_size):
        super(MLPModel, self).__init__()
        self.hidden1 = nn.Linear(input_size, hidden_units)
        self.hidden2 = nn.Linear(hidden_units, hidden_units)
        self.output = nn.Linear(hidden_units, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.hidden1(x))
        x = self.relu(self.hidden2(x))
        return self.output(x)

# 모델 생성: 입력 크기 3, 히든 유닛 64, 출력 크기 1
model = MLPModel(input_size=3, hidden_units=64, output_size=1).to(device)

# 모델 요약 정보 출력
summary(model, input_size=batch_x.shape, col_names=["input_size", "output_size", "num_params"])

 

5. 모델 학습

이제 모델 학습을 위한 손실 함수와 옵티마이저를 설정할게요. 우리는 MSELoss를 사용해 예측값과 실제값의 차이를 계산하고, Adam 옵티마이저로 모델을 업데이트할 겁니다.

# 손실 함수 및 옵티마이저 정의
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 모델 학습 함수
def train(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    for X_batch, y_batch in dataloader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output.squeeze(), y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * X_batch.size(0)
    return total_loss / len(dataloader.dataset)

# 모델 평가 함수
def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in dataloader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            output = model(X_batch)
            loss = criterion(output.squeeze(), y_batch)
            total_loss += loss.item() * X_batch.size(0)
    return total_loss / len(dataloader.dataset)

# 훈련 루프
n_epochs = 1000
best_valid_loss = float('inf')
train_losses = []
val_losses = []

with tqdm(total=n_epochs, desc="Training", unit="epoch") as pbar:
    for epoch in range(1, n_epochs + 1):
        train_loss = train(model, train_loader, criterion, optimizer, device)
        valid_loss = evaluate(model, val_loader, criterion, device)
        
        train_losses.append(train_loss)
        val_losses.append(valid_loss)

        pbar.set_description(f"Epoch {epoch}/{n_epochs}, Train Loss: {train_loss:.4f}, Validation Loss: {valid_loss:.4f}")
        pbar.update(1)

        # 가장 낮은 검증 손실일 때 모델 저장
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            best_epoch = epoch
            torch.save({
                'epoch': best_epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': best_valid_loss,
            }, 'best_model.pth')

print("훈련 완료.")
print(f"가장 낮은 검증 손실에서 저장된 모델은 {best_epoch}번째 에포크에서 저장됨, 검증 손실: {best_valid_loss:.4f}")

 

6. 모델 평가 및 성능 시각화

훈련이 끝났으면, 이제 테스트 데이터를 사용해 모델의 성능을 평가하고, 예측값과 실제값을 비교하는 시각화를 해볼게요.

# 테스트 데이터에서 모델 성능 평가
checkpoint = torch.load('best_model.pth')
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)
model.eval()

test_loss = 0.0
predictions_list = []
actual_list = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        predictions = model(X_batch)
        loss = criterion(predictions.squeeze(), y_batch)
        test_loss += loss.item() * X_batch.size(0)

        predictions_list.extend(predictions.cpu().numpy())
        actual_list.extend(y_batch.cpu().numpy())

test_loss /= len(test_loader.dataset)
print(f'테스트 손실: {test_loss:.4f}')

# 성능 지표 계산
r2 = r2_score(actual_list, predictions_list)
rmse = np.sqrt(np.mean((np.array(actual_list) - np.array(predictions_list)) ** 2))
mae = np.mean(np.abs(np.array(actual_list) - np.array(predictions_list)))

print(f'R2 점수: {r2:.4f}')
print(f'RMSE: {rmse:.4f}')
print(f'MAE: {mae:.4f}')