Source code for wip.mltrainer

"""
Module for training the ridge regression models.

This module defines the functions needed to train a ridge regression
model for each stage of the pelletizing process.

Usage
-----
To train the ML models, run the following command:

.. code-block:: bash

    $ python mltrainer.py

The above command will train the models and save them in the
``outputs/us8/predictive_models`` directory.

Developer Notes
---------------
If you're tasked with refactoring or making any changes to this module,
please read these comments before starting your work.

Scalers
~~~~~~~
One of the first aspects of this module you might be inclined to change
is the scaler in use, from `sklearn.preprocessing.MinMaxScaler`
to a more advanced scaler such as `sklearn.preprocessing.RobustScaler`.
However, be aware that making this change is more complex than it seems
at first.

These scalers are created column-wise and are used later by the
`otm.py` module, as well as in several other places.
Making this change would require refactoring several modules, adapting
them to perform the upscale and downscale operations without the
`data_min_` and `data_range_` attributes, which only
`sklearn.preprocessing.MinMaxScaler` provides.

Try counting how many times the scalers are used inside the `modules`
subpackage and the `otm.py` module. If you do, you'll probably realize
how large a task changing the normalization algorithm would be, even
though under normal circumstances it should be a simple change to make.
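
For reference, the downscale and upscale operations used throughout the
codebase reduce to those two attributes. Below is a minimal sketch
(variable names are illustrative, not taken from the codebase) of what
these operations look like for a `sklearn.preprocessing.MinMaxScaler`
fitted with the default ``feature_range=(0, 1)``:

.. code-block:: python

    import numpy as np
    from sklearn.preprocessing import MinMaxScaler

    scaler = MinMaxScaler().fit(np.array([[10.0], [20.0], [30.0]]))

    # Downscale: equivalent to `scaler.transform(...)`.
    scaled = (np.array([[25.0]]) - scaler.data_min_) / scaler.data_range_

    # Upscale: equivalent to `scaler.inverse_transform(...)`.
    original = scaled * scaler.data_range_ + scaler.data_min_

`sklearn.preprocessing.RobustScaler`, by contrast, exposes `center_` and
`scale_` instead, which is why swapping scalers ripples through every
module that touches `data_min_` or `data_range_` directly.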
"""

from __future__ import annotations

import warnings
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union

import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from sklearn.impute import KNNImputer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_percentage_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler

import wip.modules.ops as operations
from wip.constants import DATASETS_CLEAN_FILEPATH
from wip.constants import DATASETS_FILEPATH
from wip.constants import DF_SQL_CLEAN_FILEPATH
from wip.constants import DF_SQL_FILEPATH
from wip.constants import FINAL_DATASETS_FILEPATH
from wip.constants import MODELS_COEFICIENTS_FILEPATH
from wip.constants import MODELS_FEATURES_FILEPATH
from wip.constants import MODELS_KPIS_FILEPATH
from wip.constants import MODELS_RESULTS_FILEPATH
from wip.constants import SCALERS_FILEPATH
from wip.datatools.display import print_rich_table
from wip.datatools.io_ops import read_joblib
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import to_joblib
from wip.logging_config import logger
from wip.ml_configs import MAX_N_DAYS
from wip.modules.predictive_module import manual_kfold_validation
from wip.modules.predictive_module import mod_filtros
from wip.temporary import FakeScaler
from wip.temporary import adjust_models_coefficients
from wip.temporary import filter_datasets_df_sql_by_date
from wip.temporary import inverse_transform_models_features
from wip.temporary import replace_ventiladores_tags
from wip.utils import is_running_on_databricks


warnings.filterwarnings('ignore')


def apply_naive_model(y_values, test_set_date):
    """
    Compute metrics for a naive model using the mean of training data.

    This function applies a naive model where predictions on the test set
    are based on the average of the training data. It then computes and
    returns the Mean Squared Error (MSE), Mean Absolute Percentage Error
    (MAPE), and the R-squared (R2) score of the predictions against the
    actual values.

    Parameters
    ----------
    y_values : pandas.Series
        Time series data where the index is of a datetime type.
    test_set_date : datetime-like
        The date used to split the data into training and test sets.
        Data after this date is considered the test set, and data before
        this date is considered the training set. Rows that fall exactly
        on `test_set_date` are excluded, since both comparisons are strict.

    Returns
    -------
    mse : float
        Mean Squared Error of the predictions.
    mape : float
        Mean Absolute Percentage Error of the predictions.
    r2 : float
        R-squared score of the predictions.

    Notes
    -----
    This naive model relies solely on the average of the training data for
    predictions on the test set. This means the model does not capture any
    temporal trends or seasonality present in the time series data.

    Examples
    --------
    >>> y = pd.Series([1, 2, 3, 4, 5], index=pd.date_range('20200101', periods=5))
    >>> _test_set_date = '2020-01-04'
    >>> apply_naive_model(y, _test_set_date)
    (9.0, 0.6, nan)
    """
    y_test = y_values[y_values.index > test_set_date]
    y_train = y_values[y_values.index < test_set_date]
    predicted = [y_train.mean()] * len(y_test)
    mse = round(mean_squared_error(y_test, predicted), 3)
    mape = round(mean_absolute_percentage_error(y_test, predicted), 3)
    r2_result = round(r2_score(y_test, predicted), 3)
    return mse, mape, r2_result


def apply_model_new(
    model_name: str,
    df_train: pd.DataFrame,
    df_target: pd.Series,
    self_train: bool = True,
    **kwargs,
) -> List[Dict[str, Union[str, float, np.ndarray, pd.Index]]]:
    """Apply a machine learning model to the provided data.

    This function applies a model to the data using a pipeline and performs
    grid search with cross-validation. The pipeline and parameter grid are
    hard-coded in the function. The function returns the best model, metrics,
    and other relevant information.

    Parameters
    ----------
    model_name : str
        Name of the process that represents the pelletizing stage
        (e.g., "abrasao", "basicidade", "compressao", "finos", "gas").
    df_train : pd.DataFrame
        Training data as a pandas DataFrame.
    df_target : pd.Series
        Target values as a pandas Series.
    self_train : bool, optional, default=True
        If True, fits the best estimator to the entire training data.
    **kwargs : dict
        Additional parameters can include:

        - `"param_combination"`.
        - `"param_validation"`: keyword arguments forwarded to the
          validation function when `self_train` is False.
        - `"param_plotting"`.
        - `"validation"`: the validation callable used when `self_train`
          is False (defaults to `manual_kfold_validation`).

    Returns
    -------
    list
        A list containing a dictionary with the following keys:

        - `"conf"`: Model configuration.
        - `"metrics"`: A dictionary containing the metrics
          (mse, mape, r2, r, r2_train, r2_train_adj).
        - `"model"`: The best estimator's last step.
        - `"grid"`: The fitted grid search object.
        - `"columns"`: The columns of the input DataFrame.
        - `"params"`: The coefficients of the best estimator's last step.
        - `"indexes"`: Indexes of the folds used during cross-validation.
        - `"ys"`: True target values during cross-validation.
        - `"yhats"`: Predicted target values during cross-validation.
        - `"predicted"`: The predictions made by the best estimator on the
          held-out test split (if `self_train=True`).
        - `"df_train"`: The training data.
        - `"df_target"`: The target values.
    """
    param_valid = kwargs.get("param_validation")
    validation = kwargs.get("validation", manual_kfold_validation)
    pipe = Pipeline([("missing", SimpleImputer()), ("regression", Ridge(alpha=0.1))])

    # Define the grid search parameters.
    # A pipeline with each combination of the parameters defined below
    # will be evaluated.
    param_grid = [
        {
            "missing": [
                KNNImputer(weights="distance"),
                SimpleImputer(keep_empty_features=True),
            ],
            "regression__alpha": [0.001, 0.01, 0.1, 1, 10, 100, 1000],
            "regression__fit_intercept": [True, False],
        },
    ]
    grid = GridSearchCV(pipe, param_grid=param_grid, scoring="r2", cv=5)
    if self_train:
        x_train, x_test, y_train, y_test = train_test_split(
            df_train, df_target, train_size=0.8, random_state=42
        )
        indexes = y_test.index
        grid.fit(x_train, y_train)
        predicted = grid.predict(x_test)
        correlation_values = pd.DataFrame(predicted)[0].corr(
            pd.DataFrame(y_test.values)[0]
        )
        train_predicted = grid.predict(x_train)
        mape = mean_absolute_percentage_error(y_test, predicted)
        mse = mean_squared_error(y_test, predicted)
        r2_test = r2_score(y_test, predicted)
        r2_train = r2_score(y_train, train_predicted)
        num_observations, num_cols = x_test.shape
        adjusted_r2 = 1 - (
            (1 - r2_test) * (num_observations - 1) / (num_observations - num_cols - 1)
        )
    else:
        # Fit the grid search object to the data.
        indexes, y_test, predicted, metrics = validation(
            grid, df_train, df_target, kwparams=param_valid
        )
        mse, mape, r2_test, correlation_values, r2_train, adjusted_r2 = metrics
    logger.info("%s", f"Model: {model_name!r}")
    logger.info("%s", f"Target name: {df_target.name!r}")
    logger.info("%s", f"{' Metrics ':=^50}")
    logger.info("%s", f"MAPE: {mape:.2f}")
    logger.info("%s", f"R2 Test: {r2_test:.2f}")
    logger.info("%s", f"R2 Train: {r2_train:.2f}")
    logger.info("%s", f"Adjusted R2 Score: {adjusted_r2:.2f}\n")
    return [
        {
            "conf": {
                "model": Ridge,
                "params": {
                    "alpha": [1000, 100, 10, 1, 0.1, 0.01, 0.001],
                },
            },
            "metrics": {
                "mse": mse,
                "mape": mape,
                "r2": r2_test,
                "r": correlation_values,
                "r2_train": r2_train,
                "r2_train_adj": adjusted_r2,
            },
            "model": grid.best_estimator_[-1],
            "grid": grid,
            "columns": df_train.columns,
            "params": grid.best_estimator_[-1].coef_,
            "indexes": indexes,
            "ys": y_test,
            "yhats": predicted,
            "predicted": predicted,
            "df_train": df_train,
            "df_target": df_target,
        }
    ]
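

# NOTE: The function below is an illustrative sketch, not part of the original
# pipeline. It shows, on hypothetical toy data, how the structure returned by
# `apply_model_new` can be consumed, mirroring what `train_ml_models` does
# further down (metrics lookup plus `dict(zip(columns, coef_))`).
def _example_apply_model_new() -> None:
    rng = np.random.default_rng(42)
    x_demo = pd.DataFrame(rng.normal(size=(100, 3)), columns=["a", "b", "c"])
    y_demo = pd.Series(2.0 * x_demo["a"] + rng.normal(size=100), name="target")
    best = apply_model_new("demo", x_demo, y_demo, self_train=True)[0]
    # Map each feature name to its fitted ridge coefficient.
    coefficients = dict(zip(best["columns"], best["model"].coef_))
    logger.info("R2 (test): %.3f", best["metrics"]["r2"])
    logger.info("Coefficients: %s", coefficients)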


def get_prediction(
    datasets: dict,
) -> Tuple[Dict, Dict, Dict, pd.DataFrame, Dict]:
    """Get prediction results for the given datasets.

    Train and evaluate a model on each dataset, and return the prediction
    results, scalers, column limits, concatenated "NIVE" data, and the
    target data dictionary.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary containing the datasets.

    Returns
    -------
    results : dict
        Prediction results for each model group.
    scalers : dict
        `sklearn.preprocessing.MinMaxScaler` objects for each column.
    limits : dict
        Limits for each column in the `datasets` dictionary.
    nive_concat : pd.DataFrame
        Pandas DataFrame with concatenated data from columns that start
        with "NIVE".
    df_target_dict : dict
        Dictionary containing target data for each model group.

    Notes
    -----
    The function has some hard-coded column names and specific operations
    that might not be suitable for other use cases. It also contains some
    unused code that has been commented out. It is recommended to remove
    or review the commented code before using this function in a
    production environment.

    .. versionadded:: 0.1.0
        Added `datasets` as an input parameter to the function. Previously,
        the function referred to the `datasets` dictionary indirectly,
        assuming it existed in the global scope, which made its usage less
        transparent and could cause problems when the function was used in
        a different context.
    """
    from wip.datatools.ml_filters import fill_all_missing_columns
    from wip.datatools.ml_filters import filter_status

    results = {}  # Store results for each model group
    limits = {}  # Store the limits for each column of every dataset
    df_target_dict = {}
    # Store the column values that start with "NIVE".
    nive_concat = pd.DataFrame()

    # The scalers returned by this method consider each column individually.
    # It also identifies columns that are repeated on different datasets,
    # and combines them before fitting the scaler.
    # Therefore, if some column "A" has a range of values equal to [0, 1000]
    # on "dataset 1" and a range of [-1000, 500] on "dataset 2",
    # the scaler will consider the range to be [-1000, 1000].
    scalers, _ = operations.define_real_scalers(datasets)

    for qualidade in datasets.keys():
        if qualidade.startswith('cm'):
            continue
        logger.info("%s", f"{f' {qualidade} ':=^50} ")
        results[qualidade] = []
        df_train = datasets[qualidade].copy()
        col_target = df_train.columns[-1]
        logger.info("Target name: %s", col_target)

        # Apply the filters to `df_train`.
        df_train = mod_filtros(df_train, qualidade, col_target)

        # Iterate over every column of the DataFrame (except the target).
        for col in df_train.columns[:-1]:
            # Define the columns that will be ignored.
            if (
                col in ["status", "ProducaoPQ_Moagem"]
                or col.startswith("qtde")
                or col.startswith("SOMA FUNC")
            ):
                df_train[col].fillna(df_train[col].mode().iloc[0], inplace=True)
                continue
            if (
                col in [
                    "NIVE1_C@08QU-FR-851I-01M1",
                    "NIVE2_I@08QU-FR-851I-01M1",
                    "NIVE3_I@08QU-FR-851I-01M1",
                    "NIVE4_I@08QU-FR-851I-01M1",
                    "NIVE5_I@08QU-FR-851I-01M1",
                    "NIVE6_I@08QU-FR-851I-01M1",
                ]
                and col not in scalers
            ):
                nive_concat = pd.concat(
                    [nive_concat, df_train[col]], ignore_index=True
                )

            # Create the scalers for the attributes.
            if col not in scalers:
                scalers[col] = MinMaxScaler()
                tmp = df_train[[col]].copy()
                if col.startswith("DENS1_C@08HO-BP"):
                    tmp[col][0] = 0
                scalers[col].fit(tmp.astype(float))

            # Rescale the values.
            try:
                df_train[col] = scalers[col].transform(df_train[[col]])
            except Exception as exc:  # pylint: disable=broad-except
                logger.exception(exc)
                logger.error("Error on scaler for column: %s", col)
                logger.error(
                    "Series trying to be rescaled:\n\n%s", df_train[col].head()
                )

        df_train = (
            df_train.pipe(filter_status)
            .drop(columns=["ProducaoPQ_Moagem"], errors="ignore")
            .replace([np.inf, -np.inf], np.nan)
            .dropna(subset=[col_target])
            .interpolate("linear", limit_direction="both")
            .fillna(0)
            .loc[lambda xdf: xdf[xdf.columns[-1]] > 0, :]
            .pipe(fill_all_missing_columns)
        )

        # Limits of each column.
        for column in df_train.columns:
            limits[column] = df_train[column]

        # Generate the training and target datasets.
        y_train = df_train[col_target]
        df_train = df_train.drop(columns=[col_target])

        # Train each model.
        index = df_train.index.sort_values()
        cv_n = 3
        cv_size = 0.3
        cv_thresholds = [
            index[int(len(index) * (1 - i * cv_size / cv_n)) - 1]
            for i in range(cv_n + 1)
        ][::-1]
        results[qualidade] = apply_model_new(
            qualidade,
            df_train,
            y_train,
            self_train=True,
            param_validation={
                "cv_thresholds": cv_thresholds,
                "qualidade": qualidade,
            },
        )
        df_target_dict[qualidade] = y_train

    # These values will be later used to define the optimization problem.
    # Moreover, `results` and `scalers` are the outputs that are most
    # used by the optimization problem.
    return results, scalers, limits, nive_concat, df_target_dict
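

# NOTE: Illustrative sketch (hypothetical data, not part of the pipeline).
# The range-combination behaviour described in the comment inside
# `get_prediction` can be reproduced with `MinMaxScaler.partial_fit`, which
# widens `data_min_`/`data_max_` as each new batch of values arrives:
def _example_shared_column_scaler() -> None:
    scaler = MinMaxScaler()
    scaler.partial_fit(np.array([[0.0], [1000.0]]))     # column "A", dataset 1
    scaler.partial_fit(np.array([[-1000.0], [500.0]]))  # column "A", dataset 2
    # Combined range: data_min_ == [-1000.] and data_max_ == [1000.]
    logger.info("min=%s, max=%s", scaler.data_min_, scaler.data_max_)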


def get_output_filepaths(outputs_folder: str | Path = None) -> Tuple[Path, ...]:
    """
    Return a tuple of output file paths.

    This function operates in two modes, depending on the value of
    `outputs_folder`:

    1. If `outputs_folder` is `None`, the default file paths are used.
       The default file paths are found inside `wip.constants`.
    2. If `outputs_folder` is provided, new file paths in that folder are
       returned. The function creates the `outputs_folder` directory if it
       doesn't exist.

    Parameters
    ----------
    outputs_folder : str or Path, optional
        Path to the folder where the output files will be created.
        If not provided (default is `None`), the default file paths will
        be returned.

    Returns
    -------
    Tuple[Path, ...]
        A tuple of `Path` objects representing the output file paths.
        These are the output file paths returned, in order:

        - models_results
        - scalers
        - models_coefficients
        - models_features
        - datasets
    """
    if outputs_folder is None:
        return (
            MODELS_RESULTS_FILEPATH,
            SCALERS_FILEPATH,
            MODELS_COEFICIENTS_FILEPATH,
            MODELS_FEATURES_FILEPATH,
            FINAL_DATASETS_FILEPATH,
        )
    outputs_folder = Path(outputs_folder)
    outputs_folder.mkdir(parents=True, exist_ok=True)
    return (
        outputs_folder.joinpath("models_results.joblib"),
        outputs_folder.joinpath("scalers.joblib"),
        outputs_folder.joinpath("models_coeficients.joblib"),
        outputs_folder.joinpath("models_features.joblib"),
        outputs_folder.joinpath("datasets.joblib"),
    )
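

# Illustrative usage of `get_output_filepaths` (the folder name below is a
# hypothetical placeholder). The first call returns the default paths from
# `wip.constants`; the second creates the folder if needed and returns paths
# inside it:
#
#     default_paths = get_output_filepaths()
#     custom_paths = get_output_filepaths("outputs/us8/experiments")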


def train_ml_models(  # pylint: disable=too-many-locals, too-many-statements
    datasets_filepath=None,
    df_sql_filepath=None,
    outputs_folder=None,
):
    """Train machine learning models and save results.

    This function trains the machine learning models for each step of the
    pelletizing process, using datasets stored in the given joblib files
    or default file paths. The function applies preprocessing steps,
    filters the data, performs model training, and calculates metrics.
    Finally, it saves the results as joblib files in the specified output
    folder or the default folder.

    Parameters
    ----------
    datasets_filepath : str | Path | None
        Path to the file containing datasets for training.
        If not provided (default is `None`), the default dataset file path
        from `wip.constants.DATASETS_FILEPATH` is used.
    df_sql_filepath : str or Path, optional
        Path to the file containing the original data used for generating
        the `datasets` dictionary. If not provided (default is `None`),
        the default data file path from `wip.constants.DF_SQL_FILEPATH`
        is used.
    outputs_folder : str or Path, optional
        Path to the folder where model results and outputs will be saved.
        If not provided (default is `None`), the default output folder
        from `wip.constants` is used.

    Notes
    -----
    - Preprocessing steps are applied to clean the data and replace
      specific values.
    - Machine learning models are trained using the Ridge regression
      algorithm.
    - Cross-validation is performed with `cv_n` folds and `cv_size` as the
      test-set size.
    - Specific filters are applied to the datasets based on model names.

    .. versionadded:: 0.1.0
        Added the option to specify the input and output file paths.

    .. versionadded:: 2.3.0
        Added the `skip_transformations` parameter to allow skipping the
        initial transformations and outlier-removal processes, given that
        the files `datasets_after_cleaning.joblib` and
        `df_sql_after_cleaning.joblib` exist, and that the code is being
        executed outside Databricks.

    .. versionchanged:: 2.4.0
        - Made changes to the function to allow for running the code
          inside Databricks.

    .. versionchanged:: 2.8.4
        - Included all binary columns found inside the datasets in the
          `skip_columns` parameter passed to the function
          `auto_clean_datasets`.
        - Removed training of the baseline regression models, since
          they're not used by any later processes.

    .. versionchanged:: 2.10.0
        - Moved the data transformation steps to a new function called
          `clean_data` inside `wip.datatools.ml_filters`. This function is
          now called at the end of the "preprocessamento" workflow.
        - Added a timestamp column to the `models_df` DataFrame, which
          contains the KPIs of the trained models. New versions of the
          `models_df` DataFrame are now appended to the existing KPIs CSV
          file, to allow tracking of the models' performance through time.
    """
    # Filepaths where the "datasets.joblib" and "df_sql.joblib" files
    # are located.
    datasets_filepath = datasets_filepath or DATASETS_FILEPATH
    df_sql_filepath = df_sql_filepath or DF_SQL_FILEPATH

    df_sql = read_joblib(df_sql_filepath)
    datasets = read_joblib(datasets_filepath)
    datasets = replace_ventiladores_tags(datasets, df_sql)

    if MAX_N_DAYS is not None:
        datasets, df_sql = filter_datasets_df_sql_by_date(
            datasets, df_sql, n_days=MAX_N_DAYS
        )

    # datasets = clean_data(datasets, df_sql, skip_transformations)
    models_results, scalers, limits, nive_concat, _ = get_prediction(datasets)
    models_features = limits

    temp_dict, models_coeficients = {}, {}
    for model, results in models_results.items():
        best = results[0]
        best_metrics = best["metrics"]
        best_mape = round(best_metrics["mape"], 3)
        best_r2 = round(best_metrics["r2"], 3)
        temp_dict[model] = {"mape": best_mape, "r2": best_r2}
        coefficients = dict(zip(best["columns"], best["model"].coef_))
        models_coeficients[model] = coefficients

    nive_scaler = MinMaxScaler()
    nive_scaler.fit(nive_concat)
    scalers["nive"] = nive_scaler

    models_df = pd.DataFrame(temp_dict).T.rename(columns={0: "mape", 1: "r2"})
    models_df = models_df.reset_index().rename({"index": "model"}, axis=1)
    models_df["timestamp"] = pd.Timestamp.now()

    # == Save Results ==========================================================
    (
        models_results_filepath,
        scalers_filepath,
        models_coeficients_filepath,
        models_features_filepath,
        final_datasets_filepath,
    ) = get_output_filepaths(outputs_folder)

    _models_results = adjust_models_coefficients(models_results.copy(), scalers)
    _models_features = inverse_transform_models_features(
        models_features.copy(), scalers
    )

    _scalers = {}
    for feature_name, _ in scalers.items():
        values = _models_features.get(feature_name)
        if values is None:
            logger.warning("Values for feature '%s' not found", feature_name)
            if feature_name in df_sql.columns:
                values = df_sql[feature_name]
            else:
                continue
        fake_scaler = FakeScaler()
        fake_scaler = fake_scaler.fit(values.values.reshape(-1, 1))
        _scalers[feature_name] = fake_scaler

    _models_coeficients = {}
    for model_name, results in _models_results.items():
        _models_coeficients[model_name] = dict(
            zip(
                results[0]['grid'].best_estimator_.feature_names_in_,
                results[0]['grid'].best_estimator_[1].coef_,
            )
        )

    to_joblib(_models_results, models_results_filepath)
    to_joblib(_scalers, scalers_filepath)
    to_joblib(_models_coeficients, models_coeficients_filepath)
    to_joblib(_models_features, models_features_filepath)
    to_joblib(datasets, final_datasets_filepath)

    mode = "a"
    if is_running_on_databricks():
        mode = "w"
    to_csv(models_df, MODELS_KPIS_FILEPATH, index=False, mode=mode)

    # Print the models' KPIs in a rich-text table representation.
    models_df = (
        models_df.sort_values("r2")
        .assign(
            r2_view=lambda xdf: xdf["r2"].apply(
                lambda value: "░" * int(round((value * 100) / 5, 0))
            )
        )
        .assign(mape=lambda xdf: xdf["mape"].apply(lambda value: f"{value:.2%}"))
        .assign(r2=lambda xdf: xdf["r2"].apply(lambda value: f"{value:.2%}"))
        .loc[:, ["model", "mape", "r2", "r2_view"]]
    )
    print_rich_table(
        models_df,
        index=False,
        justify={
            "model": "center",
            "mape": "center",
            "r2": "center",
            "r2_view": "left",
        },
        style={"r2_view": "green"},
    )
    logger.info("Done!")
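

# Illustrative sketch: `train_ml_models` accepts non-default artifact paths
# and an alternative output folder (the paths below are hypothetical
# placeholders, not project defaults):
#
#     train_ml_models(
#         datasets_filepath="path/to/datasets.joblib",
#         df_sql_filepath="path/to/df_sql.joblib",
#         outputs_folder="path/to/experiment_outputs",
#     )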


if __name__ == "__main__":
    train_ml_models()