diff --git a/configs/config_mb_2024_0_2.py b/configs/config_mb_2024_0_2.py index 5d38531c5b1f8d2aa157db72ee69ce274ee630bd..e90350deab5262485d623a7013ae297d49ddfca5 100644 --- a/configs/config_mb_2024_0_2.py +++ b/configs/config_mb_2024_0_2.py @@ -1,7 +1,6 @@ import logging from ohmpi.utils import get_platform - -from paho.mqtt.client import MQTTv31 +from paho.mqtt.client import MQTTv31 # noqa _, on_pi = get_platform() # DEFINE THE ID OF YOUR OhmPi @@ -44,7 +43,7 @@ EXEC_LOGGING_CONFIG = { 'log_file_logging_level': logging.DEBUG, 'logging_to_console': True, 'file_name': f'exec{logging_suffix}.log', - 'max_bytes': 262144, + 'max_bytes': 1048576, 'backup_count': 30, 'when': 'd', 'interval': 1 diff --git a/ohmpi/__init__.py b/ohmpi/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..05eed56091a0484d7b70c59b09dae9c276790702 100644 --- a/ohmpi/__init__.py +++ b/ohmpi/__init__.py @@ -0,0 +1 @@ +from .ohmpi import OhmPi diff --git a/ohmpi/hardware_components/abstract_hardware_components.py b/ohmpi/hardware_components/abstract_hardware_components.py index 8e428fd938eb81f0453a8107fcfbc358ec8891ba..a9c3e2127065337ece2e606711bec3e75441f71f 100644 --- a/ohmpi/hardware_components/abstract_hardware_components.py +++ b/ohmpi/hardware_components/abstract_hardware_components.py @@ -7,7 +7,7 @@ from threading import Event, Barrier, BrokenBarrierError class CtlAbstract(ABC): def __init__(self, **kwargs): - self.board_name = kwargs.pop('board_name', 'unknown CTL hardware') + self.model = kwargs.pop('model', 'unknown CTL hardware') self.interfaces = None self.exec_logger = kwargs.pop('exec_logger', None) if self.exec_logger is None: @@ -15,7 +15,7 @@ class CtlAbstract(ABC): self.soh_logger = kwargs.pop('soh_logger', None) if self.soh_logger is None: self.soh_logger = create_stdout_logger('soh_ctl') - self.exec_logger.debug(f'{self.board_name} Ctl initialization') + self.exec_logger.debug(f'{self.model} Ctl initialization') self._cpu_temp_available = False self.max_cpu_temp = np.inf self.connection = kwargs.pop('connection', None) @@ -23,12 +23,12 @@ class CtlAbstract(ABC): @property def cpu_temperature(self): if not self._cpu_temp_available: - self.exec_logger.warning(f'CPU temperature reading is not available for {self.board_name}') + self.exec_logger.warning(f'CPU temperature reading is not available for {self.model}') cpu_temp = np.nan else: cpu_temp = self._cpu_temp if cpu_temp > self.max_cpu_temp: - self.soh_logger.warning(f'CPU temperature of {self.board_name} is over the limit!') + self.soh_logger.warning(f'CPU temperature of {self.model} is over the limit!') return cpu_temp @property @@ -39,7 +39,7 @@ class CtlAbstract(ABC): class PwrAbstract(ABC): def __init__(self, **kwargs): - self.board_name = kwargs.pop('board_name', 'unknown PWR hardware') + self.model = kwargs.pop('model', 'unknown PWR hardware') self.exec_logger = kwargs.pop('exec_logger', None) if self.exec_logger is None: self.exec_logger = create_stdout_logger('exec_mux') @@ -71,12 +71,12 @@ class PwrAbstract(ABC): @abstractmethod def turn_off(self): - self.exec_logger.debug(f'Switching {self.board_name} off') + self.exec_logger.debug(f'Switching {self.model} off') self._state = 'off' @abstractmethod def turn_on(self): - self.exec_logger.debug(f'Switching {self.board_name} on') + self.exec_logger.debug(f'Switching {self.model} on') self._state = 'on' @property @@ -90,7 +90,7 @@ class PwrAbstract(ABC): def voltage(self, value): assert isinstance(value, float) if not self.voltage_adjustable: - self.exec_logger.debug(f'Voltage cannot be set on {self.board_name}...') + self.exec_logger.debug(f'Voltage cannot be set on {self.model}...') else: assert self._voltage_min < value < self._voltage_max # add actions to set the DPS voltage @@ -99,17 +99,13 @@ class PwrAbstract(ABC): class MuxAbstract(ABC): def __init__(self, **kwargs): - self.board_name = kwargs.pop('board_name', 'unknown MUX hardware') - self.exec_logger = kwargs.pop('exec_logger', None) - if self.exec_logger is None: - self.exec_logger = create_stdout_logger('exec_mux') - self.soh_logger = kwargs.pop('soh_logger', None) - if self.soh_logger is None: - self.soh_logger = create_stdout_logger('soh_mux') + self.model = kwargs.pop('model', 'unknown MUX hardware') + self.exec_logger = kwargs.pop('exec_logger', create_stdout_logger('exec_mux')) + self.soh_logger = kwargs.pop('soh_logger', create_stdout_logger('soh_mux')) self.board_id = kwargs.pop('id', None) if self.board_id is None: - self.exec_logger.error(f'MUX {self.board_name} should have an id !') - self.exec_logger.debug(f'MUX {self.board_id} ({self.board_name}) initialization') + self.exec_logger.error(f'MUX {self.model} should have an id !') + self.exec_logger.debug(f'MUX {self.board_id} ({self.model}) initialization') self.connection = kwargs.pop('connection', None) cabling = kwargs.pop('cabling', None) self.cabling = {} @@ -155,7 +151,7 @@ class MuxAbstract(ABC): """ status = True if elec_dict is not None: - self.exec_logger.debug(f'Switching {self.board_name} ') + self.exec_logger.debug(f'Switching {self.model} ') # check to prevent A == B (SHORT-CIRCUIT) if 'A' in elec_dict.keys() and 'B' in elec_dict.keys(): out = np.in1d(elec_dict['A'], elec_dict['B']) @@ -201,7 +197,7 @@ class MuxAbstract(ABC): self.exec_logger.debug(f'Barrier error {self.board_id} switching aborted.') status = False else: - self.exec_logger.warning(f'Missing argument for {self.board_name}.switch: elec_dict is None.') + self.exec_logger.warning(f'Missing argument for {self.model}.switch: elec_dict is None.') status = False if state == 'on': time.sleep(self._activation_delay) @@ -232,7 +228,7 @@ class MuxAbstract(ABC): activation_time : float, optional Time in seconds during which the relays are activated. """ - self.exec_logger.debug(f'Starting {self.board_name} test...') + self.exec_logger.debug(f'Starting {self.model} test...') self.reset() for role in elec_dict.keys(): @@ -246,7 +242,7 @@ class MuxAbstract(ABC): class TxAbstract(ABC): def __init__(self, **kwargs): - self.board_name = kwargs.pop('board_name', 'unknown TX hardware') + self.model = kwargs.pop('model', 'unknown TX hardware') injection_duration = kwargs.pop('injection_duration', 1.) self.exec_logger = kwargs.pop('exec_logger', None) if self.exec_logger is None: @@ -262,7 +258,7 @@ class TxAbstract(ABC): self.injection_duration = injection_duration self._latency = kwargs.pop('latency', 0.) self.tx_sync = kwargs.pop('tx_sync', Event()) - self.exec_logger.debug(f'{self.board_name} TX initialization') + self.exec_logger.debug(f'{self.model} TX initialization') @property def adc_gain(self): @@ -297,6 +293,8 @@ class TxAbstract(ABC): Injection polarity, can be eiter 1, 0 or -1 injection_duration: float, default None Injection duration in seconds + switch_pwr: bool + switches on and off tx.pwr """ assert polarity in [-1, 0, 1] if injection_duration is None: @@ -316,7 +314,6 @@ class TxAbstract(ABC): time.sleep(injection_duration) self.tx_sync.clear() - @property def injection_duration(self): return self._injection_duration @@ -377,9 +374,9 @@ class RxAbstract(ABC): if self.soh_logger is None: self.soh_logger = create_stdout_logger('soh_rx') self.connection = kwargs.pop('connection', None) - self.board_name = kwargs.pop('board_name', 'unknown RX hardware') + self.model = kwargs.pop('model', 'unknown RX hardware') self._sampling_rate = kwargs.pop('sampling_rate', 1) # ms - self.exec_logger.debug(f'{self.board_name} RX initialization') + self.exec_logger.debug(f'{self.model} RX initialization') self._voltage_max = kwargs.pop('voltage_max', 0.) self._adc_gain = 1. self._max_sampling_rate = np.inf diff --git a/ohmpi/hardware_components/dummy_ctl.py b/ohmpi/hardware_components/dummy_ctl.py index 9eeb98e0a4ee56f075ccc061c2feb58e13cfba90..0045bd726c551db024d5362b75182eeaed327ad0 100644 --- a/ohmpi/hardware_components/dummy_ctl.py +++ b/ohmpi/hardware_components/dummy_ctl.py @@ -6,5 +6,5 @@ CTL_CONFIG = HARDWARE_CONFIG['ctl'] class Ctl(CtlAbstract): def __init__(self, **kwargs): - kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) + kwargs.update({'model': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) diff --git a/ohmpi/hardware_components/dummy_mux.py b/ohmpi/hardware_components/dummy_mux.py index 21468e2dccbf7385f62afcbb923c5201db22653a..bb8a5a7d118aa5d132ffe6bb6aeaaba33066767c 100644 --- a/ohmpi/hardware_components/dummy_mux.py +++ b/ohmpi/hardware_components/dummy_mux.py @@ -6,7 +6,6 @@ MUX_CONFIG = HARDWARE_CONFIG['mux'].pop('default', {}) class Mux(MuxAbstract): def __init__(self, **kwargs): - kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) def _get_addresses(self): diff --git a/ohmpi/hardware_components/dummy_rx.py b/ohmpi/hardware_components/dummy_rx.py index 386b193ced9411f0b048305e42910c08d195b2bd..249a2e49170661195662736ed2abe2f65a054345 100644 --- a/ohmpi/hardware_components/dummy_rx.py +++ b/ohmpi/hardware_components/dummy_rx.py @@ -12,9 +12,9 @@ voltage_adc_voltage_max = 4500. RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV + class Rx(RxAbstract): def __init__(self, **kwargs): - kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) self._adc_gain = 1. diff --git a/ohmpi/hardware_components/dummy_tx.py b/ohmpi/hardware_components/dummy_tx.py index 067c1465978814876e4dba25e64f182b83d1d150..cef80c8e7dfdfb386487add40f79e50579fda93c 100644 --- a/ohmpi/hardware_components/dummy_tx.py +++ b/ohmpi/hardware_components/dummy_tx.py @@ -28,7 +28,6 @@ class Tx(TxAbstract): pass def __init__(self, **kwargs): - kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage']) diff --git a/ohmpi/hardware_components/mb_2023_0_X.py b/ohmpi/hardware_components/mb_2023_0_X.py index 1b67160bccd53068353cee97a10fe886726e9661..c985301aae721d887691384b800c163900230347 100644 --- a/ohmpi/hardware_components/mb_2023_0_X.py +++ b/ohmpi/hardware_components/mb_2023_0_X.py @@ -6,15 +6,14 @@ from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa from digitalio import Direction # noqa from busio import I2C # noqa import time -import numpy as np import os from ohmpi.hardware_components import TxAbstract, RxAbstract from ohmpi.utils import enforce_specs -import inspect # hardware characteristics and limitations # voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s -SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, +SPECS = {'rx': {'model': {'default': os.path.basename(__file__).rstrip('.py')}, + 'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, 'data_rate': {'default': 860.}, 'bias': {'min': -5000., 'default': 0., 'max': 5000.}, 'coef_p2': {'default': 2.50}, @@ -23,7 +22,8 @@ SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, 'voltage_min': {'default': 10.0}, 'vmn_hardware_offset': {'default': 0.} }, - 'tx': {'adc_voltage_min': {'default': 10.}, # Minimum voltage value used in vmin strategy + 'tx': {'model': {'default': os.path.basename(__file__).rstrip('.py')}, + 'adc_voltage_min': {'default': 10.}, # Minimum voltage value used in vmin strategy 'adc_voltage_max': {'default': 4500.}, # Maximum voltage on ads1115 used to measure current 'voltage_max': {'min': 0., 'default': 12., 'max': 12.}, # Maximum input voltage 'data_rate': {'default': 860.}, @@ -66,9 +66,12 @@ def _ads_1115_gain_auto(channel): # Make it a class method ? class Tx(TxAbstract): def __init__(self, **kwargs): - for key in SPECS['tx'].keys(): - kwargs = enforce_specs(kwargs, SPECS['tx'], key) - kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) + if kwargs['model'] == os.path.basename(__file__).rstrip('.py'): + for key in SPECS['tx'].keys(): + kwargs = enforce_specs(kwargs, SPECS['tx'], key) + subclass_init = False + else: + subclass_init = True super().__init__(**kwargs) assert isinstance(self.connection, I2C) kwargs.update({'pwr': kwargs.pop('pwr', SPECS['tx']['compatible_power_sources']['default'])}) @@ -76,7 +79,7 @@ class Tx(TxAbstract): and kwargs['pwr'] not in SPECS['tx']['compatible_power_sources']['other']): self.exec_logger.warning(f'Incompatible power source specified check config') assert kwargs['pwr'] in SPECS['tx'] - self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_init\tbegin\t{datetime.datetime.utcnow()}') # self.voltage_max = kwargs['voltage_max'] # TODO: check if used self._activation_delay = kwargs['activation_delay'] self._release_delay = kwargs['release_delay'] @@ -102,8 +105,8 @@ class Tx(TxAbstract): self.pin1.direction = Direction.OUTPUT self.polarity = 0 self.gain = 2 / 3 - - self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}') + if not subclass_init: + self.exec_logger.event(f'{self.model}\ttx_init\tend\t{datetime.datetime.utcnow()}') @property def gain(self): @@ -120,15 +123,15 @@ class Tx(TxAbstract): self.exec_logger.debug(f'Setting TX ADC gain to {value}') def _adc_gain_auto(self): - self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') gain = _ads_1115_gain_auto(AnalogIn(self._ads_current, ads.P0)) self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}') self.gain = gain - self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') def current_pulse(self, **kwargs): TxAbstract.current_pulse(self, **kwargs) - self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board') + self.exec_logger.warning(f'Current pulse is not implemented for the {self.model} board') @property def current(self): @@ -141,7 +144,7 @@ class Tx(TxAbstract): @ current.setter def current(self, value): assert self.adc_voltage_min / (50 * self.r_shunt) <= value <= self.adc_voltage_max / (50 * self.r_shunt) - self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board') + self.exec_logger.warning(f'Current pulse is not implemented for the {self.model} board') def gain_auto(self): self._adc_gain_auto() @@ -179,8 +182,8 @@ class Tx(TxAbstract): @property def tx_bat(self): - self.soh_logger.warning(f'Cannot get battery voltage on {self.board_name}') - self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.') + self.soh_logger.warning(f'Cannot get battery voltage on {self.model}') + self.exec_logger.debug(f'{self.model} cannot read battery voltage. Returning default battery voltage.') return self.pwr.voltage def voltage_pulse(self, voltage=None, length=None, polarity=1): @@ -195,7 +198,7 @@ class Tx(TxAbstract): polarity: 1,0,-1 Polarity of the pulse """ - self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_voltage_pulse\tbegin\t{datetime.datetime.utcnow()}') # self.exec_logger.info(f'injection_duration: {length}') # TODO: delete me if length is None: length = self.injection_duration @@ -203,18 +206,22 @@ class Tx(TxAbstract): self.pwr.voltage = voltage self.exec_logger.debug(f'Voltage pulse of {polarity*self.pwr.voltage:.3f} V for {length:.3f} s') self.inject(polarity=polarity, injection_duration=length) - self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_voltage_pulse\tend\t{datetime.datetime.utcnow()}') class Rx(RxAbstract): def __init__(self, **kwargs): - for key in SPECS['rx'].keys(): - kwargs = enforce_specs(kwargs, SPECS['rx'], key) + if kwargs['model'] == os.path.basename(__file__).rstrip('.py'): + for key in SPECS['rx'].keys(): + kwargs = enforce_specs(kwargs, SPECS['rx'], key) + subclass_init = False + else: + subclass_init = True kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) assert isinstance(self.connection, I2C) - self.exec_logger.event(f'{self.board_name}\trx_init\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_init\tbegin\t{datetime.datetime.utcnow()}') # ADS1115 for voltage measurement (MN) self._ads_voltage_address = kwargs['ads_address'] @@ -227,7 +234,8 @@ class Rx(RxAbstract): # self._voltage_max = kwargs['voltage_max'] self._sampling_rate = kwargs['sampling_rate'] self._bias = kwargs['bias'] - self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}') + if not subclass_init: + self.exec_logger.event(f'{self.model}\trx_init\tend\t{datetime.datetime.utcnow()}') @property def gain(self): @@ -244,11 +252,11 @@ class Rx(RxAbstract): self.exec_logger.debug(f'Setting RX ADC gain to {value}') def _adc_gain_auto(self): - self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') gain = _ads_1115_gain_auto(AnalogIn(self._ads_voltage, ads.P0, ads.P1)) self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}') self.gain = gain - self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') def gain_auto(self): self._adc_gain_auto() @@ -256,7 +264,7 @@ class Rx(RxAbstract): def voltage(self): """ Gets the voltage VMN in Volts """ - self.exec_logger.event(f'{self.board_name}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}') u = -AnalogIn(self._ads_voltage, ads.P0, ads.P1).voltage * self._coef_p2 * 1000. - self._bias # TODO: check if it should be negated - self.exec_logger.event(f'{self.board_name}\trx_voltage\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_voltage\tend\t{datetime.datetime.utcnow()}') return u diff --git a/ohmpi/hardware_components/mb_2024_0_2.py b/ohmpi/hardware_components/mb_2024_0_2.py index 9bb65d72ac37d8041f043afcc6c3d3b48132d1de..52113afecfd627a8d3ee6b2dd2a73e4220779af7 100644 --- a/ohmpi/hardware_components/mb_2024_0_2.py +++ b/ohmpi/hardware_components/mb_2024_0_2.py @@ -5,12 +5,15 @@ from adafruit_ads1x15.ads1x15 import Mode # noqa from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa from digitalio import Direction # noqa from busio import I2C # noqa +import os +from ohmpi.utils import enforce_specs from ohmpi.hardware_components.mb_2023_0_X import Tx as Tx_mb_2023 from ohmpi.hardware_components.mb_2023_0_X import Rx as Rx_mb_2023 # hardware characteristics and limitations # voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s -SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, +SPECS = {'rx': {'model': {'default': os.path.basename(__file__).rstrip('.py')}, + 'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, 'data_rate': {'default': 860.}, 'bias': {'min': -5000., 'default': 0., 'max': 5000.}, 'coef_p2': {'default': 1.00}, @@ -19,7 +22,8 @@ SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, 'voltage_min': {'default': 10.0}, 'vmn_hardware_offset': {'default': 2500.}, }, - 'tx': {'adc_voltage_min': {'default': 10.}, # Minimum voltage value used in vmin strategy + 'tx': {'model': {'default': os.path.basename(__file__).rstrip('.py')}, + 'adc_voltage_min': {'default': 10.}, # Minimum voltage value used in vmin strategy 'adc_voltage_max': {'default': 4500.}, # Maximum voltage on ads1115 used to measure current 'voltage_max': {'min': 0., 'default': 12., 'max': 12.}, # Maximum input voltage 'data_rate': {'default': 860.}, @@ -62,9 +66,13 @@ def _ads_1115_gain_auto(channel): # Make it a class method ? class Tx(Tx_mb_2023): def __init__(self, **kwargs): + if kwargs['model'] == os.path.basename(__file__).rstrip('.py'): + for key in SPECS['tx'].keys(): + kwargs = enforce_specs(kwargs, SPECS['tx'], key) + subclass_init = False + else: + subclass_init = True super().__init__(**kwargs) - # I2C connexion to MCP23008, for current injection - # self.mcp_board = MCP23008(self.connection, address=kwargs['mcp_address']) # Initialize LEDs self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run @@ -73,7 +81,8 @@ class Tx(Tx_mb_2023): self.pin6 = self.mcp_board.get_pin(6) self.pin6.direction = Direction.OUTPUT self.pin6.value = False - self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}') + if not subclass_init: + self.exec_logger.event(f'{self.model}\ttx_init\tend\t{datetime.datetime.utcnow()}') def inject(self, polarity=1, injection_duration=None): # add leds? @@ -84,8 +93,14 @@ class Tx(Tx_mb_2023): class Rx(Rx_mb_2023): def __init__(self, **kwargs): + if kwargs['model'] == os.path.basename(__file__).rstrip('.py'): + for key in SPECS['rx'].keys(): + kwargs = enforce_specs(kwargs, SPECS['rx'], key) + subclass_init = False + else: + subclass_init = True super().__init__(**kwargs) - # I2C connexion to MCP23008, for voltage + # I2C connection to MCP23008, for voltage self.mcp_board = MCP23008(self.connection, address=kwargs['mcp_address']) # ADS1115 for voltage measurement (MN) self._coef_p2 = 1. @@ -102,15 +117,15 @@ class Rx(Rx_mb_2023): self.pin_DG1.value = True # open gain 1 inactive self.pin_DG2.value = False # close gain 0.5 active self.gain = 1/3 - # TODO: try to only log this event and not the one created by super() - self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}') + if not subclass_init: # TODO: try to only log this event and not the one created by super() + self.exec_logger.event(f'{self.model}\trx_init\tend\t{datetime.datetime.utcnow()}') def _adc_gain_auto(self): - self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') gain = _ads_1115_gain_auto(AnalogIn(self._ads_voltage, ads.P0)) self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}') self.gain = gain - self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') def _dg411_gain_auto(self): if self.voltage < self._vmn_hardware_offset : @@ -140,7 +155,7 @@ class Rx(Rx_mb_2023): def voltage(self): """ Gets the voltage VMN in Volts """ - self.exec_logger.event(f'{self.board_name}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}') u = (AnalogIn(self._ads_voltage, ads.P0).voltage * self._coef_p2 * 1000. - self._vmn_hardware_offset) / self._dg411_gain - self._bias # TODO: check how to handle bias and _vmn_hardware_offset - self.exec_logger.event(f'{self.board_name}\trx_voltage\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_voltage\tend\t{datetime.datetime.utcnow()}') return u diff --git a/ohmpi/hardware_components/mb_2024_0_2_bkp.py b/ohmpi/hardware_components/mb_2024_0_2_bkp.py index dc8adbfdd1e02c70275617d6df79562be85a3de3..9b750376a87a62eceff387e61321809a5999bef6 100644 --- a/ohmpi/hardware_components/mb_2024_0_2_bkp.py +++ b/ohmpi/hardware_components/mb_2024_0_2_bkp.py @@ -122,7 +122,7 @@ class Tx(TxAbstract): self.exec_logger.warning(f'Incompatible power source specified check config') assert kwargs['pwr'] in SPECS['tx'] # self.pwr = None # TODO: set a list of compatible power system with the tx - self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_init\tbegin\t{datetime.datetime.utcnow()}') # self.voltage_max = kwargs['voltage_max'] # TODO: check if used self._activation_delay = kwargs['activation_delay'] self._release_delay = kwargs['release_delay'] @@ -156,7 +156,7 @@ class Tx(TxAbstract): self._latency = kwargs.pop('latency', TX_CONFIG['latency']) self._bias = kwargs.pop('bias', TX_CONFIG['bias']) - self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_init\tend\t{datetime.datetime.utcnow()}') @property def gain(self): @@ -173,15 +173,15 @@ class Tx(TxAbstract): self.exec_logger.debug(f'Setting TX ADC gain to {value}') def _adc_gain_auto(self): - self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') gain = _gain_auto(AnalogIn(self._ads_current, ads.P0)) self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}') self.gain = gain - self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') def current_pulse(self, **kwargs): TxAbstract.current_pulse(self, **kwargs) - self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board') + self.exec_logger.warning(f'Current pulse is not implemented for the {self.model} board') @property def current(self): @@ -194,7 +194,7 @@ class Tx(TxAbstract): @ current.setter def current(self, value): assert self.adc_voltage_min / (50 * self.r_shunt) <= value <= self.adc_voltage_max / (50 * self.r_shunt) - self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board') + self.exec_logger.warning(f'Current pulse is not implemented for the {self.model} board') def gain_auto(self): self._adc_gain_auto() @@ -232,8 +232,8 @@ class Tx(TxAbstract): @property def tx_bat(self): - self.soh_logger.warning(f'Cannot get battery voltage on {self.board_name}') - self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.') + self.soh_logger.warning(f'Cannot get battery voltage on {self.model}') + self.exec_logger.debug(f'{self.model} cannot read battery voltage. Returning default battery voltage.') return self.pwr.voltage def voltage_pulse(self, voltage=None, length=None, polarity=1): @@ -248,7 +248,7 @@ class Tx(TxAbstract): polarity: 1,0,-1 Polarity of the pulse """ - self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_voltage_pulse\tbegin\t{datetime.datetime.utcnow()}') # self.exec_logger.info(f'injection_duration: {length}') # TODO: delete me if length is None: length = self.injection_duration @@ -256,7 +256,7 @@ class Tx(TxAbstract): self.pwr.voltage = voltage self.exec_logger.debug(f'Voltage pulse of {polarity*self.pwr.voltage:.3f} V for {length:.3f} s') self.inject(polarity=polarity, injection_duration=length) - self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\ttx_voltage_pulse\tend\t{datetime.datetime.utcnow()}') class Rx(RxAbstract): @@ -267,7 +267,7 @@ class Rx(RxAbstract): super().__init__(**kwargs) assert isinstance(self.connection, I2C) - self.exec_logger.event(f'{self.board_name}\trx_init\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_init\tbegin\t{datetime.datetime.utcnow()}') # I2C connexion to MCP23008, for DG411 self.mcp_board = MCP23008(self.connection, address=0x27) @@ -283,7 +283,7 @@ class Rx(RxAbstract): # self._voltage_max = kwargs['voltage_max'] self._sampling_rate = kwargs['sampling_rate'] self._bias = kwargs['bias'] - self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_init\tend\t{datetime.datetime.utcnow()}') self.pin_DG0 = self.mcp_board.get_pin(0) self.pin_DG0.direction = Direction.OUTPUT @@ -312,13 +312,13 @@ class Rx(RxAbstract): self.exec_logger.debug(f'Setting RX ADC gain to {value}') def _adc_gain_auto(self): - self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') gain_0 = _gain_auto(AnalogIn(self._ads_voltage, ads.P0)) gain_2 = _gain_auto(AnalogIn(self._ads_voltage, ads.P2)) gain = np.min([gain_0, gain_2]) self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}') self.gain = gain - self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') def gain_auto(self): self._adc_gain_auto() @@ -326,9 +326,9 @@ class Rx(RxAbstract): def voltage(self): """ Gets the voltage VMN in Volts """ - self.exec_logger.event(f'{self.board_name}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}') u = -AnalogIn(self._ads_voltage, ads.P0, ads.P1).voltage * self._coef_p2 * 1000. - self._bias # TODO: check if it should be negated - self.exec_logger.event(f'{self.board_name}\trx_voltage\tend\t{datetime.datetime.utcnow()}') + self.exec_logger.event(f'{self.model}\trx_voltage\tend\t{datetime.datetime.utcnow()}') return u @property diff --git a/ohmpi/hardware_components/mux_2023_0_X.py b/ohmpi/hardware_components/mux_2023_0_X.py index 0686a044a8483fa35d85391022d672ab070b7e6b..7e380ee8cf16e0360b4b00d64d93ec7a902b8080 100644 --- a/ohmpi/hardware_components/mux_2023_0_X.py +++ b/ohmpi/hardware_components/mux_2023_0_X.py @@ -5,72 +5,77 @@ import adafruit_tca9548a # noqa from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa from digitalio import Direction # noqa from busio import I2C # noqa +from ohmpi.utils import enforce_specs # hardware characteristics and limitations -SPECS = {'voltage_max': 50., 'current_max': 3., 'activation_delay': 0.01, 'release_delay': 0.005} +SPECS = {'model': {'default': os.path.basename(__file__).rstrip('.py')}, + 'voltage_max': {'default': 50.}, + 'current_max': {'default': 3.}, + 'activation_delay': {'default': 0.01}, + 'release_delay': {'default': 0.005}, + 'tca_address': {'default': 0x70} + } -default_mux_cabling = {(elec, role) : ('mux_1', elec) for role in ['A', 'B', 'M', 'N'] for elec in range(1,9)} # defaults to 4 roles cabling electrodes from 1 to 8 +default_mux_cabling = {(elec, role): ('mux_1', elec) for role in ['A', 'B', 'M', 'N'] for elec in + range(1, 9)} # defaults to 4 roles cabling electrodes from 1 to 8 + +inner_cabling = {'1_role': {(1, 'X'): {'MCP': 0, 'MCP_GPIO': 0}, (2, 'X'): {'MCP': 0, 'MCP_GPIO': 1}, + (3, 'X'): {'MCP': 0, 'MCP_GPIO': 2}, (4, 'X'): {'MCP': 0, 'MCP_GPIO': 3}, + (5, 'X'): {'MCP': 0, 'MCP_GPIO': 4}, (6, 'X'): {'MCP': 0, 'MCP_GPIO': 5}, + (7, 'X'): {'MCP': 0, 'MCP_GPIO': 6}, (8, 'X'): {'MCP': 0, 'MCP_GPIO': 7}, + (9, 'X'): {'MCP': 0, 'MCP_GPIO': 8}, (10, 'X'): {'MCP': 0, 'MCP_GPIO': 9}, + (11, 'X'): {'MCP': 0, 'MCP_GPIO': 10}, (12, 'X'): {'MCP': 0, 'MCP_GPIO': 11}, + (13, 'X'): {'MCP': 0, 'MCP_GPIO': 12}, (14, 'X'): {'MCP': 0, 'MCP_GPIO': 13}, + (15, 'X'): {'MCP': 0, 'MCP_GPIO': 14}, (16, 'X'): {'MCP': 0, 'MCP_GPIO': 15}, + (17, 'X'): {'MCP': 1, 'MCP_GPIO': 0}, (18, 'X'): {'MCP': 1, 'MCP_GPIO': 1}, + (19, 'X'): {'MCP': 1, 'MCP_GPIO': 2}, (20, 'X'): {'MCP': 1, 'MCP_GPIO': 3}, + (21, 'X'): {'MCP': 1, 'MCP_GPIO': 4}, (22, 'X'): {'MCP': 1, 'MCP_GPIO': 5}, + (23, 'X'): {'MCP': 1, 'MCP_GPIO': 6}, (24, 'X'): {'MCP': 1, 'MCP_GPIO': 7}, + (25, 'X'): {'MCP': 1, 'MCP_GPIO': 8}, (26, 'X'): {'MCP': 1, 'MCP_GPIO': 9}, + (27, 'X'): {'MCP': 1, 'MCP_GPIO': 10}, (28, 'X'): {'MCP': 1, 'MCP_GPIO': 11}, + (29, 'X'): {'MCP': 1, 'MCP_GPIO': 12}, (30, 'X'): {'MCP': 1, 'MCP_GPIO': 13}, + (31, 'X'): {'MCP': 1, 'MCP_GPIO': 14}, (32, 'X'): {'MCP': 1, 'MCP_GPIO': 15}, + (33, 'X'): {'MCP': 2, 'MCP_GPIO': 0}, (34, 'X'): {'MCP': 2, 'MCP_GPIO': 1}, + (35, 'X'): {'MCP': 2, 'MCP_GPIO': 2}, (36, 'X'): {'MCP': 2, 'MCP_GPIO': 3}, + (37, 'X'): {'MCP': 2, 'MCP_GPIO': 4}, (38, 'X'): {'MCP': 2, 'MCP_GPIO': 5}, + (39, 'X'): {'MCP': 2, 'MCP_GPIO': 6}, (40, 'X'): {'MCP': 2, 'MCP_GPIO': 7}, + (41, 'X'): {'MCP': 2, 'MCP_GPIO': 8}, (42, 'X'): {'MCP': 2, 'MCP_GPIO': 9}, + (43, 'X'): {'MCP': 2, 'MCP_GPIO': 10}, (44, 'X'): {'MCP': 2, 'MCP_GPIO': 11}, + (45, 'X'): {'MCP': 2, 'MCP_GPIO': 12}, (46, 'X'): {'MCP': 2, 'MCP_GPIO': 13}, + (47, 'X'): {'MCP': 2, 'MCP_GPIO': 14}, (48, 'X'): {'MCP': 2, 'MCP_GPIO': 15}, + (49, 'X'): {'MCP': 3, 'MCP_GPIO': 0}, (50, 'X'): {'MCP': 3, 'MCP_GPIO': 1}, + (51, 'X'): {'MCP': 3, 'MCP_GPIO': 2}, (52, 'X'): {'MCP': 3, 'MCP_GPIO': 3}, + (53, 'X'): {'MCP': 3, 'MCP_GPIO': 4}, (54, 'X'): {'MCP': 3, 'MCP_GPIO': 5}, + (55, 'X'): {'MCP': 3, 'MCP_GPIO': 6}, (56, 'X'): {'MCP': 3, 'MCP_GPIO': 7}, + (57, 'X'): {'MCP': 3, 'MCP_GPIO': 8}, (58, 'X'): {'MCP': 3, 'MCP_GPIO': 9}, + (59, 'X'): {'MCP': 3, 'MCP_GPIO': 10}, (60, 'X'): {'MCP': 3, 'MCP_GPIO': 11}, + (61, 'X'): {'MCP': 3, 'MCP_GPIO': 12}, (62, 'X'): {'MCP': 3, 'MCP_GPIO': 13}, + (63, 'X'): {'MCP': 3, 'MCP_GPIO': 14}, (64, 'X'): {'MCP': 3, 'MCP_GPIO': 15} + } + } -inner_cabling = {'1_role' : {(1, 'X'): {'MCP': 0, 'MCP_GPIO': 0}, (2, 'X'): {'MCP': 0, 'MCP_GPIO': 1}, - (3, 'X'): {'MCP': 0, 'MCP_GPIO': 2}, (4, 'X'): {'MCP': 0, 'MCP_GPIO': 3}, - (5, 'X'): {'MCP': 0, 'MCP_GPIO': 4}, (6, 'X'): {'MCP': 0, 'MCP_GPIO': 5}, - (7, 'X'): {'MCP': 0, 'MCP_GPIO': 6}, (8, 'X'): {'MCP': 0, 'MCP_GPIO': 7}, - (9, 'X'): {'MCP': 0, 'MCP_GPIO': 8}, (10, 'X'): {'MCP': 0, 'MCP_GPIO': 9}, - (11, 'X'): {'MCP': 0, 'MCP_GPIO': 10}, (12, 'X'): {'MCP': 0, 'MCP_GPIO': 11}, - (13, 'X'): {'MCP': 0, 'MCP_GPIO': 12}, (14, 'X'): {'MCP': 0, 'MCP_GPIO': 13}, - (15, 'X'): {'MCP': 0, 'MCP_GPIO': 14}, (16, 'X'): {'MCP': 0, 'MCP_GPIO': 15}, - (17, 'X'): {'MCP': 1, 'MCP_GPIO': 0}, (18, 'X'): {'MCP': 1, 'MCP_GPIO': 1}, - (19, 'X'): {'MCP': 1, 'MCP_GPIO': 2}, (20, 'X'): {'MCP': 1, 'MCP_GPIO': 3}, - (21, 'X'): {'MCP': 1, 'MCP_GPIO': 4}, (22, 'X'): {'MCP': 1, 'MCP_GPIO': 5}, - (23, 'X'): {'MCP': 1, 'MCP_GPIO': 6}, (24, 'X'): {'MCP': 1, 'MCP_GPIO': 7}, - (25, 'X'): {'MCP': 1, 'MCP_GPIO': 8}, (26, 'X'): {'MCP': 1, 'MCP_GPIO': 9}, - (27, 'X'): {'MCP': 1, 'MCP_GPIO': 10}, (28, 'X'): {'MCP': 1, 'MCP_GPIO': 11}, - (29, 'X'): {'MCP': 1, 'MCP_GPIO': 12}, (30, 'X'): {'MCP': 1, 'MCP_GPIO': 13}, - (31, 'X'): {'MCP': 1, 'MCP_GPIO': 14}, (32, 'X'): {'MCP': 1, 'MCP_GPIO': 15}, - (33, 'X'): {'MCP': 2, 'MCP_GPIO': 0}, (34, 'X'): {'MCP': 2, 'MCP_GPIO': 1}, - (35, 'X'): {'MCP': 2, 'MCP_GPIO': 2}, (36, 'X'): {'MCP': 2, 'MCP_GPIO': 3}, - (37, 'X'): {'MCP': 2, 'MCP_GPIO': 4}, (38, 'X'): {'MCP': 2, 'MCP_GPIO': 5}, - (39, 'X'): {'MCP': 2, 'MCP_GPIO': 6}, (40, 'X'): {'MCP': 2, 'MCP_GPIO': 7}, - (41, 'X'): {'MCP': 2, 'MCP_GPIO': 8}, (42, 'X'): {'MCP': 2, 'MCP_GPIO': 9}, - (43, 'X'): {'MCP': 2, 'MCP_GPIO': 10}, (44, 'X'): {'MCP': 2, 'MCP_GPIO': 11}, - (45, 'X'): {'MCP': 2, 'MCP_GPIO': 12}, (46, 'X'): {'MCP': 2, 'MCP_GPIO': 13}, - (47, 'X'): {'MCP': 2, 'MCP_GPIO': 14}, (48, 'X'): {'MCP': 2, 'MCP_GPIO': 15}, - (49, 'X'): {'MCP': 3, 'MCP_GPIO': 0}, (50, 'X'): {'MCP': 3, 'MCP_GPIO': 1}, - (51, 'X'): {'MCP': 3, 'MCP_GPIO': 2}, (52, 'X'): {'MCP': 3, 'MCP_GPIO': 3}, - (53, 'X'): {'MCP': 3, 'MCP_GPIO': 4}, (54, 'X'): {'MCP': 3, 'MCP_GPIO': 5}, - (55, 'X'): {'MCP': 3, 'MCP_GPIO': 6}, (56, 'X'): {'MCP': 3, 'MCP_GPIO': 7}, - (57, 'X'): {'MCP': 3, 'MCP_GPIO': 8}, (58, 'X'): {'MCP': 3, 'MCP_GPIO': 9}, - (59, 'X'): {'MCP': 3, 'MCP_GPIO': 10}, (60, 'X'): {'MCP': 3, 'MCP_GPIO': 11}, - (61, 'X'): {'MCP': 3, 'MCP_GPIO': 12}, (62, 'X'): {'MCP': 3, 'MCP_GPIO': 13}, - (63, 'X'): {'MCP': 3, 'MCP_GPIO': 14}, (64, 'X'): {'MCP': 3, 'MCP_GPIO': 15} -}} class Mux(MuxAbstract): def __init__(self, **kwargs): - kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) + if kwargs['model'] == os.path.basename(__file__).rstrip('.py'): + for key in SPECS.keys(): + kwargs = enforce_specs(kwargs, SPECS, key) + subclass_init = False + else: + subclass_init = True kwargs.update({'cabling': kwargs.pop('cabling', default_mux_cabling)}) - kwargs.update({'activation_delay': max(kwargs.pop('activation_delay', SPECS['activation_delay']), - SPECS['activation_delay'])}) - kwargs.update({'release_delay': max(kwargs.pop('release_delay', SPECS['release_delay']), - SPECS['activation_delay'])}) - kwargs.update({'voltage_max': max(0., min(kwargs.pop('voltage_max', SPECS['voltage_max']), - SPECS['voltage_max']))}) - kwargs.update({'current_max': max(0., min(kwargs.pop('current_max', SPECS['current_max']), - SPECS['current_max']))}) super().__init__(**kwargs) assert isinstance(self.connection, I2C) self.exec_logger.debug(f'configuration: {kwargs}') - tca_address = kwargs.pop('tca_address', 0x70) - # tca_channel = kwargs.pop('tca_channel', 0) self._roles = kwargs.pop('roles', None) if self._roles is None: - self._roles = {'A': 'X'} # NOTE: defaults to 4-roles + self._roles = {'A': 'X'} # NOTE: defaults to 1-role if np.alltrue([j in self._roles.values() for j in set([i[1] for i in list(inner_cabling['1_role'].keys())])]): self._mode = '1_role' else: - self.exec_logger.error(f'Invalid role assignment for {self.board_name}: {self._roles} !') + self.exec_logger.error(f'Invalid role assignment for {self.model}: {self._roles} !') self._mode = '' - self._tca = [adafruit_tca9548a.TCA9548A(self.connection, tca_address)[i] for i in np.arange(7, 3, -1)] + self._tca = [adafruit_tca9548a.TCA9548A(self.connection, kwargs['tca_address'])[i] for i in np.arange(7, 3, -1)] # self._mcp_addresses = (kwargs.pop('mcp', '0x20')) # TODO: add assert on valid addresses.. self._mcp = [None, None, None, None] self.reset() @@ -104,7 +109,5 @@ class Mux(MuxAbstract): d = self.addresses[elec, role] if state == 'on': activate_relay(self._mcp[d['MCP']], d['MCP_GPIO'], True) - # time.sleep(MUX_CONFIG['activation_delay']) # NOTE: moved to MuxAbstract switch if state == 'off': activate_relay(self._mcp[d['MCP']], d['MCP_GPIO'], False) - # time.sleep(MUX_CONFIG['release_delay']) # NOTE: moved to MuxAbstract switch diff --git a/ohmpi/hardware_components/mux_2024_0_X.py b/ohmpi/hardware_components/mux_2024_0_X.py index 54158a4344140b76f66fdb7c30bbed630b57c36f..db82f198af23ac80650570a92fe09ff298a31fc4 100644 --- a/ohmpi/hardware_components/mux_2024_0_X.py +++ b/ohmpi/hardware_components/mux_2024_0_X.py @@ -1,20 +1,26 @@ -from ohmpi.config import HARDWARE_CONFIG import os +import datetime import numpy as np from ohmpi.hardware_components import MuxAbstract import adafruit_tca9548a # noqa from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa from digitalio import Direction # noqa from busio import I2C # noqa -# import time +from ohmpi.utils import enforce_specs # hardware characteristics and limitations -SPECS = {'voltage_max': 50., 'current_max': 3., 'activation_delay': 0.01, 'release_delay': 0.005} +SPECS = {'model': {'default': os.path.basename(__file__).rstrip('.py')}, + 'id' : {'default': 'mux_??'}, + 'voltage_max': {'default': 50.}, + 'current_max': {'default': 3.}, + 'activation_delay': {'default': 0.01}, + 'release_delay': {'default': 0.005} + } # defaults to 4 roles cabling electrodes from 1 to 8 -default_mux_cabling = {(elec, role) : ('mux_1', elec) for role in ['A', 'B', 'M', 'N'] for elec in range(1,9)} +default_mux_cabling = {(elec, role): ('mux_1', elec) for role in ['A', 'B', 'M', 'N'] for elec in range(1, 9)} -inner_cabling = {'4_roles' : {(1, 'X'): {'MCP': 0, 'MCP_GPIO': 0}, (1, 'Y'): {'MCP': 0, 'MCP_GPIO': 8}, +inner_cabling = {'4_roles': {(1, 'X'): {'MCP': 0, 'MCP_GPIO': 0}, (1, 'Y'): {'MCP': 0, 'MCP_GPIO': 8}, (2, 'X'): {'MCP': 0, 'MCP_GPIO': 1}, (2, 'Y'): {'MCP': 0, 'MCP_GPIO': 9}, (3, 'X'): {'MCP': 0, 'MCP_GPIO': 2}, (3, 'Y'): {'MCP': 0, 'MCP_GPIO': 10}, (4, 'X'): {'MCP': 0, 'MCP_GPIO': 3}, (4, 'Y'): {'MCP': 0, 'MCP_GPIO': 11}, @@ -30,7 +36,7 @@ inner_cabling = {'4_roles' : {(1, 'X'): {'MCP': 0, 'MCP_GPIO': 0}, (1, 'Y'): {'M (6, 'XX'): {'MCP': 1, 'MCP_GPIO': 2}, (6, 'YY'): {'MCP': 1, 'MCP_GPIO': 10}, (7, 'XX'): {'MCP': 1, 'MCP_GPIO': 1}, (7, 'YY'): {'MCP': 1, 'MCP_GPIO': 9}, (8, 'XX'): {'MCP': 1, 'MCP_GPIO': 0}, (8, 'YY'): {'MCP': 1, 'MCP_GPIO': 8}}, - '2_roles': # TODO: WARNING check 2_roles table, it has not been verified yet !!! + '2_roles': # TODO: WARNING check 2_roles table, it has not been verified yet !!! {(1, 'X'): {'MCP': 0, 'MCP_GPIO': 0}, (1, 'Y'): {'MCP': 0, 'MCP_GPIO': 8}, (2, 'X'): {'MCP': 0, 'MCP_GPIO': 1}, (2, 'Y'): {'MCP': 0, 'MCP_GPIO': 9}, (3, 'X'): {'MCP': 0, 'MCP_GPIO': 2}, (3, 'Y'): {'MCP': 0, 'MCP_GPIO': 10}, @@ -47,21 +53,17 @@ inner_cabling = {'4_roles' : {(1, 'X'): {'MCP': 0, 'MCP_GPIO': 0}, (1, 'Y'): {'M (11, 'X'): {'MCP': 1, 'MCP_GPIO': 2}, (11, 'Y'): {'MCP': 1, 'MCP_GPIO': 10}, (10, 'X'): {'MCP': 1, 'MCP_GPIO': 1}, (10, 'Y'): {'MCP': 1, 'MCP_GPIO': 9}, (9, 'X'): {'MCP': 1, 'MCP_GPIO': 0}, (9, 'Y'): {'MCP': 1, 'MCP_GPIO': 8}} - } + } class Mux(MuxAbstract): def __init__(self, **kwargs): - kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) - kwargs.update({'cabling': kwargs.pop('cabling', default_mux_cabling)}) - kwargs.update({'activation_delay': max(kwargs.pop('activation_delay', SPECS['activation_delay']), - SPECS['activation_delay'])}) - kwargs.update({'release_delay': max(kwargs.pop('release_delay', SPECS['release_delay']), - SPECS['activation_delay'])}) - kwargs.update({'voltage_max': max(0., min(kwargs.pop('voltage_max', SPECS['voltage_max']), - SPECS['voltage_max']))}) - kwargs.update({'current_max': max(0., min(kwargs.pop('current_max', SPECS['current_max']), - SPECS['current_max']))}) + if kwargs['model'] == os.path.basename(__file__).rstrip('.py'): + for key in SPECS.keys(): + kwargs = enforce_specs(kwargs, SPECS, key) + subclass_init = False + else: + subclass_init = True super().__init__(**kwargs) assert isinstance(self.connection, I2C) self.exec_logger.debug(f'configuration: {kwargs}') @@ -75,7 +77,7 @@ class Mux(MuxAbstract): elif np.alltrue([j in self._roles.values() for j in set([i[1] for i in list(inner_cabling['2_roles'].keys())])]): self._mode = '2_roles' else: - self.exec_logger.error(f'Invalid role assignment for {self.board_name}: {self._roles} !') + self.exec_logger.error(f'Invalid role assignment for {self.model}: {self._roles} !') self._mode = '' if tca_address is None: self._tca = self.connection @@ -87,6 +89,8 @@ class Mux(MuxAbstract): if self.addresses is None: self._get_addresses() self.exec_logger.debug(f'{self.board_id} addresses: {self.addresses}') + if not subclass_init: # TODO: try to only log this event and not the one created by super() + self.exec_logger.event(f'{self.model}_{self.board_id}\tmux_init\tend\t{datetime.datetime.utcnow()}') def _get_addresses(self): """ Converts inner cabling addressing into (electrodes, role) addressing """ diff --git a/ohmpi/hardware_components/pwr_batt.py b/ohmpi/hardware_components/pwr_batt.py index dd811385307fb63e0c3b01e4f3f2de8f790f0c5c..614e4fdefd00e04e16da67b368f8030c80ea3a3c 100644 --- a/ohmpi/hardware_components/pwr_batt.py +++ b/ohmpi/hardware_components/pwr_batt.py @@ -20,13 +20,13 @@ class Pwr(PwrAbstract): @current.setter def current(self, value, **kwargs): - self.exec_logger.debug(f'Current cannot be set on {self.board_name}') + self.exec_logger.debug(f'Current cannot be set on {self.model}') def turn_off(self): - self.exec_logger.debug(f'{self.board_name} cannot be turned off') + self.exec_logger.debug(f'{self.model} cannot be turned off') def turn_on(self): - self.exec_logger.debug(f'{self.board_name} is always on') + self.exec_logger.debug(f'{self.model} is always on') @property def voltage(self): diff --git a/ohmpi/hardware_components/pwr_dps5005.py b/ohmpi/hardware_components/pwr_dps5005.py index 7e4d7ceac90172f0944f78f67e5923565e8b3acf..163456d9ad0c8ce9b2d92a10b36f79cb86f3fcd9 100644 --- a/ohmpi/hardware_components/pwr_dps5005.py +++ b/ohmpi/hardware_components/pwr_dps5005.py @@ -40,15 +40,15 @@ class Pwr(PwrAbstract): @current.setter def current(self, value, **kwargs): - self.exec_logger.debug(f'Current cannot be set on {self.board_name}') + self.exec_logger.debug(f'Current cannot be set on {self.model}') def turn_off(self): self.connection.write_register(0x09, 1) - self.exec_logger.debug(f'{self.board_name} is off') + self.exec_logger.debug(f'{self.model} is off') def turn_on(self): self.connection.write_register(0x09, 1) - self.exec_logger.debug(f'{self.board_name} is on') + self.exec_logger.debug(f'{self.model} is on') @property def voltage(self): diff --git a/ohmpi/hardware_system.py b/ohmpi/hardware_system.py index cf8e0f3bae229911f53337cf0bd893847aedef9d..19c0724c58b6de1adc4e541ecb4c717a76524bf4 100644 --- a/ohmpi/hardware_system.py +++ b/ohmpi/hardware_system.py @@ -45,7 +45,8 @@ for k, v in rx_module.SPECS['rx'].items(): current_max = np.min([TX_CONFIG['voltage_max']/50/TX_CONFIG['r_shunt'], # TODO: replace 50 by a TX config np.min(np.hstack((np.inf, [MUX_CONFIG[i].pop('current_max', np.inf) for i in MUX_CONFIG.keys()])))]) -voltage_max = np.min([TX_CONFIG['voltage_max'], np.min(np.hstack((np.inf, [MUX_CONFIG[i].pop('voltage_max', np.inf) for i in MUX_CONFIG.keys()])))]) +voltage_max = np.min([TX_CONFIG['voltage_max'], + np.min(np.hstack((np.inf, [MUX_CONFIG[i].pop('voltage_max', np.inf) for i in MUX_CONFIG.keys()])))]) voltage_min = RX_CONFIG['voltage_min'] @@ -57,16 +58,10 @@ def elapsed_seconds(start_time): class OhmPiHardware: def __init__(self, **kwargs): # OhmPiHardware initialization - self.exec_logger = kwargs.pop('exec_logger', None) + self.exec_logger = kwargs.pop('exec_logger', create_stdout_logger('exec_hw')) self.exec_logger.event(f'OhmPiHardware\tinit\tbegin\t{datetime.datetime.utcnow()}') - if self.exec_logger is None: - self.exec_logger = create_stdout_logger('exec_hw') - self.data_logger = kwargs.pop('exec_logger', None) - if self.data_logger is None: - self.data_logger = create_stdout_logger('data_hw') - self.soh_logger = kwargs.pop('soh_logger', None) - if self.soh_logger is None: - self.soh_logger = create_stdout_logger('soh_hw') + self.data_logger = kwargs.pop('exec_logger', create_stdout_logger('data_hw')) + self.soh_logger = kwargs.pop('soh_logger', create_stdout_logger('soh_hw')) self.tx_sync = Event() # Main Controller initialization