# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory
import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    print(dirname)
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session
import torch
import torch.nn as nn
import torchvision.transforms.functional as F
from torch.utils.data import Dataset,DataLoader
class DoubleConv(nn.Module):
    """Two 3x3 convolutions, each followed by batch norm and ReLU."""
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.conv(x)
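# Quick sanity check (my addition, not in the original notebook): the padded
# 3x3 convolutions in DoubleConv change only the channel count, never the
# spatial size.
_block = DoubleConv(3, 64)
assert _block(torch.randn(1, 3, 32, 32)).shape == (1, 64, 32, 32)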
class UNET(nn.Module):
    def __init__(
        self, in_channels=3, out_channels=1, features=[64, 128, 256, 512],
    ):
        super(UNET, self).__init__()
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Down part of UNET
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Up part of UNET
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(
                    feature*2, feature, kernel_size=2, stride=2,
                )
            )
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]

            # If the input has odd spatial dimensions, the upsampled feature
            # map can be one pixel smaller than the skip connection, so
            # resize before concatenating.
            if x.shape != skip_connection.shape:
                x = F.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)

        return self.final_conv(x)
def test():
    # A batch of 3 single-channel 161x161 images: the odd spatial
    # dimensions exercise the resize branch in forward().
    x = torch.randn((3, 1, 161, 161))
    model = UNET(in_channels=1, out_channels=1)
    preds = model(x)
    print(preds.shape)
    print(x.shape)
    assert preds.shape == x.shape

test()
from PIL import Image
class CarvanaDataset(Dataset):
    def __init__(self, images, image_dir, mask_dir, transform=None, train=True):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.isTrain = train
        self.images = images

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        img_path = os.path.join(self.image_dir, self.images[index])
        mask_path = os.path.join(self.mask_dir, self.images[index].replace(".jpg", "_mask.gif"))
        image = np.array(Image.open(img_path).convert("RGB"))
        mask = np.array(Image.open(mask_path).convert("L"), dtype=np.float32)
        # The GIF masks are 0/255; map them to 0/1 for BCEWithLogitsLoss.
        mask[mask == 255.0] = 1.0
        if self.transform is not None:
            augmentations = self.transform(image=image, mask=mask)
            image = augmentations['image']
            mask = augmentations['mask']
        return {"image": image, "mask": mask}
# Scratch notes for a loader-factory API (get_loaders is not defined in this
# notebook; the training code below builds its DataLoaders directly):
# train_dirs = "./train/"
# train_mask_dirs = "./train_masks/"
# train_transform = None
# loader = get_loaders(train_img_dir, train_mask_dir, batch_size, num_workers, pin_memory)
# for batch_idx, (data, targets) in enumerate(loader):
#     data = data.to(device)
#     targets = targets.float().unsqueeze(1).to(device)
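# A minimal sketch of what such a get_loaders helper could look like, assuming
# it wraps CarvanaDataset (this helper is my assumption; the notebook never
# defines it):
def get_loaders(train_img_dir, train_mask_dir, batch_size, num_workers,
                pin_memory, images=None, transform=None):
    # List the images if the caller did not pass an explicit subset.
    if images is None:
        images = os.listdir(train_img_dir)
    dataset = CarvanaDataset(images, train_img_dir, train_mask_dir, transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True,
                      num_workers=num_workers, pin_memory=pin_memory)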
import zipfile

# Only the training archives are needed here; 'test.zip' is skipped.
dirs = ['train.zip', 'train_masks.zip']
for x in dirs:
    with zipfile.ZipFile("../input/carvana-image-masking-challenge/" + x, 'r') as z:
        z.extractall(".")
!ls
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import torch.optim as optim
LEARNING_RATE = 1e-4
SPLIT=0.2
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 4
EPOCHS = 4
NUM_WORKERS = 4
IMAGE_HEIGHT = 572
IMAGE_WIDTH = 572
PIN_MEMORY = True
DATAPATH = "../input/carvana-image-masking-challenge/"
# These point at a pre-extracted copy of the data (the attached "unet-practice"
# dataset); './train/' and './train_masks/' from the unzip above would work too.
TRAIN_IMG_DIR = '../input/unet-practice/train/'
TRAIN_MASK_DIR = '../input/unet-practice/train_masks/'
images = os.listdir(TRAIN_IMG_DIR)
masks = os.listdir(TRAIN_MASK_DIR)
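# Sanity check (my addition): every training image should have a matching mask.
mask_set = set(masks)
assert all(f.replace(".jpg", "_mask.gif") in mask_set for f in images)
print(len(images), "images,", len(masks), "masks")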
import matplotlib.pyplot as plt

img = np.array(Image.open(os.path.join(TRAIN_IMG_DIR, images[0])).convert("RGB"))
plt.imshow(img)  # cmap is ignored for RGB images
plt.show()
print(img.shape)

msk = np.array(Image.open(os.path.join(TRAIN_MASK_DIR, images[0].replace(".jpg", "_mask.gif"))).convert("L"))
plt.imshow(msk, cmap="gray")
plt.show()
print(msk.shape)
def fit(model, dataloader, data, optimizer, criterion):
    print('-------------Training---------------')
    model.train()
    train_running_loss = 0.0
    counter = 0
    # number of batches
    num_batches = int(len(data) / dataloader.batch_size)
    for i, batch in tqdm(enumerate(dataloader), total=num_batches):
        counter += 1
        image, mask = batch["image"].to(DEVICE), batch["mask"].to(DEVICE)
        optimizer.zero_grad()
        outputs = model(image)
        outputs = outputs.squeeze(1)
        loss = criterion(outputs, mask)
        train_running_loss += loss.item()
        loss.backward()
        optimizer.step()
    train_loss = train_running_loss / counter
    return train_loss
def validate(model, dataloader, data, criterion):
    print("\n--------Validating---------\n")
    model.eval()
    valid_running_loss = 0.0
    counter = 0
    # number of batches
    num_batches = int(len(data) / dataloader.batch_size)
    with torch.no_grad():
        for i, batch in tqdm(enumerate(dataloader), total=num_batches):
            counter += 1
            image, mask = batch["image"].to(DEVICE), batch["mask"].to(DEVICE)
            outputs = model(image)
            outputs = outputs.squeeze(1)
            loss = criterion(outputs, mask)
            valid_running_loss += loss.item()
    valid_loss = valid_running_loss / counter
    return valid_loss
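# The loops above track only BCE loss; segmentation quality is usually also
# reported as a Dice score. A minimal sketch (my addition, not wired into the
# training loop):
def dice_score(logits, mask, eps=1e-8):
    # Thresholding raw logits at 0 is equivalent to sigmoid(x) > 0.5.
    preds = (logits > 0).float()
    intersection = (preds * mask).sum()
    return (2.0 * intersection + eps) / (preds.sum() + mask.sum() + eps)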
train_transform = A.Compose([
A.Resize(IMAGE_HEIGHT,IMAGE_WIDTH),
A.Rotate(limit=35,p=1.0),
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.1),
A.Normalize(
mean=[0.0,0.0,0.0],
std = [1.0,1.0,1.0],
max_pixel_value=255.0
),
ToTensorV2()
])
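# Visual spot-check (my addition): apply the training transform to the sample
# image/mask loaded above and display the augmented image. ToTensorV2 returns
# CHW tensors, so permute back to HWC for matplotlib.
_aug = train_transform(image=img, mask=msk.astype(np.float32))
plt.imshow(_aug["image"].permute(1, 2, 0))
plt.title("Augmented sample")
plt.show()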
validation_transform = A.Compose([
A.Resize(IMAGE_HEIGHT,IMAGE_WIDTH),
A.Normalize(
mean = [0.0,0.0,0.0],
std = [1.0,1.0,1.0],
max_pixel_value=255.0,
),
ToTensorV2()
])
def train_test_split(images, splitSize):
    imageLen = len(images)
    val_len = int(splitSize * imageLen)
    train_len = imageLen - val_len
    train_images, val_images = images[:train_len], images[train_len:]
    return train_images, val_images
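# Note (my addition): os.listdir order is arbitrary, and each car contributes
# 16 views with a shared filename prefix, so a plain head/tail split can put
# views of the same car in both train and validation. Shuffling with a fixed
# seed before splitting would avoid that leakage:
# import random
# random.seed(42)
# random.shuffle(images)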
train_images_path, val_images_path = train_test_split(images, SPLIT)
train_data = CarvanaDataset(train_images_path, TRAIN_IMG_DIR, TRAIN_MASK_DIR, train_transform, True)
valid_data = CarvanaDataset(val_images_path, TRAIN_IMG_DIR, TRAIN_MASK_DIR, validation_transform, True)
# NUM_WORKERS and PIN_MEMORY are defined above, so pass them through.
train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True,
                              num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
valid_dataloader = DataLoader(valid_data, batch_size=BATCH_SIZE, shuffle=False,
                              num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
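# Quick shape check (my addition): pull one batch from the training loader.
batch = next(iter(train_dataloader))
print(batch["image"].shape, batch["mask"].shape)  # expect (4, 3, 572, 572) and (4, 572, 572)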
train_loss = []
val_loss =[]
model = UNET().to(DEVICE)
optimizer = optim.Adam(model.parameters(),lr=LEARNING_RATE)
criterion = nn.BCEWithLogitsLoss()
for epoch in range(EPOCHS):
    print(f"Epoch {epoch+1} of {EPOCHS}")
    train_epoch_loss = fit(model, train_dataloader, train_data, optimizer, criterion)
    val_epoch_loss = validate(model, valid_dataloader, valid_data, criterion)
    train_loss.append(train_epoch_loss)
    val_loss.append(val_epoch_loss)
    print(f"Train Loss: {train_epoch_loss:.4f}")
    print(f"Val Loss: {val_epoch_loss:.4f}")
# loss plots
plt.figure(figsize=(10, 7))
plt.plot(train_loss, color="orange", label='train loss')
plt.plot(val_loss, color="red", label='validation loss')
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
# plt.savefig(f"../input/loss.png")
plt.show()
torch.save({
'epoch': EPOCHS,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': criterion,
}, "./model.pth")
print("\n---------DONE TRAINING----------\n")
checkpoint = torch.load('../input/unet-practice/model.pth')
model = UNET()
model.load_state_dict(checkpoint["model_state_dict"])
model.eval()
model.cuda()
data = train_data[101]
plt.imshow(data['mask'], cmap="gray")
print(train_data[0]['mask'].shape)
# Testing on a single datapoint after training
# plt.imshow(np.transpose(np.array(data['image']), (1, 2, 0)), cmap="gray")
# print(data['image'].shape)
img = data['image'].unsqueeze(0).to(DEVICE)  # add a batch dimension
with torch.no_grad():
    output = model(img)
output = torch.squeeze(output)
# Threshold the raw logits at 0 (equivalent to sigmoid(x) > 0.5).
output[output > 0.0] = 1.0
output[output <= 0.0] = 0.0
# print(torch.max(output))
# print(output.shape)
disp = output.detach().cpu()
plt.imshow(disp, cmap="gray")
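# Optional visualization (my addition): overlay the predicted mask on the
# input image to eyeball the segmentation.
overlay = np.transpose(np.array(data['image']), (1, 2, 0)).copy()
overlay[disp.numpy() > 0.5] = [1.0, 0.0, 0.0]  # paint predicted car pixels red
plt.imshow(overlay)
plt.title("Prediction overlay")
plt.show()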
!pip install gradio
import numpy as np
import gradio as gr
# Quick Gradio smoke test with a simple sepia filter before wiring up the model.
def sepia(input_img):
    sepia_filter = np.array(
        [[0.393, 0.769, 0.189], [0.349, 0.686, 0.168], [0.272, 0.534, 0.131]]
    )
    sepia_img = input_img.dot(sepia_filter.T)
    sepia_img /= sepia_img.max()
    return sepia_img
demo = gr.Interface(sepia, gr.Image(shape=(200, 200)), "image")
demo.launch(share=True)
import cv2
import numpy as np
import albumentations as A

def preprocess(img_filepath):
    image = cv2.imread(img_filepath)
    # cv2 loads BGR, but training used PIL RGB images, so convert.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    test_transform = A.Compose([
        A.Resize(IMAGE_HEIGHT, IMAGE_WIDTH),
        A.Normalize(
            mean=[0.0, 0.0, 0.0],
            std=[1.0, 1.0, 1.0],
            max_pixel_value=255.0,
        ),
        ToTensorV2()
    ])
    aug = test_transform(image=image)
    image = aug['image']
    return image
import torch

def predict(input_image):
    # Add a batch dimension and run on the same device as the model.
    input_batch = input_image.unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        output = model(input_batch)
    output = torch.squeeze(output)
    # Threshold logits at 0 (sigmoid(x) > 0.5).
    output[output > 0.0] = 1.0
    output[output <= 0.0] = 0.0
    return output.detach().cpu()
import gradio as gr
import numpy as np
import matplotlib.pyplot as plt
# from preprocess import preprocess
# from predict import predict

def inference(filepath):
    input_batch = preprocess(filepath)
    result = predict(input_batch)
    plt.imshow(result, cmap="gray")
    plt.title("Segmented Image")
    return plt
title = "Carvana Image Segmentation using PyTorch"
description = "Segmentation of cars from the Carvana dataset"
article = "<p style='text-align: center'><a href='https://www.kaggle.com/' target='_blank'>Kaggle Notebook: Carvana-UNET-PyTorch</a> | <a href='https://github.com/' target='_blank'>Github Repo</a></p>"
examples = [['../input/unet-practice/train/00087a6bd4dc_01.jpg'],
['../input/unet-practice/train/00087a6bd4dc_04.jpg'],
['../input/unet-practice/train/00087a6bd4dc_11.jpg'],
['../input/unet-practice/train/0d1a9caf4350_02.jpg']]
gr.Interface(inference, inputs=gr.inputs.Image(type="filepath"), outputs=gr.outputs.Image('plot'), title=title,
description=description,
article=article,
examples=examples).launch(share=True,debug=False, enable_queue=True)