Commit b3e1576b authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Adds gain and reset_gain in abstract RX and TX; Adds a rewritten function for _compute_tx_volt

Showing with 227 additions and 53 deletions
+227 -53
......@@ -50,7 +50,7 @@ HARDWARE_CONFIG = {
'roles': {'M': 'X'},
'cabling': {(i, j): ('mux_M', i) for j in ['M'] for i in range(1, 65)},
'voltage_max': 12.},
'mux_N':
'mux_N':
{'model': 'mux_2023_0_X',
'mux_tca_address': 0x73,
'roles': {'N': 'X'},
......
......@@ -122,6 +122,7 @@ class PwrAbstract(ABC):
else:
self.voltage = self._voltage_min
class MuxAbstract(ABC):
def __init__(self, **kwargs):
self.model = kwargs.pop('model', 'unknown MUX hardware')
......@@ -279,7 +280,7 @@ class TxAbstract(ABC):
self.pwr = kwargs.pop('pwr', None)
self._polarity = 0
self._injection_duration = None
self._adc_gain = 1.
self._gain = 1.
self.injection_duration = injection_duration
self._latency = kwargs.pop('latency', 0.)
self.tx_sync = kwargs.pop('tx_sync', Event())
......@@ -287,26 +288,23 @@ class TxAbstract(ABC):
self._pwr_state = 'off'
@property
def adc_gain(self):
return self._adc_gain
def gain(self):
return self._gain
@adc_gain.setter
def adc_gain(self, value):
@gain.setter
def gain(self, value):
"""
Set gain on RX ADC
Set gain on RX
Parameters
----------
value: float
"""
self._adc_gain = value
self.exec_logger.debug(f'Setting TX ADC gain to {value}')
self._gain = value
self.exec_logger.debug(f'Setting TX gain to {value}')
@abstractmethod
def _adc_gain_auto(self):
pass
@abstractmethod
def current_pulse(self, **kurwargs):
def current_pulse(self, **kwargs):
pass
@abstractmethod
......@@ -403,6 +401,10 @@ class TxAbstract(ABC):
self._pwr_state = 'off'
self.exec_logger.debug(f'{self.model} cannot switch off power source')
def reset_gain(self):
self.gain = 1.
class RxAbstract(ABC):
def __init__(self, **kwargs):
self.exec_logger = kwargs.pop('exec_logger', None)
......@@ -416,31 +418,34 @@ class RxAbstract(ABC):
self._sampling_rate = kwargs.pop('sampling_rate', 1) # ms
self.exec_logger.debug(f'{self.model} RX initialization')
self._voltage_max = kwargs.pop('voltage_max', 0.)
self._adc_gain = 1.
self._gain = 1.
self._max_sampling_rate = np.inf
self._latency = kwargs.pop('latency', 0.)
self._bias = kwargs.pop('bias', 0.)
self._vmn_hardware_offset = kwargs.pop('vmn_hardware_offset', 0.)
@property
def adc_gain(self):
return self._adc_gain
def gain(self):
return self._gain
@adc_gain.setter
def adc_gain(self, value):
@gain.setter
def gain(self, value):
"""
Sets gain on RX ADC
Sets gain on RX
Parameters
----------
value: float
"""
self._adc_gain = value
self.exec_logger.debug(f'Setting RX ADC gain to {value}')
self._gain = value
self.exec_logger.debug(f'Setting RX gain to {value}')
@abstractmethod
def _adc_gain_auto(self):
def gain_auto(self):
pass
def reset_gain(self):
self.gain = 1.
@property
def sampling_rate(self):
return self._sampling_rate
......
......@@ -263,6 +263,10 @@ class Rx(RxAbstract):
def gain_auto(self):
self._adc_gain_auto()
def reset_gain(self):
self.gain = 2/3
@property
def voltage(self):
""" Gets the voltage VMN in Volts
......
......@@ -192,6 +192,9 @@ class Rx(Rx_mb_2023):
self._dg411_gain_auto()
self.exec_logger.debug(f'Setting RX gain automatically to {self.gain}')
def reset_gain(self):
self.gain = 1/3
@property
def voltage(self):
""" Gets the voltage VMN in Volts
......
......@@ -317,8 +317,94 @@ class OhmPiHardware:
sp = np.mean(mean_vmn[np.ix_(polarity == 1)] + mean_vmn[np.ix_(polarity == -1)]) / 2
return sp
# def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5.,
# vab_max=voltage_max, vmn_min=voltage_min):
# """Estimates best Tx voltage based on different strategies.
# At first a half-cycle is made for a short duration with a fixed
# known voltage. This gives us Iab and Rab. We also measure Vmn.
# A constant c = vmn/iab is computed (only depends on geometric
# factor and ground resistivity, that doesn't change during a
# quadrupole). Then depending on the strategy, we compute which
# vab to inject to reach the minimum/maximum Iab current or
# min/max Vmn.
# This function also compute the polarity on Vmn (on which pin
# of the ADS1115 we need to measure Vmn to get the positive value).
#
# Parameters
# ----------
# pulse_duration : float, optional
# Time in seconds for the pulse used to compute Rab.
# strategy : str, optional
# Either:
# - vmax : compute Vab to reach a maximum Iab without exceeding vab_max
# - vmin : compute Vab to reach at least vmn_min
# - constant : apply given Vab
# tx_volt : float, optional
# Voltage to apply for guessing the best voltage. 5 V applied
# by default. If strategy "constant" is chosen, constant voltage
# to applied is "tx_volt".
# vab_max : float, optional
# Maximum injection voltage to apply to tx (used by all strategies)
# vmn_min : float, optional
# Minimum voltage target for rx (used by vmin strategy)
#
# Returns
# -------
# vab : float
# Proposed Vab according to the given strategy.
# polarity:
# Polarity of VMN relative to polarity of VAB
# rab : float
# Resistance between injection electrodes
# """
#
# vab_max = np.abs(vab_max)
# vmn_min = np.abs(vmn_min)
# vab = np.min([np.abs(tx_volt), vab_max])
# # self.tx.turn_on()
# switch_pwr_off, switch_tx_pwr_off = False, False #TODO: check if these should be moved in kwargs
# if self.tx.pwr_state == 'off':
# self.tx.pwr_state = 'on'
# switch_tx_pwr_off = True
# if self.tx.pwr.pwr_state == 'off':
# self.tx.pwr.pwr_state = 'on'
# switch_pwr_off = True
# if 1. / self.rx.sampling_rate > pulse_duration:
# sampling_rate = 1. / pulse_duration # TODO: check this...
# else:
# sampling_rate = self.tx.sampling_rate
# self._vab_pulse(vab=vab, duration=pulse_duration, sampling_rate=sampling_rate) # TODO: use a square wave pulse?
# vmn = np.mean(self.readings[:, 4])
# iab = np.mean(self.readings[:, 3])
# # if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
# # then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
# if strategy == 'vmax':
# # implement different strategies
# if vab < vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
# elif strategy == 'vmin':
# if vab <= vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# elif strategy != 'constant':
# self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
# else:
# self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
# # self.tx.turn_off()
# if switch_pwr_off:
# self.tx.pwr.pwr_state = 'off'
# if switch_tx_pwr_off:
# self.tx.pwr_state = 'off'
# rab = (np.abs(vab) * 1000.) / iab
# self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
# if vmn < 0:
# polarity = -1 # TODO: check if we really need to return polarity
# else:
# polarity = 1
# return vab, polarity, rab
def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5.,
vab_max=voltage_max, vmn_min=voltage_min):
vab_max=voltage_max, vmn_min=voltage_min, polarities=(1, -1), delay=0.050):
"""Estimates best Tx voltage based on different strategies.
At first a half-cycle is made for a short duration with a fixed
known voltage. This gives us Iab and Rab. We also measure Vmn.
......@@ -358,11 +444,22 @@ class OhmPiHardware:
Resistance between injection electrodes
"""
# Get those values from components
p_max = 2.5
vmn_max = 5.
# define a sill
diff_vab_lim = 2.5
n_steps = 4
# Set gain at min
self.rx.reset_gain()
vab_max = np.abs(vab_max)
vmn_min = np.abs(vmn_min)
vab = np.min([np.abs(tx_volt), vab_max])
# self.tx.turn_on()
switch_pwr_off, switch_tx_pwr_off = False, False #TODO: check if these should be moved in kwargs
switch_pwr_off, switch_tx_pwr_off = False, False # TODO: check if these should be moved in kwargs
if self.tx.pwr_state == 'off':
self.tx.pwr_state = 'on'
switch_tx_pwr_off = True
......@@ -373,35 +470,100 @@ class OhmPiHardware:
sampling_rate = 1. / pulse_duration # TODO: check this...
else:
sampling_rate = self.tx.sampling_rate
self._vab_pulse(vab=vab, duration=pulse_duration, sampling_rate=sampling_rate) # TODO: use a square wave pulse?
vmn = np.mean(self.readings[:, 4])
iab = np.mean(self.readings[:, 3])
# if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
# then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
if strategy == 'vmax':
# implement different strategies
if vab < vab_max and iab < current_max:
vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
elif strategy == 'vmin':
if vab <= vab_max and iab < current_max:
vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
elif strategy != 'constant':
self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
else:
self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
# self.tx.turn_off()
if switch_pwr_off:
self.tx.pwr.pwr_state = 'off'
if switch_tx_pwr_off:
self.tx.pwr_state = 'off'
rab = (np.abs(vab) * 1000.) / iab
self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
if vmn < 0:
polarity = -1 # TODO: check if we really need to return polarity
else:
polarity = 1
return vab, polarity, rab
current, voltage = 0., 0.
if self.tx.pwr.voltage_adjustable:
self.tx.pwr.voltage = vab
if self.tx.pwr.pwr_state == 'off':
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
k = 0
diff_vab = np.inf
while (k < n_steps) and (diff_vab > diff_vab_lim):
vabs = []
for pol in polarities:
# self.tx.polarity = pol
# set gains automatically
injection = Thread(target=self._inject, kwargs={'injection_duration': 0.2, 'polarity': pol})
readings = Thread(target=self._read_values)
injection.start()
self.tx_sync.wait()
readings.start()
readings.join()
injection.join()
v = np.where((self.readings[:, 0] > delay) & (self.readings[:, 2] != 0))[0] # NOTE : discard data aquired in the first x ms
iab = self.readings[v, 3]
vmn = self.readings[v, 4] * self.readings[v, 2]
iab_mean = np.mean(iab)
iab_std = np.std(iab)
vmn_mean = np.mean(vmn)
vmn_std = np.std(vmn)
print(f'iab: ({iab_mean:.5f}, {iab_std:5f}), vmn: ({vmn_mean:.4f}, {vmn_std:.4f})')
# bounds on iab
iab_upper_bound = iab_mean + 2 * iab_std
iab_lower_bound = np.max([0.00001, iab_mean - 2 * iab_std])
# bounds on vmn
vmn_upper_bound = vmn_mean + 2 * vmn_std
vmn_lower_bound = np.max([0.000001, vmn_mean - 2 * vmn_std])
# ax.plot([vab[k]] * 2, [vmn_lower_bound, vmn_upper_bound], '-b')
# ax.plot([0, vab_max], [0, vmn_upper_bound * vab_max / vab[k]], '-r', alpha=(k + 1) / n_steps)
# ax.plot([0, vab_max], [0, vmn_lower_bound * vab_max / vab[k]], '-g', alpha=(k + 1) / n_steps)
# bounds on rab
rab_lower_bound = vab[k] / iab_upper_bound
rab_upper_bound = vab[k] / iab_lower_bound
# bounds on r
r_lower_bound = vmn_lower_bound / iab_upper_bound
r_upper_bound = vmn_upper_bound / iab_lower_bound
# conditions for vab update
cond_vmn_max = rab_lower_bound / r_upper_bound * vmn_max
cond_p_max = np.sqrt(p_max * rab_lower_bound)
new_vab = np.min([vab_max, cond_vmn_max, cond_p_max])
diff_vab = np.abs(new_vab - vab[k])
print(f'Rab: [{rab_lower_bound:.1f}, {rab_upper_bound:.1f}], R: [{r_lower_bound:.1f},{r_upper_bound:.1f}]')
print(
f'{k}: [{vab_max:.1f}, {cond_vmn_max:.1f}, {cond_p_max:.1f}], new_vab: {new_vab}, diff_vab: {diff_vab}\n')
if new_vab == vab_max:
print('Vab bounded by Vab max')
elif new_vab == cond_p_max:
print('Vab bounded by Pmax')
elif new_vab == cond_vmn_max:
print('Vab bounded by Vmn max')
else:
print('Next step')
vab[k + 1] = new_vab
k = k + 1
vabs.append(new_vab)
if diff_vab < diff_vab_lim:
print('stopped on vab increase too small')
else:
print('stopped on maximum number of steps reached')
print(f'Selected Vab: {vab:.2f}')
vab = np.min(vabs)
print(f'Selected Vab: {vab:.2f}')
# if strategy == 'vmax':
# # implement different strategies
# if vab < vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
# elif strategy == 'vmin':
# if vab <= vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# elif strategy != 'constant':
# self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
# else:
# self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
# # self.tx.turn_off()
# if switch_pwr_off:
# self.tx.pwr.pwr_state = 'off'
# if switch_tx_pwr_off:
# self.tx.pwr_state = 'off'
# rab = (np.abs(vab) * 1000.) / iab
# self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
# if vmn < 0:
# polarity = -1 # TODO: check if we really need to return polarity
# else:
# polarity = 1
return vab, None, None
def _plot_readings(self, save_fig=False):
# Plot graphs
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment