diff --git a/ohmpi/hardware_system.py b/ohmpi/hardware_system.py index ebaf5d1da76f8798b03e88ed19d36d80c15c8200..339b4dcc737c92e4db3d3bb42b79cd14008a3f05 100644 --- a/ohmpi/hardware_system.py +++ b/ohmpi/hardware_system.py @@ -2,9 +2,10 @@ import importlib import datetime import time import numpy as np + try: import matplotlib.pyplot as plt -except Exception: +except Exception: # noqa pass from ohmpi.hardware_components.abstract_hardware_components import CtlAbstract from ohmpi.logging_setup import create_stdout_logger @@ -13,8 +14,10 @@ from ohmpi.config import HARDWARE_CONFIG as HC from threading import Thread, Event, Barrier, BrokenBarrierError import warnings + # plt.switch_backend('agg') # for thread safe operations... + def elapsed_seconds(start_time): lap = datetime.datetime.utcnow() - start_time return lap.total_seconds() @@ -23,6 +26,7 @@ def elapsed_seconds(start_time): class OhmPiHardware: """OhmPiHardware class. """ + def __init__(self, **kwargs): # OhmPiHardware initialization self.exec_logger = kwargs.pop('exec_logger', create_stdout_logger('exec_hw')) @@ -34,7 +38,8 @@ class OhmPiHardware: HARDWARE_CONFIG = self.hardware_config print('hardware_config', HARDWARE_CONFIG) # Define the default controller, a distinct controller could be defined for each tx, rx or mux board - # when using a distinct controller, the specific controller definition must be included in the component configuration + # when using a distinct controller, the specific controller definition must be included in the component + # configuration ctl_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["ctl"]["model"]}') pwr_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["pwr"]["model"]}') tx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["tx"]["model"]}') @@ -62,11 +67,13 @@ class OhmPiHardware: print(f'Cannot set value {v} in RX_CONFIG[{k}]:\n{e}') self.current_max = np.min([TX_CONFIG['current_max'], HARDWARE_CONFIG['pwr'].pop('current_max', np.inf), - np.min(np.hstack( - (np.inf, [MUX_CONFIG[i].pop('current_max', np.inf) for i in MUX_CONFIG.keys()])))]) + np.min(np.hstack( + (np.inf, + [MUX_CONFIG[i].pop('current_max', np.inf) for i in MUX_CONFIG.keys()])))]) self.voltage_max = np.min([TX_CONFIG['voltage_max'], - np.min(np.hstack( - (np.inf, [MUX_CONFIG[i].pop('voltage_max', np.inf) for i in MUX_CONFIG.keys()])))]) + np.min(np.hstack( + (np.inf, + [MUX_CONFIG[i].pop('voltage_max', np.inf) for i in MUX_CONFIG.keys()])))]) print('maximums', self.voltage_max, self.current_max) self.voltage_min = RX_CONFIG['voltage_min'] @@ -97,10 +104,10 @@ class OhmPiHardware: HARDWARE_CONFIG['rx'].update({'connection': HARDWARE_CONFIG['rx'].pop('connection', HARDWARE_CONFIG['rx']['ctl'].interfaces[ - HARDWARE_CONFIG['rx'].pop( - 'interface_name', 'i2c')])}) + HARDWARE_CONFIG['rx'].pop( + 'interface_name', 'i2c')])}) HARDWARE_CONFIG['rx'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger, - 'soh_logger': self.soh_logger}) + 'soh_logger': self.soh_logger}) HARDWARE_CONFIG['tx'].pop('ctl', None) self.rx = kwargs.pop('rx', rx_module.Rx(**HARDWARE_CONFIG['rx'])) @@ -153,7 +160,7 @@ class OhmPiHardware: # if self.tx.specs['connect']: # self.pwr_state = 'off' - # Join tX and pwr + # Join tX and pwr self.tx.pwr = self.pwr if not self.tx.pwr.voltage_adjustable: self.tx.pwr._pwr_latency = 0 @@ -175,7 +182,8 @@ class OhmPiHardware: mux_ctl_module = importlib.import_module(f'ohmpi.hardware_components.{mux_config["ctl"]["model"]}') mux_config['ctl'] = mux_ctl_module.Ctl(**mux_config['ctl']) # (**self.ctl) assert issubclass(type(mux_config['ctl']), CtlAbstract) - mux_config.update({'connection': mux_config.pop('connection', mux_config['ctl'].interfaces[mux_config.pop('interface_name', 'i2c')])}) + mux_config.update({'connection': mux_config.pop('connection', mux_config['ctl'].interfaces[ + mux_config.pop('interface_name', 'i2c')])}) mux_config['id'] = mux_id self.mux_boards[mux_id] = mux_module.Mux(**mux_config) @@ -185,10 +193,10 @@ class OhmPiHardware: for mux_id, mux in self.mux_boards.items(): mux.barrier = self.mux_barrier for k, v in mux.cabling.items(): - update_dict(self._cabling, {k: (mux_id, k[0])}) #TODO: in theory k[0] is not needed in values + update_dict(self._cabling, {k: (mux_id, k[0])}) # TODO: in theory k[0] is not needed in values # Complete OhmPiHardware initialization self.readings = np.array([]) # time series of acquired data - self.sp = None # init SP + self.sp = None # init SP self._start_time = None # time of the beginning of a readings acquisition self._pulse = 0 # pulse number self.exec_logger.event(f'OhmPiHardware\tinit\tend\t{datetime.datetime.utcnow()}') @@ -285,8 +293,8 @@ class OhmPiHardware: _readings = [] else: _readings = self.readings.tolist() - if test_r_shunt: - _current = [] + # if test_r_shunt: + _current = [] if sampling_rate is None: sampling_rate = self.rx.sampling_rate sample = 0 @@ -295,7 +303,7 @@ class OhmPiHardware: if not append or self._start_time is None: self._start_time = datetime.datetime.utcnow() # TODO: Check if replacing the following two options by a reset_buffer method of TX would be OK - time.sleep(np.max([self.rx.latency, self.tx.latency])) # if continuous mode + time.sleep(np.max([self.rx.latency, self.tx.latency])) # if continuous mode # _ = self.rx.voltage # if not continuous mode while self.tx_sync.is_set(): @@ -314,7 +322,7 @@ class OhmPiHardware: sleep_time = self._start_time + datetime.timedelta(seconds=sample / sampling_rate) - lap time.sleep(np.max([0., sleep_time.total_seconds()])) - self.exec_logger.debug(f'pulse {self._pulse}: elapsed time {(lap-self._start_time).total_seconds()} s') + self.exec_logger.debug(f'pulse {self._pulse}: elapsed time {(lap - self._start_time).total_seconds()} s') self.exec_logger.debug(f'pulse {self._pulse}: total samples {len(_readings)}') self.readings = np.array(_readings) if test_r_shunt: @@ -349,7 +357,9 @@ class OhmPiHardware: if self.sp is None: self.last_sp(delay=delay) if len(v) > 1: - return 100. * np.std(self.readings[v, 2] * (self.readings[v, 4] - self.sp) / self.readings[v, 3]) / self.last_resistance(delay=delay) + return 100. * np.std( + self.readings[v, 2] * (self.readings[v, 4] - self.sp) / self.readings[v, 3]) / self.last_resistance( + delay=delay) else: return np.nan @@ -385,7 +395,8 @@ class OhmPiHardware: else: return np.nan - def last_sp(self, delay=0.): # TODO: allow for different strategies for computing sp (i.e. when sp drift is not linear) + def last_sp(self, + delay=0.): # TODO: allow for different strategies for computing sp (i.e. when sp drift is not linear) v = self.select_samples(delay) if self.readings.shape == (0,) or len(self.readings[self.readings[:, 2] == 1, :]) < 1 or \ len(self.readings[self.readings[:, 2] == -1, :]) < 1: @@ -403,6 +414,32 @@ class OhmPiHardware: # return sp def _find_vab(self, vab, iab, vmn, p_max, vab_max, iab_max, vmn_max, vmn_min): + """ Finds the best injection voltage + + Parameters + ---------- + vab: float + vab in use + iab: np.ndarray, list + series of current measured during the pulses + vmn: np.ndarray, list + + p_max: float + + vab_max: float + + iab_max: float + + vmn_max: float + + vmn_min: float + + Returns + ------- + float + improved value for vab + + """ self.exec_logger.debug('Searching for the best Vab...') iab_mean = np.mean(iab) iab_std = np.std(iab) @@ -410,7 +447,7 @@ class OhmPiHardware: vmn_std = np.std(vmn) # print(f'iab: ({iab_mean:.5f}, {iab_std:5f}), vmn: ({vmn_mean:.4f}, {vmn_std:.4f})') # bounds on iab - iab_upper_bound = iab_mean + 2 * iab_std + iab_upper_bound = iab_mean + 2 * iab_std # why no absolute upper bound? iab_lower_bound = np.max([0.00001, iab_mean - 2 * iab_std]) # bounds on vmn vmn_upper_bound = vmn_mean + 2 * vmn_std @@ -427,12 +464,14 @@ class OhmPiHardware: cond_p_max = np.sqrt(p_max * rab_lower_bound) cond_iab_max = rab_lower_bound * iab_max # print(f'Rab: [{rab_lower_bound:.1f}, {rab_upper_bound:.1f}], R: [{r_lower_bound:.1f},{r_upper_bound:.1f}]') - self.exec_logger.debug(f'[vab_max: {vab_max:.1f}, vmn_max: {cond_vmn_max:.1f}, vmn_min: {cond_vmn_min:.1f}, p_max: {cond_p_max:.1f}, iab_max: {cond_iab_max:.1f}]') + self.exec_logger.debug( + f'[vab_max: {vab_max:.1f}, vmn_max: {cond_vmn_max:.1f}, vmn_min: {cond_vmn_min:.1f}, ' + f'p_max: {cond_p_max:.1f}, iab_max: {cond_iab_max:.1f}]') new_vab = np.min([vab_max, cond_vmn_max, cond_p_max, cond_iab_max]) if new_vab == vab_max: self.exec_logger.debug(f'Vab {new_vab} bounded by Vab max') elif new_vab == cond_p_max: - self.exec_logger.debug(f'Vab {vab } bounded by P max') + self.exec_logger.debug(f'Vab {vab} bounded by P max') elif new_vab == cond_iab_max: self.exec_logger.debug(f'Vab {vab} bounded by Iab max') elif new_vab == cond_vmn_max: @@ -443,13 +482,11 @@ class OhmPiHardware: return new_vab def compute_vab(self, pulse_duration=0.1, strategy='vmax', vab=5., vab_max=None, - iab_max=None, vmn_max=None, vmn_min=None, polarities=[1, -1], delay=0.05, + iab_max=None, vmn_max=None, vmn_min=None, polarities=(1, -1), delay=0.05, p_max=None, diff_vab_lim=2.5, n_steps=4): - # TODO: Optimise how to pass iab_max, vab_max, vmn_min - # TODO: Update docstring - """Estimates best Vab voltage based on different strategies. + """ Estimates best Vab voltage based on different strategies. In "vmax" and "vmin" strategies, we iteratively increase/decrease the vab while - checking vmn < vmn_max, vmn > vmn_min and iab < iab_max. We do maximum n_steps + checking vmn < vmn_max, vmn > vmn_min and iab < iab_max. We do a maximum of n_steps and when the difference between the two steps is below diff_vab_lim or we reached the maximum number of steps, we return the vab found. @@ -459,7 +496,7 @@ class OhmPiHardware: Time in seconds for the pulse used to compute optimal Vab. strategy : str, optional Either: - - vmax : compute Vab to reach a maximum Iab without exceeding vab_max + - vmax : compute Vab to reach a maximum Iab without exceeding vab_max or p_max - vmin : compute Vab to reach at least vmn_min - constant : apply given Vab, if vab > vab_max then strategy is changed to vmax - full_constant : apply given Vab, no checking with vmax (at your own risk!) @@ -490,15 +527,22 @@ class OhmPiHardware: ------- vab : float Proposed Vab according to the given strategy. + :param vmn_min: + :param vmn_max: + :param iab_max: """ - vab_opt = np.abs(vab) + # TODO: Optimise how to pass iab_max, vab_max, vmn_min + # TODO: Update docstring + # TODO: replace vmn_min and vmn_max by vmn_requested + + vab_opt = np.abs(vab) + polarities = list(polarities) if not self.tx.pwr.voltage_adjustable: vab_opt = self.tx.pwr.voltage else: if strategy == 'full_constant': return vab - if vmn_max is None: vmn_max = self.rx._voltage_max / 1000. if iab_max is None: @@ -528,11 +572,9 @@ class OhmPiHardware: k = 0 vab_list = np.zeros(n_steps + 1) * np.nan vab_list[k] = vab - # self.tx.turn_on() - switch_pwr_off, switch_tx_pwr_off = False, False # TODO: check if these should be moved in kwargs + if self.pwr_state == 'off': self.pwr_state = 'on' - switch_tx_pwr_off = True # Switches on measuring LED self.tx.measuring = 'on' @@ -540,7 +582,6 @@ class OhmPiHardware: self.tx.voltage = vab if self.tx.pwr.pwr_state == 'off': self.tx.pwr.pwr_state = 'on' - switch_pwr_off = True if 1. / self.rx.sampling_rate > pulse_duration: sampling_rate = 1. / pulse_duration # TODO: check this... @@ -552,19 +593,21 @@ class OhmPiHardware: while (k < n_steps) and (diff_vab > diff_vab_lim) and (vab_list[k] < vab_max): self.exec_logger.event( f'OhmPiHardware\t_compute_vab_sleep\tbegin\t{datetime.datetime.utcnow()}') - time.sleep( - 0.2) # TODO: replace this by discharging DPS on resistor with relay on GPIO5 (at least for strategy vmin, - # but might be useful in vmax when last vab too high...) + # time.sleep(0.2) # TODO: replace this by discharging DPS on resistor with relay on GPIO5 + # (at least for strategy vmin, + # but might be useful in vmax when last vab too high...) self.exec_logger.event( f'OhmPiHardware\t_compute_vab_sleep\tend\t{datetime.datetime.utcnow()}') if strategy == 'vmax': vmn_min = vmn_max vabs = [] - self._vab_pulses(vab_list[k], sampling_rate=self.rx.sampling_rate, durations=[pulse_duration, pulse_duration], polarities=polarities) - for pulse in range(2): - v = np.where((self.readings[:, 0] > delay) & (self.readings[:, 2] != 0) & (self.readings[:, 1] == pulse))[0] # NOTE : discard data aquired in the first x ms - iab = self.readings[v, 3]/1000. - vmn = np.abs(self.readings[v, 4]/1000. * self.readings[v, 2]) + self._vab_pulses(vab_list[k], sampling_rate=self.rx.sampling_rate, + durations=[pulse_duration, pulse_duration], polarities=polarities) + for pulse in range(len(polarities)): + v = np.where((self.readings[:, 0] > delay) & (self.readings[:, 2] != 0) & ( + self.readings[:, 1] == pulse))[0] # NOTE : discard data acquired in the first x ms + iab = self.readings[v, 3] / 1000. + vmn = np.abs(self.readings[v, 4] / 1000. * self.readings[v, 2]) new_vab = self._find_vab(vab_list[k], iab, vmn, p_max, vab_max, iab_max, vmn_max, vmn_min) diff_vab = np.abs(new_vab - vab_list[k]) vabs.append(new_vab) @@ -578,17 +621,14 @@ class OhmPiHardware: if k > n_steps: self.exec_logger.debug('Compute_vab stopped on maximum number of steps reached') vab_opt = vab_list[k] - # print(f'Selected Vab: {vab_opt:.2f}') - # if switch_pwr_off: - # self.tx.pwr.pwr_state = 'off' return vab_opt def discharge_pwr(self): if self.tx.pwr.voltage_adjustable: - #TODO: implement strategy to discharge pwr based on hardware version => variable in TX should tell if it can discharge the pwr or not + # TODO: implement strategy to discharge pwr based on hardware version => variable in TX should tell if it can discharge the pwr or not ### if discharge relay manually add on mb_2024_0_2, then should not activate AB relays but simply wait for automatic discharge - ### if mb_20240_1_X then TX should handle the pwr discharge + ### if mb_2024_1_X then TX should handle the pwr discharge # time.sleep(1.0) self.tx.discharge_pwr() @@ -604,7 +644,7 @@ class OhmPiHardware: fig, ax = plt.subplots(nrows=5, sharex=True) ax[0].plot(self.readings[:, 0], self.readings[:, 3], '-r', marker='.', label='iab') ax[0].set_ylabel('Iab [mA]') - ax[1].plot(self.readings[:, 0], self.readings[:, 4] - self.sp , '-b', marker='.', label='vmn') + ax[1].plot(self.readings[:, 0], self.readings[:, 4] - self.sp, '-b', marker='.', label='vmn') ax[1].set_ylabel('Vmn [mV]') ax[2].plot(self.readings[:, 0], self.readings[:, 2], '-g', marker='.', label='polarity') ax[2].set_ylabel('polarity [-]') @@ -628,7 +668,7 @@ class OhmPiHardware: def calibrate_rx_bias(self): self.rx.bias += (np.mean(self.readings[self.readings[:, 2] == 1, 4]) - + np.mean(self.readings[self.readings[:, 2] == -1, 4])) / 2. + + np.mean(self.readings[self.readings[:, 2] == -1, 4])) / 2. def vab_square_wave(self, vab, cycle_duration, sampling_rate=None, cycles=3, polarity=1, duty_cycle=1., append=False): @@ -658,18 +698,18 @@ class OhmPiHardware: self.pwr_state = 'on' switch_tx_pwr_off = True - #Switches on measuring LED + # Switches on measuring LED if self.tx.measuring == 'off': self.tx.measuring = 'on' if self.tx.pwr.pwr_state == 'off': - self.tx.pwr.pwr_state = 'on' - switch_pwr_off = True + self.tx.pwr.pwr_state = 'on' + switch_pwr_off = True self._gain_auto(vab=vab) assert 0. <= duty_cycle <= 1. if duty_cycle < 1.: - durations = [cycle_duration/2 * duty_cycle, cycle_duration/2 * (1.-duty_cycle)] * 2 * cycles + durations = [cycle_duration / 2 * duty_cycle, cycle_duration / 2 * (1. - duty_cycle)] * 2 * cycles pol = [-int(polarity * np.heaviside(i % 2, -1.)) for i in range(2 * cycles)] # pol = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in range(2 * cycles)] polarities = [0] * (len(pol) * 2) @@ -677,7 +717,7 @@ class OhmPiHardware: else: durations = [cycle_duration / 2] * 2 * cycles polarities = [-int(polarity * np.heaviside(i % 2, -1.)) for i in range(2 * cycles)] - self._vab_pulses(vab, durations, sampling_rate, polarities=polarities, append=append) + self._vab_pulses(vab, durations, sampling_rate, polarities=polarities, append=append) self.exec_logger.event(f'OhmPiHardware\tvab_square_wave\tend\t{datetime.datetime.utcnow()}') if switch_pwr_off: self.tx.pwr.pwr_state = 'off' @@ -689,7 +729,7 @@ class OhmPiHardware: def _vab_pulse(self, vab=None, duration=1., sampling_rate=None, polarity=1, append=False): """ Gets VMN and IAB from a single voltage pulse """ - #self.tx.polarity = polarity + # self.tx.polarity = polarity if sampling_rate is None: sampling_rate = self.sampling_rate if self.tx.pwr.voltage_adjustable: @@ -712,7 +752,7 @@ class OhmPiHardware: injection.start() readings.join() injection.join() - self.tx.polarity = 0 #TODO: is this necessary? + self.tx.polarity = 0 # TODO: is this necessary? if switch_pwr_off: self.tx.pwr.pwr_state = 'off' @@ -740,7 +780,8 @@ class OhmPiHardware: if polarities is not None: assert len(polarities) == n_pulses else: - polarities = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in range(n_pulses)] #TODO: this doesn't work if tx.polarity=0 which is the case at init... + polarities = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in + range(n_pulses)] # TODO: this doesn't work if tx.polarity=0 which is the case at init... if not append: self._clear_values() self.sp = None # re-initialise SP before new Vab_pulses @@ -751,7 +792,7 @@ class OhmPiHardware: self.tx.pwr.pwr_state = 'off' if switch_tx_pwr_off: self.pwr_state = 'off' - + def switch_mux(self, electrodes, roles=None, state='off', **kwargs): """Switches on multiplexer relays for given quadrupole. @@ -784,13 +825,14 @@ class OhmPiHardware: status = False if status: mux_workers = list(set(mux_workers)) - b = Barrier(len(mux_workers)+1) + b = Barrier(len(mux_workers) + 1) self.mux_barrier = b for idx, mux in enumerate(mux_workers): # Create a new thread to perform some work self.mux_boards[mux].barrier = b kwargs.update({'elec_dict': elec_dict, 'state': state}) - mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs=kwargs) # TODO: handle minimum delay between two relays activation (to avoid lagging during test_mux at high speed) + mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, + kwargs=kwargs) # TODO: handle minimum delay between two relays activation (to avoid lagging during test_mux at high speed) mux_workers[idx].start() try: self.mux_barrier.wait() @@ -806,7 +848,7 @@ class OhmPiHardware: self.exec_logger.event(f'OhmPiHardware\tswitch_mux\tend\t{datetime.datetime.utcnow()}') return status - def test_mux(self, channel=None, activation_time=1.0): #TODO: add test in reverse order on each mux board + def test_mux(self, channel=None, activation_time=1.0): # TODO: add test in reverse order on each mux board """Interactive method to test the multiplexer. Parameters