From 940936e65edf00233d4db8b4db81e1d66fb08d85 Mon Sep 17 00:00:00 2001 From: "remi.clement@inrae.fr" <arnaud.watlet@umons.ac.be> Date: Wed, 15 Feb 2023 11:10:20 +0100 Subject: [PATCH] improves mux addressing and adds switch_mux_quad function --- .../2024.0.0/relay_board_32/test_MUX_32.py | 54 ++++++---- ohmpi.py | 98 ++++++++++++++++--- 2 files changed, 119 insertions(+), 33 deletions(-) diff --git a/MUX_board/2024.0.0/relay_board_32/test_MUX_32.py b/MUX_board/2024.0.0/relay_board_32/test_MUX_32.py index a39928b4..0f0ef3e7 100644 --- a/MUX_board/2024.0.0/relay_board_32/test_MUX_32.py +++ b/MUX_board/2024.0.0/relay_board_32/test_MUX_32.py @@ -11,10 +11,11 @@ mux_board_version = '2024.0.0' mux_addressing_table_file = "compiled_mux_addressing_table.csv" #mux_addressing_table_file = os.path.join("MUX_board",mux_board_version,"relay_board_32",mux_addressing_table_file) -electrode_nr = "1" -role = "A" +electrode_nr = ["1","4","2","3"] +role = ["A","B","M","N"] # state = on + with open(mux_addressing_table_file, 'r') as myfile: header = myfile.readlines()[0].strip('\n').split(',') mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str, @@ -23,25 +24,42 @@ mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len( print(mux_addressing_table) def set_relay_state(mcp, mcp_pin, state=True): - print(mcp,mcp_gpio) pin_enable = mcp.get_pin(mcp_pin) pin_enable.direction = Direction.OUTPUT pin_enable.value = state i2c = busio.I2C(board.SCL, board.SDA) -idx = np.where((mux_addressing_table['Electrode_id'] == electrode_nr) & (mux_addressing_table['Role'] == role))[0] -tca_addr = mux_addressing_table['TCA_address'][idx][0] -tca_channel = mux_addressing_table['TCA_channel'][idx][0] -mcp_gpio = int(mux_addressing_table['MCP_GPIO'][idx][0]) -if tca_addr is None: - tca = i2c -else: - tca = adafruit_tca9548a.TCA9548A(i2c, role) -#tca = adafruit_tca9548a.TCA9548A(i2c, hex(int(tca_addr, 16)))[tca_channel] -tca = i2c -mcp_addr = int(mux_addressing_table['MCP_address'][idx][0], 16) -mcp = MCP23017(tca, address=mcp_addr) +mcp_list = np.array([]) +tca_addr = np.array([]) +tca_channel = np.array([]) +mcp_gpio = np.array([]) +mcp_addr = np.array([]) +for i in range(len(electrode_nr)): + idx = np.where((mux_addressing_table['Electrode_id'] == electrode_nr[i]) & (mux_addressing_table['Role'] == role[i]))[0] + print(idx) + tca_addr = np.append(tca_addr,mux_addressing_table['TCA_address'][idx][0]) + tca_channel = np.append(tca_channel,mux_addressing_table['TCA_channel'][idx][0]) + mcp_addr = np.append(mcp_addr,int(mux_addressing_table['MCP_address'][idx][0], 16)) + mcp_gpio = np.append(mcp_gpio,int(mux_addressing_table['MCP_GPIO'][idx][0])) + if tca_addr[i] == 'None': + tca = i2c + else: + tca = adafruit_tca9548a.TCA9548A(i2c, tca_addr[i]) + mcp_list = np.append(mcp_list,MCP23017(tca, address=int(mcp_addr[i]))) + +mux_list = np.column_stack([tca_addr,tca_channel,mcp_addr]) +mcp_to_address, mcp_idx, mcp_counts = np.unique(mux_list, return_index=True, return_counts=True, axis=0) +print(mcp_to_address,mcp_idx) +for j,mcp in enumerate(mcp_to_address): + for i,mux in enumerate(mux_list): + if np.array_equal(mux,mcp): + print(mcp_list[mcp_idx[j]].get_pin(int(mcp_gpio[i]))) + mcp_tmp = mcp_list[mcp_idx[j]] + set_relay_state(mcp_tmp,int(mcp_gpio[i]), True) -set_relay_state(mcp,mcp_gpio, True) -time.sleep(2) -set_relay_state(mcp,mcp_gpio, False) +time.sleep(10) +for j,mcp in enumerate(mcp_to_address): + for i,mux in enumerate(mux_list): + if np.array_equal(mux,mcp): + set_relay_state(mcp_tmp,int(mcp_gpio[i]), False) + diff --git a/ohmpi.py b/ohmpi.py index 0d507d5d..12fa4dbd 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -648,6 +648,7 @@ class OhmPi(object): self.board_addresses = OHMPI_CONFIG['board_addresses'] self.board_version = OHMPI_CONFIG['board_version'] self.mux_board_version = OHMPI_CONFIG['mux_board_version'] + self.mux_addressing_table = OHMPI_CONFIG['mux_addressing_table'] self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') def read_quad(self, **kwargs): @@ -745,7 +746,7 @@ class OhmPi(object): self.pin7.value = False - if self.sequence == None : + if self.sequence is None : self.pin2 = self.mcp.get_pin(2) # dsp + self.pin2.direction = Direction.OUTPUT self.pin2.value = True @@ -1020,7 +1021,7 @@ class OhmPi(object): dd['cmd_id'] = str(cmd_id) self.data_logger.info(dd) self.pin5.value = False #IHM led on measurement off - if self.sequence == None : + if self.sequence is None : self.pin2.value = False # DSP + off self.pin3.value = False # DSP - off return d @@ -1095,7 +1096,7 @@ class OhmPi(object): self.exec_logger.debug(f'Saving to {filename}') # make sure all multiplexer are off - self.reset_mux() + #self.reset_mux() # measure all quadrupole of the sequence if self.sequence is None: @@ -1151,6 +1152,7 @@ class OhmPi(object): self.pin2.value = True self.pin3.value = True self.status = 'idle' + return acquired_data def run_sequence_async(self, cmd_id=None, **kwargs): """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. @@ -1318,9 +1320,10 @@ class OhmPi(object): self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') elif self.mux_board_version == '2024.0.0': - with open(self.mux_addressing_table, 'r') as myfile: + mux_addressing_table_file = os.path.join("MUX_board",self.mux_board_version,"relay_board_32",self.mux_addressing_table) + with open(mux_addressing_table_file, 'r') as myfile: header = myfile.readlines()[0].strip('\n').split(',') - mux_addressing_table = np.genfromtxt(self.mux_addressing_table, dtype=str, + mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str, delimiter=',', skip_header=1, ) mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len(header))} @@ -1328,19 +1331,23 @@ class OhmPi(object): pin_enable = mcp.get_pin(mcp_pin) pin_enable.direction = Direction.OUTPUT pin_enable.value = state - + mux_addressing_table['Electrode_id'] = mux_addressing_table['Electrode_id'].astype(np.uint16) + mux_addressing_table['MCP_GPIO'] = mux_addressing_table['MCP_GPIO'].astype(np.uint16) idx = np.where((mux_addressing_table['Electrode_id'] == electrode_nr) & (mux_addressing_table['Role'] == role))[0] tca_addr = mux_addressing_table['TCA_address'][idx][0] tca_channel = mux_addressing_table['TCA_channel'][idx][0] mcp_gpio = mux_addressing_table['MCP_GPIO'][idx][0] - if tca_addr is None: + if tca_addr == 'None': tca = self.i2c else: - tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) - tca = adafruit_tca9548a.TCA9548A(self.i2c, int(tca_addr,16))[tca_channel] + tca = adafruit_tca9548a.TCA9548A(self.i2c, int(tca_addr,16))[tca_channel] mcp_addr = int(mux_addressing_table['MCP_address'][idx][0], 16) mcp = MCP23017(tca, address=mcp_addr) if state == 'on': + print('opening gpio nr', mcp_gpio) + print('opening electrode nr', electrode_nr) + print('opening role', role) + print('opening MCP', mux_addressing_table['MCP_address'][idx][0]) set_relay_state(mcp, mcp_gpio, True) if state == 'off': set_relay_state(mcp, mcp_gpio, False) @@ -1348,6 +1355,61 @@ class OhmPi(object): else: self.exec_logger.warning('MUX board version not recognized') + def _switch_mux_quad(self, electrodes, state, roles): + """Selects the right channel for the multiplexer cascade for a given electrode. + + Parameters + ---------- + electrodes : np.array or list + Electrode indexes to be switched on or off. + state : str + Either 'on' or 'off'. + roles : np.array or list + Array of roles 'A', 'B', 'M' or 'N', so we can assign them to a MUX board. + """ + mux_addressing_table_file = os.path.join("MUX_board",self.mux_board_version,"relay_board_32",self.mux_addressing_table) + with open(mux_addressing_table_file, 'r') as myfile: + header = myfile.readlines()[0].strip('\n').split(',') + mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str, + delimiter=',', skip_header=1, ) + mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len(header))} + mux_addressing_table['Electrode_id'] = mux_addressing_table['Electrode_id'].astype(np.uint16) + mux_addressing_table['MCP_GPIO'] = mux_addressing_table['MCP_GPIO'].astype(np.uint16) + + def set_relay_state(mcp, mcp_pin, state=True): + pin_enable = mcp.get_pin(mcp_pin) + pin_enable.direction = Direction.OUTPUT + pin_enable.value = state + + i2c = busio.I2C(board.SCL, board.SDA) + mcp_list = np.array([]) + tca_addr = np.array([]) + tca_channel = np.array([]) + mcp_gpio = np.array([]) + mcp_addr = np.array([]) + for i in range(len(electrodes)): + idx = np.where((mux_addressing_table['Electrode_id'] == electrodes[i]) & (mux_addressing_table['Role'] == roles[i]))[0] + tca_addr = np.append(tca_addr,mux_addressing_table['TCA_address'][idx][0]) + tca_channel = np.append(tca_channel,mux_addressing_table['TCA_channel'][idx][0]) + mcp_addr = np.append(mcp_addr,int(mux_addressing_table['MCP_address'][idx][0], 16)) + mcp_gpio = np.append(mcp_gpio,int(mux_addressing_table['MCP_GPIO'][idx][0])) + if tca_addr[i] == 'None': + tca = i2c + else: + tca = adafruit_tca9548a.TCA9548A(i2c, tca_addr[i]) + mcp_list = np.append(mcp_list,MCP23017(tca, address=int(mcp_addr[i]))) + + mux_list = np.column_stack([tca_addr,tca_channel,mcp_addr]) + mcp_to_address, mcp_idx, mcp_counts = np.unique(mux_list, return_index=True, return_counts=True, axis=0) + for j,mcp in enumerate(mcp_to_address): + for i,mux in enumerate(mux_list): + if np.array_equal(mux,mcp): + mcp_tmp = mcp_list[mcp_idx[j]] + if state == 'on': + set_relay_state(mcp_tmp,int(mcp_gpio[i]), True) + elif state == 'off': + set_relay_state(mcp_tmp,int(mcp_gpio[i]), False) + def switch_mux_on(self, quadrupole, cmd_id=None): """Switches on multiplexer relays for given quadrupole. @@ -1361,9 +1423,12 @@ class OhmPi(object): roles = ['A', 'B', 'M', 'N'] # another check to be sure A != B if quadrupole[0] != quadrupole[1]: - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'on', roles[i]) + if self.mux_board_version == '2024.0.0': + self._switch_mux_quad(quadrupole, 'on', roles) + else: + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'on', roles[i]) else: self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!') @@ -1378,9 +1443,12 @@ class OhmPi(object): List of 4 integers representing the electrode numbers. """ roles = ['A', 'B', 'M', 'N'] - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'off', roles[i]) + if self.mux_board_version == '2024.0.0': + self._switch_mux_quad(quadrupole, 'off', roles) + else: + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'off', roles[i]) def test_mux(self, activation_time=1.0, address=0x70): """Interactive method to test the multiplexer. -- GitLab