読者です 読者をやめる 読者になる 読者になる

kivantium活動日記

プログラムを使っていろいろやります

TensorFlowでアニメゆるゆりの制作会社を識別する

ついにガロアが死んだ年齢を超えてしまったことに気がつき、自分がまだ何も成し遂げていないことを悲しく思う今日このごろです。

さて、今日はGoogleが出した機械学習ライブラリのTensorFlowの使い方について軽く説明しつつ、ゆるゆりの制作会社の識別を行おうと思います。

TensorFlowとは

TensorFlowはGoogleが11/9に公開したApache 2.0ライセンスで使える機械学習ライブラリです。Googleは様々なところでプロダクトに機械学習を活用していますが、TensorFlowは実際にGoogle内部の研究で使われているそうです(TensorFlow: Google 最新の機械学習ライブラリをオープンソース公開 - Google Developer Japan Blog)。

Googleのネームバリューは恐ろしいもので、GitHubのStar数はすでにChainerCaffeを上回っています。このままコミュニティが成長していけば機械学習デファクトスタンダードになる可能性も高いと思います。公開されている機能は社内で使われているものに比べると制限されているとは思いますが、学んでおく価値はあるでしょう。

TensorFlowのインストール

公式ドキュメントを読むのが一番です。

今回はUbuntu上のVirtualEnvにインストールしました。CPU版を使いました。

sudo apt-get install python-pip python-dev python-virtualenv
mkdir tensorflow
virtualenv --system-site-packages ~/tensorflow
cd ~/tensorflow
source bin/activate
pip install --upgrade https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.5.0-cp27-none-linux_x86_64.whl

古いバージョンのprotobufが入っている場合はVirtualEnvであってもバージョンの衝突を起こす場合があります。一度アンインストールすると直る場合が多いです。特にCaffeをインストールしている人は気をつけてください。

まずはHello, world!

インストールができたかどうかの確認を兼ねて、Hello, world!をやってみます。

$ python

>>> import tensorflow as tf
>>> hello = tf.constant('Hello, TensorFlow!')
>>> sess = tf.Session()
>>> print sess.run(hello)
Hello, TensorFlow!
>>> a = tf.constant(10)
>>> b = tf.constant(32)
>>> print sess.run(a+b)
42
>>>

このように、TensorFlowでは最初に演算を定義してから、後でsessionを呼び出して実際に演算を行うという流れで処理を行います。

動くことが分かったらBasic Usageを読んで概要を把握するのが良いと思います。今後の話に必要なことをザッとまとめておきます。

  • TensorFlowの演算はグラフとして記述する
  • グラフのノードはop(operationの略)と呼ばれる
  • opはTensorを受け取って、Tensorを返す
  • グラフはSessionで実行される
  • SessionはDevice上に置かれる(これはいろんなデバイスで実行できることを示唆している?)

例をもう一つ見ておきます。

# Variableを作成して0で初期化
state = tf.Variable(0, name="counter")

# stateに1を足していくopの作成
one = tf.constant(1)
new_value = tf.add(state, one)
update = tf.assign(state, new_value)

# 全てのVariableを初期化するopの作成。(これがないと初期化されない)
init_op = tf.initialize_all_variables()

# sessionの実行
with tf.Session() as sess:
  # 初期化を行う
  sess.run(init_op)
  # stateの初期値を表示
  print sess.run(state)
  # updateを実行してstateを表示する
  for _ in range(3):
    sess.run(update)
    print sess.run(state)

# output:

# 0
# 1
# 2
# 3

このように、Graphの定義→sessionの実行がTensorFlowを使う上でのポイントとなります。

MNISTサンプルの実行

次にチュートリアルのMNISTを見るのが良いと思います。MNISTは数字を分類するタスクで機械学習では非常に有名なものの一つです。
このチュートリアルには機械学習の初心者用機械学習のプロ用があるので自分の実力に合わせて選んでください。(両方読むのがおすすめですが)

MNISTサンプルを実行した例は掃いて捨てるほどあるのでここでは飛ばします。良さそうな記事をピックアップしておいたので気になる人は読んでください。

独自のデータセットを使う

TensorFlowには他にもチュートリアルがありますが、MNISTやCIFAR-10などの既に用意されたデータセットを扱う例しか挙げられていません。そこで、独自のデータセットを扱う例を作ってみました。
TensorFlow Mechanics 101によると、グラフの作成はinference(), loss(), training()に分けると良いとのことなので、それに従ってまずはグラフを作っていきます。

inference() ― 予測を行う

ここではプロ用MNISTのCNNを少し改造したCNN(入力28x28x3, 畳み込みとプーリングを2回行ったあと全結合層を通してからソフトマックス関数でlossを計算するネットワーク)を使います。これを改造すればAlexnetでも何でも好きなものを書けるはずです。

なお、CNN自体の説明は行いません。知りたい人は

を読むのがいいと思います。(他に良いサイトがあったら教えてほしいです)

日本語の書籍では、

深層学習: Deep Learning

深層学習: Deep Learning

深層学習 (機械学習プロフェッショナルシリーズ)

深層学習 (機械学習プロフェッショナルシリーズ)

がいいでしょう。

ではまずコードです。

def inference(images_placeholder, keep_prob):
    """ 予測モデルを作成する関数

    引数: 
      images_placeholder: 画像のplaceholder
      keep_prob: dropout率のplaceholder

    返り値:
      y_conv: 各クラスの確率(のようなもの)
    """
    # 重みを標準偏差0.1の正規分布で初期化
    def weight_variable(shape):
      initial = tf.truncated_normal(shape, stddev=0.1)
      return tf.Variable(initial)

    # バイアスを標準偏差0.1の正規分布で初期化
    def bias_variable(shape):
      initial = tf.constant(0.1, shape=shape)
      return tf.Variable(initial)

    # 畳み込み層の作成
    def conv2d(x, W):
      return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

    # プーリング層の作成
    def max_pool_2x2(x):
      return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                            strides=[1, 2, 2, 1], padding='SAME')
    
    # 入力を28x28x3に変形
    x_image = tf.reshape(images_placeholder, [-1, 28, 28, 3])

    # 畳み込み層1の作成
    with tf.name_scope('conv1') as scope:
        W_conv1 = weight_variable([5, 5, 3, 32])
        b_conv1 = bias_variable([32])
        h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    # プーリング層1の作成
    with tf.name_scope('pool1') as scope:
        h_pool1 = max_pool_2x2(h_conv1)
    
    # 畳み込み層2の作成
    with tf.name_scope('conv2') as scope:
        W_conv2 = weight_variable([5, 5, 32, 64])
        b_conv2 = bias_variable([64])
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    # プーリング層2の作成
    with tf.name_scope('pool2') as scope:
        h_pool2 = max_pool_2x2(h_conv2)

    # 全結合層1の作成
    with tf.name_scope('fc1') as scope:
        W_fc1 = weight_variable([7*7*64, 1024])
        b_fc1 = bias_variable([1024])
        h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
        h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
        # dropoutの設定
        h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    # 全結合層2の作成
    with tf.name_scope('fc2') as scope:
        W_fc2 = weight_variable([1024, NUM_CLASSES])
        b_fc2 = bias_variable([NUM_CLASSES])

    # ソフトマックス関数による正規化
    with tf.name_scope('softmax') as scope:
        y_conv=tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)

    # 各ラベルの確率のようなものを返す
    return y_conv

placeholderというのは後からデータの実体が入るオブジェクトみたいなものです。

with tf.name_scope('fc2') as scope:

のようにすることで、後述するTensorBoard上でひとかたまりのノードとして表示されるようになります。

loss() ― 損失関数を計算する

inference()で得た予測から、誤差逆伝搬に使う損失関数を計算するのがloss()です。

def loss(logits, labels):
    """ lossを計算する関数

    引数:
      logits: ロジットのtensor, float - [batch_size, NUM_CLASSES]
      labels: ラベルのtensor, int32 - [batch_size, NUM_CLASSES]

    返り値:
      cross_entropy: 交差エントロピーのtensor, float

    """

    # 交差エントロピーの計算
    cross_entropy = -tf.reduce_sum(labels*tf.log(logits))
    # TensorBoardで表示するよう指定
    tf.scalar_summary("cross_entropy", cross_entropy)
    return cross_entropy

見たら分かると思います。

training() ― 訓練の実行

loss()で得た誤差を逆伝搬してネットワークを訓練します。

def training(loss, learning_rate):
    """ 訓練のopを定義する関数

    引数:
      loss: 損失のtensor, loss()の結果
      learning_rate: 学習係数

    返り値:
      train_step: 訓練のop

    """

    train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return train_step

AdamOptimizer()を呼び出すだけで全体をうまいこと最適化してくれます。この辺はChainerなどの自動微分とすごく近いものを感じます。

データの読み込み

ここでは自前で用意したJPG画像とラベルの組を読ませることにします。

akari.jpg 0
chinatsu.jpg 1
kyoko.jpg 2
yui.jpg 3

のように画像名とラベルをスペース区切りで用意してtrain.txtなどのファイルに保存しておきます。

TensorFlowにもdecode_jpegのような画像を読み込むための関数が用意されているのですが、使い方があまり説明されていないのでとりあえずOpenCVで読み込むことにしました。

# ファイルを開く
f = open(FLAGS.train, 'r')
# データを入れる配列
train_image = []
train_label = []
for line in f:
    # 改行を除いてスペース区切りにする
    line = line.rstrip()
    l = line.split()
    # データを読み込んで28x28に縮小
    img = cv2.imread(l[0])
    img = cv2.resize(img, (28, 28))
    # 一列にした後、0-1のfloat値にする
    train_image.append(img.flatten().astype(np.float32)/255.0)
    # ラベルを1-of-k方式で用意する
    tmp = np.zeros(NUM_CLASSES)
    tmp[int(l[1])] = 1
    train_label.append(tmp)
# numpy形式に変換
train_image = np.asarray(train_image)
train_label = np.asarray(train_label)
f.close()

一列にする必要はなさそうですが、そうしないと型チェックが通らなかったので仕方なくという感じです。

実際に訓練する

実際に訓練を行う部分がこれです。コメントをたくさんつけておいたので見れば分かると思います。

with tf.Graph().as_default():
    # 画像を入れる仮のTensor
    images_placeholder = tf.placeholder("float", shape=(None, IMAGE_PIXELS))
    # ラベルを入れる仮のTensor
    labels_placeholder = tf.placeholder("float", shape=(None, NUM_CLASSES))
    # dropout率を入れる仮のTensor
    keep_prob = tf.placeholder("float")

    # inference()を呼び出してモデルを作る
    logits = inference(images_placeholder, keep_prob)
    # loss()を呼び出して損失を計算
    loss_value = loss(logits, labels_placeholder)
    # training()を呼び出して訓練
    train_op = training(loss_value, FLAGS.learning_rate)
    # 精度の計算
    acc = accuracy(logits, labels_placeholder)

    # 保存の準備
    saver = tf.train.Saver()
    # Sessionの作成
    sess = tf.Session()
    # 変数の初期化
    sess.run(tf.initialize_all_variables())
    # TensorBoardで表示する値の設定
    summary_op = tf.merge_all_summaries()
    summary_writer = tf.train.SummaryWriter(FLAGS.train_dir, sess.graph_def)
    
    # 訓練の実行
    for step in range(FLAGS.max_steps):
        for i in range(len(train_image)/FLAGS.batch_size):
            # batch_size分の画像に対して訓練の実行
            batch = FLAGS.batch_size*i
            # feed_dictでplaceholderに入れるデータを指定する
            sess.run(train_op, feed_dict={
              images_placeholder: train_image[batch:batch+FLAGS.batch_size],
              labels_placeholder: train_label[batch:batch+FLAGS.batch_size],
              keep_prob: 0.5})

        # 1 step終わるたびに精度を計算する
        train_accuracy = sess.run(acc, feed_dict={
            images_placeholder: train_image,
            labels_placeholder: train_label,
            keep_prob: 1.0})
        print "step %d, training accuracy %g"%(step, train_accuracy)

        # 1 step終わるたびにTensorBoardに表示する値を追加する
        summary_str = sess.run(summary_op, feed_dict={
            images_placeholder: train_image,
            labels_placeholder: train_label,
            keep_prob: 1.0})
        summary_writer.add_summary(summary_str, step)

# 訓練が終了したらテストデータに対する精度を表示
print "test accuracy %g"%sess.run(acc, feed_dict={
    images_placeholder: test_image,
    labels_placeholder: test_label,
    keep_prob: 1.0})

# 最終的なモデルを保存
save_path = saver.save(sess, "model.ckpt")

コード全体

以上で説明したコードを合わせて実際に動くようにしたものが以下のコードです。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import cv2
import numpy as np
import tensorflow as tf
import tensorflow.python.platform

NUM_CLASSES = 2
IMAGE_SIZE = 28
IMAGE_PIXELS = IMAGE_SIZE*IMAGE_SIZE*3

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string('train', 'train.txt', 'File name of train data')
flags.DEFINE_string('test', 'test.txt', 'File name of train data')
flags.DEFINE_string('train_dir', '/tmp/data', 'Directory to put the training data.')
flags.DEFINE_integer('max_steps', 200, 'Number of steps to run trainer.')
flags.DEFINE_integer('batch_size', 10, 'Batch size'
                     'Must divide evenly into the dataset sizes.')
flags.DEFINE_float('learning_rate', 1e-4, 'Initial learning rate.')

def inference(images_placeholder, keep_prob):
    """ 予測モデルを作成する関数

    引数: 
      images_placeholder: 画像のplaceholder
      keep_prob: dropout率のplace_holder

    返り値:
      y_conv: 各クラスの確率(のようなもの)
    """
    # 重みを標準偏差0.1の正規分布で初期化
    def weight_variable(shape):
      initial = tf.truncated_normal(shape, stddev=0.1)
      return tf.Variable(initial)

    # バイアスを標準偏差0.1の正規分布で初期化
    def bias_variable(shape):
      initial = tf.constant(0.1, shape=shape)
      return tf.Variable(initial)

    # 畳み込み層の作成
    def conv2d(x, W):
      return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

    # プーリング層の作成
    def max_pool_2x2(x):
      return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                            strides=[1, 2, 2, 1], padding='SAME')
    
    # 入力を28x28x3に変形
    x_image = tf.reshape(images_placeholder, [-1, 28, 28, 3])

    # 畳み込み層1の作成
    with tf.name_scope('conv1') as scope:
        W_conv1 = weight_variable([5, 5, 3, 32])
        b_conv1 = bias_variable([32])
        h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    # プーリング層1の作成
    with tf.name_scope('pool1') as scope:
        h_pool1 = max_pool_2x2(h_conv1)
    
    # 畳み込み層2の作成
    with tf.name_scope('conv2') as scope:
        W_conv2 = weight_variable([5, 5, 32, 64])
        b_conv2 = bias_variable([64])
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    # プーリング層2の作成
    with tf.name_scope('pool2') as scope:
        h_pool2 = max_pool_2x2(h_conv2)

    # 全結合層1の作成
    with tf.name_scope('fc1') as scope:
        W_fc1 = weight_variable([7*7*64, 1024])
        b_fc1 = bias_variable([1024])
        h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
        h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
        # dropoutの設定
        h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    # 全結合層2の作成
    with tf.name_scope('fc2') as scope:
        W_fc2 = weight_variable([1024, NUM_CLASSES])
        b_fc2 = bias_variable([NUM_CLASSES])

    # ソフトマックス関数による正規化
    with tf.name_scope('softmax') as scope:
        y_conv=tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)

    # 各ラベルの確率のようなものを返す
    return y_conv

def loss(logits, labels):
    """ lossを計算する関数

    引数:
      logits: ロジットのtensor, float - [batch_size, NUM_CLASSES]
      labels: ラベルのtensor, int32 - [batch_size, NUM_CLASSES]

    返り値:
      cross_entropy: 交差エントロピーのtensor, float

    """

    # 交差エントロピーの計算
    cross_entropy = -tf.reduce_sum(labels*tf.log(logits))
    # TensorBoardで表示するよう指定
    tf.scalar_summary("cross_entropy", cross_entropy)
    return cross_entropy

def training(loss, learning_rate):
    """ 訓練のOpを定義する関数

    引数:
      loss: 損失のtensor, loss()の結果
      learning_rate: 学習係数

    返り値:
      train_step: 訓練のOp

    """

    train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return train_step

def accuracy(logits, labels):
    """ 正解率(accuracy)を計算する関数

    引数: 
      logits: inference()の結果
      labels: ラベルのtensor, int32 - [batch_size, NUM_CLASSES]

    返り値:
      accuracy: 正解率(float)

    """
    correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(labels, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
    tf.scalar_summary("accuracy", accuracy)
    return accuracy

if __name__ == '__main__':
    # ファイルを開く
    f = open(FLAGS.train, 'r')
    # データを入れる配列
    train_image = []
    train_label = []
    for line in f:
        # 改行を除いてスペース区切りにする
        line = line.rstrip()
        l = line.split()
        # データを読み込んで28x28に縮小
        img = cv2.imread(l[0])
        img = cv2.resize(img, (28, 28))
        # 一列にした後、0-1のfloat値にする
        train_image.append(img.flatten().astype(np.float32)/255.0)
        # ラベルを1-of-k方式で用意する
        tmp = np.zeros(NUM_CLASSES)
        tmp[int(l[1])] = 1
        train_label.append(tmp)
    # numpy形式に変換
    train_image = np.asarray(train_image)
    train_label = np.asarray(train_label)
    f.close()

    f = open(FLAGS.test, 'r')
    test_image = []
    test_label = []
    for line in f:
        line = line.rstrip()
        l = line.split()
        img = cv2.imread(l[0])
        img = cv2.resize(img, (28, 28))
        test_image.append(img.flatten().astype(np.float32)/255.0)
        tmp = np.zeros(NUM_CLASSES)
        tmp[int(l[1])] = 1
        test_label.append(tmp)
    test_image = np.asarray(test_image)
    test_label = np.asarray(test_label)
    f.close()
    
    with tf.Graph().as_default():
        # 画像を入れる仮のTensor
        images_placeholder = tf.placeholder("float", shape=(None, IMAGE_PIXELS))
        # ラベルを入れる仮のTensor
        labels_placeholder = tf.placeholder("float", shape=(None, NUM_CLASSES))
        # dropout率を入れる仮のTensor
        keep_prob = tf.placeholder("float")

        # inference()を呼び出してモデルを作る
        logits = inference(images_placeholder, keep_prob)
        # loss()を呼び出して損失を計算
        loss_value = loss(logits, labels_placeholder)
        # training()を呼び出して訓練
        train_op = training(loss_value, FLAGS.learning_rate)
        # 精度の計算
        acc = accuracy(logits, labels_placeholder)

        # 保存の準備
        saver = tf.train.Saver()
        # Sessionの作成
        sess = tf.Session()
        # 変数の初期化
        sess.run(tf.initialize_all_variables())
        # TensorBoardで表示する値の設定
        summary_op = tf.merge_all_summaries()
        summary_writer = tf.train.SummaryWriter(FLAGS.train_dir, sess.graph_def)
        
        # 訓練の実行
        for step in range(FLAGS.max_steps):
            for i in range(len(train_image)/FLAGS.batch_size):
                # batch_size分の画像に対して訓練の実行
                batch = FLAGS.batch_size*i
                # feed_dictでplaceholderに入れるデータを指定する
                sess.run(train_op, feed_dict={
                  images_placeholder: train_image[batch:batch+FLAGS.batch_size],
                  labels_placeholder: train_label[batch:batch+FLAGS.batch_size],
                  keep_prob: 0.5})

            # 1 step終わるたびに精度を計算する
            train_accuracy = sess.run(acc, feed_dict={
                images_placeholder: train_image,
                labels_placeholder: train_label,
                keep_prob: 1.0})
            print "step %d, training accuracy %g"%(step, train_accuracy)

            # 1 step終わるたびにTensorBoardに表示する値を追加する
            summary_str = sess.run(summary_op, feed_dict={
                images_placeholder: train_image,
                labels_placeholder: train_label,
                keep_prob: 1.0})
            summary_writer.add_summary(summary_str, step)

    # 訓練が終了したらテストデータに対する精度を表示
    print "test accuracy %g"%sess.run(acc, feed_dict={
        images_placeholder: test_image,
        labels_placeholder: test_label,
        keep_prob: 1.0})

    # 最終的なモデルを保存
    save_path = saver.save(sess, "model.ckpt")

画像とラベルの組をtrain.txtとtest.txtに入れておいてこれを実行すれば大抵の画像分類タスクはこのコードを少し変えるだけでできるかと思います。

画像に対して予想ラベルを表示する

さっきのコードだと精度を表示するだけで面白くないので画像を与えて予想ラベルを返すプログラムを書きました。

#!/usr/bin/env python
#! -*- coding: utf-8 -*-

import sys
import numpy as np
import tensorflow as tf
import cv2


NUM_CLASSES = 2
IMAGE_SIZE = 28
IMAGE_PIXELS = IMAGE_SIZE*IMAGE_SIZE*3

def inference(images_placeholder, keep_prob):
    """ モデルを作成する関数

    引数: 
      images_placeholder: inputs()で作成した画像のplaceholder
      keep_prob: dropout率のplace_holder

    返り値:
      cross_entropy: モデルの計算結果
    """
    def weight_variable(shape):
      initial = tf.truncated_normal(shape, stddev=0.1)
      return tf.Variable(initial)

    def bias_variable(shape):
      initial = tf.constant(0.1, shape=shape)
      return tf.Variable(initial)

    def conv2d(x, W):
      return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

    def max_pool_2x2(x):
      return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                            strides=[1, 2, 2, 1], padding='SAME')
    
    x_image = tf.reshape(images_placeholder, [-1, 28, 28, 3])

    with tf.name_scope('conv1') as scope:
        W_conv1 = weight_variable([5, 5, 3, 32])
        b_conv1 = bias_variable([32])
        h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    with tf.name_scope('pool1') as scope:
        h_pool1 = max_pool_2x2(h_conv1)
    
    with tf.name_scope('conv2') as scope:
        W_conv2 = weight_variable([5, 5, 32, 64])
        b_conv2 = bias_variable([64])
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    with tf.name_scope('pool2') as scope:
        h_pool2 = max_pool_2x2(h_conv2)

    with tf.name_scope('fc1') as scope:
        W_fc1 = weight_variable([7*7*64, 1024])
        b_fc1 = bias_variable([1024])
        h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
        h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
        h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    with tf.name_scope('fc2') as scope:
        W_fc2 = weight_variable([1024, NUM_CLASSES])
        b_fc2 = bias_variable([NUM_CLASSES])

    with tf.name_scope('softmax') as scope:
        y_conv=tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)

    return y_conv

