From 3e0f11e8d7cce333c5165b9628f2bb0931736f76 Mon Sep 17 00:00:00 2001 From: su530201 <olivier.kaufmann@umons.ac.be> Date: Wed, 21 Jun 2023 11:31:22 +0200 Subject: [PATCH] Fixes rs-check adding a bypass option to switching tests --- .../abstract_hardware_components.py | 9 +++++++-- ohmpi/hardware_components/ohmpi_card_3_15.py | 1 + ohmpi/hardware_system.py | 6 +++--- ohmpi/ohmpi.py | 10 ++++++++-- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/ohmpi/hardware_components/abstract_hardware_components.py b/ohmpi/hardware_components/abstract_hardware_components.py index 70bbfc1f..2f788aad 100644 --- a/ohmpi/hardware_components/abstract_hardware_components.py +++ b/ohmpi/hardware_components/abstract_hardware_components.py @@ -139,7 +139,7 @@ class MuxAbstract(ABC): def reset(self): pass - def switch(self, elec_dict=None, state='off'): # TODO: generalize for other roles + def switch(self, elec_dict=None, state='off', bypass_check=False): # TODO: generalize for other roles """Switch a given list of electrodes with different roles. Electrodes with a value of 0 will be ignored. @@ -149,6 +149,8 @@ class MuxAbstract(ABC): Dictionary of the form: {role: [list of electrodes]}. state : str, optional Either 'on' or 'off'. + bypass_check: bool, optional + Bypasses checks for A==M or A==M or B==M or B==N (i.e. used for rs-check) """ status = True if elec_dict is not None: @@ -166,7 +168,7 @@ class MuxAbstract(ABC): # as to prevent burning the MN part which cannot take # the full voltage of the DPS if 'A' in elec_dict.keys() and 'B' in elec_dict.keys() and 'M' in elec_dict.keys() and 'N' in elec_dict.keys(): - if (np.in1d(elec_dict['M'], elec_dict['A']).any() # noqa + if not bypass_check and (np.in1d(elec_dict['M'], elec_dict['A']).any() # noqa or np.in1d(elec_dict['M'], elec_dict['B']).any() # noqa or np.in1d(elec_dict['N'], elec_dict['A']).any() # noqa or np.in1d(elec_dict['N'], elec_dict['B']).any()) and state=='on': # noqa @@ -174,6 +176,8 @@ class MuxAbstract(ABC): 'This could create an over-voltage in the RX! Switching aborted.') status = False return status + elif bypass_check: + self.exec_logger.debug('Bypassing switching check') # if all ok, then wait for the barrier to open, then switch the electrodes self.exec_logger.debug(f'{self.board_id} waiting to switch.') @@ -328,6 +332,7 @@ class RxAbstract(ABC): self.board_name = kwargs.pop('board_name', 'unknown RX hardware') self._sampling_rate = kwargs.pop('sampling_rate', 1) # ms self.exec_logger.debug(f'{self.board_name} RX initialization') + self._voltage_max = kwargs.pop('voltage_max', 0.) self._adc_gain = 1. self._max_sampling_rate = np.inf self._bias = 0. diff --git a/ohmpi/hardware_components/ohmpi_card_3_15.py b/ohmpi/hardware_components/ohmpi_card_3_15.py index 5043eb7e..b12e26e1 100644 --- a/ohmpi/hardware_components/ohmpi_card_3_15.py +++ b/ohmpi/hardware_components/ohmpi_card_3_15.py @@ -225,6 +225,7 @@ class Rx(RxAbstract): address=self._ads_voltage_address) self._ads_voltage.mode = Mode.CONTINUOUS self._coef_p2 = kwargs.pop('coef_p2', RX_CONFIG['coef_p2']) + self._voltage_max = kwargs.pop('voltage_max', RX_CONFIG['voltage_max']) self._sampling_rate = kwargs.pop('sampling_rate', sampling_rate) self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}') diff --git a/ohmpi/hardware_system.py b/ohmpi/hardware_system.py index 79c42a18..e8ec50ab 100644 --- a/ohmpi/hardware_system.py +++ b/ohmpi/hardware_system.py @@ -370,7 +370,7 @@ class OhmPiHardware: self._vab_pulse(self, length=lengths[i], sampling_rate=sampling_rate, polarity=polarities[i], append=True) - def switch_mux(self, electrodes, roles=None, state='off'): + def switch_mux(self, electrodes, roles=None, state='off', **kwargs): """Switches on multiplexer relays for given quadrupole. Parameters @@ -407,8 +407,8 @@ class OhmPiHardware: for idx, mux in enumerate(mux_workers): # Create a new thread to perform some work self.mux_boards[mux].barrier = b - mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs={'elec_dict': elec_dict, - 'state': state}) + kwargs.update({'elec_dict': elec_dict, 'state': state}) + mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs=kwargs) mux_workers[idx].start() self.mux_barrier.wait() for mux_worker in mux_workers: diff --git a/ohmpi/ohmpi.py b/ohmpi/ohmpi.py index b89b4ea3..8dce80b4 100644 --- a/ohmpi/ohmpi.py +++ b/ohmpi/ohmpi.py @@ -754,7 +754,7 @@ class OhmPi(object): self.exec_logger.warning(f'Unable to set sequence: {e}') status = False - def switch_mux_on(self, quadrupole, cmd_id=None): + def switch_mux_on(self, quadrupole, bypass_check=False, cmd_id=None): """Switches on multiplexer relays for given quadrupole. Parameters @@ -763,9 +763,15 @@ class OhmPi(object): Unique command identifier quadrupole : list of 4 int List of 4 integers representing the electrode numbers. + bypass_check: bool, optional + Bypasses checks for A==M or A==M or B==M or B==N (i.e. used for rs-check) """ assert len(quadrupole) == 4 - return self._hw.switch_mux(electrodes=quadrupole, state='on') + if self._hw.tx.voltage > self._hw.rx.max_voltage and bypass_check: + self.exec_logger.warning('Cannot bypass checking electrode roles because tx voltage is over rx maximum voltage') + return False + else: + return self._hw.switch_mux(electrodes=quadrupole, state='on', bypass_check=bypass_check) def switch_mux_off(self, quadrupole, cmd_id=None): """Switches off multiplexer relays for given quadrupole. -- GitLab