From b14c80fdc699b3063e2816cf263c7a1f9b4a8638 Mon Sep 17 00:00:00 2001 From: su530201 <olivier.kaufmann@umons.ac.be> Date: Wed, 26 Oct 2022 17:58:41 +0200 Subject: [PATCH] Adds some improvements such as setting mqtt to false in case of unreacheable broker --- logging_setup.py | 12 +++++++---- mqtt_controller.py | 2 +- mqtt_interface.py | 2 +- mqtt_logger.py | 1 + ohmpi.py | 4 ++-- webserver.py | 53 +++++++++++++++++++++++----------------------- 6 files changed, 40 insertions(+), 34 deletions(-) diff --git a/logging_setup.py b/logging_setup.py index 05963f27..2dc30f81 100644 --- a/logging_setup.py +++ b/logging_setup.py @@ -52,10 +52,14 @@ def setup_loggers(mqtt=True): mqtt_settings = MQTT_LOGGING_CONFIG.copy() [mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic']] mqtt_settings.update({'topic':MQTT_LOGGING_CONFIG['exec_topic']}) - mqtt_exec_handler = MQTTHandler(**mqtt_settings) - mqtt_exec_handler.setLevel(EXEC_LOGGING_CONFIG['logging_level']) - mqtt_exec_handler.setFormatter(exec_formatter) - exec_logger.addHandler(mqtt_exec_handler) + # TODO: handle the case of MQTT broker down or temporarily unavailable + try: + mqtt_exec_handler = MQTTHandler(**mqtt_settings) + mqtt_exec_handler.setLevel(EXEC_LOGGING_CONFIG['logging_level']) + mqtt_exec_handler.setFormatter(exec_formatter) + exec_logger.addHandler(mqtt_exec_handler) + except: + mqtt = False # Set data logging format and level log_format = '%(asctime)-15s | %(process)d | %(levelname)s: %(message)s' diff --git a/mqtt_controller.py b/mqtt_controller.py index 93abf662..dd2e6a5c 100644 --- a/mqtt_controller.py +++ b/mqtt_controller.py @@ -7,7 +7,7 @@ import time import uuid client = mqtt.Client(f'ohmpi_{OHMPI_CONFIG["id"]}_controller', clean_session=False) # create new instance -print('connecting to broker') +print('connecting controller to broker') client.connect(MQTT_CONTROL_CONFIG['hostname']) client.loop_start() publisher_config = MQTT_CONTROL_CONFIG.copy() diff --git a/mqtt_interface.py b/mqtt_interface.py index 916a964c..6b56e2b2 100644 --- a/mqtt_interface.py +++ b/mqtt_interface.py @@ -20,7 +20,7 @@ def on_message(client, userdata, message): mqtt_client = mqtt.Client(f'ohmpi_{OHMPI_CONFIG["id"]}_listener', clean_session=False) # create new instance -print('connecting to broker') +print('connecting command listener to broker') mqtt_client.connect(MQTT_CONTROL_CONFIG['hostname']) print('Subscribing to topic', MQTT_CONTROL_CONFIG['ctrl_topic']) mqtt_client.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) diff --git a/mqtt_logger.py b/mqtt_logger.py index 10cbbc9d..d20a7bb7 100644 --- a/mqtt_logger.py +++ b/mqtt_logger.py @@ -51,6 +51,7 @@ class MQTTHandler(logging.Handler): cleanly. """ msg = self.format(record) + publish.single(self.topic, msg, self.qos, self.retain, hostname=self.hostname, port=self.port, client_id=self.client_id, keepalive=self.keepalive, diff --git a/ohmpi.py b/ohmpi.py index d5e812b4..3ff716d3 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -744,8 +744,8 @@ class OhmPi(object): self.switch_mux_on(quad) # run a measurement - acquired_data = self.run_measurement(quad, self.settings["nb_stack"], - self.settings["injection_duration"]) + acquired_data = self.run_measurement(quad, self.settings['nb_stack'], + self.settings['injection_duration']) # switch mux off self.switch_mux_off(quad) diff --git a/webserver.py b/webserver.py index f9d5dabc..9a56b9b7 100644 --- a/webserver.py +++ b/webserver.py @@ -3,37 +3,38 @@ import time import os import json import uuid -#from ohmpi import OhmPi +# from ohmpi import OhmPi from config import CONTROL_CONFIG from termcolor import colored import threading import pandas as pd import shutil -import zmq # to write on TCP +import zmq # to write on TCP -#hostName = "raspberrypi.local" # works for AP-STA -#hostName = "192.168.50.1" # fixed IP in AP-STA mode +# hostName = "raspberrypi.local" # works for AP-STA +# hostName = "192.168.50.1" # fixed IP in AP-STA mode hostName = "0.0.0.0" # for AP mode (not AP-STA) serverPort = 8080 # https://gist.github.com/MichaelCurrie/19394abc19abd0de4473b595c0e37a3a -#with open('ohmpi_settings.json') as json_file: +# with open('ohmpi_settings.json') as json_file: # pardict = json.load(json_file) -#ohmpi = OhmPi(pardict, sequence='dd.txt') -#ohmpi = OhmPi(pardict, sequence='dd16s0no8.txt') +# ohmpi = OhmPi(pardict, sequence='dd.txt') +# ohmpi = OhmPi(pardict, sequence='dd16s0no8.txt') tcp_port = CONTROL_CONFIG['tcp_port'] context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect(f'tcp://localhost:{CONTROL_CONFIG["tcp_port"]}') -print(colored(f'Sending commands and listenning on tcp port {tcp_port}.')) +print(colored(f'Sending commands and listening on tcp port {tcp_port}.')) -class MyServer(SimpleHTTPRequestHandler): + +class MyServer(SimpleHTTPRequestHandler): # because we use SimpleHTTPRequestHandler, we do not need to implement # the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to) - + # def do_GET(self): # # normal get for wepages (not so secure!) # print(self.command) @@ -44,22 +45,22 @@ class MyServer(SimpleHTTPRequestHandler): # self.end_headers() # with open(os.path.join('.', self.path[1:]), 'r') as f: # self.wfile.write(bytes(f.read(), "utf-8")) - + def do_POST(self): cmd_id = uuid.uuid4().hex global socket - #global ohmpiThread, status, run + # global ohmpiThread, status, run dic = json.loads(self.rfile.read(int(self.headers['Content-Length']))) - rdic = {} # response dictionnary + rdic = {} # response dictionnary if dic['command'] == 'start': - #ohmpi.measure() + # ohmpi.measure() socket.send_string(json.dumps({ 'cmd_id': cmd_id, 'command': 'start' })) elif dic['command'] == 'stop': - #ohmpi.stop() + # ohmpi.stop() socket.send_string(json.dumps({ 'cmd_id': cmd_id, 'command': 'stop' @@ -69,9 +70,9 @@ class MyServer(SimpleHTTPRequestHandler): fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] ddic = {} for fname in fnames: - if (fname.replace('.csv', '') not in dic['surveyNames'] - and fname != 'readme.txt' - and '_rs' not in fname): + if (fname.replace('.csv', '') not in dic['surveyNames'] + and fname != 'readme.txt' + and '_rs' not in fname): df = pd.read_csv('data/' + fname) ddic[fname.replace('.csv', '')] = { 'a': df['A'].tolist(), @@ -85,14 +86,14 @@ class MyServer(SimpleHTTPRequestHandler): shutil.rmtree('data') os.mkdir('data') elif dic['command'] == 'update_settings': - #ohmpi.stop() + # ohmpi.stop() socket.send_string(json.dumps({ 'cmd_id': cmd_id, 'cmd': 'update_settings', 'args': dic['config'] })) cdic = dic['config'] - + """ ohmpi.pardict['nb_electrodes'] = int(cdic['nbElectrodes']) ohmpi.pardict['injection_duration'] = float(cdic['injectionDuration']) @@ -111,7 +112,7 @@ class MyServer(SimpleHTTPRequestHandler): elif dic['command'] == 'getResults': pass elif dic['command'] == 'rsCheck': - #ohmpi.rs_check() + # ohmpi.rs_check() socket.send_string(json.dumps({ 'cmd_id': cmd_id, 'cmd': 'rs_check' @@ -134,11 +135,11 @@ class MyServer(SimpleHTTPRequestHandler): else: # command not found rdic['response'] = 'command not found' - - #rdic['status'] = ohmpi.status - rdic['status'] = 'unknown' # socket_out. + + # rdic['status'] = ohmpi.status + rdic['status'] = 'unknown' # socket_out. # wait for reply - + message = socket.recv() print('+++////', message) rdic['data'] = message @@ -164,7 +165,7 @@ class MyServer(SimpleHTTPRequestHandler): self.wfile.write(bytes(json.dumps(rdic), 'utf8')) -if __name__ == "__main__": +if __name__ == "__main__": webServer = HTTPServer((hostName, serverPort), MyServer) print("Server started http://%s:%s" % (hostName, serverPort)) -- GitLab