From 99cc375b7b7352addd2dd952e4d2c70c340e3ef6 Mon Sep 17 00:00:00 2001 From: su530201 <olivier.kaufmann@umons.ac.be> Date: Tue, 2 May 2023 17:03:24 +0200 Subject: [PATCH] Fixes continuous mode in ohmpi_card_3_15 --- ohmpi.py | 390 ++++++------------------------------------------------- 1 file changed, 39 insertions(+), 351 deletions(-) diff --git a/ohmpi.py b/ohmpi.py index 9b9a9671..1a587b8e 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -433,10 +433,7 @@ class OhmPi(object): For a constant value, just set the tx_volt. tx_volt : float, optional (V3.0 only) If specified, voltage will be imposed. If 0, we will look - for the best voltage. If the best Tx cannot be found, no - measurement will be taken and values will be NaN. - best_tx_injtime : float, optional - (V3.0 only) Injection time in seconds used for finding the best voltage. + for the best voltage. cmd_id : str, optional Unique command identifier """ @@ -447,351 +444,45 @@ class OhmPi(object): if quad is None: quad = [0, 0, 0, 0] - if self.on_pi: # TODO : Remove this condition? - if nb_stack is None: - nb_stack = self.settings['nb_stack'] - if injection_duration is None: + if nb_stack is None: + nb_stack = self.settings['nb_stack'] + if injection_duration is None: injection_duration = self.settings['injection_duration'] - tx_volt = float(tx_volt) - - # inner variable initialization - sum_i = 0 - sum_vmn = 0 - sum_ps = 0 - - # # let's define the pin again as if we run through measure() - # # as it's run in another thread, it doesn't consider these - # # and this can lead to short circuit! - # - # self.pin0 = self.mcp_board.get_pin(0) - # self.pin0.direction = Direction.OUTPUT - # self.pin0.value = False - # self.pin1 = self.mcp_board.get_pin(1) - # self.pin1.direction = Direction.OUTPUT - # self.pin1.value = False - # self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement - # self.pin7.direction = Direction.OUTPUT - # self.pin7.value = False - - # if self.sequence is None: -# if self.idps: -# -# # self.switch_dps('on') -# self.pin2 = self.mcp_board.get_pin(2) # dsp + -# self.pin2.direction = Direction.OUTPUT -# self.pin2.value = True -# self.pin3 = self.mcp_board.get_pin(3) # dsp - -# self.pin3.direction = Direction.OUTPUT -# self.pin3.value = True -# time.sleep(4) -# -# self.pin5 = self.mcp_board.get_pin(5) #IHM on mesaurement -# self.pin5.direction = Direction.OUTPUT -# self.pin5.value = True -# self.pin6 = self.mcp_board.get_pin(6) #IHM on mesaurement -# self.pin6.direction = Direction.OUTPUT -# self.pin6.value = False -# self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement -# self.pin7.direction = Direction.OUTPUT -# self.pin7.value = False -# if self.idps: -# if self.DPS.read_register(0x05,2) < 11: -# self.pin7.value = True# max current allowed (100 mA for relays) #voltage -# -# # get best voltage to inject AND polarity -# if self.idps: -# tx_volt, polarity, Rab = self._compute_tx_volt( -# best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt) -# self.exec_logger.debug(f'Best VAB found is {tx_volt:.3f}V') -# else: -# polarity = 1 -# Rab = None -# -# # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) -# self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, -# address=self.ads_current_address, mode=0) -# self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, -# address=self.ads_voltage_address, mode=0) -# # turn on the power supply -# start_delay = None -# end_delay = None -# out_of_range = False -# if self.idps: -# if not np.isnan(tx_volt): -# self.DPS.write_register(0x0000, tx_volt, 2) # set tx voltage in V -# self.DPS.write_register(0x09, 1) # DPS5005 on -# time.sleep(0.3) -# else: -# self.exec_logger.debug('No best voltage found, will not take measurement') -# out_of_range = True -# -# if not out_of_range: # we found a Vab in the range so we measure -# if autogain: -# -# # compute autogain -# gain_voltage = [] -# for n in [0,1]: # make short cycle for gain computation -# self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, -# address=self.ads_voltage_address, mode=0) -# if n == 0: -# self.pin0.value = True -# self.pin1.value = False -# if self.board_version == 'mb.2023.0.0': -# self.pin6.value = True # IHM current injection led on -# else: -# self.pin0.value = False -# self.pin1.value = True # current injection nr2 -# if self.board_version == 'mb.2023.0.0': -# self.pin6.value = True # IHM current injection led on -# -# time.sleep(injection_duration) -# gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) -# -# if polarity > 0: -# if n == 0: -# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) -# else: -# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) -# else: -# if n == 0: -# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) -# else: -# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) -# -# self.pin0.value = False -# self.pin1.value = False -# time.sleep(injection_duration) -# if n == 0: -# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) -# else: -# gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) -# if self.board_version == 'mb.2023.0.0': -# self.pin6.value = False # IHM current injection led off -# -# self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage[0]:.3f}, ' -# f'{gain_voltage[1]:.3f}') -# self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, -# address=self.ads_current_address, mode=0) -# -# self.pin0.value = False -# self.pin1.value = False -# -# # one stack = 2 half-cycles (one positive, one negative) -# pinMN = 0 if polarity > 0 else 2 # noqa -# -# # sampling for each stack at the end of the injection -# sampling_interval = 10 # ms # TODO: make this a config option -# self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 #TODO: check this strategy -# -# # full data for waveform -# fulldata = [] -# -# # we sample every 10 ms (as using AnalogIn for both current -# # and voltage takes about 7 ms). When we go over the injection -# # duration, we break the loop and truncate the meas arrays -# # only the last values in meas will be taken into account -# start_time = time.time() # start counter -# for n in range(0, nb_stack * 2): # for each half-cycles -# # current injection -# if (n % 2) == 0: -# self.pin0.value = True -# self.pin1.value = False -# if autogain: # select gain computed on first half cycle -# self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage), data_rate=860, -# address=self.ads_voltage_address, mode=0) -# else: -# self.pin0.value = False -# self.pin1.value = True # current injection nr2 -# if autogain: # select gain computed on first half cycle -# self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage),data_rate=860, -# address=self.ads_voltage_address, mode=0) -# self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') -# if self.board_version == 'mb.2023.0.0': -# self.pin6.value = True # IHM current injection led on -# # measurement of current i and voltage u during injection -# meas = np.zeros((self.nb_samples, 3)) * np.nan -# start_delay = time.time() # stating measurement time -# dt = 0 -# k = 0 -# for k in range(0, self.nb_samples): -# # reading current value on ADS channels -# meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) -# if self.board_version == 'mb.2023.0.0': -# if pinMN == 0: -# meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 -# else: -# meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 -# elif self.board_version == '22.10': -# meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000 -# # else: -# # self.exec_logger.debug('Unknown board') -# time.sleep(sampling_interval / 1000) -# dt = time.time() - start_delay # real injection time (s) -# meas[k, 2] = time.time() - start_time -# if dt > (injection_duration - 0 * sampling_interval / 1000.): -# break -# -# # stop current injection -# self.pin0.value = False -# self.pin1.value = False -# # if autogain: # select gain computed on first half cycle -# # self.ads_voltage = ads.ADS1115(self.i2c, gain=gain_voltage[2],data_rate=860, -# # address=self.ads_voltage_address, mode=0) -# self.pin6.value = False# IHM current injection led on -# end_delay = time.time() -# -# # truncate the meas array if we didn't fill the last samples #TODO: check why -# meas = meas[:k + 1] -# -# # measurement of current i and voltage u during off time -# measpp = np.zeros((meas.shape[0], 3)) * np.nan -# start_delay = time.time() # stating measurement time -# dt = 0 -# for k in range(0, measpp.shape[0]): -# # reading current value on ADS channels -# measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000.) / (50 * self.r_shunt) -# if self.board_version == 'mb.2023.0.0': -# if pinMN == 0: -# measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. -# else: -# measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1 -# elif self.board_version == '22.10': -# measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000. -# else: -# self.exec_logger.debug('unknown board') -# time.sleep(sampling_interval / 1000) -# dt = time.time() - start_delay # real injection time (s) -# measpp[k, 2] = time.time() - start_time -# if dt > (injection_duration - 0 * sampling_interval / 1000.): -# break -# -# end_delay = time.time() -# -# # truncate the meas array if we didn't fill the last samples -# measpp = measpp[:k + 1] -# -# # we alternate on which ADS1115 pin we measure because of sign of voltage -# if pinMN == 0: -# pinMN = 2 # noqa -# else: -# pinMN = 0 # noqa -# -# # store data for full wave form -# fulldata.append(meas) -# fulldata.append(measpp) -# -# # TODO get battery voltage and warn if battery is running low -# # TODO send a message on SOH stating the battery level -# -# # let's do some calculation (out of the stacking loop) -# -# # i_stack = np.empty(2 * nb_stack, dtype=object) -# # vmn_stack = np.empty(2 * nb_stack, dtype=object) -# i_stack, vmn_stack = [], [] -# # select appropriate window length to average the readings -# window = int(np.min([f.shape[0] for f in fulldata[::2]]) // 3) -# for n, meas in enumerate(fulldata[::2]): -# # take average from the samples per stack, then sum them all -# # average for the last third of the stacked values -# # is done outside the loop -# i_stack.append(meas[-int(window):, 0]) -# vmn_stack.append(meas[-int(window):, 1]) -# -# sum_i = sum_i + (np.mean(meas[-int(meas.shape[0] // 3):, 0])) -# vmn1 = np.mean(meas[-int(meas.shape[0] // 3), 1]) -# if (n % 2) == 0: -# sum_vmn = sum_vmn - vmn1 -# sum_ps = sum_ps + vmn1 -# else: -# sum_vmn = sum_vmn + vmn1 -# sum_ps = sum_ps + vmn1 -# -# else: -# sum_i = np.nan -# sum_vmn = np.nan -# sum_ps = np.nan -# fulldata = None -# -# if self.idps: -# self.DPS.write_register(0x0000, 0, 2) # reset to 0 volt -# self.DPS.write_register(0x09, 0) # DPS5005 off -# -# # reshape full data to an array of good size -# # we need an array of regular size to save in the csv -# if not out_of_range: -# fulldata = np.vstack(fulldata) -# # we create a big enough array given nb_samples, number of -# # half-cycles (1 stack = 2 half-cycles), and twice as we -# # measure decay as well -# a = np.zeros((nb_stack * self.nb_samples * 2 * 2, 3)) * np.nan -# a[:fulldata.shape[0], :] = fulldata -# fulldata = a -# else: -# np.array([[]]) -# -# vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) -# vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) -# i_stack_mean = np.mean(i_stack) -# i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])])) -# r_stack_mean = vmn_stack_mean / i_stack_mean -# r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean -# ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) - - # create a dictionary and compute averaged values from all stacks - # if self.board_version == 'mb.2023.0.0': - # d = { - # "time": datetime.now().isoformat(), - # "A": quad[0], - # "B": quad[1], - # "M": quad[2], - # "N": quad[3], - # "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., - # "Vmn [mV]": sum_vmn / (2 * nb_stack), - # "I [mA]": sum_i / (2 * nb_stack), - # "R [ohm]": sum_vmn / sum_i, - # "Ps [mV]": sum_ps / (2 * nb_stack), - # "nbStack": nb_stack, - # "Tx [V]": tx_volt if not out_of_range else 0., - # "CPU temp [degC]": self._hw.cpu_temperature, - # "Nb samples [-]": self.nb_samples, - # "fulldata": fulldata, - # "I_stack [mA]": i_stack_mean, - # "I_std [mA]": i_std, - # "I_per_stack [mA]": np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), - # "Vmn_stack [mV]": vmn_stack_mean, - # "Vmn_std [mV]": vmn_std, - # "Vmn_per_stack [mV]": np.array([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1))[0] / 2 for i in range(nb_stack)]), - # "R_stack [ohm]": r_stack_mean, - # "R_std [ohm]": r_stack_std, - # "R_per_stack [Ohm]": np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) / np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), - # "PS_per_stack [mV]": np.array([np.mean(np.mean(vmn_stack[i*2:i*2+2], axis=1)) for i in range(nb_stack)]), - # "PS_stack [mV]": ps_stack_mean, - # "R_ab [ohm]": Rab - # } - # print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)])) - # elif self.board_version == '22.10': - # d = { - # "time": datetime.now().isoformat(), - # "A": quad[0], - # "B": quad[1], - # "M": quad[2], - # "N": quad[3], - # "inj time [ms]": (end_delay - start_delay) * 1000. if not out_of_range else 0., - # "Vmn [mV]": sum_vmn / (2 * nb_stack), - # "I [mA]": sum_i / (2 * nb_stack), - # "R [ohm]": sum_vmn / sum_i, - # "Ps [mV]": sum_ps / (2 * nb_stack), - # "nbStack": nb_stack, - # "Tx [V]": tx_volt if not out_of_range else 0., - # "CPU temp [degC]": CPUTemperature().temperature, - # "Nb samples [-]": self.nb_samples, - # "fulldata": fulldata, - # } - d = {} - - else: # for testing, generate random data - d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3], - 'R [ohm]': np.abs(np.random.randn(1)).tolist()} + tx_volt = float(tx_volt) + + self.switch_mux_on(quad, cmd_id) + self._hw.vab_square_wave(tx_volt, cycle_length=injection_duration*2, cycles=nb_stack) + self.switch_mux_off(quad, cmd_id) + + d = { + "time": datetime.now().isoformat(), + "A": quad[0], + "B": quad[1], + "M": quad[2], + "N": quad[3], + "inj time [ms]": injection_duration, # NOTE: check this + # "Vmn [mV]": sum_vmn / (2 * nb_stack), + # "I [mA]": sum_i / (2 * nb_stack), + # "R [ohm]": sum_vmn / sum_i, + "Ps [mV]": self._hw.sp, + "nbStack": nb_stack, + "Tx [V]": tx_volt, + "CPU temp [degC]": self._hw.controller.cpu_temperature, + "Nb samples [-]": len(self._hw.readings), # TODO: use only samples after a delay in each pulse + "fulldata": self._hw.readings[:,[0,-2,-1]], + # "I_stack [mA]": i_stack_mean, + # "I_std [mA]": i_std, + # "I_per_stack [mA]": np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), + # "Vmn_stack [mV]": vmn_stack_mean, + # "Vmn_std [mV]": vmn_std, + # "Vmn_per_stack [mV]": np.array([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1))[0] / 2 for i in range(nb_stack)]), + # "R_stack [ohm]": r_stack_mean, + # "R_std [ohm]": r_stack_std, + # "R_per_stack [Ohm]": np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) / np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), + # "PS_per_stack [mV]": np.array([np.mean(np.mean(vmn_stack[i*2:i*2+2], axis=1)) for i in range(nb_stack)]), + # "PS_stack [mV]": ps_stack_mean, + # "R_ab [ohm]": Rab + } # to the data logger dd = d.copy() @@ -808,9 +499,6 @@ class OhmPi(object): dd['cmd_id'] = str(cmd_id) self.data_logger.info(dd) - # self.pin5.value = False #IHM led on measurement off - # if self.sequence is None : - # self.switch_dps('off') return d -- GitLab