# Source code for wip.datatools.shap_ops


"""
This module defines the functions needed to apply SHAP to ML models and datasets.

Functions
---------
This module defines the following functions:

* ``preprocess_df_train``: Preprocess the dataframe.
* ``filter_status_column``: Filter out columns from the dataframe.
* ``filter_production_range``: Filter the dataframe using the production range.
* ``select_best_model``: Select the best ridge regression model based on a metric.
* ``compute_shap_values``: Compute SHAP values for a given Ridge model and dataset.
* ``process_columns``: Process and extract relevant columns based on certain conditions.
* ``apply_shap``: Process and apply SHAP (SHapley Additive exPlanations) to datasets.

The main function called inside ``wip.otm.py`` is ``apply_shap``.

"""
from __future__ import annotations

from typing import Dict
from typing import List

import numpy as np
import pandas as pd
import shap
import sklearn
from scipy.interpolate import interp1d

from wip.constants import constants
from wip.logging_config import logger
from wip.modules import ops as operations


def preprocess_df_train(df_train: pd.DataFrame) -> pd.DataFrame:
    """Clean the training dataframe and keep only rows with positive production.

    The cleaning happens first: infinite values become NaN, NaN gaps are
    filled by linear interpolation in both directions, and anything still
    missing is set to 0. Only afterwards are the rows whose
    ``"PROD_PQ_Y@08US"`` value is not strictly positive discarded, so the
    filter sees the interpolated production values.

    Parameters
    ----------
    df_train : pd.DataFrame
        Dataframe to clean. It must contain the ``"PROD_PQ_Y@08US"`` column.

    Returns
    -------
    pd.DataFrame
        Cleaned dataframe restricted to rows with ``"PROD_PQ_Y@08US" > 0``.
        The original index labels of the surviving rows are kept.
    """
    without_infinities = df_train.replace([np.inf, -np.inf], np.nan)
    interpolated = without_infinities.interpolate(
        method="linear", limit_direction="both"
    )
    cleaned = interpolated.fillna(0)

    has_positive_production = cleaned["PROD_PQ_Y@08US"] > 0
    return cleaned[has_positive_production]
def filter_status_column(df_train: pd.DataFrame) -> pd.DataFrame:
    """Drop every column whose name mentions 'status' or 'ProducaoPQ_Moagem'.

    A column is removed when its name contains either substring (the match
    is case-sensitive). When no column matches, the input dataframe itself
    is returned, not a copy.

    Parameters
    ----------
    df_train : pd.DataFrame
        Dataframe whose columns are inspected.

    Returns
    -------
    pd.DataFrame
        Dataframe without the matching columns, or ``df_train`` unchanged
        when there is nothing to drop.
    """
    unwanted_fragments = ("status", "ProducaoPQ_Moagem")
    columns_to_drop = [
        name
        for name in df_train.columns
        if any(fragment in name for fragment in unwanted_fragments)
    ]

    # Guard clause: hand back the very same object when nothing matches.
    if not columns_to_drop:
        return df_train
    return df_train.drop(columns=columns_to_drop)
def filter_production_range(
    df_train: pd.DataFrame,
    range_min: int,
    range_max: int,
    prod_pq: bool,
) -> pd.DataFrame:
    """Keep the rows whose production lies inside ``[range_min, range_max]``.

    Rows are selected on the ``"PROD_PQ_Y@08US"`` column with both bounds
    inclusive. The selection is copied, so the input dataframe is never
    modified. When ``prod_pq`` is True the production column is removed from
    the result after the filtering.

    Parameters
    ----------
    df_train : pd.DataFrame
        Dataframe to filter. It must contain the ``"PROD_PQ_Y@08US"`` column.
    range_min : int
        Lower bound (inclusive) of the production range.
    range_max : int
        Upper bound (inclusive) of the production range.
    prod_pq : bool
        If True, drop the ``"PROD_PQ_Y@08US"`` column from the result.

    Returns
    -------
    pd.DataFrame
        Independent copy of the rows that fall inside the production range.
    """
    production = df_train["PROD_PQ_Y@08US"]
    # ``between`` is inclusive on both ends by default.
    in_range = production.between(range_min, range_max)
    selected = df_train[in_range].copy()

    if not prod_pq:
        return selected
    return selected.drop(columns=["PROD_PQ_Y@08US"])
