Newer
Older
self.exec_logger.warning(f'Unable to decode command {message}: {e}')
status = False
finally:
reply = {'cmd_id': cmd_id, 'status': status}
reply = json.dumps(reply)
self.exec_logger.debug(f'Execution report: {reply}')
reply = reply.encode('utf-8')
socket.send(reply)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
pass # no message was ready (yet!)
else:
self.exec_logger.error(f'Unexpected error while process: {e}')

Guillaume Blanchy
committed
"""Run the sequence in a separate thread. Can be stopped by 'OhmPi.stop()'.
"""
self.run = True
self.status = 'running'
self.exec_logger.debug(f'Status: {self.status}')
def func():
for g in range(0, self.settings["nbr_meas"]): # for time-lapse monitoring

Guillaume Blanchy
committed
if self.run is False:
self.exec_logger.warning('Data acquisition interrupted')
break
t0 = time.time()
# create filename with timestamp
filename = self.settings["export_path"].replace('.csv',

Guillaume Blanchy
committed
f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
self.exec_logger.debug(f'Saving to {filename}')
# make sure all multiplexer are off
self.reset_mux()
# measure all quadrupole of the sequence
for i in range(0, self.sequence.shape[0]):
quad = self.sequence[i, :] # quadrupole
if self.run is False:
break
# call the switch_mux function to switch to the right electrodes
self.switch_mux_on(quad)
# run a measurement
if self.on_pi:

Guillaume Blanchy
committed
else: # for testing, generate random data

Guillaume Blanchy
committed
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]],
'R [ohm]': np.abs(np.random.randn(1))
}
# switch mux off
self.switch_mux_off(quad)
# add command_id in dataset
acquired_data.update({'cmd_id': cmd_id})

Guillaume Blanchy
committed
# log data to the data logger
self.data_logger.info(f'{acquired_data}')
print(f'{acquired_data}')

Guillaume Blanchy
committed
# save data and print in a text file
self.append_and_save(filename, acquired_data)
self.exec_logger.debug(f'{i+1:d}/{self.sequence.shape[0]:d}')

Guillaume Blanchy
committed
# compute time needed to take measurement and subtract it from interval
# between two sequence run (= sequence_delay)
measuring_time = time.time() - t0
sleep_time = self.settings["sequence_delay"] - measuring_time

Guillaume Blanchy
committed
if sleep_time < 0:
# it means that the measuring time took longer than the sequence delay
sleep_time = 0
self.exec_logger.warning('The measuring time is longer than the sequence delay. '
'Increase the sequence delay')
# sleeping time between sequence

Guillaume Blanchy
committed
time.sleep(sleep_time) # waiting for next measurement (time-lapse)
self.status = 'idle'
self.thread = threading.Thread(target=func)
self.thread.start()
def stop(self):
"""Stop the acquisition.
"""
self.run = False
if self.thread is not None:
self.thread.join()
self.exec_logger.debug(f'Status: {self.status}')
def quit(self):
"""Quit OhmPi.
"""
self.cmd_listen = False
if self.cmd_thread is not None:
self.cmd_thread.join()
self.exec_logger.debug(f'Stopped listening to tcp port.')
exit()

Guillaume Blanchy
committed
VERSION = '2.1.0'

Guillaume Blanchy
committed
print(colored(r' ________________________________' + '\n' +
r'| _ | | | || \/ || ___ \_ _|' + '\n' +
r'| | | | |_| || . . || |_/ / | |' + '\n' +
r'| | | | _ || |\/| || __/ | |' + '\n' +
r'\ \_/ / | | || | | || | _| |_' + '\n' +
r' \___/\_| |_/\_| |_/\_| \___/ ', 'red'))
print('OhmPi start')
print('Version:', VERSION)
platform, on_pi = OhmPi.get_platform()
if on_pi:
print(colored(f'Running on {platform} platform', 'green'))
# TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...)
# and emit a warning otherwise
if not arm64_imports:
print(colored(f'Warning: Required packages are missing.\n'
f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow'))

Guillaume Blanchy
committed
else:
print(colored(f'Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow'))
current_time = datetime.now()
print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
# for testing
if __name__ == "__main__":
# to run from 'python ohmpi.py'
#ohmpi = OhmPi(config='ohmpi_param.json')
#ohmpi.run_measurement()
#ohmpi.read_quad('breadboard.txt')
#ohmpi.measure()
#time.sleep(20)
#ohmpi.stop()