Commit 9e6d8fc3 authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

instantiated mqtt in http interface globally

Showing with 34 additions and 28 deletions
+34 -28
......@@ -25,6 +25,33 @@ print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic'
cmd_id = None
rdic = {}
# set controller globally as __init__ seem to be called for each request
controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
try:
controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
controller.connect(MQTT_CONTROL_CONFIG['hostname'])
trials = trials_max
broker_connected = True
except Exception as e:
print(f'Unable to connect control broker: {e}')
print('trying again to connect to control broker...')
time.sleep(2)
trials += 1
if broker_connected:
print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
else:
print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
controller = None
class MyServer(SimpleHTTPRequestHandler):
# because we use SimpleHTTPRequestHandler, we do not need to implement
# the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to)
......@@ -42,30 +69,8 @@ class MyServer(SimpleHTTPRequestHandler):
def __init__(self, request, client_address, server):
super().__init__(request, client_address, server)
# set controller
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
try:
self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
trials = trials_max
broker_connected = True
except Exception as e:
print(f'Unable to connect control broker: {e}')
print('trying again to connect to control broker...')
time.sleep(2)
trials += 1
if broker_connected:
print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
else:
print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
self.controller = None
global controller
self.controller = controller
self.cmd_thread = threading.Thread(target=self._control)
def _control(self):
......@@ -91,7 +96,7 @@ class MyServer(SimpleHTTPRequestHandler):
# global ohmpiThread, status, run
dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
rdic = {} # response dictionary
if dic['cmd'] == 'run_sequence':
if dic['cmd'] == 'run_multiple_sequences':
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'})
publish.single(payload=payload, **publisher_config)
elif dic['cmd'] == 'interrupt':
......
......@@ -1009,12 +1009,12 @@ class OhmPi(object):
self.exec_logger.warning(f'Unable to set sequence: {e}')
status = False
elif cmd == 'run_sequence':
self.run_sequence_async(cmd_id)
self.run_sequence_async(cmd_id=cmd_id)
while not self.status == 'idle':
time.sleep(0.1)
status = True
elif cmd == 'run_multiple_sequences':
self.run_multiple_sequences(cmd_id)
self.run_multiple_sequences(cmd_id=cmd_id)
while not self.status == 'idle':
time.sleep(0.1)
status = True
......@@ -1127,6 +1127,7 @@ class OhmPi(object):
# self.run = True
if sequence_delay == None:
sequence_delay = self.settings['sequence_delay']
sequence_delay = int(sequence_delay)
self.status = 'running'
def func():
for g in range(0, self.settings["nb_meas"]): # for time-lapse monitoring
......@@ -1143,7 +1144,6 @@ class OhmPi(object):
if self.settings["nb_meas"] > 1:
time.sleep(dt) # waiting for next measurement (time-lapse)
self.status = 'idle'
self.thread = threading.Thread(target=func)
self.thread.start()
......@@ -1155,6 +1155,7 @@ class OhmPi(object):
""" Interrupt the acquisition. """
self.status = 'stopping'
if self.thread is not None:
print('joining thread')
self.thread.join()
self.exec_logger.debug(f'Status: {self.status}')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment