Commit eeb82292 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Implements the generic _process_command method. Cleans up the code following PEP.

Showing with 219 additions and 179 deletions
+219 -179
......@@ -5,11 +5,10 @@ import zipfile
class CompressedSizedTimedRotatingFileHandler(handlers.TimedRotatingFileHandler):
"""
Handler for logging to a set of files, which switches from one file
"""Handler for logging to a set of files, which switches from one file
to the next when the current file reaches a certain size, or at certain
timed intervals
"""
timed intervals"""
def __init__(self, filename, max_bytes=0, backup_count=0, encoding=None,
delay=0, when='h', interval=1, utc=False, zip_mode=zipfile.ZIP_DEFLATED):
handlers.TimedRotatingFileHandler.__init__(self, filename=filename, when=when, interval=interval, utc=utc,
......@@ -18,11 +17,10 @@ class CompressedSizedTimedRotatingFileHandler(handlers.TimedRotatingFileHandler)
self.zip_mode = zip_mode
def shouldRollover(self, record):
"""
Determine if rollover should occur.
Basically, see if the supplied record would cause the file to exceed
the size limit we have.
"""
"""Determines if rollover should occur.
Basically, sees if the supplied record would cause the file to exceed
the size limit we have."""
if self.stream is None: # delay was set...
self.stream = self._open()
if self.maxBytes > 0: # are we rolling over?
......@@ -36,6 +34,8 @@ class CompressedSizedTimedRotatingFileHandler(handlers.TimedRotatingFileHandler)
return False
def find_last_rotated_file(self):
"""Looks for the last rotated file and returns it"""
dir_name, base_name = os.path.split(self.baseFilename)
file_names = os.listdir(dir_name)
result = []
......@@ -47,6 +47,8 @@ class CompressedSizedTimedRotatingFileHandler(handlers.TimedRotatingFileHandler)
return os.path.join(dir_name, result[0])
def doRollover(self):
"""Does the roll-over by compressing the current file then deleting the uncompressed file"""
super(CompressedSizedTimedRotatingFileHandler, self).doRollover()
dfn = self.find_last_rotated_file()
......
......@@ -2,8 +2,8 @@ import logging
from paho.mqtt.client import MQTTv31
mqtt_broker = 'localhost'
logging_suffix = '_interactive'
mqtt_broker = 'mg3d-dev.umons.ac.be' # 'localhost'
logging_suffix = ''
# OhmPi configuration
OHMPI_CONFIG = {
'id': '0001', # Unique identifier of the OhmPi board (string)
......@@ -21,13 +21,9 @@ OHMPI_CONFIG = {
'board_version': '22.10'
} # TODO: add a dictionary with INA models and associated gain values
# CONTROL_CONFIG = {
# 'tcp_port': 5555,
# 'interface': 'mqtt_interface.py' # 'http_interface'
# }
# Execution logging configuration
EXEC_LOGGING_CONFIG = {
'logging_level': logging.DEBUG,
'logging_level': logging.INFO,
'logging_to_console': True,
'file_name': f'exec{logging_suffix}.log',
'max_bytes': 262144,
......
......@@ -6,9 +6,11 @@ import logging
from mqtt_logger import MQTTHandler
from compressed_sized_timed_rotating_logger import CompressedSizedTimedRotatingFileHandler
import sys
from termcolor import colored
def setup_loggers(mqtt=True):
msg = ''
# Message logging setup
log_path = path.join(path.dirname(__file__), 'logs')
if not path.isdir(log_path):
......@@ -37,13 +39,12 @@ def setup_loggers(mqtt=True):
interval=EXEC_LOGGING_CONFIG['interval'])
exec_formatter = logging.Formatter(log_format)
exec_formatter.converter = gmtime
exec_formatter.datefmt = '%Y/%m/%d %H:%M:%S UTC'
exec_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC'
exec_handler.setFormatter(exec_formatter)
exec_logger.addHandler(exec_handler)
exec_logger.setLevel(EXEC_LOGGING_CONFIG['logging_level'])
if logging_to_console:
print(f'logging exec ? {logging_to_console}') # TODO: delete this line
console_exec_handler = logging.StreamHandler(sys.stdout)
console_exec_handler.setLevel(EXEC_LOGGING_CONFIG['logging_level'])
console_exec_handler.setFormatter(exec_formatter)
......@@ -52,14 +53,16 @@ def setup_loggers(mqtt=True):
if mqtt:
mqtt_settings = MQTT_LOGGING_CONFIG.copy()
[mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic']]
mqtt_settings.update({'topic':MQTT_LOGGING_CONFIG['exec_topic']})
mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['exec_topic']})
# TODO: handle the case of MQTT broker down or temporarily unavailable
try:
mqtt_exec_handler = MQTTHandler(**mqtt_settings)
mqtt_exec_handler.setLevel(EXEC_LOGGING_CONFIG['logging_level'])
mqtt_exec_handler.setFormatter(exec_formatter)
exec_logger.addHandler(mqtt_exec_handler)
except:
msg+=colored(f"\n\u2611 Publishes execution as {MQTT_LOGGING_CONFIG['exec_topic']} topic on the {MQTT_LOGGING_CONFIG['hostname']} broker", 'blue')
except Exception as e:
msg += colored(f'\nWarning: Unable to connect to exec topic on broker\n{e}', 'yellow')
mqtt = False
# Set data logging format and level
......@@ -88,17 +91,22 @@ def setup_loggers(mqtt=True):
mqtt_settings = MQTT_LOGGING_CONFIG.copy()
[mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic']]
mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['data_topic']})
mqtt_data_handler = MQTTHandler(**mqtt_settings)
mqtt_data_handler.setLevel(DATA_LOGGING_CONFIG['logging_level'])
mqtt_data_handler.setFormatter(data_formatter)
data_logger.addHandler(mqtt_data_handler)
try:
mqtt_data_handler = MQTTHandler(**mqtt_settings)
mqtt_data_handler.setLevel(DATA_LOGGING_CONFIG['logging_level'])
mqtt_data_handler.setFormatter(data_formatter)
data_logger.addHandler(mqtt_data_handler)
msg += colored(f"\n\u2611 Publishes data as {MQTT_LOGGING_CONFIG['data_topic']} topic on the {MQTT_LOGGING_CONFIG['hostname']} broker", 'blue')
except Exception as e:
msg += colored(f'\nWarning: Unable to connect to data topic on broker\n{e}', 'yellow')
mqtt = False
try:
init_logging(exec_logger, data_logger, EXEC_LOGGING_CONFIG['logging_level'], log_path, data_log_filename)
except Exception as err:
print(f'ERROR: Could not initialize logging!\n{err}')
msg += colored(f'\n\u26A0 ERROR: Could not initialize logging!\n{err}', 'red')
finally:
return exec_logger, exec_log_filename, data_logger, data_log_filename, EXEC_LOGGING_CONFIG['logging_level']
return exec_logger, exec_log_filename, data_logger, data_log_filename, EXEC_LOGGING_CONFIG['logging_level'], msg
def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_log_filename):
......@@ -110,7 +118,7 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo
exec_logger.info('*** NEW SESSION STARTING ***')
exec_logger.info('****************************')
exec_logger.info('')
exec_logger.info('Logging level: %s' % exec_logging_level)
exec_logger.debug('Logging level: %s' % exec_logging_level)
try:
st = statvfs('.')
available_space = st.f_bavail * st.f_frsize / 1024 / 1024
......@@ -118,15 +126,14 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo
except Exception as e:
exec_logger.debug('Unable to get remaining disk space: {e}')
exec_logger.info('Saving data log to ' + data_log_filename)
exec_logger.info('OhmPi settings:')
# TODO Add OhmPi settings
config_dict = {'execution logging configuration': json.dumps(EXEC_LOGGING_CONFIG, indent=4),
'data logging configuration': json.dumps(DATA_LOGGING_CONFIG, indent=4),
'mqtt logging configuration': json.dumps(MQTT_LOGGING_CONFIG, indent=4),
'mqtt control configuration': json.dumps(MQTT_CONTROL_CONFIG, indent=4)}
for k, v in config_dict.items():
exec_logger.info(f'{k}:\n{v}')
exec_logger.info('')
exec_logger.info(f'init_logging_status: {init_logging_status}')
exec_logger.debug(f'{k}:\n{v}')
exec_logger.debug('')
if not init_logging_status:
exec_logger.warning(f'Logging initialisation has encountered a problem.')
data_logger.info('Starting_session')
return init_logging_status
......@@ -3,7 +3,7 @@
created on January 6, 2020.
Updates May 2022, Oct 2022.
Ohmpi.py is a program to control a low-cost and open hardware resistivity meter OhmPi that has been developed by
Rémi CLEMENT (INRAE),Vivien DUBOIS (INRAE), Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR)
Rémi CLEMENT (INRAE), Vivien DUBOIS (INRAE), Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR)
Olivier KAUFMANN (UMONS), Arnaud WATELET (UMONS) and Guillaume BLANCHY (ILVO).
"""
......@@ -21,7 +21,8 @@ from termcolor import colored
import threading
import paho.mqtt.client as mqtt_client
from logging_setup import setup_loggers
from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG
from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG
from logging import DEBUG
# finish import (done only when class is instantiated as some libs are only available on arm64 platform)
try:
......@@ -36,10 +37,10 @@ try:
from digitalio import Direction # noqa
from gpiozero import CPUTemperature # noqa
import minimalmodbus
arm64_imports = True
except ImportError as error:
print(colored(f'Import error: {error}', 'yellow'))
if EXEC_LOGGING_CONFIG['logging_level'] == DEBUG:
print(colored(f'Import error: {error}', 'yellow'))
arm64_imports = False
except Exception as error:
print(colored(f'Unexpected error: {error}', 'red'))
......@@ -47,19 +48,28 @@ except Exception as error:
class OhmPi(object):
"""Create the main OhmPi object.
Parameters
----------
settings : str, optional
Path to the .json configuration file.
sequence : str, optional
Path to the .txt where the sequence is read. By default, a 1 quadrupole
sequence: 1, 2, 3, 4 is used.
""" OhmPi class.
"""
def __init__(self, settings=None, sequence=None, use_mux=False, mqtt=True, on_pi=None, idps=False):
# flags and attributes
"""Constructs the ohmpi object
Parameters
----------
settings:
sequence:
use_mux:
if True use the multiplexor to select active electrodes
mqtt: bool, defaut: True
if True publish on mqtt topics while logging, otherwise use other loggers only
on_pi: bool,None default: None
if None, the platform on which the class is instanciated is determined to set on_pi to either True or False
idps:
if true uses the DPS
"""
if on_pi is None:
_, on_pi = OhmPi._get_platform()
......@@ -70,21 +80,18 @@ class OhmPi(object):
self.thread = None # contains the handle for the thread taking the measurement
# set loggers
config_exec_logger, _, config_data_logger, _, _ = setup_loggers(mqtt=mqtt) # TODO: add SOH
config_exec_logger, _, config_data_logger, _, _, msg = setup_loggers(mqtt=mqtt) # TODO: add SOH
self.data_logger = config_data_logger
self.exec_logger = config_exec_logger
self.soh_logger = None
print('Loggers:')
print(colored(f'Exec logger {self.exec_logger.handlers if self.exec_logger is not None else "None"}', 'blue'))
print(colored(f'Data logger {self.data_logger.handlers if self.data_logger is not None else "None"}', 'blue'))
print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue'))
self.soh_logger = None # TODO: Implement the SOH logger
print(msg)
# set controller
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False)
trials = 0
trials_max = 10
broker_connected = False
self.exec_logger.debug(f"Connecting to control broker: {MQTT_CONTROL_CONFIG['hostname']}")
while trials < trials_max:
try:
self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
......@@ -94,12 +101,21 @@ class OhmPi(object):
broker_connected = True
except Exception as e:
self.exec_logger.debug(f'Unable to connect control broker: {e}')
self.exec_logger.info('trying again to connect to control broker...')
self.exec_logger.debug('Retrying to connect control broker...')
time.sleep(2)
trials += 1
if broker_connected:
self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
self.exec_logger.debug(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
try:
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
msg = f"\u2611 Subscribed to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}" \
f" on {MQTT_CONTROL_CONFIG['hostname']} broker"
self.exec_logger.debug(msg)
print(colored(msg, 'blue'))
except Exception as e:
self.exec_logger.warning(f'Unable to subscribe to control topic : {e}')
self.controller = None
else:
self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
self.controller = None
......@@ -119,7 +135,6 @@ class OhmPi(object):
if settings is not None:
self.update_settings(settings)
print(self.settings)
self.exec_logger.debug('Initialized with settings:' + str(self.settings))
# read quadrupole sequence
......@@ -146,13 +161,13 @@ class OhmPi(object):
# current injection module
if self.idps:
self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, slave address (in decimal)
self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal)
self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc
self.DPS.serial.bytesize = 8 #
self.DPS.serial.timeout = 1 # greater than 0.5 for it to work
self.DPS.debug = False #
self.DPS.serial.parity = 'N' # No parity
self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode
self.DPS.serial.timeout = 1 # greater than 0.5 for it to work
self.DPS.debug = False #
self.DPS.serial.parity = 'N' # No parity
self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode
self.DPS.write_register(0x0001, 40, 0) # max current allowed (36 mA for relays)
# (last number) 0 is for mA, 3 is for A
......@@ -165,9 +180,13 @@ class OhmPi(object):
self.pin1.value = False
# Starts the command processing thread
self.cmd_listen = True
self.cmd_thread = threading.Thread(target=self._control)
self.cmd_thread.start()
if self.controller is not None:
self.cmd_listen = True
self.cmd_thread = threading.Thread(target=self._control)
self.cmd_thread.start()
else:
self.exec_logger.warning('No connection to control broker.'
' Use python/ipython to interact with OhmPi object...')
@property
def sequence(self):
......@@ -187,6 +206,7 @@ class OhmPi(object):
self._sequence = sequence
def _control(self):
"""Gets commands from the controller(s) and execute them"""
def on_message(client, userdata, message):
command = message.payload.decode('utf-8')
self.exec_logger.debug(f'Received command {command}')
......@@ -195,7 +215,7 @@ class OhmPi(object):
self.controller.on_message = on_message
self.controller.loop_start()
while True:
time.sleep(.5)
time.sleep(.5) # TODO: Check if this waiting time should be reduced...
def _update_acquisition_settings(self, config):
warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning)
......@@ -213,8 +233,8 @@ class OhmPi(object):
Parameters
----------
config : str
Path to the .json or dictionary.
config : str, dict
Path to the .json settings file or dictionary of settings.
"""
status = False
if config is not None:
......@@ -241,7 +261,7 @@ class OhmPi(object):
self.id = OHMPI_CONFIG['id'] # ID of the OhmPi
self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm
self.Imax = OHMPI_CONFIG['Imax'] # maximum current
self.exec_logger.warning(f'The maximum current cannot be higher than {self.Imax} mA')
self.exec_logger.debug(f'The maximum current cannot be higher than {self.Imax} mA')
self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V
self.nb_samples = OHMPI_CONFIG['integer'] # number of samples measured for each stack
self.version = OHMPI_CONFIG['version'] # hardware version
......@@ -265,7 +285,6 @@ class OhmPi(object):
output : numpy.ndarray 1D array of int
List of index of rows where A and B are identical.
"""
# TODO is this needed for M and N?
# if we have a 1D array (so only 1 quadrupole), make it a 2D array
if len(quads.shape) == 1:
......@@ -278,8 +297,9 @@ class OhmPi(object):
@staticmethod
def _get_platform():
"""Gets platform name and checks if it is a raspberry pi
Returns
=======
-------
str, bool
name of the platform on which the code is running, boolean that is true if the platform is a raspberry pi"""
......@@ -296,10 +316,10 @@ class OhmPi(object):
def read_quad(self, filename):
warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning)
self.load_sequence(self, filename)
self.load_sequence(filename)
def load_sequence(self, filename):
"""Read quadrupole sequence from file.
def load_sequence(self, filename: str):
"""Reads quadrupole sequence from file.
Parameters
----------
......@@ -312,10 +332,11 @@ class OhmPi(object):
sequence : numpy.array
Array of shape (number quadrupoles * 4).
"""
self.exec_logger.debug(f'Loading sequence {filename}')
sequence = np.loadtxt(filename, delimiter=" ", dtype=np.uint32) # load quadrupole file
if sequence is not None:
self.exec_logger.debug('Sequence of {:d} quadrupoles read.'.format(sequence.shape[0]))
self.exec_logger.debug(f'Sequence of {sequence.shape[0]:d} quadrupoles read.')
# locate lines where the electrode index exceeds the maximum number of electrodes
test_index_elec = np.array(np.where(sequence > self.max_elec))
......@@ -337,14 +358,14 @@ class OhmPi(object):
sequence = None
if sequence is not None:
self.exec_logger.info('Sequence of {:d} quadrupoles read.'.format(sequence.shape[0]))
self.exec_logger.info(f'Sequence {filename} of {sequence.shape[0]:d} quadrupoles loaded.')
else:
self.exec_logger.warning(f'Unable to load sequence {filename}')
self.sequence = sequence
def _switch_mux(self, electrode_nr, state, role):
"""Select the right channel for the multiplexer cascade for a given electrode.
"""Selects the right channel for the multiplexer cascade for a given electrode.
Parameters
----------
......@@ -355,8 +376,12 @@ class OhmPi(object):
role : str
Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board.
"""
if not self.use_mux:
pass # no MUX or don't use MUX
if not self.use_mux or not self.on_pi:
if not self.on_pi:
self.exec_logger.warning('Cannot reset mux while in simulation mode...')
else:
self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
' Set use_mux to True to use the multiplexer...')
elif self.sequence is None:
self.exec_logger.warning('Unable to switch MUX without a sequence')
else:
......@@ -366,7 +391,7 @@ class OhmPi(object):
# find I2C address of the electrode and corresponding relay
# considering that one MCP23017 can cover 16 electrodes
i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16) * 16 +1
relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
......@@ -382,9 +407,8 @@ class OhmPi(object):
else:
self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
def switch_mux_on(self, quadrupole):
""" Switch on multiplexer relays for given quadrupole.
"""Switches on multiplexer relays for given quadrupole.
Parameters
----------
......@@ -401,7 +425,7 @@ class OhmPi(object):
self.exec_logger.error('A == B -> short circuit risk detected!')
def switch_mux_off(self, quadrupole):
""" Switch off multiplexer relays for given quadrupole.
"""Switches off multiplexer relays for given quadrupole.
Parameters
----------
......@@ -414,15 +438,21 @@ class OhmPi(object):
self._switch_mux(quadrupole[i], 'off', roles[i])
def reset_mux(self):
"""Switch off all multiplexer relays."""
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
for j in range(1, self.max_elec + 1):
self._switch_mux(j, 'off', roles[i])
self.exec_logger.debug('All MUX switched off.')
"""Switches off all multiplexer relays."""
if self.on_pi and self.use_mux:
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
for j in range(1, self.max_elec + 1):
self._switch_mux(j, 'off', roles[i])
self.exec_logger.debug('All MUX switched off.')
elif not self.on_pi:
self.exec_logger.warning('Cannot reset mux while in simulation mode...')
else:
self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
' Set use_mux to True to use the multiplexer...')
def _gain_auto(self, channel):
""" Automatically set the gain on a channel
"""Automatically sets the gain on a channel
Parameters
----------
......@@ -445,7 +475,7 @@ class OhmPi(object):
return gain
def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5):
"""Estimating best Tx voltage based on different strategy.
"""Estimates best Tx voltage based on different strategies.
At first a half-cycle is made for a short duration with a fixed
known voltage. This gives us Iab and Rab. We also measure Vmn.
A constant c = vmn/iab is computed (only depends on geometric
......@@ -505,50 +535,50 @@ class OhmPi(object):
# set voltage for test
self.DPS.write_register(0x0000, volt, 2)
self.DPS.write_register(0x09, 1) # DPS5005 on
time.sleep(best_tx_injtime) # inject for given tx time
self.DPS.write_register(0x09, 1) # DPS5005 on
time.sleep(best_tx_injtime) # inject for given tx time
# autogain
self.ads_current = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=self.ads_current_address)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=self.ads_voltage_address)
#print('current P0', AnalogIn(self.ads_current, ads.P0).voltage)
#print('voltage P0', AnalogIn(self.ads_voltage, ads.P0).voltage)
#print('voltage P2', AnalogIn(self.ads_voltage, ads.P2).voltage)
gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0))
gain_voltage0 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0))
gain_voltage2 = self.gain_auto(AnalogIn(self.ads_voltage, ads.P2))
# print('current P0', AnalogIn(self.ads_current, ads.P0).voltage)
# print('voltage P0', AnalogIn(self.ads_voltage, ads.P0).voltage)
# print('voltage P2', AnalogIn(self.ads_voltage, ads.P2).voltage)
gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0))
gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))
gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))
gain_voltage = np.min([gain_voltage0, gain_voltage2])
#print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage))
# print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage))
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address)
# we measure the voltage on both A0 and A2 to guess the polarity
I = (AnalogIn(self.ads_current, ads.P0).voltage) * 1000/50/self.r_shunt # measure current
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 # measure voltage
I = AnalogIn(self.ads_current, ads.P0).voltage * 1000/50/self.r_shunt # measure current
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 # measure voltage
U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000
#print('I (mV)', I*50*self.r_shunt)
#print('I (mA)', I)
#print('U0 (mV)', U0)
#print('U2 (mV)', U2)
# print('I (mV)', I*50*self.r_shunt)
# print('I (mA)', I)
# print('U0 (mV)', U0)
# print('U2 (mV)', U2)
# check polarity
polarity = 1 # by default, we guessed it right
vmn = U0
if U0 < 0: # we guessed it wrong, let's use a correction factor
if U0 < 0: # we guessed it wrong, let's use a correction factor
polarity = -1
vmn = U2
#print('polarity', polarity)
# print('polarity', polarity)
# compute constant
c = vmn / I
Rab = (volt * 1000) / I
self.exec_logger.debug('Rab = {:.2f} Ohms'.format(Rab))
self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms')
# implement different strategy
if strategy == 'vmax':
vmn_max = c * current_max
if vmn_max < voltage_max and vmn_max > voltage_min:
if voltage_max > vmn_max > voltage_min:
vab = current_max * Rab
self.exec_logger.debug('target max current')
else:
......@@ -561,7 +591,7 @@ class OhmPi(object):
elif strategy == 'vmin':
vmn_min = c * current_min
if vmn_min > voltage_min and vmn_min < voltage_max:
if voltage_min < vmn_min < voltage_max:
vab = current_min * Rab
self.exec_logger.debug('target min current')
else:
......@@ -577,16 +607,15 @@ class OhmPi(object):
else:
vab = 5
#self.DPS.write_register(0x09, 0) # DPS5005 off
# self.DPS.write_register(0x09, 0) # DPS5005 off
self.pin0.value = False
self.pin1.value = False
return vab, polarity
def run_measurement(self, quad=None, nb_stack=None, injection_duration=None,
autogain=True, strategy='constant', tx_volt=5, best_tx_injtime=0.1):
"""Do a 4 electrode measurement and measure transfer resistance obtained.
autogain=True, strategy='constant', tx_volt=5., best_tx_injtime=0.1):
"""Measures on a quadrupole and returns transfer resistance.
Parameters
----------
......@@ -604,12 +633,12 @@ class OhmPi(object):
strategy : str, optional
(V3.0 only) If we search for best voltage (tx_volt == 0), we can choose
different strategy:
- vmin: find lowest voltage that gives us a signal
- vmax: find max voltage that are in the range
- vmin: find the lowest voltage that gives us a signal
- vmax: find the highest voltage that stays in the range
For a constant value, just set the tx_volt.
tx_volt : float, optional
(V3.0 only) If specified, voltage will be imposed. If 0, we will look
for the best voltage. If a best Tx cannot be found, no
for the best voltage. If the best Tx cannot be found, no
measurement will be taken and values will be NaN.
best_tx_injtime : float, optional
(V3.0 only) Injection time in seconds used for finding the best voltage.
......@@ -647,26 +676,28 @@ class OhmPi(object):
if self.idps:
tx_volt, polarity = self._compute_tx_volt(
best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt)
self.exec_logger.debug('Best vab found is {:.3}V'.format(tx_volt))
self.exec_logger.debug(f'Best vab found is {tx_volt:.3f}V')
else:
polarity = 1
# first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address, mode=0)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860,
address=self.ads_current_address, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860,
address=self.ads_voltage_address, mode=0)
# turn on the power supply
out_of_range = False
if self.idps:
if not np.isnan(tx_volt):
self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V
self.DPS.write_register(0x09, 1) # DPS5005 on
self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V
self.DPS.write_register(0x09, 1) # DPS5005 on
time.sleep(0.05)
else:
self.exec_logger.debug('No best voltage found, will not take measurement')
out_of_range = True # oor: out of range
out_of_range = True
if not out_of_range: # we found a vab in the range so we measure
if not out_of_range: # we found a Vab in the range so we measure
if autogain:
# compute autogain
self.pin0.value = True
......@@ -679,9 +710,11 @@ class OhmPi(object):
gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))
self.pin0.value = False
self.pin1.value = False
self.exec_logger.debug('Gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage))
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address, mode=0)
self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage:.3f}')
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860,
address=self.ads_current_address, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860,
address=self.ads_voltage_address, mode=0)
self.pin0.value = False
self.pin1.value = False
......@@ -722,15 +755,15 @@ class OhmPi(object):
if pinMN == 0:
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000
else:
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 *-1
meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P2).voltage * 1000
elif self.board_version == '22.10':
meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000
#else:
# else:
# self.exec_logger.debug('Unknown board')
time.sleep(sampling_interval / 1000)
dt = time.time() - start_delay # real injection time (s)
meas[k, 2] = time.time() - start_time
if dt > (injection_duration - 0 * sampling_interval /1000):
if dt > (injection_duration - 0 * sampling_interval / 1000.):
break
# stop current injection
......@@ -747,20 +780,20 @@ class OhmPi(object):
dt = 0
for k in range(0, measpp.shape[0]):
# reading current value on ADS channels
measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt)
measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000.) / (50 * self.r_shunt)
if self.board_version == '22.11':
if pinMN == 0:
measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000
measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
else:
measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 *-1
measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1
elif self.board_version == '22.10':
measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000
measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000.
else:
self.exec_logger.debug('unknown board')
time.sleep(sampling_interval / 1000)
dt = time.time() - start_delay # real injection time (s)
measpp[k, 2] = time.time() - start_time
if dt > (injection_duration - 0 * sampling_interval /1000):
if dt > (injection_duration - 0 * sampling_interval / 1000.):
break
end_delay = time.time()
......@@ -802,11 +835,11 @@ class OhmPi(object):
if self.idps:
self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt
self.DPS.write_register(0x09, 0) # DPS5005 off
self.DPS.write_register(0x09, 0) # DPS5005 off
# reshape full data to an array of good size
# we need an array of regular size to save in the csv
if out_of_range == False:
if not out_of_range:
fulldata = np.vstack(fulldata)
# we create a big enough array given nb_samples, number of
# half-cycles (1 stack = 2 half-cycles), and twice as we
......@@ -824,13 +857,13 @@ class OhmPi(object):
"B": quad[1],
"M": quad[2],
"N": quad[3],
"inj time [ms]": (end_delay - start_delay) * 1000 if out_of_range == False else 0,
"inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0.,
"Vmn [mV]": sum_vmn / (2 * nb_stack),
"I [mA]": sum_i / (2 * nb_stack),
"R [ohm]": sum_vmn / sum_i,
"Ps [mV]": sum_ps / (2 * nb_stack),
"nbStack": nb_stack,
"Tx [V]": tx_volt if out_of_range == False else 0,
"Tx [V]": tx_volt if not out_of_range else 0.,
"CPU temp [degC]": CPUTemperature().temperature,
"Nb samples [-]": self.nb_samples,
"fulldata": fulldata,
......@@ -863,8 +896,7 @@ class OhmPi(object):
return d
def rs_check(self, tx_volt=12):
""" Check contact resistance.
"""
"""Checks contact resistances"""
# create custom sequence where MN == AB
# we only check the electrodes which are in the sequence (not all might be connected)
if self.sequence is None or not self.use_mux:
......@@ -900,14 +932,14 @@ class OhmPi(object):
d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=1, tx_volt=tx_volt, autogain=False)
if self.idps:
voltage = tx_volt * 1000. # imposed voltage on dps5005
voltage = tx_volt * 1000. # imposed voltage on dps5005
else:
voltage = d['Vmn [mV]']
current = d['I [mA]']
# compute resistance measured (= contact resistance)
resist = abs(voltage / current) /1000.
#print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format(
resist = abs(voltage / current) / 1000.
# print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format(
# current, voltage, resist))
msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \
f'V: {voltage :>10.3f} mV, ' \
......@@ -917,8 +949,7 @@ class OhmPi(object):
# if contact resistance = 0 -> we have a short circuit!!
if resist < 1e-5:
msg = '!!!SHORT CIRCUIT!!! {:s}: {:.3f} kOhm'.format(
str(quad), resist)
msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm'
self.exec_logger.warning(msg)
print(msg)
......@@ -943,7 +974,7 @@ class OhmPi(object):
@staticmethod
def append_and_save(filename, last_measurement):
"""Append and save last measurement dict.
"""Appends and saves the last measurement dict.
Parameters
----------
......@@ -957,15 +988,14 @@ class OhmPi(object):
d = last_measurement['fulldata']
n = d.shape[0]
if n > 1:
idic = dict(zip(['i' + str(i) for i in range(n)], d[:,0]))
udic = dict(zip(['u' + str(i) for i in range(n)], d[:,1]))
tdic = dict(zip(['t' + str(i) for i in range(n)], d[:,2]))
idic = dict(zip(['i' + str(i) for i in range(n)], d[:, 0]))
udic = dict(zip(['u' + str(i) for i in range(n)], d[:, 1]))
tdic = dict(zip(['t' + str(i) for i in range(n)], d[:, 2]))
last_measurement.update(idic)
last_measurement.update(udic)
last_measurement.update(tdic)
last_measurement.pop('fulldata')
if os.path.isfile(filename):
# Load data file and append data to it
with open(filename, 'a') as f:
......@@ -980,28 +1010,29 @@ class OhmPi(object):
w.writerow(last_measurement)
def _process_commands(self, message):
""" TODO
"""Processes commands received from the controller(s)
Parameters
----------
message : str
command and arguments
Returns
-------
message containing a command and arguments or keywords and arguments
"""
try:
cmd_id = None
decoded_message = json.loads(message)
print(f'decoded message: {decoded_message}')
cmd_id = decoded_message.pop('cmd_id', None)
cmd = decoded_message.pop('cmd', None)
args = decoded_message.pop('args', '[]')
if args[0] != '[':
args = f'["{args}"]'
args = json.loads(args)
kwargs = decoded_message.pop('kwargs', '{}')
kwargs = json.loads(kwargs)
self.exec_logger.debug(f'Calling method {cmd}({args}, {kwargs})')
status = False
e = None # NOTE: Why this?
# e = None # NOTE: Why this?
print(cmd, args, kwargs)
if cmd_id is None:
self.exec_logger.warning('You should use a unique identifier for cmd_id')
if cmd is not None:
......@@ -1021,7 +1052,7 @@ class OhmPi(object):
def measure(self, *args, **kwargs):
warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning)
self.run_sequence(self, *args, **kwargs)
self.run_sequence(*args, **kwargs)
def set_sequence(self, sequence=sequence):
try:
......@@ -1031,13 +1062,12 @@ class OhmPi(object):
self.exec_logger.warning(f'Unable to set sequence: {e}')
status = False
def run_sequence(self, **kwargs):
"""Run sequence in sync mode
def run_sequence(self, cmd_id=None, **kwargs):
"""Runs sequence in sync mode
"""
self.status = 'running'
self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
t0 = time.time()
# create filename with timestamp
......@@ -1088,7 +1118,12 @@ class OhmPi(object):
self.status = 'idle'
def run_sequence_async(self, cmd_id=None, **kwargs):
""" Run the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
"""Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
Parameters
----------
cmd_id:
"""
# self.run = True
self.status = 'running'
......@@ -1151,8 +1186,8 @@ class OhmPi(object):
self.thread = threading.Thread(target=func)
self.thread.start()
def run_multiple_sequences(self, cmd_id=None, **kwargs):
""" Run multiple sequences in a separate thread for monitoring mode.
def run_multiple_sequences(self, *args, **kwargs):
"""Run multiple sequences in a separate thread for monitoring mode.
Can be stopped by 'OhmPi.interrupt()'.
"""
# self.run = True
......@@ -1160,8 +1195,9 @@ class OhmPi(object):
self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
cmd_id = kwargs.pop('cmd_id', None)
def func():
for g in range(0, self.settings["nb_meas"]): # for time-lapse monitoring
for g in range(0, self.settings["nb_meas"]): # for time-lapse monitoring
if self.status != 'running':
self.exec_logger.warning('Data acquisition interrupted')
break
......@@ -1236,10 +1272,12 @@ class OhmPi(object):
self.interrupt()
def interrupt(self):
""" Interrupt the acquisition. """
"""Interrupts the acquisition. """
self.status = 'stopping'
if self.thread is not None:
self.thread.join()
else:
self.exec_logger.debug('No sequence measurement thread to interrupt.')
self.exec_logger.debug(f'Status: {self.status}')
def quit(self):
......@@ -1264,21 +1302,20 @@ print(colored(r' ________________________________' + '\n' +
r'| | | | _ || |\/| || __/ | |' + '\n' +
r'\ \_/ / | | || | | || | _| |_' + '\n' +
r' \___/\_| |_/\_| |_/\_| \___/ ', 'red'))
print('OhmPi start')
print('Version:', VERSION)
platform, on_pi = OhmPi._get_platform()
platform, on_pi = OhmPi._get_platform() # noqa
if on_pi:
print(colored(f'Running on {platform} platform', 'green'))
print(colored(f'\u2611 Running on {platform} platform', 'green'))
# TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...)
# and emit a warning otherwise
if not arm64_imports:
print(colored(f'Warning: Required packages are missing.\n'
f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow'))
else:
print(colored(f'Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow'))
print(colored(f'\u26A0 Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow'))
current_time = datetime.now()
print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
print(f'local date and time : {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
# for testing
if __name__ == "__main__":
......
......@@ -2,4 +2,3 @@ numpy
paho-mqtt
termcolor
pandas
pyzmq
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment