ohmpi.py 54 KB
Newer Older
        if self.idps:
            quads[:, 2:] = 0  # we don't open Vmn to prevent burning the MN part
            # as it has a smaller range of accepted voltage
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        # create filename to store RS
        export_path_rs = self.settings['export_path'].replace('.csv', '') \
                         + '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '_rs.csv'
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        # perform RS check
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        self.status = 'running'
        if self.on_pi:
            # make sure all mux are off to start with
            self.reset_mux()
Guillaume Blanchy's avatar
Guillaume Blanchy committed

            # measure all quad of the RS sequence
            for i in range(0, quads.shape[0]):
                quad = quads[i, :]  # quadrupole
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                self.switch_mux_on(quad)  # put before raising the pins (otherwise conflict i2c)
                d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=0.2, tx_volt=tx_volt, autogain=False)
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                if self.idps:
                    voltage = tx_volt * 1000.  # imposed voltage on dps5005
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                else:
                    voltage = d['Vmn [mV]']
                current = d['I [mA]']
                # compute resistance measured (= contact resistance)
                resist = abs(voltage / current) / 1000.
                # print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format(
rpi2.0's avatar
rpi2.0 committed
                #    current, voltage, resist))
                msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \
                      f'V: {voltage :>10.3f} mV, ' \
                      f'R: {resist :>10.3f} kOhm'
                # if contact resistance = 0 -> we have a short circuit!!
                if resist < 1e-5:
                    msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm'
                    self.exec_logger.warning(msg)

                # save data in a text file
                self.append_and_save(export_path_rs, {
                    'A': quad[0],
                    'B': quad[1],
                    'RS [kOhm]': resist,
                })

                # close mux path and put pin back to GND
                self.switch_mux_off(quad)
        else:
            pass
        self.status = 'idle'
    #
    #         # TODO if interrupted, we would need to restore the values
    #         # TODO or we offer the possibility in 'run_measurement' to have rs_check each time?

    def set_sequence(self, sequence=None, cmd_id=None):
        """Stores a measurement sequence on the instance.

        Parameters
        ----------
        sequence : array-like of int, optional
            Sequence to store. It is converted with ``np.array(sequence).astype(int)``
            before being assigned to ``self.sequence``.
        cmd_id : optional
            Accepted but not used by this method.

        Returns
        -------
        bool
            True if the sequence was converted and assigned, False if that failed
            (a warning with the error is logged in that case).
        """
        try:
            self.sequence = np.array(sequence).astype(int)
            # self.sequence = np.loadtxt(StringIO(sequence)).astype('uint32')
            status = True
        except Exception as e:
            # best effort: report the problem through the logger instead of raising
            self.exec_logger.warning(f'Unable to set sequence: {e}')
            status = False
        return status

        warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning)
    def _switch_mux(self, electrode_nr, state, role):
        """Selects the right channel for the multiplexer cascade for a given electrode.
        Parameters
        ----------
        electrode_nr : int
            Electrode index to be switched on or off.
        state : str
            Either 'on' or 'off'.
        role : str
            Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board.
        """
        if not self.use_mux or not self.on_pi:
            if not self.on_pi:
                self.exec_logger.warning('Cannot reset mux while in simulation mode...')
                self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
                                         ' Set use_mux to True to use the multiplexer...')
        elif self.sequence is None:
            self.exec_logger.warning('Unable to switch MUX without a sequence')
        else:
            # choose with MUX board
            tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role])
            # find I2C address of the electrode and corresponding relay
            # considering that one MCP23017 can cover 16 electrodes
            i2c_address = 7 - (electrode_nr - 1) // 16  # quotient without rest of the division
            relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1
            if i2c_address is not None:
                # select the MCP23017 of the selected MUX board
                mcp2 = MCP23017(tca[i2c_address])
                mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT
                if state == 'on':
                    mcp2.get_pin(relay_nr - 1).value = True
                else:
                    mcp2.get_pin(relay_nr - 1).value = False
                self.exec_logger.debug(f'Switching relay {relay_nr} '
                                       f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}')
            else:
                self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
    def switch_mux_on(self, quadrupole, cmd_id=None):
        """Switches on multiplexer relays for given quadrupole.

        Nothing is switched, and an error is logged, when electrodes A and B are
        identical, because closing both relays would short the injection circuit.

        Parameters
        ----------
        quadrupole : list of 4 int
            List of 4 integers representing the electrode numbers.
            Entries that are not strictly positive are skipped.
        cmd_id : optional
            Accepted but not used by this method.
        """
        roles = ['A', 'B', 'M', 'N']
        # another check to be sure A != B
        if quadrupole[0] != quadrupole[1]:
            for i, role in enumerate(roles):
                if quadrupole[i] > 0:
                    self._switch_mux(quadrupole[i], 'on', role)
        else:
            self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!')
    def switch_mux_off(self, quadrupole, cmd_id=None):
        """Releases the multiplexer relays of every electrode of a quadrupole.

        Parameters
        ----------
        quadrupole : list of 4 int
            Electrode numbers, matched by position to the roles A, B, M and N.
            Entries that are not strictly positive are skipped.
        cmd_id : optional
            Accepted but not used by this method.
        """
        for position, role in enumerate(('A', 'B', 'M', 'N')):
            electrode = quadrupole[position]
            if electrode > 0:
                self._switch_mux(electrode, 'off', role)
    def test_mux(self, activation_time=1.0, address=0x70):
        """Interactive method to test the multiplexer.

        Parameters
        ----------
        activation_time : float, optional
            Time in seconds during which the relays are activated.
        address : hex, optional
            Address of the multiplexer board to test (e.g. 0x70, 0x71, ...).
        """
        self.use_mux = True
        self.reset_mux()

        # choose with MUX board
        tca = adafruit_tca9548a.TCA9548A(self.i2c, address)

        # ask use some details on how to proceed
        a = input(' if vous want try 1 channel choose 1, if you want try all channel choose 2!') 
        if a == '1':
            print("run channel by channel test") 
            electrode = int(input('Choose your electrode number (integer):')) 
            electrodes = [electrode]
        elif a == '2':
            electrodes = range(1, 65)
        else:
            print ("Wrong choice !")
            return 
        
        # run the test
        for electrode_nr in electrodes:
            # find I2C address of the electrode and corresponding relay
            # considering that one MCP23017 can cover 16 electrodes
            i2c_address = 7 - (electrode_nr - 1) // 16  # quotient without rest of the division
            relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1

            if i2c_address is not None:
                # select the MCP23017 of the selected MUX board
                mcp2 = MCP23017(tca[i2c_address])
                mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT

                # activate relay for given time    
                mcp2.get_pin(relay_nr - 1).value = True
                print('electrode:', electrode_nr, ' activated...', end='', flush=True) 
                time.sleep(activation_time) 
                mcp2.get_pin(relay_nr - 1).value = False
                print(' deactivated' ) 
                time.sleep(activation_time) 
        print('Test finished.')

        """Switches off all multiplexer relays."""
        if self.on_pi and self.use_mux:
            roles = ['A', 'B', 'M', 'N']
            for i in range(0, 4):
                for j in range(1, self.max_elec + 1):
                    self._switch_mux(j, 'off', roles[i])
            self.exec_logger.debug('All MUX switched off.')
        elif not self.on_pi:
            self.exec_logger.warning('Cannot reset mux while in simulation mode...')
        else:
            self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
                                     ' Set use_mux to True to use the multiplexer...')
    def _update_acquisition_settings(self, config):
        warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning)
        self.update_settings(settings=config)
    def update_settings(self, settings: str, cmd_id=None):
        """Updates acquisition settings from a json file or dictionary.

        Parameters can be:
            - nb_electrodes (number of electrode used, if 4, no MUX needed)
            - injection_duration (in seconds)
            - nb_meas (total number of times the sequence will be run)
            - sequence_delay (delay in second between each sequence run)
            - nb_stack (number of stack for each quadrupole measurement)
            - export_path (path where to export the data, timestamp will be added to filename)

        Parameters
        ----------
        settings : str, dict
            Path to the .json settings file or dictionary of settings.
        cmd_id : optional
            Accepted but not used by this method.

        Returns
        -------
        bool
            True if ``self.settings`` was updated, False if ``settings`` is None
            or could not be read (a warning is logged in both cases).
        """
        status = False
        if settings is not None:
            try:
                if isinstance(settings, dict):
                    self.settings.update(settings)
                else:
                    # anything that is not a dict is treated as a path to a json file
                    with open(settings) as json_file:
                        dic = json.load(json_file)
                    self.settings.update(dic)
                self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings))
                status = True
            except Exception:
                # best effort: an unreadable or malformed file must not stop the caller
                self.exec_logger.warning('Unable to update settings.')
                status = False
        else:
            self.exec_logger.warning('Settings are missing...')
        return status
    # Properties
    @property
    def sequence(self):
        """Returns the stored sequence (None or a numpy array)."""
        current = self._sequence
        # a stored sequence is expected to always be a numpy array
        assert current is None or isinstance(current, np.ndarray)
        return current

    @sequence.setter
    def sequence(self, sequence):
        """Stores the sequence and enables the multiplexer only when one is given."""
        has_sequence = sequence is not None
        if has_sequence:
            assert isinstance(sequence, np.ndarray)
        # the multiplexer is used exactly when a sequence is set
        self.use_mux = has_sequence
        self._sequence = sequence
# --- module version and start-up banner ---
VERSION = '2.1.5'

# start-up banner, printed in red
print(colored(r' ________________________________' + '\n' +
              r'|  _  | | | ||  \/  || ___ \_   _|' + '\n' +
              r'| | | | |_| || .  . || |_/ / | |' + '\n' +
              r'| | | |  _  || |\/| ||  __/  | |' + '\n' +
              r'\ \_/ / | | || |  | || |    _| |_' + '\n' +
              r' \___/\_| |_/\_|  |_/\_|    \___/ ', 'red'))
print('Version:', VERSION)
# report the platform: exactly one of the two branches below is printed
if on_pi:
    print(colored(f'\u2611 Running on {platform} platform', 'green'))
    # TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...)
    #       and emit a warning otherwise
    if not arm64_imports:
        print(colored(f'Warning: Required packages are missing.\n'
                      f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow'))
else:
    print(colored(f'\u26A0 Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow'))

current_time = datetime.now()
print(f'local date and time : {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
# for testing: build an OhmPi from the configured settings and, when it has a
# controller, block in the controller loop
if __name__ == "__main__":
    ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
    controller = ohmpi.controller
    if controller is not None:
        controller.loop_forever()