# eda_es.py
"""
@brief: Explore Data Analysis of tweets indexed in ElasticSearch
@author: R.Decoupes
@copyright: CeCILL-B

Explore Data Analysis of tweets indexed in ElasticSearch
"""
import logging
from logging.handlers import RotatingFileHandler
from elasticsearch import Elasticsearch
# from elasticsearch import logger as es_logger
from jinja2 import FileSystemLoader, Environment
import os
import requests
import pandas as pd
import plotly.express as px
import plotly.io as pio
from plotly.subplots import make_subplots
from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.manifold import TSNE

def logsetup():
    """
    Initiate a logger object :
        - Log in file : collectweets.log
        - also print on screen
    :return: logger object
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
    file_handler = RotatingFileHandler(
        '/home/rdecoupe/PycharmProjects/mood-tweets-collect/elasticsearch/log/eda/eda.log', 'a', 1000000, 1)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    stream_handler = logging.StreamHandler()
    # only display INFO level (and above) on screen
    stream_handler.setLevel(logging.INFO)
    logger.addHandler(stream_handler)
    return logger


def count_rt(jinja_env, es_url, index_es):
    """
    Count the number of RT of the whole corpus

    :param jinja_env: Dir to jinja templates
    :param es_url: URL of Elastic Search
    :param index_es: Index of elastic
    :return: float: global number of RT
    """
    template = jinja_env.get_template("count_rt.json.j2")
    query = template.render(field="retweeted_status.id")
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except Exception as e:
        logger.error("Count_RT: doesn't work")
        return -1
    nb_RT = r.json()['aggregations']['0-bucket']['doc_count']
    return nb_RT


def count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease):
    """
    for each keyword: get the nb of tweets containing this keyword

    :param jinja_env: Dir to jinja templates
    :param es_url: URL of Elastic Search
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease
    :return: a dataframe: for each keyword: get the nb of tweets containing this keyword
    """
    template = jinja_env.get_template("count_by_disease.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except Exception as e:
        logger.error("Count_RT: doesn't work. See the full error: ")
        return -1
    df_results = pd.DataFrame.from_dict(r.json()["aggregations"]["0"]["buckets"])
    # clean up labels from elasticsearch (they have the form "text : <kw>")
    df_results.rename(columns=lambda x: x.split(' : ')[1], inplace=True)
    # transpose the dataframe
    df_results = df_results.T
    # add the disease name
    df_results["disease"] = disease
    logger.debug(df_results)
    return df_results

def time_series_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease):
    """
    Return a dataframe:
        - index: timestamp (date + time)
        - columns: multi-indexed (hierarchical indexing), 2 levels: [1] disease, [2] keyword:
        disease                       Respiratory  Unknown  ... SARS-CoV-2
        keyword                       lungdisease NewVirus  ...  SARS-CoV-2 SARS-CoV-2
        2021-05-03T00:00:00.000+02:00        54.0      NaN  ...        3225        3225
        2021-06-02T00:00:00.000+02:00       127.0      NaN  ...        1979        1979

    To do so, we have to rework the formatting of the Elasticsearch result

    :param jinja_env: Dir to jinja templates
    :param es_url: URL of Elastic Search
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease
    :return: a dataframe of time series, one column per (disease, keyword) pair
    """
    template = jinja_env.get_template("disease_time_series.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except Exception as e:
        logger.error("Time series: doesn't work. See the full error: ")
        return -1
    df_results = pd.DataFrame.from_dict(r.json()["aggregations"]["0"]["buckets"])
    # clean up labels from elasticsearch (they have the form "text : <kw>")
    df_results.rename(columns=lambda x: x.split(' : ')[1], inplace=True)
    # transpose the dataframe
    df_results = df_results.T
    # add the disease name
    df_results["disease"] = disease
    # reformat the time series column
    df_results["time_serie"] = df_results.apply(lambda x: x["time_serie"]["buckets"], axis=1)

    """
    build a brand new dataframe with
        index = date
        column = disease:keywords
    """

    df_all_kw_timeserie_empty = True
    for keyword in df_results.index:
        kw_time_serie = []
        df_kw = df_results.loc[keyword]
        if df_kw['doc_count'] != 0:  # skip empty time series
            for element in df_kw['time_serie']:
                timestamp = element['key_as_string']
                value = element['doc_count']
                kw_time_serie.append([timestamp, value])
            df_kw_timeserie = pd.DataFrame(kw_time_serie, columns=['timestamp', disease+':'+keyword])
            df_kw_timeserie.set_index('timestamp', inplace=True)
            # hierarchical indexing for columns
            df_kw_timeserie.columns = pd.MultiIndex.from_product([[disease], [keyword]], names=["disease", "keyword"])
            if df_all_kw_timeserie_empty:
                df_all_kw_timeserie = df_kw_timeserie
                df_all_kw_timeserie_empty = False
            else:  # merge columns
                # df_all_kw_timeserie[disease+':'+keyword] = df_kw_timeserie[disease+':'+keyword]
                df_all_kw_timeserie = pd.concat([df_all_kw_timeserie, df_kw_timeserie], axis=1)
    if df_all_kw_timeserie_empty:  # none of the keywords has a time series
        return "empty_series"
    else:
        logger.debug(df_all_kw_timeserie)
        return df_all_kw_timeserie
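
# Minimal sketch of the hierarchical column indexing built by
# time_series_by_disease_keywords, with made-up values (illustration only):
#   >>> import pandas as pd
#   >>> df = pd.DataFrame({"c": [3225, 1979]}, index=["2021-05-03", "2021-06-02"])
#   >>> df.columns = pd.MultiIndex.from_product(
#   ...     [["SARS-CoV-2"], ["SARS-CoV-2"]], names=["disease", "keyword"])
#   >>> df["SARS-CoV-2"]  # selects every keyword column of that disease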

def get_tweet_content_by_disease(jinja_env, es_url, index_es, list_of_keywords, disease, nb_of_estimated_results=10000):
    """
    Retrieves all tweets for a specific disease thanks its keywords
    :param jinja_env:
    :param es_url:
    :param index_es:
    :param list_of_keywords:
    :param disease:
    :param nb_of_estimated_results:
    :return:
    """
    template = jinja_env.get_template("get_tweets_content_by_keywords.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search?size=" + str(nb_of_estimated_results), data=query, headers=headers)
    except Exception as e:
        logger.error("Time series: doesn't work. See the full error: ")
        return -1
    list_of_tweets = []
    for hit in r.json()["hits"]["hits"]:
        try:  # test if we have more than 140 characters, i.e. an extended tweet
            text = hit["fields"]["extended_tweet.full_text"][0]
        except (KeyError, IndexError):  # else we take only the first 140 characters
            text = hit["fields"]["text"][0]
        tweet = {
            "timestamp": hit["fields"]["@timestamp"][0],
            "text": text
        }
        list_of_tweets.append(tweet)
    df_results = pd.DataFrame(list_of_tweets)
    # df_results.to_pickle("/home/rdecoupe/Téléchargements/test/get_tweet_content_by_disease.pkl")
    return df_results
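
# Note: Elasticsearch rejects a search "size" above index.max_result_window
# (10,000 by default), which is why nb_of_estimated_results defaults to 10000;
# retrieving a larger corpus would need the scroll or search_after API.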

def visualize_sentence_through_embedding(corpus):
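    """
    Embed each sentence of the corpus with Sentence-BERT, reduce the
    embeddings to 2D with t-SNE and display them in a plotly scatter plot

    :param corpus: list of sentences (here: tweet texts)
    """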
    # How to choose the model: An overview on huggingface
    # https://www.sbert.net/docs/pretrained_models.html#sentence-embedding-models
    multi_lingual_model = "distiluse-base-multilingual-cased-v1"
    best_quality_model = "all-mpnet-base-v2"
    faster_model = "all-MiniLM-L6-v2" # used by UKPLab : https://github.com/UKPLab/sentence-transformers/blob/a94030226da1f4b03f9d703596b0ebd360c9ef43/examples/applications/clustering/agglomerative.py#L33
    embedder = SentenceTransformer(faster_model)
    # Encode !
    corpus_embeddings = embedder.encode(corpus)
    # Normalize the embeddings to unit length
    corpus_embeddings = corpus_embeddings / np.linalg.norm(corpus_embeddings, axis=1, keepdims=True)
    # Dimension reduction with t-SNE
    tsne_model = TSNE(perplexity=40, n_components=2, init='pca', n_iter=2500, random_state=23)
    corpus_embeddings_tsne = tsne_model.fit_transform(corpus_embeddings)
    corpus_embeddings_tsne_df = pd.DataFrame(corpus_embeddings_tsne)
    corpus_embeddings_tsne_df["label"] = corpus
    # plot with plotly express
    fig = px.scatter(
        corpus_embeddings_tsne_df, x=0, y=1,
        hover_data=["label"]
    )
    fig.show()
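
# Minimal usage sketch (hypothetical sentences, for illustration only):
#   visualize_sentence_through_embedding([
#       "Bird flu detected in a poultry farm",
#       "New avian influenza cases reported",
#       ...
#   ])
# NB: recent scikit-learn versions require perplexity (40 here) to be smaller
# than the number of samples, so the corpus needs more than 40 sentences.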


if __name__ == '__main__':
    logger = logsetup()
    logger.info("EDA start")
    path_figs_dir = os.path.join(os.path.dirname(__file__), "figs")
    # Init Elasticsearch configurations
    es_url = "http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200/"
    client_es = Elasticsearch(es_url)
    index_es = "mood-tetis-tweets-collect"
    # init jinja2 configuration
    template_dir = os.path.join(os.path.dirname(__file__), "eda_templates")
    jinja_env = Environment(loader=FileSystemLoader(template_dir))
    # end of init

    """
    Count RT and non-RT
    """
    # nb_rt_query = count_rt(jinja_env, es_url, index_es)
    # print(nb_rt_query)

    """
    Count tweets by disease
    """
    path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv"
    params_kw = pd.read_csv(path_param_keywords)
    logger.info("Count tweets by disease: browse syndrome")
    df_kw_by_disease = None
    for disease in params_kw['syndrome'].unique():
        list_of_keywords = params_kw[params_kw['syndrome'] == disease]["hashtags"].tolist()
        # remove duplicates in keywords:
        list_of_keywords = list(set(list_of_keywords))
        logger.debug("\t" + str(disease) + ": list of keywords: " + str(list_of_keywords))
        if list_of_keywords:
            df = count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, str(disease))
            if df_kw_by_disease is None:
                df_kw_by_disease = df
            else:
                # DataFrame.append was removed in pandas 2.x; concat is equivalent here
                df_kw_by_disease = pd.concat([df_kw_by_disease, df])
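    # NB: the trailing space in "SARS-CoV-2 " is intentional: the filter must
    # match the syndrome label exactly as it appears in keywordsFilter.csv
    # (assumption: that CSV value carries a trailing space)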
    df_kw_by_disease_without_covid = df_kw_by_disease[df_kw_by_disease["disease"] != "SARS-CoV-2 "]
    # sunburst with plotly express
    pie_fig = px.sunburst(df_kw_by_disease,
                          path=['disease', df_kw_by_disease.index], values='doc_count', color='disease')
    pio.write_image(pie_fig, path_figs_dir + "/count_tweets_by_disease_keywords_sunburst_all.png", format='png')
    pie_fig = px.sunburst(df_kw_by_disease_without_covid,
                          path=['disease', df_kw_by_disease_without_covid.index], values='doc_count', color='disease')
    pio.write_image(pie_fig, path_figs_dir + "/count_tweets_by_disease_keywords_sunburst_without_covid.png",
                    format='png')
    # treemap
    treemap_fig3 = px.treemap(df_kw_by_disease,
                              path=['disease', df_kw_by_disease.index], values='doc_count', color='disease')
    pio.write_image(treemap_fig3, path_figs_dir + "/count_tweets_by_disease_keywords_treemap_all.png", format='png')
    treemap_without_covid_fig3 = px.treemap(df_kw_by_disease_without_covid,
                                            path=['disease', df_kw_by_disease_without_covid.index], values='doc_count',
                                            color='disease')
    pio.write_image(treemap_without_covid_fig3,
                    path_figs_dir + "/count_tweets_by_disease_keywords_treemap_without_covid.png", format='png')
    # If an HTML display is needed:
    # treemap_without_covid_fig3.show()

    """ 
    Time series of keywords (except covid)
    """
    path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv"
    params_kw = pd.read_csv(path_param_keywords)
    logger.info("time series by disease")
    df_kw_by_disease = None
    for disease in params_kw['syndrome'].unique():
        list_of_keywords = params_kw[params_kw['syndrome'] == disease]["hashtags"].tolist()
        # remove duplicates in keywords:
        list_of_keywords = list(set(list_of_keywords))
        logger.debug("\t" + str(disease) + ": list of keywords: " + str(list_of_keywords))
        if list_of_keywords:
            df = time_series_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, str(disease))
            if isinstance(df, str):  # every keyword of this disease has an empty time series: don't plot it
                continue
            if df_kw_by_disease is None:
                df_kw_by_disease = df
            else:
                df_kw_by_disease = pd.concat([df_kw_by_disease, df], axis=1)
    # df_kw_by_disease_without_covid = df_kw_by_disease[df_kw_by_disease["disease"] != "SARS-CoV-2 "]
    logger.debug(df_kw_by_disease.keys())
    # make subplots
    list_disease_with_timeserie = {d[0] for d in df_kw_by_disease.keys()}
    nb_of_subplots = len(list_disease_with_timeserie)
    subplots_timeseries_fig = make_subplots(rows=nb_of_subplots, cols=1)
    current_row = 0
    for disease in list_disease_with_timeserie:
        timeserie_fig = px.bar(df_kw_by_disease[disease], facet_col_wrap=2)
        current_row = current_row + 1
        # timeserie_fig.show()
        """
        Make a subplot with plotly express: it has net been implemented yet !
        Instead we have to convert our plotly express object into trace 
        """
        for trace in range(len(timeserie_fig["data"])):
            subplots_timeseries_fig.add_trace(timeserie_fig["data"][trace], row=current_row, col=1)
    # subplots_timeseries_fig.show()

    """
    Clustering tweets for a disease name
    """
    path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv"
    params_kw = pd.read_csv(path_param_keywords)
    logger.info("time series by disease")
    # For Avian Influenza:
    disease = "Avian influenza"
    list_of_keywords = ['Fowl', 'Bird', 'Avian', 'HPAI', 'FowlPlague', 'AvianInfluenza', 'avianInfluenza',
                        'Avianflu', 'bird', 'BirdFlu']
    corpus_tweets = get_tweet_content_by_disease(jinja_env, es_url, index_es, list_of_keywords, disease)
    corpus_tweets_list = corpus_tweets.text.values.tolist()
    corpus_tweets_list = list(set(corpus_tweets_list))  # remove duplicate tweets (mostly RTs)
    visualize_sentence_through_embedding(corpus_tweets_list)

    logger.info("EDA stop")