# Source code for wip.modules.solver_ops

"""
Solver generic methods
"""

import math
import re
from typing import List

import pandas as pd

import wip.modules.ops as operations
from wip.constants import RESULTADO_OTIMIZADOR_FILENAME
from wip.constants import constants
from wip.constants import define_targets
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import to_excel
from wip.files.depara_pisystem import pims_to_pisystem_dict
from wip.logging_config import logger
from wip.utils import is_running_on_databricks


def write_constraint(file, constraint, terms, target=False, description=False):
    """Write a set of constraints to a file.

    Parameters
    ----------
    file : _io.TextIOWrapper
        Open text file handle to write the constraints to.
    constraint : str
        Name of the restriction to add to the file.
    terms : tuple
        Terms from the constraint equation.
    target : bool, default=False
        Features that are target variables get treated by
        `write_descriptive_constraints` function.
    description : bool, default=False
        Whether to include a description of the constraint to the file.
    """
    if description:
        # Descriptive line: "<constraint>; <tag>; <coef>; <description>".
        target, coef, description = terms
        print(f"{constraint}; {target}; {coef}; {description}", file=file)
    elif target:
        # Target line: tag was renamed/unnormalized by the caller.
        target, new_tag, new_coef = terms
        print(f"{target}; {new_tag}; {new_coef}; {constraint}", file=file)
    elif constraint:
        operators = ["E", "LTE", "LT", "GT", "GTE"]
        for index, term in enumerate(terms):
            verify_int = isinstance(term, int)
            verify_float = isinstance(term, float)
            # True when this is NOT the last term of the equation.
            verify_last = index != len(terms) - 1
            # Skip scalar terms that are numerically (almost) zero.
            if not isinstance(term, tuple) and math.fabs(float(term)) <= 0.00001:
                continue
            if (verify_int or verify_float) and verify_last:
                print(f"{constraint}; {term:.5f}", file=file)
            else:
                # the term is a tuple that can be composed by:
                # a feature and a coefficient value or,
                # an operation and a coefficient value
                operation, value = term
                sep = "; "
                if operation in operators:
                    # Comparison operators are joined with a space
                    # instead of the "; " column separator.
                    sep = " "
                print(f"{constraint}; {operation}{sep}{value}", file=file)
    else:
        # No constraint name: `terms` is a single (feature, value) pair.
        feature, value = terms
        if math.fabs(float(value)) > 0.00001:
            print(f"{feature}; {value}", file=file)
def adjust_real_cost(real_cost, features, mult_coef=1, div_coef=3):
    """Scale the real cost of selected features in place.

    Each entry of ``real_cost`` whose key appears in ``features`` is
    multiplied by ``mult_coef`` and divided by ``div_coef``.  Other
    entries are left untouched.

    Parameters
    ----------
    real_cost : dict
        Mapping of feature name to its cost.  Mutated in place.
    features : iterable
        Feature names whose cost should be adjusted.
    mult_coef : int or float, default=1
        Multiplicative factor applied to the selected costs.
    div_coef : int or float, default=3
        Divisor applied to the selected costs.

    Returns
    -------
    dict
        The same ``real_cost`` mapping, returned for convenience.
    """
    for tag, cost in real_cost.items():
        if tag in features:
            real_cost[tag] = cost * mult_coef / div_coef
    return real_cost
def retrieve_best_model(model, models_results, metric="mape"):
    """Return the result of ``model`` with the lowest ``metric`` value.

    Parameters
    ----------
    model : str
        Name of the model to look up in ``models_results``.
    models_results : dict
        Mapping of model name to a list of result dictionaries, each
        holding a ``"metrics"`` dictionary.
    metric : str, default="mape"
        Metric used to rank the candidate results (lower is better).

    Returns
    -------
    dict
        The candidate result with the smallest ``metrics[metric]``.
    """
    candidates = models_results[model]
    return min(candidates, key=lambda result: result["metrics"][metric])
def retrieve_model_coeficients(model: str, models_results: dict):
    """Retrieve the ridge regression model coefficients.

    Parameters
    ----------
    model : str
        Name of the model.
    models_results : dict
        Dictionary with the models' results.

    Returns
    -------
    zip
        Coefficient of each feature used in the model.
    """
    best = retrieve_best_model(model, models_results)
    fitted_model = best["model"]
    # Pair each training column with its fitted coefficient.
    return zip(best["columns"], fitted_model.coef_)
