2.3. KMeans with Reduce
KMeans is machine-learning algorithm (NP-hard), popularly employed for cluster analysis in data mining, and interesting for benchmarking and performance evaluation.
The objective of the Kmeans algorithm to group a set of multidimensional points into a predefined number of clusters, in which each point belongs to the closest cluster (with the nearest mean distance), in an iterative process.
[1]:
import pycompss.interactive as ipycompss
[2]:
import os
if 'BINDER_SERVICE_HOST' in os.environ:
ipycompss.start(graph=True, # trace=True
project_xml='../xml/project.xml',
resources_xml='../xml/resources.xml')
else:
ipycompss.start(graph=True, monitor=1000) # trace=True
********************************************************
**************** PyCOMPSs Interactive ******************
********************************************************
* .-~~-.--. ______ ______ *
* : ) |____ \ |____ \ *
* .~ ~ -.\ /.- ~~ . __) | __) | *
* > `. .' < |__ | |__ | *
* ( .- -. ) ____) | _ ____) | *
* `- -.-~ `- -' ~-.- -' |______/ |_| |______/ *
* ( : ) _ _ .-: *
* ~--. : .--~ .-~ .-~ } *
* ~-.-^-.-~ \_ .~ .-~ .~ *
* \ \ ' \ '_ _ -~ *
* \`.\`. // *
* . - ~ ~-.__\`.\`-.// *
* .-~ . - ~ }~ ~ ~-.~-. *
* .' .-~ .-~ :/~-.~-./: *
* /_~_ _ . - ~ ~-.~-._ *
* ~-.< *
********************************************************
* - Starting COMPSs runtime... *
* - Log path : /home/user/.COMPSs/Interactive_17/
* - PyCOMPSs Runtime started... Have fun! *
********************************************************
[3]:
from pycompss.api.task import task
[4]:
import numpy as np
[5]:
def init_random(numV, dim, seed):
np.random.seed(seed)
c = [np.random.uniform(-3.5, 3.5, dim)]
while len(c) < numV:
p = np.random.uniform(-3.5, 3.5, dim)
distance = [np.linalg.norm(p-i) for i in c]
if min(distance) > 2:
c.append(p)
return c
[6]:
#@task(returns=list) # Not a task for plotting
def genFragment(numV, K, c, dim, mode='gauss'):
if mode == "gauss":
n = int(float(numV) / K)
r = numV % K
data = []
for k in range(K):
s = np.random.uniform(0.05, 0.75)
for i in range(n+r):
d = np.array([np.random.normal(c[k][j], s) for j in range(dim)])
data.append(d)
return np.array(data)[:numV]
else:
return [np.random.random(dim) for _ in range(numV)]
[7]:
@task(returns=dict)
def cluster_points_partial(XP, mu, ind):
dic = {}
for x in enumerate(XP):
bestmukey = min([(i[0], np.linalg.norm(x[1] - mu[i[0]])) for i in enumerate(mu)], key=lambda t: t[1])[0]
if bestmukey not in dic:
dic[bestmukey] = [x[0] + ind]
else:
dic[bestmukey].append(x[0] + ind)
return dic
[8]:
@task(returns=dict)
def partial_sum(XP, clusters, ind):
p = [(i, [(XP[j - ind]) for j in clusters[i]]) for i in clusters]
dic = {}
for i, l in p:
dic[i] = (len(l), np.sum(l, axis=0))
return dic
[9]:
def reduceCenters(a, b):
"""
Reduce method to sum the result of two partial_sum methods
:param a: partial_sum {cluster_ind: (#points_a, sum(points_a))}
:param b: partial_sum {cluster_ind: (#points_b, sum(points_b))}
:return: {cluster_ind: (#points_a+#points_b, sum(points_a+points_b))}
"""
for key in b:
if key not in a:
a[key] = b[key]
else:
a[key] = (a[key][0] + b[key][0], a[key][1] + b[key][1])
return a
[10]:
@task(returns=dict)
def reduceCentersTask(*data):
reduce_value = data[0]
for i in range(1, len(data)):
reduce_value = reduceCenters(reduce_value, data[i])
return reduce_value
[11]:
def mergeReduce(function, data, chunk=50):
""" Apply function cumulatively to the items of data,
from left to right in binary tree structure, so as to
reduce the data to a single value.
:param function: function to apply to reduce data
:param data: List of items to be reduced
:return: result of reduce the data to a single value
"""
while(len(data)) > 1:
dataToReduce = data[:chunk]
data = data[chunk:]
data.append(function(*dataToReduce))
return data[0]
[12]:
def has_converged(mu, oldmu, epsilon, iter, maxIterations):
print("iter: " + str(iter))
print("maxIterations: " + str(maxIterations))
if oldmu != []:
if iter < maxIterations:
aux = [np.linalg.norm(oldmu[i] - mu[i]) for i in range(len(mu))]
distancia = sum(aux)
if distancia < epsilon * epsilon:
print("Distance_T: " + str(distancia))
return True
else:
print("Distance_F: " + str(distancia))
return False
else:
# Reached the max amount of iterations
return True
[13]:
def plotKMEANS(dim, mu, clusters, data):
import pylab as plt
colors = ['b','g','r','c','m','y','k']
if dim == 2 and len(mu) <= len(colors):
from matplotlib.patches import Circle
from matplotlib.collections import PatchCollection
fig, ax = plt.subplots(figsize=(10,10))
patches = []
pcolors = []
for i in range(len(clusters)):
for key in clusters[i].keys():
d = clusters[i][key]
for j in d:
j = j - i * len(data[0])
C = Circle((data[i][j][0], data[i][j][1]), .05)
pcolors.append(colors[key])
patches.append(C)
collection = PatchCollection(patches)
collection.set_facecolor(pcolors)
ax.add_collection(collection)
x, y = zip(*mu)
plt.plot(x, y, '*', c='y', markersize=20)
plt.autoscale(enable=True, axis='both', tight=False)
plt.show()
elif dim == 3 and len(mu) <= len(colors):
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
for i in range(len(clusters)):
for key in clusters[i].keys():
d = clusters[i][key]
for j in d:
j = j - i * len(data[0])
ax.scatter(data[i][j][0], data[i][j][1], data[i][j][2], 'o', c=colors[key])
x, y, z = zip(*mu)
for i in range(len(mu)):
ax.scatter(x[i], y[i], z[i], s=80, c='y', marker='D')
plt.show()
else:
print("No representable dim or not enough colours")
MAIN
Parameters (that can be configured in the following cell): * numV: number of vectors (default: 10.000)
* dim: dimension of the points (default: 2) * k: number of centers (default: 4) * numFrag: number of fragments (default: 16) * epsilon: convergence condition (default: 1e-10) * maxIterations: Maximum number of iterations (default: 20)
[14]:
%matplotlib inline
import ipywidgets as widgets
from pycompss.api.api import compss_wait_on
w_numV = widgets.IntText(value=10000) # Number of Vectors - with 1000 it is feasible to see the evolution across iterations
w_dim = widgets.IntText(value=2) # Number of Dimensions
w_k = widgets.IntText(value=4) # Centers
w_numFrag = widgets.IntText(value=16) # Fragments
w_epsilon = widgets.FloatText(value=1e-10) # Convergence condition
w_maxIterations = widgets.IntText(value=20) # Max number of iterations
w_seed = widgets.IntText(value=8) # Random seed
def kmeans(numV, dim, k, numFrag, epsilon, maxIterations, seed):
size = int(numV / numFrag)
cloudCenters = init_random(k, dim, seed) # centers to create data groups
X = [genFragment(size, k, cloudCenters, dim, mode='gauss') for _ in range(numFrag)]
mu = init_random(k, dim, seed - 1) # First centers
oldmu = []
n = 0
while not has_converged(mu, oldmu, epsilon, n, maxIterations):
oldmu = mu
clusters = [cluster_points_partial(X[f], mu, f * size) for f in range(numFrag)]
partialResult = [partial_sum(X[f], clusters[f], f * size) for f in range(numFrag)]
mu = mergeReduce(reduceCentersTask, partialResult, chunk=4)
mu = compss_wait_on(mu)
mu = [mu[c][1] / mu[c][0] for c in mu]
while len(mu) < k:
# Add new random center if one of the centers has no points.
indP = np.random.randint(0, size)
indF = np.random.randint(0, numFrag)
mu.append(X[indF][indP])
n += 1
clusters = compss_wait_on(clusters)
plotKMEANS(dim, mu, clusters, X)
print("--------------------")
print("Result:")
print("Iterations: ", n)
print("Centers: ", mu)
print("--------------------")
widgets.interact_manual(kmeans, numV=w_numV, dim=w_dim, k=w_k, numFrag=w_numFrag, epsilon=w_epsilon, maxIterations=w_maxIterations, seed=w_seed)
[14]:
<function __main__.kmeans(numV, dim, k, numFrag, epsilon, maxIterations, seed)>
[15]:
ipycompss.stop()
********************************************************
***************** STOPPING PyCOMPSs ********************
********************************************************
Checking if any issue happened.
Warning: some of the variables used with PyCOMPSs may
have not been brought to the master.
********************************************************
[16]:
ipycompss.complete_task_graph(fit=True)