From 563c6898138a7049db0bb14d3e037de71570ac30 Mon Sep 17 00:00:00 2001 From: Guillaume <sagitta1618@gmail.com> Date: Fri, 28 Oct 2022 18:36:19 +0200 Subject: [PATCH] add example code snippet + add test_ohmpi.py --- doc/source/V2_00.rst | 92 ++++++++++++++---------------- ohmpi.py | 15 +++-- test_ohmpi_mux.py | 133 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 183 insertions(+), 57 deletions(-) create mode 100644 test_ohmpi_mux.py diff --git a/doc/source/V2_00.rst b/doc/source/V2_00.rst index 06f7a30d..9885051a 100644 --- a/doc/source/V2_00.rst +++ b/doc/source/V2_00.rst @@ -114,58 +114,52 @@ files (.json and .py). .. code-block:: python - :caption: Example of using the Python API to control OhmPi - - - from ohmpi import OhmPi - k = OhmPi(idps=True) # if V3.0 make sure to set idps to True - # the DPS5005 is used in V3.0 to inject higher voltage - - # default parameters are stored in the pardict argument - # they can be manually changed - k.settings['injection_duration'] = 0.5 # injection time in seconds - k.settings['nb_stack'] = 1 # one stack is two half-cycles - k.settings['nbr_meas'] = 1 # number of time the sequence is repeated - - # without multiplexer, one can simple measure using - out = k.run_measurement() - # out contains information about the measurement and can be save as - k.append_and_save('out.csv', out) - - # custom or adaptative argument (see help of run_measurement()) - k.run_measurement(nb_stack=4, # do 4 stacks (8 half-cycles) - injection_duration=2, # inject for 2 seconds - autogain=True, # adapt gain of ADS to get good resolution - strategy='vmin', # inject min voltage for Vab (v3.0) - tx_volt=5) # vab for finding vmin or vab injectected - # if 'strategy' is 'constant' + :caption: Example of using the Python API to control OhmPi + + import os + import numpy as np + import time + os.chdir("/home/pi/OhmPi") + from ohmpi import OhmPi + + ### Define object from class OhmPi + k = OhmPi() # this load default parameters from the disk + + ### Default parameters can also be edited manually + k.settings['injection_duration'] = 0.5 # injection time in seconds + k.settings['nb_stack'] = 1 # one stack is two half-cycles + k.settings['nbr_meas'] = 1 # number of time the sequence is repeated + + ### Update settings if needed + k.update_settings({"injection_duration":0.2}) + + ### Set or load sequence + k.sequence = np.array([[1,2,3,4]]) #Â set numpy array of shape (n,4) + # k.set_sequence('1 2 3 4\n2 3 4 5') #Â call function set_sequence and pass a string + # k.load_sequence('ABMN.txt') # load sequence from a local file + + ### Run contact resistance check + #Â k.rs_check() + + ### Run sequence (synchronously - it will wait that all + # sequence is measured before returning the prompt + k.run_sequence() + # k.run_sequence_async() # sequence is run in a separate thread and the prompt returns immediately + # time.sleep(2) + # k.interrupt() # kill the asynchrone sequence + + ### Single measurement can also be taken with + k.switch_mux_on([1, 4, 2, 3]) + k.run_measuremen() # use default acquisition parameters + k.switch_mux_off([1, 4, 2, 3]) # don't forget this! risk of short-circuit + + ### Custom or adaptative argument (see help of run_measurement()) + k.run_measurement(nb_stack=4, # do 4 stacks (8 half-cycles) + injection_duration=2, # inject for 2 seconds + autogain=True) # adapt gain of ADS to get good resolution - # if a multiplexer is connected, we can also manually switch it - k.reset_mux() # check that all mux are closed (do this FIRST) - k.switch_mux_on([1, 4, 2, 3]) - k.run_measurement() - k.switch_mux_off([1, 4, 2, 3]) # don't forget this! risk of short-circuit - - # import a sequence - k.read_quad('sequence.txt') # four columns, no header, space as separator - print(k.sequence) # where the sequence is stored - - # rs check - k.rs_check() # run an RS check (check contact resistances) for all - # electrodes of the given sequence - - # run a sequence - k.measure() # measure accept same arguments as run_measurement() - # NOTE: this is an asynchronous command that runs in a separate thread - # after executing the command, the prompt will return immediately - # the asynchronous thread can be stopped during execution using - k.stop() - # otherwise, it will exit by itself at the end of the sequence - # if multiple measurement are to be taken, the sequence will be repeated - - ***MQTT interface*** Interface to communicate with the Pi designed for the Internet of Things (IoT). diff --git a/ohmpi.py b/ohmpi.py index 4fe8ac6e..32369fe2 100644 --- a/ohmpi.py +++ b/ohmpi.py @@ -201,14 +201,13 @@ class OhmPi(object): self.update_settings(config) def update_settings(self, config): - """Update acquisition settings from a json file or dictionary. - Parameters can be: - - nb_electrodes (number of electrode used, if 4, no MUX needed) - - injection_duration (in seconds) - - nb_meas (total number of times the sequence will be run) - - sequence_delay (delay in second between each sequence run) - - nb_stack (number of stack for each quadrupole measurement) - - export_path (path where to export the data, timestamp will be added to filename) + """Update acquisition settings from a json file or dictionary. Parameters can be: + - nb_electrodes (number of electrode used, if 4, no MUX needed) + - injection_duration (in seconds) + - nb_meas (total number of times the sequence will be run) + - sequence_delay (delay in second between each sequence run) + - nb_stack (number of stack for each quadrupole measurement) + - export_path (path where to export the data, timestamp will be added to filename) Parameters ---------- diff --git a/test_ohmpi_mux.py b/test_ohmpi_mux.py new file mode 100644 index 00000000..8883e9cb --- /dev/null +++ b/test_ohmpi_mux.py @@ -0,0 +1,133 @@ +# test ohmpi and multiplexer on test resistances +from ohmpi import OhmPi +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import os + +# configure testing +idps = False +board_version = '22.10' # v2.0 +use_mux = True +start_elec = 1 # start elec +nelec = 16 # max elec in the sequence for testing + +# testing measurement board only +k = OhmPi(idps=idps, use_mux=use_mux) +k.reset_mux() # just for safety +out1 = k.run_measurement(injection_duration=0.25, nb_stack=4, strategy='constant', tx_volt=12, autogain=True) +out2 = k.run_measurement(injection_duration=0.5, nb_stack=2, strategy='vmin', tx_volt=5, autogain=True) +out3 = k.run_measurement(injection_duration=1, nb_stack=1, strategy='vmax', tx_volt=5, autogain=True) + +# visual figure of the full wave form +fig, axs = plt.subplots(2, 1, sharex=True) +ax = axs[0] +labels = ['constant', 'vmin', 'vmax'] +for i, out in enumerate([out1, out2, out3]): + data = out['fulldata'] + inan = ~(np.isnan(out['fulldata']).any(1)) + ax.plot(data[inan,2], data[inan,0], '.-', label=labels[i]) +ax.set_ylabel('Current AB [mA]') +ax.legend() +ax = axs[1] +for i, out in enumerate([out1, out2, out3]): + data = out['fulldata'] + inan = ~(np.isnan(out['fulldata']).any(1)) + ax.plot(data[inan,2], data[inan,1], '.-', label=labels[i]) +ax.set_ylabel('Voltage MN [mV]') +ax.set_xlabel('Time [s]') +fig.savefig('check-fullwave.jpg') + + +# test a sequence + +# nelec electrodes Wenner sequence +a = np.arange(nelec-3) + start_elec +b = a + 3 +m = a + 1 +n = a + 2 +seq = np.c_[a, b, m, n] + +# manually edit default settings +k.settings['injection_duration'] = 1 +k.settings['nb_stack'] = 1 +#k.settings['nbr_meas'] = 1 +k.sequence = seq +k.reset_mux() + +# set quadrupole manually +k.switch_mux_on([1, 4, 2, 3]) +out = k.run_measurement(quad=[3, 3, 3, 3], nb_stack=1, tx_volt=12, strategy='constant', autogain=True) +k.switch_mux_off([1, 4, 2, 3]) +print(out) + +# run rs_check() and save data +k.rs_check() # check all electrodes of the sequence + +# check values measured +fname = sorted(os.listdir('data/'))[-1] +print(fname) +dfrs = pd.read_csv('data/' + fname) +fig, ax = plt.subplots() +ax.hist(dfrs['R [Ohm]']/1000) +ax.set_xticks(np.arange(df.shape[0])) +ax.set_xticklabels(df['A'].str + ' - ' + df['B'].str) +ax.set_ylabel('Contact resistances [kOhm]') +fig.tight_layout() +fig.savefig('check-rs.jpg') + +# run sequence synchronously and save data to file +k.run_sequence(nb_stack=1, injection_duration=0.25) + +# check values measured +fname = sorted(os.listdir('data/'))[-1] +print(fname) +df = pd.read_csv('data/' + fname) +fig, ax = plt.subplots() +ax.hist(df['R [ohm]']) +ax.set_ylabel('Transfer resistance [Ohm]') +ax.set_xticks(np.arange(df.shape[0])) +ax.set_xticklabels(df['A'] + ',' + df['B'] + ',' + df['M'] + ',' + df['N']) +fig.tight_layou() +fig.savefig('check-r.jpg') + +# run sequence asynchronously and save data to file +k.run_sequence_async(nb_stack=1, injection_duration=0.25) +time.sleep(2) +k.interrupt() # will kill the asynchronous sequence running + +# run a series of asynchronous sequences +k.run_sequences(nb_stack=1, injection_duration=0.25) +time.sleep(10) +k.interrupt() + + +# look at the noise frequency with FFT +if False: + from numpy.fft import fft, ifft + + x = data[inan, 1][10:300] + t = np.linspace(0, len(x)*4, len(x)) + sr = 1/0.004 + + X = fft(x) + N = len(X) + n = np.arange(N) + T = N/sr + freq = n/T + + plt.figure(figsize = (12, 6)) + plt.subplot(121) + + plt.stem(freq, np.abs(X), 'b', \ + markerfmt=" ", basefmt="-b") + plt.xlabel('Freq (Hz)') + plt.ylabel('FFT Amplitude |X(freq)|') + #plt.xlim(0, 10) + + plt.subplot(122) + plt.plot(t, ifft(X), 'r') + plt.xlabel('Time (s)') + plt.ylabel('Amplitude') + plt.tight_layout() + plt.show() -- GitLab