diff --git a/configs/config_mb_2024_0_2__1_mux_2024.py b/configs/config_mb_2024_0_2__1_mux_2024.py index b6bedc9a9ffcc89e277f39df553c9f1658b67b00..ebac0b709c32c01fde12007a343adeaae6a8795d 100644 --- a/configs/config_mb_2024_0_2__1_mux_2024.py +++ b/configs/config_mb_2024_0_2__1_mux_2024.py @@ -1,7 +1,6 @@ import logging from ohmpi.utils import get_platform - -from paho.mqtt.client import MQTTv31 +from paho.mqtt.client import MQTTv31 # noqa _, on_pi = get_platform() # DEFINE THE ID OF YOUR OhmPi @@ -32,15 +31,17 @@ HARDWARE_CONFIG = { 'sampling_rate': 50, # number of samples per second 'interface_name': 'i2c' }, - 'mux': {'mux_00': - {'model': 'mux_2024_0_X', - 'tca_address': None, - 'tca_channel': 0, - 'mcp_0': '0x24', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...) - 'mcp_1': '0x25', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...) - 'roles': {'A': 'X', 'B': 'Y', 'M': 'XX', 'N': 'YY'}, - 'cabling': {(i+0, j): ('mux_00', i) for j in ['A', 'B', 'M', 'N'] for i in range(1, 9)}, - 'voltage_max': 12.}, + 'mux': {'boards': + {'mux_00': + {'model': 'mux_2024_0_X', + 'tca_address': None, + 'tca_channel': 0, + 'mcp_0': '0x24', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...) + 'mcp_1': '0x25', # TODO : Replace this with pos of jumper on MUX board (address doesn't mean anything for the average user...) + 'roles': {'A': 'X', 'B': 'Y', 'M': 'XX', 'N': 'YY'}, + 'cabling': {(i+0, j): ('mux_00', i) for j in ['A', 'B', 'M', 'N'] for i in range(1, 9)}, + 'voltage_max': 12.} + }, 'default': {'interface_name': 'i2c', 'voltage_max': 100., 'current_max': 3.} diff --git a/dev/test_mb_2024_1_mux_2024.py b/dev/test_mb_2024_1_mux_2024.py index 5f201492bb9667a314dea3d8cca429109221a950..48ca449ef660770797d28a85e8f98a89b0de4566 100644 --- a/dev/test_mb_2024_1_mux_2024.py +++ b/dev/test_mb_2024_1_mux_2024.py @@ -1,44 +1,58 @@ -import time +import matplotlib +matplotlib.use('TkAgg') from ohmpi.utils import change_config -from ohmpi.plots import plot_exec_log +change_config('../configs/config_mb_2024_0_2__1_mux_2024.py', verbose=False) +import importlib +import time import logging -change_config('../configs/config_mb_2024_O_2__1_mux_2024.py', verbose=False) -from ohmpi.hardware_components.mux_2024_0_X import Mux -from ohmpi.hardware_components import raspberry_pi as ctl_module from ohmpi.config import HARDWARE_CONFIG -MUX_CONFIG = HARDWARE_CONFIG['mux'] - -stand_alone_mux = False +stand_alone = False part_of_hardware_system = False within_ohmpi = True -# Stand alone mux -if stand_alone_mux: - mux_id = 'mux_00' - first = 24 - print(MUX_CONFIG) - MUX_CONFIG.update(HARDWARE_CONFIG['mux']['boards'][mux_id]) - MUX_CONFIG.update({'id': mux_id}) - MUX_CONFIG['ctl'] = ctl_module.Ctl() - mux = Mux(**MUX_CONFIG) - mux.switch_one(elec=1+first, role='M', state='on') + +# Stand alone +if stand_alone: + ctl_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["ctl"].pop("model")}') + pwr_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["pwr"].pop("model")}') + tx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["tx"].pop("model")}') + rx_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["rx"].pop("model")}') + mux_module = importlib.import_module(f'ohmpi.hardware_components.{HARDWARE_CONFIG["mux"]["boards"]["mux_00"].pop("model")}') + + ctl = ctl_module.Ctl() + HARDWARE_CONFIG['tx'].update({'ctl': ctl}) # HARDWARE_CONFIG['tx'].pop('ctl', ctl_module.Ctl())}) + HARDWARE_CONFIG['rx'].update({'ctl': ctl}) # HARDWARE_CONFIG['rx'].pop('ctl', ctl_module.Ctl())}) + HARDWARE_CONFIG['tx'].update({'connection': HARDWARE_CONFIG['tx'].pop('connection', + ctl.interfaces[ + HARDWARE_CONFIG['tx'].pop( + 'interface_name', 'i2c')])}) + HARDWARE_CONFIG['rx'].update({'connection': HARDWARE_CONFIG['rx'].pop('connection', + ctl.interfaces[ + HARDWARE_CONFIG['rx'].pop( + 'interface_name', 'i2c')])}) + HARDWARE_CONFIG['mux'].update({'connection': + HARDWARE_CONFIG['mux']['boards']['mux_00'].pop('connection', ctl.interfaces[ + HARDWARE_CONFIG['mux']['boards']['mux_00'].pop('interface_name', 'i2c')])}) + + rx = rx_module.Rx(**HARDWARE_CONFIG['rx']) + tx = tx_module.Tx(**HARDWARE_CONFIG['tx']) + pwr = pwr_module.Pwr(**HARDWARE_CONFIG['pwr']) + mux = mux_module.Mux(**HARDWARE_CONFIG['mux']['boards']['mux_00']) + + tx.polarity = 1 + time.sleep(1) + tx.polarity = 0 + mux.switch(elec_dict=[1,4,2,3], state='on') time.sleep(1) - mux.switch_one(elec=1+first, role='M', state='off') - mux.switch({'A': [1], 'B': [2], 'M': [3], 'N': [4]}, state='on') - time.sleep(2) - # mux.switch({'A': [1], 'B': [4], 'M': [2], 'N': [3]}, state='off') - mux.reset() - mux.test({'A': [i+first for i in range(1, 9)], 'B': [i+first for i in range(1, 9)], - 'M': [i+first for i in range(1, 9)], 'N': [i+first for i in range(1, 9)]}, activation_time=.1) + mux.switch(elec_dict=[1, 4, 2, 3], state='off') # mux as part of a OhmPiHardware system if part_of_hardware_system: from ohmpi.hardware_system import OhmPiHardware - print('Starting test of mux as part of a OhmPiHardware system.') + print('Starting test of as part of an OhmPiHardware system.') k = OhmPiHardware() k.exec_logger.setLevel(logging.DEBUG) - # Test mux switching k.reset_mux() k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='on') @@ -47,7 +61,9 @@ if part_of_hardware_system: if within_ohmpi: from ohmpi.ohmpi import OhmPi - print('Starting test of mux within OhmPi.') + # from ohmpi.plots import plot_exec_log + + print('Starting test with OhmPi.') k = OhmPi() #A, B, M, N = (32, 29, 31, 30) k.reset_mux() @@ -58,7 +74,7 @@ if within_ohmpi: #k._hw.rx._bias = -1.38 #print(f'Resistance: {k._hw.last_rho :.2f} ohm, dev. {k._hw.last_dev:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV') # k._hw._plot_readings() - A, B, M, N = (28, 25, 27, 26) + A, B, M, N = (0, 0, 0, 0) # k._hw.switch_mux([A, B, M, N], state='on') # k._hw.vab_square_wave(12., cycle_duration=10., cycles=3) # k._hw.switch_mux([A, B, M, N], state='off') @@ -68,8 +84,8 @@ if within_ohmpi: d = k.run_measurement([A, B, M, N], injection_duration=1., nb_stack=2, duty_cycle=0.5) print(d) #k._hw._plot_readings() - print(f'OhmPiHardware: Resistance: {k._hw.last_rho :.2f} ohm, dev. {k._hw.last_dev:.2f} %, sp: {k._hw.sp:.2f} mV, rx bias: {k._hw.rx._bias:.2f} mV') + print(f'OhmPiHardware: Resistance: {k._hw.last_resistance() :.2f} ohm, dev. {k._hw.last_dev():.2f} %, sp: {k._hw.sp:.2f} mV, rx bias: {k._hw.rx._bias:.2f} mV') print(f'OhmPi: Resistance: {d["R [ohm]"] :.2f} ohm, dev. {d["R_std [%]"]:.2f} %, rx bias: {k._hw.rx._bias:.2f} mV') k._hw._plot_readings(save_fig=False) # plot_exec_log('ohmpi/logs/exec.log') -change_config('../configs/config_default.py', verbose=False) \ No newline at end of file +change_config('../configs/config_default.py', verbose=False) diff --git a/ohmpi/hardware_components/mux_2024_0_X.py b/ohmpi/hardware_components/mux_2024_0_X.py index c7cfce2c3232d96ced942b544791a43881b83847..decd5461c1151d72c233b955a4d272c2e35897b0 100644 --- a/ohmpi/hardware_components/mux_2024_0_X.py +++ b/ohmpi/hardware_components/mux_2024_0_X.py @@ -69,8 +69,6 @@ class Mux(MuxAbstract): self.exec_logger.event(f'{self.model}{self.board_id}\tmux_init\tbegin\t{datetime.datetime.utcnow()}') assert isinstance(self.connection, I2C) self.exec_logger.debug(f'configuration: {kwargs}') - tca_address = kwargs.pop('tca_address', None) - tca_channel = kwargs.pop('tca_channel', 0) self._roles = kwargs.pop('roles', None) if self._roles is None: self._roles = {'A': 'X', 'B': 'Y', 'M': 'XX', 'N': 'YY'} # NOTE: defaults to 4-roles @@ -81,15 +79,22 @@ class Mux(MuxAbstract): else: self.exec_logger.error(f'Invalid role assignment for {self.model}: {self._roles} !') self._mode = '' + + # Setup TCA + tca_address = kwargs.pop('tca_address', None) + tca_channel = kwargs.pop('tca_channel', 0) if tca_address is None: self._tca = self.connection else: self._tca = adafruit_tca9548a.TCA9548A(self.connection, tca_address)[tca_channel] + + # Setup MCPs self._mcp_addresses = (kwargs.pop('mcp_0', '0x22'), kwargs.pop('mcp_1', '0x23')) # TODO: add assert on valid addresses.. self._mcp = [None, None] self.reset() if self.addresses is None: self._get_addresses() + self.exec_logger.debug(f'{self.board_id} addresses: {self.addresses}') if not subclass_init: # TODO: try to only log this event and not the one created by super() self.exec_logger.event(f'{self.model}_{self.board_id}\tmux_init\tend\t{datetime.datetime.utcnow()}')