Commit 670262d9 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Tries to propose a general approach for _process_command

Showing with 50 additions and 73 deletions
+50 -73
......@@ -171,13 +171,14 @@ class OhmPi(object):
@property
def sequence(self):
"""Gets or sets sequence"""
"""Gets sequence"""
if self._sequence is not None:
assert isinstance(self._sequence, np.ndarray)
return self._sequence
@sequence.setter
def sequence(self, sequence):
"""Sets sequence"""
if sequence is not None:
assert isinstance(sequence, np.ndarray)
self.use_mux = True
......@@ -201,7 +202,7 @@ class OhmPi(object):
self.update_settings(config)
def update_settings(self, config):
"""Update acquisition settings from a json file or dictionary.
"""Updates acquisition settings from a json file or dictionary.
Parameters can be:
- nb_electrodes (number of electrode used, if 4, no MUX needed)
- injection_duration (in seconds)
......@@ -215,18 +216,28 @@ class OhmPi(object):
config : str
Path to the .json or dictionary.
"""
if isinstance(config, dict):
self.settings.update(config)
status = False
if config is not None:
try:
if isinstance(config, dict):
self.settings.update(config)
else:
with open(config) as json_file:
dic = json.load(json_file)
self.settings.update(dic)
self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings))
status = True
except Exception as e:
self.exec_logger.warning('Unable to update settings.')
status = False
else:
with open(config) as json_file:
dic = json.load(json_file)
self.settings.update(dic)
self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings))
self.exec_logger.warning('Settings are missing...')
return status
def _read_hardware_config(self):
"""Read hardware configuration from config.py
"""Reads hardware configuration from config.py
"""
from config import OHMPI_CONFIG
self.exec_logger.debug('Getting hardware config')
self.id = OHMPI_CONFIG['id'] # ID of the OhmPi
self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm
self.Imax = OHMPI_CONFIG['Imax'] # maximum current
......@@ -241,8 +252,8 @@ class OhmPi(object):
@staticmethod
def _find_identical_in_line(quads):
"""Find quadrupole where A and B are identical.
If A and B are connected to the same relay, the Pi burns (short-circuit).
"""Finds quadrupole where A and B are identical.
If A and B are connected to the same electrode, the Pi burns (short-circuit).
Parameters
----------
......@@ -256,31 +267,17 @@ class OhmPi(object):
"""
# TODO is this needed for M and N?
# if we have a 1D array (so only 1 quadrupole), make it 2D
# if we have a 1D array (so only 1 quadrupole), make it a 2D array
if len(quads.shape) == 1:
quads = quads[None, :]
output = np.where(quads[:, 0] == quads[:, 1])[0]
# output = []
# if array_object.ndim == 1:
# temp = np.zeros(4)
# for i in range(len(array_object)):
# temp[i] = np.count_nonzero(array_object == array_object[i])
# if any(temp > 1):
# output.append(0)
# else:
# for i in range(len(array_object[:,1])):
# temp = np.zeros(len(array_object[1,:]))
# for j in range(len(array_object[1,:])):
# temp[j] = np.count_nonzero(array_object[i,:] == array_object[i,j])
# if any(temp > 1):
# output.append(i)
return output
@staticmethod
def _get_platform():
"""Get platform name and check if it is a raspberry pi
"""Gets platform name and checks if it is a raspberry pi
Returns
=======
str, bool
......@@ -982,62 +979,42 @@ class OhmPi(object):
w.writeheader()
w.writerow(last_measurement)
def _process_commands(self, command):
def _process_commands(self, message):
""" TODO
Parameters
----------
command
message : str
command arguments
kwargs: str
command keywords and arguments
Returns
-------
"""
try:
cmd_id = None
decoded_message = json.loads(command)
decoded_message = json.loads(message)
cmd_id = decoded_message.pop('cmd_id', None)
cmd = decoded_message.pop('cmd', None)
args = decoded_message.pop('args', None)
args = decoded_message.pop('args', '[]')
args = json.loads(args)
kwargs = decoded_message.pop('kwargs', '{}')
kwargs = json.loads(kwargs)
self.exec_logger.debug(f'Calling method {cmd}({args}, {kwargs})')
status = False
e = None
if cmd is not None and cmd_id is not None:
if cmd == 'update_settings' and args is not None:
self.update_settings(args)
e = None # NOTE: Why this?
if cmd_id is None:
self.exec_logger.warning('You should use a unique identifier for cmd_id')
if cmd is not None:
try:
output = getattr(self, cmd)(*args, **kwargs)
status = True
elif cmd == 'set_sequence' and args is not None:
try:
self.sequence = np.loadtxt(StringIO(args)).astype('uint32')
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to set sequence: {e}')
status = False
elif cmd == 'run_sequence':
self.run_sequence(cmd_id)
while not self.status == 'idle':
time.sleep(0.1)
status = True
elif cmd == 'interrupt':
self.interrupt()
status = True
elif cmd == 'load_sequence':
try:
self.load_sequence(args)
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to load sequence: {e}')
status = False
elif cmd == 'rs_check':
try:
self.rs_check()
status = True
except Exception as e:
print('error====', e)
self.exec_logger.warning(f'Unable to run rs-check: {e}')
else:
self.exec_logger.warning(f'Unknown command {cmd} - cmd_id: {cmd_id}')
except Exception as e:
self.exec_logger.error(f'{e}\nUnable to execute {cmd}({args}, {kwargs}')
status = False
except Exception as e:
self.exec_logger.warning(f'Unable to decode command {command}: {e}')
self.exec_logger.warning(f'Unable to decode command {message}: {e}')
status = False
finally:
reply = {'cmd_id': cmd_id, 'status': status}
......@@ -1048,22 +1025,22 @@ class OhmPi(object):
warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning)
self.run_sequence(self, *args, **kwargs)
def set_sequence(self, args):
def set_sequence(self, sequence=sequence):
try:
self.sequence = np.loadtxt(StringIO(args)).astype('uint32')
self.sequence = np.loadtxt(StringIO(sequence)).astype('uint32')
status = True
except Exception as e:
self.exec_logger.warning(f'Unable to set sequence: {e}')
status = False
def run_sequence(self, cmd_id=None, **kwargs):
def run_sequence(self, **kwargs):
""" Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'.
"""
# self.run = True
self.status = 'running'
self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
cmd_id = kwargs.pop('cmd_id', None)
def func():
for g in range(0, self.settings["nbr_meas"]): # for time-lapse monitoring
if self.status != 'running':
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment