"""
Module with I/O operations that work locally and inside DataBricks interchangeably.
The functions in this module are intended to be used instead of the native Python
functions to read and write files, or `pandas` I/O functions, such as
`pandas.DataFrame.to_csv`, `pandas.DataFrame.to_excel`, and `pandas.read_csv`,
etc.
In other words, instead of using:
.. code-block:: python
import pandas as pd
df = pd.read_csv('path/to/file.csv')
Use:
.. code-block:: python
from wip.datatools.io_ops import read_csv
df = read_csv('path/to/file.csv')
The same logic applies to other functions, such as `pandas.DataFrame.to_csv`.
Notes
-----
All functions inside this module should be able to handle both local and
ABFS filepaths. In other words, they should be able to handle both:
.. code-block:: python
from wip.datatools.io_ops import read_csv
df = read_csv('path/to/file.csv')
# Or:
df = read_csv('abfss://insight@usazu1valesa001.dfs.core.windows.net/path/to/file.csv')
"""
from __future__ import annotations
import json
import os
import pickle
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import Dict
from typing import Tuple
import joblib
import pandas as pd
import pulp
from wip.constants import DATASETS_FILEPATH
from wip.constants import DF_SQL_FILEPATH
from wip.utils import get_dbutils
from wip.utils import get_function_kwargs
from wip.utils import get_spark_context
from wip.utils import is_running_on_databricks
NoneType = type(None)


def read_joblib(path: str | Path) -> Any:
    """
    Read `.joblib` extension files from a local directory or Databricks.

    The function automatically determines whether the code is being executed
    locally or inside Databricks, and reads the file accordingly.

    Parameters
    ----------
    path : str | Path
        The path to the `.joblib` extension file.

    Returns
    -------
    Any
        The `.joblib` file contents.
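
    Examples
    --------
    A minimal usage sketch (the path below is illustrative, not a real file):

    >>> model = read_joblib("path/to/model.joblib")
    # Works the same with an `abfss://` path when running inside Databricks.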
"""
    if not is_running_on_databricks():
        return joblib.load(path)
    _sc = get_spark_context()
    # Read the file as raw bytes through Spark, then deserialize in memory.
    binary_data = _sc.binaryFiles(str(path)).collect()
    return joblib.load(BytesIO(binary_data[0][1]))


def to_joblib(obj: object, path: str | Path, **kwargs: Any):
    """
    Save an object as a joblib file locally or to Databricks.

    The function automatically detects whether the code is being executed
    locally or inside Databricks, and saves the object as a joblib file
    accordingly.

    Parameters
    ----------
    obj : object
        The object to save as a joblib file.
    path : str | Path
        Where to save the resulting joblib file.
    kwargs : Any
        Keyword arguments to pass to the `joblib.dump` function.
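
    Examples
    --------
    A minimal usage sketch (the object and path are illustrative):

    >>> to_joblib({"alpha": 1.5}, "path/to/params.joblib")
    # On Databricks, an `abfss://` destination is also accepted.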
"""
    if not is_running_on_databricks():
        return joblib.dump(obj, path, **kwargs)
    # On Databricks: dump to the driver-local DBFS mount first, then copy
    # the file to its final destination (e.g., an ABFS container).
    dump_path = "/dbfs/tmp/"
    filename = Path(path).name
    joblib.dump(obj, os.path.join(dump_path, filename), **kwargs)
    dbutils = get_dbutils()
    path = str(path)
    # `pathlib` and similar tools may collapse "abfss://" to "abfss:/"; undo that.
    if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
        path = path.replace(r"abfss:/", r"abfss://")
    return dbutils.fs.cp(  # pylint: disable=E0602
        os.path.join(dump_path.split("/", 2)[-1], filename), path)


def read_local_datasets_df_sql(
) -> Tuple[Dict[str, pd.DataFrame], pd.DataFrame]:
    """Read the `datasets` and `df_sql` files from the local filesystem.

    Returns
    -------
    Tuple[Dict[str, pd.DataFrame], pd.DataFrame]
        A tuple with the contents of the `datasets` and `df_sql` files.

    Raises
    ------
    RuntimeError
        If the code is being executed inside Databricks.
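
    Examples
    --------
    A minimal usage sketch (only valid when running locally):

    >>> datasets, df_sql = read_local_datasets_df_sql()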
"""
if is_running_on_databricks():
raise RuntimeError(
"This function should not be executed on DataBricks.")
return read_joblib(DATASETS_FILEPATH), read_joblib(DF_SQL_FILEPATH)


def read_json(path: str | Path, **kwargs: Any) -> dict | list:
    """
    Read a JSON file and convert it to a Python object.

    This function works on both local and Databricks environments.

    Parameters
    ----------
    path : str | Path
        The file path where the JSON data is stored.
        It can be either a string or a `pathlib.Path` object.
    kwargs : Any
        Additional keyword arguments. If 'encoding' is not specified,
        it defaults to 'utf-8'.
        Other kwargs are passed to the `open` and `json.load` functions.

    Returns
    -------
    dict | list
        The JSON data converted to a Python object.
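
    Examples
    --------
    A minimal usage sketch (the path below is illustrative):

    >>> config = read_json("path/to/config.json")
    >>> isinstance(config, (dict, list))
    True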
"""
encoding = kwargs.pop("encoding", "utf-8")
func_kwargs, other_kwargs = get_function_kwargs(open, **kwargs)
if not is_running_on_databricks():
with open(path, "r", encoding=encoding, **func_kwargs) as fp:
func_kwargs, _ = get_function_kwargs(json.load, **other_kwargs)
return json.load(fp, **func_kwargs)
dbutils = get_dbutils()
dump_path = "/dbfs/tmp/"
filename = Path(path).name
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
dbutils.fs.cp( # pylint: disable=E0602
path, os.path.join(dump_path.split("/", 2)[-1], filename))
with open(os.path.join(dump_path, filename),
"r",
encoding=encoding,
**func_kwargs) as fp:
        func_kwargs, _ = get_function_kwargs(json.load, **other_kwargs)
return json.load(fp, **func_kwargs)


def read_csv(path: str | Path, **kwargs: Any) -> pd.DataFrame:
    """
    Read a CSV file and convert it to a `pandas.DataFrame`.

    This function works on both local and Databricks environments.

    Parameters
    ----------
    path : str | Path
        The file path where the CSV data is stored.
        It can be either a string or a `pathlib.Path` object.
    kwargs : Any
        Additional keyword arguments passed to the `pandas.read_csv` function.

    Returns
    -------
    pd.DataFrame
        The CSV data converted to a `pandas.DataFrame`.
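
    Examples
    --------
    A minimal usage sketch (the path below is illustrative):

    >>> df = read_csv("path/to/file.csv", sep=";")
    # Extra keyword arguments such as `sep` go straight to `pandas.read_csv`.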
"""
if not is_running_on_databricks():
return pd.read_csv(path, **kwargs)
dbutils = get_dbutils()
dump_path = "/dbfs/tmp/"
filename = Path(path).name
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
dbutils.fs.cp( # pylint: disable=E0602
path, os.path.join(dump_path.split("/", 2)[-1], filename))
return pd.read_csv(os.path.join(dump_path, filename), **kwargs)


def read_text(path: str | Path,
              mode: str = "r",
              encoding: str = "utf-8",
              **kwargs: Any) -> str:
    """
    Read a text file and convert it to a string.

    This function works on both local and Databricks environments.

    Parameters
    ----------
    path : str | Path
        The file path where the text data is stored.
        It can be either a string or a `pathlib.Path` object.
    mode : str, default="r"
        The mode in which the file is opened.
        Possible values are: 'r' and 'r+'.
    encoding : str, default="utf-8"
        The encoding to use to read the file.
        Encoding ensures that the file is read correctly.
        Possible values are: 'utf-8', 'utf-16', 'latin-1', etc.
        See the Notes section for more information.
    kwargs : Any
        Additional keyword arguments passed to the `open` function.

    Returns
    -------
    str
        The text data converted to a string.

    Notes
    -----
    For a list of standard encodings that Python supports, see:
    https://docs.python.org/3.11/library/codecs.html#standard-encodings
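
    Examples
    --------
    A minimal usage sketch (the path below is illustrative):

    >>> sql_query = read_text("path/to/query.sql")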
"""
if not is_running_on_databricks():
with open(Path(path).as_posix(), mode, encoding=encoding,
**kwargs) as fp:
return fp.read()
dbutils = get_dbutils()
dump_path = "/dbfs/tmp/"
filename = Path(path).name
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
dbutils.fs.cp( # pylint: disable=E0602
path, os.path.join(dump_path.split("/", 2)[-1], filename))
with open(os.path.join(dump_path, filename),
mode,
encoding=encoding,
**kwargs) as fp:
return fp.read()


def write_lp(prob: pulp.LpProblem, key: str, tmp_path: str | Path) -> Any:
    """Write the linear programming problem to an `.lp` file.

    Parameters
    ----------
    prob : pulp.LpProblem
        The linear programming problem.
    key : str
        The key that identifies the problem. It is used to build the output
        filename, with any occurrence of 'restricoes' replaced by 'modelo'.
    tmp_path : str | Path
        The path to the temporary directory. The file is written inside a
        'lpfiles' subdirectory of this path.
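
    Examples
    --------
    A minimal usage sketch (the problem, key, and path are illustrative):

    >>> prob = pulp.LpProblem("example", sense=pulp.LpMinimize)
    >>> write_lp(prob, "restricoes_700-750", "/tmp")
    # Writes the model to '/tmp/lpfiles/modelo_700-750.lp'.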
"""
lp_files_path = Path(tmp_path).joinpath('lpfiles')
_lp_files_path = lp_files_path.joinpath(key.replace(
'restricoes', 'modelo')).with_suffix(".lp")
if not is_running_on_databricks():
lp_files_path.mkdir(parents=True, exist_ok=True)
return prob.writeLP(str(_lp_files_path))
dbutils = get_dbutils()
dump_path = "/dbfs/tmp/"
filename = Path(_lp_files_path).name
path = str(_lp_files_path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
prob.writeLP(os.path.join(dump_path, filename))
return dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split("/", 2)[-1], filename), path
)


def to_lp(prob: pulp.LpProblem, path: str | Path):
    """Write a linear programming problem to an `.lp` file.

    Parameters
    ----------
    prob : pulp.LpProblem
        The linear programming problem.
    path : str | Path
        The path to the `.lp` file. The suffix is replaced with '.lp'
        if the path has a different one.
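
    Examples
    --------
    A minimal usage sketch (the problem and path are illustrative):

    >>> prob = pulp.LpProblem("example", sense=pulp.LpMinimize)
    >>> to_lp(prob, "./700-750.lp")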
"""
path = Path(path).with_suffix(".lp")
if not is_running_on_databricks():
return prob.writeLP(str(path))
dbutils = get_dbutils()
dump_path = "/dbfs/tmp/"
filename = path.name
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
prob.writeLP(os.path.join(dump_path, filename))
return dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split("/", 2)[-1], filename), path)


def to_mps(prob: pulp.LpProblem, path: str | Path):
    """Write a linear programming problem to an `.mps` file.

    Parameters
    ----------
    prob : pulp.LpProblem
        The linear programming problem.
    path : str | Path
        The path to the `.mps` file. The suffix is replaced with '.mps'
        if the path has a different one.
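
    Examples
    --------
    Save the optimization model as an `.mps` file to a container inside Azure
    (the model is a dummy instance; saving to the storage account requires
    running inside Databricks):

    >>> datalake_path = "abfss://insight@usazu1valesa001.dfs.core.windows.net/Workarea/Pelletizing/Process_Optimization/Usina08/otimizacao"
    >>> prob = pulp.LpProblem("example", sense=pulp.LpMinimize)
    >>> to_mps(prob, f"{datalake_path}/700-750.mps")

    Or save it to the current working directory:

    >>> to_mps(prob, "./700-750.mps")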
"""
path = Path(path).with_suffix(".mps")
if not is_running_on_databricks():
return prob.writeMPS(str(path))
dbutils = get_dbutils()
dump_path = "/dbfs/tmp/"
filename = path.name
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
prob.writeMPS(os.path.join(dump_path, filename))
return dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split("/", 2)[-1], filename), path)


def to_pickle(obj: object, path: str | Path, **kwargs: Any):
    """Save an object as a pickle file locally or to Databricks.

    Parameters
    ----------
    obj : object
        The object to save as a pickle file.
    path : str | Path
        The file path where the object is to be saved as pickle.
        It can be either a string or a `pathlib.Path` object.
    kwargs : Any
        Additional keyword arguments.
        This function saves objects in binary mode ("wb"), so no
        'encoding' should be specified, as this mode does not take one.
        Other kwargs are passed to the `open` and `pickle.dump` functions.
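
    Examples
    --------
    A minimal usage sketch (the object and path are illustrative):

    >>> to_pickle({"alpha": 1.5}, "path/to/params.pkl")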
"""
func_kwargs, other_kwargs = get_function_kwargs(open, **kwargs)
if not is_running_on_databricks():
with open(path, "wb", **func_kwargs) as fp:
func_kwargs, _ = get_function_kwargs(pickle.dump, **other_kwargs)
return pickle.dump(obj, fp, **func_kwargs)
dbutils = get_dbutils()
filename = Path(path).name
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
dump_path = "/dbfs/tmp/"
with open(os.path.join(dump_path, filename), "wb", **func_kwargs) as fp:
func_kwargs, _ = get_function_kwargs(pickle.dump, **other_kwargs)
pickle.dump(obj, fp, **func_kwargs)
return dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split("/", 2)[-1], filename), path)


def to_excel(data: pd.DataFrame, path: str | Path, **kwargs: Any):
    """
    Save a `pandas.DataFrame` as an Excel file locally or to Databricks.

    The function automatically detects whether the code is being executed
    locally or inside Databricks, and saves the Excel file accordingly.

    Parameters
    ----------
    data : pd.DataFrame
        Pandas DataFrame to save as an Excel file.
    path : str | Path
        Where to save the resulting Excel file.
    kwargs : Any
        Keyword arguments to pass to the `pandas.DataFrame.to_excel` method.
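
    Examples
    --------
    A minimal usage sketch (the DataFrame and path are illustrative):

    >>> df = pd.DataFrame({"a": [1, 2]})
    >>> to_excel(df, "path/to/report.xlsx", index=False)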
"""
if not is_running_on_databricks():
return data.to_excel(path, **kwargs)
dump_path = '/dbfs/tmp/'
filename = Path(path).name
    data.to_excel(os.path.join(dump_path, filename), **kwargs)
dbutils = get_dbutils()
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
return dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split('/', 2)[-1], filename), path)


def to_json(
        data: dict | list | str | int | float | bool | NoneType,
        path: str | Path,
        **kwargs: Any,
):
    """Convert and save data to a JSON file.

    This function takes various data types, converts them into JSON format,
    and writes them to the file specified by `path`. Additional keyword
    arguments are passed on to the `open` and `json.dump` functions.

    Parameters
    ----------
    data : dict | list | str | int | float | bool | NoneType
        The data to be converted to JSON. This can be a dictionary, list,
        string, integer, float, boolean, or None.
    path : str | Path
        The file path where the JSON data should be stored. Can be a string
        or a `pathlib.Path` object.

    Other Parameters
    ----------------
    **kwargs : Any
        Additional keyword arguments. If 'encoding' is not specified,
        it defaults to 'utf-8'.
        Other kwargs are passed to the `open` and `json.dump` functions.

    Examples
    --------
    >>> data = {"name": "John", "age": 30, "city": "New York"}
    >>> to_json(data, 'path/to/file.json')
    # This will save the data in JSON format in the specified file path.

    >>> to_json(["apple", "banana", "cherry"], 'path/to/list.json', encoding='ascii')
    # Saves the list as JSON in ASCII encoding.

    Notes
    -----
    The function uses `json.dump` for serialization.
    Custom serialization can be handled by passing a custom `cls`
    parameter in `kwargs` if needed.

    Raises
    ------
    TypeError
        If the `data` cannot be serialized to JSON.
    OSError
        If there is an issue writing to the file.
    """
encoding = kwargs.pop("encoding", "utf-8")
func_kwargs, other_kwargs = get_function_kwargs(open, **kwargs)
if not is_running_on_databricks():
with open(path, "w", encoding=encoding, **func_kwargs) as fp:
func_kwargs, _ = get_function_kwargs(json.dump, **other_kwargs)
return json.dump(data, fp, **func_kwargs)
dbutils = get_dbutils()
filename = Path(path).name
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
dump_path = "/dbfs/tmp/"
with open(os.path.join(dump_path, filename),
"w",
encoding=encoding,
**func_kwargs) as fp:
func_kwargs, _ = get_function_kwargs(json.dump, **other_kwargs)
json.dump(data, fp, **func_kwargs)
return dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split("/", 2)[-1], filename), path)


def to_csv(data: pd.DataFrame, path: str | Path, **kwargs: Any):
    """
    Save a `pandas.DataFrame` as a CSV file locally or to Databricks.

    The function automatically detects whether the code is being executed
    locally or inside Databricks, and saves the CSV file accordingly.

    Parameters
    ----------
    data : pd.DataFrame
        Pandas DataFrame to save as a CSV file.
    path : str | Path
        Where to save the resulting CSV file.
    kwargs : Any
        Keyword arguments to pass to the `pandas.DataFrame.to_csv` method.
        When running on Databricks, `sep`, `encoding`, and `index` default
        to ';', 'ISO-8859-1', and False, respectively.
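
    Examples
    --------
    A minimal usage sketch (the DataFrame and path are illustrative):

    >>> df = pd.DataFrame({"a": [1, 2]})
    >>> to_csv(df, "path/to/file.csv", index=False)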
"""
if not is_running_on_databricks():
return data.to_csv(path, **kwargs)
dump_path = '/dbfs/tmp/'
filename = Path(path).name
    # Databricks-specific defaults; the local branch above uses pandas' own.
    sep = kwargs.pop("sep", ";")
    encoding = kwargs.pop("encoding", "ISO-8859-1")
    index = kwargs.pop("index", False)
data.to_csv(
os.path.join(dump_path, filename),
sep=sep,
encoding=encoding,
index=index,
**kwargs,
)
dbutils = get_dbutils()
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
return dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split('/', 2)[-1], filename), path)


def to_text(data: str, path: str | Path, **kwargs: Any):
    """Save a string as a text file locally or to Databricks.

    Parameters
    ----------
    data : str
        The value to save as a text file.
    path : str | Path
        The file path where the object is to be saved as text.
        It can be either a string or a `pathlib.Path` object.
    kwargs : Any
        Additional keyword arguments passed to the `open` function.
        If 'mode' is not specified, it defaults to 'w'; if 'encoding'
        is not specified, it defaults to 'utf-8'.
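
    Examples
    --------
    A minimal usage sketch (the path below is illustrative):

    >>> to_text("hello, world", "path/to/notes.txt")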
"""
mode = kwargs.pop("mode", "w")
encoding = kwargs.pop("encoding", "utf-8")
if not is_running_on_databricks():
with open(path, mode, encoding=encoding, **kwargs) as fp:
return fp.write(data)
dbutils = get_dbutils()
filename = Path(path).name
path = str(path)
if path.startswith(r"abfss:/") and not path.startswith(r"abfss://"):
path = path.replace(r"abfss:/", r"abfss://")
dump_path = "/dbfs/tmp/"
with open(os.path.join(dump_path, filename),
mode,
encoding=encoding,
**kwargs) as fp:
fp.write(data)
return dbutils.fs.cp( # pylint: disable=E0602
os.path.join(dump_path.split("/", 2)[-1], filename), path)