Commit 3591061a authored by remi.clement@inrae.fr's avatar remi.clement@inrae.fr
Browse files

update run_measurement with autogain and tx_volt and protection dps5005

Showing with 125 additions and 89 deletions
+125 -89
......@@ -55,7 +55,7 @@ class OhmPi(object):
sequence: 1, 2, 3, 4 is used.
"""
def __init__(self, config=None, sequence=None, mqtt=False, on_pi=None):
def __init__(self, config=None, sequence=None, mqtt=False, on_pi=None, idps=False):
# flags and attributes
if on_pi is None:
_, on_pi = OhmPi.get_platform()
......@@ -100,6 +100,8 @@ class OhmPi(object):
else:
self.read_quad(sequence)
self.idps = idps # flag to use dps for injection or not
# connect to components on the OhmPi board
if self.on_pi:
# activation of I2C protocol
......@@ -115,13 +117,16 @@ class OhmPi(object):
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48)
# current injection module
self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, slave address (in decimal)
self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc
self.DPS.serial.bytesize = 8 #
self.DPS.serial.timeout = 1 # greater than 0.5 for it to work
self.DPS.debug = False #
self.DPS.serial.parity = 'N' # No parity
self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode
if self.idps:
self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, slave address (in decimal)
self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc
self.DPS.serial.bytesize = 8 #
self.DPS.serial.timeout = 1 # greater than 0.5 for it to work
self.DPS.debug = False #
self.DPS.serial.parity = 'N' # No parity
self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode
self.DPS.write_register(0x0001, 40, 0) # max current allowed (36 mA for relays)
# (last number) 0 is for mA, 3 is for A
# injection courant and measure (TODO check if it works, otherwise back in run_measurement())
self.pin0 = self.mcp.get_pin(0)
......@@ -410,7 +415,7 @@ class OhmPi(object):
tau = np.nan
# voltage optimization
for volt in range(4, 10, 2):
for volt in range(2, 10, 2):
print('trying with v:', volt)
self.DPS.write_register(0x0000,volt,2) # fixe la voltage pour la mesure à 5V
time.sleep(1) # inject for 1 s at least on DPS5005
......@@ -493,7 +498,7 @@ class OhmPi(object):
print('voltage out of range')
self.DPS.write_register(0x09, 0) # DPS5005 off
# we keep DPS5005 on if we computed a tau successfully
# turn off Vab
self.pin0.value = False
self.pin1.value = False
......@@ -501,17 +506,32 @@ class OhmPi(object):
return tau*volt, polarity
def run_measurement(self, quad=[1, 2, 3, 4], nb_stack=None, injection_duration=None):
def run_measurement(self, quad=[1, 2, 3, 4], nb_stack=None, injection_duration=None,
best_tx=True, tx_volt=0, autogain=True):
"""Do a 4 electrode measurement and measure transfer resistance obtained.
Parameters
----------
quad : list of int
Quadrupole to measure.
nb_stack : int, optional
Number of stacks.
Number of stacks. A stacl is considered two half-cycles (one
positive, one negative).
injection_duration : int, optional
Injection time in seconds.
quad : list of int
Quadrupole to measure.
best_tx : bool, optional
If True, will attempt to find the best Tx voltage that fill
within our measurement range. If it cannot find it, it will
return NaN as measurement. If False, it will make the
measurement with whatever it has as voltage and never returns
NaN. Finding the best tx voltage can take some time before
each quadrupole.
tx_volt : float, optional
If specified, voltage will be imposed disregarding the value
of best_tx argument.
autogain : bool, optional
If True, will adapt the gain of the ADS1115 to maximize the
resolution of the reading.
"""
# check arguments
if nb_stack is None:
......@@ -530,91 +550,107 @@ class OhmPi(object):
self.exec_logger.info('Waiting for data')
# get best voltage to inject
tx_volt, polarity = self.compute_tx_volt()
print('tx volt V:', tx_volt)
# autogain function now that we know the tau
# ADS1115 for current measurement (AB)
if self.idps and tx_volt == 0:
tx_volt, polarity = self.compute_tx_volt()
print('tx volt V:', tx_volt)
else:
polarity = 1
# first reset the gain to 2/3 before trying to find best gain
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x49)
# ADS1115 for voltage measurement (MN)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48)
# turn on the power supply
self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage
self.DPS.write_register(0x09, 1) # DPS5005 on
# compute autogain
self.pin0.value = True
self.pin1.value = False
time.sleep(injection_duration)
gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0))
if polarity > 0:
gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0))
else:
gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2))
self.pin0.value = False
self.pin1.value = False
print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage))
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48)
# one stack = 2 half-cycles (one positive, one negative)
pinMN = 0 if polarity > 0 else 2
for n in range(0, nb_stack * 2): # for each half-cycles
# current injection
if (n % 2) == 0:
oor = False
if self.idps:
if tx_volt != np.nan:
self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V
self.DPS.write_register(0x09, 1) # DPS5005 on
else:
print('no best voltage found, will not take measurement')
oor = True
if oor == False:
if autogain:
# compute autogain
self.pin0.value = True
self.pin1.value = False
else:
time.sleep(injection_duration)
gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0))
if polarity > 0:
gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0))
else:
gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2))
self.pin0.value = False
self.pin1.value = True # current injection nr2
self.pin1.value = False
print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage))
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48)
# one stack = 2 half-cycles (one positive, one negative)
pinMN = 0 if polarity > 0 else 2
for n in range(0, nb_stack * 2): # for each half-cycles
# current injection
if (n % 2) == 0:
self.pin0.value = True
self.pin1.value = False
else:
self.pin0.value = False
self.pin1.value = True # current injection nr2
start_delay = time.time() # stating measurement time
time.sleep(injection_duration) # delay depending on current injection duration
# measurement of current i and voltage u
# sampling for each stack at the end of the injection
meas = np.zeros((self.nb_samples, 2))
for k in range(0, self.nb_samples):
# reading current value on ADS channel A0
meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt)
if pinMN == 0:
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000
else:
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 *-1
print(meas)
start_delay = time.time() # stating measurement time
time.sleep(injection_duration) # delay depending on current injection duration
# measurement of current i and voltage u
# sampling for each stack at the end of the injection
meas = np.zeros((self.nb_samples, 3))
for k in range(0, self.nb_samples):
# reading current value on ADS channel A0
meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt)
# we alternate on which ADS1115 pin we measure because of sign of voltage
if pinMN == 0:
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000
pinMN = 2
else:
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 *-1
print(meas)
# we alternate on which ADS1115 pin we measure because of sign of voltage
if pinMN == 0:
pinMN = 2
else:
pinMN = 0
# stop current injection
self.pin0.value = False
self.pin1.value = False
end_delay = time.time()
# take average from the samples per stack, then sum them all
# average for all stack is done outside the loop
sum_i = sum_i + (np.mean(meas[:, 0]))
vmn1 = np.mean(meas[:, 1]) - np.mean(meas[:, 2])
if (n % 2) == 0:
sum_vmn = sum_vmn - vmn1
sum_ps = sum_ps + vmn1
else:
sum_vmn = sum_vmn + vmn1
sum_ps = sum_ps + vmn1
# TODO get battery voltage and warn if battery is running low
# TODO send a message on SOH stating the battery level
end_calc = time.time()
# TODO I am not sure I understand the computation below
# wait twice the actual injection time between two injection
# so it's a 50% duty cycle right?
time.sleep(2 * (end_delay - start_delay) - (end_calc - start_delay))
self.DPS.write_register(0x09, 0) # DPS5005 off
pinMN = 0
# stop current injection
self.pin0.value = False
self.pin1.value = False
end_delay = time.time()
# take average from the samples per stack, then sum them all
# average for all stack is done outside the loop
sum_i = sum_i + (np.mean(meas[:, 0]))
vmn1 = np.mean(meas[:, 1])
if (n % 2) == 0:
sum_vmn = sum_vmn - vmn1
sum_ps = sum_ps + vmn1
else:
sum_vmn = sum_vmn + vmn1
sum_ps = sum_ps + vmn1
# TODO get battery voltage and warn if battery is running low
# TODO send a message on SOH stating the battery level
end_calc = time.time()
# TODO I am not sure I understand the computation below
# wait twice the actual injection time between two injection
# so it's a 50% duty cycle right?
time.sleep(2 * (end_delay - start_delay) - (end_calc - start_delay))
if self.idps:
self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt
self.DPS.write_register(0x09, 0) # DPS5005 off
else:
sum_i = np.nan
sum_vmn = np.nan
sum_ps = np.nan
# create a dictionary and compute averaged values from all stacks
d = {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment