Commit 7587d63f authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Merges changes on controller. Cleans up exec messages in info mode. Implements...

Merges changes on controller. Cleans up exec messages in info mode. Implements a generic _process_commands.
Showing with 454 additions and 351 deletions
+454 -351
...@@ -2,11 +2,12 @@ import logging ...@@ -2,11 +2,12 @@ import logging
from paho.mqtt.client import MQTTv31 from paho.mqtt.client import MQTTv31
mqtt_broker = 'mg3d-dev.umons.ac.be' # 'localhost' ohmpi_id = '0001'
mqtt_broker = 'localhost'
logging_suffix = '' logging_suffix = ''
# OhmPi configuration # OhmPi configuration
OHMPI_CONFIG = { OHMPI_CONFIG = {
'id': '0001', # Unique identifier of the OhmPi board (string) 'id': ohmpi_id, # Unique identifier of the OhmPi board (string)
'R_shunt': 2, # Shunt resistance in Ohms 'R_shunt': 2, # Shunt resistance in Ohms
'Imax': 4800/50/2, # Maximum current 'Imax': 4800/50/2, # Maximum current
'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V 'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V
......
...@@ -114,58 +114,58 @@ files (.json and .py). ...@@ -114,58 +114,58 @@ files (.json and .py).
.. code-block:: python .. code-block:: python
:caption: Example of using the Python API to control OhmPi :caption: Example of using the Python API to control OhmPi
import os
from ohmpi import OhmPi import numpy as np
k = OhmPi(idps=True) # if V3.0 make sure to set idps to True import time
# the DPS5005 is used in V3.0 to inject higher voltage os.chdir("/home/pi/OhmPi")
from ohmpi import OhmPi
# default parameters are stored in the pardict argument
# they can be manually changed ### Define object from class OhmPi
k.settings['injection_duration'] = 0.5 # injection time in seconds k = OhmPi() # this load default parameters from the disk
k.settings['nb_stack'] = 1 # one stack is two half-cycles
k.settings['nbr_meas'] = 1 # number of time the sequence is repeated ### Default parameters can also be edited manually
k.settings['injection_duration'] = 0.5 # injection time in seconds
# without multiplexer, one can simple measure using k.settings['nb_stack'] = 1 # one stack is two half-cycles
out = k.run_measurement() k.settings['nbr_meas'] = 1 # number of time the sequence is repeated
# out contains information about the measurement and can be save as
k.append_and_save('out.csv', out) ### Update settings if needed
k.update_settings({"injection_duration":0.2})
# custom or adaptative argument (see help of run_measurement())
k.run_measurement(nb_stack=4, # do 4 stacks (8 half-cycles) ### Set or load sequence
injection_duration=2, # inject for 2 seconds k.sequence = np.array([[1,2,3,4]]) # set numpy array of shape (n,4)
autogain=True, # adapt gain of ADS to get good resolution # k.set_sequence('1 2 3 4\n2 3 4 5') # call function set_sequence and pass a string
strategy='vmin', # inject min voltage for Vab (v3.0) # k.load_sequence('ABMN.txt') # load sequence from a local file
tx_volt=5) # vab for finding vmin or vab injectected
# if 'strategy' is 'constant' ### Run contact resistance check
k.rs_check()
### Run sequence (synchronously - it will wait that all
# sequence is measured before returning the prompt
k.run_sequence()
# k.run_sequence_async() # sequence is run in a separate thread and the prompt returns immediately
# time.sleep(2)
# k.interrupt() # kill the asynchrone sequence
### Run multiple sequences at given time interval
k.settings['nb_meas'] = 3 # run sequence three times
k.settings['sequence_delay'] = 100 # every 100 s
k.run_multiple_sequences() # asynchrone
# k.interrupt() # kill the asynchrone sequence
### Single measurement can also be taken with
k.switch_mux_on([1, 4, 2, 3])
k.run_measurement() # use default acquisition parameters
k.switch_mux_off([1, 4, 2, 3]) # don't forget this! risk of short-circuit
### Custom or adaptative argument, see help(k.run_measurement)
k.run_measurement(nb_stack=4, # do 4 stacks (8 half-cycles)
injection_duration=2, # inject for 2 seconds
autogain=True) # adapt gain of ADS to get good resolution
# if a multiplexer is connected, we can also manually switch it
k.reset_mux() # check that all mux are closed (do this FIRST)
k.switch_mux_on([1, 4, 2, 3])
k.run_measurement()
k.switch_mux_off([1, 4, 2, 3]) # don't forget this! risk of short-circuit
# import a sequence
k.read_quad('sequence.txt') # four columns, no header, space as separator
print(k.sequence) # where the sequence is stored
# rs check
k.rs_check() # run an RS check (check contact resistances) for all
# electrodes of the given sequence
# run a sequence
k.measure() # measure accept same arguments as run_measurement()
# NOTE: this is an asynchronous command that runs in a separate thread
# after executing the command, the prompt will return immediately
# the asynchronous thread can be stopped during execution using
k.stop()
# otherwise, it will exit by itself at the end of the sequence
# if multiple measurement are to be taken, the sequence will be repeated
***MQTT interface*** ***MQTT interface***
Interface to communicate with the Pi designed for the Internet of Things (IoT). Interface to communicate with the Pi designed for the Internet of Things (IoT).
......
...@@ -16,8 +16,10 @@ k.sequence = np.array([[1,2,3,4]]) # set numpy array of shape (n,4) ...@@ -16,8 +16,10 @@ k.sequence = np.array([[1,2,3,4]]) # set numpy array of shape (n,4)
# k.load_sequence('ABMN.txt') # load sequence from a local file # k.load_sequence('ABMN.txt') # load sequence from a local file
### Run contact resistance check ### Run contact resistance check
k.rs_check() # k.rs_check()
### Run sequence ### Run sequence
k.run_sequence() k.run_sequence()
# k.interrupt() # k.run_sequence_async()
\ No newline at end of file # time.sleep(2)
# k.interrupt()
\ No newline at end of file
...@@ -23,8 +23,56 @@ publisher_config.pop('ctrl_topic') ...@@ -23,8 +23,56 @@ publisher_config.pop('ctrl_topic')
print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker.")) print(colored(f"Sending commands control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker."))
cmd_id = None cmd_id = None
received = False
rdic = {} rdic = {}
# set controller globally as __init__ seem to be called for each request and so we subscribe again each time (=overhead)
controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue'))
trials = 0
trials_max = 10
broker_connected = False
while trials < trials_max:
try:
controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'),
MQTT_CONTROL_CONFIG['auth']['password'])
controller.connect(MQTT_CONTROL_CONFIG['hostname'])
trials = trials_max
broker_connected = True
except Exception as e:
print(f'Unable to connect control broker: {e}')
print('trying again to connect to control broker...')
time.sleep(2)
trials += 1
if broker_connected:
print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}")
controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
else:
print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}")
controller = None
# start a listener for acknowledgement
def _control():
def on_message(client, userdata, message):
global cmd_id, rdic, received
command = json.loads(message.payload.decode('utf-8'))
#print('++++', cmd_id, received, command)
if ('reply' in command.keys()) and (command['cmd_id'] == cmd_id):
print(f'Acknowledgement reception of command {command} by OhmPi')
# print('oooooooooook', command['reply'])
received = True
#rdic = command
controller.on_message = on_message
controller.loop_forever()
t = threading.Thread(target=_control)
t.start()
class MyServer(SimpleHTTPRequestHandler): class MyServer(SimpleHTTPRequestHandler):
# because we use SimpleHTTPRequestHandler, we do not need to implement # because we use SimpleHTTPRequestHandler, we do not need to implement
# the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to) # the do_GET() method (if we use the BaseHTTPRequestHandler, we would need to)
...@@ -42,60 +90,44 @@ class MyServer(SimpleHTTPRequestHandler): ...@@ -42,60 +90,44 @@ class MyServer(SimpleHTTPRequestHandler):
def __init__(self, request, client_address, server): def __init__(self, request, client_address, server):
super().__init__(request, client_address, server) super().__init__(request, client_address, server)
# set controller # global controller, once # using global variable otherwise, we subscribe to client for EACH request
self.controller = mqtt_client.Client(f"ohmpi_{OHMPI_CONFIG['id']}_listener", clean_session=False) # create new instance # if once:
print(colored(f"Connecting to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']} on {MQTT_CONTROL_CONFIG['hostname']} broker", 'blue')) # self.controller = controller
trials = 0 # self.cmd_thread = threading.Thread(target=self._control)
trials_max = 10 # self.cmd_thread.start()
broker_connected = False # once = False
while trials < trials_max:
try:
self.controller.username_pw_set(MQTT_CONTROL_CONFIG['auth'].get('username'), # we would like to listen to the ackn topic to check our message has been wel received
MQTT_CONTROL_CONFIG['auth']['password']) # by the OhmPi, however, this won't work as it seems an instance of MyServer is created
self.controller.connect(MQTT_CONTROL_CONFIG['hostname']) # each time (actually it's not a server but a requestHandler)
trials = trials_max # def _control(self):
broker_connected = True # def on_message(client, userdata, message):
except Exception as e: # global cmd_id, rdic
print(f'Unable to connect control broker: {e}')
print('trying again to connect to control broker...') # command = json.loads(message.payload.decode('utf-8'))
time.sleep(2) # print(f'Acknowledgement reception of command {command} by OhmPi')
trials += 1 # if 'reply' in command.keys() and command['cmd_id'] == cmd_id :
if broker_connected: # print('oooooooooook', command['reply'])
print(f"Subscribing to control topic {MQTT_CONTROL_CONFIG['ctrl_topic']}") # #rdic = command
self.controller.subscribe(MQTT_CONTROL_CONFIG['ctrl_topic'], MQTT_CONTROL_CONFIG['qos'])
else: # self.controller.on_message = on_message
print(f"Unable to connect to control broker on {MQTT_CONTROL_CONFIG['hostname']}") # print('starting loop')
self.controller = None # self.controller.loop_forever()
self.cmd_thread = threading.Thread(target=self._control) # print('forever')
def _control(self):
def on_message(client, userdata, message):
global cmd_id, rdic
command = message.payload.decode('utf-8')
print(f'Received command {command}')
# self.process_commands(command)
if 'reply' in command.keys and command['cmd_id'] == cmd_id :
rdic = command
self.controller.on_message = on_message
self.controller.loop_start()
while True:
time.sleep(.1)
def do_POST(self): def do_POST(self):
global cmd_id, rdic global cmd_id, rdic, received
received = False
cmd_id = uuid.uuid4().hex cmd_id = uuid.uuid4().hex
# global socket
# global ohmpiThread, status, run
dic = json.loads(self.rfile.read(int(self.headers['Content-Length']))) dic = json.loads(self.rfile.read(int(self.headers['Content-Length'])))
#print('++', dic, cmd_id)
rdic = {} # response dictionary rdic = {} # response dictionary
if dic['cmd'] == 'run_sequence': if dic['cmd'] == 'run_multiple_sequences':
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_sequence'}) payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'run_multiple_sequences'})
publish.single(payload=payload, **publisher_config) publish.single(payload=payload, **publisher_config)
elif dic['cmd'] == 'interrupt': elif dic['cmd'] == 'interrupt':
# ohmpi.stop()
payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'}) payload = json.dumps({'cmd_id': cmd_id, 'cmd': 'interrupt'})
publish.single(payload=payload, **publisher_config) publish.single(payload=payload, **publisher_config)
elif dic['cmd'] == 'getData': elif dic['cmd'] == 'getData':
...@@ -103,9 +135,9 @@ class MyServer(SimpleHTTPRequestHandler): ...@@ -103,9 +135,9 @@ class MyServer(SimpleHTTPRequestHandler):
fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv'] fnames = [fname for fname in os.listdir('data/') if fname[-4:] == '.csv']
ddic = {} ddic = {}
for fname in fnames: for fname in fnames:
if (fname.replace('.csv', '') not in dic['surveyNames'] if ((fname != 'readme.txt')
and fname != 'readme.txt' and ('_rs' not in fname)
and '_rs' not in fname): and (fname.replace('.csv', '') not in dic['surveyNames'])):
df = pd.read_csv('data/' + fname) df = pd.read_csv('data/' + fname)
ddic[fname.replace('.csv', '')] = { ddic[fname.replace('.csv', '')] = {
'a': df['A'].tolist(), 'a': df['A'].tolist(),
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
<div class="form-check"> <div class="form-check">
<input id="dataRetrievalCheck" class="form-check-input" type="checkbox" value=""> <input id="dataRetrievalCheck" class="form-check-input" type="checkbox" value="">
<label class="form-check-label" for="dataRetrievalCheck"> <label class="form-check-label" for="dataRetrievalCheck">
Automaticaly get data every 1 secondStart Automaticaly get data every 1 second
</label> </label>
</div> </div>
<div id='output'>Status: idle</div> <div id='output'>Status: idle</div>
...@@ -34,8 +34,8 @@ ...@@ -34,8 +34,8 @@
<!-- Pseudo section --> <!-- Pseudo section -->
<select id='surveySelect' class='custom-select'> <select id='surveySelect' class='custom-select'>
</select> </select>
<input id="cmin" type="number" value="0"/> <input id="cmin" type="number" value=""/>
<input id="cmax" type="number" value="150"/> <input id="cmax" type="number" value=""/>
<button id="capplyBtn" type="button" class="btn btn-info">Apply</button> <button id="capplyBtn" type="button" class="btn btn-info">Apply</button>
<div id="gd"></div> <div id="gd"></div>
<div class="mb3 row"> <div class="mb3 row">
...@@ -163,9 +163,9 @@ ...@@ -163,9 +163,9 @@
// run button // run button
function runBtnFunc() { function runBtnFunc() {
sendCommand('{"cmd": "run_sequence"}', function(x) { sendCommand('{"cmd": "run_multiple_sequences"}', function(x) {
console.log(x['status']) console.log(x['ohmpi_status'])
if (x['status'] == 'running') { if (x['ohmpi_status'] == 'running') {
output.innerHTML = 'Status: measuring...' output.innerHTML = 'Status: measuring...'
} }
}) })
...@@ -176,7 +176,7 @@ ...@@ -176,7 +176,7 @@
// interrupt button // interrupt button
function stopBtnFunc() { function stopBtnFunc() {
sendCommand('{"cmd": "interrupt"}', function(x) { sendCommand('{"cmd": "interrupt"}', function(x) {
output.innerHTML = 'Status: ' + x['status'] output.innerHTML = 'Status: ' + x['ohmpi_status']
clearInterval(interv) clearInterval(interv)
getData() getData()
}) })
...@@ -375,13 +375,14 @@ ...@@ -375,13 +375,14 @@
// getData // getData
function getData() { function getData() {
let surveyNames = []
sendCommand(JSON.stringify({ sendCommand(JSON.stringify({
'cmd': 'getData', 'cmd': 'getData',
'surveyNames': Object.keys(data).slice(0, -1) 'surveyNames': surveyNames
// last survey is often partial so we download it again // last survey is often partial so we download it again
}), function(ddic) { }), function(ddic) {
// update status // update status
output.innerHTML = 'Status: ' + ddic['status'] //output.innerHTML = 'Status: ' + ddic['status']
// update data dic with new data // update data dic with new data
data = { // destructuring assignement (magic! :o) data = { // destructuring assignement (magic! :o)
...@@ -389,8 +390,8 @@ ...@@ -389,8 +390,8 @@
...ddic['data'] // value from second dic are preferred ...ddic['data'] // value from second dic are preferred
} }
// dropdown with number of surveys and +++ // dropdown with number of surveys
let surveyNames = Object.keys(data).sort() surveyNames = Object.keys(data).sort()
// remove listener as we will replace the choices // remove listener as we will replace the choices
surveySelect.removeEventListener('change', surveySelectFunc) surveySelect.removeEventListener('change', surveySelectFunc)
...@@ -417,18 +418,20 @@ ...@@ -417,18 +418,20 @@
// update list of quadrupoles if any // update list of quadrupoles if any
if (quads.length == 0) { if (quads.length == 0) {
console.log('updating list of quadrupoles') console.log('updating list of quadrupoles')
let df = data[surveyNames[0]] if (surveyNames.length > 0) {
let quadSelect = document.getElementById('quadSelect') let df = data[surveyNames[0]]
quadSelect.innerHTML = '' let quadSelect = document.getElementById('quadSelect')
for (let i = 0; i < df['a'].length; i++) { quadSelect.innerHTML = ''
quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]] for (let i = 0; i < df['a'].length; i++) {
quads.push(quad) quad = [df['a'][i], df['b'][i], df['m'][i], df['n'][i]]
let option = document.createElement('option') quads.push(quad)
option.value = quad.join(', ') let option = document.createElement('option')
option.innerText = quad.join(', ') option.value = quad.join(', ')
quadSelect.appendChild(option) option.innerText = quad.join(', ')
quadSelect.appendChild(option)
}
console.log('quads=', quads)
} }
console.log('quads=', quads)
} }
// update time-serie figure // update time-serie figure
...@@ -499,7 +502,7 @@ ...@@ -499,7 +502,7 @@
function removeDataBtnFunc() { function removeDataBtnFunc() {
sendCommand('{"cmd": "removeData"}',function(x) { sendCommand('{"cmd": "removeData"}',function(x) {
data = {} data = {}
output.innerHTML = 'Status: ' + x['status'] + ' (all data cleared)' output.innerHTML = 'Status: ' + x['ohmpi_status'] + ' (all data cleared)'
console.log('all data removed') console.log('all data removed')
}) })
} }
......
...@@ -76,7 +76,7 @@ def setup_loggers(mqtt=True): ...@@ -76,7 +76,7 @@ def setup_loggers(mqtt=True):
interval=DATA_LOGGING_CONFIG['interval']) interval=DATA_LOGGING_CONFIG['interval'])
data_formatter = logging.Formatter(log_format) data_formatter = logging.Formatter(log_format)
data_formatter.converter = gmtime data_formatter.converter = gmtime
data_formatter.datefmt = '%Y/%m/%d %H:%M:%S UTC' data_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC'
data_handler.setFormatter(exec_formatter) data_handler.setFormatter(exec_formatter)
data_logger.addHandler(data_handler) data_logger.addHandler(data_handler)
data_logger.setLevel(DATA_LOGGING_CONFIG['logging_level']) data_logger.setLevel(DATA_LOGGING_CONFIG['logging_level'])
......
This diff is collapsed.
...@@ -3,6 +3,6 @@ ...@@ -3,6 +3,6 @@
"injection_duration": 0.2, "injection_duration": 0.2,
"nb_stack": 1, "nb_stack": 1,
"nb_meas": 1, "nb_meas": 1,
"sequence_delay": 1, "sequence_delay": 120,
"export_path": "data/measurement.csv" "export_path": "data/measurement.csv"
} }
# test ohmpi and multiplexer on test resistances
from ohmpi import OhmPi
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import time
# configure testing
idps = False
board_version = '22.10' # v2.0
use_mux = True
start_elec = 17 # start elec
nelec = 16 # max elec in the sequence for testing
# nelec electrodes Wenner sequence
a = np.arange(nelec-3) + start_elec
b = a + 3
m = a + 1
n = a + 2
seq = np.c_[a, b, m, n]
# testing measurement board only
k = OhmPi(idps=idps, use_mux=use_mux)
k.sequence = seq # we need a sequence for possibly switching the mux
k.reset_mux() # just for safety
if use_mux:
k.switch_mux_on(seq[:, 0])
out1 = k.run_measurement(injection_duration=0.25, nb_stack=4, strategy='constant', tx_volt=12, autogain=True)
out2 = k.run_measurement(injection_duration=0.5, nb_stack=2, strategy='vmin', tx_volt=5, autogain=True)
out3 = k.run_measurement(injection_duration=1, nb_stack=1, strategy='vmax', tx_volt=5, autogain=True)
if use_mux:
k.switch_mux_off(seq[:, 0])
# visual figure of the full wave form
fig, axs = plt.subplots(2, 1, sharex=True)
ax = axs[0]
labels = ['constant', 'vmin', 'vmax']
for i, out in enumerate([out1, out2, out3]):
data = out['fulldata']
inan = ~(np.isnan(out['fulldata']).any(1))
ax.plot(data[inan,2], data[inan,0], '.-', label=labels[i])
ax.set_ylabel('Current AB [mA]')
ax.legend()
ax = axs[1]
for i, out in enumerate([out1, out2, out3]):
data = out['fulldata']
inan = ~(np.isnan(out['fulldata']).any(1))
ax.plot(data[inan,2], data[inan,1], '.-', label=labels[i])
ax.set_ylabel('Voltage MN [mV]')
ax.set_xlabel('Time [s]')
fig.savefig('check-fullwave.jpg')
# test a sequence
if use_mux:
# manually edit default settings
k.settings['injection_duration'] = 1
k.settings['nb_stack'] = 1
#k.settings['nbr_meas'] = 1
k.sequence = seq
k.reset_mux()
# set quadrupole manually
k.switch_mux_on(seq[0, :])
out = k.run_measurement(quad=[3, 3, 3, 3], nb_stack=1, tx_volt=12, strategy='constant', autogain=True)
k.switch_mux_off(seq[0, :])
print(out)
# run rs_check() and save data
k.rs_check() # check all electrodes of the sequence
# check values measured
fname = sorted(os.listdir('data/'))[-1]
print(fname)
dfrs = pd.read_csv('data/' + fname)
fig, ax = plt.subplots()
ax.hist(dfrs['RS [kOhm]'])
ax.set_xticks(np.arange(dfrs.shape[0]))
ax.set_xticklabels(dfrs['A'].astype(str) + ' - ' +
dfrs['B'].astype(str), rotation=90)
ax.set_ylabel('Contact resistances [kOhm]')
fig.tight_layout()
fig.savefig('check-rs.jpg')
# run sequence synchronously and save data to file
k.run_sequence(nb_stack=1, injection_duration=0.25)
# check values measured
fname = sorted(os.listdir('data/'))[-1]
print(fname)
df = pd.read_csv('data/' + fname)
fig, ax = plt.subplots()
ax.hist(df['R [ohm]'])
ax.set_ylabel('Transfer resistance [Ohm]')
ax.set_xticks(np.arange(df.shape[0]))
ax.set_xticklabels(df['A'].astype(str) + ','
+ df['B'].astype(str) + ','
+ df['M'].astype(str) + ','
+ df['N'].astype(str), rotation=90)
fig.tight_layout()
fig.savefig('check-r.jpg')
# run sequence asynchronously and save data to file
k.run_sequence_async(nb_stack=1, injection_duration=0.25)
time.sleep(2) # if too short, it will still be resetting the mux (= no effect)
k.interrupt() # will kill the asynchronous sequence running
# run a series of asynchronous sequences
k.settings['nb_meas'] = 2 # run the sequence twice
k.run_multiple_sequences(nb_stack=1, injection_duration=0.25)
time.sleep(5)
k.interrupt()
# look at the noise frequency with FFT
if False:
from numpy.fft import fft, ifft
x = data[inan, 1][10:300]
t = np.linspace(0, len(x)*4, len(x))
sr = 1/0.004
X = fft(x)
N = len(X)
n = np.arange(N)
T = N/sr
freq = n/T
plt.figure(figsize = (12, 6))
plt.subplot(121)
plt.stem(freq, np.abs(X), 'b', \
markerfmt=" ", basefmt="-b")
plt.xlabel('Freq (Hz)')
plt.ylabel('FFT Amplitude |X(freq)|')
#plt.xlim(0, 10)
plt.subplot(122)
plt.plot(t, ifft(X), 'r')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.tight_layout()
plt.show()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment