Source code for internetnl_domain_analyse.domain_analyse_classes

import codecs
import logging
import os
import pickle
import re
import sqlite3
import sys
from collections import Counter
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
import yaml

from weighted_sample_statistics import WeightedSampleStatistics, VariableProperties
from weighted_sample_statistics import (
    get_records_select,
    rename_all_variables,
    prepare_df_for_statistics,
)
from internetnl_domain_analyse.domain_plots import (
    make_cdf_plot,
    make_bar_plot,
    make_bar_plot_stacked,
)
from internetnl_domain_analyse.latex_output import make_latex_overview
from internetnl_domain_analyse.utils import (
    read_tables_from_sqlite,
    get_all_clean_urls,
    dump_data_frame_as_sqlite,
    add_derived_variables,
    fill_booleans,
    prepare_stat_data_for_write,
    get_option_mask,
    impose_variable_defaults,
    add_missing_groups,
    clean_all_suffix,
    get_windows_or_linux_value,
)

# Logger used by all functions and classes in this module.
_logger = logging.getLogger(__name__)

# Handle on the logger named "tldextract"; it is only retrieved here, not configured.
tld_logger = logging.getLogger("tldextract")

# Set the threshold of the "matplotlib" logger to WARNING so that its
# lower-level messages are not emitted.
mpl_logger = logging.getLogger("matplotlib")
mpl_logger.setLevel(logging.WARNING)


def make_plot_cache_file_name(cache_directory, file_base, prefix):
    """Build the path of the pickle file that caches the data of one plot.

    Args:
        cache_directory: directory in which the cache file lives
        file_base: base name of the breakdown the cache belongs to
        prefix: string put in front of the file name

    Returns:
        ``cache_directory / "<prefix>_<file_base>_cache_for_plot.pkl"``
    """
    name_parts = (prefix, file_base, "cache_for_plot.pkl")
    cache_file_name = Path("_".join(name_parts))
    return cache_directory / cache_file_name
class ImageFileInfo:
    """Registry of generated image files that is persisted in a YAML cache file.

    ``data`` maps an image key to a dict which maps a plot key to the properties
    of one image (file name, tex shift, sub image label and optionally section).

    Args:
        scan_data_key: key of the scan data; it becomes part of the cache file name
        cache_file_name_base: first part of the cache file name
        cache_directory: directory holding the cache file. It is created when missing
    """

    def __init__(
        self, scan_data_key, cache_file_name_base="image_info", cache_directory="cache"
    ):
        self.scan_data_key = scan_data_key
        self.cache_directory = Path(cache_directory)
        # parents=True: a nested cache directory must not make the constructor fail
        self.cache_directory.mkdir(parents=True, exist_ok=True)
        cache_file_name = Path(
            "_".join([cache_file_name_base, scan_data_key])
        ).with_suffix(".yml")
        self.cache_file_name = self.cache_directory / cache_file_name
        # stays None until read_cache() is called or the first entry is added
        self.data = None

    def add_entry(
        self,
        plot_key,
        plot_info,
        image_key,
        sub_image_label,
        file_name,
        tex_right_shift=None,
        section=None,
    ):
        """Add a new entry or overwrite an existing one.

        Args:
            plot_key: key of the plot the image belongs to
            plot_info: dict with all plots; only the order of its keys is used
            image_key: key under which the entries of one image are grouped
            sub_image_label: label of the sub image
            file_name: file name of the image
            tex_right_shift: stored as is in the entry
            section: only stored when it is truthy
        """
        if self.data is None:
            # allow adding entries without having read the cache first
            self.data = dict()

        entries = self.data.setdefault(image_key, dict())
        entry = dict(
            file_name=file_name,
            tex_right_shift=tex_right_shift,
            sub_image_label=sub_image_label,
        )
        if section:
            entry["section"] = section
        entries[plot_key] = entry

        # To get the key order in the dict the same as in the input file, rebuild the
        # dict in the order of plot_info. Entries without a key in plot_info are
        # dropped by this step.
        if len(entries) > 1:
            self.data[image_key] = {
                key: entries[key] for key in plot_info if key in entries
            }

    def fix_order(self, variables):
        """Reorder ``data`` to follow ``variables.index``.

        Entries whose key does not occur in ``variables.index`` are dropped.

        Args:
            variables: object with an ``index`` attribute holding the wanted key order
        """
        tmp_info = self.data if self.data is not None else dict()
        self.data = dict()
        for var_name in variables.index:
            try:
                var_entry = tmp_info[var_name]
            except KeyError:
                _logger.debug(f"no entry for {var_name}. No problem, skipping")
            else:
                self.data[var_name] = var_entry

    def read_cache(self):
        """Read the cache file into ``data``; use an empty dict when there is none."""
        loaded = None
        if self.cache_file_name.exists():
            with open(self.cache_file_name, "r", encoding="UTF-8") as stream:
                # NOTE(security): yaml.Loader can construct arbitrary Python objects.
                # It is kept because write_cache dumps with yaml.Dumper, which may emit
                # python-specific tags. Only read cache files this tool wrote itself.
                loaded = yaml.load(stream=stream, Loader=yaml.Loader)
        # an empty yaml file loads as None
        self.data = loaded if loaded is not None else dict()

    def write_cache(self):
        """Write ``data`` to the cache file."""
        with open(self.cache_file_name, "w", encoding="UTF-8") as stream:
            yaml.dump(data=self.data, stream=stream, Dumper=yaml.Dumper)
class RecordCacheInfo:
    """Location and table names of the sqlite cache file holding the records.

    Args:
        records_cache_data: settings dict. The key ``records_cache_file`` is required;
            ``records_cache_directory`` and ``records_table_names`` are optional
        year_key: key of the year; it is converted to a string
        stat_directory: value substituted for ``{{ stat_directory }}`` in the cache
            directory name. No substitution takes place when it is None
    """

    def __init__(
        self, records_cache_data: dict, year_key: str, stat_directory: str = None
    ):
        self.records_cache_data = records_cache_data
        self.stat_directory = stat_directory
        self.year_key = f"{year_key}"

        # take the last two digits of a year like 2021, otherwise the last two characters
        match = re.search(r"20(\d\d)", self.year_key)
        if match:
            self.year_digits = match.group(1)
        else:
            # slice the string version: year_key itself may be an int
            self.year_digits = self.year_key[-2:]

        self.cache_dir = None
        self.file_name = None
        self.table_names = None

        self.get_cache_file_name()
        self.get_cache_table_names()

    def get_cache_file_name(self):
        """Set ``cache_dir`` and ``file_name`` of the cache file.

        The directory is taken from the environment variable
        ``RECORDS_CACHE_DIR_<year_key>`` when it is set. Otherwise it is taken from
        ``records_cache_directory`` in the settings, with ``.`` as default.
        """
        records_environment_variable = "_".join(["RECORDS_CACHE_DIR", self.year_key])
        records_cache_dir_name = os.getenv(records_environment_variable)
        if records_cache_dir_name is None:
            records_cache_dir_name = self.records_cache_data.get(
                "records_cache_directory", "."
            )
        if self.stat_directory is not None:
            # str.replace raises a TypeError on None, so only substitute a real value
            records_cache_dir_name = records_cache_dir_name.replace(
                "{{ stat_directory }}", str(self.stat_directory)
            )

        self.cache_dir = Path(records_cache_dir_name)
        records_file_basename = Path(self.records_cache_data["records_cache_file"])
        self.file_name = self.cache_dir / records_file_basename

    def get_cache_table_names(self):
        """Set ``table_names`` to the names of the records table and the info table.

        The environment variables ``RECORDS_TABLE_RECS_<year_key>`` and
        ``RECORDS_TABLE_INFO_<year_key>`` have precedence. When the records variable
        is not set, ``records_table_names`` from the settings is used, and when that
        is missing as well the names are guessed from the year digits.
        """
        table_records_name = os.environ.get(
            "_".join(["RECORDS_TABLE_RECS", self.year_key])
        )
        table_info_name = os.environ.get(
            "_".join(["RECORDS_TABLE_INFO", self.year_key])
        )
        guessed_names = [
            f"records_df_{self.year_digits}_2",
            f"info_records_df_{self.year_digits}",
        ]

        if table_records_name is None:
            # the environment variable RECORDS_TABLE_RECS is not set. Read from the
            # settings file
            self.table_names = self.records_cache_data.get("records_table_names")
            if self.table_names is None:
                # It is also not found in the settings file. Make a guess
                self.table_names = guessed_names
        else:
            if table_info_name is None:
                # prevent a None table name when only the records variable is set
                table_info_name = guessed_names[1]
            self.table_names = [table_records_name, table_info_name]
class DomainAnalyser:
    """Combine the records with the internet.nl scan results and calculate statistics.

    All work is started from the constructor. Depending on *mode*, *reset* and the
    cache files found on disk, it reads the micro data, calculates the statistics per
    breakdown, writes the statistics, and calculates the correlations and categories.

    Args:
        scan_data_key: key of the scan data; part of all output and cache file names.
            Required in practice, as it is joined into file names
        cache_file_base: first part of the name of the pickle cache of the micro data
        cache_directory_base_name: first part of the cache directory name; the year
            key is appended. Required in practice
        tld_extract_cache_directory: cache directory passed to get_all_clean_urls
        output_file: name of the sqlite output file, stored below ``output``
        reset: None to use the caches; an integer to force recalculation. The micro
            data is read for values <= 1 and rebuilt from the sources for 0
        records_cache_info: RecordCacheInfo of the records. Required in practice
        internet_nl_filename: sqlite file with the internet.nl tables
        breakdown_labels: passed to prepare_stat_data_for_write
        statistics: dict with the properties per breakdown
        default_scan: stored only
        variables: dict with the variable properties per module
        module_info: dict with the module properties
        weights: name of the weight column; rows without a weight are dropped
        url_key: name of the url column
        suffix_key: name of the suffix column
        translations: dict with translations passed to fill_booleans
        module_key: name of the module column in the variables dataframe
        variable_key: name of the index of the variables dataframe
        sheet_renames: dict of dicts with a ``pattern`` and ``replace`` regex entry
            applied to the Excel sheet names
        n_digits: passed to prepare_stat_data_for_write
        write_dataframe_to_sqlite: write the micro data to sqlite and exit
        statistics_to_xls: force writing of the statistics
        n_bins: number of bins of the histograms
        mode: one of ``all``, ``statistics``, ``correlations``, ``categories``
        correlations: dict with the settings of the correlations
        categories: dict with the settings of the categories
        dump_cache_as_sqlite: also dump the micro data via dump_data_frame_as_sqlite
    """

    # maximum number of characters Excel accepts for a worksheet name
    MAX_SHEET_NAME_LENGTH = 31

    def __init__(
        self,
        scan_data_key=None,
        cache_file_base="tables_df",
        cache_directory_base_name=None,
        tld_extract_cache_directory=None,
        output_file=None,
        reset=None,
        records_cache_info: "RecordCacheInfo" = None,
        internet_nl_filename=None,
        breakdown_labels=None,
        statistics: dict = None,
        default_scan=None,
        variables: dict = None,
        module_info: dict = None,
        weights=None,
        url_key="website_url",
        suffix_key="suffix",
        translations=None,
        module_key="module",
        variable_key="variable",
        sheet_renames=None,
        n_digits=None,
        write_dataframe_to_sqlite=False,
        statistics_to_xls=False,
        n_bins=100,
        mode=None,
        correlations=None,
        categories=None,
        dump_cache_as_sqlite=False,
    ):
        _logger.info(f"Running here {os.getcwd()}")

        self.records_cache_info = records_cache_info

        if output_file is None:
            self.output_file = Path("output.sqlite")
        else:
            self.output_file = Path(output_file)
        self.output_directory = Path("output")
        self.output_directory.mkdir(exist_ok=True)
        self.output_file = self.output_directory / self.output_file

        # insert the scan data key and the year between the base name and the suffixes
        outfile_suff = self.output_file.suffixes
        outfile_base = self.output_file.with_suffix("").with_suffix("").as_posix()
        outfile_year = Path(
            "_".join([outfile_base, scan_data_key, self.records_cache_info.year_key])
        )
        # every suffix already starts with a dot, so join without a separator
        self.output_file = outfile_year.with_suffix("".join(outfile_suff))

        self.dump_cache_as_sqlite = dump_cache_as_sqlite
        self.scan_data_key = scan_data_key
        self.breakdown_labels = breakdown_labels
        self.correlations = correlations
        self.categories = categories
        self.statistics = statistics
        self.default_scan = default_scan
        self.module_key = module_key
        self.variable_key = variable_key
        self.module_info = module_info
        self.variables = self.variable_dict2df(variables, module_info)
        self.n_digits = n_digits
        self.n_bins = n_bins
        self.sheet_renames = sheet_renames

        self.url_key = url_key
        self.suffix_key = suffix_key
        self.be_id = "be_id"
        self.mi_labels = ["sbi", "gk_sbs", self.be_id]
        self.translations = translations

        self.categories_coefficient_df = None
        self.correlation_coefficient_df = None

        if internet_nl_filename is not None:
            self.internet_nl_filename = internet_nl_filename
        else:
            self.internet_nl_filename = Path("internet_nl.sqlite")

        self.cache_directory = Path(
            "_".join([cache_directory_base_name, self.records_cache_info.year_key])
        )
        self.cache_directory.mkdir(exist_ok=True)
        if tld_extract_cache_directory is None:
            self.tld_extract_cache_directory = "tld_cache"
        else:
            self.tld_extract_cache_directory = tld_extract_cache_directory
        cache_file_base = Path(
            "_".join([cache_file_base, self.records_cache_info.year_key, scan_data_key])
            + ".pkl"
        )
        self.cache_file = self.cache_directory / cache_file_base

        # the output files below stay None when their section is not configured
        self.cate_outfile = None
        self.cate_pkl_file = None
        self.corr_outfile = None
        self.corr_pkl_file = None
        self.score_outfile = None
        self.score_pkl_file = None

        if self.correlations is not None:
            plot_info = self.correlations["plots"]
        else:
            plot_info = None

        try:
            self.cate_outfile = self.cache_directory / Path(
                self.categories["categories_output_file"]
            )
        except TypeError:
            _logger.debug("categories not defined")
        else:
            self.cate_pkl_file = self.cate_outfile.with_suffix(".pkl")

        try:
            self.corr_outfile = self.cache_directory / Path(
                plot_info["correlation"]["output_file"]
            )
        except TypeError:
            _logger.debug("correlations not defined")
        else:
            self.corr_pkl_file = self.corr_outfile.with_suffix(".pkl")
            self.score_outfile = self.cache_directory / Path(
                plot_info["scores_per_interval"]["output_file"]
            )
            self.score_pkl_file = self.score_outfile.with_suffix(".pkl")

        if reset is None:
            self.reset = None
        else:
            self.reset = int(reset)
        self.weight_key = weights

        self.dataframe = None
        self.score_df = None
        self.all_stats_per_format = dict()
        self.all_hist_per_format = dict()
        self.all_plots = None

        have_cache = self.check_if_cache_exist(mode)

        if (self.reset is not None and self.reset <= 1) or not have_cache:
            # only read the micro data when we do not have pickle files of the
            # statistical output. If we only want to make plots, it is faster to read
            # the calculated tables from cache
            self.read_data()

        if write_dataframe_to_sqlite:
            if self.dataframe is not None:
                self.write_data()
                sys.exit(0)
            else:
                msg = (
                    "The write_data_frame option only works if you force to read the micro "
                    "data with --reset 0 or --reset 1"
                )
                _logger.warning(msg)

        if mode in ("all", "statistics"):
            self.calculate_statistics()
            if statistics_to_xls or reset is not None or not have_cache:
                self.write_statistics()
        if mode in ("all", "correlations") and self.dataframe is not None:
            self.calculate_correlations_and_scores()
        if mode in ("all", "categories") and self.dataframe is not None:
            self.calculate_categories()

    @staticmethod
    def _cache_available(cache_file) -> bool:
        """Return True when *cache_file* is not configured (None) or exists on disk."""
        return cache_file is None or cache_file.exists()

    @staticmethod
    def _write_frame_to_sqlite(dataframe, file_name, table_name):
        """Write *dataframe* to table *table_name* of sqlite file *file_name* and close it."""
        connection = sqlite3.connect(str(file_name))
        try:
            dataframe.to_sql(name=table_name, con=connection, if_exists="replace")
            connection.commit()
        finally:
            # the sqlite3 context manager only commits; closing has to be explicit
            connection.close()

    @classmethod
    def _unique_sheet_name(cls, sheet_name, used_sheet_names):
        """Truncate *sheet_name* to the Excel limit and make it differ from the used names."""
        sheet_name = sheet_name[: cls.MAX_SHEET_NAME_LENGTH]
        if sheet_name not in used_sheet_names:
            return sheet_name
        stem = sheet_name[: cls.MAX_SHEET_NAME_LENGTH - 2]
        counter = 0
        candidate = stem + "{:02d}".format(counter)
        while candidate in used_sheet_names:
            counter += 1
            candidate = stem + "{:02d}".format(counter)
        return candidate

    def check_if_cache_exist(self, mode: str):
        """Check if all cache files needed for *mode* are available.

        A cache file of a section which is not configured (its path is None) is not
        required, as nothing is calculated for that section.

        Args:
            mode: one of ``all``, ``statistics``, ``correlations``, ``categories``

        Returns:
            True if no required cache file is missing
        """
        cache_exists = True
        if mode in ("all", "statistics"):
            cache_exists = self.cache_file.exists()
        if mode in ("all", "correlations"):
            cache_exists = cache_exists and self._cache_available(self.corr_pkl_file)
            cache_exists = cache_exists and self._cache_available(self.score_pkl_file)
        if mode in ("all", "categories"):
            cache_exists = cache_exists and self._cache_available(self.cate_pkl_file)
        return cache_exists

    def variable_dict2df(self, variables, module_info: dict = None) -> DataFrame:
        """Convert the dictionary with variable info to a data frame.

        Args:
            variables: dict with variable info
            module_info: dict with module information

        Returns:
            dataframe indexed on the variable name, with a module column and a
            properties column, processed by impose_variable_defaults
        """
        var_df = pd.DataFrame.from_dict(variables).unstack().dropna()
        var_df = var_df.reset_index()
        var_df = var_df.rename(
            columns={
                "level_0": self.module_key,
                "level_1": self.variable_key,
                0: "properties",
            }
        )
        var_df.set_index(self.variable_key, drop=True, inplace=True)

        var_df = impose_variable_defaults(
            var_df, module_info=module_info, module_key=self.module_key
        )
        return var_df

    def write_statistics(self):
        """Write the statistics of all breakdowns to sqlite, Excel and plot cache files.

        Per breakdown the dataframe returned by prepare_stat_data_for_write is pickled
        to the plot cache file and written to its own sheet of the Excel file.
        """
        _logger.info(f"Writing statistics {self.output_file}")
        excel_file = Path(self.output_file).with_suffix(".xlsx")
        sheets = list()

        connection = sqlite3.connect(self.output_file)
        try:
            with pd.ExcelWriter(str(excel_file), engine="openpyxl") as writer:
                _logger.info(f"Start writing standard output to {excel_file}")

                for file_base, all_stats in self.all_stats_per_format.items():
                    stat_df = prepare_stat_data_for_write(
                        file_base=file_base,
                        all_stats=all_stats,
                        variables=self.variables,
                        variable_key=self.variable_key,
                        module_key=self.module_key,
                        breakdown_labels=self.breakdown_labels,
                        n_digits=self.n_digits,
                        connection=connection,
                    )

                    cache_file = make_plot_cache_file_name(
                        cache_directory=self.cache_directory,
                        prefix=self.scan_data_key,
                        file_base=file_base,
                    )
                    _logger.info(f"Writing cache for stat {cache_file}")
                    with open(cache_file, "wb") as stream:
                        pickle.dump(stat_df, stream)

                    sheet_name = file_base
                    if self.sheet_renames is not None:
                        for sheet_rename in self.sheet_renames.values():
                            pat = sheet_rename["pattern"]
                            rep = sheet_rename["replace"]
                            sheet_name = re.sub(pat, rep, sheet_name)
                    sheet_name = self._unique_sheet_name(sheet_name, sheets)
                    # register the name itself, so that duplicates are detected
                    sheets.append(sheet_name)
                    stat_df.to_excel(excel_writer=writer, sheet_name=sheet_name)
            connection.commit()
        finally:
            connection.close()

    def calculate_statistics_one_breakdown(self, group_by):
        """Calculate the statistics of all variables for one breakdown.

        Args:
            group_by: list of column names to group by

        Returns:
            tuple (all_stats, all_hist) of dicts with the variable name as key, or
            (None, None) if the breakdown columns do not exist
        """
        index_names = group_by + [self.be_id]
        try:
            dataframe = prepare_df_for_statistics(
                self.dataframe, index_names=index_names, units_key="units"
            )
        except KeyError:
            _logger.info(f"Breakdown on {index_names} does not exist")
            return None, None

        all_stats = dict()
        all_hist = dict()

        for var_key, var_prop in self.variables.iterrows():
            _logger.debug(f"{var_key}")
            var_prop_klass = VariableProperties(
                variables=self.variables, column=var_key
            )

            column = var_key
            column_list = list([var_key])
            # the module column is named after module_key in variable_dict2df
            var_module = var_prop[self.module_key]
            try:
                module = self.module_info[var_module]
            except KeyError as err:
                _logger.warning(err)
                continue

            if not module.get("include", True):
                continue

            var_type = var_prop["type"]
            var_filter = var_prop["filter"]
            var_weight_key = var_prop["gewicht"]
            schaal_factor_key = "_".join(["ratio", var_weight_key])
            units_schaal_factor_key = "_".join(["ratio", "units"])
            # remove duplicated column names but keep a deterministic column order
            weight_cols = list(
                dict.fromkeys(
                    [var_weight_key, schaal_factor_key, units_schaal_factor_key]
                )
            )
            df_weights = dataframe.loc[:, weight_cols]

            try:
                data, column_list = get_records_select(
                    dataframe=dataframe,
                    variables=self.variables,
                    var_type=var_type,
                    column=column,
                    column_list=column_list,
                    output_format="statline",
                    var_filter=var_filter,
                )
            except KeyError:
                _logger.info(f"Failed to get selection of {column}. Skipping")
                continue

            if data is None:
                _logger.info(f"Could not get data selection for {var_key}. Skipping")
                continue

            if data.index.size < df_weights.index.size:
                _logger.info(
                    f"we filtered data, reducing from {df_weights.index.size} to {data.index.size}"
                )
                df_weights = df_weights.reindex(data.index)

            stats = WeightedSampleStatistics(
                group_keys=group_by,
                records_df_selection=data,
                weights_df=df_weights,
                column_list=column_list,
                var_type=var_type,
                var_weight_key=var_weight_key,
                scaling_factor_key=schaal_factor_key,
                units_scaling_factor_key=units_schaal_factor_key,
                report_numbers=var_prop_klass.report_number,
            )
            stats.calculate()

            if (
                not np.isnan(var_prop_klass.report_number)
                and var_prop_klass.report_number
            ):
                all_stats[column] = stats.records_sum
            else:
                all_stats[column] = stats.records_weighted_mean_agg

            # add the histogram of the data here.
            # NOTE(review): calculate_histogram_per_breakdown is not among the visible
            # imports; presumably it is defined elsewhere in this module.
            all_hist[var_key] = calculate_histogram_per_breakdown(
                data, var_key=var_key, df_weights=df_weights, n_bins=self.n_bins
            )

        return all_stats, all_hist

    def get_correct_categories_count(self):
        """Count per record how many categories are correct.

        Returns:
            tuple (data_df, count): a boolean dataframe with one column per category
            variable, and a series named ``count`` with the number of True values per
            record
        """
        col_sel = [
            cat_prop["variable"]
            for cat_prop in self.categories["index_categories"].values()
        ]
        _logger.debug(f"make selection\n{col_sel}")

        data_df: pd.DataFrame = self.dataframe[col_sel]
        # only 1 is considered a success
        data_df = data_df == 1

        count = data_df.sum(axis=1)
        count = count.rename("count")
        return data_df, count

    def calculate_categories(self):
        """Calculate the score distribution per number of correct categories.

        The result is written to the categories pickle file and the sum per category
        to a second pickle file with ``_sum`` appended to the name. Nothing is done if
        the categories are not configured, or if the cache exists and reset is None.

        Raises:
            ValueError: if the micro data has not been read
        """
        if self.cate_pkl_file is None:
            _logger.warning("Categories are not configured. Skipping categories")
            return

        if self.cate_pkl_file.exists() and self.reset is None:
            _logger.info(
                f"Cache {self.cate_pkl_file} and already exist. "
                f"Skip calculation categories and go to plot"
            )
            return

        if self.dataframe is None:
            msg = "For categories you need the microdata. Run with --reset 1"
            raise ValueError(msg)

        _logger.info("Calculating categories")
        score_df = self.dataframe["percentage"].copy()
        score_df = score_df.rename("score")
        weights = self.dataframe[self.weight_key].copy()

        data_df, count = self.get_correct_categories_count()
        tot = pd.concat([score_df, count], axis=1)

        conditional_scores = list()
        sum_per_number_of_cat = list()
        total_sum = 0
        # NOTE(review): the fixed range 0..4 presumably assumes four index categories
        for number_of_cat in range(0, 5):
            mask = tot["count"] == number_of_cat
            tot_cond = tot.loc[mask, "score"]
            sel_df = data_df[mask]
            sum_per_number_of_cat.append(sel_df.sum(axis=0))
            ww = weights[mask].to_numpy()
            hist, bin_edge = np.histogram(
                tot_cond.to_numpy(),
                weights=ww,
                density=False,
                range=(0, 100),
                bins=self.n_bins,
            )
            total_sum += hist.sum()
            conditional_scores.append(hist)

        sum_per_number_of_cat_df = pd.DataFrame.from_records(sum_per_number_of_cat)

        bin_width = bin_edge[1] - bin_edge[0]
        conditional_scores_df = pd.DataFrame().from_records(conditional_scores)
        conditional_scores_df.index = conditional_scores_df.index.rename("n_categories")
        conditional_scores_df = conditional_scores_df.T
        conditional_scores_df.index = bin_edge[:-1]
        # normalise such that the sum over all bins times the bin width equals one
        conditional_scores_df /= total_sum * bin_width
        check_sum = conditional_scores_df.sum().sum() * bin_width
        _logger.debug(f"sum {check_sum}")

        _logger.info(f"Writing to {self.cate_pkl_file}")
        conditional_scores_df.to_pickle(self.cate_pkl_file)
        sum_file = self.cate_pkl_file.parent / Path(
            self.cate_pkl_file.stem + "_sum.pkl"
        )
        _logger.info(f"Writing to {sum_file}")
        sum_per_number_of_cat_df.to_pickle(sum_file)

    def calculate_correlations_and_scores(self):
        """Calculate the correlations between the variables and the scores per category.

        The correlation matrix is written to sqlite and pickle, the scores to pickle.
        Nothing is done if the correlations are not configured, or if both pickle
        files exist and reset is None.

        Raises:
            ValueError: if the micro data has not been read
        """
        if self.corr_pkl_file is None or self.score_pkl_file is None:
            _logger.warning("Correlations are not configured. Skipping correlations")
            return

        if (
            self.corr_pkl_file.exists()
            and self.score_pkl_file.exists()
            and self.reset is None
        ):
            _logger.info(
                f"Cache {self.corr_pkl_file} and {self.score_pkl_file} already exist. "
                f"Skip calculation and go to plot"
            )
            return

        if self.dataframe is None:
            msg = "For correlations you need the microdata. Run with --reset 1"
            raise ValueError(msg)

        _, count = self.get_correct_categories_count()

        index_columns = self.correlations["index_correlations"]

        _logger.info("Calculating correlations")
        col_sel = list(index_columns.keys())
        _logger.debug(f"make selection\n{col_sel}")

        data_df: pd.DataFrame = self.dataframe[col_sel]
        # only 1 is considered a success
        data_df = data_df == 1

        # collect the variables per category
        categories = dict()
        for col_name, categorie in index_columns.items():
            categories.setdefault(categorie, []).append(col_name)

        # calculate the score per category and compare with the internet.nl score
        self.score_df = self.dataframe[["percentage"]].copy() / 100
        self.score_df.rename(columns={"percentage": "score"}, inplace=True)
        for categorie, columns in categories.items():
            selection = data_df[columns]
            max_score = len(columns)
            self.score_df[categorie] = selection.sum(axis=1) / max_score

        self.score_df = pd.concat([self.score_df, count], axis=1)

        desc = data_df.describe()
        _logger.debug(f"making descr\n{desc}")

        # the correlation is calculated twice: the first one only yields the column
        # order (sorted on the summed correlation) used for the second one
        corr = data_df.corr()
        ordered_index = corr.sum().sort_values(ascending=False).index
        data_df = data_df[ordered_index]
        corr = data_df.corr()
        self.correlation_coefficient_df = corr

        _logger.info(f"Schrijf naar {self.corr_outfile}")
        self._write_frame_to_sqlite(corr, self.corr_outfile, "correlations")

        _logger.info(f"Schrijf naar {self.corr_pkl_file}")
        corr.to_pickle(self.corr_pkl_file.as_posix())
        _logger.info(f"Schrijf naar {self.score_pkl_file}")
        self.score_df.to_pickle(self.score_pkl_file.as_posix())

        _logger.debug(f"making corrected\n{corr}")

    def calculate_statistics(self):
        """Fill ``all_stats_per_format`` and ``all_hist_per_format`` for all breakdowns.

        Per breakdown the statistics are read from the pickle cache when it exists and
        reset is None, and otherwise calculated from the micro data and cached. A
        breakdown with a ``combination`` entry is the concatenation of the statistics
        of the earlier breakdowns it lists.

        Raises:
            KeyError: if a combination refers to a breakdown which is not available
        """
        _logger.info("Calculating statistics")

        self.all_stats_per_format = dict()
        self.all_hist_per_format = dict()
        missing_groups = None

        for file_base, props in self.statistics.items():
            scan_data = props.get("scan_data", self.scan_data_key)
            if scan_data != self.scan_data_key:
                _logger.debug(f"SKipping {scan_data} for {self.scan_data_key}")
                continue

            if not props.get("do_it", True):
                _logger.debug(
                    f"SKipping breakdown {file_base} for {self.scan_data_key}"
                )
                continue

            _logger.info(f"Processing {file_base}")

            file_name = Path("_".join([file_base, self.scan_data_key]) + ".pkl")
            cache_file = self.cache_directory / file_name

            group_by = list(props["groupby"].values())
            group_by_original = None

            # fall back on the alternative breakdown if a group column is missing
            if (
                group_by_if_not_exist := props.get("groupby_if_not_exist")
            ) and self.dataframe is not None:
                have_missing_groups = any(
                    group not in self.dataframe.columns for group in group_by
                )
                if have_missing_groups:
                    group_by_original = group_by
                    group_by = list(group_by_if_not_exist.values())
                    missing_groups = props.get("missing_groups")

            combination: list = props.get("combination")

            if combination is None:
                if cache_file.exists() and self.reset is None:
                    _logger.info(f"Reading stats from cache {cache_file}")
                    with open(str(cache_file), "rb") as stream:
                        stat_df, all_hist = pickle.load(stream)
                elif self.dataframe is not None:
                    _logger.info("Calculating statistics from micro data")
                    all_stats, all_hist = self.calculate_statistics_one_breakdown(
                        group_by=group_by
                    )
                    # check for a failed breakdown before passing it on
                    if all_stats is None:
                        _logger.info(
                            f"Could not calculate statistics for breakdown {group_by}. Skip"
                        )
                        continue
                    if group_by_original is not None:
                        all_stats = add_missing_groups(
                            all_stats, group_by, group_by_original, missing_groups
                        )
                    # turn it into a pandas data frame
                    stat_df = pd.concat(list(all_stats.values()), axis=1, sort=False)
                    is_nan = stat_df.index == "nan"
                    stat_df = stat_df.loc[~is_nan]
                    _logger.info(f"Writing stats to cache {cache_file}")
                    with open(str(cache_file), "wb") as stream:
                        pickle.dump([stat_df, all_hist], stream)
                else:
                    _logger.info(f"Statistics not available for {group_by}. Skipping")
                    continue
            else:
                stats = list()
                for file_com in combination:
                    try:
                        prev_stats = self.all_stats_per_format[file_com]
                    except KeyError as err:
                        raise KeyError(
                            f"Trying to add to combination {file_com}, but does not exists"
                        ) from err
                    else:
                        stats.append(prev_stats)
                stat_df = pd.concat(stats, axis=0, sort=False)
                # NOTE(review): all_hist is not recalculated for a combination; the
                # value left behind by the previously processed breakdown is stored.

            self.all_stats_per_format[file_base] = stat_df
            self.all_hist_per_format[file_base] = all_hist
        _logger.debug("Done with statistics")

    def write_data(self):
        """Write the combined data frame to a sqlite file next to the pickle cache.

        Columns whose names only differ in case are reduced to one column first; the
        last occurrence is kept.
        """
        columns = list(self.dataframe.columns)
        lowered = [col.lower() for col in columns]
        last_position = {col_lower: pos for pos, col_lower in enumerate(lowered)}
        keep = [last_position[col_lower] == pos for pos, col_lower in enumerate(lowered)]
        if not all(keep):
            for col, keep_column in zip(columns, keep):
                if not keep_column:
                    _logger.info(f"Dropping duplicated column {col}")
            # select on position, so that identical labels are handled as well
            self.dataframe = self.dataframe.loc[:, keep]

        output_file_name = self.cache_file.with_suffix(".sqlite")
        _logger.info(f"Writing dataframe to {output_file_name}")
        self._write_frame_to_sqlite(self.dataframe, output_file_name, "dataframe")

    def read_data(self):
        """Read the micro data into ``dataframe``.

        When the pickle cache does not exist or reset equals 0, the records and the
        internet.nl tables are read from sqlite, cleaned, merged on the url and
        written to the pickle cache. Otherwise the pickle cache is read.
        """
        if not self.cache_file.exists() or self.reset == 0:
            index_name = self.be_id
            _logger.info(f"Reading table data from {self.records_cache_info.file_name}")
            records = read_tables_from_sqlite(
                self.records_cache_info.file_name,
                self.records_cache_info.table_names,
                index_name,
            )

            table_names = ["report", "scoring", "status", "results"]
            index_name = "index"
            _logger.info(
                f"Reading tables {table_names} from {self.internet_nl_filename}"
            )
            tables = read_tables_from_sqlite(
                self.internet_nl_filename, table_names, index_name
            )
            _logger.info(f"Done")
            tables.reset_index(inplace=True)
            tables.rename(columns=dict(index=self.url_key), inplace=True)

            # some urls did not give any results, drop the empty lines. Note that
            # the first column is left out as that is the url name
            tables.dropna(axis=0, how="all", subset=tables.columns[1:], inplace=True)

            # split the tables in a numerical and a non-numerical part and fill with
            # either NA (for numerical) or with 'nan' for strings.
            original_columns = tables.columns
            tables_num = tables.select_dtypes(include="number")
            number_columns = tables_num.columns
            non_number_columns = [
                col for col in original_columns if col not in number_columns
            ]
            tables_non_num = tables[non_number_columns]
            tables_num = tables_num.fillna(pd.NA)
            tables_non_num = tables_non_num.fillna("nan")
            # put back to one data frame again and restore the original column order
            tables = pd.concat([tables_num, tables_non_num], axis=1)
            tables = tables[original_columns]

            if self.translations is not None:
                # only add the translation of the nan strings if we have translations
                self.translations["nans"] = dict(nan=0)
                tables = fill_booleans(
                    tables,
                    translations=self.translations.copy(),
                    variables=self.variables,
                )

            rename_all_variables(tables, self.variables)

            for column in tables:
                try:
                    var_props = self.variables.loc[column, :]
                except KeyError:
                    _logger.debug(f"Column {column} not defined in settings. Skipping")
                    continue

                var_type = var_props.get("type")
                var_translate = var_props.get("translateopts")

                if var_translate is not None:
                    # this way we can apply the translation {Nee: 0, Ja: 1} to the
                    # column values, so that all Nee become 0 and all Ja become 1.
                    # NOTE(security): yaml.Loader can construct arbitrary objects; the
                    # string is taken from the variables settings.
                    trans = yaml.load(str(var_translate), Loader=yaml.Loader)
                    for nan_string in ("na", "nan", "NaN"):
                        if nan_string in trans.keys():
                            # we have added an 'na' option for the translations. Take
                            # care of it
                            is_na = tables[column].isna()
                            if is_na.any():
                                # should not happen anymore because of the fillna above
                                _logger.info(
                                    f"Filling {nan_string} with na in {column}"
                                )
                                tables[column] = tables[column].fillna(nan_string)
                    unique_values = set(tables[column].unique())
                    vals_to_translate = set(trans.keys()).intersection(unique_values)
                    missing_values = unique_values.difference(set(trans.keys()))
                    if vals_to_translate:
                        if missing_values:
                            _logger.warning(
                                f"Column {column} misses the translations for "
                                f"{missing_values}. Please update your settings"
                            )
                        _logger.debug(f"Convert for {column} trans keys {trans}")
                        tables[column] = tables[column].map(trans)
                    else:
                        _logger.debug(f"No Convert for {column} trans keys {trans}")

                if var_type == "dict":
                    tables[column] = tables[column].astype("category")
                elif var_type in ("bool", "percentage", "float"):
                    tables[column] = tables[column].astype("float64")

            # Here the url name is cleaned. Store the original url first
            original_url = "_".join([self.url_key, "original"])
            records = pd.concat(
                [records, records[self.url_key].rename(original_url)], axis=1
            )
            tables = pd.concat(
                [tables, tables[self.url_key].rename(original_url)], axis=1
            )

            _logger.info("Start cleaning urls...")
            if _logger.getEffectiveLevel() > logging.DEBUG:
                show_progress = True
            else:
                show_progress = False

            clean_url_cache = self.cache_directory / Path("clean_url_cache.pkl")
            if clean_url_cache.exists():
                _logger.info(f"Reading clean urls from cache {clean_url_cache}")
                with open(clean_url_cache, "rb") as stream:
                    all_clean_urls, all_suffix = pickle.load(stream)
            else:
                all_clean_urls, all_suffix = get_all_clean_urls(
                    urls=records[self.url_key],
                    show_progress=show_progress,
                    cache_directory=self.tld_extract_cache_directory,
                )
                _logger.info(f"Writing clean urls to cache {clean_url_cache}")
                with open(clean_url_cache, "wb") as stream:
                    pickle.dump([all_clean_urls, all_suffix], stream)
            _logger.info("Done!")

            records[self.url_key] = all_clean_urls
            suffix_df = pd.DataFrame(
                index=records.index, data=all_suffix, columns=[self.suffix_key]
            )
            suffix_df_org = suffix_df.rename(
                columns={self.suffix_key: self.suffix_key + "_org"}
            )
            records = pd.concat([records, suffix_df, suffix_df_org], axis=1)
            records.dropna(subset=[self.url_key], axis=0, inplace=True)
            records.reset_index(inplace=True)

            duplicated = tables[self.url_key].duplicated(keep="first")
            tables = tables[~duplicated]
            # no inplace operations here, as tables is a selection of a data frame
            tables = tables.dropna(subset=[self.url_key], axis=0)
            tables = tables.dropna(how="all", axis=1)

            # add the derived columns
            tables = add_derived_variables(tables, self.variables)

            # Do a left join because several be's can have the same url. This is the
            # case for holdings anyhow. Then the score of the holdings has to be
            # included several times as well
            self.dataframe = pd.merge(
                left=records, right=tables, on=self.url_key, how="left"
            )
            self.dataframe.dropna(
                subset=[self.weight_key], axis="index", how="any", inplace=True
            )
            try:
                has_url = self.dataframe["url"].notnull()
            except KeyError as err:
                _logger.warning(err)
            else:
                self.dataframe = self.dataframe[has_url]

            mask = self.dataframe[self.be_id].duplicated()
            self.dataframe = self.dataframe[~mask]
            self.dataframe.set_index(self.be_id, inplace=True, drop=True)

            self.dataframe = clean_all_suffix(
                dataframe=self.dataframe,
                suffix_key=self.suffix_key,
                variables=self.variables,
            )

            _logger.info(
                f"Writing {self.dataframe.index.size} records to "
                f"cache {self.cache_file.absolute()}"
            )
            with open(str(self.cache_file), "wb") as stream:
                self.dataframe.to_pickle(stream)
        else:
            _logger.debug(f"Reading tables from cache {self.cache_file}")
            with open(str(self.cache_file), "rb") as stream:
                self.dataframe = pd.read_pickle(stream)
            _logger.info(
                f"Read {self.dataframe.index.size} records from "
                f"cache {self.cache_file.absolute()}"
            )

        if self.dump_cache_as_sqlite:
            sqlite_cache = self.cache_file.with_suffix(".sqlite")
            dump_data_frame_as_sqlite(dataframe=self.dataframe, file_name=sqlite_cache)
class DomainPlotter:
    def __init__(
        self,
        scan_data,
        scan_data_key=None,
        default_scan=None,
        plot_info=None,
        show_plots=False,
        barh=False,
        image_directory=None,
        cache_directory=None,
        image_type="pdf",
        max_plots=None,
        tex_prepend_path=None,
        statistics=None,
        variables=None,
        cdf_plot=False,
        bar_plot=False,
        cor_plot=False,
        add_logo=True,
        cumulative=False,
        show_title=False,
        breakdown_labels=None,
        translations: dict = None,
        export_highcharts=False,
        highcharts_directory=None,
        correlations=None,
        tex_horizontal_shift=None,
        bovenschrift=True,
        variables_to_plot=None,
        exclude_variables=None,
        force_plots=False,
        latex_files=False,
        years_to_add_to_plot_legend=None,
        module_info=None,
        english=False,
    ):
        """Store the plot settings, make all plots and update the image info cache.

        The constructor does all the work: it reads the image info cache, calls
        make_plots, reorders the image info to follow ``variables`` and writes the
        cache back. When *latex_files* is set, make_latex_overview is called as well.

        Most arguments are stored as attributes of the same name. The following
        arguments are handled differently in this constructor:

        Args:
            cache_directory: converted to a Path, so it has to be given
            highcharts_directory: converted to a Path; ``.`` is used for None
            cor_plot: not stored by this constructor
            add_logo: only passed on to make_plots
            latex_files: if True, make_latex_overview is called after plotting
            tex_horizontal_shift: only passed on to make_latex_overview
            bovenschrift: only passed on to make_latex_overview
            module_info: only passed on to make_latex_overview
        """
        self.english = english
        self.scan_data = scan_data
        self.scan_data_key = scan_data_key
        self.default_scan = default_scan
        self.plot_info = plot_info
        self.show_plots = show_plots
        self.barh = barh
        self.max_plots = max_plots
        self.tex_prepend_path = tex_prepend_path
        self.cache_directory = Path(cache_directory)
        self.statistics = statistics
        self.variables = variables
        self.bar_plot = bar_plot
        self.cdf_plot = cdf_plot
        self.cumulative = cumulative
        self.show_title = show_title
        self.translations = translations
        self.correlations = correlations
        self.export_highcharts = export_highcharts
        self.force_plots = force_plots
        self.years_to_add_to_plot_legend = years_to_add_to_plot_legend
        if highcharts_directory is None:
            self.highcharts_directory = Path(".")
        else:
            self.highcharts_directory = Path(highcharts_directory)
        self.variables_to_plot = variables_to_plot
        self.exclude_variables = exclude_variables

        self.image_type = image_type
        self.image_directory = image_directory
        self.breakdown_labels = breakdown_labels

        # the image info cache is read before plotting and written back afterwards
        self.image_info = ImageFileInfo(
            scan_data_key=scan_data_key, cache_directory=self.cache_directory
        )
        self.image_info.read_cache()

        self.make_plots(add_logo=add_logo)

        # put the image entries in the order of the variables before writing them
        self.image_info.fix_order(self.variables)
        self.image_info.write_cache()

        if latex_files:
            _logger.debug(f"making latex with bovenschrift={bovenschrift}")
            make_latex_overview(
                image_info=self.image_info,
                variables=self.variables,
                image_directory=self.image_directory,
                cache_directory=self.cache_directory,
                image_files=Path("image_files"),
                tex_prepend_path=self.tex_prepend_path,
                tex_horizontal_shift=tex_horizontal_shift,
                bovenschrift=bovenschrift,
                module_info=module_info,
            )
[docs] def get_plot_cache(self, scan_data_key, plot_key, year_key): year_label = f"{year_key}" cache_directory = "_".join([self.cache_directory.as_posix(), year_label]) cache_file = make_plot_cache_file_name( cache_directory=Path(cache_directory), prefix=scan_data_key, file_base=plot_key, ) _logger.debug(f"Reading {cache_file}") try: with open(cache_file, "rb") as stream: stats_df_per_year = pickle.load(stream) except FileNotFoundError as err: if self.scan_data[scan_data_key][year_key].get("data_file") is None: _logger.debug("We are skipping this year as the data is not available.") else: # we missen de pkl file terwijl we wel een data file hebben. Genereer de foutmelding _logger.warning(err) _logger.warning("Run script with option '--statistics_to_xls' first") stats_df_per_year = None return stats_df_per_year
[docs] def make_plots(self, add_logo=True): _logger.info("Making the plot") legend_translates = dict() for plot_key, plot_prop in self.plot_info.items(): if not plot_prop.get("do_it", True): _logger.debug(f"Skipping plot {plot_key}") continue _logger.debug(f"Plotting plot {plot_key}") label = plot_prop.get("label", plot_key) figsize = plot_prop.get("figsize") highcharts_height = plot_prop.get("highcharts_height") stat_prop = self.statistics[plot_key] scan_data_key = stat_prop.get("scan_data", self.scan_data_key) scan_data_per_year = self.scan_data[scan_data_key] last_year = list(scan_data_per_year.keys())[-1] scan_data_analyses = scan_data_per_year[last_year]["analyses"] variables = scan_data_analyses.variables try: report_number_empty = variables["report_number"].isna() except KeyError: report_number_empty = True variables["report_number"] = False else: variables.loc[report_number_empty, "report_number"] = False module_info = scan_data_analyses.module_info stats_df_per_year = {} last_year = None df_index_names = None for year, scan_info in scan_data_per_year.items(): df = self.get_plot_cache( scan_data_key=scan_data_key, plot_key=plot_key, year_key=year ) year_label = scan_info.get("label", year) if df is not None: stats_df_per_year[year_label] = df last_year = year df_index_names = list(df.index.names) if not self.english: jaar_level_name = "Jaar" else: jaar_level_name = "Year" index_names = [jaar_level_name] + df_index_names new_index_names = df_index_names + [jaar_level_name] module_level_name = new_index_names[0] question_level_name = new_index_names[1] stats_df = pd.concat(stats_df_per_year, names=index_names) # zet module vraag optie jaar als volgorde. 
stats_df = stats_df.reorder_levels(new_index_names) highcharts_title = plot_prop.get("title") export_svg_cdf = False export_svg_bar = False export_highcharts_cdf = self.export_highcharts export_highcharts_bar = self.export_highcharts highcharts_directory_cdf = None highcharts_directory_bar = None cdf_variables = {} if self.cdf_plot: plot_cdf = plot_prop.get("cdf_plot") if isinstance(plot_cdf, dict): cdf_variables = plot_cdf["variables"][scan_data_key] highcharts_title = cdf_variables.get("title") export_svg_cdf = False if plot_cdf: if cdf_fig_size := plot_cdf.get("figsize"): figsize = cdf_fig_size tex_horizontal_shift = None if self.bar_plot: plot_bar = plot_prop.get("bar_plot") highcharts_directory_bar = self.highcharts_directory if isinstance(plot_bar, dict): if hc_sub_dir := plot_bar.get("highcharts_output_directory"): highcharts_directory_bar = highcharts_directory_bar / Path( hc_sub_dir ) export_svg_bar = plot_bar.get("export_svg", False) export_hc_bar = plot_bar.get("export_highcharts") tex_horizontal_shift = get_windows_or_linux_value( plot_bar.get("tex_horizontal_shift") ) plot_bar = plot_bar.get("apply", True) if export_hc_bar is not None: export_highcharts_cdf = export_hc_bar else: plot_bar = False y_max_pdf_plot = plot_prop.get("y_max_pdf_plot", 10) y_spacing_pdf_plot = plot_prop.get("y_spacing_pdf_plot", 5) y_max_bar_plot = plot_prop.get("y_max_bar_plot") legend_position = plot_prop.get("legend_position") legend_max_columns = plot_prop.get("legend_max_columns") y_spacing_bar_plot = plot_prop.get("y_spacing_bar_plot") bar_width = plot_prop.get("bar_width") box_margin = plot_prop.get("box_margin") sort_values = plot_prop.get("sort_values", False) subplot_adjust = plot_prop.get("subplot_adjust") reference_lines = plot_prop.get("reference_lines") if reference_lines is not None: for ref_key, ref_prop in reference_lines.items(): stat_prop = self.statistics[ref_key] scan_data_key = stat_prop.get("scan_data", self.default_scan) ref_stat = self.get_plot_cache( 
scan_data_key=scan_data_key, plot_key=plot_key, year_key=last_year, ) reference_lines[ref_key]["data"] = ref_stat if plot_prop.get("use_breakdown_keys", False): breakdown = self.breakdown_labels[plot_key] renames = {v: k for k, v in breakdown.items()} stats_df.rename(columns=renames, inplace=True) _logger.info(f"Plotting {plot_key}") plot_count = 0 stop_plotting = False if stats_df is not None: for module_name, module_df in stats_df.groupby( level=module_level_name, sort=False ): do_this_module = True for mod_key, mod_prop in module_info.items(): if mod_prop.get("label") == module_name and not mod_prop.get( "include", True ): do_this_module = False if not do_this_module: continue _logger.info(f"Module {module_name}") if stop_plotting: break for question_name, question_df in module_df.groupby( level=question_level_name, sort=False ): _logger.debug(f"Question {question_name}") # voorlaatste kolom bevat de variabele namen variable_name_key = question_df.index.names[-2] plot_variable = question_df.index.get_level_values( variable_name_key ).values[0] original_name = re.sub(r"_\d\.0$", "", plot_variable) question_type = variables.loc[original_name, "type"] unit = variables.loc[original_name, "unit"] keep_options = variables.loc[original_name, "keep_options"] section = variables.loc[original_name, "section"] question_df_clean = question_df.droplevel(variable_name_key) # variables_to_plot wordt als een list van een list in een tuple meegegeven # dus ([[variable1], [variables2]). Haal eerst level 0 eruit om te tuple # te verwijderen. Als variable_to_plot niet gegeven is dan is deze waarde # None, en slaan we het over. Als hij wel gegeven is dan zetten we de list # van lists om in een platte list if self.variables_to_plot is not None: var_to_plot_clean = [ vv[0] for vv in self.variables_to_plot if vv is not None ] if original_name not in var_to_plot_clean: _logger.debug( f"{original_name} not in variables to plot {self.variables_to_plot}. " f"Skipping..." 
) continue if self.exclude_variables is not None: exclude_vars_clean = [ vv[0] for vv in self.exclude_variables if vv is not None ] if original_name in exclude_vars_clean: _logger.debug( f"{original_name} in exclude variables {self.exclude_variables}. " f"Skipping..." ) continue plot_info = PlotInfo( variables_df=variables, var_name=original_name, breakdown_name=plot_key, ) export_highcharts = export_highcharts_bar if cdf_prop := cdf_variables.get(original_name): highcharts_directory_cdf = self.highcharts_directory if highcharts_info_per_year := cdf_prop.get( "highcharts_info_per_year" ): for ( hc_year_key, hc_year_prop, ) in highcharts_info_per_year.items(): hc_dir = highcharts_directory_cdf / Path( hc_year_prop["highcharts_directory"] ) hc_lab = hc_year_prop.get("highcharts_label") highcharts_info_per_year[hc_year_key] = dict( highcharts_directory=hc_dir, highcharts_label=hc_lab, ) export_svg_cdf = cdf_prop.get("export_svg", False) export_hc_cdf = cdf_prop.get("export_highcharts") plot_cdf = cdf_prop.get("apply", True) if export_hc_cdf is not None: export_highcharts_cdf = export_hc_cdf else: plot_cdf = False highcharts_info_per_year = None if plot_info.directory is not None: # we overschrijven hier de subdir die onder de statistiek opgegeven is highcharts_directory = ( self.highcharts_directory / plot_info.directory ) else: if plot_bar: highcharts_directory = highcharts_directory_bar else: highcharts_directory = highcharts_directory_cdf if plot_info.label is not None: title = plot_info.label else: title = highcharts_title if title is not None: title = re.sub("\s{2,}", " ", title) if plot_info.y_max is not None: y_max = plot_info.y_max else: y_max = y_max_bar_plot if plot_info.legend_position is not None: legend_pos = plot_info.legend_position else: legend_pos = legend_position if plot_info.y_spacing is not None: y_spacing = plot_info.y_spacing else: y_spacing = y_spacing_bar_plot if plot_info.bar_width is not None: bar_width = plot_info.bar_width else: bar_width 
= bar_width if keep_options: # als keep options gegeven is dan houden we alle opties valide_opties = variables.loc[ original_name, "options" ].values() mask = get_option_mask( question_df=question_df_clean, variables=variables, question_type=question_type, valid_options=valide_opties, ) plot_df = module_df.loc[ (module_name, question_name, mask) ].copy() else: # neem de default die we als true bestempelen valide_opties = None mask = get_option_mask( question_df=question_df_clean, variables=variables, question_type=question_type, valid_options=valide_opties, ) plot_df = question_df_clean.loc[ (module_name, question_name, mask) ].copy() # dit is niet meer nodig omdat de kleuren toch gelijk blijven # plot_df = add_missing_years(plot_df, # years_to_plot=self.years_to_add_to_plot_legend, # jaar_level_name=jaar_level_name, # column=original_name) if variables.loc[original_name, "report_number"]: normalize_data = True else: normalize_data = False if self.translations is not None: plot_df.rename(columns=self.translations, inplace=True) xoff = 0 yoff = 0 if reference_lines is not None: for ref_key, ref_prop in reference_lines.items(): ref_stat_df = reference_lines[ref_key]["data"] ref_quest_df = None for ref_quest_name, ref_quest_df in ref_stat_df.groupby( level=question_level_name ): if ref_quest_name == question_name: break if ref_quest_df is not None: mask2 = get_option_mask( question_df=ref_quest_df, variables=variables, question_type=question_type, ) try: ref_df = ref_quest_df.loc[ (module_name, question_name, mask2) ].copy() except KeyError as err: _logger.warning(err) else: reference_lines[ref_key]["plot_df"] = ref_df _logger.info(f"Plot nr {plot_count}") if plot_bar: if keep_options: for ( year_key, local_scan_info, ) in scan_data_per_year.items(): year = local_scan_info.get("label", year_key) this_year_df = plot_df.loc[ slice(None), slice(None), slice(None), slice(None), year, ] image_file = make_bar_plot_stacked( year=year_key, plot_df=this_year_df, 
add_logo=add_logo, plot_key=plot_key, plot_variable=plot_variable, scan_data_key=scan_data_key, module_name=module_name, question_name=question_name, image_directory=self.image_directory, show_plots=self.show_plots, figsize=figsize, image_type=self.image_type, reference_lines=reference_lines, xoff=xoff, yoff=yoff, show_title=self.show_title, barh=self.barh, subplot_adjust=subplot_adjust, box_margin=box_margin, sort_values=sort_values, y_max_bar_plot=y_max, y_spacing_bar_plot=y_spacing, translations=self.variables.loc[ original_name, "options" ], export_highcharts=export_highcharts, export_svg=export_svg_bar, highcharts_directory=highcharts_directory, title=title, legend_position=legend_pos, normalize_data=normalize_data, force_plot=self.force_plots, enable_highcharts_legend=plot_info.enable_highcharts_legend, unit=unit, english=self.english, ) else: image_file = make_bar_plot( plot_df=plot_df, plot_key=plot_key, plot_variable=plot_variable, add_logo=add_logo, scan_data_key=scan_data_key, module_name=module_name, question_name=question_name, image_directory=self.image_directory, show_plots=self.show_plots, figsize=figsize, highcharts_height=highcharts_height, image_type=self.image_type, reference_lines=reference_lines, xoff=xoff, yoff=yoff, show_title=self.show_title, barh=self.barh, subplot_adjust=subplot_adjust, box_margin=box_margin, sort_values=sort_values, y_max_bar_plot=y_max, y_spacing_bar_plot=y_spacing, translations=self.translations, export_highcharts=export_highcharts, export_svg=export_svg_bar, highcharts_directory=highcharts_directory, title=title, legend_position=legend_pos, legend_max_columns=legend_max_columns, normalize_data=normalize_data, force_plot=self.force_plots, enable_highcharts_legend=plot_info.enable_highcharts_legend, unit=unit, english=self.english, bar_width=bar_width, ) _logger.debug( f"Store [{original_name}][{label}] : {image_file}" ) self.image_info.add_entry( plot_key=plot_key, plot_info=self.plot_info, image_key=original_name, 
section=section, file_name=image_file, sub_image_label=label, tex_right_shift=tex_horizontal_shift, ) if plot_cdf: for year in scan_data_per_year.keys(): try: scan_data_analyses_year = scan_data_per_year[year][ "analyses" ] except KeyError: _logger.info( f"Year {year} does not have data. Skipping" ) continue hist_info = scan_data_analyses_year.all_hist_per_format[ plot_key ][original_name] highcharts_info = highcharts_info_per_year[year] if hist_info is not None and isinstance( hist_info, dict ): for grp_key, hist in hist_info.items(): if hist is None: _logger.warning( f"Hist {grp_key} does not have a histogram. Skipping" ) continue im_file_2 = make_cdf_plot( hist=hist, plot_key=plot_key, scan_data_key=scan_data_key, grp_key=grp_key, module_name=module_name, question_name=question_name, image_file_base=original_name, image_directory=self.image_directory, show_plots=self.show_plots, figsize=figsize, image_type=self.image_type, reference_lines=reference_lines, cummulative=self.cumulative, xoff=xoff, yoff=yoff, y_max=y_max_pdf_plot, y_spacing=y_spacing_pdf_plot, translations=self.translations, export_highcharts=export_highcharts_cdf, export_svg=export_svg_cdf, highcharts_info=highcharts_info, title=title, year=year, ) if self.show_plots: plt.show() plot_count += 1 if self.max_plots is not None and plot_count == self.max_plots: _logger.info( f"Maximum number of plot ({self.max_plots}) reached" ) stop_plotting = True break
class PlotInfo:
    """Plot settings of one variable for one breakdown.

    The settings are read from the ``info_per_breakdown`` entry of the
    variable in the variables data frame. Every setting that is not found
    keeps its default (None, or True for ``enable_highcharts_legend``).
    """

    def __init__(self, variables_df, var_name, breakdown_name):
        self.variables_df = variables_df
        self.var_name = var_name
        self.breakdown_name = breakdown_name

        # defaults, overwritten by get_plot_info when the variable defines them
        self.label = None
        self.directory = None
        self.y_max = None
        self.y_spacing = None
        self.legend_position = None
        self.enable_highcharts_legend = True
        self.bar_width = None

        self.get_plot_info()

    def get_plot_info(self):
        """Look up the plot settings of this variable and breakdown.

        The variables data frame can explicitly give, per variable and per
        breakdown, the highcharts directory and label and a number of plot
        settings. Nothing is changed when the variable is unknown, when it
        has no ``info_per_breakdown`` or when the breakdown is not defined.
        """
        try:
            var_prop = self.variables_df.loc[self.var_name]
        except KeyError:
            _logger.debug(
                f"could not find variable {self.var_name} in variables dataframe"
            )
            return

        info_per_breakdown = var_prop["info_per_breakdown"]
        if info_per_breakdown is None:
            return

        try:
            info = info_per_breakdown[self.breakdown_name]
        except KeyError:
            _logger.debug(
                f"variable {self.var_name} does not have a breakdown defined"
            )
            return

        directory = info.get("highcharts_directory")
        self.directory = None if directory is None else Path(directory)
        self.label = info.get("highcharts_label")
        self.y_max = info.get("y_max")
        self.y_spacing = info.get("y_spacing")
        self.bar_width = info.get("bar_width")
        self.legend_position = get_windows_or_linux_value(
            info.get("legend_position")
        )
        self.enable_highcharts_legend = info.get("enable_highcharts_legend", True)
def add_missing_years(plot_df, years_to_plot=None, jaar_level_name="Jaar", column=None):
    """
    Add rows for the years that are missing in the data frame.

    Args:
        plot_df: pd.DataFrame
            Data frame to plot, with the years on index level *jaar_level_name*
        years_to_plot: list
            The years we want to plot. When None, *plot_df* is returned as is
        jaar_level_name: str
            Name of the index level that holds the years
        column: str
            Name of the column, only used in the warning message

    Returns:
        pd.DataFrame: *plot_df* itself when no year is missing, otherwise a
        new data frame re-indexed on *years_to_plot*. The other index levels
        of the added rows are forward filled, the data columns of the added
        rows are NaN.
    """
    if years_to_plot is None:
        # nothing requested, so nothing can be missing
        return plot_df

    years_in_plot = plot_df.index.get_level_values(jaar_level_name)
    missing_years = set(years_to_plot).difference(years_in_plot)
    if not missing_years:
        return plot_df

    index_names = list(plot_df.index.names)
    df = plot_df.reset_index().set_index(jaar_level_name)
    try:
        df = df.reindex(years_to_plot)
    except ValueError as err:
        # raised by pandas when the year labels are not unique
        _logger.warning(f"{err}. Check {column}")

    for column_name in index_names:
        if column_name == jaar_level_name:
            continue
        # ffill replaces Series.pad, which is deprecated in pandas 2
        df[column_name] = df[column_name].ffill()

    return df.reset_index().set_index(index_names, drop=True)
def calculate_histogram_per_breakdown(
    data: DataFrame, var_key: str, df_weights: Series, n_bins: int = 100
) -> dict:
    """
    Calculate the histogram of var_key for every breakdown of the data

    Parameters
    ----------
    data: DataFrame
        The data, grouped on its first index level
    var_key: str
        Name of the column for which the histograms are calculated
    df_weights: Series
        The weights used for the histogram, selected per group with
        ``.loc[group, "ratio_units"]``
    n_bins: int
        Number of bins of the histogram

    Returns
    -------
    dict:
        Per group the result of ``np.histogram`` over the range 0 to 100, or
        None when the weights or the data of that group could not be
        selected or the histogram could not be calculated
    """
    histograms = {}
    for breakdown_key, breakdown_df in data.groupby(level=0):
        # every breakdown gets an entry, also when its histogram fails
        histograms[breakdown_key] = None

        try:
            weights = df_weights.loc[breakdown_key, "ratio_units"].to_numpy()
        except KeyError:
            _logger.debug("Could not get weight factors. Skip for now")
            continue

        try:
            values = breakdown_df.loc[breakdown_key, var_key].to_numpy()
        except KeyError:
            _logger.debug(f"Could not get data belonging to {var_key}. Skip for now")
            continue

        try:
            histograms[breakdown_key] = np.histogram(
                values,
                bins=n_bins,
                range=(0, 100),
                weights=weights,
                density=False,
            )
        except ValueError:
            _logger.debug("Fails for dicts. Skip for now")
        else:
            _logger.debug(f"Success with {var_key}")

    return histograms