From 40c657f205b17b5742c3c9bcb802e3588c4ac6b7 Mon Sep 17 00:00:00 2001
From: su530201 <olivier.kaufmann@umons.ac.be>
Date: Thu, 27 Oct 2022 22:52:27 +0200
Subject: [PATCH] Replaces tcp listener by controller based on mqtt

---
 config.py         |  12 +--
 mqtt_interface.py |   6 +-
 ohmpi.py          | 221 ++++++++++++++++++++++++----------------------
 3 files changed, 122 insertions(+), 117 deletions(-)

diff --git a/config.py b/config.py
index 43cbe211..d6864e65 100644
--- a/config.py
+++ b/config.py
@@ -2,7 +2,7 @@ import logging
 
 from paho.mqtt.client import MQTTv31
 
-mqtt_broker = 'localhost'
+mqtt_broker = 'mg3d-dev.umons.ac.be'  # TODO: set back to 'localhost'
 logging_suffix = '_interactive'
 # OhmPi configuration
 OHMPI_CONFIG = {
@@ -16,14 +16,14 @@ OHMPI_CONFIG = {
     'integer': 2,  # Max value 10 # TODO: Explain what this is...
     'version': 2,
     'max_elec': 64,
-    'board_address': {'A': 0x70, 'B': 0x71, 'M': 0x72, 'N': 0x73},  # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70}
+    'board_addresses': {'A': 0x70, 'B': 0x71, 'M': 0x72, 'N': 0x73},  # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70}
     'settings': 'ohmpi_settings.json'
 }  # TODO: add a dictionary with INA models and associated gain values
 
-CONTROL_CONFIG = {
-    'tcp_port': 5555,
-    'interface': 'mqtt_interface.py'  # 'http_interface'
-}
+# CONTROL_CONFIG = {
+#     'tcp_port': 5555,
+#     'interface': 'mqtt_interface.py'  # 'http_interface'
+# }
 # Execution logging configuration
 EXEC_LOGGING_CONFIG = {
     'logging_level': logging.DEBUG,
diff --git a/mqtt_interface.py b/mqtt_interface.py
index 1eaf661e..1da36cc6 100644
--- a/mqtt_interface.py
+++ b/mqtt_interface.py
@@ -1,10 +1,10 @@
 import paho.mqtt.client as mqtt
 from config import MQTT_CONTROL_CONFIG, CONTROL_CONFIG, OHMPI_CONFIG
 import time
-from queue import Queue
+# from queue import Queue
 import zmq
 
-ctrl_queue = Queue()
+# ctrl_queue = Queue()
 
 
 def on_message(client, userdata, message):
@@ -50,5 +50,5 @@ if broker_connected:
     while True:
         time.sleep(.1)
 else:
-    print("Unable to connect to broker")
+    print("Unable to connect to control broker")
     exit(1)
diff --git a/ohmpi.py b/ohmpi.py
index 4905f1c9..52e1dc26 100644
--- a/ohmpi.py
+++ b/ohmpi.py
@@ -14,13 +14,12 @@ import numpy as np
 import csv
 import time
 from io import StringIO
-import zmq
 from datetime import datetime
 from termcolor import colored
 import threading
+import paho.mqtt.client as mqtt_client
 from logging_setup import setup_loggers
-from config import CONTROL_CONFIG, OHMPI_CONFIG
-from subprocess import Popen
+from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
 
 # finish import (done only when class is instantiated as some libs are only available on arm64 platform)
 try:
@@ -61,11 +60,10 @@ class OhmPi(object):
         if on_pi is None:
             _, on_pi = OhmPi.get_platform()
         self.sequence = sequence
+
         self.on_pi = on_pi  # True if run from the RaspberryPi with the hardware, otherwise False for random data
         self.status = 'idle'  # either running or idle
-        # self.run = False  # flag is True when measuring
         self.thread = None  # contains the handle for the thread taking the measurement
-        # self.path = 'data/'  # where to save the .csv
 
         # set loggers
         config_exec_logger, _, config_data_logger, _, _ = setup_loggers(mqtt=mqtt)  # TODO: add SOH
@@ -77,6 +75,31 @@ class OhmPi(object):
         print(colored(f'Data logger {self.data_logger.handlers if self.data_logger is not None else "None"}', 'blue'))
         print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue'))
 
+        # set controller
+        self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False)  # create new instance
+        print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
+        trials = 0
+        trials_max = 10
+        broker_connected = False
+        while trials < trials_max:
+            try:
+                self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
+                                                MQTT_CONTROL_CONFIG['auth']['password'])
+                self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
+                trials = trials_max
+                broker_connected = True
+            except Exception as e:
+                self.exec_logger.debug(f'Unable to connect control broker: {e}')
+                self.exec_logger.info('trying again to connect to control broker...')
+                time.sleep(2)
+                trials += 1
+        if broker_connected:
+            self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
+            self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
+        else:
+            self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
+            self.controller = None
+
         # read in hardware parameters (config.py)
         self._read_hardware_config()
 
@@ -98,8 +121,6 @@ class OhmPi(object):
 
         # read quadrupole sequence
         if sequence is not None:
-#            self.sequence = np.array([[1, 2, 3, 4]], dtype=np.int32)
-#        else:
             self.read_quad(sequence)
 
         # connect to components on the OhmPi board
@@ -118,9 +139,20 @@ class OhmPi(object):
 
         # Starts the command processing thread
         self.cmd_listen = True
-        self.cmd_thread = threading.Thread(target=self.process_commands)
+        self.cmd_thread = threading.Thread(target=self._control)
         self.cmd_thread.start()
 
+    def _control(self):
+        def on_message(client, userdata, message):
+            command = message.payload.decode('utf-8')
+            self.exec_logger.debug(f'Received command {command}')
+            self.process_commands(command)
+
+        self.controller.on_message = on_message
+        self.controller.loop_start()
+        while True:
+            time.sleep(.5)
+
     def _update_acquisition_settings(self, config):
         """Update acquisition settings from a json file or dictionary.
         Parameters can be:
@@ -159,7 +191,7 @@ class OhmPi(object):
         self.nb_samples = OHMPI_CONFIG['integer']  # number of samples measured for each stack
         self.version = OHMPI_CONFIG['version']  # hardware version
         self.max_elec = OHMPI_CONFIG['max_elec']  # maximum number of electrodes
-        self.board_address = OHMPI_CONFIG['board_address']
+        self.board_addresses = OHMPI_CONFIG['board_addresses']
         self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}')
 
     @staticmethod
@@ -260,6 +292,8 @@ class OhmPi(object):
 
         if sequence is not None:
             self.exec_logger.info('Sequence of {:d} quadrupoles read.'.format(sequence.shape[0]))
+        else:
+            self.exec_logger.warning(f'Unable to load sequence {filename}')
 
         self.sequence = sequence
 
@@ -279,28 +313,13 @@ class OhmPi(object):
             pass
         else:
             # choose with MUX board
-            tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_address[role])
+            tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role])
 
             # find I2C address of the electrode and corresponding relay
             # TODO from number of electrode, the below can be guessed
             # considering that one MCP23017 can cover 16 electrodes
-            electrode_nr = electrode_nr - 1  # switch to 0 indexing
-            i2c_address = 7 - electrode_nr // 16  # quotient without rest of the division
-            relay_nr = electrode_nr - (electrode_nr // 16) * 16
-            relay_nr = relay_nr + 1  # switch back to 1 based indexing
-
-            # if electrode_nr < 17:
-            #     i2c_address = 7
-            #     relay_nr = electrode_nr
-            # elif 16 < electrode_nr < 33:
-            #     i2c_address = 6
-            #     relay_nr = electrode_nr - 16
-            # elif 32 < electrode_nr < 49:
-            #     i2c_address = 5
-            #     relay_nr = electrode_nr - 32
-            # elif 48 < electrode_nr < 65:
-            #     i2c_address = 4
-            #     relay_nr = electrode_nr - 48
+            i2c_address = 7 - (electrode_nr - 1) // 16  # quotient without rest of the division
+            relay_nr = electrode_nr - (electrode_nr // 16) * 16 +1
 
             if i2c_address is not None:
                 # select the MCP23017 of the selected MUX board
@@ -375,7 +394,7 @@ class OhmPi(object):
         self.exec_logger.debug(f'Setting gain to {gain}')
         return gain
 
-    def run_measurement(self, quad=[0,0,0,0], nb_stack=None, injection_duration=None):
+    def run_measurement(self, quad=None, nb_stack=None, injection_duration=None):
         """ Do a 4 electrode measurement and measure transfer resistance obtained.
 
         Parameters
@@ -390,7 +409,8 @@ class OhmPi(object):
         """
         # TODO here we can add the current_injected or voltage_injected in mA or mV
         # check arguments
-
+        if quad is None:
+            quad = [0, 0, 0, 0]
         self.exec_logger.debug('Starting measurement')
         self.exec_logger.info('Waiting for data')
 
@@ -529,7 +549,7 @@ class OhmPi(object):
         # create custom sequence where MN == AB
         # we only check the electrodes which are in the sequence (not all might be connected)
         if self.sequence is None:
-            quads = np.array([[1,2,1,2]])
+            quads = np.array([[1, 2, 1, 2]])
         else:
             elec = np.sort(np.unique(self.sequence.flatten()))  # assumed order
             quads = np.vstack([
@@ -647,78 +667,58 @@ class OhmPi(object):
                 w.writerow(last_measurement)
                 # last_measurement.to_csv(f, header=True)
 
-    def process_commands(self):
-        context = zmq.Context()
-        tcp_port = CONTROL_CONFIG["tcp_port"]
-        socket = context.socket(zmq.REP)
-        socket.bind(f'tcp://*:{tcp_port}')
-
-        print(colored(f'Listening to commands on tcp port {tcp_port}.'
-                      f' Make sure your client interface is running and bound to this port...', 'blue'))
-        self.exec_logger.debug(f'Start listening for commands on port {tcp_port}')
-        while self.cmd_listen:
-            try:
-                message = socket.recv() # flags=zmq.NOBLOCK)
-                self.exec_logger.debug(f'Received command: {message}')
-                e = None
-                try:
-                    cmd_id = None
-                    decoded_message = json.loads(message.decode('utf-8'))
-                    cmd_id = decoded_message.pop('cmd_id', None)
-                    cmd = decoded_message.pop('cmd', None)
-                    args = decoded_message.pop('args', None)
-                    status = False
-                    e = None
-                    if cmd is not None and cmd_id is not None:
-                        if cmd == 'update_settings' and args is not None:
-                            self._update_acquisition_settings(args)
-                            status = True
-                        elif cmd == 'set_sequence' and args is not None:
-                            self.sequence = np.loadtxt(StringIO(args))
-                            status = True
-                        elif cmd == 'start':
-                            self.measure(cmd_id)
-                            while not self.status == 'idle':
-                                time.sleep(0.1)
-                            status = True
-                        elif cmd == 'stop':
-                            self.stop()
-                            status = True
-                        elif cmd == 'read_sequence':
-                            try:
-                                self.read_quad(args)
-                                status = True
-                            except Exception as e:
-                                self.exec_logger.warning(f'Unable to read sequence: {e}')
-                        elif cmd == 'set_sequence':
-                            try:
-                                self.sequence = np.array(args)
-                                status = True
-                            except Exception as e:
-                                self.exec_logger.warning(f'Unable to set sequence: {e}')
-                        elif cmd == 'rs_check':
-                            try:
-                                self.rs_check()
-                                status = True
-                            except Exception as e:
-                                print('error====', e)
-                                self.exec_logger.warning(f'Unable to run rs-check: {e}')
-                        else:
-                            self.exec_logger.warning(f'Unknown command {cmd} - cmd_id: {cmd_id}')
-                except Exception as e:
-                    self.exec_logger.warning(f'Unable to decode command {message}: {e}')
-                    status = False
-                finally:
-                    reply = {'cmd_id': cmd_id, 'status': status}
-                    reply = json.dumps(reply)
-                    self.exec_logger.debug(f'Execution report: {reply}')
-                    reply = reply.encode('utf-8')
-                    socket.send(reply)
-            except zmq.ZMQError as e:
-                if e.errno == zmq.EAGAIN:
-                    pass # no message was ready (yet!)
+    def process_commands(self, command):
+        try:
+            cmd_id = None
+            decoded_message = json.loads(command)
+            cmd_id = decoded_message.pop('cmd_id', None)
+            cmd = decoded_message.pop('cmd', None)
+            args = decoded_message.pop('args', None)
+            status = False
+            e = None
+            if cmd is not None and cmd_id is not None:
+                if cmd == 'update_settings' and args is not None:
+                    self._update_acquisition_settings(args)
+                    status = True
+                elif cmd == 'set_sequence' and args is not None:
+                    self.sequence = np.loadtxt(StringIO(args))
+                    status = True
+                elif cmd == 'start':
+                    self.measure(cmd_id)
+                    while not self.status == 'idle':
+                        time.sleep(0.1)
+                    status = True
+                elif cmd == 'stop':
+                    self.stop()
+                    status = True
+                elif cmd == 'read_sequence':
+                    try:
+                        self.read_quad(args)
+                        status = True
+                    except Exception as e:
+                        self.exec_logger.warning(f'Unable to read sequence: {e}')
+                elif cmd == 'set_sequence':
+                    try:
+                        self.sequence = np.array(args)
+                        status = True
+                    except Exception as e:
+                        self.exec_logger.warning(f'Unable to set sequence: {e}')
+                elif cmd == 'rs_check':
+                    try:
+                        self.rs_check()
+                        status = True
+                    except Exception as e:
+                        print('error====', e)
+                        self.exec_logger.warning(f'Unable to run rs-check: {e}')
                 else:
-                    self.exec_logger.error(f'Unexpected error while process: {e}')
+                    self.exec_logger.warning(f'Unknown command {cmd} - cmd_id: {cmd_id}')
+        except Exception as e:
+            self.exec_logger.warning(f'Unable to decode command {command}: {e}')
+            status = False
+        finally:
+            reply = {'cmd_id': cmd_id, 'status': status}
+            reply = json.dumps(reply)
+            self.exec_logger.debug(f'Execution report: {reply}')
 
     def measure(self, cmd_id=None):
         """Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'.
@@ -726,6 +726,7 @@ class OhmPi(object):
         # self.run = True
         self.status = 'running'
         self.exec_logger.debug(f'Status: {self.status}')
+        self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
 
         def func():
             for g in range(0, self.settings["nbr_meas"]):  # for time-lapse monitoring
@@ -736,15 +737,22 @@ class OhmPi(object):
 
                 # create filename with timestamp
                 filename = self.settings["export_path"].replace('.csv',
-                                                               f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
+                                                                f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
                 self.exec_logger.debug(f'Saving to {filename}')
 
                 # make sure all multiplexer are off
                 self.reset_mux()
 
                 # measure all quadrupole of the sequence
-                for i in range(0, self.sequence.shape[0]):
-                    quad = self.sequence[i, :]  # quadrupole
+                if self.sequence is None:
+                    n = 1
+                else:
+                    n = self.sequence.shape[0]
+                for i in range(0, n):
+                    if self.sequence is None:
+                        quad = np.array([0, 0, 0, 0])
+                    else:
+                        quad = self.sequence[i, :]  # quadrupole
                     if self.status == 'stopping':
                         break
 
@@ -765,7 +773,7 @@ class OhmPi(object):
                     print(f'{acquired_data}')
                     # save data and print in a text file
                     self.append_and_save(filename, acquired_data)
-                    self.exec_logger.debug(f'{i+1:d}/{self.sequence.shape[0]:d}')
+                    self.exec_logger.debug(f'{i+1:d}/{n:d}')
 
                 # compute time needed to take measurement and subtract it from interval
                 # between two sequence run (= sequence_delay)
@@ -830,7 +838,4 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
 
 # for testing
 if __name__ == "__main__":
-    # start interface
-    Popen(['python', CONTROL_CONFIG['interface']])
-    print('done')
     ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
-- 
GitLab