import numpy as np
import traitlets
import vaex.ml.transformations
import logging
import vaex
import vaex.serialize
from numba import jit
from . import generate
# vaex.set_log_level_debug()
logger_km = logging.getLogger('vaex.ml.kmeans')
def Matrix(type=traitlets.CFloat):
return traitlets.List(traitlets.List(type()))
# @jit('void(double[:], double[:], double[:])', nopython=True, nogil=True)
@jit(nopython=True, nogil=True, cache=True)
def distances_square(result, centroids, *blocks):
"""
Function under test.
"""
k = centroids.shape[0]
dimensions = centroids.shape[1]
N = result.shape[1]
for i in range(k):
for d in range(dimensions):
for j in range(N):
result[i, j] += (blocks[d][j] - centroids[i][d])**2
@jit(nopython=True, nogil=True, cache=True)
def centroid_stats(centroids, counts, sumpos, inertia, *blocks):
# classes = np.argmin(distances_sq, axis=0)
# distances_sq = np.choose(classes, distances_sq)
# inertia = (distances_sq).sum()
# inertia = 0
N = blocks[0].shape[0]
runs = sumpos.shape[0]
clusters = sumpos.shape[1]
dimensions = sumpos.shape[2]
for j in range(N):
for run in range(runs):
# if done[run]: ## TODO: bug in numba? seems to crash if included
best_distance = 1e100
best_class = 10000
for cluster in range(clusters):
distance = 0
for d in range(dimensions):
distance += (centroids[run, cluster, d] - blocks[d][j])**2
if distance < best_distance:
best_class = cluster
best_distance = distance
cls = best_class # classes[j]
inertia[run] += best_distance
for d in range(dimensions):
sumpos[run, cls, d] += blocks[d][j]
counts[run, cls] += 1
# return inertia
# for i in range(k):
[docs]@vaex.serialize.register
@generate.register
class KMeans(vaex.ml.transformations.Transformer):
'''The KMeans clustering algorithm.
Example:
>>> import vaex.ml
>>> import vaex.ml.cluster
>>> df = vaex.datasets.iris()
>>> features = ['sepal_width', 'petal_length', 'sepal_length', 'petal_width']
>>> cls = vaex.ml.cluster.KMeans(n_clusters=3, features=features, init='random', max_iter=10)
>>> cls.fit(df)
>>> df = cls.transform(df)
>>> df.head(5)
# sepal_width petal_length sepal_length petal_width class_ prediction_kmeans
0 3 4.2 5.9 1.5 1 2
1 3 4.6 6.1 1.4 1 2
2 2.9 4.6 6.6 1.3 1 2
3 3.3 5.7 6.7 2.1 2 0
4 4.2 1.4 5.5 0.2 0 1
'''
snake_name = 'kmeans'
features = traitlets.List(traitlets.Unicode(), help='List of features to cluster.')
n_clusters = traitlets.CInt(default_value=2, help='Number of clusters to form.')
init = traitlets.Union([Matrix(), traitlets.Unicode()], default_value='random', help='Method for initializing the centroids.')
n_init = traitlets.CInt(default_value=1, help='Number of centroid initializations. \
The KMeans algorithm will be run for each initialization, \
and the final results will be the best output of the n_init \
consecutive runs in terms of inertia.')
max_iter = traitlets.CInt(default_value=300, help='Maximum number of iterations of the KMeans algorithm for a single run.')
random_state = traitlets.CInt(default_value=None, allow_none=True, help='Random number generation for centroid initialization. If an int is specified, the randomness becomes deterministic.')
verbose = traitlets.CBool(default_value=False, help='If True, enable verbosity mode.')
cluster_centers = traitlets.List(traitlets.List(traitlets.CFloat()), help='Coordinates of cluster centers.')
inertia = traitlets.CFloat(default_value=None, allow_none=True, help='Sum of squared distances of samples to their closest cluster center.')
prediction_label = traitlets.Unicode(default_value='prediction_kmeans', help='The name of the virtual column that houses the cluster labels for each point.')
def __call__(self, *blocks):
return self._calculate_classes(*blocks)
def _calculate_distances_squared(self, *blocks):
N = len(blocks[0]) # they are all the same length
blocks = [np.asarray(k) for k in blocks]
centroids = np.array(self.cluster_centers)
k = centroids.shape[0]
dimensions = centroids.shape[1]
distances_sq = np.zeros((k, N))
if True:
distances_square(distances_sq, centroids, *blocks)
else:
for d in range(dimensions):
for i in range(k):
distances_sq[i] += (blocks[d] - centroids[i][d])**2
return distances_sq
def _calculate_classes(self, *blocks):
distances_sq = self._calculate_distances_squared(*blocks)
classes = np.argmin(distances_sq, axis=0)
return classes
def generate_cluster_centers_random(self, dataframe, rng):
indices = rng.randint(0, len(dataframe), self.n_clusters)
return [[dataframe.evaluate(feature, i1=i, i2=i+1, array_type='python')[0] for feature in self.features] for i in indices]
[docs] def fit(self, dataframe):
'''
Fit the KMeans model to the dataframe.
:param dataframe: A vaex DataFrame.
'''
if self.init == 'random':
rng = np.random.RandomState(self.random_state)
self.run_cluster_centers = [self.generate_cluster_centers_random(dataframe, rng) for k in range(self.n_init)]
else:
if self.n_init > 1:
print("WARNING: n_init > 1 , but init given, only doing one run")
self.run_cluster_centers = [self.init]
done = [False] * len(self.run_cluster_centers)
alldone = False
first = True
previous_inertias = None
iteration = 0
inertias_list = []
while not alldone:
new_centers, inertias = self._find_centers_and_inertias(dataframe, done)
if self.verbose:
inertia_msges = [('--' if d else '{: 3}'.format(k)) for k, d in zip(inertias, done)]
if len(inertias) == 1:
inertia_msg = inertia_msges[0]
else:
inertia_msg = " | ".join(inertia_msges)
print('Iteration {: 4}, inertia {}'.format(iteration, inertia_msg))
if not first: # we can only to a check after the second iteration
done = self._is_done(previous_inertias, inertias)
alldone = np.all(done)
else:
first = False
iteration += 1
if (iteration >= self.max_iter):
logger_km.debug('reached max iterations: %s', self.max_iter)
alldone = True
previous_inertias = inertias
inertias_list.append(inertias)
self.run_cluster_centers = new_centers
best_run = np.argmin(inertias)
self.inertia = inertias[best_run]
self.inertias = np.array(inertias_list)[:, best_run]
self.cluster_centers = new_centers[best_run]
def _is_done(self, inertias1, inertias2):
diffs = [(inertia1 - inertia2) for inertia1, inertia2 in zip(inertias1, inertias2)]
return [(diff < 1e-3) for diff in diffs]
def _find_centers_and_inertias(self, dataframe, done):
done = np.array(done, dtype=np.int8)
centroids = np.array(self.run_cluster_centers)
runs = centroids.shape[0]
clusters = centroids.shape[1]
dimensions = centroids.shape[2]
# print("k =", k)
assert dimensions == len(self.features), "The number of dimensions for the centroid should equal the number of features."
def map(*blocks): # this will be called with a chunk of the data
sumpos = np.zeros((runs, clusters, dimensions))
counts = np.zeros((runs, clusters))
inertia = np.zeros((runs))
blocks = [np.asarray(k) for k in blocks]
if True:
centroid_stats(centroids, counts, sumpos, inertia, *blocks)
else:
# this is the pure python code
# although not made for multiple runs yet
distances_sq = self._calculate_distances_squared(*blocks)
classes = np.argmin(distances_sq, axis=0)
distances_sq = np.choose(classes, distances_sq)
inertia = (distances_sq).sum()
for i in range(k):
mask = classes == i
counts[i] = np.sum(mask)
for d in range(dimensions):
sumpos[i, d] += blocks[d][mask].sum()
return sumpos, counts, inertia
def reduce(x, y):
sumpos1, counts1, inertia1 = x
sumpos2, counts2, inertia2 = y
return sumpos1 + sumpos2, counts1 + counts2, inertia1 + inertia2
sumpos, counts, inertia = dataframe.map_reduce(map, reduce, self.features)
means = (sumpos/counts[:, :, np.newaxis])
return means.tolist(), inertia