Commit b435bbb7 authored by Arnaud WATLET's avatar Arnaud WATLET
Browse files

Revert "Adds mcp_board_address in ohmpi_config"

This reverts commit 3370f9ac
No related merge requests found
Showing with 293 additions and 280 deletions
+293 -280
...@@ -22,8 +22,7 @@ OHMPI_CONFIG = { ...@@ -22,8 +22,7 @@ OHMPI_CONFIG = {
'max_elec': 64, 'max_elec': 64,
'board_addresses': {'A': 0x73, 'B': 0x72, 'M': 0x71, 'N': 0x70}, # CHECK IF YOUR BOARDS HAVE THESE ADDRESSES 'board_addresses': {'A': 0x73, 'B': 0x72, 'M': 0x71, 'N': 0x70}, # CHECK IF YOUR BOARDS HAVE THESE ADDRESSES
'settings': 'ohmpi_settings.json', # INSERT YOUR FAVORITE SETTINGS FILE HERE 'settings': 'ohmpi_settings.json', # INSERT YOUR FAVORITE SETTINGS FILE HERE
'board_version': 'mb.2023.0.0',#,'22.10' 'board_version': 'mb.2023.0.0'#,'22.10'
'mcp_board_address': '0x20'
} # TODO: add a dictionary with INA models and associated gain values } # TODO: add a dictionary with INA models and associated gain values
# SET THE LOGGING LEVELS, MQTT BROKERS AND MQTT OPTIONS ACCORDING TO YOUR NEEDS # SET THE LOGGING LEVELS, MQTT BROKERS AND MQTT OPTIONS ACCORDING TO YOUR NEEDS
......
...@@ -118,8 +118,8 @@ class OhmPi(object): ...@@ -118,8 +118,8 @@ class OhmPi(object):
self.i2c = busio.I2C(board.SCL, board.SDA) # noqa self.i2c = busio.I2C(board.SCL, board.SDA) # noqa
# I2C connexion to MCP23008, for current injection # I2C connexion to MCP23008, for current injection
self.mcp = MCP23008(self.i2c, address=self.mcp_board_address) self.MCPIHM = MCP23008(self.i2c, address=0x24)
self.pin4 = self.mcp.get_pin(4) # Ohmpi_run self.pin4 = self.MCPIHM.get_pin(4) # Ohmpi_run
self.pin4.direction = Direction.OUTPUT self.pin4.direction = Direction.OUTPUT
self.pin4.value = True self.pin4.value = True
...@@ -133,13 +133,13 @@ class OhmPi(object): ...@@ -133,13 +133,13 @@ class OhmPi(object):
# current injection module # current injection module
if self.idps: if self.idps:
self.pin2 = self.mcp.get_pin(2) # dsp + self.pin2 = self.MCPIHM.get_pin(2) # dsp +
self.pin2.direction = Direction.OUTPUT self.pin2.direction = Direction.OUTPUT
self.pin2.value = True self.pin2.value = True
self.pin3 = self.mcp.get_pin(3) # dsp - self.pin3 = self.MCPIHM.get_pin(3) # dsp -
self.pin3.direction = Direction.OUTPUT self.pin3.direction = Direction.OUTPUT
self.pin3.value = True self.pin3.value = True
time.sleep(3) time.sleep(4)
self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal)
self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc
self.DPS.serial.bytesize = 8 # self.DPS.serial.bytesize = 8 #
...@@ -147,7 +147,7 @@ class OhmPi(object): ...@@ -147,7 +147,7 @@ class OhmPi(object):
self.DPS.debug = False # self.DPS.debug = False #
self.DPS.serial.parity = 'N' # No parity self.DPS.serial.parity = 'N' # No parity
self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode
self.DPS.write_register(0x0001, 100, 0) # max current allowed (100 mA for relays) self.DPS.write_register(0x0001, 1000, 0) # max current allowed (100 mA for relays)
print(self.DPS.read_register(0x05,2 )) # max current allowed (100 mA for relays) #voltage print(self.DPS.read_register(0x05,2 )) # max current allowed (100 mA for relays) #voltage
self.pin2.value = False self.pin2.value = False
...@@ -155,10 +155,10 @@ class OhmPi(object): ...@@ -155,10 +155,10 @@ class OhmPi(object):
# (last number) 0 is for mA, 3 is for A # (last number) 0 is for mA, 3 is for A
# injection courant and measure (TODO check if it works, otherwise back in run_measurement()) # injection courant and measure (TODO check if it works, otherwise back in run_measurement())
self.pin0 = self.mcp.get_pin(0) self.pin0 = self.MCPIHM.get_pin(0)
self.pin0.direction = Direction.OUTPUT self.pin0.direction = Direction.OUTPUT
self.pin0.value = False self.pin0.value = False
self.pin1 = self.mcp.get_pin(1) self.pin1 = self.MCPIHM.get_pin(1)
self.pin1.direction = Direction.OUTPUT self.pin1.direction = Direction.OUTPUT
self.pin1.value = False self.pin1.value = False
...@@ -305,67 +305,152 @@ class OhmPi(object): ...@@ -305,67 +305,152 @@ class OhmPi(object):
volt = 5. volt = 5.
# redefined the pin of the mcp (needed when relays are connected) # redefined the pin of the mcp (needed when relays are connected)
self.pin0 = self.mcp.get_pin(0) self.pin0 = self.MCPIHM.get_pin(0)
self.pin0.direction = Direction.OUTPUT self.pin0.direction = Direction.OUTPUT
self.pin0.value = False self.pin0.value = False
self.pin1 = self.mcp.get_pin(1) self.pin1 = self.MCPIHM.get_pin(1)
self.pin1.direction = Direction.OUTPUT self.pin1.direction = Direction.OUTPUT
self.pin1.value = False self.pin1.value = False
# select a polarity to start with # select a polarity to start with
self.pin0.value = True self.pin0.value = True
self.pin1.value = False self.pin1.value = False
I=0
vmn=0
count=0 if strategy == 'constant':
vab = volt
while I < 3 or abs(vmn) < 10 : # I supérieur à 1 mA et Vmn surpérieur
if count >0 : elif strategy == 'vmax':
volt = volt + 2 # implement different strategy
count=count+1 I=0
if volt > 50: vmn=0
break count=0
# set voltage for test while I < 3 or abs(vmn) < 20 : #TODO: hardware related - place in config
self.DPS.write_register(0x0000, volt, 2)
self.DPS.write_register(0x09, 1) # DPS5005 on
time.sleep(best_tx_injtime) # inject for given tx time
# autogain if count > 0 :
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) print('o', volt)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) volt = volt + 2
gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) print('>', volt)
gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) count=count+1
gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) if volt > 50:
gain_voltage = np.min([gain_voltage0, gain_voltage2]) break
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) # set voltage for test
# we measure the voltage on both A0 and A2 to guess the polarity if count==1:
I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current self.DPS.write_register(0x09, 1) # DPS5005 on
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage time.sleep(best_tx_injtime) # inject for given tx time
U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa self.DPS.write_register(0x0000, volt, 2)
# autogain
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address)
gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0))
gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))
gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))
gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address)
# we measure the voltage on both A0 and A2 to guess the polarity
I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage
U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa
# check polarity
polarity = 1 # by default, we guessed it right
vmn = U0
if U0 < 0: # we guessed it wrong, let's use a correction factor
polarity = -1
vmn = U2
# check polarity n = 0
polarity = 1 # by default, we guessed it right while (abs(vmn) > voltage_max or I > current_max) and volt>0: #If starting voltage is too high, need to lower it down
vmn = U0 # print('we are out of range! so decreasing volt')
if U0 < 0: # we guessed it wrong, let's use a correction factor volt = volt - 2
polarity = -1 self.DPS.write_register(0x0000, volt, 2)
vmn = U2 #self.DPS.write_register(0x09, 1) # DPS5005 on
if abs(vmn)>4500 or I> 45 : #time.sleep(best_tx_injtime)
volt = volt - 2 I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt
self.DPS.write_register(0x0000, volt, 2) U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
self.DPS.write_register(0x09, 1) # DPS5005 on U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.
time.sleep(best_tx_injtime) polarity = 1 # by default, we guessed it right
I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt vmn = U0
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. if U0 < 0: # we guessed it wrong, let's use a correction factor
U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.
polarity = 1 # by default, we guessed it right
vmn = U0
if U0 < 0: # we guessed it wrong, let's use a correction factor
polarity = -1 polarity = -1
vmn = U2 vmn = U2
break n+=1
if n > 25 :
break
factor_I = (current_max) / I
factor_vmn = voltage_max / vmn
factor = factor_I
if factor_I > factor_vmn:
factor = factor_vmn
vab = factor * volt * 0.8
if vab > tx_max:
vab = tx_max
print(factor_I, factor_vmn, 'factor!!')
elif strategy == 'vmin':
# implement different strategy
I=20
vmn=400
count=0
while I > 10 or abs(vmn) > 300 : #TODO: hardware related - place in config
if count > 0 :
volt = volt - 2
print(volt, count)
count=count+1
if volt > 50:
break
# set voltage for test
if count==1:
self.DPS.write_register(0x09, 1) # DPS5005 on
time.sleep(best_tx_injtime) # inject for given tx time
self.DPS.write_register(0x0000, volt, 2)
# autogain
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address)
gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0))
gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))
gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))
gain_voltage = np.min([gain_voltage0, gain_voltage2]) #TODO: separate gain for P0 and P2
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address)
# we measure the voltage on both A0 and A2 to guess the polarity
I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage
U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa
# check polarity
polarity = 1 # by default, we guessed it right
vmn = U0
if U0 < 0: # we guessed it wrong, let's use a correction factor
polarity = -1
vmn = U2
n=0
while (abs(vmn) < voltage_min or I < current_min) and volt > 0 : #If starting voltage is too high, need to lower it down
# print('we are out of range! so increasing volt')
volt = volt + 2
print(volt)
self.DPS.write_register(0x0000, volt, 2)
#self.DPS.write_register(0x09, 1) # DPS5005 on
#time.sleep(best_tx_injtime)
I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt
U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000.
U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000.
polarity = 1 # by default, we guessed it right
vmn = U0
if U0 < 0: # we guessed it wrong, let's use a correction factor
polarity = -1
vmn = U2
n+=1
if n > 25 :
break
vab = volt
self.DPS.write_register(0x09, 0) # DPS5005 off self.DPS.write_register(0x09, 0) # DPS5005 off
# print('polarity', polarity) # print('polarity', polarity)
...@@ -377,27 +462,11 @@ class OhmPi(object): ...@@ -377,27 +462,11 @@ class OhmPi(object):
self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms')
# implement different strategy
if strategy == 'vmax':
factor_I = (current_max) / I
factor_vmn = voltage_max / vmn
factor = factor_I
if factor_I > factor_vmn:
factor = factor_vmn
vab = factor * volt * 0.9
if vab > tx_max:
vab = tx_max
elif strategy == 'constant':
vab = volt
else:
vab = 5
# self.DPS.write_register(0x09, 0) # DPS5005 off # self.DPS.write_register(0x09, 0) # DPS5005 off
self.pin0.value = False self.pin0.value = False
self.pin1.value = False self.pin1.value = False
return vab, polarity return vab, polarity, Rab
@staticmethod @staticmethod
def _find_identical_in_line(quads): def _find_identical_in_line(quads):
...@@ -647,12 +716,8 @@ class OhmPi(object): ...@@ -647,12 +716,8 @@ class OhmPi(object):
self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes
self.board_addresses = OHMPI_CONFIG['board_addresses'] self.board_addresses = OHMPI_CONFIG['board_addresses']
self.board_version = OHMPI_CONFIG['board_version'] self.board_version = OHMPI_CONFIG['board_version']
self.mux_board_version = OHMPI_CONFIG['mux_board_version']
self.mux_addressing_table = OHMPI_CONFIG['mux_addressing_table']
self.mcp_board_address = OHMPI_CONFIG['mcp_board_address']
self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}')
def read_quad(self, **kwargs): def read_quad(self, **kwargs):
warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning) warnings.warn('This function is deprecated. Use load_sequence instead.', DeprecationWarning)
self.load_sequence(**kwargs) self.load_sequence(**kwargs)
...@@ -737,52 +802,54 @@ class OhmPi(object): ...@@ -737,52 +802,54 @@ class OhmPi(object):
# let's define the pin again as if we run through measure() # let's define the pin again as if we run through measure()
# as it's run in another thread, it doesn't consider these # as it's run in another thread, it doesn't consider these
# and this can lead to short circuit! # and this can lead to short circuit!
self.pin0 = self.mcp.get_pin(0)
self.pin0 = self.MCPIHM.get_pin(0)
self.pin0.direction = Direction.OUTPUT self.pin0.direction = Direction.OUTPUT
self.pin0.value = False self.pin0.value = False
self.pin1 = self.mcp.get_pin(1) self.pin1 = self.MCPIHM.get_pin(1)
self.pin1.direction = Direction.OUTPUT self.pin1.direction = Direction.OUTPUT
self.pin1.value = False self.pin1.value = False
self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement self.pin7 = self.MCPIHM.get_pin(7) #IHM on mesaurement
self.pin7.direction = Direction.OUTPUT self.pin7.direction = Direction.OUTPUT
self.pin7.value = False self.pin7.value = False
if self.sequence is None : if self.sequence is None :
self.pin2 = self.mcp.get_pin(2) # dsp + if self.idps:
self.pin2.direction = Direction.OUTPUT self.pin2 = self.MCPIHM.get_pin(2) # dsp +
self.pin2.value = True self.pin2.direction = Direction.OUTPUT
self.pin3 = self.mcp.get_pin(3) # dsp - self.pin2.value = True
self.pin3.direction = Direction.OUTPUT self.pin3 = self.MCPIHM.get_pin(3) # dsp -
self.pin3.value = True self.pin3.direction = Direction.OUTPUT
self.pin3.value = True
self.pin5 = self.mcp.get_pin(5) #IHM on mesaurement time.sleep(5)
self.pin5 = self.MCPIHM.get_pin(5) #IHM on mesaurement
self.pin5.direction = Direction.OUTPUT self.pin5.direction = Direction.OUTPUT
self.pin5.value = True self.pin5.value = True
self.pin6 = self.mcp.get_pin(6) #IHM on mesaurement self.pin6 = self.MCPIHM.get_pin(6) #IHM on mesaurement
self.pin6.direction = Direction.OUTPUT self.pin6.direction = Direction.OUTPUT
self.pin6.value = False self.pin6.value = False
self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement self.pin7 = self.MCPIHM.get_pin(7) #IHM on mesaurement
self.pin7.direction = Direction.OUTPUT self.pin7.direction = Direction.OUTPUT
self.pin7.value = False self.pin7.value = False
if self.idps:
if self.DPS.read_register(0x05,2) < 11: if self.DPS.read_register(0x05,2) < 11:
self.pin7.value = True# max current allowed (100 mA for relays) #voltage self.pin7.value = True# max current allowed (100 mA for relays) #voltage
# get best voltage to inject AND polarity # get best voltage to inject AND polarity
if self.idps: if self.idps:
tx_volt, polarity = self._compute_tx_volt( tx_volt, polarity, Rab = self._compute_tx_volt(
best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt)
self.exec_logger.debug(f'Best vab found is {tx_volt:.3f}V') self.exec_logger.debug(f'Best vab found is {tx_volt:.3f}V')
else: else:
polarity = 1 polarity = 1
Rab = None
# first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous)
self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860,
address=self.ads_current_address, mode=0) address=self.ads_current_address, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860,
address=self.ads_voltage_address, mode=0) address=self.ads_voltage_address, mode=0)
# turn on the power supply # turn on the power supply
start_delay = None start_delay = None
end_delay = None end_delay = None
...@@ -791,40 +858,43 @@ class OhmPi(object): ...@@ -791,40 +858,43 @@ class OhmPi(object):
if not np.isnan(tx_volt): if not np.isnan(tx_volt):
self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V
self.DPS.write_register(0x09, 1) # DPS5005 on self.DPS.write_register(0x09, 1) # DPS5005 on
time.sleep(0.05) time.sleep(0.3)
else: else:
self.exec_logger.debug('No best voltage found, will not take measurement') self.exec_logger.debug('No best voltage found, will not take measurement')
out_of_range = True out_of_range = True
if not out_of_range: # we found a Vab in the range so we measure if not out_of_range: # we found a Vab in the range so we measure
if autogain: if autogain:
if self.board_version == 'mb.2023.0.0':
# compute autogain # compute autogain
self.pin0.value = True gain_voltage = []
self.pin1.value = False for n in [0,1]: # make short cycle for gain computation
self.pin6.value = True # IHM current injection led on if n == 0:
self.pin0.value = True
self.pin1.value = False
if self.board_version == 'mb.2023.0.0':
self.pin6.value = True # IHM current injection led on
else:
self.pin0.value = False
self.pin1.value = True # current injection nr2
if self.board_version == 'mb.2023.0.0':
self.pin6.value = True # IHM current injection led on
time.sleep(injection_duration) time.sleep(injection_duration)
gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0))
if polarity > 0: if polarity > 0:
gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)))
else: else:
gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)))
self.pin0.value = False self.pin0.value = False
self.pin1.value = False self.pin1.value = False
self.pin6.value = False # IHM current injection led off if self.board_version == 'mb.2023.0.0':
self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage:.3f}') self.pin6.value = False # IHM current injection led off
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860,
address=self.ads_current_address, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860,
address=self.ads_voltage_address, mode=0)
elif self.board_version == '22.10':
gain_current = 2 / 3
gain_voltage = 2 / 3
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860,
address=self.ads_current_address, mode=0)
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860,
address=self.ads_voltage_address, mode=0)
self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage[0]:.3f}, '
f'{gain_voltage[1]:.3f}')
self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860,
address=self.ads_current_address, mode=0)
self.pin0.value = False self.pin0.value = False
self.pin1.value = False self.pin1.value = False
...@@ -833,8 +903,8 @@ class OhmPi(object): ...@@ -833,8 +903,8 @@ class OhmPi(object):
pinMN = 0 if polarity > 0 else 2 # noqa pinMN = 0 if polarity > 0 else 2 # noqa
# sampling for each stack at the end of the injection # sampling for each stack at the end of the injection
sampling_interval = 10 # ms sampling_interval = 10 # ms # TODO: make this a config option
self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 #TODO: check this strategy
# full data for waveform # full data for waveform
fulldata = [] fulldata = []
...@@ -849,13 +919,18 @@ class OhmPi(object): ...@@ -849,13 +919,18 @@ class OhmPi(object):
if (n % 2) == 0: if (n % 2) == 0:
self.pin0.value = True self.pin0.value = True
self.pin1.value = False self.pin1.value = False
self.pin6.value = True# IHM current injection led on if autogain: # select gain computed on first half cycle
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[0], data_rate=860,
address=self.ads_voltage_address, mode=0)
else: else:
self.pin0.value = False self.pin0.value = False
self.pin1.value = True # current injection nr2 self.pin1.value = True # current injection nr2
self.pin6.value = True# IHM current injection led on if autogain: # select gain computed on first half cycle
self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[1], data_rate=860,
address=self.ads_voltage_address, mode=0)
self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}')
if self.board_version == 'mb.2023.0.0':
self.pin6.value = True # IHM current injection led on
# measurement of current i and voltage u during injection # measurement of current i and voltage u during injection
meas = np.zeros((self.nb_samples, 3)) * np.nan meas = np.zeros((self.nb_samples, 3)) * np.nan
start_delay = time.time() # stating measurement time start_delay = time.time() # stating measurement time
...@@ -885,7 +960,7 @@ class OhmPi(object): ...@@ -885,7 +960,7 @@ class OhmPi(object):
self.pin6.value = False# IHM current injection led on self.pin6.value = False# IHM current injection led on
end_delay = time.time() end_delay = time.time()
# truncate the meas array if we didn't fill the last samples # truncate the meas array if we didn't fill the last samples #TODO: check why
meas = meas[:k + 1] meas = meas[:k + 1]
# measurement of current i and voltage u during off time # measurement of current i and voltage u during off time
...@@ -929,10 +1004,19 @@ class OhmPi(object): ...@@ -929,10 +1004,19 @@ class OhmPi(object):
# TODO send a message on SOH stating the battery level # TODO send a message on SOH stating the battery level
# let's do some calculation (out of the stacking loop) # let's do some calculation (out of the stacking loop)
# i_stack = np.empty(2 * nb_stack, dtype=object)
# vmn_stack = np.empty(2 * nb_stack, dtype=object)
i_stack, vmn_stack = [], []
# select appropriate window length to average the readings
window = int(np.min([f.shape[0] for f in fulldata[::2]]) // 3)
for n, meas in enumerate(fulldata[::2]): for n, meas in enumerate(fulldata[::2]):
# take average from the samples per stack, then sum them all # take average from the samples per stack, then sum them all
# average for the last third of the stacked values # average for the last third of the stacked values
# is done outside the loop # is done outside the loop
i_stack.append(meas[-int(window):, 0])
vmn_stack.append(meas[-int(window):, 1])
sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0])) sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0]))
vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1]) vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1])
if (n % 2) == 0: if (n % 2) == 0:
...@@ -965,43 +1049,64 @@ class OhmPi(object): ...@@ -965,43 +1049,64 @@ class OhmPi(object):
else: else:
np.array([[]]) np.array([[]])
# create a dictionary and compute averaged values from all stacks vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)])
if self.board_version == 'mb.2023.0.0': vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])])
d = { i_stack_mean = np.mean(i_stack)
"time": datetime.now().isoformat(), i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])]))
"A": quad[0], r_stack_mean = vmn_stack_mean / i_stack_mean
"B": quad[1], r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean
"M": quad[2], ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)]))
"N": quad[3],
"inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0.,
"Vmn [mV]": sum_vmn / (2 * nb_stack),
"I [mA]": sum_i / (2 * nb_stack),
"R [ohm]": sum_vmn / sum_i,
"Ps [mV]": sum_ps / (2 * nb_stack),
"nbStack": nb_stack,
"Tx [V]": tx_volt if not out_of_range else 0.,
"CPU temp [degC]": CPUTemperature().temperature,
"Nb samples [-]": self.nb_samples,
"fulldata": fulldata,
}
elif self.board_version == '22.10':
d = {
"time": datetime.now().isoformat(),
"A": quad[0],
"B": quad[1],
"M": quad[2],
"N": quad[3],
"inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0.,
"Vmn [mV]": sum_vmn / (2 * nb_stack),
"I [mA]": sum_i / (2 * nb_stack),
"R [ohm]": sum_vmn / sum_i,
"Ps [mV]": sum_ps / (2 * nb_stack),
"nbStack": nb_stack,
"Tx [V]": tx_volt if not out_of_range else 0.,
"CPU temp [degC]": CPUTemperature().temperature,
"Nb samples [-]": self.nb_samples,
}
# create a dictionary and compute averaged values from all stacks
# if self.board_version == 'mb.2023.0.0':
d = {
"time": datetime.now().isoformat(),
"A": quad[0],
"B": quad[1],
"M": quad[2],
"N": quad[3],
"inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0.,
"Vmn [mV]": sum_vmn / (2 * nb_stack),
"I [mA]": sum_i / (2 * nb_stack),
"R [ohm]": sum_vmn / sum_i,
"Ps [mV]": sum_ps / (2 * nb_stack),
"nbStack": nb_stack,
"Tx [V]": tx_volt if not out_of_range else 0.,
"CPU temp [degC]": CPUTemperature().temperature,
"Nb samples [-]": self.nb_samples,
"fulldata": fulldata,
"I_stack [mA]": i_stack_mean,
"I_std [mA]": i_std,
"I_per_stack [mA]": np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]),
"Vmn_stack [mV]": vmn_stack_mean,
"Vmn_std [mV]": vmn_std,
"Vmn_per_stack [mV]": np.array([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1))[0] / 2 for i in range(nb_stack)]),
"R_stack [ohm]": r_stack_mean,
"R_std [ohm]": r_stack_std,
"R_per_stack [Ohm]": np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) / np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]),
"PS_per_stack [mV]": np.array([np.mean(np.mean(vmn_stack[i*2:i*2+2], axis=1)) for i in range(nb_stack)]),
"PS_stack [mV]": ps_stack_mean,
"R_ab [ohm]": Rab
}
# print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)]))
# elif self.board_version == '22.10':
# d = {
# "time": datetime.now().isoformat(),
# "A": quad[0],
# "B": quad[1],
# "M": quad[2],
# "N": quad[3],
# "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0.,
# "Vmn [mV]": sum_vmn / (2 * nb_stack),
# "I [mA]": sum_i / (2 * nb_stack),
# "R [ohm]": sum_vmn / sum_i,
# "Ps [mV]": sum_ps / (2 * nb_stack),
# "nbStack": nb_stack,
# "Tx [V]": tx_volt if not out_of_range else 0.,
# "CPU temp [degC]": CPUTemperature().temperature,
# "Nb samples [-]": self.nb_samples,
# "fulldata": fulldata,
# }
else: # for testing, generate random data else: # for testing, generate random data
d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3], d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3],
...@@ -1086,19 +1191,17 @@ class OhmPi(object): ...@@ -1086,19 +1191,17 @@ class OhmPi(object):
self.exec_logger.debug(f'Status: {self.status}') self.exec_logger.debug(f'Status: {self.status}')
self.exec_logger.debug(f'Measuring sequence: {self.sequence}') self.exec_logger.debug(f'Measuring sequence: {self.sequence}')
t0 = time.time() t0 = time.time()
self.pin2 = self.mcp.get_pin(2) # dsp - self.reset_mux()
self.pin2.direction = Direction.OUTPUT
self.pin2.value = True
self.pin3 = self.mcp.get_pin(3) # dsp -
self.pin3.direction = Direction.OUTPUT
self.pin3.value = True
# create filename with timestamp # create filename with timestamp
filename = self.settings["export_path"].replace('.csv', filename = self.settings["export_path"].replace('.csv',
f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv')
self.exec_logger.debug(f'Saving to {filename}') self.exec_logger.debug(f'Saving to {filename}')
# make sure all multiplexer are off # make sure all multiplexer are off
#self.reset_mux()
# measure all quadrupole of the sequence # measure all quadrupole of the sequence
if self.sequence is None: if self.sequence is None:
...@@ -1115,6 +1218,14 @@ class OhmPi(object): ...@@ -1115,6 +1218,14 @@ class OhmPi(object):
# call the switch_mux function to switch to the right electrodes # call the switch_mux function to switch to the right electrodes
self.switch_mux_on(quad) self.switch_mux_on(quad)
self.mcp = MCP23008(self.i2c, address=0x24)
self.pin2 = self.MCPIHM.get_pin(2) # dsp -
self.pin2.direction = Direction.OUTPUT
self.pin2.value = True
self.pin3 = self.MCPIHM.get_pin(3) # dsp -
self.pin3.direction = Direction.OUTPUT
self.pin3.value = True
time.sleep (5)
# run a measurement # run a measurement
if self.on_pi: if self.on_pi:
...@@ -1151,10 +1262,9 @@ class OhmPi(object): ...@@ -1151,10 +1262,9 @@ class OhmPi(object):
# save data and print in a text file # save data and print in a text file
self.append_and_save(filename, acquired_data) self.append_and_save(filename, acquired_data)
self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}')
self.pin2.value = True self.pin2.value = False
self.pin3.value = True self.pin3.value = False
self.status = 'idle' self.status = 'idle'
return acquired_data
def run_sequence_async(self, cmd_id=None, **kwargs): def run_sequence_async(self, cmd_id=None, **kwargs):
"""Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'. """Runs the sequence in a separate thread. Can be stopped by 'OhmPi.interrupt()'.
...@@ -1289,15 +1399,16 @@ class OhmPi(object): ...@@ -1289,15 +1399,16 @@ class OhmPi(object):
role : str role : str
Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board. Either 'A', 'B', 'M' or 'N', so we can assign it to a MUX board.
""" """
if not self.use_mux or not self.on_pi: if not self.use_mux or not self.on_pi:
if not self.on_pi: if not self.on_pi:
self.exec_logger.warning('Cannot reset mux while in simulation mode...') self.exec_logger.warning('Cannot reset mux while in simulation mode...')
else: else:
self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.' self.exec_logger.warning('You cannot use the multiplexer because use_mux is set to False.'
' Set use_mux to True to use the multiplexer...') ' Set use_mux to True to use the multiplexer...')
elif self.sequence is None: elif self.sequence is None and not self.use_mux:
self.exec_logger.warning('Unable to switch MUX without a sequence') self.exec_logger.warning('Unable to switch MUX without a sequence')
elif self.mux_board_version == '2023.0.0': else:
# choose with MUX board # choose with MUX board
tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role]) tca = adafruit_tca9548a.TCA9548A(self.i2c, self.board_addresses[role])
...@@ -1309,109 +1420,18 @@ class OhmPi(object): ...@@ -1309,109 +1420,18 @@ class OhmPi(object):
if i2c_address is not None: if i2c_address is not None:
# select the MCP23017 of the selected MUX board # select the MCP23017 of the selected MUX board
mcp2 = MCP23017(tca[i2c_address]) mcp2 = MCP23017(tca[i2c_address])
mcp2.get_pin(relay_nr - 1).direction = digitalio.Direction.OUTPUT mcp2.get_pin(relay_nr).direction = digitalio.Direction.OUTPUT
if state == 'on': if state == 'on':
mcp2.get_pin(relay_nr - 1).value = True mcp2.get_pin(relay_nr).value = True
else: else:
mcp2.get_pin(relay_nr - 1).value = False mcp2.get_pin(relay_nr).value = False
self.exec_logger.debug(f'Switching relay {relay_nr} ' self.exec_logger.debug(f'Switching relay {relay_nr} '
f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}') f'({str(hex(self.board_addresses[role]))}) {state} for electrode {electrode_nr}')
else: else:
self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}') self.exec_logger.warning(f'Unable to address electrode nr {electrode_nr}')
elif self.mux_board_version == '2024.0.0':
mux_addressing_table_file = os.path.join("MUX_board",self.mux_board_version,"relay_board_32",self.mux_addressing_table)
with open(mux_addressing_table_file, 'r') as myfile:
header = myfile.readlines()[0].strip('\n').split(',')
mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str,
delimiter=',', skip_header=1, )
mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len(header))}
def set_relay_state(mcp, mcp_pin, state=True):
pin_enable = mcp.get_pin(mcp_pin)
pin_enable.direction = Direction.OUTPUT
pin_enable.value = state
mux_addressing_table['Electrode_id'] = mux_addressing_table['Electrode_id'].astype(np.uint16)
mux_addressing_table['MCP_GPIO'] = mux_addressing_table['MCP_GPIO'].astype(np.uint16)
idx = np.where((mux_addressing_table['Electrode_id'] == electrode_nr) & (mux_addressing_table['Role'] == role))[0]
tca_addr = mux_addressing_table['TCA_address'][idx][0]
tca_channel = mux_addressing_table['TCA_channel'][idx][0]
mcp_gpio = mux_addressing_table['MCP_GPIO'][idx][0]
if tca_addr == 'None':
tca = self.i2c
else:
tca = adafruit_tca9548a.TCA9548A(self.i2c, int(tca_addr,16))[tca_channel]
mcp_addr = int(mux_addressing_table['MCP_address'][idx][0], 16)
mcp = MCP23017(tca, address=mcp_addr)
if state == 'on':
print('opening gpio nr', mcp_gpio)
print('opening electrode nr', electrode_nr)
print('opening role', role)
print('opening MCP', mux_addressing_table['MCP_address'][idx][0])
set_relay_state(mcp, mcp_gpio, True)
if state == 'off':
set_relay_state(mcp, mcp_gpio, False)
else:
self.exec_logger.warning('MUX board version not recognized')
def _switch_mux_quad(self, electrodes, state, roles):
"""Selects the right channel for the multiplexer cascade for a given electrode.
Parameters
----------
electrodes : np.array or list
Electrode indexes to be switched on or off.
state : str
Either 'on' or 'off'.
roles : np.array or list
Array of roles 'A', 'B', 'M' or 'N', so we can assign them to a MUX board.
"""
mux_addressing_table_file = os.path.join("MUX_board",self.mux_board_version,"relay_board_32",self.mux_addressing_table)
with open(mux_addressing_table_file, 'r') as myfile:
header = myfile.readlines()[0].strip('\n').split(',')
mux_addressing_table = np.genfromtxt(mux_addressing_table_file, dtype=str,
delimiter=',', skip_header=1, )
mux_addressing_table = {header[k]: mux_addressing_table.T[k] for k in range(len(header))}
mux_addressing_table['Electrode_id'] = mux_addressing_table['Electrode_id'].astype(np.uint16)
mux_addressing_table['MCP_GPIO'] = mux_addressing_table['MCP_GPIO'].astype(np.uint16)
def set_relay_state(mcp, mcp_pin, state=True):
pin_enable = mcp.get_pin(mcp_pin)
pin_enable.direction = Direction.OUTPUT
pin_enable.value = state
i2c = busio.I2C(board.SCL, board.SDA)
mcp_list = np.array([])
tca_addr = np.array([])
tca_channel = np.array([])
mcp_gpio = np.array([])
mcp_addr = np.array([])
for i in range(len(electrodes)):
idx = np.where((mux_addressing_table['Electrode_id'] == electrodes[i]) & (mux_addressing_table['Role'] == roles[i]))[0]
tca_addr = np.append(tca_addr,mux_addressing_table['TCA_address'][idx][0])
tca_channel = np.append(tca_channel,mux_addressing_table['TCA_channel'][idx][0])
mcp_addr = np.append(mcp_addr,int(mux_addressing_table['MCP_address'][idx][0], 16))
mcp_gpio = np.append(mcp_gpio,int(mux_addressing_table['MCP_GPIO'][idx][0]))
if tca_addr[i] == 'None':
tca = i2c
else:
tca = adafruit_tca9548a.TCA9548A(i2c, tca_addr[i])
mcp_list = np.append(mcp_list,MCP23017(tca, address=int(mcp_addr[i])))
mux_list = np.column_stack([tca_addr,tca_channel,mcp_addr])
mcp_to_address, mcp_idx, mcp_counts = np.unique(mux_list, return_index=True, return_counts=True, axis=0)
for j,mcp in enumerate(mcp_to_address):
for i,mux in enumerate(mux_list):
if np.array_equal(mux,mcp):
mcp_tmp = mcp_list[mcp_idx[j]]
if state == 'on':
set_relay_state(mcp_tmp,int(mcp_gpio[i]), True)
elif state == 'off':
set_relay_state(mcp_tmp,int(mcp_gpio[i]), False)
def switch_mux_on(self, quadrupole, cmd_id=None): def switch_mux_on(self, quadrupole, cmd_id=None):
"""Switches on multiplexer relays for given quadrupole. """Switches on multiplexer relays for given quadrupole.
...@@ -1425,12 +1445,9 @@ class OhmPi(object): ...@@ -1425,12 +1445,9 @@ class OhmPi(object):
roles = ['A', 'B', 'M', 'N'] roles = ['A', 'B', 'M', 'N']
# another check to be sure A != B # another check to be sure A != B
if quadrupole[0] != quadrupole[1]: if quadrupole[0] != quadrupole[1]:
if self.mux_board_version == '2024.0.0': for i in range(0, 4):
self._switch_mux_quad(quadrupole, 'on', roles) if quadrupole[i] > 0:
else: self._switch_mux(quadrupole[i], 'on', roles[i])
for i in range(0, 4):
if quadrupole[i] > 0:
self._switch_mux(quadrupole[i], 'on', roles[i])
else: else:
self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!') self.exec_logger.error('Not switching MUX : A == B -> short circuit risk detected!')
...@@ -1445,12 +1462,9 @@ class OhmPi(object): ...@@ -1445,12 +1462,9 @@ class OhmPi(object):
List of 4 integers representing the electrode numbers. List of 4 integers representing the electrode numbers.
""" """
roles = ['A', 'B', 'M', 'N'] roles = ['A', 'B', 'M', 'N']
if self.mux_board_version == '2024.0.0': for i in range(0, 4):
self._switch_mux_quad(quadrupole, 'off', roles) if quadrupole[i] > 0:
else: self._switch_mux(quadrupole[i], 'off', roles[i])
for i in range(0, 4):
if quadrupole[i] > 0:
self._switch_mux(quadrupole[i], 'off', roles[i])
def test_mux(self, activation_time=1.0, address=0x70): def test_mux(self, activation_time=1.0, address=0x70):
"""Interactive method to test the multiplexer. """Interactive method to test the multiplexer.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment