# Source code for lightonml.encoding.base

# -*- coding: utf-8 -*-
"""Encoders

This module contains implementations of Encoders that can transform data
in the binary `uint8` format required by the OPU. Compatible with numpy.ndarray
and torch.Tensor.
"""
import numexpr as ne
import numpy as np


def _tensor_to_array(X):
    """Return ``X`` as a numpy array plus a flag telling whether it was a torch.Tensor.

    Detection is duck-typed on the class name ``'Tensor'`` so torch is never
    imported unless the caller actually passed a tensor.
    """
    if type(X).__name__ == 'Tensor':
        return X.numpy(), True
    return X, False


def _array_to_tensor(X, is_tensor):
    """Convert ``X`` back to a torch.Tensor when ``is_tensor`` is True, else return it unchanged.

    torch is imported lazily so the module works without torch installed as
    long as no tensors are used.
    """
    if not is_tensor:
        return X
    import torch
    return torch.from_numpy(X)


# noinspection PyPep8Naming
class SeparatedBitPlanEncoder:
    """Encoder that splits unsigned-integer data into separate bitplanes.

    ``n_bits + starting_bit`` must not exceed the bitwidth of the data fed to
    the encoder: at most 8 for ``uint8`` input, at most 32 for ``uint32``
    input, and so on. Read more in the Examples section.

    Parameters
    ----------
    n_bits: int, defaults to 8,
        number of bits to keep during the encoding. Must be positive.
    starting_bit: int, defaults to 0,
        bit used to start the encoding, previous bits will be thrown away.
        Must be positive.

    Attributes
    ----------
    n_bits: int,
        number of bits to keep during the encoding.
    starting_bit: int,
        bit used to start the encoding, previous bits will be thrown away.
    """

    def __init__(self, n_bits=8, starting_bit=0):
        if n_bits <= 0:
            raise ValueError('n_bits must be a positive integer.')
        if starting_bit < 0:
            raise ValueError('starting_bit must be 0 or a positive integer.')
        self.n_bits = n_bits
        self.starting_bit = starting_bit

    def fit(self, X, y=None):
        """No-op, present only for scikit-learn transformer API compatibility.

        Parameters
        ----------
        X : 2D np.ndarray or torch.Tensor
        y : 1D np.ndarray or torch.Tensor, optional

        Returns
        -------
        self : SeparatedBitPlanEncoder
        """
        return self

    def transform(self, X):
        """Performs the encoding.

        Parameters
        ----------
        X : 2D np.ndarray or torch.Tensor of uint8, 16, 32 or 64
            [n_samples, n_features], input data to encode.

        Returns
        -------
        X_enc : 2D np.ndarray or torch.Tensor of uint8
            [n_samples*n_bits, n_features], encoded input data.
        """
        X, is_tensor = _tensor_to_array(X)
        bitwidth = 8 * X.dtype.itemsize
        last_bit = self.n_bits + self.starting_bit
        if last_bit > bitwidth:
            raise ValueError('n_bits + starting_bit is greater than bitwidth of input data: '
                             '{}+{} > {}'.format(self.n_bits, self.starting_bit, bitwidth))
        n_samples, n_features = X.shape
        # View every element as its constituent bytes along a new third axis,
        # then unpack the bytes into individual bits.
        bit_cube = np.unpackbits(X[:, :, None].view(np.uint8), axis=2)
        # Reverse so that index 0 along the bit axis is the least significant bit.
        bit_cube = np.flip(bit_cube, axis=2)
        # Move the bit axis next to the sample axis, keep the requested bit
        # window, then stack the bitplanes along the sample axis.
        planes = np.transpose(bit_cube, [0, 2, 1])[:, self.starting_bit:last_bit, :]
        X_enc = planes.reshape(n_samples * self.n_bits, n_features)
        return _array_to_tensor(X_enc, is_tensor)
# noinspection PyPep8Naming
class MixingBitPlanDecoder:
    """Decoder that recombines bitplanes with a geometric weighting.

    ``n_bits`` MUST be the same value used in SeparatedBitPlanEncoder.
    Read more in the Examples section.

    Parameters
    ----------
    n_bits: int, defaults to 8,
        number of bits used during the encoding.
    decoding_decay: float, defaults to 0.5,
        decay to apply to the bits during the decoding.

    Attributes
    ----------
    n_bits: int,
        number of bits used during the encoding.
    decoding_decay: float, defaults to 0.5,
        decay to apply to the bits during the decoding.
    """

    def __init__(self, n_bits=8, decoding_decay=0.5):
        self.n_bits = n_bits
        self.decoding_decay = decoding_decay

    def fit(self, X, y=None):
        """No-op, present only for scikit-learn transformer API compatibility.

        Parameters
        ----------
        X : np.ndarray
        y : np.ndarray, optional, defaults to None.

        Returns
        -------
        self : MixingBitPlanDecoder
        """
        return self

    def transform(self, X, y=None):
        """Performs the decoding.

        Parameters
        ----------
        X : 2D np.ndarray of uint8 or uint16,
            input data to decode; row count must be a multiple of n_bits.

        Returns
        -------
        X_dec : 2D np.ndarray of floats
            decoded data.

        Raises
        ------
        ValueError
            if the number of rows is not divisible by ``n_bits``.
        """
        X, is_tensor = _tensor_to_array(X)
        n_out, n_features = X.shape
        n_dim_0 = int(n_out / self.n_bits)
        if n_dim_0 * self.n_bits * n_features != X.size:
            raise ValueError('Check that you used the same number of bits in encoder and decoder.')
        X = np.reshape(X, (n_dim_0, self.n_bits, n_features))
        # Per-bit weights: bit i contributes decoding_decay**i.
        decay_factors = (self.decoding_decay ** np.arange(self.n_bits)).astype('float32')
        if self.n_bits < 16:
            local_dict = {}
            terms = []
            for i in range(self.n_bits):
                local_dict['X' + str(i)] = X[:, i]
                local_dict['decay_factors' + str(i)] = decay_factors[i]
                terms.append('X' + str(i) + '*' + 'decay_factors' + str(i))
            X_dec = ne.evaluate(' + '.join(terms), local_dict)
        else:
            # fallback to slower version if n_bits > 15 because of https://github.com/lightonai/lightonml/issues/58
            X_dec = np.einsum('ijk,j->ik', X, decay_factors).astype('float32')
        return _array_to_tensor(X_dec, is_tensor)
class ConcatenatedBitPlanEncoder:
    """Encoder that concatenates bitplanes along the feature dimension.

    ``n_bits + starting_bit`` must not exceed the bitwidth of the data fed to
    the encoder: at most 8 for ``uint8`` input, at most 32 for ``uint32``
    input, and so on. Read more in the Examples section.

    Parameters
    ----------
    n_bits: int, defaults to 8,
        number of bits to keep during the encoding. Must be positive.
    starting_bit: int, defaults to 0,
        bit used to start the encoding, previous bits will be thrown away.
        Must be positive.

    Attributes
    ----------
    n_bits: int,
        number of bits to keep during the encoding.
    starting_bit: int,
        bit used to start the encoding, previous bits will be thrown away.
    """

    def __init__(self, n_bits=8, starting_bit=0):
        if n_bits <= 0:
            raise ValueError('n_bits must be a positive integer.')
        if starting_bit < 0:
            raise ValueError('starting_bit must be 0 or a positive integer.')
        super(ConcatenatedBitPlanEncoder, self).__init__()
        self.n_bits = n_bits
        self.starting_bit = starting_bit

    def fit(self, X, y=None):
        """No-op, present only for scikit-learn transformer API compatibility.

        Parameters
        ----------
        X : 2D np.ndarray
        y : 1D np.ndarray, optional

        Returns
        -------
        self : ConcatenatedBitPlanEncoder
        """
        return self

    def transform(self, X):
        """Performs the encoding.

        Parameters
        ----------
        X : 2D np.ndarray of uint8, 16, 32 or 64
            [n_samples, n_features], input data to encode.

        Returns
        -------
        X_enc : 2D np.ndarray of uint8 [n_samples, n_features*n_bits]
            encoded input data. A line is arranged as
            [bits_for_first_feature, ..., bits_for_last_feature].
        """
        X, is_tensor = _tensor_to_array(X)
        bitwidth = 8 * X.dtype.itemsize
        last_bit = self.n_bits + self.starting_bit
        if last_bit > bitwidth:
            raise ValueError('n_bits + starting_bit is greater than bitwidth of input data: '
                             '{}+{} > {}'.format(self.n_bits, self.starting_bit, bitwidth))
        n_samples, n_features = X.shape
        # View every element as bytes along a new third axis and unpack to bits.
        # NOTE(review): unlike SeparatedBitPlanEncoder the bit order is NOT
        # reversed here, so the window is taken MSB-first — presumably
        # intentional; confirm against the encoder's documented examples.
        bit_cube = np.unpackbits(X[:, :, None].view(np.uint8), axis=2)
        window = bit_cube[:, :, self.starting_bit:last_bit]
        X_enc = window.reshape(n_samples, self.n_bits * n_features)
        return _array_to_tensor(X_enc, is_tensor)
class ConcatenatingBitPlanDecoder:
    """Decoder that weights bitplanes and concatenates them along features.

    ``n_bits`` MUST be the same value used in SeparatedBitPlanEncoder.
    Read more in the Examples section.

    Parameters
    ----------
    n_bits: int, defaults to 8,
        number of bits used during the encoding.
    decoding_decay: float, defaults to 0.5,
        decay to apply to the bits during the decoding.

    Attributes
    ----------
    n_bits: int,
        number of bits used during the encoding.
    decoding_decay: float, defaults to 0.5,
        decay to apply to the bits during the decoding.
    """

    def __init__(self, n_bits=8, decoding_decay=0.5):
        self.n_bits = n_bits
        self.decoding_decay = decoding_decay

    def fit(self, X, y=None):
        """No-op, present only for scikit-learn transformer API compatibility.

        Parameters
        ----------
        X : np.ndarray
        y : np.ndarray, optional, defaults to None.

        Returns
        -------
        self : ConcatenatingBitPlanDecoder
        """
        return self

    def transform(self, X, y=None):
        """Performs the decoding.

        Parameters
        ----------
        X : 2D np.ndarray of uint8 or uint16,
            input data to decode; row count must be a multiple of n_bits.

        Returns
        -------
        X_dec : 2D np.ndarray of float32
            decoded data, shape [n_out/n_bits, n_bits*n_features].

        Raises
        ------
        ValueError
            if the number of rows is not divisible by ``n_bits``.
        """
        X, is_tensor = _tensor_to_array(X)
        n_out, n_features = X.shape
        n_dim_0 = int(n_out / self.n_bits)
        if n_dim_0 * self.n_bits * n_features != X.size:
            raise ValueError('Check that you used the same number of bits in encoder and decoder.')
        planes = np.reshape(X, (n_dim_0, self.n_bits, n_features))
        # Weight bitplane i by decoding_decay**i, broadcast over samples/features.
        weights = np.reshape(self.decoding_decay ** np.arange(self.n_bits), (1, self.n_bits, 1))
        weighted = planes * weights
        X_dec = np.reshape(weighted, (n_dim_0, self.n_bits * n_features)).astype('float32')
        return _array_to_tensor(X_dec, is_tensor)
# noinspection PyPep8Naming
class Float32Encoder:
    """Encoder that separates float32 bitplanes, keeping a chosen subset of
    sign, exponent and mantissa bits.

    Parameters
    ----------
    sign_bit: bool, defaults to True,
        if True keeps the bit for the sign.
    exp_bits: int, defaults to 8,
        number of bits of the exponent to keep.
    mantissa_bits: int, defaults to 23,
        number of bits of the mantissa to keep.

    Attributes
    ----------
    sign_bit: bool,
        if True keeps the bit for the sign.
    exp_bits: int,
        number of bits of the exponent to keep.
    mantissa_bits: int,
        number of bits of the mantissa to keep.
    n_bits: int,
        total number of bits to keep.
    indices: list,
        list of the indices of the bits to keep.
    """

    def __init__(self, sign_bit=True, exp_bits=8, mantissa_bits=23):
        if not 0 <= exp_bits <= 8:
            raise ValueError('exp_bits must be in the range [0, 8]')
        if not 0 <= mantissa_bits <= 23:
            raise ValueError('mantissa_bits must be in the range [0, 23]')
        self.sign_bit = sign_bit
        self.exp_bits = exp_bits
        self.mantissa_bits = mantissa_bits
        self.n_bits = int(sign_bit) + exp_bits + mantissa_bits
        # In the unpacked layout bit 0 is the sign, bits 1-8 the exponent and
        # bits 9-31 the mantissa; keep the leading bits of each group.
        kept = list(range(1, self.exp_bits + 1)) + list(range(9, self.mantissa_bits + 9))
        if self.sign_bit:
            kept.append(0)
        self.indices = sorted(kept)

    def fit(self, X, y=None):
        """No-op, present only for scikit-learn transformer API compatibility.

        Parameters
        ----------
        X : 2D np.ndarray
        y : 1D np.ndarray, optional

        Returns
        -------
        self : Float32Encoder
        """
        return self

    def transform(self, X):
        """Performs the encoding.

        Parameters
        ----------
        X : 2D np.ndarray of float32 [n_samples, n_features],
            input data to encode.

        Returns
        -------
        X_enc : 2D np.ndarray of uint8 [n_samples*n_bits, n_features],
            encoded input data.
        """
        X, is_tensor = _tensor_to_array(X)
        n_samples, n_features = X.shape
        # Split each float32 into its four bytes along a new axis, then reverse
        # the byte order before unpacking so the unpacked bits run from sign to
        # mantissa (assumes the usual little-endian float storage, as the
        # original implementation did).
        raw_bytes = np.flip(X[:, :, None].view('uint8'), axis=2)
        bits = np.unpackbits(raw_bytes, axis=2)
        # Keep only the requested bits and stack bitplanes along samples.
        selected = np.transpose(bits, [0, 2, 1])[:, self.indices, :]
        X_enc = selected.reshape(n_samples * self.n_bits, n_features)
        return _array_to_tensor(X_enc, is_tensor)
class ConcatenatedFloat32Encoder:
    """Encoder that concatenates float32 bitplanes along the feature
    dimension, keeping a chosen subset of sign, exponent and mantissa bits.

    Parameters
    ----------
    sign_bit: bool, defaults to True,
        if True keeps the bit for the sign.
    exp_bits: int, defaults to 8,
        number of bits of the exponent to keep.
    mantissa_bits: int, defaults to 23,
        number of bits of the mantissa to keep.

    Attributes
    ----------
    sign_bit: bool,
        if True keeps the bit for the sign.
    exp_bits: int,
        number of bits of the exponent to keep.
    mantissa_bits: int,
        number of bits of the mantissa to keep.
    n_bits: int,
        total number of bits to keep.
    indices: list,
        list of the indices of the bits to keep.
    """

    def __init__(self, sign_bit=True, exp_bits=8, mantissa_bits=23):
        if not 0 <= exp_bits <= 8:
            raise ValueError('exp_bits must be in the range [0, 8]')
        if not 0 <= mantissa_bits <= 23:
            raise ValueError('mantissa_bits must be in the range [0, 23]')
        super(ConcatenatedFloat32Encoder, self).__init__()
        self.sign_bit = sign_bit
        self.exp_bits = exp_bits
        self.mantissa_bits = mantissa_bits
        self.n_bits = int(sign_bit) + exp_bits + mantissa_bits
        # Bit 0 is the sign, bits 1-8 the exponent, bits 9-31 the mantissa.
        kept = list(range(1, self.exp_bits + 1)) + list(range(9, self.mantissa_bits + 9))
        if self.sign_bit:
            kept.append(0)
        self.indices = sorted(kept)

    def fit(self, X, y=None):
        """No-op, present only for scikit-learn transformer API compatibility.

        Parameters
        ----------
        X : 2D np.ndarray
        y : 1D np.ndarray, optional

        Returns
        -------
        self : ConcatenatedFloat32Encoder
        """
        return self

    def transform(self, X):
        """Performs the encoding.

        Parameters
        ----------
        X : 2D np.ndarray of float32 [n_samples, n_features],
            input data to encode.

        Returns
        -------
        X_enc : 2D np.ndarray of uint8 [n_samples, n_features*n_bits],
            encoded input data, bitplanes concatenated along features.
        """
        X, is_tensor = _tensor_to_array(X)
        n_samples, n_features = X.shape
        # Split each float32 into its four bytes along a new axis, then reverse
        # the byte order before unpacking: the bytes are stored one after the
        # other, not as one 32-bit block, so the reversal puts the unpacked
        # bits in sign/exponent/mantissa order.
        raw_bytes = np.flip(X[:, :, None].view('uint8'), axis=2)
        bits = np.unpackbits(raw_bytes, axis=2)
        # Keep only the requested bits and flatten them into the feature axis.
        X_enc = bits[:, :, self.indices].reshape(n_samples, self.n_bits * n_features)
        return _array_to_tensor(X_enc, is_tensor)
# noinspection PyPep8Naming
class BinaryThresholdEncoder:
    """Binary encoder based on a single threshold comparison.

    Parameters
    ----------
    threshold_enc : int
        Threshold for the binary encoder. Must be in the interval [0, 255].
    greater_is_one : bool
        If True, above threshold is 1 and below 0. Vice versa if False.

    Attributes
    ----------
    threshold_enc : int
        Threshold for the binary encoder. Must be in the interval [0, 255].
    greater_is_one : bool
        If True, above threshold is 1 and below 0. Vice versa if False.
    """

    def __init__(self, threshold_enc=25, greater_is_one=True):
        if not 0 <= threshold_enc <= 255:
            raise ValueError('Invalid value for threshold_enc: must be in the interval [0, 255].')
        self.threshold_enc = threshold_enc
        self.greater_is_one = greater_is_one

    def fit(self, X, y=None):
        """No-op, present only for scikit-learn transformer API compatibility.

        Parameters
        ----------
        X : np.ndarray, the input data to encode.
        y : np.ndarray, the targets data.

        Returns
        -------
        self : BinaryThresholdEncoder
        """
        return self

    def transform(self, X, y=None):
        """Transform a uint8 array in [0, 255] in a uint8 binary array of [0, 1].

        Both comparisons are strict, so a value exactly equal to the
        threshold encodes to 0 in either mode.

        Parameters
        ----------
        X : np.ndarray of uint8,
            the input data to encode.

        Returns
        -------
        X_enc : np.ndarray of uint8,
            the encoded data.
        """
        X, is_tensor = _tensor_to_array(X)
        mask = X > self.threshold_enc if self.greater_is_one else X < self.threshold_enc
        return _array_to_tensor(mask.astype(np.uint8), is_tensor)
class MultiThresholdEncoder:
    """Binary encoder based on multiple thresholds.

    Parameters
    ----------
    thresholds : np.ndarray,
        thresholds for the binary encoder.
    columnwise: bool,
        whether to use different thresholds for each column or a common set of
        thresholds for everything.
    n_bins: int,
        if `thresholds` is not specified, `n_bins - 1` thresholds will be
        created equally spaced on the input range.

    Attributes
    ----------
    thresholds : np.ndarray,
        thresholds for the binary encoder.
    columnwise: bool,
        whether to use different thresholds for each column or a common set of
        thresholds for everything.
    n_bins: int,
        number of different values the encoding can take. A value is encoded
        into n_bins-1 bits.
    """

    def __init__(self, thresholds=None, n_bins=8, columnwise=False):
        self.thresholds = thresholds
        self.columnwise = columnwise
        if thresholds is not None:
            if columnwise and len(thresholds.shape) != 2:
                raise ValueError("You set columnwise=True but the thresholds passed are 1D. "
                                 "Pass a 2D array or set columnwise=False.")
            self.n_bins = thresholds.shape[-1] + 1
        else:
            self.n_bins = n_bins

    def fit(self, X, y=None):
        """Compute thresholds from the data if none were given.

        If `thresholds` was passed to the constructor this is a no-op.
        Otherwise `n_bins - 1` thresholds are computed, equally spaced on the
        range of `X` — per column when `columnwise` is True, globally
        otherwise.

        Parameters
        ----------
        X : 2D np.ndarray
        y : 1D np.ndarray, optional

        Returns
        -------
        self : MultiThresholdEncoder
        """
        if self.thresholds is None:
            if self.columnwise:
                self.thresholds = np.empty((X.shape[1], self.n_bins - 1), dtype='float32')
                for col in range(X.shape[1]):
                    column = X[:, col]
                    # linspace endpoints are the min/max themselves; drop them.
                    self.thresholds[col] = np.linspace(column.min(), column.max(),
                                                       self.n_bins + 1)[1:-1]
            else:
                self.thresholds = np.linspace(X.min(), X.max(), self.n_bins + 1)[1:-1]
        return self

    def transform(self, X):
        """Transforms an array to a uint8 binary array of [0, 1].

        The bins defined by the thresholds are not mutually exclusive: a value
        x activates every bin whose threshold is less than x.

        Parameters
        ----------
        X : np.ndarray of size n_samples x n_features,
            the input data to encode.

        Returns
        -------
        X_enc : np.ndarray of uint8, of size n_samples x (n_features x n_thresholds),
            the encoded data.
        """
        # torch.Tensor support is handled inline so this method is self-contained.
        is_tensor = type(X).__name__ == 'Tensor'
        if is_tensor:
            X = X.numpy()
        thresholds = self.thresholds
        n_thresholds = thresholds.shape[-1]
        # BUG FIX: work on a local reshaped view instead of mutating
        # self.thresholds, so transform can be called more than once.
        if thresholds.ndim == 1:
            # Common thresholds: (n_thresholds, 1, 1) broadcasts over both
            # samples and features.
            thr = thresholds[:, None, None]
        else:
            # Column-wise thresholds: (n_thresholds, 1, n_features) broadcasts
            # over samples. BUG FIX: the previous layout
            # (n_thresholds, n_features, 1) only broadcast correctly when
            # n_samples == n_features.
            thr = np.transpose(thresholds, (1, 0))[:, None, :]
        stacked = X[None, ...] > thr
        X_enc = np.concatenate([stacked[i] for i in range(n_thresholds)],
                               axis=-1).astype('uint8')
        if is_tensor:
            import torch
            return torch.from_numpy(X_enc)
        return X_enc
# noinspection PyPep8Naming
class SequentialBaseTwoEncoder:
    """Encoder implementing a base-2 (thermometer-expanded) encoding.

    E.g. :math:`5` is written :math:`101` in base 2:
    :math:`1 * 2^2 + 0 * 2^1 + 1 * 2^0` = (1)*4 +(0)*2 +(1)*1, so the encoder
    will give 1111001.

    Parameters
    ----------
    n_gray_levels : int,
        number of values that can be encoded. Must be a power of 2.

    Attributes
    ----------
    n_gray_levels : int,
        number of values that can be encoded. Must be a power of 2.
    n_bits : int,
        number of bits needed to encode n_gray_levels values.
    offset : float,
        value to subtract to get the minimum to 0. Set by `fit`.
    scale : float,
        scaling factor to normalize the data. Set by `fit`.
    """

    def __init__(self, n_gray_levels=16):
        assert type(n_gray_levels) == int, 'n_gray_levels must be an integer power of 2'
        assert ((n_gray_levels & (n_gray_levels - 1)) == 0) and n_gray_levels > 0, \
            'n_gray_levels must be an integer power of 2'
        self.n_gray_levels = n_gray_levels
        self.n_bits = np.uint8(np.log2(self.n_gray_levels))
        self.n_bits_type = 8
        # After unpackbits only the last n_bits positions of a uint8 matter.
        self.indices_axis_2 = np.arange(self.n_bits_type - self.n_bits, self.n_bits_type)
        self.offset = None
        self.scale = None

    def fit(self, X, y=None):
        """Computes parameters for the normalization.

        Must be run only on the training set to avoid leaking information to
        the dev/test set.

        Parameters
        ----------
        X : np.ndarray of uint [n_samples, n_features],
            the input data to encode.
        y : np.ndarray,
            the targets data.

        Returns
        -------
        self : SequentialBaseTwoEncoder.
        """
        X, _ = _tensor_to_array(X)
        self.offset = np.min(X)
        self.scale = np.max(X - self.offset)
        return self

    def normalize(self, X):
        """Normalize the data in the right range before the integer casting.

        Parameters
        ----------
        X : np.ndarray of uint [n_samples, n_features],
            the input data to normalize.

        Returns
        -------
        X_norm : np.ndarray of uint8 [n_samples, n_features],
            normalized data.
        """
        assert_msg = 'You have to call fit on the training data before calling transform.'
        assert self.offset is not None, assert_msg
        assert self.scale is not None, assert_msg
        top = self.n_gray_levels - 1
        # Map [offset, offset+scale] onto [0, n_gray_levels-1], round to the
        # nearest level and clamp anything that fell outside the fitted range.
        X_norm = np.round(top * (X - self.offset) / self.scale)
        X_norm = np.clip(X_norm, 0, top)
        return X_norm.astype(np.uint8)

    def transform(self, X, y=None):
        """Performs the encoding.

        Parameters
        ----------
        X : 2D np.ndarray of uint [n_samples, n_features],
            input data to encode.

        Returns
        -------
        X_enc : 2D np.ndarray of uint8 [n_samples, n_features*(n_gray_levels-1)],
            encoded input data.
        """
        X, is_tensor = _tensor_to_array(X)
        n_samples, n_features = X.shape
        X_norm = self.normalize(X)
        # Expand every value into its bits along a new third axis.
        bits = np.unpackbits(X_norm[:, :, None], axis=2)
        # Thermometer expansion: bit of weight 2**k is repeated 2**k times.
        repeats = 2 ** np.arange(self.n_bits)[::-1]
        expanded = np.repeat(bits[:, :, self.indices_axis_2], repeats, axis=2)
        X_enc = expanded.reshape(n_samples, n_features * (2 ** self.n_bits - 1))
        return _array_to_tensor(X_enc, is_tensor)
class NoEncoding:
    """No-op encoder, provided for API consistency with the real encoders."""

    def fit(self, X, y=None):
        """Does nothing; returns self so calls can be chained."""
        return self

    def transform(self, X, y=None):
        """Returns `X` unchanged."""
        return X
class NoDecoding:
    """No-op decoder, provided for API consistency with the real decoders."""

    def fit(self, X, y=None):
        """Does nothing; returns self so calls can be chained."""
        return self

    def transform(self, X, y=None):
        """Returns `X` unchanged."""
        return X