This commit is contained in:
TerenceLiu 2024-05-09 16:00:01 +01:00
parent 0b73c6585c
commit d9808162fc
8 changed files with 264 additions and 1542 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

5
.gitignore vendored
View File

@@ -1,6 +1,11 @@
# Created by https://www.toptal.com/developers/gitignore/api/python,visualstudiocode
# Edit at https://www.toptal.com/developers/gitignore?templates=python,visualstudiocode
## others
data/
bdd100k/
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/

File diff suppressed because one or more lines are too long

15
requirements.txt Normal file
View File

@@ -0,0 +1,15 @@
tqdm
timm
numpy
torch
pandas
gitinfo
matplotlib
torchaudio
torchvision
coloredlogs
scikit-learn
scikit-image
transformers
opencv-python
albumentations

View File

@@ -27,6 +27,9 @@ import matplotlib.pyplot as plt
import albumentations as Aug
from albumentations.pytorch import ToTensorV2
from einops import rearrange
from timm.models.layers import DropPath, trunc_normal_
OBJ_LABELS = {
"truck": 0, "bicycle": 1, "car": 2, "motorcycle": 3,
"train": 4, "bus": 5, "traffic sign": 6, "rider": 7, "person": 8,

View File

@@ -1,8 +1,7 @@
from src.perception.base import *
class AutoDriveDataset(Dataset):
def __init__(self, image_dir, lane_dir, drivable_dir, transform=True):
self.lane_dir = [str(i) for i in list(sorted(Path(f"{lane_dir}").glob("*")))]
def __init__(self, image_dir, drivable_dir, transform=True):
self.image_dir = [str(i) for i in list(sorted(Path(f"{image_dir}").glob("*")))]
self.drivable_dir = [str(i) for i in list(sorted(Path(f"{drivable_dir}").glob("*")))]
self.transform = transform
@@ -12,7 +11,6 @@ class AutoDriveDataset(Dataset):
def __readdata__(self, idx):
name = self.image_dir[idx].split("/")[-1].split(".")[0]
lane = cv2.cvtColor(cv2.imread(f"{[i for i in self.lane_dir if name in i][0]}"), cv2.COLOR_BGR2RGB)
image = cv2.cvtColor(cv2.imread(f"{[i for i in self.image_dir if name in i][0]}"), cv2.COLOR_BGR2RGB)
drivable = cv2.cvtColor(cv2.imread(f"{[i for i in self.drivable_dir if name in i][0]}"), cv2.COLOR_BGR2RGB)
drivable[np.all(drivable == [219, 94, 86], axis=-1)] = [255, 0, 0]
@@ -32,7 +30,7 @@ class AutoDriveDataset(Dataset):
def __augmentation__(self, image, drivable):
transform = Aug.Compose([
Aug.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),  # standard ImageNet mean/std
Aug.Resize(360, 640, p=1), Aug.HorizontalFlip(p=0.5), Aug.RandomBrightnessContrast(p=0.5)])
Aug.Resize(180, 320, p=1), Aug.HorizontalFlip(p=0.5), Aug.RandomBrightnessContrast(p=0.5)])
transformed = transform(image=image, mask=drivable)
image = transformed["image"].transpose(2, 0, 1)
drivable = transformed["mask"].transpose(2, 0, 1)
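A minimal sketch of how the updated dataset could be consumed, assuming __getitem__ returns an (image, drivable) pair and that the BDD100K-style paths below exist; none of this is part of the commit itself:
from torch.utils.data import DataLoader
# Hypothetical paths; the commit only shows that the lane branch was dropped.
dataset = AutoDriveDataset(image_dir="data/bdd100k/images/100k/train",
                           drivable_dir="data/bdd100k/labels/drivable/colormaps/train",
                           transform=True)
loader = DataLoader(dataset, batch_size=8, shuffle=True, num_workers=4)
images, drivable = next(iter(loader))
# With the new Aug.Resize(180, 320) each image tensor should be (3, 180, 320)
# and each drivable mask (3, 180, 320), stacked along the batch dimension.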

View File

@@ -4,6 +4,10 @@ from src.perception.base import *
from torchvision.models._utils import IntermediateLayerGetter
'''
SegFormer: https://arxiv.org/abs/2105.15203 & https://huggingface.co/docs/transformers/en/model_doc/segformer
'''
def __clones__(module, N):
return nn.ModuleList([copy.deepcopy(module) for i in range(N)])
@@ -22,181 +26,227 @@ class ResNet(nn.Module):
output_ftrs = self.model(x)
return output_ftrs["stage7"], torch.ones(output_ftrs["stage7"].shape, device=x.device)[:, 0]
class Compression(nn.Module):
def __init__(self, back_dim=2048, embed_dim=256):
super(Compression, self).__init__()
self.conv = nn.Conv2d(in_channels=back_dim, out_channels=embed_dim, kernel_size=1)
def forward(self, x):
return self.conv(x)
class PositionEmbedding(nn.Module):
'''
A Learnable Positional Embedding
'''
def __init__(self, num_queries=100, hidden_dim=256):
super(PositionEmbedding, self).__init__()
self.row_embed = nn.Embedding(num_queries // 2, hidden_dim // 2)
self.col_embed = nn.Embedding(num_queries // 2, hidden_dim // 2)
self.reset_parameters()
def reset_parameters(self):
nn.init.uniform_(self.row_embed.weight)
nn.init.uniform_(self.col_embed.weight)
def forward(self, x):
b, c, h, w = x.shape
x_emb = self.col_embed(torch.arange(w, device=x.device))
y_emb = self.row_embed(torch.arange(h, device=x.device))
pos = torch.cat([x_emb.unsqueeze(0).repeat(h, 1, 1), y_emb.unsqueeze(1).repeat(1, w, 1)], dim=-1).permute(2, 0, 1).unsqueeze(0).flatten(2).permute(2, 0, 1)
# y = pos + 0.1 * y.flatten(2).permute(2,0,1)
return pos
class FeedForward(nn.Module):
def __init__(self, embed_dim=256, ffn_dim=512, dropout=0.0):
'''
FFN + add&Norm
'''
super(FeedForward, self).__init__()
self.fc_1 = nn.Linear(in_features=embed_dim, out_features=ffn_dim)
self.fc_2 = nn.Linear(in_features=ffn_dim, out_features=embed_dim)
class overlap_patch_embed(nn.Module):
def __init__(self, patch_size, stride, in_chans, embed_dim):
super().__init__()
self.patch_size = patch_size
self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=stride,
padding=(patch_size // 2, patch_size // 2))
self.norm = nn.LayerNorm(embed_dim)
self.drop = nn.Dropout(dropout)
def forward(self, x):
output = self.drop(self.fc_2(F.relu(self.fc_1(x)))) + x
output = self.norm(output)
x = self.proj(x)
_, _, h, w = x.shape
x = rearrange(x, 'b c h w -> b (h w) c')
x = self.norm(x)
return x, h, w
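A quick, illustrative shape check of the overlapping patch embedding (the 180x320 input size is an assumption taken from the new resize):
import torch
patch = overlap_patch_embed(patch_size=7, stride=4, in_chans=3, embed_dim=64)
tokens, h, w = patch(torch.randn(2, 3, 180, 320))
# stride 4 with padding 3 gives h=45, w=80, so tokens is (2, 45 * 80, 64);
# the 7x7 kernel overlaps neighbouring patches, unlike ViT's non-overlapping cut.
print(tokens.shape, h, w)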
class efficient_self_attention(nn.Module):
def __init__(self, attn_dim, num_heads, dropout_p, sr_ratio):
super().__init__()
assert attn_dim % num_heads == 0, f'expected attn_dim {attn_dim} to be a multiple of num_heads {num_heads}'
self.attn_dim = attn_dim
self.num_heads = num_heads
self.dropout_p = dropout_p
self.sr_ratio = sr_ratio
if sr_ratio > 1:
self.sr = nn.Conv2d(attn_dim, attn_dim, kernel_size=sr_ratio, stride=sr_ratio)
self.norm = nn.LayerNorm(attn_dim)
self.q = nn.Linear(attn_dim, attn_dim, bias=True)
self.kv = nn.Linear(attn_dim, attn_dim * 2, bias=True)
self.scale = (attn_dim // num_heads) ** -0.5
self.proj = nn.Linear(attn_dim, attn_dim)
self.proj_drop = nn.Dropout(self.dropout_p)
def forward(self, x, h, w):
B, N, C = x.shape
q = self.q(x)
q = rearrange(q, 'b n (h e) -> b n h e', h=self.num_heads).permute(0, 2, 1, 3)
if self.sr_ratio > 1:
x = rearrange(x, 'b (h w) c -> b c h w', h=h)
x = self.sr(x)
x = rearrange(x, 'b c h w -> b (h w) c')
x = self.norm(x)
kv = self.kv(x)
kv = rearrange(kv, 'b n (s h e) -> b n s h e', s=2, h=self.num_heads).permute(2, 0, 3, 1, 4)
k, v = kv.unbind(0)
attn = (q @ k.transpose(-1, -2)) * self.scale
attn = attn.softmax(dim=-1)
output = attn @ v
output = rearrange(output, 'b h n e -> b n (h e)')
output = self.proj(output) # (attn_dim, attn_dim)
output = self.proj_drop(output)
attn_output = {'attn' : attn}
return output, attn_output
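An illustrative check of the spatial-reduction attention: queries keep the full token count while keys/values come from a feature map downsampled by sr_ratio, so the attention matrix shrinks by roughly sr_ratio**2 (the stage-1 sizes below are assumptions):
import torch
attn = efficient_self_attention(attn_dim=64, num_heads=1, dropout_p=0.0, sr_ratio=8)
h, w = 45, 80
out, extras = attn(torch.randn(2, h * w, 64), h, w)
print(out.shape)              # (2, 3600, 64): one output token per input token
print(extras["attn"].shape)   # (2, 1, 3600, 50): 3600 queries attend to only 5*10 reduced keys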
class mix_feedforward(nn.Module):
def __init__(self, in_features, out_features, hidden_features, dropout_p = 0.0):
super().__init__()
self.fc1 = nn.Linear(in_features, hidden_features)
self.conv = nn.Conv2d(hidden_features, hidden_features, kernel_size=3, padding=1, groups=hidden_features)
self.gelu = nn.GELU()
self.drop = nn.Dropout(dropout_p)
self.fc2 = nn.Linear(hidden_features, out_features)
def forward(self, x, h, w):
output = self.fc1(x)
output = rearrange(output, 'b (h w) c -> b c h w', h=h)
output = self.conv(output)
output = rearrange(output, 'b c h w -> b (h w) c')
output = self.gelu(output)
output = self.drop(output)
output = self.fc2(output)
output = self.drop(output)
return output
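A small illustrative pass through the Mix-FFN; the 3x3 depthwise convolution in the hidden layer injects local positional information, which is why SegFormer can drop explicit positional embeddings:
import torch
ffn = mix_feedforward(in_features=64, out_features=64, hidden_features=256, dropout_p=0.0)
print(ffn(torch.randn(2, 45 * 80, 64), 45, 80).shape)   # (2, 3600, 64), token shape preserved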
class EncoderLayer(nn.Module):
def __init__(self, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0):
'''
Architecture: Multihead Attention + add&Norm + FFN + Add&Norm
'''
super(EncoderLayer, self).__init__()
self.attention = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout)
self.feedforward = FeedForward(embed_dim=embed_dim, ffn_dim=ffn_dim, dropout=dropout)
self.norm = nn.LayerNorm(embed_dim)
def add_pos_embed(self, x, pos:Optional[torch.Tensor]=None):
return x if pos is None else x + pos
def forward(self, x, mask:Optional[torch.Tensor]=None, pos:Optional[torch.Tensor]=None):
query = key = self.add_pos_embed(x, pos)
ftrs, attention = self.attention(query, key, x, mask)
ftrs = self.norm(ftrs + x)
output = self.feedforward(ftrs)
return output, attention
class Encoder(nn.Module):
def __init__(self, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0, num_layers=8, hidden_dim=256):
super(Encoder, self).__init__()
layer = EncoderLayer(embed_dim=embed_dim, num_heads=num_heads, ffn_dim=ffn_dim, dropout=dropout)
self.encoder = __clones__(layer, num_layers)
class transformer_block(nn.Module):
def __init__(self, dim, num_heads, dropout_p, drop_path_p, sr_ratio):
super().__init__()
self.attn = efficient_self_attention(attn_dim=dim, num_heads=num_heads,
dropout_p=dropout_p, sr_ratio=sr_ratio)
self.ffn = mix_feedforward(dim, dim, hidden_features=dim * 4, dropout_p=dropout_p)
self.norm1 = nn.LayerNorm(dim, eps=1e-6)
self.norm2 = nn.LayerNorm(dim, eps=1e-6)
self.drop_path = DropPath(drop_path_p) if drop_path_p > 0. else nn.Identity()
def forward(self, x, mask:Optional[torch.Tensor]=None, pos:Optional[torch.Tensor]=None):
output, attns = x, []
for layer in self.encoder:
output, attention = layer(x=output, mask=mask, pos=pos)
attns.append(attention)
return output, attns
def forward(self, x, h, w):
x1 = x
x = self.norm1(x)
x, attn_output = self.attn(x, h, w)
x = self.drop_path(x)
x += x1
x2 = x
x = self.norm2(x)
x = self.drop_path(self.ffn(x, h, w))
x += x2
return x, attn_output
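An illustrative forward pass through one pre-norm block; drop_path_p here is a placeholder, since the real per-block rate is set by the stochastic-depth decay rule in mix_transformer below:
import torch
block = transformer_block(dim=64, num_heads=1, dropout_p=0.0, drop_path_p=0.1, sr_ratio=8)
out, _ = block(torch.randn(2, 45 * 80, 64), 45, 80)
print(out.shape)   # (2, 3600, 64): both residual branches keep the token shape
# In eval mode DropPath is the identity; during training it randomly skips the whole
# attention or FFN branch for a sample, which regularises the deep third stage.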
class DecoderLayer(nn.Module):
def __init__(self, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0):
super(DecoderLayer, self).__init__()
self.self_attn = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout)
self.attention = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout)
self.feedforward = FeedForward(embed_dim=embed_dim, ffn_dim=ffn_dim, dropout=dropout)
self.norm1 = nn.LayerNorm(embed_dim)
self.norm2 = nn.LayerNorm(embed_dim)
def add_pos_embed(self, x, pos:Optional[torch.Tensor]=None):
return x if pos is None else x + pos
def forward(self, x, queries, mask:Optional[torch.Tensor]=None, queries_mask:Optional[torch.Tensor]=None,
pos:Optional[torch.Tensor]=None, queries_pos:Optional[torch.Tensor]=None):
q = k = self.add_pos_embed(queries, queries_pos)
queries_, _ = self.self_attn(q, k, queries, queries_mask)
queries = self.norm1(queries_ + queries)
output, attention = self.attention(self.add_pos_embed(queries, queries_mask),
self.add_pos_embed(x, pos), self.add_pos_embed(x, pos), mask)
output = self.norm2(output + queries)
output = self.feedforward(output)
return output, attention
class mix_transformer_stage(nn.Module):
def __init__(self, patch_embed, blocks, norm):
super().__init__()
self.patch_embed = patch_embed
self.blocks = blocks
self.norm = norm
class Decoder(nn.Module):
def __init__(self, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0, num_layers=8, hidden_dim=256):
super(Decoder, self).__init__()
layer = DecoderLayer(embed_dim=embed_dim, num_heads=num_heads, ffn_dim=ffn_dim, dropout=dropout)
self.decoder = __clones__(layer, num_layers)
def forward(self, x, queries, mask:Optional[torch.Tensor]=None, queries_mask:Optional[torch.Tensor]=None,
pos:Optional[torch.Tensor]=None, queries_pos:Optional[torch.Tensor]=None):
output, attns = x, []
for layer in self.decoder:
output, attention = layer(output, queries)
attns.append(attention)
return output.transpose(0, 1), attns
class Detector(nn.Module):
'''
Object Detection Head
'''
def __init__(self, num_classes=100, embed_dim=256):
super(Detector, self).__init__()
self.linear_class = nn.Linear(embed_dim, num_classes + 1)
self.linear_boxes = nn.Linear(embed_dim, 4)
def forward(self, x):
return {"pred_logits": self.linear_class(x), "boxes": self.linear_boxes(x)}
x, h, w = self.patch_embed(x)
class Segmentor(nn.Module):
'''
Semantic Segmentation Head
'''
def __init__(self, num_classes=3, embed_dim=256):
super(Segmentor, self).__init__()
channels = [embed_dim // 2**i for i in range(6)]
self.upsample = nn.Sequential(*nn.ModuleList([
nn.Sequential(nn.Conv2d(channels[i], channels[i+1], kernel_size=1),
nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)) for i in range(len(channels) - 1)
]))
self.segmentor = nn.Conv2d(channels[-1], num_classes, kernel_size=1, bias=False)
for block in self.blocks:
x, stage_output = block(x, h, w)
x = self.norm(x)
x = rearrange(x, 'b (h w) c -> b c h w', h=h, w=w)
return x, stage_output
class mix_transformer(nn.Module):
def __init__(self, in_chans, embed_dims, num_heads, depths,
sr_ratios, dropout_p, drop_path_p):
super().__init__()
self.stages = nn.ModuleList()
for stage_i in range(len(depths)):
if(stage_i == 0):
patch_size = 7
stride = 4
in_chans = in_chans
else:
patch_size = 3
stride = 2
in_chans = embed_dims[stage_i-1]
patch_embed = overlap_patch_embed(patch_size=patch_size, stride=stride,
in_chans=in_chans, embed_dim=embed_dims[stage_i])
blocks = nn.ModuleList()
for N in range(depths[stage_i]):
blocks.append(transformer_block(dim=embed_dims[stage_i],
num_heads=num_heads[stage_i],
dropout_p=dropout_p,
drop_path_p=drop_path_p * (sum(depths[:stage_i])+N) / (sum(depths)-1), # stochastic depth decay rule
sr_ratio=sr_ratios[stage_i]))
self.stages.append(mix_transformer_stage(patch_embed, blocks, nn.LayerNorm(embed_dims[stage_i], eps=1e-6)))
def forward(self, x):
x = self.segmentor(self.upsample(x))
outputs = []
for stage in self.stages:
x, _ = stage(x)
outputs.append(x)
return outputs
def get_attn_outputs(self, x):
stage_outputs = []
for stage in self.stages:
x, stage_data = stage(x)
stage_outputs.append(stage_data)
return stage_outputs
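An illustrative dump of the four-stage pyramid produced by the MiT-B3 configuration used below (the 180x320 input size is assumed to match the new augmentation):
import torch
backbone = mix_transformer(in_chans=3, embed_dims=(64, 128, 320, 512),
                           num_heads=(1, 2, 5, 8), depths=(3, 4, 18, 3),
                           sr_ratios=(8, 4, 2, 1), dropout_p=0.1, drop_path_p=0.2)
for f in backbone(torch.randn(1, 3, 180, 320)):
    print(f.shape)
# Approximately strides 4/8/16/32:
# (1, 64, 45, 80), (1, 128, 23, 40), (1, 320, 12, 20), (1, 512, 6, 10)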
class segformer_head(nn.Module):
def __init__(self, in_channels, num_classes, embed_dim, dropout_p=0.1):
super().__init__()
self.in_channels = in_channels
self.num_classes = num_classes
self.embed_dim = embed_dim
self.dropout_p = dropout_p
self.layers = nn.ModuleList([nn.Conv2d(chans, embed_dim, (1, 1)) for chans in in_channels])
self.linear_fuse = nn.Conv2d(embed_dim * len(self.layers), embed_dim, (1, 1), bias=False)
self.bn = nn.BatchNorm2d(embed_dim, eps=1e-5)
self.drop = nn.Dropout(dropout_p)
self.linear_pred = nn.Conv2d(self.embed_dim, num_classes, kernel_size=(1, 1))
self.init_weights()
def init_weights(self):
nn.init.kaiming_normal_(self.linear_fuse.weight, mode='fan_out', nonlinearity='relu')
nn.init.constant_(self.bn.weight, 1)
nn.init.constant_(self.bn.bias, 0)
def forward(self, x):
feature_size = x[0].shape[2:]
unify_stages = [layer(xi) for xi, layer in zip(x, self.layers)]
ups = [unify_stages[0]] + \
[F.interpolate(stage, size=feature_size, mode='bilinear') for stage in unify_stages[1:]]
concat = torch.cat(ups[::-1], 1)
x = self.linear_fuse(concat)
x = self.bn(x)
x = F.relu(x, inplace=True)
x = self.drop(x)
x = self.linear_pred(x)
return x
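An illustrative check of the decode head: each stage is projected to embed_dim channels, upsampled to the stride-4 resolution of stage 1, concatenated, fused with a 1x1 convolution and classified (the feature sizes are the assumed 180x320-input shapes from above):
import torch
head = segformer_head(in_channels=(64, 128, 320, 512), num_classes=3, embed_dim=256)
sizes = [(45, 80), (23, 40), (12, 20), (6, 10)]
feats = [torch.randn(1, c, h, w) for c, (h, w) in zip((64, 128, 320, 512), sizes)]
print(head(feats).shape)   # (1, 3, 45, 80): logits at 1/4 resolution, upsampled later by the full model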
class Perception(nn.Module):
def __init__(self, pretrained=True, num_queries=100, num_classes={"obj": 13, "seg": 2}, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0, num_layers=2, hidden_dim=256):
super(Perception, self).__init__()
self.backbone = ResNet(pretrained=True)
self.position = PositionEmbedding(num_queries=num_queries)
self.compress = Compression(back_dim=2048, embed_dim=embed_dim)
self.encoder = Encoder(embed_dim=embed_dim, num_heads=num_heads, ffn_dim=ffn_dim, dropout=dropout, num_layers=num_layers, hidden_dim=hidden_dim)
self.decoder = Decoder(embed_dim=embed_dim, num_heads=num_heads, ffn_dim=ffn_dim, dropout=dropout, num_layers=num_layers, hidden_dim=hidden_dim)
self.detector = Detector(num_classes=num_classes["obj"], embed_dim=embed_dim)
self.segmentor = Segmentor(num_classes=num_classes["seg"], embed_dim=embed_dim)
self.query_embed = nn.Embedding(num_queries, hidden_dim)
class segformer_mit_b3(nn.Module):
def __init__(self, in_channels, num_classes):
super().__init__()
self.backbone = mix_transformer(in_chans=in_channels, embed_dims=(64, 128, 320, 512),
num_heads=(1, 2, 5, 8), depths=(3, 4, 18, 3),
sr_ratios=(8, 4, 2, 1), dropout_p=0.1, drop_path_p=0.2)
self.decoder_head = segformer_head(in_channels=(64, 128, 320, 512),
num_classes=num_classes, embed_dim=256)
self.apply(self._init_weights)
def _init_weights(self, m):
if isinstance(m, nn.Linear):
trunc_normal_(m.weight, std=.02)
if isinstance(m, nn.Linear) and m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.LayerNorm):
nn.init.constant_(m.bias, 0)
nn.init.constant_(m.weight, 1.0)
elif isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
def forward(self, x):
b, c, h_ori, w_ori = x.shape
queries = self.query_embed.weight.unsqueeze(1).repeat(1, b, 1)
x, mask = padding(x, 32)
h, w = x.size(2), x.size(3)
back_ftrs, back_mask = self.backbone(x)
features = self.compress(back_ftrs)
features_flat, position = features.flatten(2).permute(2, 0, 1), self.position(features)
enc_ftrs, attns_enc = self.encoder(x=features_flat, mask=back_mask.flatten(1), pos=position)
dec_ftrs, attns_dec = self.decoder(x=enc_ftrs, queries=queries, mask=back_mask.flatten(1), pos=position, queries_pos=self.query_embed)
dect, seg = self.detector(dec_ftrs), unpadding(self.segmentor(features + enc_ftrs.reshape([b, -1, back_ftrs.size(2), back_ftrs.size(3)])), mask.unsqueeze(0).unsqueeze(1))
return {"decoded": dec_ftrs, "detection": dect, "segment": seg}, attns_enc, attns_dec
image_hw = x.shape[2:]
x = self.backbone(x)
x = self.decoder_head(x)
x = F.interpolate(x, size=image_hw, mode='bilinear')
return x
def get_attention_outputs(self, x):
return self.backbone.get_attn_outputs(x)
if __name__ == "__main__":
from src.perception.model.perception import *
x = torch.randn([2, 3, 360, 640])
perception = Perception()
queries = torch.randn([100, 2, 256])
output, attns_enc, attns_dec = perception(x, queries)
perception = segformer_mit_b3(in_channels=3, num_classes=3)
output = perception(x)
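Beyond the smoke test above, a hypothetical single training step could look like the sketch below; the AdamW optimizer, the 6e-5 learning rate and the use of integer class masks are assumptions, not part of this commit:
import torch
import torch.nn.functional as F
model = segformer_mit_b3(in_channels=3, num_classes=3)
optimizer = torch.optim.AdamW(model.parameters(), lr=6e-5)
images = torch.randn(2, 3, 180, 320)              # normalized images from AutoDriveDataset
targets = torch.randint(0, 3, (2, 180, 320))      # per-pixel class indices (assumed label format)
logits = model(images)                            # (2, 3, 180, 320) after the final interpolation
loss = F.cross_entropy(logits, targets)
loss.backward()
optimizer.step()
optimizer.zero_grad()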

View File

@@ -1,79 +0,0 @@
import copy
from typing import Optional, List
from src.perception.base import *
from torchvision.models._utils import IntermediateLayerGetter
class DualConv(nn.Module):
def __init__(self, in_channel=64, out_channel=128, stride=1, activate="relu"):
super(DualConv, self).__init__()
if activate == "elu":
self.activate = nn.ELU()
elif activate == "leaklyrelu":
self.activate = nn.LeakyReLU(0.2)
else:
self.activate = nn.ReLU()
self.convblocks = nn.Sequential(
nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1, padding_mode="reflect", bias=False),
nn.InstanceNorm2d(out_channel, affine=False),
self.activate,
nn.Conv2d(out_channel, out_channel, kernel_size=3, stride=stride, padding=1, padding_mode="reflect", bias=False),
nn.InstanceNorm2d(out_channel, affine=False),
self.activate
)
def forward(self, x):
output = self.convblocks(x)
return output
class Encoder(nn.Module):
def __init__(self, channels=[3, 64, 128, 256, 512, 1024]):
super(Encoder, self).__init__()
self.encblocks = nn.ModuleList([
DualConv(channels[i], channels[i+1], stride=1) if i == 0 else DualConv(channels[i], channels[i+1], stride=2)
for i in range(len(channels) - 1)
])
def forward(self, x):
output = []
for i, encblock in enumerate(self.encblocks):
x = encblock(x)
output.append(x)
return output
class Decoder(nn.Module):
def __init__(self, channels=[1024, 512, 256, 128, 64]):
super(Decoder, self).__init__()
self.upsample = nn.ModuleList([
nn.Sequential(nn.Conv2d(channels[i], channels[i+1], kernel_size=1),
nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)) for i in range(len(channels) - 1)
])
self.decblocks = nn.ModuleList([
DualConv(channels[i], channels[i+1]) for i in range(len(channels) - 1)
])
def forward(self, x, enc_ftrs):
for i, decblock in enumerate(self.decblocks):
x = self.upsample[i](x)
x = decblock(torch.cat([x, enc_ftrs[i]], dim=1))
return x
class UNet(nn.Module):
def __init__(self, enc_channels=[3, 64, 128, 256, 512, 1024], dec_channels=[1024, 512, 256, 128, 64], out_channel=3):
super(UNet, self).__init__()
self.encoder = Encoder(channels=enc_channels)
self.decoder = Decoder(channels=dec_channels)
self.output = nn.Conv2d(dec_channels[-1], out_channel, kernel_size=1)
def forward(self, x):
enc_out = self.encoder(x)
dec_out = self.decoder(enc_out[::-1][0], enc_out[::-1][1:])
output = self.output(dec_out)
return output
def weights_init(self, m):
if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
nn.init.kaiming_normal_(m.weight)
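For reference, a minimal sketch of how the removed UNet was exercised; input height and width must be divisible by 16 because the encoder halves the resolution four times before the decoder upsamples and concatenates the matching skip features:
import torch
unet = UNet()
print(unet(torch.randn(1, 3, 192, 320)).shape)   # (1, 3, 192, 320); 192x320 chosen so the skips align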