diff --git a/http_interface.py b/http_interface.py index 2710fa535dc3420dc9de90df6b98e24e07d2b78e..9c43bae70f13ba22942f9f5a0273d1a1c0f41d0b 100644 --- a/http_interface.py +++ b/http_interface.py @@ -125,31 +125,19 @@ class MyServer(SimpleHTTPRequestHandler): rdic = {} # response dictionary if dic['cmd'] == 'run_multiple_sequences': payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'}) - #print('-- payload...', end='') publish.single(payload=payload, **publisher_config) - #print('published!') elif dic['cmd'] == 'interrupt': payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'}) - #for i in range(10): publish.single(payload=payload, **publisher_config) - # time.sleep(.5) - # if received: - # break elif dic['cmd'] == 'getData': - print(dic) # get all .csv file in data folder - fnames = sorted([fname for fname in os.listdir('data/') if fname[-4:] == '.csv']) + fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] ddic = {} - fdownloaded = True - if dic['lastSurvey'] == '0': - fdownloaded = False for fname in fnames: - if (((fname != 'readme.txt') - and ('_rs' not in fname)) - and ((fname.replace('.csv', '') == dic['lastSurvey']) - or (fdownloaded == False))): - fdownloaded = False + if ((fname != 'readme.txt') + and ('_rs' not in fname) + and (fname.replace('.csv', '') not in dic['surveyNames'])): df = pd.read_csv('data/' + fname) ddic[fname.replace('.csv', '')] = { 'a': df['A'].tolist(), diff --git a/index.html b/index.html index 3292a067e68075b9a958ddcf4dad17f531c4ed55..70a2a2da71a4ede06ddc949424bdaa54a2a1a55d 100644 --- a/index.html +++ b/index.html @@ -375,13 +375,10 @@ // getData function getData() { - let lastSurvey = '0' - if (Object.keys(data).length > 1) { - lastSurvey = Object.keys(data).slice(-2, -1) - } + let surveyNames = [] sendCommand(JSON.stringify({ 'cmd': 'getData', - 'lastSurvey': lastSurvey + 'surveyNames': surveyNames // last survey is often partial so we download it again }), function(ddic) { // update status @@ -393,8 +390,8 @@ ...ddic['data'] // value from second dic are preferred } - // dropdown with number of surveys and +++ - let surveyNames = Object.keys(data).sort() + // dropdown with number of surveys + surveyNames = Object.keys(data).sort() // remove listener as we will replace the choices surveySelect.removeEventListener('change', surveySelectFunc) diff --git a/ohmpi.py b/ohmpi.py index 80efeec85ca70a877d2f42870d655e74aad9c302..c8ab1074fbd6e4b33a385b70c21af2712b2621b7 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -78,57 +78,6 @@ class OhmPi(object): print(colored(f'Data logger {self.data_logger.handlers if self.data_logger is not None else "None"}', 'blue')) print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue')) - # set controller - self.mqtt = False - self.cmd_id = None - if mqtt: - self.mqtt = True - import paho.mqtt.client as mqtt_client # if we don't use MQTT but just Python API, we don't need to install it to start the ohmpi.py - import paho.mqtt.publish as publish - self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance - print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) - trials = 0 - trials_max = 10 - broker_connected = False - while trials < trials_max: - try: - self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), - MQTT_CONTROL_CONFIG['auth']['password']) - self.controller.connect(MQTT_CONTROL_CONFIG['hostname']) - trials = trials_max - broker_connected = True - except Exception as e: - self.exec_logger.debug(f'Unable to connect control broker: {e}') - self.exec_logger.info('trying again to connect to control broker...') - time.sleep(2) - trials += 1 - if broker_connected: - self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") - self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) - - # MQTT does not ensure that the message is delivered to the subscribed client - # this is important for us as we want to ensure the command has been sent AND - # received. Hence, upon reception of a command, we publish a message acknowledgement - publisher_config = MQTT_CONTROL_CONFIG.copy() - publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic'] - publisher_config.pop('ctrl_topic') - - def on_message(client, userdata, message): - command = message.payload.decode('utf-8') - dic = json.loads(command) - if dic['cmd_id'] != self.cmd_id: - self.cmd_id = dic['cmd_id'] - self.exec_logger.debug(f'Received command {command}') - payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'}) - publish.single(payload=payload, **publisher_config) - self._process_commands(command) - - self.controller.on_message = on_message - - else: - self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") - self.controller = None - # read in hardware parameters (config.py) self._read_hardware_config() @@ -189,11 +138,90 @@ class OhmPi(object): self.pin1.direction = Direction.OUTPUT self.pin1.value = False - if False: - # Starts the command processing thread - self.cmd_listen = True - self.cmd_thread = threading.Thread(target=self._control) - self.cmd_thread.start() + # set controller + self.mqtt = mqtt + self.cmd_id = None + if mqtt: + import paho.mqtt.client as mqtt_client # if we don't use MQTT but just Python API, we don't need to install it to start the ohmpi.py + import paho.mqtt.publish as publish + print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) + + # ISSUE: the below code works but leads to miss message sometimes...?? + # self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance + # def on_connect(client, userdata, flags, rc): + # if rc == 0: + # print('Connected to MQTT Broker') + # else: + # print("Failed to connect, return code %d\n", rc) + # self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), + # MQTT_CONTROL_CONFIG['auth']['password']) + # self.controller.on_connect = on_connect + # self.controller.connect(MQTT_CONTROL_CONFIG['hostname'], 1883) + + # the below code works much better and solve the issue with missing messages that + # the above code creates (hint: maybe binding to the class with self. is the issue) + def connect_mqtt() -> mqtt_client: + def on_connect(client, userdata, flags, rc): + if rc == 0: + print("Connected to MQTT Broker!") + else: + print("Failed to connect, return code %d\n", rc) + + client = mqtt_client.Client("ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) + client.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), + MQTT_CONTROL_CONFIG['auth']['password']) + client.on_connect = on_connect + client.connect(MQTT_CONTROL_CONFIG['hostname'], 1883) + return client + + self.controller = connect_mqtt() + + # trials = 0 + # trials_max = 10 + # broker_connected = False + # while trials < trials_max: + # try: + # self.controller.connect(MQTT_CONTROL_CONFIG['hostname']) + # trials = trials_max + # broker_connected = True + # except Exception as e: + # print(f'Unable to connect control broker: {e}') + # print('trying again to connect to control broker...') + # time.sleep(2) + # trials += 1 + # if broker_connected: + # print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") + self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) + + # reply 'ok' to the client upon reception of the command + # so we are sure the command has been received + publisher_config = MQTT_CONTROL_CONFIG.copy() + publisher_config['qos'] = 2 + publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic'] + publisher_config.pop('ctrl_topic') + + def on_message(client, userdata, message): + print(message.payload.decode('utf-8')) + command = message.payload.decode('utf-8') + dic = json.loads(command) + if dic['cmd_id'] != self.cmd_id: + self.cmd_id = dic['cmd_id'] + self.exec_logger.debug(f'Received command {command}') + payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'}) + publish.single(payload=payload, **publisher_config) + self._process_commands(command) + + self.controller.on_message = on_message + + # else: + # print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") + # self.controller = None + +# if False: +# # Starts the command processing thread +# self.cmd_listen = True +# self.cmd_thread = threading.Thread(target=self._control) +# self.cmd_thread.start() @property def sequence(self): @@ -211,17 +239,17 @@ class OhmPi(object): self.use_mux = False self._sequence = sequence - def _control(self): # ISSUE: somehow not ALL message are catch by this method in a thread - # this might be due to the thread... -> that means we can miss commands! - def on_message(client, userdata, message): - command = message.payload.decode('utf-8') - self.exec_logger.debug(f'Received command {command}') - self._process_commands(command) - - self.controller.on_message = on_message - self.controller.loop_start() - while True: - time.sleep(.5) + # def _control(self): # ISSUE: somehow not ALL message are catch by this method in a thread + # # this might be due to the thread... -> that means we can miss commands! + # def on_message(client, userdata, message): + # command = message.payload.decode('utf-8') + # self.exec_logger.debug(f'Received command {command}') + # self._process_commands(command) + + # self.controller.on_message = on_message + # self.controller.loop_start() + # while True: + # time.sleep(.5) def _update_acquisition_settings(self, config): warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) @@ -1143,7 +1171,7 @@ class OhmPi(object): warnings.warn('This function is deprecated. Use run_multiple_sequences() instead.', DeprecationWarning) self.run_multiple_sequences(self, *args, **kwargs) - def run_multiple_sequences(self, sequence_delay=None, **kwargs): + def run_multiple_sequences(self, sequence_delay=None, nb_meas=None, **kwargs): """ Run multiple sequences in a separate thread for monitoring mode. Can be stopped by 'OhmPi.interrupt()'. Additional arguments are passed to run_measurement(). @@ -1152,27 +1180,31 @@ class OhmPi(object): ---------- sequence_delay : int, optional Number of seconds at which the sequence must be started from each others. + nb_meas : int, optional + Number of time the sequence must be repeated. kwargs : dict, optional See help(k.run_measurement) for more info. """ # self.run = True - if sequence_delay == None: + if sequence_delay is None: sequence_delay = self.settings['sequence_delay'] sequence_delay = int(sequence_delay) + if nb_meas is None: + nb_meas = self.settings['nb_meas'] self.status = 'running' def func(): - for g in range(0, self.settings["nb_meas"]): # for time-lapse monitoring - if self.status != 'running': + for g in range(0, nb_meas): # for time-lapse monitoring + if self.status == 'stopping': self.exec_logger.warning('Data acquisition interrupted') break t0 = time.time() self.run_sequence(**kwargs) - + # sleeping time between sequence dt = sequence_delay - (time.time() - t0) if dt < 0: dt = 0 - if self.settings["nb_meas"] > 1: + if nb_meas > 1: time.sleep(dt) # waiting for next measurement (time-lapse) self.status = 'idle' self.thread = threading.Thread(target=func) @@ -1231,7 +1263,4 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S")) # for testing if __name__ == "__main__": ohmpi = OhmPi(settings=OHMPI_CONFIG['settings']) - def func(): - ohmpi.controller.loop_forever() - t = threading.Thread(target=func) - t.start() + ohmpi.controller.loop_forever() diff --git a/ohmpi_settings.json b/ohmpi_settings.json index 8d4907d7a2f3c971bec8dfd529cfbec10aa7bb7b..e839f5fb9a4ddef894a5a8b714757b71cd860c02 100644 --- a/ohmpi_settings.json +++ b/ohmpi_settings.json @@ -3,6 +3,6 @@ "injection_duration": 0.2, "nb_stack": 1, "nb_meas": 1, - "sequence_delay": 1, + "sequence_delay": 120, "export_path": "data/measurement.csv" }