diff --git a/ohmpi.py b/ohmpi.py index a7f4bd46fff88fe7c8135bf990e1499b0e669362..ef69102311fc53437687d550b44813276dff2d68 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -55,7 +55,7 @@ class OhmPi(object): sequence: 1, 2, 3, 4 is used. """ - def __init__(self, config=None, sequence=None, mqtt=False, on_pi=None): + def __init__(self, config=None, sequence=None, mqtt=False, on_pi=None, idps=False): # flags and attributes if on_pi is None: _, on_pi = OhmPi.get_platform() @@ -100,6 +100,8 @@ class OhmPi(object): else: self.read_quad(sequence) + self.idps = idps # flag to use dps for injection or not + # connect to components on the OhmPi board if self.on_pi: # activation of I2C protocol @@ -115,13 +117,16 @@ class OhmPi(object): self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48) # current injection module - self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, slave address (in decimal) - self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc - self.DPS.serial.bytesize = 8 # - self.DPS.serial.timeout = 1 # greater than 0.5 for it to work - self.DPS.debug = False # - self.DPS.serial.parity = 'N' # No parity - self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode + if self.idps: + self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, slave address (in decimal) + self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc + self.DPS.serial.bytesize = 8 # + self.DPS.serial.timeout = 1 # greater than 0.5 for it to work + self.DPS.debug = False # + self.DPS.serial.parity = 'N' # No parity + self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode + self.DPS.write_register(0x0001, 40, 0) # max current allowed (36 mA for relays) + # (last number) 0 is for mA, 3 is for A # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) self.pin0 = self.mcp.get_pin(0) @@ -410,7 +415,7 @@ class OhmPi(object): tau = np.nan # voltage optimization - for volt in range(4, 10, 2): + for volt in range(2, 10, 2): print('trying with v:', volt) self.DPS.write_register(0x0000,volt,2) # fixe la voltage pour la mesure à 5V time.sleep(1) # inject for 1 s at least on DPS5005 @@ -493,7 +498,7 @@ class OhmPi(object): print('voltage out of range') self.DPS.write_register(0x09, 0) # DPS5005 off # we keep DPS5005 on if we computed a tau successfully - + # turn off Vab self.pin0.value = False self.pin1.value = False @@ -501,17 +506,32 @@ class OhmPi(object): return tau*volt, polarity - def run_measurement(self, quad=[1, 2, 3, 4], nb_stack=None, injection_duration=None): + def run_measurement(self, quad=[1, 2, 3, 4], nb_stack=None, injection_duration=None, + best_tx=True, tx_volt=0, autogain=True): """Do a 4 electrode measurement and measure transfer resistance obtained. Parameters ---------- + quad : list of int + Quadrupole to measure. nb_stack : int, optional - Number of stacks. + Number of stacks. A stacl is considered two half-cycles (one + positive, one negative). injection_duration : int, optional Injection time in seconds. - quad : list of int - Quadrupole to measure. + best_tx : bool, optional + If True, will attempt to find the best Tx voltage that fill + within our measurement range. If it cannot find it, it will + return NaN as measurement. If False, it will make the + measurement with whatever it has as voltage and never returns + NaN. Finding the best tx voltage can take some time before + each quadrupole. + tx_volt : float, optional + If specified, voltage will be imposed disregarding the value + of best_tx argument. + autogain : bool, optional + If True, will adapt the gain of the ADS1115 to maximize the + resolution of the reading. """ # check arguments if nb_stack is None: @@ -530,91 +550,107 @@ class OhmPi(object): self.exec_logger.info('Waiting for data') # get best voltage to inject - tx_volt, polarity = self.compute_tx_volt() - print('tx volt V:', tx_volt) - - # autogain function now that we know the tau - # ADS1115 for current measurement (AB) + if self.idps and tx_volt == 0: + tx_volt, polarity = self.compute_tx_volt() + print('tx volt V:', tx_volt) + else: + polarity = 1 + + # first reset the gain to 2/3 before trying to find best gain self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x49) - # ADS1115 for voltage measurement (MN) self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48) # turn on the power supply - self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage - self.DPS.write_register(0x09, 1) # DPS5005 on - - # compute autogain - self.pin0.value = True - self.pin1.value = False - time.sleep(injection_duration) - gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0)) - if polarity > 0: - gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - else: - gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - self.pin0.value = False - self.pin1.value = False - print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage)) - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48) - - # one stack = 2 half-cycles (one positive, one negative) - pinMN = 0 if polarity > 0 else 2 - for n in range(0, nb_stack * 2): # for each half-cycles - # current injection - if (n % 2) == 0: + oor = False + if self.idps: + if tx_volt != np.nan: + self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V + self.DPS.write_register(0x09, 1) # DPS5005 on + else: + print('no best voltage found, will not take measurement') + oor = True + + if oor == False: + if autogain: + # compute autogain self.pin0.value = True self.pin1.value = False - else: + time.sleep(injection_duration) + gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0)) + if polarity > 0: + gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + else: + gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2)) self.pin0.value = False - self.pin1.value = True # current injection nr2 + self.pin1.value = False + print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage)) + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48) + + # one stack = 2 half-cycles (one positive, one negative) + pinMN = 0 if polarity > 0 else 2 + for n in range(0, nb_stack * 2): # for each half-cycles + # current injection + if (n % 2) == 0: + self.pin0.value = True + self.pin1.value = False + else: + self.pin0.value = False + self.pin1.value = True # current injection nr2 + + start_delay = time.time() # stating measurement time + time.sleep(injection_duration) # delay depending on current injection duration + + # measurement of current i and voltage u + # sampling for each stack at the end of the injection + meas = np.zeros((self.nb_samples, 2)) + for k in range(0, self.nb_samples): + # reading current value on ADS channel A0 + meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) + if pinMN == 0: + meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 + else: + meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 *-1 + print(meas) - start_delay = time.time() # stating measurement time - time.sleep(injection_duration) # delay depending on current injection duration - - # measurement of current i and voltage u - # sampling for each stack at the end of the injection - meas = np.zeros((self.nb_samples, 3)) - for k in range(0, self.nb_samples): - # reading current value on ADS channel A0 - meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) + # we alternate on which ADS1115 pin we measure because of sign of voltage if pinMN == 0: - meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 + pinMN = 2 else: - meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 *-1 - print(meas) - - # we alternate on which ADS1115 pin we measure because of sign of voltage - if pinMN == 0: - pinMN = 2 - else: - pinMN = 0 - - # stop current injection - self.pin0.value = False - self.pin1.value = False - end_delay = time.time() - - # take average from the samples per stack, then sum them all - # average for all stack is done outside the loop - sum_i = sum_i + (np.mean(meas[:, 0])) - vmn1 = np.mean(meas[:, 1]) - np.mean(meas[:, 2]) - if (n % 2) == 0: - sum_vmn = sum_vmn - vmn1 - sum_ps = sum_ps + vmn1 - else: - sum_vmn = sum_vmn + vmn1 - sum_ps = sum_ps + vmn1 - - # TODO get battery voltage and warn if battery is running low - # TODO send a message on SOH stating the battery level - end_calc = time.time() - - # TODO I am not sure I understand the computation below - # wait twice the actual injection time between two injection - # so it's a 50% duty cycle right? - time.sleep(2 * (end_delay - start_delay) - (end_calc - start_delay)) - self.DPS.write_register(0x09, 0) # DPS5005 off + pinMN = 0 + + # stop current injection + self.pin0.value = False + self.pin1.value = False + end_delay = time.time() + + # take average from the samples per stack, then sum them all + # average for all stack is done outside the loop + sum_i = sum_i + (np.mean(meas[:, 0])) + vmn1 = np.mean(meas[:, 1]) + if (n % 2) == 0: + sum_vmn = sum_vmn - vmn1 + sum_ps = sum_ps + vmn1 + else: + sum_vmn = sum_vmn + vmn1 + sum_ps = sum_ps + vmn1 + + # TODO get battery voltage and warn if battery is running low + # TODO send a message on SOH stating the battery level + end_calc = time.time() + + # TODO I am not sure I understand the computation below + # wait twice the actual injection time between two injection + # so it's a 50% duty cycle right? + time.sleep(2 * (end_delay - start_delay) - (end_calc - start_delay)) + + if self.idps: + self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt + self.DPS.write_register(0x09, 0) # DPS5005 off + else: + sum_i = np.nan + sum_vmn = np.nan + sum_ps = np.nan # create a dictionary and compute averaged values from all stacks d = {