Commit 31fd8e0e authored by Arnaud WATLET's avatar Arnaud WATLET
Browse files

fixes merging issues on http_interface.py

Showing with 72 additions and 39 deletions
+72 -39
from http.server import SimpleHTTPRequestHandler, HTTPServer
rom http.server import SimpleHTTPRequestHandler, HTTPServer
import os
import json
import uuid
from config import CONTROL_CONFIG
from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
from termcolor import colored
import threading
import pandas as pd
import shutil
import zmq # to write on TCP
import time
import threading
import paho.mqtt.client as mqtt_client
import paho.mqtt.publish as publish
hostName = "0.0.0.0" # for AP mode (not AP-STA)
serverPort = 8080
# https://gist.github.com/MichaelCurrie/19394abc19abd0de4473b595c0e37a3a
tcp_port = CONTROL_CONFIG['tcp_port']
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(f'tcp://localhost:{CONTROL_CONFIG["tcp_port"]}')
print(colored(f'Sending commands and listening on tcp port {tcp_port}.'))
ctrl_broker = MQTT_CONTROL_CONFIG['hostname']
publisher_config = MQTT_CONTROL_CONFIG.copy()
publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic']
publisher_config.pop('ctrl_topic')
print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker."))
cmd_id = None
rdic = {}
class MyServer(SimpleHTTPRequestHandler):
# because we use SimpleHTTPRequestHandler, we do not need to implement
......@@ -37,9 +40,53 @@ class MyServer(SimpleHTTPRequestHandler):
# with open(os.path.join('.', self.path[1:]), 'r') as f:
# self.wfile.write(bytes(f.read(), "utf-8"))
def __init__(self, request, client_address, server):
super().__init__(request, client_address, server)
# set controller
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
try:
self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
trials = trials_max
broker_connected = True
except Exception as e:
print(f'Unable to connect control broker: {e}')
print('trying again to connect to control broker...')
time.sleep(2)
trials += 1
if broker_connected:
print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
else:
print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
self.controller = None
self.cmd_thread = threading.Thread(target=self._control)
def _control(self):
def on_message(client, userdata, message):
global cmd_id, rdic
command = message.payload.decode('utf-8')
print(f'Received command {command}')
# self.process_commands(command)
if 'reply' in command.keys and command['cmd_id'] == cmd_id :
rdic = command
self.controller.on_message = on_message
self.controller.loop_start()
while True:
time.sleep(.1)
def do_POST(self):
global cmd_id, rdic
cmd_id = uuid.uuid4().hex
global socket
# global socket
# global ohmpiThread, status, run
dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
......@@ -73,36 +120,22 @@ class MyServer(SimpleHTTPRequestHandler):
os.mkdir('data')
elif dic['cmd'] == 'update_settings':
# ohmpi.stop()
socket.send_string(json.dumps({
'cmd_id': cmd_id,
'cmd': 'update_settings',
'args': dic['config']
}))
if 'sequence' in dic['config'].keys() and dic['config']['sequence'] is not None:
sequence = dic['config']['sequence']
dic['config'].pop('sequence', None)
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'set_sequence', 'args': sequence})
publish.single(payload=payload, **publisher_config)
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'args': dic['config']})
cdic = dic['config']
"""
ohmpi.pardict['nb_electrodes'] = int(cdic['nbElectrodes'])
ohmpi.pardict['injection_duration'] = float(cdic['injectionDuration'])
ohmpi.pardict['nbr_meas'] = int(cdic['nbMeasurements'])
ohmpi.pardict['nb_stack'] = int(cdic['nbStack'])
ohmpi.pardict['sequence_delay'] = int(cdic['sequenceDelay'])
if cdic['sequence'] != '':
with open('sequence.txt', 'w') as f:
f.write(cdic['sequence'])
ohmpi.read_quad('sequence.txt')
print('new sequence set.')
print('setConfig', ohmpi.pardict)
"""
publish.single(payload=payload, **publisher_config)
elif dic['cmd'] == 'invert':
pass
elif dic['cmd'] == 'getResults':
pass
elif dic['cmd'] == 'rsCheck':
# ohmpi.rs_check()
socket.send_string(json.dumps({
'cmd_id': cmd_id,
'cmd': 'rs_check'
}))
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'rs_check'})
publish.single(payload=payload, **publisher_config)
fnames = sorted([fname for fname in os.listdir('data/') if fname[-7:] == '_rs.csv'])
df = pd.read_csv('data/' + fnames[-1])
ddic = {
......@@ -117,7 +150,7 @@ class MyServer(SimpleHTTPRequestHandler):
os.system('shutdown now -h')
elif dic['cmd'] == 'restart':
print('shutting down...')
os.system('reboot')
os.system('reboot') # NOTE: on machine running the interface? or on rpi?
else:
# command not found
rdic['response'] = 'command not found'
......@@ -126,16 +159,16 @@ class MyServer(SimpleHTTPRequestHandler):
rdic['status'] = 'unknown' # socket_out.
# wait for reply
message = socket.recv()
print('+++////', message)
rdic['data'] = message.decode('utf-8')
#message = socket.recv()
#print('+++////', message)
# rdic['data'] = message.decode('utf-8')
"""
while False:
message = socket.recv()
print(f'Received command: {message}')
e = None
try:
decoded_message = json.loads(message.decode('utf-8'))
decoded_message = json.loads(message))
cmd = decoded_message.pop('cmd', None)
args = decoded_message.pop('args', None)
status = False
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment