Source code for openlabctrl.frame

from __future__ import annotations
import numpy as np
import copy
from .io.sync import TriggerSource


PREALLOCATION_BLOCK_LEN = 0x10000

[docs] class IoSyncFrame: """IO instruction container with independent time-base. At instantiation, an IO instance is created for each entry in ``device_type.IO_DICT`` and exposed as an attribute with the same name. The exact set of attributes therefore depends on the device type passed (e.g. see RP 125-14 Base :attr:`~openlabctrl.device.rp_125_14.Rp_125_14.IO_DICT`). :param device_type: Device class (not instance) :param trig: Trigger source to initiate frame execution. If ``None`` or ``TriggerSource.NONE``, frame is triggered immediately. """ def __init__(self, device_type, trig: TriggerSource | None = None): if trig is None: trig = TriggerSource.NONE if trig not in TriggerSource: raise Exception(f"Trigger source {trig} is not valid. Should be of type TriggerSource or None.") self._device_type = device_type self._trig = trig self._io_dict = {} self._idx = 0 for io_name in self._device_type.IO_DICT.keys(): io_class = self._device_type.IO_DICT[io_name]["class"] io_addr = self._device_type.IO_DICT[io_name]["addr"] clk_freq = self._device_type.CLK_FREQ self._io_dict[io_name] = io_class(addr=io_addr, clk_freq=clk_freq) self._set_sync() self._instr_list = np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64) self._preallocation_len = PREALLOCATION_BLOCK_LEN def __getattr__(self, name): io_dict = object.__getattribute__(self, "_io_dict") if name in io_dict: return io_dict[name] raise AttributeError(name)
[docs] def reset(self): """ Reset all IOs (instruction lists and time-base). """ for io in self._io_dict.values(): io.reset() self._set_sync() self._instr_list = np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64) self._preallocation_len = PREALLOCATION_BLOCK_LEN self._idx = 0
def _set_sync(self): self._io_dict["_sync"].trigger(src=self._trig) def _is_locked(self): return all(io._is_locked() for io in self._io_dict.values()) def _get_instruction_list(self): if self._is_locked(): return self._instr_list[:self._idx] else: #Retrieve instruction lists and last instruction index for each IO idx_max_dict = {} instr_list_dict = {} t_list_dict = {} for io_name in self._io_dict.keys(): instr_list, t_list = self._io_dict[io_name]._get_instruction_and_time_list() idx_max_dict[io_name] = len(instr_list) instr_list_dict[io_name] = instr_list t_list_dict[io_name] = t_list #Create dictionaries for iteration variables idx_dict = {} t_dict = {} for io_name in self._io_dict.keys(): idx_dict[io_name] = 0 t_dict[io_name] = 0 #Iterate over instructions for all IOs in chronological order t = 0 idx = 0 keep_iterating = True while keep_iterating: keep_iterating = False if idx >= self._preallocation_len: self._instr_list = np.concatenate((self._instr_list, np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64))) self._preallocation_len += PREALLOCATION_BLOCK_LEN for io_name in self._io_dict.keys(): io_idx = idx_dict[io_name] io_idx_max = idx_max_dict[io_name] if io_idx < io_idx_max: keep_iterating = True io_t = t_list_dict[io_name][io_idx] io_t_previous = t_list_dict[io_name][io_idx - 1] if (io_idx == 0) or (io_t_previous <= t): self._instr_list[idx] = instr_list_dict[io_name][io_idx] t_dict[io_name] = io_t t = max(t, io_t) idx_dict[io_name] += 1 idx += 1 self._idx = idx return self._instr_list[:self._idx]
[docs] def set_time(self, t : int): """ Set new time for all IOs of this frame, see :meth:`~openlabctrl.io.base.BaseIo.set_time`. :param t: time (in units of clk cycles). """ for io in self._io_dict.values(): io.set_time(t)
[docs] def set_time_increment(self, t_incr : int): """ Set new time increment for all IOs of this frame, see :meth:`~openlabctrl.io.base.BaseIo.set_time_increment`. """ for io in self._io_dict.values(): io.set_time_increment(t_incr)
[docs] def rsync(self): """ Time-aligns all IOs of this frame. The backend retrieves the current time of all IOs and sets them to the latest (largest) value. """ tmax = max(io.get_time() for io in self._io_dict.values()) self.set_time(tmax)
[docs] def delay(self, val : int): """ Delay all IOs of this frame, see :meth:`~openlabctrl.io.base.BaseIo.delay`. :param val: Delay time (in units of clk cycles). """ for io in self._io_dict.values(): io.delay(val)
def _get_acquisition_dict(self): acq_dict = {} for io_name in self._io_dict.keys(): if "scope" in io_name: acq_dict[io_name] = self._io_dict[io_name]._get_acquisition_dict() return acq_dict
[docs] class ParamIoSyncFrame(): """ Parameterized version of :class:`IoSyncFrame`, which builds upon a frame function and a frame parameter set. To reduce overall compilation time, parameterized IO sync frames are (re-)compiled only when changes in the frame function or the frame parameter set are detected. """ def __init__(self, device_type, trig: int | None = None): if trig is None: trig = TriggerSource.NONE if trig not in TriggerSource.__dict__.values(): raise Exception(f"Trigger source {trig} is not valid. Valid sources are: {list(TriggerSource.__dict__.values())}.") self._device_type = device_type self._trig = trig self._frame = IoSyncFrame(device_type=device_type, trig=trig) self._frame_args = () self._frame_kwargs = {} self._frame_args_last = () self._frame_kwargs_last = {} self._frame_func = None
[docs] def reset(self): """ Reset frame function and parameter set. """ self._frame.reset() self._frame_args = () self._frame_kwargs = {} self._frame_args_last = () self._frame_kwargs_last = {} self._frame_func = None
[docs] def set_frame_parameter(self, *args, **kwargs): """ Set the parameters passed to the frame function.. :param args: Positional arguments forwarded to the frame function. :param kwargs: Keyword arguments forwarded to the frame function. """ self._frame_args = args self._frame_kwargs = kwargs
[docs] def set_frame_function(self, func): """ Set the function that builds the IO sequence. The function must accept an :class:`IoSyncFrame` as its first argument followed by any positional and/or keyword arguments defined in :meth:`set_frame_parameter`. :param func: Callable with signature ``func(frame, *args, **kwargs)``. """ self._frame.reset() self._frame_func = func
def _dicts_equal(self, d1, d2): if d1.keys() != d2.keys(): return False for k in d1: v1, v2 = d1[k], d2[k] if isinstance(v1, np.ndarray) and isinstance(v2, np.ndarray): if not np.array_equal(v1, v2): return False elif isinstance(v1, dict) and isinstance(v2, dict): if not self._dicts_equal(v1, v2): return False elif isinstance(v1, tuple) and isinstance(v2, tuple): if not self._tuples_equal(v1, v2): return False else: if v1 != v2: return False return True def _tuples_equal(self, t1, t2): if len(t1) != len(t2): return False for v1, v2 in zip(t1, t2): if isinstance(v1, np.ndarray) and isinstance(v2, np.ndarray): if not np.array_equal(v1, v2): return False elif isinstance(v1, tuple) and isinstance(v2, tuple): if not self._tuples_equal(v1, v2): return False elif isinstance(v1, dict) and isinstance(v2, dict): if not self._dicts_equal(v1, v2): return False else: if v1 != v2: return False return True def _is_locked(self): return (self._frame._is_locked() and self._tuples_equal(self._frame_args, self._frame_args_last) and self._dicts_equal(self._frame_kwargs, self._frame_kwargs_last) and (self._frame_func is not None)) def _get_instruction_list(self): if not self._is_locked(): self._frame.reset() self._frame_func(self._frame, *copy.deepcopy(self._frame_args), **copy.deepcopy(self._frame_kwargs)) self._frame_args_last = copy.deepcopy(self._frame_args) self._frame_kwargs_last = copy.deepcopy(self._frame_kwargs) return self._frame._get_instruction_list() def _get_acquisition_dict(self): return self._frame._get_acquisition_dict()