diff --git a/http_interface.py b/http_interface.py index 992118261ed9f05e07523b30fcd67bf17b2ac579..1bb87a0fbc24412fa80e314abe58668837ef4e63 100644 --- a/http_interface.py +++ b/http_interface.py @@ -2,24 +2,28 @@ from http.server import SimpleHTTPRequestHandler, HTTPServer import os import json import uuid -from config import CONTROL_CONFIG +from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG from termcolor import colored -import threading import pandas as pd import shutil -import zmq # to write on TCP +import time +import threading +import paho.mqtt.client as mqtt_client +import paho.mqtt.publish as publish hostName = "0.0.0.0" # for AP mode (not AP-STA) serverPort = 8080 # https://gist.github.com/MichaelCurrie/19394abc19abd0de4473b595c0e37a3a -tcp_port = CONTROL_CONFIG['tcp_port'] -context = zmq.Context() -socket = context.socket(zmq.REQ) -socket.connect(f'tcp://localhost:{CONTROL_CONFIG["tcp_port"]}') -print(colored(f'Sending commands and listening on tcp port {tcp_port}.')) +ctrl_broker = MQTT_CONTROL_CONFIG['hostname'] +publisher_config = MQTT_CONTROL_CONFIG.copy() +publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic'] +publisher_config.pop('ctrl_topic') +print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker.")) +cmd_id = None +rdic = {} class MyServer(SimpleHTTPRequestHandler): # because we use SimpleHTTPRequestHandler, we do not need to implement @@ -36,25 +40,64 @@ class MyServer(SimpleHTTPRequestHandler): # with open(os.path.join('.', self.path[1:]), 'r') as f: # self.wfile.write(bytes(f.read(), "utf-8")) + # def __init__(self): + # super().__init__(self) + # # set controller + # self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance + # print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) + # trials = 0 + # trials_max = 10 + # broker_connected = False + # while trials < trials_max: + # try: + # self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), + # MQTT_CONTROL_CONFIG['auth']['password']) + # self.controller.connect(MQTT_CONTROL_CONFIG['hostname']) + # trials = trials_max + # broker_connected = True + # except Exception as e: + # print(f'Unable to connect control broker: {e}') + # print('trying again to connect to control broker...') + # time.sleep(2) + # trials += 1 + # if broker_connected: + # print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") + # self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) + # else: + # print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") + # self.controller = None + # self.cmd_thread = threading.Thread(target=self._control) + # + # def _control(self): + # def on_message(client, userdata, message): + # global cmd_id, rdic + # + # command = message.payload.decode('utf-8') + # print(f'Received command {command}') + # # self.process_commands(command) + # if 'reply' in command.keys and command['cmd_id'] == cmd_id : + # rdic = command['reply'] + # + # self.controller.on_message = on_message + # self.controller.loop_start() + # while True: + # time.sleep(.1) + def do_POST(self): + global cmd_id, rdic cmd_id = uuid.uuid4().hex - global socket + # global socket # global ohmpiThread, status, run dic = json.loads(self.rfile.read(int(self.headers['Content-Length']))) - rdic = {} # response dictionnary + rdic = {} # response dictionary if dic['cmd'] == 'start': - #ohmpi.measure() - socket.send_string(json.dumps({ - 'cmd_id': cmd_id, - 'cmd': 'start' - })) + payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'start'}) + publish.single(payload=payload, **publisher_config) elif dic['cmd'] == 'stop': # ohmpi.stop() - socket.send_string(json.dumps({ - 'cmd_id': cmd_id, - 'cmd': 'stop' - })) + payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'stop'}) + publish.single(payload=payload, **publisher_config) elif dic['cmd'] == 'getData': # get all .csv file in data folder fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] @@ -77,36 +120,17 @@ class MyServer(SimpleHTTPRequestHandler): os.mkdir('data') elif dic['cmd'] == 'update_settings': # ohmpi.stop() - socket.send_string(json.dumps({ - 'cmd_id': cmd_id, - 'cmd': 'update_settings', - 'args': dic['config'] - })) + payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'args': dic['config']}) cdic = dic['config'] - - """ - ohmpi.pardict['nb_electrodes'] = int(cdic['nbElectrodes']) - ohmpi.pardict['injection_duration'] = float(cdic['injectionDuration']) - ohmpi.pardict['nbr_meas'] = int(cdic['nbMeasurements']) - ohmpi.pardict['nb_stack'] = int(cdic['nbStack']) - ohmpi.pardict['sequence_delay'] = int(cdic['sequenceDelay']) - if cdic['sequence'] != '': - with open('sequence.txt', 'w') as f: - f.write(cdic['sequence']) - ohmpi.read_quad('sequence.txt') - print('new sequence set.') - print('setConfig', ohmpi.pardict) - """ + publish.single(payload=payload, **publisher_config) elif dic['cmd'] == 'invert': pass elif dic['cmd'] == 'getResults': pass elif dic['cmd'] == 'rsCheck': # ohmpi.rs_check() - socket.send_string(json.dumps({ - 'cmd_id': cmd_id, - 'cmd': 'rs_check' - })) + payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'rs_check'}) + publish.single(payload=payload, **publisher_config) fnames = sorted([fname for fname in os.listdir('data/') if fname[-7:] == '_rs.csv']) df = pd.read_csv('data/' + fnames[-1]) ddic = { @@ -121,7 +145,7 @@ class MyServer(SimpleHTTPRequestHandler): os.system('shutdown now -h') elif dic['cmd'] == 'restart': print('shutting down...') - os.system('reboot') + os.system('reboot') # NOTE: on machine running the interface? or on rpi? else: # command not found rdic['response'] = 'command not found' @@ -130,16 +154,16 @@ class MyServer(SimpleHTTPRequestHandler): rdic['status'] = 'unknown' # socket_out. # wait for reply - message = socket.recv() - print('+++////', message) - rdic['data'] = message.decode('utf-8') + #message = socket.recv() + #print('+++////', message) + # rdic['data'] = message.decode('utf-8') """ while False: message = socket.recv() print(f'Received command: {message}') e = None try: - decoded_message = json.loads(message.decode('utf-8')) + decoded_message = json.loads(message)) cmd = decoded_message.pop('cmd', None) args = decoded_message.pop('args', None) status = False