TerenceLiu98 2024-05-07 18:15:53 +00:00
parent c6b7bce2f4
commit e83298d09f
8 changed files with 469 additions and 108 deletions


@@ -24,37 +24,4 @@ An ADAS (Advanced Driver Assistance System) for Euro Truck Simulator 2 (or America
- This dataset contains recorded screen captures of Euro Truck Simulator 2 and paired input from a steering wheel controller (Thrustmaster F430).
- The dataset contains 323,894 frames captured at 25 fps.
- Each frame is paired with the steering wheel controller's input values at that moment.
- Collected using [Europilot](https://github.com/marsauto/europilot).
Data Directory Structure:
```
(BDD100k dataset)
├── image
│ └── 100k
│ ├── test
│ ├── train
│ └── val
└── label
├── det_20
├── drivable
│ ├── colormaps
│ │ ├── train
│ │ └── val
│ ├── masks
│ │ ├── train
│ │ └── val
│ ├── polygons
│ └── rles
└── lane
├── colormaps
│ ├── train
│ └── val
├── masks
│ ├── train
│ └── val
└── polygons
```
![example.png](src/perception/sample.png)
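As a sketch of how the layout above might be consumed, here is a hypothetical PyTorch dataset pairing the 100k images with lane masks; the class name, the `root` argument, and the `.jpg`/`.png` extensions are assumptions, not part of this repository:

```python
import os
from glob import glob
from PIL import Image
from torch.utils.data import Dataset

class BDD100kLaneDataset(Dataset):
    """Hypothetical loader for the directory tree above."""
    def __init__(self, root, split="train", transform=None):
        self.images = sorted(glob(os.path.join(root, "image", "100k", split, "*.jpg")))
        self.mask_dir = os.path.join(root, "label", "lane", "masks", split)
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(self.images[idx]).convert("RGB")
        name = os.path.splitext(os.path.basename(self.images[idx]))[0]
        mask = Image.open(os.path.join(self.mask_dir, name + ".png"))
        if self.transform is not None:
            image, mask = self.transform(image, mask)
        return image, mask
```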

perception.ipynb (new file, 199 lines)

File diff suppressed because one or more lines are too long

sample.png (new binary file, 11 KiB)


@@ -62,19 +62,21 @@ def padding(image, patch_size, fill_value=0):
    if W % patch_size > 0:
        pad_w = patch_size - (W % patch_size)
    image_padded = image
    padding_mask = torch.ones_like(image, dtype=torch.float32) # Initialize mask with ones
    if pad_h > 0 or pad_w > 0:
        image_padded = F.pad(image, (0, pad_w, 0, pad_h), value=fill_value)
    return image_padded
        padding_mask = F.pad(padding_mask, (0, pad_w, 0, pad_h), value=0) # Update mask for padded regions
    return image_padded, padding_mask[0, 0, :, :]
def unpadding(image, target_size):
    H, W = target_size
    H_pad, W_pad = image.size(2), image.size(3)
    # crop predictions on extra pixels coming from padding
    extra_h = H_pad - H
    extra_w = W_pad - W
    if extra_h > 0:
        image = image[:, :, :-extra_h]
    if extra_w > 0:
        image = image[:, :, :, :-extra_w]
    return image
def unpadding(image_padded, padding_mask):
    H_padded, W_padded = image_padded.size(2), image_padded.size(3)
    # the mask is 1 inside the original image and 0 in the padded border,
    # so the pad amounts are the number of zero rows/columns
    pad_h = H_padded - padding_mask[0, 0, :, 0].sum().long().item()
    pad_w = W_padded - padding_mask[0, 0, 0, :].sum().long().item()
    if pad_h > 0:
        image_padded = image_padded[:, :, :-pad_h, :]
    if pad_w > 0:
        image_padded = image_padded[:, :, :, :-pad_w]
    return image_padded
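A round-trip sketch of the mask-based API, assuming the revised `padding`/`unpadding` above are in scope; the `unsqueeze` calls mirror how `Perception.forward` passes the mask:

```python
import torch

x = torch.randn(2, 3, 350, 630)
x_pad, mask = padding(x, 32)           # padded up to (352, 640); mask is 1 inside the image
x_rec = unpadding(x_pad, mask.unsqueeze(0).unsqueeze(1))
assert x_rec.shape == x.shape          # the padded rows/columns are cropped away
```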


@@ -83,8 +83,8 @@ if __name__ == "__main__":
        rect = patches.Rectangle((D["boxes"][i][0], D["boxes"][i][1]),
                                 D["boxes"][i][2] - D["boxes"][i][0], D["boxes"][i][3] - D["boxes"][i][1],
                                 linewidth=1, edgecolor='r', facecolor='none')
        ax[0].add_patch(rect) # import matplotlib.patches as patches
    ax[0].axis("off")
    ax[1].imshow(images[0].permute(1, 2, 0).numpy())
    ax[1].imshow(lane[0].permute(1, 2, 0).numpy(), alpha=0.5)


@@ -19,8 +19,8 @@ class ResNet(nn.Module):
        self.model = IntermediateLayerGetter(resnet, return_layers=return_layers)

    def forward(self, x):
        output = self.model(x)
        return output["stage7"] # [output["stage3"], output["stage4"], output["stage5"], output["stage6"], output["stage7"]]
        output_ftrs = self.model(x)
        return output_ftrs["stage7"], torch.ones(output_ftrs["stage7"].shape, device=x.device)[:, 0]
class Compression(nn.Module):
    def __init__(self, back_dim=2048, embed_dim=256):
@@ -36,67 +36,56 @@ class PositionEmbedding(nn.Module):
    '''
    def __init__(self, num_queries=100, hidden_dim=256):
        super(PositionEmbedding, self).__init__()
        self.row_embed = nn.Parameter(torch.rand(num_queries // 2, hidden_dim // 2))
        self.col_embed = nn.Parameter(torch.rand(num_queries // 2, hidden_dim // 2))
        self.row_embed = nn.Embedding(num_queries // 2, hidden_dim // 2)
        self.col_embed = nn.Embedding(num_queries // 2, hidden_dim // 2)
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.uniform_(self.row_embed.weight)
        nn.init.uniform_(self.col_embed.weight)

    def forward(self, x):
        b, c, h, w = x.shape
        pos = torch.cat([self.col_embed[:w].unsqueeze(0).repeat(h, 1, 1),
                         self.row_embed[:h].unsqueeze(1).repeat(1, w, 1)], dim=-1).flatten(0, 1).unsqueeze(1).repeat(1, b, 1)
        x_emb = self.col_embed(torch.arange(w, device=x.device))
        y_emb = self.row_embed(torch.arange(h, device=x.device))
        pos = torch.cat([x_emb.unsqueeze(0).repeat(h, 1, 1), y_emb.unsqueeze(1).repeat(1, w, 1)], dim=-1).permute(2, 0, 1).unsqueeze(0).flatten(2).permute(2, 0, 1)
        # y = pos + 0.1 * y.flatten(2).permute(2,0,1)
        return pos
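A quick shape check of the learned embedding, assuming the class above; each spatial location gets a 128-d column code concatenated with a 128-d row code, and the singleton batch dimension broadcasts when the result is added to the flattened features:

```python
import torch

pos_embed = PositionEmbedding(num_queries=100, hidden_dim=256)
features = torch.randn(2, 256, 12, 20)   # (B, C, H, W) feature map
pos = pos_embed(features)
print(pos.shape)                         # torch.Size([240, 1, 256]), i.e. (H*W, 1, hidden_dim)
```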
class SinePositionEmbedding(nn.Module):
    '''
    A Static Positional Embedding
    '''
    def __init__(self, num_queries=100, hidden_dim=256):
        super(SinePositionEmbedding, self).__init__()
        self.num_queries, self.hidden_dim = num_queries, hidden_dim
        self.base = 10000

    def forward(self, x):
        pos = torch.zeros(self.num_queries, self.hidden_dim)
        position = torch.arange(0, self.num_queries, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.hidden_dim, 2).float() * (-math.log(self.base) / self.hidden_dim))
        pos[:, 0::2] = torch.sin(position * div_term)
        pos[:, 1::2] = torch.cos(position * div_term)
        return pos.unsqueeze(1).repeat(1, x.shape[1], 1)
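In symbols, for position p and channel pair i, with base b = 10000 and model width d, the forward pass above computes the standard sinusoidal encoding:

```math
PE_{(p,\,2i)} = \sin\!\left(\frac{p}{b^{2i/d}}\right), \qquad PE_{(p,\,2i+1)} = \cos\!\left(\frac{p}{b^{2i/d}}\right)
```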
class FeedForward(nn.Module):
    def __init__(self, embed_dim=256, ffn_dim=512, dropout=0.0):
        '''
        FFN + Add&Norm
        '''
        super(FeedForward, self).__init__()
        self.fc_1 = nn.Conv1d(in_channels=embed_dim, out_channels=ffn_dim, kernel_size=1)
        self.fc_2 = nn.Conv1d(in_channels=ffn_dim, out_channels=embed_dim, kernel_size=1)
        self.fc_1 = nn.Linear(in_features=embed_dim, out_features=ffn_dim)
        self.fc_2 = nn.Linear(in_features=ffn_dim, out_features=embed_dim)
        self.norm = nn.LayerNorm(embed_dim)
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        output = self.drop(self.fc_2(F.relu(self.fc_1(x.transpose(1, 2))))) + x.transpose(1, 2)
        output = self.norm(output.transpose(1, 2))
        output = self.drop(self.fc_2(F.relu(self.fc_1(x)))) + x
        output = self.norm(output)
        return output
class EncoderLayer(nn.Module):
    def __init__(self, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0):
        '''
        Architecture: Multihead Attention + Add&Norm + FFN + Add&Norm
        '''
        super(EncoderLayer, self).__init__()
        self.attention = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout)
        self.feedforward = FeedForward(embed_dim=embed_dim, ffn_dim=ffn_dim, dropout=dropout)
        self.norm = nn.LayerNorm(embed_dim)

    def forward(self, x, attn_mask=None):
        query = key = x
        x, attention = self.attention(query, key, x, attn_mask)
        output = self.feedforward(x)
        return output, attention

class DecoderLayer(nn.Module):
    def __init__(self, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0):
        super(DecoderLayer, self).__init__()
        self.attention = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout)
        self.feedforward = FeedForward(embed_dim=embed_dim, ffn_dim=ffn_dim, dropout=dropout)

    def add_pos_embed(self, x, pos: Optional[torch.Tensor] = None):
        return x if pos is None else x + pos

    def forward(self, x, queries, attn_mask=None):
        output, attention = self.attention(queries, x, x, attn_mask)
        output = self.feedforward(output)

    def forward(self, x, mask: Optional[torch.Tensor] = None, pos: Optional[torch.Tensor] = None):
        # positional embeddings are added to queries and keys only, not to the values
        query = key = self.add_pos_embed(x, pos)
        ftrs, attention = self.attention(query, key, x, key_padding_mask=mask)  # True in mask marks padded tokens
        ftrs = self.norm(ftrs + x)
        output = self.feedforward(ftrs)
        return output, attention
class Encoder(nn.Module):
@@ -105,20 +94,44 @@ class Encoder(nn.Module):
        layer = EncoderLayer(embed_dim=embed_dim, num_heads=num_heads, ffn_dim=ffn_dim, dropout=dropout)
        self.encoder = __clones__(layer, num_layers)

    def forward(self, x):
    def forward(self, x, mask: Optional[torch.Tensor] = None, pos: Optional[torch.Tensor] = None):
        output, attns = x, []
        for layer in self.encoder:
            output, attention = layer(output)
            output, attention = layer(x=output, mask=mask, pos=pos)
            attns.append(attention)
        return output, attns
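A minimal smoke test of the encoder stack, assuming the classes above and that `Encoder`'s constructor carries the same defaults as `Decoder`'s; tokens follow `nn.MultiheadAttention`'s default (sequence, batch, embed) layout:

```python
import torch

encoder = Encoder(embed_dim=256, num_heads=8, ffn_dim=512, num_layers=2)
tokens = torch.randn(240, 2, 256)   # (H*W, B, embed_dim) flattened feature map
pos = torch.randn(240, 1, 256)      # broadcasts over the batch dimension
out, attns = encoder(tokens, mask=None, pos=pos)
print(out.shape, len(attns))        # torch.Size([240, 2, 256]) 2
```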
class DecoderLayer(nn.Module):
    def __init__(self, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0):
        super(DecoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout)
        self.attention = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout)
        self.feedforward = FeedForward(embed_dim=embed_dim, ffn_dim=ffn_dim, dropout=dropout)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def add_pos_embed(self, x, pos: Optional[torch.Tensor] = None):
        return x if pos is None else x + pos

    def forward(self, x, queries, mask: Optional[torch.Tensor] = None, queries_mask: Optional[torch.Tensor] = None,
                pos: Optional[torch.Tensor] = None, queries_pos: Optional[torch.Tensor] = None):
        # self-attention over the queries, then cross-attention into the encoder features
        q = k = self.add_pos_embed(queries, queries_pos)
        queries_, _ = self.self_attn(q, k, queries, key_padding_mask=queries_mask)
        queries = self.norm1(queries_ + queries)
        output, attention = self.attention(self.add_pos_embed(queries, queries_pos),
                                           self.add_pos_embed(x, pos), self.add_pos_embed(x, pos), key_padding_mask=mask)
        output = self.norm2(output + queries)
        output = self.feedforward(output)
        return output, attention
class Decoder(nn.Module):
    def __init__(self, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0, num_layers=8, hidden_dim=256):
        super(Decoder, self).__init__()
        layer = DecoderLayer(embed_dim=embed_dim, num_heads=num_heads, ffn_dim=ffn_dim, dropout=dropout)
        self.decoder = __clones__(layer, num_layers)

    def forward(self, x, queries):
    def forward(self, x, queries, mask: Optional[torch.Tensor] = None, queries_mask: Optional[torch.Tensor] = None,
                pos: Optional[torch.Tensor] = None, queries_pos: Optional[torch.Tensor] = None):
        output, attns = x, []
        for layer in self.decoder:
            output, attention = layer(output, queries)
@@ -155,33 +168,35 @@ class Segmentor(nn.Module):
        return x
class Perception(nn.Module):
    def __init__(self, pretrained=True, num_queries=100, num_classes={"obj": 13, "seg": 2}, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0, num_layers=8, hidden_dim=256):
    def __init__(self, pretrained=True, num_queries=100, num_classes={"obj": 13, "seg": 2}, embed_dim=256, num_heads=8, ffn_dim=512, dropout=0.0, num_layers=2, hidden_dim=256):
        super(Perception, self).__init__()
        self.backbone = ResNet(pretrained=True)
        self.position = PositionEmbedding(num_queries=num_queries)
        self.compress = Compression(back_dim=2048, embed_dim=embed_dim)
        self.encoder = Encoder(embed_dim=embed_dim, num_heads=num_heads, ffn_dim=ffn_dim, dropout=dropout, num_layers=num_layers, hidden_dim=hidden_dim)
        self.decoder = Decoder(embed_dim=embed_dim, num_heads=num_heads, ffn_dim=ffn_dim, dropout=dropout, num_layers=num_layers, hidden_dim=hidden_dim)
        self.detector = Detector(num_classes=num_classes["obj"], embed_dim=embed_dim)
        self.segmentor = Segmentor(num_classes=num_classes["seg"], embed_dim=embed_dim)
        requires_grad(self.backbone)

    def forward(self, x, queries):
        h_ori, w_ori = x.size(2), x.size(3)
        x = padding(x, 32)
        h, w = x.size(2), x.size(3)
        output_ori = self.compress(self.backbone(x))
        output = self.position(output_ori) + output_ori.flatten(2).permute(2, 0, 1)
        output_enc, attns_enc = self.encoder(output)
        output, attns_dec = self.decoder(output_enc, queries)
        dect, seg = self.detector(output), unpadding(self.segmentor(output_enc.permute(1, 2, 0).reshape(output_ori.shape)), (h_ori, w_ori))
        return {"decoded": output, "detection": dect, "segment": seg}, attns_enc, attns_dec
        self.query_embed = nn.Embedding(num_queries, hidden_dim)

    def forward(self, x):
        b, c, h_ori, w_ori = x.shape
        queries = self.query_embed.weight.unsqueeze(1).repeat(1, b, 1)  # (num_queries, B, hidden_dim)
        x, mask = padding(x, 32)
        h, w = x.size(2), x.size(3)
        back_ftrs, back_mask = self.backbone(x)
        features = self.compress(back_ftrs)
        features_flat, position = features.flatten(2).permute(2, 0, 1), self.position(features)
        enc_ftrs, attns_enc = self.encoder(x=features_flat, mask=(back_mask.flatten(1) == 0), pos=position)
        dec_ftrs, attns_dec = self.decoder(x=enc_ftrs, queries=queries, mask=(back_mask.flatten(1) == 0), pos=position,
                                           queries_pos=self.query_embed.weight.unsqueeze(1))
        dect, seg = self.detector(dec_ftrs), unpadding(self.segmentor(features + enc_ftrs.permute(1, 2, 0).reshape(b, -1, back_ftrs.size(2), back_ftrs.size(3))), mask.unsqueeze(0).unsqueeze(1))
        return {"decoded": dec_ftrs, "detection": dect, "segment": seg}, attns_enc, attns_dec
if __name__ == "__main__":
    from src.perception.model.perception import *
    x = torch.randn([2, 3, 720, 1280])
    x = torch.randn([2, 3, 360, 640])
    perception = Perception()
    output, attns_enc, attns_dec = perception(x)  # object queries now come from the model's query_embed


@@ -0,0 +1,79 @@
import copy
from typing import Optional, List
from src.perception.base import *
from torchvision.models._utils import IntermediateLayerGetter

class DualConv(nn.Module):
    def __init__(self, in_channel=64, out_channel=128, stride=1, activate="relu"):
        super(DualConv, self).__init__()
        if activate == "elu":
            self.activate = nn.ELU()
        elif activate == "leakyrelu":
            self.activate = nn.LeakyReLU(0.2)
        else:
            self.activate = nn.ReLU()
        self.convblocks = nn.Sequential(
            nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1, padding_mode="reflect", bias=False),
            nn.InstanceNorm2d(out_channel, affine=False),
            self.activate,
            nn.Conv2d(out_channel, out_channel, kernel_size=3, stride=stride, padding=1, padding_mode="reflect", bias=False),
            nn.InstanceNorm2d(out_channel, affine=False),
            self.activate
        )

    def forward(self, x):
        output = self.convblocks(x)
        return output

class Encoder(nn.Module):
    def __init__(self, channels=[3, 64, 128, 256, 512, 1024]):
        super(Encoder, self).__init__()
        self.encblocks = nn.ModuleList([
            DualConv(channels[i], channels[i+1], stride=1) if i == 0 else DualConv(channels[i], channels[i+1], stride=2)
            for i in range(len(channels) - 1)
        ])

    def forward(self, x):
        output = []
        for i, encblock in enumerate(self.encblocks):
            x = encblock(x)
            output.append(x)
        return output

class Decoder(nn.Module):
    def __init__(self, channels=[1024, 512, 256, 128, 64]):
        super(Decoder, self).__init__()
        self.upsample = nn.ModuleList([
            nn.Sequential(nn.Conv2d(channels[i], channels[i+1], kernel_size=1),
                          nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)) for i in range(len(channels) - 1)
        ])
        self.decblocks = nn.ModuleList([
            DualConv(channels[i], channels[i+1]) for i in range(len(channels) - 1)
        ])

    def forward(self, x, enc_ftrs):
        for i, decblock in enumerate(self.decblocks):
            x = self.upsample[i](x)
            x = decblock(torch.cat([x, enc_ftrs[i]], dim=1))
        return x

class UNet(nn.Module):
    def __init__(self, enc_channels=[3, 64, 128, 256, 512, 1024], dec_channels=[1024, 512, 256, 128, 64], out_channel=3):
        super(UNet, self).__init__()
        self.encoder = Encoder(channels=enc_channels)
        self.decoder = Decoder(channels=dec_channels)
        self.output = nn.Conv2d(dec_channels[-1], out_channel, kernel_size=1)

    def forward(self, x):
        enc_out = self.encoder(x)
        dec_out = self.decoder(enc_out[::-1][0], enc_out[::-1][1:])
        output = self.output(dec_out)
        return output

    def weights_init(self, m):
        if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
            nn.init.kaiming_normal_(m.weight)
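A quick shape check, assuming the defaults above; the input sides must be divisible by 16, since the encoder halves the resolution four times:

```python
import torch

unet = UNet()
unet.apply(unet.weights_init)    # Kaiming init for every Conv2d
x = torch.randn(1, 3, 256, 256)
print(unet(x).shape)             # torch.Size([1, 3, 256, 256])
```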


@@ -0,0 +1,99 @@
from src.perception.base import *

class DiceLoss(nn.Module):
    def __init__(self):
        super(DiceLoss, self).__init__()
        self.epsilon = 1e-5

    def forward(self, x, y):
        assert x.size() == y.size(), "the size of x and y must be equal."
        num = x.size(0)
        pre = torch.sigmoid(x).view(num, -1)
        tar = y.view(num, -1)
        intersection = (pre * tar).sum(-1).sum()
        union = (pre + tar).sum(-1).sum()
        score = 1 - 2 * (intersection + self.epsilon) / (union + self.epsilon)
        return score
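In symbols this is the soft Dice loss over the sigmoid probabilities p and targets t, smoothed by ε:

```math
\mathcal{L}_{\mathrm{Dice}} = 1 - \frac{2\sum_i p_i t_i + \epsilon}{\sum_i (p_i + t_i) + \epsilon}
```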
class TopoLoss(nn.Module):
    def __init__(self, k):
        super(TopoLoss, self).__init__()
        self.k, self.epsilon = k, 1e-5
        self.criterion = DiceLoss()

    def dilate(self, image):
        # grayscale dilation: max over each k x k neighbourhood, gathered with unfold
        B, C, H, W = image.size()
        pad = (self.k - 1) // 2
        image = F.pad(image, pad=[pad, pad, pad, pad], mode='reflect')
        patches = image.unfold(dimension=2, size=self.k, step=1)
        patches = patches.unfold(dimension=3, size=self.k, step=1)
        dilated, _ = patches.reshape(B, C, H, W, -1).max(dim=-1)
        return dilated

    def erode(self, image):
        # grayscale erosion: min over each k x k neighbourhood
        B, C, H, W = image.size()
        pad = (self.k - 1) // 2
        image = F.pad(image, pad=[pad, pad, pad, pad], mode='reflect')
        patches = image.unfold(dimension=2, size=self.k, step=1)
        patches = patches.unfold(dimension=3, size=self.k, step=1)
        eroded, _ = patches.reshape(B, C, H, W, -1).min(dim=-1)
        return eroded

    def forward(self, x, y):
        x_d, y_d = self.erode(x), self.erode(y)
        return self.criterion(x_d, y_d)
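A small sanity check of the unfold-based morphology above: with k=3, eroding a 3x3 square leaves only its centre pixel, while dilating it grows it to 5x5:

```python
import torch

topo = TopoLoss(k=3)
mask = torch.zeros(1, 1, 7, 7)
mask[:, :, 2:5, 2:5] = 1.0                 # a 3x3 square of ones
print(topo.erode(mask).sum().item())       # 1.0  -> only the centre pixel survives
print(topo.dilate(mask).sum().item())      # 25.0 -> the square grows to 5x5
```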
class DifferentialLoss(nn.Module):
    def __init__(self):
        super(DifferentialLoss, self).__init__()
        self.loss = nn.BCELoss()

    def forward(self, x, y):
        # BCELoss expects inputs in [0, 1], so map the logits through a sigmoid first
        x_mul = torch.sigmoid(x[:, 0, :, :]) * torch.sigmoid(x[:, 1, :, :])
        y_mul = y[:, 0, :, :] * y[:, 1, :, :]
        loss = self.loss(x_mul, y_mul)
        return loss
class ModifiedBCELoss(nn.Module):
    def __init__(self):
        super(ModifiedBCELoss, self).__init__()
        self.bce_loss = nn.BCEWithLogitsLoss()

    def forward(self, x, y):
        B, C, H, W = x.size()
        loss = 0
        for i in range(C):
            loss += self.bce_loss(x[:, i, :, :], y[:, i, :, :])
        return loss
class ModifiedCELoss(nn.Module):
    def __init__(self):
        super(ModifiedCELoss, self).__init__()
        self.ce_loss = nn.CrossEntropyLoss()

    def __labelize__(self, y):
        # map the two binary channels onto four exclusive classes:
        # 0 = neither, 1 = both, 2 = channel 1 only, 3 = channel 0 only
        new_label = torch.zeros_like(y[:, 0, :, :].unsqueeze(1))
        new_label[y[:, 0, :, :].unsqueeze(1) == 1] = 3
        new_label[y[:, 1, :, :].unsqueeze(1) == 1] = 2
        new_label[(y[:, 0, :, :].unsqueeze(1) == 1) & (y[:, 1, :, :].unsqueeze(1) == 1)] = 1
        new_label[(y[:, 0, :, :].unsqueeze(1) == 0) & (y[:, 1, :, :].unsqueeze(1) == 0)] = 0
        return new_label

    def forward(self, x, y):
        # CrossEntropyLoss expects (B, H, W) integer class indices
        new_y = self.__labelize__(y=y).squeeze(1).long()
        loss = self.ce_loss(x, new_y)
        return loss
class SegmentLoss(nn.Module):
    def __init__(self, loss=["BCE"], LAMBDA=[1]):
        assert len(loss) == len(LAMBDA)
        super(SegmentLoss, self).__init__()
        self.loss, self.LAMBDA = loss, LAMBDA
        self.loss_list = {"BCE": nn.BCEWithLogitsLoss(), "ModiBCE": ModifiedBCELoss(),
                          "ModiCE": ModifiedCELoss(), "Topo": TopoLoss(k=5), "Diff": DifferentialLoss(),
                          "Dice": DiceLoss()}

    def forward(self, x, y):
        loss = 0
        for loss_, lam in zip(self.loss, self.LAMBDA):
            loss += lam * self.loss_list[loss_](x, y)
        return loss
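A usage sketch, assuming the classes above; `x` holds raw logits for a two-channel mask and `y` the matching binary targets:

```python
import torch

criterion = SegmentLoss(loss=["BCE", "Dice"], LAMBDA=[1.0, 0.5])
x = torch.randn(2, 2, 64, 64)                    # network logits
y = torch.randint(0, 2, (2, 2, 64, 64)).float()  # binary ground truth
print(criterion(x, y).item())
```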