Source code for wip.datatools.data_checks

"""
Module contains data quality check functions.

Data quality functions:

- `constant_tags_summary`: save a summary of tags with constant values to an
  Excel or CSV file.
- `find_last_non_missing_columns`: for every column used in a dictionary of
  dataframes, find the last non-null value of that column and its distance
  from the last existing index.

The other functions present in this module are intended to be used internally by
the functions listed above.

"""
from __future__ import annotations

import datetime
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple

import pandas as pd

from wip.datatools.io_ops import to_csv, to_excel
from wip.logging_config import logger
from wip.utils import is_running_on_databricks


def get_constant_tags(
    df_sql: pd.DataFrame,
    datasets: Dict[str, pd.DataFrame],
    window: str = "30D",
) -> List[str]:
    """
    Return tags with constant values over a rolling window.

    For the input `df_sql`, this function identifies columns/tags which have
    standard deviation of zero over the specified rolling window. It then
    returns those tags which are also present in the provided datasets.

    Parameters
    ----------
    df_sql : pd.DataFrame
        Input dataframe which contains time series data.
    datasets : Dict[str, pd.DataFrame]
        Dictionary of dataframes. Each dataframe contains columns which are
        potential tags to be checked for constancy in `df_sql`.
    window : str, optional
        Rolling window specification for which standard deviation is
        computed, default is "30D" (30 days).

    Returns
    -------
    List[str]
        List of tags that have constant values over the specified rolling
        window in `df_sql` and are present in `datasets`.
    """
    columns = {col for df in datasets.values() for col in df.columns}
    return (
        df_sql.rolling(window=window)  # type: ignore
        .std()
        .iloc[-1]
        .to_frame("std")
        .query("std == 0")
        .index.intersection(columns)
        .to_list()
    )
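
# Illustrative usage sketch (not part of the original module): a toy frame
# with one varying and one flat tag, where only the flat one is reported.
# The names "TAG_A", "TAG_B" and "model_1" are invented for the example.
#
#     >>> idx = pd.date_range("2024-01-01", periods=60, freq="D")
#     >>> df_sql = pd.DataFrame(
#     ...     {"TAG_A": range(60), "TAG_B": [1.0] * 60}, index=idx)
#     >>> datasets = {"model_1": df_sql[["TAG_A", "TAG_B"]]}
#     >>> get_constant_tags(df_sql, datasets)
#     ['TAG_B']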

def constant_tags_summary(
    df_sql: pd.DataFrame,
    datasets: Dict[str, pd.DataFrame],
    save_path: str | Path,
    max_constant_columns: int | None = 100,
    window: str = "30D",
):
    """
    Identify and save a summary of tags with constant values to an Excel or CSV file.

    This function finds the tags in the `df_sql` dataframe which have
    constant values over the specified rolling window. It then saves a
    summary to an Excel or CSV file, depending on whether the function is
    being executed inside Databricks or locally.

    If the number of constant tags exceeds the limit specified by the
    parameter `max_constant_columns`, a `ValueError` exception is raised and
    the regression models training process is interrupted.

    This function is intended to be used as a quality gate for the process.
    If there's something wrong with the input data, it's better to interrupt
    the execution and report the error than to keep going and generate
    results that are completely wrong.

    Parameters
    ----------
    df_sql : pd.DataFrame
        Input dataframe which contains time series data.
    datasets : Dict[str, pd.DataFrame]
        Dictionary of dataframes. The last column of each dataframe
        represents a target column to be checked for constancy in `df_sql`.
    save_path : str | Path
        The filepath where the file with the summary of constant tags will
        be saved.
    max_constant_columns : int | None, default=100
        The maximum number of constant columns allowed. If the number of
        constant columns exceeds this limit, a `ValueError` is raised.
        If set to None, the limit defaults to the total number of columns
        in `df_sql`.
    window : str, default="30D"
        Rolling window specification for which standard deviation is
        computed, default is "30D" (30 days).

    Raises
    ------
    ValueError
        If the number of constant tags exceeds the `max_constant_columns`
        limit.

    Notes
    -----
    This function uses the `get_constant_tags` function to determine the
    tags with constant values. The resulting summary contains the number of
    constant values, the percentage of constant values, the number of days
    the value remains constant, and a boolean flag indicating if the tag is
    a target.
    """
    column_count = df_sql.shape[1]
    max_constant_columns = (max_constant_columns if isinstance(
        max_constant_columns, int) else column_count)
    errors_dict = create_errors_dict(datasets, df_sql, window)
    errors_df = pd.DataFrame(
        errors_dict.values(),
        index=list(errors_dict.keys()),
        columns=[
            "Qde. Valores Constantes",
            "% Valores Constantes",
            "Qde. Dias",
            "Target",
        ],
    )
    errors_df.index.name = "Tag"
    error_count = errors_df.shape[0]
    logger.warning(
        "There are a total of %s tags with constant values in the input data",
        error_count,
    )
    if not is_running_on_databricks():
        save_path = Path(save_path).with_suffix(".xlsx")
        save_path.parent.mkdir(exist_ok=True, parents=True)
        logger.warning(
            "Saving tags with constant values for more than '%s' to: '%s'",
            str(window).replace("D", " days"),
            str(save_path),
        )
        to_excel(errors_df, save_path)
    else:
        save_path = Path(save_path).with_suffix(".csv")
        logger.warning(
            "Saving tags with constant values for more than '%s' to: '%s'",
            str(window).replace("D", " days"),
            str(save_path),
        )
        to_csv(errors_df, save_path)
    if error_count > max_constant_columns:
        raise ValueError(
            f"Number of constant tags {error_count:,} exceeds limit of "
            f"{max_constant_columns:,} possible tags with constant values.\n"
            "You can find a summary of which tags have constant "
            f"values at: {str(save_path)!r}"
        )
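
# Quality-gate usage sketch (illustrative; "data/constant_tags" is a
# hypothetical path). Locally the call below writes
# "data/constant_tags.xlsx"; inside Databricks the same call writes
# "data/constant_tags.csv" instead, as decided by is_running_on_databricks().
#
#     >>> constant_tags_summary(
#     ...     df_sql,
#     ...     datasets,
#     ...     save_path="data/constant_tags",
#     ...     max_constant_columns=10,
#     ...     window="30D",
#     ... )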

def create_errors_dict(
    datasets: Dict[str, pd.DataFrame],
    df_sql: pd.DataFrame,
    window: str = "30D",
) -> Dict[str, List[int | str | bool]]:
    """
    Generate a dictionary summary of tags with constant values.

    This function evaluates tags in `df_sql` which have constant values
    over the specified rolling window and creates a summary dictionary.
    Each key in the dictionary corresponds to a tag, and its associated
    value is a list containing the number of constant values, the
    percentage of constant values in relation to the total, the number of
    days the value remains constant, and a boolean indicating if the tag
    is a target based on `datasets`.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary of dataframes. The last column of each dataframe
        represents a target column to be checked for constancy in `df_sql`.
    df_sql : pd.DataFrame
        Input dataframe which contains time series data.
    window : str, default="30D"
        Rolling window specification for which standard deviation is
        computed.

    Returns
    -------
    dict
        Dictionary where keys are tags from `df_sql` and the values are
        lists containing:

        - Number of rows, counted from the end, over which the value is
          constant.
        - That count as a percentage of the total number of rows.
        - Number of days the value has remained constant.
        - Boolean indicating if the tag is in the target columns of
          `datasets`.

    Notes
    -----
    The function leverages the `get_constant_tags` function to identify the
    tags with constant values.
    """
    target_columns = {df.columns[-1] for df in datasets.values()}
    row_count = df_sql.shape[0]
    errors_dict = {}
    for tag in get_constant_tags(df_sql, datasets, window):
        last_date = df_sql[tag].index[-1]
        # Walk backwards from the end of the series while the value stays
        # equal to the last observed value.
        index = -1
        while (df_sql[tag].iloc[index] == df_sql[tag].iloc[-1]
               and -1 * index < row_count):
            index -= 1
        # Convert the negative offset into the count of constant rows.
        index *= -1
        index -= 1
        number_of_days = (last_date - df_sql[tag].index[-1 * index]).days
        errors_dict[tag] = [
            index,
            f"{index / row_count:.2%}",
            number_of_days,
            tag in target_columns,
        ]
    return errors_dict
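
# Worked example of the backward scan above (illustrative): with 10 daily
# rows whose last 5 values are identical, the loop stops at offset -6, so
# the count is 5, the percentage is 5/10, and the day span runs from the
# first to the last of the constant timestamps (4 days).
#
#     >>> idx = pd.date_range("2024-01-01", periods=10, freq="D")
#     >>> df_sql = pd.DataFrame(
#     ...     {"TAG_B": [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]},
#     ...     index=idx, dtype=float)
#     >>> datasets = {"model_1": df_sql[["TAG_B"]]}
#     >>> create_errors_dict(datasets, df_sql, window="5D")
#     {'TAG_B': [5, '50.00%', 4, True]}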

def find_last_non_missing_column(
    tag_name: str,
    df_sql_raw: pd.DataFrame,
) -> Tuple[datetime.datetime, datetime.datetime, pd.Timedelta]:
    """
    Find the last non-missing value in a specific column of a pandas DataFrame.

    Parameters
    ----------
    tag_name : str
        The name of the column to search for the last non-missing value.
    df_sql_raw : pd.DataFrame
        The pandas DataFrame to search within.

    Returns
    -------
    Tuple[datetime, datetime, pd.Timedelta]
        Function returns the following information:

        - The date of the last non-missing value in the specified column.
        - The maximum datetime value in the DataFrame index.
        - The difference between `last_datetime` and `last_non_null_date`.

    Examples
    --------
    >>> idx = pd.date_range("2024-01-01", periods=6, freq="D")
    >>> df_sql_raw = pd.DataFrame(
    ...     {"column_name": [1, 2, None, 4, None, None]}, index=idx)
    >>> last_non_null_date, last_datetime, date_diff = find_last_non_missing_column(
    ...     "column_name", df_sql_raw
    ... )
    >>> print(last_non_null_date.date())
    2024-01-04
    >>> print(last_datetime.date())
    2024-01-06
    >>> print(date_diff)
    2 days 00:00:00
    """
    nrows = df_sql_raw.shape[0] - 1
    # Walk backwards from the last row until a non-missing value is found.
    while pd.isna(df_sql_raw[tag_name].iloc[nrows]):
        nrows -= 1
    last_non_null_date = df_sql_raw.index[nrows]
    last_datetime = df_sql_raw.index.max()
    date_diff = last_datetime - last_non_null_date
    return last_non_null_date, last_datetime, date_diff

def find_last_non_missing_columns(
    datasets: Dict[str, pd.DataFrame],
    df_sql_raw: pd.DataFrame,
) -> pd.DataFrame:
    """
    Find the last non-missing dates for each column from `datasets` dataframes.

    The function returns a new DataFrame that contains each tag name, last
    non-null date, and the difference between the last non-null date and the
    last datetime for each column.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A dictionary where the keys are dataset names and the values are
        pandas DataFrames. Each DataFrame represents a dataset and contains
        columns with tag names.
    df_sql_raw : pd.DataFrame
        A pandas DataFrame that represents the raw SQL data. It contains
        columns with tag names.

    Returns
    -------
    pd.DataFrame
        A pandas DataFrame that contains the tag name, last non-null date,
        and date difference for each tag name in `df_sql_raw` that is also
        present in the columns of the DataFrames in `datasets`.
    """
    tags_used = {
        tag_name for df in datasets.values() for tag_name in df.columns
    }
    days_missing = []
    for tag_name in df_sql_raw.columns.intersection(tags_used):
        last_non_null_date, _, date_diff = find_last_non_missing_column(
            tag_name, df_sql_raw)
        days_missing.append([tag_name, last_non_null_date, date_diff])
    days_missing_df = pd.DataFrame(
        days_missing, columns=["tag", "last_real_date", "date_diff"])
    return days_missing_df
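
# Illustrative staleness-report sketch: a tag that stops reporting two days
# before the end of the data. All names are invented for the example.
#
#     >>> idx = pd.date_range("2024-01-01", periods=6, freq="D")
#     >>> df_sql_raw = pd.DataFrame(
#     ...     {"TAG_A": [1, 2, None, 4, None, None]}, index=idx)
#     >>> datasets = {"model_1": df_sql_raw[["TAG_A"]]}
#     >>> report = find_last_non_missing_columns(datasets, df_sql_raw)
#     >>> report.loc[0, "tag"], str(report.loc[0, "date_diff"])
#     ('TAG_A', '2 days 00:00:00')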