Commit b14c80fd authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Adds some improvements such as setting mqtt to false in case of unreacheable broker

Showing with 40 additions and 34 deletions
+40 -34
......@@ -52,10 +52,14 @@ def setup_loggers(mqtt=True):
mqtt_settings = MQTT_LOGGING_CONFIG.copy()
[mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic']]
mqtt_settings.update({'topic':MQTT_LOGGING_CONFIG['exec_topic']})
mqtt_exec_handler = MQTTHandler(**mqtt_settings)
mqtt_exec_handler.setLevel(EXEC_LOGGING_CONFIG['logging_level'])
mqtt_exec_handler.setFormatter(exec_formatter)
exec_logger.addHandler(mqtt_exec_handler)
# TODO: handle the case of MQTT broker down or temporarily unavailable
try:
mqtt_exec_handler = MQTTHandler(**mqtt_settings)
mqtt_exec_handler.setLevel(EXEC_LOGGING_CONFIG['logging_level'])
mqtt_exec_handler.setFormatter(exec_formatter)
exec_logger.addHandler(mqtt_exec_handler)
except:
mqtt = False
# Set data logging format and level
log_format = '%(asctime)-15s | %(process)d | %(levelname)s: %(message)s'
......
......@@ -7,7 +7,7 @@ import time
import uuid
client = mqtt.Client(f'ohmpi_{OHMPI_CONFIG["id"]}_controller', clean_session=False) # create new instance
print('connecting to broker')
print('connecting controller to broker')
client.connect(MQTT_CONTROL_CONFIG['hostname'])
client.loop_start()
publisher_config = MQTT_CONTROL_CONFIG.copy()
......
......@@ -20,7 +20,7 @@ def on_message(client, userdata, message):
mqtt_client = mqtt.Client(f'ohmpi_{OHMPI_CONFIG["id"]}_listener', clean_session=False) # create new instance
print('connecting to broker')
print('connecting command listener to broker')
mqtt_client.connect(MQTT_CONTROL_CONFIG['hostname'])
print('Subscribing to topic', MQTT_CONTROL_CONFIG['ctrl_topic'])
mqtt_client.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
......
......@@ -51,6 +51,7 @@ class MQTTHandler(logging.Handler):
cleanly.
"""
msg = self.format(record)
publish.single(self.topic, msg, self.qos, self.retain,
hostname=self.hostname, port=self.port,
client_id=self.client_id, keepalive=self.keepalive,
......
......@@ -744,8 +744,8 @@ class OhmPi(object):
self.switch_mux_on(quad)
# run a measurement
acquired_data = self.run_measurement(quad, self.settings["nb_stack"],
self.settings["injection_duration"])
acquired_data = self.run_measurement(quad, self.settings['nb_stack'],
self.settings['injection_duration'])
# switch mux off
self.switch_mux_off(quad)
......
......@@ -3,37 +3,38 @@ import time
import os
import json
import uuid
#from ohmpi import OhmPi
# from ohmpi import OhmPi
from config import CONTROL_CONFIG
from termcolor import colored
import threading
import pandas as pd
import shutil
import zmq # to write on TCP
import zmq # to write on TCP
#hostName = "raspberrypi.local" # works for AP-STA
#hostName = "192.168.50.1" # fixed IP in AP-STA mode
# hostName = "raspberrypi.local" # works for AP-STA
# hostName = "192.168.50.1" # fixed IP in AP-STA mode
hostName = "0.0.0.0" # for AP mode (not AP-STA)
serverPort = 8080
# https://gist.github.com/MichaelCurrie/19394abc19abd0de4473b595c0e37a3a
#with open('ohmpi_settings.json') as json_file:
# with open('ohmpi_settings.json') as json_file:
# pardict = json.load(json_file)
#ohmpi = OhmPi(pardict, sequence='dd.txt')
#ohmpi = OhmPi(pardict, sequence='dd16s0no8.txt')
# ohmpi = OhmPi(pardict, sequence='dd.txt')
# ohmpi = OhmPi(pardict, sequence='dd16s0no8.txt')
tcp_port = CONTROL_CONFIG['tcp_port']
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(f'tcp://localhost:{CONTROL_CONFIG["tcp_port"]}')
print(colored(f'Sending commands and listenning on tcp port {tcp_port}.'))
print(colored(f'Sending commands and listening on tcp port {tcp_port}.'))
class MyServer(SimpleHTTPRequestHandler):
class MyServer(SimpleHTTPRequestHandler):
# because we use SimpleHTTPRequestHandler, we do not need to implement
# the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to)
# def do_GET(self):
# # normal get for wepages (not so secure!)
# print(self.command)
......@@ -44,22 +45,22 @@ class MyServer(SimpleHTTPRequestHandler):
# self.end_headers()
# with open(os.path.join('.', self.path[1:]), 'r') as f:
# self.wfile.write(bytes(f.read(), "utf-8"))
def do_POST(self):
cmd_id = uuid.uuid4().hex
global socket
#global ohmpiThread, status, run
# global ohmpiThread, status, run
dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
rdic = {} # response dictionnary
rdic = {} # response dictionnary
if dic['command'] == 'start':
#ohmpi.measure()
# ohmpi.measure()
socket.send_string(json.dumps({
'cmd_id': cmd_id,
'command': 'start'
}))
elif dic['command'] == 'stop':
#ohmpi.stop()
# ohmpi.stop()
socket.send_string(json.dumps({
'cmd_id': cmd_id,
'command': 'stop'
......@@ -69,9 +70,9 @@ class MyServer(SimpleHTTPRequestHandler):
fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv']
ddic = {}
for fname in fnames:
if (fname.replace('.csv', '') not in dic['surveyNames']
and fname != 'readme.txt'
and '_rs' not in fname):
if (fname.replace('.csv', '') not in dic['surveyNames']
and fname != 'readme.txt'
and '_rs' not in fname):
df = pd.read_csv('data/' + fname)
ddic[fname.replace('.csv', '')] = {
'a': df['A'].tolist(),
......@@ -85,14 +86,14 @@ class MyServer(SimpleHTTPRequestHandler):
shutil.rmtree('data')
os.mkdir('data')
elif dic['command'] == 'update_settings':
#ohmpi.stop()
# ohmpi.stop()
socket.send_string(json.dumps({
'cmd_id': cmd_id,
'cmd': 'update_settings',
'args': dic['config']
}))
cdic = dic['config']
"""
ohmpi.pardict['nb_electrodes'] = int(cdic['nbElectrodes'])
ohmpi.pardict['injection_duration'] = float(cdic['injectionDuration'])
......@@ -111,7 +112,7 @@ class MyServer(SimpleHTTPRequestHandler):
elif dic['command'] == 'getResults':
pass
elif dic['command'] == 'rsCheck':
#ohmpi.rs_check()
# ohmpi.rs_check()
socket.send_string(json.dumps({
'cmd_id': cmd_id,
'cmd': 'rs_check'
......@@ -134,11 +135,11 @@ class MyServer(SimpleHTTPRequestHandler):
else:
# command not found
rdic['response'] = 'command not found'
#rdic['status'] = ohmpi.status
rdic['status'] = 'unknown' # socket_out.
# rdic['status'] = ohmpi.status
rdic['status'] = 'unknown' # socket_out.
# wait for reply
message = socket.recv()
print('+++////', message)
rdic['data'] = message
......@@ -164,7 +165,7 @@ class MyServer(SimpleHTTPRequestHandler):
self.wfile.write(bytes(json.dumps(rdic), 'utf8'))
if __name__ == "__main__":
if __name__ == "__main__":
webServer = HTTPServer((hostName, serverPort), MyServer)
print("Server started http://%s:%s" % (hostName, serverPort))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment