Newer
Older
Olivier Kaufmann
committed
import importlib
import time
Olivier Kaufmann
committed
import numpy as np
from OhmPi.logging_setup import create_stdout_logger
from OhmPi.config import HARDWARE_CONFIG
from threading import Thread, Event, Barrier
controller_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["controller"]["model"]}')
tx_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["tx"]["model"]}')
rx_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["rx"]["model"]}')
for mux_id, mux_config in HARDWARE_CONFIG['mux']['boards'].items():
mux_module = importlib.import_module(f'OhmPi.hardware_components.{mux_config["model"]}')
MUX_CONFIG[mux_id] = mux_module.MUX_CONFIG
MUX_CONFIG[mux_id].update(mux_config)
MUX_CONFIG[mux_id].update({'id': mux_id})
mux_boards.append(mux_id)
Olivier Kaufmann
committed
TX_CONFIG = tx_module.TX_CONFIG
RX_CONFIG = rx_module.RX_CONFIG
current_max = np.min([TX_CONFIG['current_max'], np.min([MUX_CONFIG[i].pop('current_max', np.inf) for i in mux_boards])])
voltage_max = np.min([TX_CONFIG['voltage_max'], np.min([MUX_CONFIG[i].pop('voltage_max', np.inf) for i in mux_boards])])
Olivier Kaufmann
committed
voltage_min = RX_CONFIG['voltage_min']
def elapsed_seconds(start_time):
lap = datetime.datetime.utcnow() - start_time
return lap.total_seconds()
Olivier Kaufmann
committed
class OhmPiHardware:
Olivier Kaufmann
committed
def __init__(self, **kwargs):
self.exec_logger = kwargs.pop('exec_logger', None)
if self.exec_logger is None:
self.exec_logger = create_stdout_logger('exec_hw')
self.data_logger = kwargs.pop('exec_logger', None)
if self.data_logger is None:
self.data_logger = create_stdout_logger('data_hw')
self.soh_logger = kwargs.pop('soh_logger', None)
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_hw')
self.tx_sync = Event()
Olivier Kaufmann
committed
self.controller = kwargs.pop('controller',
controller_module.Controller(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger))
self.rx = kwargs.pop('rx', rx_module.Rx(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger,
controller=self.controller))
self.tx = kwargs.pop('tx', tx_module.Tx(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger,
controller=self.controller))
self._cabling = kwargs.pop('cabling', {})
self.mux_boards = kwargs.pop('mux', {'mux_1': mux_module.Mux(id='mux_1',
exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger,
controller=self.controller,
cabling = self._cabling)})
self.mux_barrier = Barrier(len(self.mux_boards) + 1)
Olivier Kaufmann
committed
self._cabling={}
for _, mux in self.mux_boards.items():
mux.barrier = self.mux_barrier
update_dict(self._cabling, {k: (mux_id, k[0])})
self.readings = np.array([]) # time series of acquired data
self._start_time = None # time of the beginning of a readings acquisition
Olivier Kaufmann
committed
def _clear_values(self):
self.readings = np.array([])
def _inject(self, duration):
self.tx_sync.set()
self.tx_sync.clear()
self.mux_barrier = Barrier(len(self.mux_boards) + 1)
for mux in self.mux_boards:
mux.barrier = self.mux_barrier
@property
def pulses(self):
pulses = {}
for i in np.unique(self.readings[:,1]):
r = self.readings[self.readings[:, 1] == i, :]
assert np.all(np.isclose(r[:,2], r[0, 2])) # Polarity cannot change within a pulse
pulses.update({i: {'polarity': int(r[0, 2]), 'iab': r[:,3], 'vmn' : r[:,4]}}) # TODO: check how to generalize in case of multi-channel RX
return pulses
Olivier Kaufmann
committed
def _read_values(self, sampling_rate, append=False): # noqa
if not append:
self._clear_values()
_readings = []
else:
_readings = self.readings.tolist()
if not append or self._start_time is None:
self._start_time = datetime.datetime.utcnow()
while self.tx_sync.is_set():
lap = datetime.datetime.utcnow()
_readings.append([elapsed_seconds(self._start_time), self._pulse, self.tx.polarity, self.tx.current,
self.rx.voltage])
sleep_time = self._start_time + datetime.timedelta(seconds = sample * sampling_rate / 1000) - lap
time.sleep(np.min([0, np.abs(sleep_time.total_seconds())]))
if self.readings.shape == (0,) or len(self.readings[self.readings[:,2]==1, :]) < 1 or len(self.readings[self.readings[:,2]==-1, :]) < 1:
self.exec_logger.warning('Unable to compute sp: readings should at least contain one positive and one negative pulse')
return 0.
else:
n_pulses = int(np.max(self.readings[:, 1]))
polarity = np.array([np.mean(self.readings[self.readings[:, 1] == i, 2]) for i in range(n_pulses + 1)])
mean_vmn = []
mean_iab = []
for i in range(n_pulses + 1):
mean_vmn.append(np.mean(self.readings[self.readings[:, 1] == i, 4]))
mean_iab.append(np.mean(self.readings[self.readings[:, 1] == i, 3]))
mean_vmn = np.array(mean_vmn)
mean_iab = np.array(mean_iab)
sp = np.mean(mean_vmn[np.ix_(polarity==1)] - mean_vmn[np.ix_(polarity==-1)]) / 2
return sp
Olivier Kaufmann
committed
def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5,
vab_max=voltage_max, vmn_min=voltage_min):
Olivier Kaufmann
committed
"""Estimates best Tx voltage based on different strategies.
At first a half-cycle is made for a short duration with a fixed
known voltage. This gives us Iab and Rab. We also measure Vmn.
A constant c = vmn/iab is computed (only depends on geometric
factor and ground resistivity, that doesn't change during a
quadrupole). Then depending on the strategy, we compute which
vab to inject to reach the minimum/maximum Iab current or
min/max Vmn.
This function also compute the polarity on Vmn (on which pin
of the ADS1115 we need to measure Vmn to get the positive value).
Parameters
----------
best_tx_injtime : float, optional
Time in milliseconds for the half-cycle used to compute Rab.
strategy : str, optional
Either:
Olivier Kaufmann
committed
- vmax : compute Vab to reach a maximum Iab without exceeding vab_max
- vmin : compute Vab to reach at least vmn_min
Olivier Kaufmann
committed
- constant : apply given Vab
tx_volt : float, optional
Voltage to apply for guessing the best voltage. 5 V applied
by default. If strategy "constant" is chosen, constant voltage
to applied is "tx_volt".
Olivier Kaufmann
committed
vab_max : float, optional
Maximum injection voltage to apply to tx (used by all strategies)
vmn_min : float, optional
Minimum voltage target for rx (used by vmin strategy)
Olivier Kaufmann
committed
Returns
-------
vab : float
Proposed Vab according to the given strategy.
Olivier Kaufmann
committed
polarity:
Polarity of VMN relative to polarity of VAB
rab : float
Resistance between injection electrodes
Olivier Kaufmann
committed
"""
Olivier Kaufmann
committed
vab_max = np.abs(vab_max)
vmn_min = np.abs(vmn_min)
vab = np.min([np.abs(tx_volt), vab_max])
Olivier Kaufmann
committed
self.tx.polarity = 1
self.tx.turn_on()
if self.rx.sampling_rate*1000 > best_tx_injtime:
sampling_rate = best_tx_injtime
else:
sampling_rate = self.tx.sampling_rate
self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate)
vmn = np.mean(self.readings[:,4])
iab = np.mean(self.readings[:,3])
# if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
# then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
Olivier Kaufmann
committed
if strategy == 'vmax':
Olivier Kaufmann
committed
# implement different strategies
Olivier Kaufmann
committed
if vab < vab_max and iab < current_max :
vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
Olivier Kaufmann
committed
elif strategy == 'vmin':
if vab <= vab_max and iab < current_max:
Olivier Kaufmann
committed
vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
elif strategy != 'constant':
self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
else:
self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
Olivier Kaufmann
committed
self.tx.turn_off()
self.tx.polarity = 0
Olivier Kaufmann
committed
rab = (np.abs(vab) * 1000.) / iab
Olivier Kaufmann
committed
self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
Olivier Kaufmann
committed
if vmn < 0:
polarity = -1 # TODO: check if we really need to return polarity
else:
polarity = 1
Olivier Kaufmann
committed
return vab, polarity, rab
def vab_square_wave(self, vab, cycle_length, sampling_rate=None, cycles=3, polarity=1, append=False):
self._vab_pulses(vab, lengths, sampling_rate, append=append)
Olivier Kaufmann
committed
def _vab_pulse(self, vab, length, sampling_rate=None, polarity=None, append=False):
""" Gets VMN and IAB from a single voltage pulse
"""
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if polarity is not None and polarity != self.tx.polarity:
self.tx.polarity = polarity
if self.tx.voltage_adjustable:
self.tx.voltage = vab
else:
vab = self.tx.voltage
Olivier Kaufmann
committed
injection = Thread(target=self._inject, kwargs={'duration':length})
readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append})
# set gains automatically
self.tx.adc_gain_auto()
self.rx.adc_gain_auto()
readings.start()
injection.start()
readings.join()
injection.join()
def _vab_pulses(self, vab, lengths, sampling_rate, polarities=None, append=False):
Olivier Kaufmann
committed
n_pulses = len(lengths)
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if polarities is not None:
assert len(polarities)==n_pulses
else:
polarities = [-self.tx.polarity * np.heaviside(i % 2, -1.) for i in range(n_pulses)]
Olivier Kaufmann
committed
for i in range(n_pulses):
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
self._vab_pulse(self, length=lengths[i], sampling_rate=sampling_rate, polarity=polarities[i], append=True)
# _______________________________________________
def switch_dps(self, state='off'):
"""Switches DPS on or off.
Parameters
----------
state : str
'on', 'off'
"""
if state == 'on':
self.tx.turn_on()
else:
self.tx.turn_off()
if state != 'off':
self.exec_logger.warning(f'Unknown state {state} for DPS switching. switching off...')
def switch_mux(self, electrodes, roles=None, state='off'):
"""Switches on multiplexer relays for given quadrupole.
Parameters
----------
electrodes : list
List of integers representing the electrode ids.
roles : list, optional
List of roles of electrodes, optional
state : str, optional
Either 'on' or 'off'.
"""
if roles is None:
roles = ['A', 'B', 'M', 'N']
if len(electrodes) == len(roles):
# TODO: Check that we don't set incompatible roles to the same electrode
elec_dict = {i: [] for i in roles}
mux_workers = []
for idx, elec in enumerate(electrodes):
elec_dict[roles[idx]].append(elec)
try:
mux = self._cabling[(elec, roles[idx])][0]
if mux not in mux_workers:
mux_workers.append(mux)
except KeyError:
self.exec_logger.warning(f'({elec}, {roles[idx]}) is not in cabling. It will be ignored...')
mux_workers = list(set(mux_workers))
b = Barrier(len(mux_workers)+1)
self.mux_barrier = b
for idx, mux in enumerate(mux_workers):
# Create a new thread to perform some work
self.mux_boards[mux].barrier = b
mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs={'elec_dict': elec_dict})
mux_workers[idx].start()
self.mux_barrier.wait()
for mux_worker in mux_workers:
mux_worker.join()
else:
self.exec_logger.error(
'Unable to switch electrodes: number of electrodes and number of roles do not match!')
Olivier Kaufmann
committed
def test_mux(self, channel=None, activation_time=1.0):
"""Interactive method to test the multiplexer.
Parameters
----------
channel : tuple, optional
(electrode_nr, role) to test.
Olivier Kaufmann
committed
activation_time : float, optional
Time in seconds during which the relays are activated.
try:
electrodes = [int(channel[0])]
roles = [channel[1]]
except Exception as e:
self.exec_logger.error(f'Unable to parse channel: {e}')
return
Olivier Kaufmann
committed
self.switch_mux(electrodes,roles,state='on')
time.sleep(activation_time)
self.switch_mux(electrodes,roles, state='off')
self.exec_logger.info(f'Testing electrode {c[0]} with role {c[1]}.')
self.switch_mux(electrodes=[c[0]],roles=[c[1]],state='on')
Olivier Kaufmann
committed
time.sleep(activation_time)
self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='off')
Olivier Kaufmann
committed
self.exec_logger.info('Test finished.')
def reset_mux(self):
"""Switches off all multiplexer relays.
"""
self.exec_logger.debug('Resetting all mux boards ...')
for mux_id, mux in self.mux_boards.items():
self.exec_logger.debug(f'Resetting {mux_id}.')