An error occurred while loading the file. Please try again.
-
Decoupes Remy authored2532e56a
Forked from an inaccessible project.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""
@brief: Explore Data Analysis of tweets indexed in ElasticSearch
@author: R.Decoupes
@copyright: CeCILL-B
Explore Data Analysis of tweets indexed in ElasticSearch
"""
import logging
from logging.handlers import RotatingFileHandler
from elasticsearch import Elasticsearch
# from elasticsearch import logger as es_logger
from jinja2 import FileSystemLoader, Environment
import os
import requests
import pandas as pd
import plotly.express as px
import plotly.io as pio
from plotly.subplots import make_subplots
from collections import defaultdict
def logsetup(log_path='/home/rdecoupe/PycharmProjects/mood-tweets-collect/elasticsearch/log/eda/eda.log'):
    """
    Configure the root logger and return it.

    Two handlers are attached to the root logger:
        - a rotating file handler writing to ``log_path`` (DEBUG and above, timestamped format,
          rotated at 1 000 000 bytes with 1 backup file)
        - a stream handler printing on screen (INFO and above, default message-only format)

    Note: every call adds a new pair of handlers, so call it once per process.

    :param log_path: path of the log file. Its parent directory is created when it does not exist.
        Defaults to the historical location of eda.log.
    :return: logger object (the root logger)
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')

    # RotatingFileHandler opens the file immediately: make sure its directory exists first
    log_dir = os.path.dirname(log_path)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    file_handler = RotatingFileHandler(log_path, mode='a', maxBytes=1000000, backupCount=1)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    stream_handler = logging.StreamHandler()
    # Only display on screen INFO
    stream_handler.setLevel(logging.INFO)
    logger.addHandler(stream_handler)
    return logger
def count_rt(jinja_env, es_url, index_es):
    """
    Count the number of RT of the whole corpus

    Renders the "count_rt.json.j2" template with field="retweeted_status.id" and sends the
    resulting query to the "_search" endpoint of the index.

    :param jinja_env: jinja2 environment used to load the query templates
    :param es_url: URL of Elastic Search (concatenated as-is with the index name)
    :param index_es: Index of elastic
    :return: "doc_count" of the "0-bucket" aggregation (global number of RT),
        or -1 when the HTTP request could not be performed
    """
    template = jinja_env.get_template("count_rt.json.j2")
    query = template.render(field="retweeted_status.id")
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except requests.exceptions.RequestException:
        # Use the root logger (the one configured by logsetup()) rather than a global
        # `logger` that only exists when the file is run as a script.
        # exc_info=True keeps the traceback of the failure in the log.
        logging.getLogger().error("Count_RT: doesn't work", exc_info=True)
        return -1
    return r.json()['aggregations']['0-bucket']['doc_count']
def count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease):
    """
    for each keyword: get the nb of tweets containing this keyword

    :param jinja_env: jinja2 environment used to load the query templates
    :param es_url: URL of Elastic Search (concatenated as-is with the index name)
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease
    :return: a dataframe indexed by keyword, holding the aggregation values returned by
        Elasticsearch for this keyword plus a "disease" column,
        or -1 when the HTTP request could not be performed
    """
    log = logging.getLogger()  # root logger: works even when this module is imported
    template = jinja_env.get_template("count_by_disease.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except requests.exceptions.RequestException:
        # exc_info=True appends the traceback announced by the message
        log.error("Count tweets by disease keywords: doesn't work. See the full error: ", exc_info=True)
        return -1
    df_results = pd.DataFrame.from_dict(r.json()["aggregations"]["0"]["buckets"])
    # clean up label from elasticsearch (because it contains "text : [kw]")
    df_results = df_results.rename(columns=lambda label: label.split(' : ')[1])
    # transpose the dataframe: one row per keyword
    df_results = df_results.T
    # add the disease name
    df_results["disease"] = disease
    log.debug(df_results)
    return df_results
def time_series_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease):
    """
    Return dataframe :
        - index : timestamp : date + time
        - Columns : Multi-indexing (or hierarchical indexing): 2 levels: [1] disease, [2] keyword :
            disease                       Respiratory  Unknown  ...  SARS-CoV-2
            keyword                       lungdisease NewVirus  ...  SARS-CoV-2 SARS-CoV-2
            2021-05-03T00:00:00.000+02:00        54.0      NaN  ...        3225       3225
            2021-06-02T00:00:00.000+02:00       127.0      NaN  ...        1979       1979
    To do so, we have to work on the formatting of Elasticsearch result

    :param jinja_env: jinja2 environment used to load the query templates
    :param es_url: URL of Elastic Search (concatenated as-is with the index name)
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease
    :return: one of
        - a dataframe with one (disease, keyword) column per keyword having at least one tweet
        - the string "empty_series" when none of the keywords has a time serie
        - -1 when the HTTP request could not be performed
    """
    log = logging.getLogger()  # root logger: works even when this module is imported
    template = jinja_env.get_template("disease_time_series.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except requests.exceptions.RequestException:
        # exc_info=True appends the traceback announced by the message
        log.error("Time series: doesn't work. See the full error: ", exc_info=True)
        return -1
    df_results = pd.DataFrame.from_dict(r.json()["aggregations"]["0"]["buckets"])
    # clean up label from elasticsearch (because it contains "text : [kw]")
    df_results = df_results.rename(columns=lambda label: label.split(' : ')[1])
    # transpose the dataframe: one row per keyword
    df_results = df_results.T
    # add the disease name
    df_results["disease"] = disease
    # reformat time serie column: keep only the list of buckets
    df_results["time_serie"] = df_results.apply(lambda x: x["time_serie"]["buckets"], axis=1)

    # build a brand new dataframe with
    #   index = date
    #   column = (disease, keyword)
    keyword_frames = []
    for keyword in df_results.index:
        df_kw = df_results.loc[keyword]
        if df_kw['doc_count'] == 0:  # avoid empty time serie
            continue
        kw_time_serie = [[element['key_as_string'], element['doc_count']] for element in df_kw['time_serie']]
        df_kw_timeserie = pd.DataFrame(kw_time_serie, columns=['timestamp', disease + ':' + keyword])
        df_kw_timeserie.set_index('timestamp', inplace=True)
        # hierarchical indexing for column
        df_kw_timeserie.columns = pd.MultiIndex.from_product([[disease], [keyword]], names=["disease", "keyword"])
        keyword_frames.append(df_kw_timeserie)

    if not keyword_frames:  # none of keywords have a time serie
        return "empty_series"
    # merge all keyword columns at once, aligned on the timestamp index
    df_all_kw_timeserie = pd.concat(keyword_frames, axis=1)
    log.debug(df_all_kw_timeserie)
    return df_all_kw_timeserie
def get_tweet_content_by_disease(jinja_env, es_url, index_es, list_of_keywords, disease, nb_of_estimated_results=10000,
                                 pickle_path="/home/rdecoupe/Téléchargements/test/get_tweet_content_by_disease.pkl"):
    """
    Retrieve the timestamp and text of the tweets matching a list of keywords

    Renders the "get_tweets_content_by_keywords.json.j2" template and sends the query to the
    "_search" endpoint of the index. For each hit, the text is taken from
    "extended_tweet.full_text" when this field is present, otherwise from "text".

    :param jinja_env: jinja2 environment used to load the query templates
    :param es_url: URL of Elastic Search (concatenated as-is with the index name)
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease (currently not used by the query)
    :param nb_of_estimated_results: value of the "size" parameter of the search request
    :param pickle_path: file in which the resulting dataframe is pickled; None disables the dump.
        Defaults to the historical hard-coded location.
    :return: a dataframe with the columns "timestamp" and "text",
        or -1 when the HTTP request could not be performed
    """
    template = jinja_env.get_template("get_tweets_content_by_keywords.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search?size=" + str(nb_of_estimated_results), data=query,
                         headers=headers)
    except requests.exceptions.RequestException:
        # Root logger: works even when this module is imported.
        # exc_info=True appends the traceback announced by the message.
        logging.getLogger().error("Get tweet content by disease: doesn't work. See the full error: ", exc_info=True)
        return -1

    list_of_tweets = []
    for hit in r.json()["hits"]["hits"]:
        fields = hit["fields"]
        # test if we have more than 140 characters, ie, extended_tweet
        full_text = fields.get("extended_tweet.full_text")
        if full_text:
            text = full_text[0]
        else:  # else we take only the 140 characters
            text = fields["text"][0]
        list_of_tweets.append({
            "timestamp": fields["@timestamp"][0],
            "text": text
        })
    df_results = pd.DataFrame(list_of_tweets)
    if pickle_path is not None:
        df_results.to_pickle(pickle_path)
    return df_results
def _iter_disease_keywords(params_kw):
    """Yield (disease name, de-duplicated keyword list) for each syndrome having at least one keyword."""
    log = logging.getLogger()
    for disease in params_kw['syndrome'].unique():
        list_of_keywords = params_kw[params_kw['syndrome'] == disease]["hashtags"].tolist()
        # remove duplicate in keywords:
        list_of_keywords = list(set(list_of_keywords))
        log.debug("\t" + str(disease) + ": list of keywords: " + str(list_of_keywords))
        if list_of_keywords:
            yield str(disease), list_of_keywords


if __name__ == '__main__':
    logger = logsetup()
    logger.info("EDA start")
    path_figs_dir = os.path.join(os.path.dirname(__file__), "figs")
    # pio.write_image does not create the output directory
    os.makedirs(path_figs_dir, exist_ok=True)

    # Init Elasticsearch configurations
    es_url = "http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200/"
    # NOTE(review): client_es is not used below, every query goes through requests
    client_es = Elasticsearch(es_url)
    index_es = "mood-tetis-tweets-collect"

    # init jinja2 configuration
    template_dir = os.path.join(os.path.dirname(__file__), "eda_templates")
    jinja_env = Environment(loader=FileSystemLoader(template_dir))

    # keywords parameters: read once, shared by every section below
    path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv"
    params_kw = pd.read_csv(path_param_keywords)
    # end of init

    # ---- Count RT and no RT (disabled) ----
    # nb_rt_query = count_rt(jinja_env, es_url, index_es)
    # print(nb_rt_query)

    # ---- Count tweets by disease ----
    logger.info("Count tweets by disease: browse syndrome")
    count_frames = []
    for disease, list_of_keywords in _iter_disease_keywords(params_kw):
        df = count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease)
        if isinstance(df, pd.DataFrame):  # -1 is returned when the query failed
            count_frames.append(df)
        else:
            logger.warning("Count tweets by disease: no result for " + disease)
    # DataFrame.append was removed in pandas 2.0: stack all the results with a single concat
    df_kw_by_disease = pd.concat(count_frames)
    df_kw_by_disease_without_covid = df_kw_by_disease[df_kw_by_disease["disease"] != "SARS-CoV-2 "]

    # sunburst then treemap with plotly express, each one with and without covid
    for chart_name, build_chart in (("sunburst", px.sunburst), ("treemap", px.treemap)):
        for suffix, df_counts in (("all", df_kw_by_disease), ("without_covid", df_kw_by_disease_without_covid)):
            fig = build_chart(df_counts, path=['disease', df_counts.index], values='doc_count', color='disease')
            pio.write_image(
                fig,
                path_figs_dir + "/count_tweets_by_disease_keywords_" + chart_name + "_" + suffix + ".png",
                format='png')
            # If an HTML display is needed:
            # fig.show()

    # ---- Time series of keywords ----
    logger.info("time series by disease")
    timeseries_frames = []
    for disease, list_of_keywords in _iter_disease_keywords(params_kw):
        df = time_series_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease)
        # Diseases with an empty time serie for each keyword ("empty_series") or a failed
        # query (-1) are not plotted
        if isinstance(df, pd.DataFrame):
            timeseries_frames.append(df)
    if timeseries_frames:
        df_kw_by_disease = pd.concat(timeseries_frames, axis=1)
        logger.debug(df_kw_by_disease.keys())
        # make subplot: one row per disease, in order of first appearance in the columns
        list_disease_with_timeserie = list(dict.fromkeys(d[0] for d in df_kw_by_disease.keys()))
        subplots_timeseries_fig = make_subplots(rows=len(list_disease_with_timeserie), cols=1)
        for current_row, disease in enumerate(list_disease_with_timeserie, start=1):
            timeserie_fig = px.bar(df_kw_by_disease[disease], facet_col_wrap=2)
            # timeserie_fig.show()
            # Subplots are not implemented for plotly express figures:
            # instead we have to add each trace of the plotly express figure to the subplot
            for trace in timeserie_fig["data"]:
                subplots_timeseries_fig.add_trace(trace, row=current_row, col=1)
        # subplots_timeseries_fig.show()
    else:
        logger.warning("time series by disease: no time serie to plot")

    # ---- Clustering tweets for a disease name ----
    logger.info("Clustering tweets for a disease name")
    # For Avian Influenza:
    disease = "Avian influenza"
    list_of_keywords = ['Fowl', 'Bird', 'Avian', 'HPAI', 'FowlPlague', 'AvianInfluenza', 'avianInfluenza',
                        'Avianflu', 'bird', 'BirdFlu']
    corpus_tweets = get_tweet_content_by_disease(jinja_env, es_url, index_es, list_of_keywords, disease)
    logger.info("EDA stop")