Commit b668a017 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Merges

Showing with 152 additions and 10 deletions
+152 -10
import os
import numpy as np
import time
os.chdir("/home/pi/OhmPi")
from ohmpi import OhmPi
### Define object from class OhmPi
k = OhmPi()
### Update settings if needed
k.update_settings({"injection_duration":0.2})
### Set or load sequence
k.sequence = np.array([[1,2,3,4]]) # set numpy array of shape (n,4)
# k.set_sequence('1 2 3 4\n2 3 4 5') # call function set_sequence and pass a string
# k.load_sequence('ABMN.txt') # load sequence from a local file
### Run contact resistance check
k.rs_check()
### Run sequence
k.run_sequence()
# k.interrupt()
\ No newline at end of file
...@@ -110,7 +110,7 @@ class OhmPi(object): ...@@ -110,7 +110,7 @@ class OhmPi(object):
# default acquisition settings # default acquisition settings
self.settings = { self.settings = {
'injection_duration': 0.2, 'injection_duration': 0.2,
'nbr_meas': 1, 'nb_meas': 1,
'sequence_delay': 1, 'sequence_delay': 1,
'nb_stack': 1, 'nb_stack': 1,
'export_path': 'data/measurement.csv' 'export_path': 'data/measurement.csv'
...@@ -206,7 +206,7 @@ class OhmPi(object): ...@@ -206,7 +206,7 @@ class OhmPi(object):
Parameters can be: Parameters can be:
- nb_electrodes (number of electrode used, if 4, no MUX needed) - nb_electrodes (number of electrode used, if 4, no MUX needed)
- injection_duration (in seconds) - injection_duration (in seconds)
- nbr_meas (total number of times the sequence will be run) - nb_meas (total number of times the sequence will be run)
- sequence_delay (delay in second between each sequence run) - sequence_delay (delay in second between each sequence run)
- nb_stack (number of stack for each quadrupole measurement) - nb_stack (number of stack for each quadrupole measurement)
- export_path (path where to export the data, timestamp will be added to filename) - export_path (path where to export the data, timestamp will be added to filename)
...@@ -985,9 +985,7 @@ class OhmPi(object): ...@@ -985,9 +985,7 @@ class OhmPi(object):
Parameters Parameters
---------- ----------
message : str message : str
command arguments command and arguments
kwargs: str
command keywords and arguments
Returns Returns
------- -------
...@@ -1034,7 +1032,128 @@ class OhmPi(object): ...@@ -1034,7 +1032,128 @@ class OhmPi(object):
status = False status = False
def run_sequence(self, **kwargs): def run_sequence(self, **kwargs):
""" Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'. """Run sequence in sync mode
"""
self.status = 'running'
self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
t0 = time.time()
# create filename with timestamp
filename = self.settings["export_path"].replace('.csv',
f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
self.exec_logger.debug(f'Saving to {filename}')
# make sure all multiplexer are off
self.reset_mux()
# measure all quadrupole of the sequence
if self.sequence is None:
n = 1
else:
n = self.sequence.shape[0]
for i in range(0, n):
if self.sequence is None:
quad = np.array([0, 0, 0, 0])
else:
quad = self.sequence[i, :] # quadrupole
if self.status == 'stopping':
break
# call the switch_mux function to switch to the right electrodes
self.switch_mux_on(quad)
# run a measurement
if self.on_pi:
acquired_data = self.run_measurement(quad, **kwargs)
else: # for testing, generate random data
acquired_data = {
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
'R [ohm]': np.abs(np.random.randn(1))
}
# switch mux off
self.switch_mux_off(quad)
# add command_id in dataset
acquired_data.update({'cmd_id': cmd_id})
# log data to the data logger
self.data_logger.info(f'{acquired_data}')
print(f'{acquired_data}')
# save data and print in a text file
self.append_and_save(filename, acquired_data)
self.exec_logger.debug(f'{i+1:d}/{n:d}')
self.status = 'idle'
def run_sequence_async(self, cmd_id=None, **kwargs):
""" Run the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
"""
# self.run = True
self.status = 'running'
self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
def func():
# if self.status != 'running':
# self.exec_logger.warning('Data acquisition interrupted')
# break
t0 = time.time()
# create filename with timestamp
filename = self.settings["export_path"].replace('.csv',
f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
self.exec_logger.debug(f'Saving to {filename}')
# make sure all multiplexer are off
self.reset_mux()
# measure all quadrupole of the sequence
if self.sequence is None:
n = 1
else:
n = self.sequence.shape[0]
for i in range(0, n):
if self.sequence is None:
quad = np.array([0, 0, 0, 0])
else:
quad = self.sequence[i, :] # quadrupole
if self.status == 'stopping':
break
# call the switch_mux function to switch to the right electrodes
self.switch_mux_on(quad)
# run a measurement
if self.on_pi:
acquired_data = self.run_measurement(quad, **kwargs)
else: # for testing, generate random data
acquired_data = {
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
'R [ohm]': np.abs(np.random.randn(1))
}
# switch mux off
self.switch_mux_off(quad)
# add command_id in dataset
acquired_data.update({'cmd_id': cmd_id})
# log data to the data logger
self.data_logger.info(f'{acquired_data}')
print(f'{acquired_data}')
# save data and print in a text file
self.append_and_save(filename, acquired_data)
self.exec_logger.debug(f'{i+1:d}/{n:d}')
self.status = 'idle'
self.thread = threading.Thread(target=func)
self.thread.start()
def run_multiple_sequences(self, cmd_id=None, **kwargs):
""" Run multiple sequences in a separate thread for monitoring mode.
Can be stopped by 'OhmPi.interrupt()'.
""" """
# self.run = True # self.run = True
self.status = 'running' self.status = 'running'
...@@ -1042,7 +1161,7 @@ class OhmPi(object): ...@@ -1042,7 +1161,7 @@ class OhmPi(object):
self.exec_logger.debug(f'Measuring sequence: {self.sequence}') self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
cmd_id = kwargs.pop('cmd_id', None) cmd_id = kwargs.pop('cmd_id', None)
def func(): def func():
for g in range(0, self.settings["nbr_meas"]): # for time-lapse monitoring for g in range(0, self.settings["nb_meas"]): # for time-lapse monitoring
if self.status != 'running': if self.status != 'running':
self.exec_logger.warning('Data acquisition interrupted') self.exec_logger.warning('Data acquisition interrupted')
break break
...@@ -1105,7 +1224,7 @@ class OhmPi(object): ...@@ -1105,7 +1224,7 @@ class OhmPi(object):
'Increase the sequence delay') 'Increase the sequence delay')
# sleeping time between sequence # sleeping time between sequence
if self.settings["nbr_meas"] > 1: if self.settings["nb_meas"] > 1:
time.sleep(sleep_time) # waiting for next measurement (time-lapse) time.sleep(sleep_time) # waiting for next measurement (time-lapse)
self.status = 'idle' self.status = 'idle'
......
{ {
"nb_electrodes": 64, "nb_electrodes": 64,
"injection_duration": 0.2, "injection_duration": 0.2,
"nbr_meas": 1,
"sequence_delay": 1,
"nb_stack": 1, "nb_stack": 1,
"nb_meas": 1,
"sequence_delay": 1,
"export_path": "data/measurement.csv" "export_path": "data/measurement.csv"
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment