Commit b60bf0de authored by Arnaud WATLET's avatar Arnaud WATLET
Browse files

Updates compute_tx_volt kwargs

Showing with 121 additions and 121 deletions
+121 -121
......@@ -403,8 +403,9 @@ class OhmPiHardware:
# polarity = 1
# return vab, polarity, rab
def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5.,
vab_max=voltage_max, vmn_min=voltage_min, polarities=(1, -1), delay=0.050):
def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5., vab_max=voltage_max,
iab_max=current_max, vmn_max = 5., vmn_min=voltage_min, polarities=(1, -1), delay=0.050):
# TODO: Optimise how to pass iab_max, vab_max, vmn_min
"""Estimates best Tx voltage based on different strategies.
At first a half-cycle is made for a short duration with a fixed
known voltage. This gives us Iab and Rab. We also measure Vmn.
......@@ -443,127 +444,126 @@ class OhmPiHardware:
rab : float
Resistance between injection electrodes
"""
# Get those values from components
p_max = 2.5
vmn_max = 5.
# define a sill
diff_vab_lim = 2.5
n_steps = 4
# Set gain at min
self.rx.reset_gain()
vab_max = np.abs(vab_max)
vmn_min = np.abs(vmn_min)
vab = np.min([np.abs(tx_volt), vab_max])
# self.tx.turn_on()
switch_pwr_off, switch_tx_pwr_off = False, False # TODO: check if these should be moved in kwargs
if self.tx.pwr_state == 'off':
self.tx.pwr_state = 'on'
switch_tx_pwr_off = True
if self.tx.pwr.pwr_state == 'off':
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
if 1. / self.rx.sampling_rate > pulse_duration:
sampling_rate = 1. / pulse_duration # TODO: check this...
else:
sampling_rate = self.tx.sampling_rate
current, voltage = 0., 0.
if self.tx.pwr.voltage_adjustable:
self.tx.pwr.voltage = vab
if self.tx.pwr.pwr_state == 'off':
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
k = 0
diff_vab = np.inf
while (k < n_steps) and (diff_vab > diff_vab_lim):
vabs = []
for pol in polarities:
# self.tx.polarity = pol
# set gains automatically
injection = Thread(target=self._inject, kwargs={'injection_duration': 0.2, 'polarity': pol})
readings = Thread(target=self._read_values)
injection.start()
self.tx_sync.wait()
readings.start()
readings.join()
injection.join()
v = np.where((self.readings[:, 0] > delay) & (self.readings[:, 2] != 0))[0] # NOTE : discard data aquired in the first x ms
iab = self.readings[v, 3]
vmn = self.readings[v, 4] * self.readings[v, 2]
iab_mean = np.mean(iab)
iab_std = np.std(iab)
vmn_mean = np.mean(vmn)
vmn_std = np.std(vmn)
print(f'iab: ({iab_mean:.5f}, {iab_std:5f}), vmn: ({vmn_mean:.4f}, {vmn_std:.4f})')
# bounds on iab
iab_upper_bound = iab_mean + 2 * iab_std
iab_lower_bound = np.max([0.00001, iab_mean - 2 * iab_std])
# bounds on vmn
vmn_upper_bound = vmn_mean + 2 * vmn_std
vmn_lower_bound = np.max([0.000001, vmn_mean - 2 * vmn_std])
# ax.plot([vab[k]] * 2, [vmn_lower_bound, vmn_upper_bound], '-b')
# ax.plot([0, vab_max], [0, vmn_upper_bound * vab_max / vab[k]], '-r', alpha=(k + 1) / n_steps)
# ax.plot([0, vab_max], [0, vmn_lower_bound * vab_max / vab[k]], '-g', alpha=(k + 1) / n_steps)
# bounds on rab
rab_lower_bound = vab[k] / iab_upper_bound
rab_upper_bound = vab[k] / iab_lower_bound
# bounds on r
r_lower_bound = vmn_lower_bound / iab_upper_bound
r_upper_bound = vmn_upper_bound / iab_lower_bound
# conditions for vab update
cond_vmn_max = rab_lower_bound / r_upper_bound * vmn_max
cond_p_max = np.sqrt(p_max * rab_lower_bound)
new_vab = np.min([vab_max, cond_vmn_max, cond_p_max])
diff_vab = np.abs(new_vab - vab[k])
print(f'Rab: [{rab_lower_bound:.1f}, {rab_upper_bound:.1f}], R: [{r_lower_bound:.1f},{r_upper_bound:.1f}]')
print(
f'{k}: [{vab_max:.1f}, {cond_vmn_max:.1f}, {cond_p_max:.1f}], new_vab: {new_vab}, diff_vab: {diff_vab}\n')
if new_vab == vab_max:
print('Vab bounded by Vab max')
elif new_vab == cond_p_max:
print('Vab bounded by Pmax')
elif new_vab == cond_vmn_max:
print('Vab bounded by Vmn max')
else:
print('Next step')
vab[k + 1] = new_vab
k = k + 1
vabs.append(new_vab)
if diff_vab < diff_vab_lim:
print('stopped on vab increase too small')
else:
print('stopped on maximum number of steps reached')
# Get those values from components
p_max = vab_max * iab_max
# define a sill
diff_vab_lim = 2.5
n_steps = 4
# Set gain at min
self.rx.reset_gain()
vab_max = np.abs(vab_max)
vmn_min = np.abs(vmn_min)
vab = np.min([np.abs(tx_volt), vab_max])
# self.tx.turn_on()
switch_pwr_off, switch_tx_pwr_off = False, False # TODO: check if these should be moved in kwargs
if self.tx.pwr_state == 'off':
self.tx.pwr_state = 'on'
switch_tx_pwr_off = True
if self.tx.pwr.pwr_state == 'off':
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
if 1. / self.rx.sampling_rate > pulse_duration:
sampling_rate = 1. / pulse_duration # TODO: check this...
else:
sampling_rate = self.tx.sampling_rate
current, voltage = 0., 0.
if self.tx.pwr.voltage_adjustable:
self.tx.pwr.voltage = vab
if self.tx.pwr.pwr_state == 'off':
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
k = 0
diff_vab = np.inf
while (k < n_steps) and (diff_vab > diff_vab_lim):
vabs = []
for pol in polarities:
# self.tx.polarity = pol
# set gains automatically
injection = Thread(target=self._inject, kwargs={'injection_duration': 0.2, 'polarity': pol})
readings = Thread(target=self._read_values)
injection.start()
self.tx_sync.wait()
readings.start()
readings.join()
injection.join()
v = np.where((self.readings[:, 0] > delay) & (self.readings[:, 2] != 0))[0] # NOTE : discard data aquired in the first x ms
iab = self.readings[v, 3]
vmn = self.readings[v, 4] * self.readings[v, 2]
iab_mean = np.mean(iab)
iab_std = np.std(iab)
vmn_mean = np.mean(vmn)
vmn_std = np.std(vmn)
print(f'iab: ({iab_mean:.5f}, {iab_std:5f}), vmn: ({vmn_mean:.4f}, {vmn_std:.4f})')
# bounds on iab
iab_upper_bound = iab_mean + 2 * iab_std
iab_lower_bound = np.max([0.00001, iab_mean - 2 * iab_std])
# bounds on vmn
vmn_upper_bound = vmn_mean + 2 * vmn_std
vmn_lower_bound = np.max([0.000001, vmn_mean - 2 * vmn_std])
# ax.plot([vab[k]] * 2, [vmn_lower_bound, vmn_upper_bound], '-b')
# ax.plot([0, vab_max], [0, vmn_upper_bound * vab_max / vab[k]], '-r', alpha=(k + 1) / n_steps)
# ax.plot([0, vab_max], [0, vmn_lower_bound * vab_max / vab[k]], '-g', alpha=(k + 1) / n_steps)
# bounds on rab
rab_lower_bound = vab[k] / iab_upper_bound
rab_upper_bound = vab[k] / iab_lower_bound
# bounds on r
r_lower_bound = vmn_lower_bound / iab_upper_bound
r_upper_bound = vmn_upper_bound / iab_lower_bound
# conditions for vab update
cond_vmn_max = rab_lower_bound / r_upper_bound * vmn_max
cond_p_max = np.sqrt(p_max * rab_lower_bound)
new_vab = np.min([vab_max, cond_vmn_max, cond_p_max])
diff_vab = np.abs(new_vab - vab[k])
print(f'Rab: [{rab_lower_bound:.1f}, {rab_upper_bound:.1f}], R: [{r_lower_bound:.1f},{r_upper_bound:.1f}]')
print(
f'{k}: [{vab_max:.1f}, {cond_vmn_max:.1f}, {cond_p_max:.1f}], new_vab: {new_vab}, diff_vab: {diff_vab}\n')
if new_vab == vab_max:
print('Vab bounded by Vab max')
elif new_vab == cond_p_max:
print('Vab bounded by Pmax')
elif new_vab == cond_vmn_max:
print('Vab bounded by Vmn max')
else:
print('Next step')
vab[k + 1] = new_vab
k = k + 1
vabs.append(new_vab)
if diff_vab < diff_vab_lim:
print('stopped on vab increase too small')
else:
print('stopped on maximum number of steps reached')
print(f'Selected Vab: {vab:.2f}')
vab = np.min(vabs)
print(f'Selected Vab: {vab:.2f}')
vab = np.min(vabs)
print(f'Selected Vab: {vab:.2f}')
# if strategy == 'vmax':
# # implement different strategies
# if vab < vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
# elif strategy == 'vmin':
# if vab <= vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# elif strategy != 'constant':
# self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
# else:
# self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
# # self.tx.turn_off()
# if switch_pwr_off:
# self.tx.pwr.pwr_state = 'off'
# if switch_tx_pwr_off:
# self.tx.pwr_state = 'off'
# rab = (np.abs(vab) * 1000.) / iab
# self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
# if vmn < 0:
# polarity = -1 # TODO: check if we really need to return polarity
# else:
# polarity = 1
return vab, None, None
# if strategy == 'vmax':
# # implement different strategies
# if vab < vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
# elif strategy == 'vmin':
# if vab <= vab_max and iab < current_max:
# vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
# elif strategy != 'constant':
# self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
# else:
# self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
# # self.tx.turn_off()
# if switch_pwr_off:
# self.tx.pwr.pwr_state = 'off'
# if switch_tx_pwr_off:
# self.tx.pwr_state = 'off'
# rab = (np.abs(vab) * 1000.) / iab
# self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
# if vmn < 0:
# polarity = -1 # TODO: check if we really need to return polarity
# else:
# polarity = 1
return vab
def _plot_readings(self, save_fig=False):
# Plot graphs
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment