Commit f2bcb511 authored by Arnaud WATLET's avatar Arnaud WATLET
Browse files

Updates mb 2023 kwargs management

Showing with 66 additions and 43 deletions
+66 -43
......@@ -21,14 +21,12 @@ HARDWARE_CONFIG = {
'ctl': {'model': 'raspberry_pi'},
'pwr': {'model': 'pwr_batt', 'voltage': 12.},
'tx': {'model': 'mb_2023_0_X',
'mcp_board_address': 0x20,
'voltage_max': 12., # Maximum voltage supported by the TX board [V]
'current_max': 4800 / 50 / 2, # Maximum current supported by the TX board [mA]
'adc_voltage_max': 4800., # Maximum voltage read by the current ADC on the TX board [mA]
'r_shunt': 2 # Shunt resistance in Ohms
},
'rx': {'model': 'mb_2023_0_X',
'coef_p2': 2.50, # slope for conversion for ADS, measurement in V/V
'latency': 0.010, # latency in seconds in continuous mode
'sampling_rate': 50 # number of samples per second
},
'mux': # default properties are system properties that will be
......
......@@ -112,7 +112,7 @@ class MuxAbstract(ABC):
if self.board_id is None:
self.exec_logger.error(f'MUX {self.board_name} should have an id !')
self.exec_logger.debug(f'MUX {self.board_id} ({self.board_name}) initialization')
self.connection = kwargs.pop('io', None)
self.connection = kwargs.pop('connection', None)
cabling = kwargs.pop('cabling', None)
self.cabling = {}
if cabling is not None:
......@@ -257,7 +257,7 @@ class TxAbstract(ABC):
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_tx')
self.ctl = kwargs.pop('ctl', None)
self.connection = kwargs.pop('io', None)
self.connection = kwargs.pop('connection', None)
self.pwr = kwargs.pop('pwr', None)
self._polarity = 0
self._injection_duration = None
......@@ -375,7 +375,7 @@ class RxAbstract(ABC):
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_rx')
self.ctl = kwargs.pop('ctl', None)
self.connection = kwargs.pop('io', None)
self.connection = kwargs.pop('connection', None)
self.board_name = kwargs.pop('board_name', 'unknown RX hardware')
self._sampling_rate = kwargs.pop('sampling_rate', 1) # ms
self.exec_logger.debug(f'{self.board_name} RX initialization')
......
......@@ -10,6 +10,7 @@ import time
import numpy as np
import os
from ohmpi.hardware_components import TxAbstract, RxAbstract
from ohmpi.utils import enforce_specs
# ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi')
# ctl_connection = HARDWARE_CONFIG['ctl'].pop('connection', 'i2c')
# ctl_module = importlib.import_module(f'ohmpi.hardware_components.{ctl_name}')
......@@ -19,11 +20,21 @@ from ohmpi.hardware_components import TxAbstract, RxAbstract
# hardware characteristics and limitations
# voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s
SPECS = {'RX': {'voltage_adc_voltage_min': 10., 'voltage_adc_voltage_max': 4500., 'sampling_rate': 20.,
'data_rate': 860.},
'TX': {'current_adc_voltage_min': 10., 'bias': 0., 'injection_voltage_max': 12000., 'low_battery': 12000.,
'tx_mcp_board_address': 0x20, 'data_rate': 860., 'comptatible_power_sources': ['pwr_batt', 'dps5005'],
'r_shunt': 2., 'activation_delay': 0.005, 'release_delay': 0.001}}
SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.},
'data_rate': {'default': 860.},
'connection': {'default': 'i2c'},
'bias': {'min': -5000., 'default': 0., 'max': 5000.},
'coef_p2': {'default': 2.50}},
'tx': {'adc_voltage_min': {'default': 10.},
'adc_voltage_max': {'default': 4500.},
'voltage_max': {'min': 0., 'default': 12., 'max': 12.},
'data_rate': {'default': 860.},
'compatible_power_sources': {'default': ['pwr_batt', 'dps5005']},
'r_shunt': {'min': 0., 'default': 2. },
'activation_delay': {'default': 0.005},
'release_delay': {'default': 0.001},
'connection': {'default': 'i2c'}
}}
# TODO: move low_battery spec in pwr
......@@ -96,30 +107,37 @@ def _gain_auto(channel):
class Tx(TxAbstract):
def __init__(self, **kwargs):
for key in SPECS['tx']:
kwargs = enforce_specs(kwargs, SPECS['tx'], key)
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs)
kwargs.update({'pwr': kwargs.pop('pwr', SPECS['compatible_power_sources'][0])})
if kwargs['pwr'] not in SPECS['TX']['compatible_power_sources']:
if kwargs['pwr'] not in SPECS['tx']['compatible_power_sources']:
self.exec_logger.warning(f'Incompatible power source specified check config')
assert kwargs['pwr'] in SPECS['TX']
assert kwargs['pwr'] in SPECS['tx']
#self.pwr = None # TODO: set a list of compatible power system with the tx
self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}')
self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage'])
self.voltage_max = kwargs['voltage_max']
self.voltage_adjustable = False
self.current_adjustable = False
if self.ctl is None:
self.ctl = ctl_module.Ctl()
# elif isinstance(self.ctl, dict):
# self.ctl = ctl_module.Ctl(**self.ctl)
self.connection = self.ctl.interfaces[kwargs.pop('connection', ctl_connection)]
self.connection = self.ctl.interfaces[kwargs['connection']]
# I2C connexion to MCP23008, for current injection
self.mcp_board = MCP23008(self.connection, address=SPECS['TX']['mcp_board_address'])
self.mcp_board = MCP23008(self.connection, address=0x20)
# ADS1115 for current measurement (AB)
self._ads_current_address = 0x48
self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=860,
self._ads_current_data_rate = kwargs['data_rate']
self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=self._ads_current_data_rate,
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
self.r_shunt = kwargs['r_shunt']
self.adc_voltage_min = kwargs['adc_voltage_min']
self.adc_voltage_max = kwargs['adc_voltage_max']
# Relays for pulse polarity
self.pin0 = self.mcp_board.get_pin(0)
......@@ -128,13 +146,14 @@ class Tx(TxAbstract):
self.pin1.direction = Direction.OUTPUT
self.polarity = 0
self.adc_gain = 2 / 3
self.activation_delay = kwargs['activation_delay']
self.release_delay = kwargs['release_delay']
# MCP23008 pins for LEDs
self.pin4 = self.mcp_board.get_pin(4) # TODO: Delete me? No LED on this version of the board
self.pin4.direction = Direction.OUTPUT
self.pin4.value = True
self._bias = kwargs.pop('bias', TX_CONFIG['bias'])
self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}')
@property
......@@ -145,7 +164,7 @@ class Tx(TxAbstract):
def adc_gain(self, value):
assert value in [2/3, 2, 4, 8, 16]
self._adc_gain = value
self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=SPECS['TX']['data_rate'],
self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=kwargs['data_rate'],
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
self.exec_logger.debug(f'Setting TX ADC gain to {value}')
......@@ -159,20 +178,20 @@ class Tx(TxAbstract):
def current_pulse(self, **kwargs):
TxAbstract.current_pulse(self, **kwargs)
self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board')
self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board')
@property
def current(self):
""" Gets the current IAB in Amps
"""
iab = AnalogIn(self._ads_current, ads.P0).voltage * 1000. / (50 * TX_CONFIG['r_shunt']) # measure current
iab = AnalogIn(self._ads_current, ads.P0).voltage * 1000. / (50 * self.r_shunt) # measure current
self.exec_logger.debug(f'Reading TX current: {iab} mA')
return iab
@ current.setter
def current(self, value):
assert TX_CONFIG['current_min'] <= value <= TX_CONFIG['current_max']
self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board')
assert self.adc_voltage_min / (50 * self.r_shunt) <= value <= self.adc_voltage_max / (50 * self.r_shunt)
self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board')
def inject(self, polarity=1, injection_duration=None):
self.polarity = polarity
......@@ -189,15 +208,15 @@ class Tx(TxAbstract):
if polarity == 1:
self.pin0.value = True
self.pin1.value = False
time.sleep(SPECS['TX']['activation_delay']) # Max turn on time of 211EH relays = 5ms
time.sleep(self.activation_delay) # Max turn on time of 211EH relays = 5ms
elif polarity == -1:
self.pin0.value = False
self.pin1.value = True
time.sleep(SPECS['TX']['activation_delay']) # Max turn on time of 211EH relays = 5ms
time.sleep(self.activation_delay) # Max turn on time of 211EH relays = 5ms
else:
self.pin0.value = False
self.pin1.value = False
time.sleep(SPECS['TX']['release_delay']) # Max turn off time of 211EH relays = 1ms
time.sleep(self.release_delay) # Max turn off time of 211EH relays = 1ms
def turn_off(self):
self.pwr.turn_off(self)
......@@ -209,9 +228,9 @@ class Tx(TxAbstract):
def tx_bat(self):
self.soh_logger.warning(f'Cannot get battery voltage on {self.board_name}')
self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.')
return TX_CONFIG['low_battery']
return self.pwr.voltage
def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=1):
def voltage_pulse(self, voltage=self.voltage, length=None, polarity=1):
""" Generates a square voltage pulse
Parameters
......@@ -235,12 +254,14 @@ class Tx(TxAbstract):
class Rx(RxAbstract):
def __init__(self, **kwargs):
for key in kwargs:
kwargs = enforce_specs(kwargs, SPECS['rx'], key)
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs)
self.exec_logger.event(f'{self.board_name}\trx_init\tbegin\t{datetime.datetime.utcnow()}')
if self.ctl is None:
self.ctl = ctl_module.Ctl()
self.connection = self.ctl.interfaces[kwargs.pop('connection', ctl_connection)]
self.connection = self.ctl.interfaces[kwargs['connection']]
# ADS1115 for voltage measurement (MN)
self._ads_voltage_address = 0x49
......@@ -248,11 +269,10 @@ class Rx(RxAbstract):
self._ads_voltage = ads.ADS1115(self.connection, gain=self._adc_gain, data_rate=860,
address=self._ads_voltage_address)
self._ads_voltage.mode = Mode.CONTINUOUS
self._coef_p2 = kwargs.pop('coef_p2', RX_CONFIG['coef_p2'])
self._voltage_max = kwargs.pop('voltage_max', RX_CONFIG['voltage_max'])
self._sampling_rate = kwargs.pop('sampling_rate', sampling_rate)
self._latency = kwargs.pop('latency', RX_CONFIG['latency'])
self._bias = kwargs.pop('bias', RX_CONFIG['bias'])
self._coef_p2 = kwargs['coef_p2']
# self._voltage_max = kwargs['voltage_max']
self._sampling_rate = kwargs['sampling_rate']
self._bias = kwargs['bias']
self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}')
@property
......
......@@ -3,18 +3,23 @@ import os
import shutil
import collections.abc
import numpy as np
from numbers import Number
def enforce_specs(kwargs, specs, key):
kwargs.update({key: kwargs.pop(key, specs[key]['default'])})
s = specs.copy()
min_value = s[key].pop('min', -np.inf)
s[key]['min'] = min_value
max_value = s[key].pop('max', np.inf)
s[key]['max'] = max_value
if kwargs[key] < min_value:
kwargs[key] = min_value
elif kwargs[key] > max_value:
kwargs[key] = max_value
if isinstance(kwargs[key], Number):
s = specs.copy()
min_value = s[key].pop('min', -np.inf)
s[key]['min'] = min_value
max_value = s[key].pop('max', np.inf)
s[key]['max'] = max_value
if kwargs[key] < min_value:
kwargs[key] = min_value
elif kwargs[key] > max_value:
kwargs[key] = max_value
return kwargs
def update_dict(d, u):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment