"""
The `outlier_removal` module provides functions for removing outliers from data.
Functions:
- `identify_and_replace_outliers`: Identify and replace outliers in a DataFrame's
numeric columns.
- `convert_columns_to_numeric`: Convert dataframe columns to numeric values.
- `knn_impute`: Impute missing values in a DataFrame using KNN imputation.
- `winsorize_outliers_except_last`: Winsorize outliers in all columns except
the last column.
- `cleaner`: Handle outliers and perform KNN imputation.
Notes
-----
The main function of this module is the `cleaner` function.
The other functions act as auxiliary functions that are used inside the
`cleaner` function.
"""
from __future__ import annotations
from typing import Dict, List
import pandas as pd
from scipy.stats import mstats
from sklearn.impute import KNNImputer
from wip.logging_config import logger
def identify_and_replace_outliers(
    df: pd.DataFrame,
    columns: List[str] | None = None,
    exclude_columns: List[str] | None = None,
    threshold: float = 1.5,
    q1: float = 0.25,
    q2: float = 0.75,
) -> pd.DataFrame:
    """
    Identify and replace outliers in a DataFrame's numeric columns.

    For each selected column the quantiles `q1` and `q2` are computed and
    their difference (the IQR) is scaled by `threshold`. Values below
    ``Q1 - IQR * threshold`` or above ``Q3 + IQR * threshold`` are clipped
    to the corresponding bound. The input DataFrame is left untouched; a
    modified copy is returned.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the data to process.
    columns : List[str] | None, default=None
        Column names to include in the outlier treatment. A single column
        name or any iterable of names is also accepted. When ``None``, every
        ``float64``/``int64`` column of `df` is used.
    exclude_columns : List[str] | None, default=None
        Column names to leave out of the outlier treatment. Applied after
        `columns` has been resolved.
    threshold : float, default=1.5
        The multiplier for IQR to define the cut-off beyond which values are
        considered outliers.
    q1 : float, default=0.25
        The lower quantile used to calculate the IQR.
    q2 : float, default=0.75
        The upper quantile used to calculate the IQR.

    Returns
    -------
    pd.DataFrame
        A copy of `df`, with the same column order, where outliers in the
        selected columns are replaced by the nearest acceptable bound.

    Raises
    ------
    ValueError
        If either `q1` or `q2` is not a numeric value between 0 and 1.
        If the value of `q1` is greater than the value of `q2`.
    KeyError
        If `columns` names a column that does not exist in `df`.

    Examples
    --------
    >>> import pandas as pd
    >>> data = {'value': [1, 2, 3, 4, 5, 100]}
    >>> df = pd.DataFrame(data)
    >>> cleaned_df = identify_and_replace_outliers(df)
    >>> print(cleaned_df)
       value
    0    1.0
    1    2.0
    2    3.0
    3    4.0
    4    5.0
    5    8.5

    In the above example, the last value was replaced from 100 to 8.5
    (Q1=2.25, Q3=4.75, IQR=2.5, upper bound = 4.75 + 1.5 * 2.5).
    """
    if not all(
        isinstance(value, (float, int)) and 0 <= value <= 1 for value in (q1, q2)
    ):
        raise ValueError(
            "Value for 'q1' and 'q2' must be a numeric value between 0 and 1. "
            f"Got q1={q1}, q2={q2}."
        )
    if q1 > q2:
        raise ValueError(
            "Value for 'q1' must be less than or equal to 'q2'. "
            f"Got q1={q1} > q2={q2}."
        )

    if columns is None:
        target_columns = list(
            df.select_dtypes(include=['float64', 'int64']).columns
        )
    else:
        # Honour any explicit selection (tuple, Index, single name), not only
        # a ``list``; otherwise the request would be silently ignored.
        target_columns = [columns] if isinstance(columns, str) else list(columns)
        if not all(col in df.columns for col in target_columns):
            raise KeyError(
                "Some of the specified columns do not exist inside 'df'."
            )

    if exclude_columns is not None:
        excluded = (
            {exclude_columns}
            if isinstance(exclude_columns, str)
            else set(exclude_columns)
        )
        # List comprehension (instead of a set difference) keeps column order.
        target_columns = [col for col in target_columns if col not in excluded]

    # Work on a copy so the caller's DataFrame is never modified in place.
    result = df.copy()
    for column in target_columns:
        lower_quantile = result[column].quantile(q1)
        upper_quantile = result[column].quantile(q2)
        iqr = upper_quantile - lower_quantile
        result[column] = result[column].clip(
            lower=lower_quantile - iqr * threshold,
            upper=upper_quantile + iqr * threshold,
        )
    # Assigning to existing columns does not reorder them, so `result`
    # already has the original column order.
    return result
def convert_columns_to_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert all columns in the DataFrame to numeric, coercing when necessary.

    Values that cannot be parsed as numbers become NaN. A column that ends up
    entirely NaN is filled with 0 and a warning is logged; NaNs in columns
    that still hold at least one valid value are left untouched (for example
    so that `knn_impute` can impute them afterwards).

    The conversion is applied to `df` itself, and that same object is
    returned.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to convert. It is modified in place.

    Returns
    -------
    pd.DataFrame
        The same DataFrame, with all columns converted to numeric types.
    """
    for col in df.columns:
        converted = pd.to_numeric(df[col], errors='coerce')
        is_empty = converted.isnull().all()
        # Assign the filled Series back explicitly. The previous
        # ``df[col].fillna(0, inplace=True)`` acted on a temporary object and
        # does not update `df` when pandas Copy-on-Write is enabled.
        df[col] = converted.fillna(0) if is_empty else converted
        if is_empty:
            logger.warning("Column '%s' is empty. Filling it with 0.", col)
    return df
def knn_impute(df: pd.DataFrame, n_neighbors: int = 30) -> pd.DataFrame:
    """Impute missing values in a DataFrame using KNN imputation.

    Every column is first coerced to numeric through
    `convert_columns_to_numeric`; the remaining NaNs are then filled by a
    `KNNImputer` fitted on the whole frame, and the imputed values are
    written back over all columns.

    Parameters
    ----------
    df : pd.DataFrame
        The `pandas.DataFrame` to impute.
    n_neighbors : int, default=30
        The number of neighboring samples to use for imputation.

    Returns
    -------
    pd.DataFrame
        The DataFrame returned by `convert_columns_to_numeric`, with its
        missing values imputed.
    """
    numeric_df = convert_columns_to_numeric(df)
    all_columns = numeric_df.columns
    # Fit and transform in one step; the result is a plain array with one
    # column per entry of `all_columns`.
    filled_values = KNNImputer(n_neighbors=n_neighbors).fit_transform(
        numeric_df[all_columns]
    )
    numeric_df[all_columns] = filled_values
    return numeric_df
def winsorize_outliers_except_last(
    df: pd.DataFrame, threshold: float = 0.05
) -> pd.DataFrame:
    """
    Winsorize outliers in all columns except the last, excluding 'floticor' and 'status'.

    Winsorization is computed over the observed (non-missing) values of each
    column only. Missing values are kept as missing, so they neither distort
    the tails nor get overwritten by the upper limit. The input DataFrame is
    left untouched; a modified copy is returned.

    Parameters
    ----------
    df : pd.DataFrame
        The `pandas.DataFrame` to process.
    threshold : float, default=0.05
        A fraction of the data to winsorize on both tails.

    Returns
    -------
    pd.DataFrame
        A copy of `df`, with the same column order, where the treated
        columns are winsorized.
    """
    untouched_columns = ('floticor', 'status')
    limits = [threshold, threshold]

    # Work on a copy so the caller's DataFrame is never modified in place.
    result = df.copy()
    for column in result.columns[:-1]:
        if column in untouched_columns:
            continue
        observed = result[column].notna()
        if not observed.any():
            # Nothing to winsorize in an entirely missing column.
            continue
        # `mstats.winsorize` sorts NaNs into the upper tail under its default
        # nan_policy, so only the observed values are handed to it.
        winsorized = mstats.winsorize(
            result.loc[observed, column].to_numpy(), limits=limits
        )
        # `.data` unwraps the masked array returned by scipy.
        result.loc[observed, column] = winsorized.data
    return result
def cleaner(
    datasets: Dict[str, pd.DataFrame],
    threshold: float = 1.5,
    threshold_winsorize: float = 0.05,
    threshold_remove: float = 1.5,
    n_neighbors: int = 30,
) -> Dict[str, pd.DataFrame]:
    """Handle outliers and perform KNN imputation.

    Each non-empty dataset goes through one of two outlier treatments and is
    then passed to `knn_impute`:

    * datasets whose key is in the special-treatment list get IQR clipping
      on all their numeric columns (`identify_and_replace_outliers`);
    * every other dataset is winsorized on all columns but the last
      (`winsorize_outliers_except_last`) and then IQR-clipped on its last
      column only, unless that column is 'floticor' or 'status'.

    Empty datasets are logged and returned unchanged.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary of dataset names to DataFrames.
    threshold : float, default=1.5
        IQR multiplier for outlier identification in the special-treatment
        datasets.
    threshold_winsorize : float, default=0.05
        Data fraction for winsorization at both tails.
    threshold_remove : float, default=1.5
        IQR multiplier applied to the last column of the winsorized datasets.
    n_neighbors : int, default=30
        Number of neighbors for KNN imputation.

    Returns
    -------
    Dict[str, pd.DataFrame]
        Dictionary of cleaned DataFrames, keyed like `datasets`.
    """
    # Dataset keys that receive IQR clipping only (no winsorization).
    iqr_only_keys = {
        "SE PP",
        "basicidade",
        "energia_forno",
        "energia_moinho",
        "finos",
        "produtividade_filtragem",
        "relacao_gran",
        "umidade",
    }
    iqr_only_keys.update(f"rota_disco_{idx}" for idx in range(1, 13))

    results = {}
    for name, frame in datasets.items():
        if frame.empty:
            logger.error("Dataset %s is empty. Skipping it.", name)
            results[name] = frame
            continue

        if name in iqr_only_keys:
            treated = identify_and_replace_outliers(frame, threshold=threshold)
        else:
            winsorized = winsorize_outliers_except_last(frame, threshold_winsorize)
            last_column = winsorized.columns[-1]
            treated = identify_and_replace_outliers(
                winsorized,
                columns=[last_column],
                exclude_columns=['floticor', 'status'],
                threshold=threshold_remove,
            )

        results[name] = knn_impute(treated, n_neighbors)
    return results