"""
This module defines objects that are meant to be temporary.
"""
from __future__ import annotations
import math
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
import joblib
import numpy as np
import openpyxl
import pandas as pd
import pulp
from dateutil.relativedelta import relativedelta
from openpyxl import Workbook
from openpyxl.chart import LineChart
from openpyxl.chart import Reference
from openpyxl.styles import Alignment
from openpyxl.styles import Font
from openpyxl.styles import PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.preprocessing import MinMaxScaler
from wip.constants import FINAL_DATASETS_FILEPATH
from wip.constants import RESULTADO_OTIMIZADOR_FILENAME
from wip.constants import constants
from wip.datatools.ml_filters import (filter_tag, get_column_values_from_datasets,
get_most_recent_values, multi_merge)
from wip.datatools.ml_filters import find_models_by_tag
from wip.files.lp_denorm_constraints import denormalize_lpvar
from wip.logging_config import logger
from wip.utils import find_filepath
TEMPORARY_MODELS_TO_REMOVE = [
"produtividade filtragem",
"torque",
"dens_moinho_1",
"dens_moinho_2",
"dens_moinho_3",
# "relacao gran",
]
TEMPORARY_MODEL_COEFFICIENTS_TO_REMOVE = [
*TEMPORARY_MODELS_TO_REMOVE,
("SE PP", "Calculo da Energia da Filtragem"),
]
TEMPORARY_SCALERS_TO_REMOVE = ['Calculo da Energia da Filtragem']
def date_select(series_obj: pd.Series, n_days: int = 30) -> pd.Series:
"""Select the last `n_days` days from a `pandas.Series`.
Parameters
----------
series_obj : pd.Series
The `pandas.Series` object to select the last `n_days` days.
This series must contain datetime values as index.
n_days : int, default=30
        The number of days to select, counted back from the last existing date.
Returns
-------
pd.Series
The `pandas.Series`, with only the last `n_days` days.
"""
last_date = series_obj.index.max()
first_date = last_date - relativedelta(days=n_days)
return series_obj.loc[first_date:]
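# Illustrative behaviour of `date_select` (a hedged sketch; the dates are invented):
# for a daily series whose last index value is 2020-01-31, `date_select(series, 7)`
# keeps the rows from 2020-01-24 through 2020-01-31, because the lower bound computed
# with `relativedelta` is itself included by the `.loc` slice.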
def compute_statistics(series_obj: pd.Series, target_names: List[str]) -> dict:
"""
Compute statistical measures for a given pandas Series.
This function calculates various statistical measures for the provided pandas
Series. It checks if the series index is of a datetime type and computes averages
for different periods, standard deviation, mode, median, minimum, maximum, and
percentiles. It also determines if the series name is in the target names list.
Parameters
----------
series_obj : pd.Series
The `pandas.Series` object for which statistics are to be calculated.
The index must be of datetime type.
target_names : List[str]
List of target names to check if the series name is a target.
Returns
-------
dict
A dictionary containing computed statistical values. Keys are statistical
measures and values are their corresponding computed values.
If certain conditions are not met, some values may be None.
Raises
------
ValueError
If the index of `series_obj` is not of a datetime type.
Examples
--------
>>> series = pd.Series([1, 2, 3], index=pd.date_range('20200101', periods=3))
>>> compute_statistics(series, ['target'])
    {'Average last 7 days': 2.0, 'Average last 14 days': 2.0, ...}
"""
# Ensure the index is datetime
if not isinstance(series_obj.index, pd.DatetimeIndex):
raise ValueError("Series index must be of datetime type")
is_boolean = set(series_obj.unique()).issubset({0, 1})
is_empty = series_obj.empty
is_target = series_obj.name in target_names
stats = {
"Average last 7 days": (
round(date_select(series_obj, 7).mean(), 2)
if not is_boolean and not is_empty
else None
),
"Average last 14 days": (
round(date_select(series_obj, 14).mean(), 2)
if not is_boolean and not is_empty
else None
),
"Average last 30 days": (
round(date_select(series_obj, 30).mean(), 2)
if not is_boolean and not is_empty
else None
),
"Average last 60 days": (
round(date_select(series_obj, 60).mean(), 2)
if not is_boolean and not is_empty
else None
),
"Average last 120 days": (
round(date_select(series_obj, 120).mean(), 2)
if not is_boolean and not is_empty
else None
),
"Average all periods": (
round(series_obj.mean(), 2) if not is_boolean and not is_empty else None
),
"Standard Deviation all periods": (
round(series_obj.std(), 2) if not is_boolean and not is_empty else None
),
"Mode all periods": (
round(series_obj.mode().iloc[0], 2) if not is_empty else None
),
"Median all periods": (
round(series_obj.median(), 2) if not is_boolean and not is_empty else None
),
"Minimum all periods": round(series_obj.min(), 2) if not is_empty else None,
"Maximum all periods": round(series_obj.max(), 2) if not is_empty else None,
"2% Percentile": (
round(np.percentile(series_obj, 2), 2)
if not is_boolean and not is_empty
else None
),
"5% Percentile": (
round(np.percentile(series_obj, 5), 2)
if not is_boolean and not is_empty
else None
),
"10% Percentile": (
round(np.percentile(series_obj, 10), 2)
if not is_boolean and not is_empty
else None
),
"25% Percentile": (
round(np.percentile(series_obj, 25), 2)
if not is_boolean and not is_empty
else None
),
"50% Percentile": (
round(np.percentile(series_obj, 50), 2)
if not is_boolean and not is_empty
else None
),
"75% Percentile": (
round(np.percentile(series_obj, 75), 2)
if not is_boolean and not is_empty
else None
),
"95% Percentile": (
round(np.percentile(series_obj, 95), 2)
if not is_boolean and not is_empty
else None
),
"98% Percentile": (
            round(np.percentile(series_obj, 98), 2)
if not is_boolean and not is_empty
else None
),
"Is target": is_target,
}
return stats
def compute_statistics_datasets(datasets: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""
Compute statistics for each column across multiple pandas DataFrames.
Aggregates columns from multiple DataFrames provided in a dictionary and computes
statistical measures for each unique column. The function iterates over all
DataFrames, concatenates column values across them, and calculates statistics
using `compute_statistics`. It handles columns with the same name across different
DataFrames and computes overall statistics.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary where keys are dataset names (or identifiers) and values are
pandas DataFrames. The function expects the last column of each DataFrame to
be the target column.
Returns
-------
pd.DataFrame
A DataFrame where each row corresponds to a unique column from the input
DataFrames and contains the computed statistics for that column.
See Also
--------
compute_statistics : Used to compute statistics for individual columns.
Examples
--------
>>> _datasets = {
... "dataset1": pd.DataFrame(...),
... "dataset2": pd.DataFrame(...)
... }
>>> compute_statistics_datasets(_datasets)
# DataFrame with computed statistics for each column across the datasets.
Notes
-----
This function is specifically designed to work with DataFrames where the last
column is considered as the target. The statistics are computed for each column,
considering their presence across multiple DataFrames.
"""
all_columns = {column for df in datasets.values() for column in df.columns}
target_names = [df.columns[-1] for df in datasets.values()]
df_stats = pd.DataFrame()
for column in all_columns:
column_values = (
pd.concat(
[
datasets[model_name][column]
for model_name in find_models_by_tag(column, datasets)
]
)
.dropna()
.drop_duplicates()
.sort_index()
)
column_stats = pd.DataFrame(
compute_statistics(column_values, target_names), index=[column_values.name]
)
df_stats = pd.concat([df_stats, column_stats])
return df_stats
def drop_models_results(
models_results: Dict[str, list | dict],
models_to_drop: List[str] | None = None,
) -> Dict[str, list | dict]:
"""
Remove specified models from the `models_results` dictionary.
The function filters out specified models from the dictionary containing
model results. If no models are specified, the function defaults to
removing models from the global variable `TEMPORARY_MODELS_TO_REMOVE`.
Parameters
----------
models_results : Dict[str, list | dict]
Dictionary where keys are model names (str) and values are lists of
model results.
models_to_drop : List[str] | None, optional
List of model names to be dropped from `models_results`.
If not provided, the function defaults to using `TEMPORARY_MODELS_TO_REMOVE`.
Returns
-------
Dict[str, list | dict]
Filtered dictionary of model results with specified models removed.
Examples
--------
>>> _models_results = {"model1": [1, 2, 3], "model2": [4, 5, 6],
... "model3": [7, 8, 9]}
>>> drop_models_results(_models_results, ["model1", "model3"])
{"model2": [4, 5, 6]}
"""
models_to_drop = models_to_drop or TEMPORARY_MODELS_TO_REMOVE
return {
model_name: results
for model_name, results in models_results.items()
if model_name not in models_to_drop
}
def drop_model_coefficients(
model_coefficients: Dict[str, Dict[str, float | int]],
coefficients_to_drop: List[str | Tuple[str, str]] | None = None,
) -> Dict[str, Dict[str, float | int]]:
"""
Remove specified coefficients from the given model coefficients.
This function provides a way to delete specific coefficients from the
model coefficients. If no specific coefficients are provided to remove,
it will fall back to a set of temporary coefficients stored in
`TEMPORARY_MODEL_COEFFICIENTS_TO_REMOVE`.
Parameters
----------
model_coefficients : Dict[str, Dict[str, float | int]]
The model coefficients to change. Keys are model names and values
are dictionaries where keys are coefficient names and values are
their respective values.
coefficients_to_drop : List[str | Tuple[str, str]] | None, optional
The coefficients to drop. Can either be a list of model names as
strings or a list of tuples where the first element is the model
name and the second is the coefficient name. If not provided,
coefficients from `TEMPORARY_MODEL_COEFFICIENTS_TO_REMOVE` are
removed.
Returns
-------
Dict[str, Dict[str, float | int]]
The modified model coefficients dictionary after dropping the
specified coefficients.
Examples
--------
Assume we have the following model coefficients:
>>> _model_coefficients = {'model1': {'coeff1': 1.2, 'coeff2': 0.5},
... 'model2': {'coeff1': 0.8, 'coeff2': 1.5}}
>>> _coefficients_to_drop = [('model1', 'coeff1'), ('model2', 'coeff1')]
>>> drop_model_coefficients(_model_coefficients, _coefficients_to_drop)
{'model1': {'coeff2': 0.5}, 'model2': {'coeff2': 1.5}}
"""
coefficients_to_drop = (
coefficients_to_drop or TEMPORARY_MODEL_COEFFICIENTS_TO_REMOVE
)
_coefficients_to_drop = [
coefficient
for coefficient in coefficients_to_drop
if isinstance(coefficient, str)
]
model_coefficients = drop_models_results(model_coefficients, _coefficients_to_drop)
_coefficients_to_drop = set(coefficients_to_drop) - set(_coefficients_to_drop)
for model_name, coefficient in _coefficients_to_drop:
model_coefficients[model_name].pop(coefficient, None)
return model_coefficients
def drop_scalers(
scalers: Dict[str, MinMaxScaler],
scalers_to_drop: List[str] | None = None,
) -> Dict[str, MinMaxScaler]:
"""
Exclude scalers from the provided dictionary of scalers.
This function allows for dropping certain scalers based on their names.
The names of the scalers to be dropped are provided in `scalers_to_drop`.
    If `scalers_to_drop` isn't specified, the pre-defined list `TEMPORARY_SCALERS_TO_REMOVE`
is used.
Parameters
----------
scalers : Dict[str, MinMaxScaler]
A dictionary of scaler objects (value) identified by their names (key).
scalers_to_drop : List[str] | None, optional
List of scaler names to drop from the `scalers` dictionary.
If not provided, defaults to a pre-defined list
named `TEMPORARY_SCALERS_TO_REMOVE`.
Returns
-------
Dict[str, MinMaxScaler]
The modified dictionary of scalers, with specified scalers removed.
Examples
--------
    Assuming we have the following scalers dictionary and `TEMPORARY_SCALERS_TO_REMOVE` list:
    scalers = {"scaler1": MinMaxScaler1, "scaler2": MinMaxScaler2, "scaler3": MinMaxScaler3}
    TEMPORARY_SCALERS_TO_REMOVE = ["scaler1", "scaler3"]
Calling the function as:
>>> new_scalers = drop_scalers(scalers)
>>> new_scalers
{"scaler2": MinMaxScaler2}
If we specify the `scalers_to_drop` argument:
>>> new_scalers = drop_scalers(scalers, ["scaler2"])
>>> new_scalers
{"scaler1": MinMaxScaler1, "scaler3": MinMaxScaler3}
"""
scalers_to_drop = scalers_to_drop or TEMPORARY_SCALERS_TO_REMOVE
return {
scaler_name: scaler_value
for scaler_name, scaler_value in scalers.items()
if scaler_name not in scalers_to_drop
}
def filter_corpo_moedor_especifico(
datasets: Dict[str, pd.DataFrame]
) -> Dict[str, pd.DataFrame]:
"""Filter DataFrames in the datasets' dictionary on a specific condition.
This function filters each DataFrame in the provided datasets dictionary
based on the "corpo_moedor_especifico" tag with an upper bound of 10.
It uses the `filter_tag` function to perform the filtering.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary containing string keys and `pandas.DataFrame` as values.
Each DataFrame should contain a column labeled "corpo_moedor_especifico".
Returns
-------
Dict[str, pd.DataFrame]
A dictionary containing the filtered DataFrames, retaining the
same keys as the input dictionary.
See Also
--------
filter_tag : Function used for performing the filtering based on the
provided tag and boundary.
Examples
--------
Given a dictionary of DataFrames `datasets` where each DataFrame
contains a column "corpo_moedor_especifico":
>>> import pandas as pd
>>> datasets = {"df1": pd.DataFrame({"corpo_moedor_especifico": [5, 15, 7]})}
>>> filtered_datasets = filter_corpo_moedor_especifico(datasets)
>>> print(filtered_datasets["df1"])
corpo_moedor_especifico
0 5
2 7
"""
return filter_tag(datasets, "corpo_moedor_especifico", up_bound=10)
def process_labels(labels: pd.DataFrame) -> pd.DataFrame:
"""Process the labels to add to the optimization problem.
Parameters
----------
labels : pd.DataFrame
A `pandas.DataFrame` containing the labels for the optimization problem.
Returns
-------
pd.DataFrame
A `pandas.DataFrame` containing the processed labels for the optimization problem.
"""
labels = labels.drop_duplicates(subset=["Tag"])
labels = labels.merge(
pd.DataFrame(
[
[old_name, new_name]
for old_name, new_name in constants.TARGETS_IN_MODEL.items()
],
columns=["Tag", "New"],
),
on="Tag",
how="left",
)
labels["Tag"] = np.where(labels["New"].notnull(), labels["New"], labels["Tag"])
return labels.drop(columns=["New"])
def pivot_optimization_results(res):
"""Pivot the results of the optimization problem.
Parameters
----------
res : pd.DataFrame
A `pandas.DataFrame` containing the results of the optimization problem.
Returns
-------
pd.DataFrame
A `pandas.DataFrame` containing the pivoted results of the
optimization problem.
"""
pivot_data = res.pivot_table(
index=["Tag"], columns=["Faixa"], values=["Valor real"]
)
pivot_data.columns = pivot_data.columns.to_series().reset_index()["Faixa"].values
pivot_data = pivot_data.reset_index()
pivot_data["Tag"] = pivot_data["Tag"].str.replace("'=", "=")
return pivot_data
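# Shape sketch for `pivot_optimization_results` (hedged; column names come from the code
# above, the values are invented). A long-format `res` such as
#     Tag      Faixa     Valor real
#     TAG_A    700-750   1.0
#     TAG_A    750-800   1.2
# becomes one row per tag with one column per production range:
#     Tag      700-750   750-800
#     TAG_A    1.0       1.2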
def create_results_ranges(res, labels, datasets):
"""Create a DataFrame with the pivoted results of the optimization problem.
Parameters
----------
res : pd.DataFrame
A `pandas.DataFrame` containing the results of the optimization problem.
labels : pd.DataFrame
A `pandas.DataFrame` containing the labels for the optimization problem.
datasets : Dict[str, pd.DataFrame] | None
A dictionary with `pandas.DataFrame` objects for each model used
to create the optimization model.
This dictionary is used to add additional statistics for each tag.
If None, no statistics are added to the results.
Returns
-------
pd.DataFrame
A `pandas.DataFrame` containing the pivoted results of the optimization problem.
"""
labels = process_labels(labels)
piv = pivot_optimization_results(res)
dataset = (
piv.merge(labels, on="Tag", how="left")[
[*piv.columns, *labels.columns.difference(piv.columns)]
]
.fillna("")
.sort_values(by=["Tag"])
.pipe(add_constant_tags_summary)
)
if datasets is not None:
stats_df = compute_statistics_datasets(datasets)
stats_df.index = stats_df.index.to_series().apply(
lambda value: constants.TARGETS_IN_MODEL.get(value, value)
)
dataset = dataset.merge(stats_df, left_on="Tag", right_index=True, how="left")
dataset["Tag"] = (
dataset["Tag"]
.str.replace("=PQ*24/768/FUNC", "'=PQ*24/768/FUNC", regex=False)
.str.replace("=192/VELO", "'=192/VELO", regex=False)
)
dataset = dataset.drop_duplicates(subset=["Tag"])
# The `assign` + `sort_values` + `drop` is a workaround to sort
# the "Tag" column values in a case-insensitive manner.
return (
dataset.assign(Tag_UPPER=lambda xdf: xdf["Tag"].str.upper())
.sort_values("Tag_UPPER")
.drop(columns=["Tag_UPPER"])
)
def round_value(value: Any, decimals: int = 2) -> Any:
"""Round numeric values with more than to 3 decimal places.
Parameters
----------
value : Any
Value to be rounded, if it is a float.
decimals : int, default=2
Number of decimal places to round to.
Returns
-------
Any
Rounded value, if it is a float. Otherwise, the original value.
"""
# Check if the value is a float
if isinstance(value, float):
# Split the number into its whole and fractional parts
_, fraction = divmod(value, 1)
# If there are more than 3 decimal places
if len(str(fraction)) > 5: # 0.xxxx
return round(value, decimals)
return value
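# Illustrative calls (hedged; plain Python semantics, no project data assumed):
#     >>> round_value(3.141592)   # more than three decimal places -> rounded
#     3.14
#     >>> round_value(2.5)        # three or fewer decimal places -> unchanged
#     2.5
#     >>> round_value("text")     # non-floats are returned untouched
#     'text'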
def create_perfil_temperature_sheet(
ranges_dataframe: pd.DataFrame, wb, sheet_name: str = 'Perfil Grupos de Queima'
):
"""
Adds a sheet to a given workbook and populates it with filtered temperature data and a line chart.
This function filters a DataFrame for specific tags, copies certain columns into a new sheet in the
given Excel workbook, and creates a line chart visualizing temperature ranges. It ensures the sheet
exists (creating it if necessary), adds filtered data, and then constructs and inserts a line chart
based on the data.
Parameters
----------
ranges_dataframe : pd.DataFrame
The DataFrame containing temperature range data to be filtered and copied.
wb : openpyxl.workbook.workbook.Workbook
The workbook where the sheet will be added and populated with data and a chart.
sheet_name : str, optional
The name of the sheet to be added or used for inserting data. Default is 'Perfil Grupos de Queima'.
Raises
------
ValueError
If `ranges_dataframe` does not contain the necessary columns for filtering or chart generation.
See Also
--------
openpyxl.workbook.workbook.Workbook : The Workbook class from openpyxl used to manipulate Excel files.
pandas.DataFrame : The DataFrame class from pandas used for data manipulation and analysis.
Notes
-----
It's important that the `ranges_dataframe` contains columns for 'Tag' and temperature ranges as these
are crucial for the filtering and chart generation processes. The function dynamically adjusts the
y-axis scale of the chart based on the minimum temperature in the data.
References
----------
OpenPyXL documentation : https://openpyxl.readthedocs.io/
Pandas documentation : https://pandas.pydata.org/pandas-docs/stable/
Examples
--------
>>> import pandas as pd
>>> from openpyxl import Workbook
>>> df = pd.DataFrame({
... 'Tag': ['TEMP1_I@08QU-QU-855I-GQ04', 'other_tag'],
... '700-750': [1, 2],
... '750-800': [3, 4],
... '800-850': [5, 6],
... '850-900': [7, 8],
... '900-950': [9, 10],
... '950-1000': [11, 12]
... })
>>> wb = Workbook()
>>> create_perfil_temperature_sheet(df, wb)
>>> 'Perfil Grupos de Queima' in wb.sheetnames
True
"""
if sheet_name not in wb.sheetnames:
wb.create_sheet(sheet_name)
columns_to_copy = ['Tag', '700-750', '750-800', '800-850',
'850-900', '900-950', '950-1000']
tags_to_filter = [
'TEMP1_I@08QU-QU-855I-GQ04',
'TEMP1_I@08QU-QU-855I-GQ05',
'TEMP1_I@08QU-QU-855I-GQ06',
'TEMP1_I@08QU-QU-855I-GQ07',
'TEMP1_I@08QU-QU-855I-GQ08',
'TEMP1_I@08QU-QU-855I-GQ09',
'TEMP1_I@08QU-QU-855I-GQ10',
'TEMP1_I@08QU-QU-855I-GQ11',
'TEMP1_I@08QU-QU-855I-GQ12',
'TEMP1_I@08QU-QU-855I-GQ13',
'TEMP1_I@08QU-QU-855I-GQ14',
'TEMP1_I@08QU-QU-855I-GQ15',
'TEMP1_I@08QU-QU-855I-GQ16',
]
filtered_df = ranges_dataframe[ranges_dataframe['Tag'].isin(tags_to_filter)]
filtered_data_to_write = filtered_df[columns_to_copy]
data_to_write = [columns_to_copy] + filtered_data_to_write.values.tolist()
ws = wb[sheet_name]
for row in data_to_write:
ws.append(row)
max_row = ws.max_row
# Create a line chart with markers
chart = LineChart()
chart.title = "Perfil Grupos de Queima"
chart.x_axis.title = 'Faixas de Produção'
chart.y_axis.title = 'Temperatura Grupos de Queima'
    # Use only the numeric temperature-range columns when computing the chart's y-axis minimum.
    chart.y_axis.scaling.min = (
        math.floor(filtered_data_to_write[columns_to_copy[1:]].min(axis=1).min() / 100) * 100
    )
# Adding data to the chart
# Data for the chart (excluding the header)
data = Reference(ws, min_col=2, min_row=1, max_col=7, max_row=max_row)
chart.add_data(data, titles_from_data=True)
# Categories (Temperature Range)
cats = Reference(ws, min_col=1, min_row=2, max_row=max_row)
chart.set_categories(cats)
# Optionally, adjust the chart size or position
chart.width = 20
chart.height = 15
# Add the chart to the sheet
ws.add_chart(chart, "I10") # Adjust cell as needed to position the chart
return wb
def dataframe_to_worksheet(dataframe, worksheet, index=True, header=True):
"""Convert a `pandas.DataFrame` to an OpenPyXL worksheet.
Parameters
----------
dataframe : pd.DataFrame
A `pandas.DataFrame` containing the data to be converted to an OpenPyXL worksheet.
worksheet : openpyxl.worksheet.worksheet.Worksheet
The Worksheet instance, where the data is being stored to.
index : bool, default=True
Whether to display the index in the formatted Excel file.
header : bool, default=True
Whether to display the header in the formatted Excel file.
"""
for r_idx, row in enumerate(
dataframe_to_rows(dataframe, index=index, header=header), 1
):
for c_idx, value in enumerate(row, 1):
cell = worksheet.cell(row=r_idx, column=c_idx, value=round_value(value))
cell.alignment = Alignment(horizontal='center')
# Set header format
if r_idx == 1:
# "d5d5ec" - Light purple
cell.fill = PatternFill(
start_color="d5d5ec", end_color="d5d5ec", fill_type="solid"
)
# "3d3d3d" - Dark gray
cell.font = Font(color="3d3d3d", bold=True)
def adjust_column_widths(worksheet: openpyxl.worksheet.worksheet.Worksheet):
"""
Adjust the widths of columns in a worksheet.
    Column widths are adjusted to fit the longest value found in each column.
Parameters
----------
worksheet : openpyxl.worksheet.worksheet.Worksheet
The Worksheet instance, where the data is being stored to.
"""
for column in worksheet.columns:
max_length = 0
column_letter = None
for cell in column:
try:
column_letter = cell.column_letter
                if len(str(cell.value)) > max_length:
                    max_length = len(str(cell.value))
except (AttributeError, ValueError, TypeError):
pass
adjusted_width = max_length + 10 # add a little extra space
if column_letter is not None:
worksheet.column_dimensions[column_letter].width = adjusted_width
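# A hedged usage sketch combining the two helpers above (`my_dataframe` and the file name
# are placeholders, not part of the real pipeline):
#     wb = Workbook()
#     ws = wb.active
#     dataframe_to_worksheet(my_dataframe, ws, index=False)
#     adjust_column_widths(ws)
#     wb.save("example.xlsx")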
def apply_style_cells(worksheet) -> Tuple[int, int]:
"""Apply "bad" style to cells that have all values equal to zero.
Parameters
----------
worksheet : openpyxl.worksheet.worksheet.Worksheet
The Worksheet instance, where the data is being stored to.
Returns
-------
Tuple[int, int]
A tuple containing the number of results that have all values equal to
zero and the number of results that have at least one value equal to zero.
"""
bad_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
bad_font = Font(color="9C0006", bold=False)
neutral_fill = PatternFill(
start_color="FFEB9C", end_color="FFEB9C", fill_type="solid"
)
neutral_font = Font(color="9C5700", bold=False)
bad_results = 0
warning_results = 0
for row in worksheet.iter_rows(
min_row=2, max_row=worksheet.max_row, min_col=1, max_col=7
):
if row[0].value.startswith("FUNC") or row[0].value in ["floticor", "cfix"]:
continue
_row = [cell for cell in row[1:7] if isinstance(cell.value, (int, float))]
if all(cell.value == 0 for cell in _row):
bad_results += 1
for cell in row:
cell.fill = bad_fill
cell.font = bad_font
elif any(cell.value == 0 for cell in _row):
row[0].fill = neutral_fill
row[0].font = neutral_font
warning_results += 1
for cell in _row:
if cell.value == 0:
cell.fill = neutral_fill
cell.font = neutral_font
elif all(cell.value == row[1].value for cell in _row):
row[0].fill = neutral_fill
row[0].font = neutral_font
warning_results += 1
for cell in _row:
cell.fill = neutral_fill
cell.font = neutral_font
return bad_results, warning_results
def energy_cons_vents_slopes(df_sql: pd.DataFrame) -> pd.DataFrame:
    """Encode how each fan's mean energy consumption changes between production ranges.
    The tags `"CONS1_Y@08QU-PF-852I-01M1"` through `"-08M1"` are averaged per production
    range of `"PROD_PQ_Y@08US"`, and each adjacent pair of ranges is encoded as 1
    (increase), -1 (decrease), or 0 (no change).
    """
cons_vents = [f"CONS1_Y@08QU-PF-852I-{idx:02d}M1" for idx in range(1, 9)]
# Define the bins and their corresponding labels
bins = np.arange(700, 1001, 50)
labels = [f"{lb}-{lb + 50}" for lb in bins[:-1]]
# Create the new column 'faixa' using pd.cut
df_sql["faixa"] = pd.cut(
df_sql["PROD_PQ_Y@08US"], bins=bins, labels=labels, right=False
)
cons_vents_slopes = (
df_sql.groupby("faixa")[cons_vents]
.mean()
.round(2)
.apply(
lambda col: pd.Series(
[
(
1
if prev_faixa < curr_faixa
else -1 if prev_faixa > curr_faixa else 0
)
for prev_faixa, curr_faixa in zip(col.iloc[:-1], col.iloc[1:])
],
index=col.index[1:],
name=col.name,
)
)
)
return cons_vents_slopes
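# Output sketch (hedged; tag names come from the code above, the values are invented):
# the result is indexed by production range (all ranges except the first), has one column
# per fan tag, and holds 1 when the mean consumption grew relative to the previous range,
# -1 when it fell, and 0 when it stayed the same, e.g.:
#                CONS1_Y@08QU-PF-852I-01M1  ...  CONS1_Y@08QU-PF-852I-08M1
#     750-800                            1  ...                         -1
#     800-850                            0  ...                          1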
def energy_cons_vents_faixas(pulp_solver, current_faixa: str, df_sql: pd.DataFrame):
"""
Set constraint that sets the values of each fan energy consumption variable to
be greater, smaller, or equal to the values from the previous production range
based on the historical data tendencies between ranges.
For example, for the optimization problem of the production range `"800-850"`
if the average historical values of the tag `"CONS1_Y@08QU-PF-852I-01M1"`
increase relative to the average historical values of the production range
`"750-800"`, then this function will create a constraint that forces the
`"CONS1_Y@08QU-PF-852I-01M1"` variable to be equal to or greater than
101% of the value obtained during the optimization process of the previous
production range.
Parameters
----------
pulp_solver : PulpSolver
The `PulpSolver` class instance, that contains the optimization problems
for each production range.
current_faixa : str
The current production range.
Values should be represented as strings containing two numeric values
separated by `"-"`. For example, `"700-750"`, `"750-800`", `"800-850"`, etc.
df_sql : pd.DataFrame
The pandas DataFrame with all tags represented as columns.
This function expects that the dataframe used contains the tag values
obtained after performing all data transformations and cleaning operations.
This parameter is used to determine whether the fan energy consumption values
of two adjacent production ranges increase, decrease, or stay the same.
Returns
-------
PulpSolver
The `PulpSolver` class instance with the added constraints.
"""
if current_faixa == min(pulp_solver.probs.keys()):
return pulp_solver
cons_vents_slopes = energy_cons_vents_slopes(df_sql)
scalers = pulp_solver.scalers
lb_faixa, _ = current_faixa.split("-")
_lb_faixa = int(lb_faixa)
previous_faixa = f"{_lb_faixa-50}-{_lb_faixa}"
current_prob = pulp_solver.probs[current_faixa]
previous_prob = pulp_solver.probs[previous_faixa]
if previous_prob.status != pulp.LpStatusOptimal:
return pulp_solver
previous_lp_vars = previous_prob.variablesDict()
current_lp_vars = current_prob.variablesDict()
cons_vent_tags = [f"CONS1_Y@08QU-PF-852I-{idx:02d}M1" for idx in range(1, 9)]
for cons_vent_tag in cons_vent_tags:
lpvar_name = pulp.LpVariable(cons_vent_tag).name
previous_lp_var = previous_lp_vars[lpvar_name]
current_lp_var = current_lp_vars[lpvar_name]
current_lp_var_rescaled = denormalize_lpvar(
cons_vent_tag, current_lp_var, scalers
)
previous_cons_vent = denormalize_lpvar(
cons_vent_tag, previous_lp_var, scalers
).value()
slope = cons_vents_slopes.loc[current_faixa, cons_vent_tag]
if slope == -1:
current_prob += (
current_lp_var_rescaled <= previous_cons_vent * 0.99,
f"{current_faixa.replace('-', '_')}_LT_{previous_faixa.replace('-', '_')}_{lpvar_name}",
)
elif slope == 1:
current_prob += (
current_lp_var_rescaled >= previous_cons_vent * 1.01,
f"{current_faixa.replace('-', '_')}_GT_{previous_faixa.replace('-', '_')}_{lpvar_name}",
)
else:
current_prob += (
current_lp_var_rescaled >= previous_cons_vent * 0.95,
f"{current_faixa.replace('-', '_')}_GT_95_PERCENT_{previous_faixa.replace('-', '_')}_{lpvar_name}",
)
current_prob += (
current_lp_var_rescaled <= previous_cons_vent * 1.05,
f"{current_faixa.replace('-', '_')}_LT_105_PERCENT_{previous_faixa.replace('-', '_')}_{lpvar_name}",
)
pulp_solver.probs[current_faixa] = current_prob
return pulp_solver
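# Hedged sketch of one possible call pattern (the real driver lives outside this module
# and is assumed here): each production-range problem is constrained and solved in
# ascending order, so the previous range's fan-consumption values are available.
#     for faixa in sorted(pulp_solver.probs):
#         pulp_solver = energy_cons_vents_faixas(pulp_solver, faixa, df_sql)
#         pulp_solver.probs[faixa].solve()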
def temp_production_ranges_ascending(pulp_solver, current_faixa: str):
"""
Constraint current production range optimization problem using previous range values.
This function performs the following steps:
1. The function checks if the current_faixa is the optimization
problem from the first production range from the `pulp_solver.probs`
dictionary. If it is, the function returns the `pulp_solver` object unchanged.
2. The function retrieves the dictionary of scalers from the `pulp_solver` object.
3. The function calculates the previous production range by subtracting 50
from the lower bound of the current production range.
4. The function retrieves the current and previous probability objects from
the `pulp_solver.probs` dictionary.
5. If the previous probability object is not optimal, the function logs an
error message and returns the `pulp_solver` object unchanged.
6. The function retrieves the LP variables for the current and previous
production ranges.
7. The function creates a list of `"TEMP1_I@08QU-QU-855I-GQXX"` tag names.
Then it retrieves the LP variables for the current and previous production ranges
optimization problems, denormalizes them using the `denormalize_lpvar`
function, and adds a constraint to the current optimization problem instance.
8. The function updates the `pulp_solver.probs` dictionary with the updated
optimization problem instance.
9. The function returns the updated `pulp_solver` object.
Parameters
----------
pulp_solver : PulpSolver
The `PulpSolver` class instance, that contains the optimization problems
for each production range.
current_faixa : str
The current production range.
Values should be represented as strings containing two numeric values
separated by `"-"`. For example, `"700-750"`, `"750-800`", `"800-850"`, etc.
Returns
-------
PulpSolver
The `PulpSolver` class instance with the added constraints.
Notes
-----
To better explain what's the purpose of this function, consider the following
example:
Suppose we're solving 5 optimization problems, for the following production
ranges: `'750-800'`, `'800-850'`, `'850-900'`, `'900-950'`, and `'950-1000'`.
After defining the first production range problem and solving it, the model
returned a value for the tag `'TEMP1_I@08QU-QU-855I-GQ09'` equal to 1355.
In this scenario, this function will force the value of
`'TEMP1_I@08QU-QU-855I-GQ09'` for the next production range of `'800-850'`
to be equal to 1355 or higher.
The next production range model in turn will restrict values that can be
set to this same column based on the second problem's results and so on.
"""
is_first_faixa = current_faixa == min(pulp_solver.probs.keys())
# if current_faixa == min(pulp_solver.probs.keys()):
# return pulp_solver
max_possible_lb_faixa = 950
scalers = pulp_solver.scalers
lb_faixa, _ = current_faixa.split("-")
_lb_faixa = int(lb_faixa)
if is_first_faixa:
previous_faixa = current_faixa
else:
previous_faixa = f"{_lb_faixa - 50}-{_lb_faixa}"
current_prob = pulp_solver.probs[current_faixa]
previous_prob = pulp_solver.probs[previous_faixa]
if not is_first_faixa and previous_prob.status != pulp.LpStatusOptimal:
logger.error(
"Previous production range '%s' status is not optimal: %s",
previous_faixa,
pulp.LpStatus[previous_prob.status],
)
logger.error(
"Not constraining values for 'TEMP1_I@08QU-QU-855I-GQXX' tags to "
"be greater than or equal to the values from %s production range.",
previous_faixa,
)
return pulp_solver
previous_lp_vars = previous_prob.variablesDict()
current_lp_vars = current_prob.variablesDict()
temp_gq_tags = [f"TEMP1_I@08QU-QU-855I-GQ{idx:02d}" for idx in range(9, 17)]
for temp_gq_tag in temp_gq_tags:
lpvar_name = pulp.LpVariable(temp_gq_tag).name
previous_lp_var = previous_lp_vars[lpvar_name]
current_lp_var = current_lp_vars[lpvar_name]
if lpvar_name.endswith("09"):
# new_ub = 1 - ((max_possible_lb_faixa - _lb_faixa) / 25) * 10 / 1360
current_lp_var.upBound = 1
obj = current_prob.objective
obj += (1 - current_lp_var) * 1_000
current_prob.setObjective(obj)
if is_first_faixa:
pulp_solver.probs[current_faixa] = current_prob
return pulp_solver
# current_lp_var_rescaled = denormalize_lpvar(temp_gq_tag, current_lp_var, scalers)
# previous_lp_var_rescaled = denormalize_lpvar(temp_gq_tag, previous_lp_var, scalers).value()
previous_lp_var.name = f"{previous_faixa}_{previous_lp_var.name}"
current_prob += current_lp_var >= previous_lp_var + 0.000001
pulp_solver.probs[current_faixa] = current_prob
return pulp_solver
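# Hedged note on the constraint added above: the "TEMP1_I@08QU-QU-855I-GQXX" LP variables
# are compared in their normalized (scaled) space, since the denormalization calls are
# commented out, and the small `+ 0.000001` offset turns the "greater than or equal"
# relation described in the docstring into a strictly-greater relation in that space.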
def can_define_inter_problem_constraint(pulp_solver, current_faixa) -> bool:
"""
Perform check to determine if inter-problem constraint can be added to the optimization model.
This function checks whether constraints that compare further
optimization problems can be added to a certain production
range optimization problem.
Parameters
----------
pulp_solver : PulpSolver
The `PulpSolver` class instance, that contains the optimization problems
for each production range.
current_faixa : str
The current production range.
Values should be represented as strings containing two numeric values
separated by `"-"`. For example, `"700-750"`, `"750-800`", `"800-850"`, etc.
Returns
-------
bool
True if the constraint can be added to the optimization model and False otherwise.
"""
if current_faixa == min(pulp_solver.probs.keys()):
return False
lb_faixa, _ = current_faixa.split("-")
_lb_faixa = int(lb_faixa)
previous_faixa = f"{_lb_faixa - 50}-{_lb_faixa}"
previous_prob = pulp_solver.probs[previous_faixa]
if previous_prob.status != pulp.LpStatusOptimal:
logger.error(
"Previous production range '%s' status is not optimal: %s",
previous_faixa,
pulp.LpStatus[previous_prob.status],
)
logger.error(
"Not constraining values for 'TEMP1_I@08QU-QU-855I-GQXX' tags to "
"be greater than or equal to the values from %s production range.",
previous_faixa,
)
return False
return True
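# Hedged sketch of how this check might gate the inter-problem helpers defined above
# (the exact call site lives outside this module and is assumed here):
#     if can_define_inter_problem_constraint(pulp_solver, faixa):
#         pulp_solver = energy_cons_vents_faixas(pulp_solver, faixa, df_sql)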
class FakeScaler(BaseEstimator, TransformerMixin):
"""
A fake scaler class that contains attributes used by SciKit Learn scalers.
This class simulates the behavior of a feature scaler but does not
implement any scaling.
Parameters
----------
feature_range : tuple of (min, max), default=(0, 1)
The desired range of transformed data. This parameter is not used
in actual transformations but is kept for interface compatibility.
Attributes
----------
n_features_in_ : int
The number of features observed during `fit`.
n_samples_seen_ : int
The number of samples observed during `fit`.
min_ : ndarray of shape (n_features_in_,)
The minimum value in each feature in the fitted data.
max_ : ndarray of shape (n_features_in_,)
The maximum value in each feature in the fitted data.
data_range_ : ndarray of shape (n_features_in_,)
The data range (max - min) for each feature in the fitted data.
data_min_ : ndarray of shape (n_features_in_,)
The minimum value in each feature in the fitted data.
data_max_ : ndarray of shape (n_features_in_,)
The maximum value in each feature in the fitted data.
scale_ : ndarray of shape (n_features_in_,)
The scaling factors applied to each feature. Set to ones.
mean_ : ndarray of shape (n_features_in_,)
The mean value for each feature in the fitted data.
center_ : ndarray of shape (n_features_in_,)
The centering value for each feature.
Methods
-------
fit(X, y=None)
Compute the necessary attributes from the training data.
transform(X)
Returns the original data as it is.
inverse_transform(X)
Reverses the transformation, effectively returning the original data.
Examples
--------
>>> import numpy as np
>>> X = np.array([[1, 2], [3, 4]])
>>> scaler = FakeScaler()
>>> scaler.fit(X)
FakeScaler()
>>> scaler.transform(X)
array([[1, 2],
[3, 4]])
"""
def __init__(self, feature_range=(0, 1)):
self.feature_range = feature_range
    def fit(self, X, y=None):
"""
Compute the minimum, maximum, mean, and range for each feature in X.
Assumes that `X` is a numpy array, `pandas.Series`, or `pandas.DataFrame`.
This method calculates basic statistics for each feature but does not
scale the data.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data, where n_samples is the number of samples and
n_features is the number of features.
y : Ignored
This parameter is not used in this method.
Returns
-------
self : object
Returns self.
Raises
------
ValueError
If the input array `X` does not meet the expected criteria.
"""
# Convert X to a numpy array if it's not already.
X = np.asarray(X)
# Check and handle the shape of X
if X.ndim == 1:
X = X.reshape(-1, 1)
# Update attributes based on X
self.n_samples_seen_, self.n_features_in_ = X.shape
self.min_ = np.min(X, axis=0)
self.max_ = np.max(X, axis=0)
self.data_range_ = self.max_ - self.min_
self.data_min_ = self.min_
self.data_max_ = self.max_
self.scale_ = np.ones(self.n_features_in_)
self.mean_ = np.mean(X, axis=0)
self.center_ = self.mean_
return self
    # noinspection PyPep8Naming
    def transform(self, X):
        """Return `X` unchanged; this fake scaler applies no scaling."""
        return X
    # noinspection PyPep8Naming
    def inverse_transform(self, X):
        """Return `X` unchanged, mirroring the no-op `transform`."""
        return X
def adjust_models_coefficients(models_results: dict, scalers: dict) -> dict:
    """Rescale the fitted Ridge coefficients back to the original (unscaled) feature space.
    Parameters
    ----------
    models_results : dict
        Results of all models.
    scalers : dict
        Adjusted scalers for all tags.
Returns
-------
models_results: dict
Results containing the regression models with adjusted coefficients
"""
for model_name in models_results.keys():
for var_index, tag in enumerate(
models_results[model_name][0]['grid'].best_estimator_.feature_names_in_
):
if tag.startswith('qtde') or tag.startswith('SOMA FUNC'):
continue
scaler = scalers[tag]
scaler_name = scaler.__class__.__name__
if scaler_name == 'RobustScaler':
models_results[model_name][0]['grid'].best_estimator_[1].coef_[
var_index
] /= scaler.scale_
models_results[model_name][0]['grid'].best_estimator_[
1
].intercept_ -= np.sum(
models_results[model_name][0]['grid']
.best_estimator_[1]
.coef_[var_index]
* scaler.center_
)
elif scaler_name == 'StandardScaler':
models_results[model_name][0]['grid'].best_estimator_[1].coef_[
var_index
] /= scaler.scale_
models_results[model_name][0]['grid'].best_estimator_[
1
].intercept_ -= np.dot(
models_results[model_name][0]['grid']
.best_estimator_[1]
.coef_[var_index],
scaler.mean_,
)
else:
scaled_coef = (
models_results[model_name][0]['grid']
.best_estimator_[1]
.coef_[var_index]
)
scaled_intercept = (
models_results[model_name][0]['grid'].best_estimator_[1].intercept_
)
unscaled_coef = scaled_coef * scaler.scale_
unscaled_intercept = scaled_intercept - (
scaled_coef * scaler.scale_
).dot(scaler.data_min_)
models_results[model_name][0]['grid'].best_estimator_[1].coef_[
var_index
] = unscaled_coef
models_results[model_name][0]['grid'].best_estimator_[
1
].intercept_ = unscaled_intercept
models_results[model_name][0]['model'] = models_results[model_name][0][
'grid'
].best_estimator_[1]
models_results[model_name][0]['params'] = (
models_results[model_name][0]['grid'].best_estimator_[1].coef_
)
return models_results
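# Hedged derivation behind the else branch above (assuming a MinMaxScaler with the default
# (0, 1) feature range): scikit-learn scales a feature as x_scaled = (x - data_min_) * scale_
# with scale_ = 1 / (data_max_ - data_min_). A linear model y = w_s * x_scaled + b_s is then
# y = (w_s * scale_) * x + (b_s - (w_s * scale_) . data_min_), which matches the
# unscaled_coef / unscaled_intercept pair computed in that branch.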
def filter_datasets_df_sql_by_date(
datasets: Dict[str, pd.DataFrame],
df_sql: pd.DataFrame,
n_days: int = 60,
) -> Tuple[Dict[str, pd.DataFrame], pd.DataFrame]:
"""
Filter the `datasets` and `df_sql` DataFrame by date.
This function filters each DataFrame in the provided datasets dictionary
and the df_sql DataFrame based on a lower-bound date. The lower-bound date
is calculated as the current date minus a specified number of days.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
A dictionary containing string keys and `pandas.DataFrame` as values.
Each DataFrame should contain a DateTime index.
df_sql : pd.DataFrame
A DataFrame with a DateTime index to be filtered.
n_days : int, default=60
The number of days to subtract from the current date to get the
lower-bound date.
Returns
-------
Tuple[Dict[str, pd.DataFrame], pd.DataFrame]
A tuple containing the filtered `datasets` dictionary and
the filtered `df_sql` DataFrame.
"""
    lb_date = pd.Timestamp.today() - relativedelta(days=n_days)
new_datasets = {}
for name, df in datasets.items():
_df = df.loc[lb_date:, :]
nrows, ncols = _df.shape
print(f"New {name!r} contains: {nrows:,} rows and {ncols:,} columns")
new_datasets[name] = _df
_df_sql = df_sql.loc[lb_date:, :]
return new_datasets, _df_sql
# def bounds_mapping_groupby(grp: pd.DataFrame):
# return tuple((grp[col].quantile(0.10), grp[col].quantile(0.90)) for col in grp.columns)
def bounds_mapping_groupby(grp: pd.DataFrame):
    """Build one bounds configuration per cfix group for `bounds_linking`.
    Returns a tuple with the rounded 2nd/98th percentile bounds of every column except
    the `"cfix_group_lb"`/`"cfix_group_ub"` helpers, followed by the (lower, upper)
    bounds of the cfix group itself.
    """
return tuple(
[
*[
(round(grp[col].quantile(0.02), 2), round(grp[col].quantile(0.98), 2))
for col in grp.columns.difference(["cfix_group_lb", "cfix_group_ub"])
],
(round(grp["cfix_group_lb"].max(), 2), round(grp["cfix_group_ub"].max(), 2)),
]
)
def link_queima_vars(problem, datasets, faixa):
    """Link firing-process variables to cfix through grouped historical bounds.
    The model datasets are filtered to plausible operating windows for the quality and
    process tags used here, the historical cfix values are binned into 0.02-wide groups,
    percentile bounds are computed per group with `bounds_mapping_groupby`, and the
    resulting configurations are added to `problem` as big-M linking constraints
    via `bounds_linking`.
    """
lb_faixa, ub_faixa = tuple(map(int, faixa.split("-")))
cfix_tag = "QUIM_CFIX_PP_L@08PR"
gas_tag = "VAZA3_I@08QU-ST-855I-01"
prod_pq_tag = "PROD_PQ_Y@08US"
umid_tag = "UMID_H2O_PR_L@08FI"
abrasao_tag = "ABRA_-0,5_PQ_L@08QU"
compressao_tag = "COMP_MCOMP_PQ_L@08QU"
cons_vents_tags = "CONS1_Y@08QU-VENT"
temp_gq_tags = [f"TEMP1_I@08QU-QU-855I-GQ{idx:02d}" for idx in range(4, 17)]
_datasets = datasets.copy()
for model_name in find_models_by_tag(cfix_tag, _datasets):
df = _datasets[model_name]
_datasets[model_name] = df.loc[(df[cfix_tag] >= 1.15) & (df[cfix_tag] <= 1.40), :]
for model_name in find_models_by_tag(gas_tag, _datasets):
df = _datasets[model_name]
_datasets[model_name] = df.loc[(df[gas_tag] >= 7.50) & (df[gas_tag] <= 18), :]
for model_name in find_models_by_tag(abrasao_tag, _datasets):
df = _datasets[model_name]
_datasets[model_name] = df.loc[df[abrasao_tag] <= 5.50, :]
for model_name in find_models_by_tag(compressao_tag, _datasets):
df = _datasets[model_name]
_datasets[model_name] = df.loc[df[compressao_tag] >= 270, :]
for model_name in find_models_by_tag(prod_pq_tag, _datasets):
df = _datasets[model_name]
_datasets[model_name] = df.loc[(df[prod_pq_tag] >= lb_faixa) & (df[prod_pq_tag] < ub_faixa), :]
for model_name in find_models_by_tag(cons_vents_tags, _datasets):
df = _datasets[model_name]
_datasets[model_name] = df.loc[(df[cons_vents_tags] >= 12) & (df[cons_vents_tags] <= 18), :]
umid_df = get_column_values_from_datasets(umid_tag, _datasets)
prod_pq_df = get_column_values_from_datasets(prod_pq_tag, _datasets)
cfix_df = get_column_values_from_datasets(cfix_tag, _datasets)
gas_df = get_column_values_from_datasets(gas_tag, _datasets)
abrasao_df = get_column_values_from_datasets(abrasao_tag, _datasets)
compressao_df = get_column_values_from_datasets(compressao_tag, _datasets)
cons_vents_df = get_column_values_from_datasets(cons_vents_tags, _datasets)
temp_gq_dfs = [
get_column_values_from_datasets(temp_gq_tag, _datasets)
for temp_gq_tag in temp_gq_tags
]
temp_df = multi_merge(
[
*temp_gq_dfs,
umid_df, prod_pq_df, cfix_df,
gas_df, abrasao_df, compressao_df,
cons_vents_df
],
left_index=True,
right_index=True,
how="inner",
).round(2)
# variables_config = temp_df.columns
# bounds_mapping_config = temp_df.groupby(cfix_tag).apply(bounds_mapping_groupby).to_list()
step = 0.02
bins = np.arange(temp_df[cfix_tag].min(), temp_df[cfix_tag].max() + 0.1, step)
temp_df['cfix_group_lb'] = pd.cut(temp_df[cfix_tag], bins=bins, labels=bins[:-1], right=False, include_lowest=True)
temp_df['cfix_group_ub'] = pd.cut(temp_df[cfix_tag], bins=bins, labels=bins[1:], right=False, include_lowest=True)
temp_df['cfix_group_ub'] = temp_df['cfix_group_ub'].apply(lambda v: v - step / 10)
temp_df = temp_df.drop(columns=[cfix_tag]).astype({"cfix_group_lb": float, "cfix_group_ub": float})
bounds_mapping_config = temp_df.groupby(["cfix_group_lb"]).apply(bounds_mapping_groupby).to_list()
variables_config = [*sorted(temp_df.columns.drop(["cfix_group_lb", "cfix_group_ub"], errors="ignore")), cfix_tag]
problem.constraints.pop("cfix_constant_value", None)
problem = bounds_linking(problem, variables_config, bounds_mapping_config)
return problem
def bounds_linking(
    problem: pulp.LpProblem,
    variables: List[str],
    bounds_mapping: List[tuple],
) -> pulp.LpProblem:
    """Force all `variables` to respect one of the bound configurations in `bounds_mapping`.
    A binary variable is created per configuration, the bounds are linked to it with
    big-M constraints, and exactly one binary variable is allowed to be active.
    """
    # Look up the existing LP variables that correspond to the provided tag names.
    lp_vars = problem.variablesDict()
    var_dict = {}
    for var_name in variables:
        lookup_name = constants.TARGETS_IN_MODEL.get(
            var_name, pulp.LpVariable(var_name).name
        )
        var_dict[var_name] = lp_vars[lookup_name]
# Define constraints using "big-M" method with binary control
M = 100 # A large constant for the big-M method
binary_variables = []
for idx, bounds in enumerate(bounds_mapping):
binary_name = f"binary_{idx}"
# Create a binary variable for each configuration
bin_var = pulp.LpVariable(binary_name, cat='Binary')
binary_variables.append(bin_var)
for var_name, bound in zip(variables, bounds):
# Apply big-M method constraints to link the binary variable with
# the range for each variable
problem += (
var_dict[var_name] >= bound[0] - M * (1 - bin_var),
f"{binary_name}_Link_Lower_{var_name}",
)
problem += (
var_dict[var_name] <= bound[1] + M * (1 - bin_var),
f"{binary_name}_Link_Upper_{var_name}",
)
# Ensure only one binary variable can be active (1) at any time
problem += pulp.lpSum(binary_variables) == 1, "Single_Active_Binary"
return problem
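# Hedged illustration of the big-M linking used above: for configuration k with bounds
# (lb, ub) on variable x, the pair of constraints
#     x >= lb - M * (1 - b_k)
#     x <= ub + M * (1 - b_k)
# is binding only when the binary b_k equals 1; when b_k = 0 they relax to roughly
# [lb - M, ub + M] and impose nothing, and the "Single_Active_Binary" constraint
# guarantees that exactly one configuration is active per problem.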