Loader開発ガイド¶
Jubakitには予め様々な種類のLoaderが実装されていますが、それらを拡張したり新しいLoaderを実装することもできるようになっています。
既存のLoaderの拡張¶
全てのLoaderには preprocess
という拡張するためのポイントがあります。このメソッドをオーバーライドすることでレコードに対して様々な処理を行うことができます。preprocess
メソッドの引数は、Loaderが読み込んだレコード(辞書形式)1つだけです。戻り値は処理をした辞書形式のオブジェクトか None
でなければなりません。
preprocess
のデフォルトの実装は以下の通り何もしません。
def preprocess(self, ent):
return ent
必須ではありませんが、 preprocess
の出力はフラットな辞書形式、つまり、valueをオブジェクトにしないことが推奨されます。
加工¶
例えばJSONLファイル(1行が1つのJSONレコードとなっているファイル)を扱う場合、テキストファイルを読み込み、一行ずつ出力する LineBasedFileLoader
を継承したクラスを作成すると良いでしょう。そして、 preprocess
メソッドを以下のように実装します。
import json
from jubakit.loader.core import LineBasedFileLoader
class JsonLLoader(LineBasedFileLoader):
def preprocess(self, ent):
return json.loads(ent['line'])
フィルタリング¶
preprocess
メソッドをフィルタリングに使うこともできます。あるレコードをスキップさせるには None
を返すようにすれば良いでしょう。以下のLoaderは奇数行のみ返します。
from jubakit.loader.core import LineBasedFileLoader
class OddLineLoader(LineBasedFileLoader):
def preprocess(self, ent):
if ent['number'] % 2 == 0:
return None
return ent
ウインドウ処理¶
Loaderは状態を持つことができるため、 preprocess
メソッドをウインドウ処理に使うこともできます。 以下は x
の移動平均を求めるLoaderの例です。
from jubakit.base import BaseLoader
class MovingAverageLoader(BaseLoader):
def __init__(self, window_size, *args, **kwargs):
self._window = []
self._window_size = window_size
super(MovingAverageLoader, self).__init__(*args, **kwargs)
def preprocess(self, ent):
# Window holds the last N records.
self._window = self._window[-1 * self._window_size + 1:] + [float(ent['x'])]
# At least N records must be processed.
if len(self._window) < self._window_size: return None
# Calculate moving average, add it as a column named `x_avg` and return it.
ent['x_avg'] = sum(self._window) / self._window_size
return ent
def rows(self):
# Dummy records.
for x in [1, 10, 5, 8, 7, 6, 2]:
yield {'x': x}
新しいLoaderの実装¶
既存のLoaderで対処できない場合には、Loaderを自分で実装することもできます。実装はとても簡単です。最低限必要なことは以下の2点です。
jubakit.base.BaseLoader
クラスを継承すること。rows
メソッドを実装すること。このメソッドは辞書形式のオブジェクトを生成します。
以下は2次元の乱数のレコードを5回出力する単純なLoaderの例です。
from random import Random
from jubakit.base import BaseLoader
class RandomLoader(BaseLoader):
def rows(self):
r = Random()
for i in range(5):
yield {'x': r.random(), 'y': r.random()}
Loaderのテストは以下のように簡単にできます。
>>> loader = RandomLoader()
>>> for row in loader:
... print(row)
...
{'y': 0.12162269633934364, 'x': 0.005440374791884306}
{'y': 0.04132353727105431, 'x': 0.12812214533765487}
{'y': 0.9734068465823698, 'x': 0.35152948844306664}
{'y': 0.12417565325498592, 'x': 0.7501678925073599}
{'y': 0.6370897206201418, 'x': 0.01709999005458307}
rows
メソッドではわかりやすさのためフラットな辞書形式のオブジェクト、すなわちvalueにオブジェクトを持たない辞書形式のオブジェクトを返すようにすることが推奨されます。
Twitterストリームのように永続するLoaderを実装する場合には、is_infinite
メソッドが True
を返すよう実装してください。デフォルトでは Dataset
を作成した際に全ての要素がメモリにロードされます。is_infinite
が True
を返す場合、または Dataset
のコンストラクタの static
オプションが明示的に False
を指定された場合のみ、全ての要素のロードが行われません。
from random import Random
from jubakit.base import BaseLoader
class InfiniteRandomLoader(BaseLoader):
def is_infinite(self):
return True
def rows(self):
r = Random()
while True:
yield {'x': r.random(), 'y': r.random()}
Loaderにパラメータが必要な場合はコンストラクタを使います。
from random import Random
from jubakit.base import BaseLoader
class InfiniteRandomLoader(BaseLoader):
def __init__(self, seed=0):
self.seed = seed
def is_infinite(self):
return True
def rows(self):
r = Random(self.seed)
while True:
yield {'x': r.random(), 'y': r.random()}
汎用的にも使えそうなLoaderを作成した際には是非 Pull-Request をお送りください!