Commit 43a667b0 authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

more responsive MQTT instance in ohmpi.py

Showing with 115 additions and 101 deletions
+115 -101
......@@ -125,31 +125,19 @@ class MyServer(SimpleHTTPRequestHandler):
rdic = {} # response dictionary
if dic['cmd'] == 'run_multiple_sequences':
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'})
#print('-- payload...', end='')
publish.single(payload=payload, **publisher_config)
#print('published!')
elif dic['cmd'] == 'interrupt':
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'})
#for i in range(10):
publish.single(payload=payload, **publisher_config)
# time.sleep(.5)
# if received:
# break
elif dic['cmd'] == 'getData':
print(dic)
# get all .csv file in data folder
fnames = sorted([fname for fname in os.listdir('data/') if fname[-4:] == '.csv'])
fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv']
ddic = {}
fdownloaded = True
if dic['lastSurvey'] == '0':
fdownloaded = False
for fname in fnames:
if (((fname != 'readme.txt')
and ('_rs' not in fname))
and ((fname.replace('.csv', '') == dic['lastSurvey'])
or (fdownloaded == False))):
fdownloaded = False
if ((fname != 'readme.txt')
and ('_rs' not in fname)
and (fname.replace('.csv', '') not in dic['surveyNames'])):
df = pd.read_csv('data/' + fname)
ddic[fname.replace('.csv', '')] = {
'a': df['A'].tolist(),
......
......@@ -375,13 +375,10 @@
// getData
function getData() {
let lastSurvey = '0'
if (Object.keys(data).length > 1) {
lastSurvey = Object.keys(data).slice(-2, -1)
}
let surveyNames = []
sendCommand(JSON.stringify({
'cmd': 'getData',
'lastSurvey': lastSurvey
'surveyNames': surveyNames
// last survey is often partial so we download it again
}), function(ddic) {
// update status
......@@ -393,8 +390,8 @@
...ddic['data'] // value from second dic are preferred
}
// dropdown with number of surveys and +++
let surveyNames = Object.keys(data).sort()
// dropdown with number of surveys
surveyNames = Object.keys(data).sort()
// remove listener as we will replace the choices
surveySelect.removeEventListener('change', surveySelectFunc)
......
......@@ -78,57 +78,6 @@ class OhmPi(object):
print(colored(f'Data logger {self.data_logger.handlers if self.data_logger is not None else "None"}', 'blue'))
print(colored(f'SOH logger {self.soh_logger.handlers if self.soh_logger is not None else "None"}', 'blue'))
# set controller
self.mqtt = False
self.cmd_id = None
if mqtt:
self.mqtt = True
import paho.mqtt.client as mqtt_client # if we don't use MQTT but just Python API, we don't need to install it to start the ohmpi.py
import paho.mqtt.publish as publish
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
try:
self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
trials = trials_max
broker_connected = True
except Exception as e:
self.exec_logger.debug(f'Unable to connect control broker: {e}')
self.exec_logger.info('trying again to connect to control broker...')
time.sleep(2)
trials += 1
if broker_connected:
self.exec_logger.info(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
# MQTT does not ensure that the message is delivered to the subscribed client
# this is important for us as we want to ensure the command has been sent AND
# received. Hence, upon reception of a command, we publish a message acknowledgement
publisher_config = MQTT_CONTROL_CONFIG.copy()
publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic']
publisher_config.pop('ctrl_topic')
def on_message(client, userdata, message):
command = message.payload.decode('utf-8')
dic = json.loads(command)
if dic['cmd_id'] != self.cmd_id:
self.cmd_id = dic['cmd_id']
self.exec_logger.debug(f'Received command {command}')
payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'})
publish.single(payload=payload, **publisher_config)
self._process_commands(command)
self.controller.on_message = on_message
else:
self.exec_logger.error(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
self.controller = None
# read in hardware parameters (config.py)
self._read_hardware_config()
......@@ -189,11 +138,90 @@ class OhmPi(object):
self.pin1.direction = Direction.OUTPUT
self.pin1.value = False
if False:
# Starts the command processing thread
self.cmd_listen = True
self.cmd_thread = threading.Thread(target=self._control)
self.cmd_thread.start()
# set controller
self.mqtt = mqtt
self.cmd_id = None
if mqtt:
import paho.mqtt.client as mqtt_client # if we don't use MQTT but just Python API, we don't need to install it to start the ohmpi.py
import paho.mqtt.publish as publish
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
# ISSUE: the below code works but leads to miss message sometimes...??
# self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
# def on_connect(client, userdata, flags, rc):
# if rc == 0:
# print('Connected to MQTT Broker')
# else:
# print("Failed to connect, return code %d\n", rc)
# self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
# MQTT_CONTROL_CONFIG['auth']['password'])
# self.controller.on_connect = on_connect
# self.controller.connect(MQTT_CONTROL_CONFIG['hostname'], 1883)
# the below code works much better and solve the issue with missing messages that
# the above code creates (hint: maybe binding to the class with self. is the issue)
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client("ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False)
client.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
client.on_connect = on_connect
client.connect(MQTT_CONTROL_CONFIG['hostname'], 1883)
return client
self.controller = connect_mqtt()
# trials = 0
# trials_max = 10
# broker_connected = False
# while trials < trials_max:
# try:
# self.controller.connect(MQTT_CONTROL_CONFIG['hostname'])
# trials = trials_max
# broker_connected = True
# except Exception as e:
# print(f'Unable to connect control broker: {e}')
# print('trying again to connect to control broker...')
# time.sleep(2)
# trials += 1
# if broker_connected:
# print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
# reply 'ok' to the client upon reception of the command
# so we are sure the command has been received
publisher_config = MQTT_CONTROL_CONFIG.copy()
publisher_config['qos'] = 2
publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic']
publisher_config.pop('ctrl_topic')
def on_message(client, userdata, message):
print(message.payload.decode('utf-8'))
command = message.payload.decode('utf-8')
dic = json.loads(command)
if dic['cmd_id'] != self.cmd_id:
self.cmd_id = dic['cmd_id']
self.exec_logger.debug(f'Received command {command}')
payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'})
publish.single(payload=payload, **publisher_config)
self._process_commands(command)
self.controller.on_message = on_message
# else:
# print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
# self.controller = None
# if False:
# # Starts the command processing thread
# self.cmd_listen = True
# self.cmd_thread = threading.Thread(target=self._control)
# self.cmd_thread.start()
@property
def sequence(self):
......@@ -211,17 +239,17 @@ class OhmPi(object):
self.use_mux = False
self._sequence = sequence
def _control(self): # ISSUE: somehow not ALL message are catch by this method in a thread
# this might be due to the thread... -> that means we can miss commands!
def on_message(client, userdata, message):
command = message.payload.decode('utf-8')
self.exec_logger.debug(f'Received command {command}')
self._process_commands(command)
self.controller.on_message = on_message
self.controller.loop_start()
while True:
time.sleep(.5)
# def _control(self): # ISSUE: somehow not ALL message are catch by this method in a thread
# # this might be due to the thread... -> that means we can miss commands!
# def on_message(client, userdata, message):
# command = message.payload.decode('utf-8')
# self.exec_logger.debug(f'Received command {command}')
# self._process_commands(command)
# self.controller.on_message = on_message
# self.controller.loop_start()
# while True:
# time.sleep(.5)
def _update_acquisition_settings(self, config):
warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning)
......@@ -1143,7 +1171,7 @@ class OhmPi(object):
warnings.warn('This function is deprecated. Use run_multiple_sequences() instead.', DeprecationWarning)
self.run_multiple_sequences(self, *args, **kwargs)
def run_multiple_sequences(self, sequence_delay=None, **kwargs):
def run_multiple_sequences(self, sequence_delay=None, nb_meas=None, **kwargs):
""" Run multiple sequences in a separate thread for monitoring mode.
Can be stopped by 'OhmPi.interrupt()'.
Additional arguments are passed to run_measurement().
......@@ -1152,27 +1180,31 @@ class OhmPi(object):
----------
sequence_delay : int, optional
Number of seconds at which the sequence must be started from each others.
nb_meas : int, optional
Number of time the sequence must be repeated.
kwargs : dict, optional
See help(k.run_measurement) for more info.
"""
# self.run = True
if sequence_delay == None:
if sequence_delay is None:
sequence_delay = self.settings['sequence_delay']
sequence_delay = int(sequence_delay)
if nb_meas is None:
nb_meas = self.settings['nb_meas']
self.status = 'running'
def func():
for g in range(0, self.settings["nb_meas"]): # for time-lapse monitoring
if self.status != 'running':
for g in range(0, nb_meas): # for time-lapse monitoring
if self.status == 'stopping':
self.exec_logger.warning('Data acquisition interrupted')
break
t0 = time.time()
self.run_sequence(**kwargs)
# sleeping time between sequence
dt = sequence_delay - (time.time() - t0)
if dt < 0:
dt = 0
if self.settings["nb_meas"] > 1:
if nb_meas > 1:
time.sleep(dt) # waiting for next measurement (time-lapse)
self.status = 'idle'
self.thread = threading.Thread(target=func)
......@@ -1231,7 +1263,4 @@ print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
# for testing
if __name__ == "__main__":
ohmpi = OhmPi(settings=OHMPI_CONFIG['settings'])
def func():
ohmpi.controller.loop_forever()
t = threading.Thread(target=func)
t.start()
ohmpi.controller.loop_forever()
......@@ -3,6 +3,6 @@
"injection_duration": 0.2,
"nb_stack": 1,
"nb_meas": 1,
"sequence_delay": 1,
"sequence_delay": 120,
"export_path": "data/measurement.csv"
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment