Source code for fugue.extensions.creator.convert

import copy
from typing import Any, Callable, Dict, List, Optional, no_type_check

from triad import ParamDict
from triad.collections import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import get_caller_global_local_vars, to_function, to_instance
from triad.utils.hash import to_uuid

from fugue._utils.interfaceless import parse_output_schema_from_comment
from fugue._utils.registry import fugue_plugin
from fugue.dataframe import DataFrame
from fugue.dataframe.function_wrapper import DataFrameFunctionWrapper
from fugue.exceptions import FugueInterfacelessError
from fugue.extensions.creator.creator import Creator

from .._utils import load_namespace_extensions

_CREATOR_REGISTRY = ParamDict()


@fugue_plugin
def parse_creator(obj: Any) -> Any:
    """Parse an object to another object that can be converted to a Fugue
    :class:`~fugue.extensions.creator.creator.Creator`.

    .. admonition:: Examples

        .. code-block:: python

            from fugue import Creator, FugueWorkflow
            from fugue.plugins import parse_creator
            from triad import to_uuid

            class My(Creator):
                def __init__(self, x):
                    self.x = x

                def create(self) :
                    raise NotImplementedError

                def __uuid__(self) -> str:
                    return to_uuid(super().__uuid__(), self.x)

            @parse_creator.candidate(
                lambda x: isinstance(x, str) and x.startswith("-*"))
            def _parse(obj):
                return My(obj)

            dag = FugueWorkflow()
            dag.create("-*abc").show()
            # == dag.create(My("-*abc")).show()

            dag.run()
    """
    if isinstance(obj, str) and obj in _CREATOR_REGISTRY:
        return _CREATOR_REGISTRY[obj]
    return obj


[docs] def register_creator(alias: str, obj: Any, on_dup: int = ParamDict.OVERWRITE) -> None: """Register creator with an alias. This is a simplified version of :func:`~.parse_creator` :param alias: alias of the creator :param obj: the object that can be converted to :class:`~fugue.extensions.creator.creator.Creator` :param on_dup: see :meth:`triad.collections.dict.ParamDict.update` , defaults to ``ParamDict.OVERWRITE`` .. tip:: Registering an extension with an alias is particularly useful for projects such as libraries. This is because by using alias, users don't have to import the specific extension, or provide the full path of the extension. It can make user's code less dependent and easy to understand. .. admonition:: New Since :class: hint **0.6.0** .. seealso:: Please read :doc:`Creator Tutorial <tutorial:tutorials/extensions/creator>` .. admonition:: Examples Here is an example how you setup your project so your users can benefit from this feature. Assume your project name is ``pn`` The creator implementation in file ``pn/pn/creators.py`` .. code-block:: python import pandas import pd def my_creator() -> pd.DataFrame: return pd.DataFrame() Then in ``pn/pn/__init__.py`` .. code-block:: python from .creators import my_creator from fugue import register_creator def register_extensions(): register_creator("mc", my_creator) # ... register more extensions register_extensions() In users code: .. code-block:: python import pn # register_extensions will be called from fugue import FugueWorkflow dag = FugueWorkflow() dag.create("mc").show() # use my_creator by alias dag.run() """ _CREATOR_REGISTRY.update({alias: obj}, on_dup=on_dup)
[docs] def creator(schema: Any = None) -> Callable[[Any], "_FuncAsCreator"]: """Decorator for creators Please read :doc:`Creator Tutorial <tutorial:tutorials/extensions/creator>` """ def deco(func: Callable) -> "_FuncAsCreator": return _FuncAsCreator.from_func(func, schema) return deco
def _to_creator( obj: Any, schema: Any = None, global_vars: Optional[Dict[str, Any]] = None, local_vars: Optional[Dict[str, Any]] = None, ) -> Creator: global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars) load_namespace_extensions(obj) obj = parse_creator(obj) exp: Optional[Exception] = None try: return copy.copy( to_instance(obj, Creator, global_vars=global_vars, local_vars=local_vars) ) except Exception as e: exp = e try: f = to_function(obj, global_vars=global_vars, local_vars=local_vars) # this is for string expression of function with decorator if isinstance(f, Creator): return copy.copy(f) # this is for functions without decorator return _FuncAsCreator.from_func(f, schema) except Exception as e: exp = e raise FugueInterfacelessError(f"{obj} is not a valid creator", exp) class _FuncAsCreator(Creator): @no_type_check def create(self) -> DataFrame: args: List[Any] = [] kwargs: Dict[str, Any] = {} if self._engine_param is not None: args.append(self._engine_param.to_input(self.execution_engine)) kwargs.update(self.params) return self._wrapper.run( # type: ignore args=args, kwargs=kwargs, output_schema=self.output_schema if self._need_output_schema else None, ctx=self.execution_engine, ) def __call__(self, *args: Any, **kwargs: Any) -> Any: return self._wrapper(*args, **kwargs) # type: ignore @no_type_check def __uuid__(self) -> str: return to_uuid( self._wrapper, self._engine_param, self._need_output_schema, str(self._output_schema), ) @no_type_check @staticmethod def from_func(func: Callable, schema: Any) -> "_FuncAsCreator": # pylint: disable=W0201 if schema is None: schema = parse_output_schema_from_comment(func) tr = _FuncAsCreator() tr._wrapper = DataFrameFunctionWrapper( # type: ignore func, "^e?x*z?$", "^[dlspq]$" ) tr._engine_param = ( tr._wrapper._params.get_value_by_index(0) if tr._wrapper.input_code.startswith("e") else None ) tr._need_output_schema = tr._wrapper.need_output_schema tr._output_schema = Schema(schema) if len(tr._output_schema) == 0: assert_or_throw( tr._need_output_schema is None or not tr._need_output_schema, FugueInterfacelessError( f"schema must be provided for return type {tr._wrapper._rt}" ), ) else: assert_or_throw( tr._need_output_schema is None or tr._need_output_schema, FugueInterfacelessError( f"schema must not be provided for return type {tr._wrapper._rt}" ), ) return tr