From 95adb832ce7b3b2b1f616277b707215180d10f31 Mon Sep 17 00:00:00 2001 From: Clement Remi <remi.clement@irstea.fr> Date: Tue, 14 Feb 2023 13:44:16 +0100 Subject: [PATCH] Replace ohmpi.py --- ohmpi.py | 165 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 96 insertions(+), 69 deletions(-) diff --git a/ohmpi.py b/ohmpi.py index fc58a7fc..f097a520 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -119,6 +119,9 @@ class OhmPi(object): # I2C connexion to MCP23008, for current injection self.mcp = MCP23008(self.i2c, address=0x20) + self.pin4 = self.mcp.get_pin(4) # Ohmpi_run + self.pin4.direction = Direction.OUTPUT + self.pin4.value = True # ADS1115 for current measurement (AB) self.ads_current_address = 0x48 @@ -130,6 +133,13 @@ class OhmPi(object): # current injection module if self.idps: + self.pin2 = self.mcp.get_pin(2) # dsp + + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + time.sleep(3) self.DPS = minimalmodbus.Instrument(port='/dev/ttyUSB0', slaveaddress=1) # port name, address (decimal) self.DPS.serial.baudrate = 9600 # Baud rate 9600 as listed in doc self.DPS.serial.bytesize = 8 # @@ -137,7 +147,11 @@ class OhmPi(object): self.DPS.debug = False # self.DPS.serial.parity = 'N' # No parity self.DPS.mode = minimalmodbus.MODE_RTU # RTU mode - self.DPS.write_register(0x0001, 40, 0) # max current allowed (36 mA for relays) + self.DPS.write_register(0x0001, 100, 0) # max current allowed (100 mA for relays) + print(self.DPS.read_register(0x05,2 )) # max current allowed (100 mA for relays) #voltage + + self.pin2.value = False + self.pin3.value = False # (last number) 0 is for mA, 3 is for A # injection courant and measure (TODO check if it works, otherwise back in run_measurement()) @@ -262,7 +276,6 @@ class OhmPi(object): Time in milliseconds for the half-cycle used to compute Rab. strategy : str, optional Either: - - vmin : compute Vab to reach a minimum Iab and Vmn - vmax : compute Vab to reach a maximum Iab and Vmn - constant : apply given Vab tx_volt : float, optional @@ -306,7 +319,7 @@ class OhmPi(object): vmn=0 count=0 - while I < 1 or abs(vmn)<5 : # I supérieur à 1 mA et Vmn surpérieur + while I < 3 or abs(vmn) < 10 : # I supérieur à 1 mA et Vmn surpérieur if count >0 : volt = volt + 2 count=count+1 @@ -316,7 +329,7 @@ class OhmPi(object): self.DPS.write_register(0x0000, volt, 2) self.DPS.write_register(0x09, 1) # DPS5005 on time.sleep(best_tx_injtime) # inject for given tx time - + # autogain self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_current_address) self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, address=self.ads_voltage_address) @@ -324,31 +337,37 @@ class OhmPi(object): gain_voltage0 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P0)) gain_voltage2 = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) gain_voltage = np.min([gain_voltage0, gain_voltage2]) - # print('gain current: {:.3f}, gain voltage: {:.3f}'.format(gain_current, gain_voltage)) self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address) self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address) - # we measure the voltage on both A0 and A2 to guess the polarity I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt # noqa measure current U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. # noqa measure voltage U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. # noqa - self.DPS.write_register(0x09, 0) # DPS5005 off - - - # print('I (mV)', I*50*self.r_shunt) - # print('I (mA)', I) - # print('U0 (mV)', U0) - # print('U2 (mV)', U2) - # check polarity polarity = 1 # by default, we guessed it right vmn = U0 if U0 < 0: # we guessed it wrong, let's use a correction factor polarity = -1 vmn = U2 - - + if abs(vmn)>4500 or I> 45 : + volt = volt - 2 + self.DPS.write_register(0x0000, volt, 2) + self.DPS.write_register(0x09, 1) # DPS5005 on + time.sleep(best_tx_injtime) + I = AnalogIn(self.ads_current, ads.P0).voltage * 1000. / 50 / self.r_shunt + U0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + U2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + + polarity = 1 # by default, we guessed it right + vmn = U0 + if U0 < 0: # we guessed it wrong, let's use a correction factor + polarity = -1 + vmn = U2 + break + + + self.DPS.write_register(0x09, 0) # DPS5005 off # print('polarity', polarity) self.pin0.value = False self.pin1.value = False @@ -359,43 +378,16 @@ class OhmPi(object): self.exec_logger.debug(f'Rab = {Rab:.2f} Ohms') # implement different strategy - if strategy == 'vmax_rc': - factor_I = current_max / I - factor_vmn = voltage_max / Vmn - factor= factor_I + if strategy == 'vmax': + factor_I = (current_max) / I + factor_vmn = voltage_max / vmn + factor = factor_I if factor_I > factor_vmn: factor = factor_vmn - Vab = factor * tx_volt - if Vab > tx_max: - Vab=tx_max - - - if strategy == 'vmax': - vmn_max = c * current_max - if voltage_max > vmn_max > voltage_min: - vab = current_max * Rab - self.exec_logger.debug('target max current') - else: - iab = voltage_max / c - vab = iab * Rab - self.exec_logger.debug('target max voltage') - if vab > 25000.: - vab = 25000. - vab = vab / 1000. * 0.9 - - elif strategy == 'vmin': - vmn_min = c * current_min - if voltage_min < vmn_min < voltage_max: - vab = current_min * Rab - self.exec_logger.debug('target min current') - else: - iab = voltage_min / c - vab = iab * Rab - self.exec_logger.debug('target min voltage') - if vab < 1000.: - vab = 1000. - vab = vab / 1000. * 1.1 - + vab = factor * volt * 0.95 + if vab > tx_max: + vab = tx_max + elif strategy == 'constant': vab = volt else: @@ -558,7 +550,6 @@ class OhmPi(object): self.exec_logger.info(f'Sequence {filename} of {sequence.shape[0]:d} quadrupoles loaded.') else: self.exec_logger.warning(f'Unable to load sequence {filename}') - self.sequence = sequence def measure(self, **kwargs): @@ -573,7 +564,6 @@ class OhmPi(object): message : str message containing a command and arguments or keywords and arguments """ - status = False cmd_id = '?' try: @@ -656,7 +646,7 @@ class OhmPi(object): self.version = OHMPI_CONFIG['version'] # hardware version self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes self.board_addresses = OHMPI_CONFIG['board_addresses'] - self.board_version = OHMPI_CONFIG['measurement board_version'] + self.board_version = OHMPI_CONFIG['board_version'] self.exec_logger.debug(f'OHMPI_CONFIG = {str(OHMPI_CONFIG)}') def read_quad(self, **kwargs): @@ -710,9 +700,7 @@ class OhmPi(object): resolution of the reading. strategy : str, optional (V3.0 only) If we search for best voltage (tx_volt == 0), we can choose - different strategy: - - vmin: find the lowest voltage that gives us a signal - - vmax: find the highest voltage that stays in the range + vmax strategy : find the highest voltage that stays in the range For a constant value, just set the tx_volt. tx_volt : float, optional (V3.0 only) If specified, voltage will be imposed. If 0, we will look @@ -751,7 +739,32 @@ class OhmPi(object): self.pin1 = self.mcp.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False - + self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement + self.pin7.direction = Direction.OUTPUT + self.pin7.value = False + + + if self.sequence == None : + self.pin2 = self.mcp.get_pin(2) # dsp + + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True + + self.pin5 = self.mcp.get_pin(5) #IHM on mesaurement + self.pin5.direction = Direction.OUTPUT + self.pin5.value = True + self.pin6 = self.mcp.get_pin(6) #IHM on mesaurement + self.pin6.direction = Direction.OUTPUT + self.pin6.value = False + self.pin7 = self.mcp.get_pin(7) #IHM on mesaurement + self.pin7.direction = Direction.OUTPUT + self.pin7.value = False + + if self.DPS.read_register(0x05,2) < 11: + self.pin7.value = True# max current allowed (100 mA for relays) #voltage + # get best voltage to inject AND polarity if self.idps: tx_volt, polarity = self._compute_tx_volt( @@ -781,10 +794,11 @@ class OhmPi(object): if not out_of_range: # we found a Vab in the range so we measure if autogain: - if self.board_version == '22.11': + if self.board_version == 'mb.2023.0.0': # compute autogain self.pin0.value = True self.pin1.value = False + self.pin6.value = True # IHM current injection led on time.sleep(injection_duration) gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) if polarity > 0: @@ -793,12 +807,13 @@ class OhmPi(object): gain_voltage = self._gain_auto(AnalogIn(self.ads_voltage, ads.P2)) self.pin0.value = False self.pin1.value = False + self.pin6.value = False # IHM current injection led off self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage:.3f}') self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address, mode=0) self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage, data_rate=860, address=self.ads_voltage_address, mode=0) - elif self.board_version == 'mb.2023.0.0': + elif self.board_version == '22.10': gain_current = 2 / 3 gain_voltage = 2 / 3 self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, @@ -830,9 +845,11 @@ class OhmPi(object): if (n % 2) == 0: self.pin0.value = True self.pin1.value = False + self.pin6.value = True# IHM current injection led on else: self.pin0.value = False self.pin1.value = True # current injection nr2 + self.pin6.value = True# IHM current injection led on self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') # measurement of current i and voltage u during injection @@ -843,12 +860,12 @@ class OhmPi(object): for k in range(0, self.nb_samples): # reading current value on ADS channels meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) - if self.board_version == '22.11': + if self.board_version == 'mb.2023.0.0': if pinMN == 0: meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 else: meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 - elif self.board_version == 'mb.2023.0.0': + elif self.board_version == '22.10': meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000 # else: # self.exec_logger.debug('Unknown board') @@ -861,6 +878,7 @@ class OhmPi(object): # stop current injection self.pin0.value = False self.pin1.value = False + self.pin6.value = False# IHM current injection led on end_delay = time.time() # truncate the meas array if we didn't fill the last samples @@ -873,12 +891,12 @@ class OhmPi(object): for k in range(0, measpp.shape[0]): # reading current value on ADS channels measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000.) / (50 * self.r_shunt) - if self.board_version == '22.11': + if self.board_version == 'mb.2023.0.0': if pinMN == 0: measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. else: measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1 - elif self.board_version == 'mb.2023.0.0': + elif self.board_version == '22.10': measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000. else: self.exec_logger.debug('unknown board') @@ -944,7 +962,7 @@ class OhmPi(object): np.array([[]]) # create a dictionary and compute averaged values from all stacks - if self.board_version == '22.11': + if self.board_version == 'mb.2023.0.0': d = { "time": datetime.now().isoformat(), "A": quad[0], @@ -962,7 +980,7 @@ class OhmPi(object): "Nb samples [-]": self.nb_samples, "fulldata": fulldata, } - elif self.board_version == 'mb.2023.0.0': + elif self.board_version == '22.10': d = { "time": datetime.now().isoformat(), "A": quad[0], @@ -1000,7 +1018,10 @@ class OhmPi(object): dd['cmd_id'] = str(cmd_id) self.data_logger.info(dd) - + self.pin5.value = False #IHM led on measurement off + if self.sequence == None : + self.pin2.value = False # DSP + off + self.pin3.value = False # DSP - off return d def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs): @@ -1061,7 +1082,12 @@ class OhmPi(object): self.exec_logger.debug(f'Status: {self.status}') self.exec_logger.debug(f'Measuring sequence: {self.sequence}') t0 = time.time() - + self.pin2 = self.mcp.get_pin(2) # dsp - + self.pin2.direction = Direction.OUTPUT + self.pin2.value = True + self.pin3 = self.mcp.get_pin(3) # dsp - + self.pin3.direction = Direction.OUTPUT + self.pin3.value = True # create filename with timestamp filename = self.settings["export_path"].replace('.csv', f'_{datetime.now().strftime("%Y%m%dT%H%M%S")}.csv') @@ -1121,7 +1147,8 @@ class OhmPi(object): # save data and print in a text file self.append_and_save(filename, acquired_data) self.exec_logger.debug(f'quadrupole {i + 1:d}/{n:d}') - + self.pin2.value = True + self.pin3.value = True self.status = 'idle' def run_sequence_async(self, cmd_id=None, **kwargs): -- GitLab