久しぶりの投稿になりました!今回は自然言語処理に挑戦してみました。
そもそも自然言語処理とは
「人間が日常的に使っている自然言語をコンピュータに処理させる試み」
のことです。自然言語処理を用いることで音声認識や質問応答システム、情報抽出などのシステムを作ることもできます。
今回はPythonを使って簡単な自然言語処理に挑戦してみます。挑戦する内容は「足し算を答える」受け答えアプリケーションです。
今回はプログラムで計算して答えを出すのではなく、数字や記号に対してどのような処理をすべきなのかを全く与えない状態で
人工知能に勉強させながら徐々に正確な回答を得ていきます。
今回はニューラルネットワークのモデルの一つSequence-to-Sequence(Seq2Seq)を用いてやっていきます。Seq2Seqとは、入出力データが
ある時系列データに対し、再帰型ニューラルネットワーク(RNN)を用い処理していく仕組みです。ムズカシイ話は後述する書籍を参考に
していただければと思います。早速中身を見ていきましょう
大きな流れとしてモデルを定義するinference()、誤差関数を定義するloss()、正解率を求めるaccuracy()学習アルゴリズムを定義するtraining()、それらを用いて具体的に
処理をするmain処理に分けてみていきます。
まずinference()についてです。サンプルソースは以下のようになります
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import numpy as np import tensorflow as tf from tensorflow.contrib import rnn from sklearn.model_selection import train_test_split from sklearn.utils import shuffle np.random.seed(0) tf.set_random_seed(1234) def inference(x, y, n_batch, is_training, input_digits=None, output_digits=None, n_hidden=None, n_out=None): def weight_variable(shape): initial = tf.truncated_normal(shape, stddev=0.01) return tf.Variable(initial) def bias_variable(shape): initial = tf.zeros(shape, dtype=tf.float32) return tf.Variable(initial) # Encode encoder = rnn.BasicLSTMCell(n_hidden, forget_bias=1.0) state = encoder.zero_state(n_batch, tf.float32) encoder_outputs = [] encoder_states = [] with tf.variable_scope('Encoder'): for t in range(input_digits): if t > 0: tf.get_variable_scope().reuse_variables() (output, state) = encoder(x[:, t, :], state) encoder_outputs.append(output) encoder_states.append(state) # Decode decoder = rnn.BasicLSTMCell(n_hidden, forget_bias=1.0) state = encoder_states[-1] decoder_outputs = [encoder_outputs[-1]] # 出力層の重みとバイアスを事前に定義 V = weight_variable([n_hidden, n_out]) c = bias_variable([n_out]) outputs = [] with tf.variable_scope('Decoder'): for t in range(1, output_digits): if t > 1: tf.get_variable_scope().reuse_variables() if is_training is True: (output, state) = decoder(y[:, t-1, :], state) else: # 直前の出力を求める linear = tf.matmul(decoder_outputs[-1], V) + c out = tf.nn.softmax(linear) outputs.append(out) out = tf.one_hot(tf.argmax(out, -1), depth=output_digits) (output, state) = decoder(out, state) decoder_outputs.append(output) if is_training is True: output = tf.reshape(tf.concat(decoder_outputs, axis=1), [-1, output_digits, n_hidden]) linear = tf.einsum('ijk,kl->ijl', output, V) + c return tf.nn.softmax(linear) else: # 最後の出力を求める linear = tf.matmul(decoder_outputs[-1], V) + c out = tf.nn.softmax(linear) outputs.append(out) output = tf.reshape(tf.concat(outputs, axis=1), [-1, output_digits, n_out]) return output |
LSTM(long short-term memory)という考え方を用いて実装しています。前半でエンコード、デコード処理を実施し、後半でデコード処理の出力に対し
活性化処理をすることでモデルの出力シーケンスを得る動きになります
次にloss()とtraining()、accuracy()の内容です。サンプルソースです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
def loss(y, t): cross_entropy = \ tf.reduce_mean(-tf.reduce_sum( t * tf.log(tf.clip_by_value(y, 1e-10, 1.0)), reduction_indices=[1])) return cross_entropy def training(loss): optimizer = \ tf.train.AdamOptimizer(learning_rate=0.001, beta1=0.9, beta2=0.999) train_step = optimizer.minimize(loss) return train_step def accuracy(y, t): correct_prediction = tf.equal(tf.argmax(y, -1), tf.argmax(t, -1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) return accuracy |
tensorflowを用いて誤差関数および学習アルゴリズム、正解率を定義しています。
これで定義が完了したのでmain処理に入っていきます。サンプルです
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
if __name__ == '__main__': def n(digits=3): number = '' for i in range(np.random.randint(1, digits + 1)): number += np.random.choice(list('0123456789')) return int(number) def padding(chars, maxlen): return chars + ' ' * (maxlen - len(chars)) ''' データの生成 ''' N = 20000 N_train = int(N * 0.9) N_validation = N - N_train digits = 4 input_digits = digits * 2 + 1 output_digits = digits + 1 added = set() questions = [] answers = [] chars = '0123456789+ ' char_indices = dict((c, i) for i, c in enumerate(chars)) indices_char = dict((i, c) for i, c in enumerate(chars)) while len(questions) < N: a, b = n(digits), n(digits) pair = tuple(sorted((a, b))) if pair in added: continue question = '{}+{}'.format(a, b) question = padding(question, input_digits) answer = str(a + b) answer = padding(answer, output_digits) added.add(pair) questions.append(question) answers.append(answer) X = np.zeros((len(questions), input_digits, len(chars)), dtype=np.integer) Y = np.zeros((len(questions), digits + 1, len(chars)), dtype=np.integer) for i in range(N): for t, char in enumerate(questions[i]): X[i, t, char_indices[char]] = 1 for t, char in enumerate(answers[i]): Y[i, t, char_indices[char]] = 1 X_train, X_validation, Y_train, Y_validation = \ train_test_split(X, Y, train_size=N_train) ''' モデル設定 ''' n_in = len(chars) n_hidden = 128 n_out = len(chars) x = tf.placeholder(tf.float32, shape=[None, input_digits, n_in]) t = tf.placeholder(tf.float32, shape=[None, output_digits, n_out]) n_batch = tf.placeholder(tf.int32, shape=[]) is_training = tf.placeholder(tf.bool) y = inference(x, t, n_batch, is_training, input_digits=input_digits, output_digits=output_digits, n_hidden=n_hidden, n_out=n_out) loss = loss(y, t) train_step = training(loss) acc = accuracy(y, t) history = { 'val_loss': [], 'val_acc': [] } ''' モデル学習 ''' epochs = 200 batch_size = 200 init = tf.global_variables_initializer() sess = tf.Session() sess.run(init) n_batches = N_train // batch_size for epoch in range(epochs): print('=' * 10) print('Epoch:', epoch) print('=' * 10) X_, Y_ = shuffle(X_train, Y_train) for i in range(n_batches): start = i * batch_size end = start + batch_size sess.run(train_step, feed_dict={ x: X_[start:end], t: Y_[start:end], n_batch: batch_size, is_training: True }) # 検証データを用いた評価 val_loss = loss.eval(session=sess, feed_dict={ x: X_validation, t: Y_validation, n_batch: N_validation, is_training: False }) val_acc = acc.eval(session=sess, feed_dict={ x: X_validation, t: Y_validation, n_batch: N_validation, is_training: False }) history['val_loss'].append(val_loss) history['val_acc'].append(val_acc) print('validation loss:', val_loss) print('validation acc: ', val_acc) # 検証データからランダムに問題を選んで答え合わせ for i in range(10): index = np.random.randint(0, N_validation) question = X_validation[np.array([index])] answer = Y_validation[np.array([index])] prediction = y.eval(session=sess, feed_dict={ x: question, # t: answer, n_batch: 1, is_training: False }) question = question.argmax(axis=-1) answer = answer.argmax(axis=-1) prediction = np.argmax(prediction, -1) q = ''.join(indices_char[i] for i in question[0]) a = ''.join(indices_char[i] for i in answer[0]) p = ''.join(indices_char[i] for i in prediction[0]) print('-' * 10) print('Q: ', q) print('A: ', p) print('T/F:', end=' ') if a == p: print('T') else: print('F') print('-' * 10) |
今回は4桁同士の足し算を200エポック学習させてみました。最初の段階では
Q: 3205+6
A: 3149
T/F: F
や
Q: 1945+528
A: 3301
T/F: F
といったとんでもない計算をしていましたが最後のほうには
Q: 6687+2
A: 6689
T/F: T
や
Q: 389+695
A: 1064
T/F: F
といったように計算できるようになりました。正解率9割ぐらいで計算できるようになっていました。
今後この方法を応用してチャットボットを作成していく予定です。また進捗があればここでお知らせします。