Commit 82b170fd authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

Merge branch 'mqtt' of https://gitlab.irstea.fr/reversaal/OhmPi into mqtt

Showing with 273 additions and 12 deletions
+273 -12
...@@ -179,7 +179,7 @@ files (.json and .py). ...@@ -179,7 +179,7 @@ files (.json and .py).
***MQTT interface*** ***MQTT interface***
Interface to communicate with the Pi designed for the Internet of Things (IoT). This interface enable to control a network of OhmPi remotely through an MQTT broker. An example of MQTT broker that can be used is `Mosquitto <https://mosquitto.org/>`_. Commands are received by ohmpi.py script and processed. All commands are sent in JSON format following the Python API with args and kwargs: Interface to communicate with the Pi designed for the Internet of Things (IoT). This interface allows to control a single OhmPi, a network of OhmPis, or auxilliary instruments remotely through an MQTT broker. An example of MQTT broker that can be used is `Mosquitto <https://mosquitto.org/>`_. Depending on the experiment needs, MQTT brokers can be set up locally on the Raspberry Pi, on a master Raspberry Pi or on a local or remote server. Commands are received by ohmpi.py script via the mqtt_interface.py script and further processed. All commands are sent in JSON format following the Python API with args and kwargs:
.. code-block:: json .. code-block:: json
:caption: Updating acquisition settings. :caption: Updating acquisition settings.
...@@ -206,7 +206,15 @@ Interface to communicate with the Pi designed for the Internet of Things (IoT). ...@@ -206,7 +206,15 @@ Interface to communicate with the Pi designed for the Internet of Things (IoT).
"cmd_id": "3fzxv121UITwGjWYgcz4xw", "cmd_id": "3fzxv121UITwGjWYgcz4xw",
"cmd": "rs_check", "cmd": "rs_check",
} }
.. code-block:: json
:caption: Running a sequence.
{
"cmd_id": "3fzxv121UITwGjWYgcz4Yw",
"cmd": "run_sequence",
}
.. code-block:: json .. code-block:: json
:caption: Running same sequence multiple times (nb_meas). :caption: Running same sequence multiple times (nb_meas).
...@@ -220,11 +228,10 @@ Interface to communicate with the Pi designed for the Internet of Things (IoT). ...@@ -220,11 +228,10 @@ Interface to communicate with the Pi designed for the Internet of Things (IoT).
{ {
"cmd_id": "3fzxv121UITwGjWYgcz4xw", "cmd_id": "3fzxv121UITwGjWYgcz4xw",
"cmd": "update_settings", "cmd": "interrupt",
} }
......
...@@ -195,11 +195,6 @@ class OhmPi(object): ...@@ -195,11 +195,6 @@ class OhmPi(object):
def on_message(client, userdata, message): def on_message(client, userdata, message):
command = message.payload.decode('utf-8') command = message.payload.decode('utf-8')
self.exec_logger.debug(f'Received command {command}') self.exec_logger.debug(f'Received command {command}')
# dic = json.loads(command)
# if dic['cmd_id'] != self.cmd_id:
# self.cmd_id = dic['cmd_id']
# payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'})
# publish.single(payload=payload, **publisher_config)
self._process_commands(command) self._process_commands(command)
self.controller.on_message = on_message self.controller.on_message = on_message
...@@ -388,12 +383,12 @@ class OhmPi(object): ...@@ -388,12 +383,12 @@ class OhmPi(object):
def _find_identical_in_line(quads): def _find_identical_in_line(quads):
"""Finds quadrupole where A and B are identical. """Finds quadrupole where A and B are identical.
If A and B are connected to the same electrode, the Pi burns (short-circuit). If A and B are connected to the same electrode, the Pi burns (short-circuit).
Parameters Parameters
---------- ----------
quads : numpy.ndarray quads : numpy.ndarray
List of quadrupoles of shape nquad x 4 or 1D vector of shape nquad. List of quadrupoles of shape nquad x 4 or 1D vector of shape nquad.
Returns Returns
------- -------
output : numpy.ndarray 1D array of int output : numpy.ndarray 1D array of int
...@@ -625,7 +620,7 @@ class OhmPi(object): ...@@ -625,7 +620,7 @@ class OhmPi(object):
""" """
self.exec_logger.debug('Starting measurement') self.exec_logger.debug('Starting measurement')
self.exec_logger.info('Waiting for data') # do we need this as info? debug is enough I think (gb) self.exec_logger.debug('Waiting for data')
# check arguments # check arguments
if quad is None: if quad is None:
...@@ -920,6 +915,121 @@ class OhmPi(object): ...@@ -920,6 +915,121 @@ class OhmPi(object):
self.thread = threading.Thread(target=func) self.thread = threading.Thread(target=func)
self.thread.start() self.thread.start()
def run_sequence(self, cmd_id=None, **kwargs):
"""Runs sequence synchronously (=blocking on main thread).
Additional arguments are passed to run_measurement().
"""
self.status = 'running'
self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
t0 = time.time()
# create filename with timestamp
filename = self.settings["export_path"].replace('.csv',
f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
self.exec_logger.debug(f'Saving to {filename}')
# make sure all multiplexer are off
self.reset_mux()
# measure all quadrupole of the sequence
if self.sequence is None:
n = 1
else:
n = self.sequence.shape[0]
for i in range(0, n):
if self.sequence is None:
quad = np.array([0, 0, 0, 0])
else:
quad = self.sequence[i, :] # quadrupole
if self.status == 'stopping':
break
# call the switch_mux function to switch to the right electrodes
self.switch_mux_on(quad)
# run a measurement
if self.on_pi:
acquired_data = self.run_measurement(quad, **kwargs)
else: # for testing, generate random data
acquired_data = {
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
'R [ohm]': np.abs(np.random.randn(1))
}
# switch mux off
self.switch_mux_off(quad)
# add command_id in dataset
acquired_data.update({'cmd_id': cmd_id})
# log data to the data logger
# self.data_logger.info(f'{acquired_data}')
# save data and print in a text file
self.append_and_save(filename, acquired_data)
self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}')
self.status = 'idle'
def run_sequence_async(self, cmd_id=None, **kwargs):
"""Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
Additional arguments are passed to run_measurement().
Parameters
----------
cmd_id:
"""
def func():
self.run_sequence(**kwargs)
self.thread = threading.Thread(target=func)
self.thread.start()
self.status = 'idle'
def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs):
"""Runs multiple sequences in a separate thread for monitoring mode.
Can be stopped by 'OhmPi.interrupt()'.
Additional arguments are passed to run_measurement().
Parameters
----------
cmd_id :
sequence_delay : int, optional
Number of seconds at which the sequence must be started from each others.
nb_meas : int, optional
Number of time the sequence must be repeated.
kwargs : dict, optional
See help(k.run_measurement) for more info.
"""
# self.run = True
if sequence_delay is None:
sequence_delay = self.settings['sequence_delay']
sequence_delay = int(sequence_delay)
if nb_meas is None:
nb_meas = self.settings['nb_meas']
self.status = 'running'
self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
def func():
for g in range(0, nb_meas): # for time-lapse monitoring
if self.status == 'stopping':
self.exec_logger.warning('Data acquisition interrupted')
break
t0 = time.time()
self.run_sequence(**kwargs)
# sleeping time between sequence
dt = sequence_delay - (time.time() - t0)
if dt < 0:
dt = 0
if nb_meas > 1:
time.sleep(dt) # waiting for next measurement (time-lapse)
self.status = 'idle'
self.thread = threading.Thread(target=func)
self.thread.start()
def run_sequence(self, cmd_id=None, **kwargs): def run_sequence(self, cmd_id=None, **kwargs):
"""Runs sequence synchronously (=blocking on main thread). """Runs sequence synchronously (=blocking on main thread).
Additional arguments are passed to run_measurement(). Additional arguments are passed to run_measurement().
...@@ -1064,6 +1174,91 @@ class OhmPi(object): ...@@ -1064,6 +1174,91 @@ class OhmPi(object):
# # TODO if interrupted, we would need to restore the values # # TODO if interrupted, we would need to restore the values
# # TODO or we offer the possibility in 'run_measurement' to have rs_check each time? # # TODO or we offer the possibility in 'run_measurement' to have rs_check each time?
def set_sequence(self, sequence=None):
try:
self.sequence = np.loadtxt(StringIO(sequence)).astype('uint32')
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to set sequence: {e}')
status = False
def stop(self):
warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning)
self.interrupt()
def _switch_mux(self, electrode_nr, state, role):
"""Selects the right channel for the multiplexer cascade for a given electrode.
Parameters
----------
electrode_nr : int
Electrode index to be switched on or off.
state : str
Either 'on' or 'off'.
role : str
Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board.
"""
if not self.use_mux or not self.on_pi:
if not self.on_pi:
self.exec_logger.warning('Cannot reset mux while in simulation mode...')
else:
self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
' Set use_mux to True to use the multiplexer...')
elif self.sequence is None:
self.exec_logger.warning('Unable to switch MUX without a sequence')
else:
# choose with MUX board
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role])
# find I2C address of the electrode and corresponding relay
# considering that one MCP23017 can cover 16 electrodes
i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
mcp2 = MCP23017(tca[i2c_address])
mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT
if state == 'on':
mcp2.get_pin(relay_nr - 1).value = True
else:
mcp2.get_pin(relay_nr - 1).value = False
self.exec_logger.debug(f'Switching relay {relay_nr} '
f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}')
else:
self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
def switch_mux_on(self, quadrupole):
"""Switches on multiplexer relays for given quadrupole.
Parameters
----------
quadrupole : list of 4 int
List of 4 integers representing the electrode numbers.
"""
roles = ['A', 'B', 'M', 'N']
# another check to be sure A != B
if quadrupole[0] != quadrupole[1]:
for i in range(0, 4):
if quadrupole[i] > 0:
self._switch_mux(quadrupole[i], 'on', roles[i])
else:
self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!')
def switch_mux_off(self, quadrupole):
"""Switches off multiplexer relays for given quadrupole.
Parameters
----------
quadrupole : list of 4 int
List of 4 integers representing the electrode numbers.
"""
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
if quadrupole[i] > 0:
self._switch_mux(quadrupole[i], 'off', roles[i])
def set_sequence(self, sequence=None): def set_sequence(self, sequence=None):
try: try:
self.sequence = np.array(sequence).astype(int) self.sequence = np.array(sequence).astype(int)
...@@ -1210,6 +1405,65 @@ class OhmPi(object): ...@@ -1210,6 +1405,65 @@ class OhmPi(object):
assert isinstance(self._sequence, np.ndarray) assert isinstance(self._sequence, np.ndarray)
return self._sequence return self._sequence
def reset_mux(self):
"""Switches off all multiplexer relays."""
if self.on_pi and self.use_mux:
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
for j in range(1, self.max_elec + 1):
self._switch_mux(j, 'off', roles[i])
self.exec_logger.debug('All MUX switched off.')
elif not self.on_pi:
self.exec_logger.warning('Cannot reset mux while in simulation mode...')
else:
self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
' Set use_mux to True to use the multiplexer...')
def _update_acquisition_settings(self, config):
warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning)
self.update_settings(config)
def update_settings(self, config):
"""Updates acquisition settings from a json file or dictionary.
Parameters can be:
- nb_electrodes (number of electrode used, if 4, no MUX needed)
- injection_duration (in seconds)
- nb_meas (total number of times the sequence will be run)
- sequence_delay (delay in second between each sequence run)
- nb_stack (number of stack for each quadrupole measurement)
- export_path (path where to export the data, timestamp will be added to filename)
Parameters
----------
config : str, dict
Path to the .json settings file or dictionary of settings.
"""
status = False
if config is not None:
try:
if isinstance(config, dict):
self.settings.update(config)
else:
with open(config) as json_file:
dic = json.load(json_file)
self.settings.update(dic)
self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings))
status = True
except Exception as e:
self.exec_logger.warning('Unable to update settings.')
status = False
else:
self.exec_logger.warning('Settings are missing...')
return status
# Properties
@property
def sequence(self):
"""Gets sequence"""
if self._sequence is not None:
assert isinstance(self._sequence, np.ndarray)
return self._sequence
@sequence.setter @sequence.setter
def sequence(self, sequence): def sequence(self, sequence):
"""Sets sequence""" """Sets sequence"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment