Commit a2491be0 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Adds MQTT code + settings and local logging

Showing with 1386 additions and 2 deletions
+1386 -2
"""
created on december, 2021
Update january 2022
Ohmpi_4elec.py is a program to control a low-cost and open hardware resistivity meter OhmPi that has been developed
by Rémi CLEMENT(INRAE),Vivien DUBOIS(INRAE),Hélène GUYARD(IGE), Nicolas FORQUET (INRAE),
Oliver KAUFMANN (UMONS) and Yannick FARGIER (IFSTTAR).
"""
from settings import OHMPI_CONFIG
try:
import board, busio
import adafruit_tca9548a
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
from adafruit_mcp230xx.mcp23008 import MCP23008
from adafruit_mcp230xx.mcp23017 import MCP23017
import digitalio
from digitalio import Direction
from gpiozero import CPUTemperature
except:
pass
from pandas import DataFrame
from datetime import datetime
import time
import numpy as np
import sys
import json
# import glob
from os import path, statvfs
from threading import Thread
from logging_setup import setup_loggers
from mqtt_setup import mqtt_client_setup
# Initialization
version = "1.01"
print('\033[1m'+'\033[31m'+' ________________________________')
print('| _ | | | || \/ || ___ \_ _|')
print('| | | | |_| || . . || |_/ / | |' )
print('| | | | _ || |\/| || __/ | |')
print('\ \_/ / | | || | | || | _| |_')
print(' \___/\_| |_/\_| |_/\_| \___/ ')
print('\033[0m')
print('OhmPi 4 elec MQTT start' )
print(f'Vers: {version}')
msg_logger, msg_log_filename, data_logger, data_log_filename, logging_level = setup_loggers()
mqtt_client, measurement_topic = mqtt_client_setup()
msg_logger.info(f'publishing mqtt to topic {measurement_topic}')
# Remaining initialization
status = True
"""
hardware parameters
"""
print(f'\033[1m \033[31m The maximum current cannot be higher than {OHMPI_CONFIG["Imax"]} mA \033[0m')
# offset_p2= 0
# offset_p3= 0
integer = 2 # Max value 10 TODO: explain this
nb_elec = 4 # TODO: Improve this
meas = np.zeros((3, integer))
"""
import parameters
"""
with open('ohmpi_param.json') as json_file:
pardict = json.load(json_file)
i2c = busio.I2C(board.SCL, board.SDA) #activation du protocle I2C
mcp = MCP23008(i2c, address=0x20) #connexion I2C MCP23008, injection de courant
ads_current = ADS.ADS1115(i2c, gain=2/3,data_rate=860, address=0X48)# connexion ADS1115, pour la mesure de courant
ads_voltage = ADS.ADS1115(i2c, gain=2/3,data_rate=860, address=0X49)# connexion ADS1115, pour la mesure de voltage
#initialisation des voies pour la polarité
pin0 = mcp.get_pin(0)
pin0.direction = Direction.OUTPUT
pin1 = mcp.get_pin(1)
pin1.direction = Direction.OUTPUT
pin0.value = False
pin1.value = False
def switch_mux(electrode_nr, state, role):
"""select the right channel for the multiplexer cascade for a given electrode"""
board_address = {'A': 0x76, 'B': 0X71, 'M': 0x74, 'N': 0x70}
tca = adafruit_tca9548a.TCA9548A(i2c, board_address[role]) # choose MUX A B M or N
i2c_address = None
if electrode_nr < 17:
i2c_address = 7
relay_nr = electrode_nr
elif 16 < electrode_nr < 33:
i2c_address = 6
relay_nr = electrode_nr - 16
elif 32 < electrode_nr < 49:
i2c_address = 5
relay_nr = electrode_nr - 32
elif 48 < electrode_nr < 65:
i2c_address = 4
relay_nr = electrode_nr - 48
if i2c_address is not None:
mcp2 = MCP23017(tca[i2c_address])
mcp2.get_pin(relay_nr-1).direction=digitalio.Direction.OUTPUT
if state == 'on':
mcp2.get_pin(relay_nr-1).value = True
else:
mcp2.get_pin(relay_nr-1).value = False
msg_logger.debug(f'Switching relay {relay_nr} {state} for electrode {electrode_nr}')
else:
msg_logger.warn(f'Unable to address electrode nr {electrode_nr}')
def switch_mux_on(quadrupole):
"""switch on multiplexer relays for quadrupole"""
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
switch_mux(quadrupole[i], 'on', roles[i])
def switch_mux_off(quadrupole):
"""switch off multiplexer relays for quadrupole"""
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
switch_mux(quadrupole[i], 'off', roles[i])
def reset_mux():
"""switch off all multiplexer relays"""
global nb_elec
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
for j in range(1, nb_elec + 1):
switch_mux(j, 'off', roles[i])
# function to find rows with identical values in different columns
def find_identical_in_line(array_object):
output = []
if array_object.ndim == 1:
temp = np.zeros(4)
for i in range(len(array_object)):
temp[i] = np.count_nonzero(array_object == array_object[i])
if any(temp > 1):
output.append(0)
else:
for i in range(len(array_object[:,1])):
temp = np.zeros(len(array_object[1,:]))
for j in range(len(array_object[1,:])):
temp[j] = np.count_nonzero(array_object[i,:] == array_object[i,j])
if any(temp > 1):
output.append(i)
return output
def read_quad(filename, nb_elec):
"""read quadripole file and apply tests"""
output = np.loadtxt(filename, delimiter=" ",dtype=int) # load quadripole file
# locate lines where the electrode index exceeds the maximum number of electrodes
test_index_elec = np.array(np.where(output > nb_elec))
# locate lines where an electrode is referred twice
test_same_elec = find_identical_in_line(output)
# if statement with exit cases (rajouter un else if pour le deuxième cas du ticket #2)
if test_index_elec.size != 0:
for i in range(len(test_index_elec[0,:])):
print("Error: An electrode index at line "+ str(test_index_elec[0,i]+1)+" exceeds the maximum number of electrodes")
sys.exit(1)
elif len(test_same_elec) != 0:
for i in range(len(test_same_elec)):
print("Error: An electrode index is used twice at line " + str(test_same_elec[i]+1))
sys.exit(1)
else:
return output
def run_measurement(nb_stack, injection_deltat, r_shunt, coefp2, coefp3):
start_time=time.time()
# inner variable initialization
injection_current=0
sum_vmn=0
sum_ps=0
# injection courant and measure
mcp = MCP23008(i2c, address=0x20)
pin0 = mcp.get_pin(0)
pin0.direction = Direction.OUTPUT
pin1 = mcp.get_pin(1)
pin1.direction = Direction.OUTPUT
pin0.value = False
pin1.value = False
for n in range(0, 3+2*nb_stack-1):
# current injection
if (n % 2) == 0:
pin1.value = True
pin0.value = False # current injection polarity nr1
else:
pin0.value = True
pin1.value = False # current injection nr2
start_delay=time.time() # stating measurement time
time.sleep(injection_deltat) # delay depending on current injection duration
# mesureament of i and u
for k in range(0, integer):
meas[0,k] = (AnalogIn(ads_current,ADS.P0).voltage*1000)/(50*r_shunt) # reading current value on ADS channel A0
meas[1,k] = AnalogIn(ads_voltage,ADS.P0).voltage*coefp2*1000
meas[2,k] = AnalogIn(ads_voltage,ADS.P1).voltage*coefp3*1000 # reading voltage value on ADS channel A2
# stop current injection
pin1.value = False
pin0.value = False
end_delay = time.time()
injection_current = injection_current + (np.mean(meas[0, :]))
vmn1 = ((np.mean(meas[1, :]))-(np.mean(meas[2, :])))
if (n % 2) == 0:
sum_vmn = sum_vmn - vmn1
sum_ps = sum_ps + vmn1
else:
sum_vmn = sum_vmn + vmn1
sum_ps = sum_ps + vmn1
cpu = CPUTemperature()
end_calc = time.time()
time.sleep(2*(end_delay-start_delay)-(end_calc-start_delay))
#end_sleep2=time.time()
#print(['sleep=',((end_sleep2-start_delay))])
#print(['true delta=',((end_delay-start_delay)-injection_deltat)])
#print(['time stop=',((2*(end_delay-start_delay)-(end_calc-start_delay)))])
# return averaged values
# cpu= CPUTemperature()
output = DataFrame({
"time": [datetime.now()],
"A": [(1)],
"B": [(2)],
"M": [(3)],
"N": [(4)],
"inj time [ms]": (end_delay - start_delay) * 1000,
"Vmn [mV]": [(sum_vmn / (3 + 2 * nb_stack - 1))],
"I [mA]": [(injection_current / (3 + 2 * nb_stack - 1))],
"R [ohm]": [(sum_vmn / (3 + 2 * nb_stack - 1) / (injection_current / (3 + 2 * nb_stack - 1)))],
"Ps [mV]": [(sum_ps / (3 + 2 * nb_stack - 1))],
"nbStack": [nb_stack],
"CPU temp [°C]": [cpu.temperature],
"Time [s]": [(-start_time + time.time())],
"Integer [-]": [integer]
# Dead time equivalent to the duration of the current injection pulse
})
output = output.round(2)
print(output.to_string())
time.sleep(1)
return output
def append_and_save(data_path, last_measurement):
"""Save data"""
if path.isfile(data_path):
# Load data file and append data to it
with open(data_path, 'a') as f:
last_measurement.to_csv(f, header=False)
else:
# create data file and add headers
with open(data_path, 'a') as f:
last_measurement.to_csv(f, header=True)
"""
Main loop
"""
for g in range(0, pardict.get("nbr_meas")): # for time-lapse monitoring
current_measurement = run_measurement(pardict.get("stack"), pardict.get("injection_duration"),
OHMPI_CONFIG['R_shunt'], OHMPI_CONFIG['coef_p2'], OHMPI_CONFIG['coef_p3'])
append_and_save(pardict.get("export_path"), current_measurement)
msg = f'Resitivity: {current_measurement.iloc[-1]["R [ohm]"]:.2f} ohm'
msg_logger.info(msg)
mqtt_client.publish(measurement_topic, msg)
time.sleep(pardict.get("sequence_delay")) # waiting next measurement (time-lapse)
File added
%% Cell type:markdown id:606c11c0-3a80-4138-ac50-1da0bd01bef8 tags:
# A small code to test MQTT interface for ohmpi
%% Cell type:code id:daf2041b-1df9-42de-a385-f450a826c96f tags:
``` python
import paho.mqtt.client as mqtt
import time
```
%% Cell type:code id:14c42035 tags:
``` python
client_id = 'ohmpi_console_sn_0001'
measurements_topic = 'measurements_ohmpi_sn_0001'
```
%% Cell type:code id:391c6373-f7db-485e-b3dd-b1e04a37473a tags:
``` python
broker_address="mg3d-dev.umons.ac.be"
```
%% Cell type:code id:8fc857ba-bbcf-4f99-a30f-84fd14ddb2d0 tags:
``` python
client = mqtt.Client(client_id, protocol=4) #create new instance
```
%% Cell type:code id:24926751-62c6-4833-8d68-99279192d4e0 tags:
``` python
def on_message(client, userdata, message):
m = str(message.payload.decode("utf-8"))
print(f'message received {m}')
print(f'topic: {message.topic}')
print(f'qos: {message.qos}')
print(f'retain flag: {message.retain}')
```
%% Cell type:code id:06e424ff-cd1d-4756-bf53-cfbca4628e73 tags:
``` python
client.connect(broker_address) #connect to broker
```
%% Output
0
%% Cell type:code id:f0b06a71-b0bf-4551-a044-94b1100a3a5d tags:
``` python
client.on_message = on_message
client.loop_start()
```
%% Cell type:code id:8e168d2d-25fa-49ac-9d03-dd0da5e61841 tags:
``` python
print("Subscribing to topic", measurements_topic)
client.subscribe(measurements_topic)
```
%% Output
Subscribing to topic measurements_ohmpi_sn_0001
(0, 1)
%% Cell type:code id:eaa7d034-5383-4ece-a824-f763ce214760 tags:
``` python
time.sleep(60)
```
%% Output
message received Resitivity: 215.22 ohm
topic: measurements_ohmpi_sn_0001
qos: 0
retain flag: 0
message received Resitivity: 214.94 ohm
topic: measurements_ohmpi_sn_0001
qos: 0
retain flag: 0
%% Cell type:code id:9bd768aa-c4d3-429e-b667-6e981bd28353 tags:
``` python
client.loop_stop()
```
%% Cell type:code id:f9a0333a-c1ec-42a8-bd24-3dc466c51bb4 tags:
``` python
```
This diff is collapsed.
import time
import os
import logging.handlers as handlers
import zipfile
class CompressedSizedTimedRotatingFileHandler(handlers.TimedRotatingFileHandler):
"""
Handler for logging to a set of files, which switches from one file
to the next when the current file reaches a certain size, or at certain
timed intervals
"""
def __init__(self, filename, max_bytes=0, backup_count=0, encoding=None,
delay=0, when='h', interval=1, utc=False, zip_mode=zipfile.ZIP_DEFLATED):
handlers.TimedRotatingFileHandler.__init__(self, filename=filename, when=when, interval=interval, utc=utc,
backupCount=backup_count, encoding=encoding, delay=delay)
self.maxBytes = max_bytes
self.zip_mode = zip_mode
def shouldRollover(self, record):
"""
Determine if rollover should occur.
Basically, see if the supplied record would cause the file to exceed
the size limit we have.
"""
if self.stream is None: # delay was set...
self.stream = self._open()
if self.maxBytes > 0: # are we rolling over?
msg = "%s\n" % self.format(record)
self.stream.seek(0, 2) # due to non-posix-compliant Windows feature
if self.stream.tell() + len(msg) >= self.maxBytes:
return True
t = int(time.time())
if t >= self.rolloverAt:
return True
return False
def find_last_rotated_file(self):
dir_name, base_name = os.path.split(self.baseFilename)
file_names = os.listdir(dir_name)
result = []
prefix = f'{base_name}.2' # we want to find a rotated file with eg filename.2017-12-12... name
for file_name in file_names:
if file_name.startswith(prefix) and not file_name.endswith('.zip'):
result.append(file_name)
result.sort()
return os.path.join(dir_name, result[0])
def doRollover(self):
super(CompressedSizedTimedRotatingFileHandler, self).doRollover()
dfn = self.find_last_rotated_file()
dfn_zipped = f'{dfn}.zip'
if os.path.exists(dfn_zipped):
os.remove(dfn_zipped)
with zipfile.ZipFile(dfn_zipped, 'w', self.zip_mode) as f:
f.write(dfn)
os.remove(dfn)
\ No newline at end of file
from settings import LOGGING_CONFIG, DATA_LOGGING_CONFIG
from os import path, mkdir, statvfs
from time import gmtime
import logging
from compressed_sized_timed_rotating_logger import CompressedSizedTimedRotatingFileHandler
def setup_loggers():
# Message logging setup
log_path = path.join(path.dirname(__file__), 'logs')
if not path.isdir(log_path):
mkdir(log_path)
msg_log_filename = path.join(log_path, 'msg_log')
msg_logger = logging.getLogger('msg_logger')
# Data logging setup
base_path = path.dirname(__file__)
data_path = path.join(base_path, 'data')
if not path.isdir(data_path):
mkdir(data_path)
data_log_filename = path.join(data_path, 'data_log')
data_logger = logging.getLogger('data_logger')
# Debug and logging
debug = LOGGING_CONFIG['debug_mode']
if debug:
logging_level = logging.DEBUG
else:
logging_level = logging.INFO
# Set message logging format and level
log_format = '%(asctime)-15s | %(process)d | %(levelname)s: %(message)s'
logging_to_console = LOGGING_CONFIG['logging_to_console']
msg_handler = CompressedSizedTimedRotatingFileHandler(msg_log_filename, max_bytes=LOGGING_CONFIG['max_bytes'],
backup_count=LOGGING_CONFIG['backup_count'],
when=LOGGING_CONFIG['when'],
interval=LOGGING_CONFIG['interval'])
msg_formatter = logging.Formatter(log_format)
msg_formatter.converter = gmtime
msg_formatter.datefmt = '%Y/%m/%d %H:%M:%S UTC'
msg_handler.setFormatter(msg_formatter)
msg_logger.addHandler(msg_handler)
msg_logger.setLevel(logging_level)
if logging_to_console:
msg_logger.addHandler(logging.StreamHandler())
# Set data logging level and handler
data_logger.setLevel(logging.INFO)
data_handler = CompressedSizedTimedRotatingFileHandler(data_log_filename,
max_bytes=DATA_LOGGING_CONFIG['max_bytes'],
backup_count=DATA_LOGGING_CONFIG['backup_count'],
when=DATA_LOGGING_CONFIG['when'],
interval=DATA_LOGGING_CONFIG['interval'])
data_logger.addHandler(data_handler)
if not init_logging(msg_logger, logging_level, log_path, data_log_filename):
print('ERROR: Could not initialize logging!')
return msg_logger, msg_log_filename, data_logger, data_log_filename, logging_level
def init_logging(msg_logger, logging_level, log_path, data_log_filename):
""" This is the init sequence for the logging system """
init_logging_status = True
msg_logger.info('')
msg_logger.info('****************************')
msg_logger.info('*** NEW SESSION STARTING ***')
msg_logger.info('****************************')
msg_logger.info('')
msg_logger.info('Logging level: %s' % logging_level)
try:
st = statvfs('.')
available_space = st.f_bavail * st.f_frsize / 1024 / 1024
msg_logger.info(f'Remaining disk space : {available_space:.1f} MB')
except Exception as e:
msg_logger.debug('Unable to get remaining disk space: {e}')
msg_logger.info('Saving data log to ' + data_log_filename)
msg_logger.info('OhmPi settings:')
# TODO Add OhmPi settings
msg_logger.info('')
msg_logger.info(f'init_logging_status: {init_logging_status}')
return init_logging_status
{ {
"nb_electrodes": 64, "nb_electrodes": 64,
"injection_duration":0.2, "injection_duration": 4,
"nbr_meas": 100, "nbr_meas": 100,
"sequence_delay": 1, "sequence_delay": 1,
"stack": 1, "stack": 1,
"export_path": "/home/pi/Documents/OhmPi/measurement.csv" "export_path": "./measurement.csv"
} }
settings.py 0 → 100644
# OhmPi configuration
OHMPI_CONFIG = {
'id': '0001', # Unique identifier of the OhmPi board (string)
'R_shunt': 2, # Shunt resistance in Ohms
'Imax': 4800/50/2, # Maximum current
'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V
'coef_p3': 2.50, # slope for current conversion for ADS.P3, measurement in V/V
'offset_p2': 0,
'offset_p3': 0,
'integer': 2, # Max value 10 WHAT IS THIS?
'version': 2
}
# local messages logging configuration
LOGGING_CONFIG = {
'debug_mode': False,
'logging_to_console': False,
'file_name': 'ohmpi_log',
'max_bytes': 262144,
'backup_count': 30,
'when': 'd',
'interval': 1
}
# local data logging configuration
DATA_LOGGING_CONFIG = {
'file_name': 'data_log',
'max_bytes': 16777216,
'backup_count': 1024,
'when': 'd',
'interval': 1
}
# MQTT configuration parameters
MQTT_CONFIG = {
'mqtt_broker': 'mg3d-dev.umons.ac.be',
'client_id': f'ohmpi_sn_{OHMPI_CONFIG["id"]}',
'control_topic': f'cmd_ohmpi_sn_{OHMPI_CONFIG["id"]}',
'measurements_topic': f'measurements_ohmpi_sn_{OHMPI_CONFIG["id"]}'
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment