Failed to fetch fork details. Try again later.
-
remi.clement authored64597f5f
Forked from
reversaal / OhmPi
Source project has a limited visibility.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# definition of hardware level functions
import numpy as np
import board # noqa
import busio # noqa
import adafruit_tca9548a # noqa
import adafruit_ads1x15.ads1115 as ads # noqa
from adafruit_ads1x15.analog_in import AnalogIn # noqa
from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa
from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa
import digitalio # noqa
from digitalio import Direction # noqa
from gpiozero import CPUTemperature # noqa
import minimalmodbus # noqa
import time
# global variable
i2c = busio.I2C(board.SCL, board.SDA)
from config import OHMPI_CONFIG
class Alimentation():
def __init__(self, address=0x20, tx_voltage=12):
self.mcp = MCP23017(i2c, address=address)
self.tx_voltage = tx_voltage
self.polarity = True
self.on = False
self.pinA = 0
self.pinB = 1
# setup DPS
self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal)
self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc
self.DPS.serial.bytesize = 8 #
self.DPS.serial.timeout = 1 # greater than 0.5 for it to work
self.DPS.debug = False #
self.DPS.serial.parity = 'N' # No parity
self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode
self.DPS.write_register(0x0001, 40, 0) # max current allowed (36 mA for relays)
# (last number) 0 is for mA, 3 is for A
def turn_on(self):
if self.on is False:
self.DPS.write_register(0x09, 1) # DPS5005 on
self.on = True
def turn_off(self):
self.DPS.write_register(0x09, 0) # DPS5005 off
self.on = False
def start_injection(self, polarity=True):
# injection courant and measure (TODO check if it works, otherwise back in run_measurement())
self.polarity = polarity
if self.polarity:
self.pin0 = self.mcp.get_pin(self.pinA)
self.pin0.direction = Direction.OUTPUT
self.pin0.value = True
self.pin1 = self.mcp.get_pin(self.pinB)
self.pin1.direction = Direction.OUTPUT
self.pin1.value = False
else:
self.pin0 = self.mcp.get_pin(self.pinA)
self.pin0.direction = Direction.OUTPUT
self.pin0.value = False
self.pin1 = self.mcp.get_pin(self.pinB)
self.pin1.direction = Direction.OUTPUT
self.pin1.value = True
def stop_injection(self):
self.pin0 = self.mcp.get_pin(self.pinA)
self.pin0.direction = Direction.OUTPUT
self.pin0.value = False
self.pin1 = self.mcp.get_pin(self.pinB)
self.pin1.direction = Direction.OUTPUT
self.pin1.value = False
def set_polarity(self, polarity=True):
self.polarity = polarity
def set_tx_voltage(self, tx_voltage=12):
if tx_voltage >= 0:
self.tx_voltage = tx_voltage
# set voltage for test
self.DPS.write_register(0x0000, tx_voltage, 2)
self.DPS.write_register(0x09, 1) # DPS5005 on
else:
raise ValueError('Voltage needs to be >= 0 V')
class ADS(): # analog to digital converter ADS1115
def __init__(self, address=0x48, gain=2/3, data_rate=820, mode=1):
self.ads = ads.ADS1115(i2c, gain=gain, data_rate=data_rate, address=address, mode=mode)
self.gain = gain
self.data_rate = data_rate
self.mode = mode
self.pins = {
0: self.ads.P0,
1: self.ads.P1,
2: self.ads.P2,
3: self.ads.P3,
}
self.vmin = 0.01 # volts
self.vmax = 4.5 # volts
def read_single(self, pin=0):
return AnalogIn(self.ads, self.pins[pin]).voltage
def read_diff(self, pins='01'):
if pins == '01':
return AnalogIn(self.ads, self.ads.P0, self.ads.P1).voltage
elif pins == '23':
return AnalogIn(self.ads, self.ads.P2, self.ads.P3).voltage
def set_gain(self, gain=2/3):
self.gain = gain
# TODO maybe there is already a set_gain() function in the library? check that
self.ads = ads.ADS1115(
i2c, gain=self.gain, data_rate=self.data_rate,
address=self.address, mode=self.mode)
def get_best_gain(self, channel=0):
"""Automatically sets the gain on a channel
Parameters
----------
channel : ads.ADS1x15
Instance of ADS where voltage is measured.
Returns
-------
gain : float
Gain to be applied on ADS1115.
"""
voltage = self.read_singl(channel)
gain = 2 / 3
if (abs(voltage) < 2.040) and (abs(voltage) >= 1.023):
gain = 2
elif (abs(voltage) < 1.023) and (abs(voltage) >= 0.508):
gain = 4
elif (abs(voltage) < 0.508) and (abs(voltage) >= 0.250):
gain = 8
elif abs(voltage) < 0.256:
gain = 16
#self.exec_logger.debug(f'Setting gain to {gain}')
return gain
def set_best_gain(self, channel=0):
gain = self.get_best_gain(channel)
self.set_gain(gain)
class Voltage(ADS): # for MN
def __init__(self):
super().__init__()
def read(self, pin=0):
return self.read_single(self, pin=pin)
def read_all(self, pins=[0, 2]):
return [self.read_single(pin) for pin in pins]
class Current(ADS): # for AB
def __init__(self, address=0x48, gain=2/3, data_rate=820, mode=1, r_shunt=OHMPI_CONFIG['R_shunt']):
super().__init__(address=address, gain=gain, data_rate=data_rate, mode=mode)
self.r_shunt = r_shunt
self.imin = self.vmin / (self.r_shunt * 50)
self.imax = self.vmax / (self.r_shunt * 50)
def read(self):
U = self.read_single(pin=0)
return U / 50 / self.r_shunt
class Multiplexer():
def __init__(self, addresses={
'A': 0x70,
'B': 0x71,
'M': 0x72,
'N': 0x73
},
nelec=64):
#OHMPI_CONFIG['board_addresses']
self.addresses = addresses
self.nelec = nelec # max number of electrodes per board
def switch_one(self, elec, role, state='off'):
self.tca = adafruit_tca9548a.TCA9548A(i2c, self.addresses[role])
# find I2C address of the electrode and corresponding relay
# considering that one MCP23017 can cover 16 electrodes
i2c_address = 7 - (elec - 1) // 16 # quotient without rest of the division
relay = (elec-1) - ((elec-1) // 16) * 16
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
mcp = MCP23017(self.tca[i2c_address])
mcp.get_pin(relay - 1).direction = digitalio.Direction.OUTPUT
if state == 'on':
mcp.get_pin(relay - 1).value = True
else:
mcp.get_pin(relay - 1).value = False
#exec_logger.debug(f'Switching relay {relay} '
# f'({str(hex(self.addresses[role]))}) on:{on} for electrode {elec}')
else:
raise ValueError('No I2C address found for the electrode'
' {:d} on board {:s}'.format(elec, self.addresses[role]))
#exec_logger.warning(f'Unable to address electrode nr {elec}')
def switch(self, elecdic={}, state='on'):
"""Switch a given list of electrodes with different roles.
Electrodes with a value of 0 will be ignored.
Parameters
----------
elecdic : dictionary, optional
Dictionnary of the form: role: [list of electrodes].
state : str, optional
Either 'on' or 'off'.
"""
# check to prevent A == B (SHORT-CIRCUIT)
if 'A' in elecdic and 'B' in elecdic:
out = np.in1d(elecdic['A'], elecdic['B'])
if out.any():
raise ValueError('Some electrodes have A == B -> SHORT-CIRCUIT')
return
# check none of M and N are the same A or B
# as to prevent burning the MN part which cannot take
# the full voltage of the DPS
if 'A' in elecdic and 'B' in elecdic and 'M' in elecdic and 'N' in elecdic:
if (np.in1d(elecdic['M'], elecdic['A']).any()
or np.in1d(elecdic['M'], elecdic['B']).any()
or np.in1d(elecdic['N'], elecdic['A']).any()
or np.in1d(elecdic['N'], elecdic['B']).any()):
raise ValueError('Some electrodes M and N are on A and B -> cannot be with DPS')
return
# if all ok, then switch the electrodes
for role in elecdic:
for elec in elecdic[role]:
if elec > 0:
self.switch_one(elec, role, state)
def reset(self):
for role in self.addresses:
for elec in range(self.nelec):
self.switch_one(elec, role, 'off')
def test(self, role, activation_time=1):
"""Interactive method to test the multiplexer.
Parameters
----------
activation_time : float, optional
Time in seconds during which the relays are activated.
address : hex, optional
Address of the multiplexer board to test (e.g. 0x70, 0x71, ...).
"""
self.reset()
# ask use some details on how to proceed
a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!')
if a == '1':
print('run channel by channel test')
electrode = int(input('Choose your electrode number (integer):'))
electrodes = [electrode]
elif a == '2':
electrodes = range(1, 65)
else:
print('Wrong choice !')
return
# run the test
for elec in electrodes:
self.switch_one(elec, role, 'on')
print('electrode:', elec, ' activated...', end='', flush=True)
time.sleep(activation_time)
self.switch_one(elec, role, 'off')
print(' deactivated')
time.sleep(activation_time)
print('Test finished.')