"""
Solver generic methods
"""
import math
import re
from typing import List
import pandas as pd
import wip.modules.ops as operations
from wip.constants import RESULTADO_OTIMIZADOR_FILENAME
from wip.constants import constants
from wip.constants import define_targets
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import to_excel
from wip.files.depara_pisystem import pims_to_pisystem_dict
from wip.logging_config import logger
from wip.utils import is_running_on_databricks
def write_constraint(file, constraint, terms, target=False, description=False):
    """Write a set of constraints to a file.

    Parameters
    ----------
    file : _io.TextIOWrapper
        Open file handle the constraint lines are printed to.
    constraint : str
        Name of the restriction to add to the file.
    terms : tuple
        Terms from the constraint equation.
    target : bool, default=False
        Features that are target variables get treated by
        `write_descriptive_constraints` function.
    description : bool, default=False
        Whether to include a description of the constraint to the file.
    """
    if description:
        name, coefficient, text = terms
        print(f"{constraint}; {name}; {coefficient}; {text}", file=file)
        return
    if target:
        name, new_tag, new_coef = terms
        print(f"{name}; {new_tag}; {new_coef}; {constraint}", file=file)
        return
    if not constraint:
        # No constraint name: `terms` is a single (feature, value) pair.
        feature, value = terms
        if math.fabs(float(value)) > 0.00001:
            print(f"{feature}; {value}", file=file)
        return
    operators = ("E", "LTE", "LT", "GT", "GTE")
    last_index = len(terms) - 1
    for index, term in enumerate(terms):
        # Near-zero scalar coefficients are dropped entirely.
        if not isinstance(term, tuple) and math.fabs(float(term)) <= 0.00001:
            continue
        if isinstance(term, (int, float)) and index != last_index:
            # Bare numeric term that is not the last one: fixed precision.
            print(f"{constraint}; {term:.5f}", file=file)
        else:
            # The term is a tuple composed of either:
            #   - a feature name and a coefficient value, or
            #   - an operator (E/LTE/LT/GT/GTE) and a value.
            operation, value = term
            separator = " " if operation in operators else "; "
            print(f"{constraint}; {operation}{separator}{value}", file=file)
def adjust_real_cost(real_cost, features, mult_coef=1, div_coef=3):
    """Scale the real cost of selected features in place.

    Every entry of ``real_cost`` whose key appears in ``features`` is
    multiplied by ``mult_coef`` and divided by ``div_coef``. The mapping is
    mutated and also returned for convenience.
    """
    for key, cost in list(real_cost.items()):
        if key in features:
            real_cost[key] = cost * mult_coef / div_coef
    return real_cost
def retrieve_best_model(model, models_results, metric="mape"):
    """Return the result entry with the lowest ``metric`` for ``model``.

    Equivalent to sorting the model's results by the metric and taking the
    first one: on ties, the earliest entry wins.
    """
    return min(models_results[model], key=lambda result: result["metrics"][metric])
def retrieve_model_coeficients(model: str, models_results: dict):
    """Retrieve the ridge regression model coefficients.

    Parameters
    ----------
    model : str
        Name of the model.
    models_results : dict
        Dictionary with the models' results.

    Returns
    -------
    zip
        Pairs of ``(feature_name, coefficient)`` from the best model.
    """
    best = retrieve_best_model(model, models_results)
    feature_names = best["columns"]
    coefficients = best["model"].coef_
    # Coefficient of each feature used in the model
    return zip(feature_names, coefficients)
def write_descriptive_constraints(
    file,
    model_target,
    datasets,
    df_detailed,
    scalers,
    models_coeficients,
    features_coeficient,
    models_results,
):
    """
    Write constraints built from the Ridge Regression model coefficients.

    ``model_target`` is the name of the selected model. Target constraints
    get a different treatment compared to other features: their coefficients
    are unnormalized and they are renamed to the tag used inside the model.

    Parameters
    ----------
    file : TextIOWrapper
        File to write the constraints
    model_target : str
        Name of the constraint being written.
        This is the same name of the Ridge model.
    datasets : Dict[str, pd.DataFrame]
        Dictionary with the datasets
    df_detailed : pd.DataFrame
        Table with descriptions for each term of the constraint
    scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
        Dictionary with the scalers for each tag (column)
    models_coeficients : Dict[str, Dict[str, float]]
        Dictionary with the coefficients of each model. The dictionary
        should contain the following structure:

        .. code-block:: python

            {
                "model-name": {
                    "tag": coefficient,
                    # ...
                },
                "model-name-2": {
                    "tag": coefficient,
                    # ...
                },
                # ...
            }

    features_coeficient : zip
        List of tuples with the features and their coefficients
    models_results : Dict[str, List[Dict[str, Any]]]
        Dictionary with the results of each model.

    Returns
    -------
    Dict[str, Dict[str, float]]
        Dictionary with the coefficients of each model (mutated in place
        for ``model_target`` and also returned).
    """
    # Accumulates the constants produced while unnormalizing target
    # features; later added to the model intercept by
    # `write_simple_constraints`.
    unnormalize_constant = 0
    target_column_names = define_targets(datasets)
    for tag, coef in features_coeficient:
        # Number of times this tag appears as a column of `df_detailed`.
        tag_count = df_detailed.columns.tolist().count(tag)
        description = ""
        if tag_count > 1:
            # Duplicated column label: `df_detailed[tag]` selects a
            # DataFrame, so take the description of the first occurrence.
            description = df_detailed[tag].loc["Descrição"].iloc[0]
        elif tag_count == 1:
            description = df_detailed[tag]["Descrição"]
        terms = (tag, coef, description)
        if tag in target_column_names:
            # Target features: unnormalize the coefficient and rename the
            # tag to the name used inside the optimization model.
            new_coef, constant = operations.unnormalize_feature(scalers, tag, coef)
            new_tag = constants.TARGETS_IN_MODEL[tag]
            unnormalize_constant -= constant
            models_coeficients[model_target][tag] = new_coef
            terms = (new_tag, new_coef, description)
        write_constraint(file, model_target, terms, description=True)
    write_simple_constraints(file, model_target, models_results, unnormalize_constant)
    return models_coeficients
def write_simple_constraints(file, model_target, models_results, unnormalize_constant):
    """Write the solver-facing constraint lines for ``model_target``.

    When no limit is defined for the target, an auxiliary equality
    constraint anchored on a feature (or on the target itself) is written;
    otherwise the operator/value pair from ``constants.LIMITS`` is used.

    Parameters
    ----------
    file : _io.TextIOWrapper
        Open file handle the constraint lines are printed to.
    model_target : str
        Name of the model/constraint being written.
    models_results : Dict[str, List[Dict[str, Any]]]
        Results of every trained model; the best one provides the intercept.
    unnormalize_constant : float
        Constant accumulated while unnormalizing target features; added to
        the best model's intercept.
    """
    # Will return None when the target is rota_disco,
    # and when the tag limit is defined as None
    limit = (
        None
        if model_target.startswith("rota_disco_")
        else constants.LIMITS.get(model_target)
    )
    best_conf = retrieve_best_model(model_target, models_results)
    if not limit:
        # NOTE(review): the original author left a comment here saying
        # "the problem is here" (pt-BR: "O problema é aqui") — this branch
        # may deserve extra scrutiny.
        if "custo" in model_target:
            write_constraint(
                file, model_target, [(model_target.replace("custo_", ""), -1)]
            )
        elif "eq_termica" in model_target:
            write_constraint(file, model_target, [("gas", -1)])
        else:
            write_constraint(file, model_target, [(model_target, -1)])
        feat_target = None
        if model_target.startswith("rota_disco"):
            try:
                # Extract the disc number to associate the constraint
                # with the disc's operating-status tag.
                disc_number = re.findall(r"\d+", model_target)[0]
            except Exception as exc:
                raise ValueError("No disk number provided") from exc
            feat_target = f"FUNC1_D@08PE-BD-840I-{int(disc_number):02d}M1"
        if feat_target:
            # Disc constraints: tie the intercept to the operating tag.
            write_constraint(
                file,
                model_target,
                [
                    (
                        feat_target,
                        best_conf["model"].intercept_ + unnormalize_constant,
                    )
                ],
            )
        else:
            # No constraint name: writes a plain (feature, value) pair.
            write_constraint(
                file,
                None,
                (
                    model_target,
                    best_conf["model"].intercept_ + unnormalize_constant,
                ),
            )
        write_constraint(file, model_target, [("E", "0")])
    else:
        operator, value = constants.LIMITS[model_target]
        write_constraint(
            file,
            None,
            (
                model_target,
                best_conf["model"].intercept_ + unnormalize_constant,
            ),
        )
        write_constraint(file, model_target, [(operator, value)])
