diff --git a/Proposition software architecture.odp b/Proposition software architecture.odp index b73ef69ade0d0f2ca80058e9fa60ffe56df09234..015445e49fc49b2d1db5a670465441101d07ccca 100644 Binary files a/Proposition software architecture.odp and b/Proposition software architecture.odp differ diff --git a/config.py b/config.py index 4468951265fa1ee997304e2c44cd47a0be6a5398..6345fe8396f9f6892fe8c48bf14539d7e4e9cc76 100644 --- a/config.py +++ b/config.py @@ -18,7 +18,7 @@ OHMPI_CONFIG = { } HARDWARE_CONFIG = { - 'controller': {'model' : 'raspberry_pi' + 'ctl': {'model' : 'raspberry_pi' }, 'tx' : {'model' : 'ohmpi_card_3_15', 'mcp_board_address': 0x20, diff --git a/config_default.py b/config_default.py index f9682f20669a2ce92fac37e23fd4eb7c306eb557..5839429a9473de1071cadc40c83c6ac539804133 100644 --- a/config_default.py +++ b/config_default.py @@ -18,7 +18,7 @@ OHMPI_CONFIG = { } HARDWARE_CONFIG = { - 'controller': {'model' : 'dummy_controller' + 'ctl': {'model' : 'dummy_ctl' }, 'tx' : {'model' : 'dummy_tx', 'current_max': 4800 / 50 / 2, # Maximum current mA diff --git a/config_dummy.py b/config_dummy.py index 0a0bc06e42ccf4d61329327b4dda22741e51cb49..4d3be74692ce837350a546fd791f4691a6cdc0db 100644 --- a/config_dummy.py +++ b/config_dummy.py @@ -27,8 +27,9 @@ OHMPI_CONFIG = { } # TODO: add a dictionary with INA models and associated gain values HARDWARE_CONFIG = { - 'controller': {'model' : 'dummy_controller' + 'ctl': {'model' : 'dummy_ctl' }, + 'pwr' : {'model': 'dummy_pwr'}, 'tx' : {'model' : 'dummy_tx', 'current_max': 4800 / 50 / 2, # Maximum current 'R_shunt': 2 # Shunt resistance in Ohms diff --git a/config_mb_2023_mux_2024.py b/config_mb_2023_mux_2024.py index 688d2f3cae8ebcd508377ec2a46df3ac38fdb0c5..8bb281bed52c6b973c4805ba24974874c2589fbb 100644 --- a/config_mb_2023_mux_2024.py +++ b/config_mb_2023_mux_2024.py @@ -18,7 +18,7 @@ OHMPI_CONFIG = { } HARDWARE_CONFIG = { - 'controller': {'model' : 'raspberry_pi_i2c' + 'ctl': {'model' : 'raspberry_pi_i2c' }, 'pwr': {'model' : 'pwr_batt', 'voltage': 12.}, 'tx' : {'model' : 'ohmpi_card_3_15', diff --git a/config_mb_2024_rev_0_0.py b/config_mb_2024_rev_0_0.py index 26b8c7fb97d3b38a775eca7de4c1a949fed8b5b9..61550a2dc92e158ee684d8b9c69778c8d98171f2 100644 --- a/config_mb_2024_rev_0_0.py +++ b/config_mb_2024_rev_0_0.py @@ -18,8 +18,10 @@ OHMPI_CONFIG = { } HARDWARE_CONFIG = { - 'controller': {'model' : 'raspberry_pi' + 'ctl': {'model' : 'raspberry_pi_i2c' }, + 'pwr': {'model' : 'DPS_5005', + 'voltage_adjustable': True}, 'tx' : {'model' : 'mb_2024_rev_0_0', 'mcp_board_address': 0x20, 'current_max': 4800 / 50 / 2, # Maximum current diff --git a/hardware_components/__init__.py b/hardware_components/__init__.py index bdc78fd8e121c2a5888be2fca509a1aa749bf997..975664318c1fbab760c45faf049838304e77486a 100644 --- a/hardware_components/__init__.py +++ b/hardware_components/__init__.py @@ -1 +1 @@ -from .abstract_hardware_components import TxAbstract, RxAbstract, MuxAbstract, ControllerAbstract \ No newline at end of file +from .abstract_hardware_components import TxAbstract, RxAbstract, MuxAbstract, CtlAbstract \ No newline at end of file diff --git a/hardware_components/abstract_hardware_components.py b/hardware_components/abstract_hardware_components.py index 4cfdb993717440686ea77ec9b9689289307f3a85..125cb188c9e845c25e694ae5f8e441f7329c6e2b 100644 --- a/hardware_components/abstract_hardware_components.py +++ b/hardware_components/abstract_hardware_components.py @@ -5,9 +5,9 @@ from OhmPi.logging_setup import create_stdout_logger import time from threading import Barrier -class ControllerAbstract(ABC): +class CtlAbstract(ABC): def __init__(self, **kwargs): - self.board_name = kwargs.pop('board_name', 'unknown Controller hardware') + self.board_name = kwargs.pop('board_name', 'unknown CTL hardware') self.bus = None # TODO: allow for several buses self.exec_logger = kwargs.pop('exec_logger', None) if self.exec_logger is None: @@ -15,7 +15,7 @@ class ControllerAbstract(ABC): self.soh_logger = kwargs.pop('soh_logger', None) if self.soh_logger is None: self.soh_logger = create_stdout_logger('soh_ctl') - self.exec_logger.debug(f'{self.board_name} Controller initialization') + self.exec_logger.debug(f'{self.board_name} Ctl initialization') self._cpu_temp_available = False self.max_cpu_temp = np.inf @@ -37,7 +37,7 @@ class ControllerAbstract(ABC): class PwrAbstract(ABC): def __init__(self, **kwargs): - self.board_name = kwargs.pop('board_name', 'unknown Pwr hardware') + self.board_name = kwargs.pop('board_name', 'unknown PWR hardware') self.exec_logger = kwargs.pop('exec_logger', None) if self.exec_logger is None: self.exec_logger = create_stdout_logger('exec_mux') @@ -100,7 +100,7 @@ class MuxAbstract(ABC): if self.board_id is None: self.exec_logger.error(f'MUX {self.board_name} should have an id !') self.exec_logger.debug(f'MUX {self.board_id} ({self.board_name}) initialization') - self.controller = kwargs.pop('controller', None) + self.ctl = kwargs.pop('ctl', None) cabling = kwargs.pop('cabling', None) self.cabling = {} if cabling is not None: @@ -218,7 +218,7 @@ class TxAbstract(ABC): self.soh_logger = kwargs.pop('soh_logger', None) if self.soh_logger is None: self.soh_logger = create_stdout_logger('soh_tx') - self.controller = kwargs.pop('controller', None) + self.ctl = kwargs.pop('ctl', None) self.pwr = kwargs.pop('pwr', None) self._inj_time = None self._adc_gain = 1. @@ -297,7 +297,7 @@ class RxAbstract(ABC): self.soh_logger = kwargs.pop('soh_logger', None) if self.soh_logger is None: self.soh_logger = create_stdout_logger('soh_rx') - self.controller = kwargs.pop('controller', None) + self.ctl = kwargs.pop('ctl', None) self.board_name = kwargs.pop('board_name', 'unknown RX hardware') self._sampling_rate = kwargs.pop('sampling_rate', 1) self.exec_logger.debug(f'{self.board_name} RX initialization') diff --git a/hardware_components/dummy_controller.py b/hardware_components/dummy_ctl.py similarity index 54% rename from hardware_components/dummy_controller.py rename to hardware_components/dummy_ctl.py index 829881f3fb3fde7b0730ebabccf4ffda96befb33..0e977091eb2654efb3f5807f861fa12fd487e703 100644 --- a/hardware_components/dummy_controller.py +++ b/hardware_components/dummy_ctl.py @@ -1,10 +1,10 @@ from OhmPi.config import HARDWARE_CONFIG import os -from OhmPi.hardware_components import ControllerAbstract -CONTROLLER_CONFIG = HARDWARE_CONFIG['controller'] +from OhmPi.hardware_components import CtlAbstract +CTL_CONFIG = HARDWARE_CONFIG['ctl'] -class Controller(ControllerAbstract): +class Ctl(CtlAbstract): def __init__(self, **kwargs): kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) diff --git a/hardware_components/mb_2024_rev_0_0.py b/hardware_components/mb_2024_rev_0_0.py index 8b55db7846d0905de474e6ecf2afd4bae23c617a..37b2f3ea0dc30bd8dad944936672304e88bdfad4 100644 --- a/hardware_components/mb_2024_rev_0_0.py +++ b/hardware_components/mb_2024_rev_0_0.py @@ -9,7 +9,7 @@ import time import numpy as np import os from OhmPi.hardware_components import TxAbstract, RxAbstract -controller_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["hardware"]["controller"]["model"]}') +ctl_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["hardware"]["ctl"]["model"]}') TX_CONFIG = HARDWARE_CONFIG['tx'] RX_CONFIG = HARDWARE_CONFIG['rx'] @@ -71,15 +71,15 @@ class Tx(TxAbstract): kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage']) - self.controller = kwargs.pop('controller', controller_module.Controller()) + self.ctl = kwargs.pop('controller', ctl_module.Ctl()) # I2C connexion to MCP23008, for current injection - self.mcp_board = MCP23008(self.controller.bus, address=TX_CONFIG['mcp_board_address']) + self.mcp_board = MCP23008(self.ctl.bus, address=TX_CONFIG['mcp_board_address']) # ADS1115 for current measurement (AB) self._adc_gain = 2/3 self._ads_current_address = 0x48 - self._ads_current = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, address=self._ads_current_address) # Relays for pulse polarity @@ -122,7 +122,7 @@ class Tx(TxAbstract): def adc_gain(self, value): assert value in [2/3, 2, 4, 8, 16] self._adc_gain = value - self._ads_current = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, address=self._ads_current_address) self.exec_logger.debug(f'Setting TX ADC gain to {value}') @@ -148,12 +148,13 @@ class Tx(TxAbstract): assert TX_CONFIG['current_min'] <= value <= TX_CONFIG['current_max'] self.exec_logger.warning(f'Current pulse is not implemented for the {TX_CONFIG["model"]} board') - def inject(self, state='on'): - TxAbstract.inject(self, state=state) - if state=='on': - self.DPS.write_register(0x09, 1) # DPS5005 on - else: - self.DPS.write_register(0x09, 0) # DPS5005 off + def inject(self, polarity=1, inj_time=None): + TxAbstract.inject(self, polarity=polarity, inj_time=inj_time) + # move this part in DPS5005 + # if state=='on': + # self.DPS.write_register(0x09, 1) # DPS5005 on + # else: + # self.DPS.write_register(0x09, 0) # DPS5005 off @property def polarity(self): @@ -234,12 +235,12 @@ class Rx(RxAbstract): def __init__(self, **kwargs): kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) - self.controller = kwargs.pop('controller', controller_module.Controller()) + self.ctl = kwargs.pop('controller', ctl_module.Ctl()) # ADS1115 for voltage measurement (MN) self._ads_voltage_address = 0x49 self._adc_gain = 2/3 - self._ads_voltage = ads.ADS1115(self.controller.bus, gain=self._adc_gain, data_rate=860, address=self._ads_voltage_address) + self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self._adc_gain, data_rate=860, address=self._ads_voltage_address) @property def adc_gain(self): @@ -249,7 +250,7 @@ class Rx(RxAbstract): def adc_gain(self, value): assert value in [2/3, 2, 4, 8, 16] self._adc_gain = value - self._ads_voltage = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, address=self._ads_voltage_address) self.exec_logger.debug(f'Setting RX ADC gain to {value}') diff --git a/hardware_components/mux_2024_rev_0_0.py b/hardware_components/mux_2024_rev_0_0.py index 39538a94e537daccb48f4b8513a2819020e56f31..7065ee8bf680a53cec1ec120d495e6f931149377 100644 --- a/hardware_components/mux_2024_rev_0_0.py +++ b/hardware_components/mux_2024_rev_0_0.py @@ -72,9 +72,9 @@ class Mux(MuxAbstract): self.exec_logger.error(f'Invalid role assignment for {self.board_name}: {self._roles} !') self._mode = '' if tca_address is None: - self._tca = self.controller.bus + self._tca = self.ctl.bus else: - self._tca = adafruit_tca9548a.TCA9548A(self.controller.bus, tca_address)[tca_channel] + self._tca = adafruit_tca9548a.TCA9548A(self.ctl.bus, tca_address)[tca_channel] self._mcp_addresses = (kwargs.pop('mcp_0', '0x22'), kwargs.pop('mcp_1', '0x23')) # TODO: add assert on valid addresses.. self._mcp = [None, None] self.reset() diff --git a/hardware_components/ohmpi_card_3_15.py b/hardware_components/ohmpi_card_3_15.py index 22c732475d54828d783246e1f78417f7a9ba4eaa..8a95e6bb731069c2903c36021f55c270a4587af1 100644 --- a/hardware_components/ohmpi_card_3_15.py +++ b/hardware_components/ohmpi_card_3_15.py @@ -10,8 +10,8 @@ import time import numpy as np import os from OhmPi.hardware_components import TxAbstract, RxAbstract -controller_name = HARDWARE_CONFIG['controller'].pop('board_name', 'raspberry_pi_i2c') -controller_module = importlib.import_module(f'OhmPi.hardware_components.{controller_name}') +ctl_name = HARDWARE_CONFIG['ctl'].pop('board_name', 'raspberry_pi_i2c') +ctl_module = importlib.import_module(f'OhmPi.hardware_components.{ctl_name}') TX_CONFIG = HARDWARE_CONFIG['tx'] RX_CONFIG = HARDWARE_CONFIG['rx'] @@ -32,16 +32,17 @@ current_adc_voltage_min = 10. # mV current_adc_voltage_max = 4500. # mV low_battery = 12. # V (conventional value as it is not measured on this board) tx_mcp_board_address = 0x20 # -dps_voltage_max = 12. # V -dps_default_voltage = 12. # V -dps_switch_on_warmup = 0. # seconds +# pwr_voltage_max = 12. # V +# pwr_default_voltage = 12. # V +# pwr_switch_on_warmup = 0. # seconds TX_CONFIG['current_min'] = np.min([current_adc_voltage_min / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop('current_min', np.inf)]) # mA TX_CONFIG['current_max'] = np.min([current_adc_voltage_max / (TX_CONFIG['r_shunt'] * 50), TX_CONFIG.pop('current_max', np.inf)]) # mA -TX_CONFIG['voltage_max'] = np.min([dps_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V +# TX_CONFIG['voltage_max'] = np.min([pwr_voltage_max, TX_CONFIG.pop('voltage_max', np.inf)]) # V +TX_CONFIG['voltage_max'] = TX_CONFIG.pop('voltage_max', np.inf) # V TX_CONFIG['voltage_min'] = -TX_CONFIG['voltage_max'] # V -TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', dps_default_voltage), TX_CONFIG['voltage_max']]) # V -TX_CONFIG['dps_switch_on_warm_up'] = TX_CONFIG.pop('dps_switch_on_warmup', dps_switch_on_warmup) +# TX_CONFIG['default_voltage'] = np.min([TX_CONFIG.pop('default_voltage', pwr_default_voltage), TX_CONFIG['voltage_max']]) # V +# TX_CONFIG['pwr_switch_on_warm_up'] = TX_CONFIG.pop('pwr_switch_on_warmup', pwr_switch_on_warmup) TX_CONFIG['mcp_board_address'] = TX_CONFIG.pop('mcp_board_address', tx_mcp_board_address) TX_CONFIG['low_battery'] = TX_CONFIG.pop('low_battery', low_battery) @@ -77,15 +78,15 @@ class Tx(TxAbstract): self._voltage = kwargs.pop('voltage', TX_CONFIG['default_voltage']) self.voltage_adjustable = False self.current_adjustable = False - if self.controller is None: - self.controller = controller_module.Controller() + if self.ctl is None: + self.ctl = ctl_module.Ctl() # I2C connexion to MCP23008, for current injection - self.mcp_board = MCP23008(self.controller.bus, address=TX_CONFIG['mcp_board_address']) + self.mcp_board = MCP23008(self.ctl.bus, address=TX_CONFIG['mcp_board_address']) # ADS1115 for current measurement (AB) self._ads_current_address = 0x48 - self._ads_current = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, address=self._ads_current_address) self._ads_current.mode = Mode.CONTINUOUS @@ -113,7 +114,7 @@ class Tx(TxAbstract): def adc_gain(self, value): assert value in [2/3, 2, 4, 8, 16] self._adc_gain = value - self._ads_current = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + self._ads_current = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, address=self._ads_current_address) self.exec_logger.debug(f'Setting TX ADC gain to {value}') @@ -192,13 +193,13 @@ class Rx(RxAbstract): def __init__(self, **kwargs): kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) - if self.controller is None: - self.controller = controller_module.Controller() + if self.ctl is None: + self.ctl = ctl_module.Ctl() # ADS1115 for voltage measurement (MN) self._ads_voltage_address = 0x49 self._adc_gain = 2/3 - self._ads_voltage = ads.ADS1115(self.controller.bus, gain=self._adc_gain, data_rate=860, address=self._ads_voltage_address) + self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self._adc_gain, data_rate=860, address=self._ads_voltage_address) self._ads_voltage.mode = Mode.CONTINUOUS self._sampling_rate = kwargs.pop('sampling_rate', sampling_rate) @@ -210,7 +211,7 @@ class Rx(RxAbstract): def adc_gain(self, value): assert value in [2/3, 2, 4, 8, 16] self._adc_gain = value - self._ads_voltage = ads.ADS1115(self.controller.bus, gain=self.adc_gain, data_rate=860, + self._ads_voltage = ads.ADS1115(self.ctl.bus, gain=self.adc_gain, data_rate=860, address=self._ads_voltage_address) self.exec_logger.debug(f'Setting RX ADC gain to {value}') diff --git a/hardware_components/raspberry_pi_i2c.py b/hardware_components/raspberry_pi_i2c.py index 1169a2e9838b89dbed803358c5913fdd209588e5..46d7156df092eda4dd57939285c5498cc90f9767 100644 --- a/hardware_components/raspberry_pi_i2c.py +++ b/hardware_components/raspberry_pi_i2c.py @@ -1,11 +1,11 @@ -from OhmPi.hardware_components import ControllerAbstract +from OhmPi.hardware_components import CtlAbstract import board # noqa import busio # noqa import os from OhmPi.utils import get_platform from gpiozero import CPUTemperature # noqa -class Controller(ControllerAbstract): +class Ctl(CtlAbstract): def __init__(self, **kwargs): kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) super().__init__(**kwargs) diff --git a/hardware_system.py b/hardware_system.py index e4eea7fa124fd449ceb57196c49e5b53480bc38f..009dd29b767463a8ed0a2dd8ef8690cf6027b581 100644 --- a/hardware_system.py +++ b/hardware_system.py @@ -11,7 +11,7 @@ from OhmPi.utils import update_dict from OhmPi.config import HARDWARE_CONFIG from threading import Thread, Event, Barrier -controller_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["controller"]["model"]}') +ctl_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["ctl"]["model"]}') pwr_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["pwr"]["model"]}') tx_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["tx"]["model"]}') rx_module = importlib.import_module(f'OhmPi.hardware_components.{HARDWARE_CONFIG["rx"]["model"]}') @@ -48,29 +48,28 @@ class OhmPiHardware: if self.soh_logger is None: self.soh_logger = create_stdout_logger('soh_hw') self.tx_sync = Event() - self.controller = kwargs.pop('controller', - controller_module.Controller(exec_logger=self.exec_logger, - data_logger=self.data_logger, - soh_logger=self.soh_logger)) + self.ctl = kwargs.pop('ctl', ctl_module.Ctl(exec_logger=self.exec_logger, + data_logger=self.data_logger, + soh_logger=self.soh_logger)) self.rx = kwargs.pop('rx', rx_module.Rx(exec_logger=self.exec_logger, - data_logger=self.data_logger, - soh_logger=self.soh_logger, - controller=self.controller)) + data_logger=self.data_logger, + soh_logger=self.soh_logger, + ctl=self.ctl)) self.pwr = kwargs.pop('pwr', pwr_module.Pwr(exec_logger=self.exec_logger, - data_logger=self.data_logger, - soh_logger=self.soh_logger, - controller=self.controller)) + data_logger=self.data_logger, + soh_logger=self.soh_logger, + ctl=self.ctl)) self.tx = kwargs.pop('tx', tx_module.Tx(exec_logger=self.exec_logger, - data_logger=self.data_logger, - soh_logger=self.soh_logger, - controller=self.controller)) + data_logger=self.data_logger, + soh_logger=self.soh_logger, + ctl=self.ctl)) self.tx.pwr = self.pwr self._cabling = kwargs.pop('cabling', {}) self.mux_boards = kwargs.pop('mux', {'mux_1': mux_module.Mux(id='mux_1', exec_logger=self.exec_logger, data_logger=self.data_logger, soh_logger=self.soh_logger, - controller=self.controller, + ctl=self.ctl, cabling = self._cabling)}) self.mux_barrier = Barrier(len(self.mux_boards) + 1) self._cabling={} diff --git a/hw.py b/hw.py deleted file mode 100644 index 2ad5fd8b5f4faea08e25ed77180fc77f9b32fa7b..0000000000000000000000000000000000000000 --- a/hw.py +++ /dev/null @@ -1,287 +0,0 @@ -# definition of hardware level functions -import numpy as np - -import board # noqa -import busio # noqa -import adafruit_tca9548a # noqa -import adafruit_ads1x15.ads1115 as ads # noqa -from adafruit_ads1x15.analog_in import AnalogIn # noqa -from adafruit_mcp230xx.mcp23008 import MCP23008 # noqa -from adafruit_mcp230xx.mcp23017 import MCP23017 # noqa -import digitalio # noqa -from digitalio import Direction # noqa -from gpiozero import CPUTemperature # noqa -import minimalmodbus # noqa -import time - -# global variable -i2c = busio.I2C(board.SCL, board.SDA) - -from config import OHMPI_CONFIG - - -class Alimentation(): - def __init__(self, address=0x20, tx_voltage=12): - self.mcp = MCP23017(i2c, address=address) - self.tx_voltage = tx_voltage - self.polarity = True - self.on = False - self.pinA = 0 - self.pinB = 1 - - # setup DPS - self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) - self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc - self.DPS.serial.bytesize = 8 # - self.DPS.serial.timeout = 1 # greater than 0.5 for it to work - self.DPS.debug = False # - self.DPS.serial.parity = 'N' # No parity - self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode - self.DPS.write_register(0x0001, 40, 0) # max current allowed (36 mA for relays) - # (last number) 0 is for mA, 3 is for A - - def turn_on(self): - if self.on is False: - self.DPS.write_register(0x09, 1) # DPS5005 on - self.on = True - - def turn_off(self): - self.DPS.write_register(0x09, 0) # DPS5005 off - self.on = False - - def start_injection(self, polarity=True): - # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) - self.polarity = polarity - if self.polarity: - self.pin0 = self.mcp.get_pin(self.pinA) - self.pin0.direction = Direction.OUTPUT - self.pin0.value = True - self.pin1 = self.mcp.get_pin(self.pinB) - self.pin1.direction = Direction.OUTPUT - self.pin1.value = False - else: - self.pin0 = self.mcp.get_pin(self.pinA) - self.pin0.direction = Direction.OUTPUT - self.pin0.value = False - self.pin1 = self.mcp.get_pin(self.pinB) - self.pin1.direction = Direction.OUTPUT - self.pin1.value = True - - def stop_injection(self): - self.pin0 = self.mcp.get_pin(self.pinA) - self.pin0.direction = Direction.OUTPUT - self.pin0.value = False - self.pin1 = self.mcp.get_pin(self.pinB) - self.pin1.direction = Direction.OUTPUT - self.pin1.value = False - - def set_polarity(self, polarity=True): - self.polarity = polarity - - def set_tx_voltage(self, tx_voltage=12): - if tx_voltage >= 0: - self.tx_voltage = tx_voltage - # set voltage for test - self.DPS.write_register(0x0000, tx_voltage, 2) - self.DPS.write_register(0x09, 1) # DPS5005 on - else: - raise ValueError('Voltage needs to be >= 0 V') - - -class ADS(): # analog to digital converter ADS1115 - def __init__(self, address=0x48, gain=2/3, data_rate=820, mode=1): - self.ads = ads.ADS1115(i2c, gain=gain, data_rate=data_rate, address=address, mode=mode) - self.gain = gain - self.data_rate = data_rate - self.mode = mode - self.pins = { - 0: self.ads.P0, - 1: self.ads.P1, - 2: self.ads.P2, - 3: self.ads.P3, - } - self.vmin = 0.01 # volts - self.vmax = 4.5 # volts - - def read_single(self, pin=0): - return AnalogIn(self.ads, self.pins[pin]).voltage - - def read_diff(self, pins='01'): - if pins == '01': - return AnalogIn(self.ads, self.ads.P0, self.ads.P1).voltage - elif pins == '23': - return AnalogIn(self.ads, self.ads.P2, self.ads.P3).voltage - - def set_gain(self, gain=2/3): - self.gain = gain - # TODO maybe there is already a set_gain() function in the library? check that - self.ads = ads.ADS1115( - i2c, gain=self.gain, data_rate=self.data_rate, - address=self.address, mode=self.mode) - - def get_best_gain(self, channel=0): - """Automatically sets the gain on a channel - - Parameters - ---------- - channel : ads.ADS1x15 - Instance of ADS where voltage is measured. - - Returns - ------- - gain : float - Gain to be applied on ADS1115. - """ - voltage = self.read_singl(channel) - gain = 2 / 3 - if (abs(voltage) < 2.040) and (abs(voltage) >= 1.023): - gain = 2 - elif (abs(voltage) < 1.023) and (abs(voltage) >= 0.508): - gain = 4 - elif (abs(voltage) < 0.508) and (abs(voltage) >= 0.250): - gain = 8 - elif abs(voltage) < 0.256: - gain = 16 - #self.exec_logger.debug(f'Setting gain to {gain}') - return gain - - def set_best_gain(self, channel=0): - gain = self.get_best_gain(channel) - self.set_gain(gain) - - -class Voltage(ADS): # for MN - def __init__(self): - super().__init__() - - def read(self, pin=0): - return self.read_single(self, pin=pin) - - def read_all(self, pins=[0, 2]): - return [self.read_single(pin) for pin in pins] - - -class Current(ADS): # for AB - def __init__(self, address=0x48, gain=2/3, data_rate=820, mode=1, r_shunt=OHMPI_CONFIG['R_shunt']): - super().__init__(address=address, gain=gain, data_rate=data_rate, mode=mode) - self.r_shunt = r_shunt - self.imin = self.vmin / (self.r_shunt * 50) - self.imax = self.vmax / (self.r_shunt * 50) - - def read(self): - U = self.read_single(pin=0) - return U / 50 / self.r_shunt - - -class Multiplexer(): - def __init__(self, addresses={ - 'A': 0x70, - 'B': 0x71, - 'M': 0x72, - 'N': 0x73 - }, - nelec=64): - #OHMPI_CONFIG['board_addresses'] - self.addresses = addresses - self.nelec = nelec # max number of electrodes per board - - def switch_one(self, elec, role, state='off'): - self.tca = adafruit_tca9548a.TCA9548A(i2c, self.addresses[role]) - # find I2C address of the electrode and corresponding relay - # considering that one MCP23017 can cover 16 electrodes - i2c_address = 7 - (elec - 1) // 16 # quotient without rest of the division - relay = (elec-1) - ((elec-1) // 16) * 16 - - if i2c_address is not None: - # select the MCP23017 of the selected MUX board - mcp = MCP23017(self.tca[i2c_address]) - mcp.get_pin(relay - 1).direction = digitalio.Direction.OUTPUT - if state == 'on': - mcp.get_pin(relay - 1).value = True - else: - mcp.get_pin(relay - 1).value = False - #exec_logger.debug(f'Switching relay {relay} ' - # f'({str(hex(self.addresses[role]))}) on:{on} for electrode {elec}') - else: - raise ValueError('No I2C address found for the electrode' - ' {:d} on board {:s}'.format(elec, self.addresses[role])) - #exec_logger.warning(f'Unable to address electrode nr {elec}') - - def switch(self, elecdic={}, state='on'): - """Switch a given list of electrodes with different roles. - Electrodes with a value of 0 will be ignored. - - Parameters - ---------- - elecdic : dictionary, optional - Dictionnary of the form: role: [list of electrodes]. - state : str, optional - Either 'on' or 'off'. - """ - # check to prevent A == B (SHORT-CIRCUIT) - if 'A' in elecdic and 'B' in elecdic: - out = np.in1d(elecdic['A'], elecdic['B']) - if out.any(): - raise ValueError('Some electrodes have A == B -> SHORT-CIRCUIT') - return - - # check none of M and N are the same A or B - # as to prevent burning the MN part which cannot take - # the full voltage of the DPS - if 'A' in elecdic and 'B' in elecdic and 'M' in elecdic and 'N' in elecdic: - if (np.in1d(elecdic['M'], elecdic['A']).any() - or np.in1d(elecdic['M'], elecdic['B']).any() - or np.in1d(elecdic['N'], elecdic['A']).any() - or np.in1d(elecdic['N'], elecdic['B']).any()): - raise ValueError('Some electrodes M and N are on A and B -> cannot be with DPS') - return - - # if all ok, then switch the electrodes - for role in elecdic: - for elec in elecdic[role]: - if elec > 0: - self.switch_one(elec, role, state) - - def reset(self): - for role in self.addresses: - for elec in range(self.nelec): - self.switch_one(elec, role, 'off') - - def test(self, role, activation_time=1): - """Interactive method to test the multiplexer. - - Parameters - ---------- - activation_time : float, optional - Time in seconds during which the relays are activated. - address : hex, optional - Address of the multiplexer board to test (e.g. 0x70, 0x71, ...). - """ - self.reset() - - # ask use some details on how to proceed - a = input('If you want try 1 channel choose 1, if you want try all channels choose 2!') - if a == '1': - print('run channel by channel test') - electrode = int(input('Choose your electrode number (integer):')) - electrodes = [electrode] - elif a == '2': - electrodes = range(1, 65) - else: - print('Wrong choice !') - return - - # run the test - for elec in electrodes: - self.switch_one(elec, role, 'on') - print('electrode:', elec, ' activated...', end='', flush=True) - time.sleep(activation_time) - self.switch_one(elec, role, 'off') - print(' deactivated') - time.sleep(activation_time) - print('Test finished.') - - - - - diff --git a/ohmpi.py b/ohmpi.py index cf8132f852fad68c7b9c21ad2e97ec426b2940ca..66f03a8f8222d2088668de0b3afc4ca3fc76668d 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -464,7 +464,7 @@ class OhmPi(object): "Ps [mV]": self._hw.sp, "nbStack": nb_stack, "Tx [V]": tx_volt, - "CPU temp [degC]": self._hw.controller.cpu_temperature, + "CPU temp [degC]": self._hw.ctl.cpu_temperature, "Nb samples [-]": len(self._hw.readings), # TODO: use only samples after a delay in each pulse "fulldata": self._hw.readings[:, [0, -2, -1]], # "I_stack [mA]": i_stack_mean, diff --git a/sw-test.py b/sw-test.py deleted file mode 100644 index 1e63137fcd0524b0fb5a335708ce0d2016bb563e..0000000000000000000000000000000000000000 --- a/sw-test.py +++ /dev/null @@ -1,7 +0,0 @@ -from sw import OhmPi -k = OhmPi(mqtt=False, onpi=True) -k.read_values() -k._compute_tx_volt() -k.load_sequence('ABMN.txt') -k.run_sequence() -k.run_multiple_sequences(nb_meas=2, sequence_delay=10) diff --git a/sw.py b/sw.py deleted file mode 100644 index a44c21902bce4b97df6c4710a8b294901bc5436b..0000000000000000000000000000000000000000 --- a/sw.py +++ /dev/null @@ -1,1116 +0,0 @@ -from hwTest import Alimentation, Current, Voltage, Multiplexer - -# -*- coding: utf-8 -*- -""" -created on January 6, 2020. -Updates dec 2022. -Hardware: Licensed under CERN-OHL-S v2 or any later version -Software: Licensed under the GNU General Public License v3.0 -Ohmpi.py is a program to control a low-cost and open hardware resistivity meter OhmPi that has been developed by -Rémi CLEMENT (INRAE), Vivien DUBOIS (INRAE), Hélène GUYARD (IGE), Nicolas FORQUET (INRAE), Yannick FARGIER (IFSTTAR) -Olivier KAUFMANN (UMONS), Arnaud WATLET (UMONS) and Guillaume BLANCHY (FNRS/ULiege). -""" - -import os -from utils import get_platform -import json -import warnings -from copy import deepcopy -import numpy as np -import csv -import time -import shutil -from datetime import datetime -from termcolor import colored -import threading -from logging_setup import setup_loggers -from config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG -from logging import DEBUG - -# finish import (done only when class is instantiated as some libs are only available on arm64 platform) -try: - from gpiozero import CPUTemperature # noqa - - arm64_imports = True -except ImportError as error: - if EXEC_LOGGING_CONFIG['logging_level'] == DEBUG: - print(colored(f'Import error: {error}', 'yellow')) - arm64_imports = False -except Exception as error: - print(colored(f'Unexpected error: {error}', 'red')) - arm64_imports = None - -class OhmPi(object): - """ OhmPi class. - """ - - def __init__(self, settings=None, sequence=None, use_mux=False, mqtt=True, onpi=None, idps=False): - """Constructs the ohmpi object - - Parameters - ---------- - settings: - - sequence: - - use_mux: - if True use the multiplexor to select active electrodes - mqtt: bool, defaut: True - if True publish on mqtt topics while logging, otherwise use other loggers only - onpi: bool,None default: None - if None, the platform on which the class is instantiated is determined to set on_pi to either True or False. - if False the behaviour of an ohmpi will be partially emulated and return random data. - idps: - if true uses the DPS - """ - - if onpi is None: - _, onpi = get_platform() - - self._sequence = sequence - self.nb_samples = 0 - self.use_mux = use_mux - self.on_pi = onpi # True if run from the RaspberryPi with the hardware, otherwise False for random data - self.status = 'idle' # either running or idle - self.thread = None # contains the handle for the thread taking the measurement - - # set loggers - config_exec_logger, _, config_data_logger, _, _, msg = setup_loggers(mqtt=mqtt) # TODO: add SOH - self.data_logger = config_data_logger - self.exec_logger = config_exec_logger - self.soh_logger = None # TODO: Implement the SOH logger - print(msg) - - - # read in hardware parameters (config.py) - self._read_hardware_config() # TODO should go to hw.py - - # default acquisition settings - self.settings = { - 'injection_duration': 0.2, - 'nb_meas': 1, - 'sequence_delay': 1, - 'nb_stack': 1, - 'export_path': 'data/measurement.csv' - } - # read in acquisition settings - if settings is not None: - self.update_settings(settings) - - self.exec_logger.debug('Initialized with settings:' + str(self.settings)) - - # read quadrupole sequence - if sequence is not None: - self.load_sequence(sequence) - - self.idps = idps # flag to use dps for injection or not - - # connect to components on the OhmPi board - if self.on_pi: - # initialize hardware - self.alim = Alimentation() - self.voltage = Voltage() - self.current = Current() - self.mux = Multiplexer() - - # set controller - self.mqtt = mqtt - self.cmd_id = None - if self.mqtt: - import paho.mqtt.client as mqtt_client - - self.exec_logger.debug(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}" - f" on {MQTT_CONTROL_CONFIG['hostname']} broker") - - def connect_mqtt() -> mqtt_client: - def on_connect(mqttclient, userdata, flags, rc): - if rc == 0: - self.exec_logger.debug(f"Successfully connected to control broker:" - f" {MQTT_CONTROL_CONFIG['hostname']}") - else: - self.exec_logger.warning(f'Failed to connect to control broker. Return code : {rc}') - - client = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) - client.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), - MQTT_CONTROL_CONFIG['auth']['password']) - client.on_connect = on_connect - client.connect(MQTT_CONTROL_CONFIG['hostname'], MQTT_CONTROL_CONFIG['port']) - return client - - try: - self.exec_logger.debug(f"Connecting to control broker: {MQTT_CONTROL_CONFIG['hostname']}") - self.controller = connect_mqtt() - except Exception as e: - self.exec_logger.debug(f'Unable to connect control broker: {e}') - self.controller = None - if self.controller is not None: - self.exec_logger.debug(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") - try: - self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos']) - - msg = f"Subscribed to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}" \ - f" on {MQTT_CONTROL_CONFIG['hostname']} broker" - self.exec_logger.debug(msg) - print(colored(f'\u2611 {msg}', 'blue')) - except Exception as e: - self.exec_logger.warning(f'Unable to subscribe to control topic : {e}') - self.controller = None - publisher_config = MQTT_CONTROL_CONFIG.copy() - publisher_config['topic'] = MQTT_CONTROL_CONFIG['ctrl_topic'] - publisher_config.pop('ctrl_topic') - - def on_message(client, userdata, message): - command = message.payload.decode('utf-8') - self.exec_logger.debug(f'Received command {command}') - self._process_commands(command) - - self.controller.on_message = on_message - else: - self.controller = None - self.exec_logger.warning('No connection to control broker.' - ' Use python/ipython to interact with OhmPi object...') - - @staticmethod - def append_and_save(filename: str, last_measurement: dict, cmd_id=None): - """Appends and saves the last measurement dict. - - Parameters - ---------- - filename : str - filename to save the last measurement dataframe - last_measurement : dict - Last measurement taken in the form of a python dictionary - cmd_id : str, optional - Unique command identifier - """ - last_measurement = deepcopy(last_measurement) - if 'fulldata' in last_measurement: - d = last_measurement['fulldata'] - n = d.shape[0] - if n > 1: - idic = dict(zip(['i' + str(i) for i in range(n)], d[:, 0])) - udic = dict(zip(['u' + str(i) for i in range(n)], d[:, 1])) - tdic = dict(zip(['t' + str(i) for i in range(n)], d[:, 2])) - last_measurement.update(idic) - last_measurement.update(udic) - last_measurement.update(tdic) - last_measurement.pop('fulldata') - - if os.path.isfile(filename): - # Load data file and append data to it - with open(filename, 'a') as f: - w = csv.DictWriter(f, last_measurement.keys()) - w.writerow(last_measurement) - # last_measurement.to_csv(f, header=False) - else: - # create data file and add headers - with open(filename, 'a') as f: - w = csv.DictWriter(f, last_measurement.keys()) - w.writeheader() - w.writerow(last_measurement) - - def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5): - """Estimates best Tx voltage based on different strategies. - At first a half-cycle is made for a short duration with a fixed - known voltage. This gives us Iab and Rab. We also measure Vmn. - A constant c = vmn/iab is computed (only depends on geometric - factor and ground resistivity, that doesn't change during a - quadrupole). Then depending on the strategy, we compute which - vab to inject to reach the minimum/maximum Iab current or - min/max Vmn. - - Parameters - ---------- - best_tx_injtime : float, optional - Time in milliseconds for the half-cycle used to compute Rab. - strategy : str, optional - Either: - - vmin : compute Vab to reach a minimum Iab and Vmn - - vmax : compute Vab to reach a maximum Iab and Vmn - - constant : apply given Vab - tx_volt : float, optional - Voltage apply to try to guess the best voltage. 5 V applied - by default. If strategy "constant" is chosen, constant voltage - to applied is "tx_volt". - - Returns - ------- - vab : float - Proposed Vab according to the given strategy. - """ - - # hardware limits - voltage_min = self.voltage.vmin # V - voltage_max = self.voltage.vmax - current_min = self.current.imin # A - current_max = self.current.imax - tx_max = 40. # volt - - # check of V - volt = tx_volt - if volt > tx_max: - self.exec_logger.warning('Sorry, cannot inject more than 40 V, set it back to 5 V') - volt = 5. - - # make sure we are not injecting - self.alim.stop_injection() - - # select a polarity to start with - self.alim.set_polarity(True) - - # set voltage for test - self.alim.turn_on() - self.alim.set_tx_voltage(volt) - time.sleep(best_tx_injtime) # inject for given tx time - self.alim.start_injection() - - # autogain: set best gain - self.current.set_best_gain() - self.voltage.set_best_gain() - - # we measure the voltage on both A0 and A2 to guess the polarity - values = self.read_values(duration=0.1) - self.alim.stop_injection() - iab = values[-1, 1] - vmn = values[-1, 2] - - # compute constant - c = vmn / iab - Rab = (volt * 1000.) / iab # noqa - self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') - - # implement different strategies - if strategy == 'vmax': - vmn_max = c * current_max - if voltage_max > vmn_max > voltage_min: - vab = current_max * Rab - self.exec_logger.debug('target max current') - else: - iab = voltage_max / c - vab = iab * Rab - self.exec_logger.debug('target max voltage') - if vab > 25.: - vab = 25. - vab = vab * 0.9 - - elif strategy == 'vmin': - vmn_min = c * current_min - if voltage_min < vmn_min < voltage_max: - vab = current_min * Rab - self.exec_logger.debug('target min current') - else: - iab = voltage_min / c - vab = iab * Rab - self.exec_logger.debug('target min voltage') - if vab < 1.: - vab = 1. - vab = vab * 1.1 - - elif strategy == 'constant': - vab = volt - else: - vab = 5 - - return vab - - def get_data(self, survey_names=None, cmd_id=None): - """Get available data. - - Parameters - ---------- - survey_names : list of str, optional - List of filenames already available from the html interface. So - their content won't be returned again. Only files not in the list - will be read. - cmd_id : str, optional - Unique command identifier - """ - # get all .csv file in data folder - if survey_names is None: - survey_names = [] - fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] - ddic = {} - if cmd_id is None: - cmd_id = 'unknown' - for fname in fnames: - if ((fname != 'readme.txt') - and ('_rs' not in fname) - and (fname.replace('.csv', '') not in survey_names)): - try: - data = np.loadtxt('data/' + fname, delimiter=',', - skiprows=1, usecols=(1, 2, 3, 4, 8)) - data = data[None, :] if len(data.shape) == 1 else data - ddic[fname.replace('.csv', '')] = { - 'a': data[:, 0].astype(int).tolist(), - 'b': data[:, 1].astype(int).tolist(), - 'm': data[:, 2].astype(int).tolist(), - 'n': data[:, 3].astype(int).tolist(), - 'rho': data[:, 4].tolist(), - } - except Exception as e: - print(fname, ':', e) - rdic = {'cmd_id': cmd_id, 'data': ddic} - self.data_logger.info(json.dumps(rdic)) - return ddic - - def interrupt(self, cmd_id=None): - """Interrupts the acquisition when launched in async mode. - - Parameters - ---------- - cmd_id : str, optional - Unique command identifier - """ - self.status = 'stopping' - if self.thread is not None: - self.thread.join() - self.exec_logger.debug('Interrupted sequence acquisition...') - else: - self.exec_logger.debug('No sequence measurement thread to interrupt.') - self.exec_logger.debug(f'Status: {self.status}') - - def load_sequence(self, filename: str, cmd_id=None): - """Reads quadrupole sequence from file. - - Parameters - ---------- - filename : str - Path of the .csv or .txt file with A, B, M and N electrodes. - Electrode index start at 1. - cmd_id : str, optional - Unique command identifier - - Returns - ------- - sequence : numpy.array - Array of shape (number quadrupoles * 4). - """ - self.exec_logger.debug(f'Loading sequence {filename}') - try: - sequence = np.loadtxt(filename, delimiter=" ", dtype=np.uint32) # load quadrupole file - self.exec_logger.debug(f'Sequence of {sequence.shape[0]:d} quadrupoles read.') - self.set_sequence(sequence) - except Exception as e: - self.exec_logger.debug('ERROR in load_sequence(): ' + str(e)) - - if sequence is not None: - self.exec_logger.info(f'Sequence {filename} of {sequence.shape[0]:d} quadrupoles loaded.') - else: - self.exec_logger.warning(f'Unable to load sequence {filename}') - - def set_sequence(self, sequence): - """Set a sequence of quadrupoles. - - Parameters - ---------- - sequence : list of list or np.array - 2D array with 1 row per quadrupole. - """ - # locate lines where the electrode index exceeds the maximum number of electrodes - test_index_elec = np.array(np.where(sequence > self.max_elec)) - - # reshape in case we have a 1D array (=1 quadrupole) - if len(sequence.shape) == 1: - sequence = sequence[None, :] - - # test for elec A == B - test_same_elec = np.where(sequence[:, 0] == sequence[:, 1])[0] - ok = True - - # if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2) - if test_index_elec.size != 0: - for i in range(len(test_index_elec[0, :])): - ok = False - self.exec_logger.error(f'An electrode index at line {str(test_index_elec[0, i] + 1)} ' - f'exceeds the maximum number of electrodes') - # sys.exit(1) - sequence = None - if len(test_same_elec) != 0: - for i in range(len(test_same_elec)): - ok = False - self.exec_logger.error(f'An electrode index A == B detected at line {str(test_same_elec[i] + 1)}') - # sys.exit(1) - sequence = None - - # set sequence attribute - if ok: - self.sequence = sequence - else: - self.exec_logger.error('Unable to set sequence. Fix sequence first.') - - - def _process_commands(self, message: str): - """Processes commands received from the controller(s) - - Parameters - ---------- - message : str - message containing a command and arguments or keywords and arguments - """ - status = False - cmd_id = '?' - try: - decoded_message = json.loads(message) - self.exec_logger.debug(f'Decoded message {decoded_message}') - cmd_id = decoded_message.pop('cmd_id', None) - cmd = decoded_message.pop('cmd', None) - kwargs = decoded_message.pop('kwargs', None) - self.exec_logger.debug(f"Calling method {cmd}({str(kwargs) if kwargs is not None else ''})") - if cmd_id is None: - self.exec_logger.warning('You should use a unique identifier for cmd_id') - if cmd is not None: - try: - if kwargs is None: - output = getattr(self, cmd)() - else: - output = getattr(self, cmd)(**kwargs) - status = True - except Exception as e: - self.exec_logger.error( - f"Unable to execute {cmd}({str(kwargs) if kwargs is not None else ''}): {e}") - status = False - except Exception as e: - self.exec_logger.warning(f'Unable to decode command {message}: {e}') - status = False - finally: - reply = {'cmd_id': cmd_id, 'status': status} - reply = json.dumps(reply) - self.exec_logger.debug(f'Execution report: {reply}') - - def quit(self, cmd_id=None): - """Quits OhmPi - - Parameters - ---------- - cmd_id : str, optional - Unique command identifier - """ - self.exec_logger.debug(f'Quitting ohmpi.py following command {cmd_id}') - exit() - - def _read_hardware_config(self): - """Reads hardware configuration from config.py - """ - self.exec_logger.debug('Getting hardware config') - self.id = OHMPI_CONFIG['id'] # ID of the OhmPi - self.Imax = OHMPI_CONFIG['Imax'] # maximum current - self.exec_logger.debug(f'The maximum current cannot be higher than {self.Imax} mA') - self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ads.P2, measurement in V/V - self.nb_samples = OHMPI_CONFIG['nb_samples'] # number of samples measured for each stack - self.version = OHMPI_CONFIG['version'] # hardware version - self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes - self.board_version = OHMPI_CONFIG['board_version'] - self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') - - def remove_data(self, cmd_id=None): - """Remove all data in the data folder - - Parameters - ---------- - cmd_id : str, optional - Unique command identifier - """ - self.exec_logger.debug(f'Removing all data following command {cmd_id}') - shutil.rmtree('data') - os.mkdir('data') - - def restart(self, cmd_id=None): - """Restarts the Raspberry Pi - - Parameters - ---------- - cmd_id : str, optional - Unique command identifier - """ - - if self.on_pi: - self.exec_logger.info(f'Restarting pi following command {cmd_id}...') - os.system('reboot') - else: - self.exec_logger.warning('Not on Raspberry Pi, skipping reboot...') - - def set_best_gain(self): - """Set best gain.""" - self.current.set_best_gain() - gain0 = self.voltage.get_best_gain(channel=0) - gain2 = self.voltage.get_best_gain(channel=2) - self.voltage.set_gain(np.min([gain0, gain2])) - - def read_values(self, duration=0.2, sampling=0.01): - """Read voltage during a given time for current and voltage ADS. - - Parameters - ---------- - duration : int, optional - Time in seconds to monitor the voltage. - sampling : int, optional - Time between two samples in seconds. - - Returns - ------- - meas : numpy.array - Array with first column time in ms from start, - second column, current in mA, then voltage in mV - from the different channels. - """ - # compute maximum number of samples possible - # we probably harvest less samples but like this - # we can already allocated the array and that makes - # the collection faster - nsamples = int((int(duration * 1000) // sampling) + 1) - - # measurement of current i and voltage u during injection - nchannel = len(self.voltage.read_all()) - meas = np.zeros((nsamples, 2 + nchannel)) * np.nan - start_time = time.time() # stating measurement time - elapsed = 0 - for i in range(0, nsamples): - # reading current value on ADS channels - elapsed = time.time() - start_time # real injection time (s) - if elapsed >= (duration): - break - meas[i, 0] = elapsed - meas[i, 1] = self.current.read() - meas[i, 2:] = self.voltage.read_all() - time.sleep(sampling) - - return meas[:i-1, :] - - - def run_measurement(self, quad=[0, 0, 0, 0], nb_stack=None, injection_duration=None, - autogain=True, strategy='constant', tx_volt=5, best_tx_injtime=0.1, - duty=1, cmd_id=None): - """Measures on a quadrupole and returns transfer resistance. - - Parameters - ---------- - quad : iterable (list of int) - Quadrupole to measure, just for labelling. Only switch_mux_on/off - really creates the route to the electrodes. - nb_stack : int, optional - Number of stacks. A stacl is considered two half-cycles (one - positive, one negative). - injection_duration : int, optional - Injection time in seconds. - autogain : bool, optional - If True, will adapt the gain of the ADS1115 to maximize the - resolution of the reading. - strategy : str, optional - (V3.0 only) If we search for best voltage (tx_volt == 0), we can choose - different strategy: - - vmin: find the lowest voltage that gives us a signal - - vmax: find the highest voltage that stays in the range - For a constant value, just set the tx_volt. - tx_volt : float, optional - (V3.0 only) If specified, voltage will be imposed. If 0, we will look - for the best voltage. If the best Tx cannot be found, no - measurement will be taken and values will be NaN. - best_tx_injtime : float, optional - (V3.0 only) Injection time in seconds used for finding the best voltage. - duty : float, optional - Proportion of time spent on injection vs no injection time. - cmd_id : str, optional - Command ID. - """ - self.exec_logger.debug('Starting measurement') - - if nb_stack is None: - nb_stack = self.settings['nb_stack'] - if injection_duration is None: - injection_duration = self.settings['injection_duration'] - tx_volt = float(tx_volt) - - # let's define the pin again as if we run through measure() - # as it's run in another thread, it doesn't consider these - # and this can lead to short circuit! - self.alim = Alimentation() # TODO carefully test that - self.alim.stop_injection() - - # get best voltage to inject AND polarity - if self.idps: - tx_volt, polarity = self._compute_tx_volt( - best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) - self.exec_logger.debug(f'Best vab found is {tx_volt:.3f}V') - - # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) - self.current.set_gain(2/3) - self.voltage.set_gain(2/3) - - # turn on the power supply - if self.alim.on == False: - self.alim.turn_on() - self.alim.set_tx_voltage(tx_volt) - time.sleep(0.05) # let it time to reach tx_volt - - if tx_volt > 0: # we found a Vab in the range so we measure - # find best gain during injection - if autogain: - self.alim.start_injection() - time.sleep(injection_duration) - self.set_best_gain() - self.alim.stop_injection() - - # make sure we are not injecting - self.alim.stop_injection() - - # full data for waveform - fulldata = [] - - # we sample every 10 ms (as using AnalogIn for both current - # and voltage takes about 7 ms). When we go over the injection - # duration, we break the loop and truncate the meas arrays - # only the last values in meas will be taken into account - start_delay = time.time() - injtimes = np.zeros(nb_stack * 2) - for n in range(0, nb_stack * 2): # for each half-cycles - # current injection polarity - if (n % 2) == 0: - self.alim.set_polarity(True) - else: - self.alim.set_polarity(False) - self.alim.start_injection() - - # reading voltages and currents - elapsed = time.time() - start_delay - values = self.read_values(duration=injection_duration) - injtimes[n] = values[-1, 0] - values[:, 0] += elapsed - fulldata.append(values) - - # stop current injection - self.alim.stop_injection() - - # waiting time (no injection) before next half-cycle - if duty < 1: - duration = injection_duration * (1 - duty) - elapsed = time.time() - start_delay - values = self.read_values(duration=duration) - values[:, 0] += elapsed - fulldata.append(values) - else: - fulldata.append(np.array([[], [], [], []]).T) - - # TODO get battery voltage and warn if battery is running low - # TODO send a message on SOH stating the battery level - - # let's do some calculation (out of the stacking loop) - stacks = np.zeros((len(fulldata) // 2, fulldata[0].shape[1]-1)) - - # define number of sample to average for the injection half-cycle - n2avg = int(fulldata[0].shape[0] // 3) - - # compute average for the injection half-cycle - for n, meas in enumerate(fulldata[::2]): - stacks[n, :] = np.mean(meas[-n2avg:, 1:], axis=0) - - # identify which of U0 or U2 is on top - a = 1 - b = 0 - if stacks[0, 1] > stacks[0, 2]: - a = 0 - b = 1 - - # compute average vmn and i - iab = np.mean(stacks[:, 1]) - vmn = np.mean(stacks[a::2, 1] + stacks[b::2, 2]) - - # self-potential estimated during on-time - spon = np.mean(stacks[a::2, 1] - stacks[b::2, 2]) - - # remove the average sp computed on injection half-cycle - vmn = vmn - spon - - # compute average self potential between injection half-cycle - if duty < 1: - spoff = 0 - n2avg = int(fulldata[0].shape[0] // 3) - for n, meas in enumerate(fulldata[1::2]): - spoff += np.mean(meas[-n2avg:, 2]) - spoff = spoff / len(fulldata) * 2 - else: - iab = np.nan - vmn = np.nan - spon = np.nan - fulldata = None - - # set a low voltage for safety - self.alim.set_tx_voltage(12) - - # reshape full data to an array of good size - # we need an array of regular size to save in the csv - if tx_volt > 0: # TODO what if have different array size? - for a in fulldata: - print(a.shape) - fulldata = np.vstack(fulldata) - # we create a big enough array given nb_samples, number of - # half-cycles (1 stack = 2 half-cycles), and twice as we - # measure decay as well - nsamples = int((int(injection_duration * 1000) / duty) // 0.01 + 1) - a = np.zeros((nb_stack * nsamples * 2, fulldata.shape[1])) * np.nan - a[:fulldata.shape[0], :] = fulldata - fulldata = a - - # create a dictionary and compute averaged values from all stacks - d = { - "time": datetime.now().isoformat(), - "A": quad[0], - "B": quad[1], - "M": quad[2], - "N": quad[3], - "injtime [ms]": np.mean(injtimes), - "Vmn [mV]": vmn, - "I [mA]": iab, - "R [ohm]": vmn/iab, - "Ps [mV]": spon, - "nbStack": nb_stack, - "tmp [degC]": CPUTemperature().temperature if arm64_imports else -10, - "Nb samples [-]": n2avg, - "stacks": stacks, - "fulldata": fulldata, - } - - # to the data logger - dd = d.copy() - dd.pop('fulldata') # too much for logger - dd.update({'A': str(dd['A'])}) - dd.update({'B': str(dd['B'])}) - dd.update({'M': str(dd['M'])}) - dd.update({'N': str(dd['N'])}) - - # round float to 2 decimal - for key in dd.keys(): - if isinstance(dd[key], float): - dd[key] = np.round(dd[key], 3) - - dd['cmd_id'] = str(cmd_id) - self.data_logger.info(dd) - - return d - - def run_sequence(self, cmd_id=None, **kwargs): - """Runs sequence synchronously (=blocking on main thread). - Additional arguments are passed to run_measurement(). - - Parameters - ---------- - cmd_id : str, optional - Unique command identifier. - kwargs : optional - Optional keyword arguments passed to run_measurement(). - See help(OhmPi.run_measurement). - """ - self.status = 'running' - self.exec_logger.debug(f'Status: {self.status}') - self.exec_logger.debug(f'Measuring sequence: {self.sequence}') - - # create filename with timestamp - filename = self.settings["export_path"].replace('.csv', - f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') - self.exec_logger.debug(f'Saving to {filename}') - - # make sure all multiplexer are off - self.mux.reset() - - # measure all quadrupole of the sequence - if self.sequence is None: - seq = np.array([[0, 0, 0, 0]]) - else: - seq = self.sequence.copy() - for i in range(0, seq.shape[0]): - quad = seq[i, :] - if self.status == 'stopping': - break - - # call the switch_mux function to switch to the right electrodes - self.switch_mux_on(quad) - - # run a measurement - acquired_data = self.run_measurement(quad, **kwargs) - self.data_logger.info(acquired_data) - - # switch mux off - self.switch_mux_off(quad) - - # add command_id in dataset - acquired_data.update({'cmd_id': cmd_id}) - - # log data to the data logger - # self.data_logger.info(f'{acquired_data}') - # save data and print in a text file - self.append_and_save(filename, acquired_data) - self.exec_logger.debug(f'quadrupole {i + 1:d}/{seq.shape[0]:d}') - - self.status = 'idle' - - def run_sequence_async(self, cmd_id=None, **kwargs): - """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. - Additional arguments are passed to run_measurement(). - - Parameters - ---------- - cmd_id : str, optional - Unique command identifier - """ - def func(): - self.run_sequence(**kwargs) - - self.thread = threading.Thread(target=func) - self.thread.start() - self.status = 'idle' - - def run_multiple_sequences(self, sequence_delay=None, nb_meas=None, cmd_id=None, **kwargs): - """Runs multiple sequences in a separate thread for monitoring mode. - Can be stopped by 'OhmPi.interrupt()'. - Additional arguments are passed to run_measurement(). - - Parameters - ---------- - cmd_id : str, optional - Unique command identifier - sequence_delay : int, optional - Number of seconds at which the sequence must be started from each others. - nb_meas : int, optional - Number of time the sequence must be repeated. - kwargs : dict, optional - See help(k.run_measurement) for more info. - """ - # self.run = True - if sequence_delay is None: - sequence_delay = self.settings['sequence_delay'] - sequence_delay = int(sequence_delay) - if nb_meas is None: - nb_meas = self.settings['nb_meas'] - self.status = 'running' - self.exec_logger.debug(f'Status: {self.status}') - self.exec_logger.debug(f'Measuring sequence: {self.sequence}') - - def func(): - for g in range(0, nb_meas): # for time-lapse monitoring - if self.status == 'stopping': - self.exec_logger.warning('Data acquisition interrupted') - break - t0 = time.time() - self.run_sequence(**kwargs) - - # sleeping time between sequence - dt = sequence_delay - (time.time() - t0) - if dt < 0: - dt = 0 - if nb_meas > 1: - time.sleep(dt) # waiting for next measurement (time-lapse) - self.status = 'idle' - - self.thread = threading.Thread(target=func) - self.thread.start() - - def _quad2qdic(self, quad): - """Convert a quadrupole to a more flexible qdic - of format {'A': [1], 'B': [2], 'M': [3], 'N': [4]}. - This format enable to inject at several electrodes and - is more flexible for multichannelling (we can add M1, N1, ...). - - Parameters - ---------- - quad : list of int, - List of quadrupoles. Electrodes equal to 0 are ignored. - - Returns - ------- - Dictionnary in the form: {role: [list of electrodes]}. - """ - return dict(zip(['A', 'B', 'M', 'N'], [[a] for a in quad if a > 0])) - - def switch_mux_on(self, quad): - """"Switch quadrupoles on. - - Parameters - ---------- - quad : list of int, - List of quadrupoles. Electrodes equal to 0 are ignored. - """ - qdic = self._quad2qdic(quad) - self.mux.switch(qdic, 'on') - - def switch_mux_off(self, quad): - """Switch quadrupoles off. - - Parameters - ---------- - quad : list of int, - List of quadrupoles. Electrodes equal to 0 are ignored. - """ - qdic = self._quad2qdic(quad) - self.mux.switch(qdic, 'off') - - def reset_mux(self): - """Reset the mux, make sure all relays are off. - """ - self.mux.reset() - - def rs_check(self, tx_volt=12., cmd_id=None): - """Checks contact resistances. - - Parameters - ---------- - tx_volt : float, optional - Voltage of the injection. - cmd_id : str, optional - Unique command identifier. - """ - # create custom sequence where MN == AB - # we only check the electrodes which are in the sequence (not all might be connected) - if self.sequence is None: - quads = np.array([[1, 2, 0, 0]], dtype=np.uint32) - else: - elec = np.sort(np.unique(self.sequence.flatten())) # assumed order - quads = np.vstack([ - elec[:-1], - elec[1:], - np.zeros(len(elec)-1), - np.zeros(len(elec)-1) - ]).T - - # create filename to store RS - export_path_rs = self.settings['export_path'].replace('.csv', '') \ - + '_' + datetime.now().strftime('%Y%m%dT%H%M%S') + '_rs.csv' - - # perform RS check - self.status = 'running' - - # make sure all mux are off to start with - self.mux.reset() - - # turn on alim - self.alim.turn_on() - self.alim.set_tx_voltage(tx_volt) - - # measure all quad of the RS sequence - for i in range(0, quads.shape[0]): - quad = quads[i, :] # quadrupole - self.switch_mux_on(quad) # put before raising the pins (otherwise conflict i2c) - d = self.run_measurement( - quad=quad, nb_stack=1, injection_duration=0.2, - tx_volt=tx_volt, autogain=False) - self.switch_mux_off(quad) - - voltage = d['Tx [V]'] - current = d['I [mA]'] / 1000. - - # compute resistance measured (= contact resistance) - resist = abs(voltage / current) / 1000. - # print(str(quad) + '> I: {:>10.3f} mA, V: {:>10.3f} mV, R: {:>10.3f} kOhm'.format( - # current, voltage, resist)) - msg = f'Contact resistance {str(quad):s}: I: {current * 1000.:>10.3f} mA, ' \ - f'V: {voltage :>10.3f} mV, ' \ - f'R: {resist :>10.3f} kOhm' - - self.exec_logger.debug(msg) - - # if contact resistance = 0 -> we have a short circuit!! - if resist < 1e-5: - msg = f'!!!SHORT CIRCUIT!!! {str(quad):s}: {resist:.3f} kOhm' - self.exec_logger.warning(msg) - - # save data in a text file - self.append_and_save(export_path_rs, { - 'A': quad[0], - 'B': quad[1], - 'RS [kOhm]': resist, - }) - - self.alim.turn_off() - self.status = 'idle' - - def update_settings(self, settings: str, cmd_id=None): - """Updates acquisition settings from a json file or dictionary. - Parameters can be: - - nb_electrodes (number of electrode used, if 4, no MUX needed) - - injection_duration (in seconds) - - nb_meas (total number of times the sequence will be run) - - sequence_delay (delay in second between each sequence run) - - nb_stack (number of stack for each quadrupole measurement) - - export_path (path where to export the data, timestamp will be added to filename) - - Parameters - ---------- - settings : str, dict - Path to the .json settings file or dictionary of settings. - cmd_id : str, optional - Unique command identifier - """ - status = False - if settings is not None: - try: - if isinstance(settings, dict): - self.settings.update(settings) - else: - with open(settings) as json_file: - dic = json.load(json_file) - self.settings.update(dic) - self.exec_logger.debug('Acquisition parameters updated: ' + str(self.settings)) - status = True - except Exception as e: # noqa - self.exec_logger.warning('Unable to update settings.') - status = False - else: - self.exec_logger.warning('Settings are missing...') - return status - - def stop(self, **kwargs): - warnings.warn('This function is deprecated. Use interrupt instead.', DeprecationWarning) - self.interrupt(**kwargs) - - def _update_acquisition_settings(self, config): - warnings.warn('This function is deprecated, use update_settings() instead.', DeprecationWarning) - self.update_settings(settings=config) - - # Properties - @property - def sequence(self): - """Gets sequence""" - if self._sequence is not None: - assert isinstance(self._sequence, np.ndarray) - return self._sequence - - # TODO not sure if the below is still needed now we have a - # method set_sequence() - @sequence.setter - def sequence(self, sequence): - """Sets sequence""" - if sequence is not None: - assert isinstance(sequence, np.ndarray) - self.use_mux = True - else: - self.use_mux = False - self._sequence = sequence - - -VERSION = '3.0.0' - -print(colored(r' ________________________________' + '\n' + - r'| _ | | | || \/ || ___ \_ _|' + '\n' + - r'| | | | |_| || . . || |_/ / | |' + '\n' + - r'| | | | _ || |\/| || __/ | |' + '\n' + - r'\ \_/ / | | || | | || | _| |_' + '\n' + - r' \___/\_| |_/\_| |_/\_| \___/ ', 'red')) -print('Version:', VERSION) -platform, on_pi = get_platform() - -if on_pi: - print(colored(f'\u2611 Running on {platform} platform', 'green')) - # TODO: check model for compatible platforms (exclude Raspberry Pi versions that are not supported...) - # and emit a warning otherwise - if not arm64_imports: - print(colored(f'Warning: Required packages are missing.\n' - f'Please run ./env.sh at command prompt to update your virtual environment\n', 'yellow')) -else: - print(colored(f'\u26A0 Not running on the Raspberry Pi platform.\nFor simulation purposes only...', 'yellow')) - -current_time = datetime.now() -print(f'local date and time : {current_time.strftime("%Y-%m-%d %H:%M:%S")}') - -# for testing -if __name__ == "__main__": - ohmpi = OhmPi(settings=OHMPI_CONFIG['settings']) - if ohmpi.controller is not None: - ohmpi.controller.loop_forever() diff --git a/test_measure_with_ohmpi_card_3_15.py b/test_measure_with_ohmpi_card_3_15.py index e9251e5b4f252cf215b6ccde2b00a85ae0d375d2..3c3b54b9b7f9b89fd78e74b78511794553c644f8 100644 --- a/test_measure_with_ohmpi_card_3_15.py +++ b/test_measure_with_ohmpi_card_3_15.py @@ -1,5 +1,3 @@ -import time - import numpy as np import logging import matplotlib.pyplot as plt diff --git a/test_modif.py b/test_modif.py deleted file mode 100644 index 78b2c719d213a03f5c6a8e185cb53b17592f8e42..0000000000000000000000000000000000000000 --- a/test_modif.py +++ /dev/null @@ -1,110 +0,0 @@ -from ohmpi import OhmPi -import matplotlib.pyplot as plt -import numpy as np - -# a = np.arange(13) + 1 -# b = a + 3 -# m = a + 1 -# n = a + 2 -# seq = np.c_[a, b, m, n] - -k = OhmPi(idps=True) -k.settings['injection_duration'] = 1 -k.settings['nbr_meas'] = 1 -#k.sequence = seq -#k.reset_mux() -#k.switch_mux_on([1, 4, 2, 3]) -#k.switch_mux_on([12, 15, 13, 14]) -#k.measure(strategy='vmax') -#print('vab', k.compute_tx_volt(strategy='vmin')) -#k.rs_check() - -R1=11.5 #sol -R2=200 # contact - - - -out = k.run_measurement(quad=[1, 2, 3, 4], nb_stack=2, tx_volt=2, strategy='vmax', autogain=True) -print(k.sequence) - -data = out['fulldata'] -inan = ~np.isnan(data[:,0]) -print(['R1:',R1,' ','R2:',R2,' ', out['R [ohm]'],out['Vmn [mV]'],out['I [mA]'],out['Ps [mV]'],out['nbStack'],out['Tx [V]']]) -f=open(r'data_goog.txt','a+') -f.write("\n") -f.write('R1:'+';'+ str(R1)+';'+'R2:'+';'+str(R2)+';'+ str(out['R [ohm]'])+';' + str(out['Vmn [mV]'])+';' + str(out['I [mA]'])+';'+ str(out['Ps [mV]'])+';'+ str(out['nbStack'])+';'+ str(out['Tx [V]'])) -f.close() - -k.append_and_save('out_test_qualite.csv', out) - -# out = k.run_measurement(quad=[1, 2, 3, 4], nb_stack=2, tx_volt=2, strategy='vmin', autogain=True) -# -# data = out['fulldata'] -# inan = ~np.isnan(data[:,0]) -# print(out['R [ohm]']) -# k.append_and_save('out_test_qualite.csv', out) -# -# out = k.run_measurement(quad=[1, 2, 3, 4], nb_stack=2, tx_volt=5, strategy='constant', autogain=True) -# -# data = out['fulldata'] -# inan = ~np.isnan(data[:,0]) -# print(out['R [ohm]']) -# k.append_and_save('out_test_qualite.csv', out) - -if True: - fig, axs = plt.subplots(2, 1, sharex=True) - ax = axs[0] - ax.plot(data[inan,2], data[inan,0], 'r.-', label='current [mA]') - ax.set_ylabel('Current AB [mA]') - ax = axs[1] - ax.plot(data[inan,2], data[inan,1], '.-', label='voltage [mV]') - ax.set_ylabel('Voltage MN [mV]') - ax.set_xlabel('Time [s]') - plt.show() - -# fig,ax=plt.subplots() -# -# -# ax.plot(data[inan,2], data[inan,0], label='current [mA]', marker="o") -# ax2=ax.twinx() -# ax2.plot(data[inan,2], data[inan,1],'r.-' , label='current [mV]') -# ax2.set_ylabel('Voltage [mV]', color='r') -# ymin=-50 -# ymax=50 -# ymin1=-4500 -# ymax1= 4500 -# ax.set_ylim([ymin,ymax]) -# ax2.set_ylim([ymin1,ymax1]) -# -# plt.show() - - - -if False: - from numpy.fft import fft, ifft - - x = data[inan, 1][10:300] - t = np.linspace(0, len(x)*4, len(x)) - sr = 1/0.004 - - X = fft(x) - N = len(X) - n = np.arange(N) - T = N/sr - freq = n/T - - plt.figure(figsize = (12, 6)) - plt.subplot(121) - - plt.stem(freq, np.abs(X), 'b', \ - markerfmt=" ", basefmt="-b") - plt.xlabel('Freq (Hz)') - plt.ylabel('FFT Amplitude |X(freq)|') - #plt.xlim(0, 10) - - plt.subplot(122) - plt.plot(t, ifft(X), 'r') - plt.xlabel('Time (s)') - plt.ylabel('Amplitude') - plt.tight_layout() - plt.show() diff --git a/test_mux_2024.py b/test_mux_2024.py index 4d14cdb07315390d9f8f6c1462dacce53e5fd778..3a05601397f76422d8d75d86b6dcbb108a886e60 100644 --- a/test_mux_2024.py +++ b/test_mux_2024.py @@ -3,14 +3,14 @@ from utils import change_config import logging change_config('config_mb_2023_mux_2024.py', verbose=False) from OhmPi.hardware_components.mux_2024_rev_0_0 import Mux, MUX_CONFIG -from OhmPi.hardware_components import raspberry_pi_i2c as controller_module +from OhmPi.hardware_components import raspberry_pi_i2c as ctl_module stand_alone_mux = False part_of_hardware_system = False within_ohmpi = True # Stand alone mux if stand_alone_mux: - MUX_CONFIG['controller'] = controller_module.Controller() + MUX_CONFIG['ctl'] = ctl_module.Ctl() MUX_CONFIG['id'] = 'mux_1' MUX_CONFIG['cabling'] = {(i+8, j) : ('mux_1', i) for j in ['A', 'B', 'M', 'N'] for i in range(1,9)} mux = Mux(**MUX_CONFIG)