Source code for wip.datatools.io_ops

"""
Module with I/O operations that work locally and inside DataBricks interchangeably.

The functions in this module are intended to replace the native Python
functions for reading and writing files, as well as `pandas` I/O functions
such as `pandas.DataFrame.to_csv`, `pandas.DataFrame.to_excel`, and
`pandas.read_csv`.

In other words, instead of using:

.. code-block:: python

    import pandas as pd
    df = pd.read_csv('path/to/file.csv')

Use:

.. code-block:: python

    from wip.datatools.io_ops import read_csv
    df = read_csv('path/to/file.csv')

The same logic applies to other functions, such as `pandas.DataFrame.to_csv`.

Notes
-----
All functions in this module handle both local and ABFS file paths. In
other words, both of the following work:

.. code-block:: python

    from wip.datatools.io_ops import read_csv
    df = read_csv('path/to/file.csv')
    # Or:
    df = read_csv('abfss://insight@usazu1valesa001.dfs.core.windows.net/path/to/file.csv')

"""
from __future__ import annotations

import json
import os
import pickle
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import Dict
from typing import Tuple

import joblib
import pandas as pd
import pulp

from wip.constants import DATASETS_FILEPATH
from wip.constants import DF_SQL_FILEPATH
from wip.utils import get_dbutils
from wip.utils import get_function_kwargs
from wip.utils import get_spark_context
from wip.utils import is_running_on_databricks


NoneType = type(None)
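
# The functions below repeatedly repair "abfss:/" back to "abfss://" before
# calling `dbutils.fs.cp`, most likely because round-tripping a URL through
# `pathlib.Path` collapses the double slash after the scheme (e.g.
# `str(Path("abfss://c@a/x"))` yields "abfss:/c@a/x"). A minimal sketch of
# that recurring pattern (the helper name is hypothetical, not part of this
# module):
#
#     def _normalize_abfss(path: str | Path) -> str:
#         path = str(path)
#         if path.startswith("abfss:/") and not path.startswith("abfss://"):
#             path = path.replace("abfss:/", "abfss://", 1)
#         return path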


def read_joblib(path: str | Path) -> Any:
    """
    Read `.joblib` extension files from a local directory or DataBricks.

    The function automatically determines whether the code is being
    executed locally or inside DataBricks, and reads the file accordingly.

    Parameters
    ----------
    path : str | Path
        The path to the `.joblib` extension file.

    Returns
    -------
    Any
        The `.joblib` file contents.
    """
    if not is_running_on_databricks():
        return joblib.load(path)
    _sc = get_spark_context()
    binary_data = _sc.binaryFiles(str(path)).collect()
    return joblib.load(BytesIO(binary_data[0][1]))

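
# Example usage (sketch; the file path below is hypothetical):
#
#     datasets = read_joblib(
#         "abfss://insight@usazu1valesa001.dfs.core.windows.net/tmp/datasets.joblib"
#     )
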
def to_joblib(obj: object, path: str | Path, **kwargs: Any):
    """
    Save an object as a joblib file locally or to DataBricks.

    The function automatically detects whether the code is being executed
    locally or inside DataBricks, and saves the object accordingly.

    Parameters
    ----------
    obj : object
        The object to save as a joblib file.
    path : str | Path
        Where to save the resulting joblib file.
    kwargs : Any
        Keyword arguments to pass to the `joblib.dump` function.
    """
    if not is_running_on_databricks():
        return joblib.dump(obj, path, **kwargs)
    dump_path = "/dbfs/tmp/"
    filename = Path(path).name
    joblib.dump(obj, os.path.join(dump_path, filename), **kwargs)
    dbutils = get_dbutils()
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

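
# Round-trip sketch (object and path are illustrative). On DataBricks the
# object is first dumped to the local "/dbfs/tmp/" mount and then copied to
# its destination with `dbutils.fs.cp`; locally it is written directly:
#
#     to_joblib({"alpha": 0.5}, "outputs/params.joblib")
#     params = read_joblib("outputs/params.joblib")
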
def read_local_datasets_df_sql() -> Tuple[Dict[str, pd.DataFrame], pd.DataFrame]:
    """Read the `datasets` and `df_sql` files from the local filesystem.

    Returns
    -------
    Tuple[Dict[str, pd.DataFrame], pd.DataFrame]
        A tuple with the `datasets` and `df_sql` file contents.

    Raises
    ------
    RuntimeError
        If the code is being executed inside DataBricks.
    """
    if is_running_on_databricks():
        raise RuntimeError(
            "This function should not be executed on DataBricks.")
    return read_joblib(DATASETS_FILEPATH), read_joblib(DF_SQL_FILEPATH)

def read_json(path: str | Path, **kwargs: Any) -> dict | list:
    """
    Read a JSON file and convert it to a Python object.

    This function works on both local and DataBricks environments.

    Parameters
    ----------
    path : str | Path
        The file path where the JSON data is stored. It can be either
        a string or a `pathlib.Path` object.
    kwargs : Any
        Additional keyword arguments. If 'encoding' is not specified,
        it defaults to 'utf-8'. Other kwargs are passed to the `open`
        and `json.load` functions.

    Returns
    -------
    dict | list
        The JSON data converted to a Python object.
    """
    encoding = kwargs.pop("encoding", "utf-8")
    func_kwargs, other_kwargs = get_function_kwargs(open, **kwargs)
    if not is_running_on_databricks():
        with open(path, "r", encoding=encoding, **func_kwargs) as fp:
            func_kwargs, _ = get_function_kwargs(json.load, **other_kwargs)
            return json.load(fp, **func_kwargs)
    dbutils = get_dbutils()
    dump_path = "/dbfs/tmp/"
    filename = Path(path).name
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    dbutils.fs.cp(  # pylint: disable=E0602
        path, os.path.join(dump_path.split("/", 2)[-1], filename))
    with open(os.path.join(dump_path, filename), "r", encoding=encoding,
              **func_kwargs) as fp:
        func_kwargs, _ = get_function_kwargs(json.load, **other_kwargs)
        return json.load(fp, **func_kwargs)

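
# Example (sketch; the file name is illustrative). Extra keyword arguments
# are split by `get_function_kwargs`: `open`-specific ones go to `open`,
# while the rest (such as `parse_float`) go to `json.load`:
#
#     import decimal
#     config = read_json("config.json", parse_float=decimal.Decimal)
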
def read_csv(path: str | Path, **kwargs: Any) -> pd.DataFrame:
    """
    Read a CSV file and convert it to a `pandas.DataFrame`.

    This function works on both local and DataBricks environments.

    Parameters
    ----------
    path : str | Path
        The file path where the CSV data is stored. It can be either
        a string or a `pathlib.Path` object.
    kwargs : Any
        Additional keyword arguments passed to the `pandas.read_csv`
        function.

    Returns
    -------
    pd.DataFrame
        The CSV data converted to a `pandas.DataFrame`.
    """
    if not is_running_on_databricks():
        return pd.read_csv(path, **kwargs)
    dbutils = get_dbutils()
    dump_path = "/dbfs/tmp/"
    filename = Path(path).name
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    dbutils.fs.cp(  # pylint: disable=E0602
        path, os.path.join(dump_path.split("/", 2)[-1], filename))
    return pd.read_csv(os.path.join(dump_path, filename), **kwargs)

def read_text(path: str | Path,
              mode: str = "r",
              encoding: str = "utf-8",
              **kwargs: Any) -> str:
    """
    Read a text file and convert it to a string.

    This function works on both local and DataBricks environments.

    Parameters
    ----------
    path : str | Path
        The file path where the text data is stored. It can be either
        a string or a `pathlib.Path` object.
    mode : str, default="r"
        The mode in which the file is opened. Possible values are text
        modes such as 'r' and 'r+'. Binary modes like 'rb' are not
        supported, since an `encoding` is always passed to `open`.
    encoding : str, default="utf-8"
        The encoding used to read the file. Possible values are:
        'utf-8', 'utf-16', 'latin-1', etc. See the Notes section for
        more information.
    kwargs : Any
        Additional keyword arguments passed to the `open` function.

    Returns
    -------
    str
        The text data converted to a string.

    Notes
    -----
    For the list of standard encodings that Python supports, see:
    https://docs.python.org/3.11/library/codecs.html#standard-encodings
    """
    if not is_running_on_databricks():
        with open(Path(path).as_posix(), mode, encoding=encoding,
                  **kwargs) as fp:
            return fp.read()
    dbutils = get_dbutils()
    dump_path = "/dbfs/tmp/"
    filename = Path(path).name
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    dbutils.fs.cp(  # pylint: disable=E0602
        path, os.path.join(dump_path.split("/", 2)[-1], filename))
    with open(os.path.join(dump_path, filename), mode, encoding=encoding,
              **kwargs) as fp:
        return fp.read()

def write_lp(prob: pulp.LpProblem, key: str, tmp_path: str | Path) -> Any:
    """Write a linear programming problem to an `.lp` file.

    Parameters
    ----------
    prob : pulp.LpProblem
        The linear programming problem.
    key : str
        The key that identifies the problem. Any occurrence of
        'restricoes' in the key is replaced with 'modelo' when building
        the file name.
    tmp_path : str | Path
        The path to the temporary directory. The file is written to the
        `lpfiles` subdirectory of this path.
    """
    lp_files_path = Path(tmp_path).joinpath("lpfiles")
    _lp_files_path = lp_files_path.joinpath(
        key.replace("restricoes", "modelo")).with_suffix(".lp")
    if not is_running_on_databricks():
        lp_files_path.mkdir(parents=True, exist_ok=True)
        return prob.writeLP(str(_lp_files_path))
    dbutils = get_dbutils()
    dump_path = "/dbfs/tmp/"
    filename = _lp_files_path.name
    path = str(_lp_files_path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    prob.writeLP(os.path.join(dump_path, filename))
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

def to_lp(prob: pulp.LpProblem, path: str | Path):
    """Write a linear programming problem to an `.lp` file.

    Parameters
    ----------
    prob : pulp.LpProblem
        The linear programming problem.
    path : str | Path
        The path to the `.lp` file. The `.lp` suffix is enforced.
    """
    path = Path(path).with_suffix(".lp")
    if not is_running_on_databricks():
        return prob.writeLP(str(path))
    dbutils = get_dbutils()
    dump_path = "/dbfs/tmp/"
    filename = path.name
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    prob.writeLP(os.path.join(dump_path, filename))
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

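
# Example (a minimal `pulp` problem; the names are illustrative). Note that
# `with_suffix(".lp")` forces the extension, so any suffix on `path` is
# replaced:
#
#     prob = pulp.LpProblem("example", sense=pulp.LpMinimize)
#     x = pulp.LpVariable("x", lowBound=0)
#     prob += x
#     to_lp(prob, "outputs/example")  # written as "outputs/example.lp"
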
def to_mps(prob: pulp.LpProblem, path: str | Path):
    """Write a linear programming problem to an `.mps` file.

    Parameters
    ----------
    prob : pulp.LpProblem
        The linear programming problem.
    path : str | Path
        The path to the `.mps` file. The `.mps` suffix is enforced.
    """
    path = Path(path).with_suffix(".mps")
    if not is_running_on_databricks():
        return prob.writeMPS(str(path))
    dbutils = get_dbutils()
    dump_path = "/dbfs/tmp/"
    filename = path.name
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    prob.writeMPS(os.path.join(dump_path, filename))
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

def to_pickle(obj: object, path: str | Path, **kwargs: Any):
    """Save an object as a pickle file locally or to DataBricks.

    Parameters
    ----------
    obj : object
        The object to save as a pickle file.
    path : str | Path
        The file path where the object is to be saved as pickle.
        It can be either a string or a `pathlib.Path` object.
    kwargs : Any
        Additional keyword arguments. This function saves objects in
        byte mode ("wb"); therefore, no 'encoding' should be specified,
        as this mode does not need one. Other kwargs are passed to the
        `open` and `pickle.dump` functions.
    """
    func_kwargs, other_kwargs = get_function_kwargs(open, **kwargs)
    if not is_running_on_databricks():
        with open(path, "wb", **func_kwargs) as fp:
            func_kwargs, _ = get_function_kwargs(pickle.dump, **other_kwargs)
            return pickle.dump(obj, fp, **func_kwargs)
    dbutils = get_dbutils()
    filename = Path(path).name
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    dump_path = "/dbfs/tmp/"
    with open(os.path.join(dump_path, filename), "wb", **func_kwargs) as fp:
        func_kwargs, _ = get_function_kwargs(pickle.dump, **other_kwargs)
        pickle.dump(obj, fp, **func_kwargs)
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

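
# Example (sketch; `results` is illustrative, and this assumes
# `get_function_kwargs` routes `protocol` to `pickle.dump` based on its
# signature, as the code above suggests):
#
#     to_pickle(results, "outputs/results.pkl",
#               protocol=pickle.HIGHEST_PROTOCOL)
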
def to_excel(data: pd.DataFrame, path: str | Path, **kwargs: Any):
    """
    Save a `pandas.DataFrame` as an Excel file locally or to DataBricks.

    The function automatically detects whether the code is being executed
    locally or inside DataBricks, and saves the Excel file accordingly.

    Parameters
    ----------
    data : pd.DataFrame
        Pandas DataFrame to save as an Excel file.
    path : str | Path
        Where to save the resulting Excel file.
    kwargs : Any
        Keyword arguments to pass to the `pandas.DataFrame.to_excel` method.
    """
    if not is_running_on_databricks():
        return data.to_excel(path, **kwargs)
    dump_path = "/dbfs/tmp/"
    filename = Path(path).name
    data.to_excel(os.path.join(dump_path, filename), **kwargs)
    dbutils = get_dbutils()
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

def to_json(
    data: dict | list | str | int | float | bool | NoneType,
    path: str | Path,
    **kwargs: Any,
):
    """Convert and save data to a JSON file.

    This function takes various data types, converts them into JSON
    format, and writes them to the file specified by `path`. The function
    supports additional keyword arguments that are passed to the file
    open function.

    Parameters
    ----------
    data : dict | list | str | int | float | bool | NoneType
        The data to be converted to JSON. This can be a dictionary, list,
        string, integer, float, boolean, or None.
    path : str | Path
        The file path where the JSON data should be stored. Can be a
        string or `pathlib.Path` object.

    Other Parameters
    ----------------
    **kwargs : Any
        Additional keyword arguments. If 'encoding' is not specified,
        it defaults to 'utf-8'. Other kwargs are passed to the `open`
        and `json.dump` functions.

    Examples
    --------
    >>> data = {"name": "John", "age": 30, "city": "New York"}
    >>> to_json(data, 'path/to/file.json')
    # This will save the data in JSON format in the specified file path.

    >>> to_json(["apple", "banana", "cherry"], 'path/to/list.json', encoding='ascii')
    # Saves the list as JSON using ASCII encoding.

    Notes
    -----
    The function uses `json.dump` for serialization. Custom serialization
    can be handled by passing a custom `cls` parameter in `kwargs` if
    needed.

    Raises
    ------
    TypeError
        If the `data` cannot be serialized to JSON.
    OSError
        If there is an issue writing to the file.
    """
    encoding = kwargs.pop("encoding", "utf-8")
    func_kwargs, other_kwargs = get_function_kwargs(open, **kwargs)
    if not is_running_on_databricks():
        with open(path, "w", encoding=encoding, **func_kwargs) as fp:
            func_kwargs, _ = get_function_kwargs(json.dump, **other_kwargs)
            return json.dump(data, fp, **func_kwargs)
    dbutils = get_dbutils()
    filename = Path(path).name
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    dump_path = "/dbfs/tmp/"
    with open(os.path.join(dump_path, filename), "w", encoding=encoding,
              **func_kwargs) as fp:
        func_kwargs, _ = get_function_kwargs(json.dump, **other_kwargs)
        json.dump(data, fp, **func_kwargs)
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

def to_csv(data: pd.DataFrame, path: str | Path, **kwargs: Any):
    """
    Save a `pandas.DataFrame` as a CSV file locally or to DataBricks.

    The function automatically detects whether the code is being executed
    locally or inside DataBricks, and saves the CSV file accordingly.

    Parameters
    ----------
    data : pd.DataFrame
        Pandas DataFrame to save as a CSV file.
    path : str | Path
        Where to save the resulting CSV file.
    kwargs : Any
        Keyword arguments to pass to the `pandas.DataFrame.to_csv` method.
        On DataBricks, `sep` defaults to ';', `encoding` to 'ISO-8859-1',
        and `index` to False unless they are explicitly specified.
    """
    if not is_running_on_databricks():
        return data.to_csv(path, **kwargs)
    dump_path = "/dbfs/tmp/"
    filename = Path(path).name
    sep = kwargs.pop("sep", ";")
    encoding = kwargs.pop("encoding", "ISO-8859-1")
    index = kwargs.pop("index", False)
    data.to_csv(
        os.path.join(dump_path, filename),
        sep=sep,
        encoding=encoding,
        index=index,
        **kwargs,
    )
    dbutils = get_dbutils()
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

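
# Example (illustrative path). On DataBricks this function falls back to
# `sep=";"`, `encoding="ISO-8859-1"` and `index=False`; pass them explicitly
# to get identical output in both environments:
#
#     to_csv(df, "outputs/report.csv", sep=";", encoding="ISO-8859-1",
#            index=False)
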
def to_text(data: str, path: str | Path, **kwargs: Any):
    """Save a string as a text file locally or to DataBricks.

    Parameters
    ----------
    data : str
        The value to save as a text file.
    path : str | Path
        The file path where the text is to be saved. It can be either
        a string or a `pathlib.Path` object.
    kwargs : Any
        Additional keyword arguments. 'mode' defaults to 'w' and
        'encoding' to 'utf-8'; the rest are passed to the `open` function.
    """
    mode = kwargs.pop("mode", "w")
    encoding = kwargs.pop("encoding", "utf-8")
    if not is_running_on_databricks():
        with open(path, mode, encoding=encoding, **kwargs) as fp:
            return fp.write(data)
    dbutils = get_dbutils()
    filename = Path(path).name
    path = str(path)
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    dump_path = "/dbfs/tmp/"
    with open(os.path.join(dump_path, filename), mode, encoding=encoding,
              **kwargs) as fp:
        fp.write(data)
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)

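
# Round-trip sketch (path is illustrative):
#
#     to_text("hello", "outputs/notes.txt")
#     assert read_text("outputs/notes.txt") == "hello"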