Commit 9c649eeb authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

Update setup_config.py

Showing with 262 additions and 5 deletions
+262 -5
import logging
from ohmpi.utils import get_platform
from paho.mqtt.client import MQTTv31 # noqa
_, on_pi = get_platform()
# DEFINE THE ID OF YOUR OhmPi
ohmpi_id = '0001' if on_pi else 'XXXX'
# DEFINE YOUR MQTT BROKER (DEFAULT: 'localhost')
mqtt_broker = 'localhost' if on_pi else 'NAME_YOUR_BROKER_WHEN_IN_SIMULATION_MODE_HERE'
# DEFINE THE SUFFIX TO ADD TO YOUR LOGS FILES
logging_suffix = ''
# OhmPi configuration
OHMPI_CONFIG = {
'id': ohmpi_id, # Unique identifier of the OhmPi board (string)
'settings': 'ohmpi_settings.json', # INSERT YOUR FAVORITE SETTINGS FILE HERE
}
r_shunt = 2.
HARDWARE_CONFIG = {
'ctl': {'model': 'raspberry_pi'},
'pwr': {'model': 'pwr_batt', 'voltage': 12., 'interface_name': 'none'},
'tx': {'model': 'mb_2024_0_2',
'voltage_max': 50., # Maximum voltage supported by the TX board [V]
'current_max': 4.80/(50*r_shunt), # Maximum voltage read by the current ADC on the TX board [A]
'r_shunt': r_shunt, # Shunt resistance in Ohms
'interface_name': 'i2c'
},
'rx': {'model': 'mb_2024_0_2',
'latency': 0.010, # latency in seconds in continuous mode
'sampling_rate': 50, # number of samples per second
'interface_name': 'i2c'
},
'mux': {'boards':
{'mux_A':
{'model': 'mux_2023_0_X',
'mux_tca_address': 0x70,
'roles': {'A': 'X'},
'cabling': {(i, j): ('mux_A', i) for j in ['A'] for i in range(1, 65)},
'voltage_max': 12.},
'mux_B':
{'model': 'mux_2023_0_X',
'mux_tca_address': 0x71,
'roles': {'B': 'X'},
'cabling': {(i, j): ('mux_B', i) for j in ['B'] for i in range(1, 65)},
'voltage_max': 12.},
'mux_M':
{'model': 'mux_2023_0_X',
'mux_tca_address': 0x72,
'roles': {'M': 'X'},
'cabling': {(i, j): ('mux_M', i) for j in ['M'] for i in range(1, 65)},
'voltage_max': 12.},
'mux_N':
{'model': 'mux_2023_0_X',
'mux_tca_address': 0x73,
'roles': {'N': 'X'},
'cabling': {(i, j): ('mux_N', i) for j in ['N'] for i in range(1, 65)},
'voltage_max': 12.},
},
'default': {'interface_name': 'i2c_ext',
'voltage_max': 100.,
'current_max': 3.}
}
}
# SET THE LOGGING LEVELS, MQTT BROKERS AND MQTT OPTIONS ACCORDING TO YOUR NEEDS
# Execution logging configuration
EXEC_LOGGING_CONFIG = {
'logging_level': logging.INFO,
'log_file_logging_level': logging.DEBUG,
'logging_to_console': True,
'file_name': f'exec{logging_suffix}.log',
'max_bytes': 262144,
'backup_count': 30,
'when': 'd',
'interval': 1
}
# Data logging configuration
DATA_LOGGING_CONFIG = {
'logging_level': logging.INFO,
'logging_to_console': True,
'file_name': f'data{logging_suffix}.log',
'max_bytes': 16777216,
'backup_count': 1024,
'when': 'd',
'interval': 1
}
# State of Health logging configuration (For a future release)
SOH_LOGGING_CONFIG = {
'logging_level': logging.INFO,
'logging_to_console': True,
'log_file_logging_level': logging.DEBUG,
'file_name': f'soh{logging_suffix}.log',
'max_bytes': 16777216,
'backup_count': 1024,
'when': 'd',
'interval': 1
}
# MQTT logging configuration parameters
MQTT_LOGGING_CONFIG = {
'hostname': mqtt_broker,
'port': 1883,
'qos': 2,
'retain': False,
'keepalive': 60,
'will': None,
'auth': {'username': 'mqtt_user', 'password': 'mqtt_password'},
'tls': None,
'protocol': MQTTv31,
'transport': 'tcp',
'client_id': f'{OHMPI_CONFIG["id"]}',
'exec_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/exec',
'exec_logging_level': logging.DEBUG,
'data_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/data',
'data_logging_level': DATA_LOGGING_CONFIG['logging_level'],
'soh_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/soh',
'soh_logging_level': SOH_LOGGING_CONFIG['logging_level']
}
# MQTT control configuration parameters
MQTT_CONTROL_CONFIG = {
'hostname': mqtt_broker,
'port': 1883,
'qos': 2,
'retain': False,
'keepalive': 60,
'will': None,
'auth': {'username': 'mqtt_user', 'password': 'mqtt_password'},
'tls': None,
'protocol': MQTTv31,
'transport': 'tcp',
'client_id': f'{OHMPI_CONFIG["id"]}',
'ctrl_topic': f'ohmpi_{OHMPI_CONFIG["id"]}/ctrl'
}
...@@ -77,7 +77,7 @@ if within_ohmpi: ...@@ -77,7 +77,7 @@ if within_ohmpi:
print(d) print(d)
#k._hw._plot_readings() #k._hw._plot_readings()
print(f'OhmPiHardware: Resistance: {k._hw.last_resistance() :.2f} ohm, dev. {k._hw.last_dev():.2f} %, sp: {k._hw.sp:.2f} mV, rx bias: {k._hw.rx._bias:.2f} mV') print(f'OhmPiHardware: Resistance: {k._hw.last_resistance() :.2f} ohm, dev. {k._hw.last_dev():.2f} %, sp: {k._hw.sp:.2f} mV, rx bias: {k._hw.rx._bias:.2f} mV')
print(f'OhmPi: Resistance: {d["R [ohm]"] :.2f} ohm, dev. {d["R_std [%]"]:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV') # print(f'OhmPi: Resistance: {d["R [ohm]"] :.2f} ohm, dev. {d["R_std [%]"]:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
k._hw._plot_readings(save_fig=False) k._hw._plot_readings(save_fig=False)
# plot_exec_log('ohmpi/logs/exec.log') # plot_exec_log('ohmpi/logs/exec.log')
change_config('../configs/config_default.py', verbose=False) change_config('../configs/config_default.py', verbose=False)
import matplotlib import matplotlib
matplotlib.use('TkAgg') matplotlib.use('TkAgg')
from ohmpi.utils import change_config from ohmpi.utils import change_config
change_config('../configs/config_mb_2024_0_2__4_mux_2023_dps5005.py', verbose=False) change_config('../configs/config_mb_2024_0_2__4_mux_2023.py', verbose=False)
import importlib import importlib
import time import time
import logging import logging
......
import matplotlib
matplotlib.use('TkAgg')
from ohmpi.utils import change_config
change_config('../configs/config_mb_2024_0_2__4_mux_2023_dps5005.py', verbose=False)
import importlib
import time
import logging
from ohmpi.config import HARDWARE_CONFIG
stand_alone = False
part_of_hardware_system = False
within_ohmpi = True
# Stand alone
if stand_alone:
ctl_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["ctl"].pop("model")}')
pwr_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["pwr"].pop("model")}')
tx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["tx"].pop("model")}')
rx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["rx"].pop("model")}')
ctl = ctl_module.Ctl()
HARDWARE_CONFIG['tx'].update({'ctl': ctl, 'exec_logger': ctl.exec_logger, 'soh_logger': ctl.soh_logger})
HARDWARE_CONFIG['rx'].update({'ctl': ctl, 'exec_logger': ctl.exec_logger, 'soh_logger': ctl.soh_logger})
HARDWARE_CONFIG['tx'].update({'connection': HARDWARE_CONFIG['tx'].pop('connection',
ctl.interfaces[
HARDWARE_CONFIG['tx'].pop(
'interface_name', 'i2c')])})
HARDWARE_CONFIG['rx'].update({'connection': HARDWARE_CONFIG['rx'].pop('connection',
ctl.interfaces[
HARDWARE_CONFIG['rx'].pop(
'interface_name', 'i2c')])})
HARDWARE_CONFIG['pwr'].update({'connection': HARDWARE_CONFIG['pwr'].pop('connection',
ctl.interfaces[
HARDWARE_CONFIG['pwr'].pop(
'interface_name', None)])})
rx = rx_module.Rx(**HARDWARE_CONFIG['rx'])
tx = tx_module.Tx(**HARDWARE_CONFIG['tx'])
pwr = pwr_module.Pwr(**HARDWARE_CONFIG['pwr'])
role = 'A'
mux_id = f'mux_{role}'
mux_boards = []
mux_module = importlib.import_module(
f'ohmpi.hardware_components.{HARDWARE_CONFIG["mux"]["boards"][mux_id].pop("model")}')
MUX_CONFIG = HARDWARE_CONFIG['mux']['boards'][mux_id]
MUX_CONFIG.update({'ctl': ctl, 'connection': MUX_CONFIG.pop('connection', ctl.interfaces[
MUX_CONFIG.pop('interface_name', 'i2c_ext')]), 'exec_logger': ctl.exec_logger,
'soh_logger': ctl.soh_logger})
MUX_CONFIG.update({'id': mux_id})
mux = mux_module.Mux(**MUX_CONFIG)
mux.reset()
mux.test({role: [i for i in range(1, 65)]}, activation_time=.1)
mux.reset()
# mux as part of a OhmPiHardware system
if part_of_hardware_system:
from ohmpi.hardware_system import OhmPiHardware
print('Starting test of as part of an OhmPiHardware system.')
# mux_id = 'mux_03'
k = OhmPiHardware()
k.exec_logger.setLevel(logging.DEBUG)
# Test mux switching
k.reset_mux()
# k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='on')
# time.sleep(1.)
# k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='off')
# k.mux_boards[mux_id].test(activation_time=.4)
k.test_mux()
k.reset_mux()
if within_ohmpi:
from ohmpi.ohmpi import OhmPi
# from ohmpi.plots import plot_exec_log
print('Starting test with OhmPi.')
k = OhmPi()
# A, B, M, N = (32, 29, 31, 30)
k.reset_mux()
# k.test_mux(mux_id='mux_03')
# k._hw.switch_mux([A, B, M, N], state='on')
# k._hw.vab_square_wave(12.,1., cycles=2)
# k._hw.switch_mux([A, B, M, N], state='off')
# k._hw.calibrate_rx_bias() # electrodes 1 4 2 3 should be connected to a reference circuit
# k._hw.rx._bias = -1.38
# print(f'Resistance: {k._hw.last_rho :.2f} ohm, dev. {k._hw.last_dev:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
# k._hw._plot_readings()
A, B, M, N = (1, 4, 2, 3)
# k._hw.switch_mux([A, B, M, N], state='on')
# k._hw.vab_square_wave(12., cycle_duration=10., cycles=3)
# k._hw.switch_mux([A, B, M, N], state='off')
# print(f'OhmPiHardware Resistance: {k._hw.last_rho :.2f} ohm, dev. {k._hw.last_dev:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
# k._hw._plot_readings()
k.load_sequence('sequences/test_circuit_1423.txt')
k.run_sequence(tx_volt=5, injection_duration=1., nb_stack=2, duty_cycle=0.5)
print('using OhmPi')
#d = k.run_measurement([A, B, M, N], injection_duration=1., nb_stack=2, duty_cycle=0.5)
#print(d)
#k._hw._plot_readings()
print(f'OhmPiHardware: Resistance: {k._hw.last_resistance() :.2f} ohm, dev. {k._hw.last_dev():.2f} %, sp: {k._hw.sp:.2f} mV, rx bias: {k._hw.rx._bias:.2f} mV')
print(f'OhmPi: Resistance: {d["R [ohm]"] :.2f} ohm, dev. {d["R_std [%]"]:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
# k._hw._plot_readings(save_fig=False)
# plot_exec_log('ohmpi/logs/exec.log')
change_config('../configs/config_default.py', verbose=False)
...@@ -8,6 +8,9 @@ while True: ...@@ -8,6 +8,9 @@ while True:
else: else:
mb = input('Choose a measurement boards: [v2023/v2024]: ') mb = input('Choose a measurement boards: [v2023/v2024]: ')
if mb == 'v2024':
mb = 'v2024_0_2_'
mux = None mux = None
while True: while True:
if mux in ['v2023', 'v2024']: if mux in ['v2023', 'v2024']:
...@@ -20,10 +23,18 @@ while True: ...@@ -20,10 +23,18 @@ while True:
if nb_mux in ['0', '1', '2', '3', '4']: if nb_mux in ['0', '1', '2', '3', '4']:
break break
else: else:
nb_mux = input('Number of multiplexers: [0, 1, 2, 3, 4]: ') nb_mux = input('Number of multiplexers: [0/1/2/3/4]: ')
pwr = None
while True:
if pwr in ['battery', 'dps5005']:
break
else:
pwr = input('Tx power: [battery/dps5005]:')
config = 'config_mb_' + mb[1:] + '_' + nb_mux + '_mux_' + mux[1:] + '.py' config = 'config_mb_' + mb[1:] + '_' + nb_mux + '_mux_' + mux[1:] + '.py'
if pwr != 'battery':
config = config.replace('.py', '_' + pwr + '.py')
print('Using this configuration: ' + config) print('Using this configuration: ' + config)
import os import os
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment