diff --git a/ohmpi/hardware_components/mb_2024_0_2.py b/ohmpi/hardware_components/mb_2024_0_2.py index 0375baf575b6903b348fe07b5a3f974db67b9aa2..dc8adbfdd1e02c70275617d6df79562be85a3de3 100644 --- a/ohmpi/hardware_components/mb_2024_0_2.py +++ b/ohmpi/hardware_components/mb_2024_0_2.py @@ -7,58 +7,80 @@ from adafruit_ads1x15.ads1x15 import Mode # noqa from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa from digitalio import Direction # noqa import minimalmodbus # noqa +from busio import I2C # noqa import time import numpy as np import os from ohmpi.hardware_components import TxAbstract, RxAbstract -ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi') -ctl_connection = HARDWARE_CONFIG['ctl'].pop('connection', 'i2c') -ctl_module = importlib.import_module(f'ohmpi.hardware_components.{ctl_name}') - -TX_CONFIG = HARDWARE_CONFIG['tx'] -RX_CONFIG = HARDWARE_CONFIG['rx'] +from ohmpi.utils import enforce_specs +# ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi') +# ctl_connection = HARDWARE_CONFIG['ctl'].pop('connection', 'i2c') +# ctl_module = importlib.import_module(f'ohmpi.hardware_components.{ctl_name}') +# +# TX_CONFIG = HARDWARE_CONFIG['tx'] +# RX_CONFIG = HARDWARE_CONFIG['rx'] # hardware characteristics and limitations -# *** RX *** -# ADC for voltage -voltage_adc_voltage_min = 10. # mV -voltage_adc_voltage_max = 4500. # mV -sampling_rate = 20. # Hz -data_rate = 860. # S/s? -rx_mcp_board_address = 0x27 -RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV -RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV -RX_CONFIG['sampling_rate'] = RX_CONFIG.pop('sampling_rate', sampling_rate) -RX_CONFIG['data_rate'] = RX_CONFIG.pop('data_rate', data_rate) -# RX_CONFIG['coef_p2'] = RX_CONFIG.pop('coef_p2', 2.5) -RX_CONFIG['latency'] = RX_CONFIG.pop('latency', 0.01) -RX_CONFIG['bias'] = RX_CONFIG.pop('bias', 0.) -RX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address) - - -# *** TX *** -# ADC for current -current_adc_voltage_min = 10. # mV -current_adc_voltage_max = 4500. # mV -low_battery = 12. # V (conventional value as it is not measured on this board) -tx_mcp_board_address = 0x21 # -# pwr_voltage_max = 12. # V -# pwr_default_voltage = 12. # V -# pwr_switch_on_warmup = 0. # seconds - -TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50), - TX_CONFIG.pop('current_min', np.inf)]) # mA -TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50), - TX_CONFIG.pop('current_max', np.inf)]) # mA -# TX_CONFIG['voltage_max'] = np.min([pwr_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V -TX_CONFIG['voltage_max'] = TX_CONFIG.pop('voltage_max', np.inf) # V -TX_CONFIG['voltage_min'] = -TX_CONFIG['voltage_max'] # V -TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', np.inf), TX_CONFIG['voltage_max']]) # V -# TX_CONFIG['pwr_switch_on_warm_up'] = TX_CONFIG.pop('pwr_switch_on_warmup', pwr_switch_on_warmup) -TX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address) -TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', low_battery) -TX_CONFIG['latency'] = TX_CONFIG.pop('latency', 0.01) -TX_CONFIG['bias'] = TX_CONFIG.pop('bias', 0.) +# voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s +SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, + 'data_rate': {'default': 860.}, + 'bias': {'min': -5000., 'default': 0., 'max': 5000.}, + 'coef_p2': {'default': 2.50}, + 'voltage_min': {'default': 10.0}, + }, + 'tx': {'adc_voltage_min': {'default': 10.}, # Minimum voltage value used in vmin strategy + 'adc_voltage_max': {'default': 4500.}, # Maximum voltage on ads1115 used to measure current + 'voltage_max': {'min': 0., 'default': 12., 'max': 12.}, # Maximum input voltage + 'data_rate': {'default': 860.}, + 'compatible_power_sources': {'default': 'pwr_batt', 'others' : ['dps5005']}, + 'r_shunt': {'min': 0., 'default': 2. }, + 'activation_delay': {'default': 0.005}, # Max turn on time of 211EH relays = 5ms + 'release_delay': {'default': 0.001}, # Max turn off time of 211EH relays = 1ms + }} + +# TODO: move low_battery spec in pwr +# +# # hardware characteristics and limitations +# # *** RX *** +# # ADC for voltage +# voltage_adc_voltage_min = 10. # mV +# voltage_adc_voltage_max = 4500. # mV +# sampling_rate = 20. # Hz +# data_rate = 860. # S/s? +# rx_mcp_board_address = 0x27 +# RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV +# RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV +# RX_CONFIG['sampling_rate'] = RX_CONFIG.pop('sampling_rate', sampling_rate) +# RX_CONFIG['data_rate'] = RX_CONFIG.pop('data_rate', data_rate) +# # RX_CONFIG['coef_p2'] = RX_CONFIG.pop('coef_p2', 2.5) +# RX_CONFIG['latency'] = RX_CONFIG.pop('latency', 0.01) +# RX_CONFIG['bias'] = RX_CONFIG.pop('bias', 0.) +# RX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address) +# +# +# # *** TX *** +# # ADC for current +# current_adc_voltage_min = 10. # mV +# current_adc_voltage_max = 4500. # mV +# low_battery = 12. # V (conventional value as it is not measured on this board) +# tx_mcp_board_address = 0x21 # +# # pwr_voltage_max = 12. # V +# # pwr_default_voltage = 12. # V +# # pwr_switch_on_warmup = 0. # seconds +# +# TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50), +# TX_CONFIG.pop('current_min', np.inf)]) # mA +# TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50), +# TX_CONFIG.pop('current_max', np.inf)]) # mA +# # TX_CONFIG['voltage_max'] = np.min([pwr_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V +# TX_CONFIG['voltage_max'] = TX_CONFIG.pop('voltage_max', np.inf) # V +# TX_CONFIG['voltage_min'] = -TX_CONFIG['voltage_max'] # V +# TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', np.inf), TX_CONFIG['voltage_max']]) # V +# # TX_CONFIG['pwr_switch_on_warm_up'] = TX_CONFIG.pop('pwr_switch_on_warmup', pwr_switch_on_warmup) +# TX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address) +# TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', low_battery) +# TX_CONFIG['latency'] = TX_CONFIG.pop('latency', 0.01) +# TX_CONFIG['bias'] = TX_CONFIG.pop('bias', 0.) def _gain_auto(channel): @@ -89,25 +111,35 @@ def _gain_auto(channel): class Tx(TxAbstract): def __init__(self, **kwargs): + for key in SPECS['tx'].keys(): + kwargs = enforce_specs(kwargs, SPECS['tx'], key) kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) + assert isinstance(self.connection, I2C) + kwargs.update({'pwr': kwargs.pop('pwr', SPECS['tx']['compatible_power_sources']['default'])}) + if (kwargs['pwr'] != SPECS['tx']['compatible_power_sources']['default'] + and kwargs['pwr'] not in SPECS['tx']['compatible_power_sources']['other']): + self.exec_logger.warning(f'Incompatible power source specified check config') + assert kwargs['pwr'] in SPECS['tx'] + # self.pwr = None # TODO: set a list of compatible power system with the tx self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}') - self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage']) + # self.voltage_max = kwargs['voltage_max'] # TODO: check if used + self._activation_delay = kwargs['activation_delay'] + self._release_delay = kwargs['release_delay'] self.voltage_adjustable = False self.current_adjustable = False - if self.ctl is None: - self.ctl = ctl_module.Ctl() - # elif isinstance(self.ctl, dict): - # self.ctl = ctl_module.Ctl(**self.ctl) - self.io = self.ctl.interfaces[kwargs.pop('connection', ctl_connection)] # I2C connexion to MCP23008, for current injection - self.mcp_board = MCP23008(self.io, address=TX_CONFIG['mcp_board_address']) + self.mcp_board = MCP23008(self.connection, address=0x21) # ADS1115 for current measurement (AB) self._ads_current_address = 0x48 - self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, + self._ads_current_data_rate = kwargs['data_rate'] + self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=self._ads_current_data_rate, address=self._ads_current_address) self._ads_current.mode = Mode.CONTINUOUS + self.r_shunt = kwargs['r_shunt'] + self.adc_voltage_min = kwargs['adc_voltage_min'] + self.adc_voltage_max = kwargs['adc_voltage_max'] # Relays for pulse polarity self.pin0 = self.mcp_board.get_pin(0) @@ -115,9 +147,7 @@ class Tx(TxAbstract): self.pin1 = self.mcp_board.get_pin(1) self.pin1.direction = Direction.OUTPUT self.polarity = 0 - self.adc_gain = 2 / 3 - - self.pwr = None # TODO: set a list of compatible power system with the tx + self.gain = 2 / 3 # Initialize LEDs self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run @@ -129,41 +159,45 @@ class Tx(TxAbstract): self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}') @property - def adc_gain(self): + def gain(self): return self._adc_gain - @adc_gain.setter - def adc_gain(self, value): + @gain.setter + def gain(self, value): assert value in [2/3, 2, 4, 8, 16] self._adc_gain = value - self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, + self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, + data_rate=SPECS['tx']['data_rate']['default'], address=self._ads_current_address) self._ads_current.mode = Mode.CONTINUOUS self.exec_logger.debug(f'Setting TX ADC gain to {value}') - def adc_gain_auto(self): + def _adc_gain_auto(self): self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') gain = _gain_auto(AnalogIn(self._ads_current, ads.P0)) self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}') - self.adc_gain = gain + self.gain = gain self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') def current_pulse(self, **kwargs): TxAbstract.current_pulse(self, **kwargs) - self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board') + self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board') @property def current(self): """ Gets the current IAB in Amps """ - iab = AnalogIn(self._ads_current, ads.P0).voltage * 1000. / (50 * TX_CONFIG['r_shunt']) # measure current + iab = AnalogIn(self._ads_current, ads.P0).voltage * 1000. / (50 * self.r_shunt) # measure current self.exec_logger.debug(f'Reading TX current: {iab} mA') return iab @ current.setter def current(self, value): - assert TX_CONFIG['current_min'] <= value <= TX_CONFIG['current_max'] - self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board') + assert self.adc_voltage_min / (50 * self.r_shunt) <= value <= self.adc_voltage_max / (50 * self.r_shunt) + self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board') + + def gain_auto(self): + self._adc_gain_auto() def inject(self, polarity=1, injection_duration=None): self.polarity = polarity @@ -180,15 +214,15 @@ class Tx(TxAbstract): if polarity == 1: self.pin0.value = True self.pin1.value = False - time.sleep(0.005) # Max turn on time of 211EH relays = 5ms + time.sleep(self._activation_delay) # Max turn on time of 211EH relays = 5ms elif polarity == -1: self.pin0.value = False self.pin1.value = True - time.sleep(0.005) # Max turn on time of 211EH relays = 5ms + time.sleep(self._activation_delay) # Max turn on time of 211EH relays = 5ms else: self.pin0.value = False self.pin1.value = False - time.sleep(0.001) # Max turn off time of 211EH relays = 1ms + time.sleep(self._release_delay) # Max turn off time of 211EH relays = 1ms def turn_off(self): self.pwr.turn_off(self) @@ -200,9 +234,9 @@ class Tx(TxAbstract): def tx_bat(self): self.soh_logger.warning(f'Cannot get battery voltage on {self.board_name}') self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.') - return TX_CONFIG['low_battery'] + return self.pwr.voltage - def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=1): + def voltage_pulse(self, voltage=None, length=None, polarity=1): """ Generates a square voltage pulse Parameters @@ -218,7 +252,8 @@ class Tx(TxAbstract): # self.exec_logger.info(f'injection_duration: {length}') # TODO: delete me if length is None: length = self.injection_duration - self.pwr.voltage = voltage + if voltage is not None: + self.pwr.voltage = voltage self.exec_logger.debug(f'Voltage pulse of {polarity*self.pwr.voltage:.3f} V for {length:.3f} s') self.inject(polarity=polarity, injection_duration=length) self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tend\t{datetime.datetime.utcnow()}') @@ -226,27 +261,28 @@ class Tx(TxAbstract): class Rx(RxAbstract): def __init__(self, **kwargs): + for key in SPECS['rx'].keys(): + kwargs = enforce_specs(kwargs, SPECS['rx'], key) kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) + assert isinstance(self.connection, I2C) + self.exec_logger.event(f'{self.board_name}\trx_init\tbegin\t{datetime.datetime.utcnow()}') - if self.ctl is None: - self.ctl = ctl_module.Ctl() - self.io = self.ctl.interfaces[kwargs.pop('connection', ctl_connection)] # I2C connexion to MCP23008, for DG411 - self.mcp_board = MCP23008(self.io, address=RX_CONFIG['mcp_board_address']) + self.mcp_board = MCP23008(self.connection, address=0x27) # ADS1115 for voltage measurement (MN) self._ads_voltage_address = 0x49 self._adc_gain = 2/3 - self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self._adc_gain, data_rate=860, + self._ads_voltage = ads.ADS1115(self.connection, gain=self._adc_gain, + data_rate=SPECS['rx']['data_rate']['default'], address=self._ads_voltage_address) self._ads_voltage.mode = Mode.CONTINUOUS - self._coef_p2 = kwargs.pop('coef_p2', RX_CONFIG['coef_p2']) - self._voltage_max = kwargs.pop('voltage_max', RX_CONFIG['voltage_max']) - self._sampling_rate = kwargs.pop('sampling_rate', sampling_rate) - self._latency = kwargs.pop('latency', RX_CONFIG['latency']) - self._bias = kwargs.pop('bias', RX_CONFIG['bias']) + self._coef_p2 = kwargs['coef_p2'] + # self._voltage_max = kwargs['voltage_max'] + self._sampling_rate = kwargs['sampling_rate'] + self._bias = kwargs['bias'] self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}') self.pin_DG0 = self.mcp_board.get_pin(0) @@ -262,25 +298,30 @@ class Rx(RxAbstract): self._voltage_gain = 0.5 @property - def adc_gain(self): + def gain(self): return self._adc_gain - @adc_gain.setter - def adc_gain(self, value): + @gain.setter + def gain(self, value): assert value in [2/3, 2, 4, 8, 16] self._adc_gain = value - self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, + self._ads_voltage = ads.ADS1115(self.connection, gain=self.adc_gain, + data_rate=SPECS['rx']['data_rate']['default'], address=self._ads_voltage_address) self._ads_voltage.mode = Mode.CONTINUOUS self.exec_logger.debug(f'Setting RX ADC gain to {value}') - def adc_gain_auto(self): + def _adc_gain_auto(self): self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') - gain = 2/3 + gain_0 = _gain_auto(AnalogIn(self._ads_voltage, ads.P0)) + gain_2 = _gain_auto(AnalogIn(self._ads_voltage, ads.P2)) + gain = np.min([gain_0, gain_2]) self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}') - self.adc_gain = gain + self.gain = gain self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') + def gain_auto(self): + self._adc_gain_auto() @property def voltage(self): """ Gets the voltage VMN in Volts