"""
Module with functions that apply filters to the datasets, before regression.
These functions are executed inside the module `mltrainer`, before training
the regression models.
They're used to filter the datasets generated at the end of the preprocessing
pipeline to ensure the quality of the data used for training the models.
Notes
-----
These functions should be applied only once during the whole execution.
After the `mltrainer` module execution, a clean version of the `datasets`
dictionary is saved to disk.
The subsequent optimization step should use the clean version of the
`datasets` dictionary to build the optimization model.
It holds the same data that the predictive models used for training,
in a cleaner form.
"""
from __future__ import annotations
from functools import reduce
from pathlib import Path
from typing import Dict
from typing import Hashable
from typing import List
from typing import Optional
from typing import Union
import numpy as np
import pandas as pd
from wip.constants import DATA_CHECKS_FOLDER_PATH
from wip.constants import DATASETS_CLEAN_FILEPATH
from wip.constants import DF_SQL_CLEAN_FILEPATH
from wip.datatools.data_checks import constant_tags_summary
from wip.logging_config import logger
from wip.ml_configs import LIMITES_FAIXA_700_750
from wip.ml_configs import LIMITES_FAIXA_750_PLUS
from wip.ml_configs import TAG_FILTERS
from wip.ml_configs import TAGS_TO_ADD
from wip.ml_configs import TAGS_TO_DROP
def mod_filtros(
data: pd.DataFrame, model_name: str, target_column: str
) -> pd.DataFrame:
"""
Apply specific filters to the data based on the model name and target column.
This function identifies the model name and applies the appropriate filter
to the data in the target column. After applying the filter, it returns the
filtered DataFrame.
Parameters
----------
data : pd.DataFrame
The DataFrame to be filtered. It should include the target column.
model_name : str
The name of the model, which determines the filter to be applied.
target_column : str
The name of the column in 'data' that the filter is to be applied to.
This column represents the target variable, and is **usually** the last
column in the DataFrame.
Returns
-------
pd.DataFrame
The filtered DataFrame.
Notes
-----
This function applies model-specific filters based on the `model_name`:
- "cm1": Filters rows where "corpo_moedor_especifico_1" > 0.
- "torque": Filters rows where `target_column` >= 3000.
- "energia_prensa": Filters rows where `target_column` >= 0.15.
- "finos": Filters rows where `target_column` <= 1.5.
- "compressao": Filters rows where `target_column` > 200.
- Otherwise: Filters rows where `target_column` > 0.
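    Examples
    --------
    A minimal sketch; "resistencia" is a hypothetical column name:
    >>> data = pd.DataFrame({"x": [1, 2], "resistencia": [150.0, 250.0]})
    >>> mod_filtros(data, "compressao", "resistencia")
       x  resistencia
    1  2        250.0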
"""
if model_name.startswith("cm1"):
target_filter = data["corpo_moedor_especifico_1"] > 0
elif model_name == "torque":
target_filter = data[target_column] >= 3000
elif model_name == "energia_prensa":
target_filter = data[target_column] >= 0.15
elif model_name == "finos":
target_filter = data[target_column] <= 15
elif model_name == "compressao":
target_filter = data[target_column] > 200
else:
target_filter = data[target_column] >= 0
return data[target_filter]
def filter_status(dataset: pd.DataFrame) -> pd.DataFrame:
"""
Filter out rows with a 'status' value not equal to 1 in a dataset.
    If the provided dataset contains a 'status' column, this function
    removes the rows where 'status' isn't equal to 1.
    The 'status' column is then dropped from the DataFrame afterward.
Parameters
----------
dataset : pd.DataFrame
A dataset that contains a column named 'status' to filter.
Returns
-------
pd.DataFrame
The transformed dataset with rows removed where 'status' != 1,
and with the 'status' column dropped.
Notes
-----
This function uses the `query` method of pandas' DataFrame to filter rows.
It only affects datasets that have the 'status' tag associated with them.
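    Examples
    --------
    A minimal illustration:
    >>> dataset = pd.DataFrame({"status": [1, 0, 1], "value": [10, 20, 30]})
    >>> filter_status(dataset)
       value
    0     10
    2     30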
"""
if "status" not in dataset.columns:
return dataset
logger.info("Pré-remoção status zerado: %s", f"{dataset.shape[0]:,}")
dataset = dataset.query("status == 1").drop(columns=["status"])
logger.info("Pós-remoção status zerado: %s", f"{dataset.shape[0]:,}")
return dataset
def find_models_by_tag(tag_name: str, datasets: Dict[str, pd.DataFrame]) -> List[str]:
"""
Find models that have a particular tag in their corresponding DataFrame.
This function iterates over the `datasets` dictionary where keys are model names
and values are DataFrames. It returns a list of all models whose DataFrame contains
`tag_name` as one of its columns.
Parameters
----------
tag_name : str
Tag (column name) to search for in the datasets.
    datasets : Dict[str, pd.DataFrame]
        A dictionary where keys are model names (str) and values are the
        corresponding DataFrames (pd.DataFrame).
Returns
-------
List[str]
A list with model names which have `tag_name` in their corresponding DataFrame.
Examples
--------
Let's assume we have a dictionary of DataFrames `datasets`:
>>> datasets = {
... 'model1': pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]}),
... 'model2': pd.DataFrame({'A': [1, 2, 3], 'C': [4, 5, 6]}),
... 'model3': pd.DataFrame({'B': [1, 2, 3], 'C': [4, 5, 6]})
... }
We can find models that have column 'A':
>>> print(find_models_by_tag('A', datasets))
['model1', 'model2']
And models that have column 'C':
>>> print(find_models_by_tag('C', datasets))
['model2', 'model3']
"""
return [model for model, df in datasets.items() if tag_name in df.columns]
def find_models_by_partial_tag(
tag_pattern: str, datasets: Dict[str, pd.DataFrame]
) -> List[str]:
"""Return the names of models that contain a given tag pattern in their columns.
Parameters
----------
tag_pattern : str
The pattern to search for in the columns of the dataframes.
datasets : dict of str, DataFrame
A dictionary where keys are the names of models and values
are `pandas.DataFrame` objects. It's assumed that each DataFrame
has columns that could match the provided tag pattern.
Returns
-------
list of str
The names of the models that contain the given tag pattern in any of
their respective DataFrame columns.
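    Raises
    ------
    TypeError
        If `tag_pattern` is not a non-empty string, or if `datasets` is not
        a non-empty dictionary.
    Examples
    --------
    A minimal illustration with hypothetical tag names:
    >>> datasets = {
    ...     'model1': pd.DataFrame({'TEMP1_I@08QU': [1]}),
    ...     'model2': pd.DataFrame({'PRES1_I@08QU': [1]}),
    ... }
    >>> find_models_by_partial_tag('TEMP', datasets)
    ['model1']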
"""
if not isinstance(tag_pattern, str) or not tag_pattern:
raise TypeError("tag_pattern must be a non-empty string")
if not isinstance(datasets, dict) or not datasets:
raise TypeError("datasets must be a non-empty dictionary")
return [
model
for model, data in datasets.items()
if data.columns.astype(str).str.contains(tag_pattern).any()
]
def pattern_filter(name: str, model_name_pattern: str) -> bool:
"""
Determine if a `name` matches a given `model_name_pattern`.
This function is designed to filter names based on a specific pattern.
The pattern can start or end with an asterisk (*) which denotes a wildcard.
The pattern without the wildcard should be a substring at the respective
position in the name to be considered a match.
Parameters
----------
name : str
The name to be checked against the pattern.
model_name_pattern : str
The pattern to which the name should be compared. It can start or
end with an asterisk (*) to denote a wildcard at the beginning or
the end respectively.
Returns
-------
bool
True if the name matches the model name pattern, otherwise False.
Examples
--------
>>> pattern_filter("testmodel", "test*")
True
>>> pattern_filter("testmodel", "*model")
True
>>> pattern_filter("testmodel", "model")
True
"""
name = name.lower()
model_name_pattern = model_name_pattern.lower()
if model_name_pattern.endswith("*"):
return name.startswith(model_name_pattern[:-1])
if model_name_pattern.startswith("*"):
return name.endswith(model_name_pattern[1:])
return model_name_pattern in name
def find_models_by_partial_name(
datasets: Dict[str, pd.DataFrame], model_name_pattern: str
) -> List[str]:
"""
Find model names in the datasets that match the provided pattern.
This function takes a model name pattern as input and matches it
against the model names in the provided datasets.
The pattern can be a substring to match anywhere in the model name,
or it can start or end with an asterisk (*) to match at the beginning or
end of the model name.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary where keys are model names and values are pandas DataFrames.
Each DataFrame represents a dataset corresponding to a particular model.
model_name_pattern : str
A string representing the model name pattern to match against the dataset names.
This can be a substring to match anywhere, or start or end with an '*' to match
at the beginning or end of the model name.
Returns
-------
List[str]
A list of model names from the datasets that match the provided pattern.
Notes
-----
This function converts the model names and pattern to lowercase before matching.
Therefore, the matching is case-insensitive.
Examples
--------
>>> datasets = {
... "model1": pd.DataFrame(),
... "model2": pd.DataFrame(),
... "sample1": pd.DataFrame()
... }
>>> matched_models = find_models_by_partial_name(datasets, "*1")
>>> print(matched_models)
['model1', 'sample1']
"""
return [
model for model in datasets.keys() if pattern_filter(model, model_name_pattern)
]
def find_models_by_partial_names(
datasets: Dict[str, pd.DataFrame], model_name_patterns: List[str]
) -> List[str]:
"""
Find model names in the datasets that match any of the provided patterns.
This function takes a list of model name patterns as input and matches
each pattern against the model names in the provided datasets.
The matched model names are then returned as a list.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary where keys are model names and values are pandas DataFrames.
Each DataFrame represents a dataset corresponding to a particular model.
model_name_patterns : List[str]
A list of strings representing the model name patterns to match against
the dataset names. Each pattern can be a substring to match anywhere,
or start or end with an '*' to match at the beginning or end of the
model name.
Returns
-------
List[str]
A list of model names from the datasets that match any of the
provided patterns.
Notes
-----
    This function uses the `find_models_by_partial_name` function
    to match each pattern against the model names, and eliminates any
    duplicates from the matched names. Because a set is used for
    deduplication, the order of the returned names is not guaranteed.
Examples
--------
>>> datasets = {
... "model1": pd.DataFrame(),
... "model2": pd.DataFrame(),
... "sample1": pd.DataFrame()
... }
    >>> matched_models = find_models_by_partial_names(datasets, ["*1", "*2"])
    >>> print(sorted(matched_models))
    ['model1', 'model2', 'sample1']
"""
return list(
{
model_name
for model_name_pattern in model_name_patterns
for model_name in find_models_by_partial_name(datasets, model_name_pattern)
}
)
def filter_quantile(
data: pd.DataFrame, tag_name: str, low_bound: float = 0.02, up_bound: float = 0.98
) -> pd.DataFrame:
"""
Filter a DataFrame based on the quantiles of a specific column.
    This function returns a subset of the given DataFrame `data` such that
    the values of the column with the name `tag_name` lie between the
    `low_bound` and `up_bound` quantiles of the original DataFrame.
Parameters
----------
data : pd.DataFrame
DataFrame to be filtered.
tag_name : str
Name of the column based on whose quantiles the DataFrame is to be filtered.
    low_bound : float, optional
        Lower quantile bound, between 0 and 1 (default is 0.02).
    up_bound : float, optional
        Upper quantile bound, between 0 and 1 (default is 0.98).
Returns
-------
pd.DataFrame
DataFrame after being filtered based on the percentiles of the column
`tag_name`.
Examples
--------
Suppose we have a DataFrame `df`:
>>> df = pd.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
>>> print(df)
       A   B
    0  1  10
    1  2  20
    2  3  30
    3  4  40
    4  5  50
>>> filtered_df = filter_quantile(df, 'A', 0.25, 0.75)
>>> print(filtered_df)
       A   B
    1  2  20
    2  3  30
    3  4  40
"""
pandas_series = data[tag_name]
q_low = pandas_series.quantile(low_bound)
q_hi = pandas_series.quantile(up_bound)
return data[(pandas_series >= q_low) & (pandas_series <= q_hi)]
def filter_quantile_datasets(
datasets: Dict[str, pd.DataFrame],
tag_name: str,
low_bound: float | None = None,
up_bound: float | None = None,
) -> Dict[str, pd.DataFrame]:
"""
Filter all DataFrames in a dictionary based on the quantiles of a column.
This function applies the `filter_quantile` function to each DataFrame in
the `datasets` dictionary where the column with the name `tag_name` exists.
The updated DataFrames are then returned in a dictionary with the same
structure as `datasets`.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary where keys are model names (str) and values are corresponding
DataFrames (pd.DataFrame).
tag_name : str
Name of the column based on whose quantiles the DataFrames are to be filtered.
    low_bound : float | None, optional
        Lower quantile value, between 0 and 1. When None, defaults to 0.
    up_bound : float | None, optional
        Upper quantile value, between 0 and 1. When None, defaults to 1.
Returns
-------
Dict[str, pd.DataFrame]
Dictionary of DataFrames after being filtered based on the quantiles
of the column `tag_name`.
Examples
--------
Let's assume we have a dictionary of DataFrames `datasets`:
>>> datasets = {
... 'model1': pd.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]}),
... 'model2': pd.DataFrame({'A': [6, 7, 8, 9, 10], 'B': [60, 70, 80, 90, 100]})
... }
Let's filter `datasets` based on the 0.25 and 0.75 quantiles of column 'A':
>>> filtered_datasets = filter_quantile_datasets(datasets, 'A', 0.25, 0.75)
>>> for model_name, df in filtered_datasets.items():
... print(f"Model: {model_name}")
... print(df)
...
Model: model1
       A   B
    1  2  20
    2  3  30
    3  4  40
    Model: model2
       A   B
    1  7  70
    2  8  80
    3  9  90
"""
    # Use explicit `is None` checks: `up_bound or 1` would silently turn an
    # explicit bound of 0 into the default.
    low_bound = 0 if low_bound is None else low_bound
    up_bound = 1 if up_bound is None else up_bound
for model_name in find_models_by_tag(tag_name, datasets):
datasets[model_name] = filter_quantile(
datasets[model_name], tag_name, low_bound, up_bound
)
return datasets
def filter_tag(
    datasets: Dict[str, pd.DataFrame],
    tag_name: str,
    low_bound: float | None = None,
    up_bound: float | None = None,
) -> Dict[str, pd.DataFrame]:
"""Filter all DataFrames in a dictionary based on the quantiles of a column.
Parameters
----------
    datasets : Dict[str, pd.DataFrame]
        A dictionary where keys are model names (str) and values are the
        corresponding DataFrames.
tag_name : str
Name of the column based on whose quantiles the DataFrames are to be filtered.
low_bound : float, optional
Lower quantile value, between 0 and 1.
up_bound : float, optional
Upper quantile value, between 0 and 1.
Returns
-------
Dict[str, pd.DataFrame]
Dictionary of DataFrames after being filtered based on the quantiles
of the column `tag_name`.
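    Raises
    ------
    ValueError
        If neither `low_bound` nor `up_bound` is specified.
    Examples
    --------
    A minimal sketch with hypothetical data:
    >>> datasets = {'model1': pd.DataFrame({'A': [1, 5, 10]})}
    >>> filter_tag(datasets, 'A', low_bound=2, up_bound=9)['model1']
       A
    1  5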
"""
    if low_bound is None and up_bound is None:
raise ValueError(
"At least one of `low_bound` and `up_bound` must be specified."
)
for model_name in find_models_by_tag(tag_name, datasets):
pandas_df = datasets[model_name]
        # Check against None explicitly so a bound of 0 isn't treated as unset.
        if low_bound is not None:
            pandas_df = pandas_df[pandas_df[tag_name] >= low_bound]
        if up_bound is not None:
            pandas_df = pandas_df[pandas_df[tag_name] <= up_bound]
datasets[model_name] = pandas_df
return datasets
def datasets_filter(
datasets: Dict[str, pd.DataFrame],
tags_limits: Dict[str, Dict[str, float | bool]],
) -> Dict[str, pd.DataFrame]:
"""
Filter or replace values from DataFrames in `datasets` using tags limits.
This function iterates over specified tags and their corresponding limits,
and either filters or replaces rows in each DataFrame of the input datasets.
The rows retained are those for which the value of each specified tag is
within the specified quantile or value limits.
This operation is performed in-place, and the function returns the
modified datasets.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
Dictionary containing model names as keys and associated DataFrames as
values. The DataFrames should contain columns corresponding to the keys
in tags_limits.
    tags_limits : Dict[str, Dict[str, float | bool]]
Dictionary with tag names as keys. The associated value for each key
is another dictionary with the following possible keys:
- "min": lower bound to use for filtering data.
- "max": upper bound to use for filtering data.
- "replace": whether function should filter or replace values.
- "quantile": whether "min" and "max" keys are percentiles or actual bounds.
- "mode": when "replace" is set to True, this key determines the operation
that will be used to calculate the replacement value to use.
Possible options are: "mean", "median", "max", "min", "mode", "i%",
where `i` is a number between 0 and 100 and "i%" represents a percentile.
For example:
.. code-block:: python
tags_limits = {
"tag1": {"min": 0.25, "max": 0.75, "quantile": True},
"tag2": {"min": 5, "quantile": False},
"tag3": {"max": 45, "quantile": False}
"tag4": {"max": 45, "quantile": False, "replace": True},
"tag5": {"max": 45, "replace": True},
"tag6": {"max": 45, "replace": True, "mode": "median"},
"tag7": {"max": 45, "replace": True, "mode": "50%"},
}
Returns
-------
Dict[str, pd.DataFrame]
Dictionary containing the modified DataFrames with rows either filtered
or replaced based on the specified `tags_limits` dictionary.
Examples
--------
Consider a scenario with datasets containing two DataFrames 'model1' and
'model2', and we want to filter rows based on quantiles for tags 'tag1'
and 'tag2':
>>> import pandas as pd
>>> data1 = {'tag1': [1, 2, 3], 'tag2': [4, 5, 6]}
>>> data2 = {'tag1': [7, 8, 9], 'tag2': [10, 11, 12]}
>>> datasets = {'model1': pd.DataFrame(data1), 'model2': pd.DataFrame(data2)}
>>> tags_limits = {
... 'tag1': {'min': 0.10, 'max': 0.90},
... 'tag2': {'max': 11, 'quantile': False}
... }
>>> print(
... {
... model_name: df.to_dict('list') # noqa
... for model_name, df in datasets_filter(datasets, tags_limits).items()
... }
... )
{'model1': {'tag1': [2], 'tag2': [5]}, 'model2': {'tag1': [8], 'tag2': [11]}}
In this example, `filtered_datasets` will contain 'model1' and 'model2'
DataFrames with rows filtered according to the quantile limits for
'tag1' and 'tag2'.
"""
for tag_name, quantile_limit in tags_limits.items():
upper_bound = quantile_limit.get("max", None)
lower_bound = quantile_limit.get("min", None)
quantile = quantile_limit.get("quantile", True)
replace = quantile_limit.get("replace", False)
mode = quantile_limit.get("mode", "mean")
if replace:
datasets = replace_values_datasets(
datasets, tag_name, mode, lower_bound, upper_bound, quantile
)
elif quantile:
datasets = filter_quantile_datasets(
datasets, tag_name, lower_bound, upper_bound
)
else:
datasets = filter_tag(datasets, tag_name, lower_bound, upper_bound)
return datasets
def non_zero_filter(
datasets: Dict[str, pd.DataFrame],
tag_names: List[str],
) -> Dict[str, pd.DataFrame]:
"""
Filter out rows with non-positive values for specified tags in datasets.
    This function traverses the datasets and filters out the rows where
    the values of the specified tag names are non-positive.
    The `datasets` dictionary is updated in place (each filtered DataFrame
    replaces the original entry), and the modified dictionary is returned.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
Dictionary containing model names as keys and corresponding DataFrames
as values. The function expects the DataFrames to have columns
corresponding to the tag_names provided.
tag_names : List[str]
List of strings representing the tag names to be considered for
filtering rows in the DataFrames. Rows with non-positive values for
these tags will be filtered out.
Returns
-------
Dict[str, pd.DataFrame]
Dictionary containing the modified DataFrames with rows having
non-positive values for the specified tags filtered out.
Notes
-----
    The `find_models_by_tag` function, defined in this module, is used
    internally to determine which models (i.e., keys of the `datasets`
    dictionary) are associated with each tag name.
Examples
--------
Suppose we have a dataset dictionary with two DataFrames 'model1' and
'model2', and we want to filter out rows with non-positive values for
the tags 'tag1' and 'tag2':
>>> import pandas as pd
>>> data1 = {'tag1': [1, -1, 3], 'tag2': [4, 5, 6]}
>>> data2 = {'tag1': [0, 2, 3], 'tag2': [-1, 8, 9]}
>>> datasets = {'model1': pd.DataFrame(data1), 'model2': pd.DataFrame(data2)}
>>> tag_names = ['tag1', 'tag2']
>>> filtered_datasets = non_zero_filter(datasets, tag_names)
Now, `filtered_datasets` will contain the modified 'model1' and 'model2'
DataFrames with non-positive values for 'tag1' and 'tag2' filtered out.
"""
for tag_name in tag_names:
for model_name in find_models_by_tag(tag_name, datasets):
data = datasets[model_name]
datasets[model_name] = data[data[tag_name] > 0]
return datasets
def filter_inf(datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
"""Filter out rows with any value equal to `np.inf` or `-np.inf`.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
Dictionary with dataframes to filter row values equal to infinity.
Returns
-------
Dict[str, pd.DataFrame]
Dictionary of dataframes without rows with values equal to infinity.
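    Examples
    --------
    A minimal illustration:
    >>> datasets = {'m1': pd.DataFrame({'A': [1.0, np.inf, 3.0]})}
    >>> filter_inf(datasets)['m1']
         A
    0  1.0
    2  3.0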
"""
filtered_datasets = {}
for dataset_name, dataset in datasets.items():
inf_mask = (dataset == np.inf) | (dataset == -np.inf)
filtered_dataset = dataset[~inf_mask.any(axis=1)]
filtered_datasets[dataset_name] = filtered_dataset
return filtered_datasets
def clip_negative_values(
datasets: Dict[str, pd.DataFrame],
negative_rate_limit: float = 0.05,
verbose: bool = True,
) -> Dict[str, pd.DataFrame]:
"""Clip negative values in datasets.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
Dictionary of datasets to clip negative values.
negative_rate_limit : float, optional
Maximum rate of negative values in a column to be clipped,
by default 0.05. In other words, if less than 5% of the values
in a column are negative, they'll be clipped to 0.
verbose : bool, default=True
If `True`, prints the name of the column being clipped.
Returns
-------
Dict[str, pd.DataFrame]
Dictionary of datasets with negative values clipped.
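    Examples
    --------
    A minimal sketch; the last column is treated as the target and is kept
    as the last column:
    >>> datasets = {'m1': pd.DataFrame({'A': [-1.0, 2.0, 3.0, 4.0],
    ...                                 'y': [1.0, 2.0, 3.0, 4.0]})}
    >>> clipped = clip_negative_values(datasets, negative_rate_limit=0.5,
    ...                                verbose=False)
    >>> clipped['m1']['A'].tolist()
    [0.0, 2.0, 3.0, 4.0]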
"""
for model_name, data in datasets.items():
target_name = data.columns[-1]
row_count = data.shape[0]
for column in data.columns:
negative_count = (data[column] < 0).sum()
if negative_count > 0 and negative_count / row_count <= negative_rate_limit:
if verbose:
logger.warning(
"%s: %s - clipping negative values.", model_name, column
)
data.loc[data[column] < 0, column] = 0
target_values = data.pop(target_name)
data[target_name] = target_values
datasets[model_name] = data
return datasets
def replace_values_by_mean(
datasets: Dict[str, pd.DataFrame],
tag_names: List[str],
lower_bound: Union[float, int],
) -> Dict[str, pd.DataFrame]:
"""
Replace values in specified columns of DataFrames with the column mean.
    For each name in `tag_names`, this function identifies the associated
    models and replaces the column values that are at or below a given
    `lower_bound`, or are `NaN`, with the mean of the non-NaN values in
    the respective column (computed before the replacement).
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary containing DataFrames with different model names as keys.
tag_names : List[str]
A list of strings representing the names of the columns in the
DataFrames to be processed.
    lower_bound : Union[float, int]
        The lower bound threshold value.
        If a value in the specified column is at or below this value or is
        NaN, it will be replaced by the mean of the column's non-NaN values.
Returns
-------
Dict[str, pd.DataFrame]
A dictionary containing the modified DataFrames.
Notes
-----
The function modifies the input DataFrames in-place but also returns
    the modified DataFrames in a dictionary. The `find_models_by_tag`
    function, defined in this module, searches for models in the provided
    datasets using the tag and returns the names of the found models.
Examples
--------
>>> datasets = {"model1": pd.DataFrame({"tag1": [1, 2, np.nan, 4]})}
>>> modified_datasets = replace_values_by_mean(datasets, ["tag1"], 2)
>>> modified_datasets["model1"]["tag1"].tolist()
    [2.3333333333333335, 2.3333333333333335, 2.3333333333333335, 4.0]
"""
for tag_name in tag_names:
for model_name in find_models_by_tag(tag_name, datasets):
data = datasets[model_name]
data.loc[
(data[tag_name] <= lower_bound) | (data[tag_name].isna()), tag_name
] = data[tag_name].mean()
datasets[model_name] = data
return datasets
def fill_empty_columns(datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
"""
Fill DataFrame columns that have only `NaN` values with zeros.
For each DataFrame in the provided dictionary, this function identifies
columns that have `NaN` values for all rows and replaces them with zeros.
A log is generated detailing which columns in which DataFrames were altered.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
Dictionary with keys as model names (or any descriptive string) and
values as pandas DataFrames. Each DataFrame is checked for columns
that are entirely `NaN` and these columns are filled with zeros.
Returns
-------
Dict[str, pd.DataFrame]
Dictionary of DataFrames, identical to the input but with columns
that were entirely `NaN` filled with zeros.
Notes
-----
This function modifies the input DataFrames in place but also returns
the modified DataFrames for convenience.
Logging is performed for tracking which DataFrames (by their keys in the
input dictionary) had columns filled with zeros.
Examples
--------
>>> import pandas as pd
>>> datasets = {"modelA": pd.DataFrame({"A": [1, 2], "B": [None, None]}),
... "modelB": pd.DataFrame({"A": [None, None], "B": [3, 4]})}
>>> filled_datasets = fill_empty_columns(datasets)
>>> filled_datasets["modelA"]
       A    B
    0  1  0.0
    1  2  0.0
"""
for model_name, data in datasets.items():
empty_columns = [col for col in data.columns if data[col].isna().all()]
if empty_columns:
logger.info(
"Filling with 0 the following empty columns from %s: %s",
model_name,
empty_columns,
)
data.loc[:, empty_columns] = 0
datasets[model_name] = data
return datasets
def fill_all_missing_columns(dataset: pd.DataFrame) -> pd.DataFrame:
"""Fill columns that contain only `NaN` values with zeros.
Parameters
----------
dataset : pd.DataFrame
DataFrame to be processed.
Returns
-------
pd.DataFrame
DataFrame with columns that only contain `NaN` values filled with zeros.
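    Examples
    --------
    >>> df = pd.DataFrame({'A': [1, 2], 'B': [np.nan, np.nan]})
    >>> fill_all_missing_columns(df)
       A    B
    0  1  0.0
    1  2  0.0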
"""
all_missing_columns = [col for col in dataset.columns if dataset[col].isna().all()]
if all_missing_columns:
logger.warning(
"Filling with 0 the following empty columns: %s", all_missing_columns
)
dataset.loc[:, all_missing_columns] = 0
return dataset
def fix_pres_tag(datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """Fix the "PRES6_I@08QU-HO-851I-01" tag across datasets.
    The dataset with the most non-null values for this tag is used as the
    reference. In every other dataset that contains the tag, the column is
    replaced by the reference values (merged on the index), while keeping
    the target column as the last column.
    """
tag_name = "PRES6_I@08QU-HO-851I-01"
model_names = find_models_by_tag(tag_name, datasets)
name = max(model_names, key=lambda model: datasets[model][tag_name].notnull().sum())
for model_name in set(model_names) - {name}:
target_name = datasets[model_name].columns[-1]
target = datasets[model_name].pop(target_name)
new_data = (
datasets[model_name]
.drop(columns=[tag_name])
.merge(
datasets[name][tag_name],
left_index=True,
right_index=True,
how="left",
)
)
new_data[target_name] = target
datasets[model_name] = new_data
return datasets
def get_replacement_value(filtered_data: pd.Series, mode: str = "mean") -> float:
"""
Determine the replacement value based on a given `mode`.
    This function calculates the replacement value from the `filtered_data`
    series based on the specified mode. The `mode` can be:
    - "mean": Mean of the filtered values (default).
    - "median": Median of the filtered values.
    - "mode": Mode of the filtered values.
    - "min": Minimum of the filtered values.
    - "max": Maximum of the filtered values.
    - "i%" where i is a number between 0 and 100 that represents the
      percentile of the filtered values to use as the replacement value.
      For example, `"75%"` will return the 75th percentile of the filtered
      values.
Parameters
----------
filtered_data : pd.Series
The data series from which the replacement value is to be calculated.
mode : str {"mean", "median", "min", "max", "i%"}, default="mean"
The method used to determine the replacement value.
Returns
-------
float
The calculated replacement value.
Raises
------
ValueError
If the provided `mode` is not one of the recognized modes, or if an
invalid percentile value is given.
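    Examples
    --------
    A quick illustration:
    >>> series = pd.Series([1.0, 2.0, 3.0, 4.0])
    >>> float(get_replacement_value(series, mode="mean"))
    2.5
    >>> float(get_replacement_value(series, mode="75%"))
    3.25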
"""
if mode == "min":
return filtered_data.min()
if mode == "max":
return filtered_data.max()
if mode == "mean":
return filtered_data.mean()
if mode == "median":
return filtered_data.median()
if "%" in mode:
percentile = float(mode.replace('%', '')) / 100
if 0 <= percentile <= 100:
return filtered_data.quantile(q=percentile)
raise ValueError(f"Invalid percentile value: {mode}")
raise ValueError(f"Invalid mode '{mode}' provided.")
def replace_values(
dataset: pd.DataFrame,
column: str,
mode: str = "mean",
low_bound: float | int | None = None,
up_bound: float | int | None = None,
) -> pd.DataFrame:
"""
Replace values in a dataset column based on specified bounds.
For the values in the specified column of the dataset that fall outside the
range defined by `low_bound` and `up_bound`, this function replaces them with
a value determined by the `mode` parameter.
The `mode` parameter determines the operation performed on the filtered out
values. For example, if `mode` is set to "mean", the mean of the filtered
values is used as the replacement value.
Parameters
----------
dataset : pd.DataFrame
The dataset in which values are to be replaced.
column : str
The name of the column in the dataset where values are to be replaced.
mode : str {"mean", "median", "min", "max", "i%"}, default="mean"
The method used to determine the replacement value.
Other potential methods could be:
- "median": Median of the filtered values.
- "mode": Mode of the filtered values.
- "min": Minimum of the filtered values.
- "max": Maximum of the filtered values.
- "i%" where i is a number between 0 and 100 that represents the
percentile of the filtered values to use as the replacement value.
low_bound : float | int | None, optional
The lower bound for the range of acceptable values in the column.
If None, no lower bound is set. At least one of the parameters
`low_bound` or `up_bound` must be set.
up_bound : float | int | None, optional
The upper bound for the range of acceptable values in the column.
If None, no upper bound is set. At least one of the parameters
`low_bound` or `up_bound` must be set.
Returns
-------
pd.DataFrame
The dataset with values in the specified column replaced according to
the given conditions.
Raises
------
ValueError
If neither `low_bound` nor `up_bound` is set.
TypeError
If `column` is not of type string or hashable.
KeyError
If `column` does not exist inside the dataset.
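    Examples
    --------
    A minimal sketch: values above `up_bound` are replaced by the mean of
    the values within the bounds.
    >>> df = pd.DataFrame({'A': [1.0, 5.0, 100.0]})
    >>> replace_values(df, 'A', mode="mean", up_bound=10)
         A
    0  1.0
    1  5.0
    2  3.0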
"""
    if low_bound is None and up_bound is None:
        raise ValueError(
            "Neither `low_bound` nor `up_bound` is set. "
            "At least one of the parameters must be set to a numeric value."
        )
if not isinstance(column, Hashable):
raise TypeError(
f"Parameter column of invalid type: {type(column)}. "
"`column` needs to be of type string or hashable."
)
if column not in dataset.columns:
raise KeyError(f"Column {column!r} does not exist inside dataset.")
# Apply the filter conditions
if low_bound is None:
dataset_filters = dataset[column] <= up_bound
elif up_bound is None:
dataset_filters = dataset[column] >= low_bound
else:
dataset_filters = (dataset[column] <= up_bound) & (dataset[column] >= low_bound)
# Get the replacement value based on mode
filtered_data = dataset.loc[dataset_filters, column]
fill_value = get_replacement_value(filtered_data, mode)
# Replace values outside the bounds
dataset.loc[~dataset_filters, column] = fill_value
return dataset
def replace_values_datasets(
datasets: Dict[str, pd.DataFrame],
tag_name: str,
mode: str = "mean",
low_bound: float | None = None,
up_bound: float | None = None,
is_quantile: bool = False,
) -> Dict[str, pd.DataFrame]:
"""
Replace values in specified column of each DataFrame from a dictionary.
This function iterates over the models that have the specified tag in their
DataFrame, retrieves the dataset for each model, and calls the `replace_values`
function to replace the values in the specified column based on the provided
bounds and mode. The function then updates the dictionary of datasets with
the modified DataFrame for each model.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary containing model names as keys and associated DataFrames as values.
tag_name : str
The name of the column in the dataset where values are to be replaced.
mode : str {'mean', 'i%', 'min', 'max', 'median', 'mode'}, default='mean'
The method used to determine the replacement value.
low_bound : float | None
The lower bound for the range of acceptable values in the column.
up_bound : float | None
The upper bound for the range of acceptable values in the column.
is_quantile : bool, default=False
Indicates whether the bounds are quantiles or actual bounds.
    Returns
    -------
    Dict[str, pd.DataFrame]
        The dictionary containing the modified DataFrames with values
        replaced in the specified column based on the provided bounds
        and `mode`.
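    Examples
    --------
    A minimal sketch with hypothetical data:
    >>> datasets = {'m1': pd.DataFrame({'tag': [1.0, 5.0, 100.0]})}
    >>> result = replace_values_datasets(datasets, 'tag', mode="median",
    ...                                  up_bound=10)
    >>> result['m1']['tag'].tolist()
    [1.0, 5.0, 3.0]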
"""
for model_name in find_models_by_tag(tag_name, datasets):
dataset = datasets[model_name]
_low_bound, _up_bound = low_bound, up_bound
if is_quantile:
if low_bound is not None:
_low_bound = dataset[tag_name].quantile(low_bound)
if up_bound is not None:
_up_bound = dataset[tag_name].quantile(up_bound)
datasets[model_name] = replace_values(
datasets[model_name], tag_name, mode, _low_bound, _up_bound
)
return datasets
def filter_temperature_groups(
datasets: Dict[str, pd.DataFrame],
df_sql: pd.DataFrame,
reduced_limits: Optional[Dict[str, Dict[str, float]]] = None,
normal_limits: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, pd.DataFrame]:
"""
Replace values from temperature groups tags outside its safety limits.
For each production range, this function determines which values from a
given tag are outside its established safety limits. Then it replaces these
values with the average of all values that are within the safety limits for
that production range for that tag.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
Dictionary with dataframes to filter temperature groups.
df_sql : pd.DataFrame
A `pandas.Dataframe` with all available tags.
    reduced_limits : Optional[Dict[str, Dict[str, float]]]
        The reduced production range temperature limits.
        These limits are applied to rows where the column `'PROD_PQ_Y@08US'`
        is between 700 and 750.
    normal_limits : Optional[Dict[str, Dict[str, float]]]
        The normal production range temperature limits.
        These limits are applied to rows where the column `'PROD_PQ_Y@08US'`
        is between 750 and 1000.
Returns
-------
Dict[str, pd.DataFrame]
Updated dictionary of dataframes.
    Notes
    -----
    If, for a given tag and production range, no values (or too few values)
    lie within the safety limits, the replacement average would be NaN or
    unrepresentative. In that case, a warning is logged and the midpoint
    between the limits' lower and upper bounds is used as the replacement
    value instead.
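    Examples
    --------
    The sketch below exercises the fallback path described above
    ('TEMP1' is a hypothetical tag name, and explicit limits are passed):
    >>> df_sql = pd.DataFrame({'PROD_PQ_Y@08US': [720.0, 800.0]})
    >>> datasets = {'m1': pd.DataFrame({'TEMP1': [900.0, 1400.0],
    ...                                 'PROD_PQ_Y@08US': [720.0, 800.0]})}
    >>> limits = {'TEMP1': {'min': 1000.0, 'max': 1300.0}}
    >>> out = filter_temperature_groups(datasets, df_sql, limits, limits)
    >>> out['m1']['TEMP1'].tolist()
    [1150.0, 1150.0]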
"""
new_datasets = {}
reduced_limits = reduced_limits or LIMITES_FAIXA_700_750
normal_limits = normal_limits or LIMITES_FAIXA_750_PLUS
for name, df in datasets.items():
if df.columns.intersection(list(reduced_limits.keys())).size == 0:
new_datasets[name] = df
continue
remove_prod_pq = False
if "PROD_PQ_Y@08US" not in df.columns:
df = df.merge(
df_sql["PROD_PQ_Y@08US"], left_index=True, right_index=True, how="inner"
)
remove_prod_pq = True
# ldf = Low production range values (PROD_PQ_Y@08US < 750)
# rdf = High production range values (PROD_PQ_Y@08US >= 750)
# Values with PROD_PQ_Y@08US < 700 or >= 1000 are not considered
# and therefore, removed from the set of values to be used for the
# training of the regression models or creation of the optimization
# problem.
        # `.copy()` avoids SettingWithCopyWarning on the `.loc` assignments below.
        ldf = df.loc[
            (df["PROD_PQ_Y@08US"] >= 700) & (df["PROD_PQ_Y@08US"] < 750), :
        ].copy()
        rdf = df.loc[
            (df["PROD_PQ_Y@08US"] >= 750) & (df["PROD_PQ_Y@08US"] < 1000), :
        ].copy()
for col, limits in normal_limits.items():
reduced_limit = reduced_limits[col]
if col in df.columns:
# Calculate the average value within the tag
# safety limits, for production range smaller than 750 ton/hour.
ldf_values_inside_limits = ldf.loc[
(ldf[col] < reduced_limit["max"])
& (ldf[col] >= reduced_limit["min"]),
col,
]
avg_value_ldf = ldf_values_inside_limits.mean()
# Calculate the average value within the tag
# safety limits, for production range greater than 750 ton/hour.
rdf_values_inside_limits = rdf.loc[
(rdf[col] < limits["max"]) & (rdf[col] >= limits["min"]), col
]
avg_value_rdf = rdf_values_inside_limits.mean()
if pd.isna(avg_value_ldf) or ldf_values_inside_limits.shape[0] <= int(
0.20 * ldf.shape[0]
):
                    logger.warning(
                        "Column '%s' for production range '<750' has very few "
                        "values within %s <= col <= %s. Using the mean of the "
                        "limits' lower and upper bounds instead.",
                        col,
                        reduced_limit["min"],
                        reduced_limit["max"],
                    )
avg_value_ldf = np.mean(
[reduced_limit["max"], reduced_limit["min"]]
)
if pd.isna(avg_value_rdf) or rdf_values_inside_limits.shape[0] <= int(
0.10 * rdf.shape[0]
):
                    logger.warning(
                        "Column '%s' for production range '≥750' has very few "
                        "values within %s <= col <= %s. Using the mean of the "
                        "limits' lower and upper bounds instead.",
                        col,
                        limits["min"],
                        limits["max"],
                    )
avg_value_rdf = np.mean([limits["max"], limits["min"]])
ldf.loc[
(ldf[col] >= reduced_limit["max"])
| (ldf[col] < reduced_limit["min"]),
col,
] = avg_value_ldf
rdf.loc[
(rdf[col] >= limits["max"]) | (rdf[col] < limits["min"]), col
] = avg_value_rdf
df = pd.concat([ldf, rdf]).sort_index()
if remove_prod_pq:
df = df.drop(columns=["PROD_PQ_Y@08US"], errors="ignore")
new_datasets[name] = df
return new_datasets
def check_adjacent_temp_group_column_names(
temp_col_left: str, temp_col_right: str
) -> bool:
"""
Check if the last two characters of the provided column names are integers.
This function checks if the last two characters of the provided column names
are integers and if the integer in the right column name is one greater than
the integer in the left column name. The function returns `True` if the
conditions are met, and `False` otherwise.
Parameters
----------
temp_col_left : str
The left column name.
temp_col_right : str
The right column name.
Returns
-------
bool
`True` if the last two characters of the provided column names are integers
and if the integer in the right column name is one greater than the integer
in the left column name. `False` otherwise.
Raises
------
ValueError
If the last two characters of the provided column names are not integers.
Examples
--------
>>> check_adjacent_temp_group_column_names('TEMP01', 'TEMP02')
True
>>> check_adjacent_temp_group_column_names('TEMP01', 'TEMP03')
False
>>> check_adjacent_temp_group_column_names('TEMP01', 'TEMP1')
Traceback (most recent call last):
...
ValueError: The values found in the last two ... temp_col_left: TEMP01, ...
"""
try:
temp_col_left_int = int(temp_col_left[-2:])
temp_col_right_int = int(temp_col_right[-2:])
return temp_col_left_int + 1 == temp_col_right_int
except ValueError as exc:
        raise ValueError(
            "The values found in the last two characters from `temp_col_left` "
            "and `temp_col_right` are not integers. "
            f"temp_col_left: {temp_col_left}, temp_col_right: {temp_col_right}"
        ) from exc
def filter_bentonita_datasets(datasets: dict) -> dict:
    """Remove rows where the "bentonita" tag is strictly between 0 and 5.
    Rows with "bentonita" equal to 0, or greater than or equal to 5, are
    kept. The row counts before and after the filter are logged for each
    affected dataset.
    """
    datasets_new = {}
    for name, df in datasets.items():
        if "bentonita" in df.columns:
            nrows = df.shape[0]
            df = df.loc[~((df["bentonita"] > 0) & (df["bentonita"] < 5)), :]
            new_nrows = df.shape[0]
            removed_rows = nrows - new_nrows
            logger.info(
                "%s - Removed a total of: %s | New rows quantity: %s",
                name,
                f"{removed_rows:,}",
                f"{new_nrows:,}",
            )
        datasets_new[name] = df
    return datasets_new
def get_production(datasets: Dict[str, pd.DataFrame]) -> pd.Series:
"""
    Extract the 'PROD_PQ_Y@08US' production values from the datasets.
    This function sifts through a collection of datasets to identify those
    that include the 'PROD_PQ_Y@08US' column. The column values from all
    qualifying datasets are concatenated into a single series and
    deduplicated.
    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A mapping of dataset names (strings) to pandas DataFrame objects.
    Returns
    -------
    pd.Series
        A series with the 'PROD_PQ_Y@08US' values from all datasets that
        contain the column, with duplicate values removed.
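    Examples
    --------
    A minimal illustration with hypothetical data:
    >>> datasets = {'m1': pd.DataFrame({'PROD_PQ_Y@08US': [700.0, 800.0]}),
    ...             'm2': pd.DataFrame({'x': [1]})}
    >>> get_production(datasets).tolist()
    [700.0, 800.0]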
"""
indexes = []
for name in [
name for name, df in datasets.items() if "PROD_PQ_Y@08US" in df.columns
]:
indexes.append(datasets[name]["PROD_PQ_Y@08US"])
return pd.concat(indexes).drop_duplicates()
def get_production_pc(datasets: Dict[str, pd.DataFrame]) -> pd.Series:
"""
Extract unique production PC indexes from multiple datasets.
    This function iterates through a dictionary of datasets, filtering for
    those that contain the `'PROD_PC_I@08US'` column. The column values from
    all qualifying datasets are then concatenated, deduplicated, and
    returned as a single series.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary where keys are dataset names (str) and values are pandas DataFrames.
Each DataFrame should represent a different dataset.
Returns
-------
    pd.Series
        A series containing the 'PROD_PC_I@08US' values from all datasets
        that contain the column, with duplicate values removed.
See Also
--------
pd.DataFrame.loc : Access a group of rows and columns by the label(s) or a boolean array.
pd.concat : Concatenate pandas objects along a particular axis with optional set logic
along the other axes.
wip.otm.build_restrictions : Build optimization problem restrictions.
"""
indexes = []
for name in [
name for name, df in datasets.items() if "PROD_PC_I@08US" in df.columns
]:
indexes.append(datasets[name]["PROD_PC_I@08US"])
return pd.concat(indexes).drop_duplicates()
def clean_data(
datasets: Dict[str, pd.DataFrame],
df_sql: pd.DataFrame,
skip_transformations: bool = False,
datasets_filepath: str = DATASETS_CLEAN_FILEPATH,
df_sql_filepath: str = DF_SQL_CLEAN_FILEPATH,
constant_tags_summary_filepath: str | None = None,
**kwargs,
) -> Dict[str, pd.DataFrame]:
"""
Clean and transform input datasets according to specified rules.
This function performs a series of cleaning and transformation operations on
given datasets and an SQL data frame. It can optionally skip transformations,
apply specific filters, drop or add tags based on configurations, and output
the processed datasets to a specified path.
It also generates a constant tags summary report.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary where keys are model names and values are their corresponding DataFrame.
df_sql : pd.DataFrame
A `pandas.DataFrame` containing SQL data relevant to the models in the datasets.
skip_transformations : bool, optional
Flag to skip all transformation operations, default is False.
datasets_filepath : str, optional
File path for saving the cleaned datasets, default is DATASETS_CLEAN_FILEPATH.
df_sql_filepath : str, optional
File path for saving the cleaned SQL DataFrame, default is DF_SQL_CLEAN_FILEPATH.
constant_tags_summary_filepath : str or None, optional
File path for saving the constant tags summary reports, default is None,
which sets it to `DATA_CHECKS_FOLDER_PATH/constant_tags_summary.xlsx`.
**kwargs : dict
Additional keyword arguments for cleaning configurations. Supported keywords:
- drop_models: List of model names to be dropped from datasets.
- tags_to_drop: List of tags to be removed from the datasets.
- tags_to_add: List of tags to be added from the SQL data to the datasets.
- tag_filters: List of tag filters to apply to the datasets.
Returns
-------
Dict[str, pd.DataFrame]
The cleaned and optionally transformed datasets as a dictionary.
Raises
------
Exception
If an error occurs during the generation of the constant tags summary report.
See Also
--------
cleaner, drop_tags, add_tags_to_models, filter_inf, clip_negative_values, datasets_filter,
fix_pres_tag, fill_empty_columns, filter_temperature_groups : Functions used for data cleaning and transformations.
Notes
-----
The cleaning process involves dropping specified models, removing or adding tags, applying
various data filters, and handling specific cleaning tasks like clipping negative values or
filtering based on temperature groups.
"""
from wip.datatools import cleaner
from wip.datatools import to_joblib
from wip.temporary import filter_corpo_moedor_especifico
if constant_tags_summary_filepath is None:
constant_tags_summary_filepath = Path(DATA_CHECKS_FOLDER_PATH).joinpath(
"constant_tags_summary.xlsx"
)
drop_models = kwargs.pop("drop_models", ["temp_precipitador", "torque"])
tags_to_drop = kwargs.pop("tags_to_drop", TAGS_TO_DROP)
tags_to_add = kwargs.pop("tags_to_add", TAGS_TO_ADD)
tag_filters = kwargs.pop("tag_filters", TAG_FILTERS)
for model in drop_models:
datasets.pop(model, None)
if skip_transformations:
return datasets
datasets = drop_tags(datasets, tags_to_drop)
datasets = add_tags_to_models(datasets, df_sql, tags_to_add)
    # Columns with two or fewer unique values are skipped from the outlier
    # removal process, to ensure that one of the two unique values of a
    # binary column isn't replaced by the other when there's a big imbalance
    # between the two classes of values.
datasets = cleaner(datasets)
datasets = filter_corpo_moedor_especifico(datasets)
datasets = filter_inf(datasets)
datasets = clip_negative_values(datasets)
datasets = datasets_filter(datasets, tag_filters)
datasets = fix_pres_tag(datasets)
datasets = fill_empty_columns(datasets)
datasets = filter_temperature_groups(datasets, df_sql)
to_joblib(datasets, datasets_filepath)
to_joblib(df_sql, df_sql_filepath)
try:
constant_tags_summary(df_sql, datasets, constant_tags_summary_filepath, None)
except Exception as exc:
logger.exception(exc)
logger.error(
"Failed to save constant_tags_summary report to %s.",
constant_tags_summary_filepath,
)
return datasets
def get_most_recent_values(
datasets: Dict[str, pd.DataFrame],
column: str,
faixa: str | None = None,
n_most: int = 15,
) -> pd.Series:
"""
Get the `n_most` recent records of `column` for production range `faixa`.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
Dictionary containing the data from all models datasets.
column : str
Column to retrieve the last values from.
faixa : str | None, default=None
Which production ranges to retrieve the `n_most` records of
`column` from. If None, retrieve the last `n_most` records,
not considering a production range.
    n_most : int, default=15
How many records to retrieve for `column`.
Returns
-------
pd.Series
A `pandas.Series` with the `n_most` records for `column` that
have `faixa` production range.
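    Examples
    --------
    A minimal sketch using the production tag itself:
    >>> datasets = {'m1': pd.DataFrame({'PROD_PQ_Y@08US': [710.0, 760.0, 770.0]})}
    >>> get_most_recent_values(datasets, 'PROD_PQ_Y@08US', faixa='750-1000',
    ...                        n_most=2).tolist()
    [760.0, 770.0]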
"""
prod_pq = "PROD_PQ_Y@08US"
series_obj = get_column_values_from_datasets(column, datasets)
if faixa is None:
return series_obj[column].tail(n_most)
prod_pq_series = get_column_values_from_datasets(prod_pq, datasets)
lb_faixa, ub_faixa = tuple(map(int, faixa.split("-")))
prod_pq_series = prod_pq_series.loc[
(prod_pq_series[prod_pq] >= lb_faixa) & (prod_pq_series[prod_pq] < ub_faixa),
:,
]
if column != prod_pq:
series_obj = series_obj.merge(
prod_pq_series, left_index=True, right_index=True, how="inner"
).drop(columns=[prod_pq])
else:
series_obj = prod_pq_series
return series_obj[column].tail(n_most)
def get_column_values_from_datasets(
    column: str, datasets: Dict[str, pd.DataFrame]
) -> pd.DataFrame:
"""
Concatenate column values from multiple datasets into a single DataFrame.
This function filters and aggregates specified column values from multiple
`pandas.DataFrames` within a dictionary, indexed by model names.
It ensures that the final DataFrame does not have duplicate indices by
keeping the last occurrence and sorts the index in ascending order.
The data is also rounded to two decimal places for uniformity.
Parameters
----------
column : str
The name of the column to extract from each DataFrame.
datasets : Dict[str, pd.DataFrame]
A dictionary mapping model names to their respective `pandas.DataFrame`.
Returns
-------
    pd.DataFrame
        A `pandas.DataFrame` containing the aggregated column values from
        the provided `datasets`, with duplicated indexes removed (keeping
        the last occurrence), sorted by index, and rounded to two decimals.
Examples
--------
>>> datasets = {'model1': pd.DataFrame({'A': [1.111, 2.222]}),
... 'model2': pd.DataFrame({'A': [3.333, 4.444]})}
>>> get_column_values_from_datasets('A', datasets)
          A
    0  3.33
    1  4.44
Notes
-----
The main reason for creating this function is to make sure that we can collect
all the relevant data for a certain tag.
    Different filters and transformations are applied to each `DataFrame`
    from the `datasets` dictionary. As a result, a tag's values might have
    different shapes depending on which key from `datasets` is used to
    select them. Combining the values from every dataset that uses the tag,
    while making sure that indexes aren't duplicated, allows us to retrieve
    and operate on all available data.
"""
return (
pd.concat(
[datasets[model_name][column].to_frame(column)
for model_name in find_models_by_tag(column, datasets)]
)
.loc[lambda xdf: ~xdf.index.duplicated(keep='last')]
.sort_index()
.round(2)
)
def multi_merge(dataframes: List[pd.DataFrame], **merge_kwargs) -> pd.DataFrame:
"""
Merge multiple DataFrames into a single DataFrame using specified kwargs.
This function takes a list of pandas DataFrames and reduces them into a single
DataFrame by consecutively merging them from left to right using the `pd.merge`
function. The merging parameters can be customized via `merge_kwargs`.
Parameters
----------
dataframes : List[pd.DataFrame]
A list of pandas DataFrames to be merged.
**merge_kwargs : dict
Keyword arguments to be passed to `pd.merge` for customizing the merge
behavior, such as 'on', 'how', 'suffixes', etc.
Returns
-------
pd.DataFrame
A DataFrame resulting from the consecutive merging of the input DataFrames
according to the specified merge parameters.
See Also
--------
pd.merge : Merge DataFrame objects with a database-style join.
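    Examples
    --------
    A minimal sketch merging two frames on a shared key:
    >>> df1 = pd.DataFrame({'key': [1, 2], 'a': [10, 20]})
    >>> df2 = pd.DataFrame({'key': [1, 2], 'b': [30, 40]})
    >>> multi_merge([df1, df2], on='key', how='inner')
       key   a   b
    0    1  10  30
    1    2  20  40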
"""
    return reduce(lambda left, right: pd.merge(left, right, **merge_kwargs), dataframes)