123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import numpy as np
- import paddle
- import paddle.nn as nn
- import typing
- from ppdet.core.workspace import register
- from ppdet.modeling.post_process import nms
- __all__ = ['BaseArch']
@register
class BaseArch(nn.Layer):
    """Base layer that drives the forward pass of an architecture.

    ``forward`` stores the incoming data on ``self.inputs``, calls
    ``model_arch`` and then dispatches to ``get_loss`` (training mode) or
    ``get_pred`` (evaluation mode).  Subclasses must implement ``get_loss``
    and ``get_pred``; ``model_arch`` is an optional hook.

    Attributes set in ``__init__``:
        data_format: layout string; ``'NHWC'`` triggers a transpose of the
            input image in ``forward``, anything else is treated as NCHW.
        inputs: the data currently being processed (dict, or whatever was
            passed to ``forward`` when ``fuse_norm`` is False).
        fuse_norm: when True, ``forward`` applies ``self.scale`` and
            ``self.bias`` (see ``load_meanstd``) to the image itself.
    """

    def __init__(self, data_format='NCHW'):
        super(BaseArch, self).__init__()
        self.data_format = data_format
        self.inputs = {}
        self.fuse_norm = False

    def load_meanstd(self, cfg_transform):
        """Compute ``self.scale`` / ``self.bias`` from a transform config.

        The first entry of ``cfg_transform`` that contains a
        ``'NormalizeImage'`` key supplies ``mean``, ``std`` and the optional
        ``is_scale`` flag (default True, which multiplies by 1/255).  If no
        such entry exists the built-in default mean/std are used with no
        1/255 scaling.

        The resulting tensors satisfy
        ``image * self.scale + self.bias == (image * scale - mean) / std``
        and are shaped to broadcast over the channel axis of
        ``self.data_format``.

        Args:
            cfg_transform: iterable of mappings, each keyed by transform name.
        """
        scale = 1.
        mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
        for item in cfg_transform:
            if 'NormalizeImage' in item:
                norm_cfg = item['NormalizeImage']
                mean = np.array(norm_cfg['mean'], dtype=np.float32)
                std = np.array(norm_cfg['std'], dtype=np.float32)
                if norm_cfg.get('is_scale', True):
                    scale = 1. / 255.
                # Only the first NormalizeImage entry is honoured.
                break

        # Channel axis is last for NHWC and second for every other layout.
        if self.data_format == 'NHWC':
            shape = (1, 1, 1, 3)
        else:
            shape = (1, 3, 1, 1)
        self.scale = paddle.to_tensor(scale / std).reshape(shape)
        self.bias = paddle.to_tensor(-mean / std).reshape(shape)

    def _set_fused_inputs(self, inp):
        """Fill ``self.inputs`` from ``inp`` with the image normalised in-model."""
        self.inputs['image'] = inp['image'] * self.scale + self.bias
        self.inputs['im_shape'] = inp['im_shape']
        self.inputs['scale_factor'] = inp['scale_factor']

    def forward(self, inputs):
        """Run the architecture on ``inputs``.

        Args:
            inputs: a dict holding at least ``'image'`` (plus ``'im_shape'``
                and ``'scale_factor'`` when ``fuse_norm`` is enabled), or a
                sequence of such dicts for multi-scale testing.

        Returns:
            ``get_loss()`` in training mode.  In evaluation mode, the result
            of ``get_pred()`` for a single input, or the merged result of
            ``merge_multi_scale_predictions`` when several inputs are given.
        """
        # multi-scale input: normalise to a list up front so the layout and
        # fuse_norm handling below also works when a sequence of dicts is
        # passed (indexing the sequence with 'image' would raise TypeError).
        if isinstance(inputs, typing.Sequence):
            inputs_list = list(inputs)
        else:
            inputs_list = [inputs]

        if self.data_format == 'NHWC':
            # Incoming images are transposed with perm [0, 2, 3, 1]; the
            # caller's dicts are updated in place.
            for inp in inputs_list:
                inp['image'] = paddle.transpose(inp['image'], [0, 2, 3, 1])

        if self.fuse_norm:
            # For multi-scale input the first scale is exposed to
            # model_arch(); each scale is set again before get_pred() below.
            self._set_fused_inputs(inputs_list[0])
        else:
            self.inputs = inputs

        self.model_arch()

        if self.training:
            return self.get_loss()

        outs = []
        for inp in inputs_list:
            if self.fuse_norm:
                self._set_fused_inputs(inp)
            else:
                self.inputs = inp
            outs.append(self.get_pred())

        # multi-scale test
        if len(outs) > 1:
            return self.merge_multi_scale_predictions(outs)
        return outs[0]

    def merge_multi_scale_predictions(self, outs):
        """Merge per-scale predictions with class-wise NMS.

        Args:
            outs: list of prediction dicts, each with a ``'bbox'`` tensor.
                Column 0 is matched against the class index and the
                remaining columns are handed to ``nms``; rows are 6 wide
                (the result is reshaped to ``(-1, 6)``).

        Returns:
            dict with ``'bbox'`` (rows ordered ascending by column 1 -
            presumably the score, TODO confirm - truncated to the last
            ``keep_top_k`` rows) and ``'bbox_num'`` (one-element tensor with
            the row count).

        Raises:
            Exception: if the concrete class is not CascadeRCNN, FasterRCNN
                or MaskRCNN.
        """
        if self.__class__.__name__ not in ('CascadeRCNN', 'FasterRCNN',
                                           'MaskRCNN'):
            raise Exception(
                "Multi scale test only supports CascadeRCNN, FasterRCNN and MaskRCNN for now"
            )
        num_classes = self.bbox_head.num_classes
        keep_top_k = self.bbox_post_process.nms.keep_top_k
        nms_threshold = self.bbox_post_process.nms.nms_threshold

        all_scale_outs = paddle.concat([o['bbox'] for o in outs]).numpy()
        final_boxes = []
        for c in range(num_classes):
            idxs = all_scale_outs[:, 0] == c
            if not idxs.any():
                continue
            r = nms(all_scale_outs[idxs, 1:], nms_threshold)
            final_boxes.append(
                np.concatenate([np.full((r.shape[0], 1), c), r], 1))

        if final_boxes:
            merged = np.concatenate(final_boxes)
            # Stable ascending sort on column 1 keeps the same row order as
            # Python's sorted(); the tail slice keeps the largest values.
            order = np.argsort(merged[:, 1], kind='stable')
            merged = merged[order][-keep_top_k:].reshape((-1, 6))
        else:
            # No class produced a box: return an empty result instead of
            # letting np.concatenate fail on an empty list.
            # NOTE(review): confirm downstream consumers accept a 0-row bbox.
            merged = np.zeros((0, 6), dtype=np.float64)

        return {
            'bbox': paddle.to_tensor(merged),
            'bbox_num': paddle.to_tensor(np.array([merged.shape[0], ]))
        }

    def build_inputs(self, data, input_def):
        """Map each name in ``input_def`` to the item of ``data`` at the same index."""
        return {k: data[i] for i, k in enumerate(input_def)}

    def model_arch(self, ):
        """Optional hook called by ``forward`` before loss/prediction; no-op here."""
        pass

    def get_loss(self, ):
        """Return the training loss; must be implemented by subclasses."""
        raise NotImplementedError("Should implement get_loss method!")

    def get_pred(self, ):
        """Return predictions for ``self.inputs``; must be implemented by subclasses."""
        raise NotImplementedError("Should implement get_pred method!")
|