diff --git a/logging_setup.py b/logging_setup.py index 08b0d617e4db9cdccd5ed9f38c943b5f57a0c6df..556c77804868cc20d9da32d9b01fa5aae172783a 100644 --- a/logging_setup.py +++ b/logging_setup.py @@ -1,11 +1,12 @@ -from settings import LOGGING_CONFIG, DATA_LOGGING_CONFIG +from settings import LOGGING_CONFIG, DATA_LOGGING_CONFIG, MQTT_LOGGING_CONFIG from os import path, mkdir, statvfs from time import gmtime import logging +from mqtt_logger import MQTTHandler from compressed_sized_timed_rotating_logger import CompressedSizedTimedRotatingFileHandler -def setup_loggers(): +def setup_loggers(mqtt=True): # Message logging setup log_path = path.join(path.dirname(__file__), 'logs') if not path.isdir(log_path): @@ -13,6 +14,9 @@ def setup_loggers(): msg_log_filename = path.join(log_path, 'msg_log') msg_logger = logging.getLogger('msg_logger') + # SOH logging setup + # TODO: Add state of health logging here + # Data logging setup base_path = path.dirname(__file__) data_path = path.join(base_path, 'data') @@ -44,6 +48,11 @@ def setup_loggers(): if logging_to_console: msg_logger.addHandler(logging.StreamHandler()) + if mqtt: + mqtt_msg_handler = MQTTHandler(MQTT_LOGGING_CONFIG['hostname'], MQTT_LOGGING_CONFIG['topic']) + mqtt_msg_handler.setLevel(logging_level) + mqtt_msg_handler.setFormatter(msg_formatter) + msg_logger.addHandler(mqtt_msg_handler) # Set data logging level and handler data_logger.setLevel(logging.INFO) diff --git a/mqtt_logger.py b/mqtt_logger.py new file mode 100644 index 0000000000000000000000000000000000000000..e93ee1f6c52998b48263df6d1c09e0373beef8ca --- /dev/null +++ b/mqtt_logger.py @@ -0,0 +1,58 @@ +""" +MIT License +Copyright (c) 2016 Pipat Methavanitpong +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import logging +import paho.mqtt.client as mqtt +import paho.mqtt.publish as publish + + +class MQTTHandler(logging.Handler): + """ + A handler class which writes logging records, appropriately formatted, + to a MQTT server to a topic. + """ + def __init__(self, hostname, topic, qos=0, retain=False, + port=1883, client_id='', keepalive=60, will=None, auth=None, + tls=None, protocol=mqtt.MQTTv31, transport='tcp'): + logging.Handler.__init__(self) + self.topic = topic + self.qos = qos + self.retain = retain + self.hostname = hostname + self.port = port + self.client_id = client_id + self.keepalive = keepalive + self.will = will + self.auth = auth + self.tls = tls + self.protocol = protocol + self.transport = transport + + def emit(self, record): + """ + Publish a single formatted logging record to a broker, then disconnect + cleanly. + """ + msg = self.format(record) + publish.single(self.topic, msg, self.qos, self.retain, + hostname=self.hostname, port=self.port, + client_id=self.client_id, keepalive=self.keepalive, + will=self.will, auth=self.auth, tls=self.tls, + protocol=self.protocol, transport=self.transport) \ No newline at end of file diff --git a/mqtt_setup.py b/mqtt_setup.py index 9067c5251e7dd58fbf45987701a1a37b84304a8f..66296df93ef71b8e3637c85a57b37ae90940a7b6 100644 --- a/mqtt_setup.py +++ b/mqtt_setup.py @@ -1,4 +1,4 @@ -from settings import MQTT_CONFIG +from settings import MQTT_LOGGING_CONFIG import paho.mqtt.client as mqtt @@ -8,11 +8,11 @@ def on_message(client, userdata, message): print(f'topic: {message.topic}') print(f'qos: {message.qos}') print(f'retain flag: {message.retain}') - client.publish(MQTT_CONFIG['measurements_topic'], f'{m} 45 ohm.m') + client.publish(MQTT_LOGGING_CONFIG['measurements_topic'], f'{m} 45 ohm.m') def mqtt_client_setup(): - client = mqtt.Client(MQTT_CONFIG['client_id'], protocol=5) # create new client instance - client.connect(MQTT_CONFIG['mqtt_broker']) + client = mqtt.Client(MQTT_LOGGING_CONFIG['client_id'], protocol=5) # create new client instance + client.connect(MQTT_LOGGING_CONFIG['mqtt_broker']) client.on_message = on_message - return client, MQTT_CONFIG['measurements_topic'] + return client, MQTT_LOGGING_CONFIG['measurements_topic'] \ No newline at end of file diff --git a/ohmpi.py b/ohmpi.py index 330b73793287a5739031ed87f6f52e2a82bf4463..416652ed19e3db16cdda82699dbb2f5143d62be3 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -51,7 +51,6 @@ print(r' \___/\_| |_/\_| |_/\_| \___/ ') print('\033[0m') print('OhmPi start') print('Version:', VERSION) -print('Import libraries') current_time = datetime.now() print(current_time.strftime("%Y-%m-%d %H:%M:%S")) @@ -71,7 +70,7 @@ class OhmPi(object): Either 'print' for a console output or 'mqtt' for publication onto MQTT broker. """ - def __init__(self, config=None, sequence=None, output='print'): + def __init__(self, config=None, sequence=None, output='print', data_logger=None, msg_logger=None, soh_logger=None): # flags and attributes # self.on_pi = on_pi # True if run from the RaspberryPi with the hardware, otherwise False for random data self.output = output # type of output print @@ -79,9 +78,13 @@ class OhmPi(object): self.run = False # flag is True when measuring self.thread = None # contains the handle for the thread taking the measurement self.path = 'data/' # where to save the .csv + self.data_logger = data_logger + self.msg_logger = msg_logger + self.soh_logger = soh_logger + if not arm64_imports: - self.dump(f'Warning: {e}\n Some libraries only available on arm64 platform could not be imported.\n' + self.log_msg(f'Warning: {e}\n Some libraries only available on arm64 platform could not be imported.\n' f'The Ohmpi class will fake operations for testing purposes.', 'warning') # read in hardware parameters (settings.py) @@ -100,7 +103,7 @@ class OhmPi(object): if config is not None: self._read_acquisition_parameters(config) - self.dump('Initialized with configuration:' + str(self.pardict), level='debug') + self.log_msg('Initialized with configuration:' + str(self.pardict), level='debug') # read quadrupole sequence if sequence is None: @@ -122,7 +125,7 @@ class OhmPi(object): # ADS1115 for voltage measurement (MN) self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49) - def dump(self, msg, level='debug'): + def log_msg(self, msg, level='debug'): """Function for output management. Parameters @@ -134,17 +137,19 @@ class OhmPi(object): """ # TODO all message to be logged using python logging library and rotating log - if self.output == 'print': - if level == 'error': - print(colored(level.upper() + ' : ' + msg, 'red')) - elif level == 'warn': - print(colored(level.upper() + ' : ' + msg, 'yellow')) - else: - print(level.upper() + ' : ' + msg) - elif self.output == 'mqtt': - if level == 'debug': - # TODO mqtt transmission here - pass + if self.msg_logger is not None: + self.msg_logger.info(msg) + # if self.output == 'print': + # if level == 'error': + # print(colored(level.upper() + ' : ' + msg, 'red')) + # elif level == 'warn': + # print(colored(level.upper() + ' : ' + msg, 'yellow')) + # else: + # print(level.upper() + ' : ' + msg) + # elif self.output == 'mqtt': + # if level == 'debug': + # # TODO mqtt transmission here + # pass def _read_acquisition_parameters(self, config): """Read acquisition parameters. @@ -167,7 +172,7 @@ class OhmPi(object): with open(config) as json_file: dic = json.load(json_file) self.pardict.update(dic) - self.dump('Acquisition parameters updated: ' + str(self.pardict), level='debug') + self.log_msg('Acquisition parameters updated: ' + str(self.pardict), level='debug') def _read_hardware_parameters(self): """Read hardware parameters from settings.py. @@ -176,7 +181,7 @@ class OhmPi(object): self.id = OHMPI_CONFIG['id'] # ID of the OhmPi self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm self.Imax = OHMPI_CONFIG['Imax'] # maximum current - self.dump(f'The maximum current cannot be higher than {self.Imax} mA', level='warn') + self.log_msg(f'The maximum current cannot be higher than {self.Imax} mA', level='warn') self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V self.coef_p3 = OHMPI_CONFIG['coef_p3'] # slope for current conversion for ads.P3, measurement in V/V self.offset_p2 = OHMPI_CONFIG['offset_p2'] @@ -185,7 +190,7 @@ class OhmPi(object): self.version = OHMPI_CONFIG['version'] # hardware version self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes self.board_address = OHMPI_CONFIG['board_address'] - self.dump('OHMPI_CONFIG = ' + str(OHMPI_CONFIG), level='debug') + self.log_msg('OHMPI_CONFIG = ' + str(OHMPI_CONFIG), level='debug') @staticmethod def find_identical_in_line(quads): @@ -263,19 +268,19 @@ class OhmPi(object): # if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2) if test_index_elec.size != 0: for i in range(len(test_index_elec[0, :])): - self.dump('Error: An electrode index at line ' + str(test_index_elec[0, i]+1) + + self.log_msg('Error: An electrode index at line ' + str(test_index_elec[0, i] + 1) + ' exceeds the maximum number of electrodes', level='error') # sys.exit(1) output = None elif len(test_same_elec) != 0: for i in range(len(test_same_elec)): - self.dump('Error: An electrode index A == B detected at line ' + str(test_same_elec[i]+1), - level="error") + self.log_msg('Error: An electrode index A == B detected at line ' + str(test_same_elec[i] + 1), + level="error") # sys.exit(1) output = None if output is not None: - self.dump('Sequence of {:d} quadrupoles read.'.format(output.shape[0]), level='debug') + self.log_msg('Sequence of {:d} quadrupoles read.'.format(output.shape[0]), level='debug') self.sequence = output @@ -328,9 +333,9 @@ class OhmPi(object): else: mcp2.get_pin(relay_nr-1).value = False - self.dump(f'Switching relay {relay_nr} {state} for electrode {electrode_nr}', level='debug') + self.log_msg(f'Switching relay {relay_nr} {state} for electrode {electrode_nr}', level='debug') else: - self.dump(f'Unable to address electrode nr {electrode_nr}', level='warn') + self.log_msg(f'Unable to address electrode nr {electrode_nr}', level='warn') def switch_mux_on(self, quadrupole): """Switch on multiplexer relays for given quadrupole. @@ -346,7 +351,7 @@ class OhmPi(object): for i in range(0, 4): self.switch_mux(quadrupole[i], 'on', roles[i]) else: - self.dump('A == B -> short circuit detected!', level='error') + self.log_msg('A == B -> short circuit detected!', level='error') def switch_mux_off(self, quadrupole): """Switch off multiplexer relays for given quadrupole. @@ -366,7 +371,7 @@ class OhmPi(object): for i in range(0, 4): for j in range(1, self.max_elec + 1): self.switch_mux(j, 'off', roles[i]) - self.dump('All MUX switched off.', level='debug') + self.log_msg('All MUX switched off.', level='debug') def run_measurement(self, quad, nb_stack=None, injection_duration=None): # NOTE: quad not used?! """ Do a 4 electrode measurement and measure transfer resistance obtained. @@ -478,7 +483,7 @@ class OhmPi(object): val = d[k] output += f'{val}\t' output = output[:-1] - self.dump(output, level='debug') + self.log_msg(output, level='debug') time.sleep(1) # NOTE: why this? return d @@ -504,7 +509,7 @@ class OhmPi(object): self.sequence = quads # run the RS check - self.dump('RS check (check contact resistance)', level='debug') + self.log_msg('RS check (check contact resistance)', level='debug') self.measure() # restore @@ -545,19 +550,19 @@ class OhmPi(object): """ self.run = True self.status = 'running' - self.dump('status = ' + self.status, level='debug') + self.log_msg('status = ' + self.status, level='debug') def func(): for g in range(0, self.pardict["nbr_meas"]): # for time-lapse monitoring if self.run is False: - self.dump('INTERRUPTED', level='debug') + self.log_msg('INTERRUPTED', level='debug') break t0 = time.time() # create filename with timestamp fname = self.pardict["export_path"].replace('.csv', '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '.csv') - self.dump('saving to ' + fname, level='debug') + self.log_msg('saving to ' + fname, level='debug') # make sure all multiplexer are off self.reset_mux() @@ -586,7 +591,7 @@ class OhmPi(object): # save data and print in a text file self.append_and_save(fname, current_measurement) - self.dump('{:d}/{:d}'.format(i+1, self.sequence.shape[0]), level='debug') + self.log_msg('{:d}/{:d}'.format(i + 1, self.sequence.shape[0]), level='debug') # compute time needed to take measurement and subtract it from interval # between two sequence run (= sequence_delay) @@ -596,8 +601,8 @@ class OhmPi(object): if sleep_time < 0: # it means that the measuring time took longer than the sequence delay sleep_time = 0 - self.dump('The measuring time is longer than the sequence delay. Increase the sequence delay', - level='warn') + self.log_msg('The measuring time is longer than the sequence delay. Increase the sequence delay', + level='warn') # sleeping time between sequence if self.pardict["nbr_meas"] > 1: @@ -612,7 +617,7 @@ class OhmPi(object): self.run = False if self.thread is not None: self.thread.join() - self.dump('status = ' + self.status) + self.log_msg('status = ' + self.status) # for testing diff --git a/old-requirements.txt b/old-requirements.txt deleted file mode 100644 index 4935e38a311ae23aa6e007577eb790e5623e19ee..0000000000000000000000000000000000000000 --- a/old-requirements.txt +++ /dev/null @@ -1,20 +0,0 @@ -libraries to install using the package manager : libatlas-base-dev - sudo apt-get install libatlas-base-dev -python libraries dependencies : -Adafruit-Blinka==3.2.0 -adafruit-circuitpython-ads1x15==2.1.1 -adafruit-circuitpython-mcp230xx==2.5.1 -adafruit-circuitpython-busdevice==4.0.1 -Adafruit-PlatformDetect==1.3.8 -adafruit-circuitpython-tca9548a==0.5.1 -Adafruit-PureIO==1.0.4 -numpy==1.17.4 -pandas==0.25.3 -pkg-resources==0.0.0 -python-dateutil==2.8.1 -pytz==2019.3 -rpi-ws281x==4.2.2 -RPi.GPIO==0.7.0 -six==1.13.0 -spidev==3.4 -sysv-ipc==1.0.1 diff --git a/settings.py b/settings.py index 23f3544a9268dd4571a8996a1c9ac9dd1116060a..b1630ddae3b263deb1dfbe31dae9bc0b229af918 100644 --- a/settings.py +++ b/settings.py @@ -1,3 +1,5 @@ +from paho import mqtt + # OhmPi configuration OHMPI_CONFIG = { 'id': '0001', # Unique identifier of the OhmPi board (string) @@ -34,9 +36,20 @@ DATA_LOGGING_CONFIG = { } # MQTT configuration parameters -MQTT_CONFIG = { - 'mqtt_broker': 'mg3d-dev.umons.ac.be', +MQTT_LOGGING_CONFIG = { + 'broker': 'ohmpy.umons.ac.be', + 'port': 1883, + 'qos': 0, + 'retain': False, + 'keepalive': 60, + 'will': None, + 'auth': None, + 'tls':None, + 'protocol': mqtt.MQTTv31, + 'transport': 'tcp', 'client_id': f'ohmpi_sn_{OHMPI_CONFIG["id"]}', 'control_topic': f'cmd_ohmpi_sn_{OHMPI_CONFIG["id"]}', - 'measurements_topic': f'measurements_ohmpi_sn_{OHMPI_CONFIG["id"]}' + 'msg_topic': f'msg_ohmpi_sn_{OHMPI_CONFIG["id"]}', + 'data_topic': f'data_ohmpi_sn_{OHMPI_CONFIG["id"]}', + 'soh_topic': f'soh_ohmpi_sn_{OHMPI_CONFIG["id"]}' }