diff --git a/architecture.pdf b/architecture.pdf index a1025d0dccfa2174568c747a27517452214cd4bf..9c53a687468d97f5f0494d70b9e4fbf0cb3b9887 100644 Binary files a/architecture.pdf and b/architecture.pdf differ diff --git a/config.py b/config.py index 03ead8f48cfd6974218affa1e25fe1d0c0d895ca..8e206ce110071c5c746a4f832744e857f583184a 100644 --- a/config.py +++ b/config.py @@ -52,8 +52,9 @@ DATA_LOGGING_CONFIG = { # State of Health logging configuration SOH_LOGGING_CONFIG = { - 'file_name': 'soh.log', + 'logging_level' : logging.INFO, 'logging_to_console': True, + 'file_name': 'soh.log', 'max_bytes': 16777216, 'backup_count': 1024, 'when': 'd', @@ -74,8 +75,11 @@ MQTT_LOGGING_CONFIG = { 'transport': 'tcp', 'client_id': f'{OHMPI_CONFIG["id"]}', 'exec_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/exec', + 'exec_logging_level': logging.DEBUG, 'data_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/data', - 'soh_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/soh' + 'data_logging_level': DATA_LOGGING_CONFIG['logging_level'], + 'soh_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/soh', + 'soh_logging_level': SOH_LOGGING_CONFIG['logging_level'] } # MQTT control configuration parameters diff --git a/examples/basic_ohmpi_flows_node-red.json b/examples/basic_ohmpi_flows_node-red.json new file mode 100644 index 0000000000000000000000000000000000000000..2cdf5c89c42d46bb1d079a260d169214b5072505 --- /dev/null +++ b/examples/basic_ohmpi_flows_node-red.json @@ -0,0 +1 @@ +[{"id":"61c1655c50bd371c","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"2d6881abe9336fbf","type":"mqtt in","z":"61c1655c50bd371c","name":"","topic":"ohmpi_0001/exec","qos":"2","datatype":"auto-detect","broker":"6ae7e77e.04c64","nl":false,"rap":false,"inputs":0,"x":390,"y":40,"wires":[["02e94bb48ce2cded"]]},{"id":"e2590e574c551cb9","type":"mqtt in","z":"61c1655c50bd371c","name":"","topic":"ohmpi_0001/data","qos":"2","datatype":"auto","broker":"6ae7e77e.04c64","nl":false,"rap":false,"inputs":0,"x":380,"y":140,"wires":[["b9a9d56fd4fb0b8c"]]},{"id":"e2c109f78f9e714c","type":"mqtt out","z":"61c1655c50bd371c","name":"MQTT ctrl","topic":"ohmpi_0001/ctrl","qos":"2","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"6ae7e77e.04c64","x":820,"y":340,"wires":[]},{"id":"e1d067d8532ff36b","type":"mqtt in","z":"61c1655c50bd371c","name":"","topic":"ohmpi_0001/soh","qos":"2","datatype":"auto","broker":"6ae7e77e.04c64","nl":false,"rap":false,"inputs":0,"x":380,"y":240,"wires":[["f5c9dacaafac51d0"]]},{"id":"a7a6f6068e01c7eb","type":"ui_button","z":"61c1655c50bd371c","name":"Run sequence","group":"142ad6ae.d55e29","order":1,"width":"1","height":"1","passthru":false,"label":"⏺","tooltip":"run sequence","color":"red","bgcolor":"lightgrey","className":"","icon":"","payload":"{\"cmd_id\" :\"0\", \"cmd\":\"run_sequence_async\"}","payloadType":"str","topic":"topic","topicType":"msg","x":380,"y":340,"wires":[["e2c109f78f9e714c"]]},{"id":"d82912d9c5b122fe","type":"ui_button","z":"61c1655c50bd371c","name":"Interrupt","group":"142ad6ae.d55e29","order":2,"width":"1","height":"1","passthru":false,"label":" ◾","tooltip":"interrupt sequence","color":"black","bgcolor":"lightgrey","className":"","icon":"","payload":"{\"cmd_id\" :\"0\", \"cmd\":\"interrupt\"}","payloadType":"str","topic":"topic","topicType":"msg","x":360,"y":400,"wires":[["e2c109f78f9e714c"]]},{"id":"02e94bb48ce2cded","type":"ui_text","z":"61c1655c50bd371c","group":"64a75353.37700c","order":2,"width":"16","height":"3","name":"MQTT exec","label":"Execution","format":"{{msg.payload}}","layout":"row-spread","className":"","x":830,"y":40,"wires":[]},{"id":"b9a9d56fd4fb0b8c","type":"ui_text","z":"61c1655c50bd371c","group":"64a75353.37700c","order":3,"width":"16","height":"3","name":"MQTT Data","label":"Data","format":"{{msg.payload}}","layout":"row-spread","className":"","x":830,"y":140,"wires":[]},{"id":"f5c9dacaafac51d0","type":"ui_text","z":"61c1655c50bd371c","group":"64a75353.37700c","order":4,"width":0,"height":0,"name":"MQTT SOH","label":"SOH","format":"{{msg.payload}}","layout":"row-spread","className":"","x":830,"y":240,"wires":[]},{"id":"329591d611aa2704","type":"ui_button","z":"61c1655c50bd371c","name":"","group":"64a75353.37700c","order":1,"width":0,"height":0,"passthru":false,"label":"clear messages","tooltip":"","color":"","bgcolor":"","className":"","icon":"","payload":" ","payloadType":"str","topic":"topic","topicType":"msg","x":580,"y":160,"wires":[["f5c9dacaafac51d0","b9a9d56fd4fb0b8c","02e94bb48ce2cded"]]},{"id":"25d69085f401beae","type":"ui_dropdown","z":"61c1655c50bd371c","name":"","label":"command","tooltip":"","place":"Select option","group":"142ad6ae.d55e29","order":5,"width":0,"height":0,"passthru":true,"multiple":false,"options":[{"label":"","value":"load_sequence","type":"str"},{"label":"","value":"reset_mux","type":"str"},{"label":"","value":"set_sequence","type":"str"},{"label":"","value":"update_settings","type":"str"}],"payload":"","topic":"command","topicType":"str","className":"","x":140,"y":480,"wires":[["182249692ee7502c"]]},{"id":"edbd2d507fbf085a","type":"ui_text_input","z":"61c1655c50bd371c","name":"","label":"kwargs","tooltip":"","group":"142ad6ae.d55e29","order":5,"width":0,"height":0,"passthru":false,"mode":"text","delay":"0","topic":"kwargs","sendOnBlur":true,"className":"","topicType":"str","x":140,"y":600,"wires":[["679471976db01c0e"]]},{"id":"679471976db01c0e","type":"json","z":"61c1655c50bd371c","name":"","property":"payload","action":"","pretty":false,"x":290,"y":600,"wires":[["b11654144c63d1b2"]]},{"id":"435a126aec0c424c","type":"debug","z":"61c1655c50bd371c","name":"debug 5","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":640,"y":480,"wires":[]},{"id":"45c95568bb348470","type":"join","z":"61c1655c50bd371c","name":"","mode":"custom","build":"merged","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":650,"y":540,"wires":[["41122cfe96dd471a","e2c109f78f9e714c"]]},{"id":"41122cfe96dd471a","type":"ui_text","z":"61c1655c50bd371c","group":"142ad6ae.d55e29","order":6,"width":"2","height":"3","name":"","label":"Command to send","format":"{{msg.payload}}","layout":"row-spread","className":"","x":870,"y":600,"wires":[]},{"id":"182249692ee7502c","type":"function","z":"61c1655c50bd371c","name":"set cmd","func":"var newMsg = { payload: {\"cmd\": msg.payload }};\nreturn newMsg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":300,"y":480,"wires":[["435a126aec0c424c","45c95568bb348470"]]},{"id":"1549f01b20537e60","type":"ui_button","z":"61c1655c50bd371c","name":"","group":"142ad6ae.d55e29","order":7,"width":0,"height":0,"passthru":false,"label":"Send command","tooltip":"","color":"","bgcolor":"","className":"","icon":"","payload":"","payloadType":"str","topic":"topic","topicType":"msg","x":160,"y":700,"wires":[["c81857e22b65ef26","ae78c7b9e5aaeede"]]},{"id":"d4f6486f114c987d","type":"change","z":"61c1655c50bd371c","name":"","rules":[{"t":"set","p":"complete","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":590,"y":700,"wires":[["45c95568bb348470"]]},{"id":"b11654144c63d1b2","type":"function","z":"61c1655c50bd371c","name":"set kwargs","func":"var newMsg = { payload: {\"kwargs\": msg.payload }};\nreturn newMsg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":450,"y":600,"wires":[["45c95568bb348470"]]},{"id":"c81857e22b65ef26","type":"uuid","z":"61c1655c50bd371c","uuidVersion":"v1","namespaceType":"","namespace":"","namespaceCustom":"","name":"","field":"payload","fieldType":"msg","x":310,"y":660,"wires":[["1f97edbce5f88461"]]},{"id":"ae78c7b9e5aaeede","type":"delay","z":"61c1655c50bd371c","name":"","pauseType":"delay","timeout":"250","timeoutUnits":"milliseconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":390,"y":700,"wires":[["d4f6486f114c987d"]]},{"id":"1f97edbce5f88461","type":"function","z":"61c1655c50bd371c","name":"set cmd_id","func":"var newMsg = { payload: {\"cmd_id\": msg.payload }};\nreturn newMsg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":490,"y":660,"wires":[["45c95568bb348470"]]},{"id":"6ae7e77e.04c64","type":"mqtt-broker","name":"ohmpi_mqtt_broker","broker":"mg3d-dev.umons.ac.be","port":"1883","clientid":"","autoConnect":true,"usetls":false,"compatmode":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""},{"id":"142ad6ae.d55e29","type":"ui_group","name":"Commands","tab":"5d888f29.07334","order":1,"disp":true,"width":"6","collapse":false,"className":""},{"id":"64a75353.37700c","type":"ui_group","name":"Messages","tab":"5d888f29.07334","order":2,"disp":true,"width":"16","collapse":true},{"id":"5d888f29.07334","type":"ui_tab","name":"Simple OhmPi controller","icon":"dashboard","disabled":false,"hidden":false}] \ No newline at end of file diff --git a/mqtt_controller.py b/examples/mqtt_controller.py similarity index 92% rename from mqtt_controller.py rename to examples/mqtt_controller.py index dd2e6a5c21636c82faa5294c961aa331d8137a65..a9d42e117e4ea2b1c877297bbc3086aeab882287 100644 --- a/mqtt_controller.py +++ b/examples/mqtt_controller.py @@ -22,13 +22,13 @@ settings = { } cmd_id = uuid.uuid4().hex -payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'args': settings}) +payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'update_settings', 'kwargs': {'settings' : settings}}) print(f'Update settings setup message: {payload} to {publisher_config["topic"]} with config {publisher_config}') publish.single(payload=payload, **publisher_config) sequence = [[1, 2, 3, 4]] cmd_id = uuid.uuid4().hex -payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'set_sequence', 'args': sequence}) +payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'set_sequence', 'kwargs': {'sequence': sequence}}) print(f'Set sequence message: {payload} to {publisher_config["topic"]} with config {publisher_config}') publish.single(payload=payload, **publisher_config) cmd_id = uuid.uuid4().hex @@ -38,7 +38,7 @@ publish.single(payload=payload, **publisher_config) for i in range(4): cmd_id = uuid.uuid4().hex - payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'start'}) + payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_sequence_async'}) print(f'Publishing message {i}: {payload} to {publisher_config["topic"]} with config {publisher_config}') publish.single(payload=payload, **publisher_config) time.sleep(1) diff --git a/index-mqtt.html b/index-mqtt.html index 1ee100c304494f83000948e4503bf3b56eab7992..070d4fe13ea81700590c04d23493fbab8dcb512a 100755 --- a/index-mqtt.html +++ b/index-mqtt.html @@ -153,6 +153,7 @@ let port = 9001 let clientId = 'ohmpi_0001_html' let message = null + let msg = '' // create client client = new Paho.MQTT.Client(hostname, port, clientId); @@ -182,7 +183,13 @@ let payload = message.payloadString if (message.topic == topic_data) { // process data + msg = payload // for accessing the variable from the console console.log('DATA', payload) + let ddic = JSON.parse(payload.split('INFO:')[1]) + + // check cmd_id is any + processData(ddic) + // usually these don't have a cmd_id so we are not sure when } else if (message.topic == topic_exec) { @@ -463,105 +470,110 @@ // getData function getData() { sendCommand(JSON.stringify({ - 'cmd': 'getData', - 'surveyNames': Object.keys(data).slice(0, -1) + 'cmd': 'get_data', + 'survey_names': Object.keys(data).slice(0, -1) // last survey is often partial so we download it again - }), function(ddic) { - // update status - output.innerHTML = 'Status: ' + ddic['status'] - - // update data dic with new data - data = { // destructuring assignement (magic! :o) - ...data, - ...ddic['data'] // value from second dic are preferred - } - - // dropdown with number of surveys and +++ - let surveyNames = Object.keys(data).sort() + }), console.log('processData(ddic)') + ) + } - // remove listener as we will replace the choices - surveySelect.removeEventListener('change', surveySelectFunc) - surveySelect.innerHTML = '' // clearing all child nodes + // processData + function processData(ddic) { + // update status + output.innerHTML = 'Status: ' + ddic['status'] - // add choices again - for (let surveyName of surveyNames) { + // update data dic with new data + data = { // destructuring assignement (magic! :o) + ...data, + ...ddic['data'] // value from second dic are preferred + } + + // dropdown with number of surveys and +++ + let surveyNames = Object.keys(data).sort() + + // remove listener as we will replace the choices + surveySelect.removeEventListener('change', surveySelectFunc) + surveySelect.innerHTML = '' // clearing all child nodes + + // add choices again + for (let surveyName of surveyNames) { + let option = document.createElement('option') + option.innerText = surveyName + option.value = surveyName + surveySelect.appendChild(option) + } + + // listener again + surveySelect.addEventListener('change', surveySelectFunc) + + // plot last one by default + surveySelect.value = surveyNames[surveyNames.length - 1] + + // call the function directly + // (as progammatically chaging the value does not trigger the event) + surveySelectFunc({'target': surveySelect}) + + // update list of quadrupoles if any + if (quads.length == 0) { + console.log('updating list of quadrupoles') + let df = data[surveyNames[0]] + let quadSelect = document.getElementById('quadSelect') + quadSelect.innerHTML = '' + for (let i = 0; i < df['a'].length; i++) { + quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]] + quads.push(quad) let option = document.createElement('option') - option.innerText = surveyName - option.value = surveyName - surveySelect.appendChild(option) + option.value = quad.join(', ') + option.innerText = quad.join(', ') + quadSelect.appendChild(option) } + console.log('quads=', quads) + } - // listener again - surveySelect.addEventListener('change', surveySelectFunc) - - // plot last one by default - surveySelect.value = surveyNames[surveyNames.length - 1] - - // call the function directly - // (as progammatically chaging the value does not trigger the event) - surveySelectFunc({'target': surveySelect}) - - // update list of quadrupoles if any - if (quads.length == 0) { - console.log('updating list of quadrupoles') - let df = data[surveyNames[0]] - let quadSelect = document.getElementById('quadSelect') - quadSelect.innerHTML = '' - for (let i = 0; i < df['a'].length; i++) { - quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]] - quads.push(quad) - let option = document.createElement('option') - option.value = quad.join(', ') - option.innerText = quad.join(', ') - quadSelect.appendChild(option) - } - console.log('quads=', quads) + // update time-serie figure + if (squads.length > 0) { + + // transform all surveyNames to datetime + let xt = [] + for (surveyName of surveyNames) { + let a = surveyName.split('_').slice(-1)[0] + xt.push(a.slice(0, 4) + '-' + + a.slice(4, 6) + '-' + + a.slice(6, 8) + ' ' + + a.slice(9, 11) + ':' + + a.slice(11, 13) + ':' + + a.slice(13, 15)) } - - // update time-serie figure - if (squads.length > 0) { - - // transform all surveyNames to datetime - let xt = [] - for (surveyName of surveyNames) { - let a = surveyName.split('_').slice(-1)[0] - xt.push(a.slice(0, 4) + '-' - + a.slice(4, 6) + '-' - + a.slice(6, 8) + ' ' - + a.slice(9, 11) + ':' - + a.slice(11, 13) + ':' - + a.slice(13, 15)) - } - //console.log(xt) - - // create one new trace per selected quadrupole - for (let k = 0; k < squads.length; k++) { - squad = squads[k] - let x = [] - let y = [] - for (let i = 0; i < surveyNames.length; i++) { - df = data[surveyNames[i]] - for (let j = 0; j < df['a'].length; j++) { - if (df['a'][j] == squad[0] - && df['b'][j] == squad[1] - && df['m'][j] == squad[2] - && df['n'][j] == squad[3]) { - y.push(df['rho'][j]) - x.push(xt[i]) - break - } + //console.log(xt) + + // create one new trace per selected quadrupole + for (let k = 0; k < squads.length; k++) { + squad = squads[k] + let x = [] + let y = [] + for (let i = 0; i < surveyNames.length; i++) { + df = data[surveyNames[i]] + for (let j = 0; j < df['a'].length; j++) { + if (df['a'][j] == squad[0] + && df['b'][j] == squad[1] + && df['m'][j] == squad[2] + && df['n'][j] == squad[3]) { + y.push(df['rho'][j]) + x.push(xt[i]) + break } } - - // update trace dictionnary - tdata[k]['x'] = x - tdata[k]['y'] = y } - //console.log(tdata) - Plotly.redraw('ts') + + // update trace dictionnary + tdata[k]['x'] = x + tdata[k]['y'] = y } - }) + //console.log(tdata) + Plotly.redraw('ts') + } } + let getDataBtn = document.getElementById('getDataBtn') getDataBtn.addEventListener('click', getData) diff --git a/logging_setup.py b/logging_setup.py index e0ced5e8f2d1ba14a9c6e89be15f8cd602187eb9..b80d01fcca272dc652418df4e520314e2769ee29 100644 --- a/logging_setup.py +++ b/logging_setup.py @@ -52,15 +52,18 @@ def setup_loggers(mqtt=True): if mqtt: mqtt_settings = MQTT_LOGGING_CONFIG.copy() - [mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic']] + mqtt_exec_logging_level = mqtt_settings.pop('exec_logging_level', logging.DEBUG) + [mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic', 'data_logging_level', + 'soh_logging_level']] mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['exec_topic']}) # TODO: handle the case of MQTT broker down or temporarily unavailable try: mqtt_exec_handler = MQTTHandler(**mqtt_settings) - mqtt_exec_handler.setLevel(EXEC_LOGGING_CONFIG['logging_level']) + mqtt_exec_handler.setLevel(mqtt_exec_logging_level) mqtt_exec_handler.setFormatter(exec_formatter) exec_logger.addHandler(mqtt_exec_handler) - msg+=colored(f"\n\u2611 Publishes execution as {MQTT_LOGGING_CONFIG['exec_topic']} topic on the {MQTT_LOGGING_CONFIG['hostname']} broker", 'blue') + msg += colored(f"\n\u2611 Publishes execution as {MQTT_LOGGING_CONFIG['exec_topic']} topic on the " + f"{MQTT_LOGGING_CONFIG['hostname']} broker", 'blue') except Exception as e: msg += colored(f'\nWarning: Unable to connect to exec topic on broker\n{e}', 'yellow') mqtt = False @@ -89,14 +92,17 @@ def setup_loggers(mqtt=True): if mqtt: mqtt_settings = MQTT_LOGGING_CONFIG.copy() - [mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic']] + mqtt_data_logging_level = mqtt_settings.pop('data_logging_level', logging.INFO) + [mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic', 'exec_logging_level', + 'soh_logging_level']] mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['data_topic']}) try: mqtt_data_handler = MQTTHandler(**mqtt_settings) - mqtt_data_handler.setLevel(DATA_LOGGING_CONFIG['logging_level']) + mqtt_data_handler.setLevel(MQTT_LOGGING_CONFIG['data_logging_level']) mqtt_data_handler.setFormatter(data_formatter) data_logger.addHandler(mqtt_data_handler) - msg += colored(f"\n\u2611 Publishes data as {MQTT_LOGGING_CONFIG['data_topic']} topic on the {MQTT_LOGGING_CONFIG['hostname']} broker", 'blue') + msg += colored(f"\n\u2611 Publishes data as {MQTT_LOGGING_CONFIG['data_topic']} topic on the " + f"{MQTT_LOGGING_CONFIG['hostname']} broker", 'blue') except Exception as e: msg += colored(f'\nWarning: Unable to connect to data topic on broker\n{e}', 'yellow') mqtt = False @@ -118,12 +124,12 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo exec_logger.info('*** NEW SESSION STARTING ***') exec_logger.info('****************************') exec_logger.info('') - exec_logger.debug('Logging level: %s' % exec_logging_level) + exec_logger.debug(f'Logging level: {exec_logging_level}') try: st = statvfs('.') available_space = st.f_bavail * st.f_frsize / 1024 / 1024 exec_logger.info(f'Remaining disk space : {available_space:.1f} MB') - except Exception as e: + except Exception as e: # noqa exec_logger.debug('Unable to get remaining disk space: {e}') exec_logger.info('Saving data log to ' + data_log_filename) config_dict = {'execution logging configuration': json.dumps(EXEC_LOGGING_CONFIG, indent=4), diff --git a/mqtt_interface.py b/mqtt_interface.py deleted file mode 100644 index f90a6ae0df2f3bb583e47541c03127f5c74aae81..0000000000000000000000000000000000000000 --- a/mqtt_interface.py +++ /dev/null @@ -1,53 +0,0 @@ -import paho.mqtt.client as mqtt -from config import MQTT_CONTROL_CONFIG, CONTROL_CONFIG, OHMPI_CONFIG -import time -from queue import Queue -import zmq - -ctrl_queue = Queue() - -def on_message(client, userdata, message): - global socket - - # Send the command - print(f'Sending command {message.payload.decode("utf-8")}') - socket.send(message.payload) - - # Get the reply - reply = socket.recv() - print(f'Received reply {message.payload.decode("utf-8")}: {reply}') - - -mqtt_client = mqtt.Client(f'ohmpi_{OHMPI_CONFIG["id"]}_listener', clean_session=False) # create new instance -print('connecting command listener to broker') -trials = 0 -trials_max = 10 -broker_connected = False -while trials < trials_max: - try: - mqtt_client.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), - MQTT_CONTROL_CONFIG['auth']['password']) - mqtt_client.connect(MQTT_CONTROL_CONFIG['hostname']) - trials = trials_max - broker_connected = True - except: - print('trying again...') - time.sleep(2) - trials += 1 -if broker_connected: - print('Subscribing to topic', MQTT_CONTROL_CONFIG['ctrl_topic']) - mqtt_client.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) - mqtt_client.on_message = on_message - mqtt_client.loop_start() - - context = zmq.Context() - # Socket to talk to server - print("Connecting to ohmpi control server") - socket = context.socket(zmq.REQ) - socket.connect(f'tcp://localhost:{CONTROL_CONFIG["tcp_port"]}') - - while True: - time.sleep(.1) -else: - print("Unable to connect to control broker") - exit(1) diff --git a/ohmpi.py b/ohmpi.py index d3cacfcb117b224fc56cb3f83b740677fab0b1bb..505cd0bf9d1e782bb2127f099291d06cf6aae8f6 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -15,7 +15,7 @@ from copy import deepcopy import numpy as np import csv import time -from io import StringIO +import shutil from datetime import datetime from termcolor import colored import threading @@ -35,7 +35,8 @@ try: import digitalio # noqa from digitalio import Direction # noqa from gpiozero import CPUTemperature # noqa - import minimalmodbus # noqa + import minimalmodbus # noqa + arm64_imports = True except ImportError as error: if EXEC_LOGGING_CONFIG['logging_level'] == DEBUG: @@ -130,13 +131,13 @@ class OhmPi(object): # current injection module if self.idps: self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) - self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc - self.DPS.serial.bytesize = 8 # - self.DPS.serial.timeout = 1 # greater than 0.5 for it to work - self.DPS.debug = False # - self.DPS.serial.parity = 'N' # No parity - self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode - self.DPS.write_register(0x0001, 40, 0) # max current allowed (36 mA for relays) + self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc + self.DPS.serial.bytesize = 8 # + self.DPS.serial.timeout = 1 # greater than 0.5 for it to work + self.DPS.debug = False # + self.DPS.serial.parity = 'N' # No parity + self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode + self.DPS.write_register(0x0001, 40, 0) # max current allowed (36 mA for relays) # (last number) 0 is for mA, 3 is for A # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) @@ -157,7 +158,7 @@ class OhmPi(object): f" on {MQTT_CONTROL_CONFIG['hostname']} broker") def connect_mqtt() -> mqtt_client: - def on_connect(client, userdata, flags, rc): + def on_connect(mqttclient, userdata, flags, rc): if rc == 0: self.exec_logger.debug(f"Successfully connected to control broker:" f" {MQTT_CONTROL_CONFIG['hostname']}") @@ -170,6 +171,7 @@ class OhmPi(object): client.on_connect = on_connect client.connect(MQTT_CONTROL_CONFIG['hostname'], MQTT_CONTROL_CONFIG['port']) return client + try: self.exec_logger.debug(f"Connecting to control broker: {MQTT_CONTROL_CONFIG['hostname']}") self.controller = connect_mqtt() @@ -209,6 +211,7 @@ class OhmPi(object): Parameters ---------- + cmd_id filename : str filename to save the last measurement dataframe last_measurement : dict @@ -305,8 +308,8 @@ class OhmPi(object): time.sleep(best_tx_injtime) # inject for given tx time # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=self.ads_voltage_address) + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) # print('current P0', AnalogIn(self.ads_current, ads.P0).voltage) # print('voltage P0', AnalogIn(self.ads_voltage, ads.P0).voltage) # print('voltage P2', AnalogIn(self.ads_voltage, ads.P2).voltage) @@ -320,8 +323,8 @@ class OhmPi(object): # we measure the voltage on both A0 and A2 to guess the polarity I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa # print('I (mV)', I*50*self.r_shunt) # print('I (mA)', I) # print('U0 (mV)', U0) @@ -337,7 +340,7 @@ class OhmPi(object): # compute constant c = vmn / I - Rab = (volt * 1000.) / I + Rab = (volt * 1000.) / I # noqa self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') @@ -429,6 +432,42 @@ class OhmPi(object): self.exec_logger.debug(f'Setting gain to {gain}') return gain + def get_data(self, survey_names=[], cmd_id=None): + """Get available data. + + Parameters + ---------- + survey_names : list of str, optional + List of filenames already available from the html interface. So + their content won't be returned again. Only files not in the list + will be read. + """ + # get all .csv file in data folder + fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] + ddic = {} + if cmd_id is None: + cmd_id = 'unknown' + for fname in fnames: + if ((fname != 'readme.txt') + and ('_rs' not in fname) + and (fname.replace('.csv', '') not in survey_names)): + try: + data = np.loadtxt('data/' + fname, delimiter=',', + skiprows=1, usecols=(1,2,3,4,8)) + data = data[None, :] if len(data.shape) == 1 else data + ddic[fname.replace('.csv', '')] = { + 'a': data[:, 0].astype(int).tolist(), + 'b': data[:, 1].astype(int).tolist(), + 'm': data[:, 2].astype(int).tolist(), + 'n': data[:, 3].astype(int).tolist(), + 'rho': data[:, 4].tolist(), + } + except Exception as e: + print(fname, ':', e) + rdic = {'cmd_id': cmd_id, 'data': ddic} + self.data_logger.info(json.dumps(rdic)) + return ddic + def interrupt(self, cmd_id=None): """Interrupts the acquisition. """ self.status = 'stopping' @@ -444,6 +483,7 @@ class OhmPi(object): Parameters ---------- + cmd_id filename : str Path of the .csv or .txt file with A, B, M and N electrodes. Electrode index start at 1. @@ -546,8 +586,6 @@ class OhmPi(object): except Exception as e: self.exec_logger.error( f"Unable to execute {cmd}({str(kwargs) if kwargs is not None else ''}): {e}") - # f"Unable to execute {cmd}({str(args) + ', ' if args is not None else ''}" - # f"{str(kwargs) if kwargs is not None else ''}): {e}") status = False except Exception as e: self.exec_logger.warning(f'Unable to decode command {message}: {e}') @@ -583,6 +621,12 @@ class OhmPi(object): warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) self.load_sequence(**kwargs) + def remove_data(self, **kwargs): + """Remove all data in the data/ folder. + """ + shutil.rmtree('data') + os.mkdir('data') + def restart(self, cmd_id=None): self.exec_logger.info('Restarting pi...') os.system('reboot') @@ -699,7 +743,7 @@ class OhmPi(object): self.pin1.value = False # one stack = 2 half-cycles (one positive, one negative) - pinMN = 0 if polarity > 0 else 2 + pinMN = 0 if polarity > 0 else 2 # noqa # sampling for each stack at the end of the injection sampling_interval = 10 # ms @@ -752,7 +796,7 @@ class OhmPi(object): end_delay = time.time() # truncate the meas array if we didn't fill the last samples - meas = meas[:k+1] + meas = meas[:k + 1] # measurement of current i and voltage u during off time measpp = np.zeros((meas.shape[0], 3)) * np.nan @@ -779,13 +823,13 @@ class OhmPi(object): end_delay = time.time() # truncate the meas array if we didn't fill the last samples - measpp = measpp[:k+1] + measpp = measpp[:k + 1] # we alternate on which ADS1115 pin we measure because of sign of voltage if pinMN == 0: - pinMN = 2 + pinMN = 2 # noqa else: - pinMN = 0 + pinMN = 0 # noqa # store data for full wave form fulldata.append(meas) @@ -799,8 +843,8 @@ class OhmPi(object): # take average from the samples per stack, then sum them all # average for the last third of the stacked values # is done outside the loop - sum_i = sum_i + (np.mean(meas[-int(meas.shape[0]//3):, 0])) - vmn1 = np.mean(meas[-int(meas.shape[0]//3), 1]) + sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0])) + vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1]) if (n % 2) == 0: sum_vmn = sum_vmn - vmn1 sum_ps = sum_ps + vmn1 @@ -872,119 +916,6 @@ class OhmPi(object): return d - def run_multiple_sequences(self, sequence_delay=None, nb_meas=None, **kwargs): - """Runs multiple sequences in a separate thread for monitoring mode. - Can be stopped by 'OhmPi.interrupt()'. - Additional arguments are passed to run_measurement(). - - Parameters - ---------- - sequence_delay : int, optional - Number of seconds at which the sequence must be started from each others. - nb_meas : int, optional - Number of time the sequence must be repeated. - kwargs : dict, optional - See help(k.run_measurement) for more info. - """ - # self.run = True - if sequence_delay is None: - sequence_delay = self.settings['sequence_delay'] - sequence_delay = int(sequence_delay) - if nb_meas is None: - nb_meas = self.settings['nb_meas'] - self.status = 'running' - self.exec_logger.debug(f'Status: {self.status}') - self.exec_logger.debug(f'Measuring sequence: {self.sequence}') - - def func(): - for g in range(0, nb_meas): # for time-lapse monitoring - if self.status == 'stopping': - self.exec_logger.warning('Data acquisition interrupted') - break - t0 = time.time() - self.run_sequence(**kwargs) - - # sleeping time between sequence - dt = sequence_delay - (time.time() - t0) - if dt < 0: - dt = 0 - if nb_meas > 1: - time.sleep(dt) # waiting for next measurement (time-lapse) - self.status = 'idle' - self.thread = threading.Thread(target=func) - self.thread.start() - - def run_sequence(self, **kwargs): - """Runs sequence synchronously (=blocking on main thread). - Additional arguments are passed to run_measurement(). - """ - self.status = 'running' - self.exec_logger.debug(f'Status: {self.status}') - self.exec_logger.debug(f'Measuring sequence: {self.sequence}') - t0 = time.time() - - # create filename with timestamp - filename = self.settings["export_path"].replace('.csv', - f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') - self.exec_logger.debug(f'Saving to {filename}') - - # make sure all multiplexer are off - self.reset_mux() - - # measure all quadrupole of the sequence - if self.sequence is None: - n = 1 - else: - n = self.sequence.shape[0] - for i in range(0, n): - if self.sequence is None: - quad = np.array([0, 0, 0, 0]) - else: - quad = self.sequence[i, :] # quadrupole - if self.status == 'stopping': - break - - # call the switch_mux function to switch to the right electrodes - self.switch_mux_on(quad) - - # run a measurement - if self.on_pi: - acquired_data = self.run_measurement(quad, **kwargs) - else: # for testing, generate random data - acquired_data = { - 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], - 'R [ohm]': np.abs(np.random.randn(1)) - } - - # switch mux off - self.switch_mux_off(quad) - - # add command_id in dataset - # acquired_data.update({'cmd_id': cmd_id}) // in run_measurement() - # log data to the data logger - # self.data_logger.info(f'{acquired_data}') // in run_measurement() - # save data and print in a text file - self.append_and_save(filename, acquired_data) - self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') - - self.status = 'idle' - - def run_sequence_async(self, cmd_id=None, **kwargs): - """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. - Additional arguments are passed to run_measurement(). - - Parameters - ---------- - cmd_id: - """ - - def func(): - self.run_sequence(**kwargs) - - self.thread = threading.Thread(target=func) - self.thread.start() - self.status = 'idle' - def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs): """Runs multiple sequences in a separate thread for monitoring mode. Can be stopped by 'OhmPi.interrupt()'. @@ -1026,6 +957,7 @@ class OhmPi(object): if nb_meas > 1: time.sleep(dt) # waiting for next measurement (time-lapse) self.status = 'idle' + self.thread = threading.Thread(target=func) self.thread.start() @@ -1077,7 +1009,7 @@ class OhmPi(object): # add command_id in dataset acquired_data.update({'cmd_id': cmd_id}) # log data to the data logger - # self.data_logger.info(f'{acquired_data}') # already in run_measurement() + # self.data_logger.info(f'{acquired_data}') # save data and print in a text file self.append_and_save(filename, acquired_data) self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') @@ -1147,8 +1079,8 @@ class OhmPi(object): # print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format( # current, voltage, resist)) msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \ - f'V: {voltage :>10.3f} mV, ' \ - f'R: {resist :>10.3f} kOhm' + f'V: {voltage :>10.3f} mV, ' \ + f'R: {resist :>10.3f} kOhm' self.exec_logger.debug(msg) @@ -1169,96 +1101,11 @@ class OhmPi(object): else: pass self.status = 'idle' + # # # TODO if interrupted, we would need to restore the values # # TODO or we offer the possibility in 'run_measurement' to have rs_check each time? - def set_sequence(self, sequence=None): - try: - self.sequence = np.array(sequence) - #self.sequence = np.loadtxt(StringIO(sequence)).astype('uint32') - status = True - except Exception as e: - self.exec_logger.warning(f'Unable to set sequence: {e}') - status = False - - def stop(self): - warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning) - self.interrupt() - - def _switch_mux(self, electrode_nr, state, role): - """Selects the right channel for the multiplexer cascade for a given electrode. - - Parameters - ---------- - electrode_nr : int - Electrode index to be switched on or off. - state : str - Either 'on' or 'off'. - role : str - Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board. - """ - if not self.use_mux or not self.on_pi: - if not self.on_pi: - self.exec_logger.warning('Cannot reset mux while in simulation mode...') - else: - self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' - ' Set use_mux to True to use the multiplexer...') - elif self.sequence is None: - self.exec_logger.warning('Unable to switch MUX without a sequence') - else: - # choose with MUX board - tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) - - # find I2C address of the electrode and corresponding relay - # considering that one MCP23017 can cover 16 electrodes - i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division - relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 - - if i2c_address is not None: - # select the MCP23017 of the selected MUX board - mcp2 = MCP23017(tca[i2c_address]) - mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT - - if state == 'on': - mcp2.get_pin(relay_nr - 1).value = True - else: - mcp2.get_pin(relay_nr - 1).value = False - - self.exec_logger.debug(f'Switching relay {relay_nr} ' - f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') - else: - self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') - - def switch_mux_on(self, quadrupole): - """Switches on multiplexer relays for given quadrupole. - - Parameters - ---------- - quadrupole : list of 4 int - List of 4 integers representing the electrode numbers. - """ - roles = ['A', 'B', 'M', 'N'] - # another check to be sure A != B - if quadrupole[0] != quadrupole[1]: - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'on', roles[i]) - else: - self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!') - - def switch_mux_off(self, quadrupole): - """Switches off multiplexer relays for given quadrupole. - - Parameters - ---------- - quadrupole : list of 4 int - List of 4 integers representing the electrode numbers. - """ - roles = ['A', 'B', 'M', 'N'] - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'off', roles[i]) def set_sequence(self, sequence=None, cmd_id=None): try: self.sequence = np.array(sequence).astype(int) @@ -1268,9 +1115,9 @@ class OhmPi(object): self.exec_logger.warning(f'Unable to set sequence: {e}') status = False - def stop(self, cmd_id=None): + def stop(self, **kwargs): warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning) - self.interrupt() + self.interrupt(**kwargs) def _switch_mux(self, electrode_nr, state, role): """Selects the right channel for the multiplexer cascade for a given electrode. @@ -1321,6 +1168,7 @@ class OhmPi(object): Parameters ---------- + cmd_id quadrupole : list of 4 int List of 4 integers representing the electrode numbers. """ @@ -1338,6 +1186,7 @@ class OhmPi(object): Parameters ---------- + cmd_id quadrupole : list of 4 int List of 4 integers representing the electrode numbers. """ @@ -1346,65 +1195,6 @@ class OhmPi(object): if quadrupole[i] > 0: self._switch_mux(quadrupole[i], 'off', roles[i]) - def reset_mux(self): - """Switches off all multiplexer relays.""" - if self.on_pi and self.use_mux: - roles = ['A', 'B', 'M', 'N'] - for i in range(0, 4): - for j in range(1, self.max_elec + 1): - self._switch_mux(j, 'off', roles[i]) - self.exec_logger.debug('All MUX switched off.') - elif not self.on_pi: - self.exec_logger.warning('Cannot reset mux while in simulation mode...') - else: - self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' - ' Set use_mux to True to use the multiplexer...') - - def _update_acquisition_settings(self, config): - warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) - self.update_settings(config) - - def update_settings(self, config): - """Updates acquisition settings from a json file or dictionary. - Parameters can be: - - nb_electrodes (number of electrode used, if 4, no MUX needed) - - injection_duration (in seconds) - - nb_meas (total number of times the sequence will be run) - - sequence_delay (delay in second between each sequence run) - - nb_stack (number of stack for each quadrupole measurement) - - export_path (path where to export the data, timestamp will be added to filename) - - Parameters - ---------- - config : str, dict - Path to the .json settings file or dictionary of settings. - """ - status = False - if config is not None: - try: - if isinstance(config, dict): - self.settings.update(config) - else: - with open(config) as json_file: - dic = json.load(json_file) - self.settings.update(dic) - self.exec_logger.info('Acquisition parameters updated: ' + str(self.settings)) - status = True - except Exception as e: - self.exec_logger.warning('Unable to update settings.') - status = False - else: - self.exec_logger.warning('Settings are missing...') - return status - - # Properties - @property - def sequence(self): - """Gets sequence""" - if self._sequence is not None: - assert isinstance(self._sequence, np.ndarray) - return self._sequence - def reset_mux(self, cmd_id=None): """Switches off all multiplexer relays.""" if self.on_pi and self.use_mux: @@ -1421,9 +1211,9 @@ class OhmPi(object): def _update_acquisition_settings(self, config): warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) - self.update_settings(config) + self.update_settings(settings=config) - def update_settings(self, config:str, cmd_id=None): + def update_settings(self, settings: str, cmd_id=None): """Updates acquisition settings from a json file or dictionary. Parameters can be: - nb_electrodes (number of electrode used, if 4, no MUX needed) @@ -1435,21 +1225,22 @@ class OhmPi(object): Parameters ---------- - config : str, dict + cmd_id + settings : str, dict Path to the .json settings file or dictionary of settings. """ status = False - if config is not None: + if settings is not None: try: - if isinstance(config, dict): - self.settings.update(config) + if isinstance(settings, dict): + self.settings.update(settings) else: - with open(config) as json_file: + with open(settings) as json_file: dic = json.load(json_file) self.settings.update(dic) self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings)) status = True - except Exception as e: + except Exception as e: # noqa self.exec_logger.warning('Unable to update settings.') status = False else: