- 今回の記事の目的
- SSDモデルの構築
- 学習データ,評価データの取り込みと閾値設定
- 誤差関数と学習の設定
- 評価結果の設定
- 評価結果の出力
- 学習の実行と検出
- 検出結果とその考察
- まとめ
- 参考文献
- プログラムコード全体
今回の記事の目的
今回は前回に引き続きFine tuningを用いてSSDの学習モデルを作成したいと思います.
前回までの記事では,Fine-tuningを行うための準備として,学習データの作成とその取り込み,そして学習データの水増しを行いました.
今回は実際にFine-tuningの実行をしてみたいと思います.
SSDモデルの構築
前回の記述したようにSSDの各層では,bounding boxの矩形領域と矩形領域内の物体のクラスの候補を出力しています.そのためVGG16でのFine-tuningの時とは異なり,各層でクラス候補出力しているため,層の特定が容易ではありません.
そこでChainerを用いてSSDをFine-tuningする方法の概要として,以下のような方法をとります.
- SSDの学習済みモデルと重みが初期化されたSSDのモデルをそれぞれインポート
- 初期化されたモデルは,Fine-tuningで作成したいモデルのクラス数
(今回の場合,男女を識別したいので,初期化されたモデルのクラス数は2) - 学習済みモデルと初期化したSSDのモデルの差分を抽出
- 2つのモデルでの違いは,重みの有無とクラス数が違う.
(学習済みモデルのクラス数は20) - クラス数が違う部分以外の部分を学習済みモデルから初期化したSSDのモデルに移動
少し抽象的なので,具体的なコードで説明していきたいと思います.
# 学習済みモデルと初期化したモデルのクラス数が異なる時にスキップする層を指定する関数 def get_shape_mismatch_names(src, dst): mismatch_names = [] src_params = {p[0]: p[1] for p in src.namedparams()} # クラス数が違うところだけをmismatch_namesにappend→出力 for dst_named_param in dst.namedparams(): name = dst_named_param[0] dst_param = dst_named_param[1] src_param = src_params[name] if src_param.shape != dst_param.shape: mismatch_names.append(name) return mismatch_names label = ['man','woman'] # pre-trainedモデルの重みの一部を使う. src = SSD300(pretrained_model='voc0712') # dstとsrcでクラス数が異なる dst = SSD300(n_fg_class=len(label)) # dstの重みをとりあえず全ての層を初期化 dst(np.zeros((1, 3, dst.insize, dst.insize), dtype=np.float32)) ignore_names = get_shape_mismatch_names(src, dst) # ignore_names以外の層の重みをsrcからdstに移動 src_params = {p[0]: p[1] for p in src.namedparams()} for dst_named_param in dst.namedparams(): name = dst_named_param[0] if name not in ignore_names: dst_named_param[1].array[:] = src_params[name].array[:] # 読み込まれているかチェック. # 読み込まれていない場合のみ出力としてAssertionエラーがでる. np.testing.assert_equal(dst.extractor.conv1_1.W.data, src.extractor.conv1_1.W.data) # スキップした層の名前を出力 print(ignore_names)
上のプログラムではまず,学習済みモデルと初期化したモデルを作成しています.
src
には学習済みモデルとして前回の記事同様PASCAL VOC2007/2012を学習したモデルを用いています.
dst
にはクラス数として2(man
,woman
)を指定し,np.zerosを入力することで初期化します.
そして,これらをget_shape_mismatch_names
に入力することで,src
の重みを継承しない層を特定します.
get_shape_mismatch_names
では,src_params
には層の名前と重みを辞書型として格納します.for文内でパラメータの大きさが違うところのみ学習しない層の名前を配列に格納して出力しています.
そしてignore_names
以外の層の重みをdst
に移動します.
最後にnp.testing.assert_equal
でちゃんと重みが移動されているかテストをします.
学習データ,評価データの取り込みと閾値設定
学習を行う前に,以下のコードで学習データや評価データを取り込み,閾値の設定を行います.
gpu = 0 batchsize = 16 model = dst # score_threshとnms_threshの値を変更 model.use_preset('evaluate') # 学習にGPUの設定 chainer.cuda.get_device_from_id(gpu).use() model.to_gpu() # 学習データの取り込み・水増し train_dataset = originalDataset(split='train') train = TransformDataset(train_dataset,('img', 'mb_loc', 'mb_label'),Transform(model.coder, model.insize, model.mean)) train_iter = chainer.iterators.MultiprocessIterator(train, batchsize) # 評価データの取り込み valid = originalDataset(split='valid') valid_iter = chainer.iterators.SerialIterator(valid, batchsize, repeat=False, shuffle=False)
まず,model.use_preset
で閾値の設定をしています.
この関数では引数としてvisualize
とevaluate
を与えることができ,これにより,クラスの閾値(score_thresh)とbounding boxの閾値(nms_thresh)を設定することができます.
具体的に閾値は以下が設定されますが,学習の段階では,evaluate
の指定で大丈夫だと思います.
* visualize
:nms_thresh=0.45, score_thresh=0.6
* evaluate
:nms_thresh=0.45, score_thresh=0.01
次に以下のコードの部分では学習にGPUを使うための設定をしています.
chainer.cuda.get_device_from_id(gpu).use() model.to_gpu()
そして最後に学習データと評価データの取り込みをしています.具体的には前回の記事で説明した通りです.
そしてそれぞれ最後にイテレーターというものを作成しています.
イテレーターとは,リスト等に対して順番にアクセスするためのもので,これを用いると順番にリスト等を辿ることができます.
具体的な説明は省きますが,この後学習の設定等を行うときに学習データの入力としてイテレーターでの入力が要求されるため,今回はこれを定義します.
ちなみに学習データではMultiprocessIterator
を,訓練データではSerialIterator
を用いていますが,基本的にイテレーターはSerialIterator
で構いませんが,MultiprocessIterator
を用いると並列処理ができるため,データが多くなる可能性が高い学習データに
はMultiprocessIterator
を用いています.
誤差関数と学習の設定
次に学習する際の処理を記述します.
そのために,学習を行うクラス(Trainer
)を作成します.
Trainer
を作成するために,まずニューラルネットワークの作成を行い,そこでは損失関数の定義を行なっています.そして最適化を行い,それをTrainer
に実装するという手順を踏んています.
損失関数や最適化に関しては補足記事を作成しましたので,そちらを参照していただければと思います.
class MultiboxTrainChain(chainer.Chain): # 初期化(入力されたパラメータをクラスに入力) def __init__(self, model, alpha=1, k=3): super(MultiboxTrainChain, self).__init__() with self.init_scope(): self.model = model self.alpha = alpha self.k = k def forward(self, imgs, gt_mb_locs, gt_mb_labels): mb_locs, mb_confs = self.model(imgs) loc_loss, conf_loss = multibox_loss( mb_locs, mb_confs, gt_mb_locs, gt_mb_labels, self.k) loss = loc_loss * self.alpha + conf_loss chainer.reporter.report( {'loss': loss, 'loss/loc': loc_loss, 'loss/conf': conf_loss}, self) return loss iteration = 120 step = [80, 100] out = 'result' # 出力ファイルパス # 確率的勾配法 # setupで初期化 train_chain = MultiboxTrainChain(model) optimizer = chainer.optimizers.MomentumSGD() optimizer.setup(train_chain) # bの時:勾配のスケールを2倍にあげる. # wの時:重みの減衰率を0.0005倍にする. for param in train_chain.params(): if param.name == 'b': param.update_rule.add_hook(GradientScaling(2)) else: param.update_rule.add_hook(WeightDecay(0.0005)) # 最適化関数とトレーニングデータセットを入力 # trainerに学習データとoptimizerを設定 updater = training.updaters.StandardUpdater( train_iter, optimizer, device=gpu) trainer = training.Trainer( updater, (iteration, 'iteration'), out) # 指数関数的に学習率を変更できるように設定 trainer.extend( extensions.ExponentialShift('lr', 0.1, init=1e-3), trigger=triggers.ManualScheduleTrigger(step, 'iteration')) # 学習とともにAPやmAPを評価するように設定 trainer.extend( DetectionVOCEvaluator( valid_iter, model, use_07_metric=True, label_names=(voc_bbox_label_names+('new_label',))))
具体的にはまず,MultiboxTrainChain
クラスを作成し,SSDを学習するためのオブジェクトを作成しています.ここでは,chainer.Chainオブジェクトを継承することによって,ニューラルネットワークを作成しています.
SSDの損失関数は以下のように定義されます.
L(x,c,l,g) = \frac{1}{N}(L_{conf}(x,c)+\alpha L_{loc}(x,l,g))
そのため,multibox_lossという関数を用いて損失を出力し,その結果から上の数式を元に損失を求めています.
次に最適化を行なっています.最適化はMomentumSGD
というOptimizerを用いています.
このOptimizerに先ほど作成したMultiboxTrainChain
クラスをセットしています.
Optimizerでは線形式のパラメータ(y=wx+bのwとb)の最適化を行なっています.そのため,SSDモデルを入力したtrain_chainのパラメータを出力して,それぞれのパラメータに対して,wに対しては大きなパラメータを取らないように過学習を抑制するための減衰率を0.005に設定,bに対しては更新する際の勾配を2倍にスケールしています.
これらをStandardUpdater
でパラメータの更新する際にてきそうするように設定,そしてそのUpdater
とを表すiteration,さらには結果の出力パスをTrainer
に入力します.
これで準備は完了ですが,最後にtrainerの学習率を指数関数的に変更できるようにしたり,学習ごとにAP(Average Precision:平均適合率)とmAP(mean Average Precision)を求めます.
適合率とは,物体検知した際に提示された矩形領域の何割が正解データと被っているかを示しているものです.APはクラスごとで,提示されたbounding boxに対する適合率の平均値です.そのクラスごとのAPの平均値,それがmAPになります.
評価結果の設定
最後に出力される評価結果の設定等をtrainer.extend
を用いて行います.
評価結果の出力
log_interval = 10, 'iteration' # iterationのタイミングで作成されたモデルを保存 trainer.extend( extensions.snapshot_object(model, 'model_iter_{.updater.iteration}'), trigger=(iteration, 'iteration')) trainer.extend(extensions.LogReport(trigger=log_interval)) trainer.extend(extensions.observe_lr(), trigger=log_interval) trainer.extend(extensions.PrintReport( ['epoch', 'iteration', 'lr', 'main/loss', 'main/loss/loc', 'main/loss/conf', 'validation/main/map']), trigger=log_interval) trainer.extend(extensions.ProgressBar(update_interval=10)) if extensions.PlotReport.available(): trainer.extend( extensions.PlotReport( ['main/loss', 'main/loss/loc', 'main/loss/conf'], 'epoch', file_name='loss.png')) trainer.extend( extensions.PlotReport( ['validation/main/map'], 'epoch', file_name='accuracy.png')) trainer.extend(extensions.snapshot( filename='snapshot_epoch_{.updater.epoch}.npz'), trigger=(10, 'epoch'))
if文の前の部分ではログとして出力されるものが設定されています.具体的にはモデルを保存し,epoch数,iteraton,学習率,損失誤差等をログファイルに出力しています. そしてif文内では評価結果のグラフを画像として出力しています.
学習の実行と検出
学習は作成したtrainer
のrun
メソッドを実行するだけで行うことができます.
trainer.run()
そしてこれを以下のコードで実行します.
img = utils.read_image('ssd_picts/test/image/000000581139.jpg', color=True) model.score_thresh = 0.40 model.nms_thresh = 0.45 bboxes, labels, scores = model.predict([img]) bbox, label, score = bboxes[0], labels[0], scores[0] vis_bbox(img, bbox, label, score, label_names={0: "man", 1:"woman"})
具体的には画像を読み込み,表示する閾値を設定しています.
そしてmodel.predict
で予測を行い,その結果としてbounding boxの座標,ラベルとそのスコアが出力されます.
これらをvis_bbox
という関数でbounding boxとラベルを画像内に表示することができます.
また,作成した学習データでは,男性の時に0,女性の時に1を指定したので,それを対応づけています.
検出結果とその考察
以上のコードを実行して検出を行なった結果,精度と損失は以下のようになりました.
精度
損失(loss:全体の損失,loc:LOCalization loss(bounding boxの位置の誤差),conf:loss of CONFidence(クラスの確信度))
実行結果は以下のようになりました.
まず精度のグラフから見ると,最終的に0.5程度であり,あまり精度がいいとは言えず,実際に実行した結果を見ても,人は検知できている一方で,その人が男性か女性かは区別ができていないことがわかります.
この原因を損失関数の結果と実行結果を照らし合わせて探りたいと思います
まず,全体の損失の値は結果として,通常は1以下で収めることができるのに対して,1どころか3程度まで大きくなっていることが分かります.
そしてより具体的に分析すると,bounding boxの損失は0.5程度に抑えられているのに対して,confの損失が2以上と非常に大きく,それが全体の損失にも影響を与えていると考えられ,これは,上述した実際の実行結果にも整合します.
これが生じた原因として考えられるのが,タスクの難しさと学習データの不十分さがあげられる.
今回は男性か女性かを区別するというタスクでの画像検知を行なっており,これは人か馬かなどに比べると比較的難しいタスクであると言えます.
そのため学習データをより集める必要があるのですが,前回の記事で書いたように学習データ数が100枚,評価データ数が10枚と非常に少なく,さらに学習データ,評価データ共に女性よりも男性の方が多く含まれていました.
そのため,人の部分には全て「男性」というラベルが付与されています.
これを改善するには学習データを増やすことがあげられ,それでも改善されなかった場合にはさらに損失関数や学習率をどう変化させるかという部分をより精査する必要があると考えられます.
まとめ
前回に引き続き,今回もSSDのFine-tuningを行い,実際に実行してみました.基本的なFine-tuningの考え方としてはクラス推定の部分のみを学習するということであるため,他の学習モデルを用いて行うときでも,どこでクラス推定を行なっているか?その部分のみを学習するためにはどのようにして実装すれば良いかを考えればFine-tuningを実行することができると思います.
参考文献
- Chainerの基本オブジェクトについて〜Chain編〜 - Qiita
- SSD_FineTuning/ChaienrCV_SSD_new_Fine_tuning.ipynb at master · junyamahira/SSD_FineTuning · GitHub
- 6. 実践編: 血液の顕微鏡画像からの細胞検出 — メディカルAI専門コース オンライン講義資料 ドキュメント
- Chainer – A flexible framework of neural networks — Chainer 6.2.0 documentation
プログラムコード全体
import json import matplotlib.pyplot as plt import copy import chainer from chainercv.chainer_experimental.datasets.sliceable import GetterDataset import os from chainer.datasets import TransformDataset from chainer.optimizer_hooks import WeightDecay from chainer import training from chainer.training import extensions from chainer.training import triggers from chainercv import utils from chainercv import transforms from chainercv.visualizations import vis_bbox from chainercv.chainer_experimental.datasets.sliceable import TransformDataset from chainercv.datasets import voc_bbox_label_names from chainercv.extensions import DetectionVOCEvaluator from chainercv.links.model.ssd import GradientScaling from chainercv.links.model.ssd import multibox_loss from chainercv.links.model.ssd import random_crop_with_bbox_constraints from chainercv.links.model.ssd import resize_with_random_interpolation from chainercv.links.model.ssd import random_distort from chainercv.links import SSD300 from chainercv.utils import read_image import numpy as np # 学習データをインポートするクラス # GetterDatasetを継承 class originalDataset(GetterDataset): # 初期値はとりあえずはsplit='train' def __init__(self,split='train'): super(originalDataset, self).__init__() data_dir = 'ssd_picts/'+split+'/' file_names = [] for file in os.listdir(data_dir+'image'): file_names.append(file) self.filenames = file_names self.data_dir = data_dir # _get_imageと_get_annotationsで画像とそのアノテーションをインポート self.add_getter('img', self._get_image) self.add_getter(('bbox', 'label'), self._get_annotations) # ファイル数を出力 def __len__(self): return len(self.filenames) # 画像のインポート def _get_image(self,i): file_name = self.filenames[i] img = read_image(self.data_dir+'image/'+file_name) return img # i番目の画像のアノテーション情報(メタデータ)をインポート def _get_annotations(self,i): bbox = np.empty((0,4), float) label = np.empty((1,0), int) filename = self.filenames[i] # メタデータから該当データ探索 f = open(self.data_dir+'metadata.json') json_data = json.load(f)['_via_img_metadata'] tmp_picts = list(json_data.values()) tmp_data = {} objs = [p['regions'] for p in tmp_picts if p['filename'] == filename][0] # 領域それぞれに対して領域の角の点を抽出,ラベルも同時に定義 for obj in objs: xmax = int(obj['shape_attributes']['x']) + int(obj['shape_attributes']['width']) ymax = int(obj['shape_attributes']['y']) + int(obj['shape_attributes']['height']) tmp_bbox=np.array([int(obj['shape_attributes']['y']), int(obj['shape_attributes']['x']), ymax, xmax]) tmp_label = int(obj['region_attributes']['label']) bbox = np.append(bbox,np.array([tmp_bbox]), axis = 0) label = np.append(label,tmp_label) bbox = np.array(bbox,dtype=np.float32) label = np.array(label,dtype=np.int32) bbox = np.stack(bbox).astype(np.float32) label = np.stack(label).astype(np.int32) return bbox, label # 学習済みデータセットとクラス数が異なる時にスキップする重みを指定する関数 def get_shape_mismatch_names(src, dst): mismatch_names = [] src_params = {p[0]: p[1] for p in src.namedparams()} # クラス数が違うところだけをmismatch_namesにappend→出力 for dst_named_param in dst.namedparams(): name = dst_named_param[0] dst_param = dst_named_param[1] src_param = src_params[name] if src_param.shape != dst_param.shape: mismatch_names.append(name) return mismatch_names # SSDのモデルの定義 class MultiboxTrainChain(chainer.Chain): # 初期化(入力されたパラメータをクラスに入力) def __init__(self, model, alpha=1, k=3): super(MultiboxTrainChain, self).__init__() with self.init_scope(): self.model = model self.alpha = alpha self.k = k def forward(self, imgs, gt_mb_locs, gt_mb_labels): mb_locs, mb_confs = self.model(imgs) loc_loss, conf_loss = multibox_loss( mb_locs, mb_confs, gt_mb_locs, gt_mb_labels, self.k) loss = loc_loss * self.alpha + conf_loss chainer.reporter.report( {'loss': loss, 'loss/loc': loc_loss, 'loss/conf': conf_loss}, self) return loss # 学習データの水増しとSSDに入力するための準備処理 class Transform(object): def __init__(self, coder, size, mean): self.coder = copy.copy(coder) self.coder.to_cpu() self.size = size self.mean = mean def __call__(self, in_data): img, bbox, label = in_data # 1. 色の拡張 img = random_distort(img) # 2. ランダムな拡大 if np.random.randint(2): # キャンバスの様々な座標に入力画像を置いて,様々な比率の画像を生成し,bounding boxを更新 img, param = transforms.random_expand(img, fill=self.mean, return_param=True) bbox = transforms.translate_bbox(bbox, y_offset=param['y_offset'], x_offset=param['x_offset']) # 3. ランダムなトリミング img, param = random_crop_with_bbox_constraints(img, bbox, return_param=True) # トリミングされた画像内にbounding boxが入るように調整 bbox, param = transforms.crop_bbox( bbox, y_slice=param['y_slice'], x_slice=param['x_slice'], allow_outside_center=False, return_param=True) label = label[param['index']] # 4. ランダムな補完の再補正 _, H, W = img.shape img = resize_with_random_interpolation(img, (self.size, self.size)) bbox = transforms.resize_bbox(bbox, (H, W), (self.size, self.size)) # 5. ランダムな水平反転 img, params = transforms.random_flip(img, x_random=True, return_param=True) bbox = transforms.flip_bbox(bbox, (self.size, self.size), x_flip=params['x_flip']) # SSDのネットワークに入力するための準備の処理 img -= self.mean mb_loc, mb_label = self.coder.encode(bbox, label) return img, mb_loc, mb_label # fine-tuningの準備 # pre-trainedモデルの重みの一部を使う. src = SSD300(pretrained_model='voc0712') dst = SSD300(n_fg_class=2) # dstの重みを全ての層において初期化 dst(np.zeros((1, 3, dst.insize, dst.insize), dtype=np.float32)) # ignore_names以外の層に関しては,出力先のdstにパラメータを出力 ignore_names = get_shape_mismatch_names(src, dst) src_params = {p[0]: p[1] for p in src.namedparams()} for dst_named_param in dst.namedparams(): name = dst_named_param[0] if name not in ignore_names: dst_named_param[1].array[:] = src_params[name].array[:] # 読み込まれているかチェック. # 読み込まれていない場合のみ出力としてAssertionエラーがでる. np.testing.assert_equal(dst.extractor.conv1_1.W.data, src.extractor.conv1_1.W.data) # スキップした層の名前を出力 print(ignore_names) # 学習 ## 変数定義 gpu = 0 # gpuのID batchsize = 16 # バッチサイズ iteration = 120 step = [50, 70] out = 'result' # 出力ファイルパス label = ['man','woman'] # dstのignore_namesを学習する model = dst # score_threshとnms_threshの値を変更 model.use_preset('evaluate') # GPUの設定 chainer.cuda.get_device_from_id(gpu).use() model.to_gpu() # 学習データの取り込み with 水増し train_dataset = originalDataset(split='train') train = TransformDataset(train_dataset,('img', 'mb_loc', 'mb_label'),Transform(model.coder, model.insize, model.mean)) train_iter = chainer.iterators.MultiprocessIterator(train, batchsize) # テストデータの取り込み valid = originalDataset(split='valid') valid_iter = chainer.iterators.SerialIterator(valid, batchsize, repeat=False, shuffle=False) # 確率的勾配法 # setupで初期化 train_chain = MultiboxTrainChain(model) optimizer = chainer.optimizers.MomentumSGD() optimizer.setup(train_chain) # bの時:勾配のスケールを2倍にあげる. # Wの時:重みの減衰率を0.0005倍にする. for param in train_chain.params(): if param.name == 'b': param.update_rule.add_hook(GradientScaling(2)) else: param.update_rule.add_hook(WeightDecay(0.0005)) # trainerに学習データとoptimizerを設定 updater = training.updaters.StandardUpdater( train_iter, optimizer, device=gpu) trainer = training.Trainer( updater, (iteration, 'iteration'), out) # 指数関数的に学習率を変更できるように設定 trainer.extend( extensions.ExponentialShift('lr', 0.1, init=1e-3), trigger=triggers.ManualScheduleTrigger(step, 'iteration')) # 学習とともにAPやmAPを評価するように設定 trainer.extend( DetectionVOCEvaluator( valid_iter, model, use_07_metric=True, label_names=(voc_bbox_label_names+('new_label',)))) # 学習率をスケジューリング # epoch=50,70のタイミングで学習率を変更 trainer.extend( extensions.snapshot(), trigger=triggers.ManualScheduleTrigger( step + [iteration], 'iteration')) log_interval = 10, 'iteration' # iterationのタイミングで作成されたモデルを保存 trainer.extend( extensions.snapshot_object(model, 'model_iter_{.updater.iteration}'), trigger=(iteration, 'iteration')) # 評価結果の出力 trainer.extend(extensions.LogReport(trigger=log_interval)) trainer.extend(extensions.observe_lr(), trigger=log_interval) trainer.extend(extensions.PrintReport( ['epoch', 'iteration', 'lr', 'main/loss', 'main/loss/loc', 'main/loss/conf', 'validation/main/map']), trigger=log_interval) trainer.extend(extensions.ProgressBar(update_interval=10)) if extensions.PlotReport.available(): trainer.extend( extensions.PlotReport( ['main/loss', 'main/loss/loc', 'main/loss/conf'], 'epoch', file_name='loss.png')) trainer.extend( extensions.PlotReport( ['validation/main/map'], 'epoch', file_name='accuracy.png')) trainer.extend(extensions.snapshot( filename='snapshot_epoch_{.updater.epoch}.npz'), trigger=(10, 'epoch')) trainer.run() # 作成したモデルで認識 img = utils.read_image('ssd_picts/test/image/000000581139.jpg', color=True) model.score_thresh = 0.40 model.nms_thresh = 0.45 bboxes, labels, scores = model.predict([img]) bbox, label, score = bboxes[0], labels[0], scores[0] vis_bbox(img, bbox, label, score, label_names={0: "man", 1:"woman"})