Commit b0fdf141 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Tests using 2 muxes

Showing with 71 additions and 21 deletions
+71 -21
import time
from ohmpi.utils import change_config
import logging
change_config('../configs/config_mb_2023_2_mux_2024.py', verbose=False)
from ohmpi.hardware_components.mux_2024_rev_0_0 import Mux, MUX_CONFIG
from ohmpi.hardware_components import raspberry_pi_i2c as ctl_module
# from ohmpi.config import HARDWARE_CONFIG
stand_alone_mux = False
part_of_hardware_system = False
within_ohmpi = True
# Stand alone mux
if stand_alone_mux:
MUX_CONFIG['ctl'] = ctl_module.Ctl()
MUX_CONFIG['id'] = 'mux_1'
MUX_CONFIG['cabling'] = {(i+8, j) : ('mux_1', i) for j in ['A', 'B', 'M', 'N'] for i in range(1,9)}
mux = Mux(**MUX_CONFIG)
mux.switch_one(elec=9, role='M', state='on')
time.sleep(2)
mux.switch_one(elec=9, role='M', state='off')
mux.switch({'A': [9], 'B': [12], 'M': [10], 'N': [11]}, state='on')
time.sleep(8)
# mux.switch({'A': [1], 'B': [4], 'M': [2], 'N': [3]}, state='off')
mux.reset()
mux.test({'A': [9, 10, 11, 12, 13, 14, 15, 16], 'B': [9, 10, 11, 12, 13, 14, 15, 16],
'M': [9, 10, 11, 12, 13, 14, 15, 16], 'N': [9, 10, 11, 12, 13, 14, 15, 16]}, activation_time=.1)
# mux as part of a OhmPiHardware system
if part_of_hardware_system:
from ohmpi.hardware_system import OhmPiHardware
print('Starting test of mux as part of a OhmPiHardware system.')
k = OhmPiHardware()
k.exec_logger.setLevel(logging.DEBUG)
# Test mux switching
k.reset_mux()
k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='on')
time.sleep(1.)
k.switch_mux(electrodes=[1, 4, 2, 3], roles=['A', 'B', 'M', 'N'], state='off')
if within_ohmpi:
from ohmpi.ohmpi import OhmPi
print('Starting test of mux within OhmPi.')
k = OhmPi()
k.switch_mux_on([1, 4, 2, 3])
time.sleep(1.)
k.reset_mux()
change_config('../configs/config_default.py', verbose=False)
\ No newline at end of file
......@@ -60,13 +60,13 @@ class OhmPiHardware:
HARDWARE_CONFIG['rx'].pop('model')
HARDWARE_CONFIG['rx'].update(**HARDWARE_CONFIG['rx'])
HARDWARE_CONFIG['rx'].update({'ctl':self.ctl})
HARDWARE_CONFIG['rx'].update({'ctl': self.ctl})
HARDWARE_CONFIG['rx'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
'soh_logger': self.soh_logger})
self.rx = kwargs.pop('rx', rx_module.Rx(**HARDWARE_CONFIG['rx']))
HARDWARE_CONFIG['pwr'].pop('model')
HARDWARE_CONFIG['pwr'].update(**HARDWARE_CONFIG['pwr'])
HARDWARE_CONFIG['pwr'].update({'ctl':self.ctl})
HARDWARE_CONFIG['pwr'].update({'ctl': self.ctl})
HARDWARE_CONFIG['pwr'].update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
'soh_logger': self.soh_logger})
self.pwr = kwargs.pop('pwr', pwr_module.Pwr(**HARDWARE_CONFIG['pwr']))
......@@ -112,7 +112,7 @@ class OhmPiHardware:
self.tx.adc_gain_auto()
self.rx.adc_gain_auto()
def _inject(self, polarity=1, inj_time=None): # TODO: deal with voltage or current pulse
def _inject(self, polarity=1, inj_time=None): # TODO: deal with voltage or current pulse
self.tx_sync.set()
self.tx.voltage_pulse(length=inj_time, polarity=polarity)
self.tx_sync.clear()
......@@ -165,11 +165,11 @@ class OhmPiHardware:
mean_vmn = []
mean_iab = []
for i in range(n_pulses + 1):
mean_vmn.append(np.mean(self.readings[self.readings[:, 1]==i, 4]))
mean_iab.append(np.mean(self.readings[self.readings[:, 1]==i, 3]))
mean_vmn.append(np.mean(self.readings[self.readings[:, 1] == i, 4]))
mean_iab.append(np.mean(self.readings[self.readings[:, 1] == i, 3]))
mean_vmn = np.array(mean_vmn)
mean_iab = np.array(mean_iab)
sp = np.mean(mean_vmn[np.ix_(polarity==1)] - mean_vmn[np.ix_(polarity==-1)]) / 2
sp = np.mean(mean_vmn[np.ix_(polarity == 1)] - mean_vmn[np.ix_(polarity == -1)]) / 2
return sp
def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5,
......@@ -221,14 +221,14 @@ class OhmPiHardware:
sampling_rate = best_tx_injtime # TODO: check this...
else:
sampling_rate = self.tx.sampling_rate
self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate) # TODO: use a square wave pulse?
vmn = np.mean(self.readings[:,4])
iab = np.mean(self.readings[:,3])
self._vab_pulse(vab=vab, length=best_tx_injtime, sampling_rate=sampling_rate) # TODO: use a square wave pulse?
vmn = np.mean(self.readings[:, 4])
iab = np.mean(self.readings[:, 3])
# if np.abs(vmn) is too small (smaller than voltage_min), strategy is not constant and vab < vab_max ,
# then we could call _compute_tx_volt with a tx_volt increased to np.min([vab_max, tx_volt*2.]) for example
if strategy == 'vmax':
# implement different strategies
if vab < vab_max and iab < current_max :
if vab < vab_max and iab < current_max:
vab = vab * np.min([0.9 * vab_max / vab, 0.9 * current_max / iab]) # TODO: check if setting at 90% of max as a safety margin is OK
self.tx.exec_logger.debug(f'vmax strategy: setting VAB to {vab} V.')
elif strategy == 'vmin':
......@@ -278,7 +278,7 @@ class OhmPiHardware:
else:
vab = self.tx.pwr.voltage
# reads current and voltage during the pulse
injection = Thread(target=self._inject, kwargs={'inj_time':length, 'polarity': polarity})
injection = Thread(target=self._inject, kwargs={'inj_time': length, 'polarity': polarity})
readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append})
readings.start()
injection.start()
......@@ -325,7 +325,8 @@ class OhmPiHardware:
if mux not in mux_workers:
mux_workers.append(mux)
except KeyError:
self.exec_logger.debug(f'Unable to switch {state} ({elec}, {roles[idx]}): not in cabling and will be ignored...')
self.exec_logger.debug(f'Unable to switch {state} ({elec}, {roles[idx]})'
f': not in cabling and will be ignored...')
status = False
if status:
mux_workers = list(set(mux_workers))
......@@ -334,7 +335,8 @@ class OhmPiHardware:
for idx, mux in enumerate(mux_workers):
# Create a new thread to perform some work
self.mux_boards[mux].barrier = b
mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs={'elec_dict': elec_dict, 'state': state})
mux_workers[idx] = Thread(target=self.mux_boards[mux].switch, kwargs={'elec_dict': elec_dict,
'state': state})
mux_workers[idx].start()
self.mux_barrier.wait()
for mux_worker in mux_workers:
......@@ -364,13 +366,13 @@ class OhmPiHardware:
except Exception as e:
self.exec_logger.error(f'Unable to parse channel: {e}')
return
self.switch_mux(electrodes,roles,state='on')
self.switch_mux(electrodes, roles, state='on')
time.sleep(activation_time)
self.switch_mux(electrodes,roles, state='off')
self.switch_mux(electrodes, roles, state='off')
else:
for c in self._cabling.keys():
self.exec_logger.info(f'Testing electrode {c[0]} with role {c[1]}.')
self.switch_mux(electrodes=[c[0]],roles=[c[1]],state='on')
self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='on')
time.sleep(activation_time)
self.switch_mux(electrodes=[c[0]], roles=[c[1]], state='off')
self.exec_logger.info('Test finished.')
......@@ -382,4 +384,4 @@ class OhmPiHardware:
self.exec_logger.debug('Resetting all mux boards ...')
for mux_id, mux in self.mux_boards.items(): # noqa
self.exec_logger.debug(f'Resetting {mux_id}.')
mux.reset()
\ No newline at end of file
mux.reset()
......@@ -23,7 +23,7 @@ from termcolor import colored
from logging import DEBUG
from ohmpi.utils import get_platform
from ohmpi.logging_setup import setup_loggers
from ohmpi.config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG, HARDWARE_CONFIG
from ohmpi.config import MQTT_CONTROL_CONFIG, OHMPI_CONFIG, EXEC_LOGGING_CONFIG
import ohmpi.deprecated as deprecated
from ohmpi.hardware_system import OhmPiHardware
......@@ -76,9 +76,7 @@ class OhmPi(object):
self.exec_logger, _, self.data_logger, _, self.soh_logger, _, _, msg = setup_loggers(mqtt=mqtt)
print(msg)
# read in hardware parameters (config.py)
# HARDWARE_CONFIG.update({'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
# 'soh_logger': self.soh_logger})
# specify loggers when instancing the hardware
self._hw = OhmPiHardware(**{'exec_logger': self.exec_logger, 'data_logger': self.data_logger,
'soh_logger': self.soh_logger})
self.exec_logger.info('Hardware configured...')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment