From b435bbb789323c5b8de88b8782a8261dcfed4220 Mon Sep 17 00:00:00 2001 From: Arnaud WATLET <arnaud.watlet@umons.ac.be> Date: Wed, 29 Mar 2023 16:16:24 +0200 Subject: [PATCH] Revert "Adds mcp_board_address in ohmpi_config" This reverts commit 3370f9ac17a240cad416041687280e1e6a821275 --- config.py | 3 +- ohmpi.py | 570 ++++++++++++++++++++++++++++-------------------------- 2 files changed, 293 insertions(+), 280 deletions(-) diff --git a/config.py b/config.py index 40d92c6e..17e03a76 100644 --- a/config.py +++ b/config.py @@ -22,8 +22,7 @@ OHMPI_CONFIG = { 'max_elec': 64, 'board_addresses': {'A': 0x73, 'B': 0x72, 'M': 0x71, 'N': 0x70}, # CHECK IF YOUR BOARDS HAVE THESE ADDRESSES 'settings': 'ohmpi_settings.json', # INSERT YOUR FAVORITE SETTINGS FILE HERE - 'board_version': 'mb.2023.0.0',#,'22.10' - 'mcp_board_address': '0x20' + 'board_version': 'mb.2023.0.0'#,'22.10' } # TODO: add a dictionary with INA models and associated gain values # SET THE LOGGING LEVELS, MQTT BROKERS AND MQTT OPTIONS ACCORDING TO YOUR NEEDS diff --git a/ohmpi.py b/ohmpi.py index 4236f615..93290116 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -118,8 +118,8 @@ class OhmPi(object): self.i2c = busio.I2C(board.SCL, board.SDA) # noqa # I2C connexion to MCP23008, for current injection - self.mcp = MCP23008(self.i2c, address=self.mcp_board_address) - self.pin4 = self.mcp.get_pin(4) # Ohmpi_run + self.MCPIHM = MCP23008(self.i2c, address=0x24) + self.pin4 = self.MCPIHM.get_pin(4) # Ohmpi_run self.pin4.direction = Direction.OUTPUT self.pin4.value = True @@ -133,13 +133,13 @@ class OhmPi(object): # current injection module if self.idps: - self.pin2 = self.mcp.get_pin(2) # dsp + + self.pin2 = self.MCPIHM.get_pin(2) # dsp + self.pin2.direction = Direction.OUTPUT self.pin2.value = True - self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3 = self.MCPIHM.get_pin(3) # dsp - self.pin3.direction = Direction.OUTPUT self.pin3.value = True - time.sleep(3) + time.sleep(4) self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc self.DPS.serial.bytesize = 8 # @@ -147,7 +147,7 @@ class OhmPi(object): self.DPS.debug = False # self.DPS.serial.parity = 'N' # No parity self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode - self.DPS.write_register(0x0001, 100, 0) # max current allowed (100 mA for relays) + self.DPS.write_register(0x0001, 1000, 0) # max current allowed (100 mA for relays) print(self.DPS.read_register(0x05,2 )) # max current allowed (100 mA for relays) #voltage self.pin2.value = False @@ -155,10 +155,10 @@ class OhmPi(object): # (last number) 0 is for mA, 3 is for A # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) - self.pin0 = self.mcp.get_pin(0) + self.pin0 = self.MCPIHM.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.mcp.get_pin(1) + self.pin1 = self.MCPIHM.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False @@ -305,67 +305,152 @@ class OhmPi(object): volt = 5. # redefined the pin of the mcp (needed when relays are connected) - self.pin0 = self.mcp.get_pin(0) + self.pin0 = self.MCPIHM.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.mcp.get_pin(1) + self.pin1 = self.MCPIHM.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False # select a polarity to start with self.pin0.value = True self.pin1.value = False - I=0 - vmn=0 - count=0 - - while I < 3 or abs(vmn) < 10 : # I supérieur à 1 mA et Vmn surpérieur - if count >0 : - volt = volt + 2 - count=count+1 - if volt > 50: - break - # set voltage for test - self.DPS.write_register(0x0000, volt, 2) - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) # inject for given tx time + + + if strategy == 'constant': + vab = volt + + elif strategy == 'vmax': + # implement different strategy + I=0 + vmn=0 + count=0 + while I < 3 or abs(vmn) < 20 : #TODO: hardware related - place in config - # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - gain_voltage = np.min([gain_voltage0, gain_voltage2]) - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - # we measure the voltage on both A0 and A2 to guess the polarity - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + if count > 0 : + print('o', volt) + volt = volt + 2 + print('>', volt) + count=count+1 + if volt > 50: + break + + # set voltage for test + if count==1: + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + self.DPS.write_register(0x0000, volt, 2) + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 - # check polarity - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - if abs(vmn)>4500 or I> 45 : - volt = volt - 2 - self.DPS.write_register(0x0000, volt, 2) - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. - - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor + n = 0 + while (abs(vmn) > voltage_max or I > current_max) and volt>0: #If starting voltage is too high, need to lower it down + # print('we are out of range! so decreasing volt') + volt = volt - 2 + self.DPS.write_register(0x0000, volt, 2) + #self.DPS.write_register(0x09, 1) # DPS5005 on + #time.sleep(best_tx_injtime) + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor polarity = -1 vmn = U2 - break - + n+=1 + if n > 25 : + break + + factor_I = (current_max) / I + factor_vmn = voltage_max / vmn + factor = factor_I + if factor_I > factor_vmn: + factor = factor_vmn + vab = factor * volt * 0.8 + if vab > tx_max: + vab = tx_max + print(factor_I, factor_vmn, 'factor!!') + + + elif strategy == 'vmin': + # implement different strategy + I=20 + vmn=400 + count=0 + while I > 10 or abs(vmn) > 300 : #TODO: hardware related - place in config + if count > 0 : + volt = volt - 2 + print(volt, count) + count=count+1 + if volt > 50: + break + + # set voltage for test + if count==1: + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + self.DPS.write_register(0x0000, volt, 2) + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + + n=0 + while (abs(vmn) < voltage_min or I < current_min) and volt > 0 : #If starting voltage is too high, need to lower it down + # print('we are out of range! so increasing volt') + volt = volt + 2 + print(volt) + self.DPS.write_register(0x0000, volt, 2) + #self.DPS.write_register(0x09, 1) # DPS5005 on + #time.sleep(best_tx_injtime) + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + n+=1 + if n > 25 : + break + + vab = volt self.DPS.write_register(0x09, 0) # DPS5005 off # print('polarity', polarity) @@ -377,27 +462,11 @@ class OhmPi(object): self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') - # implement different strategy - if strategy == 'vmax': - factor_I = (current_max) / I - factor_vmn = voltage_max / vmn - factor = factor_I - if factor_I > factor_vmn: - factor = factor_vmn - vab = factor * volt * 0.9 - if vab > tx_max: - vab = tx_max - - elif strategy == 'constant': - vab = volt - else: - vab = 5 - # self.DPS.write_register(0x09, 0) # DPS5005 off self.pin0.value = False self.pin1.value = False - return vab, polarity + return vab, polarity, Rab @staticmethod def _find_identical_in_line(quads): @@ -647,12 +716,8 @@ class OhmPi(object): self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes self.board_addresses = OHMPI_CONFIG['board_addresses'] self.board_version = OHMPI_CONFIG['board_version'] - self.mux_board_version = OHMPI_CONFIG['mux_board_version'] - self.mux_addressing_table = OHMPI_CONFIG['mux_addressing_table'] - self.mcp_board_address = OHMPI_CONFIG['mcp_board_address'] self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') - def read_quad(self, **kwargs): warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) self.load_sequence(**kwargs) @@ -737,52 +802,54 @@ class OhmPi(object): # let's define the pin again as if we run through measure() # as it's run in another thread, it doesn't consider these # and this can lead to short circuit! - self.pin0 = self.mcp.get_pin(0) + + self.pin0 = self.MCPIHM.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.mcp.get_pin(1) + self.pin1 = self.MCPIHM.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False - self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement + self.pin7 = self.MCPIHM.get_pin(7) #IHM on mesaurement self.pin7.direction = Direction.OUTPUT self.pin7.value = False - if self.sequence is None : - self.pin2 = self.mcp.get_pin(2) # dsp + - self.pin2.direction = Direction.OUTPUT - self.pin2.value = True - self.pin3 = self.mcp.get_pin(3) # dsp - - self.pin3.direction = Direction.OUTPUT - self.pin3.value = True - - self.pin5 = self.mcp.get_pin(5) #IHM on mesaurement + if self.idps: + self.pin2 = self.MCPIHM.get_pin(2) # dsp + + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.MCPIHM.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + time.sleep(5) + + self.pin5 = self.MCPIHM.get_pin(5) #IHM on mesaurement self.pin5.direction = Direction.OUTPUT self.pin5.value = True - self.pin6 = self.mcp.get_pin(6) #IHM on mesaurement + self.pin6 = self.MCPIHM.get_pin(6) #IHM on mesaurement self.pin6.direction = Direction.OUTPUT self.pin6.value = False - self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement + self.pin7 = self.MCPIHM.get_pin(7) #IHM on mesaurement self.pin7.direction = Direction.OUTPUT self.pin7.value = False - - if self.DPS.read_register(0x05,2) < 11: - self.pin7.value = True# max current allowed (100 mA for relays) #voltage + if self.idps: + if self.DPS.read_register(0x05,2) < 11: + self.pin7.value = True# max current allowed (100 mA for relays) #voltage # get best voltage to inject AND polarity if self.idps: - tx_volt, polarity = self._compute_tx_volt( + tx_volt, polarity, Rab = self._compute_tx_volt( best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) self.exec_logger.debug(f'Best vab found is {tx_volt:.3f}V') else: polarity = 1 + Rab = None # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address, mode=0) self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address, mode=0) - # turn on the power supply start_delay = None end_delay = None @@ -791,40 +858,43 @@ class OhmPi(object): if not np.isnan(tx_volt): self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(0.05) + time.sleep(0.3) else: self.exec_logger.debug('No best voltage found, will not take measurement') out_of_range = True if not out_of_range: # we found a Vab in the range so we measure if autogain: - if self.board_version == 'mb.2023.0.0': - # compute autogain - self.pin0.value = True - self.pin1.value = False - self.pin6.value = True # IHM current injection led on + + # compute autogain + gain_voltage = [] + for n in [0,1]: # make short cycle for gain computation + if n == 0: + self.pin0.value = True + self.pin1.value = False + if self.board_version == 'mb.2023.0.0': + self.pin6.value = True # IHM current injection led on + else: + self.pin0.value = False + self.pin1.value = True # current injection nr2 + if self.board_version == 'mb.2023.0.0': + self.pin6.value = True # IHM current injection led on + time.sleep(injection_duration) gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) if polarity > 0: - gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) else: - gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) self.pin0.value = False self.pin1.value = False - self.pin6.value = False # IHM current injection led off - self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage:.3f}') - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, - address=self.ads_current_address, mode=0) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, - address=self.ads_voltage_address, mode=0) - elif self.board_version == '22.10': - gain_current = 2 / 3 - gain_voltage = 2 / 3 - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, - address=self.ads_current_address, mode=0) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, - address=self.ads_voltage_address, mode=0) + if self.board_version == 'mb.2023.0.0': + self.pin6.value = False # IHM current injection led off + self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage[0]:.3f}, ' + f'{gain_voltage[1]:.3f}') + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, + address=self.ads_current_address, mode=0) self.pin0.value = False self.pin1.value = False @@ -833,8 +903,8 @@ class OhmPi(object): pinMN = 0 if polarity > 0 else 2 # noqa # sampling for each stack at the end of the injection - sampling_interval = 10 # ms - self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 + sampling_interval = 10 # ms # TODO: make this a config option + self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 #TODO: check this strategy # full data for waveform fulldata = [] @@ -849,13 +919,18 @@ class OhmPi(object): if (n % 2) == 0: self.pin0.value = True self.pin1.value = False - self.pin6.value = True# IHM current injection led on + if autogain: # select gain computed on first half cycle + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[0], data_rate=860, + address=self.ads_voltage_address, mode=0) else: self.pin0.value = False self.pin1.value = True # current injection nr2 - self.pin6.value = True# IHM current injection led on + if autogain: # select gain computed on first half cycle + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[1], data_rate=860, + address=self.ads_voltage_address, mode=0) self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') - + if self.board_version == 'mb.2023.0.0': + self.pin6.value = True # IHM current injection led on # measurement of current i and voltage u during injection meas = np.zeros((self.nb_samples, 3)) * np.nan start_delay = time.time() # stating measurement time @@ -885,7 +960,7 @@ class OhmPi(object): self.pin6.value = False# IHM current injection led on end_delay = time.time() - # truncate the meas array if we didn't fill the last samples + # truncate the meas array if we didn't fill the last samples #TODO: check why meas = meas[:k + 1] # measurement of current i and voltage u during off time @@ -929,10 +1004,19 @@ class OhmPi(object): # TODO send a message on SOH stating the battery level # let's do some calculation (out of the stacking loop) + + # i_stack = np.empty(2 * nb_stack, dtype=object) + # vmn_stack = np.empty(2 * nb_stack, dtype=object) + i_stack, vmn_stack = [], [] + # select appropriate window length to average the readings + window = int(np.min([f.shape[0] for f in fulldata[::2]]) // 3) for n, meas in enumerate(fulldata[::2]): # take average from the samples per stack, then sum them all # average for the last third of the stacked values # is done outside the loop + i_stack.append(meas[-int(window):, 0]) + vmn_stack.append(meas[-int(window):, 1]) + sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0])) vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1]) if (n % 2) == 0: @@ -965,43 +1049,64 @@ class OhmPi(object): else: np.array([[]]) - # create a dictionary and compute averaged values from all stacks - if self.board_version == 'mb.2023.0.0': - d = { - "time": datetime.now().isoformat(), - "A": quad[0], - "B": quad[1], - "M": quad[2], - "N": quad[3], - "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., - "Vmn [mV]": sum_vmn / (2 * nb_stack), - "I [mA]": sum_i / (2 * nb_stack), - "R [ohm]": sum_vmn / sum_i, - "Ps [mV]": sum_ps / (2 * nb_stack), - "nbStack": nb_stack, - "Tx [V]": tx_volt if not out_of_range else 0., - "CPU temp [degC]": CPUTemperature().temperature, - "Nb samples [-]": self.nb_samples, - "fulldata": fulldata, - } - elif self.board_version == '22.10': - d = { - "time": datetime.now().isoformat(), - "A": quad[0], - "B": quad[1], - "M": quad[2], - "N": quad[3], - "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., - "Vmn [mV]": sum_vmn / (2 * nb_stack), - "I [mA]": sum_i / (2 * nb_stack), - "R [ohm]": sum_vmn / sum_i, - "Ps [mV]": sum_ps / (2 * nb_stack), - "nbStack": nb_stack, - "Tx [V]": tx_volt if not out_of_range else 0., - "CPU temp [degC]": CPUTemperature().temperature, - "Nb samples [-]": self.nb_samples, - } + vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) + vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) + i_stack_mean = np.mean(i_stack) + i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])])) + r_stack_mean = vmn_stack_mean / i_stack_mean + r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean + ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) + # create a dictionary and compute averaged values from all stacks + # if self.board_version == 'mb.2023.0.0': + d = { + "time": datetime.now().isoformat(), + "A": quad[0], + "B": quad[1], + "M": quad[2], + "N": quad[3], + "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., + "Vmn [mV]": sum_vmn / (2 * nb_stack), + "I [mA]": sum_i / (2 * nb_stack), + "R [ohm]": sum_vmn / sum_i, + "Ps [mV]": sum_ps / (2 * nb_stack), + "nbStack": nb_stack, + "Tx [V]": tx_volt if not out_of_range else 0., + "CPU temp [degC]": CPUTemperature().temperature, + "Nb samples [-]": self.nb_samples, + "fulldata": fulldata, + "I_stack [mA]": i_stack_mean, + "I_std [mA]": i_std, + "I_per_stack [mA]": np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), + "Vmn_stack [mV]": vmn_stack_mean, + "Vmn_std [mV]": vmn_std, + "Vmn_per_stack [mV]": np.array([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1))[0] / 2 for i in range(nb_stack)]), + "R_stack [ohm]": r_stack_mean, + "R_std [ohm]": r_stack_std, + "R_per_stack [Ohm]": np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) / np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), + "PS_per_stack [mV]": np.array([np.mean(np.mean(vmn_stack[i*2:i*2+2], axis=1)) for i in range(nb_stack)]), + "PS_stack [mV]": ps_stack_mean, + "R_ab [ohm]": Rab + } + # print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)])) + # elif self.board_version == '22.10': + # d = { + # "time": datetime.now().isoformat(), + # "A": quad[0], + # "B": quad[1], + # "M": quad[2], + # "N": quad[3], + # "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., + # "Vmn [mV]": sum_vmn / (2 * nb_stack), + # "I [mA]": sum_i / (2 * nb_stack), + # "R [ohm]": sum_vmn / sum_i, + # "Ps [mV]": sum_ps / (2 * nb_stack), + # "nbStack": nb_stack, + # "Tx [V]": tx_volt if not out_of_range else 0., + # "CPU temp [degC]": CPUTemperature().temperature, + # "Nb samples [-]": self.nb_samples, + # "fulldata": fulldata, + # } else: # for testing, generate random data d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3], @@ -1086,19 +1191,17 @@ class OhmPi(object): self.exec_logger.debug(f'Status: {self.status}') self.exec_logger.debug(f'Measuring sequence: {self.sequence}') t0 = time.time() - self.pin2 = self.mcp.get_pin(2) # dsp - - self.pin2.direction = Direction.OUTPUT - self.pin2.value = True - self.pin3 = self.mcp.get_pin(3) # dsp - - self.pin3.direction = Direction.OUTPUT - self.pin3.value = True + self.reset_mux() + + + # create filename with timestamp filename = self.settings["export_path"].replace('.csv', f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') self.exec_logger.debug(f'Saving to {filename}') # make sure all multiplexer are off - #self.reset_mux() + # measure all quadrupole of the sequence if self.sequence is None: @@ -1115,6 +1218,14 @@ class OhmPi(object): # call the switch_mux function to switch to the right electrodes self.switch_mux_on(quad) + self.mcp = MCP23008(self.i2c, address=0x24) + self.pin2 = self.MCPIHM.get_pin(2) # dsp - + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.MCPIHM.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + time.sleep (5) # run a measurement if self.on_pi: @@ -1151,10 +1262,9 @@ class OhmPi(object): # save data and print in a text file self.append_and_save(filename, acquired_data) self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') - self.pin2.value = True - self.pin3.value = True + self.pin2.value = False + self.pin3.value = False self.status = 'idle' - return acquired_data def run_sequence_async(self, cmd_id=None, **kwargs): """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. @@ -1289,15 +1399,16 @@ class OhmPi(object): role : str Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board. """ + if not self.use_mux or not self.on_pi: if not self.on_pi: self.exec_logger.warning('Cannot reset mux while in simulation mode...') else: self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' ' Set use_mux to True to use the multiplexer...') - elif self.sequence is None: + elif self.sequence is None and not self.use_mux: self.exec_logger.warning('Unable to switch MUX without a sequence') - elif self.mux_board_version == '2023.0.0': + else: # choose with MUX board tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) @@ -1309,109 +1420,18 @@ class OhmPi(object): if i2c_address is not None: # select the MCP23017 of the selected MUX board mcp2 = MCP23017(tca[i2c_address]) - mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT + mcp2.get_pin(relay_nr).direction = digitalio.Direction.OUTPUT if state == 'on': - mcp2.get_pin(relay_nr - 1).value = True + mcp2.get_pin(relay_nr).value = True else: - mcp2.get_pin(relay_nr - 1).value = False + mcp2.get_pin(relay_nr).value = False self.exec_logger.debug(f'Switching relay {relay_nr} ' f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') else: self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') - elif self.mux_board_version == '2024.0.0': - mux_addressing_table_file = os.path.join("MUX_board",self.mux_board_version,"relay_board_32",self.mux_addressing_table) - with open(mux_addressing_table_file, 'r') as myfile: - header = myfile.readlines()[0].strip('\n').split(',') - mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str, - delimiter=',', skip_header=1, ) - mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len(header))} - - def set_relay_state(mcp, mcp_pin, state=True): - pin_enable = mcp.get_pin(mcp_pin) - pin_enable.direction = Direction.OUTPUT - pin_enable.value = state - mux_addressing_table['Electrode_id'] = mux_addressing_table['Electrode_id'].astype(np.uint16) - mux_addressing_table['MCP_GPIO'] = mux_addressing_table['MCP_GPIO'].astype(np.uint16) - idx = np.where((mux_addressing_table['Electrode_id'] == electrode_nr) & (mux_addressing_table['Role'] == role))[0] - tca_addr = mux_addressing_table['TCA_address'][idx][0] - tca_channel = mux_addressing_table['TCA_channel'][idx][0] - mcp_gpio = mux_addressing_table['MCP_GPIO'][idx][0] - if tca_addr == 'None': - tca = self.i2c - else: - tca = adafruit_tca9548a.TCA9548A(self.i2c, int(tca_addr,16))[tca_channel] - mcp_addr = int(mux_addressing_table['MCP_address'][idx][0], 16) - mcp = MCP23017(tca, address=mcp_addr) - if state == 'on': - print('opening gpio nr', mcp_gpio) - print('opening electrode nr', electrode_nr) - print('opening role', role) - print('opening MCP', mux_addressing_table['MCP_address'][idx][0]) - set_relay_state(mcp, mcp_gpio, True) - if state == 'off': - set_relay_state(mcp, mcp_gpio, False) - - else: - self.exec_logger.warning('MUX board version not recognized') - - def _switch_mux_quad(self, electrodes, state, roles): - """Selects the right channel for the multiplexer cascade for a given electrode. - - Parameters - ---------- - electrodes : np.array or list - Electrode indexes to be switched on or off. - state : str - Either 'on' or 'off'. - roles : np.array or list - Array of roles 'A', 'B', 'M' or 'N', so we can assign them to a MUX board. - """ - mux_addressing_table_file = os.path.join("MUX_board",self.mux_board_version,"relay_board_32",self.mux_addressing_table) - with open(mux_addressing_table_file, 'r') as myfile: - header = myfile.readlines()[0].strip('\n').split(',') - mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str, - delimiter=',', skip_header=1, ) - mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len(header))} - mux_addressing_table['Electrode_id'] = mux_addressing_table['Electrode_id'].astype(np.uint16) - mux_addressing_table['MCP_GPIO'] = mux_addressing_table['MCP_GPIO'].astype(np.uint16) - - def set_relay_state(mcp, mcp_pin, state=True): - pin_enable = mcp.get_pin(mcp_pin) - pin_enable.direction = Direction.OUTPUT - pin_enable.value = state - - i2c = busio.I2C(board.SCL, board.SDA) - mcp_list = np.array([]) - tca_addr = np.array([]) - tca_channel = np.array([]) - mcp_gpio = np.array([]) - mcp_addr = np.array([]) - for i in range(len(electrodes)): - idx = np.where((mux_addressing_table['Electrode_id'] == electrodes[i]) & (mux_addressing_table['Role'] == roles[i]))[0] - tca_addr = np.append(tca_addr,mux_addressing_table['TCA_address'][idx][0]) - tca_channel = np.append(tca_channel,mux_addressing_table['TCA_channel'][idx][0]) - mcp_addr = np.append(mcp_addr,int(mux_addressing_table['MCP_address'][idx][0], 16)) - mcp_gpio = np.append(mcp_gpio,int(mux_addressing_table['MCP_GPIO'][idx][0])) - if tca_addr[i] == 'None': - tca = i2c - else: - tca = adafruit_tca9548a.TCA9548A(i2c, tca_addr[i]) - mcp_list = np.append(mcp_list,MCP23017(tca, address=int(mcp_addr[i]))) - - mux_list = np.column_stack([tca_addr,tca_channel,mcp_addr]) - mcp_to_address, mcp_idx, mcp_counts = np.unique(mux_list, return_index=True, return_counts=True, axis=0) - for j,mcp in enumerate(mcp_to_address): - for i,mux in enumerate(mux_list): - if np.array_equal(mux,mcp): - mcp_tmp = mcp_list[mcp_idx[j]] - if state == 'on': - set_relay_state(mcp_tmp,int(mcp_gpio[i]), True) - elif state == 'off': - set_relay_state(mcp_tmp,int(mcp_gpio[i]), False) - def switch_mux_on(self, quadrupole, cmd_id=None): """Switches on multiplexer relays for given quadrupole. @@ -1425,12 +1445,9 @@ class OhmPi(object): roles = ['A', 'B', 'M', 'N'] # another check to be sure A != B if quadrupole[0] != quadrupole[1]: - if self.mux_board_version == '2024.0.0': - self._switch_mux_quad(quadrupole, 'on', roles) - else: - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'on', roles[i]) + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'on', roles[i]) else: self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!') @@ -1445,12 +1462,9 @@ class OhmPi(object): List of 4 integers representing the electrode numbers. """ roles = ['A', 'B', 'M', 'N'] - if self.mux_board_version == '2024.0.0': - self._switch_mux_quad(quadrupole, 'off', roles) - else: - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'off', roles[i]) + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'off', roles[i]) def test_mux(self, activation_time=1.0, address=0x70): """Interactive method to test the multiplexer. -- GitLab