diff --git a/config.py b/config.py index 9345d1d932a00f7059936f24ca02b58112d11fc9..0f7beb5be099a521d2333aa22191e721a3ae7c1b 100644 --- a/config.py +++ b/config.py @@ -21,10 +21,10 @@ OHMPI_CONFIG = { 'board_version': '22.10' } # TODO: add a dictionary with INA models and associated gain values -# CONTROL_CONFIG = { -# 'tcp_port': 5555, -# 'interface': 'mqtt_interface.py' # 'http_interface' -# } +CONTROL_CONFIG = { + 'tcp_port': 5555, + 'interface': 'mqtt_interface.py' # 'http_interface' +} # Execution logging configuration EXEC_LOGGING_CONFIG = { 'logging_level': logging.DEBUG, @@ -80,7 +80,7 @@ MQTT_CONTROL_CONFIG = { 'hostname': mqtt_broker, 'port': 1883, 'qos': 2, - 'retain': False, + 'retain': True, 'keepalive': 60, 'will': None, 'auth': { 'username': 'mqtt_user', 'password': 'mqtt_password' }, @@ -88,5 +88,5 @@ MQTT_CONTROL_CONFIG = { 'protocol': MQTTv31, 'transport': 'tcp', 'client_id': f'{OHMPI_CONFIG["id"]}', - 'ctrl_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/ctrl' + 'ctrl_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/ctrl', } diff --git a/example_simple_measurement.py b/example_simple_measurement.py index 2a5d50939cb20ac9506de31d4cd8bb451a119c2a..ee795b1808cf4fb2bf64484b566d58dc2275b984 100644 --- a/example_simple_measurement.py +++ b/example_simple_measurement.py @@ -16,8 +16,10 @@ k.sequence = np.array([[1,2,3,4]]) # set numpy array of shape (n,4) # k.load_sequence('ABMN.txt') # load sequence from a local file ### Run contact resistance check -k.rs_check() +# k.rs_check() ### Run sequence -k.run_sequence() -# k.interrupt() \ No newline at end of file +k.run_sequence() +# k.run_sequence_async() +# time.sleep(2) +# k.interrupt() \ No newline at end of file diff --git a/http_interface.py b/http_interface.py index bc0e71f3db458c1a3ba2cfab94ff949d2e3edab3..9c43bae70f13ba22942f9f5a0273d1a1c0f41d0b 100644 --- a/http_interface.py +++ b/http_interface.py @@ -23,8 +23,56 @@ publisher_config.pop('ctrl_topic') print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker.")) cmd_id = None +received = False rdic = {} + +# set controller globally as __init__ seem to be called for each request and so we subscribe again each time (=overhead) +controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance +print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) +trials = 0 +trials_max = 10 +broker_connected = False +while trials < trials_max: + try: + controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), + MQTT_CONTROL_CONFIG['auth']['password']) + controller.connect(MQTT_CONTROL_CONFIG['hostname']) + trials = trials_max + broker_connected = True + except Exception as e: + print(f'Unable to connect control broker: {e}') + print('trying again to connect to control broker...') + time.sleep(2) + trials += 1 +if broker_connected: + print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") + controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) +else: + print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") + controller = None + + +# start a listener for acknowledgement +def _control(): + def on_message(client, userdata, message): + global cmd_id, rdic, received + + command = json.loads(message.payload.decode('utf-8')) + #print('++++', cmd_id, received, command) + if ('reply' in command.keys()) and (command['cmd_id'] == cmd_id): + print(f'Acknowledgement reception of command {command} by OhmPi') + # print('oooooooooook', command['reply']) + received = True + #rdic = command + + controller.on_message = on_message + controller.loop_forever() + +t = threading.Thread(target=_control) +t.start() + + class MyServer(SimpleHTTPRequestHandler): # because we use SimpleHTTPRequestHandler, we do not need to implement # the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to) @@ -42,60 +90,44 @@ class MyServer(SimpleHTTPRequestHandler): def __init__(self, request, client_address, server): super().__init__(request, client_address, server) - # set controller - self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance - print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) - trials = 0 - trials_max = 10 - broker_connected = False - while trials < trials_max: - try: - self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), - MQTT_CONTROL_CONFIG['auth']['password']) - self.controller.connect(MQTT_CONTROL_CONFIG['hostname']) - trials = trials_max - broker_connected = True - except Exception as e: - print(f'Unable to connect control broker: {e}') - print('trying again to connect to control broker...') - time.sleep(2) - trials += 1 - if broker_connected: - print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") - self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) - else: - print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") - self.controller = None - self.cmd_thread = threading.Thread(target=self._control) - - def _control(self): - def on_message(client, userdata, message): - global cmd_id, rdic - - command = message.payload.decode('utf-8') - print(f'Received command {command}') - # self.process_commands(command) - if 'reply' in command.keys and command['cmd_id'] == cmd_id : - rdic = command - - self.controller.on_message = on_message - self.controller.loop_start() - while True: - time.sleep(.1) + # global controller, once # using global variable otherwise, we subscribe to client for EACH request + # if once: + # self.controller = controller + # self.cmd_thread = threading.Thread(target=self._control) + # self.cmd_thread.start() + # once = False + + + # we would like to listen to the ackn topic to check our message has been wel received + # by the OhmPi, however, this won't work as it seems an instance of MyServer is created + # each time (actually it's not a server but a requestHandler) + # def _control(self): + # def on_message(client, userdata, message): + # global cmd_id, rdic + + # command = json.loads(message.payload.decode('utf-8')) + # print(f'Acknowledgement reception of command {command} by OhmPi') + # if 'reply' in command.keys() and command['cmd_id'] == cmd_id : + # print('oooooooooook', command['reply']) + # #rdic = command + + # self.controller.on_message = on_message + # print('starting loop') + # self.controller.loop_forever() + # print('forever') def do_POST(self): - global cmd_id, rdic + global cmd_id, rdic, received + received = False cmd_id = uuid.uuid4().hex - # global socket - - # global ohmpiThread, status, run dic = json.loads(self.rfile.read(int(self.headers['Content-Length']))) + #print('++', dic, cmd_id) rdic = {} # response dictionary - if dic['cmd'] == 'run_sequence': - payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_sequence'}) + if dic['cmd'] == 'run_multiple_sequences': + payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'}) publish.single(payload=payload, **publisher_config) + elif dic['cmd'] == 'interrupt': - # ohmpi.stop() payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'}) publish.single(payload=payload, **publisher_config) elif dic['cmd'] == 'getData': @@ -103,9 +135,9 @@ class MyServer(SimpleHTTPRequestHandler): fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] ddic = {} for fname in fnames: - if (fname.replace('.csv', '') not in dic['surveyNames'] - and fname != 'readme.txt' - and '_rs' not in fname): + if ((fname != 'readme.txt') + and ('_rs' not in fname) + and (fname.replace('.csv', '') not in dic['surveyNames'])): df = pd.read_csv('data/' + fname) ddic[fname.replace('.csv', '')] = { 'a': df['A'].tolist(), diff --git a/index.html b/index.html index db104d009f47532bc3ed3fcbdd304433b72c4496..70a2a2da71a4ede06ddc949424bdaa54a2a1a55d 100644 --- a/index.html +++ b/index.html @@ -26,7 +26,7 @@ <div class="form-check"> <input id="dataRetrievalCheck" class="form-check-input" type="checkbox" value=""> <label class="form-check-label" for="dataRetrievalCheck"> - Automaticaly get data every 1 secondStart + Automaticaly get data every 1 second </label> </div> <div id='output'>Status: idle</div> @@ -34,8 +34,8 @@ <!-- Pseudo section --> <select id='surveySelect' class='custom-select'> </select> - <input id="cmin" type="number" value="0"/> - <input id="cmax" type="number" value="150"/> + <input id="cmin" type="number" value=""/> + <input id="cmax" type="number" value=""/> <button id="capplyBtn" type="button" class="btn btn-info">Apply</button> <div id="gd"></div> <div class="mb3 row"> @@ -163,9 +163,9 @@ // run button function runBtnFunc() { - sendCommand('{"cmd": "run_sequence"}', function(x) { - console.log(x['status']) - if (x['status'] == 'running') { + sendCommand('{"cmd": "run_multiple_sequences"}', function(x) { + console.log(x['ohmpi_status']) + if (x['ohmpi_status'] == 'running') { output.innerHTML = 'Status: measuring...' } }) @@ -176,7 +176,7 @@ // interrupt button function stopBtnFunc() { sendCommand('{"cmd": "interrupt"}', function(x) { - output.innerHTML = 'Status: ' + x['status'] + output.innerHTML = 'Status: ' + x['ohmpi_status'] clearInterval(interv) getData() }) @@ -375,13 +375,14 @@ // getData function getData() { + let surveyNames = [] sendCommand(JSON.stringify({ 'cmd': 'getData', - 'surveyNames': Object.keys(data).slice(0, -1) + 'surveyNames': surveyNames // last survey is often partial so we download it again }), function(ddic) { // update status - output.innerHTML = 'Status: ' + ddic['status'] + //output.innerHTML = 'Status: ' + ddic['status'] // update data dic with new data data = { // destructuring assignement (magic! :o) @@ -389,8 +390,8 @@ ...ddic['data'] // value from second dic are preferred } - // dropdown with number of surveys and +++ - let surveyNames = Object.keys(data).sort() + // dropdown with number of surveys + surveyNames = Object.keys(data).sort() // remove listener as we will replace the choices surveySelect.removeEventListener('change', surveySelectFunc) @@ -417,18 +418,20 @@ // update list of quadrupoles if any if (quads.length == 0) { console.log('updating list of quadrupoles') - let df = data[surveyNames[0]] - let quadSelect = document.getElementById('quadSelect') - quadSelect.innerHTML = '' - for (let i = 0; i < df['a'].length; i++) { - quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]] - quads.push(quad) - let option = document.createElement('option') - option.value = quad.join(', ') - option.innerText = quad.join(', ') - quadSelect.appendChild(option) + if (surveyNames.length > 0) { + let df = data[surveyNames[0]] + let quadSelect = document.getElementById('quadSelect') + quadSelect.innerHTML = '' + for (let i = 0; i < df['a'].length; i++) { + quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]] + quads.push(quad) + let option = document.createElement('option') + option.value = quad.join(', ') + option.innerText = quad.join(', ') + quadSelect.appendChild(option) + } + console.log('quads=', quads) } - console.log('quads=', quads) } // update time-serie figure @@ -499,7 +502,7 @@ function removeDataBtnFunc() { sendCommand('{"cmd": "removeData"}',function(x) { data = {} - output.innerHTML = 'Status: ' + x['status'] + ' (all data cleared)' + output.innerHTML = 'Status: ' + x['ohmpi_status'] + ' (all data cleared)' console.log('all data removed') }) } diff --git a/logging_setup.py b/logging_setup.py index 1cc4a9c45a3cdfb210874e1e6535da31ddc15894..ee0200ad9dcaeebd2d9350aeca3350c0286d5cd3 100644 --- a/logging_setup.py +++ b/logging_setup.py @@ -37,7 +37,7 @@ def setup_loggers(mqtt=True): interval=EXEC_LOGGING_CONFIG['interval']) exec_formatter = logging.Formatter(log_format) exec_formatter.converter = gmtime - exec_formatter.datefmt = '%Y/%m/%d %H:%M:%S UTC' + exec_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC' exec_handler.setFormatter(exec_formatter) exec_logger.addHandler(exec_handler) exec_logger.setLevel(EXEC_LOGGING_CONFIG['logging_level']) @@ -73,7 +73,7 @@ def setup_loggers(mqtt=True): interval=DATA_LOGGING_CONFIG['interval']) data_formatter = logging.Formatter(log_format) data_formatter.converter = gmtime - data_formatter.datefmt = '%Y/%m/%d %H:%M:%S UTC' + data_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC' data_handler.setFormatter(exec_formatter) data_logger.addHandler(data_handler) data_logger.setLevel(DATA_LOGGING_CONFIG['logging_level']) diff --git a/ohmpi.py b/ohmpi.py index ed1f456ca4712a2dbeccc58f1cedb6194a4d430c..c8ab1074fbd6e4b33a385b70c21af2712b2621b7 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -4,7 +4,7 @@ created on January 6, 2020. Updates May 2022, Oct 2022. Ohmpi.py is a program to control a low-cost and open hardware resistivity meter OhmPi that has been developed by Rémi CLEMENT (INRAE),Vivien DUBOIS (INRAE), Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR) -Olivier KAUFMANN (UMONS), Arnaud WATELET (UMONS) and Guillaume BLANCHY (ILVO). +Olivier KAUFMANN (UMONS), Arnaud WATELET (UMONS) and Guillaume BLANCHY (FNRS/ULiege). """ import os @@ -19,7 +19,6 @@ from io import StringIO from datetime import datetime from termcolor import colored import threading -import paho.mqtt.client as mqtt_client from logging_setup import setup_loggers from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG @@ -79,31 +78,6 @@ class OhmPi(object): print(colored(f'Data logger {self.data_logger.handlers if self.data_logger is not None else "None"}', 'blue')) print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue')) - # set controller - self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance - print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) - trials = 0 - trials_max = 10 - broker_connected = False - while trials < trials_max: - try: - self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), - MQTT_CONTROL_CONFIG['auth']['password']) - self.controller.connect(MQTT_CONTROL_CONFIG['hostname']) - trials = trials_max - broker_connected = True - except Exception as e: - self.exec_logger.debug(f'Unable to connect control broker: {e}') - self.exec_logger.info('trying again to connect to control broker...') - time.sleep(2) - trials += 1 - if broker_connected: - self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") - self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) - else: - self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") - self.controller = None - # read in hardware parameters (config.py) self._read_hardware_config() @@ -164,10 +138,90 @@ class OhmPi(object): self.pin1.direction = Direction.OUTPUT self.pin1.value = False - # Starts the command processing thread - self.cmd_listen = True - self.cmd_thread = threading.Thread(target=self._control) - self.cmd_thread.start() + # set controller + self.mqtt = mqtt + self.cmd_id = None + if mqtt: + import paho.mqtt.client as mqtt_client # if we don't use MQTT but just Python API, we don't need to install it to start the ohmpi.py + import paho.mqtt.publish as publish + print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) + + # ISSUE: the below code works but leads to miss message sometimes...?? + # self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance + # def on_connect(client, userdata, flags, rc): + # if rc == 0: + # print('Connected to MQTT Broker') + # else: + # print("Failed to connect, return code %d\n", rc) + # self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), + # MQTT_CONTROL_CONFIG['auth']['password']) + # self.controller.on_connect = on_connect + # self.controller.connect(MQTT_CONTROL_CONFIG['hostname'], 1883) + + # the below code works much better and solve the issue with missing messages that + # the above code creates (hint: maybe binding to the class with self. is the issue) + def connect_mqtt() -> mqtt_client: + def on_connect(client, userdata, flags, rc): + if rc == 0: + print("Connected to MQTT Broker!") + else: + print("Failed to connect, return code %d\n", rc) + + client = mqtt_client.Client("ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) + client.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), + MQTT_CONTROL_CONFIG['auth']['password']) + client.on_connect = on_connect + client.connect(MQTT_CONTROL_CONFIG['hostname'], 1883) + return client + + self.controller = connect_mqtt() + + # trials = 0 + # trials_max = 10 + # broker_connected = False + # while trials < trials_max: + # try: + # self.controller.connect(MQTT_CONTROL_CONFIG['hostname']) + # trials = trials_max + # broker_connected = True + # except Exception as e: + # print(f'Unable to connect control broker: {e}') + # print('trying again to connect to control broker...') + # time.sleep(2) + # trials += 1 + # if broker_connected: + # print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") + self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) + + # reply 'ok' to the client upon reception of the command + # so we are sure the command has been received + publisher_config = MQTT_CONTROL_CONFIG.copy() + publisher_config['qos'] = 2 + publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic'] + publisher_config.pop('ctrl_topic') + + def on_message(client, userdata, message): + print(message.payload.decode('utf-8')) + command = message.payload.decode('utf-8') + dic = json.loads(command) + if dic['cmd_id'] != self.cmd_id: + self.cmd_id = dic['cmd_id'] + self.exec_logger.debug(f'Received command {command}') + payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'}) + publish.single(payload=payload, **publisher_config) + self._process_commands(command) + + self.controller.on_message = on_message + + # else: + # print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") + # self.controller = None + +# if False: +# # Starts the command processing thread +# self.cmd_listen = True +# self.cmd_thread = threading.Thread(target=self._control) +# self.cmd_thread.start() @property def sequence(self): @@ -184,31 +238,31 @@ class OhmPi(object): else: self.use_mux = False self._sequence = sequence - - def _control(self): - def on_message(client, userdata, message): - command = message.payload.decode('utf-8') - self.exec_logger.debug(f'Received command {command}') - self._process_commands(command) - - self.controller.on_message = on_message - self.controller.loop_start() - while True: - time.sleep(.5) + + # def _control(self): # ISSUE: somehow not ALL message are catch by this method in a thread + # # this might be due to the thread... -> that means we can miss commands! + # def on_message(client, userdata, message): + # command = message.payload.decode('utf-8') + # self.exec_logger.debug(f'Received command {command}') + # self._process_commands(command) + + # self.controller.on_message = on_message + # self.controller.loop_start() + # while True: + # time.sleep(.5) def _update_acquisition_settings(self, config): warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) self.update_settings(config) def update_settings(self, config): - """Update acquisition settings from a json file or dictionary. - Parameters can be: - - nb_electrodes (number of electrode used, if 4, no MUX needed) - - injection_duration (in seconds) - - nb_meas (total number of times the sequence will be run) - - sequence_delay (delay in second between each sequence run) - - nb_stack (number of stack for each quadrupole measurement) - - export_path (path where to export the data, timestamp will be added to filename) + """Update acquisition settings from a json file or dictionary. Parameters can be: + - nb_electrodes (number of electrode used, if 4, no MUX needed) + - injection_duration (in seconds) + - nb_meas (total number of times the sequence will be run) + - sequence_delay (delay in second between each sequence run) + - nb_stack (number of stack for each quadrupole measurement) + - export_path (path where to export the data, timestamp will be added to filename) Parameters ---------- @@ -381,7 +435,7 @@ class OhmPi(object): else: mcp2.get_pin(relay_nr - 1).value = False - self.exec_logger.debug(f'Switching relay {relay_nr} {state} for electrode {electrode_nr}') + self.exec_logger.debug(f'Switching relay {relay_nr} ({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') else: self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') @@ -429,11 +483,13 @@ class OhmPi(object): Parameters ---------- - channel: + channel : object + Instance of ADS where voltage is measured. Returns ------- - float + gain : float + Gain to be applied on ADS1115. """ gain = 2 / 3 if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.023): @@ -444,7 +500,7 @@ class OhmPi(object): gain = 8 elif abs(channel.voltage) < 0.256: gain = 16 - self.exec_logger.debug(f'Setting gain to {gain}') + #self.exec_logger.debug(f'Setting gain to {gain}') return gain def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5): @@ -588,7 +644,8 @@ class OhmPi(object): def run_measurement(self, quad=None, nb_stack=None, injection_duration=None, - autogain=True, strategy='constant', tx_volt=5, best_tx_injtime=0.1): + autogain=True, strategy='constant', tx_volt=5, best_tx_injtime=0.1, + cmd_id=None): """Do a 4 electrode measurement and measure transfer resistance obtained. Parameters @@ -712,7 +769,7 @@ class OhmPi(object): else: self.pin0.value = False self.pin1.value = True # current injection nr2 - self.exec_logger.debug(str(n) + ' ' + str(self.pin0.value) + ' ' + str(self.pin1.value)) + self.exec_logger.debug('Stack ' + str(n) + ' ' + str(self.pin0.value) + ' ' + str(self.pin1.value)) # measurement of current i and voltage u during injection meas = np.zeros((self.nb_samples, 3)) * np.nan @@ -843,17 +900,6 @@ class OhmPi(object): d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3], 'R [ohm]': np.abs(np.random.randn(1)).tolist()} - # round number to two decimal for nicer string output - output = [f'{k}\t' for k in d.keys()] - output = str(output)[:-1] + '\n' - for k in d.keys(): - if isinstance(d[k], float): - val = np.round(d[k], 2) - else: - val = d[k] - output += f'{val}\t' - output = output[:-1] - # to the data logger dd = d.copy() dd.pop('fulldata') # too much for logger @@ -861,7 +907,14 @@ class OhmPi(object): dd.update({'B': str(dd['B'])}) dd.update({'M': str(dd['M'])}) dd.update({'N': str(dd['N'])}) - self.data_logger.info(json.dumps(dd)) + + # round float to 2 decimal + for key in dd.keys(): + if isinstance(dd[key], float): + dd[key] = np.round(dd[key], 3) + + dd['cmd_id'] = str(cmd_id) + self.data_logger.info(dd) return d @@ -892,49 +945,47 @@ class OhmPi(object): # self.run = True self.status = 'running' - if self.on_pi: - # make sure all mux are off to start with - self.reset_mux() + # make sure all mux are off to start with + self.reset_mux() - # measure all quad of the RS sequence - for i in range(0, quads.shape[0]): - quad = quads[i, :] # quadrupole - self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c) - d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=1, tx_volt=tx_volt, autogain=False) + # measure all quad of the RS sequence + for i in range(0, quads.shape[0]): + quad = quads[i, :] # quadrupole + self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c) + d = self.run_measurement(quad=quad, nb_stack=1, injection_duration=0.25, tx_volt=tx_volt, autogain=False) - if self.idps: - voltage = tx_volt * 1000. # imposed voltage on dps5005 - else: - voltage = d['Vmn [mV]'] - current = d['I [mA]'] - - # compute resistance measured (= contact resistance) - resist = abs(voltage / current) /1000. - #print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format( - # current, voltage, resist)) - msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \ - f'V: {voltage :>10.3f} mV, ' \ - f'R: {resist :>10.3f} kOhm' - - self.exec_logger.debug(msg) - - # if contact resistance = 0 -> we have a short circuit!! - if resist < 1e-5: - msg = '!!!SHORT CIRCUIT!!! {:s}: {:.3f} kOhm'.format( - str(quad), resist) - self.exec_logger.warning(msg) - print(msg) - - # save data and print in a text file - self.append_and_save(export_path_rs, { - 'A': quad[0], - 'B': quad[1], - 'RS [kOhm]': resist, - }) - - # close mux path and put pin back to GND - self.switch_mux_off(quad) - self.reset_mux() + if self.idps: + voltage = tx_volt * 1000. # imposed voltage on dps5005 + else: + voltage = d['Vmn [mV]'] + current = d['I [mA]'] + + # compute resistance measured (= contact resistance) + resist = abs(voltage / current) /1000. + #print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format( + # current, voltage, resist)) + msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \ + f'V: {voltage :>10.3f} mV, ' \ + f'R: {resist :>10.3f} kOhm' + + self.exec_logger.debug(msg) + + # if contact resistance = 0 -> we have a short circuit!! + if resist < 1e-5: + msg = '!!!SHORT CIRCUIT!!! {:s}: {:.3f} kOhm'.format( + str(quad), resist) + self.exec_logger.warning(msg) + print(msg) + + # save data and print in a text file + self.append_and_save(export_path_rs, { + 'A': quad[0], + 'B': quad[1], + 'RS [kOhm]': resist, + }) + + # close mux path and put pin back to GND + self.switch_mux_off(quad) else: pass self.status = 'idle' @@ -993,6 +1044,7 @@ class OhmPi(object): ------- """ + print('yyyy', command) try: cmd_id = None decoded_message = json.loads(command) @@ -1012,10 +1064,18 @@ class OhmPi(object): except Exception as e: self.exec_logger.warning(f'Unable to set sequence: {e}') status = False - elif cmd == 'run_sequence': - self.run_sequence(cmd_id) - while not self.status == 'idle': - time.sleep(0.1) + elif cmd == 'run_sequence': + self.run_sequence(cmd_id=cmd_id) + elif cmd == 'run_sequence_async': + self.run_sequence_async(cmd_id=cmd_id) + #while not self.status == 'idle': # idem for async, we need to return immediately otherwise + # the interrupt command cannot be processed + # time.sleep(0.1) + status = True + elif cmd == 'run_multiple_sequences': + self.run_multiple_sequences(cmd_id=cmd_id) + #while not self.status == 'idle': # we cannot do that as it's supposed to be an asynchrone command + # time.sleep(0.1) status = True elif cmd == 'interrupt': self.interrupt() @@ -1040,14 +1100,10 @@ class OhmPi(object): self.exec_logger.warning(f'Unable to decode command {command}: {e}') status = False finally: - reply = {'cmd_id': cmd_id, 'status': status} + reply = {'cmd_id': cmd_id, 'status':status, 'ohmpi_status': self.status} reply = json.dumps(reply) self.exec_logger.debug(f'Execution report: {reply}') - def measure(self, *args, **kwargs): - warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) - self.run_sequence(self, *args, **kwargs) - def set_sequence(self, args): try: self.sequence = np.loadtxt(StringIO(args)).astype('uint32') @@ -1056,14 +1112,13 @@ class OhmPi(object): self.exec_logger.warning(f'Unable to set sequence: {e}') status = False - def run_sequence(self, cmd_id=None, **kwargs): - """Run sequence in sync mode + def run_sequence(self, **kwargs): + """Run sequence synchronously (=blocking on main thread). + Additional arguments are passed to run_measurement(). """ self.status = 'running' self.exec_logger.debug(f'Status: {self.status}') self.exec_logger.debug(f'Measuring sequence: {self.sequence}') - - t0 = time.time() # create filename with timestamp filename = self.settings["export_path"].replace('.csv', @@ -1090,169 +1145,68 @@ class OhmPi(object): self.switch_mux_on(quad) # run a measurement - if self.on_pi: - acquired_data = self.run_measurement(quad, **kwargs) - else: # for testing, generate random data - acquired_data = { - 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], - 'R [ohm]': np.abs(np.random.randn(1)) - } - + acquired_data = self.run_measurement(quad, **kwargs) + # switch mux off self.switch_mux_off(quad) - # add command_id in dataset - acquired_data.update({'cmd_id': cmd_id}) - # log data to the data logger - self.data_logger.info(f'{acquired_data}') - print(f'{acquired_data}') # save data and print in a text file self.append_and_save(filename, acquired_data) self.exec_logger.debug(f'{i+1:d}/{n:d}') self.status = 'idle' - def run_sequence_async(self, cmd_id=None, **kwargs): + def run_sequence_async(self, **kwargs): """ Run the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. + Additional arguments are passed to run_measurement(). """ - # self.run = True - self.status = 'running' - self.exec_logger.debug(f'Status: {self.status}') - self.exec_logger.debug(f'Measuring sequence: {self.sequence}') - def func(): - # if self.status != 'running': - # self.exec_logger.warning('Data acquisition interrupted') - # break - t0 = time.time() - - # create filename with timestamp - filename = self.settings["export_path"].replace('.csv', - f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') - self.exec_logger.debug(f'Saving to {filename}') - - # make sure all multiplexer are off - self.reset_mux() - - # measure all quadrupole of the sequence - if self.sequence is None: - n = 1 - else: - n = self.sequence.shape[0] - for i in range(0, n): - if self.sequence is None: - quad = np.array([0, 0, 0, 0]) - else: - quad = self.sequence[i, :] # quadrupole - if self.status == 'stopping': - break - - # call the switch_mux function to switch to the right electrodes - self.switch_mux_on(quad) - - # run a measurement - if self.on_pi: - acquired_data = self.run_measurement(quad, **kwargs) - else: # for testing, generate random data - acquired_data = { - 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], - 'R [ohm]': np.abs(np.random.randn(1)) - } - - # switch mux off - self.switch_mux_off(quad) - - # add command_id in dataset - acquired_data.update({'cmd_id': cmd_id}) - # log data to the data logger - self.data_logger.info(f'{acquired_data}') - print(f'{acquired_data}') - # save data and print in a text file - self.append_and_save(filename, acquired_data) - self.exec_logger.debug(f'{i+1:d}/{n:d}') - - self.status = 'idle' - + self.run_sequence(**kwargs) + self.thread = threading.Thread(target=func) self.thread.start() + self.status = 'idle' + + def measure(self, *args, **kwargs): + warnings.warn('This function is deprecated. Use run_multiple_sequences() instead.', DeprecationWarning) + self.run_multiple_sequences(self, *args, **kwargs) - def run_multiple_sequences(self, cmd_id=None, **kwargs): + def run_multiple_sequences(self, sequence_delay=None, nb_meas=None, **kwargs): """ Run multiple sequences in a separate thread for monitoring mode. Can be stopped by 'OhmPi.interrupt()'. + Additional arguments are passed to run_measurement(). + + Parameters + ---------- + sequence_delay : int, optional + Number of seconds at which the sequence must be started from each others. + nb_meas : int, optional + Number of time the sequence must be repeated. + kwargs : dict, optional + See help(k.run_measurement) for more info. """ # self.run = True + if sequence_delay is None: + sequence_delay = self.settings['sequence_delay'] + sequence_delay = int(sequence_delay) + if nb_meas is None: + nb_meas = self.settings['nb_meas'] self.status = 'running' - self.exec_logger.debug(f'Status: {self.status}') - self.exec_logger.debug(f'Measuring sequence: {self.sequence}') - def func(): - for g in range(0, self.settings["nb_meas"]): # for time-lapse monitoring - if self.status != 'running': + for g in range(0, nb_meas): # for time-lapse monitoring + if self.status == 'stopping': self.exec_logger.warning('Data acquisition interrupted') break t0 = time.time() - - # create filename with timestamp - filename = self.settings["export_path"].replace('.csv', - f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') - self.exec_logger.debug(f'Saving to {filename}') - - # make sure all multiplexer are off - self.reset_mux() - - # measure all quadrupole of the sequence - if self.sequence is None: - n = 1 - else: - n = self.sequence.shape[0] - for i in range(0, n): - if self.sequence is None: - quad = np.array([0, 0, 0, 0]) - else: - quad = self.sequence[i, :] # quadrupole - if self.status == 'stopping': - break - - # call the switch_mux function to switch to the right electrodes - self.switch_mux_on(quad) - - # run a measurement - if self.on_pi: - acquired_data = self.run_measurement(quad, **kwargs) - else: # for testing, generate random data - acquired_data = { - 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], - 'R [ohm]': np.abs(np.random.randn(1)) - } - - # switch mux off - self.switch_mux_off(quad) - - # add command_id in dataset - acquired_data.update({'cmd_id': cmd_id}) - # log data to the data logger - self.data_logger.info(f'{acquired_data}') - print(f'{acquired_data}') - # save data and print in a text file - self.append_and_save(filename, acquired_data) - self.exec_logger.debug(f'{i+1:d}/{n:d}') - - # compute time needed to take measurement and subtract it from interval - # between two sequence run (= sequence_delay) - measuring_time = time.time() - t0 - sleep_time = self.settings["sequence_delay"] - measuring_time - - if sleep_time < 0: - # it means that the measuring time took longer than the sequence delay - sleep_time = 0 - self.exec_logger.warning('The measuring time is longer than the sequence delay. ' - 'Increase the sequence delay') - + self.run_sequence(**kwargs) + # sleeping time between sequence - if self.settings["nb_meas"] > 1: - time.sleep(sleep_time) # waiting for next measurement (time-lapse) + dt = sequence_delay - (time.time() - t0) + if dt < 0: + dt = 0 + if nb_meas > 1: + time.sleep(dt) # waiting for next measurement (time-lapse) self.status = 'idle' - self.thread = threading.Thread(target=func) self.thread.start() @@ -1264,6 +1218,7 @@ class OhmPi(object): """ Interrupt the acquisition. """ self.status = 'stopping' if self.thread is not None: + print('joining thread') self.thread.join() self.exec_logger.debug(f'Status: {self.status}') @@ -1308,3 +1263,4 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S")) # for testing if __name__ == "__main__": ohmpi = OhmPi(settings=OHMPI_CONFIG['settings']) + ohmpi.controller.loop_forever() diff --git a/ohmpi_settings.json b/ohmpi_settings.json index 8d4907d7a2f3c971bec8dfd529cfbec10aa7bb7b..e839f5fb9a4ddef894a5a8b714757b71cd860c02 100644 --- a/ohmpi_settings.json +++ b/ohmpi_settings.json @@ -3,6 +3,6 @@ "injection_duration": 0.2, "nb_stack": 1, "nb_meas": 1, - "sequence_delay": 1, + "sequence_delay": 120, "export_path": "data/measurement.csv" } diff --git a/test_ohmpi_mux.py b/test_ohmpi_mux.py new file mode 100644 index 0000000000000000000000000000000000000000..ed3222f6b4cd2e79c282d71f9e025f1f57455181 --- /dev/null +++ b/test_ohmpi_mux.py @@ -0,0 +1,144 @@ +# test ohmpi and multiplexer on test resistances +from ohmpi import OhmPi +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import os +import time + +# configure testing +idps = False +board_version = '22.10' # v2.0 +use_mux = True +start_elec = 17 # start elec +nelec = 16 # max elec in the sequence for testing + +# nelec electrodes Wenner sequence +a = np.arange(nelec-3) + start_elec +b = a + 3 +m = a + 1 +n = a + 2 +seq = np.c_[a, b, m, n] + +# testing measurement board only +k = OhmPi(idps=idps, use_mux=use_mux) +k.sequence = seq # we need a sequence for possibly switching the mux +k.reset_mux() # just for safety +if use_mux: + k.switch_mux_on(seq[:, 0]) +out1 = k.run_measurement(injection_duration=0.25, nb_stack=4, strategy='constant', tx_volt=12, autogain=True) +out2 = k.run_measurement(injection_duration=0.5, nb_stack=2, strategy='vmin', tx_volt=5, autogain=True) +out3 = k.run_measurement(injection_duration=1, nb_stack=1, strategy='vmax', tx_volt=5, autogain=True) +if use_mux: + k.switch_mux_off(seq[:, 0]) + +# visual figure of the full wave form +fig, axs = plt.subplots(2, 1, sharex=True) +ax = axs[0] +labels = ['constant', 'vmin', 'vmax'] +for i, out in enumerate([out1, out2, out3]): + data = out['fulldata'] + inan = ~(np.isnan(out['fulldata']).any(1)) + ax.plot(data[inan,2], data[inan,0], '.-', label=labels[i]) +ax.set_ylabel('Current AB [mA]') +ax.legend() +ax = axs[1] +for i, out in enumerate([out1, out2, out3]): + data = out['fulldata'] + inan = ~(np.isnan(out['fulldata']).any(1)) + ax.plot(data[inan,2], data[inan,1], '.-', label=labels[i]) +ax.set_ylabel('Voltage MN [mV]') +ax.set_xlabel('Time [s]') +fig.savefig('check-fullwave.jpg') + + +# test a sequence +if use_mux: + # manually edit default settings + k.settings['injection_duration'] = 1 + k.settings['nb_stack'] = 1 + #k.settings['nbr_meas'] = 1 + k.sequence = seq + k.reset_mux() + + # set quadrupole manually + k.switch_mux_on(seq[0, :]) + out = k.run_measurement(quad=[3, 3, 3, 3], nb_stack=1, tx_volt=12, strategy='constant', autogain=True) + k.switch_mux_off(seq[0, :]) + print(out) + + # run rs_check() and save data + k.rs_check() # check all electrodes of the sequence + + # check values measured + fname = sorted(os.listdir('data/'))[-1] + print(fname) + dfrs = pd.read_csv('data/' + fname) + fig, ax = plt.subplots() + ax.hist(dfrs['RS [kOhm]']) + ax.set_xticks(np.arange(dfrs.shape[0])) + ax.set_xticklabels(dfrs['A'].astype(str) + ' - ' + + dfrs['B'].astype(str), rotation=90) + ax.set_ylabel('Contact resistances [kOhm]') + fig.tight_layout() + fig.savefig('check-rs.jpg') + + # run sequence synchronously and save data to file + k.run_sequence(nb_stack=1, injection_duration=0.25) + + # check values measured + fname = sorted(os.listdir('data/'))[-1] + print(fname) + df = pd.read_csv('data/' + fname) + fig, ax = plt.subplots() + ax.hist(df['R [ohm]']) + ax.set_ylabel('Transfer resistance [Ohm]') + ax.set_xticks(np.arange(df.shape[0])) + ax.set_xticklabels(df['A'].astype(str) + ',' + + df['B'].astype(str) + ',' + + df['M'].astype(str) + ',' + + df['N'].astype(str), rotation=90) + fig.tight_layout() + fig.savefig('check-r.jpg') + + # run sequence asynchronously and save data to file + k.run_sequence_async(nb_stack=1, injection_duration=0.25) + time.sleep(2) # if too short, it will still be resetting the mux (= no effect) + k.interrupt() # will kill the asynchronous sequence running + + # run a series of asynchronous sequences + k.settings['nb_meas'] = 2 # run the sequence twice + k.run_multiple_sequences(nb_stack=1, injection_duration=0.25) + time.sleep(5) + k.interrupt() + + +# look at the noise frequency with FFT +if False: + from numpy.fft import fft, ifft + + x = data[inan, 1][10:300] + t = np.linspace(0, len(x)*4, len(x)) + sr = 1/0.004 + + X = fft(x) + N = len(X) + n = np.arange(N) + T = N/sr + freq = n/T + + plt.figure(figsize = (12, 6)) + plt.subplot(121) + + plt.stem(freq, np.abs(X), 'b', \ + markerfmt=" ", basefmt="-b") + plt.xlabel('Freq (Hz)') + plt.ylabel('FFT Amplitude |X(freq)|') + #plt.xlim(0, 10) + + plt.subplot(122) + plt.plot(t, ifft(X), 'r') + plt.xlabel('Time (s)') + plt.ylabel('Amplitude') + plt.tight_layout() + plt.show()