# automatic_annotation.py 4.94 KiB
# coding: utf-8
import json
import os

from strpython.models.str import STR
import networkx as nx
import numpy as np
import geopandas as gpd
from shapely.geometry import MultiPoint,Polygon,Point,LineString


def jsonKeys2int(x):
    """
    Recursively convert the keys of (nested) dictionaries to ``int``.

    Only dictionaries are traversed: any other value, including a list that
    contains dictionaries, is returned untouched.
    :param x: value to convert
    :return: a new dict with integer keys if ``x`` is a dict, else ``x`` itself
    """
    if not isinstance(x, dict):
        return x
    converted = {}
    for raw_key, value in x.items():
        int_key = int(raw_key)
        converted[int_key] = jsonKeys2int(value)
    return converted

# In-memory cache of already computed criteria, filled by add_cache() and
# read by get_from_cache().
__cache__crit={}

# Warm the cache up with the content saved by a previous run, if any.
if os.path.exists("cache.json"):
    try:
        # Context manager: the handle is closed even when parsing fails.
        with open("cache.json") as cache_file:
            __cache__crit = json.load(cache_file)
        # JSON object keys are always strings: turn them back into integers.
        __cache__crit = jsonKeys2int(__cache__crit)
    except Exception as e:
        # Best effort: an unreadable cache file must not prevent the import.
        print(e)

def save_cache():
    """
    Write the in-memory criteria cache to ``cache.json`` (current working directory).

    JSON stores object keys as strings, whatever their type in memory.
    :return: None
    """
    # Serialise first: if the cache is not JSON-serialisable, the exception is
    # raised before the existing file gets truncated.
    payload = json.dumps(__cache__crit)
    # Context manager: the data is flushed and the handle closed on exit.
    with open("cache.json", 'w') as cache_file:
        cache_file.write(payload)

def get_from_cache(id1,id2):
    """
    Look up the cached data of a pair of identifiers, in either order.

    The entry stored under ``(id1, id2)`` is preferred; otherwise the entry stored
    under ``(id2, id1)`` is returned. Both orders are always tried, even when
    ``id1`` already owns entries for other identifiers.
    :param id1: first identifier
    :param id2: second identifier
    :return: the cached data, or None if the pair is not in the cache
    """
    for outer, inner in ((id1, id2), (id2, id1)):
        bucket = __cache__crit.get(outer)
        if bucket is not None and inner in bucket:
            return bucket[inner]
    return None

def add_cache(id1,id2,data):
    """
    Store ``data`` in the in-memory cache under the ordered pair ``(id1, id2)``.

    An existing entry for the same pair is overwritten.
    :param id1: first identifier (first-level key)
    :param id2: second identifier (second-level key)
    :param data: value to cache
    :return: None
    """
    # setdefault creates the first-level bucket the first time id1 is seen.
    __cache__crit.setdefault(id1, {})[id2] = data

class AnnotationAutomatic(object):
    """
    Propose an automatic annotation of a pair of STR, to ease the manual annotation.

    Every ``criterionN`` method returns ``1`` when the criterion holds for the pair
    and ``0`` otherwise; ``all`` gathers the four values in a list.
    Author : Jacques Fize
    """
    def __init__(self):
        pass

    def all(self, str1: "STR", str2: "STR", id1=None, id2=None) -> list:
        """
        Compute the four criteria for a pair of STR, using the module-level cache.

        The cache is only used when both identifiers are given: without them every
        pair would share the same ``(None, None)`` entry and get the result of the
        first pair ever computed.
        :param str1: STR
        :param str2: STR
        :param id1: identifier of str1 (cache key, also passed to ``get_cluster``)
        :param id2: identifier of str2 (cache key, also passed to ``get_cluster``)
        :return: list ``[criterion1, criterion2, criterion3, criterion4]``
        """
        use_cache = id1 is not None and id2 is not None
        if use_cache:
            cache_data = get_from_cache(id1, id2)
            # Missing and empty entries are both recomputed.
            if cache_data:
                return cache_data
        crit_ = [
            self.criterion1(str1, str2),
            self.criterion2(str1, str2),
            self.criterion3(str1, str2, id1, id2),
            self.criterion4(str1, str2, id1, id2),
        ]
        if use_cache:
            add_cache(id1, id2, crit_)
        return crit_

    @staticmethod
    def _get_cluster(str_, id_=None):
        """
        Return the clusters of ``str_``, asking with its identifier first.

        :param str_: STR
        :param id_: identifier passed to ``get_cluster``
        :return: whatever ``str_.get_cluster`` returns
        """
        try:
            return str_.get_cluster(id_)
        except Exception:
            # Lazy fallback kept from the original code: retry without the
            # identifier. Unlike a bare ``except``, KeyboardInterrupt and
            # SystemExit are not swallowed.
            return str_.get_cluster()

    def _cluster_pair(self, str1, str2, id1=None, id2=None):
        """
        Fetch the clusters of both STR.

        :return: tuple ``(c1, c2)``, or None if one of them has no "geometry" entry
        """
        c1 = self._get_cluster(str1, id1)
        c2 = self._get_cluster(str2, id2)
        if "geometry" not in c1 or "geometry" not in c2:
            return None
        return c1, c2

    def criterion1(self, str1: "STR", str2: "STR") -> int:
        """
        Return 1 if both STR have at least one graph node (spatial entity) in common.
        :param str1: STR
        :param str2: STR
        :return: 1 or 0
        """
        shared = set(str1.graph.nodes.keys()) & set(str2.graph.nodes.keys())
        return int(len(shared) > 0)

    def criterion2(self, str1: "STR", str2: "STR") -> int:
        """
        Return 1 if a spatial entity proper to str1 is included in, or adjacent to,
        a spatial entity proper to str2. Entities found in both graphs are ignored.
        Both relations are evaluated by ``str1``.
        :param str1: STR
        :param str2: STR
        :return: 1 or 0
        """
        stop_en = set(str1.graph.nodes.keys()) & set(str2.graph.nodes.keys())
        for es in str1.spatial_entities:
            if es in stop_en:
                # Shared entity: no need to scan str2 for it.
                continue
            for es2 in str2.spatial_entities:
                if es2 in stop_en:
                    continue
                if str1.is_included_in(es, es2) or str1.is_adjacent(es, es2):
                    return 1
        return 0

    def criterion3(self, str1: "STR", str2: "STR", id1=None, id2=None, th=0.3) -> int:
        """
        Return 1 if one of the large clusters of str1 intersects a cluster of str2.

        A cluster of str1 is considered "large" when its area is not below the mean
        area of the clusters of str1. Clusters come from ``STR.get_cluster``;
        according to the original author they are built with Mean-Shift
        (scikit-learn) from the distance between spatial entities.
        :param str1: STR
        :param str2: STR
        :param id1: identifier of str1, passed to ``get_cluster``
        :param id2: identifier of str2, passed to ``get_cluster``
        :param th: unused; kept for backward compatibility (it was the minimal
            intersection-area ratio of an earlier, disabled implementation)
        :return: 1 or 0
        """
        clusters = self._cluster_pair(str1, str2, id1, id2)
        if clusters is None:
            return 0
        c1, c2 = clusters
        # NOTE: the "area" column is written on the objects returned by
        # get_cluster(), exactly as before, in case other code reads it.
        c1["area"] = c1.area
        c2["area"] = c2.area
        # Largest clusters first, so the scan can stop at the first small one.
        c1 = c1.sort_values(by="area", ascending=False)
        c2 = c2.sort_values(by="area", ascending=False)
        mean_area = np.mean(c1.area)
        for area, geom in zip(c1["area"], c1["geometry"]):
            if area < mean_area:
                break
            if any(geom.intersects(other) for other in c2["geometry"]):
                return 1
        return 0

    def criterion4(self, str1: "STR", str2: "STR", id1=None, id2=None,) -> int:
        """
        Return 1 if every cluster of str1 intersects its counterpart in str2, i.e.
        if ``c1.intersects(c2)`` holds for all rows. Clusters are fetched the same
        way as in criterion3().
        :param str1: STR
        :param str2: STR
        :param id1: identifier of str1, passed to ``get_cluster``
        :param id2: identifier of str2, passed to ``get_cluster``
        :return: 1 or 0
        """
        clusters = self._cluster_pair(str1, str2, id1, id2)
        if clusters is None:
            return 0
        c1, c2 = clusters
        return int(c1.intersects(c2).all())