def write_descriptive_constraints(
    file,
    model_target,
    datasets,
    df_detailed,
    scalers,
    models_coeficients,
    features_coeficient,
    models_results,
):
    """
    Write constraint built from the Ridge Regression model coefficients.

    Model target → name of the selected model

    Some target constraints have a different treatment when compared to
    other features.

    Parameters
    ----------
    file : TextIOWrapper
        File to write the constraints
    model_target : str
        Name of the constraint being written. This is the same name of the
        Ridge model.
    datasets : Dict[str, pd.DataFrame]
        Dictionary with the datasets
    df_detailed : pd.DataFrame
        Table with descriptions for each term of the constraint
    scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
        Dictionary with the scalers for each tag (column)
    models_coeficients : Dict[str, Dict[str, float]]
        Dictionary with the coefficients of each model.
        The dictionary should contain the following structure:

        .. code-block:: python

            {
                "model-name": {
                    "tag": coefficient,
                    # ...
                },
                "model-name-2": {
                    "tag": coefficient,
                    # ...
                },
                # ...
            }

    features_coeficient : zip
        List of tuples with the features and their coefficients
    models_results : Dict[str, List[Dict[str, Any]]]
        Dictionary with the results of each model.

    Returns
    -------
    Dict[str, Dict[str, float]]
        Dictionary with the coefficients of each model.
    """
    # Accumulates the constants produced by unnormalizing target features;
    # it is passed on so the intercept can be corrected accordingly.
    unnormalize_constant = 0
    target_column_names = define_targets(datasets)
    for tag, coef in features_coeficient:
        # If that tag repeats in the dataframe
        tag_count = df_detailed.columns.tolist().count(tag)
        description = ""
        if tag_count > 1:
            # Duplicated column: take the first description available.
            description = df_detailed[tag].loc["Descrição"].iloc[0]
        elif tag_count == 1:
            description = df_detailed[tag]["Descrição"]
        terms = (tag, coef, description)
        if tag in target_column_names:
            # Target features are written with their unnormalized
            # coefficient and their model-facing name.
            new_coef, constant = operations.unnormalize_feature(scalers, tag, coef)
            new_tag = constants.TARGETS_IN_MODEL[tag]
            unnormalize_constant -= constant
            # models_coeficients[model_target][tag] = coef
            models_coeficients[model_target][tag] = new_coef
            terms = (new_tag, new_coef, description)
        write_constraint(file, model_target, terms, description=True)
    write_simple_constraints(file, model_target, models_results, unnormalize_constant)
    return models_coeficients
def write_simple_constraints(file, model_target, models_results, unnormalize_constant):
    """Write the intercept/limit lines of a model constraint to ``file``.

    Parameters
    ----------
    file : TextIOWrapper
        Open file handle the constraint lines are printed to.
    model_target : str
        Name of the constraint / Ridge model being written.
    models_results : dict
        Dictionary with the results of each model; used to fetch the best
        model's intercept.
    unnormalize_constant : float
        Correction accumulated while unnormalizing target features, added
        to the model intercept.
    """
    # Will return None when the target is rota_disco,
    # and when the tag limit is defined as None
    limit = (
        None
        if model_target.startswith("rota_disco_")
        else constants.LIMITS.get(model_target)
    )
    best_conf = retrieve_best_model(model_target, models_results)
    if not limit:
        # NOTE(review): original comment here read "O problema é aqui"
        # ("the problem is here") — flagged as suspect by a previous author.
        if "custo" in model_target:
            write_constraint(
                file, model_target, [(model_target.replace("custo_", ""), -1)]
            )
        elif "eq_termica" in model_target:
            write_constraint(file, model_target, [("gas", -1)])
        else:
            write_constraint(file, model_target, [(model_target, -1)])
        feat_target = None
        if model_target.startswith("rota_disco"):
            try:
                # Find the disc number, used to build the matching
                # operating-status ("funcionamento") tag.
                disc_number = re.findall(r"\d+", model_target)[0]
            except Exception as exc:
                raise ValueError("No disk number provided") from exc
            feat_target = f"FUNC1_D@08PE-BD-840I-{int(disc_number):02d}M1"
        if feat_target:
            write_constraint(
                file,
                model_target,
                [
                    (
                        feat_target,
                        best_conf["model"].intercept_ + unnormalize_constant,
                    )
                ],
            )
        else:
            write_constraint(
                file,
                None,
                (
                    model_target,
                    best_conf["model"].intercept_ + unnormalize_constant,
                ),
            )
        # Equality constraint closing line.
        write_constraint(file, model_target, [("E", "0")])
    else:
        # A limit is defined for this target: write the corrected intercept
        # followed by the "<operator> <value>" bound line.
        operator, value = constants.LIMITS[model_target]
        write_constraint(
            file,
            None,
            (
                model_target,
                best_conf["model"].intercept_ + unnormalize_constant,
            ),
        )
        write_constraint(file, model_target, [(operator, value)])
