"""
This module defines the functions needed to apply SHAP to ML models and datasets.
Functions
---------
This module defines the following functions:
* ``preprocess_df_train``: Preprocess the dataframe.
* ``filter_status_column``: Filter out columns from the dataframe.
* ``filter_production_range``: Filter the dataframe using the production range.
* ``select_best_model``: Select the best ridge regression model based on a metric.
* ``compute_shap_values``: Compute SHAP values for a given Ridge model and dataset.
* ``process_columns``: Process and extract relevant columns based on certain conditions.
* ``apply_shap``: Process and apply SHAP (SHapley Additive exPlanations) to datasets.
The main function called inside ``wip.otm.py`` is ``apply_shap``.
"""
from __future__ import annotations
from typing import Dict
from typing import List
import numpy as np
import pandas as pd
import shap
import sklearn
from scipy.interpolate import interp1d
from wip.constants import constants
from wip.logging_config import logger
from wip.modules import ops as operations
def preprocess_df_train(df_train: pd.DataFrame) -> pd.DataFrame:
    """Clean a training frame and keep only rows with positive production.

    The cleaning steps run in this order:

    1. Every ``+inf`` / ``-inf`` is replaced with ``NaN``.
    2. ``NaN`` values are linearly interpolated in both directions.
    3. Anything still missing is filled with ``0``.
    4. Rows whose (cleaned) ``"PROD_PQ_Y@08US"`` value is not strictly
       positive are discarded.

    Parameters
    ----------
    df_train : pd.DataFrame
        Frame to clean. It must contain the ``"PROD_PQ_Y@08US"`` column.
        The input object itself is not modified.

    Returns
    -------
    pd.DataFrame
        The cleaned frame restricted to rows with ``"PROD_PQ_Y@08US" > 0``.
        The original index labels of the surviving rows are kept.
    """
    without_inf = df_train.replace([np.inf, -np.inf], np.nan)
    gap_filled = without_inf.interpolate("linear", limit_direction="both")
    cleaned = gap_filled.fillna(0)

    # The production filter is evaluated on the cleaned values, not the raw ones.
    has_production = cleaned["PROD_PQ_Y@08US"] > 0
    return cleaned[has_production]
def filter_status_column(df_train: pd.DataFrame) -> pd.DataFrame:
    """Drop every column whose name mentions a status or mill-production tag.

    A column is removed when its name contains the substring ``'status'`` or
    the substring ``'ProducaoPQ_Moagem'``. The match is case-sensitive.

    Parameters
    ----------
    df_train : pd.DataFrame
        Frame whose columns are inspected.

    Returns
    -------
    pd.DataFrame
        A new frame without the matching columns. When no column matches,
        the input frame is returned as-is (same object, no copy).
    """
    unwanted_tokens = ("status", "ProducaoPQ_Moagem")
    to_drop = [
        name for name in df_train.columns
        if any(token in name for token in unwanted_tokens)
    ]
    if not to_drop:
        return df_train
    return df_train.drop(columns=to_drop)
def filter_production_range(
    df_train: pd.DataFrame,
    range_min: int,
    range_max: int,
    prod_pq: bool,
) -> pd.DataFrame:
    """Keep the rows whose production lies inside ``[range_min, range_max]``.

    Rows are selected on the ``"PROD_PQ_Y@08US"`` column; both bounds are
    inclusive. The production column itself can optionally be removed from
    the result.

    Parameters
    ----------
    df_train : pd.DataFrame
        Frame to filter. It must contain the ``"PROD_PQ_Y@08US"`` column.
    range_min : int
        Lower bound of the production range (inclusive).
    range_max : int
        Upper bound of the production range (inclusive).
    prod_pq : bool
        When True, the ``"PROD_PQ_Y@08US"`` column is dropped from the
        returned frame.

    Returns
    -------
    pd.DataFrame
        An independent copy of the selected rows, with or without the
        production column depending on ``prod_pq``.
    """
    production = df_train["PROD_PQ_Y@08US"]
    in_range = production.between(range_min, range_max)
    selected = df_train.loc[in_range].copy()
    if not prod_pq:
        return selected
    return selected.drop(columns=["PROD_PQ_Y@08US"])
def select_best_model(models_results: list, metric: str = "MAPE") -> int:
    """
    Return the position of the best model according to ``metric``.

    Error metrics (``"MSE"``, ``"MAPE"``) are minimised, while score metrics
    (``"R2"``, ``"R"``, ``"R2 Train"``, ``"R2 Train Adj"``) are maximised.
    Entries whose metric value is ``NaN`` are ranked last. Ties are resolved
    in favour of the entry that appears first in ``models_results``.

    Parameters
    ----------
    models_results : list
        A list of dictionaries, one per candidate model. Each dictionary
        must hold a ``"metrics"`` dictionary; only the entry that
        corresponds to ``metric`` is read:

        - ``"mse"`` for ``"MSE"``
        - ``"mape"`` for ``"MAPE"``
        - ``"r2"`` for ``"R2"``
        - ``"r"`` for ``"R"``
        - ``"r2_train"`` for ``"R2 Train"``
        - ``"r2_train_adj"`` for ``"R2 Train Adj"``
    metric : str {"MSE", "MAPE", "R2", "R", "R2 Train", "R2 Train Adj"}, default="MAPE"
        The evaluation metric used to rank the models.

    Returns
    -------
    int
        Position (within ``models_results``) of the best model.

    Raises
    ------
    IndexError
        If ``models_results`` is empty.
    KeyError
        If ``metric`` is not one of the supported names, or if a result
        does not provide the requested metric.
    """
    metric_keys = {
        "MSE": "mse",
        "MAPE": "mape",
        "R2": "r2",
        "R": "r",
        "R2 Train": "r2_train",
        "R2 Train Adj": "r2_train_adj",
    }
    # For these metrics a larger value means a better fit, so the ranking
    # must be descending; sorting them ascending would select the worst model.
    higher_is_better = {"R2", "R", "R2 Train", "R2 Train Adj"}

    if not models_results:
        raise IndexError("models_results is empty: there is no model to select")

    metric_key = metric_keys[metric]
    scores = pd.Series([result["metrics"][metric_key] for result in models_results])
    # A stable sort keeps the ranking deterministic when scores tie, and
    # ``sort_values`` pushes NaN scores to the end in either direction.
    ranked = scores.sort_values(ascending=metric not in higher_is_better, kind="stable")
    return int(ranked.index[0])
def compute_shap_values(model: sklearn.linear_model.Ridge,
                        dataset: pd.DataFrame) -> np.ndarray:
    """Explain a fitted Ridge model on ``dataset`` with ``shap.LinearExplainer``.

    Every column of ``dataset`` except the last one is treated as a feature.
    Those feature columns are used both as the data handed to the explainer
    at construction time and as the samples to be explained.

    Parameters
    ----------
    model : sklearn.linear_model.Ridge
        Fitted Ridge regression model to explain.
    dataset : pd.DataFrame
        Feature columns followed by one trailing column that is excluded
        from the explanation (the response variable is expected there).

    Returns
    -------
    np.ndarray
        The value returned by ``LinearExplainer.shap_values`` for the
        feature columns of ``dataset``.
    """
    feature_names = dataset.columns[:-1]
    features = dataset[feature_names]
    explainer = shap.LinearExplainer(model, features)
    return explainer.shap_values(features)
def process_columns(
    dataset: pd.DataFrame,
    train_shap_values: np.ndarray,
    scalers: Dict[str, sklearn.preprocessing.MinMaxScaler],
    range_max: int,
    qualidade: str,
) -> pd.DataFrame:
    """
    Build the SHAP zero-crossing table for the features relevant to a quality.

    For every column of ``dataset`` that passes the tag filter, the function
    interpolates the feature value at which its SHAP value equals zero,
    normalises that value with ``operations.normalize_feature`` and records
    whether the SHAP value grows with the feature.

    Parameters
    ----------
    dataset : pd.DataFrame
        Frame whose columns are candidate features.
    train_shap_values : np.ndarray
        Two-dimensional array of SHAP values; column ``i`` must correspond to
        column ``i`` of ``dataset``.
    scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
        Scalers forwarded to ``operations.normalize_feature``.
    range_max : int
        Upper bound of the production range being processed; copied verbatim
        into the ``"Range_max"`` column.
    qualidade : str
        Quality being processed. The values handled by the tag filter are
        ``"compressao"``, ``"relacao gran"``, ``"SE PR"``, ``"SE PP"`` and
        ``"umidade"``.

    Returns
    -------
    pd.DataFrame
        One row per retained feature with the columns:

        - ``"Range_max"``: the ``range_max`` argument.
        - ``"TAG"``: the feature (column) name.
        - ``"Valor_Real"``: feature value interpolated at SHAP value ``0``.
        - ``"Valor_Norm"``: ``"Valor_Real"`` after normalisation.
        - ``"Ascending"``: True when the SHAP value at the largest feature
          value is greater than the one at the smallest feature value.

        The frame is empty (columns only) when no feature is retained.

    Raises
    ------
    ValueError
        Raised by ``scipy.interpolate.interp1d`` when ``0.0`` lies outside the
        range of a retained feature's SHAP values.
    """
    # (tag fragment, quality label): while processing a quality, only the
    # features whose name contains the paired tag fragment are retained.
    # NOTE(review): the quality test is ``qualidade in label`` (substring
    # containment), exactly as in the previous implementation -- confirm
    # whether plain equality was intended.
    required_tags = (
        ("TEMP1_I@08QU-QU-855I-GQ", "compressao"),
        ("GRAN_OCS_TM@08PE-BD-840I-", "relacao gran"),
        ("corpo_moedor_especifico", "SE PR"),
        ("POTE1_I@08FI-BV-827I-", "SE PP"),
        ("ROTA1_I@08FI-FL-827I-", "umidade"),
    )
    # This tag is never reported, whatever the quality.
    # NOTE(review): the test is ``feature in excluded_tag`` (the feature name
    # is a substring of the tag), unchanged from the previous implementation.
    excluded_tag = "POTE1_I@08FI-BV-827I-02M1"

    temp_limits = pd.DataFrame(
        columns=["Range_max", "TAG", "Valor_Real", "Valor_Norm", "Ascending"])
    rows = []
    for position, feature in enumerate(dataset.columns):
        if feature in excluded_tag or any(
            tag not in feature and qualidade in label
            for tag, label in required_tags
        ):
            continue

        shap_column = train_shap_values[:, position]
        feature_values = dataset[feature].values

        # Interpolate feature-as-a-function-of-SHAP and evaluate it at 0 to
        # get the feature value where the contribution changes sign.
        zero_crossing = interp1d(shap_column, feature_values)(0.0)

        # SHAP values ordered by increasing feature value. The trend compares
        # the first and last entries, so it is meaningful with two samples too
        # (comparing the second entry with the last one would compare a value
        # with itself in that case).
        shap_by_feature = (pd.DataFrame([feature_values, shap_column]).T
                           .sort_values(by=[0])
                           .reset_index(drop=True))[1]

        rows.append(pd.DataFrame({
            "Range_max": [range_max],
            "TAG": [feature],
            "Valor_Real": [zero_crossing],
            "Valor_Norm": [
                operations.normalize_feature(scalers, feature, zero_crossing)
            ],
            "Ascending": [shap_by_feature.iloc[0] < shap_by_feature.iloc[-1]],
        }))

    if not rows:
        return temp_limits
    # A single concatenation avoids re-copying the accumulated rows for every
    # feature; the empty seed frame is kept in front as before.
    return pd.concat([temp_limits, *rows], ignore_index=True)
def _prepare_training_frame(
    datasets: Dict[str, pd.DataFrame],
    qualidade: str,
) -> tuple:
    """Return the cleaned training frame for ``qualidade`` and a borrowed-column flag.

    The flag is True when ``"PROD_PQ_Y@08US"`` was missing from the quality's
    dataset and had to be taken from ``datasets["gas"]``; in that case the
    column is only needed for filtering and must be dropped afterwards.
    """
    df_train = datasets[qualidade].copy()
    prod_pq = "PROD_PQ_Y@08US" not in df_train.columns
    if prod_pq:
        df_train.insert(
            loc=0,
            column="PROD_PQ_Y@08US",
            value=datasets["gas"]["PROD_PQ_Y@08US"],
        )
    return preprocess_df_train(df_train).pipe(filter_status_column), prod_pq


def apply_shap(
    datasets: Dict[str, pd.DataFrame],
    models_results: Dict[str, List[Dict[str, ...]]],
    scalers: Dict[str, sklearn.preprocessing.MinMaxScaler],
    shap_cols: List[str] | None = None,
) -> pd.DataFrame:
    """
    Process and apply SHAP (SHapley Additive exPlanations) to the provided datasets.

    For every requested quality and every production range listed in
    ``constants.production_range``, the function selects the best model of
    that quality, computes its SHAP values on the rows of the range and
    collects the per-feature information produced by ``process_columns``.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary containing the data for different models.
        Each key corresponds to a model name, and each value is a
        ``pandas.DataFrame``. The ``"gas"`` entry is used as the source of the
        ``"PROD_PQ_Y@08US"`` column for datasets that do not have it.
    models_results : Dict[str, List[Dict[str, ...]]]
        Dictionary containing model results for different ridge regression
        models. Each key corresponds to a model name, and each value is a list
        of dictionaries with the keys: "conf", "model", and "metrics".
    scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
        A dictionary of tag's scalers.
    shap_cols : List[str] | None, optional
        Keys of ``datasets`` / ``models_results`` to which SHAP is applied.
        When omitted (or empty), SHAP is applied to
        ["compressao", "SE PR", "umidade", "SE PP"].

    Returns
    -------
    pd.DataFrame
        DataFrame containing the columns 'Range_max', 'TAG', 'Valor_Real',
        'Valor_Norm', and 'Ascending', with the rows of every processed
        quality and production range stacked in processing order.

    Notes
    -----
    The best model of each quality is the one chosen by ``select_best_model``
    with its default metric (MAPE), and the SHAP values come from
    ``compute_shap_values``. Production ranges that leave one row or less
    are skipped with a warning.
    """
    shap_cols = shap_cols or ["compressao", "SE PR", "umidade", "SE PP"]
    temp_limits = pd.DataFrame(
        columns=["Range_max", "TAG", "Valor_Real", "Valor_Norm", "Ascending"])
    collected = []

    for qualidade in shap_cols:
        # Cleaning the dataset does not depend on the production range, so it
        # is done once per quality instead of once per range.
        df_train, prod_pq = _prepare_training_frame(datasets, qualidade)
        # The best model is the same for every range as well; it is resolved
        # lazily so that it is only looked up when a range has enough data.
        best_model = None

        for range_min, range_max in constants.production_range:
            df_train_c = filter_production_range(df_train, range_min, range_max, prod_pq)
            if df_train_c.shape[0] <= 1:
                logger.warning(
                    "Not enough data to process SHAP for model: %s", qualidade
                )
                logger.warning("Skipping it")
                continue

            if best_model is None:
                quality_results = models_results[qualidade]
                best_model = quality_results[select_best_model(quality_results)]["model"]

            train_shap_values = compute_shap_values(best_model, df_train_c)
            collected.append(
                process_columns(df_train_c, train_shap_values, scalers,
                                range_max, qualidade)
            )

    if not collected:
        return temp_limits
    # One concatenation at the end instead of growing the frame on every
    # iteration; the empty seed frame stays in front as before.
    return pd.concat([temp_limits, *collected], ignore_index=True)