From b141e56c1824aa8aa29e5acc953981aa66141bb9 Mon Sep 17 00:00:00 2001 From: su530201 <olivier.kaufmann@umons.ac.be> Date: Sat, 5 Mar 2022 12:32:15 +0100 Subject: [PATCH] Fixes PEP8; Prepares class for logging mechanisms --- Ohmpi.py | 263 +++++++++++++++++++++++++++++++------------------------ env.sh | 2 +- 2 files changed, 150 insertions(+), 115 deletions(-) diff --git a/Ohmpi.py b/Ohmpi.py index 81ca7339..81d1e296 100644 --- a/Ohmpi.py +++ b/Ohmpi.py @@ -1,35 +1,54 @@ # -*- coding: utf-8 -*- """ -created on January 6, 2020 -Update February 2022 -Ohmpi.py is a program to control a low-cost and open hardward resistivity meter OhmPi that has been developed by Rémi CLEMENT (INRAE),Vivien DUBOIS (INRAE),Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR) -and Guillaume BLANCHY (ILVO). +created on January 6, 2020. +Update March 2022 +Ohmpi.py is a program to control a low-cost and open hardware resistivity meter OhmPi that has been developed by +Rémi CLEMENT (INRAE),Vivien DUBOIS (INRAE), Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR) +Olivier KAUFMANN (UMONS) and Guillaume BLANCHY (ILVO). """ -VERSION = '2.0.0' - -print('\033[1m'+'\033[31m'+' ________________________________') -print('| _ | | | || \/ || ___ \_ _|') -print('| | | | |_| || . . || |_/ / | |' ) -print('| | | | _ || |\/| || __/ | |') -print('\ \_/ / | | || | | || | _| |_') -print(' \___/\_| |_/\_| |_/\_| \___/ ') -print('\033[0m') -print('OhmPi start' ) -print('Version:', VERSION) -print('Import libraries') - import os -import sys +import io import json -import glob import numpy as np -import pandas as pd +import csv import time from datetime import datetime from termcolor import colored import threading +# finish import (done only when class is instantiated as some libs are +# only available on arm64 platform) +try: + import board + import busio + import adafruit_tca9548a + import adafruit_ads1x15.ads1115 as ads + from adafruit_ads1x15.analog_in import AnalogIn + from adafruit_mcp230xx.mcp23008 import MCP23008 + from adafruit_mcp230xx.mcp23017 import MCP23017 + import digitalio + from digitalio import Direction + from gpiozero import CPUTemperature + arm64_imports = True +except Exception as e: + print(f'Warning: {e}') + arm64_imports = False + + +VERSION = '2.0.1' + +print('\033[1m'+'\033[31m'+' ________________________________') +print(r'| _ | | | || \/ || ___ \_ _|') +print(r'| | | | |_| || . . || |_/ / | |') +print(r'| | | | _ || |\/| || __/ | |') +print(r'\ \_/ / | | || | | || | _| |_') +print(r' \___/\_| |_/\_| |_/\_| \___/ ') +print('\033[0m') +print('OhmPi start') +print('Version:', VERSION) +print('Import libraries') + current_time = datetime.now() print(current_time.strftime("%Y-%m-%d %H:%M:%S")) @@ -51,34 +70,27 @@ class OhmPi(object): sequence : str, optional Path to the .txt where the sequence is read. By default, a 1 quadrupole sequence: 1, 2, 3, 4 is used. - onpi : bool, optional + on_pi : bool, optional True if running on the RaspberryPi. False for testing (random data generated). output : str, optional Either 'print' for a console output or 'mqtt' for publication onto MQTT broker. """ - def __init__(self, config=None, sequence=None, onpi=True, output='print'): + def __init__(self, config=None, sequence=None, output='print'): # flags and attributes - self.onpi = onpi # True if run from the RaspberryPi with the hardware, otherwise False for random data - self.output = output # type of output print + # self.on_pi = on_pi # True if run from the RaspberryPi with the hardware, otherwise False for random data + self.output = output # type of output print self.status = 'idle' # either running or idle self.run = False # flag is True when measuring self.thread = None # contains the handle for the thread taking the measurement - self.path = 'data/' # wher to save the .csv - - # finish import (done only when class is instantiated as some libs are - # only available on arm64 platform) - if self.onpi: - import board, busio, adafruit_tca9548a - import adafruit_ads1x15.ads1115 as ADS - from adafruit_ads1x15.analog_in import AnalogIn - from adafruit_mcp230xx.mcp23008 import MCP23008 - from adafruit_mcp230xx.mcp23017 import MCP23017 - import digitalio - from digitalio import Direction - from gpiozero import CPUTemperature - - # read in hardware parameters (seetings.py) + self.path = 'data/' # where to save the .csv + + if not arm64_imports: + self.dump(f'Warning: {e}\n Some libraries only available on arm64 platform could not be imported.\n' + f'The Ohmpi class will fake operations for testing purposes.', 'warning') + + + # read in hardware parameters (settings.py) self._read_hardware_parameters() # default acquisition parameters @@ -111,7 +123,7 @@ class OhmPi(object): } # connect to components on the OhmPi board - if self.onpi: + if self.on_pi: # activation of I2C protocol self.i2c = busio.I2C(board.SCL, board.SDA) @@ -119,11 +131,10 @@ class OhmPi(object): self.mcp = MCP23008(self.i2c, address=0x20) # ADS1115 for current measurement (AB) - self.ads_current = ADS.ADS1115(self.i2c, gain=16, data_rate=860, address=0x48) + self.ads_current = ads.ADS1115(self.i2c, gain=16, data_rate=860, address=0x48) # ADS1115 for voltage measurement (MN) - self.ads_voltage = ADS.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49) - + self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49) def dump(self, msg, level='debug'): """Function for output management. @@ -135,8 +146,7 @@ class OhmPi(object): level : str, optional Level of the message, either: 'error', 'warn', 'debug' """ - # TODO all message to be logged using python logging library and rotatin log - + # TODO all message to be logged using python logging library and rotating log if self.output == 'print': if level == 'error': @@ -150,7 +160,6 @@ class OhmPi(object): # TODO mqtt transmission here pass - def _read_acquisition_parameters(self, config): """Read acquisition parameters. Parameters can be: @@ -164,7 +173,7 @@ class OhmPi(object): Parameters ---------- config : str - Path to the .json or dictionnary. + Path to the .json or dictionary. """ if isinstance(config, dict): self.pardict.update(config) @@ -174,26 +183,25 @@ class OhmPi(object): self.pardict.update(dic) self.dump('Acquisition parameters updated: ' + str(self.pardict), level='debug') - def _read_hardware_parameters(self): """Read hardware parameters from settings.py. """ from settings import OHMPI_CONFIG self.id = OHMPI_CONFIG['id'] # ID of the OhmPi - self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm + self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm self.Imax = OHMPI_CONFIG['Imax'] # maximum current - self.dump('The maximum current cannot be higher than 48 mA', level='warn') - self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ADS.P2, measurement in V/V - self.coef_p3 = OHMPI_CONFIG['coef_p3'] # slope for current conversion for ADS.P3, measurement in V/V + self.dump(f'The maximum current cannot be higher than {self.Imax} mA', level='warn') + self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V + self.coef_p3 = OHMPI_CONFIG['coef_p3'] # slope for current conversion for ads.P3, measurement in V/V self.offset_p2 = OHMPI_CONFIG['offset_p2'] self.offset_p3 = OHMPI_CONFIG['offset_p3'] - self.nb_samples = OHMPI_CONFIG['integer'] # number of samples measured for each stack + self.nb_samples = OHMPI_CONFIG['integer'] # number of samples measured for each stack self.version = OHMPI_CONFIG['version'] # hardware version self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes self.dump('OHMPI_CONFIG = ' + str(OHMPI_CONFIG), level='debug') - - def find_identical_in_line(self, quads): + @staticmethod + def find_identical_in_line(quads): """Find quadrupole which where A and B are identical. If A and B are connected to the same relay, the Pi burns (short-circuit). @@ -231,6 +239,17 @@ class OhmPi(object): # output.append(i) return output + @property + def on_pi(self): + """Returns True if code is running on a raspberry pi and required arm64 libs have been imported""" + running_on_pi = False + try: + with io.open('/sys/firmware/devicetree/base/model', 'r') as f: + if 'raspberry pi' in f.read().lower(): + running_on_pi = True + except Exception as e: + print(e) + return running_on_pi and arm64_imports def read_quad(self, filename): """Read quadrupole sequence from file. @@ -246,7 +265,7 @@ class OhmPi(object): output : numpy.array Array of shape (number quadrupoles * 4). """ - output = np.loadtxt(filename, delimiter=" ", dtype=int) # load quadripole file + output = np.loadtxt(filename, delimiter=" ", dtype=int) # load quadrupole file # locate lines where the electrode index exceeds the maximum number of electrodes test_index_elec = np.array(np.where(output > self.max_elec)) @@ -256,14 +275,16 @@ class OhmPi(object): # if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2) if test_index_elec.size != 0: - for i in range(len(test_index_elec[0,:])): - self.dump("Error: An electrode index at line " + str(test_index_elec[0,i]+1) + " exceeds the maximum number of electrodes", level="error") - #sys.exit(1) + for i in range(len(test_index_elec[0, :])): + self.dump('Error: An electrode index at line ' + str(test_index_elec[0, i]+1) + + ' exceeds the maximum number of electrodes', level='error') + # sys.exit(1) output = None elif len(test_same_elec) != 0: for i in range(len(test_same_elec)): - self.dump("Error: An electrode index A == B detected at line " + str(test_same_elec[i]+1), level="error") - #sys.exit(1) + self.dump('Error: An electrode index A == B detected at line ' + str(test_same_elec[i]+1), + level="error") + # sys.exit(1) output = None if output is not None: @@ -271,7 +292,6 @@ class OhmPi(object): self.sequence = output - def switch_mux(self, electrode_nr, state, role): """Select the right channel for the multiplexer cascade for a given electrode. @@ -290,14 +310,13 @@ class OhmPi(object): # choose with MUX board tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_address[role]) - # find I2C addres of the electrode and corresponding relay + # find I2C address of the electrode and corresponding relay # TODO from number of electrode, the below can be guessed - i2c_address = None # considering that one MCP23017 can cover 16 electrodes - electrode_nr = electrode_nr - 1 # switch to 0 indexing - i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division + electrode_nr = electrode_nr - 1 # switch to 0 indexing + i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division relay_nr = electrode_nr - (electrode_nr // 16)*16 - relay_nr = relay_nr + 1 # switch back to 1 based indexing + relay_nr = relay_nr + 1 # switch back to 1 based indexing # if electrode_nr < 17: # i2c_address = 7 @@ -326,7 +345,6 @@ class OhmPi(object): else: self.dump(f'Unable to address electrode nr {electrode_nr}', level='warn') - def switch_mux_on(self, quadrupole): """Switch on multiplexer relays for given quadrupole. @@ -343,7 +361,6 @@ class OhmPi(object): else: self.dump('A == B -> short circuit detected!', level='error') - def switch_mux_off(self, quadrupole): """Switch off multiplexer relays for given quadrupole. @@ -356,7 +373,6 @@ class OhmPi(object): for i in range(0, 4): self.switch_mux(quadrupole[i], 'off', roles[i]) - def reset_mux(self): """Switch off all multiplexer relays.""" roles = ['A', 'B', 'M', 'N'] @@ -364,16 +380,15 @@ class OhmPi(object): for j in range(1, self.max_elec + 1): self.switch_mux(j, 'off', roles[i]) self.dump('All MUX switched off.', level='debug') - - def run_measurement(self, quad, nb_stack=None, injection_duration=None): - """Do a 4 electrode measurement and measure transfer resistance obtained. + def run_measurement(self, quad, nb_stack=None, injection_duration=None): # NOTE: quad not used?! + """ Do a 4 electrode measurement and measure transfer resistance obtained. Parameters ---------- nb_stack : int, optional Number of stacks. - injection_detlat : int, optional + injection_duration : int, optional Injection time in seconds. quad : list of int Quadrupole to measure. @@ -406,7 +421,7 @@ class OhmPi(object): # current injection if (n % 2) == 0: pin1.value = True - pin0.value = False # current injection polarity nr1 + pin0.value = False # current injection polarity nr1 else: pin0.value = True pin1.value = False # current injection nr2 @@ -417,9 +432,11 @@ class OhmPi(object): # sampling for each stack at the end of the injection meas = np.zeros((self.nb_samples, 3)) for k in range(0, self.nb_samples): - meas[k, 0] = (AnalogIn(self.ads_current, ADS.P0).voltage*1000) / (50 * self.r_shunt) # reading current value on ADS channel A0 - meas[k, 1] = AnalogIn(self.ads_voltage, ADS.P0).voltage * self.coef_p2 * 1000 - meas[k, 2] = AnalogIn(self.ads_voltage, ADS.P1).voltage * self.coef_p3 * 1000 # reading voltage value on ADS channel A2 + # reading current value on ADS channel A0 + meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage*1000) / (50 * self.r_shunt) + meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * self.coef_p2 * 1000 + # reading voltage value on ADS channel A2 + meas[k, 2] = AnalogIn(self.ads_voltage, ads.P1).voltage * self.coef_p3 * 1000 # stop current injection pin1.value = False @@ -441,18 +458,18 @@ class OhmPi(object): end_calc = time.time() - # TODO I am not sure I undestand the computation below + # TODO I am not sure I understand the computation below # wait twice the actual injection time between two injection # so it's a 50% duty cycle right? time.sleep(2*(end_delay-start_delay)-(end_calc-start_delay)) - # create dateframe and compute averaged values from all stacks - df = pd.DataFrame({ + # create a dictionary and compute averaged values from all stacks + d = { "time": [datetime.now()], - "A": [(1)], - "B": [(2)], - "M": [(3)], - "N": [(4)], + "A": [1], + "B": [2], + "M": [3], + "N": [4], "inj time [ms]": (end_delay - start_delay) * 1000, "Vmn [mV]": [(sum_vmn / (3 + 2 * nb_stack - 1))], "I [mA]": [(injection_current / (3 + 2 * nb_stack - 1))], @@ -462,15 +479,22 @@ class OhmPi(object): "CPU temp [degC]": [CPUTemperature().temperature], "Time [s]": [(-start_time + time.time())], "Nb samples [-]": [self.nb_samples] - }) + } # round number to two decimal for nicer string output - output = df.round(2) - self.dump(output.to_string(), level='debug') - time.sleep(1) # TODO why this? - - return df + output = [f'{d[k]}\t' for k in d.keys] + output = output[:-1] + '\n' + for k in d.keys: + if isinstance(d[k], float): + val = np.round(d[k], 2) + else: + val = d[k] + output += f'{val}\t' + output = output[:-1] + self.dump(output, level='debug') + time.sleep(1) # NOTE: why this? + return d def rs_check(self): """Check contact resistance. @@ -502,27 +526,33 @@ class OhmPi(object): # TODO if interrupted, we would need to restore the values # TODO or we offer the possiblity in 'run_measurement' to have rs_check each time? - - def append_and_save(self, fname, last_measurement): + @staticmethod + def append_and_save(fname, last_measurement): """Append and save last measurement dataframe. Parameters ---------- - last_measurement : pandas.DataFrame - Last measurement taken in the form of a pandas dataframe. + fname : str + filename to save the last measurement dataframe + last_measurement : dict + Last measurement taken in the form of a python dictionary """ if os.path.isfile(fname): # Load data file and append data to it with open(fname, 'a') as f: - last_measurement.to_csv(f, header=False) + w = csv.DictWriter(f, last_measurement.keys()) + w.writerow(last_measurement) + # last_measurement.to_csv(f, header=False) else: # create data file and add headers with open(fname, 'a') as f: - last_measurement.to_csv(f, header=True) + w = csv.DictWriter(f, last_measurement.keys()) + w.writeheader() + w.writerow(last_measurement) + # last_measurement.to_csv(f, header=True) - def measure(self): """Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'. """ @@ -531,14 +561,15 @@ class OhmPi(object): self.dump('status = ' + self.status, level='debug') def func(): - for g in range(0, self.pardict["nbr_meas"]): # for time-lapse monitoring - if self.run == False: + for g in range(0, self.pardict["nbr_meas"]): # for time-lapse monitoring + if self.run is False: self.dump('INTERRUPTED', level='debug') break t0 = time.time() # create filename with timestamp - fname = self.pardict["export_path"].replace('.csv', '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '.csv') + fname = self.pardict["export_path"].replace('.csv', '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + + '.csv') self.dump('saving to ' + fname, level='debug') # make sure all multiplexer are off @@ -547,19 +578,21 @@ class OhmPi(object): # measure all quadrupole of the sequence for i in range(0, self.sequence.shape[0]): quad = self.sequence[i, :] # quadrupole - if self.run == False: + if self.run is False: break # call the switch_mux function to switch to the right electrodes self.switch_mux_on(quad) # run a measurement - if self.onpi: - current_measurement = self.run_measurement(quad, self.pardict["stack"], self.pardict["injection_duration"]) + if self.on_pi: + current_measurement = self.run_measurement(quad, self.pardict["stack"], + self.pardict["injection_duration"]) else: # for testing, generate random data - current_measurement = pd.DataFrame({ - 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], 'R [ohm]': np.abs(np.random.randn(1)) - }) + current_measurement = { + 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], + 'R [ohm]': np.abs(np.random.randn(1)) + } # switch mux off self.switch_mux_off(quad) @@ -576,11 +609,12 @@ class OhmPi(object): if sleep_time < 0: # it means that the measuring time took longer than the sequence delay sleep_time = 0 - self.dump('The measuring time is longer than the sequence delay. Increase the sequence delay', level='warn') + self.dump('The measuring time is longer than the sequence delay. Increase the sequence delay', + level='warn') # sleeping time between sequence if self.pardict["nbr_meas"] > 1: - time.sleep(sleep_time) # waiting for next measurement (time-lapse) + time.sleep(sleep_time) # waiting for next measurement (time-lapse) self.status = 'idle' self.thread = threading.Thread(target=func) self.thread.start() @@ -593,9 +627,10 @@ class OhmPi(object): self.thread.join() self.dump('status = ' + self.status) -# test -#ohmpi = OhmPi(config='ohmpi_param.json') -#ohmpi.measure() -#time.sleep(4) -#ohmpi.stop() +# for testing +if __name__ == "__main__": + ohmpi = OhmPi(config='ohmpi_param.json') + ohmpi.measure() + time.sleep(4) + ohmpi.stop() diff --git a/env.sh b/env.sh index 21e3ecaa..e79f68df 100644 --- a/env.sh +++ b/env.sh @@ -1,7 +1,7 @@ #!/bin/bash sudo apt-get install -y libatlas-base-dev python3 -m venv ohmpy -source ohmpy/bin/activate +source ohmpy/bin/activate || exit 1 # NOTE: Added || exit to avoid installing requirements in system python export CFLAGS=-fcommon pip install -r requirements.txt -- GitLab