diff --git a/ohmpi.py b/ohmpi.py index 4236f615910f8f54fd18afafdd656cee379e4410..d3029cb8d6e111c4b2460810cea27b8026e97d5a 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -52,7 +52,7 @@ class OhmPi(object): """ OhmPi class. """ - def __init__(self, settings=None, sequence=None, use_mux=False, mqtt=True, onpi=None, idps=False): + def __init__(self, settings=None, sequence=None, use_mux=None, mqtt=True, onpi=None, idps=None): """Constructs the ohmpi object Parameters @@ -110,7 +110,16 @@ class OhmPi(object): if sequence is not None: self.load_sequence(sequence) + #Use MUX by default on mb.2023.0.0 version + if self.use_mux is None: + if self.board_version == "mb.2023.0.0": + self.use_mux = True + self.idps = idps # flag to use dps for injection or not + #Use IDPS by default on mb.2023.0.0 version + if self.idps is None: + if self.board_version == "mb.2023.0.0": + self.idps = True # connect to components on the OhmPi board if self.on_pi: @@ -118,8 +127,8 @@ class OhmPi(object): self.i2c = busio.I2C(board.SCL, board.SDA) # noqa # I2C connexion to MCP23008, for current injection - self.mcp = MCP23008(self.i2c, address=self.mcp_board_address) - self.pin4 = self.mcp.get_pin(4) # Ohmpi_run + self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address) + self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run self.pin4.direction = Direction.OUTPUT self.pin4.value = True @@ -133,10 +142,10 @@ class OhmPi(object): # current injection module if self.idps: - self.pin2 = self.mcp.get_pin(2) # dsp + + self.pin2 = self.mcp_board.get_pin(2) # dsp + self.pin2.direction = Direction.OUTPUT self.pin2.value = True - self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3 = self.mcp_board.get_pin(3) # dsp - self.pin3.direction = Direction.OUTPUT self.pin3.value = True time.sleep(3) @@ -155,10 +164,10 @@ class OhmPi(object): # (last number) 0 is for mA, 3 is for A # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) - self.pin0 = self.mcp.get_pin(0) + self.pin0 = self.mcp_board.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.mcp.get_pin(1) + self.pin1 = self.mcp_board.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False @@ -305,67 +314,148 @@ class OhmPi(object): volt = 5. # redefined the pin of the mcp (needed when relays are connected) - self.pin0 = self.mcp.get_pin(0) + self.pin0 = self.mcp_board.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.mcp.get_pin(1) + self.pin1 = self.mcp_board.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False # select a polarity to start with self.pin0.value = True self.pin1.value = False - I=0 - vmn=0 - count=0 - - while I < 3 or abs(vmn) < 10 : # I supÊrieur à 1 mA et Vmn surpÊrieur - if count >0 : - volt = volt + 2 - count=count+1 - if volt > 50: - break - # set voltage for test - self.DPS.write_register(0x0000, volt, 2) - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) # inject for given tx time - - # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - gain_voltage = np.min([gain_voltage0, gain_voltage2]) - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - # we measure the voltage on both A0 and A2 to guess the polarity - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa - - # check polarity - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - if abs(vmn)>4500 or I> 45 : - volt = volt - 2 - self.DPS.write_register(0x0000, volt, 2) - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. - - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor + + if strategy == 'constant': + vab = volt + + elif strategy == 'vmax': + # implement different strategy + I=0 + vmn=0 + count=0 + while I < 3 or abs(vmn) < 20 : #TODO: hardware related - place in config + if count > 0 : + volt = volt + 2 + count = count + 1 + if volt > 50: + break + + # set voltage for test + self.DPS.write_register(0x0000, volt, 2) + if count==1: + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor polarity = -1 vmn = U2 - break - + + n = 0 + while (abs(vmn) > voltage_max or I > current_max) and volt>0: #If starting voltage is too high, need to lower it down + # print('we are out of range! so decreasing volt') + volt = volt - 2 + self.DPS.write_register(0x0000, volt, 2) + #self.DPS.write_register(0x09, 1) # DPS5005 on + #time.sleep(best_tx_injtime) + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + n+=1 + if n > 25 : + break + + factor_I = (current_max) / I + factor_vmn = voltage_max / vmn + factor = factor_I + if factor_I > factor_vmn: + factor = factor_vmn + #print('factor', factor_I, factor_vmn) + vab = factor * volt #* 0.8 + print(factor, volt, vab) + if vab > tx_max: + vab = tx_max + + elif strategy == 'vmin': + # implement different strategy + I=20 + vmn=400 + count=0 + while I > 10 or abs(vmn) > 300 : #TODO: hardware related - place in config + if count > 0 : + volt = volt - 2 + count=count+1 + if volt > 50: + break + + # set voltage for test + if count==1: + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + self.DPS.write_register(0x0000, volt, 2) + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + + n=0 + while (abs(vmn) < voltage_min or I < current_min) and volt > 0 : #If starting voltage is too high, need to lower it down + # print('we are out of range! so increasing volt') + volt = volt + 2 + print(volt) + self.DPS.write_register(0x0000, volt, 2) + #self.DPS.write_register(0x09, 1) # DPS5005 on + #time.sleep(best_tx_injtime) + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + n+=1 + if n > 25 : + break + + vab = volt self.DPS.write_register(0x09, 0) # DPS5005 off # print('polarity', polarity) @@ -377,23 +467,6 @@ class OhmPi(object): self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') - # implement different strategy - if strategy == 'vmax': - factor_I = (current_max) / I - factor_vmn = voltage_max / vmn - factor = factor_I - if factor_I > factor_vmn: - factor = factor_vmn - vab = factor * volt * 0.9 - if vab > tx_max: - vab = tx_max - - elif strategy == 'constant': - vab = volt - else: - vab = 5 - - # self.DPS.write_register(0x09, 0) # DPS5005 off self.pin0.value = False self.pin1.value = False @@ -737,32 +810,32 @@ class OhmPi(object): # let's define the pin again as if we run through measure() # as it's run in another thread, it doesn't consider these # and this can lead to short circuit! - self.pin0 = self.mcp.get_pin(0) + self.pin0 = self.mcp_board.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False - self.pin1 = self.mcp.get_pin(1) + self.pin1 = self.mcp_board.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False - self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement + self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement self.pin7.direction = Direction.OUTPUT self.pin7.value = False if self.sequence is None : - self.pin2 = self.mcp.get_pin(2) # dsp + + self.pin2 = self.mcp_board.get_pin(2) # dsp + self.pin2.direction = Direction.OUTPUT self.pin2.value = True - self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3 = self.mcp_board.get_pin(3) # dsp - self.pin3.direction = Direction.OUTPUT self.pin3.value = True - self.pin5 = self.mcp.get_pin(5) #IHM on mesaurement + self.pin5 = self.mcp_board.get_pin(5) #IHM on mesaurement self.pin5.direction = Direction.OUTPUT self.pin5.value = True - self.pin6 = self.mcp.get_pin(6) #IHM on mesaurement + self.pin6 = self.mcp_board.get_pin(6) #IHM on mesaurement self.pin6.direction = Direction.OUTPUT self.pin6.value = False - self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement + self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement self.pin7.direction = Direction.OUTPUT self.pin7.value = False @@ -798,33 +871,40 @@ class OhmPi(object): if not out_of_range: # we found a Vab in the range so we measure if autogain: - if self.board_version == 'mb.2023.0.0': - # compute autogain - self.pin0.value = True - self.pin1.value = False - self.pin6.value = True # IHM current injection led on + # compute autogain + gain_voltage = [] + for n in [0,1]: # make short cycle for gain computation + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, + address=self.ads_voltage_address, mode=0) + if n == 0: + self.pin0.value = True + self.pin1.value = False + if self.board_version == 'mb.2023.0.0': + self.pin6.value = True # IHM current injection led on + else: + self.pin0.value = False + self.pin1.value = True # current injection nr2 + if self.board_version == 'mb.2023.0.0': + self.pin6.value = True # IHM current injection led on + time.sleep(injection_duration) gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + if polarity > 0: - gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + if n == 0: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + else: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) else: - gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - self.pin0.value = False - self.pin1.value = False - self.pin6.value = False # IHM current injection led off - self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage:.3f}') - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, - address=self.ads_current_address, mode=0) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, - address=self.ads_voltage_address, mode=0) - elif self.board_version == '22.10': - gain_current = 2 / 3 - gain_voltage = 2 / 3 - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, - address=self.ads_current_address, mode=0) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, - address=self.ads_voltage_address, mode=0) + if n == 0: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + else: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + # self.pin0.value = False + # self.pin1.value = False + if self.board_version == 'mb.2023.0.0': + self.pin6.value = False # IHM current injection led off self.pin0.value = False self.pin1.value = False @@ -965,43 +1045,65 @@ class OhmPi(object): else: np.array([[]]) + vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) + vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) + i_stack_mean = np.mean(i_stack) + i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])])) + r_stack_mean = vmn_stack_mean / i_stack_mean + r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean + ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) + if Rab is None: + Rab = 'None' # create a dictionary and compute averaged values from all stacks - if self.board_version == 'mb.2023.0.0': - d = { - "time": datetime.now().isoformat(), - "A": quad[0], - "B": quad[1], - "M": quad[2], - "N": quad[3], - "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., - "Vmn [mV]": sum_vmn / (2 * nb_stack), - "I [mA]": sum_i / (2 * nb_stack), - "R [ohm]": sum_vmn / sum_i, - "Ps [mV]": sum_ps / (2 * nb_stack), - "nbStack": nb_stack, - "Tx [V]": tx_volt if not out_of_range else 0., - "CPU temp [degC]": CPUTemperature().temperature, - "Nb samples [-]": self.nb_samples, - "fulldata": fulldata, - } - elif self.board_version == '22.10': - d = { - "time": datetime.now().isoformat(), - "A": quad[0], - "B": quad[1], - "M": quad[2], - "N": quad[3], - "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., - "Vmn [mV]": sum_vmn / (2 * nb_stack), - "I [mA]": sum_i / (2 * nb_stack), - "R [ohm]": sum_vmn / sum_i, - "Ps [mV]": sum_ps / (2 * nb_stack), - "nbStack": nb_stack, - "Tx [V]": tx_volt if not out_of_range else 0., - "CPU temp [degC]": CPUTemperature().temperature, - "Nb samples [-]": self.nb_samples, - } - + # if self.board_version == 'mb.2023.0.0': + d = { + "time": datetime.now().isoformat(), + "A": quad[0], + "B": quad[1], + "M": quad[2], + "N": quad[3], + "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., + "Vmn [mV]": sum_vmn / (2 * nb_stack), + "I [mA]": sum_i / (2 * nb_stack), + "R [ohm]": sum_vmn / sum_i, + "Ps [mV]": sum_ps / (2 * nb_stack), + "nbStack": nb_stack, + "Tx [V]": tx_volt if not out_of_range else 0., + "CPU temp [degC]": CPUTemperature().temperature, + "Nb samples [-]": self.nb_samples, + "fulldata": fulldata, + "I_stack [mA]": i_stack_mean, + "I_std [mA]": i_std, + "I_per_stack [mA]": [np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)], + "Vmn_stack [mV]": vmn_stack_mean, + "Vmn_std [mV]": vmn_std, + "Vmn_per_stack [mV]": [np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1))[0] / 2 for i in range(nb_stack)], + "R_stack [ohm]": r_stack_mean, + "R_std [ohm]": r_stack_std, + "R_per_stack [Ohm]": list(np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) / np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)])), + "PS_per_stack [mV]": list(np.array([np.mean(np.mean(vmn_stack[i*2:i*2+2], axis=1)) for i in range(nb_stack)])), + "PS_stack [mV]": ps_stack_mean, + "R_ab [ohm]": Rab + } + # print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)])) + # elif self.board_version == '22.10': + # d = { + # "time": datetime.now().isoformat(), + # "A": quad[0], + # "B": quad[1], + # "M": quad[2], + # "N": quad[3], + # "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., + # "Vmn [mV]": sum_vmn / (2 * nb_stack), + # "I [mA]": sum_i / (2 * nb_stack), + # "R [ohm]": sum_vmn / sum_i, + # "Ps [mV]": sum_ps / (2 * nb_stack), + # "nbStack": nb_stack, + # "Tx [V]": tx_volt if not out_of_range else 0., + # "CPU temp [degC]": CPUTemperature().temperature, + # "Nb samples [-]": self.nb_samples, + # "fulldata": fulldata, + # } else: # for testing, generate random data d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3], @@ -1086,10 +1188,10 @@ class OhmPi(object): self.exec_logger.debug(f'Status: {self.status}') self.exec_logger.debug(f'Measuring sequence: {self.sequence}') t0 = time.time() - self.pin2 = self.mcp.get_pin(2) # dsp - + self.pin2 = self.mcp_board.get_pin(2) # dsp - self.pin2.direction = Direction.OUTPUT self.pin2.value = True - self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3 = self.mcp_board.get_pin(3) # dsp - self.pin3.direction = Direction.OUTPUT self.pin3.value = True # create filename with timestamp @@ -1115,6 +1217,14 @@ class OhmPi(object): # call the switch_mux function to switch to the right electrodes self.switch_mux_on(quad) + self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address) + self.pin2 = self.MCPIHM.get_pin(2) # dsp - + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.MCPIHM.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + time.sleep(4) # run a measurement if self.on_pi: