Commit 31d355e8 authored by remi.clement@inrae.fr's avatar remi.clement@inrae.fr
Browse files

ohmpi update to run_measurement() for board v4

Showing with 206 additions and 56 deletions
+206 -56
......@@ -17,6 +17,7 @@ from datetime import datetime
from termcolor import colored
import threading
from logging_setup import setup_loggers
import minimalmodbus # for programmable power supply
# from mqtt_setup import mqtt_client_setup
......@@ -108,10 +109,28 @@ class OhmPi(object):
self.mcp = MCP23008(self.i2c, address=0x20)
# ADS1115 for current measurement (AB)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x49)
# ADS1115 for voltage measurement (MN)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48)
# current injection module
self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, slave address (in decimal)
self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc
self.DPS.serial.bytesize = 8 #
self.DPS.serial.timeout = 1 # greater than 0.5 for it to work
self.DPS.debug = False #
self.DPS.serial.parity = 'N' # No parity
self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode
# injection courant and measure (TODO check if it works, otherwise back in run_measurement())
self.pin0 = self.mcp.get_pin(0)
self.pin0.direction = Direction.OUTPUT
self.pin0.value = False
self.pin1 = self.mcp.get_pin(1)
self.pin1.direction = Direction.OUTPUT
self.pin1.value = False
def _read_acquisition_parameters(self, config):
"""Read acquisition parameters.
......@@ -136,6 +155,7 @@ class OhmPi(object):
self.pardict.update(dic)
self.exec_logger.debug('Acquisition parameters updated: ' + str(self.pardict))
def _read_hardware_parameters(self):
"""Read hardware parameters from config.py
"""
......@@ -154,6 +174,7 @@ class OhmPi(object):
self.board_address = OHMPI_CONFIG['board_address']
self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}')
@staticmethod
def find_identical_in_line(quads):
"""Find quadrupole which where A and B are identical.
......@@ -193,6 +214,7 @@ class OhmPi(object):
# output.append(i)
return output
@staticmethod
def get_platform():
"""Get platform name and check if it is a raspberry pi
......@@ -212,6 +234,7 @@ class OhmPi(object):
pass
return platform, on_pi
def read_quad(self, filename):
"""Read quadrupole sequence from file.
......@@ -252,6 +275,7 @@ class OhmPi(object):
self.sequence = output
def switch_mux(self, electrode_nr, state, role):
"""Select the right channel for the multiplexer cascade for a given electrode.
......@@ -305,6 +329,7 @@ class OhmPi(object):
else:
self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
def switch_mux_on(self, quadrupole):
""" Switch on multiplexer relays for given quadrupole.
......@@ -333,6 +358,7 @@ class OhmPi(object):
for i in range(0, 4):
self.switch_mux(quadrupole[i], 'off', roles[i])
def reset_mux(self):
"""Switch off all multiplexer relays."""
roles = ['A', 'B', 'M', 'N']
......@@ -341,6 +367,7 @@ class OhmPi(object):
self.switch_mux(j, 'off', roles[i])
self.exec_logger.debug('All MUX switched off.')
def gain_auto(self, channel):
""" Automatically set the gain on a channel
......@@ -364,8 +391,118 @@ class OhmPi(object):
self.exec_logger.debug(f'Setting gain to {gain}')
return gain
def run_measurement(self, quad, nb_stack=None, injection_duration=None):
""" Do a 4 electrode measurement and measure transfer resistance obtained.
def compute_tx_volt(self):
"""Compute best voltage to inject to be in our range of Vmn
(10 mV - 4500 mV) and current (2 - 45 mA)
"""
# inferring best voltage for injection Vab
# we guess the polarity on Vmn by trying both cases. once found
# we inject a starting voltage of 5V and measure our Vmn. Based
# on the data we then compute a multiplifcation factor to inject
# a voltage that will fall right in the measurable range of Vmn
# (10 - 4500 mV) and current (45 mA max)
# select a polarity to start with
self.pin0.value = True
self.pin1.value = False
self.DPS.write_register(0x09, 1) # DPS5005 on
tau = np.nan
# voltage optimization
for volt in range(1, 10, 2):
print('trying with v:', volt)
self.DPS.write_register(0x0000,volt,2) # fixe la voltage pour la mesure à 5V
time.sleep(1) # inject for 1 s at least on DPS5005
# autogain
self.ads_current = ads.ADS1115(self.i2c, gain=2/3, data_rate=128, address=0x49)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=128, address=0x48)
print(AnalogIn(self.ads_current, ads.P0).voltage)
print(AnalogIn(self.ads_voltage, ads.P0).voltage)
print(AnalogIn(self.ads_voltage, ads.P2).voltage)
gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0))
gain_voltage0 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0))
gain_voltage2 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2))
gain_voltage = np.min([gain_voltage0, gain_voltage2])
print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage))
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48)
# we measure the voltage on both A0 and A2 to guess the polarity
I = (AnalogIn(self.ads_current, ads.P0).voltage) * 1000/50/2 # measure current
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 # measure voltage
U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000
print('I (mV)', I*50*2)
print('I (mA)', I)
print('U0 (mV)', U0)
print('U2 (mV)', U2)
# check polarity
polarity = 1 # by default, we guessed it right
if U0 < 0: # we guessed it wrong, let's use a correction factor
polarity = -1
print('polarity', polarity)
# TODO (edge case) if PS is negative and greater than Vmn, it can
# potentially cause two negative values so none above 0
# check if we can actually measure smth
ok = True
if I > 2 and I <= 45:
if (((U0 < 4500) and (polarity > 0))
or ((2 < 4500) and (polarity < 0))):
if (((U0 > 10) and (polarity > 0))
or ((U2 > 10) and (polarity < 0))):
# ok, we compute tau
# inferring polarity and computing best voltage to inject
# by hardware design we can measure 10-4500 mV and 2-45 mA
# we will decide on the Vab to fall within this range
if U0 > 0: # we guessed the polarity right, let's keep that
tauI = 45 / I # compute ratio to maximize measuring range of I
tauU = 4500 / U0 # compute ratio to maximize measuring range of U
elif U0 < 0: # we guessed it wrong, let's use a correction factor
tauI = 45 / I
tauU = 4500 / U2
# let's be careful and avoid saturation by taking only 90% of
# the smallest factor
if tauI < tauU:
tau = tauI * 0.9
elif tauI > tauU:
tau = tauU * 0.9
print('tauI', tauI)
print('tauU', tauU)
print('best tau is', tau)
break
else:
# too weak, but let's try with a higher voltage
pass # we'll come back to the loop with higher voltage
else:
print('voltage out of range, max 4500 mV')
# doesn't work, tau will be NaN
break
else:
if I <= 2:
# let's try again
pass
else:
print('current out of range, max 45 mA')
# doesn't work, tau will be NaN
break
if tau == np.nan:
print('voltage out of range')
self.DPS.write_register(0x09, 0) # DPS5005 off
# we keep DPS5005 on if we computed a tau successfully
# turn off Vab
self.pin0.value = False
self.pin1.value = False
return tau*volt, polarity
def run_measurement(self, quad=[1, 2, 3, 4], nb_stack=None, injection_duration=None):
"""Do a 4 electrode measurement and measure transfer resistance obtained.
Parameters
----------
......@@ -376,7 +513,6 @@ class OhmPi(object):
quad : list of int
Quadrupole to measure.
"""
# TODO here we can add the current_injected or voltage_injected in mA or mV
# check arguments
if nb_stack is None:
nb_stack = self.pardict['nb_stack']
......@@ -386,48 +522,53 @@ class OhmPi(object):
start_time = time.time()
# inner variable initialization
injection_current = 0
sum_i = 0
sum_vmn = 0
sum_ps = 0
# injection courant and measure
pin0 = self.mcp.get_pin(0)
pin0.direction = Direction.OUTPUT
pin1 = self.mcp.get_pin(1)
pin1.direction = Direction.OUTPUT
pin0.value = False
pin1.value = False
self.exec_logger.debug('Starting measurement')
self.exec_logger.info('Waiting for data')
# FUNCTION AUTOGAIN
# get best voltage to inject
tx_volt, polarity = self.compute_tx_volt()
print('tx volt V:', tx_volt)
# autogain function now that we know the tau
# ADS1115 for current measurement (AB)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x49)
# ADS1115 for voltage measurement (MN)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49)
# try auto gain
pin1.value = True
pin0.value = False
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48)
# turn on the power supply
self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage
self.DPS.write_register(0x09, 1) # DPS5005 on
# compute autogain
self.pin0.value = True
self.pin1.value = False
time.sleep(injection_duration)
gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0))
gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0, ads.P1))
pin0.value = False
pin1.value = False
if polarity > 0:
gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0))
else:
gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2))
self.pin0.value = False
self.pin1.value = False
print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage))
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=0x48)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=0x49)
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48)
# TODO I don't get why 3 + 2*nb_stack - 1? why not just range(nb_stack)?
# or do we consider 1 stack = one full polarity? do we discard the first 3 readings?
for n in range(0, 3 + 2 * nb_stack - 1):
# one stack = 2 half-cycles (one positive, one negative)
pinMN = 0 if polarity > 0 else 2
for n in range(0, nb_stack * 2): # for each half-cycles
# current injection
if (n % 2) == 0:
pin1.value = True
pin0.value = False # current injection polarity nr1
self.pin0.value = True
self.pin1.value = False
else:
pin0.value = True
pin1.value = False # current injection nr2
self.pin0.value = False
self.pin1.value = True # current injection nr2
start_delay = time.time() # stating measurement time
time.sleep(injection_duration) # delay depending on current injection duration
......@@ -437,18 +578,26 @@ class OhmPi(object):
for k in range(0, self.nb_samples):
# reading current value on ADS channel A0
meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt)
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000
# reading voltage value on ADS channel A2
# meas[k, 2] = AnalogIn(self.ads_voltage, ads.P1).voltage * self.coef_p3 * 1000
if pinMN == 0:
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000
else:
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 *-1
print(meas)
# we alternate on which ADS1115 pin we measure because of sign of voltage
if pinMN == 0:
pinMN = 2
else:
pinMN = 0
# stop current injection
pin1.value = False
pin0.value = False
self.pin0.value = False
self.pin1.value = False
end_delay = time.time()
# take average from the samples per stack, then sum them all
# average for all stack is done outside the loop
injection_current = injection_current + (np.mean(meas[:, 0]))
sum_i = sum_i + (np.mean(meas[:, 0]))
vmn1 = np.mean(meas[:, 1]) - np.mean(meas[:, 2])
if (n % 2) == 0:
sum_vmn = sum_vmn - vmn1
......@@ -465,6 +614,7 @@ class OhmPi(object):
# wait twice the actual injection time between two injection
# so it's a 50% duty cycle right?
time.sleep(2 * (end_delay - start_delay) - (end_calc - start_delay))
self.DPS.write_register(0x09, 0) # DPS5005 off
# create a dictionary and compute averaged values from all stacks
d = {
......@@ -474,10 +624,10 @@ class OhmPi(object):
"M": quad[2],
"N": quad[3],
"inj time [ms]": (end_delay - start_delay) * 1000,
"Vmn [mV]": (sum_vmn / (3 + 2 * nb_stack - 1)),
"I [mA]": (injection_current / (3 + 2 * nb_stack - 1)),
"R [ohm]": (sum_vmn / (3 + 2 * nb_stack - 1) / (injection_current / (3 + 2 * nb_stack - 1))),
"Ps [mV]": (sum_ps / (3 + 2 * nb_stack - 1)),
"Vmn [mV]": sum_vmn / (2 * nb_stack),
"I [mA]": sum_i / (2 * nb_stack),
"R [ohm]": sum_vmn / sum_i,
"Ps [mV]": sum_ps / (2 * nb_stack),
"nbStack": nb_stack,
"CPU temp [degC]": CPUTemperature().temperature,
"Time [s]": (-start_time + time.time()),
......@@ -496,10 +646,10 @@ class OhmPi(object):
output += f'{val}\t'
output = output[:-1]
self.exec_logger.debug(output)
time.sleep(1) # NOTE: why this?
return d
def rs_check(self):
""" Check contact resistance.
"""
......@@ -543,25 +693,25 @@ class OhmPi(object):
self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c)
# current injection
pin0 = self.mcp.get_pin(0)
pin0.direction = Direction.OUTPUT
pin1 = self.mcp.get_pin(1)
pin1.direction = Direction.OUTPUT
pin0.value = False
pin1.value = False
self.pin0 = self.mcp.get_pin(0)
self.pin0.direction = Direction.OUTPUT
self.pin1 = self.mcp.get_pin(1)
self.pin1.direction = Direction.OUTPUT
self.pin0.value = False
self.pin1.value = False
# call the switch_mux function to switch to the right electrodes
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48)
# ADS1115 for voltage measurement (MN)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49)
pin1.value = True # inject from pin1 to pin0
pin0.value = False
self.pin1.value = True # inject from pin1 to self.pin0
self.pin0.value = False
time.sleep(0.5)
# measure current and voltage
current = AnalogIn(self.ads_current, ads.P0).voltage / (50 * self.r_shunt)
voltage = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * 2.5
voltage = -AnalogIn(self.ads_voltage, ads.P0, ADS.P2).voltage * 2.5
resistance = voltage / current
print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} Ohm'.format(
current*1000, voltage*1000, resistance))
......@@ -590,8 +740,8 @@ class OhmPi(object):
# close mux path and put pin back to GND
self.switch_mux_off(quad)
pin0.value = False
pin1.value = False
self.pin0.value = False
self.pin1.value = False
self.reset_mux()
self.status = 'idle'
......@@ -732,9 +882,9 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
# for testing
if __name__ == "__main__":
ohmpi = OhmPi(config='ohmpi_param.json')
ohmpi.run_measurement()
#ohmpi.measure()
#ohmpi.read_quad('breadboard.txt')
ohmpi.rs_check()
#ohmpi.measure()
#time.sleep(20)
#ohmpi.stop()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment