# YOLOv5 🚀 by Ultralytics, GPL-3.0 license
"""
Common modules
"""

import json
import math
import platform
import warnings
from collections import OrderedDict, namedtuple
from copy import copy
from pathlib import Path

import cv2
import numpy as np
import pandas as pd
import requests
import torch
import torch.nn as nn
import yaml
from PIL import Image
from torch.cuda import amp

from utils.datasets import exif_transpose, letterbox
from utils.general import (LOGGER, check_requirements, check_suffix, check_version, colorstr, increment_path,
                           make_divisible, non_max_suppression, scale_coords, xywh2xyxy, xyxy2xywh)
from utils.plots import Annotator, colors, save_one_box
from utils.torch_utils import copy_attr, time_sync


def autopad(k, p=None):  # kernel, padding
    """
    Return the padding to use for kernel size ``k`` (used by Conv and Classify below).

    When ``p`` is None the padding is ``k // 2`` per kernel dimension, e.g. k=3 -> 1 and k=1 -> 0.
    An explicit ``p`` is returned unchanged.

    :param k: kernel size, an int or an iterable of ints
    :param p: explicit padding, or None to compute it from ``k``
    :return: an int for int ``k``, a list of ints for iterable ``k``, or ``p`` itself when given
    """
    # Pad to 'same'
    if p is None:
        # A list (not a generator) so the result can be inspected and consumed more than once
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]  # auto-pad
    return p


class Conv(nn.Module):
    """Standard convolution block: Conv2d (no bias) + BatchNorm2d + activation."""

    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        """
        Basic building block used by Focus, Bottleneck, BottleneckCSP, C3, SPP, DWConv, TransformerBlock, etc.

        :param c1: input channels
        :param c2: output channels
        :param k: convolution kernel size
        :param s: convolution stride
        :param p: convolution padding; None lets autopad() derive it from ``k``
        :param g: convolution groups (1 = ordinary convolution, >1 = grouped convolution)
        :param act: True -> nn.SiLU(); an nn.Module instance -> used as-is; anything else -> nn.Identity()
        """
        super().__init__()
        self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g, bias=False)
        self.bn = nn.BatchNorm2d(c2)
        self.act = nn.SiLU() if act is True else (act if isinstance(act, nn.Module) else nn.Identity())

    def forward(self, x):
        """Apply conv -> batch-norm -> activation."""
        return self.act(self.bn(self.conv(x)))

    def forward_fuse(self, x):
        """
        Apply conv -> activation, skipping the BatchNorm layer.

        NOTE(review): the original comment says this is used by the Model class's fuse() for faster
        test/validation inference; that caller is not visible in this file.
        """
        return self.act(self.conv(x))


class DWConv(Conv):
    """Depth-wise convolution: a Conv whose group count is gcd(c1, c2)."""

    def __init__(self, c1, c2, k=1, s=1, act=True):  # ch_in, ch_out, kernel, stride, activation
        super().__init__(c1, c2, k, s, g=math.gcd(c1, c2), act=act)


class TransformerLayer(nn.Module):
    """Transformer layer https://arxiv.org/abs/2010.11929 (LayerNorm layers removed for better performance)."""

    def __init__(self, c, num_heads):
        """
        :param c: embedding dimension (input and output feature size)
        :param num_heads: number of attention heads passed to nn.MultiheadAttention
        """
        super().__init__()
        self.q = nn.Linear(c, c, bias=False)
        self.k = nn.Linear(c, c, bias=False)
        self.v = nn.Linear(c, c, bias=False)
        self.ma = nn.MultiheadAttention(embed_dim=c, num_heads=num_heads)
        self.fc1 = nn.Linear(c, c, bias=False)
        self.fc2 = nn.Linear(c, c, bias=False)

    def forward(self, x):
        """Self-attention with a residual connection, followed by a two-layer linear block with a residual."""
        x = self.ma(self.q(x), self.k(x), self.v(x))[0] + x  # [0] = attention output (weights discarded)
        x = self.fc2(self.fc1(x)) + x
        return x


class TransformerBlock(nn.Module):
    """Vision Transformer block https://arxiv.org/abs/2010.11929 built from ``num_layers`` TransformerLayers."""

    def __init__(self, c1, c2, num_heads, num_layers):
        """
        :param c1: input channels
        :param c2: output channels; a Conv(c1, c2) is inserted first when c1 != c2
        :param num_heads: attention heads per TransformerLayer
        :param num_layers: number of stacked TransformerLayers
        """
        super().__init__()
        self.conv = None
        if c1 != c2:
            self.conv = Conv(c1, c2)
        self.linear = nn.Linear(c2, c2)  # learnable position embedding
        self.tr = nn.Sequential(*(TransformerLayer(c2, num_heads) for _ in range(num_layers)))
        self.c2 = c2

    def forward(self, x):
        """Flatten the two spatial dims into a sequence, run the transformer layers, restore the 4-D shape."""
        if self.conv is not None:
            x = self.conv(x)
        b, _, w, h = x.shape  # the last two dims are simply carried through to the final reshape
        p = x.flatten(2).permute(2, 0, 1)  # (b, c2, w*h) -> (w*h, b, c2)
        return self.tr(p + self.linear(p)).permute(1, 2, 0).reshape(b, self.c2, w, h)


class Bottleneck(nn.Module):
    """Standard bottleneck: 1x1 Conv followed by 3x3 Conv, with an optional residual connection."""

    def __init__(self, c1, c2, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, shortcut, groups, expansion
        """
        Used by BottleneckCSP and C3 below.

        The residual add is applied only when ``shortcut`` is True and c1 == c2; otherwise the
        output is just the two convolutions.

        :param c1: input channels
        :param c2: output channels
        :param shortcut: whether to request the residual connection (default True)
        :param g: groups of the 3x3 convolution (1 = ordinary convolution, >1 = grouped convolution)
        :param e: expansion ratio; hidden channels = int(c2 * e)
        """
        super().__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)  # 1x1 conv: c1 -> c_
        self.cv2 = Conv(c_, c2, 3, 1, g=g)  # 3x3 conv: c_ -> c2
        self.add = shortcut and c1 == c2

    def forward(self, x):
        return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x))


class BottleneckCSP(nn.Module):
    """CSP Bottleneck https://github.com/WongKinYiu/CrossStagePartialNetworks"""

    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, number, shortcut, groups, expansion
        """
        Two-branch CSP block: one branch runs ``n`` Bottlenecks, the other is a plain 1x1 conv.

        :param c1: input channels
        :param c2: output channels
        :param n: number of stacked Bottleneck modules
        :param shortcut: forwarded to each Bottleneck (default True)
        :param g: groups forwarded to each Bottleneck
        :param e: expansion ratio; hidden channels = int(c2 * e)
        """
        super().__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)  # Conv + BN + SiLU
        self.cv2 = nn.Conv2d(c1, c_, 1, 1, bias=False)
        self.cv3 = nn.Conv2d(c_, c_, 1, 1, bias=False)
        self.cv4 = Conv(2 * c_, c2, 1, 1)
        self.bn = nn.BatchNorm2d(2 * c_)  # applied to cat(cv2, cv3)
        self.act = nn.SiLU()
        self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))  # n stacked bottlenecks

    def forward(self, x):
        y1 = self.cv3(self.m(self.cv1(x)))  # bottleneck branch
        y2 = self.cv2(x)  # bypass branch
        return self.cv4(self.act(self.bn(torch.cat((y1, y2), 1))))


class C3(nn.Module):
    """CSP Bottleneck with 3 convolutions."""

    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, number, shortcut, groups, expansion
        """
        Simplified BottleneckCSP: apart from the bottlenecks the block has only three Conv modules.

        :param c1: input channels
        :param c2: output channels
        :param n: number of stacked Bottleneck modules
        :param shortcut: forwarded to each Bottleneck (default True)
        :param g: groups forwarded to each Bottleneck
        :param e: expansion ratio; hidden channels = int(c2 * e)
        """
        super().__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c1, c_, 1, 1)
        self.cv3 = Conv(2 * c_, c2, 1)  # optional act=FReLU(c2)
        self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
        # self.m = nn.Sequential(*(CrossConv(c_, c_, 3, 1, g, 1.0, shortcut) for _ in range(n)))

    def forward(self, x):
        return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), 1))


class C3TR(C3):
    """C3 module with TransformerBlock() in place of the bottleneck stack."""

    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
        super().__init__(c1, c2, n, shortcut, g, e)
        c_ = int(c2 * e)
        self.m = TransformerBlock(c_, c_, 4, n)  # 4 heads, n layers


class C3SPP(C3):
    """C3 module with SPP() in place of the bottleneck stack."""

    def __init__(self, c1, c2, k=(5, 9, 13), n=1, shortcut=True, g=1, e=0.5):
        super().__init__(c1, c2, n, shortcut, g, e)
        c_ = int(c2 * e)
        self.m = SPP(c_, c_, k)


class C3Ghost(C3):
    """C3 module with GhostBottleneck() in place of the standard bottlenecks."""

    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
        super().__init__(c1, c2, n, shortcut, g, e)
        c_ = int(c2 * e)  # hidden channels
        self.m = nn.Sequential(*(GhostBottleneck(c_, c_) for _ in range(n)))


class SPP(nn.Module):
    """Spatial Pyramid Pooling (SPP) layer https://arxiv.org/abs/1406.4729"""

    def __init__(self, c1, c2, k=(5, 9, 13)):
        """
        :param c1: input channels
        :param c2: output channels
        :param k: kernel sizes of the parallel max-pool layers, default (5, 9, 13)
        """
        super().__init__()
        c_ = c1 // 2  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)  # first conv
        self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1)  # last conv, fed by the input plus one map per pool
        self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k])  # max-pools

    def forward(self, x):
        x = self.cv1(x)
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')  # suppress torch 1.9.0 max_pool2d() warning
            return self.cv2(torch.cat([x] + [m(x) for m in self.m], 1))


class SPPF(nn.Module):
    """Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher."""

    def __init__(self, c1, c2, k=5):  # equivalent to SPP(k=(5, 9, 13))
        """
        Faster SPP variant: one max-pool of size ``k`` is applied three times in series instead of
        running three pools of different sizes in parallel.

        :param c1: input channels
        :param c2: output channels
        :param k: kernel size of the max-pool
        """
        super().__init__()
        c_ = c1 // 2  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c_ * 4, c2, 1, 1)
        self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)

    def forward(self, x):
        x = self.cv1(x)
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')  # suppress torch 1.9.0 max_pool2d() warning
            y1 = self.m(x)
            y2 = self.m(y1)
            return self.cv2(torch.cat((x, y1, y2, self.m(y2)), 1))


class Focus(nn.Module):
    """
    Focus wh information into c-space.

    Every second pixel along each spatial axis is sampled at the four possible offsets and the four
    sub-images are stacked on the channel axis, so the spatial size halves and the channels quadruple
    before the convolution is applied.
    """

    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        """
        :param c1: input channels
        :param c2: output channels
        :param k: convolution kernel size
        :param s: convolution stride
        :param p: convolution padding
        :param g: convolution groups (1 = ordinary convolution, >1 = grouped convolution)
        :param act: activation selector, see Conv (True -> SiLU)
        """
        super().__init__()
        self.conv = Conv(c1 * 4, c2, k, s, p, g, act)
        # self.contract = Contract(gain=2)

    def forward(self, x):  # x(b,c,w,h) -> y(b,4c,w/2,h/2)
        return self.conv(torch.cat((x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]), 1))
        # return self.conv(self.contract(x))


class GhostConv(nn.Module):
    """Ghost Convolution https://github.com/huawei-noah/ghostnet"""

    def __init__(self, c1, c2, k=1, s=1, g=1, act=True):  # ch_in, ch_out, kernel, stride, groups
        super().__init__()
        c_ = c2 // 2  # hidden channels
        self.cv1 = Conv(c1, c_, k, s, None, g, act)
        self.cv2 = Conv(c_, c_, 5, 1, None, c_, act)  # 5x5 conv with groups == channels

    def forward(self, x):
        y = self.cv1(x)
        return torch.cat((y, self.cv2(y)), 1)


class GhostBottleneck(nn.Module):
    """Ghost Bottleneck https://github.com/huawei-noah/ghostnet"""

    def __init__(self, c1, c2, k=3, s=1):  # ch_in, ch_out, kernel, stride
        super().__init__()
        c_ = c2 // 2
        self.conv = nn.Sequential(
            GhostConv(c1, c_, 1, 1),  # pw
            DWConv(c_, c_, k, s, act=False) if s == 2 else nn.Identity(),  # dw
            GhostConv(c_, c2, 1, 1, act=False))  # pw-linear
        # The shortcut only needs real layers when the main path downsamples (s == 2)
        self.shortcut = nn.Sequential(DWConv(c1, c1, k, s, act=False),
                                      Conv(c1, c2, 1, 1, act=False)) if s == 2 else nn.Identity()

    def forward(self, x):
        return self.conv(x) + self.shortcut(x)


class Contract(nn.Module):
    """Contract width-height into channels, i.e. x(1,64,80,80) to x(1,256,40,40)."""

    def __init__(self, gain=2):
        """
        :param gain: spatial reduction factor; channels grow by gain ** 2
        """
        super().__init__()
        self.gain = gain

    def forward(self, x):
        b, c, h, w = x.size()  # assert (h / s == 0) and (W / s == 0), 'Indivisible gain'
        s = self.gain
        x = x.view(b, c, h // s, s, w // s, s)  # x(1,64,40,2,40,2)
        x = x.permute(0, 3, 5, 1, 2, 4).contiguous()  # x(1,2,2,64,40,40)
        return x.view(b, c * s * s, h // s, w // s)  # x(1,256,40,40)


class Expand(nn.Module):
    """Expand channels into width-height, i.e. x(1,64,80,80) to x(1,16,160,160)."""

    def __init__(self, gain=2):
        """
        Inverse-direction counterpart of Contract.

        :param gain: spatial enlargement factor; channels shrink by gain ** 2
        """
        super().__init__()
        self.gain = gain

    def forward(self, x):
        b, c, h, w = x.size()  # assert C / s ** 2 == 0, 'Indivisible gain'
        s = self.gain
        x = x.view(b, s, s, c // s ** 2, h, w)  # x(1,2,2,16,80,80)
        x = x.permute(0, 3, 4, 1, 5, 2).contiguous()  # x(1,16,80,2,80,2)
        return x.view(b, c // s ** 2, h * s, w * s)  # x(1,16,160,160)


class Concat(nn.Module):
    """Concatenate a list of tensors along a dimension."""

    def __init__(self, dimension=1):
        """
        :param dimension: dimension passed to torch.cat
        """
        super().__init__()
        self.d = dimension

    def forward(self, x):
        return torch.cat(x, self.d)


class DetectMultiBackend(nn.Module):
    """YOLOv5 MultiBackend class for python inference on various backends."""

    def __init__(self, weights='yolov5s.pt', device=torch.device('cpu'), dnn=False, data=None, fp16=False):
        """
        Load ``weights`` with the backend selected from its file name.

        Usage:
          PyTorch:              weights = *.pt
          TorchScript:                    *.torchscript
          ONNX Runtime:                   *.onnx
          ONNX OpenCV DNN:                *.onnx with --dnn
          OpenVINO:                       *.xml
          CoreML:                         *.mlmodel
          TensorRT:                       *.engine
          TensorFlow SavedModel:          *_saved_model
          TensorFlow GraphDef:            *.pb
          TensorFlow Lite:                *.tflite
          TensorFlow Edge TPU:            *_edgetpu.tflite

        :param weights: weights path, or a list of paths (the first one selects the backend)
        :param device: torch device used for PyTorch/TorchScript/TensorRT tensors
        :param dnn: use OpenCV DNN instead of ONNX Runtime for *.onnx weights
        :param data: optional data.yaml path from which class names are read
        :param fp16: request half precision; only kept for pt/jit/onnx/engine on a non-CPU device

        NOTE: every local variable of this method is copied onto ``self`` at the end, and forward()
        reads them by name, so locals here must not be renamed.
        """
        from models.experimental import attempt_download, attempt_load  # scoped to avoid circular import

        super().__init__()
        w = str(weights[0] if isinstance(weights, list) else weights)  # weights path as a string
        pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs = self.model_type(w)  # get backend
        stride, names = 32, [f'class{i}' for i in range(1000)]  # assign defaults
        w = attempt_download(w)  # download if not local
        fp16 &= (pt or jit or onnx or engine) and device.type != 'cpu'  # FP16
        if data:  # data.yaml path (optional)
            with open(data, errors='ignore') as f:
                names = yaml.safe_load(f)['names']  # class names

        if pt:  # PyTorch
            model = attempt_load(weights if isinstance(weights, list) else w, map_location=device)
            stride = max(int(model.stride.max()), 32)  # model stride, never below 32
            names = model.module.names if hasattr(model, 'module') else model.names  # get class names
            model.half() if fp16 else model.float()
            self.model = model  # explicitly assign for to(), cpu(), cuda(), half()
        elif jit:  # TorchScript
            LOGGER.info(f'Loading {w} for TorchScript inference...')
            extra_files = {'config.txt': ''}  # model metadata
            model = torch.jit.load(w, _extra_files=extra_files)
            model.half() if fp16 else model.float()
            if extra_files['config.txt']:
                d = json.loads(extra_files['config.txt'])  # extra_files dict
                stride, names = int(d['stride']), d['names']
        elif dnn:  # ONNX OpenCV DNN
            LOGGER.info(f'Loading {w} for ONNX OpenCV DNN inference...')
            check_requirements(('opencv-python>=4.5.4',))
            net = cv2.dnn.readNetFromONNX(w)
        elif onnx:  # ONNX Runtime
            LOGGER.info(f'Loading {w} for ONNX Runtime inference...')
            cuda = torch.cuda.is_available()
            check_requirements(('onnx', 'onnxruntime-gpu' if cuda else 'onnxruntime'))
            import onnxruntime
            providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if cuda else ['CPUExecutionProvider']
            session = onnxruntime.InferenceSession(w, providers=providers)
            meta = session.get_modelmeta().custom_metadata_map  # metadata
            if 'stride' in meta:
                # SECURITY NOTE(review): eval() executes text embedded in the model file; only load
                # ONNX files from trusted sources (ast.literal_eval would be a safer parser).
                stride, names = int(meta['stride']), eval(meta['names'])
        elif xml:  # OpenVINO
            LOGGER.info(f'Loading {w} for OpenVINO inference...')
            check_requirements(('openvino-dev',))  # requires openvino-dev: https://pypi.org/project/openvino-dev/
            import openvino.inference_engine as ie
            core = ie.IECore()
            if not Path(w).is_file():  # if not *.xml
                w = next(Path(w).glob('*.xml'))  # get *.xml file from *_openvino_model dir
            network = core.read_network(model=w, weights=Path(w).with_suffix('.bin'))  # *.xml, *.bin paths
            executable_network = core.load_network(network, device_name='CPU', num_requests=1)
        elif engine:  # TensorRT
            LOGGER.info(f'Loading {w} for TensorRT inference...')
            import tensorrt as trt  # https://developer.nvidia.com/nvidia-tensorrt-download
            check_version(trt.__version__, '7.0.0', hard=True)  # require tensorrt>=7.0.0
            Binding = namedtuple('Binding', ('name', 'dtype', 'shape', 'data', 'ptr'))
            logger = trt.Logger(trt.Logger.INFO)
            with open(w, 'rb') as f, trt.Runtime(logger) as runtime:
                model = runtime.deserialize_cuda_engine(f.read())
            bindings = OrderedDict()
            fp16 = False  # default updated below
            for index in range(model.num_bindings):
                name = model.get_binding_name(index)
                dtype = trt.nptype(model.get_binding_dtype(index))
                shape = tuple(model.get_binding_shape(index))
                data = torch.from_numpy(np.empty(shape, dtype=np.dtype(dtype))).to(device)
                bindings[name] = Binding(name, dtype, shape, data, int(data.data_ptr()))
                if model.binding_is_input(index) and dtype == np.float16:
                    fp16 = True
            binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items())
            context = model.create_execution_context()
            batch_size = bindings['images'].shape[0]
        elif coreml:  # CoreML
            LOGGER.info(f'Loading {w} for CoreML inference...')
            import coremltools as ct
            model = ct.models.MLModel(w)
        else:  # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
            if saved_model:  # SavedModel
                LOGGER.info(f'Loading {w} for TensorFlow SavedModel inference...')
                import tensorflow as tf
                keras = False  # assume TF1 saved_model
                model = tf.keras.models.load_model(w) if keras else tf.saved_model.load(w)
            elif pb:  # GraphDef https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt
                LOGGER.info(f'Loading {w} for TensorFlow GraphDef inference...')
                import tensorflow as tf

                def wrap_frozen_graph(gd, inputs, outputs):
                    x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), [])  # wrapped
                    ge = x.graph.as_graph_element
                    return x.prune(tf.nest.map_structure(ge, inputs), tf.nest.map_structure(ge, outputs))

                gd = tf.Graph().as_graph_def()  # graph_def
                with open(w, 'rb') as f:
                    gd.ParseFromString(f.read())
                frozen_func = wrap_frozen_graph(gd, inputs="x:0", outputs="Identity:0")
            elif tflite or edgetpu:  # https://www.tensorflow.org/lite/guide/python#install_tensorflow_lite_for_python
                try:  # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu
                    from tflite_runtime.interpreter import Interpreter, load_delegate
                except ImportError:
                    import tensorflow as tf
                    Interpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegate
                if edgetpu:  # Edge TPU https://coral.ai/software/#edgetpu-runtime
                    LOGGER.info(f'Loading {w} for TensorFlow Lite Edge TPU inference...')
                    delegate = {
                        'Linux': 'libedgetpu.so.1',
                        'Darwin': 'libedgetpu.1.dylib',
                        'Windows': 'edgetpu.dll'}[platform.system()]
                    interpreter = Interpreter(model_path=w, experimental_delegates=[load_delegate(delegate)])
                else:  # Lite
                    LOGGER.info(f'Loading {w} for TensorFlow Lite inference...')
                    interpreter = Interpreter(model_path=w)  # load TFLite model
                interpreter.allocate_tensors()  # allocate
                input_details = interpreter.get_input_details()  # inputs
                output_details = interpreter.get_output_details()  # outputs
            elif tfjs:
                raise Exception('ERROR: YOLOv5 TF.js inference is not supported')
        self.__dict__.update(locals())  # assign all variables to self

    def forward(self, im, augment=False, visualize=False, val=False):
        """
        Run inference with whichever backend was loaded in __init__.

        :param im: input tensor of shape (batch, channel, height, width)
        :param augment: forwarded to the PyTorch model only
        :param visualize: forwarded to the PyTorch model only
        :param val: when True return ``(y, [])`` instead of ``y``
        :return: predictions; numpy outputs are converted to a torch tensor on ``self.device``
        """
        b, ch, h, w = im.shape  # batch, channel, height, width
        if self.pt:  # PyTorch
            y = self.model(im, augment=augment, visualize=visualize)[0]
        elif self.jit:  # TorchScript
            y = self.model(im)[0]
        elif self.dnn:  # ONNX OpenCV DNN
            im = im.cpu().numpy()  # torch to numpy
            self.net.setInput(im)
            y = self.net.forward()
        elif self.onnx:  # ONNX Runtime
            im = im.cpu().numpy()  # torch to numpy
            y = self.session.run([self.session.get_outputs()[0].name], {self.session.get_inputs()[0].name: im})[0]
        elif self.xml:  # OpenVINO
            im = im.cpu().numpy()  # FP32
            desc = self.ie.TensorDesc(precision='FP32', dims=im.shape, layout='NCHW')  # Tensor Description
            request = self.executable_network.requests[0]  # inference request
            request.set_blob(blob_name='images', blob=self.ie.Blob(desc, im))  # name=next(iter(request.input_blobs))
            request.infer()
            y = request.output_blobs['output'].buffer  # name=next(iter(request.output_blobs))
        elif self.engine:  # TensorRT
            assert im.shape == self.bindings['images'].shape, (im.shape, self.bindings['images'].shape)
            self.binding_addrs['images'] = int(im.data_ptr())
            self.context.execute_v2(list(self.binding_addrs.values()))
            y = self.bindings['output'].data
        elif self.coreml:  # CoreML
            im = im.permute(0, 2, 3, 1).cpu().numpy()  # torch BCHW to numpy BHWC shape(1,320,192,3)
            im = Image.fromarray((im[0] * 255).astype('uint8'))
            # im = im.resize((192, 320), Image.ANTIALIAS)
            y = self.model.predict({'image': im})  # coordinates are xywh normalized
            if 'confidence' in y:
                box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]])  # xyxy pixels
                # builtin float replaces the np.float alias, which was removed in NumPy 1.24
                conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(float)
                y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
            else:
                k = 'var_' + str(sorted(int(k.replace('var_', '')) for k in y)[-1])  # output key
                y = y[k]  # output
        else:  # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
            im = im.permute(0, 2, 3, 1).cpu().numpy()  # torch BCHW to numpy BHWC shape(1,320,192,3)
            if self.saved_model:  # SavedModel
                y = (self.model(im, training=False) if self.keras else self.model(im)).numpy()
            elif self.pb:  # GraphDef
                y = self.frozen_func(x=self.tf.constant(im)).numpy()
            else:  # Lite or Edge TPU
                inp, out = self.input_details[0], self.output_details[0]
                int8 = inp['dtype'] == np.uint8  # is TFLite quantized uint8 model
                if int8:
                    scale, zero_point = inp['quantization']
                    im = (im / scale + zero_point).astype(np.uint8)  # de-scale
                self.interpreter.set_tensor(inp['index'], im)
                self.interpreter.invoke()
                y = self.interpreter.get_tensor(out['index'])
                if int8:
                    scale, zero_point = out['quantization']
                    y = (y.astype(np.float32) - zero_point) * scale  # re-scale
            y[..., :4] *= [w, h, w, h]  # xywh normalized to pixels

        if isinstance(y, np.ndarray):
            y = torch.tensor(y, device=self.device)
        return (y, []) if val else y

    def warmup(self, imgsz=(1, 3, 640, 640)):
        """
        Warmup model by running inference once (twice for TorchScript) on an all-zero input.

        Only runs for pt/jit/onnx/engine/saved_model/pb backends on a non-CPU device.

        :param imgsz: shape of the dummy input tensor
        """
        if any((self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb)):  # warmup types
            if self.device.type != 'cpu':  # only warmup GPU models
                im = torch.zeros(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device)  # input
                for _ in range(2 if self.jit else 1):
                    self.forward(im)  # warmup

    @staticmethod
    def model_type(p='path/to/model.pt'):
        """
        Return model type flags from model path, i.e. path='path/to/model.onnx' -> onnx=True.

        :param p: path to the model weights
        :return: tuple of booleans (pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs)
        """
        from export import export_formats
        suffixes = list(export_formats().Suffix) + ['.xml']  # export suffixes
        check_suffix(p, suffixes)  # checks
        p = Path(p).name  # eliminate trailing separators
        pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, xml2 = (s in p for s in suffixes)
        xml |= xml2  # *_openvino_model or *.xml
        tflite &= not edgetpu  # *.tflite
        return pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs


class AutoShape(nn.Module):
    """
    YOLOv5 input-robust model wrapper for passing cv2/np/PIL/torch inputs.

    Includes preprocessing, inference and NMS.
    """
    conf = 0.25  # NMS confidence threshold
    iou = 0.45  # NMS IoU threshold
    agnostic = False  # NMS class-agnostic
    multi_label = False  # NMS multiple labels per box
    classes = None  # (optional list) filter by class, i.e. = [0, 15, 16] for COCO persons, cats and dogs
    max_det = 1000  # maximum number of detections per image
    amp = False  # Automatic Mixed Precision (AMP) inference

    def __init__(self, model):
        """
        :param model: model to wrap; either a DetectMultiBackend or a plain PyTorch model
        """
        super().__init__()
        LOGGER.info('Adding AutoShape... ')
        copy_attr(self, model, include=('yaml', 'nc', 'hyp', 'names', 'stride', 'abc'), exclude=())  # copy attributes
        self.dmb = isinstance(model, DetectMultiBackend)  # DetectMultiBackend() instance
        self.pt = not self.dmb or model.pt  # PyTorch model
        self.model = model.eval()

    def _apply(self, fn):
        """Apply to(), cpu(), cuda(), half() to model tensors that are not parameters or registered buffers."""
        self = super()._apply(fn)
        if self.pt:
            m = self.model.model.model[-1] if self.dmb else self.model.model[-1]  # Detect()
            m.stride = fn(m.stride)
            m.grid = list(map(fn, m.grid))
            if isinstance(m.anchor_grid, list):
                m.anchor_grid = list(map(fn, m.anchor_grid))
        return self

    @torch.no_grad()
    def forward(self, imgs, size=640, augment=False, profile=False):
        """
        Inference from various sources. For height=640, width=1280, RGB images example inputs are:
          file:       imgs = 'data/images/zidane.jpg'  # str or PosixPath
          URI:             = 'https://ultralytics.com/images/zidane.jpg'
          OpenCV:          = cv2.imread('image.jpg')[:,:,::-1]  # HWC BGR to RGB x(640,1280,3)
          PIL:             = Image.open('image.jpg') or ImageGrab.grab()  # HWC x(640,1280,3)
          numpy:           = np.zeros((640,1280,3))  # HWC
          torch:           = torch.zeros(16,3,320,640)  # BCHW (scaled to size=640, 0-1 values)
          multiple:        = [Image.open('image1.jpg'), Image.open('image2.jpg'), ...]  # list of images

        :param imgs: a single input or a list/tuple of inputs in any of the forms above
        :param size: target size of the longest image side for inference
        :param augment: forwarded to the wrapped model
        :param profile: forwarded to the wrapped model
        :return: raw model output for torch.Tensor input, otherwise a Detections object
        """
        t = [time_sync()]
        p = next(self.model.parameters()) if self.pt else torch.zeros(1)  # for device and type
        autocast = self.amp and (p.device.type != 'cpu')  # Automatic Mixed Precision (AMP) inference
        if isinstance(imgs, torch.Tensor):  # torch
            with amp.autocast(autocast):
                return self.model(imgs.to(p.device).type_as(p), augment, profile)  # inference

        # Pre-process
        n, imgs = (len(imgs), list(imgs)) if isinstance(imgs, (list, tuple)) else (1, [imgs])  # number, list of images
        shape0, shape1, files = [], [], []  # image and inference shapes, filenames
        for i, im in enumerate(imgs):
            f = f'image{i}'  # filename
            if isinstance(im, (str, Path)):  # filename or uri
                im, f = Image.open(requests.get(im, stream=True).raw if str(im).startswith('http') else im), im
                im = np.asarray(exif_transpose(im))
            elif isinstance(im, Image.Image):  # PIL Image
                im, f = np.asarray(exif_transpose(im)), getattr(im, 'filename', f) or f
            files.append(Path(f).with_suffix('.jpg').name)
            if im.shape[0] < 5:  # image in CHW
                im = im.transpose((1, 2, 0))  # reverse dataloader .transpose(2, 0, 1)
            im = im[..., :3] if im.ndim == 3 else np.tile(im[..., None], 3)  # enforce 3ch input
            s = im.shape[:2]  # HWC
            shape0.append(s)  # image shape
            g = (size / max(s))  # gain
            shape1.append([y * g for y in s])
            imgs[i] = im if im.data.contiguous else np.ascontiguousarray(im)  # update
        shape1 = [make_divisible(x, self.stride) if self.pt else size for x in np.array(shape1).max(0)]  # inf shape
        x = [letterbox(im, shape1, auto=False)[0] for im in imgs]  # pad
        x = np.ascontiguousarray(np.array(x).transpose((0, 3, 1, 2)))  # stack and BHWC to BCHW
        x = torch.from_numpy(x).to(p.device).type_as(p) / 255  # uint8 to fp16/32
        t.append(time_sync())

        with amp.autocast(autocast):
            # Inference
            y = self.model(x, augment, profile)  # forward
            t.append(time_sync())

            # Post-process
            y = non_max_suppression(y if self.dmb else y[0], self.conf, self.iou, self.classes, self.agnostic,
                                    self.multi_label, max_det=self.max_det)  # NMS
            for i in range(n):
                scale_coords(shape1, y[i][:, :4], shape0[i])

            t.append(time_sync())
            return Detections(imgs, y, files, t, self.names, x.shape)


class Detections:
    """YOLOv5 detections class for inference results."""

    def __init__(self, imgs, pred, files, times=(0, 0, 0, 0), names=None, shape=None):
        """
        :param imgs: list of images as numpy arrays
        :param pred: list of tensors, one per image, rows = (xyxy, conf, cls)
        :param files: list of image filenames
        :param times: four timestamps; the three differences become per-image times in ms
        :param names: class names
        :param shape: inference BCHW shape
        """
        super().__init__()
        d = pred[0].device  # device
        gn = [torch.tensor([*(im.shape[i] for i in [1, 0, 1, 0]), 1, 1], device=d) for im in imgs]  # normalizations
        self.imgs = imgs  # list of images as numpy arrays
        self.pred = pred  # list of tensors pred[0] = (xyxy, conf, cls)
        self.names = names  # class names
        self.files = files  # image filenames
        self.times = times  # profiling times
        self.xyxy = pred  # xyxy pixels
        self.xywh = [xyxy2xywh(x) for x in pred]  # xywh pixels
        self.xyxyn = [x / g for x, g in zip(self.xyxy, gn)]  # xyxy normalized
        self.xywhn = [x / g for x, g in zip(self.xywh, gn)]  # xywh normalized
        self.n = len(self.pred)  # number of images (batch size)
        self.t = tuple((times[i + 1] - times[i]) * 1000 / self.n for i in range(3))  # timestamps (ms)
        self.s = shape  # inference BCHW shape

    def display(self, pprint=False, show=False, save=False, crop=False, render=False, labels=True, save_dir=Path('')):
        """
        Shared worker behind print(), show(), save(), crop() and render().

        :param pprint: log a per-image summary string
        :param show: open each annotated image in a viewer
        :param save: write annotated images (or crops, together with ``crop``) under ``save_dir``
        :param crop: collect per-detection crops instead of drawing boxes
        :param render: store the annotated image back into ``self.imgs``
        :param labels: draw text labels on boxes
        :param save_dir: output directory
        :return: list of crop dicts when ``crop`` is True, otherwise None
        """
        crops = []
        for i, (im, pred) in enumerate(zip(self.imgs, self.pred)):
            s = f'image {i + 1}/{len(self.pred)}: {im.shape[0]}x{im.shape[1]} '  # string
            if pred.shape[0]:
                for c in pred[:, -1].unique():
                    n = (pred[:, -1] == c).sum()  # detections per class
                    s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, "  # add to string
                if show or save or render or crop:
                    annotator = Annotator(im, example=str(self.names))
                    for *box, conf, cls in reversed(pred):  # xyxy, confidence, class
                        label = f'{self.names[int(cls)]} {conf:.2f}'
                        if crop:
                            file = save_dir / 'crops' / self.names[int(cls)] / self.files[i] if save else None
                            crops.append({
                                'box': box,
                                'conf': conf,
                                'cls': cls,
                                'label': label,
                                'im': save_one_box(box, im, file=file, save=save)})
                        else:  # all others
                            annotator.box_label(box, label if labels else '', color=colors(cls))
                    im = annotator.im
            else:
                s += '(no detections)'

            im = Image.fromarray(im.astype(np.uint8)) if isinstance(im, np.ndarray) else im  # from np
            if pprint:
                LOGGER.info(s.rstrip(', '))
            if show:
                im.show(self.files[i])  # show
            if save:
                f = self.files[i]
                im.save(save_dir / f)  # save
                if i == self.n - 1:
                    LOGGER.info(f"Saved {self.n} image{'s' * (self.n > 1)} to {colorstr('bold', save_dir)}")
            if render:
                self.imgs[i] = np.asarray(im)
        if crop:
            if save:
                LOGGER.info(f'Saved results to {save_dir}\n')
            return crops

    def print(self):
        """Log a summary line per image followed by the speed line."""
        self.display(pprint=True)  # print results
        LOGGER.info(f'Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {tuple(self.s)}' %
                    self.t)

    def show(self, labels=True):
        """Display the annotated images."""
        self.display(show=True, labels=labels)  # show results

    def save(self, labels=True, save_dir='runs/detect/exp'):
        """Save annotated images; the default directory is auto-incremented, a custom one is reused."""
        save_dir = increment_path(save_dir, exist_ok=save_dir != 'runs/detect/exp', mkdir=True)  # increment save_dir
        self.display(save=True, labels=labels, save_dir=save_dir)  # save results

    def crop(self, save=True, save_dir='runs/detect/exp'):
        """Return per-detection crops, optionally saving them under ``save_dir``."""
        save_dir = increment_path(save_dir, exist_ok=save_dir != 'runs/detect/exp', mkdir=True) if save else None
        return self.display(crop=True, save=save, save_dir=save_dir)  # crop results

    def render(self, labels=True):
        """Draw boxes into ``self.imgs`` and return that list."""
        self.display(render=True, labels=labels)  # render results
        return self.imgs

    def pandas(self):
        """Return a copy with detections as pandas DataFrames, i.e. print(results.pandas().xyxy[0])."""
        new = copy(self)  # return copy
        ca = 'xmin', 'ymin', 'xmax', 'ymax', 'confidence', 'class', 'name'  # xyxy columns
        cb = 'xcenter', 'ycenter', 'width', 'height', 'confidence', 'class', 'name'  # xywh columns
        for k, c in zip(['xyxy', 'xyxyn', 'xywh', 'xywhn'], [ca, ca, cb, cb]):
            # keep the 5 numeric columns, cast class to int and append the class name
            a = [[row[:5] + [int(row[5]), self.names[int(row[5])]] for row in det.tolist()]
                 for det in getattr(self, k)]  # update
            setattr(new, k, [pd.DataFrame(x, columns=c) for x in a])
        return new

    def tolist(self):
        """Return a list of single-image Detections objects, i.e. 'for result in results.tolist():'."""
        r = range(self.n)  # iterable
        x = [Detections([self.imgs[i]], [self.pred[i]], [self.files[i]], self.times, self.names, self.s) for i in r]
        # for d in x:
        #    for k in ['imgs', 'pred', 'xyxy', 'xyxyn', 'xywh', 'xywhn']:
        #        setattr(d, k, getattr(d, k)[0])  # pop out of list
        return x

    def __len__(self):
        return self.n


class Classify(nn.Module):
    """Classification head, i.e. x(b,c1,20,20) to x(b,c2)."""

    def __init__(self, c1, c2, k=1, s=1, p=None, g=1):  # ch_in, ch_out, kernel, stride, padding, groups
        super().__init__()
        self.aap = nn.AdaptiveAvgPool2d(1)  # to x(b,c1,1,1)
        self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g)  # to x(b,c2,1,1)
        self.flat = nn.Flatten()

    def forward(self, x):
        z = torch.cat([self.aap(y) for y in (x if isinstance(x, list) else [x])], 1)  # cat if list
        return self.flat(self.conv(z))  # flatten to x(b,c2)