Commit bff7e26d authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Modifies the way a controller is used by defining several connections and...

Modifies the way a controller is used by defining several connections and adding an io to components
Showing with 277 additions and 118 deletions
+277 -118
......@@ -18,7 +18,7 @@ OHMPI_CONFIG = {
}
HARDWARE_CONFIG = {
'ctl': {'model': 'raspberry_pi_i2c'},
'ctl': {'model': 'raspberry_pi', 'connection': 'i2c'},
'pwr': {'model': 'pwr_batt', 'voltage': 12.},
'tx': {'model': 'ohmpi_card_3_15',
'mcp_board_address': 0x20,
......
......@@ -40,9 +40,9 @@ HARDWARE_CONFIG = {
{'model' : 'mux_2024_rev_0_0', # 'ohmpi_i2c_mux64_v1.01',
'tca_address': None,
'tca_channel': 0,
'mcp_0' : '0x22', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...
'mcp_1' : '0x23', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...)
'roles' : {'M': 'X', 'N': 'Y'},
'mcp_0': '0x22', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...
'mcp_1': '0x23', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...)
'roles': {'M': 'X', 'N': 'Y'},
'voltage_max': 12.
}},
'default': {'voltage_max': 100., 'current_max': 3.}}
......
import logging
from ohmpi.utils import get_platform
from paho.mqtt.client import MQTTv31
_, on_pi = get_platform()
# DEFINE THE ID OF YOUR OhmPi
ohmpi_id = '0001' if on_pi else 'XXXX'
# DEFINE YOUR MQTT BROKER (DEFAULT: 'localhost')
mqtt_broker = 'localhost' if on_pi else 'NAME_YOUR_BROKER_WHEN_IN_SIMULATION_MODE_HERE'
# DEFINE THE SUFFIX TO ADD TO YOUR LOGS FILES
logging_suffix = ''
# OhmPi configuration
OHMPI_CONFIG = {
'id': ohmpi_id, # Unique identifier of the OhmPi board (string)
'settings': 'ohmpi_settings.json', # INSERT YOUR FAVORITE SETTINGS FILE HERE
}
HARDWARE_CONFIG = {
'ctl': {'model': 'raspberry_pi_i2c'},
'pwr': {'model': 'pwr_batt', 'voltage': 12.},
'tx': {'model': 'mb_2024_rev_0_2',
'voltage_max': 50., # Maximum voltage supported by the TX board [V]
'current_max': 4800 / 50 / 2, # Maximum current supported by the TX board [mA]
'r_shunt': 2 # Shunt resistance in Ohms
},
'rx': {'model': 'mb_2024_rev_0_2',
'coef_p2': 2.50, # slope for conversion for ADS, measurement in V/V
'latency': 0.010, # latency in seconds in continuous mode
'sampling_rate': 50 # number of samples per second
}
}
# SET THE LOGGING LEVELS, MQTT BROKERS AND MQTT OPTIONS ACCORDING TO YOUR NEEDS
# Execution logging configuration
EXEC_LOGGING_CONFIG = {
'logging_level': logging.INFO,
'log_file_logging_level': logging.DEBUG,
'logging_to_console': True,
'file_name': f'exec{logging_suffix}.log',
'max_bytes': 262144,
'backup_count': 30,
'when': 'd',
'interval': 1
}
# Data logging configuration
DATA_LOGGING_CONFIG = {
'logging_level': logging.INFO,
'logging_to_console': True,
'file_name': f'data{logging_suffix}.log',
'max_bytes': 16777216,
'backup_count': 1024,
'when': 'd',
'interval': 1
}
# State of Health logging configuration (For a future release)
SOH_LOGGING_CONFIG = {
'logging_level': logging.INFO,
'logging_to_console': True,
'log_file_logging_level': logging.DEBUG,
'file_name': f'soh{logging_suffix}.log',
'max_bytes': 16777216,
'backup_count': 1024,
'when': 'd',
'interval': 1
}
# MQTT logging configuration parameters
MQTT_LOGGING_CONFIG = {
'hostname': mqtt_broker,
'port': 1883,
'qos': 2,
'retain': False,
'keepalive': 60,
'will': None,
'auth': {'username': 'mqtt_user', 'password': 'mqtt_password'},
'tls': None,
'protocol': MQTTv31,
'transport': 'tcp',
'client_id': f'{OHMPI_CONFIG["id"]}',
'exec_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/exec',
'exec_logging_level': logging.DEBUG,
'data_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/data',
'data_logging_level': DATA_LOGGING_CONFIG['logging_level'],
'soh_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/soh',
'soh_logging_level': SOH_LOGGING_CONFIG['logging_level']
}
# MQTT control configuration parameters
MQTT_CONTROL_CONFIG = {
'hostname': mqtt_broker,
'port': 1883,
'qos': 2,
'retain': False,
'keepalive': 60,
'will': None,
'auth': {'username': 'mqtt_user', 'password': 'mqtt_password'},
'tls': None,
'protocol': MQTTv31,
'transport': 'tcp',
'client_id': f'{OHMPI_CONFIG["id"]}',
'ctrl_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/ctrl'
}
......@@ -9,7 +9,7 @@ from threading import Event, Barrier, BrokenBarrierError
class CtlAbstract(ABC):
def __init__(self, **kwargs):
self.board_name = kwargs.pop('board_name', 'unknown CTL hardware')
self.bus = None # TODO: allow for several buses
self.connections = None # TODO: allow for several buses
self.exec_logger = kwargs.pop('exec_logger', None)
if self.exec_logger is None:
self.exec_logger = create_stdout_logger('exec_ctl')
......@@ -55,7 +55,7 @@ class PwrAbstract(ABC):
self._current_max = kwargs.pop('current_max', 0.)
self._voltage_min = kwargs.pop('voltage_min', 0.)
self._voltage_max = kwargs.pop('voltage_max', 0.)
self.ctl = kwargs.pop('ctl', None)
self.io = kwargs.pop('io', None)
@property
@abstractmethod
......@@ -110,7 +110,7 @@ class MuxAbstract(ABC):
if self.board_id is None:
self.exec_logger.error(f'MUX {self.board_name} should have an id !')
self.exec_logger.debug(f'MUX {self.board_id} ({self.board_name}) initialization')
self.ctl = kwargs.pop('ctl', None)
self.io = kwargs.pop('io', None)
cabling = kwargs.pop('cabling', None)
self.cabling = {}
if cabling is not None:
......@@ -254,7 +254,7 @@ class TxAbstract(ABC):
self.soh_logger = kwargs.pop('soh_logger', None)
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_tx')
self.ctl = kwargs.pop('ctl', None)
self.io = kwargs.pop('io', None)
self.pwr = kwargs.pop('pwr', None)
self._polarity = 0
self._injection_duration = None
......@@ -371,7 +371,7 @@ class RxAbstract(ABC):
self.soh_logger = kwargs.pop('soh_logger', None)
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_rx')
self.ctl = kwargs.pop('ctl', None)
self.io = kwargs.pop('io', None)
self.board_name = kwargs.pop('board_name', 'unknown RX hardware')
self._sampling_rate = kwargs.pop('sampling_rate', 1) # ms
self.exec_logger.debug(f'{self.board_name} RX initialization')
......
import datetime
import importlib
from ohmpi.config import HARDWARE_CONFIG
import adafruit_ads1x15.ads1115 as ads # noqa
from adafruit_ads1x15.analog_in import AnalogIn # noqa
from adafruit_ads1x15.ads1x15 import Mode # noqa
from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa
from digitalio import Direction # noqa
import minimalmodbus # noqa
......@@ -9,7 +11,8 @@ import time
import numpy as np
import os
from ohmpi.hardware_components import TxAbstract, RxAbstract
ctl_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["hardware"]["ctl"]["model"]}')
ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi_i2c')
ctl_module = importlib.import_module(f'ohmpi.hardware_components.{ctl_name}')
TX_CONFIG = HARDWARE_CONFIG['tx']
RX_CONFIG = HARDWARE_CONFIG['rx']
......@@ -19,27 +22,40 @@ RX_CONFIG = HARDWARE_CONFIG['rx']
# ADC for voltage
voltage_adc_voltage_min = 10. # mV
voltage_adc_voltage_max = 4500. # mV
sampling_rate = 20. # Hz
data_rate = 860. # S/s?
RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV
RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV
RX_CONFIG['sampling_rate'] = RX_CONFIG.pop('sampling_rate', sampling_rate)
RX_CONFIG['data_rate'] = RX_CONFIG.pop('data_rate', data_rate)
# RX_CONFIG['coef_p2'] = RX_CONFIG.pop('coef_p2', 2.5)
RX_CONFIG['latency'] = RX_CONFIG.pop('latency', 0.01)
RX_CONFIG['bias'] = RX_CONFIG.pop('bias', 0.)
# *** TX ***
# ADC for current
current_adc_voltage_min = 10. # mV
current_adc_voltage_max = 4500. # mV
# DPS
dps_voltage_max = 50. # V
dps_default_voltage = 5. # V
dps_switch_on_warmup = 4. # seconds
tx_low_battery = 12. # V
TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop('current_min', np.inf)]) # mA
TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop('current_max', np.inf)]) # mA
TX_CONFIG['voltage_max'] = np.min([dps_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V
TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', dps_default_voltage), TX_CONFIG['voltage_max']]) # V
TX_CONFIG['dps_switch_on_warm_up'] = TX_CONFIG.pop('dps_switch_on_warmup', dps_switch_on_warmup)
TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', tx_low_battery)
current_adc_voltage_max = 4500. # mV
low_battery = 12. # V (conventional value as it is not measured on this board)
tx_mcp_board_address = 0x21 #
# pwr_voltage_max = 12. # V
# pwr_default_voltage = 12. # V
# pwr_switch_on_warmup = 0. # seconds
TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50),
TX_CONFIG.pop('current_min', np.inf)]) # mA
TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50),
TX_CONFIG.pop('current_max', np.inf)]) # mA
# TX_CONFIG['voltage_max'] = np.min([pwr_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V
TX_CONFIG['voltage_max'] = TX_CONFIG.pop('voltage_max', np.inf) # V
TX_CONFIG['voltage_min'] = -TX_CONFIG['voltage_max'] # V
TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', np.inf), TX_CONFIG['voltage_max']]) # V
# TX_CONFIG['pwr_switch_on_warm_up'] = TX_CONFIG.pop('pwr_switch_on_warmup', pwr_switch_on_warmup)
TX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address)
TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', low_battery)
TX_CONFIG['latency'] = TX_CONFIG.pop('latency', 0.01)
TX_CONFIG['bias'] = TX_CONFIG.pop('bias', 0.)
def _gain_auto(channel):
......@@ -57,31 +73,37 @@ def _gain_auto(channel):
"""
gain = 2 / 3
if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.0):
if (abs(channel.voltage) < 2.048) and (abs(channel.voltage) >= 1.024):
gain = 2
elif (abs(channel.voltage) < 1.0) and (abs(channel.voltage) >= 0.500):
elif (abs(channel.voltage) < 1.024) and (abs(channel.voltage) >= 0.512):
gain = 4
elif (abs(channel.voltage) < 0.500) and (abs(channel.voltage) >= 0.250):
elif (abs(channel.voltage) < 0.512) and (abs(channel.voltage) >= 0.256):
gain = 8
elif abs(channel.voltage) < 0.250:
elif abs(channel.voltage) < 0.256:
gain = 16
return gain
class Tx(TxAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs)
self.exec_logger.event(f'{self.board_name}\ttx_init\tbegin\t{datetime.datetime.utcnow()}')
self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage'])
self.ctl = kwargs.pop('controller', ctl_module.Ctl())
self.voltage_adjustable = False
self.current_adjustable = False
if self.ctl is None:
self.ctl = ctl_module.Ctl()
# elif isinstance(self.ctl, dict):
# self.ctl = ctl_module.Ctl(**self.ctl)
# I2C connexion to MCP23008, for current injection
self.mcp_board = MCP23008(self.ctl.bus, address=TX_CONFIG['mcp_board_address'])
# ADS1115 for current measurement (AB)
self._adc_gain = 2/3
self._ads_current_address = 0x48
self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860,
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
# Relays for pulse polarity
self.pin0 = self.mcp_board.get_pin(0)
......@@ -89,31 +111,18 @@ class Tx(TxAbstract):
self.pin1 = self.mcp_board.get_pin(1)
self.pin1.direction = Direction.OUTPUT
self.polarity = 0
self.adc_gain = 2 / 3
# DPH 5005 Digital Power Supply
self.pin2 = self.mcp_board.get_pin(2) # dps +
self.pin2.direction = Direction.OUTPUT
self.pin3 = self.mcp_board.get_pin(3) # dps -
self.pin3.direction = Direction.OUTPUT
self.turn_on()
time.sleep(TX_CONFIG['dps_switch_on_warm_up'])
self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal)
self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc
self.DPS.serial.bytesize = 8 #
self.DPS.serial.timeout = 1. # greater than 0.5 for it to work
self.DPS.debug = False #
self.DPS.serial.parity = 'N' # No parity
self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode
self.DPS.write_register(0x0001, 1000, 0) # max current allowed (100 mA for relays) :
# (last number) 0 is for mA, 3 is for A
self.pwr = None # TODO: set a list of compatible power system with the tx
# I2C connexion to MCP23008, for current injection
# Initialize LEDs
self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run
self.pin4.direction = Direction.OUTPUT
self.pin4.value = True
self.exec_logger.info(f'TX battery: {self.tx_bat:.1f} V')
self.turn_off()
self._latency = kwargs.pop('latency', TX_CONFIG['latency'])
self._bias = kwargs.pop('bias', TX_CONFIG['bias'])
self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}')
@property
def adc_gain(self):
......@@ -125,12 +134,15 @@ class Tx(TxAbstract):
self._adc_gain = value
self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860,
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
self.exec_logger.debug(f'Setting TX ADC gain to {value}')
def adc_gain_auto(self):
self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}')
gain = _gain_auto(AnalogIn(self._ads_current, ads.P0))
self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}')
self.adc_gain = gain
self.exec_logger.event(f'{self.board_name}\ttx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}')
def current_pulse(self, **kwargs):
TxAbstract.current_pulse(self, **kwargs)
......@@ -150,65 +162,43 @@ class Tx(TxAbstract):
self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board')
def inject(self, polarity=1, injection_duration=None):
self.polarity = polarity
TxAbstract.inject(self, polarity=polarity, injection_duration=injection_duration)
# move this part in DPS5005
# if state=='on':
# self.DPS.write_register(0x09, 1) # DPS5005 on
# else:
# self.DPS.write_register(0x09, 0) # DPS5005 off
@property
def polarity(self):
return TxAbstract.polarity.fget(self)
return self._polarity
@polarity.setter
def polarity(self, value):
TxAbstract.polarity.fset(self, value)
if value==1:
def polarity(self, polarity):
assert polarity in [-1, 0, 1]
self._polarity = polarity
if polarity == 1:
self.pin0.value = True
self.pin1.value = False
elif value==-1:
time.sleep(0.005) # Max turn on time of 211EH relays = 5ms
elif polarity == -1:
self.pin0.value = False
self.pin1.value = True
time.sleep(0.005) # Max turn on time of 211EH relays = 5ms
else:
self.pin0.value = False
self.pin1.value = False
#time.sleep(0.001) # TODO: check max switching time of relays
@property
def voltage(self):
return self._voltage
@voltage.setter
def voltage(self, value):
if value > TX_CONFIG['voltage_max']:
self.exec_logger.warning(f'Sorry, cannot inject more than {TX_CONFIG["voltage_max"]} V, '
f'set it back to {TX_CONFIG["default_voltage"]} V (default value).')
value = TX_CONFIG['default_voltage']
if value < 0.:
self.exec_logger.warning(f'Voltage should be given as a positive number. '
f'Set polarity to -1 to reverse voltage...')
value = np.abs(value)
self.DPS.write_register(0x0000, value, 2)
time.sleep(0.001) # Max turn off time of 211EH relays = 1ms
def turn_off(self):
TxAbstract.turn_off(self)
self.pin2.value = False
self.pin3.value = False
self.pwr.turn_off(self)
def turn_on(self):
TxAbstract.turn_on(self)
self.pin2.value = True
self.pin3.value = True
self.pwr.turn_on(self)
@property
def tx_bat(self):
tx_bat = self.DPS.read_register(0x05, 2)
if tx_bat < TX_CONFIG['low_battery']:
self.soh_logger.warning(f'Low TX Battery: {tx_bat:.1f} V')
return tx_bat
self.soh_logger.warning(f'Cannot get battery voltage on {self.board_name}')
self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.')
return TX_CONFIG['low_battery']
def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=None):
def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=1):
""" Generates a square voltage pulse
Parameters
......@@ -220,28 +210,38 @@ class Tx(TxAbstract):
polarity: 1,0,-1
Polarity of the pulse
"""
self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tbegin\t{datetime.datetime.utcnow()}')
# self.exec_logger.info(f'injection_duration: {length}') # TODO: delete me
if length is None:
length = self.injection_duration
if polarity is None:
polarity = self.polarity
self.polarity = polarity
self.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity*voltage:.3f} V for {length:.3f} s')
self.inject(state='on')
time.sleep(length)
self.inject(state='off')
self.pwr.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity*self.pwr.voltage:.3f} V for {length:.3f} s')
self.inject(polarity=polarity, injection_duration=length)
self.exec_logger.event(f'{self.board_name}\ttx_voltage_pulse\tend\t{datetime.datetime.utcnow()}')
class Rx(RxAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs)
self.ctl = kwargs.pop('controller', ctl_module.Ctl())
self.exec_logger.event(f'{self.board_name}\trx_init\tbegin\t{datetime.datetime.utcnow()}')
if self.ctl is None:
self.ctl = ctl_module.Ctl()
# I2C connexion to MCP23008, for DG411
self.mcp_board = MCP23008(self.ctl.bus, address=RX_CONFIG['mcp_board_address'])
# ADS1115 for voltage measurement (MN)
self._ads_voltage_address = 0x49
self._adc_gain = 2/3
self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self._adc_gain, data_rate=860, address=self._ads_voltage_address)
self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self._adc_gain, data_rate=860,
address=self._ads_voltage_address)
self._ads_voltage.mode = Mode.CONTINUOUS
self._coef_p2 = kwargs.pop('coef_p2', RX_CONFIG['coef_p2'])
self._voltage_max = kwargs.pop('voltage_max', RX_CONFIG['voltage_max'])
self._sampling_rate = kwargs.pop('sampling_rate', sampling_rate)
self._latency = kwargs.pop('latency', RX_CONFIG['latency'])
self._bias = kwargs.pop('bias', RX_CONFIG['bias'])
self.exec_logger.event(f'{self.board_name}\trx_init\tend\t{datetime.datetime.utcnow()}')
@property
def adc_gain(self):
......@@ -253,21 +253,23 @@ class Rx(RxAbstract):
self._adc_gain = value
self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860,
address=self._ads_voltage_address)
self._ads_voltage.mode = Mode.CONTINUOUS
self.exec_logger.debug(f'Setting RX ADC gain to {value}')
def adc_gain_auto(self):
self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tbegin\t{datetime.datetime.utcnow()}')
gain_0 = _gain_auto(AnalogIn(self._ads_voltage, ads.P0))
gain_2 = _gain_auto(AnalogIn(self._ads_voltage, ads.P2))
gain = np.min([gain_0, gain_2])
self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}')
self.adc_gain = gain
self.exec_logger.event(f'{self.board_name}\trx_adc_auto_gain\tend\t{datetime.datetime.utcnow()}')
@property
def voltage(self):
""" Gets the voltage VMN in Volts
"""
u0 = AnalogIn(self._ads_voltage, ads.P0).voltage * 1000.
u2 = AnalogIn(self._ads_voltage, ads.P2).voltage * 1000.
u = np.max([u0,u2]) * (np.heaviside(u0-u2, 1.) * 2 - 1.) # gets the max between u0 & u2 and set the sign
self.exec_logger.debug(f'Reading voltages {u0} V and {u2} V on RX. Returning {u} V')
return u
\ No newline at end of file
self.exec_logger.event(f'{self.board_name}\trx_voltage\tbegin\t{datetime.datetime.utcnow()}')
u = -AnalogIn(self._ads_voltage, ads.P0, ads.P1).voltage * self._coef_p2 * 1000. - self._bias # TODO: check if it should be negated
self.exec_logger.event(f'{self.board_name}\trx_voltage\tend\t{datetime.datetime.utcnow()}')
return u
......@@ -71,7 +71,7 @@ class Mux(MuxAbstract):
else:
self.exec_logger.error(f'Invalid role assignment for {self.board_name}: {self._roles} !')
self._mode = ''
self._tca = [adafruit_tca9548a.TCA9548A(self.ctl.bus, tca_address)[i] for i in np.arange(7,3,-1)]
self._tca = [adafruit_tca9548a.TCA9548A(self.ctl.bus, tca_address)[i] for i in np.arange(7, 3, -1)]
# self._mcp_addresses = (kwargs.pop('mcp', '0x20')) # TODO: add assert on valid addresses..
self._mcp = [None, None, None, None]
self.reset()
......
......@@ -6,12 +6,12 @@ from adafruit_ads1x15.analog_in import AnalogIn # noqa
from adafruit_ads1x15.ads1x15 import Mode # noqa
from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa
from digitalio import Direction # noqa
import minimalmodbus # noqa
import time
import numpy as np
import os
from ohmpi.hardware_components import TxAbstract, RxAbstract
ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi_i2c')
ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi')
ctl_connection = HARDWARE_CONFIG['ctl'].pop('connection', 'i2c')
ctl_module = importlib.import_module(f'ohmpi.hardware_components.{ctl_name}')
TX_CONFIG = HARDWARE_CONFIG['tx']
......@@ -96,12 +96,13 @@ class Tx(TxAbstract):
self.ctl = ctl_module.Ctl()
# elif isinstance(self.ctl, dict):
# self.ctl = ctl_module.Ctl(**self.ctl)
self.io = self.ctl[kwargs.pop('connection', ctl_connection)]
# I2C connexion to MCP23008, for current injection
self.mcp_board = MCP23008(self.ctl.bus, address=TX_CONFIG['mcp_board_address'])
self.mcp_board = MCP23008(self.io, address=TX_CONFIG['mcp_board_address'])
# ADS1115 for current measurement (AB)
self._ads_current_address = 0x48
self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860,
self._ads_current = ads.ADS1115(self.io, gain=self.adc_gain, data_rate=860,
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
......@@ -115,10 +116,11 @@ class Tx(TxAbstract):
self.pwr = None # TODO: set a list of compatible power system with the tx
# I2C connexion to MCP23008, for current injection
self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run
# MCP23008 pins for LEDs
self.pin4 = self.mcp_board.get_pin(4) # TODO: Delete me? No LED on this version of the board
self.pin4.direction = Direction.OUTPUT
self.pin4.value = True
self._latency = kwargs.pop('latency', TX_CONFIG['latency'])
self._bias = kwargs.pop('bias', TX_CONFIG['bias'])
self.exec_logger.event(f'{self.board_name}\ttx_init\tend\t{datetime.datetime.utcnow()}')
......@@ -131,7 +133,7 @@ class Tx(TxAbstract):
def adc_gain(self, value):
assert value in [2/3, 2, 4, 8, 16]
self._adc_gain = value
self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860,
self._ads_current = ads.ADS1115(self.io, gain=self.adc_gain, data_rate=860,
address=self._ads_current_address)
self._ads_current.mode = Mode.CONTINUOUS
self.exec_logger.debug(f'Setting TX ADC gain to {value}')
......@@ -226,10 +228,12 @@ class Rx(RxAbstract):
self.exec_logger.event(f'{self.board_name}\trx_init\tbegin\t{datetime.datetime.utcnow()}')
if self.ctl is None:
self.ctl = ctl_module.Ctl()
self.io = self.ctl[kwargs.pop('connection', ctl_connection)]
# ADS1115 for voltage measurement (MN)
self._ads_voltage_address = 0x49
self._adc_gain = 2/3
self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self._adc_gain, data_rate=860,
self._ads_voltage = ads.ADS1115(self.io, gain=self._adc_gain, data_rate=860,
address=self._ads_voltage_address)
self._ads_voltage.mode = Mode.CONTINUOUS
self._coef_p2 = kwargs.pop('coef_p2', RX_CONFIG['coef_p2'])
......@@ -247,7 +251,7 @@ class Rx(RxAbstract):
def adc_gain(self, value):
assert value in [2/3, 2, 4, 8, 16]
self._adc_gain = value
self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860,
self._ads_voltage = ads.ADS1115(self.io, gain=self.adc_gain, data_rate=860,
address=self._ads_voltage_address)
self._ads_voltage.mode = Mode.CONTINUOUS
self.exec_logger.debug(f'Setting RX ADC gain to {value}')
......
from ohmpi.hardware_components import CtlAbstract
import board # noqa
import busio # noqa
from adafruit_extended_bus import ExtendedI2C # noqa
import minimalmodbus # noqa
import os
from ohmpi.utils import get_platform
from gpiozero import CPUTemperature # noqa
class Ctl(CtlAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
modbus_baudrate = kwargs.pop('modbus_baudrate', 9600)
modbus_bitesize = kwargs.pop('modbus_bitesize', 8)
modbus_timeout = kwargs.pop('modbus_timeout', 1)
modbus_debug = kwargs.pop('modbus_debug', False)
modbus_parity = kwargs.pop('modbus_parity', 'N')
modbus_mode = kwargs.pop('modbus_mode', minimalmodbus.MODE_RTU)
modbus_port = kwargs.pop('modbus_port', '/dev/ttyUSB0')
modbus_slave_address = kwargs.pop('modbus_slave_address', 1)
super().__init__(**kwargs)
self.connections = dict()
# I2C
self.connections['i2c'] = I2C(board.SCL, board.SDA) # noqa
# Extended I2C
self.connections['i2c_ext'] = ExtendedI2C(4) # 4 is defined
# modbus
self.connections['modbus'] = minimalmodbus.Instrument(port=modbus_port, slaveaddress=modbus_slave_address)
self.connections['modbus'].serial.baudrate = modbus_baudrate # Baud rate 9600 as listed in doc
self.connections['modbus'].serial.bytesize = modbus_bitesize #
self.connections['modbus'].serial.timeout = modbus_timeout # greater than 0.5 for it to work
self.connections['modbus'].debug = modbus_debug #
self.connections['modbus'].serial.parity = modbus_parity # No parity
self.connections['modbus'].mode = modbus_mode # RTU mode
platform, on_pi = get_platform()
assert on_pi
self.board_name = platform
self._cpu_temp_available = True
self.max_cpu_temp = 85. # °C
@property
def _cpu_temp(self):
return CPUTemperature().temperature
......@@ -8,7 +8,6 @@ from gpiozero import CPUTemperature # noqa
import minimalmodbus # noqa
class Ctl(CtlAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
......@@ -20,8 +19,6 @@ class Ctl(CtlAbstract):
mode = kwargs.pop('mode', minimalmodbus.MODE_RTU)
port = kwargs.pop('port', '/dev/ttyUSB0')
slave_address = kwargs.pop('slave_address', 1)
port = kwargs.pop('port', '/dev/ttyUSB0')
slave_address = kwargs.pop('slave_address', 1)
super().__init__(**kwargs)
self.bus = minimalmodbus.Instrument(port=port, slaveaddress=slave_address) # port name, address (decimal)
self.bus.serial.baudrate = baudrate # Baud rate 9600 as listed in doc
......
......@@ -413,6 +413,9 @@ class OhmPi(object):
def run_measurement(self, quad=None, nb_stack=None, injection_duration=None,
autogain=True, strategy='constant', tx_volt=5., best_tx_injtime=0.1,
cmd_id=None, **kwargs):
# TODO: add sampling_interval -> impact on _hw.rx.sampling_rate (store the current value, change the _hw.rx.sampling_rate, do the measurement, reset the sampling_rate to the previous value)
# TODO: default value of tx_volt and other parameters set to None should be given in config.py and used in function definition
# TODO: add rs_check option (or propose an other way to do this)
"""Measures on a quadrupole and returns transfer resistance.
Parameters
......@@ -669,6 +672,7 @@ class OhmPi(object):
# isolate electrodes that are responsible for high resistances (ex: AB high, AC low, BC high
# -> might be a problem at B (cf what we did with WofE)
def rs_check(self, tx_volt=12., cmd_id=None):
# TODO: add a default value for rs-check in config.py import it in ohmpi.py and add it in rs_check definition
"""Checks contact resistances
Parameters
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment