iTAC_Technical_Documents

アイタックソリューションズ株式会社

[Part 9] Simple Volume Estimation: Data Augmentation and Additions to the Training Program

Data Augmentation

Since roughly 100 images are not enough for training and validation, we augment the data (Augmentation).
We created 10 combinations of color shift, brightness change, and horizontal flip and applied them to every image, expanding the 100 original images to 1,000.
This secures enough data to run cross-validation without concern.
The source code is shown below.

Source code for the data augmentation

python:augmentation.py
import keras
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import csv

from keras.utils import np_utils
from keras.preprocessing.image import array_to_img, img_to_array, load_img, ImageDataGenerator

def list_pictures(directory, ext='jpg|jpeg|bmp|png|ppm'):
    return [os.path.join(root, f)
            for root, _, files in os.walk(directory) for f in files
            if re.match(r'([\w]+\.(?:' + ext + '))', f.lower())]

X = []
Y = []
save_path = './test_dataset/'
path = './dataset/'

picture_name = []
for picture in list_pictures('./dataset/'):
    picture_name.append(picture[picture.find(path)+len(path):picture.find('.jpg')])
    img = img_to_array(load_img(picture, target_size=(224,224)))
    X.append(img)

with open('DB.csv', encoding="utf-8_sig") as f:
    reader = csv.reader(f)
    for row in reader:
        Y.append(row)

train_datagen = ImageDataGenerator(rescale=1.0 / 255, channel_shift_range=5., brightness_range=[0.3, 1.0], horizontal_flip=True)
Y = np.array(Y)
X = np.asarray(X)
print(X.shape)
output_Y = []

for j in range(100):
    print('Generate:'+str(int(picture_name[j])-1))
    genperimg = 10
    a = X[j,:,:,:]
    x = a[np.newaxis]
    y0 = Y[int(picture_name[j])-1,:]
    y  = y0[np.newaxis]

    train_generator = train_datagen.flow(x, batch_size=1, save_to_dir=save_path, save_prefix=str(int(Y[int(picture_name[j])-1,0])), save_format='jpg')

    for i in range(genperimg):
        next(train_generator)
        if j == 0 and i == 0:
            output_Y = y
        else:
            output_Y = np.concatenate([output_Y, y])

print(output_Y)
with open('BD_ex.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    # Write one label row per generated image (writerows rather than writerow,
    # so that each label ends up on its own line).
    writer.writerows(output_Y)

print("end of script")

Additions to the Training Program

The training program we created two posts ago did not implement cross-validation or hyperparameter tuning, so we added both.
Hyperparameter tuning is performed with Optuna, a package based on a Bayesian optimization algorithm.
The complete programs are shown below.

python:train.py
import keras
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import csv
from sklearn.model_selection import KFold

from keras.utils import np_utils
from keras.models import Sequential, Model
from keras.layers import Input, Conv2D, MaxPooling2D, Dense, Dropout, Activation, Flatten
from keras.preprocessing.image import array_to_img, img_to_array, load_img
from keras.applications import ResNet50
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras import optimizers
from sklearn.model_selection import train_test_split

def list_pictures(directory, ext='jpg|jpeg|bmp|png|ppm'):
    return [os.path.join(root, f)
            for root, _, files in os.walk(directory) for f in files
            if re.match(r'([\w]+\.(?:' + ext + '))', f.lower())]

X = []
Y = np.zeros((1000,3))
Y_pre = []

hidden_neurons = 1000
out_neurons = 3
batch_size = 16
nb_epochs = 2
fold_num = 5

es = EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=1, mode='auto')

picture_name = []
path = './test_dataset/'
for picture in list_pictures(path):
    picture_name.append(float(picture[picture.find(path)+len(path):picture.find('_0_')]))
    img = img_to_array(load_img(picture, target_size=(224,224)))
    X.append(img)

with open('DB.csv',encoding="utf-8_sig") as f:
    for row in csv.reader(f, quoting=csv.QUOTE_NONNUMERIC):
        Y_pre.append(row)

Y_pre = np.array(Y_pre)
for i, name in enumerate(picture_name):
    # Look up the label row whose ID (first column of DB.csv) matches the image file's prefix.
    Y[i, :] = Y_pre[np.where(Y_pre[:, 0] == name)[0], 1:4]
X = np.asarray(X)
X = X / 255.0

print(X.shape)
print(Y.shape)

x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=0, shuffle=False)
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)

kf = KFold(n_splits=fold_num, shuffle=True)
cvscores = []
count = 0

for train, test in kf.split(x_train):
    count += 1
    # Save the best weights of each fold to its own file (Checkpoint_1.h5 ... Checkpoint_5.h5)
    # so that every fold can be evaluated separately after training.
    checkpoint_path = 'Checkpoint_' + str(count) + '.h5'
    modelCheckpoint = ModelCheckpoint(filepath=checkpoint_path,
                                      monitor='val_loss',
                                      verbose=1,
                                      save_best_only=True,
                                      save_weights_only=False,
                                      mode='min',
                                      period=1)
    input_tensor = Input(shape=(224, 224, 3))
    resnet50 = ResNet50(include_top=False, weights='imagenet', input_tensor=input_tensor)
    top_model = Sequential()
    top_model.add(Flatten(input_shape=resnet50.output_shape[1:]))
    top_model.add(Dense(hidden_neurons))
    top_model.add(Activation("sigmoid"))
    top_model.add(Dense(out_neurons))
    top_model.add(Activation("linear"))
    #top_model.summary()

    model = Model(inputs=resnet50.input, outputs=top_model(resnet50.output))
    model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer="adam")
    #model.summary()
    result = model.fit(x_train[train],
                       y_train[train],
                       batch_size=batch_size,
                       epochs=nb_epochs,
                       verbose=1,
                       validation_data=(x_train[test], y_train[test]),
                       shuffle=True,
                       callbacks=[modelCheckpoint,es])
    
    model.load_weights(checkpoint_path)
    scores = model.evaluate(x_train[test], y_train[test], verbose=0)
    print("%s: %.2f%%" % (model.metrics_names[1], scores[1]*100))
    cvscores.append(scores[1] * 100)
        
    result.history.keys() # inspect the available history keys
    ep =len(result.history['acc'])
    plt.plot(range(1, ep+1), result.history['acc'], label="training")
    plt.plot(range(1, ep+1), result.history['val_acc'], label="validation")
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()
    plt.plot(range(1, ep+1), result.history['loss'], label="training")
    plt.plot(range(1, ep+1), result.history['val_loss'], label="validation")
    plt.xlabel('Epochs')
    plt.ylabel('loss')
    plt.legend()
    plt.show()
    
print("%.2f%% (+/- %.2f%%)" % (np.mean(cvscores), np.std(cvscores)))

# Visualization
#model.summary()
evascores = []
# Model evaluation: load each fold's best checkpoint and score it on the test set
for eva in range(1, 6):
    #print(eva)
    model.load_weights('Checkpoint_' + str(eva) + '.h5')
    loss, accuracy = model.evaluate(x_test, y_test, verbose=0)
    evascores.append(accuracy * 100)
print("%.2f%% (+/- %.2f%%)" % (np.mean(evascores), np.std(evascores)))
print("end of script")

python:tuning.py
import keras
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import csv
from sklearn.model_selection import KFold
import keras.backend as K
import optuna

from keras.utils import np_utils
from keras.models import Sequential, Model
from keras.layers import Input, Conv2D, MaxPooling2D, Dense, Dropout, Activation, Flatten
from keras.preprocessing.image import array_to_img, img_to_array, load_img
from keras.applications import ResNet50
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras import optimizers
from sklearn.model_selection import train_test_split

def list_pictures(directory, ext='jpg|jpeg|bmp|png|ppm'):
    return [os.path.join(root, f)
            for root, _, files in os.walk(directory) for f in files
            if re.match(r'([\w]+\.(?:' + ext + '))', f.lower())]

def ResNet_dance(activation = "sigmoid",optimizer="adam",hidden_neurons = 1000,out_neurons = 3):
    input_tensor = Input(shape=(224, 224, 3))
    resnet50 = ResNet50(include_top=False, weights='imagenet', input_tensor=input_tensor)
    top_model = Sequential()
    top_model.add(Flatten(input_shape=resnet50.output_shape[1:]))
    top_model.add(Dense(hidden_neurons))
    top_model.add(Activation(activation))
    top_model.add(Dense(out_neurons))
    top_model.add(Activation("linear"))
    #top_model.summary()

    model = Model(inputs=resnet50.input, outputs=top_model(resnet50.output))
    model.compile(loss="mean_squared_error", metrics = ['accuracy'], optimizer=optimizer)
    #model.summary()
    
    return model

def objective(trial):

    K.clear_session()

    # Hyperparameters to optimize

    # Number of units in the hidden layer
    hidden_neurons = int(trial.suggest_discrete_uniform("hidden_neurons", 100, 2000, 50))

    # Optimizer
    optimizer = trial.suggest_categorical("optimizer", ["sgd", "adam", "rmsprop"])

    # Activation function of the hidden layer
    activation = trial.suggest_categorical("activation", ["linear", "sigmoid"])

    model = ResNet_dance(activation=activation, optimizer=optimizer,
                         hidden_neurons=hidden_neurons, out_neurons=3)

    history = model.fit(x_train,
                       y_train,
                       batch_size=batch_size,
                       epochs=nb_epochs,
                       verbose=1,
                       validation_split=0.2,
                       shuffle=True)

    # Optuna minimizes the returned value, so return 1 - validation accuracy
    # to find the hyperparameters that maximize accuracy on the validation data
    return 1 - history.history["val_acc"][-1]

X = []
Y = np.zeros((1000,3))
Y_pre = []

hidden_neurons = 1000
out_neurons = 3
batch_size = 16
nb_epochs = 2
fold_num = 5

es = EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=1, mode='auto')
modelCheckpoint = ModelCheckpoint(filepath = 'Checkpoint.h5',
                                  monitor='val_loss',
                                  verbose=1,
                                  save_best_only=True,
                                  save_weights_only=False,
                                  mode='min',
                                  period=1)
picture_name = []
path = './test_dataset/'
for picture in list_pictures(path):
    picture_name.append(float(picture[picture.find(path)+len(path):picture.find('_0_')]))
    img = img_to_array(load_img(picture, target_size=(224,224)))
    X.append(img)

with open('DB.csv',encoding="utf-8_sig") as f:
    for row in csv.reader(f, quoting=csv.QUOTE_NONNUMERIC):
        Y_pre.append(row)

Y_pre = np.array(Y_pre)
for i, name in enumerate(picture_name):
    # Look up the label row whose ID (first column of DB.csv) matches the image file's prefix.
    Y[i, :] = Y_pre[np.where(Y_pre[:, 0] == name)[0], 1:4]
X = np.asarray(X)
X = X / 255.0

print(X.shape)
print(Y.shape)

x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=0, shuffle=False)
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)

study = optuna.create_study()
study.optimize(objective, n_trials=300)
print("End!!")
print("All trials")
print(study.trials)
print("Best Parameters")
print(study.best_params)
print("Best Value")
print(study.best_value)

print("end of script")

Deleting the Trash

As an aside, on AWS the files you delete are kept under ~/.local/share/Trash/files/*, so they continue to take up the EC2 instance's storage.
Removing them periodically frees the space, but it may also be worth provisioning some extra storage in advance.
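
For reference, a small Python sketch like the one below (a hypothetical helper script, not part of the original post) reports how much space the trash folder is using and can clear it:

python:check_trash.py
import shutil  # only needed for the optional cleanup below
from pathlib import Path

# Location where deleted files accumulate on the instance.
trash = Path.home() / '.local/share/Trash/files'

if trash.exists():
    size_mb = sum(f.stat().st_size for f in trash.rglob('*') if f.is_file()) / 1e6
    print('Trash currently holds about %.1f MB' % size_mb)
    # Uncomment to actually free the space:
    # shutil.rmtree(trash)
else:
    print('No trash directory found')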

Plans for the Next Post

  • In the next post, we plan to run the training and parameter tuning, and then evaluate the accuracy.