def define_range_constraints(
    token: str,
    range_start: int,
    range_end: int,
    step: int = 1,
) -> List[str]:
    """
    Generate a list of strings by applying a token format over a range.

    Applies the formatting ``token`` to every integer in
    ``range(range_start, range_end, step)`` and collects the results.

    Parameters
    ----------
    token : str
        A string with ``{}`` as a placeholder for the integer to be formatted.
    range_start : int
        The start of the range to which the ``token`` is applied.
    range_end : int
        The end of the range to which the ``token`` is applied. This value
        is not included in the output list.
    step : int, optional
        The step between consecutive integers in the range, default is 1.

    Returns
    -------
    list
        A list of strings obtained by applying the ``token``
        to each number in the specified range.

    Examples
    --------
    >>> token = "{}_formatted"
    >>> define_range_constraints(token, 1, 4)
    ['1_formatted', '2_formatted', '3_formatted']
    >>> token = "prefix_{}_suffix"
    >>> define_range_constraints(token, 1, 5, 2)
    ['prefix_1_suffix', 'prefix_3_suffix']
    """
    formatted_tokens = []
    for number in range(range_start, range_end, step):
        formatted_tokens.append(token.format(number))
    return formatted_tokens
def save_solver_results(
    solver_path,
    df,
    resultado_otimizador_filename: str = RESULTADO_OTIMIZADOR_FILENAME,
):
    """
    Save optimization results to Azure Container Storage, or to a local filepath.

    The default file name that is used to save the optimization results to
    Azure Data Lake is:

    .. code-block:: python

        f"resultado_otimizador-US{US_SUFIX}_{datetime.today().strftime('%Y-%m-%d')}.csv"

    For example, the file name should be similar to the following:
    `"resultado_otimizador-US08_2024-03-21.csv"`

    Parameters
    ----------
    solver_path : str
        The Azure Data Lake container URL path or a local folder path where the
        optimization results will be saved.
    df : pd.DataFrame
        A `pandas.DataFrame` containing the optimization results for all production
        ranges.

        .. note:: The ``minimo``/``maximo``/``valor normalizado``/``valor real``
           columns of the caller's DataFrame are reformatted in place before
           the local name is rebound below.
    resultado_otimizador_filename : str, default=wip.constants.RESULTADO_OTIMIZADOR_FILENAME
        The name of the file to use for saving the optimization results.

    .. versionchanged:: 2.8.11
        Bugfix error that caused optimization results to try to save it as
        a ".csv" file using the suffix ".xlsx" on the filename.
        This problem was making the DataBricks job "Integração SensUP" to
        read the optimization results in an incorrect format and subsequently
        fail to finish executing the job.
    """
    format_tags = ["minimo", "maximo", "valor normalizado", "valor real"]
    for tag in format_tags:
        # NOTE(review): `custom_format` is not defined or imported in this
        # chunk of the module — presumably a module-level helper that
        # stringifies numeric values; confirm it is in scope.
        df[tag] = df[tag].apply(custom_format)
    columns_order = [
        "faixa",
        "TAG",
        "minimo",
        "maximo",
        "valor normalizado",
        "valor real",
        "custo",
    ]
    # Prefix a leading "=" with an apostrophe so spreadsheet tools do not
    # interpret the tag as a formula (CSV/Excel formula-injection guard).
    df['TAG'] = df['TAG'].apply(
        lambda value: (
            f"'{value}" if isinstance(value, str) and value.startswith('=') else value
        )
    )
    # Map model-internal target names back to their original PIMS tag names
    # (reverse of constants.TARGETS_IN_MODEL).
    df['TAG'] = df['TAG'].replace(
        dict(
            zip(constants.TARGETS_IN_MODEL.values(), constants.TARGETS_IN_MODEL.keys())
        )
    )
    df = df.reindex(columns=columns_order)
    # Tags intentionally excluded from the published results.
    remove_tags = [
        "FUNC1_D@08MI-AM-832I-01M1",
        "CONS ESPEC EE VENT - US8",
        # "compressao",
        # "NIVE4_I@08QU-FR-851I-01M1",
    ]
    df = df.loc[~df["TAG"].isin(remove_tags)]
    # Path to the optimization results that are processed by the
    # "Integração SensUP" job on DataBricks later on.
    # Force a ".csv" suffix even when the configured filename ends in ".xlsx".
    path = f"{solver_path}/{resultado_otimizador_filename.replace('.xlsx', '.csv')}"
    to_csv(df, path, sep=";", index=False, encoding="iso-8859-1")
    # NOTE(review): `errors="ignore"` for `pd.to_numeric` is deprecated in
    # recent pandas versions — consider an explicit try/convert per column.
    df[format_tags] = df[format_tags].apply(pd.to_numeric, errors="ignore")
    df.columns = df.columns.str.capitalize()
    df["Valor real"] = df["Valor real"].fillna(0)
    # "nan" strings can remain after the custom formatting above; coerce
    # them to 0 before casting the column back to float.
    df["Valor normalizado"] = (
        df["Valor normalizado"].astype(str).str.replace("nan", "0").astype(float)
    )
    if not is_running_on_databricks():
        # Save optimization results as an Excel file only when running locally.
        df = get_pi_system_tag_names(df)
        to_excel(
            df,
            f"{solver_path}/{resultado_otimizador_filename.replace('.csv', '.xlsx')}",
            index=False,
        )
    # Post-process the CSV contents (see wip.modules.ops for details).
    operations.replace_string_from_file(path)
