In [0]:
import cv2
import os
import re
import random
import math
import numpy as np
from skimage import segmentation, color
from matplotlib import pyplot as plt
from sklearn.datasets import make_spd_matrix
from scipy import stats

### Generic Helper Functions

In [0]:
#Access training images in dataset (stored by individual)
def read_imgs(directory, binary=False):
    dataset = []
    #Build path to dataset (outer directory should be in same directory as code)
    dataset_path = os.path.join(os.getcwd(), 'flower_segmentation', directory)
    #For each file in the subdirectory
    for img_file in os.listdir(dataset_path):
        img_num = int(img_file.split('_')[1][0:5])
        img_path = os.path.join(dataset_path, img_file)
        #ignore non-jpegs (all images stored as jpeg)
        if not img_file.endswith('.jpg') and not img_file.endswith('.png'): continue
        img_path = os.path.join(dataset_path, img_file)
        #Read each image
        if not binary:
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        else:
            img = cv2.imread(img_path, 0)
        #Add to dataset
        dataset.append((img_num, img))
    #Return Numpy array of images
    return np.array(dataset)

In [0]:
#Helper function to store the predicted image masks
#Stores the images in ./flower_segmentation/{directory}
def write_imgs(directory, images):
    cwd = os.getcwd()
    directory_path = os.path.join(cwd, 'flower_segmentation', directory)
    os.mkdir(directory_path)
    os.chdir(directory_path)
    for (name, img) in images:
        #maintain image file name format as provided
        file_name = "image_00" if name > 100 else "image_000"
        file_name = file_name + str(name) + ".png"
        cv2.imwrite(file_name, img)
    os.chdir(cwd)

In [0]:
# [TP, FP, FN, TN]
# Regard Positive as 255 (white), Negative as 0 (black)
def confusion_matrix(truth, predicted, dictionary=False):
    confusion = np.zeros(4)
    truth = truth[1].flatten()
    predicted = predicted[1].flatten()
    if len(truth) != len(predicted): return None
    for pixel in zip(truth, predicted):
        #TN
        if pixel == (0,0):
            confusion[3] = confusion[3] + 1
        #FP
        elif pixel == (0,255):
            confusion[1] = confusion[1] + 1
        #FN
        elif pixel == (255,0):
            confusion[2] = confusion[2] + 1
        #TP
        elif pixel == (255,255):
            confusion[0] = confusion[0] + 1
    #Option to return as dict not an array
    if dictionary: confusion = {"TP": confusion[0], "FP": confusion[1], "FN": confusion[2], "TN": confusion[3]}
    return confusion

#Computes the confusion matrix for all provided truths/predictions
#Returns single aggregate matrix
def confusion_matrix_multi(truths, predictions, dictionary=False):
    if len(truths) != len(predictions): return None
    #Sort by the number associated with each image to ensure correct matching
    truths = sorted(truths, key=lambda truth: truth[0])
    predictions = sorted(predictions, key=lambda prediction: prediction[0])
    #Sum individual confusion matrices together
    confusions = np.sum([confusion_matrix(truths[i], predictions[i]) for i in range(len(truths))], axis=0)
    #Option to return as dict not an array
    if dictionary: confusions = {"TP": confusions[0], "FP": confusions[1], "FN": confusions[2], "TN": confusions[3]}
    return confusions

In [0]:
#Dice score for individual confusion matrix
def DICE(confusion):
    return (2*confusion[0])/(2*confusion[0] + confusion[1] + confusion[2])

#Returns avg DICE score and stddev DICE score for multiple truth/prediction images
def DICE_stats(truths, predictions):
    truths = sorted(truths, key=lambda truth: truth[0])
    predictions = sorted(predictions, key=lambda prediction: prediction[0])
    confusions = [confusion_matrix(truths[i], predictions[i]) for i in range(len(truths))]
    dice = [DICE(confusion) for confusion in confusions]
    return(np.sum(dice)/len(dice), np.std(dice))

#Returns DICE score of aggregated (single) confusion matrix
def DICE_raw(truths, predictions):
    confusion = confusion_matrix_multi(truths, predictions)
    return DICE(confusion)

In [0]:
#Helper function to determine a label's avg. distance from the center of an image
#Ran into issues with flower being colored black & background white so I use this as a heuristic for which label is flower
#Assume the flower labels will be near the center of the image
def avg_dist_to_center(img, label, dictionary=False, shape=None):
    total_dist_row = 0
    total_dist_col = 0
    total = 1
    #This is used for the output of GMM labelling with is (row, col): value
    if dictionary:
        row_center = shape[0]//2
        col_center = shape[1]//2
        for ((row, col), value) in img.items():
            if value == label:
                total_dist_row = total_dist_row + (row_center - row) if row < row_center else total_dist_row + (row - row_center)
                total_dist_col = total_dist_col + (col_center - col) if col < col_center else total_dist_col + (col - col_center)
                total = total + 1
    #This is used for the output of the cut segmentation which is 2D array (imgage shape) of labels
    else:
        row_center = img.shape[0]//2
        col_center = img.shape[1]//2
        for row in range(img.shape[0]):
            for col in range(img.shape[1]):
                if img[row][col] == label:
                    total_dist_row = total_dist_row + (row_center - row) if row < row_center else total_dist_row + (row - row_center)
                    total_dist_col = total_dist_col + (col_center - col) if col < col_center else total_dist_col + (col - col_center)
                    total = total + 1
    #Take avg. distance in row/col dimenstions for the label
    #Compute distance from these averages
    avg_dist_row = total_dist_row / total
    avg_dist_col = total_dist_col / total
    return math.sqrt(avg_dist_row**2 + avg_dist_col**2)

## Preprocessing Images

#### Import raw images & ground-truth masks

All images are stored in pattern: (img_number, image)

In [0]:
images = read_imgs('images')
masks = sorted(read_imgs('segmentation', binary=True), key=lambda img: img[0])

#### Preprocess Images with Bilateral Filter
Smooths the image but maintains edge-like features...

In [0]:
pp_images = [(name, cv2.bilateralFilter(img,50,50,10)) for (name, img) in images]

## GMM Segmentation

GMM code was mostly recycled from group member Durham Abric's (260659760) Assignment 3 for this semester's ECSE415 class...

We first tried using the Scikit-Learn GaussianMixture impelementation, but it was too slow and consumed too many resources (including memory overflow error for training on 20+ images) to be feasible for this project.  We expect it would have improved performance, but without additional compute resources it was impossible.

#### GMM-Specific Helper Functions/Implementation

In [0]:
def calculate_EM_params(data, iters, k):
    #Store for repeated use later
    data_size = len(data)
    
    #Initalize weights, means and covariance of GMM model
    weights = np.ones(k) / k
    means = np.array([random.choice(data) for i in range(k)])
    covariance = np.array([make_spd_matrix(3) for i in range(k)])
    
    #save parameters after each iteration
    iter_params = []
    
    #Perform  iterations (EM Maximization)
    for step in range(iters):

        #Expectation (E) Step
        likelihoods = []
        for i in range(k):
            tmp_likelihood = stats.multivariate_normal.pdf(x=data, mean=means[i], cov=covariance[i])
            likelihoods.append(tmp_likelihood)
        
        #Maximization (M) step
        posteriors = []
        post_sum = np.sum([likelihoods[i] * weights[i] for i in range(k)], axis=0) + 1e-10 #Prevent division by zero
        for i in range(k):
            #calculate posterior probability for cluster
            tmp_posterior = (likelihoods[i] * weights[i]) / post_sum
            posteriors.append(tmp_posterior)
            posterior_sum = (np.sum(posteriors[i]) + 1e-10)

            #update cluster mean
            means[i] = np.sum(posteriors[i].reshape(data_size, 1) * data, axis=0) / posterior_sum
            #update cluster covariance
            covariance[i] = np.dot((posteriors[i].reshape(data_size, 1) * (data - means[i])).T, (data - means[i])) / posterior_sum
            #update cluster weights
            weights[i] = np.mean(posteriors[i])
        
        iter_params.append((means.copy(), covariance.copy(), weights.copy()))
    return iter_params

In [0]:
#Convers list of labels to dict (key: (x, y) of pixel, value: pixel cluster label)
def label_pixels(predictions, shape):
    pixel_labels = {}
    for idx in range(len(predictions)):
        row = idx // shape[1]
        col = idx % shape[1]
        pixel_labels[(row, col)] = predictions[idx]
    return pixel_labels

def classify_EM(img, iters, k, all=False):
    #Compute useful data metrics
    data = img.reshape(img.shape[0] * img.shape[1], img.shape[2])
    #Parameters for gaussian of each cluster after each iteration
    params = calculate_EM_params(data, iters, k)

    #If all - classify each pixel for all iterations and return labels
    #Else only classify for final parameters
    if(all):
        likelihoods = [[stats.multivariate_normal.pdf(x=data, mean=means[i], cov=covariance[i]) for i in range(k)] for (means, covariance, weights) in params]
        predictions = [np.argmax(likelihood, axis=0) for likelihood in likelihoods]
        labels = [label_pixels(pred, img.shape) for pred in predictions]
    else:
        likelihoods = [stats.multivariate_normal.pdf(x=data, mean=params[-1][0][i], cov=params[-1][1][i]) for i in range(k)]
        predictions = np.argmax(likelihoods, axis=0)
        labels = label_pixels(predictions, img.shape)
    return labels

In [0]:
#Build image (numpy array) with distinct colors for each cluster
#Input: cluster dict = {Cluster #: [(i,j) of each pixel]}, img_shape = shape of original image
#Output: Numpy array (image)
def build_clustered_img(pixel_labels, img_shape, cluster_colors):
    #Initialize image to black
    output = np.zeros(img_shape, dtype=np.uint8)
    for i in range(img_shape[0]):
        for j in range(img_shape[1]):
            output[i][j] = cluster_colors[pixel_labels[(i,j)]]
    return output

In [0]:
#Reuses the above helper functions, but implements specifically for 2 components & black/white output
def binary_gmm_segmentation(imgs, iters):
    output = []
    for (name, img) in imgs:
        labels = classify_EM(img, iters, 2)
        #Color flower (central) white, background black
        avg_dist_0 = avg_dist_to_center(labels, 0, dictionary=True, shape = img.shape)
        avg_dist_1 = avg_dist_to_center(labels, 1, dictionary=True, shape = img.shape)
        colors = {0:255, 1:0} if avg_dist_0 < avg_dist_1 else {0:0, 1:255}
        #Call above helper function
        output_img = build_clustered_img(labels, img.shape[0:2], colors)
        output.append((name, output_img))
    return output

#### Calculate Predictions, Cofusion Matrix & DICE

In [0]:
predictions_gmm = sorted(binary_gmm_segmentation(pp_images, 10), key=lambda pred: pred[0])
confusion_gmm = confusion_matrix_multi(masks, predictions_gmm, dictionary=True)
dice_stats_gmm = DICE_stats(masks, predictions_gmm)

In [0]:
print("Confusion Matrix (GMM): {}".format(confusion_gmm))
print("Avg. DICE Score: {}\t Std. Dev. DICE Score: {}".format(dice_stats_gmm[0], dice_stats_gmm[1]))

Confusion Matrix (GMM): [ 6550122.  4567536.  5859391. 16445685.]
Avg. DICE Score: 0.46872660850403186	 Std. Dev. DICE Score: 0.29509148194814666


#### Check Performance Across 5 Validation Sets

In [0]:
validation_gmm = []
for set_num in range(0, 5):
    val_gmm = predictions_gmm[set_num*20:(set_num+1)*20]
    val_dice_gmm = DICE_stats(masks[set_num*20:(set_num+1)*20], val_gmm)
    validation_gmm.append(val_dice_gmm)

In [0]:
for val_num in range(len(validation_gmm)):
    val_score = validation_gmm[val_num]
    print("Validation Set #{}: Avg. DICE Score: {}\t Std. Dev. DICE Score: {}".format(val_num, val_score[0], val_score[1]))

Validation Set #0: Avg. DICE Score: 0.4296743799699425	 Std. Dev. DICE Score: 0.2622713724234282
Validation Set #1: Avg. DICE Score: 0.29264872279962856	 Std. Dev. DICE Score: 0.23524657238606705
Validation Set #2: Avg. DICE Score: 0.4888110176165055	 Std. Dev. DICE Score: 0.27707248514273003
Validation Set #3: Avg. DICE Score: 0.7539163216089377	 Std. Dev. DICE Score: 0.29707778206058977
Validation Set #4: Avg. DICE Score: 0.3785826005251449	 Std. Dev. DICE Score: 0.1544419756734954


#### Store Predicted Masks Using GMM

In [0]:
write_imgs('gmm_segmentation', predictions_gmm)

## Normalized Graph-Cut Segmentation

#### Normalized-Graph Cut Implementation

Utilizes the Scikit-Image implementation. Again, colors central label white (flower) and background black.  Returns list of (img_num, prediction).

In [0]:
def binary_cut_segmentation(imgs, iters, compact):
    output = []
    for (name, img) in imgs:
        tmp_labels = segmentation.slic(img, compactness=compact, n_segments=2, convert2lab=True, enforce_connectivity=True, max_iter=iters)
        avg_dist_0 = avg_dist_to_center(tmp_labels, 0)
        avg_dist_1 = avg_dist_to_center(tmp_labels, 1)
        color_map = ((0, 0, 0), (255,255,255)) if avg_dist_0 > avg_dist_1 else ((255,255,255), (0, 0, 0))
        tmp_segmentation = color.label2rgb(tmp_labels, colors=color_map)
        tmp_segmentation = cv2.cvtColor(tmp_segmentation.astype('uint8'), cv2.COLOR_BGR2GRAY)
        output.append((name, tmp_segmentation))
    return output

#### Calculate Predictions, Cofusion Matrix & DICE

In [0]:
predictions_cut = sorted(binary_cut_segmentation(pp_images, 10, 1), key=lambda pred: pred[0])
confusion_cut = confusion_matrix_multi(masks, predictions_cut, dictionary=True)
dice_stats_cut = DICE_stats(masks, predictions_cut)

In [0]:
print("Confusion Matrix (Cut): {}".format(confusion_cut))
print("Avg. DICE Score: {}\t Std. Dev. DICE Score: {}".format(dice_stats_cut[0], dice_stats_cut[1]))

Confusion Matrix (Cut): {'TP': 7589831.0, 'FP': 2312724.0, 'FN': 4819682.0, 'TN': 18700497.0}
Avg. DICE Score: 0.5156134653365576	 Std. Dev. DICE Score: 0.42322102157752367


#### Check Performance Across 5 Validation Sets

In [0]:
validation_cut = []
for set_num in range(0, 5):
    val_cut = predictions_cut[set_num*20:(set_num+1)*20]
    val_dice_cut = DICE_stats(masks[set_num*20:(set_num+1)*20], val_cut)
    validation_cut.append(val_dice_cut)

In [0]:
for val_num in range(len(validation_cut)):
    val_score = validation_cut[val_num]
    print("Validation Set #{}: Avg. DICE Score: {}\t Std. Dev. DICE Score: {}".format(val_num, val_score[0], val_score[1]))

Validation Set #0: Avg. DICE Score: 0.4055996399324707	 Std. Dev. DICE Score: 0.41473224016260446
Validation Set #1: Avg. DICE Score: 0.4723786405079842	 Std. Dev. DICE Score: 0.4138836503975406
Validation Set #2: Avg. DICE Score: 0.491684388922151	 Std. Dev. DICE Score: 0.39266630342027287
Validation Set #3: Avg. DICE Score: 0.6938693818229743	 Std. Dev. DICE Score: 0.40780273595249983
Validation Set #4: Avg. DICE Score: 0.5145352754972071	 Std. Dev. DICE Score: 0.43065825700322424


#### Store Predicted Masks Using GMM

In [0]:
write_imgs('cut_segmentation', predictions_cut)