Source code for wip.utils

"""
Utility functions for general purpose tasks.

This module contains the following utility functions:

- `is_running_on_databricks`: Check if the code is running locally or on Databricks.
- `get_spark_context`: Get the Spark context.
- `find_filepath`: Find a file or folder in the `initial_dir` directory or
  its parent directories.
- `remove_files`: Remove files from a directory matching a specified pattern.
- `display_files`: Display tables of removed and not removed files in a given directory.

"""

from __future__ import annotations

import fnmatch
import glob
import inspect
import os
from pathlib import Path
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Tuple

from pyspark.sql import SparkSession
from rich.console import Console
from rich.table import Table


def is_running_on_databricks() -> bool:
    """Check whether the code is running on Azure Databricks.

    Databricks clusters define the ``DATABRICKS_RUNTIME_VERSION``
    environment variable; its presence is used as the detection signal.

    Returns
    -------
    bool
        `True` if running on Azure Databricks, `False` otherwise.
    """
    return os.environ.get('DATABRICKS_RUNTIME_VERSION') is not None
def get_spark_context():
    """Get the Spark context.

    Returns
    -------
    pyspark.context.SparkContext
        The Spark context of the active (or newly created) session.

    Raises
    ------
    RuntimeError
        If the code is not running on Databricks.
    """
    if not is_running_on_databricks():
        raise RuntimeError("Spark Context is retrieved only when running on Databricks")
    session = SparkSession.builder.getOrCreate()
    return session.sparkContext
def find_filepath(
    filename: str | Path,
    initial_dir: str | Path | None = None,
    max_upper_dirs: int = 4,
) -> Path:
    """Find a file or folder in ``initial_dir`` or its parent directories.

    The search scans ``initial_dir`` recursively; if no match is found,
    it moves one directory up and repeats, at most ``max_upper_dirs``
    times.

    Parameters
    ----------
    filename : str | Path
        The filename to find.
    initial_dir : str | Path | None
        The initial directory to start searching from. If None, the
        current working directory is used.
    max_upper_dirs : int, default=4
        The maximum number of parent directories to search. Note that
        increasing this value can increase search time exponentially,
        since each level triggers a full recursive scan.

    Returns
    -------
    Path
        The path to the first match found.

    Raises
    ------
    FileNotFoundError
        If one of the following occurs:

        - The file isn't found.
        - The initial directory doesn't exist.
    """
    initial_dir = Path(initial_dir).resolve() if initial_dir is not None else Path.cwd()
    # A file path is allowed as a starting point; search from its folder.
    if initial_dir.is_file():
        initial_dir = initial_dir.parent
    if not initial_dir.is_dir():
        raise FileNotFoundError(f"Could not find directory: {initial_dir}")
    while max_upper_dirs > 0:
        # "**/" matches zero or more directories, so this also finds a
        # match directly inside `initial_dir`.
        filepaths_found = list(initial_dir.glob(f"**/{filename}"))
        if filepaths_found:
            return filepaths_found[0]
        initial_dir = initial_dir.parent
        max_upper_dirs -= 1
    raise FileNotFoundError(f"Could not find file: {filename}")
def remove_files(
    directory: str | Path,
    pattern: str,
    verbose: bool = False,
) -> Tuple[List[str], List[str]]:
    """Remove files matching ``pattern`` from ``directory``.

    On Databricks the operation is delegated to
    :func:`remove_files_databricks`; locally, candidates are collected
    with :func:`glob.glob` and deleted with :func:`os.remove`. If the
    directory does not exist or is not a directory, an error is logged
    and nothing is removed.

    Parameters
    ----------
    directory : str | Path
        The directory from which files are to be removed. Accepts either
        a string path or a `Path` object.
    pattern : str
        The pattern used to match files for removal, e.g., '*.txt', or
        the name of the file to remove.
    verbose : bool, default=False
        If True, displays tables of removed and not removed files.

    Returns
    -------
    Tuple[List[str], List[str]]
        A tuple containing two lists:

        - The first list contains paths of files successfully removed.
        - The second list contains paths of files that were not removed.

    See Also
    --------
    os.remove : For the removal of individual files.
    glob.glob : For a pattern matching of file paths.

    Examples
    --------
    >>> remove_files("/path/to/dir", "*.txt")
    (['/path/to/dir/file1.txt', '/path/to/dir/file2.txt'], [])

    Notes
    -----
    Errors and exceptions are logged via the `logger` from
    `wip.logging_config`.

    .. versionadded:: 2.4.0
        Include the `remove_files_databricks` function for removing
        files from ABFSS paths in Databricks.
    """
    # pylint: disable=import-outside-toplevel
    from wip.logging_config import logger

    if is_running_on_databricks():
        return remove_files_databricks(directory, pattern, verbose)

    removed: List[str] = []
    failed: List[str] = []

    # Validate the target directory before touching the filesystem.
    target_dir = Path(directory).resolve()
    if not target_dir.exists():
        logger.error("Directory %s does not exist.", str(directory))
        return removed, failed
    if not target_dir.is_dir():
        logger.error("%s is not a directory.", str(directory))
        return removed, failed

    for filepath in glob.glob(os.path.join(str(directory), pattern)):
        try:
            os.remove(filepath)
        except Exception as exc:  # pylint: disable=broad-except
            failed.append(filepath)
            logger.exception(exc)
        else:
            removed.append(filepath)

    if verbose:
        display_files(removed, failed)
    return removed, failed
def display_files(removed_files: List[str], not_removed_files: List[str]):
    """Display tables of removed and not removed files.

    Two tables are rendered with `rich`: one for files successfully
    removed and one for files that were not removed. Each table lists
    the file name and its parent directory; empty lists are skipped.

    Parameters
    ----------
    removed_files : List[str]
        List of file paths that were successfully removed.
    not_removed_files : List[str]
        List of file paths that were not removed.

    Notes
    -----
    The number of entries in each table is also logged via `logger`
    from `wip.logging_config`.

    Examples
    --------
    >>> display_files(["/path/to/dir/removed.txt"], [])
    # This will display a table of removed files.
    """
    from wip.logging_config import logger  # pylint: disable=import-outside-toplevel

    console = Console()
    sections = (
        ("Removed Files", removed_files),
        ("Not Removed Files", not_removed_files),
    )
    for title, paths in sections:
        if not paths:
            continue
        table = Table(title=title)
        table.add_column("File Name", justify="center", style="cyan", no_wrap=True)
        table.add_column("Directory", justify="right", no_wrap=True)
        for filepath in paths:
            path_obj = Path(filepath)
            table.add_row(path_obj.name, str(path_obj.parent))
        logger.info("%s: %s", title, len(paths))
        console.print(table)
def get_function_parameters(func: Callable) -> List[str]:
    """Return the names of the parameters accepted by ``func``.

    Parameters
    ----------
    func : Callable
        The function whose parameters are to be retrieved.

    Returns
    -------
    List[str]
        A list of parameter names, in declaration order.
    """
    # `Signature.parameters` is an ordered mapping keyed by name.
    return list(inspect.signature(func).parameters)
