diff --git a/dev/test_2_mux_2024.py b/dev/test_2_mux_2024.py new file mode 100644 index 0000000000000000000000000000000000000000..036fa50e0ffe6e59a6955fa4da07d1855bf09912 --- /dev/null +++ b/dev/test_2_mux_2024.py @@ -0,0 +1,50 @@ +import time +from ohmpi.utils import change_config +import logging +change_config('../configs/config_mb_2023_2_mux_2024.py', verbose=False) +from ohmpi.hardware_components.mux_2024_rev_0_0 import Mux, MUX_CONFIG +from ohmpi.hardware_components import raspberry_pi_i2c as ctl_module +# from ohmpi.config import HARDWARE_CONFIG + +stand_alone_mux = False +part_of_hardware_system = False +within_ohmpi = True +# Stand alone mux +if stand_alone_mux: + MUX_CONFIG['ctl'] = ctl_module.Ctl() + MUX_CONFIG['id'] = 'mux_1' + MUX_CONFIG['cabling'] = {(i+8, j) : ('mux_1', i) for j in ['A', 'B', 'M', 'N'] for i in range(1,9)} + mux = Mux(**MUX_CONFIG) + mux.switch_one(elec=9, role='M', state='on') + time.sleep(2) + mux.switch_one(elec=9, role='M', state='off') + mux.switch({'A': [9], 'B': [12], 'M': [10], 'N': [11]}, state='on') + time.sleep(8) + # mux.switch({'A': [1], 'B': [4], 'M': [2], 'N': [3]}, state='off') + mux.reset() + mux.test({'A': [9, 10, 11, 12, 13, 14, 15, 16], 'B': [9, 10, 11, 12, 13, 14, 15, 16], + 'M': [9, 10, 11, 12, 13, 14, 15, 16], 'N': [9, 10, 11, 12, 13, 14, 15, 16]}, activation_time=.1) + +# mux as part of a OhmPiHardware system +if part_of_hardware_system: + from ohmpi.hardware_system import OhmPiHardware + print('Starting test of mux as part of a OhmPiHardware system.') + + k = OhmPiHardware() + k.exec_logger.setLevel(logging.DEBUG) + + # Test mux switching + k.reset_mux() + k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='on') + time.sleep(1.) + k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='off') + +if within_ohmpi: + from ohmpi.ohmpi import OhmPi + print('Starting test of mux within OhmPi.') + k = OhmPi() + k.switch_mux_on([1, 4, 2, 3]) + time.sleep(1.) + k.reset_mux() + +change_config('../configs/config_default.py', verbose=False) \ No newline at end of file diff --git a/ohmpi/hardware_system.py b/ohmpi/hardware_system.py index 209ccb76bafcd19bf2006ec5627b9a2b825b46b8..9dfd83c1360062de5a9aaefc495915ce3a26d07f 100644 --- a/ohmpi/hardware_system.py +++ b/ohmpi/hardware_system.py @@ -60,13 +60,13 @@ class OhmPiHardware: HARDWARE_CONFIG['rx'].pop('model') HARDWARE_CONFIG['rx'].update(**HARDWARE_CONFIG['rx']) - HARDWARE_CONFIG['rx'].update({'ctl':self.ctl}) + HARDWARE_CONFIG['rx'].update({'ctl': self.ctl}) HARDWARE_CONFIG['rx'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger, 'soh_logger': self.soh_logger}) self.rx = kwargs.pop('rx', rx_module.Rx(**HARDWARE_CONFIG['rx'])) HARDWARE_CONFIG['pwr'].pop('model') HARDWARE_CONFIG['pwr'].update(**HARDWARE_CONFIG['pwr']) - HARDWARE_CONFIG['pwr'].update({'ctl':self.ctl}) + HARDWARE_CONFIG['pwr'].update({'ctl': self.ctl}) HARDWARE_CONFIG['pwr'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger, 'soh_logger': self.soh_logger}) self.pwr = kwargs.pop('pwr', pwr_module.Pwr(**HARDWARE_CONFIG['pwr'])) @@ -112,7 +112,7 @@ class OhmPiHardware: self.tx.adc_gain_auto() self.rx.adc_gain_auto() - def _inject(self, polarity=1, inj_time=None): # TODO: deal with voltage or current pulse + def _inject(self, polarity=1, inj_time=None): # TODO: deal with voltage or current pulse self.tx_sync.set() self.tx.voltage_pulse(length=inj_time, polarity=polarity) self.tx_sync.clear() @@ -165,11 +165,11 @@ class OhmPiHardware: mean_vmn = [] mean_iab = [] for i in range(n_pulses + 1): - mean_vmn.append(np.mean(self.readings[self.readings[:, 1]==i, 4])) - mean_iab.append(np.mean(self.readings[self.readings[:, 1]==i, 3])) + mean_vmn.append(np.mean(self.readings[self.readings[:, 1] == i, 4])) + mean_iab.append(np.mean(self.readings[self.readings[:, 1] == i, 3])) mean_vmn = np.array(mean_vmn) mean_iab = np.array(mean_iab) - sp = np.mean(mean_vmn[np.ix_(polarity==1)] - mean_vmn[np.ix_(polarity==-1)]) / 2 + sp = np.mean(mean_vmn[np.ix_(polarity == 1)] - mean_vmn[np.ix_(polarity == -1)]) / 2 return sp def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5, @@ -221,14 +221,14 @@ class OhmPiHardware: sampling_rate = best_tx_injtime # TODO: check this... else: sampling_rate = self.tx.sampling_rate - self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate) # TODO: use a square wave pulse? - vmn = np.mean(self.readings[:,4]) - iab = np.mean(self.readings[:,3]) + self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate) # TODO: use a square wave pulse? + vmn = np.mean(self.readings[:, 4]) + iab = np.mean(self.readings[:, 3]) # if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max , # then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example if strategy == 'vmax': # implement different strategies - if vab < vab_max and iab < current_max : + if vab < vab_max and iab < current_max: vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.') elif strategy == 'vmin': @@ -278,7 +278,7 @@ class OhmPiHardware: else: vab = self.tx.pwr.voltage # reads current and voltage during the pulse - injection = Thread(target=self._inject, kwargs={'inj_time':length, 'polarity': polarity}) + injection = Thread(target=self._inject, kwargs={'inj_time': length, 'polarity': polarity}) readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append}) readings.start() injection.start() @@ -325,7 +325,8 @@ class OhmPiHardware: if mux not in mux_workers: mux_workers.append(mux) except KeyError: - self.exec_logger.debug(f'Unable to switch {state} ({elec}, {roles[idx]}): not in cabling and will be ignored...') + self.exec_logger.debug(f'Unable to switch {state} ({elec}, {roles[idx]})' + f': not in cabling and will be ignored...') status = False if status: mux_workers = list(set(mux_workers)) @@ -334,7 +335,8 @@ class OhmPiHardware: for idx, mux in enumerate(mux_workers): # Create a new thread to perform some work self.mux_boards[mux].barrier = b - mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs={'elec_dict': elec_dict, 'state': state}) + mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs={'elec_dict': elec_dict, + 'state': state}) mux_workers[idx].start() self.mux_barrier.wait() for mux_worker in mux_workers: @@ -364,13 +366,13 @@ class OhmPiHardware: except Exception as e: self.exec_logger.error(f'Unable to parse channel: {e}') return - self.switch_mux(electrodes,roles,state='on') + self.switch_mux(electrodes, roles, state='on') time.sleep(activation_time) - self.switch_mux(electrodes,roles, state='off') + self.switch_mux(electrodes, roles, state='off') else: for c in self._cabling.keys(): self.exec_logger.info(f'Testing electrode {c[0]} with role {c[1]}.') - self.switch_mux(electrodes=[c[0]],roles=[c[1]],state='on') + self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='on') time.sleep(activation_time) self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='off') self.exec_logger.info('Test finished.') @@ -382,4 +384,4 @@ class OhmPiHardware: self.exec_logger.debug('Resetting all mux boards ...') for mux_id, mux in self.mux_boards.items(): # noqa self.exec_logger.debug(f'Resetting {mux_id}.') - mux.reset() \ No newline at end of file + mux.reset() diff --git a/ohmpi/ohmpi.py b/ohmpi/ohmpi.py index 57d9db7a1f8971292ba34924b547f492bb0797fd..b89b4ea3e3d8b68358bb9b82f16bb749a7631aaf 100644 --- a/ohmpi/ohmpi.py +++ b/ohmpi/ohmpi.py @@ -23,7 +23,7 @@ from termcolor import colored from logging import DEBUG from ohmpi.utils import get_platform from ohmpi.logging_setup import setup_loggers -from ohmpi.config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG, HARDWARE_CONFIG +from ohmpi.config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG import ohmpi.deprecated as deprecated from ohmpi.hardware_system import OhmPiHardware @@ -76,9 +76,7 @@ class OhmPi(object): self.exec_logger, _, self.data_logger, _, self.soh_logger, _, _, msg = setup_loggers(mqtt=mqtt) print(msg) - # read in hardware parameters (config.py) - # HARDWARE_CONFIG.update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger, - # 'soh_logger': self.soh_logger}) + # specify loggers when instancing the hardware self._hw = OhmPiHardware(**{'exec_logger': self.exec_logger, 'data_logger': self.data_logger, 'soh_logger': self.soh_logger}) self.exec_logger.info('Hardware configured...')