Commit 846dfb66 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Updates hardwares and adds a first version of ohmpi_card_3.15

Showing with 347 additions and 50 deletions
+347 -50
......@@ -50,8 +50,9 @@ HARDWARE_CONFIG = {
'controller': {'model' : 'dummy_controller'
},
'tx' : {'model' : 'dummy_tx',
'Imax': 4800 / 50 / 2, # Maximum current
'R_shunt': 2 # Shunt resistance in Ohms
'current_max': 4800 / 50 / 2, # Maximum current mA
'r_shunt': 2, # Shunt resistance in Ohms
'low_battery': 12. # Volts
},
'rx' : {'model': 'dummy_rx',
},
......
from abc import ABC, abstractmethod
from OhmPi.logging_setup import create_stdout_logger
import time
class ControllerAbstract(ABC):
def __init__(self, **kwargs):
......@@ -13,6 +14,8 @@ class TxAbstract(ABC):
def __init__(self, **kwargs):
self.board_name = kwargs.pop('board_name', 'unknown TX hardware')
polarity = kwargs.pop('polarity', 1)
if polarity is None:
polarity = 0
inj_time = kwargs.pop('inj_time', 1.)
self.exec_logger = kwargs.pop('exec_logger', None)
if self.exec_logger is None:
......@@ -24,6 +27,7 @@ class TxAbstract(ABC):
self._inj_time = None
self._dps_state = 'off'
self._adc_gain = 1.
print(f'polarity : {polarity}')
self.polarity = polarity
self.inj_time = inj_time
self.exec_logger.debug(f'{self.board_name} TX initialization')
......@@ -105,8 +109,8 @@ class TxAbstract(ABC):
def tx_bat(self):
pass
@abstractmethod
def voltage_pulse(self, voltage, length, polarity):
def voltage_pulse(self, voltage=0., length=None, polarity=None):
""" Generates a square voltage pulse
Parameters
......@@ -118,7 +122,17 @@ class TxAbstract(ABC):
polarity: 1,0,-1
Polarity of the pulse
"""
pass
if length is None:
length = self.inj_time
if polarity is None:
polarity = self.polarity
self.polarity = polarity
self.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity * voltage:.3f} V for {length:.3f} s')
self.inject(state='on')
time.sleep(length)
self.tx_sync.clear()
self.inject(state='off')
class RxAbstract(ABC):
......
......@@ -4,11 +4,13 @@ import os
from OhmPi.hardware import RxAbstract
RX_CONFIG = HARDWARE_CONFIG['rx']
# hardware limits
voltage_min = 10. # mV
voltage_max = 4500.
RX_CONFIG['voltage_min'] = voltage_min # mV
RX_CONFIG['voltage_max'] = voltage_max
# hardware characteristics and limitations
# ADC for voltage
voltage_adc_voltage_min = 10. # mV
voltage_adc_voltage_max = 4500.
RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV
RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV
class Rx(RxAbstract):
def __init__(self, **kwargs):
......@@ -25,8 +27,8 @@ class Rx(RxAbstract):
self.exec_logger.debug(f'Setting RX ADC gain to {value}')
def adc_gain_auto(self):
gain = 1
self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}')
gain = 1.
self.exec_logger.debug(f'Setting RX ADC gain automatically to {gain}')
self.adc_gain = gain
@property
......
......@@ -6,15 +6,22 @@ from OhmPi.hardware import TxAbstract
TX_CONFIG = HARDWARE_CONFIG['tx']
# hardware limits
voltage_min = 10. # mV
voltage_max = 4500.
TX_CONFIG['current_min'] = voltage_min / (TX_CONFIG['R_shunt'] * 50) # mA
TX_CONFIG['current_max'] = voltage_max / (TX_CONFIG['R_shunt'] * 50)
TX_CONFIG['default_voltage'] = 5. # V
TX_CONFIG['voltage_max'] = 50. # V
TX_CONFIG['dps_switch_on_warm_up'] = 4. # 4 seconds
# ADC for current
current_adc_voltage_min = 10. # mV
current_adc_voltage_max = 4500. # mV
# DPS
dps_voltage_max = 50. # V
dps_default_voltage = 5. # V
dps_switch_on_warmup = 4. # seconds
tx_low_battery = 12. # V
TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop('current_min', np.inf)]) # mA
TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop(['current_max'], np.inf)]) # mA
TX_CONFIG['voltage_max'] = np.min([dps_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V
TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', dps_default_voltage), TX_CONFIG['voltage_max']]) # V
TX_CONFIG['dps_switch_on_warm_up'] = TX_CONFIG.pop('dps_switch_on_warmup', dps_switch_on_warmup)
TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', tx_low_battery)
class Tx(TxAbstract):
def inject(self, state='on'):
......@@ -25,7 +32,7 @@ class Tx(TxAbstract):
super().__init__(**kwargs)
self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage'])
self._adc_gain = 1
self._adc_gain = 1.
self.polarity = 0
self.turn_on()
......@@ -33,6 +40,7 @@ class Tx(TxAbstract):
self.exec_logger.info(f'TX battery: {self.tx_bat:.1f} V')
self.turn_off()
@property
def adc_gain(self):
return self._adc_gain
......@@ -98,14 +106,9 @@ class Tx(TxAbstract):
polarity: 1,0,-1
Polarity of the pulse
"""
kwargs = locals()
kwargs.pop('self')
kwargs.pop('__class__')
print(kwargs)
super().voltage_pulse(**kwargs)
if length is None:
length = self.inj_time
if polarity is None:
polarity = self.polarity
self.polarity = polarity
self.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity*voltage:.3f} V for {length:.3f} s')
self.inject(state='on')
time.sleep(length)
self.inject(state='off')
......@@ -14,16 +14,32 @@ controller_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["control
TX_CONFIG = OHMPI_CONFIG['rx']
RX_CONFIG = OHMPI_CONFIG['tx']
# hardware limits
voltage_min = 10. # mV
voltage_max = 4500.
RX_CONFIG['voltage_min'] = voltage_min # mV
RX_CONFIG['voltage_max'] = voltage_max
TX_CONFIG['current_min'] = voltage_min / (TX_CONFIG['R_shunt'] * 50) # mA
TX_CONFIG['current_max'] = voltage_max / (TX_CONFIG['R_shunt'] * 50)
TX_CONFIG['default_voltage'] = 5. # V
TX_CONFIG['voltage_max'] = 50. # V
TX_CONFIG['dps_switch_on_warm_up'] = 4. # 4 seconds
# hardware characteristics and limitations
# *** RX ***
# ADC for voltage
voltage_adc_voltage_min = 10. # mV
voltage_adc_voltage_max = 4500.
RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV
RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV
# *** TX ***
# ADC for current
current_adc_voltage_min = 10. # mV
current_adc_voltage_max = 4500. # mV
# DPS
dps_voltage_max = 50. # V
dps_default_voltage = 5. # V
dps_switch_on_warmup = 4. # seconds
tx_low_battery = 12. # V
TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop('current_min', np.inf)]) # mA
TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop(['current_max'], np.inf)]) # mA
TX_CONFIG['voltage_max'] = np.min([dps_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V
TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', dps_default_voltage), TX_CONFIG['voltage_max']]) # V
TX_CONFIG['dps_switch_on_warm_up'] = TX_CONFIG.pop('dps_switch_on_warmup', dps_switch_on_warmup)
TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', tx_low_battery)
def _gain_auto(channel):
"""Automatically sets the gain on a channel
......@@ -184,8 +200,8 @@ class Tx(TxAbstract):
@property
def tx_bat(self):
tx_bat = self.DPS.read_register(0x05, 2)
if tx_bat < 12.:
self.soh_logger.debug(f'Low TX Battery: {tx_bat:.1f} V')
if tx_bat < TX_CONFIG['low_battery']:
self.soh_logger.warning(f'Low TX Battery: {tx_bat:.1f} V')
return tx_bat
def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=None):
......
import importlib
from OhmPi.config import OHMPI_CONFIG
import adafruit_ads1x15.ads1115 as ads # noqa
from adafruit_ads1x15.analog_in import AnalogIn # noqa
from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa
from digitalio import Direction # noqa
import minimalmodbus # noqa
import time
import numpy as np
import os
from OhmPi.hardware import TxAbstract, RxAbstract
controller_module = importlib.import_module(f'{OHMPI_CONFIG["hardware"]["controller"]["model"]}')
TX_CONFIG = OHMPI_CONFIG['rx']
RX_CONFIG = OHMPI_CONFIG['tx']
# hardware characteristics and limitations
# *** RX ***
# ADC for voltage
voltage_adc_voltage_min = 10. # mV
voltage_adc_voltage_max = 4500.
RX_CONFIG['voltage_min'] = np.min([voltage_adc_voltage_min, RX_CONFIG.pop('voltage_min', np.inf)]) # mV
RX_CONFIG['voltage_max'] = np.min([voltage_adc_voltage_max, RX_CONFIG.pop('voltage_max', np.inf)]) # mV
# *** TX ***
# ADC for current
current_adc_voltage_min = 10. # mV
current_adc_voltage_max = 4500. # mV
# DPS
dps_voltage_max = 12. # V
dps_default_voltage = 12. # V
dps_switch_on_warmup = 0. # seconds
TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop('current_min', np.inf)]) # mA
TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop(['current_max'], np.inf)]) # mA
TX_CONFIG['voltage_max'] = np.min([dps_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V
TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', dps_default_voltage), TX_CONFIG['voltage_max']]) # V
TX_CONFIG['dps_switch_on_warm_up'] = TX_CONFIG.pop('dps_switch_on_warmup', dps_switch_on_warmup)
def _gain_auto(channel):
"""Automatically sets the gain on a channel
Parameters
----------
channel : ads.ADS1x15
Instance of ADS where voltage is measured.
Returns
-------
gain : float
Gain to be applied on ADS1115.
"""
gain = 2 / 3
if (abs(channel.voltage) < 2.040) and (abs(channel.voltage) >= 1.0):
gain = 2
elif (abs(channel.voltage) < 1.0) and (abs(channel.voltage) >= 0.500):
gain = 4
elif (abs(channel.voltage) < 0.500) and (abs(channel.voltage) >= 0.250):
gain = 8
elif abs(channel.voltage) < 0.250:
gain = 16
return gain
class Tx(TxAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs)
self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage'])
self.controller = kwargs.pop('controller', controller_module.Controller())
# I2C connexion to MCP23008, for current injection
self.mcp_board = MCP23008(self.controller.bus, address=TX_CONFIG['mcp_board_address'])
# ADS1115 for current measurement (AB)
self._adc_gain = 2/3
self._ads_current_address = 0x48
self._ads_current = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860,
address=self._ads_current_address)
# Relays for pulse polarity
self.pin0 = self.mcp_board.get_pin(0)
self.pin0.direction = Direction.OUTPUT
self.pin1 = self.mcp_board.get_pin(1)
self.pin1.direction = Direction.OUTPUT
self.polarity = 0
# DPH 5005 Digital Power Supply
self.turn_on()
time.sleep(TX_CONFIG['dps_switch_on_warm_up'])
self.DPS = None
# I2C connexion to MCP23008, for current injection
self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run
self.pin4.direction = Direction.OUTPUT
self.pin4.value = True
self.exec_logger.info(f'TX battery: {self.tx_bat:.1f} V')
self.turn_off()
@property
def adc_gain(self):
return self._adc_gain
@adc_gain.setter
def adc_gain(self, value):
assert value in [2/3, 2, 4, 8, 16]
self._adc_gain = value
self._ads_current = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860,
address=self._ads_current_address)
self.exec_logger.debug(f'Setting TX ADC gain to {value}')
def adc_gain_auto(self):
gain = _gain_auto(AnalogIn(self._ads_current, ads.P0))
self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}')
self.adc_gain = gain
def current_pulse(self, **kwargs):
super().current_pulse(**kwargs)
self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board')
@property
def current(self):
""" Gets the current IAB in Amps
"""
return AnalogIn(self._ads_current, ads.P0).voltage * 1000. / (50 * TX_CONFIG['R_shunt']) # noqa measure current
@ current.setter
def current(self, value):
assert TX_CONFIG['current_min'] <= value <= TX_CONFIG['current_max']
self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board')
def inject(self, state='on'):
super().inject(state=state)
if state=='on':
self.DPS.write_register(0x09, 1) # DPS5005 on
else:
self.DPS.write_register(0x09, 0) # DPS5005 off
@property
def polarity(self):
return super().polarity
@polarity.setter
def polarity(self, value):
super().polarity(value)
if value==1:
self.pin0.value = True
self.pin1.value = False
elif value==-1:
self.pin0.value = False
self.pin1.value = True
else:
self.pin0.value = False
self.pin1.value = False
#time.sleep(0.001) # TODO: check max switching time of relays
@property
def voltage(self):
return self._voltage
@voltage.setter
def voltage(self, value):
self.exec_logger.warning(f'Voltage cannot be set on {self.board_name}...')
def turn_off(self):
pass
def turn_on(self):
pass
@property
def tx_bat(self):
self.soh_logger.warning(f'Cannot get battery voltage on {self.board_name}')
self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.')
return TX_CONFIG['low_battery']
def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=None):
""" Generates a square voltage pulse
Parameters
----------
voltage: float, optional
Voltage to apply in volts, tx_v_def is applied if omitted.
length: float, optional
Length of the pulse in seconds
polarity: 1,0,-1
Polarity of the pulse
"""
if length is None:
length = self.inj_time
if polarity is None:
polarity = self.polarity
self.polarity = polarity
self.voltage(voltage)
self.exec_logger.debug(f'Voltage pulse of {polarity*voltage:.3f} V for {length:.3f} s')
self.inject(state='on')
time.sleep(length)
self.inject(state='off')
class Rx(RxAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs)
self.controller = kwargs.pop('controller', controller_module.Controller())
# ADS1115 for voltage measurement (MN)
self._ads_voltage_address = 0x49
self._adc_gain = 2/3
self._ads_voltage = ads.ADS1115(self.controller.bus, gain=self._adc_gain, data_rate=860, address=self._ads_voltage_address)
@property
def adc_gain(self):
return self._adc_gain
@adc_gain.setter
def adc_gain(self, value):
assert value in [2/3, 2, 4, 8, 16]
self._adc_gain = value
self._ads_voltage = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860,
address=self._ads_voltage_address)
self.exec_logger.debug(f'Setting RX ADC gain to {value}')
def adc_gain_auto(self):
gain_0 = _gain_auto(AnalogIn(self._ads_voltage, ads.P0))
gain_2 = _gain_auto(AnalogIn(self._ads_voltage, ads.P2))
gain = np.min([gain_0, gain_2])[0]
self.exec_logger.debug(f'Setting TX ADC gain automatically to {gain}')
self.adc_gain = gain
@property
def voltage(self):
""" Gets the voltage VMN in Volts
"""
u0 = AnalogIn(self._ads_voltage, ads.P0).voltage * 1000.
u2 = AnalogIn(self._ads_voltage, ads.P2).voltage * 1000.
u = np.max([u0,u2]) * (np.heaviside(u0-u2, 1.) * 2 - 1.) # gets the max between u0 & u2 and set the sign
self.exec_logger.debug(f'Reading voltages {u0} V and {u2} V on RX. Returning {u} V')
return u
\ No newline at end of file
import importlib
import time
import numpy as np
from OhmPi.logging_setup import create_stdout_logger
from OhmPi.config import HARDWARE_CONFIG
from threading import Thread, Event
controller_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["controller"]["model"]}')
tx_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["tx"]["model"]}')
rx_module = importlib.import_module(f'OhmPi.hardware.{HARDWARE_CONFIG["rx"]["model"]}')
......@@ -25,6 +29,7 @@ class OhmPiHardware:
self.soh_logger = kwargs.pop('soh_logger', None)
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh')
self.tx_sync = Event()
self.controller = kwargs.pop('controller',
controller_module.Controller(exec_logger=self.exec_logger,
data_logger=self.data_logger,
......@@ -38,13 +43,27 @@ class OhmPiHardware:
self.mux = kwargs.pop('mux', mux_module.Mux(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger))
def _vab_pulse(self, vab, length, polarity=None):
""" Gets VMN and IAB from a single voltage pulse
"""
def inject(length):
self.tx_sync.set()
self.tx.voltage_pulse(length=length)
self.tx_sync.clear()
def read_values():
current = []
self.tx_sync.wait()
start_time = time.gmtime()
while self.tx_sync.is_set():
current.append([time.gmtime() - start_time, self.tx.current, self.rx.voltage])
if polarity is not None and polarity != self.tx.polarity:
self.tx.polarity = polarity
self.tx.voltage = vab
self.tx.voltage_pulse(length=length)
injection = Thread(target=self.tx.voltage_pulse, kwargs={'length':length})
# set gains automatically
self.tx.adc_gain_auto()
self.rx.adc_gain_auto()
......@@ -99,13 +118,15 @@ class OhmPiHardware:
self.tx.polarity = 1
self.tx.turn_on()
vmn, iab = self._vab_pulse(vab=vab, length=best_tx_injtime)
# if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
# then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
if strategy == 'vmax':
# implement different strategies
if vab < vab_max and iab < current_max :
vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
elif strategy == 'vmin':
if vab < vab_max and iab < current_max:
if vab <= vab_max and iab < current_max:
vab = vab * np.min([0.9 * vab_max / vab, vmn_min / np.abs(vmn), 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
elif strategy != 'constant':
self.tx.exec_logger.warning(f'Unknown strategy {strategy} for setting VAB! Using {vab} V')
......
......@@ -10,7 +10,7 @@ Olivier KAUFMANN (UMONS), Arnaud WATLET (UMONS) and Guillaume BLANCHY (FNRS/ULie
"""
import os
from utils import get_platform
from OhmPi.utils import get_platform
import json
import warnings
from copy import deepcopy
......@@ -21,8 +21,8 @@ import shutil
from datetime import datetime
from termcolor import colored
import threading
from logging_setup import setup_loggers
from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG
from OhmPi.logging_setup import setup_loggers
from OhmPi.config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG
from logging import DEBUG
# finish import (done only when class is instantiated as some libs are only available on arm64 platform)
......
......@@ -11,4 +11,3 @@ print('\nCreating TX...')
tx = Tx(exec_logger= exec_logger, soh_logger= soh_logger)
print('\nCreating RX...')
rx = Rx(exec_logger= exec_logger, soh_logger= soh_logger)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment