ohmpi.py 80.3 KB
Newer Older
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                start_time = time.time()  # start counter
                for n in range(0, nb_stack * 2):  # for each half-cycles
                    # current injection
                    if (n % 2) == 0:
                        self.pin0.value = True
                        self.pin1.value = False
                        if autogain:  # select gain computed on first half cycle
                            self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage), data_rate=860,
Clement Remi's avatar
Clement Remi committed
                                                        address=self.ads_voltage_address)
                            self.ads_voltage.mode= Mode.CONTINUOUS 
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    else:
                        self.pin0.value = False
                        self.pin1.value = True  # current injection nr2
                        if autogain:  # select gain computed on first half cycle
                            self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage), data_rate=860,
Clement Remi's avatar
Clement Remi committed
                                                        address=self.ads_voltage_address)
                            self.ads_voltage.mode= Mode.CONTINUOUS 
                    self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}')
                    if self.board_version == 'mb.2023.0.0':
                        self.pin6.value = True  # IHM current injection led on
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    # measurement of current i and voltage u during injection
                    meas = np.zeros((self.nb_samples, 5)) * np.nan
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    start_delay = time.time()  # stating measurement time
                    dt = 0
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    for k in range(0, self.nb_samples):
                        # reading current value on ADS channels
                        meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt)
Clement Remi's avatar
Clement Remi committed
                        if self.board_version == 'mb.2023.0.0':
                            # if pinMN == 0:
                            #     meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
                            #     meas[k, 3] = meas[k, 1]
                            #     meas[k, 4] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0
                            # else:
                            #     meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0
                            #     meas[k, 4] = meas[k, 1]
                            #     meas[k, 3] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
                            u0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
                            u2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.
Arnaud WATLET's avatar
Arnaud WATLET committed
                            u = np.max([u0, u2]) * (np.heaviside(u0 - u2, 1.) * 2 - 1.) - self.vmn_offset
                            meas[k, 1] = u
                            meas[k, 3] = u0
                            meas[k, 4] = u2 *-1.0
Clement Remi's avatar
Clement Remi committed
                        elif self.board_version == '22.10':
                            meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                        time.sleep(sampling_interval / 1000)
                        dt = time.time() - start_delay  # real injection time (s)
                        meas[k, 2] = time.time() - start_time
                        if dt > (injection_duration - 0 * sampling_interval / 1000.):
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                            break
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    # stop current injection
                    self.pin0.value = False
                    self.pin1.value = False
                        self.pin6.value = False  # IHM current injection led on
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    end_delay = time.time()

Arnaud WATLET's avatar
Arnaud WATLET committed
                    # truncate the meas array if we didn't fill the last samples  #TODO: check why
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                    meas = meas[:k + 1]
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    # measurement of current i and voltage u during off time
                    measpp = np.zeros((int(meas.shape[0] * (1 / duty_cycle - 1)), 5)) * np.nan
Arnaud WATLET's avatar
Arnaud WATLET committed
                    time.sleep(sampling_interval / 1000)
                    start_delay_off = time.time()  # stating measurement time
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    dt = 0
                    for k in range(0, measpp.shape[0]):
                        # reading current value on ADS channels
                        measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000.) / (50 * self.r_shunt)
Clement Remi's avatar
Clement Remi committed
                        if self.board_version == 'mb.2023.0.0':
                            # if pinMN == 0:
                            #     measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
                            #     measpp[k, 3] = measpp[k, 1]
                            #     measpp[k, 4] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0
                            # else:
                            #     measpp[k, 3] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
                            #     measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0
                            #     measpp[k, 4] = measpp[k, 1]
                            u0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
                            u2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.
Arnaud WATLET's avatar
Arnaud WATLET committed
                            u = np.max([u0, u2]) * (np.heaviside(u0 - u2, 1.) * 2 - 1.) - self.vmn_offset
                            measpp[k, 1] = u
                            measpp[k, 3] = u0
                            measpp[k, 4] = u2 * -1.0
Clement Remi's avatar
Clement Remi committed
                        elif self.board_version == '22.10':
                            measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000.
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                        else:
                            self.exec_logger.debug('unknown board')
                        time.sleep(sampling_interval / 1000)
Arnaud WATLET's avatar
Arnaud WATLET committed
                        dt = time.time() - start_delay_off  # real injection time (s)
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                        measpp[k, 2] = time.time() - start_time
                        if dt > (injection_duration - 0 * sampling_interval / 1000.):
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                            break
Arnaud WATLET's avatar
Arnaud WATLET committed
                    end_delay_off = time.time()
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    # truncate the meas array if we didn't fill the last samples
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                    measpp = measpp[:k + 1]
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    # we alternate on which ADS1115 pin we measure because of sign of voltage
                    if pinMN == 0:
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                        pinMN = 2  # noqa
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                        pinMN = 0  # noqa
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    # store data for full wave form
                    fulldata.append(meas)
                    fulldata.append(measpp)

                # TODO get battery voltage and warn if battery is running low
                # TODO send a message on SOH stating the battery level

remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                # let's do some calculation (out of the stacking loop)

                # i_stack = np.empty(2 * nb_stack, dtype=object)
                # vmn_stack = np.empty(2 * nb_stack, dtype=object)
                i_stack, vmn_stack = [], []
                # select appropriate window length to average the readings
                window = int(np.min([f.shape[0] for f in fulldata[::2]]) // 3)
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                for n, meas in enumerate(fulldata[::2]):
                    # take average from the samples per stack, then sum them all
                    # average for the last third of the stacked values
                    #  is done outside the loop
                    i_stack.append(meas[-int(window):, 0])
                    vmn_stack.append(meas[-int(window):, 1])
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                    sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0]))
                    vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1])
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                    if (n % 2) == 0:
                        sum_vmn = sum_vmn - vmn1
                        sum_ps = sum_ps + vmn1
                    else:
                        sum_vmn = sum_vmn + vmn1
                        sum_ps = sum_ps + vmn1

            else:
                sum_i = np.nan
                sum_vmn = np.nan
                sum_ps = np.nan
            if self.idps:
                self.DPS.write_register(0x0000, 0, 2)  # reset to 0 volt
                self.DPS.write_register(0x09, 0)  # DPS5005 off
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
            # reshape full data to an array of good size
            # we need an array of regular size to save in the csv
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                fulldata = np.vstack(fulldata)
                # we create a big enough array given nb_samples, number of
                # half-cycles (1 stack = 2 half-cycles), and twice as we
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                # measure decay as well
                a = np.zeros((nb_stack * self.nb_samples * 2 * 2, 5)) * np.nan
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                a[:fulldata.shape[0], :] = fulldata
                fulldata = a
            else:
                np.array([[]])
            vmn_stack_mean = np.mean(
                [np.diff(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) / 2 for i in range(nb_stack)])
            vmn_std = np.sqrt(np.std(vmn_stack[::2]) ** 2 + np.std(
                vmn_stack[1::2]) ** 2)  # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])])
            i_stack_mean = np.mean(i_stack)
            i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])]))
            r_stack_mean = vmn_stack_mean / i_stack_mean
            r_stack_std = np.sqrt((vmn_std / vmn_stack_mean) ** 2 + (i_std / i_stack_mean) ** 2) * r_stack_mean
            ps_stack_mean = np.mean(
                np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)]))
            # create a dictionary and compute averaged values from all stacks
            # if self.board_version == 'mb.2023.0.0':
            d = {
                "time": datetime.now().isoformat(),
                "A": quad[0],
                "B": quad[1],
                "M": quad[2],
                "N": quad[3],
                "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0.,
                "Vmn [mV]": sum_vmn / (2 * nb_stack),
                "I [mA]": sum_i / (2 * nb_stack),
                "R [ohm]": sum_vmn / sum_i,
                "Ps [mV]": sum_ps / (2 * nb_stack),
                "nbStack": nb_stack,
Arnaud WATLET's avatar
Arnaud WATLET committed
                "Vab [V]": tx_volt if not out_of_range else 0.,
                "CPU temp [degC]": CPUTemperature().temperature,
                "Nb samples [-]": self.nb_samples,
                "fulldata": fulldata,
                "I_stack [mA]": i_stack_mean,
                "I_std [mA]": i_std,
                "I_per_stack [mA]": np.array([np.mean(i_stack[i * 2:i * 2 + 2]) for i in range(nb_stack)]),
                "Vmn_stack [mV]": vmn_stack_mean,
                "Vmn_std [mV]": vmn_std,
                "Vmn_per_stack [mV]": np.array(
                    [np.diff(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1))[0] / 2 for i in range(nb_stack)]),
                "R_stack [ohm]": r_stack_mean,
                "R_std [ohm]": r_stack_std,
                "R_per_stack [ohm]": np.mean(
                    [np.diff(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) / 2 for i in range(nb_stack)]) / np.array(
                    [np.mean(i_stack[i * 2:i * 2 + 2]) for i in range(nb_stack)]),
                "PS_per_stack [mV]": np.array(
                    [np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)]),
                "PS_stack [mV]": ps_stack_mean,
Arnaud WATLET's avatar
Arnaud WATLET committed
                "Rab [ohm]": Rab,
                "Pab [W]": tx_volt * i_stack_mean/1000.,
                "Gain_Vmn": gain,
                "Tx_battery [V]":self._read_battery_level()
            # print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)]))
            # elif self.board_version == '22.10':
            #     d = {
            #         "time": datetime.now().isoformat(),
            #         "A": quad[0],
            #         "B": quad[1],
            #         "M": quad[2],
            #         "N": quad[3],
            #         "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0.,
            #         "Vmn [mV]": sum_vmn / (2 * nb_stack),
            #         "I [mA]": sum_i / (2 * nb_stack),
            #         "R [ohm]": sum_vmn / sum_i,
            #         "Ps [mV]": sum_ps / (2 * nb_stack),
            #         "nbStack": nb_stack,
            #         "Tx [V]": tx_volt if not out_of_range else 0.,
            #         "CPU temp [degC]": CPUTemperature().temperature,
            #         "Nb samples [-]": self.nb_samples,
            #         "fulldata": fulldata,
            #     }
        else:  # for testing, generate random data
            d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3],
                'R [ohm]': np.abs(np.random.randn(1)).tolist()}
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
        # to the data logger
        dd = d.copy()
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
        dd.pop('fulldata')  # too much for logger
        dd.update({'A': str(dd['A'])})
        dd.update({'B': str(dd['B'])})
        dd.update({'M': str(dd['M'])})
        dd.update({'N': str(dd['N'])})

        # round float to 2 decimal
        for key in dd.keys():
            if isinstance(dd[key], float):
                dd[key] = np.round(dd[key], 3)

        dd['cmd_id'] = str(cmd_id)
        self.data_logger.info(dd)
        self.pin5.value = False  # IHM led on measurement off
        if self.sequence is None:
            self.switch_dps('off')

    def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs):
        """Runs multiple sequences in a separate thread for monitoring mode.
           Can be stopped by 'OhmPi.interrupt()'.
           Additional arguments are passed to run_measurement().

        Parameters
        ----------
        cmd_id : str, optional
            Unique command identifier
        sequence_delay : int, optional
            Number of seconds at which the sequence must be started from each others.
        nb_meas : int, optional
            Number of time the sequence must be repeated.
        kwargs : dict, optional
            See help(k.run_measurement) for more info.
        """
        # self.run = True
        if sequence_delay is None:
            sequence_delay = self.settings['sequence_delay']
        sequence_delay = int(sequence_delay)
        if nb_meas is None:
            nb_meas = self.settings['nb_meas']
        self.status = 'running'
        self.exec_logger.debug(f'Status: {self.status}')
        self.exec_logger.debug(f'Measuring sequence: {self.sequence}')

        def func():
            for g in range(0, nb_meas):  # for time-lapse monitoring
                if self.status == 'stopping':
                    self.exec_logger.warning('Data acquisition interrupted')
                    break
                t0 = time.time()
                self.run_sequence(**kwargs)

                # sleeping time between sequence
                dt = sequence_delay - (time.time() - t0)
                if dt < 0:
                    dt = 0
                if nb_meas > 1:
                    time.sleep(dt)  # waiting for next measurement (time-lapse)
            self.status = 'idle'
        self.thread = threading.Thread(target=func)
        self.thread.start()

Arnaud WATLET's avatar
Arnaud WATLET committed
    def run_sequence(self, cmd_id=None, plot_realtime_fulldata=False, plot_ads=False, **kwargs):
        """Runs sequence synchronously (=blocking on main thread).
           Additional arguments are passed to run_measurement().

        Parameters
        ----------
        cmd_id : str, optional
            Unique command identifier
        """
        self.status = 'running'
        self.exec_logger.debug(f'Status: {self.status}')
        self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
        t0 = time.time()
        # create filename with timestamp
        filename = self.settings["export_path"].replace('.csv',
                                                        f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
        self.exec_logger.debug(f'Saving to {filename}')

        # make sure all multiplexer are off

        # measure all quadrupole of the sequence
        if self.sequence is None:
            n = 1
        else:
            n = self.sequence.shape[0]
        for i in range(0, n):
            if self.sequence is None:
                quad = np.array([0, 0, 0, 0])
            else:
                quad = self.sequence[i, :]  # quadrupole
            if self.status == 'stopping':
                break
            if i == 0:
                # call the switch_mux function to switch to the right electrodes
                # switch on DPS
                self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address)
                self.pin2 = self.mcp_board.get_pin(2) # dsp -
                self.pin2.direction = Direction.OUTPUT
                self.pin2.value = True
                self.pin3 = self.mcp_board.get_pin(3) # dsp -
                self.pin3.direction = Direction.OUTPUT
                self.pin3.value = True
                time.sleep (4)
                #self.switch_dps('on')
            time.sleep(.6)
            self.switch_mux_on(quad)
            # run a measurement
            if self.on_pi:
                acquired_data = self.run_measurement(quad, **kwargs)
            else:  # for testing, generate random data
                sum_vmn = np.random.rand(1)[0] * 1000.
                sum_i = np.random.rand(1)[0] * 100.
                cmd_id = np.random.randint(1000)
                    "time": datetime.now().isoformat(),
                    "A": quad[0],
                    "B": quad[1],
                    "M": quad[2],
                    "N": quad[3],
                    "inj time [ms]": self.settings['injection_duration'] * 1000.,
                    "Vmn [mV]": sum_vmn,
                    "I [mA]": sum_i,
                    "R [ohm]": sum_vmn / sum_i,
                    "Ps [mV]": np.random.randn(1)[0] * 100.,
                    "nbStack": self.settings['nb_stack'],
                    "Tx [V]": np.random.randn(1)[0] * 5.,
                    "CPU temp [degC]": np.random.randn(1)[0] * 50.,
                    "Nb samples [-]": self.nb_samples,
                self.data_logger.info(acquired_data)

            # switch mux off
            self.switch_mux_off(quad)
            # add command_id in dataset
            acquired_data.update({'cmd_id': cmd_id})
            # log data to the data logger
Guillaume Blanchy's avatar
Guillaume Blanchy committed
            # self.data_logger.info(f'{acquired_data}')
            # save data and print in a text file
            self.append_and_save(filename, acquired_data)
            self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}')
            if plot_realtime_fulldata:
                realtime_plot_window = 10
                plt.ion()
                last_measurement = acquired_data["fulldata"][~np.isnan(acquired_data["fulldata"][:, 2])]
                if i==0:
                    xlim = [last_measurement[:, 2][-1] - realtime_plot_window, last_measurement[:, 2][-1]]
Arnaud WATLET's avatar
Arnaud WATLET committed
                    fig, (ax1, ax2), lines = plot_fulldata(last_measurement, realtime=True, xlim=xlim, plot_ads=plot_ads)
Arnaud WATLET's avatar
Arnaud WATLET committed
                    fig, (ax1, ax2), lines, acquired_dataset = \
                            update_realtime_fulldata_plot(last_measurement, acquired_dataset, lines,
                                                          (ax1, ax2), fig, x_window=realtime_plot_window,plot_ads=plot_ads)
        self.switch_dps('off')
Arnaud WATLET's avatar
Arnaud WATLET committed
        if plot_realtime_fulldata:
Rémi's avatar
Rémi committed
            return fig,(ax1,ax2), (line1,line2), filename, acquired_dataset
        else:
            return filename
        
    def run_sequence_async(self, cmd_id=None, **kwargs):
        """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
            Additional arguments are passed to run_measurement().

        Parameters
        ----------
        cmd_id : str, optional
            Unique command identifier
        """

        def func():
            self.run_sequence(**kwargs)

        self.thread = threading.Thread(target=func)
        self.thread.start()
        self.status = 'idle'

    def rs_check(self, tx_volt=12., cmd_id=None):
        """Checks contact resistances

        Parameters
        ----------
        tx_volt : float
            Voltage of the injection
        cmd_id : str, optional
            Unique command identifier
        """
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        # create custom sequence where MN == AB
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        # we only check the electrodes which are in the sequence (not all might be connected)
Olivier Kaufmann's avatar
Olivier Kaufmann committed
        if self.sequence is None or not self.use_mux:
            quads = np.array([[1, 2, 1, 2]], dtype=np.uint32)
        else:
            elec = np.sort(np.unique(self.sequence.flatten()))  # assumed order
            quads = np.vstack([
                elec[:-1],
                elec[1:],
                elec[:-1],
                elec[1:],
            ]).T
        if self.idps:
            quads[:, 2:] = 0  # we don't open Vmn to prevent burning the MN part
            # as it has a smaller range of accepted voltage
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        # create filename to store RS
        export_path_rs = self.settings['export_path'].replace('.csv', '') \
                         + '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '_rs.csv'
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        # perform RS check
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        self.status = 'running'
        if self.on_pi:
            # make sure all mux are off to start with
            self.reset_mux()
Guillaume Blanchy's avatar
Guillaume Blanchy committed

            # measure all quad of the RS sequence
            for i in range(0, quads.shape[0]):
                quad = quads[i, :]  # quadrupole
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                self.switch_mux_on(quad)  # put before raising the pins (otherwise conflict i2c)
                d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=0.2, tx_volt=tx_volt, autogain=False)
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                if self.idps:
                    voltage = tx_volt * 1000.  # imposed voltage on dps5005
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                else:
                    voltage = d['Vmn [mV]']
                current = d['I [mA]']
                # compute resistance measured (= contact resistance)
                resist = abs(voltage / current) / 1000.
                # print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format(
rpi2.0's avatar
rpi2.0 committed
                #    current, voltage, resist))
                msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                      f'V: {voltage :>10.3f} mV, ' \
                      f'R: {resist :>10.3f} kOhm'

                self.exec_logger.debug(msg)

                # if contact resistance = 0 -> we have a short circuit!!
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                if resist < 1e-5:
                    msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm'
                    self.exec_logger.warning(msg)
                # save data in a text file
                self.append_and_save(export_path_rs, {
                    'A': quad[0],
                    'B': quad[1],
                    'RS [kOhm]': resist,
                })

                # close mux path and put pin back to GND
                self.switch_mux_off(quad)
        else:
            pass
        self.status = 'idle'
    #
    #         # TODO if interrupted, we would need to restore the values
    #         # TODO or we offer the possibility in 'run_measurement' to have rs_check each time?

    def set_sequence(self, sequence=None, cmd_id=None):
        """Sets the sequence to acquire

        Parameters
        ----------
        sequence : list, str
            sequence of quadrupoles
        cmd_id: str, optional
            Unique command identifier
        """
            self.sequence = np.array(sequence).astype(int)
            # self.sequence = np.loadtxt(StringIO(sequence)).astype('uint32')
            status = True
        except Exception as e:
            self.exec_logger.warning(f'Unable to set sequence: {e}')
            status = False

    def stop(self, **kwargs):
        warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning)
        self.interrupt(**kwargs)

    def _switch_mux(self, electrode_nr, state, role):
        """Selects the right channel for the multiplexer cascade for a given electrode.
        Parameters
        ----------
        electrode_nr : int
            Electrode index to be switched on or off.
        state : str
            Either 'on' or 'off'.
        role : str
            Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board.
        """
        if not self.use_mux or not self.on_pi:
            if not self.on_pi:
                self.exec_logger.warning('Cannot reset mux while in simulation mode...')
                self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
                                         ' Set use_mux to True to use the multiplexer...')
        elif self.sequence is None and not self.use_mux:
            self.exec_logger.warning('Unable to switch MUX without a sequence')
        else:
            # choose with MUX board
            tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role])
            # find I2C address of the electrode and corresponding relay
            # considering that one MCP23017 can cover 16 electrodes
            i2c_address = 7 - (electrode_nr - 1) // 16  # quotient without rest of the division
            relay_nr = (electrode_nr-1) - ((electrode_nr-1) // 16) * 16
            if i2c_address is not None:
                # select the MCP23017 of the selected MUX board
                mcp2 = MCP23017(tca[i2c_address])
rpi2.0's avatar
rpi2.0 committed
                mcp2.get_pin(relay_nr).direction = digitalio.Direction.OUTPUT
rpi2.0's avatar
rpi2.0 committed
                    mcp2.get_pin(relay_nr).value = True
rpi2.0's avatar
rpi2.0 committed
                    mcp2.get_pin(relay_nr).value = False
                self.exec_logger.debug(f'Switching relay {relay_nr} '
                                       f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}')
            else:
                self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
    def switch_dps(self,state='off'):
        """Switches DPS on or off.

            Parameters
            ----------
            state : str
                'on', 'off'
            """
Arnaud WATLET's avatar
Arnaud WATLET committed
        self.pin2 = self.mcp_board.get_pin(2) # dsp -
        self.pin2.direction = Direction.OUTPUT
Arnaud WATLET's avatar
Arnaud WATLET committed
        self.pin3 = self.mcp_board.get_pin(3) # dsp -
        self.pin3.direction = Direction.OUTPUT
        if state == 'on':
            self.pin2.value = True
            self.pin3.value = True
            self.exec_logger.debug(f'Switching DPS on')
            time.sleep(4)
        elif state == 'off':
            self.pin2.value = False
            self.pin3.value = False
            self.exec_logger.debug(f'Switching DPS off')


    def switch_mux_on(self, quadrupole, cmd_id=None):
        """Switches on multiplexer relays for given quadrupole.
        cmd_id : str, optional
            Unique command identifier
        quadrupole : list of 4 int
            List of 4 integers representing the electrode numbers.
        roles = ['A', 'B', 'M', 'N']
        # another check to be sure A != B
        if quadrupole[0] != quadrupole[1]:
            for i in range(0, 4):
                if quadrupole[i] > 0:
                    self._switch_mux(quadrupole[i], 'on', roles[i])
        else:
            self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!')
Rémi's avatar
Rémi committed
    
    def ohmpi_to_bert(self,fname,abmn_file,coord_file):
        """Export data to BERT format.

        Parameters
        ----------
        """
        
        abmn = np.loadtxt(abmn_file)
        nbr_abmn = len(abmn)
        data = np.loadtxt(fname, comments = '#', delimiter = ',',
                converters = None, skiprows = 1, usecols = [1,2,3,4,6,7], unpack = False,
                ndmin = 0, encoding = 'bytes', max_rows = None)
        coord = np.loadtxt(coord_file)
        with open(fname +'data.dat','w') as rho_data:
            rho_data.write(str(len(coord)))
            rho_data.write('\n')
            rho_data.write('# x y z')
            rho_data.write('\n')
            np.savetxt(rho_data,coord,delimiter=' ',fmt='%1.3f')
            rho_data.write(str(len(data)))
            rho_data.write('\n')
            rho_data.write('# a b m n u i ')
            rho_data.write('\n')
            np.savetxt(rho_data,data, fmt='%i %i %i %i %1.3f %1.3f')  
        
        
    def switch_mux_off(self, quadrupole, cmd_id=None):
        """Switches off multiplexer relays for given quadrupole.
Olivier Kaufmann's avatar
Olivier Kaufmann committed

        Parameters
        ----------
        cmd_id : str, optional
            Unique command identifier
        quadrupole : list of 4 int
            List of 4 integers representing the electrode numbers.
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        """
        roles = ['A', 'B', 'M', 'N']
        for i in range(0, 4):
            if quadrupole[i] > 0:
                self._switch_mux(quadrupole[i], 'off', roles[i])
Rémi's avatar
Rémi committed
    def test_led(self):
        """Interactive method to test the multiplexer."""
        self.mcp_board = MCP23008(self.i2c, address=self.mcp_board_address)
        self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run
        self.pin4.direction = Direction.OUTPUT
        self.pin5 = self.mcp_board.get_pin(5) # measurement_run
        self.pin5.direction = Direction.OUTPUT
        self.pin6 = self.mcp_board.get_pin(6) # stack_run
        self.pin6.direction = Direction.OUTPUT
        self.pin7 = self.mcp_board.get_pin(7) # battery_off
        self.pin7.direction = Direction.OUTPUT
Rémi's avatar
Rémi committed
        print('led on test')
Rémi's avatar
Rémi committed
        for i in range(30):
Rémi's avatar
Rémi committed
            print('Led on')
            self.exec_logger.debug(f'Led on')
Rémi's avatar
Rémi committed
            self.pin4.value = True
            self.pin5.value = True
            self.pin6.value = True
            self.pin7.value = True
            time.sleep(0.5)
Rémi's avatar
Rémi committed
            print('Led off')
            self.exec_logger.debug(f'Led off')
Rémi's avatar
Rémi committed
            self.pin4.value = False
            self.pin5.value = False
            self.pin6.value = False
            self.pin7.value = False
            time.sleep(0.5)
            

    def test_mux(self, activation_time=1.0, address=0x70):
        """Interactive method to test the multiplexer.

        Parameters
        ----------
        activation_time : float, optional
            Time in seconds during which the relays are activated.
        address : hex, optional
            Address of the multiplexer board to test (e.g. 0x70, 0x71, ...).
        """
        self.use_mux = True
        self.reset_mux()

        # choose with MUX board
        tca = adafruit_tca9548a.TCA9548A(self.i2c, address)

        # ask use some details on how to proceed
        a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!')
            print('run channel by channel test')
            electrode = int(input('Choose your electrode number (integer):'))
            electrodes = [electrode]
        elif a == '2':
            electrodes = range(1, 65)
        else:
            print('Wrong choice !')
            return

            # run the test
        for electrode_nr in electrodes:
            # find I2C address of the electrode and corresponding relay
            # considering that one MCP23017 can cover 16 electrodes
            i2c_address = 7 - (electrode_nr - 1) // 16  # quotient without rest of the division
            relay_nr = (electrode_nr-1) - ((electrode_nr-1) // 16) * 16


            if i2c_address is not None:
                # select the MCP23017 of the selected MUX board
                mcp2 = MCP23017(tca[i2c_address])
                mcp2.get_pin(relay_nr).direction = digitalio.Direction.OUTPUT
                mcp2.get_pin(relay_nr).value = True
                print('electrode:', electrode_nr, ' activated...', end='', flush=True)
                time.sleep(activation_time)
                mcp2.get_pin(relay_nr).value = False
                print(' deactivated')
                time.sleep(activation_time)
        """Switches off all multiplexer relays.

        Parameters
        ----------
        cmd_id : str, optional
            Unique command identifier
        """
        if self.on_pi and self.use_mux:
            roles = ['A', 'B', 'M', 'N']
            for i in range(0, 4):
                for j in range(1, self.max_elec + 1):
                    self._switch_mux(j, 'off', roles[i])
            self.exec_logger.debug('All MUX switched off.')
        elif not self.on_pi:
            self.exec_logger.warning('Cannot reset mux while in simulation mode...')
        else:
            self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
                                     ' Set use_mux to True to use the multiplexer...')
    def _update_acquisition_settings(self, config):
        warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning)
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        self.update_settings(settings=config)
Guillaume Blanchy's avatar
Guillaume Blanchy committed
    def update_settings(self, settings: str, cmd_id=None):
        """Updates acquisition settings from a json file or dictionary.
        Parameters can be:
            - nb_electrodes (number of electrode used, if 4, no MUX needed)
            - injection_duration (in seconds)
            - nb_meas (total number of times the sequence will be run)
            - sequence_delay (delay in second between each sequence run)
            - nb_stack (number of stack for each quadrupole measurement)
            - export_path (path where to export the data, timestamp will be added to filename)
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        settings : str, dict
            Path to the .json settings file or dictionary of settings.
        cmd_id : str, optional
            Unique command identifier
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        if settings is not None:
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                if isinstance(settings, dict):
                    self.settings.update(settings)
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                    with open(settings) as json_file:
                        dic = json.load(json_file)
                    self.settings.update(dic)
                self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings))
                status = True
            except Exception as e:  # noqa
                self.exec_logger.warning('Unable to update settings.')
                status = False
            self.exec_logger.warning('Settings are missing...')
        return status
    # Properties
    @property
    def sequence(self):
        """Gets sequence"""
        if self._sequence is not None:
            assert isinstance(self._sequence, np.ndarray)
        return self._sequence
    @sequence.setter
    def sequence(self, sequence):
        """Sets sequence"""
        if sequence is not None:
            assert isinstance(sequence, np.ndarray)
            self.use_mux = True
        else:
            self.use_mux = False
        self._sequence = sequence
Olivier Kaufmann's avatar
Olivier Kaufmann committed
VERSION = '2.1.5'

print(colored(r' ________________________________' + '\n' +
              r'|  _  | | | ||  \/  || ___ \_   _|' + '\n' +
              r'| | | | |_| || .  . || |_/ / | |' + '\n' +
              r'| | | |  _  || |\/| ||  __/  | |' + '\n' +
              r'\ \_/ / | | || |  | || |    _| |_' + '\n' +
              r' \___/\_| |_/\_|  |_/\_|    \___/ ', 'red'))
print('Version:', VERSION)
if on_pi:
    print(colored(f'\u2611 Running on {platform} platform', 'green'))
    # TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...)
    #       and emit a warning otherwise
    if not arm64_imports:
        print(colored(f'Warning: Required packages are missing.\n'
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                      f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow'))
    print(colored(f'\u26A0 Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow'))

current_time = datetime.now()
print(f'local date and time : {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
# for testing
if __name__ == "__main__":
    ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
    if ohmpi.controller is not None:
        ohmpi.controller.loop_forever()