def select_best_model(models_results: list, metric: str = "MAPE") -> int:
    """Return the position of the best model according to ``metric``.

    The per-model metrics are collected into a dataframe and ranked on the
    requested column. Error metrics (``"MSE"``, ``"MAPE"``) are ranked
    ascending because lower is better; goodness-of-fit scores (``"R2"``,
    ``"R"``, ``"R2 Train"``, ``"R2 Train Adj"``) are ranked descending
    because higher is better. Ties are resolved with a stable sort.

    Parameters
    ----------
    models_results : list
        A list of dictionaries where each dictionary has the keys:

        - ``"conf"``: Configuration or name of the model.
        - ``"model"``: Trained model object exposing ``get_params()``.
        - ``"metrics"``: A dictionary of performance metrics with the keys
          ``"mse"``, ``"mape"``, ``"r2"``, ``"r"``, ``"r2_train"`` and
          ``"r2_train_adj"``.
    metric : str {"MSE", "MAPE", "R2", "R", "R2 Train", "R2 Train Adj"}, default="MAPE"
        Column used to rank the models.

    Returns
    -------
    int
        Index, within ``models_results``, of the best model for ``metric``.

    Raises
    ------
    KeyError
        If ``metric`` is not one of the dataframe columns, or a result
        dictionary lacks one of the expected keys.
    IndexError
        If ``models_results`` is empty.
    """
    df_result = pd.DataFrame(
        [
            (
                best["conf"],
                str(best["model"].get_params()),
                best["metrics"]["mse"],
                best["metrics"]["mape"],
                best["metrics"]["r2"],
                best["metrics"]["r"],
                best["metrics"]["r2_train"],
                best["metrics"]["r2_train_adj"],
            )
            for best in models_results
        ],
        columns=[
            "Modelo",
            "Params",
            "MSE",
            "MAPE",
            "R2",
            "R",
            "R2 Train",
            "R2 Train Adj",
        ],
    )

    # Sorting ascending is only right for error metrics; for the score
    # metrics the best model is the one with the HIGHEST value.
    higher_is_better = metric in {"R2", "R", "R2 Train", "R2 Train Adj"}
    ranking = df_result.sort_values(
        by=[metric], ascending=not higher_is_better, kind="stable"
    )
    # Cast so callers get a plain ``int`` as annotated, not ``numpy.int64``.
    return int(ranking.index[0])
def compute_shap_values(model: sklearn.linear_model.Ridge,
                        dataset: pd.DataFrame) -> np.ndarray:
    """Explain a fitted Ridge model on ``dataset`` with ``shap.LinearExplainer``.

    Every column of ``dataset`` except the last one is treated as a feature;
    the last column is assumed to hold the response variable and is left
    out. The same feature frame is used both as the explainer's background
    data and as the samples to explain.

    Parameters
    ----------
    model : sklearn.linear_model.Ridge
        The Ridge regression model for which SHAP values are computed.
    dataset : pd.DataFrame
        Feature columns followed by the response variable in the last
        column.

    Returns
    -------
    np.ndarray
        SHAP values returned by the explainer for the feature columns
        (presumably one row per sample and one column per feature).
    """
    feature_columns = dataset.columns[:-1]
    features = dataset[feature_columns]

    explainer = shap.LinearExplainer(model, features)
    return explainer.shap_values(features)
def process_columns(
    dataset: pd.DataFrame,
    train_shap_values: np.ndarray,
    scalers: Dict[str, sklearn.preprocessing.MinMaxScaler],
    range_max: int,
    qualidade: str,
) -> pd.DataFrame:
    """Build one limits row per feature that is relevant for ``qualidade``.

    Each quality label is tied to a tag fragment; a feature is processed only
    when it is not skipped by any of those rules (see the loop below). For a
    processed feature, the feature value at which its SHAP contribution is
    zero is obtained by linear interpolation of feature value over SHAP
    value, then normalized through ``operations.normalize_feature``. A flag
    records whether the SHAP value at the largest feature value exceeds the
    SHAP value at the second-smallest one.

    Parameters
    ----------
    dataset : pd.DataFrame
        Dataframe whose columns are inspected.
    train_shap_values : np.ndarray
        SHAP values, indexed as ``[:, position_of_column_in_dataset]``.
    scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
        Scalers forwarded to ``operations.normalize_feature``.
    range_max : int
        Upper bound of the production range, stored in every output row.
    qualidade : str
        Quality label being processed. Rules exist for ``"compressao"``,
        ``"relacao gran"``, ``"SE PR"``, ``"SE PP"`` and ``"umidade"``.

    Returns
    -------
    pd.DataFrame
        Dataframe with the columns "Range_max", "TAG", "Valor_Real",
        "Valor_Norm" and "Ascending", one row per processed feature (empty
        when no feature qualifies).
    """
    output_columns = ["Range_max", "TAG", "Valor_Real", "Valor_Norm", "Ascending"]
    empty_limits = pd.DataFrame(columns=output_columns)

    # (tag fragment the feature must contain, quality label the rule is for)
    required_tag_rules = (
        ("TEMP1_I@08QU-QU-855I-GQ", "compressao"),
        ("GRAN_OCS_TM@08PE-BD-840I-", "relacao gran"),
        ("corpo_moedor_especifico", "SE PR"),
        ("POTE1_I@08FI-BV-827I-", "SE PP"),
        ("ROTA1_I@08FI-FL-827I-", "umidade"),
    )
    excluded_tag = "POTE1_I@08FI-BV-827I-02M1"

    limit_rows = []
    for feature in dataset.columns:
        # NOTE(review): ``qualidade in label`` and ``feature in excluded_tag``
        # are substring tests, not equality tests, so any substring of a
        # label or of the excluded tag also matches. Kept as-is; confirm
        # whether ``==`` was intended.
        missing_required_tag = any(
            tag not in feature and qualidade in label
            for tag, label in required_tag_rules
        )
        if missing_required_tag or feature in excluded_tag:
            continue

        shap_column = train_shap_values[:, dataset.columns.get_loc(feature)]
        feature_values = dataset[feature].values

        # Maps a SHAP value back to a feature value.
        # NOTE(review): with interp1d's default bounds checking this raises
        # ValueError below when 0.0 is outside the observed SHAP range.
        shap_to_feature = interp1d(shap_column, feature_values)

        # Column 0: feature values, column 1: SHAP values, ordered by feature.
        ordered = pd.DataFrame([feature_values, shap_column]).T
        ordered = ordered.sort_values(by=[0]).reset_index(drop=True)

        normalized_value = operations.normalize_feature(
            scalers, feature, shap_to_feature(0.0)
        )

        shap_by_feature = ordered[1]
        # NOTE(review): compares the element at position 1 (not 0) with the
        # last one - confirm that skipping the first row is intentional.
        is_ascending = shap_by_feature[1] < shap_by_feature[len(shap_by_feature) - 1]

        limit_rows.append(
            pd.DataFrame({
                "Range_max": [range_max],
                "TAG": [feature],
                "Valor_Real": [shap_to_feature(0.0)],
                "Valor_Norm": [normalized_value],
                "Ascending": [is_ascending],
            })
        )

    if not limit_rows:
        return empty_limits
    return pd.concat([empty_limits, *limit_rows], ignore_index=True)
def apply_shap(
    datasets: Dict[str, pd.DataFrame],
    models_results: Dict[str, List[Dict[str, ...]]],
    scalers: Dict[str, sklearn.preprocessing.MinMaxScaler],
    shap_cols: List[str] | None = None,
) -> pd.DataFrame:
    """Apply SHAP to the best model of each quality, per production range.

    For every quality label in ``shap_cols`` the training dataframe is
    cleaned once and then sliced by each ``(range_min, range_max)`` pair in
    ``constants.production_range``. For every slice with more than one row,
    SHAP values of the best model (selected with the default metric of
    ``select_best_model``) are computed and turned into limit rows by
    ``process_columns``. Slices with one row or less are skipped with a
    warning.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Training data keyed by quality label. The ``"gas"`` entry supplies
        the ``"PROD_PQ_Y@08US"`` column for datasets that lack it.
    models_results : Dict[str, List[Dict[str, ...]]]
        Model results keyed by quality label. Each value is a list of
        dictionaries with the keys "conf", "model", and "metrics".
    scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
        A dictionary of tag scalers, forwarded to ``process_columns``.
    shap_cols : List[str] | None, optional
        Keys of ``datasets`` / ``models_results`` to process. Defaults to
        ``["compressao", "SE PR", "umidade", "SE PP"]`` when None or empty.

    Returns
    -------
    pd.DataFrame
        Dataframe with the columns 'Range_max', 'TAG', 'Valor_Real',
        'Valor_Norm', and 'Ascending', accumulated over every quality and
        production range that had enough data.

    Notes
    -----
    The dataset copy, cleaning and best-model selection do not depend on the
    production range, so they are done once per quality and not once per
    range. Model selection stays lazy: it only runs when at least one range
    has enough data.
    """
    shap_cols = shap_cols or ["compressao", "SE PR", "umidade", "SE PP"]
    temp_limits = pd.DataFrame(
        columns=["Range_max", "TAG", "Valor_Real", "Valor_Norm", "Ascending"])

    production_ranges = list(constants.production_range)
    if not production_ranges:
        # Nothing to slice on: avoid touching the datasets at all.
        return temp_limits

    for qualidade in shap_cols:
        df_train, prod_pq = _prepare_shap_frame(datasets, qualidade)
        best_model = None

        for range_min, range_max in production_ranges:
            # ``filter_production_range`` returns a copy, so ``df_train`` can
            # safely be reused across ranges.
            df_train_c = filter_production_range(
                df_train, range_min, range_max, prod_pq)
            if df_train_c.shape[0] <= 1:
                logger.warning(
                    "Not enough data to process SHAP for model: %s", qualidade
                )
                logger.warning("Skipping it")
                continue

            if best_model is None:
                candidates = models_results[qualidade]
                best_model = candidates[select_best_model(candidates)]["model"]

            train_shap_values = compute_shap_values(best_model, df_train_c)
            temp_limits = pd.concat(
                [
                    temp_limits,
                    process_columns(df_train_c, train_shap_values, scalers,
                                    range_max, qualidade),
                ],
                ignore_index=True,
            )
    return temp_limits


def _prepare_shap_frame(datasets, qualidade):
    """Return the cleaned training frame for ``qualidade`` and the drop flag.

    The flag is True when ``"PROD_PQ_Y@08US"`` had to be borrowed from
    ``datasets["gas"]``; in that case the column must be dropped again
    after the production-range filtering.
    """
    df_train = datasets[qualidade].copy()
    prod_pq = "PROD_PQ_Y@08US" not in df_train.columns
    if prod_pq:
        df_train.insert(
            loc=0,
            column="PROD_PQ_Y@08US",
            value=datasets["gas"]["PROD_PQ_Y@08US"],
        )
    return preprocess_df_train(df_train).pipe(filter_status_column), prod_pq