Commit eeeed89d authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

harmonizing MQTT with web interface

Showing with 143 additions and 72 deletions
+143 -72
...@@ -80,7 +80,7 @@ MQTT_CONTROL_CONFIG = { ...@@ -80,7 +80,7 @@ MQTT_CONTROL_CONFIG = {
'hostname': mqtt_broker, 'hostname': mqtt_broker,
'port': 1883, 'port': 1883,
'qos': 2, 'qos': 2,
'retain': False, 'retain': True,
'keepalive': 60, 'keepalive': 60,
'will': None, 'will': None,
'auth': { 'username': 'mqtt_user', 'password': 'mqtt_password' }, 'auth': { 'username': 'mqtt_user', 'password': 'mqtt_password' },
...@@ -88,5 +88,5 @@ MQTT_CONTROL_CONFIG = { ...@@ -88,5 +88,5 @@ MQTT_CONTROL_CONFIG = {
'protocol': MQTTv31, 'protocol': MQTTv31,
'transport': 'tcp', 'transport': 'tcp',
'client_id': f'{OHMPI_CONFIG["id"]}', 'client_id': f'{OHMPI_CONFIG["id"]}',
'ctrl_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/ctrl' 'ctrl_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/ctrl',
} }
...@@ -23,10 +23,11 @@ publisher_config.pop('ctrl_topic') ...@@ -23,10 +23,11 @@ publisher_config.pop('ctrl_topic')
print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker.")) print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker."))
cmd_id = None cmd_id = None
received = False
rdic = {} rdic = {}
# set controller globally as __init__ seem to be called for each request # set controller globally as __init__ seem to be called for each request and so we subscribe again each time (=overhead)
controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0 trials = 0
...@@ -52,6 +53,26 @@ else: ...@@ -52,6 +53,26 @@ else:
controller = None controller = None
# start a listener for acknowledgement
def _control():
def on_message(client, userdata, message):
global cmd_id, rdic, received
command = json.loads(message.payload.decode('utf-8'))
#print('++++', cmd_id, received, command)
if ('reply' in command.keys()) and (command['cmd_id'] == cmd_id):
print(f'Acknowledgement reception of command {command} by OhmPi')
# print('oooooooooook', command['reply'])
received = True
#rdic = command
controller.on_message = on_message
controller.loop_forever()
t = threading.Thread(target=_control)
t.start()
class MyServer(SimpleHTTPRequestHandler): class MyServer(SimpleHTTPRequestHandler):
# because we use SimpleHTTPRequestHandler, we do not need to implement # because we use SimpleHTTPRequestHandler, we do not need to implement
# the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to) # the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to)
...@@ -69,39 +90,52 @@ class MyServer(SimpleHTTPRequestHandler): ...@@ -69,39 +90,52 @@ class MyServer(SimpleHTTPRequestHandler):
def __init__(self, request, client_address, server): def __init__(self, request, client_address, server):
super().__init__(request, client_address, server) super().__init__(request, client_address, server)
global controller # global controller, once # using global variable otherwise, we subscribe to client for EACH request
self.controller = controller # if once:
self.cmd_thread = threading.Thread(target=self._control) # self.controller = controller
# self.cmd_thread = threading.Thread(target=self._control)
def _control(self): # self.cmd_thread.start()
def on_message(client, userdata, message): # once = False
global cmd_id, rdic
command = message.payload.decode('utf-8') # we would like to listen to the ackn topic to check our message has been wel received
print(f'Received command {command}') # by the OhmPi, however, this won't work as it seems an instance of MyServer is created
# self.process_commands(command) # each time (actually it's not a server but a requestHandler)
if 'reply' in command.keys and command['cmd_id'] == cmd_id : # def _control(self):
rdic = command # def on_message(client, userdata, message):
# global cmd_id, rdic
self.controller.on_message = on_message
self.controller.loop_start() # command = json.loads(message.payload.decode('utf-8'))
while True: # print(f'Acknowledgement reception of command {command} by OhmPi')
time.sleep(.1) # if 'reply' in command.keys() and command['cmd_id'] == cmd_id :
# print('oooooooooook', command['reply'])
# #rdic = command
# self.controller.on_message = on_message
# print('starting loop')
# self.controller.loop_forever()
# print('forever')
def do_POST(self): def do_POST(self):
global cmd_id, rdic global cmd_id, rdic, received
received = False
cmd_id = uuid.uuid4().hex cmd_id = uuid.uuid4().hex
# global socket
# global ohmpiThread, status, run
dic = json.loads(self.rfile.read(int(self.headers['Content-Length']))) dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
#print('++', dic, cmd_id)
rdic = {} # response dictionary rdic = {} # response dictionary
if dic['cmd'] == 'run_multiple_sequences': if dic['cmd'] == 'run_multiple_sequences':
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'}) payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'})
#print('-- payload...', end='')
publish.single(payload=payload, **publisher_config) publish.single(payload=payload, **publisher_config)
#print('published!')
elif dic['cmd'] == 'interrupt': elif dic['cmd'] == 'interrupt':
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'}) payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'})
#for i in range(10):
publish.single(payload=payload, **publisher_config) publish.single(payload=payload, **publisher_config)
# time.sleep(.5)
# if received:
# break
elif dic['cmd'] == 'getData': elif dic['cmd'] == 'getData':
print(dic) print(dic)
# get all .csv file in data folder # get all .csv file in data folder
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
<div class="form-check"> <div class="form-check">
<input id="dataRetrievalCheck" class="form-check-input" type="checkbox" value=""> <input id="dataRetrievalCheck" class="form-check-input" type="checkbox" value="">
<label class="form-check-label" for="dataRetrievalCheck"> <label class="form-check-label" for="dataRetrievalCheck">
Automaticaly get data every 1 secondStart Automaticaly get data every 1 second
</label> </label>
</div> </div>
<div id='output'>Status: idle</div> <div id='output'>Status: idle</div>
...@@ -421,18 +421,20 @@ ...@@ -421,18 +421,20 @@
// update list of quadrupoles if any // update list of quadrupoles if any
if (quads.length == 0) { if (quads.length == 0) {
console.log('updating list of quadrupoles') console.log('updating list of quadrupoles')
let df = data[surveyNames[0]] if (surveyNames.length > 0) {
let quadSelect = document.getElementById('quadSelect') let df = data[surveyNames[0]]
quadSelect.innerHTML = '' let quadSelect = document.getElementById('quadSelect')
for (let i = 0; i < df['a'].length; i++) { quadSelect.innerHTML = ''
quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]] for (let i = 0; i < df['a'].length; i++) {
quads.push(quad) quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]]
let option = document.createElement('option') quads.push(quad)
option.value = quad.join(', ') let option = document.createElement('option')
option.innerText = quad.join(', ') option.value = quad.join(', ')
quadSelect.appendChild(option) option.innerText = quad.join(', ')
quadSelect.appendChild(option)
}
console.log('quads=', quads)
} }
console.log('quads=', quads)
} }
// update time-serie figure // update time-serie figure
......
...@@ -19,7 +19,6 @@ from io import StringIO ...@@ -19,7 +19,6 @@ from io import StringIO
from datetime import datetime from datetime import datetime
from termcolor import colored from termcolor import colored
import threading import threading
import paho.mqtt.client as mqtt_client
from logging_setup import setup_loggers from logging_setup import setup_loggers
from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
...@@ -80,29 +79,55 @@ class OhmPi(object): ...@@ -80,29 +79,55 @@ class OhmPi(object):
print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue')) print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue'))
# set controller # set controller
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance self.mqtt = False
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) self.cmd_id = None
trials = 0 if mqtt:
trials_max = 10 self.mqtt = True
broker_connected = False import paho.mqtt.client as mqtt_client # if we don't use MQTT but just Python API, we don't need to install it to start the ohmpi.py
while trials < trials_max: import paho.mqtt.publish as publish
try: self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
MQTT_CONTROL_CONFIG['auth']['password']) trials = 0
self.controller.connect(MQTT_CONTROL_CONFIG['hostname']) trials_max = 10
trials = trials_max broker_connected = False
broker_connected = True while trials < trials_max:
except Exception as e: try:
self.exec_logger.debug(f'Unable to connect control broker: {e}') self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
self.exec_logger.info('trying again to connect to control broker...') MQTT_CONTROL_CONFIG['auth']['password'])
time.sleep(2) self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
trials += 1 trials = trials_max
if broker_connected: broker_connected = True
self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") except Exception as e:
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) self.exec_logger.debug(f'Unable to connect control broker: {e}')
else: self.exec_logger.info('trying again to connect to control broker...')
self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") time.sleep(2)
self.controller = None trials += 1
if broker_connected:
self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
# MQTT does not ensure that the message is delivered to the subscribed client
# this is important for us as we want to ensure the command has been sent AND
# received. Hence, upon reception of a command, we publish a message acknowledgement
publisher_config = MQTT_CONTROL_CONFIG.copy()
publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic']
publisher_config.pop('ctrl_topic')
def on_message(client, userdata, message):
command = message.payload.decode('utf-8')
dic = json.loads(command)
if dic['cmd_id'] != self.cmd_id:
self.cmd_id = dic['cmd_id']
self.exec_logger.debug(f'Received command {command}')
payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'})
publish.single(payload=payload, **publisher_config)
self._process_commands(command)
self.controller.on_message = on_message
else:
self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
self.controller = None
# read in hardware parameters (config.py) # read in hardware parameters (config.py)
self._read_hardware_config() self._read_hardware_config()
...@@ -164,10 +189,11 @@ class OhmPi(object): ...@@ -164,10 +189,11 @@ class OhmPi(object):
self.pin1.direction = Direction.OUTPUT self.pin1.direction = Direction.OUTPUT
self.pin1.value = False self.pin1.value = False
# Starts the command processing thread if False:
self.cmd_listen = True # Starts the command processing thread
self.cmd_thread = threading.Thread(target=self._control) self.cmd_listen = True
self.cmd_thread.start() self.cmd_thread = threading.Thread(target=self._control)
self.cmd_thread.start()
@property @property
def sequence(self): def sequence(self):
...@@ -184,8 +210,9 @@ class OhmPi(object): ...@@ -184,8 +210,9 @@ class OhmPi(object):
else: else:
self.use_mux = False self.use_mux = False
self._sequence = sequence self._sequence = sequence
def _control(self): def _control(self): # ISSUE: somehow not ALL message are catch by this method in a thread
# this might be due to the thread... -> that means we can miss commands!
def on_message(client, userdata, message): def on_message(client, userdata, message):
command = message.payload.decode('utf-8') command = message.payload.decode('utf-8')
self.exec_logger.debug(f'Received command {command}') self.exec_logger.debug(f'Received command {command}')
...@@ -989,6 +1016,7 @@ class OhmPi(object): ...@@ -989,6 +1016,7 @@ class OhmPi(object):
------- -------
""" """
print('yyyy', command)
try: try:
cmd_id = None cmd_id = None
decoded_message = json.loads(command) decoded_message = json.loads(command)
...@@ -1008,15 +1036,18 @@ class OhmPi(object): ...@@ -1008,15 +1036,18 @@ class OhmPi(object):
except Exception as e: except Exception as e:
self.exec_logger.warning(f'Unable to set sequence: {e}') self.exec_logger.warning(f'Unable to set sequence: {e}')
status = False status = False
elif cmd == 'run_sequence': elif cmd == 'run_sequence':
self.run_sequence(cmd_id=cmd_id)
elif cmd == 'run_sequence_async':
self.run_sequence_async(cmd_id=cmd_id) self.run_sequence_async(cmd_id=cmd_id)
while not self.status == 'idle': #while not self.status == 'idle': # idem for async, we need to return immediately otherwise
time.sleep(0.1) # the interrupt command cannot be processed
# time.sleep(0.1)
status = True status = True
elif cmd == 'run_multiple_sequences': elif cmd == 'run_multiple_sequences':
self.run_multiple_sequences(cmd_id=cmd_id) self.run_multiple_sequences(cmd_id=cmd_id)
while not self.status == 'idle': #while not self.status == 'idle': # we cannot do that as it's supposed to be an asynchrone command
time.sleep(0.1) # time.sleep(0.1)
status = True status = True
elif cmd == 'interrupt': elif cmd == 'interrupt':
self.interrupt() self.interrupt()
...@@ -1200,3 +1231,7 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S")) ...@@ -1200,3 +1231,7 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
# for testing # for testing
if __name__ == "__main__": if __name__ == "__main__":
ohmpi = OhmPi(settings=OHMPI_CONFIG['settings']) ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
def func():
ohmpi.controller.loop_forever()
t = threading.Thread(target=func)
t.start()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment