measure.py 11.4 KB
Newer Older
import importlib
from time import gmtime
import sys
import logging
from config import OHMPI_CONFIG
controller_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["controller"]["model"]}')
tx_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["tx"]["model"]}')
rx_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["rx"]["model"]}')
mux_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["mux"]["model"]}')
TX_CONFIG = tx_module.TX_CONFIG
RX_CONFIG = rx_module.RX_CONFIG

class OhmPiHardware():
    def __init__(self, **kwargs):
        self.tx = kwargs.pop('controller', tx_module.Controller())
        self.rx = kwargs.pop('tx', tx_module.Rx())
        self.tx = kwargs.pop('rx', tx_module.Tx())
        self.rx = kwargs.pop('mux', tx_module.Mux())
        self.exec_logger = kwargs.pop('exec_logger', None)
        self.data_logger = kwargs.pop('exec_logger', None)
        self.soh_logger = kwargs.pop('soh_logger', None)
        if self.exec_logger is None:
            self.exec_logger = logging.getLogger('exec_logger')
            log_format = '%(asctime)-15s | exec | %(levelname)s: %(message)s'
            exec_formatter = logging.Formatter(log_format)
            exec_formatter.converter = gmtime
            exec_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC'
            exec_handler = logging.StreamHandler(sys.stdout)
            exec_handler.setFormatter(exec_formatter)
            self.exec_logger.addHandler(exec_handler)
            self.exec_logger.setLevel('debug')
        if self.data_logger is None:
            self.data_logger = logging.getLogger('data_logger')
            log_format = '%(asctime)-15s | data | %(levelname)s: %(message)s'
            data_formatter = logging.Formatter(log_format)
            data_formatter.converter = gmtime
            data_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC'
            data_handler = logging.StreamHandler(sys.stdout)
            data_handler.setFormatter(data_formatter)
            self.data_logger.addHandler(data_handler)
            self.data_logger.setLevel('debug')
        if self.soh_logger is None:
            self.soh_logger = logging.getLogger('soh_logger')
            log_format = '%(asctime)-15s | soh | %(levelname)s: %(message)s'
            soh_formatter = logging.Formatter(log_format)
            soh_formatter.converter = gmtime
            soh_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC'
            soh_handler = logging.StreamHandler(sys.stdout)
            soh_handler.setFormatter(soh_formatter)
            self.soh_logger.addHandler(soh_handler)
            self.soh_logger.setLevel('debug')

    def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5):
        """Estimates best Tx voltage based on different strategies.
        At first a half-cycle is made for a short duration with a fixed
        known voltage. This gives us Iab and Rab. We also measure Vmn.
        A constant c = vmn/iab is computed (only depends on geometric
        factor and ground resistivity, that doesn't change during a
        quadrupole). Then depending on the strategy, we compute which
        vab to inject to reach the minimum/maximum Iab current or
        min/max Vmn.
        This function also compute the polarity on Vmn (on which pin
        of the ADS1115 we need to measure Vmn to get the positive value).

        Parameters
        ----------
        best_tx_injtime : float, optional
            Time in milliseconds for the half-cycle used to compute Rab.
        strategy : str, optional
            Either:
            - vmax : compute Vab to reach a maximum Iab and Vmn
            - constant : apply given Vab
        tx_volt : float, optional
            Voltage to apply for guessing the best voltage. 5 V applied
            by default. If strategy "constant" is chosen, constant voltage
            to applied is "tx_volt".

        Returns
        -------
        vab : float
            Proposed Vab according to the given strategy.
        polarity : int
            Either 1 or -1 to know on which pin of the ADS the Vmn is measured.
        """


        self.tx.polarity = 1
        self.tx.turn_on()
        if strategy == 'constant':
            vab = tx_volt
            self.tx.voltage = vab
            self.tx.voltage_pulse(length=best_tx_injtime)
            # set gains automatically
            self.tx.adc_gain_auto()
            self.rx.adc_gain_auto()
            I = self.tx.current  # measure current
            vmn = self.rx.voltage

        elif strategy == 'vmax':
            """
            # implement different strategies
            I = 0
            vmn = 0
            count = 0
            while I < TX_CONFIG['current_max'] or abs(vmn) < RX_CONFIG['?']:  # TODO: hardware related - place in config

                if count > 0:
                    # print('o', volt)
                    volt = volt + 2
                # print('>', volt)
                count = count + 1
                if volt > 50:
                    break

                # set voltage for test
                if count == 1:
                    self.DPS.write_register(0x09, 1)  # DPS5005 on
                    time.sleep(best_tx_injtime)  # inject for given tx time
                self.DPS.write_register(0x0000, volt, 2)
                # autogain
                self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address)
                self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address)
                gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0))
                gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))
                gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))
                gain_voltage = np.min([gain_voltage0, gain_voltage2])  # TODO: separate gain for P0 and P2
                self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address)
                self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address)
                # we measure the voltage on both A0 and A2 to guess the polarity
                for i in range(10):
                    I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt  # noqa measure current
                    U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.  # noqa measure voltage
                    U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.  # noqa
                    time.sleep(best_tx_injtime)

                # check polarity
                polarity = 1  # by default, we guessed it right
                vmn = U0
                if U0 < 0:  # we guessed it wrong, let's use a correction factor
                    polarity = -1
                    vmn = U2

            n = 0
            while (
                    abs(vmn) > voltage_max or I > current_max) and volt > 0:  # If starting voltage is too high, need to lower it down
                # print('we are out of range! so decreasing volt')
                volt = volt - 2
                self.DPS.write_register(0x0000, volt, 2)
                # self.DPS.write_register(0x09, 1)  # DPS5005 on
                I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt
                U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
                U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.
                polarity = 1  # by default, we guessed it right
                vmn = U0
                if U0 < 0:  # we guessed it wrong, let's use a correction factor
                    polarity = -1
                    vmn = U2
                n += 1
                if n > 25:
                    break

            factor_I = (current_max) / I
            factor_vmn = voltage_max / vmn
            factor = factor_I
            if factor_I > factor_vmn:
                factor = factor_vmn
            # print('factor', factor_I, factor_vmn)
            vab = factor * volt * 0.9
            if vab > tx_max:
                vab = tx_max
            print(factor_I, factor_vmn, 'factor!!')"""
            pass

        elif strategy == 'vmin':
            """# implement different strategy
            I = 20
            vmn = 400
            count = 0
            while I > 10 or abs(vmn) > 300:  # TODO: hardware related - place in config
                if count > 0:
                    volt = volt - 2
                print(volt, count)
                count = count + 1
                if volt > 50:
                    break

                # set voltage for test
                self.DPS.write_register(0x0000, volt, 2)
                if count == 1:
                    self.DPS.write_register(0x09, 1)  # DPS5005 on
                time.sleep(best_tx_injtime)  # inject for given tx time

                # autogain
                self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address)
                self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address)
                gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0))
                gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))
                gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))
                gain_voltage = np.min([gain_voltage0, gain_voltage2])  # TODO: separate gain for P0 and P2
                self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address)
                self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address)
                # we measure the voltage on both A0 and A2 to guess the polarity
                I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt  # noqa measure current
                U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.  # noqa measure voltage
                U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.  # noqa

                # check polarity
                polarity = 1  # by default, we guessed it right
                vmn = U0
                if U0 < 0:  # we guessed it wrong, let's use a correction factor
                    polarity = -1
                    vmn = U2

            n = 0
            while (
                    abs(vmn) < voltage_min or I < current_min) and volt > 0:  # If starting voltage is too high, need to lower it down
                # print('we are out of range! so increasing volt')
                volt = volt + 2
                print(volt)
                self.DPS.write_register(0x0000, volt, 2)
                # self.DPS.write_register(0x09, 1)  # DPS5005 on
                # time.sleep(best_tx_injtime)
                I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt
                U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
                U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.
                polarity = 1  # by default, we guessed it right
                vmn = U0
                if U0 < 0:  # we guessed it wrong, let's use a correction factor
                    polarity = -1
                    vmn = U2
                n += 1
                if n > 25:
                    break

            vab = volt"""
            pass

        self.tx.turn_off()
        self.tx.polarity = 0
        rab = (vab * 1000.) / I  # noqa

        self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')

        return vab, rab