hardware_system.py 16.5 KB
Newer Older
try:
    import matplotlib.pyplot as plt
except Exception:
    pass
from OhmPi.logging_setup import create_stdout_logger
from OhmPi.utils import update_dict
from OhmPi.config import HARDWARE_CONFIG
from threading import Thread, Event, Barrier
ctl_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["ctl"]["model"]}')
pwr_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["pwr"]["model"]}')
tx_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["tx"]["model"]}')
rx_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["rx"]["model"]}')
MUX_CONFIG = {}
for mux_id, mux_config in HARDWARE_CONFIG['mux']['boards'].items():
    mux_module = importlib.import_module(f'OhmPi.hardware_components.{mux_config["model"]}')
    MUX_CONFIG[mux_id] = mux_module.MUX_CONFIG
    MUX_CONFIG[mux_id].update(mux_config)
    MUX_CONFIG[mux_id].update({'id': mux_id})
    mux_boards.append(mux_id)
TX_CONFIG = tx_module.TX_CONFIG
RX_CONFIG = rx_module.RX_CONFIG

current_max = np.min([TX_CONFIG['current_max'], np.min([MUX_CONFIG[i].pop('current_max', np.inf) for i in mux_boards])])
voltage_max = np.min([TX_CONFIG['voltage_max'], np.min([MUX_CONFIG[i].pop('voltage_max', np.inf) for i in mux_boards])])
def elapsed_seconds(start_time):
    lap = datetime.datetime.utcnow() - start_time
    return lap.total_seconds()
        self.exec_logger = kwargs.pop('exec_logger', None)
        if self.exec_logger is None:
            self.exec_logger = create_stdout_logger('exec_hw')
        self.data_logger = kwargs.pop('exec_logger', None)
        if self.data_logger is None:
            self.data_logger = create_stdout_logger('data_hw')
        self.soh_logger = kwargs.pop('soh_logger', None)
        if self.soh_logger is None:
            self.soh_logger = create_stdout_logger('soh_hw')
        self.ctl = kwargs.pop('ctl', ctl_module.Ctl(exec_logger=self.exec_logger,
                                                    data_logger=self.data_logger,
                                                    soh_logger=self.soh_logger))
        self.rx = kwargs.pop('rx', rx_module.Rx(exec_logger=self.exec_logger,
                                                data_logger=self.data_logger,
                                                soh_logger=self.soh_logger,
                                                ctl=self.ctl))
        self.pwr = kwargs.pop('pwr', pwr_module.Pwr(exec_logger=self.exec_logger,
                                                    data_logger=self.data_logger,
                                                    soh_logger=self.soh_logger,
                                                    ctl=self.ctl))
        self.tx = kwargs.pop('tx', tx_module.Tx(exec_logger=self.exec_logger,
                                                data_logger=self.data_logger,
                                                soh_logger=self.soh_logger,
                                                ctl=self.ctl))
        self.tx.pwr = self.pwr
        self._cabling = kwargs.pop('cabling', {})
        self.mux_boards = kwargs.pop('mux', {'mux_1': mux_module.Mux(id='mux_1',
                                                                     exec_logger=self.exec_logger,
                                                                     data_logger=self.data_logger,
                                                                     soh_logger=self.soh_logger,
                                                                     ctl=self.ctl,
                                                                     cabling = self._cabling)})
        self.mux_barrier = Barrier(len(self.mux_boards) + 1)
        for _, mux in self.mux_boards.items():
            mux.barrier = self.mux_barrier
            for k, v in mux.cabling.items():
                update_dict(self._cabling, {k: (mux_id, k[0])})
        self.readings = np.array([])  # time series of acquired data
        self._start_time = None  # time of the beginning of a readings acquisition
        self._pulse = 0  # pulse number

    def _clear_values(self):
        self.readings = np.array([])
        self._start_time = None
        self._pulse = 0
    def _gain_auto(self):
        self.tx_sync.wait()
        self.tx.adc_gain_auto()
        self.rx.adc_gain_auto()

    def _inject(self, polarity=1, inj_time=None):
            self.tx.voltage_pulse(length=inj_time, polarity=polarity)
    def _set_mux_barrier(self):
        self.mux_barrier = Barrier(len(self.mux_boards) + 1)
        for mux in self.mux_boards:
            mux.barrier = self.mux_barrier

    @property
    def pulses(self):
        pulses = {}
        for i in np.unique(self.readings[:,1]):
            r = self.readings[self.readings[:, 1] == i, :]
            assert np.all(np.isclose(r[:,2], r[0, 2]))  # Polarity cannot change within a pulse
            pulses.update({i: {'polarity': int(r[0, 2]), 'iab': r[:,3], 'vmn' : r[:,4]}})  # TODO: check how to generalize in case of multi-channel RX
        return pulses

    def _read_values(self, sampling_rate, append=False):  # noqa
        if not append:
            self._clear_values()
            _readings = []
        else:
            _readings = self.readings.tolist()
        if not append or self._start_time is None:
            self._start_time = datetime.datetime.utcnow()
        while self.tx_sync.is_set():
            lap = datetime.datetime.utcnow()
            _readings.append([elapsed_seconds(self._start_time), self._pulse, self.tx.polarity, self.tx.current,
                              self.rx.voltage])
            sleep_time = self._start_time + datetime.timedelta(seconds = sample * sampling_rate / 1000) - lap
            time.sleep(np.max([0, sleep_time.total_seconds()]))
        self.readings = np.array(_readings)
        self._pulse += 1
Olivier Kaufmann's avatar
Olivier Kaufmann committed
    @property
    def sp(self):  # TODO: use a time window within pulses
        if self.readings.shape == (0,) or len(self.readings[self.readings[:,2]==1, :]) < 1 or len(self.readings[self.readings[:,2]==-1, :]) < 1:
Olivier Kaufmann's avatar
Olivier Kaufmann committed
            self.exec_logger.warning('Unable to compute sp: readings should at least contain one positive and one negative pulse')
            return 0.
        else:
            n_pulses = int(np.max(self.readings[:, 1]))
            polarity = np.array([np.median(self.readings[self.readings[:, 1]==i, 2]) for i in range(n_pulses + 1)])
Olivier Kaufmann's avatar
Olivier Kaufmann committed
            mean_vmn = []
            mean_iab = []
            for i in range(n_pulses + 1):
                mean_vmn.append(np.mean(self.readings[self.readings[:, 1]==i, 4]))
                mean_iab.append(np.mean(self.readings[self.readings[:, 1]==i, 3]))
Olivier Kaufmann's avatar
Olivier Kaufmann committed
            mean_vmn = np.array(mean_vmn)
            mean_iab = np.array(mean_iab)
            sp = np.mean(mean_vmn[np.ix_(polarity==1)] - mean_vmn[np.ix_(polarity==-1)]) / 2
            return sp

    def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5,
                         vab_max=voltage_max, vmn_min=voltage_min):
        """Estimates best Tx voltage based on different strategies.
        At first a half-cycle is made for a short duration with a fixed
        known voltage. This gives us Iab and Rab. We also measure Vmn.
        A constant c = vmn/iab is computed (only depends on geometric
        factor and ground resistivity, that doesn't change during a
        quadrupole). Then depending on the strategy, we compute which
        vab to inject to reach the minimum/maximum Iab current or
        min/max Vmn.
        This function also compute the polarity on Vmn (on which pin
        of the ADS1115 we need to measure Vmn to get the positive value).

        Parameters
        ----------
        best_tx_injtime : float, optional
            Time in milliseconds for the half-cycle used to compute Rab.
        strategy : str, optional
            Either:
            - vmax : compute Vab to reach a maximum Iab without exceeding vab_max
            - vmin : compute Vab to reach at least vmn_min
            - constant : apply given Vab
        tx_volt : float, optional
            Voltage to apply for guessing the best voltage. 5 V applied
            by default. If strategy "constant" is chosen, constant voltage
            to applied is "tx_volt".
        vab_max : float, optional
            Maximum injection voltage to apply to tx (used by all strategies)
        vmn_min : float, optional
            Minimum voltage target for rx (used by vmin strategy)

        Returns
        -------
        vab : float
            Proposed Vab according to the given strategy.
        polarity:
            Polarity of VMN relative to polarity of VAB
        rab : float
            Resistance between injection electrodes
        vab = np.min([np.abs(tx_volt), vab_max])
        if self.rx.sampling_rate*1000 > best_tx_injtime:
            sampling_rate = best_tx_injtime  # TODO: check this...
        else:
            sampling_rate = self.tx.sampling_rate
        self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate) # TODO: use a square wave pulse?
        vmn = np.mean(self.readings[:,4])
        iab = np.mean(self.readings[:,3])
        # if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
        # then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
            if vab < vab_max and iab < current_max :
                vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab])  # TODO: check if setting at 90% of max as a safety margin is OK
            self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
            if vab <= vab_max and iab < current_max:
                vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab])  # TODO: check if setting at 90% of max as a safety margin is OK
        elif strategy != 'constant':
            self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
        else:
            self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
        self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
        if vmn < 0:
            polarity = -1  # TODO: check if we really need to return polarity
        else:
            polarity = 1
    def _plot_readings(self):
        # Plot graphs
        fig, ax = plt.subplots(nrows=2, sharex=True)
        ax[0].plot(self.readings[:, 0], self.readings[:, 3], '-r', marker='.', label='iab')
        ax[0].set_ylabel('Iab [mA]')
        ax[1].plot(self.readings[:, 0], self.readings[:, 2] * self.readings[:, 4], '-b', marker='.', label='vmn')
        ax[1].set_ylabel('Vmn [mV]')
    def vab_square_wave(self, vab, cycle_length, sampling_rate=None, cycles=3, polarity=1, append=False):
        self.tx.polarity = polarity
        lengths = [cycle_length/2]*2*cycles
        self._vab_pulses(vab, lengths, sampling_rate, append=append)
    def _vab_pulse(self, vab, length, sampling_rate=None, polarity=1, append=False):
        """ Gets VMN and IAB from a single voltage pulse
        """
        self.tx.polarity = polarity
        if sampling_rate is None:
            sampling_rate = RX_CONFIG['sampling_rate']
        if self.tx.pwr.voltage_adjustable:
            self.tx.pwr.voltage = vab
            vab = self.tx.pwr.voltage
        # set gains automatically
        gain_auto = Thread(target=self._gain_auto)
        injection = Thread(target=self._inject, kwargs={'inj_time': 0.2, 'polarity': polarity})
        gain_auto.start()
        injection.start()
        # reads current and voltage during the pulse
        injection = Thread(target=self._inject, kwargs={'inj_time':length, 'polarity': polarity})
        readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append})
        readings.start()
        injection.start()
        readings.join()
        injection.join()

    def _vab_pulses(self, vab, lengths, sampling_rate, polarities=None, append=False):
        n_pulses = len(lengths)
        if sampling_rate is None:
            sampling_rate = RX_CONFIG['sampling_rate']
        if polarities is not None:
            assert len(polarities)==n_pulses
        else:
            polarities = [-self.tx.polarity * np.heaviside(i % 2, -1.) for i in range(n_pulses)]
        if not append:
            self._clear_values()
            self._vab_pulse(self, length=lengths[i], sampling_rate=sampling_rate, polarity=polarities[i], append=True)

    def switch_mux(self, electrodes, roles=None, state='off'):
        """Switches on multiplexer relays for given quadrupole.

        Parameters
        ----------
        electrodes : list
            List of integers representing the electrode ids.
        roles : list, optional
            List of roles of electrodes, optional
        state : str, optional
            Either 'on' or 'off'.
        """
        if roles is None:
            roles = ['A', 'B', 'M', 'N']
        if len(electrodes) == len(roles):
            # TODO: Check that we don't set incompatible roles to the same electrode
            elec_dict = {i: [] for i in roles}
            mux_workers = []
            for idx, elec in enumerate(electrodes):
                elec_dict[roles[idx]].append(elec)
                try:
                    mux = self._cabling[(elec, roles[idx])][0]
                    if mux not in mux_workers:
                        mux_workers.append(mux)
                except KeyError:
                    self.exec_logger.debug(f'Unable to switch {state} ({elec}, {roles[idx]}): not in cabling and will be ignored...')
                    status = False
            if status:
                mux_workers = list(set(mux_workers))
                b = Barrier(len(mux_workers)+1)
                self.mux_barrier = b
                for idx, mux in enumerate(mux_workers):
                    # Create a new thread to perform some work
                    self.mux_boards[mux].barrier = b
                    mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs={'elec_dict': elec_dict, 'state': state})
                    mux_workers[idx].start()
                self.mux_barrier.wait()
                for mux_worker in mux_workers:
                    mux_worker.join()
        else:
            self.exec_logger.error(
                f'Unable to switch {state} electrodes: number of electrodes and number of roles do not match!')
            status = False
        return status
    def test_mux(self, channel=None, activation_time=1.0):
        """Interactive method to test the multiplexer.

        Parameters
        ----------
        channel : tuple, optional
            (electrode_nr, role) to test.
        activation_time : float, optional
            Time in seconds during which the relays are activated.
        if channel is not None:
            try:
                electrodes = [int(channel[0])]
                roles = [channel[1]]
            except Exception as e:
                self.exec_logger.error(f'Unable to parse channel: {e}')
                return
            self.switch_mux(electrodes,roles,state='on')
            time.sleep(activation_time)
            self.switch_mux(electrodes,roles, state='off')
            for c in self._cabling.keys():
                self.exec_logger.info(f'Testing electrode {c[0]} with role {c[1]}.')
                self.switch_mux(electrodes=[c[0]],roles=[c[1]],state='on')
                self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='off')

    def reset_mux(self):
        """Switches off all multiplexer relays.
        """

        self.exec_logger.debug('Resetting all mux boards ...')
        for mux_id, mux in self.mux_boards.items():  # noqa
            self.exec_logger.debug(f'Resetting {mux_id}.')