""" @brief: Download metric from Elasticsearch to easily compute TF-IDF matric @author: R.Decoupes @copyright: CeCILL-B Download metric from Elasticsearch to easily compute TF-IDF matric """ import hashlib from elasticsearch import Elasticsearch from jinja2 import FileSystemLoader, Environment import os import eland as ed import requests import pandas as pd from tqdm import tqdm import json from sklearn.feature_extraction.text import TfidfVectorizer from transformers import pipeline def elastic_pagination_scrolling(result, headers): """ Elasticsearch limit results of query at 10 000. To avoid this limit, we need to paginate results and scroll This method append all pages form scroll search :param result: a result of a ElasticSearcg query :return: """ scroll_size_total = result['hits']['total']["value"] # Progress bar pbar = tqdm(total=scroll_size_total) results = [] results += result['hits']['hits'] # we add the first pages of results before scrolling scroll_size = len(results) pbar.update(scroll_size) while (scroll_size > 0): try: scroll_id = result['_scroll_id'] # res = client.scroll(scroll_id=scroll_id, scroll='60s') query = { "scroll": "1m", "scroll_id": scroll_id } query_json = json.dumps(query) res = requests.get(es_url + "_search/scroll", data=query_json, headers=headers, ).json() results += res['hits']['hits'] scroll_size = len(res['hits']['hits']) pbar.update(scroll_size) except: pbar.close() break pbar.close() return results def elasticquery(es_url, index_es, query): """ Query elastic with selected fields :param es_url: :param index_es: :param query: :return: a dataframe """ headers = {'content-type': 'application/json'} try: r = requests.get(es_url + index_es + "/_search?scroll=1m&size=1000", # context for scrolling up to 1 minute data=query, headers=headers, ) except Exception as e: print("Can not query: "+str(query)) results = elastic_pagination_scrolling(r.json(), headers) df_results = pd.DataFrame(results) """ Formating elasticsearch respons 1. We extract nested fieds from elasticsearch: json_normalize the nested column 2. From cell value, extract the first element. Indeed: When we select specific fields in a elastic query, the response always send value as list (even if there is one value). """ df_results = pd.json_normalize(df_results['fields']) # for every cell we retrieve the first value from list. Sometimes there is NaN value when there is empty value. 


def tf_idf(list_of_docs, lang='english', nb_top_score=1000):
    """
    Compute a TF-IDF matrix over the documents and return, for each term, the document
    where it scores highest. Note: nb_top_score is currently unused.
    """
    try:
        vectorizer = TfidfVectorizer(
            stop_words=lang,
            max_features=250000,
            token_pattern="[A-zÀ-ÿ0-9#@]+"  # keep accented chars, hashtags and mentions (A-z also matches a few punctuation chars)
        )
        vectors = vectorizer.fit_transform(list_of_docs)
    except Exception:
        print("tf-idf failed")
        raise
    listOfTerms = vectorizer.get_feature_names_out()
    countOfTerms = vectors.todense().tolist()
    df = pd.DataFrame(
        countOfTerms,
        columns=listOfTerms
    )
    # sort terms by TF-IDF score
    tf_idf_top_score = pd.DataFrame()
    tf_idf_top_score["index_of_tweet"] = df.idxmax()  # keep track of the tweet where each term scores highest
    tf_idf_top_score["tfidf_score"] = df.max().values  # retrieve the TF-IDF score of the term in that document
    tf_idf_top_score = tf_idf_top_score.sort_values(by="tfidf_score", ascending=False)  # sort values
    return tf_idf_top_score
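

# The __main__ block below renders a Jinja2 template named "filter_by_state_and_date.j2"
# from the eda_templates/ directory. That template is not part of this file; the sketch
# below is only an assumption of what such a query body could look like, based on the
# fields used later in this script (it is not the actual MOOD template):
#
#   {
#     "query": {
#       "bool": {
#         "filter": [
#           {"match": {"rest.features.properties.state": "{{ state }}"}},
#           {"range": {"@timestamp": {"gte": "{{ start_date }}", "lte": "{{ end_date }}"}}
#         ]
#       }
#     },
#     "fields": ["text", "@timestamp", "user.id", "retweeted_status.user.id"],
#     "_source": false
#   }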


if __name__ == '__main__':
    # print working directory:
    print("working directory is: " + str(os.getcwd()))
    # Filters: one run per (state, start_date, end_date) triple
    # These single-filter values are unused; they are superseded by df_filters below
    state = 'aquitaine'
    start_date = "2020-12-01T00:00:00.172Z"
    end_date = "2022-02-25T15:51:46.172Z"
    filters = {
        'state': ["aquitaine", "aquitaine", "occitanie"],
        'start_date': ["2020-12-01T00:00:00.172Z", "2021-04-01T00:00:00.172Z", "2021-01-01T00:00:00.172Z"],
        'end_date': ["2021-12-31T23:00:00.172Z", "2022-02-25T15:51:46.172Z", "2021-07-31T23:00:00.172Z"]
    }
    df_filters = pd.DataFrame(filters)
    # connect to elastic
    """
    Why not use eland?
    # We could not filter the eland dataframe on rest.features.properties.state, probably because there are too many fields.
    # Two solutions:
    #     - use eland with ed_tweets["rest_user_osm.extent"] and intersect with polygons
    #     - use the elasticsearch package with a Jinja2 template and normalization
    # We prefer to use elasticsearch.
    """
    # ed_tweets = ed.DataFrame("http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200",
    #                          es_index_pattern="mood-tetis-tweets-collect")
    es_url = "http://mo-mood-tetis-tweets-collect.montpellier.irstea.priv:9200/"
    client_es = Elasticsearch(es_url)
    index_es = "mood-tetis-tweets-collect"
    # init jinja2 configuration
    template_dir = os.path.join(os.path.dirname(__file__), "eda_templates")
    jinja_env = Environment(loader=FileSystemLoader(template_dir))
    template = jinja_env.get_template("filter_by_state_and_date.j2")
    for i, filter_row in df_filters.iterrows():
        query = template.render(
            state=filter_row["state"],
            start_date=filter_row["start_date"],
            end_date=filter_row["end_date"]
        )
        df_results = elasticquery(es_url, index_es, query)
        df_results["index_of_tweet"] = df_results.index
        df_tfidf = tf_idf(df_results["text"].tolist())
        df_tfidf["tf_idf_terms"] = df_tfidf.index
        df_tfidf = df_tfidf.merge(df_results, on="index_of_tweet")
        # Translation to English (currently disabled: tweets are classified in French)
        model_checkpoint_fr = "Helsinki-NLP/opus-mt-fr-en"
        translator_fr = pipeline("translation", model=model_checkpoint_fr)
        # zero-shot classification
        classifier = pipeline("zero-shot-classification", model="digitalepidemiologylab/covid-twitter-bert-v2-mnli")
        candidate_labels_fr = ["grippe aviaire"]
        # candidate_labels_fr = ["covid-19", "grippe aviaire", "AMR", "tiques", "autres"]
        candidate_labels_en = ["avian influenza"]  # unused while translation is disabled
        # candidate_labels_en = ["covid-19", "avian influenza", "AMR", "tick borne", "others"]
        classifier_results = []
        classifier_results_2 = []
        for idx, tweet in tqdm(df_tfidf.iterrows(), total=df_tfidf.shape[0]):
            text = tweet["text"]
            try:
                text_translated = text
                # text_translated = translator_fr(text)[0]["translation_text"]
                scores = classifier(text_translated, candidate_labels_fr)["scores"]
                classifier_results.append(scores)
                item = {"text": text, "scores": scores}
                classifier_results_2.append(item)
            except Exception:
                df_tfidf.drop([idx], inplace=True)
                print("text: " + text + " | translated: " + text_translated)
        classifier_df = pd.DataFrame(classifier_results, columns=candidate_labels_fr)
        try:
            with open("analysis-output/test_2.txt", "w") as f:
                for l in classifier_results_2:
                    f.write(str(l))
        except Exception:
            print("can not save file with results from zero-shot classification")
        classifier_df_2 = pd.DataFrame(classifier_results_2)
        # classifier_df_2.to_csv("analysis-output/acquitaine_test.csv")
        df_tfidf = df_tfidf.join(classifier_df)
        df_tfidf.to_csv("analysis-output/" + filter_row["state"] + "_" + filter_row["start_date"] + "_digitalepidemiologylab.csv")
        # df_tfidf.to_pickle("analysis-output/acquitaine-digitalepidemiologylab.pkl")
        # Prepare data for Gephi graph visualisation: a bipartite graph whose nodes are newspapers and TF-IDF terms
        news_paper_name = pd.read_csv("./../params/accountsFollowed.csv")  # get the names of the accounts followed by MOOD
        news_paper_name["retweeted_status.user.id"] = news_paper_name["twitterID"]  # prepare for merge
        gephi = df_tfidf
        # gephi["Source"] = gephi["user.id"].apply(lambda x: hashlib.md5(str(x).encode()).hexdigest())  # pseudonymization
        gephi["Source"] = gephi.index  # edge id
        gephi["Target"] = gephi["retweeted_status.user.id"]
        gephi["Id"] = gephi.index
        gephi["Label"] = gephi["tf_idf_terms"]
        gephi["timeset"] = gephi["@timestamp"]
        gephi = gephi[gephi["Target"].str.len() != 0]  # filter out tweets that are not retweets
        gephi[["Id", "Label", "Source", "Target", "timeset"]].to_csv(
            "analysis-output/acquitaine_script_gephi_edge.csv",
            index=False
        )
        # gephi.to_csv("analysis-output/gephi-debug.csv")
        # Nodes: newspapers (accounts followed by MOOD)
        gephi_node = pd.DataFrame(gephi["retweeted_status.user.id"].unique(), columns=["retweeted_status.user.id"])
        gephi_node["Label"] = gephi_node.merge(news_paper_name, on="retweeted_status.user.id")["account"]
        gephi_node["Id"] = gephi_node["retweeted_status.user.id"]
        # Nodes: TF-IDF terms
        gephi_node_sub = pd.DataFrame(gephi["tf_idf_terms"].unique(), columns=["tf_idf_terms"])
        gephi_node_sub["Id"] = gephi_node_sub.merge(gephi, on="tf_idf_terms")["Id"]
        gephi_node_sub = gephi_node_sub.rename(columns={"tf_idf_terms": "Label"})
        gephi_node = pd.concat([gephi_node, gephi_node_sub])  # DataFrame.append is deprecated in recent pandas
        gephi_node.to_csv(
            "analysis-output/acquitaine_script_gephi_node.csv",
            index=False
        )