# Source code for wip.datatools.ml_filters

"""
Module with functions that apply filters to the datasets, before regression.

These functions are executed inside the module `mltrainer`, before training
the regression models.

They're used to filter the datasets generated at the end of the preprocessing
pipeline to ensure the quality of the data used for training the models.

Notes
-----
These functions should be applied only once during the whole execution.
After the `mltrainer` module execution, a clean version of the `datasets`
dictionary is saved to disk.

The later optimization step should use the clean version of the
`datasets` dictionary to build the optimization model.
It represents the data that the predictive models also used for training,
and additionally contains a cleaner version of the data.
"""

from __future__ import annotations

from functools import reduce
from pathlib import Path
from typing import Dict
from typing import Hashable
from typing import List
from typing import Optional
from typing import Union

import numpy as np
import pandas as pd

from wip.constants import DATA_CHECKS_FOLDER_PATH
from wip.constants import DATASETS_CLEAN_FILEPATH
from wip.constants import DF_SQL_CLEAN_FILEPATH
from wip.datatools.data_checks import constant_tags_summary
from wip.logging_config import logger
from wip.ml_configs import LIMITES_FAIXA_700_750
from wip.ml_configs import LIMITES_FAIXA_750_PLUS
from wip.ml_configs import TAG_FILTERS
from wip.ml_configs import TAGS_TO_ADD
from wip.ml_configs import TAGS_TO_DROP


def mod_filtros(
    data: pd.DataFrame, model_name: str, target_column: str
) -> pd.DataFrame:
    """
    Keep only the rows of `data` that pass the filter tied to `model_name`.

    Parameters
    ----------
    data : pd.DataFrame
        The DataFrame to be filtered. It must contain `target_column`
        (or "corpo_moedor_especifico_1" for the "cm1" models).
    model_name : str
        The name of the model, which selects the rule to apply.
    target_column : str
        The name of the column the rule is evaluated on.

    Returns
    -------
    pd.DataFrame
        The rows of `data` that satisfy the selected rule.

    Notes
    -----
    Rule applied for each `model_name`:

    - starts with "cm1": "corpo_moedor_especifico_1" > 0.
    - "torque": `target_column` >= 3000.
    - "energia_prensa": `target_column` >= 0.15.
    - "finos": `target_column` <= 15.
    - "compressao": `target_column` > 200.
    - anything else: `target_column` >= 0.
    """
    # The "cm1" family is the only one that ignores `target_column`.
    if model_name.startswith("cm1"):
        return data[data["corpo_moedor_especifico_1"] > 0]

    target_values = data[target_column]
    rules = {
        "torque": lambda values: values >= 3000,
        "energia_prensa": lambda values: values >= 0.15,
        "finos": lambda values: values <= 15,
        "compressao": lambda values: values > 200,
    }
    rule = rules.get(model_name, lambda values: values >= 0)
    return data[rule(target_values)]
def filter_status(dataset: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only the rows whose 'status' equals 1, then drop the 'status' column.

    Parameters
    ----------
    dataset : pd.DataFrame
        Dataset to filter. When it has no 'status' column it is returned
        untouched.

    Returns
    -------
    pd.DataFrame
        The dataset restricted to rows with 'status' == 1 and without the
        'status' column, or the input itself if there is no 'status' column.

    Notes
    -----
    The row count before and after the filter is logged at INFO level.
    """
    has_status = "status" in dataset.columns
    if not has_status:
        return dataset

    logger.info("Pré-remoção status zerado: %s", format(len(dataset), ","))
    active_rows = dataset.query("status == 1")
    dataset = active_rows.drop(columns=["status"])
    logger.info("Pós-remoção status zerado: %s", format(len(dataset), ","))
    return dataset
def find_models_by_tag(tag_name: str, datasets: Dict[str, pd.DataFrame]) -> List[str]:
    """
    List the models whose DataFrame has a column named `tag_name`.

    Parameters
    ----------
    tag_name : str
        Tag (column name) to look for.
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.

    Returns
    -------
    List[str]
        Model names, in the dictionary's iteration order, whose DataFrame
        contains `tag_name` among its columns.

    Examples
    --------
    >>> datasets = {
    ...     'model1': pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]}),
    ...     'model2': pd.DataFrame({'A': [1, 2, 3], 'C': [4, 5, 6]}),
    ...     'model3': pd.DataFrame({'B': [1, 2, 3], 'C': [4, 5, 6]})
    ... }
    >>> find_models_by_tag('A', datasets)
    ['model1', 'model2']
    >>> find_models_by_tag('C', datasets)
    ['model2', 'model3']
    """
    matching_models: List[str] = []
    for model_name, frame in datasets.items():
        if tag_name in frame.columns:
            matching_models.append(model_name)
    return matching_models
def find_models_by_tags(
    tags_names: List[str], datasets: Dict[str, pd.DataFrame]
) -> List[str]:
    """
    List the models whose DataFrame contains every tag in `tags_names`.

    Parameters
    ----------
    tags_names : list of str
        Column names that must all be present.
    datasets : dict of str, DataFrame
        Mapping from model name to that model's DataFrame.

    Returns
    -------
    list of str
        Model names, in the dictionary's iteration order, whose DataFrame
        has all the requested columns. An empty `tags_names` matches every
        model.
    """
    matching_models: List[str] = []
    for model_name, frame in datasets.items():
        has_missing_tag = any(tag not in frame.columns for tag in tags_names)
        if not has_missing_tag:
            matching_models.append(model_name)
    return matching_models
def find_models_by_partial_tag(
    tag_pattern: str, datasets: Dict[str, pd.DataFrame]
) -> List[str]:
    """
    List the models that have at least one column matching `tag_pattern`.

    Parameters
    ----------
    tag_pattern : str
        Pattern searched for inside the column names. It is handed to
        `pandas.Series.str.contains`, so it is interpreted as a regular
        expression.
    datasets : dict of str, DataFrame
        Mapping from model name to that model's DataFrame.

    Returns
    -------
    list of str
        Model names, in the dictionary's iteration order, with at least one
        column name containing the pattern.

    Raises
    ------
    TypeError
        If `tag_pattern` is not a non-empty string, or `datasets` is not a
        non-empty dictionary.
    """
    if not (isinstance(tag_pattern, str) and tag_pattern):
        raise TypeError("tag_pattern must be a non-empty string")
    if not (isinstance(datasets, dict) and datasets):
        raise TypeError("datasets must be a non-empty dictionary")

    matching_models: List[str] = []
    for model_name, frame in datasets.items():
        # Column labels are cast to str so non-string labels can be searched.
        column_names = frame.columns.astype(str)
        if column_names.str.contains(tag_pattern).any():
            matching_models.append(model_name)
    return matching_models
def pattern_filter(name: str, model_name_pattern: str) -> bool:
    """
    Determine if a `name` matches a given `model_name_pattern`.

    The comparison is case-insensitive. An asterisk (*) at the start and/or
    end of the pattern acts as a wildcard:

    - ``"abc*"``: `name` must start with ``"abc"``.
    - ``"*abc"``: `name` must end with ``"abc"``.
    - ``"*abc*"`` or ``"abc"``: ``"abc"`` may appear anywhere in `name`.

    Parameters
    ----------
    name : str
        The name to be checked against the pattern.
    model_name_pattern : str
        The pattern to compare against, optionally starting and/or ending
        with an asterisk.

    Returns
    -------
    bool
        True if the name matches the pattern, otherwise False.

    Examples
    --------
    >>> pattern_filter("testmodel", "test*")
    True
    >>> pattern_filter("testmodel", "*model")
    True
    >>> pattern_filter("testmodel", "model")
    True
    >>> pattern_filter("my_testmodel_v2", "*testmodel*")
    True
    """
    name = name.lower()
    pattern = model_name_pattern.lower()
    has_leading_wildcard = pattern.startswith("*")
    has_trailing_wildcard = pattern.endswith("*")

    # Wildcards on both ends mean "anywhere"; without this branch the leading
    # asterisk would be compared literally and nothing would ever match.
    if has_leading_wildcard and has_trailing_wildcard:
        return pattern[1:-1] in name
    if has_trailing_wildcard:
        return name.startswith(pattern[:-1])
    if has_leading_wildcard:
        return name.endswith(pattern[1:])
    return pattern in name
def find_models_by_partial_name(
    datasets: Dict[str, pd.DataFrame], model_name_pattern: str
) -> List[str]:
    """
    List the model names in `datasets` that match `model_name_pattern`.

    Matching is delegated to `pattern_filter`, so it is case-insensitive and
    the pattern may start or end with an asterisk (*) to anchor the match at
    the end or the beginning of the model name.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame. Only the keys are
        inspected.
    model_name_pattern : str
        Pattern to test each model name against.

    Returns
    -------
    List[str]
        Matching model names, in the dictionary's iteration order.

    Examples
    --------
    >>> datasets = {
    ...     "model1": pd.DataFrame(),
    ...     "model2": pd.DataFrame(),
    ...     "sample1": pd.DataFrame()
    ... }
    >>> find_models_by_partial_name(datasets, "*1")
    ['model1', 'sample1']
    """
    matching_models: List[str] = []
    for model_name in datasets:
        if pattern_filter(model_name, model_name_pattern):
            matching_models.append(model_name)
    return matching_models
def find_models_by_partial_names(
    datasets: Dict[str, pd.DataFrame], model_name_patterns: List[str]
) -> List[str]:
    """
    Find model names in the datasets that match any of the provided patterns.

    Each pattern is matched with `find_models_by_partial_name`. A model that
    matches several patterns is reported once.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    model_name_patterns : List[str]
        Patterns to match against the model names. Each pattern can be a
        substring to match anywhere, or start or end with an '*' to anchor
        the match at the end or the beginning of the model name.

    Returns
    -------
    List[str]
        Unique matching model names, ordered by first match: patterns are
        processed in the given order and, within a pattern, models follow
        the dictionary's iteration order.

    Examples
    --------
    >>> datasets = {
    ...     "model1": pd.DataFrame(),
    ...     "model2": pd.DataFrame(),
    ...     "sample1": pd.DataFrame()
    ... }
    >>> find_models_by_partial_names(datasets, ["*1", "*2"])
    ['model1', 'sample1', 'model2']
    """
    # A dict is used as an ordered set so the output is deterministic; a plain
    # set would reorder string keys from one interpreter run to the next.
    unique_matches = dict.fromkeys(
        model_name
        for model_name_pattern in model_name_patterns
        for model_name in find_models_by_partial_name(datasets, model_name_pattern)
    )
    return list(unique_matches)
def filter_quantile(
    data: pd.DataFrame, tag_name: str, low_bound: float = 0.02, up_bound: float = 0.98
) -> pd.DataFrame:
    """
    Keep the rows of `data` whose `tag_name` value lies between two quantiles.

    The quantiles are computed on the `tag_name` column of `data` itself and
    both ends of the interval are inclusive.

    Parameters
    ----------
    data : pd.DataFrame
        DataFrame to be filtered.
    tag_name : str
        Name of the column whose quantiles define the accepted interval.
    low_bound : float, optional
        Lower quantile, between 0 and 1 (default is 0.02).
    up_bound : float, optional
        Upper quantile, between 0 and 1 (default is 0.98).

    Returns
    -------
    pd.DataFrame
        Rows of `data` whose `tag_name` value is within the interval.

    Examples
    --------
    >>> df = pd.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
    >>> filter_quantile(df, 'A', 0.25, 0.75)
       A   B
    1  2  20
    2  3  30
    3  4  40
    """
    column = data[tag_name]
    lower_limit = column.quantile(low_bound)
    upper_limit = column.quantile(up_bound)
    within_limits = column.between(lower_limit, upper_limit)
    return data[within_limits]
def filter_quantile_datasets(
    datasets: Dict[str, pd.DataFrame],
    tag_name: str,
    low_bound: float | None = None,
    up_bound: float | None = None,
) -> Dict[str, pd.DataFrame]:
    """
    Filter all DataFrames in a dictionary based on the quantiles of a column.

    `filter_quantile` is applied to every DataFrame of `datasets` that has a
    column named `tag_name`. The dictionary is updated in place and also
    returned.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    tag_name : str
        Name of the column whose quantiles define the accepted interval.
    low_bound : float | None
        Lower quantile, between 0 and 1. `None` means 0 (no lower cut).
    up_bound : float | None
        Upper quantile, between 0 and 1. `None` means 1 (no upper cut).

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary, with the affected DataFrames replaced by their
        filtered versions.

    Examples
    --------
    >>> datasets = {
    ...     'model1': pd.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]}),
    ...     'model2': pd.DataFrame({'A': [6, 7, 8, 9, 10], 'B': [60, 70, 80, 90, 100]})
    ... }
    >>> filtered = filter_quantile_datasets(datasets, 'A', 0.25, 0.75)
    >>> filtered['model1']['A'].tolist()
    [2, 3, 4]
    >>> filtered['model2']['A'].tolist()
    [7, 8, 9]
    """
    # Only a missing bound falls back to its default. Testing truthiness would
    # also overwrite an explicit bound of 0.
    if low_bound is None:
        low_bound = 0
    if up_bound is None:
        up_bound = 1

    for model_name in find_models_by_tag(tag_name, datasets):
        datasets[model_name] = filter_quantile(
            datasets[model_name], tag_name, low_bound, up_bound
        )
    return datasets
def filter_tag(
    datasets: Dict[str, pd.DataFrame],
    tag_name: str,
    low_bound: float | None = None,
    up_bound: float | None = None,
) -> Dict[str, pd.DataFrame]:
    """
    Filter all DataFrames in a dictionary based on value bounds of a column.

    Every DataFrame of `datasets` that has a column named `tag_name` keeps
    only the rows whose value is within the given bounds (inclusive). The
    bounds are actual values, not quantiles. The dictionary is updated in
    place and also returned.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    tag_name : str
        Name of the column the bounds are applied to.
    low_bound : float, optional
        Smallest accepted value. `None` disables the lower bound.
    up_bound : float, optional
        Largest accepted value. `None` disables the upper bound.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary, with the affected DataFrames replaced by their
        filtered versions.

    Raises
    ------
    ValueError
        If both `low_bound` and `up_bound` are `None`.
    """
    # Bounds are compared against None explicitly: 0 is a legitimate bound
    # (e.g. "no negative values") and must not be treated as "not given".
    if low_bound is None and up_bound is None:
        raise ValueError(
            "At least one of `low_bound` and `up_bound` must be specified."
        )

    for model_name in find_models_by_tag(tag_name, datasets):
        pandas_df = datasets[model_name]
        if low_bound is not None:
            pandas_df = pandas_df[pandas_df[tag_name] >= low_bound]
        if up_bound is not None:
            pandas_df = pandas_df[pandas_df[tag_name] <= up_bound]
        datasets[model_name] = pandas_df
    return datasets
def datasets_filter(
    datasets: Dict[str, pd.DataFrame],
    tags_limits: Dict[str, Dict[str, float | bool]],
) -> Dict[str, pd.DataFrame]:
    """
    Filter or replace values from DataFrames in `datasets` using tags limits.

    For every tag in `tags_limits`, one of three operations is dispatched:

    - ``"replace"`` is truthy: `replace_values_datasets` is called.
    - otherwise, ``"quantile"`` is truthy (the default):
      `filter_quantile_datasets` is called.
    - otherwise: `filter_tag` is called.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    tags_limits : Dict[str, Dict[str, float | bool]]
        Mapping from tag name to its configuration. Recognized keys:

        - "min": lower bound (default `None`).
        - "max": upper bound (default `None`).
        - "replace": replace values instead of filtering rows
          (default `False`).
        - "quantile": whether "min" and "max" are quantiles rather than
          actual values (default `True`).
        - "mode": how the replacement value is computed when "replace" is
          set (default "mean"). Forwarded to `replace_values_datasets`.

        For example:

        .. code-block:: python

            tags_limits = {
                "tag1": {"min": 0.25, "max": 0.75, "quantile": True},
                "tag2": {"min": 5, "quantile": False},
                "tag3": {"max": 45, "quantile": False},
                "tag4": {"max": 45, "quantile": False, "replace": True},
                "tag5": {"max": 45, "replace": True},
                "tag6": {"max": 45, "replace": True, "mode": "median"},
                "tag7": {"max": 45, "replace": True, "mode": "50%"},
            }

    Returns
    -------
    Dict[str, pd.DataFrame]
        The dictionary returned by the last operation applied.

    Examples
    --------
    >>> import pandas as pd
    >>> data1 = {'tag1': [1, 2, 3], 'tag2': [4, 5, 6]}
    >>> data2 = {'tag1': [7, 8, 9], 'tag2': [10, 11, 12]}
    >>> datasets = {'model1': pd.DataFrame(data1), 'model2': pd.DataFrame(data2)}
    >>> tags_limits = {
    ...     'tag1': {'min': 0.10, 'max': 0.90},
    ...     'tag2': {'max': 11, 'quantile': False}
    ... }
    >>> print(
    ...     {
    ...         model_name: df.to_dict('list')  # noqa
    ...         for model_name, df in datasets_filter(datasets, tags_limits).items()
    ...     }
    ... )
    {'model1': {'tag1': [2], 'tag2': [5]}, 'model2': {'tag1': [8], 'tag2': [11]}}
    """
    for tag_name, limits in tags_limits.items():
        lower_bound = limits.get("min")
        upper_bound = limits.get("max")
        bounds_are_quantiles = limits.get("quantile", True)

        if limits.get("replace", False):
            replacement_mode = limits.get("mode", "mean")
            datasets = replace_values_datasets(
                datasets,
                tag_name,
                replacement_mode,
                lower_bound,
                upper_bound,
                bounds_are_quantiles,
            )
            continue

        if bounds_are_quantiles:
            datasets = filter_quantile_datasets(
                datasets, tag_name, lower_bound, upper_bound
            )
            continue

        datasets = filter_tag(datasets, tag_name, lower_bound, upper_bound)
    return datasets
def non_zero_filter(
    datasets: Dict[str, pd.DataFrame],
    tag_names: List[str],
) -> Dict[str, pd.DataFrame]:
    """
    Drop the rows whose value for any of the given tags is not positive.

    For each tag, every DataFrame that contains it (as reported by
    `find_models_by_tag`) keeps only the rows where the tag is strictly
    greater than zero. The dictionary is updated in place and also returned.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    tag_names : List[str]
        Tags (column names) that must be strictly positive.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary, with the affected DataFrames replaced by their
        filtered versions.

    Examples
    --------
    >>> import pandas as pd
    >>> data1 = {'tag1': [1, -1, 3], 'tag2': [4, 5, 6]}
    >>> data2 = {'tag1': [0, 2, 3], 'tag2': [-1, 8, 9]}
    >>> datasets = {'model1': pd.DataFrame(data1), 'model2': pd.DataFrame(data2)}
    >>> filtered = non_zero_filter(datasets, ['tag1', 'tag2'])
    >>> filtered['model1']['tag1'].tolist()
    [1, 3]
    >>> filtered['model2']['tag1'].tolist()
    [2, 3]
    """
    for tag_name in tag_names:
        models_with_tag = find_models_by_tag(tag_name, datasets)
        for model_name in models_with_tag:
            frame = datasets[model_name]
            is_positive = frame[tag_name] > 0
            datasets[model_name] = frame[is_positive]
    return datasets
def filter_inf(datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Drop every row that holds `np.inf` or `-np.inf` in any column.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from dataset name to DataFrame.

    Returns
    -------
    Dict[str, pd.DataFrame]
        A new dictionary with the same keys, where each DataFrame has the
        rows containing infinite values removed.
    """

    def _without_inf_rows(frame: pd.DataFrame) -> pd.DataFrame:
        # Flag each cell that is +inf or -inf, then each row with any flag.
        is_infinite = frame.eq(np.inf) | frame.eq(-np.inf)
        row_has_infinite = is_infinite.any(axis=1)
        return frame[~row_has_infinite]

    return {
        dataset_name: _without_inf_rows(frame)
        for dataset_name, frame in datasets.items()
    }
