# Source code for lightonopu.device

from builtins import object, TypeError
from contextlib import contextmanager
from typing import List

from lightonopu.types import CamRoiStrategy, Roi

# Default values for hardware parameters
# (times are in microseconds, gain is in dB, as the suffixes indicate).
default_frametime_us = 500
default_exposure_us = 400
# OpuDevice.open() checks that the freshly opened hardware reports this gain.
default_gain_dB = 0.

# Expected hardware dimensions, kept here as constants so they are available
# while the device is closed. OpuDevice.open() compares the DMD shape and the
# maximum camera shape with what the opened hardware reports, and raises if
# they differ.
_ajile_dmd_shape = [912, 1140]
_ximea_cam_shape_max = [1920, 1080]
_basler_cam_shape_max = [2040, 1088]


# noinspection PyPep8Naming
class OpuDevice(object):
    """
    Class for hardware interface with a LightOn OPU.

    Implements a context manager interface for acquiring access to the OPU,
    but most properties are gettable and settable even though the OPU isn't
    active: while the device is closed, values are kept in ``_*_off``
    attributes and sent to the hardware by `open`.
    """

    def __init__(self, opu_type: str, frametime_us: int, exposure_us: int,
                 sequence_nb_prelim=0, cam_ROI: Roi = None, verbose=False):
        """
        Build the device object; the hardware itself is not opened here.

        Parameters
        ----------
        opu_type: str
            "ajxi" or "ajba"; selects the native binding module, the maximum
            camera shape, the ROI strategy and the ROI increment.
        frametime_us: int
            DMD frame time in microseconds (converted with ``int()``).
        exposure_us: int
            camera exposure in microseconds (converted with ``int()``).
        sequence_nb_prelim:
            forwarded as-is to the native ``open`` call
            (NOTE(review): exact meaning is defined by the binding).
        cam_ROI: Roi
            camera ROI applied at opening, or None to leave it untouched.
        verbose: bool
            print progress when opening; also forwarded to the native ``open``.

        Raises
        ------
        TypeError
            if ``opu_type`` is not a known type.
        """
        if opu_type == "ajxi":
            from . import ajxi_pybind
            self.__opu = ajxi_pybind.OPU()
            self._cam_shape_max = _ximea_cam_shape_max
            # With Ximea we just get the ROI in the middle
            self._cam_strategy = CamRoiStrategy.mid_square
            self._cam_roi_increment = 8
        elif opu_type == "ajba":
            from . import ajba_pybind
            self.__opu = ajba_pybind.OPU()
            self._cam_shape_max = _basler_cam_shape_max
            # With Basler we get max width in the middle
            self._cam_strategy = CamRoiStrategy.mid_width
            self._cam_roi_increment = 1
        else:
            # format() rather than "+" so that a non-str opu_type still
            # produces this message instead of a concatenation error
            raise TypeError("Don't know this OPU type: {}".format(opu_type))
        self.opu_type = opu_type

        # "off" fields allow to know what to send at resource acquisition
        # force to int if input is e.g. a float
        self._frametime_us_off = int(frametime_us)
        self._exposure_us_off = int(exposure_us)
        self._gain_dB_off = default_gain_dB
        self._cam_ROI_off = cam_ROI
        self._reserved_off = 0
        self._sequence_nb_prelim = sequence_nb_prelim
        self.verbose = verbose

        # forward opu interface to class
        self.transform1 = self.__opu.transform1
        self.transform2 = self.__opu.transform2
        self.transform_single = self.__opu.transform_single
        self.acquire_seq = self.__opu.acquire_seq
        self.fill_dmd = self.__opu.fill_dmd

    def open(self):
        """
        Acquires hardware resource.

        Do nothing if the resource is already acquired in the current object.
        If the resource isn't available, do 4 retries until it is available,
        or raise a OPUUsedByOther exception
        (NOTE(review): no retry loop exists at this level; presumably it is
        implemented by the native binding -- confirm).

        Equivalent is to use context manager interface::

            with opu:
                outs = opu.transform(ins)

        Raises
        ------
        self.OPUUsedByOther
            if hardware has already been acquired in another process or object
        Exception
            if the constants of this module don't match what the hardware
            reports (DMD shape, max camera shape, input size, default gain)
        """
        if self.__opu.active:
            return
        if self.verbose:
            print("Opening OPU... ", end='', flush=True)
        try:
            self.__opu.open(self._frametime_us_off, self._exposure_us_off,
                            self._sequence_nb_prelim, self.verbose)
            # check that dmd constants is conform to what the hardware tells
            if self.dmd_shape != self.__opu.dmd_shape:
                raise Exception("Incorrect values for internal DMD shape")
            if self.cam_shape_max != self.__opu.cam_shape_max:
                raise Exception("Incorrect values for internal max camera shape")
            if self.input_size != self.__opu.input_size:
                raise Exception("Incorrect values for internal input size")
            if default_gain_dB != self.__opu.gain_dB:
                raise Exception("Incorrect defaults for camera gain")

            # sets hardware parameter back
            self.gain_dB = self._gain_dB_off
            if self._cam_ROI_off is not None:
                self.cam_ROI = self._cam_ROI_off
            if self._reserved_off != 0:
                self.reserve(self._reserved_off)
            if self.verbose:
                print("OK")
        except BaseException:
            # Cleanup if anything was thrown (KeyboardInterrupt included),
            # then let the original exception propagate
            self.__opu.close()
            raise

    def close(self):
        """Release the hardware, remembering its current settings so that a
        later `open` restores them."""
        # grab hardware values before closing
        if self.active:
            self._frametime_us_off = self.__opu.frametime_us
            self._exposure_us_off = self.__opu.exposure_us
            self._gain_dB_off = self.__opu.gain_dB
            self._cam_ROI_off = self.__opu.cam_ROI
        # the native close is also called on a non-active device by the
        # error path of open(), so calling it unconditionally is fine
        self.__opu.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *args):
        self.close()

    @contextmanager
    def acquiring(self, triggered=True):
        """Context manager that starts acquisition on entry and stops it on
        exit; ``triggered`` is forwarded to the native ``start_acq``."""
        try:
            # start_acq sits inside the try: stop_acq is called even when
            # starting the acquisition raised
            self.__opu.start_acq(triggered)
            yield
        finally:
            self.__opu.stop_acq()

    @property
    def active(self):
        """bool, whether the hardware resources have been acquired"""
        return self.__opu.active

    @property
    def dmd_shape(self) -> List[int]:
        """list(int), Shape of the DMD, in pixels and cartesian coordinates """
        return _ajile_dmd_shape

    @property
    def cam_shape_max(self) -> List[int]:
        """list(int): Shape of the whole camera sensor, in pixels and cartesian coordinates"""
        return self._cam_shape_max

    @property
    def cam_roi_strategy(self) -> CamRoiStrategy:
        """CamRoiStrategy: ROI strategy associated with the OPU type"""
        return self._cam_strategy

    @property
    def cam_roi_increment(self) -> int:
        """int: ROI increment associated with the OPU type"""
        return self._cam_roi_increment

    @property
    def nb_features(self) -> int:
        """int: Total number of features supported by the OPU"""
        return self.dmd_shape[0] * self.dmd_shape[1]

    @property
    def input_size(self) -> int:
        """int: Input size of the DMD, in bytes"""
        return self.nb_features // 8

    @property
    def exposure_us(self) -> int:
        """int: exposure of the camera, in microseconds"""
        return self.__opu.exposure_us if self.active else self._exposure_us_off

    @exposure_us.setter
    def exposure_us(self, value):
        if self.active:
            self.__opu.exposure_us = int(value)
        else:
            self._exposure_us_off = int(value)

    @property
    def cam_readout_us(self) -> int:
        """int: time given by camera for one image readout, in microseconds

        Raises RuntimeError when the OPU isn't active.
        """
        if not self.active:
            raise RuntimeError("The Opu must be active to get this value")
        return self.__opu.cam_readout_us

    @property
    def frametime_us(self) -> int:
        """int: time for which each DMD frame is displayed, in microseconds"""
        return self.__opu.frametime_us if self.active else self._frametime_us_off

    @frametime_us.setter
    def frametime_us(self, value):
        if self.active:
            self.__opu.frametime_us = int(value)
        else:
            self._frametime_us_off = int(value)

    def reserve(self, n_images):
        """Does internal allocation of a number of images, necessary for transform2 calls

        When the OPU isn't active the number is only stored, and the
        allocation is done by `open`.
        """
        if self.active:
            self.__opu.reserve(int(n_images))
        else:
            self._reserved_off = int(n_images)

    @property
    def cam_shape(self) -> List[int]:
        """
        list(int): Shape of the current camera ROI, in pixels and cartesian coordinates

        This is the second element of `cam_ROI`; while the OPU is inactive
        and no ROI has been set, `cam_ROI` is None and a TypeError is raised.
        """
        return self.cam_ROI[1]

    @property
    def cam_ROI(self) -> Roi:
        """ tuple(list(int)): offset and size of the current camera ROI"""
        return self.__opu.cam_ROI if self.active else self._cam_ROI_off

    @cam_ROI.setter
    def cam_ROI(self, value):
        if self.active:
            self.__opu.cam_ROI = value
        else:
            self._cam_ROI_off = value

    @property
    def gain_dB(self) -> float:
        """ Gain of the camera output (not implemented in every camera)"""
        return self.__opu.gain_dB if self.active else self._gain_dB_off

    @gain_dB.setter
    def gain_dB(self, value):
        if self.active:
            self.__opu.gain_dB = value
        else:
            self._gain_dB_off = value

    def versions(self):
        """Returns multi-line string with device and libraries versions"""
        version = []
        # device version needs an active OPU
        if self.active:
            version.append(self.__opu.device_versions())
        # this is static so works even if non active
        version.append(self.__opu.library_versions())
        return '\n'.join(version)

    def __deepcopy__(self, memo):
        """
        Deep copy of an object

        Can't make a deep copy of an OPU that represents a hardware resource,
        However sklearn needs to clone it as an estimator
        """
        return self

    def __getstate__(self):
        """Closes and return current state"""
        state = {"active": self.active,
                 "opu_type": self.opu_type,
                 "frametime_us": self.frametime_us,
                 "exposure_us": self.exposure_us,
                 "sequence_nb_prelim": self._sequence_nb_prelim,
                 "gain_dB": self.gain_dB,
                 "cam_ROI": self.cam_ROI,
                 "reserved": self._reserved_off,
                 "verbose": self.verbose}
        self.close()
        return state

    def __setstate__(self, state):
        """Restore object with given state"""
        # Keyword arguments on purpose: the 4th positional parameter of
        # __init__ is sequence_nb_prelim, not verbose.
        # States saved before "sequence_nb_prelim" was recorded fall back to
        # the constructor default.
        self.__init__(state["opu_type"], state["frametime_us"],
                      state["exposure_us"],
                      sequence_nb_prelim=state.get("sequence_nb_prelim", 0),
                      verbose=state["verbose"])
        self.cam_ROI = state["cam_ROI"]
        self.gain_dB = state["gain_dB"]
        self._reserved_off = state["reserved"]
        # If state was active then open OPU
        if state["active"]:
            self.open()

    def __str__(self):
        active = "Active" if self.active else "Inactive"
        return '{} OPU, frametime {} μs, exposure {} μs, camera ROI {}'.format(
            active, self.frametime_us, self.exposure_us, self.cam_ROI)