"""
Create and solve optimization models.
This module is responsible for creating and solving the optimization
problems that try to find the optimal set-points to reduce production costs
related to the pelletizing process.
This module creates one optimization problem for the following
production ranges:
- 700~750
- 750~800
- 800~850
- 850~900
- 900~950
- 950~1000
"""
from __future__ import annotations
import os.path
import pickle
import warnings
from pathlib import Path
from typing import Dict
from typing import List
import numpy as np
import pandas as pd
import pulp
import sklearn
from rich.progress import track
import wip.modules.ops as operations
from wip.constants import DF_SQL_CLEAN_FILEPATH
from wip.constants import FINAL_DATASETS_FILEPATH
from wip.constants import MODELS_COEFICIENTS_FILEPATH
from wip.constants import MODELS_FEATURES_FILEPATH
from wip.constants import MODELS_RESULTS_FILEPATH
from wip.constants import OTM_OUTPUTS_FOLDER_PATH
from wip.constants import SCALERS_FILEPATH
from wip.constants import USE_FLOTICOR
from wip.constants import constants
from wip.constants import critical_cols_dict
from wip.constants import tags_ventiladores
from wip.datatools.io_ops import read_joblib
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import to_excel
from wip.datatools.io_ops import to_json
from wip.datatools.io_ops import to_lp
from wip.datatools.io_ops import to_mps
from wip.datatools.io_ops import to_pickle
from wip.datatools.ml_filters import get_production
from wip.datatools.ml_filters import get_production_pc
from wip.datatools.shap_ops import apply_shap
from wip.files.constant_limits import constant_limits
from wip.files.custo_real import custo_real
from wip.files.df_detailed import df_detailed
from wip.files.fixed_limits import fixed_limits
from wip.files.norm_limits import norm_limits
from wip.files.quantile_limits import quantile_limits
from wip.files.rolling_limits import rolling_limits
from wip.logging_config import logger
from wip.modules import solver_ops as solver_operations
from wip.modules.constraints import Constraints
from wip.modules.limits import Limits
from wip.modules.outputs import LpScaledConstraint
from wip.modules.outputs import define_optimization_results
from wip.modules.outputs import lp_variables_to_excel
from wip.modules.outputs import write_objective_function_coef
from wip.modules.pulp_solver import PulpSolver
from wip.temporary import drop_model_coefficients
from wip.temporary import drop_models_results
from wip.temporary import drop_scalers
from wip.temporary import format_results
from wip.utils import get_dbutils
from wip.utils import is_running_on_databricks
from wip.utils import remove_files
# Install an "ignore" filter for every warning category, process-wide, as soon
# as this module is imported.
warnings.filterwarnings('ignore')
# Re-create the `np.bool` alias on the NumPy module (the alias was removed in
# NumPy 1.24).
# NOTE(review): presumably required by a dependency that still references
# `np.bool` - confirm which one, so this shim can be dropped later.
np.bool = np.bool_
def read_model_results() -> tuple:
    """
    Load the artifacts produced by the model-training step.

    Every artifact is read with `read_joblib()` from the file path stored in
    the matching module-level constant. The files are read one after the
    other, in the same order in which they are returned.

    Returns
    -------
    tuple
        A tuple containing six elements, in this order:

        - `models_results`: The results of the machine learning models
          (`MODELS_RESULTS_FILEPATH`).
        - `scalers`: Scikit-Learn scaler objects used to normalize or
          standardize data (`SCALERS_FILEPATH`).
        - `models_coeficients`: The coefficients of the machine learning
          models (`MODELS_COEFICIENTS_FILEPATH`).
        - `models_features`: The features used by the machine learning models
          (`MODELS_FEATURES_FILEPATH`).
        - `datasets`: The final datasets used to train or test the machine
          learning models (`FINAL_DATASETS_FILEPATH`).
        - `df_sql`: The tag values as a `pandas.DataFrame`, after applying all
          filters and data transformations (`DF_SQL_CLEAN_FILEPATH`).
    """
    # The order of this tuple defines the order of the returned elements.
    artifact_paths = (
        MODELS_RESULTS_FILEPATH,
        SCALERS_FILEPATH,
        MODELS_COEFICIENTS_FILEPATH,
        MODELS_FEATURES_FILEPATH,
        FINAL_DATASETS_FILEPATH,
        DF_SQL_CLEAN_FILEPATH,
    )
    return tuple(read_joblib(artifact_path) for artifact_path in artifact_paths)
def get_limits():
    """Return the tag patterns that are skipped while feature limits are built.

    `main_otm` forwards this list to `build_restrictions` as `continue_limits`.

    Returns
    -------
    List[str]
        A new list, on every call, with the tags to continue (skip).
    """
    return ['POTE1_I@08MO-MO-821I-']
def _clamp_bounds_to_history(bounds: dict, historical_min, historical_max) -> dict:
    """Return a copy of `bounds` with `lmin`/`lmax` clamped to the historical range."""
    clamped = dict(bounds)
    clamped["lmax"] = min(bounds["lmax"], historical_max)
    clamped["lmin"] = max(bounds["lmin"], historical_min)
    return clamped


def process_critical_cols_dict(
    _critical_cols_dict: dict,
    _tags_ventiladores: List[str],
    scalers: Dict[str, sklearn.preprocessing.MinMaxScaler],
) -> dict:
    """Process the critical column's dictionary.

    Three things are done, in this order:

    1. The bounds of every fan tag are clamped to the historical minimum and
       maximum stored in the tag's scaler.
    2. Non-fan tags that have no scaler are logged and left out of the result.
    3. Keys that are model names (values of `constants.TARGETS_IN_MODEL`) are
       replaced by the real tag names (the matching keys of that mapping).

    The dictionary received as argument is **not** modified: a new dictionary
    is returned. This matters because callers pass a module-level constant
    that would otherwise be altered for the rest of the process.

    Parameters
    ----------
    _critical_cols_dict : dict
        Dictionary with lower and upper bounds for the optimization problem
        variables. The keys of this dictionary are the tag names and the
        values are dictionaries, with one of the following structures:

        * Option 1: `"<TAG-NAME>": {"lmin": <float or int>, "lmax": <float or int>}`
        * Option 2: Specify different variable bounds for each production range:

        .. code-block:: python

            # ...
            "<TAG-NAME>": {
                700: {"lmin": <float or int>, "lmax": <float or int>},
                750: {"lmin": <float or int>, "lmax": <float or int>},
                800: {"lmin": <float or int>, "lmax": <float or int>},
                850: {"lmin": <float or int>, "lmax": <float or int>},
                900: {"lmin": <float or int>, "lmax": <float or int>},
                950: {"lmin": <float or int>, "lmax": <float or int>},
            },
            # ...

    _tags_ventiladores : List[str]
        A list of tag names that represent the fan tags ("ventiladores").
    scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
        A dictionary of tag's scalers.

    Returns
    -------
    dict
        Dictionary with lower and upper bounds for the optimization problem
        variables.

    Raises
    ------
    KeyError
        If a fan tag has no entry in `scalers`, or if its bounds lack the
        `lmin` or `lmax` key.

    Notes
    -----
    The `scalers` dictionary values must contain the attributes:

    * `data_min_`: Contains the minimum values for each feature in the dataset
      that you've passed to the `fit` method. It is an array with the same
      number of entries as the number of features in your data.
    * `data_max_`: Contains the maximum values for each feature in the dataset
      that you've passed to the fit method. Similarly, it is an array with the
      same number of entries as the number of features in your data.

    These attributes are only found inside the
    `sklearn.preprocessing.MinMaxScaler` class. If a new type of scaler is
    used, the roles that these attributes have must be replicated.
    """
    fan_tags = set(_tags_ventiladores)
    processed = {}
    missing_tags = []
    for tag, limits in _critical_cols_dict.items():
        if tag in fan_tags:
            # Adjust min and max to historical values on fan tags.
            historical_min = scalers[tag].data_min_[0]
            historical_max = scalers[tag].data_max_[0]
            if "lmin" in limits or "lmax" in limits:
                # Option 1: a single pair of bounds.
                processed[tag] = _clamp_bounds_to_history(
                    limits, historical_min, historical_max
                )
            else:
                # Option 2: one pair of bounds per production range.
                processed[tag] = {
                    range_start: _clamp_bounds_to_history(
                        range_bounds, historical_min, historical_max
                    )
                    for range_start, range_bounds in limits.items()
                }
        elif constants.INVERSE_TARGETS_IN_MODEL.get(tag, tag) not in scalers:
            missing_tags.append(tag)
        else:
            processed[tag] = limits
    if missing_tags:
        logger.info("Missing tags: %s", missing_tags)

    # Retrieve real tag names. The inverse mapping is built once; `setdefault`
    # keeps the first key found for a repeated value, which is what a
    # `list.index` based lookup would return.
    model_name_to_tag = {}
    for real_tag, model_name in constants.TARGETS_IN_MODEL.items():
        model_name_to_tag.setdefault(model_name, real_tag)
    return {
        model_name_to_tag.get(column, column): bounds
        for column, bounds in processed.items()
    }
[docs]def build_restrictions(
models_results: dict,
tmp_path: str,
models_coefficients: dict,
datasets: dict,
scalers: dict,
models_features: dict,
continue_limits: list,
temp_limits: pd.DataFrame,
_critical_cols_dict: dict,
):
"""
Write constraints for optimization models into text files.

This function builds constraints for optimization models by iterating over
a predefined production range and generating constraints for each range.
It also calculates lower and upper limits for each feature and applies
various transformations and checks.

One file named ``restricoes-faixa-{range_min}-{range_max}.txt`` is written
for every ``(range_min, range_max)`` pair of `constants.production_range`.

Flow:

1. The function starts by extracting the production data from the datasets.
2. It then iterates over a predefined production range using the track function.
3. For each range, it creates a text file to write the constraints.
4. It loops through the models results and retrieves the feature's coefficients.
5. It applies various checks and transformations to calculate the
lower and upper limits for each feature.
6. It writes the feature constraints into the text file.
7. It writes the simple range terms, simple constraints,
and complex constraints into the text file.
8. It writes the special constraints, variable constraints, and targets
limits into the text file.

Parameters
----------
models_results : dict
A dictionary with ridge regression models results, that are created
by the `wip.mltrainer` module.
tmp_path : str
Directory path, to save the constraint files to.
models_coefficients : dict
Dictionary with the coefficients of each ridge regression model,
that were created by the `wip.mltrainer` module.
datasets : dict
Dictionary with the datasets that were used to train the ridge regression
models. The keys of this dictionary are the names of the datasets and the
values are the datasets themselves.
scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
Dictionary with the scalers that were used to normalize the ridge data
before training the ridge regression models. The keys of this dictionary
are the names of the datasets and the values are the fitted scalers.
models_features : Dict[str, pd.Series]
Dictionary with the features used to train the ridge regression models.
The keys of this dictionary are the column names that were used during
model training, and the values are `pandas.Series` with the column values.
continue_limits : List[str]
List of tags that are skipped when the feature limits are generated.
temp_limits : pd.DataFrame
DataFrame with the limits generated by SHAP. The columns `TAG`,
`Range_max`, `Valor_Norm` and `Ascending` are read from it.
_critical_cols_dict : dict
Dictionary with lower and upper bounds for some of the optimization
problem variables.

Returns
-------
None
The function has no `return` statement; its results are the files
written to disk.

Notes
-----
This function relies heavily on several external modules, classes and functions
including `datasets`, `constants`, `track`, `solver_operations`, `Limits`,
`operations`, and `Constraints`. It assumes these are available in the same context
and that the data and methods they provide are in a specific format.

Side effects visible in this function:

* Entries of `models_features` are replaced, in the dictionary received as
argument, by a version with one row per index label
(`groupby(level=0).first()`).
* When running on Databricks each file is first written under `/dbfs/tmp/`
and then copied to `tmp_path` with `dbutils.fs.cp`.
"""
# Among the datasets that contain the "PROD_PQ_Y@08US" column, pick the name
# of the one with the largest number of rows.
model_name_prod_pq = max(
{
name: df.shape[0] for name, df in datasets.items()
if "PROD_PQ_Y@08US" in df.columns
}.items(),
key=lambda kv: kv[1]
)[0]
# Same selection, for the "PROD_PC_I@08US" column.
model_name_prod_pc = max(
{
name: df.shape[0] for name, df in datasets.items()
if "PROD_PC_I@08US" in df.columns
}.items(),
key=lambda kv: kv[1]
)[0]
production = datasets[model_name_prod_pq]["PROD_PQ_Y@08US"]
production_pc = datasets[model_name_prod_pc]["PROD_PC_I@08US"]
# One constraints file is generated per production range.
for range_min, range_max in track(
constants.production_range, description="[green]Generating constraints..."
):
filename = f'restricoes-faixa-{range_min}-{range_max}.txt'
# On Databricks the file is written to this folder first and copied to
# `tmp_path` at the end of the iteration.
dump_path = '/dbfs/tmp/'
path = os.path.join(tmp_path, filename)
if is_running_on_databricks():
path = os.path.join(dump_path, filename)
with open(path, mode="w", encoding="utf-8") as constraint_files:
# Descriptive constraints of every model, except "SE PP".
for model in models_results:
if model != "SE PP":
features_coefficient = solver_operations.retrieve_model_coeficients(
model, models_results
)
descriptive_args = {
"file": constraint_files,
"model_target": model,
"datasets": datasets,
"df_detailed": df_detailed,
"scalers": scalers,
"models_coeficients": models_coefficients,
"features_coeficient": features_coefficient,
"models_results": models_results,
}
# The value returned replaces `models_coefficients`, so it is the one
# passed along on the next call.
models_coefficients = (
solver_operations.write_descriptive_constraints(
**descriptive_args
)
)
# NOTE(review): this mask appears to be overwritten inside the feature loop
# below before it is ever read - confirm whether it can be removed.
production_query = (
(production >= range_min)
& (production <= range_max)
& (production_pc >= range_min)
).copy()
# Limits computed for each feature, as {"lmin": ..., "lmax": ...}.
features_limits = {}
for feature in models_features.keys():
new_feature = str()
if feature in constants.TARGETS_IN_MODEL.keys():
new_feature = constants.TARGETS_IN_MODEL[feature]
# cond_one: False | cond_two: False
cond_one = operations.string_in_list(new_feature, continue_limits)
cond_two = operations.string_in_list(feature, continue_limits)
# Features for which no limits are generated.
if (
cond_one
or cond_two
or feature.startswith("FUNC")
or feature in constants.TARGETS_IN_MODEL.values()
or feature in datasets.keys()
):
continue
# Boolean mask of the rows whose production is inside the current range.
production_query = (
(production >= range_min) & (production <= range_max)
).copy()
# Keep the first row of each index label; the result is stored back into
# the `models_features` dictionary received as argument.
models_features[feature] = (
models_features[feature].groupby(level=0).first()
)
production_query = production_query.filter(
models_features[feature].index
)
# From here on `production_query` holds the index labels where the mask is
# True (not the mask itself), unless a branch below rebinds it.
production_query = production_query.loc[production_query].index
feature_in_prod = models_features[feature][production_query]
# Rule chain that defines `lmin`/`lmax`; the first matching rule is used.
if "CALC1_Y@08FI-FD00" in feature:
lmin, lmax = feature_in_prod.quantile(
0.25
), feature_in_prod.quantile(0.9)
elif feature in [
"PESO1_I@08MO-BW-813I-03M1",
"PESO1_I@08MO-BW-813I-04M1",
]:
lmin, lmax = feature_in_prod.quantile(
0.8
), feature_in_prod.quantile(0.99)
elif feature in [
"TEMP1_I@08QU-QU-855I-GQA",
"TEMP1_I@08QU-QU-855I-GQB",
"TEMP1_I@08QU-QU-855I-GQC",
"TEMP1_I@08QU-QU-855I-GQ01",
"TEMP1_I@08QU-QU-855I-GQ02",
]:
lmin = feature_in_prod.quantile(0.4)
lmax = feature_in_prod.quantile(0.9)
# Tags that have a limit generated by SHAP (`temp_limits`).
elif (
feature in set(temp_limits.TAG.values)
and "POTE1_I@08FI-BV-827I-" not in feature
and "ROTA1_I@08FI-FL-827I-" not in feature
):
# Default: mean +/- one standard deviation inside the production range.
lmin = feature_in_prod.mean() - (feature_in_prod.std())
lmax = feature_in_prod.mean() + (feature_in_prod.std())
if range_max in temp_limits["Range_max"].values:
value = temp_limits[
(temp_limits["Range_max"] == range_max)
& (temp_limits["TAG"] == feature)
]
# No SHAP row for this tag and range: the feature is skipped.
if value.shape[0] == 0:
continue
lmin = value["Valor_Norm"].values[0]
lmax = value["Valor_Norm"].values[0]
if (
feature
not in [
"TEMP1_I@08QU-QU-855I-GQ01",
"TEMP1_I@08QU-QU-855I-GQ03",
"TEMP1_I@08QU-QU-855I-GQ04",
"TEMP1_I@08QU-QU-855I-GQ05",
"TEMP1_I@08QU-QU-855I-GQ15",
"TEMP1_I@08QU-QU-855I-GQ16",
]
and "ROTA1_I@08FI-FL-827I-" not in feature
):
# Open a one standard deviation window above (`Ascending` truthy) or
# below the SHAP value.
ascending = value["Ascending"].values[0]
if ascending:
lmin = value["Valor_Norm"].values[0]
lmax = (
value["Valor_Norm"].values[0]
+ feature_in_prod.std()
)
else:
lmin = (
value["Valor_Norm"].values[0]
- feature_in_prod.std()
)
lmax = value["Valor_Norm"].values[0]
elif "ROTA1_I@08QU-PF-852I-" in feature:
lmin = feature_in_prod.quantile(0.02)
lmax = feature_in_prod.quantile(0.98)
elif (
feature.startswith("GRAN_OCS")
and not feature.startswith("GRAN_OCS_TM")
and feature != "GRAN_-0,045_PR_L@08FI"
):
lmin = feature_in_prod.quantile(0.25)
lmax = feature_in_prod.quantile(0.75)
# Fixed limits: both bounds are the mean inside the production range.
elif feature in fixed_limits:
lmin = lmax = models_features[feature][production_query].mean()
elif operations.string_in_list(feature, constant_limits.keys()):
lmin, lmax = Limits.define_constant_limits(feature, constant_limits)
elif feature == "qtde_filtros":
lmin, lmax = 5, 10
# "floticor" and "bentonita" are mutually exclusive: the one not selected by
# `USE_FLOTICOR` is fixed at zero.
elif feature == "floticor":
if USE_FLOTICOR:
production_query = (production >= range_min) & (
production <= range_max
)
lmin, lmax = Limits.define_flotcor_limit(feature, scalers)
else:
lmin, lmax = 0, 0
elif feature == "bentonita":
if USE_FLOTICOR:
lmin, lmax = 0, 0
else:
production_query = (production >= range_min) & (
production <= range_max
)
lmin, lmax = Limits.define_bentonita_limit(
feature, datasets, production_query, scalers
)
elif feature in rolling_limits.keys():
production_query = (production >= range_min) & (
production <= range_max
)
lmin, lmax = Limits.define_limit_by_rolling_mean(
feature, production_query, datasets, rolling_limits
)
elif feature in norm_limits.keys():
production_query = (production >= range_min) & (
production <= range_max
)
lmin, lmax = Limits.define_limit_by_normalization(
scalers, feature, norm_limits
)
if feature in [
"NIVE7_I@08QU-FR-851I-01M1",
"ROTA1_I@08QU-PF-852I-06M1",
]:
lmax = models_features[feature][production_query].max()
elif feature in quantile_limits.keys():
lmin, lmax = Limits.define_limit_by_quantile(
feature, models_features, production_query, quantile_limits
)
if feature == "rotacaoPeneiraAvg":
lmax = models_features[feature][production_query].max()
elif "Consumo de Energia (base minério úmido) kWh/ton" in feature:
lmin = 0
lmax = models_features[feature][production_query].max()
# Fallback: minimum and maximum observed inside the production range.
else:
if isinstance(production_query, pd.DatetimeIndex):
filtered_feature = models_features[feature].loc[
lambda spd: spd.index.isin(production_query)
]
else:
filtered_feature = models_features[feature].loc[
lambda spd: spd.index.isin(
production_query[production_query].index
)
]
lmin = filtered_feature.min()
lmax = filtered_feature.max()
# Check whether any of the limits is missing, and if so,
# replaces them with the historical minimum and maximum values.
# NOTE(review): `x or y` keeps `x` whenever it is truthy, and NaN is truthy,
# so a NaN limit is NOT replaced by the two `or` expressions below, while a
# legitimate limit equal to 0 is. Per-limit `pd.isna` checks look like the
# intended behavior - confirm.
# NOTE(review): most branches above leave `production_query` holding index
# labels instead of a boolean mask; verify that
# `production_query[production_query].index` is valid in that case.
if pd.isna([lmin, lmax]).any():
filtered_feature = models_features[feature].loc[
lambda spd: spd.index.isin(
production_query[production_query].index
)
]
lmin = lmin or filtered_feature.min()
lmax = lmax or filtered_feature.max()
lmin, lmax = float(lmin), float(lmax)
# The limits are stored before the adjustments made below.
features_limits[feature] = {"lmin": lmin, "lmax": lmax}
cond_one = feature in constants.TARGETS_IN_MODEL.keys()
if (
cond_one
and constants.TARGETS_IN_MODEL[feature] in models_features.keys()
):
lmin = operations.unnormalize_feature(
scalers, feature, lmin, "one_operation"
)
lmax = operations.unnormalize_feature(
scalers, feature, lmax, "one_operation"
)
# Bounds declared in `_critical_cols_dict` take the place of the limits
# computed above.
if feature in _critical_cols_dict:
if feature.startswith("TEMP1_I@08QU-QU"):
# corte = 800 if range_max > 750 else 750
# These tags are indexed by the lower end of the production range.
corte = range_min
lmin = operations.normalize_feature(
scalers,
feature,
_critical_cols_dict[feature][corte]["lmin"],
)
lmax = operations.normalize_feature(
scalers,
feature,
_critical_cols_dict[feature][corte]["lmax"],
)
elif feature.startswith("PRES") or feature in tags_ventiladores:
feature_limits = _critical_cols_dict[feature]
# No "lmin" key at the top level: bounds are declared per production range.
if "lmin" not in feature_limits.keys():
feature_limits = feature_limits[range_min]
lmin = operations.normalize_feature(
scalers, feature, feature_limits["lmin"]
)
lmax = operations.normalize_feature(
scalers, feature, feature_limits["lmax"]
)
else:
feature_limits = _critical_cols_dict[feature]
if "lmin" not in feature_limits.keys():
feature_limits = feature_limits[range_min]
# In this branch the bounds are used as declared, without normalization.
lmin = feature_limits["lmin"]
lmax = feature_limits.get("lmax")
feature = constants.TARGETS_IN_MODEL.get(feature, feature)
Constraints.write_feature_constraints(
feature, constraint_files, lmin, lmax
)
continue
if (
feature not in constants.TARGETS_IN_MODEL.keys()
or "Calculo" in feature
):
Constraints.write_feature_constraints(
feature, constraint_files, lmin, lmax
)
if (
feature in constants.TARGETS_IN_MODEL.keys()
or "rota_disco_" in feature
):
# NOTE(review): depending on the original nesting of the `continue` above,
# this check may be unreachable - confirm.
if feature in _critical_cols_dict:
lmin = _critical_cols_dict[feature]["lmin"]
lmax = _critical_cols_dict[feature]["lmax"]
new_lmin, new_lmax, new_feature = operations.scaling_target_values(
feature, scalers, lmin, lmax
)
if new_feature in ["energia_moinho"]:
new_lmin = 0
Constraints.write_feature_constraints(
new_feature, constraint_files, new_lmin, new_lmax
)
# End of the model constraints; the manually defined constraints start here.
# `write_simple_range_terms` uses the `range_constraints` file.
Constraints.write_simple_range_terms(
constraint_files, scalers, features_limits
)
Constraints.write_simple_constraints(constraint_files)
Constraints.parse_range_complex_constraints(constraint_files, scalers)
# Special constraints are related to specific adjustments,
# defined at `seven_plant` scripts.
# production_query = (production >= range_min) & (production <= range_max)
Constraints.write_special_constraints(constraint_files, scalers)
Constraints.write_complex_constraints(constraint_files, scalers)
Constraints.write_variable_constraints(
constraint_files, features_limits, scalers, range_min, range_max
)
Constraints.write_targets_limits(
constraint_files, datasets, features_limits
)
# On Databricks, copy the file from the dump folder to its final location.
if is_running_on_databricks():
# Restore the double slash of the scheme when `tmp_path` has a single one.
if tmp_path.startswith(r"abfss:/") and not tmp_path.startswith(r"abfss://"):
tmp_path = tmp_path.replace(r"abfss:/", r"abfss://")
final_path = os.path.join(tmp_path, filename)
dbutils = get_dbutils()
# `dump_path.split("/", 2)[-1]` is "tmp/", so the copy source is
# "tmp/<filename>".
dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split("/", 2)[-1], filename), final_path
)
operations.replace_string_from_file(tmp_path, range_min, range_max)
[docs]def save_otm_results(
solver: PulpSolver,
scalers: Dict[str, sklearn.preprocessing.MinMaxScaler],
datasets: Dict[str, pd.DataFrame],
tmp_path: str | Path,
):
"""
Save the optimization results of linear programming models.

This function saves the results of linear programming optimization using PuLP
in various formats (mps, xlsx, csv, pickle) inside the `lp-debug` folder of
`tmp_path`. The export to the lp format is currently commented out.

If a problem did not reach the optimal status, errors are logged, the outputs
generation is skipped for that problem, and the problem is removed from
`solver.probs` before the final results are exported.

Any exception raised while the outputs of one problem are being written is
logged with `logger.exception` and is not propagated.

Parameters
----------
solver : PulpSolver
The solver object that contains the results of linear programming optimization.
scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
Dictionary containing the MinMaxScaler for each column in the dataset.
datasets : Dict[str, pd.DataFrame]
Dictionary containing the input datasets for the optimization problems.
tmp_path : str | Path
The path to the temporary directory where the results should be saved.

Notes
-----
This function has dependencies on certain functions and objects that are
not defined in the provided code, such as `is_running_on_databricks`,
`LpScaledConstraint`, `logger`, `lp_variables_to_excel`,
and `define_optimization_results`.
"""
save_models_path = os.path.join(tmp_path, 'lp-debug')
# The folder is only created here when running outside Databricks.
if not is_running_on_databricks():
Path(save_models_path).mkdir(exist_ok=True, parents=True)
# The whole solver object is pickled, besides each problem further below.
to_pickle(solver, f'{save_models_path}/probs.pickle')
# Names of the problems that are removed from the solver after the loop.
ranges_to_drop = []
for problem_name, problem in solver.get_probs().items():
try:
csv_filepath = f'{save_models_path}/Variables - {problem_name}.csv'
excel_filepath = f'{save_models_path}/Variables - {problem_name}.xlsx'
constraints_filepath = (
f'{save_models_path}/Constraints - {problem_name}.xlsx'
)
problem_instance_filepath = f'{save_models_path}/{problem_name}.pickle'
# to_lp(problem, f'{save_models_path}/{problem_name}.lp')
to_mps(problem, f'{save_models_path}/{problem_name}.mps')
# Remove the outputs of this problem left by a previous run.
# `os.path.split` yields the (folder, file name) pair that is unpacked into
# the first two positional arguments.
remove_files(*os.path.split(csv_filepath), verbose=True)
remove_files(*os.path.split(excel_filepath), verbose=True)
remove_files(*os.path.split(constraints_filepath), verbose=True)
remove_files(*os.path.split(problem_instance_filepath), verbose=True)
# Status 1 is PuLP's `LpStatusOptimal`; any other status means there is no
# optimal solution to export.
if problem.status != 1:
logger.error(
"Problem %s status: %s", problem_name, pulp.LpStatus[problem.status]
)
logger.error("Skipping outputs generation")
# Adding range to the list of problems to drop from results
ranges_to_drop.append(problem_name)
continue
lp_scaled_constraints = LpScaledConstraint(problem, scalers, datasets)
lp_scaled_constraints_df = lp_scaled_constraints.create_constraints()
# Outside Databricks: Excel files plus a pickle written with the standard
# library.
if not is_running_on_databricks():
to_excel(lp_scaled_constraints_df, constraints_filepath)
lp_variables_to_excel(problem, excel_filepath)
with open(problem_instance_filepath, 'wb') as fh:
pickle.dump(problem, fh)
# On Databricks: CSV files plus a pickle written by the project helper.
else:
# NOTE(review): `constraints_filepath` ends with ".xlsx" but is written with
# `to_csv` here - confirm this is intended.
to_csv(lp_scaled_constraints_df, constraints_filepath)
lp_variables_to_excel(problem, csv_filepath, format='csv')
to_pickle(problem, problem_instance_filepath)
# A failure on one problem is logged and the next problems are still
# processed.
except Exception as exc: # pylint: disable=broad-except
logger.exception(exc)
# Problems without an optimal solution are taken out of the solver before
# the results are exported.
for problem_name in ranges_to_drop:
logger.error("Dropping problem instance: %s", problem_name)
solver.probs.pop(problem_name)
solver.export_results()
define_optimization_results(tmp_path, scalers, datasets, solver)
if not is_running_on_databricks():
format_results()
def main_otm():
    """Define and solve the optimization problems for each production range.

    The pipeline runs these stages in sequence, logging each one:

    1. Load the machine learning artifacts with `read_model_results`.
    2. Remove the output files of previous runs and, outside Databricks,
       create the output folders.
    3. Read the tags to skip (`get_limits`).
    4. Compute the SHAP based limits, discarding the rows of the tags
       ``TEMP1_I@08QU-QU-855I-GQ04`` to ``TEMP1_I@08QU-QU-855I-GQ16``.
    5. Process the hard-coded limits (`process_critical_cols_dict`).
    6. Write the objective function inputs.
    7. Write the constraints files (`build_restrictions`).
    8. Build and solve the problems with `PulpSolver`, using 'cbc'.
    9. Save the results (`save_otm_results`).
    """
    logger.info("Step 1/10 - Loading data")
    # Power plant number.
    us_suffix = '08'
    # Folder name where the results will be saved.
    # NOTE(review): `solver_path` is not read anywhere below.
    solver_path = f'us{int(us_suffix):01d}'
    # Reading the ML model results files.
    loaded_artifacts = read_model_results()
    models_results, scalers, models_coefficients = loaded_artifacts[:3]
    models_features, datasets, df_sql = loaded_artifacts[3:]

    logger.info("Step 2/10 - Defining constants")
    # Define filepath where results and temporary files are to be saved.
    tmp_path = OTM_OUTPUTS_FOLDER_PATH
    # Removing old output files if they exist.
    stale_output_patterns = (
        "**/*.lp",
        "**/*.mps",
        "lp-debug/*.pickle",
        "restricoes-faixa-*-*.txt",
        "Variables - VarX_*-*.csv",
        "**/Variables - *-*.xlsx",
        "**/Constraints - *-*.xlsx",
    )
    for stale_pattern in stale_output_patterns:
        remove_files(OTM_OUTPUTS_FOLDER_PATH, stale_pattern, True)
    # Creating outputs directory if they don't already exist.
    if not is_running_on_databricks():
        outputs_dir = Path(tmp_path)
        outputs_dir.mkdir(parents=True, exist_ok=True)
        for subfolder in ("lp-debug", "lpfiles"):
            Path(tmp_path).joinpath(subfolder).mkdir(parents=True, exist_ok=True)

    logger.info("Step 3/10 - Reading limits")
    continue_limits = get_limits()

    logger.info("Step 4/10 - Applying SHAP")
    # TEST (2024-04-04): removal of the limits that SHAP creates for the
    # burning groups.
    temp_cols = [f"TEMP1_I@08QU-QU-855I-GQ{idx:02d}" for idx in range(4, 17)]
    temp_limits = apply_shap(datasets, models_results, scalers)
    is_burning_group = temp_limits["TAG"].isin(temp_cols)
    temp_limits = temp_limits.loc[~is_burning_group, :]

    logger.info("Step 5/10 - Defining hard-coded Limits")
    # The hard-coded limits are processed with the complete set of scalers,
    # before `drop_scalers` is applied.
    _critical_cols_dict = process_critical_cols_dict(
        critical_cols_dict, tags_ventiladores, scalers
    )
    scalers = drop_scalers(scalers)

    logger.info("Step 6/10 - Writing objective function")
    to_json(custo_real, f'{tmp_path}/custo_real.json')
    write_objective_function_coef(tmp_path, scalers)

    logger.info("Step 8/10 - Building restrictions")
    models_results = drop_models_results(models_results)
    models_coefficients = drop_model_coefficients(models_coefficients)
    build_restrictions(
        models_results=models_results,
        tmp_path=tmp_path,
        models_coefficients=models_coefficients,
        datasets=datasets,
        scalers=scalers,
        models_features=models_features,
        continue_limits=continue_limits,
        temp_limits=temp_limits,
        _critical_cols_dict=_critical_cols_dict,
    )

    logger.info("Step 9/10 - Building and solving optimization problem")
    objective_filepath = os.path.join(tmp_path, 'custo_real.json')
    solver = PulpSolver(tmp_path, objective_filepath, 'cbc')
    solver.solve_range(
        tmp_path=tmp_path,
        scalers=scalers,
        datasets=datasets,
        df_sql=df_sql,
        use_floticor=USE_FLOTICOR,
    )

    logger.info("Step 10/10 - Saving model results")
    save_otm_results(solver, scalers, datasets, tmp_path)
# Run the whole optimization pipeline when this file is executed as a script;
# nothing is run when the module is only imported.
if __name__ == "__main__":
main_otm()