# -*- coding: utf8
"""Encoders
This module contains implementations of Encoders that transform data
into the binary `uint8` format required by the OPU. Compatible with numpy.ndarray
and torch.Tensor.
"""
import numexpr as ne
import numpy as np
import functools
def _tensor_to_array(X):
    """Unwrap a tensor into a numpy array and report whether it was one.

    Detection relies only on the class name (``'Tensor'``), so this helper
    never has to import torch itself.

    Parameters
    ----------
    X : object
        the data to inspect; anything whose class is not named ``Tensor``
        is handed back untouched.

    Returns
    -------
    tuple
        ``(X.numpy(), True)`` when the class of ``X`` is named ``Tensor``,
        ``(X, False)`` otherwise.
    """
    if type(X).__name__ != 'Tensor':
        return X, False
    return X.numpy(), True
def _array_to_tensor(X, is_tensor):
    """Turn a numpy array back into a torch tensor when requested.

    Parameters
    ----------
    X : np.ndarray
        the array to hand back.
    is_tensor : bool
        if truthy, ``X`` is wrapped with ``torch.from_numpy``; otherwise it
        is returned as is.

    Returns
    -------
    torch.Tensor or the unchanged ``X``.
    """
    if not is_tensor:
        return X
    # torch is imported lazily so it is only needed when tensors are in play
    import torch
    return torch.from_numpy(X)
def preserve_type(transform):
    """Decorator making a ``transform(instance, X)`` method container-agnostic.

    The wrapped method always receives what ``_tensor_to_array`` returns for
    ``X``; its result is then passed through ``_array_to_tensor`` so that a
    tensor input yields a tensor output.

    Parameters
    ----------
    transform : callable
        method taking ``(instance, X)``.

    Returns
    -------
    callable
        the wrapping function, carrying the metadata of ``transform``.
    """
    @functools.wraps(transform)
    def wrapper(instance, X):
        array, came_from_tensor = _tensor_to_array(X)
        return _array_to_tensor(transform(instance, array), came_from_tensor)

    return wrapper
# noinspection PyPep8Naming
class SeparatedBitPlanEncoder(BaseTransformer):
    """Implements an encoding that works by separating bitplanes.

    ``n_bits + starting_bit`` must fit in the bitwidth of the data that are
    going to be fed to the encoder (8 for ``uint8``, 32 for ``uint32``).
    NOTE(review): the earlier wording said "lower than" the bitwidth, but the
    defaults (``n_bits=8``, ``starting_bit=0``) already total 8 for ``uint8``
    input, so the bound is presumably inclusive - confirm against ``transform``.

    Read more in the Examples section.

    Parameters
    ----------
    n_bits: int, defaults to 8,
        number of bits to keep during the encoding. Must be strictly positive.
    starting_bit: int, defaults to 0,
        bit used to start the encoding, previous bits will be thrown away.
        Must be 0 or positive.

    Attributes
    ----------
    n_bits: int,
        number of bits to keep during the encoding.
    starting_bit: int,
        bit used to start the encoding, previous bits will be thrown away.

    Raises
    ------
    ValueError
        if ``n_bits <= 0`` or ``starting_bit < 0``.
    """

    def __init__(self, n_bits=8, starting_bit=0):
        if n_bits <= 0:
            raise ValueError('n_bits must be a positive integer.')
        if starting_bit < 0:
            raise ValueError('starting_bit must be 0 or a positive integer.')
        # Same initialisation chain as ConcatenatedBitPlanEncoder, which
        # shares this exact signature and validation.
        super(SeparatedBitPlanEncoder, self).__init__()
        self.n_bits = n_bits
        self.starting_bit = starting_bit
# noinspection PyPep8Naming
class MixingBitPlanDecoder(BaseTransformer):
    """Implements a decoding that works by mixing bitplanes.

    ``n_bits`` MUST be the same value used in SeparatedBitPlanEncoder.
    Read more in the Examples section.

    Parameters
    ----------
    n_bits: int, defaults to 8,
        number of bits used during the encoding.
    decoding_decay: float, defaults to 0.5,
        decay to apply to the bits during the decoding.

    Attributes
    ----------
    n_bits: int,
        number of bits used during the encoding.
    decoding_decay: float,
        decay to apply to the bits during the decoding.
    """

    def __init__(self, n_bits=8, decoding_decay=0.5):
        # Both values are stored verbatim; no validation happens here.
        self.n_bits, self.decoding_decay = n_bits, decoding_decay
class ConcatenatedBitPlanEncoder(BaseTransformer):
    """Implements an encoding that concatenates bitplanes along the feature dimension.

    ``n_bits + starting_bit`` is bounded by the bitwidth of the data fed to
    the encoder: 8 when ``X.dtype`` is ``uint8``, 32 when it is ``uint32``.
    Read more in the Examples section.

    Parameters
    ----------
    n_bits: int, defaults to 8,
        how many bits are kept by the encoding. Must be strictly positive.
    starting_bit: int, defaults to 0,
        first bit used by the encoding; the bits before it are discarded.
        Must be 0 or positive.

    Attributes
    ----------
    n_bits: int,
        how many bits are kept by the encoding.
    starting_bit: int,
        first bit used by the encoding; the bits before it are discarded.

    Raises
    ------
    ValueError
        when ``n_bits <= 0`` or ``starting_bit < 0``.
    """

    def __init__(self, n_bits=8, starting_bit=0):
        # n_bits is checked first, so its message wins when both are invalid.
        if n_bits <= 0:
            raise ValueError('n_bits must be a positive integer.')
        if starting_bit < 0:
            raise ValueError('starting_bit must be 0 or a positive integer.')

        super(ConcatenatedBitPlanEncoder, self).__init__()

        self.n_bits = n_bits
        self.starting_bit = starting_bit
class ConcatenatingBitPlanDecoder(BaseTransformer):
    """Implements a decoding that works by concatenating bitplanes.

    ``n_bits`` MUST be the same value used in SeparatedBitPlanEncoder.
    Read more in the Examples section.

    Parameters
    ----------
    n_bits: int, defaults to 8,
        number of bits used during the encoding.
    decoding_decay: float, defaults to 0.5,
        decay to apply to the bits during the decoding.

    Attributes
    ----------
    n_bits: int,
        number of bits used during the encoding.
    decoding_decay: float,
        decay to apply to the bits during the decoding.
    """

    def __init__(self, n_bits=8, decoding_decay=0.5):
        # Both values are stored verbatim; no validation happens here.
        self.n_bits, self.decoding_decay = n_bits, decoding_decay
# noinspection PyPep8Naming
class Float32Encoder(BaseTransformer):
    """Implements an encoding that works by separating bitplanes and selecting
    how many bits to keep for sign, exponent and mantissa of the float32.

    Bit positions are numbered from the most significant bit: 0 is the sign,
    1 to 8 the exponent and 9 to 31 the mantissa. For the exponent and the
    mantissa the lowest-numbered positions of each field are kept first.

    Parameters
    ----------
    sign_bit: bool, defaults to True,
        if True keeps the bit for the sign.
    exp_bits: int, defaults to 8,
        number of bits of the exponent to keep, in ``[0, 8]``.
    mantissa_bits: int, defaults to 23,
        number of bits of the mantissa to keep, in ``[0, 23]``.

    Attributes
    ----------
    sign_bit: bool,
        if True keeps the bit for the sign.
    exp_bits: int,
        number of bits of the exponent to keep.
    mantissa_bits: int,
        number of bits of the mantissa to keep.
    n_bits: int,
        total number of bits to keep.
    indices: list,
        sorted list of the indices of the bits to keep.

    Raises
    ------
    ValueError
        if ``exp_bits`` or ``mantissa_bits`` is outside its allowed range.
    """

    def __init__(self, sign_bit=True, exp_bits=8, mantissa_bits=23):
        if exp_bits < 0 or exp_bits > 8:
            raise ValueError('exp_bits must be in the range [0, 8]')
        if mantissa_bits < 0 or mantissa_bits > 23:
            raise ValueError('mantissa_bits must be in the range [0, 23]')
        # Same initialisation chain as ConcatenatedFloat32Encoder, which
        # shares this exact signature and validation.
        super(Float32Encoder, self).__init__()
        self.sign_bit = sign_bit
        self.exp_bits = exp_bits
        self.mantissa_bits = mantissa_bits
        self.n_bits = int(sign_bit) + exp_bits + mantissa_bits
        # exponent positions start at 1, mantissa positions start at 9
        indices = list(range(1, self.exp_bits + 1)) + list(range(9, self.mantissa_bits + 9))
        if self.sign_bit:
            indices = indices + [0]
        self.indices = sorted(indices)
class ConcatenatedFloat32Encoder(BaseTransformer):
    """Implements an encoding that concatenates bitplanes while choosing how
    many bits of the float32 sign, exponent and mantissa are kept.

    Bit positions count from the most significant bit: 0 is the sign, the
    exponent starts at 1 and the mantissa starts at 9.

    Parameters
    ----------
    sign_bit: bool, defaults to True,
        whether the sign bit is kept.
    exp_bits: int, defaults to 8,
        how many exponent bits are kept, in ``[0, 8]``.
    mantissa_bits: int, defaults to 23,
        how many mantissa bits are kept, in ``[0, 23]``.

    Attributes
    ----------
    sign_bit: bool,
        whether the sign bit is kept.
    exp_bits: int,
        how many exponent bits are kept.
    mantissa_bits: int,
        how many mantissa bits are kept.
    n_bits: int,
        total number of bits kept.
    indices: list,
        ascending positions of the bits kept.

    Raises
    ------
    ValueError
        when ``exp_bits`` or ``mantissa_bits`` is outside its allowed range.
    """

    def __init__(self, sign_bit=True, exp_bits=8, mantissa_bits=23):
        if exp_bits < 0 or exp_bits > 8:
            raise ValueError('exp_bits must be in the range [0, 8]')
        if mantissa_bits < 0 or mantissa_bits > 23:
            raise ValueError('mantissa_bits must be in the range [0, 23]')

        super(ConcatenatedFloat32Encoder, self).__init__()

        self.sign_bit = sign_bit
        self.exp_bits = exp_bits
        self.mantissa_bits = mantissa_bits
        self.n_bits = int(sign_bit) + exp_bits + mantissa_bits

        # Built directly in ascending order: sign (0), exponent (1..), mantissa (9..)
        kept = [0] if self.sign_bit else []
        kept.extend(range(1, self.exp_bits + 1))
        kept.extend(range(9, self.mantissa_bits + 9))
        self.indices = kept
# noinspection PyPep8Naming
class BinaryThresholdEncoder(BaseTransformer):
    """Implements binary encoding using a threshold function.

    Parameters
    ----------
    threshold_enc : int, float or str
        Threshold for the binary encoder. Default 'auto'.
        'auto' will set threshold_enc to feature-wise median of the data passed to the fit function.
        Numpy scalar numbers (e.g. ``np.float32``) are accepted as well.
    greater_is_one : bool
        If True, above threshold is 1 and below 0. Vice versa if False.

    Attributes
    ----------
    threshold_enc : int, float, str or np.ndarray
        Threshold for the binary encoder. After ``fit`` has run in 'auto'
        mode this holds the vector of feature-wise medians.
    greater_is_one : bool
        If True, above threshold is 1 and below 0. Vice versa if False.

    Raises
    ------
    ValueError
        if ``threshold_enc`` is neither a number nor the string 'auto'.
    """

    def __init__(self, threshold_enc='auto', greater_is_one=True):
        # The isinstance guard keeps array-likes away from ``== 'auto'``, which
        # numpy evaluates element-wise and which cannot be used as a condition.
        is_auto = isinstance(threshold_enc, str) and threshold_enc == 'auto'
        is_number = isinstance(threshold_enc, (int, float, np.integer, np.floating))
        if not (is_auto or is_number):
            raise ValueError("Argument threshold_enc should be a number or 'auto'.")
        self.threshold_enc = threshold_enc
        self.greater_is_one = greater_is_one

    def fit(self, X, y=None):
        """
        When threshold_enc is 'auto', this method sets it to a vector containing the median of each column of X.
        Otherwise, it does nothing except print a warning in case threshold_enc is not in the range covered by X.

        Because 'auto' is overwritten by the medians, a later call to ``fit``
        keeps those stored medians and only performs the range check.

        Parameters
        ----------
        X : np.ndarray,
            the input data to encode.
        y : np.ndarray,
            the targets data.

        Returns
        -------
        self : BinaryThresholdEncoding
        """
        if isinstance(self.threshold_enc, str):
            self.threshold_enc = np.median(X, axis=0)  # the median is feature-wise
            return self

        # The threshold may be a vector (left by a previous 'auto' fit), so the
        # comparisons are reduced with ``any`` instead of being used as a
        # condition directly.
        too_low = np.asarray(self.threshold_enc < X.min()).any()
        if too_low or np.asarray(self.threshold_enc > X.max()).any():
            print('WARNING: encoder threshold is outside data range')
        return self
class MultiThresholdEncoder(BaseTransformer):
    """Implements binary encoding using multiple thresholds.

    Parameters
    ----------
    thresholds : list, np.ndarray or str
        thresholds for the binary encoder. If a list or an array is passed, the thresholds will be used unmodified.
        If thresholds='linspace', the values will be evenly distributed along the data range.
        If thresholds='quantiles', the values will be set to the quantiles corresponding to n_bins.
        If n_bins=4, the thresholds will be the 1st, 2nd and 3rd quartiles.
    columnwise: bool,
        whether to use different thresholds for each column or a common set of thresholds for everything.
    n_bins: int,
        if `thresholds` is 'linspace' or 'quantiles', `n_bins - 1` thresholds will be created.
        Ignored when explicit thresholds are passed.

    Attributes
    ----------
    thresholds : np.ndarray or str,
        thresholds for the binary encoder; still the strategy name until ``fit`` has computed them.
    columnwise: bool,
        whether to use different thresholds for each column or a common set of thresholds for everything.
    n_bins: int,
        number of different values the encoding can take. A value is encoded into n_bins-1 bits.

    Raises
    ------
    ValueError
        if ``thresholds`` is not a list, an array or a known strategy name, or
        if ``columnwise`` is True while the explicit thresholds are not 2D.
    """

    # strategy names for which ``fit`` derives the thresholds from the data
    _AUTO_STRATEGIES = ('linspace', 'quantiles')

    def __init__(self, thresholds='linspace', n_bins=8, columnwise=False):
        if isinstance(thresholds, (list, np.ndarray)):
            thresholds = np.array(thresholds)
            if columnwise and thresholds.ndim != 2:
                raise ValueError(
                    "\nYou set columnwise to True but thresholds is 1D.\n"
                    "Pass a 2D array or set columnwise to False.\n"
                )
            # one more bin than there are thresholds
            self.n_bins = thresholds.shape[-1] + 1
        elif isinstance(thresholds, str) and thresholds in self._AUTO_STRATEGIES:
            self.n_bins = n_bins
        else:
            raise ValueError("Argument thresholds must be a list, a numpy array or be in ['linspace', 'quantiles'].")
        self.thresholds = thresholds
        self.columnwise = columnwise

    @staticmethod
    def _midpoint_quantile(array, probability):
        """Return the ``probability`` quantile of ``array`` with midpoint interpolation."""
        try:
            return np.quantile(array, probability, method='midpoint')
        except TypeError:
            # NumPy releases older than 1.22 only know the former keyword name.
            return np.quantile(array, probability, interpolation='midpoint')

    def _compute_thresholds(self, array, strategy):
        """Return the ``n_bins - 1`` thresholds of ``array`` for the given strategy."""
        if strategy == 'linspace':
            # n_bins + 1 evenly spaced edges, both extremes dropped
            return np.linspace(array.min(), array.max(), self.n_bins + 1)[1:-1]
        step = 1 / self.n_bins
        quantiles = [self._midpoint_quantile(array, i * step) for i in range(1, self.n_bins)]
        return np.array(quantiles)

    def fit(self, X, y=None):
        """Compute the thresholds from ``X`` when a strategy name was given.

        If explicit thresholds were passed to the constructor, or a previous
        call already computed them, this method doesn't do anything.
        With 'linspace' the thresholds are equally spaced on the range of the
        data; with 'quantiles' they are midpoint-interpolated quantiles.
        When ``columnwise`` is True they are computed separately for each
        column of ``X`` and stored as a ``float32`` array of shape
        ``(n_features, n_bins - 1)``; otherwise one common set is computed
        over the whole of ``X``.

        Parameters
        ----------
        X : 2D np.ndarray
        y: 1D np.ndarray

        Returns
        -------
        self : MultiThresholdEncoder
        """
        strategy = self.thresholds
        # Explicit or already-fitted thresholds are arrays: testing the type
        # first avoids comparing an array against the strategy names, which
        # numpy evaluates element-wise and which cannot be used as a condition.
        if not (isinstance(strategy, str) and strategy in self._AUTO_STRATEGIES):
            return self

        if self.columnwise:
            thresholds = np.empty((X.shape[1], self.n_bins - 1), dtype='float32')
            for col in range(X.shape[1]):
                thresholds[col] = self._compute_thresholds(X[:, col], strategy)
            self.thresholds = thresholds
        else:
            self.thresholds = self._compute_thresholds(X, strategy)
        return self
# noinspection PyPep8Naming
class SequentialBaseTwoEncoder(BaseTransformer):
    """Implements a base 2 encoding.

    E.g. :math:`5` is written :math:`101` in base 2: :math:`1 * 2^2 + 0 * 2^1 + 1 * 2^0` = (1)*4 +(0)*2 +(1)*1, so the
    encoder will give 1111001.
    NOTE(review): the encoded output quoted above comes from the earlier
    documentation and cannot be checked against the code visible here - confirm.

    Parameters
    ----------
    n_gray_levels : int,
        number of values that can be encoded. Must be a power of 2.

    Attributes
    ----------
    n_gray_levels : int,
        number of values that can be encoded. Must be a power of 2.
    n_bits : int,
        number of bits needed to encode n_gray_levels values.
    n_bits_type : int,
        fixed to 8.
    indices_axis_2 : np.ndarray,
        the ``n_bits`` consecutive integers ending at ``n_bits_type - 1``.
    offset : float,
        value to subtract to get the minimum to 0. ``None`` until ``fit`` is called.
    scale : float,
        scaling factor to normalize the data. ``None`` until ``fit`` is called.
    """

    def __init__(self, n_gray_levels=16):
        # NOTE: validation is kept as assertions so that callers still see
        # AssertionError; be aware that these checks vanish under ``python -O``.
        assert type(n_gray_levels) is int, 'n_gray_levels must be an integer power of 2'
        # a power of two has exactly one bit set, so n & (n - 1) clears it to 0
        assert ((n_gray_levels & (n_gray_levels - 1)) == 0) and n_gray_levels > 0, \
            'n_gray_levels must be an integer power of 2'
        self.n_gray_levels = n_gray_levels
        self.n_bits = np.uint8(np.log2(self.n_gray_levels))
        self.n_bits_type = 8
        self.indices_axis_2 = np.arange(self.n_bits_type - self.n_bits, self.n_bits_type)
        self.offset = None
        self.scale = None

    def fit(self, X, y=None):
        """Computes parameters for the normalization.

        Must be run only on the training set to avoid leaking information to the dev/test set.
        ``offset`` is the minimum of ``X`` and ``scale`` the distance between
        its maximum and its minimum.

        Parameters
        ----------
        X : np.ndarray of uint [n_samples, n_features],
            the input data to encode.
        y : np.ndarray,
            the targets data.

        Returns
        -------
        self : SequentialBaseTwoEncoder.
        """
        X, is_tensor = _tensor_to_array(X)
        lowest = np.min(X)
        highest = np.max(X)
        self.offset = lowest
        if np.issubdtype(np.asarray(lowest).dtype, np.integer):
            # Subtracting inside a fixed-width integer dtype can wrap around
            # (e.g. int8 data spanning -128..127), so the span is taken on
            # Python floats instead.
            self.scale = float(highest) - float(lowest)
        else:
            self.scale = highest - lowest
        return self

    def normalize(self, X):
        """Normalize the data in the right range before the integer casting.

        Values are mapped linearly so that ``offset`` goes to 0 and
        ``offset + scale`` goes to ``n_gray_levels - 1``, rounded, and clipped
        to that range. Undefined results (``0 / 0`` when ``scale`` is 0) are
        mapped to 0.

        Parameters
        ----------
        X : np.ndarray of uint [n_samples, n_features],
            the input data to normalize.

        Returns
        -------
        X_norm : np.ndarray of uint8 [n_samples, n_features],
            normalized data.

        Raises
        ------
        AssertionError
            if ``fit`` has not been called before.
        """
        assert_msg = 'You have to call fit on the training data before calling transform.'
        assert self.offset is not None, assert_msg
        assert self.scale is not None, assert_msg

        X = np.asarray(X)
        if X.dtype.kind in 'iu':
            # Integer inputs are promoted first: in their own dtype the
            # multiplication below overflows (15 * 255 does not fit in uint8)
            # and values under the offset wrap around instead of going negative.
            X = X.astype(np.float64)

        top_level = self.n_gray_levels - 1
        # Data normalization; a zero scale yields inf/nan, handled just below
        with np.errstate(divide='ignore', invalid='ignore'):
            X_norm = (top_level * (X - self.offset)) / self.scale
        X_norm = np.round(X_norm)
        # nan has no defined integer value, send it to the lowest level
        X_norm = np.where(np.isnan(X_norm), 0, X_norm)
        # Force the data into the valid range (also brings +/-inf back in)
        X_norm = np.clip(X_norm, 0, top_level)
        # Cast to uint8
        return X_norm.astype(np.uint8)
class NoEncoding(BaseTransformer):
    """No-op encoder, provided so that an encoder object is always available (API consistency)."""
class NoDecoding(BaseTransformer):
    """No-op decoder, provided so that a decoder object is always available (API consistency)."""