if __name__ == '__main__':
    test_image = []
    for i in range(1, len(sys.argv)):
        img = cv2.imread(sys.argv[i])
        img = cv2.resize(img, (28, 28))
        test_image.append(img.flatten().astype(np.float32)/255.0)
    test_image = np.asarray(test_image)

    images_placeholder = tf.placeholder("float", shape=(None, IMAGE_PIXELS))
    labels_placeholder = tf.placeholder("float", shape=(None, NUM_CLASSES))
    keep_prob = tf.placeholder("float")

    logits = inference(images_placeholder, keep_prob)
    sess = tf.InteractiveSession()

    saver = tf.train.Saver()
    sess.run(tf.initialize_all_variables())
    saver.restore(sess, "model.ckpt")

    for i in range(len(test_image)):
        pred = np.argmax(logits.eval(feed_dict={ 
            images_placeholder: [test_image[i]],
            keep_prob: 1.0 })[0])
        print pred

それではこれを使って本題のゆるゆり制作会社の識別を行います。

TensorBoard

その前に、TensorFlowのキラーアプリとなりそうな、可視化ツールのTensorBoardについて紹介しておきます。
このソースコードだとTensorBoardのデータは/tmp/dataに保存されているので、

tensorboard --logdir /tmp/data

のようにして起動します。

http://localhost:6006/にアクセスするとリアルタイムの学習状況や、作成したグラフを見ることができます。
f:id:kivantium:20151118230327p:plain:w600f:id:kivantium:20151118230341p:plain:w600f:id:kivantium:20151118230352p:plain:w600
これだけを理由にTensorFlowを使いたくなるほどの便利な機能は今のところないですが、確かに見た目はきれいです。

ゆるゆりの制作会社判定

閑話休題

今回テーマに選んだのは現在アニメ三期が絶賛放送中のゆるゆりです。一期・二期は動画工房が制作していましたが、OVAの「ゆるゆり なちゅやちゅみ!」からはTYOアニメーションズが制作を行っています。キャラデザが少し変わっているので、顔をよく見ればどちらの会社の顔かなんとなく分かります。というわけで友利奈緒判定botのコードを流用すれば簡単に作れそうな気がします。動いている様子がこちら。

ゆるゆり1期, 2期の画像を適当に集めて顔を抜き出したものをラベル0、ゆるゆり3期の画像の顔をラベル1としてさっきのコードで2クラス分類しました。
今回のメインテーマはTensorFlowを使うことなのでデータは100枚くらいしか集めていない適当な判定器ですが、なんとなくそれなりにそれっぽい感じの結果を返している気がしないこともなくはありません。
まだサーバー上に環境を構築していないので、僕のPCが落ちているときは動作しませんが、まあこんなこともできるんだよという感じで眺めてもらえれば。

コードは以下の通りです。

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from tweepy import *
import urllib
import sys
import datetime
import re
from PIL import Image
import cv2
import sys
import os.path
import numpy as np
import skimage
import copy
import dlib
import scipy
import tensorflow as tf

NUM_CLASSES = 2
IMAGE_SIZE = 28
IMAGE_PIXELS = IMAGE_SIZE*IMAGE_SIZE*3

def inference(images_placeholder, keep_prob):
    """ モデルを作成する関数

    引数: 
      images_placeholder: inputs()で作成した画像のplaceholder
      keep_prob: dropout率のplace_holder

    返り値:
      cross_entropy: モデルの計算結果
    """
    def weight_variable(shape):
      initial = tf.truncated_normal(shape, stddev=0.1)
      return tf.Variable(initial)

    def bias_variable(shape):
      initial = tf.constant(0.1, shape=shape)
      return tf.Variable(initial)

    def conv2d(x, W):
      return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

    def max_pool_2x2(x):
      return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                            strides=[1, 2, 2, 1], padding='SAME')
    
    x_image = tf.reshape(images_placeholder, [-1, 28, 28, 3])

    with tf.name_scope('conv1') as scope:
        W_conv1 = weight_variable([5, 5, 3, 32])
        b_conv1 = bias_variable([32])
        h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    with tf.name_scope('pool1') as scope:
        h_pool1 = max_pool_2x2(h_conv1)
    
    with tf.name_scope('conv2') as scope:
        W_conv2 = weight_variable([5, 5, 32, 64])
        b_conv2 = bias_variable([64])
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    with tf.name_scope('pool2') as scope:
        h_pool2 = max_pool_2x2(h_conv2)

    with tf.name_scope('fc1') as scope:
        W_fc1 = weight_variable([7*7*64, 1024])
        b_fc1 = bias_variable([1024])
        h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
        h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
        h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    with tf.name_scope('fc2') as scope:
        W_fc2 = weight_variable([1024, NUM_CLASSES])
        b_fc2 = bias_variable([NUM_CLASSES])

    with tf.name_scope('softmax') as scope:
        y_conv=tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)

    return y_conv

# mitra_sun22のログイン情報
f = open('config.txt')
data = f.read()
f.close()
lines = data.split('\n')

# 顔検出器
detector = dlib.simple_object_detector("detector.svm")

images_placeholder = tf.placeholder("float", shape=(None, IMAGE_PIXELS))
labels_placeholder = tf.placeholder("float", shape=(None, NUM_CLASSES))
keep_prob = tf.placeholder("float")

logits = inference(images_placeholder, keep_prob)
sess = tf.InteractiveSession()

saver = tf.train.Saver()
sess.run(tf.initialize_all_variables())
saver.restore(sess, "model.ckpt")

# エンコード設定
reload(sys)
sys.setdefaultencoding('utf-8')

def get_oauth():
	consumer_key = lines[0]
	consumer_secret = lines[1]
	access_key = lines[2]
	access_secret = lines[3]
	auth = OAuthHandler(consumer_key, consumer_secret)
	auth.set_access_token(access_key, access_secret)
	return auth

class StreamListener(StreamListener):
    # ツイートされるたびにここが実行される
    def on_status(self, status):
        if status.in_reply_to_screen_name=='mitra_sun22':
            if status.entities.has_key('media') :
                text = re.sub(r'@mitra_sun22 ', '', status.text)
                text = re.sub(r'(https?|ftp)(://[\w:;/.?%#&=+-]+)', '', text)
                medias = status.entities['media']
                m =  medias[0]
                media_url = m['media_url']
                print media_url
                now = datetime.datetime.now()
                time = now.strftime("%H%M%S")
                filename = '{}.jpg'.format(time)
                try:
                    urllib.urlretrieve(media_url, filename)
                except IOError:
                    print "保存に失敗しました"

                frame = cv2.imread(filename)
                img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                #顔の検出
                dets = detector(img)
                height, width = img.shape[:2]
                flag = True
                #顔が見つかった場合は顔領域だけについて判定
                if len(dets) > 0:
                    flag = False
                    d = dets[0] # 一番大きいものだけを調べる仕様にした
                    # 顔の領域がおかしい場合のチェック
                    if d.top()<0 or d.bottom()>height or  d.left()<0 or d.right()>width:
                        flag = True
                    else:
                        image = frame[d.top():d.bottom(), d.left():d.right()]
                        margin = min((d.bottom()-d.top())/4, d.top(), height-d.bottom(), d.left(), width-d.right())
                        icon = frame[d.top()-margin:d.bottom()+margin, d.left()-margin:d.right()+margin]
                        #顔部分を白枠で囲む
                        cv2.rectangle(frame, (d.left(), d.top()), (d.right(), d.bottom()), (255, 255, 255), 2)
                        cv2.imwrite(filename, frame)

                if flag: #顔が見つからない場合には全体について判定する
                    image = frame
                    cv2.imwrite("original.jpg", image)
                # 形式を変換
                img = cv2.resize(img.copy(), (28, 28))
                ximage = img.flatten().astype(np.float32)/255.0

                pred = np.argmax(logits.eval(feed_dict={ 
                    images_placeholder: [ximage],
                    keep_prob: 1.0 })[0])

                if pred==0: #動画工房の場合
                    print "動画工房です"
                    message = '.@'+status.author.screen_name+' 動画工房です'
                else:
                    print "動画工房ではありません"
                    message = '.@'+status.author.screen_name+' 動画工房ではありません'
                message = message.decode("utf-8")
                try:
                    #画像をつけてリプライ
                    api.update_with_media(filename, status=message, in_reply_to_status_id=status.id)
                except TweepError, e:
                    print "error response code: " + str(e.response.status)
                    print "error message: " + str(e.response.reason)

# streamingを始めるための準備
auth = get_oauth()
api = API(auth)
stream = Stream(auth, StreamListener(), secure=True)
print "Start Streaming!"
stream.userstream()

というわけでTensorFlowを使って何かやってみるお話でした。


以下アフィリンク。

ゆるゆり (1) 新装版 (IDコミックス 百合姫コミックス)

ゆるゆり (1) 新装版 (IDコミックス 百合姫コミックス)

大室家: 1 (百合姫コミックス)

大室家: 1 (百合姫コミックス)