Newer
Older
Olivier Kaufmann
committed
import datetime
import importlib
import time
from threading import Thread, Event, Barrier, BrokenBarrierError

import numpy as np

try:
    import matplotlib.pyplot as plt
except Exception:
    pass

from ohmpi.hardware_components.abstract_hardware_components import CtlAbstract
from ohmpi.logging_setup import create_stdout_logger
from ohmpi.utils import update_dict
from ohmpi.config import HARDWARE_CONFIG
# plt.switch_backend('agg') # for thread safe operations...
# Define the default controller, a distinct controller could be defined for each tx, rx or mux board
# when using a distinct controller, the specific controller definition must be included in the component configuration
ctl_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["ctl"]["model"]}')
pwr_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["pwr"]["model"]}')
tx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["tx"]["model"]}')
rx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["rx"]["model"]}')

MUX_DEFAULT = HARDWARE_CONFIG['mux']['default']
MUX_CONFIG = HARDWARE_CONFIG['mux']['boards']
# Tag every mux board with its own id and give it each default setting it does not define itself
for k, v in MUX_CONFIG.items():
    MUX_CONFIG[k].update({'id': k})
    for k2, v2 in MUX_DEFAULT.items():
        MUX_CONFIG[k].update({k2: MUX_CONFIG[k].pop(k2, v2)})

# Complete the TX configuration with the defaults published by the TX module
TX_CONFIG = HARDWARE_CONFIG['tx']
for k, v in tx_module.SPECS['tx'].items():
    try:
        TX_CONFIG.update({k: TX_CONFIG.pop(k, v['default'])})
    except Exception:  # best effort: report and carry on, but let KeyboardInterrupt/SystemExit through
        print(f'Cannot set value {v} in TX_CONFIG[{k}]')

# Complete the RX configuration with the defaults published by the RX module
RX_CONFIG = HARDWARE_CONFIG['rx']
for k, v in rx_module.SPECS['rx'].items():
    try:
        RX_CONFIG.update({k: RX_CONFIG.pop(k, v['default'])})
    except Exception:  # best effort: report and carry on, but let KeyboardInterrupt/SystemExit through
        print(f'Cannot set value {v} in RX_CONFIG[{k}]')

# System-wide limits: the most restrictive value among the TX and every mux board.
# Note that 'current_max' and 'voltage_max' are popped from (i.e. removed from) each mux board configuration.
current_max = np.min([TX_CONFIG['voltage_max']/50/TX_CONFIG['r_shunt'],  # TODO: replace 50 by a TX config
                      np.min(np.hstack((np.inf, [MUX_CONFIG[i].pop('current_max', np.inf) for i in MUX_CONFIG.keys()])))])
voltage_max = np.min([TX_CONFIG['voltage_max'],
                      np.min(np.hstack((np.inf, [MUX_CONFIG[i].pop('voltage_max', np.inf) for i in MUX_CONFIG.keys()])))])
voltage_min = RX_CONFIG['voltage_min']
def elapsed_seconds(start_time):
    """Returns the number of seconds elapsed since start_time.

    Parameters
    ----------
    start_time : datetime.datetime
        Reference instant. A naive value is interpreted as UTC (this is what the rest of this module stores);
        a timezone-aware value is compared against the current aware UTC time.

    Returns
    -------
    float
        Seconds between start_time and now.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    if start_time.tzinfo is None:
        # naive and aware datetimes cannot be subtracted: drop the tzinfo to compare naive UTC with naive UTC
        now = now.replace(tzinfo=None)
    return (now - start_time).total_seconds()
Olivier Kaufmann
committed
class OhmPiHardware:
Olivier Kaufmann
committed
def __init__(self, **kwargs):
    """Builds the controller, RX, power source, TX and mux boards described in HARDWARE_CONFIG.

    Parameters
    ----------
    **kwargs
        Optional overrides, all popped from kwargs:

        - exec_logger, data_logger, soh_logger : loggers; a stdout logger is created for each missing one
        - ctl : controller instance, or dict of controller settings (optionally holding a 'model' entry)
        - rx, pwr : component instances used instead of the configured ones
        - tx : component instance, or dict of keyword arguments passed to the configured Tx class
        - cabling : accepted but discarded, the cabling table is rebuilt from the mux boards

    Notes
    -----
    The module-level HARDWARE_CONFIG and MUX_CONFIG dictionaries are modified in place.
    """
    # OhmPiHardware initialization
    self.exec_logger = kwargs.pop('exec_logger', None)
    if self.exec_logger is None:
        self.exec_logger = create_stdout_logger('exec_hw')
    # the begin event is logged only once a logger is guaranteed to exist
    self.exec_logger.event(f'OhmPiHardware\tinit\tbegin\t{datetime.datetime.utcnow()}')
    self.data_logger = kwargs.pop('data_logger', None)
    if self.data_logger is None:
        self.data_logger = create_stdout_logger('data_hw')
    self.soh_logger = kwargs.pop('soh_logger', None)
    if self.soh_logger is None:
        self.soh_logger = create_stdout_logger('soh_hw')
    loggers = {'exec_logger': self.exec_logger, 'data_logger': self.data_logger, 'soh_logger': self.soh_logger}
    self.tx_sync = Event()

    # Main Controller initialization
    # 'model' is removed from the shared config: tolerate its absence so that a second instance can be created
    HARDWARE_CONFIG['ctl'].pop('model', None)
    HARDWARE_CONFIG['ctl'].update(loggers)
    # NOTE(review): the configured controller is instantiated even when a 'ctl' override is given — confirm
    self.ctl = kwargs.pop('ctl', ctl_module.Ctl(**HARDWARE_CONFIG['ctl']))
    # use controller as defined in kwargs if present otherwise use controller as defined in config.
    if isinstance(self.ctl, dict):
        ctl_mod = self.ctl.pop('model', self.ctl)
        if isinstance(ctl_mod, str):
            ctl_mod = importlib.import_module(f'ohmpi.hardware_components.{ctl_mod}')
        self.ctl = ctl_mod.Ctl(**self.ctl)

    # Initialize RX
    HARDWARE_CONFIG['rx'].pop('model', None)
    HARDWARE_CONFIG['rx'].update({'ctl': HARDWARE_CONFIG['rx'].pop('ctl', self.ctl)})
    if isinstance(HARDWARE_CONFIG['rx']['ctl'], dict):
        ctl_mod = HARDWARE_CONFIG['rx']['ctl'].pop('model', self.ctl)
        if isinstance(ctl_mod, str):
            ctl_mod = importlib.import_module(f'ohmpi.hardware_components.{ctl_mod}')
        HARDWARE_CONFIG['rx']['ctl'] = ctl_mod.Ctl(**HARDWARE_CONFIG['rx']['ctl'])
    HARDWARE_CONFIG['rx'].update({'connection':
                                  HARDWARE_CONFIG['rx'].pop('connection',
                                                            HARDWARE_CONFIG['rx']['ctl'].interfaces[
                                                                HARDWARE_CONFIG['rx'].pop(
                                                                    'interface_name', 'i2c')])})
    HARDWARE_CONFIG['rx'].update(loggers)
    # NOTE(review): this pops 'ctl' from the *tx* configuration while RX is being set up — confirm it is intended
    HARDWARE_CONFIG['tx'].pop('ctl', None)
    self.rx = kwargs.pop('rx', rx_module.Rx(**HARDWARE_CONFIG['rx']))

    # Initialize power source
    HARDWARE_CONFIG['pwr'].update({'ctl': HARDWARE_CONFIG['pwr'].pop('ctl', self.ctl)})
    if isinstance(HARDWARE_CONFIG['pwr']['ctl'], dict):
        ctl_mod = HARDWARE_CONFIG['pwr']['ctl'].pop('model', self.ctl)
        if isinstance(ctl_mod, str):
            ctl_mod = importlib.import_module(f'ohmpi.hardware_components.{ctl_mod}')
        HARDWARE_CONFIG['pwr']['ctl'] = ctl_mod.Ctl(**HARDWARE_CONFIG['pwr']['ctl'])
    HARDWARE_CONFIG['pwr'].update(loggers)
    self.pwr = kwargs.pop('pwr', pwr_module.Pwr(**HARDWARE_CONFIG['pwr']))

    # Initialize TX
    HARDWARE_CONFIG['tx'].pop('model', None)
    HARDWARE_CONFIG['tx'].update({'tx_sync': self.tx_sync})
    HARDWARE_CONFIG['tx'].update({'ctl': HARDWARE_CONFIG['tx'].pop('ctl', self.ctl)})
    if isinstance(HARDWARE_CONFIG['tx']['ctl'], dict):
        ctl_mod = HARDWARE_CONFIG['tx']['ctl'].pop('model', self.ctl)
        if isinstance(ctl_mod, str):
            ctl_mod = importlib.import_module(f'ohmpi.hardware_components.{ctl_mod}')
        HARDWARE_CONFIG['tx']['ctl'] = ctl_mod.Ctl(**HARDWARE_CONFIG['tx']['ctl'])
    # NOTE(review): the default TX connection is looked up on the *rx* controller — confirm it is intended
    HARDWARE_CONFIG['tx'].update({'connection': HARDWARE_CONFIG['tx'].pop('connection',
                                                                          HARDWARE_CONFIG['rx']['ctl'].interfaces[
                                                                              HARDWARE_CONFIG['tx'].pop(
                                                                                  'interface_name', 'i2c')])})
    HARDWARE_CONFIG['tx'].pop('ctl', None)
    HARDWARE_CONFIG['tx'].update(loggers)
    self.tx = kwargs.pop('tx', tx_module.Tx(**HARDWARE_CONFIG['tx']))
    if isinstance(self.tx, dict):
        self.tx = tx_module.Tx(**self.tx)

    # Initialize Muxes
    kwargs.pop('cabling', None)  # accepted for backward compatibility; the cabling table is rebuilt below
    self.mux_boards = {}
    for mux_id, mux_config in MUX_CONFIG.items():
        mux_config.update(loggers)
        mux_config.update({'ctl': mux_config.pop('ctl', self.ctl)})
        mux_module = importlib.import_module(f'ohmpi.hardware_components.{mux_config["model"]}')
        if isinstance(mux_config['ctl'], dict):
            mux_ctl_module = importlib.import_module(f'ohmpi.hardware_components.{mux_config["ctl"]["model"]}')
            mux_config['ctl'] = mux_ctl_module.Ctl(**mux_config['ctl'])  # (**self.ctl)
        assert isinstance(mux_config['ctl'], CtlAbstract)
        mux_config.update({'connection': mux_config.pop('connection',
                                                        mux_config['ctl'].interfaces[
                                                            mux_config.pop('interface_name', 'i2c')])})
        self.mux_boards[mux_id] = mux_module.Mux(**mux_config)

    # One extra party in the barrier for the thread that coordinates the boards
    self.mux_barrier = Barrier(len(self.mux_boards) + 1)
    self._cabling = {}
    for mux_id, mux in self.mux_boards.items():
        mux.barrier = self.mux_barrier
        # NOTE(review): the loop header binding `k` is missing from this copy of the file; iterating the keys
        # of the board's `cabling` mapping is assumed (switch_mux looks entries up by (electrode, role)) — confirm
        for k in mux.cabling:
            update_dict(self._cabling, {k: (mux_id, k[0])})

    # Complete OhmPiHardware initialization
    self.readings = np.array([])  # time series of acquired data
    self._start_time = None  # time of the beginning of a readings acquisition
    self._pulse = 0  # pulse number written in the readings by _read_values
    self.exec_logger.event(f'OhmPiHardware\tinit\tend\t{datetime.datetime.utcnow()}')
Olivier Kaufmann
committed
def _clear_values(self):
self.readings = np.array([])
def _gain_auto(self, polarities=(1, -1)): # TODO: improve _gain_auto
self.exec_logger.event(f'OhmPiHardware\ttx_rx_gain_auto\tbegin\t{datetime.datetime.utcnow()}')
for pol in polarities:
self.tx.polarity = pol
injection = Thread(target=self._inject, kwargs={'injection_duration': 0.2, 'polarity': pol})
# readings = Thread(target=self._read_values)
get_tx_gain = Thread(target=self.tx.gain_auto)
get_rx_gain = Thread(target=self.rx.gain_auto)
get_tx_gain.start() # TODO: add a barrier to synchronize?
get_rx_gain.start()
get_tx_gain.join()
get_rx_gain.join()
# v = self.readings[:, 2] != 0
# current = max(current, np.mean(self.readings[v, 3]))
# voltage = max(voltage, np.abs(np.mean(self.readings[v, 2] * self.readings[v, 4])))
self.exec_logger.event(f'OhmPiHardware\ttx_rx_gain_auto\tend\t{datetime.datetime.utcnow()}')
def _inject(self, polarity=1, injection_duration=None): # TODO: deal with voltage or current pulse
self.exec_logger.event(f'OhmPiHardware\tinject\tbegin\t{datetime.datetime.utcnow()}')
self.tx.voltage_pulse(length=injection_duration, polarity=polarity)
self.exec_logger.event(f'OhmPiHardware\tinject\tend\t{datetime.datetime.utcnow()}')
self.mux_barrier = Barrier(len(self.mux_boards) + 1)
for mux in self.mux_boards:
mux.barrier = self.mux_barrier
@property
def pulses(self):
    """Splits the readings per pulse.

    Returns
    -------
    dict
        One entry per distinct value of column 1 of `readings` (the pulse number), each holding
        {'polarity': int, 'iab': column 3 of the pulse rows, 'vmn': column 4 of the pulse rows}.
        Empty when no readings are available.
    """
    pulses = {}
    if np.ndim(self.readings) < 2:  # nothing acquired yet: readings is the empty 1-D array
        return pulses
    for i in np.unique(self.readings[:, 1]):
        r = self.readings[self.readings[:, 1] == i, :]
        assert np.all(np.isclose(r[:, 2], r[0, 2]))  # Polarity cannot change within a pulse
        # TODO: check how to generalize in case of multi-channel RX
        pulses.update({i: {'polarity': int(r[0, 2]), 'iab': r[:, 3], 'vmn': r[:, 4]}})
    return pulses
def _read_values(self, sampling_rate=None, append=False):  # noqa
    """Samples the TX current and the RX voltage for as long as tx_sync is set, and stores them in `readings`.

    Each sample is a row [seconds since start, pulse number, tx polarity, tx current, rx voltage].

    Parameters
    ----------
    sampling_rate : float, optional
        Target number of samples per second. Defaults to the RX sampling rate.
    append : bool, optional
        If True, the new samples are added to the existing readings and the pulse number carries on;
        otherwise the readings are cleared and the pulse number restarts at 0.

    Notes
    -----
    Blocks until tx_sync is set, then returns once it is cleared.
    """
    self.exec_logger.event(f'OhmPiHardware\tread_values\tbegin\t{datetime.datetime.utcnow()}')
    if not append:
        self._clear_values()
        _readings = []
        self._pulse = 0
    else:
        _readings = self.readings.tolist()
        self._pulse = getattr(self, '_pulse', 0)
    if sampling_rate is None:
        sampling_rate = self.rx.sampling_rate
    sample = 0
    lap = datetime.datetime.utcnow()  # just in case tx_sync is not set immediately after passing wait
    # NOTE(review): the wait below is restored from the comment above (the statement is missing from this
    # copy of the file); without it the loop would end before the injection thread has started
    self.tx_sync.wait()
    if not append or self._start_time is None:
        self._start_time = datetime.datetime.utcnow()
        # TODO: Check if replacing the following two options by a reset_buffer method of TX would be OK
        time.sleep(np.max([self.rx._latency, self.tx._latency]))  # if continuous mode
        # _ = self.rx.voltage # if not continuous mode
    while self.tx_sync.is_set():
        lap = datetime.datetime.utcnow()
        r = [elapsed_seconds(self._start_time), self._pulse, self.tx.polarity, self.tx.current, self.rx.voltage]
        if self.tx_sync.is_set():  # keep the sample only if the injection was still on after the reads
            sample += 1
            _readings.append(r)
            sleep_time = self._start_time + datetime.timedelta(seconds=sample / sampling_rate) - lap
            if sleep_time.total_seconds() < 0.:
                # running late: skip the sample slots that are already in the past
                # TODO: count how many samples were skipped to make a stat that could be used to qualify pulses
                sample += int(sampling_rate * np.abs(sleep_time.total_seconds())) + 1
                sleep_time = self._start_time + datetime.timedelta(seconds=sample / sampling_rate) - lap
            time.sleep(np.max([0., sleep_time.total_seconds()]))
    self.exec_logger.debug(f'pulse {self._pulse}: elapsed time {(lap-self._start_time).total_seconds()} s')
    self.exec_logger.debug(f'pulse {self._pulse}: total samples {len(_readings)}')
    self.readings = np.array(_readings)  # the collected samples were previously dropped on return
    self._pulse += 1
    self.exec_logger.event(f'OhmPiHardware\tread_values\tend\t{datetime.datetime.utcnow()}')
@property
def last_rho(self):
    """Mean of polarity * (vmn - sp) / iab over the samples acquired during an injection.

    Returns
    -------
    float
        The mean value, or NaN when fewer than two samples are available.
    """
    # NOTE(review): the `def` line of these statements is missing from this copy of the file; it is restored
    # as a property because the value is read as `self.last_rho` elsewhere in this file
    if np.ndim(self.readings) < 2:  # nothing acquired yet: readings is the empty 1-D array
        return np.nan
    v = self.readings[:, 2] != 0  # exclude samples where there is no injection
    if len(v) > 1:
        return np.mean(self.readings[v, 2] * (self.readings[v, 4] - self.sp) / self.readings[v, 3])
    return np.nan
def last_dev(self):
    """Relative spread of polarity * (vmn - sp) / iab over the injection samples, in percent of last_rho.

    Returns
    -------
    float
        100 * standard deviation / last_rho, or NaN when fewer than two samples are available.
    """
    # NOTE(review): the neighbouring accessors are read as properties in this file; check whether a
    # `@property` decorator was lost here as well
    if len(self.readings) > 1:
        v = self.readings[:, 2] != 0  # exclude sample where there is no injection
        return 100. * np.std(self.readings[v, 2] * (self.readings[v, 4] - self.sp) / self.readings[v, 3]) / self.last_rho
    return np.nan
@property
def sp(self):  # TODO: allow for different strategies for computing sp (i.e. when sp drift is not linear)
    """Estimates the offset on vmn as the midpoint between positive-pulse and negative-pulse mean voltages.

    Returns
    -------
    float
        (mean vmn of the positive pulses + mean vmn of the negative pulses) / 2, or 0. when the readings do
        not hold at least one positive and one negative sample.
    """
    if self.readings.shape == (0,) or len(self.readings[self.readings[:, 2] == 1, :]) < 1 or \
            len(self.readings[self.readings[:, 2] == -1, :]) < 1:
        self.exec_logger.warning('Unable to compute sp: readings should at least contain one positive and one '
                                 'negative pulse')
        return 0.
    # Work on the pulse numbers actually present in the readings
    pulse_ids = np.unique(self.readings[:, 1])
    polarity = np.array([np.median(self.readings[self.readings[:, 1] == i, 2]) for i in pulse_ids])
    mean_vmn = np.array([np.mean(self.readings[self.readings[:, 1] == i, 4]) for i in pulse_ids])
    # Same value as averaging the pairwise sums when there are as many positive as negative pulses,
    # but also defined when their numbers differ
    return (np.mean(mean_vmn[polarity == 1]) + np.mean(mean_vmn[polarity == -1])) / 2
def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5.,
                     vab_max=voltage_max, vmn_min=voltage_min):
    """Estimates best Tx voltage based on different strategies.

    At first a half-cycle is made for a short duration with a fixed
    known voltage. This gives us Iab and Rab. We also measure Vmn.
    A constant c = vmn/iab is computed (only depends on geometric
    factor and ground resistivity, that doesn't change during a
    quadrupole). Then depending on the strategy, we compute which
    vab to inject to reach the minimum/maximum Iab current or
    min/max Vmn.
    This function also compute the polarity on Vmn (on which pin
    of the ADS1115 we need to measure Vmn to get the positive value).

    Parameters
    ----------
    pulse_duration : float, optional
        Time in seconds for the pulse used to compute Rab.
    strategy : str, optional
        Either:
        - vmax : compute Vab to reach a maximum Iab without exceeding vab_max
        - vmin : compute Vab to reach at least vmn_min
        - constant : apply given Vab
    tx_volt : float, optional
        Voltage to apply for guessing the best voltage. 5 V applied
        by default. If strategy "constant" is chosen, constant voltage
        to applied is "tx_volt".
    vab_max : float, optional
        Maximum injection voltage to apply to tx (used by all strategies)
    vmn_min : float, optional
        Minimum voltage target for rx (used by vmin strategy)

    Returns
    -------
    vab : float
        Proposed Vab according to the given strategy.
    polarity:
        Polarity of VMN relative to polarity of VAB
    rab : float
        Resistance between injection electrodes
    """
    vab_max = np.abs(vab_max)
    vmn_min = np.abs(vmn_min)
    vab = np.min([np.abs(tx_volt), vab_max])
    self.tx.turn_on()
    try:
        if 1. / self.rx.sampling_rate > pulse_duration:
            sampling_rate = 1. / pulse_duration  # TODO: check this...
        else:
            sampling_rate = self.tx.sampling_rate
        self._vab_pulse(vab=vab, duration=pulse_duration, sampling_rate=sampling_rate)  # TODO: use a square wave pulse?
        vmn = np.mean(self.readings[:, 4])
        iab = np.mean(self.readings[:, 3])
        # Rab is the ratio of the voltage that was actually applied during the test pulse to the current it
        # produced, so it is computed before vab is rescaled by the strategy
        # (the factor 1000 presumably converts from mA — confirm the unit of tx.current)
        rab = (np.abs(vab) * 1000.) / iab
        # if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
        # then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
        if strategy == 'vmax':
            # implement different strategies
            vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab])  # TODO: check if setting at 90% of max as a safety margin is OK
            self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
        elif strategy == 'vmin':
            if vab <= vab_max and iab < current_max:
                vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab])  # TODO: check if setting at 90% of max as a safety margin is OK
        elif strategy != 'constant':
            self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
        else:
            self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
    finally:
        # never leave the transmitter switched on if the test pulse or the computation fails
        self.tx.turn_off()
    self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
    if vmn < 0:
        polarity = -1  # TODO: check if we really need to return polarity
    else:
        polarity = 1
    return vab, polarity, rab
def _plot_readings(self, save_fig=False):
    """Plots the last readings against time: current, corrected voltage, polarity, resistance and sp.

    Parameters
    ----------
    save_fig : bool, optional
        If True the figure is written to 'figures/test.png', otherwise it is shown on screen.
    """
    if 'plt' not in globals():  # the matplotlib import at the top of the module is allowed to fail
        self.exec_logger.warning('Unable to plot readings: matplotlib is not available')
        return
    # NOTE(review): the figure creation and the definition of the mask `v` are missing from this copy of the
    # file; five stacked axes and the "injection on" mask used elsewhere in this file are assumed — confirm
    fig, ax = plt.subplots(nrows=5, sharex=True)
    ax[0].plot(self.readings[:, 0], self.readings[:, 3], '-r', marker='.', label='iab')
    ax[0].set_ylabel('Iab [mA]')
    ax[1].plot(self.readings[:, 0], self.readings[:, 2] * (self.readings[:, 4] - self.sp), '-b', marker='.', label='vmn')
    ax[1].set_ylabel('Vmn [mV]')
    ax[2].plot(self.readings[:, 0], self.readings[:, 2], '-g', marker='.', label='polarity')
    ax[2].set_ylabel('polarity')
    v = self.readings[:, 2] != 0  # exclude samples where there is no injection
    ax[3].plot(self.readings[v, 0], (self.readings[v, 2] * (self.readings[v, 4] - self.sp)) / self.readings[v, 3],
               '-m', marker='.', label='R [ohm]')
    ax[3].set_ylabel('R [ohm]')
    ax[4].plot(self.readings[v, 0], np.ones_like(self.readings[v, 0]) * self.sp, '-k', marker='.', label='SP [mV]')
    ax[4].set_ylabel('SP [mV]')
    if save_fig:
        fig.savefig(f'figures/test.png')
    else:
        plt.show()
def calibrate_rx_bias(self):
    """Adds to the RX bias the midpoint between the mean vmn of positive and of negative polarity samples."""
    # NOTE(review): the `def` line of this statement is missing from this copy of the file; the method name
    # `calibrate_rx_bias` is restored from context — confirm against the upstream source
    self.rx._bias += (np.mean(self.readings[self.readings[:, 2] == 1, 4])
                      + np.mean(self.readings[self.readings[:, 2] == -1, 4])) / 2.
def vab_square_wave(self, vab, cycle_duration, sampling_rate=None, cycles=3, polarity=1, duty_cycle=1.,
                    append=False):
    """Injects a square wave made of alternating-polarity pulses and acquires the readings.

    Parameters
    ----------
    vab : float
        Injection voltage, forwarded to _vab_pulses.
    cycle_duration : float
        Duration of one full cycle (one pulse of each polarity).
    sampling_rate : float, optional
        Forwarded to _vab_pulses.
    cycles : int, optional
        Number of cycles. Default is 3.
    polarity : int, optional
        Currently not used: the sequence starts from the polarity held by the TX.
    duty_cycle : float, optional
        Fraction (between 0 and 1) of each half-cycle during which the injection is on. Below 1, every
        on-pulse is followed by an off-pulse of polarity 0 lasting the remainder of the half-cycle.
    append : bool, optional
        Forwarded to _vab_pulses.
    """
    self.exec_logger.event(f'OhmPiHardware\tvab_square_wave\tbegin\t{datetime.datetime.utcnow()}')
    assert 0. <= duty_cycle <= 1.
    if duty_cycle < 1.:
        durations = [cycle_duration/2 * duty_cycle, cycle_duration/2*(1.-duty_cycle)] * 2 * cycles
        pol = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in range(2 * cycles)]
        # one slot per duration: on-pulses take the alternating polarity, off-pulses stay at 0
        polarities = [0] * len(durations)
        polarities[0::2] = pol
    else:
        durations = [cycle_duration / 2] * 2 * cycles
        polarities = None
    self._vab_pulses(vab, durations, sampling_rate, polarities=polarities, append=append)
    self.exec_logger.event(f'OhmPiHardware\tvab_square_wave\tend\t{datetime.datetime.utcnow()}')
Olivier Kaufmann
committed
def _vab_pulse(self, vab, duration, sampling_rate=None, polarity=1, append=False):
Olivier Kaufmann
committed
""" Gets VMN and IAB from a single voltage pulse
"""
Olivier Kaufmann
committed
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if self.tx.pwr.voltage_adjustable:
self.tx.pwr.voltage = vab
# reads current and voltage during the pulse
injection = Thread(target=self._inject, kwargs={'injection_duration': duration, 'polarity': polarity})
Olivier Kaufmann
committed
readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append})
readings.start()
injection.start()
readings.join()
injection.join()
Olivier Kaufmann
committed
def _vab_pulses(self, vab, durations, sampling_rate, polarities=None, append=False):
n_pulses = len(durations)
self.exec_logger.debug(f'n_pulses: {n_pulses}')
Olivier Kaufmann
committed
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if polarities is not None:
assert len(polarities) == n_pulses
Olivier Kaufmann
committed
else:
polarities = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in range(n_pulses)]
Olivier Kaufmann
committed
for i in range(n_pulses):
self._vab_pulse(self, duration=durations[i], sampling_rate=sampling_rate, polarity=polarities[i],
def switch_mux(self, electrodes, roles=None, state='off', **kwargs):
    """Switches on multiplexer relays for given quadrupole.

    Parameters
    ----------
    electrodes : list
        List of integers representing the electrode ids.
    roles : list, optional
        List of roles of electrodes, optional
    state : str, optional
        Either 'on' or 'off'.
    **kwargs
        Extra keyword arguments forwarded to the `switch` method of every mux board involved.

    Returns
    -------
    bool
        True if all boards were asked to switch; False if an (electrode, role) pair is not in the cabling,
        if the numbers of electrodes and roles differ, or if the barrier was broken.
    """
    self.exec_logger.event(f'OhmPiHardware\tswitch_mux\tbegin\t{datetime.datetime.utcnow()}')
    status = True
    if roles is None:
        roles = ['A', 'B', 'M', 'N']
    if len(electrodes) == len(roles):
        # TODO: Check that we don't set incompatible roles to the same electrode
        elec_dict = {i: [] for i in roles}
        mux_ids = []  # ids of the boards involved, without duplicates
        for idx, elec in enumerate(electrodes):
            elec_dict[roles[idx]].append(elec)
            try:
                mux = self._cabling[(elec, roles[idx])][0]
                if mux not in mux_ids:
                    mux_ids.append(mux)
            except KeyError:
                self.exec_logger.debug(f'Unable to switch {state} ({elec}, {roles[idx]})'
                                       f': not in cabling and will be ignored...')
                status = False
        if status:
            # one party per board plus this thread, so that all boards switch together
            b = Barrier(len(mux_ids)+1)
            self.mux_barrier = b
            kwargs.update({'elec_dict': elec_dict, 'state': state})
            mux_workers = []
            for mux in mux_ids:
                # Create a new thread to perform some work
                self.mux_boards[mux].barrier = b
                worker = Thread(target=self.mux_boards[mux].switch, kwargs=kwargs)
                # the workers must run to reach the barrier, otherwise the wait below never returns
                worker.start()
                mux_workers.append(worker)
            try:
                self.mux_barrier.wait()
                for mux_worker in mux_workers:
                    mux_worker.join()
            except BrokenBarrierError:
                self.exec_logger.warning('Switching aborted')
                status = False
    else:
        self.exec_logger.error(
            f'Unable to switch {state} electrodes: number of electrodes and number of roles do not match!')
        status = False
    self.exec_logger.event(f'OhmPiHardware\tswitch_mux\tend\t{datetime.datetime.utcnow()}')
    return status
Olivier Kaufmann
committed
def test_mux(self, channel=None, activation_time=1.0):
    """Interactive method to test the multiplexer.

    Parameters
    ----------
    channel : tuple, optional
        (electrode_nr, role) to test. When omitted, every channel of the cabling table is tested in turn.
    activation_time : float, optional
        Time in seconds during which the relays are activated.
    """
    # NOTE(review): several lines of this method are missing from this copy of the file; the split between
    # the single-channel case and the loop over all cabled channels is restored from context — confirm
    if channel is not None:
        try:
            electrodes = [int(channel[0])]
            roles = [channel[1]]
        except (TypeError, ValueError, IndexError, KeyError) as e:
            self.exec_logger.error(f'Unable to parse channel: {e}')
            return
        self.switch_mux(electrodes=electrodes, roles=roles, state='on')
        time.sleep(activation_time)
        self.switch_mux(electrodes=electrodes, roles=roles, state='off')
    else:
        for c in self._cabling:  # keys are used as (electrode, role)
            self.exec_logger.info(f'Testing electrode {c[0]} with role {c[1]}.')
            self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='on')
            time.sleep(activation_time)
            self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='off')
    self.exec_logger.info('Test finished.')
def reset_mux(self):
    """Switches off all multiplexer relays.
    """
    self.exec_logger.debug('Resetting all mux boards ...')
    self.exec_logger.event(f'OhmPiHardware\treset_mux\tbegin\t{datetime.datetime.utcnow()}')
    for mux_id, mux in self.mux_boards.items():  # noqa
        self.exec_logger.debug(f'Resetting {mux_id}.')
        # NOTE(review): the loop only logged and never acted on the board; the call below assumes that mux
        # boards expose a `reset()` method — confirm against the mux component classes
        mux.reset()
    self.exec_logger.event(f'OhmPiHardware\treset_mux\tend\t{datetime.datetime.utcnow()}')