Commit 40c657f2 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Replaces tcp listener by controller based on mqtt

Showing with 122 additions and 117 deletions
+122 -117
......@@ -2,7 +2,7 @@ import logging
from paho.mqtt.client import MQTTv31
mqtt_broker = 'localhost'
mqtt_broker = 'mg3d-dev.umons.ac.be' # TODO: set back to 'localhost'
logging_suffix = '_interactive'
# OhmPi configuration
OHMPI_CONFIG = {
......@@ -16,14 +16,14 @@ OHMPI_CONFIG = {
'integer': 2, # Max value 10 # TODO: Explain what this is...
'version': 2,
'max_elec': 64,
'board_address': {'A': 0x70, 'B': 0x71, 'M': 0x72, 'N': 0x73}, # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70}
'board_addresses': {'A': 0x70, 'B': 0x71, 'M': 0x72, 'N': 0x73}, # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70}
'settings': 'ohmpi_settings.json'
} # TODO: add a dictionary with INA models and associated gain values
CONTROL_CONFIG = {
'tcp_port': 5555,
'interface': 'mqtt_interface.py' # 'http_interface'
}
# CONTROL_CONFIG = {
# 'tcp_port': 5555,
# 'interface': 'mqtt_interface.py' # 'http_interface'
# }
# Execution logging configuration
EXEC_LOGGING_CONFIG = {
'logging_level': logging.DEBUG,
......
import paho.mqtt.client as mqtt
from config import MQTT_CONTROL_CONFIG, CONTROL_CONFIG, OHMPI_CONFIG
import time
from queue import Queue
# from queue import Queue
import zmq
ctrl_queue = Queue()
# ctrl_queue = Queue()
def on_message(client, userdata, message):
......@@ -50,5 +50,5 @@ if broker_connected:
while True:
time.sleep(.1)
else:
print("Unable to connect to broker")
print("Unable to connect to control broker")
exit(1)
......@@ -14,13 +14,12 @@ import numpy as np
import csv
import time
from io import StringIO
import zmq
from datetime import datetime
from termcolor import colored
import threading
import paho.mqtt.client as mqtt_client
from logging_setup import setup_loggers
from config import CONTROL_CONFIG, OHMPI_CONFIG
from subprocess import Popen
from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
# finish import (done only when class is instantiated as some libs are only available on arm64 platform)
try:
......@@ -61,11 +60,10 @@ class OhmPi(object):
if on_pi is None:
_, on_pi = OhmPi.get_platform()
self.sequence = sequence
self.on_pi = on_pi # True if run from the RaspberryPi with the hardware, otherwise False for random data
self.status = 'idle' # either running or idle
# self.run = False # flag is True when measuring
self.thread = None # contains the handle for the thread taking the measurement
# self.path = 'data/' # where to save the .csv
# set loggers
config_exec_logger, _, config_data_logger, _, _ = setup_loggers(mqtt=mqtt) # TODO: add SOH
......@@ -77,6 +75,31 @@ class OhmPi(object):
print(colored(f'Data logger {self.data_logger.handlers if self.data_logger is not None else "None"}', 'blue'))
print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue'))
# set controller
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
try:
self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
trials = trials_max
broker_connected = True
except Exception as e:
self.exec_logger.debug(f'Unable to connect control broker: {e}')
self.exec_logger.info('trying again to connect to control broker...')
time.sleep(2)
trials += 1
if broker_connected:
self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
else:
self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
self.controller = None
# read in hardware parameters (config.py)
self._read_hardware_config()
......@@ -98,8 +121,6 @@ class OhmPi(object):
# read quadrupole sequence
if sequence is not None:
# self.sequence = np.array([[1, 2, 3, 4]], dtype=np.int32)
# else:
self.read_quad(sequence)
# connect to components on the OhmPi board
......@@ -118,9 +139,20 @@ class OhmPi(object):
# Starts the command processing thread
self.cmd_listen = True
self.cmd_thread = threading.Thread(target=self.process_commands)
self.cmd_thread = threading.Thread(target=self._control)
self.cmd_thread.start()
def _control(self):
def on_message(client, userdata, message):
command = message.payload.decode('utf-8')
self.exec_logger.debug(f'Received command {command}')
self.process_commands(command)
self.controller.on_message = on_message
self.controller.loop_start()
while True:
time.sleep(.5)
def _update_acquisition_settings(self, config):
"""Update acquisition settings from a json file or dictionary.
Parameters can be:
......@@ -159,7 +191,7 @@ class OhmPi(object):
self.nb_samples = OHMPI_CONFIG['integer'] # number of samples measured for each stack
self.version = OHMPI_CONFIG['version'] # hardware version
self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes
self.board_address = OHMPI_CONFIG['board_address']
self.board_addresses = OHMPI_CONFIG['board_addresses']
self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}')
@staticmethod
......@@ -260,6 +292,8 @@ class OhmPi(object):
if sequence is not None:
self.exec_logger.info('Sequence of {:d} quadrupoles read.'.format(sequence.shape[0]))
else:
self.exec_logger.warning(f'Unable to load sequence {filename}')
self.sequence = sequence
......@@ -279,28 +313,13 @@ class OhmPi(object):
pass
else:
# choose with MUX board
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_address[role])
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role])
# find I2C address of the electrode and corresponding relay
# TODO from number of electrode, the below can be guessed
# considering that one MCP23017 can cover 16 electrodes
electrode_nr = electrode_nr - 1 # switch to 0 indexing
i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16) * 16
relay_nr = relay_nr + 1 # switch back to 1 based indexing
# if electrode_nr < 17:
# i2c_address = 7
# relay_nr = electrode_nr
# elif 16 < electrode_nr < 33:
# i2c_address = 6
# relay_nr = electrode_nr - 16
# elif 32 < electrode_nr < 49:
# i2c_address = 5
# relay_nr = electrode_nr - 32
# elif 48 < electrode_nr < 65:
# i2c_address = 4
# relay_nr = electrode_nr - 48
i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16) * 16 +1
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
......@@ -375,7 +394,7 @@ class OhmPi(object):
self.exec_logger.debug(f'Setting gain to {gain}')
return gain
def run_measurement(self, quad=[0,0,0,0], nb_stack=None, injection_duration=None):
def run_measurement(self, quad=None, nb_stack=None, injection_duration=None):
""" Do a 4 electrode measurement and measure transfer resistance obtained.
Parameters
......@@ -390,7 +409,8 @@ class OhmPi(object):
"""
# TODO here we can add the current_injected or voltage_injected in mA or mV
# check arguments
if quad is None:
quad = [0, 0, 0, 0]
self.exec_logger.debug('Starting measurement')
self.exec_logger.info('Waiting for data')
......@@ -529,7 +549,7 @@ class OhmPi(object):
# create custom sequence where MN == AB
# we only check the electrodes which are in the sequence (not all might be connected)
if self.sequence is None:
quads = np.array([[1,2,1,2]])
quads = np.array([[1, 2, 1, 2]])
else:
elec = np.sort(np.unique(self.sequence.flatten())) # assumed order
quads = np.vstack([
......@@ -647,78 +667,58 @@ class OhmPi(object):
w.writerow(last_measurement)
# last_measurement.to_csv(f, header=True)
def process_commands(self):
context = zmq.Context()
tcp_port = CONTROL_CONFIG["tcp_port"]
socket = context.socket(zmq.REP)
socket.bind(f'tcp://*:{tcp_port}')
print(colored(f'Listening to commands on tcp port {tcp_port}.'
f' Make sure your client interface is running and bound to this port...', 'blue'))
self.exec_logger.debug(f'Start listening for commands on port {tcp_port}')
while self.cmd_listen:
try:
message = socket.recv() # flags=zmq.NOBLOCK)
self.exec_logger.debug(f'Received command: {message}')
e = None
try:
cmd_id = None
decoded_message = json.loads(message.decode('utf-8'))
cmd_id = decoded_message.pop('cmd_id', None)
cmd = decoded_message.pop('cmd', None)
args = decoded_message.pop('args', None)
status = False
e = None
if cmd is not None and cmd_id is not None:
if cmd == 'update_settings' and args is not None:
self._update_acquisition_settings(args)
status = True
elif cmd == 'set_sequence' and args is not None:
self.sequence = np.loadtxt(StringIO(args))
status = True
elif cmd == 'start':
self.measure(cmd_id)
while not self.status == 'idle':
time.sleep(0.1)
status = True
elif cmd == 'stop':
self.stop()
status = True
elif cmd == 'read_sequence':
try:
self.read_quad(args)
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to read sequence: {e}')
elif cmd == 'set_sequence':
try:
self.sequence = np.array(args)
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to set sequence: {e}')
elif cmd == 'rs_check':
try:
self.rs_check()
status = True
except Exception as e:
print('error====', e)
self.exec_logger.warning(f'Unable to run rs-check: {e}')
else:
self.exec_logger.warning(f'Unknown command {cmd} - cmd_id: {cmd_id}')
except Exception as e:
self.exec_logger.warning(f'Unable to decode command {message}: {e}')
status = False
finally:
reply = {'cmd_id': cmd_id, 'status': status}
reply = json.dumps(reply)
self.exec_logger.debug(f'Execution report: {reply}')
reply = reply.encode('utf-8')
socket.send(reply)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
pass # no message was ready (yet!)
def process_commands(self, command):
try:
cmd_id = None
decoded_message = json.loads(command)
cmd_id = decoded_message.pop('cmd_id', None)
cmd = decoded_message.pop('cmd', None)
args = decoded_message.pop('args', None)
status = False
e = None
if cmd is not None and cmd_id is not None:
if cmd == 'update_settings' and args is not None:
self._update_acquisition_settings(args)
status = True
elif cmd == 'set_sequence' and args is not None:
self.sequence = np.loadtxt(StringIO(args))
status = True
elif cmd == 'start':
self.measure(cmd_id)
while not self.status == 'idle':
time.sleep(0.1)
status = True
elif cmd == 'stop':
self.stop()
status = True
elif cmd == 'read_sequence':
try:
self.read_quad(args)
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to read sequence: {e}')
elif cmd == 'set_sequence':
try:
self.sequence = np.array(args)
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to set sequence: {e}')
elif cmd == 'rs_check':
try:
self.rs_check()
status = True
except Exception as e:
print('error====', e)
self.exec_logger.warning(f'Unable to run rs-check: {e}')
else:
self.exec_logger.error(f'Unexpected error while process: {e}')
self.exec_logger.warning(f'Unknown command {cmd} - cmd_id: {cmd_id}')
except Exception as e:
self.exec_logger.warning(f'Unable to decode command {command}: {e}')
status = False
finally:
reply = {'cmd_id': cmd_id, 'status': status}
reply = json.dumps(reply)
self.exec_logger.debug(f'Execution report: {reply}')
def measure(self, cmd_id=None):
"""Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'.
......@@ -726,6 +726,7 @@ class OhmPi(object):
# self.run = True
self.status = 'running'
self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
def func():
for g in range(0, self.settings["nbr_meas"]): # for time-lapse monitoring
......@@ -736,15 +737,22 @@ class OhmPi(object):
# create filename with timestamp
filename = self.settings["export_path"].replace('.csv',
f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
self.exec_logger.debug(f'Saving to {filename}')
# make sure all multiplexer are off
self.reset_mux()
# measure all quadrupole of the sequence
for i in range(0, self.sequence.shape[0]):
quad = self.sequence[i, :] # quadrupole
if self.sequence is None:
n = 1
else:
n = self.sequence.shape[0]
for i in range(0, n):
if self.sequence is None:
quad = np.array([0, 0, 0, 0])
else:
quad = self.sequence[i, :] # quadrupole
if self.status == 'stopping':
break
......@@ -765,7 +773,7 @@ class OhmPi(object):
print(f'{acquired_data}')
# save data and print in a text file
self.append_and_save(filename, acquired_data)
self.exec_logger.debug(f'{i+1:d}/{self.sequence.shape[0]:d}')
self.exec_logger.debug(f'{i+1:d}/{n:d}')
# compute time needed to take measurement and subtract it from interval
# between two sequence run (= sequence_delay)
......@@ -830,7 +838,4 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
# for testing
if __name__ == "__main__":
# start interface
Popen(['python', CONTROL_CONFIG['interface']])
print('done')
ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment