"""
@brief: Download metric from Elasticsearch to easily compute TF-IDF matric
@author: R.Decoupes
@copyright: CeCILL-B

Download metric from Elasticsearch to easily compute TF-IDF matric
"""
import hashlib

from elasticsearch import Elasticsearch
from jinja2 import FileSystemLoader, Environment
import os
import eland as ed
import requests
import pandas as pd
from tqdm import tqdm
import json
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import pipeline

def elastic_pagination_scrolling(result, headers, es_url):
    """
    Elasticsearch limits query results to 10,000 hits. To get around this limit, we need to paginate through
    results with the scroll API. This method appends all pages from the scroll search.
    :param result: the first page of results of an Elasticsearch query (with a scroll context open)
    :param headers: HTTP headers to use for the scroll requests
    :param es_url: base URL of the Elasticsearch cluster
    :return: the full list of hits
    """
    scroll_size_total = result['hits']['total']["value"]
    # Progress bar
    pbar = tqdm(total=scroll_size_total)
    results = []
    results += result['hits']['hits']  # add the first page of results before scrolling
    scroll_size = len(results)
    pbar.update(scroll_size)
    while scroll_size > 0:
        try:
            scroll_id = result['_scroll_id']
            # res = client.scroll(scroll_id=scroll_id, scroll='60s')
            query = {
                "scroll": "1m",
                "scroll_id": scroll_id
            }
            query_json = json.dumps(query)
            res = requests.get(es_url + "_search/scroll",
                               data=query_json,
                               headers=headers,
                               ).json()
            results += res['hits']['hits']
            scroll_size = len(res['hits']['hits'])
            pbar.update(scroll_size)
            result = res  # the scroll id can change between pages, so keep the latest response
        except Exception:
            pbar.close()
            break
    pbar.close()
    return results
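# For reference, each scroll response consumed above is assumed to follow the standard
# Elasticsearch 7.x shape (inferred from the fields accessed in this script):
#   {"_scroll_id": "...", "hits": {"total": {"value": N}, "hits": [{"fields": {...}}, ...]}}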

def elasticquery(es_url, index_es, query):
    """
    Query Elasticsearch with selected fields
    :param es_url: base URL of the Elasticsearch cluster
    :param index_es: name of the index to query
    :param query: JSON query body (as a string)
    :return: a dataframe with one column per requested field
    """
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search?scroll=1m&size=1000",  # context for scrolling up to 1 minute
                         data=query,
                         headers=headers,
                         )
    except Exception as e:
        print("Can not query: " + str(query))
        raise e
    results = elastic_pagination_scrolling(r.json(), headers, es_url)
    df_results = pd.DataFrame(results)
    """ Formating elasticsearch respons

    1. We extract nested fieds from elasticsearch: json_normalize the nested column
    2. From cell value, extract the first element. Indeed: When we select specific fields in a elastic query, 
    the response always send value as list (even if there is one value).
    """
    df_results = pd.json_normalize(df_results['fields'])
    # for every cell, retrieve the first value from the list; the cell can be NaN when the field is empty
    df_results = df_results.applymap(lambda x: x[0] if isinstance(x, list) else '')
    return df_results

def tf_idf(list_of_docs, lang='english', nb_top_score=1000):
    """
    Compute a TF-IDF matrix over the documents and return, for each term, its best-scoring document.
    :param list_of_docs: list of documents (strings)
    :param lang: language of the stop-word list used by TfidfVectorizer
    :param nb_top_score: maximum number of top terms to keep (currently not applied)
    :return: a dataframe with one row per term, sorted by TF-IDF score (descending)
    """
    try:
        vectorizer = TfidfVectorizer(
            stop_words=lang,
            max_features=250000,
            token_pattern="[A-Za-zÀ-ÿ0-9#@]+"
        )
        vectors = vectorizer.fit_transform(list_of_docs)
    except Exception as e:
        print("tf-idf failed")
        raise e
    listOfTerms = vectorizer.get_feature_names_out()
    countOfTerms = vectors.todense().tolist()
    df = pd.DataFrame(
        countOfTerms,
        columns=listOfTerms
    )
    # sort terms by TF-IDF score
    tf_idf_top_score = pd.DataFrame()
    tf_idf_top_score["index_of_tweet"] = df.idxmax() # keep a track from tweet ID
    tf_idf_top_score["tfidf_score"] = df.max().values # retrieve TF-IDF score for the term in document
    tf_idf_top_score = tf_idf_top_score.sort_values(by="tfidf_score", ascending=False) # sort values
    return tf_idf_top_score
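# A minimal usage sketch of tf_idf() (illustrative only, not executed by this script):
#
#   docs = ["bird flu outbreak reported", "flu vaccine campaign starts"]
#   top_terms = tf_idf(docs)
#   # "top_terms" has one row per vocabulary term; "index_of_tweet" points to the document in
#   # which the term reaches its highest TF-IDF score and "tfidf_score" holds that score.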

if __name__ == '__main__':
    # print working directory:
    print("working directory is: " + str(os.getcwd()))
    # Filter
    state = 'aquitaine'
    start_date = "2020-12-01T00:00:00.172Z"
    end_date = "2022-02-25T15:51:46.172Z"
    filters = {
        'state' : ["aquitaine", "aquitaine", "occitanie"],
        'start_date' : ["2020-12-01T00:00:00.172Z", "2021-04-01T00:00:00.172Z", "2021-01-01T00:00:00.172Z"],
        'end_date' : ["2021-12-31T23:00:00.172Z", "2022-02-25T15:51:46.172Z", "2021-07-31T23:00:00.172Z"]
    }
    df_filters = pd.DataFrame(filters)
    # connect to elastic
    """ Why not using eland ?
    # we could not filter eland df with rest.features.properties.state certainly because there are to much fields ?
    # Two solutions :
    #   - we are using eland with "ed_tweets["rest_user_osm.extent"] and intersect with polygones
    #   - we use elasticsearch package with Jinja2 template and normalization
    # We prefere to use elasticsearch
    """
    #ed_tweets = ed.DataFrame("http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200",
    #                         es_index_pattern="mood-tetis-tweets-collect")

    es_url = "http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200/"
    client_es = Elasticsearch(es_url)
    index_es = "mood-tetis-tweets-collect"
    # init jinja2 configuration
    template_dir = os.path.join(os.path.dirname(__file__), "eda_templates")
    jinja_env = Environment(loader=FileSystemLoader(template_dir))
    template = jinja_env.get_template("filter_by_state_and_date.j2")
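    # A hypothetical sketch of what "filter_by_state_and_date.j2" might contain, inferred from how
    # this script uses it (state/start_date/end_date variables, the state field mentioned above, and
    # a "fields" selection since the response is read through hit["fields"]); the actual template may differ:
    #
    #   {
    #     "query": {
    #       "bool": {
    #         "must": [
    #           {"match": {"rest.features.properties.state": "{{ state }}"}},
    #           {"range": {"@timestamp": {"gte": "{{ start_date }}", "lte": "{{ end_date }}"}}}
    #         ]
    #       }
    #     },
    #     "fields": ["text", "@timestamp", "user.id", "retweeted_status.user.id"],
    #     "_source": false
    #   }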
    for i, filter in df_filters.iterrows():
        query = template.render(
            state=filter["state"],
            start_date=filter["start_date"],
            end_date=filter["end_date"]
        )
        df_results = elasticquery(es_url, index_es, query)
        df_results["index_of_tweet"] = df_results.index
        df_tfidf = tf_idf(df_results["text"].tolist())
        df_tfidf["tf_idf_terms"] = df_tfidf.index
        df_tfidf = df_tfidf.merge(df_results, on="index_of_tweet")

        # Translation to English
        model_checkpoint_fr = "Helsinki-NLP/opus-mt-fr-en"
        translator_fr = pipeline("translation", model=model_checkpoint_fr)
        # zeroshot classification
        classifier = pipeline("zero-shot-classification",
                              model="digitalepidemiologylab/covid-twitter-bert-v2-mnli")
        candidate_labels_fr = ["grippe aviaire"]
        # candidate_labels_fr = ["covid-19", "grippe aviaire", "AMR", "tiques", "autres"]
        candidate_labels_en = ["avian influenza"]
        # candidate_labels_en = ["covid-19", "avian influenza", "AMR", "tick borne", "others"]
        classifier_results = []
        classifier_results_2 = []
        for i, tweets in tqdm(df_tfidf.iterrows(), total=df_tfidf.shape[0]):
            text = tweets["text"]
            try:
                text_translated = text
                # text_translated = translator_fr(text)[0]["translation_text"]
                scores = classifier(text_translated, candidate_labels_fr)["scores"]  # run the classifier once per tweet
                classifier_results.append(scores)
                item = {"text": text, "scores": scores}
                classifier_results_2.append(item)
            except Exception:
                df_tfidf.drop([i], inplace=True)
                print("text: " + text + " | translated: " + text_translated)
        classifier_df = pd.DataFrame(classifier_results, columns=candidate_labels_fr)
        try:
            f = open("analysis-output/test_2.txt", "w")
            for l in classifier_results_2:
                f.write(str(l))
            f.close()
        except Exception:
            print("can not save file with results from zeroshot")
        classifier_df_2 = pd.DataFrame(classifier_results_2)
        # classifier_df_2.to_csv("analysis-output/acquitaine_test.csv")
        df_tfidf = df_tfidf.join(classifier_df)
        df_tfidf.to_csv("analysis-output/" + filter["state"] + "_" + filter["start_date"] + "_digitalepidemiologylab.csv")
        # df_tfidf.to_pickle("analysis-output/acquitaine-digitalepidemiologylab.pkl")

        # Prepare data for Gephi graph visualization: a bipartite graph whose nodes are newspapers and TF-IDF terms
        news_paper_name = pd.read_csv("./../params/accountsFollowed.csv") # get account (followed by MOOD) names
        news_paper_name["retweeted_status.user.id"] = news_paper_name["twitterID"] # prepare for merge
        gephi = df_tfidf
        # gephi["Source"] = gephi["user.id"].apply(lambda x: hashlib.md5(str(x).encode()).hexdigest())  # pseudonimization
        gephi["Source"] = gephi.index # id du lien
        gephi["Target"] = gephi["retweeted_status.user.id"]
        gephi["Id"] = gephi.index
        gephi["Label"] = gephi["tf_idf_terms"]
        gephi["timeset"] = gephi["@timestamp"]
        gephi = gephi[gephi["Target"].str.len() !=0] # filter out tweet that are not retweeted
        gephi[["Id", "Label", "Source", "Target", "timeset"]].to_csv(
            "analysis-output/acquitaine_script_gephi_edge.csv",
            index=False
        )
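        # Note: Gephi's spreadsheet import expects an edge table with "Source" and "Target" columns
        # and a node table with "Id" and "Label" columns, which is the layout written here and below.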
        # gephi.to_csv("analysis-output/gephi-debug.csv")
        # Node: newspapers (accounts followed by MOOD)
        gephi_node = pd.DataFrame(gephi["retweeted_status.user.id"].unique(), columns=["retweeted_status.user.id"])
        gephi_node["Label"] = gephi_node.merge(news_paper_name, on="retweeted_status.user.id")["account"]
        gephi_node["Id"] = gephi_node["retweeted_status.user.id"]
        # Node: TF-IDF terms
        gephi_node_sub = pd.DataFrame(gephi["tf_idf_terms"].unique(), columns=["tf_idf_terms"])
        gephi_node_sub["Id"] = gephi_node_sub.merge(gephi, on="tf_idf_terms")["Id"]
        gephi_node_sub = gephi_node_sub.rename(columns={"tf_idf_terms": "Label"})
        gephi_node = pd.concat([gephi_node, gephi_node_sub])  # DataFrame.append is deprecated in recent pandas
        gephi_node.to_csv(
            "analysis-output/acquitaine_script_gephi_node.csv",
            index=False
        )