ohmpi.py 53.5 KB
Newer Older
            args = decoded_message.pop('args', None)
            status = False
            e = None
            if cmd is not None and cmd_id is not None:
                if cmd == 'update_settings' and args is not None:
                    self.update_settings(args)
                    status = True
                elif cmd == 'set_sequence' and args is not None:
                    try:
                        self.sequence = np.loadtxt(StringIO(args)).astype('uint32')
                        status = True
                    except Exception as e:
                        self.exec_logger.warning(f'Unable to set sequence: {e}')
                        status = False
                elif cmd == 'run_sequence':
                    self.run_sequence(cmd_id)
                    while not self.status == 'idle':
                        time.sleep(0.1)
                    status = True
                elif cmd == 'interrupt':
                    self.interrupt()
                        status = True
                    except Exception as e:
                        self.exec_logger.warning(f'Unable to load sequence: {e}')
                        status = False
                elif cmd == 'rs_check':
                    try:
                        self.rs_check()
                        status = True
                    except Exception as e:
                        print('error====', e)
                        self.exec_logger.warning(f'Unable to run rs-check: {e}')
                    self.exec_logger.warning(f'Unknown command {cmd} - cmd_id: {cmd_id}')
        except Exception as e:
            self.exec_logger.warning(f'Unable to decode command {command}: {e}')
            status = False
        finally:
            reply = {'cmd_id': cmd_id, 'status': status}
            reply = json.dumps(reply)
            self.exec_logger.debug(f'Execution report: {reply}')
    def measure(self, *args, **kwargs):
        warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning)
        self.run_sequence(self, *args, **kwargs)

    def set_sequence(self, args):
        try:
            self.sequence = np.loadtxt(StringIO(args)).astype('uint32')
            status = True
        except Exception as e:
            self.exec_logger.warning(f'Unable to set sequence: {e}')
            status = False

    def run_sequence(self, cmd_id=None, **kwargs):
        """Run sequence in sync mode
        """
        self.status = 'running'
        self.exec_logger.debug(f'Status: {self.status}')
        self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
        
        t0 = time.time()

        # create filename with timestamp
        filename = self.settings["export_path"].replace('.csv',
                                                        f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
        self.exec_logger.debug(f'Saving to {filename}')

        # make sure all multiplexer are off
        self.reset_mux()

        # measure all quadrupole of the sequence
        if self.sequence is None:
            n = 1
        else:
            n = self.sequence.shape[0]
        for i in range(0, n):
            if self.sequence is None:
                quad = np.array([0, 0, 0, 0])
            else:
                quad = self.sequence[i, :]  # quadrupole
            if self.status == 'stopping':
                break

            # call the switch_mux function to switch to the right electrodes
            self.switch_mux_on(quad)

            # run a measurement
            if self.on_pi:
                acquired_data = self.run_measurement(quad, **kwargs)
            else:  # for testing, generate random data
                acquired_data = {
                    'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
                    'R [ohm]': np.abs(np.random.randn(1))
                }

            # switch mux off
            self.switch_mux_off(quad)

            # add command_id in dataset
            acquired_data.update({'cmd_id': cmd_id})
            # log data to the data logger
            self.data_logger.info(f'{acquired_data}')
            print(f'{acquired_data}')
            # save data and print in a text file
            self.append_and_save(filename, acquired_data)
            self.exec_logger.debug(f'{i+1:d}/{n:d}')

        self.status = 'idle'

    def run_sequence_async(self, cmd_id=None, **kwargs):
        """ Run the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
        """
        # self.run = True
        self.status = 'running'
        self.exec_logger.debug(f'Status: {self.status}')
        self.exec_logger.debug(f'Measuring sequence: {self.sequence}')

        def func():
            # if self.status != 'running':
            #    self.exec_logger.warning('Data acquisition interrupted')
            #    break
            t0 = time.time()

            # create filename with timestamp
            filename = self.settings["export_path"].replace('.csv',
                                                            f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
            self.exec_logger.debug(f'Saving to {filename}')

            # make sure all multiplexer are off
            self.reset_mux()

            # measure all quadrupole of the sequence
            if self.sequence is None:
                n = 1
            else:
                n = self.sequence.shape[0]
            for i in range(0, n):
                if self.sequence is None:
                    quad = np.array([0, 0, 0, 0])
                else:
                    quad = self.sequence[i, :]  # quadrupole
                if self.status == 'stopping':
                    break

                # call the switch_mux function to switch to the right electrodes
                self.switch_mux_on(quad)

                # run a measurement
                if self.on_pi:
                    acquired_data = self.run_measurement(quad, **kwargs)
                else:  # for testing, generate random data
                    acquired_data = {
                        'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
                        'R [ohm]': np.abs(np.random.randn(1))
                    }

                # switch mux off
                self.switch_mux_off(quad)

                # add command_id in dataset
                acquired_data.update({'cmd_id': cmd_id})
                # log data to the data logger
                self.data_logger.info(f'{acquired_data}')
                print(f'{acquired_data}')
                # save data and print in a text file
                self.append_and_save(filename, acquired_data)
                self.exec_logger.debug(f'{i+1:d}/{n:d}')

        self.status = 'idle'

        self.thread = threading.Thread(target=func)
        self.thread.start()
        
    def run_multiple_sequences(self, cmd_id=None, **kwargs):
        """ Run multiple sequences in a separate thread for monitoring mode.
            Can be stopped by 'OhmPi.interrupt()'.
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        """
        self.status = 'running'
        self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
            for g in range(0, self.settings["nb_meas"]): # for time-lapse monitoring
                    self.exec_logger.warning('Data acquisition interrupted')
                    break
                t0 = time.time()
Guillaume Blanchy's avatar
Guillaume Blanchy committed

                # create filename with timestamp
                filename = self.settings["export_path"].replace('.csv',
                                                                f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
Guillaume Blanchy's avatar
Guillaume Blanchy committed

                # make sure all multiplexer are off
                self.reset_mux()

                # measure all quadrupole of the sequence
                if self.sequence is None:
                    n = 1
                else:
                    n = self.sequence.shape[0]
                for i in range(0, n):
                    if self.sequence is None:
                        quad = np.array([0, 0, 0, 0])
                    else:
                        quad = self.sequence[i, :]  # quadrupole
                    if self.status == 'stopping':
                    # call the switch_mux function to switch to the right electrodes
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                    self.switch_mux_on(quad)

                    # run a measurement
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                        acquired_data = self.run_measurement(quad, **kwargs)
remi.clement@inrae.fr's avatar
remi.clement@inrae.fr committed
                        acquired_data = {
                            'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
                            'R [ohm]': np.abs(np.random.randn(1))
                        }
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                    # switch mux off
                    self.switch_mux_off(quad)
                    acquired_data.update({'cmd_id': cmd_id})
                    self.data_logger.info(f'{acquired_data}')
                    print(f'{acquired_data}')
                    # save data and print in a text file
                    self.append_and_save(filename, acquired_data)
                    self.exec_logger.debug(f'{i+1:d}/{n:d}')
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                # compute time needed to take measurement and subtract it from interval
                # between two sequence run (= sequence_delay)
                measuring_time = time.time() - t0
                sleep_time = self.settings["sequence_delay"] - measuring_time
                if sleep_time < 0:
                    # it means that the measuring time took longer than the sequence delay
                    sleep_time = 0
                    self.exec_logger.warning('The measuring time is longer than the sequence delay. '
                                             'Increase the sequence delay')
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                # sleeping time between sequence
                if self.settings["nbr_meas"] > 1:
                    time.sleep(sleep_time)  # waiting for next measurement (time-lapse)
            self.status = 'idle'
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        self.thread = threading.Thread(target=func)
        self.thread.start()
        warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning)
        self.interrupt()

    def interrupt(self):
        """ Interrupt the acquisition. """
Guillaume Blanchy's avatar
Guillaume Blanchy committed
        if self.thread is not None:
            self.thread.join()

    def quit(self):
        """Quit OhmPi.
        """
        self.cmd_listen = False
        if self.cmd_thread is not None:
            self.cmd_thread.join()
        self.exec_logger.debug(f'Stopped listening to control topic.')
    def restart(self):
        self.exec_logger.info('Restarting pi...')
        os.system('reboot')

Olivier Kaufmann's avatar
Olivier Kaufmann committed
VERSION = '2.1.5'

print(colored(r' ________________________________' + '\n' +
              r'|  _  | | | ||  \/  || ___ \_   _|' + '\n' +
              r'| | | | |_| || .  . || |_/ / | |' + '\n' +
              r'| | | |  _  || |\/| ||  __/  | |' + '\n' +
              r'\ \_/ / | | || |  | || |    _| |_' + '\n' +
              r' \___/\_| |_/\_|  |_/\_|    \___/ ', 'red'))
print('OhmPi start')
print('Version:', VERSION)
platform, on_pi = OhmPi._get_platform()
if on_pi:
    print(colored(f'Running on {platform} platform', 'green'))
    # TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...)
    #       and emit a warning otherwise
    if not arm64_imports:
        print(colored(f'Warning: Required packages are missing.\n'
Guillaume Blanchy's avatar
Guillaume Blanchy committed
                      f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow'))
else:
    print(colored(f'Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow'))

current_time = datetime.now()
print(current_time.strftime("%Y-%m-%d %H:%M:%S"))

# for testing
if __name__ == "__main__":
    ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])