# eda_es.py
"""
@brief: Explore Data Analysis of tweets indexed in ElasticSearch
@author: R.Decoupes
@copyright: CeCILL-B

Explore Data Analysis of tweets indexed in ElasticSearch
"""
import logging
from logging.handlers import RotatingFileHandler
from elasticsearch import Elasticsearch
# from elasticsearch import logger as es_logger
from jinja2 import FileSystemLoader, Environment
import os
import requests
import pandas as pd
import plotly.express as px
import plotly.io as pio


def logsetup(log_file='/home/rdecoupe/PycharmProjects/mood-tweets-collect/elasticsearch/log/eda/eda.log'):
    """
    Configure the root logger and return it :
        - Log in a rotating file (DEBUG and above, timestamped format)
        - also print on screen (INFO and above, default format)

    Calling this function several times is safe: handlers already installed
    by a previous call are not added again, so messages are never duplicated.

    :param log_file: path of the log file (defaults to the project's eda.log)
    :return: logger object (the root logger)
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # Handlers are tagged with a name so that a later call can recognise them
    file_handler_name = "eda_file:" + os.path.abspath(log_file)
    stream_handler_name = "eda_console"
    installed_names = {handler.name for handler in logger.handlers}

    if file_handler_name not in installed_names:
        formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
        # Rotate at ~1 MB and keep a single backup file
        file_handler = RotatingFileHandler(log_file, mode='a', maxBytes=1000000, backupCount=1)
        file_handler.name = file_handler_name
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    if stream_handler_name not in installed_names:
        stream_handler = logging.StreamHandler()
        stream_handler.name = stream_handler_name
        # Only display on screen INFO
        stream_handler.setLevel(logging.INFO)
        logger.addHandler(stream_handler)
    return logger


def count_rt(jinja_env, es_url, index_es):
    """
    Count the number of RT of the whole corpus

    The query body is rendered from the "count_rt.json.j2" template and sent
    to the "_search" endpoint of the index.

    :param jinja_env: jinja2 Environment able to load the query templates
    :param es_url: URL of Elastic Search (concatenated as-is with the index name)
    :param index_es: Index of elastic
    :return: global number of RT (the "doc_count" of the "0-bucket" aggregation),
        or -1 if the request failed or the response could not be read
    """
    # Root logger: the one configured by logsetup(). Fetching it here keeps the
    # function usable when the module is imported instead of run as a script.
    log = logging.getLogger()
    template = jinja_env.get_template("count_rt.json.j2")
    query = template.render(field="retweeted_status.id")
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
        # Turn HTTP error statuses into exceptions instead of parsing an error body
        r.raise_for_status()
        return r.json()['aggregations']['0-bucket']['doc_count']
    except (requests.exceptions.RequestException, KeyError, TypeError, ValueError):
        # log.exception records the traceback along with the message
        log.exception("Count_RT: doesn't work")
        return -1


def count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease):
    """
    for each keyword: get the nb of tweets containing this keyword

    The query body is rendered from the "count_by_disease.json.j2" template
    and sent to the "_search" endpoint of the index.

    :param jinja_env: jinja2 Environment able to load the query templates
    :param es_url: URL of Elastic Search (concatenated as-is with the index name)
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease
    :return: a dataframe built from the buckets of aggregation "0", transposed
        so that each bucket becomes a row, with an extra "disease" column;
        or -1 if the request failed or the response could not be read
    """
    # Root logger: the one configured by logsetup(). Fetching it here keeps the
    # function usable when the module is imported instead of run as a script.
    log = logging.getLogger()
    template = jinja_env.get_template("count_by_disease.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
        # Turn HTTP error statuses into exceptions instead of parsing an error body
        r.raise_for_status()
        buckets = r.json()["aggregations"]["0"]["buckets"]
    except (requests.exceptions.RequestException, KeyError, TypeError, ValueError):
        # log.exception records the traceback along with the message
        log.exception("count_tweets_by_disease_keywords: query failed for disease %s", disease)
        return -1
    df_results = pd.DataFrame.from_dict(buckets)
    # clean up label from elasticsearch (because it contains "text : [kw]");
    # a label without the " : " separator is kept unchanged instead of raising
    df_results.rename(columns=lambda label: label.split(' : ', 1)[-1], inplace=True)
    # transpose the dataframe
    df_results = df_results.T
    # add the disease name
    df_results["disease"] = disease
    log.debug(df_results)
    return df_results


if __name__ == '__main__':
    # Script entry point: query Elasticsearch for the number of tweets per
    # keyword of each disease, then export sunburst and treemap figures.
    logger = logsetup()
    logger.info("EDA start")
    path_figs_dir = os.path.join(os.path.dirname(__file__), "figs")
    # Make sure the output directory exists before any figure is written
    os.makedirs(path_figs_dir, exist_ok=True)
    # Init Elasticsearch configurations
    es_url = "http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200/"
    client_es = Elasticsearch(es_url)
    index_es = "mood-tetis-tweets-collect"
    # init jinja2 configuration
    template_dir = os.path.join(os.path.dirname(__file__), "eda_templates")
    jinja_env = Environment(loader=FileSystemLoader(template_dir))
    # end of init

    # Count RT and no RT
    # nb_rt_query = count_rt(jinja_env, es_url, index_es)
    # print(nb_rt_query)

    # Count tweets by disease
    path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv"
    params_kw = pd.read_csv(path_param_keywords)
    logger.info("Count tweets by disease: browse syndrome")
    # One dataframe per disease, concatenated once after the loop
    frames_by_disease = []
    for disease in params_kw['syndrome'].unique():
        list_of_keywords = params_kw[params_kw['syndrome'] == disease]["hashtags"].tolist()
        # remove duplicate in keywords:
        list_of_keywords = list(set(list_of_keywords))
        logger.debug("\t" + str(disease) + ": list of keywords: " + str(list_of_keywords))
        if not list_of_keywords:
            continue
        df = count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, str(disease))
        # The helper returns -1 (not a dataframe) when the query failed
        if not isinstance(df, pd.DataFrame):
            logger.warning("Count tweets by disease: no result for %s, skipped", disease)
            continue
        frames_by_disease.append(df)

    if not frames_by_disease:
        logger.error("Count tweets by disease: no data retrieved, no figure produced")
    else:
        # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
        df_kw_by_disease = pd.concat(frames_by_disease)
        # NOTE(review): the trailing space in "SARS-CoV-2 " presumably mirrors the
        # value stored in keywordsFilter.csv - confirm against the CSV
        df_kw_by_disease_without_covid = df_kw_by_disease[df_kw_by_disease["disease"] != "SARS-CoV-2 "]
        # sunburst with plotly express
        pie_fig = px.sunburst(df_kw_by_disease,
                              path=['disease', df_kw_by_disease.index], values='doc_count', color='disease')
        pio.write_image(pie_fig,
                        os.path.join(path_figs_dir, "count_tweets_by_disease_keywords_sunburst_all.png"),
                        format='png')
        pie_fig = px.sunburst(df_kw_by_disease_without_covid,
                              path=['disease', df_kw_by_disease_without_covid.index], values='doc_count',
                              color='disease')
        pio.write_image(pie_fig,
                        os.path.join(path_figs_dir, "count_tweets_by_disease_keywords_sunburst_without_covid.png"),
                        format='png')
        # treemap
        treemap_fig3 = px.treemap(df_kw_by_disease,
                                  path=['disease', df_kw_by_disease.index], values='doc_count', color='disease')
        pio.write_image(treemap_fig3,
                        os.path.join(path_figs_dir, "count_tweets_by_disease_keywords_treemap_all.png"),
                        format='png')
        treemap_without_covid_fig3 = px.treemap(df_kw_by_disease_without_covid,
                                                path=['disease', df_kw_by_disease_without_covid.index],
                                                values='doc_count', color='disease')
        # The original call had neither a file extension nor a format, so the
        # image type could not be inferred; both are now given explicitly.
        pio.write_image(treemap_without_covid_fig3,
                        os.path.join(path_figs_dir, "count_tweets_by_disease_keywords_treemap_without_covid.png"),
                        format='png')
        # If an HTML display is needed:
        treemap_without_covid_fig3.show()

    logger.info("EDA stop")