From eeeed89d32043309a35fc39add817c29b4eeda7d Mon Sep 17 00:00:00 2001
From: Guillaume <sagitta1618@gmail.com>
Date: Sat, 29 Oct 2022 21:35:03 +0100
Subject: [PATCH] harmonizing MQTT with web interface

---
 config.py         |   4 +-
 http_interface.py |  80 +++++++++++++++++++++++++----------
 index.html        |  26 ++++++------
 ohmpi.py          | 105 ++++++++++++++++++++++++++++++----------------
 4 files changed, 143 insertions(+), 72 deletions(-)

diff --git a/config.py b/config.py
index 60db22e5..0f7beb5b 100644
--- a/config.py
+++ b/config.py
@@ -80,7 +80,7 @@ MQTT_CONTROL_CONFIG = {
     'hostname': mqtt_broker,
     'port': 1883,
     'qos': 2,
-    'retain': False,
+    'retain': True,
     'keepalive': 60,
     'will': None,
     'auth': { 'username': 'mqtt_user', 'password': 'mqtt_password' },
@@ -88,5 +88,5 @@ MQTT_CONTROL_CONFIG = {
     'protocol': MQTTv31,
     'transport': 'tcp',
     'client_id': f'{OHMPI_CONFIG["id"]}',
-    'ctrl_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/ctrl'
+    'ctrl_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/ctrl',
 }
diff --git a/http_interface.py b/http_interface.py
index 2d938b96..2710fa53 100644
--- a/http_interface.py
+++ b/http_interface.py
@@ -23,10 +23,11 @@ publisher_config.pop('ctrl_topic')
 
 print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker."))
 cmd_id = None
+received = False
 rdic = {}
 
 
-# set controller globally as __init__ seem to be called for each request
+# set controller globally as __init__ seem to be called for each request and so we subscribe again each time (=overhead)
 controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False)  # create new instance
 print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
 trials = 0
@@ -52,6 +53,26 @@ else:
     controller = None
 
 
+# start a listener for acknowledgement
+def _control():
+    def on_message(client, userdata, message):
+        global cmd_id, rdic, received
+
+        command = json.loads(message.payload.decode('utf-8'))
+        #print('++++', cmd_id, received, command)
+        if ('reply' in command.keys()) and (command['cmd_id'] == cmd_id):
+            print(f'Acknowledgement reception of command {command} by OhmPi')
+           # print('oooooooooook', command['reply'])
+            received = True
+            #rdic = command
+
+    controller.on_message = on_message
+    controller.loop_forever()
+    
+t = threading.Thread(target=_control)
+t.start()
+
+
 class MyServer(SimpleHTTPRequestHandler):
     # because we use SimpleHTTPRequestHandler, we do not need to implement
     # the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to)
@@ -69,39 +90,52 @@ class MyServer(SimpleHTTPRequestHandler):
 
     def __init__(self, request, client_address, server):
         super().__init__(request, client_address, server)
-        global controller
-        self.controller = controller
-        self.cmd_thread = threading.Thread(target=self._control)
-
-    def _control(self):
-        def on_message(client, userdata, message):
-            global cmd_id, rdic
-
-            command = message.payload.decode('utf-8')
-            print(f'Received command {command}')
-            # self.process_commands(command)
-            if 'reply' in command.keys and command['cmd_id'] == cmd_id :
-                rdic = command
-
-        self.controller.on_message = on_message
-        self.controller.loop_start()
-        while True:
-            time.sleep(.1)
+        # global controller, once  # using global variable otherwise, we subscribe to client for EACH request
+        # if once:
+        #     self.controller = controller
+        #     self.cmd_thread = threading.Thread(target=self._control)
+        #     self.cmd_thread.start()
+        #     once = False
+
+
+    # we would like to listen to the ackn topic to check our message has been wel received
+    # by the OhmPi, however, this won't work as it seems an instance of MyServer is created
+    # each time (actually it's not a server but a requestHandler)
+    # def _control(self):
+    #     def on_message(client, userdata, message):
+    #         global cmd_id, rdic
+
+    #         command = json.loads(message.payload.decode('utf-8'))
+    #         print(f'Acknowledgement reception of command {command} by OhmPi')
+    #         if 'reply' in command.keys() and command['cmd_id'] == cmd_id :
+    #             print('oooooooooook', command['reply'])
+    #             #rdic = command
+
+    #     self.controller.on_message = on_message
+    #     print('starting loop')
+    #     self.controller.loop_forever()
+    #     print('forever')
 
     def do_POST(self):
-        global cmd_id, rdic
+        global cmd_id, rdic, received
+        received = False
         cmd_id = uuid.uuid4().hex
-        # global socket
-
-        # global ohmpiThread, status, run
         dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
+        #print('++', dic, cmd_id)
         rdic = {} # response dictionary
         if dic['cmd'] == 'run_multiple_sequences':
             payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'})
+            #print('-- payload...', end='')
             publish.single(payload=payload, **publisher_config)
+            #print('published!')
+
         elif dic['cmd'] == 'interrupt':
             payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'})
+            #for i in range(10):
             publish.single(payload=payload, **publisher_config)
+            #    time.sleep(.5)
+                # if received:
+                #     break
         elif dic['cmd'] == 'getData':
             print(dic)
             # get all .csv file in data folder
diff --git a/index.html b/index.html
index f16bc60e..3292a067 100644
--- a/index.html
+++ b/index.html
@@ -26,7 +26,7 @@
         <div class="form-check">
             <input id="dataRetrievalCheck" class="form-check-input" type="checkbox" value="">
             <label class="form-check-label" for="dataRetrievalCheck">
-                Automaticaly get data every 1 secondStart
+                Automaticaly get data every 1 second
             </label>
         </div>
         <div id='output'>Status: idle</div>
@@ -421,18 +421,20 @@
                 // update list of quadrupoles if any
                 if (quads.length == 0) {
                     console.log('updating list of quadrupoles')
-                    let df = data[surveyNames[0]]
-                    let quadSelect = document.getElementById('quadSelect')
-                    quadSelect.innerHTML = ''
-                    for (let i = 0; i < df['a'].length; i++) {
-                        quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]]
-                        quads.push(quad)
-                        let option = document.createElement('option')
-                        option.value = quad.join(', ')
-                        option.innerText = quad.join(', ')
-                        quadSelect.appendChild(option)
+                    if (surveyNames.length > 0) {
+                        let df = data[surveyNames[0]]
+                        let quadSelect = document.getElementById('quadSelect')
+                        quadSelect.innerHTML = ''
+                        for (let i = 0; i < df['a'].length; i++) {
+                            quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]]
+                            quads.push(quad)
+                            let option = document.createElement('option')
+                            option.value = quad.join(', ')
+                            option.innerText = quad.join(', ')
+                            quadSelect.appendChild(option)
+                        }
+                        console.log('quads=', quads)
                     }
-                    console.log('quads=', quads)
                 }
 
                 // update time-serie figure
diff --git a/ohmpi.py b/ohmpi.py
index 2d99748c..80efeec8 100644
--- a/ohmpi.py
+++ b/ohmpi.py
@@ -19,7 +19,6 @@ from io import StringIO
 from datetime import datetime
 from termcolor import colored
 import threading
-import paho.mqtt.client as mqtt_client
 from logging_setup import setup_loggers
 from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
 
@@ -80,29 +79,55 @@ class OhmPi(object):
         print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue'))
 
         # set controller
-        self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False)  # create new instance
-        print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
-        trials = 0
-        trials_max = 10
-        broker_connected = False
-        while trials < trials_max:
-            try:
-                self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
-                                                MQTT_CONTROL_CONFIG['auth']['password'])
-                self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
-                trials = trials_max
-                broker_connected = True
-            except Exception as e:
-                self.exec_logger.debug(f'Unable to connect control broker: {e}')
-                self.exec_logger.info('trying again to connect to control broker...')
-                time.sleep(2)
-                trials += 1
-        if broker_connected:
-            self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
-            self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
-        else:
-            self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
-            self.controller = None
+        self.mqtt = False
+        self.cmd_id = None
+        if mqtt:
+            self.mqtt = True
+            import paho.mqtt.client as mqtt_client  # if we don't use MQTT but just Python API, we don't need to install it to start the ohmpi.py
+            import paho.mqtt.publish as publish
+            self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False)  # create new instance
+            print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
+            trials = 0
+            trials_max = 10
+            broker_connected = False
+            while trials < trials_max:
+                try:
+                    self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
+                                                    MQTT_CONTROL_CONFIG['auth']['password'])
+                    self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
+                    trials = trials_max
+                    broker_connected = True
+                except Exception as e:
+                    self.exec_logger.debug(f'Unable to connect control broker: {e}')
+                    self.exec_logger.info('trying again to connect to control broker...')
+                    time.sleep(2)
+                    trials += 1
+            if broker_connected:
+                self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
+                self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
+            
+                # MQTT does not ensure that the message is delivered to the subscribed client
+                # this is important for us as we want to ensure the command has been sent AND
+                # received. Hence, upon reception of a command, we publish a message acknowledgement
+                publisher_config = MQTT_CONTROL_CONFIG.copy()
+                publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic']
+                publisher_config.pop('ctrl_topic')
+                
+                def on_message(client, userdata, message):
+                    command = message.payload.decode('utf-8')
+                    dic = json.loads(command)
+                    if dic['cmd_id'] != self.cmd_id:
+                        self.cmd_id = dic['cmd_id']
+                        self.exec_logger.debug(f'Received command {command}')
+                        payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'})
+                        publish.single(payload=payload, **publisher_config)
+                        self._process_commands(command)
+
+                self.controller.on_message = on_message
+    
+            else:
+                self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
+                self.controller = None
 
         # read in hardware parameters (config.py)
         self._read_hardware_config()
@@ -164,10 +189,11 @@ class OhmPi(object):
             self.pin1.direction = Direction.OUTPUT
             self.pin1.value = False
 
-        # Starts the command processing thread
-        self.cmd_listen = True
-        self.cmd_thread = threading.Thread(target=self._control)
-        self.cmd_thread.start()
+        if False:
+            # Starts the command processing thread
+            self.cmd_listen = True
+            self.cmd_thread = threading.Thread(target=self._control)
+            self.cmd_thread.start()
 
     @property
     def sequence(self):
@@ -184,8 +210,9 @@ class OhmPi(object):
         else:
             self.use_mux = False
         self._sequence = sequence
-
-    def _control(self):
+    
+    def _control(self):  # ISSUE: somehow not ALL message are catch by this method in a thread
+        # this might be due to the thread... -> that means we can miss commands!
         def on_message(client, userdata, message):
             command = message.payload.decode('utf-8')
             self.exec_logger.debug(f'Received command {command}')
@@ -989,6 +1016,7 @@ class OhmPi(object):
         -------
 
         """
+        print('yyyy', command)
         try:
             cmd_id = None
             decoded_message = json.loads(command)
@@ -1008,15 +1036,18 @@ class OhmPi(object):
                     except Exception as e:
                         self.exec_logger.warning(f'Unable to set sequence: {e}')
                         status = False
-                elif cmd == 'run_sequence':
+                elif cmd == 'run_sequence': 
+                    self.run_sequence(cmd_id=cmd_id)
+                elif cmd == 'run_sequence_async':
                     self.run_sequence_async(cmd_id=cmd_id)
-                    while not self.status == 'idle':
-                        time.sleep(0.1)
+                    #while not self.status == 'idle':  # idem for async, we need to return immediately otherwise
+                    # the interrupt command cannot be processed
+                    #    time.sleep(0.1)
                     status = True
                 elif cmd == 'run_multiple_sequences':
                     self.run_multiple_sequences(cmd_id=cmd_id)
-                    while not self.status == 'idle':
-                        time.sleep(0.1)
+                    #while not self.status == 'idle':  # we cannot do that as it's supposed to be an asynchrone command
+                    #    time.sleep(0.1)
                     status = True
                 elif cmd == 'interrupt':
                     self.interrupt()
@@ -1200,3 +1231,7 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
 # for testing
 if __name__ == "__main__":
     ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
+    def func():
+        ohmpi.controller.loop_forever()
+    t = threading.Thread(target=func)
+    t.start()
-- 
GitLab