Commit 0ffc5e66 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Updates hardware_system for PEP and clean strategies

Showing with 103 additions and 61 deletions
+103 -61
......@@ -2,9 +2,10 @@ import importlib
import datetime
import time
import numpy as np
try:
import matplotlib.pyplot as plt
except Exception:
except Exception: # noqa
pass
from ohmpi.hardware_components.abstract_hardware_components import CtlAbstract
from ohmpi.logging_setup import create_stdout_logger
......@@ -13,8 +14,10 @@ from ohmpi.config import HARDWARE_CONFIG as HC
from threading import Thread, Event, Barrier, BrokenBarrierError
import warnings
# plt.switch_backend('agg') # for thread safe operations...
def elapsed_seconds(start_time):
lap = datetime.datetime.utcnow() - start_time
return lap.total_seconds()
......@@ -23,6 +26,7 @@ def elapsed_seconds(start_time):
class OhmPiHardware:
"""OhmPiHardware class.
"""
def __init__(self, **kwargs):
# OhmPiHardware initialization
self.exec_logger = kwargs.pop('exec_logger', create_stdout_logger('exec_hw'))
......@@ -34,7 +38,8 @@ class OhmPiHardware:
HARDWARE_CONFIG = self.hardware_config
print('hardware_config', HARDWARE_CONFIG)
# Define the default controller, a distinct controller could be defined for each tx, rx or mux board
# when using a distinct controller, the specific controller definition must be included in the component configuration
# when using a distinct controller, the specific controller definition must be included in the component
# configuration
ctl_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["ctl"]["model"]}')
pwr_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["pwr"]["model"]}')
tx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["tx"]["model"]}')
......@@ -62,11 +67,13 @@ class OhmPiHardware:
print(f'Cannot set value {v} in RX_CONFIG[{k}]:\n{e}')
self.current_max = np.min([TX_CONFIG['current_max'], HARDWARE_CONFIG['pwr'].pop('current_max', np.inf),
np.min(np.hstack(
(np.inf, [MUX_CONFIG[i].pop('current_max', np.inf) for i in MUX_CONFIG.keys()])))])
np.min(np.hstack(
(np.inf,
[MUX_CONFIG[i].pop('current_max', np.inf) for i in MUX_CONFIG.keys()])))])
self.voltage_max = np.min([TX_CONFIG['voltage_max'],
np.min(np.hstack(
(np.inf, [MUX_CONFIG[i].pop('voltage_max', np.inf) for i in MUX_CONFIG.keys()])))])
np.min(np.hstack(
(np.inf,
[MUX_CONFIG[i].pop('voltage_max', np.inf) for i in MUX_CONFIG.keys()])))])
print('maximums', self.voltage_max, self.current_max)
self.voltage_min = RX_CONFIG['voltage_min']
......@@ -97,10 +104,10 @@ class OhmPiHardware:
HARDWARE_CONFIG['rx'].update({'connection':
HARDWARE_CONFIG['rx'].pop('connection',
HARDWARE_CONFIG['rx']['ctl'].interfaces[
HARDWARE_CONFIG['rx'].pop(
'interface_name', 'i2c')])})
HARDWARE_CONFIG['rx'].pop(
'interface_name', 'i2c')])})
HARDWARE_CONFIG['rx'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
'soh_logger': self.soh_logger})
'soh_logger': self.soh_logger})
HARDWARE_CONFIG['tx'].pop('ctl', None)
self.rx = kwargs.pop('rx', rx_module.Rx(**HARDWARE_CONFIG['rx']))
......@@ -153,7 +160,7 @@ class OhmPiHardware:
# if self.tx.specs['connect']:
# self.pwr_state = 'off'
# Join tX and pwr
# Join tX and pwr
self.tx.pwr = self.pwr
if not self.tx.pwr.voltage_adjustable:
self.tx.pwr._pwr_latency = 0
......@@ -175,7 +182,8 @@ class OhmPiHardware:
mux_ctl_module = importlib.import_module(f'ohmpi.hardware_components.{mux_config["ctl"]["model"]}')
mux_config['ctl'] = mux_ctl_module.Ctl(**mux_config['ctl']) # (**self.ctl)
assert issubclass(type(mux_config['ctl']), CtlAbstract)
mux_config.update({'connection': mux_config.pop('connection', mux_config['ctl'].interfaces[mux_config.pop('interface_name', 'i2c')])})
mux_config.update({'connection': mux_config.pop('connection', mux_config['ctl'].interfaces[
mux_config.pop('interface_name', 'i2c')])})
mux_config['id'] = mux_id
self.mux_boards[mux_id] = mux_module.Mux(**mux_config)
......@@ -185,10 +193,10 @@ class OhmPiHardware:
for mux_id, mux in self.mux_boards.items():
mux.barrier = self.mux_barrier
for k, v in mux.cabling.items():
update_dict(self._cabling, {k: (mux_id, k[0])}) #TODO: in theory k[0] is not needed in values
update_dict(self._cabling, {k: (mux_id, k[0])}) # TODO: in theory k[0] is not needed in values
# Complete OhmPiHardware initialization
self.readings = np.array([]) # time series of acquired data
self.sp = None # init SP
self.sp = None # init SP
self._start_time = None # time of the beginning of a readings acquisition
self._pulse = 0 # pulse number
self.exec_logger.event(f'OhmPiHardware\tinit\tend\t{datetime.datetime.utcnow()}')
......@@ -285,8 +293,8 @@ class OhmPiHardware:
_readings = []
else:
_readings = self.readings.tolist()
if test_r_shunt:
_current = []
# if test_r_shunt:
_current = []
if sampling_rate is None:
sampling_rate = self.rx.sampling_rate
sample = 0
......@@ -295,7 +303,7 @@ class OhmPiHardware:
if not append or self._start_time is None:
self._start_time = datetime.datetime.utcnow()
# TODO: Check if replacing the following two options by a reset_buffer method of TX would be OK
time.sleep(np.max([self.rx.latency, self.tx.latency])) # if continuous mode
time.sleep(np.max([self.rx.latency, self.tx.latency])) # if continuous mode
# _ = self.rx.voltage # if not continuous mode
while self.tx_sync.is_set():
......@@ -314,7 +322,7 @@ class OhmPiHardware:
sleep_time = self._start_time + datetime.timedelta(seconds=sample / sampling_rate) - lap
time.sleep(np.max([0., sleep_time.total_seconds()]))
self.exec_logger.debug(f'pulse {self._pulse}: elapsed time {(lap-self._start_time).total_seconds()} s')
self.exec_logger.debug(f'pulse {self._pulse}: elapsed time {(lap - self._start_time).total_seconds()} s')
self.exec_logger.debug(f'pulse {self._pulse}: total samples {len(_readings)}')
self.readings = np.array(_readings)
if test_r_shunt:
......@@ -349,7 +357,9 @@ class OhmPiHardware:
if self.sp is None:
self.last_sp(delay=delay)
if len(v) > 1:
return 100. * np.std(self.readings[v, 2] * (self.readings[v, 4] - self.sp) / self.readings[v, 3]) / self.last_resistance(delay=delay)
return 100. * np.std(
self.readings[v, 2] * (self.readings[v, 4] - self.sp) / self.readings[v, 3]) / self.last_resistance(
delay=delay)
else:
return np.nan
......@@ -385,7 +395,8 @@ class OhmPiHardware:
else:
return np.nan
def last_sp(self, delay=0.): # TODO: allow for different strategies for computing sp (i.e. when sp drift is not linear)
def last_sp(self,
delay=0.): # TODO: allow for different strategies for computing sp (i.e. when sp drift is not linear)
v = self.select_samples(delay)
if self.readings.shape == (0,) or len(self.readings[self.readings[:, 2] == 1, :]) < 1 or \
len(self.readings[self.readings[:, 2] == -1, :]) < 1:
......@@ -403,6 +414,32 @@ class OhmPiHardware:
# return sp
def _find_vab(self, vab, iab, vmn, p_max, vab_max, iab_max, vmn_max, vmn_min):
""" Finds the best injection voltage
Parameters
----------
vab: float
vab in use
iab: np.ndarray, list
series of current measured during the pulses
vmn: np.ndarray, list
p_max: float
vab_max: float
iab_max: float
vmn_max: float
vmn_min: float
Returns
-------
float
improved value for vab
"""
self.exec_logger.debug('Searching for the best Vab...')
iab_mean = np.mean(iab)
iab_std = np.std(iab)
......@@ -410,7 +447,7 @@ class OhmPiHardware:
vmn_std = np.std(vmn)
# print(f'iab: ({iab_mean:.5f}, {iab_std:5f}), vmn: ({vmn_mean:.4f}, {vmn_std:.4f})')
# bounds on iab
iab_upper_bound = iab_mean + 2 * iab_std
iab_upper_bound = iab_mean + 2 * iab_std # why no absolute upper bound?
iab_lower_bound = np.max([0.00001, iab_mean - 2 * iab_std])
# bounds on vmn
vmn_upper_bound = vmn_mean + 2 * vmn_std
......@@ -427,12 +464,14 @@ class OhmPiHardware:
cond_p_max = np.sqrt(p_max * rab_lower_bound)
cond_iab_max = rab_lower_bound * iab_max
# print(f'Rab: [{rab_lower_bound:.1f}, {rab_upper_bound:.1f}], R: [{r_lower_bound:.1f},{r_upper_bound:.1f}]')
self.exec_logger.debug(f'[vab_max: {vab_max:.1f}, vmn_max: {cond_vmn_max:.1f}, vmn_min: {cond_vmn_min:.1f}, p_max: {cond_p_max:.1f}, iab_max: {cond_iab_max:.1f}]')
self.exec_logger.debug(
f'[vab_max: {vab_max:.1f}, vmn_max: {cond_vmn_max:.1f}, vmn_min: {cond_vmn_min:.1f}, '
f'p_max: {cond_p_max:.1f}, iab_max: {cond_iab_max:.1f}]')
new_vab = np.min([vab_max, cond_vmn_max, cond_p_max, cond_iab_max])
if new_vab == vab_max:
self.exec_logger.debug(f'Vab {new_vab} bounded by Vab max')
elif new_vab == cond_p_max:
self.exec_logger.debug(f'Vab {vab } bounded by P max')
self.exec_logger.debug(f'Vab {vab} bounded by P max')
elif new_vab == cond_iab_max:
self.exec_logger.debug(f'Vab {vab} bounded by Iab max')
elif new_vab == cond_vmn_max:
......@@ -443,13 +482,11 @@ class OhmPiHardware:
return new_vab
def compute_vab(self, pulse_duration=0.1, strategy='vmax', vab=5., vab_max=None,
iab_max=None, vmn_max=None, vmn_min=None, polarities=[1, -1], delay=0.05,
iab_max=None, vmn_max=None, vmn_min=None, polarities=(1, -1), delay=0.05,
p_max=None, diff_vab_lim=2.5, n_steps=4):
# TODO: Optimise how to pass iab_max, vab_max, vmn_min
# TODO: Update docstring
"""Estimates best Vab voltage based on different strategies.
""" Estimates best Vab voltage based on different strategies.
In "vmax" and "vmin" strategies, we iteratively increase/decrease the vab while
checking vmn < vmn_max, vmn > vmn_min and iab < iab_max. We do maximum n_steps
checking vmn < vmn_max, vmn > vmn_min and iab < iab_max. We do a maximum of n_steps
and when the difference between the two steps is below diff_vab_lim or we
reached the maximum number of steps, we return the vab found.
......@@ -459,7 +496,7 @@ class OhmPiHardware:
Time in seconds for the pulse used to compute optimal Vab.
strategy : str, optional
Either:
- vmax : compute Vab to reach a maximum Iab without exceeding vab_max
- vmax : compute Vab to reach a maximum Iab without exceeding vab_max or p_max
- vmin : compute Vab to reach at least vmn_min
- constant : apply given Vab, if vab > vab_max then strategy is changed to vmax
- full_constant : apply given Vab, no checking with vmax (at your own risk!)
......@@ -490,15 +527,22 @@ class OhmPiHardware:
-------
vab : float
Proposed Vab according to the given strategy.
:param vmn_min:
:param vmn_max:
:param iab_max:
"""
vab_opt = np.abs(vab)
# TODO: Optimise how to pass iab_max, vab_max, vmn_min
# TODO: Update docstring
# TODO: replace vmn_min and vmn_max by vmn_requested
vab_opt = np.abs(vab)
polarities = list(polarities)
if not self.tx.pwr.voltage_adjustable:
vab_opt = self.tx.pwr.voltage
else:
if strategy == 'full_constant':
return vab
if vmn_max is None:
vmn_max = self.rx._voltage_max / 1000.
if iab_max is None:
......@@ -528,11 +572,9 @@ class OhmPiHardware:
k = 0
vab_list = np.zeros(n_steps + 1) * np.nan
vab_list[k] = vab
# self.tx.turn_on()
switch_pwr_off, switch_tx_pwr_off = False, False # TODO: check if these should be moved in kwargs
if self.pwr_state == 'off':
self.pwr_state = 'on'
switch_tx_pwr_off = True
# Switches on measuring LED
self.tx.measuring = 'on'
......@@ -540,7 +582,6 @@ class OhmPiHardware:
self.tx.voltage = vab
if self.tx.pwr.pwr_state == 'off':
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
if 1. / self.rx.sampling_rate > pulse_duration:
sampling_rate = 1. / pulse_duration # TODO: check this...
......@@ -552,19 +593,21 @@ class OhmPiHardware:
while (k < n_steps) and (diff_vab > diff_vab_lim) and (vab_list[k] < vab_max):
self.exec_logger.event(
f'OhmPiHardware\t_compute_vab_sleep\tbegin\t{datetime.datetime.utcnow()}')
time.sleep(
0.2) # TODO: replace this by discharging DPS on resistor with relay on GPIO5 (at least for strategy vmin,
# but might be useful in vmax when last vab too high...)
# time.sleep(0.2) # TODO: replace this by discharging DPS on resistor with relay on GPIO5
# (at least for strategy vmin,
# but might be useful in vmax when last vab too high...)
self.exec_logger.event(
f'OhmPiHardware\t_compute_vab_sleep\tend\t{datetime.datetime.utcnow()}')
if strategy == 'vmax':
vmn_min = vmn_max
vabs = []
self._vab_pulses(vab_list[k], sampling_rate=self.rx.sampling_rate, durations=[pulse_duration, pulse_duration], polarities=polarities)
for pulse in range(2):
v = np.where((self.readings[:, 0] > delay) & (self.readings[:, 2] != 0) & (self.readings[:, 1] == pulse))[0] # NOTE : discard data aquired in the first x ms
iab = self.readings[v, 3]/1000.
vmn = np.abs(self.readings[v, 4]/1000. * self.readings[v, 2])
self._vab_pulses(vab_list[k], sampling_rate=self.rx.sampling_rate,
durations=[pulse_duration, pulse_duration], polarities=polarities)
for pulse in range(len(polarities)):
v = np.where((self.readings[:, 0] > delay) & (self.readings[:, 2] != 0) & (
self.readings[:, 1] == pulse))[0] # NOTE : discard data acquired in the first x ms
iab = self.readings[v, 3] / 1000.
vmn = np.abs(self.readings[v, 4] / 1000. * self.readings[v, 2])
new_vab = self._find_vab(vab_list[k], iab, vmn, p_max, vab_max, iab_max, vmn_max, vmn_min)
diff_vab = np.abs(new_vab - vab_list[k])
vabs.append(new_vab)
......@@ -578,17 +621,14 @@ class OhmPiHardware:
if k > n_steps:
self.exec_logger.debug('Compute_vab stopped on maximum number of steps reached')
vab_opt = vab_list[k]
# print(f'Selected Vab: {vab_opt:.2f}')
# if switch_pwr_off:
# self.tx.pwr.pwr_state = 'off'
return vab_opt
def discharge_pwr(self):
if self.tx.pwr.voltage_adjustable:
#TODO: implement strategy to discharge pwr based on hardware version => variable in TX should tell if it can discharge the pwr or not
# TODO: implement strategy to discharge pwr based on hardware version => variable in TX should tell if it can discharge the pwr or not
### if discharge relay manually add on mb_2024_0_2, then should not activate AB relays but simply wait for automatic discharge
### if mb_20240_1_X then TX should handle the pwr discharge
### if mb_2024_1_X then TX should handle the pwr discharge
# time.sleep(1.0)
self.tx.discharge_pwr()
......@@ -604,7 +644,7 @@ class OhmPiHardware:
fig, ax = plt.subplots(nrows=5, sharex=True)
ax[0].plot(self.readings[:, 0], self.readings[:, 3], '-r', marker='.', label='iab')
ax[0].set_ylabel('Iab [mA]')
ax[1].plot(self.readings[:, 0], self.readings[:, 4] - self.sp , '-b', marker='.', label='vmn')
ax[1].plot(self.readings[:, 0], self.readings[:, 4] - self.sp, '-b', marker='.', label='vmn')
ax[1].set_ylabel('Vmn [mV]')
ax[2].plot(self.readings[:, 0], self.readings[:, 2], '-g', marker='.', label='polarity')
ax[2].set_ylabel('polarity [-]')
......@@ -628,7 +668,7 @@ class OhmPiHardware:
def calibrate_rx_bias(self):
self.rx.bias += (np.mean(self.readings[self.readings[:, 2] == 1, 4])
+ np.mean(self.readings[self.readings[:, 2] == -1, 4])) / 2.
+ np.mean(self.readings[self.readings[:, 2] == -1, 4])) / 2.
def vab_square_wave(self, vab, cycle_duration, sampling_rate=None, cycles=3, polarity=1, duty_cycle=1.,
append=False):
......@@ -658,18 +698,18 @@ class OhmPiHardware:
self.pwr_state = 'on'
switch_tx_pwr_off = True
#Switches on measuring LED
# Switches on measuring LED
if self.tx.measuring == 'off':
self.tx.measuring = 'on'
if self.tx.pwr.pwr_state == 'off':
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
self.tx.pwr.pwr_state = 'on'
switch_pwr_off = True
self._gain_auto(vab=vab)
assert 0. <= duty_cycle <= 1.
if duty_cycle < 1.:
durations = [cycle_duration/2 * duty_cycle, cycle_duration/2 * (1.-duty_cycle)] * 2 * cycles
durations = [cycle_duration / 2 * duty_cycle, cycle_duration / 2 * (1. - duty_cycle)] * 2 * cycles
pol = [-int(polarity * np.heaviside(i % 2, -1.)) for i in range(2 * cycles)]
# pol = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in range(2 * cycles)]
polarities = [0] * (len(pol) * 2)
......@@ -677,7 +717,7 @@ class OhmPiHardware:
else:
durations = [cycle_duration / 2] * 2 * cycles
polarities = [-int(polarity * np.heaviside(i % 2, -1.)) for i in range(2 * cycles)]
self._vab_pulses(vab, durations, sampling_rate, polarities=polarities, append=append)
self._vab_pulses(vab, durations, sampling_rate, polarities=polarities, append=append)
self.exec_logger.event(f'OhmPiHardware\tvab_square_wave\tend\t{datetime.datetime.utcnow()}')
if switch_pwr_off:
self.tx.pwr.pwr_state = 'off'
......@@ -689,7 +729,7 @@ class OhmPiHardware:
def _vab_pulse(self, vab=None, duration=1., sampling_rate=None, polarity=1, append=False):
""" Gets VMN and IAB from a single voltage pulse
"""
#self.tx.polarity = polarity
# self.tx.polarity = polarity
if sampling_rate is None:
sampling_rate = self.sampling_rate
if self.tx.pwr.voltage_adjustable:
......@@ -712,7 +752,7 @@ class OhmPiHardware:
injection.start()
readings.join()
injection.join()
self.tx.polarity = 0 #TODO: is this necessary?
self.tx.polarity = 0 # TODO: is this necessary?
if switch_pwr_off:
self.tx.pwr.pwr_state = 'off'
......@@ -740,7 +780,8 @@ class OhmPiHardware:
if polarities is not None:
assert len(polarities) == n_pulses
else:
polarities = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in range(n_pulses)] #TODO: this doesn't work if tx.polarity=0 which is the case at init...
polarities = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in
range(n_pulses)] # TODO: this doesn't work if tx.polarity=0 which is the case at init...
if not append:
self._clear_values()
self.sp = None # re-initialise SP before new Vab_pulses
......@@ -751,7 +792,7 @@ class OhmPiHardware:
self.tx.pwr.pwr_state = 'off'
if switch_tx_pwr_off:
self.pwr_state = 'off'
def switch_mux(self, electrodes, roles=None, state='off', **kwargs):
"""Switches on multiplexer relays for given quadrupole.
......@@ -784,13 +825,14 @@ class OhmPiHardware:
status = False
if status:
mux_workers = list(set(mux_workers))
b = Barrier(len(mux_workers)+1)
b = Barrier(len(mux_workers) + 1)
self.mux_barrier = b
for idx, mux in enumerate(mux_workers):
# Create a new thread to perform some work
self.mux_boards[mux].barrier = b
kwargs.update({'elec_dict': elec_dict, 'state': state})
mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs=kwargs) # TODO: handle minimum delay between two relays activation (to avoid lagging during test_mux at high speed)
mux_workers[idx] = Thread(target=self.mux_boards[mux].switch,
kwargs=kwargs) # TODO: handle minimum delay between two relays activation (to avoid lagging during test_mux at high speed)
mux_workers[idx].start()
try:
self.mux_barrier.wait()
......@@ -806,7 +848,7 @@ class OhmPiHardware:
self.exec_logger.event(f'OhmPiHardware\tswitch_mux\tend\t{datetime.datetime.utcnow()}')
return status
def test_mux(self, channel=None, activation_time=1.0): #TODO: add test in reverse order on each mux board
def test_mux(self, channel=None, activation_time=1.0): # TODO: add test in reverse order on each mux board
"""Interactive method to test the multiplexer.
Parameters
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment