Source code for jubakit.wrapper.clustering

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals

import numpy as np
from sklearn.base import BaseEstimator, ClusterMixin, TransformerMixin
from ..clustering import Clustering, Config, Dataset


class BaseJubatusClustering(BaseEstimator, ClusterMixin):
    """
    scikit-learn Wrapper for Jubatus Clustering.

    Subclasses must implement ``_launch_clustering`` and, in it, set
    ``self.clustering_`` to a running clustering service; every public
    method of this class operates on that attribute.
    """

    def __init__(self, compressor_method='simple', bucket_size=100,
                 compressed_bucket_size=100, bicriteria_base_size=10,
                 bucket_length=2, forgetting_factor=0.0,
                 forgetting_threshold=0.5, seed=0, embedded=True,
                 distance='euclidean'):
        """
        Creates a base class for Jubatus Clustering

        Every argument is stored unchanged on an attribute of the same
        name.  ``compressor_method`` must be ``'simple'`` or
        ``'compressive'``; any other value raises ``NotImplementedError``.
        """
        self.compressor_method = compressor_method
        self.bucket_size = bucket_size
        self.compressed_bucket_size = compressed_bucket_size
        self.bicriteria_base_size = bicriteria_base_size
        self.bucket_length = bucket_length
        self.forgetting_factor = forgetting_factor
        self.forgetting_threshold = forgetting_threshold
        self.seed = seed
        self.embedded = embedded
        self.distance = distance
        # Derived value: must be rebuilt whenever one of the attributes
        # above changes (see ``set_params``).
        self.compressor_parameter = \
            self._make_compressor_parameter(self.compressor_method)
        self.fitted = False

    def set_params(self, **params):
        """
        Set estimator parameters and rebuild the derived compressor settings.

        ``compressor_parameter`` is computed from the other parameters, so
        without this refresh a call such as ``set_params(bucket_size=10)``
        would leave the previously computed dict in place and the new value
        would never reach the clustering configuration.

        Returns ``self``.
        """
        super(BaseJubatusClustering, self).set_params(**params)
        self.compressor_parameter = \
            self._make_compressor_parameter(self.compressor_method)
        return self

    def _launch_clustering(self):
        """
        Launch Jubatus Clustering

        Abstract hook; this base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()

    def _make_compressor_parameter(self, compressor_method):
        """
        Build the compressor parameter dict for ``compressor_method``.

        ``'simple'`` uses only ``bucket_size``; ``'compressive'`` also
        includes the compressed-bucket, bicriteria, bucket-length,
        forgetting and seed settings.  Any other method name raises
        ``NotImplementedError``.
        """
        if compressor_method == 'simple':
            return {
                'bucket_size': self.bucket_size,
            }
        elif compressor_method == 'compressive':
            return {
                'bucket_size': self.bucket_size,
                'compressed_bucket_size': self.compressed_bucket_size,
                'bicriteria_base_size': self.bicriteria_base_size,
                'bucket_length': self.bucket_length,
                'forgetting_factor': self.forgetting_factor,
                'forgetting_threshold': self.forgetting_threshold,
                'seed': self.seed
            }
        else:
            raise NotImplementedError(
                "unsupported compressor_method: {0!r}".format(
                    compressor_method))

    def fit_predict(self, X, y=None):
        """
        Construct clustering model and
        Predict the closest cluster each sample in X belongs to.

        Returns a list with one entry per sample of ``X``: the index of
        the cluster whose core members contain that sample, or the string
        ``'None'`` when the sample is in no returned cluster.  ``y`` is
        ignored.
        """
        # Ids are the row positions so that core members can be mapped
        # back to their index in X below.
        ids = list(range(len(X)))
        dataset = Dataset.from_data(X, ids=ids)
        self._launch_clustering()
        self.clustering_.clear()
        # The results of push() are not needed, but it has to be iterated
        # to completion (presumably it is lazy -- confirm against
        # Clustering.push).
        for _ in self.clustering_.push(dataset):
            pass
        self.fitted = True
        clusters = self.clustering_.get_core_members(light=True)
        # NOTE(review): the placeholder is the *string* 'None', so the
        # result can mix str and int labels; kept for compatibility.
        labels = ['None'] * len(ids)
        for cluster_id, cluster in enumerate(clusters):
            for point in cluster:
                labels[int(point.id)] = cluster_id
        return labels

    def stop(self):
        """
        Stop the clustering service held in ``self.clustering_``.
        """
        self.clustering_.stop()

    def clear(self):
        """
        Clear the clustering service held in ``self.clustering_``.
        """
        self.clustering_.clear()
class BaseKFixedClustering(BaseJubatusClustering):
    """
    Base wrapper for clustering methods configured with a cluster count ``k``.

    Subclasses only have to implement ``_method`` and return the method
    name that is handed to ``Config``.
    """

    def __init__(self, k=2, compressor_method='simple', bucket_size=100,
                 compressed_bucket_size=100, bicriteria_base_size=10,
                 bucket_length=2, forgetting_factor=0.0,
                 forgetting_threshold=0.5, seed=0, embedded=True,
                 distance='euclidean'):
        """
        Store ``k`` and forward all remaining arguments to the base class.
        """
        super(BaseKFixedClustering, self).__init__(
            compressor_method, bucket_size, compressed_bucket_size,
            bicriteria_base_size, bucket_length, forgetting_factor,
            forgetting_threshold, seed, embedded, distance)
        self.k = k

    def _method(self):
        """
        Return the clustering method name; abstract in this class.
        """
        raise NotImplementedError()

    def _launch_clustering(self):
        """
        Build the configuration and start the clustering service.

        Sets ``method``, ``parameter``, ``config_`` and ``clustering_``
        on the instance, in that order.
        """
        self.method = self._method()
        self.parameter = {
            'k': self.k,
            'seed': self.seed
        }
        self.config_ = Config(method=self.method,
                              parameter=self.parameter,
                              compressor_method=self.compressor_method,
                              compressor_parameter=self.compressor_parameter,
                              distance=self.distance)
        self.clustering_ = Clustering.run(config=self.config_,
                                          embedded=self.embedded)

    def fit(self, X, y=None):
        """
        Construct clustering model.

        Raises ``RuntimeWarning`` (as an exception) when ``X`` holds fewer
        than ``k`` samples.  ``y`` is ignored.  Returns ``self``.
        """
        if len(X) < self.k:
            # Adjacent literals instead of a backslash continuation inside
            # the string, which leaked source indentation into the message.
            raise RuntimeWarning(
                "At least k={0} points are needed "
                "but {1} points given".format(self.k, len(X)))
        dataset = Dataset.from_data(X)
        self._launch_clustering()
        self.clustering_.clear()
        # The results of push() are not needed, but it has to be iterated
        # to completion (presumably it is lazy -- confirm against
        # Clustering.push).
        for _ in self.clustering_.push(dataset):
            pass
        self.fitted = True
        return self

    def predict(self, X):
        """
        Predict the closest cluster each sample in X belongs to.

        Returns a list of int labels, one per result yielded by
        ``get_nearest_center``.  Labels are numbered 0, 1, 2, ... in the
        order in which distinct centers are first seen during this call,
        so the same center can receive a different label in another call.

        Raises ``RuntimeError`` when the model has not been fitted.
        """
        if not self.fitted:
            raise RuntimeError("clustering model not fitted yet.")
        dataset = Dataset.from_data(X)
        y_pred = []
        label_of = {}
        for _, _, center in self.clustering_.get_nearest_center(dataset):
            # A center seen for the first time gets the next free label.
            y_pred.append(label_of.setdefault(center, len(label_of)))
        return y_pred
class KMeans(BaseKFixedClustering):
    """
    Fixed-k clustering wrapper that selects the ``'kmeans'`` method.
    """

    def _method(self):
        """
        Return the method name used for this estimator.
        """
        method_name = 'kmeans'
        return method_name
class GMM(BaseKFixedClustering):
    """
    Fixed-k clustering wrapper that selects the ``'gmm'`` method.
    """

    def _method(self):
        """
        Return the method name used for this estimator.
        """
        method_name = 'gmm'
        return method_name
class DBSCAN(BaseJubatusClustering):
    """
    Clustering wrapper that selects the ``'dbscan'`` method.

    The compressor method is always ``'simple'``; it is not exposed as a
    constructor argument.
    """

    def __init__(self, eps=0.2, min_core_point=3, bucket_size=100,
                 compressed_bucket_size=100, bicriteria_base_size=10,
                 bucket_length=2, forgetting_factor=0.0,
                 forgetting_threshold=0.5, seed=0, embedded=True,
                 distance='euclidean'):
        """
        Store ``eps`` and ``min_core_point`` and forward the remaining
        arguments to the base class.
        """
        super(DBSCAN, self).__init__(
            'simple',
            bucket_size,
            compressed_bucket_size,
            bicriteria_base_size,
            bucket_length,
            forgetting_factor,
            forgetting_threshold,
            seed,
            embedded,
            distance,
        )
        self.eps = eps
        self.min_core_point = min_core_point

    def _launch_clustering(self):
        """
        Build the configuration and start the clustering service.

        Sets ``method``, ``parameter``, ``config_`` and ``clustering_``
        on the instance, in that order.
        """
        self.method = 'dbscan'
        self.parameter = {
            'eps': self.eps,
            'min_core_point': self.min_core_point
        }
        config = Config(
            method=self.method,
            parameter=self.parameter,
            compressor_method=self.compressor_method,
            compressor_parameter=self.compressor_parameter,
            distance=self.distance,
        )
        self.config_ = config
        self.clustering_ = Clustering.run(config=config,
                                          embedded=self.embedded)