Commit 957104a2 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Adds a Pwr component and refactors inject

Showing with 137 additions and 114 deletions
+137 -114
......@@ -18,12 +18,13 @@ OHMPI_CONFIG = {
}
HARDWARE_CONFIG = {
'controller': {'model' : 'raspberry_pi'
'controller': {'model' : 'raspberry_pi_i2c'
},
'pwr': {'model' : 'pwr_batt', 'voltage': 12.},
'tx' : {'model' : 'ohmpi_card_3_15',
'mcp_board_address': 0x20,
'voltage_max': 12., # Maximum voltage [V]
'current_max': 4800 / 50 / 2, # Maximum current [mA]
'voltage_max': 12., # Maximum voltage supported by the TX board [V]
'current_max': 4800 / 50 / 2, # Maximum current supported by the TX board [mA]
'r_shunt': 2 # Shunt resistance in Ohms
},
'rx' : {'model': 'ohmpi_card_3_15',
......
......@@ -19,4 +19,4 @@ DPS.serial.parity = 'N' # No parity
DPS.mode = minimalmodbus.MODE_RTU # RTU mode
DPS.write_register(0x0001, 40, 0) # (last number) 0 is for mA, 3 is for A
DPS.write_register(0x0000, 5, 0) self.DPS.write_register(0x0000, tx_volt, 2)
\ No newline at end of file
DPS.write_register(0x0000, 5, 0) self.pwr.write_register(0x0000, tx_volt, 2)
\ No newline at end of file
......@@ -35,6 +35,58 @@ class ControllerAbstract(ABC):
def _cpu_temp(self):
pass
class PwrAbstract(ABC):
def __init__(self, **kwargs):
self.board_name = kwargs.pop('board_name', 'unknown Pwr hardware')
self.exec_logger = kwargs.pop('exec_logger', None)
if self.exec_logger is None:
self.exec_logger = create_stdout_logger('exec_mux')
self.soh_logger = kwargs.pop('soh_logger', None)
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_mux')
self.voltage_adjustable = kwargs.pop('voltage_adjustable', False)
self._voltage = np.nan
self._current_adjustable = kwargs.pop('current_adjustable', False)
self._current = np.nan
self._state = 'off'
@property
@abstractmethod
def current(self):
# add actions to read the DPS current
return self._current
@current.setter
@abstractmethod
def current(self, value, **kwargs):
# add actions to set the DPS current
pass
@abstractmethod
def turn_off(self):
self.exec_logger.debug(f'Switching {self.board_name} off')
self._state = 'off'
@abstractmethod
def turn_on(self):
self.exec_logger.debug(f'Switching {self.board_name} on')
self._state = 'on'
@property
@abstractmethod
def voltage(self):
# add actions to read the DPS voltage
return self._voltage
@voltage.setter
@abstractmethod
def voltage(self, value):
assert isinstance(value, float)
if not self.voltage_adjustable:
self.exec_logger.warning(f'Voltage cannot be set on {self.board_name}...')
else:
# add actions to set the DPS voltage
self._voltage = value
class MuxAbstract(ABC):
def __init__(self, **kwargs):
self.board_name = kwargs.pop('board_name', 'unknown MUX hardware')
......@@ -159,10 +211,6 @@ class MuxAbstract(ABC):
class TxAbstract(ABC):
def __init__(self, **kwargs):
self.board_name = kwargs.pop('board_name', 'unknown TX hardware')
polarity = kwargs.pop('polarity', 1)
if polarity is None:
polarity = 0
self._polarity = polarity
inj_time = kwargs.pop('inj_time', 1.)
self.exec_logger = kwargs.pop('exec_logger', None)
if self.exec_logger is None:
......@@ -171,13 +219,10 @@ class TxAbstract(ABC):
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_tx')
self.controller = kwargs.pop('controller', None)
self.pwr = kwargs.pop('pwr', None)
self._inj_time = None
self._dps_state = 'off'
self._adc_gain = 1.
self.inj_time = inj_time
self._voltage = 0.
self.voltage_adjustable = True
self._current_adjustable = False
self.exec_logger.debug(f'{self.board_name} TX initialization')
@property
......@@ -193,25 +238,22 @@ class TxAbstract(ABC):
def adc_gain_auto(self):
pass
@property
@abstractmethod
def current(self):
# add actions to read the TX current and return it
return None
@current.setter
@abstractmethod
def current(self, value, **kwargs):
# add actions to set the DPS current
pass
@abstractmethod
def current_pulse(self, **kwargs):
pass
@abstractmethod
def inject(self, state='on'):
assert state in ['on', 'off']
def inject(self, polarity=1, inj_time=None):
assert polarity in [-1,0,1]
if inj_time is None:
inj_time = self._inj_time
if np.abs(polarity) > 0:
self.pwr.turn_on()
time.sleep(inj_time)
self.pwr.turn_off()
else:
self.pwr.turn_off()
time.sleep(inj_time)
@property
def inj_time(self):
......@@ -222,44 +264,13 @@ class TxAbstract(ABC):
assert isinstance(value, float)
self._inj_time = value
@property
def polarity(self):
return self._polarity
@polarity.setter
def polarity(self, value):
assert value in [-1,0,1]
self._polarity = value
# add actions to set the polarity (switch relays)
def turn_off(self):
self.exec_logger.debug(f'Switching DPS off')
self._dps_state = 'off'
def turn_on(self):
self.exec_logger.debug(f'Switching DPS on')
self._dps_state = 'on'
@property
def voltage(self):
return self._voltage
@voltage.setter
def voltage(self, value):
assert isinstance(value, float)
if not self.voltage_adjustable:
self.exec_logger.warning(f'Voltage cannot be set on {self.board_name}...')
else:
self._voltage = value
# Add specifics to set DPS voltage
@property
@abstractmethod
def tx_bat(self):
pass
def voltage_pulse(self, voltage=0., length=None, polarity=None):
def voltage_pulse(self, voltage=0., length=None, polarity=1):
""" Generates a square voltage pulse
Parameters
......@@ -273,15 +284,9 @@ class TxAbstract(ABC):
"""
if length is None:
length = self.inj_time
if polarity is None:
polarity = self.polarity
self.polarity = polarity
self.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity * voltage:.3f} V for {length:.3f} s')
self.inject(state='on')
time.sleep(length)
# self.tx_sync.clear()
self.inject(state='off')
self.pwr.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity * self.pwr.voltage:.3f} V for {length:.3f} s')
self.inject(polarity=polarity, inj_time=length)
class RxAbstract(ABC):
......
......@@ -101,7 +101,7 @@ class Tx(TxAbstract):
# DPH 5005 Digital Power Supply
self.turn_on()
time.sleep(TX_CONFIG['dps_switch_on_warm_up'])
self.DPS = None
self.pwr = None
# I2C connexion to MCP23008, for current injection
self.pin4 = self.mcp_board.get_pin(4) # Ohmpi_run
......@@ -145,33 +145,28 @@ class Tx(TxAbstract):
assert TX_CONFIG['current_min'] <= value <= TX_CONFIG['current_max']
self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board')
def inject(self, state='on'):
TxAbstract.inject(self, state=state)
# Add specifics here...
@property
def polarity(self):
return TxAbstract.polarity.fget(self)
@polarity.setter
def polarity(self, value):
TxAbstract.polarity.fset(self, value)
if value==1:
def inject(self, polarity=1, inj_time=None):
assert polarity in [-1,0,1]
if polarity==1:
self.pin0.value = True
self.pin1.value = False
elif value==-1:
time.sleep(0.005) # Max turn on time of 211EH relays = 5ms
elif polarity==-1:
self.pin0.value = False
self.pin1.value = True
time.sleep(0.005) # Max turn on time of 211EH relays = 5ms
else:
self.pin0.value = False
self.pin1.value = False
#time.sleep(0.001) # TODO: check max switching time of relays
time.sleep(0.001) # Max turn off time of 211EH relays = 1ms
TxAbstract.inject(self, polarity=polarity, inj_time=None)
def turn_off(self):
pass
self.pwr.turn_off(self)
def turn_on(self):
pass
self.pwr.turn_on(self)
@property
def tx_bat(self):
......@@ -179,7 +174,7 @@ class Tx(TxAbstract):
self.exec_logger.debug(f'{self.board_name} cannot read battery voltage. Returning default battery voltage.')
return TX_CONFIG['low_battery']
def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=None):
def voltage_pulse(self, voltage=TX_CONFIG['default_voltage'], length=None, polarity=1):
""" Generates a square voltage pulse
Parameters
......@@ -194,28 +189,11 @@ class Tx(TxAbstract):
if length is None:
length = self.inj_time
if polarity is None:
polarity = self.polarity
self.polarity = polarity
self.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity*voltage:.3f} V for {length:.3f} s')
self.inject(state='on')
time.sleep(length)
self.inject(state='off')
self.pwr.voltage = voltage
self.exec_logger.debug(f'Voltage pulse of {polarity*self.pwr.voltage:.3f} V for {length:.3f} s')
self.inject(polarity=polarity, inj_time=length)
@property
def voltage(self):
return self._voltage
@voltage.setter
def voltage(self, value):
assert isinstance(value, float)
value = np.max([TX_CONFIG['voltage_min'], np.min([value, TX_CONFIG['voltage_max']])])
if not self.voltage_adjustable:
self.exec_logger.warning(f'Voltage cannot be set on {self.board_name}...')
else:
self._voltage = value
class Rx(RxAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
......
from OhmPi.hardware_components.abstract_hardware_components import PwrAbstract
import numpy as np
import os
class Pwr(PwrAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
voltage = kwargs.pop('voltage', 12.)
super().__init__(**kwargs)
self.voltage_adjustable = False
self._voltage = voltage
self._current_adjustable = False
self._current = np.nan
@property
def current(self):
return self._current
@current.setter
def current(self, value, **kwargs):
self.exec_logger.debug(f'Current cannot be set on {self.board_name}')
def turn_off(self):
self.exec_logger.debug(f'{self.board_name} cannot be turned off')
def turn_on(self):
self.exec_logger.debug(f'{self.board_name} is always on')
@property
def voltage(self):
return PwrAbstract.voltage.fget(self)
@voltage.setter
def voltage(self, value):
PwrAbstract.voltage.fset(self, value)
\ No newline at end of file
......@@ -12,6 +12,7 @@ from OhmPi.config import HARDWARE_CONFIG
from threading import Thread, Event, Barrier
controller_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["controller"]["model"]}')
pwr_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["pwr"]["model"]}')
tx_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["tx"]["model"]}')
rx_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["rx"]["model"]}')
MUX_CONFIG = {}
......@@ -55,10 +56,15 @@ class OhmPiHardware:
data_logger=self.data_logger,
soh_logger=self.soh_logger,
controller=self.controller))
self.pwr = kwargs.pop('pwr', pwr_module.Pwr(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger,
controller=self.controller))
self.tx = kwargs.pop('tx', tx_module.Tx(exec_logger=self.exec_logger,
data_logger=self.data_logger,
soh_logger=self.soh_logger,
controller=self.controller))
self.tx.pwr = self.pwr
self._cabling = kwargs.pop('cabling', {})
self.mux_boards = kwargs.pop('mux', {'mux_1': mux_module.Mux(id='mux_1',
exec_logger=self.exec_logger,
......@@ -127,12 +133,12 @@ class OhmPiHardware:
return 0.
else:
n_pulses = int(np.max(self.readings[:, 1]))
polarity = np.array([np.mean(self.readings[self.readings[:, 1] == i, 2]) for i in range(n_pulses + 1)])
polarity = np.array([np.median(self.readings[self.readings[:, 1]==i, 2]) for i in range(n_pulses + 1)])
mean_vmn = []
mean_iab = []
for i in range(n_pulses + 1):
mean_vmn.append(np.mean(self.readings[self.readings[:, 1] == i, 4]))
mean_iab.append(np.mean(self.readings[self.readings[:, 1] == i, 3]))
mean_vmn.append(np.mean(self.readings[self.readings[:, 1]==i, 4]))
mean_iab.append(np.mean(self.readings[self.readings[:, 1]==i, 3]))
mean_vmn = np.array(mean_vmn)
mean_iab = np.array(mean_iab)
sp = np.mean(mean_vmn[np.ix_(polarity==1)] - mean_vmn[np.ix_(polarity==-1)]) / 2
......@@ -182,13 +188,12 @@ class OhmPiHardware:
vab_max = np.abs(vab_max)
vmn_min = np.abs(vmn_min)
vab = np.min([np.abs(tx_volt), vab_max])
self.tx.polarity = 1
self.tx.turn_on()
if self.rx.sampling_rate*1000 > best_tx_injtime:
sampling_rate = best_tx_injtime # TODO: check this...
else:
sampling_rate = self.tx.sampling_rate
self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate)
self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate) # TODO: use a square wave pulse?
vmn = np.mean(self.readings[:,4])
iab = np.mean(self.readings[:,3])
# if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
......@@ -206,7 +211,6 @@ class OhmPiHardware:
else:
self.tx.exec_logger.debug(f'Constant strategy for setting VAB, using {vab} V')
self.tx.turn_off()
self.tx.polarity = 0
rab = (np.abs(vab) * 1000.) / iab
self.exec_logger.debug(f'RAB = {rab:.2f} Ohms')
if vmn < 0:
......
......@@ -3,7 +3,7 @@ from utils import change_config
import logging
change_config('config_mb_2023_mux_2024.py', verbose=False)
from OhmPi.hardware_components.mux_2024_rev_0_0 import Mux, MUX_CONFIG
from OhmPi.hardware_components import raspberry_pi as controller_module
from OhmPi.hardware_components import raspberry_pi_i2c as controller_module
stand_alone_mux = False
part_of_hardware_system = False
......
......@@ -16,8 +16,7 @@ rx = Rx(exec_logger= exec_logger, soh_logger= soh_logger)
print(f'TX current: {tx.current:.3f} mA')
print(f'RX voltage: {rx.voltage:.3f} mV')
tx.polarity = 1
tx.inject(state='on')
tx.inject(state='on', polarity=1)
tx.adc_gain_auto()
rx.adc_gain_auto()
r = []
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment