Source code for wip.modules.outputs

"""
Write output files.
"""

import difflib
import os
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple

import numpy as np
import pandas as pd
import pulp
from scipy import stats as s
from sklearn.preprocessing import MinMaxScaler

import wip.modules.ops as operations
from wip.constants import constants
from wip.datatools.io_ops import read_csv
from wip.datatools.io_ops import read_json
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import to_excel
from wip.files.depara_pisystem import pims_to_pisystem_dict
from wip.logging_config import log
from wip.modules import solver_ops
from wip.utils import exists


def write_objective_function_coef(solver_path, scalers):
    """Build the objective-function cost table and save it as ``costs.csv``.

    The costs are loaded from ``<solver_path>/custo_real.json``, passed twice
    through `solver_ops.adjust_real_cost` (once for the ``energia_moinho``
    tokens and once for the ``corpo_moedor_especifico_{}`` tokens), and then
    restricted to the tags listed in `constants.OBJ_FUNC_COEF`.

    Parameters
    ----------
    solver_path : str
        Directory that holds ``custo_real.json`` and receives ``costs.csv``.
    scalers : Any
        Not used by this function; kept for interface compatibility.

    Returns
    -------
    pd.DataFrame
        Table indexed by ``TAG`` with a single ``Custo`` column, in the order
        given by `constants.OBJ_FUNC_COEF`.
    """
    cost_by_tag = read_json(f"{solver_path}/custo_real.json")

    mill_energy_tokens = solver_ops.define_range_constraints(
        "energia_moinho", 1, 2
    )
    cost_by_tag = solver_ops.adjust_real_cost(cost_by_tag, mill_energy_tokens)

    grinding_media_tokens = solver_ops.define_range_constraints(
        "corpo_moedor_especifico_{}", 1, 4
    )
    cost_by_tag = solver_ops.adjust_real_cost(
        cost_by_tag, grinding_media_tokens, 1, 1
    )

    cost_table = pd.DataFrame.from_dict(
        cost_by_tag, orient='index', columns=['Custo']
    )
    cost_table.index.name = "TAG"
    # Keep only the tags that take part in the objective function.
    cost_table = cost_table.loc[constants.OBJ_FUNC_COEF]

    costs_csv_path = os.path.join(solver_path, "costs.csv")
    to_csv(cost_table, costs_csv_path, decimal=',', sep=';', index=True)
    return cost_table
def rot_pot_filtragem(solver_path, scalers, datasets):
    """Collect the solved ``ROTA1``/``POTE1`` filtering values per production range.

    For every ``(fmin, fmax)`` pair in `constants.production_range`, the file
    ``Variables - VarX_{fmin}-{fmax}.csv`` is read from `solver_path` (ranges
    without a file are skipped). Every variable whose name contains
    ``ROTA1_I@08FI-FL-827I`` or ``POTE1_I@08FI-BV-827I`` has its `` Value``
    column stored under the key ``str(fmax)``. When `scalers` has an entry
    for the variable, the value is first passed through that scaler's
    ``inverse_transform``.

    Parameters
    ----------
    solver_path : str
        Directory containing the solver's ``Variables - VarX_*.csv`` files.
    scalers : Dict[str, MinMaxScaler]
        Scalers keyed by variable name. Variables without a scaler keep the
        value found in the file.
    datasets : Any
        Not used by this function; kept for interface compatibility.

    Returns
    -------
    Tuple[dict, dict]
        ``(rot_func, pot_func)``. Each maps a tag name to a dictionary keyed
        by the upper production bound (``"750"`` ... ``"1000"``). Entries that
        were never filled keep their initial empty list.

    Raises
    ------
    KeyError
        If a file contains a variable matching one of the two prefixes that
        is not one of the ten pre-registered tags.
    """
    rot_prefix = "ROTA1_I@08FI-FL-827I"
    pot_prefix = "POTE1_I@08FI-BV-827I"
    # Units 05 and 10 use the "RM1" suffix; all the others use "M1".
    unit_suffixes = (
        "01M1", "02M1", "03M1", "04M1", "05RM1",
        "06M1", "07M1", "08M1", "09M1", "10RM1",
    )
    range_keys = ("750", "800", "850", "900", "950", "1000")

    rot_func = {
        f"{rot_prefix}-{suffix}": {key: [] for key in range_keys}
        for suffix in unit_suffixes
    }
    pot_func = {
        f"{pot_prefix}-{suffix}": {key: [] for key in range_keys}
        for suffix in unit_suffixes
    }

    for fmin, fmax in constants.production_range:
        results_path = os.path.join(
            solver_path, f"Variables - VarX_{fmin}-{fmax}.csv"
        )
        if not exists(results_path):
            continue
        df = read_csv(results_path, sep=";", encoding="iso-8859-1")
        for _, row in df.iterrows():
            var = row[" VariableName"]
            is_rot = rot_prefix in var
            is_pot = pot_prefix in var
            if not (is_rot or is_pot):
                continue
            value = row[" Value"]
            scaler = scalers.get(var)
            if scaler is not None:
                # Scalers work on 2-D input, hence the [[...]] wrapping.
                value = scaler.inverse_transform([[value]])[0][0]
            if is_rot:
                rot_func[var][str(fmax)] = value
            if is_pot:
                pot_func[var][str(fmax)] = value
    return rot_func, pot_func
def increasing_temp_gq(solver_path, scalers, datasets):
    """Collect the unscaled ``TEMP1_I@08QU-QU-855I-GQ03``..``GQ16`` values.

    For every ``(fmin, fmax)`` pair in `constants.production_range`, the file
    ``Variables - VarX_{fmin}-{fmax}.csv`` is read from `solver_path` (ranges
    without a file are skipped). For each of the fourteen tags found in the
    file, the `` Value`` column is converted with
    `operations.unnormalize_feature` and appended to that tag's list.

    Parameters
    ----------
    solver_path : str
        Directory containing the solver's ``Variables - VarX_*.csv`` files.
    scalers : Dict[str, MinMaxScaler]
        Scalers forwarded to `operations.unnormalize_feature`.
    datasets : Any
        Not used by this function; kept for interface compatibility.

    Returns
    -------
    Dict[str, list]
        Tag name -> list of unscaled values, one per production-range file in
        which the tag was found, in `constants.production_range` order.

    Notes
    -----
    A value is appended only when the range's file exists and contains the
    tag. List positions therefore match production-range indexes only when
    no file (and no tag) is missing.
    """
    temp_tags = [f'TEMP1_I@08QU-QU-855I-GQ{idx:02d}' for idx in range(3, 17)]
    rot_func = {tag: [] for tag in temp_tags}

    for fmin, fmax in constants.production_range:
        results_path = os.path.join(
            solver_path, f'Variables - VarX_{fmin}-{fmax}.csv'
        )
        if not exists(results_path):
            continue
        df = read_csv(results_path, sep=';', encoding='iso-8859-1')
        # Only rows for the fourteen tracked tags are kept, so no further
        # name checks are required inside the loop.
        tracked_rows = df.loc[
            df[' VariableName'].isin(temp_tags), :
        ].sort_values(' VariableName')
        for _, row in tracked_rows.iterrows():
            var = row[' VariableName']
            rot_func[var].append(
                operations.unnormalize_feature(
                    scalers, var, row[' Value'], operation='one feature'
                )
            )
    return rot_func
def define_optimization_results(
    solver_path: str,
    scalers: Dict[str, MinMaxScaler],
    datasets: Dict[str, pd.DataFrame],
    solver,
) -> pd.DataFrame:
    """Turn the solver's per-range variable files into one results table.

    For every ``(fmin, fmax)`` pair in `constants.production_range`, the file
    ``Variables - VarX_{fmin}-{fmax}.csv`` is read from `solver_path`. Each
    variable's value and bounds are converted between normalized and real
    scale, depending on which group the variable belongs to, and one row per
    variable is appended to the output. The accumulated table is handed to
    `solver_ops.save_solver_results` and returned.

    Parameters
    ----------
    solver_path : str
        Directory with ``costs.csv`` and the ``Variables - VarX_*.csv`` files.
    scalers : Dict[str, MinMaxScaler]
        Scalers forwarded to `operations.normalize_feature` and
        `operations.unnormalize_feature`, and indexed directly for the
        ``DENS1_C@08HO-BP-826I-`` and ``GRAN_OCS_TM@08PE-BD-840I`` tags.
    datasets : Dict[str, pd.DataFrame]
        Only forwarded to `increasing_temp_gq`.
    solver : Any
        Not used inside this function.

    Returns
    -------
    pd.DataFrame
        Columns ``faixa``, ``TAG``, ``minimo``, ``maximo``,
        ``valor normalizado``, ``valor real`` and ``custo``. Ranges whose
        file is missing, or whose processing raised, contribute no rows.

    Notes
    -----
    NOTE(review): the statement nesting below was reconstructed from a
    whitespace-collapsed listing; confirm it against the repository version.
    """
    # Real cost per tag, as written by `write_objective_function_coef`.
    custos_reais = (
        read_csv(
            os.path.join(solver_path, "costs.csv"),
            decimal=",",
            sep=";",
            index_col="TAG",
        )
        .to_dict()
        .get("Custo")
    )
    # custos_reais = solver_ops.unnormalize_optimization_tags(scalers, custos_reais)
    results_otm = []
    # Reverse lookup: value of TARGETS_IN_MODEL -> its key.
    inv_tag_2_var = {v: k for k, v in constants.TARGETS_IN_MODEL.items()}
    columns = [
        "faixa",
        "TAG",
        "minimo",
        "maximo",
        "valor normalizado",
        "valor real",
        "custo",
    ]
    df_output = pd.DataFrame(columns=columns)
    # dens, gran_ocs, rota_fi = tags_equality(solver_path, scalers, datasets)
    temp_gq = increasing_temp_gq(solver_path, scalers, datasets)
    # temp_gq = {key: sorted(value) for key, value in temp_gq.items()}
    production_range = constants.production_range
    # Upper bound of each range (as a string) -> position of the range.
    prod_range = {str(lmax): idx for idx, (_, lmax) in enumerate(production_range)}
    for fmin, fmax in production_range:
        # Any failure inside a range is logged and the loop moves on, so one
        # bad range does not prevent the others from being reported.
        try:
            # rota_fi[str(fmax)] = np.array(rota_fi[str(fmax)])
            results_otm = []
            filename = f"Variables - VarX_{fmin}-{fmax}.csv"
            optimization_results_fpath = os.path.join(solver_path, filename)
            if not exists(optimization_results_fpath):
                log.warning(
                    "No results found for production range %s-%s. "
                    "Filepath: %s. Skipping it.",
                    fmin,
                    fmax,
                    f"{optimization_results_fpath!r}",
                )
                continue
            df_otimizacao = read_csv(
                optimization_results_fpath, sep=";", encoding="iso-8859-1"
            )
            # Running total of value * unit cost for this range.
            custo_final = 0
            tag_to_fix = ["SE PP", "SUP_SE_PP_L@08PR", "SUP_SE_PR_L@08FI"]
            tag_integer = [
                "SOMA FUNC FILTROS",
                "qtde_discos",
                "qtde_moinhos",
                "qtde_filtros",
            ]
            for line in df_otimizacao.iterrows():
                # `line` is an (index, row) pair; only the row is used.
                var_csv = line[1][" VariableName"]
                var = var_csv
                coef_real = coef = line[1][" Value"]
                obj_coef_readl = obj_coef = line[1][" ObjCoeff"]
                lb = line[1][" LB"]
                ub = line[1][" UB"]
                # `svar` is the scaler lookup name; the six NIVE tags share
                # the single "nive" scaler entry.
                svar = var
                if var in [
                    "NIVE1_C@08QU-FR-851I-01M1",
                    "NIVE2_I@08QU-FR-851I-01M1",
                    "NIVE3_I@08QU-FR-851I-01M1",
                    "NIVE4_I@08QU-FR-851I-01M1",
                    "NIVE5_I@08QU-FR-851I-01M1",
                    "NIVE6_I@08QU-FR-851I-01M1",
                ]:
                    svar = "nive"
                if svar in scalers:
                    if (
                        svar not in constants.TARGETS_IN_MODEL.keys()
                        and svar not in constants.TARGETS_IN_MODEL.values()
                        and svar not in tag_to_fix
                        and svar not in tag_integer
                    ):
                        # Ordinary variables: the file holds normalized
                        # numbers; "FUNC" variables are left untouched.
                        if not svar.startswith("FUNC"):
                            coef_real = operations.unnormalize_feature(
                                scalers, svar, coef, operation="one feature"
                            )
                            # NOTE(review): `obj_coef_real` is assigned here
                            # but never read afterwards.
                            obj_coef_real = operations.unnormalize_feature(
                                scalers, svar, obj_coef, operation="one feature"
                            )
                            ub = operations.unnormalize_feature(
                                scalers, svar, ub, operation="one feature"
                            )
                            lb = operations.unnormalize_feature(
                                scalers, svar, lb, operation="one feature"
                            )
                        if np.isnan(coef_real):
                            log.info('%s, " contains NaN ", %s', svar, coef_real)
                            coef_real = 0
                    elif (
                        svar in constants.TARGETS_IN_MODEL.values()
                        or svar in constants.TARGETS_IN_MODEL
                        or svar in tag_to_fix
                        or svar in tag_integer
                    ):
                        # Targets and fixed/integer tags: the file value is
                        # treated as the real one and the normalized value is
                        # derived from it.
                        try:
                            coef = operations.normalize_feature(
                                scalers, inv_tag_2_var.get(svar), coef_real
                            )
                        except Exception as exc:  # pylint: disable=broad-except
                            # log.exception(exc)
                            # log.error("Could not normalize feature %s", svar)
                            # Fallback when normalizing under the reverse
                            # lookup name fails.
                            if svar in [
                                "Calculo da Energia da Filtragem",
                                "SUP_SE_PP_L@08PR",
                                "SUP_SE_PR_L@08FI",
                                "COMP_MCOMP_PQ_L@08QU",
                            ]:
                                ub = operations.unnormalize_feature(
                                    scalers, svar, ub, operation="one feature"
                                )
                                lb = operations.unnormalize_feature(
                                    scalers, svar, lb, operation="one feature"
                                )
                                coef = coef_real
                                coef_real = operations.unnormalize_feature(
                                    scalers, svar, coef_real, operation="one feature"
                                )
                            else:
                                coef = operations.normalize_feature(
                                    scalers, svar, coef_real
                                )
                # The cost uses `coef_real` as it stands here, i.e. before the
                # tag-specific overrides below.
                custo_final += coef_real * custos_reais.get(var, 0)
                # NOTE(review): for tags that already went through
                # `unnormalize_feature` above, this appears to unscale a
                # second time; confirm that this is intended. A tag missing
                # from `scalers` raises KeyError here and aborts the range.
                if "DENS1_C@08HO-BP-826I-" in svar:
                    coef_real = scalers[svar].inverse_transform([[coef_real]])[0][0]
                    # coef_real = dens
                if "GRAN_OCS_TM@08PE-BD-840I" in svar:
                    coef_real = scalers[svar].inverse_transform([[coef_real]])[0][0]
                    # coef_real = gran_ocs[int(svar[-2:]) - 1]
                # if "ROTA1_I@08FI-FL-827I-" in svar:
                #     rot = rota_fi[str(fmax)].nonzero()
                #     coef_real = rota_fi[str(fmax)][rot].mean() * rot_func[svar][str(fmax)]
                #     coef_real = scalers[svar].inverse_transform([[coef_real]])[0][0]
                # if "POTE1_I@08FI-BV-827I-" in svar:
                #     coef_real *= pot_func[svar][str(fmax)]
                # NOTE(review): `temp_gq` lists are indexed by the position of
                # the range in `production_range`; verify that
                # `increasing_temp_gq` yields one entry per range even when a
                # range file is missing, otherwise positions shift.
                if (
                    "TEMP1_I@08QU-QU-855I-GQ" in svar
                    and svar not in [
                        "TEMP1_I@08QU-QU-855I-GQ01",
                        "TEMP1_I@08QU-QU-855I-GQA",
                        "TEMP1_I@08QU-QU-855I-GQB",
                        "TEMP1_I@08QU-QU-855I-GQC",
                    ]
                    and svar in temp_gq
                ):
                    coef_real = temp_gq[svar][prod_range[str(fmax)]]
                results_otm.append(
                    [
                        f"{fmin}-{fmax}",
                        var,
                        lb,
                        ub,
                        coef,
                        coef_real,
                        (custos_reais.get(var, 0)),
                    ]
                )
            df = pd.DataFrame(results_otm, columns=columns)
            df_output = pd.concat([df_output, df])
            log.info(
                "Faixa de produção: %s-%s - Custo: R$ %s",
                fmin,
                fmax,
                f"{custo_final:.2f}",
            )
        except Exception as exc:  # pylint: disable=broad-except
            log.exception(exc)
            log.error("Failed to save results for production range %s-%s", fmin, fmax)
    # A failure to save is logged, and the table is still returned.
    try:
        solver_ops.save_solver_results(solver_path, df_output)
    except ValueError as exc:
        log.exception(exc)
        log.error("Could not save post-otm results.")
    return df_output
class LpScaledConstraint:
    """
    Class representing a scaled linear programming constraint.

    This class is used to manage a set of constraints in a linear programming
    problem, with methods for creating, updating and working with these
    constraints.

    Attributes
    ----------
    prob : pulp.LpProblem
        The linear programming problem to which constraints are added.
    scalers : Dict[str, MinMaxScaler]
        Dictionary mapping column names to MinMaxScaler instances.
    datasets : Dict[str, pd.DataFrame]
        Dictionary mapping column names to corresponding pandas DataFrames.

    Examples
    --------
    After instantiating the `LpScaledConstraint` class, you can generate a
    pandas `DataFrame` with the unscaled constraints, using the method
    `create_constraints`:

    >>> # noinspection PyUnresolvedReferences
    >>> lp_scaled_constraint = LpScaledConstraint(prob, scalers, datasets)
    >>> lp_scaled_constraint_df = lp_scaled_constraint.create_constraints()

    See Also
    --------
    pulp.LpProblem : A class representing a linear programming problem.
    sklearn.preprocessing.MinMaxScaler : Class used for scaling features.
    pandas.DataFrame : Class representing a two-dimensional table of data.

    .. versionchanged:: 0.2.0

        * Fix the unscaling process that was being executed for all
          constraints, instead of only being executed for the ones that
          needed to be unscaled.
    """

    def __init__(
        self,
        prob: pulp.LpProblem,
        scalers: Dict[str, MinMaxScaler],
        datasets: Dict[str, pd.DataFrame],
    ):
        """
        Initialize the `LpScaledConstraint` with a problem, scalers, and datasets.

        The `scalers` and `datasets` are needed to unscale some of the
        constraints, that are created using normalized values.

        Parameters
        ----------
        prob : pulp.LpProblem
            The linear programming problem to which constraints are added.
        scalers : Dict[str, MinMaxScaler]
            Dictionary mapping column names to `MinMaxScaler` instances.
        datasets : Dict[str, pd.DataFrame]
            Dictionary mapping column names to corresponding pandas
            `DataFrames`.
        """
        self.prob = prob
        self.scalers = scalers
        self.datasets = datasets

    @property
    def model_names(self) -> List[str]:
        """List of model names, with " " replaced by "_".

        Returns
        -------
        List[str]
            A list of model names, with any spaces replaced by underscores.
            For example, "Model 1" becomes "Model_1".
        """
        return [name.replace(' ', '_') for name in self.datasets]

    @property
    def target_columns(self) -> Dict[str, str]:
        """
        Dictionary mapping LP variable names to each model target name.

        Variables that share the same name as the regression models represent
        that model's target variable. The target is taken to be the last
        column of each dataset.

        Returns
        -------
        Dict[str, str]
            A dictionary mapping model names (spaces replaced by underscores)
            to their corresponding target column names.
        """
        return {
            key.replace(' ', '_'): df.columns[-1]
            for key, df in self.datasets.items()
        }

    @staticmethod
    def _spaced_sign(number) -> str:
        """Format `number`, separating a leading minus sign with a space.

        Only the leading sign is touched, so exponents such as ``1e-05`` are
        left intact.
        """
        text = f'{number}'
        return f'- {text[1:]}' if text.startswith('-') else text

    def create_constraints(self) -> pd.DataFrame:
        """Create a DataFrame of constraints in the linear programming problem.

        Only constraints whose name matches one of `model_names` are unscaled.

        Returns
        -------
        pd.DataFrame
            A `pandas.DataFrame` containing the constraints of the linear
            programming problem. Each row represents a constraint with its
            name, value, and equation.
        """
        # Built once: `model_names` is recomputed on every property access.
        model_names = set(self.model_names)
        constraints = []
        for cname, constraint in self.prob.constraints.items():
            value, eq = self.create_single_constraint(
                constraint, unscale=cname in model_names
            )
            constraints.append([cname, value, eq])
        return pd.DataFrame(constraints, columns=['Name', 'Value', 'Equation'])

    def update_variables(
        self, coeff: float, xvar: pulp.LpVariable
    ) -> Tuple[float, float, float]:
        """Update the coefficient and variable value of a constraint.

        The variable name is mapped to its scaler through a fuzzy match
        (`difflib.get_close_matches`). When no scaler matches, the inputs are
        returned unchanged with a zero intercept correction.

        Parameters
        ----------
        coeff : float
            The coefficient of the variable in the constraint.
        xvar : pulp.LpVariable
            The variable in the constraint.

        Returns
        -------
        Tuple[float, float, float]
            A tuple containing the updated coefficient, variable value, and
            the subtracted intercept.
        """
        name = replace_symbols(xvar.name)
        # Model-named variables stand for that model's target column.
        name = self.target_columns.get(name, name)
        matches = difflib.get_close_matches(name, self.scalers.keys(), n=1)
        xval = xvar.varValue
        intercept_subtractor = 0
        if matches:
            scaler_name = matches[0]
            coeff, intercept_subtractor = operations.unnormalize_feature(
                self.scalers, scaler_name, coeff
            )
            xval = self.scalers[scaler_name].inverse_transform([[xval]])[0][0]
        return coeff, xval, intercept_subtractor

    def create_single_constraint(
        self, c: pulp.LpConstraint, unscale: bool = False
    ) -> Tuple[float, str]:
        """
        Create a single constraint from a linear programming problem.

        Parameters
        ----------
        c : pulp.LpConstraint
            The constraint from the linear programming problem.
        unscale : bool, default=False
            Whether to unscale the constraint coefficients and variable
            values.

        Returns
        -------
        Tuple[float, str]
            A tuple containing the result of the constraint equation and the
            equation itself as a string. Negative terms are rendered as
            ``- value`` and ``+ - value`` is collapsed to ``- value``.
        """
        res = 0
        intercept = c.constant
        pieces = []
        for lp_coeff, lp_var in zip(c.values(), c):
            var_value = lp_var.varValue
            if unscale:
                lp_coeff, var_value, intercept_subtractor = self.update_variables(
                    lp_coeff, lp_var
                )
                intercept -= intercept_subtractor
            pieces.append(f'{self._spaced_sign(lp_coeff)} * {lp_var.name} + ')
            res += var_value * lp_coeff
        pieces.append(
            f'{self._spaced_sign(intercept)} {pulp.LpConstraintSenses[c.sense]} 0'
        )
        res += intercept
        # Signs are spaced per number instead of with a global replace of
        # '-', which used to break exponents ("1e-05" -> "1e- 05").
        return res, ''.join(pieces).replace('+ - ', '- ')
def replace_symbols(name: str) -> str:
    """Swap the words 'equal', 'div' and 'mult' for '=', '/' and '*'.

    The substitutions are applied one after another, in that order, to every
    occurrence found in the input.

    Parameters
    ----------
    name : str
        Text that may contain the words 'equal', 'div' and 'mult'.

    Returns
    -------
    str
        The text with each word replaced by its mathematical symbol.

    Examples
    --------
    >>> replace_symbols("x mult y div z equal 10")
    'x * y / z = 10'
    """
    word_to_symbol = (('equal', '='), ('div', '/'), ('mult', '*'))
    converted = name
    for word, symbol in word_to_symbol:
        converted = converted.replace(word, symbol)
    return converted
def lp_variables_to_excel(
    lp_problem: pulp.LpProblem, output_path: str, format: str = "xlsx"
):
    """Save the LP variables to an Excel (or CSV) file.

    One row is written per variable of `lp_problem`, with its name, value,
    lower bound, upper bound, category, and the name returned by
    `get_pisystem_tag_name`.

    Parameters
    ----------
    lp_problem : pulp.LpProblem
        Problem whose variables are exported.
    output_path : str
        Destination path (a `str` or any path-like object). Its file
        extension is replaced with `format`.
    format : str, default="xlsx"
        Either ``"xlsx"`` or ``"csv"``. Any other value writes nothing and
        logs a warning.
    """
    lp_results = [
        [
            name,
            lp_var.varValue,
            lp_var.lowBound,
            lp_var.upBound,
            lp_var.cat,
            get_pisystem_tag_name(name),
        ]
        for name, lp_var in lp_problem.variablesDict().items()
    ]
    results = pd.DataFrame(
        lp_results,
        columns=[
            "Nome Variável",
            "Valor",
            "Limite Inferior",
            "Limite Superior",
            "Tipo de Variável",
            "PI System Tag Name",
        ],
    )

    # Accept path-like objects too; the string methods below need a `str`.
    output_path = os.fspath(output_path)
    filename = Path(output_path).name
    new_filename = str(Path(filename).with_suffix(f".{format}"))
    # Swap only the last occurrence of the file name: a plain `str.replace`
    # would also rewrite directory components that contain the same text.
    head, found, tail = output_path.rpartition(filename)
    if found:
        output_path = f"{head}{new_filename}{tail}"

    if format == "xlsx":
        to_excel(results, output_path, index=False)
    elif format == "csv":
        to_csv(results, output_path, index=False)
    else:
        log.warning(
            "Unsupported output format %r. No file was written to %r.",
            format,
            output_path,
        )
def get_pisystem_tag_name(pims_tag_name: str) -> str:
    """Translate a PIMS tag name into its PI System tag name.

    The name is first cleaned:

    1. If it contains '@', every '_' after the first '@' becomes '-'. The
       part before the '@' is left as is.
    2. The words 'equal', 'div' and 'mult' are turned back into '=', '/' and
       '*' by `replace_symbols`.
    3. Every '___' becomes ' - '.
    4. Remaining runs of '_' are collapsed into a single '_'.
    5. Leading '_' characters are removed.

    The lookup in `pims_to_pisystem_dict` then tries, in order: the cleaned
    name, the name exactly as received, and the key closest to the cleaned
    name (`difflib.get_close_matches` with a 0.6 cutoff). If none of them
    succeeds, the cleaned name itself is returned.

    Parameters
    ----------
    pims_tag_name : str
        Tag name in the PIMS format.

    Returns
    -------
    str
        PI System tag name, or the cleaned tag name when no entry is found
        in `pims_to_pisystem_dict`.

    Notes
    -----
    This function is intended to be used to convert the variables' names
    post-optimization, to include them in the final results file.
    """
    raw_name = pims_tag_name

    # PuLP does not accept '-' in variable names, so dashes were written as
    # '_' during optimization. Only the part after '@' is converted back,
    # because the prefix genuinely uses '_' as its separator.
    prefix, at_sign, suffix = pims_tag_name.partition('@')
    if at_sign:
        cleaned = f"{prefix}@{suffix.replace('_', '-')}"
    else:
        cleaned = pims_tag_name

    # Math symbols were spelled out as words for the same PuLP restriction
    # (e.g. 'equal192divVELO' stands for '=192/VELO').
    cleaned = replace_symbols(cleaned).replace('___', ' - ')
    while '__' in cleaned:
        cleaned = cleaned.replace('__', '_')
    cleaned = cleaned.lstrip('_')

    for candidate in (cleaned, raw_name):
        if candidate in pims_to_pisystem_dict:
            return pims_to_pisystem_dict[candidate]

    closest = difflib.get_close_matches(
        cleaned, pims_to_pisystem_dict.keys(), n=1, cutoff=0.6
    )
    return pims_to_pisystem_dict[closest[0]] if closest else cleaned