Commit 12fea16f authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Implements test with 1 mux

Showing with 66 additions and 44 deletions
+66 -44
import logging import logging
from ohmpi.utils import get_platform from ohmpi.utils import get_platform
from paho.mqtt.client import MQTTv31 # noqa
from paho.mqtt.client import MQTTv31
_, on_pi = get_platform() _, on_pi = get_platform()
# DEFINE THE ID OF YOUR OhmPi # DEFINE THE ID OF YOUR OhmPi
...@@ -32,15 +31,17 @@ HARDWARE_CONFIG = { ...@@ -32,15 +31,17 @@ HARDWARE_CONFIG = {
'sampling_rate': 50, # number of samples per second 'sampling_rate': 50, # number of samples per second
'interface_name': 'i2c' 'interface_name': 'i2c'
}, },
'mux': {'mux_00': 'mux': {'boards':
{'model': 'mux_2024_0_X', {'mux_00':
'tca_address': None, {'model': 'mux_2024_0_X',
'tca_channel': 0, 'tca_address': None,
'mcp_0': '0x24', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...) 'tca_channel': 0,
'mcp_1': '0x25', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...) 'mcp_0': '0x24', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...)
'roles': {'A': 'X', 'B': 'Y', 'M': 'XX', 'N': 'YY'}, 'mcp_1': '0x25', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...)
'cabling': {(i+0, j): ('mux_00', i) for j in ['A', 'B', 'M', 'N'] for i in range(1, 9)}, 'roles': {'A': 'X', 'B': 'Y', 'M': 'XX', 'N': 'YY'},
'voltage_max': 12.}, 'cabling': {(i+0, j): ('mux_00', i) for j in ['A', 'B', 'M', 'N'] for i in range(1, 9)},
'voltage_max': 12.}
},
'default': {'interface_name': 'i2c', 'default': {'interface_name': 'i2c',
'voltage_max': 100., 'voltage_max': 100.,
'current_max': 3.} 'current_max': 3.}
......
import time import matplotlib
matplotlib.use('TkAgg')
from ohmpi.utils import change_config from ohmpi.utils import change_config
from ohmpi.plots import plot_exec_log change_config('../configs/config_mb_2024_0_2__1_mux_2024.py', verbose=False)
import importlib
import time
import logging import logging
change_config('../configs/config_mb_2024_O_2__1_mux_2024.py', verbose=False)
from ohmpi.hardware_components.mux_2024_0_X import Mux
from ohmpi.hardware_components import raspberry_pi as ctl_module
from ohmpi.config import HARDWARE_CONFIG from ohmpi.config import HARDWARE_CONFIG
MUX_CONFIG = HARDWARE_CONFIG['mux']
stand_alone_mux = False stand_alone = False
part_of_hardware_system = False part_of_hardware_system = False
within_ohmpi = True within_ohmpi = True
# Stand alone mux
if stand_alone_mux: # Stand alone
mux_id = 'mux_00' if stand_alone:
first = 24 ctl_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["ctl"].pop("model")}')
print(MUX_CONFIG) pwr_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["pwr"].pop("model")}')
MUX_CONFIG.update(HARDWARE_CONFIG['mux']['boards'][mux_id]) tx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["tx"].pop("model")}')
MUX_CONFIG.update({'id': mux_id}) rx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["rx"].pop("model")}')
MUX_CONFIG['ctl'] = ctl_module.Ctl() mux_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["mux"]["boards"]["mux_00"].pop("model")}')
mux = Mux(**MUX_CONFIG)
mux.switch_one(elec=1+first, role='M', state='on') ctl = ctl_module.Ctl()
HARDWARE_CONFIG['tx'].update({'ctl': ctl}) # HARDWARE_CONFIG['tx'].pop('ctl', ctl_module.Ctl())})
HARDWARE_CONFIG['rx'].update({'ctl': ctl}) # HARDWARE_CONFIG['rx'].pop('ctl', ctl_module.Ctl())})
HARDWARE_CONFIG['tx'].update({'connection': HARDWARE_CONFIG['tx'].pop('connection',
ctl.interfaces[
HARDWARE_CONFIG['tx'].pop(
'interface_name', 'i2c')])})
HARDWARE_CONFIG['rx'].update({'connection': HARDWARE_CONFIG['rx'].pop('connection',
ctl.interfaces[
HARDWARE_CONFIG['rx'].pop(
'interface_name', 'i2c')])})
HARDWARE_CONFIG['mux'].update({'connection':
HARDWARE_CONFIG['mux']['boards']['mux_00'].pop('connection', ctl.interfaces[
HARDWARE_CONFIG['mux']['boards']['mux_00'].pop('interface_name', 'i2c')])})
rx = rx_module.Rx(**HARDWARE_CONFIG['rx'])
tx = tx_module.Tx(**HARDWARE_CONFIG['tx'])
pwr = pwr_module.Pwr(**HARDWARE_CONFIG['pwr'])
mux = mux_module.Mux(**HARDWARE_CONFIG['mux']['boards']['mux_00'])
tx.polarity = 1
time.sleep(1)
tx.polarity = 0
mux.switch(elec_dict=[1,4,2,3], state='on')
time.sleep(1) time.sleep(1)
mux.switch_one(elec=1+first, role='M', state='off') mux.switch(elec_dict=[1, 4, 2, 3], state='off')
mux.switch({'A': [1], 'B': [2], 'M': [3], 'N': [4]}, state='on')
time.sleep(2)
# mux.switch({'A': [1], 'B': [4], 'M': [2], 'N': [3]}, state='off')
mux.reset()
mux.test({'A': [i+first for i in range(1, 9)], 'B': [i+first for i in range(1, 9)],
'M': [i+first for i in range(1, 9)], 'N': [i+first for i in range(1, 9)]}, activation_time=.1)
# mux as part of a OhmPiHardware system # mux as part of a OhmPiHardware system
if part_of_hardware_system: if part_of_hardware_system:
from ohmpi.hardware_system import OhmPiHardware from ohmpi.hardware_system import OhmPiHardware
print('Starting test of mux as part of a OhmPiHardware system.') print('Starting test of as part of an OhmPiHardware system.')
k = OhmPiHardware() k = OhmPiHardware()
k.exec_logger.setLevel(logging.DEBUG) k.exec_logger.setLevel(logging.DEBUG)
# Test mux switching # Test mux switching
k.reset_mux() k.reset_mux()
k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='on') k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='on')
...@@ -47,7 +61,9 @@ if part_of_hardware_system: ...@@ -47,7 +61,9 @@ if part_of_hardware_system:
if within_ohmpi: if within_ohmpi:
from ohmpi.ohmpi import OhmPi from ohmpi.ohmpi import OhmPi
print('Starting test of mux within OhmPi.') # from ohmpi.plots import plot_exec_log
print('Starting test with OhmPi.')
k = OhmPi() k = OhmPi()
#A, B, M, N = (32, 29, 31, 30) #A, B, M, N = (32, 29, 31, 30)
k.reset_mux() k.reset_mux()
...@@ -58,7 +74,7 @@ if within_ohmpi: ...@@ -58,7 +74,7 @@ if within_ohmpi:
#k._hw.rx._bias = -1.38 #k._hw.rx._bias = -1.38
#print(f'Resistance: {k._hw.last_rho :.2f} ohm, dev. {k._hw.last_dev:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV') #print(f'Resistance: {k._hw.last_rho :.2f} ohm, dev. {k._hw.last_dev:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
# k._hw._plot_readings() # k._hw._plot_readings()
A, B, M, N = (28, 25, 27, 26) A, B, M, N = (0, 0, 0, 0)
# k._hw.switch_mux([A, B, M, N], state='on') # k._hw.switch_mux([A, B, M, N], state='on')
# k._hw.vab_square_wave(12., cycle_duration=10., cycles=3) # k._hw.vab_square_wave(12., cycle_duration=10., cycles=3)
# k._hw.switch_mux([A, B, M, N], state='off') # k._hw.switch_mux([A, B, M, N], state='off')
...@@ -68,8 +84,8 @@ if within_ohmpi: ...@@ -68,8 +84,8 @@ if within_ohmpi:
d = k.run_measurement([A, B, M, N], injection_duration=1., nb_stack=2, duty_cycle=0.5) d = k.run_measurement([A, B, M, N], injection_duration=1., nb_stack=2, duty_cycle=0.5)
print(d) print(d)
#k._hw._plot_readings() #k._hw._plot_readings()
print(f'OhmPiHardware: Resistance: {k._hw.last_rho :.2f} ohm, dev. {k._hw.last_dev:.2f} %, sp: {k._hw.sp:.2f} mV, rx bias: {k._hw.rx._bias:.2f} mV') print(f'OhmPiHardware: Resistance: {k._hw.last_resistance() :.2f} ohm, dev. {k._hw.last_dev():.2f} %, sp: {k._hw.sp:.2f} mV, rx bias: {k._hw.rx._bias:.2f} mV')
print(f'OhmPi: Resistance: {d["R [ohm]"] :.2f} ohm, dev. {d["R_std [%]"]:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV') print(f'OhmPi: Resistance: {d["R [ohm]"] :.2f} ohm, dev. {d["R_std [%]"]:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV')
k._hw._plot_readings(save_fig=False) k._hw._plot_readings(save_fig=False)
# plot_exec_log('ohmpi/logs/exec.log') # plot_exec_log('ohmpi/logs/exec.log')
change_config('../configs/config_default.py', verbose=False) change_config('../configs/config_default.py', verbose=False)
\ No newline at end of file
...@@ -69,8 +69,6 @@ class Mux(MuxAbstract): ...@@ -69,8 +69,6 @@ class Mux(MuxAbstract):
self.exec_logger.event(f'{self.model}{self.board_id}\tmux_init\tbegin\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.model}{self.board_id}\tmux_init\tbegin\t{datetime.datetime.utcnow()}')
assert isinstance(self.connection, I2C) assert isinstance(self.connection, I2C)
self.exec_logger.debug(f'configuration: {kwargs}') self.exec_logger.debug(f'configuration: {kwargs}')
tca_address = kwargs.pop('tca_address', None)
tca_channel = kwargs.pop('tca_channel', 0)
self._roles = kwargs.pop('roles', None) self._roles = kwargs.pop('roles', None)
if self._roles is None: if self._roles is None:
self._roles = {'A': 'X', 'B': 'Y', 'M': 'XX', 'N': 'YY'} # NOTE: defaults to 4-roles self._roles = {'A': 'X', 'B': 'Y', 'M': 'XX', 'N': 'YY'} # NOTE: defaults to 4-roles
...@@ -81,15 +79,22 @@ class Mux(MuxAbstract): ...@@ -81,15 +79,22 @@ class Mux(MuxAbstract):
else: else:
self.exec_logger.error(f'Invalid role assignment for {self.model}: {self._roles} !') self.exec_logger.error(f'Invalid role assignment for {self.model}: {self._roles} !')
self._mode = '' self._mode = ''
# Setup TCA
tca_address = kwargs.pop('tca_address', None)
tca_channel = kwargs.pop('tca_channel', 0)
if tca_address is None: if tca_address is None:
self._tca = self.connection self._tca = self.connection
else: else:
self._tca = adafruit_tca9548a.TCA9548A(self.connection, tca_address)[tca_channel] self._tca = adafruit_tca9548a.TCA9548A(self.connection, tca_address)[tca_channel]
# Setup MCPs
self._mcp_addresses = (kwargs.pop('mcp_0', '0x22'), kwargs.pop('mcp_1', '0x23')) # TODO: add assert on valid addresses.. self._mcp_addresses = (kwargs.pop('mcp_0', '0x22'), kwargs.pop('mcp_1', '0x23')) # TODO: add assert on valid addresses..
self._mcp = [None, None] self._mcp = [None, None]
self.reset() self.reset()
if self.addresses is None: if self.addresses is None:
self._get_addresses() self._get_addresses()
self.exec_logger.debug(f'{self.board_id} addresses: {self.addresses}') self.exec_logger.debug(f'{self.board_id} addresses: {self.addresses}')
if not subclass_init: # TODO: try to only log this event and not the one created by super() if not subclass_init: # TODO: try to only log this event and not the one created by super()
self.exec_logger.event(f'{self.model}_{self.board_id}\tmux_init\tend\t{datetime.datetime.utcnow()}') self.exec_logger.event(f'{self.model}_{self.board_id}\tmux_init\tend\t{datetime.datetime.utcnow()}')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment