"""
Utility functions for general purpose tasks.
This module contains the following utility functions:
- `is_running_on_databricks`: Check if the code is running locally or on Databricks.
- `get_spark_context`: Get the Spark context.
- `find_filepath`: Find a file or folder in the `initial_dir` directory or
its parent directories.
- `remove_files`: Remove files from a directory matching a specified pattern.
- `display_files`: Display tables of removed and not removed files in a given directory.
"""
from __future__ import annotations
import fnmatch
import glob
import inspect
import os
from pathlib import Path
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Tuple
from pyspark.sql import SparkSession
from rich.console import Console
from rich.table import Table
def is_running_on_databricks() -> bool:
    """
    Determine whether the current process is executing on Azure Databricks.

    Databricks clusters always define the ``DATABRICKS_RUNTIME_VERSION``
    environment variable, so its presence is used as the indicator.

    Returns
    -------
    bool
        `True` if running on Azure Databricks, `False` otherwise.
    """
    return os.environ.get('DATABRICKS_RUNTIME_VERSION') is not None
def get_spark_context():
    """Return the Spark context of the active session.

    Only available on Databricks; raises otherwise.

    Returns
    -------
    pyspark.context.SparkContext
        The Spark context obtained from the current ``SparkSession``.

    Raises
    ------
    RuntimeError
        If not running on Databricks.
    """
    if not is_running_on_databricks():
        raise RuntimeError("Spark Context is retrieved only when running on Databricks")
    return SparkSession.builder.getOrCreate().sparkContext
def find_filepath(
    filename: str | Path, initial_dir: str | Path | None = None, max_upper_dirs: int = 4
) -> Path:
    """Find a file or folder in the `initial_dir` directory or its parent directories.

    The search is recursive inside `initial_dir`; if nothing matches, it moves
    one directory up and retries, at most `max_upper_dirs` times.

    Parameters
    ----------
    filename : str | Path
        The file or folder name to find.
    initial_dir : str | Path | None
        The initial directory to start searching from.
        If None, the current directory is used.
        If a file path is given, its parent directory is used.
    max_upper_dirs : int, default=4
        The maximum number of parent directories to search.
        Note that increasing the maximum number of parent directories to search
        can increase search time exponentially.

    Returns
    -------
    Path
        The path to the first match found.

    Raises
    ------
    FileNotFoundError
        If one of the following occurs:
        - If the file isn't found.
        - If the initial directory doesn't exist.
    """
    initial_dir = Path(initial_dir).resolve() if initial_dir is not None else Path.cwd()
    if initial_dir.is_file():
        initial_dir = initial_dir.parent
    if not initial_dir.is_dir():
        raise FileNotFoundError(f"Could not find directory: {initial_dir}")
    while max_upper_dirs > 0:
        # Recursive search below the current directory; the pattern previously
        # lost its f-string expression and searched for a literal name.
        filepaths_found = list(initial_dir.glob(f"**/{filename}"))
        if filepaths_found:
            return filepaths_found[0]
        # No match here: climb one level and retry.
        initial_dir = initial_dir.parent
        max_upper_dirs -= 1
    raise FileNotFoundError(f"Could not find file: {filename}")
def remove_files(
    directory: str | Path, pattern: str, verbose: bool = False
) -> Tuple[List[str], List[str]]:
    """
    Remove files from a directory matching a specified pattern.

    This function attempts to delete files in a specified directory that match
    a given pattern. It returns lists of both removed and not removed files.
    If the directory does not exist or is not a directory, it logs an error.

    Parameters
    ----------
    directory : str | Path
        The directory from which files are to be removed.
        Accepts either a string path or a `Path` object.
    pattern : str
        The pattern used to match files for removal, e.g., '*.txt',
        or the name of the file to remove. '**' patterns match recursively.
    verbose : bool, default=False
        If True, displays tables of removed and not removed files.

    Returns
    -------
    Tuple[List[str], List[str]]
        A tuple containing two lists:
        - The first list contains paths of files successfully removed
        - The second list contains paths of files that were not removed.

    Raises
    ------
    Exception
        General exceptions are caught and logged if file removal fails.

    See Also
    --------
    os.remove : For the removal of individual files.
    glob.glob : For a pattern matching of file paths.

    Examples
    --------
    >>> remove_files("/path/to/dir", "*.txt")
    (['/path/to/dir/file1.txt', '/path/to/dir/file2.txt'], [])
    >>> remove_files("/path/to/dir", "**/*.txt")
    (['/path/to/dir/folder1/file1.txt', '/path/to/dir/folder2/file2.txt'], [])
    >>> remove_files("/path/to/dir", "file1.txt")
    (['/path/to/dir/file1.txt'], [])

    Notes
    -----
    This function logs errors and exceptions using the `logger` from
    `wip.logging_config`. It uses `Path` from `pathlib` for path
    manipulations and checks.

    ..versionadded:: 2.4.0
        Include the `remove_files_databricks` function for removing files
        from ABFSS paths in Databricks.
    """
    # pylint: disable=import-outside-toplevel
    from wip.logging_config import logger
    if is_running_on_databricks():
        # Delegate to the ABFSS-aware implementation on Databricks.
        return remove_files_databricks(directory, pattern, verbose)
    removed_files, not_removed_files = [], []
    directory_path = Path(directory).resolve()
    if not directory_path.exists():
        logger.error("Directory %s does not exist.", str(directory))
        return removed_files, not_removed_files
    if not directory_path.is_dir():
        logger.error("%s is not a directory.", str(directory))
        return removed_files, not_removed_files
    # Construct a full path pattern
    full_path_pattern = os.path.join(str(directory), pattern)
    # `recursive=True` lets '**' patterns match nested folders, as the
    # docstring examples advertise; without it '**' behaves like '*'.
    files_to_remove = glob.glob(full_path_pattern, recursive=True)
    for file in files_to_remove:
        try:
            os.remove(file)
            removed_files.append(file)
        except Exception as exc:  # pylint: disable=broad-except
            not_removed_files.append(file)
            logger.exception(exc)
    if verbose:
        display_files(removed_files, not_removed_files)
    return removed_files, not_removed_files
def display_files(removed_files: List[str], not_removed_files: List[str]):
    """
    Display tables of removed and not removed files.

    Renders one table per non-empty list — "Removed Files" and
    "Not Removed Files" — each showing the file name and its directory,
    and logs the count of entries per table.

    Parameters
    ----------
    removed_files : List[str]
        List of file paths that were successfully removed.
    not_removed_files : List[str]
        List of file paths that were not removed.

    Notes
    -----
    Uses `rich.console.Console` and `rich.table.Table` for formatted output
    and the project `logger` for the counts.

    Examples
    --------
    >>> display_files(["/path/to/dir/removed.txt"], [])
    # This will display a table of removed files.
    """
    from wip.logging_config import logger  # pylint: disable=import-outside-toplevel
    console = Console()
    sections = (
        ("Removed Files", removed_files),
        ("Not Removed Files", not_removed_files),
    )
    for table_name, paths_list in sections:
        if not paths_list:
            continue
        table = Table(title=table_name)
        table.add_column("File Name", justify="center", style="cyan", no_wrap=True)
        table.add_column("Directory", justify="right", no_wrap=True)
        for filepath in paths_list:
            entry = Path(filepath)
            table.add_row(entry.name, str(entry.parent))
        logger.info("%s: %s", table_name, len(paths_list))
        console.print(table)
def get_function_parameters(func: Callable) -> List[str]:
    """
    Return the names of the parameters accepted by `func`, in signature order.

    Parameters
    ----------
    func : Callable
        The function whose parameters are to be retrieved.

    Returns
    -------
    List[str]
        A list of parameter names.
    """
    # `Signature.parameters` is an ordered mapping keyed by parameter name.
    return list(inspect.signature(func).parameters)
def get_function_kwargs(
    func: Callable, **kwargs
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split keyword arguments into those accepted by `func` and the rest.

    Parameters
    ----------
    func : Callable
        The function whose keyword arguments are to be retrieved.
    kwargs : Any
        Keyword arguments to pass to the function.

    Returns
    -------
    Tuple[Dict[str, Any], Dict[str, Any]]
        A dictionary of keyword arguments accepted by `func` and another
        dictionary with the remaining keyword arguments.
    """
    # Iterate in signature order so the accepted-kwargs dict keeps it;
    # `pop` removes each matched name from the leftover dict.
    signature = inspect.signature(func)
    func_kwargs = {
        name: kwargs.pop(name)
        for name in signature.parameters
        if name in kwargs
    }
    return func_kwargs, kwargs
def get_dbutils():
    """Get the Databricks `dbutils` helper.

    Tries the native ``pyspark.dbutils.DBUtils`` wrapper first; when that
    import is unavailable, falls back to the ``dbutils`` instance exposed
    in the IPython user namespace (as in a Databricks notebook).

    Returns
    -------
    ModuleType
        The Databricks `dbutils` module, that contains modules like `fs`.
    """
    spark = SparkSession.builder.getOrCreate()
    try:
        # pylint: disable=import-outside-toplevel
        from pyspark.dbutils import DBUtils  # noqa
        return DBUtils(spark)
    except ImportError:
        # pylint: disable=import-outside-toplevel
        import IPython  # noqa
        return IPython.get_ipython().user_ns["dbutils"]
def dbutils_glob(pattern: str):
    """
    Perform a glob-like pattern matching for files in ABFSS using `dbutils.fs`.

    Parameters
    ----------
    pattern : str
        The glob pattern to match against file names.
        Supports '*' and '?' wildcards; '**' directory components trigger
        a recursive search.

    Returns
    -------
    List[str]
        A list of matched file paths in ABFSS.
    """
    from wip.logging_config import logger  # noqa
    if not is_running_on_databricks():
        # Local fallback: enable recursive matching so '**' patterns behave
        # like the Databricks branch below (plain glob treats '**' as '*').
        return glob.glob(pattern, recursive=True)
    dbutils = get_dbutils()
    def recursive_list_files(path: str, pattern: str):
        # List `path`, recurse into subdirectories, and collect entries
        # whose base name matches `pattern`.
        matched_files = []
        try:
            items = dbutils.fs.ls(path)
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(exc)
            logger.error("The directory %s does not exist", path)
            return matched_files
        for item in items:
            if item.isDir():
                matched_files += recursive_list_files(item.path, pattern)
            elif fnmatch.fnmatch(item.name, pattern):
                matched_files.append(item.path)
        return matched_files
    directory, file_pattern = os.path.split(pattern)
    if directory.endswith("**"):
        directory = directory.replace("**", "")
    # NOTE(review): the Databricks branch always searches recursively, even
    # when the pattern has no '**' component — confirm this is intended.
    return recursive_list_files(directory, file_pattern)
def remove_files_databricks(
    directory: str | Path, pattern: str, verbose: bool = True
) -> Tuple[List[str], List[str]]:
    """
    Remove files from a Storage Account container path in Databricks.

    This function attempts to delete files in a specified directory that match
    a given pattern. It returns lists of both removed and not removed files.
    If the directory does not exist or is not a directory, it logs an error.

    Parameters
    ----------
    directory : str | Path
        The directory from which files are to be removed.
        Accepts either a string path or a `Path` object.
    pattern : str
        The pattern used to match files for removal, e.g., '*.txt',
        or the name of the file to remove.
    verbose : bool, default=True
        If True, displays tables of removed and not removed files.

    Returns
    -------
    Tuple[List[str], List[str]]
        A tuple containing two lists:
        - The first list contains paths of files successfully removed
        - The second list contains paths of files that were not removed.

    Raises
    ------
    Exception
        General exceptions are caught and logged if file removal fails.

    Notes
    -----
    This function assumes it's running in a Databricks environment.
    It uses Databricks' `dbutils.fs` module to interact with ABFSS paths.

    .. versionchanged:: 2.8.9
        Added a try/except clause to check if the path being accessed
        actually exists inside Azure Container.
    """
    from wip.logging_config import logger  # pylint: disable=import-outside-toplevel
    dbutils = get_dbutils()
    removed_files: List[str] = []
    not_removed_files: List[str] = []
    # Work with a plain string path from here on.
    directory = str(directory)
    # `dbutils.fs.ls` raises when the path is missing; treat that as
    # "directory does not exist" and bail out early.
    try:
        dbutils.fs.ls(directory)
    except Exception as exc:  # pylint: disable=broad-except
        logger.exception(exc)
        logger.error("Directory '%s' does not exist.", directory)
        return removed_files, not_removed_files
    # Join directory and pattern with exactly one '/' between them.
    full_path_pattern = directory.rstrip('/') + '/' + pattern
    for file in dbutils_glob(full_path_pattern):
        try:
            # The second argument is for recursive deletion
            dbutils.fs.rm(file, True)
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(exc)
            not_removed_files.append(file)
        else:
            removed_files.append(file)
    if verbose:
        display_files(removed_files, not_removed_files)
    return removed_files, not_removed_files
def exists(path: str | Path) -> bool:
    """Check if a file or directory exists locally or in DataBricks.

    Parameters
    ----------
    path : str | Path
        The file path to check if it exists.

    Returns
    -------
    bool
        Whether the file or directory exists.
    """
    if not is_running_on_databricks():
        return Path(path).exists()
    dbutils = get_dbutils()
    normalized = str(path)
    # Restore a collapsed scheme separator: 'abfss:/container' -> 'abfss://container'.
    if normalized.startswith("abfss:/") and not normalized.startswith("abfss://"):
        normalized = normalized.replace("abfss:/", "abfss://")
    return len(dbutils.fs.ls(normalized)) > 0