From 8aecddabb5faf65b691f8da802f599871e25692f Mon Sep 17 00:00:00 2001 From: su530201 <olivier.kaufmann@umons.ac.be> Date: Sun, 30 Apr 2023 17:07:32 +0200 Subject: [PATCH] Updates hardware_system _cabling from mux_board cabling; implements test_mux --- hardware_system.py | 71 ++++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 44 deletions(-) diff --git a/hardware_system.py b/hardware_system.py index b908571c..71cfa941 100644 --- a/hardware_system.py +++ b/hardware_system.py @@ -63,8 +63,11 @@ class OhmPiHardware: controller=self.controller, cabling = self._cabling)}) self.mux_barrier = Barrier(len(self.mux_boards) + 1) - for _, mux in self.mux_boards.items(): + self._cabling={} + for mux_id, mux in self.mux_boards.items(): mux.barrier = self.mux_barrier + update_dict(self._cabling, mux.cabling) + print(self._cabling) self.readings = np.array([]) # time series of acquired data self._start_time = None # time of the beginning of a readings acquisition self._pulse = 0 # pulse number @@ -286,7 +289,6 @@ class OhmPiHardware: mux_workers.append(Thread(target=mux.switch, kwargs={'elec_dict': elec_dict})) for mux_worker in mux_workers: mux_worker.start() - self.exec_logger.debug(f'Waiting: {self.mux_barrier.n_waiting}/{self.mux_barrier.parties}') self.mux_barrier.wait() for mux_worker in mux_workers: mux_worker.join() @@ -294,59 +296,40 @@ class OhmPiHardware: self.exec_logger.error( 'Unable to switch electrodes: number of electrodes and number of roles do not match!') - def test_mux(self, activation_time=1.0, channel=None, bypass_check=False): + def test_mux(self, channel=None, activation_time=1.0): """Interactive method to test the multiplexer. Parameters ---------- - activation_time : float, optional - Time in seconds during which the relays are activated. channel : tuple, optional (electrode_nr, role) to test. - bypass_check : bool, optional - if True, test will be conducted even if several mux boards are connected to the same electrode with the same role + activation_time : float, optional + Time in seconds during which the relays are activated. """ self.reset_mux() if channel is None: - pass + a = input('Which channel do you want to test? (1,A)') + if a =='': + electrodes = [1] + roles = ['A'] + else: + try: + a = a.lstrip('(').rstrip(')').split(',') + electrodes = [int(a[0])] + roles = [a[1]] + except Exception as e: + self.exec_logger.error(f'Unable to parse your answer: {e}') + return + self.switch_mux(electrodes,roles,state='on') + time.sleep(activation_time) + self.switch_mux(electrodes,roles, state='off') else: - pass - # choose with MUX board - # tca = adafruit_tca9548a.TCA9548A(self.i2c, address) - # - # # ask use some details on how to proceed - # a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!') - # if a == '1': - # print('run channel by channel test') - # electrode = int(input('Choose your electrode number (integer):')) - # electrodes = [electrode] - # elif a == '2': - # electrodes = range(1, 65) - # else: - # print('Wrong choice !') - # return - # - # # run the test - # for electrode_nr in electrodes: - # # find I2C address of the electrode and corresponding relay - # # considering that one MCP23017 can cover 16 electrodes - # i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division - # relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 - # - # if i2c_address is not None: - # # select the MCP23017 of the selected MUX board - # mcp2 = MCP23017(tca[i2c_address]) - # mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT - # - # # activate relay for given time - # mcp2.get_pin(relay_nr - 1).value = True - # print('electrode:', electrode_nr, ' activated...', end='', flush=True) - # time.sleep(activation_time) - # mcp2.get_pin(relay_nr - 1).value = False - # print(' deactivated') - # time.sleep(activation_time) - # print('Test finished.') + for c in self._cabling.keys: + self.switch_mux(electrodes=c[0],roles=c[1],state='on') + time.sleep(activation_time) + self.switch_mux(electrodes=c[0], roles=c[1], state='off') + self.exec_logger.info('Test finished.') def reset_mux(self): """Switches off all multiplexer relays. -- GitLab