Loader開発ガイド

Jubakitには予め様々な種類のLoaderが実装されていますが、それらを拡張したり新しいLoaderを実装することもできるようになっています。

既存のLoaderの拡張

全てのLoaderには preprocess という拡張するためのポイントがあります。このメソッドをオーバーライドすることでレコードに対して様々な処理を行うことができます。preprocess メソッドの引数は、Loaderが読み込んだレコード(辞書形式)1つだけです。戻り値は処理をした辞書形式のオブジェクトか None でなければなりません。

preprocess のデフォルトの実装は以下の通り何もしません。

def preprocess(self, ent):
  return ent

必須ではありませんが、 preprocess の出力はフラットな辞書形式、つまり、valueをオブジェクトにしないことが推奨されます。

加工

例えばJSONLファイル(1行が1つのJSONレコードとなっているファイル)を扱う場合、テキストファイルを読み込み、一行ずつ出力する LineBasedFileLoader を継承したクラスを作成すると良いでしょう。そして、 preprocess メソッドを以下のように実装します。

import json
from jubakit.loader.core import LineBasedFileLoader

class JsonLLoader(LineBasedFileLoader):
  def preprocess(self, ent):
    return json.loads(ent['line'])

フィルタリング

preprocess メソッドをフィルタリングに使うこともできます。あるレコードをスキップさせるには None を返すようにすれば良いでしょう。以下のLoaderは奇数行のみ返します。

from jubakit.loader.core import LineBasedFileLoader

class OddLineLoader(LineBasedFileLoader):
  def preprocess(self, ent):
    if ent['number'] % 2 == 0:
      return None
    return ent

ウインドウ処理

Loaderは状態を持つことができるため、 preprocess メソッドをウインドウ処理に使うこともできます。 以下は x の移動平均を求めるLoaderの例です。

from jubakit.base import BaseLoader

class MovingAverageLoader(BaseLoader):
  def __init__(self, window_size, *args, **kwargs):
    self._window = []
    self._window_size = window_size
    super(MovingAverageLoader, self).__init__(*args, **kwargs)

  def preprocess(self, ent):
    # Window holds the last N records.
    self._window = self._window[-1 * self._window_size + 1:] + [float(ent['x'])]

    # At least N records must be processed.
    if len(self._window) < self._window_size: return None

    # Calculate moving average, add it as a column named `x_avg` and return it.
    ent['x_avg'] = sum(self._window) / self._window_size
    return ent

  def rows(self):
    # Dummy records.
    for x in [1, 10, 5, 8, 7, 6, 2]:
      yield {'x': x}

新しいLoaderの実装

既存のLoaderで対処できない場合には、Loaderを自分で実装することもできます。実装はとても簡単です。最低限必要なことは以下の2点です。

  • jubakit.base.BaseLoader クラスを継承すること。
  • rows メソッドを実装すること。このメソッドは辞書形式のオブジェクトを生成します。

以下は2次元の乱数のレコードを5回出力する単純なLoaderの例です。

from random import Random
from jubakit.base import BaseLoader

class RandomLoader(BaseLoader):
  def rows(self):
    r = Random()
    for i in range(5):
      yield {'x': r.random(), 'y': r.random()}

Loaderのテストは以下のように簡単にできます。

>>> loader = RandomLoader()
>>> for row in loader:
...   print(row)
...
{'y': 0.12162269633934364, 'x': 0.005440374791884306}
{'y': 0.04132353727105431, 'x': 0.12812214533765487}
{'y': 0.9734068465823698, 'x': 0.35152948844306664}
{'y': 0.12417565325498592, 'x': 0.7501678925073599}
{'y': 0.6370897206201418, 'x': 0.01709999005458307}

rows メソッドではわかりやすさのためフラットな辞書形式のオブジェクト、すなわちvalueにオブジェクトを持たない辞書形式のオブジェクトを返すようにすることが推奨されます。

Twitterストリームのように永続するLoaderを実装する場合には、is_infinite メソッドが True を返すよう実装してください。デフォルトでは Dataset を作成した際に全ての要素がメモリにロードされます。is_infiniteTrue を返す場合、または Dataset のコンストラクタの static オプションが明示的に False を指定された場合のみ、全ての要素のロードが行われません。

from random import Random
from jubakit.base import BaseLoader

class InfiniteRandomLoader(BaseLoader):
  def is_infinite(self):
    return True

  def rows(self):
    r = Random()
    while True:
      yield {'x': r.random(), 'y': r.random()}

Loaderにパラメータが必要な場合はコンストラクタを使います。

from random import Random
from jubakit.base import BaseLoader

class InfiniteRandomLoader(BaseLoader):
  def __init__(self, seed=0):
    self.seed = seed

  def is_infinite(self):
    return True

  def rows(self):
    r = Random(self.seed)
    while True:
      yield {'x': r.random(), 'y': r.random()}

汎用的にも使えそうなLoaderを作成した際には是非 Pull-Request をお送りください!