""" @brief: Explore Data Analysis of tweets indexed in ElasticSearch @author: R.Decoupes @copyright: CeCILL-B Explore Data Analysis of tweets indexed in ElasticSearch """ import logging from logging.handlers import RotatingFileHandler from elasticsearch import Elasticsearch # from elasticsearch import logger as es_logger from jinja2 import FileSystemLoader, Environment import os import requests import pandas as pd import plotly.express as px import plotly.io as pio from plotly.subplots import make_subplots from sentence_transformers import SentenceTransformer import numpy as np from sklearn.manifold import TSNE def logsetup(): """ Initiate a logger object : - Log in file : collectweets.log - also print on screen :return: logger object """ logger = logging.getLogger() logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s') file_handler = RotatingFileHandler( '/home/rdecoupe/PycharmProjects/mood-tweets-collect/elasticsearch/log/eda/eda.log', 'a', 1000000, 1) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) logger.addHandler(file_handler) stream_handler = logging.StreamHandler() # Only display on screen INFO stream_handler.setLevel(logging.INFO) logger.addHandler(stream_handler) return logger def count_rt(jinja_env, es_url, index_es): """ Count the number of RT of the whole corpus :param jinja_env: Dir to jinja templates :param es_url: URL of Elastic Search :param index_es: Index of elastic :return: float: global number of RT """ template = jinja_env.get_template("count_rt.json.j2") query = template.render(field="retweeted_status.id") headers = {'content-type': 'application/json'} try: r = requests.get(es_url + index_es + "/_search", data=query, headers=headers) except Exception as e: logger.error("Count_RT: doesn't work") return -1 nb_RT = r.json()['aggregations']['0-bucket']['doc_count'] return nb_RT def count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease): """ for each keyword: get the nb of tweets containing this keyword :param jinja_env: Dir to jinja templates :param es_url: URL of Elastic Search :param index_es: Index of elastic :param list_of_keywords: a list of keywords to run the elastic query :param disease: name of the disease :return: a dataframe: for each keyword: get the nb of tweets containing this keyword """ template = jinja_env.get_template("count_by_disease.json.j2") query = template.render(list_of_keywords=list_of_keywords) headers = {'content-type': 'application/json'} try: r = requests.get(es_url + index_es + "/_search", data=query, headers=headers) except Exception as e: logger.error("Count_RT: doesn't work. See the full error: ") return -1 df_results = pd.DataFrame.from_dict(r.json()["aggregations"]["0"]["buckets"]) # clean up label from elasticsearch (because it contains "text: [kw]") df_results.rename(columns=lambda x: x.split(' : ')[1], inplace=True) # transpose the dataframe df_results = df_results.T # add the disease name df_results["disease"] = disease logger.debug(df_results) return df_results def time_series_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, disease): """ Return dataframe : - index : timestamp : date + time - Columns : Multi-indexing (or hiearchical indexing): 2 levels: [1] disease, [2] keyword : disease Respiratory Unknown ... SARS-CoV-2 keyword lungdisease NewVirus ... SARS-CoV-2 SARS-CoV-2 2021-05-03T00:00:00.000+02:00 54.0 NaN ... 3225 3225 2021-06-02T00:00:00.000+02:00 127.0 NaN ... 1979 1979 To do so, we have to work on the formating of Elasticsearch result :param jinja_env: Dir to jinja templates :param es_url: URL of Elastic Search :param index_es: Index of elastic :param list_of_keywords: a list of keywords to run the elastic query :param disease: name of the disease :return: a dataframe: for each keyword: get the nb of tweets containing this keyword """ template = jinja_env.get_template("disease_time_series.json.j2") query = template.render(list_of_keywords=list_of_keywords) headers = {'content-type': 'application/json'} try: r = requests.get(es_url + index_es + "/_search", data=query, headers=headers) except Exception as e: logger.error("Time series: doesn't work. See the full error: ") return -1 df_results = pd.DataFrame.from_dict(r.json()["aggregations"]["0"]["buckets"]) # clean up label from elasticsearch (because it contains "text: [kw]") df_results.rename(columns=lambda x: x.split(' : ')[1], inplace=True) # transpose the dataframe df_results = df_results.T # add the disease name df_results["disease"] = disease # reformat time serie column df_results["time_serie"] = df_results.apply(lambda x: x["time_serie"]["buckets"], axis=1) """ build a brand new dataframe with index = date column = disease:keywords """ df_all_kw_timeserie_empty = True for keyword in df_results.index: kw_time_serie = [] df_kw = df_results.loc[keyword] if df_kw['doc_count'] != 0: # avoid empty time serie for element in df_kw['time_serie']: timestamp = element['key_as_string'] value = element['doc_count'] kw_time_serie.append([timestamp, value]) df_kw_timeserie = pd.DataFrame(kw_time_serie, columns=['timestamp', disease+':'+keyword]) df_kw_timeserie.set_index('timestamp', inplace=True) # hierarchical indexing for column df_kw_timeserie.columns = pd.MultiIndex.from_product([[disease], [keyword]], names=["disease", "keyword"]) if df_all_kw_timeserie_empty == True: df_all_kw_timeserie = df_kw_timeserie df_all_kw_timeserie_empty = False else: #merge column # df_all_kw_timeserie[disease+':'+keyword] = df_kw_timeserie[disease+':'+keyword] df_all_kw_timeserie = pd.concat([df_all_kw_timeserie, df_kw_timeserie], axis=1) if df_all_kw_timeserie_empty == True:# none of keywords have a time serie return "empty_series" else: logger.debug(df_all_kw_timeserie) return df_all_kw_timeserie def get_tweet_content_by_disease(jinja_env, es_url, index_es, list_of_keywords, disease, nb_of_estimated_results=10000): """ Retrieves all tweets for a specific disease thanks its keywords :param jinja_env: :param es_url: :param index_es: :param list_of_keywords: :param disease: :param nb_of_estimated_results: :return: """ template = jinja_env.get_template("get_tweets_content_by_keywords.json.j2") query = template.render(list_of_keywords=list_of_keywords) headers = {'content-type': 'application/json'} try: r = requests.get(es_url + index_es + "/_search?size=" + str(nb_of_estimated_results), data=query, headers=headers) except Exception as e: logger.error("Time series: doesn't work. See the full error: ") return -1 list_of_tweets = [] for hit in r.json()["hits"]["hits"]: try:# test is we have more than 140 characters, ie, extended_tweet text = hit["fields"]["extended_tweet.full_text"][0] except:# else we take only the 140 characters text = hit["fields"]["text"][0] tweet = { "timestamp": hit["fields"]["@timestamp"][0], "text": text } list_of_tweets.append(tweet) df_results = pd.DataFrame(list_of_tweets) # df_results.to_pickle("/home/rdecoupe/Téléchargements/test/get_tweet_content_by_disease.pkl") return df_results def visualize_sentence_through_embedding(corpus): # How to choose the model: An overview on huggingface # https://www.sbert.net/docs/pretrained_models.html#sentence-embedding-models multi_lingual_model = "distiluse-base-multilingual-cased-v1" best_quality_model = "all-mpnet-base-v2" faster_model = "all-MiniLM-L6-v2" # used by UKPLab : https://github.com/UKPLab/sentence-transformers/blob/a94030226da1f4b03f9d703596b0ebd360c9ef43/examples/applications/clustering/agglomerative.py#L33 embedder = SentenceTransformer(faster_model) # Encode ! corpus_embeddings = embedder.encode(corpus) # Normalize the embeddings to unit length corpus_embeddings = corpus_embeddings / np.linalg.norm(corpus_embeddings, axis=1, keepdims=True) # Dimension reduction with t-SNE tsne_model = TSNE(perplexity=40, n_components=2, init='pca', n_iter=2500, random_state=23) corpus_embeddings_tsne = tsne_model.fit_transform(corpus_embeddings) corpus_embeddings_tsne_df = pd.DataFrame(corpus_embeddings_tsne) corpus_embeddings_tsne_df["label"] = corpus # plot with plotly express fig = px.scatter( corpus_embeddings_tsne_df, x=0, y=1, hover_data=["label"] ) fig.show() if __name__ == '__main__': logger = logsetup() logger.info("EDA start") path_figs_dir = os.path.join(os.path.dirname(__file__), "figs") # Init Elasticsearch configurations es_url = "http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200/" client_es = Elasticsearch(es_url) index_es = "mood-tetis-tweets-collect" # init jinja2 configuration template_dir = os.path.join(os.path.dirname(__file__), "eda_templates") jinja_env = Environment(loader=FileSystemLoader(template_dir)) # end of init """ Count RT and no RT """ # nb_rt_query = count_rt(jinja_env, es_url, index_es) # print(nb_rt_query) """ Count tweets by disease """ path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv" params_kw = pd.read_csv(path_param_keywords) logger.info("Count tweets by disease: browse syndrome") df_kw_by_disease = "" for disease in params_kw['syndrome'].unique(): list_of_keywords = params_kw[params_kw['syndrome'] == disease]["hashtags"].tolist() # remove duplicate in keywords: list_of_keywords = list(set(list_of_keywords)) logger.debug("\t" + str(disease) + ": list of keywords: " + str(list_of_keywords)) if list_of_keywords != []: df = count_tweets_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, str(disease)) if type(df_kw_by_disease) == str: df_kw_by_disease = df else: df_kw_by_disease = df_kw_by_disease.append(df) df_kw_by_disease_without_covid = df_kw_by_disease[df_kw_by_disease["disease"] != "SARS-CoV-2 "] # sunburst with plotly express pie_fig = px.sunburst(df_kw_by_disease, path=['disease', df_kw_by_disease.index], values='doc_count', color='disease') pio.write_image(pie_fig, path_figs_dir + "/count_tweets_by_disease_keywords_sunburst_all.png", format='png') pie_fig = px.sunburst(df_kw_by_disease_without_covid, path=['disease', df_kw_by_disease_without_covid.index], values='doc_count', color='disease') pio.write_image(pie_fig, path_figs_dir + "/count_tweets_by_disease_keywords_sunburst_without_covid.png", format='png') # treemap treemap_fig3 = px.treemap(df_kw_by_disease, path=['disease', df_kw_by_disease.index], values='doc_count', color='disease') pio.write_image(treemap_fig3, path_figs_dir + "/count_tweets_by_disease_keywords_treemap_all.png", format='png') treemap_without_covid_fig3 = px.treemap(df_kw_by_disease_without_covid, path=['disease', df_kw_by_disease_without_covid.index], values='doc_count', color='disease') pio.write_image(treemap_without_covid_fig3, path_figs_dir + "/count_tweets_by_disease_keywords_treemap_without_covid.png", format='png') # Si besoin d'affichage HTML : # treemap_without_covid_fig3.show() """ Time series of keywords (except covid) """ path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv" params_kw = pd.read_csv(path_param_keywords) logger.info("time series by disease") df_kw_by_disease = "" for disease in params_kw['syndrome'].unique(): list_of_keywords = params_kw[params_kw['syndrome'] == disease]["hashtags"].tolist() # remove duplicate in keywords: list_of_keywords = list(set(list_of_keywords)) logger.debug("\t" + str(disease) + ": list of keywords: " + str(list_of_keywords)) if list_of_keywords != []: df = time_series_by_disease_keywords(jinja_env, es_url, index_es, list_of_keywords, str(disease)) if type(df_kw_by_disease) == str: df_kw_by_disease = df else: if type(df) != str: # For disease that they have empy time serie for each keywords: we don't plot them df_kw_by_disease = pd.concat([df_kw_by_disease, df], axis=1) # df_kw_by_disease_without_covid = df_kw_by_disease[df_kw_by_disease["disease"] != "SARS-CoV-2 "] logger.debug(df_kw_by_disease.keys()) # make subplot list_disease_with_timeserie = set([d[0] for d in df_kw_by_disease.keys()]) nb_of_subplots = len(list_disease_with_timeserie) subplots_timeseries_fig = make_subplots(rows=nb_of_subplots, cols=1) current_row = 0 for disease in list_disease_with_timeserie: timeserie_fig = px.bar(df_kw_by_disease[disease], facet_col_wrap=2) current_row = current_row + 1 # timeserie_fig.show() """ Make a subplot with plotly express: it has net been implemented yet ! Instead we have to convert our plotly express object into trace """ for trace in range(len(timeserie_fig["data"])): subplots_timeseries_fig.add_trace(timeserie_fig["data"][trace], row=current_row, col=1) # subplots_timeseries_fig.show() """ Clustering tweets for a disease name """ path_param_keywords = "/home/rdecoupe/PycharmProjects/mood-tweets-collect/params/keywordsFilter.csv" params_kw = pd.read_csv(path_param_keywords) logger.info("time series by disease") # For Avian Influenza: disease = "Avian influenza" list_of_keywords = ['Fowl', 'Bird', 'Avian', 'HPAI', 'FowlPlague', 'AvianInfluenza', 'avianInfluenza', 'Avianflu', 'bird', 'BirdFlu'] corpus_tweets = get_tweet_content_by_disease(jinja_env, es_url, index_es, list_of_keywords, disease) corpus_tweets_list = corpus_tweets.text.values.tolist() corpus_tweets_list = list(set(corpus_tweets_list)) #Remove duplicate tweets (mostly RT) visualize_sentence_through_embedding(corpus_tweets_list) logger.info("EDA stop")