Source code for openlabctrl.device.rp_base

import time
from pathlib import Path
import numpy as np
from zynq_tcp_ctrl import ZynqTcpCtrlClient
from ..frame import IoSyncFrame, ParamIoSyncFrame
from ..dma.dma import DMA


[docs] class Rp_base(): """ Base class for Red Pitaya devices. :param ip: IP address or URL :param label: User-defined name :param daisy_0_en: Enable daisy chain sync connector 0 :param daisy_1_en: Enable daisy chain sync connector 1 :param force: Force bitstream reloading (resets FPGA configuration) """ def __init__(self, ip: str, label: str, daisy_0_en: bool, daisy_1_en: bool, force: bool): self._ip = ip self._label = label self._check_attribute("CLK_FREQ") self._check_attribute("IO_DICT") self._check_attribute("MMAP_DICT") self._check_attribute("ADDR_DICT") self._check_attribute("BITSTREAM") self._check_attribute("COMPATIBLE_DEVICES") self._tcp_ctrl_client = None self._init_tcp_ctrl_client() self._init_mmap() self._load_bitstream(force=force) self._frame_dict = {} self._ptr_dict = {} self._dma_dict = {} self._init_ptr() self._init_dma() self._init_daisy(daisy_0_en, daisy_1_en) self._stop() def _init_daisy(self, daisy_0_en, daisy_1_en): self._tcp_ctrl_client.write(addr=self.ADDR_DICT["reg_bank_daisy_0_sel"], val=not(daisy_0_en)) self._tcp_ctrl_client.write(addr=self.ADDR_DICT["reg_bank_daisy_1_sel"], val=not(daisy_1_en)) def _init_mmap(self): self._mmap_dict = {} for mmap_name in self.MMAP_DICT.keys(): addr = self.MMAP_DICT[mmap_name]["addr"] size = self.MMAP_DICT[mmap_name]["size"] self._tcp_ctrl_client.add_mmap_region(addr=addr, size=size) def _init_tcp_ctrl_client(self): self._tcp_ctrl_client = ZynqTcpCtrlClient(self._ip) def _init_ptr(self): self._ptr_dict["instr"] = self.MMAP_DICT["mem_instr"]["addr"] for io_name in self.IO_DICT.keys(): if "scope" in io_name: self._ptr_dict[io_name] = self.MMAP_DICT[f"mem_{io_name}"]["addr"] def _init_dma(self): DMA_MARKER = "dma_" for addr_name in self.ADDR_DICT.keys(): if addr_name.startswith(DMA_MARKER) and ("instr" in addr_name or "scope" in addr_name): key = addr_name[len(DMA_MARKER):] self._dma_dict[key] = DMA(addr=self.ADDR_DICT[f"dma_{key}"], tcp_ctrl_client=self._tcp_ctrl_client) def _check_attribute(self, attr): if not hasattr(self, attr): raise Exception(f"Attribute {attr} not found. Please define {attr} in the device subclass.") def _load_bitstream(self, force=False): path = Path(__file__).parent.parent / self.BITSTREAM self._tcp_ctrl_client.load_bitstream(path=str(path), force=force) def _reset(self): self._stop() self._frame_dict = {} self._init_ptr() def _add_frame(self, frame, label=None): if (type(frame) is not IoSyncFrame) and (type(frame) is not ParamIoSyncFrame): raise Exception(f"Frame must be of type IoSyncFrame or ParamIoSyncFrame. Got {type(frame)}.") if label is None: label = f"frame_{len(self._frame_dict)}" if label in self._frame_dict: raise Exception(f"Frame label {label} already exists in device {self.get_uid()}. Please provide a unique label for each frame.") self._frame_dict[label] = frame def _upload(self, ): frame_dict = self._frame_dict self._reset() for label in frame_dict.keys(): frame = frame_dict[label] instr_list = frame._get_instruction_list() #Check instruction memory size instr_list_bytes = instr_list.tobytes() instr_list_size = len(instr_list_bytes) if self._ptr_dict["instr"] + instr_list_size > self.MMAP_DICT["mem_instr"]["addr"] + self.MMAP_DICT["mem_instr"]["size"]: raise Exception(f"Frame {label} ({self.get_uid()}) exceeds available instruction memory ({self.MMAP_DICT['mem_instr']['size']} bytes).") #Check scope memory size acq_dict = frame._get_acquisition_dict() for scope_label in acq_dict.keys(): for acq_label in acq_dict[scope_label].keys(): acq_samples = acq_dict[scope_label][acq_label]["samples"] acq_size = acq_samples * np.int16().nbytes if (self._ptr_dict[scope_label] + acq_size) > (self.MMAP_DICT[f"mem_{scope_label}"]["addr"] + self.MMAP_DICT[f"mem_{scope_label}"]["size"]): raise Exception(f"Frame {label} ({self.get_uid()}) exceeds available {scope_label} memory ({self.MMAP_DICT[f'mem_{scope_label}']['size']} bytes).") #Transfer frame instruction list to device self._tcp_ctrl_client.write(addr=self._ptr_dict["instr"], val=instr_list_bytes) #Append frame metadata self._frame_dict[label] = frame self._ptr_dict["instr"] += instr_list_size for scope_label in acq_dict.keys(): for acq_label in acq_dict[scope_label].keys(): acq_dict[scope_label][acq_label]["addr"] = self._ptr_dict[scope_label] self._ptr_dict[scope_label] += acq_dict[scope_label][acq_label]["samples"] * np.int16().nbytes def _parse_err(self, err_code): io_name_list_with_err = [] for io_name in self.IO_DICT.keys(): io_addr = self.IO_DICT[io_name]["addr"] if err_code == (1 << io_addr): io_name_list_with_err.append(io_name) if len(io_name_list_with_err) == 0: return None else: return f"Error in IO(s): {', '.join(io_name_list_with_err)}" def _get_status(self): en = bool(self._tcp_ctrl_client.read(addr=self.ADDR_DICT["reg_bank_en"])) sync_counter = self._tcp_ctrl_client.read(addr=self.ADDR_DICT["reg_bank_sync_counter"]) err_code = self._tcp_ctrl_client.read(addr=self.ADDR_DICT["reg_bank_err"]) done_code = self._tcp_ctrl_client.read(addr=self.ADDR_DICT["reg_bank_done"]) frame_label_list = list(self._frame_dict.keys()) frame_label = frame_label_list[sync_counter - 1] if sync_counter > 0 else None io_status = {} for io_name in self.IO_DICT.keys(): if io_name[0] != "_": io_status[io_name] = {} io_status[io_name]["error"] = bool(err_code & (1 << self.IO_DICT[io_name]["addr"])) io_status[io_name]["done"] = bool(done_code & (1 << self.IO_DICT[io_name]["addr"])) err = any(io_status[io_name]["error"] for io_name in io_status.keys()) if len(frame_label_list) == 0: done = False else: done = ((frame_label == frame_label_list[-1]) and all(io_status[io_name]["done"] for io_name in io_status.keys()) and not any(io_status[io_name]["error"] for io_name in io_status.keys())) status = { "enabled": en, "done": done, "error": err, "current_frame": frame_label, "io": io_status } return status def _start(self): if len(self._frame_dict) == 0: raise Exception("No frames uploaded. Please add at least one frame before starting the device.") pass status = self._get_status() if status["enabled"] and not status["done"]: raise Exception("Device is already running. Please stop the device before starting it again.") #Start dma for key in self._dma_dict.keys(): if "scope" in key: dma_ch = self._dma_dict[key].recvchannel elif "instr" in key: dma_ch = self._dma_dict[key].sendchannel nbytes = self._ptr_dict[key] - self.MMAP_DICT[f"mem_{key}"]["addr"] if nbytes > 0: dma_ch.transfer(addr_start=self.MMAP_DICT[f"mem_{key}"]["addr"], nbytes=nbytes) #Set stream ctrl of scopes for key in self._dma_dict.keys(): if "scope" in key: nbytes = self._ptr_dict[key] - self.MMAP_DICT[f"mem_{key}"]["addr"] self._tcp_ctrl_client.write(addr=self.ADDR_DICT[f"reg_bank_{key}_samples"], val=nbytes//2) #Assert enable bit self._tcp_ctrl_client.write(addr=self.ADDR_DICT["reg_bank_en"], val=0x1) def _stop(self): #Reset DMA for key in self._dma_dict.keys(): if "scope" in key: dma_ch = self._dma_dict[key].recvchannel elif "instr" in key: dma_ch = self._dma_dict[key].sendchannel dma_ch.stop() dma_ch.start() #Deassert enable bit self._tcp_ctrl_client.write(addr=self.ADDR_DICT["reg_bank_en"], val=0x0) #Flush fifos self._tcp_ctrl_client.write(addr=self.ADDR_DICT["reg_bank_flush"], val=0x1) time.sleep(1e-3) self._tcp_ctrl_client.write(addr=self.ADDR_DICT["reg_bank_flush"], val=0x0) return
[docs] def get_uid(self) -> str: """ Return a unique device identifier of the form ``<label>@<ip>``, e.g. ``my_rp@192.168.1.100``. """ uid = f"{self._label}@{self._ip}" return uid
def _get_scope(self): scope_dict = {} for frame_label in self._frame_dict.keys(): scope_dict[frame_label] = {} frame = self._frame_dict[frame_label] acq_dict = frame._get_acquisition_dict() for scope_label in acq_dict.keys(): scope_dict[frame_label][scope_label] = {} for acq_label in acq_dict[scope_label].keys(): addr = acq_dict[scope_label][acq_label]["addr"] samples = acq_dict[scope_label][acq_label]["samples"] t = acq_dict[scope_label][acq_label]["t"] src = acq_dict[scope_label][acq_label]["src"] dec = acq_dict[scope_label][acq_label]["dec"] data = self._tcp_ctrl_client.read(addr=addr, size=(samples), dtype=np.int16) scope_dict[frame_label] [scope_label][acq_label] = {"t": t, "src": src, "dec": dec, "samples": samples, "data": data} return scope_dict