Commit 510430bc authored by remi.clement@inrae.fr's avatar remi.clement@inrae.fr
Browse files

fix rs_check with dph5005

Showing with 303 additions and 166 deletions
+303 -166
...@@ -3,8 +3,8 @@ from paho.mqtt.client import MQTTv31 ...@@ -3,8 +3,8 @@ from paho.mqtt.client import MQTTv31
# OhmPi configuration # OhmPi configuration
OHMPI_CONFIG = { OHMPI_CONFIG = {
'id': '0001', # Unique identifier of the OhmPi board (string) 'id': '0001', # Unique identifier of the OhmPi board (string)
'R_shunt': 2, # Shunt resistance in Ohms 'R_shunt': 1, # Shunt resistance in Ohms
'Imax': 4800/50/2, # Maximum current 'Imax': 4800/50/1, # Maximum current
'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V 'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V
'coef_p3': 2.50, # slope for current conversion for ADS.P3, measurement in V/V 'coef_p3': 2.50, # slope for current conversion for ADS.P3, measurement in V/V
'offset_p2': 0, 'offset_p2': 0,
...@@ -12,7 +12,7 @@ OHMPI_CONFIG = { ...@@ -12,7 +12,7 @@ OHMPI_CONFIG = {
'integer': 2, # Max value 10 # TODO: Explain what this is... 'integer': 2, # Max value 10 # TODO: Explain what this is...
'version': 2, 'version': 2,
'max_elec': 64, 'max_elec': 64,
'board_address': {'A': 0x70, 'B': 0x71, 'M': 0x72, 'N': 0x73} # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70} 'board_address': {'A': 0x72, 'B': 0x73, 'M': 0x70, 'N': 0x71} # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70}
#'board_address': {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70} # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70} #'board_address': {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70} # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70}
} }
...@@ -80,4 +80,4 @@ MQTT_CONTROL_CONFIG = { ...@@ -80,4 +80,4 @@ MQTT_CONTROL_CONFIG = {
'transport': 'tcp', 'transport': 'tcp',
'client_id': f'ohmpi_sn_{OHMPI_CONFIG["id"]}', 'client_id': f'ohmpi_sn_{OHMPI_CONFIG["id"]}',
'cmd_topic': f'cmd_ohmpi_sn_{OHMPI_CONFIG["id"]}' 'cmd_topic': f'cmd_ohmpi_sn_{OHMPI_CONFIG["id"]}'
} }
\ No newline at end of file
...@@ -18,6 +18,7 @@ from termcolor import colored ...@@ -18,6 +18,7 @@ from termcolor import colored
import threading import threading
from logging_setup import setup_loggers from logging_setup import setup_loggers
import minimalmodbus # for programmable power supply import minimalmodbus # for programmable power supply
from copy import deepcopy
# from mqtt_setup import mqtt_client_setup # from mqtt_setup import mqtt_client_setup
...@@ -96,7 +97,7 @@ class OhmPi(object): ...@@ -96,7 +97,7 @@ class OhmPi(object):
# read quadrupole sequence # read quadrupole sequence
if sequence is None: if sequence is None:
self.sequence = np.array([[1, 2, 3, 4]]) self.sequence = np.array([[1, 4, 2, 3]])
else: else:
self.read_quad(sequence) self.read_quad(sequence)
...@@ -300,26 +301,12 @@ class OhmPi(object): ...@@ -300,26 +301,12 @@ class OhmPi(object):
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_address[role]) tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_address[role])
# find I2C address of the electrode and corresponding relay # find I2C address of the electrode and corresponding relay
# TODO from number of electrode, the below can be guessed
# considering that one MCP23017 can cover 16 electrodes # considering that one MCP23017 can cover 16 electrodes
electrode_nr = electrode_nr - 1 # switch to 0 indexing electrode_nr = electrode_nr - 1 # switch to 0 indexing
i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16) * 16 relay_nr = electrode_nr - (electrode_nr // 16) * 16
relay_nr = relay_nr + 1 # switch back to 1 based indexing relay_nr = relay_nr + 1 # switch back to 1 based indexing
# if electrode_nr < 17:
# i2c_address = 7
# relay_nr = electrode_nr
# elif 16 < electrode_nr < 33:
# i2c_address = 6
# relay_nr = electrode_nr - 16
# elif 32 < electrode_nr < 49:
# i2c_address = 5
# relay_nr = electrode_nr - 32
# elif 48 < electrode_nr < 65:
# i2c_address = 4
# relay_nr = electrode_nr - 48
if i2c_address is not None: if i2c_address is not None:
# select the MCP23017 of the selected MUX board # select the MCP23017 of the selected MUX board
mcp2 = MCP23017(tca[i2c_address]) mcp2 = MCP23017(tca[i2c_address])
...@@ -347,10 +334,12 @@ class OhmPi(object): ...@@ -347,10 +334,12 @@ class OhmPi(object):
# another check to be sure A != B # another check to be sure A != B
if quadrupole[0] != quadrupole[1]: if quadrupole[0] != quadrupole[1]:
for i in range(0, 4): for i in range(0, 4):
self.switch_mux(quadrupole[i], 'on', roles[i]) if quadrupole[i] > 0:
self.switch_mux(quadrupole[i], 'on', roles[i])
else: else:
self.exec_logger.error('A == B -> short circuit risk detected!') self.exec_logger.error('A == B -> short circuit risk detected!')
def switch_mux_off(self, quadrupole): def switch_mux_off(self, quadrupole):
""" Switch off multiplexer relays for given quadrupole. """ Switch off multiplexer relays for given quadrupole.
...@@ -361,7 +350,8 @@ class OhmPi(object): ...@@ -361,7 +350,8 @@ class OhmPi(object):
""" """
roles = ['A', 'B', 'M', 'N'] roles = ['A', 'B', 'M', 'N']
for i in range(0, 4): for i in range(0, 4):
self.switch_mux(quadrupole[i], 'off', roles[i]) if quadrupole[i] > 0:
self.switch_mux(quadrupole[i], 'off', roles[i])
def reset_mux(self): def reset_mux(self):
...@@ -395,119 +385,149 @@ class OhmPi(object): ...@@ -395,119 +385,149 @@ class OhmPi(object):
gain = 16 gain = 16
self.exec_logger.debug(f'Setting gain to {gain}') self.exec_logger.debug(f'Setting gain to {gain}')
return gain return gain
def compute_tx_volt(self, best_tx_injtime=1): def compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5):
"""Compute best voltage to inject to be in our range of Vmn """Estimating best Tx voltage based on different strategy.
(10 mV - 4500 mV) and current (2 - 45 mA) At first a half-cycle is made for a short duration with a fixed
known voltage. This gives us Iab and Rab. We also measure Vmn.
A constant c = vmn/iab is computed (only depends on geometric
factor and ground resistivity, that doesn't change during a
quadrupole). Then depending on the strategy, we compute which
vab to inject to reach the minimum/maximum Iab current or
min/max Vmn.
This function also compute the polarity on Vmn (on which pin
of the ADS1115 we need to measure Vmn to get the positive value).
Parameters
----------
best_tx_injtime : float, optional
Time in milliseconds for the half-cycle used to compute Rab.
strategy : str, optional
Either:
- vmin : compute Vab to reach a minimum Iab and Vmn
- vmax : compute Vab to reach a maximum Iab and Vmn
- constant : apply given Vab
tx_volt : float, optional
Voltage apply to try to guess the best voltage. 5 V applied
by default. If strategy "constant" is chosen, constant voltage
to applied is "tx_volt".
Returns
-------
vab : float
Proposed Vab according to the given strategy.
polarity : int
Either 1 or -1 to know on which pin of the ADS the Vmn is measured.
""" """
# inferring best voltage for injection Vab
# we guess the polarity on Vmn by trying both cases. once found # hardware limits
# we inject a starting voltage of 5V and measure our Vmn. Based voltage_min = 10 # mV
# on the data we then compute a multiplifcation factor to inject voltage_max = 4500
# a voltage that will fall right in the measurable range of Vmn current_min = voltage_min / (self.r_shunt * 50) # mA
# (10 - 4500 mV) and current (45 mA max) current_max = voltage_max / (self.r_shunt * 50)
tx_max = 40 # volt
# check of volt
volt = tx_volt
if volt > tx_max:
print('sorry, cannot inject more than 40 V, set it back to 5 V')
volt = 5
# redefined the pin of the mcp (needed when relays are connected)
self.pin0 = self.mcp.get_pin(0)
self.pin0.direction = Direction.OUTPUT
self.pin0.value = False
self.pin1 = self.mcp.get_pin(1)
self.pin1.direction = Direction.OUTPUT
self.pin1.value = False
# select a polarity to start with # select a polarity to start with
self.pin0.value = True self.pin0.value = True
self.pin1.value = False self.pin1.value = False
# set voltage for test
self.DPS.write_register(0x0000, volt, 2)
self.DPS.write_register(0x09, 1) # DPS5005 on self.DPS.write_register(0x09, 1) # DPS5005 on
time.sleep(best_tx_injtime) # inject for given tx time
tau = np.nan
# voltage optimization # autogain
for volt in range(2, 10, 2): self.ads_current = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49)
print('trying with v:', volt) self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x48)
self.DPS.write_register(0x0000,volt,2) # fixe la voltage pour la mesure à 5V print('current P0', AnalogIn(self.ads_current, ads.P0).voltage)
time.sleep(best_tx_injtime) # inject for 1 s at least on DPS5005 print('voltage P0', AnalogIn(self.ads_voltage, ads.P0).voltage)
print('voltage P2', AnalogIn(self.ads_voltage, ads.P2).voltage)
# autogain gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0))
self.ads_current = ads.ADS1115(self.i2c, gain=2/3, data_rate=128, address=0x49) gain_voltage0 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0))
self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=128, address=0x48) gain_voltage2 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2))
print('current P0', AnalogIn(self.ads_current, ads.P0).voltage) gain_voltage = np.min([gain_voltage0, gain_voltage2])
print('voltage P0', AnalogIn(self.ads_voltage, ads.P0).voltage) print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage))
print('voltage P2', AnalogIn(self.ads_voltage, ads.P2).voltage) self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=0x49)
gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0)) self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=0x48)
gain_voltage0 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0))
gain_voltage2 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2)) # we measure the voltage on both A0 and A2 to guess the polarity
gain_voltage = np.min([gain_voltage0, gain_voltage2]) I = (AnalogIn(self.ads_current, ads.P0).voltage) * 1000/50/self.r_shunt # measure current
print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage)) U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 # measure voltage
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=128, address=0x49) U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=128, address=0x48) print('I (mV)', I*50*self.r_shunt)
print('I (mA)', I)
# we measure the voltage on both A0 and A2 to guess the polarity print('U0 (mV)', U0)
I = (AnalogIn(self.ads_current, ads.P0).voltage) * 1000/50/2 # measure current print('U2 (mV)', U2)
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 # measure voltage
U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 # check polarity
print('I (mV)', I*50*2) polarity = 1 # by default, we guessed it right
print('I (mA)', I) vmn = U0
print('U0 (mV)', U0) if U0 < 0: # we guessed it wrong, let's use a correction factor
print('U2 (mV)', U2) polarity = -1
vmn = U2
print('polarity', polarity)
# compute constant
c = vmn / I
Rab = (volt * 1000) / I
print('Rab', Rab)
# implement different strategy
if strategy == 'vmax':
vmn_max = c * current_max
if vmn_max < voltage_max and vmn_max > voltage_min:
vab = current_max * Rab
print('target max current')
else:
iab = voltage_max / c
vab = iab * Rab
print('target max voltage')
if vab > 25000:
vab = 25000
vab = vab / 1000 * 0.9
# check polarity elif strategy == 'vmin':
polarity = 1 # by default, we guessed it right vmn_min = c * current_min
if U0 < 0: # we guessed it wrong, let's use a correction factor if vmn_min > voltage_min and vmn_min < voltage_max:
polarity = -1 vab = current_min * Rab
print('polarity', polarity) print('target min current')
# TODO (edge case) if PS is negative and greater than Vmn, it can
# potentially cause two negative values so none above 0
# check if we can actually measure smth
ok = True
if I > 2 and I <= 45:
if (((U0 < 4500) and (polarity > 0))
or ((2 < 4500) and (polarity < 0))):
if (((U0 > 10) and (polarity > 0))
or ((U2 > 10) and (polarity < 0))):
# ok, we compute tau
# inferring polarity and computing best voltage to inject
# by hardware design we can measure 10-4500 mV and 2-45 mA
# we will decide on the Vab to fall within this range
if U0 > 0: # we guessed the polarity right, let's keep that
tauI = 45 / I # compute ratio to maximize measuring range of I
tauU = 4500 / U0 # compute ratio to maximize measuring range of U
elif U0 < 0: # we guessed it wrong, let's use a correction factor
tauI = 45 / I
tauU = 4500 / U2
# let's be careful and avoid saturation by taking only 90% of
# the smallest factor
if tauI < tauU:
tau = tauI * 0.9
elif tauI > tauU:
tau = tauU * 0.9
print('tauI', tauI)
print('tauU', tauU)
print('best tau is', tau)
break
else:
# too weak, but let's try with a higher voltage
pass # we'll come back to the loop with higher voltage
else:
print('voltage out of range, max 4500 mV')
# doesn't work, tau will be NaN
break
else: else:
if I <= 2: iab = voltage_min / c
# let's try again vab = iab * Rab
pass print('target min voltage')
else: if vab < 1000:
print('current out of range, max 45 mA') vab = 1000
# doesn't work, tau will be NaN vab = vab / 1000 * 1.1
break
if tau == np.nan:
print('voltage out of range')
self.DPS.write_register(0x09, 0) # DPS5005 off
# we keep DPS5005 on if we computed a tau successfully
# turn off Vab elif strategy == 'constant':
vab = volt
else:
vab = 5
#self.DPS.write_register(0x09, 0) # DPS5005 off
self.pin0.value = False self.pin0.value = False
self.pin1.value = False self.pin1.value = False
return vab, polarity
return tau*volt, polarity
def run_measurement(self, quad=[1, 2, 3, 4], nb_stack=None, injection_duration=None, def run_measurement(self, quad=[1, 2, 3, 4], nb_stack=None, injection_duration=None,
best_tx=True, tx_volt=0, autogain=True, best_tx_injtime=1): tx_volt=5, autogain=True, best_tx_injtime=0.1, strategy='constant'):
"""Do a 4 electrode measurement and measure transfer resistance obtained. """Do a 4 electrode measurement and measure transfer resistance obtained.
Parameters Parameters
...@@ -519,25 +539,29 @@ class OhmPi(object): ...@@ -519,25 +539,29 @@ class OhmPi(object):
positive, one negative). positive, one negative).
injection_duration : int, optional injection_duration : int, optional
Injection time in seconds. Injection time in seconds.
best_tx : bool, optional
If True, will attempt to find the best Tx voltage that fill
within our measurement range. If it cannot find it, it will
return NaN as measurement. If False, it will make the
measurement with whatever it has as voltage and never returns
NaN. Finding the best tx voltage can take some time before
each quadrupole.
tx_volt : float, optional tx_volt : float, optional
If specified, voltage will be imposed disregarding the value If specified, voltage will be imposed. If 0, we will look
of best_tx argument. for the best voltage. If a best Tx cannot be found, no
measurement will be taken and values will be NaN.
autogain : bool, optional autogain : bool, optional
If True, will adapt the gain of the ADS1115 to maximize the If True, will adapt the gain of the ADS1115 to maximize the
resolution of the reading. resolution of the reading.
strategy : str, optional
If we search for best voltage (tx_volt == 0), we can choose
different strategy:
- vmin: find lowest voltage that gives us a signal
- vmax: find max voltage that are in the range
For a constant value, just set the tx_volt.
""" """
# check arguments # check arguments
if nb_stack is None: if nb_stack is None:
nb_stack = self.pardict['nb_stack'] nb_stack = self.pardict['nb_stack']
if injection_duration is None: if injection_duration is None:
injection_duration = self.pardict['injection_duration'] injection_duration = self.pardict['injection_duration']
if tx_volt > 0:
strategy == 'constant'
else:
tx_volt = 5
# inner variable initialization # inner variable initialization
sum_i = 0 sum_i = 0
...@@ -547,23 +571,36 @@ class OhmPi(object): ...@@ -547,23 +571,36 @@ class OhmPi(object):
self.exec_logger.debug('Starting measurement') self.exec_logger.debug('Starting measurement')
self.exec_logger.info('Waiting for data') self.exec_logger.info('Waiting for data')
# get best voltage to inject # let's define the pin again as if we run through measure()
if self.idps and tx_volt == 0: # as it's run in another thread, it doesn't consider these
tx_volt, polarity = self.compute_tx_volt(best_tx_injtime=best_tx_injtime) # and this can lead to short circuit!
self.pin0 = self.mcp.get_pin(0)
self.pin0.direction = Direction.OUTPUT
self.pin0.value = False
self.pin1 = self.mcp.get_pin(1)
self.pin1.direction = Direction.OUTPUT
self.pin1.value = False
# get best voltage to inject AND polarity
if self.idps:
tx_volt, polarity = self.compute_tx_volt(
best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt)
print('tx volt V:', tx_volt) print('tx volt V:', tx_volt)
else: else:
polarity = 1 polarity = 1
# first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x49, mode=0) self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=128, address=0x48, mode=0) self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48, mode=0)
# turn on the power supply # turn on the power supply
oor = False oor = False
if self.idps: if self.idps:
if tx_volt != np.nan: print('++++ tx_volt', tx_volt)
if np.isnan(tx_volt) == False:
self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V
self.DPS.write_register(0x09, 1) # DPS5005 on self.DPS.write_register(0x09, 1) # DPS5005 on
time.sleep(0.05)
else: else:
print('no best voltage found, will not take measurement') print('no best voltage found, will not take measurement')
oor = True oor = True
...@@ -585,6 +622,9 @@ class OhmPi(object): ...@@ -585,6 +622,9 @@ class OhmPi(object):
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=0x49, mode=0) self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=0x49, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=0x48, mode=0) self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=0x48, mode=0)
self.pin0.value = False
self.pin1.value = False
# one stack = 2 half-cycles (one positive, one negative) # one stack = 2 half-cycles (one positive, one negative)
pinMN = 0 if polarity > 0 else 2 pinMN = 0 if polarity > 0 else 2
...@@ -593,7 +633,6 @@ class OhmPi(object): ...@@ -593,7 +633,6 @@ class OhmPi(object):
self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1
# full data for waveform # full data for waveform
#fulldata = np.zeros((nb_stack * self.nb_samples, 3)) * np.nan
fulldata = [] fulldata = []
# start counter # start counter
...@@ -610,6 +649,7 @@ class OhmPi(object): ...@@ -610,6 +649,7 @@ class OhmPi(object):
else: else:
self.pin0.value = False self.pin0.value = False
self.pin1.value = True # current injection nr2 self.pin1.value = True # current injection nr2
print('stack', n, self.pin0.value, self.pin1.value)
# measurement of current i and voltage u during injection # measurement of current i and voltage u during injection
meas = np.zeros((self.nb_samples, 3)) * np.nan meas = np.zeros((self.nb_samples, 3)) * np.nan
...@@ -632,7 +672,7 @@ class OhmPi(object): ...@@ -632,7 +672,7 @@ class OhmPi(object):
self.pin0.value = False self.pin0.value = False
self.pin1.value = False self.pin1.value = False
end_delay = time.time() end_delay = time.time()
# truncate the meas array if we didn't fill the last samples # truncate the meas array if we didn't fill the last samples
meas = meas[:k+1] meas = meas[:k+1]
...@@ -691,15 +731,28 @@ class OhmPi(object): ...@@ -691,15 +731,28 @@ class OhmPi(object):
else: else:
sum_vmn = sum_vmn + vmn1 sum_vmn = sum_vmn + vmn1
sum_ps = sum_ps + vmn1 sum_ps = sum_ps + vmn1
if self.idps: if self.idps:
self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt
self.DPS.write_register(0x09, 0) # DPS5005 off self.DPS.write_register(0x09, 0) # DPS5005 off
print('off')
else: else:
sum_i = np.nan sum_i = np.nan
sum_vmn = np.nan sum_vmn = np.nan
sum_ps = np.nan sum_ps = np.nan
# reshape full data to an array of good size
# we need an array of regular size to save in the csv
if oor == False:
fulldata = np.vstack(fulldata)
# we create a big enough array given nb_samples, number of
# half-cycles (1 stack = 2 half-cycles), and twice as we
# measure decay as well
a = np.zeros((nb_stack * self.nb_samples * 2 * 2, 3)) * np.nan
a[:fulldata.shape[0], :] = fulldata
fulldata = a
else:
np.array([[]])
# create a dictionary and compute averaged values from all stacks # create a dictionary and compute averaged values from all stacks
d = { d = {
...@@ -708,18 +761,20 @@ class OhmPi(object): ...@@ -708,18 +761,20 @@ class OhmPi(object):
"B": quad[1], "B": quad[1],
"M": quad[2], "M": quad[2],
"N": quad[3], "N": quad[3],
"inj time [ms]": (end_delay - start_delay) * 1000, "inj time [ms]": (end_delay - start_delay) * 1000 if oor == False else 0,
"Vmn [mV]": sum_vmn / (2 * nb_stack), "Vmn [mV]": sum_vmn / (2 * nb_stack),
"I [mA]": sum_i / (2 * nb_stack), "I [mA]": sum_i / (2 * nb_stack),
"R [ohm]": sum_vmn / sum_i, "R [ohm]": sum_vmn / sum_i,
"Ps [mV]": sum_ps / (2 * nb_stack), "Ps [mV]": sum_ps / (2 * nb_stack),
"nbStack": nb_stack, "nbStack": nb_stack,
"Tx [V]": tx_volt if oor == False else 0,
"CPU temp [degC]": CPUTemperature().temperature, "CPU temp [degC]": CPUTemperature().temperature,
"Time [s]": (time.time() - start_time),
"Nb samples [-]": self.nb_samples, "Nb samples [-]": self.nb_samples,
"fulldata": np.vstack(fulldata) "fulldata": fulldata,
} }
print(d) a = deepcopy(d)
a.pop('fulldata')
print(a)
# round number to two decimal for nicer string output # round number to two decimal for nicer string output
output = [f'{k}\t' for k in d.keys()] output = [f'{k}\t' for k in d.keys()]
...@@ -736,7 +791,7 @@ class OhmPi(object): ...@@ -736,7 +791,7 @@ class OhmPi(object):
return d return d
def rs_check(self): def rs_check(self, tx_volt=12):
""" Check contact resistance. """ Check contact resistance.
""" """
# create custom sequence where MN == AB # create custom sequence where MN == AB
...@@ -748,6 +803,9 @@ class OhmPi(object): ...@@ -748,6 +803,9 @@ class OhmPi(object):
elec[:-1], elec[:-1],
elec[1:], elec[1:],
]).T ]).T
if self.idps:
quads[:, 2:] = 0 # we don't open Vmn to prevent burning the MN part
# as it has a smaller range of accepted voltage
# create filename to store RS # create filename to store RS
export_path_rs = self.pardict['export_path'].replace('.csv', '') \ export_path_rs = self.pardict['export_path'].replace('.csv', '') \
...@@ -764,22 +822,22 @@ class OhmPi(object): ...@@ -764,22 +822,22 @@ class OhmPi(object):
for i in range(0, quads.shape[0]): for i in range(0, quads.shape[0]):
quad = quads[i, :] # quadrupole quad = quads[i, :] # quadrupole
self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c) self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c)
d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=0.5, tx_volt=5, autogain=True) d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=1, tx_volt=tx_volt, autogain=False)
resistance = d['R [ohm]'] if self.idps:
voltage = d['Vmn [mV]'] voltage = tx_volt # imposed voltage on dps5005
else:
voltage = d['Vmn [mV]']
current = d['I [mA]'] current = d['I [mA]']
print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} Ohm'.format(
current, voltage, resistance))
# compute resistance measured (= contact resistance) # compute resistance measured (= contact resistance)
resist = abs(resistance / 1000) resist = abs(voltage / current) / 1000
print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format(
current, voltage, resist))
msg = 'Contact resistance {:s}: {:.3f} kOhm'.format( msg = 'Contact resistance {:s}: {:.3f} kOhm'.format(
str(quad), resist) str(quad), resist)
#print(msg)
self.exec_logger.debug(msg) self.exec_logger.debug(msg)
# if contact resistance = 0 -> we have a short circuit!! # if contact resistance = 0 -> we have a short circuit!!
if resist < 1e-5: if resist < 1e-5:
msg = '!!!SHORT CIRCUIT!!! {:s}: {:.3f} kOhm'.format( msg = '!!!SHORT CIRCUIT!!! {:s}: {:.3f} kOhm'.format(
...@@ -796,8 +854,6 @@ class OhmPi(object): ...@@ -796,8 +854,6 @@ class OhmPi(object):
# close mux path and put pin back to GND # close mux path and put pin back to GND
self.switch_mux_off(quad) self.switch_mux_off(quad)
#self.pin0.value = False
#self.pin1.value = False
self.reset_mux() self.reset_mux()
self.status = 'idle' self.status = 'idle'
...@@ -819,6 +875,19 @@ class OhmPi(object): ...@@ -819,6 +875,19 @@ class OhmPi(object):
last_measurement : dict last_measurement : dict
Last measurement taken in the form of a python dictionary Last measurement taken in the form of a python dictionary
""" """
last_measurement = deepcopy(last_measurement)
if 'fulldata' in last_measurement:
d = last_measurement['fulldata']
n = d.shape[0]
if n > 1:
idic = dict(zip(['i' + str(i) for i in range(n)], d[:,0]))
udic = dict(zip(['u' + str(i) for i in range(n)], d[:,1]))
tdic = dict(zip(['t' + str(i) for i in range(n)], d[:,2]))
last_measurement.update(idic)
last_measurement.update(udic)
last_measurement.update(tdic)
last_measurement.pop('fulldata')
if os.path.isfile(filename): if os.path.isfile(filename):
# Load data file and append data to it # Load data file and append data to it
...@@ -835,7 +904,7 @@ class OhmPi(object): ...@@ -835,7 +904,7 @@ class OhmPi(object):
# last_measurement.to_csv(f, header=True) # last_measurement.to_csv(f, header=True)
def measure(self): def measure(self, **kwargs):
"""Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'. """Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'.
""" """
self.run = True self.run = True
...@@ -868,8 +937,7 @@ class OhmPi(object): ...@@ -868,8 +937,7 @@ class OhmPi(object):
# run a measurement # run a measurement
if self.on_pi: if self.on_pi:
current_measurement = self.run_measurement(quad, self.pardict["nb_stack"], current_measurement = self.run_measurement(quad, **kwargs)
self.pardict["injection_duration"])
else: # for testing, generate random data else: # for testing, generate random data
current_measurement = { current_measurement = {
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
......
...@@ -2,23 +2,64 @@ from ohmpi import OhmPi ...@@ -2,23 +2,64 @@ from ohmpi import OhmPi
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
k = OhmPi(idps=True) a = np.arange(13) + 1
k.rs_check() b = a + 3
m = a + 1
n = a + 2
seq = np.c_[a, b, m, n]
x = [] k = OhmPi(idps=True)
for i in range(1): k.pardict['injection_duration'] = 0.5
out = k.run_measurement(injection_duration=0.5, nb_stack=1, tx_volt=0, autogain=True, best_tx_injtime=0.2) k.pardict['nb_stack'] = 1
x.append(out['R [ohm]']) k.pardict['tx_volt'] = 0
k.pardict['nbr_meas'] = 1
#k.sequence = seq
#k.reset_mux()
#k.switch_mux_on([4, 7, 5, 6])
#k.switch_mux_on([12, 15, 13, 14])
#k.measure(strategy='vmax')
#print('vab', k.compute_tx_volt(strategy='vmin'))
#k.rs_check()
# out = k.run_measurement(quad=[3, 3, 3, 3], nb_stack=1, tx_volt=12, strategy='constant', autogain=True)
#k.reset_mux()
#k.rs_check(tx_volt=12)
# x = []
for i in range(5):
out = k.run_measurement(injection_duration=0.5, nb_stack=5, strategy='vmin', tx_volt=5, autogain=True)
#x.append(out['R [ohm]'])
k.append_and_save('out.csv', out)
data = out['fulldata'] data = out['fulldata']
inan = ~np.isnan(data[:,0]) inan = ~np.isnan(data[:,0])
if True: if True:
plt.plot(data[inan,2], data[inan,0], '.-', label='current [mA]') fig, axs = plt.subplots(2, 1, sharex=True)
plt.plot(data[inan,2], data[inan,1], '.-', label='voltage [mV]') ax = axs[0]
plt.legend() ax.plot(data[inan,2], data[inan,0], 'r.-', label='current [mA]')
ax.set_ylabel('Current AB [mA]')
ax = axs[1]
ax.plot(data[inan,2], data[inan,1], '.-', label='voltage [mV]')
ax.set_ylabel('Voltage MN [mV]')
ax.set_xlabel('Time [s]')
plt.show() plt.show()
# fig,ax=plt.subplots()
#
#
# ax.plot(data[inan,2], data[inan,0], label='current [mA]', marker="o")
# ax2=ax.twinx()
# ax2.plot(data[inan,2], data[inan,1],'r.-' , label='current [mV]')
# ax2.set_ylabel('Voltage [mV]', color='r')
# ymin=-50
# ymax=50
# ymin1=-4500
# ymax1= 4500
# ax.set_ylim([ymin,ymax])
# ax2.set_ylim([ymin1,ymax1])
#
# plt.show()
if False: if False:
......
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
fnames = sorted(os.listdir('data'))
df = pd.read_csv('out.csv', engine='python')
df
ct = df.columns[df.columns.str.match('t\d+')]
ci = df.columns[df.columns.str.match('i\d+')]
cu = df.columns[df.columns.str.match('u\d+')]
fig, axs = plt.subplots(2, 1, sharex=True)
for i in range(df.shape[0]):
print(i)
data = np.c_[df.loc[i, ct], df.loc[i, ci], df.loc[i, cu]].astype(float)
inan = ~(np.isnan(data).any(1))
label = ', '.join(df.loc[i, ['A', 'B', 'M', 'N']].astype(str).to_list())
ax = axs[0]
ax.plot(data[inan,0], data[inan,1], '.-', label=label)
ax.set_ylabel('Current AB [mA]')
ax.legend()
ax = axs[1]
ax.plot(data[inan,0], data[inan,2], '.-', label=label)
ax.set_ylabel('Voltage MN [mV]')
ax.set_xlabel('Time [s]')
plt.show()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment