Source code for wip.temporary

"""
Module defines objects that are meant to be temporary.

"""

from __future__ import annotations

import math
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple

import joblib
import numpy as np
import openpyxl
import pandas as pd
import pulp
from dateutil.relativedelta import relativedelta
from openpyxl import Workbook
from openpyxl.chart import LineChart
from openpyxl.chart import Reference
from openpyxl.styles import Alignment
from openpyxl.styles import Font
from openpyxl.styles import PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.preprocessing import MinMaxScaler

from wip.constants import FINAL_DATASETS_FILEPATH
from wip.constants import RESULTADO_OTIMIZADOR_FILENAME
from wip.constants import constants
from wip.datatools.ml_filters import (
    filter_tag,
    find_models_by_tag,
    get_column_values_from_datasets,
    get_most_recent_values,
    multi_merge,
)
from wip.files.lp_denorm_constraints import denormalize_lpvar
from wip.logging_config import logger
from wip.utils import find_filepath


TEMPORARY_MODELS_TO_REMOVE = [
    "produtividade filtragem",
    "torque",
    "dens_moinho_1",
    "dens_moinho_2",
    "dens_moinho_3",
    # "relacao gran",
]
TEMPORARY_MODEL_COEFFICIENTS_TO_REMOVE = [
    *TEMPORARY_MODELS_TO_REMOVE,
    ("SE PP", "Calculo da Energia da Filtragem"),
]
TEMPORARY_SCALERS_TO_REMOVE = ['Calculo da Energia da Filtragem']
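
# Illustrative sketch (hypothetical results dictionary): the lists above are
# the default removal targets used by `drop_models_results`,
# `drop_model_coefficients`, and `drop_scalers` defined below when no explicit
# argument is passed.
#
#     >>> results = {"torque": [1, 2], "gas": [3, 4]}
#     >>> drop_models_results(results)
#     {'gas': [3, 4]}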


def date_select(series_obj: pd.Series, n_days: int = 30) -> pd.Series:
    """Select the last `n_days` days from a `pandas.Series`.

    Parameters
    ----------
    series_obj : pd.Series
        The `pandas.Series` object to select the last `n_days` days from.
        This series must contain datetime values as index.
    n_days : int, default=30
        The number of days to select based on the last existing date.

    Returns
    -------
    pd.Series
        The `pandas.Series`, with only the last `n_days` days.
    """
    last_date = series_obj.index.max()
    first_date = last_date - relativedelta(days=n_days)
    return series_obj.loc[first_date:]
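
# Usage sketch (hypothetical daily series): `date_select` keeps every row
# whose index falls within `n_days` of the most recent date, including the
# boundary date itself.
#
#     >>> s = pd.Series(range(90), index=pd.date_range("2024-01-01", periods=90))
#     >>> date_select(s, n_days=7).shape[0]
#     8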
def compute_statistics(series_obj: pd.Series, target_names: List[str]) -> dict:
    """
    Compute statistical measures for a given pandas Series.

    This function calculates various statistical measures for the provided
    pandas Series. It checks if the series index is of a datetime type and
    computes averages for different periods, standard deviation, mode, median,
    minimum, maximum, and percentiles. It also determines if the series name
    is in the target names list.

    Parameters
    ----------
    series_obj : pd.Series
        The `pandas.Series` object for which statistics are to be calculated.
        The index must be of datetime type.
    target_names : List[str]
        List of target names to check if the series name is a target.

    Returns
    -------
    dict
        A dictionary containing computed statistical values. Keys are
        statistical measures and values are their corresponding computed
        values. If certain conditions are not met, some values may be None.

    Raises
    ------
    ValueError
        If the index of `series_obj` is not of a datetime type.

    Examples
    --------
    >>> series = pd.Series([1, 2, 3], index=pd.date_range('20200101', periods=3))
    >>> compute_statistics(series, ['target'])
    {'Average last 7 days': 2.0, 'Average last 14 days': 2.0, ...}
    """
    # Ensure the index is datetime
    if not isinstance(series_obj.index, pd.DatetimeIndex):
        raise ValueError("Series index must be of datetime type")

    is_boolean = set(series_obj.unique()).issubset({0, 1})
    is_empty = series_obj.empty
    is_target = series_obj.name in target_names
    stats = {
        "Average last 7 days": (
            round(date_select(series_obj, 7).mean(), 2)
            if not is_boolean and not is_empty else None
        ),
        "Average last 14 days": (
            round(date_select(series_obj, 14).mean(), 2)
            if not is_boolean and not is_empty else None
        ),
        "Average last 30 days": (
            round(date_select(series_obj, 30).mean(), 2)
            if not is_boolean and not is_empty else None
        ),
        "Average last 60 days": (
            round(date_select(series_obj, 60).mean(), 2)
            if not is_boolean and not is_empty else None
        ),
        "Average last 120 days": (
            round(date_select(series_obj, 120).mean(), 2)
            if not is_boolean and not is_empty else None
        ),
        "Average all periods": (
            round(series_obj.mean(), 2) if not is_boolean and not is_empty else None
        ),
        "Standard Deviation all periods": (
            round(series_obj.std(), 2) if not is_boolean and not is_empty else None
        ),
        "Mode all periods": (
            round(series_obj.mode().iloc[0], 2) if not is_empty else None
        ),
        "Median all periods": (
            round(series_obj.median(), 2) if not is_boolean and not is_empty else None
        ),
        "Minimum all periods": round(series_obj.min(), 2) if not is_empty else None,
        "Maximum all periods": round(series_obj.max(), 2) if not is_empty else None,
        "2% Percentile": (
            round(np.percentile(series_obj, 2), 2)
            if not is_boolean and not is_empty else None
        ),
        "5% Percentile": (
            round(np.percentile(series_obj, 5), 2)
            if not is_boolean and not is_empty else None
        ),
        "10% Percentile": (
            round(np.percentile(series_obj, 10), 2)
            if not is_boolean and not is_empty else None
        ),
        "25% Percentile": (
            round(np.percentile(series_obj, 25), 2)
            if not is_boolean and not is_empty else None
        ),
        "50% Percentile": (
            round(np.percentile(series_obj, 50), 2)
            if not is_boolean and not is_empty else None
        ),
        "75% Percentile": (
            round(np.percentile(series_obj, 75), 2)
            if not is_boolean and not is_empty else None
        ),
        "95% Percentile": (
            round(np.percentile(series_obj, 95), 2)
            if not is_boolean and not is_empty else None
        ),
        "98% Percentile": (
            round(np.percentile(series_obj, 98), 2)
            if not is_boolean and not is_empty else None
        ),
        "Is target": is_target,
    }
    return stats
[docs]def compute_statistics_datasets(datasets: Dict[str, pd.DataFrame]) -> pd.DataFrame: """ Compute statistics for each column across multiple pandas DataFrames. Aggregates columns from multiple DataFrames provided in a dictionary and computes statistical measures for each unique column. The function iterates over all DataFrames, concatenates column values across them, and calculates statistics using `compute_statistics`. It handles columns with the same name across different DataFrames and computes overall statistics. Parameters ---------- datasets : Dict[str, pd.DataFrame] A dictionary where keys are dataset names (or identifiers) and values are pandas DataFrames. The function expects the last column of each DataFrame to be the target column. Returns ------- pd.DataFrame A DataFrame where each row corresponds to a unique column from the input DataFrames and contains the computed statistics for that column. See Also -------- compute_statistics : Used to compute statistics for individual columns. Examples -------- >>> _datasets = { ... "dataset1": pd.DataFrame(...), ... "dataset2": pd.DataFrame(...) ... } >>> compute_statistics_datasets(_datasets) # DataFrame with computed statistics for each column across the datasets. Notes ----- This function is specifically designed to work with DataFrames where the last column is considered as the target. The statistics are computed for each column, considering their presence across multiple DataFrames. """ all_columns = {column for df in datasets.values() for column in df.columns} target_names = [df.columns[-1] for df in datasets.values()] df_stats = pd.DataFrame() for column in all_columns: column_values = ( pd.concat( [ datasets[model_name][column] for model_name in find_models_by_tag(column, datasets) ] ) .dropna() .drop_duplicates() .sort_index() ) column_stats = pd.DataFrame( compute_statistics(column_values, target_names), index=[column_values.name] ) df_stats = pd.concat([df_stats, column_stats]) return df_stats
[docs]def drop_models_results( models_results: Dict[str, list | dict], models_to_drop: List[str] | None = None, ) -> Dict[str, list | dict]: """ Remove specified models from the `models_results` dictionary. The function filters out specified models from the dictionary containing model results. If no models are specified, the function defaults to removing models from the global variable `TEMPORARY_MODELS_TO_REMOVE`. Parameters ---------- models_results : Dict[str, list | dict] Dictionary where keys are model names (str) and values are lists of model results. models_to_drop : List[str] | None, optional List of model names to be dropped from `models_results`. If not provided, the function defaults to using `TEMPORARY_MODELS_TO_REMOVE`. Returns ------- Dict[str, list | dict] Filtered dictionary of model results with specified models removed. Examples -------- >>> _models_results = {"model1": [1, 2, 3], "model2": [4, 5, 6], ... "model3": [7, 8, 9]} >>> drop_models_results(_models_results, ["model1", "model3"]) {"model2": [4, 5, 6]} """ models_to_drop = models_to_drop or TEMPORARY_MODELS_TO_REMOVE return { model_name: results for model_name, results in models_results.items() if model_name not in models_to_drop }
[docs]def drop_model_coefficients( model_coefficients: Dict[str, Dict[str, float | int]], coefficients_to_drop: List[str | Tuple[str, str]] | None = None, ) -> Dict[str, Dict[str, float | int]]: """ Remove specified coefficients from the given model coefficients. This function provides a way to delete specific coefficients from the model coefficients. If no specific coefficients are provided to remove, it will fall back to a set of temporary coefficients stored in `TEMPORARY_MODEL_COEFFICIENTS_TO_REMOVE`. Parameters ---------- model_coefficients : Dict[str, Dict[str, float | int]] The model coefficients to change. Keys are model names and values are dictionaries where keys are coefficient names and values are their respective values. coefficients_to_drop : List[str | Tuple[str, str]] | None, optional The coefficients to drop. Can either be a list of model names as strings or a list of tuples where the first element is the model name and the second is the coefficient name. If not provided, coefficients from `TEMPORARY_MODEL_COEFFICIENTS_TO_REMOVE` are removed. Returns ------- Dict[str, Dict[str, float | int]] The modified model coefficients dictionary after dropping the specified coefficients. Examples -------- Assume we have the following model coefficients: >>> _model_coefficients = {'model1': {'coeff1': 1.2, 'coeff2': 0.5}, ... 'model2': {'coeff1': 0.8, 'coeff2': 1.5}} >>> _coefficients_to_drop = [('model1', 'coeff1'), ('model2', 'coeff1')] >>> drop_model_coefficients(_model_coefficients, _coefficients_to_drop) {'model1': {'coeff2': 0.5}, 'model2': {'coeff2': 1.5}} """ coefficients_to_drop = ( coefficients_to_drop or TEMPORARY_MODEL_COEFFICIENTS_TO_REMOVE ) _coefficients_to_drop = [ coefficient for coefficient in coefficients_to_drop if isinstance(coefficient, str) ] model_coefficients = drop_models_results(model_coefficients, _coefficients_to_drop) _coefficients_to_drop = set(coefficients_to_drop) - set(_coefficients_to_drop) for model_name, coefficient in _coefficients_to_drop: model_coefficients[model_name].pop(coefficient, None) return model_coefficients
def drop_scalers(
    scalers: Dict[str, MinMaxScaler],
    scalers_to_drop: List[str] | None = None,
) -> Dict[str, MinMaxScaler]:
    """
    Exclude scalers from the provided dictionary of scalers.

    This function allows for dropping certain scalers based on their names.
    The names of the scalers to be dropped are provided in `scalers_to_drop`.
    If `scalers_to_drop` isn't specified, the pre-defined list
    `TEMPORARY_SCALERS_TO_REMOVE` is used.

    Parameters
    ----------
    scalers : Dict[str, MinMaxScaler]
        A dictionary of scaler objects (value) identified by their names (key).
    scalers_to_drop : List[str] | None, optional
        List of scaler names to drop from the `scalers` dictionary.
        If not provided, defaults to the pre-defined list named
        `TEMPORARY_SCALERS_TO_REMOVE`.

    Returns
    -------
    Dict[str, MinMaxScaler]
        The modified dictionary of scalers, with specified scalers removed.

    Examples
    --------
    Assuming we have the following scalers dictionary and
    `TEMPORARY_SCALERS_TO_REMOVE` list:

        scalers = {"scaler1": MinMaxScaler1, "scaler2": MinMaxScaler2,
                   "scaler3": MinMaxScaler3}
        TEMPORARY_SCALERS_TO_REMOVE = ["scaler1", "scaler3"]

    Calling the function as:

    >>> new_scalers = drop_scalers(scalers)
    >>> new_scalers
    {"scaler2": MinMaxScaler2}

    If we specify the `scalers_to_drop` argument:

    >>> new_scalers = drop_scalers(scalers, ["scaler2"])
    >>> new_scalers
    {"scaler1": MinMaxScaler1, "scaler3": MinMaxScaler3}
    """
    scalers_to_drop = scalers_to_drop or TEMPORARY_SCALERS_TO_REMOVE
    return {
        scaler_name: scaler_value
        for scaler_name, scaler_value in scalers.items()
        if scaler_name not in scalers_to_drop
    }
[docs]def filter_corpo_moedor_especifico( datasets: Dict[str, pd.DataFrame] ) -> Dict[str, pd.DataFrame]: """Filter DataFrames in the datasets' dictionary on a specific condition. This function filters each DataFrame in the provided datasets dictionary based on the "corpo_moedor_especifico" tag with an upper bound of 10. It uses the `filter_tag` function to perform the filtering. Parameters ---------- datasets : Dict[str, pd.DataFrame] A dictionary containing string keys and `pandas.DataFrame` as values. Each DataFrame should contain a column labeled "corpo_moedor_especifico". Returns ------- Dict[str, pd.DataFrame] A dictionary containing the filtered DataFrames, retaining the same keys as the input dictionary. See Also -------- filter_tag : Function used for performing the filtering based on the provided tag and boundary. Examples -------- Given a dictionary of DataFrames `datasets` where each DataFrame contains a column "corpo_moedor_especifico": >>> import pandas as pd >>> datasets = {"df1": pd.DataFrame({"corpo_moedor_especifico": [5, 15, 7]})} >>> filtered_datasets = filter_corpo_moedor_especifico(datasets) >>> print(filtered_datasets["df1"]) corpo_moedor_especifico 0 5 2 7 """ return filter_tag(datasets, "corpo_moedor_especifico", up_bound=10)
[docs]def process_labels(labels: pd.DataFrame) -> pd.DataFrame: """Process the labels to add to the optimization problem. Parameters ---------- labels : pd.DataFrame A `pandas.DataFrame` containing the labels for the optimization problem. Returns ------- pd.DataFrame A `pandas.DataFrame` containing the processed labels for the optimization problem. """ labels = labels.drop_duplicates(subset=["Tag"]) labels = labels.merge( pd.DataFrame( [ [old_name, new_name] for old_name, new_name in constants.TARGETS_IN_MODEL.items() ], columns=["Tag", "New"], ), on="Tag", how="left", ) labels["Tag"] = np.where(labels["New"].notnull(), labels["New"], labels["Tag"]) return labels.drop(columns=["New"])
[docs]def pivot_optimization_results(res): """Pivot the results of the optimization problem. Parameters ---------- res : pd.DataFrame A `pandas.DataFrame` containing the results of the optimization problem. Returns ------- pd.DataFrame A `pandas.DataFrame` containing the pivoted results of the optimization problem. """ pivot_data = res.pivot_table( index=["Tag"], columns=["Faixa"], values=["Valor real"] ) pivot_data.columns = pivot_data.columns.to_series().reset_index()["Faixa"].values pivot_data = pivot_data.reset_index() pivot_data["Tag"] = pivot_data["Tag"].str.replace("'=", "=") return pivot_data
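
# Minimal sketch (hypothetical data) of the pivot performed by
# `pivot_optimization_results`: one row per tag, one column per production
# range ("Faixa"), values taken from "Valor real".
#
#     >>> res = pd.DataFrame({
#     ...     "Tag": ["tag_a", "tag_a", "tag_b", "tag_b"],
#     ...     "Faixa": ["700-750", "750-800", "700-750", "750-800"],
#     ...     "Valor real": [1.0, 2.0, 3.0, 4.0],
#     ... })
#     >>> pivot_optimization_results(res).columns.tolist()
#     ['Tag', '700-750', '750-800']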
[docs]def create_results_ranges(res, labels, datasets): """Create a DataFrame with the pivoted results of the optimization problem. Parameters ---------- res : pd.DataFrame A `pandas.DataFrame` containing the results of the optimization problem. labels : pd.DataFrame A `pandas.DataFrame` containing the labels for the optimization problem. datasets : Dict[str, pd.DataFrame] | None A dictionary with `pandas.DataFrame` objects for each model used to create the optimization model. This dictionary is used to add additional statistics for each tag. If None, no statistics are added to the results. Returns ------- pd.DataFrame A `pandas.DataFrame` containing the pivoted results of the optimization problem. """ labels = process_labels(labels) piv = pivot_optimization_results(res) dataset = ( piv.merge(labels, on="Tag", how="left")[ [*piv.columns, *labels.columns.difference(piv.columns)] ] .fillna("") .sort_values(by=["Tag"]) .pipe(add_constant_tags_summary) ) if datasets is not None: stats_df = compute_statistics_datasets(datasets) stats_df.index = stats_df.index.to_series().apply( lambda value: constants.TARGETS_IN_MODEL.get(value, value) ) dataset = dataset.merge(stats_df, left_on="Tag", right_index=True, how="left") dataset["Tag"] = ( dataset["Tag"] .str.replace("=PQ*24/768/FUNC", "'=PQ*24/768/FUNC", regex=False) .str.replace("=192/VELO", "'=192/VELO", regex=False) ) dataset = dataset.drop_duplicates(subset=["Tag"]) # The `assign` + `sort_values` + `drop` is a workaround to sort # the "Tag" column values in a case-insensitive manner. return ( dataset.assign(Tag_UPPER=lambda xdf: xdf["Tag"].str.upper()) .sort_values("Tag_UPPER") .drop(columns=["Tag_UPPER"]) )
def round_value(value: Any, decimals: int = 2) -> Any:
    """Round numeric values that have more than 3 decimal places.

    Parameters
    ----------
    value : Any
        Value to be rounded, if it is a float.
    decimals : int, default=2
        Number of decimal places to round to.

    Returns
    -------
    Any
        Rounded value, if it is a float. Otherwise, the original value.
    """
    # Check if the value is a float
    if isinstance(value, float):
        # Split the number into its whole and fractional parts
        _, fraction = divmod(value, 1)
        # If there are more than 3 decimal places
        if len(str(fraction)) > 5:  # 0.xxxx
            return round(value, decimals)
    return value
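
# Behaviour sketch: floats carrying a long fractional part are rounded to
# `decimals` places; short fractions and non-floats pass through untouched.
#
#     >>> round_value(3.14159)
#     3.14
#     >>> round_value(2.5)
#     2.5
#     >>> round_value("text")
#     'text'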
def create_perfil_temperature_sheet(
    ranges_dataframe: pd.DataFrame, wb, sheet_name: str = 'Perfil Grupos de Queima'
):
    """
    Adds a sheet to a given workbook and populates it with filtered
    temperature data and a line chart.

    This function filters a DataFrame for specific tags, copies certain
    columns into a new sheet in the given Excel workbook, and creates a line
    chart visualizing temperature ranges. It ensures the sheet exists
    (creating it if necessary), adds filtered data, and then constructs and
    inserts a line chart based on the data.

    Parameters
    ----------
    ranges_dataframe : pd.DataFrame
        The DataFrame containing temperature range data to be filtered and
        copied.
    wb : openpyxl.workbook.workbook.Workbook
        The workbook where the sheet will be added and populated with data
        and a chart.
    sheet_name : str, optional
        The name of the sheet to be added or used for inserting data.
        Default is 'Perfil Grupos de Queima'.

    Raises
    ------
    ValueError
        If `ranges_dataframe` does not contain the necessary columns for
        filtering or chart generation.

    See Also
    --------
    openpyxl.workbook.workbook.Workbook : The Workbook class from openpyxl
        used to manipulate Excel files.
    pandas.DataFrame : The DataFrame class from pandas used for data
        manipulation and analysis.

    Notes
    -----
    It's important that the `ranges_dataframe` contains columns for 'Tag' and
    temperature ranges as these are crucial for the filtering and chart
    generation processes. The function dynamically adjusts the y-axis scale
    of the chart based on the minimum temperature in the data.

    References
    ----------
    OpenPyXL documentation : https://openpyxl.readthedocs.io/
    Pandas documentation : https://pandas.pydata.org/pandas-docs/stable/

    Examples
    --------
    >>> import pandas as pd
    >>> from openpyxl import Workbook
    >>> df = pd.DataFrame({
    ...     'Tag': ['TEMP1_I@08QU-QU-855I-GQ04', 'other_tag'],
    ...     '700-750': [1, 2],
    ...     '750-800': [3, 4],
    ...     '800-850': [5, 6],
    ...     '850-900': [7, 8],
    ...     '900-950': [9, 10],
    ...     '950-1000': [11, 12]
    ... })
    >>> wb = Workbook()
    >>> create_perfil_temperature_sheet(df, wb)
    >>> 'Perfil Grupos de Queima' in wb.sheetnames
    True
    """
    if sheet_name not in wb.sheetnames:
        wb.create_sheet(sheet_name)

    columns_to_copy = ['Tag', '700-750', '750-800', '800-850', '850-900',
                       '900-950', '950-1000']
    tags_to_filter = [
        'TEMP1_I@08QU-QU-855I-GQ04',
        'TEMP1_I@08QU-QU-855I-GQ05',
        'TEMP1_I@08QU-QU-855I-GQ06',
        'TEMP1_I@08QU-QU-855I-GQ07',
        'TEMP1_I@08QU-QU-855I-GQ08',
        'TEMP1_I@08QU-QU-855I-GQ09',
        'TEMP1_I@08QU-QU-855I-GQ10',
        'TEMP1_I@08QU-QU-855I-GQ11',
        'TEMP1_I@08QU-QU-855I-GQ12',
        'TEMP1_I@08QU-QU-855I-GQ13',
        'TEMP1_I@08QU-QU-855I-GQ14',
        'TEMP1_I@08QU-QU-855I-GQ15',
        'TEMP1_I@08QU-QU-855I-GQ16',
    ]
    filtered_df = ranges_dataframe[ranges_dataframe['Tag'].isin(tags_to_filter)]
    filtered_data_to_write = filtered_df[columns_to_copy]
    data_to_write = [columns_to_copy] + filtered_data_to_write.values.tolist()

    ws = wb[sheet_name]
    for row in data_to_write:
        ws.append(row)
    max_row = ws.max_row

    # Create a line chart with markers
    chart = LineChart()
    chart.title = "Perfil Grupos de Queima"
    chart.x_axis.title = 'Faixas de Produção'
    chart.y_axis.title = 'Temperatura Grupos de Queima'
    chart.y_axis.scaling.min = (
        math.floor(filtered_data_to_write.min(axis=1).min() / 100) * 100
    )

    # Adding data to the chart
    # Data for the chart (excluding the header)
    data = Reference(ws, min_col=2, min_row=1, max_col=7, max_row=max_row)
    chart.add_data(data, titles_from_data=True)

    # Categories (Temperature Range)
    cats = Reference(ws, min_col=1, min_row=2, max_row=max_row)
    chart.set_categories(cats)

    # Optionally, adjust the chart size or position
    chart.width = 20
    chart.height = 15

    # Add the chart to the sheet
    ws.add_chart(chart, "I10")  # Adjust cell as needed to position the chart

    return wb
[docs]def dataframe_to_worksheet(dataframe, worksheet, index=True, header=True): """Convert a `pandas.DataFrame` to an OpenPyXL worksheet. Parameters ---------- dataframe : pd.DataFrame A `pandas.DataFrame` containing the data to be converted to an OpenPyXL worksheet. worksheet : openpyxl.worksheet.worksheet.Worksheet The Worksheet instance, where the data is being stored to. index : bool, default=True Whether to display the index in the formatted Excel file. header : bool, default=True Whether to display the header in the formatted Excel file. """ for r_idx, row in enumerate( dataframe_to_rows(dataframe, index=index, header=header), 1 ): for c_idx, value in enumerate(row, 1): cell = worksheet.cell(row=r_idx, column=c_idx, value=round_value(value)) cell.alignment = Alignment(horizontal='center') # Set header format if r_idx == 1: # "d5d5ec" - Light purple cell.fill = PatternFill( start_color="d5d5ec", end_color="d5d5ec", fill_type="solid" ) # "3d3d3d" - Dark gray cell.font = Font(color="3d3d3d", bold=True)
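
# Usage sketch (hypothetical frame): writing a small DataFrame into a fresh
# openpyxl worksheet with the header styling applied by
# `dataframe_to_worksheet`; float values are trimmed through `round_value`.
#
#     >>> wb = Workbook()
#     >>> ws = wb.active
#     >>> dataframe_to_worksheet(pd.DataFrame({"a": [1.23456, 2.0]}), ws, index=False)
#     >>> ws["A1"].value, ws["A2"].value
#     ('a', 1.23)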
def adjust_column_widths(worksheet: openpyxl.worksheet.worksheet.Worksheet):
    """
    Adjust the widths of columns in a worksheet.

    Column widths are adjusted based on the lengthiest value that each
    column has.

    Parameters
    ----------
    worksheet : openpyxl.worksheet.worksheet.Worksheet
        The Worksheet instance, where the data is being stored to.
    """
    for column in worksheet.columns:
        max_length = 0
        column_letter = None
        for cell in column:
            try:
                column_letter = cell.column_letter
                if len(str(cell.value)) > max_length:
                    max_length = len(str(cell.value))
            except (AttributeError, ValueError, TypeError):
                pass
        adjusted_width = max_length + 10  # add a little extra space
        if column_letter is not None:
            worksheet.column_dimensions[column_letter].width = adjusted_width
[docs]def save_formatted_dataframe( dataframe: pd.DataFrame, path: str | Path, header: bool = True, index: bool = True, ): """Format the results of the optimization problem and save them to an Excel file. Parameters ---------- dataframe : pd.DataFrame A `pandas.DataFrame` containing the results of the optimization problem. path : str | Path A path to save the formatted results to. header : bool, default=True Whether to display the header in the formatted Excel file. index : bool, default=True Whether to display the index in the formatted Excel file. """ path = Path(path).with_suffix(".xlsx") workbook = Workbook() worksheet = workbook.active worksheet.title = "Resultado Otimizador" dataframe_to_worksheet(dataframe, worksheet, header=header, index=index) adjust_column_widths(worksheet) worksheet.sheet_view.showGridLines = False worksheet.auto_filter.ref = worksheet.dimensions faixas_worksheet = workbook.create_sheet(title="Resultado Otimizador Faixas") labels_filepath = find_filepath("descricao_tags.xlsx", max_upper_dirs=3) labels = pd.read_excel(labels_filepath) datasets = joblib.load(FINAL_DATASETS_FILEPATH) ranges_dataframe = create_results_ranges(dataframe, labels, datasets) dataframe_to_worksheet( ranges_dataframe, faixas_worksheet, header=header, index=index ) bad_results, warning_results = apply_style_cells(faixas_worksheet) adjust_column_widths(faixas_worksheet) faixas_worksheet.sheet_view.showGridLines = False faixas_worksheet.auto_filter.ref = faixas_worksheet.dimensions results_summary_worksheet = workbook.create_sheet(title="Erros Summary") results_summary_worksheet.cell(row=1, column=1, value="Error Type") results_summary_worksheet.cell(row=2, column=1, value="Warning Results") results_summary_worksheet.cell(row=3, column=1, value="Bad Results") results_summary_worksheet.cell(row=1, column=2, value="Quantity") results_summary_worksheet.cell(row=2, column=2, value=warning_results) results_summary_worksheet.cell(row=3, column=2, value=bad_results) results_summary_worksheet.sheet_view.showGridLines = False adjust_column_widths(results_summary_worksheet) results_summary_worksheet.auto_filter.ref = results_summary_worksheet.dimensions create_perfil_temperature_sheet(ranges_dataframe, workbook) workbook.save(path)
[docs]def format_results(header: bool = True, index: bool = False): """ Format the results of the optimization problem and save them to an Excel file. This function will pivot the results, so that the values for all production ranges are set column-wise. Then this function formats the results and saves them to a new Excel file, located in the same directory as the original results file. Parameters ---------- header : bool, default=True Whether to display the header in the formatted Excel file. index : bool, default=False Whether to display the index in the formatted Excel file. """ res_filepath = find_filepath("resultados_modelos_preditivos/us8/") now = pd.Timestamp.now().strftime("%Y-%m-%d %H_%M") folder = res_filepath.joinpath("Resultados Faixas") folder.mkdir(parents=True, exist_ok=True) path = folder.joinpath(f"Resultado Otimizador - Faixas - {now}.xlsx") dataframe = pd.read_excel(str(res_filepath.joinpath(RESULTADO_OTIMIZADOR_FILENAME))) save_formatted_dataframe(dataframe, path, header=header, index=index)
[docs]def apply_style_cells(worksheet) -> Tuple[int, int]: """Apply "bad" style to cells that have all values equal to zero. Parameters ---------- worksheet : openpyxl.worksheet.worksheet.Worksheet The Worksheet instance, where the data is being stored to. Returns ------- Tuple[int, int] A tuple containing the number of results that have all values equal to zero and the number of results that have at least one value equal to zero. """ bad_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") bad_font = Font(color="9C0006", bold=False) neutral_fill = PatternFill( start_color="FFEB9C", end_color="FFEB9C", fill_type="solid" ) neutral_font = Font(color="9C5700", bold=False) bad_results = 0 warning_results = 0 for row in worksheet.iter_rows( min_row=2, max_row=worksheet.max_row, min_col=1, max_col=7 ): if row[0].value.startswith("FUNC") or row[0].value in ["floticor", "cfix"]: continue _row = [cell for cell in row[1:7] if isinstance(cell.value, (int, float))] if all(cell.value == 0 for cell in _row): bad_results += 1 for cell in row: cell.fill = bad_fill cell.font = bad_font elif any(cell.value == 0 for cell in _row): row[0].fill = neutral_fill row[0].font = neutral_font warning_results += 1 for cell in _row: if cell.value == 0: cell.fill = neutral_fill cell.font = neutral_font elif all(cell.value == row[1].value for cell in _row): row[0].fill = neutral_fill row[0].font = neutral_font warning_results += 1 for cell in _row: cell.fill = neutral_fill cell.font = neutral_font return bad_results, warning_results
[docs]def add_constant_tags_summary(dataframe: pd.DataFrame): """Add a column to `dataframe` to indicate whether the tag has constant values. Parameters ---------- dataframe : pd.DataFrame A `pandas.DataFrame` containing the tags to be checked. Returns ------- pd.DataFrame A `pandas.DataFrame` containing the tags and a column indicating whether the tag has constant values. """ constant_tags_summary_filepath = find_filepath( "constant_tags_summary.xlsx", max_upper_dirs=3 ) constant_tags_summary = pd.read_excel(constant_tags_summary_filepath) dataframe["Constant Values"] = np.where( dataframe["Tag"].isin(constant_tags_summary["Tag"]), True, False, ) return dataframe
[docs]def energy_cons_vents_slopes(df_sql: pd.DataFrame) -> pd.DataFrame: cons_vents = [f"CONS1_Y@08QU-PF-852I-{idx:02d}M1" for idx in range(1, 9)] # Define the bins and their corresponding labels bins = np.arange(700, 1001, 50) labels = [f"{lb}-{lb + 50}" for lb in bins[:-1]] # Create the new column 'faixa' using pd.cut df_sql["faixa"] = pd.cut( df_sql["PROD_PQ_Y@08US"], bins=bins, labels=labels, right=False ) cons_vents_slopes = ( df_sql.groupby("faixa")[cons_vents] .mean() .round(2) .apply( lambda col: pd.Series( [ ( 1 if prev_faixa < curr_faixa else -1 if prev_faixa > curr_faixa else 0 ) for prev_faixa, curr_faixa in zip(col.iloc[:-1], col.iloc[1:]) ], index=col.index[1:], name=col.name, ) ) ) return cons_vents_slopes
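
# The frame returned by `energy_cons_vents_slopes` encodes, for each fan tag,
# whether its historical mean rises (+1), falls (-1), or stays flat (0) from
# one production range to the next. Illustrative sketch of the encoding
# applied column-wise (hypothetical means per range):
#
#     >>> means = pd.Series([10.0, 12.0, 12.0, 9.0],
#     ...                   index=["700-750", "750-800", "800-850", "850-900"])
#     >>> [1 if a < b else -1 if a > b else 0
#     ...  for a, b in zip(means.iloc[:-1], means.iloc[1:])]
#     [1, 0, -1]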
[docs]def energy_cons_vents_faixas(pulp_solver, current_faixa: str, df_sql: pd.DataFrame): """ Set constraint that sets the values of each fan energy consumption variable to be greater, smaller, or equal to the values from the previous production range based on the historical data tendencies between ranges. For example, for the optimization problem of the production range `"800-850"` if the average historical values of the tag `"CONS1_Y@08QU-PF-852I-01M1"` increase relative to the average historical values of the production range `"750-800"`, then this function will create a constraint that forces the `"CONS1_Y@08QU-PF-852I-01M1"` variable to be equal to or greater than 101% of the value obtained during the optimization process of the previous production range. Parameters ---------- pulp_solver : PulpSolver The `PulpSolver` class instance, that contains the optimization problems for each production range. current_faixa : str The current production range. Values should be represented as strings containing two numeric values separated by `"-"`. For example, `"700-750"`, `"750-800`", `"800-850"`, etc. df_sql : pd.DataFrame The pandas DataFrame with all tags represented as columns. This function expects that the dataframe used contains the tag values obtained after performing all data transformations and cleaning operations. This parameter is used to determine whether the fan energy consumption values of two adjacent production ranges increase, decrease, or stay the same. Returns ------- PulpSolver The `PulpSolver` class instance with the added constraints. """ if current_faixa == min(pulp_solver.probs.keys()): return pulp_solver cons_vents_slopes = energy_cons_vents_slopes(df_sql) scalers = pulp_solver.scalers lb_faixa, _ = current_faixa.split("-") _lb_faixa = int(lb_faixa) previous_faixa = f"{_lb_faixa-50}-{_lb_faixa}" current_prob = pulp_solver.probs[current_faixa] previous_prob = pulp_solver.probs[previous_faixa] if previous_prob.status != pulp.LpStatusOptimal: return pulp_solver previous_lp_vars = previous_prob.variablesDict() current_lp_vars = current_prob.variablesDict() cons_vent_tags = [f"CONS1_Y@08QU-PF-852I-{idx:02d}M1" for idx in range(1, 9)] for cons_vent_tag in cons_vent_tags: lpvar_name = pulp.LpVariable(cons_vent_tag).name previous_lp_var = previous_lp_vars[lpvar_name] current_lp_var = current_lp_vars[lpvar_name] current_lp_var_rescaled = denormalize_lpvar( cons_vent_tag, current_lp_var, scalers ) previous_cons_vent = denormalize_lpvar( cons_vent_tag, previous_lp_var, scalers ).value() slope = cons_vents_slopes.loc[current_faixa, cons_vent_tag] if slope == -1: current_prob += ( current_lp_var_rescaled <= previous_cons_vent * 0.99, f"{current_faixa.replace('-', '_')}_LT_{previous_faixa.replace('-', '_')}_{lpvar_name}", ) elif slope == 1: current_prob += ( current_lp_var_rescaled >= previous_cons_vent * 1.01, f"{current_faixa.replace('-', '_')}_GT_{previous_faixa.replace('-', '_')}_{lpvar_name}", ) else: current_prob += ( current_lp_var_rescaled >= previous_cons_vent * 0.95, f"{current_faixa.replace('-', '_')}_GT_95_PERCENT_{previous_faixa.replace('-', '_')}_{lpvar_name}", ) current_prob += ( current_lp_var_rescaled <= previous_cons_vent * 1.05, f"{current_faixa.replace('-', '_')}_LT_105_PERCENT_{previous_faixa.replace('-', '_')}_{lpvar_name}", ) pulp_solver.probs[current_faixa] = current_prob return pulp_solver
def temp_production_ranges_ascending(pulp_solver, current_faixa: str):
    """
    Constrain the current production range optimization problem using
    previous range values.

    This function performs the following steps:

    1. The function checks if the current_faixa is the optimization problem
       from the first production range from the `pulp_solver.probs`
       dictionary. If it is, the function returns the `pulp_solver` object
       unchanged.
    2. The function retrieves the dictionary of scalers from the
       `pulp_solver` object.
    3. The function calculates the previous production range by subtracting
       50 from the lower bound of the current production range.
    4. The function retrieves the current and previous probability objects
       from the `pulp_solver.probs` dictionary.
    5. If the previous probability object is not optimal, the function logs
       an error message and returns the `pulp_solver` object unchanged.
    6. The function retrieves the LP variables for the current and previous
       production ranges.
    7. The function creates a list of `"TEMP1_I@08QU-QU-855I-GQXX"` tag
       names. Then it retrieves the LP variables for the current and
       previous production ranges optimization problems, denormalizes them
       using the `denormalize_lpvar` function, and adds a constraint to the
       current optimization problem instance.
    8. The function updates the `pulp_solver.probs` dictionary with the
       updated optimization problem instance.
    9. The function returns the updated `pulp_solver` object.

    Parameters
    ----------
    pulp_solver : PulpSolver
        The `PulpSolver` class instance, that contains the optimization
        problems for each production range.
    current_faixa : str
        The current production range. Values should be represented as
        strings containing two numeric values separated by `"-"`.
        For example, `"700-750"`, `"750-800"`, `"800-850"`, etc.

    Returns
    -------
    PulpSolver
        The `PulpSolver` class instance with the added constraints.

    Notes
    -----
    To better explain what's the purpose of this function, consider the
    following example:

    Suppose we're solving 5 optimization problems, for the following
    production ranges: `'750-800'`, `'800-850'`, `'850-900'`, `'900-950'`,
    and `'950-1000'`.

    After defining the first production range problem and solving it, the
    model returned a value for the tag `'TEMP1_I@08QU-QU-855I-GQ09'` equal
    to 1355. In this scenario, this function will force the value of
    `'TEMP1_I@08QU-QU-855I-GQ09'` for the next production range of
    `'800-850'` to be equal to 1355 or higher. The next production range
    model in turn will restrict values that can be set to this same column
    based on the second problem's results and so on.
    """
    is_first_faixa = current_faixa == min(pulp_solver.probs.keys())
    # if current_faixa == min(pulp_solver.probs.keys()):
    #     return pulp_solver
    max_possible_lb_faixa = 950
    scalers = pulp_solver.scalers
    lb_faixa, _ = current_faixa.split("-")
    _lb_faixa = int(lb_faixa)
    if is_first_faixa:
        previous_faixa = current_faixa
    else:
        previous_faixa = f"{_lb_faixa - 50}-{_lb_faixa}"
    current_prob = pulp_solver.probs[current_faixa]
    previous_prob = pulp_solver.probs[previous_faixa]
    if not is_first_faixa and previous_prob.status != pulp.LpStatusOptimal:
        logger.error(
            "Previous production range '%s' status is not optimal: %s",
            previous_faixa,
            pulp.LpStatus[previous_prob.status],
        )
        logger.error(
            "Not constraining values for 'TEMP1_I@08QU-QU-855I-GQXX' tags to "
            "be greater than or equal to the values from %s production range.",
            previous_faixa,
        )
        return pulp_solver

    previous_lp_vars = previous_prob.variablesDict()
    current_lp_vars = current_prob.variablesDict()
    temp_gq_tags = [f"TEMP1_I@08QU-QU-855I-GQ{idx:02d}" for idx in range(9, 17)]
    for temp_gq_tag in temp_gq_tags:
        lpvar_name = pulp.LpVariable(temp_gq_tag).name
        previous_lp_var = previous_lp_vars[lpvar_name]
        current_lp_var = current_lp_vars[lpvar_name]
        if lpvar_name.endswith("09"):
            # new_ub = 1 - ((max_possible_lb_faixa - _lb_faixa) / 25) * 10 / 1360
            current_lp_var.upBound = 1
            obj = current_prob.objective
            obj += (1 - current_lp_var) * 1_000
            current_prob.setObjective(obj)
        if is_first_faixa:
            pulp_solver.probs[current_faixa] = current_prob
            return pulp_solver
        # current_lp_var_rescaled = denormalize_lpvar(temp_gq_tag, current_lp_var, scalers)
        # previous_lp_var_rescaled = denormalize_lpvar(temp_gq_tag, previous_lp_var, scalers).value()
        previous_lp_var.name = f"{previous_faixa}_{previous_lp_var.name}"
        current_prob += current_lp_var >= previous_lp_var + 0.000001
    pulp_solver.probs[current_faixa] = current_prob
    return pulp_solver
[docs]def can_define_inter_problem_constraint(pulp_solver, current_faixa) -> bool: """ Perform check to determine if inter-problem constraint can be added to the optimization model. This function checks whether constraints that compare further optimization problems can be added to a certain production range optimization problem. Parameters ---------- pulp_solver : PulpSolver The `PulpSolver` class instance, that contains the optimization problems for each production range. current_faixa : str The current production range. Values should be represented as strings containing two numeric values separated by `"-"`. For example, `"700-750"`, `"750-800`", `"800-850"`, etc. Returns ------- bool True if the constraint can be added to the optimization model and False otherwise. """ if current_faixa == min(pulp_solver.probs.keys()): return False lb_faixa, _ = current_faixa.split("-") _lb_faixa = int(lb_faixa) previous_faixa = f"{_lb_faixa - 50}-{_lb_faixa}" previous_prob = pulp_solver.probs[previous_faixa] if previous_prob.status != pulp.LpStatusOptimal: logger.error( "Previous production range '%s' status is not optimal: %s", previous_faixa, pulp.LpStatus[previous_prob.status], ) logger.error( "Not constraining values for 'TEMP1_I@08QU-QU-855I-GQXX' tags to " "be greater than or equal to the values from %s production range.", previous_faixa, ) return False return True
[docs]class FakeScaler(BaseEstimator, TransformerMixin): """ A fake scaler class that contains attributes used by SciKit Learn scalers. This class simulates the behavior of a feature scaler but does not implement any scaling. Parameters ---------- feature_range : tuple of (min, max), default=(0, 1) The desired range of transformed data. This parameter is not used in actual transformations but is kept for interface compatibility. Attributes ---------- n_features_in_ : int The number of features observed during `fit`. n_samples_seen_ : int The number of samples observed during `fit`. min_ : ndarray of shape (n_features_in_,) The minimum value in each feature in the fitted data. max_ : ndarray of shape (n_features_in_,) The maximum value in each feature in the fitted data. data_range_ : ndarray of shape (n_features_in_,) The data range (max - min) for each feature in the fitted data. data_min_ : ndarray of shape (n_features_in_,) The minimum value in each feature in the fitted data. data_max_ : ndarray of shape (n_features_in_,) The maximum value in each feature in the fitted data. scale_ : ndarray of shape (n_features_in_,) The scaling factors applied to each feature. Set to ones. mean_ : ndarray of shape (n_features_in_,) The mean value for each feature in the fitted data. center_ : ndarray of shape (n_features_in_,) The centering value for each feature. Methods ------- fit(X, y=None) Compute the necessary attributes from the training data. transform(X) Returns the original data as it is. inverse_transform(X) Reverses the transformation, effectively returning the original data. Examples -------- >>> import numpy as np >>> X = np.array([[1, 2], [3, 4]]) >>> scaler = FakeScaler() >>> scaler.fit(X) FakeScaler() >>> scaler.transform(X) array([[1, 2], [3, 4]]) """ def __init__(self, feature_range=(0, 1)): self.feature_range = feature_range
[docs] def fit(self, X, y=None): """ Compute the minimum, maximum, mean, and range for each feature in X. Assumes that `X` is a numpy array, `pandas.Series`, or `pandas.DataFrame`. This method calculates basic statistics for each feature but does not scale the data. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data, where n_samples is the number of samples and n_features is the number of features. y : Ignored This parameter is not used in this method. Returns ------- self : object Returns self. Raises ------ ValueError If the input array `X` does not meet the expected criteria. """ # Convert X to a numpy array if it's not already. X = np.asarray(X) # Check and handle the shape of X if X.ndim == 1: X = X.reshape(-1, 1) # Update attributes based on X self.n_samples_seen_, self.n_features_in_ = X.shape self.min_ = np.min(X, axis=0) self.max_ = np.max(X, axis=0) self.data_range_ = self.max_ - self.min_ self.data_min_ = self.min_ self.data_max_ = self.max_ self.scale_ = np.ones(self.n_features_in_) self.mean_ = np.mean(X, axis=0) self.center_ = self.mean_ return self
# noinspection PyPep8Naming
[docs] def transform(self, X): """Return the input data unchanged. Parameters ---------- X : array-like of shape (n_samples, n_features) Input data to be transformed. Returns ------- X_transformed : ndarray of shape (n_samples, n_features) The original input data unchanged. """ # Convert X to a numpy array if it's not already. X = np.asarray(X) # Check and handle the shape of X if X.ndim == 1: X = X.reshape(-1, 1) if X.shape[1] != self.n_features_in_: raise ValueError( f"X with {X.shape[1]:,} features does not match training with " f"{self.n_features_in_:,} features" ) return X
# noinspection PyPep8Naming
[docs] def inverse_transform(self, X): """ Return the input data unchanged. Parameters ---------- X : array-like of shape (n_samples, n_features) Input data to be inversely transformed. Returns ------- X_original : ndarray of shape (n_samples, n_features) The original input data unchanged. """ # Convert X to a numpy array if it's not already X = np.asarray(X) # Check and handle the shape of X if X.ndim == 1: X = X.reshape(-1, 1) if X.shape[1] != self.n_features_in_: raise ValueError( f"X with {X.shape[1]:,} features does not match training with " f"{self.n_features_in_:,} features" ) return X
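
# Quick sketch: `FakeScaler` mimics the fitted attributes of a scikit-learn
# scaler while leaving the data untouched, so it can stand in where a
# `MinMaxScaler` is expected but no scaling is wanted.
#
#     >>> X = np.array([[1.0, 10.0], [3.0, 30.0]])
#     >>> fs = FakeScaler().fit(X)
#     >>> fs.data_min_, fs.scale_
#     (array([ 1., 10.]), array([1., 1.]))
#     >>> np.array_equal(fs.inverse_transform(fs.transform(X)), X)
#     True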
[docs]def adjust_models_coefficients(models_results: dict, scalers: dict) -> dict: """Change the coefficients of Ridge using scalers. Parameters ---------- models_results : dict Results of all models scalers: dict Adjusted scalers for all tags Returns ------- models_results: dict Results containing the regression models with adjusted coefficients """ for model_name in models_results.keys(): for var_index, tag in enumerate( models_results[model_name][0]['grid'].best_estimator_.feature_names_in_ ): if tag.startswith('qtde') or tag.startswith('SOMA FUNC'): continue scaler = scalers[tag] scaler_name = scaler.__class__.__name__ if scaler_name == 'RobustScaler': models_results[model_name][0]['grid'].best_estimator_[1].coef_[ var_index ] /= scaler.scale_ models_results[model_name][0]['grid'].best_estimator_[ 1 ].intercept_ -= np.sum( models_results[model_name][0]['grid'] .best_estimator_[1] .coef_[var_index] * scaler.center_ ) elif scaler_name == 'StandardScaler': models_results[model_name][0]['grid'].best_estimator_[1].coef_[ var_index ] /= scaler.scale_ models_results[model_name][0]['grid'].best_estimator_[ 1 ].intercept_ -= np.dot( models_results[model_name][0]['grid'] .best_estimator_[1] .coef_[var_index], scaler.mean_, ) else: scaled_coef = ( models_results[model_name][0]['grid'] .best_estimator_[1] .coef_[var_index] ) scaled_intercept = ( models_results[model_name][0]['grid'].best_estimator_[1].intercept_ ) unscaled_coef = scaled_coef * scaler.scale_ unscaled_intercept = scaled_intercept - ( scaled_coef * scaler.scale_ ).dot(scaler.data_min_) models_results[model_name][0]['grid'].best_estimator_[1].coef_[ var_index ] = unscaled_coef models_results[model_name][0]['grid'].best_estimator_[ 1 ].intercept_ = unscaled_intercept models_results[model_name][0]['model'] = models_results[model_name][0][ 'grid' ].best_estimator_[1] models_results[model_name][0]['params'] = ( models_results[model_name][0]['grid'].best_estimator_[1].coef_ ) return models_results
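
# Worked note on the un-scaling in `adjust_models_coefficients` (illustrative):
# for the MinMax/else branch, the scaler applies
# `x_scaled = (x - data_min_) * scale_`, so a fitted term
# `y = coef * x_scaled + intercept` expands to
# `y = (coef * scale_) * x + intercept - (coef * scale_) . data_min_`,
# which is exactly the `unscaled_coef` / `unscaled_intercept` pair computed
# in the `else` branch. The RobustScaler and StandardScaler branches apply
# the analogous substitution with `center_` / `mean_` and a division by
# their respective `scale_` factors.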
[docs]def inverse_transform_models_features( models_features: Dict[str, pd.Series], scalers: dict, ) -> Dict[str, pd.Series]: """ Apply the inverse transformation of the `models_features` dictionary values. If a scaler is not found for a particular series, it logs an error and uses the pre-existing values for that series. Parameters ---------- models_features : Dict[str, pd.Series] A dictionary where keys are tag names and values are `pandas.Series`. scalers : Dict[str, SkLearnScalers] A dictionary where keys are tag names and values are instances of Scikit-learn scalers. Returns ------- Dict[str, pd.Series] A dictionary of tag names and `pandas.Series` values representing these tags original values. """ results = {} for tag_name, values in models_features.items(): scaler = scalers.get(tag_name) if not scaler: logger.error("Scaler not found for tag: '%s'", tag_name) logger.error("Using pre-existing value for the tag instead") results[tag_name] = values continue results[tag_name] = pd.Series( scalers[tag_name].inverse_transform((values,))[0], index=values.index, name=tag_name, ) return results
def replace_ventiladores_tags(
    datasets: Dict[str, pd.DataFrame],
    df_sql: pd.DataFrame,
    models: List[str] | None = None,
    old_ventiladores_tags: List[str] | None = None,
    new_ventiladores_tags: List[str] | None = None,
):
    """
    Replace old ventiladores tags with new ones in specified dataframes.

    This function updates a collection of pandas DataFrames by removing
    specified old ventiladores tags if they are not target columns, and
    merging the DataFrames with a new set of ventiladores tags from another
    DataFrame. It ensures that the structure of the DataFrames remains
    consistent, especially with regard to the target column.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A dictionary of DataFrames keyed by model name.
    df_sql : pd.DataFrame
        A `pandas.DataFrame` containing new ventiladores tags to be merged.
    models : List[str] | None, optional
        The list of model names corresponding to keys in `datasets` to be
        updated. Defaults to a predefined list if None.
    old_ventiladores_tags : List[str] | None, optional
        The list of old ventiladores tags to be removed.
        Defaults to a predefined list if None.
    new_ventiladores_tags : List[str] | None, optional
        The list of new ventiladores tags to be merged.
        Defaults to a predefined list if None.

    Returns
    -------
    Dict[str, pd.DataFrame]
        Updated dictionary of DataFrames with old tags replaced by new tags.

    Raises
    ------
    ValueError
        If an old ventiladores tag is also a target column in any DataFrame,
        or if the target column changes after the operation.

    Notes
    -----
    It is crucial that the target column remains unchanged after this
    operation. The last column from each DataFrame is assumed to be the
    model's target. Therefore, after running this function, the last column
    of each DataFrame should still contain the target values.
    """
    # List of models to drop old ventilators tags and add the new control
    # variables
    models = models or [
        "abrasao",
        "basicidade",
        "compressao",
        "energia_forno",
        "energia_prensa",
        "gas",
        "relacao gran",
        "temp_forno",
        "eq_termica",
    ]
    # List of tags to add to models
    new_ventiladores_tags = new_ventiladores_tags or [
        "TEMP1_C@08QU-PF-852I-02M1",
        "PRES1_I@08QU-WB-851I-19",
        "PRES2_C@08QU-PF-852I-03M1",
        "TEMP1_I@08PN-TR-860I-02",
        "PRES1_C@08QU-PF-852I-04M1",
        "PRES1_I@08QU-DU-853I-12",
        "TEMP1_C@08QU-PF-852I-05M1",
        "PEVT_PF852I05_M1_PIT_OCS_CDV_09a12",
        "PRES1_C@08QU-PF-852I-07M1",
        "PRES7_I@08QU-HO-851I-01",
        "PRES1_I@08QU-WB-851I-21",
        "TEMP1_I@08QU-HO-851I-01",
    ]
    # List of tags to remove from models
    old_ventiladores_tags = old_ventiladores_tags or [
        # "CONS1_Y@08QU-PF-852I-01M1",
        # "CONS1_Y@08QU-PF-852I-02M1",
        # "CONS1_Y@08QU-PF-852I-03M1",
        # "CONS1_Y@08QU-PF-852I-04M1",
        # "CONS1_Y@08QU-PF-852I-05M1",
        # "CONS1_Y@08QU-PF-852I-06M1",
        # "CONS1_Y@08QU-PF-852I-07M1",
        # "CONS1_Y@08QU-PF-852I-08M1",
        "POTE1_I@08QU-PF-852I-01M1",
        "POTE1_I@08QU-PF-852I-02M1",
        "POTE1_I@08QU-PF-852I-03M1",
        "POTE1_I@08QU-PF-852I-04M1",
        "POTE1_I@08QU-PF-852I-05M1",
        "POTE1_I@08QU-PF-852I-06M1",
        "POTE1_I@08QU-PF-852I-07M1",
        "POTE1_I@08QU-PF-852I-08M1",
        "ROTA1_I@08QU-PF-852I-01M1",
        "ROTA1_I@08QU-PF-852I-02M1",
        "ROTA1_I@08QU-PF-852I-03M1",
        "ROTA1_I@08QU-PF-852I-04M1",
        "ROTA1_I@08QU-PF-852I-05M1",
        "ROTA1_I@08QU-PF-852I-06M1",
        "ROTA1_I@08QU-PF-852I-07M1",
        "ROTA1_I@08QU-PF-852I-08M1",
    ]
    for model_name in models:
        df = datasets[model_name]
        target = df.columns[-1]
        target_values = df[target]
        if target in old_ventiladores_tags:
            raise ValueError(
                f"{model_name}: tag {target} is target, therefore cannot be dropped."
            )
        df = df.drop(
            columns=[*old_ventiladores_tags, *new_ventiladores_tags, target],
            errors="ignore",
        )
        new_df = df.merge(
            df_sql[new_ventiladores_tags],
            left_index=True,
            right_index=True,
            how="inner",
        )
        new_df = new_df.merge(
            target_values, left_index=True, right_index=True, how="inner"
        )
        if new_df.columns[-1] != target:
            raise ValueError("Target column changed")
        datasets[model_name] = new_df
    return datasets
[docs]def filter_datasets_df_sql_by_date( datasets: Dict[str, pd.DataFrame], df_sql: pd.DataFrame, n_days: int = 60, ) -> Tuple[Dict[str, pd.DataFrame], pd.DataFrame]: """ Filter the `datasets` and `df_sql` DataFrame by date. This function filters each DataFrame in the provided datasets dictionary and the df_sql DataFrame based on a lower-bound date. The lower-bound date is calculated as the current date minus a specified number of days. Parameters ---------- datasets : Dict[str, pd.DataFrame] A dictionary containing string keys and `pandas.DataFrame` as values. Each DataFrame should contain a DateTime index. df_sql : pd.DataFrame A DataFrame with a DateTime index to be filtered. n_days : int, default=60 The number of days to subtract from the current date to get the lower-bound date. Returns ------- Tuple[Dict[str, pd.DataFrame], pd.DataFrame] A tuple containing the filtered `datasets` dictionary and the filtered `df_sql` DataFrame. """ lb_date = pd.Timestamp.today() - relativedelta(days=n_days) # '2023-10-12' new_datasets = {} for name, df in datasets.items(): _df = df.loc[lb_date:, :] nrows, ncols = _df.shape print(f"New {name!r} contains: {nrows:,} rows and {ncols:,} columns") new_datasets[name] = _df _df_sql = df_sql.loc[lb_date:, :] return new_datasets, _df_sql
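
# Usage sketch (hypothetical frames): both the per-model datasets and the raw
# SQL frame are truncated to the same trailing `n_days` window; a progress
# line is printed for every dataset that is filtered.
#
#     >>> idx = pd.date_range(end=pd.Timestamp.today(), periods=100)
#     >>> dsets = {"gas": pd.DataFrame({"x": range(100)}, index=idx)}
#     >>> sql = pd.DataFrame({"y": range(100)}, index=idx)
#     >>> new_dsets, new_sql = filter_datasets_df_sql_by_date(dsets, sql, n_days=30)
#     New 'gas' contains: 30 rows and 1 columns
#     >>> new_sql.shape
#     (30, 1)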
[docs]def fix_vent_control_tags_bounds( prob: pulp.LpProblem, datasets, faixa, lb_quantile=0.1, ub_quantile=0.9 ): vent_control_tags = [ "TEMP1_C@08QU-PF-852I-02M1", "PRES1_I@08QU-WB-851I-19", "PRES2_C@08QU-PF-852I-03M1", "TEMP1_I@08PN-TR-860I-02", "PRES1_C@08QU-PF-852I-04M1", "PRES1_I@08QU-DU-853I-12", "TEMP1_C@08QU-PF-852I-05M1", "PEVT_PF852I05_M1_PIT_OCS_CDV_09a12", "PRES1_C@08QU-PF-852I-07M1", "PRES7_I@08QU-HO-851I-01", "PRES1_I@08QU-WB-851I-21", "TEMP1_I@08QU-HO-851I-01", ] lpvars = prob.variablesDict() # Loop through each fan control tag for tag in vent_control_tags: # Retrieve the topmost values for the tag that belong to the production # range currently being optimized. series_obj = get_most_recent_values(datasets, tag, faixa) # Get lower- and upper-bounds values from percentiles lb, ub = series_obj.quantile(lb_quantile), series_obj.quantile(ub_quantile) # Name of the tag, converted to PuLP format lp_tag = pulp.LpVariable(tag).name # Get the tag from LP problem or create a new tag if it doesn't exist lpvar = lpvars.get(lp_tag, pulp.LpVariable(tag)) # Set control fan variable bounds based on the percentiles obtained # at the beginning of the iteration. lpvar.lowBound, lpvar.upBound = lb, ub prob.variablesDict()[lp_tag] = lpvar
# def bounds_mapping_groupby(grp: pd.DataFrame): # return tuple((grp[col].quantile(0.10), grp[col].quantile(0.90)) for col in grp.columns)
[docs]def bounds_mapping_groupby(grp: pd.DataFrame): return tuple( [ *[ (round(grp[col].quantile(0.02), 2), round(grp[col].quantile(0.98), 2)) for col in grp.columns.difference(["cfix_group_lb", "cfix_group_ub"]) ], (round(grp["cfix_group_lb"].max(), 2), round(grp["cfix_group_ub"].max(), 2)), ] )
[docs]def bounds_linking( problem: pulp.LpProblem, variables: List[str], bounds_mapping: List[tuple], ) -> pulp.LpProblem: # Dynamically create variables based on provided definitions var_dict = {} for var_name in variables: var_dict[var_name] = problem.variablesDict()[constants.TARGETS_IN_MODEL.get(var_name, pulp.LpVariable(var_name).name)] # Define constraints using "big-M" method with binary control M = 100 # A large constant for the big-M method binary_variables = [] for idx, bounds in enumerate(bounds_mapping): binary_name = f"binary_{idx}" # Create a binary variable for each configuration bin_var = pulp.LpVariable(binary_name, cat='Binary') binary_variables.append(bin_var) for var_name, bound in zip(variables, bounds): # Apply big-M method constraints to link the binary variable with # the range for each variable problem += ( var_dict[var_name] >= bound[0] - M * (1 - bin_var), f"{binary_name}_Link_Lower_{var_name}", ) problem += ( var_dict[var_name] <= bound[1] + M * (1 - bin_var), f"{binary_name}_Link_Upper_{var_name}", ) # Ensure only one binary variable can be active (1) at any time problem += pulp.lpSum(binary_variables) == 1, "Single_Active_Binary" return problem
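
# Worked sketch of the big-M linking used in `bounds_linking` (hypothetical
# numbers): with M = 100, a binary `b`, and a bound pair (lb, ub) = (2, 5),
# the constraints
#
#     x >= 2 - 100 * (1 - b)    and    x <= 5 + 100 * (1 - b)
#
# collapse to 2 <= x <= 5 when b == 1 and relax to the trivial
# -98 <= x <= 105 when b == 0. Since the "Single_Active_Binary" constraint
# forces exactly one binary to 1, exactly one bound configuration from
# `bounds_mapping` is active in any feasible solution.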