"""
@brief: Exploratory Data Analysis of tweets indexed in ElasticSearch
@author: R.Decoupes
@copyright: CeCILL-B

Exploratory Data Analysis of tweets indexed in ElasticSearch
"""
import logging
from logging.handlers import RotatingFileHandler
from elasticsearch import Elasticsearch
# from elasticsearch import logger as es_logger
from jinja2 import FileSystemLoader, Environment
import os
import requests
import pandas as pd
import plotly.express as px
import plotly.io as pio

# Root logger. logsetup() configures and returns this very same object, so the
# query functions below can log even when this module is imported rather than
# executed as a script (where no __main__-level ``logger`` would exist).
logger = logging.getLogger()

# Default location of the rotating log file written by logsetup().
DEFAULT_LOG_FILE = '/home/rdecoupe/PycharmProjects/mood-tweets-collect/elasticsearch/log/eda/eda.log'


def logsetup(log_file=DEFAULT_LOG_FILE):
    """
    Initiate a logger object :
        - Log in file : eda.log by default (rotating file, 1 backup, DEBUG level)
        - also print on screen (INFO level and above)
    :param log_file: path of the log file; defaults to DEFAULT_LOG_FILE
    :return: logger object (the root logger)
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
    # Positional arguments: mode='a', maxBytes=1000000, backupCount=1
    file_handler = RotatingFileHandler(log_file, 'a', 1000000, 1)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    root_logger.addHandler(file_handler)
    stream_handler = logging.StreamHandler()
    # Only display on screen INFO
    stream_handler.setLevel(logging.INFO)
    root_logger.addHandler(stream_handler)
    return root_logger


def _es_search(es_url, index_es, query):
    """
    Send a search body to the ``_search`` endpoint of an index and return the decoded JSON.
    :param es_url: URL of Elastic Search (expected to end with "/")
    :param index_es: Index of elastic
    :param query: request body (JSON string)
    :return: dict: decoded JSON response
    :raises requests.RequestException: connection failure or HTTP error status
    :raises ValueError: the response body is not valid JSON
    """
    headers = {'content-type': 'application/json'}
    response = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    # Turn 4xx/5xx answers into an exception here instead of failing later
    # with an unrelated KeyError on a missing 'aggregations' key.
    response.raise_for_status()
    return response.json()


def count_rt(jinja_env, es_url, index_es):
    """
    Count the number of RT of the whole corpus
    :param jinja_env: jinja2 Environment used to load the query templates
    :param es_url: URL of Elastic Search
    :param index_es: Index of elastic
    :return: doc_count of the '0-bucket' aggregation (global number of RT), or -1 if the request failed
    """
    template = jinja_env.get_template("count_rt.json.j2")
    query = template.render(field="retweeted_status.id")
    try:
        result = _es_search(es_url, index_es, query)
    except (requests.RequestException, ValueError):
        # logger.exception records the traceback along with the message
        logger.exception("Count_RT: doesn't work")
        return -1
    return result['aggregations']['0-bucket']['doc_count']


def count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease):
    """
    for each keyword: get the nb of tweets containing this keyword
    :param jinja_env: jinja2 Environment used to load the query templates
    :param es_url: URL of Elastic Search
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease
    :return: a dataframe indexed by keyword, with the aggregation values (e.g. doc_count)
        and a "disease" column; or -1 if the request failed
    """
    template = jinja_env.get_template("count_by_disease.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    try:
        result = _es_search(es_url, index_es, query)
    except (requests.RequestException, ValueError):
        logger.exception("Count tweets by disease keywords: doesn't work for disease %s", disease)
        return -1
    df_results = pd.DataFrame.from_dict(result["aggregations"]["0"]["buckets"])
    # clean up label from elasticsearch (because it contains "text: [kw]")
    df_results.rename(columns=lambda x: x.split(' : ')[1], inplace=True)
    # transpose the dataframe: one row per keyword
    df_results = df_results.T
    # add the disease name
    df_results["disease"] = disease
    logger.debug(df_results)
    return df_results


def main():
    """
    Run the EDA: count tweets per disease keyword and export sunburst / treemap figures as PNG.
    """
    logsetup()
    logger.info("EDA start")
    path_figs_dir = os.path.join(os.path.dirname(__file__), "figs")
    # write_image does not create missing directories
    os.makedirs(path_figs_dir, exist_ok=True)

    # Init Elasticsearch configurations
    es_url = "http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200/"
    # NOTE: this client is not used below; queries go through `requests`.
    client_es = Elasticsearch(es_url)
    index_es = "mood-tetis-tweets-collect"

    # init jinja2 configuration
    template_dir = os.path.join(os.path.dirname(__file__), "eda_templates")
    jinja_env = Environment(loader=FileSystemLoader(template_dir))
    # end of init

    # Count RT and no RT
    # nb_rt_query = count_rt(jinja_env, es_url, index_es)
    # print(nb_rt_query)

    # Count tweets by disease
    path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv"
    params_kw = pd.read_csv(path_param_keywords)
    logger.info("Count tweets by disease: browse syndrome")
    frames = []
    for disease in params_kw['syndrome'].unique():
        list_of_keywords = params_kw[params_kw['syndrome'] == disease]["hashtags"].tolist()
        # remove duplicate in keywords:
        list_of_keywords = list(set(list_of_keywords))
        logger.debug("\t" + str(disease) + ": list of keywords: " + str(list_of_keywords))
        if not list_of_keywords:
            continue
        df = count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, str(disease))
        if not isinstance(df, pd.DataFrame):
            # The query failed (-1 was returned): skip this disease rather
            # than crash when concatenating the results.
            logger.warning("No result for disease %s: skipped", disease)
            continue
        frames.append(df)

    if not frames:
        logger.error("Count tweets by disease: no result at all, no figure produced")
        logger.info("EDA stop")
        return

    # DataFrame.append was removed in pandas 2.0; concat gives the same result.
    df_kw_by_disease = pd.concat(frames)
    # NOTE(review): the trailing space in "SARS-CoV-2 " is kept as-is; confirm
    # it matches the 'syndrome' value found in the keywords CSV.
    df_kw_by_disease_without_covid = df_kw_by_disease[df_kw_by_disease["disease"] != "SARS-CoV-2 "]

    datasets = (("all", df_kw_by_disease), ("without_covid", df_kw_by_disease_without_covid))
    # sunburst first, then treemap (both with plotly express)
    for chart_name, chart in (("sunburst", px.sunburst), ("treemap", px.treemap)):
        for suffix, data in datasets:
            fig = chart(data, path=['disease', data.index], values='doc_count', color='disease')
            file_name = "count_tweets_by_disease_keywords_{}_{}.png".format(chart_name, suffix)
            # Explicit extension and format: write_image cannot infer the
            # image type from a path without extension.
            pio.write_image(fig, os.path.join(path_figs_dir, file_name), format='png')
            # If an HTML display is needed: fig.show()
    logger.info("EDA stop")


if __name__ == '__main__':
    main()