"""
Initialize the optimization problem instance and solve it.
This module initializes the optimization problem instance and solves it using
the `PulpSolver` class.
"""
from __future__ import annotations
import os
import re
import unicodedata
from pathlib import Path
from typing import Dict
import pandas as pd
import pulp
from wip.constants import MULTI_OBJECTIVE, SCALERS_FILEPATH
from wip.constants import constants
from wip.datatools.io_ops import read_csv
from wip.datatools.io_ops import read_joblib
from wip.datatools.io_ops import read_json
from wip.datatools.io_ops import read_text
from wip.datatools.io_ops import to_csv
from wip.datatools.io_ops import write_lp
from wip.files.lp_denorm_constraints import create_denorm_constraints
from wip.files.lp_denorm_constraints import denormalize_lpvar
from wip.files.lp_denorm_constraints import fan_consumption_constraint
from wip.files.tags_relationships import add_relationship_cfix_antracito
from wip.logging_config import logger
from wip.model_diagnostics import find_iis
from wip.modules.constraints import add_energia_pensa_quantile_constraint
from wip.modules.constraints import constraint_taxa_alimentacao_disco
from wip.modules.constraints import filters_vacuum_bombs_relationship
from wip.modules.constraints import fix_grupos_de_queima_limits
from wip.modules.constraints import link_nro_filtros_func_lpvars
from wip.modules.constraints import link_rota_func_filtros
from wip.temporary import fix_vent_control_tags_bounds, link_queima_vars
from wip.utils import dbutils_glob
from wip.utils import remove_files
class PulpSolver:
"""
Class that defines and solves the optimization problem.
This class supports multiple solvers, that can be used to find the optimal solution
to the problem.
The attribute `solver_option` contains a dictionary, with the following allowed
solvers:
* `'cbc'`
* `'gurobi'`
* `'glpk'`
* `'cplex'`
The default solver is `'cbc'`
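Examples
--------
A minimal usage sketch; the paths below are hypothetical placeholders for
the real constraint folder and cost file:
>>> solver = PulpSolver(
...     path_to_constraints="/dbfs/tmp/constraints",
...     path_to_costs="/dbfs/tmp/custo_real.json",
...     solver_option="cbc",
... )  # doctest: +SKIP
>>> probs = solver.get_probs()  # doctest: +SKIP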
"""
solvers = {
'cplex': pulp.CPLEX(),
'gurobi': pulp.GUROBI(),
'glpk': pulp.GLPK(),
'cbc': pulp.PULP_CBC_CMD(msg=False),
}
def __init__(
self,
path_to_constraints: str | Path,
path_to_costs: str | Path,
solver_option: str = "cbc",
):
"""Set the initial parameters for the optimization problem.
Parameters
----------
path_to_constraints : str | Path
The path to the folder containing the constraint files.
path_to_costs : str | Path
The path to the file containing the costs for each variable.
solver_option : str {'cbc', 'gurobi', 'glpk', 'cplex'}, default="cbc"
The solver to use to solve the optimization problem.
"""
self.__variables = {}
self.probs = {}  # One pulp.LpProblem per production range (later written to `.lp` files)
self.df_results = {}
self.solver_option = solver_option
self.path_to_constraints = path_to_constraints
self.path_to_costs = path_to_costs
self.constraints = {}
self.logs = ''
self.scalers = read_joblib(SCALERS_FILEPATH)
# The `costs.csv` file is generated in `outputs` from the `custo_real.json`
# file created in `Otimizacao`, which in turn is produced by
# `wip/files/custo_real.py`.
path_to_objective_function = os.path.join(path_to_constraints, 'costs.csv')
self.df_obj_function = read_csv(
path_to_objective_function, sep=';', decimal=','
)
self.__costs = read_json(path_to_costs, encoding="utf-8")
# with open(path_to_costs, "r", encoding="utf-8") as fp:
# self.__costs = json.load(fp)
for file_path in dbutils_glob(
os.path.join(path_to_constraints, 'restricoes-faixa-*-*.txt')
):
self.filepath = file_path
key = self._extract_key_name(file_path)
self._parser_restriction_files(file_path, key)
self._parse_objective_file(key)
logger.info('Finished "%s" initialization', self.__class__.__name__)
def _parser_restriction_files(self, file_path, key):
sentences = read_text(file_path, mode='r', encoding='utf-8').splitlines()
self.probs[key] = pulp.LpProblem(key, pulp.LpMinimize)
self.__variables[key] = {}
constraint_old_name = sentences[0].split(';')[0]
constraint = pulp.LpConstraint()
# Initialize both limit trackers so the check further below never hits an
# undefined name when the first bound line refers to a 'FUNC' feature.
feature_to_set_limit = None
bound_value = None
for i in range(1, len(sentences)):
# terms = one line of the "restricoes-faixa" files
terms = sentences[i].split(';')
logger.debug('\r\nterms: %s', terms)
constraint_name = terms.pop(0)  # model name, e.g. "abrasão"
if constraint_name != constraint_old_name:  # if it changed to a new constraint
if constraint_old_name in self.probs[key].constraints:
constraint.setName(
self._sub_specific_characters(f'{constraint_old_name}_v2')
)
else:
constraint.setName(
self._sub_specific_characters(constraint_old_name)
)
if (
constraint.getName() not in self.probs[key].constraints
and 'limit_' not in constraint.getName()
):
self.probs[key] += constraint
self.constraints[constraint.getName()] = constraint
elif 'limit_' not in constraint.getName():
logger.warning(
'Constraint "%s" already in lp problem, skipping it.',
constraint.getName(),
)
constraint = pulp.LpConstraint()
constraint_old_name = constraint_name
if 2 <= len(terms) <= 3:
feature, coef = terms[0].lstrip(), terms[1]
if coef == " 0,0":
logger.warning(
"%s: feature '%s' won't be included due to coefficient equal to 0",
constraint_name,
feature,
)
constraint = self._mount_multiply(feature, coef, constraint, key)
if (
'_limit_' not in constraint_name
and '_limit_' not in constraint_old_name
and ('min' in constraint_name or 'max' in constraint_name)
):
if 'FUNC' not in feature:
feature_to_set_limit = feature
else:
bound_value = float(coef.replace(',', '.'))
if feature_to_set_limit and bound_value:
self._save_variable_limits(
bound_value, constraint_name, key, feature_to_set_limit
)
feature_to_set_limit = None
bound_value = None
elif len(terms) == 1:
coef = terms[0].split()
if len(coef) == 1:
constraint, value = self._mount_add(coef, constraint)
if '_limit_' in constraint_name:
self._save_variable_limits(value, constraint_name, key, feature)
else:
constraint, value = self._mount_comparison(coef, constraint)
def _parse_objective_file(self, key):
target_names = constants.TARGETS_IN_MODEL.values()
obj = 0
for name, cost in zip(
self.df_obj_function['TAG'], self.df_obj_function['Custo']
):
lpv = self.__variables[key].get(name)
if lpv is None:
continue
lpv = lpv['obj']
if name not in target_names:
lpv = denormalize_lpvar(name, lpv, self.scalers)
obj += lpv * cost
self.probs[key].setObjective(obj)
def _add_variable(self, feature, key):
self.__variables[key][feature] = {}
simple_feature_name = self._sub_specific_characters(feature)
if feature.startswith('FUNC') or feature.startswith('status'):
self.__variables[key][feature]['obj'] = pulp.LpVariable(
simple_feature_name, cat="Binary", lowBound=0, upBound=1
)
self.__variables[key][feature]['min'] = 0
self.__variables[key][feature]['max'] = 1
elif any(
feature.startswith(substr) for substr in ['SOMA FUNC', 'qtde', 'SOMA_FUNC']
):
self.__variables[key][feature]['obj'] = pulp.LpVariable(
simple_feature_name, cat="Integer"
)
else:
self.__variables[key][feature]['obj'] = pulp.LpVariable(simple_feature_name)
def _mount_multiply(self, feature, coef, constraint, key):
try:
coef = float(coef.replace(',', '.'))
except Exception as err:
raise ValueError(
f'Value type conversion error: {coef} - Feature: {feature} '
f'- Constraint name: {constraint}'
) from err
if feature not in self.__variables[key]:
self._add_variable(feature, key)
constraint += self.__variables[key][feature]['obj'] * coef
return constraint
def _mount_add(self, coef, constraint):
value = float(coef[0].replace(',', '.'))
constraint += value
return constraint, value
def _mount_comparison(self, coef, constraint, errors: str = 'ignore'):
logical_operator = coef[0]
value = float(coef[1].replace(',', '.'))
if 'GT' in logical_operator:
constraint = constraint >= value
elif 'LT' in logical_operator:
constraint = constraint <= value
elif logical_operator in ['E', 'EQ']:
constraint = constraint == value
elif errors == 'raise':
raise ValueError(f'Invalid Logical operator: "{logical_operator}"')
return constraint, value
def _save_variable_limits(self, value, constraint_name, key, feature):
limit_key = 'min' if constraint_name.endswith('min') else 'max'
if limit_key == 'min':
self.__variables[key][feature]['obj'].lowBound = -1 * value
else:
self.__variables[key][feature]['obj'].upBound = -1 * value
self.__variables[key][feature][limit_key] = -1 * value
def _check_variable_bounds(self, key: str, feature: str):
"""Check if a variable has min and max bounds, if not, set them to 0 and 1.
Parameters
----------
key : str
Production range.
Possible values are: '750-800', '800-850', '850-900', '900-950', '950-1000'
feature : str
Name of the variable whose bounds should be checked.
"""
if 'min' not in self.__variables[key][feature]:
self.__variables[key][feature]['min'] = 0
self.__variables[key][feature]['obj'].lowBound = 0
if 'max' not in self.__variables[key][feature]:
self.__variables[key][feature]['max'] = 1
self.__variables[key][feature]['obj'].upBound = 1
def solve_range(
self,
ranges=None,
tmp_path='/dbfs/tmp/us_not_defined',
scalers=None,
datasets=None,
df_sql=None,
save_relaxed_prob: bool = False,
**kwargs,
):
"""
Solve the optimization problems for each production range.
This method iterates over every production range, adds the additional
constraints that still need to be defined, and solves each problem using
the specified solver. It requires the variables and optimization problems
to be already defined.
Parameters
----------
ranges : list, optional
A list of production ranges to solve. If not specified, all ranges
are solved. Acceptable values are:
- `'700-750'`
- `'750-800'`
- `'800-850'`
- `'850-900'`
- `'900-950'`
- `'950-1000'`
tmp_path : str, default='/dbfs/tmp/us_not_defined'
Directory where the optimization problem `.lp` files should be saved.
scalers : Dict[str, sklearn.preprocessing.MinMaxScaler], optional
A dictionary containing the scalers for each variable.
datasets : Dict[str, pd.DataFrame], optional
A dictionary containing the datasets for each ridge regression model
used to create the optimization model.
df_sql : pd.DataFrame
DataFrame with the SQL data consumed by some of the optional constraint
builders. Must be a pandas DataFrame.
save_relaxed_prob : bool, default=False
If one of the optimization problem instances is found to be infeasible,
find the constraints that are causing the infeasibility and save the
relaxed problem instead of the original problem.
kwargs : Any
Additional keyword arguments. The keys 'use_floticor', 'multi_objective'
and 'extra_constraints' are consumed here; any remaining keyword
arguments are forwarded to the solver's `solve` call.
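Examples
--------
A hedged sketch of a typical call; `scalers`, `datasets` and `df_sql` stand
for objects built elsewhere in the pipeline, and the extra bounds on
`bentonita` are purely illustrative:
>>> extra = {"bentonita": {"lmin": 5.0, "lmax": 5.5}}
>>> solver.solve_range(
...     ranges=["850-900"],
...     tmp_path="/dbfs/tmp/us_not_defined",
...     scalers=scalers,
...     datasets=datasets,
...     df_sql=df_sql,
...     extra_constraints=extra,
... )  # doctest: +SKIP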
"""
if not isinstance(df_sql, pd.DataFrame):
raise ValueError("Parameter 'df_sql' must be a pandas DataFrame")
use_floticor = kwargs.pop("use_floticor", False)
multi_objective = kwargs.pop("multi_objective", MULTI_OBJECTIVE)
range_list = sorted(ranges or self.probs.keys())
extra_constraints = kwargs.pop("extra_constraints", {})
# cfix_value = None
for idx, k in enumerate(range_list):
prod_range, _ = k.split("-")
prod_range = int(prod_range)
for variable in self.__variables[k].keys():
self._check_variable_bounds(k, variable)
try:
prob = self.probs[k]
for constraint_name, constraint_value in extra_constraints.items():
if constraint_name not in prob.variablesDict().keys():
continue
xvar = prob.variablesDict()[constraint_name]
lmin = constraint_value.get('lmin', None)
lmax = constraint_value.get('lmax', None)
if lmin:
self.probs[k] += xvar >= lmin, f'extra_{constraint_name}_min'
xvar.lowBound = lmin
if lmax:
self.probs[k] += xvar <= lmax, f'extra_{constraint_name}_max'
xvar.upBound = lmax
self.probs[k].variablesDict()[constraint_name] = xvar
# Define constraints from `wip.files.lp_denorm_constraints.py`
self.probs[k] = create_denorm_constraints(self.probs[k], scalers)
# Define `floticor` and `bentonita` relationship.
# Rules:
# - If 'SE PP' is greater than 1500, then `floticor` must be greater than 0.
# - Otherwise, `bentonita` must be greater than 0 and `floticor` must be 0.
lpvars = self.probs[k].variablesDict()
if use_floticor:
floticor_lpvar = denormalize_lpvar(
'floticor', lpvars['floticor'], scalers
)
self.probs[k] += lpvars['SE_PP'] >= 1500, "MIN_SE_PP_FLOTICOR"
self.probs[k] += floticor_lpvar >= 3, "MIN_FLOTICOR_VALUE"
self.probs[k] += floticor_lpvar <= 4.99, "MAX_FLOTICOR_VALUE"
self.probs[k] += lpvars['bentonita'] == 0, "MIN_MAX_BENTONITA"
else:
bentonita_lpvar = lpvars['bentonita']
bentonita_lpvar.lowBound = 5
bentonita_lpvar.upBound = 5.50
self.probs[k] += bentonita_lpvar >= 5, "MIN_BENTONITA_VALUE"
self.probs[k] += bentonita_lpvar <= 5.50, "MAX_BENTONITA_VALUE"
self.probs[k] += lpvars['floticor'] == 0, "MIN_MAX_FLOTICOR"
# Create restriction that binds `cfix` and `antracito`
# relationship using formulation.
self.probs[k] = add_relationship_cfix_antracito(
lpvars['cfix'], lpvars['antracito'], self.probs[k], scalers
)
# Limit variables `f"PESO1_I@08PE-BW-840I-{idx:02d}M1"`,
# where 1 <= idx <= 12, to be either 0 when
# `f"FUNC1_D@08PE-BD-840I-{idx:02d}M1"` is 0 or a value
# between 90 and 140 when `f"FUNC1_D@08PE-BD-840I-{idx:02d}M1"`
# equals 1.
# self.probs[k] = fix_grupos_de_queima_limits(self.probs[k], scalers)
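# A minimal sketch (not active here) of how this on/off link could be
# expressed in PuLP, assuming `peso` and `func` are the LpVariables for one
# disc and 90/140 are the operating limits:
#   self.probs[k] += peso <= 140 * func  # PESO forced to 0 when FUNC == 0
#   self.probs[k] += peso >= 90 * func   # PESO at least 90 when FUNC == 1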
self.probs[k] = constraint_taxa_alimentacao_disco(self.probs[k])
self.probs[k] = fan_consumption_constraint(self.probs[k])
self.probs[k] = link_rota_func_filtros(self.probs[k], min_filters_active=0)
self.probs[k] = link_nro_filtros_func_lpvars(self.probs[k])
self.probs[k] = filters_vacuum_bombs_relationship(self.probs[k])
# self = energy_cons_vents_faixas(self, k, df_sql)
# self = temp_production_ranges_ascending(self, k)
# Set the current problem's `cfix` value equal to the previous value.
# if cfix_value is not None:
# self.probs[k] += lpvars['cfix'] == cfix_value, "cfix_constant_value"
add_energia_pensa_quantile_constraint(self.probs[k], datasets, prod_range)
fix_vent_control_tags_bounds(self.probs[k], datasets, k)
self.probs[k] = link_queima_vars(self.probs[k], datasets, k)
solver = self._get_solver(self.solver_option)
if multi_objective:
primary_objective = self.probs[k].objective
secondary_objective = pulp.lpSum(
self.probs[k].variablesDict()[f"TEMP1_I@08QU_QU_855I_GQ{idx:02d}"]
for idx in range(9, 17)
)
status = self.probs[k].sequentialSolve(
[primary_objective, secondary_objective], solver=solver
)
status = min(status)
else:
status = self.probs[k].solve(solver, **kwargs)
# if status == pulp.LpStatusOptimal and cfix_value is None:
# cfix_value = self.probs[k].variablesDict()["cfix"].value()
if status == pulp.LpStatusInfeasible and save_relaxed_prob:
relaxed_prob, iis = find_iis(self.probs[k], solver=solver) # noqa
logger.error("Model %s is infeasible. Infeasible constraints names: %s",
k, ", ".join(iis))
self.probs[k] = relaxed_prob
except Exception as err: # pylint: disable=broad-except
logger.exception(err)
logger.error("Failed to optimize range: %s | Status: %s",
k, pulp.constants.LpStatus[self.probs[k].status])
status = self.probs[k].status
if status == 1:
logger.info('%s: %s', k, pulp.constants.LpStatus[status])
self._create_solver_result(k)
else:
logger.critical('%s: %s', k, pulp.constants.LpStatus[status])
self.create_lp_file(tmp_path)
def create_lp_file(self, tmp_path: str):
"""Write the optimization problem instances to `.lp` files.
Parameters
----------
tmp_path : str
The path where the optimization problem instances should be saved to.
"""
for name, prob in self.probs.items():
try:
write_lp(prob, name, tmp_path)
except TypeError as exc:
logger.exception(exc)
logger.error(
"Failed to save .lp file for production range: %s atr %s",
name,
tmp_path,
)
def _get_solver(self, solver_option):
return self.solvers.get(solver_option, pulp.COIN())
def _create_solver_result(self, key):
dict_results = {key: {}}
prob = self.probs[key]
inv_variables = {
str(v["obj"]): k for k, v in self.__variables[key].items() if len(k) > 0
}
for variable in prob.variables():
if variable.name.startswith("abs_var"):
continue
variable_name = inv_variables.get(str(variable), None)
if (
variable_name is None
or self.__variables.get(key, {}).get(inv_variables[str(variable)], None)
is None
):
continue
dict_results[key][variable_name] = {}
variable_min = (
self.__variables[key][inv_variables[str(variable)]]["min"]
if "min" in self.__variables[key][variable_name]
else 0
)
variable_max = (
self.__variables[key][inv_variables[str(variable)]]["max"]
if "max" in self.__variables[key][variable_name]
else 0
)
obj_coef = (
self.df_obj_function.loc[
self.df_obj_function["TAG"] == variable_name, "Custo"
].values[0]
if variable_name in self.df_obj_function["TAG"].tolist()
else 0
)
dict_results[key][variable_name][" VariableName"] = variable_name
dict_results[key][variable_name][" LB"] = float(variable_min)
dict_results[key][variable_name][" UB"] = float(variable_max)
dict_results[key][variable_name][" ObjCoeff"] = obj_coef
dict_results[key][variable_name][" Value"] = variable.varValue
self.dict_results_ = dict_results
self.df_results[key] = pd.DataFrame.from_dict(dict_results[key]).T
self.df_results[key].sort_values(
by=[" ObjCoeff"], ascending=[False], inplace=True
)
def _remove_non_ascii_normalized(self, string: str) -> str:
"""Remove non-ascii characters from string"""
return (
unicodedata.normalize("NFD", string)
.encode("ascii", "ignore")
.decode("utf8")
)
def _sub_specific_characters(self, sentence):
"""
Replace specific characters in a sentence.
This method replaces "*", "=", and "/" with "mult", "equal", and "div",
respectively, and strips non-ASCII characters from the result.
Parameters
----------
sentence : str
Sentence to be fixed.
Returns
-------
str
Fixed sentence, with math operators replaced by their literal
abbreviations.
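Examples
--------
Small illustrations of the substitution (`solver` is a hypothetical
`PulpSolver` instance):
>>> solver._sub_specific_characters('A*B=C/D')
'AmultBequalCdivD'
>>> solver._sub_specific_characters('abrasão')
'abrasao'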
"""
fixed_sentence = (
sentence.replace('*', 'mult').replace('=', 'equal').replace('/', 'div')
)
fixed_sentence = self._remove_non_ascii_normalized(fixed_sentence)
return fixed_sentence
def get_probs(self) -> Dict[str, pulp.LpProblem]:
"""Retrieve the optimization problem instances.
Returns
-------
Dict[str, pulp.LpProblem]
A dictionary containing the optimization problem instances.
"""
return self.probs
def export_results(self):
"""
Export the optimization results to a CSV file.
The method removes any previously generated result files before saving the
new optimization results. The results of each production range optimization
problem are saved to the :attr:`PulpSolver.path_to_constraints` folder
using the following name pattern: `'Variables - VarX_<PRODUCTION_RANGE>.csv'`,
where `'<PRODUCTION_RANGE>'` is one of the following values:
- `'700-750'`
- `'750-800'`
- `'800-850'`
- `'850-900'`
- `'900-950'`
- `'950-1000'`
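Examples
--------
A minimal sketch; assumes :meth:`solve_range` has already populated
`df_results`:
>>> solver.export_results()  # doctest: +SKIP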
"""
remove_files(self.path_to_constraints, "Variables - VarX_*.csv", True)
for prod_range, result in self.df_results.items():
export_file_name = f"Variables - VarX_{prod_range}.csv"
export_file_path = os.path.join(self.path_to_constraints, export_file_name)
to_csv(
result, export_file_path, sep=";", index=False, encoding="ISO-8859-1"
)