From 31dd38879571bb9f02fccdf9ffd81ef1d8a5fb97 Mon Sep 17 00:00:00 2001 From: su530201 <olivier.kaufmann@umons.ac.be> Date: Tue, 1 Nov 2022 18:04:05 +0100 Subject: [PATCH] Fixes issue #77. Improves args and kwargs handling by _process_command. Reorders method in alphabetical order to ease navigating the code --- config.py | 21 +- ohmpi.py | 1110 +++++++++++++++++++++++++++-------------------------- 2 files changed, 576 insertions(+), 555 deletions(-) diff --git a/config.py b/config.py index d6411920..03ead8f4 100644 --- a/config.py +++ b/config.py @@ -13,15 +13,16 @@ logging_suffix = '' OHMPI_CONFIG = { 'id': ohmpi_id, # Unique identifier of the OhmPi board (string) 'R_shunt': 2, # Shunt resistance in Ohms - 'Imax': 4800/50/2, # Maximum current + 'Imax': 4800 / 50 / 2, # Maximum current 'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V - 'coef_p3': 2.50, # slope for current conversion for ADS.P3, measurement in V/V - 'offset_p2': 0, - 'offset_p3': 0, - 'integer': 2, # Max value 10 # TODO: Explain what this is... - 'version': 2, + # 'coef_p3': 2.50, # slope for current conversion for ADS.P3, measurement in V/V + # 'offset_p2': 0, + # 'offset_p3': 0, + 'nb_samples': 2, # Max value 10 # was named integer before... + 'version': 2, # Is this still needed? 'max_elec': 64, - 'board_addresses': {'A': 0x73, 'B': 0x72, 'M': 0x71, 'N': 0x70}, # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70} + 'board_addresses': {'A': 0x73, 'B': 0x72, 'M': 0x71, 'N': 0x70}, + # def. {'A': 0x76, 'B': 0x71, 'M': 0x74, 'N': 0x70} 'settings': 'ohmpi_settings.json', 'board_version': '22.10' } # TODO: add a dictionary with INA models and associated gain values @@ -29,7 +30,7 @@ OHMPI_CONFIG = { # Execution logging configuration EXEC_LOGGING_CONFIG = { 'logging_level': logging.INFO, - 'log_file_logging_level' : logging.DEBUG, + 'log_file_logging_level': logging.DEBUG, 'logging_to_console': True, 'file_name': f'exec{logging_suffix}.log', 'max_bytes': 262144, @@ -67,7 +68,7 @@ MQTT_LOGGING_CONFIG = { 'retain': False, 'keepalive': 60, 'will': None, - 'auth': { 'username': 'mqtt_user', 'password': 'mqtt_password' }, + 'auth': {'username': 'mqtt_user', 'password': 'mqtt_password'}, 'tls': None, 'protocol': MQTTv31, 'transport': 'tcp', @@ -85,7 +86,7 @@ MQTT_CONTROL_CONFIG = { 'retain': False, 'keepalive': 60, 'will': None, - 'auth': { 'username': 'mqtt_user', 'password': 'mqtt_password' }, + 'auth': {'username': 'mqtt_user', 'password': 'mqtt_password'}, 'tls': None, 'protocol': MQTTv31, 'transport': 'tcp', diff --git a/ohmpi.py b/ohmpi.py index 367c97f3..c40bcf72 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -35,7 +35,7 @@ try: import digitalio # noqa from digitalio import Direction # noqa from gpiozero import CPUTemperature # noqa - import minimalmodbus + import minimalmodbus # noqa arm64_imports = True except ImportError as error: if EXEC_LOGGING_CONFIG['logging_level'] == DEBUG: @@ -43,6 +43,7 @@ except ImportError as error: arm64_imports = False except Exception as error: print(colored(f'Unexpected error: {error}', 'red')) + arm64_imports = None exit() @@ -50,7 +51,7 @@ class OhmPi(object): """ OhmPi class. """ - def __init__(self, settings=None, sequence=None, use_mux=False, mqtt=True, on_pi=None, idps=False): + def __init__(self, settings=None, sequence=None, use_mux=False, mqtt=True, onpi=None, idps=False): """Constructs the ohmpi object Parameters @@ -63,19 +64,20 @@ class OhmPi(object): if True use the multiplexor to select active electrodes mqtt: bool, defaut: True if True publish on mqtt topics while logging, otherwise use other loggers only - on_pi: bool,None default: None + onpi: bool,None default: None if None, the platform on which the class is instantiated is determined to set on_pi to either True or False. if False the behaviour of an ohmpi will be partially emulated and return random data. idps: if true uses the DPS """ - if on_pi is None: - _, on_pi = get_platform() + if onpi is None: + _, onpi = get_platform() self._sequence = sequence + self.nb_samples = 0 self.use_mux = use_mux - self.on_pi = on_pi # True if run from the RaspberryPi with the hardware, otherwise False for random data + self.on_pi = onpi # True if run from the RaspberryPi with the hardware, otherwise False for random data self.status = 'idle' # either running or idle self.thread = None # contains the handle for the thread taking the measurement @@ -150,7 +152,6 @@ class OhmPi(object): self.cmd_id = None if self.mqtt: import paho.mqtt.client as mqtt_client - import paho.mqtt.publish as publish self.exec_logger.debug(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}" f" on {MQTT_CONTROL_CONFIG['hostname']} broker") @@ -192,15 +193,9 @@ class OhmPi(object): publisher_config.pop('ctrl_topic') def on_message(client, userdata, message): - print(message.payload.decode('utf-8')) command = message.payload.decode('utf-8') - dic = json.loads(command) - if dic['cmd_id'] != self.cmd_id: - self.cmd_id = dic['cmd_id'] - self.exec_logger.debug(f'Received command {command}') - # payload = json.dumps({'cmd_id': dic['cmd_id'], 'reply': 'ok'}) - # publish.single(payload=payload, **publisher_config) - self._process_commands(command) + self.exec_logger.debug(f'Received command {command}') + self._process_commands(command) self.controller.on_message = on_message else: @@ -208,75 +203,181 @@ class OhmPi(object): self.exec_logger.warning('No connection to control broker.' ' Use python/ipython to interact with OhmPi object...') - @property - def sequence(self): - """Gets sequence""" - if self._sequence is not None: - assert isinstance(self._sequence, np.ndarray) - return self._sequence + @staticmethod + def append_and_save(filename, last_measurement): + """Appends and saves the last measurement dict. - @sequence.setter - def sequence(self, sequence): - """Sets sequence""" - if sequence is not None: - assert isinstance(sequence, np.ndarray) - self.use_mux = True - else: - self.use_mux = False - self._sequence = sequence + Parameters + ---------- + filename : str + filename to save the last measurement dataframe + last_measurement : dict + Last measurement taken in the form of a python dictionary + """ + last_measurement = deepcopy(last_measurement) + if 'fulldata' in last_measurement: + d = last_measurement['fulldata'] + n = d.shape[0] + if n > 1: + idic = dict(zip(['i' + str(i) for i in range(n)], d[:, 0])) + udic = dict(zip(['u' + str(i) for i in range(n)], d[:, 1])) + tdic = dict(zip(['t' + str(i) for i in range(n)], d[:, 2])) + last_measurement.update(idic) + last_measurement.update(udic) + last_measurement.update(tdic) + last_measurement.pop('fulldata') - def _update_acquisition_settings(self, config): - warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) - self.update_settings(config) + if os.path.isfile(filename): + # Load data file and append data to it + with open(filename, 'a') as f: + w = csv.DictWriter(f, last_measurement.keys()) + w.writerow(last_measurement) + # last_measurement.to_csv(f, header=False) + else: + # create data file and add headers + with open(filename, 'a') as f: + w = csv.DictWriter(f, last_measurement.keys()) + w.writeheader() + w.writerow(last_measurement) - def update_settings(self, config): - """Updates acquisition settings from a json file or dictionary. - Parameters can be: - - nb_electrodes (number of electrode used, if 4, no MUX needed) - - injection_duration (in seconds) - - nb_meas (total number of times the sequence will be run) - - sequence_delay (delay in second between each sequence run) - - nb_stack (number of stack for each quadrupole measurement) - - export_path (path where to export the data, timestamp will be added to filename) + def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5): + """Estimates best Tx voltage based on different strategies. + At first a half-cycle is made for a short duration with a fixed + known voltage. This gives us Iab and Rab. We also measure Vmn. + A constant c = vmn/iab is computed (only depends on geometric + factor and ground resistivity, that doesn't change during a + quadrupole). Then depending on the strategy, we compute which + vab to inject to reach the minimum/maximum Iab current or + min/max Vmn. + This function also compute the polarity on Vmn (on which pin + of the ADS1115 we need to measure Vmn to get the positive value). Parameters ---------- - config : str, dict - Path to the .json settings file or dictionary of settings. + best_tx_injtime : float, optional + Time in milliseconds for the half-cycle used to compute Rab. + strategy : str, optional + Either: + - vmin : compute Vab to reach a minimum Iab and Vmn + - vmax : compute Vab to reach a maximum Iab and Vmn + - constant : apply given Vab + tx_volt : float, optional + Voltage apply to try to guess the best voltage. 5 V applied + by default. If strategy "constant" is chosen, constant voltage + to applied is "tx_volt". + + Returns + ------- + vab : float + Proposed Vab according to the given strategy. + polarity : int + Either 1 or -1 to know on which pin of the ADS the Vmn is measured. """ - status = False - if config is not None: - try: - if isinstance(config, dict): - self.settings.update(config) - else: - with open(config) as json_file: - dic = json.load(json_file) - self.settings.update(dic) - self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings)) - status = True - except Exception as e: - self.exec_logger.warning('Unable to update settings.') - status = False + + # hardware limits + voltage_min = 10. # mV + voltage_max = 4500. + current_min = voltage_min / (self.r_shunt * 50) # mA + current_max = voltage_max / (self.r_shunt * 50) + tx_max = 40. # volt + + # check of volt + volt = tx_volt + if volt > tx_max: + self.exec_logger.warning('Sorry, cannot inject more than 40 V, set it back to 5 V') + volt = 5. + + # redefined the pin of the mcp (needed when relays are connected) + self.pin0 = self.mcp.get_pin(0) + self.pin0.direction = Direction.OUTPUT + self.pin0.value = False + self.pin1 = self.mcp.get_pin(1) + self.pin1.direction = Direction.OUTPUT + self.pin1.value = False + + # select a polarity to start with + self.pin0.value = True + self.pin1.value = False + + # set voltage for test + self.DPS.write_register(0x0000, volt, 2) + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) # inject for given tx time + + # autogain + self.ads_current = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=self.ads_voltage_address) + # print('current P0', AnalogIn(self.ads_current, ads.P0).voltage) + # print('voltage P0', AnalogIn(self.ads_voltage, ads.P0).voltage) + # print('voltage P2', AnalogIn(self.ads_voltage, ads.P2).voltage) + gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) + gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) + gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) + gain_voltage = np.min([gain_voltage0, gain_voltage2]) + # print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage)) + self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) + self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) + + # we measure the voltage on both A0 and A2 to guess the polarity + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # measure voltage + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + # print('I (mV)', I*50*self.r_shunt) + # print('I (mA)', I) + # print('U0 (mV)', U0) + # print('U2 (mV)', U2) + + # check polarity + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + # print('polarity', polarity) + + # compute constant + c = vmn / I + Rab = (volt * 1000.) / I + + self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') + + # implement different strategy + if strategy == 'vmax': + vmn_max = c * current_max + if voltage_max > vmn_max > voltage_min: + vab = current_max * Rab + self.exec_logger.debug('target max current') + else: + iab = voltage_max / c + vab = iab * Rab + self.exec_logger.debug('target max voltage') + if vab > 25000.: + vab = 25000. + vab = vab / 1000. * 0.9 + + elif strategy == 'vmin': + vmn_min = c * current_min + if voltage_min < vmn_min < voltage_max: + vab = current_min * Rab + self.exec_logger.debug('target min current') + else: + iab = voltage_min / c + vab = iab * Rab + self.exec_logger.debug('target min voltage') + if vab < 1000.: + vab = 1000. + vab = vab / 1000. * 1.1 + + elif strategy == 'constant': + vab = volt else: - self.exec_logger.warning('Settings are missing...') - return status + vab = 5 - def _read_hardware_config(self): - """Reads hardware configuration from config.py - """ - self.exec_logger.debug('Getting hardware config') - self.id = OHMPI_CONFIG['id'] # ID of the OhmPi - self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm - self.Imax = OHMPI_CONFIG['Imax'] # maximum current - self.exec_logger.debug(f'The maximum current cannot be higher than {self.Imax} mA') - self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V - self.nb_samples = OHMPI_CONFIG['integer'] # number of samples measured for each stack - self.version = OHMPI_CONFIG['version'] # hardware version - self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes - self.board_addresses = OHMPI_CONFIG['board_addresses'] - self.board_version = OHMPI_CONFIG['board_version'] - self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') + # self.DPS.write_register(0x09, 0) # DPS5005 off + self.pin0.value = False + self.pin1.value = False + + return vab, polarity @staticmethod def _find_identical_in_line(quads): @@ -302,9 +403,41 @@ class OhmPi(object): return output - def read_quad(self, filename): - warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) - self.load_sequence(filename) + def _gain_auto(self, channel): + """Automatically sets the gain on a channel + + Parameters + ---------- + channel : ads.ADS1x15 + Instance of ADS where voltage is measured. + + Returns + ------- + gain : float + Gain to be applied on ADS1115. + """ + + gain = 2 / 3 + if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.023): + gain = 2 + elif (abs(channel.voltage) < 1.023) and (abs(channel.voltage) >= 0.508): + gain = 4 + elif (abs(channel.voltage) < 0.508) and (abs(channel.voltage) >= 0.250): + gain = 8 + elif abs(channel.voltage) < 0.256: + gain = 16 + self.exec_logger.debug(f'Setting gain to {gain}') + return gain + + def interrupt(self): + """Interrupts the acquisition. """ + self.status = 'stopping' + if self.thread is not None: + self.thread.join() + self.exec_logger.debug('Interrupted sequence acquisition...') + else: + self.exec_logger.debug('No sequence measurement thread to interrupt.') + self.exec_logger.debug(f'Status: {self.status}') def load_sequence(self, filename: str): """Reads quadrupole sequence from file. @@ -352,257 +485,105 @@ class OhmPi(object): self.sequence = sequence - def _switch_mux(self, electrode_nr, state, role): - """Selects the right channel for the multiplexer cascade for a given electrode. - - Parameters - ---------- - electrode_nr : int - Electrode index to be switched on or off. - state : str - Either 'on' or 'off'. - role : str - Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board. - """ - if not self.use_mux or not self.on_pi: - if not self.on_pi: - self.exec_logger.warning('Cannot reset mux while in simulation mode...') - else: - self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' - ' Set use_mux to True to use the multiplexer...') - elif self.sequence is None: - self.exec_logger.warning('Unable to switch MUX without a sequence') - else: - # choose with MUX board - tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) - - # find I2C address of the electrode and corresponding relay - # considering that one MCP23017 can cover 16 electrodes - i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division - relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 - - if i2c_address is not None: - # select the MCP23017 of the selected MUX board - mcp2 = MCP23017(tca[i2c_address]) - mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT - - if state == 'on': - mcp2.get_pin(relay_nr - 1).value = True - else: - mcp2.get_pin(relay_nr - 1).value = False - - self.exec_logger.debug(f'Switching relay {relay_nr} ' - f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') - else: - self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') + def measure(self, *args, **kwargs): + warnings.warn('This function is deprecated. Use run_multiple_sequences() instead.', DeprecationWarning) + self.run_multiple_sequences(self, *args, **kwargs) - def switch_mux_on(self, quadrupole): - """Switches on multiplexer relays for given quadrupole. - - Parameters - ---------- - quadrupole : list of 4 int - List of 4 integers representing the electrode numbers. - """ - roles = ['A', 'B', 'M', 'N'] - # another check to be sure A != B - if quadrupole[0] != quadrupole[1]: - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'on', roles[i]) - else: - self.exec_logger.error('A == B -> short circuit risk detected!') + def _process_commands(self, message): + """Processes commands received from the controller(s) - def switch_mux_off(self, quadrupole): - """Switches off multiplexer relays for given quadrupole. - Parameters ---------- - quadrupole : list of 4 int - List of 4 integers representing the electrode numbers. + message : str + message containing a command and arguments or keywords and arguments """ - roles = ['A', 'B', 'M', 'N'] - for i in range(0, 4): - if quadrupole[i] > 0: - self._switch_mux(quadrupole[i], 'off', roles[i]) - def reset_mux(self): - """Switches off all multiplexer relays.""" - if self.on_pi and self.use_mux: - roles = ['A', 'B', 'M', 'N'] - for i in range(0, 4): - for j in range(1, self.max_elec + 1): - self._switch_mux(j, 'off', roles[i]) - self.exec_logger.debug('All MUX switched off.') - elif not self.on_pi: - self.exec_logger.warning('Cannot reset mux while in simulation mode...') - else: - self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' - ' Set use_mux to True to use the multiplexer...') - - def _gain_auto(self, channel): - """Automatically sets the gain on a channel - - Parameters - ---------- - channel : object - Instance of ADS where voltage is measured. - - Returns - ------- - gain : float - Gain to be applied on ADS1115. - """ - gain = 2 / 3 - if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.023): - gain = 2 - elif (abs(channel.voltage) < 1.023) and (abs(channel.voltage) >= 0.508): - gain = 4 - elif (abs(channel.voltage) < 0.508) and (abs(channel.voltage) >= 0.250): - gain = 8 - elif abs(channel.voltage) < 0.256: - gain = 16 - self.exec_logger.debug(f'Setting gain to {gain}') - return gain + status = False + cmd_id = '?' + try: + decoded_message = json.loads(message) + self.exec_logger.debug(f'Decoded message {decoded_message}') + cmd_id = decoded_message.pop('cmd_id', None) + cmd = decoded_message.pop('cmd', None) + args = decoded_message.pop('args', None) + if args is not None: + if len(args) != 0: + if args[0] != '[': + args = f'["{args}"]' + self.exec_logger.debug(f'args to decode: {args}') + args = json.loads(args) if args != '[]' else None + self.exec_logger.debug(f'Decoded args {args}') + else: + args = None + kwargs = decoded_message.pop('kwargs', None) + if kwargs is not None: + if len(kwargs) != 0: + if kwargs[0] != '{': + kwargs = '{"' + kwargs + '"}' + self.exec_logger.debug(f'kwargs to decode: {kwargs}') + kwargs = json.loads(kwargs) if kwargs != '' else None + self.exec_logger.debug(f'Decoded kwargs {kwargs}') + else: + kwargs = None + self.exec_logger.debug(f"Calling method {cmd}({str(args) + ', ' if args is not None else ''}" + f"{str(kwargs) if kwargs is not None else ''})") + if cmd_id is None: + self.exec_logger.warning('You should use a unique identifier for cmd_id') + if cmd is not None: + try: + if args is None: + if kwargs is None: + output = getattr(self, cmd)() + else: + output = getattr(self, cmd)(**kwargs) + else: + if kwargs is None: + output = getattr(self, cmd)(*args) + else: + output = getattr(self, cmd)(*args, **kwargs) + status = True + except Exception as e: + self.exec_logger.error( + f"Unable to execute {cmd}({str(args) + ', ' if args is not None else ''}" + f"{str(kwargs) if kwargs is not None else ''}): {e}") + status = False + except Exception as e: + self.exec_logger.warning(f'Unable to decode command {message}: {e}') + status = False + finally: + reply = {'cmd_id': cmd_id, 'status': status} + reply = json.dumps(reply) + self.exec_logger.debug(f'Execution report: {reply}') - def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5): - """Estimates best Tx voltage based on different strategies. - At first a half-cycle is made for a short duration with a fixed - known voltage. This gives us Iab and Rab. We also measure Vmn. - A constant c = vmn/iab is computed (only depends on geometric - factor and ground resistivity, that doesn't change during a - quadrupole). Then depending on the strategy, we compute which - vab to inject to reach the minimum/maximum Iab current or - min/max Vmn. - This function also compute the polarity on Vmn (on which pin - of the ADS1115 we need to measure Vmn to get the positive value). + @staticmethod + def quit(self): + """Quits OhmPi""" - Parameters - ---------- - best_tx_injtime : float, optional - Time in milliseconds for the half-cycle used to compute Rab. - strategy : str, optional - Either: - - vmin : compute Vab to reach a minimum Iab and Vmn - - vmax : compute Vab to reach a maximum Iab and Vmn - - constant : apply given Vab - tx_volt : float, optional - Voltage apply to try to guess the best voltage. 5 V applied - by default. If strategy "constant" is chosen, constant voltage - to applied is "tx_volt". + exit() - Returns - ------- - vab : float - Proposed Vab according to the given strategy. - polarity : int - Either 1 or -1 to know on which pin of the ADS the Vmn is measured. + def _read_hardware_config(self): + """Reads hardware configuration from config.py """ + self.exec_logger.debug('Getting hardware config') + self.id = OHMPI_CONFIG['id'] # ID of the OhmPi + self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm + self.Imax = OHMPI_CONFIG['Imax'] # maximum current + self.exec_logger.debug(f'The maximum current cannot be higher than {self.Imax} mA') + self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V + self.nb_samples = OHMPI_CONFIG['nb_samples'] # number of samples measured for each stack + self.version = OHMPI_CONFIG['version'] # hardware version + self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes + self.board_addresses = OHMPI_CONFIG['board_addresses'] + self.board_version = OHMPI_CONFIG['board_version'] + self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') - # hardware limits - voltage_min = 10 # mV - voltage_max = 4500 - current_min = voltage_min / (self.r_shunt * 50) # mA - current_max = voltage_max / (self.r_shunt * 50) - tx_max = 40 # volt - - # check of volt - volt = tx_volt - if volt > tx_max: - print('sorry, cannot inject more than 40 V, set it back to 5 V') - volt = 5 - - # redefined the pin of the mcp (needed when relays are connected) - self.pin0 = self.mcp.get_pin(0) - self.pin0.direction = Direction.OUTPUT - self.pin0.value = False - self.pin1 = self.mcp.get_pin(1) - self.pin1.direction = Direction.OUTPUT - self.pin1.value = False - - # select a polarity to start with - self.pin0.value = True - self.pin1.value = False - - # set voltage for test - self.DPS.write_register(0x0000, volt, 2) - self.DPS.write_register(0x09, 1) # DPS5005 on - time.sleep(best_tx_injtime) # inject for given tx time - - # autogain - self.ads_current = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=2/3, data_rate=860, address=self.ads_voltage_address) - # print('current P0', AnalogIn(self.ads_current, ads.P0).voltage) - # print('voltage P0', AnalogIn(self.ads_voltage, ads.P0).voltage) - # print('voltage P2', AnalogIn(self.ads_voltage, ads.P2).voltage) - gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) - gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) - gain_voltage = np.min([gain_voltage0, gain_voltage2]) - # print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage)) - self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) - self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - - # we measure the voltage on both A0 and A2 to guess the polarity - I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # measure current - U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # measure voltage - U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. - # print('I (mV)', I*50*self.r_shunt) - # print('I (mA)', I) - # print('U0 (mV)', U0) - # print('U2 (mV)', U2) - - # check polarity - polarity = 1 # by default, we guessed it right - vmn = U0 - if U0 < 0: # we guessed it wrong, let's use a correction factor - polarity = -1 - vmn = U2 - # print('polarity', polarity) - - # compute constant - c = vmn / I - Rab = (volt * 1000.) / I - - self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') - - # implement different strategy - if strategy == 'vmax': - vmn_max = c * current_max - if voltage_max > vmn_max > voltage_min: - vab = current_max * Rab - self.exec_logger.debug('target max current') - else: - iab = voltage_max / c - vab = iab * Rab - self.exec_logger.debug('target max voltage') - if vab > 25000.: - vab = 25000. - vab = vab / 1000. * 0.9 - - elif strategy == 'vmin': - vmn_min = c * current_min - if voltage_min < vmn_min < voltage_max: - vab = current_min * Rab - self.exec_logger.debug('target min current') - else: - iab = voltage_min / c - vab = iab * Rab - self.exec_logger.debug('target min voltage') - if vab < 1000.: - vab = 1000. - vab = vab / 1000. * 1.1 - - elif strategy == 'constant': - vab = volt - else: - vab = 5 - - # self.DPS.write_register(0x09, 0) # DPS5005 off - self.pin0.value = False - self.pin1.value = False + def read_quad(self, filename): + warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) + self.load_sequence(filename) - return vab, polarity + def restart(self): + self.exec_logger.info('Restarting pi...') + os.system('reboot') def run_measurement(self, quad=None, nb_stack=None, injection_duration=None, autogain=True, strategy='constant', tx_volt=5, best_tx_injtime=0.1, @@ -634,6 +615,8 @@ class OhmPi(object): measurement will be taken and values will be NaN. best_tx_injtime : float, optional (V3.0 only) Injection time in seconds used for finding the best voltage. + cmd_id : + """ self.exec_logger.debug('Starting measurement') self.exec_logger.info('Waiting for data') @@ -679,6 +662,8 @@ class OhmPi(object): address=self.ads_voltage_address, mode=0) # turn on the power supply + start_delay = None + end_delay = None out_of_range = False if self.idps: if not np.isnan(tx_volt): @@ -740,6 +725,7 @@ class OhmPi(object): meas = np.zeros((self.nb_samples, 3)) * np.nan start_delay = time.time() # stating measurement time dt = 0 + k = 0 for k in range(0, self.nb_samples): # reading current value on ADS channels meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) @@ -824,6 +810,7 @@ class OhmPi(object): sum_i = np.nan sum_vmn = np.nan sum_ps = np.nan + fulldata = None if self.idps: self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt @@ -883,6 +870,121 @@ class OhmPi(object): return d + def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs): + """Runs multiple sequences in a separate thread for monitoring mode. + Can be stopped by 'OhmPi.interrupt()'. + Additional arguments are passed to run_measurement(). + + Parameters + ---------- + cmd_id : + + sequence_delay : int, optional + Number of seconds at which the sequence must be started from each others. + nb_meas : int, optional + Number of time the sequence must be repeated. + kwargs : dict, optional + See help(k.run_measurement) for more info. + """ + # self.run = True + if sequence_delay is None: + sequence_delay = self.settings['sequence_delay'] + sequence_delay = int(sequence_delay) + if nb_meas is None: + nb_meas = self.settings['nb_meas'] + self.status = 'running' + self.exec_logger.debug(f'Status: {self.status}') + self.exec_logger.debug(f'Measuring sequence: {self.sequence}') + + def func(): + for g in range(0, nb_meas): # for time-lapse monitoring + if self.status == 'stopping': + self.exec_logger.warning('Data acquisition interrupted') + break + t0 = time.time() + self.run_sequence(**kwargs) + + # sleeping time between sequence + dt = sequence_delay - (time.time() - t0) + if dt < 0: + dt = 0 + if nb_meas > 1: + time.sleep(dt) # waiting for next measurement (time-lapse) + self.status = 'idle' + self.thread = threading.Thread(target=func) + self.thread.start() + + def run_sequence(self, cmd_id=None, **kwargs): + """Runs sequence synchronously (=blocking on main thread). + Additional arguments are passed to run_measurement(). + """ + self.status = 'running' + self.exec_logger.debug(f'Status: {self.status}') + self.exec_logger.debug(f'Measuring sequence: {self.sequence}') + t0 = time.time() + + # create filename with timestamp + filename = self.settings["export_path"].replace('.csv', + f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') + self.exec_logger.debug(f'Saving to {filename}') + + # make sure all multiplexer are off + self.reset_mux() + + # measure all quadrupole of the sequence + if self.sequence is None: + n = 1 + else: + n = self.sequence.shape[0] + for i in range(0, n): + if self.sequence is None: + quad = np.array([0, 0, 0, 0]) + else: + quad = self.sequence[i, :] # quadrupole + if self.status == 'stopping': + break + + # call the switch_mux function to switch to the right electrodes + self.switch_mux_on(quad) + + # run a measurement + if self.on_pi: + acquired_data = self.run_measurement(quad, **kwargs) + else: # for testing, generate random data + acquired_data = { + 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], + 'R [ohm]': np.abs(np.random.randn(1)) + } + + # switch mux off + self.switch_mux_off(quad) + + # add command_id in dataset + acquired_data.update({'cmd_id': cmd_id}) + # log data to the data logger + self.data_logger.info(f'{acquired_data}') + # save data and print in a text file + self.append_and_save(filename, acquired_data) + self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') + + self.status = 'idle' + + def run_sequence_async(self, cmd_id=None, **kwargs): + """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. + Additional arguments are passed to run_measurement(). + + Parameters + ---------- + cmd_id: + """ + + def func(): + self.run_sequence(**kwargs) + + self.thread = threading.Thread(target=func) + self.thread.start() + self.status = 'idle' + def rs_check(self, tx_volt=12): """Checks contact resistances""" # create custom sequence where MN == AB @@ -935,112 +1037,28 @@ class OhmPi(object): self.exec_logger.debug(msg) - # if contact resistance = 0 -> we have a short circuit!! - if resist < 1e-5: - msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm' - self.exec_logger.warning(msg) - print(msg) - - # save data and print in a text file - self.append_and_save(export_path_rs, { - 'A': quad[0], - 'B': quad[1], - 'RS [kOhm]': resist, - }) - - # close mux path and put pin back to GND - self.switch_mux_off(quad) - else: - pass - self.status = 'idle' - - # - # # TODO if interrupted, we would need to restore the values - # # TODO or we offer the possibility in 'run_measurement' to have rs_check each time? - - @staticmethod - def append_and_save(filename, last_measurement): - """Appends and saves the last measurement dict. - - Parameters - ---------- - filename : str - filename to save the last measurement dataframe - last_measurement : dict - Last measurement taken in the form of a python dictionary - """ - last_measurement = deepcopy(last_measurement) - if 'fulldata' in last_measurement: - d = last_measurement['fulldata'] - n = d.shape[0] - if n > 1: - idic = dict(zip(['i' + str(i) for i in range(n)], d[:, 0])) - udic = dict(zip(['u' + str(i) for i in range(n)], d[:, 1])) - tdic = dict(zip(['t' + str(i) for i in range(n)], d[:, 2])) - last_measurement.update(idic) - last_measurement.update(udic) - last_measurement.update(tdic) - last_measurement.pop('fulldata') - - if os.path.isfile(filename): - # Load data file and append data to it - with open(filename, 'a') as f: - w = csv.DictWriter(f, last_measurement.keys()) - w.writerow(last_measurement) - # last_measurement.to_csv(f, header=False) - else: - # create data file and add headers - with open(filename, 'a') as f: - w = csv.DictWriter(f, last_measurement.keys()) - w.writeheader() - w.writerow(last_measurement) - - def _process_commands(self, message): - """Processes commands received from the controller(s) - - Parameters - ---------- - message : str - message containing a command and arguments or keywords and arguments - """ - - try: - decoded_message = json.loads(message) - print(f'decoded message: {decoded_message}') - cmd_id = decoded_message.pop('cmd_id', None) - cmd = decoded_message.pop('cmd', None) - args = decoded_message.pop('args', '') - if len(args) == 0: - args = f'["{args}"]' - args = json.loads(args) - kwargs = decoded_message.pop('kwargs', '') - if len(kwargs) == 0: - kwargs = '"{}"' - kwargs = json.loads(kwargs) - self.exec_logger.debug(f'Calling method {cmd}({args}, {kwargs})') - status = False - # e = None # NOTE: Why this? - print(cmd, args, kwargs) - if cmd_id is None: - self.exec_logger.warning('You should use a unique identifier for cmd_id') - if cmd is not None: - try: - output = getattr(self, cmd)(*args, **kwargs) - status = True - except Exception as e: - self.exec_logger.error( - f"{e}\nUnable to execute {cmd}({args + ', ' if args != '[]' else ''}" - f"{kwargs if kwargs != '{}' else ''})") - status = False - except Exception as e: - self.exec_logger.warning(f'Unable to decode command {message}: {e}') - status = False - finally: - reply = {'cmd_id': cmd_id, 'status': status} - reply = json.dumps(reply) - self.exec_logger.debug(f'Execution report: {reply}') - - def set_sequence(self, sequence=sequence): + # if contact resistance = 0 -> we have a short circuit!! + if resist < 1e-5: + msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm' + self.exec_logger.warning(msg) + + # save data in a text file + self.append_and_save(export_path_rs, { + 'A': quad[0], + 'B': quad[1], + 'RS [kOhm]': resist, + }) + + # close mux path and put pin back to GND + self.switch_mux_off(quad) + else: + pass + self.status = 'idle' + # + # # TODO if interrupted, we would need to restore the values + # # TODO or we offer the possibility in 'run_measurement' to have rs_check each time? + + def set_sequence(self, sequence=None): try: self.sequence = np.loadtxt(StringIO(sequence)).astype('uint32') status = True @@ -1048,150 +1066,152 @@ class OhmPi(object): self.exec_logger.warning(f'Unable to set sequence: {e}') status = False - def run_sequence(self, cmd_id=None, **kwargs): - """Runs sequence synchronously (=blocking on main thread). - Additional arguments are passed to run_measurement(). - """ - self.status = 'running' - self.exec_logger.debug(f'Status: {self.status}') - self.exec_logger.debug(f'Measuring sequence: {self.sequence}') - t0 = time.time() - - # create filename with timestamp - filename = self.settings["export_path"].replace('.csv', - f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') - self.exec_logger.debug(f'Saving to {filename}') + def stop(self): + warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning) + self.interrupt() - # make sure all multiplexer are off - self.reset_mux() + def _switch_mux(self, electrode_nr, state, role): + """Selects the right channel for the multiplexer cascade for a given electrode. - # measure all quadrupole of the sequence - if self.sequence is None: - n = 1 - else: - n = self.sequence.shape[0] - for i in range(0, n): - if self.sequence is None: - quad = np.array([0, 0, 0, 0]) + Parameters + ---------- + electrode_nr : int + Electrode index to be switched on or off. + state : str + Either 'on' or 'off'. + role : str + Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board. + """ + if not self.use_mux or not self.on_pi: + if not self.on_pi: + self.exec_logger.warning('Cannot reset mux while in simulation mode...') else: - quad = self.sequence[i, :] # quadrupole - if self.status == 'stopping': - break - - # call the switch_mux function to switch to the right electrodes - self.switch_mux_on(quad) - - # run a measurement - if self.on_pi: - acquired_data = self.run_measurement(quad, **kwargs) - else: # for testing, generate random data - acquired_data = { - 'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], - 'R [ohm]': np.abs(np.random.randn(1)) - } + self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' + ' Set use_mux to True to use the multiplexer...') + elif self.sequence is None: + self.exec_logger.warning('Unable to switch MUX without a sequence') + else: + # choose with MUX board + tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) - # switch mux off - self.switch_mux_off(quad) + # find I2C address of the electrode and corresponding relay + # considering that one MCP23017 can cover 16 electrodes + i2c_address = 7 - (electrode_nr - 1) // 16 # quotient without rest of the division + relay_nr = electrode_nr - (electrode_nr // 16) * 16 + 1 - # add command_id in dataset - acquired_data.update({'cmd_id': cmd_id}) - # log data to the data logger - self.data_logger.info(f'{acquired_data}') - # save data and print in a text file - self.append_and_save(filename, acquired_data) - self.exec_logger.debug(f'quadrupole {i+1:d}/{n:d}') + if i2c_address is not None: + # select the MCP23017 of the selected MUX board + mcp2 = MCP23017(tca[i2c_address]) + mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT - self.status = 'idle' + if state == 'on': + mcp2.get_pin(relay_nr - 1).value = True + else: + mcp2.get_pin(relay_nr - 1).value = False - def run_sequence_async(self, cmd_id=None, **kwargs): - """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. - Additional arguments are passed to run_measurement(). + self.exec_logger.debug(f'Switching relay {relay_nr} ' + f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') + else: + self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') - Parameters - ---------- - cmd_id: + def switch_mux_on(self, quadrupole): + """Switches on multiplexer relays for given quadrupole. + Parameters + ---------- + quadrupole : list of 4 int + List of 4 integers representing the electrode numbers. """ + roles = ['A', 'B', 'M', 'N'] + # another check to be sure A != B + if quadrupole[0] != quadrupole[1]: + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'on', roles[i]) + else: + self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!') - def func(): - self.run_sequence(**kwargs) - - self.thread = threading.Thread(target=func) - self.thread.start() - self.status = 'idle' - - def measure(self, *args, **kwargs): - warnings.warn('This function is deprecated. Use run_multiple_sequences() instead.', DeprecationWarning) - self.run_multiple_sequences(self, *args, **kwargs) - - def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs): - """Runs multiple sequences in a separate thread for monitoring mode. - Can be stopped by 'OhmPi.interrupt()'. - Additional arguments are passed to run_measurement(). + def switch_mux_off(self, quadrupole): + """Switches off multiplexer relays for given quadrupole. Parameters ---------- - sequence_delay : int, optional - Number of seconds at which the sequence must be started from each others. - nb_meas : int, optional - Number of time the sequence must be repeated. - kwargs : dict, optional - See help(k.run_measurement) for more info. + quadrupole : list of 4 int + List of 4 integers representing the electrode numbers. """ - # self.run = True - if sequence_delay is None: - sequence_delay = self.settings['sequence_delay'] - sequence_delay = int(sequence_delay) - if nb_meas is None: - nb_meas = self.settings['nb_meas'] - self.status = 'running' - self.exec_logger.debug(f'Status: {self.status}') - self.exec_logger.debug(f'Measuring sequence: {self.sequence}') + roles = ['A', 'B', 'M', 'N'] + for i in range(0, 4): + if quadrupole[i] > 0: + self._switch_mux(quadrupole[i], 'off', roles[i]) - def func(): - for g in range(0, nb_meas): # for time-lapse monitoring - if self.status == 'stopping': - self.exec_logger.warning('Data acquisition interrupted') - break - t0 = time.time() - self.run_sequence(**kwargs) + def reset_mux(self): + """Switches off all multiplexer relays.""" + if self.on_pi and self.use_mux: + roles = ['A', 'B', 'M', 'N'] + for i in range(0, 4): + for j in range(1, self.max_elec + 1): + self._switch_mux(j, 'off', roles[i]) + self.exec_logger.debug('All MUX switched off.') + elif not self.on_pi: + self.exec_logger.warning('Cannot reset mux while in simulation mode...') + else: + self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' + ' Set use_mux to True to use the multiplexer...') - # sleeping time between sequence - dt = sequence_delay - (time.time() - t0) - if dt < 0: - dt = 0 - if nb_meas > 1: - time.sleep(dt) # waiting for next measurement (time-lapse) - self.status = 'idle' - self.thread = threading.Thread(target=func) - self.thread.start() + def _update_acquisition_settings(self, config): + warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) + self.update_settings(config) - def stop(self): - warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning) - self.interrupt() + def update_settings(self, config): + """Updates acquisition settings from a json file or dictionary. + Parameters can be: + - nb_electrodes (number of electrode used, if 4, no MUX needed) + - injection_duration (in seconds) + - nb_meas (total number of times the sequence will be run) + - sequence_delay (delay in second between each sequence run) + - nb_stack (number of stack for each quadrupole measurement) + - export_path (path where to export the data, timestamp will be added to filename) - def interrupt(self): - """Interrupts the acquisition. """ - self.status = 'stopping' - if self.thread is not None: - self.exec_logger.debug('Joining tread...') - self.thread.join() + Parameters + ---------- + config : str, dict + Path to the .json settings file or dictionary of settings. + """ + status = False + if config is not None: + try: + if isinstance(config, dict): + self.settings.update(config) + else: + with open(config) as json_file: + dic = json.load(json_file) + self.settings.update(dic) + self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings)) + status = True + except Exception as e: + self.exec_logger.warning('Unable to update settings.') + status = False else: - self.exec_logger.debug('No sequence measurement thread to interrupt.') - self.exec_logger.debug(f'Status: {self.status}') + self.exec_logger.warning('Settings are missing...') + return status - def quit(self): - """Quit OhmPi. - """ - self.cmd_listen = False - if self.cmd_thread is not None: - self.cmd_thread.join() - self.exec_logger.debug(f'Stopped listening to control topic.') - exit() + # Properties + @property + def sequence(self): + """Gets sequence""" + if self._sequence is not None: + assert isinstance(self._sequence, np.ndarray) + return self._sequence - def restart(self): - self.exec_logger.info('Restarting pi...') - os.system('reboot') + @sequence.setter + def sequence(self, sequence): + """Sets sequence""" + if sequence is not None: + assert isinstance(sequence, np.ndarray) + self.use_mux = True + else: + self.use_mux = False + self._sequence = sequence VERSION = '2.1.5' -- GitLab