Autonomous Truck Simulator with PyTorch — finetuning and single shot detectors

This is a continuation of a previous post, where I do a full walkthrough of how to build an autonomous truck simulator using fast.ai. Ultimately, though, these methods apply any time you need to finetune a pretrained model or build a model that predicts bounding boxes and classes together.

Now my goal is to walk through some of the more technical aspects of the training and inference processes and explain the details of how they are implemented in PyTorch. You can also reference the codebase in this GitHub repo.

Recall from the last post that there are two neural networks at work here.

  1. The DNN to predict the turning direction.
  2. The DNN to predict the bounding boxes and classes of cars, people, etc.

Finetuning the turning directions model

Both models begin with a pretrained resnet34 and are finetuned to the appropriate task.

A pretrained resnet34 can be obtained from torchvision.models

import torchvision.models as models

arch = models.resnet34(pretrained=True)

All of the pretrained models in torchvision have been pretrained on the 1000-class ImageNet dataset.

To finetune a pretrained network we are essentially just starting with a bunch of weights that already have a lot of information about the ImageNet dataset embedded in them. There are two common ways to do this. One is to freeze all of the early layers by setting requires_grad=False on their parameters and train only the final layers. The other is to use all of the pretrained weights purely as an initialization and continue training every layer on the new training data.
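Option 2 needs almost no code, since requires_grad=True is the default; a minimal sketch (the learning rate is illustrative):

import torch.optim as optim

# Option 2: every parameter keeps requires_grad=True (the default), so we
# simply keep training all of the pretrained weights, typically with a
# smaller learning rate so the initialization is only nudged.
optimizer = optim.SGD(arch.parameters(), lr=1e-3, momentum=0.9)

Option 1 is what the next block implements.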

For option 1, where we freeze the early layers and train only the final layers, we set requires_grad=False for all of the existing parameters and then remove and replace the last layers (a newly constructed layer's parameters have requires_grad=True by default, so the replacement head will train).

import torch
import torch.nn as nn
import torch.nn.functional as F

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Freeze every pretrained parameter; the new layers constructed below
# default to requires_grad=True and will be the only ones trained.
for param in arch.parameters():
    param.requires_grad = False

class Flatten(nn.Module):
    def __init__(self):
        super(Flatten, self).__init__()
    def forward(self, x):
        x = x.view(x.size(0), -1)
        return x

class normalize(nn.Module):
    def __init__(self):
        super(normalize, self).__init__()
    def forward(self, x):
        x = F.normalize(x, p=2, dim=1)
        return x

# Keep a reference to the layers we are about to cut off
# (layer_list[1] is the original Linear(512, 1000) classifier).
layer_list = list(arch.children())[-2:]
arch = nn.Sequential(*list(arch.children())[:-2])
# Modules assigned to an nn.Sequential are appended and run in order,
# so the new pooling layer and head execute after the conv trunk.
arch.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
arch.fc = nn.Sequential(
    Flatten(),
    nn.Linear(in_features=layer_list[1].in_features,
              out_features=3,
              bias=True),
    normalize()
)
arch = arch.to(device)

If you look at the architecture of the resnet34 you can see the last conv block is followed by an AdaptiveAvgPool2d and a Linear layer.

  (2): BasicBlock(
    (conv1): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
)
(avgpool): AdaptiveAvgPool2d(output_size=(1, 1))
(fc): Linear(in_features=512, out_features=1000, bias=True)

We remove the final two layers with nn.Sequential(*list(arch.children())[:-2]), then append a fresh arch.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1)) and an nn.Sequential of Flatten, Linear, and normalize layers. We ultimately want to predict 3 classes (left, right, straight), so out_features is 3.
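A quick sanity check on the new head; a dummy batch of two 224x224 RGB images should come out as two rows of three class scores:

# Fake inputs, just to verify the plumbing end to end.
dummy = torch.randn(2, 3, 224, 224).to(device)
print(arch(dummy).shape)  # expected: torch.Size([2, 3])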

Now we will create our Dataset and DataLoader for the directions model. Since our data are simply images and classes [left, right, straight], we could use a built-in dataset class, but I like to write a custom class regardless because it makes it easier to see exactly how the data are pulled.
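(For comparison, if the images were instead sorted into one subfolder per class, which is not the layout used here, torchvision's built-in ImageFolder would do the same job. The folder path below is hypothetical.)

from torchvision import datasets, transforms

# Hypothetical layout: data/train3_by_class/{left,right,straight}/*.jpg
builtin_ds = datasets.ImageFolder('data/train3_by_class/',
                                  transform=transforms.ToTensor())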

import os
import pandas as pd
from skimage import io  # scikit-image, used here for reading images
from torch.utils.data import Dataset, DataLoader

class DirectionsDataset(Dataset):
    """Directions dataset."""
    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file (string): Path to the csv file with labels.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform.
        """
        self.label = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.label)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir,
                                self.label.iloc[idx, 0])
        image = io.imread(img_name + '.jpg')
        sample = image
        label = self.label.iloc[idx, 1]
        if self.transform:
            sample = self.transform(sample)
        return sample, label

My image names in the csv file don't have extensions, hence the img_name + '.jpg'.

from torchvision import transforms

tensor_dataset = DirectionsDataset(csv_file='data/labels_directions.csv',
                                   root_dir='data/train3/',
                                   transform=transforms.Compose([
                                       transforms.ToTensor(),
                                       transforms.Normalize(
                                           (0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]))
dataloader = DataLoader(tensor_dataset, batch_size=16, shuffle=True)
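Before training, it's worth pulling one batch to confirm the shapes (the spatial size depends on the images in data/train3/):

images, labels = next(iter(dataloader))
print(images.shape, labels.shape)  # e.g. torch.Size([16, 3, H, W]) torch.Size([16])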

So we are ready to begin training the model.

import time
import copy

def train_model(model, criterion, optimizer, scheduler,
                dataloader, num_epochs=25):
    since = time.time()
    FT_losses = []
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    iters = 0
    dataset_size = len(dataloader.dataset)
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        model.train()  # Set model to training mode
        running_loss = 0.0
        running_corrects = 0
        # Iterate over data.
        for i, (inputs, labels) in enumerate(dataloader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward: one pass in training mode; the predicted class is
            # the argmax over the three outputs
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            # backward + optimize
            loss.backward()
            optimizer.step()
            FT_losses.append(loss.item())
            # statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
            iters += 1
            if iters % 2 == 0:
                print('Prev Loss: {:.4f} Prev Acc: {:.4f}'.format(
                    loss.item(),
                    torch.sum(preds == labels.data).double() / inputs.size(0)))
        # step the LR scheduler once per epoch, after the optimizer updates
        scheduler.step()
        epoch_loss = running_loss / dataset_size
        epoch_acc = running_corrects.double() / dataset_size
        print('Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
        # deep copy the model weights if this epoch is the best so far
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best Acc: {:.4f}'.format(best_acc))
    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, FT_losses

In this training loop we keep track of the best model weights, updating them whenever an epoch's accuracy is the best so far. We also record the loss at every iteration and return the full list at the end, which is handy for plotting the training curve for debugging or presentation.
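For example, once train_model() returns FT_losses (matplotlib assumed):

import matplotlib.pyplot as plt

plt.plot(FT_losses)
plt.xlabel('iteration')
plt.ylabel('loss')
plt.show()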

Keep in mind that the model is updated in place at each iteration: if you stop the training loop, the model retains its current weights, and training can be resumed simply by calling train_model() again. To start from scratch, go back and reinitialize the model from the pretrained architecture.

import torch.optim as optim
from torch.optim import lr_scheduler

criterion = nn.CrossEntropyLoss()
# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(arch.parameters(), lr=1e-2, momentum=0.9)
# Decay LR by a factor of *gamma* every *step_size* epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

arch, FT_losses = train_model(arch, criterion, optimizer_ft, exp_lr_scheduler,
                              dataloader, num_epochs=5)
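A more explicit way to pause and resume training is to checkpoint the model and optimizer state; a minimal sketch (the file name is arbitrary):

# Save after (or during) training...
torch.save({'model': arch.state_dict(),
            'optimizer': optimizer_ft.state_dict()},
           'directions_checkpoint.pth')

# ...and restore later to pick up where we left off.
checkpoint = torch.load('directions_checkpoint.pth')
arch.load_state_dict(checkpoint['model'])
optimizer_ft.load_state_dict(checkpoint['optimizer'])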

Finetuning the bounding box model

[Image: sample data]

Again, we will use a pretrained resnet34 architecture. However, this time we will have to edit it more substantially so that it outputs both class predictions and bounding box values. In addition, this is a multi-object problem: an image may contain 1 bounding box or 15, with a class for each one.

We will attach a custom head to the architecture, in a similar way to how we replaced the final layers of the directions model.

class StdConv(nn.Module):
    def __init__(self, nin, nout, stride=2, drop=0.1):
        super().__init__()
        self.conv = nn.Conv2d(nin, nout, 3, stride=stride, padding=1)
        self.bn = nn.BatchNorm2d(nout)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        return self.drop(self.bn(F.relu(self.conv(x))))

def flatten_conv(x, k):
    bs, nf, gx, gy = x.size()
    x = x.permute(0, 2, 3, 1).contiguous()
    return x.view(bs, -1, nf//k)

class OutConv(nn.Module):
    def __init__(self, k, nin, bias):
        super().__init__()
        self.k = k
        # one conv for class scores (+1 for background), one for box coordinates
        self.oconv1 = nn.Conv2d(nin, (len(id2cat)+1)*k, 3, padding=1)
        self.oconv2 = nn.Conv2d(nin, 4*k, 3, padding=1)
        self.oconv1.bias.data.zero_().add_(bias)

    def forward(self, x):
        return [flatten_conv(self.oconv1(x), self.k),
                flatten_conv(self.oconv2(x), self.k)]

drop = 0.4

class SSD_MultiHead(nn.Module):
    def __init__(self, k, bias):
        super().__init__()
        self.drop = nn.Dropout(drop)
        self.sconv0 = StdConv(512, 256, stride=1, drop=drop)
        self.sconv1 = StdConv(256, 256, drop=drop)
        self.sconv2 = StdConv(256, 256, drop=drop)
        self.sconv3 = StdConv(256, 256, drop=drop)
        self.out0 = OutConv(k, 256, bias)
        self.out1 = OutConv(k, 256, bias)
        self.out2 = OutConv(k, 256, bias)
        self.out3 = OutConv(k, 256, bias)

    def forward(self, x):
        x = self.drop(F.relu(x))
        x = self.sconv0(x)
        x = self.sconv1(x)
        o1c, o1l = self.out1(x)
        x = self.sconv2(x)
        o2c, o2l = self.out2(x)
        x = self.sconv3(x)
        o3c, o3l = self.out3(x)
        # concatenate predictions from the three grid resolutions
        return [torch.cat([o1c, o2c, o3c], dim=1),
                torch.cat([o1l, o2l, o3l], dim=1)]

Now we want to connect this custom head to the resnet34 backbone, and we have a handy class that does this.

class ConvnetBuilder():
    def __init__(self, f, c, is_multi, is_reg, ps=None,
                 xtra_fc=None, xtra_cut=0,
                 custom_head=None, pretrained=True):
        self.f, self.c, self.is_multi, self.is_reg, self.xtra_cut = \
            f, c, is_multi, is_reg, xtra_cut
        xtra_fc = [512]
        ps = [0.25]*len(xtra_fc) + [0.5]
        self.ps, self.xtra_fc = ps, xtra_fc
        cut, self.lr_cut = [8, 6]  # specific to the resnet34 arch
        cut -= xtra_cut
        layers = cut_model(f(pretrained), cut)
        self.nf = num_features(layers)*2
        self.top_model = nn.Sequential(*layers)
        n_fc = len(self.xtra_fc)+1
        self.ps = [self.ps]*n_fc
        fc_layers = [custom_head]
        self.n_fc = len(fc_layers)
        self.fc_model = nn.Sequential(*fc_layers).to(device)
        self.model = nn.Sequential(*(layers+fc_layers)).to(device)

def cut_model(m, cut):
    return list(m.children())[:cut] if cut else [m]

def num_features(m):
    c = children(m)
    if len(c) == 0: return None
    for l in reversed(c):
        if hasattr(l, 'num_features'): return l.num_features
        res = num_features(l)
        if res is not None: return res

def children(m):
    return m if isinstance(m, (list, tuple)) else list(m.children())

Using this ConvnetBuilder class we can combine the custom head and the resnet34 architecture.

k = len(anchor_scales)  # anchor_scales comes from the anchor-box setup in the repo
head_reg4 = SSD_MultiHead(k, -4.)
f_model = models.resnet34
modelss = ConvnetBuilder(f_model, 0, 0, 0, custom_head=head_reg4)

Here k is 9, the number of anchor boxes per grid cell.

We can now access the combined model via the model attribute on modelss.
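A forward pass on a dummy batch makes the two outputs concrete. With 224x224 inputs the backbone produces a 7x7 feature map, which the head reduces to 4x4, 2x2, and 1x1 grids: 21 cells in total, and 21 cells x 9 anchors = 189 predictions. (The 21 in the class dimension below assumes the 20-class Pascal id2cat defined later, plus background.)

x = torch.randn(2, 3, 224, 224).to(device)
clas_preds, bbox_preds = modelss.model(x)
print(clas_preds.shape)  # torch.Size([2, 189, 21]) -- 20 classes + background
print(bbox_preds.shape)  # torch.Size([2, 189, 4])  -- one box per anchor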

The loss function must be able to accept both classifications (classes) and continuous values (bounding boxes) and output a single loss value.

def ssd_loss(pred, targ, print_it=False):
    lcs, lls = 0., 0.
    for b_c, b_bb, bbox, clas in zip(*pred, *targ):
        loc_loss, clas_loss = ssd_1_loss(b_c, b_bb, bbox, clas, print_it)
        lls += loc_loss
        lcs += clas_loss
    if print_it:
        print(f'loc: {lls.data.item()}, clas: {lcs.data.item()}')
    return lls + lcs

def ssd_1_loss(b_c, b_bb, bbox, clas, print_it=False):
    # anchors, anchor_cnr, grid_sizes, hw2corners and map_to_ground_truth
    # come from the anchor-box setup in the repo
    bbox, clas = get_y(bbox, clas)
    a_ic = actn_to_bb(b_bb, anchors)
    overlaps = jaccard(bbox.data, anchor_cnr.data)
    gt_overlap, gt_idx = map_to_ground_truth(overlaps, print_it)
    gt_clas = clas[gt_idx]
    pos = gt_overlap > 0.4
    pos_idx = torch.nonzero(pos)[:, 0]
    gt_clas[~pos] = len(id2cat)   # unmatched anchors get the background class
    gt_bbox = bbox[gt_idx]
    loc_loss = ((a_ic[pos_idx] - gt_bbox[pos_idx]).abs()).mean()
    clas_loss = loss_f(b_c, gt_clas)
    return loc_loss, clas_loss

def one_hot_embedding(labels, num_classes):
    return torch.eye(num_classes)[labels.data.long().cpu()]

class BCE_Loss(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.num_classes = num_classes

    def forward(self, pred, targ):
        t = one_hot_embedding(targ, self.num_classes+1)
        # drop the background column and match the predictions' device
        t = t[:, :-1].contiguous().to(pred.device)
        x = pred[:, :-1]
        w = self.get_weight(x, t)
        return F.binary_cross_entropy_with_logits(
            x, t, w, reduction='sum') / self.num_classes

    def get_weight(self, x, t): return None

loss_f = BCE_Loss(len(id2cat))

def get_y(bbox, clas):
    bbox = bbox.view(-1, 4)/sz
    bb_keep = ((bbox[:, 2] - bbox[:, 0]) > 0).nonzero()[:, 0]
    return bbox[bb_keep], clas[bb_keep]

def actn_to_bb(actn, anchors):
    actn_bbs = torch.tanh(actn)
    actn_centers = (actn_bbs[:, :2]/2 * grid_sizes) + anchors[:, :2]
    actn_hw = (actn_bbs[:, 2:]/2 + 1) * anchors[:, 2:]
    return hw2corners(actn_centers, actn_hw)

def intersect(box_a, box_b):
    max_xy = torch.min(box_a[:, None, 2:], box_b[None, :, 2:])
    min_xy = torch.max(box_a[:, None, :2], box_b[None, :, :2])
    inter = torch.clamp((max_xy - min_xy), min=0)
    return inter[:, :, 0] * inter[:, :, 1]

def box_sz(b):
    return ((b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1]))

def jaccard(box_a, box_b):
    inter = intersect(box_a, box_b)
    union = box_sz(box_a).unsqueeze(1) + box_sz(box_b).unsqueeze(0) - inter
    return inter / union
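As a tiny worked example of jaccard (intersection over union) on two hand-made boxes:

box_a = torch.tensor([[0., 0., 2., 2.]])  # 2x2 box at the origin
box_b = torch.tensor([[1., 1., 3., 3.]])  # 2x2 box shifted by (1, 1)
print(jaccard(box_a, box_b))  # intersection 1, union 4 + 4 - 1 = 7, so tensor([[0.1429]])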

We can test out the loss function on a batch output from our bbox model once we set up our dataset and dataloader.

Here we actually need a custom dataset class to work with these data types.

import numpy as np

class BboxDataset(Dataset):
    """Bbox dataset."""
    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file (string): Path to csv file with bounding boxes.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform.
        """
        self.label = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        self.sz = 224

    def __len__(self):
        return len(self.label)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir,
                                self.label.iloc[idx, 0])
        image = io.imread(img_name)
        sample = image

        # rescale the box coordinates to the resized 224x224 image
        h, w = sample.shape[:2]
        new_h, new_w = (224, 224)
        bb = np.array([float(x) for x in self.label.iloc[idx, 1].split(' ')],
                      dtype=np.float32)
        bb = np.reshape(bb, (int(bb.shape[0]/2), 2))
        bb = bb * [new_h / h, new_w / w]
        bb = bb.flatten()
        # left-pad with zeros to a fixed length: 189 anchors (21 cells * k=9)
        # times 4 coordinates each
        bb = torch.tensor(np.concatenate(
            (np.zeros(189*4 - len(bb)), bb), axis=None), dtype=torch.float32)
        if self.transform:
            sample = self.transform(sample)
        return sample, bb
[Image: mbb.csv]

This custom dataset class deals with bounding boxes, but we want a dataset class that will deal with both classes and bounding boxes.

bb_dataset = BboxDataset(csv_file='data/pascal/tmp/mbb.csv',
                         root_dir='data/pascal/VOCdevkit2/VOC2007/JPEGImages/',
                         transform=transforms.Compose([
                             transforms.ToPILImage(),
                             transforms.Resize((224, 224)),
                             transforms.ToTensor(),
                             transforms.Normalize(
                                 (0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]))
bb_dataloader = DataLoader(bb_dataset, batch_size=16, shuffle=True)
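Pulling one batch confirms the fixed-length padding (756 = 189 anchors x 4 coordinates):

imgs, bbs = next(iter(bb_dataloader))
print(imgs.shape, bbs.shape)  # torch.Size([16, 3, 224, 224]) torch.Size([16, 756])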

Here we can wrap the two label types together so that each image is returned with both its classes and its bounding boxes.

class ConcatLblDataset(Dataset):
    def __init__(self, ds, y2):
        self.ds, self.y2 = ds, y2
        self.sz = ds.sz

    def __len__(self): return len(self.ds)

    def __getitem__(self, i):
        # left-pad the class ids with zeros so every image has 189 labels
        self.y2[i] = np.concatenate(
            (np.zeros(189 - len(self.y2[i])), self.y2[i]), axis=None)
        x, y = self.ds[i]
        return (x, (y, self.y2[i]))

trn_ds2 = ConcatLblDataset(bb_dataset, mcs)

Where mcs is a numpy array of arrays with the classes of each training image.

import json
from pathlib import Path

PATH_pascal = Path('data/pascal')
trn_j = json.load((PATH_pascal / 'pascal_train2007.json').open())
cats = dict((o['id'], o['name']) for o in trn_j['categories'])
# trn_anno and trn_ids come from parsing the annotations in the same json
mc = [[cats[p[1]] for p in trn_anno[o]] for o in trn_ids]
id2cat = list(cats.values())
cat2id = {v: k for k, v in enumerate(id2cat)}
mcs = np.array([np.array([cat2id[p] for p in o]) for o in mc])
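A quick inspection (the exact values depend on the json, so these are illustrative):

print(len(id2cat))  # 20 Pascal VOC classes
print(mcs[0])       # class ids for the first training image, e.g. array([6])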

Now we can test out our custom loss.

sz = 224
bb_dataloader2 = DataLoader(trn_ds2, batch_size=16, shuffle=True)  # assumed: a loader over the concatenated dataset
x, y = next(iter(bb_dataloader2))
batch = modelss.model(x.to(device))
ssd_loss(batch, y, True)

tensor([0.6254])
tensor([0.6821, 0.7257, 0.4922])
tensor([0.9563])
tensor([0.6522, 0.5276, 0.6226])
tensor([0.6811, 0.3338])
tensor([0.7008])
tensor([0.5316, 0.2926])
tensor([0.9422])
tensor([0.5487, 0.7187, 0.3620, 0.1578])
tensor([0.6546, 0.3753, 0.4231, 0.4663, 0.2125, 0.0729])
tensor([0.3756, 0.5085])
tensor([0.2304, 0.1390, 0.0853])
tensor([0.2484])
tensor([0.6419])
tensor([0.5954, 0.5375, 0.5552])
tensor([0.2383])
loc: 1.844399333000183, clas: 79.79206085205078

tensor(81.6365, grad_fn=<AddBackward0>)

Now to train the SSD model.

beta1 = 0.5
optimizer = optim.Adam(modelss.model.parameters(), lr=1e-3, betas=(beta1, 0.99))
# Decay LR by a factor of *gamma* every *step_size* epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

We can use essentially the same train_model() function as before, but this time we pass the bounding boxes and classes together as the target and compute the loss with ssd_loss().
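The only part worth sketching is the inner training step, since the targets are now a tuple; this assumes the loader over trn_ds2 built above, which yields (image, (bbox, clas)) pairs:

for inputs, (bbs, clas) in bb_dataloader2:
    inputs = inputs.to(device)
    bbs, clas = bbs.to(device), clas.to(device)
    optimizer.zero_grad()
    preds = modelss.model(inputs)        # [class_preds, bbox_preds]
    loss = ssd_loss(preds, (bbs, clas))  # single combined loss value
    loss.backward()
    optimizer.step()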

Now we have both of our models trained on our new training datasets and we are ready to use them for inference on our truck simulator game.

I encourage you to check out this GitHub repo to see the full implementation, where you can train the models, record training data, and test the implementation out on the video game.

Have fun!