Source code for wip.modules.pulp_solver

"""
Initialize the optimization problem instance and solve it.

This module initializes the optimization problem instance and solves it using
the `PulpSolver` class.

"""

from __future__ import annotations

import os
import re
import unicodedata
from pathlib import Path
from typing import Dict

import pandas as pd
import pulp

from wip.constants import MULTI_OBJECTIVE, SCALERS_FILEPATH
from wip.constants import constants
from wip.datatools.io_ops import read_csv
from wip.datatools.io_ops import read_joblib
from wip.datatools.io_ops import read_json
from wip.datatools.io_ops import read_text
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import write_lp
from wip.files.lp_denorm_constraints import create_denorm_constraints
from wip.files.lp_denorm_constraints import denormalize_lpvar
from wip.files.lp_denorm_constraints import fan_consumption_constraint
from wip.files.tags_relationships import add_relationship_cfix_antracito
from wip.logging_config import logger
from wip.model_diagnostics import find_iis
from wip.modules.constraints import add_energia_pensa_quantile_constraint
from wip.modules.constraints import constraint_taxa_alimentacao_disco
from wip.modules.constraints import filters_vacuum_bombs_relationship
from wip.modules.constraints import fix_grupos_de_queima_limits
from wip.modules.constraints import link_nro_filtros_func_lpvars
from wip.modules.constraints import link_rota_func_filtros
from wip.temporary import fix_vent_control_tags_bounds, link_queima_vars
from wip.utils import dbutils_glob
from wip.utils import remove_files


class PulpSolver:
    """
    Define and solve the optimization problem of each production range.

    One `pulp.LpProblem` is built for every ``restricoes-faixa-*-*.txt`` file
    found in the constraints folder; the problems are stored in `probs`, keyed
    by production range (for example ``'750-800'``).

    The class attribute `solvers` is a dictionary that maps the accepted
    `solver_option` names to solver instances:

    * `'cbc'`
    * `'gurobi'`
    * `'glpk'`
    * `'cplex'`

    The default solver is `'cbc'`.
    """

    solvers = {
        'cplex': pulp.CPLEX(),
        'gurobi': pulp.GUROBI(),
        'glpk': pulp.GLPK(),
        'cbc': pulp.PULP_CBC_CMD(msg=False),
    }

    def __init__(
        self,
        path_to_constraints: str | Path,
        path_to_costs: str | Path,
        solver_option: str = "cbc",
    ):
        """Set the initial parameters for the optimization problem.

        Reads the scalers, the objective-function costs (``costs.csv`` inside
        `path_to_constraints`) and the JSON costs file, then parses every
        ``restricoes-faixa-*-*.txt`` file into an `pulp.LpProblem`.

        Parameters
        ----------
        path_to_constraints : str | Path
            The path to the folder containing the constraints' files.
        path_to_costs : str | Path
            The path to the file containing the costs for each variable.
        solver_option : str {'cbc', 'gurobi', 'glpk', 'cplex'}, default="cbc"
            The solver to use to solve the optimization problem.
        """
        # {production range: {feature name: {'obj': LpVariable, 'min': .., 'max': ..}}}
        self.__variables = {}
        # {production range: pulp.LpProblem}
        self.probs = {}
        # {production range: pd.DataFrame with the solution of that range}
        self.df_results = {}
        self.solver_option = solver_option
        self.path_to_constraints = path_to_constraints
        self.path_to_costs = path_to_costs
        # {constraint name: pulp.LpConstraint}, shared by all production ranges
        self.constraints = {}
        self.logs = ''
        self.scalers = read_joblib(SCALERS_FILEPATH)

        # `costs.csv` is generated in `outputs` from `custo_real.json`, which is
        # generated in `Otimizacao` and in turn created from
        # `wip/files/custo_real.py`.
        path_to_objective_function = os.path.join(path_to_constraints, 'costs.csv')
        self.df_obj_function = read_csv(
            path_to_objective_function, sep=';', decimal=','
        )
        self.__costs = read_json(path_to_costs, encoding="utf-8")

        constraint_files_pattern = os.path.join(
            path_to_constraints, 'restricoes-faixa-*-*.txt'
        )
        for file_path in dbutils_glob(constraint_files_pattern):
            self.filepath = file_path
            key = self._extract_key_name(file_path)
            self._parser_restriction_files(file_path, key)
            self._parse_objective_file(key)

        logger.info('Finished "%s" initialization', self.__class__.__name__)

    def _parser_restriction_files(self, file_path, key):
        """Parse one constraints file into the `pulp.LpProblem` of `key`.

        Every line of the file is split on ``';'``. The first field is the
        constraint name; the remaining fields are either a
        ``feature;coefficient`` pair, a single constant, or an
        ``<operator> <value>`` comparison that closes the constraint.
        A constraint is added to the problem when the constraint name changes
        from one line to the next.

        Parameters
        ----------
        file_path : str
            Path of the constraints file to read.
        key : str
            Production range used as key of `probs` and of the variables.
        """
        sentences = read_text(file_path, mode='r', encoding='utf-8').splitlines()
        self.probs[key] = pulp.LpProblem(key, pulp.LpMinimize)
        self.__variables[key] = {}

        # Only the name of the first line is used; its terms are not parsed.
        constraint_old_name = sentences[0].split(';')[0]
        constraint = pulp.LpConstraint()
        # Pending pieces of a min/max constraint: the bounded feature and the
        # coefficient of its `FUNC` variable. Both start empty, otherwise the
        # first `FUNC` term would read an unassigned `feature_to_set_limit`.
        feature_to_set_limit = None
        bound_value = None

        # NOTE(review): constraints are only flushed when the name changes, so
        # the last constraint of the file is never added to the problem unless
        # the file ends with a sentinel line - confirm against the file format.
        for sentence in sentences[1:]:
            # terms = one line of the range-constraints file
            terms = sentence.split(';')
            logger.debug('\r\nterms: %s', terms)
            constraint_name = terms.pop(0)  # model name, e.g. abrasion

            if constraint_name != constraint_old_name:
                # A new constraint started: name and store the finished one.
                if constraint_old_name in self.probs[key].constraints:
                    finished_name = f'{constraint_old_name}_v2'
                else:
                    finished_name = constraint_old_name
                constraint.setName(self._sub_specific_characters(finished_name))

                stored_name = constraint.getName()
                is_limit = 'limit_' in stored_name
                if stored_name not in self.probs[key].constraints and not is_limit:
                    self.probs[key] += constraint
                    self.constraints[stored_name] = constraint
                elif not is_limit:
                    logger.warning(
                        'Constraint "%s" already in lp problem, skipping it.',
                        stored_name,
                    )
                constraint = pulp.LpConstraint()
                constraint_old_name = constraint_name

            if 2 <= len(terms) <= 3:
                feature, coef = terms[0].lstrip(), terms[1]
                if coef == " 0,0":
                    logger.warning(
                        "%s: feature '%s' won't be included due to coefficient equal to 0",
                        constraint_name,
                        feature,
                    )
                constraint = self._mount_multiply(feature, coef, constraint, key)

                # `constraint_old_name` equals `constraint_name` at this point,
                # so a single '_limit_' check is enough.
                if '_limit_' not in constraint_name and (
                    'min' in constraint_name or 'max' in constraint_name
                ):
                    if 'FUNC' not in feature:
                        feature_to_set_limit = feature
                    else:
                        bound_value = float(coef.replace(',', '.'))
                    # NOTE(review): truthiness test, a bound of 0 is never saved.
                    if feature_to_set_limit and bound_value:
                        self._save_variable_limits(
                            bound_value, constraint_name, key, feature_to_set_limit
                        )
                        feature_to_set_limit = None
                        bound_value = None
            elif len(terms) == 1:
                coef = terms[0].split()
                if len(coef) == 1:
                    constraint, value = self._mount_add(coef, constraint)
                    if '_limit_' in constraint_name:
                        # `feature` is the last feature parsed on a previous line.
                        self._save_variable_limits(
                            value, constraint_name, key, feature
                        )
                else:
                    constraint, value = self._mount_comparison(coef, constraint)

    def _parse_objective_file(self, key):
        """Build the objective function of the problem of `key`.

        Sums ``variable * cost`` for every tag of `df_obj_function` that is a
        known variable of the production range. Variables whose name is not in
        ``constants.TARGETS_IN_MODEL`` values go through `denormalize_lpvar`
        before being multiplied by the cost.

        Parameters
        ----------
        key : str
            Production range whose objective is being defined.
        """
        target_names = constants.TARGETS_IN_MODEL.values()
        # Start from an empty expression instead of the int `0`, so the
        # objective is still an LP expression when no tag matches a variable.
        obj = pulp.LpAffineExpression()
        for name, cost in zip(
            self.df_obj_function['TAG'], self.df_obj_function['Custo']
        ):
            variable_entry = self.__variables[key].get(name)
            if variable_entry is None:
                continue
            lpv = variable_entry['obj']
            if name not in target_names:
                lpv = denormalize_lpvar(name, lpv, self.scalers)
            obj += lpv * cost
        self.probs[key].setObjective(obj)

    def _add_variable(self, feature, key):
        """Create the `pulp.LpVariable` of `feature` for production range `key`.

        Features starting with ``'FUNC'`` or ``'status'`` become binary
        variables with bounds 0 and 1; features starting with ``'SOMA FUNC'``,
        ``'qtde'`` or ``'SOMA_FUNC'`` become integer variables; everything else
        becomes a continuous variable.

        Parameters
        ----------
        feature : str
            Name of the feature as written in the constraints file.
        key : str
            Production range the variable belongs to.
        """
        entry = {}
        self.__variables[key][feature] = entry
        simple_feature_name = self._sub_specific_characters(feature)

        if feature.startswith(('FUNC', 'status')):
            entry['obj'] = pulp.LpVariable(
                simple_feature_name, cat="Binary", lowBound=0, upBound=1
            )
            entry['min'] = 0
            entry['max'] = 1
        elif feature.startswith(('SOMA FUNC', 'qtde', 'SOMA_FUNC')):
            entry['obj'] = pulp.LpVariable(simple_feature_name, cat="Integer")
        else:
            entry['obj'] = pulp.LpVariable(simple_feature_name)

    def _mount_multiply(self, feature, coef, constraint, key):
        """Add the term ``feature * coef`` to `constraint`.

        The variable of `feature` is created on first use.

        Parameters
        ----------
        feature : str
            Name of the feature.
        coef : str
            Coefficient, written with a decimal comma.
        constraint : pulp.LpConstraint
            Constraint being assembled.
        key : str
            Production range the constraint belongs to.

        Returns
        -------
        pulp.LpConstraint
            The constraint with the new term added.

        Raises
        ------
        ValueError
            If `coef` cannot be converted to ``float``.
        """
        try:
            coef = float(coef.replace(',', '.'))
        except (AttributeError, TypeError, ValueError) as err:
            raise ValueError(
                f'Erro de conversão de tipo valor: {coef} - Feature: {feature} '
                f'- Nome da restrição: {constraint}'
            ) from err

        if feature not in self.__variables[key]:
            self._add_variable(feature, key)
        constraint += self.__variables[key][feature]['obj'] * coef
        return constraint

    def _mount_add(self, coef, constraint):
        """Add the constant ``coef[0]`` (decimal comma) to `constraint`.

        Returns
        -------
        tuple
            The updated constraint and the constant as ``float``.
        """
        value = float(coef[0].replace(',', '.'))
        constraint += value
        return constraint, value

    def _mount_comparison(self, coef, constraint, errors: str = 'ignore'):
        """Close `constraint` with the comparison described by `coef`.

        Parameters
        ----------
        coef : list of str
            ``[operator, value]``. Operators containing ``'GT'`` produce
            ``>=``, operators containing ``'LT'`` produce ``<=`` and ``'E'`` or
            ``'EQ'`` produce ``==``.
        constraint : pulp.LpConstraint
            Constraint being assembled.
        errors : str {'ignore', 'raise'}, default='ignore'
            What to do with an unknown operator: leave the constraint
            unchanged (``'ignore'``) or raise (``'raise'``).

        Returns
        -------
        tuple
            The updated constraint and the right-hand side as ``float``.

        Raises
        ------
        ValueError
            If the operator is unknown and `errors` is ``'raise'``.
        """
        logical_operator = coef[0]
        value = float(coef[1].replace(',', '.'))
        if 'GT' in logical_operator:
            constraint = constraint >= value
        elif 'LT' in logical_operator:
            constraint = constraint <= value
        elif logical_operator in ['E', 'EQ']:
            constraint = constraint == value
        elif errors == 'raise':
            raise ValueError(f'Invalid Logical operator: "{logical_operator}"')
        return constraint, value

    def _save_variable_limits(self, value, constraint_name, key, feature):
        """Store ``-1 * value`` as the lower or upper bound of `feature`.

        The lower bound is set when `constraint_name` ends with ``'min'``,
        the upper bound otherwise. The same value is recorded under the
        ``'min'``/``'max'`` key of the variable entry.

        Parameters
        ----------
        value : float
            Value read from the constraints file; its sign is inverted.
        constraint_name : str
            Name of the constraint the value came from.
        key : str
            Production range the variable belongs to.
        feature : str
            Name of the variable to bound.
        """
        entry = self.__variables[key][feature]
        bound = -1 * value
        if constraint_name.endswith('min'):
            entry['obj'].lowBound = bound
            entry['min'] = bound
        else:
            entry['obj'].upBound = bound
            entry['max'] = bound

    def _check_variable_bounds(self, key: str, feature: str):
        """Check if a variable has min and max bounds, if not, set them to 0 and 1.

        Parameters
        ----------
        key : str
            Production range. Possible values are:
            '750-800', '800-850', '850-900', '900-950', '950-1000'
        feature : str
            Name of the variable to check bounds.
        """
        entry = self.__variables[key][feature]
        if 'min' not in entry:
            entry['min'] = 0
            entry['obj'].lowBound = 0
        if 'max' not in entry:
            entry['max'] = 1
            entry['obj'].upBound = 1

    def solve_range(
        self,
        ranges=None,
        tmp_path='/dbfs/tmp/us_not_defined',
        scalers=None,
        datasets=None,
        df_sql=None,
        save_relaxed_prob: bool = False,
        **kwargs,
    ):
        """
        Solve the optimization problems for each production range.

        This method iterates over every production range, adds the additional
        constraints that still have to be defined (it requires the variables
        and optimization problems to be already defined), and solves each
        problem using the configured solver. After all ranges were processed,
        every problem is written to an `.lp` file.

        Parameters
        ----------
        ranges : list, optional
            A list of production ranges to solve. If not specified,
            all ranges are solved. Acceptable values are:

            - `'700-750'`
            - `'750-800'`
            - `'800-850'`
            - `'850-900'`
            - `'900-950'`
            - `'950-1000'`

        tmp_path : str, default='/dbfs/tmp/us_not_defined'
            File path where the `.lp` files should be saved to.
        scalers : Dict[str, sklearn.preprocessing.MinMaxScaler], optional
            A dictionary containing the scalers for each variable.
        datasets : Dict[str, pd.DataFrame], optional
            A dictionary containing the datasets for each ridge regression model
            used to create the optimization model.
        df_sql : pd.DataFrame
            Must be a pandas DataFrame. It is only validated here; no other
            use is visible in this method.
        save_relaxed_prob : bool, default=False
            If one of the optimization problem instances is found to be infeasible,
            find the constraints that are causing the infeasibility and save the
            relaxed problem instead of the original problem.
        kwargs : Any
            Additional keyword arguments. The keys `use_floticor`
            (default ``False``), `multi_objective` (default `MULTI_OBJECTIVE`)
            and `extra_constraints` (default ``{}``, a mapping
            ``{variable name: {'lmin': .., 'lmax': ..}}``) are consumed here;
            the remaining ones are forwarded to `pulp.LpProblem.solve` when
            the single-objective path is used.

        Raises
        ------
        ValueError
            If `df_sql` is not a pandas DataFrame.
        """
        if not isinstance(df_sql, pd.DataFrame):
            raise ValueError("Parameter 'df_sql' must be a pandas DataFrame")

        use_floticor = kwargs.pop("use_floticor", False)
        multi_objective = kwargs.pop("multi_objective", MULTI_OBJECTIVE)
        range_list = sorted(ranges or self.probs.keys())
        extra_constraints = kwargs.pop("extra_constraints", {})

        for k in range_list:
            prod_range, _ = k.split("-")
            prod_range = int(prod_range)

            for variable in self.__variables[k]:
                self._check_variable_bounds(k, variable)

            try:
                prob = self.probs[k]

                # User supplied bounds. Adding them never creates variables,
                # so the name lookup is built once per range.
                known_lpvars = prob.variablesDict()
                for constraint_name, constraint_value in extra_constraints.items():
                    if constraint_name not in known_lpvars:
                        continue
                    xvar = known_lpvars[constraint_name]
                    lmin = constraint_value.get('lmin', None)
                    lmax = constraint_value.get('lmax', None)
                    # NOTE(review): truthiness test, a bound equal to 0 is
                    # ignored - confirm this is what callers expect.
                    if lmin:
                        self.probs[k] += xvar >= lmin, f'extra_{constraint_name}_min'
                        xvar.lowBound = lmin
                    if lmax:
                        self.probs[k] += xvar <= lmax, f'extra_{constraint_name}_max'
                        xvar.upBound = lmax

                # Define constraints from `wip.files.lp_denorm_constraints.py`
                self.probs[k] = create_denorm_constraints(self.probs[k], scalers)

                # Define `floticor` and `bentonita` relationship.
                # Rules:
                # - If 'SE PP' is greater than 1500, then `floticor` must be
                #   greater than 0.
                # - Otherwise, `bentonita` must be greater than 0 and
                #   `floticor` must be 0.
                lpvars = self.probs[k].variablesDict()
                if use_floticor:
                    floticor_lpvar = denormalize_lpvar(
                        'floticor', lpvars['floticor'], scalers
                    )
                    self.probs[k] += lpvars['SE_PP'] >= 1500, "MIN_SE_PP_FLOTICOR"
                    self.probs[k] += floticor_lpvar >= 3, "MIN_FLOTICOR_VALUE"
                    self.probs[k] += floticor_lpvar <= 4.99, "MAX_FLOTICOR_VALUE"
                    self.probs[k] += lpvars['bentonita'] == 0, "MIN_MAX_BENTONITA"
                else:
                    bentonita_lpvar = lpvars['bentonita']
                    bentonita_lpvar.lowBound = 5
                    bentonita_lpvar.upBound = 5.50
                    self.probs[k] += bentonita_lpvar >= 5, "MIN_BENTONITA_VALUE"
                    self.probs[k] += bentonita_lpvar <= 5.50, "MAX_BENTONITA_VALUE"
                    self.probs[k] += lpvars['floticor'] == 0, "MIN_MAX_FLOTICOR"

                # Create restriction that binds `cfix` and `antracito`
                # relationship using formulation.
                self.probs[k] = add_relationship_cfix_antracito(
                    lpvars['cfix'], lpvars['antracito'], self.probs[k], scalers
                )

                # NOTE: `fix_grupos_de_queima_limits` is intentionally not
                # applied here (it was disabled in the original code).
                self.probs[k] = constraint_taxa_alimentacao_disco(self.probs[k])
                self.probs[k] = fan_consumption_constraint(self.probs[k])
                self.probs[k] = link_rota_func_filtros(
                    self.probs[k], min_filters_active=0
                )
                self.probs[k] = link_nro_filtros_func_lpvars(self.probs[k])
                self.probs[k] = filters_vacuum_bombs_relationship(self.probs[k])

                add_energia_pensa_quantile_constraint(
                    self.probs[k], datasets, prod_range
                )
                fix_vent_control_tags_bounds(self.probs[k], datasets, k)
                self.probs[k] = link_queima_vars(self.probs[k], datasets, k)

                solver = self._get_solver(self.solver_option)
                if multi_objective:
                    primary_objective = self.probs[k].objective
                    # Looked up again: the helpers above may have added variables.
                    current_lpvars = self.probs[k].variablesDict()
                    secondary_objective = pulp.lpSum(
                        current_lpvars[f"TEMP1_I@08QU_QU_855I_GQ{group:02d}"]
                        for group in range(9, 17)
                    )
                    status = self.probs[k].sequentialSolve(
                        [primary_objective, secondary_objective], solver=solver
                    )
                    # One status per objective: keep the worst one.
                    status = min(status)
                else:
                    status = self.probs[k].solve(solver, **kwargs)

                if status == pulp.LpStatusInfeasible and save_relaxed_prob:
                    relaxed_prob, iis = find_iis(self.probs[k], solver=solver)  # noqa
                    logger.error(
                        "Model %s is infeasible. Infeasible constraints names: %s",
                        k,
                        ", ".join(iis),
                    )
                    self.probs[k] = relaxed_prob
            except Exception as err:  # pylint: disable=broad-except
                # Best effort: a failure in one range must not stop the others.
                logger.exception(err)
                logger.error(
                    "Failed to optimize range: %s | Status: %s",
                    k,
                    pulp.constants.LpStatus[self.probs[k].status],
                )
                status = self.probs[k].status

            if status == pulp.LpStatusOptimal:
                logger.info('%s: %s', k, pulp.constants.LpStatus[status])
                self._create_solver_result(k)
            else:
                logger.critical('%s: %s', k, pulp.constants.LpStatus[status])

        self.create_lp_file(tmp_path)

    def create_lp_file(self, tmp_path: str):
        """Write the optimization problem instances to `.lp` files.

        A `TypeError` raised while writing one problem is logged and the
        remaining problems are still written.

        Parameters
        ----------
        tmp_path : str
            The path where the optimization problem instances should be saved to.
        """
        for name, prob in self.probs.items():
            try:
                write_lp(prob, name, tmp_path)
            except TypeError as exc:
                logger.exception(exc)
                logger.error(
                    "Failed to save .lp file for production range: %s at %s",
                    name,
                    tmp_path,
                )

    def _get_solver(self, solver_option):
        """Return the solver registered under `solver_option`.

        Unknown names fall back to `pulp.COIN()`; the fallback is only
        instantiated when needed and the situation is logged.
        """
        solver = self.solvers.get(solver_option)
        if solver is None:
            logger.warning(
                'Unknown solver option "%s", falling back to COIN.', solver_option
            )
            solver = pulp.COIN()
        return solver

    def _create_solver_result(self, key):
        """Collect the solution of production range `key` into `df_results`.

        For every problem variable that maps back to a parsed feature (names
        starting with ``'abs_var'`` are skipped) the name, bounds, objective
        cost and solved value are stored. The resulting table is sorted by
        objective cost, in descending order. The raw dictionary is also kept
        in `dict_results_`.

        Parameters
        ----------
        key : str
            Production range whose results are collected.
        """
        range_variables = self.__variables[key]
        # LP variable name -> original feature name
        inv_variables = {
            str(entry["obj"]): feature
            for feature, entry in range_variables.items()
            if len(feature) > 0
        }
        # First cost found for each tag, built once instead of scanning the
        # costs table for every variable.
        cost_by_tag = {}
        for tag, cost in zip(
            self.df_obj_function["TAG"].to_numpy(),
            self.df_obj_function["Custo"].to_numpy(),
        ):
            cost_by_tag.setdefault(tag, cost)

        rows = {}
        for variable in self.probs[key].variables():
            if variable.name.startswith("abs_var"):
                continue
            variable_name = inv_variables.get(str(variable))
            if variable_name is None:
                continue
            entry = range_variables.get(variable_name)
            if entry is None:
                continue
            rows[variable_name] = {
                " VariableName": variable_name,
                " LB": float(entry.get("min", 0)),
                " UB": float(entry.get("max", 0)),
                " ObjCoeff": cost_by_tag.get(variable_name, 0),
                " Value": variable.varValue,
            }

        self.dict_results_ = {key: rows}
        results = pd.DataFrame.from_dict(rows).T
        # An empty table has no " ObjCoeff" column to sort by.
        if not results.empty:
            results = results.sort_values(by=[" ObjCoeff"], ascending=[False])
        self.df_results[key] = results

    def _extract_key_name(self, file_path):
        """Extract key name from a file path

        The first ``<digits>-<digits>`` sequence of the file name is used; the
        full path is only searched when the file name has none, so digits in
        directory names do not shadow the production range of the file.

        Parameters
        ----------
        file_path : str
            File path to extract key name from

        Returns
        -------
        str
            Key name extracted from the file path

        Raises
        ------
        ValueError
            If the path contains no ``<digits>-<digits>`` sequence.

        Examples
        --------
        >>> probs._extract_key_name('C:/Users/username/Desktop/2020-01-01_2020-01-31_restricoes_01.csv')
        '2020-01'
        >>> probs._extract_key_name('C:/Users/username/Desktop_1-10')
        '1-10'
        """
        path_text = str(file_path)
        # Split on both separators so Windows style paths work everywhere.
        file_name = re.split(r"[\\/]", path_text)[-1]
        pattern = re.compile(r"\d+-\d+")
        search = pattern.search(file_name) or pattern.search(path_text)
        if search is None:
            raise ValueError(
                f'No production range found in file path: "{path_text}"'
            )
        return search.group()

    def _remove_non_ascii_normalized(self, string: str) -> str:
        """Remove non-ascii characters from string

        The string is decomposed (NFD) first, so accented letters keep their
        base letter and only lose the accent.
        """
        return (
            unicodedata.normalize("NFD", string)
            .encode("ascii", "ignore")
            .decode("utf8")
        )

    def _sub_specific_characters(self, sentence):
        """
        Replace specific characters from sentence

        Method replaces "*", "=", "/" by "mult", "equal", "div" respectively,
        then removes the remaining non-ascii characters.

        Parameters
        ----------
        sentence : str
            Sentence to be fixed

        Returns
        -------
        str
            Fixed sentence, with math operations replaced by their literal
            abbreviations
        """
        fixed_sentence = (
            sentence.replace('*', 'mult').replace('=', 'equal').replace('/', 'div')
        )
        return self._remove_non_ascii_normalized(fixed_sentence)

    def get_probs(self) -> Dict[str, pulp.LpProblem]:
        """Retrieve the optimization problem instances.

        Returns
        -------
        Dict[str, pulp.LpProblem]
            A dictionary containing the optimization problem instances.
        """
        return self.probs

    def export_results(self):
        """
        Export the optimization results to a CSV file.

        Method removes any previously generated results, prior to saving the
        optimization results.
        The results to each production range optimization problem are saved to the
        :attr:`PulpSolver.path_to_constraints` folder using the following name pattern:
        `'Variables - VarX_<PRODUCTION_RANGE>.csv'`, where `'<PRODUCTION_RANGE>'`
        is one of the following values:

        - `'700-750'`
        - `'750-800'`
        - `'800-850'`
        - `'850-900'`
        - `'900-950'`
        - `'950-1000'`

        """
        remove_files(self.path_to_constraints, "Variables - VarX_*.csv", True)
        for prod_range, result in self.df_results.items():
            export_file_name = f"Variables - VarX_{prod_range}.csv"
            export_file_path = os.path.join(self.path_to_constraints, export_file_name)
            to_csv(
                result, export_file_path, sep=";", index=False, encoding="ISO-8859-1"
            )