Commit 1564831d authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Replaces tcp by mqtt to send commands in http_interface. Replies are not received yet

Showing with 71 additions and 47 deletions
+71 -47
...@@ -2,24 +2,28 @@ from http.server import SimpleHTTPRequestHandler, HTTPServer ...@@ -2,24 +2,28 @@ from http.server import SimpleHTTPRequestHandler, HTTPServer
import os import os
import json import json
import uuid import uuid
from config import CONTROL_CONFIG from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
from termcolor import colored from termcolor import colored
import threading
import pandas as pd import pandas as pd
import shutil import shutil
import zmq # to write on TCP import time
import threading
import paho.mqtt.client as mqtt_client
import paho.mqtt.publish as publish
hostName = "0.0.0.0" # for AP mode (not AP-STA) hostName = "0.0.0.0" # for AP mode (not AP-STA)
serverPort = 8080 serverPort = 8080
# https://gist.github.com/MichaelCurrie/19394abc19abd0de4473b595c0e37a3a # https://gist.github.com/MichaelCurrie/19394abc19abd0de4473b595c0e37a3a
tcp_port = CONTROL_CONFIG['tcp_port'] ctrl_broker = MQTT_CONTROL_CONFIG['hostname']
context = zmq.Context() publisher_config = MQTT_CONTROL_CONFIG.copy()
socket = context.socket(zmq.REQ) publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic']
socket.connect(f'tcp://localhost:{CONTROL_CONFIG["tcp_port"]}') publisher_config.pop('ctrl_topic')
print(colored(f'Sending commands and listening on tcp port {tcp_port}.'))
print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker."))
cmd_id = None
rdic = {}
class MyServer(SimpleHTTPRequestHandler): class MyServer(SimpleHTTPRequestHandler):
# because we use SimpleHTTPRequestHandler, we do not need to implement # because we use SimpleHTTPRequestHandler, we do not need to implement
...@@ -36,25 +40,64 @@ class MyServer(SimpleHTTPRequestHandler): ...@@ -36,25 +40,64 @@ class MyServer(SimpleHTTPRequestHandler):
# with open(os.path.join('.', self.path[1:]), 'r') as f: # with open(os.path.join('.', self.path[1:]), 'r') as f:
# self.wfile.write(bytes(f.read(), "utf-8")) # self.wfile.write(bytes(f.read(), "utf-8"))
# def __init__(self):
# super().__init__(self)
# # set controller
# self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
# print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
# trials = 0
# trials_max = 10
# broker_connected = False
# while trials < trials_max:
# try:
# self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
# MQTT_CONTROL_CONFIG['auth']['password'])
# self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
# trials = trials_max
# broker_connected = True
# except Exception as e:
# print(f'Unable to connect control broker: {e}')
# print('trying again to connect to control broker...')
# time.sleep(2)
# trials += 1
# if broker_connected:
# print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
# self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
# else:
# print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
# self.controller = None
# self.cmd_thread = threading.Thread(target=self._control)
#
# def _control(self):
# def on_message(client, userdata, message):
# global cmd_id, rdic
#
# command = message.payload.decode('utf-8')
# print(f'Received command {command}')
# # self.process_commands(command)
# if 'reply' in command.keys and command['cmd_id'] == cmd_id :
# rdic = command['reply']
#
# self.controller.on_message = on_message
# self.controller.loop_start()
# while True:
# time.sleep(.1)
def do_POST(self): def do_POST(self):
global cmd_id, rdic
cmd_id = uuid.uuid4().hex cmd_id = uuid.uuid4().hex
global socket # global socket
# global ohmpiThread, status, run # global ohmpiThread, status, run
dic = json.loads(self.rfile.read(int(self.headers['Content-Length']))) dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
rdic = {} # response dictionnary rdic = {} # response dictionary
if dic['cmd'] == 'start': if dic['cmd'] == 'start':
#ohmpi.measure() payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'start'})
socket.send_string(json.dumps({ publish.single(payload=payload, **publisher_config)
'cmd_id': cmd_id,
'cmd': 'start'
}))
elif dic['cmd'] == 'stop': elif dic['cmd'] == 'stop':
# ohmpi.stop() # ohmpi.stop()
socket.send_string(json.dumps({ payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'stop'})
'cmd_id': cmd_id, publish.single(payload=payload, **publisher_config)
'cmd': 'stop'
}))
elif dic['cmd'] == 'getData': elif dic['cmd'] == 'getData':
# get all .csv file in data folder # get all .csv file in data folder
fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv']
...@@ -77,36 +120,17 @@ class MyServer(SimpleHTTPRequestHandler): ...@@ -77,36 +120,17 @@ class MyServer(SimpleHTTPRequestHandler):
os.mkdir('data') os.mkdir('data')
elif dic['cmd'] == 'update_settings': elif dic['cmd'] == 'update_settings':
# ohmpi.stop() # ohmpi.stop()
socket.send_string(json.dumps({ payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'args': dic['config']})
'cmd_id': cmd_id,
'cmd': 'update_settings',
'args': dic['config']
}))
cdic = dic['config'] cdic = dic['config']
publish.single(payload=payload, **publisher_config)
"""
ohmpi.pardict['nb_electrodes'] = int(cdic['nbElectrodes'])
ohmpi.pardict['injection_duration'] = float(cdic['injectionDuration'])
ohmpi.pardict['nbr_meas'] = int(cdic['nbMeasurements'])
ohmpi.pardict['nb_stack'] = int(cdic['nbStack'])
ohmpi.pardict['sequence_delay'] = int(cdic['sequenceDelay'])
if cdic['sequence'] != '':
with open('sequence.txt', 'w') as f:
f.write(cdic['sequence'])
ohmpi.read_quad('sequence.txt')
print('new sequence set.')
print('setConfig', ohmpi.pardict)
"""
elif dic['cmd'] == 'invert': elif dic['cmd'] == 'invert':
pass pass
elif dic['cmd'] == 'getResults': elif dic['cmd'] == 'getResults':
pass pass
elif dic['cmd'] == 'rsCheck': elif dic['cmd'] == 'rsCheck':
# ohmpi.rs_check() # ohmpi.rs_check()
socket.send_string(json.dumps({ payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'rs_check'})
'cmd_id': cmd_id, publish.single(payload=payload, **publisher_config)
'cmd': 'rs_check'
}))
fnames = sorted([fname for fname in os.listdir('data/') if fname[-7:] == '_rs.csv']) fnames = sorted([fname for fname in os.listdir('data/') if fname[-7:] == '_rs.csv'])
df = pd.read_csv('data/' + fnames[-1]) df = pd.read_csv('data/' + fnames[-1])
ddic = { ddic = {
...@@ -121,7 +145,7 @@ class MyServer(SimpleHTTPRequestHandler): ...@@ -121,7 +145,7 @@ class MyServer(SimpleHTTPRequestHandler):
os.system('shutdown now -h') os.system('shutdown now -h')
elif dic['cmd'] == 'restart': elif dic['cmd'] == 'restart':
print('shutting down...') print('shutting down...')
os.system('reboot') os.system('reboot') # NOTE: on machine running the interface? or on rpi?
else: else:
# command not found # command not found
rdic['response'] = 'command not found' rdic['response'] = 'command not found'
...@@ -130,16 +154,16 @@ class MyServer(SimpleHTTPRequestHandler): ...@@ -130,16 +154,16 @@ class MyServer(SimpleHTTPRequestHandler):
rdic['status'] = 'unknown' # socket_out. rdic['status'] = 'unknown' # socket_out.
# wait for reply # wait for reply
message = socket.recv() #message = socket.recv()
print('+++////', message) #print('+++////', message)
rdic['data'] = message.decode('utf-8') # rdic['data'] = message.decode('utf-8')
""" """
while False: while False:
message = socket.recv() message = socket.recv()
print(f'Received command: {message}') print(f'Received command: {message}')
e = None e = None
try: try:
decoded_message = json.loads(message.decode('utf-8')) decoded_message = json.loads(message))
cmd = decoded_message.pop('cmd', None) cmd = decoded_message.pop('cmd', None)
args = decoded_message.pop('args', None) args = decoded_message.pop('args', None)
status = False status = False
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment