diff --git a/config.py b/config.py index 17e03a760ac9a99b4d99066e4f5f100b9d63c24b..40d92c6e85c7d5d87fd17bc0397483714e8aab25 100644 --- a/config.py +++ b/config.py @@ -22,7 +22,8 @@ OHMPI_CONFIG = { 'max_elec': 64, 'board_addresses': {'A': 0x73, 'B': 0x72, 'M': 0x71, 'N': 0x70}, # CHECK IF YOUR BOARDS HAVE THESE ADDRESSES 'settings': 'ohmpi_settings.json', # INSERT YOUR FAVORITE SETTINGS FILE HERE - 'board_version': 'mb.2023.0.0'#,'22.10' + 'board_version': 'mb.2023.0.0',#,'22.10' + 'mcp_board_address': '0x20' } # TODO: add a dictionary with INA models and associated gain values # SET THE LOGGING LEVELS, MQTT BROKERS AND MQTT OPTIONS ACCORDING TO YOUR NEEDS diff --git a/ohmpi.py b/ohmpi.py index 93290116daa327571c91c23db7047bbf167db610..4236f615910f8f54fd18afafdd656cee379e4410 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -118,8 +118,8 @@ class OhmPi(object): self.i2c = busio.I2C(board.SCL, board.SDA) # noqa # I2C connexion to MCP23008, for current injection - self.MCPIHM = MCP23008(self.i2c, address=0x24) - self.pin4 = self.MCPIHM.get_pin(4) # Ohmpi_run + self.mcp = MCP23008(self.i2c, address=self.mcp_board_address) + self.pin4 = self.mcp.get_pin(4) # Ohmpi_run self.pin4.direction = Direction.OUTPUT self.pin4.value = True @@ -133,13 +133,13 @@ class OhmPi(object): # current injection module if self.idps: - self.pin2 = self.MCPIHM.get_pin(2) # dsp + + self.pin2 = self.mcp.get_pin(2) # dsp + self.pin2.direction = Direction.OUTPUT self.pin2.value = True - self.pin3 = self.MCPIHM.get_pin(3) # dsp - + self.pin3 = self.mcp.get_pin(3) # dsp - self.pin3.direction = Direction.OUTPUT self.pin3.value = True - time.sleep(4) + time.sleep(3) self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc self.DPS.serial.bytesize = 8 # @@ -147,7 +147,7 @@ class OhmPi(object): self.DPS.debug = False # self.DPS.serial.parity = 'N' # No parity self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode - self.DPS.write_register(0x0001, 1000, 0) # max current allowed (100 mA for relays) + self.DPS.write_register(0x0001, 100, 0) # max current allowed (100 mA for relays) print(self.DPS.read_register(0x05,2 )) # max current allowed (100 mA for relays) #voltage self.pin2.value = False @@ -155,10 +155,10 @@ class OhmPi(object): # (last number) 0 is for mA, 3 is for A # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) - self.pin0 = self.MCPIHM.get_pin(0) + self.pin0 = self.mcp.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.MCPIHM.get_pin(1) + self.pin1 = self.mcp.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False @@ -305,152 +305,67 @@ class OhmPi(object): volt = 5. # redefined the pin of the mcp (needed when relays are connected) - self.pin0 = self.MCPIHM.get_pin(0) + self.pin0 = self.mcp.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.MCPIHM.get_pin(1) + self.pin1 = self.mcp.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False # select a polarity to start with self.pin0.value = True self.pin1.value = False - - - if strategy == 'constant': - vab = volt - - elif strategy == 'vmax': - # implement different strategy - I=0 - vmn=0 - count=0 - while I < 3 or abs(vmn) < 20 : #TODO: hardware related - place in config + I=0 + vmn=0 + count=0 + + while I < 3 or abs(vmn) < 10 : # I supérieur à 1 mA et Vmn surpérieur + if count >0 : + volt = volt + 2 + count=count+1 + if volt > 50: + break + # set voltage for test + self.DPS.write_register(0x0000, volt, 2) + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time - if count > 0 : - print('o', volt) - volt = volt + 2 - print('>', volt) - count=count+1 - if volt > 50: - break - - # set voltage for test - if count==1: - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) # inject for given tx time - self.DPS.write_register(0x0000, volt, 2) - # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - # we measure the voltage on both A0 and A2 to guess the polarity - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa - - # check polarity - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa - n = 0 - while (abs(vmn) > voltage_max or I > current_max) and volt>0: #If starting voltage is too high, need to lower it down - # print('we are out of range! so decreasing volt') - volt = volt - 2 - self.DPS.write_register(0x0000, volt, 2) - #self.DPS.write_register(0x09, 1) # DPS5005 on - #time.sleep(best_tx_injtime) - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + if abs(vmn)>4500 or I> 45 : + volt = volt - 2 + self.DPS.write_register(0x0000, volt, 2) + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor polarity = -1 vmn = U2 - n+=1 - if n > 25 : - break - - factor_I = (current_max) / I - factor_vmn = voltage_max / vmn - factor = factor_I - if factor_I > factor_vmn: - factor = factor_vmn - vab = factor * volt * 0.8 - if vab > tx_max: - vab = tx_max - print(factor_I, factor_vmn, 'factor!!') - - - elif strategy == 'vmin': - # implement different strategy - I=20 - vmn=400 - count=0 - while I > 10 or abs(vmn) > 300 : #TODO: hardware related - place in config - if count > 0 : - volt = volt - 2 - print(volt, count) - count=count+1 - if volt > 50: - break - - # set voltage for test - if count==1: - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) # inject for given tx time - self.DPS.write_register(0x0000, volt, 2) - # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - # we measure the voltage on both A0 and A2 to guess the polarity - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa - - # check polarity - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - - n=0 - while (abs(vmn) < voltage_min or I < current_min) and volt > 0 : #If starting voltage is too high, need to lower it down - # print('we are out of range! so increasing volt') - volt = volt + 2 - print(volt) - self.DPS.write_register(0x0000, volt, 2) - #self.DPS.write_register(0x09, 1) # DPS5005 on - #time.sleep(best_tx_injtime) - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - n+=1 - if n > 25 : - break - - vab = volt + break + self.DPS.write_register(0x09, 0) # DPS5005 off # print('polarity', polarity) @@ -462,11 +377,27 @@ class OhmPi(object): self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') + # implement different strategy + if strategy == 'vmax': + factor_I = (current_max) / I + factor_vmn = voltage_max / vmn + factor = factor_I + if factor_I > factor_vmn: + factor = factor_vmn + vab = factor * volt * 0.9 + if vab > tx_max: + vab = tx_max + + elif strategy == 'constant': + vab = volt + else: + vab = 5 + # self.DPS.write_register(0x09, 0) # DPS5005 off self.pin0.value = False self.pin1.value = False - return vab, polarity, Rab + return vab, polarity @staticmethod def _find_identical_in_line(quads): @@ -716,8 +647,12 @@ class OhmPi(object): self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes self.board_addresses = OHMPI_CONFIG['board_addresses'] self.board_version = OHMPI_CONFIG['board_version'] + self.mux_board_version = OHMPI_CONFIG['mux_board_version'] + self.mux_addressing_table = OHMPI_CONFIG['mux_addressing_table'] + self.mcp_board_address = OHMPI_CONFIG['mcp_board_address'] self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') + def read_quad(self, **kwargs): warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) self.load_sequence(**kwargs) @@ -802,54 +737,52 @@ class OhmPi(object): # let's define the pin again as if we run through measure() # as it's run in another thread, it doesn't consider these # and this can lead to short circuit! - - self.pin0 = self.MCPIHM.get_pin(0) + self.pin0 = self.mcp.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.MCPIHM.get_pin(1) + self.pin1 = self.mcp.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False - self.pin7 = self.MCPIHM.get_pin(7) #IHM on mesaurement + self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement self.pin7.direction = Direction.OUTPUT self.pin7.value = False + if self.sequence is None : - if self.idps: - self.pin2 = self.MCPIHM.get_pin(2) # dsp + - self.pin2.direction = Direction.OUTPUT - self.pin2.value = True - self.pin3 = self.MCPIHM.get_pin(3) # dsp - - self.pin3.direction = Direction.OUTPUT - self.pin3.value = True - time.sleep(5) - - self.pin5 = self.MCPIHM.get_pin(5) #IHM on mesaurement + self.pin2 = self.mcp.get_pin(2) # dsp + + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + + self.pin5 = self.mcp.get_pin(5) #IHM on mesaurement self.pin5.direction = Direction.OUTPUT self.pin5.value = True - self.pin6 = self.MCPIHM.get_pin(6) #IHM on mesaurement + self.pin6 = self.mcp.get_pin(6) #IHM on mesaurement self.pin6.direction = Direction.OUTPUT self.pin6.value = False - self.pin7 = self.MCPIHM.get_pin(7) #IHM on mesaurement + self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement self.pin7.direction = Direction.OUTPUT self.pin7.value = False - if self.idps: - if self.DPS.read_register(0x05,2) < 11: - self.pin7.value = True# max current allowed (100 mA for relays) #voltage + + if self.DPS.read_register(0x05,2) < 11: + self.pin7.value = True# max current allowed (100 mA for relays) #voltage # get best voltage to inject AND polarity if self.idps: - tx_volt, polarity, Rab = self._compute_tx_volt( + tx_volt, polarity = self._compute_tx_volt( best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) self.exec_logger.debug(f'Best vab found is {tx_volt:.3f}V') else: polarity = 1 - Rab = None # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address, mode=0) self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address, mode=0) + # turn on the power supply start_delay = None end_delay = None @@ -858,43 +791,40 @@ class OhmPi(object): if not np.isnan(tx_volt): self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(0.3) + time.sleep(0.05) else: self.exec_logger.debug('No best voltage found, will not take measurement') out_of_range = True if not out_of_range: # we found a Vab in the range so we measure if autogain: - - # compute autogain - gain_voltage = [] - for n in [0,1]: # make short cycle for gain computation - if n == 0: - self.pin0.value = True - self.pin1.value = False - if self.board_version == 'mb.2023.0.0': - self.pin6.value = True # IHM current injection led on - else: - self.pin0.value = False - self.pin1.value = True # current injection nr2 - if self.board_version == 'mb.2023.0.0': - self.pin6.value = True # IHM current injection led on - + if self.board_version == 'mb.2023.0.0': + # compute autogain + self.pin0.value = True + self.pin1.value = False + self.pin6.value = True # IHM current injection led on time.sleep(injection_duration) gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) if polarity > 0: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) else: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) self.pin0.value = False self.pin1.value = False - if self.board_version == 'mb.2023.0.0': - self.pin6.value = False # IHM current injection led off + self.pin6.value = False # IHM current injection led off + self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage:.3f}') + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, + address=self.ads_current_address, mode=0) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, + address=self.ads_voltage_address, mode=0) + elif self.board_version == '22.10': + gain_current = 2 / 3 + gain_voltage = 2 / 3 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, + address=self.ads_current_address, mode=0) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, + address=self.ads_voltage_address, mode=0) - self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage[0]:.3f}, ' - f'{gain_voltage[1]:.3f}') - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, - address=self.ads_current_address, mode=0) self.pin0.value = False self.pin1.value = False @@ -903,8 +833,8 @@ class OhmPi(object): pinMN = 0 if polarity > 0 else 2 # noqa # sampling for each stack at the end of the injection - sampling_interval = 10 # ms # TODO: make this a config option - self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 #TODO: check this strategy + sampling_interval = 10 # ms + self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 # full data for waveform fulldata = [] @@ -919,18 +849,13 @@ class OhmPi(object): if (n % 2) == 0: self.pin0.value = True self.pin1.value = False - if autogain: # select gain computed on first half cycle - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[0], data_rate=860, - address=self.ads_voltage_address, mode=0) + self.pin6.value = True# IHM current injection led on else: self.pin0.value = False self.pin1.value = True # current injection nr2 - if autogain: # select gain computed on first half cycle - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[1], data_rate=860, - address=self.ads_voltage_address, mode=0) + self.pin6.value = True# IHM current injection led on self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') - if self.board_version == 'mb.2023.0.0': - self.pin6.value = True # IHM current injection led on + # measurement of current i and voltage u during injection meas = np.zeros((self.nb_samples, 3)) * np.nan start_delay = time.time() # stating measurement time @@ -960,7 +885,7 @@ class OhmPi(object): self.pin6.value = False# IHM current injection led on end_delay = time.time() - # truncate the meas array if we didn't fill the last samples #TODO: check why + # truncate the meas array if we didn't fill the last samples meas = meas[:k + 1] # measurement of current i and voltage u during off time @@ -1004,19 +929,10 @@ class OhmPi(object): # TODO send a message on SOH stating the battery level # let's do some calculation (out of the stacking loop) - - # i_stack = np.empty(2 * nb_stack, dtype=object) - # vmn_stack = np.empty(2 * nb_stack, dtype=object) - i_stack, vmn_stack = [], [] - # select appropriate window length to average the readings - window = int(np.min([f.shape[0] for f in fulldata[::2]]) // 3) for n, meas in enumerate(fulldata[::2]): # take average from the samples per stack, then sum them all # average for the last third of the stacked values # is done outside the loop - i_stack.append(meas[-int(window):, 0]) - vmn_stack.append(meas[-int(window):, 1]) - sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0])) vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1]) if (n % 2) == 0: @@ -1049,64 +965,43 @@ class OhmPi(object): else: np.array([[]]) - vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) - vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) - i_stack_mean = np.mean(i_stack) - i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])])) - r_stack_mean = vmn_stack_mean / i_stack_mean - r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean - ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) - # create a dictionary and compute averaged values from all stacks - # if self.board_version == 'mb.2023.0.0': - d = { - "time": datetime.now().isoformat(), - "A": quad[0], - "B": quad[1], - "M": quad[2], - "N": quad[3], - "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., - "Vmn [mV]": sum_vmn / (2 * nb_stack), - "I [mA]": sum_i / (2 * nb_stack), - "R [ohm]": sum_vmn / sum_i, - "Ps [mV]": sum_ps / (2 * nb_stack), - "nbStack": nb_stack, - "Tx [V]": tx_volt if not out_of_range else 0., - "CPU temp [degC]": CPUTemperature().temperature, - "Nb samples [-]": self.nb_samples, - "fulldata": fulldata, - "I_stack [mA]": i_stack_mean, - "I_std [mA]": i_std, - "I_per_stack [mA]": np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), - "Vmn_stack [mV]": vmn_stack_mean, - "Vmn_std [mV]": vmn_std, - "Vmn_per_stack [mV]": np.array([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1))[0] / 2 for i in range(nb_stack)]), - "R_stack [ohm]": r_stack_mean, - "R_std [ohm]": r_stack_std, - "R_per_stack [Ohm]": np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) / np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), - "PS_per_stack [mV]": np.array([np.mean(np.mean(vmn_stack[i*2:i*2+2], axis=1)) for i in range(nb_stack)]), - "PS_stack [mV]": ps_stack_mean, - "R_ab [ohm]": Rab - } - # print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)])) - # elif self.board_version == '22.10': - # d = { - # "time": datetime.now().isoformat(), - # "A": quad[0], - # "B": quad[1], - # "M": quad[2], - # "N": quad[3], - # "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., - # "Vmn [mV]": sum_vmn / (2 * nb_stack), - # "I [mA]": sum_i / (2 * nb_stack), - # "R [ohm]": sum_vmn / sum_i, - # "Ps [mV]": sum_ps / (2 * nb_stack), - # "nbStack": nb_stack, - # "Tx [V]": tx_volt if not out_of_range else 0., - # "CPU temp [degC]": CPUTemperature().temperature, - # "Nb samples [-]": self.nb_samples, - # "fulldata": fulldata, - # } + if self.board_version == 'mb.2023.0.0': + d = { + "time": datetime.now().isoformat(), + "A": quad[0], + "B": quad[1], + "M": quad[2], + "N": quad[3], + "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., + "Vmn [mV]": sum_vmn / (2 * nb_stack), + "I [mA]": sum_i / (2 * nb_stack), + "R [ohm]": sum_vmn / sum_i, + "Ps [mV]": sum_ps / (2 * nb_stack), + "nbStack": nb_stack, + "Tx [V]": tx_volt if not out_of_range else 0., + "CPU temp [degC]": CPUTemperature().temperature, + "Nb samples [-]": self.nb_samples, + "fulldata": fulldata, + } + elif self.board_version == '22.10': + d = { + "time": datetime.now().isoformat(), + "A": quad[0], + "B": quad[1], + "M": quad[2], + "N": quad[3], + "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., + "Vmn [mV]": sum_vmn / (2 * nb_stack), + "I [mA]": sum_i / (2 * nb_stack), + "R [ohm]": sum_vmn / sum_i, + "Ps [mV]": sum_ps / (2 * nb_stack), + "nbStack": nb_stack, + "Tx [V]": tx_volt if not out_of_range else 0., + "CPU temp [degC]": CPUTemperature().temperature, + "Nb samples [-]": self.nb_samples, + } + else: # for testing, generate random data d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3], @@ -1191,17 +1086,19 @@ class OhmPi(object): self.exec_logger.debug(f'Status: {self.status}') self.exec_logger.debug(f'Measuring sequence: {self.sequence}') t0 = time.time() - self.reset_mux() - - - + self.pin2 = self.mcp.get_pin(2) # dsp - + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True # create filename with timestamp filename = self.settings["export_path"].replace('.csv', f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') self.exec_logger.debug(f'Saving to {filename}') # make sure all multiplexer are off - + #self.reset_mux() # measure all quadrupole of the sequence if self.sequence is None: @@ -1218,14 +1115,6 @@ class OhmPi(object): # call the switch_mux function to switch to the right electrodes self.switch_mux_on(quad) - self.mcp = MCP23008(self.i2c, address=0x24) - self.pin2 = self.MCPIHM.get_pin(2) # dsp - - self.pin2.direction = Direction.OUTPUT - self.pin2.value = True - self.pin3 = self.MCPIHM.get_pin(3) # dsp - - self.pin3.direction = Direction.OUTPUT - self.pin3.value = True - time.sleep (5) # run a measurement if self.on_pi: @@ -1262,9 +1151,10 @@ class OhmPi(object): # save data and print in a text file self.append_and_save(filename, acquired_data) self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') - self.pin2.value = False - self.pin3.value = False + self.pin2.value = True + self.pin3.value = True self.status = 'idle' + return acquired_data def run_sequence_async(self, cmd_id=None, **kwargs): """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. @@ -1399,16 +1289,15 @@ class OhmPi(object): role : str Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board. """ - if not self.use_mux or not self.on_pi: if not self.on_pi: self.exec_logger.warning('Cannot reset mux while in simulation mode...') else: self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' ' Set use_mux to True to use the multiplexer...') - elif self.sequence is None and not self.use_mux: + elif self.sequence is None: self.exec_logger.warning('Unable to switch MUX without a sequence') - else: + elif self.mux_board_version == '2023.0.0': # choose with MUX board tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) @@ -1420,18 +1309,109 @@ class OhmPi(object): if i2c_address is not None: # select the MCP23017 of the selected MUX board mcp2 = MCP23017(tca[i2c_address]) - mcp2.get_pin(relay_nr).direction = digitalio.Direction.OUTPUT + mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT if state == 'on': - mcp2.get_pin(relay_nr).value = True + mcp2.get_pin(relay_nr - 1).value = True else: - mcp2.get_pin(relay_nr).value = False + mcp2.get_pin(relay_nr - 1).value = False self.exec_logger.debug(f'Switching relay {relay_nr} ' f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') else: self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') + elif self.mux_board_version == '2024.0.0': + mux_addressing_table_file = os.path.join("MUX_board",self.mux_board_version,"relay_board_32",self.mux_addressing_table) + with open(mux_addressing_table_file, 'r') as myfile: + header = myfile.readlines()[0].strip('\n').split(',') + mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str, + delimiter=',', skip_header=1, ) + mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len(header))} + + def set_relay_state(mcp, mcp_pin, state=True): + pin_enable = mcp.get_pin(mcp_pin) + pin_enable.direction = Direction.OUTPUT + pin_enable.value = state + mux_addressing_table['Electrode_id'] = mux_addressing_table['Electrode_id'].astype(np.uint16) + mux_addressing_table['MCP_GPIO'] = mux_addressing_table['MCP_GPIO'].astype(np.uint16) + idx = np.where((mux_addressing_table['Electrode_id'] == electrode_nr) & (mux_addressing_table['Role'] == role))[0] + tca_addr = mux_addressing_table['TCA_address'][idx][0] + tca_channel = mux_addressing_table['TCA_channel'][idx][0] + mcp_gpio = mux_addressing_table['MCP_GPIO'][idx][0] + if tca_addr == 'None': + tca = self.i2c + else: + tca = adafruit_tca9548a.TCA9548A(self.i2c, int(tca_addr,16))[tca_channel] + mcp_addr = int(mux_addressing_table['MCP_address'][idx][0], 16) + mcp = MCP23017(tca, address=mcp_addr) + if state == 'on': + print('opening gpio nr', mcp_gpio) + print('opening electrode nr', electrode_nr) + print('opening role', role) + print('opening MCP', mux_addressing_table['MCP_address'][idx][0]) + set_relay_state(mcp, mcp_gpio, True) + if state == 'off': + set_relay_state(mcp, mcp_gpio, False) + + else: + self.exec_logger.warning('MUX board version not recognized') + + def _switch_mux_quad(self, electrodes, state, roles): + """Selects the right channel for the multiplexer cascade for a given electrode. + + Parameters + ---------- + electrodes : np.array or list + Electrode indexes to be switched on or off. + state : str + Either 'on' or 'off'. + roles : np.array or list + Array of roles 'A', 'B', 'M' or 'N', so we can assign them to a MUX board. + """ + mux_addressing_table_file = os.path.join("MUX_board",self.mux_board_version,"relay_board_32",self.mux_addressing_table) + with open(mux_addressing_table_file, 'r') as myfile: + header = myfile.readlines()[0].strip('\n').split(',') + mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str, + delimiter=',', skip_header=1, ) + mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len(header))} + mux_addressing_table['Electrode_id'] = mux_addressing_table['Electrode_id'].astype(np.uint16) + mux_addressing_table['MCP_GPIO'] = mux_addressing_table['MCP_GPIO'].astype(np.uint16) + + def set_relay_state(mcp, mcp_pin, state=True): + pin_enable = mcp.get_pin(mcp_pin) + pin_enable.direction = Direction.OUTPUT + pin_enable.value = state + + i2c = busio.I2C(board.SCL, board.SDA) + mcp_list = np.array([]) + tca_addr = np.array([]) + tca_channel = np.array([]) + mcp_gpio = np.array([]) + mcp_addr = np.array([]) + for i in range(len(electrodes)): + idx = np.where((mux_addressing_table['Electrode_id'] == electrodes[i]) & (mux_addressing_table['Role'] == roles[i]))[0] + tca_addr = np.append(tca_addr,mux_addressing_table['TCA_address'][idx][0]) + tca_channel = np.append(tca_channel,mux_addressing_table['TCA_channel'][idx][0]) + mcp_addr = np.append(mcp_addr,int(mux_addressing_table['MCP_address'][idx][0], 16)) + mcp_gpio = np.append(mcp_gpio,int(mux_addressing_table['MCP_GPIO'][idx][0])) + if tca_addr[i] == 'None': + tca = i2c + else: + tca = adafruit_tca9548a.TCA9548A(i2c, tca_addr[i]) + mcp_list = np.append(mcp_list,MCP23017(tca, address=int(mcp_addr[i]))) + + mux_list = np.column_stack([tca_addr,tca_channel,mcp_addr]) + mcp_to_address, mcp_idx, mcp_counts = np.unique(mux_list, return_index=True, return_counts=True, axis=0) + for j,mcp in enumerate(mcp_to_address): + for i,mux in enumerate(mux_list): + if np.array_equal(mux,mcp): + mcp_tmp = mcp_list[mcp_idx[j]] + if state == 'on': + set_relay_state(mcp_tmp,int(mcp_gpio[i]), True) + elif state == 'off': + set_relay_state(mcp_tmp,int(mcp_gpio[i]), False) + def switch_mux_on(self, quadrupole, cmd_id=None): """Switches on multiplexer relays for given quadrupole. @@ -1445,9 +1425,12 @@ class OhmPi(object): roles = ['A', 'B', 'M', 'N'] # another check to be sure A != B if quadrupole[0] != quadrupole[1]: - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'on', roles[i]) + if self.mux_board_version == '2024.0.0': + self._switch_mux_quad(quadrupole, 'on', roles) + else: + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'on', roles[i]) else: self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!') @@ -1462,9 +1445,12 @@ class OhmPi(object): List of 4 integers representing the electrode numbers. """ roles = ['A', 'B', 'M', 'N'] - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'off', roles[i]) + if self.mux_board_version == '2024.0.0': + self._switch_mux_quad(quadrupole, 'off', roles) + else: + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'off', roles[i]) def test_mux(self, activation_time=1.0, address=0x70): """Interactive method to test the multiplexer.