"""
Module defines the optimization problem constraints.
"""
from __future__ import annotations
from typing import Dict
from typing import List
import numpy as np
import pandas as pd
import pulp
import sklearn.preprocessing
from sklearn.preprocessing import MinMaxScaler
import wip.modules.ops as operations
from wip.constants import FILTERS_FUNC_TAGS
from wip.constants import FUNC_VACUUM_BOMBS
from wip.constants import ROTA_FILTERS_TAGS
from wip.constants import constants
from wip.constants import critical_cols_dict
from wip.datatools.ml_filters import find_models_by_tag
from wip.files.complex_constraints import complex_constraints
from wip.files.general_constraints import general_constraints
from wip.files.range_complex_constraints import range_complex_constraints
from wip.files.range_constraints import range_constraints
from wip.files.variable_constraints import variable_constraints
from wip.logging_config import logger
from wip.modules import solver_ops as solver_operations
from wip.modules.limits import Limits
from wip.modules.lparray import lp_define_or_constraint
from wip.modules.lparray import lp_multiply
[docs]class Constraints:
"""
A class for defining and manipulating linear programming constraints.
Methods
-------
add_constraint(constraint: pulp.LpConstraint) -> None:
Add a new constraint to the list of constraints.
remove_constraint(constraint: pulp.LpConstraint) -> None:
Remove a constraint from the list of constraints.
get_constraint_names() -> List[str]:
Get a list of all the constraints' names as the list.
get_constraint_values(variables: List[pulp.LpVariable]) -> List[float]:
Get a list values from all constraints in a list of variable values.
get_constraint_coefficients(variables: List[pulp.LpVariable]) -> List[List[float]]:
Get a list of coefficients from all constraints in a list of variable values.
get_constraint_bounds() -> List[Tuple[float, float]]:
Get a list of lower and upper bounds for all constraints.
set_constraint_bounds(bounds: List[Tuple[float, float]]) -> None:
Set the lower and upper bounds of all constraints in the list.
set_constraint_coefficients(coefficients: List[List[float]]) -> None:
Set the coefficients of all constraints in the list.
set_constraint_values(values: List[float]) -> None:
Set the values of all constraints in the list.
"""
[docs] @staticmethod
def write_feature_constraints(feature, file, lmin, lmax):
"""Write lower and upper bound values for a feature (tag).
Parameters
----------
feature : str
The name of the tag to define the lower- and upper-bounds.
file : str | TextIO
File to write the feature's lower- and upper-bounds to.
lmin : int | float | Tuple[float, float]
Lower-bound value of the feature.
lmax : int | float | Tuple[float, float]
Upper-bound value of the feature.
"""
solver_operations.write_constraint(file, f"{feature}_limit_min",
[(feature, 1), -lmin, ("GTE", 0)])
solver_operations.write_constraint(file, f"{feature}_limit_max",
[(feature, 1), -lmax, ("LTE", 0)])
[docs] @staticmethod
def parse_data(data):
return {
key: [
tuple(operation) if isinstance(operation, list) else operation
for operation in value
]
for key, value in data.items()
}
[docs] @staticmethod
def write_simple_constraints(file):
"""Write constraints that are constant and each range"""
new_data = Constraints.parse_data(general_constraints)
for constraint, operation in new_data.items():
solver_operations.write_constraint(file, constraint, operation)
[docs] @staticmethod
def write_targets_limits(file, datasets, features_limits):
"""
Write the constraints created using each model target.
removing those that are already defined
"""
targets = [df.columns[-1] for model_name, df in datasets.items()]
targets_write = list(filter(lambda x: x not in features_limits, targets))
select_dataset = {
target: list(filter(lambda x: target in datasets[x], datasets))
for target in set(targets_write)
}
for target in set(targets_write):
values = datasets[select_dataset[target][0]][target]
if target == "0":
continue
if target in constants.TARGETS_IN_MODEL.keys():
new_target = constants.TARGETS_IN_MODEL[target]
Constraints.write_feature_constraints(new_target, file,
min(values), max(values))
Constraints.write_feature_constraints(target, file, min(values),
max(values))
[docs] @staticmethod
def define_range_terms(range_terms, scalers):
range_constraints = solver_operations.define_range_constraints
if not isinstance(range_terms, list):
range_terms = [range_terms]
all_terms = []
for terms in range_terms:
step = 1
if "step" in terms.keys():
step = terms["step"]
parsed_terms = range_constraints(
terms["feature"], terms["start"], terms["end"] + 1, step
)
norm_features = []
if "norm_feature" in terms.keys():
norm_features = range_constraints(
terms["norm_feature"],
terms["start"],
terms["end"] + 1,
step,
)
term = terms.copy()
for index, feature in enumerate(parsed_terms):
term["feature"] = feature
if norm_features:
_, new_coef = Constraints.measure_new_coef(
term, scalers, norm_features[index]
)
else:
_, new_coef = Constraints.measure_new_coef(term, scalers)
all_terms.append((feature, new_coef))
return all_terms
[docs] @staticmethod
def write_simple_range_terms(file, scalers, features_limits):
"""
opera com range_constraints
"""
range_terms = range_constraints
for constraint_name, terms in range_terms.items():
terms = Constraints.parse_complex_constraints(
terms, features_limits, scalers
)
solver_operations.write_constraint(file, constraint_name, terms)
[docs] @staticmethod
def parse_complex_constraints(
terms, features_limits, scalers, range_min=None, range_max=None
):
"""
Parse complex constraints, where there is a function in the feature.
Parameters
----------
terms : list
A list of dictionaries, where each dictionary represents a term
in the constraint.
features_limits : dict
A dictionary containing the minimum and maximum values for each feature.
scalers : dict
A dictionary containing the `MinMaxScaler` objects for each feature.
range_min : float, default=None
The minimum range value for the constraint.
range_max : float, default=None
The maximum range value for the constraint.
Returns
-------
list
A list of tuples, where each tuple represents a term in the constraint.
"""
simple_constraint = []
operator = None
for term in terms: # each term is a map
# If the condition is met, then it represents a simple constraint.
if (
len(term.keys()) == 2
and "feature" in term.keys()
and "coef" in term.keys()
):
simple_constraint.append(tuple(term.values()))
# If the condition is met, then it represents an operator
elif "limit" not in term.keys() and "operator" in term.keys():
operator = tuple(term.values())
elif "limit" in term.keys() and "operator" not in term.keys():
limit = Constraints.define_term_limit(
term, features_limits, range_min, range_max
)
simple_constraint.append((term["feature"], limit))
elif "limit" in term.keys() and "operator" in term.keys():
limit = Constraints.define_term_limit(
term, features_limits, range_min, range_max
)
if term["operator"] == "norm":
method_operator = operations.normalize_feature
simple_constraint.append(
(
term["coef"]
* method_operator(scalers, term["feature"], limit)
)
)
elif "start" in term.keys() and "end" in term.keys():
simple_constraint.extend(Constraints.define_range_terms(term, scalers))
if operator is not None:
simple_constraint.append(operator)
return simple_constraint
[docs] @staticmethod
def define_term_limit(term, features_limits, range_min, range_max):
"""Define the limit for a term in a constraint.
Parameters
----------
term : dict
A dictionary representing a term in a constraint.
features_limits : dict
A dictionary containing the minimum and maximum values for each feature.
range_min : float
The minimum range value for the constraint.
range_max : float
The maximum range value for the constraint.
Returns
-------
float
The limit for the term in the constraint.
"""
if term["limit"] == "fmin":
return range_min
if term["limit"] == "fmax":
return range_max
return features_limits[term["feature"]][term["limit"]]
[docs] @staticmethod
def write_variable_constraints(
file, features_limits, scalers, range_min, range_max
):
"""
Write constraints that are variable for each range (complex constraints).
Parameters
----------
file : str | Path
The file to write the constraints to.
features_limits : dict
A dictionary containing the minimum and maximum values for each feature.
scalers : dict
A dictionary containing the `MinMaxScaler` objects for each feature.
range_min : Optional[Union[int, float]]
The minimum range value for the constraint.
range_max : Optional[Union[int, float]]
The maximum range value for the constraint.
"""
constraints = variable_constraints
constraints_temp = constraints.copy()
for k, constraint in constraints.items():
for sentence in constraint:
if (
"feature" in sentence
and sentence["feature"] not in features_limits.keys()
):
constraints_temp.pop(k)
break
constraints = constraints_temp
for constraint, terms in constraints.items():
parsed_terms = Constraints.parse_complex_constraints(
terms, features_limits, scalers, range_min, range_max
)
solver_operations.write_constraint(file, constraint, parsed_terms)
[docs] @staticmethod
def write_special_constraints(file, scalers):
"""Write special constraints, specific to each production plant."""
# Constraints.write_compressao_min_lim(file)
Constraints.write_calcario_equal(file, scalers)
# Constraints.write_mill_feed_rate(file, scalers)
# Constraints.write_gran_ocs_tm_equality(file, scalers)
# Constraints.write_gran_ocs_tm_min(file, scalers)
[docs] @staticmethod
def write_vent_rotation(file, scalers, range_max):
vent_rotation_token = "ROTA1_I@08QU-PF-852I-{:02}M1".format
increment = (int(range_max) - 700) / 50
constraint_sufix = "_gte_500"
constraint = []
value = 1
token = vent_rotation_token(value)
constraint.append(
(
vent_rotation_token(value),
scalers[vent_rotation_token(value)].data_range_[0],
)
)
constraint.append(
(
"GT",
operations.normalize_feature(scalers, token, (500 + increment * 15))
- scalers[vent_rotation_token(value)].data_min_[0],
)
)
solver_operations.write_constraint(
file, vent_rotation_token(value) + constraint_sufix, constraint
)
constraint_sufix = "_gte_400"
constraint = []
value = 2
token = vent_rotation_token(value)
constraint.append(
(
vent_rotation_token(value),
scalers[vent_rotation_token(value)].data_range_[0],
)
)
constraint.append(
(
"GT",
operations.normalize_feature(scalers, token, (380 + increment * 5))
- scalers[vent_rotation_token(value)].data_min_[0],
)
)
solver_operations.write_constraint(
file, vent_rotation_token(value) + constraint_sufix, constraint
)
constraint_sufix = "_gte_400"
constraint = []
value = 3
token = vent_rotation_token(value)
constraint.append(
(
vent_rotation_token(value),
scalers[vent_rotation_token(value)].data_range_[0],
)
)
constraint.append(
(
"GT",
operations.normalize_feature(scalers, token, (380 + increment * 5))
- scalers[vent_rotation_token(value)].data_min_[0],
)
)
solver_operations.write_constraint(
file, vent_rotation_token(value) + constraint_sufix, constraint
)
constraint_sufix = "_gte_320"
constraint = []
value = 4
token = vent_rotation_token(value)
constraint.append(
(
vent_rotation_token(value),
scalers[vent_rotation_token(value)].data_range_[0],
)
)
constraint.append(
(
"GT",
operations.normalize_feature(scalers, token, (320 + increment * 15))
- scalers[vent_rotation_token(value)].data_min_[0],
)
)
solver_operations.write_constraint(
file, vent_rotation_token(value) + constraint_sufix, constraint
)
constraint_sufix = "_gte_515"
constraint = []
value = 7
token = vent_rotation_token(value)
constraint.append(
(
vent_rotation_token(value),
scalers[vent_rotation_token(value)].data_range_[0],
)
)
constraint.append(
(
"GT",
operations.normalize_feature(scalers, token, (515 + increment * 5))
- scalers[vent_rotation_token(value)].data_min_[0],
)
)
solver_operations.write_constraint(
file, vent_rotation_token(value) + constraint_sufix, constraint
)
constraint_sufix = "_gte_560"
constraint = []
value = 8
token = vent_rotation_token(value)
constraint.append(
(
vent_rotation_token(value),
scalers[vent_rotation_token(value)].data_range_[0],
)
)
constraint.append(
(
"GT",
operations.normalize_feature(scalers, token, (560 + increment * 5))
- scalers[vent_rotation_token(value)].data_min_[0],
)
)
solver_operations.write_constraint(
file, vent_rotation_token(value) + constraint_sufix, constraint
)
[docs] @staticmethod
def write_gran_ocs_tm_equality(file: str, scalers: Dict[str, MinMaxScaler]):
# Equidade das granulometrias como solicitado pelo Rodrigo
tag_base = "GRAN_OCS_TM@08PE-BD-840I-10"
for tag_gran in [
tag_name
for tag_name in scalers.keys()
if "GRAN_OCS_TM@08PE-BD-840I-" in tag_name and "10" not in tag_name
]:
constraint_name = f"{tag_base}_igual_{tag_gran}"
bm = scalers[tag_base].data_min_[0] - scalers[tag_gran].data_min_[0]
terms = [
(tag_base, -scalers[tag_base].data_range_[0]),
(tag_gran, scalers[tag_gran].data_range_[0]),
("E", bm),
]
solver_operations.write_constraint(file, constraint_name, terms)
[docs] @staticmethod
def write_gran_ocs_tm_min(file, scalers):
# Equidade das granulometrias como solicitado pelo rodrigo
for tag_gran in [
tag for tag in scalers.keys() if "GRAN_OCS_TM@08PE-BD-840I-" in tag
]:
constraint_name = f"{tag_gran}_GT_than13"
bm = -1 * scalers[tag_gran].data_min_[0]
terms = [
(tag_gran, scalers[tag_gran].data_range_[0]),
("GTE", 13 + bm),
]
solver_operations.write_constraint(file, constraint_name, terms)
[docs] @staticmethod
def write_compressao_min_lim(file):
"""
Define the minimum allowable compression value.
Original Tag name: `"COMP_MCOMP_PQ_L@08QU"`
Default lower-bound value: 290
The function tries to get the lower-bound value from the
`wip.constants.critical_cols_dict` dictionary.
It searches for the key "COMP_MCOMP_PQ_L@08QU", and if not found,
it tries searching for the key "compressao".
If none of the keys are found, then the default value of 290 is used.
Parameters
----------
file : File
The constraints' file to write constraint to.
scalers : Dict[str, sklearn.preprocessing.MinMaxScaler]
Dictionary of scalers fitted to each tag, during the predictive
models' creation.
"""
compressao_min = critical_cols_dict.get(
"COMP_MCOMP_PQ_L@08QU",
critical_cols_dict.get("compressao", {"lmin": 270})
)["lmin"]
terms = []
constraint_name = "compressao_min"
terms.append(("COMP_MCOMP_PQ_L@08QU", 1))
terms.append(("GTE", compressao_min))
solver_operations.write_constraint(file, constraint_name, terms)
[docs] @staticmethod
def write_calcario_equal(file, scalers):
bm = (scalers["PESO1_I@08MO-BW-821I-01M1"].data_range_[0]
+ scalers["PESO1_I@08MO-BW-821I-02M1"].data_range_[0]
+ scalers["PESO1_I@08MO-BW-821I-03M1"].data_range_[0])
bm *= 0.8762 * scalers["calcario"].data_range_[0]
bm = operations.normalize_feature(scalers, "PESO1_I@08MO-BW-813I-01M1",
bm)
constraint_name = "calcario_equality"
terms = [("calcario", -bm), ("PESO1_I@08MO-BW-813I-01M1", 1),
("GTE", 0)]
solver_operations.write_constraint(file, constraint_name, terms)
[docs] @staticmethod
def write_mill_feed_rate(file, scalers):
# Taxa de alimentação do moinho
constraint_name = "taxa_alimentacao_moinho_vs_producao"
aproveitamento_massa_moinho = 0.8 # 0.8762
terms = []
for i in range(1, 4):
v_func = "FUNC1_D@08MO-BW-821I-{:02}M1".format(
i) # v_func = 'FUNC1_D@08MO-MO-821I-{:02}M1'.format(i)
v_taxa = "PESO1_I@08MO-BW-821I-{:02}M1".format(
i) # PESO1_I@08MO-BW-821I-{:02}M1
coef = aproveitamento_massa_moinho * scalers[v_taxa].data_range_[0]
terms.append((
v_func,
-aproveitamento_massa_moinho * scalers[v_taxa].data_min_[0],
))
terms.append((v_taxa, coef))
terms.append(
("PROD_PQ_Y@08US", -scalers["PROD_PQ_Y@08US"].data_range_[0]))
terms.append(("E", scalers["PROD_PQ_Y@08US"].data_min_[0]))
solver_operations.write_constraint(file, constraint_name, terms)
[docs] @staticmethod
def generic_term_writing(
file,
first_token,
second_token,
first_coef,
second_coef,
operator,
final_coef,
commom_token=True,
constraint_name=None,
):
terms = []
if commom_token:
terms.append((second_token, first_coef))
terms.append((second_token, second_coef))
terms.append((operator, final_coef))
solver_operations.write_constraint(file, first_token, terms)
else:
terms.append((first_token, first_coef))
terms.append((second_token, second_coef))
terms.append((operator, final_coef))
solver_operations.write_constraint(file, constraint_name, terms)
[docs] @staticmethod
def measure_new_coef(term, scalers, norm_feature=None):
feature = term["feature"]
new_coef = 1
if "operator" in term.keys():
if term["operator"] == "norm":
if norm_feature:
feature = norm_feature
new_coef = operations.normalize_feature(
scalers, feature, term["limit"])
elif term["operator"] == "scaler":
if term["position"] == "range":
new_coef = scalers[feature].data_range_[0]
elif term["position"] == "min":
new_coef = scalers[feature].data_min_[0]
else: # it's max
new_coef = scalers[feature].data_max_[0]
new_coef = term["coef"] * new_coef
return term, new_coef
[docs] @staticmethod
def define_operator_term(terms):
_operations = ["LT", "LTE", "GTE", "GT", "E"]
condition_met = False
_index = None
operator = (None, None)
for index, term in enumerate(terms):
condition_one = any(operation in term.values()
for operation in _operations)
if condition_one and "operation" not in term.keys():
operator = (term["operator"], term["coef"])
condition_met = True
_index = index
break
if condition_met:
terms.pop(_index)
return terms, operator
[docs] @staticmethod
def parse_range_complex_constraints(file, scalers):
constraints = range_complex_constraints
constraints = Constraints.parse_data(constraints)
for constraint_name, terms in constraints.items():
# defining the constraint name
start, end = terms[0]["start"], terms[0]["end"] + 1
constraint_name = constraint_name.format
constraints_names = [
constraint_name(value) for value in range(start, end)
]
terms, operator = Constraints.define_operator_term(terms)
range_terms = list(
map(lambda term: Constraints.define_range_terms(term, scalers),
terms))
composed_terms = []
for term_index in range(0, len(range_terms[0])):
composed_terms.append(
list(map(lambda term: term[term_index], range_terms)))
for constraint_name, terms in zip(constraints_names,
composed_terms):
terms = list(terms)
terms.append(operator)
solver_operations.write_constraint(file, constraint_name,
tuple(terms))
[docs] @staticmethod
def write_complex_constraints(file, scalers):
constraints = complex_constraints
for constraint_name, terms in constraints.items():
new_terms = Constraints.parse_type_complex_terms(
constraint_name, terms, scalers)
solver_operations.write_constraint(file, constraint_name,
new_terms)
[docs] @staticmethod
def parse_type_complex_terms(constraint_name, terms, scalers):
operations = ["LT", "LTE", "GTE", "GT", "E"]
new_terms, operator = [], []
right_terms = None
for term in terms:
if "start" in term.keys() and "end" in term.keys():
new_terms.extend(Constraints.define_range_terms(term, scalers))
elif any(operation in term.values() for operation in operations):
operator = term.copy()
elif "terms" in term.keys(): # complex factor after the operator
right_terms = Constraints.parse_type_complex_terms(
constraint_name, term["terms"], scalers)
elif "operator" in term.keys() and "position" in term.keys():
_, new_coef = Constraints.measure_new_coef(term, scalers)
new_terms.append((term["feature"], new_coef))
else: # it's a static feature
new_terms.append((term["feature"], term["coef"]))
if isinstance(operator, dict) and right_terms is not None:
sum_coefs = sum([list(value)[1] for value in right_terms])
operator = (operator["operator"], operator["coef"] * sum_coefs)
new_terms.append(operator)
return new_terms
def write_media_pres_constraints(file: str, scalers: Dict[str, MinMaxScaler]):
"""
Write media pressure and temperature constraints to a given file.
This function writes constraints to the file, when the following
tags exist in the `scalers` dictionary:
- 'media pres 1'
- 'media pres 2'
- 'media pres 3'
- 'media pres 4'
- 'media temp 1'
- 'media temp 2'
- 'media temp 3'
- 'media temp 4'
Parameters
----------
file : str
The file path to which the constraints are written.
scalers : Dict[str, MinMaxScaler]
A dictionary where keys represent constraint names and values
represent associated `MinMaxScaler` objects.
See Also
--------
solver_operations.write_constraint : The method used to write constraints
to the file.
Notes
-----
The function makes use of a hard-coded mapping dictionary for media
pressure and temperature constraints. Each mapping associates a constraint
name with a list of identifiers.
"""
media_mappings = {
'media pres 1': [
"PRES1_I@08QU-WB-851I-01",
"PRES1_I@08QU-WB-851I-02",
"PRES1_I@08QU-WB-851I-03A",
],
'media press 2': [
"PRES1_I@08QU-WB-851I-03B",
"PRES1_I@08QU-WB-851I-04",
"PRES1_I@08QU-WB-851I-05",
"PRES1_I@08QU-WB-851I-06",
],
'media press 3': [
"PRES1_I@08QU-WB-851I-06",
"PRES1_I@08QU-WB-851I-07",
"PRES1_I@08QU-WB-851I-08",
"PRES1_I@08QU-WB-851I-09",
],
'media press 4': [
"PRES1_I@08QU-WB-851I-09",
"PRES1_I@08QU-WB-851I-10",
"PRES1_I@08QU-WB-851I-11",
"PRES1_I@08QU-WB-851I-12",
"PRES1_I@08QU-WB-851I-13",
"PRES1_I@08QU-WB-851I-14",
"PRES1_I@08QU-WB-851I-15",
"PRES1_I@08QU-WB-851I-16",
"PRES1_I@08QU-WB-851I-17",
"PRES1_I@08QU-WB-851I-18",
"PRES1_I@08QU-WB-851I-19",
],
'media temp 1': [
"TEMP1_I@08QU-WB-851I-01",
"TEMP1_I@08QU-WB-851I-02",
"TEMP1_I@08QU-WB-851I-03",
],
'media temp 2': [
"TEMP1_I@08QU-WB-851I-03B",
"TEMP1_I@08QU-WB-851I-04",
"TEMP1_I@08QU-WB-851I-05",
"TEMP1_I@08QU-WB-851I-06",
],
'media temp 3': [
"TEMP1_I@08QU-WB-851I-06",
"TEMP1_I@08QU-WB-851I-07",
"TEMP1_I@08QU-WB-851I-08",
"TEMP1_I@08QU-WB-851I-09",
],
'media temp 4': [
"TEMP1_I@08QU-WB-851I-09",
"TEMP1_I@08QU-WB-851I-10",
"TEMP1_I@08QU-WB-851I-11",
"TEMP1_I@08QU-WB-851I-12",
"TEMP1_I@08QU-WB-851I-13",
"TEMP1_I@08QU-WB-851I-14",
"TEMP1_I@08QU-WB-851I-15",
"TEMP1_I@08QU-WB-851I-16",
"TEMP1_I@08QU-WB-851I-17",
"TEMP1_I@08QU-WB-851I-18",
"TEMP1_I@08QU-WB-851I-19",
"TEMP1_I@08QU-WB-851I-20",
],
}
media_columns = [name for name in scalers if name in media_mappings]
for constraint_name in media_columns:
columns = media_mappings[constraint_name]
terms = [(column, 1 / len(columns)) for column in columns]
terms.extend(((constraint_name, -1), 0, ("E", 0)))
solver_operations.write_constraint(file, f"{constraint_name}_equality",
terms)
[docs]def add_energia_pensa_quantile_constraint(prob,
datasets,
prod_range,
lb_limit=0.25,
ub_limit=0.75):
"""
Add constraint to optimization that limits models target tags using quantiles.
Function constraints the following tags:
- `"energia_prensa"`
- `"energia_moinho"`
- `"relacao_gran"`
- `"gas"`
Parameters
----------
prob : pulp.LpProblem
The problem to which the constraint is added.
datasets : Dict[str, pd.DataFrame]
A dictionary where keys represent model names and values represent
the corresponding datasets as `pandas.DataFrame`.
prod_range : int
The production range to which the constraint is applied.
lb_limit : float, default=0.25
The lower-bound limit of the constraint.
ub_limit : float, default=0.75
The upper-bound limit of the constraint.
Notes
-----
The `quantile_limits` module does not work for some of the models' target
variables. Therefore, this function was created to add the quantile
constraint to tags that represent target variables that are not supported
by the `quantile_limits` module.
.. versionchanged:: 2.10.0
Removed from function the lower- and upper-bound definitions
for tag "compressao"
"""
series = prepare_series(datasets, "energia_prensa", prod_range)
lpvar = prob.variablesDict()["energia_prensa"]
lpvar.lowBound = series.quantile(lb_limit)
lpvar.upBound = series.quantile(ub_limit)
series = prepare_series(datasets, "energia_moinho", prod_range)
lpvar = prob.variablesDict()["energia_moinho"]
lpvar.lowBound = series.quantile(lb_limit)
lpvar.upBound = series.quantile(ub_limit)
series = prepare_series(datasets, "relacao gran", prod_range)
lpvar = prob.variablesDict()["relacao_gran"]
lpvar.lowBound = series.quantile(lb_limit)
lpvar.upBound = series.quantile(ub_limit)
series = prepare_series(datasets, "gas", prod_range)
lpvar = prob.variablesDict()["gas"]
lpvar.lowBound = series.quantile(min(lb_limit, 0.1))
lpvar.upBound = series.quantile(min(max(ub_limit, 0.9), 1))
[docs]def prepare_series(datasets: Dict[str, pd.DataFrame], model_name: str,
prod_range: int) -> pd.Series:
"""Prepare a series to be used in the quantile constraint.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary where keys represent model names and values represent
the corresponding datasets as `pandas.DataFrame`.
model_name : str
The name of the model to be used.
prod_range : int
The production range to which the constraint is applied.
Returns
-------
pd.Series
The series to be used in the quantile constraint.
"""
prod_pq = "PROD_PQ_Y@08US"
df = datasets[model_name]
target_col = df.columns[-1]
if prod_pq not in df.columns:
prod_pq_series = (
pd.concat([
datasets[model_name][prod_pq].to_frame(prod_pq)
for model_name in find_models_by_tag(prod_pq, datasets)
])
.sort_index()
.drop_duplicates()
)
df = (
df[target_col]
.to_frame(target_col)
.merge(prod_pq_series, left_index=True, right_index=True, how="inner")
)
else:
df = df[[target_col, prod_pq]]
return df.loc[
lambda xdf: ((xdf[prod_pq] >= prod_range)
& (xdf[prod_pq] < prod_range + 50)),
target_col,
]
[docs]def link_rota_func_filtros(
prob: pulp.LpProblem,
rota_filtros_low_bound: float = 0.8,
rota_filtros_up_bound: float = 1.1,
min_filters_active: int = 6,
) -> pulp.LpProblem:
"""
Link filter rotation and "functioning" tags.
This function iterates over predefined pairs of binary and continuous
variables in a linear programming problem. It links these variables using the
`lp_define_or_constraint` function, applying optional MinMax inverse scaling
to the continuous variables, if a value for the parameter :param:`scalers` is
provided. The function mainly operates on a provided PuLP problem
object, modifying it by adding constraints.
The filter rotation and functioning tags are linked as follows:
- If functioning tag is equal to 0, then the rotation tag is also zero.
- If functioning tag is equal to 1, then the rotation tag must be between
the specified :param:`rota_filtros_low_bound` and
:param:`rota_filtros_up_bound` lower- and upper-bounds.
Parameters
----------
prob : pulp.LpProblem
The linear programming problem to which constraints and links are added.
rota_filtros_low_bound : float, default=0.8
The lower bound to be used in the `lp_define_or_constraint` function for
linking variables.
rota_filtros_up_bound : float, default=1.1
The upper bound to be used in the `lp_define_or_constraint` function for
linking variables.
min_filters_active : int, default=6
Minimum quantity of filters that need to be working in parallel.
In other words:
.. math::
\\sum_{i=1}^{10} \\text{FUNC1_D@08FI-FL-827I-0i} \\geq \\text{min_filters_active}
Returns
-------
pulp.LpProblem
The modified linear programming problem with added constraints.
Examples
--------
>>> prob = pulp.LpProblem("Example_Problem", pulp.LpMaximize)
>>> prob = link_rota_func_filtros(prob)
This adds constraints to 'prob', linking predefined pairs of binary and
continuous variables, and optionally applies scaling to the continuous
variables.
Notes
-----
If a scaler name corresponding to a certain filter rotation variable is not
found, the variable won't get re-scaled back to its original range,
even though the :param:`scalers` dictionary was provided.
"""
features_to_link = [{
"bin_var": func_tag,
"lpvar": rota_tag
} for func_tag, rota_tag in zip(FILTERS_FUNC_TAGS, ROTA_FILTERS_TAGS)]
lp_vars = prob.variablesDict()
bin_lpvars = []
for feature in features_to_link:
bin_feature_name = feature["bin_var"]
lp_feature_name = feature["lpvar"]
bin_var_name = (
bin_feature_name
.replace("-", "_")
.replace(" ", "_")
.replace("+", "_")
.replace("/", "_")
)
lp_var_name = (
lp_feature_name.replace("-", "_")
.replace(" ", "_")
.replace("+", "_")
.replace("/", "_")
)
bin_lpvar = lp_vars.get(bin_var_name)
cont_lpvar = lp_vars.get(lp_var_name)
if bin_lpvar is not None and cont_lpvar is not None:
prob = lp_multiply(bin_lpvar, cont_lpvar, prob)
lp_define_or_constraint(
prob,
cont_lpvar,
bin_lpvar,
rota_filtros_low_bound,
rota_filtros_up_bound,
)
bin_lpvars.append(bin_lpvar)
else:
logger.error(
"Failed to link variables: '%s' and '%s'",
bin_feature_name, lp_feature_name,
)
if len(bin_lpvars) == 0:
raise ValueError("No filter variables found.")
if len(bin_lpvars) < min_filters_active:
raise ValueError(
"The specified number of 'ROTA1_I@08FI-FL-827I-XXM1' tags that "
"should be working in parallel is greater than the amount of "
"filter variables found. Number of filters that should be "
f"working in parallel: >={min_filters_active}. "
f"Number of filters found: {len(bin_lpvars)}", )
prob += pulp.lpSum(bin_lpvars) >= min_filters_active
return prob
[docs]def link_nro_filtros_func_lpvars(
prob: pulp.LpProblem,
func_tags: List[str] | None = None,
n_filters_tag: str | None = None,
) -> pulp.LpProblem:
if func_tags is None:
func_tags = FILTERS_FUNC_TAGS
if n_filters_tag is None:
n_filters_tag = "NUM_FILTR_FUNC___US8"
lpvars = prob.variablesDict()
if n_filters_tag not in lpvars:
pattern = "FILTR"
matches = [name for name in lpvars.keys() if pattern in name]
if len(matches) <= 0:
logger.error(
"LP variable '%s' not found. Skipping constraint "
"creation.",
n_filters_tag,
)
return prob
n_filters_tag = matches[0]
logger.warning(
"Input 'NÂș FILTR FUNC - US8' tag LP variable not "
"found. Using '%s' variable instead.",
n_filters_tag,
)
lp_sum_func_tags = pulp.lpSum(
[lpvars[pulp.LpVariable(func_tag).name] for func_tag in func_tags]
)
prob += lp_sum_func_tags == lpvars[n_filters_tag]
return prob
[docs]def filters_vacuum_bombs_relationship(
prob: pulp.LpProblem,
func_filters: List[str] = None,
func_vacuum_bombs: List[str] = None,
) -> pulp.LpProblem:
if func_filters is None:
func_filters = FILTERS_FUNC_TAGS
if func_vacuum_bombs is None:
func_vacuum_bombs = FUNC_VACUUM_BOMBS
lpvars = prob.variablesDict()
prob += (
pulp.lpSum([
lpvars[pulp.LpVariable(vacuum_func).name]
for vacuum_func in FUNC_VACUUM_BOMBS
]) <= 1.2 * pulp.lpSum([
lpvars[pulp.LpVariable(filter_func).name]
for filter_func in FILTERS_FUNC_TAGS
]),
"max_FUNC_FILTROS_BOMBAS_VACUO_RATIO",
)
prob += (
pulp.lpSum([
lpvars[pulp.LpVariable(vacuum_func).name]
for vacuum_func in FUNC_VACUUM_BOMBS
]) >= 0.85 * pulp.lpSum([
lpvars[pulp.LpVariable(filter_func).name]
for filter_func in FILTERS_FUNC_TAGS
]),
"min_FUNC_FILTROS_BOMBAS_VACUO_RATIO",
)
return prob
[docs]def constraint_taxa_alimentacao_disco(
prob: pulp.LpProblem,
lb_value: int = 90,
ub_value: int = 140,
) -> pulp.LpProblem:
"""
Apply constraints to ensure specific variables are zero or within bounds.
This function enforces constraints on a set of variables within a linear
programming problem to ensure that each variable `PESO1_I@08PE-BW-840I-XXM1`
is set to 0 if its corresponding binary variable `FUNC1_D@08PE-BD-840I-XXM1`
is 0. Otherwise, it ensures that the `PESO1_I` variables are within specified
lower and upper bounds.
Parameters
----------
prob : pulp.LpProblem
The linear programming problem instance to which the constraints will be added.
lb_value : int, default=90
The lower-bound value for the `PESO1_I` variables when their
corresponding `FUNC1_D` variables are 1.
ub_value : int, default=140
The upper-bound value for the `PESO1_I` variables when their
corresponding `FUNC1_D` variables are 1.
Returns
-------
pulp.LpProblem
The modified LP problem instance with the new constraints added.
Notes
-----
- The function assumes that the `scalers` dictionary's keys correctly match
the `PESO1_I` variable names and that each scaler has an `inverse_transform`
method to rescale bounds.
- The function modifies the `prob` object directly by adding constraints to it.
"""
func_cols = [f"FUNC1_D@08PE-BD-840I-{idx:02d}M1" for idx in range(1, 13)]
peso_cols = [f"PESO1_I@08PE-BW-840I-{idx:02d}M1" for idx in range(1, 13)]
lpvars = prob.variablesDict()
for func_col, peso_col in zip(func_cols, peso_cols):
peso_lpvar = lpvars[pulp.LpVariable(peso_col).name]
func_lpvar = lpvars[pulp.LpVariable(func_col).name]
func_lpvar.cat = pulp.LpBinary
lp_define_or_constraint(prob, peso_lpvar, func_lpvar, lb_value, ub_value)
return prob
[docs]def fix_grupos_de_queima_limits(prob: pulp.LpProblem, scalers: Dict[str, MinMaxScaler]):
"""
Ensures subsequent variables have intersecting limits in an LP problem.
This function adjusts the upper limit of predefined variables
within a linear programming (LP) problem to ensure that for each pair of
subsequent variables, their limits intersect. Specifically, it focuses on
variables named `TEMP1_I@08QU-QU-855I-GQXX` for `XX` in the range 9 to 16,
adjusting the upper limit of the first variable in the pair to match the
lower limit of the second if they do not intersect. This is relevant in
scenarios such as optimizing temperature control processes where continuity
and overlap in operational ranges are required.
Parameters
----------
prob : pulp.LpProblem
The linear programming problem instance containing the variables to be
adjusted.
scalers : Dict[str, MinMaxScaler]
A dictionary mapping variable names to scaler objects. These scalers
are used to transform the bounds of the variables to and from a
standardized scale.
Returns
-------
pulp.LpProblem
The modified LP problem instance with adjusted variable limits.
Notes
-----
This function directly modifies the `prob` object passed to it, adjusting
the upper and lower bounds of specific variables based on the provided
scalers. It is specifically designed for variables with names following the
pattern `TEMP1_I@08QU-QU-855I-GQXX` where `XX` ranges from 09 to 16.
"""
lpvars = prob.variablesDict()
for idx in range(9, 16):
first_gq_name = f"TEMP1_I@08QU-QU-855I-GQ{idx:02d}"
second_gq_name = f"TEMP1_I@08QU-QU-855I-GQ{idx + 1:02d}"
first_gq = lpvars[pulp.LpVariable(first_gq_name).name]
second_gq = lpvars[pulp.LpVariable(second_gq_name).name]
first_gq_scaler = scalers[first_gq_name]
second_gq_scaler = scalers[second_gq_name]
lb_first_gq, ub_first_gq = first_gq.lowBound, first_gq.upBound
lb_second_gq, ub_second_gq = second_gq.lowBound, second_gq.upBound
lb_first_gq, ub_first_gq = first_gq_scaler.inverse_transform(
[[lb_first_gq], [ub_first_gq]]
).reshape(-1)
lb_second_gq, ub_second_gq = second_gq_scaler.inverse_transform(
[[lb_second_gq], [ub_second_gq]]
).reshape(-1)
if lb_second_gq > ub_first_gq:
ub_first_gq = first_gq_scaler.transform([[lb_second_gq]]).reshape(-1)[0]
first_gq.upBound = ub_first_gq
return prob