Commit 8aecddab authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Updates hardware_system _cabling from mux_board cabling; implements test_mux

No related merge requests found
Showing with 27 additions and 44 deletions
+27 -44
......@@ -63,8 +63,11 @@ class OhmPiHardware:
controller=self.controller,
cabling = self._cabling)})
self.mux_barrier = Barrier(len(self.mux_boards) + 1)
for _, mux in self.mux_boards.items():
self._cabling={}
for mux_id, mux in self.mux_boards.items():
mux.barrier = self.mux_barrier
update_dict(self._cabling, mux.cabling)
print(self._cabling)
self.readings = np.array([]) # time series of acquired data
self._start_time = None # time of the beginning of a readings acquisition
self._pulse = 0 # pulse number
......@@ -286,7 +289,6 @@ class OhmPiHardware:
mux_workers.append(Thread(target=mux.switch, kwargs={'elec_dict': elec_dict}))
for mux_worker in mux_workers:
mux_worker.start()
self.exec_logger.debug(f'Waiting: {self.mux_barrier.n_waiting}/{self.mux_barrier.parties}')
self.mux_barrier.wait()
for mux_worker in mux_workers:
mux_worker.join()
......@@ -294,59 +296,40 @@ class OhmPiHardware:
self.exec_logger.error(
'Unable to switch electrodes: number of electrodes and number of roles do not match!')
def test_mux(self, activation_time=1.0, channel=None, bypass_check=False):
def test_mux(self, channel=None, activation_time=1.0):
"""Interactive method to test the multiplexer.
Parameters
----------
activation_time : float, optional
Time in seconds during which the relays are activated.
channel : tuple, optional
(electrode_nr, role) to test.
bypass_check : bool, optional
if True, test will be conducted even if several mux boards are connected to the same electrode with the same role
activation_time : float, optional
Time in seconds during which the relays are activated.
"""
self.reset_mux()
if channel is None:
pass
a = input('Which channel do you want to test? (1,A)')
if a =='':
electrodes = [1]
roles = ['A']
else:
try:
a = a.lstrip('(').rstrip(')').split(',')
electrodes = [int(a[0])]
roles = [a[1]]
except Exception as e:
self.exec_logger.error(f'Unable to parse your answer: {e}')
return
self.switch_mux(electrodes,roles,state='on')
time.sleep(activation_time)
self.switch_mux(electrodes,roles, state='off')
else:
pass
# choose with MUX board
# tca = adafruit_tca9548a.TCA9548A(self.i2c, address)
#
# # ask use some details on how to proceed
# a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!')
# if a == '1':
# print('run channel by channel test')
# electrode = int(input('Choose your electrode number (integer):'))
# electrodes = [electrode]
# elif a == '2':
# electrodes = range(1, 65)
# else:
# print('Wrong choice !')
# return
#
# # run the test
# for electrode_nr in electrodes:
# # find I2C address of the electrode and corresponding relay
# # considering that one MCP23017 can cover 16 electrodes
# i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division
# relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1
#
# if i2c_address is not None:
# # select the MCP23017 of the selected MUX board
# mcp2 = MCP23017(tca[i2c_address])
# mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT
#
# # activate relay for given time
# mcp2.get_pin(relay_nr - 1).value = True
# print('electrode:', electrode_nr, ' activated...', end='', flush=True)
# time.sleep(activation_time)
# mcp2.get_pin(relay_nr - 1).value = False
# print(' deactivated')
# time.sleep(activation_time)
# print('Test finished.')
for c in self._cabling.keys:
self.switch_mux(electrodes=c[0],roles=c[1],state='on')
time.sleep(activation_time)
self.switch_mux(electrodes=c[0], roles=c[1], state='off')
self.exec_logger.info('Test finished.')
def reset_mux(self):
"""Switches off all multiplexer relays.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment