Commit 65e2ee0f authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Tries to tackle issue #110

Showing with 21 additions and 6 deletions
+21 -6
...@@ -119,6 +119,8 @@ class MuxAbstract(ABC): ...@@ -119,6 +119,8 @@ class MuxAbstract(ABC):
self.exec_logger.debug(f'{self.board_id} cabling: {self.cabling}') self.exec_logger.debug(f'{self.board_id} cabling: {self.cabling}')
self.addresses = kwargs.pop('addresses', None) self.addresses = kwargs.pop('addresses', None)
self._barrier = kwargs.pop('barrier', Barrier(1)) self._barrier = kwargs.pop('barrier', Barrier(1))
self._activation_delay = kwargs.pop('activation_delay', 0.) # in s
self._release_delay = kwargs.pop('release_delay', 0.) # in s
@abstractmethod @abstractmethod
def _get_addresses(self): def _get_addresses(self):
...@@ -154,7 +156,7 @@ class MuxAbstract(ABC): ...@@ -154,7 +156,7 @@ class MuxAbstract(ABC):
# check to prevent A == B (SHORT-CIRCUIT) # check to prevent A == B (SHORT-CIRCUIT)
if 'A' in elec_dict.keys() and 'B' in elec_dict.keys(): if 'A' in elec_dict.keys() and 'B' in elec_dict.keys():
out = np.in1d(elec_dict['A'], elec_dict['B']) out = np.in1d(elec_dict['A'], elec_dict['B'])
if out.any() and state=='on': # noqa if out.any() and state == 'on': # noqa
self.exec_logger.error('Trying to switch on some electrodes with both A and B roles. ' self.exec_logger.error('Trying to switch on some electrodes with both A and B roles. '
'This would create a short-circuit! Switching aborted.') 'This would create a short-circuit! Switching aborted.')
status = False status = False
...@@ -183,13 +185,17 @@ class MuxAbstract(ABC): ...@@ -183,13 +185,17 @@ class MuxAbstract(ABC):
self.switch_one(elec, role, state) self.switch_one(elec, role, state)
status &= True status &= True
else: else:
self.exec_logger.warning(f'{self.board_id} skipping switching {(elec, role)} because it ' self.exec_logger.debug(f'{self.board_id} skipping switching {(elec, role)} because it '
f'is not in board cabling.') f'is not in board cabling.')
status = False status = False
self.exec_logger.debug(f'{self.board_id} switching done.') self.exec_logger.debug(f'{self.board_id} switching done.')
else: else:
self.exec_logger.warning(f'Missing argument for {self.board_name}.switch: elec_dict is None.') self.exec_logger.warning(f'Missing argument for {self.board_name}.switch: elec_dict is None.')
status = False status = False
if state == 'on':
time.sleep(self._activation_delay)
elif state == 'off':
time.sleep(self._release_delay)
return status return status
@abstractmethod @abstractmethod
......
...@@ -10,7 +10,7 @@ from digitalio import Direction # noqa ...@@ -10,7 +10,7 @@ from digitalio import Direction # noqa
MUX_CONFIG = HARDWARE_CONFIG['mux'].pop('default', {}) MUX_CONFIG = HARDWARE_CONFIG['mux'].pop('default', {})
MUX_CONFIG.update({'voltage_max': 50., 'current_max': 3.}) # board default values that overwrite system default values MUX_CONFIG.update({'voltage_max': 50., 'current_max': 3.}) # board default values that overwrite system default values
MUX_CONFIG.update({'activation_delay': 0.01, 'release_delay': 0.005}) # s MUX_CONFIG.update({'activation_delay': 0.01, 'release_delay': 0.005}) # s
default_mux_cabling = {(elec, role) : ('mux_1', elec) for role in ['A', 'B', 'M', 'N'] for elec in range(1,9)} # defaults to 4 roles cabling electrodes from 1 to 8 default_mux_cabling = {(elec, role) : ('mux_1', elec) for role in ['A', 'B', 'M', 'N'] for elec in range(1,9)} # defaults to 4 roles cabling electrodes from 1 to 8
...@@ -57,6 +57,10 @@ class Mux(MuxAbstract): ...@@ -57,6 +57,10 @@ class Mux(MuxAbstract):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
if 'cabling' not in kwargs.keys() or kwargs['cabling']=={}: if 'cabling' not in kwargs.keys() or kwargs['cabling']=={}:
kwargs.update({'cabling': default_mux_cabling}) kwargs.update({'cabling': default_mux_cabling})
if 'activation_delay' not in kwargs:
kwargs.update({'activation_delay': MUX_CONFIG['activation_delay']})
if 'release_delay' not in kwargs:
kwargs.update({'release_delay': MUX_CONFIG['release_delay']})
super().__init__(**kwargs) super().__init__(**kwargs)
self.exec_logger.debug(f'configuration: {MUX_CONFIG}') self.exec_logger.debug(f'configuration: {MUX_CONFIG}')
tca_address = kwargs.pop('tca_address', None) tca_address = kwargs.pop('tca_address', None)
...@@ -106,7 +110,7 @@ class Mux(MuxAbstract): ...@@ -106,7 +110,7 @@ class Mux(MuxAbstract):
d = self.addresses[elec, role] d = self.addresses[elec, role]
if state == 'on': if state == 'on':
activate_relay(self._mcp[d['MCP']], d['MCP_GPIO'], True) activate_relay(self._mcp[d['MCP']], d['MCP_GPIO'], True)
time.sleep(MUX_CONFIG['activation_delay']) # time.sleep(MUX_CONFIG['activation_delay']) # NOTE: moved to MuxAbstract switch
if state == 'off': if state == 'off':
activate_relay(self._mcp[d['MCP']], d['MCP_GPIO'], False) activate_relay(self._mcp[d['MCP']], d['MCP_GPIO'], False)
time.sleep(MUX_CONFIG['release_delay']) # time.sleep(MUX_CONFIG['release_delay']) # NOTE: moved to MuxAbstract switch
\ No newline at end of file
...@@ -173,7 +173,12 @@ class OhmPiHardware: ...@@ -173,7 +173,12 @@ class OhmPiHardware:
self.rx.voltage]) self.rx.voltage])
sample += 1 sample += 1
sleep_time = self._start_time + datetime.timedelta(seconds=sample * sampling_rate / 1000) - lap sleep_time = self._start_time + datetime.timedelta(seconds=sample * sampling_rate / 1000) - lap
time.sleep(np.max([0., sleep_time.total_seconds()])) # TODO set readings to nan if sleep time <0 and skip the sample (sample +=1) if sleep_time < 0.:
_readings.append([elapsed_seconds(self._start_time), self._pulse, self.tx.polarity, np.nan, np.nan]) # TODO:
sample += 1
else:
time.sleep(np.max([0., sleep_time.total_seconds()])) # TODO: set readings to nan if sleep time <0 and skip the sample (sample +=1)
self.exec_logger.warning(f'pulse {self._pulse}: elapsed time {(lap-self._start_time).total_seconds()} s') # TODO: Set to debug level self.exec_logger.warning(f'pulse {self._pulse}: elapsed time {(lap-self._start_time).total_seconds()} s') # TODO: Set to debug level
self.exec_logger.warning(f'pulse {self._pulse}: total samples {len(_readings)}') # TODO: Set to debug level self.exec_logger.warning(f'pulse {self._pulse}: total samples {len(_readings)}') # TODO: Set to debug level
self.readings = np.array(_readings) self.readings = np.array(_readings)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment