Commit 36812934 authored by Guillaume Blanchy's avatar Guillaume Blanchy
Browse files

update classes (dry run on desktop)

Showing with 137 additions and 77 deletions
+137 -77
......@@ -2,3 +2,4 @@ data/*.csv
**/.ipynb_notebooks/**
data.zip
__pycache__
Ohmpi_4elec_mqtt.py
......@@ -28,21 +28,27 @@ from datetime import datetime
from termcolor import colored
import threading
onpi = False # set to True if running on raspberrypi
import board, busio,adafruit_tca9548a
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
from adafruit_mcp230xx.mcp23008 import MCP23008
from adafruit_mcp230xx.mcp23017 import MCP23017
import digitalio
from digitalio import Direction
from gpiozero import CPUTemperature
if False:
import board, busio, adafruit_tca9548a
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
from adafruit_mcp230xx.mcp23008 import MCP23008
from adafruit_mcp230xx.mcp23017 import MCP23017
import digitalio
from digitalio import Direction
from gpiozero import CPUTemperature
current_time = datetime.now()
print(current_time.strftime("%Y-%m-%d %H:%M:%S"))
# from logging_setup import setup_loggers
# from mqtt_setup import mqtt_client_setup
# msg_logger, msg_log_filename, data_logger, data_log_filename, logging_level = setup_loggers()
# mqtt_client, measurement_topic = mqtt_client_setup()
# msg_logger.info(f'publishing mqtt to topic {measurement_topic}')
class OhmPi(object):
def __init__(self, config=None, sequence=None, onpi=False, output='print'):
"""Create the main OhmPi object.
......@@ -73,15 +79,15 @@ class OhmPi(object):
# default acquisition parameters
self.pardict = {
injection_duration = 0.2
nbr_meas = 100
sequence_delay = 1
nb_stack = 1
export_path = 'data/measurement.csv'
'injection_duration': 0.2,
'nbr_meas': 100,
'sequence_delay': 1,
'nb_stack': 1,
'export_path': 'data/measurement.csv'
}
# read in acquisition parameters
if config is None:
if config is not None:
self._read_acquisition_parameters(config)
self.dump('Initialized with configuration:' + str(self.pardict), level='debug')
......@@ -138,6 +144,7 @@ class OhmPi(object):
elif self.output == 'mqtt':
if level == 'debug':
# TODO mqtt transmission here
pass
def _read_acquisition_parameters(self, config):
......@@ -155,7 +162,7 @@ class OhmPi(object):
config : str
Path to the .json or dictionnary.
"""
if isinstance(config, dic):
if isinstance(config, dict):
self.pardict.update(config)
else:
dic = json.loads(config)
......@@ -166,56 +173,61 @@ class OhmPi(object):
def _read_hardware_parameters(self):
"""Read hardware parameters from settings.py.
"""
from settings.py import OHMPI_CONFIG
self.r_shunt = OHMPI_CONFIG['r_shunt'] # reference resistance value in ohm
self.Imax = OHMPI_CONFIG['Imax']
from settings import OHMPI_CONFIG
self.id = OHMPI_CONFIG['id'] # ID of the OhmPi
self.r_shunt = OHMPI_CONFIG['R_shunt'] # reference resistance value in ohm
self.Imax = OHMPI_CONFIG['Imax'] # maximum current
self.dump('The maximum current cannot be higher than 48 mA', level='warn')
self.coef_p2 = OHMPI_CONFIG['coef_p2'] # slope for current conversion for ADS.P2, measurement in V/V
self.coef_p3 = OHMPI_CONFIG['coef_p3'] # slope for current conversion for ADS.P3, measurement in V/V
self.offset_p2 = OHMPI_CONFIG['offset_p2']
self.offset_p3 = OHMPI_CONFIG['offset_p3']
self.nb_samples = OHMPI_CONFIG['integer'] # number of samples measured for each stack
self.nb_elec = OHMPI_CONFIG['nb_elec'] # max number of electrodes (physically)
self.version = OHMPI_CONFIG['version'] # hardware version
self.max_elec = OHMPI_CONFIG['max_elec'] # maximum number of electrodes
self.dump('OHMPI_CONFIG = ' + str(OHMPI_CONFIG), level='debug')
def find_identical_in_line(self, quads):
"""Find quadrupole which where A and B are identical.
If A and B are connected to the same relay, the Pi burns (short-circuit).
Parameters
----------
quads : 1D or 2D array
List of quadrupoles of shape nquad x 4 or 1D vector of shape nquad.
Returns
-------
output : 1D array of int
List of index of rows where A and B are identical.
"""
# TODO is this needed for M and N?
# if we have a 1D array (so only 1 quadrupole), make it 2D
if len(quads.shape) == 1:
quads = quads[None, :]
output = np.where(quads[:, 0] == quads[:, 1])[0]
# output = []
# if array_object.ndim == 1:
# temp = np.zeros(4)
# for i in range(len(array_object)):
# temp[i] = np.count_nonzero(array_object == array_object[i])
# if any(temp > 1):
# output.append(0)
# else:
# for i in range(len(array_object[:,1])):
# temp = np.zeros(len(array_object[1,:]))
# for j in range(len(array_object[1,:])):
# temp[j] = np.count_nonzero(array_object[i,:] == array_object[i,j])
# if any(temp > 1):
# output.append(i)
return output
def read_quad(self, filename):
def find_identical_in_line(self, quads):
"""Find quadrupole which where A and B are identical.
If A and B are connected to the same relay, the Pi burns (short-circuit).
Parameters
----------
quads : 1D or 2D array
List of quadrupoles of shape nquad x 4 or 1D vector of shape nquad.
Returns
-------
output : 1D array of int
List of index of rows where A and B are identical.
"""
# TODO is this needed for M and N?
# if we have a 1D array (so only 1 quadrupole), make it 2D
if len(quads.shape) == 1:
quads = quads[None, :]
output = np.where(quads[:, 0] == quads[:, 1])[0]
# output = []
# if array_object.ndim == 1:
# temp = np.zeros(4)
# for i in range(len(array_object)):
# temp[i] = np.count_nonzero(array_object == array_object[i])
# if any(temp > 1):
# output.append(0)
# else:
# for i in range(len(array_object[:,1])):
# temp = np.zeros(len(array_object[1,:]))
# for j in range(len(array_object[1,:])):
# temp[j] = np.count_nonzero(array_object[i,:] == array_object[i,j])
# if any(temp > 1):
# output.append(i)
return output
def read_quad(self, filename):
"""Read quadrupole sequence from file.
Parameters
......@@ -232,10 +244,10 @@ def find_identical_in_line(self, quads):
output = np.loadtxt(filename, delimiter=" ", dtype=int) # load quadripole file
# locate lines where the electrode index exceeds the maximum number of electrodes
test_index_elec = np.array(np.where(output > self.nb_elec))
test_index_elec = np.array(np.where(output > self.max_elec))
# locate lines where electrode A == electrode B
test_same_elec = find_identical_in_line(output)
test_same_elec = self.find_identical_in_line(output)
# if statement with exit cases (TODO rajouter un else if pour le deuxième cas du ticket #2)
if test_index_elec.size != 0:
......@@ -320,7 +332,7 @@ def find_identical_in_line(self, quads):
"""
roles = ['A', 'B', 'M', 'N']
# another check to be sure A != B
if quadrupoles[0] != quadrupoles[1]:
if quadrupole[0] != quadrupole[1]:
for i in range(0, 4):
self.switch_mux(quadrupole[i], 'on', roles[i])
else:
......@@ -344,7 +356,7 @@ def find_identical_in_line(self, quads):
"""Switch off all multiplexer relays."""
roles = ['A', 'B', 'M', 'N']
for i in range(0, 4):
for j in range(1, self.nb_elec + 1):
for j in range(1, self.max_elec + 1):
self.switch_mux(j, 'off', roles[i])
self.dump('All MUX switched off.', level='debug')
......@@ -400,9 +412,9 @@ def find_identical_in_line(self, quads):
# sampling for each stack at the end of the injection
meas = np.zero_like((3, self.nb_samples))
for k in range(0, self.nb_samples):
meas[0, k] = (AnalogIn(ads_current, ADS.P0).voltage*1000) / (50 * self.r_shunt) # reading current value on ADS channel A0
meas[1, k] = AnalogIn(ads_voltage, ADS.P0).voltage * self.coefp2 * 1000
meas[2, k] = AnalogIn(ads_voltage, ADS.P1).voltage * self.coefp3 * 1000 # reading voltage value on ADS channel A2
meas[0, k] = (AnalogIn(self.ads_current, ADS.P0).voltage*1000) / (50 * self.r_shunt) # reading current value on ADS channel A0
meas[1, k] = AnalogIn(self.ads_voltage, ADS.P0).voltage * self.coefp2 * 1000
meas[2, k] = AnalogIn(self.ads_voltage, ADS.P1).voltage * self.coefp3 * 1000 # reading voltage value on ADS channel A2
# stop current injection
pin1.value = False
......@@ -430,7 +442,7 @@ def find_identical_in_line(self, quads):
time.sleep(2*(end_delay-start_delay)-(end_calc-start_delay))
# create dateframe and compute averaged values from all stacks
df = DataFrame({
df = pd.DataFrame({
"time": [datetime.now()],
"A": [(1)],
"B": [(2)],
......@@ -444,7 +456,7 @@ def find_identical_in_line(self, quads):
"nbStack": [nb_stack],
"CPU temp [degC]": [CPUTemperature().temperature],
"Time [s]": [(-start_time + time.time())],
"Nb samples [-]": [nb_samples]
"Nb samples [-]": [self.nb_samples]
})
# round number to two decimal for nicer string output
......@@ -478,12 +490,16 @@ def find_identical_in_line(self, quads):
# run the RS check
self.dump('RS check (check contact resistance)', level='debug')
self.measure()
# restore
self.pardict['export_path'] = export_path
self.sequence = sequence
# TODO if interrupted, we would need to restore the values
# TODO or we offer the possiblity in 'run_measurement' to have rs_check each time?
def append_and_save(self, last_measurement):
def append_and_save(self, fname, last_measurement):
"""Append and save last measurement dataframe.
Parameters
......@@ -492,13 +508,13 @@ def find_identical_in_line(self, quads):
Last measurement taken in the form of a pandas dataframe.
"""
if os.path.isfile(self.path):
if os.path.isfile(fname):
# Load data file and append data to it
with open(self.path, 'a') as f:
with open(fname, 'a') as f:
last_measurement.to_csv(f, header=False)
else:
# create data file and add headers
with open(self.path, 'a') as f:
with open(fname, 'a') as f:
last_measurement.to_csv(f, header=True)
......@@ -533,15 +549,15 @@ def find_identical_in_line(self, quads):
self.switch_mux_on(quad)
# run a measurement
if onpi:
current_measurement = run_measurement(quad, self.pardict["stack"], self.pardict["injection_duration"])
if self.onpi:
current_measurement = self.run_measurement(quad, self.pardict["stack"], self.pardict["injection_duration"])
else: # for testing, generate random data
current_measurement = pd.DataFrame({
'A': [quad[0]], 'B': [quad[1]], 'M': [quad[2]], 'N': [quad[3]], 'R [ohm]': np.abs(np.random.randn(1))
})
# switch mux off
self.switch_mux_off(quad))
self.switch_mux_off(quad)
# save data and print in a text file
self.append_and_save(fname, current_measurement)
......@@ -575,5 +591,6 @@ def find_identical_in_line(self, quads):
# test
#with open('ohmpi_param.json') as json_file:
# pardict = json.load(json_file)
#ohmpi = OhmPi(pardict)
ohmpi = OhmPi()
#ohmpi.measure()
......@@ -7,3 +7,4 @@ adafruit-circuitpython-tca9548a
adafruit-circuitpython-mcp230xx
gpiozero
termcolor
board
settings.py 0 → 100644
# OhmPi configuration
OHMPI_CONFIG = {
'id': '0001', # Unique identifier of the OhmPi board (string)
'R_shunt': 2, # Shunt resistance in Ohms
'Imax': 4800/50/2, # Maximum current
'coef_p2': 2.50, # slope for current conversion for ADS.P2, measurement in V/V
'coef_p3': 2.50, # slope for current conversion for ADS.P3, measurement in V/V
'offset_p2': 0,
'offset_p3': 0,
'integer': 2, # Max value 10 WHAT IS THIS?
'version': 2,
'max_elec': 64,
}
# local messages logging configuration
LOGGING_CONFIG = {
'debug_mode': False,
'logging_to_console': False,
'file_name': 'ohmpi_log',
'max_bytes': 262144,
'backup_count': 30,
'when': 'd',
'interval': 1
}
# local data logging configuration
DATA_LOGGING_CONFIG = {
'file_name': 'data_log',
'max_bytes': 16777216,
'backup_count': 1024,
'when': 'd',
'interval': 1
}
# MQTT configuration parameters
MQTT_CONFIG = {
'mqtt_broker': 'mg3d-dev.umons.ac.be',
'client_id': f'ohmpi_sn_{OHMPI_CONFIG["id"]}',
'control_topic': f'cmd_ohmpi_sn_{OHMPI_CONFIG["id"]}',
'measurements_topic': f'measurements_ohmpi_sn_{OHMPI_CONFIG["id"]}'
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment