"""Abstract base classes for the controller, MUX, TX and RX hardware components."""

from abc import ABC, abstractmethod
import time

import numpy as np

from OhmPi.logging_setup import create_stdout_logger


class ControllerAbstract(ABC):
    """Base class for controller boards.

    Parameters
    ----------
    **kwargs
        board_name : str, optional
            Name used in the log messages.
        exec_logger : logger, optional
            Execution logger; a stdout logger named 'exec_ctl' is created if omitted.
        soh_logger : logger, optional
            SOH logger; a stdout logger named 'soh_ctl' is created if omitted.
    """

    def __init__(self, **kwargs):
        self.board_name = kwargs.pop('board_name', 'unknown Controller hardware')
        self.bus = None
        self.exec_logger = kwargs.pop('exec_logger', None)
        if self.exec_logger is None:
            self.exec_logger = create_stdout_logger('exec_ctl')
        self.soh_logger = kwargs.pop('soh_logger', None)
        if self.soh_logger is None:
            self.soh_logger = create_stdout_logger('soh_ctl')
        self.exec_logger.debug(f'{self.board_name} Controller initialization')
        # Subclasses able to read the CPU temperature are expected to set this flag to True.
        self._cpu_temp_available = False
        self.max_cpu_temp = np.inf

    @property
    def cpu_temperature(self):
        """CPU temperature as returned by ``_get_cpu_temp``.

        Returns ``np.nan`` (and logs a warning) when the reading is not available.
        A warning is sent to the soh logger when the value exceeds ``max_cpu_temp``.
        """
        if not self._cpu_temp_available:
            self.exec_logger.warning(f'CPU temperature reading is not available for {self.board_name}')
            return np.nan
        cpu_temp = self._get_cpu_temp()
        if cpu_temp > self.max_cpu_temp:
            self.soh_logger.warning(f'CPU temperature of {self.board_name} is over the limit!')
        return cpu_temp

    @abstractmethod
    def _get_cpu_temp(self):
        """Read and return the CPU temperature (to be implemented by subclasses)."""
        pass


class MuxAbstract(ABC):
    """Base class for multiplexer boards.

    Parameters
    ----------
    **kwargs
        board_name : str, optional
            Name used in the log messages.
        exec_logger : logger, optional
            Execution logger; a stdout logger named 'exec_mux' is created if omitted.
        soh_logger : logger, optional
            SOH logger; a stdout logger named 'soh_mux' is created if omitted.
        controller : object, optional
            Controller associated with this board.
    """

    def __init__(self, **kwargs):
        self.board_name = kwargs.pop('board_name', 'unknown MUX hardware')
        # TODO: introduce MUX boards that are part of a MUX system (the same could apply to
        #  RX boards that are part of an RX system, e.g. different channels)
        self.exec_logger = kwargs.pop('exec_logger', None)
        if self.exec_logger is None:
            self.exec_logger = create_stdout_logger('exec_mux')
        self.soh_logger = kwargs.pop('soh_logger', None)
        if self.soh_logger is None:
            self.soh_logger = create_stdout_logger('soh_mux')
        self.exec_logger.debug(f'{self.board_name} MUX initialization')
        self.controller = kwargs.pop('controller', None)

    @abstractmethod
    def reset(self):
        """Reset the multiplexer (to be implemented by subclasses)."""
        pass

    def switch(self, elec_dict=None, state='on'):
        """Switch a given list of electrodes with different roles.
        Electrodes with a value of 0 will be ignored.

        Switching on is aborted (and an error is logged) when an electrode has
        both the A and B roles, or when roles A, B, M and N are all given and an
        M or N electrode is also an A or B electrode.

        Parameters
        ----------
        elec_dict : dictionary, optional
            Dictionary of the form: {role: [list of electrodes]}.
        state : str, optional
            Either 'on' or 'off'.
        """
        if elec_dict is None:
            self.exec_logger.warning(f'Missing argument for {self.board_name}.switch: elec_dict is None.')
            return

        self.exec_logger.debug(f'Switching {self.board_name} ')

        # The safety checks only matter when relays are about to be closed.
        if state == 'on':
            # check to prevent A == B (SHORT-CIRCUIT)
            if 'A' in elec_dict and 'B' in elec_dict:
                if np.isin(elec_dict['A'], elec_dict['B']).any():
                    self.exec_logger.error('Trying to switch on some electrodes with both A and B roles. '
                                           'This would create a short-circuit! Switching aborted.')
                    return

            # check that none of M or N are the same as A or B
            # as to prevent burning the MN part which cannot take
            # the full voltage of the DPS
            if all(role in elec_dict for role in ('A', 'B', 'M', 'N')):
                overlap = any(np.isin(elec_dict[rx_role], elec_dict[tx_role]).any()
                              for rx_role in ('M', 'N') for tx_role in ('A', 'B'))
                if overlap:
                    self.exec_logger.error('Trying to switch on some electrodes with both M or N roles and A or B roles.'
                                           'This would create an over-voltage in the RX! Switching aborted.')
                    return

        # if all ok, then switch the electrodes
        for role, electrodes in elec_dict.items():
            for elec in electrodes:
                if elec > 0:
                    self.switch_one(elec, role, state)

    @abstractmethod
    def switch_one(self, elec, role, state):
        """Switch a single electrode for a given role (to be implemented by subclasses)."""
        pass

    def test(self, elec_dict, activation_time=1.):
        """Method to test the multiplexer.

        The multiplexer is reset, then every electrode of every role is switched
        on and off in turn, waiting `activation_time` after each action.

        Parameters
        ----------
        elec_dict : dictionary
            Dictionary of the form: {role: [list of electrodes]}.
        activation_time : float, optional
            Time in seconds during which the relays are activated.
        """
        self.exec_logger.debug(f'Starting {self.board_name} test...')
        self.reset()
        for role, electrodes in elec_dict.items():
            # Iterate over the electrodes of the current role (not the literal key 'role').
            for elec in electrodes:
                self.switch_one(elec, role, 'on')
                self.exec_logger.debug(f'electrode: {elec} activated.')
                time.sleep(activation_time)
                self.switch_one(elec, role, 'off')
                self.exec_logger.debug(f'electrode: {elec} deactivated.')
                time.sleep(activation_time)
        self.exec_logger.debug('Test finished.')


class TxAbstract(ABC):
    """Base class for transmitter (TX) boards.

    Parameters
    ----------
    **kwargs
        board_name : str, optional
            Name used in the log messages.
        polarity : int, optional
            Initial polarity (default 1); None is stored as 0.
        inj_time : float, optional
            Default injection time in seconds (default 1.).
        exec_logger : logger, optional
            Execution logger; a stdout logger named 'exec_tx' is created if omitted.
        soh_logger : logger, optional
            SOH logger; a stdout logger named 'soh_tx' is created if omitted.
        controller : object, optional
            Controller associated with this board.
    """

    def __init__(self, **kwargs):
        self.board_name = kwargs.pop('board_name', 'unknown TX hardware')
        polarity = kwargs.pop('polarity', 1)
        if polarity is None:
            polarity = 0
        self._polarity = polarity
        inj_time = kwargs.pop('inj_time', 1.)
        self.exec_logger = kwargs.pop('exec_logger', None)
        if self.exec_logger is None:
            self.exec_logger = create_stdout_logger('exec_tx')
        self.soh_logger = kwargs.pop('soh_logger', None)
        if self.soh_logger is None:
            self.soh_logger = create_stdout_logger('soh_tx')
        self.controller = kwargs.pop('controller', None)
        self._inj_time = None
        self._dps_state = 'off'
        self._adc_gain = 1.
        # Go through the property setter so that the value is validated.
        self.inj_time = inj_time
        self.exec_logger.debug(f'{self.board_name} TX initialization')

    @property
    def adc_gain(self):
        """Gain of the TX ADC."""
        return self._adc_gain

    @adc_gain.setter
    def adc_gain(self, value):
        self._adc_gain = value
        self.exec_logger.debug(f'Setting TX ADC gain to {value}')

    @abstractmethod
    def adc_gain_auto(self):
        """Automatically set the ADC gain (to be implemented by subclasses)."""
        pass

    @property
    @abstractmethod
    def current(self):
        # add actions to read the TX current and return it
        return None

    @current.setter
    @abstractmethod
    def current(self, value, **kwargs):
        # add actions to set the DPS current
        pass

    @abstractmethod
    def current_pulse(self, **kwargs):
        """Generate a current pulse (to be implemented by subclasses)."""
        pass

    @abstractmethod
    def inject(self, state='on'):
        """Start or stop the injection.

        Parameters
        ----------
        state : str, optional
            Either 'on' or 'off'.
        """
        assert state in ['on', 'off']

    @property
    def inj_time(self):
        """Default injection time in seconds."""
        return self._inj_time

    @inj_time.setter
    def inj_time(self, value):
        # Integer durations are accepted and stored as float; bool is rejected although it is an int subclass.
        assert isinstance(value, (int, float)) and not isinstance(value, bool)
        self._inj_time = float(value)

    @property
    def polarity(self):
        """Polarity of the injection: -1, 0 or 1."""
        return self._polarity

    @polarity.setter
    def polarity(self, value):
        assert value in [-1, 0, 1]
        self._polarity = value
        # add actions to set the polarity (switch relays)

    def turn_off(self):
        """Record the DPS state as 'off'."""
        self.exec_logger.debug('Switching DPS off')
        self._dps_state = 'off'

    def turn_on(self):
        """Record the DPS state as 'on'."""
        self.exec_logger.debug('Switching DPS on')
        self._dps_state = 'on'

    @property
    @abstractmethod
    def voltage(self):
        # add actions to read the DPS voltage and return it
        return None

    @voltage.setter
    @abstractmethod
    def voltage(self, value, **kwargs):
        # add actions to set the DPS voltage
        pass

    @property
    @abstractmethod
    def tx_bat(self):
        pass

    def voltage_pulse(self, voltage=0., length=None, polarity=None):
        """Generates a square voltage pulse.

        Parameters
        ----------
        voltage : float, optional
            Voltage to apply in volts (default 0.).
        length : float, optional
            Length of the pulse in seconds; `inj_time` is used if omitted.
        polarity : 1, 0, -1, optional
            Polarity of the pulse; the current polarity is used if omitted.
        """
        if length is None:
            length = self.inj_time
        if polarity is None:
            polarity = self.polarity
        self.polarity = polarity
        self.voltage = voltage
        self.exec_logger.debug(f'Voltage pulse of {polarity * voltage:.3f} V for {length:.3f} s')
        self.inject(state='on')
        try:
            time.sleep(length)
        finally:
            # Always stop the injection, even if the wait is interrupted.
            self.inject(state='off')


class RxAbstract(ABC):
    """Base class for receiver (RX) boards.

    Parameters
    ----------
    **kwargs
        exec_logger : logger, optional
            Execution logger; a stdout logger named 'exec_rx' is created if omitted.
        soh_logger : logger, optional
            SOH logger; a stdout logger named 'soh_rx' is created if omitted.
        controller : object, optional
            Controller associated with this board.
        board_name : str, optional
            Name used in the log messages.
        sampling_rate : float, optional
            Initial sampling rate (default 1).
    """

    def __init__(self, **kwargs):
        self.exec_logger = kwargs.pop('exec_logger', None)
        if self.exec_logger is None:
            self.exec_logger = create_stdout_logger('exec_rx')
        self.soh_logger = kwargs.pop('soh_logger', None)
        if self.soh_logger is None:
            self.soh_logger = create_stdout_logger('soh_rx')
        self.controller = kwargs.pop('controller', None)
        self.board_name = kwargs.pop('board_name', 'unknown RX hardware')
        self._sampling_rate = kwargs.pop('sampling_rate', 1)
        self.exec_logger.debug(f'{self.board_name} RX initialization')
        self._adc_gain = 1.
        self._max_sampling_rate = np.inf

    @property
    def adc_gain(self):
        """Gain of the RX ADC."""
        return self._adc_gain

    @adc_gain.setter
    def adc_gain(self, value):
        self._adc_gain = value
        self.exec_logger.debug(f'Setting RX ADC gain to {value}')

    @abstractmethod
    def adc_gain_auto(self):
        """Automatically set the ADC gain (to be implemented by subclasses)."""
        pass

    @property
    def sampling_rate(self):
        """Sampling rate; values above the maximum allowed rate are clipped on assignment."""
        return self._sampling_rate

    @sampling_rate.setter
    def sampling_rate(self, value):
        assert value > 0.
        if value > self._max_sampling_rate:
            self.exec_logger.warning(f'{self.board_name} maximum sampling rate is {self._max_sampling_rate}. '
                                     f'Setting sampling rate to the highest allowed value.')
            value = self._max_sampling_rate
        self._sampling_rate = value
        self.exec_logger.debug(f'Sampling rate set to {value}')

    @property
    @abstractmethod
    def voltage(self):
        """ Gets the voltage VMN in Volts
        """
        pass