diff --git a/ohmpi.py b/ohmpi.py index 5743f788ebd76fa9448a47ba912309301a3a699d..1058ad7d321b3dd103414d63a74886c89c410d4a 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -223,7 +223,8 @@ class OhmPi(object): ' Use python/ipython to interact with OhmPi object...') @staticmethod - def append_and_save(filename: str, last_measurement: dict, cmd_id=None): + + def append_and_save_new(filename: str, last_measurement: dict, cmd_id=None): """Appends and saves the last measurement dict. Parameters @@ -243,9 +244,13 @@ class OhmPi(object): idic = dict(zip(['i' + str(i) for i in range(n)], d[:, 0])) udic = dict(zip(['u' + str(i) for i in range(n)], d[:, 1])) tdic = dict(zip(['t' + str(i) for i in range(n)], d[:, 2])) + uxdic = dict(zip(['ux' + str(i) for i in range(n)], d[:, 3])) + uydic = dict(zip(['uy' + str(i) for i in range(n)], d[:, 4])) last_measurement.update(idic) last_measurement.update(udic) last_measurement.update(tdic) + last_measurement.update(uxdic) + last_measurement.update(uydic) last_measurement.pop('fulldata') if os.path.isfile(filename): @@ -784,7 +789,7 @@ class OhmPi(object): else: self.exec_logger.warning('Not on Raspberry Pi, skipping reboot...') - def run_measurement(self, quad=None, nb_stack=None, injection_duration=None, + def run_measurement_new(self, quad=None, nb_stack=None, injection_duration=None, autogain=True, strategy='constant', tx_volt=5, best_tx_injtime=0.1, duty_cycle=0.5, cmd_id=None): """Measures on a quadrupole and returns transfer resistance. @@ -841,46 +846,45 @@ class OhmPi(object): # let's define the pin again as if we run through measure() # as it's run in another thread, it doesn't consider these # and this can lead to short circuit! - + self.pin0 = self.mcp_board.get_pin(0) self.pin0.direction = Direction.OUTPUT self.pin0.value = False self.pin1 = self.mcp_board.get_pin(1) self.pin1.direction = Direction.OUTPUT self.pin1.value = False - self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement + self.pin7 = self.mcp_board.get_pin(7) # IHM on mesaurement self.pin7.direction = Direction.OUTPUT self.pin7.value = False - + if self.sequence is None: if self.idps: - # self.switch_dps('on') - self.pin2 = self.mcp_board.get_pin(2) # dsp + + self.pin2 = self.mcp_board.get_pin(2) # dsp + self.pin2.direction = Direction.OUTPUT self.pin2.value = True - self.pin3 = self.mcp_board.get_pin(3) # dsp - + self.pin3 = self.mcp_board.get_pin(3) # dsp - self.pin3.direction = Direction.OUTPUT self.pin3.value = True time.sleep(4) - - self.pin5 = self.mcp_board.get_pin(5) #IHM on mesaurement + + self.pin5 = self.mcp_board.get_pin(5) # IHM on mesaurement self.pin5.direction = Direction.OUTPUT self.pin5.value = True - self.pin6 = self.mcp_board.get_pin(6) #IHM on mesaurement + self.pin6 = self.mcp_board.get_pin(6) # IHM on mesaurement self.pin6.direction = Direction.OUTPUT self.pin6.value = False - self.pin7 = self.mcp_board.get_pin(7) #IHM on mesaurement + self.pin7 = self.mcp_board.get_pin(7) # IHM on mesaurement self.pin7.direction = Direction.OUTPUT - self.pin7.value = False - if self.idps: - if self.DPS.read_register(0x05,2) < 11: - self.pin7.value = True# max current allowed (100 mA for relays) #voltage - + self.pin7.value = False + if self.idps: + if self.DPS.read_register(0x05, 2) < 11: + self.pin7.value = True # max current allowed (100 mA for relays) #voltage + # get best voltage to inject AND polarity if self.idps: tx_volt, polarity, Rab = self._compute_tx_volt( - best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt,autogain=autogain) + best_tx_injtime=best_tx_injtime, strategy=strategy, tx_volt=tx_volt, autogain=autogain) self.exec_logger.debug(f'Best vab found is {tx_volt:.3f}V') else: polarity = 1 @@ -888,9 +892,9 @@ class OhmPi(object): # first reset the gain to 2/3 before trying to find best gain (mode 0 is continuous) self.ads_current = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, - address=self.ads_current_address, mode=0) + address=self.ads_current_address, mode=0) self.ads_voltage = ads.ADS1115(self.i2c, gain=2 / 3, data_rate=860, - address=self.ads_voltage_address, mode=0) + address=self.ads_voltage_address, mode=0) # turn on the power supply start_delay = None end_delay = None @@ -907,48 +911,49 @@ class OhmPi(object): if not out_of_range: # we found a Vab in the range so we measure gain = 2 / 3 self.ads_voltage = ads.ADS1115(self.i2c, gain=gain, data_rate=860, - address=self.ads_voltage_address, mode=0) + address=self.ads_voltage_address, mode=0) if autogain: # compute autogain gain_voltage = [] - for n in [0,1]: # make short cycle for gain computation + for n in [0, 1]: # make short cycle for gain computation if n == 0: self.pin0.value = True self.pin1.value = False if self.board_version == 'mb.2023.0.0': - self.pin6.value = True # IHM current injection led on + self.pin6.value = True # IHM current injection led on else: self.pin0.value = False self.pin1.value = True # current injection nr2 if self.board_version == 'mb.2023.0.0': - self.pin6.value = True # IHM current injection led on + self.pin6.value = True # IHM current injection led on - time.sleep(injection_duration) + time.sleep(best_tx_injtime) gain_current = self._gain_auto(AnalogIn(self.ads_current, ads.P0)) - - if polarity > 0: - if n == 0: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) - else: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) - else: - if n == 0: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) - else: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + # if polarity > 0: + # if n == 0: + # gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + # else: + # gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + # else: + # if n == 0: + # gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + # else: + # gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) self.pin0.value = False self.pin1.value = False - time.sleep(injection_duration) - if n == 0: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) - else: - gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) + time.sleep(best_tx_injtime) + # if n == 0: + # gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P0))) + # else: + # gain_voltage.append(self._gain_auto(AnalogIn(self.ads_voltage, ads.P2))) if self.board_version == 'mb.2023.0.0': - self.pin6.value = False # IHM current injection led off + self.pin6.value = False # IHM current injection led off gain = np.min(gain_voltage) self.exec_logger.debug(f'Gain current: {gain_current:.3f}, gain voltage: {gain_voltage[0]:.3f}, ' - f'{gain_voltage[1]:.3f}') + f'{gain_voltage[1]:.3f}') self.ads_current = ads.ADS1115(self.i2c, gain=gain_current, data_rate=860, address=self.ads_current_address, mode=0) @@ -960,7 +965,7 @@ class OhmPi(object): # sampling for each stack at the end of the injection sampling_interval = 10 # ms # TODO: make this a config option - self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 #TODO: check this strategy + self.nb_samples = int(injection_duration * 1000 // sampling_interval) + 1 # TODO: check this strategy # full data for waveform fulldata = [] @@ -975,20 +980,20 @@ class OhmPi(object): if (n % 2) == 0: self.pin0.value = True self.pin1.value = False - if autogain: # select gain computed on first half cycle + if autogain: # select gain computed on first half cycle self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage), data_rate=860, - address=self.ads_voltage_address, mode=0) + address=self.ads_voltage_address, mode=0) else: self.pin0.value = False self.pin1.value = True # current injection nr2 - if autogain: # select gain computed on first half cycle - self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage),data_rate=860, - address=self.ads_voltage_address, mode=0) + if autogain: # select gain computed on first half cycle + self.ads_voltage = ads.ADS1115(self.i2c, gain=np.min(gain_voltage), data_rate=860, + address=self.ads_voltage_address, mode=0) self.exec_logger.debug(f'Stack {n} {self.pin0.value} {self.pin1.value}') if self.board_version == 'mb.2023.0.0': self.pin6.value = True # IHM current injection led on # measurement of current i and voltage u during injection - meas = np.zeros((self.nb_samples, 3)) * np.nan + meas = np.zeros((self.nb_samples, 5)) * np.nan start_delay = time.time() # stating measurement time dt = 0 k = 0 @@ -996,10 +1001,20 @@ class OhmPi(object): # reading current value on ADS channels meas[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000) / (50 * self.r_shunt) if self.board_version == 'mb.2023.0.0': - if pinMN == 0: - meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000 - else: - meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P2).voltage * 1000 + # if pinMN == 0: + # meas[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + # meas[k, 3] = meas[k, 1] + # meas[k, 4] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0 + # else: + # meas[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0 + # meas[k, 4] = meas[k, 1] + # meas[k, 3] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + u0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + u2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + u = np.max([u0, u2]) * (np.heaviside(u0 - u2, 1.) * 2 - 1.) + meas[k, 1] = u + meas[k, 3] = u0 + meas[k, 4] = u2 *-1.0 elif self.board_version == '22.10': meas[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000 # else: @@ -1014,14 +1029,14 @@ class OhmPi(object): self.pin0.value = False self.pin1.value = False if self.board_version == 'mb.2023.0.0': - self.pin6.value = False# IHM current injection led on + self.pin6.value = False # IHM current injection led on end_delay = time.time() # truncate the meas array if we didn't fill the last samples #TODO: check why meas = meas[:k + 1] # measurement of current i and voltage u during off time - measpp = np.zeros((int(meas.shape[0]*(1/duty_cycle-1)), 3)) * np.nan + measpp = np.zeros((int(meas.shape[0] * (1 / duty_cycle - 1)), 5)) * np.nan time.sleep(sampling_interval / 1000) start_delay_off = time.time() # stating measurement time dt = 0 @@ -1029,10 +1044,20 @@ class OhmPi(object): # reading current value on ADS channels measpp[k, 0] = (AnalogIn(self.ads_current, ads.P0).voltage * 1000.) / (50 * self.r_shunt) if self.board_version == 'mb.2023.0.0': - if pinMN == 0: - measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. - else: - measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0 + # if pinMN == 0: + # measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + # measpp[k, 3] = measpp[k, 1] + # measpp[k, 4] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0 + # else: + # measpp[k, 3] = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + # measpp[k, 1] = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. * -1.0 + # measpp[k, 4] = measpp[k, 1] + u0 = AnalogIn(self.ads_voltage, ads.P0).voltage * 1000. + u2 = AnalogIn(self.ads_voltage, ads.P2).voltage * 1000. + u = np.max([u0, u2]) * (np.heaviside(u0 - u2, 1.) * 2 - 1.) + measpp[k, 1] = u + measpp[k, 3] = u0 + measpp[k, 4] = u2*-1.0 elif self.board_version == '22.10': measpp[k, 1] = -AnalogIn(self.ads_voltage, ads.P0, ads.P1).voltage * self.coef_p2 * 1000. else: @@ -1101,19 +1126,22 @@ class OhmPi(object): # we create a big enough array given nb_samples, number of # half-cycles (1 stack = 2 half-cycles), and twice as we # measure decay as well - a = np.zeros((nb_stack * self.nb_samples * 2 * 2, 3)) * np.nan + a = np.zeros((nb_stack * self.nb_samples * 2 * 2, 5)) * np.nan a[:fulldata.shape[0], :] = fulldata fulldata = a else: np.array([[]]) - vmn_stack_mean = np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) - vmn_std =np.sqrt(np.std(vmn_stack[::2])**2 + np.std(vmn_stack[1::2])**2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) + vmn_stack_mean = np.mean( + [np.diff(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) / 2 for i in range(nb_stack)]) + vmn_std = np.sqrt(np.std(vmn_stack[::2]) ** 2 + np.std( + vmn_stack[1::2]) ** 2) # np.sum([np.std(vmn_stack[::2]),np.std(vmn_stack[1::2])]) i_stack_mean = np.mean(i_stack) i_std = np.mean(np.array([np.std(i_stack[::2]), np.std(i_stack[1::2])])) r_stack_mean = vmn_stack_mean / i_stack_mean - r_stack_std = np.sqrt((vmn_std/vmn_stack_mean)**2 + (i_std/i_stack_mean)**2) * r_stack_mean - ps_stack_mean = np.mean(np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) + r_stack_std = np.sqrt((vmn_std / vmn_stack_mean) ** 2 + (i_std / i_stack_mean) ** 2) * r_stack_mean + ps_stack_mean = np.mean( + np.array([np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)])) # create a dictionary and compute averaged values from all stacks # if self.board_version == 'mb.2023.0.0': @@ -1135,18 +1163,23 @@ class OhmPi(object): "fulldata": fulldata, "I_stack [mA]": i_stack_mean, "I_std [mA]": i_std, - "I_per_stack [mA]": np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), + "I_per_stack [mA]": np.array([np.mean(i_stack[i * 2:i * 2 + 2]) for i in range(nb_stack)]), "Vmn_stack [mV]": vmn_stack_mean, "Vmn_std [mV]": vmn_std, - "Vmn_per_stack [mV]": np.array([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1))[0] / 2 for i in range(nb_stack)]), + "Vmn_per_stack [mV]": np.array( + [np.diff(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1))[0] / 2 for i in range(nb_stack)]), "R_stack [ohm]": r_stack_mean, "R_std [ohm]": r_stack_std, - "R_per_stack [ohm]": np.mean([np.diff(np.mean(vmn_stack[i*2:i*2+2], axis=1)) / 2 for i in range(nb_stack)]) / np.array([np.mean(i_stack[i*2:i*2+2]) for i in range(nb_stack)]), - "PS_per_stack [mV]": np.array([np.mean(np.mean(vmn_stack[i*2:i*2+2], axis=1)) for i in range(nb_stack)]), + "R_per_stack [ohm]": np.mean( + [np.diff(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) / 2 for i in range(nb_stack)]) / np.array( + [np.mean(i_stack[i * 2:i * 2 + 2]) for i in range(nb_stack)]), + "PS_per_stack [mV]": np.array( + [np.mean(np.mean(vmn_stack[i * 2:i * 2 + 2], axis=1)) for i in range(nb_stack)]), "PS_stack [mV]": ps_stack_mean, - "R_ab [ohm]": Rab + "R_ab [ohm]": Rab, + "Gain_Vmn": gain } - # print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)])) + # print(np.array([(vmn_stack[i*2:i*2+2]) for i in range(nb_stack)])) # elif self.board_version == '22.10': # d = { # "time": datetime.now().isoformat(), @@ -1168,7 +1201,7 @@ class OhmPi(object): else: # for testing, generate random data d = {'time': datetime.now().isoformat(), 'A': quad[0], 'B': quad[1], 'M': quad[2], 'N': quad[3], - 'R [ohm]': np.abs(np.random.randn(1)).tolist()} + 'R [ohm]': np.abs(np.random.randn(1)).tolist()} # to the data logger dd = d.copy() @@ -1185,12 +1218,13 @@ class OhmPi(object): dd['cmd_id'] = str(cmd_id) self.data_logger.info(dd) - self.pin5.value = False #IHM led on measurement off - if self.sequence is None : + self.pin5.value = False # IHM led on measurement off + if self.sequence is None: self.switch_dps('off') return d + def run_multiple_sequences(self, cmd_id=None, sequence_delay=None, nb_meas=None, **kwargs): """Runs multiple sequences in a separate thread for monitoring mode. Can be stopped by 'OhmPi.interrupt()'.