diff --git a/config.py b/config.py index 9e787e98fbef18ee8d3429457ea7fb9b27b8d60d..4468951265fa1ee997304e2c44cd47a0be6a5398 100644 --- a/config.py +++ b/config.py @@ -65,6 +65,7 @@ DATA_LOGGING_CONFIG = { # State of Health logging configuration (For a future release) SOH_LOGGING_CONFIG = { 'logging_level': logging.INFO, + 'log_file_logging_level': logging.DEBUG, 'logging_to_console': True, 'file_name': f'soh{logging_suffix}.log', 'max_bytes': 16777216, diff --git a/doc/mux_2024_rev_0_0.py b/doc/mux_2024_rev_0_0.py new file mode 100644 index 0000000000000000000000000000000000000000..7845820fac43aac74faaaa157d1d4ac142371d11 --- /dev/null +++ b/doc/mux_2024_rev_0_0.py @@ -0,0 +1,72 @@ +from OhmPi.config import HARDWARE_CONFIG +import os +from OhmPi.hardware import MuxAbstract +MUX_CONFIG = HARDWARE_CONFIG['mux'] + +class Mux(MuxAbstract): + def __init__(self, **kwargs): + kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) + super().__init__(**kwargs) + self.max_elec = MUX_CONFIG['max_elec'] + + def switch_one(self, elec, role, state='off'): + self.tca = adafruit_tca9548a.TCA9548A(i2c, self.addresses[role]) + # find I2C address of the electrode and corresponding relay + # considering that one MCP23017 can cover 16 electrodes + i2c_address = 7 - (elec - 1) // 16 # quotient without rest of the division + relay = (elec - 1) - ((elec - 1) // 16) * 16 + + if i2c_address is not None: + # select the MCP23017 of the selected MUX board + mcp = MCP23017(self.tca[i2c_address]) + mcp.get_pin(relay - 1).direction = digitalio.Direction.OUTPUT + if state == 'on': + mcp.get_pin(relay - 1).value = True + else: + mcp.get_pin(relay - 1).value = False + # exec_logger.debug(f'Switching relay {relay} ' + # f'({str(hex(self.addresses[role]))}) on:{on} for electrode {elec}') + else: + raise ValueError('No I2C address found for the electrode' + ' {:d} on board {:s}'.format(elec, self.addresses[role])) + # exec_logger.warning(f'Unable to address electrode nr {elec}') + + def switch(self, elecdic={}, state='on'): + """Switch a given list of electrodes with different roles. + Electrodes with a value of 0 will be ignored. + + Parameters + ---------- + elecdic : dictionary, optional + Dictionnary of the form: role: [list of electrodes]. + state : str, optional + Either 'on' or 'off'. + """ + # check to prevent A == B (SHORT-CIRCUIT) + if 'A' in elecdic and 'B' in elecdic: + out = np.in1d(elecdic['A'], elecdic['B']) + if out.any(): + raise ValueError('Some electrodes have A == B -> SHORT-CIRCUIT') + return + + # check none of M and N are the same A or B + # as to prevent burning the MN part which cannot take + # the full voltage of the DPS + if 'A' in elecdic and 'B' in elecdic and 'M' in elecdic and 'N' in elecdic: + if (np.in1d(elecdic['M'], elecdic['A']).any() + or np.in1d(elecdic['M'], elecdic['B']).any() + or np.in1d(elecdic['N'], elecdic['A']).any() + or np.in1d(elecdic['N'], elecdic['B']).any()): + raise ValueError('Some electrodes M and N are on A and B -> cannot be with DPS') + return + + # if all ok, then switch the electrodes + for role in elecdic: + for elec in elecdic[role]: + if elec > 0: + self.switch_one(elec, role, state) + + def reset(self): + for role in self.addresses: + for elec in range(self.nelec): + self.switch_one(elec, role, 'off') \ No newline at end of file diff --git a/hardware/abstract_hardware.py b/hardware/abstract_hardware.py index c90bc112c45314ab96d374b01ecfd16af57403cb..4082dc670dbf683f922ecbd167e3fd46cd52b4b4 100644 --- a/hardware/abstract_hardware.py +++ b/hardware/abstract_hardware.py @@ -1,4 +1,7 @@ from abc import ABC, abstractmethod + +import numpy as np + from OhmPi.logging_setup import create_stdout_logger import time @@ -9,16 +12,107 @@ class ControllerAbstract(ABC): self.exec_logger = kwargs.pop('exec_logger', None) if self.exec_logger is None: self.exec_logger = create_stdout_logger('exec_ctl') + self.soh_logger = kwargs.pop('soh_logger', None) + if self.soh_logger is None: + self.soh_logger = create_stdout_logger('soh_ctl') self.exec_logger.debug(f'{self.board_name} Controller initialization') + self._cpu_temp_available = False + self.max_cpu_temp = np.inf + @property + def cpu_temperature(self): + if not self._cpu_temp_available: + self.exec_logger.warning(f'CPU temperature reading is not available for {self.board_name}') + cpu_temp = np.nan + else: + cpu_temp = self._get_cpu_temp() + if cpu_temp > self.max_cpu_temp: + self.soh_logger.warning(f'CPU temperature of {self.board_name} is over the limit!') + return cpu_temp + + @abstractmethod + def _get_cpu_temp(self): + pass class MuxAbstract(ABC): def __init__(self, **kwargs): - self.board_name = kwargs.pop('board_name', 'unknown MUX hardware') + self.board_name = kwargs.pop('board_name', 'unknown MUX hardware') # TODO: introduce MUX boards that take part to a MUX system (could be the same for RX boards that take part to an RX system (e.g. different channels) self.exec_logger = kwargs.pop('exec_logger', None) if self.exec_logger is None: self.exec_logger = create_stdout_logger('exec_mux') self.exec_logger.debug(f'{self.board_name} MUX initialization') + @abstractmethod + def reset(self): + pass + + def switch(self, elec_dict=None, state='on'): + """Switch a given list of electrodes with different roles. + Electrodes with a value of 0 will be ignored. + + Parameters + ---------- + elec_dict : dictionary, optional + Dictionary of the form: {role: [list of electrodes]}. + state : str, optional + Either 'on' or 'off'. + """ + if elec_dict is not None: + self.exec_logger.debug(f'Switching {self.board_name} ') + # check to prevent A == B (SHORT-CIRCUIT) + if 'A' in elec_dict.keys() and 'B' in elec_dict.keys(): + out = np.in1d(elec_dict['A'], elec_dict['B']) + if out.any() and state=='on': + self.exec_logger.error('Trying to switch on some electrodes with both A and B roles. ' + 'This would create a short-circuit! Switching aborted.') + return + + # check that none of M or N are the same as A or B + # as to prevent burning the MN part which cannot take + # the full voltage of the DPS + if 'A' in elec_dict.keys() and 'B' in elec_dict.keys() and 'M' in elec_dict.keys() and 'N' in elec_dict.keys(): + if (np.in1d(elec_dict['M'], elec_dict['A']).any() + or np.in1d(elec_dict['M'], elec_dict['B']).any() + or np.in1d(elec_dict['N'], elec_dict['A']).any() + or np.in1d(elec_dict['N'], elec_dict['B']).any()) and state=='on': + self.exec_logger.error('Trying to switch on some electrodes with both M or N roles and A or B roles.' + 'This would create an over-voltage in the RX! Switching aborted.') + return + + # if all ok, then switch the electrodes + for role in elec_dict: + for elec in elec_dict[role]: + if elec > 0: + self.switch_one(elec, role, state) + else: + self.exec_logger.warning(f'Missing argument for {self.board_name}.switch: elec_dict is None.') + + @abstractmethod + def switch_one(self, elec, role, state): + pass + + def test(self, elec_dict, activation_time=1.): + """Method to test the multiplexer. + + Parameters + ---------- + elec_dict : dictionary, optional + Dictionary of the form: {role: [list of electrodes]}. + activation_time : float, optional + Time in seconds during which the relays are activated. + """ + self.exec_logger.debug(f'Starting {self.board_name} test...') + self.reset() + + for role in elec_dict.keys(): + for elec in elec_dict['role']: + self.switch_one(elec, role, 'on') + self.exec_logger.debug(f'electrode: {elec} activated.') + time.sleep(activation_time) + self.switch_one(elec, role, 'off') + self.exec_logger.debug(f'electrode: {elec} deactivated.') + time.sleep(activation_time) + self.exec_logger.debug('Test finished.') + class TxAbstract(ABC): def __init__(self, **kwargs): self.board_name = kwargs.pop('board_name', 'unknown TX hardware') @@ -153,6 +247,7 @@ class RxAbstract(ABC): self._sampling_rate = kwargs.pop('sampling_rate', 1) self.exec_logger.debug(f'{self.board_name} RX initialization') self._adc_gain = 1. + self._max_sampling_rate = np.inf @property def adc_gain(self): @@ -175,6 +270,10 @@ class RxAbstract(ABC): @sampling_rate.setter def sampling_rate(self, value): assert value > 0. + if value > self._max_sampling_rate: + self.exec_logger.warning(f'{self} maximum sampling rate is {self._max_sampling_rate}. ' + f'Setting sampling rate to the highest allowed value.') + value = self._max_sampling_rate self._sampling_rate = value self.exec_logger.debug(f'Sampling rate set to {value}') diff --git a/hardware/dummy_mux.py b/hardware/dummy_mux.py index 34e3f04f756c33e62609b48902660b2f2bf390bf..6d3de58242309f66d41890b9d60ca9218983235b 100644 --- a/hardware/dummy_mux.py +++ b/hardware/dummy_mux.py @@ -7,4 +7,13 @@ class Mux(MuxAbstract): def __init__(self, **kwargs): kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) - self.max_elec = MUX_CONFIG['max_elec'] \ No newline at end of file + self.max_elec = MUX_CONFIG['max_elec'] + + def reset(self): + pass + + def switch(self, elec_dict, state): + pass + + def test(self): + self.exec_logger.info('MUX test finished.') \ No newline at end of file diff --git a/hardware/raspberry_pi.py b/hardware/raspberry_pi.py index cdf04664e3dfa0bddadf83ddbb0d025850c7289f..99a6062dd986d13c0342bc49c741c1fd394ee1b7 100644 --- a/hardware/raspberry_pi.py +++ b/hardware/raspberry_pi.py @@ -2,6 +2,7 @@ from OhmPi.hardware import ControllerAbstract import board # noqa import busio # noqa from OhmPi.utils import get_platform +from gpiozero import CPUTemperature class Controller(ControllerAbstract): def __init__(self, **kwargs): @@ -9,4 +10,10 @@ class Controller(ControllerAbstract): self.bus = busio.I2C(board.SCL, board.SDA) # noqa platform, on_pi = get_platform() assert on_pi - self.board_name = platform \ No newline at end of file + self.board_name = platform + self._cpu_temp_available = True + self.max_cpu_temp = 85. # °C + + @property + def _get_cpu_temp(self): + return CPUTemperature().temperature \ No newline at end of file diff --git a/logging_setup.py b/logging_setup.py index 1702ccb229b9d132105f0c9ddae950e6539c3bd7..174f74c9de9f1c8457a2feace768e4912a1a6f5f 100644 --- a/logging_setup.py +++ b/logging_setup.py @@ -1,5 +1,6 @@ import json -from OhmPi.config import EXEC_LOGGING_CONFIG, DATA_LOGGING_CONFIG, MQTT_LOGGING_CONFIG, MQTT_CONTROL_CONFIG +from OhmPi.config import EXEC_LOGGING_CONFIG, DATA_LOGGING_CONFIG, SOH_LOGGING_CONFIG,\ + MQTT_LOGGING_CONFIG, MQTT_CONTROL_CONFIG from os import path, mkdir, statvfs from time import gmtime import logging @@ -27,10 +28,50 @@ def setup_loggers(mqtt=True): if not path.isdir(log_path): mkdir(log_path) exec_log_filename = path.join(log_path, EXEC_LOGGING_CONFIG['file_name']) + soh_log_filename = path.join(log_path, SOH_LOGGING_CONFIG['file_name']) + exec_logger = logging.getLogger('exec_logger') + soh_logger = logging.getLogger('soh_logger') # SOH logging setup - # TODO: Add state of health logging here + # Set message logging format and level + log_format = '%(asctime)-15s | %(process)d | %(levelname)s: %(message)s' + logging_to_console = SOH_LOGGING_CONFIG['logging_to_console'] + soh_handler = CompressedSizedTimedRotatingFileHandler(soh_log_filename, + max_bytes=SOH_LOGGING_CONFIG['max_bytes'], + backup_count=SOH_LOGGING_CONFIG['backup_count'], + when=SOH_LOGGING_CONFIG['when'], + interval=SOH_LOGGING_CONFIG['interval']) + soh_formatter = logging.Formatter(log_format) + soh_formatter.converter = gmtime + soh_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC' + soh_handler.setFormatter(soh_formatter) + soh_logger.addHandler(soh_handler) + soh_logger.setLevel(SOH_LOGGING_CONFIG['log_file_logging_level']) + + if logging_to_console: + console_soh_handler = logging.StreamHandler(sys.stdout) + console_soh_handler.setLevel(SOH_LOGGING_CONFIG['logging_level']) + console_soh_handler.setFormatter(soh_formatter) + soh_logger.addHandler(console_soh_handler) + + if mqtt: + mqtt_settings = MQTT_LOGGING_CONFIG.copy() + mqtt_soh_logging_level = mqtt_settings.pop('soh_logging_level', logging.DEBUG) + [mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic', 'data_logging_level', + 'soh_logging_level']] + mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['soh_topic']}) + # TODO: handle the case of MQTT broker down or temporarily unavailable + try: + mqtt_soh_handler = MQTTHandler(**mqtt_settings) + mqtt_soh_handler.setLevel(mqtt_soh_logging_level) + mqtt_soh_handler.setFormatter(soh_formatter) + soh_logger.addHandler(mqtt_soh_handler) + msg += colored(f"\n\u2611 Publishes execution as {MQTT_LOGGING_CONFIG['soh_topic']} topic on the " + f"{MQTT_LOGGING_CONFIG['hostname']} broker", 'blue') + except Exception as e: + msg += colored(f'\nWarning: Unable to connect to soh topic on broker\n{e}', 'yellow') + mqtt = False # Data logging setup base_path = path.dirname(__file__) @@ -119,14 +160,16 @@ def setup_loggers(mqtt=True): mqtt = False try: - init_logging(exec_logger, data_logger, EXEC_LOGGING_CONFIG['logging_level'], log_path, data_log_filename) + init_logging(exec_logger, data_logger, soh_logger, EXEC_LOGGING_CONFIG['logging_level'], + SOH_LOGGING_CONFIG['logging_level'], log_path, data_log_filename) except Exception as err: msg += colored(f'\n\u26A0 ERROR: Could not initialize logging!\n{err}', 'red') finally: - return exec_logger, exec_log_filename, data_logger, data_log_filename, EXEC_LOGGING_CONFIG['logging_level'], msg + return exec_logger, exec_log_filename, data_logger, data_log_filename, soh_logger, soh_log_filename,\ + EXEC_LOGGING_CONFIG['logging_level'], msg -def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_log_filename): +def init_logging(exec_logger, data_logger, soh_logger, exec_logging_level, soh_logging_level, log_path, data_log_filename): """ This is the init sequence for the logging system """ init_logging_status = True @@ -135,7 +178,8 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo exec_logger.info('*** NEW SESSION STARTING ***') exec_logger.info('****************************') exec_logger.info('') - exec_logger.debug(f'Logging level: {exec_logging_level}') + exec_logger.debug(f'Execution logging level: {exec_logging_level}') + exec_logger.debug(f'State of health logging level: {soh_logging_level}') try: st = statvfs('.') available_space = st.f_bavail * st.f_frsize / 1024 / 1024 @@ -144,6 +188,7 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo exec_logger.debug('Unable to get remaining disk space: {e}') exec_logger.info('Saving data log to ' + data_log_filename) config_dict = {'execution logging configuration': json.dumps(EXEC_LOGGING_CONFIG, indent=4), + 'state of health logging configuration': json.dumps(SOH_LOGGING_CONFIG, indent=4), 'data logging configuration': json.dumps(DATA_LOGGING_CONFIG, indent=4), 'mqtt logging configuration': json.dumps(MQTT_LOGGING_CONFIG, indent=4), 'mqtt control configuration': json.dumps(MQTT_CONTROL_CONFIG, indent=4)} diff --git a/measure.py b/measure.py index 1c6329d52f33e140a60249579cebb29390335175..39b1088d059c23dfe717afebdb756e42ead5f0c5 100644 --- a/measure.py +++ b/measure.py @@ -48,46 +48,32 @@ class OhmPiHardware: self.mux = kwargs.pop('mux', mux_module.Mux(exec_logger=self.exec_logger, data_logger=self.data_logger, soh_logger=self.soh_logger)) - self.readings=np.array([]) + self.readings = np.array([]) + self.readings_window = (0.3, 1.0) + + def _clear_values(self): + self.readings = np.array([]) def _inject(self, duration): self.tx_sync.set() self.tx.voltage_pulse(length=duration) self.tx_sync.clear() - def _read_values(self, sampling_rate): # noqa + def _read_values(self, sampling_rate, append=False): # noqa + if not append: + self._clear_values() _readings = [] sample = 0 self.tx_sync.wait() start_time = datetime.datetime.utcnow() while self.tx_sync.is_set(): lap = datetime.datetime.utcnow() - _readings.append([elapsed_seconds(start_time), self.tx.current, self.rx.voltage]) + _readings.append([elapsed_seconds(start_time), self.tx.current, self.rx.voltage, self.tx.polarity]) sample+=1 sleep_time = start_time + datetime.timedelta(seconds = sample * sampling_rate / 1000) - lap - # print(f'expected_end_time: {start_time+datetime.timedelta(seconds = sample * sampling_rate / 1000)}, lap: {lap}, sample: {sample}, sleep_time: {sleep_time.total_seconds()} seconds') time.sleep(np.min([0, np.abs(sleep_time.total_seconds())])) self.readings = np.array(_readings) - def _vab_pulse(self, vab, length, sampling_rate=None, polarity=None): - """ Gets VMN and IAB from a single voltage pulse - """ - - if sampling_rate is None: - sampling_rate = RX_CONFIG['sampling_rate'] - if polarity is not None and polarity != self.tx.polarity: - self.tx.polarity = polarity - self.tx.voltage = vab - injection = Thread(target=self._inject, kwargs={'duration':length}) - readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate}) - # set gains automatically - self.tx.adc_gain_auto() - self.rx.adc_gain_auto() - readings.start() - injection.start() - readings.join() - injection.join() - def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5, vab_max=voltage_max, vmn_min=voltage_min): """Estimates best Tx voltage based on different strategies. @@ -163,4 +149,38 @@ class OhmPiHardware: polarity = -1 # TODO: check if we really need to return polarity else: polarity = 1 - return vab, polarity, rab \ No newline at end of file + return vab, polarity, rab + + def vab_square_wave(self, vab, length, sampling_rate, cycles=3): + self._vab_pulses(self, vab, [length]*cycles, sampling_rate) + + def _vab_pulse(self, vab, length, sampling_rate=None, polarity=None, append=False): + """ Gets VMN and IAB from a single voltage pulse + """ + + if sampling_rate is None: + sampling_rate = RX_CONFIG['sampling_rate'] + if polarity is not None and polarity != self.tx.polarity: + self.tx.polarity = polarity + self.tx.voltage = vab + injection = Thread(target=self._inject, kwargs={'duration':length}) + readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append}) + # set gains automatically + self.tx.adc_gain_auto() + self.rx.adc_gain_auto() + readings.start() + injection.start() + readings.join() + injection.join() + + def _vab_pulses(self, vab, lengths, sampling_rate, polarities=None): + n_pulses = len(lengths) + if sampling_rate is None: + sampling_rate = RX_CONFIG['sampling_rate'] + if polarities is not None: + assert len(polarities)==n_pulses + else: + polarities = [-self.tx.polarity * np.heaviside(i % 2, -1.) for i in range(n_pulses)] + self._clear_values() + for i in range(n_pulses): + self._vab_pulse(self, length=lengths[i], sampling_rate=sampling_rate, polarity=polarities[i], append=True) \ No newline at end of file diff --git a/ohmpi.py b/ohmpi.py index 13c9154b9b7d6a461eb95d07a9c23a1e22479ef8..8efdc605c77144153ec8ca1b631c99ba8988618f 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -4,7 +4,7 @@ created on January 6, 2020. Updates dec 2022. Hardware: Licensed under CERN-OHL-S v2 or any later version Software: Licensed under the GNU General Public License v3.0 -Ohmpi.py is a program to control a low-cost and open hardware resistivity meter OhmPi that has been developed by +Ohmpi.py is a program to control a low-cost and open hardware resistivity meter OhmPi that is developed by Rémi CLEMENT (INRAE), Vivien DUBOIS (INRAE), Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR) Olivier KAUFMANN (UMONS), Arnaud WATLET (UMONS) and Guillaume BLANCHY (FNRS/ULiege). """ @@ -24,21 +24,10 @@ import threading from OhmPi.logging_setup import setup_loggers from OhmPi.config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG from logging import DEBUG +from measure import OhmPiHardware # finish import (done only when class is instantiated as some libs are only available on arm64 platform) try: - import board # noqa - import busio # noqa - import adafruit_tca9548a # noqa - import adafruit_ads1x15.ads1115 as ads # noqa - from adafruit_ads1x15.analog_in import AnalogIn # noqa - from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa - from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa - import digitalio # noqa - from digitalio import Direction # noqa - from gpiozero import CPUTemperature # noqa - import minimalmodbus # noqa - arm64_imports = True except ImportError as error: if EXEC_LOGGING_CONFIG['logging_level'] == DEBUG: @@ -52,7 +41,7 @@ class OhmPi(object): """ OhmPi class. """ - def __init__(self, settings=None, sequence=None, use_mux=False, mqtt=True, onpi=None, idps=False): + def __init__(self, settings=None, sequence=None, mqtt=True, onpi=None): """Constructs the ohmpi object Parameters @@ -61,37 +50,34 @@ class OhmPi(object): sequence: - use_mux: - if True use the multiplexor to select active electrodes mqtt: bool, defaut: True if True publish on mqtt topics while logging, otherwise use other loggers only onpi: bool,None default: None if None, the platform on which the class is instantiated is determined to set on_pi to either True or False. if False the behaviour of an ohmpi will be partially emulated and return random data. - idps: - if true uses the DPS """ if onpi is None: _, onpi = get_platform() + elif onpi: + assert get_platform()[1] == True # Checks that the system actually runs on a pi if onpi is True + self.on_pi = onpi # True if run from the RaspberryPi with the hardware, otherwise False for random data self._sequence = sequence self.nb_samples = 0 - self.use_mux = use_mux - self.on_pi = onpi # True if run from the RaspberryPi with the hardware, otherwise False for random data self.status = 'idle' # either running or idle self.thread = None # contains the handle for the thread taking the measurement # set loggers - config_exec_logger, _, config_data_logger, _, _, msg = setup_loggers(mqtt=mqtt) # TODO: add SOH - self.data_logger = config_data_logger + config_exec_logger, _, config_data_logger, _, config_soh_logger, _, _, msg = setup_loggers(mqtt=mqtt) self.exec_logger = config_exec_logger - self.soh_logger = None # TODO: Implement the SOH logger + self.soh_logger = config_soh_logger print(msg) # read in hardware parameters (config.py) - self._read_hardware_config() - + self._hw = OhmPiHardware({'exec_logger': self.exec_logger, 'data_logger': self.data_logger, + 'soh_logger': self.soh_logger}) + self.exec_logger.info('Hardware configured...') # default acquisition settings self.settings = { 'injection_duration': 0.2, @@ -110,60 +96,6 @@ class OhmPi(object): if sequence is not None: self.load_sequence(sequence) - self.idps = idps # flag to use dps for injection or not - - # connect to components on the OhmPi board - if self.on_pi: - # activation of I2C protocol - self.i2c = busio.I2C(board.SCL, board.SDA) # noqa - - # I2C connexion to MCP23008, for current injection - self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address) - self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run - self.pin4.direction = Direction.OUTPUT - self.pin4.value = True - - # ADS1115 for current measurement (AB) - self.ads_current_address = 0x48 - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) - - # ADS1115 for voltage measurement (MN) - self.ads_voltage_address = 0x49 - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) - - # current injection module - if self.idps: - #self.switch_dps('on') - self.pin2 = self.mcp_board.get_pin(2) # dsp + - self.pin2.direction = Direction.OUTPUT - self.pin2.value = True - self.pin3 = self.mcp_board.get_pin(3) # dsp - - self.pin3.direction = Direction.OUTPUT - self.pin3.value = True - time.sleep(4) - self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) - self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc - self.DPS.serial.bytesize = 8 # - self.DPS.serial.timeout = 1 # greater than 0.5 for it to work - self.DPS.debug = False # - self.DPS.serial.parity = 'N' # No parity - self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode - self.DPS.write_register(0x0001, 1000, 0) # max current allowed (100 mA for relays) - # (last number) 0 is for mA, 3 is for A - - #self.soh_logger.debug(f'Battery voltage: {self.DPS.read_register(0x05,2 ):.3f}') TODO: SOH logger - print(self.DPS.read_register(0x05,2)) - self.switch_dps('off') - - - # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) - self.pin0 = self.mcp_board.get_pin(0) - self.pin0.direction = Direction.OUTPUT - self.pin0.value = False - self.pin1 = self.mcp_board.get_pin(1) - self.pin1.direction = Direction.OUTPUT - self.pin1.value = False - # set controller self.mqtt = mqtt self.cmd_id = None @@ -260,242 +192,6 @@ class OhmPi(object): w.writeheader() w.writerow(last_measurement) - def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5): - """Estimates best Tx voltage based on different strategies. - At first a half-cycle is made for a short duration with a fixed - known voltage. This gives us Iab and Rab. We also measure Vmn. - A constant c = vmn/iab is computed (only depends on geometric - factor and ground resistivity, that doesn't change during a - quadrupole). Then depending on the strategy, we compute which - vab to inject to reach the minimum/maximum Iab current or - min/max Vmn. - This function also compute the polarity on Vmn (on which pin - of the ADS1115 we need to measure Vmn to get the positive value). - - Parameters - ---------- - best_tx_injtime : float, optional - Time in milliseconds for the half-cycle used to compute Rab. - strategy : str, optional - Either: - - vmax : compute Vab to reach a maximum Iab and Vmn - - constant : apply given Vab - tx_volt : float, optional - Voltage to apply for guessing the best voltage. 5 V applied - by default. If strategy "constant" is chosen, constant voltage - to applied is "tx_volt". - - Returns - ------- - vab : float - Proposed Vab according to the given strategy. - polarity : int - Either 1 or -1 to know on which pin of the ADS the Vmn is measured. - """ - - # hardware limits - voltage_min = 10. # mV - voltage_max = 4500. - current_min = voltage_min / (self.r_shunt * 50) # mA - current_max = voltage_max / (self.r_shunt * 50) - tx_max = 50. # volt - - # check of volt - volt = tx_volt - if volt > tx_max: - self.exec_logger.warning('Sorry, cannot inject more than 50 V, set it back to 5 V') - volt = 5. - - # redefined the pin of the mcp (needed when relays are connected) - self.pin0 = self.mcp_board.get_pin(0) - self.pin0.direction = Direction.OUTPUT - self.pin0.value = False - self.pin1 = self.mcp_board.get_pin(1) - self.pin1.direction = Direction.OUTPUT - self.pin1.value = False - - # select a polarity to start with - self.pin0.value = True - self.pin1.value = False - - - if strategy == 'constant': - vab = volt - - self.DPS.write_register(0x0000, volt, 2) - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) # inject for given tx time - # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - gain_voltage = np.min([gain_voltage0, gain_voltage2]) # TODO: separate gain for P0 and P2 - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - # we measure the voltage on both A0 and A2 to guess the polarity - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa - - # check polarity - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - - elif strategy == 'vmax': - # implement different strategies - I=0 - vmn=0 - count=0 - while I < 3 or abs(vmn) < 20 : #TODO: hardware related - place in config - - if count > 0 : - #print('o', volt) - volt = volt + 2 - # print('>', volt) - count=count+1 - if volt > 50: - break - - # set voltage for test - if count==1: - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) # inject for given tx time - self.DPS.write_register(0x0000, volt, 2) - # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - # we measure the voltage on both A0 and A2 to guess the polarity - for i in range(10): - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa - time.sleep(best_tx_injtime) - - # check polarity - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - - n = 0 - while (abs(vmn) > voltage_max or I > current_max) and volt>0: #If starting voltage is too high, need to lower it down - # print('we are out of range! so decreasing volt') - volt = volt - 2 - self.DPS.write_register(0x0000, volt, 2) - #self.DPS.write_register(0x09, 1) # DPS5005 on - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - n+=1 - if n > 25 : - break - - factor_I = (current_max) / I - factor_vmn = voltage_max / vmn - factor = factor_I - if factor_I > factor_vmn: - factor = factor_vmn - #print('factor', factor_I, factor_vmn) - vab = factor * volt * 0.9 - if vab > tx_max: - vab = tx_max - print(factor_I, factor_vmn, 'factor!!') - - - elif strategy == 'vmin': - # implement different strategy - I=20 - vmn=400 - count=0 - while I > 10 or abs(vmn) > 300 : #TODO: hardware related - place in config - if count > 0 : - volt = volt - 2 - print(volt, count) - count=count+1 - if volt > 50: - break - - # set voltage for test - self.DPS.write_register(0x0000, volt, 2) - if count==1: - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) # inject for given tx time - - # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - # we measure the voltage on both A0 and A2 to guess the polarity - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa - - # check polarity - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - - n=0 - while (abs(vmn) < voltage_min or I < current_min) and volt > 0 : #If starting voltage is too high, need to lower it down - # print('we are out of range! so increasing volt') - volt = volt + 2 - print(volt) - self.DPS.write_register(0x0000, volt, 2) - #self.DPS.write_register(0x09, 1) # DPS5005 on - #time.sleep(best_tx_injtime) - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - n+=1 - if n > 25 : - break - - vab = volt - - self.DPS.write_register(0x09, 0) # DPS5005 off - # print('polarity', polarity) - self.pin0.value = False - self.pin1.value = False - # # compute constant - # c = vmn / I - Rab = (volt * 1000.) / I # noqa - - self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') - - # self.DPS.write_register(0x09, 0) # DPS5005 off - self.pin0.value = False - self.pin1.value = False - - return vab, polarity, Rab @staticmethod def _find_identical_in_line(quads): @@ -521,32 +217,6 @@ class OhmPi(object): return output - def _gain_auto(self, channel): - """Automatically sets the gain on a channel - - Parameters - ---------- - channel : ads.ADS1x15 - Instance of ADS where voltage is measured. - - Returns - ------- - gain : float - Gain to be applied on ADS1115. - """ - - gain = 2 / 3 - if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.0): - gain = 2 - elif (abs(channel.voltage) < 1.0) and (abs(channel.voltage) >= 0.500): - gain = 4 - elif (abs(channel.voltage) < 0.500) and (abs(channel.voltage) >= 0.250): - gain = 8 - elif abs(channel.voltage) < 0.250: - gain = 16 - self.exec_logger.debug(f'Setting gain to {gain}') - return gain - def get_data(self, survey_names=None, cmd_id=None): """Get available data. @@ -752,9 +422,6 @@ class OhmPi(object): warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) self.load_sequence(**kwargs) - def _read_voltage(self): - pass - def remove_data(self, cmd_id=None): """Remove all data in the data folder @@ -793,7 +460,7 @@ class OhmPi(object): Quadrupole to measure, just for labelling. Only switch_mux_on/off really create the route to the electrodes. nb_stack : int, optional - Number of stacks. A stacl is considered two half-cycles (one + Number of stacks. A stack is considered two half-cycles (one positive, one negative). injection_duration : int, optional Injection time in seconds. @@ -820,7 +487,7 @@ class OhmPi(object): if quad is None: quad = [0, 0, 0, 0] - if self.on_pi: + if self.on_pi: # TODO : Remove this condition? if nb_stack is None: nb_stack = self.settings['nb_stack'] if injection_duration is None: @@ -832,283 +499,283 @@ class OhmPi(object): sum_vmn = 0 sum_ps = 0 - # let's define the pin again as if we run through measure() - # as it's run in another thread, it doesn't consider these - # and this can lead to short circuit! - - self.pin0 = self.mcp_board.get_pin(0) - self.pin0.direction = Direction.OUTPUT - self.pin0.value = False - self.pin1 = self.mcp_board.get_pin(1) - self.pin1.direction = Direction.OUTPUT - self.pin1.value = False - self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement - self.pin7.direction = Direction.OUTPUT - self.pin7.value = False - - if self.sequence is None: - if self.idps: - - # self.switch_dps('on') - self.pin2 = self.mcp_board.get_pin(2) # dsp + - self.pin2.direction = Direction.OUTPUT - self.pin2.value = True - self.pin3 = self.mcp_board.get_pin(3) # dsp - - self.pin3.direction = Direction.OUTPUT - self.pin3.value = True - time.sleep(4) - - self.pin5 = self.mcp_board.get_pin(5) #IHM on mesaurement - self.pin5.direction = Direction.OUTPUT - self.pin5.value = True - self.pin6 = self.mcp_board.get_pin(6) #IHM on mesaurement - self.pin6.direction = Direction.OUTPUT - self.pin6.value = False - self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement - self.pin7.direction = Direction.OUTPUT - self.pin7.value = False - if self.idps: - if self.DPS.read_register(0x05,2) < 11: - self.pin7.value = True# max current allowed (100 mA for relays) #voltage - - # get best voltage to inject AND polarity - if self.idps: - tx_volt, polarity, Rab = self._compute_tx_volt( - best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) - self.exec_logger.debug(f'Best VAB found is {tx_volt:.3f}V') - else: - polarity = 1 - Rab = None - - # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) - self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, - address=self.ads_current_address, mode=0) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, - address=self.ads_voltage_address, mode=0) - # turn on the power supply - start_delay = None - end_delay = None - out_of_range = False - if self.idps: - if not np.isnan(tx_volt): - self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(0.3) - else: - self.exec_logger.debug('No best voltage found, will not take measurement') - out_of_range = True - - if not out_of_range: # we found a Vab in the range so we measure - if autogain: - - # compute autogain - gain_voltage = [] - for n in [0,1]: # make short cycle for gain computation - self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, - address=self.ads_voltage_address, mode=0) - if n == 0: - self.pin0.value = True - self.pin1.value = False - if self.board_version == 'mb.2023.0.0': - self.pin6.value = True # IHM current injection led on - else: - self.pin0.value = False - self.pin1.value = True # current injection nr2 - if self.board_version == 'mb.2023.0.0': - self.pin6.value = True # IHM current injection led on - - time.sleep(injection_duration) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - - if polarity > 0: - if n == 0: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) - else: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) - else: - if n == 0: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) - else: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) - - self.pin0.value = False - self.pin1.value = False - time.sleep(injection_duration) - if n == 0: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) - else: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) - if self.board_version == 'mb.2023.0.0': - self.pin6.value = False # IHM current injection led off - - self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage[0]:.3f}, ' - f'{gain_voltage[1]:.3f}') - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, - address=self.ads_current_address, mode=0) - - self.pin0.value = False - self.pin1.value = False - - # one stack = 2 half-cycles (one positive, one negative) - pinMN = 0 if polarity > 0 else 2 # noqa - - # sampling for each stack at the end of the injection - sampling_interval = 10 # ms # TODO: make this a config option - self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 #TODO: check this strategy - - # full data for waveform - fulldata = [] - - # we sample every 10 ms (as using AnalogIn for both current - # and voltage takes about 7 ms). When we go over the injection - # duration, we break the loop and truncate the meas arrays - # only the last values in meas will be taken into account - start_time = time.time() # start counter - for n in range(0, nb_stack * 2): # for each half-cycles - # current injection - if (n % 2) == 0: - self.pin0.value = True - self.pin1.value = False - if autogain: # select gain computed on first half cycle - self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage), data_rate=860, - address=self.ads_voltage_address, mode=0) - else: - self.pin0.value = False - self.pin1.value = True # current injection nr2 - if autogain: # select gain computed on first half cycle - self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage),data_rate=860, - address=self.ads_voltage_address, mode=0) - self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') - if self.board_version == 'mb.2023.0.0': - self.pin6.value = True # IHM current injection led on - # measurement of current i and voltage u during injection - meas = np.zeros((self.nb_samples, 3)) * np.nan - start_delay = time.time() # stating measurement time - dt = 0 - k = 0 - for k in range(0, self.nb_samples): - # reading current value on ADS channels - meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) - if self.board_version == 'mb.2023.0.0': - if pinMN == 0: - meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 - else: - meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 - elif self.board_version == '22.10': - meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000 - # else: - # self.exec_logger.debug('Unknown board') - time.sleep(sampling_interval / 1000) - dt = time.time() - start_delay # real injection time (s) - meas[k, 2] = time.time() - start_time - if dt > (injection_duration - 0 * sampling_interval / 1000.): - break - - # stop current injection - self.pin0.value = False - self.pin1.value = False -# if autogain: # select gain computed on first half cycle -# self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[2],data_rate=860, + # # let's define the pin again as if we run through measure() + # # as it's run in another thread, it doesn't consider these + # # and this can lead to short circuit! + # + # self.pin0 = self.mcp_board.get_pin(0) + # self.pin0.direction = Direction.OUTPUT + # self.pin0.value = False + # self.pin1 = self.mcp_board.get_pin(1) + # self.pin1.direction = Direction.OUTPUT + # self.pin1.value = False + # self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement + # self.pin7.direction = Direction.OUTPUT + # self.pin7.value = False + + # if self.sequence is None: +# if self.idps: +# +# # self.switch_dps('on') +# self.pin2 = self.mcp_board.get_pin(2) # dsp + +# self.pin2.direction = Direction.OUTPUT +# self.pin2.value = True +# self.pin3 = self.mcp_board.get_pin(3) # dsp - +# self.pin3.direction = Direction.OUTPUT +# self.pin3.value = True +# time.sleep(4) +# +# self.pin5 = self.mcp_board.get_pin(5) #IHM on mesaurement +# self.pin5.direction = Direction.OUTPUT +# self.pin5.value = True +# self.pin6 = self.mcp_board.get_pin(6) #IHM on mesaurement +# self.pin6.direction = Direction.OUTPUT +# self.pin6.value = False +# self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement +# self.pin7.direction = Direction.OUTPUT +# self.pin7.value = False +# if self.idps: +# if self.DPS.read_register(0x05,2) < 11: +# self.pin7.value = True# max current allowed (100 mA for relays) #voltage +# +# # get best voltage to inject AND polarity +# if self.idps: +# tx_volt, polarity, Rab = self._compute_tx_volt( +# best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) +# self.exec_logger.debug(f'Best VAB found is {tx_volt:.3f}V') +# else: +# polarity = 1 +# Rab = None +# +# # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) +# self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, +# address=self.ads_current_address, mode=0) +# self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, +# address=self.ads_voltage_address, mode=0) +# # turn on the power supply +# start_delay = None +# end_delay = None +# out_of_range = False +# if self.idps: +# if not np.isnan(tx_volt): +# self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V +# self.DPS.write_register(0x09, 1) # DPS5005 on +# time.sleep(0.3) +# else: +# self.exec_logger.debug('No best voltage found, will not take measurement') +# out_of_range = True +# +# if not out_of_range: # we found a Vab in the range so we measure +# if autogain: +# +# # compute autogain +# gain_voltage = [] +# for n in [0,1]: # make short cycle for gain computation +# self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, +# address=self.ads_voltage_address, mode=0) +# if n == 0: +# self.pin0.value = True +# self.pin1.value = False +# if self.board_version == 'mb.2023.0.0': +# self.pin6.value = True # IHM current injection led on +# else: +# self.pin0.value = False +# self.pin1.value = True # current injection nr2 +# if self.board_version == 'mb.2023.0.0': +# self.pin6.value = True # IHM current injection led on +# +# time.sleep(injection_duration) +# gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) +# +# if polarity > 0: +# if n == 0: +# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) +# else: +# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) +# else: +# if n == 0: +# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) +# else: +# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) +# +# self.pin0.value = False +# self.pin1.value = False +# time.sleep(injection_duration) +# if n == 0: +# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) +# else: +# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) +# if self.board_version == 'mb.2023.0.0': +# self.pin6.value = False # IHM current injection led off +# +# self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage[0]:.3f}, ' +# f'{gain_voltage[1]:.3f}') +# self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, +# address=self.ads_current_address, mode=0) +# +# self.pin0.value = False +# self.pin1.value = False +# +# # one stack = 2 half-cycles (one positive, one negative) +# pinMN = 0 if polarity > 0 else 2 # noqa +# +# # sampling for each stack at the end of the injection +# sampling_interval = 10 # ms # TODO: make this a config option +# self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 #TODO: check this strategy +# +# # full data for waveform +# fulldata = [] +# +# # we sample every 10 ms (as using AnalogIn for both current +# # and voltage takes about 7 ms). When we go over the injection +# # duration, we break the loop and truncate the meas arrays +# # only the last values in meas will be taken into account +# start_time = time.time() # start counter +# for n in range(0, nb_stack * 2): # for each half-cycles +# # current injection +# if (n % 2) == 0: +# self.pin0.value = True +# self.pin1.value = False +# if autogain: # select gain computed on first half cycle +# self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage), data_rate=860, # address=self.ads_voltage_address, mode=0) - self.pin6.value = False# IHM current injection led on - end_delay = time.time() - - # truncate the meas array if we didn't fill the last samples #TODO: check why - meas = meas[:k + 1] - - # measurement of current i and voltage u during off time - measpp = np.zeros((meas.shape[0], 3)) * np.nan - start_delay = time.time() # stating measurement time - dt = 0 - for k in range(0, measpp.shape[0]): - # reading current value on ADS channels - measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000.) / (50 * self.r_shunt) - if self.board_version == 'mb.2023.0.0': - if pinMN == 0: - measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. - else: - measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1 - elif self.board_version == '22.10': - measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000. - else: - self.exec_logger.debug('unknown board') - time.sleep(sampling_interval / 1000) - dt = time.time() - start_delay # real injection time (s) - measpp[k, 2] = time.time() - start_time - if dt > (injection_duration - 0 * sampling_interval / 1000.): - break - - end_delay = time.time() - - # truncate the meas array if we didn't fill the last samples - measpp = measpp[:k + 1] - - # we alternate on which ADS1115 pin we measure because of sign of voltage - if pinMN == 0: - pinMN = 2 # noqa - else: - pinMN = 0 # noqa - - # store data for full wave form - fulldata.append(meas) - fulldata.append(measpp) - - # TODO get battery voltage and warn if battery is running low - # TODO send a message on SOH stating the battery level - - # let's do some calculation (out of the stacking loop) - - # i_stack = np.empty(2 * nb_stack, dtype=object) - # vmn_stack = np.empty(2 * nb_stack, dtype=object) - i_stack, vmn_stack = [], [] - # select appropriate window length to average the readings - window = int(np.min([f.shape[0] for f in fulldata[::2]]) // 3) - for n, meas in enumerate(fulldata[::2]): - # take average from the samples per stack, then sum them all - # average for the last third of the stacked values - # is done outside the loop - i_stack.append(meas[-int(window):, 0]) - vmn_stack.append(meas[-int(window):, 1]) - - sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0])) - vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1]) - if (n % 2) == 0: - sum_vmn = sum_vmn - vmn1 - sum_ps = sum_ps + vmn1 - else: - sum_vmn = sum_vmn + vmn1 - sum_ps = sum_ps + vmn1 - - else: - sum_i = np.nan - sum_vmn = np.nan - sum_ps = np.nan - fulldata = None - - if self.idps: - self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt - self.DPS.write_register(0x09, 0) # DPS5005 off - - # reshape full data to an array of good size - # we need an array of regular size to save in the csv - if not out_of_range: - fulldata = np.vstack(fulldata) - # we create a big enough array given nb_samples, number of - # half-cycles (1 stack = 2 half-cycles), and twice as we - # measure decay as well - a = np.zeros((nb_stack * self.nb_samples * 2 * 2, 3)) * np.nan - a[:fulldata.shape[0], :] = fulldata - fulldata = a - else: - np.array([[]]) - - vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) - vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) - i_stack_mean = np.mean(i_stack) - i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])])) - r_stack_mean = vmn_stack_mean / i_stack_mean - r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean - ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) +# else: +# self.pin0.value = False +# self.pin1.value = True # current injection nr2 +# if autogain: # select gain computed on first half cycle +# self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage),data_rate=860, +# address=self.ads_voltage_address, mode=0) +# self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') +# if self.board_version == 'mb.2023.0.0': +# self.pin6.value = True # IHM current injection led on +# # measurement of current i and voltage u during injection +# meas = np.zeros((self.nb_samples, 3)) * np.nan +# start_delay = time.time() # stating measurement time +# dt = 0 +# k = 0 +# for k in range(0, self.nb_samples): +# # reading current value on ADS channels +# meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) +# if self.board_version == 'mb.2023.0.0': +# if pinMN == 0: +# meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 +# else: +# meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 +# elif self.board_version == '22.10': +# meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000 +# # else: +# # self.exec_logger.debug('Unknown board') +# time.sleep(sampling_interval / 1000) +# dt = time.time() - start_delay # real injection time (s) +# meas[k, 2] = time.time() - start_time +# if dt > (injection_duration - 0 * sampling_interval / 1000.): +# break +# +# # stop current injection +# self.pin0.value = False +# self.pin1.value = False +# # if autogain: # select gain computed on first half cycle +# # self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[2],data_rate=860, +# # address=self.ads_voltage_address, mode=0) +# self.pin6.value = False# IHM current injection led on +# end_delay = time.time() +# +# # truncate the meas array if we didn't fill the last samples #TODO: check why +# meas = meas[:k + 1] +# +# # measurement of current i and voltage u during off time +# measpp = np.zeros((meas.shape[0], 3)) * np.nan +# start_delay = time.time() # stating measurement time +# dt = 0 +# for k in range(0, measpp.shape[0]): +# # reading current value on ADS channels +# measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000.) / (50 * self.r_shunt) +# if self.board_version == 'mb.2023.0.0': +# if pinMN == 0: +# measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. +# else: +# measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1 +# elif self.board_version == '22.10': +# measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000. +# else: +# self.exec_logger.debug('unknown board') +# time.sleep(sampling_interval / 1000) +# dt = time.time() - start_delay # real injection time (s) +# measpp[k, 2] = time.time() - start_time +# if dt > (injection_duration - 0 * sampling_interval / 1000.): +# break +# +# end_delay = time.time() +# +# # truncate the meas array if we didn't fill the last samples +# measpp = measpp[:k + 1] +# +# # we alternate on which ADS1115 pin we measure because of sign of voltage +# if pinMN == 0: +# pinMN = 2 # noqa +# else: +# pinMN = 0 # noqa +# +# # store data for full wave form +# fulldata.append(meas) +# fulldata.append(measpp) +# +# # TODO get battery voltage and warn if battery is running low +# # TODO send a message on SOH stating the battery level +# +# # let's do some calculation (out of the stacking loop) +# +# # i_stack = np.empty(2 * nb_stack, dtype=object) +# # vmn_stack = np.empty(2 * nb_stack, dtype=object) +# i_stack, vmn_stack = [], [] +# # select appropriate window length to average the readings +# window = int(np.min([f.shape[0] for f in fulldata[::2]]) // 3) +# for n, meas in enumerate(fulldata[::2]): +# # take average from the samples per stack, then sum them all +# # average for the last third of the stacked values +# # is done outside the loop +# i_stack.append(meas[-int(window):, 0]) +# vmn_stack.append(meas[-int(window):, 1]) +# +# sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0])) +# vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1]) +# if (n % 2) == 0: +# sum_vmn = sum_vmn - vmn1 +# sum_ps = sum_ps + vmn1 +# else: +# sum_vmn = sum_vmn + vmn1 +# sum_ps = sum_ps + vmn1 +# +# else: +# sum_i = np.nan +# sum_vmn = np.nan +# sum_ps = np.nan +# fulldata = None +# +# if self.idps: +# self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt +# self.DPS.write_register(0x09, 0) # DPS5005 off +# +# # reshape full data to an array of good size +# # we need an array of regular size to save in the csv +# if not out_of_range: +# fulldata = np.vstack(fulldata) +# # we create a big enough array given nb_samples, number of +# # half-cycles (1 stack = 2 half-cycles), and twice as we +# # measure decay as well +# a = np.zeros((nb_stack * self.nb_samples * 2 * 2, 3)) * np.nan +# a[:fulldata.shape[0], :] = fulldata +# fulldata = a +# else: +# np.array([[]]) +# +# vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) +# vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) +# i_stack_mean = np.mean(i_stack) +# i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])])) +# r_stack_mean = vmn_stack_mean / i_stack_mean +# r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean +# ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) # create a dictionary and compute averaged values from all stacks # if self.board_version == 'mb.2023.0.0': @@ -1125,7 +792,7 @@ class OhmPi(object): "Ps [mV]": sum_ps / (2 * nb_stack), "nbStack": nb_stack, "Tx [V]": tx_volt if not out_of_range else 0., - "CPU temp [degC]": CPUTemperature().temperature, + "CPU temp [degC]": self._hw.cpu_temperature, "Nb samples [-]": self.nb_samples, "fulldata": fulldata, "I_stack [mA]": i_stack_mean, @@ -1174,15 +841,15 @@ class OhmPi(object): dd.update({'N': str(dd['N'])}) # round float to 2 decimal - for key in dd.keys(): + for key in dd.keys(): # Check why this is applied on keys and not values... if isinstance(dd[key], float): dd[key] = np.round(dd[key], 3) dd['cmd_id'] = str(cmd_id) self.data_logger.info(dd) - self.pin5.value = False #IHM led on measurement off - if self.sequence is None : - self.switch_dps('off') + # self.pin5.value = False #IHM led on measurement off + # if self.sequence is None : + # self.switch_dps('off') return d @@ -1266,21 +933,21 @@ class OhmPi(object): quad = self.sequence[i, :] # quadrupole if self.status == 'stopping': break - if i == 0: - # call the switch_mux function to switch to the right electrodes - # switch on DPS - self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address) - self.pin2 = self.mcp_board.get_pin(2) # dsp - - self.pin2.direction = Direction.OUTPUT - self.pin2.value = True - self.pin3 = self.mcp_board.get_pin(3) # dsp - - self.pin3.direction = Direction.OUTPUT - self.pin3.value = True - time.sleep (4) - - #self.switch_dps('on') - time.sleep(.6) - self.switch_mux_on(quad) + # if i == 0: + # # call the switch_mux function to switch to the right electrodes + # # switch on DPS + # self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address) + # self.pin2 = self.mcp_board.get_pin(2) # dsp - + # self.pin2.direction = Direction.OUTPUT + # self.pin2.value = True + # self.pin3 = self.mcp_board.get_pin(3) # dsp - + # self.pin3.direction = Direction.OUTPUT + # self.pin3.value = True + # time.sleep (4) + # + # #self.switch_dps('on') + # time.sleep(.6) + # self.switch_mux_on(quad) # run a measurement if self.on_pi: acquired_data = self.run_measurement(quad, **kwargs) @@ -1306,10 +973,10 @@ class OhmPi(object): } self.data_logger.info(acquired_data) - # switch mux off - self.switch_mux_off(quad) - - # add command_id in dataset + # # switch mux off + # self.switch_mux_off(quad) + # + # # add command_id in dataset acquired_data.update({'cmd_id': cmd_id}) # log data to the data logger # self.data_logger.info(f'{acquired_data}') @@ -1317,7 +984,7 @@ class OhmPi(object): self.append_and_save(filename, acquired_data) self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') - self.switch_dps('off') + # self.switch_dps('off') self.status = 'idle' def run_sequence_async(self, cmd_id=None, **kwargs): @@ -1337,7 +1004,7 @@ class OhmPi(object): self.thread.start() self.status = 'idle' - def rs_check(self, tx_volt=12., cmd_id=None): + def rs_check(self, tx_volt=12., cmd_id=None): # TODO: we could build a smarter RS-Check by selecting adjacent electrodes based on their locations and try to isolate electrodes that are responsible for high resistances (ex: AB high, AC low, BC high -> might be a problem at B (cf what we did with WofE) """Checks contact resistances Parameters @@ -1359,21 +1026,20 @@ class OhmPi(object): elec[:-1], elec[1:], ]).T - if self.idps: - quads[:, 2:] = 0 # we don't open Vmn to prevent burning the MN part - # as it has a smaller range of accepted voltage + # if self.idps: + # quads[:, 2:] = 0 # we don't open Vmn to prevent burning the MN part + # # as it has a smaller range of accepted voltage # create filename to store RS export_path_rs = self.settings['export_path'].replace('.csv', '') \ + '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '_rs.csv' # perform RS check - # self.run = True self.status = 'running' if self.on_pi: # make sure all mux are off to start with - self.reset_mux() + self._hw.mux.reset_mux() # measure all quad of the RS sequence for i in range(0, quads.shape[0]): @@ -1543,54 +1209,54 @@ class OhmPi(object): if quadrupole[i] > 0: self._switch_mux(quadrupole[i], 'off', roles[i]) - def test_mux(self, activation_time=1.0, address=0x70): - """Interactive method to test the multiplexer. - - Parameters - ---------- - activation_time : float, optional - Time in seconds during which the relays are activated. - address : hex, optional - Address of the multiplexer board to test (e.g. 0x70, 0x71, ...). - """ - self.use_mux = True - self.reset_mux() - - # choose with MUX board - tca = adafruit_tca9548a.TCA9548A(self.i2c, address) - - # ask use some details on how to proceed - a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!') - if a == '1': - print('run channel by channel test') - electrode = int(input('Choose your electrode number (integer):')) - electrodes = [electrode] - elif a == '2': - electrodes = range(1, 65) - else: - print('Wrong choice !') - return - - # run the test - for electrode_nr in electrodes: - # find I2C address of the electrode and corresponding relay - # considering that one MCP23017 can cover 16 electrodes - i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division - relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 - - if i2c_address is not None: - # select the MCP23017 of the selected MUX board - mcp2 = MCP23017(tca[i2c_address]) - mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT - - # activate relay for given time - mcp2.get_pin(relay_nr - 1).value = True - print('electrode:', electrode_nr, ' activated...', end='', flush=True) - time.sleep(activation_time) - mcp2.get_pin(relay_nr - 1).value = False - print(' deactivated') - time.sleep(activation_time) - print('Test finished.') + # def test_mux(self, activation_time=1.0, address=0x70): TODO: add this in the MUX code + # """Interactive method to test the multiplexer. + # + # Parameters + # ---------- + # activation_time : float, optional + # Time in seconds during which the relays are activated. + # address : hex, optional + # Address of the multiplexer board to test (e.g. 0x70, 0x71, ...). + # """ + # self.use_mux = True + # self.reset_mux() + # + # # choose with MUX board + # tca = adafruit_tca9548a.TCA9548A(self.i2c, address) + # + # # ask use some details on how to proceed + # a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!') + # if a == '1': + # print('run channel by channel test') + # electrode = int(input('Choose your electrode number (integer):')) + # electrodes = [electrode] + # elif a == '2': + # electrodes = range(1, 65) + # else: + # print('Wrong choice !') + # return + # + # # run the test + # for electrode_nr in electrodes: + # # find I2C address of the electrode and corresponding relay + # # considering that one MCP23017 can cover 16 electrodes + # i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division + # relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 + # + # if i2c_address is not None: + # # select the MCP23017 of the selected MUX board + # mcp2 = MCP23017(tca[i2c_address]) + # mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT + # + # # activate relay for given time + # mcp2.get_pin(relay_nr - 1).value = True + # print('electrode:', electrode_nr, ' activated...', end='', flush=True) + # time.sleep(activation_time) + # mcp2.get_pin(relay_nr - 1).value = False + # print(' deactivated') + # time.sleep(activation_time) + # print('Test finished.') def reset_mux(self, cmd_id=None): """Switches off all multiplexer relays. diff --git a/ohmpi_bkp.py b/ohmpi_bkp.py new file mode 100644 index 0000000000000000000000000000000000000000..13c9154b9b7d6a461eb95d07a9c23a1e22479ef8 --- /dev/null +++ b/ohmpi_bkp.py @@ -0,0 +1,1701 @@ +# -*- coding: utf-8 -*- +""" +created on January 6, 2020. +Updates dec 2022. +Hardware: Licensed under CERN-OHL-S v2 or any later version +Software: Licensed under the GNU General Public License v3.0 +Ohmpi.py is a program to control a low-cost and open hardware resistivity meter OhmPi that has been developed by +Rémi CLEMENT (INRAE), Vivien DUBOIS (INRAE), Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR) +Olivier KAUFMANN (UMONS), Arnaud WATLET (UMONS) and Guillaume BLANCHY (FNRS/ULiege). +""" + +import os +from OhmPi.utils import get_platform +import json +import warnings +from copy import deepcopy +import numpy as np +import csv +import time +import shutil +from datetime import datetime +from termcolor import colored +import threading +from OhmPi.logging_setup import setup_loggers +from OhmPi.config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG +from logging import DEBUG + +# finish import (done only when class is instantiated as some libs are only available on arm64 platform) +try: + import board # noqa + import busio # noqa + import adafruit_tca9548a # noqa + import adafruit_ads1x15.ads1115 as ads # noqa + from adafruit_ads1x15.analog_in import AnalogIn # noqa + from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa + from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa + import digitalio # noqa + from digitalio import Direction # noqa + from gpiozero import CPUTemperature # noqa + import minimalmodbus # noqa + + arm64_imports = True +except ImportError as error: + if EXEC_LOGGING_CONFIG['logging_level'] == DEBUG: + print(colored(f'Import error: {error}', 'yellow')) + arm64_imports = False +except Exception as error: + print(colored(f'Unexpected error: {error}', 'red')) + arm64_imports = None + +class OhmPi(object): + """ OhmPi class. + """ + + def __init__(self, settings=None, sequence=None, use_mux=False, mqtt=True, onpi=None, idps=False): + """Constructs the ohmpi object + + Parameters + ---------- + settings: + + sequence: + + use_mux: + if True use the multiplexor to select active electrodes + mqtt: bool, defaut: True + if True publish on mqtt topics while logging, otherwise use other loggers only + onpi: bool,None default: None + if None, the platform on which the class is instantiated is determined to set on_pi to either True or False. + if False the behaviour of an ohmpi will be partially emulated and return random data. + idps: + if true uses the DPS + """ + + if onpi is None: + _, onpi = get_platform() + + self._sequence = sequence + self.nb_samples = 0 + self.use_mux = use_mux + self.on_pi = onpi # True if run from the RaspberryPi with the hardware, otherwise False for random data + self.status = 'idle' # either running or idle + self.thread = None # contains the handle for the thread taking the measurement + + # set loggers + config_exec_logger, _, config_data_logger, _, _, msg = setup_loggers(mqtt=mqtt) # TODO: add SOH + self.data_logger = config_data_logger + self.exec_logger = config_exec_logger + self.soh_logger = None # TODO: Implement the SOH logger + print(msg) + + # read in hardware parameters (config.py) + self._read_hardware_config() + + # default acquisition settings + self.settings = { + 'injection_duration': 0.2, + 'nb_meas': 1, + 'sequence_delay': 1, + 'nb_stack': 1, + 'export_path': 'data/measurement.csv' + } + # read in acquisition settings + if settings is not None: + self.update_settings(settings) + + self.exec_logger.debug('Initialized with settings:' + str(self.settings)) + + # read quadrupole sequence + if sequence is not None: + self.load_sequence(sequence) + + self.idps = idps # flag to use dps for injection or not + + # connect to components on the OhmPi board + if self.on_pi: + # activation of I2C protocol + self.i2c = busio.I2C(board.SCL, board.SDA) # noqa + + # I2C connexion to MCP23008, for current injection + self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address) + self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run + self.pin4.direction = Direction.OUTPUT + self.pin4.value = True + + # ADS1115 for current measurement (AB) + self.ads_current_address = 0x48 + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + + # ADS1115 for voltage measurement (MN) + self.ads_voltage_address = 0x49 + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + + # current injection module + if self.idps: + #self.switch_dps('on') + self.pin2 = self.mcp_board.get_pin(2) # dsp + + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.mcp_board.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + time.sleep(4) + self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) + self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc + self.DPS.serial.bytesize = 8 # + self.DPS.serial.timeout = 1 # greater than 0.5 for it to work + self.DPS.debug = False # + self.DPS.serial.parity = 'N' # No parity + self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode + self.DPS.write_register(0x0001, 1000, 0) # max current allowed (100 mA for relays) + # (last number) 0 is for mA, 3 is for A + + #self.soh_logger.debug(f'Battery voltage: {self.DPS.read_register(0x05,2 ):.3f}') TODO: SOH logger + print(self.DPS.read_register(0x05,2)) + self.switch_dps('off') + + + # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) + self.pin0 = self.mcp_board.get_pin(0) + self.pin0.direction = Direction.OUTPUT + self.pin0.value = False + self.pin1 = self.mcp_board.get_pin(1) + self.pin1.direction = Direction.OUTPUT + self.pin1.value = False + + # set controller + self.mqtt = mqtt + self.cmd_id = None + if self.mqtt: + import paho.mqtt.client as mqtt_client + + self.exec_logger.debug(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}" + f" on {MQTT_CONTROL_CONFIG['hostname']} broker") + + def connect_mqtt() -> mqtt_client: + def on_connect(mqttclient, userdata, flags, rc): + if rc == 0: + self.exec_logger.debug(f"Successfully connected to control broker:" + f" {MQTT_CONTROL_CONFIG['hostname']}") + else: + self.exec_logger.warning(f'Failed to connect to control broker. Return code : {rc}') + + client = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) + client.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), + MQTT_CONTROL_CONFIG['auth']['password']) + client.on_connect = on_connect + client.connect(MQTT_CONTROL_CONFIG['hostname'], MQTT_CONTROL_CONFIG['port']) + return client + + try: + self.exec_logger.debug(f"Connecting to control broker: {MQTT_CONTROL_CONFIG['hostname']}") + self.controller = connect_mqtt() + except Exception as e: + self.exec_logger.debug(f'Unable to connect control broker: {e}') + self.controller = None + if self.controller is not None: + self.exec_logger.debug(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") + try: + self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) + + msg = f"Subscribed to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}" \ + f" on {MQTT_CONTROL_CONFIG['hostname']} broker" + self.exec_logger.debug(msg) + print(colored(f'\u2611 {msg}', 'blue')) + except Exception as e: + self.exec_logger.warning(f'Unable to subscribe to control topic : {e}') + self.controller = None + publisher_config = MQTT_CONTROL_CONFIG.copy() + publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic'] + publisher_config.pop('ctrl_topic') + + def on_message(client, userdata, message): + command = message.payload.decode('utf-8') + self.exec_logger.debug(f'Received command {command}') + self._process_commands(command) + + self.controller.on_message = on_message + else: + self.controller = None + self.exec_logger.warning('No connection to control broker.' + ' Use python/ipython to interact with OhmPi object...') + + @staticmethod + def append_and_save(filename: str, last_measurement: dict, cmd_id=None): + """Appends and saves the last measurement dict. + + Parameters + ---------- + filename : str + filename to save the last measurement dataframe + last_measurement : dict + Last measurement taken in the form of a python dictionary + cmd_id : str, optional + Unique command identifier + """ + last_measurement = deepcopy(last_measurement) + if 'fulldata' in last_measurement: + d = last_measurement['fulldata'] + n = d.shape[0] + if n > 1: + idic = dict(zip(['i' + str(i) for i in range(n)], d[:, 0])) + udic = dict(zip(['u' + str(i) for i in range(n)], d[:, 1])) + tdic = dict(zip(['t' + str(i) for i in range(n)], d[:, 2])) + last_measurement.update(idic) + last_measurement.update(udic) + last_measurement.update(tdic) + last_measurement.pop('fulldata') + + if os.path.isfile(filename): + # Load data file and append data to it + with open(filename, 'a') as f: + w = csv.DictWriter(f, last_measurement.keys()) + w.writerow(last_measurement) + # last_measurement.to_csv(f, header=False) + else: + # create data file and add headers + with open(filename, 'a') as f: + w = csv.DictWriter(f, last_measurement.keys()) + w.writeheader() + w.writerow(last_measurement) + + def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5): + """Estimates best Tx voltage based on different strategies. + At first a half-cycle is made for a short duration with a fixed + known voltage. This gives us Iab and Rab. We also measure Vmn. + A constant c = vmn/iab is computed (only depends on geometric + factor and ground resistivity, that doesn't change during a + quadrupole). Then depending on the strategy, we compute which + vab to inject to reach the minimum/maximum Iab current or + min/max Vmn. + This function also compute the polarity on Vmn (on which pin + of the ADS1115 we need to measure Vmn to get the positive value). + + Parameters + ---------- + best_tx_injtime : float, optional + Time in milliseconds for the half-cycle used to compute Rab. + strategy : str, optional + Either: + - vmax : compute Vab to reach a maximum Iab and Vmn + - constant : apply given Vab + tx_volt : float, optional + Voltage to apply for guessing the best voltage. 5 V applied + by default. If strategy "constant" is chosen, constant voltage + to applied is "tx_volt". + + Returns + ------- + vab : float + Proposed Vab according to the given strategy. + polarity : int + Either 1 or -1 to know on which pin of the ADS the Vmn is measured. + """ + + # hardware limits + voltage_min = 10. # mV + voltage_max = 4500. + current_min = voltage_min / (self.r_shunt * 50) # mA + current_max = voltage_max / (self.r_shunt * 50) + tx_max = 50. # volt + + # check of volt + volt = tx_volt + if volt > tx_max: + self.exec_logger.warning('Sorry, cannot inject more than 50 V, set it back to 5 V') + volt = 5. + + # redefined the pin of the mcp (needed when relays are connected) + self.pin0 = self.mcp_board.get_pin(0) + self.pin0.direction = Direction.OUTPUT + self.pin0.value = False + self.pin1 = self.mcp_board.get_pin(1) + self.pin1.direction = Direction.OUTPUT + self.pin1.value = False + + # select a polarity to start with + self.pin0.value = True + self.pin1.value = False + + + if strategy == 'constant': + vab = volt + + self.DPS.write_register(0x0000, volt, 2) + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) # TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + + elif strategy == 'vmax': + # implement different strategies + I=0 + vmn=0 + count=0 + while I < 3 or abs(vmn) < 20 : #TODO: hardware related - place in config + + if count > 0 : + #print('o', volt) + volt = volt + 2 + # print('>', volt) + count=count+1 + if volt > 50: + break + + # set voltage for test + if count==1: + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + self.DPS.write_register(0x0000, volt, 2) + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + for i in range(10): + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + time.sleep(best_tx_injtime) + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + + n = 0 + while (abs(vmn) > voltage_max or I > current_max) and volt>0: #If starting voltage is too high, need to lower it down + # print('we are out of range! so decreasing volt') + volt = volt - 2 + self.DPS.write_register(0x0000, volt, 2) + #self.DPS.write_register(0x09, 1) # DPS5005 on + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + n+=1 + if n > 25 : + break + + factor_I = (current_max) / I + factor_vmn = voltage_max / vmn + factor = factor_I + if factor_I > factor_vmn: + factor = factor_vmn + #print('factor', factor_I, factor_vmn) + vab = factor * volt * 0.9 + if vab > tx_max: + vab = tx_max + print(factor_I, factor_vmn, 'factor!!') + + + elif strategy == 'vmin': + # implement different strategy + I=20 + vmn=400 + count=0 + while I > 10 or abs(vmn) > 300 : #TODO: hardware related - place in config + if count > 0 : + volt = volt - 2 + print(volt, count) + count=count+1 + if volt > 50: + break + + # set voltage for test + self.DPS.write_register(0x0000, volt, 2) + if count==1: + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + + n=0 + while (abs(vmn) < voltage_min or I < current_min) and volt > 0 : #If starting voltage is too high, need to lower it down + # print('we are out of range! so increasing volt') + volt = volt + 2 + print(volt) + self.DPS.write_register(0x0000, volt, 2) + #self.DPS.write_register(0x09, 1) # DPS5005 on + #time.sleep(best_tx_injtime) + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + n+=1 + if n > 25 : + break + + vab = volt + + self.DPS.write_register(0x09, 0) # DPS5005 off + # print('polarity', polarity) + self.pin0.value = False + self.pin1.value = False + # # compute constant + # c = vmn / I + Rab = (volt * 1000.) / I # noqa + + self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') + + # self.DPS.write_register(0x09, 0) # DPS5005 off + self.pin0.value = False + self.pin1.value = False + + return vab, polarity, Rab + + @staticmethod + def _find_identical_in_line(quads): + """Finds quadrupole where A and B are identical. + If A and B are connected to the same electrode, the Pi burns (short-circuit). + + Parameters + ---------- + quads : numpy.ndarray + List of quadrupoles of shape nquad x 4 or 1D vector of shape nquad. + + Returns + ------- + output : numpy.ndarray 1D array of int + List of index of rows where A and B are identical. + """ + + # if we have a 1D array (so only 1 quadrupole), make it a 2D array + if len(quads.shape) == 1: + quads = quads[None, :] + + output = np.where(quads[:, 0] == quads[:, 1])[0] + + return output + + def _gain_auto(self, channel): + """Automatically sets the gain on a channel + + Parameters + ---------- + channel : ads.ADS1x15 + Instance of ADS where voltage is measured. + + Returns + ------- + gain : float + Gain to be applied on ADS1115. + """ + + gain = 2 / 3 + if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.0): + gain = 2 + elif (abs(channel.voltage) < 1.0) and (abs(channel.voltage) >= 0.500): + gain = 4 + elif (abs(channel.voltage) < 0.500) and (abs(channel.voltage) >= 0.250): + gain = 8 + elif abs(channel.voltage) < 0.250: + gain = 16 + self.exec_logger.debug(f'Setting gain to {gain}') + return gain + + def get_data(self, survey_names=None, cmd_id=None): + """Get available data. + + Parameters + ---------- + survey_names : list of str, optional + List of filenames already available from the html interface. So + their content won't be returned again. Only files not in the list + will be read. + cmd_id : str, optional + Unique command identifier + """ + # get all .csv file in data folder + if survey_names is None: + survey_names = [] + fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] + ddic = {} + if cmd_id is None: + cmd_id = 'unknown' + for fname in fnames: + if ((fname != 'readme.txt') + and ('_rs' not in fname) + and (fname.replace('.csv', '') not in survey_names)): + try: + data = np.loadtxt('data/' + fname, delimiter=',', + skiprows=1, usecols=(1, 2, 3, 4, 8)) + data = data[None, :] if len(data.shape) == 1 else data + ddic[fname.replace('.csv', '')] = { + 'a': data[:, 0].astype(int).tolist(), + 'b': data[:, 1].astype(int).tolist(), + 'm': data[:, 2].astype(int).tolist(), + 'n': data[:, 3].astype(int).tolist(), + 'rho': data[:, 4].tolist(), + } + except Exception as e: + print(fname, ':', e) + rdic = {'cmd_id': cmd_id, 'data': ddic} + self.data_logger.info(json.dumps(rdic)) + return ddic + + def interrupt(self, cmd_id=None): + """Interrupts the acquisition + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + """ + self.status = 'stopping' + if self.thread is not None: + self.thread.join() + self.exec_logger.debug('Interrupted sequence acquisition...') + else: + self.exec_logger.debug('No sequence measurement thread to interrupt.') + self.exec_logger.debug(f'Status: {self.status}') + + def load_sequence(self, filename: str, cmd_id=None): + """Reads quadrupole sequence from file. + + Parameters + ---------- + filename : str + Path of the .csv or .txt file with A, B, M and N electrodes. + Electrode index start at 1. + cmd_id : str, optional + Unique command identifier + + Returns + ------- + sequence : numpy.array + Array of shape (number quadrupoles * 4). + """ + self.exec_logger.debug(f'Loading sequence {filename}') + sequence = np.loadtxt(filename, delimiter=" ", dtype=np.uint32) # load quadrupole file + + if sequence is not None: + self.exec_logger.debug(f'Sequence of {sequence.shape[0]:d} quadrupoles read.') + + # locate lines where the electrode index exceeds the maximum number of electrodes + test_index_elec = np.array(np.where(sequence > self.max_elec)) + + # locate lines where electrode A == electrode B + test_same_elec = self._find_identical_in_line(sequence) + + # if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2) + if test_index_elec.size != 0: + for i in range(len(test_index_elec[0, :])): + self.exec_logger.error(f'An electrode index at line {str(test_index_elec[0, i] + 1)} ' + f'exceeds the maximum number of electrodes') + # sys.exit(1) + sequence = None + elif len(test_same_elec) != 0: + for i in range(len(test_same_elec)): + self.exec_logger.error(f'An electrode index A == B detected at line {str(test_same_elec[i] + 1)}') + # sys.exit(1) + sequence = None + + if sequence is not None: + self.exec_logger.info(f'Sequence {filename} of {sequence.shape[0]:d} quadrupoles loaded.') + else: + self.exec_logger.warning(f'Unable to load sequence {filename}') + self.sequence = sequence + + def measure(self, **kwargs): + warnings.warn('This function is deprecated. Use run_multiple_sequences() instead.', DeprecationWarning) + self.run_multiple_sequences(**kwargs) + + def _process_commands(self, message: str): + """Processes commands received from the controller(s) + + Parameters + ---------- + message : str + message containing a command and arguments or keywords and arguments + """ + status = False + cmd_id = '?' + try: + decoded_message = json.loads(message) + self.exec_logger.debug(f'Decoded message {decoded_message}') + cmd_id = decoded_message.pop('cmd_id', None) + cmd = decoded_message.pop('cmd', None) + # args = decoded_message.pop('args', None) + # if args is not None: + # if len(args) != 0: + # if args[0] != '[': + # args = f'["{args}"]' + # self.exec_logger.debug(f'args to decode: {args}') + # args = json.loads(args) if args != '[]' else None + # self.exec_logger.debug(f'Decoded args {args}') + # else: + # args = None + kwargs = decoded_message.pop('kwargs', None) + # if kwargs is not None: + # if len(kwargs) != 0: + # if kwargs[0] != '{': + # kwargs = '{"' + kwargs + '"}' + # self.exec_logger.debug(f'kwargs to decode: {kwargs}') + # kwargs = json.loads(kwargs) if kwargs != '' else None + # self.exec_logger.debug(f'Decoded kwargs {kwargs}') + # else: + # kwargs = None + self.exec_logger.debug(f"Calling method {cmd}({str(kwargs) if kwargs is not None else ''})") + # self.exec_logger.debug(f"Calling method {cmd}({str(args) + ', ' if args is not None else ''}" + # f"{str(kwargs) if kwargs is not None else ''})") + if cmd_id is None: + self.exec_logger.warning('You should use a unique identifier for cmd_id') + if cmd is not None: + try: + # if args is None: + # if kwargs is None: + # output = getattr(self, cmd)() + # else: + # output = getattr(self, cmd)(**kwargs) + # else: + if kwargs is None: + output = getattr(self, cmd)() + else: + output = getattr(self, cmd)(**kwargs) + status = True + except Exception as e: + self.exec_logger.error( + f"Unable to execute {cmd}({str(kwargs) if kwargs is not None else ''}): {e}") + status = False + except Exception as e: + self.exec_logger.warning(f'Unable to decode command {message}: {e}') + status = False + finally: + reply = {'cmd_id': cmd_id, 'status': status} + reply = json.dumps(reply) + self.exec_logger.debug(f'Execution report: {reply}') + + def quit(self, cmd_id=None): + """Quits OhmPi + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + """ + + self.exec_logger.debug(f'Quitting ohmpi.py following command {cmd_id}') + exit() + + def _read_hardware_config(self): + """Reads hardware configuration from config.py + """ + self.exec_logger.debug('Getting hardware config') + self.id = OHMPI_CONFIG['id'] # ID of the OhmPi + self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm + self.Imax = OHMPI_CONFIG['Imax'] # maximum current + self.exec_logger.debug(f'The maximum current cannot be higher than {self.Imax} mA') + self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V + self.nb_samples = OHMPI_CONFIG['nb_samples'] # number of samples measured for each stack + self.version = OHMPI_CONFIG['version'] # hardware version + self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes + self.board_addresses = OHMPI_CONFIG['board_addresses'] + self.board_version = OHMPI_CONFIG['board_version'] + self.mcp_board_address = OHMPI_CONFIG['mcp_board_address'] + self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') + + def read_quad(self, **kwargs): + warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) + self.load_sequence(**kwargs) + + def _read_voltage(self): + pass + + def remove_data(self, cmd_id=None): + """Remove all data in the data folder + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + """ + self.exec_logger.debug(f'Removing all data following command {cmd_id}') + shutil.rmtree('data') + os.mkdir('data') + + def restart(self, cmd_id=None): + """Restarts the Raspberry Pi + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + """ + + if self.on_pi: + self.exec_logger.info(f'Restarting pi following command {cmd_id}...') + os.system('reboot') + else: + self.exec_logger.warning('Not on Raspberry Pi, skipping reboot...') + + def run_measurement(self, quad=None, nb_stack=None, injection_duration=None, + autogain=True, strategy='constant', tx_volt=5., best_tx_injtime=0.1, + cmd_id=None): + """Measures on a quadrupole and returns transfer resistance. + + Parameters + ---------- + quad : iterable (list of int) + Quadrupole to measure, just for labelling. Only switch_mux_on/off + really create the route to the electrodes. + nb_stack : int, optional + Number of stacks. A stacl is considered two half-cycles (one + positive, one negative). + injection_duration : int, optional + Injection time in seconds. + autogain : bool, optional + If True, will adapt the gain of the ADS1115 to maximize the + resolution of the reading. + strategy : str, optional + (V3.0 only) If we search for best voltage (tx_volt == 0), we can choose + vmax strategy : find the highest voltage that stays in the range + For a constant value, just set the tx_volt. + tx_volt : float, optional + (V3.0 only) If specified, voltage will be imposed. If 0, we will look + for the best voltage. If the best Tx cannot be found, no + measurement will be taken and values will be NaN. + best_tx_injtime : float, optional + (V3.0 only) Injection time in seconds used for finding the best voltage. + cmd_id : str, optional + Unique command identifier + """ + self.exec_logger.debug('Starting measurement') + self.exec_logger.debug('Waiting for data') + + # check arguments + if quad is None: + quad = [0, 0, 0, 0] + + if self.on_pi: + if nb_stack is None: + nb_stack = self.settings['nb_stack'] + if injection_duration is None: + injection_duration = self.settings['injection_duration'] + tx_volt = float(tx_volt) + + # inner variable initialization + sum_i = 0 + sum_vmn = 0 + sum_ps = 0 + + # let's define the pin again as if we run through measure() + # as it's run in another thread, it doesn't consider these + # and this can lead to short circuit! + + self.pin0 = self.mcp_board.get_pin(0) + self.pin0.direction = Direction.OUTPUT + self.pin0.value = False + self.pin1 = self.mcp_board.get_pin(1) + self.pin1.direction = Direction.OUTPUT + self.pin1.value = False + self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement + self.pin7.direction = Direction.OUTPUT + self.pin7.value = False + + if self.sequence is None: + if self.idps: + + # self.switch_dps('on') + self.pin2 = self.mcp_board.get_pin(2) # dsp + + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.mcp_board.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + time.sleep(4) + + self.pin5 = self.mcp_board.get_pin(5) #IHM on mesaurement + self.pin5.direction = Direction.OUTPUT + self.pin5.value = True + self.pin6 = self.mcp_board.get_pin(6) #IHM on mesaurement + self.pin6.direction = Direction.OUTPUT + self.pin6.value = False + self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement + self.pin7.direction = Direction.OUTPUT + self.pin7.value = False + if self.idps: + if self.DPS.read_register(0x05,2) < 11: + self.pin7.value = True# max current allowed (100 mA for relays) #voltage + + # get best voltage to inject AND polarity + if self.idps: + tx_volt, polarity, Rab = self._compute_tx_volt( + best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) + self.exec_logger.debug(f'Best VAB found is {tx_volt:.3f}V') + else: + polarity = 1 + Rab = None + + # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, + address=self.ads_current_address, mode=0) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, + address=self.ads_voltage_address, mode=0) + # turn on the power supply + start_delay = None + end_delay = None + out_of_range = False + if self.idps: + if not np.isnan(tx_volt): + self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(0.3) + else: + self.exec_logger.debug('No best voltage found, will not take measurement') + out_of_range = True + + if not out_of_range: # we found a Vab in the range so we measure + if autogain: + + # compute autogain + gain_voltage = [] + for n in [0,1]: # make short cycle for gain computation + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, + address=self.ads_voltage_address, mode=0) + if n == 0: + self.pin0.value = True + self.pin1.value = False + if self.board_version == 'mb.2023.0.0': + self.pin6.value = True # IHM current injection led on + else: + self.pin0.value = False + self.pin1.value = True # current injection nr2 + if self.board_version == 'mb.2023.0.0': + self.pin6.value = True # IHM current injection led on + + time.sleep(injection_duration) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + + if polarity > 0: + if n == 0: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + else: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + else: + if n == 0: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + else: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + + self.pin0.value = False + self.pin1.value = False + time.sleep(injection_duration) + if n == 0: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + else: + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + if self.board_version == 'mb.2023.0.0': + self.pin6.value = False # IHM current injection led off + + self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage[0]:.3f}, ' + f'{gain_voltage[1]:.3f}') + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, + address=self.ads_current_address, mode=0) + + self.pin0.value = False + self.pin1.value = False + + # one stack = 2 half-cycles (one positive, one negative) + pinMN = 0 if polarity > 0 else 2 # noqa + + # sampling for each stack at the end of the injection + sampling_interval = 10 # ms # TODO: make this a config option + self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 #TODO: check this strategy + + # full data for waveform + fulldata = [] + + # we sample every 10 ms (as using AnalogIn for both current + # and voltage takes about 7 ms). When we go over the injection + # duration, we break the loop and truncate the meas arrays + # only the last values in meas will be taken into account + start_time = time.time() # start counter + for n in range(0, nb_stack * 2): # for each half-cycles + # current injection + if (n % 2) == 0: + self.pin0.value = True + self.pin1.value = False + if autogain: # select gain computed on first half cycle + self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage), data_rate=860, + address=self.ads_voltage_address, mode=0) + else: + self.pin0.value = False + self.pin1.value = True # current injection nr2 + if autogain: # select gain computed on first half cycle + self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage),data_rate=860, + address=self.ads_voltage_address, mode=0) + self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') + if self.board_version == 'mb.2023.0.0': + self.pin6.value = True # IHM current injection led on + # measurement of current i and voltage u during injection + meas = np.zeros((self.nb_samples, 3)) * np.nan + start_delay = time.time() # stating measurement time + dt = 0 + k = 0 + for k in range(0, self.nb_samples): + # reading current value on ADS channels + meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) + if self.board_version == 'mb.2023.0.0': + if pinMN == 0: + meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 + else: + meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 + elif self.board_version == '22.10': + meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000 + # else: + # self.exec_logger.debug('Unknown board') + time.sleep(sampling_interval / 1000) + dt = time.time() - start_delay # real injection time (s) + meas[k, 2] = time.time() - start_time + if dt > (injection_duration - 0 * sampling_interval / 1000.): + break + + # stop current injection + self.pin0.value = False + self.pin1.value = False +# if autogain: # select gain computed on first half cycle +# self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[2],data_rate=860, +# address=self.ads_voltage_address, mode=0) + self.pin6.value = False# IHM current injection led on + end_delay = time.time() + + # truncate the meas array if we didn't fill the last samples #TODO: check why + meas = meas[:k + 1] + + # measurement of current i and voltage u during off time + measpp = np.zeros((meas.shape[0], 3)) * np.nan + start_delay = time.time() # stating measurement time + dt = 0 + for k in range(0, measpp.shape[0]): + # reading current value on ADS channels + measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000.) / (50 * self.r_shunt) + if self.board_version == 'mb.2023.0.0': + if pinMN == 0: + measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + else: + measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1 + elif self.board_version == '22.10': + measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000. + else: + self.exec_logger.debug('unknown board') + time.sleep(sampling_interval / 1000) + dt = time.time() - start_delay # real injection time (s) + measpp[k, 2] = time.time() - start_time + if dt > (injection_duration - 0 * sampling_interval / 1000.): + break + + end_delay = time.time() + + # truncate the meas array if we didn't fill the last samples + measpp = measpp[:k + 1] + + # we alternate on which ADS1115 pin we measure because of sign of voltage + if pinMN == 0: + pinMN = 2 # noqa + else: + pinMN = 0 # noqa + + # store data for full wave form + fulldata.append(meas) + fulldata.append(measpp) + + # TODO get battery voltage and warn if battery is running low + # TODO send a message on SOH stating the battery level + + # let's do some calculation (out of the stacking loop) + + # i_stack = np.empty(2 * nb_stack, dtype=object) + # vmn_stack = np.empty(2 * nb_stack, dtype=object) + i_stack, vmn_stack = [], [] + # select appropriate window length to average the readings + window = int(np.min([f.shape[0] for f in fulldata[::2]]) // 3) + for n, meas in enumerate(fulldata[::2]): + # take average from the samples per stack, then sum them all + # average for the last third of the stacked values + # is done outside the loop + i_stack.append(meas[-int(window):, 0]) + vmn_stack.append(meas[-int(window):, 1]) + + sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0])) + vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1]) + if (n % 2) == 0: + sum_vmn = sum_vmn - vmn1 + sum_ps = sum_ps + vmn1 + else: + sum_vmn = sum_vmn + vmn1 + sum_ps = sum_ps + vmn1 + + else: + sum_i = np.nan + sum_vmn = np.nan + sum_ps = np.nan + fulldata = None + + if self.idps: + self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt + self.DPS.write_register(0x09, 0) # DPS5005 off + + # reshape full data to an array of good size + # we need an array of regular size to save in the csv + if not out_of_range: + fulldata = np.vstack(fulldata) + # we create a big enough array given nb_samples, number of + # half-cycles (1 stack = 2 half-cycles), and twice as we + # measure decay as well + a = np.zeros((nb_stack * self.nb_samples * 2 * 2, 3)) * np.nan + a[:fulldata.shape[0], :] = fulldata + fulldata = a + else: + np.array([[]]) + + vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) + vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) + i_stack_mean = np.mean(i_stack) + i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])])) + r_stack_mean = vmn_stack_mean / i_stack_mean + r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean + ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) + + # create a dictionary and compute averaged values from all stacks + # if self.board_version == 'mb.2023.0.0': + d = { + "time": datetime.now().isoformat(), + "A": quad[0], + "B": quad[1], + "M": quad[2], + "N": quad[3], + "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., + "Vmn [mV]": sum_vmn / (2 * nb_stack), + "I [mA]": sum_i / (2 * nb_stack), + "R [ohm]": sum_vmn / sum_i, + "Ps [mV]": sum_ps / (2 * nb_stack), + "nbStack": nb_stack, + "Tx [V]": tx_volt if not out_of_range else 0., + "CPU temp [degC]": CPUTemperature().temperature, + "Nb samples [-]": self.nb_samples, + "fulldata": fulldata, + "I_stack [mA]": i_stack_mean, + "I_std [mA]": i_std, + "I_per_stack [mA]": np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), + "Vmn_stack [mV]": vmn_stack_mean, + "Vmn_std [mV]": vmn_std, + "Vmn_per_stack [mV]": np.array([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1))[0] / 2 for i in range(nb_stack)]), + "R_stack [ohm]": r_stack_mean, + "R_std [ohm]": r_stack_std, + "R_per_stack [Ohm]": np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) / np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), + "PS_per_stack [mV]": np.array([np.mean(np.mean(vmn_stack[i*2:i*2+2], axis=1)) for i in range(nb_stack)]), + "PS_stack [mV]": ps_stack_mean, + "R_ab [ohm]": Rab + } + # print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)])) + # elif self.board_version == '22.10': + # d = { + # "time": datetime.now().isoformat(), + # "A": quad[0], + # "B": quad[1], + # "M": quad[2], + # "N": quad[3], + # "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., + # "Vmn [mV]": sum_vmn / (2 * nb_stack), + # "I [mA]": sum_i / (2 * nb_stack), + # "R [ohm]": sum_vmn / sum_i, + # "Ps [mV]": sum_ps / (2 * nb_stack), + # "nbStack": nb_stack, + # "Tx [V]": tx_volt if not out_of_range else 0., + # "CPU temp [degC]": CPUTemperature().temperature, + # "Nb samples [-]": self.nb_samples, + # "fulldata": fulldata, + # } + + else: # for testing, generate random data + d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3], + 'R [ohm]': np.abs(np.random.randn(1)).tolist()} + + # to the data logger + dd = d.copy() + dd.pop('fulldata') # too much for logger + dd.update({'A': str(dd['A'])}) + dd.update({'B': str(dd['B'])}) + dd.update({'M': str(dd['M'])}) + dd.update({'N': str(dd['N'])}) + + # round float to 2 decimal + for key in dd.keys(): + if isinstance(dd[key], float): + dd[key] = np.round(dd[key], 3) + + dd['cmd_id'] = str(cmd_id) + self.data_logger.info(dd) + self.pin5.value = False #IHM led on measurement off + if self.sequence is None : + self.switch_dps('off') + + return d + + def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs): + """Runs multiple sequences in a separate thread for monitoring mode. + Can be stopped by 'OhmPi.interrupt()'. + Additional arguments are passed to run_measurement(). + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + sequence_delay : int, optional + Number of seconds at which the sequence must be started from each others. + nb_meas : int, optional + Number of time the sequence must be repeated. + kwargs : dict, optional + See help(k.run_measurement) for more info. + """ + # self.run = True + if sequence_delay is None: + sequence_delay = self.settings['sequence_delay'] + sequence_delay = int(sequence_delay) + if nb_meas is None: + nb_meas = self.settings['nb_meas'] + self.status = 'running' + self.exec_logger.debug(f'Status: {self.status}') + self.exec_logger.debug(f'Measuring sequence: {self.sequence}') + + def func(): + for g in range(0, nb_meas): # for time-lapse monitoring + if self.status == 'stopping': + self.exec_logger.warning('Data acquisition interrupted') + break + t0 = time.time() + self.run_sequence(**kwargs) + + # sleeping time between sequence + dt = sequence_delay - (time.time() - t0) + if dt < 0: + dt = 0 + if nb_meas > 1: + time.sleep(dt) # waiting for next measurement (time-lapse) + self.status = 'idle' + + self.thread = threading.Thread(target=func) + self.thread.start() + + def run_sequence(self, cmd_id=None, **kwargs): + """Runs sequence synchronously (=blocking on main thread). + Additional arguments are passed to run_measurement(). + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + """ + self.status = 'running' + self.exec_logger.debug(f'Status: {self.status}') + self.exec_logger.debug(f'Measuring sequence: {self.sequence}') + t0 = time.time() + self.reset_mux() + + # create filename with timestamp + filename = self.settings["export_path"].replace('.csv', + f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') + self.exec_logger.debug(f'Saving to {filename}') + + # make sure all multiplexer are off + + + # measure all quadrupole of the sequence + if self.sequence is None: + n = 1 + else: + n = self.sequence.shape[0] + for i in range(0, n): + if self.sequence is None: + quad = np.array([0, 0, 0, 0]) + else: + quad = self.sequence[i, :] # quadrupole + if self.status == 'stopping': + break + if i == 0: + # call the switch_mux function to switch to the right electrodes + # switch on DPS + self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address) + self.pin2 = self.mcp_board.get_pin(2) # dsp - + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.mcp_board.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + time.sleep (4) + + #self.switch_dps('on') + time.sleep(.6) + self.switch_mux_on(quad) + # run a measurement + if self.on_pi: + acquired_data = self.run_measurement(quad, **kwargs) + else: # for testing, generate random data + sum_vmn = np.random.rand(1)[0] * 1000. + sum_i = np.random.rand(1)[0] * 100. + cmd_id = np.random.randint(1000) + acquired_data = { + "time": datetime.now().isoformat(), + "A": quad[0], + "B": quad[1], + "M": quad[2], + "N": quad[3], + "inj time [ms]": self.settings['injection_duration'] * 1000., + "Vmn [mV]": sum_vmn, + "I [mA]": sum_i, + "R [ohm]": sum_vmn / sum_i, + "Ps [mV]": np.random.randn(1)[0] * 100., + "nbStack": self.settings['nb_stack'], + "Tx [V]": np.random.randn(1)[0] * 5., + "CPU temp [degC]": np.random.randn(1)[0] * 50., + "Nb samples [-]": self.nb_samples, + } + self.data_logger.info(acquired_data) + + # switch mux off + self.switch_mux_off(quad) + + # add command_id in dataset + acquired_data.update({'cmd_id': cmd_id}) + # log data to the data logger + # self.data_logger.info(f'{acquired_data}') + # save data and print in a text file + self.append_and_save(filename, acquired_data) + self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') + + self.switch_dps('off') + self.status = 'idle' + + def run_sequence_async(self, cmd_id=None, **kwargs): + """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. + Additional arguments are passed to run_measurement(). + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + """ + + def func(): + self.run_sequence(**kwargs) + + self.thread = threading.Thread(target=func) + self.thread.start() + self.status = 'idle' + + def rs_check(self, tx_volt=12., cmd_id=None): + """Checks contact resistances + + Parameters + ---------- + tx_volt : float + Voltage of the injection + cmd_id : str, optional + Unique command identifier + """ + # create custom sequence where MN == AB + # we only check the electrodes which are in the sequence (not all might be connected) + if self.sequence is None or not self.use_mux: + quads = np.array([[1, 2, 1, 2]], dtype=np.uint32) + else: + elec = np.sort(np.unique(self.sequence.flatten())) # assumed order + quads = np.vstack([ + elec[:-1], + elec[1:], + elec[:-1], + elec[1:], + ]).T + if self.idps: + quads[:, 2:] = 0 # we don't open Vmn to prevent burning the MN part + # as it has a smaller range of accepted voltage + + # create filename to store RS + export_path_rs = self.settings['export_path'].replace('.csv', '') \ + + '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '_rs.csv' + + # perform RS check + # self.run = True + self.status = 'running' + + if self.on_pi: + # make sure all mux are off to start with + self.reset_mux() + + # measure all quad of the RS sequence + for i in range(0, quads.shape[0]): + quad = quads[i, :] # quadrupole + self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c) + d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=0.2, tx_volt=tx_volt, autogain=False) + + if self.idps: + voltage = tx_volt * 1000. # imposed voltage on dps5005 + else: + voltage = d['Vmn [mV]'] + current = d['I [mA]'] + + # compute resistance measured (= contact resistance) + resist = abs(voltage / current) / 1000. + # print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format( + # current, voltage, resist)) + msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \ + f'V: {voltage :>10.3f} mV, ' \ + f'R: {resist :>10.3f} kOhm' + + self.exec_logger.debug(msg) + + # if contact resistance = 0 -> we have a short circuit!! + if resist < 1e-5: + msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm' + self.exec_logger.warning(msg) + + # save data in a text file + self.append_and_save(export_path_rs, { + 'A': quad[0], + 'B': quad[1], + 'RS [kOhm]': resist, + }) + + # close mux path and put pin back to GND + self.switch_mux_off(quad) + else: + pass + self.status = 'idle' + + # + # # TODO if interrupted, we would need to restore the values + # # TODO or we offer the possibility in 'run_measurement' to have rs_check each time? + + def set_sequence(self, sequence=None, cmd_id=None): + """Sets the sequence to acquire + + Parameters + ---------- + sequence : list, str + sequence of quadrupoles + cmd_id: str, optional + Unique command identifier + """ + try: + self.sequence = np.array(sequence).astype(int) + # self.sequence = np.loadtxt(StringIO(sequence)).astype('uint32') + status = True + except Exception as e: + self.exec_logger.warning(f'Unable to set sequence: {e}') + status = False + + def stop(self, **kwargs): + warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning) + self.interrupt(**kwargs) + + def _switch_mux(self, electrode_nr, state, role): + """Selects the right channel for the multiplexer cascade for a given electrode. + + Parameters + ---------- + electrode_nr : int + Electrode index to be switched on or off. + state : str + Either 'on' or 'off'. + role : str + Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board. + """ + + if not self.use_mux or not self.on_pi: + if not self.on_pi: + self.exec_logger.warning('Cannot reset mux while in simulation mode...') + else: + self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' + ' Set use_mux to True to use the multiplexer...') + elif self.sequence is None and not self.use_mux: + self.exec_logger.warning('Unable to switch MUX without a sequence') + else: + # choose with MUX board + tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) + + # find I2C address of the electrode and corresponding relay + # considering that one MCP23017 can cover 16 electrodes + i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division + relay_nr = (electrode_nr-1) - ((electrode_nr-1) // 16) * 16 + + if i2c_address is not None: + # select the MCP23017 of the selected MUX board + mcp2 = MCP23017(tca[i2c_address]) + mcp2.get_pin(relay_nr).direction = digitalio.Direction.OUTPUT + + if state == 'on': + mcp2.get_pin(relay_nr).value = True + else: + mcp2.get_pin(relay_nr).value = False + + self.exec_logger.debug(f'Switching relay {relay_nr} ' + f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') + else: + self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') + + def switch_dps(self,state='off'): + """Switches DPS on or off. + + Parameters + ---------- + state : str + 'on', 'off' + """ + self.pin2 = self.mcp_board.get_pin(2) # dsp - + self.pin2.direction = Direction.OUTPUT + self.pin3 = self.mcp_board.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + if state == 'on': + self.pin2.value = True + self.pin3.value = True + self.exec_logger.debug(f'Switching DPS on') + time.sleep(4) + elif state == 'off': + self.pin2.value = False + self.pin3.value = False + self.exec_logger.debug(f'Switching DPS off') + + + def switch_mux_on(self, quadrupole, cmd_id=None): + """Switches on multiplexer relays for given quadrupole. + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + quadrupole : list of 4 int + List of 4 integers representing the electrode numbers. + """ + roles = ['A', 'B', 'M', 'N'] + # another check to be sure A != B + if quadrupole[0] != quadrupole[1]: + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'on', roles[i]) + else: + self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!') + + def switch_mux_off(self, quadrupole, cmd_id=None): + """Switches off multiplexer relays for given quadrupole. + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + quadrupole : list of 4 int + List of 4 integers representing the electrode numbers. + """ + roles = ['A', 'B', 'M', 'N'] + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'off', roles[i]) + + def test_mux(self, activation_time=1.0, address=0x70): + """Interactive method to test the multiplexer. + + Parameters + ---------- + activation_time : float, optional + Time in seconds during which the relays are activated. + address : hex, optional + Address of the multiplexer board to test (e.g. 0x70, 0x71, ...). + """ + self.use_mux = True + self.reset_mux() + + # choose with MUX board + tca = adafruit_tca9548a.TCA9548A(self.i2c, address) + + # ask use some details on how to proceed + a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!') + if a == '1': + print('run channel by channel test') + electrode = int(input('Choose your electrode number (integer):')) + electrodes = [electrode] + elif a == '2': + electrodes = range(1, 65) + else: + print('Wrong choice !') + return + + # run the test + for electrode_nr in electrodes: + # find I2C address of the electrode and corresponding relay + # considering that one MCP23017 can cover 16 electrodes + i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division + relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 + + if i2c_address is not None: + # select the MCP23017 of the selected MUX board + mcp2 = MCP23017(tca[i2c_address]) + mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT + + # activate relay for given time + mcp2.get_pin(relay_nr - 1).value = True + print('electrode:', electrode_nr, ' activated...', end='', flush=True) + time.sleep(activation_time) + mcp2.get_pin(relay_nr - 1).value = False + print(' deactivated') + time.sleep(activation_time) + print('Test finished.') + + def reset_mux(self, cmd_id=None): + """Switches off all multiplexer relays. + + Parameters + ---------- + cmd_id : str, optional + Unique command identifier + """ + if self.on_pi and self.use_mux: + roles = ['A', 'B', 'M', 'N'] + for i in range(0, 4): + for j in range(1, self.max_elec + 1): + self._switch_mux(j, 'off', roles[i]) + self.exec_logger.debug('All MUX switched off.') + elif not self.on_pi: + self.exec_logger.warning('Cannot reset mux while in simulation mode...') + else: + self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' + ' Set use_mux to True to use the multiplexer...') + + def _update_acquisition_settings(self, config): + warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) + self.update_settings(settings=config) + + def update_settings(self, settings: str, cmd_id=None): + """Updates acquisition settings from a json file or dictionary. + Parameters can be: + - nb_electrodes (number of electrode used, if 4, no MUX needed) + - injection_duration (in seconds) + - nb_meas (total number of times the sequence will be run) + - sequence_delay (delay in second between each sequence run) + - nb_stack (number of stack for each quadrupole measurement) + - export_path (path where to export the data, timestamp will be added to filename) + + Parameters + ---------- + settings : str, dict + Path to the .json settings file or dictionary of settings. + cmd_id : str, optional + Unique command identifier + """ + status = False + if settings is not None: + try: + if isinstance(settings, dict): + self.settings.update(settings) + else: + with open(settings) as json_file: + dic = json.load(json_file) + self.settings.update(dic) + self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings)) + status = True + except Exception as e: # noqa + self.exec_logger.warning('Unable to update settings.') + status = False + else: + self.exec_logger.warning('Settings are missing...') + return status + + # Properties + @property + def sequence(self): + """Gets sequence""" + if self._sequence is not None: + assert isinstance(self._sequence, np.ndarray) + return self._sequence + + @sequence.setter + def sequence(self, sequence): + """Sets sequence""" + if sequence is not None: + assert isinstance(sequence, np.ndarray) + self.use_mux = True + else: + self.use_mux = False + self._sequence = sequence + + +VERSION = '2.1.5' + +print(colored(r' ________________________________' + '\n' + + r'| _ | | | || \/ || ___ \_ _|' + '\n' + + r'| | | | |_| || . . || |_/ / | |' + '\n' + + r'| | | | _ || |\/| || __/ | |' + '\n' + + r'\ \_/ / | | || | | || | _| |_' + '\n' + + r' \___/\_| |_/\_| |_/\_| \___/ ', 'red')) +print('Version:', VERSION) +platform, on_pi = get_platform() + +if on_pi: + print(colored(f'\u2611 Running on {platform} platform', 'green')) + # TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...) + # and emit a warning otherwise + if not arm64_imports: + print(colored(f'Warning: Required packages are missing.\n' + f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow')) +else: + print(colored(f'\u26A0 Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow')) + +current_time = datetime.now() +print(f'local date and time : {current_time.strftime("%Y-%m-%d %H:%M:%S")}') + +# for testing +if __name__ == "__main__": + ohmpi = OhmPi(settings=OHMPI_CONFIG['settings']) + if ohmpi.controller is not None: + ohmpi.controller.loop_forever() diff --git a/test_measure_with_ohmpi_card_3_15.py b/test_measure_with_ohmpi_card_3_15.py index 50e9d5acd4a96eae04799ddea287a9e0b20a3faf..141ec9b461e1f046ab0e7b02a6e77c2c9fc8d38f 100644 --- a/test_measure_with_ohmpi_card_3_15.py +++ b/test_measure_with_ohmpi_card_3_15.py @@ -1,12 +1,13 @@ import numpy as np -import matplotlib.pyplot as plt +# import matplotlib.pyplot as plt from utils import change_config change_config('config_ohmpi_card_3_15.py', verbose=False) from OhmPi.measure import OhmPiHardware k = OhmPiHardware() -k._vab_pulse(vab=12, length=1., sampling_rate=k.rx.sampling_rate, polarity=1) -r = k.readings[:,2]/k.readings[:,1] +k.vab_square_wave(vab=12, length=1., sampling_rate=k.rx.sampling_rate, cycles=3) +r = k.readings[:,4]*k.readings[:,2]/k.readings[:,1] print(f'Mean resistance: {np.mean(r):.3f} Ohms, Dev. {100*np.std(r)/np.mean(r):.1f} %') print(f'sampling rate: {k.rx.sampling_rate:.1f} ms, mean sample spacing: {np.mean(np.diff(k.readings[:,0]))*1000.:.1f} ms') change_config('config_default.py', verbose=False) +print(k.readings) \ No newline at end of file