From 3487e2e0ad69c345861effbb20479273ec1ccc8e Mon Sep 17 00:00:00 2001 From: su530201 <olivier.kaufmann@umons.ac.be> Date: Sat, 8 Apr 2023 08:25:39 +0200 Subject: [PATCH] Adds more elements in hardware and mb_2024_rev_0_0 TX and RX; introduces a measurement module for the mid-level functionalities --- config.py | 10 +- hardware/hardware.py | 79 ++++++++++-- hardware/mb_2024_rev_0_0.py | 221 ++++++++++++++++++++++++++++---- measure.py | 244 ++++++++++++++++++++++++++++++++++++ ohmpi.py | 8 +- 5 files changed, 519 insertions(+), 43 deletions(-) create mode 100644 measure.py diff --git a/config.py b/config.py index 94ee7598..7c1baee1 100644 --- a/config.py +++ b/config.py @@ -27,21 +27,21 @@ OHMPI_CONFIG = { } # TODO: add a dictionary with INA models and associated gain values HARDWARE_CONFIG = { - {'Controller': {'model' : 'raspberry_pi_3' + {'controller': {'model' : 'raspberry_pi_3' } }, - {'TX' : {'model' : 'mb_2024_rev_0_0', + {'tx' : {'model' : 'mb_2024_rev_0_0', 'mcp_board_address': 0x20, 'Imax': 4800 / 50 / 2, # Maximum current 'R_shunt': 2 # Shunt resistance in Ohms } }, - {'RX' : {'model': 'mb_2024_rev_0_0', + {'rx' : {'model': 'mb_2024_rev_0_0', 'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V 'nb_samples': 20, # Max value 10 # was named integer before... } }, - {'MUX': {'model' : 'mux_2021', + {'mux': {'model' : 'mux_2021', 'max_elec': 64, 'board_addresses': {'A': 0x73, 'B': 0x72, 'M': 0x71, 'N': 0x70}, # CHECK IF YOUR BOARDS HAVE THESE ADDRESSES 'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V @@ -78,7 +78,7 @@ DATA_LOGGING_CONFIG = { SOH_LOGGING_CONFIG = { 'logging_level': logging.INFO, 'logging_to_console': True, - 'file_name': 'soh.log', + 'file_name': f'soh{logging_suffix}.log', 'max_bytes': 16777216, 'backup_count': 1024, 'when': 'd', diff --git a/hardware/hardware.py b/hardware/hardware.py index 64f305d0..5460405c 100644 --- a/hardware/hardware.py +++ b/hardware/hardware.py @@ -1,5 +1,9 @@ from abc import ABC import os +import sys +from time import gmtime +import logging + class ControllerAbstract(ABC): def __init__(self, **kwargs): self.bus = None @@ -11,14 +15,51 @@ class TxAbstract(ABC): def __init__(self, **kwargs): polarity = kwargs.pop('polarity', 1) inj_time = kwargs.pop('inj_time', 1.) - exec_logger = kwargs.pop('exec_logger', None) - soh_logger = kwargs.pop('soh_logger', None) + self.exec_logger = kwargs.pop('exec_logger', None) + self.soh_logger = kwargs.pop('soh_logger', None) + if self.exec_logger is None: + self.exec_logger = logging.getLogger('exec_logger') + log_format = '%(asctime)-15s | exec | %(levelname)s: %(message)s' + exec_formatter = logging.Formatter(log_format) + exec_formatter.converter = gmtime + exec_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC' + exec_handler = logging.StreamHandler(sys.stdout) + exec_handler.setFormatter(exec_formatter) + self.exec_logger.addHandler(exec_handler) + self.exec_logger.setLevel('debug') + if self.soh_logger is None: + self.soh_logger = logging.getLogger('soh_logger') + log_format = '%(asctime)-15s | soh | %(levelname)s: %(message)s' + soh_formatter = logging.Formatter(log_format) + soh_formatter.converter = gmtime + soh_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC' + soh_handler = logging.StreamHandler(sys.stdout) + soh_handler.setFormatter(soh_formatter) + self.soh_logger.addHandler(soh_handler) + self.soh_logger.setLevel('debug') self._polarity = None self._inj_time = None + self._dps_state = 'off' self.polarity = polarity self.inj_time = inj_time - board_name = os.path.basename(__file__) - self.exec_logger.debug(f'TX {board_name} Initialized.') + self.board_name = os.path.basename(__file__) + self.exec_logger.debug(f'TX {self.board_name} initialization') + + @property + def current(self): + # add actions to read the DPS current and return it + return None + + @current.setter + def current(self, value, **kwargs): + # add actions to set the DPS current + pass + + def current_pulse(self, **kwargs): + pass + + def inject(self, state='on'): + assert state in ['on', 'off'] @property def inj_time(self): @@ -35,17 +76,17 @@ class TxAbstract(ABC): @polarity.setter def polarity(self, value): - assert value in [-1,1] + assert value in [-1,0,1] self._polarity = value # add actions to set the polarity (switch relays) def turn_off(self): - # add actions to turn the DPS off - pass + self.exec_logger.debug(f'Switching DPS off') + self._dps_state = 'off' def turn_on(self): - # add actions to turn the DPS on - pass + self.exec_logger.debug(f'Switching DPS on') + self._dps_state = 'on' @property def voltage(self): @@ -57,9 +98,25 @@ class TxAbstract(ABC): # add actions to set the DPS voltage pass - def current(self, **kwargs): + def voltage_pulse(self, voltage, length, polarity): + """ Generates a square voltage pulse + + Parameters + ---------- + voltage: float, optional + Voltage to apply in volts, tx_v_def is applied if omitted. + length: float, optional + Length of the pulse in seconds + polarity: 1,0,-1 + Polarity of the pulse + """ pass class RxAbstract(ABC): - pass + def __init__(self, **kwargs): + self.exec_logger = kwargs.pop('exec_logger', None) + self.soh_logger = kwargs.pop('soh_logger', None) + self.board_name = os.path.basename(__file__) + self.exec_logger.debug(f'RX {self.board_name} initialization') + diff --git a/hardware/mb_2024_rev_0_0.py b/hardware/mb_2024_rev_0_0.py index 478e7211..c547165d 100644 --- a/hardware/mb_2024_rev_0_0.py +++ b/hardware/mb_2024_rev_0_0.py @@ -6,62 +6,239 @@ from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa from digitalio import Direction # noqa import minimalmodbus # noqa import time +import numpy as np from hardware import TxAbstract, RxAbstract controller_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["controller"]["model"]}') + TX_CONFIG = OHMPI_CONFIG['hardware']['TX'] +RX_CONFIG = OHMPI_CONFIG['hardware']['RX'] + +# hardware limits +voltage_min = 10. # mV +voltage_max = 4500. +RX_CONFIG['voltage_min'] = voltage_min # mV +RX_CONFIG['voltage_max'] = voltage_max +TX_CONFIG['current_min'] = voltage_min / (TX_CONFIG['R_shunt'] * 50) # mA +TX_CONFIG['current_max'] = voltage_max / (TX_CONFIG['R_shunt'] * 50) +TX_CONFIG['default_voltage'] = 5. # V +TX_CONFIG['voltage_max'] = 50. # V +TX_CONFIG['dps_switch_on_warm_up'] = 4. # 4 seconds + +def _gain_auto(channel): + """Automatically sets the gain on a channel + + Parameters + ---------- + channel : ads.ADS1x15 + Instance of ADS where voltage is measured. + + Returns + ------- + gain : float + Gain to be applied on ADS1115. + """ + + gain = 2 / 3 + if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.0): + gain = 2 + elif (abs(channel.voltage) < 1.0) and (abs(channel.voltage) >= 0.500): + gain = 4 + elif (abs(channel.voltage) < 0.500) and (abs(channel.voltage) >= 0.250): + gain = 8 + elif abs(channel.voltage) < 0.250: + gain = 16 + return gain -class TX(TxAbstract): +class Tx(TxAbstract): def __init__(self, **kwargs): - self.controller = kwargs.pop('controller', controller_module.Controller()) super().__init__(**kwargs) + self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage']) + self.controller = kwargs.pop('controller', controller_module.Controller()) # I2C connexion to MCP23008, for current injection self.mcp_board = MCP23008(self.controller.bus, address=TX_CONFIG['mcp_board_address']) - self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run - self.pin4.direction = Direction.OUTPUT - self.pin4.value = True # ADS1115 for current measurement (AB) - self.ads_current = ads.ADS1115(self.controller.bus, gain=2/3, data_rate=860, address=0x48) + self._adc_gain = 2/3 + self.ads_current_address = 0x48 + self.ads_current = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + address=self.ads_current_address) + + # Relays for pulse polarity + self.pin0 = self.mcp_board.get_pin(0) + self.pin0.direction = Direction.OUTPUT + self.pin1 = self.mcp_board.get_pin(1) + self.pin1.direction = Direction.OUTPUT + self.polarity = 0 # DPH 5005 Digital Power Supply self.pin2 = self.mcp_board.get_pin(2) # dps + self.pin2.direction = Direction.OUTPUT - self.pin2.value = True self.pin3 = self.mcp_board.get_pin(3) # dps - self.pin3.direction = Direction.OUTPUT - self.pin3.value = True - time.sleep(4) + self.turn_on() + time.sleep(TX_CONFIG['dps_switch_on_warm_up']) self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc self.DPS.serial.bytesize = 8 # - self.DPS.serial.timeout = 1 # greater than 0.5 for it to work + self.DPS.serial.timeout = 1. # greater than 0.5 for it to work self.DPS.debug = False # self.DPS.serial.parity = 'N' # No parity self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode self.DPS.write_register(0x0001, 1000, 0) # max current allowed (100 mA for relays) # (last number) 0 is for mA, 3 is for A - # self.soh_logger.debug(f'Battery voltage: {self.DPS.read_register(0x05,2 ):.3f}') TODO: SOH logger - print(self.DPS.read_register(0x05, 2)) - self.switch_dps('off') + # I2C connexion to MCP23008, for current injection + self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run + self.pin4.direction = Direction.OUTPUT + self.pin4.value = True - def turn_on(self): - self.pin2.value = True - self.pin3.value = True - self.exec_logger.debug(f'Switching DPS on') - time.sleep(4) + tx_bat = self.DPS.read_register(0x05, 2) + if self.exec_logger is not None: + self.exec_logger.info(f'TX battery: {tx_bat:.1f} V') + if tx_bat < 12.: + if self.soh_logger is not None: + self.soh_logger.debug(f'Low TX Battery: {tx_bat:.1f} V') # TODO: SOH logger + self.turn_off() + + @property + def adc_gain(self): + return self._adc_gain + + @adc_gain.setter + def adc_gain(self, value): + assert value in [2/3, 2, 4, 8, 16] + self._adc_gain = value + self.ads_current = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + address=self.ads_current_address) + self.exec_logger.debug(f'Setting TX ADC gain to {value}') + + def adc_gain_auto(self): + gain = _gain_auto(AnalogIn(self.ads_current, ads.P0)) + self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}') + self.adc_gain = gain + + def current_pulse(self, **kwargs): + super().current_pulse(**kwargs) + self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board') + + @property + def current(self): + """ Gets the current IAB in Amps + """ + return AnalogIn(self.ads_current, ads.P0).voltage * 1000. / (50 * TX_CONFIG['R_shunt']) # noqa measure current + + @ current.setter + def current(self, value): + assert TX_CONFIG['current_min'] <= value <= TX_CONFIG['current_max'] + self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board') + + def inject(self, state='on'): + super().inject(state=state) + if state=='on': + self.DPS.write_register(0x09, 1) # DPS5005 on + else: + self.DPS.write_register(0x09, 0) # DPS5005 off + + @property + def polarity(self): + return super().polarity + + @polarity.setter + def polarity(self, value): + super().polarity(value) + if value==1: + self.pin0.value = True + self.pin1.value = False + elif value==-1: + self.pin0.value = False + self.pin1.value = True + else: + self.pin0.value = False + self.pin1.value = False + #time.sleep(0.001) # TODO: check max switching time of relays + + @property + def voltage(self): + return self._voltage + @voltage.setter + def voltage(self, value): + if value > TX_CONFIG['voltage_max']: + self.exec_logger.warning(f'Sorry, cannot inject more than {TX_CONFIG["voltage_max"]} V, ' + f'set it back to {TX_CONFIG["default_voltage"]} V (default value).') + value = TX_CONFIG['default_voltage'] + self.DPS.write_register(0x0000, value, 2) def turn_off(self): + super().turn_off() self.pin2.value = False self.pin3.value = False - self.exec_logger.debug(f'Switching DPS off') + def turn_on(self): + super().turn_on() + self.pin2.value = True + self.pin3.value = True + + def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=None): + """ Generates a square voltage pulse + + Parameters + ---------- + voltage: float, optional + Voltage to apply in volts, tx_v_def is applied if omitted. + length: float, optional + Length of the pulse in seconds + polarity: 1,0,-1 + Polarity of the pulse + """ + + if length is None: + length = self.inj_time + if polarity is None: + polarity = self.polarity + self.polarity = polarity + self.voltage(voltage) + self.exec_logger.debug(f'Voltage pulse of {polarity*voltage:.3f} V for {length:.3f} s') + self.inject(state='on') + time.sleep(length) + self.inject(state='off') -class RX(RxAbstract): +class Rx(RxAbstract): def __init__(self, **kwargs): self.controller = kwargs.pop('controller', controller_module.Controller()) + self._adc_gain = [2/3, 2/3] super().__init__(**kwargs) - + self.ads_voltage_address = 0x49 # ADS1115 for voltage measurement (MN) - self.ads_voltage = ads.ADS1115(self.controller.bus, gain=2/3, data_rate=860, address=0x49) \ No newline at end of file + self.ads_voltage = ads.ADS1115(self.controller.bus, gain=2/3, data_rate=860, address=self.ads_voltage_address) + + + @property + def adc_gain(self): + return self._adc_gain + + + @adc_gain.setter + def adc_gain(self, value): + assert value in [2 / 3, 2, 4, 8, 16] + self._adc_gain = value + self.ads_voltage = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + address=self.ads_voltage_address) + self.exec_logger.debug(f'Setting RX ADC gain to {value}') + + + def adc_gain_auto(self): + gain_0 = _gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_2 = _gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain = np.min([gain_0, gain_2])[0] + self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}') + self.adc_gain = gain + + @property + def voltage(self): + """ Gets the voltage VMN in Volts + """ + u0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + u2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + self.exec_logger.debug(f'Reading voltages {u0} V and {u2} V on RX. Returning {np.max([u0, u2])} V') + return np.max([u0,u2]) \ No newline at end of file diff --git a/measure.py b/measure.py new file mode 100644 index 00000000..a65fc18b --- /dev/null +++ b/measure.py @@ -0,0 +1,244 @@ +import importlib +from time import gmtime +import sys +import logging +from config import OHMPI_CONFIG +controller_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["controller"]["model"]}') +tx_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["tx"]["model"]}') +rx_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["rx"]["model"]}') +mux_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["mux"]["model"]}') +TX_CONFIG = tx_module.TX_CONFIG +RX_CONFIG = rx_module.RX_CONFIG + +class OhmPiHardware(): + def __init__(self, **kwargs): + self.tx = kwargs.pop('controller', tx_module.Controller()) + self.rx = kwargs.pop('tx', tx_module.Rx()) + self.tx = kwargs.pop('rx', tx_module.Tx()) + self.rx = kwargs.pop('mux', tx_module.Mux()) + self.exec_logger = kwargs.pop('exec_logger', None) + self.data_logger = kwargs.pop('exec_logger', None) + self.soh_logger = kwargs.pop('soh_logger', None) + if self.exec_logger is None: + self.exec_logger = logging.getLogger('exec_logger') + log_format = '%(asctime)-15s | exec | %(levelname)s: %(message)s' + exec_formatter = logging.Formatter(log_format) + exec_formatter.converter = gmtime + exec_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC' + exec_handler = logging.StreamHandler(sys.stdout) + exec_handler.setFormatter(exec_formatter) + self.exec_logger.addHandler(exec_handler) + self.exec_logger.setLevel('debug') + if self.data_logger is None: + self.data_logger = logging.getLogger('data_logger') + log_format = '%(asctime)-15s | data | %(levelname)s: %(message)s' + data_formatter = logging.Formatter(log_format) + data_formatter.converter = gmtime + data_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC' + data_handler = logging.StreamHandler(sys.stdout) + data_handler.setFormatter(data_formatter) + self.data_logger.addHandler(data_handler) + self.data_logger.setLevel('debug') + if self.soh_logger is None: + self.soh_logger = logging.getLogger('soh_logger') + log_format = '%(asctime)-15s | soh | %(levelname)s: %(message)s' + soh_formatter = logging.Formatter(log_format) + soh_formatter.converter = gmtime + soh_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC' + soh_handler = logging.StreamHandler(sys.stdout) + soh_handler.setFormatter(soh_formatter) + self.soh_logger.addHandler(soh_handler) + self.soh_logger.setLevel('debug') + + def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5): + """Estimates best Tx voltage based on different strategies. + At first a half-cycle is made for a short duration with a fixed + known voltage. This gives us Iab and Rab. We also measure Vmn. + A constant c = vmn/iab is computed (only depends on geometric + factor and ground resistivity, that doesn't change during a + quadrupole). Then depending on the strategy, we compute which + vab to inject to reach the minimum/maximum Iab current or + min/max Vmn. + This function also compute the polarity on Vmn (on which pin + of the ADS1115 we need to measure Vmn to get the positive value). + + Parameters + ---------- + best_tx_injtime : float, optional + Time in milliseconds for the half-cycle used to compute Rab. + strategy : str, optional + Either: + - vmax : compute Vab to reach a maximum Iab and Vmn + - constant : apply given Vab + tx_volt : float, optional + Voltage to apply for guessing the best voltage. 5 V applied + by default. If strategy "constant" is chosen, constant voltage + to applied is "tx_volt". + + Returns + ------- + vab : float + Proposed Vab according to the given strategy. + polarity : int + Either 1 or -1 to know on which pin of the ADS the Vmn is measured. + """ + + + self.tx.polarity = 1 + self.tx.turn_on() + if strategy == 'constant': + vab = tx_volt + self.tx.voltage = vab + self.tx.voltage_pulse(length=best_tx_injtime) + # set gains automatically + self.tx.adc_gain_auto() + self.rx.adc_gain_auto() + I = self.tx.current # measure current + vmn = self.rx.voltage + + elif strategy == 'vmax': + """ + # implement different strategies + I = 0 + vmn = 0 + count = 0 + while I < TX_CONFIG['current_max'] or abs(vmn) < RX_CONFIG['?']: # TODO: hardware related - place in config + + if count > 0: + # print('o', volt) + volt = volt + 2 + # print('>', volt) + count = count + 1 + if volt > 50: + break + + # set voltage for test + if count == 1: + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + self.DPS.write_register(0x0000, volt, 2) + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) # TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + for i in range(10): + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + time.sleep(best_tx_injtime) + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + + n = 0 + while ( + abs(vmn) > voltage_max or I > current_max) and volt > 0: # If starting voltage is too high, need to lower it down + # print('we are out of range! so decreasing volt') + volt = volt - 2 + self.DPS.write_register(0x0000, volt, 2) + # self.DPS.write_register(0x09, 1) # DPS5005 on + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + n += 1 + if n > 25: + break + + factor_I = (current_max) / I + factor_vmn = voltage_max / vmn + factor = factor_I + if factor_I > factor_vmn: + factor = factor_vmn + # print('factor', factor_I, factor_vmn) + vab = factor * volt * 0.9 + if vab > tx_max: + vab = tx_max + print(factor_I, factor_vmn, 'factor!!')""" + pass + + elif strategy == 'vmin': + """# implement different strategy + I = 20 + vmn = 400 + count = 0 + while I > 10 or abs(vmn) > 300: # TODO: hardware related - place in config + if count > 0: + volt = volt - 2 + print(volt, count) + count = count + 1 + if volt > 50: + break + + # set voltage for test + self.DPS.write_register(0x0000, volt, 2) + if count == 1: + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) # TODO: separate gain for P0 and P2 + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + + n = 0 + while ( + abs(vmn) < voltage_min or I < current_min) and volt > 0: # If starting voltage is too high, need to lower it down + # print('we are out of range! so increasing volt') + volt = volt + 2 + print(volt) + self.DPS.write_register(0x0000, volt, 2) + # self.DPS.write_register(0x09, 1) # DPS5005 on + # time.sleep(best_tx_injtime) + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + n += 1 + if n > 25: + break + + vab = volt""" + pass + + self.tx.turn_off() + self.tx.polarity = 0 + rab = (vab * 1000.) / I # noqa + + self.exec_logger.debug(f'RAB = {rab:.2f} Ohms') + + return vab, rab \ No newline at end of file diff --git a/ohmpi.py b/ohmpi.py index 690ffd21..4b47540c 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -152,7 +152,7 @@ class OhmPi(object): # (last number) 0 is for mA, 3 is for A #self.soh_logger.debug(f'Battery voltage: {self.DPS.read_register(0x05,2 ):.3f}') TODO: SOH logger - print(self.DPS.read_register(0x05,2 )) + print(self.DPS.read_register(0x05,2)) self.switch_dps('off') @@ -783,7 +783,7 @@ class OhmPi(object): self.exec_logger.warning('Not on Raspberry Pi, skipping reboot...') def run_measurement(self, quad=None, nb_stack=None, injection_duration=None, - autogain=True, strategy='constant', tx_volt=5, best_tx_injtime=0.1, + autogain=True, strategy='constant', tx_volt=5., best_tx_injtime=0.1, cmd_id=None): """Measures on a quadrupole and returns transfer resistance. @@ -875,7 +875,7 @@ class OhmPi(object): if self.idps: tx_volt, polarity, Rab = self._compute_tx_volt( best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) - self.exec_logger.debug(f'Best vab found is {tx_volt:.3f}V') + self.exec_logger.debug(f'Best VAB found is {tx_volt:.3f}V') else: polarity = 1 Rab = None @@ -1246,8 +1246,6 @@ class OhmPi(object): t0 = time.time() self.reset_mux() - - # create filename with timestamp filename = self.settings["export_path"].replace('.csv', f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') -- GitLab