본문 바로가기

문돌이 존버/데이터 분석

RNN 개념 잡고 간단 예제 코드 돌려보기 (2)

반응형
해당 글은 핸즈온 머신러닝 2판을 기준으로 작성되었습니다.

시계열 예측하기

시간당 접속 사용자 수, 도시의 날짜별 온도, 여러 지표를 사용한 기업 분기별 재정 안정성 등은 모두 타임 스텝마다 하나 이상의 값을 가진 긴 시퀀스다. 이를 시계열(time series)이라 부르며, 타임 스텝마다 하나의 값을 가지는 것을 단변량 시계열(univariate time series), 2개 이상 값을 가지는 것을 다변량 시계열(multivariate time series)이라 한다.

아래 그림을 보면 50개의 타임 스텝이 있고, 파란색 X로 표시된 다음 스텝의 값을 예측하는 것이다.

<출처: hands-on mahcine learning>

이제 시계열 데이터를 생성하고 테스트해보자.  

def generate_time_series(batch_size, n_steps):
    freq1, freq2, offsets1, offsets2 = np.random.rand(4, batch_size, 1)
    time = np.linspace(0, 1, n_steps)
    series = 0.5 * np.sin((time - offsets1) * (freq1 * 10 + 10))  # 사인 곡선 1
    series += 0.2 * np.sin((time - offsets2) * (freq2 * 20 + 20)) # 사인 곡선 2
    series += 0.1 * (np.random.rand(batch_size, n_steps) - 0.5)   # 잡음(noise)
    return series[..., np.newaxis].astype(np.float32)

단변량 시계열 데이터를 생성하는 것이고, 위 함수의 출력 형태는 (배치 크기, 타임 스텝 수, 1) 넘파이 배열이다. 이것을 잘 모르겠다면 미니 테스트를 해보면 된다.

# np.newaxis 알아보기
test = np.array([[1, 3], [2, 4]])
print(test)
print('\n')
print(test[..., np.newaxis].astype(np.int32)) # 새로운 차원 추가

import numpy as np
np.random.seed(42)

n_steps = 50
series = generate_time_series(10000, n_steps + 1)
X_train, y_train = series[:7000, :n_steps], series[:7000, -1]
X_valid, y_valid = series[7000:9000, :n_steps], series[7000:9000, -1]
X_test, y_test = series[9000:, :n_steps], series[9000:, -1]

여기까지 코드를 돌리고 시각화하면 위의 그래프와 같이 나온다.

RNN을 시작하기 전 기준 성능을 몇 개 준비하는 것이 좋다. 간단한 방법은 각 시계열의 마지막 값을 그대로 예측하는 것으로, 이를 순진한 예측(naive forecasting)이라 부른다. 

y_pred = X_valid[:, -1]
np.mean(keras.losses.mean_squared_error(y_valid, y_pred))

또 다른 방법은 FC 네트워크를 사용하는 것이다. 해당 네트워크는 입력마다 1차원 특성 배열을 기대하기 때문에 Flatten 층을 추가해야 한다.

model = keras.models.Squential([
    keras.layers.Flatten(input_shape=[50, 1]),
    keras.layers.Dense(1)
])

간단한 RNN 구현

model = keras.models.Sequential([
    keras.layers.SimpleRNN(1, input_shape=[None, 1])
])

input_shape 의 첫 번째 입력 차원을 None으로 정했다는 것은 어떤 길이의 타임 스텝도 처리할 수 있다는 것이다. 기본적으로 SimpleRNN 층은 하이퍼볼릭탄젠트 활성화 함수를 사용한다. 

트랜드와 계절성
가중 이동 평균(weighted moving average)나 자동 회귀 누적 이동 평균(autoregressive integrated moving average, ARIMA) 같이 시계열을 예측하는 다른 방법이 많다. 이런 방법 중 일부는 트렌드(trend)나 계절성(seasonality)을 제거해야 한다. 매 타임 스텝의 값과 작년도(혹은 한달, 하루) 값의 차이를 계산하여 사용하며 이를 차분(differencing)이라 부른다.

RNN은 일반적으로 이런 작업이 모두 필요 없지만, 어떤 경우 트렌드나 계절성을 학습할 필요가 없기 때문에 성능을 향상할 수 있다.

심층 RNN

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.SimpleRNN(1)
])

모든 순환 층에서 return_sequences=True 로 설정해야 한다. 단 마지막 층은 마지막 출력만 관심 대상이므로 설정하지 않아도 된다. 위의 코드는 마지막 층 때문에 이상적이지 않다. 보통 은닉 층이 하나로만 이루어지는 것은 쓸모가 없으며 따라서 아래 코드로 바꿔준다.

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20),
    keras.layers.Dense(1)
])

여러 타임 스텝 앞을 예측

지금까지는 다음 타임 스텝의 값만 예측했지만 여러 타임 스텝 앞의 값도 예측할 수 있다. 즉 다음 날만 예측하는 것이 아니라 다음 일주일도 예측할 수 있다는 뜻이다.

series = generate_time_series(1, n_steps + 10)
X_new, Y_new = series[:, :n_steps], series[:, n_steps:]
X = X_new
for step_ahead in range(10):
    y_pred_one = model.predict(X[:, step_ahead:])[:, np.newaxis, :] # 다음 날 예측
    X = np.concatenate([X, y_pred_one], axis=1) # 예측한 값을 다시 학습 데이터셋과 결합

Y_pred = X[:, n_steps:]

<출처: hands-on mahcine learning>

다음은 RNN을 훈련하여 다음 값 10개를 한 번에 예측해보자. 먼저 타깃을 다음 10개의 값이 담긴 벡터로 바꾸어야 한다.

series = generate_time_series(10000, n_steps + 10)
# Y_train, valid, test의 sequence_length가 10이 됨
X_train, Y_train = series[:7000, :n_steps], series[:7000, -10:, 0] 
X_valid, Y_valid = series[7000:9000, :n_steps], series[7000:9000, -10:, 0]
X_test, Y_test = series[9000:, :n_steps], series[9000:, -10:, 0]
model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20),
    keras.layers.Dense(10)
])

Y_pred = model.predict(X_new)

이 시퀀스-투-벡터 RNN을 시퀀스-투-시퀀스 RNN으로 바꿀 수도 있다. 즉 모든 타임 스텝에서 다음 값 10개를 예측하도록 모델을 훈련하는 것이다.

Y = np.empty((10000, n_steps, 10))
for step_ahead in range(1, 10 + 1):
    Y[:, :, step_ahead - 1] = series[:, step_ahead:step_ahead + n_steps, 0]
Y_train = Y[:7000]
Y_valid = Y[7000:9000]
Y_test = Y[9000:]
X_train.shape # (7000, 50, 1)
Y_train.shape # (7000, 50, 10)
model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20, return_sequences=True), # TimeDistributed 적용을 위해 마지막 층에도 return_sequences=True 필수
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.summary()

TimeDistributed 층은 모든 타임 스텝에서 출력을 Dense 층에 적용하는 역할을 한다. 쉽게 말해 매 스텝마다 FC가 연결된 것처럼 이해할 수 있다. 각 타임 스텝을 별개의 샘플처럼 다루도록 입력의 크기를 바꾸어 이를 효과적으로 수행한다. 

(배치 크기, 타임 스텝 수, 입력 차원) -> (배치 크기 x 타임 스텝 수, 입력 차원)
본 예제에선 n_steps=50이므로 FC의 최종 출력은 (50, 10) 이다.

<출처: 아마추어 퀀트 블로그>

아마추어 퀀트님의 소개를 참고하여 말하면, TimeDistributed를 통해 각 스텝 지점에서 발생하는 오류가 하위 스텝으로 전파된다. 반대로 TimeDistributed를 사용하지 않으면 오류가 마지막 층에서 한 번 계산되고 이것이 첫 스텝까지 전파된다. 각각 many-to-many, many-to-one RNN이라고 부른다.

마지막 출력 크기는 시퀀스로 되돌린다.

(배치 크기 x 타임 스텝 수, 출력 차원) -> (배치 크기, 타임 스텝 수, 출력 차원)
def last_time_step_mse(Y_true, Y_pred):
    return keras.metrics.mean_squared_error(Y_true[:, -1], Y_pred[:, -1])

model.compile(loss="mse", optimizer=keras.optimizers.Adam(lr=0.01), metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

훈련하는 동안 모든 출력이 필요하지만 예측과 평가에는 마지막 타임 스텝의 출력만 사용된다.

이렇게 간단한 RNN을 살펴봤는데, 간단한 RNN의 경우 시계열을 예측하거나 다른 종류의 시퀀스를 잘 다룰 수 있다. 하지만 긴 시계열이나 시퀀스에선 잘 작동하지 않는데 이때 사용하는 다른 RNN 계열을 이어서 공부해보자.

728x90
반응형