Source code for wip.datatools.outlier_removal

"""
The `outlier_removal` module provides functions for removing outliers from data.

Functions:

- `identify_and_replace_outliers`: Identify and replace outliers in a DataFrame's
  numeric columns.
- `convert_columns_to_numeric`: Convert dataframe columns to numeric values.
- `knn_impute`: Impute missing values in a DataFrame using KNN imputation.
- `winsorize_outliers_except_last`: Winsorize outliers in all columns except
  the last column.
- `cleaner`: Handle outliers and perform KNN imputation.

Notes
-----
The main function of this module is the `cleaner` function.
The other functions act as auxiliary functions that are used inside the
`cleaner` function.

"""
from __future__ import annotations

from typing import Dict, List

import pandas as pd
from scipy.stats import mstats
from sklearn.impute import KNNImputer

from wip.logging_config import logger


def identify_and_replace_outliers(
    df: pd.DataFrame,
    columns: List[str] | None = None,
    exclude_columns: List[str] | None = None,
    threshold: float = 1.5,
    q1: float = 0.25,
    q2: float = 0.75,
) -> pd.DataFrame:
    """
    Identify and replace outliers in a DataFrame's numeric columns.

    For each selected column, the interquartile range (IQR) is computed from
    the `q1` and `q2` quantiles. Values below ``Q1 - IQR * threshold`` or
    above ``Q3 + IQR * threshold`` are considered outliers and are clipped to
    the corresponding bound.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the data to process. The selected columns
        are overwritten on this object.
    columns : List[str] | None, default=None
        A list of column names to include in the outlier removal process.
        When it is not a list, every ``float64`` and ``int64`` column of
        `df` is used.
    exclude_columns : List[str] | None, default=None
        A list of column names to exclude from the outlier removal process.
    threshold : float, default=1.5
        The multiplier for IQR to define the cut-off beyond which values are
        considered outliers.
    q1 : float, default=0.25
        The lower quantile used to calculate the IQR.
    q2 : float, default=0.75
        The upper quantile used to calculate the IQR.

    Returns
    -------
    pd.DataFrame
        DataFrame, with the original column order, where outliers were
        replaced by the nearest bound of the acceptable range.

    Raises
    ------
    ValueError
        If either `q1` or `q2` is not a numeric value between 0 and 1.
        If the value of `q1` is greater than the value of `q2`.
    KeyError
        If `columns` is a list and any of its names is not a column of `df`.

    Examples
    --------
    >>> import pandas as pd
    >>> data = {'value': [1, 2, 3, 4, 5, 100]}
    >>> df = pd.DataFrame(data)
    >>> cleaned_df = identify_and_replace_outliers(df)
    >>> print(cleaned_df)
       value
    0    1.0
    1    2.0
    2    3.0
    3    4.0
    4    5.0
    5    8.5

    In the above example, the last value from `df` had its value replaced
    from 100 to 8.5
    """
    original_columns_order = df.columns

    if not all(
        isinstance(value, (float, int)) and 0 <= value <= 1
        for value in [q1, q2]
    ):
        raise ValueError(
            "Value for 'q1' and 'q2' must be a numeric value between 0 and 1. "
            f"Got q1={q1}, q2={q2}."
        )
    if q1 > q2:
        # The message must describe the rule that was violated: q1 <= q2.
        raise ValueError(
            "Value for 'q1' must be less than or equal to 'q2'. "
            f"Got q1={q1} > q2={q2}."
        )

    if isinstance(columns, list):
        if not all(col in df.columns for col in columns):
            raise KeyError(
                "Some of the specified columns do not exist inside 'df'."
            )
    else:
        columns = df.select_dtypes(include=['float64', 'int64']).columns

    if isinstance(exclude_columns, list):
        excluded = set(exclude_columns)
        # `dict.fromkeys` drops duplicated names (as the set difference did)
        # while keeping a deterministic processing order.
        columns = [col for col in dict.fromkeys(columns) if col not in excluded]

    for column in columns:
        lower_quantile = df[column].quantile(q1)
        upper_quantile = df[column].quantile(q2)
        iqr = upper_quantile - lower_quantile
        lower_bound = lower_quantile - (iqr * threshold)
        upper_bound = upper_quantile + (iqr * threshold)
        df[column] = df[column].clip(lower=lower_bound, upper=upper_bound)

    return df[original_columns_order]


def convert_columns_to_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert all columns in the DataFrame to numeric, coercing when necessary.

    Values that cannot be converted are set to NaN. A column that ends up
    entirely NaN is filled with 0 and a warning is logged; NaNs in partially
    filled columns are left untouched.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to convert. Its columns are overwritten on this object.

    Returns
    -------
    pd.DataFrame
        The DataFrame with all columns converted to numeric types.
    """
    for col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
        if df[col].isnull().all():
            # Assign the filled column back instead of calling
            # `df[col].fillna(0, inplace=True)`: the in-place call acts on a
            # temporary object and does not update `df` under pandas
            # Copy-on-Write.
            df[col] = df[col].fillna(0)
            logger.warning("Column '%s' is empty. Filling it with 0.", col)
    return df


def knn_impute(df: pd.DataFrame, n_neighbors: int = 30) -> pd.DataFrame:
    """Fill the missing values of a DataFrame with KNN imputation.

    Every column is first converted to numeric values through
    `convert_columns_to_numeric`, then all columns are imputed together.

    Parameters
    ----------
    df : pd.DataFrame
        The `pandas.DataFrame` to impute.
    n_neighbors : int, default=30
        The number of neighboring samples to use for imputation.

    Returns
    -------
    pd.DataFrame
        The DataFrame with its missing values imputed.
    """
    numeric_df = convert_columns_to_numeric(df)
    target_columns = numeric_df.columns

    knn = KNNImputer(n_neighbors=n_neighbors)
    numeric_df[target_columns] = knn.fit_transform(numeric_df[target_columns])
    return numeric_df


def winsorize_outliers_except_last(
    df: pd.DataFrame, threshold: float = 0.05
) -> pd.DataFrame:
    """
    Winsorize every column but the last one, skipping 'floticor' and 'status'.

    The treated columns are overwritten on `df`; the returned DataFrame keeps
    the original column order.

    Parameters
    ----------
    df : pd.DataFrame
        The `pandas.DataFrame` to process.
    threshold : float, default=0.05
        Fraction of the data to winsorize on each of the two tails.

    Returns
    -------
    pd.DataFrame
        The `pandas.DataFrame` with the columns winsorized.
    """
    column_order = df.columns
    untouched = ['floticor', 'status']

    # The last column is never winsorized here.
    for name in list(df.columns[:-1]):
        if name in untouched:
            continue
        df[name] = mstats.winsorize(df[name], limits=[threshold, threshold])

    return df[column_order]


def cleaner(
    datasets: Dict[str, pd.DataFrame],
    threshold: float = 1.5,
    threshold_winsorize: float = 0.05,
    threshold_remove: float = 1.5,
    n_neighbors: int = 30,
) -> Dict[str, pd.DataFrame]:
    """Treat the outliers of each dataset and impute missing values with KNN.

    Datasets whose name is in the special-treatment list have their outliers
    clipped with the IQR rule. Every other dataset is winsorized (all columns
    but the last) and then has its last column clipped with the IQR rule.
    Empty datasets are logged and returned unchanged.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary of dataset names to DataFrames.
    threshold : float, default=1.5
        IQR multiplier for outlier identification.
    threshold_winsorize : float, default=0.05
        Data fraction for winsorization at both tails.
    threshold_remove : float, default=1.5
        IQR multiplier to remove outliers from the last column.
    n_neighbors : int, default=30
        Number of neighbors for KNN imputation.

    Returns
    -------
    Dict[str, pd.DataFrame]
        Dictionary of cleaned DataFrames.
    """
    special_treatment_datasets = [
        f"rota_disco_{idx}" for idx in range(1, 13)
    ] + [
        "SE PP",
        "basicidade",
        "energia_forno",
        "energia_moinho",
        "finos",
        "produtividade_filtragem",
        "relacao_gran",
        "umidade",
    ]

    cleaned_datasets = {}
    for key, df in datasets.items():
        if df.empty:
            logger.error("Dataset %s is empty. Skipping it.", key)
            cleaned_datasets[key] = df
            continue

        if key not in special_treatment_datasets:
            treated = winsorize_outliers_except_last(df, threshold_winsorize)
            treated = identify_and_replace_outliers(
                treated,
                columns=[treated.columns[-1]],
                exclude_columns=['floticor', 'status'],
                threshold=threshold_remove,
            )
        else:
            treated = identify_and_replace_outliers(df, threshold=threshold)

        # NOTE(review): the original indentation was lost in this listing;
        # imputation is assumed to run for both branches - confirm.
        cleaned_datasets[key] = knn_impute(treated, n_neighbors)

    return cleaned_datasets