Loader Development Guide

Although Jubakit provides various kinds of built-in Loaders, you can also extend existing Loader or even develop your own one to suit your needs.

Extending Existing Loader

All Loaders have an extension point called preprocess, which is a method you can override and perform operations on each record from the Loader. preprocess method takes only 1 argument, which is a single record loaded (dict-like object). The method must return the preprocessed dict-like object or None.

The default implementation of preprocess method is as follows; it does nothing.

def preprocess(self, ent):
  return ent

Although not mandatory, the output of preprocess method should be a flat dict-like object, i.e., values should not be objects.

Transformation

For example, if you want to process JSONL files (files that contain one JSON record per line), you can create a class that inherits from LineBasedFileLoader which loads a single text file and emits record for each line, and implement preprocess method as follows:

import json
from jubakit.loader.core import LineBasedFileLoader

class JsonLLoader(LineBasedFileLoader):
  def preprocess(self, ent):
    return json.loads(ent['line'])

Filtering

You can also use preprocess method for filtering. If you want to skip the record, just return None. The following Loader loads lines whose line number is odd.

from jubakit.loader.core import LineBasedFileLoader

class OddLineLoader(LineBasedFileLoader):
  def preprocess(self, ent):
    if ent['number'] % 2 == 0:
      return None
    return ent

Window Processing

As Loaders can be stateful, preprocess method can also be used for window processing. Here is an example of Loader that calculates moving average over x.

from jubakit.base import BaseLoader

class MovingAverageLoader(BaseLoader):
  def __init__(self, window_size, *args, **kwargs):
    self._window = []
    self._window_size = window_size
    super(MovingAverageLoader, self).__init__(*args, **kwargs)

  def preprocess(self, ent):
    # Window holds the last N records.
    self._window = self._window[-1 * self._window_size + 1:] + [float(ent['x'])]

    # At least N records must be processed.
    if len(self._window) < self._window_size: return None

    # Calculate moving average, add it as a column named `x_avg` and return it.
    ent['x_avg'] = sum(self._window) / self._window_size
    return ent

  def rows(self):
    # Dummy records.
    for x in [1, 10, 5, 8, 7, 6, 2]:
      yield {'x': x}

Implementing New Loader

If none of the existing Loaders work for you, create your own Loader from scratch. It is quite simple – the minimum requirements for Loader classes are:

  • Loaders must inherit from jubakit.base.BaseLoader class.
  • Loaders must implement rows method, which yields a dict object.

Here is a simple example of a Loader, which emits 2-dimensional random number records for 5 times.

from random import Random
from jubakit.base import BaseLoader

class RandomLoader(BaseLoader):
  def rows(self):
    r = Random()
    for i in range(5):
      yield {'x': r.random(), 'y': r.random()}

Loaders can easily be tested as follows.

>>> loader = RandomLoader()
>>> for row in loader:
...   print(row)
...
{'y': 0.12162269633934364, 'x': 0.005440374791884306}
{'y': 0.04132353727105431, 'x': 0.12812214533765487}
{'y': 0.9734068465823698, 'x': 0.35152948844306664}
{'y': 0.12417565325498592, 'x': 0.7501678925073599}
{'y': 0.6370897206201418, 'x': 0.01709999005458307}

It is advised to emit flat dict-like object (i.e., no objects in values) in rows method to avoid confusion.

If you are developing infinite Loader (e.g., Twitter streams), it should implement is_infinite method and return True. Please note that all entries are loaded from Loader to memory when creating Dataset by default, unless is_infinite returns True (or static option of Dataset constructor is explicitly set to False).

from random import Random
from jubakit.base import BaseLoader

class InfiniteRandomLoader(BaseLoader):
  def is_infinite(self):
    return True

  def rows(self):
    r = Random()
    while True:
      yield {'x': r.random(), 'y': r.random()}

Now you need a parameter for your Loader? You can use a constructor.

from random import Random
from jubakit.base import BaseLoader

class InfiniteRandomLoader(BaseLoader):
  def __init__(self, seed=0):
    self.seed = seed

  def is_infinite(self):
    return True

  def rows(self):
    r = Random(self.seed)
    while True:
      yield {'x': r.random(), 'y': r.random()}

If you wrote a Loader that can be commonly used, please consider submitting Pull-Request to make the Loader as a part of Jubakit!