# Source code for fugue.workflow.api
from typing import Any, Dict, List, Optional
from triad.utils.assertion import assert_or_throw
from ..collections.yielded import Yielded
from ..constants import FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT
from ..dataframe import DataFrame, AnyDataFrame
from ..dataframe.api import get_native_as_df
from ..exceptions import FugueInterfacelessError, FugueWorkflowCompileError
from ..execution import make_execution_engine
from .workflow import FugueWorkflow


def _check_valid_input(df: Any, save_path: Optional[str]) -> None:
    # only parquet paths are valid for string inputs and outputs;
    # csv and json paths are rejected
if isinstance(df, str):
assert_or_throw(
(".csv" not in df) and (".json" not in df),
FugueInterfacelessError(
"""Fugue transform can only load parquet file paths.
Csv and json are disallowed"""
),
)
if save_path:
assert_or_throw(
(".csv" not in save_path) and (".json" not in save_path),
FugueInterfacelessError(
"""Fugue transform can only load parquet file paths.
Csv and json are disallowed"""
),
)
def transform( # noqa: C901
df: Any,
using: Any,
schema: Any = None,
params: Any = None,
partition: Any = None,
callback: Any = None,
ignore_errors: Optional[List[Any]] = None,
persist: bool = False,
as_local: bool = False,
save_path: Optional[str] = None,
checkpoint: bool = False,
engine: Any = None,
engine_conf: Any = None,
as_fugue: bool = False,
) -> Any:
"""Transform this dataframe using transformer. It's a wrapper of
:meth:`~fugue.workflow.workflow.FugueWorkflow.transform` and
:meth:`~fugue.workflow.workflow.FugueWorkflow.run`. It will let you do
the basic dataframe transformation without using
:class:`~fugue.workflow.workflow.FugueWorkflow` and
:class:`~fugue.dataframe.dataframe.DataFrame`. Also, only native
types are accepted for both input and output.
Please read |TransformerTutorial|
    :param df: |DataFrameLikeObject| or
        :class:`~fugue.collections.yielded.Yielded`,
        or a path string to a parquet file
:param using: transformer-like object, can't be a string expression
:param schema: |SchemaLikeObject|, defaults to None. The transformer
will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.output_schema`
:param params: |ParamsLikeObject| to run the processor, defaults to None.
The transformer will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.params`
:param partition: |PartitionLikeObject|, defaults to None
:param callback: |RPCHandlerLikeObject|, defaults to None
:param ignore_errors: list of exception types the transformer can ignore,
defaults to None (empty list)
    :param engine: it can be an empty string or ``None`` (use the default
        execution engine), a string (use the registered execution engine), an
        :class:`~fugue.execution.execution_engine.ExecutionEngine` type or
        instance, or a tuple of two values where the first value represents
        the execution engine and the second value represents the SQL engine
        (you can use ``None`` for either of them to use the default one),
        defaults to None
:param engine_conf: |ParamsLikeObject|, defaults to None
    :param as_fugue: if true, the function will always return a
        ``FugueDataFrame``; otherwise, if ``df`` is a native dataframe type
        such as a pandas dataframe, the output will also be returned in its
        native format. Defaults to False
    :param persist: whether to persist (materialize) the dataframe before
        returning
    :param as_local: if true, the result will be converted to a
        ``LocalDataFrame``
    :param save_path: the file path to save the output to (see the note),
        defaults to None
    :param checkpoint: whether to add a checkpoint for the output (see the
        note), defaults to False
    :return: the transformed dataframe. If ``df`` is a native dataframe (e.g.
        pd.DataFrame, Spark dataframe, etc.), the output will be a native
        dataframe whose type is determined by the execution engine you use;
        if ``df`` is of type :class:`~fugue.dataframe.dataframe.DataFrame`,
        then the output will also be a
        :class:`~fugue.dataframe.dataframe.DataFrame`

    .. note::

        This function may be lazy: depending on the execution engine, the
        returned dataframe may not be materialized until it is used.
    .. note::

        When you use a callback in this function, be careful that the output
        dataframe must be materialized. Otherwise, if the real compute
        happens outside of the function call, the callback receiver will
        already have shut down. To avoid this, use ``persist`` or
        ``as_local``; both will materialize the dataframe before the callback
        receiver shuts down. See the sketch below.
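
    .. admonition:: Callback example

        A minimal sketch, assuming a driver-side handler ``print_cb`` and a
        transformer ``tr`` that accepts a callable parameter (both names are
        illustrative, not part of this API):

        .. code-block:: python

            from typing import Callable

            import pandas as pd
            from fugue import transform

            def print_cb(msg: str) -> None:
                print(msg)

            def tr(df: pd.DataFrame, cb: Callable) -> pd.DataFrame:
                cb(f"partition rows: {len(df)}")
                return df

            pdf = pd.DataFrame([[0, 1], [2, 3]], columns=["a", "b"])
            # persist=True materializes the result while the callback
            # receiver is still alive
            transform(pdf, tr, schema="*", callback=print_cb, persist=True)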
    .. note::

        * When ``save_path`` is None and ``checkpoint`` is False, the output
          will not be saved into a file. The return will be a dataframe.
        * When ``save_path`` is None and ``checkpoint`` is True, the output
          is saved into the path set by ``fugue.workflow.checkpoint.path``,
          the file name is randomly chosen, and it is NOT a deterministic
          checkpoint, so if you run multiple times, the output will be saved
          into different files. The return will be a dataframe.
        * When ``save_path`` is not None and ``checkpoint`` is False, the
          output will be saved into ``save_path``. The return will be the
          value of ``save_path`` (see the examples below).
        * When ``save_path`` is not None and ``checkpoint`` is True, the
          output will be saved into ``save_path``. The return will be the
          dataframe loaded from ``save_path``.

        This function can only take parquet file paths in ``df`` and
        ``save_path``; CSV, JSON and other file formats are disallowed.

        The checkpoint here is NOT deterministic, so a re-run will generate
        new checkpoints.

        If you want to read or write other file formats, or if you want to
        use deterministic checkpoints, please use
        :class:`~fugue.workflow.workflow.FugueWorkflow`.
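
    .. admonition:: Examples

        A minimal sketch of typical usage, assuming pandas input;
        ``add_col`` and the file path are illustrative, not part of
        this API:

        .. code-block:: python

            import pandas as pd
            from fugue import transform

            def add_col(df: pd.DataFrame) -> pd.DataFrame:
                return df.assign(c=df["a"] + df["b"])

            pdf = pd.DataFrame([[0, 1], [2, 3]], columns=["a", "b"])

            # runs on the default engine; returns a pandas DataFrame
            # because the input is a pandas DataFrame
            result = transform(pdf, add_col, schema="*,c:long")

            # the same call on a registered engine, e.g. "duckdb"
            result = transform(pdf, add_col, schema="*,c:long", engine="duckdb")

            # save to a parquet file; the return value is the path itself
            path = transform(
                pdf, add_col, schema="*,c:long", save_path="/tmp/out.parquet"
            )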
"""
_check_valid_input(df, save_path)
dag = FugueWorkflow(compile_conf={FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 0})
try:
src = dag.create(df)
except FugueWorkflowCompileError:
if isinstance(df, str):
src = dag.load(df, fmt="parquet")
else:
raise
tdf = src.transform(
using=using,
schema=schema,
params=params,
pre_partition=partition,
callback=callback,
ignore_errors=ignore_errors or [],
)
if persist:
tdf = tdf.persist()
if checkpoint:
if save_path is None:
def _no_op_processor(df: DataFrame) -> DataFrame:
                # a no-op pass-through: forces the yielded file to be
                # loaded back and yielded again as a dataframe
return df
tdf.yield_file_as("file_result")
tdf.process(_no_op_processor).yield_dataframe_as(
"result", as_local=as_local
)
else:
tdf.save_and_use(save_path, fmt="parquet").yield_dataframe_as(
"result", as_local=as_local
)
else:
if save_path is None:
tdf.yield_dataframe_as("result", as_local=as_local)
else:
tdf.save(save_path, fmt="parquet")
dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
    if save_path is not None and not checkpoint:
        # the output was only saved; return the path itself
        return save_path
    result = dag.yields["result"].result  # type:ignore
if as_fugue or isinstance(df, (DataFrame, Yielded)):
return result
return result.as_pandas() if result.is_local else result.native # type:ignore
def out_transform(
df: Any,
using: Any,
params: Any = None,
partition: Any = None,
callback: Any = None,
ignore_errors: Optional[List[Any]] = None,
engine: Any = None,
engine_conf: Any = None,
) -> None:
"""Transform this dataframe using transformer. It's a wrapper of
:meth:`~fugue.workflow.workflow.FugueWorkflow.out_transform` and
:meth:`~fugue.workflow.workflow.FugueWorkflow.run`. It will let you do the
basic dataframe transformation without using
:class:`~fugue.workflow.workflow.FugueWorkflow` and
:class:`~fugue.dataframe.dataframe.DataFrame`. Only native types are
accepted for both input and output.
Please read |TransformerTutorial|
    :param df: |DataFrameLikeObject| or
        :class:`~fugue.collections.yielded.Yielded`,
        or a path string to a parquet file
:param using: transformer-like object, can't be a string expression
    :param params: |ParamsLikeObject| to run the processor, defaults to None.
        The transformer will be able to access this value from
        :meth:`~fugue.extensions.context.ExtensionContext.params`
:param partition: |PartitionLikeObject|, defaults to None
:param callback: |RPCHandlerLikeObject|, defaults to None
:param ignore_errors: list of exception types the transformer can ignore,
defaults to None (empty list)
    :param engine: it can be an empty string or ``None`` (use the default
        execution engine), a string (use the registered execution engine), an
        :class:`~fugue.execution.execution_engine.ExecutionEngine` type or
        instance, or a tuple of two values where the first value represents
        the execution engine and the second value represents the SQL engine
        (you can use ``None`` for either of them to use the default one),
        defaults to None
:param engine_conf: |ParamsLikeObject|, defaults to None

    .. note::

        This function can only take parquet file paths in ``df``; CSV and
        JSON file formats are disallowed.

        This transformation is guaranteed to execute immediately (eagerly)
        and returns nothing.
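
    .. admonition:: Examples

        A minimal sketch; ``log_counts`` is an illustrative output
        transformer, not part of this API:

        .. code-block:: python

            import pandas as pd
            from fugue import out_transform

            def log_counts(df: pd.DataFrame) -> None:
                # an output transformer returns nothing; it is used for
                # side effects such as logging or saving
                print(len(df))

            pdf = pd.DataFrame([[0, 1], [2, 3]], columns=["a", "b"])
            out_transform(pdf, log_counts)  # executes eagerly, returns None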
"""
dag = FugueWorkflow(compile_conf={FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 0})
try:
src = dag.create(df)
except FugueWorkflowCompileError:
if isinstance(df, str):
src = dag.load(df, fmt="parquet")
else:
raise
src.out_transform(
using=using,
params=params,
pre_partition=partition,
callback=callback,
ignore_errors=ignore_errors or [],
)
dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
def raw_sql(
*statements: Any,
engine: Any = None,
engine_conf: Any = None,
as_fugue: bool = False,
as_local: bool = False,
) -> AnyDataFrame:
"""Run raw SQL on the execution engine
:param statements: a sequence of sub-statements in string
or dataframe-like objects
:param engine: an engine like object, defaults to None
:param engine_conf: the configs for the engine, defaults to None
:param as_fugue: whether to force return a Fugue DataFrame, defaults to False
:param as_local: whether return a local dataframe, defaults to False
:return: the result dataframe

    .. caution::

        Currently, only ``SELECT`` statements are supported.

    .. admonition:: Examples

        .. code-block:: python

            import pandas as pd
            import fugue.api as fa

            with fa.engine_context("duckdb"):
                a = fa.as_fugue_df([[0, 1]], schema="a:long,b:long")
                b = pd.DataFrame([[0, 10]], columns=["a", "b"])
                c = fa.raw_sql("SELECT * FROM", a, "UNION SELECT * FROM", b)
                fa.as_pandas(c)
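
        Each non-string argument is registered once in the underlying
        workflow (deduplicated by object identity), so the same dataframe
        object can appear multiple times in the statement; the string
        fragments and dataframe references are then assembled into a single
        ``SELECT``.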
"""
dag = FugueWorkflow(compile_conf={FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 0})
sp: List[Any] = []
infer_by: List[Any] = []
inputs: Dict[int, Any] = {}
for x in statements:
if isinstance(x, str):
sp.append(x)
else:
if id(x) in inputs:
sp.append(inputs[id(x)])
else:
inputs[id(x)] = dag.create(x)
sp.append(inputs[id(x)])
infer_by.append(x)
engine = make_execution_engine(engine, engine_conf, infer_by=infer_by)
dag.select(*sp).yield_dataframe_as("result", as_local=as_local)
res = dag.run(engine)
return res["result"] if as_fugue else get_native_as_df(res["result"])