Commit d6f810a2 authored by GBODJO Yawogan Jean Eudes's avatar GBODJO Yawogan Jean Eudes
Browse files

Initial commit

parents
import sys
import os
import argparse
from pathlib import Path
import numpy as np
from utils import format_y
from train import run
from test import restore
from models.tempcnn import TempCNN_Model
from models.hob2srnn import HOb2sRNN_Model
def boolean_string(s):
if s not in {'False', 'True'}:
raise ValueError('Not a valid boolean string')
return s == 'True'
if __name__ == '__main__':
# Parsing arguments
if len(sys.argv) == 1:
print ('Usage: python '+sys.argv[0]+' train_X train_y valid_X valid_y test_X test_y [options]' )
print ('Help: python '+sys.argv[0]+' -h/--help')
sys.exit(1)
parser = argparse.ArgumentParser()
parser.add_argument('data_path',help='Path to data',type=str)
parser.add_argument('gt_path',help='Path to label',type=str)
parser.add_argument('num_split',help='Number of split to use',type=str)
parser.add_argument('-m','--model',help='Which model to execute',choices=['tempcnn','hob2srnn'],default='tempcnn',type=str)
parser.add_argument('-out','--out_path',help='Output path for model and results',type=str)
parser.add_argument('-bs','--batch_size',dest='batch_size',help='Batch size',default=32,type=int)
parser.add_argument('-ep','--num_epochs',dest='num_epochs',help='Number of training epochs',default=1000,type=int)
parser.add_argument('-lr','--learning_rate',dest='learning_rate',help='Learning rate',default=1e-4,type=float)
parser.add_argument('-tqdm',dest='tqdm',help='Display tqdm progress bar',default=False,type=boolean_string)
args = parser.parse_args()
# Get argument values
data_path = args.data_path
gt_path = args.gt_path
n_split = args.num_split
model_to_train = args.model
if not args.out_path is None :
out_path = args.out_path
else:
out_path = f'model_{model_to_train}'
batch_size = args.batch_size
n_epochs = args.num_epochs
lr = args.learning_rate
tqdm_display = args.tqdm
# Create output path if does not exist
Path(out_path).mkdir(parents=True, exist_ok=True)
# Load Training and Validation set
train_y = format_y(os.path.join(gt_path,f'train_gt_{n_split}.npy') )
print ('Training GT:',train_y.shape)
valid_y = format_y(os.path.join(gt_path,f'valid_gt_{n_split}.npy') )
print ('Validation GT:', valid_y.shape)
n_classes = len(np.unique(train_y))
print ('Number of classes:',n_classes)
train_X = np.load(os.path.join(data_path,f'train_S2_{n_split}.npy') )
print ('Training X:',train_X.shape)
valid_X = np.load(os.path.join(data_path,f'valid_S2_{n_split}.npy') )
print ('Validation X:',valid_X.shape)
# Create the object model
if model_to_train == 'tempcnn':
model = TempCNN_Model(n_classes)
elif model_to_train == 'hob2srnn':
model = HOb2sRNN_Model(n_classes)
# Learning stage
checkpoint_path = os.path.join(out_path,f'model_{n_split}')
run (model,train_X,train_y,valid_X,valid_y,checkpoint_path,batch_size,
lr,n_epochs,tqdm_display)
# Load Test set
test_y = format_y(os.path.join(gt_path,f'test_gt_{n_split}.npy'), encode=False )
print ('Test GT:',test_y.shape)
test_X = np.load(os.path.join(data_path,f'test_S2_{n_split}.npy') )
print ('Test X:',test_X.shape)
# Inference stage
result_path = os.path.join(out_path,f'pred_{n_split}.npy')
restore (model,test_X,test_y,batch_size,checkpoint_path,result_path,tqdm_display)
\ No newline at end of file
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Activation, Dropout, BatchNormalization, RNN
tf.keras.backend.set_floatx('float32')
class FCGRU (tf.keras.layers.SimpleRNNCell):
'''
Gated Recurrent Unit cell (http://arxiv.org/abs/1406.1078)
enriched with Fully Connected layers
'''
def __init__(self,units,fc_units,drop=0):
super(FCGRU, self).__init__(units)
self.fc_units = fc_units
self.dense1 = Dense(fc_units,activation='tanh')
self.drop1 = Dropout(rate=drop)
self.dense2 = Dense(fc_units*2,activation='tanh')
self.drop2 = Dropout(rate=drop)
self.drop3 = Dropout(rate=drop)
def build(self,input_shapes):
self.b_g1 = self.add_weight(name='b_g1', shape=(self.units,))
self.b_g2 = self.add_weight(name='b_g2', shape=(self.units,))
self.b_g3 = self.add_weight(name='b_g3', shape=(self.units,))
self.weights_g1 = self.add_weight(name='weights_g1', shape=(self.fc_units*2, self.units))
self.weights_g2 = self.add_weight(name='weights_g2', shape=(self.fc_units*2, self.units))
self.weights_g3 = self.add_weight(name='weights_g3', shape=(self.fc_units*2, self.units))
self.weights_g1h = self.add_weight(name='weights_g1_h', shape=(self.units, self.units))
self.weights_g2h = self.add_weight(name='weights_g2_h', shape=(self.units, self.units))
self.weights_g3h = self.add_weight(name='weights_g3_h', shape=(self.units, self.units))
def call(self, inputs, state, training):
# FC Layers
fc1 = self.dense1(inputs)
fc1 = self.drop1(fc1,training)
fc2 = self.dense2(fc1)
fc2 = self.drop2(fc2,training)
# Update Gate
zt = tf.math.sigmoid( tf.matmul(fc2, self.weights_g1) + tf.matmul(state[0], self.weights_g1h) + self.b_g1)
# Reset Gate
rt = tf.math.sigmoid( tf.matmul(fc2, self.weights_g2) + tf.matmul(state[0], self.weights_g2h) + self.b_g2)
# Memory content
ht_c = self.activation( tf.matmul(fc2, self.weights_g3) + tf.matmul(rt * state[0], self.weights_g3h) + self.b_g3)
# New hidden state
ht = (1-zt) * state[0] + zt * ht_c
ht = self.drop3(ht,training)
return ht, [ht]
class Attention(Layer):
'''
Attention Mechanism
'''
def __init__(self, units, score_function='tanh'):
super(Attention, self).__init__()
self.W1 = tf.keras.layers.Dense(units)
self.V = tf.keras.layers.Dense(1)
self.score_function = score_function
def call(self, query):
score = self.V( tf.math.tanh( self.W1(query) ))
if self.score_function == 'softmax':
attention_weights = tf.nn.softmax(score,1)
elif self.score_function == 'tanh':
attention_weights = tf.math.tanh(score)
context_vector = attention_weights * query
context_vector = tf.reduce_sum(context_vector, axis=1)
return context_vector, attention_weights
class RNN_Branch (Layer):
'''
RNN Branch with FCGRU and attention mechanism
'''
def __init__(self, drop, units, fc_units):
super(RNN_Branch, self).__init__()
fcgru = FCGRU(units,fc_units,drop)
self.rnn_cell = RNN(fcgru,return_sequences=True)
self.attention = Attention(units)
def call(self,inputs,is_training):
H = self.rnn_cell(inputs,training=is_training)
feat, scores = self.attention(H)
return feat, scores
class FC(Layer):
'''
Dense layer with batch normalization
'''
def __init__(self,num_units,drop,act='relu'):
super(FC,self).__init__()
self.dense = Dense(num_units)
self.bn = BatchNormalization()
self.act = Activation(act)
self.drop_layer = Dropout(rate=drop)
def call(self,inputs,is_training):
return self.drop_layer(self.act(self.bn( self.dense(inputs) ) ), training=is_training)
class HOb2sRNN_Model (tf.keras.Model):
'''
HOb2sRNN model without hierarchy
'''
def __init__(self, n_classes, drop=0.4, units=256, fc_units=64, classif_units=512):
super(HOb2sRNN_Model, self).__init__(name='HOb2sRNN')
self.branch = RNN_Branch(drop,units,fc_units)
self.fc1 = FC (classif_units,drop)
self.fc2 = FC (classif_units,drop)
self.output_layer = Dense (n_classes,activation='softmax')
def call (self, inputs, is_training):
feat, _ = self.branch(inputs,is_training)
feat = self.fc2(self.fc1(feat,is_training),is_training)
return self.output_layer(feat)
\ No newline at end of file
import tensorflow as tf
from tensorflow.keras.layers import Layer, Activation, Dense, Dropout, BatchNormalization, Conv1D, Flatten
tf.keras.backend.set_floatx('float32')
class ConvBlock(Layer):
'''
1D Convolution block with batch normalization and dropout layer
'''
def __init__(self, n_filters, k_size, drop, padding_mode='valid'):
super(ConvBlock, self).__init__()
self.conv = Conv1D(filters=n_filters, kernel_size=k_size, padding=padding_mode,
kernel_regularizer=tf.keras.regularizers.l2(l=1E-6))
self.bn = BatchNormalization()
self.activation = Activation('relu')
self.drop_layer = Dropout(rate=drop)
def call (self,inputs,is_training):
conv = self.conv(inputs)
conv = self.bn(conv)
conv = self.activation(conv)
conv = self.drop_layer(conv,training=is_training)
return conv
class TempCNN_Encoder (Layer):
'''
TempCNN encoder from (Pelletier et al, 2019)
https://www.mdpi.com/2072-4292/11/5/523
'''
def __init__(self, n_filters, drop):
super(TempCNN_Encoder, self).__init__()
self.block1 = ConvBlock(n_filters,5,drop)
self.block2 = ConvBlock(n_filters,5,drop)
self.block3 = ConvBlock(n_filters,5,drop)
self.flatten = Flatten()
def call(self,inputs, is_training):
b1 = self.block1(inputs,is_training)
b2 = self.block2(b1,is_training)
b3 = self.block3(b2,is_training)
flatten = self.flatten(b3)
return flatten
class TempCNN_Model (tf.keras.Model):
'''
TempCNN model with output layer
'''
def __init__(self, n_classes, drop=0.5, n_filters=64, n_units=256):
super(TempCNN_Model, self).__init__(name='TempCNN')
self.branch = TempCNN_Encoder(n_filters,drop)
self.dense = Dense(n_units,kernel_regularizer=tf.keras.regularizers.l2(l=1E-6))
self.bn = BatchNormalization()
self.activation = Activation('relu')
self.drop_layer = Dropout(rate=drop)
self.output_layer = Dense (n_classes,activation='softmax',
kernel_regularizer=tf.keras.regularizers.l2(l=1E-6))
def call (self, inputs, is_training):
feat = self.branch(inputs,is_training)
feat = self.drop_layer(self.activation(self.bn(self.dense(feat))),is_training)
return self.output_layer(feat)
\ No newline at end of file
#!/bin/bash
export CUDA_VISIBLE_DEVICES=3
gt_path='../Knowledge_Distillation/data_v2/gt'
s2_path='../Knowledge_Distillation/data_v2/S2'
python main.py $s2_path $gt_path 1 -tqdm True -ep 2 -bs 256 -m hob2srnn
\ No newline at end of file
import time
import numpy as np
import tensorflow as tf
from utils import get_batch, get_iteration, transform_y
from sklearn.metrics import accuracy_score,f1_score,cohen_kappa_score
from tqdm import tqdm
def predict_by_batch (model,test_X,test_y,batch_size,tqdm_display):
'''
Predict batch of test set
'''
pred=[]
iteration = get_iteration(test_y,batch_size)
if not tqdm_display:
print (f'Test batchs: {iteration}')
start = time.time()
for batch in tqdm(range(iteration),disable=not(tqdm_display)):
batch_X = get_batch (test_X,batch,batch_size)
batch_pred = model(batch_X,is_training=False)
pred.append(tf.argmax(batch_pred,axis=1))
del batch_X,batch_pred
stop = time.time()
elapsed = stop - start
pred = np.hstack(pred)
return pred, elapsed
def restore (model,test_X,test_y,batch_size,checkpoint_path,result_path,tqdm_display):
'''
Load weights for best configuration and evaluate on test set
'''
model.load_weights(checkpoint_path)
print ('Weights loaded')
pred, elapsed = predict_by_batch (model,test_X,test_y,batch_size,tqdm_display)
if not tqdm_display:
print (f'Test Time: {elapsed}')
pred = transform_y (test_y,pred)
np.save (result_path,pred)
print ('Acc:',accuracy_score(test_y,pred))
print ('F1:',f1_score(test_y,pred,average='weighted'))
print ('Kappa:',cohen_kappa_score(test_y,pred))
\ No newline at end of file
import time
import numpy as np
import tensorflow as tf
from utils import get_batch, get_iteration
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score,f1_score,cohen_kappa_score
from tqdm import tqdm
def train_info (model,checkpoint_path,epoch,train_loss,train_acc,valid_loss,valid_acc,elapsed,best_acc,valid_y,pred):
'''
Output of training step
Save model if accuracy improves
'''
print (f'Epoch {epoch+1}, Loss: {train_loss.result()}, Acc: {train_acc.result()}, Valid Loss: {valid_loss.result()}, Valid Acc: {valid_acc.result()}, Time: {elapsed}')
if valid_acc.result() > best_acc :
print ( f1_score (valid_y,pred,average=None) )
model.save_weights(checkpoint_path)
print (f'{valid_acc.name} improved from {best_acc} to {valid_acc.result()}, saving to {checkpoint_path}')
best_acc = valid_acc.result()
# Reset metrics for the next epoch
train_loss.reset_states()
train_acc.reset_states()
valid_loss.reset_states()
valid_acc.reset_states()
return best_acc
@tf.function
def train_step (model, x, y, loss_function, optimizer, loss, metric, is_training):
'''
Gradient differentiation
'''
with tf.GradientTape() as tape:
pred = model(x, is_training)
cost = loss_function(y,pred)
if is_training :
gradients = tape.gradient(cost, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
loss(cost)
metric(y, tf.math.argmax(pred,axis=1))
return tf.math.argmax(pred,axis=1)
def run (model,train_X,train_y,valid_X,valid_y,checkpoint_path,
batch_size,lr,n_epochs,tqdm_display) :
'''
Main function for training models
'''
loss_function = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate = lr)
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_acc = tf.keras.metrics.Accuracy(name='train_acc')
valid_loss = tf.keras.metrics.Mean(name='valid_loss')
valid_acc = tf.keras.metrics.Accuracy(name='valid_acc')
best_acc = float("-inf")
train_iter = get_iteration (train_y,batch_size)
valid_iter = get_iteration (valid_y,batch_size)
if not tqdm_display:
print (f'Training batchs: {train_iter}')
print (f'Validation batchs: {valid_iter}')
for epoch in range(n_epochs):
start = time.time()
train_X, train_y = shuffle(train_X, train_y, random_state=0)
for batch in tqdm(range(train_iter),disable=not(tqdm_display)):
batch_X = get_batch (train_X,batch,batch_size)
batch_y = get_batch (train_y,batch,batch_size)
train_step(model,batch_X,batch_y,loss_function,optimizer,train_loss,train_acc,is_training=True)
del batch_X,batch_y
pred = []
for batch in tqdm(range(valid_iter),disable=not(tqdm_display)):
batch_X = get_batch (valid_X,batch,batch_size)
batch_y = get_batch (valid_y,batch,batch_size)
batch_pred = train_step(model,batch_X,batch_y,loss_function,optimizer,valid_loss,valid_acc,is_training=False)
pred.append(batch_pred)
del batch_X,batch_y,batch_pred
pred = np.hstack(pred)
stop = time.time()
elapsed = stop - start
best_acc = train_info (model,checkpoint_path,epoch,train_loss,train_acc,valid_loss,valid_acc,elapsed,best_acc,valid_y,pred)
print (model.summary())
\ No newline at end of file
import numpy as np
from sklearn.preprocessing import LabelEncoder
def format_y (array_path,encode=True):
'''
Format ground truth data
Encode label (second column) with values between 0 and n_classes-1.
output shape: (number of samples,)
'''
array = np.load(array_path)[:,1]
if encode :
encoder = LabelEncoder()
array = encoder.fit_transform( array )
return array
def transform_y (y,prediction):
'''
Transform labels back to original encoding
output shape: (number of samples,)
'''
encoder = LabelEncoder()
encoder.fit(y)
return encoder.inverse_transform(prediction)
def get_iteration (array, batch_size):
'''
Function to get the number of iterations over one epoch w.r.t batch size
'''
n_batch = int(array.shape[0]/batch_size)
if array.shape[0] % batch_size != 0:
n_batch+=1
return n_batch
def get_batch (array, i, batch_size):
'''
Function to select batch of training/validation/test set
'''
start_id = i*batch_size
end_id = min((i+1) * batch_size, array.shape[0])
batch = array[start_id:end_id]
return batch
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment