From f2bcb5118215e8d984015d3365d79755fa47c0a7 Mon Sep 17 00:00:00 2001 From: awatlet <arnaud.watlet@umons.ac.be> Date: Tue, 3 Oct 2023 16:13:12 +0200 Subject: [PATCH] Updates mb 2023 kwargs management --- configs/config_mb_2023_3_mux_2024.py | 4 +- .../abstract_hardware_components.py | 6 +- ohmpi/hardware_components/mb_2023_0_X.py | 76 ++++++++++++------- ohmpi/utils.py | 23 +++--- 4 files changed, 66 insertions(+), 43 deletions(-) diff --git a/configs/config_mb_2023_3_mux_2024.py b/configs/config_mb_2023_3_mux_2024.py index 78673566..cdbcc7a4 100644 --- a/configs/config_mb_2023_3_mux_2024.py +++ b/configs/config_mb_2023_3_mux_2024.py @@ -21,14 +21,12 @@ HARDWARE_CONFIG = { 'ctl': {'model': 'raspberry_pi'}, 'pwr': {'model': 'pwr_batt', 'voltage': 12.}, 'tx': {'model': 'mb_2023_0_X', - 'mcp_board_address': 0x20, 'voltage_max': 12., # Maximum voltage supported by the TX board [V] - 'current_max': 4800 / 50 / 2, # Maximum current supported by the TX board [mA] + 'adc_voltage_max': 4800., # Maximum voltage read by the current ADC on the TX board [mA] 'r_shunt': 2 # Shunt resistance in Ohms }, 'rx': {'model': 'mb_2023_0_X', 'coef_p2': 2.50, # slope for conversion for ADS, measurement in V/V - 'latency': 0.010, # latency in seconds in continuous mode 'sampling_rate': 50 # number of samples per second }, 'mux': # default properties are system properties that will be diff --git a/ohmpi/hardware_components/abstract_hardware_components.py b/ohmpi/hardware_components/abstract_hardware_components.py index 4868a008..5729623f 100644 --- a/ohmpi/hardware_components/abstract_hardware_components.py +++ b/ohmpi/hardware_components/abstract_hardware_components.py @@ -112,7 +112,7 @@ class MuxAbstract(ABC): if self.board_id is None: self.exec_logger.error(f'MUX {self.board_name} should have an id !') self.exec_logger.debug(f'MUX {self.board_id} ({self.board_name}) initialization') - self.connection = kwargs.pop('io', None) + self.connection = kwargs.pop('connection', None) cabling = kwargs.pop('cabling', None) self.cabling = {} if cabling is not None: @@ -257,7 +257,7 @@ class TxAbstract(ABC): if self.soh_logger is None: self.soh_logger = create_stdout_logger('soh_tx') self.ctl = kwargs.pop('ctl', None) - self.connection = kwargs.pop('io', None) + self.connection = kwargs.pop('connection', None) self.pwr = kwargs.pop('pwr', None) self._polarity = 0 self._injection_duration = None @@ -375,7 +375,7 @@ class RxAbstract(ABC): if self.soh_logger is None: self.soh_logger = create_stdout_logger('soh_rx') self.ctl = kwargs.pop('ctl', None) - self.connection = kwargs.pop('io', None) + self.connection = kwargs.pop('connection', None) self.board_name = kwargs.pop('board_name', 'unknown RX hardware') self._sampling_rate = kwargs.pop('sampling_rate', 1) # ms self.exec_logger.debug(f'{self.board_name} RX initialization') diff --git a/ohmpi/hardware_components/mb_2023_0_X.py b/ohmpi/hardware_components/mb_2023_0_X.py index d130f209..1dd0c329 100644 --- a/ohmpi/hardware_components/mb_2023_0_X.py +++ b/ohmpi/hardware_components/mb_2023_0_X.py @@ -10,6 +10,7 @@ import time import numpy as np import os from ohmpi.hardware_components import TxAbstract, RxAbstract +from ohmpi.utils import enforce_specs # ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi') # ctl_connection = HARDWARE_CONFIG['ctl'].pop('connection', 'i2c') # ctl_module = importlib.import_module(f'ohmpi.hardware_components.{ctl_name}') @@ -19,11 +20,21 @@ from ohmpi.hardware_components import TxAbstract, RxAbstract # hardware characteristics and limitations # voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s -SPECS = {'RX': {'voltage_adc_voltage_min': 10., 'voltage_adc_voltage_max': 4500., 'sampling_rate': 20., - 'data_rate': 860.}, - 'TX': {'current_adc_voltage_min': 10., 'bias': 0., 'injection_voltage_max': 12000., 'low_battery': 12000., - 'tx_mcp_board_address': 0x20, 'data_rate': 860., 'comptatible_power_sources': ['pwr_batt', 'dps5005'], - 'r_shunt': 2., 'activation_delay': 0.005, 'release_delay': 0.001}} +SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, + 'data_rate': {'default': 860.}, + 'connection': {'default': 'i2c'}, + 'bias': {'min': -5000., 'default': 0., 'max': 5000.}, + 'coef_p2': {'default': 2.50}}, + 'tx': {'adc_voltage_min': {'default': 10.}, + 'adc_voltage_max': {'default': 4500.}, + 'voltage_max': {'min': 0., 'default': 12., 'max': 12.}, + 'data_rate': {'default': 860.}, + 'compatible_power_sources': {'default': ['pwr_batt', 'dps5005']}, + 'r_shunt': {'min': 0., 'default': 2. }, + 'activation_delay': {'default': 0.005}, + 'release_delay': {'default': 0.001}, + 'connection': {'default': 'i2c'} + }} # TODO: move low_battery spec in pwr @@ -96,30 +107,37 @@ def _gain_auto(channel): class Tx(TxAbstract): def __init__(self, **kwargs): + for key in SPECS['tx']: + kwargs = enforce_specs(kwargs, SPECS['tx'], key) kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) kwargs.update({'pwr': kwargs.pop('pwr', SPECS['compatible_power_sources'][0])}) - if kwargs['pwr'] not in SPECS['TX']['compatible_power_sources']: + if kwargs['pwr'] not in SPECS['tx']['compatible_power_sources']: self.exec_logger.warning(f'Incompatible power source specified check config') - assert kwargs['pwr'] in SPECS['TX'] + assert kwargs['pwr'] in SPECS['tx'] #self.pwr = None # TODO: set a list of compatible power system with the tx self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}') - self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage']) + self.voltage_max = kwargs['voltage_max'] + self.voltage_adjustable = False self.current_adjustable = False if self.ctl is None: self.ctl = ctl_module.Ctl() # elif isinstance(self.ctl, dict): # self.ctl = ctl_module.Ctl(**self.ctl) - self.connection = self.ctl.interfaces[kwargs.pop('connection', ctl_connection)] + self.connection = self.ctl.interfaces[kwargs['connection']] # I2C connexion to MCP23008, for current injection - self.mcp_board = MCP23008(self.connection, address=SPECS['TX']['mcp_board_address']) + self.mcp_board = MCP23008(self.connection, address=0x20) # ADS1115 for current measurement (AB) self._ads_current_address = 0x48 - self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=860, + self._ads_current_data_rate = kwargs['data_rate'] + self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=self._ads_current_data_rate, address=self._ads_current_address) self._ads_current.mode = Mode.CONTINUOUS + self.r_shunt = kwargs['r_shunt'] + self.adc_voltage_min = kwargs['adc_voltage_min'] + self.adc_voltage_max = kwargs['adc_voltage_max'] # Relays for pulse polarity self.pin0 = self.mcp_board.get_pin(0) @@ -128,13 +146,14 @@ class Tx(TxAbstract): self.pin1.direction = Direction.OUTPUT self.polarity = 0 self.adc_gain = 2 / 3 + self.activation_delay = kwargs['activation_delay'] + self.release_delay = kwargs['release_delay'] # MCP23008 pins for LEDs self.pin4 = self.mcp_board.get_pin(4) # TODO: Delete me? No LED on this version of the board self.pin4.direction = Direction.OUTPUT self.pin4.value = True - self._bias = kwargs.pop('bias', TX_CONFIG['bias']) self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}') @property @@ -145,7 +164,7 @@ class Tx(TxAbstract): def adc_gain(self, value): assert value in [2/3, 2, 4, 8, 16] self._adc_gain = value - self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=SPECS['TX']['data_rate'], + self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=kwargs['data_rate'], address=self._ads_current_address) self._ads_current.mode = Mode.CONTINUOUS self.exec_logger.debug(f'Setting TX ADC gain to {value}') @@ -159,20 +178,20 @@ class Tx(TxAbstract): def current_pulse(self, **kwargs): TxAbstract.current_pulse(self, **kwargs) - self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board') + self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board') @property def current(self): """ Gets the current IAB in Amps """ - iab = AnalogIn(self._ads_current, ads.P0).voltage * 1000. / (50 * TX_CONFIG['r_shunt']) # measure current + iab = AnalogIn(self._ads_current, ads.P0).voltage * 1000. / (50 * self.r_shunt) # measure current self.exec_logger.debug(f'Reading TX current: {iab} mA') return iab @ current.setter def current(self, value): - assert TX_CONFIG['current_min'] <= value <= TX_CONFIG['current_max'] - self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board') + assert self.adc_voltage_min / (50 * self.r_shunt) <= value <= self.adc_voltage_max / (50 * self.r_shunt) + self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board') def inject(self, polarity=1, injection_duration=None): self.polarity = polarity @@ -189,15 +208,15 @@ class Tx(TxAbstract): if polarity == 1: self.pin0.value = True self.pin1.value = False - time.sleep(SPECS['TX']['activation_delay']) # Max turn on time of 211EH relays = 5ms + time.sleep(self.activation_delay) # Max turn on time of 211EH relays = 5ms elif polarity == -1: self.pin0.value = False self.pin1.value = True - time.sleep(SPECS['TX']['activation_delay']) # Max turn on time of 211EH relays = 5ms + time.sleep(self.activation_delay) # Max turn on time of 211EH relays = 5ms else: self.pin0.value = False self.pin1.value = False - time.sleep(SPECS['TX']['release_delay']) # Max turn off time of 211EH relays = 1ms + time.sleep(self.release_delay) # Max turn off time of 211EH relays = 1ms def turn_off(self): self.pwr.turn_off(self) @@ -209,9 +228,9 @@ class Tx(TxAbstract): def tx_bat(self): self.soh_logger.warning(f'Cannot get battery voltage on {self.board_name}') self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.') - return TX_CONFIG['low_battery'] + return self.pwr.voltage - def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=1): + def voltage_pulse(self, voltage=self.voltage, length=None, polarity=1): """ Generates a square voltage pulse Parameters @@ -235,12 +254,14 @@ class Tx(TxAbstract): class Rx(RxAbstract): def __init__(self, **kwargs): + for key in kwargs: + kwargs = enforce_specs(kwargs, SPECS['rx'], key) kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) self.exec_logger.event(f'{self.board_name}\trx_init\tbegin\t{datetime.datetime.utcnow()}') if self.ctl is None: self.ctl = ctl_module.Ctl() - self.connection = self.ctl.interfaces[kwargs.pop('connection', ctl_connection)] + self.connection = self.ctl.interfaces[kwargs['connection']] # ADS1115 for voltage measurement (MN) self._ads_voltage_address = 0x49 @@ -248,11 +269,10 @@ class Rx(RxAbstract): self._ads_voltage = ads.ADS1115(self.connection, gain=self._adc_gain, data_rate=860, address=self._ads_voltage_address) self._ads_voltage.mode = Mode.CONTINUOUS - self._coef_p2 = kwargs.pop('coef_p2', RX_CONFIG['coef_p2']) - self._voltage_max = kwargs.pop('voltage_max', RX_CONFIG['voltage_max']) - self._sampling_rate = kwargs.pop('sampling_rate', sampling_rate) - self._latency = kwargs.pop('latency', RX_CONFIG['latency']) - self._bias = kwargs.pop('bias', RX_CONFIG['bias']) + self._coef_p2 = kwargs['coef_p2'] + # self._voltage_max = kwargs['voltage_max'] + self._sampling_rate = kwargs['sampling_rate'] + self._bias = kwargs['bias'] self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}') @property diff --git a/ohmpi/utils.py b/ohmpi/utils.py index 70ab93dc..6f133d04 100644 --- a/ohmpi/utils.py +++ b/ohmpi/utils.py @@ -3,18 +3,23 @@ import os import shutil import collections.abc import numpy as np +from numbers import Number def enforce_specs(kwargs, specs, key): + kwargs.update({key: kwargs.pop(key, specs[key]['default'])}) - s = specs.copy() - min_value = s[key].pop('min', -np.inf) - s[key]['min'] = min_value - max_value = s[key].pop('max', np.inf) - s[key]['max'] = max_value - if kwargs[key] < min_value: - kwargs[key] = min_value - elif kwargs[key] > max_value: - kwargs[key] = max_value + + if isinstance(kwargs[key], Number): + s = specs.copy() + min_value = s[key].pop('min', -np.inf) + s[key]['min'] = min_value + max_value = s[key].pop('max', np.inf) + s[key]['max'] = max_value + if kwargs[key] < min_value: + kwargs[key] = min_value + elif kwargs[key] > max_value: + kwargs[key] = max_value + return kwargs def update_dict(d, u): -- GitLab