# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import jubatus
import jubatus.embedded
from .base import GenericSchema, BaseDataset, BaseService, GenericConfig
from .compat import *
class Schema(GenericSchema):
    """
    Schema for Anomaly service.
    """

    # Column-type codes specific to the Anomaly service.
    ID = 'i'
    FLAG = 'f'

    def __init__(self, mapping, fallback=None):
        """
        Resolves the (optional) ID and FLAG columns from the mapping before
        delegating the rest of the schema setup to GenericSchema.
        """
        # Both columns are unique-per-schema and optional (last arg True).
        unique = self._get_unique_mapping
        self._id_key = unique(mapping, fallback, self.ID, 'ID', True)
        self._flag_key = unique(mapping, fallback, self.FLAG, 'FLAG', True)
        super(Schema, self).__init__(mapping, fallback)
class Dataset(BaseDataset):
    """
    Dataset for Anomaly service.
    """

    @classmethod
    def _predict(cls, row):
        # Delegate to the service schema; the second argument mirrors the
        # `predict` flag defined on GenericSchema.
        return Schema.predict(row, False)
class Anomaly(BaseService):
    """
    Anomaly service.

    Dataset iteration yields ``(idx, (row_id, row_flag, datum))`` tuples;
    the generator methods below re-emit ``(idx, id, flag, score)`` records.
    """

    @classmethod
    def name(cls):
        """Returns the Jubatus service name."""
        return 'anomaly'

    @classmethod
    def _client_class(cls):
        return jubatus.anomaly.client.Anomaly

    @classmethod
    def _embedded_class(cls):
        return jubatus.embedded.Anomaly

    def add(self, dataset):
        """
        Adds data points to the anomaly model using the given dataset and returns
        LOF scores.
        """
        client = self._client()
        for (idx, (row_id, row_flag, datum)) in dataset:
            # `add` assigns fresh IDs; rows that already carry one belong
            # to the ID-based entry points instead.
            if row_id is not None:
                raise RuntimeError('ID-based datasets must use `overwrite` or `update` instead of `add`')
            point = client.add(datum)
            yield (idx, point.id, row_flag, point.score)

    def add_bulk(self, dataset):
        """
        Adds data points to the anomaly model using the given dataset and returns
        a list of data point IDs.
        """
        # Strip index/id/flag; only the datum (third element of the record
        # tuple) is sent to the server.
        data = [record[1][2] for record in dataset]
        return self._client().add_bulk(data)

    def update(self, dataset):
        """
        Updates data points in the anomaly model using the given dataset and
        returns LOF scores.
        """
        client = self._client()
        for (idx, (row_id, row_flag, datum)) in dataset:
            if row_id is None:
                raise RuntimeError('Non ID-based datasets must use `add` instead of `update`')
            score = client.update(row_id, datum)
            yield (idx, row_id, row_flag, score)

    def overwrite(self, dataset):
        """
        Overwrites data points in the anomaly model using the given dataset and
        returns LOF scores.
        """
        client = self._client()
        for (idx, (row_id, row_flag, datum)) in dataset:
            if row_id is None:
                raise RuntimeError('Non ID-based datasets must use `add` instead of `overwrite`')
            score = client.overwrite(row_id, datum)
            yield (idx, row_id, row_flag, score)

    def calc_score(self, dataset):
        """
        Calculates LOF scores for the given dataset.
        """
        client = self._client()
        for (idx, (row_id, row_flag, datum)) in dataset:
            # Read-only scoring: both ID-based and ID-less rows are accepted.
            yield (idx, row_id, row_flag, client.calc_score(datum))
class Config(GenericConfig):
    """
    Configuration to run Anomaly service.
    """

    @classmethod
    def methods(cls):
        """Returns the list of supported anomaly-detection methods."""
        return ['lof', 'light_lof']

    @classmethod
    def _default_method(cls):
        return 'lof'

    @classmethod
    def _default_parameter(cls, method):
        """
        Returns the default parameter dict for ``method``.

        Raises RuntimeError for methods not listed in :meth:`methods`.
        """
        if method == 'lof':
            backend = 'inverted_index_euclid'
            backend_params = {}
        elif method == 'light_lof':
            backend = 'euclid_lsh'
            backend_params = {
                'threads': -1,  # use number of logical CPU cores
                'hash_num': 64,
            }
        else:
            raise RuntimeError('unknown method: {0}'.format(method))
        return {
            'nearest_neighbor_num': 10,
            'reverse_nearest_neighbor_num': 30,
            'method': backend,
            'parameter': backend_params,
            'ignore_kth_same_point': True,
        }