Commit 0baa8e44 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Updates mb_2024 to inherit from mb_2023

Showing with 51 additions and 320 deletions
+51 -320
import datetime import datetime
import importlib
# from ohmpi.config import HARDWARE_CONFIG # TODO: Remove references at config here -> move it in ohmpi_hardware as done for mux_2024
import adafruit_ads1x15.ads1115 as ads # noqa import adafruit_ads1x15.ads1115 as ads # noqa
from adafruit_ads1x15.analog_in import AnalogIn # noqa from adafruit_ads1x15.analog_in import AnalogIn # noqa
from adafruit_ads1x15.ads1x15 import Mode # noqa from adafruit_ads1x15.ads1x15 import Mode # noqa
...@@ -12,12 +10,6 @@ import numpy as np ...@@ -12,12 +10,6 @@ import numpy as np
import os import os
from ohmpi.hardware_components import TxAbstract, RxAbstract from ohmpi.hardware_components import TxAbstract, RxAbstract
from ohmpi.utils import enforce_specs from ohmpi.utils import enforce_specs
# ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi')
# ctl_connection = HARDWARE_CONFIG['ctl'].pop('connection', 'i2c')
# ctl_module = importlib.import_module(f'ohmpi.hardware_components.{ctl_name}')
# TX_CONFIG = HARDWARE_CONFIG['tx']
# RX_CONFIG = HARDWARE_CONFIG['rx']
# hardware characteristics and limitations # hardware characteristics and limitations
# voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s # voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s
...@@ -39,48 +31,7 @@ SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, ...@@ -39,48 +31,7 @@ SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.},
# TODO: move low_battery spec in pwr # TODO: move low_battery spec in pwr
# *** RX *** def _ads_1115_gain_auto(channel): # Make it a class method ?
# ADC for voltage
# voltage_adc_voltage_min = 10. # mV
# voltage_adc_voltage_max = 4500. # mV
# sampling_rate = 20. # Hz
# data_rate = 860. # S/s?
# RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV
# RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV
# RX_CONFIG['sampling_rate'] = RX_CONFIG.pop('sampling_rate', sampling_rate)
# RX_CONFIG['data_rate'] = RX_CONFIG.pop('data_rate', data_rate)
# RX_CONFIG['coef_p2'] = RX_CONFIG.pop('coef_p2', 2.5)
# RX_CONFIG['latency'] = RX_CONFIG.pop('latency', 0.01)
# RX_CONFIG['bias'] = RX_CONFIG.pop('bias', 0.)
# *** TX ***
# ADC for current
# current_adc_voltage_min = 10. # mV
# current_adc_voltage_max = 4500. # mV
# low_battery = 12. # V (conventional value as it is not measured on this board)
# tx_mcp_board_address = 0x20 #
# pwr_voltage_max = 12. # V
# pwr_default_voltage = 12. # V
# pwr_switch_on_warmup = 0. # seconds
# TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50),
# TX_CONFIG.pop('current_min', np.inf)]) # mA
# TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50),
# TX_CONFIG.pop('current_max', np.inf)]) # mA
# # TX_CONFIG['voltage_max'] = np.min([pwr_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V
# TX_CONFIG['voltage_max'] = TX_CONFIG.pop('voltage_max', np.inf) # V
# TX_CONFIG['voltage_min'] = -TX_CONFIG['voltage_max'] # V
# TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', np.inf), TX_CONFIG['voltage_max']]) # V
# # TX_CONFIG['pwr_switch_on_warm_up'] = TX_CONFIG.pop('pwr_switch_on_warmup', pwr_switch_on_warmup)
# TX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address)
# TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', low_battery)
# TX_CONFIG['latency'] = TX_CONFIG.pop('latency', 0.01)
# TX_CONFIG['bias'] = TX_CONFIG.pop('bias', 0.)
def _gain_auto(channel):
"""Automatically sets the gain on a channel """Automatically sets the gain on a channel
Parameters Parameters
...@@ -118,7 +69,6 @@ class Tx(TxAbstract): ...@@ -118,7 +69,6 @@ class Tx(TxAbstract):
and kwargs['pwr'] not in SPECS['tx']['compatible_power_sources']['other']): and kwargs['pwr'] not in SPECS['tx']['compatible_power_sources']['other']):
self.exec_logger.warning(f'Incompatible power source specified check config') self.exec_logger.warning(f'Incompatible power source specified check config')
assert kwargs['pwr'] in SPECS['tx'] assert kwargs['pwr'] in SPECS['tx']
#self.pwr = None # TODO: set a list of compatible power system with the tx
self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}')
# self.voltage_max = kwargs['voltage_max'] # TODO: check if used # self.voltage_max = kwargs['voltage_max'] # TODO: check if used
self._activation_delay = kwargs['activation_delay'] self._activation_delay = kwargs['activation_delay']
...@@ -147,9 +97,9 @@ class Tx(TxAbstract): ...@@ -147,9 +97,9 @@ class Tx(TxAbstract):
self.gain = 2 / 3 self.gain = 2 / 3
# MCP23008 pins for LEDs # MCP23008 pins for LEDs
self.pin4 = self.mcp_board.get_pin(4) # TODO: Delete me? No LED on this version of the board # self.pin4 = self.mcp_board.get_pin(4) # TODO: Delete me? No LED on this version of the board
self.pin4.direction = Direction.OUTPUT # self.pin4.direction = Direction.OUTPUT
self.pin4.value = True # self.pin4.value = True
self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}')
...@@ -169,7 +119,7 @@ class Tx(TxAbstract): ...@@ -169,7 +119,7 @@ class Tx(TxAbstract):
def _adc_gain_auto(self): def _adc_gain_auto(self):
self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}')
gain = _gain_auto(AnalogIn(self._ads_current, ads.P0)) gain = _ads_1115_gain_auto(AnalogIn(self._ads_current, ads.P0))
self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}') self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}')
self.gain = gain self.gain = gain
self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}')
...@@ -293,8 +243,8 @@ class Rx(RxAbstract): ...@@ -293,8 +243,8 @@ class Rx(RxAbstract):
def _adc_gain_auto(self): def _adc_gain_auto(self):
self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}')
gain_0 = _gain_auto(AnalogIn(self._ads_voltage, ads.P0)) gain_0 = _ads_1115_gain_auto(AnalogIn(self._ads_voltage, ads.P0))
gain_2 = _gain_auto(AnalogIn(self._ads_voltage, ads.P2)) gain_2 = _ads_1115_gain_auto(AnalogIn(self._ads_voltage, ads.P2))
gain = np.min([gain_0, gain_2]) gain = np.min([gain_0, gain_2])
self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}') self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}')
self.gain = gain self.gain = gain
......
import datetime import datetime
import importlib
from ohmpi.config import HARDWARE_CONFIG # TODO: Remove references at config here -> move it in ohmpi_hardware as done for mux_2024
import adafruit_ads1x15.ads1115 as ads # noqa import adafruit_ads1x15.ads1115 as ads # noqa
from adafruit_ads1x15.analog_in import AnalogIn # noqa from adafruit_ads1x15.analog_in import AnalogIn # noqa
from adafruit_ads1x15.ads1x15 import Mode # noqa from adafruit_ads1x15.ads1x15 import Mode # noqa
from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa
from digitalio import Direction # noqa from digitalio import Direction # noqa
import minimalmodbus # noqa
from busio import I2C # noqa from busio import I2C # noqa
import time from ohmpi.hardware_components.mb_2023_0_X import Tx as Tx_mb_2023
import numpy as np from ohmpi.hardware_components.mb_2023_0_X import Rx as Rx_mb_2023
import os
from ohmpi.hardware_components import TxAbstract, RxAbstract
from ohmpi.utils import enforce_specs
# ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi')
# ctl_connection = HARDWARE_CONFIG['ctl'].pop('connection', 'i2c')
# ctl_module = importlib.import_module(f'ohmpi.hardware_components.{ctl_name}')
#
# TX_CONFIG = HARDWARE_CONFIG['tx']
# RX_CONFIG = HARDWARE_CONFIG['rx']
# hardware characteristics and limitations # hardware characteristics and limitations
# voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s # voltages are given in mV, currents in mA, sampling rates in Hz and data_rate in S/s
...@@ -27,6 +15,7 @@ SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, ...@@ -27,6 +15,7 @@ SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.},
'bias': {'min': -5000., 'default': 0., 'max': 5000.}, 'bias': {'min': -5000., 'default': 0., 'max': 5000.},
'coef_p2': {'default': 2.50}, 'coef_p2': {'default': 2.50},
'voltage_min': {'default': 10.0}, 'voltage_min': {'default': 10.0},
'vmn_hardware_offset' : {'default': 2500.},
}, },
'tx': {'adc_voltage_min': {'default': 10.}, # Minimum voltage value used in vmin strategy 'tx': {'adc_voltage_min': {'default': 10.}, # Minimum voltage value used in vmin strategy
'adc_voltage_max': {'default': 4500.}, # Maximum voltage on ads1115 used to measure current 'adc_voltage_max': {'default': 4500.}, # Maximum voltage on ads1115 used to measure current
...@@ -34,56 +23,13 @@ SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.}, ...@@ -34,56 +23,13 @@ SPECS = {'rx': {'sampling_rate': {'min': 2., 'default': 10., 'max': 100.},
'data_rate': {'default': 860.}, 'data_rate': {'default': 860.},
'compatible_power_sources': {'default': 'pwr_batt', 'others' : ['dps5005']}, 'compatible_power_sources': {'default': 'pwr_batt', 'others' : ['dps5005']},
'r_shunt': {'min': 0., 'default': 2. }, 'r_shunt': {'min': 0., 'default': 2. },
'activation_delay': {'default': 0.005}, # Max turn on time of 211EH relays = 5ms 'activation_delay': {'default': 0.010}, # Max turn on time of OMRON G5LE-1 5VDC relays
'release_delay': {'default': 0.001}, # Max turn off time of 211EH relays = 1ms 'release_delay': {'default': 0.005}, # Max turn off time of OMRON G5LE-1 5VDC relays = 1ms
}} }}
# TODO: move low_battery spec in pwr # TODO: move low_battery spec in pwr
#
# # hardware characteristics and limitations
# # *** RX ***
# # ADC for voltage
# voltage_adc_voltage_min = 10. # mV
# voltage_adc_voltage_max = 4500. # mV
# sampling_rate = 20. # Hz
# data_rate = 860. # S/s?
# rx_mcp_board_address = 0x27
# RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV
# RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV
# RX_CONFIG['sampling_rate'] = RX_CONFIG.pop('sampling_rate', sampling_rate)
# RX_CONFIG['data_rate'] = RX_CONFIG.pop('data_rate', data_rate)
# # RX_CONFIG['coef_p2'] = RX_CONFIG.pop('coef_p2', 2.5)
# RX_CONFIG['latency'] = RX_CONFIG.pop('latency', 0.01)
# RX_CONFIG['bias'] = RX_CONFIG.pop('bias', 0.)
# RX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address)
#
#
# # *** TX ***
# # ADC for current
# current_adc_voltage_min = 10. # mV
# current_adc_voltage_max = 4500. # mV
# low_battery = 12. # V (conventional value as it is not measured on this board)
# tx_mcp_board_address = 0x21 #
# # pwr_voltage_max = 12. # V
# # pwr_default_voltage = 12. # V
# # pwr_switch_on_warmup = 0. # seconds
#
# TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50),
# TX_CONFIG.pop('current_min', np.inf)]) # mA
# TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50),
# TX_CONFIG.pop('current_max', np.inf)]) # mA
# # TX_CONFIG['voltage_max'] = np.min([pwr_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V
# TX_CONFIG['voltage_max'] = TX_CONFIG.pop('voltage_max', np.inf) # V
# TX_CONFIG['voltage_min'] = -TX_CONFIG['voltage_max'] # V
# TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', np.inf), TX_CONFIG['voltage_max']]) # V
# # TX_CONFIG['pwr_switch_on_warm_up'] = TX_CONFIG.pop('pwr_switch_on_warmup', pwr_switch_on_warmup)
# TX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address)
# TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', low_battery)
# TX_CONFIG['latency'] = TX_CONFIG.pop('latency', 0.01)
# TX_CONFIG['bias'] = TX_CONFIG.pop('bias', 0.)
def _ads_1115_gain_auto(channel): # Make it a class method ?
def _gain_auto(channel):
"""Automatically sets the gain on a channel """Automatically sets the gain on a channel
Parameters Parameters
...@@ -109,246 +55,83 @@ def _gain_auto(channel): ...@@ -109,246 +55,83 @@ def _gain_auto(channel):
return gain return gain
class Tx(TxAbstract): class Tx(Tx_mb_2023):
def __init__(self, **kwargs): def __init__(self, **kwargs):
for key in SPECS['tx'].keys():
kwargs = enforce_specs(kwargs, SPECS['tx'], key)
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs) super().__init__(**kwargs)
assert isinstance(self.connection, I2C)
kwargs.update({'pwr': kwargs.pop('pwr', SPECS['tx']['compatible_power_sources']['default'])})
if (kwargs['pwr'] != SPECS['tx']['compatible_power_sources']['default']
and kwargs['pwr'] not in SPECS['tx']['compatible_power_sources']['other']):
self.exec_logger.warning(f'Incompatible power source specified check config')
assert kwargs['pwr'] in SPECS['tx']
# self.pwr = None # TODO: set a list of compatible power system with the tx
self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}')
# self.voltage_max = kwargs['voltage_max'] # TODO: check if used
self._activation_delay = kwargs['activation_delay']
self._release_delay = kwargs['release_delay']
self.voltage_adjustable = False
self.current_adjustable = False
# I2C connexion to MCP23008, for current injection # I2C connexion to MCP23008, for current injection
self.mcp_board = MCP23008(self.connection, address=0x21) self.mcp_board = MCP23008(self.connection, address=0x21)
# ADS1115 for current measurement (AB)
self._ads_current_address = 0x48
self._ads_current_data_rate = kwargs['data_rate']
self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain, data_rate=self._ads_current_data_rate,
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
self.r_shunt = kwargs['r_shunt']
self.adc_voltage_min = kwargs['adc_voltage_min']
self.adc_voltage_max = kwargs['adc_voltage_max']
# Relays for pulse polarity
self.pin0 = self.mcp_board.get_pin(0)
self.pin0.direction = Direction.OUTPUT
self.pin1 = self.mcp_board.get_pin(1)
self.pin1.direction = Direction.OUTPUT
self.polarity = 0
self.gain = 2 / 3
# Initialize LEDs # Initialize LEDs
self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run
self.pin4.direction = Direction.OUTPUT self.pin4.direction = Direction.OUTPUT
self.pin4.value = True self.pin4.value = True
self.pin6 = self.mcp_board.get_pin(6)
self._latency = kwargs.pop('latency', TX_CONFIG['latency']) self.pin6.direction = Direction.OUTPUT
self._bias = kwargs.pop('bias', TX_CONFIG['bias']) self.pin6.value = False
self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}')
@property
def gain(self):
return self._adc_gain
@gain.setter
def gain(self, value):
assert value in [2/3, 2, 4, 8, 16]
self._adc_gain = value
self._ads_current = ads.ADS1115(self.connection, gain=self.adc_gain,
data_rate=SPECS['tx']['data_rate']['default'],
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
self.exec_logger.debug(f'Setting TX ADC gain to {value}')
def _adc_gain_auto(self):
self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}')
gain = _gain_auto(AnalogIn(self._ads_current, ads.P0))
self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}')
self.gain = gain
self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}')
def current_pulse(self, **kwargs):
TxAbstract.current_pulse(self, **kwargs)
self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board')
@property
def current(self):
""" Gets the current IAB in Amps
"""
iab = AnalogIn(self._ads_current, ads.P0).voltage * 1000. / (50 * self.r_shunt) # measure current
self.exec_logger.debug(f'Reading TX current: {iab} mA')
return iab
@ current.setter
def current(self, value):
assert self.adc_voltage_min / (50 * self.r_shunt) <= value <= self.adc_voltage_max / (50 * self.r_shunt)
self.exec_logger.warning(f'Current pulse is not implemented for the {self.board_name} board')
def gain_auto(self):
self._adc_gain_auto()
def inject(self, polarity=1, injection_duration=None): def inject(self, polarity=1, injection_duration=None):
self.polarity = polarity self.polarity = polarity
TxAbstract.inject(self, polarity=polarity, injection_duration=injection_duration) # add leds?
self.pin6.value=True
@property Tx_mb_2023.inject(self, polarity=polarity, injection_duration=injection_duration)
def polarity(self): self.pin6.value = False
return self._polarity
@polarity.setter
def polarity(self, polarity):
assert polarity in [-1, 0, 1]
self._polarity = polarity
if polarity == 1:
self.pin0.value = True
self.pin1.value = False
time.sleep(self._activation_delay) # Max turn on time of 211EH relays = 5ms
elif polarity == -1:
self.pin0.value = False
self.pin1.value = True
time.sleep(self._activation_delay) # Max turn on time of 211EH relays = 5ms
else:
self.pin0.value = False
self.pin1.value = False
time.sleep(self._release_delay) # Max turn off time of 211EH relays = 1ms
def turn_off(self):
self.pwr.turn_off(self)
def turn_on(self):
self.pwr.turn_on(self)
@property
def tx_bat(self):
self.soh_logger.warning(f'Cannot get battery voltage on {self.board_name}')
self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.')
return self.pwr.voltage
def voltage_pulse(self, voltage=None, length=None, polarity=1):
""" Generates a square voltage pulse
Parameters
----------
voltage: float, optional
Voltage to apply in volts, tx_v_def is applied if omitted.
length: float, optional
Length of the pulse in seconds
polarity: 1,0,-1
Polarity of the pulse
"""
self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tbegin\t{datetime.datetime.utcnow()}')
# self.exec_logger.info(f'injection_duration: {length}') # TODO: delete me
if length is None:
length = self.injection_duration
if voltage is not None:
self.pwr.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity*self.pwr.voltage:.3f} V for {length:.3f} s')
self.inject(polarity=polarity, injection_duration=length)
self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tend\t{datetime.datetime.utcnow()}')
class Rx(RxAbstract): class Rx(Rx_mb_2023):
def __init__(self, **kwargs): def __init__(self, **kwargs):
for key in SPECS['rx'].keys():
kwargs = enforce_specs(kwargs, SPECS['rx'], key)
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs) super().__init__(**kwargs)
assert isinstance(self.connection, I2C) # I2C connexion to MCP23008, for current injection
self.exec_logger.event(f'{self.board_name}\trx_init\tbegin\t{datetime.datetime.utcnow()}')
# I2C connexion to MCP23008, for DG411
self.mcp_board = MCP23008(self.connection, address=0x27) self.mcp_board = MCP23008(self.connection, address=0x27)
# ADS1115 for voltage measurement (MN) # ADS1115 for voltage measurement (MN)
self._ads_voltage_address = 0x49 self._coef_p2 = 1.
self._adc_gain = 2/3 # Define default DG411 gain
self._ads_voltage = ads.ADS1115(self.connection, gain=self._adc_gain, self._dg411_gain = 1/2
data_rate=SPECS['rx']['data_rate']['default'], # Define pins for DG411
address=self._ads_voltage_address)
self._ads_voltage.mode = Mode.CONTINUOUS
self._coef_p2 = kwargs['coef_p2']
# self._voltage_max = kwargs['voltage_max']
self._sampling_rate = kwargs['sampling_rate']
self._bias = kwargs['bias']
self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}')
self.pin_DG0 = self.mcp_board.get_pin(0) self.pin_DG0 = self.mcp_board.get_pin(0)
self.pin_DG0.direction = Direction.OUTPUT self.pin_DG0.direction = Direction.OUTPUT
self.pin_DG1 = self.mcp_board.get_pin(1) self.pin_DG1 = self.mcp_board.get_pin(1)
self.pin_DG1.direction = Direction.OUTPUT self.pin_DG1.direction = Direction.OUTPUT
self.pin_DG2 = self.mcp_board.get_pin(2) self.pin_DG2 = self.mcp_board.get_pin(2)
self.pin_DG2.direction = Direction.OUTPUT self.pin_DG2.direction = Direction.OUTPUT
# TODO: try to only log this event and not the one created by super()
self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}')
self.pin_DG0.value = True # open self.pin_DG0.value = True # open
self.pin_DG1.value = True # open gain 1 inactive self.pin_DG1.value = True # open gain 1 inactive
self.pin_DG2.value = False # close gain 0.5 active self.pin_DG2.value = False # close gain 0.5 active
self._voltage_gain = 0.5 self.gain = 1/3
def _dg411_gain_auto(self):
u = ((AnalogIn(self.ads_voltage, ads.P0).voltage * 1000) - self._vmn_hardware_offset) / self.voltage_gain
if u < self._vmn_hardware_offset :
self.voltage_gain = 1
else:
self.voltage_gain = 0.5
@property @property
def gain(self): def gain(self):
return self._adc_gain return self._adc_gain*self._dg411_gain
@gain.setter @gain.setter
def gain(self, value): def gain(self, value):
assert value in [2/3, 2, 4, 8, 16] assert value in [1/3, 2/3]
self._adc_gain = value self._dg411_gain = value / self._adc_gain
self._ads_voltage = ads.ADS1115(self.connection, gain=self.adc_gain, if self._dg411_gain == 1:
data_rate=SPECS['rx']['data_rate']['default'], self.pin_DG1.value = False # closed gain 1 active
address=self._ads_voltage_address) self.pin_DG2.value = True # open gain 0.5 inactive
self._ads_voltage.mode = Mode.CONTINUOUS elif self._dg411_gain == 1/2:
self.exec_logger.debug(f'Setting RX ADC gain to {value}') self.pin_DG1.value = True # closed gain 1 active
self.pin_DG2.value = False # open gain 0.5 inactive
def _adc_gain_auto(self):
self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}')
gain_0 = _gain_auto(AnalogIn(self._ads_voltage, ads.P0))
gain_2 = _gain_auto(AnalogIn(self._ads_voltage, ads.P2))
gain = np.min([gain_0, gain_2])
self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}')
self.gain = gain
self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}')
def gain_auto(self): def gain_auto(self):
self._adc_gain_auto() self._dg411_gain_auto()
@property @property
def voltage(self): def voltage(self):
""" Gets the voltage VMN in Volts """ Gets the voltage VMN in Volts
""" """
self.exec_logger.event(f'{self.board_name}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.board_name}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}')
u = -AnalogIn(self._ads_voltage, ads.P0, ads.P1).voltage * self._coef_p2 * 1000. - self._bias # TODO: check if it should be negated u = (AnalogIn(self._ads_voltage, ads.P0).voltage * self._coef_p2 * 1000. - self._vmn_hardware_offset) / self._dg411_gain - self._bias # TODO: check how to handle bias and _vmn_hardware_offset
self.exec_logger.event(f'{self.board_name}\trx_voltage\tend\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.board_name}\trx_voltage\tend\t{datetime.datetime.utcnow()}')
return u return u
@property
def voltage_gain(self):
return self._voltage_gain
@voltage_gain.setter
def voltage_gain(self,value):
assert value in [0.5, 1]
self._voltage_gain = value
if self._voltage_gain == 1:
self.pin_DG1.value = False # closed gain 1 active
self.pin_DG2.value = True # open gain 0.5 inactive
elif self._voltage_gain == 0.5:
self.pin_DG1.value = True # closed gain 1 active
self.pin_DG2.value = False # open gain 0.5 inactive
def voltage_gain_auto(self):
u = ((AnalogIn(self.ads_voltage, ads.P0).voltage * 1000) - self.vmn_hardware_offset) / self.voltage_gain
if abs(vmn1) < 2500 and abs(vmn2) < 2500: ###TODO change voltage gain auto logic
self.voltage_gain = 1
else:
self.voltage_gain = 0.5
import time
from ohmpi.config import HARDWARE_CONFIG
import os import os
import numpy as np import numpy as np
from ohmpi.hardware_components import MuxAbstract from ohmpi.hardware_components import MuxAbstract
import adafruit_tca9548a # noqa import adafruit_tca9548a # noqa
from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa
from digitalio import Direction # noqa from digitalio import Direction # noqa
from busio import I2C # noqa
# hardware characteristics and limitations # hardware characteristics and limitations
SPECS = {'voltage_max': 50., 'current_max': 3., 'activation_delay': 0.01, 'release_delay': 0.005} SPECS = {'voltage_max': 50., 'current_max': 3., 'activation_delay': 0.01, 'release_delay': 0.005}
......
...@@ -295,7 +295,7 @@ class OhmPiHardware: ...@@ -295,7 +295,7 @@ class OhmPiHardware:
sp = np.mean(mean_vmn[np.ix_(polarity == 1)] + mean_vmn[np.ix_(polarity == -1)]) / 2 sp = np.mean(mean_vmn[np.ix_(polarity == 1)] + mean_vmn[np.ix_(polarity == -1)]) / 2
return sp return sp
def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5, def _compute_tx_volt(self, pulse_duration=0.1, strategy='vmax', tx_volt=5.,
vab_max=voltage_max, vmn_min=voltage_min): vab_max=voltage_max, vmn_min=voltage_min):
"""Estimates best Tx voltage based on different strategies. """Estimates best Tx voltage based on different strategies.
At first a half-cycle is made for a short duration with a fixed At first a half-cycle is made for a short duration with a fixed
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment