From 8c95fd2e8e2a9ef5d70863cc84ee20b9cb0d0ff7 Mon Sep 17 00:00:00 2001 From: su530201 <olivier.kaufmann@umons.ac.be> Date: Sun, 30 Apr 2023 12:27:09 +0200 Subject: [PATCH] Adds mux switching from OhmPiHardware --- .../abstract_hardware_components.py | 25 ++- hardware_system.py | 121 +++++++++++++- ohmpi.py | 154 +++--------------- test_measure_with_ohmpi_card_3_15.py | 14 +- utils.py | 25 +++ 5 files changed, 200 insertions(+), 139 deletions(-) diff --git a/hardware_components/abstract_hardware_components.py b/hardware_components/abstract_hardware_components.py index 5522a437..4cf08d44 100644 --- a/hardware_components/abstract_hardware_components.py +++ b/hardware_components/abstract_hardware_components.py @@ -3,6 +3,7 @@ from abc import ABC, abstractmethod import numpy as np from OhmPi.logging_setup import create_stdout_logger import time +from threading import Barrier class ControllerAbstract(ABC): def __init__(self, **kwargs): @@ -54,6 +55,7 @@ class MuxAbstract(ABC): self.cabling.update({k: (v[1], k[1])}) self.exec_logger.debug(f'{self.board_id} cabling: {self.cabling}') self.addresses = kwargs.pop('addresses', None) + self.barrier = kwargs.pop('barrier', None) @abstractmethod def _get_addresses(self): @@ -63,6 +65,15 @@ class MuxAbstract(ABC): def reset(self): pass + @property + def barrier(self): + return self.barrier + + @barrier.setter + def barrier(self, value): + assert isinstance(value, Barrier) + self.barrier = value + def switch(self, elec_dict=None, state='on'): """Switch a given list of electrodes with different roles. Electrodes with a value of 0 will be ignored. @@ -96,11 +107,19 @@ class MuxAbstract(ABC): 'This could create an over-voltage in the RX! Switching aborted.') return - # if all ok, then switch the electrodes + # if all ok, then wait for the barrier to open, then switch the electrodes + if self.barrier is not None: + self.exec_logger.debug(f'{self.board_id} waiting to switch.') + self.barrier.wait() for role in elec_dict: for elec in elec_dict[role]: - if elec > 0: - self.switch_one(elec, role, state) + if elec > 0: # Is this condition related to electrodes to infinity? + if (elec, role) in self.cabling.keys(): + self.switch_one(elec, role, state) + else: + self.exec_logger.debug(f'{self.board_id} skipping switching {(elec, role)} because it is' + f'is not in board cabling') + self.exec_logger.debug(f'{self.board_id} switching done.') else: self.exec_logger.warning(f'Missing argument for {self.board_name}.switch: elec_dict is None.') diff --git a/hardware_system.py b/hardware_system.py index 56c6774f..42d3ace4 100644 --- a/hardware_system.py +++ b/hardware_system.py @@ -1,11 +1,10 @@ import importlib import datetime import time - import numpy as np from OhmPi.logging_setup import create_stdout_logger from OhmPi.config import HARDWARE_CONFIG -from threading import Thread, Event +from threading import Thread, Event, Barrier controller_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["controller"]["model"]}') tx_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["tx"]["model"]}') @@ -55,6 +54,7 @@ class OhmPiHardware: soh_logger=self.soh_logger, controller=self.controller, cabling = self._cabling)}) + self.readings = np.array([]) # time series of acquired data self._start_time = None # time of the beginning of a readings acquisition self._pulse = 0 # pulse number @@ -69,6 +69,11 @@ class OhmPiHardware: self.tx.voltage_pulse(length=duration) self.tx_sync.clear() + def _set_mux_barrier(self): + self.mux_barrier = Barrier(len(self.mux)+1) + for mux in self.mux: + mux.barrier = self.mux_barrier + @property def pulses(self): pulses = {} @@ -228,4 +233,114 @@ class OhmPiHardware: if not append: self._clear_values() for i in range(n_pulses): - self._vab_pulse(self, length=lengths[i], sampling_rate=sampling_rate, polarity=polarities[i], append=True) \ No newline at end of file + self._vab_pulse(self, length=lengths[i], sampling_rate=sampling_rate, polarity=polarities[i], append=True) + + # _______________________________________________ + def switch_dps(self, state='off'): + """Switches DPS on or off. + + Parameters + ---------- + state : str + 'on', 'off' + """ + if state == 'on': + self.tx.turn_on() + else: + self.tx.turn_off() + if state != 'off': + self.exec_logger.warning(f'Unknown state {state} for DPS switching. switching off...') + + def switch_mux(self, electrodes, roles=None, state='off'): + """Switches on multiplexer relays for given quadrupole. + + Parameters + ---------- + electrodes : list + List of integers representing the electrode ids. + roles : list, optional + List of roles of electrodes, optional + state : str, optional + Either 'on' or 'off'. + """ + if roles is None: + roles = ['A', 'B', 'M', 'N'] + if len(electrodes) == len(roles): + # TODO: Check that we don't set incompatible roles to the same electrode + elec_dict = {i: [] for i in roles} + for i in range(len(electrodes)): + elec_dict[roles[i]].append(electrodes[i]) + mux_workers = [] + for mux in self.mux: + # start a new thread to perform some work + mux_workers.append(Thread(target=mux.switch, kwargs={'elec_dict': elec_dict})) + for mux_worker in mux_workers: + mux_worker.start() + self.mux_barrier.wait() + for mux_worker in mux_workers: + mux_worker.join() + else: + self.exec_logger.error( + 'Unable to switch electrodes: number of electrodes and number of roles do not match!') + + def test_mux(self, activation_time=1.0, channel=None, bypass_check=False): + """Interactive method to test the multiplexer. + + Parameters + ---------- + activation_time : float, optional + Time in seconds during which the relays are activated. + channel : tuple, optional + (electrode_nr, role) to test. + bypass_check : bool, optional + if True, test will be conducted even if several mux boards are connected to the same electrode with the same role + """ + self.reset_mux() + + if channel is None: + pass + else: + pass + # choose with MUX board + # tca = adafruit_tca9548a.TCA9548A(self.i2c, address) + # + # # ask use some details on how to proceed + # a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!') + # if a == '1': + # print('run channel by channel test') + # electrode = int(input('Choose your electrode number (integer):')) + # electrodes = [electrode] + # elif a == '2': + # electrodes = range(1, 65) + # else: + # print('Wrong choice !') + # return + # + # # run the test + # for electrode_nr in electrodes: + # # find I2C address of the electrode and corresponding relay + # # considering that one MCP23017 can cover 16 electrodes + # i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division + # relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 + # + # if i2c_address is not None: + # # select the MCP23017 of the selected MUX board + # mcp2 = MCP23017(tca[i2c_address]) + # mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT + # + # # activate relay for given time + # mcp2.get_pin(relay_nr - 1).value = True + # print('electrode:', electrode_nr, ' activated...', end='', flush=True) + # time.sleep(activation_time) + # mcp2.get_pin(relay_nr - 1).value = False + # print(' deactivated') + # time.sleep(activation_time) + # print('Test finished.') + + def reset_mux(self): + """Switches off all multiplexer relays. + """ + + self.exec_logger.debug('Resetting all mux boards ...') + for mux in self.mux: + mux.reset() \ No newline at end of file diff --git a/ohmpi.py b/ohmpi.py index 6c38c1e2..85893355 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -1107,52 +1107,7 @@ class OhmPi(object): warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning) self.interrupt(**kwargs) - def _switch_mux(self, electrode_nr, state, role): - """Selects the right channel for the multiplexer cascade for a given electrode. - - Parameters - ---------- - electrode_nr : int - Electrode index to be switched on or off. - state : str - Either 'on' or 'off'. - role : str - Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board. - """ - - if not self.use_mux or not self.on_pi: - if not self.on_pi: - self.exec_logger.warning('Cannot reset mux while in simulation mode...') - else: - self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' - ' Set use_mux to True to use the multiplexer...') - elif self.sequence is None and not self.use_mux: - self.exec_logger.warning('Unable to switch MUX without a sequence') - else: - # choose with MUX board - tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) - - # find I2C address of the electrode and corresponding relay - # considering that one MCP23017 can cover 16 electrodes - i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division - relay_nr = (electrode_nr-1) - ((electrode_nr-1) // 16) * 16 - - if i2c_address is not None: - # select the MCP23017 of the selected MUX board - mcp2 = MCP23017(tca[i2c_address]) - mcp2.get_pin(relay_nr).direction = digitalio.Direction.OUTPUT - - if state == 'on': - mcp2.get_pin(relay_nr).value = True - else: - mcp2.get_pin(relay_nr).value = False - - self.exec_logger.debug(f'Switching relay {relay_nr} ' - f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') - else: - self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') - - def switch_dps(self,state='off'): + def switch_dps(self, state='off'): """Switches DPS on or off. Parameters @@ -1160,20 +1115,7 @@ class OhmPi(object): state : str 'on', 'off' """ - self.pin2 = self.mcp_board.get_pin(2) # dsp - - self.pin2.direction = Direction.OUTPUT - self.pin3 = self.mcp_board.get_pin(3) # dsp - - self.pin3.direction = Direction.OUTPUT - if state == 'on': - self.pin2.value = True - self.pin3.value = True - self.exec_logger.debug(f'Switching DPS on') - time.sleep(4) - elif state == 'off': - self.pin2.value = False - self.pin3.value = False - self.exec_logger.debug(f'Switching DPS off') - + self._hw.switch_dps(state=state) def switch_mux_on(self, quadrupole, cmd_id=None): """Switches on multiplexer relays for given quadrupole. @@ -1185,14 +1127,8 @@ class OhmPi(object): quadrupole : list of 4 int List of 4 integers representing the electrode numbers. """ - roles = ['A', 'B', 'M', 'N'] - # another check to be sure A != B - if quadrupole[0] != quadrupole[1]: - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'on', roles[i]) - else: - self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!') + + self._hw.switch_mux_on(electrodes=quadrupole, state='on') def switch_mux_off(self, quadrupole, cmd_id=None): """Switches off multiplexer relays for given quadrupole. @@ -1204,59 +1140,25 @@ class OhmPi(object): quadrupole : list of 4 int List of 4 integers representing the electrode numbers. """ - roles = ['A', 'B', 'M', 'N'] - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'off', roles[i]) - # def test_mux(self, activation_time=1.0, address=0x70): TODO: add this in the MUX code - # """Interactive method to test the multiplexer. - # - # Parameters - # ---------- - # activation_time : float, optional - # Time in seconds during which the relays are activated. - # address : hex, optional - # Address of the multiplexer board to test (e.g. 0x70, 0x71, ...). - # """ - # self.use_mux = True - # self.reset_mux() - # - # # choose with MUX board - # tca = adafruit_tca9548a.TCA9548A(self.i2c, address) - # - # # ask use some details on how to proceed - # a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!') - # if a == '1': - # print('run channel by channel test') - # electrode = int(input('Choose your electrode number (integer):')) - # electrodes = [electrode] - # elif a == '2': - # electrodes = range(1, 65) - # else: - # print('Wrong choice !') - # return - # - # # run the test - # for electrode_nr in electrodes: - # # find I2C address of the electrode and corresponding relay - # # considering that one MCP23017 can cover 16 electrodes - # i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division - # relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 - # - # if i2c_address is not None: - # # select the MCP23017 of the selected MUX board - # mcp2 = MCP23017(tca[i2c_address]) - # mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT - # - # # activate relay for given time - # mcp2.get_pin(relay_nr - 1).value = True - # print('electrode:', electrode_nr, ' activated...', end='', flush=True) - # time.sleep(activation_time) - # mcp2.get_pin(relay_nr - 1).value = False - # print(' deactivated') - # time.sleep(activation_time) - # print('Test finished.') + self._hw.switch_mux(electrodes=quadrupole, state='off') + + def test_mux(self, activation_time=1.0, mux=None): # TODO: add this in the MUX code + """Interactive method to test the multiplexer boards. + + Parameters + ---------- + activation_time : float, optional + Time in seconds during which the relays are activated. + address : hex, optional + Address of the multiplexer board to test (e.g. 0x70, 0x71, ...). + """ + self.reset_mux() # All muxes should be reset even if we only want to test one otherwise we might create a shortcut + if mux is None: + self._hw.test_mux(activation_time) + else: + self._hw.mux[mux].test_mux() + def reset_mux(self, cmd_id=None): """Switches off all multiplexer relays. @@ -1266,17 +1168,7 @@ class OhmPi(object): cmd_id : str, optional Unique command identifier """ - if self.on_pi and self.use_mux: - roles = ['A', 'B', 'M', 'N'] - for i in range(0, 4): - for j in range(1, self.max_elec + 1): - self._switch_mux(j, 'off', roles[i]) - self.exec_logger.debug('All MUX switched off.') - elif not self.on_pi: - self.exec_logger.warning('Cannot reset mux while in simulation mode...') - else: - self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' - ' Set use_mux to True to use the multiplexer...') + self._hw.reset_mux() def _update_acquisition_settings(self, config): warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) diff --git a/test_measure_with_ohmpi_card_3_15.py b/test_measure_with_ohmpi_card_3_15.py index c7cba220..6c012876 100644 --- a/test_measure_with_ohmpi_card_3_15.py +++ b/test_measure_with_ohmpi_card_3_15.py @@ -1,3 +1,5 @@ +import time + import numpy as np import logging import matplotlib.pyplot as plt @@ -7,7 +9,12 @@ from OhmPi.hardware_system import OhmPiHardware k = OhmPiHardware() k.exec_logger.setLevel(logging.INFO) -# Test #1: + +# Test mux switching +k.reset_mux() +k.switch_mux(electrodes=[9,10,11,12], roles=['A', 'B', 'M', 'N'], state='on') + +# Test _vab_pulse: print('Testing positive _vab_pulse') k._vab_pulse(vab=12, length=1., sampling_rate=k.rx.sampling_rate, polarity=1) r = k.readings[:,4]/k.readings[:,3] @@ -19,7 +26,7 @@ r = k.readings[:,4]/k.readings[:,3] print(f'Mean resistance: {np.mean(r):.3f} Ohms, Dev. {100*np.std(r)/np.mean(r):.1f} %') print(f'sampling rate: {k.rx.sampling_rate:.1f} ms, mean sample spacing: {np.mean(np.diff(k.readings[:,0]))*1000.:.1f} ms') -# Test #2: +# Test vab_square_wave: print('\n\nTesting vab_square_wave') cycles=3 cycle_length = 1. @@ -47,5 +54,8 @@ print('\nTesting with pulses') r = [np.abs((k.pulses[i]['polarity']*k.pulses[i]['vmn']-k.sp)/k.pulses[i]['iab']) for i in k.pulses.keys()] for i in range(len(r)): print(f'Mean resistance with sp correction for pulse{i}: {np.mean(r[i]):.3f} Ohms, Dev. {100*np.std(r[i])/np.mean(r[i]):.1f} %') + +k.switch_mux(electrodes=[9,10,11,12], roles=['A', 'B', 'M', 'N'], state='off') +k.reset_mux() change_config('config_default.py', verbose=False) diff --git a/utils.py b/utils.py index 5e192644..537297e1 100644 --- a/utils.py +++ b/utils.py @@ -1,6 +1,31 @@ import io import os import shutil +import collections.abc + +def update_dict(d, u): + """Updates a dictionary by adding elements to collection items associated to existing keys + + Parameters + ---------- + d: dict + Dictionary that will be updated + u: dict + Dictionary that is used to update d + + Returns + ------- + dict + The updated dictionary + """ + + for k, v in u.items(): + if isinstance(v, collections.abc.Mapping): + d[k] = update_dict(d.get(k, {}), v) + else: + d[k] = v + return d + def get_platform(): """Gets platform name and checks if it is a raspberry pi -- GitLab