Newer
Older
else: # for testing, generate random data
d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3],
'R [ohm]': np.abs(np.random.randn(1)).tolist()}
dd.update({'A': str(dd['A'])})
dd.update({'B': str(dd['B'])})
dd.update({'M': str(dd['M'])})
dd.update({'N': str(dd['N'])})
# round float to 2 decimal
for key in dd.keys():
if isinstance(dd[key], float):
dd[key] = np.round(dd[key], 3)
dd['cmd_id'] = str(cmd_id)
self.data_logger.info(dd)
self.pin2.value = False # DSP + off
self.pin3.value = False # DSP - off
def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs):
    """Runs multiple sequences in a separate thread for monitoring mode.
    Can be stopped by 'OhmPi.interrupt()'.
    Additional arguments are passed to run_measurement().

    Parameters
    ----------
    cmd_id : str, optional
        Unique command identifier, forwarded to every run_sequence() call.
    sequence_delay : int, optional
        Number of seconds between the start of two consecutive sequences.
        Read from settings['sequence_delay'] when not given.
    nb_meas : int, optional
        Number of times the sequence must be repeated.
        Read from settings['nb_meas'] when not given.
    kwargs : dict, optional
        See help(k.run_measurement) for more info.
    """
    if sequence_delay is None:
        sequence_delay = self.settings['sequence_delay']
    sequence_delay = int(sequence_delay)
    if nb_meas is None:
        nb_meas = self.settings['nb_meas']
    # cast like sequence_delay so a value coming from a settings file works with range()
    nb_meas = int(nb_meas)

    self.status = 'running'
    self.exec_logger.debug(f'Status: {self.status}')
    self.exec_logger.debug(f'Measuring sequence: {self.sequence}')

    def func():
        for repetition in range(nb_meas):  # for time-lapse monitoring
            if self.status == 'stopping':
                self.exec_logger.warning('Data acquisition interrupted')
                break
            t0 = time.time()
            self.run_sequence(cmd_id=cmd_id, **kwargs)

            # wait for the next measurement (time-lapse); nothing to wait for
            # once the last repetition is done
            if repetition < nb_meas - 1:
                time.sleep(max(0, sequence_delay - (time.time() - t0)))
        self.status = 'idle'

    self.thread = threading.Thread(target=func)
    self.thread.start()
Olivier Kaufmann
committed
def run_sequence(self, cmd_id=None, **kwargs):
    """Runs sequence synchronously (=blocking on main thread).
    Additional arguments are passed to run_measurement().

    Each quadrupole of self.sequence is measured in turn; when no sequence is
    set, a single [0, 0, 0, 0] quadrupole is used. Every result is logged to
    the data logger and appended to a time-stamped copy of
    settings['export_path']. The loop stops early when self.status is
    'stopping', and the status is set back to 'idle' on exit.

    Parameters
    ----------
    cmd_id : str, optional
        Unique command identifier, stored with every record. In simulation
        mode a random identifier is generated only when none is given.
    """
    self.status = 'running'
    self.exec_logger.debug(f'Status: {self.status}')
    self.exec_logger.debug(f'Measuring sequence: {self.sequence}')

    # set both DSP control pins to True before measuring
    # NOTE(review): the pins are not set back to False in this method - confirm
    self.pin2 = self.mcp.get_pin(2)  # DSP +
    self.pin2.direction = Direction.OUTPUT
    self.pin2.value = True
    self.pin3 = self.mcp.get_pin(3)  # DSP -
    self.pin3.direction = Direction.OUTPUT
    self.pin3.value = True

    # create filename with timestamp
    filename = self.settings["export_path"].replace(
        '.csv', f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
    self.exec_logger.debug(f'Saving to {filename}')

    # make sure all multiplexer are off
    self.reset_mux()

    # measure all quadrupole of the sequence
    n = 1 if self.sequence is None else self.sequence.shape[0]
    for i in range(n):
        if self.sequence is None:
            quad = np.array([0, 0, 0, 0])
        else:
            quad = self.sequence[i, :]  # quadrupole
        if self.status == 'stopping':
            break

        # call the switch_mux function to switch to the right electrodes
        self.switch_mux_on(quad)

        # run a measurement
        if self.on_pi:
            acquired_data = self.run_measurement(quad, **kwargs)
            record_id = cmd_id
        else:  # for testing, generate random data
            sum_vmn = np.random.rand(1)[0] * 1000.
            sum_i = np.random.rand(1)[0] * 100.
            # keep the caller's identifier; only invent one when none was given
            record_id = cmd_id if cmd_id is not None else np.random.randint(1000)
            acquired_data = {
                "time": datetime.now().isoformat(),
                "A": quad[0],
                "B": quad[1],
                "M": quad[2],
                "N": quad[3],
                "inj time [ms]": self.settings['injection_duration'] * 1000.,
                "Vmn [mV]": sum_vmn,
                "I [mA]": sum_i,
                "R [ohm]": sum_vmn / sum_i,
                "Ps [mV]": np.random.randn(1)[0] * 100.,
                "nbStack": self.settings['nb_stack'],
                "Tx [V]": np.random.randn(1)[0] * 5.,
                "CPU temp [degC]": np.random.randn(1)[0] * 50.,
                "Nb samples [-]": self.nb_samples,
            }

        # log data to the data logger (done before cmd_id is added to the record)
        self.data_logger.info(acquired_data)

        # switch mux off
        self.switch_mux_off(quad)

        # add command_id in dataset
        acquired_data.update({'cmd_id': record_id})

        # save data and print in a text file
        self.append_and_save(filename, acquired_data)
        self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}')

    self.status = 'idle'
def run_sequence_async(self, cmd_id=None, **kwargs):
    """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
    Additional arguments are passed to run_measurement().

    The status is set to 'running' before the worker thread is started and is
    set to 'idle' by the worker itself once run_sequence() has returned or
    raised, so the status is no longer reset while the sequence is running.

    Parameters
    ----------
    cmd_id : str, optional
        Unique command identifier, forwarded to run_sequence().
    """
    def func():
        try:
            self.run_sequence(cmd_id=cmd_id, **kwargs)
        finally:
            # report 'idle' only when the worker has really finished
            self.status = 'idle'

    self.status = 'running'
    self.thread = threading.Thread(target=func)
    self.thread.start()
def rs_check(self, tx_volt=12., cmd_id=None):
"""Checks contact resistances
Parameters
----------
tx_volt : float
Voltage of the injection
cmd_id : str, optional
Unique command identifier
"""
# we only check the electrodes which are in the sequence (not all might be connected)
quads = np.array([[1, 2, 1, 2]], dtype=np.uint32)
Olivier Kaufmann
committed
else:
elec = np.sort(np.unique(self.sequence.flatten())) # assumed order
quads = np.vstack([
elec[:-1],
elec[1:],
elec[:-1],
elec[1:],
]).T
if self.idps:
quads[:, 2:] = 0 # we don't open Vmn to prevent burning the MN part
# as it has a smaller range of accepted voltage
export_path_rs = self.settings['export_path'].replace('.csv', '') \
+ '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '_rs.csv'

Guillaume Blanchy
committed
Olivier Kaufmann
committed
# self.run = True
if self.on_pi:
# make sure all mux are off to start with
self.reset_mux()
# measure all quad of the RS sequence
for i in range(0, quads.shape[0]):
quad = quads[i, :] # quadrupole
self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c)
d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=0.2, tx_volt=tx_volt, autogain=False)
Olivier Kaufmann
committed
voltage = tx_volt * 1000. # imposed voltage on dps5005
else:
voltage = d['Vmn [mV]']
current = d['I [mA]']
# compute resistance measured (= contact resistance)
Olivier Kaufmann
committed
resist = abs(voltage / current) / 1000.
# print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format(
msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \
f'V: {voltage :>10.3f} mV, ' \
f'R: {resist :>10.3f} kOhm'
self.exec_logger.debug(msg)
# if contact resistance = 0 -> we have a short circuit!!
Olivier Kaufmann
committed
msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm'
self.exec_logger.warning(msg)
Olivier Kaufmann
committed
# save data in a text file
self.append_and_save(export_path_rs, {
'A': quad[0],
'B': quad[1],
'RS [kOhm]': resist,
})
# close mux path and put pin back to GND
self.switch_mux_off(quad)
else:
pass
self.status = 'idle'
Olivier Kaufmann
committed
#
# # TODO if interrupted, we would need to restore the values
# # TODO or we offer the possibility in 'run_measurement' to have rs_check each time?
Olivier Kaufmann
committed
def set_sequence(self, sequence=None, cmd_id=None):
    """Sets the sequence to acquire

    Parameters
    ----------
    sequence : list, str
        sequence of quadrupoles
    cmd_id: str, optional
        Unique command identifier

    Returns
    -------
    bool
        True when the sequence was converted and stored, False when the
        conversion failed (a warning is logged in that case).
    """
    try:
        self.sequence = np.array(sequence).astype(int)
        status = True
    except Exception as e:
        self.exec_logger.warning(f'Unable to set sequence: {e}')
        status = False
    return status
def stop(self, **kwargs):
    """Deprecated: emits a DeprecationWarning, then calls interrupt() with the same keyword arguments."""
    # stacklevel=2 attributes the warning to the caller of stop()
    warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning, stacklevel=2)
    self.interrupt(**kwargs)
def _switch_mux(self, electrode_nr, state, role):
"""Selects the right channel for the multiplexer cascade for a given electrode.
Olivier Kaufmann
committed
Parameters
----------
electrode_nr : int
Electrode index to be switched on or off.
state : str
Either 'on' or 'off'.
role : str
Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board.
"""
if not self.use_mux or not self.on_pi:
if not self.on_pi:
self.exec_logger.warning('Cannot reset mux while in simulation mode...')
Olivier Kaufmann
committed
self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
' Set use_mux to True to use the multiplexer...')
elif self.sequence is None:
self.exec_logger.warning('Unable to switch MUX without a sequence')
else:
# choose with MUX board
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role])
Olivier Kaufmann
committed
# find I2C address of the electrode and corresponding relay
# considering that one MCP23017 can cover 16 electrodes
i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division
relay_nr = (electrode_nr-1) - ((electrode_nr-1) // 16) * 16
Olivier Kaufmann
committed
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
mcp2 = MCP23017(tca[i2c_address])
mcp2.get_pin(relay_nr).direction = digitalio.Direction.OUTPUT
Olivier Kaufmann
committed
if state == 'on':
Olivier Kaufmann
committed
else:
Olivier Kaufmann
committed
self.exec_logger.debug(f'Switching relay {relay_nr} '
f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}')
else:
self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
Olivier Kaufmann
committed
Olivier Kaufmann
committed
def switch_mux_on(self, quadrupole, cmd_id=None):
    """Switches on multiplexer relays for given quadrupole.

    Nothing is switched when A and B are the same electrode; an error is
    logged instead. Electrode numbers that are not strictly positive are skipped.

    Parameters
    ----------
    quadrupole : list of 4 int
        List of 4 integers representing the electrode numbers.
    cmd_id : str, optional
        Unique command identifier
    """
    # another check to be sure A != B
    if quadrupole[0] == quadrupole[1]:
        self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!')
        return
    for role, electrode_nr in zip(('A', 'B', 'M', 'N'), quadrupole):
        if electrode_nr > 0:
            self._switch_mux(electrode_nr, 'on', role)
Olivier Kaufmann
committed
def switch_mux_off(self, quadrupole, cmd_id=None):
    """Switches off multiplexer relays for given quadrupole.

    Electrode numbers that are not strictly positive are skipped.

    Parameters
    ----------
    quadrupole : list of 4 int
        List of 4 integers representing the electrode numbers.
    cmd_id : str, optional
        Unique command identifier
    """
    for role, electrode_nr in zip(('A', 'B', 'M', 'N'), quadrupole):
        if electrode_nr > 0:
            self._switch_mux(electrode_nr, 'off', role)
Olivier Kaufmann
committed

Guillaume Blanchy
committed
def test_mux(self, activation_time=1.0, address=0x70):
    """Interactive method to test the multiplexer.

    Asks on the console whether one electrode or electrodes 1 to 64 should be
    tested, then switches each selected relay on and off again, waiting
    activation_time seconds after each change.

    Parameters
    ----------
    activation_time : float, optional
        Time in seconds during which the relays are activated.
    address : hex, optional
        Address of the multiplexer board to test (e.g. 0x70, 0x71, ...).
    """
    self.use_mux = True
    self.reset_mux()

    # choose with MUX board
    tca = adafruit_tca9548a.TCA9548A(self.i2c, address)

    # ask use some details on how to proceed
    a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!')
    if a == '1':
        print('run channel by channel test')
        electrode = int(input('Choose your electrode number (integer):'))
        electrodes = [electrode]
    elif a == '2':
        electrodes = range(1, 65)
    else:
        print('Wrong choice !')
        return

    # run the test
    for electrode_nr in electrodes:
        # find I2C address of the electrode and corresponding relay
        # considering that one MCP23017 can cover 16 electrodes
        i2c_address = 7 - (electrode_nr - 1) // 16  # quotient without rest of the division
        # zero-based relay index, same mapping as _switch_mux
        # (electrode 1 -> pin 0, electrode 16 -> pin 15)
        relay_nr = (electrode_nr - 1) % 16

        # select the MCP23017 of the selected MUX board
        mcp2 = MCP23017(tca[i2c_address])
        relay = mcp2.get_pin(relay_nr)
        relay.direction = digitalio.Direction.OUTPUT

        # activate relay for given time
        relay.value = True
        print('electrode:', electrode_nr, ' activated...', end='', flush=True)
        time.sleep(activation_time)
        relay.value = False
        print(' deactivated')
        time.sleep(activation_time)
    print('Test finished.')
Olivier Kaufmann
committed
def reset_mux(self, cmd_id=None):
    """Switches off all multiplexer relays.

    Calls _switch_mux(..., 'off', ...) for electrodes 1 to max_elec on each of
    the roles A, B, M and N. Only a warning is logged when running in
    simulation mode or when use_mux is False.

    Parameters
    ----------
    cmd_id : str, optional
        Unique command identifier
    """
    if not self.on_pi:
        self.exec_logger.warning('Cannot reset mux while in simulation mode...')
    elif not self.use_mux:
        self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
                                 ' Set use_mux to True to use the multiplexer...')
    else:
        for role in ('A', 'B', 'M', 'N'):
            for electrode_nr in range(1, self.max_elec + 1):
                self._switch_mux(electrode_nr, 'off', role)
        self.exec_logger.debug('All MUX switched off.')
Olivier Kaufmann
committed
def _update_acquisition_settings(self, config):
    """Deprecated: emits a DeprecationWarning, then delegates to update_settings().

    Parameters
    ----------
    config : str or dict
        Passed unchanged to update_settings().

    Returns
    -------
    Whatever update_settings() returns.
    """
    warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning)
    # keep the old entry point functional while it is being phased out
    return self.update_settings(config)
def update_settings(self, settings: str, cmd_id=None):
Olivier Kaufmann
committed
"""Updates acquisition settings from a json file or dictionary.
Parameters can be:
- nb_electrodes (number of electrode used, if 4, no MUX needed)
- injection_duration (in seconds)
- nb_meas (total number of times the sequence will be run)
- sequence_delay (delay in second between each sequence run)
- nb_stack (number of stack for each quadrupole measurement)
- export_path (path where to export the data, timestamp will be added to filename)
Olivier Kaufmann
committed
Parameters
----------
Olivier Kaufmann
committed
Path to the .json settings file or dictionary of settings.
cmd_id : str, optional
Unique command identifier
Olivier Kaufmann
committed
"""
status = False
Olivier Kaufmann
committed
try:
if isinstance(settings, dict):
self.settings.update(settings)
Olivier Kaufmann
committed
else:
Olivier Kaufmann
committed
dic = json.load(json_file)
self.settings.update(dic)
self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings))
status = True
Olivier Kaufmann
committed
self.exec_logger.warning('Unable to update settings.')
status = False
Olivier Kaufmann
committed
else:
Olivier Kaufmann
committed
self.exec_logger.warning('Settings are missing...')
return status
Olivier Kaufmann
committed
# Properties
@property
def sequence(self):
    """Gets sequence (a numpy array, or None when no sequence is set)"""
    if self._sequence is not None:
        assert isinstance(self._sequence, np.ndarray)
    return self._sequence


@sequence.setter
def sequence(self, sequence):
    """Sets sequence and updates use_mux: True for an array, False for None"""
    if sequence is not None:
        assert isinstance(sequence, np.ndarray)
    self.use_mux = sequence is not None
    self._sequence = sequence
Olivier Kaufmann
committed
print(colored(r' ________________________________' + '\n' +
r'| _ | | | || \/ || ___ \_ _|' + '\n' +
r'| | | | |_| || . . || |_/ / | |' + '\n' +
r'| | | | _ || |\/| || __/ | |' + '\n' +
r'\ \_/ / | | || | | || | _| |_' + '\n' +
r' \___/\_| |_/\_| |_/\_| \___/ ', 'red'))
print('Version:', VERSION)
Olivier Kaufmann
committed
platform, on_pi = get_platform()
Olivier Kaufmann
committed
print(colored(f'\u2611 Running on {platform} platform', 'green'))
Olivier Kaufmann
committed
# TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...)
# and emit a warning otherwise
if not arm64_imports:
print(colored(f'Warning: Required packages are missing.\n'
f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow'))
Olivier Kaufmann
committed
print(colored(f'\u26A0 Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow'))
Olivier Kaufmann
committed
print(f'local date and time : {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
# for testing
if __name__ == "__main__":
    # build an OhmPi instance from the configured settings
    ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
    # hand control to the controller loop when a controller is available
    if ohmpi.controller is not None:
        ohmpi.controller.loop_forever()