# Source code for lightonopu.opu

"""
This module contains the OPU class, the main class of the library
"""

from typing import Optional, Union
import numpy as np
import json
import getpass
import time
from contextlib import ExitStack
from pathlib import Path

from lightonopu import camera_roi, dmd_roi, types, input, utils, opu_bookings
from lightonopu.simulated_device import SimulatedOpuDevice
from lightonopu.device import OpuDevice
from lightonopu.context import Context
from lightonopu.progress import Progress
from lightonopu.formatting import OpuFormatter


# noinspection PyPep8Naming,PyPackageRequirements
from lightonopu.types import Roi


# noinspection PyPep8Naming
class OPU:
    """Interface to the OPU.

    .. math:: \\mathbf{y} = \\lvert \\mathbf{R} \\mathbf{x} \\rvert^2

    Main methods are :obj:`transform1d` and :obj:`transform2d`,
    and accept NumPy arrays or PyTorch tensors.

    OPU offers a context-manager interface for acquiring hardware device
    resources.

    Parameters
    ----------
    n_components : int,
        dimensionality of the target projection space.
    opu_device : OpuDevice or SimulatedOpuDevice, optional
        optical processing unit instance linked to a physical or simulated
        device. If not provided, a device is properly instantiated.
        If opu_device is of type SimulatedOpuDevice, the random matrix is
        generated at __init__, using max_n_features and n_components
    max_n_features: int, optional
        maximum number of binary features that the OPU will transform
        used only if opu_device is a SimulatedOpuDevice, in order to
        initiate the random matrix
    config_file : str, optional
        path to the configuration file (for dev purpose)
    config_override: dict, optional
        for override of the config_file (for dev purpose)
    verbose_level: int, optional
        0, 1 or 2. 0 = no messages, 1 = most messages, and
        2 = messages from OPU device (very verbose).
    features_fmt: types.FeaturesFormat, optional
        describes how data is formatted on the DMD
        can be "auto", "lined" or "macro_pixels", or "none"
        If "none", the transform input must be of the same size as
        device.input_size
    dmd_strategy: types.DmdRoiStrategy, optional
        describes how to display the features on the DMD
        @see types.DmdRoiStrategy

    Attributes
    ----------
    n_components: int
        dimensionality of the target projection space.
    max_n_features: int
        maximum number of binary features that the OPU will transform
        writeable only if opu_device is a SimulatedOpuDevice, in order to
        initiate or resize the random matrix
    device: OpuDevice or SimulatedOpuDevice
        underlying hardware that performs transformation (read-only)
    features_fmt: types.FeaturesFormat, optional
        describes how data is formatted on the DMD
    dmd_strategy: types.DmdRoiStrategy, optional
        describes how to display the features on the DMD
    verbose_level: int, optional
        0, 1 or 2. 0 = no messages, 1 = most messages, and 2 = messages
    """

    def __init__(self, n_components: int = 200000,
                 opu_device: Optional[Union[OpuDevice, SimulatedOpuDevice]] = None,
                 max_n_features: int = 1000,
                 config_file: str = "/etc/lighton/opu.json",
                 config_override: Optional[dict] = None,
                 verbose_level: int = 0,
                 features_fmt: types.FeaturesFormat = types.FeaturesFormat.auto,
                 dmd_strategy: types.DmdRoiStrategy = types.DmdRoiStrategy.auto):
        self.__opu_config = json.loads(Path(config_file).read_text())
        if config_override is not None:
            utils.recurse_update(self.__opu_config, config_override)
        self.verbose = verbose_level
        # Check bookings only in production
        self.__check_booking = self.__opu_config.get("status") == "production"
        # Device init, or take the one passed as input
        if not opu_device:
            opu_type = self.__opu_config["type"]
            frametime = self.__opu_config["dmd"]["frametime_us"]
            exposure = self.__opu_config["camera"]["exposure_us"]
            seq_nb_prelim = self.__opu_config.get("sequence_nb_prelim", 0)
            self.device = OpuDevice(opu_type, frametime, exposure,
                                    seq_nb_prelim, None, verbose_level >= 2)
        else:
            if not isinstance(opu_device, (SimulatedOpuDevice, OpuDevice)):
                # BUGFIX: use the classes' own __qualname__;
                # type(cls).__qualname__ is the metaclass's name ("type"),
                # which made the error message useless
                raise TypeError("opu_device must be of type {} or {}"
                                .format(SimulatedOpuDevice.__qualname__,
                                        OpuDevice.__qualname__))
            self.device = opu_device
        self.is_simulated = isinstance(self.device, SimulatedOpuDevice)
        if self.is_simulated:
            self._max_n_features = max_n_features
            # build the random matrix if not done already
            self._resize_rnd_matrix(max_n_features, n_components)
        else:
            self._max_n_features = int(np.prod(self.device.dmd_shape))
        # Allowed ROI on the camera: None if non-existent
        self._allowed_roi = self.__opu_config["camera"]\
            .get("allowed_roi")  # type: Roi
        # Minimum number of samples for the DMD
        self._min_batch_size = self.__opu_config["dmd"]\
            .get("minimum_batch_size", 0)  # type: int
        # Minimum roi size for the camera
        self._min_components = self.__opu_config["camera"]\
            .get("minimum_output_size", 0)  # type: int
        # number of samples passed at each iteration to the OPU
        self._n_samples_by_pass = self.__opu_config\
            .get("n_samples_by_pass", 3000)  # type: int
        self._cam_roi = camera_roi.CameraRoi(self.device.cam_shape_max,
                                             self.device.cam_roi_strategy,
                                             self._allowed_roi,
                                             self._min_components)
        # This also sets the camera ROI
        self.n_components = n_components
        self._dmd_roi = dmd_roi.DmdRoi(self.device.dmd_shape,
                                       self.__opu_config["ones_range"])
        # By default, automatically choose the DMD ROI
        self.dmd_strategy = dmd_strategy
        self.features_fmt = features_fmt
        # number of samples to take in a batch when counting ones
        self._n_count = 1000
        self._projection_times = list()
    def open(self):
        """Acquires hardware resources used by the OPU device

        @see close() or use the context manager interface for
        closing at the end of an indent block
        """
        # Already open: nothing to do
        if self.device.active:
            return
        # Check if user has booked the OPU
        # Rule is opu user doesn't need a time slot, so we just
        # check if it is booked by anyone
        # (booking is only enforced in production and on real hardware)
        if self.__check_booking and not self.is_simulated:
            if getpass.getuser() == "opu":
                # shared "opu" account: refuse if anyone holds a booking
                if opu_bookings.opu_booked():
                    raise opu_bookings.BookingError("OPU currently booked")
            elif not opu_bookings.user_opu_booked():
                # regular user without a valid booking of their own
                opu_bookings.raise_user_error()
        self.device.open()
[docs] def close(self): """Releases hardware resources used by the OPU device""" self.device.close()
[docs] def transform1d(self, X, packed: bool = False, dmd_roi_: Roi = None, context: Optional[Context] = Context()): """Performs the nonlinear random projections of 1d input vector(s). The input data can be bit-packed, where n_features = 8*X.shape[-1] Otherwise n_features = X.shape[-1] If tqdm module is available, it is used for progress display Parameters ---------- X: np.ndarray or torch.Tensor a 1d input vector, or batch of 1d input_vectors, binary encoded, packed or not batch can be 1d or 2d. In all cases output.shape[:-1] = X.shape[:-1] packed: bool, optional whether the input data is in bit-packed representation defaults to False dmd_roi_: if provided, as (offset, size), will override the computation of the dmd ROI (advanced parameter) context: Context, optional will be filled with information about transform @see lightonopu.context.Context Returns ------- Y: np.ndarray or torch.Tensor complete array of nonlinear random projections of X, of size self.n_components """ return self.__transform(X, packed, is_2d_features=False, n_2d_features=None, dmd_roi_=dmd_roi_, context=context)
[docs] def transform2d(self, X, packed: bool = False, n_2d_features=None, dmd_roi_: Roi = None, context: Optional[Context] = Context()): """Performs the nonlinear random projections of 2d input vector(s). If tqdm module is available, it is used for progress display Parameters ---------- X: np.ndarray or torch.Tensor a 2d input vector, or batch of 2d input_vectors, binary encoded, packed or not packed: bool, optional whether the input data is in bit-packed representation if True, each input vector is assumed to be a 1d array, and the "real" number of features must be provided as n_2d_features defaults to False n_2d_features: list, tuple or np.ndarray of length 2 If the input is packed dmd_roi_: if provided, as (offset, size), will override the computation of the dmd ROI (advanced parameter) context: Context, optional will be filled with information about transform @see lightonopu.context.Context Returns ------- Y: np.ndarray or torch.Tensor complete array of nonlinear random projections of X, of size self.n_components """ return self.__transform(X, packed, is_2d_features=True, n_2d_features=n_2d_features, dmd_roi_=dmd_roi_, context=context)
# noinspection PyUnresolvedReferences def __transform(self, X, packed: bool, is_2d_features: bool, n_2d_features=None, dmd_roi_: Roi = None, context: Optional[Context] = Context()): # Anything that is entered in stack is automatically exited at # the end of the `with`statement. If nothing is entered, nothing is done. # As per https://stackoverflow.com/a/34798330/4892874 opu_input = input.OpuInput(X, packed, is_2d_features, n_2d_features) with ExitStack() as stack: tr_setup = self._TransformSetup(self, opu_input, dmd_roi_, context, stack) # allocation of empty vector for iteration Y = np.empty((tr_setup.n_samples, self.n_components), dtype=np.uint8) X_ = tr_setup.X indices = tr_setup.indices() # Check if the OPU is active already, otherwise enter its context if not self.device.active: stack.enter_context(self) tr_setup.pre_print() tr_setup.adjust_frametime(stack) stack.enter_context(self.device.acquiring()) t0 = time.time() nb_retries = 0 with Progress(tr_setup.n_samples, self.verbose < 1, "OPU: transform") as p: # iterate over consecutive pairs of indices # (https://stackoverflow.com/a/21303286) for i, (start, end) in enumerate(zip(indices, indices[1:])): c = self.batch_transform(X_[start:end], Y[start:end], packed, tr_setup.fmt_func, i) p.update(end-start) nb_retries += c if nb_retries: self._print("OPU number of retries: ", nb_retries) t1 = time.time() self._projection_times.append(t1 - t0) # Fill context from opu settings at the end of the transform context.from_opu(self.device, t0, t1) opu_input.reshape_output(Y) # if the input is a tensor, return a tensor in CPU memory if opu_input.is_tensor: import torch return torch.from_numpy(Y) else: return Y def __enter__(self): """Context manager interface that acquires hardware resources used by the OPU device""" self.open() return self def __exit__(self, *args): self.close() @property def max_n_components(self): return self._cam_roi.max_components @property def n_components(self) -> int: return self._n_components 
    @property
    def n_samples_by_pass(self) -> int:
        # Number of sample vectors sent to the device in one transform pass
        return self._n_samples_by_pass

    @n_samples_by_pass.setter
    def n_samples_by_pass(self, value: int):
        self._n_samples_by_pass = value
        # keep the device's buffer reservation in sync with the new batch size
        self.device.reserve(self._n_samples_by_pass)

    @n_components.setter
    def n_components(self, value: int):
        # Recompute and apply the camera ROI for the requested output size
        self.device.cam_ROI = self._cam_roi.compute_roi(value)
        if self.is_simulated:
            self._resize_rnd_matrix(self.max_n_features, value)
        # Ask opu device to reserve correct amount of n_samples and cam_ROI
        self.device.reserve(self._n_samples_by_pass)
        self._n_components = value

    @property
    def max_n_features(self) -> int:
        return self._max_n_features

    @max_n_features.setter
    def max_n_features(self, value: int):
        # Writeable only on a simulated device, where it resizes the
        # random matrix; a real device's feature count is fixed by the DMD
        if self.is_simulated:
            self._resize_rnd_matrix(value, self._n_components)
        else:
            raise AttributeError("max_n_feature can't be set if device is real")
        self._max_n_features = value
    def batch_transform(self, ins: np.ndarray, output: np.ndarray,
                        packed: bool, fmt_func, batch_index):
        """Format and transform a single batch of encoded vectors

        Writes the projections into `output` in place and returns the
        number of retries performed on device failure.
        """
        batch_size = ins.shape[0]
        nb_retries = 0
        if self.is_simulated:
            if packed:
                ins = np.unpackbits(ins, axis=1)
            # when using simulated device, no formatting and no bitpack
            self.device.transform2(ins, output)
        else:
            # allocate X_format
            X_format = np.ones((batch_size, self.device.input_size),
                               dtype=np.uint8)
            # do the formatting
            fmt_func(ins, X_format, packed)
            if batch_size <= self._min_batch_size:
                # if batch size is too small, pad it with empty vectors
                padding = ((0, self._min_batch_size - batch_size), (0, 0))
                X_padded = np.pad(X_format, padding, 'constant',
                                  constant_values=0)
            else:
                X_padded = X_format
            # OPU transform, try it up to 5 times if it fails
            # in verbose mode, print the exception
            Y, nb_retries = utils.try_or_giveup(self.device.transform2, 5,
                                                self.verbose >= 2,
                                                X_padded, output, batch_index)
        return nb_retries
def _count_ones(self, X: np.ndarray, packed): """count avg number of ones in binary array X: 2D ndarray """ return utils.count_ones(X, self._n_count, packed) def _resize_rnd_matrix(self, n_features: int, n_components: int): """Resize device's random matrix""" rnd_mat = self.device.random_matrix if rnd_mat is None or rnd_mat.shape != (n_features, n_components): self._print("OPU: computing the random matrix... ", end='', flush=True) self.device.build_random_matrix(n_features, n_components) self._print("OK") def _print(self, *args, **kwargs): if self.verbose >= 1: print(*args, **kwargs)
[docs] def version(self): """Returns a multi-line string containing name and versions of the OPU""" from lightonopu import __version__ as lgversion version = [] # Build OPU name opu_name = self.__opu_config['name'] opu_version = self.__opu_config['version'] opu_location = self.__opu_config['location'] version.append('OPU ' + opu_name+'-'+opu_version+'-'+opu_location) # module version version.append("lightonopu version " + lgversion) version.append(self.device.versions()) return '\n'.join(version)
    def transform(self, *args, **kwargs):
        # Legacy entry point kept only to redirect users to the new API
        raise RuntimeError("Please now use transform1d or transform2d")

    # noinspection PyProtectedMember
    class _TransformSetup:
        """Internal class for use with OPU transform
        This class is short-lived for each transform"""

        def __init__(self, opu, opu_input: input.OpuInput,
                     dmd_roi_: Optional[Roi], context: Context,
                     stack: ExitStack):
            """Prepare for transform, see prepare function
            Stack is contextlib.ExitStack of the calling OPU object
            """
            self.context = context
            self.manual_dmd_roi = dmd_roi_
            self.input = opu_input
            self.opu = opu
            self.device = opu.device
            # Currently only Ajile is supported (column order)
            self.formatter = OpuFormatter(self.device.dmd_shape,
                                          column_order=True)
            # Whether input's n_feature matches the max number of features
            # If yes, no need to format
            self.input_matches_max = \
                self.input.n_features_s == self.opu.max_n_features
            # Whether dmd roi should be computed automatically
            self.auto_dmd_roi = self.manual_dmd_roi is None \
                and self.opu.dmd_strategy == types.DmdRoiStrategy.auto
            # Whether we should format the input before transform
            self.needs_fmt = not self.input_matches_max \
                and not self.opu.is_simulated
            self._print = self.opu._print
            self.initial_check()
            self.prepare(stack)

        def initial_check(self):
            # Reject inputs wider than what the DMD (or simulated matrix)
            # can accept
            if self.input.n_features_s > self.opu.max_n_features:
                raise ValueError("input's number of features ({})"
                                 " can't be greater than {}"
                                 .format(self.input.n_features_s,
                                         self.opu.max_n_features))

        # noinspection PyAttributeOutsideInit
        def prepare(self, stack: ExitStack):
            """Prepares for transform
            * reshape input if needed,
            * count number of ones
            * compute DMD roi
            * configure formatting
            * adjust exposure
            Stack is contextlib.ExitStack of the calling OPU object
            """
            # X_ is a potentially reshaped X, if input is 3D
            # X will be reshaped back at context exit
            # reshape in all cases except when input matches dmd size
            # (and no simulation)
            no_x_reshape = self.input_matches_max \
                and not self.opu.is_simulated
            self.X = stack.enter_context(
                self.input.reshape_input(no_x_reshape))
            self.input.binary_check()
            self.n_samples = self.X.shape[0]
            # compute dmd ROI and get format function
            if self.needs_fmt:
                n_ones = None  # average number of ones in the input
                if self.auto_dmd_roi:
                    # with automatic dmd roi, the number of ones is needed
                    n_ones = self.opu._count_ones(self.X, self.input.packed)
                    self._print("OPU: counted an average of"
                                " {:.2f} ones in input of size {} ({:.2f}%)."
                                .format(n_ones, self.input.n_features_s,
                                        100 * n_ones / self.input.n_features_s))
                # checked_ones says whether the input has sufficient ones ratio
                # can be None if not automatic dmd_roi
                checked_ones = self._dmd_fmt_func(n_ones)
                self.fmt_func = self.formatter.apply
                if self.auto_dmd_roi and checked_ones > 1:
                    # check_ones > 1 means too much ones on the dmd
                    # we have to lower exposure in order not to
                    # saturate excessively
                    previous = self.device.exposure_us
                    self.device.exposure_us /= checked_ones
                    # restore exposure will be called at exit
                    stack.callback(lambda: setattr(self.device,
                                                   "exposure_us", previous))
            else:
                # no formatting: record neutral values in the context
                self.context.fmt_factor = 0
                self.context.fmt_type = types.FeaturesFormat.none
                self.context.dmd_roi_upper = [0, 0]
                self.context.dmd_roi_shape = self.device.dmd_shape
                self.context.n_ones = None
                # no need to format input, just setup the "formatting" function
                self.fmt_func = self.formatter.apply_plain

        def pre_print(self):
            # Verbose summary printed before the transform starts
            self._print('OPU: random projections of an array of'
                        ' size ({},{})'.format(*self.X.shape))
            self._print("OPU: using frametime {} μs, exposure {} μs, "
                        "camera ROI {}".format(self.device.frametime_us,
                                               self.device.exposure_us,
                                               self.device.cam_ROI))

        def adjust_frametime(self, stack: ExitStack):
            """frametime can be lower camera readout in large ROI"""
            # Changing ROI can raise the minimum frame-time
            min_frametime_us = self.device.cam_readout_us + 10
            if self.device.frametime_us < min_frametime_us:
                previous = self.device.frametime_us
                self.device.frametime_us = min_frametime_us
                # restore frametime will be called at exit
                stack.callback(lambda: setattr(self.device, "frametime_us",
                                               previous))

        def _dmd_fmt_func(self, n_ones) -> float:
            """Compute dmd roi if necessary, and initialize formatting function

            Returns
            -------
            checked_ones: float or None (only if in auto mode)
                * -1 if not enough ones
                * 0 if enough
                * ratio over the maximum allowed, if too much (will saturate)
            """
            checked_ones = {}
            # Get appropriate dmd ROI. This also does sanity checks
            # for n_features
            offset, size = self.manual_dmd_roi or \
                self.opu._dmd_roi.compute_roi(self.opu.dmd_strategy,
                                              self.input.n_features,
                                              n_ones, checked_ones)
            if self.opu.features_fmt == types.FeaturesFormat.auto:
                # pick the natural layout for the input's dimensionality
                if self.input.is_2d_features:
                    fmt_type = types.FeaturesFormat.macro_pixels
                else:
                    fmt_type = types.FeaturesFormat.lined
            else:
                fmt_type = self.opu.features_fmt
            # configure formatting (computes the mapping btw features and dmd)
            self.formatter.configure(self.input.n_features, fmt_type,
                                     offset, size)
            factor = self.formatter.factor
            self._print("OPU: formatting {} with pixel size {}. DMD ROI: {}, {} "
                        .format(fmt_type.name, factor, offset, size))
            self.context.dmd_roi_upper = offset
            self.context.dmd_roi_shape = size
            self.context.n_ones = checked_ones.get("total")
            self.context.fmt_type = fmt_type
            self.context.fmt_factor = factor
            # checked_ones may be an empty dict, in this case return None
            return checked_ones.get("value")

        def indices(self) -> np.ndarray:
            # Divide indices in batches of n_samples_by_pass
            return self._get_batch_indices(self.n_samples,
                                           self.opu._n_samples_by_pass)

        @staticmethod
        def _get_batch_indices(total_size: int, slice_size: int) -> np.ndarray:
            """Given total_size, return an array with intermediate slices
            e.g. 55 with a slice size of 10 gives [0, 10, 20, 30, 40, 50, 55]
            """
            (nb_runs, remainder) = divmod(total_size, slice_size)
            indices = np.arange(nb_runs + 1) * slice_size
            if remainder:
                indices = np.append(indices, total_size)
            return indices