""" Operations applied over the data """
from __future__ import annotations
import os
from io import BufferedReader
from typing import Dict
from typing import List
from typing import Tuple
import chardet
import numpy as np
import pandas as pd
import pulp
from sklearn.preprocessing import MinMaxScaler
from wip.constants import constants
from wip.datatools.io_ops import read_json
from wip.datatools.io_ops import read_text
from wip.datatools.io_ops import to_text
from wip.logging_config import logger
def normalize_feature(
    scalers: Dict[str, MinMaxScaler], feature: str, norm_value: float
) -> float:
"""
Normalize a given feature value based on scalers.
This function takes a dictionary of scalers, a feature key, and a value
to normalize. It returns the normalized value based on the given scalers
for the specified feature.
.. attention::
This function assumes that the given scalers are
`sklearn.preprocessing.MinMaxScaler` objects.
It tries to access the attributes `data_range_` and `data_min_`
of the scaler object, which exist only on
`MinMaxScaler` instances.
Parameters
----------
scalers : Dict[str, MinMaxScaler]
Dictionary containing scaler objects, where the keys are feature names
and the values are scaler objects with `data_min_`
and `data_range_` attributes.
feature : str
The key corresponding to the feature to be normalized in
the `scalers` dictionary.
norm_value : float
The value of the specified feature to normalize.
Returns
-------
float
The normalized value of `norm_value` for the given feature using
the provided scalers. The scaled value represents a value
between 0 and 1.
Examples
--------
.. note::
    Normalization is currently disabled in the function body (the
    formula is commented out), so the value is returned unchanged.
    The example below illustrates the intended behavior of the
    original implementation.
>>> from sklearn.preprocessing import MinMaxScaler
>>> import numpy as np
>>> data = np.array([[1, 2], [3, 4], [5, 6]])
>>> scaler = MinMaxScaler().fit(data)
>>> scalers = {"feature1": scaler}
>>> normalize_feature(scalers, "feature1", 4)
0.75
Notes
-----
This function applies the min-max scaling formula manually.
Applying the normalization formula manually allows for single value
normalization, without having to implement logic that transforms
the single value into a numpy array, and then back into a single value.
The min-max scaling formula is as follows:
.. math::
X_{norm} = \\frac{X - X_{min}}{X_{max} - X_{min}}
"""
# Min-max scaling is currently disabled; the value is returned
# unchanged. The original implementation:
# normalize_value = norm_value - scalers[feature].data_min_[0]
# return normalize_value / scalers[feature].data_range_[0]
return norm_value
def unnormalize_feature(
scalers: Dict[str, MinMaxScaler],
feature: str,
norm_value: float,
operation: str = "two feature",
) -> float | Tuple[float, float]:
"""
Unnormalize a given feature value based on scalers.
This function supports two modes of rescaling the data:
- `two feature`: use this mode to rescale the coefficients and
  intercept of a linear regression model. It converts the normalized
  coefficients and intercept so they can be used with the original
  (unscaled) data, without having to normalize it first. The
  `first_value` and `second_value` returned represent the unscaled
  coefficient and the intercept correction, respectively. The
  `second_value` must be subtracted from the normalized intercept
  to obtain its unscaled value.
- `one feature`: use this mode to rescale a single feature value
  that was normalized using min-max scaling.
Parameters
----------
scalers : Dict[str, MinMaxScaler]
Dictionary containing scaler objects, where the keys represent the
feature names, and the values are their fitted
`sklearn.preprocessing.MinMaxScaler` objects.
feature : str
The key corresponding to the feature to be unnormalized. This feature must be
present in the `scalers` dictionary.
norm_value : float
The normalized value of the specified feature to rescale back to the
original range.
operation : str {"two feature", "one feature"}, default="two feature"
The operation to perform. If `operation` is "two feature", then the
`norm_value` is assumed to be the coefficient of a linear regression model,
related to the specified `feature`. If `operation` is "one feature", then
the `norm_value` is assumed to be a single value normalized using
min-max scaling.
Returns
-------
float | Tuple[float, float]
If `operation` is "two feature", then a tuple containing the unscaled
coefficient and intercept of the linear regression model is returned.
If `operation` is "one feature", then the unscaled value of the specified
feature is returned.
Examples
--------
.. note::
    The rescaling formulas are currently commented out in the function
    body, so the values pass through unchanged. The examples below
    illustrate the intended behavior of the original implementation.
The example below shows how to use this function to rescale the
coefficients and intercept of a linear regression model.
>>> import numpy as np
>>> from sklearn.preprocessing import MinMaxScaler
>>> from sklearn.linear_model import Ridge
>>> X = np.array([[100, 400], [200, 300], [300, 200], [400, 100]])
>>> y = np.array([1, 2, 3, 4])
>>> scalers = {idx: MinMaxScaler().fit(X[:, idx].reshape(-1, 1)) for idx in range(X.shape[1])}
>>> X_norm = np.array(
... [scalers[idx].transform(X[:, idx].reshape(-1, 1)).reshape(-1)
... for idx in range(X.shape[1])]
... ).T
>>> print(X_norm)
[[0. 1. ]
[0.33333333 0.66666667]
[0.66666667 0.33333333]
[1. 0. ]]
>>> model = Ridge().fit(X_norm, y)
>>> print(X_norm[0].dot(model.coef_) + model.intercept_) # noqa
1.710526315789474
>>> # Same as model.predict(X_norm[0].reshape(1, -1))
>>> intercept = model.intercept_ # noqa
>>> coeff_unscaled = []
>>> for idx, coeff in enumerate(model.coef_):  # noqa
...     coeff, intercept_unscaled = unnormalize_feature(scalers, idx, coeff)
...     intercept -= intercept_unscaled
...     coeff_unscaled.append(coeff)
>>> print(X[0].dot(np.array(coeff_unscaled)) + intercept)
1.710526315789474
The next example demonstrates the use of this method to rescale a single
feature value that was normalized using min-max scaling:
>>> print(unnormalize_feature(scalers, 0, np.array([[X_norm[0, 0]]]), "one feature"))
array([[100.]])
Notes
-----
The min-max scaling formula is as follows:
.. math::
X_{norm} = \\frac{X - X_{min}}{X_{max} - X_{min}}
The unscaled value for `X` can be obtained by rearranging the above formula:
.. math::
X = X_{norm} \\times (X_{max} - X_{min}) + X_{min}
See Also
--------
solver_ops.solver_operations.write_descriptive_contraints
Function that uses this method to rescale the coefficients and intercepts
before using them to define an LP optimization model.
"""
# Rescaling is currently disabled; the value(s) pass through
# unchanged. The original implementation:
# scale_var = scalers[feature]
# if operation == "two feature":
#     first_value = norm_value / (scale_var.data_max_[0]
#                                 - scale_var.data_min_[0])
#     second_value = scale_var.data_min_[0] * first_value
#     return first_value, second_value
# return norm_value * scale_var.data_range_[0] + scale_var.data_min_[0]
if operation == "two feature":
    return norm_value, 0
return norm_value
def string_in_list(string: str, list_strings: List[str]) -> bool:
"""Check if a string starts with any element in a list of strings.
This function iterates through the list of strings and checks if the given string
starts with any of the elements in the list. If there's a match, it returns `True`.
Otherwise, it returns `False`. If an empty string is provided, it returns `False`.
Parameters
----------
string : str
The string to be checked for starting substrings.
list_strings : List[str]
The list of strings that'll be checked as starting substrings of
the input string.
Returns
-------
bool
`True` if the input string starts with any element in the list,
`False` otherwise.
Examples
--------
>>> string_in_list("hello", ["hi", "hell"])
True
>>> string_in_list("world", ["wor", "earth"])
True
>>> string_in_list("", ["hi", "hello"])
False
>>> string_in_list("goodbye", ["hi", "hello"])
False
"""
if not string:
return False
for value in list_strings:
if string.startswith(value):
return True
return False
def read_json_dls(file_path, file_name):
"""Read a json file from ADLS.
Parameters
----------
file_path : str
The path to the json file.
file_name : str
The name of the json file.
Returns
-------
Dict[str, Any]
The data within the json file.
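Examples
--------
Illustrative only; the directory and file name below are hypothetical:
>>> data = read_json_dls("/data/configs", "model_params.json")  # doctest: +SKIP
>>> isinstance(data, dict)  # doctest: +SKIP
True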
"""
# return load_adls_json(file_path, file_name)
return read_json(os.path.join(file_path, file_name))
def scaling_target_values(
    feature: str, scalers: Dict[str, MinMaxScaler], lmin: float, lmax: float
) -> Tuple[float, float, str]:
"""
Scale the lower and upper bounds of a target variable using their scaler.
Parameters
----------
feature : str
Target feature name.
scalers : Dict[str, MinMaxScaler]
Dictionary containing scaler objects, where the keys are feature names
and the values are their fitted `sklearn.preprocessing.MinMaxScaler`
objects.
lmin : float
The lower bound of the target feature.
lmax : float
The upper bound of the target feature.
Returns
-------
Tuple[float, float, str]
The scaled lower and upper bounds of the target feature, along with
the feature name.
Examples
--------
.. note::
    Rescaling is currently disabled in the function body (the formula
    is commented out), so the bounds pass through unchanged. The
    example below illustrates the intended behavior of the original
    implementation.
>>> from sklearn.preprocessing import MinMaxScaler
>>> import numpy as np
>>> data = np.array([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
>>> scaler = MinMaxScaler().fit(data.reshape(-1, 1))
>>> feature = "QUIM_CFIX_PP_L@08PR"
>>> scalers = {feature: scaler}
>>> lmin = 0.2
>>> lmax = 0.8
>>> result = scaling_target_values(feature, scalers, lmin, lmax)
>>> print(result)
(20.0, 80.0, 'cfix')
"""
# Rescaling is currently disabled. The original implementation:
# lmin = lmin * scalers[feature].data_range_[0] + scalers[feature].data_min_[0]
# lmax = lmax * scalers[feature].data_range_[0] + scalers[feature].data_min_[0]
if lmin > lmax:
logger.error(
"Feature %s has lmin > lmax: %s > %s. "
"Switching limits with one another. "
"Please check the data and problem "
"formulation and use output with caution.",
feature,
lmin,
lmax,
)
lmin, lmax = lmax, lmin
return lmin, lmax, constants.TARGETS_IN_MODEL[feature]
def replace_string_from_file(solver_path, range_min=None, range_max=None):
    """Replace "." with "," in the file.
Parameters
----------
solver_path : str
The path to the solver file.
range_min : int, optional
The minimum range value, by default None
range_max : int, optional
The maximum range value, by default None
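Examples
--------
Illustrative only; the paths below are hypothetical:
>>> # Rewrite a single solver file in place:
>>> replace_string_from_file("outputs/restricoes.txt")  # doctest: +SKIP
>>> # Rewrite "outputs/restricoes-faixa-100-200.txt":
>>> replace_string_from_file("outputs", 100, 200)  # doctest: +SKIP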
"""
file = solver_path
if range_min is not None and range_max is not None:
file = os.path.join(
solver_path, f"restricoes-faixa-{range_min}-{range_max}.txt"
)
data = read_text(file, encoding="ISO-8859-1").replace(".", ",")
# data = read_text_file(file)
# with open(file, "w", encoding="utf-8") as constraint_file:
# constraint_file.write(data)
to_text(data, file)
def define_real_scalers(
    datasets: Dict[str, pd.DataFrame],
) -> Tuple[Dict[str, MinMaxScaler], np.ndarray]:
"""Define a scaler for each variable from the `datasets`.
Parameters
----------
datasets : Dict[str, pd.DataFrame]
    The dictionary of datasets. Each key/value pair represents a model
    and its corresponding dataset.
Returns
-------
Dict[str, MinMaxScaler]
The dictionary of scalers, with the new scalers added.
np.ndarray
    The array of values used to fit the last fitted scaler.
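Examples
--------
A minimal, illustrative example with a single made-up dataset
(the dataset and tag names are hypothetical):
>>> import pandas as pd
>>> datasets = {"model_a": pd.DataFrame({"TAG_A": [0.0, 5.0, 10.0]})}
>>> scalers, _ = define_real_scalers(datasets)
>>> float(scalers["TAG_A"].data_min_[0]), float(scalers["TAG_A"].data_range_[0])
(0.0, 10.0)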
"""
# from wip.temporary import FakeScaler
all_columns = [
    column
    for name in datasets
    for column in datasets[name].columns
    if "qtde" not in column
    and "NIVE1" not in column
    and "SOMA_FUNC" not in column
    and "SOMA FUNC" not in column
]
scalers = {}
for column in set(all_columns):
if column.startswith("qtde") or column.startswith("SOMA FUNC"):
continue
all_df = list(filter(lambda x: column in datasets[x].columns, datasets))
scalers[column] = MinMaxScaler()
if len(all_df) == 1:  # the tag appears in a single dataset
scalers[column].fit(datasets[all_df[0]][[column]])
new_values = datasets[all_df[0]][column].values
else:
values = list(map(lambda x: list(datasets[x][column].values), all_df))
values = sum(values, [])
new_values = []
for value in values:
if isinstance(value, (np.int64, np.float64)):
new_values.append(value)
else:
new_values.extend(value)
if "Consumo de Energia (base minério úmido) kWh/ton" in column:
new_values.append(0)
new_values = np.array(new_values).reshape(-1, 1)
scalers[column].fit(new_values)
return scalers, new_values
def detect_file_encoding(filename: str | BufferedReader) -> str:
"""Detect the character encoding of a file or `BufferedReader`.
This function uses the `chardet` library to determine the character encoding of the
input file or `BufferedReader` object. The input can be a file path (as a string) or a
`BufferedReader` object.
Parameters
----------
filename : str | BufferedReader
File path (as a string) or a `BufferedReader` object for which the character
encoding needs to be detected.
Returns
-------
str
The detected character encoding of the input file or `BufferedReader`.
Examples
--------
>>> file_path = "example.txt"
>>> encoding = detect_file_encoding(file_path)
>>> print(encoding)
utf-8
Notes
-----
The `chardet` library evaluates the file content and returns the detected
encoding along with a confidence score, which expresses how certain the
detection is. The encoding with the highest confidence is returned.
"""
if isinstance(filename, BufferedReader):
result = chardet.detect(filename.read())
return result['encoding']
with open(filename, 'rb') as rawdata:
result = chardet.detect(rawdata.read())
return result['encoding']
def fit_scalers_to_tag(tag_values: pd.Series) -> MinMaxScaler:
"""Fit a `MinMaxScaler` to a tag's values.
Parameters
----------
tag_values : pd.Series
The `pandas.Series` containing the values of a tag.
Returns
-------
MinMaxScaler
    The fitted scaler.
Notes
-----
When fitting a `MinMaxScaler` to an array-like object that contains a single
column, the array-like object needs to be reshaped to a 2D array prior to
fitting the scaler with it. This function's main purpose is to provide a
simple shortcut to do so, without having to manually reshape the array-like
object every time.
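Examples
--------
A minimal example:
>>> import pandas as pd
>>> scaler = fit_scalers_to_tag(pd.Series([0.0, 5.0, 10.0]))
>>> float(scaler.data_min_[0]), float(scaler.data_max_[0])
(0.0, 10.0)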
"""
return MinMaxScaler().fit(tag_values.values.reshape(-1, 1))
def get_original_tag_name(otm_tag_name: str) -> str:
"""Get original tag name from OTM tag name.
Parameters
----------
otm_tag_name : str
OTM tag name
Returns
-------
str
Original tag name
Examples
--------
>>> get_original_tag_name("TEMP1_I@08QU_QU_855I_GQ16")
'TEMP1_I@08QU-QU-855I-GQ16'
>>> get_original_tag_name("cfix")
'cfix'
>>> get_original_tag_name('equalPQmult24div768divFUNC')
'=PQ*24/768/FUNC'
>>> get_original_tag_name('equal192divVELO')
'=192/VELO'
>>> get_original_tag_name('GRAN_OCS_16-18@08PE-BD-840I-01')
'GRAN_OCS_16-18@08PE-BD-840I-01'
>>> get_original_tag_name("POT_TOTAL_VENT___US8")
'POT TOTAL VENT - US8'
"""
prefix, *suffix = (
otm_tag_name.replace("cfix", "QUIM_CFIX_PP_L@08PR")
.replace("gas", "VAZA3_I@08QU-ST-855I-01")
.replace("NUM_FILTR_FUNC___US8", "NUM FILTR FUNC - US8")
.replace("GRAN_OCS_16_18", "GRAN_OCS_16-18")
.replace("GRAN_OCS_12_16", "GRAN_OCS_12-16")
.replace("GRAN_OCS_10_12", "GRAN_OCS_10-12")
.replace("GRAN_OCS_8_10", "GRAN_OCS_8-10")
.replace("GRAN_OCS_5_8", "GRAN_OCS_5-8")
.replace("POT_TOTAL_VENT___US8", "POT TOTAL VENT - US8")
.replace("MAIOR___MENOR_ALT_CAMADA", "MAIOR - MENOR ALT CAMADA")
.replace("PV_TEMP_GQ3_16_MED___US8", "PV TEMP GQ3-16-MED - US8")
.replace("GANHO_PRENSA___US8", "GANHO PRENSA - US8")
.replace("CONS_ESPEC_EE_VENT___US8", "CONS ESPEC EE VENT - US8")
.replace("DESV_MEDIO_ALT_CAMADA", "DESV MEDIO ALT CAMADA")
.replace("GRAN_OCS_10_16@08PE_BD_840I_01", "GRAN_OCS_10-16@08PE-BD-840I-01")
.replace("GRAN_OCS_10_16@08PE_BD_840I_02", "GRAN_OCS_10-16@08PE-BD-840I-02")
.replace("GRAN_OCS_10_16@08PE_BD_840I_03", "GRAN_OCS_10-16@08PE-BD-840I-03")
.replace("GRAN_OCS_10_16@08PE_BD_840I_04", "GRAN_OCS_10-16@08PE-BD-840I-04")
.replace("GRAN_OCS_10_16@08PE_BD_840I_05", "GRAN_OCS_10-16@08PE-BD-840I-05")
.replace("GRAN_OCS_10_16@08PE_BD_840I_06", "GRAN_OCS_10-16@08PE-BD-840I-06")
.replace("GRAN_OCS_10_16@08PE_BD_840I_07", "GRAN_OCS_10-16@08PE-BD-840I-07")
.replace("GRAN_OCS_10_16@08PE_BD_840I_08", "GRAN_OCS_10-16@08PE-BD-840I-08")
.replace("GRAN_OCS_10_16@08PE_BD_840I_09", "GRAN_OCS_10-16@08PE-BD-840I-09")
.replace("GRAN_OCS_10_16@08PE_BD_840I_10", "GRAN_OCS_10-16@08PE-BD-840I-10")
.replace("GRAN_OCS_10_16@08PE_BD_840I_11", "GRAN_OCS_10-16@08PE-BD-840I-11")
.replace("GRAN_OCS_10_16@08PE_BD_840I_12", "GRAN_OCS_10-16@08PE-BD-840I-12")
.replace("GRAN_PR", "GRAN_-0,045_PR_L@08FI")
.replace("SE_PP", "SUP_SE_PP_L@08PR")
.replace("SE_PR", "SUP_SE_PR_L@08FI")
.replace(
"__DIF_PRODUTIVI_EFETIVA___VIRTUAL___CALC___US8",
"10 - DIF PRODUTIVI EFETIVA - VIRTUAL - CALC - US8",
)
.replace("bomba_de_retorno_tanque", "bomba de retorno tanque")
.replace("floticor", "floticor")
.replace("media_GRAN_10_12", "media GRAN 10-12")
.replace("media_GRAN_16_18", "media GRAN 16-18")
.replace("media_GRAN_16_", "media GRAN 16+")
.replace("media_GRAN__5_8", "media GRAN _5-8")
.replace("media_de_densidade", "media de densidade")
.replace("media_press_1", "media press 1")
.replace("media_press_2", "media press 2")
.replace("media_press_3", "media press 3")
.replace("media_press_4", "media press 4")
.replace("media_temp_1", "media temp 1")
.replace("media_temp_2", "media temp 2")
.replace("media_temp_3", "media temp 3")
.replace("media_temp_4", "media temp 4")
.replace("media_tm", "media tm")
.replace("media_vel_de_disco_de_pel", "media vel de disco de pel")
.replace("mediana_de_rotacao", "mediana de rotacao")
.replace("relacao_gran", "rel_gran")
.replace(
"soma_balanca_bentonita_misturador", "soma balanca bentonita misturador"
)
.replace("soma_balanca_minerio_misturador", "soma balanca minerio misturador")
.replace("soma_balanca_retorno_correia", "soma balanca retorno correia")
.replace('div', '/')
.replace('mult', '*')
.replace('plus', '+')
.replace('minus', '-')
.replace('equal', '=')
.replace('___', ' - ')
.split('@')
)
if len(suffix) == 0:
    return prefix
return prefix + '@' + (''.join(suffix).replace('___', ' - ').replace('_', '-'))