# RestoreFusion.py (5.82 KiB)
import sys
import os
import tensorflow as tf
import numpy as np
from sklearn.metrics import accuracy_score, cohen_kappa_score, f1_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.ensemble import RandomForestClassifier


#Format values matrix from (n, n_timestamps*d) shape to (n, n_timestamps, d)
def getRNNFormat(X, n_timestamps):
    """Cut every row of X into n_timestamps equal consecutive pieces.

    X: iterable of rows (e.g. a 2-D array); each row is passed to np.split.
    n_timestamps: number of equal sections each row is divided into.

    Returns an array stacking, for every row, its list of sections.
    np.split raises ValueError when a row cannot be divided evenly.
    """
    sequences = [np.split(sample, n_timestamps) for sample in X]
    return np.array(sequences)

#Format labels from (n,) or (n,1) shape to one-hot (n,n_classes)
def getRNNFormatLabel(Y):
    """One-hot encode a vector of class labels.

    Y: 1-D sequence of labels, or an (n, 1) column of labels.

    Returns a float array of shape (n, n_classes). Column j corresponds to
    the j-th distinct label of Y in sorted order (the order np.unique
    returns). An empty input yields an empty (0, 0) array.

    Raises ValueError when Y is neither 1-D nor a single column.
    """
    labels = np.asarray(Y)

    # Accept the (n, 1) layout by dropping the trailing axis; iterating such
    # an array row by row would yield unhashable 1-element arrays.
    if labels.ndim == 2 and labels.shape[1] == 1:
        labels = labels[:, 0]
    if labels.ndim != 1:
        raise ValueError(
            "labels must be 1-D or an (n, 1) column, got shape %s"
            % (labels.shape,))

    # np.unique returns the distinct labels already sorted, plus for every
    # sample the position of its label inside that sorted list.
    classes, class_idx = np.unique(labels, return_inverse=True)

    one_hot = np.zeros((labels.shape[0], classes.shape[0]))
    one_hot[np.arange(labels.shape[0]), class_idx] = 1.0
    return one_hot

#Get the i-th batch of the values set X and the labels set Y
def getBatch(X, Y, i, batchsz):
    """Return the i-th consecutive slice of at most batchsz items of X and Y.

    X: array exposing .shape; its first dimension bounds the slice.
    Y: any sliceable sequence, cut with the same bounds as X.
    i: zero-based batch index.
    batchsz: nominal batch size (the final batch may be shorter).

    Returns the tuple (X slice, Y slice).
    """
    first = i * batchsz
    # The upper bound is clipped against X only; Y is cut with the same window.
    window = slice(first, min(first + batchsz, X.shape[0]))
    return X[window], Y[window]

def getLabelFormat(Y):
    """One-hot encode the labels in Y.

    Y: iterable of hashable labels (e.g. a 1-D array or a list).

    Returns an array with one row per element of Y and one column per
    distinct label; columns follow the sorted order returned by np.unique.
    """
    classes = np.unique(np.array(Y))
    n_classes = len(classes)

    # Map every distinct label to its column index.
    column_of = {label: pos for pos, label in enumerate(classes)}

    encoded = []
    for label in Y:
        row = np.zeros(n_classes)
        row[column_of[label]] = 1.0
        encoded.append(row)
    return np.array(encoded)


#MAIN
# Script entry: read the run configuration from the command line, load one
# test split from .npy files, derive the directory of the saved model and
# open the score files that the evaluation section appends to.
#
# Usage: <script> ITR CHOICE PATH_VHSR PATH_GT PATH_TS

#Model parameters
# Of these, only batchsz is read by the visible part of this script
# (batch iteration and the model directory name).
nunits = 1024
batchsz = 64
hm_epochs = 400
n_levels_lstm = 1
#dropout = 0.2

#Data information
# n_timestamps is the number of sections each time-series row is split into;
# p_split is part of the dataset file names and of the model directory name.
# n_dims, patch_window and n_channels are not read in the visible code.
n_timestamps = 34
n_dims = 16
patch_window = 25
n_channels = 5
p_split=30

# Command-line arguments. The three path_in_* values are concatenated
# directly with a file name, so each must end with a path separator.
itr = int( sys.argv[1] )
choice = sys.argv[2]
path_in_vhsr = sys.argv[3]
path_in_gt = sys.argv[4]
path_in_ts = sys.argv[5]

#load ds
# Files are named test_x<itr>_<p_split>.npy / test_y<itr>_<p_split>.npy.
test_x_vhs = np.load( path_in_vhsr+"test_x%d_%d.npy"%( itr, p_split ) )
test_x_ts = np.load( path_in_ts+"test_x%d_%d.npy"%( itr, p_split ) )
test_y = np.load( path_in_gt+"test_y%d_%d.npy"%( itr, p_split ) )

#format data
# Split every time-series row into n_timestamps sections and one-hot encode
# the labels.
test_x_ts =  getRNNFormat( test_x_ts, n_timestamps)
test_y = getLabelFormat( test_y )

# If axes 1 and 2 of the image patches differ in size, axes 1 and 3 are
# swapped and the run is tagged '4bands'; otherwise it is tagged 'NDVI'.
# NOTE(review): presumably this detects a channels-first layout and moves the
# channel axis last - confirm. swapaxes(1, 3) also exchanges the two spatial
# axes, so it must match what was done at training time.
height = test_x_vhs.shape[1]
width = test_x_vhs.shape[2]
if height!=width:
    test_x_vhs = np.swapaxes(test_x_vhs, 1, 3)
    tag_band = '4bands'
else:
    tag_band = 'NDVI'

# Re-read the dimensions after the possible swap; width is simply set equal
# to height (square patches assumed - TODO confirm).
height = test_x_vhs.shape[1]
width = height
band = test_x_vhs.shape[3]

#declare paths
# net_path already ends with '/', so path_in_dir contains a doubled slash.
net_path = "./dataset/%s_%d_%d_%s/"%(choice, p_split, batchsz, tag_band)
path_in_dir = net_path+'/modelTT%d/'%itr #model path
path_in_model = path_in_dir+'model-%d.meta'%itr #model file


#scores files
# Opened in append mode, so results of successive runs accumulate.
ftest_acc = open(net_path+"test_accuracy.txt", "a")
ftest_fscore = open(net_path+"test_fscore.txt","a")
ftest_precisions = open(net_path+"test_prec_classes.txt","a")
ftest_recall = open(net_path+"test_rec_classes.txt","a")
ftest_scores = open(net_path+"test_fscore_classes.txt","a")

# Start from an empty default graph before the saved graph is imported.
tf.reset_default_graph()

# Restore the saved model, run it batch by batch over the test split and
# append accuracy / F-score / per-class metrics to the score files.
# Parenthesised single-argument print is used throughout: it prints the same
# text under the Python 2 print statement and the Python 3 print function.
with tf.Session() as sess:

    # Rebuild the graph from the .meta file, then load the weights of the
    # most recent checkpoint found in the model directory.
    model_saver = tf.train.import_meta_graph(path_in_model)
    model_saver.restore(sess, tf.train.latest_checkpoint(path_in_dir))
    print("Model Restored")

    # Export the restored graph as a summary event file.
    writer = tf.summary.FileWriter( path_in_dir+'histogram_example', sess.graph )
    print("Graph Wrote")
    writer.close()

    graph = tf.get_default_graph()

    # Append the output tensors of every operation to graph.txt so tensor
    # names can be looked up; 'with' closes the file even if a write fails.
    with open(net_path+"graph.txt",'a') as graph_var:
        for op in graph.get_operations():
            graph_var.write(str(op.values())+"\n")

    # Tensors of the restored graph, looked up by name.
    x_rnn = graph.get_tensor_by_name("x_rnn:0")
    x_cnn = graph.get_tensor_by_name("x_cnn:0")
    y = graph.get_tensor_by_name("y:0")

    # A disabled earlier variant fetched the prediction by layer name
    # ("dense_37/BiasAdd:0" when 'AttentionII' is in choice, otherwise
    # "dense_3/BiasAdd:0"); the named tensor below is used instead.
    p = graph.get_tensor_by_name("prediction_full:0")

    learning_rate = graph.get_tensor_by_name("learning_rate:0")
    is_training_ph = graph.get_tensor_by_name("is_training:0")
    dropout = graph.get_tensor_by_name("drop_rate_rnn:0")

    #EVAL TEST
    pred_test = []

    # Number of batches, rounded up so a final partial batch is evaluated.
    # '//' keeps the count an integer on Python 3 as well.
    iter_test = len( test_x_ts ) // batchsz
    if len( test_y ) % batchsz != 0:
        iter_test += 1

    for ibatch in range( iter_test ):

        # Both inputs are cut with the same batch index; the label slice
        # returned with the time-series batch is not needed.
        ts_batch_x, _ = getBatch(test_x_ts, test_y, ibatch, batchsz)
        vhs_batch_x, batch_y = getBatch(test_x_vhs, test_y, ibatch, batchsz)

        # NOTE(review): the tensor is named "drop_rate_rnn" yet is fed 1 here;
        # confirm this is the value the saved graph expects at inference.
        pred_temp = sess.run( p , feed_dict = { x_rnn:ts_batch_x,
                                                x_cnn:vhs_batch_x,
                                                y:batch_y,
                                                is_training_ph:False,
                                                dropout:1,
                                                learning_rate:0.0002} )

        # Predicted class = index of the largest score of each sample.
        pred_test.extend( np.argmax(el) for el in pred_temp )

    #ground truth test: class index of every one-hot label row
    gt_test = [ np.argmax( el ) for el in test_y ]

    pred_test = pred_test[ 0:len( gt_test )]

    testAcc = accuracy_score( gt_test, pred_test )
    testFscore = f1_score( gt_test, pred_test, average='weighted' )
    var_prec, var_rec, var_fsc, _ = precision_recall_fscore_support( gt_test, pred_test )

    print("Test Accuracy: %f\nTest Fscore:%f \nTest Fxclassses: %s\nTest Pxclasses:%s\nTest Rxclasses:%s\n" %( testAcc, testFscore, var_fsc, var_prec, var_rec ))
    ftest_acc.write("%f\n"%testAcc)
    ftest_fscore.write("%f\n"%testFscore)

    # One tab-separated row of per-class values per run in each file.
    for score_file, per_class in ( (ftest_recall, var_rec),
                                   (ftest_precisions, var_prec),
                                   (ftest_scores, var_fsc) ):
        score_file.write( "".join( "%f\t"%el for el in per_class ) + "\n" )

    # Close every score file. Previously ftest_fscore was closed twice and
    # ftest_scores was never closed.
    for score_file in ( ftest_acc, ftest_fscore, ftest_recall,
                        ftest_precisions, ftest_scores ):
        score_file.close()