# Source code for lightonopu.opu

# Copyright (c) 2020 LightOn, All Rights Reserved.
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.

"""
This module contains the OPU class, the main class of the library
"""
from typing import Optional, Union
import numpy as np
import getpass
import time, datetime as dt
from contextlib import ExitStack

from lightonopu import output_roi, input_roi, types, utils, config
from lightonopu.config import get_host_option
from lightonopu.user_input import OpuUserInput
from lightonopu.internal import opu_bookings
from lightonopu.simulated_device import SimulatedOpuDevice
from lightonopu.device import OpuDevice
from lightonopu.context import Context
from lightonopu.internal.progress import Progress
from lightonopu.formatting import model1_formatter
from lightonopu.utils import blank_fn

# noinspection PyPep8Naming,PyPackageRequirements
from lightonopu.types import Roi


# noinspection PyPep8Naming
class OPU:
    """Interface to the OPU.

    .. math:: \\mathbf{y} = \\lvert \\mathbf{R} \\mathbf{x} \\rvert^2

    Main methods are :obj:`transform1d` and :obj:`transform2d`, and accept
    NumPy arrays or PyTorch tensors.

    Acquiring/releasing hardware device resources is done by open/close and
    a context-manager interface. Unless `open_at_init=False`, these
    resources are acquired automatically at init. If another process has not
    released the resources, an error will be raised, call `close()` or
    shutdown on the OPU object to release it.

    Parameters
    ----------
    n_components : int,
        dimensionality of the target projection space.
    opu_device : OpuDevice or SimulatedOpuDevice, optional
        optical processing unit instance linked to a physical or simulated
        device. If not provided, a device is properly instantiated.
        If opu_device is of type SimulatedOpuDevice, the random matrix is
        generated at __init__, using max_n_features and n_components
    max_n_features: int, optional
        maximum number of binary features that the OPU will transform
        used only if opu_device is a SimulatedOpuDevice, in order to
        initiate the random matrix
    config_file : str, optional
        path to the configuration file (for dev purpose)
    config_override: dict, optional
        for override of the config_file (for dev purpose)
    verbose_level: int, optional
        0, 1 or 2. 0 = no messages, 1 = most messages, and 2 = messages
        from OPU device (very verbose).
    features_fmt: types.FeaturesFormat, optional
        describes how data is formatted on the input device
        can be "auto", "lined" or "macro_2d", "auto", or "none"
        If "none", the transform input must be of the same size as
        device.input_size
    input_roi_strategy: types.InputRoiStrategy, optional
        describes how to display the features on the input device
        @see types.InputRoiStrategy
    open_at_init: bool, optional
        forces the setting of acquiring hardware resource at init. If not
        provided, follow system's setting (usually True)

    Attributes
    ----------
    n_components: int
        dimensionality of the target projection space.
    max_n_features: int
        maximum number of binary features that the OPU will transform
        writeable only if opu_device is a SimulatedOpuDevice, in order to
        initiate or resize the random matrix
    device: OpuDevice or SimulatedOpuDevice
        underlying hardware that performs transformation (read-only)
    features_fmt: types.FeaturesFormat, optional
        describes how data is formatted on the input device
    input_roi_strategy: types.InputRoiStrategy, optional
        describes how to display the features on the input device
    verbose_level: int, optional
        0, 1 or 2. 0 = no messages, 1 = most messages, and 2 = messages
    """

    def __init__(self, n_components: int = 200000,
                 opu_device: Optional[Union[OpuDevice, SimulatedOpuDevice]] = None,
                 max_n_features: int = 1000,
                 config_file: str = "",
                 config_override: dict = None,
                 verbose_level: int = 0,
                 features_fmt: types.FeaturesFormat = types.FeaturesFormat.auto,
                 input_roi_strategy: types.InputRoiStrategy = types.InputRoiStrategy.auto,
                 open_at_init: bool = None):
        self.__set_verbose(verbose_level)
        # config is loaded lazily through the `config` property
        self.__opu_config = None
        self.__config_file = config_file
        self.__config_override = config_override

        # Device init, or take the one passed as input
        if not opu_device:
            opu_type = self.config["type"]
            frametime = self.config["input"]["frametime_us"]
            exposure = self.config["output"]["exposure_us"]
            seq_nb_prelim = self.config.get("sequence_nb_prelim", 0)
            name = self.config["name"]
            self.device = OpuDevice(opu_type, frametime, exposure,
                                    seq_nb_prelim, None, verbose_level >= 2,
                                    name)
        else:
            if not isinstance(opu_device, (SimulatedOpuDevice, OpuDevice)):
                raise TypeError("opu_device must be of type {} or {}"
                                .format(SimulatedOpuDevice.__qualname__,
                                        OpuDevice.__qualname__))
            self.device = opu_device

        self.is_simulated = isinstance(self.device, SimulatedOpuDevice)
        default_n_by_pass = 3000
        if self.is_simulated:
            # simulated device: no booking, no hardware minimums
            self.__check_booking = False
            self._max_n_features = max_n_features
            self._min_batch_size = 0
            self._min_components = 0
            self._n_samples_by_pass = default_n_by_pass
            # build the random matrix if not done already
            self._resize_rnd_matrix(max_n_features, n_components)
            self._output_roi = output_roi.OutputRoi(
                self.device.output_shape_max, self.device.output_roi_strategy)
            self._input_roi = None
        else:
            # Check bookings only in production
            self.__check_booking = self.config.get("status") == "production"
            self._max_n_features = int(np.prod(self.device.input_shape))
            # Minimum number of samples for the input device
            self._min_batch_size = self.config["input"].\
                get("minimum_batch_size", 0)  # type: int
            # Minimum roi size for the output device
            self._min_components = self.config["output"]\
                .get("minimum_output_size", 0)  # type: int
            # number of samples passed at each iteration to the OPU
            self._n_samples_by_pass = self.config.\
                get("n_samples_by_pass", default_n_by_pass)  # type: int
            # Allowed ROI on the output device: None if non-existent
            allowed_roi = self.config["output"].get("allowed_roi")  # type: Roi
            self._output_roi = output_roi.OutputRoi(
                self.device.output_shape_max, self.device.output_roi_strategy,
                allowed_roi, self._min_components)
            self._input_roi = input_roi.InputRoi(self.device.input_shape,
                                                 self.config["ones_range"])

        # This also sets the output ROI
        self.n_components = n_components
        # By default, automatically choose the input ROI
        self.input_roi_strategy = input_roi_strategy
        self.features_fmt = features_fmt
        # number of samples to take in a batch when counting ones
        self._n_count = 1000
        self._projection_times = list()
        self._trace("OPU initialized")

        # Open at init, unless relevant host.json option is False
        if open_at_init is None:
            open_at_init = get_host_option("lightonopu_open_at_init", True)
        if open_at_init:
            self.open()

    def __set_verbose(self, verbose_level):
        """Record the verbosity level and install the trace function."""
        self.verbose = verbose_level
        # detect if trace function is available and activated
        if self.verbose >= 2:
            try:
                from lightonopu import logger
                self._trace = logger.trace
            except ImportError:
                self._trace = blank_fn
        else:
            self._trace = blank_fn
[docs] def open(self): """Acquires hardware resources used by the OPU device @see close() or use the context manager interface for closing at the end af an indent block """ if self.device.active: return # Check if user has booked the OPU # Rule is opu user doesn't need a time slot, so we just # check if it is booked by anyone if self.__check_booking and not self.is_simulated: if getpass.getuser() == "opu": if opu_bookings.opu_booked(): raise opu_bookings.BookingError("OPU currently booked") elif not opu_bookings.user_opu_booked(): opu_bookings.raise_user_error() self.device.open() self._trace("OPU opened")
[docs] def close(self): """Releases hardware resources used by the OPU device""" self.device.close() self._trace("OPU closed")
[docs] def transform1d(self, X, packed: bool = False, input_roi_: Roi = None, context: Optional[Context] = Context(), raw_output_size: bool = False): """Performs the nonlinear random projections of one 1d input vector, or a batch of 1d input vectors. The input data can be bit-packed, where n_features = 8*X.shape[-1] Otherwise n_features = X.shape[-1] If tqdm module is available, it is used for progress display Parameters ---------- X: np.ndarray or torch.Tensor a 1d input vector, or batch of 1d input_vectors, binary encoded, packed or not batch can be 1d or 2d. In all cases output.shape[:-1] = X.shape[:-1] packed: bool, optional whether the input data is in bit-packed representation defaults to False input_roi_: if provided, as (offset, size), will override the computation of the input ROI (advanced parameter) context: Context, optional will be filled with information about transform @see lightonopu.context.Context raw_output_size: bool, optional If True, don't cut output size at n_components (advanced parameter) Returns ------- Y: np.ndarray or torch.Tensor complete array of nonlinear random projections of X, of size self.n_components """ return self.__transform(X, packed, is_2d_features=False, n_2d_features=None, input_roi_=input_roi_, context=context, raw_output_size=raw_output_size)
[docs] def transform2d(self, X, packed: bool = False, n_2d_features=None, input_roi_: Roi = None, context: Optional[Context] = Context(), raw_output_size: bool = False): """Performs the nonlinear random projections of one 2d input vector, or a batch of 2d input vectors. If tqdm module is available, it is used for progress display Parameters ---------- X: np.ndarray or torch.Tensor a 2d input vector, or batch of 2d input_vectors, binary encoded, packed or not packed: bool, optional whether the input data is in bit-packed representation if True, each input vector is assumed to be a 1d array, and the "real" number of features must be provided as n_2d_features defaults to False n_2d_features: list, tuple or np.ndarray of length 2 If the input is bit-packed, specifies the shape of each input vector. Not needed if the input isn't bit-packed. input_roi_: if provided, as (offset, size), will override the computation of the input ROI (advanced parameter) context: Context, optional will be filled with information about transform @see lightonopu.context.Context raw_output_size: bool, optional If True, don't cut output size at n_components (advanced parameter) Returns ------- Y: np.ndarray or torch.Tensor complete array of nonlinear random projections of X, of size self.n_components """ return self.__transform(X, packed, is_2d_features=True, n_2d_features=n_2d_features, input_roi_=input_roi_, context=context, raw_output_size=raw_output_size)
    # noinspection PyUnresolvedReferences
    def __transform(self, X, packed: bool, is_2d_features: bool,
                    n_2d_features=None, input_roi_: Roi = None,
                    context: Optional[Context] = Context(),
                    raw_output_size=False):
        """Common implementation of transform1d/transform2d."""
        # NOTE(review): `context=Context()` is a mutable default shared by
        # every call that omits it — consider a None default; the public
        # transform1d/transform2d wrappers always pass a context in.
        # Anything that is entered in stack is automatically exited at
        # the end of the `with`statement. If nothing is entered, nothing is done.
        # As per https://stackoverflow.com/a/34798330/4892874
        user_input = OpuUserInput(X, packed, is_2d_features, n_2d_features)
        with ExitStack() as stack:
            tr_setup = self._TransformSetup(self, user_input, input_roi_,
                                            context, stack, raw_output_size)
            X_ = tr_setup.X
            indices = tr_setup.indices()
            # Check if the OPU is active already, otherwise enter its context
            if not self.device.active:
                stack.enter_context(self)
            tr_setup.pre_print()
            tr_setup.adjust_frametime(stack)
            t0 = time.time()
            Y = self.__transform_vectors(tr_setup)
            t1 = time.time()
            self._projection_times.append(t1 - t0)
            # Fill context from opu settings at the end of the transform
            context.from_opu(self.device, dt.datetime.fromtimestamp(t0),
                             dt.datetime.fromtimestamp(t1))
            user_input.reshape_output(Y)
            # if the input is a tensor, return a tensor in CPU memory
            if user_input.is_tensor:
                import torch
                return torch.from_numpy(Y)
            else:
                return Y

    def __transform_vectors(self, tr_setup):
        """Do the transform from the input and configuration of a TransformSetup"""
        nb_retries = 0
        # allocation of empty vector for iteration
        X_ = tr_setup.X
        indices = tr_setup.indices()
        # determine the output size of Y; you want the real one, use output ROI size
        if tr_setup.raw_output_size and not self.is_simulated:
            out_size = np.prod(self.device.output_roi[1])
        else:
            out_size = self.n_components
        # single or batch transform:
        if not tr_setup.user_input.is_batch:
            # with single transform no need to start acquisition
            # with self.device.acquiring
            Y, nb_retries = self.__single_transform(X_,
                                                    tr_setup.user_input.packed,
                                                    tr_setup.fmt_func, out_size)
        else:
            # Batch transform, allocate the result, progress bar,
            # and start acquisition
            # allocation of empty vector for iteration
            if self.is_simulated:
                Y = np.empty((tr_setup.n_samples, out_size), dtype=np.float32)
            else:
                Y = np.empty((tr_setup.n_samples, out_size), dtype=np.uint8)
            self._trace("Y allocated")
            with self.device.acquiring(), \
                    Progress(tr_setup.n_samples, self.verbose < 1,
                             "OPU: transform") as p:
                # iterate over consecutive pairs of indices
                # (https://stackoverflow.com/a/21303286)
                for i, (start, end) in enumerate(zip(indices, indices[1:])):
                    c = self.__batch_transform(X_[start:end], Y[start:end],
                                               tr_setup.user_input.packed,
                                               tr_setup.fmt_func, i)
                    p.update(end - start)
                    nb_retries += c
        if nb_retries:
            self._print("OPU number of retries: ", nb_retries)
        return Y

    def __enter__(self):
        """Context manager interface that acquires hardware resources
        used by the OPU device"""
        self.open()
        return self

    def __exit__(self, *args):
        self.close()

    @property
    def config(self):
        """Returns the internal configuration object"""
        # Load it when asked first time
        if not self.__opu_config:
            self.__opu_config = config.load_config(self.__config_file,
                                                   self._trace)
            if self.__config_override is not None:
                utils.recurse_update(self.__opu_config, self.__config_override)
        return self.__opu_config

    @property
    def max_n_components(self):
        # upper bound for n_components, derived from the output ROI
        return self._output_roi.max_components

    @property
    def n_components(self) -> int:
        return self._n_components

    @property
    def n_samples_by_pass(self) -> int:
        return self._n_samples_by_pass

    @n_samples_by_pass.setter
    def n_samples_by_pass(self, value: int):
        self._n_samples_by_pass = value
        # ask the device to reserve buffers for the new batch size
        self.device.reserve(self._n_samples_by_pass)

    @n_components.setter
    def n_components(self, value: int):
        self.device.output_roi = self._output_roi.compute_roi(value)
        if self.is_simulated:
            self._resize_rnd_matrix(self.max_n_features, value)
        # Ask opu device to reserve correct amount of n_samples and cam_ROI
        self.device.reserve(self._n_samples_by_pass)
        self._n_components = value

    @property
    def max_n_features(self) -> int:
        return self._max_n_features

    @max_n_features.setter
    def max_n_features(self, value: int):
        # resizable only with a simulated device (it owns the random matrix)
        if self.is_simulated:
            self._resize_rnd_matrix(value, self._n_components)
        else:
            raise AttributeError("max_n_feature can't be set if device is real")
        self._max_n_features = value

    def __single_transform(self, ins: np.ndarray, packed: bool, fmt_func,
                           out_size):
        """
        Format and transform a single vector.
        ins is 1d or 2d (you can make a single transform of a 2d vector
        if it matches the input size)
        Returns tuple: output vector, number of retries
        """
        if self.is_simulated:
            if packed:
                ins = np.unpackbits(ins)
            # when using simulated device, no formatting
            output, n_retries = self.device.transform_single(ins), 0
        else:
            X_format = np.ones(self.device.input_size, dtype=np.uint8)
            # do the formatting
            fmt_func(ins, X_format, packed)
            output, n_retries = self.__try_or_giveup(
                self.device.transform_single, X_format)
        # reshape output to match n_components
        return output[:out_size], n_retries

    def __batch_transform(self, ins: np.ndarray, output: np.ndarray,
                          packed: bool, fmt_func, batch_index):
        """Format and transform a single batch of encoded vectors"""
        batch_size = ins.shape[0]
        nb_retries = 0
        if self.is_simulated:
            if packed:
                ins = np.unpackbits(ins, axis=1)
            # when using simulated device, no formatting
            self.device.transform2(ins, output)
        else:
            # allocate X_format, which receives the input formatted
            # for input device
            X_format = np.ones((batch_size, self.device.input_size),
                               dtype=np.uint8)
            # do the formatting
            fmt_func(ins, X_format, packed)
            if batch_size <= self._min_batch_size:
                # if batch size is too small, pad it with empty vectors
                padding = ((0, self._min_batch_size - batch_size), (0, 0))
                X_padded = np.pad(X_format, padding, 'constant',
                                  constant_values=0)
            else:
                X_padded = X_format
            # OPU transform, try it up to 5 times if it fails
            _, nb_retries = self.__try_or_giveup(self.device.transform2,
                                                 X_padded, output, batch_index)
        return nb_retries

    def __try_or_giveup(self, func, *args, **kwargs):
        # in verbose mode, print the exception
        return utils.try_or_giveup(func, 5, self.verbose >= 2,
                                   *args, **kwargs)

    def _count_ones(self, X: np.ndarray, packed):
        """count avg number of ones in binary array
        X: 2D ndarray
        """
        return utils.count_ones(X, self._n_count, packed)

    def _resize_rnd_matrix(self, n_features: int, n_components: int):
        """Resize device's random matrix"""
        rnd_mat = self.device.random_matrix
        # rebuild only when the current matrix is absent or the wrong shape
        if rnd_mat is None or rnd_mat.shape != (n_features, n_components):
            self._print("OPU: computing the random matrix... ", end='',
                        flush=True)
            self.device.build_random_matrix(n_features, n_components)
            self._print("OK")

    def _print(self, *args, **kwargs):
        # honored only at verbose_level >= 1
        if self.verbose >= 1:
            print(*args, **kwargs)
[docs] def version(self): """Returns a multi-line string containing name and versions of the OPU""" from lightonopu import __version__ as lgversion version = [] # Build OPU name opu_name = self.__opu_config['name'] opu_version = self.__opu_config['version'] opu_location = self.__opu_config['location'] version.append('OPU ' + opu_name+'-'+opu_version+'-'+opu_location) # module version version.append("lightonopu version " + lgversion) version.append(self.device.versions()) return '\n'.join(version)
    def transform(self, *args, **kwargs):
        # Removed entry point, kept to direct users to the new API
        raise RuntimeError("Please now use transform1d or transform2d")

    # noinspection PyProtectedMember
    class _TransformSetup:
        """Internal class for use with OPU transform
        This class is short-lived for each transform"""

        def __init__(self, opu, user_input: OpuUserInput,
                     input_roi_: Optional[Roi], context: Context,
                     stack: ExitStack, raw_output_size):
            """Prepare for transform, see prepare function
            Stack is contextlib.ExitStack of the calling OPU object
            """
            self.raw_output_size = raw_output_size
            self.context = context
            self.manual_input_roi = input_roi_
            self.user_input = user_input
            self.opu = opu
            self.device = opu.device
            # Currently only Model1 is supported (column order)
            # Instantiate at each TransformSetup because of issue #36
            self.formatter = model1_formatter(self.device.input_shape,
                                              opu._trace)
            # Whether input's n_feature matches the max number of features
            # If yes, no need to format
            self.input_matches_max = \
                self.user_input.n_features_s == self.opu.max_n_features
            # Whether input ROI should be computed automatically
            self.auto_input_roi = \
                self.manual_input_roi is None \
                and self.opu.input_roi_strategy == types.InputRoiStrategy.auto
            # Whether we should format the input before transform
            self.needs_fmt = not self.input_matches_max\
                and not self.opu.is_simulated
            self._print = self.opu._print
            self.initial_check()
            self.prepare(stack)

        def initial_check(self):
            # reject inputs wider than the input device can display
            if self.user_input.n_features_s > self.opu.max_n_features:
                raise ValueError("input's number of features ({})"
                                 " can't be greater than {}"
                                 .format(self.user_input.n_features_s,
                                         self.opu.max_n_features))

        # noinspection PyAttributeOutsideInit
        def prepare(self, stack: ExitStack):
            """Prepares for transform
             * reshape input if needed,
             * count number of ones
             * compute input ROI
             * configure formatting
             * adjust exposure
            Stack is contextlib.ExitStack of the calling OPU object
            """
            # X_ is a potentially reshaped X, if input is 3D
            # X will be reshaped back at context exit
            # reshape in all cases except when input matches
            # input device's size (and no simulation)
            # and 2d unpacked
            leave_2d_features = self.input_matches_max and \
                self.user_input.is_2d_features and \
                not self.user_input.packed and \
                not self.opu.is_simulated
            self.X = stack.enter_context(
                self.user_input.reshape_input(leave_2d_features))

            # Check if data is binary, unless OPU is simulated,
            # which is an allowed case (issue #41)
            if not self.opu.is_simulated:
                self.user_input.binary_check()
            self.n_samples = len(self.user_input)

            # compute input ROI and get format function
            if self.needs_fmt:
                n_ones = None  # average number of ones in the input
                if self.auto_input_roi:
                    # with automatic input ROI, the number of ones is needed
                    n_ones = self.opu._count_ones(self.X,
                                                  self.user_input.packed)
                    self._print("OPU: counted an average of"
                                " {:.2f} ones in input of size {} ({:.2f}%)."
                                .format(n_ones, self.user_input.n_features_s,
                                        100 * n_ones / self.user_input.n_features_s))
                # checked_ones says whether the input has sufficient ones ratio
                # can be None if not automatic input_roi
                checked_ones = self._input_fmt_func(n_ones)
                self.fmt_func = self.formatter.apply
                if self.auto_input_roi and checked_ones > 1:
                    # check_ones > 1 means too much ones on the input device
                    # we have to lower exposure in order not to
                    # saturate excessively
                    previous = self.device.exposure_us
                    self.device.exposure_us /= checked_ones
                    # restore exposure will be called at exit
                    stack.callback(lambda: setattr(self.device,
                                                   "exposure_us", previous))
            else:
                self.context.fmt_factor = 0
                self.context.fmt_type = types.FeaturesFormat.none
                self.context.input_roi_upper = [0, 0]
                self.context.input_roi_shape = self.device.input_shape
                self.context.n_ones = None
                # no need to format input, just setup the "formatting" function
                self.fmt_func = self.formatter.apply_plain

        def pre_print(self):
            self._print('OPU: random projections of an array of'
                        ' size {}'.format(self.X.shape))
            self._print("OPU: using frametime {} μs, exposure {} μs, "
                        "output ROI {}".format(self.device.frametime_us,
                                               self.device.exposure_us,
                                               self.device.output_roi))

        def adjust_frametime(self, stack: ExitStack):
            """frametime can be lower than device readout in large ROI"""
            # Changing ROI can raise the minimum frame-time
            min_frametime_us = self.device.output_readout_us + 10
            if self.device.frametime_us < min_frametime_us:
                previous = self.device.frametime_us
                self.device.frametime_us = min_frametime_us
                # restore frametime will be called at exit
                stack.callback(lambda: setattr(self.device,
                                               "frametime_us", previous))

        def _input_fmt_func(self, n_ones) -> float:
            """Compute input ROI if necessary, and initialize formatting function
            Returns
            -------
            checked_ones: float or None (only if in auto mode)
                * -1 if not enough ones
                * 0 if enough
                * ratio over the maximum allowed, if too much (will saturate)
            """
            # filled in-place by compute_roi with "value" and "total" keys
            checked_ones = {}
            # Get appropriate input ROI
            # This also does sanity checks for n_features
            offset, size = self.manual_input_roi or \
                self.opu._input_roi.compute_roi(self.opu.input_roi_strategy,
                                                self.user_input.n_features,
                                                n_ones, checked_ones)
            if self.opu.features_fmt == types.FeaturesFormat.auto:
                if self.user_input.is_2d_features:
                    fmt_type = types.FeaturesFormat.macro_2d
                else:
                    fmt_type = types.FeaturesFormat.lined
            else:
                fmt_type = self.opu.features_fmt
            # configure formatting (computes the mapping btw features
            # and input device)
            self.formatter.configure(self.user_input.n_features, fmt_type,
                                     offset, size)
            factor = self.formatter.factor
            self._print("OPU: formatting {} with elements size {}. "
                        "input ROI: {}, {} "
                        .format(fmt_type.name, factor, offset, size))
            self.context.input_roi_upper = offset
            self.context.input_roi_shape = size
            self.context.n_ones = checked_ones.get("total")
            self.context.fmt_type = fmt_type
            self.context.fmt_factor = factor
            # checked_ones may be an empty dict, in this case return None
            return checked_ones.get("value")

        def indices(self) -> np.ndarray:
            # Divide indices in batches of n_samples_by_pass
            return self._get_batch_indices(self.n_samples,
                                           self.opu._n_samples_by_pass)

        @staticmethod
        def _get_batch_indices(total_size: int, slice_size: int) -> np.ndarray:
            """Given total_size, return an array with intermediate slices
            e.g. 55 with a slice size of 10 gives
            [0, 10, 20, 30, 40, 50, 55]
            """
            (nb_runs, remainder) = divmod(total_size, slice_size)
            indices = np.arange(nb_runs + 1) * slice_size
            if remainder:
                indices = np.append(indices, total_size)
            return indices