Commit f771e959 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Adds an mqtt_handler for logging and includes first steps of logging implementation

Showing with 131 additions and 66 deletions
+131 -66
from settings import LOGGING_CONFIG, DATA_LOGGING_CONFIG from settings import LOGGING_CONFIG, DATA_LOGGING_CONFIG, MQTT_LOGGING_CONFIG
from os import path, mkdir, statvfs from os import path, mkdir, statvfs
from time import gmtime from time import gmtime
import logging import logging
from mqtt_logger import MQTTHandler
from compressed_sized_timed_rotating_logger import CompressedSizedTimedRotatingFileHandler from compressed_sized_timed_rotating_logger import CompressedSizedTimedRotatingFileHandler
def setup_loggers(): def setup_loggers(mqtt=True):
# Message logging setup # Message logging setup
log_path = path.join(path.dirname(__file__), 'logs') log_path = path.join(path.dirname(__file__), 'logs')
if not path.isdir(log_path): if not path.isdir(log_path):
...@@ -13,6 +14,9 @@ def setup_loggers(): ...@@ -13,6 +14,9 @@ def setup_loggers():
msg_log_filename = path.join(log_path, 'msg_log') msg_log_filename = path.join(log_path, 'msg_log')
msg_logger = logging.getLogger('msg_logger') msg_logger = logging.getLogger('msg_logger')
# SOH logging setup
# TODO: Add state of health logging here
# Data logging setup # Data logging setup
base_path = path.dirname(__file__) base_path = path.dirname(__file__)
data_path = path.join(base_path, 'data') data_path = path.join(base_path, 'data')
...@@ -44,6 +48,11 @@ def setup_loggers(): ...@@ -44,6 +48,11 @@ def setup_loggers():
if logging_to_console: if logging_to_console:
msg_logger.addHandler(logging.StreamHandler()) msg_logger.addHandler(logging.StreamHandler())
if mqtt:
mqtt_msg_handler = MQTTHandler(MQTT_LOGGING_CONFIG['hostname'], MQTT_LOGGING_CONFIG['topic'])
mqtt_msg_handler.setLevel(logging_level)
mqtt_msg_handler.setFormatter(msg_formatter)
msg_logger.addHandler(mqtt_msg_handler)
# Set data logging level and handler # Set data logging level and handler
data_logger.setLevel(logging.INFO) data_logger.setLevel(logging.INFO)
......
"""
MIT License
Copyright (c) 2016 Pipat Methavanitpong
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import logging
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
class MQTTHandler(logging.Handler):
"""
A handler class which writes logging records, appropriately formatted,
to a MQTT server to a topic.
"""
def __init__(self, hostname, topic, qos=0, retain=False,
port=1883, client_id='', keepalive=60, will=None, auth=None,
tls=None, protocol=mqtt.MQTTv31, transport='tcp'):
logging.Handler.__init__(self)
self.topic = topic
self.qos = qos
self.retain = retain
self.hostname = hostname
self.port = port
self.client_id = client_id
self.keepalive = keepalive
self.will = will
self.auth = auth
self.tls = tls
self.protocol = protocol
self.transport = transport
def emit(self, record):
"""
Publish a single formatted logging record to a broker, then disconnect
cleanly.
"""
msg = self.format(record)
publish.single(self.topic, msg, self.qos, self.retain,
hostname=self.hostname, port=self.port,
client_id=self.client_id, keepalive=self.keepalive,
will=self.will, auth=self.auth, tls=self.tls,
protocol=self.protocol, transport=self.transport)
\ No newline at end of file
from settings import MQTT_CONFIG from settings import MQTT_LOGGING_CONFIG
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
...@@ -8,11 +8,11 @@ def on_message(client, userdata, message): ...@@ -8,11 +8,11 @@ def on_message(client, userdata, message):
print(f'topic: {message.topic}') print(f'topic: {message.topic}')
print(f'qos: {message.qos}') print(f'qos: {message.qos}')
print(f'retain flag: {message.retain}') print(f'retain flag: {message.retain}')
client.publish(MQTT_CONFIG['measurements_topic'], f'{m} 45 ohm.m') client.publish(MQTT_LOGGING_CONFIG['measurements_topic'], f'{m} 45 ohm.m')
def mqtt_client_setup(): def mqtt_client_setup():
client = mqtt.Client(MQTT_CONFIG['client_id'], protocol=5) # create new client instance client = mqtt.Client(MQTT_LOGGING_CONFIG['client_id'], protocol=5) # create new client instance
client.connect(MQTT_CONFIG['mqtt_broker']) client.connect(MQTT_LOGGING_CONFIG['mqtt_broker'])
client.on_message = on_message client.on_message = on_message
return client, MQTT_CONFIG['measurements_topic'] return client, MQTT_LOGGING_CONFIG['measurements_topic']
\ No newline at end of file
...@@ -51,7 +51,6 @@ print(r' \___/\_| |_/\_| |_/\_| \___/ ') ...@@ -51,7 +51,6 @@ print(r' \___/\_| |_/\_| |_/\_| \___/ ')
print('\033[0m') print('\033[0m')
print('OhmPi start') print('OhmPi start')
print('Version:', VERSION) print('Version:', VERSION)
print('Import libraries')
current_time = datetime.now() current_time = datetime.now()
print(current_time.strftime("%Y-%m-%d %H:%M:%S")) print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
...@@ -71,7 +70,7 @@ class OhmPi(object): ...@@ -71,7 +70,7 @@ class OhmPi(object):
Either 'print' for a console output or 'mqtt' for publication onto Either 'print' for a console output or 'mqtt' for publication onto
MQTT broker. MQTT broker.
""" """
def __init__(self, config=None, sequence=None, output='print'): def __init__(self, config=None, sequence=None, output='print', data_logger=None, msg_logger=None, soh_logger=None):
# flags and attributes # flags and attributes
# self.on_pi = on_pi # True if run from the RaspberryPi with the hardware, otherwise False for random data # self.on_pi = on_pi # True if run from the RaspberryPi with the hardware, otherwise False for random data
self.output = output # type of output print self.output = output # type of output print
...@@ -79,9 +78,13 @@ class OhmPi(object): ...@@ -79,9 +78,13 @@ class OhmPi(object):
self.run = False # flag is True when measuring self.run = False # flag is True when measuring
self.thread = None # contains the handle for the thread taking the measurement self.thread = None # contains the handle for the thread taking the measurement
self.path = 'data/' # where to save the .csv self.path = 'data/' # where to save the .csv
self.data_logger = data_logger
self.msg_logger = msg_logger
self.soh_logger = soh_logger
if not arm64_imports: if not arm64_imports:
self.dump(f'Warning: {e}\n Some libraries only available on arm64 platform could not be imported.\n' self.log_msg(f'Warning: {e}\n Some libraries only available on arm64 platform could not be imported.\n'
f'The Ohmpi class will fake operations for testing purposes.', 'warning') f'The Ohmpi class will fake operations for testing purposes.', 'warning')
# read in hardware parameters (settings.py) # read in hardware parameters (settings.py)
...@@ -100,7 +103,7 @@ class OhmPi(object): ...@@ -100,7 +103,7 @@ class OhmPi(object):
if config is not None: if config is not None:
self._read_acquisition_parameters(config) self._read_acquisition_parameters(config)
self.dump('Initialized with configuration:' + str(self.pardict), level='debug') self.log_msg('Initialized with configuration:' + str(self.pardict), level='debug')
# read quadrupole sequence # read quadrupole sequence
if sequence is None: if sequence is None:
...@@ -122,7 +125,7 @@ class OhmPi(object): ...@@ -122,7 +125,7 @@ class OhmPi(object):
# ADS1115 for voltage measurement (MN) # ADS1115 for voltage measurement (MN)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49) self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49)
def dump(self, msg, level='debug'): def log_msg(self, msg, level='debug'):
"""Function for output management. """Function for output management.
Parameters Parameters
...@@ -134,17 +137,19 @@ class OhmPi(object): ...@@ -134,17 +137,19 @@ class OhmPi(object):
""" """
# TODO all message to be logged using python logging library and rotating log # TODO all message to be logged using python logging library and rotating log
if self.output == 'print': if self.msg_logger is not None:
if level == 'error': self.msg_logger.info(msg)
print(colored(level.upper() + ' : ' + msg, 'red')) # if self.output == 'print':
elif level == 'warn': # if level == 'error':
print(colored(level.upper() + ' : ' + msg, 'yellow')) # print(colored(level.upper() + ' : ' + msg, 'red'))
else: # elif level == 'warn':
print(level.upper() + ' : ' + msg) # print(colored(level.upper() + ' : ' + msg, 'yellow'))
elif self.output == 'mqtt': # else:
if level == 'debug': # print(level.upper() + ' : ' + msg)
# TODO mqtt transmission here # elif self.output == 'mqtt':
pass # if level == 'debug':
# # TODO mqtt transmission here
# pass
def _read_acquisition_parameters(self, config): def _read_acquisition_parameters(self, config):
"""Read acquisition parameters. """Read acquisition parameters.
...@@ -167,7 +172,7 @@ class OhmPi(object): ...@@ -167,7 +172,7 @@ class OhmPi(object):
with open(config) as json_file: with open(config) as json_file:
dic = json.load(json_file) dic = json.load(json_file)
self.pardict.update(dic) self.pardict.update(dic)
self.dump('Acquisition parameters updated: ' + str(self.pardict), level='debug') self.log_msg('Acquisition parameters updated: ' + str(self.pardict), level='debug')
def _read_hardware_parameters(self): def _read_hardware_parameters(self):
"""Read hardware parameters from settings.py. """Read hardware parameters from settings.py.
...@@ -176,7 +181,7 @@ class OhmPi(object): ...@@ -176,7 +181,7 @@ class OhmPi(object):
self.id = OHMPI_CONFIG['id'] # ID of the OhmPi self.id = OHMPI_CONFIG['id'] # ID of the OhmPi
self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm
self.Imax = OHMPI_CONFIG['Imax'] # maximum current self.Imax = OHMPI_CONFIG['Imax'] # maximum current
self.dump(f'The maximum current cannot be higher than {self.Imax} mA', level='warn') self.log_msg(f'The maximum current cannot be higher than {self.Imax} mA', level='warn')
self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V
self.coef_p3 = OHMPI_CONFIG['coef_p3'] # slope for current conversion for ads.P3, measurement in V/V self.coef_p3 = OHMPI_CONFIG['coef_p3'] # slope for current conversion for ads.P3, measurement in V/V
self.offset_p2 = OHMPI_CONFIG['offset_p2'] self.offset_p2 = OHMPI_CONFIG['offset_p2']
...@@ -185,7 +190,7 @@ class OhmPi(object): ...@@ -185,7 +190,7 @@ class OhmPi(object):
self.version = OHMPI_CONFIG['version'] # hardware version self.version = OHMPI_CONFIG['version'] # hardware version
self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes
self.board_address = OHMPI_CONFIG['board_address'] self.board_address = OHMPI_CONFIG['board_address']
self.dump('OHMPI_CONFIG = ' + str(OHMPI_CONFIG), level='debug') self.log_msg('OHMPI_CONFIG = ' + str(OHMPI_CONFIG), level='debug')
@staticmethod @staticmethod
def find_identical_in_line(quads): def find_identical_in_line(quads):
...@@ -263,19 +268,19 @@ class OhmPi(object): ...@@ -263,19 +268,19 @@ class OhmPi(object):
# if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2) # if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2)
if test_index_elec.size != 0: if test_index_elec.size != 0:
for i in range(len(test_index_elec[0, :])): for i in range(len(test_index_elec[0, :])):
self.dump('Error: An electrode index at line ' + str(test_index_elec[0, i]+1) + self.log_msg('Error: An electrode index at line ' + str(test_index_elec[0, i] + 1) +
' exceeds the maximum number of electrodes', level='error') ' exceeds the maximum number of electrodes', level='error')
# sys.exit(1) # sys.exit(1)
output = None output = None
elif len(test_same_elec) != 0: elif len(test_same_elec) != 0:
for i in range(len(test_same_elec)): for i in range(len(test_same_elec)):
self.dump('Error: An electrode index A == B detected at line ' + str(test_same_elec[i]+1), self.log_msg('Error: An electrode index A == B detected at line ' + str(test_same_elec[i] + 1),
level="error") level="error")
# sys.exit(1) # sys.exit(1)
output = None output = None
if output is not None: if output is not None:
self.dump('Sequence of {:d} quadrupoles read.'.format(output.shape[0]), level='debug') self.log_msg('Sequence of {:d} quadrupoles read.'.format(output.shape[0]), level='debug')
self.sequence = output self.sequence = output
...@@ -328,9 +333,9 @@ class OhmPi(object): ...@@ -328,9 +333,9 @@ class OhmPi(object):
else: else:
mcp2.get_pin(relay_nr-1).value = False mcp2.get_pin(relay_nr-1).value = False
self.dump(f'Switching relay {relay_nr} {state} for electrode {electrode_nr}', level='debug') self.log_msg(f'Switching relay {relay_nr} {state} for electrode {electrode_nr}', level='debug')
else: else:
self.dump(f'Unable to address electrode nr {electrode_nr}', level='warn') self.log_msg(f'Unable to address electrode nr {electrode_nr}', level='warn')
def switch_mux_on(self, quadrupole): def switch_mux_on(self, quadrupole):
"""Switch on multiplexer relays for given quadrupole. """Switch on multiplexer relays for given quadrupole.
...@@ -346,7 +351,7 @@ class OhmPi(object): ...@@ -346,7 +351,7 @@ class OhmPi(object):
for i in range(0, 4): for i in range(0, 4):
self.switch_mux(quadrupole[i], 'on', roles[i]) self.switch_mux(quadrupole[i], 'on', roles[i])
else: else:
self.dump('A == B -> short circuit detected!', level='error') self.log_msg('A == B -> short circuit detected!', level='error')
def switch_mux_off(self, quadrupole): def switch_mux_off(self, quadrupole):
"""Switch off multiplexer relays for given quadrupole. """Switch off multiplexer relays for given quadrupole.
...@@ -366,7 +371,7 @@ class OhmPi(object): ...@@ -366,7 +371,7 @@ class OhmPi(object):
for i in range(0, 4): for i in range(0, 4):
for j in range(1, self.max_elec + 1): for j in range(1, self.max_elec + 1):
self.switch_mux(j, 'off', roles[i]) self.switch_mux(j, 'off', roles[i])
self.dump('All MUX switched off.', level='debug') self.log_msg('All MUX switched off.', level='debug')
def run_measurement(self, quad, nb_stack=None, injection_duration=None): # NOTE: quad not used?! def run_measurement(self, quad, nb_stack=None, injection_duration=None): # NOTE: quad not used?!
""" Do a 4 electrode measurement and measure transfer resistance obtained. """ Do a 4 electrode measurement and measure transfer resistance obtained.
...@@ -478,7 +483,7 @@ class OhmPi(object): ...@@ -478,7 +483,7 @@ class OhmPi(object):
val = d[k] val = d[k]
output += f'{val}\t' output += f'{val}\t'
output = output[:-1] output = output[:-1]
self.dump(output, level='debug') self.log_msg(output, level='debug')
time.sleep(1) # NOTE: why this? time.sleep(1) # NOTE: why this?
return d return d
...@@ -504,7 +509,7 @@ class OhmPi(object): ...@@ -504,7 +509,7 @@ class OhmPi(object):
self.sequence = quads self.sequence = quads
# run the RS check # run the RS check
self.dump('RS check (check contact resistance)', level='debug') self.log_msg('RS check (check contact resistance)', level='debug')
self.measure() self.measure()
# restore # restore
...@@ -545,19 +550,19 @@ class OhmPi(object): ...@@ -545,19 +550,19 @@ class OhmPi(object):
""" """
self.run = True self.run = True
self.status = 'running' self.status = 'running'
self.dump('status = ' + self.status, level='debug') self.log_msg('status = ' + self.status, level='debug')
def func(): def func():
for g in range(0, self.pardict["nbr_meas"]): # for time-lapse monitoring for g in range(0, self.pardict["nbr_meas"]): # for time-lapse monitoring
if self.run is False: if self.run is False:
self.dump('INTERRUPTED', level='debug') self.log_msg('INTERRUPTED', level='debug')
break break
t0 = time.time() t0 = time.time()
# create filename with timestamp # create filename with timestamp
fname = self.pardict["export_path"].replace('.csv', '_' + datetime.now().strftime('%Y%m%dT%H%M%S') fname = self.pardict["export_path"].replace('.csv', '_' + datetime.now().strftime('%Y%m%dT%H%M%S')
+ '.csv') + '.csv')
self.dump('saving to ' + fname, level='debug') self.log_msg('saving to ' + fname, level='debug')
# make sure all multiplexer are off # make sure all multiplexer are off
self.reset_mux() self.reset_mux()
...@@ -586,7 +591,7 @@ class OhmPi(object): ...@@ -586,7 +591,7 @@ class OhmPi(object):
# save data and print in a text file # save data and print in a text file
self.append_and_save(fname, current_measurement) self.append_and_save(fname, current_measurement)
self.dump('{:d}/{:d}'.format(i+1, self.sequence.shape[0]), level='debug') self.log_msg('{:d}/{:d}'.format(i + 1, self.sequence.shape[0]), level='debug')
# compute time needed to take measurement and subtract it from interval # compute time needed to take measurement and subtract it from interval
# between two sequence run (= sequence_delay) # between two sequence run (= sequence_delay)
...@@ -596,8 +601,8 @@ class OhmPi(object): ...@@ -596,8 +601,8 @@ class OhmPi(object):
if sleep_time < 0: if sleep_time < 0:
# it means that the measuring time took longer than the sequence delay # it means that the measuring time took longer than the sequence delay
sleep_time = 0 sleep_time = 0
self.dump('The measuring time is longer than the sequence delay. Increase the sequence delay', self.log_msg('The measuring time is longer than the sequence delay. Increase the sequence delay',
level='warn') level='warn')
# sleeping time between sequence # sleeping time between sequence
if self.pardict["nbr_meas"] > 1: if self.pardict["nbr_meas"] > 1:
...@@ -612,7 +617,7 @@ class OhmPi(object): ...@@ -612,7 +617,7 @@ class OhmPi(object):
self.run = False self.run = False
if self.thread is not None: if self.thread is not None:
self.thread.join() self.thread.join()
self.dump('status = ' + self.status) self.log_msg('status = ' + self.status)
# for testing # for testing
......
from paho import mqtt
# OhmPi configuration # OhmPi configuration
OHMPI_CONFIG = { OHMPI_CONFIG = {
'id': '0001', # Unique identifier of the OhmPi board (string) 'id': '0001', # Unique identifier of the OhmPi board (string)
...@@ -34,9 +36,20 @@ DATA_LOGGING_CONFIG = { ...@@ -34,9 +36,20 @@ DATA_LOGGING_CONFIG = {
} }
# MQTT configuration parameters # MQTT configuration parameters
MQTT_CONFIG = { MQTT_LOGGING_CONFIG = {
'mqtt_broker': 'mg3d-dev.umons.ac.be', 'broker': 'ohmpy.umons.ac.be',
'port': 1883,
'qos': 0,
'retain': False,
'keepalive': 60,
'will': None,
'auth': None,
'tls':None,
'protocol': mqtt.MQTTv31,
'transport': 'tcp',
'client_id': f'ohmpi_sn_{OHMPI_CONFIG["id"]}', 'client_id': f'ohmpi_sn_{OHMPI_CONFIG["id"]}',
'control_topic': f'cmd_ohmpi_sn_{OHMPI_CONFIG["id"]}', 'control_topic': f'cmd_ohmpi_sn_{OHMPI_CONFIG["id"]}',
'measurements_topic': f'measurements_ohmpi_sn_{OHMPI_CONFIG["id"]}' 'msg_topic': f'msg_ohmpi_sn_{OHMPI_CONFIG["id"]}',
'data_topic': f'data_ohmpi_sn_{OHMPI_CONFIG["id"]}',
'soh_topic': f'soh_ohmpi_sn_{OHMPI_CONFIG["id"]}'
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment