import copy
from typing import Any, Callable, Dict, List, Optional, no_type_check
from triad import ParamDict, to_uuid
from triad.collections import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import get_caller_global_local_vars, to_function, to_instance
from fugue._utils.interfaceless import parse_output_schema_from_comment
from fugue._utils.registry import fugue_plugin
from fugue.dataframe import DataFrame, DataFrames
from fugue.dataframe.function_wrapper import DataFrameFunctionWrapper
from fugue.exceptions import FugueInterfacelessError
from fugue.extensions.processor.processor import Processor
from .._utils import (
load_namespace_extensions,
parse_validation_rules_from_comment,
to_validation_rules,
)
# Module-level alias registry: ``register_processor`` writes ``alias -> object``
# entries into it and ``parse_processor`` looks string inputs up in it.
_PROCESSOR_REGISTRY = ParamDict()
@fugue_plugin
def parse_processor(obj: Any) -> Any:
    """Resolve an object into something that can be converted to a Fugue
    :class:`~fugue.extensions.processor.processor.Processor`.

    This default implementation only understands registered aliases: when
    ``obj`` is a string that is present in the processor registry, the object
    registered under that alias is returned. Every other input is handed back
    unchanged.

    :param obj: the object to resolve
    :return: the registered object if ``obj`` is a registered alias, otherwise
      ``obj`` itself

    .. admonition:: Examples

        .. code-block:: python

            from fugue import Processor, parse_processor, FugueWorkflow
            from triad import to_uuid

            class My(Processor):
                def __init__(self, x):
                    self.x = x

                def process(self, dfs):
                    raise NotImplementedError

                def __uuid__(self) -> str:
                    return to_uuid(super().__uuid__(), self.x)

            @parse_processor.candidate(
                lambda x: isinstance(x, str) and x.startswith("-*"))
            def _parse(obj):
                return My(obj)

            dag = FugueWorkflow()
            dag.df([[0]], "a:int").process("-*abc")
            # ==  dag.df([[0]], "a:int").process(My("-*abc"))

            dag.run()
    """
    # The ``isinstance`` test runs first so that non-string inputs are never
    # used as registry keys.
    is_registered_alias = isinstance(obj, str) and obj in _PROCESSOR_REGISTRY
    return _PROCESSOR_REGISTRY[obj] if is_registered_alias else obj
[docs]
def register_processor(alias: str, obj: Any, on_dup: int = ParamDict.OVERWRITE) -> None:
    """Make a processor available under a string alias.

    :param alias: the name the processor is registered under
    :param obj: an object that can be converted to
      :class:`~fugue.extensions.processor.processor.Processor`
    :param on_dup: how an already registered ``alias`` is handled, see
      :meth:`triad.collections.dict.ParamDict.update`; defaults to
      ``ParamDict.OVERWRITE``

    .. tip::

        Aliases are especially handy for libraries: users neither have to
        import the concrete extension nor spell out its full path, which keeps
        their code less coupled and easier to read.

    .. admonition:: New Since
        :class: hint

        **0.6.0**

    .. seealso::

        Please read
        :doc:`Processor Tutorial <tutorial:tutorials/extensions/processor>`

    .. admonition:: Examples

        The following shows how a project can be set up so that its users
        benefit from aliases. Assume the project is called ``pn``.

        The processor lives in ``pn/pn/processors.py``:

        .. code-block:: python

            from fugue import DataFrame

            def my_processor(df:DataFrame) -> DataFrame:
                return df

        ``pn/pn/__init__.py`` registers it:

        .. code-block:: python

            from .processors import my_processor
            from fugue import register_processor

            def register_extensions():
                register_processor("mp", my_processor)
                # ... register more extensions

            register_extensions()

        And user code refers to it by alias:

        .. code-block:: python

            import pn  # register_extensions will be called
            from fugue import FugueWorkflow

            dag = FugueWorkflow()
            # use my_processor by alias
            dag.df([[0]],"a:int").process("mp").show()
            dag.run()
    """
    new_entry = {alias: obj}
    _PROCESSOR_REGISTRY.update(new_entry, on_dup=on_dup)
[docs]
def processor(
    schema: Any = None, **validation_rules: Any
) -> Callable[[Any], "_FuncAsProcessor"]:
    """Decorator factory that turns a function into a processor.

    :param schema: output schema forwarded to the conversion, defaults to None
    :param validation_rules: keyword arguments that are converted with
      ``to_validation_rules`` and attached to the resulting processor
    :return: a decorator converting the decorated function into a
      ``_FuncAsProcessor``

    Please read
    :doc:`Processor Tutorial <tutorial:tutorials/extensions/processor>`
    """
    # TODO: validation of schema if without * should be done at compile time

    def _as_processor(func: Callable) -> "_FuncAsProcessor":
        # Rules are converted on every application, so each decorated
        # function receives its own rules object.
        rules = to_validation_rules(validation_rules)
        return _FuncAsProcessor.from_func(func, schema, validation_rules=rules)

    return _as_processor
