Source code for iowa_forecast.utils

"""
General utility functions.
"""
from __future__ import annotations

from typing import List, Tuple
import re

import pandas as pd
import fnmatch

import google
from google.cloud import bigquery


[docs]def normalize_item_name(item_name: str) -> str: """ Convert 'item_name' values to lower case and replace spaces with underscores. Parameters ---------- item_name : str Item names to normalize. Returns ------- str Normalized item names. Examples -------- >>> normalize_item_name("TITOS HANDMADE VODKA") 'titos_handmade_vodka' Notes ----- Used to generate names for the different ARIMA models that are created for each unique item name. """ return item_name.lower().replace(' ', '_')
[docs]def split_table_name_info(table_name: str) -> Tuple[str | None, str | None, str]: """ Extract components from a table name. Parameters ---------- table_name : str Table name to extract components from. Returns ------- Tuple[str | None, str | None, str] A tuple containing the project ID, dataset ID and table name if any of these components are in the table name. If one of the components is not contained inside `table_name`, then they are returned as None. Examples -------- >>> split_table_name_info('my_project.my_dataset.my_table') ('my_project', 'my_dataset', 'my_table') >>> split_table_name_info('my_dataset.my_table') (None, 'my_dataset', 'my_table') >>> split_table_name_info('my_table') (None, None, 'my_table') """ table_components = table_name.split(".") if len(table_components) == 1: return None, None, table_components[0] if len(table_components) == 2: return None, table_components[0], table_components[1] if len(table_components) == 3: return table_components[0], table_components[1], table_components[2] raise ValueError( f"Table name contains more than three components: {table_name}" )
[docs]def create_bigquery_table_from_pandas( client: bigquery.Client, dataframe: pd.DataFrame, table_id: str, dataset_id="bqmlforecast", if_exists: str = "replace", ): """Create a BigQuery table from a pandas DataFrame. Parameters ---------- client : bigquery.Client BigQuery client used to connect to the service. dataframe : pd.DataFrame A `pandas.DataFrame` to load into the BigQuery table. table_id : str ID of the table to create in BigQuery. dataset_id : str, default="bqmlforecast" ID of the dataset where the table will be created. if_exists : {"fail", "replace", "append"}, default="replace" Behavior when the table already exists. Examples -------- >>> client = bigquery.Client() >>> dataframe = pd.DataFrame({'column1': [1, 2], 'column2': ['a', 'b']}) >>> create_bigquery_table_from_pandas(client, dataframe, 'my_table') """ project_id = client.project if dataset_id in table_id: table_id = table_id.replace(f"{dataset_id}.", "") if project_id in table_id: table_id = table_id.replace(f"{project_id}.", "") _project_id, _dataset_id, table_id = split_table_name_info(table_id) dataset_id = _dataset_id if _dataset_id is not None else dataset_id project_id = _project_id if _project_id is not None else project_id table_ref = client.dataset(dataset_id).table(table_id) if if_exists == "replace": client.delete_table(table_ref, not_found_ok=True) load_job = client.load_table_from_dataframe(dataframe, table_ref) elif if_exists == "append": load_job = client.load_table_from_dataframe(dataframe, table_ref) else: if client.get_table(table_ref): raise ValueError(f"Table {table_id} already exists in dataset {dataset_id}") load_job = client.load_table_from_dataframe(dataframe, table_ref) load_job.result()
[docs]def create_dataset_if_not_found( client: bigquery.Client, project_id: str | None = None, dataset_name: str = "bqmlforecast", location: str = "us", ): """ Create a BigQuery dataset if it does not exist. Parameters ---------- client : bigquery.Client BigQuery client used to connect to the service. project_id : str, optional ID of the project where the dataset will be created. If no value is provided, the Project ID gets inferred from the `project` attibute from `client`. dataset_name : str, default="bqmlforecast" Name of the dataset to create. location : str, default="us" Location of the dataset. Raises ------ Exception If any exception other than the error informing the dataset already exists. Examples -------- >>> client = bigquery.Client() >>> create_dataset_if_not_found(client, dataset_name='new_dataset') Dataset 'new_dataset' already exists. Notes ----- This function checks if the specified dataset exists in the given project. If it does not exist, the function creates the dataset. """ if project_id is None: project_id = client.project # Construct a full Dataset object to send to the API dataset_id = f"{project_id}.{dataset_name}" dataset = bigquery.Dataset(dataset_id) # noqa # Set the location dataset.location = location # Check if the dataset exists try: client.get_dataset(dataset_id) # Make an API request. print(f"Dataset '{dataset_name}' already exists.") except Exception as exc: if isinstance(exc, google.api_core.exceptions.NotFound): # noqa # Dataset does not exist, create it dataset = client.create_dataset(dataset) print(f"Created dataset '{dataset_name}'.") else: raise exc
[docs]def list_tables_with_pattern( client: bigquery.Client, dataset_id: str, table_pattern: str, project_id: str | None = None, ) -> List[str]: """ List BigQuery tables matching a specific pattern. Constructs a fully qualified dataset ID, retrieves the dataset, lists all tables, and filters them based on the provided pattern. Parameters ---------- client : bigquery.Client The BigQuery client used to interact with the service. dataset_id : str The ID of the dataset containing the tables to list. table_pattern : str The pattern to match against the table IDs. project_id : str, optional The ID of the project containing the dataset. If None, the client's project is used. Returns ------- List[str] A list of table IDs that match the specified pattern. Notes ----- The `fnmatch` module is used to filter tables based on the pattern. Ensure that the pattern provided is compatible with `fnmatch`. Examples -------- List all tables in a dataset that match the pattern 'sales_*': >>> client = bigquery.Client() >>> tables = list_tables_with_pattern(client, 'my_dataset', 'sales_*') >>> print(tables) ['sales_2021', 'sales_2022'] """ # Construct the fully qualified dataset ID project = client.project if project_id is None else project_id dataset_ref = f"{project}.{dataset_id}" # Get the dataset dataset = client.get_dataset(dataset_ref) # List all tables in the dataset tables = client.list_tables(dataset) # Filter tables based on the pattern using fnmatch matching_tables = [ table.table_id for table in tables if fnmatch.fnmatch(table.table_id, table_pattern) ] return matching_tables
[docs]def parse_combined_string(combined: str) -> dict: """Parse a combined offset string into its components. Parameters ---------- combined : str A combined string specifying the offset, e.g., `'2Y3M2W1D'`. Returns ------- dict A dictionary with keys `'years'`, `'months'`, `'weeks'`, `'days'` and their corresponding values. Raises ------ ValueError If the combined string is invalid. """ pattern = re.compile( r'(?P<years>\d+Y)?(?P<months>\d+M)?(?P<weeks>\d+W)?(?P<days>\d+D)?', re.IGNORECASE ) match = pattern.fullmatch(combined) if not match: raise ValueError(f"The specified `combined` string {combined} is not valid.") return {k: int(v[:-1]) if v else 0 for k, v in match.groupdict().items()}
[docs]def create_date_offset_from_parts(years=0, months=0, weeks=0, days=0) -> pd.DateOffset: """Create a `pandas.DateOffset` object from individual time components. Parameters ---------- years : int, default=0 Number of years for the offset. months : int, default=0 Number of months for the offset. weeks : int, default=0 Number of weeks for the offset. days : int, default=0 Number of days for the offset. Returns ------- pd.DateOffset A `pandas.DateOffset` object for the specified time components. """ return pd.DateOffset(years=years, months=months, weeks=weeks, days=days)
[docs]def date_offset(*args: Union[int, str], freq: str = None) -> pd.DateOffset: """ Generate a `pandas.DateOffset` based on the given frequency and value or a combined string. Parameters ---------- args : int or str * If one argument is provided, it should be a combined string specifying the offset, e.g., `'2Y3M2W1D'`. * If two arguments are provided, they should be `n` (int) and `freq` (str). freq : str {'days', 'weeks', 'months', 'years'}, optional The frequency type. Valid options are `'days'`, `'weeks'`, `'months'`, `'years'`. Ignored if `combined` is provided. Returns ------- pd.DateOffset A `pandas.DateOffset` object for the specified frequency and value. Raises ------ ValueError If `freq` is not one of the valid options or if the combined string is invalid. """ if len(args) == 1 and isinstance(args[0], str): combined = args[0] offset_parts = parse_combined_string(combined) return create_date_offset_from_parts(**offset_parts) if len(args) == 2 and isinstance(args[0], int) and isinstance(args[1], str): n, freq = args freq = freq.lower() valid_freqs = {"d": "days", "day": "days", "days": "days", "w": "weeks", "week": "weeks", "weeks": "weeks", "m": "months", "month": "months", "months": "months", "y": "years", "year": "years", "years": "years"} if freq not in valid_freqs: raise ValueError(f"The specified `freq` {freq} is not a valid frequency. " "Valid frequencies are: 'days', 'weeks', 'months', 'years'.") return create_date_offset_from_parts(**{valid_freqs[freq]: n}) raise ValueError( "Either provide a single combined string or both `n` and `freq` as arguments.")