def define_range_constraints(
    token: str,
    range_start: int,
    range_end: int,
    step: int = 1,
) -> List[str]:
    """
    Generate a list of strings by applying a token format over a range.

    Each integer in ``range(range_start, range_end, step)`` is substituted
    into the ``{}`` placeholder of `token`, and the formatted strings are
    collected into a list.

    Parameters
    ----------
    token : str
        A string with ``{}`` as a placeholder for the integer to be
        formatted.
    range_start : int
        The start of the range to which the `token` is applied.
    range_end : int
        The end of the range to which the `token` is applied. This value
        is not included in the output list.
    step : int, optional
        The step between consecutive integers in the range, default is 1.

    Returns
    -------
    list
        A list of strings obtained by applying the `token` to each number
        in the specified range.

    Examples
    --------
    >>> token = "{}_formatted"
    >>> define_range_constraints(token, 1, 4)
    ['1_formatted', '2_formatted', '3_formatted']

    >>> token = "prefix_{}_suffix"
    >>> define_range_constraints(token, 1, 5, 2)
    ['prefix_1_suffix', 'prefix_3_suffix']
    """
    # The token defines an acting range, enumerated here.
    constraint_indexes = range(range_start, range_end, step)
    return list(map(token.format, constraint_indexes))
def unnormalize_optimization_tags(scalers: dict, real_cost: dict) -> dict:
    """
    Unnormalize optimization tags using scalers.

    .. note::
        The unnormalization logic below is currently commented out, so the
        function is effectively a no-op that returns ``real_cost``
        unchanged. The description that follows documents the disabled
        behavior, kept for when it is re-enabled.

    This function receives scalers and real cost dictionaries, filtering
    the optimization keys, and unnormalizes the real cost based on the
    provided scalers.

    The `real_cost` dictionary is updated for each optimization key by
    dividing the value of the key by its corresponding data range in
    `scalers`.

    Parameters
    ----------
    scalers : dict
        Dictionary containing the scaler objects, where each key represents
        a column, and the corresponding value is the scaler object
        associated with that column.
    real_cost : dict
        Dictionary containing real cost data, where each key is a column
        name, and the corresponding value is the real cost associated with
        that column.

    Returns
    -------
    real_cost : dict
        Updated dictionary containing the unnormalized real cost data.

    Notes
    -----
    This function assumes that the `scalers` dictionary contains Scaler
    objects with a `data_range_` attribute.

    The function does not return a new dictionary object, rather it updates
    the input `real_cost` dictionary in-place and returns it.

    The keys in `real_cost` that are in `constants.TARGETS_IN_MODEL.values()`
    or not in `scalers.keys()` are not considered as optimization keys, and
    thus are not processed.

    Examples
    --------
    >>> from sklearn.preprocessing import MinMaxScaler
    >>> scalers = {'key1': MinMaxScaler(data_range=(1, 10))}
    >>> real_cost = {'key1': 5, 'key2': 10}
    >>> unnormalize_optimization_tags(scalers, real_cost)
    {'key1': 0.5, 'key2': 10}
    """
    # Disabled unnormalization logic (left for reference):
    # opt_keys = list(
    #     filter(
    #         lambda col: col not in constants.TARGETS_IN_MODEL.values()
    #         and col in scalers.keys(),
    #         real_cost.keys(),
    #     )
    # )
    #
    # for key in opt_keys:
    #     data_range = scalers[key].data_range_[0]
    #     real_cost[key] = real_cost[key] / data_range
    return real_cost
def custom_format(x):
    """
    Format numeric value as a 5 decimal places string.

    If :param:`x` is not a numeric value, the function logs the failure
    and returns the original value unchanged.

    Parameters
    ----------
    x : Any
        The value to try to format as a string with 5 decimal places.

    Returns
    -------
    Any
        The formatted value or the original value if :param:`x` is not a
        numeric value.

    Examples
    --------
    >>> custom_format(1.2345678)
    '1.23457'
    >>> custom_format('1.2345678')
    '1.2345678'
    >>> custom_format(0)
    '0.00000'
    >>> custom_format('some text')
    'some text'
    """
    try:
        return format(x, ".5f")
    except (ValueError, TypeError) as exc:
        # Non-numeric input: log and hand the value back untouched.
        logger.exception(exc)
        logger.error("Failed to format value: %s", x)
        return x
def save_solver_results(
    solver_path,
    df,
    resultado_otimizador_filename: str = RESULTADO_OTIMIZADOR_FILENAME,
):
    """
    Save optimization results to Azure Container Storage, or to a local
    filepath.

    The default file name that is used to save the optimization results to
    Azure Data Lake is:

    .. code-block:: python

        f"resultado_otimizador-US{US_SUFIX}_{datetime.today().strftime('%Y-%m-%d')}.csv"

    ::

    For example, the file name should be similar to the following:
    `"resultado_otimizador-US08_2024-03-21.csv"`

    Parameters
    ----------
    solver_path : str
        The Azure Data Lake container URL path or a local folder path where
        the optimization results will be saved.
    df : pd.DataFrame
        A `pandas.DataFrame` containing the optimization results for all
        production ranges.
    resultado_otimizador_filename : str, default=wip.constants.RESULTADO_OTIMIZADOR_FILENAME
        The name of the file to use for saving the optimization results.

        .. versionchanged:: 2.8.11
            Bugfix error that caused optimization results to try to save it
            as a ".csv" file using the suffix ".xlsx" on the filename.
            This problem was making the DataBricks job "Integração SensUP"
            to read the optimization results in an incorrect format and
            subsequently fail to finish executing the job.
    """
    # Render the numeric columns as fixed 5-decimal strings for the CSV.
    format_tags = ["minimo", "maximo", "valor normalizado", "valor real"]
    for tag in format_tags:
        df[tag] = df[tag].apply(custom_format)
    columns_order = [
        "faixa",
        "TAG",
        "minimo",
        "maximo",
        "valor normalizado",
        "valor real",
        "custo",
    ]
    # Prefix tags starting with "=" with an apostrophe so spreadsheet
    # software does not interpret them as formulas.
    df['TAG'] = df['TAG'].apply(
        lambda value: (
            f"'{value}" if isinstance(value, str) and value.startswith('=') else value
        )
    )
    # Translate model-internal target names back to their PIMS tag names.
    df['TAG'] = df['TAG'].replace(
        dict(
            zip(constants.TARGETS_IN_MODEL.values(), constants.TARGETS_IN_MODEL.keys())
        )
    )
    df = df.reindex(columns=columns_order)
    # Tags excluded from the published results.
    remove_tags = [
        "FUNC1_D@08MI-AM-832I-01M1",
        "CONS ESPEC EE VENT - US8",
        # "compressao",
        # "NIVE4_I@08QU-FR-851I-01M1",
    ]
    df = df.loc[~df["TAG"].isin(remove_tags)]
    # Path to the optimization results that are processed by the
    # "Integração SensUP" job on DataBricks later on.
    path = f"{solver_path}/{resultado_otimizador_filename.replace('.xlsx', '.csv')}"
    to_csv(df, path, sep=";", index=False, encoding="iso-8859-1")
    # Convert the formatted strings back to numbers for the Excel output.
    # NOTE(review): errors="ignore" is deprecated in recent pandas releases
    # — confirm against the pinned pandas version.
    df[format_tags] = df[format_tags].apply(pd.to_numeric, errors="ignore")
    df.columns = df.columns.str.capitalize()
    df["Valor real"] = df["Valor real"].fillna(0)
    df["Valor normalizado"] = (
        df["Valor normalizado"].astype(str).str.replace("nan", "0").astype(float)
    )
    if not is_running_on_databricks():
        # Save optimization results as an Excel file only when running locally.
        df = get_pi_system_tag_names(df)
        to_excel(
            df,
            f"{solver_path}/{resultado_otimizador_filename.replace('.csv', '.xlsx')}",
            index=False,
        )
    operations.replace_string_from_file(path)
def get_pi_system_tag_names(dataset: pd.DataFrame) -> pd.DataFrame:
    """Get the PI System tag names from the PIMS tag names.

    Adds a 'Tag PI' column with the PI System names resolved from the
    PIMS names found in the 'Tag' column. Note that the input DataFrame is
    modified in place (a 'Tag PI' column is added to it); the returned
    frame is the result of dropping the temporary helper column.

    Parameters
    ----------
    dataset : pd.DataFrame
        `pandas.DataFrame` containing the optimization model results,
        with PIMS tag names as a column named 'Tag'.

    Returns
    -------
    pd.DataFrame
        A `pandas.DataFrame` with the PI System tag names in a column
        named 'Tag PI'.
    """
    # Reverse mapping: model-internal target names back to PIMS tag names.
    target_names = {v: k for k, v in constants.TARGETS_IN_MODEL.items()}
    # Special-case display names whose PIMS tag differs.
    calc_names = {'CONS EE PRENSA - US8': 'CONS1_Y@08PR-RP-822I-01'}
    # Work on a temporary column so the original 'Tag' values are kept.
    dataset['_Tag'] = dataset['Tag']
    # `dict.get` yields None for unknown keys; `fillna` then restores the
    # previous value, so only known tags are translated.
    dataset['_Tag'] = dataset['_Tag'].apply(target_names.get).fillna(dataset['_Tag'])
    dataset['_Tag'] = dataset['_Tag'].apply(calc_names.get).fillna(dataset['_Tag'])
    # Unmapped tags end up with a missing 'Tag PI' value.
    dataset['Tag PI'] = dataset['_Tag'].apply(pims_to_pisystem_dict.get)
    return dataset.drop(columns=['_Tag'])