
"""
Module defines the optimization problem constraints.

"""
from __future__ import annotations

from typing import Dict
from typing import List

import numpy as np
import pandas as pd
import pulp
import sklearn.preprocessing
from sklearn.preprocessing import MinMaxScaler

import wip.modules.ops as operations
from wip.constants import FILTERS_FUNC_TAGS
from wip.constants import FUNC_VACUUM_BOMBS
from wip.constants import ROTA_FILTERS_TAGS
from wip.constants import constants
from wip.constants import critical_cols_dict
from wip.datatools.ml_filters import find_models_by_tag
from wip.files.complex_constraints import complex_constraints
from wip.files.general_constraints import general_constraints
from wip.files.range_complex_constraints import range_complex_constraints
from wip.files.range_constraints import range_constraints
from wip.files.variable_constraints import variable_constraints
from wip.logging_config import logger
from wip.modules import solver_ops as solver_operations
from wip.modules.limits import Limits
from wip.modules.lparray import lp_define_or_constraint
from wip.modules.lparray import lp_multiply


class Constraints:
    """A class for defining and manipulating linear programming constraints.

    Methods
    -------
    add_constraint(constraint: pulp.LpConstraint) -> None:
        Add a new constraint to the list of constraints.
    remove_constraint(constraint: pulp.LpConstraint) -> None:
        Remove a constraint from the list of constraints.
    get_constraint_names() -> List[str]:
        Get the names of all constraints in the list.
    get_constraint_values(variables: List[pulp.LpVariable]) -> List[float]:
        Get the value of every constraint, given a list of variable values.
    get_constraint_coefficients(variables: List[pulp.LpVariable]) -> List[List[float]]:
        Get the coefficients of every constraint, given a list of variable values.
    get_constraint_bounds() -> List[Tuple[float, float]]:
        Get the lower and upper bounds of all constraints.
    set_constraint_bounds(bounds: List[Tuple[float, float]]) -> None:
        Set the lower and upper bounds of all constraints in the list.
    set_constraint_coefficients(coefficients: List[List[float]]) -> None:
        Set the coefficients of all constraints in the list.
    set_constraint_values(values: List[float]) -> None:
        Set the values of all constraints in the list.
    """

    @staticmethod
    def write_feature_constraints(feature, file, lmin, lmax):
        """Write lower- and upper-bound constraints for a feature (tag).

        Parameters
        ----------
        feature : str
            The name of the tag for which to define the lower and upper bounds.
        file : str | TextIO
            File to write the feature's lower- and upper-bound constraints to.
        lmin : int | float | Tuple[float, float]
            Lower-bound value of the feature.
        lmax : int | float | Tuple[float, float]
            Upper-bound value of the feature.
        """
        solver_operations.write_constraint(
            file, f"{feature}_limit_min", [(feature, 1), -lmin, ("GTE", 0)]
        )
        solver_operations.write_constraint(
            file, f"{feature}_limit_max", [(feature, 1), -lmax, ("LTE", 0)]
        )

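    # Illustrative sketch (editor's addition, hypothetical values): calling the
    # method writes two constraints, named "<feature>_limit_min" and
    # "<feature>_limit_max", to the given file handle:
    #
    #     with open("constraints.txt", "w") as fh:
    #         Constraints.write_feature_constraints("PROD_PQ_Y@08US", fh, 0.0, 1.0)
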
    @staticmethod
    def parse_data(data):
        return {
            key: [
                tuple(operation) if isinstance(operation, list) else operation
                for operation in value
            ]
            for key, value in data.items()
        }

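    # Illustrative sketch (editor's addition), assuming a toy constraint
    # definition: lists inside each constraint are converted to tuples,
    # everything else is kept unchanged.
    #
    #     >>> Constraints.parse_data({"c1": [["tag_a", 1], ("GTE", 0)]})
    #     {'c1': [('tag_a', 1), ('GTE', 0)]}
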
    @staticmethod
    def write_simple_constraints(file):
        """Write constraints that are constant for each production range."""
        new_data = Constraints.parse_data(general_constraints)
        for constraint, operation in new_data.items():
            solver_operations.write_constraint(file, constraint, operation)

    @staticmethod
    def write_targets_limits(file, datasets, features_limits):
        """Write the constraints created from each model target.

        Targets that are already defined in ``features_limits`` are skipped.
        """
        targets = [df.columns[-1] for model_name, df in datasets.items()]
        targets_write = list(filter(lambda x: x not in features_limits, targets))
        select_dataset = {
            target: list(filter(lambda x: target in datasets[x], datasets))
            for target in set(targets_write)
        }
        for target in set(targets_write):
            values = datasets[select_dataset[target][0]][target]
            if target == "0":
                continue
            if target in constants.TARGETS_IN_MODEL.keys():
                new_target = constants.TARGETS_IN_MODEL[target]
                Constraints.write_feature_constraints(
                    new_target, file, min(values), max(values)
                )
            Constraints.write_feature_constraints(target, file, min(values), max(values))

    @staticmethod
    def define_range_terms(range_terms, scalers):
        range_constraints = solver_operations.define_range_constraints
        if not isinstance(range_terms, list):
            range_terms = [range_terms]
        all_terms = []
        for terms in range_terms:
            step = 1
            if "step" in terms.keys():
                step = terms["step"]
            parsed_terms = range_constraints(
                terms["feature"], terms["start"], terms["end"] + 1, step
            )
            norm_features = []
            if "norm_feature" in terms.keys():
                norm_features = range_constraints(
                    terms["norm_feature"],
                    terms["start"],
                    terms["end"] + 1,
                    step,
                )
            term = terms.copy()
            for index, feature in enumerate(parsed_terms):
                term["feature"] = feature
                if norm_features:
                    _, new_coef = Constraints.measure_new_coef(
                        term, scalers, norm_features[index]
                    )
                else:
                    _, new_coef = Constraints.measure_new_coef(term, scalers)
                all_terms.append((feature, new_coef))
        return all_terms

    @staticmethod
    def write_simple_range_terms(file, scalers, features_limits):
        """Write the constraints defined in ``range_constraints``."""
        range_terms = range_constraints
        for constraint_name, terms in range_terms.items():
            terms = Constraints.parse_complex_constraints(
                terms, features_limits, scalers
            )
            solver_operations.write_constraint(file, constraint_name, terms)

    @staticmethod
    def parse_complex_constraints(
        terms, features_limits, scalers, range_min=None, range_max=None
    ):
        """Parse complex constraints, where a function is applied to the feature.

        Parameters
        ----------
        terms : list
            A list of dictionaries, where each dictionary represents a term
            in the constraint.
        features_limits : dict
            A dictionary containing the minimum and maximum values for each feature.
        scalers : dict
            A dictionary containing the `MinMaxScaler` objects for each feature.
        range_min : float, default=None
            The minimum range value for the constraint.
        range_max : float, default=None
            The maximum range value for the constraint.

        Returns
        -------
        list
            A list of tuples, where each tuple represents a term in the constraint.
        """
        simple_constraint = []
        operator = None
        for term in terms:  # each term is a dictionary
            # If the condition is met, the term represents a simple constraint.
            if (
                len(term.keys()) == 2
                and "feature" in term.keys()
                and "coef" in term.keys()
            ):
                simple_constraint.append(tuple(term.values()))
            # If the condition is met, the term represents an operator.
            elif "limit" not in term.keys() and "operator" in term.keys():
                operator = tuple(term.values())
            elif "limit" in term.keys() and "operator" not in term.keys():
                limit = Constraints.define_term_limit(
                    term, features_limits, range_min, range_max
                )
                simple_constraint.append((term["feature"], limit))
            elif "limit" in term.keys() and "operator" in term.keys():
                limit = Constraints.define_term_limit(
                    term, features_limits, range_min, range_max
                )
                if term["operator"] == "norm":
                    method_operator = operations.normalize_feature
                    simple_constraint.append(
                        term["coef"]
                        * method_operator(scalers, term["feature"], limit)
                    )
            elif "start" in term.keys() and "end" in term.keys():
                simple_constraint.extend(Constraints.define_range_terms(term, scalers))
        if operator is not None:
            simple_constraint.append(operator)
        return simple_constraint

    @staticmethod
    def define_term_limit(term, features_limits, range_min, range_max):
        """Define the limit for a term in a constraint.

        Parameters
        ----------
        term : dict
            A dictionary representing a term in a constraint.
        features_limits : dict
            A dictionary containing the minimum and maximum values for each feature.
        range_min : float
            The minimum range value for the constraint.
        range_max : float
            The maximum range value for the constraint.

        Returns
        -------
        float
            The limit for the term in the constraint.
        """
        if term["limit"] == "fmin":
            return range_min
        if term["limit"] == "fmax":
            return range_max
        return features_limits[term["feature"]][term["limit"]]

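    # Illustrative sketch (editor's addition, hypothetical inputs): "fmin" and
    # "fmax" resolve to the production-range bounds, while any other limit name
    # is looked up in ``features_limits``.
    #
    #     >>> limits = {"tag_a": {"lmin": 10.0, "lmax": 50.0}}
    #     >>> Constraints.define_term_limit(
    #     ...     {"feature": "tag_a", "limit": "fmin"}, limits, 100.0, 200.0)
    #     100.0
    #     >>> Constraints.define_term_limit(
    #     ...     {"feature": "tag_a", "limit": "lmax"}, limits, 100.0, 200.0)
    #     50.0
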
    @staticmethod
    def write_variable_constraints(
        file, features_limits, scalers, range_min, range_max
    ):
        """Write constraints that are variable for each range (complex constraints).

        Parameters
        ----------
        file : str | Path
            The file to write the constraints to.
        features_limits : dict
            A dictionary containing the minimum and maximum values for each feature.
        scalers : dict
            A dictionary containing the `MinMaxScaler` objects for each feature.
        range_min : Optional[Union[int, float]]
            The minimum range value for the constraint.
        range_max : Optional[Union[int, float]]
            The maximum range value for the constraint.
        """
        constraints = variable_constraints
        constraints_temp = constraints.copy()
        for k, constraint in constraints.items():
            for sentence in constraint:
                if (
                    "feature" in sentence
                    and sentence["feature"] not in features_limits.keys()
                ):
                    constraints_temp.pop(k)
                    break
        constraints = constraints_temp
        for constraint, terms in constraints.items():
            parsed_terms = Constraints.parse_complex_constraints(
                terms, features_limits, scalers, range_min, range_max
            )
            solver_operations.write_constraint(file, constraint, parsed_terms)

    @staticmethod
    def write_special_constraints(file, scalers):
        """Write special constraints, specific to each production plant."""
        # Constraints.write_compressao_min_lim(file)
        Constraints.write_calcario_equal(file, scalers)
        # Constraints.write_mill_feed_rate(file, scalers)
        # Constraints.write_gran_ocs_tm_equality(file, scalers)
        # Constraints.write_gran_ocs_tm_min(file, scalers)

    @staticmethod
    def write_vent_rotation(file, scalers, range_max):
        """Write lower-bound (GT) constraints on the fan rotation tags
        ``ROTA1_I@08QU-PF-852I-XXM1``."""
        vent_rotation_token = "ROTA1_I@08QU-PF-852I-{:02}M1".format
        increment = (int(range_max) - 700) / 50
        # (fan number, constraint suffix, base set-point, increment multiplier)
        fan_settings = [
            (1, "_gte_500", 500, 15),
            (2, "_gte_400", 380, 5),
            (3, "_gte_400", 380, 5),
            (4, "_gte_320", 320, 15),
            (7, "_gte_515", 515, 5),
            (8, "_gte_560", 560, 5),
        ]
        for value, constraint_sufix, base, multiplier in fan_settings:
            token = vent_rotation_token(value)
            constraint = [
                (token, scalers[token].data_range_[0]),
                (
                    "GT",
                    operations.normalize_feature(
                        scalers, token, base + increment * multiplier
                    )
                    - scalers[token].data_min_[0],
                ),
            ]
            solver_operations.write_constraint(
                file, token + constraint_sufix, constraint
            )

    @staticmethod
    def write_gran_ocs_tm_equality(file: str, scalers: Dict[str, MinMaxScaler]):
        # Granulometry equality, as requested by Rodrigo.
        tag_base = "GRAN_OCS_TM@08PE-BD-840I-10"
        for tag_gran in [
            tag_name
            for tag_name in scalers.keys()
            if "GRAN_OCS_TM@08PE-BD-840I-" in tag_name and "10" not in tag_name
        ]:
            constraint_name = f"{tag_base}_igual_{tag_gran}"
            bm = scalers[tag_base].data_min_[0] - scalers[tag_gran].data_min_[0]
            terms = [
                (tag_base, -scalers[tag_base].data_range_[0]),
                (tag_gran, scalers[tag_gran].data_range_[0]),
                ("E", bm),
            ]
            solver_operations.write_constraint(file, constraint_name, terms)

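    # Editor's note on why these terms encode equality of the *unscaled* values:
    # with x = (value - data_min_) / data_range_ for each tag, the constraint
    #     -range_base * x_base + range_gran * x_gran  E  (min_base - min_gran)
    # rearranges to
    #     range_gran * x_gran + min_gran = range_base * x_base + min_base,
    # i.e. the original (unscaled) granulometry values are equal.
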
    @staticmethod
    def write_gran_ocs_tm_min(file, scalers):
        # Granulometry lower bound (>= 13), as requested by Rodrigo.
        for tag_gran in [
            tag for tag in scalers.keys() if "GRAN_OCS_TM@08PE-BD-840I-" in tag
        ]:
            constraint_name = f"{tag_gran}_GT_than13"
            bm = -1 * scalers[tag_gran].data_min_[0]
            terms = [
                (tag_gran, scalers[tag_gran].data_range_[0]),
                ("GTE", 13 + bm),
            ]
            solver_operations.write_constraint(file, constraint_name, terms)

    @staticmethod
    def write_compressao_min_lim(file):
        """Define the minimum allowable compression value.

        Original tag name: `"COMP_MCOMP_PQ_L@08QU"`

        The function tries to get the lower-bound value from the
        `wip.constants.critical_cols_dict` dictionary. It first searches for
        the key "COMP_MCOMP_PQ_L@08QU" and, if not found, falls back to the key
        "compressao". If neither key is found, the hard-coded default of 270
        is used.

        Parameters
        ----------
        file : File
            The constraints' file to write the constraint to.
        """
        compressao_min = critical_cols_dict.get(
            "COMP_MCOMP_PQ_L@08QU", critical_cols_dict.get("compressao", {"lmin": 270})
        )["lmin"]
        terms = []
        constraint_name = "compressao_min"
        terms.append(("COMP_MCOMP_PQ_L@08QU", 1))
        terms.append(("GTE", compressao_min))
        solver_operations.write_constraint(file, constraint_name, terms)

    @staticmethod
    def write_calcario_equal(file, scalers):
        bm = (
            scalers["PESO1_I@08MO-BW-821I-01M1"].data_range_[0]
            + scalers["PESO1_I@08MO-BW-821I-02M1"].data_range_[0]
            + scalers["PESO1_I@08MO-BW-821I-03M1"].data_range_[0]
        )
        bm *= 0.8762 * scalers["calcario"].data_range_[0]
        bm = operations.normalize_feature(scalers, "PESO1_I@08MO-BW-813I-01M1", bm)
        constraint_name = "calcario_equality"
        terms = [("calcario", -bm), ("PESO1_I@08MO-BW-813I-01M1", 1), ("GTE", 0)]
        solver_operations.write_constraint(file, constraint_name, terms)

    @staticmethod
    def write_mill_feed_rate(file, scalers):
        # Mill feed rate versus production.
        constraint_name = "taxa_alimentacao_moinho_vs_producao"
        aproveitamento_massa_moinho = 0.8  # 0.8762
        terms = []
        for i in range(1, 4):
            # Alternative tag: 'FUNC1_D@08MO-MO-821I-{:02}M1'
            v_func = "FUNC1_D@08MO-BW-821I-{:02}M1".format(i)
            v_taxa = "PESO1_I@08MO-BW-821I-{:02}M1".format(i)
            coef = aproveitamento_massa_moinho * scalers[v_taxa].data_range_[0]
            terms.append((
                v_func,
                -aproveitamento_massa_moinho * scalers[v_taxa].data_min_[0],
            ))
            terms.append((v_taxa, coef))
        terms.append(("PROD_PQ_Y@08US", -scalers["PROD_PQ_Y@08US"].data_range_[0]))
        terms.append(("E", scalers["PROD_PQ_Y@08US"].data_min_[0]))
        solver_operations.write_constraint(file, constraint_name, terms)

    @staticmethod
    def generic_term_writing(
        file,
        first_token,
        second_token,
        first_coef,
        second_coef,
        operator,
        final_coef,
        commom_token=True,
        constraint_name=None,
    ):
        terms = []
        if commom_token:
            terms.append((second_token, first_coef))
            terms.append((second_token, second_coef))
            terms.append((operator, final_coef))
            solver_operations.write_constraint(file, first_token, terms)
        else:
            terms.append((first_token, first_coef))
            terms.append((second_token, second_coef))
            terms.append((operator, final_coef))
            solver_operations.write_constraint(file, constraint_name, terms)

    @staticmethod
    def measure_new_coef(term, scalers, norm_feature=None):
        feature = term["feature"]
        new_coef = 1
        if "operator" in term.keys():
            if term["operator"] == "norm":
                if norm_feature:
                    feature = norm_feature
                new_coef = operations.normalize_feature(scalers, feature, term["limit"])
            elif term["operator"] == "scaler":
                if term["position"] == "range":
                    new_coef = scalers[feature].data_range_[0]
                elif term["position"] == "min":
                    new_coef = scalers[feature].data_min_[0]
                else:  # it's max
                    new_coef = scalers[feature].data_max_[0]
        new_coef = term["coef"] * new_coef
        return term, new_coef

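    # Illustrative sketch (editor's addition), with a hypothetical scaler
    # fitted on values 10-60: for operator "scaler" and position "range", the
    # new coefficient is coef * data_range_.
    #
    #     scaler = MinMaxScaler().fit([[10.0], [60.0]])
    #     term = {"feature": "tag_a", "coef": 2,
    #             "operator": "scaler", "position": "range"}
    #     Constraints.measure_new_coef(term, {"tag_a": scaler})
    #     # -> (term, 100.0), i.e. 2 * data_range_ = 2 * 50.0
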
    @staticmethod
    def define_operator_term(terms):
        _operations = ["LT", "LTE", "GTE", "GT", "E"]
        condition_met = False
        _index = None
        operator = (None, None)
        for index, term in enumerate(terms):
            condition_one = any(operation in term.values() for operation in _operations)
            if condition_one and "operation" not in term.keys():
                operator = (term["operator"], term["coef"])
                condition_met = True
                _index = index
                break
        if condition_met:
            terms.pop(_index)
        return terms, operator

    @staticmethod
    def parse_range_complex_constraints(file, scalers):
        constraints = range_complex_constraints
        constraints = Constraints.parse_data(constraints)
        for constraint_name, terms in constraints.items():
            # Define the constraint names for the whole range.
            start, end = terms[0]["start"], terms[0]["end"] + 1
            constraint_name = constraint_name.format
            constraints_names = [constraint_name(value) for value in range(start, end)]
            terms, operator = Constraints.define_operator_term(terms)
            range_terms = list(
                map(lambda term: Constraints.define_range_terms(term, scalers), terms)
            )
            composed_terms = []
            for term_index in range(0, len(range_terms[0])):
                composed_terms.append(
                    list(map(lambda term: term[term_index], range_terms))
                )
            for constraint_name, terms in zip(constraints_names, composed_terms):
                terms = list(terms)
                terms.append(operator)
                solver_operations.write_constraint(file, constraint_name, tuple(terms))

    @staticmethod
    def write_complex_constraints(file, scalers):
        constraints = complex_constraints
        for constraint_name, terms in constraints.items():
            new_terms = Constraints.parse_type_complex_terms(
                constraint_name, terms, scalers
            )
            solver_operations.write_constraint(file, constraint_name, new_terms)

    @staticmethod
    def parse_type_complex_terms(constraint_name, terms, scalers):
        _operations = ["LT", "LTE", "GTE", "GT", "E"]
        new_terms, operator = [], []
        right_terms = None
        for term in terms:
            if "start" in term.keys() and "end" in term.keys():
                new_terms.extend(Constraints.define_range_terms(term, scalers))
            elif any(operation in term.values() for operation in _operations):
                operator = term.copy()
            elif "terms" in term.keys():
                # Complex factor after the operator.
                right_terms = Constraints.parse_type_complex_terms(
                    constraint_name, term["terms"], scalers
                )
            elif "operator" in term.keys() and "position" in term.keys():
                _, new_coef = Constraints.measure_new_coef(term, scalers)
                new_terms.append((term["feature"], new_coef))
            else:
                # It's a static feature.
                new_terms.append((term["feature"], term["coef"]))
        if isinstance(operator, dict) and right_terms is not None:
            sum_coefs = sum([list(value)[1] for value in right_terms])
            operator = (operator["operator"], operator["coef"] * sum_coefs)
        new_terms.append(operator)
        return new_terms

def write_media_pres_constraints(file: str, scalers: Dict[str, MinMaxScaler]):
    """Write media pressure and temperature constraints to a given file.

    This function writes constraints to the file when the following tags exist
    in the `scalers` dictionary:

    - 'media pres 1'
    - 'media press 2'
    - 'media press 3'
    - 'media press 4'
    - 'media temp 1'
    - 'media temp 2'
    - 'media temp 3'
    - 'media temp 4'

    Parameters
    ----------
    file : str
        The file path to which the constraints are written.
    scalers : Dict[str, MinMaxScaler]
        A dictionary where keys represent constraint names and values
        represent associated `MinMaxScaler` objects.

    See Also
    --------
    solver_operations.write_constraint : The method used to write constraints
        to the file.

    Notes
    -----
    The function makes use of a hard-coded mapping dictionary for media
    pressure and temperature constraints. Each mapping associates a constraint
    name with a list of tag identifiers.
    """
    media_mappings = {
        'media pres 1': [
            "PRES1_I@08QU-WB-851I-01",
            "PRES1_I@08QU-WB-851I-02",
            "PRES1_I@08QU-WB-851I-03A",
        ],
        'media press 2': [
            "PRES1_I@08QU-WB-851I-03B",
            "PRES1_I@08QU-WB-851I-04",
            "PRES1_I@08QU-WB-851I-05",
            "PRES1_I@08QU-WB-851I-06",
        ],
        'media press 3': [
            "PRES1_I@08QU-WB-851I-06",
            "PRES1_I@08QU-WB-851I-07",
            "PRES1_I@08QU-WB-851I-08",
            "PRES1_I@08QU-WB-851I-09",
        ],
        'media press 4': [
            "PRES1_I@08QU-WB-851I-09",
            "PRES1_I@08QU-WB-851I-10",
            "PRES1_I@08QU-WB-851I-11",
            "PRES1_I@08QU-WB-851I-12",
            "PRES1_I@08QU-WB-851I-13",
            "PRES1_I@08QU-WB-851I-14",
            "PRES1_I@08QU-WB-851I-15",
            "PRES1_I@08QU-WB-851I-16",
            "PRES1_I@08QU-WB-851I-17",
            "PRES1_I@08QU-WB-851I-18",
            "PRES1_I@08QU-WB-851I-19",
        ],
        'media temp 1': [
            "TEMP1_I@08QU-WB-851I-01",
            "TEMP1_I@08QU-WB-851I-02",
            "TEMP1_I@08QU-WB-851I-03",
        ],
        'media temp 2': [
            "TEMP1_I@08QU-WB-851I-03B",
            "TEMP1_I@08QU-WB-851I-04",
            "TEMP1_I@08QU-WB-851I-05",
            "TEMP1_I@08QU-WB-851I-06",
        ],
        'media temp 3': [
            "TEMP1_I@08QU-WB-851I-06",
            "TEMP1_I@08QU-WB-851I-07",
            "TEMP1_I@08QU-WB-851I-08",
            "TEMP1_I@08QU-WB-851I-09",
        ],
        'media temp 4': [
            "TEMP1_I@08QU-WB-851I-09",
            "TEMP1_I@08QU-WB-851I-10",
            "TEMP1_I@08QU-WB-851I-11",
            "TEMP1_I@08QU-WB-851I-12",
            "TEMP1_I@08QU-WB-851I-13",
            "TEMP1_I@08QU-WB-851I-14",
            "TEMP1_I@08QU-WB-851I-15",
            "TEMP1_I@08QU-WB-851I-16",
            "TEMP1_I@08QU-WB-851I-17",
            "TEMP1_I@08QU-WB-851I-18",
            "TEMP1_I@08QU-WB-851I-19",
            "TEMP1_I@08QU-WB-851I-20",
        ],
    }
    media_columns = [name for name in scalers if name in media_mappings]
    for constraint_name in media_columns:
        columns = media_mappings[constraint_name]
        terms = [(column, 1 / len(columns)) for column in columns]
        terms.extend(((constraint_name, -1), 0, ("E", 0)))
        solver_operations.write_constraint(file, f"{constraint_name}_equality", terms)


def add_energia_pensa_quantile_constraint(
    prob, datasets, prod_range, lb_limit=0.25, ub_limit=0.75
):
    """Add constraints that limit the models' target tags using quantiles.

    The function constrains the following tags:

    - `"energia_prensa"`
    - `"energia_moinho"`
    - `"relacao_gran"`
    - `"gas"`

    Parameters
    ----------
    prob : pulp.LpProblem
        The problem to which the constraint is added.
    datasets : Dict[str, pd.DataFrame]
        A dictionary where keys represent model names and values represent the
        corresponding datasets as `pandas.DataFrame`.
    prod_range : int
        The production range to which the constraint is applied.
    lb_limit : float, default=0.25
        The lower-bound limit of the constraint.
    ub_limit : float, default=0.75
        The upper-bound limit of the constraint.

    Notes
    -----
    The `quantile_limits` module does not work for some of the models' target
    variables. Therefore, this function was created to add the quantile
    constraint to tags that represent target variables not supported by the
    `quantile_limits` module.

    .. versionchanged:: 2.10.0
        Removed from this function the lower- and upper-bound definitions for
        the tag "compressao".
    """
    series = prepare_series(datasets, "energia_prensa", prod_range)
    lpvar = prob.variablesDict()["energia_prensa"]
    lpvar.lowBound = series.quantile(lb_limit)
    lpvar.upBound = series.quantile(ub_limit)

    series = prepare_series(datasets, "energia_moinho", prod_range)
    lpvar = prob.variablesDict()["energia_moinho"]
    lpvar.lowBound = series.quantile(lb_limit)
    lpvar.upBound = series.quantile(ub_limit)

    series = prepare_series(datasets, "relacao gran", prod_range)
    lpvar = prob.variablesDict()["relacao_gran"]
    lpvar.lowBound = series.quantile(lb_limit)
    lpvar.upBound = series.quantile(ub_limit)

    series = prepare_series(datasets, "gas", prod_range)
    lpvar = prob.variablesDict()["gas"]
    lpvar.lowBound = series.quantile(min(lb_limit, 0.1))
    lpvar.upBound = series.quantile(min(max(ub_limit, 0.9), 1))


def prepare_series(
    datasets: Dict[str, pd.DataFrame], model_name: str, prod_range: int
) -> pd.Series:
    """Prepare a series to be used in the quantile constraint.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A dictionary where keys represent model names and values represent the
        corresponding datasets as `pandas.DataFrame`.
    model_name : str
        The name of the model to be used.
    prod_range : int
        The production range to which the constraint is applied.

    Returns
    -------
    pd.Series
        The series to be used in the quantile constraint.
    """
    prod_pq = "PROD_PQ_Y@08US"
    df = datasets[model_name]
    target_col = df.columns[-1]
    if prod_pq not in df.columns:
        prod_pq_series = (
            pd.concat([
                datasets[model_name][prod_pq].to_frame(prod_pq)
                for model_name in find_models_by_tag(prod_pq, datasets)
            ])
            .sort_index()
            .drop_duplicates()
        )
        df = (
            df[target_col]
            .to_frame(target_col)
            .merge(prod_pq_series, left_index=True, right_index=True, how="inner")
        )
    else:
        df = df[[target_col, prod_pq]]
    return df.loc[
        lambda xdf: ((xdf[prod_pq] >= prod_range) & (xdf[prod_pq] < prod_range + 50)),
        target_col,
    ]

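# Illustrative sketch (editor's addition), with a hypothetical dataset:
# `prepare_series` returns the target column restricted to rows whose
# production ("PROD_PQ_Y@08US") falls inside [prod_range, prod_range + 50).
#
#     >>> df = pd.DataFrame(
#     ...     {"PROD_PQ_Y@08US": [690, 710, 745, 760], "gas": [1.0, 2.0, 3.0, 4.0]}
#     ... )
#     >>> prepare_series({"gas": df}, "gas", 700).tolist()
#     [2.0, 3.0]

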
def filters_vacuum_bombs_relationship(
    prob: pulp.LpProblem,
    func_filters: List[str] = None,
    func_vacuum_bombs: List[str] = None,
) -> pulp.LpProblem:
    """Constrain the ratio between running vacuum bombs and running filters.

    The sum of the vacuum-bomb "FUNC" variables is kept between 0.85 and 1.2
    times the sum of the filter "FUNC" variables.
    """
    if func_filters is None:
        func_filters = FILTERS_FUNC_TAGS
    if func_vacuum_bombs is None:
        func_vacuum_bombs = FUNC_VACUUM_BOMBS
    lpvars = prob.variablesDict()
    prob += (
        pulp.lpSum([
            lpvars[pulp.LpVariable(vacuum_func).name]
            for vacuum_func in func_vacuum_bombs
        ])
        <= 1.2 * pulp.lpSum([
            lpvars[pulp.LpVariable(filter_func).name]
            for filter_func in func_filters
        ]),
        "max_FUNC_FILTROS_BOMBAS_VACUO_RATIO",
    )
    prob += (
        pulp.lpSum([
            lpvars[pulp.LpVariable(vacuum_func).name]
            for vacuum_func in func_vacuum_bombs
        ])
        >= 0.85 * pulp.lpSum([
            lpvars[pulp.LpVariable(filter_func).name]
            for filter_func in func_filters
        ]),
        "min_FUNC_FILTROS_BOMBAS_VACUO_RATIO",
    )
    return prob


def constraint_taxa_alimentacao_disco(
    prob: pulp.LpProblem,
    lb_value: int = 90,
    ub_value: int = 140,
) -> pulp.LpProblem:
    """Apply constraints to ensure specific variables are zero or within bounds.

    This function enforces constraints on a set of variables within a linear
    programming problem to ensure that each variable
    `PESO1_I@08PE-BW-840I-XXM1` is set to 0 if its corresponding binary
    variable `FUNC1_D@08PE-BD-840I-XXM1` is 0. Otherwise, it ensures that the
    `PESO1_I` variables are within the specified lower and upper bounds.

    Parameters
    ----------
    prob : pulp.LpProblem
        The linear programming problem instance to which the constraints will
        be added.
    lb_value : int, default=90
        The lower-bound value for the `PESO1_I` variables when their
        corresponding `FUNC1_D` variables are 1.
    ub_value : int, default=140
        The upper-bound value for the `PESO1_I` variables when their
        corresponding `FUNC1_D` variables are 1.

    Returns
    -------
    pulp.LpProblem
        The modified LP problem instance with the new constraints added.

    Notes
    -----
    The function modifies the `prob` object directly by adding constraints
    to it.
    """
    func_cols = [f"FUNC1_D@08PE-BD-840I-{idx:02d}M1" for idx in range(1, 13)]
    peso_cols = [f"PESO1_I@08PE-BW-840I-{idx:02d}M1" for idx in range(1, 13)]
    lpvars = prob.variablesDict()
    for func_col, peso_col in zip(func_cols, peso_cols):
        peso_lpvar = lpvars[pulp.LpVariable(peso_col).name]
        func_lpvar = lpvars[pulp.LpVariable(func_col).name]
        func_lpvar.cat = pulp.LpBinary
        lp_define_or_constraint(prob, peso_lpvar, func_lpvar, lb_value, ub_value)
    return prob


def fix_grupos_de_queima_limits(prob: pulp.LpProblem, scalers: Dict[str, MinMaxScaler]):
    """Ensure that subsequent variables have intersecting limits in an LP problem.

    This function adjusts the upper limit of predefined variables within a
    linear programming (LP) problem to ensure that, for each pair of
    subsequent variables, their limits intersect. Specifically, it focuses on
    variables named `TEMP1_I@08QU-QU-855I-GQXX` for `XX` in the range 9 to 16,
    adjusting the upper limit of the first variable in the pair to match the
    lower limit of the second if they do not intersect. This is relevant in
    scenarios such as optimizing temperature-control processes, where
    continuity and overlap in operational ranges are required.

    Parameters
    ----------
    prob : pulp.LpProblem
        The linear programming problem instance containing the variables to be
        adjusted.
    scalers : Dict[str, MinMaxScaler]
        A dictionary mapping variable names to scaler objects. These scalers
        are used to transform the bounds of the variables to and from a
        standardized scale.

    Returns
    -------
    pulp.LpProblem
        The modified LP problem instance with adjusted variable limits.

    Notes
    -----
    This function directly modifies the `prob` object passed to it, adjusting
    the upper and lower bounds of specific variables based on the provided
    scalers. It is specifically designed for variables with names following
    the pattern `TEMP1_I@08QU-QU-855I-GQXX`, where `XX` ranges from 09 to 16.
    """
    lpvars = prob.variablesDict()
    for idx in range(9, 16):
        first_gq_name = f"TEMP1_I@08QU-QU-855I-GQ{idx:02d}"
        second_gq_name = f"TEMP1_I@08QU-QU-855I-GQ{idx + 1:02d}"
        first_gq = lpvars[pulp.LpVariable(first_gq_name).name]
        second_gq = lpvars[pulp.LpVariable(second_gq_name).name]
        first_gq_scaler = scalers[first_gq_name]
        second_gq_scaler = scalers[second_gq_name]
        lb_first_gq, ub_first_gq = first_gq.lowBound, first_gq.upBound
        lb_second_gq, ub_second_gq = second_gq.lowBound, second_gq.upBound
        lb_first_gq, ub_first_gq = first_gq_scaler.inverse_transform(
            [[lb_first_gq], [ub_first_gq]]
        ).reshape(-1)
        lb_second_gq, ub_second_gq = second_gq_scaler.inverse_transform(
            [[lb_second_gq], [ub_second_gq]]
        ).reshape(-1)
        if lb_second_gq > ub_first_gq:
            ub_first_gq = first_gq_scaler.transform([[lb_second_gq]]).reshape(-1)[0]
            first_gq.upBound = ub_first_gq
    return prob