# automatic_annotation.py (6.40 KiB)
# coding: utf-8
import json

import numpy as np
import geopandas as gpd
from scipy.spatial.distance import cdist

from ..models.str import STR
from ..helpers.match_cache import MatchingCache
from ..helpers.relation_cache import AdjacencyRelation, InclusionRelation
from ..utils import JsonProgress


class AnnotationAutomatic(object):
    """
    Propose an automatic annotation for a pair of STR in order to ease the
    manual annotation work.

    Six criteria are computed for each pair (see ``criterion1`` ...
    ``criterion6``); results are stored in and read back from a
    ``MatchingCache`` keyed by the two STR identifiers.

    Author : Jacques Fize
    """

    def __init__(self, dataset, threshold_c3=0.5, inclusion_fn="", adjacency_fn=""):
        """
        Parameters
        ----------
        dataset
            Value forwarded to ``MatchingCache``.
        threshold_c3 : float
            Minimal ratio (intersection area / cluster area) used by ``criterion3``.
        inclusion_fn : str
            Path of a JSON file holding the inclusion relations; skipped when empty.
        adjacency_fn : str
            Path of a JSON file holding the adjacency relations; skipped when empty.
        """
        self.matching_cache = MatchingCache(dataset)
        self.adj_rel_db = AdjacencyRelation()
        self.inc_rel_db = InclusionRelation()
        self.inclusion, self.adjacency = {}, {}
        # Context managers guarantee the file handles are closed, even if the
        # JSON decoding fails.
        if inclusion_fn:
            with open(inclusion_fn) as inclusion_file:
                self.inclusion = json.load(inclusion_file, object_hook=JsonProgress(inclusion_fn))
        if adjacency_fn:
            with open(adjacency_fn) as adjacency_file:
                self.adjacency = json.load(adjacency_file, object_hook=JsonProgress(adjacency_fn))
        self.threshold = threshold_c3

    @staticmethod
    def _node_ids(str_):
        """Return the set of node identifiers of the graph of ``str_``."""
        return set(str_.graph.nodes.keys())

    @staticmethod
    def _get_cluster(str_, id_):
        """
        Return the clusters of ``str_``: ``get_cluster(id_)`` is tried first and
        ``get_cluster()`` is used as a fallback if that call raises.
        """
        try:
            return str_.get_cluster(id_)
        except Exception:  # best-effort fallback, but let KeyboardInterrupt/SystemExit through
            return str_.get_cluster()

    @staticmethod
    def _related(relation, source, target):
        """
        Return True if ``relation[source][target]`` exists and is truthy.
        A missing entry is treated as "no relation" instead of raising ``KeyError``.
        """
        try:
            return bool(relation[source][target])
        except KeyError:
            return False

    @staticmethod
    def _dense_clusters(clusters):
        """
        Keep the clusters whose ``nb_point`` is greater than or equal to the mean
        ``nb_point``, sorted by decreasing ``nb_point``.

        The ``area_`` and ``nb_point`` columns are written on the given frame.
        """
        clusters["area_"] = clusters.area  # not read in this class; kept so the frame is unchanged for other users
        clusters["nb_point"] = clusters.nb_point.astype(int)
        clusters = clusters.sort_values(by="nb_point", ascending=False)
        return clusters[clusters.nb_point >= np.mean(clusters.nb_point)]

    def all(self, str1, str2, id1=None, id2=None):
        """
        Compute the six criteria for a pair of STR.

        Parameters
        ----------
        str1
            First STR.
        str2
            Second STR.
        id1
            Identifier of ``str1``; used for the cache lookup and forwarded to the criteria.
        id2
            Identifier of ``str2``; used for the cache lookup and forwarded to the criteria.

        Returns
        -------
        list
            ``[criterion1, criterion2, criterion3, criterion4, criteria5, criterion6]``.
            When both identifiers are given and the pair is found in the
            matching cache, the cached values are returned instead.
        """
        # ``is not None`` (rather than truthiness) so that an identifier equal
        # to 0 still benefits from the cache.
        if id1 is not None and id2 is not None:
            found, value = self.matching_cache.is_match(int(id1), int(id2))
            if found:
                return list(value)

        crit_ = [self.criterion1(str1, str2),
                 self.criterion2(str1, str2),
                 self.criterion3(str1, str2, id1, id2),
                 self.criterion4(str1, str2, id1, id2),
                 self.criteria5(str1, str2, id1, id2),
                 self.criterion6(str1, str2)]

        self.matching_cache.add(id1, id2, *crit_)
        return crit_

    def criterion1(self, str1, str2):
        """
        Tell whether both STR share at least one graph node.

        Parameters
        ----------
        str1
        str2

        Returns
        -------
        int
            1 if the two graphs have a node identifier in common, 0 otherwise.
        """
        return int(len(self._node_ids(str1) & self._node_ids(str2)) > 0)

    def criterion2(self, str1: "STR", str2: "STR"):
        """
        Tell whether the two STR contain distinct, non-shared spatial entities
        linked by an inclusion or an adjacency relation.

        Parameters
        ----------
        str1
        str2

        Returns
        -------
        int
            1 if such a pair exists in ``self.inclusion`` or ``self.adjacency``,
            0 otherwise. Pairs absent from both mappings count as unrelated.
        """
        # Entities present in both graphs are ignored by this criterion.
        shared = self._node_ids(str1) & self._node_ids(str2)
        for es in str1.spatial_entities:
            if es in shared:
                continue
            for es2 in str2.spatial_entities:
                if es2 in shared or es == es2:
                    continue
                if self._related(self.inclusion, es, es2) or self._related(self.adjacency, es, es2):
                    return 1
        return 0

    def criterion3(self, str1: "STR", str2: "STR", id1=None, id2=None):
        """
        Tell whether a cluster of ``str2`` covers a large enough share of a
        cluster of ``str1``.

        Only clusters whose ``nb_point`` reaches the mean of their STR are
        compared. According to the original author's note, clusters group
        spatial entities that are close to each other (Mean-Shift from
        scikit-learn); the clustering itself is done by ``get_cluster``.

        Parameters
        ----------
        str1
        str2
        id1
            Passed to ``str1.get_cluster``.
        id2
            Passed to ``str2.get_cluster``.

        Returns
        -------
        int
            1 if, for some pair of clusters, intersection area divided by the
            area of the ``str1`` cluster is at least ``self.threshold``;
            0 otherwise, including when either STR has no ``geometry``.
        """
        c1 = self._get_cluster(str1, id1)
        c2 = self._get_cluster(str2, id2)

        if ("geometry" not in c1) or ("geometry" not in c2):
            return 0

        c1 = self._dense_clusters(c1)
        c2 = self._dense_clusters(c2)

        # Cheap pre-check before running the pairwise overlays.
        if not c2.intersects(c1).any():
            return 0

        for _, row2 in c2.iterrows():
            for _, row1 in c1.iterrows():
                inter = gpd.overlay(
                    gpd.GeoDataFrame(geometry=[row2.geometry]),  # c2
                    gpd.GeoDataFrame(geometry=[row1.geometry]),  # c1
                    how="intersection",
                    use_sindex=False
                )
                reference_area = row1.geometry.area
                if "geometry" not in inter:
                    continue
                if inter.area.sum() / reference_area >= self.threshold:
                    return 1
        return 0

    def criterion4(self, str1, str2, id1=None, id2=None):
        """
        Tell whether all the clusters of ``str1`` intersect those of ``str2``.
        Clusters are obtained the same way as in ``criterion3``.

        Parameters
        ----------
        str1
        str2
        id1
            Passed to ``str1.get_cluster``.
        id2
            Passed to ``str2.get_cluster``.

        Returns
        -------
        int
            ``int(c1.intersects(c2).all())``; 0 when either STR has no ``geometry``.
        """
        c1 = self._get_cluster(str1, id1)
        c2 = self._get_cluster(str2, id2)
        if ("geometry" not in c1) or ("geometry" not in c2):
            return 0
        return int(c1.intersects(c2).all())

    def criteria5(self, str1, str2, id1=None, id2=None):
        """
        Return the average distance between the two sets of clusters.

        Parameters
        ----------
        str1
        str2
        id1
            Passed to ``str1.get_cluster``.
        id2
            Passed to ``str2.get_cluster``.

        Returns
        -------
        float
            Mean of the pairwise euclidean distances between the cluster
            centroids of both STR; ``np.inf`` when either STR has no ``geometry``.
        """
        c1 = self._get_cluster(str1, id1)
        c2 = self._get_cluster(str2, id2)

        if ("geometry" not in c1) or ("geometry" not in c2):
            return np.inf

        def get_centroid_array(gdf):
            # The centroids are also stored on the frame in a "centroid_" column.
            gdf["centroid_"] = gdf.centroid.apply(lambda point: [point.x, point.y])
            return np.array(gdf.centroid_.values.tolist())

        distances = cdist(get_centroid_array(c1), get_centroid_array(c2), "euclidean")
        return np.mean(distances.flatten())

    # Alias following the naming of the other criteria; ``criteria5`` is kept
    # for existing callers.
    criterion5 = criteria5

    def criterion6(self, str1, str2):
        """
        Return the Dice coefficient between the node sets of the two STR graphs.

        Parameters
        ----------
        str1
        str2

        Returns
        -------
        float
            ``2 * |G & H| / (|G| + |H|)``; 0.0 when both graphs are empty.
        """
        G = self._node_ids(str1)
        H = self._node_ids(str2)
        total = len(G) + len(H)
        if total == 0:
            # Two empty graphs share nothing; avoid the division by zero.
            return 0.0
        return 2 * len(G & H) / total