Newer
Older
from http.server import SimpleHTTPRequestHandler, HTTPServer
import os
import json
Olivier Kaufmann
committed
from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
import pandas as pd
import shutil
Olivier Kaufmann
committed
import time
import threading
import paho.mqtt.client as mqtt_client
import paho.mqtt.publish as publish
hostName = "0.0.0.0" # for AP mode (not AP-STA)
serverPort = 8080
# https://gist.github.com/MichaelCurrie/19394abc19abd0de4473b595c0e37a3a
Olivier Kaufmann
committed
ctrl_broker = MQTT_CONTROL_CONFIG['hostname']
publisher_config = MQTT_CONTROL_CONFIG.copy()
publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic']
publisher_config.pop('ctrl_topic')
Olivier Kaufmann
committed
print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker."))
cmd_id = None
rdic = {}
Olivier Kaufmann
committed
class MyServer(SimpleHTTPRequestHandler):
# because we use SimpleHTTPRequestHandler, we do not need to implement
# the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to)
Olivier Kaufmann
committed
# def do_GET(self):
# # normal get for wepages (not so secure!)
# print(self.command)
# print(self.headers)
# print(self.request)
# self.send_response(200)
# self.send_header("Content-type", "text/html")
# self.end_headers()
# with open(os.path.join('.', self.path[1:]), 'r') as f:
# self.wfile.write(bytes(f.read(), "utf-8"))
Olivier Kaufmann
committed
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __init__(self, request, client_address, server):
super().__init__(request, client_address, server)
# set controller
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
try:
self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
trials = trials_max
broker_connected = True
except Exception as e:
print(f'Unable to connect control broker: {e}')
print('trying again to connect to control broker...')
time.sleep(2)
trials += 1
if broker_connected:
print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
else:
print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
self.controller = None
self.cmd_thread = threading.Thread(target=self._control)
def _control(self):
def on_message(client, userdata, message):
global cmd_id, rdic
command = message.payload.decode('utf-8')
print(f'Received command {command}')
# self.process_commands(command)
if 'reply' in command.keys and command['cmd_id'] == cmd_id :
rdic = command
self.controller.on_message = on_message
self.controller.loop_start()
while True:
time.sleep(.1)
Olivier Kaufmann
committed
def do_POST(self):
Olivier Kaufmann
committed
global cmd_id, rdic
Olivier Kaufmann
committed
# global socket
Olivier Kaufmann
committed
# global ohmpiThread, status, run
dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
Olivier Kaufmann
committed
rdic = {} # response dictionary
if dic['cmd'] == 'start':
Olivier Kaufmann
committed
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'start'})
publish.single(payload=payload, **publisher_config)
elif dic['cmd'] == 'stop':
Olivier Kaufmann
committed
# ohmpi.stop()
Olivier Kaufmann
committed
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'stop'})
publish.single(payload=payload, **publisher_config)
elif dic['cmd'] == 'getData':
# get all .csv file in data folder
fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv']
ddic = {}
for fname in fnames:
Olivier Kaufmann
committed
if (fname.replace('.csv', '') not in dic['surveyNames']
and fname != 'readme.txt'
and '_rs' not in fname):
df = pd.read_csv('data/' + fname)
ddic[fname.replace('.csv', '')] = {
'a': df['A'].tolist(),
'b': df['B'].tolist(),
'm': df['M'].tolist(),
'n': df['N'].tolist(),
'rho': df['R [ohm]'].tolist(),
}
rdic['data'] = ddic
elif dic['cmd'] == 'removeData':
shutil.rmtree('data')
os.mkdir('data')
elif dic['cmd'] == 'update_settings':
Olivier Kaufmann
committed
# ohmpi.stop()
if 'sequence' in dic['config'].keys and dic['config']['sequence'] is not None:
sequence = dic['config']['sequence']
dic['config'].pop('sequence', None)
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'set_sequence', 'args': sequence})
Olivier Kaufmann
committed
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'args': dic['config']})
cdic = dic['config']
Olivier Kaufmann
committed
publish.single(payload=payload, **publisher_config)
elif dic['cmd'] == 'invert':
elif dic['cmd'] == 'getResults':
elif dic['cmd'] == 'rsCheck':
Olivier Kaufmann
committed
# ohmpi.rs_check()
Olivier Kaufmann
committed
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'rs_check'})
publish.single(payload=payload, **publisher_config)
fnames = sorted([fname for fname in os.listdir('data/') if fname[-7:] == '_rs.csv'])
df = pd.read_csv('data/' + fnames[-1])
ddic = {
'AB': (df['A'].astype('str') + '-' + df['B'].astype(str)).tolist(),
'res': df['RS [kOhm]'].tolist()
}
rdic['data'] = ddic
elif dic['cmd'] == 'download':
shutil.make_archive('data', 'zip', 'data')
elif dic['cmd'] == 'shutdown':
print('shutting down...')
os.system('shutdown now -h')
elif dic['cmd'] == 'restart':
print('shutting down...')
Olivier Kaufmann
committed
os.system('reboot') # NOTE: on machine running the interface? or on rpi?
else:
# command not found
rdic['response'] = 'command not found'
Olivier Kaufmann
committed
# rdic['status'] = ohmpi.status
rdic['status'] = 'unknown' # socket_out.
Olivier Kaufmann
committed
Olivier Kaufmann
committed
#message = socket.recv()
#print('+++////', message)
# rdic['data'] = message.decode('utf-8')
"""
while False:
message = socket.recv()
print(f'Received command: {message}')
e = None
try:
Olivier Kaufmann
committed
decoded_message = json.loads(message))
cmd = decoded_message.pop('cmd', None)
args = decoded_message.pop('args', None)
status = False
e = None
if cmd is not None and cmd_id is decoded_message.pop('cmd_id', None):
print('reply=', decoded_message)
except Exception as e:
print(f'Unable to decode command {message}: {e}')
"""
self.send_response(200)
self.send_header('Content-Type', 'text/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(rdic), 'utf8'))
Olivier Kaufmann
committed
if __name__ == "__main__":
webServer = HTTPServer((hostName, serverPort), MyServer)
print("Server started http://%s:%s" % (hostName, serverPort))
try:
webServer.serve_forever()
except KeyboardInterrupt:
pass
webServer.server_close()
print("Server stopped.")