Commit 2fed7199 authored by Olivier Kaufmann's avatar Olivier Kaufmann
Browse files

Improves abstract hardware; Adds vab_square_wave to measure; Adds soh logger;...

Improves abstract hardware; Adds vab_square_wave to measure; Adds soh logger; Adds first modifications to OhmPi
No related merge requests found
Showing with 2358 additions and 737 deletions
+2358 -737
...@@ -65,6 +65,7 @@ DATA_LOGGING_CONFIG = { ...@@ -65,6 +65,7 @@ DATA_LOGGING_CONFIG = {
# State of Health logging configuration (For a future release) # State of Health logging configuration (For a future release)
SOH_LOGGING_CONFIG = { SOH_LOGGING_CONFIG = {
'logging_level': logging.INFO, 'logging_level': logging.INFO,
'log_file_logging_level': logging.DEBUG,
'logging_to_console': True, 'logging_to_console': True,
'file_name': f'soh{logging_suffix}.log', 'file_name': f'soh{logging_suffix}.log',
'max_bytes': 16777216, 'max_bytes': 16777216,
......
from OhmPi.config import HARDWARE_CONFIG
import os
from OhmPi.hardware import MuxAbstract
MUX_CONFIG = HARDWARE_CONFIG['mux']
class Mux(MuxAbstract):
def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs)
self.max_elec = MUX_CONFIG['max_elec']
def switch_one(self, elec, role, state='off'):
self.tca = adafruit_tca9548a.TCA9548A(i2c, self.addresses[role])
# find I2C address of the electrode and corresponding relay
# considering that one MCP23017 can cover 16 electrodes
i2c_address = 7 - (elec - 1) // 16 # quotient without rest of the division
relay = (elec - 1) - ((elec - 1) // 16) * 16
if i2c_address is not None:
# select the MCP23017 of the selected MUX board
mcp = MCP23017(self.tca[i2c_address])
mcp.get_pin(relay - 1).direction = digitalio.Direction.OUTPUT
if state == 'on':
mcp.get_pin(relay - 1).value = True
else:
mcp.get_pin(relay - 1).value = False
# exec_logger.debug(f'Switching relay {relay} '
# f'({str(hex(self.addresses[role]))}) on:{on} for electrode {elec}')
else:
raise ValueError('No I2C address found for the electrode'
' {:d} on board {:s}'.format(elec, self.addresses[role]))
# exec_logger.warning(f'Unable to address electrode nr {elec}')
def switch(self, elecdic={}, state='on'):
"""Switch a given list of electrodes with different roles.
Electrodes with a value of 0 will be ignored.
Parameters
----------
elecdic : dictionary, optional
Dictionnary of the form: role: [list of electrodes].
state : str, optional
Either 'on' or 'off'.
"""
# check to prevent A == B (SHORT-CIRCUIT)
if 'A' in elecdic and 'B' in elecdic:
out = np.in1d(elecdic['A'], elecdic['B'])
if out.any():
raise ValueError('Some electrodes have A == B -> SHORT-CIRCUIT')
return
# check none of M and N are the same A or B
# as to prevent burning the MN part which cannot take
# the full voltage of the DPS
if 'A' in elecdic and 'B' in elecdic and 'M' in elecdic and 'N' in elecdic:
if (np.in1d(elecdic['M'], elecdic['A']).any()
or np.in1d(elecdic['M'], elecdic['B']).any()
or np.in1d(elecdic['N'], elecdic['A']).any()
or np.in1d(elecdic['N'], elecdic['B']).any()):
raise ValueError('Some electrodes M and N are on A and B -> cannot be with DPS')
return
# if all ok, then switch the electrodes
for role in elecdic:
for elec in elecdic[role]:
if elec > 0:
self.switch_one(elec, role, state)
def reset(self):
for role in self.addresses:
for elec in range(self.nelec):
self.switch_one(elec, role, 'off')
\ No newline at end of file
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import numpy as np
from OhmPi.logging_setup import create_stdout_logger from OhmPi.logging_setup import create_stdout_logger
import time import time
...@@ -9,16 +12,107 @@ class ControllerAbstract(ABC): ...@@ -9,16 +12,107 @@ class ControllerAbstract(ABC):
self.exec_logger = kwargs.pop('exec_logger', None) self.exec_logger = kwargs.pop('exec_logger', None)
if self.exec_logger is None: if self.exec_logger is None:
self.exec_logger = create_stdout_logger('exec_ctl') self.exec_logger = create_stdout_logger('exec_ctl')
self.soh_logger = kwargs.pop('soh_logger', None)
if self.soh_logger is None:
self.soh_logger = create_stdout_logger('soh_ctl')
self.exec_logger.debug(f'{self.board_name} Controller initialization') self.exec_logger.debug(f'{self.board_name} Controller initialization')
self._cpu_temp_available = False
self.max_cpu_temp = np.inf
@property
def cpu_temperature(self):
if not self._cpu_temp_available:
self.exec_logger.warning(f'CPU temperature reading is not available for {self.board_name}')
cpu_temp = np.nan
else:
cpu_temp = self._get_cpu_temp()
if cpu_temp > self.max_cpu_temp:
self.soh_logger.warning(f'CPU temperature of {self.board_name} is over the limit!')
return cpu_temp
@abstractmethod
def _get_cpu_temp(self):
pass
class MuxAbstract(ABC): class MuxAbstract(ABC):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.board_name = kwargs.pop('board_name', 'unknown MUX hardware') self.board_name = kwargs.pop('board_name', 'unknown MUX hardware') # TODO: introduce MUX boards that take part to a MUX system (could be the same for RX boards that take part to an RX system (e.g. different channels)
self.exec_logger = kwargs.pop('exec_logger', None) self.exec_logger = kwargs.pop('exec_logger', None)
if self.exec_logger is None: if self.exec_logger is None:
self.exec_logger = create_stdout_logger('exec_mux') self.exec_logger = create_stdout_logger('exec_mux')
self.exec_logger.debug(f'{self.board_name} MUX initialization') self.exec_logger.debug(f'{self.board_name} MUX initialization')
@abstractmethod
def reset(self):
pass
def switch(self, elec_dict=None, state='on'):
"""Switch a given list of electrodes with different roles.
Electrodes with a value of 0 will be ignored.
Parameters
----------
elec_dict : dictionary, optional
Dictionary of the form: {role: [list of electrodes]}.
state : str, optional
Either 'on' or 'off'.
"""
if elec_dict is not None:
self.exec_logger.debug(f'Switching {self.board_name} ')
# check to prevent A == B (SHORT-CIRCUIT)
if 'A' in elec_dict.keys() and 'B' in elec_dict.keys():
out = np.in1d(elec_dict['A'], elec_dict['B'])
if out.any() and state=='on':
self.exec_logger.error('Trying to switch on some electrodes with both A and B roles. '
'This would create a short-circuit! Switching aborted.')
return
# check that none of M or N are the same as A or B
# as to prevent burning the MN part which cannot take
# the full voltage of the DPS
if 'A' in elec_dict.keys() and 'B' in elec_dict.keys() and 'M' in elec_dict.keys() and 'N' in elec_dict.keys():
if (np.in1d(elec_dict['M'], elec_dict['A']).any()
or np.in1d(elec_dict['M'], elec_dict['B']).any()
or np.in1d(elec_dict['N'], elec_dict['A']).any()
or np.in1d(elec_dict['N'], elec_dict['B']).any()) and state=='on':
self.exec_logger.error('Trying to switch on some electrodes with both M or N roles and A or B roles.'
'This would create an over-voltage in the RX! Switching aborted.')
return
# if all ok, then switch the electrodes
for role in elec_dict:
for elec in elec_dict[role]:
if elec > 0:
self.switch_one(elec, role, state)
else:
self.exec_logger.warning(f'Missing argument for {self.board_name}.switch: elec_dict is None.')
@abstractmethod
def switch_one(self, elec, role, state):
pass
def test(self, elec_dict, activation_time=1.):
"""Method to test the multiplexer.
Parameters
----------
elec_dict : dictionary, optional
Dictionary of the form: {role: [list of electrodes]}.
activation_time : float, optional
Time in seconds during which the relays are activated.
"""
self.exec_logger.debug(f'Starting {self.board_name} test...')
self.reset()
for role in elec_dict.keys():
for elec in elec_dict['role']:
self.switch_one(elec, role, 'on')
self.exec_logger.debug(f'electrode: {elec} activated.')
time.sleep(activation_time)
self.switch_one(elec, role, 'off')
self.exec_logger.debug(f'electrode: {elec} deactivated.')
time.sleep(activation_time)
self.exec_logger.debug('Test finished.')
class TxAbstract(ABC): class TxAbstract(ABC):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.board_name = kwargs.pop('board_name', 'unknown TX hardware') self.board_name = kwargs.pop('board_name', 'unknown TX hardware')
...@@ -153,6 +247,7 @@ class RxAbstract(ABC): ...@@ -153,6 +247,7 @@ class RxAbstract(ABC):
self._sampling_rate = kwargs.pop('sampling_rate', 1) self._sampling_rate = kwargs.pop('sampling_rate', 1)
self.exec_logger.debug(f'{self.board_name} RX initialization') self.exec_logger.debug(f'{self.board_name} RX initialization')
self._adc_gain = 1. self._adc_gain = 1.
self._max_sampling_rate = np.inf
@property @property
def adc_gain(self): def adc_gain(self):
...@@ -175,6 +270,10 @@ class RxAbstract(ABC): ...@@ -175,6 +270,10 @@ class RxAbstract(ABC):
@sampling_rate.setter @sampling_rate.setter
def sampling_rate(self, value): def sampling_rate(self, value):
assert value > 0. assert value > 0.
if value > self._max_sampling_rate:
self.exec_logger.warning(f'{self} maximum sampling rate is {self._max_sampling_rate}. '
f'Setting sampling rate to the highest allowed value.')
value = self._max_sampling_rate
self._sampling_rate = value self._sampling_rate = value
self.exec_logger.debug(f'Sampling rate set to {value}') self.exec_logger.debug(f'Sampling rate set to {value}')
......
...@@ -7,4 +7,13 @@ class Mux(MuxAbstract): ...@@ -7,4 +7,13 @@ class Mux(MuxAbstract):
def __init__(self, **kwargs): def __init__(self, **kwargs):
kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')}) kwargs.update({'board_name': os.path.basename(__file__).rstrip('.py')})
super().__init__(**kwargs) super().__init__(**kwargs)
self.max_elec = MUX_CONFIG['max_elec'] self.max_elec = MUX_CONFIG['max_elec']
\ No newline at end of file
def reset(self):
pass
def switch(self, elec_dict, state):
pass
def test(self):
self.exec_logger.info('MUX test finished.')
\ No newline at end of file
...@@ -2,6 +2,7 @@ from OhmPi.hardware import ControllerAbstract ...@@ -2,6 +2,7 @@ from OhmPi.hardware import ControllerAbstract
import board # noqa import board # noqa
import busio # noqa import busio # noqa
from OhmPi.utils import get_platform from OhmPi.utils import get_platform
from gpiozero import CPUTemperature
class Controller(ControllerAbstract): class Controller(ControllerAbstract):
def __init__(self, **kwargs): def __init__(self, **kwargs):
...@@ -9,4 +10,10 @@ class Controller(ControllerAbstract): ...@@ -9,4 +10,10 @@ class Controller(ControllerAbstract):
self.bus = busio.I2C(board.SCL, board.SDA) # noqa self.bus = busio.I2C(board.SCL, board.SDA) # noqa
platform, on_pi = get_platform() platform, on_pi = get_platform()
assert on_pi assert on_pi
self.board_name = platform self.board_name = platform
\ No newline at end of file self._cpu_temp_available = True
self.max_cpu_temp = 85. # °C
@property
def _get_cpu_temp(self):
return CPUTemperature().temperature
\ No newline at end of file
import json import json
from OhmPi.config import EXEC_LOGGING_CONFIG, DATA_LOGGING_CONFIG, MQTT_LOGGING_CONFIG, MQTT_CONTROL_CONFIG from OhmPi.config import EXEC_LOGGING_CONFIG, DATA_LOGGING_CONFIG, SOH_LOGGING_CONFIG,\
MQTT_LOGGING_CONFIG, MQTT_CONTROL_CONFIG
from os import path, mkdir, statvfs from os import path, mkdir, statvfs
from time import gmtime from time import gmtime
import logging import logging
...@@ -27,10 +28,50 @@ def setup_loggers(mqtt=True): ...@@ -27,10 +28,50 @@ def setup_loggers(mqtt=True):
if not path.isdir(log_path): if not path.isdir(log_path):
mkdir(log_path) mkdir(log_path)
exec_log_filename = path.join(log_path, EXEC_LOGGING_CONFIG['file_name']) exec_log_filename = path.join(log_path, EXEC_LOGGING_CONFIG['file_name'])
soh_log_filename = path.join(log_path, SOH_LOGGING_CONFIG['file_name'])
exec_logger = logging.getLogger('exec_logger') exec_logger = logging.getLogger('exec_logger')
soh_logger = logging.getLogger('soh_logger')
# SOH logging setup # SOH logging setup
# TODO: Add state of health logging here # Set message logging format and level
log_format = '%(asctime)-15s | %(process)d | %(levelname)s: %(message)s'
logging_to_console = SOH_LOGGING_CONFIG['logging_to_console']
soh_handler = CompressedSizedTimedRotatingFileHandler(soh_log_filename,
max_bytes=SOH_LOGGING_CONFIG['max_bytes'],
backup_count=SOH_LOGGING_CONFIG['backup_count'],
when=SOH_LOGGING_CONFIG['when'],
interval=SOH_LOGGING_CONFIG['interval'])
soh_formatter = logging.Formatter(log_format)
soh_formatter.converter = gmtime
soh_formatter.datefmt = '%Y-%m-%d %H:%M:%S UTC'
soh_handler.setFormatter(soh_formatter)
soh_logger.addHandler(soh_handler)
soh_logger.setLevel(SOH_LOGGING_CONFIG['log_file_logging_level'])
if logging_to_console:
console_soh_handler = logging.StreamHandler(sys.stdout)
console_soh_handler.setLevel(SOH_LOGGING_CONFIG['logging_level'])
console_soh_handler.setFormatter(soh_formatter)
soh_logger.addHandler(console_soh_handler)
if mqtt:
mqtt_settings = MQTT_LOGGING_CONFIG.copy()
mqtt_soh_logging_level = mqtt_settings.pop('soh_logging_level', logging.DEBUG)
[mqtt_settings.pop(i) for i in ['client_id', 'exec_topic', 'data_topic', 'soh_topic', 'data_logging_level',
'soh_logging_level']]
mqtt_settings.update({'topic': MQTT_LOGGING_CONFIG['soh_topic']})
# TODO: handle the case of MQTT broker down or temporarily unavailable
try:
mqtt_soh_handler = MQTTHandler(**mqtt_settings)
mqtt_soh_handler.setLevel(mqtt_soh_logging_level)
mqtt_soh_handler.setFormatter(soh_formatter)
soh_logger.addHandler(mqtt_soh_handler)
msg += colored(f"\n\u2611 Publishes execution as {MQTT_LOGGING_CONFIG['soh_topic']} topic on the "
f"{MQTT_LOGGING_CONFIG['hostname']} broker", 'blue')
except Exception as e:
msg += colored(f'\nWarning: Unable to connect to soh topic on broker\n{e}', 'yellow')
mqtt = False
# Data logging setup # Data logging setup
base_path = path.dirname(__file__) base_path = path.dirname(__file__)
...@@ -119,14 +160,16 @@ def setup_loggers(mqtt=True): ...@@ -119,14 +160,16 @@ def setup_loggers(mqtt=True):
mqtt = False mqtt = False
try: try:
init_logging(exec_logger, data_logger, EXEC_LOGGING_CONFIG['logging_level'], log_path, data_log_filename) init_logging(exec_logger, data_logger, soh_logger, EXEC_LOGGING_CONFIG['logging_level'],
SOH_LOGGING_CONFIG['logging_level'], log_path, data_log_filename)
except Exception as err: except Exception as err:
msg += colored(f'\n\u26A0 ERROR: Could not initialize logging!\n{err}', 'red') msg += colored(f'\n\u26A0 ERROR: Could not initialize logging!\n{err}', 'red')
finally: finally:
return exec_logger, exec_log_filename, data_logger, data_log_filename, EXEC_LOGGING_CONFIG['logging_level'], msg return exec_logger, exec_log_filename, data_logger, data_log_filename, soh_logger, soh_log_filename,\
EXEC_LOGGING_CONFIG['logging_level'], msg
def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_log_filename): def init_logging(exec_logger, data_logger, soh_logger, exec_logging_level, soh_logging_level, log_path, data_log_filename):
""" This is the init sequence for the logging system """ """ This is the init sequence for the logging system """
init_logging_status = True init_logging_status = True
...@@ -135,7 +178,8 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo ...@@ -135,7 +178,8 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo
exec_logger.info('*** NEW SESSION STARTING ***') exec_logger.info('*** NEW SESSION STARTING ***')
exec_logger.info('****************************') exec_logger.info('****************************')
exec_logger.info('') exec_logger.info('')
exec_logger.debug(f'Logging level: {exec_logging_level}') exec_logger.debug(f'Execution logging level: {exec_logging_level}')
exec_logger.debug(f'State of health logging level: {soh_logging_level}')
try: try:
st = statvfs('.') st = statvfs('.')
available_space = st.f_bavail * st.f_frsize / 1024 / 1024 available_space = st.f_bavail * st.f_frsize / 1024 / 1024
...@@ -144,6 +188,7 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo ...@@ -144,6 +188,7 @@ def init_logging(exec_logger, data_logger, exec_logging_level, log_path, data_lo
exec_logger.debug('Unable to get remaining disk space: {e}') exec_logger.debug('Unable to get remaining disk space: {e}')
exec_logger.info('Saving data log to ' + data_log_filename) exec_logger.info('Saving data log to ' + data_log_filename)
config_dict = {'execution logging configuration': json.dumps(EXEC_LOGGING_CONFIG, indent=4), config_dict = {'execution logging configuration': json.dumps(EXEC_LOGGING_CONFIG, indent=4),
'state of health logging configuration': json.dumps(SOH_LOGGING_CONFIG, indent=4),
'data logging configuration': json.dumps(DATA_LOGGING_CONFIG, indent=4), 'data logging configuration': json.dumps(DATA_LOGGING_CONFIG, indent=4),
'mqtt logging configuration': json.dumps(MQTT_LOGGING_CONFIG, indent=4), 'mqtt logging configuration': json.dumps(MQTT_LOGGING_CONFIG, indent=4),
'mqtt control configuration': json.dumps(MQTT_CONTROL_CONFIG, indent=4)} 'mqtt control configuration': json.dumps(MQTT_CONTROL_CONFIG, indent=4)}
......
...@@ -48,46 +48,32 @@ class OhmPiHardware: ...@@ -48,46 +48,32 @@ class OhmPiHardware:
self.mux = kwargs.pop('mux', mux_module.Mux(exec_logger=self.exec_logger, self.mux = kwargs.pop('mux', mux_module.Mux(exec_logger=self.exec_logger,
data_logger=self.data_logger, data_logger=self.data_logger,
soh_logger=self.soh_logger)) soh_logger=self.soh_logger))
self.readings=np.array([]) self.readings = np.array([])
self.readings_window = (0.3, 1.0)
def _clear_values(self):
self.readings = np.array([])
def _inject(self, duration): def _inject(self, duration):
self.tx_sync.set() self.tx_sync.set()
self.tx.voltage_pulse(length=duration) self.tx.voltage_pulse(length=duration)
self.tx_sync.clear() self.tx_sync.clear()
def _read_values(self, sampling_rate): # noqa def _read_values(self, sampling_rate, append=False): # noqa
if not append:
self._clear_values()
_readings = [] _readings = []
sample = 0 sample = 0
self.tx_sync.wait() self.tx_sync.wait()
start_time = datetime.datetime.utcnow() start_time = datetime.datetime.utcnow()
while self.tx_sync.is_set(): while self.tx_sync.is_set():
lap = datetime.datetime.utcnow() lap = datetime.datetime.utcnow()
_readings.append([elapsed_seconds(start_time), self.tx.current, self.rx.voltage]) _readings.append([elapsed_seconds(start_time), self.tx.current, self.rx.voltage, self.tx.polarity])
sample+=1 sample+=1
sleep_time = start_time + datetime.timedelta(seconds = sample * sampling_rate / 1000) - lap sleep_time = start_time + datetime.timedelta(seconds = sample * sampling_rate / 1000) - lap
# print(f'expected_end_time: {start_time+datetime.timedelta(seconds = sample * sampling_rate / 1000)}, lap: {lap}, sample: {sample}, sleep_time: {sleep_time.total_seconds()} seconds')
time.sleep(np.min([0, np.abs(sleep_time.total_seconds())])) time.sleep(np.min([0, np.abs(sleep_time.total_seconds())]))
self.readings = np.array(_readings) self.readings = np.array(_readings)
def _vab_pulse(self, vab, length, sampling_rate=None, polarity=None):
""" Gets VMN and IAB from a single voltage pulse
"""
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if polarity is not None and polarity != self.tx.polarity:
self.tx.polarity = polarity
self.tx.voltage = vab
injection = Thread(target=self._inject, kwargs={'duration':length})
readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate})
# set gains automatically
self.tx.adc_gain_auto()
self.rx.adc_gain_auto()
readings.start()
injection.start()
readings.join()
injection.join()
def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5, def _compute_tx_volt(self, best_tx_injtime=0.1, strategy='vmax', tx_volt=5,
vab_max=voltage_max, vmn_min=voltage_min): vab_max=voltage_max, vmn_min=voltage_min):
"""Estimates best Tx voltage based on different strategies. """Estimates best Tx voltage based on different strategies.
...@@ -163,4 +149,38 @@ class OhmPiHardware: ...@@ -163,4 +149,38 @@ class OhmPiHardware:
polarity = -1 # TODO: check if we really need to return polarity polarity = -1 # TODO: check if we really need to return polarity
else: else:
polarity = 1 polarity = 1
return vab, polarity, rab return vab, polarity, rab
\ No newline at end of file
def vab_square_wave(self, vab, length, sampling_rate, cycles=3):
self._vab_pulses(self, vab, [length]*cycles, sampling_rate)
def _vab_pulse(self, vab, length, sampling_rate=None, polarity=None, append=False):
""" Gets VMN and IAB from a single voltage pulse
"""
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if polarity is not None and polarity != self.tx.polarity:
self.tx.polarity = polarity
self.tx.voltage = vab
injection = Thread(target=self._inject, kwargs={'duration':length})
readings = Thread(target=self._read_values, kwargs={'sampling_rate': sampling_rate, 'append': append})
# set gains automatically
self.tx.adc_gain_auto()
self.rx.adc_gain_auto()
readings.start()
injection.start()
readings.join()
injection.join()
def _vab_pulses(self, vab, lengths, sampling_rate, polarities=None):
n_pulses = len(lengths)
if sampling_rate is None:
sampling_rate = RX_CONFIG['sampling_rate']
if polarities is not None:
assert len(polarities)==n_pulses
else:
polarities = [-self.tx.polarity * np.heaviside(i % 2, -1.) for i in range(n_pulses)]
self._clear_values()
for i in range(n_pulses):
self._vab_pulse(self, length=lengths[i], sampling_rate=sampling_rate, polarity=polarities[i], append=True)
\ No newline at end of file
This diff is collapsed.
ohmpi_bkp.py 0 → 100644
+ 1701
0
View file @ 2fed7199
This diff is collapsed.
import numpy as np import numpy as np
import matplotlib.pyplot as plt # import matplotlib.pyplot as plt
from utils import change_config from utils import change_config
change_config('config_ohmpi_card_3_15.py', verbose=False) change_config('config_ohmpi_card_3_15.py', verbose=False)
from OhmPi.measure import OhmPiHardware from OhmPi.measure import OhmPiHardware
k = OhmPiHardware() k = OhmPiHardware()
k._vab_pulse(vab=12, length=1., sampling_rate=k.rx.sampling_rate, polarity=1) k.vab_square_wave(vab=12, length=1., sampling_rate=k.rx.sampling_rate, cycles=3)
r = k.readings[:,2]/k.readings[:,1] r = k.readings[:,4]*k.readings[:,2]/k.readings[:,1]
print(f'Mean resistance: {np.mean(r):.3f} Ohms, Dev. {100*np.std(r)/np.mean(r):.1f} %') print(f'Mean resistance: {np.mean(r):.3f} Ohms, Dev. {100*np.std(r)/np.mean(r):.1f} %')
print(f'sampling rate: {k.rx.sampling_rate:.1f} ms, mean sample spacing: {np.mean(np.diff(k.readings[:,0]))*1000.:.1f} ms') print(f'sampling rate: {k.rx.sampling_rate:.1f} ms, mean sample spacing: {np.mean(np.diff(k.readings[:,0]))*1000.:.1f} ms')
change_config('config_default.py', verbose=False) change_config('config_default.py', verbose=False)
print(k.readings)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment