Source code for wip.datatools.data_checks
"""
Module contains data quality check functions.
Data quality functions:
- `constant_tags_summary`: save a summary of tags with constant values to an Excel file.
- `find_last_non_missing_columns`: for every column in a dictionary of
dataframes, find the first non-null element of each column, in respect to
the last existing index.
The other functions present in this module are intended to be used internally by
the functions listed above.
"""
from __future__ import annotations
import datetime
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple
import pandas as pd
from wip.datatools.io_ops import to_csv, to_excel
from wip.logging_config import logger
from wip.utils import is_running_on_databricks


def get_constant_tags(
df_sql: pd.DataFrame,
datasets: Dict[str, pd.DataFrame],
window: str = "30D",
) -> List[str]:
"""
Return tags with constant values over a rolling window.
For the input `df_sql`, this function identifies columns/tags which have
standard deviation of zero over the specified rolling window.
It then returns those tags which are also present in the provided datasets.
Parameters
----------
df_sql : pd.DataFrame
Input dataframe which contains time series data.
datasets : Dict[str, pd.DataFrame]
Dictionary of dataframes. Each dataframe contains columns which
are potential tags to be checked for constancy in `df_sql`.
window : str, optional
Rolling window specification for which standard deviation is computed,
default is "30D" (30 days).
Returns
-------
List[str]
List of tags that have constant values over the specified rolling
window in `df_sql` and are present in `datasets`.
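
    Examples
    --------
    A minimal, illustrative sketch with synthetic data (column names here
    are hypothetical):

    >>> idx = pd.date_range("2024-01-01", periods=60, freq="D")
    >>> df_sql = pd.DataFrame(
    ...     {"flat": [1.0] * 60, "noisy": list(range(60))}, index=idx
    ... )
    >>> datasets = {"model_a": df_sql[["flat", "noisy"]]}
    >>> get_constant_tags(df_sql, datasets, window="30D")
    ['flat']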
"""
columns = {col for df in datasets.values() for col in df.columns}
    # A tag is considered constant when its rolling standard deviation at the
    # most recent timestamp is exactly zero.
    rolling_std = df_sql.rolling(window=window).std()  # type: ignore
    last_std = rolling_std.iloc[-1].to_frame("std")
    constant_tags = last_std.query("std == 0").index
    return constant_tags.intersection(columns).to_list()


def constant_tags_summary(
df_sql: pd.DataFrame,
datasets: Dict[str, pd.DataFrame],
save_path: str | Path,
max_constant_columns: int | None = 100,
window: str = "30D",
):
"""
Identify and save summary of tags with constant values to an Excel or CSV file.
This function finds the tags in the `df_sql` dataframe which have constant
values over the specified rolling window. It then saves a summary to an
Excel or CSV file, depending on whether the function is being executed
inside Databricks or locally. If the number of constant tags exceeds the
limit specified by the parameter :param:`max_constant_columns`,
a `ValueError` exception is raised and the regression models training
process gets interrupted.
This function is intended to be used as a quality gate to the process.
If there's something wrong with the input data, it's better to interrupt the
execution and inform the error, rather than keep going and generate
results that are completely wrong.
Parameters
----------
df_sql : pd.DataFrame
Input dataframe which contains time series data.
datasets : Dict[str, pd.DataFrame]
Dictionary of dataframes. The last column of each dataframe represents
target columns to be checked for constancy in `df_sql`.
save_path : str | Path
The filepath where the Excel file with the summary of constant tags
will be saved.
max_constant_columns : int | None, default=100
The maximum number of constant columns allowed.
If the number of constant columns exceeds this limit,
a `ValueError` is raised. If set to None, this function only raises an
exception if all columns in `df_sql` have constant or missing values.
window : str, default="30D"
Rolling window specification for which standard deviation is computed,
default is "30D" (30 days).
Raises
------
ValueError
If the number of constant tags exceeds the `max_constant_columns` limit.
Notes
-----
This function uses the `get_constant_tags` function to determine the
tags with constant values. The resulting summary contains the number of
constant values, the percentage of constant values, the number of days the
value remains constant, and a boolean flag indicating if the tag is a target.
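
    Examples
    --------
    A sketch of typical usage as a quality gate (``df_sql`` and ``datasets``
    below stand in for the real inputs, so the call is skipped here):

    >>> constant_tags_summary(  # doctest: +SKIP
    ...     df_sql,
    ...     datasets,
    ...     save_path="outputs/constant_tags",
    ...     max_constant_columns=100,
    ... )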
"""
column_count = df_sql.shape[1]
    # Treat a non-integer ``max_constant_columns`` (e.g., None) as "no
    # explicit limit" by falling back to the total number of columns.
    if not isinstance(max_constant_columns, int):
        max_constant_columns = column_count
errors_dict = create_errors_dict(datasets, df_sql, window)
errors_df = pd.DataFrame(
errors_dict.values(),
index=list(errors_dict.keys()),
columns=[
"Qde. Valores Constantes",
"% Valores Constantes",
"Qde. Dias",
"Target",
],
)
errors_df.index.name = "Tag"
error_count = errors_df.shape[0]
logger.warning(
"There are a total of %s tags with constant values in the input data",
error_count,
)
if not is_running_on_databricks():
save_path = Path(save_path).with_suffix(".xlsx")
save_path.parent.mkdir(exist_ok=True, parents=True)
        logger.warning(
            "Saving tags with constant values over the last '%s' to: '%s'",
            str(window).replace("D", " days"),
            str(save_path),
        )
to_excel(errors_df, save_path)
else:
save_path = Path(save_path).with_suffix(".csv")
        logger.warning(
            "Saving tags with constant values over the last '%s' to: '%s'",
            str(window).replace("D", " days"),
            str(save_path),
        )
to_csv(errors_df, save_path)
if error_count > max_constant_columns:
        raise ValueError(
            f"Number of constant tags ({error_count:,}) exceeds the limit of "
            f"{max_constant_columns:,} allowed tags with constant values.\n"
            "You can find a summary of which tags have constant "
            f"values at: {str(save_path)!r}"
        )


def create_errors_dict(
datasets: Dict[str, pd.DataFrame],
df_sql: pd.DataFrame,
window: str = "30D",
) -> Dict[str, List[int | str | bool]]:
"""
    Generate a dictionary summary of tags with constant values.

    This function evaluates the tags in `df_sql` that have constant values
    over the specified rolling window and creates a summary dictionary.
    Each key in the dictionary corresponds to a tag, and its associated value
    is a list containing the number of trailing constant values, that count
    as a percentage of the total number of rows, the number of days the value
    has remained constant, and a boolean indicating whether the tag is a
    target, based on `datasets`.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        Dictionary of dataframes. The last column of each dataframe
        represents a target column to be checked for constancy in `df_sql`.
    df_sql : pd.DataFrame
        Input dataframe containing time series data.
    window : str, default="30D"
        Rolling window specification over which the standard deviation is
        computed.

    Returns
    -------
    Dict[str, List[int | str | bool]]
        Dictionary where keys are tags from `df_sql` and the values are lists
        containing:

        - Number of consecutive rows, counted from the end, that hold the
          tag's last value.
        - That count as a percentage of the total number of rows.
        - Number of days the value has remained constant.
        - Boolean indicating whether the tag is among the target columns of
          `datasets`.

    Notes
    -----
    The function leverages the `get_constant_tags` function to identify the
    tags with constant values.
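
    Examples
    --------
    A minimal, illustrative sketch with synthetic data (names here are
    hypothetical); the last ten rows of ``tag_a`` hold the same value:

    >>> idx = pd.date_range("2024-01-01", periods=40, freq="D")
    >>> df_sql = pd.DataFrame(
    ...     {"tag_a": list(range(30)) + [5.0] * 10}, index=idx
    ... )
    >>> datasets = {"model": df_sql[["tag_a"]]}
    >>> create_errors_dict(datasets, df_sql, window="5D")
    {'tag_a': [10, '25.00%', 9, True]}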
"""
target_columns = {df.columns[-1] for df in datasets.values()}
row_count = df_sql.shape[0]
errors_dict = {}
for tag in get_constant_tags(df_sql, datasets, window):
last_date = df_sql[tag].index[-1]
        # Walk backwards from the end, counting how many consecutive rows
        # hold the same value as the last row.
        index = -1
        while (df_sql[tag].iloc[index] == df_sql[tag].iloc[-1]
               and -1 * index < row_count):
            index -= 1
        # Convert the negative offset into the length of the trailing
        # constant run.
        index = -index - 1
        number_of_days = (last_date - df_sql[tag].index[-index]).days
errors_dict[tag] = [
index,
f"{index / row_count:.2%}",
number_of_days,
tag in target_columns,
]
return errors_dict


def find_last_non_missing_column(
tag_name: str,
df_sql_raw: pd.DataFrame,
) -> Tuple[datetime.datetime, datetime.datetime, pd.Timedelta]:
"""
    Find the date of the last non-missing value in a column of a DataFrame.

    Parameters
    ----------
    tag_name : str
        The name of the column to search for the last non-missing value.
    df_sql_raw : pd.DataFrame
        The pandas DataFrame to search within.

    Returns
    -------
    Tuple[datetime.datetime, datetime.datetime, pd.Timedelta]
        The function returns the following information:

        - The date of the last non-missing value in the specified column.
        - The maximum datetime value in the DataFrame index.
        - The difference between `last_datetime` and `last_non_null_date`.

    Examples
    --------
    >>> idx = pd.date_range("2024-01-01", periods=6, freq="D")
    >>> df_sql_raw = pd.DataFrame(
    ...     {"column_name": [1, 2, None, 4, None, None]}, index=idx
    ... )
    >>> last_non_null_date, last_datetime, date_diff = find_last_non_missing_column(
    ...     "column_name", df_sql_raw
    ... )
    >>> print(last_non_null_date)
    2024-01-04 00:00:00
    >>> print(last_datetime)
    2024-01-06 00:00:00
    >>> print(date_diff)
    2 days 00:00:00
"""
    # Start at the last row and walk backwards until a non-missing value is
    # found.
    position = df_sql_raw.shape[0] - 1
    while pd.isna(df_sql_raw[tag_name].iloc[position]):
        position -= 1
    last_non_null_date = df_sql_raw.index[position]
last_datetime = df_sql_raw.index.max()
date_diff = last_datetime - last_non_null_date
return last_non_null_date, last_datetime, date_diff


def find_last_non_missing_columns(
datasets: Dict[str, pd.DataFrame],
df_sql_raw: pd.DataFrame,
) -> pd.DataFrame:
"""
    Find the last non-missing dates for each column from the `datasets` dataframes.

    The function returns a new DataFrame that contains each tag name, its
    last non-null date, and the difference between the last non-null date
    and the last datetime in the index.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        A dictionary where the keys are dataset names and the values are
        pandas DataFrames. Each DataFrame represents a dataset and contains
        columns with tag names.
    df_sql_raw : pd.DataFrame
        A pandas DataFrame that represents the raw SQL data.
        It contains columns with tag names.

    Returns
    -------
    pd.DataFrame
        A pandas DataFrame that contains the tag name, last non-null date,
        and date difference for each column of `df_sql_raw` that is also
        present in the columns of the DataFrames in `datasets`.
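
    Examples
    --------
    A minimal, illustrative sketch with synthetic data (names here are
    hypothetical):

    >>> idx = pd.date_range("2024-01-01", periods=5, freq="D")
    >>> df_sql_raw = pd.DataFrame(
    ...     {"a": [1, 2, None, None, None], "b": [1, 2, 3, 4, 5]}, index=idx
    ... )
    >>> datasets = {"model": df_sql_raw[["a", "b"]]}
    >>> summary = find_last_non_missing_columns(datasets, df_sql_raw)
    >>> summary.loc[0, "tag"], summary.loc[0, "date_diff"].days
    ('a', 3)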
"""
tags_used = {
tag_name
for df in datasets.values()
for tag_name in df.columns
}
days_missing = []
for tag_name in df_sql_raw.columns.intersection(tags_used):
last_non_null_date, _, date_diff = find_last_non_missing_column(
tag_name, df_sql_raw)
days_missing.append([tag_name, last_non_null_date, date_diff])
days_missing_df = pd.DataFrame(
days_missing, columns=["tag", "last_real_date", "date_diff"])
return days_missing_df