Newer
Older
Olivier Kaufmann
committed
import importlib
import time
Olivier Kaufmann
committed
import numpy as np
from OhmPi.logging_setup import create_stdout_logger
from OhmPi.config import HARDWARE_CONFIG
from threading import Thread, Event
controller_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["controller"]["model"]}')
tx_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["tx"]["model"]}')
rx_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["rx"]["model"]}')
mux_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["mux"]["model"]}')
Olivier Kaufmann
committed
TX_CONFIG = tx_module.TX_CONFIG
RX_CONFIG = rx_module.RX_CONFIG
Olivier Kaufmann
committed
MUX_CONFIG = mux_module.MUX_CONFIG
Olivier Kaufmann
committed
Olivier Kaufmann
committed
current_max = np.min([TX_CONFIG['current_max'], MUX_CONFIG['current_max']])
voltage_max = np.min([TX_CONFIG['voltage_max'], MUX_CONFIG['voltage_max']])
voltage_min = RX_CONFIG['voltage_min']
def elapsed_seconds(start_time):
lap = datetime.datetime.utcnow() - start_time
return lap.total_seconds()
Olivier Kaufmann
committed
class OhmPiHardware:
Olivier Kaufmann
committed
def __init__(self, **kwargs):
self.exec_logger = kwargs.pop('exec_logger', None)
if self.exec_logger is None:
self.exec_logger = create_stdout_logger('exec_hw')
self.data_logger = kwargs.pop('exec_logger', None)
if self.data_logger is None:
self.data_logger = create_stdout_logger('data_hw')
self.soh_logger = kwargs.pop('soh_logger', None)
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_hw')
self.tx_sync = Event()
Olivier Kaufmann
committed
self.controller = kwargs.pop('controller',
controller_module.Controller(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger= self.soh_logger))
self.rx = kwargs.pop('rx', rx_module.Rx(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger))
self.tx = kwargs.pop('tx', tx_module.Tx(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger))
self.mux = kwargs.pop('mux', mux_module.Mux(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger))
Olivier Kaufmann
committed
self.readings = np.array([])
self.readings_window = (0.3, 1.0)
def _clear_values(self):
self.readings = np.array([])
def _inject(self, duration):
self.tx_sync.set()
self.tx_sync.clear()
Olivier Kaufmann
committed
def _read_values(self, sampling_rate, append=False): # noqa
if not append:
self._clear_values()
_readings = []
else:
_readings = self.readings.tolist()
self.tx_sync.wait()
start_time = datetime.datetime.utcnow()
while self.tx_sync.is_set():
lap = datetime.datetime.utcnow()
Olivier Kaufmann
committed
_readings.append([elapsed_seconds(start_time), self.tx.current, self.rx.voltage, self.tx.polarity])
sample+=1
sleep_time = start_time + datetime.timedelta(seconds = sample * sampling_rate / 1000) - lap
time.sleep(np.min([0, np.abs(sleep_time.total_seconds())]))
self.readings = np.array(_readings)
Olivier Kaufmann
committed
def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5,
vab_max=voltage_max, vmn_min=voltage_min):
Olivier Kaufmann
committed
"""Estimates best Tx voltage based on different strategies.
At first a half-cycle is made for a short duration with a fixed
known voltage. This gives us Iab and Rab. We also measure Vmn.
A constant c = vmn/iab is computed (only depends on geometric
factor and ground resistivity, that doesn't change during a
quadrupole). Then depending on the strategy, we compute which
vab to inject to reach the minimum/maximum Iab current or
min/max Vmn.
This function also compute the polarity on Vmn (on which pin
of the ADS1115 we need to measure Vmn to get the positive value).
Parameters
----------
best_tx_injtime : float, optional
Time in milliseconds for the half-cycle used to compute Rab.
strategy : str, optional
Either:
Olivier Kaufmann
committed
- vmax : compute Vab to reach a maximum Iab without exceeding vab_max
- vmin : compute Vab to reach at least vmn_min
Olivier Kaufmann
committed
- constant : apply given Vab
tx_volt : float, optional
Voltage to apply for guessing the best voltage. 5 V applied
by default. If strategy "constant" is chosen, constant voltage
to applied is "tx_volt".
Olivier Kaufmann
committed
vab_max : float, optional
Maximum injection voltage to apply to tx (used by all strategies)
vmn_min : float, optional
Minimum voltage target for rx (used by vmin strategy)
Olivier Kaufmann
committed
Returns
-------
vab : float
Proposed Vab according to the given strategy.
Olivier Kaufmann
committed
polarity:
Polarity of VMN relative to polarity of VAB
rab : float
Resistance between injection electrodes
Olivier Kaufmann
committed
"""
Olivier Kaufmann
committed
vab_max = np.abs(vab_max)
vmn_min = np.abs(vmn_min)
vab = np.min([np.abs(tx_volt), vab_max])
Olivier Kaufmann
committed
self.tx.polarity = 1
self.tx.turn_on()
if self.rx.sampling_rate*1000 > best_tx_injtime:
sampling_rate = best_tx_injtime
else:
sampling_rate = self.tx.sampling_rate
self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate)
vmn = np.mean(self.readings[:,2])
iab = np.mean(self.readings[:,1])
# if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
# then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
Olivier Kaufmann
committed
if strategy == 'vmax':
Olivier Kaufmann
committed
# implement different strategies
Olivier Kaufmann
committed
if vab < vab_max and iab < current_max :
vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
Olivier Kaufmann
committed
elif strategy == 'vmin':
if vab <= vab_max and iab < current_max:
Olivier Kaufmann
committed
vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
elif strategy != 'constant':
self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
else:
self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
Olivier Kaufmann
committed
self.tx.turn_off()
self.tx.polarity = 0
Olivier Kaufmann
committed
rab = (np.abs(vab) * 1000.) / iab
Olivier Kaufmann
committed
self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
Olivier Kaufmann
committed
if vmn < 0:
polarity = -1 # TODO: check if we really need to return polarity
else:
polarity = 1
Olivier Kaufmann
committed
return vab, polarity, rab
def vab_square_wave(self, vab, cycle_length, sampling_rate, cycles=3, polarity=1, append=False):
lengths = [cycle_length/2]*2*cycles # TODO: delete me
print(f'vab_square_wave lengths: {lengths}')
self._vab_pulses(vab, lengths, sampling_rate, append)
Olivier Kaufmann
committed
def _vab_pulse(self, vab, length, sampling_rate=None, polarity=None, append=False):
""" Gets VMN and IAB from a single voltage pulse
"""
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if polarity is not None and polarity != self.tx.polarity:
self.tx.polarity = polarity
self.tx.voltage = vab
injection = Thread(target=self._inject, kwargs={'duration':length})
readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append})
# set gains automatically
self.tx.adc_gain_auto()
self.rx.adc_gain_auto()
readings.start()
injection.start()
readings.join()
injection.join()
def _vab_pulses(self, vab, lengths, sampling_rate, polarities=None, append=False):
Olivier Kaufmann
committed
n_pulses = len(lengths)
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if polarities is not None:
assert len(polarities)==n_pulses
else:
polarities = [-self.tx.polarity * np.heaviside(i % 2, -1.) for i in range(n_pulses)]
if not append:
self._clear_values()
print(f'Polarities: {polarities}, sampling_rate: {sampling_rate}') # TODO: delete me
Olivier Kaufmann
committed
for i in range(n_pulses):
self._vab_pulse(self, length=lengths[i], sampling_rate=sampling_rate, polarity=polarities[i], append=True)