diff --git a/mqtt_controller.py b/mqtt_controller.py new file mode 100644 index 0000000000000000000000000000000000000000..aadb0f4aeb68d4a4f8db184320c63cf2d973d458 --- /dev/null +++ b/mqtt_controller.py @@ -0,0 +1,40 @@ +import json + +import paho.mqtt.client as mqtt +import paho.mqtt.publish as publish +from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG +import time +import uuid + +client = mqtt.Client(f'ohmpi_{OHMPI_CONFIG["id"]}_controller') # create new instance +print('connecting to broker') +client.connect(MQTT_CONTROL_CONFIG['hostname']) +client.loop_start() +publisher_config = MQTT_CONTROL_CONFIG.copy() +publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic'] +publisher_config.pop('ctrl_topic') +settings = { + 'injection_duration': 0.2, + 'nbr_meas': 100, + 'sequence_delay': 1, + 'nb_stack': 1, + 'export_path': 'data/measurement.csv' + } + +cmd_id = uuid.uuid4().hex +payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'args': settings}) +print(f'Update settings setup message: {payload} to {publisher_config["topic"]} with config {publisher_config}') +publish.single(payload=payload, **publisher_config) + +sequence = [[1, 2, 3, 4]] +cmd_id = uuid.uuid4().hex +payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'set_sequence', 'args': sequence}) +print(f'Set sequence message: {payload} to {publisher_config["topic"]} with config {publisher_config}') +publish.single(payload=payload, **publisher_config) +for i in range(4): + cmd_id = uuid.uuid4().hex + payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'start'}) + print(f'Publishing message {i}: {payload} to {publisher_config["topic"]} with config {publisher_config}') + publish.single(payload=payload, **publisher_config) + time.sleep(1) +client.loop_stop() diff --git a/mqtt_interface.py b/mqtt_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..a6b525859301014ce077663d4dfff644a166a3f6 --- /dev/null +++ b/mqtt_interface.py @@ -0,0 +1,37 @@ +import paho.mqtt.client as mqtt +from config import MQTT_CONTROL_CONFIG, CONTROL_CONFIG, OHMPI_CONFIG +import time +from queue import Queue +import zmq + +ctrl_queue = Queue() + + +def on_message(client, userdata, message): + global socket + + # Send the command + print(f'Sending command {message.payload.decode("utf-8")}') + socket.send_string(message.payload.decode("utf-8")) + + # Get the reply. + reply = socket.recv() + print(f'Received reply {message.payload.decode("utf-8")}: {reply}') + + +mqtt_client = mqtt.Client(f'ohmpi_{OHMPI_CONFIG["id"]}_listener') # create new instance +print('connecting to broker') +mqtt_client.connect(MQTT_CONTROL_CONFIG['hostname']) +print('Subscribing to topic', MQTT_CONTROL_CONFIG['ctrl_topic']) +mqtt_client.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) +mqtt_client.on_message = on_message +mqtt_client.loop_start() +context = zmq.Context() + +# Socket to talk to server +print("Connecting to ohmpi control server") +socket = context.socket(zmq.REQ) +socket.connect(f'tcp://localhost:{CONTROL_CONFIG["tcp_port"]}') + +while True: + time.sleep(.1)