-
Olivier Kaufmann authored4e9c3353
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import importlib
import datetime
import time
import numpy as np
from OhmPi.logging_setup import create_stdout_logger
from OhmPi.config import HARDWARE_CONFIG
from threading import Thread, Event
def _import_hardware(component):
    """Import the OhmPi hardware module configured for ``component``.

    The module name is read from ``HARDWARE_CONFIG[component]["model"]`` and
    resolved inside the ``OhmPi.hardware`` package.
    """
    model = HARDWARE_CONFIG[component]["model"]
    return importlib.import_module(f'OhmPi.hardware.{model}')


# One driver module per hardware component, selected by HARDWARE_CONFIG.
controller_module = _import_hardware('controller')
tx_module = _import_hardware('tx')
rx_module = _import_hardware('rx')
mux_module = _import_hardware('mux')

# Per-component configuration dictionaries exposed by the driver modules.
TX_CONFIG = tx_module.TX_CONFIG
RX_CONFIG = rx_module.RX_CONFIG
MUX_CONFIG = mux_module.MUX_CONFIG

# Operating limits: the injection path is bounded by the most restrictive of
# the TX and MUX limits; the lower voltage bound comes from the RX alone.
current_max = np.min([cfg['current_max'] for cfg in (TX_CONFIG, MUX_CONFIG)])
voltage_max = np.min([cfg['voltage_max'] for cfg in (TX_CONFIG, MUX_CONFIG)])
voltage_min = RX_CONFIG['voltage_min']
def elapsed_seconds(start_time):
    """Return the time elapsed since ``start_time``, in seconds.

    Parameters
    ----------
    start_time : datetime.datetime
        Naive UTC timestamp, as returned by ``datetime.datetime.utcnow()``.

    Returns
    -------
    float
        Elapsed seconds, truncated to millisecond resolution.
    """
    lap = datetime.datetime.utcnow() - start_time
    # timedelta.seconds only holds the sub-day remainder, so the whole days
    # must be added explicitly or intervals of 24 h and more wrap around.
    return lap.days * 86400 + lap.seconds + 0.001 * (lap.microseconds // 1000)
class OhmPiHardware:
    """Bundle the controller, TX, RX and MUX drivers and run voltage pulses."""

    def __init__(self, **kwargs):
        """Set up loggers, the TX synchronisation event and the hardware drivers.

        Parameters
        ----------
        exec_logger, data_logger, soh_logger : optional
            Loggers to use; a stdout logger is created for each one that is
            missing or None.
        controller, rx, tx, mux : optional
            Ready-made driver instances. When a key is absent, the matching
            default driver from the configured hardware module is built with
            the three loggers. Defaults are only instantiated when needed.
        """
        self.exec_logger = kwargs.pop('exec_logger', None)
        if self.exec_logger is None:
            self.exec_logger = create_stdout_logger('exec')
        # Read the 'data_logger' key: popping 'exec_logger' a second time
        # would always yield None and discard the caller's data logger.
        self.data_logger = kwargs.pop('data_logger', None)
        if self.data_logger is None:
            self.data_logger = create_stdout_logger('data')
        self.soh_logger = kwargs.pop('soh_logger', None)
        if self.soh_logger is None:
            self.soh_logger = create_stdout_logger('soh')

        # Set while the TX is injecting; the reader thread samples on it.
        self.tx_sync = Event()

        loggers = {'exec_logger': self.exec_logger,
                   'data_logger': self.data_logger,
                   'soh_logger': self.soh_logger}
        # Build a default driver only when the caller did not supply one, so
        # that no hardware object is created just to be thrown away.
        if 'controller' in kwargs:
            self.controller = kwargs.pop('controller')
        else:
            self.controller = controller_module.Controller(**loggers)
        if 'rx' in kwargs:
            self.rx = kwargs.pop('rx')
        else:
            self.rx = rx_module.Rx(**loggers)
        if 'tx' in kwargs:
            self.tx = kwargs.pop('tx')
        else:
            self.tx = tx_module.Tx(**loggers)
        if 'mux' in kwargs:
            self.mux = kwargs.pop('mux')
        else:
            self.mux = mux_module.Mux(**loggers)

    def _vab_pulse(self, vab, length, sampling_rate=None, polarity=None):
        """Inject a single voltage pulse and return IAB and VMN.

        An injection thread and a reading thread run concurrently; the reader
        samples ``tx.current`` and ``rx.voltage`` for as long as the injection
        thread keeps ``self.tx_sync`` set.

        Parameters
        ----------
        vab : float
            Voltage assigned to ``self.tx.voltage`` before the pulse.
        length : float
            Pulse length, passed unchanged to ``tx.voltage_pulse(length=...)``.
        sampling_rate : float, optional
            Defaults to ``RX_CONFIG['sampling_rate']``. The value divided by
            1000 is used as the target interval between samples in seconds.
        polarity : optional
            When given and different from ``self.tx.polarity``, it is applied
            to the TX before the pulse.

        Returns
        -------
        iab, vmn
            ``self.tx.current`` and ``self.rx.voltage``, in that order, read
            once both threads have finished.
        """
        if sampling_rate is None:
            sampling_rate = RX_CONFIG['sampling_rate']
        if polarity is not None and polarity != self.tx.polarity:
            self.tx.polarity = polarity
        self.tx.voltage = vab

        # Shared with the reader thread: rows of [elapsed_s, iab, vmn].
        samples = []

        def inject(duration):
            self.tx_sync.set()
            try:
                self.tx.voltage_pulse(length=duration)
            finally:
                # Always release the reader, even if the pulse fails,
                # otherwise its sampling loop would never end.
                self.tx_sync.clear()

        def read_values(period):
            self.tx_sync.wait()
            start_time = datetime.datetime.utcnow()
            while self.tx_sync.is_set():
                lap = datetime.datetime.utcnow()
                samples.append([elapsed_seconds(start_time), self.tx.current, self.rx.voltage])
                sleep_time = period / 1000. - elapsed_seconds(lap)
                # A sample that took longer than the period must not lead
                # to a negative sleep (time.sleep raises ValueError).
                time.sleep(max(0., sleep_time))

        injection = Thread(target=inject, kwargs={'duration': length})
        readings = Thread(target=read_values, kwargs={'period': sampling_rate})
        # set gains automatically
        self.tx.adc_gain_auto()
        self.rx.adc_gain_auto()
        # Start the reader first so it is already waiting on tx_sync.
        readings.start()
        injection.start()
        readings.join()
        injection.join()
        data = np.array(samples)
        self.exec_logger.debug(f'Pulse readings [elapsed s, IAB, VMN]: {data}')
        # NOTE(review): these values are read after the pulse has ended, not
        # taken from the samples collected during it - confirm this is intended.
        iab = self.tx.current  # measure current
        vmn = self.rx.voltage
        return iab, vmn

    def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5,
                         vab_max=voltage_max, vmn_min=voltage_min):
        """Estimates best Tx voltage based on different strategies.

        At first a half-cycle is made for a short duration with a fixed
        known voltage. This gives us Iab and Rab. We also measure Vmn.
        A constant c = vmn/iab is computed (only depends on geometric
        factor and ground resistivity, that doesn't change during a
        quadrupole). Then depending on the strategy, we compute which
        vab to inject to reach the minimum/maximum Iab current or
        min/max Vmn.
        This function also computes the polarity on Vmn (on which pin
        of the ADS1115 we need to measure Vmn to get the positive value).

        Parameters
        ----------
        best_tx_injtime : float, optional
            Duration of the half-cycle used to compute Rab, passed unchanged
            as the pulse length. NOTE(review): previously documented as
            milliseconds - confirm against the unit expected by
            ``tx.voltage_pulse``.
        strategy : str, optional
            Either:
            - vmax : compute Vab to reach a maximum Iab without exceeding vab_max
            - vmin : compute Vab to reach at least vmn_min
            - constant : apply given Vab
            Any other value logs a warning and keeps the test voltage.
        tx_volt : float, optional
            Voltage to apply for guessing the best voltage. 5 V applied
            by default. If strategy "constant" is chosen, constant voltage
            to apply is "tx_volt".
        vab_max : float, optional
            Maximum injection voltage to apply to tx (used by all strategies)
        vmn_min : float, optional
            Minimum voltage target for rx (used by vmin strategy)

        Returns
        -------
        vab : float
            Proposed Vab according to the given strategy.
        polarity : int
            Polarity of VMN relative to polarity of VAB (-1 when the measured
            VMN is negative, 1 otherwise).
        rab : float
            Resistance between injection electrodes, computed from the test
            pulse as ``vab * 1000 / iab``; ``np.inf`` when no current was
            measured.
        """
        vab_max = np.abs(vab_max)
        vmn_min = np.abs(vmn_min)
        vab = np.min([np.abs(tx_volt), vab_max])

        self.tx.polarity = 1
        self.tx.turn_on()
        try:
            # _vab_pulse returns (iab, vmn), in that order.
            iab, vmn = self._vab_pulse(vab=vab, length=best_tx_injtime)
        finally:
            # Never leave the transmitter on, even if the pulse failed.
            self.tx.turn_off()
            self.tx.polarity = 0

        # Rab must use the voltage that produced iab, i.e. the test voltage,
        # not the voltage proposed by the strategy below.
        rab = (np.abs(vab) * 1000.) / iab if iab != 0 else np.inf

        # Headroom factors; an infinite factor means "not limiting" and
        # avoids dividing by a zero reading.
        current_factor = 0.9 * current_max / iab if iab != 0 else np.inf
        vmn_factor = vmn_min / np.abs(vmn) if vmn != 0 else np.inf

        # if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
        # then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
        if strategy == 'vmax':
            if 0 < vab < vab_max and iab < current_max:
                # TODO: check if setting at 90% of max as a safety margin is OK
                vab = vab * np.min([0.9 * vab_max / vab, current_factor])
            self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
        elif strategy == 'vmin':
            if 0 < vab <= vab_max and iab < current_max:
                # TODO: check if setting at 90% of max as a safety margin is OK
                vab = vab * np.min([0.9 * vab_max / vab, vmn_factor, current_factor])
        elif strategy != 'constant':
            self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
        else:
            self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')

        self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
        # TODO: check if we really need to return polarity
        polarity = -1 if vmn < 0 else 1
        return vab, polarity, rab