Prediction

nets/CSPdarknet.py (new file, 177 lines)
@@ -0,0 +1,177 @@
import torch
import torch.nn as nn


class SiLU(nn.Module):
    @staticmethod
    def forward(x):
        return x * torch.sigmoid(x)

def autopad(k, p=None):
    if p is None:
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]
    return p
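# e.g. autopad(3) -> 1 ('same' padding for a 3x3 kernel); autopad((3, 5)) -> [1, 2]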

class Focus(nn.Module):
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        super(Focus, self).__init__()
        self.conv = Conv(c1 * 4, c2, k, s, p, g, act)

    def forward(self, x):
        # 320, 320, 12 => 320, 320, 64
        return self.conv(
            # 640, 640, 3 => 320, 320, 12
            torch.cat(
                [
                    x[..., ::2, ::2],
                    x[..., 1::2, ::2],
                    x[..., ::2, 1::2],
                    x[..., 1::2, 1::2]
                ], 1
            )
        )
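# quick shape check (a sketch): Focus(3, 64, k=3)(torch.randn(1, 3, 640, 640)).shape
# -> torch.Size([1, 64, 320, 320]); the four interleaved slices trade spatial
# resolution for channels before the single stride-1 convolution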

class Conv(nn.Module):
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):
        super(Conv, self).__init__()
        self.conv   = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g, bias=False)
        self.bn     = nn.BatchNorm2d(c2, eps=0.001, momentum=0.03)
        self.act    = SiLU() if act is True else (act if isinstance(act, nn.Module) else nn.Identity())

    def forward(self, x):
        return self.act(self.bn(self.conv(x)))

    def fuseforward(self, x):
        return self.act(self.conv(x))

class Bottleneck(nn.Module):
    # Standard bottleneck
    def __init__(self, c1, c2, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, shortcut, groups, expansion
        super(Bottleneck, self).__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c_, c2, 3, 1, g=g)
        self.add = shortcut and c1 == c2

    def forward(self, x):
        return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x))

class C3(nn.Module):
    # CSP Bottleneck with 3 convolutions
    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, number, shortcut, groups, expansion
        super(C3, self).__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c1, c_, 1, 1)
        self.cv3 = Conv(2 * c_, c2, 1)  # act=FReLU(c2)
        self.m = nn.Sequential(*[Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)])
        # self.m = nn.Sequential(*[CrossConv(c_, c_, 3, 1, g, 1.0, shortcut) for _ in range(n)])

    def forward(self, x):
        return self.cv3(torch.cat(
            (
                self.m(self.cv1(x)),
                self.cv2(x)
            )
            , dim=1))

class SPP(nn.Module):
    # Spatial pyramid pooling layer used in YOLOv3-SPP
    def __init__(self, c1, c2, k=(5, 9, 13)):
        super(SPP, self).__init__()
        c_ = c1 // 2  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1)
        self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k])

    def forward(self, x):
        x = self.cv1(x)
        return self.cv2(torch.cat([x] + [m(x) for m in self.m], 1))
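# the parallel max-pools (k = 5, 9, 13, stride 1, 'same' padding) preserve H and W,
# so the concatenation has c_ * 4 channels, which cv2 projects back to c2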

class CSPDarknet(nn.Module):
    def __init__(self, base_channels, base_depth, phi, pretrained):
        super().__init__()
        #-----------------------------------------------#
        #   The input image is 640, 640, 3
        #   The initial base channel count base_channels is 64
        #-----------------------------------------------#

        #-----------------------------------------------#
        #   Use the Focus structure for feature extraction
        #   640, 640, 3 -> 320, 320, 12 -> 320, 320, 64
        #-----------------------------------------------#
        self.stem       = Focus(3, base_channels, k=3)

        #-----------------------------------------------#
        #   After the convolution, 320, 320, 64 -> 160, 160, 128
        #   After the CSP layer, 160, 160, 128 -> 160, 160, 128
        #-----------------------------------------------#
        self.dark2 = nn.Sequential(
            # 320, 320, 64 -> 160, 160, 128
            Conv(base_channels, base_channels * 2, 3, 2),
            # 160, 160, 128 -> 160, 160, 128
            C3(base_channels * 2, base_channels * 2, base_depth),
        )

        #-----------------------------------------------#
        #   After the convolution, 160, 160, 128 -> 80, 80, 256
        #   After the CSP layer, 80, 80, 256 -> 80, 80, 256
        #                   The 80, 80, 256 effective feature layer
        #                   branches off here into the FPN
        #-----------------------------------------------#
        self.dark3 = nn.Sequential(
            Conv(base_channels * 2, base_channels * 4, 3, 2),
            C3(base_channels * 4, base_channels * 4, base_depth * 3),
        )

        #-----------------------------------------------#
        #   After the convolution, 80, 80, 256 -> 40, 40, 512
        #   After the CSP layer, 40, 40, 512 -> 40, 40, 512
        #                   The 40, 40, 512 effective feature layer
        #                   branches off here into the FPN
        #-----------------------------------------------#
        self.dark4 = nn.Sequential(
            Conv(base_channels * 4, base_channels * 8, 3, 2),
            C3(base_channels * 8, base_channels * 8, base_depth * 3),
        )

        #-----------------------------------------------#
        #   After the convolution, 40, 40, 512 -> 20, 20, 1024
        #   After SPP, 20, 20, 1024 -> 20, 20, 1024
        #   After the CSP layer, 20, 20, 1024 -> 20, 20, 1024
        #-----------------------------------------------#
        self.dark5 = nn.Sequential(
            Conv(base_channels * 8, base_channels * 16, 3, 2),
            SPP(base_channels * 16, base_channels * 16),
            C3(base_channels * 16, base_channels * 16, base_depth, shortcut=False),
        )
        if pretrained:
            url = {
                's' : 'https://github.com/bubbliiiing/yolov5-pytorch/releases/download/v1.0/cspdarknet_s_backbone.pth',
                'm' : 'https://github.com/bubbliiiing/yolov5-pytorch/releases/download/v1.0/cspdarknet_m_backbone.pth',
                'l' : 'https://github.com/bubbliiiing/yolov5-pytorch/releases/download/v1.0/cspdarknet_l_backbone.pth',
                'x' : 'https://github.com/bubbliiiing/yolov5-pytorch/releases/download/v1.0/cspdarknet_x_backbone.pth',
            }[phi]
            checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu", model_dir="./model_data")
            self.load_state_dict(checkpoint, strict=False)
            print("Load weights from ", url.split('/')[-1])

    def forward(self, x):
        x = self.stem(x)
        x = self.dark2(x)
        #-----------------------------------------------#
        #   The output of dark3 is 80, 80, 256, an effective feature layer
        #-----------------------------------------------#
        x = self.dark3(x)
        feat1 = x
        #-----------------------------------------------#
        #   The output of dark4 is 40, 40, 512, an effective feature layer
        #-----------------------------------------------#
        x = self.dark4(x)
        feat2 = x
        #-----------------------------------------------#
        #   The output of dark5 is 20, 20, 1024, an effective feature layer
        #-----------------------------------------------#
        x = self.dark5(x)
        feat3 = x
        return feat1, feat2, feat3
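
A quick smoke test (a sketch, not part of the commit) to confirm the three feature maps match the shape comments above; base_channels=64, base_depth=3 is the 'l' configuration:

import torch
from nets.CSPdarknet import CSPDarknet

model = CSPDarknet(base_channels=64, base_depth=3, phi='l', pretrained=False)
feat1, feat2, feat3 = model(torch.randn(1, 3, 640, 640))
print(feat1.shape)  # torch.Size([1, 256, 80, 80])
print(feat2.shape)  # torch.Size([1, 512, 40, 40])
print(feat3.shape)  # torch.Size([1, 1024, 20, 20])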

nets/ConvNext.py (new file, 249 lines)
@@ -0,0 +1,249 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.

# All rights reserved.

# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.


import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


def drop_path(x, drop_prob: float = 0., training: bool = False, scale_by_keep: bool = True):
    if drop_prob == 0. or not training:
        return x
    keep_prob       = 1 - drop_prob
    shape           = (x.shape[0],) + (1,) * (x.ndim - 1)
    random_tensor   = x.new_empty(shape).bernoulli_(keep_prob)
    if keep_prob > 0.0 and scale_by_keep:
        random_tensor.div_(keep_prob)
    return x * random_tensor
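# e.g. with drop_prob = 0.1 each sample's residual branch is zeroed with
# probability 0.1 and scaled by 1 / 0.9 otherwise, keeping the output's expectation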

class DropPath(nn.Module):
    """
    Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
    """
    def __init__(self, drop_prob=None, scale_by_keep=True):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob
        self.scale_by_keep = scale_by_keep

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training, self.scale_by_keep)

def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.):
    def _no_grad_trunc_normal_(tensor, mean, std, a, b):
        def norm_cdf(x):
            return (1. + math.erf(x / math.sqrt(2.))) / 2.

        with torch.no_grad():
            l = norm_cdf((a - mean) / std)
            u = norm_cdf((b - mean) / std)

            tensor.uniform_(2 * l - 1, 2 * u - 1)
            tensor.erfinv_()

            tensor.mul_(std * math.sqrt(2.))
            tensor.add_(mean)

            tensor.clamp_(min=a, max=b)
            return tensor
    return _no_grad_trunc_normal_(tensor, mean, std, a, b)

#--------------------------------------#
#   Implementation of the GELU activation
#   using the tanh approximation
#--------------------------------------#
class GELU(nn.Module):
    def __init__(self):
        super(GELU, self).__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * torch.pow(x, 3))))

#---------------------------------------------------------------------------------#
#   LayerNorm supporting two data formats: channels_last (default) or channels_first.
#   channels_last  corresponds to inputs of shape (batch_size, height, width, channels)
#   channels_first corresponds to inputs of shape (batch_size, channels, height, width)
#---------------------------------------------------------------------------------#
class LayerNorm(nn.Module):
    def __init__(self, normalized_shape, eps=1e-6, data_format="channels_last"):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.bias   = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps
        self.data_format = data_format
        if self.data_format not in ["channels_last", "channels_first"]:
            raise NotImplementedError
        self.normalized_shape = (normalized_shape, )

    def forward(self, x):
        if self.data_format == "channels_last":
            return F.layer_norm(x, self.normalized_shape, self.weight, self.bias, self.eps)
        elif self.data_format == "channels_first":
            u = x.mean(1, keepdim=True)
            s = (x - u).pow(2).mean(1, keepdim=True)
            x = (x - u) / torch.sqrt(s + self.eps)
            x = self.weight[:, None, None] * x + self.bias[:, None, None]
            return x

#--------------------------------------------------------------------------------------------------------------#
#   The ConvNeXt Block has two equivalent implementations:
#   (1) DwConv -> LayerNorm (channels_first) -> 1x1 Conv -> GELU -> 1x1 Conv; all in (N, C, H, W)
#   (2) DwConv -> Permute to (N, H, W, C); LayerNorm (channels_last) -> Linear -> GELU -> Linear; Permute back
#   The code uses (2), which is slightly faster in PyTorch
#--------------------------------------------------------------------------------------------------------------#
class Block(nn.Module):
    def __init__(self, dim, drop_path=0., layer_scale_init_value=1e-6):
        super().__init__()
        #--------------------------#
        #   7x7 depthwise convolution
        #--------------------------#
        self.dwconv     = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim)
        self.norm       = LayerNorm(dim, eps=1e-6)
        #--------------------------#
        #   Linear layer in place of a 1x1 convolution
        #--------------------------#
        self.pwconv1    = nn.Linear(dim, 4 * dim)
        self.act        = GELU()
        #--------------------------#
        #   Linear layer in place of a 1x1 convolution
        #--------------------------#
        self.pwconv2    = nn.Linear(4 * dim, dim)
        #--------------------------#
        #   Learnable layer-scale coefficient
        #--------------------------#
        self.gamma      = nn.Parameter(layer_scale_init_value * torch.ones((dim)), requires_grad=True) if layer_scale_init_value > 0 else None
        #--------------------------#
        #   DropPath (stochastic depth) regularisation
        #--------------------------#
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()

    def forward(self, x):
        input = x
        #--------------------------#
        #   7x7 depthwise convolution
        #--------------------------#
        x = self.dwconv(x)
        x = x.permute(0, 2, 3, 1) # (N, C, H, W) -> (N, H, W, C)
        x = self.norm(x)
        #--------------------------#
        #   Linear layer in place of a 1x1 convolution
        #--------------------------#
        x = self.pwconv1(x)
        x = self.act(x)
        #--------------------------#
        #   Linear layer in place of a 1x1 convolution
        #--------------------------#
        x = self.pwconv2(x)
        #--------------------------#
        #   Apply the layer-scale coefficient
        #--------------------------#
        if self.gamma is not None:
            x = self.gamma * x
        x = x.permute(0, 3, 1, 2) # (N, H, W, C) -> (N, C, H, W)
        #--------------------------#
        #   DropPath (stochastic depth) regularisation
        #--------------------------#
        x = input + self.drop_path(x)
        return x
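# Each Block thus computes x + DropPath(gamma * Linear(GELU(Linear(LN(DwConv(x)))))),
# an inverted-bottleneck residual unit with a per-channel scale initialised to 1e-6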

#-----------------------------------------------------#
#   ConvNeXt
#   A PyTorch impl of : `A ConvNet for the 2020s`
#   https://arxiv.org/pdf/2201.03545.pdf
#-----------------------------------------------------#
class ConvNeXt(nn.Module):
    def __init__(
        self, in_chans=3, num_classes=1000, depths=[3, 3, 9, 3], dims=[96, 192, 384, 768],
        drop_path_rate=0., layer_scale_init_value=1e-6, head_init_scale=1., **kwargs
    ):
        super().__init__()

        self.downsample_layers = nn.ModuleList()
        #--------------------------------------------------#
        #   bs, 3, 224, 224 -> bs, 96, 56, 56
        #--------------------------------------------------#
        stem = nn.Sequential(
            nn.Conv2d(in_chans, dims[0], kernel_size=4, stride=4),
            LayerNorm(dims[0], eps=1e-6, data_format="channels_first")
        )
        self.downsample_layers.append(stem)

        #--------------------------------------------------#
        #   Define the three downsampling steps, each a
        #   convolution with kernel size 2x2 and stride 2
        #--------------------------------------------------#
        for i in range(3):
            downsample_layer = nn.Sequential(
                LayerNorm(dims[i], eps=1e-6, data_format="channels_first"),
                nn.Conv2d(dims[i], dims[i+1], kernel_size=2, stride=2),
            )
            self.downsample_layers.append(downsample_layer)

        #--------------------------------------------------#
        #   Assign each block its own drop-path rate,
        #   increasing linearly with depth
        #--------------------------------------------------#
        self.stages = nn.ModuleList()
        dp_rates    = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]
        cur         = 0
        #--------------------------------------------------#
        #   Besides the stem, ConvNeXt has four stages,
        #   each a stack of several ConvNeXt Blocks
        #--------------------------------------------------#
        for i in range(4):
            stage = nn.Sequential(
                *[Block(dim=dims[i], drop_path=dp_rates[cur + j], layer_scale_init_value=layer_scale_init_value) for j in range(depths[i])]
            )
            self.stages.append(stage)
            cur += depths[i]

        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            trunc_normal_(m.weight, std=.02)
            nn.init.constant_(m.bias, 0)

    def forward(self, x):
        outs = []
        for i in range(4):
            x = self.downsample_layers[i](x)
            x = self.stages[i](x)
            if i != 0:
                outs.append(x)
        return outs

model_urls = {
    "convnext_tiny_1k"      : "https://github.com/bubbliiiing/yolov5-pytorch/releases/download/v1.0/convnext_tiny_1k_224_ema_no_jit.pth",
    "convnext_small_1k"     : "https://github.com/bubbliiiing/yolov5-pytorch/releases/download/v1.0/convnext_small_1k_224_ema_no_jit.pth",
}

#------------------------------------------------------#
#   Tiny is roughly the size of Cspdarknet-L
#------------------------------------------------------#
def ConvNeXt_Tiny(pretrained=False, **kwargs):
    model = ConvNeXt(depths=[3, 3, 9, 3], dims=[96, 192, 384, 768], **kwargs)
    if pretrained:
        url = model_urls['convnext_tiny_1k']
        checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu", model_dir="./model_data")
        model.load_state_dict(checkpoint, strict=False)
        print("Load weights from ", url.split('/')[-1])
    return model

#------------------------------------------------------#
#   Small is roughly the size of Cspdarknet-X
#------------------------------------------------------#
def ConvNeXt_Small(pretrained=False, **kwargs):
    model = ConvNeXt(depths=[3, 3, 27, 3], dims=[96, 192, 384, 768], **kwargs)
    if pretrained:
        url = model_urls['convnext_small_1k']
        checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu", model_dir="./model_data")
        model.load_state_dict(checkpoint, strict=False)
        print("Load weights from ", url.split('/')[-1])
    return model
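
A shape sketch (not part of the commit): for a 640 input the backbone returns the stage-1..3 outputs, which the YoloBody 1x1 convolutions later remap:

import torch
from nets.ConvNext import ConvNeXt_Tiny

model = ConvNeXt_Tiny(pretrained=False)
outs = model(torch.randn(1, 3, 640, 640))
print([tuple(o.shape) for o in outs])
# [(1, 192, 80, 80), (1, 384, 40, 40), (1, 768, 20, 20)]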

nets/Swin_transformer.py (new file, 638 lines)
@@ -0,0 +1,638 @@
# --------------------------------------------------------
# Swin Transformer
# Copyright (c) 2021 Microsoft
# Licensed under The MIT License [see LICENSE for details]
# Written by Ze Liu
# --------------------------------------------------------
import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.checkpoint as checkpoint


def _make_divisible(v, divisor, min_value=None):
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v
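# rounds v to the nearest multiple of divisor without dropping below 90% of v,
# e.g. _make_divisible(80, 7) -> 77 and _make_divisible(56, 7) -> 56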

def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.):
    def _no_grad_trunc_normal_(tensor, mean, std, a, b):
        def norm_cdf(x):
            return (1. + math.erf(x / math.sqrt(2.))) / 2.

        with torch.no_grad():
            l = norm_cdf((a - mean) / std)
            u = norm_cdf((b - mean) / std)

            tensor.uniform_(2 * l - 1, 2 * u - 1)
            tensor.erfinv_()

            tensor.mul_(std * math.sqrt(2.))
            tensor.add_(mean)

            tensor.clamp_(min=a, max=b)
            return tensor
    return _no_grad_trunc_normal_(tensor, mean, std, a, b)

#--------------------------------------#
#   Implementation of the GELU activation
#   using the tanh approximation
#--------------------------------------#
class GELU(nn.Module):
    def __init__(self):
        super(GELU, self).__init__()

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * torch.pow(x, 3))))

#-------------------------------------------------------#
#   Compress the height and width of the input image
#   while expanding the number of channels
#-------------------------------------------------------#
class PatchEmbed(nn.Module):
    def __init__(self, img_size=[224, 224], patch_size=4, in_chans=3, embed_dim=96, norm_layer=None):
        super().__init__()
        # [224, 224]
        self.img_size           = img_size
        # [4, 4]
        self.patch_size         = [patch_size, patch_size]
        # [56, 56]
        self.patches_resolution = [self.img_size[0] // self.patch_size[0], self.img_size[1] // self.patch_size[1]]

        # 3136
        self.num_patches        = self.patches_resolution[0] * self.patches_resolution[1]
        # 3
        self.in_chans           = in_chans
        # 96
        self.embed_dim          = embed_dim

        #-------------------------------------------------------#
        #   bs, 224, 224, 3 -> bs, 56, 56, 96
        #-------------------------------------------------------#
        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size)
        if norm_layer is not None:
            self.norm = norm_layer(embed_dim)
        else:
            self.norm = None

    def forward(self, x):
        B, C, H, W = x.shape
        # FIXME look at relaxing size constraints
        assert H == self.img_size[0] and W == self.img_size[1], \
            f"Input image size ({H}*{W}) doesn't match model ({self.img_size[0]} * {self.img_size[1]})."
        #-------------------------------------------------------#
        #   bs, 224, 224, 3 -> bs, 56, 56, 96 -> bs, 3136, 96
        #-------------------------------------------------------#
        x = self.proj(x).flatten(2).transpose(1, 2)
        if self.norm is not None:
            x = self.norm(x)
        return x

def window_partition(x, window_size):
    B, H, W, C  = x.shape
    #------------------------------------------------------------------#
    #   bs, 56, 56, 96 -> bs, 8, 7, 8, 7, 96 -> bs * 64, 7, 7, 96
    #------------------------------------------------------------------#
    x           = x.view(B, H // window_size, window_size, W // window_size, window_size, C)
    windows     = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(-1, window_size, window_size, C)
    return windows

def window_reverse(windows, window_size, H, W):
    #------------------------------------------------------------------#
    #   bs * 64, 7, 7, 96 -> bs, 8, 8, 7, 7, 96 -> bs, 56, 56, 96
    #------------------------------------------------------------------#
    B = int(windows.shape[0] / (H * W / window_size / window_size))
    x = windows.view(B, H // window_size, W // window_size, window_size, window_size, -1)
    x = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, H, W, -1)
    return x
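# e.g. H = W = 56 with window_size = 7 yields 8 * 8 = 64 windows per image;
# window_reverse is the exact inverse of window_partition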

class WindowAttention(nn.Module):
    def __init__(self, dim, window_size, num_heads, qkv_bias=True, qk_scale=None, attn_drop=0., proj_drop=0.):
        super().__init__()
        self.dim            = dim
        self.window_size    = window_size  # Wh, Ww
        self.num_heads      = num_heads
        head_dim            = dim // num_heads
        self.scale          = qk_scale or head_dim ** -0.5

        #--------------------------------------------------------------------------#
        #   Relative-position bias table: one bias per relative offset within a
        #   window. For a 7x7 window the relative coordinates range from -6 to +6,
        #   i.e. 13 values per axis, hence 13 * 13 entries
        #   13 * 13, num_heads
        #--------------------------------------------------------------------------#
        self.relative_position_bias_table = nn.Parameter(
            torch.zeros((2 * window_size[0] - 1) * (2 * window_size[1] - 1), num_heads)
        )

        #--------------------------------------------------------------------------#
        #   Compute, for every position in the 7x7 window, the coordinates of all
        #   other positions relative to it
        #--------------------------------------------------------------------------#
        coords_h    = torch.arange(self.window_size[0])
        coords_w    = torch.arange(self.window_size[1])
        coords      = torch.stack(torch.meshgrid([coords_h, coords_w]))  # 2, Wh, Ww
        coords_flatten  = torch.flatten(coords, 1)  # 2, Wh*Ww
        relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]  # 2, Wh*Ww, Wh*Ww
        relative_coords = relative_coords.permute(1, 2, 0).contiguous()  # Wh*Ww, Wh*Ww, 2
        relative_coords[:, :, 0]    += self.window_size[0] - 1  # shift to start from 0
        relative_coords[:, :, 1]    += self.window_size[1] - 1
        relative_coords[:, :, 0]    *= 2 * self.window_size[1] - 1
        relative_position_index     = relative_coords.sum(-1)  # Wh*Ww, Wh*Ww
        self.register_buffer("relative_position_index", relative_position_index)

        #--------------------------------------------------------------------------#
        #   A single linear projection produces q, k and v for multi-head attention
        #--------------------------------------------------------------------------#
        self.qkv        = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop  = nn.Dropout(attn_drop)
        self.proj       = nn.Linear(dim, dim)
        self.proj_drop  = nn.Dropout(proj_drop)

        trunc_normal_(self.relative_position_bias_table, std=.02)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask=None):
        B_, N, C    = x.shape
        #--------------------------------------------------------------------------#
        #   bs * 64, 49, 96 -> bs * 64, 49, 96 * 3 ->
        #   bs * 64, 49, 3, num_heads, 32 -> 3, bs * 64, num_head, 49, 32
        #--------------------------------------------------------------------------#
        qkv         = self.qkv(x).reshape(B_, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        #--------------------------------------------------------------------------#
        #   bs * 64, num_head, 49, 32
        #--------------------------------------------------------------------------#
        q, k, v     = qkv[0], qkv[1], qkv[2]

        #--------------------------------------------------------------------------#
        #   bs * 64, num_head, 49, 49
        #--------------------------------------------------------------------------#
        q       = q * self.scale
        attn    = (q @ k.transpose(-2, -1))

        #--------------------------------------------------------------------------#
        #   Add the relative-position bias to the attention logits
        #   to form the final attention
        #--------------------------------------------------------------------------#
        relative_position_bias = self.relative_position_bias_table[self.relative_position_index.view(-1)].view(
            self.window_size[0] * self.window_size[1], self.window_size[0] * self.window_size[1], -1)  # Wh*Ww,Wh*Ww,nH
        relative_position_bias = relative_position_bias.permute(2, 0, 1).contiguous()
        attn = attn + relative_position_bias.unsqueeze(0)

        #--------------------------------------------------------------------------#
        #   Apply the mask so each shifted-window region only attends to itself
        #   bs * 64, num_head, 49, 49
        #--------------------------------------------------------------------------#
        if mask is not None:
            nW = mask.shape[0]
            attn = attn.view(B_ // nW, nW, self.num_heads, N, N) + mask.unsqueeze(1).unsqueeze(0)
            attn = attn.view(-1, self.num_heads, N, N)
            attn = self.softmax(attn)
        else:
            attn = self.softmax(attn)

        attn = self.attn_drop(attn)

        #---------------------------------------------------------------------------------------#
        #   bs * 64, num_head, 49, 49 @ bs * 64, num_head, 49, 32 -> bs * 64, num_head, 49, 32
        #
        #   bs * 64, num_head, 49, 32 -> bs * 64, 49, 96
        #---------------------------------------------------------------------------------------#
        x = (attn @ v).transpose(1, 2).reshape(B_, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x


def drop_path(x, drop_prob: float = 0., training: bool = False, scale_by_keep: bool = True):
    """
    Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
    This is the same as the DropConnect impl I created for EfficientNet, etc networks, however,
    the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
    See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
    changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
    'survival rate' as the argument.
    """
    if drop_prob == 0. or not training:
        return x
    keep_prob       = 1 - drop_prob
    shape           = (x.shape[0],) + (1,) * (x.ndim - 1)  # work with diff dim tensors, not just 2D ConvNets
    random_tensor   = x.new_empty(shape).bernoulli_(keep_prob)
    if keep_prob > 0.0 and scale_by_keep:
        random_tensor.div_(keep_prob)
    return x * random_tensor


class DropPath(nn.Module):
    """
    Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
    """
    def __init__(self, drop_prob=None, scale_by_keep=True):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob
        self.scale_by_keep = scale_by_keep

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training, self.scale_by_keep)


#-------------------------------------------------------#
#   Two fully connected layers
#-------------------------------------------------------#
class Mlp(nn.Module):
    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=GELU, drop=0.):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x

#-------------------------------------------------------#
#   The basic block repeated within each stage;
#   it uses WindowAttention for feature extraction
#-------------------------------------------------------#
class SwinTransformerBlock(nn.Module):
    def __init__(self, dim, input_resolution, num_heads, window_size=7, shift_size=0,
                 mlp_ratio=4., qkv_bias=True, qk_scale=None, drop=0., attn_drop=0., drop_path=0.,
                 act_layer=GELU, norm_layer=nn.LayerNorm):
        super().__init__()
        self.dim                = dim
        self.input_resolution   = input_resolution
        self.num_heads          = num_heads
        self.window_size        = window_size
        self.shift_size         = shift_size

        self.mlp_ratio          = mlp_ratio
        if min(self.input_resolution) <= self.window_size:
            self.shift_size = 0
            self.window_size = min(self.input_resolution)
        assert 0 <= self.shift_size < self.window_size, "shift_size must be in [0, window_size)"

        self.norm1  = norm_layer(dim)
        self.attn   = WindowAttention(
            dim,
            window_size = [self.window_size, self.window_size],
            num_heads   = num_heads,
            qkv_bias    = qkv_bias,
            qk_scale    = qk_scale,
            attn_drop   = attn_drop,
            proj_drop   = drop
        )

        self.drop_path  = DropPath(drop_path) if drop_path > 0. else nn.Identity()
        self.norm2      = norm_layer(dim)
        mlp_hidden_dim  = int(dim * mlp_ratio)
        self.mlp        = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop)

        if self.shift_size > 0:
            #----------------------------------------------------------------#
            #   During shifted-window attention the feature map is rolled,
            #   e.g.:
            #   [                                   [
            #       [1, 2, 3],                          [5, 6, 4],
            #       [4, 5, 6],          -->             [8, 9, 7],
            #       [7, 8, 9],                          [2, 3, 1],
            #   ]                                   ]
            #   The mask built here ensures each region of the rolled map
            #   only attends to itself
            #----------------------------------------------------------------#
            H, W = self.input_resolution
            _H, _W  =  _make_divisible(H, self.window_size), _make_divisible(W, self.window_size),
            img_mask = torch.zeros((1, _H, _W, 1))  # 1 H W 1
            h_slices = (slice(0, -self.window_size),
                        slice(-self.window_size, -self.shift_size),
                        slice(-self.shift_size, None))
            w_slices = (slice(0, -self.window_size),
                        slice(-self.window_size, -self.shift_size),
                        slice(-self.shift_size, None))
            cnt = 0
            for h in h_slices:
                for w in w_slices:
                    img_mask[:, h, w, :] = cnt
                    cnt += 1

            mask_windows = window_partition(img_mask, self.window_size)  # nW, window_size, window_size, 1
            mask_windows = mask_windows.view(-1, self.window_size * self.window_size)
            attn_mask       = mask_windows.unsqueeze(1) - mask_windows.unsqueeze(2)
            attn_mask       = attn_mask.masked_fill(attn_mask != 0, float(-100.0)).masked_fill(attn_mask == 0, float(0.0))
            self.attn_mask  = attn_mask.cpu().numpy()
        else:
            self.attn_mask = None

    def forward(self, x):
        H, W = self.input_resolution
        B, L, C = x.shape
        assert L == H * W, "input feature has wrong size"
        #-----------------------------------------------#
        #   bs, 3136, 96 -> bs, 56, 56, 96
        #-----------------------------------------------#
        shortcut = x
        x = self.norm1(x)
        x = x.view(B, H, W, C)

        _H, _W  =  _make_divisible(H, self.window_size), _make_divisible(W, self.window_size),
        x       = x.permute(0, 3, 1, 2)
        x       = F.interpolate(x, [_H, _W], mode='bicubic', align_corners=False).permute(0, 2, 3, 1)

        #-----------------------------------------------#
        #   Roll the feature map
        #-----------------------------------------------#
        if self.shift_size > 0:
            shifted_x = torch.roll(x, shifts=(-self.shift_size, -self.shift_size), dims=(1, 2))
        else:
            shifted_x = x
        #------------------------------------------------------------------------------------------#
        #   bs, 56, 56, 96 -> bs * 64, 7, 7, 96 -> bs * 64, 49, 96
        #------------------------------------------------------------------------------------------#
        x_windows = window_partition(shifted_x, self.window_size)  # num_windows * B, window_size, window_size, C
        x_windows = x_windows.view(-1, self.window_size * self.window_size, C)  # nW*B, window_size*window_size, C

        #-----------------------------------------------#
        #   bs * 64, 49, 96 -> bs * 64, 49, 96
        #-----------------------------------------------#
        if self.attn_mask is not None:
            attn_mask = torch.tensor(self.attn_mask, device=x.device)
        else:
            attn_mask = None
        attn_windows = self.attn(x_windows, mask=attn_mask)  # nW*B, window_size*window_size, C
        #-----------------------------------------------#
        #   bs * 64, 49, 96 -> bs, 56, 56, 96
        #-----------------------------------------------#
        attn_windows = attn_windows.view(-1, self.window_size, self.window_size, C)
        shifted_x = window_reverse(attn_windows, self.window_size, _H, _W)  # B H' W' C

        #-----------------------------------------------#
        #   Roll the feature map back
        #-----------------------------------------------#
        if self.shift_size > 0:
            x = torch.roll(shifted_x, shifts=(self.shift_size, self.shift_size), dims=(1, 2))
        else:
            x = shifted_x

        x = x.permute(0, 3, 1, 2)
        x = F.interpolate(x, [H, W], mode='bicubic', align_corners=False).permute(0, 2, 3, 1)
        #-----------------------------------------------#
        #   bs, 3136, 96
        #-----------------------------------------------#
        x = x.view(B, H * W, C)
        #-----------------------------------------------#
        #   FFN
        #   bs, 3136, 96
        #-----------------------------------------------#
        x = shortcut + self.drop_path(x)
        x = x + self.drop_path(self.mlp(self.norm2(x)))

        return x

#-------------------------------------------------------#
#   Compress the height and width of the incoming feature
#   map by gathering features across neighbouring points,
#   then stacking them along the channel dimension
#-------------------------------------------------------#
class PatchMerging(nn.Module):
    def __init__(self, input_resolution, dim, norm_layer=nn.LayerNorm):
        super().__init__()
        self.input_resolution   = input_resolution
        self.dim                = dim

        self.norm               = norm_layer(4 * dim)
        self.reduction          = nn.Linear(4 * dim, 2 * dim, bias=False)

    def forward(self, x):
        H, W = self.input_resolution
        B, L, C = x.shape
        assert L == H * W, "input feature has wrong size"
        assert H % 2 == 0 and W % 2 == 0, f"x size ({H}*{W}) is not even."

        #-------------------------------------------------------#
        #   bs, 3136, 96 -> bs, 56, 56, 96
        #-------------------------------------------------------#
        x = x.view(B, H, W, C)

        #-------------------------------------------------------#
        #   x0 ~ x3   bs, 56, 56, 96 -> bs, 28, 28, 96
        #-------------------------------------------------------#
        x0 = x[:, 0::2, 0::2, :]  # B H/2 W/2 C
        x1 = x[:, 1::2, 0::2, :]  # B H/2 W/2 C
        x2 = x[:, 0::2, 1::2, :]  # B H/2 W/2 C
        x3 = x[:, 1::2, 1::2, :]  # B H/2 W/2 C

        #-------------------------------------------------------#
        #   4 x bs, 28, 28, 96 -> bs, 28, 28, 384
        #-------------------------------------------------------#
        x = torch.cat([x0, x1, x2, x3], -1)  # B H/2 W/2 4*C
        #-------------------------------------------------------#
        #   bs, 28, 28, 384 -> bs, 784, 384
        #-------------------------------------------------------#
        x = x.view(B, -1, 4 * C)  # B H/2*W/2 4*C

        #-------------------------------------------------------#
        #   bs, 784, 384 -> bs, 784, 192
        #-------------------------------------------------------#
        x = self.norm(x)
        x = self.reduction(x)
        return x
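# Note: this slice-and-concat downsampling mirrors the Focus module in
# nets/CSPdarknet.py, followed by LayerNorm and a linear 4C -> 2C reduction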


#-------------------------------------------------------#
#   The basic stage of the Swin Transformer:
#   window multi-head self-attention for feature extraction,
#   PatchMerging to compress height and width
#-------------------------------------------------------#
class BasicLayer(nn.Module):
    def __init__(self, dim, input_resolution, depth, num_heads, window_size,
                 mlp_ratio=4., qkv_bias=True, qk_scale=None, drop=0., attn_drop=0.,
                 drop_path=0., norm_layer=nn.LayerNorm, downsample=None, use_checkpoint=False):
        super().__init__()
        #-------------------------------------------------------#
        #   Each of the four stages has its own dim
        #   [96, 192, 384, 768]
        #-------------------------------------------------------#
        self.dim                = dim
        #-------------------------------------------------------#
        #   Each of the four stages has its own input resolution
        #   [[56, 56], [28, 28], [14, 14], [7, 7]]
        #-------------------------------------------------------#
        self.input_resolution   = input_resolution
        #-------------------------------------------------------#
        #   Each of the four stages repeats the attention block
        #   a different number of times: [2, 2, 6, 2]
        #-------------------------------------------------------#
        self.depth              = depth
        self.use_checkpoint     = use_checkpoint

        #-------------------------------------------------------#
        #   Stack depth SwinTransformerBlocks of window attention;
        #   every second block uses a shifted window
        #-------------------------------------------------------#
        self.blocks = nn.ModuleList(
            [
                SwinTransformerBlock(
                    dim         = dim,
                    input_resolution = input_resolution,
                    num_heads   = num_heads,
                    window_size = window_size,
                    shift_size  = 0 if (i % 2 == 0) else window_size // 2,
                    mlp_ratio   = mlp_ratio,
                    qkv_bias    = qkv_bias,
                    qk_scale    = qk_scale,
                    drop        = drop,
                    attn_drop   = attn_drop,
                    drop_path   = drop_path[i] if isinstance(drop_path, list) else drop_path,
                    norm_layer  = norm_layer
                )
                for i in range(depth)
            ]
        )

        if downsample is not None:
            #-------------------------------------------------------#
            #   Optionally downsample, i.e. compress height and width
            #-------------------------------------------------------#
            self.downsample = downsample(input_resolution, dim=dim, norm_layer=norm_layer)
        else:
            self.downsample = None

    def forward(self, x):
        for blk in self.blocks:
            if self.use_checkpoint:
                x = checkpoint.checkpoint(blk, x)
            else:
                x = blk(x)
        #-------------------------------------------------------#
        #   x_ is the pre-downsample feature returned to the FPN
        #-------------------------------------------------------#
        x_ = x
        if self.downsample is not None:
            x = self.downsample(x_)
        return x_, x

class SwinTransformer(nn.Module):
    def __init__(self, img_size=[640, 640], patch_size=4, in_chans=3, num_classes=1000,
                 embed_dim=96, depths=[2, 2, 6, 2], num_heads=[3, 6, 12, 24],
                 window_size=7, mlp_ratio=4., qkv_bias=True, qk_scale=None,
                 drop_rate=0., attn_drop_rate=0., drop_path_rate=0.1,
                 norm_layer=nn.LayerNorm, ape=False, patch_norm=True,
                 use_checkpoint=False, **kwargs):
        super().__init__()
        self.num_classes    = num_classes
        self.num_layers     = len(depths)
        self.embed_dim      = embed_dim
        self.ape            = ape
        self.patch_norm     = patch_norm
        self.num_features   = int(embed_dim * 2 ** (self.num_layers - 1))
        self.mlp_ratio      = mlp_ratio

        #--------------------------------------------------#
        #   bs, 224, 224, 3 -> bs, 3136, 96
        #--------------------------------------------------#
        self.patch_embed = PatchEmbed(
            img_size    = img_size,
            patch_size  = patch_size,
            in_chans    = in_chans,
            embed_dim   = embed_dim,
            norm_layer  = norm_layer if self.patch_norm else None
        )

        #--------------------------------------------------#
        #   Sequence length after PatchEmbed, e.g. 3136 for a
        #   224 input, at a patch resolution of [56, 56]
        #--------------------------------------------------#
        num_patches             = self.patch_embed.num_patches
        patches_resolution      = self.patch_embed.patches_resolution
        self.patches_resolution = patches_resolution

        if self.ape:
            self.absolute_pos_embed = nn.Parameter(torch.zeros(1, num_patches, embed_dim))
            trunc_normal_(self.absolute_pos_embed, std=.02)

        self.pos_drop = nn.Dropout(p=drop_rate)

        #--------------------------------------------------#
        #   stochastic depth
        #--------------------------------------------------#
        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]  # stochastic depth decay rule

        #---------------------------------------------------------------#
        #   Build each stage of the Swin Transformer
        #   bs, 3136, 96 -> bs, 784, 192 -> bs, 196, 384 -> bs, 49, 768
        #---------------------------------------------------------------#
        self.layers = nn.ModuleList()
        for i_layer in range(self.num_layers):
            layer = BasicLayer(
                dim                 = int(embed_dim * 2 ** i_layer),
                input_resolution    = (patches_resolution[0] // (2 ** i_layer), patches_resolution[1] // (2 ** i_layer)),
                depth               = depths[i_layer],
                num_heads           = num_heads[i_layer],
                window_size         = window_size,
                mlp_ratio           = self.mlp_ratio,
                qkv_bias            = qkv_bias,
                qk_scale            = qk_scale,
                drop                = drop_rate,
                attn_drop           = attn_drop_rate,
                drop_path           = dpr[sum(depths[:i_layer]):sum(depths[:i_layer + 1])],
                norm_layer          = norm_layer,
                downsample          = PatchMerging if (i_layer < self.num_layers - 1) else None,
                use_checkpoint      = use_checkpoint
            )
            self.layers.append(layer)

        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    @torch.jit.ignore
    def no_weight_decay(self):
        return {'absolute_pos_embed'}

    @torch.jit.ignore
    def no_weight_decay_keywords(self):
        return {'relative_position_bias_table'}

    def forward(self, x):
        x = self.patch_embed(x)
        if self.ape:
            x = x + self.absolute_pos_embed
        x = self.pos_drop(x)

        interval_outs = []
        for i, layer in enumerate(self.layers):
            x_, x = layer(x)
            if i != 0:
                interval_outs.append(x_)

        outs = []
        for i, layer in enumerate(interval_outs):
            H, W    = (self.patches_resolution[0] // (2 ** (i + 1)), self.patches_resolution[1] // (2 ** (i + 1)))
            B, L, C = layer.shape
            layer   = layer.view([B, H, W, C]).permute([0, 3, 1, 2])
            outs.append(layer)

        return outs

def Swin_transformer_Tiny(pretrained = False, input_shape = [640, 640], **kwargs):
    model = SwinTransformer(input_shape, depths=[2, 2, 6, 2], **kwargs)
    if pretrained:
        url = "https://github.com/bubbliiiing/yolov5-pytorch/releases/download/v1.0/swin_tiny_patch4_window7.pth"
        checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu", model_dir="./model_data")
        model.load_state_dict(checkpoint, strict=False)
        print("Load weights from ", url.split('/')[-1])

    return model
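
A shape sketch (not part of the commit): for a 640 input the patch grid is 160 x 160, and the three returned feature maps are the stage-1..3 pre-downsample outputs:

import torch
from nets.Swin_transformer import Swin_transformer_Tiny

model = Swin_transformer_Tiny(pretrained=False)
outs = model(torch.randn(1, 3, 640, 640))
print([tuple(o.shape) for o in outs])
# [(1, 192, 80, 80), (1, 384, 40, 40), (1, 768, 20, 20)]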

nets/__init__.py (new file, 1 line)
@@ -0,0 +1 @@
| # | ||||
132  nets/yolo.py  Normal file
| @@ -0,0 +1,132 @@ | ||||
| import torch | ||||
| import torch.nn as nn | ||||
|  | ||||
| from nets.ConvNext import ConvNeXt_Small, ConvNeXt_Tiny | ||||
| from nets.CSPdarknet import C3, Conv, CSPDarknet | ||||
| from nets.Swin_transformer import Swin_transformer_Tiny | ||||
|  | ||||
|  | ||||
| #---------------------------------------------------# | ||||
| #   yolo_body | ||||
| #---------------------------------------------------# | ||||
| class YoloBody(nn.Module): | ||||
|     def __init__(self, anchors_mask, num_classes, phi, backbone='cspdarknet', pretrained=False, input_shape=[640, 640]): | ||||
|         super(YoloBody, self).__init__() | ||||
|         depth_dict          = {'s' : 0.33, 'm' : 0.67, 'l' : 1.00, 'x' : 1.33,} | ||||
|         width_dict          = {'s' : 0.50, 'm' : 0.75, 'l' : 1.00, 'x' : 1.25,} | ||||
|         dep_mul, wid_mul    = depth_dict[phi], width_dict[phi] | ||||
|  | ||||
|         base_channels       = int(wid_mul * 64)  # 64 | ||||
|         base_depth          = max(round(dep_mul * 3), 1)  # 3 | ||||
|         #-----------------------------------------------# | ||||
|         #   The input image is 640, 640, 3 | ||||
|         #   The initial base channel count is 64 | ||||
|         #-----------------------------------------------# | ||||
|         self.backbone_name  = backbone | ||||
|         if backbone == "cspdarknet": | ||||
|             #---------------------------------------------------#    | ||||
|             #   Build the CSPDarknet53 backbone model | ||||
|             #   and obtain three effective feature layers with shapes: | ||||
|             #   80,80,256 | ||||
|             #   40,40,512 | ||||
|             #   20,20,1024 | ||||
|             #---------------------------------------------------# | ||||
|             self.backbone   = CSPDarknet(base_channels, base_depth, phi, pretrained) | ||||
|         else: | ||||
|             #---------------------------------------------------#    | ||||
|             #   If the backbone is not cspdarknet, adjust the channel counts | ||||
|             #   so the features match the YoloV5 layout | ||||
|             #---------------------------------------------------# | ||||
|             self.backbone       = { | ||||
|                 'convnext_tiny'         : ConvNeXt_Tiny, | ||||
|                 'convnext_small'        : ConvNeXt_Small, | ||||
|                 'swin_transformer_tiny' : Swin_transformer_Tiny, | ||||
|             }[backbone](pretrained=pretrained, input_shape=input_shape) | ||||
|             in_channels         = { | ||||
|                 'convnext_tiny'         : [192, 384, 768], | ||||
|                 'convnext_small'        : [192, 384, 768], | ||||
|                 'swin_transformer_tiny' : [192, 384, 768], | ||||
|             }[backbone] | ||||
|             feat1_c, feat2_c, feat3_c = in_channels  | ||||
|             self.conv_1x1_feat1 = Conv(feat1_c, base_channels * 4, 1, 1) | ||||
|             self.conv_1x1_feat2 = Conv(feat2_c, base_channels * 8, 1, 1) | ||||
|             self.conv_1x1_feat3 = Conv(feat3_c, base_channels * 16, 1, 1) | ||||
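|             #   1x1 convolutions project the backbone channels (192, 384, 768) | ||||
|             #   onto base_channels * (4, 8, 16), so the neck below is backbone-agnostic | ||||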
|              | ||||
|         self.upsample   = nn.Upsample(scale_factor=2, mode="nearest") | ||||
|  | ||||
|         self.conv_for_feat3         = Conv(base_channels * 16, base_channels * 8, 1, 1) | ||||
|         self.conv3_for_upsample1    = C3(base_channels * 16, base_channels * 8, base_depth, shortcut=False) | ||||
|  | ||||
|         self.conv_for_feat2         = Conv(base_channels * 8, base_channels * 4, 1, 1) | ||||
|         self.conv3_for_upsample2    = C3(base_channels * 8, base_channels * 4, base_depth, shortcut=False) | ||||
|  | ||||
|         self.down_sample1           = Conv(base_channels * 4, base_channels * 4, 3, 2) | ||||
|         self.conv3_for_downsample1  = C3(base_channels * 8, base_channels * 8, base_depth, shortcut=False) | ||||
|  | ||||
|         self.down_sample2           = Conv(base_channels * 8, base_channels * 8, 3, 2) | ||||
|         self.conv3_for_downsample2  = C3(base_channels * 16, base_channels * 16, base_depth, shortcut=False) | ||||
|  | ||||
|         # 80, 80, 256 => 80, 80, 3 * (5 + num_classes) => 80, 80, 3 * (4 + 1 + num_classes) | ||||
|         self.yolo_head_P3 = nn.Conv2d(base_channels * 4, len(anchors_mask[2]) * (5 + num_classes), 1) | ||||
|         # 40, 40, 512 => 40, 40, 3 * (5 + num_classes) => 40, 40, 3 * (4 + 1 + num_classes) | ||||
|         self.yolo_head_P4 = nn.Conv2d(base_channels * 8, len(anchors_mask[1]) * (5 + num_classes), 1) | ||||
|         # 20, 20, 1024 => 20, 20, 3 * (5 + num_classes) => 20, 20, 3 * (4 + 1 + num_classes) | ||||
|         self.yolo_head_P5 = nn.Conv2d(base_channels * 16, len(anchors_mask[0]) * (5 + num_classes), 1) | ||||
|  | ||||
|     def forward(self, x): | ||||
|         #  backbone | ||||
|         feat1, feat2, feat3 = self.backbone(x) | ||||
|         if self.backbone_name != "cspdarknet": | ||||
|             feat1 = self.conv_1x1_feat1(feat1) | ||||
|             feat2 = self.conv_1x1_feat2(feat2) | ||||
|             feat3 = self.conv_1x1_feat3(feat3) | ||||
|  | ||||
|         # 20, 20, 1024 -> 20, 20, 512 | ||||
|         P5          = self.conv_for_feat3(feat3) | ||||
|         # 20, 20, 512 -> 40, 40, 512 | ||||
|         P5_upsample = self.upsample(P5) | ||||
|         # 40, 40, 512 -> 40, 40, 1024 | ||||
|         P4          = torch.cat([P5_upsample, feat2], 1) | ||||
|         # 40, 40, 1024 -> 40, 40, 512 | ||||
|         P4          = self.conv3_for_upsample1(P4) | ||||
|  | ||||
|         # 40, 40, 512 -> 40, 40, 256 | ||||
|         P4          = self.conv_for_feat2(P4) | ||||
|         # 40, 40, 256 -> 80, 80, 256 | ||||
|         P4_upsample = self.upsample(P4) | ||||
|         # 80, 80, 256 cat 80, 80, 256 -> 80, 80, 512 | ||||
|         P3          = torch.cat([P4_upsample, feat1], 1) | ||||
|         # 80, 80, 512 -> 80, 80, 256 | ||||
|         P3          = self.conv3_for_upsample2(P3) | ||||
|          | ||||
|         # 80, 80, 256 -> 40, 40, 256 | ||||
|         P3_downsample = self.down_sample1(P3) | ||||
|         # 40, 40, 256 cat 40, 40, 256 -> 40, 40, 512 | ||||
|         P4 = torch.cat([P3_downsample, P4], 1) | ||||
|         # 40, 40, 512 -> 40, 40, 512 | ||||
|         P4 = self.conv3_for_downsample1(P4) | ||||
|  | ||||
|         # 40, 40, 512 -> 20, 20, 512 | ||||
|         P4_downsample = self.down_sample2(P4) | ||||
|         # 20, 20, 512 cat 20, 20, 512 -> 20, 20, 1024 | ||||
|         P5 = torch.cat([P4_downsample, P5], 1) | ||||
|         # 20, 20, 1024 -> 20, 20, 1024 | ||||
|         P5 = self.conv3_for_downsample2(P5) | ||||
|  | ||||
|         #---------------------------------------------------# | ||||
|         #   The third feature layer | ||||
|         #   y3=(batch_size,75,80,80) | ||||
|         #---------------------------------------------------# | ||||
|         out2 = self.yolo_head_P3(P3) | ||||
|         #---------------------------------------------------# | ||||
|         #   The second feature layer | ||||
|         #   y2=(batch_size,75,40,40) | ||||
|         #---------------------------------------------------# | ||||
|         out1 = self.yolo_head_P4(P4) | ||||
|         #---------------------------------------------------# | ||||
|         #   The first feature layer | ||||
|         #   y1=(batch_size,75,20,20) | ||||
|         #---------------------------------------------------# | ||||
|         out0 = self.yolo_head_P5(P5) | ||||
|         return out0, out1, out2 | ||||
|  | ||||
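| # A minimal usage sketch (not part of the original file; shapes assume phi='s', | ||||
| # num_classes=80 and the standard anchors_mask): | ||||
| #   model = YoloBody([[6, 7, 8], [3, 4, 5], [0, 1, 2]], 80, 's') | ||||
| #   out0, out1, out2 = model(torch.randn(1, 3, 640, 640)) | ||||
| #   # out0: (1, 255, 20, 20), out1: (1, 255, 40, 40), out2: (1, 255, 80, 80) | ||||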
465  nets/yolo_training.py  Normal file
| @@ -0,0 +1,465 @@ | ||||
| import math | ||||
| from copy import deepcopy | ||||
| from functools import partial | ||||
|  | ||||
| import numpy as np | ||||
| import torch | ||||
| import torch.nn as nn | ||||
|  | ||||
|  | ||||
| class YOLOLoss(nn.Module): | ||||
|     def __init__(self, anchors, num_classes, input_shape, cuda, anchors_mask = [[6,7,8], [3,4,5], [0,1,2]], label_smoothing = 0): | ||||
|         super(YOLOLoss, self).__init__() | ||||
|         #-----------------------------------------------------------# | ||||
|         #   the anchors for the 20x20 feature layer are [116,90],[156,198],[373,326] | ||||
|         #   the anchors for the 40x40 feature layer are [30,61],[62,45],[59,119] | ||||
|         #   the anchors for the 80x80 feature layer are [10,13],[16,30],[33,23] | ||||
|         #-----------------------------------------------------------# | ||||
|         self.anchors        = anchors | ||||
|         self.num_classes    = num_classes | ||||
|         self.bbox_attrs     = 5 + num_classes | ||||
|         self.input_shape    = input_shape | ||||
|         self.anchors_mask   = anchors_mask | ||||
|         self.label_smoothing = label_smoothing | ||||
|  | ||||
|         self.threshold      = 4 | ||||
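|         #   anchors whose width/height ratio to a gt box exceeds this | ||||
|         #   threshold are not matched (see get_target below) | ||||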
|  | ||||
|         self.balance        = [0.4, 1.0, 4] | ||||
|         self.box_ratio      = 0.05 | ||||
|         self.obj_ratio      = 1 * (input_shape[0] * input_shape[1]) / (640 ** 2) | ||||
|         self.cls_ratio      = 0.5 * (num_classes / 80) | ||||
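|         #   loss weights follow YoloV5: the objectness weight scales with the | ||||
|         #   image area and the class weight with the number of classes | ||||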
|         self.cuda = cuda | ||||
|  | ||||
|     def clip_by_tensor(self, t, t_min, t_max): | ||||
|         t = t.float() | ||||
|         result = (t >= t_min).float() * t + (t < t_min).float() * t_min | ||||
|         result = (result <= t_max).float() * result + (result > t_max).float() * t_max | ||||
|         return result | ||||
|  | ||||
|     def MSELoss(self, pred, target): | ||||
|         return torch.pow(pred - target, 2) | ||||
|  | ||||
|     def BCELoss(self, pred, target): | ||||
|         epsilon = 1e-7 | ||||
|         pred    = self.clip_by_tensor(pred, epsilon, 1.0 - epsilon) | ||||
|         output  = - target * torch.log(pred) - (1.0 - target) * torch.log(1.0 - pred) | ||||
|         return output | ||||
|          | ||||
|     def box_giou(self, b1, b2): | ||||
|         """ | ||||
|         Inputs: | ||||
|         ---------- | ||||
|         b1: tensor, shape=(batch, feat_w, feat_h, anchor_num, 4), xywh | ||||
|         b2: tensor, shape=(batch, feat_w, feat_h, anchor_num, 4), xywh | ||||
|  | ||||
|         Returns: | ||||
|         ------- | ||||
|         giou: tensor, shape=(batch, feat_w, feat_h, anchor_num, 1) | ||||
|         """ | ||||
|         #----------------------------------------------------# | ||||
|         #   Compute the top-left and bottom-right corners of the predicted boxes | ||||
|         #----------------------------------------------------# | ||||
|         b1_xy       = b1[..., :2] | ||||
|         b1_wh       = b1[..., 2:4] | ||||
|         b1_wh_half  = b1_wh/2. | ||||
|         b1_mins     = b1_xy - b1_wh_half | ||||
|         b1_maxes    = b1_xy + b1_wh_half | ||||
|         #----------------------------------------------------# | ||||
|         #   Compute the top-left and bottom-right corners of the ground-truth boxes | ||||
|         #----------------------------------------------------# | ||||
|         b2_xy       = b2[..., :2] | ||||
|         b2_wh       = b2[..., 2:4] | ||||
|         b2_wh_half  = b2_wh/2. | ||||
|         b2_mins     = b2_xy - b2_wh_half | ||||
|         b2_maxes    = b2_xy + b2_wh_half | ||||
|  | ||||
|         #----------------------------------------------------# | ||||
|         #   Compute the IoU between all ground-truth and predicted boxes | ||||
|         #----------------------------------------------------# | ||||
|         intersect_mins  = torch.max(b1_mins, b2_mins) | ||||
|         intersect_maxes = torch.min(b1_maxes, b2_maxes) | ||||
|         intersect_wh    = torch.max(intersect_maxes - intersect_mins, torch.zeros_like(intersect_maxes)) | ||||
|         intersect_area  = intersect_wh[..., 0] * intersect_wh[..., 1] | ||||
|         b1_area         = b1_wh[..., 0] * b1_wh[..., 1] | ||||
|         b2_area         = b2_wh[..., 0] * b2_wh[..., 1] | ||||
|         union_area      = b1_area + b2_area - intersect_area | ||||
|         iou             = intersect_area / union_area | ||||
|  | ||||
|         #----------------------------------------------------# | ||||
|         #   Find the corners of the smallest box enclosing both boxes | ||||
|         #----------------------------------------------------# | ||||
|         enclose_mins    = torch.min(b1_mins, b2_mins) | ||||
|         enclose_maxes   = torch.max(b1_maxes, b2_maxes) | ||||
|         enclose_wh      = torch.max(enclose_maxes - enclose_mins, torch.zeros_like(intersect_maxes)) | ||||
|         #----------------------------------------------------# | ||||
|         #   Compute the area of the enclosing box | ||||
|         #----------------------------------------------------# | ||||
|         enclose_area    = enclose_wh[..., 0] * enclose_wh[..., 1] | ||||
|         giou            = iou - (enclose_area - union_area) / enclose_area | ||||
|          | ||||
|         return giou | ||||
|  | ||||
|     #---------------------------------------------------# | ||||
|     #   Label smoothing | ||||
|     #---------------------------------------------------# | ||||
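|     #   e.g. label_smoothing = 0.01, num_classes = 80: | ||||
|     #   positives become 0.990125, negatives 0.000125 | ||||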
|     def smooth_labels(self, y_true, label_smoothing, num_classes): | ||||
|         return y_true * (1.0 - label_smoothing) + label_smoothing / num_classes | ||||
|  | ||||
|     def forward(self, l, input, targets=None, y_true=None): | ||||
|         #----------------------------------------------------# | ||||
|         #   l               is the index of the effective feature layer in use | ||||
|         #   input has shape bs, 3*(5+num_classes), 20, 20 | ||||
|         #                   bs, 3*(5+num_classes), 40, 40 | ||||
|         #                   bs, 3*(5+num_classes), 80, 80 | ||||
|         #   targets         the ground-truth box annotations [batch_size, num_gt, 5] | ||||
|         #----------------------------------------------------# | ||||
|         #--------------------------------# | ||||
|         #   Get the number of images and the feature layer's height and width | ||||
|         #   20, 20 | ||||
|         #--------------------------------# | ||||
|         bs      = input.size(0) | ||||
|         in_h    = input.size(2) | ||||
|         in_w    = input.size(3) | ||||
|         #-----------------------------------------------------------------------# | ||||
|         #   Compute the stride: | ||||
|         #   how many pixels of the original image each feature point covers | ||||
|         #   For [640, 640]: height stride is 640 / 20 = 32, width stride is 640 / 20 = 32 | ||||
|         #   On a 20x20 feature layer, one feature point covers 32 pixels of the original image | ||||
|         #   On a 40x40 feature layer, one feature point covers 16 pixels of the original image | ||||
|         #   On an 80x80 feature layer, one feature point covers 8 pixels of the original image | ||||
|         #   stride_h = stride_w = 32, 16, 8 | ||||
|         #-----------------------------------------------------------------------# | ||||
|         stride_h = self.input_shape[0] / in_h | ||||
|         stride_w = self.input_shape[1] / in_w | ||||
|         #-------------------------------------------------# | ||||
|         #   The scaled_anchors obtained here are sized relative to the feature layer | ||||
|         #-------------------------------------------------# | ||||
|         scaled_anchors  = [(a_w / stride_w, a_h / stride_h) for a_w, a_h in self.anchors] | ||||
|         #-----------------------------------------------# | ||||
|         #   There are three inputs in total; their shapes are | ||||
|         #   bs, 3 * (5+num_classes), 20, 20 => bs, 3, 5 + num_classes, 20, 20 => batch_size, 3, 20, 20, 5 + num_classes | ||||
|  | ||||
|         #   batch_size, 3, 20, 20, 5 + num_classes | ||||
|         #   batch_size, 3, 40, 40, 5 + num_classes | ||||
|         #   batch_size, 3, 80, 80, 5 + num_classes | ||||
|         #-----------------------------------------------# | ||||
|         prediction = input.view(bs, len(self.anchors_mask[l]), self.bbox_attrs, in_h, in_w).permute(0, 1, 3, 4, 2).contiguous() | ||||
|          | ||||
|         #-----------------------------------------------# | ||||
|         #   Adjustment parameters for the anchor center positions | ||||
|         #-----------------------------------------------# | ||||
|         x = torch.sigmoid(prediction[..., 0]) | ||||
|         y = torch.sigmoid(prediction[..., 1]) | ||||
|         #-----------------------------------------------# | ||||
|         #   Adjustment parameters for the anchor width and height | ||||
|         #-----------------------------------------------# | ||||
|         w = torch.sigmoid(prediction[..., 2])  | ||||
|         h = torch.sigmoid(prediction[..., 3])  | ||||
|         #-----------------------------------------------# | ||||
|         #   Confidence: whether an object is present | ||||
|         #-----------------------------------------------# | ||||
|         conf = torch.sigmoid(prediction[..., 4]) | ||||
|         #-----------------------------------------------# | ||||
|         #   Class confidence | ||||
|         #-----------------------------------------------# | ||||
|         pred_cls = torch.sigmoid(prediction[..., 5:]) | ||||
|         #-----------------------------------------------# | ||||
|         #   self.get_target has been merged into the dataloader, | ||||
|         #   because running it here is too slow and greatly lengthens training | ||||
|         #-----------------------------------------------# | ||||
|         # y_true, noobj_mask = self.get_target(l, targets, scaled_anchors, in_h, in_w) | ||||
|  | ||||
|         #---------------------------------------------------------------# | ||||
|         #   Decode the predictions and measure how much they overlap the ground truth. | ||||
|         #   Predictions that overlap too much are ignored, since such feature points | ||||
|         #   are already fairly accurate and make poor negative samples | ||||
|         #----------------------------------------------------------------# | ||||
|         pred_boxes = self.get_pred_boxes(l, x, y, h, w, targets, scaled_anchors, in_h, in_w) | ||||
|  | ||||
|         if self.cuda: | ||||
|             y_true          = y_true.type_as(x) | ||||
|          | ||||
|         loss    = 0 | ||||
|         n       = torch.sum(y_true[..., 4] == 1) | ||||
|         if n != 0: | ||||
|             #---------------------------------------------------------------# | ||||
|             #   Compute the giou between predictions and ground truth; loss_loc is | ||||
|             #   the giou loss and loss_cls the classification loss of anchors matched to a real box | ||||
|             #----------------------------------------------------------------# | ||||
|             giou        = self.box_giou(pred_boxes, y_true[..., :4]).type_as(x) | ||||
|             loss_loc    = torch.mean((1 - giou)[y_true[..., 4] == 1]) | ||||
|             loss_cls    = torch.mean(self.BCELoss(pred_cls[y_true[..., 4] == 1], self.smooth_labels(y_true[..., 5:][y_true[..., 4] == 1], self.label_smoothing, self.num_classes))) | ||||
|             loss        += loss_loc * self.box_ratio + loss_cls * self.cls_ratio | ||||
|             #-----------------------------------------------------------# | ||||
|             #   Compute the confidence loss; | ||||
|             #   the anchor whose predicted box is more accurate | ||||
|             #   is the one used to predict this object | ||||
|             #-----------------------------------------------------------# | ||||
|             tobj        = torch.where(y_true[..., 4] == 1, giou.detach().clamp(0), torch.zeros_like(y_true[..., 4])) | ||||
|         else: | ||||
|             tobj        = torch.zeros_like(y_true[..., 4]) | ||||
|         loss_conf   = torch.mean(self.BCELoss(conf, tobj)) | ||||
|          | ||||
|         loss        += loss_conf * self.balance[l] * self.obj_ratio | ||||
|         # if n != 0: | ||||
|         #     print(loss_loc * self.box_ratio, loss_cls * self.cls_ratio, loss_conf * self.balance[l] * self.obj_ratio) | ||||
|         return loss | ||||
|      | ||||
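|     #---------------------------------------------------# | ||||
|     #   YoloV5-style multi-positive assignment: besides | ||||
|     #   the cell containing the gt center, the two | ||||
|     #   neighbouring cells closest to that center also | ||||
|     #   become positive samples | ||||
|     #---------------------------------------------------# | ||||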
|     def get_near_points(self, x, y, i, j): | ||||
|         sub_x = x - i | ||||
|         sub_y = y - j | ||||
|         if sub_x > 0.5 and sub_y > 0.5: | ||||
|             return [[0, 0], [1, 0], [0, 1]] | ||||
|         elif sub_x < 0.5 and sub_y > 0.5: | ||||
|             return [[0, 0], [-1, 0], [0, 1]] | ||||
|         elif sub_x < 0.5 and sub_y < 0.5: | ||||
|             return [[0, 0], [-1, 0], [0, -1]] | ||||
|         else: | ||||
|             return [[0, 0], [1, 0], [0, -1]] | ||||
|  | ||||
|     def get_target(self, l, targets, anchors, in_h, in_w): | ||||
|         #-----------------------------------------------------# | ||||
|         #   Count how many images there are | ||||
|         #-----------------------------------------------------# | ||||
|         bs              = len(targets) | ||||
|         #-----------------------------------------------------# | ||||
|         #   Used to select which anchors contain no object | ||||
|         #   bs, 3, 20, 20 | ||||
|         #-----------------------------------------------------# | ||||
|         noobj_mask      = torch.ones(bs, len(self.anchors_mask[l]), in_h, in_w, requires_grad = False) | ||||
|         #-----------------------------------------------------# | ||||
|         #   Helps find the ground-truth box that best matches each anchor | ||||
|         #-----------------------------------------------------# | ||||
|         box_best_ratio = torch.zeros(bs, len(self.anchors_mask[l]), in_h, in_w, requires_grad = False) | ||||
|         #-----------------------------------------------------# | ||||
|         #   batch_size, 3, 20, 20, 5 + num_classes | ||||
|         #-----------------------------------------------------# | ||||
|         y_true          = torch.zeros(bs, len(self.anchors_mask[l]), in_h, in_w, self.bbox_attrs, requires_grad = False) | ||||
|         for b in range(bs):             | ||||
|             if len(targets[b])==0: | ||||
|                 continue | ||||
|             batch_target = torch.zeros_like(targets[b]) | ||||
|             #-------------------------------------------------------# | ||||
|             #   Compute the centers of the positive samples on the feature layer | ||||
|             #   and the ground-truth box sizes relative to the feature layer | ||||
|             #-------------------------------------------------------# | ||||
|             batch_target[:, [0,2]] = targets[b][:, [0,2]] * in_w | ||||
|             batch_target[:, [1,3]] = targets[b][:, [1,3]] * in_h | ||||
|             batch_target[:, 4] = targets[b][:, 4] | ||||
|             batch_target = batch_target.cpu() | ||||
|              | ||||
|             #-----------------------------------------------------------------------------# | ||||
|             #   batch_target                                    : num_true_box, 5 | ||||
|             #   batch_target[:, 2:4]                            : num_true_box, 2 | ||||
|             #   torch.unsqueeze(batch_target[:, 2:4], 1)        : num_true_box, 1, 2 | ||||
|             #   anchors                                         : 9, 2 | ||||
|             #   torch.unsqueeze(torch.FloatTensor(anchors), 0)  : 1, 9, 2 | ||||
|             #   ratios_of_gt_anchors    : num_true_box, 9, 2 | ||||
|             #   ratios_of_anchors_gt    : num_true_box, 9, 2 | ||||
|             # | ||||
|             #   ratios                  : num_true_box, 9, 4 | ||||
|             #   max_ratios              : num_true_box, 9    | ||||
|             #   max_ratios: the maximum width/height ratio between each real box and each anchor | ||||
|             #------------------------------------------------------------------------------# | ||||
|             ratios_of_gt_anchors = torch.unsqueeze(batch_target[:, 2:4], 1) / torch.unsqueeze(torch.FloatTensor(anchors), 0) | ||||
|             ratios_of_anchors_gt = torch.unsqueeze(torch.FloatTensor(anchors), 0) /  torch.unsqueeze(batch_target[:, 2:4], 1) | ||||
|             ratios               = torch.cat([ratios_of_gt_anchors, ratios_of_anchors_gt], dim = -1) | ||||
|             max_ratios, _        = torch.max(ratios, dim = -1) | ||||
|  | ||||
|             for t, ratio in enumerate(max_ratios): | ||||
|                 #-------------------------------------------------------# | ||||
|                 #   ratio : 9 | ||||
|                 #-------------------------------------------------------# | ||||
|                 over_threshold = ratio < self.threshold | ||||
|                 over_threshold[torch.argmin(ratio)] = True | ||||
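|                 #   always keep the single best-matching anchor, | ||||
|                 #   even if it fails the ratio test | ||||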
|                 for k, mask in enumerate(self.anchors_mask[l]): | ||||
|                     if not over_threshold[mask]: | ||||
|                         continue | ||||
|                     #----------------------------------------# | ||||
|                     #   Find which grid cell the real box belongs to | ||||
|                     #   x  1.25     => 1 | ||||
|                     #   y  3.75     => 3 | ||||
|                     #----------------------------------------# | ||||
|                     i = torch.floor(batch_target[t, 0]).long() | ||||
|                     j = torch.floor(batch_target[t, 1]).long() | ||||
|                      | ||||
|                     offsets = self.get_near_points(batch_target[t, 0], batch_target[t, 1], i, j) | ||||
|                     for offset in offsets: | ||||
|                         local_i = i + offset[0] | ||||
|                         local_j = j + offset[1] | ||||
|  | ||||
|                         if local_i >= in_w or local_i < 0 or local_j >= in_h or local_j < 0: | ||||
|                             continue | ||||
|  | ||||
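|                         #----------------------------------------# | ||||
|                         #   If this anchor/cell is already claimed, | ||||
|                         #   keep whichever gt box matches it better | ||||
|                         #   (smaller width/height ratio) | ||||
|                         #----------------------------------------# | ||||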
|                         if box_best_ratio[b, k, local_j, local_i] != 0: | ||||
|                             if box_best_ratio[b, k, local_j, local_i] > ratio[mask]: | ||||
|                                 y_true[b, k, local_j, local_i, :] = 0 | ||||
|                             else: | ||||
|                                 continue | ||||
|                              | ||||
|                         #----------------------------------------# | ||||
|                         #   Get the class of the real box | ||||
|                         #----------------------------------------# | ||||
|                         c = batch_target[t, 4].long() | ||||
|  | ||||
|                         #----------------------------------------# | ||||
|                         #   noobj_mask marks feature points without an object | ||||
|                         #----------------------------------------# | ||||
|                         noobj_mask[b, k, local_j, local_i] = 0 | ||||
|                         #----------------------------------------# | ||||
|                         #   tx, ty are the ground-truth values of the center adjustment parameters | ||||
|                         #----------------------------------------# | ||||
|                         y_true[b, k, local_j, local_i, 0] = batch_target[t, 0] | ||||
|                         y_true[b, k, local_j, local_i, 1] = batch_target[t, 1] | ||||
|                         y_true[b, k, local_j, local_i, 2] = batch_target[t, 2] | ||||
|                         y_true[b, k, local_j, local_i, 3] = batch_target[t, 3] | ||||
|                         y_true[b, k, local_j, local_i, 4] = 1 | ||||
|                         y_true[b, k, local_j, local_i, c + 5] = 1 | ||||
|                         #----------------------------------------# | ||||
|                         #   Record the best ratio for the current anchor | ||||
|                         #----------------------------------------# | ||||
|                         box_best_ratio[b, k, local_j, local_i] = ratio[mask] | ||||
|                          | ||||
|         return y_true, noobj_mask | ||||
|  | ||||
|     def get_pred_boxes(self, l, x, y, h, w, targets, scaled_anchors, in_h, in_w): | ||||
|         #-----------------------------------------------------# | ||||
|         #   Count how many images there are | ||||
|         #-----------------------------------------------------# | ||||
|         bs = len(targets) | ||||
|  | ||||
|         #-----------------------------------------------------# | ||||
|         #   Generate the grid; anchor centers sit at the top-left of each grid cell | ||||
|         #-----------------------------------------------------# | ||||
|         grid_x = torch.linspace(0, in_w - 1, in_w).repeat(in_h, 1).repeat( | ||||
|             int(bs * len(self.anchors_mask[l])), 1, 1).view(x.shape).type_as(x) | ||||
|         grid_y = torch.linspace(0, in_h - 1, in_h).repeat(in_w, 1).t().repeat( | ||||
|             int(bs * len(self.anchors_mask[l])), 1, 1).view(y.shape).type_as(x) | ||||
|  | ||||
|         # Generate the anchor widths and heights | ||||
|         scaled_anchors_l = np.array(scaled_anchors)[self.anchors_mask[l]] | ||||
|         anchor_w = torch.Tensor(scaled_anchors_l).index_select(1, torch.LongTensor([0])).type_as(x) | ||||
|         anchor_h = torch.Tensor(scaled_anchors_l).index_select(1, torch.LongTensor([1])).type_as(x) | ||||
|          | ||||
|         anchor_w = anchor_w.repeat(bs, 1).repeat(1, 1, in_h * in_w).view(w.shape) | ||||
|         anchor_h = anchor_h.repeat(bs, 1).repeat(1, 1, in_h * in_w).view(h.shape) | ||||
|         #-------------------------------------------------------# | ||||
|         #   Compute the adjusted anchor centers and widths/heights | ||||
|         #-------------------------------------------------------# | ||||
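|         #   YoloV5 decode: x * 2 - 0.5 lets the center fall in (-0.5, 1.5) | ||||
|         #   around the grid cell, and (w * 2) ** 2 scales each anchor by (0, 4) | ||||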
|         pred_boxes_x    = torch.unsqueeze(x * 2. - 0.5 + grid_x, -1) | ||||
|         pred_boxes_y    = torch.unsqueeze(y * 2. - 0.5 + grid_y, -1) | ||||
|         pred_boxes_w    = torch.unsqueeze((w * 2) ** 2 * anchor_w, -1) | ||||
|         pred_boxes_h    = torch.unsqueeze((h * 2) ** 2 * anchor_h, -1) | ||||
|         pred_boxes      = torch.cat([pred_boxes_x, pred_boxes_y, pred_boxes_w, pred_boxes_h], dim = -1) | ||||
|         return pred_boxes | ||||
|  | ||||
| def is_parallel(model): | ||||
|     # Returns True if model is of type DP or DDP | ||||
|     return type(model) in (nn.parallel.DataParallel, nn.parallel.DistributedDataParallel) | ||||
|  | ||||
| def de_parallel(model): | ||||
|     # De-parallelize a model: returns single-GPU model if model is of type DP or DDP | ||||
|     return model.module if is_parallel(model) else model | ||||
|      | ||||
| def copy_attr(a, b, include=(), exclude=()): | ||||
|     # Copy attributes from b to a, options to only include [...] and to exclude [...] | ||||
|     for k, v in b.__dict__.items(): | ||||
|         if (len(include) and k not in include) or k.startswith('_') or k in exclude: | ||||
|             continue | ||||
|         else: | ||||
|             setattr(a, k, v) | ||||
|  | ||||
| class ModelEMA: | ||||
|     """ Updated Exponential Moving Average (EMA) from https://github.com/rwightman/pytorch-image-models | ||||
|     Keeps a moving average of everything in the model state_dict (parameters and buffers) | ||||
|     For EMA details see https://www.tensorflow.org/api_docs/python/tf/train/ExponentialMovingAverage | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, model, decay=0.9999, tau=2000, updates=0): | ||||
|         # Create EMA | ||||
|         self.ema = deepcopy(de_parallel(model)).eval()  # FP32 EMA | ||||
|         # if next(model.parameters()).device.type != 'cpu': | ||||
|         #     self.ema.half()  # FP16 EMA | ||||
|         self.updates = updates  # number of EMA updates | ||||
|         self.decay = lambda x: decay * (1 - math.exp(-x / tau))  # decay exponential ramp (to help early epochs) | ||||
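|         # the ramp reaches about 63% of the final decay after tau updates | ||||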
|         for p in self.ema.parameters(): | ||||
|             p.requires_grad_(False) | ||||
|  | ||||
|     def update(self, model): | ||||
|         # Update EMA parameters | ||||
|         with torch.no_grad(): | ||||
|             self.updates += 1 | ||||
|             d = self.decay(self.updates) | ||||
|  | ||||
|             msd = de_parallel(model).state_dict()  # model state_dict | ||||
|             for k, v in self.ema.state_dict().items(): | ||||
|                 if v.dtype.is_floating_point: | ||||
|                     v *= d | ||||
|                     v += (1 - d) * msd[k].detach() | ||||
|  | ||||
|     def update_attr(self, model, include=(), exclude=('process_group', 'reducer')): | ||||
|         # Update EMA attributes | ||||
|         copy_attr(self.ema, model, include, exclude) | ||||
|  | ||||
| def weights_init(net, init_type='normal', init_gain = 0.02): | ||||
|     def init_func(m): | ||||
|         classname = m.__class__.__name__ | ||||
|         if hasattr(m, 'weight') and classname.find('Conv') != -1: | ||||
|             if init_type == 'normal': | ||||
|                 torch.nn.init.normal_(m.weight.data, 0.0, init_gain) | ||||
|             elif init_type == 'xavier': | ||||
|                 torch.nn.init.xavier_normal_(m.weight.data, gain=init_gain) | ||||
|             elif init_type == 'kaiming': | ||||
|                 torch.nn.init.kaiming_normal_(m.weight.data, a=0, mode='fan_in') | ||||
|             elif init_type == 'orthogonal': | ||||
|                 torch.nn.init.orthogonal_(m.weight.data, gain=init_gain) | ||||
|             else: | ||||
|                 raise NotImplementedError('initialization method [%s] is not implemented' % init_type) | ||||
|         elif classname.find('BatchNorm2d') != -1: | ||||
|             torch.nn.init.normal_(m.weight.data, 1.0, 0.02) | ||||
|             torch.nn.init.constant_(m.bias.data, 0.0) | ||||
|     print('initialize network with %s type' % init_type) | ||||
|     net.apply(init_func) | ||||
|  | ||||
| def get_lr_scheduler(lr_decay_type, lr, min_lr, total_iters, warmup_iters_ratio = 0.05, warmup_lr_ratio = 0.1, no_aug_iter_ratio = 0.05, step_num = 10): | ||||
|     def yolox_warm_cos_lr(lr, min_lr, total_iters, warmup_total_iters, warmup_lr_start, no_aug_iter, iters): | ||||
|         if iters <= warmup_total_iters: | ||||
|             # lr = (lr - warmup_lr_start) * iters / float(warmup_total_iters) + warmup_lr_start | ||||
|             lr = (lr - warmup_lr_start) * pow(iters / float(warmup_total_iters), 2) + warmup_lr_start | ||||
|         elif iters >= total_iters - no_aug_iter: | ||||
|             lr = min_lr | ||||
|         else: | ||||
|             lr = min_lr + 0.5 * (lr - min_lr) * ( | ||||
|                 1.0 | ||||
|                 + math.cos( | ||||
|                     math.pi | ||||
|                     * (iters - warmup_total_iters) | ||||
|                     / (total_iters - warmup_total_iters - no_aug_iter) | ||||
|                 ) | ||||
|             ) | ||||
|         return lr | ||||
|  | ||||
|     def step_lr(lr, decay_rate, step_size, iters): | ||||
|         if step_size < 1: | ||||
|             raise ValueError("step_size must above 1.") | ||||
|         n       = iters // step_size | ||||
|         out_lr  = lr * decay_rate ** n | ||||
|         return out_lr | ||||
|  | ||||
|     if lr_decay_type == "cos": | ||||
|         warmup_total_iters  = min(max(warmup_iters_ratio * total_iters, 1), 3) | ||||
|         warmup_lr_start     = max(warmup_lr_ratio * lr, 1e-6) | ||||
|         no_aug_iter         = min(max(no_aug_iter_ratio * total_iters, 1), 15) | ||||
|         func = partial(yolox_warm_cos_lr, lr, min_lr, total_iters, warmup_total_iters, warmup_lr_start, no_aug_iter) | ||||
|     else: | ||||
|         decay_rate  = (min_lr / lr) ** (1 / (step_num - 1)) | ||||
|         step_size   = total_iters / step_num | ||||
|         func = partial(step_lr, lr, decay_rate, step_size) | ||||
|  | ||||
|     return func | ||||
|  | ||||
| def set_optimizer_lr(optimizer, lr_scheduler_func, epoch): | ||||
|     lr = lr_scheduler_func(epoch) | ||||
|     for param_group in optimizer.param_groups: | ||||
|         param_group['lr'] = lr | ||||
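|  | ||||
| # A minimal wiring sketch (assumed values, not part of the original file): | ||||
| #   lr_func = get_lr_scheduler("cos", lr=1e-2, min_lr=1e-4, total_iters=100) | ||||
| #   for epoch in range(100): | ||||
| #       set_optimizer_lr(optimizer, lr_func, epoch) | ||||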