def clip_negative_values(
    datasets: Dict[str, pd.DataFrame],
    negative_rate_limit: float = 0.05,
    verbose: bool = True,
) -> Dict[str, pd.DataFrame]:
    """
    Set sparse negative values to zero, column by column.

    A column is clipped only when it has at least one negative value and the
    share of negative values does not exceed `negative_rate_limit`. The
    DataFrames are modified in place.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    negative_rate_limit : float, optional
        Largest fraction of negative values (inclusive) for which a column
        is still clipped, by default 0.05.
    verbose : bool, default=True
        If `True`, a warning naming the model and column is logged for each
        clipped column.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary, with negative values clipped where applicable.
    """
    for model_name, frame in datasets.items():
        target_name = frame.columns[-1]
        total_rows = len(frame)

        for column in frame.columns:
            is_negative = frame[column] < 0
            negative_total = is_negative.sum()
            should_clip = (
                negative_total > 0
                and negative_total / total_rows <= negative_rate_limit
            )
            if not should_clip:
                continue
            if verbose:
                logger.warning(
                    "%s: %s - clipping negative values.", model_name, column
                )
            frame.loc[is_negative, column] = 0

        # Re-append the target so it stays as the last column.
        frame[target_name] = frame.pop(target_name)
        datasets[model_name] = frame
    return datasets
def replace_values_by_mean(
    datasets: Dict[str, pd.DataFrame],
    tag_names: List[str],
    lower_bound: Union[float, int],
) -> Dict[str, pd.DataFrame]:
    """
    Replace low or missing values of the given columns with the column mean.

    For each tag, every DataFrame that contains it (as reported by
    `find_models_by_tag`) has the values that are less than or equal to
    `lower_bound`, or `NaN`, overwritten with the mean of that column. The
    mean is taken over all non-NaN values of the column, including the ones
    that end up being replaced.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    tag_names : List[str]
        Names of the columns to process.
    lower_bound : Union[float, int]
        Threshold. Values at or below it are replaced.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary; the DataFrames are modified in place.

    Examples
    --------
    >>> datasets = {"model1": pd.DataFrame({"tag1": [1, 2, np.nan, 4]})}
    >>> modified_datasets = replace_values_by_mean(datasets, ["tag1"], 2)
    >>> modified_datasets["model1"]["tag1"].tolist()
    [2.3333333333333335, 2.3333333333333335, 2.3333333333333335, 4.0]
    """
    for tag_name in tag_names:
        for model_name in find_models_by_tag(tag_name, datasets):
            frame = datasets[model_name]
            column = frame[tag_name]
            column_mean = column.mean()
            needs_replacement = (column <= lower_bound) | column.isna()
            frame.loc[needs_replacement, tag_name] = column_mean
            datasets[model_name] = frame
    return datasets
def add_tags_to_models(
    ds: Dict[str, pd.DataFrame],
    df_sql: pd.DataFrame,
    tags_to_add: Dict[str, Union[str, list]],
) -> Dict[str, pd.DataFrame]:
    """
    Add columns taken from `df_sql` to the specified model datasets.

    Parameters
    ----------
    ds : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    df_sql : pd.DataFrame
        DataFrame that holds every tag listed in `tags_to_add`.
    tags_to_add : Dict[str, Union[str, list]]
        Mapping from tag name to the model name, or list of model names,
        that should receive the tag:

        .. code-block:: python

            tags_to_add = {
                "<TAG-NAME>": ["<MODEL-NAME-1>", "<MODEL-NAME-2>", ...],
                "<TAG-NAME>": "<MODEL-NAME-1>",
                ...
            }

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary, updated in place.

    Notes
    -----
    Models that already have the tag are left untouched. The tag is joined
    on the index with an inner join, so rows whose index is absent from
    `df_sql` are dropped. The new tag becomes the first column, which keeps
    the target variable as the last one.
    """
    for tag_name, target_models in tags_to_add.items():
        # A single model may be given as a bare string.
        if isinstance(target_models, str):
            target_models = [target_models]

        tag_values = df_sql[tag_name]

        for model_name in target_models:
            current = ds[model_name]
            if tag_name in current.columns:
                continue
            merged = current.merge(
                tag_values,
                left_index=True,
                right_index=True,
                how="inner",
            )
            new_column_order = [tag_name, *current.columns]
            ds[model_name] = merged[new_column_order]
    return ds
def fill_empty_columns(datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Fill with zeros the columns that contain nothing but `NaN`.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary; the DataFrames are modified in place.

    Notes
    -----
    For each DataFrame that has such columns, an INFO message lists the
    model name and the columns that were filled.

    Examples
    --------
    >>> import pandas as pd
    >>> datasets = {"modelA": pd.DataFrame({"A": [1, 2], "B": [None, None]}),
    ...             "modelB": pd.DataFrame({"A": [None, None], "B": [3, 4]})}
    >>> filled_datasets = fill_empty_columns(datasets)
    >>> filled_datasets["modelA"]["B"].tolist() == [0, 0]
    True
    """
    for model_name, frame in datasets.items():
        empty_columns = []
        for column in frame.columns:
            if frame[column].isna().all():
                empty_columns.append(column)

        if not empty_columns:
            continue

        logger.info(
            "Filling with 0 the following empty columns from %s: %s",
            model_name,
            empty_columns,
        )
        frame.loc[:, empty_columns] = 0
        datasets[model_name] = frame
    return datasets
def drop_tags(
    datasets: Dict[str, pd.DataFrame], tags_to_drop: List[str | Dict[str, List[str]]]
) -> Dict[str, pd.DataFrame]:
    """
    Drop specified columns from DataFrames unless they're the target column.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame. The last column
        of each DataFrame is treated as its target.
    tags_to_drop : List[str | Dict[str, List[str]]]
        Each entry is either:

        - a tag name: the tag is dropped from every model that has it; or
        - a dictionary mapping tag names to the model name, or list of
          model names, the tag should be dropped from. A dictionary may
          hold any number of tags.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary, with the affected DataFrames replaced by
        versions without the dropped columns.

    Raises
    ------
    KeyError
        If a dictionary entry names a model that is not in `datasets`, or a
        model that does not have the tag.

    Notes
    -----
    A tag that is the target (last column) of a model is never dropped from
    that model; a warning is logged instead.
    """
    for entry in tags_to_drop:
        if isinstance(entry, dict):
            # Handle every (tag, models) pair of the dictionary, accepting a
            # bare string as a single model name.
            drop_plan = [
                (tag, [models] if isinstance(models, str) else list(models))
                for tag, models in entry.items()
            ]
        else:
            drop_plan = [(entry, find_models_by_tag(entry, datasets))]

        for tag_name, model_names in drop_plan:
            for model_name in model_names:
                target_name = datasets[model_name].columns[-1]
                if target_name == tag_name:
                    logger.warning(
                        "Tag %s is the target of model %s, won't be dropped.",
                        tag_name,
                        model_name,
                    )
                    continue
                datasets[model_name] = datasets[model_name].drop(columns=[tag_name])
    return datasets
def fill_all_missing_columns(dataset: pd.DataFrame) -> pd.DataFrame:
    """
    Fill with zeros the columns of `dataset` that contain nothing but `NaN`.

    Parameters
    ----------
    dataset : pd.DataFrame
        DataFrame to be processed. It is modified in place.

    Returns
    -------
    pd.DataFrame
        The same DataFrame, with the all-`NaN` columns set to zero.

    Notes
    -----
    When at least one column is filled, a warning listing the filled columns
    is logged.
    """
    all_missing_columns = []
    for column in dataset.columns:
        if dataset[column].isna().all():
            all_missing_columns.append(column)

    if not all_missing_columns:
        return dataset

    logger.warning(
        "Filling with 0 the following empty columns: %s", all_missing_columns
    )
    dataset.loc[:, all_missing_columns] = 0
    return dataset
def fix_pres_tag(
    datasets: Dict[str, pd.DataFrame],
    tag_name: str = "PRES6_I@08QU-HO-851I-01",
) -> Dict[str, pd.DataFrame]:
    """
    Make every model use the most complete version of a shared tag.

    Among the models that contain `tag_name`, the one with the most non-null
    values for it is taken as the reference. In every other model the tag
    column is replaced by the reference column, joined on the index with a
    left join. The target (last column) of each model stays last.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Mapping from model name to that model's DataFrame.
    tag_name : str, optional
        Tag to fix, by default "PRES6_I@08QU-HO-851I-01".

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary, with the affected DataFrames replaced. It is
        returned unchanged when no model contains `tag_name`.

    Raises
    ------
    KeyError
        If `tag_name` is the target (last column) of one of the models that
        would be rewritten.
    """
    model_names = find_models_by_tag(tag_name, datasets)
    # Nothing to fix when the tag is absent; `max` would fail on an empty list.
    if not model_names:
        return datasets

    reference_model = max(
        model_names, key=lambda model: datasets[model][tag_name].notnull().sum()
    )
    reference_values = datasets[reference_model][tag_name]

    for model_name in model_names:
        if model_name == reference_model:
            continue
        frame = datasets[model_name]
        target_name = frame.columns[-1]
        target = frame[target_name]
        # Work on copies so frames the caller still references are not altered.
        new_data = (
            frame.drop(columns=[target_name])
            .drop(columns=[tag_name])
            .merge(
                reference_values,
                left_index=True,
                right_index=True,
                how="left",
            )
        )
        new_data[target_name] = target
        datasets[model_name] = new_data
    return datasets
def get_replacement_value(filtered_data: pd.Series, mode: str = "mean") -> float:
    """
    Determine the replacement value based on a given `mode`.

    Parameters
    ----------
    filtered_data : pd.Series
        The data series from which the replacement value is calculated.
    mode : str {"mean", "median", "mode", "min", "max", "i%"}, default="mean"
        The method used to determine the replacement value:

        - "mean": mean of the values.
        - "median": median of the values.
        - "mode": most frequent value. On ties, the smallest of the most
          frequent values is used. `NaN` if the series has no values.
        - "min": minimum of the values.
        - "max": maximum of the values.
        - "i%": where `i` is a number between 0 and 100, the i-th percentile
          of the values. For example, `"75%"` is the 75th percentile.

    Returns
    -------
    float
        The calculated replacement value.

    Raises
    ------
    ValueError
        If `mode` is not one of the recognized modes, or if the percentile
        is not a number between 0 and 100.
    """
    if mode in ("min", "max", "mean", "median"):
        # These modes share their name with the Series method computing them.
        return getattr(filtered_data, mode)()

    if mode == "mode":
        # `Series.mode` returns every most-frequent value, sorted.
        most_frequent = filtered_data.mode()
        if most_frequent.empty:
            return float("nan")
        return most_frequent.iloc[0]

    if "%" in mode:
        try:
            quantile = float(mode.replace("%", "")) / 100
        except ValueError as err:
            raise ValueError(f"Invalid percentile value: {mode}") from err
        # The value was already divided by 100, so the valid range is [0, 1].
        if 0 <= quantile <= 1:
            return filtered_data.quantile(q=quantile)
        raise ValueError(f"Invalid percentile value: {mode}")

    raise ValueError(f"Invalid mode '{mode}' provided.")
def replace_values(
    dataset: pd.DataFrame,
    column: str,
    mode: str = "mean",
    low_bound: float | int | None = None,
    up_bound: float | int | None = None,
) -> pd.DataFrame:
    """
    Replace values in a dataset column based on specified bounds.

    Values of `column` that fall outside the inclusive range
    `[low_bound, up_bound]` are replaced by a single value, computed from
    the values that are inside the range according to `mode`. For example,
    with `mode="mean"` the mean of the in-range values is used.

    Parameters
    ----------
    dataset : pd.DataFrame
        The dataset in which values are to be replaced. It is modified in
        place and also returned.
    column : str
        The name of the column in the dataset where values are replaced.
    mode : str, default="mean"
        The method used to determine the replacement value. It is forwarded
        to `get_replacement_value`, which defines the accepted values.
    low_bound : float | int | None, optional
        The lower bound for the range of acceptable values in the column.
        If None, no lower bound is set. `0` is a valid bound.
    up_bound : float | int | None, optional
        The upper bound for the range of acceptable values in the column.
        If None, no upper bound is set. `0` is a valid bound.

    Returns
    -------
    pd.DataFrame
        The dataset with values in the specified column replaced according
        to the given conditions.

    Raises
    ------
    ValueError
        If neither `low_bound` nor `up_bound` is set.
    TypeError
        If `column` is not of type string or hashable.
    KeyError
        If `column` does not exist inside the dataset.

    Notes
    -----
    NaN fails both bound comparisons, so missing values are treated as out
    of range and are replaced as well. If no value lies inside the range,
    the replacement value itself may be NaN.
    """
    # Compare against None explicitly: a bound of 0 is falsy but valid.
    if low_bound is None and up_bound is None:
        raise ValueError(
            "No value for either `low_bound` nor `up_bound` is set. "
            "At least one of the parameters must be set to a numeric value."
        )
    if not isinstance(column, Hashable):
        raise TypeError(
            f"Parameter column of invalid type: {type(column)}. "
            "`column` needs to be of type string or hashable."
        )
    if column not in dataset.columns:
        raise KeyError(f"Column {column!r} does not exist inside dataset.")

    # Boolean mask of the rows whose value lies inside the accepted range.
    if low_bound is None:
        dataset_filters = dataset[column] <= up_bound
    elif up_bound is None:
        dataset_filters = dataset[column] >= low_bound
    else:
        dataset_filters = (dataset[column] <= up_bound) & (
            dataset[column] >= low_bound
        )

    # The replacement value is computed only from the in-range values.
    filtered_data = dataset.loc[dataset_filters, column]
    fill_value = get_replacement_value(filtered_data, mode)

    # Overwrite everything outside the bounds.
    dataset.loc[~dataset_filters, column] = fill_value
    return dataset
def replace_values_datasets(
    datasets: Dict[str, pd.DataFrame],
    tag_name: str,
    mode: str = "mean",
    low_bound: float | None = None,
    up_bound: float | None = None,
    is_quantile: bool = False,
) -> Dict[str, pd.DataFrame]:
    """
    Apply `replace_values` to `tag_name` in every dataset that contains it.

    The models to process are the ones returned by `find_models_by_tag`.
    For each of them, the values of `tag_name` are replaced through
    `replace_values`, and the result is stored back into `datasets`.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A dictionary containing model names as keys and associated
        DataFrames as values. It is updated in place.
    tag_name : str
        The name of the column where values are to be replaced.
    mode : str, default='mean'
        The method used to determine the replacement value. Forwarded
        unchanged to `replace_values`.
    low_bound : float | None
        The lower bound for the range of acceptable values in the column.
    up_bound : float | None
        The upper bound for the range of acceptable values in the column.
    is_quantile : bool, default=False
        If True, the bounds that are not None are interpreted as quantiles
        and converted, per dataset, into the matching values of `tag_name`.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The same dictionary, holding the DataFrames returned by
        `replace_values`.
    """
    for model_name in find_models_by_tag(tag_name, datasets):
        frame = datasets[model_name]
        lower, upper = low_bound, up_bound
        if is_quantile:
            # Turn the quantile levels into concrete values for this dataset.
            tag_values = frame[tag_name]
            lower = None if low_bound is None else tag_values.quantile(low_bound)
            upper = None if up_bound is None else tag_values.quantile(up_bound)
        datasets[model_name] = replace_values(frame, tag_name, mode, lower, upper)
    return datasets
def _fill_outside_limits(
    frame: pd.DataFrame,
    col: str,
    limits: Dict[str, float],
    min_share: float,
    range_label: str,
) -> None:
    """Overwrite, in place, the values of `col` outside `limits` with a fill value."""
    inside_limits = frame.loc[
        (frame[col] < limits["max"]) & (frame[col] >= limits["min"]), col
    ]
    fill_value = inside_limits.mean()
    # Too few in-range values (or none at all, which yields NaN) make the
    # mean unreliable: fall back to the midpoint of the limits.
    if pd.isna(fill_value) or inside_limits.shape[0] <= int(
        min_share * frame.shape[0]
    ):
        logger.warning(
            "Column '%s' for production range '%s' very few "
            "values within %s <= col <= %s. Using mean value between "
            "the limits lower- and upper-bounds instead.",
            col,
            range_label,
            limits["min"],
            limits["max"],
        )
        fill_value = np.mean([limits["max"], limits["min"]])
    # NaN fails both comparisons, so missing values are left untouched.
    frame.loc[
        (frame[col] >= limits["max"]) | (frame[col] < limits["min"]), col
    ] = fill_value


def filter_temperature_groups(
    datasets: Dict[str, pd.DataFrame],
    df_sql: pd.DataFrame,
    reduced_limits: Optional[Dict[str, Dict[str, float]]] = None,
    normal_limits: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, pd.DataFrame]:
    """
    Replace values from temperature groups tags outside its safety limits.

    Each dataset is split by production range using the column
    `'PROD_PQ_Y@08US'`. For every tag listed in the limits, the values that
    fall outside `[min, max)` are replaced by the mean of the values of the
    same production range that are inside the limits.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary with dataframes to filter temperature groups. Neither the
        dictionary nor its DataFrames are modified.
    df_sql : pd.DataFrame
        DataFrame used to obtain `'PROD_PQ_Y@08US'` (inner join on the index)
        for the datasets that do not contain that column. In that case the
        column is removed again before the dataset is returned.
    reduced_limits : Optional[Dict[str, Dict[str, float]]]
        Limits (`"min"` and `"max"` per tag) applied to rows where
        `'PROD_PQ_Y@08US'` is in `[700, 750)`.
        Defaults to `LIMITES_FAIXA_700_750`.
    normal_limits : Optional[Dict[str, Dict[str, float]]]
        Limits (`"min"` and `"max"` per tag) applied to rows where
        `'PROD_PQ_Y@08US'` is in `[750, 1000)`.
        Defaults to `LIMITES_FAIXA_750_PLUS`.

    Returns
    -------
    Dict[str, pd.DataFrame]
        New dictionary of dataframes. Datasets that contain none of the tags
        from `reduced_limits` are passed through untouched.

    Raises
    ------
    KeyError
        If a tag from `normal_limits` that exists in a dataset has no entry
        in `reduced_limits`.

    Notes
    -----
    - Rows where `'PROD_PQ_Y@08US'` is below 700, is 1000 or more, or is
      missing are dropped from the processed datasets.
    - When the mean cannot be computed, or when the in-range values are at
      most 20% (range `[700, 750)`) or 10% (range `[750, 1000)`) of the rows
      of the production range, a warning is logged and the midpoint between
      the limits is used as the replacement value instead.
    """
    prod_pq = "PROD_PQ_Y@08US"
    reduced_limits = reduced_limits or LIMITES_FAIXA_700_750
    normal_limits = normal_limits or LIMITES_FAIXA_750_PLUS

    new_datasets = {}
    for name, df in datasets.items():
        if df.columns.intersection(list(reduced_limits.keys())).size == 0:
            new_datasets[name] = df
            continue

        remove_prod_pq = prod_pq not in df.columns
        if remove_prod_pq:
            df = df.merge(
                df_sql[prod_pq], left_index=True, right_index=True, how="inner"
            )

        # ldf = low production range (700 <= PROD_PQ_Y@08US < 750)
        # rdf = high production range (750 <= PROD_PQ_Y@08US < 1000)
        # Explicit copies: both frames are written to below, and assigning
        # into a slice of `df` is a chained assignment.
        ldf = df.loc[(df[prod_pq] >= 700) & (df[prod_pq] < 750), :].copy()
        rdf = df.loc[(df[prod_pq] >= 750) & (df[prod_pq] < 1000), :].copy()

        for col, limits in normal_limits.items():
            if col not in df.columns:
                continue
            _fill_outside_limits(ldf, col, reduced_limits[col], 0.20, "<750")
            _fill_outside_limits(rdf, col, limits, 0.10, "≥750")

        df = pd.concat([ldf, rdf]).sort_index()
        if remove_prod_pq:
            df = df.drop(columns=[prod_pq], errors="ignore")
        new_datasets[name] = df

    return new_datasets
def check_adjacent_temp_group_column_names(
    temp_col_left: str, temp_col_right: str
) -> bool:
    """
    Tell whether two column names end in consecutive two-digit numbers.

    The last two characters of each name are parsed as an integer. The names
    are considered adjacent when the number from `temp_col_right` is exactly
    one greater than the number from `temp_col_left`.

    Parameters
    ----------
    temp_col_left : str
        The left column name.
    temp_col_right : str
        The right column name.

    Returns
    -------
    bool
        `True` if the number at the end of `temp_col_right` equals the number
        at the end of `temp_col_left` plus one. `False` otherwise.

    Raises
    ------
    ValueError
        If the last two characters of either column name cannot be parsed as
        an integer.

    Examples
    --------
    >>> check_adjacent_temp_group_column_names('TEMP01', 'TEMP02')
    True
    >>> check_adjacent_temp_group_column_names('TEMP01', 'TEMP03')
    False
    >>> check_adjacent_temp_group_column_names('TEMP01', 'TEMP1')
    Traceback (most recent call last):
    ...
    ValueError: The values found in the last two ... temp_col_left: TEMP01, ...
    """
    try:
        left_number, right_number = (
            int(column_name[-2:]) for column_name in (temp_col_left, temp_col_right)
        )
    except ValueError as exc:
        raise ValueError(
            "The values found in the last two characters from `temp_col_left_int` "
            "and `temp_col_right` are not integers. "
            f"temp_col_left: {temp_col_left}, temp_col_right: {temp_col_right}"
        ) from exc
    return right_number - left_number == 1
def filter_bentonita_datasets(datasets: dict) -> dict:
    """
    Drop the rows whose "bentonita" value lies strictly between 0 and 5.

    Only the datasets that have a "bentonita" column are filtered; for each
    of them a summary with the number of removed and remaining rows is
    printed. The other datasets are passed through unchanged.

    Parameters
    ----------
    datasets : dict
        Dictionary mapping dataset names to `pandas.DataFrame` objects.

    Returns
    -------
    dict
        A new dictionary with the same keys. The input dictionary and its
        DataFrames are not modified.
    """
    filtered = {}
    for name, frame in datasets.items():
        if "bentonita" in frame.columns:
            rows_before = frame.shape[0]
            in_open_interval = (frame["bentonita"] > 0) & (frame["bentonita"] < 5)
            frame = frame.loc[~in_open_interval, :]
            rows_after = frame.shape[0]
            removed = rows_before - rows_after
            print(
                f"{name} - Removed a total of: {removed:,} | New rows quantity: {rows_after:,}"
            )
        filtered[name] = frame
    return filtered
def get_production(datasets: Dict[str, pd.DataFrame]) -> pd.Series:
    """
    Collect the 'PROD_PQ_Y@08US' values found across the datasets.

    The 'PROD_PQ_Y@08US' column of every dataset that contains it is
    concatenated, in dictionary order, into a single series. Repeated
    values are then dropped, keeping the first occurrence of each.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A mapping of dataset names to pandas DataFrame objects. Datasets
        without the 'PROD_PQ_Y@08US' column are ignored.

    Returns
    -------
    pd.Series
        The distinct 'PROD_PQ_Y@08US' values, each carrying the index label
        of its first occurrence.

    Raises
    ------
    ValueError
        Raised by `pd.concat` when no dataset contains the column.

    Notes
    -----
    NOTE(review): duplicates are removed by *value*, not by index, so two
    rows with different index labels but the same production value collapse
    into one. Confirm that this is what the callers expect.
    """
    column = "PROD_PQ_Y@08US"
    production_values = [
        frame[column] for frame in datasets.values() if column in frame.columns
    ]
    return pd.concat(production_values).drop_duplicates()
def get_production_pc(datasets: Dict[str, pd.DataFrame]) -> pd.Series:
    """
    Collect the 'PROD_PC_I@08US' values found across the datasets.

    The 'PROD_PC_I@08US' column of every dataset that contains it is
    concatenated, in dictionary order, into a single series. Repeated
    values are then dropped, keeping the first occurrence of each.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A dictionary where keys are dataset names and values are pandas
        DataFrames. Datasets without the 'PROD_PC_I@08US' column are ignored.

    Returns
    -------
    pd.Series
        The distinct 'PROD_PC_I@08US' values, each carrying the index label
        of its first occurrence.

    Raises
    ------
    ValueError
        Raised by `pd.concat` when no dataset contains the column.

    See Also
    --------
    pd.concat : Concatenate pandas objects along a particular axis.
    pd.Series.drop_duplicates : Return a Series with duplicate values removed.

    Notes
    -----
    NOTE(review): duplicates are removed by *value*, not by index, so two
    rows with different index labels but the same production value collapse
    into one. Confirm that this is what the callers expect.
    """
    column = "PROD_PC_I@08US"
    production_values = [
        frame[column] for frame in datasets.values() if column in frame.columns
    ]
    return pd.concat(production_values).drop_duplicates()
def clean_data(
    datasets: Dict[str, pd.DataFrame],
    df_sql: pd.DataFrame,
    skip_transformations: bool = False,
    datasets_filepath: str = DATASETS_CLEAN_FILEPATH,
    df_sql_filepath: str = DF_SQL_CLEAN_FILEPATH,
    constant_tags_summary_filepath: str | None = None,
    **kwargs,
) -> Dict[str, pd.DataFrame]:
    """
    Clean and transform input datasets according to specified rules.

    The function first removes the models listed in `drop_models`. Unless
    `skip_transformations` is set, it then runs the cleaning steps below, in
    this order, saves `datasets` and `df_sql` to disk and generates the
    constant tags summary report:

    `drop_tags`, `add_tags_to_models`, `cleaner`,
    `filter_corpo_moedor_especifico`, `filter_inf`, `clip_negative_values`,
    `datasets_filter`, `fix_pres_tag`, `fill_empty_columns`,
    `filter_temperature_groups`.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A dictionary where keys are model names and values are their
        corresponding DataFrame. The models in `drop_models` are removed
        from this dictionary in place.
    df_sql : pd.DataFrame
        DataFrame passed to `add_tags_to_models`,
        `filter_temperature_groups` and `constant_tags_summary`. It is also
        saved to `df_sql_filepath`.
    skip_transformations : bool, optional
        If True, return right after dropping the models: no cleaning step
        runs and nothing is written to disk. Default is False.
    datasets_filepath : str, optional
        File path for saving the cleaned datasets, default is
        DATASETS_CLEAN_FILEPATH.
    df_sql_filepath : str, optional
        File path for saving `df_sql`, default is DF_SQL_CLEAN_FILEPATH.
    constant_tags_summary_filepath : str or None, optional
        File path for saving the constant tags summary report. Default is
        None, which sets it to
        `DATA_CHECKS_FOLDER_PATH/constant_tags_summary.xlsx`.
    **kwargs : dict
        Additional keyword arguments for cleaning configurations.
        Supported keywords:

        - drop_models: List of model names to be dropped from datasets.
          Defaults to `["temp_precipitador", "torque"]`.
        - tags_to_drop: Passed to `drop_tags`. Defaults to `TAGS_TO_DROP`.
        - tags_to_add: Passed to `add_tags_to_models`.
          Defaults to `TAGS_TO_ADD`.
        - tag_filters: Passed to `datasets_filter`.
          Defaults to `TAG_FILTERS`.

        Any other keyword is ignored, and a warning listing the ignored
        names is logged.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The cleaned and optionally transformed datasets as a dictionary.

    Notes
    -----
    A failure while generating the constant tags summary report does not
    interrupt the function: the exception is logged and the cleaned
    datasets are still returned.
    """
    # NOTE(review): function-level imports, presumably to avoid circular
    # imports between `wip` modules - confirm before moving them.
    from wip.datatools import cleaner
    from wip.datatools import to_joblib
    from wip.temporary import filter_corpo_moedor_especifico

    if constant_tags_summary_filepath is None:
        constant_tags_summary_filepath = Path(DATA_CHECKS_FOLDER_PATH).joinpath(
            "constant_tags_summary.xlsx"
        )

    drop_models = kwargs.pop("drop_models", ["temp_precipitador", "torque"])
    tags_to_drop = kwargs.pop("tags_to_drop", TAGS_TO_DROP)
    tags_to_add = kwargs.pop("tags_to_add", TAGS_TO_ADD)
    tag_filters = kwargs.pop("tag_filters", TAG_FILTERS)
    if kwargs:
        # A misspelled option (e.g. `tag_filter`) would otherwise silently
        # fall back to the default configuration.
        logger.warning(
            "Ignoring unsupported keyword arguments passed to `clean_data`: %s",
            ", ".join(sorted(kwargs)),
        )

    for model in drop_models:
        datasets.pop(model, None)

    if skip_transformations:
        return datasets

    # Each step receives the output of the previous one, so the order below
    # is part of the behavior.
    datasets = drop_tags(datasets, tags_to_drop)
    datasets = add_tags_to_models(datasets, df_sql, tags_to_add)
    datasets = cleaner(datasets)
    datasets = filter_corpo_moedor_especifico(datasets)
    datasets = filter_inf(datasets)
    datasets = clip_negative_values(datasets)
    datasets = datasets_filter(datasets, tag_filters)
    datasets = fix_pres_tag(datasets)
    datasets = fill_empty_columns(datasets)
    datasets = filter_temperature_groups(datasets, df_sql)

    to_joblib(datasets, datasets_filepath)
    to_joblib(df_sql, df_sql_filepath)

    # The report is best effort: a failure here must not discard the
    # datasets that were just cleaned and saved.
    try:
        constant_tags_summary(df_sql, datasets, constant_tags_summary_filepath, None)
    except Exception as exc:
        logger.exception(exc)
        logger.error(
            "Failed to save constant_tags_summary report to %s.",
            constant_tags_summary_filepath,
        )

    return datasets
def get_most_recent_values(
    datasets: Dict[str, pd.DataFrame],
    column: str,
    faixa: str | None = None,
    n_most: int = 15,
) -> pd.Series:
    """
    Get the last `n_most` records of `column` for production range `faixa`.

    The values of `column` are gathered from all datasets with
    `get_column_values_from_datasets`. When a production range is given,
    only the rows whose index also appears among the 'PROD_PQ_Y@08US'
    records inside that range are considered.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary containing the data from all models datasets.
    column : str
        Column to retrieve the last values from.
    faixa : str | None, default=None
        Production range, written as `"<lower>-<upper>"` with integer
        bounds. A row belongs to the range when
        `lower <= 'PROD_PQ_Y@08US' < upper`. If None, the last `n_most`
        records are returned without considering the production range.
    n_most : int, default=15
        How many records to retrieve for `column`.

    Returns
    -------
    pd.Series
        A `pandas.Series` with the last `n_most` records of `column`.
    """
    prod_pq = "PROD_PQ_Y@08US"
    column_frame = get_column_values_from_datasets(column, datasets)
    if faixa is None:
        return column_frame[column].tail(n_most)

    production = get_column_values_from_datasets(prod_pq, datasets)
    lower, upper = (int(bound) for bound in faixa.split("-"))
    inside_range = (production[prod_pq] >= lower) & (production[prod_pq] < upper)
    production = production.loc[inside_range, :]

    if column == prod_pq:
        # The filtered production frame already is the requested column.
        return production[column].tail(n_most)

    # The inner join keeps only the rows whose index is inside the range;
    # the production column itself is not part of the result.
    merged = column_frame.merge(
        production, left_index=True, right_index=True, how="inner"
    )
    return merged.drop(columns=[prod_pq])[column].tail(n_most)
def get_column_values_from_datasets(
    column: str, datasets: Dict[str, pd.DataFrame]
) -> pd.DataFrame:
    """
    Concatenate column values from multiple datasets into a single DataFrame.

    The column is taken from every model returned by `find_models_by_tag`
    and the pieces are stacked in that order. When the same index label
    shows up more than once, only its last occurrence is kept. The result is
    sorted by index and rounded to two decimal places.

    Parameters
    ----------
    column : str
        The name of the column to extract from each DataFrame.
    datasets : Dict[str, pd.DataFrame]
        A dictionary mapping model names to their respective
        `pandas.DataFrame`.

    Returns
    -------
    pd.DataFrame
        A single-column `pandas.DataFrame` named `column`. It keeps the row
        index of the source DataFrames, sorted and without duplicates.

    Raises
    ------
    ValueError
        Raised by `pd.concat` when `find_models_by_tag` yields no model.

    Examples
    --------
    Both models share the index labels 0 and 1. Assuming
    `find_models_by_tag` lists 'model1' before 'model2', the values from
    'model2' are the ones kept:

    >>> datasets = {'model1': pd.DataFrame({'A': [1.111, 2.222]}),
    ...             'model2': pd.DataFrame({'A': [3.333, 4.444]})}
    >>> get_column_values_from_datasets('A', datasets)
          A
    0  3.33
    1  4.44

    Notes
    -----
    Different filters and transformations are applied to each DataFrame of
    the `datasets` dictionary, so the same tag can have a different set of
    rows depending on the model used to read it. Combining the values from
    every dataset that uses the tag, without repeating index labels, gives
    access to all the available data for that tag.
    """
    frames = [
        datasets[model_name][column].to_frame(column)
        for model_name in find_models_by_tag(column, datasets)
    ]
    combined = pd.concat(frames)
    without_duplicates = combined.loc[~combined.index.duplicated(keep="last")]
    return without_duplicates.sort_index().round(2)
def multi_merge(dataframes: List[pd.DataFrame], **merge_kwargs) -> pd.DataFrame:
    """
    Merge multiple DataFrames into a single DataFrame using specified kwargs.

    The DataFrames are merged pairwise from left to right with `pd.merge`:
    the first two are merged, the result is merged with the third, and so
    on. Every merge receives the same `merge_kwargs`.

    Parameters
    ----------
    dataframes : List[pd.DataFrame]
        A list of pandas DataFrames to be merged. A list with a single
        DataFrame is returned as is, without calling `pd.merge`.
    **merge_kwargs : dict
        Keyword arguments passed to `pd.merge`, such as 'on', 'how',
        'suffixes', etc.

    Returns
    -------
    pd.DataFrame
        The result of consecutively merging the input DataFrames.

    Raises
    ------
    TypeError
        Raised by `functools.reduce` when `dataframes` is empty.

    See Also
    --------
    pd.merge : Merge DataFrame objects with a database-style join.
    """

    def merge_pair(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
        return pd.merge(left, right, **merge_kwargs)

    return reduce(merge_pair, dataframes)