2.3. KMeans with Reduce

KMeans is machine-learning algorithm (NP-hard), popularly employed for cluster analysis in data mining, and interesting for benchmarking and performance evaluation.

The objective of the Kmeans algorithm to group a set of multidimensional points into a predefined number of clusters, in which each point belongs to the closest cluster (with the nearest mean distance), in an iterative process.

[1]:
import pycompss.interactive as ipycompss
[2]:
import os
if 'BINDER_SERVICE_HOST' in os.environ:
    ipycompss.start(graph=True,                # trace=True
                    project_xml='../xml/project.xml',
                    resources_xml='../xml/resources.xml')
else:
    ipycompss.start(graph=True, monitor=1000)  # trace=True
******************************************************
*************** PyCOMPSs Interactive *****************
******************************************************
*          .-~~-.--.           _____       ________  *
*         :         )         |____ \     |____   /  *
*   .~ ~ -.\       /.- ~~ .     ___) |        /  /   *
*   >       `.   .'       <    / ___/        /  /    *
*  (         .- -.         )  | |___   _    /  /     *
*   `- -.-~  `- -'  ~-.- -'   |_____| |_|  /__/      *
*     (        :        )           _ _ .-:          *
*      ~--.    :    .--~        .-~  .-~  }          *
*          ~-.-^-.-~ \_      .~  .-~   .~            *
*                   \ \ '     \ '_ _ -~              *
*                    \`.\`.    //                    *
*           . - ~ ~-.__\`.\`-.//                     *
*       .-~   . - ~  }~ ~ ~-.~-.                     *
*     .' .-~      .-~       :/~-.~-./:               *
*    /_~_ _ . - ~                 ~-.~-._            *
*                                     ~-.<           *
******************************************************
* - Starting COMPSs runtime...                       *
* - Log path : /home/user/.COMPSs/Interactive_01/
* - PyCOMPSs Runtime started... Have fun!            *
******************************************************
[3]:
from pycompss.api.task import task
[4]:
import numpy as np
[5]:
def init_random(numV, dim, seed):
    np.random.seed(seed)
    c = [np.random.uniform(-3.5, 3.5, dim)]
    while len(c) < numV:
        p = np.random.uniform(-3.5, 3.5, dim)
        distance = [np.linalg.norm(p-i) for i in c]
        if min(distance) > 2:
            c.append(p)
    return c
[6]:
#@task(returns=list)  # Not a task for plotting
def genFragment(numV, K, c, dim, mode='gauss'):
    if mode == "gauss":
        n = int(float(numV) / K)
        r = numV % K
        data = []
        for k in range(K):
            s = np.random.uniform(0.05, 0.75)
            for i in range(n+r):
                d = np.array([np.random.normal(c[k][j], s) for j in range(dim)])
                data.append(d)
        return np.array(data)[:numV]
    else:
        return [np.random.random(dim) for _ in range(numV)]
[7]:
@task(returns=dict)
def cluster_points_partial(XP, mu, ind):
    dic = {}
    for x in enumerate(XP):
        bestmukey = min([(i[0], np.linalg.norm(x[1] - mu[i[0]])) for i in enumerate(mu)], key=lambda t: t[1])[0]
        if bestmukey not in dic:
            dic[bestmukey] = [x[0] + ind]
        else:
            dic[bestmukey].append(x[0] + ind)
    return dic
Found task: cluster_points_partial
[8]:
@task(returns=dict)
def partial_sum(XP, clusters, ind):
    p = [(i, [(XP[j - ind]) for j in clusters[i]]) for i in clusters]
    dic = {}
    for i, l in p:
        dic[i] = (len(l), np.sum(l, axis=0))
    return dic
Found task: partial_sum
[9]:
def reduceCenters(a, b):
    """
    Reduce method to sum the result of two partial_sum methods
    :param a: partial_sum {cluster_ind: (#points_a, sum(points_a))}
    :param b: partial_sum {cluster_ind: (#points_b, sum(points_b))}
    :return: {cluster_ind: (#points_a+#points_b, sum(points_a+points_b))}
    """
    for key in b:
        if key not in a:
            a[key] = b[key]
        else:
            a[key] = (a[key][0] + b[key][0], a[key][1] + b[key][1])
    return a
[10]:
@task(returns=dict)
def reduceCentersTask(*data):
    reduce_value = data[0]
    for i in range(1, len(data)):
        reduce_value = reduceCenters(reduce_value, data[i])
    return reduce_value
Found task: reduceCentersTask
[11]:
def mergeReduce(function, data, chunk=50):
    """ Apply function cumulatively to the items of data,
        from left to right in binary tree structure, so as to
        reduce the data to a single value.
    :param function: function to apply to reduce data
    :param data: List of items to be reduced
    :return: result of reduce the data to a single value
    """
    while(len(data)) > 1:
        dataToReduce = data[:chunk]
        data = data[chunk:]
        data.append(function(*dataToReduce))
    return data[0]
[12]:
def has_converged(mu, oldmu, epsilon, iter, maxIterations):
    print("iter: " + str(iter))
    print("maxIterations: " + str(maxIterations))
    if oldmu != []:
        if iter < maxIterations:
            aux = [np.linalg.norm(oldmu[i] - mu[i]) for i in range(len(mu))]
            distancia = sum(aux)
            if distancia < epsilon * epsilon:
                print("Distance_T: " + str(distancia))
                return True
            else:
                print("Distance_F: " + str(distancia))
                return False
        else:
            # Reached the max amount of iterations
            return True
[13]:
def plotKMEANS(dim, mu, clusters, data):
    import pylab as plt
    colors = ['b','g','r','c','m','y','k']
    if dim == 2 and len(mu) <= len(colors):
        from matplotlib.patches import Circle
        from matplotlib.collections import PatchCollection
        fig, ax = plt.subplots(figsize=(10,10))
        patches = []
        pcolors = []
        for i in range(len(clusters)):
            for key in clusters[i].keys():
                d = clusters[i][key]
                for j in d:
                    j = j - i * len(data[0])
                    C = Circle((data[i][j][0], data[i][j][1]), .05)
                    pcolors.append(colors[key])
                    patches.append(C)
        collection = PatchCollection(patches)
        collection.set_facecolor(pcolors)
        ax.add_collection(collection)
        x, y = zip(*mu)
        plt.plot(x, y, '*', c='y', markersize=20)
        plt.autoscale(enable=True, axis='both', tight=False)
        plt.show()
    elif dim == 3 and len(mu) <= len(colors):
        from mpl_toolkits.mplot3d import Axes3D
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')
        for i in range(len(clusters)):
            for key in clusters[i].keys():
                d = clusters[i][key]
                for j in d:
                    j = j - i * len(data[0])
                    ax.scatter(data[i][j][0], data[i][j][1], data[i][j][2], 'o', c=colors[key])
        x, y, z = zip(*mu)
        for i in range(len(mu)):
            ax.scatter(x[i], y[i], z[i], s=80, c='y', marker='D')
        plt.show()
    else:
        print("No representable dim or not enough colours")

MAIN

Parameters (that can be configured in the following cell): * numV: number of vectors (default: 10.000)
* dim: dimension of the points (default: 2) * k: number of centers (default: 4) * numFrag: number of fragments (default: 16) * epsilon: convergence condition (default: 1e-10) * maxIterations: Maximum number of iterations (default: 20)
[14]:
%matplotlib inline
import ipywidgets as widgets
from pycompss.api.api import compss_wait_on

w_numV = widgets.IntText(value=10000)        # Number of Vectors - with 1000 it is feasible to see the evolution across iterations
w_dim = widgets.IntText(value=2)             # Number of Dimensions
w_k = widgets.IntText(value=4)               # Centers
w_numFrag = widgets.IntText(value=16)        # Fragments
w_epsilon = widgets.FloatText(value=1e-10)   # Convergence condition
w_maxIterations = widgets.IntText(value=20)  # Max number of iterations
w_seed = widgets.IntText(value=8)            # Random seed

def kmeans(numV, dim, k, numFrag, epsilon, maxIterations, seed):
    size = int(numV / numFrag)
    cloudCenters = init_random(k, dim, seed) # centers to create data groups
    X = [genFragment(size, k, cloudCenters, dim, mode='gauss') for _ in range(numFrag)]
    mu = init_random(k, dim, seed - 1)       # First centers
    oldmu = []
    n = 0
    while not has_converged(mu, oldmu, epsilon, n, maxIterations):
        oldmu = mu
        clusters = [cluster_points_partial(X[f], mu, f * size) for f in range(numFrag)]
        partialResult = [partial_sum(X[f], clusters[f], f * size) for f in range(numFrag)]
        mu = mergeReduce(reduceCentersTask, partialResult, chunk=4)
        mu = compss_wait_on(mu)
        mu = [mu[c][1] / mu[c][0] for c in mu]
        while len(mu) < k:
            # Add new random center if one of the centers has no points.
            indP = np.random.randint(0, size)
            indF = np.random.randint(0, numFrag)
            mu.append(X[indF][indP])
        n += 1
    clusters = compss_wait_on(clusters)
    plotKMEANS(dim, mu, clusters, X)
    print("--------------------")
    print("Result:")
    print("Iterations: ", n)
    print("Centers: ", mu)
    print("--------------------")

widgets.interact_manual(kmeans, numV=w_numV, dim=w_dim, k=w_k, numFrag=w_numFrag, epsilon=w_epsilon, maxIterations=w_maxIterations, seed=w_seed)
[14]:
<function __main__.kmeans>
[15]:
ipycompss.stop()
****************************************************
*************** STOPPING PyCOMPSs ******************
****************************************************
Warning: some of the variables used with PyCOMPSs may
         have not been brought to the master.
****************************************************