def get_function_kwargs(
    func: Callable, **kwargs
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split ``kwargs`` into those accepted by ``func`` and the rest.

    Parameters
    ----------
    func : Callable
        The function whose keyword arguments are to be retrieved.
    kwargs : Any
        Keyword arguments to pass to the function.

    Returns
    -------
    Tuple[Dict[str, Any], Dict[str, Any]]
        A dictionary of keyword arguments accepted by ``func`` and
        another dictionary with the remaining keyword arguments.
    """
    accepted_names = inspect.signature(func).parameters
    # `pop` moves each matching entry out of the leftover dict.
    matched = {name: kwargs.pop(name) for name in accepted_names if name in kwargs}
    return matched, kwargs
def get_dbutils():
    """Get the Databricks `dbutils` module.

    Tries the `pyspark.dbutils` entry point first; when that import is
    unavailable, falls back to the `dbutils` object published in the
    IPython user namespace by the Databricks runtime.

    Returns
    -------
    ModuleType
        The Databricks `dbutils` module, that contains modules like `fs`.
    """
    spark = SparkSession.builder.getOrCreate()
    try:
        # pylint: disable=import-outside-toplevel
        from pyspark.dbutils import DBUtils  # noqa

        return DBUtils(spark)
    except ImportError:
        # pylint: disable=import-outside-toplevel
        import IPython  # noqa

        return IPython.get_ipython().user_ns["dbutils"]
def dbutils_glob(pattern: str):
    """Perform glob-like pattern matching for files in ABFSS via `dbutils.fs`.

    Outside Databricks this falls back to :func:`glob.glob`.

    Parameters
    ----------
    pattern : str
        The glob pattern to match against file names. Supports '*' and
        '?' wildcards.

    Returns
    -------
    List[str]
        A list of matched file paths in ABFSS.
    """
    from wip.logging_config import logger  # noqa

    if not is_running_on_databricks():
        return glob.glob(pattern)

    dbutils = get_dbutils()

    def walk(path: str, name_pattern: str):
        # Depth-first traversal; a missing directory is logged and
        # yields no matches instead of raising.
        hits = []
        try:
            entries = dbutils.fs.ls(path)
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(exc)
            logger.error("The directory %s does not exist", path)
            return hits
        for entry in entries:
            if entry.isDir():
                hits.extend(walk(entry.path, name_pattern))
            elif fnmatch.fnmatch(entry.name, name_pattern):
                hits.append(entry.path)
        return hits

    directory, file_pattern = os.path.split(pattern)
    # The traversal is always recursive, so a trailing "**" is redundant.
    if directory.endswith("**"):
        directory = directory.replace("**", "")
    return walk(directory, file_pattern)
def remove_files_databricks(
    directory: str | Path,
    pattern: str,
    verbose: bool = True,
) -> Tuple[List[str], List[str]]:
    """Remove files from a Storage Account container path in Databricks.

    Files in ``directory`` matching ``pattern`` are matched via
    :func:`dbutils_glob` and deleted with ``dbutils.fs.rm``. If the
    directory does not exist, an error is logged and nothing is removed.

    Parameters
    ----------
    directory : str | Path
        The directory from which files are to be removed. Accepts either
        a string path or a `Path` object.
    pattern : str
        The pattern used to match files for removal, e.g., '*.txt', or
        the name of the file to remove.
    verbose : bool, default=True
        If True, displays tables of removed and not removed files.

    Returns
    -------
    Tuple[List[str], List[str]]
        A tuple containing two lists:

        - The first list contains paths of files successfully removed.
        - The second list contains paths of files that were not removed.

    Notes
    -----
    This function assumes it's running in a Databricks environment. It
    uses Databricks' `dbutils.fs` module to interact with ABFSS paths.

    .. versionchanged:: 2.8.9
        Added a try/except clause to check if the path being accessed
        actually exists inside Azure Container.
    """
    from wip.logging_config import logger  # pylint: disable=import-outside-toplevel

    dbutils = get_dbutils()
    removed: List[str] = []
    failed: List[str] = []

    directory = str(directory)
    # Probe the directory first: `dbutils.fs.ls` raises when it is missing.
    try:
        dbutils.fs.ls(directory)
    except Exception as exc:  # pylint: disable=broad-except
        logger.exception(exc)
        logger.error("Directory '%s' does not exist.", directory)
        return removed, failed

    full_path_pattern = directory.rstrip('/') + '/' + pattern
    for filepath in dbutils_glob(full_path_pattern):
        try:
            # The second argument enables recursive deletion.
            dbutils.fs.rm(filepath, True)
        except Exception as exc:  # pylint: disable=broad-except
            failed.append(filepath)
            logger.exception(exc)
        else:
            removed.append(filepath)

    if verbose:
        display_files(removed, failed)
    return removed, failed
def exists(path: str | Path) -> bool:
    """Check if a file or directory exists locally or in DataBricks.

    Parameters
    ----------
    path : str | Path
        The file path to check if it exists.

    Returns
    -------
    bool
        Whether the file or directory exists.
    """
    if not is_running_on_databricks():
        return Path(path).exists()

    dbutils = get_dbutils()
    path_str = str(path)
    # Repair a collapsed scheme separator ("abfss:/..." -> "abfss://...").
    if path_str.startswith(r"abfss:/") and not path_str.startswith(r"abfss://"):
        path_str = path_str.replace(r"abfss:/", r"abfss://")
    return len(dbutils.fs.ls(path_str)) > 0