Source code for openlabctrl.io.base
from enum import Enum
import numpy as np
class BaseIoCmd(Enum):
NOP = 0xF
DONE = 0xE
PREALLOCATION_BLOCK_LEN = 0x1000
[docs]
class BaseIo():
_requires_done_instruction = True #Whether the IO requires a DONE instruction at the end of the instruction list
_t_overflow = (1 << 24) - 32 #Number of clk cycles after which time value overflows (set to 2^24 - margin for latency compensations)
def __init__(self, addr, clk_freq):
self._addr = addr
self._clk_freq = clk_freq
self._tlast = 0
self._tnext = 0
self._tincr = 1
self._t_list = np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64)
self._instr_list = np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64)
self._idx = 0
self._preallocation_len = PREALLOCATION_BLOCK_LEN
self._locked = False
[docs]
def reset(self):
"""
Reset IO instruction cache and time base.
"""
self._tlast = 0
self._tnext = 0
self._tincr = 1
self._t_list = np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64)
self._instr_list = np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64)
self._idx = 0
self._preallocation_len = PREALLOCATION_BLOCK_LEN
self._locked = False
def _add_instruction(self, cmd: int, data: int, duration: int = 1):
#Check if instruction list is locked
if self._locked:
raise Exception("Cannot add instruction because the instruction list is locked. Please reset instruction list.")
#Map to uint64
cmd = np.uint64(cmd)
data = np.uint64(data)
duration = np.uint64(duration)
#Assign time for instruction
t = self._tnext
#Insert NOP instructions every t_overflow clk cycles (if needed)
while (t - self._tlast) > self._t_overflow:
self._tnext = self._tlast + self._t_overflow
self._add_instruction(cmd=BaseIoCmd.NOP.value, data=0, duration=self._t_overflow)
#Preallocate more space if needed
if self._idx >= self._preallocation_len:
self._instr_list = np.concatenate((self._instr_list, np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64)))
self._t_list = np.concatenate((self._t_list, np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint64)))
self._mask_list = np.concatenate((self._mask_list, np.zeros(PREALLOCATION_BLOCK_LEN, dtype=np.uint32)))
self._preallocation_len += PREALLOCATION_BLOCK_LEN
#Create new instruction
#63 - 60 | 59 - 56 | 55 - 32 | 31 - 0
#Addr | Cmd | Time | Data
instr = ((self._addr << 60) | (cmd << 56) | ((t & 0xffffff) << 32) | data)
#Add instruction to list
self._instr_list[self._idx] = instr
self._t_list[self._idx] = t
self._idx += 1
self._tlast = t
self._tnext = t + max(duration, self._tincr)
return
def _add_done_instruction(self):
self._add_instruction(cmd=BaseIoCmd.DONE.value, data=0)
self._locked = True
def _get_instruction_and_time_list(self):
if not self._locked:
if self._requires_done_instruction:
self._add_done_instruction()
instr_list = self._instr_list[:self._idx]
instr_list = self._instr_list[:self._idx]
t_list = self._t_list[:self._idx]
return instr_list, t_list
[docs]
def get_time(self) -> int:
"""
Retrieve current IO time (in units of CLK cycles).
"""
return self._tnext
[docs]
def set_time(self, val: int):
"""
Set new IO time (should be larger or equal than current IO time).
:param val: time (in units of clk cycles).
"""
if val < self._tnext:
raise Exception(f"Cannot set time to {val} because it is smaller than current time {self._tnext}.")
self._tnext = np.uint64(val)
[docs]
def set_time_increment(self, val: int):
"""
Set IO instruction time increment, i.e. time between consecutive IO instructions. Default time increment after reset is 1 clk cycle.
:param val: time increment (in units of clk cycles).
"""
if val <= 0:
raise Exception(f"Time increment must be a positive integer. Got {val} instead.")
self._tincr = np.uint64(val)
[docs]
def get_time_increment(self) -> int:
"""
Retrieve current IO time increment (in units of clk cycles).
"""
return self._tincr
[docs]
def delay(self, val: int):
"""
Increment current IO time by a specified delay time. Used to retard following IO instruction.
:param val: Delay time (in units of clk cycles).
"""
t = self.get_time()
self.set_time(t + val)
def _is_locked(self):
return self._locked