def _to_processor(
    obj: Any,
    schema: Any = None,
    global_vars: Optional[Dict[str, Any]] = None,
    local_vars: Optional[Dict[str, Any]] = None,
    validation_rules: Optional[Dict[str, Any]] = None,
) -> Processor:
    """Convert ``obj`` into a :class:`Processor` instance.

    ``obj`` is first passed through ``load_namespace_extensions`` and
    :func:`parse_processor`. Two conversions are then attempted in order:

    1. ``to_instance(obj, Processor, ...)``; on success a shallow copy of the
       instance is returned.
    2. ``to_function(obj, ...)``; if the result already is a ``Processor``
       (a decorated function referenced by a string expression) a shallow copy
       is returned, otherwise the function is wrapped with
       ``_FuncAsProcessor.from_func``.

    :param obj: the object to convert
    :param schema: output schema used when a plain function is wrapped,
      defaults to None
    :param global_vars: global variables used to resolve ``obj``, defaults to
      None
    :param local_vars: local variables used to resolve ``obj``, defaults to
      None
    :param validation_rules: rules used when a plain function is wrapped,
      defaults to None (treated as an empty dict)
    :raises FugueInterfacelessError: if both conversions fail; the exception of
      the second attempt is passed along as the second argument
    :return: the processor
    """
    # NOTE(review): presumably this helper fills in missing namespaces from the
    # calling frames, so it is deliberately invoked directly from this function
    # rather than from a nested helper -- confirm against triad.
    global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars)
    load_namespace_extensions(obj)
    obj = parse_processor(obj)
    rules: Dict[str, Any] = {} if validation_rules is None else validation_rules
    failure: Optional[Exception] = None

    # Attempt 1: ``obj`` denotes a Processor class or instance.
    try:
        instance = to_instance(
            obj, Processor, global_vars=global_vars, local_vars=local_vars
        )
        return copy.copy(instance)
    except Exception as instance_error:
        failure = instance_error

    # Attempt 2: ``obj`` denotes a function.
    try:
        func = to_function(obj, global_vars=global_vars, local_vars=local_vars)
        if not isinstance(func, Processor):
            # this is for functions without decorator
            return _FuncAsProcessor.from_func(func, schema, validation_rules=rules)
        # this is for string expression of function with decorator
        return copy.copy(func)
    except Exception as function_error:
        failure = function_error

    raise FugueInterfacelessError(f"{obj} is not a valid processor", failure)
class _FuncAsProcessor(Processor):
    """Processor implementation backed by a plain function.

    Instances are meant to be created with :meth:`from_func`, which wraps the
    function in a ``DataFrameFunctionWrapper`` and records how it has to be
    called (engine parameter, whole ``DataFrames`` vs. individual dataframes,
    whether an output schema is required).
    """

    @property
    def validation_rules(self) -> Dict[str, Any]:
        """The validation rules collected by :meth:`from_func`."""
        return self._validation_rules  # type: ignore

    @no_type_check
    def process(self, dfs: DataFrames) -> DataFrame:
        """Call the wrapped function with ``dfs`` and the processor params.

        :param dfs: the input dataframes
        :return: whatever the function wrapper's ``run`` returns for the call
        """
        args: List[Any] = []
        kwargs: Dict[str, Any] = {}
        if self._engine_param is not None:
            # The engine parameter, when declared, is the first positional
            # argument of the wrapped function.
            args.append(self._engine_param.to_input(self.execution_engine))
        if self._use_dfs:
            # The function takes the whole collection as a single argument.
            args.append(dfs)
        else:
            # Otherwise spread the dataframes: positionally when the
            # collection has no keys, by name when it does.
            if not dfs.has_key:
                args += dfs.values()
            else:
                kwargs.update(dfs)
        kwargs.update(self.params)
        return self._wrapper.run(
            args=args,
            kwargs=kwargs,
            output_schema=self.output_schema if self._need_output_schema else None,
            ctx=self.execution_engine,
        )

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Call the wrapped function directly with the given arguments."""
        return self._wrapper(*args, **kwargs)  # type: ignore

    @no_type_check
    def __uuid__(self) -> str:
        """Id derived from the wrapper, the call convention and the schema."""
        return to_uuid(
            self._wrapper,
            self._engine_param,
            self._use_dfs,
            self._need_output_schema,
            str(self._output_schema),
        )

    @no_type_check
    @staticmethod
    def from_func(
        func: Callable, schema: Any, validation_rules: Dict[str, Any]
    ) -> "_FuncAsProcessor":
        """Build a processor around ``func``.

        :param func: the function to wrap
        :param schema: the output schema; when None it is parsed from the
          comments of ``func``
        :param validation_rules: explicitly provided rules; rules parsed from
          the comments of ``func`` are merged on top of them. The dict passed
          in is not modified.
        :raises FugueInterfacelessError: if the resulting schema is empty while
          the wrapped function needs an output schema, or non-empty while it
          must not have one
        :return: the processor
        """
        if schema is None:
            schema = parse_output_schema_from_comment(func)
        # Merge into a copy: updating the argument in place would leak the
        # comment-derived rules into the caller's dict.
        validation_rules = dict(validation_rules)
        validation_rules.update(parse_validation_rules_from_comment(func))
        tr = _FuncAsProcessor()
        # The two patterns constrain the wrapper's input and output codes.
        # This method relies on a leading "e" meaning an engine parameter and
        # on "c" meaning the whole DataFrames collection (see below).
        tr._wrapper = DataFrameFunctionWrapper(
            func, "^e?(c|[dlspq]+)x*z?$", "^[dlspq]$"
        )  # type: ignore
        tr._engine_param = (
            tr._wrapper._params.get_value_by_index(0)
            if tr._wrapper.input_code.startswith("e")
            else None
        )
        tr._use_dfs = "c" in tr._wrapper.input_code
        tr._need_output_schema = tr._wrapper.need_output_schema
        tr._validation_rules = validation_rules
        tr._output_schema = Schema(schema)
        # ``_need_output_schema`` may be None, in which case both an empty and
        # a non-empty schema are accepted.
        if len(tr._output_schema) == 0:
            assert_or_throw(
                tr._need_output_schema is None or not tr._need_output_schema,
                FugueInterfacelessError(
                    f"schema must be provided for return type {tr._wrapper._rt}"
                ),
            )
        else:
            assert_or_throw(
                tr._need_output_schema is None or tr._need_output_schema,
                FugueInterfacelessError(
                    f"schema must not be provided for return type {tr._wrapper._rt}"
                ),
            )
        return tr