Commit acf68183 authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

Merge branch 'code_refactor' of https://gitlab.irstea.fr/reversaal/OhmPi into code_refactor

Showing with 249 additions and 69 deletions
+249 -69
......@@ -50,7 +50,7 @@ HARDWARE_CONFIG = {
'roles': {'M': 'X'},
'cabling': {(i, j): ('mux_M', i) for j in ['M'] for i in range(1, 65)},
'voltage_max': 12.},
'mux_N':
'mux_N':
{'model': 'mux_2023_0_X',
'mux_tca_address': 0x73,
'roles': {'N': 'X'},
......
......@@ -8,8 +8,8 @@ import logging
from ohmpi.config import HARDWARE_CONFIG
stand_alone = False
part_of_hardware_system = True
within_ohmpi = False
part_of_hardware_system = False
within_ohmpi = True
# Stand alone
if stand_alone:
......@@ -105,14 +105,14 @@ if within_ohmpi:
# k._hw.switch_mux([A, B, M, N], state='off')
# print(f'OhmPiHardware Resistance: {k._hw.last_rho :.2f} ohm, dev. {k._hw.last_dev:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
# k._hw._plot_readings()
k.load_sequence('sequences/9991_GRAD_16_s1_a1.txt')
k.run_sequence(tx_volt=5, injection_duration=1., nb_stack=2, duty_cycle=0.5)
# k.load_sequence('sequences/9991_GRAD_16_s1_a1.txt')
# k.run_sequence(tx_volt=5., injection_duration=1., nb_stack=2, duty_cycle=0.5)
print('using OhmPi')
#d = k.run_measurement([A, B, M, N], injection_duration=1., nb_stack=2, duty_cycle=0.5)
d = k.run_measurement([A, B, M, N], injection_duration=1., nb_stack=2, duty_cycle=0.5)
# print(d)
# k._hw._plot_readings()
k._hw._plot_readings()
print(f'OhmPiHardware: Resistance: {k._hw.last_resistance() :.2f} ohm, dev. {k._hw.last_dev():.2f} %, sp: {k._hw.sp:.2f} mV, rx bias: {k._hw.rx._bias:.2f} mV')
print(f'OhmPi: Resistance: {d["R [ohm]"] :.2f} ohm, dev. {d["R_std [%]"]:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
print(f'OhmPi: Resistance: {d["R [Ohm]"] :.2f} ohm, dev. {d["R_std [%]"]:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
# k._hw._plot_readings(save_fig=False)
# plot_exec_log('ohmpi/logs/exec.log')
change_config('../configs/config_default.py', verbose=False)
......@@ -122,6 +122,7 @@ class PwrAbstract(ABC):
else:
self.voltage = self._voltage_min
class MuxAbstract(ABC):
def __init__(self, **kwargs):
self.model = kwargs.pop('model', 'unknown MUX hardware')
......@@ -279,7 +280,7 @@ class TxAbstract(ABC):
self.pwr = kwargs.pop('pwr', None)
self._polarity = 0
self._injection_duration = None
self._adc_gain = 1.
self._gain = 1.
self.injection_duration = injection_duration
self._latency = kwargs.pop('latency', 0.)
self.tx_sync = kwargs.pop('tx_sync', Event())
......@@ -287,26 +288,23 @@ class TxAbstract(ABC):
self._pwr_state = 'off'
@property
def adc_gain(self):
return self._adc_gain
def gain(self):
return self._gain
@adc_gain.setter
def adc_gain(self, value):
@gain.setter
def gain(self, value):
"""
Set gain on RX ADC
Set gain on RX
Parameters
----------
value: float
"""
self._adc_gain = value
self.exec_logger.debug(f'Setting TX ADC gain to {value}')
self._gain = value
self.exec_logger.debug(f'Setting TX gain to {value}')
@abstractmethod
def _adc_gain_auto(self):
pass
@abstractmethod
def current_pulse(self, **kurwargs):
def current_pulse(self, **kwargs):
pass
@abstractmethod
......@@ -403,6 +401,10 @@ class TxAbstract(ABC):
self._pwr_state = 'off'
self.exec_logger.debug(f'{self.model} cannot switch off power source')
def reset_gain(self):
self.gain = 1.
class RxAbstract(ABC):
def __init__(self, **kwargs):
self.exec_logger = kwargs.pop('exec_logger', None)
......@@ -416,31 +418,34 @@ class RxAbstract(ABC):
self._sampling_rate = kwargs.pop('sampling_rate', 1) # ms
self.exec_logger.debug(f'{self.model} RX initialization')
self._voltage_max = kwargs.pop('voltage_max', 0.)
self._adc_gain = 1.
self._gain = 1.
self._max_sampling_rate = np.inf
self._latency = kwargs.pop('latency', 0.)
self._bias = kwargs.pop('bias', 0.)
self._vmn_hardware_offset = kwargs.pop('vmn_hardware_offset', 0.)
@property
def adc_gain(self):
return self._adc_gain
def gain(self):
return self._gain
@adc_gain.setter
def adc_gain(self, value):
@gain.setter
def gain(self, value):
"""
Sets gain on RX ADC
Sets gain on RX
Parameters
----------
value: float
"""
self._adc_gain = value
self.exec_logger.debug(f'Setting RX ADC gain to {value}')
self._gain = value
self.exec_logger.debug(f'Setting RX gain to {value}')
@abstractmethod
def _adc_gain_auto(self):
def gain_auto(self):
pass
def reset_gain(self):
self.gain = 1.
@property
def sampling_rate(self):
return self._sampling_rate
......
......@@ -31,7 +31,7 @@ SPECS = {'rx': {'model': {'default': os.path.basename(__file__).rstrip('.py')},
'mcp_address': {'default': 0x20},
'ads_address': {'default': 0x48},
'compatible_power_sources': {'default': ['pwr_batt', 'dps5005']},
'r_shunt': {'min': 0., 'default': 2. },
'r_shunt': {'min': 0., 'default': 2.},
'activation_delay': {'default': 0.005}, # Max turn on time of 211EH relays = 5ms
'release_delay': {'default': 0.001}, # Max turn off time of 211EH relays = 1ms
}}
......@@ -91,7 +91,8 @@ class Tx(TxAbstract):
# ADS1115 for current measurement (AB)
self._ads_current_address = kwargs['ads_address']
self._ads_current_data_rate = kwargs['data_rate']
self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=self._ads_current_data_rate,
self._adc_gain = 2 / 3
self._ads_current = ads.ADS1115(self.connection, gain=self._adc_gain, data_rate=self._ads_current_data_rate,
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
self.r_shunt = kwargs['r_shunt']
......@@ -116,7 +117,7 @@ class Tx(TxAbstract):
def gain(self, value):
assert value in [2/3, 2, 4, 8, 16]
self._adc_gain = value
self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain,
self._ads_current = ads.ADS1115(self.connection, gain=self._adc_gain,
data_rate=SPECS['tx']['data_rate']['default'],
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
......@@ -182,12 +183,12 @@ class Tx(TxAbstract):
@property
def tx_bat(self):
if np.isnan(self.tx.pwr.battery_voltage):
if np.isnan(self.pwr.battery_voltage):
self.soh_logger.warning(f'Cannot get battery voltage on {self.model}')
self.exec_logger.debug(f'{self.model} cannot read battery voltage. Returning default battery voltage.')
return self.pwr.voltage
else:
return self.tx.pwr.battery_voltage
return self.pwr.battery_voltage
def voltage_pulse(self, voltage=None, length=None, polarity=1):
......@@ -248,7 +249,7 @@ class Rx(RxAbstract):
def gain(self, value):
assert value in [2/3, 2, 4, 8, 16]
self._adc_gain = value
self._ads_voltage = ads.ADS1115(self.connection, gain=self.adc_gain,
self._ads_voltage = ads.ADS1115(self.connection, gain=self._adc_gain,
data_rate=SPECS['rx']['data_rate']['default'],
address=self._ads_voltage_address)
self._ads_voltage.mode = Mode.CONTINUOUS
......@@ -263,6 +264,10 @@ class Rx(RxAbstract):
def gain_auto(self):
self._adc_gain_auto()
def reset_gain(self):
self.gain = 2/3
@property
def voltage(self):
""" Gets the voltage VMN in Volts
......
......@@ -192,6 +192,9 @@ class Rx(Rx_mb_2023):
self._dg411_gain_auto()
self.exec_logger.debug(f'Setting RX gain automatically to {self.gain}')
def reset_gain(self):
self.gain = 1/3
@property
def voltage(self):
""" Gets the voltage VMN in Volts
......
......@@ -37,12 +37,13 @@ class Ctl(CtlAbstract):
# None interface for battery
self.interfaces['none'] = None
# warnings.filterwarnings("error") # to filter out adafruit warning about setting I2C frequency
warnings.filterwarnings("error") # to filter out adafruit warning about setting I2C frequency
# I2C
try:
self.interfaces['i2c'] = busio.I2C(board.SCL, board.SDA) # noqa
except RuntimeWarning:
pass
warnings.resetwarnings()
# Extended I2C
try:
......@@ -51,7 +52,6 @@ class Ctl(CtlAbstract):
pass
except Exception as e:
self.exec_logger.warning(f'Could not initialize Extended I2C:\n{e}')
warnings.resetwarnings()
# modbus
try:
......
......@@ -317,8 +317,94 @@ class OhmPiHardware:
sp = np.mean(mean_vmn[np.ix_(polarity == 1)] + mean_vmn[np.ix_(polarity == -1)]) / 2
return sp
# def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5.,
# vab_max=voltage_max, vmn_min=voltage_min):
# """Estimates best Tx voltage based on different strategies.
# At first a half-cycle is made for a short duration with a fixed
# known voltage. This gives us Iab and Rab. We also measure Vmn.
# A constant c = vmn/iab is computed (only depends on geometric
# factor and ground resistivity, that doesn't change during a
# quadrupole). Then depending on the strategy, we compute which
# vab to inject to reach the minimum/maximum Iab current or
# min/max Vmn.
# This function also compute the polarity on Vmn (on which pin
# of the ADS1115 we need to measure Vmn to get the positive value).
#
# Parameters
# ----------
# pulse_duration : float, optional
# Time in seconds for the pulse used to compute Rab.
# strategy : str, optional
# Either:
# - vmax : compute Vab to reach a maximum Iab without exceeding vab_max
# - vmin : compute Vab to reach at least vmn_min
# - constant : apply given Vab
# tx_volt : float, optional
# Voltage to apply for guessing the best voltage. 5 V applied
# by default. If strategy "constant" is chosen, constant voltage
# to applied is "tx_volt".
# vab_max : float, optional
# Maximum injection voltage to apply to tx (used by all strategies)
# vmn_min : float, optional
# Minimum voltage target for rx (used by vmin strategy)
#
# Returns
# -------
# vab : float
# Proposed Vab according to the given strategy.
# polarity:
# Polarity of VMN relative to polarity of VAB
# rab : float
# Resistance between injection electrodes
# """
#
# vab_max = np.abs(vab_max)
# vmn_min = np.abs(vmn_min)
# vab = np.min([np.abs(tx_volt), vab_max])
# # self.tx.turn_on()
# switch_pwr_off, switch_tx_pwr_off = False, False #TODO: check if these should be moved in kwargs
# if self.tx.pwr_state == 'off':
# self.tx.pwr_state = 'on'
# switch_tx_pwr_off = True
# if self.tx.pwr.pwr_state == 'off':
# self.tx.pwr.pwr_state = 'on'
# switch_pwr_off = True
# if 1. / self.rx.sampling_rate > pulse_duration:
# sampling_rate = 1. / pulse_duration # TODO: check this...
# else:
# sampling_rate = self.tx.sampling_rate
# self._vab_pulse(vab=vab, duration=pulse_duration, sampling_rate=sampling_rate) # TODO: use a square wave pulse?
# vmn = np.mean(self.readings[:, 4])
# iab = np.mean(self.readings[:, 3])
# # if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
# # then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
# if strategy == 'vmax':
# # implement different strategies
# if vab < vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
# elif strategy == 'vmin':
# if vab <= vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# elif strategy != 'constant':
# self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
# else:
# self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
# # self.tx.turn_off()
# if switch_pwr_off:
# self.tx.pwr.pwr_state = 'off'
# if switch_tx_pwr_off:
# self.tx.pwr_state = 'off'
# rab = (np.abs(vab) * 1000.) / iab
# self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
# if vmn < 0:
# polarity = -1 # TODO: check if we really need to return polarity
# else:
# polarity = 1
# return vab, polarity, rab
def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5.,
vab_max=voltage_max, vmn_min=voltage_min):
vab_max=voltage_max, vmn_min=voltage_min, polarities=(1, -1), delay=0.050):
"""Estimates best Tx voltage based on different strategies.
At first a half-cycle is made for a short duration with a fixed
known voltage. This gives us Iab and Rab. We also measure Vmn.
......@@ -358,11 +444,26 @@ class OhmPiHardware:
Resistance between injection electrodes
"""
# TODO: Get those values from components
p_max = 2.5
vmn_max = 5.
vab_max = 50.
# define a sill
diff_vab_lim = 2.5
n_steps = 4
# Set gain at min
self.rx.reset_gain()
vab_max = np.abs(vab_max)
vmn_min = np.abs(vmn_min)
k = 0
vab_list = np.zeros(n_steps + 1) * np.nan
vab = np.min([np.abs(tx_volt), vab_max])
vab_list[k] = vab
# self.tx.turn_on()
switch_pwr_off, switch_tx_pwr_off = False, False #TODO: check if these should be moved in kwargs
switch_pwr_off, switch_tx_pwr_off = False, False # TODO: check if these should be moved in kwargs
if self.tx.pwr_state == 'off':
self.tx.pwr_state = 'on'
switch_tx_pwr_off = True
......@@ -372,36 +473,100 @@ class OhmPiHardware:
if 1. / self.rx.sampling_rate > pulse_duration:
sampling_rate = 1. / pulse_duration # TODO: check this...
else:
sampling_rate = self.tx.sampling_rate
self._vab_pulse(vab=vab, duration=pulse_duration, sampling_rate=sampling_rate) # TODO: use a square wave pulse?
vmn = np.mean(self.readings[:, 4])
iab = np.mean(self.readings[:, 3])
# if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
# then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
if strategy == 'vmax':
# implement different strategies
if vab < vab_max and iab < current_max:
vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
elif strategy == 'vmin':
if vab <= vab_max and iab < current_max:
vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
elif strategy != 'constant':
self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
else:
self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
# self.tx.turn_off()
if switch_pwr_off:
self.tx.pwr.pwr_state = 'off'
if switch_tx_pwr_off:
self.tx.pwr_state = 'off'
rab = (np.abs(vab) * 1000.) / iab
self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
if vmn < 0:
polarity = -1 # TODO: check if we really need to return polarity
else:
polarity = 1
return vab, polarity, rab
sampling_rate = self.rx.sampling_rate
current, voltage = 0., 0.
if self.tx.pwr.voltage_adjustable:
self.tx.pwr.voltage = vab
if self.tx.pwr.pwr_state == 'off':
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
diff_vab = np.inf
while (k < n_steps) and (diff_vab > diff_vab_lim):
vabs = []
for pol in polarities:
# self.tx.polarity = pol
# set gains automatically
injection = Thread(target=self._inject, kwargs={'injection_duration': 0.2, 'polarity': pol})
readings = Thread(target=self._read_values)
injection.start()
self.tx_sync.wait()
readings.start()
readings.join()
injection.join()
v = np.where((self.readings[:, 0] > delay) & (self.readings[:, 2] != 0))[0] # NOTE : discard data aquired in the first x ms
iab = self.readings[v, 3]/1000.
vmn = np.abs(self.readings[v, 4]/1000. * self.readings[v, 2])
iab_mean = np.mean(iab)
iab_std = np.std(iab)
vmn_mean = np.mean(vmn)
vmn_std = np.std(vmn)
print(f'iab: ({iab_mean:.5f}, {iab_std:5f}), vmn: ({vmn_mean:.4f}, {vmn_std:.4f})')
# bounds on iab
iab_upper_bound = iab_mean + 2 * iab_std
iab_lower_bound = np.max([0.00001, iab_mean - 2 * iab_std])
# bounds on vmn
vmn_upper_bound = vmn_mean + 2 * vmn_std
vmn_lower_bound = np.max([0.000001, vmn_mean - 2 * vmn_std])
# ax.plot([vab[k]] * 2, [vmn_lower_bound, vmn_upper_bound], '-b')
# ax.plot([0, vab_max], [0, vmn_upper_bound * vab_max / vab[k]], '-r', alpha=(k + 1) / n_steps)
# ax.plot([0, vab_max], [0, vmn_lower_bound * vab_max / vab[k]], '-g', alpha=(k + 1) / n_steps)
# bounds on rab
print(f'rab_lb: {vab_list[k] / iab_upper_bound}')
rab_lower_bound = np.min([0.1, np.abs(vab_list[k] / iab_upper_bound)])
rab_upper_bound = np.min([0.1, np.abs(vab_list[k] / iab_lower_bound)])
# bounds on r
r_lower_bound = np.min([0.01, np.abs(vmn_lower_bound / iab_upper_bound)])
r_upper_bound = np.min([0.01, np.abs(vmn_upper_bound / iab_lower_bound)])
# conditions for vab update
cond_vmn_max = rab_lower_bound / r_upper_bound * vmn_max
cond_p_max = np.sqrt(p_max * rab_lower_bound)
new_vab = np.min([vab_max, cond_vmn_max, cond_p_max])
diff_vab = np.abs(new_vab - vab_list[k])
print(f'Rab: [{rab_lower_bound:.1f}, {rab_upper_bound:.1f}], R: [{r_lower_bound:.1f},{r_upper_bound:.1f}]')
print(
f'{k}: [{vab_max:.1f}, {cond_vmn_max:.1f}, {cond_p_max:.1f}], new_vab: {new_vab}, diff_vab: {diff_vab}\n')
if new_vab == vab_max:
print('Vab bounded by Vab max')
elif new_vab == cond_p_max:
print('Vab bounded by Pmax')
elif new_vab == cond_vmn_max:
print('Vab bounded by Vmn max')
else:
print('Next step')
vabs.append(new_vab)
if diff_vab < diff_vab_lim:
print('stopped on vab increase too small')
else:
print('stopped on maximum number of steps reached')
k = k + 1
vab_list[k] = np.min(vabs)
vab_opt = vab_list[k]
print(f'Selected Vab: {vab_opt:.2f}')
# if strategy == 'vmax':
# # implement different strategies
# if vab < vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
# elif strategy == 'vmin':
# if vab <= vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# elif strategy != 'constant':
# self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
# else:
# self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
# # self.tx.turn_off()
# if switch_pwr_off:
# self.tx.pwr.pwr_state = 'off'
# if switch_tx_pwr_off:
# self.tx.pwr_state = 'off'
# rab = (np.abs(vab) * 1000.) / iab
# self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
# if vmn < 0:
# polarity = -1 # TODO: check if we really need to return polarity
# else:
# polarity = 1
return vab_opt, None, None
def _plot_readings(self, save_fig=False):
# Plot graphs
......@@ -440,6 +605,7 @@ class OhmPiHardware:
# if self.tx.pwr.pwr_state == 'off':
# self.tx.pwr.pwr_state = 'on'
# switch_pwr_off = True
self._gain_auto(vab=vab)
assert 0. <= duty_cycle <= 1.
if duty_cycle < 1.:
......
......@@ -500,6 +500,7 @@ class OhmPi(object):
bypass_check = kwargs['bypass_check'] if 'bypass_check' in kwargs.keys() else False
d = {}
if self.switch_mux_on(quad, bypass_check=bypass_check, cmd_id=cmd_id):
tx_volt,_ ,_ = self._hw._compute_tx_volt(tx_volt=tx_volt, strategy=strategy)
self._hw.vab_square_wave(tx_volt, cycle_duration=injection_duration*2/duty_cycle, cycles=nb_stack, duty_cycle=duty_cycle)
if 'delay' in kwargs.keys():
delay = kwargs['delay']
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment