NNablaでRNNの実装

あけましておめでとうございます。新年一発目のエントリです。
NNablaでKerasっぽくRNNを書きました。

Recurrent Neural Networks (RNNs)

RNNは現在の入力w(t)\boldsymbol{w}(t)を重みUUによって隠れ状態に変換し,過去の入力から生成された隠れ状態s(t1)\boldsymbol{s}(t-1)と足し合わせて出力y(t)\boldsymbol{y}(t)の生成を行うモデルです。

rnn s(t)=f(Uw(t)+Ws(t1)+b){\displaystyle \boldsymbol{s}(t) = f\left( U \boldsymbol{w}(t) + W \boldsymbol{s}(t-1) + \boldsymbol{b} \right)} y(t)=g(Vs(t)+b){\displaystyle \boldsymbol{y}(t) = g\left( V \boldsymbol{s}(t) + \boldsymbol{b} \right)}

NNabla公式のRNNの例では

def rnn(xs, h0, hidden=32):
    hs = []
    with nn.parameter_scope("rnn"):
        h = h0
        # Time step loop
        for x in xs:
            # Note: Parameter scopes are reused over time
            # which means parametrs are shared over time.
            with nn.parameter_scope("x2h"):
                x2h = PF.affine(x, hidden, with_bias=False)
            with nn.parameter_scope("h2h"):
                h2h = PF.affine(h, hidden)
            h = F.tanh(x2h + h2h)
            hs.append(h)
        with nn.parameter_scope("classifier"):
            y = PF.affine(h, 10)
    return y, hs

となっていましたが,これは入力のxsnn.Variableのリストでbatch学習を考えているものではありませんでした。 今回はこれをbatch化し,隠れ状態s\boldsymbol{s}だけを考えます。 まず,入力は,batch sizeが32,sentence lengthが20,embeddingsにする前のindexの列を考えます。

import nnabla as nn
batch_size = 32
sentence_length = 20
x = nn.Variable((batch_size, sentence_length))

実際にxに入る値は,

array([[   2,    3,    4, ...,   19,   20,   21],
       [  27,   28,   29, ...,    0,    0,    0],
       [  27,   41,   42, ...,    0,    0,    0],
       ...,
       [5046,   36, 1177, ...,    0,    0,    0],
       [ 841,   31, 1215, ...,    0,    0,    0],
       [3648, 3649,  182, ...,  918, 3197,  259]], dtype=int32)

のようなものが考えられます。
適当に辞書サイズを10000,埋め込みベクトルの次元を128とおいて埋め込みベクトルの系列へ変換すると,

import nnabla.parametric_functions as PF
h = PF.embed(x, 10000, 128, name='embed')
print(h)
# <Variable((32, 20, 128), need_grad=True) at 0x7f8bdf911ef8>

となります。
このhに対してRNNを適用したいので,sentenceの頭から順に処理することを考えます。 なので,このhが下記のようにsentence length分のnnabla.Variableになっていてほしいです。

[<Variable((32, 128), need_grad=True) at 0x7f8bdf911f48>,
 <Variable((32, 128), need_grad=True) at 0x7f8bdfb07728>,
 #中略
 <Variable((32, 128), need_grad=True) at 0x7f8bdf91e9f8>,
 <Variable((32, 128), need_grad=True) at 0x7f8bdf91ea48>]

便利なことにnnabla.functions.splitを使えば,<Variable((32, 20, 128), need_grad=True) at 0x7f8bdf911ef8>を上記のように分割することができます。

また,隠れ状態s\boldsymbol{s}を保存しておくための変数が必要です。
これは,(batch size)*隠れ状態のベクトルの次元分だけあればよいので,

import numpy as np
h0 = nn.Variable.from_numpy_array(np.zeros((batch_size, units)))

としておけばよいでしょう。
最終的に出来上がったRNNが以下の通りです。

import nnabla as nn
import nnabla.parametric_functions as PF
import nnabla.functions as F
import numpy as np

def SimpleRNN(inputs, units, return_sequences=False):
    '''
    A vanilla recurrent neural network layer
    Args:
        inputs (nnabla.Variable): A shape of [B, SentenceLength, EmbeddingSize].
        units (int): Dimensionality of the output space.
        return_sequences (bool): Whether to return the last output. in the output sequence, or the full sequence.
    Returns:
        nn.Variable: A shape [B, SentenceLength, units].
        or
        nn.Variable: A shape [B, units]
    '''

    hs = []
    batch_size = inputs.shape[0]
    sentence_length = inputs.shape[1]
    h0 = nn.Variable.from_numpy_array(np.zeros((batch_size, units)))

    inputs = F.split(inputs, axis=1) # split in the direction of sequence

    h = h0
    for x in inputs:
        with nn.parameter_scope('x2h'):
            x2h = PF.affine(x, units, with_bias=False)
        with nn.parameter_scope('h2h'):
            h2h = PF.affine(h, units)
        h = F.tanh(x2h + h2h)
        if return_sequences:
            h = F.reshape(h, (batch_size, 1, units))
        hs.append(h)

    if return_sequences:
        hs = F.concatenate(*hs, axis=1)
        return hs
    else:
        return hs[-1]

Kerasと同様にreturn_sequences (bool)を引数に用意しており,Falseにすると

last = SimpleRNN(h, 128, return_sequences=False)
print(last)
# <Variable((32, 128), need_grad=True) at 0x7f8bdf8a2a48>

のように最後の出力のみが,Trueにすると,

all_hidden = SimpleRNN(h, 128, return_sequences=True)
print(all_hidden)
# <Variable((32, 20, 128), need_grad=True) at 0x7f8bdf8a29f8>

のように全てのタイムスタンプでの出力が得られます。

この隠れ状態からy\boldsymbol{y}を計算したいのですが,RNNを汎用的なものにするためにその処理を外に出すことを考えます。 RNNの出力,<Variable((32, 20, 128), need_grad=True) at 0x7f8bdf8a29f8>に対してPF.affineを適用するには,F.splitを用いて系列方向に分割して,それぞれのタイムスタンプでPF.affineを適用します。汎用的なものにするために以下のような関数を書いてみました。

def TimeDistributed(func):
    def TimeDistributedFunc(x, *args, **kwargs):
        ret = []
        batch_size = x.shape[0]
        units = x.shape[2]
        for x_ in F.split(x, axis=1):
            value = func(x_, *args, **kwargs)
            _, output_dim = value.shape
            ret.append(F.reshape(value, (batch_size, 1, output_dim)))
        return F.concatenate(*ret, axis=1)
    return TimeDistributedFunc

これを用いると,

y = TimeDistributed(PF.affine)(all_hidden, 10000)
print(y)
# <Variable((32, 20, 10000), need_grad=True) at 0x7f8bdf6890e8>

のように書くことができます。


Written by@Minato Sato
Senior Software Engineer - Embedded AI

GitHubTwitterFacebookLinkedIn

© 2023 Minato Sato. All Rights Reserved