fugue.extensions.processor#
fugue.extensions.processor.convert#
- fugue.extensions.processor.convert.processor(schema=None, **validation_rules)[source]#
Decorator for processors
Please read Processor Tutorial
- Parameters
schema (Optional[Any]) –
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsProcessor]
- fugue.extensions.processor.convert.register_processor(alias, obj, on_dup=0)[source]#
Register processor with an alias.
- Parameters
alias (str) – alias of the processor
obj (Any) – the object that can be converted to
Processor
on_dup (int) – see
triad.collections.dict.ParamDict.update()
, defaults toParamDict.OVERWRITE
- Return type
None
Tip
Registering an extension with an alias is particularly useful for projects such as libraries. This is because by using alias, users don’t have to import the specific extension, or provide the full path of the extension. It can make user’s code less dependent and easy to understand.
New Since
0.6.0
See also
Please read Processor Tutorial
Examples
Here is an example how you setup your project so your users can benefit from this feature. Assume your project name is
pn
The processor implementation in file
pn/pn/processors.py
from fugue import DataFrame def my_processor(df:DataFrame) -> DataFrame: return df
Then in
pn/pn/__init__.py
from .processors import my_processor from fugue import register_processor def register_extensions(): register_processor("mp", my_processor) # ... register more extensions register_extensions()
In users code:
import pn # register_extensions will be called from fugue import FugueWorkflow dag = FugueWorkflow() # use my_processor by alias dag.df([[0]],"a:int").process("mp").show() dag.run()
fugue.extensions.processor.processor#
- class fugue.extensions.processor.processor.Processor[source]#
Bases:
ExtensionContext
,ABC
The interface to process one or multiple incoming dataframes and return one DataFrame. For example dropping a column of df should be a type of Processor. Processor is task level extension, running on driver, and execution engine aware.
To implement this class, you should not have
__init__
, please directly implement the interface functions.Note
Before implementing this class, do you really need to implement this interface? Do you know the interfaceless feature of Fugue? Implementing Processor is commonly unnecessary. You can choose the interfaceless approach which may decouple your code from Fugue.
See also
Please read Processor Tutorial
- abstract process(dfs)[source]#
Process the collection of dataframes on driver side
Note
It runs on driver side
The dataframes are not necessarily local, for example a SparkDataFrame
It is engine aware, you can put platform dependent code in it (for example native pyspark code) but by doing so your code may not be portable. If you only use the functions of the general ExecutionEngine, it’s still portable.
- Parameters
dfs (DataFrames) – dataframe collection to process
- Returns
the result dataframe
- Return type