Commit b141e56c authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Fixes PEP8; Prepares class for logging mechanisms

Showing with 150 additions and 115 deletions
+150 -115
# -*- coding: utf-8 -*-
"""
created on January 6, 2020
Update February 2022
Ohmpi.py is a program to control a low-cost and open hardward resistivity meter OhmPi that has been developed by Rémi CLEMENT (INRAE),Vivien DUBOIS (INRAE),Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR)
and Guillaume BLANCHY (ILVO).
created on January 6, 2020.
Update March 2022
Ohmpi.py is a program to control a low-cost and open hardware resistivity meter OhmPi that has been developed by
Rémi CLEMENT (INRAE),Vivien DUBOIS (INRAE), Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR)
Olivier KAUFMANN (UMONS) and Guillaume BLANCHY (ILVO).
"""
VERSION = '2.0.0'
print('\033[1m'+'\033[31m'+' ________________________________')
print('| _ | | | || \/ || ___ \_ _|')
print('| | | | |_| || . . || |_/ / | |' )
print('| | | | _ || |\/| || __/ | |')
print('\ \_/ / | | || | | || | _| |_')
print(' \___/\_| |_/\_| |_/\_| \___/ ')
print('\033[0m')
print('OhmPi start' )
print('Version:', VERSION)
print('Import libraries')
import os
import sys
import io
import json
import glob
import numpy as np
import pandas as pd
import csv
import time
from datetime import datetime
from termcolor import colored
import threading
# finish import (done only when class is instantiated as some libs are
# only available on arm64 platform)
try:
import board
import busio
import adafruit_tca9548a
import adafruit_ads1x15.ads1115 as ads
from adafruit_ads1x15.analog_in import AnalogIn
from adafruit_mcp230xx.mcp23008 import MCP23008
from adafruit_mcp230xx.mcp23017 import MCP23017
import digitalio
from digitalio import Direction
from gpiozero import CPUTemperature
arm64_imports = True
except Exception as e:
print(f'Warning: {e}')
arm64_imports = False
VERSION = '2.0.1'
print('\033[1m'+'\033[31m'+' ________________________________')
print(r'| _ | | | || \/ || ___ \_ _|')
print(r'| | | | |_| || . . || |_/ / | |')
print(r'| | | | _ || |\/| || __/ | |')
print(r'\ \_/ / | | || | | || | _| |_')
print(r' \___/\_| |_/\_| |_/\_| \___/ ')
print('\033[0m')
print('OhmPi start')
print('Version:', VERSION)
print('Import libraries')
current_time = datetime.now()
print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
......@@ -51,34 +70,27 @@ class OhmPi(object):
sequence : str, optional
Path to the .txt where the sequence is read. By default, a 1 quadrupole
sequence: 1, 2, 3, 4 is used.
onpi : bool, optional
on_pi : bool, optional
True if running on the RaspberryPi. False for testing (random data generated).
output : str, optional
Either 'print' for a console output or 'mqtt' for publication onto
MQTT broker.
"""
def __init__(self, config=None, sequence=None, onpi=True, output='print'):
def __init__(self, config=None, sequence=None, output='print'):
# flags and attributes
self.onpi = onpi # True if run from the RaspberryPi with the hardware, otherwise False for random data
self.output = output # type of output print
# self.on_pi = on_pi # True if run from the RaspberryPi with the hardware, otherwise False for random data
self.output = output # type of output print
self.status = 'idle' # either running or idle
self.run = False # flag is True when measuring
self.thread = None # contains the handle for the thread taking the measurement
self.path = 'data/' # wher to save the .csv
# finish import (done only when class is instantiated as some libs are
# only available on arm64 platform)
if self.onpi:
import board, busio, adafruit_tca9548a
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
from adafruit_mcp230xx.mcp23008 import MCP23008
from adafruit_mcp230xx.mcp23017 import MCP23017
import digitalio
from digitalio import Direction
from gpiozero import CPUTemperature
# read in hardware parameters (seetings.py)
self.path = 'data/' # where to save the .csv
if not arm64_imports:
self.dump(f'Warning: {e}\n Some libraries only available on arm64 platform could not be imported.\n'
f'The Ohmpi class will fake operations for testing purposes.', 'warning')
# read in hardware parameters (settings.py)
self._read_hardware_parameters()
# default acquisition parameters
......@@ -111,7 +123,7 @@ class OhmPi(object):
}
# connect to components on the OhmPi board
if self.onpi:
if self.on_pi:
# activation of I2C protocol
self.i2c = busio.I2C(board.SCL, board.SDA)
......@@ -119,11 +131,10 @@ class OhmPi(object):
self.mcp = MCP23008(self.i2c, address=0x20)
# ADS1115 for current measurement (AB)
self.ads_current = ADS.ADS1115(self.i2c, gain=16, data_rate=860, address=0x48)
self.ads_current = ads.ADS1115(self.i2c, gain=16, data_rate=860, address=0x48)
# ADS1115 for voltage measurement (MN)
self.ads_voltage = ADS.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49)
def dump(self, msg, level='debug'):
"""Function for output management.
......@@ -135,8 +146,7 @@ class OhmPi(object):
level : str, optional
Level of the message, either: 'error', 'warn', 'debug'
"""
# TODO all message to be logged using python logging library and rotatin log
# TODO all message to be logged using python logging library and rotating log
if self.output == 'print':
if level == 'error':
......@@ -150,7 +160,6 @@ class OhmPi(object):
# TODO mqtt transmission here
pass
def _read_acquisition_parameters(self, config):
"""Read acquisition parameters.
Parameters can be:
......@@ -164,7 +173,7 @@ class OhmPi(object):
Parameters
----------
config : str
Path to the .json or dictionnary.
Path to the .json or dictionary.
"""
if isinstance(config, dict):
self.pardict.update(config)
......@@ -174,26 +183,25 @@ class OhmPi(object):
self.pardict.update(dic)
self.dump('Acquisition parameters updated: ' + str(self.pardict), level='debug')
def _read_hardware_parameters(self):
"""Read hardware parameters from settings.py.
"""
from settings import OHMPI_CONFIG
self.id = OHMPI_CONFIG['id'] # ID of the OhmPi
self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm
self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm
self.Imax = OHMPI_CONFIG['Imax'] # maximum current
self.dump('The maximum current cannot be higher than 48 mA', level='warn')
self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ADS.P2, measurement in V/V
self.coef_p3 = OHMPI_CONFIG['coef_p3'] # slope for current conversion for ADS.P3, measurement in V/V
self.dump(f'The maximum current cannot be higher than {self.Imax} mA', level='warn')
self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V
self.coef_p3 = OHMPI_CONFIG['coef_p3'] # slope for current conversion for ads.P3, measurement in V/V
self.offset_p2 = OHMPI_CONFIG['offset_p2']
self.offset_p3 = OHMPI_CONFIG['offset_p3']
self.nb_samples = OHMPI_CONFIG['integer'] # number of samples measured for each stack
self.nb_samples = OHMPI_CONFIG['integer'] # number of samples measured for each stack
self.version = OHMPI_CONFIG['version'] # hardware version
self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes
self.dump('OHMPI_CONFIG = ' + str(OHMPI_CONFIG), level='debug')
def find_identical_in_line(self, quads):
@staticmethod
def find_identical_in_line(quads):
"""Find quadrupole which where A and B are identical.
If A and B are connected to the same relay, the Pi burns (short-circuit).
......@@ -231,6 +239,17 @@ class OhmPi(object):
# output.append(i)
return output
@property
def on_pi(self):
"""Returns True if code is running on a raspberry pi and required arm64 libs have been imported"""
running_on_pi = False
try:
with io.open('/sys/firmware/devicetree/base/model', 'r') as f:
if 'raspberry pi' in f.read().lower():
running_on_pi = True
except Exception as e:
print(e)
return running_on_pi and arm64_imports
def read_quad(self, filename):
"""Read quadrupole sequence from file.
......@@ -246,7 +265,7 @@ class OhmPi(object):
output : numpy.array
Array of shape (number quadrupoles * 4).
"""
output = np.loadtxt(filename, delimiter=" ", dtype=int) # load quadripole file
output = np.loadtxt(filename, delimiter=" ", dtype=int) # load quadrupole file
# locate lines where the electrode index exceeds the maximum number of electrodes
test_index_elec = np.array(np.where(output > self.max_elec))
......@@ -256,14 +275,16 @@ class OhmPi(object):
# if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2)
if test_index_elec.size != 0:
for i in range(len(test_index_elec[0,:])):
self.dump("Error: An electrode index at line " + str(test_index_elec[0,i]+1) + " exceeds the maximum number of electrodes", level="error")
#sys.exit(1)
for i in range(len(test_index_elec[0, :])):
self.dump('Error: An electrode index at line ' + str(test_index_elec[0, i]+1) +
' exceeds the maximum number of electrodes', level='error')
# sys.exit(1)
output = None
elif len(test_same_elec) != 0:
for i in range(len(test_same_elec)):
self.dump("Error: An electrode index A == B detected at line " + str(test_same_elec[i]+1), level="error")
#sys.exit(1)
self.dump('Error: An electrode index A == B detected at line ' + str(test_same_elec[i]+1),
level="error")
# sys.exit(1)
output = None
if output is not None:
......@@ -271,7 +292,6 @@ class OhmPi(object):
self.sequence = output
def switch_mux(self, electrode_nr, state, role):
"""Select the right channel for the multiplexer cascade for a given electrode.
......@@ -290,14 +310,13 @@ class OhmPi(object):
# choose with MUX board
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_address[role])
# find I2C addres of the electrode and corresponding relay
# find I2C address of the electrode and corresponding relay
# TODO from number of electrode, the below can be guessed
i2c_address = None
# considering that one MCP23017 can cover 16 electrodes
electrode_nr = electrode_nr - 1 # switch to 0 indexing
i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division
electrode_nr = electrode_nr - 1 # switch to 0 indexing
i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16)*16
relay_nr = relay_nr + 1 # switch back to 1 based indexing
relay_nr = relay_nr + 1 # switch back to 1 based indexing
# if electrode_nr < 17:
# i2c_address = 7
......@@ -326,7 +345,6 @@ class OhmPi(object):
else:
self.dump(f'Unable to address electrode nr {electrode_nr}', level='warn')
def switch_mux_on(self, quadrupole):
"""Switch on multiplexer relays for given quadrupole.
......@@ -343,7 +361,6 @@ class OhmPi(object):
else:
self.dump('A == B -> short circuit detected!', level='error')
def switch_mux_off(self, quadrupole):
"""Switch off multiplexer relays for given quadrupole.
......@@ -356,7 +373,6 @@ class OhmPi(object):
for i in range(0, 4):
self.switch_mux(quadrupole[i], 'off', roles[i])
def reset_mux(self):
"""Switch off all multiplexer relays."""
roles = ['A', 'B', 'M', 'N']
......@@ -364,16 +380,15 @@ class OhmPi(object):
for j in range(1, self.max_elec + 1):
self.switch_mux(j, 'off', roles[i])
self.dump('All MUX switched off.', level='debug')
def run_measurement(self, quad, nb_stack=None, injection_duration=None):
"""Do a 4 electrode measurement and measure transfer resistance obtained.
def run_measurement(self, quad, nb_stack=None, injection_duration=None): # NOTE: quad not used?!
""" Do a 4 electrode measurement and measure transfer resistance obtained.
Parameters
----------
nb_stack : int, optional
Number of stacks.
injection_detlat : int, optional
injection_duration : int, optional
Injection time in seconds.
quad : list of int
Quadrupole to measure.
......@@ -406,7 +421,7 @@ class OhmPi(object):
# current injection
if (n % 2) == 0:
pin1.value = True
pin0.value = False # current injection polarity nr1
pin0.value = False # current injection polarity nr1
else:
pin0.value = True
pin1.value = False # current injection nr2
......@@ -417,9 +432,11 @@ class OhmPi(object):
# sampling for each stack at the end of the injection
meas = np.zeros((self.nb_samples, 3))
for k in range(0, self.nb_samples):
meas[k, 0] = (AnalogIn(self.ads_current, ADS.P0).voltage*1000) / (50 * self.r_shunt) # reading current value on ADS channel A0
meas[k, 1] = AnalogIn(self.ads_voltage, ADS.P0).voltage * self.coef_p2 * 1000
meas[k, 2] = AnalogIn(self.ads_voltage, ADS.P1).voltage * self.coef_p3 * 1000 # reading voltage value on ADS channel A2
# reading current value on ADS channel A0
meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage*1000) / (50 * self.r_shunt)
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * self.coef_p2 * 1000
# reading voltage value on ADS channel A2
meas[k, 2] = AnalogIn(self.ads_voltage, ads.P1).voltage * self.coef_p3 * 1000
# stop current injection
pin1.value = False
......@@ -441,18 +458,18 @@ class OhmPi(object):
end_calc = time.time()
# TODO I am not sure I undestand the computation below
# TODO I am not sure I understand the computation below
# wait twice the actual injection time between two injection
# so it's a 50% duty cycle right?
time.sleep(2*(end_delay-start_delay)-(end_calc-start_delay))
# create dateframe and compute averaged values from all stacks
df = pd.DataFrame({
# create a dictionary and compute averaged values from all stacks
d = {
"time": [datetime.now()],
"A": [(1)],
"B": [(2)],
"M": [(3)],
"N": [(4)],
"A": [1],
"B": [2],
"M": [3],
"N": [4],
"inj time [ms]": (end_delay - start_delay) * 1000,
"Vmn [mV]": [(sum_vmn / (3 + 2 * nb_stack - 1))],
"I [mA]": [(injection_current / (3 + 2 * nb_stack - 1))],
......@@ -462,15 +479,22 @@ class OhmPi(object):
"CPU temp [degC]": [CPUTemperature().temperature],
"Time [s]": [(-start_time + time.time())],
"Nb samples [-]": [self.nb_samples]
})
}
# round number to two decimal for nicer string output
output = df.round(2)
self.dump(output.to_string(), level='debug')
time.sleep(1) # TODO why this?
return df
output = [f'{d[k]}\t' for k in d.keys]
output = output[:-1] + '\n'
for k in d.keys:
if isinstance(d[k], float):
val = np.round(d[k], 2)
else:
val = d[k]
output += f'{val}\t'
output = output[:-1]
self.dump(output, level='debug')
time.sleep(1) # NOTE: why this?
return d
def rs_check(self):
"""Check contact resistance.
......@@ -502,27 +526,33 @@ class OhmPi(object):
# TODO if interrupted, we would need to restore the values
# TODO or we offer the possiblity in 'run_measurement' to have rs_check each time?
def append_and_save(self, fname, last_measurement):
@staticmethod
def append_and_save(fname, last_measurement):
"""Append and save last measurement dataframe.
Parameters
----------
last_measurement : pandas.DataFrame
Last measurement taken in the form of a pandas dataframe.
fname : str
filename to save the last measurement dataframe
last_measurement : dict
Last measurement taken in the form of a python dictionary
"""
if os.path.isfile(fname):
# Load data file and append data to it
with open(fname, 'a') as f:
last_measurement.to_csv(f, header=False)
w = csv.DictWriter(f, last_measurement.keys())
w.writerow(last_measurement)
# last_measurement.to_csv(f, header=False)
else:
# create data file and add headers
with open(fname, 'a') as f:
last_measurement.to_csv(f, header=True)
w = csv.DictWriter(f, last_measurement.keys())
w.writeheader()
w.writerow(last_measurement)
# last_measurement.to_csv(f, header=True)
def measure(self):
"""Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'.
"""
......@@ -531,14 +561,15 @@ class OhmPi(object):
self.dump('status = ' + self.status, level='debug')
def func():
for g in range(0, self.pardict["nbr_meas"]): # for time-lapse monitoring
if self.run == False:
for g in range(0, self.pardict["nbr_meas"]): # for time-lapse monitoring
if self.run is False:
self.dump('INTERRUPTED', level='debug')
break
t0 = time.time()
# create filename with timestamp
fname = self.pardict["export_path"].replace('.csv', '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '.csv')
fname = self.pardict["export_path"].replace('.csv', '_' + datetime.now().strftime('%Y%m%dT%H%M%S')
+ '.csv')
self.dump('saving to ' + fname, level='debug')
# make sure all multiplexer are off
......@@ -547,19 +578,21 @@ class OhmPi(object):
# measure all quadrupole of the sequence
for i in range(0, self.sequence.shape[0]):
quad = self.sequence[i, :] # quadrupole
if self.run == False:
if self.run is False:
break
# call the switch_mux function to switch to the right electrodes
self.switch_mux_on(quad)
# run a measurement
if self.onpi:
current_measurement = self.run_measurement(quad, self.pardict["stack"], self.pardict["injection_duration"])
if self.on_pi:
current_measurement = self.run_measurement(quad, self.pardict["stack"],
self.pardict["injection_duration"])
else: # for testing, generate random data
current_measurement = pd.DataFrame({
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], 'R [ohm]': np.abs(np.random.randn(1))
})
current_measurement = {
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
'R [ohm]': np.abs(np.random.randn(1))
}
# switch mux off
self.switch_mux_off(quad)
......@@ -576,11 +609,12 @@ class OhmPi(object):
if sleep_time < 0:
# it means that the measuring time took longer than the sequence delay
sleep_time = 0
self.dump('The measuring time is longer than the sequence delay. Increase the sequence delay', level='warn')
self.dump('The measuring time is longer than the sequence delay. Increase the sequence delay',
level='warn')
# sleeping time between sequence
if self.pardict["nbr_meas"] > 1:
time.sleep(sleep_time) # waiting for next measurement (time-lapse)
time.sleep(sleep_time) # waiting for next measurement (time-lapse)
self.status = 'idle'
self.thread = threading.Thread(target=func)
self.thread.start()
......@@ -593,9 +627,10 @@ class OhmPi(object):
self.thread.join()
self.dump('status = ' + self.status)
# test
#ohmpi = OhmPi(config='ohmpi_param.json')
#ohmpi.measure()
#time.sleep(4)
#ohmpi.stop()
# for testing
if __name__ == "__main__":
ohmpi = OhmPi(config='ohmpi_param.json')
ohmpi.measure()
time.sleep(4)
ohmpi.stop()
#!/bin/bash
sudo apt-get install -y libatlas-base-dev
python3 -m venv ohmpy
source ohmpy/bin/activate
source ohmpy/bin/activate || exit 1 # NOTE: Added || exit to avoid installing requirements in system python
export CFLAGS=-fcommon
pip install -r requirements.txt
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment