diff --git a/ohmpi.py b/ohmpi.py index 23b27fc01900676080d5c0d1550c74ad9649c584..b06781a8be310b5b466be7cbdf8dc91c075debd5 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -17,6 +17,7 @@ from datetime import datetime from termcolor import colored import threading from logging_setup import setup_loggers +import minimalmodbus # for programmable power supply # from mqtt_setup import mqtt_client_setup @@ -108,10 +109,28 @@ class OhmPi(object): self.mcp = MCP23008(self.i2c, address=0x20) # ADS1115 for current measurement (AB) - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48) + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x49) # ADS1115 for voltage measurement (MN) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48) + + # current injection module + self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, slave address (in decimal) + self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc + self.DPS.serial.bytesize = 8 # + self.DPS.serial.timeout = 1 # greater than 0.5 for it to work + self.DPS.debug = False # + self.DPS.serial.parity = 'N' # No parity + self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode + + # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) + self.pin0 = self.mcp.get_pin(0) + self.pin0.direction = Direction.OUTPUT + self.pin0.value = False + self.pin1 = self.mcp.get_pin(1) + self.pin1.direction = Direction.OUTPUT + self.pin1.value = False + def _read_acquisition_parameters(self, config): """Read acquisition parameters. @@ -136,6 +155,7 @@ class OhmPi(object): self.pardict.update(dic) self.exec_logger.debug('Acquisition parameters updated: ' + str(self.pardict)) + def _read_hardware_parameters(self): """Read hardware parameters from config.py """ @@ -154,6 +174,7 @@ class OhmPi(object): self.board_address = OHMPI_CONFIG['board_address'] self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') + @staticmethod def find_identical_in_line(quads): """Find quadrupole which where A and B are identical. @@ -193,6 +214,7 @@ class OhmPi(object): # output.append(i) return output + @staticmethod def get_platform(): """Get platform name and check if it is a raspberry pi @@ -212,6 +234,7 @@ class OhmPi(object): pass return platform, on_pi + def read_quad(self, filename): """Read quadrupole sequence from file. @@ -252,6 +275,7 @@ class OhmPi(object): self.sequence = output + def switch_mux(self, electrode_nr, state, role): """Select the right channel for the multiplexer cascade for a given electrode. @@ -305,6 +329,7 @@ class OhmPi(object): else: self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') + def switch_mux_on(self, quadrupole): """ Switch on multiplexer relays for given quadrupole. @@ -333,6 +358,7 @@ class OhmPi(object): for i in range(0, 4): self.switch_mux(quadrupole[i], 'off', roles[i]) + def reset_mux(self): """Switch off all multiplexer relays.""" roles = ['A', 'B', 'M', 'N'] @@ -341,6 +367,7 @@ class OhmPi(object): self.switch_mux(j, 'off', roles[i]) self.exec_logger.debug('All MUX switched off.') + def gain_auto(self, channel): """ Automatically set the gain on a channel @@ -364,8 +391,118 @@ class OhmPi(object): self.exec_logger.debug(f'Setting gain to {gain}') return gain - def run_measurement(self, quad, nb_stack=None, injection_duration=None): - """ Do a 4 electrode measurement and measure transfer resistance obtained. + + def compute_tx_volt(self): + """Compute best voltage to inject to be in our range of Vmn + (10 mV - 4500 mV) and current (2 - 45 mA) + """ + # inferring best voltage for injection Vab + # we guess the polarity on Vmn by trying both cases. once found + # we inject a starting voltage of 5V and measure our Vmn. Based + # on the data we then compute a multiplifcation factor to inject + # a voltage that will fall right in the measurable range of Vmn + # (10 - 4500 mV) and current (45 mA max) + + # select a polarity to start with + self.pin0.value = True + self.pin1.value = False + self.DPS.write_register(0x09, 1) # DPS5005 on + + tau = np.nan + # voltage optimization + for volt in range(1, 10, 2): + print('trying with v:', volt) + self.DPS.write_register(0x0000,volt,2) # fixe la voltage pour la mesure à 5V + time.sleep(1) # inject for 1 s at least on DPS5005 + + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2/3, data_rate=128, address=0x49) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=128, address=0x48) + print(AnalogIn(self.ads_current, ads.P0).voltage) + print(AnalogIn(self.ads_voltage, ads.P0).voltage) + print(AnalogIn(self.ads_voltage, ads.P2).voltage) + gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) + print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage)) + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48) + + # we measure the voltage on both A0 and A2 to guess the polarity + I = (AnalogIn(self.ads_current, ads.P0).voltage) * 1000/50/2 # measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 # measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 + print('I (mV)', I*50*2) + print('I (mA)', I) + print('U0 (mV)', U0) + print('U2 (mV)', U2) + + # check polarity + polarity = 1 # by default, we guessed it right + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + print('polarity', polarity) + # TODO (edge case) if PS is negative and greater than Vmn, it can + # potentially cause two negative values so none above 0 + + # check if we can actually measure smth + ok = True + if I > 2 and I <= 45: + if (((U0 < 4500) and (polarity > 0)) + or ((2 < 4500) and (polarity < 0))): + if (((U0 > 10) and (polarity > 0)) + or ((U2 > 10) and (polarity < 0))): + # ok, we compute tau + # inferring polarity and computing best voltage to inject + # by hardware design we can measure 10-4500 mV and 2-45 mA + # we will decide on the Vab to fall within this range + if U0 > 0: # we guessed the polarity right, let's keep that + tauI = 45 / I # compute ratio to maximize measuring range of I + tauU = 4500 / U0 # compute ratio to maximize measuring range of U + elif U0 < 0: # we guessed it wrong, let's use a correction factor + tauI = 45 / I + tauU = 4500 / U2 + + # let's be careful and avoid saturation by taking only 90% of + # the smallest factor + if tauI < tauU: + tau = tauI * 0.9 + elif tauI > tauU: + tau = tauU * 0.9 + print('tauI', tauI) + print('tauU', tauU) + print('best tau is', tau) + break + else: + # too weak, but let's try with a higher voltage + pass # we'll come back to the loop with higher voltage + else: + print('voltage out of range, max 4500 mV') + # doesn't work, tau will be NaN + break + else: + if I <= 2: + # let's try again + pass + else: + print('current out of range, max 45 mA') + # doesn't work, tau will be NaN + break + if tau == np.nan: + print('voltage out of range') + self.DPS.write_register(0x09, 0) # DPS5005 off + # we keep DPS5005 on if we computed a tau successfully + + # turn off Vab + self.pin0.value = False + self.pin1.value = False + + return tau*volt, polarity + + + def run_measurement(self, quad=[1, 2, 3, 4], nb_stack=None, injection_duration=None): + """Do a 4 electrode measurement and measure transfer resistance obtained. Parameters ---------- @@ -376,7 +513,6 @@ class OhmPi(object): quad : list of int Quadrupole to measure. """ - # TODO here we can add the current_injected or voltage_injected in mA or mV # check arguments if nb_stack is None: nb_stack = self.pardict['nb_stack'] @@ -386,48 +522,53 @@ class OhmPi(object): start_time = time.time() # inner variable initialization - injection_current = 0 + sum_i = 0 sum_vmn = 0 sum_ps = 0 - # injection courant and measure - pin0 = self.mcp.get_pin(0) - pin0.direction = Direction.OUTPUT - pin1 = self.mcp.get_pin(1) - pin1.direction = Direction.OUTPUT - pin0.value = False - pin1.value = False - self.exec_logger.debug('Starting measurement') self.exec_logger.info('Waiting for data') - # FUNCTION AUTOGAIN + # get best voltage to inject + tx_volt, polarity = self.compute_tx_volt() + print('tx volt V:', tx_volt) + + # autogain function now that we know the tau # ADS1115 for current measurement (AB) - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48) + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x49) # ADS1115 for voltage measurement (MN) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49) - # try auto gain - pin1.value = True - pin0.value = False + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48) + + # turn on the power supply + self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage + self.DPS.write_register(0x09, 1) # DPS5005 on + + # compute autogain + self.pin0.value = True + self.pin1.value = False time.sleep(injection_duration) gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0, ads.P1)) - pin0.value = False - pin1.value = False + if polarity > 0: + gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + else: + gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + self.pin0.value = False + self.pin1.value = False print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage)) - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=0x48) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=0x49) + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48) - # TODO I don't get why 3 + 2*nb_stack - 1? why not just range(nb_stack)? - # or do we consider 1 stack = one full polarity? do we discard the first 3 readings? - for n in range(0, 3 + 2 * nb_stack - 1): + # one stack = 2 half-cycles (one positive, one negative) + pinMN = 0 if polarity > 0 else 2 + for n in range(0, nb_stack * 2): # for each half-cycles # current injection if (n % 2) == 0: - pin1.value = True - pin0.value = False # current injection polarity nr1 + self.pin0.value = True + self.pin1.value = False else: - pin0.value = True - pin1.value = False # current injection nr2 + self.pin0.value = False + self.pin1.value = True # current injection nr2 + start_delay = time.time() # stating measurement time time.sleep(injection_duration) # delay depending on current injection duration @@ -437,18 +578,26 @@ class OhmPi(object): for k in range(0, self.nb_samples): # reading current value on ADS channel A0 meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) - meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000 - # reading voltage value on ADS channel A2 - # meas[k, 2] = AnalogIn(self.ads_voltage, ads.P1).voltage * self.coef_p3 * 1000 - + if pinMN == 0: + meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 + else: + meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 *-1 + print(meas) + + # we alternate on which ADS1115 pin we measure because of sign of voltage + if pinMN == 0: + pinMN = 2 + else: + pinMN = 0 + # stop current injection - pin1.value = False - pin0.value = False + self.pin0.value = False + self.pin1.value = False end_delay = time.time() # take average from the samples per stack, then sum them all # average for all stack is done outside the loop - injection_current = injection_current + (np.mean(meas[:, 0])) + sum_i = sum_i + (np.mean(meas[:, 0])) vmn1 = np.mean(meas[:, 1]) - np.mean(meas[:, 2]) if (n % 2) == 0: sum_vmn = sum_vmn - vmn1 @@ -465,6 +614,7 @@ class OhmPi(object): # wait twice the actual injection time between two injection # so it's a 50% duty cycle right? time.sleep(2 * (end_delay - start_delay) - (end_calc - start_delay)) + self.DPS.write_register(0x09, 0) # DPS5005 off # create a dictionary and compute averaged values from all stacks d = { @@ -474,10 +624,10 @@ class OhmPi(object): "M": quad[2], "N": quad[3], "inj time [ms]": (end_delay - start_delay) * 1000, - "Vmn [mV]": (sum_vmn / (3 + 2 * nb_stack - 1)), - "I [mA]": (injection_current / (3 + 2 * nb_stack - 1)), - "R [ohm]": (sum_vmn / (3 + 2 * nb_stack - 1) / (injection_current / (3 + 2 * nb_stack - 1))), - "Ps [mV]": (sum_ps / (3 + 2 * nb_stack - 1)), + "Vmn [mV]": sum_vmn / (2 * nb_stack), + "I [mA]": sum_i / (2 * nb_stack), + "R [ohm]": sum_vmn / sum_i, + "Ps [mV]": sum_ps / (2 * nb_stack), "nbStack": nb_stack, "CPU temp [degC]": CPUTemperature().temperature, "Time [s]": (-start_time + time.time()), @@ -496,10 +646,10 @@ class OhmPi(object): output += f'{val}\t' output = output[:-1] self.exec_logger.debug(output) - time.sleep(1) # NOTE: why this? return d + def rs_check(self): """ Check contact resistance. """ @@ -543,25 +693,25 @@ class OhmPi(object): self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c) # current injection - pin0 = self.mcp.get_pin(0) - pin0.direction = Direction.OUTPUT - pin1 = self.mcp.get_pin(1) - pin1.direction = Direction.OUTPUT - pin0.value = False - pin1.value = False + self.pin0 = self.mcp.get_pin(0) + self.pin0.direction = Direction.OUTPUT + self.pin1 = self.mcp.get_pin(1) + self.pin1.direction = Direction.OUTPUT + self.pin0.value = False + self.pin1.value = False # call the switch_mux function to switch to the right electrodes self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48) # ADS1115 for voltage measurement (MN) self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49) - pin1.value = True # inject from pin1 to pin0 - pin0.value = False + self.pin1.value = True # inject from pin1 to self.pin0 + self.pin0.value = False time.sleep(0.5) # measure current and voltage current = AnalogIn(self.ads_current, ads.P0).voltage / (50 * self.r_shunt) - voltage = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * 2.5 + voltage = -AnalogIn(self.ads_voltage, ads.P0, ADS.P2).voltage * 2.5 resistance = voltage / current print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} Ohm'.format( current*1000, voltage*1000, resistance)) @@ -590,8 +740,8 @@ class OhmPi(object): # close mux path and put pin back to GND self.switch_mux_off(quad) - pin0.value = False - pin1.value = False + self.pin0.value = False + self.pin1.value = False self.reset_mux() self.status = 'idle' @@ -732,9 +882,9 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S")) # for testing if __name__ == "__main__": ohmpi = OhmPi(config='ohmpi_param.json') + ohmpi.run_measurement() #ohmpi.measure() #ohmpi.read_quad('breadboard.txt') - ohmpi.rs_check() #ohmpi.measure() #time.sleep(20) #ohmpi.stop()