Newer
Older
Olivier Kaufmann
committed
acquired_data = {
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
'R [ohm]': np.abs(np.random.randn(1))
}
# switch mux off
self.switch_mux_off(quad)
# add command_id in dataset
acquired_data.update({'cmd_id': cmd_id})
# log data to the data logger
Olivier Kaufmann
committed
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
# save data and print in a text file
self.append_and_save(filename, acquired_data)
self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}')
self.status = 'idle'
def run_sequence_async(self, cmd_id=None, **kwargs):
"""Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
Additional arguments are passed to run_measurement().
Parameters
----------
cmd_id:
"""
def func():
self.run_sequence(**kwargs)
self.thread = threading.Thread(target=func)
self.thread.start()
self.status = 'idle'
Olivier Kaufmann
committed
def rs_check(self, tx_volt=12, cmd_id=None):
Olivier Kaufmann
committed
"""Checks contact resistances"""
# we only check the electrodes which are in the sequence (not all might be connected)
quads = np.array([[1, 2, 1, 2]], dtype=np.uint32)
Olivier Kaufmann
committed
else:
elec = np.sort(np.unique(self.sequence.flatten())) # assumed order
quads = np.vstack([
elec[:-1],
elec[1:],
elec[:-1],
elec[1:],
]).T
if self.idps:
quads[:, 2:] = 0 # we don't open Vmn to prevent burning the MN part
# as it has a smaller range of accepted voltage
export_path_rs = self.settings['export_path'].replace('.csv', '') \
+ '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '_rs.csv'

Guillaume Blanchy
committed
Olivier Kaufmann
committed
# self.run = True
if self.on_pi:
# make sure all mux are off to start with
self.reset_mux()
# measure all quad of the RS sequence
for i in range(0, quads.shape[0]):
quad = quads[i, :] # quadrupole
self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c)
d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=0.2, tx_volt=tx_volt, autogain=False)
Olivier Kaufmann
committed
voltage = tx_volt * 1000. # imposed voltage on dps5005
else:
voltage = d['Vmn [mV]']
current = d['I [mA]']
# compute resistance measured (= contact resistance)
Olivier Kaufmann
committed
resist = abs(voltage / current) / 1000.
# print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format(
msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \
f'V: {voltage :>10.3f} mV, ' \
f'R: {resist :>10.3f} kOhm'
self.exec_logger.debug(msg)
# if contact resistance = 0 -> we have a short circuit!!
Olivier Kaufmann
committed
msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm'
self.exec_logger.warning(msg)
Olivier Kaufmann
committed
# save data in a text file
self.append_and_save(export_path_rs, {
'A': quad[0],
'B': quad[1],
'RS [kOhm]': resist,
})
# close mux path and put pin back to GND
self.switch_mux_off(quad)
else:
pass
self.status = 'idle'
Olivier Kaufmann
committed
#
# # TODO if interrupted, we would need to restore the values
# # TODO or we offer the possibility in 'run_measurement' to have rs_check each time?
Olivier Kaufmann
committed
def set_sequence(self, sequence=None, cmd_id=None):
self.sequence = np.array(sequence).astype(int)
# self.sequence = np.loadtxt(StringIO(sequence)).astype('uint32')
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to set sequence: {e}')
status = False
def stop(self, **kwargs):
warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning)
self.interrupt(**kwargs)
def _switch_mux(self, electrode_nr, state, role):
"""Selects the right channel for the multiplexer cascade for a given electrode.
Olivier Kaufmann
committed
Parameters
----------
electrode_nr : int
Electrode index to be switched on or off.
state : str
Either 'on' or 'off'.
role : str
Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board.
"""
if not self.use_mux or not self.on_pi:
if not self.on_pi:
self.exec_logger.warning('Cannot reset mux while in simulation mode...')
Olivier Kaufmann
committed
self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
' Set use_mux to True to use the multiplexer...')
elif self.sequence is None:
self.exec_logger.warning('Unable to switch MUX without a sequence')
else:
# choose with MUX board
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role])
Olivier Kaufmann
committed
# find I2C address of the electrode and corresponding relay
# considering that one MCP23017 can cover 16 electrodes
i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1
Olivier Kaufmann
committed
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
mcp2 = MCP23017(tca[i2c_address])
mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT
Olivier Kaufmann
committed
if state == 'on':
mcp2.get_pin(relay_nr - 1).value = True
else:
mcp2.get_pin(relay_nr - 1).value = False
Olivier Kaufmann
committed
self.exec_logger.debug(f'Switching relay {relay_nr} '
f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}')
else:
self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
Olivier Kaufmann
committed
Olivier Kaufmann
committed
def switch_mux_on(self, quadrupole, cmd_id=None):
Olivier Kaufmann
committed
"""Switches on multiplexer relays for given quadrupole.
Olivier Kaufmann
committed
Olivier Kaufmann
committed
Parameters
----------
Olivier Kaufmann
committed
quadrupole : list of 4 int
List of 4 integers representing the electrode numbers.
Olivier Kaufmann
committed
roles = ['A', 'B', 'M', 'N']
# another check to be sure A != B
if quadrupole[0] != quadrupole[1]:
for i in range(0, 4):
if quadrupole[i] > 0:
self._switch_mux(quadrupole[i], 'on', roles[i])
else:
self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!')
Olivier Kaufmann
committed
def switch_mux_off(self, quadrupole, cmd_id=None):
Olivier Kaufmann
committed
"""Switches off multiplexer relays for given quadrupole.
Olivier Kaufmann
committed
quadrupole : list of 4 int
List of 4 integers representing the electrode numbers.
Olivier Kaufmann
committed
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
if quadrupole[i] > 0:
self._switch_mux(quadrupole[i], 'off', roles[i])
Olivier Kaufmann
committed

Guillaume Blanchy
committed
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
def test_mux(self, activation_time=1.0, address=0x70):
"""Interactive method to test the multiplexer.
Parameters
----------
activation_time : float, optional
Time in seconds during which the relays are activated.
address : hex, optional
Address of the multiplexer board to test (e.g. 0x70, 0x71, ...).
"""
self.use_mux = True
self.reset_mux()
# choose with MUX board
tca = adafruit_tca9548a.TCA9548A(self.i2c, address)
# ask use some details on how to proceed
a = input(' if vous want try 1 channel choose 1, if you want try all channel choose 2!')
if a == '1':
print("run channel by channel test")
electrode = int(input('Choose your electrode number (integer):'))
electrodes = [electrode]
elif a == '2':
electrodes = range(1, 65)
else:
print ("Wrong choice !")
return
# run the test
for electrode_nr in electrodes:
# find I2C address of the electrode and corresponding relay
# considering that one MCP23017 can cover 16 electrodes
i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
mcp2 = MCP23017(tca[i2c_address])
mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT
# activate relay for given time
mcp2.get_pin(relay_nr - 1).value = True
print('electrode:', electrode_nr, ' activated...', end='', flush=True)
time.sleep(activation_time)
mcp2.get_pin(relay_nr - 1).value = False
print(' deactivated' )
time.sleep(activation_time)
print('Test finished.')
Olivier Kaufmann
committed
def reset_mux(self, cmd_id=None):
Olivier Kaufmann
committed
"""Switches off all multiplexer relays."""
if self.on_pi and self.use_mux:
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
for j in range(1, self.max_elec + 1):
self._switch_mux(j, 'off', roles[i])
self.exec_logger.debug('All MUX switched off.')
elif not self.on_pi:
self.exec_logger.warning('Cannot reset mux while in simulation mode...')
else:
self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
' Set use_mux to True to use the multiplexer...')
Olivier Kaufmann
committed
def _update_acquisition_settings(self, config):
warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning)
def update_settings(self, settings: str, cmd_id=None):
Olivier Kaufmann
committed
"""Updates acquisition settings from a json file or dictionary.
Parameters can be:
- nb_electrodes (number of electrode used, if 4, no MUX needed)
- injection_duration (in seconds)
- nb_meas (total number of times the sequence will be run)
- sequence_delay (delay in second between each sequence run)
- nb_stack (number of stack for each quadrupole measurement)
- export_path (path where to export the data, timestamp will be added to filename)
Olivier Kaufmann
committed
Parameters
----------
Olivier Kaufmann
committed
Path to the .json settings file or dictionary of settings.
"""
status = False
Olivier Kaufmann
committed
try:
if isinstance(settings, dict):
self.settings.update(settings)
Olivier Kaufmann
committed
else:
Olivier Kaufmann
committed
dic = json.load(json_file)
self.settings.update(dic)
self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings))
status = True
Olivier Kaufmann
committed
self.exec_logger.warning('Unable to update settings.')
status = False
Olivier Kaufmann
committed
else:
Olivier Kaufmann
committed
self.exec_logger.warning('Settings are missing...')
return status
Olivier Kaufmann
committed
# Properties
@property
def sequence(self):
"""Gets sequence"""
if self._sequence is not None:
assert isinstance(self._sequence, np.ndarray)
return self._sequence
Olivier Kaufmann
committed
Olivier Kaufmann
committed
@sequence.setter
def sequence(self, sequence):
"""Sets sequence"""
if sequence is not None:
assert isinstance(sequence, np.ndarray)
self.use_mux = True
else:
self.use_mux = False
self._sequence = sequence
Olivier Kaufmann
committed
print(colored(r' ________________________________' + '\n' +
r'| _ | | | || \/ || ___ \_ _|' + '\n' +
r'| | | | |_| || . . || |_/ / | |' + '\n' +
r'| | | | _ || |\/| || __/ | |' + '\n' +
r'\ \_/ / | | || | | || | _| |_' + '\n' +
r' \___/\_| |_/\_| |_/\_| \___/ ', 'red'))
print('Version:', VERSION)
Olivier Kaufmann
committed
platform, on_pi = get_platform()
Olivier Kaufmann
committed
print(colored(f'\u2611 Running on {platform} platform', 'green'))
Olivier Kaufmann
committed
# TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...)
# and emit a warning otherwise
if not arm64_imports:
print(colored(f'Warning: Required packages are missing.\n'
f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow'))
Olivier Kaufmann
committed
print(colored(f'\u26A0 Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow'))
Olivier Kaufmann
committed
print(f'local date and time : {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
# for testing
if __name__ == "__main__":
ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
Olivier Kaufmann
committed
if ohmpi.controller is not None:
ohmpi.controller.loop_forever()