Source code for wip.modules.ops

""" Operations applied over the data """

from __future__ import annotations

import os
from io import BufferedReader
from typing import Dict
from typing import List
from typing import Tuple

import chardet
import numpy as np
import pandas as pd
import pulp
from sklearn.preprocessing import MinMaxScaler

from wip.constants import constants
from wip.datatools.io_ops import read_json
from wip.datatools.io_ops import read_text
from wip.datatools.io_ops import to_text
from wip.logging_config import logger


def normalize_feature(
    scalers: Dict[str, MinMaxScaler], feature: str, norm_value: float
) -> float:
    """Normalize a given feature value based on scalers.

    This function takes a dictionary of scalers, a feature key, and a value
    to normalize. It returns the normalized value based on the given scalers
    for the specified feature.

    .. attention::

        This function assumes that the given scalers are
        `sklearn.preprocessing.MinMaxScaler` objects. It tries to access the
        attributes `data_range_` and `data_min_` of the scaler object, which
        only exist inside the `MinMaxScaler` class.

    Parameters
    ----------
    scalers : Dict[str, MinMaxScaler]
        Dictionary containing scaler objects, where the keys are feature
        names and the values are scaler objects with `data_min_` and
        `data_range_` attributes.
    feature : str
        The key corresponding to the feature to be normalized in the
        `scalers` dictionary.
    norm_value : float
        The value of the specified feature to normalize.

    Returns
    -------
    float
        The normalized value of `norm_value` for the given feature using the
        provided scalers. The scaled value represents a value between 0 and 1.

    Examples
    --------
    >>> from sklearn.preprocessing import MinMaxScaler
    >>> import numpy as np
    >>> data = np.array([[1, 2], [3, 4], [5, 6]])
    >>> scaler = MinMaxScaler().fit(data)
    >>> scalers = {"feature1": scaler}
    >>> normalize_feature(scalers, "feature1", 4)
    0.5

    Notes
    -----
    This function applies the min-max scaling formula manually. Applying the
    normalization formula manually allows for single-value normalization,
    without having to implement logic that transforms the single value into
    a numpy array and then back into a single value.

    The min-max scaling formula is as follows:

    .. math:: X_{norm} = \\frac{X - X_{min}}{X_{max} - X_{min}}
    """
    # NOTE: normalization is currently disabled; the value is returned as-is.
    # normalize_value = norm_value - scalers[feature].data_min_[0]
    # return normalize_value / scalers[feature].data_range_[0]
    return norm_value
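
# Illustrative sketch (the ``_demo_*`` helper below is not part of the
# original module): it verifies that the manual min-max formula referenced in
# ``normalize_feature`` matches ``MinMaxScaler.transform`` for a single value.
def _demo_manual_min_max() -> None:
    import numpy as np
    from sklearn.preprocessing import MinMaxScaler

    data = np.array([[1.0], [3.0], [5.0]])
    scaler = MinMaxScaler().fit(data)
    value = 4.0
    # Manual formula: (X - X_min) / (X_max - X_min).
    manual = (value - scaler.data_min_[0]) / scaler.data_range_[0]
    # Library result for the same single value.
    library = scaler.transform([[value]])[0, 0]
    assert np.isclose(manual, library)  # both are 0.75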
def unnormalize_feature(
    scalers: Dict[str, MinMaxScaler],
    feature: str,
    norm_value: float,
    operation: str = "two feature",
) -> float | Tuple[float, float]:
    """Unnormalize a given feature value based on scalers.

    This method contains two modes of rescaling the data:

    - `two feature`: Use this mode to rescale the coefficients and intercepts
      of a linear regression model. In other words, this method can be used
      to convert the normalized coefficients and intercepts of a linear
      regression model, enabling their use with the original data, without
      having to normalize it first. The `first_value` and `second_value`
      returned by this method represent the new unscaled coefficient and
      intercept, respectively. The `second_value` that's returned needs to be
      subtracted from the normalized intercept to obtain its unscaled value.
    - `one feature`: Use this mode to rescale a single feature value that was
      normalized using min-max scaling.

    Parameters
    ----------
    scalers : Dict[str, MinMaxScaler]
        Dictionary containing scaler objects, where the keys represent the
        feature names, and the values are their fitted
        `sklearn.preprocessing.MinMaxScaler` objects.
    feature : str
        The key corresponding to the feature to be unnormalized. This feature
        must be present in the `scalers` dictionary.
    norm_value : float
        The normalized value of the specified feature to rescale back to the
        original range.
    operation : str {"two feature", "one feature"}, default="two feature"
        The operation to perform. If `operation` is "two feature", then
        `norm_value` is assumed to be the coefficient of a linear regression
        model, related to the specified `feature`. If `operation` is
        "one feature", then `norm_value` is assumed to be a single value
        normalized using min-max scaling.

    Returns
    -------
    float | Tuple[float, float]
        If `operation` is "two feature", a tuple containing the unscaled
        coefficient and intercept of the linear regression model. If
        `operation` is "one feature", the unscaled value of the specified
        feature.

    Examples
    --------
    The example below shows how to use this method to rescale the
    coefficients and intercept of a linear regression model.

    >>> import numpy as np
    >>> from sklearn.preprocessing import MinMaxScaler
    >>> from sklearn.linear_model import Ridge
    >>> X = np.array([[100, 400], [200, 300], [300, 200], [400, 100]])
    >>> y = np.array([1, 2, 3, 4])
    >>> scalers = {
    ...     idx: MinMaxScaler().fit(X[:, idx].reshape(-1, 1))
    ...     for idx in range(X.shape[1])
    ... }
    >>> X_norm = np.array(
    ...     [scalers[idx].transform(X[:, idx].reshape(-1, 1)).reshape(-1)
    ...      for idx in range(X.shape[1])]
    ... ).T
    >>> print(X_norm)
    [[0.         1.        ]
     [0.33333333 0.66666667]
     [0.66666667 0.33333333]
     [1.         0.        ]]
    >>> model = Ridge().fit(X_norm, y)
    >>> print(X_norm[0].dot(model.coef_) + model.intercept_)  # noqa
    1.710526315789474
    >>> # Same as model.predict(X_norm[0].reshape(1, -1))
    >>> intercept = model.intercept_  # noqa
    >>> coeff_unscaled = []
    >>> for idx, coeff in enumerate(model.coef_):  # noqa
    ...     coeff, intercept_unscaled = unnormalize_feature(scalers, idx, coeff)
    ...     intercept -= intercept_unscaled
    ...     coeff_unscaled.append(coeff)
    >>> print(X[0].dot(np.array(coeff_unscaled)) + intercept)
    1.710526315789474

    The next example demonstrates the use of this method to rescale a single
    feature value that was normalized using min-max scaling:

    >>> print(unnormalize_feature(scalers, 0, np.array([[X_norm[0, 0]]]), "one feature"))
    array([[100.]])

    Notes
    -----
    The min-max scaling formula is as follows:

    .. math:: X_{norm} = \\frac{X - X_{min}}{X_{max} - X_{min}}

    The unscaled value for `X` can be obtained by rearranging the above
    formula:

    .. math:: X = X_{norm} \\times (X_{max} - X_{min}) + X_{min}

    See Also
    --------
    solver_ops.solver_operations.write_descriptive_contraints
        Function that uses this method to rescale the coefficients and
        intercepts before using them to define an LP optimization model.
    """
    # NOTE: rescaling is currently disabled; inputs are returned unchanged.
    if operation == "two feature":
        # scale_var = scalers[feature]
        # first_value = norm_value / (scale_var.data_max_[0] - scale_var.data_min_[0])
        # second_value = scale_var.data_min_[0] * first_value
        # return first_value, second_value
        return norm_value, 0
    # return norm_value * scale_var.data_range_[0] + scale_var.data_min_[0]
    return norm_value
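
# Illustrative sketch (the ``_demo_*`` helper below is not part of the
# original module): the algebra behind the "two feature" mode. For min-max
# scaled inputs, a coefficient ``c`` learned on normalized data maps back to
# ``c / data_range_`` on the original scale, and the intercept shifts down by
# ``(c / data_range_) * data_min_`` for each feature.
def _demo_coefficient_unscaling() -> None:
    import numpy as np
    from sklearn.linear_model import Ridge
    from sklearn.preprocessing import MinMaxScaler

    X = np.array([[100.0, 400.0], [200.0, 300.0], [300.0, 200.0], [400.0, 100.0]])
    y = np.array([1.0, 2.0, 3.0, 4.0])
    scalers = [MinMaxScaler().fit(X[:, [i]]) for i in range(X.shape[1])]
    X_norm = np.hstack([s.transform(X[:, [i]]) for i, s in enumerate(scalers)])
    model = Ridge().fit(X_norm, y)

    intercept = model.intercept_
    coefs = []
    for i, coef in enumerate(model.coef_):
        unscaled = coef / scalers[i].data_range_[0]      # rescaled coefficient
        intercept -= unscaled * scalers[i].data_min_[0]  # intercept correction
        coefs.append(unscaled)
    # Predictions on the raw X now match predictions on the normalized X.
    assert np.allclose(X @ np.array(coefs) + intercept, model.predict(X_norm))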
def string_in_list(string: str, list_strings: List[str]) -> bool:
    """Check if a string starts with any element in a list of strings.

    This function iterates through the list of strings and checks if the
    given string starts with any of the elements in the list. If there's a
    match, it returns `True`. Otherwise, it returns `False`. If an empty
    string is provided, it returns `False`.

    Parameters
    ----------
    string : str
        The string to be checked for starting substrings.
    list_strings : List[str]
        The list of strings that'll be checked as starting substrings of the
        input string.

    Returns
    -------
    bool
        `True` if the input string starts with any element in the list,
        `False` otherwise.

    Examples
    --------
    >>> string_in_list("hello", ["hi", "hell"])
    True
    >>> string_in_list("world", ["wor", "earth"])
    True
    >>> string_in_list("", ["hi", "hello"])
    False
    >>> string_in_list("goodbye", ["hi", "hello"])
    False
    """
    if not string:
        return False
    return any(string.startswith(value) for value in list_strings)
def read_json_dls(file_path: str, file_name: str):
    """Read a JSON file from ADLS.

    Parameters
    ----------
    file_path : str
        The path to the JSON file.
    file_name : str
        The name of the JSON file.

    Returns
    -------
    Dict[str, Any]
        The data within the JSON file.
    """
    # return load_adls_json(file_path, file_name)
    return read_json(os.path.join(file_path, file_name))
def scaling_target_values(feature, scalers, lmin, lmax):
    """Scale the lower and upper bounds of a target variable using their scaler.

    Parameters
    ----------
    feature : str
        Target feature name.
    scalers : Dict[str, MinMaxScaler]
        Dictionary containing scaler objects, where the keys are feature
        names and the values are their fitted
        `sklearn.preprocessing.MinMaxScaler` objects.
    lmin : float
        The lower bound of the target feature.
    lmax : float
        The upper bound of the target feature.

    Returns
    -------
    Tuple[float, float, str]
        The scaled lower and upper bounds of the target feature, along with
        the feature name.

    Examples
    --------
    >>> from sklearn.preprocessing import MinMaxScaler
    >>> import numpy as np
    >>> data = np.array([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
    >>> scaler = MinMaxScaler().fit(data.reshape(-1, 1))
    >>> feature = "QUIM_CFIX_PP_L@08PR"
    >>> scalers = {feature: scaler}
    >>> lmin = 0.2
    >>> lmax = 0.8
    >>> result = scaling_target_values(feature, scalers, lmin, lmax)
    >>> print(result)
    (20.0, 80.0, 'cfix')
    """
    # NOTE: rescaling is currently disabled; the bounds pass through unchanged.
    # lmin = lmin * scalers[feature].data_range_[0] + scalers[feature].data_min_[0]
    # lmax = lmax * scalers[feature].data_range_[0] + scalers[feature].data_min_[0]
    if lmin > lmax:
        logger.error(
            "Feature %s has lmin > lmax: %s > %s. "
            "Switching limits with one another. "
            "Please check the data and problem "
            "formulation and use output with caution.",
            feature,
            lmin,
            lmax,
        )
        lmin, lmax = lmax, lmin
    return lmin, lmax, constants.TARGETS_IN_MODEL[feature]
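
# Illustrative sketch (the ``_demo_*`` helper below is not part of the
# original module): the commented-out rescaling above is equivalent to
# ``MinMaxScaler.inverse_transform`` applied to the normalized bounds.
def _demo_bound_rescaling() -> None:
    import numpy as np
    from sklearn.preprocessing import MinMaxScaler

    data = np.arange(0, 101, 10).reshape(-1, 1)  # 0, 10, ..., 100
    scaler = MinMaxScaler().fit(data)
    lmin, lmax = 0.2, 0.8
    # Manual rearranged formula: X = X_norm * (X_max - X_min) + X_min.
    manual = (
        lmin * scaler.data_range_[0] + scaler.data_min_[0],
        lmax * scaler.data_range_[0] + scaler.data_min_[0],
    )
    # Library equivalent for the same two bounds.
    library = scaler.inverse_transform([[lmin], [lmax]]).ravel()
    assert np.allclose(manual, (20.0, 80.0))
    assert np.allclose(manual, library)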
def replace_string_from_file(solver_path, range_min=None, range_max=None):
    """Replace "." with "," in the file.

    Parameters
    ----------
    solver_path : str
        The path to the solver file.
    range_min : int, optional
        The minimum range value, by default None.
    range_max : int, optional
        The maximum range value, by default None.
    """
    file = solver_path
    if range_min and range_max:
        file = os.path.join(
            solver_path, f"restricoes-faixa-{range_min}-{range_max}.txt"
        )
    data = read_text(file, encoding="ISO-8859-1").replace(".", ",")
    # data = read_text_file(file)
    # with open(file, "w", encoding="utf-8") as constraint_file:
    #     constraint_file.write(data)
    to_text(data, file)
def define_real_scalers(datasets: Dict[str, pd.DataFrame]):
    """Define a scaler for each variable from the `datasets`.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The dictionary of datasets. Each key/value pair represents a model
        and its corresponding dataset.

    Returns
    -------
    Dict[str, MinMaxScaler]
        The dictionary of scalers, with the new scalers added.
    np.ndarray
        The array of values used to fit the last scaler created inside the
        loop.
    """
    # from wip.temporary import FakeScaler
    all_columns = list(map(lambda x: list(datasets[x].columns), datasets))
    all_columns = sum(all_columns, [])
    all_columns = list(
        filter(
            lambda x: "qtde" not in x
            and "NIVE1" not in x
            and "SOMA_FUNC" not in x
            and "SOMA FUNC" not in x,
            all_columns,
        )
    )
    scalers = {}
    for column in set(all_columns):
        if column.startswith("qtde") or column.startswith("SOMA FUNC"):
            continue
        all_df = list(filter(lambda x: column in datasets[x].columns, datasets))
        scalers[column] = MinMaxScaler()
        if len(all_df) == 1:  # the tag exists in a single dataset
            scalers[column].fit(datasets[all_df[0]][[column]])
            new_values = datasets[all_df[0]][column].values
        else:  # the tag exists in more than one dataset
            values = list(map(lambda x: list(datasets[x][column].values), all_df))
            values = sum(values, [])
            new_values = []
            for value in values:
                if isinstance(value, (np.int64, np.float64)):
                    new_values.append(value)
                else:
                    new_values.extend(value)
            if "Consumo de Energia (base minério úmido) kWh/ton" in column:
                new_values.append(0)
            new_values = np.array(new_values).reshape(-1, 1)
            scalers[column].fit(new_values)
    return scalers, new_values
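
# Minimal sketch (the ``_demo_*`` helper and its data are illustrative, not
# part of the original module) of the core idea above: when a column appears
# in several datasets, a single scaler is fitted on the concatenation of all
# of that column's values, so it spans the full observed range.
def _demo_shared_column_scaler() -> None:
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler

    datasets = {
        "model_a": pd.DataFrame({"temp": [10.0, 20.0]}),
        "model_b": pd.DataFrame({"temp": [30.0, 40.0]}),
    }
    # Pool the shared column from every dataset before fitting.
    pooled = np.concatenate([df["temp"].values for df in datasets.values()])
    scaler = MinMaxScaler().fit(pooled.reshape(-1, 1))
    # The scaler spans both datasets: min 10, max 40.
    assert (scaler.data_min_[0], scaler.data_max_[0]) == (10.0, 40.0)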
def detect_file_encoding(filename: str | BufferedReader) -> str:
    """Detect the character encoding of a file or `BufferedReader`.

    This function uses the `chardet` library to determine the character
    encoding of the input file or `BufferedReader` object. The input can be
    a file path (as a string) or a `BufferedReader` object.

    Parameters
    ----------
    filename : str | BufferedReader
        File path (as a string) or a `BufferedReader` object for which the
        character encoding needs to be detected.

    Returns
    -------
    str
        The detected character encoding of the input file or
        `BufferedReader`.

    Examples
    --------
    >>> file_path = "example.txt"
    >>> encoding = detect_file_encoding(file_path)
    >>> print(encoding)
    'utf-8'

    Notes
    -----
    This function uses the `chardet` library to detect the character
    encoding of the input file or `BufferedReader` object. The `chardet`
    library evaluates the file content and returns the encoding along with
    a confidence score. The confidence score represents the certainty of the
    encoding being correct. The encoding with the highest confidence score
    is returned.
    """
    if isinstance(filename, BufferedReader):
        result = chardet.detect(filename.read())
        return result["encoding"]
    with open(filename, "rb") as rawdata:
        result = chardet.detect(rawdata.read())
    return result["encoding"]
def fit_scalers_to_tag(tag_values: pd.Series) -> MinMaxScaler:
    """Fit a `MinMaxScaler` to a tag's values.

    Parameters
    ----------
    tag_values : pd.Series
        The `pandas.Series` containing the values of a tag.

    Returns
    -------
    MinMaxScaler
        The fitted scaler.

    Notes
    -----
    When fitting a `MinMaxScaler` to an array-like object that contains a
    single column, the array-like object needs to be reshaped to a 2D array
    prior to fitting the scaler with it. This function's main purpose is to
    provide a simple shortcut to do so, without having to manually reshape
    the array-like object every time.
    """
    return MinMaxScaler().fit(tag_values.values.reshape(-1, 1))
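
# Quick illustration (the ``_demo_*`` helper below is not part of the
# original module) of the shortcut above: ``MinMaxScaler`` expects a 2D
# array, so the 1D Series is reshaped to a single-column matrix internally.
def _demo_reshape_requirement() -> None:
    import pandas as pd

    tag_values = pd.Series([1.0, 5.0, 9.0])
    scaler = fit_scalers_to_tag(tag_values)
    # The fitted attributes reflect the Series' min and max.
    assert scaler.data_min_[0] == 1.0 and scaler.data_max_[0] == 9.0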
def inverse_transform_lpvar(
    lpvar: pulp.LpVariable, scaler: MinMaxScaler
) -> pulp.LpAffineExpression:
    """Get the inverse transform of a `LpVariable`.

    Parameters
    ----------
    lpvar : pulp.LpVariable
        The `pulp.LpVariable` to inverse transform.
    scaler : MinMaxScaler
        The scaler used to transform the `LpVariable`.

    Returns
    -------
    pulp.LpAffineExpression
        Inverse-transformed `LpVariable`.

    Notes
    -----
    Variables on the optimization model are normalized to the range [0, 1]
    using `sklearn.preprocessing.MinMaxScaler`. Although this normalization
    is necessary due to the architecture of the optimization problem, some
    constraints require the comparison of variables that can only occur if
    they're on their original scale.

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.preprocessing import MinMaxScaler
    >>> data = np.array([1, 20, 80, 85, 55, 100])
    >>> # Fit a scaler to the example data defined above.
    >>> scaler = MinMaxScaler().fit(data.reshape(-1, 1))
    >>> scaled_data = scaler.transform(data.reshape(-1, 1)).reshape(-1)
    >>> # Show what the scaled data looks like. It should contain values
    >>> # that range from 0 to 1.
    >>> print(scaled_data)
    [0.         0.19191919 0.7979798  0.84848485 0.54545455 1.        ]
    >>> # Rescale the last scaled value back to its original value:
    >>> print(inverse_transform_lpvar(scaled_data[-1], scaler))
    99.99999999999999
    """
    # NOTE: the inverse transform is currently bypassed; the variable is
    # returned unchanged.
    # return (lpvar - scaler.min_[0]) * (1 / scaler.scale_[0])
    return lpvar
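
# Hedged sketch (the ``_demo_*`` helper below is not part of the original
# module): the commented-out formula above yields a
# ``pulp.LpAffineExpression`` because subtracting a constant from an
# ``LpVariable`` and multiplying by a scalar stay within pulp's affine
# algebra, so the rescaled variable can still be used inside LP constraints.
def _demo_lpvar_inverse_transform() -> None:
    import numpy as np
    import pulp
    from sklearn.preprocessing import MinMaxScaler

    data = np.array([[1.0], [20.0], [80.0], [85.0], [55.0], [100.0]])
    scaler = MinMaxScaler().fit(data)
    x = pulp.LpVariable("x", lowBound=0, upBound=1)
    # (x - min_) * (1 / scale_) maps the [0, 1] variable back to [1, 100].
    expr = (x - scaler.min_[0]) * (1 / scaler.scale_[0])
    assert isinstance(expr, pulp.LpAffineExpression)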
def get_original_tag_name(otm_tag_name: str) -> str:
    """Get the original tag name from an OTM tag name.

    Parameters
    ----------
    otm_tag_name : str
        OTM tag name.

    Returns
    -------
    str
        Original tag name.

    Examples
    --------
    >>> get_original_tag_name("TEMP1_I@08QU_QU_855I_GQ16")
    'TEMP1_I@08QU-QU-855I-GQ16'
    >>> get_original_tag_name("cfix")
    'QUIM_CFIX_PP_L@08PR'
    >>> get_original_tag_name('equalPQmult24div768divFUNC')
    '=PQ*24/768/FUNC'
    >>> get_original_tag_name('equal192divVELO')
    '=192/VELO'
    >>> get_original_tag_name('GRAN_OCS_16-18@08PE-BD-840I-01')
    'GRAN_OCS_16-18@08PE-BD-840I-01'
    >>> get_original_tag_name("POT_TOTAL_VENT___US8")
    'POT TOTAL VENT - US8'
    """
    prefix, *suffix = (
        otm_tag_name.replace("cfix", "QUIM_CFIX_PP_L@08PR")
        .replace("gas", "VAZA3_I@08QU-ST-855I-01")
        .replace("NUM_FILTR_FUNC___US8", "NUM FILTR FUNC - US8")
        .replace("GRAN_OCS_16_18", "GRAN_OCS_16-18")
        .replace("GRAN_OCS_12_16", "GRAN_OCS_12-16")
        .replace("GRAN_OCS_10_12", "GRAN_OCS_10-12")
        .replace("GRAN_OCS_8_10", "GRAN_OCS_8-10")
        .replace("GRAN_OCS_5_8", "GRAN_OCS_5-8")
        .replace("POT_TOTAL_VENT___US8", "POT TOTAL VENT - US8")
        .replace("MAIOR___MENOR_ALT_CAMADA", "MAIOR - MENOR ALT CAMADA")
        .replace("PV_TEMP_GQ3_16_MED___US8", "PV TEMP GQ3-16-MED - US8")
        .replace("GANHO_PRENSA___US8", "GANHO PRENSA - US8")
        .replace("CONS_ESPEC_EE_VENT___US8", "CONS ESPEC EE VENT - US8")
        .replace("DESV_MEDIO_ALT_CAMADA", "DESV MEDIO ALT CAMADA")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_01", "GRAN_OCS_10-16@08PE-BD-840I-01")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_02", "GRAN_OCS_10-16@08PE-BD-840I-02")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_03", "GRAN_OCS_10-16@08PE-BD-840I-03")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_04", "GRAN_OCS_10-16@08PE-BD-840I-04")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_05", "GRAN_OCS_10-16@08PE-BD-840I-05")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_06", "GRAN_OCS_10-16@08PE-BD-840I-06")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_07", "GRAN_OCS_10-16@08PE-BD-840I-07")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_08", "GRAN_OCS_10-16@08PE-BD-840I-08")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_09", "GRAN_OCS_10-16@08PE-BD-840I-09")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_10", "GRAN_OCS_10-16@08PE-BD-840I-10")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_11", "GRAN_OCS_10-16@08PE-BD-840I-11")
        .replace("GRAN_OCS_10_16@08PE_BD_840I_12", "GRAN_OCS_10-16@08PE-BD-840I-12")
        .replace("GRAN_PR", "GRAN_-0,045_PR_L@08FI")
        .replace("SE_PP", "SUP_SE_PP_L@08PR")
        .replace("SE_PR", "SUP_SE_PR_L@08FI")
        .replace(
            "__DIF_PRODUTIVI_EFETIVA___VIRTUAL___CALC___US8",
            "10 - DIF PRODUTIVI EFETIVA - VIRTUAL - CALC - US8",
        )
        .replace("bomba_de_retorno_tanque", "bomba de retorno tanque")
        .replace("media_GRAN_10_12", "media GRAN 10-12")
        .replace("media_GRAN_16_18", "media GRAN 16-18")
        .replace("media_GRAN_16_", "media GRAN 16+")
        .replace("media_GRAN__5_8", "media GRAN _5-8")
        .replace("media_de_densidade", "media de densidade")
        .replace("media_press_1", "media press 1")
        .replace("media_press_2", "media press 2")
        .replace("media_press_3", "media press 3")
        .replace("media_press_4", "media press 4")
        .replace("media_temp_1", "media temp 1")
        .replace("media_temp_2", "media temp 2")
        .replace("media_temp_3", "media temp 3")
        .replace("media_temp_4", "media temp 4")
        .replace("media_tm", "media tm")
        .replace("media_vel_de_disco_de_pel", "media vel de disco de pel")
        .replace("mediana_de_rotacao", "mediana de rotacao")
        .replace("relacao_gran", "rel_gran")
        .replace(
            "soma_balanca_bentonita_misturador", "soma balanca bentonita misturador"
        )
        .replace("soma_balanca_minerio_misturador", "soma balanca minerio misturador")
        .replace("soma_balanca_retorno_correia", "soma balanca retorno correia")
        .replace("div", "/")
        .replace("mult", "*")
        .replace("plus", "+")
        .replace("minus", "-")
        .replace("equal", "=")
        .replace("___", " - ")
        .split("@")
    )
    if len(suffix) == 0:
        return prefix
    return prefix + "@" + "".join(suffix).replace("___", " - ").replace("_", "-")