Commit 563c6898 authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

add example code snippet + add test_ohmpi.py

Showing with 183 additions and 57 deletions
+183 -57
......@@ -114,58 +114,52 @@ files (.json and .py).
.. code-block:: python
:caption: Example of using the Python API to control OhmPi
from ohmpi import OhmPi
k = OhmPi(idps=True) # if V3.0 make sure to set idps to True
# the DPS5005 is used in V3.0 to inject higher voltage
# default parameters are stored in the pardict argument
# they can be manually changed
k.settings['injection_duration'] = 0.5 # injection time in seconds
k.settings['nb_stack'] = 1 # one stack is two half-cycles
k.settings['nbr_meas'] = 1 # number of time the sequence is repeated
# without multiplexer, one can simple measure using
out = k.run_measurement()
# out contains information about the measurement and can be save as
k.append_and_save('out.csv', out)
# custom or adaptative argument (see help of run_measurement())
k.run_measurement(nb_stack=4, # do 4 stacks (8 half-cycles)
injection_duration=2, # inject for 2 seconds
autogain=True, # adapt gain of ADS to get good resolution
strategy='vmin', # inject min voltage for Vab (v3.0)
tx_volt=5) # vab for finding vmin or vab injectected
# if 'strategy' is 'constant'
:caption: Example of using the Python API to control OhmPi
import os
import numpy as np
import time
os.chdir("/home/pi/OhmPi")
from ohmpi import OhmPi
### Define object from class OhmPi
k = OhmPi() # this load default parameters from the disk
### Default parameters can also be edited manually
k.settings['injection_duration'] = 0.5 # injection time in seconds
k.settings['nb_stack'] = 1 # one stack is two half-cycles
k.settings['nbr_meas'] = 1 # number of time the sequence is repeated
### Update settings if needed
k.update_settings({"injection_duration":0.2})
### Set or load sequence
k.sequence = np.array([[1,2,3,4]]) # set numpy array of shape (n,4)
# k.set_sequence('1 2 3 4\n2 3 4 5') # call function set_sequence and pass a string
# k.load_sequence('ABMN.txt') # load sequence from a local file
### Run contact resistance check
# k.rs_check()
### Run sequence (synchronously - it will wait that all
# sequence is measured before returning the prompt
k.run_sequence()
# k.run_sequence_async() # sequence is run in a separate thread and the prompt returns immediately
# time.sleep(2)
# k.interrupt() # kill the asynchrone sequence
### Single measurement can also be taken with
k.switch_mux_on([1, 4, 2, 3])
k.run_measuremen() # use default acquisition parameters
k.switch_mux_off([1, 4, 2, 3]) # don't forget this! risk of short-circuit
### Custom or adaptative argument (see help of run_measurement())
k.run_measurement(nb_stack=4, # do 4 stacks (8 half-cycles)
injection_duration=2, # inject for 2 seconds
autogain=True) # adapt gain of ADS to get good resolution
# if a multiplexer is connected, we can also manually switch it
k.reset_mux() # check that all mux are closed (do this FIRST)
k.switch_mux_on([1, 4, 2, 3])
k.run_measurement()
k.switch_mux_off([1, 4, 2, 3]) # don't forget this! risk of short-circuit
# import a sequence
k.read_quad('sequence.txt') # four columns, no header, space as separator
print(k.sequence) # where the sequence is stored
# rs check
k.rs_check() # run an RS check (check contact resistances) for all
# electrodes of the given sequence
# run a sequence
k.measure() # measure accept same arguments as run_measurement()
# NOTE: this is an asynchronous command that runs in a separate thread
# after executing the command, the prompt will return immediately
# the asynchronous thread can be stopped during execution using
k.stop()
# otherwise, it will exit by itself at the end of the sequence
# if multiple measurement are to be taken, the sequence will be repeated
***MQTT interface***
Interface to communicate with the Pi designed for the Internet of Things (IoT).
......
......@@ -201,14 +201,13 @@ class OhmPi(object):
self.update_settings(config)
def update_settings(self, config):
"""Update acquisition settings from a json file or dictionary.
Parameters can be:
- nb_electrodes (number of electrode used, if 4, no MUX needed)
- injection_duration (in seconds)
- nb_meas (total number of times the sequence will be run)
- sequence_delay (delay in second between each sequence run)
- nb_stack (number of stack for each quadrupole measurement)
- export_path (path where to export the data, timestamp will be added to filename)
"""Update acquisition settings from a json file or dictionary. Parameters can be:
- nb_electrodes (number of electrode used, if 4, no MUX needed)
- injection_duration (in seconds)
- nb_meas (total number of times the sequence will be run)
- sequence_delay (delay in second between each sequence run)
- nb_stack (number of stack for each quadrupole measurement)
- export_path (path where to export the data, timestamp will be added to filename)
Parameters
----------
......
# test ohmpi and multiplexer on test resistances
from ohmpi import OhmPi
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
# configure testing
idps = False
board_version = '22.10' # v2.0
use_mux = True
start_elec = 1 # start elec
nelec = 16 # max elec in the sequence for testing
# testing measurement board only
k = OhmPi(idps=idps, use_mux=use_mux)
k.reset_mux() # just for safety
out1 = k.run_measurement(injection_duration=0.25, nb_stack=4, strategy='constant', tx_volt=12, autogain=True)
out2 = k.run_measurement(injection_duration=0.5, nb_stack=2, strategy='vmin', tx_volt=5, autogain=True)
out3 = k.run_measurement(injection_duration=1, nb_stack=1, strategy='vmax', tx_volt=5, autogain=True)
# visual figure of the full wave form
fig, axs = plt.subplots(2, 1, sharex=True)
ax = axs[0]
labels = ['constant', 'vmin', 'vmax']
for i, out in enumerate([out1, out2, out3]):
data = out['fulldata']
inan = ~(np.isnan(out['fulldata']).any(1))
ax.plot(data[inan,2], data[inan,0], '.-', label=labels[i])
ax.set_ylabel('Current AB [mA]')
ax.legend()
ax = axs[1]
for i, out in enumerate([out1, out2, out3]):
data = out['fulldata']
inan = ~(np.isnan(out['fulldata']).any(1))
ax.plot(data[inan,2], data[inan,1], '.-', label=labels[i])
ax.set_ylabel('Voltage MN [mV]')
ax.set_xlabel('Time [s]')
fig.savefig('check-fullwave.jpg')
# test a sequence
# nelec electrodes Wenner sequence
a = np.arange(nelec-3) + start_elec
b = a + 3
m = a + 1
n = a + 2
seq = np.c_[a, b, m, n]
# manually edit default settings
k.settings['injection_duration'] = 1
k.settings['nb_stack'] = 1
#k.settings['nbr_meas'] = 1
k.sequence = seq
k.reset_mux()
# set quadrupole manually
k.switch_mux_on([1, 4, 2, 3])
out = k.run_measurement(quad=[3, 3, 3, 3], nb_stack=1, tx_volt=12, strategy='constant', autogain=True)
k.switch_mux_off([1, 4, 2, 3])
print(out)
# run rs_check() and save data
k.rs_check() # check all electrodes of the sequence
# check values measured
fname = sorted(os.listdir('data/'))[-1]
print(fname)
dfrs = pd.read_csv('data/' + fname)
fig, ax = plt.subplots()
ax.hist(dfrs['R [Ohm]']/1000)
ax.set_xticks(np.arange(df.shape[0]))
ax.set_xticklabels(df['A'].str + ' - ' + df['B'].str)
ax.set_ylabel('Contact resistances [kOhm]')
fig.tight_layout()
fig.savefig('check-rs.jpg')
# run sequence synchronously and save data to file
k.run_sequence(nb_stack=1, injection_duration=0.25)
# check values measured
fname = sorted(os.listdir('data/'))[-1]
print(fname)
df = pd.read_csv('data/' + fname)
fig, ax = plt.subplots()
ax.hist(df['R [ohm]'])
ax.set_ylabel('Transfer resistance [Ohm]')
ax.set_xticks(np.arange(df.shape[0]))
ax.set_xticklabels(df['A'] + ',' + df['B'] + ',' + df['M'] + ',' + df['N'])
fig.tight_layou()
fig.savefig('check-r.jpg')
# run sequence asynchronously and save data to file
k.run_sequence_async(nb_stack=1, injection_duration=0.25)
time.sleep(2)
k.interrupt() # will kill the asynchronous sequence running
# run a series of asynchronous sequences
k.run_sequences(nb_stack=1, injection_duration=0.25)
time.sleep(10)
k.interrupt()
# look at the noise frequency with FFT
if False:
from numpy.fft import fft, ifft
x = data[inan, 1][10:300]
t = np.linspace(0, len(x)*4, len(x))
sr = 1/0.004
X = fft(x)
N = len(X)
n = np.arange(N)
T = N/sr
freq = n/T
plt.figure(figsize = (12, 6))
plt.subplot(121)
plt.stem(freq, np.abs(X), 'b', \
markerfmt=" ", basefmt="-b")
plt.xlabel('Freq (Hz)')
plt.ylabel('FFT Amplitude |X(freq)|')
#plt.xlim(0, 10)
plt.subplot(122)
plt.plot(t, ifft(X), 'r')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.tight_layout()
plt.show()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment