Commit 95d0b5b6 authored by Fize Jacques's avatar Fize Jacques
Browse files

Add Cache relation extraction using sqlite + Add entry for sqlite db in...

Add Cache relation extraction using sqlite + Add entry for sqlite db in config.json (with files...) + LICENSE
parent 847ed7fb
No related merge requests found
Showing with 344 additions and 0 deletions
+344 -0
MIT License
Copyright (c) 2019 Fize Jacques
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# coding = utf-8
import sqlite3
import os
from ..config.configuration import config
class GeoRelationMatchingDatabase():
def __init__(self, db_filename = config.relation_db_path):
if os.path.exists(db_filename):
try:
self._db_connection = sqlite3.connect(db_filename)
except:
raise ValueError("File is not a sqlite database")
else:
self._db_connection = sqlite3.connect(db_filename)
self.__init_database__()
def __init_database__(self):
"""
Initialize database
"""
cursor = self._db_connection.cursor()
inclusion_schema = """CREATE TABLE inclusion
(idse1 text, idse2 text, value integer)
"""
adjacency_schema = """CREATE TABLE adjacency
(idse1 text, idse2 text, value integer)
"""
matching_schema = """CREATE TABLE matching
(dataset text, g1 integer, g2 integer, c1 integer, c2 integer, c3 integer,c4 integer)
"""
cursor.execute(inclusion_schema)
cursor.execute(adjacency_schema)
cursor.execute(matching_schema)
self._db_connection.commit()
cursor.close()
def add_adjacency(self, idse1: str, idse2: str, value: bool):
"""
Add adjacency relation value
Parameters
----------
idse1 : str
spatial entity id1
idse2 : str
spatial entity id2
value : bool
value of the relation
"""
cursor = self._db_connection.cursor()
cursor.execute('INSERT INTO adjacency VALUES(?,?,?)', (idse1, idse2, int(value)))
self._db_connection.commit()
cursor.close()
def add_inclusion(self, idse1: str, idse2: str, value: bool):
"""
Add inclusion relation in the database
Parameters
----------
idse1 : str
id of the first spatial entity
idse2 : str
id of the second spatial entity
value : bool
value of the relation
"""
cursor = self._db_connection.cursor()
cursor.execute('INSERT INTO inclusion VALUES(?,?,?)', (idse1, idse2, int(value)))
self._db_connection.commit()
cursor.close()
def add_matching(self, dataset: str, G1: int, G2: int, c1: bool, c2: bool, c3: bool, c4: bool):
"""
Add a matching criteria result within the database
Parameters
----------
dataset : str
name of the dataset from where the matching have been done
G1 : int
id of the first STR
G2 : int
id of the second STR
c1 : bool
value of criterion 1
c2 : bool
value of criterion 2
c3 : bool
value of criterion 3
c4 : bool
value of criterion 4
"""
cursor = self._db_connection.cursor()
cursor.execute('INSERT INTO matching VALUES(?,?,?,?,?,?,?)',
(dataset, G1, G2, int(c1), int(c2), int(c3), int(c4)))
self._db_connection.commit()
cursor.close()
def get_spatial_relation(self, idse1: str, idse2: str, table: str):
"""
Return the value of the spatial relation if exist in the table
Parameters
----------
idse1 : str
id of the first spatial entity
idse2 : str
id of the first spatial entity
table : str
name of table that store the relation value desired
Returns
-------
bool,bool
(True if relation found,value of the relation)
"""
cursor = self._db_connection.cursor()
cursor.execute(
"SELECT * from {2} a where a.idse1 LIKE '{0}' and a.idse2 LIKE '{1}'".format(idse1, idse2, table))
result_ = cursor.fetchone()
if not result_ and table != "inclusion":
cursor.execute(
"SELECT * from {2} a where a.idse2 LIKE '{0}' and a.idse1 LIKE '{1}'".format(idse1, idse2, table))
result_ = cursor.fetchone()
cursor.close()
if result_:
return True, bool(result_[-1])
return False, False
def get_adjacency(self, idse1: str, idse2: str):
"""
Parameters
----------
idse1 : str
id of the first spatial entity
idse2 : str
id of the first spatial entity
Returns
-------
"""
return self.get_spatial_relation(idse1, idse2, "adjacency")
def get_inclusion(self, idse1: str, idse2: str):
"""
Parameters
----------
idse1 : str
id of the first spatial entity
idse2 : str
id of the first spatial entity
Returns
-------
"""
return self.get_spatial_relation(idse1, idse2, "inclusion")
def get_matching(self, G1: int, G2: int, dataset: str):
cursor = self._db_connection.cursor()
cursor.execute("SELECT * from {2} a where a.dataset LIKE '{3}' AND a.g1 = {0} and a.g2 = {1}".format(G1, G2, "matching",dataset))
result_ = cursor.fetchone()
if not result_:
cursor.execute("SELECT * from {2} a where a.dataset LIKE '{3}' AND a.g2 = {0} and a.g1 = {1} ".format(G1, G2, "matching",dataset))
result_ = cursor.fetchone()
cursor.close()
if result_:
return True, tuple(map(bool, result_[-4:]))
return False, False
if __name__ == "__main__":
if os.path.exists("test2.db"):
os.remove("test2.db")
g = GeoRelationMatchingDatabase("test2.db")
g.add_adjacency("GD1", "GD2", True)
assert g.get_adjacency("GD1", "GD2") == (True, True)
assert g.get_adjacency("GD2", "GD1") == (True, True)
g.add_inclusion("GD1", "GD2", True)
assert g.get_inclusion("GD1", "GD2") == (True, True)
assert g.get_inclusion("GD2", "GD1") == (False, False)
g.add_matching("test", 1, 2, True, True, False, True)
g.add_matching("test2", 1, 2, True, False, False, True)
assert g.get_matching(1, 2, "test") == (True, (True, True, False, True))
assert g.get_matching(1, 2, "test2") != (True, (True, True, False, True))
print("Passed the tests !")
# coding = utf-8
from shapely.geometry import Point
from strpython.helpers.collision import collide
from .geo_relation_database import GeoRelationMatchingDatabase
from ..helpers.geodict_helpers import gazetteer
class RelationExtractor():
__cache_entity_data = {}
def __init__(self, geo_rel_match_database=GeoRelationMatchingDatabase()):
self.db_rel_match = geo_rel_match_database
def is_relation(self, id_se1: str, id_se2: str):
raise NotImplementedError()
def get_data(self, id_se):
"""
Return an gazpy.Element object containing information about a spatial entity.
Parameters
----------
id_se : str
Identifier of the spatial entity
Returns
-------
gazpy.Element
data
"""
if id_se in RelationExtractor.__cache_entity_data:
return RelationExtractor.__cache_entity_data[id_se]
data = gazetteer.get_by_id(id_se)
if len(data) > 0:
RelationExtractor.__cache_entity_data[id_se] = data[0]
return data[0]
class AdjacencyRelation(RelationExtractor):
def __init__(self):
RelationExtractor.__init__(self)
def is_relation(self, id_se1: str, id_se2: str):
found_, value = self.db_rel_match.get_adjacency(id_se1, id_se2)
if found_:
return value
stop_class = {"A-PCLI", "A-ADM1"}
def get_p47_adjacency_data(data):
p47se1 = []
for el in data.other.P47:
d = gazetteer.get_by_other_id(el, "wikidata")
if not d: continue
p47se1.append(d[0].id)
return p47se1
inc = InclusionRelation()
if inc.is_relation(id_se1, id_se2) or inc.is_relation(id_se2, id_se1):
self.db_rel_match.add_adjacency(id_se1, id_se2,False)
return False
data_se1, data_se2 = self.get_data(id_se1), self.get_data(id_se2)
if "P47" in data_se2.other and id_se1 in get_p47_adjacency_data(data_se2):
self.db_rel_match.add_adjacency(id_se1, id_se2,False)
return True
elif "P47" in data_se1.other and id_se2 in get_p47_adjacency_data(data_se1):
self.db_rel_match.add_adjacency(id_se1, id_se2,True)
return True
if collide(id_se1, id_se2):
self.db_rel_match.add_adjacency(id_se1, id_se2,True)
return True
if data_se1 and data_se2 and "coord" in data_se1 and "coord" in data_se2:
if Point(data_se1.coord.lon, data_se1.coord.lat).distance(
Point(data_se2.coord.lon, data_se2.coord.lat)) < 1 and len(
set(data_se1.class_) & stop_class) < 1 and len(set(data_se2.class_) & stop_class) < 1:
self.db_rel_match.add_adjacency(id_se1, id_se2,True)
return True
self.db_rel_match.add_adjacency(id_se1, id_se2,False)
return False
class InclusionRelation(RelationExtractor):
def __init__(self):
RelationExtractor.__init__(self)
def is_relation(self, id_se1: str, id_se2: str):
found_, value = self.db_rel_match.get_inclusion(id_se1, id_se2)
if found_:
return value
inc_chain_P131, inc_chain_P706 = self.get_inclusion_chain(id_se1, "P131"), self.get_inclusion_chain(id_se1,"P706")
inc_chain = inc_chain_P131
inc_chain.extend(inc_chain_P706)
inc_chain = set(inc_chain)
if id_se2 in inc_chain:
self.db_rel_match.add_inclusion(id_se1, id_se2, True)
return True
self.db_rel_match.add_inclusion(id_se1, id_se2, False)
return False
def get_inclusion_chain(self, id_, prop):
"""
For an entity return it geographical inclusion tree using a property.
"""
arr__ = []
current_entity = gazetteer.get_by_id(id_)[0]
if "inc_" + prop in current_entity.other:
arr__ = current_entity.other["inc_" + prop]
elif "inc_geoname" in current_entity.other:
arr__ = current_entity.other.inc_geoname
if isinstance(arr__, str):
arr__ = [arr__]
return arr__
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment