hardware_system.py 26.7 KB
Newer Older
try:
    import matplotlib.pyplot as plt
except Exception:
    pass
Olivier Kaufmann's avatar
Olivier Kaufmann committed
from ohmpi.hardware_components.abstract_hardware_components import CtlAbstract
from ohmpi.logging_setup import create_stdout_logger
from ohmpi.utils import update_dict
from ohmpi.config import HARDWARE_CONFIG
from threading import Thread, Event, Barrier, BrokenBarrierError
# plt.switch_backend('agg')  # for thread safe operations...

# Define the default controller. A distinct controller can be defined for each tx, rx or mux board;
# when using a distinct controller, its specific definition must be included in the component configuration.
# Import the hardware component modules named in the configuration.
ctl_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["ctl"]["model"]}')
pwr_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["pwr"]["model"]}')
tx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["tx"]["model"]}')
rx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["rx"]["model"]}')

# Give every mux board its id and complete its settings with the shared defaults.
MUX_DEFAULT = HARDWARE_CONFIG['mux']['default']
MUX_CONFIG = HARDWARE_CONFIG['mux']['boards']
for k, v in MUX_CONFIG.items():
    v['id'] = k
    for k2, v2 in MUX_DEFAULT.items():
        # pop-then-set keeps a board specific value when present, otherwise uses the default
        v[k2] = v.pop(k2, v2)

# Complete the TX and RX settings with the defaults declared in the SPECS of their modules.
TX_CONFIG = HARDWARE_CONFIG['tx']
for k, v in tx_module.SPECS['tx'].items():
    try:
        TX_CONFIG[k] = TX_CONFIG.pop(k, v['default'])
    except (KeyError, TypeError):  # spec entry without a usable 'default'
        print(f'Cannot set value {v} in TX_CONFIG[{k}]')
RX_CONFIG = HARDWARE_CONFIG['rx']
for k, v in rx_module.SPECS['rx'].items():
    try:
        RX_CONFIG[k] = RX_CONFIG.pop(k, v['default'])
    except (KeyError, TypeError):  # spec entry without a usable 'default'
        print(f'Cannot set value {v} in RX_CONFIG[{k}]')

# System-wide limits: the most restrictive value among the TX and all mux boards.
# The per-board 'current_max' and 'voltage_max' entries are removed from MUX_CONFIG while doing so.
current_max = np.min([TX_CONFIG['voltage_max']/50/TX_CONFIG['r_shunt'],  # TODO: replace 50 by a TX config
                      np.min(np.hstack((np.inf, [cfg.pop('current_max', np.inf) for cfg in MUX_CONFIG.values()])))])
voltage_max = np.min([TX_CONFIG['voltage_max'],
                      np.min(np.hstack((np.inf, [cfg.pop('voltage_max', np.inf) for cfg in MUX_CONFIG.values()])))])
def elapsed_seconds(start_time):
    """Return the number of seconds between `start_time` and the current UTC time.

    Parameters
    ----------
    start_time : datetime.datetime
        Reference time; it is subtracted from ``datetime.datetime.utcnow()``.

    Returns
    -------
    float
        Elapsed time in seconds.
    """
    return (datetime.datetime.utcnow() - start_time).total_seconds()
        self.exec_logger = kwargs.pop('exec_logger', None)
        self.exec_logger.event(f'OhmPiHardware\tinit\tbegin\t{datetime.datetime.utcnow()}')
        if self.exec_logger is None:
            self.exec_logger = create_stdout_logger('exec_hw')
        self.data_logger = kwargs.pop('exec_logger', None)
        if self.data_logger is None:
            self.data_logger = create_stdout_logger('data_hw')
        self.soh_logger = kwargs.pop('soh_logger', None)
        if self.soh_logger is None:
            self.soh_logger = create_stdout_logger('soh_hw')
        HARDWARE_CONFIG['ctl'].pop('model')
        HARDWARE_CONFIG['ctl'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
                                       'soh_logger': self.soh_logger})
        self.ctl = kwargs.pop('ctl', ctl_module.Ctl(**HARDWARE_CONFIG['ctl']))
Olivier Kaufmann's avatar
Olivier Kaufmann committed
        # use controller as defined in kwargs if present otherwise use controller as defined in config.
        if isinstance(self.ctl, dict):
Olivier Kaufmann's avatar
Olivier Kaufmann committed
            ctl_mod = self.ctl.pop('model', self.ctl)
            if isinstance(ctl_mod, str):
                ctl_mod = importlib.import_module(f'ohmpi.hardware_components.{ctl_mod}')
            self.ctl = ctl_mod.Ctl(**self.ctl)
        HARDWARE_CONFIG['rx'].pop('model')
        HARDWARE_CONFIG['rx'].update(**HARDWARE_CONFIG['rx'])
        HARDWARE_CONFIG['rx'].update({'ctl': HARDWARE_CONFIG['rx'].pop('ctl', self.ctl)})
Olivier Kaufmann's avatar
Olivier Kaufmann committed
        if isinstance(HARDWARE_CONFIG['rx']['ctl'], dict):
            ctl_mod = HARDWARE_CONFIG['rx']['ctl'].pop('model', self.ctl)
            if isinstance(ctl_mod, str):
                ctl_mod = importlib.import_module(f'ohmpi.hardware_components.{ctl_mod}')
            HARDWARE_CONFIG['rx']['ctl'] = ctl_mod.Ctl(**HARDWARE_CONFIG['rx']['ctl'])
        HARDWARE_CONFIG['rx'].update({'connection':
                                          HARDWARE_CONFIG['rx'].pop('connection',
                                                                    HARDWARE_CONFIG['rx']['ctl'].interfaces[
                                                                                  HARDWARE_CONFIG['rx'].pop(
                                                                                      'interface_name', 'i2c')])})
        HARDWARE_CONFIG['rx'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
                                       'soh_logger': self.soh_logger})
        HARDWARE_CONFIG['tx'].pop('ctl', None)
        self.rx = kwargs.pop('rx', rx_module.Rx(**HARDWARE_CONFIG['rx']))
        HARDWARE_CONFIG['pwr'].pop('model')
        HARDWARE_CONFIG['pwr'].update(**HARDWARE_CONFIG['pwr'])  # NOTE: Explain why this is needed or delete me
        HARDWARE_CONFIG['pwr'].update({'ctl': HARDWARE_CONFIG['pwr'].pop('ctl', self.ctl)})
        if isinstance(HARDWARE_CONFIG['pwr']['ctl'], dict):
            ctl_mod = HARDWARE_CONFIG['pwr']['ctl'].pop('model', self.ctl)
            if isinstance(ctl_mod, str):
                ctl_mod = importlib.import_module(f'ohmpi.hardware_components.{ctl_mod}')
            HARDWARE_CONFIG['pwr']['ctl'] = ctl_mod.Ctl(**HARDWARE_CONFIG['pwr']['ctl'])
        HARDWARE_CONFIG['pwr'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
                                      'soh_logger': self.soh_logger})
        self.pwr = kwargs.pop('pwr', pwr_module.Pwr(**HARDWARE_CONFIG['pwr']))
        HARDWARE_CONFIG['tx'].pop('model')
        HARDWARE_CONFIG['tx'].update(**HARDWARE_CONFIG['tx'])
        HARDWARE_CONFIG['tx'].update({'tx_sync': self.tx_sync})
        HARDWARE_CONFIG['tx'].update({'ctl': HARDWARE_CONFIG['tx'].pop('ctl', self.ctl)})
        if isinstance(HARDWARE_CONFIG['tx']['ctl'], dict):
            ctl_mod = HARDWARE_CONFIG['tx']['ctl'].pop('model', self.ctl)
            if isinstance(ctl_mod, str):
                ctl_mod = importlib.import_module(f'ohmpi.hardware_components.{ctl_mod}')
            HARDWARE_CONFIG['tx']['ctl'] = ctl_mod.Ctl(**HARDWARE_CONFIG['tx']['ctl'])
        HARDWARE_CONFIG['tx'].update({'connection': HARDWARE_CONFIG['tx'].pop('connection',
Arnaud WATLET's avatar
Arnaud WATLET committed
                                                                              HARDWARE_CONFIG['tx']['ctl'].interfaces[
                                                                                  HARDWARE_CONFIG['tx'].pop(
                                                                                      'interface_name', 'i2c')])})
        HARDWARE_CONFIG['tx'].pop('ctl', None)
        HARDWARE_CONFIG['tx'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
                                      'soh_logger': self.soh_logger})
        self.tx = kwargs.pop('tx', tx_module.Tx(**HARDWARE_CONFIG['tx']))
        if isinstance(self.tx, dict):
            self.tx = tx_module.Tx(**self.tx)
        self.tx.pwr = self.pwr
        self._cabling = kwargs.pop('cabling', {})
Olivier Kaufmann's avatar
Olivier Kaufmann committed
        self.mux_boards = {}
        for mux_id, mux_config in MUX_CONFIG.items():
Olivier Kaufmann's avatar
Olivier Kaufmann committed
            mux_config.update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
                               'soh_logger': self.soh_logger})
            mux_config.update(**MUX_CONFIG[mux_id])
            mux_config.update({'ctl': mux_config.pop('ctl', self.ctl)})
            mux_module = importlib.import_module(f'ohmpi.hardware_components.{mux_config["model"]}')
            if isinstance(mux_config['ctl'], dict):
                mux_ctl_module = importlib.import_module(f'ohmpi.hardware_components.{mux_config["ctl"]["model"]}')
                mux_config['ctl'] = mux_ctl_module.Ctl(**mux_config['ctl'])  # (**self.ctl)
Olivier Kaufmann's avatar
Olivier Kaufmann committed
            assert issubclass(type(mux_config['ctl']), CtlAbstract)
            mux_config.update({'connection': mux_config.pop('connection', mux_config['ctl'].interfaces[mux_config.pop('interface_name', 'i2c')])})
Olivier Kaufmann's avatar
Olivier Kaufmann committed
            mux_config['id'] = mux_id
Olivier Kaufmann's avatar
Olivier Kaufmann committed

            self.mux_boards[mux_id] = mux_module.Mux(**mux_config)

        self.mux_barrier = Barrier(len(self.mux_boards) + 1)
        for mux_id, mux in self.mux_boards.items():
            mux.barrier = self.mux_barrier
            for k, v in mux.cabling.items():
                update_dict(self._cabling, {k: (mux_id, k[0])})

        # Complete OhmPiHardware initialization
        self.readings = np.array([])  # time series of acquired data
        self._start_time = None  # time of the beginning of a readings acquisition
        self._pulse = 0  # pulse number
        self.exec_logger.event(f'OhmPiHardware\tinit\tend\t{datetime.datetime.utcnow()}')

    def _clear_values(self):
        self.readings = np.array([])
        self._start_time = None
        self._pulse = 0
    def _gain_auto(self, polarities=(1, -1)):  # TODO: improve _gain_auto
Olivier Kaufmann's avatar
Olivier Kaufmann committed
        self.exec_logger.event(f'OhmPiHardware\ttx_rx_gain_auto\tbegin\t{datetime.datetime.utcnow()}')
        current, voltage = 0., 0.
        for pol in polarities:
            self.tx.polarity = pol
Arnaud WATLET's avatar
Arnaud WATLET committed
            # self.tx_sync.wait()
            # set gains automatically
            injection = Thread(target=self._inject, kwargs={'injection_duration': 0.2, 'polarity': pol})
            # readings = Thread(target=self._read_values)
            get_tx_gain = Thread(target=self.tx.gain_auto)
            get_rx_gain = Thread(target=self.rx.gain_auto)
            injection.start()
            get_tx_gain.start()  # TODO: add a barrier to synchronize?
            get_rx_gain.start()
            get_tx_gain.join()
            get_rx_gain.join()
            injection.join()
            tx_gains.append(self.tx.gain)
            rx_gains.append(self.rx.gain)
            # v = self.readings[:, 2] != 0
            # current = max(current, np.mean(self.readings[v, 3]))
            # voltage = max(voltage, np.abs(np.mean(self.readings[v, 2] * self.readings[v, 4])))
Arnaud WATLET's avatar
Arnaud WATLET committed
        self.tx.polarity = 0
        self.tx.gain = min(tx_gains)
        self.rx.gain = min(rx_gains)
        # self.rx.gain_auto(voltage)
        self.exec_logger.event(f'OhmPiHardware\ttx_rx_gain_auto\tend\t{datetime.datetime.utcnow()}')
    def _inject(self, polarity=1, injection_duration=None):  # TODO: deal with voltage or current pulse
Olivier Kaufmann's avatar
Olivier Kaufmann committed
        self.exec_logger.event(f'OhmPiHardware\tinject\tbegin\t{datetime.datetime.utcnow()}')
Arnaud WATLET's avatar
Arnaud WATLET committed
        print('inject')
        self.tx.voltage_pulse(length=injection_duration, polarity=polarity)
Olivier Kaufmann's avatar
Olivier Kaufmann committed
        self.exec_logger.event(f'OhmPiHardware\tinject\tend\t{datetime.datetime.utcnow()}')
    def _set_mux_barrier(self):
        self.mux_barrier = Barrier(len(self.mux_boards) + 1)
        for mux in self.mux_boards:
            mux.barrier = self.mux_barrier

    @property
    def pulses(self):
        """Readings grouped by pulse number.

        Returns
        -------
        dict
            Maps each pulse number found in column 1 of ``self.readings`` to a dict with the keys
            'polarity' (int, from column 2), 'iab' (column 3) and 'vmn' (column 4).
            Empty when no readings are available.
        """
        pulses = {}
        if self.readings.ndim < 2:  # no readings yet: the empty array has no columns to index
            return pulses
        for i in np.unique(self.readings[:, 1]):
            r = self.readings[self.readings[:, 1] == i, :]
            assert np.all(np.isclose(r[:, 2], r[0, 2]))  # Polarity cannot change within a pulse
            # TODO: check how to generalize in case of multi-channel RX
            pulses.update({i: {'polarity': int(r[0, 2]), 'iab': r[:, 3], 'vmn': r[:, 4]}})
        return pulses

    def _read_values(self, sampling_rate=None, append=False):  # noqa
        """Sample the tx current and the rx voltage for as long as ``self.tx_sync`` is set.

        Each sample is a row ``[elapsed seconds since self._start_time, pulse number, tx polarity,
        tx current, rx voltage]``. When the event is cleared, the rows are stored in
        ``self.readings`` and the pulse counter is incremented.

        Parameters
        ----------
        sampling_rate : float, optional
            Target number of samples per second. Defaults to ``self.rx.sampling_rate``.
        append : bool, optional
            If True, the new samples are added after the existing readings. Otherwise the
            acquisition state is reset and the readings only contain the new samples.
        """
        self.exec_logger.event(f'OhmPiHardware\tread_values\tbegin\t{datetime.datetime.utcnow()}')
        if not append:
            # fresh acquisition: drop previous readings and restart the pulse counter
            self._clear_values()
            _readings = []
        else:
            _readings = self.readings.tolist()
        if sampling_rate is None:
            sampling_rate = self.rx.sampling_rate
        sample = 0  # index of the next sampling slot, counted from self._start_time
        lap = datetime.datetime.utcnow()  # just in case tx_sync is not set immediately after passing wait
        self.tx_sync.wait()  #
        if not append or self._start_time is None:
            self._start_time = datetime.datetime.utcnow()
            # TODO: Check if replacing the following two options by a reset_buffer method of TX would be OK
            time.sleep(np.max([self.rx._latency, self.tx._latency])) # if continuous mode
            # _ = self.rx.voltage # if not continuous mode
        while self.tx_sync.is_set():
            lap = datetime.datetime.utcnow()
            r = [elapsed_seconds(self._start_time), self._pulse, self.tx.polarity, self.tx.current, self.rx.voltage]
            if self.tx_sync.is_set():  # keep the sample only if the pulse did not end while reading
                sample += 1
                _readings.append(r)
                sleep_time = self._start_time + datetime.timedelta(seconds=sample / sampling_rate) - lap
                if sleep_time.total_seconds() < 0.:
                    # late: jump to the first sampling slot that is still ahead
                    # TODO: count how many samples were skipped to make a stat that could be used to qualify pulses
                    sample += int(sampling_rate * np.abs(sleep_time.total_seconds())) + 1
                    sleep_time = self._start_time + datetime.timedelta(seconds=sample / sampling_rate) - lap
                time.sleep(np.max([0., sleep_time.total_seconds()]))
        self.exec_logger.debug(f'pulse {self._pulse}: elapsed time {(lap-self._start_time).total_seconds()} s')
        self.exec_logger.debug(f'pulse {self._pulse}: total samples {len(_readings)}')
        self.readings = np.array(_readings)
        self._pulse += 1
        self.exec_logger.event(f'OhmPiHardware\tread_values\tend\t{datetime.datetime.utcnow()}')
    def last_rho(self):
        """Mean of ``polarity * (vmn - sp) / iab`` over the samples taken during injection.

        Returns
        -------
        float or None
            None when fewer than two samples are available.
        """
        if self.readings.ndim < 2:  # empty readings have no columns to index
            return None
        v = self.readings[:, 2] != 0  # exclude samples where there is no injection
        if len(v) > 1:
            return np.mean(self.readings[v, 2] * (self.readings[v, 4] - self.sp) / self.readings[v, 3])
        return None
    def last_dev(self):
        """Relative standard deviation, in percent, of ``polarity * (vmn - sp) / iab`` during injection.

        Returns
        -------
        float or None
            None when fewer than two samples are available.
        """
        if len(self.readings) > 1:
            v = self.readings[:, 2] != 0  # exclude sample where there is no injection
            r = self.readings[v, 2] * (self.readings[v, 4] - self.sp) / self.readings[v, 3]
            # np.mean(r) is the value last_rho computes; taking it from r avoids dividing by
            # self.last_rho, which is a bound method unless it is exposed as a property
            return 100. * np.std(r) / np.mean(r)
        return None
Olivier Kaufmann's avatar
Olivier Kaufmann committed
    @property
    def sp(self):  # TODO: allow for different strategies for computing sp (i.e. when sp drift is not linear)
        """Estimate sp from the readings (presumably the self-potential offset of vmn — confirm).

        The mean vmn (column 4) is computed for every pulse, then sp is half the mean of the sums
        of the mean vmn of positive pulses and the mean vmn of negative pulses.

        Returns
        -------
        float
            The estimate, or 0. (with a warning logged) when the readings do not contain at least
            one sample of polarity 1 and one sample of polarity -1.
        """
        readings = self.readings
        if readings.shape == (0,) or len(readings[readings[:, 2] == 1, :]) < 1 or \
                len(readings[readings[:, 2] == -1, :]) < 1:
            self.exec_logger.warning('Unable to compute sp: readings should at least contain one positive and one '
                                     'negative pulse')
            return 0.
        n_pulses = int(np.max(readings[:, 1]))
        pulse_ids = range(n_pulses + 1)
        # polarity of a pulse: median of the polarity column over its samples
        polarity = np.array([np.median(readings[readings[:, 1] == i, 2]) for i in pulse_ids])
        mean_vmn = np.array([np.mean(readings[readings[:, 1] == i, 4]) for i in pulse_ids])
        sp = np.mean(mean_vmn[polarity == 1] + mean_vmn[polarity == -1]) / 2
        return sp

    def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5.,
        """Estimates best Tx voltage based on different strategies.
        At first a half-cycle is made for a short duration with a fixed
        known voltage. This gives us Iab and Rab. We also measure Vmn.
        A constant c = vmn/iab is computed (only depends on geometric
        factor and ground resistivity, that doesn't change during a
        quadrupole). Then depending on the strategy, we compute which
        vab to inject to reach the minimum/maximum Iab current or
        min/max Vmn.
        This function also compute the polarity on Vmn (on which pin
        of the ADS1115 we need to measure Vmn to get the positive value).

        Parameters
        ----------
        pulse_duration : float, optional
            Time in seconds for the pulse used to compute Rab.
            - vmax : compute Vab to reach a maximum Iab without exceeding vab_max
            - vmin : compute Vab to reach at least vmn_min
            - constant : apply given Vab
        tx_volt : float, optional
            Voltage to apply for guessing the best voltage. 5 V applied
            by default. If strategy "constant" is chosen, constant voltage
            to applied is "tx_volt".
        vab_max : float, optional
            Maximum injection voltage to apply to tx (used by all strategies)
        vmn_min : float, optional
            Minimum voltage target for rx (used by vmin strategy)

        Returns
        -------
        vab : float
            Proposed Vab according to the given strategy.
        polarity:
            Polarity of VMN relative to polarity of VAB
        rab : float
            Resistance between injection electrodes
        # NOTE(review): in this view the signature is cut after `tx_volt=5.,` (the `vab_max` and `vmn_min`
        # parameters used below are not declared) and the docstring above is not closed; the three bullet
        # items under `pulse_duration` presumably belong to a missing `strategy` entry — restore from the repository.
        # Start from the requested voltage, capped at vab_max
        vab = np.min([np.abs(tx_volt), vab_max])
        # NOTE(review): the test uses self.rx.sampling_rate but the fallback uses self.tx.sampling_rate — confirm
        if 1. / self.rx.sampling_rate > pulse_duration:
            sampling_rate = 1. / pulse_duration  # TODO: check this...
        else:
            sampling_rate = self.tx.sampling_rate
        self._vab_pulse(vab=vab, duration=pulse_duration, sampling_rate=sampling_rate)  # TODO: use a square wave pulse?
Olivier Kaufmann's avatar
Olivier Kaufmann committed
        # Mean vmn (column 4) and mean iab (column 3) measured during the test pulse
        vmn = np.mean(self.readings[:, 4])
        iab = np.mean(self.readings[:, 3])
        # if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
        # then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
Olivier Kaufmann's avatar
Olivier Kaufmann committed
            # NOTE(review): the header of this branch is not visible here; presumably `if strategy == 'vmax':` — confirm
            if vab < vab_max and iab < current_max:
                vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab])  # TODO: check if setting at 90% of max as a safety margin is OK
            self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
            # NOTE(review): the header of this branch is not visible either; presumably `elif strategy == 'vmin':` — confirm
            if vab <= vab_max and iab < current_max:
                vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab])  # TODO: check if setting at 90% of max as a safety margin is OK
        elif strategy != 'constant':
            self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
        else:
            self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
        # NOTE(review): `rab` is not assigned anywhere in the visible code
        self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
        # Sign of the mean vmn measured during the test pulse
        if vmn < 0:
            polarity = -1  # TODO: check if we really need to return polarity
        else:
            polarity = 1
        # NOTE(review): no return statement is visible although the docstring announces vab, polarity and rab
    def _plot_readings(self, save_fig=False):
        """Plot the readings in five stacked panels sharing the time axis (column 0).

        Panels, from top to bottom: iab (column 3), polarity * (vmn - sp), polarity (column 2),
        the ratio polarity * (vmn - sp) / iab for samples with a non-zero polarity, and sp.

        Parameters
        ----------
        save_fig : bool, optional
            If True the figure is saved to 'figures/test.png', otherwise it is shown.
        """
        fig, ax = plt.subplots(nrows=5, sharex=True)
        sp = self.sp  # evaluated once: the sp property recomputes its value on every access
        t, polarity, iab, vmn = (self.readings[:, col] for col in (0, 2, 3, 4))
        corrected_vmn = polarity * (vmn - sp)
        v = polarity != 0  # samples taken during injection
        ax[0].plot(t, iab, '-r', marker='.', label='iab')
        ax[0].set_ylabel('Iab [mA]')
        ax[1].plot(t, corrected_vmn, '-b', marker='.', label='vmn')
        ax[1].set_ylabel('Vmn [mV]')
        ax[2].plot(t, polarity, '-g', marker='.', label='polarity')
        ax[2].set_ylabel('polarity [-]')
        ax[3].plot(t[v], corrected_vmn[v] / iab[v], '-m', marker='.', label='R [ohm]')
        ax[3].set_ylabel('R [ohm]')
        ax[4].plot(t[v], np.ones_like(t[v]) * sp, '-k', marker='.', label='SP [mV]')
        ax[4].set_ylabel('SP [mV]')
        if save_fig:
            fig.savefig('figures/test.png')
        else:
            plt.show()
    def calibrate_rx_bias(self):
        """Add to ``self.rx._bias`` the average of the mean vmn of positive and of negative samples.

        The means are taken over column 4 of ``self.readings`` for the rows whose polarity
        (column 2) is 1, respectively -1.
        """
        polarity = self.readings[:, 2]
        vmn = self.readings[:, 4]
        mean_positive = np.mean(vmn[polarity == 1])
        mean_negative = np.mean(vmn[polarity == -1])
        self.rx._bias += (mean_positive + mean_negative) / 2.
    def vab_square_wave(self, vab, cycle_duration, sampling_rate=None, cycles=3, polarity=1, duty_cycle=1.,
                        append=False):
        """Run a square wave made of pulses of alternating polarity.

        The gains are first set with ``self._gain_auto()``, then the pulses are run by
        ``self._vab_pulses``.

        Parameters
        ----------
        vab : float
            Injection voltage, passed to ``self._vab_pulses``.
        cycle_duration : float
            Duration of a full cycle (one pulse of each polarity).
        sampling_rate : float, optional
            Passed to ``self._vab_pulses``.
        cycles : int, optional
            Number of cycles.
        polarity : int, optional
            Polarity of the first pulse; the following pulses alternate.
        duty_cycle : float, optional
            Fraction (between 0 and 1) of each half cycle during which the voltage is applied.
            When lower than 1, every pulse is followed by a pulse of polarity 0.
        append : bool, optional
            Passed to ``self._vab_pulses``.
        """
        self.exec_logger.event(f'OhmPiHardware\tvab_square_wave\tbegin\t{datetime.datetime.utcnow()}')
        self._gain_auto()
        assert 0. <= duty_cycle <= 1.
        # The alternation is built from the `polarity` argument: self.tx.polarity cannot be used
        # here because it is 0 once _gain_auto has finished.
        pol = [-int(polarity * np.heaviside(i % 2, -1.)) for i in range(2 * cycles)]
        if duty_cycle < 1.:
            durations = [cycle_duration/2 * duty_cycle, cycle_duration/2*(1.-duty_cycle)] * 2 * cycles
            polarities = [0] * (len(pol) * 2)
            polarities[0::2] = pol  # injection pulses interleaved with polarity-0 pulses
        else:
            durations = [cycle_duration / 2] * 2 * cycles
            polarities = pol
        self._vab_pulses(vab, durations, sampling_rate, polarities=polarities,  append=append)
        self.exec_logger.event(f'OhmPiHardware\tvab_square_wave\tend\t{datetime.datetime.utcnow()}')
    def _vab_pulse(self, vab, duration, sampling_rate=None, polarity=1, append=False):
        """ Gets VMN and IAB from a single voltage pulse
        """
Arnaud WATLET's avatar
Arnaud WATLET committed
        #self.tx.polarity = polarity
        if sampling_rate is None:
            sampling_rate = RX_CONFIG['sampling_rate']
        if self.tx.pwr.voltage_adjustable:
            self.tx.pwr.voltage = vab
            vab = self.tx.pwr.voltage
        # reads current and voltage during the pulse
        injection = Thread(target=self._inject, kwargs={'injection_duration': duration, 'polarity': polarity})
        readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append})
        readings.start()
        injection.start()
        readings.join()
        injection.join()
        self.tx.polarity = 0
    def _vab_pulses(self, vab, durations, sampling_rate, polarities=None, append=False):
        n_pulses = len(durations)
        self.exec_logger.debug(f'n_pulses: {n_pulses}')
        if sampling_rate is None:
            sampling_rate = RX_CONFIG['sampling_rate']
        if polarities is not None:
            assert len(polarities) == n_pulses
            polarities = [-int(self.tx.polarity * np.heaviside(i % 2, -1.)) for i in range(n_pulses)]
        if not append:
            self._clear_values()
            self._vab_pulse(self, duration=durations[i], sampling_rate=sampling_rate, polarity=polarities[i],
    def switch_mux(self, electrodes, roles=None, state='off', **kwargs):
        """Switches on multiplexer relays for given quadrupole.

        Every mux board involved runs its ``switch`` method in its own thread; the threads and the
        calling thread share a barrier. Nothing is switched if one of the (electrode, role)
        pairs is not in the cabling.

        Parameters
        ----------
        electrodes : list
            List of integers representing the electrode ids.
        roles : list, optional
            List of roles of electrodes, optional
        state : str, optional
            Either 'on' or 'off'.
        **kwargs
            Extra keyword arguments passed to the ``switch`` method of the mux boards.

        Returns
        -------
        bool
            True if the switching was carried out, False otherwise.
        """
        self.exec_logger.event(f'OhmPiHardware\tswitch_mux\tbegin\t{datetime.datetime.utcnow()}')
        status = True
        if roles is None:
            roles = ['A', 'B', 'M', 'N']
        if len(electrodes) == len(roles):
            # TODO: Check that we don't set incompatible roles to the same electrode
            elec_dict = {i: [] for i in roles}
            mux_ids = []  # ids of the boards that have to switch, without duplicates
            for idx, elec in enumerate(electrodes):
                elec_dict[roles[idx]].append(elec)
                try:
                    mux = self._cabling[(elec, roles[idx])][0]
                    if mux not in mux_ids:
                        mux_ids.append(mux)
                except KeyError:
                    self.exec_logger.debug(f'Unable to switch {state} ({elec}, {roles[idx]})'
                                           f': not in cabling and will be ignored...')
                    status = False
            if status:
                b = Barrier(len(mux_ids)+1)  # one party per board plus the calling thread
                self.mux_barrier = b
                kwargs.update({'elec_dict': elec_dict, 'state': state})
                mux_workers = []
                for mux in mux_ids:
                    # Create a new thread to perform some work
                    self.mux_boards[mux].barrier = b
                    worker = Thread(target=self.mux_boards[mux].switch, kwargs=kwargs)
                    worker.start()
                    mux_workers.append(worker)
                try:
                    self.mux_barrier.wait()
                    for mux_worker in mux_workers:
                        mux_worker.join()
                except BrokenBarrierError:
                    self.exec_logger.warning('Switching aborted')
                    status = False
        else:
            self.exec_logger.error(
                f'Unable to switch {state} electrodes: number of electrodes and number of roles do not match!')
            status = False
        self.exec_logger.event(f'OhmPiHardware\tswitch_mux\tend\t{datetime.datetime.utcnow()}')
        return status
    def test_mux(self, channel=None, activation_time=1.0):
        """Interactive method to test the multiplexer.

        The relays of the given channel, or of every channel of the cabling when no channel is
        given, are switched on, kept on for `activation_time` and switched off again.

        Parameters
        ----------
        channel : tuple, optional
            (electrode_nr, role) to test.
        activation_time : float, optional
            Time in seconds during which the relays are activated.
        """
        import time  # local import: this method is the only place of the block that needs it

        if channel is not None:
            try:
                electrodes = [int(channel[0])]
                roles = [channel[1]]
            except Exception as e:
                self.exec_logger.error(f'Unable to parse channel: {e}')
                return
            self.switch_mux(electrodes, roles, state='on')
            time.sleep(activation_time)
            self.switch_mux(electrodes, roles, state='off')
        else:
            for c in self._cabling.keys():
                self.exec_logger.info(f'Testing electrode {c[0]} with role {c[1]}.')
                self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='on')
                time.sleep(activation_time)
                self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='off')

    def reset_mux(self):
        """Switches off all multiplexer relays.

        Calls ``reset()`` on every board of ``self.mux_boards`` and logs each step.
        """
        self.exec_logger.debug('Resetting all mux boards ...')
        self.exec_logger.event(f'OhmPiHardware\treset_mux\tbegin\t{datetime.datetime.utcnow()}')
        for mux_id, mux in self.mux_boards.items():  # noqa
            self.exec_logger.debug(f'Resetting {mux_id}.')
            mux.reset()
        self.exec_logger.event(f'OhmPiHardware\treset_mux\tend\t{datetime.datetime.utcnow()}')