def get_pi_system_tag_names(dataset: pd.DataFrame) -> pd.DataFrame:
    """Get the PI System tag names from the PIMS tag names.

    This function takes a DataFrame and returns a copy of it with a new column
    containing the PI System tag names.

    Parameters
    ----------
    dataset : pd.DataFrame
        `pandas.DataFrame` containing the optimization model results, with
        PIMS tag names as a column named 'Tag'.

    Returns
    -------
    pd.DataFrame
        A `pandas.DataFrame` with the PI System tag names in a column named
        'Tag PI'. The input DataFrame is left unmodified.
    """
    # Bugfix: the docstring promises a copy, but the original implementation
    # mutated the caller's DataFrame in place (adding the 'Tag PI' column).
    dataset = dataset.copy()
    # Reverse mapping: model-internal target names -> original PIMS names.
    target_names = {v: k for k, v in constants.TARGETS_IN_MODEL.items()}
    calc_names = {'CONS EE PRENSA - US8': 'CONS1_Y@08PR-RP-822I-01'}
    # `_Tag` is a scratch column; dict.get returns None for unmapped tags,
    # which `fillna` then restores from the previous value.
    dataset['_Tag'] = dataset['Tag']
    dataset['_Tag'] = dataset['_Tag'].apply(target_names.get).fillna(dataset['_Tag'])
    dataset['_Tag'] = dataset['_Tag'].apply(calc_names.get).fillna(dataset['_Tag'])
    dataset['Tag PI'] = dataset['_Tag'].apply(pims_to_pisystem_dict.get)
    return dataset.drop(columns=['_Tag'])