Commit d22c2bde authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

Merge branch 'mqtt' of https://gitlab.irstea.fr/reversaal/OhmPi into mqtt

Showing with 849 additions and 362 deletions
+849 -362
C:\Users\reclement\Documents\39_ohmpi\OhmPi\PCB_file_measurement_card\measurement_board\kicad_v5.00\VMN\_autosave-VMN.kicad_sch /home/arnaud/codes/OhmPi/PCB_file_measurement_card/measurement_board/kicad_v5.00/VMN/_autosave-VMN.kicad_sch
No preview for this file type
...@@ -52,8 +52,9 @@ DATA_LOGGING_CONFIG = { ...@@ -52,8 +52,9 @@ DATA_LOGGING_CONFIG = {
# State of Health logging configuration # State of Health logging configuration
SOH_LOGGING_CONFIG = { SOH_LOGGING_CONFIG = {
'file_name': 'soh.log', 'logging_level' : logging.INFO,
'logging_to_console': True, 'logging_to_console': True,
'file_name': 'soh.log',
'max_bytes': 16777216, 'max_bytes': 16777216,
'backup_count': 1024, 'backup_count': 1024,
'when': 'd', 'when': 'd',
...@@ -74,8 +75,11 @@ MQTT_LOGGING_CONFIG = { ...@@ -74,8 +75,11 @@ MQTT_LOGGING_CONFIG = {
'transport': 'tcp', 'transport': 'tcp',
'client_id': f'{OHMPI_CONFIG["id"]}', 'client_id': f'{OHMPI_CONFIG["id"]}',
'exec_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/exec', 'exec_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/exec',
'exec_logging_level': logging.DEBUG,
'data_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/data', 'data_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/data',
'soh_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/soh' 'data_logging_level': DATA_LOGGING_CONFIG['logging_level'],
'soh_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/soh',
'soh_logging_level': SOH_LOGGING_CONFIG['logging_level']
} }
# MQTT control configuration parameters # MQTT control configuration parameters
......
...@@ -22,13 +22,13 @@ settings = { ...@@ -22,13 +22,13 @@ settings = {
} }
cmd_id = uuid.uuid4().hex cmd_id = uuid.uuid4().hex
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'args': settings}) payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'kwargs': {'settings' : settings}})
print(f'Update settings setup message: {payload} to {publisher_config["topic"]} with config {publisher_config}') print(f'Update settings setup message: {payload} to {publisher_config["topic"]} with config {publisher_config}')
publish.single(payload=payload, **publisher_config) publish.single(payload=payload, **publisher_config)
sequence = [[1, 2, 3, 4]] sequence = [[1, 2, 3, 4]]
cmd_id = uuid.uuid4().hex cmd_id = uuid.uuid4().hex
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'set_sequence', 'args': sequence}) payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'set_sequence', 'kwargs': {'sequence': sequence}})
print(f'Set sequence message: {payload} to {publisher_config["topic"]} with config {publisher_config}') print(f'Set sequence message: {payload} to {publisher_config["topic"]} with config {publisher_config}')
publish.single(payload=payload, **publisher_config) publish.single(payload=payload, **publisher_config)
cmd_id = uuid.uuid4().hex cmd_id = uuid.uuid4().hex
...@@ -38,7 +38,7 @@ publish.single(payload=payload, **publisher_config) ...@@ -38,7 +38,7 @@ publish.single(payload=payload, **publisher_config)
for i in range(4): for i in range(4):
cmd_id = uuid.uuid4().hex cmd_id = uuid.uuid4().hex
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'start'}) payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_sequence_async'})
print(f'Publishing message {i}: {payload} to {publisher_config["topic"]} with config {publisher_config}') print(f'Publishing message {i}: {payload} to {publisher_config["topic"]} with config {publisher_config}')
publish.single(payload=payload, **publisher_config) publish.single(payload=payload, **publisher_config)
time.sleep(1) time.sleep(1)
......
...@@ -52,15 +52,18 @@ def setup_loggers(mqtt=True): ...@@ -52,15 +52,18 @@ def setup_loggers(mqtt=True):
if mqtt: if mqtt:
mqtt_settings = MQTT_LOGGING_CONFIG.copy() mqtt_settings = MQTT_LOGGING_CONFIG.copy()
[mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic']] mqtt_exec_logging_level = mqtt_settings.pop('exec_logging_level', logging.DEBUG)
[mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic', 'data_logging_level',
'soh_logging_level']]
mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['exec_topic']}) mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['exec_topic']})
# TODO: handle the case of MQTT broker down or temporarily unavailable # TODO: handle the case of MQTT broker down or temporarily unavailable
try: try:
mqtt_exec_handler = MQTTHandler(**mqtt_settings) mqtt_exec_handler = MQTTHandler(**mqtt_settings)
mqtt_exec_handler.setLevel(EXEC_LOGGING_CONFIG['logging_level']) mqtt_exec_handler.setLevel(mqtt_exec_logging_level)
mqtt_exec_handler.setFormatter(exec_formatter) mqtt_exec_handler.setFormatter(exec_formatter)
exec_logger.addHandler(mqtt_exec_handler) exec_logger.addHandler(mqtt_exec_handler)
msg+=colored(f"\n\u2611 Publishes execution as {MQTT_LOGGING_CONFIG['exec_topic']} topic on the {MQTT_LOGGING_CONFIG['hostname']} broker", 'blue') msg += colored(f"\n\u2611 Publishes execution as {MQTT_LOGGING_CONFIG['exec_topic']} topic on the "
f"{MQTT_LOGGING_CONFIG['hostname']} broker", 'blue')
except Exception as e: except Exception as e:
msg += colored(f'\nWarning: Unable to connect to exec topic on broker\n{e}', 'yellow') msg += colored(f'\nWarning: Unable to connect to exec topic on broker\n{e}', 'yellow')
mqtt = False mqtt = False
...@@ -89,14 +92,17 @@ def setup_loggers(mqtt=True): ...@@ -89,14 +92,17 @@ def setup_loggers(mqtt=True):
if mqtt: if mqtt:
mqtt_settings = MQTT_LOGGING_CONFIG.copy() mqtt_settings = MQTT_LOGGING_CONFIG.copy()
[mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic']] mqtt_data_logging_level = mqtt_settings.pop('data_logging_level', logging.INFO)
[mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic', 'exec_logging_level',
'soh_logging_level']]
mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['data_topic']}) mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['data_topic']})
try: try:
mqtt_data_handler = MQTTHandler(**mqtt_settings) mqtt_data_handler = MQTTHandler(**mqtt_settings)
mqtt_data_handler.setLevel(DATA_LOGGING_CONFIG['logging_level']) mqtt_data_handler.setLevel(MQTT_LOGGING_CONFIG['data_logging_level'])
mqtt_data_handler.setFormatter(data_formatter) mqtt_data_handler.setFormatter(data_formatter)
data_logger.addHandler(mqtt_data_handler) data_logger.addHandler(mqtt_data_handler)
msg += colored(f"\n\u2611 Publishes data as {MQTT_LOGGING_CONFIG['data_topic']} topic on the {MQTT_LOGGING_CONFIG['hostname']} broker", 'blue') msg += colored(f"\n\u2611 Publishes data as {MQTT_LOGGING_CONFIG['data_topic']} topic on the "
f"{MQTT_LOGGING_CONFIG['hostname']} broker", 'blue')
except Exception as e: except Exception as e:
msg += colored(f'\nWarning: Unable to connect to data topic on broker\n{e}', 'yellow') msg += colored(f'\nWarning: Unable to connect to data topic on broker\n{e}', 'yellow')
mqtt = False mqtt = False
...@@ -118,12 +124,12 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo ...@@ -118,12 +124,12 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo
exec_logger.info('*** NEW SESSION STARTING ***') exec_logger.info('*** NEW SESSION STARTING ***')
exec_logger.info('****************************') exec_logger.info('****************************')
exec_logger.info('') exec_logger.info('')
exec_logger.debug('Logging level: %s' % exec_logging_level) exec_logger.debug(f'Logging level: {exec_logging_level}')
try: try:
st = statvfs('.') st = statvfs('.')
available_space = st.f_bavail * st.f_frsize / 1024 / 1024 available_space = st.f_bavail * st.f_frsize / 1024 / 1024
exec_logger.info(f'Remaining disk space : {available_space:.1f} MB') exec_logger.info(f'Remaining disk space : {available_space:.1f} MB')
except Exception as e: except Exception as e: # noqa
exec_logger.debug('Unable to get remaining disk space: {e}') exec_logger.debug('Unable to get remaining disk space: {e}')
exec_logger.info('Saving data log to ' + data_log_filename) exec_logger.info('Saving data log to ' + data_log_filename)
config_dict = {'execution logging configuration': json.dumps(EXEC_LOGGING_CONFIG, indent=4), config_dict = {'execution logging configuration': json.dumps(EXEC_LOGGING_CONFIG, indent=4),
......
import paho.mqtt.client as mqtt
from config import MQTT_CONTROL_CONFIG, CONTROL_CONFIG, OHMPI_CONFIG
import time
from queue import Queue
import zmq
ctrl_queue = Queue()
def on_message(client, userdata, message):
global socket
# Send the command
print(f'Sending command {message.payload.decode("utf-8")}')
socket.send(message.payload)
# Get the reply
reply = socket.recv()
print(f'Received reply {message.payload.decode("utf-8")}: {reply}')
mqtt_client = mqtt.Client(f'ohmpi_{OHMPI_CONFIG["id"]}_listener', clean_session=False) # create new instance
print('connecting command listener to broker')
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
try:
mqtt_client.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
mqtt_client.connect(MQTT_CONTROL_CONFIG['hostname'])
trials = trials_max
broker_connected = True
except:
print('trying again...')
time.sleep(2)
trials += 1
if broker_connected:
print('Subscribing to topic', MQTT_CONTROL_CONFIG['ctrl_topic'])
mqtt_client.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
mqtt_client.on_message = on_message
mqtt_client.loop_start()
context = zmq.Context()
# Socket to talk to server
print("Connecting to ohmpi control server")
socket = context.socket(zmq.REQ)
socket.connect(f'tcp://localhost:{CONTROL_CONFIG["tcp_port"]}')
while True:
time.sleep(.1)
else:
print("Unable to connect to control broker")
exit(1)
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment