# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import json
import uuid
import jubatus
import jubatus.embedded
from .base import GenericSchema, BaseDataset, BaseService, GenericConfig, Utils
from .loader.array import ArrayLoader, ZipArrayLoader
from .loader.sparse import SparseMatrixLoader
from .loader.chain import ValueMapChainLoader, MergeChainLoader
from .compat import *
class Schema(GenericSchema):
    """
    Schema for Clustering service.
    """

    # Mapping value that tags a key as the record ID
    # (e.g. ``Schema({'_id': Schema.ID}, ...)``).
    ID = 'i'

    def __init__(self, mapping, fallback=None):
        """
        Builds the schema and remembers which key carries the record ID.

        Parameters
        ----------
        mapping : dict
            Key-to-type mapping, forwarded unchanged to the base schema.
        fallback : optional
            Fallback type, forwarded unchanged to the base schema.
        """
        # The ID key is resolved before the base initializer runs.
        # NOTE(review): the trailing ``True`` presumably marks the ID mapping
        # as optional -- confirm against ``_get_unique_mapping`` in the base.
        id_key = self._get_unique_mapping(mapping, fallback, self.ID, 'ID', True)
        self._id_key = id_key
        super(Schema, self).__init__(mapping, fallback)
class Dataset(BaseDataset):
    """
    Dataset for Clustering service.
    """

    @classmethod
    def _predict(cls, row):
        """
        Infers a schema for ``row`` by delegating to ``Schema.predict(row, False)``.
        """
        return Schema.predict(row, False)

    @classmethod
    def _from_loader(cls, data_loader, ids, static):
        """
        Builds a dataset from a data loader and an optional id array.

        When ``ids`` is given, it is zipped in under the ``_id`` key and that
        key is mapped to ``Schema.ID``; every other key falls back to
        ``Schema.NUMBER``.
        """
        if ids is None:
            loader = data_loader
            schema = Schema({}, Schema.NUMBER)
        else:
            id_loader = ZipArrayLoader(_id=ids)
            loader = MergeChainLoader(data_loader, id_loader)
            schema = Schema({'_id': Schema.ID}, Schema.NUMBER)
        # Instantiate through ``cls`` so that subclasses using these factory
        # methods get an instance of their own type, not the base Dataset.
        return cls(loader, schema, static)

    @classmethod
    def from_data(cls, data, ids=None, feature_names=None, static=True):
        """
        Converts an array or a sparse matrix and its associated id array to Dataset.

        Dispatches to ``from_matrix`` when ``data`` has a ``todense`` attribute,
        and to ``from_array`` otherwise.

        Parameters
        ----------
        data : array or scipy 2-D sparse matrix of shape [n_samples, n_features]
        ids : array of shape [n_samples], optional
        feature_names : array of shape [n_features], optional
        static : bool, optional
            Forwarded to the Dataset constructor.
        """
        if hasattr(data, 'todense'):
            return cls.from_matrix(data, ids, feature_names, static)
        else:
            return cls.from_array(data, ids, feature_names, static)

    @classmethod
    def from_array(cls, data, ids=None, feature_names=None, static=True):
        """
        Converts an array and its associated id array to Dataset.

        Parameters
        ----------
        data : array of shape [n_samples, n_features]
        ids : array of shape [n_samples], optional
        feature_names : array of shape [n_features], optional
        static : bool, optional
            Forwarded to the Dataset constructor.
        """
        data_loader = ArrayLoader(data, feature_names)
        return cls._from_loader(data_loader, ids, static)

    @classmethod
    def from_matrix(cls, data, ids=None, feature_names=None, static=True):
        """
        Converts a sparse matrix and its associated id array to Dataset.

        Parameters
        ----------
        data : scipy 2-D sparse matrix of shape [n_samples, n_features]
        ids : array of shape [n_samples], optional
        feature_names : array of shape [n_features], optional
        static : bool, optional
            Forwarded to the Dataset constructor.
        """
        data_loader = SparseMatrixLoader(data, feature_names)
        return cls._from_loader(data_loader, ids, static)

    def get_ids(self):
        """
        Yields the id of each record in the dataset.

        This is a generator, so the static check below only runs once
        iteration starts.

        Raises
        ------
        RuntimeError
            If the dataset is not static.
        """
        if not self._static:
            raise RuntimeError('non-static datasets cannot fetch list of ids')
        for (_idx, (row_id, _datum)) in self:
            yield row_id
class Clustering(BaseService):
    """
    Clustering service.
    """

    @classmethod
    def name(cls):
        """
        Returns the service name, ``'clustering'``.
        """
        return 'clustering'

    @classmethod
    def _client_class(cls):
        """
        Returns the Jubatus RPC client class for this service.
        """
        return jubatus.clustering.client.Clustering

    @classmethod
    def _embedded_class(cls):
        """
        Returns the embedded Jubatus class for this service.
        """
        return jubatus.embedded.Clustering

    def push(self, dataset):
        """
        Add data points.

        Each record is pushed individually, wrapped in an ``IndexedPoint``.
        Yields ``(idx, row_id, result)`` per record, where ``result`` is
        whatever the client's ``push`` call returned.

        Raises
        ------
        RuntimeError
            If a record has no id (raised when that record is reached).
        """
        cli = self._client()
        for (idx, (row_id, d)) in dataset:
            if row_id is None:
                raise RuntimeError('each row must have `id`.')
            result = cli.push([jubatus.clustering.types.IndexedPoint(row_id, d)])
            yield (idx, row_id, result)

    def get_revision(self):
        """
        Return revision of clusters.
        """
        cli = self._client()
        return cli.get_revision()

    def get_core_members(self, light=False):
        """
        Returns coreset of cluster in datum.

        Parameters
        ----------
        light : bool, optional
            When true, the client's ``get_core_members_light`` call is used
            instead of ``get_core_members``.
        """
        # No method restriction applies here, so the configured method is
        # intentionally not looked up.
        cli = self._client()
        if light:
            return cli.get_core_members_light()
        else:
            return cli.get_core_members()

    def get_k_center(self):
        """
        Return k cluster centers.

        Raises
        ------
        RuntimeError
            If the configured method is not ``kmeans`` or ``gmm``.
        """
        cli = self._client()
        self._ensure_center_based_method()
        return cli.get_k_center()

    def get_nearest_center(self, dataset):
        """
        Returns nearest cluster center without adding points to cluster.

        Yields ``(idx, row_id, result)`` for each record in ``dataset``.

        Raises
        ------
        RuntimeError
            If the configured method is not ``kmeans`` or ``gmm`` (raised
            when iteration starts, since this is a generator).
        """
        cli = self._client()
        self._ensure_center_based_method()
        for (idx, (row_id, d)) in dataset:
            result = cli.get_nearest_center(d)
            yield (idx, row_id, result)

    def get_nearest_members(self, dataset, light=False):
        """
        Returns nearest summary of cluster (coreset) from each point.

        Yields ``(idx, row_id, result)`` for each record in ``dataset``.

        Parameters
        ----------
        dataset : iterable of (idx, (row_id, datum))
        light : bool, optional
            When true, the client's ``get_nearest_members_light`` call is
            used instead of ``get_nearest_members``.

        Raises
        ------
        RuntimeError
            If the configured method is not ``kmeans`` or ``gmm`` (raised
            when iteration starts, since this is a generator).
        """
        cli = self._client()
        self._ensure_center_based_method()
        for (idx, (row_id, d)) in dataset:
            if light:
                result = cli.get_nearest_members_light(d)
            else:
                result = cli.get_nearest_members(d)
            yield (idx, row_id, result)

    def _ensure_center_based_method(self):
        """
        Raises RuntimeError unless the configured method is ``kmeans`` or ``gmm``.
        """
        method = self._get_method()
        if method not in ('kmeans', 'gmm'):
            raise RuntimeError('{0} is not supported'.format(method))

    def _get_method(self):
        """
        Returns the clustering method name from the active configuration.

        In embedded mode the config is read back from the model as JSON and
        must contain ``method``. Otherwise the backend config is consulted
        and ``None`` is returned when it has no ``method`` entry.
        """
        method = None
        if self._embedded:
            config = json.loads(self._backend.model.get_config())
            method = config['method']
        else:
            if 'method' in self._backend.config:
                method = self._backend.config['method']
        return method
class Config(GenericConfig):
    """
    Configuration to run Clustering service.
    """

    def __init__(self, method=None, parameter=None,
                 compressor_method=None, compressor_parameter=None,
                 converter=None, distance=None):
        """
        Builds a clustering configuration.

        Parameters
        ----------
        method, parameter, converter : optional
            Forwarded unchanged to the base configuration.
        compressor_method : str, optional
            When given, it is stored and ``compressor_parameter`` is reset to
            that method's defaults.
        compressor_parameter : dict, optional
            Merged over the current ``compressor_parameter`` when one is set;
            stored as-is otherwise.
        distance : str, optional
            When given, stored under ``distance``.

        Raises
        ------
        RuntimeError
            If ``compressor_method`` is given but is not a known method.
        """
        super(Config, self).__init__(method, parameter, converter)

        if compressor_method is not None:
            self['compressor_method'] = compressor_method
            default_compressor_parameter = \
                self._default_compressor_parameter(compressor_method)
            # Defensive branch: the lookup above returns a dict for every
            # known method and raises for unknown ones, so None is only
            # possible if a subclass overrides it.
            if default_compressor_parameter is None:
                if 'compressor_parameter' in self:
                    del self['compressor_parameter']
            else:
                self['compressor_parameter'] = default_compressor_parameter

        if compressor_parameter is not None:
            if 'compressor_parameter' in self:
                self['compressor_parameter'].update(compressor_parameter)
            else:
                self['compressor_parameter'] = compressor_parameter

        if distance is not None:
            self['distance'] = distance

    @classmethod
    def _default(cls, cfg):
        """
        Fills ``cfg`` with the base defaults plus the default compressor
        method, compressor parameter and distance, then returns it.
        """
        super(Config, cls)._default(cfg)

        compressor_method = cls._default_compressor_method()
        compressor_parameter = cls._default_compressor_parameter(compressor_method)
        distance = cls._default_distance()

        if compressor_method is not None:
            cfg['compressor_method'] = compressor_method
        if compressor_parameter is not None:
            cfg['compressor_parameter'] = compressor_parameter
        if distance is not None:
            cfg['distance'] = distance
        return cfg

    @classmethod
    def _default_method(cls):
        """
        Returns the default clustering method, ``'kmeans'``.
        """
        return 'kmeans'

    @classmethod
    def _default_compressor_method(cls):
        """
        Returns the default compressor method, ``'simple'``.
        """
        return 'simple'

    @classmethod
    def _default_distance(cls):
        """
        Returns the default distance, ``'euclidean'``.
        """
        return 'euclidean'

    @classmethod
    def _default_parameter(cls, method):
        """
        Returns a fresh dict of default parameters for ``method``.

        Raises
        ------
        RuntimeError
            If ``method`` is not one of ``kmeans``, ``gmm`` or ``dbscan``.
        """
        if method in ('kmeans', 'gmm'):
            return {
                'k': 3,
                'seed': 0,
            }
        # Equality, not ``in ('dbscan')``: the latter is a substring test on
        # a plain string and would accept e.g. 'db' or ''.
        elif method == 'dbscan':
            return {
                'eps': 0.2,
                'min_core_point': 3,
            }
        else:
            raise RuntimeError('unknown method: {0}'.format(method))

    @classmethod
    def _default_compressor_parameter(cls, method):
        """
        Returns a fresh dict of default compressor parameters for ``method``.

        Raises
        ------
        RuntimeError
            If ``method`` is not ``simple`` or ``compressive``.
        """
        # Equality, not ``in ('simple')``: parentheses alone do not make a
        # tuple, so the original performed a substring match (and raised
        # TypeError for None instead of the RuntimeError below).
        if method == 'simple':
            return {
                'bucket_size': 100,
            }
        elif method == 'compressive':
            return {
                'bucket_size': 100,
                'bucket_length': 2,
                'compressed_bucket_size': 100,
                'bicriteria_base_size': 10,
                'forgetting_factor': 0.0,
                'forgetting_threshold': 0.5,
                'seed': 0,
            }
        else:
            raise RuntimeError('unknown method: {0}'.format(method))

    @classmethod
    def methods(cls):
        """
        Returns the list of supported clustering methods.
        """
        return ['kmeans', 'gmm', 'dbscan']

    @classmethod
    def compressor_methods(cls):
        """
        Returns the list of supported compressor methods.
        """
        return ['simple', 'compressive']

    @classmethod
    def distances(cls):
        """
        Returns the list of supported distances.
        """
        return ['euclidean', 'cosine']