Data Augmentation
The roughly 100 images available for training and validation are not enough, so we expand the dataset with data augmentation.
We created ten combinations of color shift, brightness change, and horizontal flip and applied them to every image, expanding the 100 original images into 1,000.
This gives us enough data to run cross-validation without problems.
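The core of this augmentation is Keras's ImageDataGenerator. As a minimal sketch of the settings used here (color shift, brightness range, and horizontal flip), the snippet below generates ten variants of a single image; the file name sample.jpg and the output directory ./augmented/ are placeholders for illustration, not part of the actual dataset.

```python
# Minimal sketch: generate 10 augmented variants of one image.
# "sample.jpg" and "./augmented/" are placeholder names; the directory must already exist.
import numpy as np
from keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img

datagen = ImageDataGenerator(rescale=1.0 / 255,
                             channel_shift_range=5.,        # small color shift
                             brightness_range=[0.3, 1.0],   # darken / brighten
                             horizontal_flip=True)          # random left-right flip

img = img_to_array(load_img('sample.jpg', target_size=(224, 224)))
x = img[np.newaxis]                      # shape (1, 224, 224, 3)
gen = datagen.flow(x, batch_size=1, save_to_dir='./augmented/',
                   save_prefix='sample', save_format='jpg')
for _ in range(10):                      # 10 variants per image -> 100 images become 1000
    next(gen)
```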
The full program source code is shown below.
Source code for the data augmentation
```python:augmentation.py
import keras
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import csv
from keras.utils import np_utils
from keras.preprocessing.image import array_to_img, img_to_array, load_img, ImageDataGenerator


def list_pictures(directory, ext='jpg|jpeg|bmp|png|ppm'):
    return [os.path.join(root, f)
            for root, _, files in os.walk(directory) for f in files
            if re.match(r'([\w]+\.(?:' + ext + '))', f.lower())]


X = []
Y = []
save_path = './test_dataset/'
path = './dataset/'
picture_name = []

# Load the original images and remember their file names (used as indices)
for picture in list_pictures('./dataset/'):
    picture_name.append(picture[picture.find(path) + len(path):picture.find('.jpg')])
    img = img_to_array(load_img(picture, target_size=(224, 224)))
    X.append(img)

# Load the label CSV
with open('DB.csv', encoding="utf-8_sig") as f:
    reader = csv.reader(f)
    for row in reader:
        Y.append(row)

# Color shift, brightness change and horizontal flip
train_datagen = ImageDataGenerator(rescale=1.0 / 255,
                                   channel_shift_range=5.,
                                   brightness_range=[0.3, 1.0],
                                   horizontal_flip=True)

Y = np.array(Y)
X = np.asarray(X)
print(X.shape)

output_Y = []
for j in range(100):
    print('Generate:' + str(int(picture_name[j]) - 1))
    genperimg = 10  # number of augmented images generated per original image
    a = X[j, :, :, :]
    x = a[np.newaxis]
    y0 = Y[int(picture_name[j]) - 1, :]
    y = y0[np.newaxis]
    train_generator = train_datagen.flow(x, batch_size=1,
                                         save_to_dir=save_path,
                                         save_prefix=str(int(Y[int(picture_name[j]) - 1, 0])),
                                         save_format='jpg')
    for i in range(genperimg):
        next(train_generator)  # writes one augmented image to save_path
        # Duplicate the label row for each generated image
        if j == 0 and i == 0:
            output_Y = y
        else:
            output_Y = np.concatenate([output_Y, y])

print(output_Y)

# Write the expanded label table (one row per generated image)
with open('BD_ex.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerows(output_Y)

print("end of script")
```
Adding Features to the Training Program
The training program written two posts ago did not implement cross-validation or hyperparameter tuning, so I have added both.
Hyperparameter tuning is done with Optuna, a package based on a Bayesian optimization algorithm.
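Optuna's workflow is compact: define an objective(trial) function that samples hyperparameters, trains a model, and returns the value to minimize, then run a study for some number of trials. Below is a minimal, self-contained sketch of that pattern; the dummy return value simply stands in for the actual model training, which in turning.py returns 1 - validation accuracy.

```python
# Minimal sketch of the Optuna workflow (dummy objective for illustration only).
import optuna


def objective(trial):
    # Sample hyperparameters the same way as in turning.py
    hidden_neurons = int(trial.suggest_discrete_uniform("hidden_neurons", 100, 2000, 50))
    optimizer = trial.suggest_categorical("optimizer", ["sgd", "adam", "rmsprop"])
    # In the real script, build and train the model here and return 1 - val_acc.
    # A dummy value is returned so the sketch runs on its own.
    return (hidden_neurons - 1000) ** 2 * 1e-6


study = optuna.create_study()           # minimizes the objective by default
study.optimize(objective, n_trials=10)  # run 10 trials
print(study.best_params, study.best_value)
```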
The complete programs are shown below.
```python:train.py
import keras
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import csv
from sklearn.model_selection import KFold
from keras.utils import np_utils
from keras.models import Sequential, Model
from keras.layers import Input, Conv2D, MaxPooling2D, Dense, Dropout, Activation, Flatten
from keras.preprocessing.image import array_to_img, img_to_array, load_img
from keras.applications import ResNet50
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras import optimizers
from sklearn.model_selection import train_test_split


def list_pictures(directory, ext='jpg|jpeg|bmp|png|ppm'):
    return [os.path.join(root, f)
            for root, _, files in os.walk(directory) for f in files
            if re.match(r'([\w]+\.(?:' + ext + '))', f.lower())]


X = []
Y = np.zeros((1000, 3))
Y_pre = []
hidden_neurons = 1000
out_neurons = 3
batch_size = 16
nb_epochs = 2
fold_num = 5

es = EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=1, mode='auto')

picture_name = []
path = './test_dataset/'

# Load the augmented images and recover the original image index from each file name
for picture in list_pictures(path):
    picture_name.append(float(picture[picture.find(path) + len(path):picture.find('_0_')]))
    img = img_to_array(load_img(picture, target_size=(224, 224)))
    X.append(img)

# Load the label CSV and match each image to its row
with open('DB.csv', encoding="utf-8_sig") as f:
    for row in csv.reader(f, quoting=csv.QUOTE_NONNUMERIC):
        Y_pre.append(row)
Y_pre = np.array(Y_pre)
for i, name in enumerate(picture_name):
    Y[i, :] = Y_pre[np.where(Y_pre == name)[0], 1:4]

X = np.asarray(X)
X = X / 255.0
print(X.shape)
print(Y.shape)

# Hold out 20% of the data as a test set
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=0, shuffle=False)
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)

# 5-fold cross-validation
kf = KFold(n_splits=fold_num, shuffle=True)
cvscores = []
count = 0
for train, test in kf.split(x_train):
    count += 1
    # Save the best model of each fold under its own file name
    modelCheckpoint = ModelCheckpoint(filepath='Checkpoint_' + str(count) + '.h5',
                                      monitor='val_loss',
                                      verbose=1,
                                      save_best_only=True,
                                      save_weights_only=False,
                                      mode='min',
                                      period=1)

    # ResNet50 as feature extractor + fully connected regression head
    input_tensor = Input(shape=(224, 224, 3))
    resnet50 = ResNet50(include_top=False, weights='imagenet', input_tensor=input_tensor)
    top_model = Sequential()
    top_model.add(Flatten(input_shape=resnet50.output_shape[1:]))
    top_model.add(Dense(hidden_neurons))
    top_model.add(Activation("sigmoid"))
    top_model.add(Dense(out_neurons))
    top_model.add(Activation("linear"))
    # top_model.summary()
    model = Model(inputs=resnet50.input, outputs=top_model(resnet50.output))
    model.compile(loss="mean_squared_error", metrics=['accuracy'], optimizer="adam")
    # model.summary()

    result = model.fit(x_train[train], y_train[train],
                       batch_size=batch_size,
                       epochs=nb_epochs,
                       verbose=1,
                       validation_data=(x_train[test], y_train[test]),
                       shuffle=True,
                       callbacks=[modelCheckpoint, es])

    model.load_weights('Checkpoint_' + str(count) + '.h5')
    scores = model.evaluate(x_train[test], y_train[test], verbose=0)
    print("%s: %.2f%%" % (model.metrics_names[1], scores[1] * 100))
    cvscores.append(scores[1] * 100)

    # result.history.keys()  # check which labels the history data contains

    # Plot the learning curves of this fold
    ep = len(result.history['acc'])
    plt.plot(range(1, ep + 1), result.history['acc'], label="training")
    plt.plot(range(1, ep + 1), result.history['val_acc'], label="validation")
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()
    plt.plot(range(1, ep + 1), result.history['loss'], label="training")
    plt.plot(range(1, ep + 1), result.history['val_loss'], label="validation")
    plt.xlabel('Epochs')
    plt.ylabel('loss')
    plt.legend()
    plt.show()

print("%.2f%% (+/- %.2f%%)" % (np.mean(cvscores), np.std(cvscores)))

# Visualization
# model.summary()

# Model evaluation: load the best model of each fold and evaluate on the held-out test set
evascores = []
for eva in range(1, fold_num + 1):
    # print(eva)
    model.load_weights('Checkpoint_' + str(eva) + '.h5')
    loss, accuracy = model.evaluate(x_test, y_test, verbose=0)
    evascores.append(accuracy * 100)
print("%.2f%% (+/- %.2f%%)" % (np.mean(evascores), np.std(evascores)))

print("end of script")
```
```python:turning.py
import keras
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import csv
from sklearn.model_selection import KFold
import keras.backend as K
import optuna
from keras.utils import np_utils
from keras.models import Sequential, Model
from keras.layers import Input, Conv2D, MaxPooling2D, Dense, Dropout, Activation, Flatten
from keras.preprocessing.image import array_to_img, img_to_array, load_img
from keras.applications import ResNet50
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras import optimizers
from sklearn.model_selection import train_test_split


def list_pictures(directory, ext='jpg|jpeg|bmp|png|ppm'):
    return [os.path.join(root, f)
            for root, _, files in os.walk(directory) for f in files
            if re.match(r'([\w]+\.(?:' + ext + '))', f.lower())]


def ResNet_dance(activation="sigmoid", optimizer="adam", hidden_neurons=1000, out_neurons=3):
    # ResNet50 feature extractor + fully connected regression head
    input_tensor = Input(shape=(224, 224, 3))
    resnet50 = ResNet50(include_top=False, weights='imagenet', input_tensor=input_tensor)
    top_model = Sequential()
    top_model.add(Flatten(input_shape=resnet50.output_shape[1:]))
    top_model.add(Dense(hidden_neurons))
    top_model.add(Activation(activation))
    top_model.add(Dense(out_neurons))
    top_model.add(Activation("linear"))
    # top_model.summary()
    model = Model(inputs=resnet50.input, outputs=top_model(resnet50.output))
    model.compile(loss="mean_squared_error", metrics=['accuracy'], optimizer=optimizer)
    # model.summary()
    return model


def objective(trial):
    K.clear_session()

    # Hyperparameters to optimize
    # Number of units in the hidden layer
    hidden_neurons = int(trial.suggest_discrete_uniform("hidden_neurons", 100, 2000, 50))
    # Optimizer
    optimizer = trial.suggest_categorical("optimizer", ["sgd", "adam", "rmsprop"])
    # Activation of the hidden layer
    activation = trial.suggest_categorical("activation", ["linear", "sigmoid"])

    model = ResNet_dance(activation=activation, optimizer=optimizer,
                         hidden_neurons=hidden_neurons, out_neurons=3)

    history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        epochs=nb_epochs,
                        verbose=1,
                        validation_split=0.2,
                        shuffle=True)

    # Search for the hyperparameters that maximize the validation accuracy
    return 1 - history.history["val_acc"][-1]


X = []
Y = np.zeros((1000, 3))
Y_pre = []
hidden_neurons = 1000
out_neurons = 3
batch_size = 16
nb_epochs = 2
fold_num = 5

es = EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=1, mode='auto')
modelCheckpoint = ModelCheckpoint(filepath='Checkpoint.h5',
                                  monitor='val_loss',
                                  verbose=1,
                                  save_best_only=True,
                                  save_weights_only=False,
                                  mode='min',
                                  period=1)

picture_name = []
path = './test_dataset/'
for picture in list_pictures(path):
    picture_name.append(float(picture[picture.find(path) + len(path):picture.find('_0_')]))
    img = img_to_array(load_img(picture, target_size=(224, 224)))
    X.append(img)

with open('DB.csv', encoding="utf-8_sig") as f:
    for row in csv.reader(f, quoting=csv.QUOTE_NONNUMERIC):
        Y_pre.append(row)
Y_pre = np.array(Y_pre)
for i, name in enumerate(picture_name):
    Y[i, :] = Y_pre[np.where(Y_pre == name)[0], 1:4]

X = np.asarray(X)
X = X / 255.0
print(X.shape)
print(Y.shape)

x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=0, shuffle=False)
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)

# Run the Optuna study (minimizes the value returned by objective)
study = optuna.create_study()
study.optimize(objective, n_trials=300)

print("End!!")
print("All trials")
print(study.trials)
print("Best Parameters")
print(study.best_params)
print("Best Value")
print(study.best_value)
print("end of script")
```
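Once the search finishes, the values reported in study.best_params (hidden_neurons, optimizer, and activation in this setup) can be fed back into train.py as the configuration for the final cross-validated training run.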
Deleting the Trash
As an aside, on AWS the files you delete are kept under ~/.local/share/Trash/files/*, so they keep eating into the EC2 instance's storage.
Deleting them periodically frees the space, but it may also be a good idea to provision somewhat more storage than you strictly need.
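If you prefer to clear it from a script, a minimal sketch along the following lines (using the path above) should work; it deletes the files permanently, so double-check the path before running.

```python
# Minimal sketch: empty the Trash directory to free EC2 storage.
# Deletes files permanently -- double-check the path before running.
import glob
import os
import shutil

for entry in glob.glob(os.path.expanduser('~/.local/share/Trash/files/*')):
    if os.path.isdir(entry):
        shutil.rmtree(entry)   # remove sub-directories recursively
    else:
        os.remove(entry)       # remove regular files
```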
Plans for Next Time
- Next time I plan to run the training and the hyperparameter tuning, and then evaluate the resulting accuracy.