http_interface.py 9.41 KiB
from http.server import SimpleHTTPRequestHandler, HTTPServer
import os
import json
import uuid
from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
from termcolor import colored
import pandas as pd
import shutil
import time
import numpy as np
from io import StringIO
import threading
import paho.mqtt.client as mqtt_client
import paho.mqtt.publish as publish

hostName = "0.0.0.0"  # for AP mode (not AP-STA)
serverPort = 8080

# https://gist.github.com/MichaelCurrie/19394abc19abd0de4473b595c0e37a3a

ctrl_broker = MQTT_CONTROL_CONFIG['hostname']
publisher_config = MQTT_CONTROL_CONFIG.copy()
publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic']
publisher_config.pop('ctrl_topic')

print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker."))
cmd_id = None
received = False
rdic = {}


# set controller globally as __init__ seem to be called for each request and so we subscribe again each time (=overhead)
controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_interface_http", clean_session=False)  # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
    try:
        controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
                                        MQTT_CONTROL_CONFIG['auth']['password'])
        controller.connect(MQTT_CONTROL_CONFIG['hostname'])
        trials = trials_max
        broker_connected = True
    except Exception as e:
        print(f'Unable to connect control broker: {e}')
        print('trying again to connect to control broker...')
        time.sleep(2)
        trials += 1
if broker_connected:
    print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
    controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
else:
    print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
    controller = None


# start a listener for acknowledgement
def _control():
    def on_message(client, userdata, message):
        global cmd_id, rdic, received

        command = json.loads(message.payload.decode('utf-8'))
        #print('++++', cmd_id, received, command)
        if ('reply' in command.keys()) and (command['cmd_id'] == cmd_id):
            print(f'Acknowledgement reception of command {command} by OhmPi')
           # print('oooooooooook', command['reply'])
            received = True
            #rdic = command

    controller.on_message = on_message
    controller.loop_forever()
    
t = threading.Thread(target=_control)
t.start()


class MyServer(SimpleHTTPRequestHandler):
    # because we use SimpleHTTPRequestHandler, we do not need to implement
    # the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to)

    # def do_GET(self):
    #     # normal get for wepages (not so secure!)
    #     print(self.command)
    #     print(self.headers)
    #     print(self.request)
    #     self.send_response(200)
    #     self.send_header("Content-type", "text/html")
    #     self.end_headers()
    #     with open(os.path.join('.', self.path[1:]), 'r') as f:
    #         self.wfile.write(bytes(f.read(), "utf-8"))

    def __init__(self, request, client_address, server):
        super().__init__(request, client_address, server)
        # global controller, once  # using global variable otherwise, we subscribe to client for EACH request
        # if once:
        #     self.controller = controller
        #     self.cmd_thread = threading.Thread(target=self._control)
        #     self.cmd_thread.start()
        #     once = False


    # we would like to listen to the ackn topic to check our message has been wel received
    # by the OhmPi, however, this won't work as it seems an instance of MyServer is created
    # each time (actually it's not a server but a requestHandler)
    # def _control(self):
    #     def on_message(client, userdata, message):
    #         global cmd_id, rdic

    #         command = json.loads(message.payload.decode('utf-8'))
    #         print(f'Acknowledgement reception of command {command} by OhmPi')
    #         if 'reply' in command.keys() and command['cmd_id'] == cmd_id :
    #             print('oooooooooook', command['reply'])
    #             #rdic = command

    #     self.controller.on_message = on_message
    #     print('starting loop')
    #     self.controller.loop_forever()
    #     print('forever')

    def do_POST(self):
        global cmd_id, rdic, received
        received = False
        cmd_id = uuid.uuid4().hex
        dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
        rdic = {} # response dictionary
        if dic['cmd'] == 'run_multiple_sequences':
            payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'})
            publish.single(payload=payload, **publisher_config)
        elif dic['cmd'] == 'interrupt':
            payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'})
            publish.single(payload=payload, **publisher_config)
        elif dic['cmd'] == 'getData':
            # get all .csv file in data folder
            fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv']
            ddic = {}
            for fname in fnames:
                if ((fname != 'readme.txt')
                    and ('_rs' not in fname)
                    and (fname.replace('.csv', '') not in dic['surveyNames'])):
                    df = pd.read_csv('data/' + fname)
                    ddic[fname.replace('.csv', '')] = {
                        'a': df['A'].tolist(),
                        'b': df['B'].tolist(),
                        'm': df['M'].tolist(),
                        'n': df['N'].tolist(),
                        'rho': df['R [ohm]'].tolist(),
                    }
            rdic['data'] = ddic
        elif dic['cmd'] == 'removeData':
            shutil.rmtree('data')
            os.mkdir('data')
        elif dic['cmd'] == 'update_settings':
            if 'sequence' in dic['config'].keys() and dic['config']['sequence'] is not None:
                sequence = dic['config'].pop('sequence', None)
                sequence = np.loadtxt(StringIO(sequence)).astype(int).tolist()  # list of list
                # we pass the sequence as a list of list as this object is easier to parse for the json.loads()
                # of ohmpi._process_commands()
                payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'set_sequence', 'kwargs': {'sequence': sequence}})
                print('payload ===', payload)
                publish.single(payload=payload, **publisher_config)
            payload = json.dumps({'cmd_id': cmd_id + '_settings', 'cmd': 'update_settings', 'kwargs': {'config': dic['config']}})
            cdic = dic['config']
            publish.single(payload=payload, **publisher_config)
        elif dic['cmd'] == 'invert':
            pass
        elif dic['cmd'] == 'getResults':
            pass
        elif dic['cmd'] == 'rsCheck':
            payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'rs_check'})
            publish.single(payload=payload, **publisher_config)

        elif dic['cmd'] == 'getRsCheck':
            fnames = sorted([fname for fname in os.listdir('data/') if fname[-7:] == '_rs.csv'])
            if len(fnames) > 0:
                df = pd.read_csv('data/' + fnames[-1])
                ddic = {
                    'AB': (df['A'].astype('str') + '-' + df['B'].astype(str)).tolist(),
                    'res': df['RS [kOhm]'].tolist()
                }
            else:
                ddic = {}
            rdic['data'] = ddic
        elif dic['cmd'] == 'download':
            shutil.make_archive('data', 'zip', 'data')
        elif dic['cmd'] == 'shutdown':
            print('shutting down...')
            os.system('shutdown now -h')
        elif dic['cmd'] == 'restart':
            print('shutting down...')
            os.system('reboot')  # NOTE: on machine running the interface?  or on rpi?
        else:
            # command not found
            rdic['response'] = 'command not found'

        # rdic['status'] = ohmpi.status
        rdic['status'] = 'unknown'  # socket_out.
        # wait for reply

        #message = socket.recv()
        #print('+++////', message)
        # rdic['data'] = message.decode('utf-8')
        """
        while False:
            message = socket.recv()
            print(f'Received command: {message}')
            e = None
            try:
                decoded_message = json.loads(message))
                cmd = decoded_message.pop('cmd', None)
                args = decoded_message.pop('args', None)
                status = False
                e = None
                if cmd is not None and cmd_id is decoded_message.pop('cmd_id', None):
                    print('reply=', decoded_message)
            except Exception as e:
                print(f'Unable to decode command {message}: {e}')
            """
        self.send_response(200)
        self.send_header('Content-Type', 'text/json')
        self.end_headers()
        self.wfile.write(bytes(json.dumps(rdic), 'utf8'))


if __name__ == "__main__":
    webServer = HTTPServer((hostName, serverPort), MyServer)
    print("Server started http://%s:%s" % (hostName, serverPort))

    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        pass

    webServer.server_close()
    print("Server stopped.")