
[Part 14] Simplified Volume Calculation Algorithm: Input/Output Study 1

Overview of This Post

  • Change the output value to volume (VGG19_train1.py)
  • Change the regression value to a one-hot vector (VGG_train2.py)

Work Details

Based on the analysis in the previous post, we drew up an improvement strategy, and we will now carry it out step by step.

Changing the Output to Volume

Until now we treated width, depth, and height as three separate outputs; to avoid the errors of the three predictions compounding when they are multiplied together, we now have the network output the volume directly.
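As a quick illustration of the compounding (with hypothetical numbers): a 5% relative error on each of the three dimensions grows to roughly 16% once they are multiplied into a volume.

python:error_compounding_example.py
# Illustrative sketch only, with hypothetical error values: relative errors
# on width, depth, and height compound multiplicatively in the volume.
w_err = d_err = h_err = 0.05  # assume 5% relative error per dimension

# (1+e_w)(1+e_d)(1+e_h) - 1 is the relative error of the product when all
# three errors point the same way
vol_err = (1 + w_err) * (1 + d_err) * (1 + h_err) - 1
print("volume relative error: %.3f" % vol_err)  # -> 0.158, about 15.8%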
Using Optuna, we wrote a program that searches for the hyperparameters minimizing the mean of the per-fold loss values produced by cross-validation.
The source code is shown below (VGG19_train1.py).

python:VGG19_train1.py
import numpy as np
import os
import re
import csv
from sklearn.model_selection import KFold
import keras.backend as K
import optuna

from keras.models import Sequential, Model
from keras.layers import Input, Dense, Dropout, Activation, Flatten
from keras.preprocessing.image import img_to_array, load_img
from keras.applications import VGG19
from keras.callbacks import ModelCheckpoint, EarlyStopping

import tensorflow as tf

batch_size = 8
nb_epochs = 1000
fold_num = 5

def list_pictures(directory, ext='jpg|jpeg|bmp|png|ppm'):
    return [os.path.join(root, f)
            for root, _, files in os.walk(directory) for f in files
            if re.match(r'([\w]+\.(?:' + ext + '))', f.lower())]

def ResNet_dance(activation = "sigmoid",optimizer="adam",hidden_neurons1 = 1000,out_neurons = 1):
    # Despite the function/variable names, the base network here is VGG19
    # pretrained on ImageNet; the whole network is trained end to end
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
    sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))
    K.set_session(sess)

    input_tensor = Input(shape=(224, 224, 3))
    resnet50 = VGG19(include_top=False, weights='imagenet', input_tensor=input_tensor)
    top_model = Sequential()
    top_model.add(Flatten(input_shape=resnet50.output_shape[1:]))
    top_model.add(Dropout(0.5))
    top_model.add(Dense(hidden_neurons1))
    top_model.add(Activation(activation))
    top_model.add(Dropout(0.5))
    top_model.add(Dense(out_neurons))
    top_model.add(Activation("linear"))
    #top_model.summary()

    model = Model(inputs=resnet50.input, outputs=top_model(resnet50.output))
    model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer=optimizer)
    #model.summary()

    return model

def objective(trial):

    # Hyperparameters to optimize

    # Number of units in hidden layer 1
    hidden_neurons1 = int(trial.suggest_discrete_uniform("hidden_neurons1", 100, 5000, 100))

    # Optimizer
    optimizer = trial.suggest_categorical("optimizer", ["sgd", "adam", "rmsprop"])

    # Activation for the hidden layer
    activation = trial.suggest_categorical("activation", ["linear", "sigmoid"])

    loss_all = []

    for train, test in kf.split(x_train):

        es = EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=1, mode='auto')
        modelCheckpoint = ModelCheckpoint(filepath = 'Checkpoint.h5',
                                              monitor='val_loss',
                                              verbose=1,
                                              save_best_only=True,
                                              save_weights_only=False,
                                              mode='min',
                                              period=1)

        model = ResNet_dance(activation = activation ,optimizer=optimizer,hidden_neurons1 = hidden_neurons1,out_neurons = 1)

        history = model.fit(x_train[train],
                           y_train[train],
                           batch_size=batch_size,
                           epochs=nb_epochs,
                           verbose=1,
                           validation_data=(x_train[test], y_train[test]),
                           shuffle=True,
                           callbacks=[modelCheckpoint,es])

        # Record the best (minimum) validation loss reached in this fold
        loss_all.append(min(history.history["val_loss"]))

    print(loss_all)
    print("%.4f (+/- %.4f)" % (np.mean(loss_all), np.std(loss_all)))

    return np.mean(loss_all)

X = []
Y = np.zeros((1000,1))  # assumes a dataset of 1000 images
Y_pre = []

picture_name = []
path = './test_dataset/'
# File names begin with a numeric ID (the part before '_0_'), which is used
# to look up the matching label row in DB.csv
for picture in list_pictures(path):
    picture_name.append(float(picture[picture.find(path)+len(path):picture.find('_0_')]))
    img = img_to_array(load_img(picture, target_size=(224,224)))
    X.append(img)

with open('DB.csv',encoding="utf-8_sig") as f:
    for row in csv.reader(f, quoting=csv.QUOTE_NONNUMERIC):
        Y_pre.append(row)

Y_pre = np.array(Y_pre)
# Column index 4 of the matching DB.csv row holds the target value (the volume)
for i,name in enumerate(picture_name):
    Y[i,:]=Y_pre[np.where(Y_pre==name)[0],4]
X = np.asarray(X)
X = X / 255.0

print(X.shape)
print(Y.shape)

# All samples stay in the training set; cross-validation below handles evaluation
# (train_test_split with test_size=0 may be rejected by newer scikit-learn versions)
x_train, y_train = X, Y
print(x_train.shape, y_train.shape)

kf = KFold(n_splits=fold_num, shuffle=True)

study = optuna.create_study()
study.optimize(objective, n_trials=50)
print("End!!")
print("All trials")
print(study.trials)
print("Best Parameters")
print(study.best_params)
print("Best Value")
print(study.best_value)

print("end of script")

However, running this script as-is puts a heavy load on memory, and depending on the environment it may fail to run.
To avoid this, it is better to run each image through VGG19 once, save the flattened features to a CSV file, and at training time load only those features and train just the fully connected layers. This saves a large amount of memory.
The source code that passes the images through VGG19, extracts the 25088-dimensional flattened feature vector for each one, and saves it is shown below (img2tensor.py).

python:img2tensor.py
import numpy as np
import os
import re
import csv

from keras.models import Model
from keras.layers import Input
from keras.applications import VGG19
from keras.preprocessing.image import img_to_array, load_img

def list_pictures(directory, ext='jpg|jpeg|bmp|png|ppm'):
    return [os.path.join(root, f)
            for root, _, files in os.walk(directory) for f in files
            if re.match(r'([\w]+\.(?:' + ext + '))', f.lower())]

X = []
Y = np.zeros((1000,1))
Y_pre = []

picture_name = []
path = './test_dataset/'
for picture in list_pictures(path):
    picture_name.append(float(picture[picture.find(path)+len(path):picture.find('_0_')]))
    img = img_to_array(load_img(picture, target_size=(224,224)))
    X.append(img)

with open('DB.csv',encoding="utf-8_sig") as f:
    for row in csv.reader(f, quoting=csv.QUOTE_NONNUMERIC):
        Y_pre.append(row)

Y_pre = np.array(Y_pre)
for i,name in enumerate(picture_name):
    Y[i,:]=Y_pre[np.where(Y_pre==name)[0],4]
X = np.asarray(X)
X = X / 255.0

print(X.shape)
print(Y.shape)

input_tensor = Input(shape=(224, 224, 3))
vgg19 = VGG19(include_top=True, weights='imagenet', input_tensor=input_tensor)

# Truncate VGG19 at its 'flatten' layer to obtain one 25088-dimensional feature vector per image
model = Model(inputs=vgg19.input, outputs=vgg19.get_layer("flatten").output)

model.summary()

# Predict in batches so the whole dataset does not have to fit on the GPU at once
predict = model.predict(X, batch_size=32, verbose=1)
print(predict.shape)

Y_post = []
X_post = []

with open('X_data.csv', 'w', newline="") as f:
    writer = csv.writer(f)
    writer.writerows(predict)

with open('Y_data.csv', 'w', newline="") as f:
    writer = csv.writer(f)
    writer.writerows(Y)

with open('X_data.csv') as f:
    for row in csv.reader(f):
        X_post.append(row)

with open('Y_data.csv') as f:
    for row in csv.reader(f):
        Y_post.append(row)

# csv.reader yields strings, so convert back to float to verify the round trip
X_post = np.array(X_post, dtype=np.float32)
Y_post = np.array(Y_post, dtype=np.float32)

print(X_post.shape)
print(Y_post.shape)
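As an aside, if the CSV round trip turns out to be slow for 25088-dimensional rows, NumPy's binary format is a compact alternative. This is only a suggestion, not part of the pipeline above, and the file names here are hypothetical.

python:npy_alternative.py
# Optional sketch (not used in the scripts above): store the extracted
# features in NumPy's binary .npy format instead of CSV. It is smaller on
# disk, loads much faster, and preserves the float dtype exactly.
import numpy as np

features = np.random.rand(1000, 25088).astype(np.float32)  # stand-in for `predict`
labels = np.random.rand(1000, 1).astype(np.float32)        # stand-in for `Y`

np.save('X_data.npy', features)  # hypothetical file names
np.save('Y_data.npy', labels)

X_loaded = np.load('X_data.npy')
Y_loaded = np.load('Y_data.npy')
print(X_loaded.shape, Y_loaded.shape)  # (1000, 25088) (1000, 1)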

We also revised the Optuna program so that it trains from these CSV files, tuning it along the way. The source code is shown below (VGG19_train1_2.py).

python:VGG19_train1_2.py
import numpy as np
import csv
from sklearn.model_selection import KFold
import keras.backend as K
import optuna

from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation
from keras.callbacks import EarlyStopping
from keras import optimizers

import tensorflow as tf

batch_size = 32
nb_epochs = 1000
fold_num = 5

gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))
K.set_session(sess)

def ResNet_dance(activation = "sigmoid",optimizer="adam",hidden_neurons1 = 1000,hidden_neurons2 = 1000,out_neurons = 1):
    # Despite the name kept from the earlier script, this is now a plain fully
    # connected head trained on the precomputed VGG19 features
    model = Sequential()
    model.add(Dense(hidden_neurons1, input_shape= (25088, )))
    model.add(Activation(activation))
    model.add(Dropout(0.3))
    model.add(Dense(hidden_neurons2))
    model.add(Activation(activation))
    model.add(Dropout(0.3))
    model.add(Dense(out_neurons))
    model.add(Activation("linear"))

    if optimizer == "sgd":
        model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer=optimizers.SGD(lr=0.0001))
    elif optimizer == "adam":
        model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer=optimizers.Adam(lr=0.0001))
    elif optimizer == "rmsprop":
        model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer=optimizers.RMSprop(lr=0.0001))

    return model

def objective(trial):

    # Hyperparameters to optimize

    # Number of units in hidden layers 1 and 2
    hidden_neurons1 = int(trial.suggest_discrete_uniform("hidden_neurons1", 100, 5000, 100))

    hidden_neurons2 = int(trial.suggest_discrete_uniform("hidden_neurons2", 100, 5000, 100))

    #optimizer
    optimizer = trial.suggest_categorical("optimizer", ["sgd", "adam", "rmsprop"])

    activation = trial.suggest_categorical("activation", ["linear", "sigmoid"])

    loss_all = []

    for train, test in kf.split(X):

        es = EarlyStopping(monitor='val_loss', min_delta=0, patience=20, verbose=1, mode='auto')
        # (The ModelCheckpoint callback from VGG19_train1.py is omitted in this version)

        model = ResNet_dance(activation = activation ,optimizer=optimizer,hidden_neurons1 = hidden_neurons1,hidden_neurons2 = hidden_neurons2,out_neurons = 1)

        history = model.fit(X[train],
                           Y[train],
                           batch_size=batch_size,
                           epochs=nb_epochs,
                           verbose=1,
                           validation_data=(X[test], Y[test]),
                           shuffle=True,
                           callbacks=[es])

        # Record the best (minimum) validation loss reached in this fold
        fold_loss = min(history.history["val_loss"])
        loss_all.append(fold_loss)
        print("Loss: " + str(fold_loss))

    print(loss_all)
    print("%.4f (+/- %.4f)" % (np.mean(loss_all), np.std(loss_all)))

    return np.mean(loss_all)

X = []
Y = []

with open('X_data.csv') as f:
    for row in csv.reader(f):
        X.append(row)

with open('Y_data.csv') as f:
    for row in csv.reader(f):
        Y.append(row)

# csv.reader yields strings, so convert explicitly to float arrays
X = np.array(X, dtype=np.float32)
Y = np.array(Y, dtype=np.float32)

print(X.shape)
print(Y.shape)

kf = KFold(n_splits=fold_num, shuffle=True)

study = optuna.create_study()
study.optimize(objective, n_trials=50)
print("End!!")
print("All trials")
print(study.trials)
print("Best Parameters")
print(study.best_params)
print("Best Value")
print(study.best_value)

print("end of script")

Changing the Regression Value to a One-Hot Vector

Above, we tracked accuracy and loss against a regression target, but for this kind of neural network, using one-hot vectors is considered an effective way to improve robustness.
We therefore added one-hot vector support to the program above: each target volume is assigned to one of 200 bins of width 0.005 (a decoding sketch follows the listing).
This limits the range of sizes the model can recognize, a drawback we plan to address in future posts.
The source code is shown below (VGG_train2.py).

python:VGG_train2.py
import numpy as np
import csv
from sklearn.model_selection import KFold
import keras.backend as K
import optuna

from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation
from keras.callbacks import EarlyStopping
from keras import optimizers

import tensorflow as tf

batch_size = 8
nb_epochs = 1000
fold_num = 5

gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))
K.set_session(sess)

def ResNet_dance(activation = "sigmoid",optimizer="adam",hidden_neurons1 = 1000,hidden_neurons2 = 1000,out_neurons = 200):
    # Same fully connected head as before, but the softmax output now predicts
    # one of 200 volume bins
    model = Sequential()
    model.add(Dense(hidden_neurons1, input_shape= (25088, )))
    model.add(Activation(activation))
    model.add(Dropout(0.3))
    model.add(Dense(hidden_neurons2))
    model.add(Activation(activation))
    model.add(Dropout(0.3))
    model.add(Dense(out_neurons))
    model.add(Activation("softmax"))

    if optimizer == "sgd":
        model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer=optimizers.SGD(lr=0.0001))
    elif optimizer == "adam":
        model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer=optimizers.Adam(lr=0.0001))
    elif optimizer == "rmsprop":
        model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer=optimizers.RMSprop(lr=0.0001))

    return model

def objective(trial):

    # Hyperparameters to optimize

    # Number of units in hidden layers 1 and 2
    hidden_neurons1 = int(trial.suggest_discrete_uniform("hidden_neurons1", 100, 5000, 100))

    hidden_neurons2 = int(trial.suggest_discrete_uniform("hidden_neurons2", 100, 5000, 100))

    #optimizer
    optimizer = trial.suggest_categorical("optimizer", ["sgd", "adam", "rmsprop"])

    activation = trial.suggest_categorical("activation", ["linear", "sigmoid"])

    loss_all = []

    for train, test in kf.split(X):

        es = EarlyStopping(monitor='val_loss', min_delta=0, patience=20, verbose=1, mode='auto')
        # (The ModelCheckpoint callback from VGG19_train1.py is omitted in this version)

        model = ResNet_dance(activation = activation ,
                             optimizer=optimizer,
                             hidden_neurons1 = hidden_neurons1,
                             hidden_neurons2 = hidden_neurons2,
                             out_neurons = 200)

        history = model.fit(X[train],
                           Y[train],
                           batch_size=batch_size,
                           epochs=nb_epochs,
                           verbose=1,
                           validation_data=(X[test], Y[test]),
                           shuffle=True,
                           callbacks=[es])

        # Record the best (minimum) validation loss reached in this fold
        loss_all.append(min(history.history["val_loss"]))

    print(loss_all)
    print("%.4f (+/- %.4f)" % (np.mean(loss_all), np.std(loss_all)))

    return np.mean(loss_all)

X = []
Y_pre = []
Y = np.zeros((1000,200))

with open('X_data.csv') as f:
    for row in csv.reader(f):
        X.append(row)

with open('Y_data.csv') as f:
    for row in csv.reader(f):
        Y_pre.append(row)

# csv.reader yields strings, so convert explicitly to float arrays
X = np.array(X, dtype=np.float32)
Y_pre = np.array(Y_pre, dtype=np.float32)

# Bin each target volume into one of 200 one-hot classes of width 0.005
for i,num in enumerate(Y_pre):
    Y[i,int(num[0]/0.005)-1]=1

print(X.shape)
print(Y.shape)

kf = KFold(n_splits=fold_num, shuffle=True)

study = optuna.create_study()
study.optimize(objective, n_trials=50)
print("End!!")
print("All trials")
print(study.trials)
print("Best Parameters")
print(study.best_params)
print("Best Value")
print(study.best_value)

print("end of script")

Plans for Next Time

We will add image size information to the input and perform ensemble learning.

