Commit fae95cc9 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Fixes filename for data logging

Showing with 90 additions and 48 deletions
+90 -48
......@@ -60,7 +60,7 @@ def setup_loggers(mqtt=True):
log_format = '%(asctime)-15s | %(process)d | %(levelname)s: %(message)s'
logging_to_console = DATA_LOGGING_CONFIG['logging_to_console']
data_handler = CompressedSizedTimedRotatingFileHandler(exec_log_filename,
data_handler = CompressedSizedTimedRotatingFileHandler(data_log_filename,
max_bytes=DATA_LOGGING_CONFIG['max_bytes'],
backup_count=DATA_LOGGING_CONFIG['backup_count'],
when=DATA_LOGGING_CONFIG['when'],
......
......@@ -17,21 +17,22 @@ from datetime import datetime
from termcolor import colored
import threading
from logging_setup import setup_loggers
# from mqtt_setup import mqtt_client_setup
# finish import (done only when class is instantiated as some libs are only available on arm64 platform)
arm_64_imports = False
try:
import board # noqa
import busio # noqa
import adafruit_tca9548a # noqa
import adafruit_ads1x15.ads1115 as ads # noqa
from adafruit_ads1x15.analog_in import AnalogIn # noqa
import adafruit_ads1x15.ads1115 as ads # noqa
from adafruit_ads1x15.analog_in import AnalogIn # noqa
from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa
from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa
import digitalio # noqa
from digitalio import Direction # noqa
from gpiozero import CPUTemperature # noqa
arm64_imports = True
except ImportError as error:
print(colored(f'Import error: {error}', 'yellow'))
......@@ -52,6 +53,7 @@ class OhmPi(object):
Path to the .txt where the sequence is read. By default, a 1 quadrupole
sequence: 1, 2, 3, 4 is used.
"""
def __init__(self, config=None, sequence=None, mqtt=True, on_pi=None):
# flags and attributes
if on_pi is None:
......@@ -90,7 +92,7 @@ class OhmPi(object):
self._read_acquisition_parameters(config)
self.exec_logger.debug('Initialized with configuration:' + str(self.pardict))
# read quadrupole sequence
if sequence is None:
self.sequence = np.array([[1, 2, 3, 4]])
......@@ -100,16 +102,16 @@ class OhmPi(object):
# connect to components on the OhmPi board
if self.on_pi:
# activation of I2C protocol
self.i2c = busio.I2C(board.SCL, board.SDA) # noqa
self.i2c = busio.I2C(board.SCL, board.SDA) # noqa
# I2C connexion to MCP23008, for current injection
self.mcp = MCP23008(self.i2c, address=0x20)
# ADS1115 for current measurement (AB)
self.ads_current = ads.ADS1115(self.i2c, gain=16, data_rate=860, address=0x48)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48)
# ADS1115 for voltage measurement (MN)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49)
def _read_acquisition_parameters(self, config):
"""Read acquisition parameters.
......@@ -221,13 +223,13 @@ class OhmPi(object):
Array of shape (number quadrupoles * 4).
"""
output = np.loadtxt(filename, delimiter=" ", dtype=int) # load quadrupole file
# locate lines where the electrode index exceeds the maximum number of electrodes
test_index_elec = np.array(np.where(output > self.max_elec))
# locate lines where electrode A == electrode B
test_same_elec = self.find_identical_in_line(output)
# if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2)
if test_index_elec.size != 0:
for i in range(len(test_index_elec[0, :])):
......@@ -243,7 +245,7 @@ class OhmPi(object):
if output is not None:
self.exec_logger.debug('Sequence of {:d} quadrupoles read.'.format(output.shape[0]))
self.sequence = output
def switch_mux(self, electrode_nr, state, role):
......@@ -263,13 +265,13 @@ class OhmPi(object):
else:
# choose with MUX board
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_address[role])
# find I2C address of the electrode and corresponding relay
# TODO from number of electrode, the below can be guessed
# considering that one MCP23017 can cover 16 electrodes
electrode_nr = electrode_nr - 1 # switch to 0 indexing
i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division
relay_nr = electrode_nr - (electrode_nr // 16)*16
relay_nr = electrode_nr - (electrode_nr // 16) * 16
relay_nr = relay_nr + 1 # switch back to 1 based indexing
# if electrode_nr < 17:
......@@ -288,13 +290,13 @@ class OhmPi(object):
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
mcp2 = MCP23017(tca[i2c_address])
mcp2.get_pin(relay_nr-1).direction = digitalio.Direction.OUTPUT
mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT
if state == 'on':
mcp2.get_pin(relay_nr-1).value = True
mcp2.get_pin(relay_nr - 1).value = True
else:
mcp2.get_pin(relay_nr-1).value = False
mcp2.get_pin(relay_nr - 1).value = False
self.exec_logger.debug(f'Switching relay {relay_nr} {state} for electrode {electrode_nr}')
else:
self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
......@@ -335,7 +337,30 @@ class OhmPi(object):
self.switch_mux(j, 'off', roles[i])
self.exec_logger.debug('All MUX switched off.')
def run_measurement(self, quad, nb_stack=None, injection_duration=None): # NOTE: quad not used?!
def gain_auto(self, channel):
""" Automatically set the gain on a channel
Parameters
----------
channel:
Returns
-------
float
"""
gain = 2 / 3
if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.023):
gain = 2
elif (abs(channel.voltage) < 1.023) and (abs(channel.voltage) >= 0.508):
gain = 4
elif (abs(channel.voltage) < 0.508) and (abs(channel.voltage) >= 0.250):
gain = 8
elif abs(channel.voltage) < 0.256:
gain = 16
self.exec_logger.debug(f'Setting gain to {gain}')
return gain
def run_measurement(self, quad, nb_stack=None, injection_duration=None):
""" Do a 4 electrode measurement and measure transfer resistance obtained.
Parameters
......@@ -360,7 +385,7 @@ class OhmPi(object):
injection_current = 0
sum_vmn = 0
sum_ps = 0
# injection courant and measure
pin0 = self.mcp.get_pin(0)
pin0.direction = Direction.OUTPUT
......@@ -371,9 +396,28 @@ class OhmPi(object):
self.exec_logger.debug('Starting measurement')
self.data_logger.info('Waiting for data')
# TODO I don't get why 3 + 2*nb_stack - 1? why not just rnage(nb_stack)?
# FUNCTION AUTOGAIN
# ADS1115 for current measurement (AB)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48)
# ADS1115 for voltage measurement (MN)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49)
# try auto gain
pin1.value = True
pin0.value = False
time.sleep(injection_duration)
gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0))
gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0, ads.P1))
pin0.value = False
pin1.value = False
print(gain_current)
print(gain_voltage)
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=0x48)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=0x49)
# TODO I don't get why 3 + 2*nb_stack - 1? why not just range(nb_stack)?
# or do we consider 1 stack = one full polarity? do we discard the first 3 readings?
for n in range(0, 3+2*nb_stack-1):
for n in range(0, 3 + 2 * nb_stack - 1):
# current injection
if (n % 2) == 0:
pin1.value = True
......@@ -389,10 +433,10 @@ class OhmPi(object):
meas = np.zeros((self.nb_samples, 3))
for k in range(0, self.nb_samples):
# reading current value on ADS channel A0
meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage*1000) / (50 * self.r_shunt)
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * self.coef_p2 * 1000
meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt)
meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000
# reading voltage value on ADS channel A2
meas[k, 2] = AnalogIn(self.ads_voltage, ads.P1).voltage * self.coef_p3 * 1000
# meas[k, 2] = AnalogIn(self.ads_voltage, ads.P1).voltage * self.coef_p3 * 1000
# stop current injection
pin1.value = False
......@@ -411,21 +455,21 @@ class OhmPi(object):
sum_ps = sum_ps + vmn1
# TODO get battery voltage and warn if battery is running low
end_calc = time.time()
# TODO I am not sure I understand the computation below
# wait twice the actual injection time between two injection
# so it's a 50% duty cycle right?
time.sleep(2*(end_delay-start_delay)-(end_calc-start_delay))
time.sleep(2 * (end_delay - start_delay) - (end_calc - start_delay))
# create a dictionary and compute averaged values from all stacks
d = {
"time": [datetime.now().isoformat()],
"A": [1],
"B": [2],
"M": [3],
"N": [4],
"A": quad[0],
"B": quad[1],
"M": quad[2],
"N": quad[3],
"inj time [ms]": (end_delay - start_delay) * 1000,
"Vmn [mV]": [(sum_vmn / (3 + 2 * nb_stack - 1))],
"I [mA]": [(injection_current / (3 + 2 * nb_stack - 1))],
......@@ -434,7 +478,7 @@ class OhmPi(object):
"nbStack": [nb_stack],
"CPU temp [degC]": [CPUTemperature().temperature],
"Time [s]": [(-start_time + time.time())],
"Nb samples [-]": [self.nb_samples]
"Nb samples [-]": [self.nb_samples]
}
# round number to two decimal for nicer string output
......@@ -462,8 +506,8 @@ class OhmPi(object):
np.arange(nelec - 1) + 2,
np.arange(nelec - 1) + 1,
np.arange(nelec - 1) + 2
]).T
]).T
# create backup TODO not good
export_path = self.pardict['export_path'].copy()
sequence = self.sequence.copy()
......@@ -471,11 +515,11 @@ class OhmPi(object):
# assign new value
self.pardict['export_path'] = export_path.replace('.csv', '_rs.csv')
self.sequence = quads
# run the RS check
self.exec_logger.debug('RS check (check contact resistance)')
self.measure()
# restore
self.pardict['export_path'] = export_path
self.sequence = sequence
......@@ -494,7 +538,7 @@ class OhmPi(object):
last_measurement : dict
Last measurement taken in the form of a python dictionary
"""
if os.path.isfile(filename):
# Load data file and append data to it
with open(filename, 'a') as f:
......@@ -536,7 +580,7 @@ class OhmPi(object):
quad = self.sequence[i, :] # quadrupole
if self.run is False:
break
# call the switch_mux function to switch to the right electrodes
self.switch_mux_on(quad)
......@@ -549,7 +593,7 @@ class OhmPi(object):
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
'R [ohm]': np.abs(np.random.randn(1))
}
# switch mux off
self.switch_mux_off(quad)
......@@ -563,17 +607,18 @@ class OhmPi(object):
# between two sequence run (= sequence_delay)
measuring_time = time.time() - t0
sleep_time = self.pardict["sequence_delay"] - measuring_time
if sleep_time < 0:
# it means that the measuring time took longer than the sequence delay
sleep_time = 0
self.exec_logger.warning('The measuring time is longer than the sequence delay. '
'Increase the sequence delay')
'Increase the sequence delay')
# sleeping time between sequence
if self.pardict["nbr_meas"] > 1:
time.sleep(sleep_time) # waiting for next measurement (time-lapse)
self.status = 'idle'
self.thread = threading.Thread(target=func)
self.thread.start()
......@@ -586,9 +631,6 @@ class OhmPi(object):
self.exec_logger.debug(f'Status: {self.status}')
# exec_logger, exec_log_filename, data_logger, data_log_filename, logging_level = setup_loggers() # TODO: add SOH
# mqtt_client, measurement_topic = mqtt_client_setup()
VERSION = '2.0.2'
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment