"""
Module for training the ridge regression models.
This module defines the functions needed to train one ridge regression
model for each stage of the pelletizing process.
Usage
-----
To train the ML models, run the following command:
.. code-block:: bash
$ python mltrainer.py
The above command will train the models and save them in the
``outputs/us8/predictive_models`` directory.
Developer Notes
---------------
If you're tasked with refactoring or making any changes to this module,
please read these comments before starting your work.
Scalers
~~~~~~~
One of the first aspects of this module you might be inclined to change
is the scalers being used from `sklearn.preprocessing.MinMaxScaler`
to a more advanced scaler like `sklearn.preprocessing.RobustScaler`.
However, beware that making this change will be more complex than what it seems
at first.
These scalers are created column-wise, and are used later by the module
`otm.py` and on several other occasions.
Making this change will require you to refactor several modules, adapting
them to perform the upscale and downscale operations without
either `data_min_` or `data_range_` attributes that only the
`sklearn.preprocessing.MinMaxScaler` has.
Try counting the number of times that the scalers are
used inside the `modules` subpackage, and the `otm.py` module.
If you do so, you'll probably realize the size of the task it'd be to
change normalization algorithms; that should in normal circumstances be
a simple change to make.
"""
from __future__ import annotations
import warnings
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union
import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from sklearn.impute import KNNImputer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_percentage_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
import wip.modules.ops as operations
from wip.constants import DATASETS_CLEAN_FILEPATH, DATASETS_FILEPATH, DF_SQL_FILEPATH
from wip.constants import DF_SQL_CLEAN_FILEPATH
from wip.constants import FINAL_DATASETS_FILEPATH
from wip.constants import MODELS_COEFICIENTS_FILEPATH
from wip.constants import MODELS_FEATURES_FILEPATH
from wip.constants import MODELS_KPIS_FILEPATH
from wip.constants import MODELS_RESULTS_FILEPATH
from wip.constants import SCALERS_FILEPATH
from wip.datatools.display import print_rich_table
from wip.datatools.io_ops import read_joblib
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import to_joblib
from wip.logging_config import logger
from wip.ml_configs import MAX_N_DAYS
from wip.modules.predictive_module import manual_kfold_validation
from wip.modules.predictive_module import mod_filtros
from wip.temporary import FakeScaler
from wip.temporary import adjust_models_coefficients
from wip.temporary import filter_datasets_df_sql_by_date
from wip.temporary import inverse_transform_models_features
from wip.temporary import replace_ventiladores_tags
from wip.utils import is_running_on_databricks
# Suppress every warning category for the whole process. Note that this also
# hides deprecation/future warnings raised by the libraries used below.
warnings.filterwarnings('ignore')
def apply_naive_model(y_values, test_set_date):
    """
    Score a constant "training mean" baseline on a date-based split.

    The series is split around `test_set_date`; every test observation is
    then predicted with the mean of the training observations, and the
    predictions are scored against the actual test values.

    Parameters
    ----------
    y_values : pandas.Series
        Target values, indexed by something comparable to `test_set_date`
        (a datetime-like index in the intended usage).
    test_set_date : datetime-like
        Split point. Observations strictly after it form the test set and
        observations strictly before it form the training set. Observations
        whose index equals `test_set_date` are used by neither set.

    Returns
    -------
    mse : float
        Mean Squared Error of the baseline, rounded to 3 decimals.
    mape : float
        Mean Absolute Percentage Error of the baseline, rounded to 3 decimals.
    r2 : float
        R-squared score of the baseline, rounded to 3 decimals.

    Notes
    -----
    Because the prediction is a single constant, this baseline ignores any
    trend or seasonality in the series. It is only a reference point to
    compare trained models against.

    Examples
    --------
    >>> y = pd.Series([1, 2, 3, 4, 5], index=pd.date_range('20200101', periods=5))
    >>> _test_set_date = '2020-01-04'
    >>> apply_naive_model(y, _test_set_date)
    (9.0, 0.6, nan)
    """
    timestamps = y_values.index
    holdout = y_values[timestamps > test_set_date]
    history = y_values[timestamps < test_set_date]

    # One identical prediction (the training mean) per held-out observation.
    baseline = [history.mean()] * len(holdout)

    scorers = (mean_squared_error, mean_absolute_percentage_error, r2_score)
    mse, mape, r2_result = (
        round(scorer(holdout, baseline), 3) for scorer in scorers
    )
    return mse, mape, r2_result
def apply_model_new(
    model_name: str,
    df_train: pd.DataFrame,
    df_target: pd.Series,
    self_train: bool = True,
    **kwargs,
) -> List[Dict[str, Union[str, float, np.ndarray, pd.Index]]]:
    """Tune and evaluate an imputer + ridge regression pipeline.

    A two-step pipeline (missing-value imputer followed by `Ridge`) is tuned
    with `GridSearchCV` (5 folds, R2 scoring). The pipeline steps and the
    parameter grid are hard-coded in this function.

    Parameters
    ----------
    model_name : str
        Name of the process that represents the pelletizing
        stage (e.g., "abrasao", "basicidade", "compressao", "finos", "gas").
        Only used in the log messages.
    df_train : pd.DataFrame
        Feature matrix.
    df_target : pd.Series
        Target values aligned with `df_train`.
    self_train : bool, optional, default=True
        If True, the data is split at random into 80% train / 20% test
        (``random_state=42``), the grid search is fitted on the train part
        and all metrics are computed inside this function.
        If False, fitting and metric computation are delegated to the
        `validation` callable (see `**kwargs`).
    **kwargs : dict
        Optional settings read by this function:

        - `"validation"`: callable used when `self_train` is False.
          Defaults to `manual_kfold_validation`. It is called as
          ``validation(grid, df_train, df_target, kwparams=...)`` and must
          return ``(indexes, y_test, predicted, metrics)`` where `metrics`
          unpacks into ``(mse, mape, r2, r, r2_train, r2_adj)``.
        - `"param_validation"`: object forwarded to `validation` as
          `kwparams`.

    Returns
    -------
    list
        A single-element list containing a dictionary with the keys:

        - `"conf"`: Static description of the model class and alpha values.
        - `"metrics"`: Dictionary with `mse`, `mape`, `r2`, `r`, `r2_train`
          and `r2_train_adj`. When `self_train` is True, `r2_train_adj` is
          the adjusted R2 derived from the **test** R2 and the test-set
          shape; it is NaN when the test set has too few rows
          (``n_rows - n_columns - 1 <= 0``) for the adjustment to be defined.
        - `"model"`: Last step (the ridge regressor) of the best pipeline.
        - `"grid"`: The grid search object.
        - `"columns"`: Columns of `df_train`.
        - `"params"`: Coefficients (`coef_`) of the best ridge regressor.
        - `"indexes"`: Index of the evaluated (test) observations.
        - `"ys"`: True target values of the evaluated observations.
        - `"yhats"`: Predictions for the evaluated observations.
        - `"predicted"`: Same object as `"yhats"`.
        - `"df_train"`, `"df_target"`: The inputs, returned unchanged.
    """
    param_valid = kwargs.get("param_validation")
    validation = kwargs.get("validation", manual_kfold_validation)

    pipe = Pipeline([("missing", SimpleImputer()), ("regression", Ridge(alpha=0.1))])
    # Every combination of the parameters below is evaluated by the grid
    # search; the "missing" entry swaps the whole imputation step.
    param_grid = [
        {
            "missing": [
                KNNImputer(weights="distance"),
                SimpleImputer(keep_empty_features=True),
            ],
            "regression__alpha": [0.001, 0.01, 0.1, 1, 10, 100, 1000],
            "regression__fit_intercept": [True, False],
        },
    ]
    grid = GridSearchCV(pipe, param_grid=param_grid, scoring="r2", cv=5)

    if self_train:
        x_train, x_test, y_train, y_test = train_test_split(
            df_train, df_target, train_size=0.8, random_state=42
        )
        indexes = y_test.index
        grid.fit(x_train, y_train)
        predicted = grid.predict(x_test)
        # Both sides are rebuilt with a fresh positional index so that the
        # correlation is computed pairwise by position, not by label.
        correlation_values = pd.DataFrame(predicted)[0].corr(
            pd.DataFrame(y_test.values)[0]
        )
        train_predicted = grid.predict(x_train)
        mape = mean_absolute_percentage_error(y_test, predicted)
        mse = mean_squared_error(y_test, predicted)
        r2_test = r2_score(y_test, predicted)
        r2_train = r2_score(y_train, train_predicted)

        num_observations, num_cols = x_test.shape
        degrees_of_freedom = num_observations - num_cols - 1
        if degrees_of_freedom > 0:
            adjusted_r2 = 1 - (
                (1 - r2_test) * (num_observations - 1) / degrees_of_freedom
            )
        else:
            # The adjustment is undefined without positive residual degrees
            # of freedom; report NaN instead of dividing by zero or by a
            # negative number.
            adjusted_r2 = float("nan")
    else:
        # NOTE(review): the return statement below reads
        # `grid.best_estimator_`, so `validation` is expected to leave `grid`
        # fitted - confirm against the callable being used.
        indexes, y_test, predicted, metrics = validation(
            grid, df_train, df_target, kwparams=param_valid
        )
        mse, mape, r2_test, correlation_values, r2_train, adjusted_r2 = metrics

    logger.info("Model: %r", model_name)
    logger.info("Target name: %r", df_target.name)
    logger.info("%s", f"{' Metrics ':=^50}")
    logger.info("MAPE: %.2f", mape)
    logger.info("R2 Test: %.2f", r2_test)
    logger.info("R2 Train: %.2f", r2_train)
    logger.info("Adjusted R2 Score: %.2f\n", adjusted_r2)

    best_regressor = grid.best_estimator_[-1]
    return [
        {
            "conf": {
                "model": Ridge,
                "params": {
                    "alpha": [1000, 100, 10, 1, 0.1, 0.01, 0.001],
                },
            },
            "metrics": {
                "mse": mse,
                "mape": mape,
                "r2": r2_test,
                "r": correlation_values,
                "r2_train": r2_train,
                "r2_train_adj": adjusted_r2,
            },
            "model": best_regressor,
            "grid": grid,
            "columns": df_train.columns,
            "params": best_regressor.coef_,
            "indexes": indexes,
            "ys": y_test,
            "yhats": predicted,
            "predicted": predicted,
            "df_train": df_train,
            "df_target": df_target,
        }
    ]
def get_prediction(
    datasets: dict,
) -> Tuple[Dict, Dict, Dict, pd.DataFrame, Dict]:
    """Scale, filter and train one model per dataset.

    For every entry of `datasets` whose key does not start with ``"cm"``,
    the features are scaled column by column, the rows are filtered, and a
    ridge regression pipeline is trained through `apply_model_new`.
    The last column of each dataset is treated as its target.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary containing the datasets, keyed by model name.

    Returns
    -------
    results : dict
        Output of `apply_model_new` for each trained model.
    scalers : dict
        One `sklearn.preprocessing.MinMaxScaler` per column. Scalers
        returned by `operations.define_real_scalers` are reused; missing
        ones are created and fitted here.
    limits : dict
        For each column of every processed dataset (target included), the
        column values after scaling and filtering. A column present in
        several datasets keeps the values of the last dataset processed.
    nive_concat : pd.DataFrame
        Values of the "NIVE" furnace-level tags that had no pre-defined
        scaler, stacked on top of each other.
    df_target_dict : dict
        Target series of each trained model.

    Notes
    -----
    Column names and filtering rules are hard-coded for the pelletizing
    datasets and may not suit other use cases.

    .. versionadded:: 0.1.0
        Added `datasets` as an input parameter. Previously the function
        read the `datasets` dictionary from the global scope, which made
        its usage less transparent and error-prone in other contexts.
    """
    from wip.datatools.ml_filters import fill_all_missing_columns
    from wip.datatools.ml_filters import filter_status

    nive_tags = (
        "NIVE1_C@08QU-FR-851I-01M1",
        "NIVE2_I@08QU-FR-851I-01M1",
        "NIVE3_I@08QU-FR-851I-01M1",
        "NIVE4_I@08QU-FR-851I-01M1",
        "NIVE5_I@08QU-FR-851I-01M1",
        "NIVE6_I@08QU-FR-851I-01M1",
    )

    results = {}  # Training output of each model
    limits = {}  # Processed values of each column of every dataset
    df_target_dict = {}
    # Values of the "NIVE" columns that have no pre-defined scaler.
    nive_concat = pd.DataFrame()

    # The scalers returned by this call consider each column individually.
    # Columns repeated across datasets are combined before fitting, so if
    # column "A" ranges over [0, 1000] on "dataset 1" and [-1000, 500] on
    # "dataset 2", its scaler considers the range to be [-1000, 1000].
    scalers, _ = operations.define_real_scalers(datasets)

    for qualidade, dataset in datasets.items():
        if qualidade.startswith("cm"):
            continue
        logger.info("%s", f"{f' {qualidade} ':=^50} ")

        df_train = dataset.copy()
        col_target = df_train.columns[-1]
        logger.info("Target name: %s", col_target)

        # Apply the model-specific filters to the training frame.
        df_train = mod_filtros(df_train, qualidade, col_target)

        # Walk through every column except the last one (the target).
        for col in df_train.columns[:-1]:
            # These columns are not scaled: their gaps are filled with the
            # most frequent value and they are otherwise left untouched.
            if col in ["status", "ProducaoPQ_Moagem"] or col.startswith(
                ("qtde", "SOMA FUNC")
            ):
                # Assign the result back instead of `fillna(inplace=True)` on
                # `df_train[col]`: the chained form does not update the
                # frame when pandas Copy-on-Write is active.
                df_train[col] = df_train[col].fillna(df_train[col].mode().iloc[0])
                continue

            if col in nive_tags and col not in scalers:
                nive_concat = pd.concat([nive_concat, df_train[col]], ignore_index=True)

            # Create the scaler of this feature when none was pre-defined.
            if col not in scalers:
                scalers[col] = MinMaxScaler()
                tmp = df_train[[col]].copy()
                if col.startswith("DENS1_C@08HO-BP"):
                    # Overwrite the first row with 0 before fitting.
                    # NOTE(review): presumably done so the fitted minimum
                    # is at most 0 - confirm. Written positionally with
                    # `iloc` instead of the chained `tmp[col][0] = 0`,
                    # which is a no-op under pandas Copy-on-Write.
                    tmp.iloc[0, 0] = 0
                scalers[col].fit(tmp.astype(float))

            # Scale the values. Failures are logged and the column is left
            # unscaled so the remaining columns are still processed.
            try:
                df_train[col] = scalers[col].transform(df_train[[col]])
            except Exception as exc:  # pylint: disable=broad-except
                logger.exception(exc)
                logger.error("Error on scaler for column: %s", col)
                logger.error(
                    "Series trying to be rescaled:\n\n%s", df_train[col].head()
                )

        df_train = (
            df_train.pipe(filter_status)
            .drop(columns=["ProducaoPQ_Moagem"], errors="ignore")
            .replace([np.inf, -np.inf], np.nan)
            .dropna(subset=[col_target])
            .interpolate("linear", limit_direction="both")
            .fillna(0)
            # Keep only rows whose last column is strictly positive.
            # NOTE(review): the last column is presumably still the target
            # at this point - confirm `filter_status` keeps column order.
            .loc[lambda xdf: xdf[xdf.columns[-1]] > 0, :]
            .pipe(fill_all_missing_columns)
        )

        # Record the processed values of each column (target included).
        for column in df_train.columns:
            limits[column] = df_train[column]

        # Split into features and target.
        y_train = df_train[col_target]
        df_train = df_train.drop(columns=[col_target])

        # Index labels delimiting `cv_n` consecutive validation windows that
        # together cover the last `cv_size` fraction of the sorted index.
        # NOTE: with `self_train=True`, `apply_model_new` does not call the
        # validation routine, so these thresholds are forwarded but not
        # consumed by it.
        index = df_train.index.sort_values()
        cv_n = 3
        cv_size = 0.3
        cv_thresholds = [
            index[int(len(index) * (1 - i * cv_size / cv_n)) - 1]
            for i in range(cv_n + 1)
        ][::-1]

        results[qualidade] = apply_model_new(
            qualidade,
            df_train,
            y_train,
            self_train=True,
            param_validation={
                "cv_thresholds": cv_thresholds,
                "qualidade": qualidade,
            },
        )
        df_target_dict[qualidade] = y_train

    # These values are later used to define the optimization problem;
    # `results` and `scalers` are the outputs it relies on the most.
    return results, scalers, limits, nive_concat, df_target_dict
def get_output_filepaths(outputs_folder: str | Path = None) -> Tuple[Path, ...]:
    """
    Resolve where the training artifacts are written.

    Two modes are supported, selected by `outputs_folder`:

    1. `outputs_folder` is `None`: the default locations declared in
       `wip.constants` are returned as-is.
    2. `outputs_folder` is given: the folder is created when missing
       (parents included) and the returned paths point to files inside it.

    Parameters
    ----------
    outputs_folder : str or Path, optional
        Folder that should receive the output files.
        When omitted (default `None`), the default file paths are returned.

    Returns
    -------
    Tuple[Path, ...]
        Five output locations, in this order:

        - models_results
        - scalers
        - models_coefficients
        - models_features
        - datasets

        In the explicit-folder mode every item is a `Path`; in the default
        mode the items are whatever `wip.constants` defines.
    """
    if outputs_folder is not None:
        target_dir = Path(outputs_folder)
        target_dir.mkdir(parents=True, exist_ok=True)
        filenames = (
            "models_results.joblib",
            "scalers.joblib",
            "models_coeficients.joblib",
            "models_features.joblib",
            "datasets.joblib",
        )
        return tuple(target_dir / filename for filename in filenames)

    return (
        MODELS_RESULTS_FILEPATH,
        SCALERS_FILEPATH,
        MODELS_COEFICIENTS_FILEPATH,
        MODELS_FEATURES_FILEPATH,
        FINAL_DATASETS_FILEPATH,
    )
def train_ml_models(  # pylint: disable=too-many-locals, too-many-statements
    datasets_filepath=None,
    df_sql_filepath=None,
    outputs_folder=None,
):
    """Train machine learning models and save results.

    Loads the datasets and the original data from joblib files, trains one
    model per dataset through `get_prediction`, post-processes the results
    and scalers, and saves everything as joblib files. The KPIs (MAPE and
    R2) of each model are written to a CSV file and printed as a table.

    Parameters
    ----------
    datasets_filepath : str | Path | None
        Path to the file containing datasets for training. If not provided
        (default is `None`), `wip.constants.DATASETS_FILEPATH` is used.
    df_sql_filepath : str or Path, optional
        Path to the file containing the original data used for generating
        the `datasets` dictionary. If not provided (default is `None`),
        `wip.constants.DF_SQL_FILEPATH` is used.
    outputs_folder : str or Path, optional
        Path to the folder where model results and outputs will be saved.
        If not provided (default is `None`), the default output file paths
        from `wip.constants` are used.

    Notes
    -----
    - When `wip.ml_configs.MAX_N_DAYS` is not `None`, the datasets and the
      original data are first restricted by date through
      `filter_datasets_df_sql_by_date`.
    - Scaling, filtering and model fitting happen inside `get_prediction`.
    - The KPIs CSV is appended to on regular runs and overwritten when
      running on Databricks. Its location (`MODELS_KPIS_FILEPATH`) is not
      affected by `outputs_folder`.

    .. versionadded:: 0.1.0
        Added the option to specify the inputs and outputs filepaths.
    .. versionadded:: 2.3.0
        Added the `skip_transformations` parameter to allow for skipping
        the initial transformations and outliers' removal processes.
        This parameter is not part of the current signature.
    .. versionchanged:: 2.4.0
        - Made changes to the function to allow for running the code inside
          DataBricks.
    .. versionchanged:: 2.8.4
        - Included all binary columns found inside the datasets to the
          parameter `skip_columns` passed to the function `auto_clean_datasets`.
        - Removed training of the baseline regression models, since they're not
          being used by any later processes.
    .. versionchanged:: 2.10.0
        - Move the data transformation steps to a new function called `clean_data`
          inside `wip.datatools.ml_filters`. This function is now called at the end
          of "preprocessamento" workflow.
        - Add timestamp column to `models_df` DataFrame, containing the KPIs of
          the trained models. New versions of the `models_df` DataFrame are
          now appended to the existing KPIs CSV file, to allow tracking
          of the model's performances through time.
    """
    # == Load inputs ===========================================================
    datasets_filepath = datasets_filepath or DATASETS_FILEPATH
    df_sql_filepath = df_sql_filepath or DF_SQL_FILEPATH
    df_sql = read_joblib(df_sql_filepath)
    datasets = read_joblib(datasets_filepath)
    datasets = replace_ventiladores_tags(datasets, df_sql)
    if MAX_N_DAYS is not None:
        datasets, df_sql = filter_datasets_df_sql_by_date(
            datasets, df_sql, n_days=MAX_N_DAYS
        )

    # == Train =================================================================
    models_results, scalers, limits, nive_concat, _ = get_prediction(datasets)
    models_features = limits

    # KPIs of each model, taken from the first (only) result entry.
    models_kpis = {
        model: {
            "mape": round(results[0]["metrics"]["mape"], 3),
            "r2": round(results[0]["metrics"]["r2"], 3),
        }
        for model, results in models_results.items()
    }

    nive_scaler = MinMaxScaler()
    nive_scaler.fit(nive_concat)
    scalers["nive"] = nive_scaler

    # One row per model, with "mape" and "r2" columns.
    models_df = pd.DataFrame(models_kpis).T
    models_df = models_df.reset_index().rename({"index": "model"}, axis=1)
    models_df["timestamp"] = pd.Timestamp.now()

    # == Save Results ==========================================================
    (
        models_results_filepath,
        scalers_filepath,
        models_coeficients_filepath,
        models_features_filepath,
        final_datasets_filepath,
    ) = get_output_filepaths(outputs_folder)

    _models_results = adjust_models_coefficients(models_results.copy(), scalers)
    _models_features = inverse_transform_models_features(
        models_features.copy(), scalers
    )

    # Build one `FakeScaler` per scaled feature, fitted on the
    # inverse-transformed feature values. Features without such values fall
    # back to the matching `df_sql` column, and are skipped when it does
    # not exist either.
    _scalers = {}
    for feature_name in scalers:
        values = _models_features.get(feature_name)
        if values is None:
            logger.warning("Values for feature '%s' not found", feature_name)
            if feature_name in df_sql.columns:
                values = df_sql[feature_name]
            else:
                continue
        fake_scaler = FakeScaler()
        fake_scaler = fake_scaler.fit(values.values.reshape(-1, 1))
        _scalers[feature_name] = fake_scaler

    # Feature name -> coefficient, read from the best pipeline of each model
    # after the coefficient adjustment above.
    _models_coeficients = {}
    for model_name, results in _models_results.items():
        best_pipeline = results[0]['grid'].best_estimator_
        _models_coeficients[model_name] = dict(
            zip(best_pipeline.feature_names_in_, best_pipeline[1].coef_)
        )

    to_joblib(_models_results, models_results_filepath)
    to_joblib(_scalers, scalers_filepath)
    to_joblib(_models_coeficients, models_coeficients_filepath)
    to_joblib(_models_features, models_features_filepath)
    to_joblib(datasets, final_datasets_filepath)

    # Append to the KPIs history, except on Databricks where the file is
    # overwritten.
    mode = "w" if is_running_on_databricks() else "a"
    to_csv(models_df, MODELS_KPIS_FILEPATH, index=False, mode=mode)

    # Print model's KPIs in a rich-text table representation.
    # `r2_view` is a text bar with one block per 5 percentage points of R2.
    models_df = (
        models_df.sort_values("r2")
        .assign(
            r2_view=lambda xdf: xdf["r2"].apply(
                lambda value: "░" * int(round((value * 100) / 5, 0))
            )
        )
        .assign(mape=lambda xdf: xdf["mape"].apply(lambda value: f"{value:.2%}"))
        .assign(r2=lambda xdf: xdf["r2"].apply(lambda value: f"{value:.2%}"))
        .loc[:, ["model", "mape", "r2", "r2_view"]]
    )
    print_rich_table(
        models_df,
        index=False,
        justify={
            "model": "center",
            "mape": "center",
            "r2": "center",
            "r2_view": "left",
        },
        style={"r2_view": "green"},
    )
    logger.info("Done!")
# Script entry point: running this file directly trains the models with the
# default input and output locations.
if __name__ == "__main__":
    train_ml_models()