Source code for lightonopu.opu

# Copyright (c) 2020 LightOn, All Rights Reserved.
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.

"""
This module contains the OPU class, the main class of the library
"""
import warnings
from typing import Optional, Union, Tuple
import numpy as np
import getpass
from contextlib import ExitStack, contextmanager
import attr

from lightonopu import output_roi, types, utils, config
from lightonopu.config import get_host_option
from lightonopu.user_input import OpuUserInput
from lightonopu.internal import opu_bookings
from lightonopu.simulated_device import SimulatedOpuDevice
from lightonopu.device import OpuDevice
from lightonopu.context import Context
from lightonopu.settings import OpuSettings, TransformSettings
from lightonopu.runner import TransformRunner

# noinspection PyPep8Naming,PyPackageRequirements
from lightonopu.types import Roi, InputRoiStrategy


# noinspection PyPep8Naming
[docs]class OPU: """Interface to the OPU. .. math:: \\mathbf{y} = \\lvert \\mathbf{R} \\mathbf{x} \\rvert^2 Main methods are :obj:`transform1d` and :obj:`transform2d`, and accept NumPy arrays or PyTorch tensors. Acquiring/releasing hardware device resources is done by open/close and a context-manager interface. Unless `open_at_init=False`, these resources are acquired automatically at init. If another process has not released the resources, an error will be raised, call `close()` or shutdown on the OPU object to release it. Parameters ---------- n_components : int, dimensionality of the target projection space. opu_device : OpuDevice or SimulatedOpuDevice, optional optical processing unit instance linked to a physical or simulated device. If not provided, a device is properly instantiated. If opu_device is of type SimulatedOpuDevice, the random matrix is generated at __init__, using max_n_features and n_components max_n_features: int, optional maximum number of binary features that the OPU will transform used only if opu_device is a SimulatedOpuDevice, in order to initiate the random matrix config_file : str, optional path to the configuration file (for dev purpose) config_override: dict, optional for override of the config_file (for dev purpose) verbose_level: int, optional 0, 1 or 2. 0 = no messages, 1 = most messages, and 2 = messages from OPU device (very verbose). input_roi_strategy: types.InputRoiStrategy, optional describes how to display the features on the input device @see types.InputRoiStrategy open_at_init: bool, optional forces the setting of acquiring hardware resource at init. If not provided, follow system's setting (usually True) Attributes ---------- n_components: int dimensionality of the target projection space. max_n_features: int maximum number of binary features that the OPU will transform writeable only if opu_device is a SimulatedOpuDevice, in order to initiate or resize the random matrix device: OpuDevice or SimulatedOpuDevice underlying hardware that performs transformation (read-only) input_roi_strategy: types.InputRoiStrategy, optional describes how to display the features on the input device verbose_level: int, optional 0, 1 or 2. 0 = no messages, 1 = most messages, and 2 = messages """ def __init__(self, n_components: int = 200000, opu_device: Optional[Union[OpuDevice, SimulatedOpuDevice]] = None, max_n_features: int = 1000, config_file: str = "", config_override: dict = None, verbose_level: int = 0, input_roi_strategy: types.InputRoiStrategy = types.InputRoiStrategy.auto, open_at_init: bool = None): self.__opu_config = None self.__config_file = config_file self.__config_override = config_override self._max_n_features = max_n_features # Get trace and print functions if verbose_level: warnings.warn("Verbose level arg will removed in 1.3, " "Use lightonopu.set_verbose_level instead", DeprecationWarning) from lightonopu import set_verbose_level, get_trace_fn, get_print_fn set_verbose_level(verbose_level) self._trace = get_trace_fn() self._print = get_print_fn() # Device init, or take the one passed as input if not opu_device: opu_type = self.config["type"] frametime = self.config["input"]["frametime_us"] exposure = self.config["output"]["exposure_us"] seq_nb_prelim = self.config.get("sequence_nb_prelim", 0) name = self.config["name"] self.device = OpuDevice(opu_type, frametime, exposure, seq_nb_prelim, None, verbose_level >= 2, name) else: if not isinstance(opu_device, (SimulatedOpuDevice, OpuDevice)): raise TypeError("opu_device must be of type {} or {}" .format(SimulatedOpuDevice.__qualname__, OpuDevice.__qualname__)) self.device = opu_device if self._s.simulated: # build the random matrix if not done already self._resize_rnd_matrix(max_n_features, n_components) self._output_roi = output_roi.OutputRoi(self.device.output_shape_max, self.device.output_roi_strategy, self._s.allowed_roi, self._s.min_n_components) # This also sets the output ROI self.n_components = n_components self.input_roi_strategy = input_roi_strategy self._projection_times = list() # Runner for online mode, initialized when entering online context self._online_runner = None # type: Optional[TransformRunner] self._trace("OPU initialized") # Open at init, unless relevant host.json option is False if open_at_init is None: open_at_init = get_host_option("lightonopu_open_at_init", True) if open_at_init: self.open()
[docs] def transform1d(self, X, packed: bool = False, input_roi_: Roi = None, context: Optional[Context] = Context(), raw_output_size: bool = False): """Performs the nonlinear random projections of one 1d input vector, or a batch of 1d input vectors. The input data can be bit-packed, where n_features = 8*X.shape[-1] Otherwise n_features = X.shape[-1] If tqdm module is available, it is used for progress display Parameters ---------- X: np.ndarray or torch.Tensor a 1d input vector, or batch of 1d input_vectors, binary encoded, packed or not batch can be 1d or 2d. In all cases output.shape[:-1] = X.shape[:-1] packed: bool, optional whether the input data is in bit-packed representation defaults to False input_roi_: if provided, as (offset, size), will override the computation of the input ROI (advanced parameter) context: Context, optional will be filled with information about transform @see lightonopu.context.Context raw_output_size: bool, optional If True, don't cut output size at n_components (advanced parameter) Returns ------- Y: np.ndarray or torch.Tensor complete array of nonlinear random projections of X, of size self.n_components """ return self.__transform(X, packed, is_2d_features=False, n_2d_features=None, input_roi_=input_roi_, context=context, raw_output_size=raw_output_size)
[docs] def transform2d(self, X, packed: bool = False, n_2d_features=None, input_roi_: Roi = None, context: Optional[Context] = Context(), raw_output_size: bool = False): """Performs the nonlinear random projections of one 2d input vector, or a batch of 2d input vectors. If tqdm module is available, it is used for progress display Parameters ---------- X: np.ndarray or torch.Tensor a 2d input vector, or batch of 2d input_vectors, binary encoded, packed or not packed: bool, optional whether the input data is in bit-packed representation if True, each input vector is assumed to be a 1d array, and the "real" number of features must be provided as n_2d_features defaults to False n_2d_features: list, tuple or np.ndarray of length 2 If the input is bit-packed, specifies the shape of each input vector. Not needed if the input isn't bit-packed. input_roi_: if provided, as (offset, size), will override the computation of the input ROI (advanced parameter) context: Context, optional will be filled with information about transform @see lightonopu.context.Context raw_output_size: bool, optional If True, don't cut output size at n_components (advanced parameter) Returns ------- Y: np.ndarray or torch.Tensor complete array of nonlinear random projections of X, of size self.n_components """ return self.__transform(X, packed, is_2d_features=True, n_2d_features=n_2d_features, input_roi_=input_roi_, context=context, raw_output_size=raw_output_size)
# noinspection PyUnresolvedReferences def __transform(self, X: np.ndarray, packed: bool, is_2d_features: bool, n_2d_features=None, input_roi_: Roi = None, context: Optional[Context] = Context(), raw_output_size=False): assert self.device.active, "OPU device isn't active, use opu.open() or \"with opu:\"" # If in online mode, just use the online runner to transform X if self._online_runner: assert X.shape == self._online_runner.n_features,\ "Input vector shape {} must match online shape {}"\ .format(X.shape, self._online_runner.n_features) return self._online_runner.transform_single(X, context) # Batch or single transform user_input = OpuUserInput(X, packed, is_2d_features, n_2d_features) tr_settings = TransformSettings(self.input_roi_strategy, input_roi_, self._n_components, raw_output_size) runner = TransformRunner(self._s, tr_settings, user_input, device=self.device) Y = runner.transform_vectors(context) user_input.reshape_output(Y) # if the input is a tensor, return a tensor in CPU memory if user_input.is_tensor: # noinspection PyPackageRequirements import torch return torch.from_numpy(Y) else: return Y def __enter__(self): """Context manager interface that acquires hardware resources used by the OPU device""" self.open() return self def __exit__(self, *args): self.close()
[docs] @contextmanager def online_transform(self, n_features: Union[int, Tuple[int]]): """Activates the "online mode", allowing faster transforms of single vectors""" stack = ExitStack() try: # device is acquiring with online=True stack.enter_context(self.device.acquiring(triggered=True, online=True)) t = TransformSettings(InputRoiStrategy.full, None, self._n_components) self._online_runner = TransformRunner(self._s, t, features_shape=n_features, device=self.device) yield finally: # Ends device online acquisition stack.close() self._online_runner = None
[docs] def open(self): """Acquires hardware resources used by the OPU device @see close() or use the context manager interface for closing at the end af an indent block """ if self.device.active: return # Check if user has booked the OPU # Rule is opu user doesn't need a time slot, so we just # check if it is booked by anyone if self._s.check_bookings and not self._s.simulated: if getpass.getuser() == "opu": if opu_bookings.opu_booked(): raise opu_bookings.BookingError("OPU currently booked") elif not opu_bookings.user_opu_booked(): opu_bookings.raise_user_error() self.device.open() self._trace("OPU opened")
[docs] def close(self): """Releases hardware resources used by the OPU device""" self.device.close() self._trace("OPU closed")
@property def config(self): """Returns the internal configuration object""" # Load it when asked first time if not self.__opu_config: self.__opu_config = config.load_config(self.__config_file, self._trace) if self.__config_override is not None: utils.recurse_update(self.__opu_config, self.__config_override) return self.__opu_config @property def max_n_components(self): return self._output_roi.max_components @property def n_components(self) -> int: return self._n_components @n_components.setter def n_components(self, value: int): self.device.output_roi = self._output_roi.compute_roi(value) if self._s.simulated: self._resize_rnd_matrix(self.max_n_features, value) # Ask opu device to reserve correct amount of n_samples and output ROI self.device.reserve(self._s.n_samples_by_pass) self._n_components = value @property def max_n_features(self) -> int: return self._max_n_features @max_n_features.setter def max_n_features(self, value: int): if not self._s.simulated: raise AttributeError("max_n_feature can't be set if device is real") self._resize_rnd_matrix(value, self._n_components) self._max_n_features = value @property def _s(self): """Returns immutable settings associated with the OPU Settings are immutable (attrs frozen), so generate it at each call. Performance impact is negligible""" # Get default value pass_default = attr.fields(OpuSettings).n_samples_by_pass.default if isinstance(self.device, SimulatedOpuDevice): return OpuSettings(max_n_features=self._max_n_features, n_samples_by_pass=pass_default, simulated=True, input_shape=self.device.input_shape, output_max_shape=self.device.output_shape_max, ) return OpuSettings( max_n_features=int(np.prod(self.device.input_shape)), # Check bookings only in production check_bookings=self.config.get("status") == "production", # Will use defaults of OpuSettings if not found n_samples_by_pass=self.config.get("n_samples_by_pass", pass_default), min_batch_size=self.config["input"].get("minimum_batch_size", 0), allowed_roi=self.config["output"].get("allowed_roi"), # min_n_components is linked to the minimum output size min_n_components=self.config["output"].get("minimum_output_size", 0), input_shape=self.device.input_shape, output_max_shape=self.device.output_shape_max, ones_range=self.config["ones_range"], n_tries=5, ) def _resize_rnd_matrix(self, n_features: int, n_components: int): """Resize device's random matrix""" rnd_mat = self.device.random_matrix if rnd_mat is None or rnd_mat.shape != (n_features, n_components): self._print("OPU: computing the random matrix... ", end='', flush=True) self.device.build_random_matrix(n_features, n_components) self._print("OK")
[docs] def version(self): """Returns a multi-line string containing name and versions of the OPU""" from lightonopu import __version__ as lgversion version = [] # Build OPU name opu_name = self.__opu_config['name'] opu_version = self.__opu_config['version'] opu_location = self.__opu_config['location'] version.append('OPU ' + opu_name+'-'+opu_version+'-'+opu_location) # module version version.append("lightonopu version " + lgversion) version.append(self.device.versions()) return '\n'.join(version)
def transform(self, *args, **kwargs): raise RuntimeError("Please now use transform1d or transform2d")