# eda_es.py 13.19 KiB
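"""
@brief: Exploratory Data Analysis (EDA) of tweets indexed in Elasticsearch
@author: R.Decoupes
@copyright: CeCILL-B

Exploratory Data Analysis of tweets indexed in Elasticsearch: counts of tweets by
disease keyword, time series by disease keyword and retrieval of tweet contents.
"""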
import logging
from logging.handlers import RotatingFileHandler
from elasticsearch import Elasticsearch
# from elasticsearch import logger as es_logger
from jinja2 import FileSystemLoader, Environment
import os
import requests
import pandas as pd
import plotly.express as px
import plotly.io as pio
from plotly.subplots import make_subplots
from collections import defaultdict

def logsetup(
        log_path='/home/rdecoupe/PycharmProjects/mood-tweets-collect/elasticsearch/log/eda/eda.log'):
    """
    Configure the root logger and return it :
        - Log in file : every record from DEBUG upward goes to a rotating file
          (at most 1 000 000 bytes per file, 1 backup kept), formatted as
          "time :: level :: message"
        - also print on screen : records from INFO upward, unformatted

    Calling this function several times does not attach the same handlers twice,
    so records are never duplicated.

    :param log_path: path of the log file (its directory must already exist).
        Defaults to the historical location of eda.log
    :return: logger object (the root logger)
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # FileHandler stores its target as an absolute path in `baseFilename`:
    # normalise the same way so that the comparison below is reliable.
    log_path = os.path.abspath(log_path)
    already_logging_to_file = any(
        isinstance(handler, RotatingFileHandler) and handler.baseFilename == log_path
        for handler in logger.handlers
    )
    if not already_logging_to_file:
        formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
        file_handler = RotatingFileHandler(log_path, mode='a', maxBytes=1000000, backupCount=1)
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    # The console handler is tagged with a name: other tools may attach their own
    # stream handlers to the root logger, so the type alone cannot identify ours.
    console_name = 'eda_console'
    if not any(handler.get_name() == console_name for handler in logger.handlers):
        stream_handler = logging.StreamHandler()
        stream_handler.set_name(console_name)
        # Only display on screen INFO
        stream_handler.setLevel(logging.INFO)
        logger.addHandler(stream_handler)
    return logger


def count_rt(jinja_env, es_url, index_es):
    """
    Count the number of RT of the whole corpus

    The query is rendered from the template "count_rt.json.j2" (with
    field="retweeted_status.id") and sent to the "_search" endpoint of the index.

    :param jinja_env: jinja2 environment able to load the query templates
    :param es_url: URL of Elastic Search (concatenated as is with the index name)
    :param index_es: Index of elastic
    :return: global number of RT, read from aggregations -> "0-bucket" -> doc_count
        of the response; -1 if the HTTP request could not be performed
    """
    # Use the root logger (the one configured by logsetup()) instead of a global
    # `logger` variable: that global only exists when this file is run as a script,
    # so the error path used to raise NameError when the module was imported.
    log = logging.getLogger()
    template = jinja_env.get_template("count_rt.json.j2")
    query = template.render(field="retweeted_status.id")
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except Exception:
        # exception() records the traceback, so the cause of the failure is kept
        log.exception("Count_RT: doesn't work")
        return -1
    return r.json()['aggregations']['0-bucket']['doc_count']


def count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease):
    """
    for each keyword: get the nb of tweets containing this keyword

    The query is rendered from the template "count_by_disease.json.j2" and sent to
    the "_search" endpoint of the index.

    :param jinja_env: jinja2 environment able to load the query templates
    :param es_url: URL of Elastic Search (concatenated as is with the index name)
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease
    :return: a dataframe with one row per keyword, holding the content of the
        matching Elasticsearch bucket plus a "disease" column;
        -1 if the HTTP request could not be performed
    """
    # Use the root logger (the one configured by logsetup()) instead of a global
    # `logger` variable which only exists when this file is run as a script.
    log = logging.getLogger()
    template = jinja_env.get_template("count_by_disease.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except Exception:
        # exception() appends the traceback announced by the message
        log.exception("Count tweets by disease keywords: doesn't work. See the full error: ")
        return -1
    # buckets are keyed by label: one column per bucket at this stage
    df_results = pd.DataFrame.from_dict(r.json()["aggregations"]["0"]["buckets"])
    # clean up label from elasticsearch (because it contains "text : [kw]")
    df_results.rename(columns=lambda x: x.split(' : ')[1], inplace=True)
    # transpose the dataframe: keywords become the index
    df_results = df_results.T
    # add the disease name
    df_results["disease"] = disease
    log.debug(df_results)
    return df_results

def time_series_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease):
    """
    Return dataframe :
        - index : timestamp : date + time
        - Columns : Multi-indexing (or hierarchical indexing): 2 levels: [1] disease, [2] keyword :
        disease                       Respiratory  Unknown  ... SARS-CoV-2
        keyword                       lungdisease NewVirus  ...  SARS-CoV-2 SARS-CoV-2
        2021-05-03T00:00:00.000+02:00        54.0      NaN  ...        3225        3225
        2021-06-02T00:00:00.000+02:00       127.0      NaN  ...        1979        1979

    To do so, we have to work on the formatting of Elasticsearch result

    :param jinja_env: jinja2 environment able to load the query templates
    :param es_url: URL of Elastic Search (concatenated as is with the index name)
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease
    :return: a dataframe indexed by timestamp with one (disease, keyword) column per
        keyword having at least one tweet;
        the string "empty_series" when no keyword has a time serie;
        -1 if the HTTP request could not be performed
    """
    # Use the root logger (the one configured by logsetup()) instead of a global
    # `logger` variable which only exists when this file is run as a script.
    log = logging.getLogger()
    template = jinja_env.get_template("disease_time_series.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(es_url + index_es + "/_search", data=query, headers=headers)
    except Exception:
        # exception() appends the traceback announced by the message
        log.exception("Time series: doesn't work. See the full error: ")
        return -1
    df_results = pd.DataFrame.from_dict(r.json()["aggregations"]["0"]["buckets"])
    # clean up label from elasticsearch (because it contains "text : [kw]")
    df_results.rename(columns=lambda x: x.split(' : ')[1], inplace=True)
    # transpose the dataframe: keywords become the index
    df_results = df_results.T
    # add the disease name
    df_results["disease"] = disease
    # reformat time serie column: keep only the list of date buckets
    df_results["time_serie"] = df_results.apply(lambda x: x["time_serie"]["buckets"], axis=1)

    # build a brand new dataframe with
    #     index = date
    #     column = (disease, keyword)
    df_all_kw_timeserie = None  # stays None as long as no keyword has a time serie
    for keyword in df_results.index:
        df_kw = df_results.loc[keyword]
        if df_kw['doc_count'] == 0:  # avoid empty time serie
            continue
        kw_time_serie = [[element['key_as_string'], element['doc_count']]
                         for element in df_kw['time_serie']]
        df_kw_timeserie = pd.DataFrame(kw_time_serie, columns=['timestamp', disease + ':' + keyword])
        df_kw_timeserie.set_index('timestamp', inplace=True)
        # hierarchical indexing for column
        df_kw_timeserie.columns = pd.MultiIndex.from_product([[disease], [keyword]],
                                                             names=["disease", "keyword"])
        if df_all_kw_timeserie is None:
            df_all_kw_timeserie = df_kw_timeserie
        else:
            # merge column: timestamps are aligned, missing values become NaN
            df_all_kw_timeserie = pd.concat([df_all_kw_timeserie, df_kw_timeserie], axis=1)
    if df_all_kw_timeserie is None:  # none of keywords have a time serie
        return "empty_series"
    log.debug(df_all_kw_timeserie)
    return df_all_kw_timeserie

def get_tweet_content_by_disease(jinja_env, es_url, index_es, list_of_keywords, disease,
                                 nb_of_estimated_results=10000,
                                 pickle_path="/home/rdecoupe/Téléchargements/test/get_tweet_content_by_disease.pkl"):
    """
    Retrieve the timestamp and the text of the tweets matching a list of keywords

    The query is rendered from the template "get_tweets_content_by_keywords.json.j2"
    and sent to the "_search" endpoint of the index. For each hit, the text of the
    extended tweet is used when available, otherwise the standard "text" field.

    :param jinja_env: jinja2 environment able to load the query templates
    :param es_url: URL of Elastic Search (concatenated as is with the index name)
    :param index_es: Index of elastic
    :param list_of_keywords: a list of keywords to run the elastic query
    :param disease: name of the disease (not used by the query; kept so that the
        signature matches the other functions of this module)
    :param nb_of_estimated_results: value given to the "size" parameter of the search,
        i.e. maximum number of tweets to retrieve
    :param pickle_path: where the resulting dataframe is pickled. None disables
        the export. Defaults to the historical location
    :return: a dataframe with the columns "timestamp" and "text";
        -1 if the HTTP request could not be performed
    """
    # Use the root logger (the one configured by logsetup()) instead of a global
    # `logger` variable which only exists when this file is run as a script.
    log = logging.getLogger()
    template = jinja_env.get_template("get_tweets_content_by_keywords.json.j2")
    query = template.render(list_of_keywords=list_of_keywords)
    headers = {'content-type': 'application/json'}
    search_url = es_url + index_es + "/_search?size=" + str(nb_of_estimated_results)
    try:
        r = requests.get(search_url, data=query, headers=headers)
    except Exception:
        # exception() appends the traceback announced by the message
        log.exception("Get tweets content: doesn't work. See the full error: ")
        return -1
    list_of_tweets = []
    for hit in r.json()["hits"]["hits"]:
        fields = hit["fields"]
        try:  # test if we have more than 140 characters, ie, extended_tweet
            text = fields["extended_tweet.full_text"][0]
        except (KeyError, IndexError, TypeError):  # else we take only the 140 characters
            text = fields["text"][0]
        list_of_tweets.append({
            "timestamp": fields["@timestamp"][0],
            "text": text
        })
    df_results = pd.DataFrame(list_of_tweets)
    if pickle_path is not None:
        df_results.to_pickle(pickle_path)
    return df_results

if __name__ == '__main__':
    # Script entry point: count tweets by disease keyword (sunburst + treemap figures),
    # build the time series by disease keyword, then retrieve the tweets of one disease.
    # NOTE: `logger` must stay a module-level name, the functions above may rely on it.
    logger = logsetup()
    logger.info("EDA start")
    path_figs_dir = os.path.join(os.path.dirname(__file__), "figs")
    # the figures are written in this directory: make sure it exists
    os.makedirs(path_figs_dir, exist_ok=True)
    # Init Elasticsearch configurations
    es_url = "http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200/"
    client_es = Elasticsearch(es_url)  # not used below: queries are sent with requests
    index_es = "mood-tetis-tweets-collect"
    # init jinja2 configuration
    template_dir = os.path.join(os.path.dirname(__file__), "eda_templates")
    jinja_env = Environment(loader=FileSystemLoader(template_dir))
    # end of init

    # ---- Count RT and no RT (disabled) ----
    # nb_rt_query = count_rt(jinja_env, es_url, index_es)
    # print(nb_rt_query)

    # ---- Keywords by disease: read once, used by the sections below ----
    path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv"
    params_kw = pd.read_csv(path_param_keywords)
    keywords_by_disease = {}
    for disease in params_kw['syndrome'].unique():
        list_of_keywords = params_kw[params_kw['syndrome'] == disease]["hashtags"].tolist()
        # remove duplicate in keywords:
        list_of_keywords = list(set(list_of_keywords))
        logger.debug("\t" + str(disease) + ": list of keywords: " + str(list_of_keywords))
        if list_of_keywords:  # a syndrome without keyword cannot be queried
            keywords_by_disease[disease] = list_of_keywords

    # ---- Count tweets by disease ----
    logger.info("Count tweets by disease: browse syndrome")
    count_frames = []
    for disease, list_of_keywords in keywords_by_disease.items():
        df = count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, str(disease))
        if isinstance(df, pd.DataFrame):
            count_frames.append(df)
        else:  # the function returns -1 when the request failed
            logger.warning("Count tweets by disease: no result for " + str(disease))
    if not count_frames:
        raise RuntimeError("Count tweets by disease: nothing could be retrieved from Elasticsearch")
    # DataFrame.append has been removed from pandas 2.0: concat stacks the rows the same way
    df_kw_by_disease = pd.concat(count_frames)
    # NOTE(review): the label below ends with a space - presumably it matches the value
    # stored in the "syndrome" column of the CSV; verify against keywordsFilter.csv
    df_kw_by_disease_without_covid = df_kw_by_disease[df_kw_by_disease["disease"] != "SARS-CoV-2 "]
    # sunburst with plotly express
    pie_fig = px.sunburst(df_kw_by_disease,
                          path=['disease', df_kw_by_disease.index], values='doc_count', color='disease')
    pio.write_image(pie_fig, path_figs_dir + "/count_tweets_by_disease_keywords_sunburst_all.png", format='png')
    pie_fig = px.sunburst(df_kw_by_disease_without_covid,
                          path=['disease', df_kw_by_disease_without_covid.index], values='doc_count', color='disease')
    pio.write_image(pie_fig, path_figs_dir + "/count_tweets_by_disease_keywords_sunburst_without_covid.png",
                    format='png')
    # treemap
    treemap_fig3 = px.treemap(df_kw_by_disease,
                              path=['disease', df_kw_by_disease.index], values='doc_count', color='disease')
    pio.write_image(treemap_fig3, path_figs_dir + "/count_tweets_by_disease_keywords_treemap_all.png", format='png')
    treemap_without_covid_fig3 = px.treemap(df_kw_by_disease_without_covid,
                                            path=['disease', df_kw_by_disease_without_covid.index], values='doc_count',
                                            color='disease')
    pio.write_image(treemap_without_covid_fig3,
                    path_figs_dir + "/count_tweets_by_disease_keywords_treemap_without_covid.png", format='png')
    # If an HTML display is needed:
    # treemap_without_covid_fig3.show()

    # ---- Time series of keywords ----
    logger.info("time series by disease")
    df_timeseries_by_disease = None
    for disease, list_of_keywords in keywords_by_disease.items():
        df = time_series_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, str(disease))
        # The function returns "empty_series" for a disease without any time serie and
        # -1 when the request failed: in both cases there is nothing to plot
        if not isinstance(df, pd.DataFrame):
            continue
        if df_timeseries_by_disease is None:
            df_timeseries_by_disease = df
        else:
            df_timeseries_by_disease = pd.concat([df_timeseries_by_disease, df], axis=1)
    if df_timeseries_by_disease is None:
        logger.warning("time series by disease: no time serie to plot")
    else:
        logger.debug(df_timeseries_by_disease.keys())
        # make subplot: one row per disease, in their order of appearance in the columns
        list_disease_with_timeserie = list(dict.fromkeys(d[0] for d in df_timeseries_by_disease.keys()))
        nb_of_subplots = len(list_disease_with_timeserie)
        subplots_timeseries_fig = make_subplots(rows=nb_of_subplots, cols=1)
        for current_row, disease in enumerate(list_disease_with_timeserie, start=1):
            timeserie_fig = px.bar(df_timeseries_by_disease[disease], facet_col_wrap=2)
            # timeserie_fig.show()
            # A plotly express figure cannot be used as a subplot directly:
            # its traces are copied one by one into the subplot figure instead
            for trace in timeserie_fig["data"]:
                subplots_timeseries_fig.add_trace(trace, row=current_row, col=1)
        # subplots_timeseries_fig.show()

    # ---- Clustering tweets for a disease name ----
    logger.info("Clustering tweets for a disease name")
    # For Avian Influenza:
    disease = "Avian influenza"
    list_of_keywords = ['Fowl', 'Bird', 'Avian', 'HPAI', 'FowlPlague', 'AvianInfluenza', 'avianInfluenza',
                        'Avianflu', 'bird', 'BirdFlu']
    corpus_tweets = get_tweet_content_by_disease(jinja_env, es_url, index_es, list_of_keywords, disease)

    logger.info("EDA stop")