diff --git a/logging_setup.py b/logging_setup.py index 1e2e1f88d22d8fbbc8122040e8961999c2adb194..df8d5f88d537598127249594f7dd7fcf3538a23c 100644 --- a/logging_setup.py +++ b/logging_setup.py @@ -60,7 +60,7 @@ def setup_loggers(mqtt=True): log_format = '%(asctime)-15s | %(process)d | %(levelname)s: %(message)s' logging_to_console = DATA_LOGGING_CONFIG['logging_to_console'] - data_handler = CompressedSizedTimedRotatingFileHandler(exec_log_filename, + data_handler = CompressedSizedTimedRotatingFileHandler(data_log_filename, max_bytes=DATA_LOGGING_CONFIG['max_bytes'], backup_count=DATA_LOGGING_CONFIG['backup_count'], when=DATA_LOGGING_CONFIG['when'], diff --git a/ohmpi.py b/ohmpi.py index 61b47fa3a737413421e38ff4fdc33b4b289bd120..27a37ebf3178ae22413bad66c99441cec9520ae8 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -17,21 +17,22 @@ from datetime import datetime from termcolor import colored import threading from logging_setup import setup_loggers + # from mqtt_setup import mqtt_client_setup # finish import (done only when class is instantiated as some libs are only available on arm64 platform) -arm_64_imports = False try: import board # noqa import busio # noqa import adafruit_tca9548a # noqa - import adafruit_ads1x15.ads1115 as ads # noqa - from adafruit_ads1x15.analog_in import AnalogIn # noqa + import adafruit_ads1x15.ads1115 as ads # noqa + from adafruit_ads1x15.analog_in import AnalogIn # noqa from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa import digitalio # noqa from digitalio import Direction # noqa from gpiozero import CPUTemperature # noqa + arm64_imports = True except ImportError as error: print(colored(f'Import error: {error}', 'yellow')) @@ -52,6 +53,7 @@ class OhmPi(object): Path to the .txt where the sequence is read. By default, a 1 quadrupole sequence: 1, 2, 3, 4 is used. """ + def __init__(self, config=None, sequence=None, mqtt=True, on_pi=None): # flags and attributes if on_pi is None: @@ -90,7 +92,7 @@ class OhmPi(object): self._read_acquisition_parameters(config) self.exec_logger.debug('Initialized with configuration:' + str(self.pardict)) - + # read quadrupole sequence if sequence is None: self.sequence = np.array([[1, 2, 3, 4]]) @@ -100,16 +102,16 @@ class OhmPi(object): # connect to components on the OhmPi board if self.on_pi: # activation of I2C protocol - self.i2c = busio.I2C(board.SCL, board.SDA) # noqa + self.i2c = busio.I2C(board.SCL, board.SDA) # noqa # I2C connexion to MCP23008, for current injection self.mcp = MCP23008(self.i2c, address=0x20) - + # ADS1115 for current measurement (AB) - self.ads_current = ads.ADS1115(self.i2c, gain=16, data_rate=860, address=0x48) - + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48) + # ADS1115 for voltage measurement (MN) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=0x49) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49) def _read_acquisition_parameters(self, config): """Read acquisition parameters. @@ -221,13 +223,13 @@ class OhmPi(object): Array of shape (number quadrupoles * 4). """ output = np.loadtxt(filename, delimiter=" ", dtype=int) # load quadrupole file - + # locate lines where the electrode index exceeds the maximum number of electrodes test_index_elec = np.array(np.where(output > self.max_elec)) - + # locate lines where electrode A == electrode B test_same_elec = self.find_identical_in_line(output) - + # if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2) if test_index_elec.size != 0: for i in range(len(test_index_elec[0, :])): @@ -243,7 +245,7 @@ class OhmPi(object): if output is not None: self.exec_logger.debug('Sequence of {:d} quadrupoles read.'.format(output.shape[0])) - + self.sequence = output def switch_mux(self, electrode_nr, state, role): @@ -263,13 +265,13 @@ class OhmPi(object): else: # choose with MUX board tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_address[role]) - + # find I2C address of the electrode and corresponding relay # TODO from number of electrode, the below can be guessed # considering that one MCP23017 can cover 16 electrodes electrode_nr = electrode_nr - 1 # switch to 0 indexing i2c_address = 7 - electrode_nr // 16 # quotient without rest of the division - relay_nr = electrode_nr - (electrode_nr // 16)*16 + relay_nr = electrode_nr - (electrode_nr // 16) * 16 relay_nr = relay_nr + 1 # switch back to 1 based indexing # if electrode_nr < 17: @@ -288,13 +290,13 @@ class OhmPi(object): if i2c_address is not None: # select the MCP23017 of the selected MUX board mcp2 = MCP23017(tca[i2c_address]) - mcp2.get_pin(relay_nr-1).direction = digitalio.Direction.OUTPUT - + mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT + if state == 'on': - mcp2.get_pin(relay_nr-1).value = True + mcp2.get_pin(relay_nr - 1).value = True else: - mcp2.get_pin(relay_nr-1).value = False - + mcp2.get_pin(relay_nr - 1).value = False + self.exec_logger.debug(f'Switching relay {relay_nr} {state} for electrode {electrode_nr}') else: self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') @@ -335,7 +337,30 @@ class OhmPi(object): self.switch_mux(j, 'off', roles[i]) self.exec_logger.debug('All MUX switched off.') - def run_measurement(self, quad, nb_stack=None, injection_duration=None): # NOTE: quad not used?! + def gain_auto(self, channel): + """ Automatically set the gain on a channel + + Parameters + ---------- + channel: + + Returns + ------- + float + """ + gain = 2 / 3 + if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.023): + gain = 2 + elif (abs(channel.voltage) < 1.023) and (abs(channel.voltage) >= 0.508): + gain = 4 + elif (abs(channel.voltage) < 0.508) and (abs(channel.voltage) >= 0.250): + gain = 8 + elif abs(channel.voltage) < 0.256: + gain = 16 + self.exec_logger.debug(f'Setting gain to {gain}') + return gain + + def run_measurement(self, quad, nb_stack=None, injection_duration=None): """ Do a 4 electrode measurement and measure transfer resistance obtained. Parameters @@ -360,7 +385,7 @@ class OhmPi(object): injection_current = 0 sum_vmn = 0 sum_ps = 0 - + # injection courant and measure pin0 = self.mcp.get_pin(0) pin0.direction = Direction.OUTPUT @@ -371,9 +396,28 @@ class OhmPi(object): self.exec_logger.debug('Starting measurement') self.data_logger.info('Waiting for data') - # TODO I don't get why 3 + 2*nb_stack - 1? why not just rnage(nb_stack)? + + # FUNCTION AUTOGAIN + # ADS1115 for current measurement (AB) + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x48) + # ADS1115 for voltage measurement (MN) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=0x49) + # try auto gain + pin1.value = True + pin0.value = False + time.sleep(injection_duration) + gain_current = self.gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage = self.gain_auto(AnalogIn(self.ads_voltage, ads.P0, ads.P1)) + pin0.value = False + pin1.value = False + print(gain_current) + print(gain_voltage) + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=0x48) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=0x49) + + # TODO I don't get why 3 + 2*nb_stack - 1? why not just range(nb_stack)? # or do we consider 1 stack = one full polarity? do we discard the first 3 readings? - for n in range(0, 3+2*nb_stack-1): + for n in range(0, 3 + 2 * nb_stack - 1): # current injection if (n % 2) == 0: pin1.value = True @@ -389,10 +433,10 @@ class OhmPi(object): meas = np.zeros((self.nb_samples, 3)) for k in range(0, self.nb_samples): # reading current value on ADS channel A0 - meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage*1000) / (50 * self.r_shunt) - meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * self.coef_p2 * 1000 + meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) + meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000 # reading voltage value on ADS channel A2 - meas[k, 2] = AnalogIn(self.ads_voltage, ads.P1).voltage * self.coef_p3 * 1000 + # meas[k, 2] = AnalogIn(self.ads_voltage, ads.P1).voltage * self.coef_p3 * 1000 # stop current injection pin1.value = False @@ -411,21 +455,21 @@ class OhmPi(object): sum_ps = sum_ps + vmn1 # TODO get battery voltage and warn if battery is running low - + end_calc = time.time() # TODO I am not sure I understand the computation below # wait twice the actual injection time between two injection # so it's a 50% duty cycle right? - time.sleep(2*(end_delay-start_delay)-(end_calc-start_delay)) - + time.sleep(2 * (end_delay - start_delay) - (end_calc - start_delay)) + # create a dictionary and compute averaged values from all stacks d = { "time": [datetime.now().isoformat()], - "A": [1], - "B": [2], - "M": [3], - "N": [4], + "A": quad[0], + "B": quad[1], + "M": quad[2], + "N": quad[3], "inj time [ms]": (end_delay - start_delay) * 1000, "Vmn [mV]": [(sum_vmn / (3 + 2 * nb_stack - 1))], "I [mA]": [(injection_current / (3 + 2 * nb_stack - 1))], @@ -434,7 +478,7 @@ class OhmPi(object): "nbStack": [nb_stack], "CPU temp [degC]": [CPUTemperature().temperature], "Time [s]": [(-start_time + time.time())], - "Nb samples [-]": [self.nb_samples] + "Nb samples [-]": [self.nb_samples] } # round number to two decimal for nicer string output @@ -462,8 +506,8 @@ class OhmPi(object): np.arange(nelec - 1) + 2, np.arange(nelec - 1) + 1, np.arange(nelec - 1) + 2 - ]).T - + ]).T + # create backup TODO not good export_path = self.pardict['export_path'].copy() sequence = self.sequence.copy() @@ -471,11 +515,11 @@ class OhmPi(object): # assign new value self.pardict['export_path'] = export_path.replace('.csv', '_rs.csv') self.sequence = quads - + # run the RS check self.exec_logger.debug('RS check (check contact resistance)') self.measure() - + # restore self.pardict['export_path'] = export_path self.sequence = sequence @@ -494,7 +538,7 @@ class OhmPi(object): last_measurement : dict Last measurement taken in the form of a python dictionary """ - + if os.path.isfile(filename): # Load data file and append data to it with open(filename, 'a') as f: @@ -536,7 +580,7 @@ class OhmPi(object): quad = self.sequence[i, :] # quadrupole if self.run is False: break - + # call the switch_mux function to switch to the right electrodes self.switch_mux_on(quad) @@ -549,7 +593,7 @@ class OhmPi(object): 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], 'R [ohm]': np.abs(np.random.randn(1)) } - + # switch mux off self.switch_mux_off(quad) @@ -563,17 +607,18 @@ class OhmPi(object): # between two sequence run (= sequence_delay) measuring_time = time.time() - t0 sleep_time = self.pardict["sequence_delay"] - measuring_time - + if sleep_time < 0: # it means that the measuring time took longer than the sequence delay sleep_time = 0 self.exec_logger.warning('The measuring time is longer than the sequence delay. ' - 'Increase the sequence delay') + 'Increase the sequence delay') # sleeping time between sequence if self.pardict["nbr_meas"] > 1: time.sleep(sleep_time) # waiting for next measurement (time-lapse) self.status = 'idle' + self.thread = threading.Thread(target=func) self.thread.start() @@ -586,9 +631,6 @@ class OhmPi(object): self.exec_logger.debug(f'Status: {self.status}') -# exec_logger, exec_log_filename, data_logger, data_log_filename, logging_level = setup_loggers() # TODO: add SOH - - # mqtt_client, measurement_topic = mqtt_client_setup() VERSION = '2.0.2'