fugue.extensions.processor

fugue.extensions.processor.convert

fugue.extensions.processor.convert.processor(schema=None, **validation_rules)

Decorator for processors

Please read Processor Tutorial

Parameters:
  • schema (Any | None) –

  • validation_rules (Any) –

Return type:

Callable[[Any], _FuncAsProcessor]

fugue.extensions.processor.convert.register_processor(alias, obj, on_dup=0)

Register processor with an alias.

Parameters:
  • alias –

  • obj –

  • on_dup – defaults to 0

Return type:

None

Tip

Registering an extension with an alias is particularly useful for projects such as libraries. With an alias, users don’t have to import the specific extension or provide its full path. This makes users’ code less coupled to the extension’s import path and easier to understand.

New Since

0.6.0

See also

Please read Processor Tutorial

Examples

Here is an example of how to set up your project so that your users can benefit from this feature. Assume your project name is pn.

The processor implementation in the file pn/pn/processors.py:

from fugue import DataFrame

# A pass-through processor: returns the input dataframe unchanged
def my_processor(df: DataFrame) -> DataFrame:
    return df

Then in pn/pn/__init__.py:

from .processors import my_processor
from fugue import register_processor

def register_extensions():
    register_processor("mp", my_processor)
    # ... register more extensions

register_extensions()

In the user’s code:

import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_processor by alias
dag.df([[0]], "a:int").process("mp").show()
dag.run()
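To make the alias idea concrete, the sketch below shows a plain dictionary registry that resolves either an alias string or a callable. It is only an illustration of the concept and not Fugue's implementation: the names `_REGISTRY`, `register` and `resolve` are hypothetical, and the sketch simply overwrites an existing alias, which may or may not match what `on_dup=0` does in `register_processor`.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-in for an alias registry; not Fugue's actual code.
_REGISTRY: Dict[str, Callable[..., Any]] = {}


def register(alias: str, obj: Callable[..., Any]) -> None:
    """Map a short alias to a callable, replacing any earlier entry."""
    _REGISTRY[alias] = obj


def resolve(using: Any) -> Callable[..., Any]:
    """Return the callable for an alias, or pass a callable through unchanged."""
    if isinstance(using, str):
        if using not in _REGISTRY:
            raise KeyError(f"no extension registered under alias {using!r}")
        return _REGISTRY[using]
    if callable(using):
        return using
    raise TypeError(f"cannot resolve {using!r} to an extension")


def my_processor(rows: Any) -> Any:
    # Pass-through, mirroring the my_processor example above
    return rows


# The library registers the alias once, typically at import time
register("mp", my_processor)

# User code then refers to the extension by alias, with no import of my_processor
resolved = resolve("mp")
```

With a registry like this, user code depends only on the string "mp", so the library can move or rename the implementation without breaking its callers.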

fugue.extensions.processor.processor

class fugue.extensions.processor.processor.Processor

Bases: ExtensionContext, ABC

The interface to process one or more incoming dataframes and return one DataFrame. For example, dropping a column of df is the kind of operation a Processor performs. Processor is a task-level extension that runs on the driver and is execution engine aware.

To implement this class, do not define __init__; implement the interface functions directly.

Note

Before implementing this class, consider whether you really need this interface. Fugue has an interfaceless feature, so implementing Processor is usually unnecessary. The interfaceless approach may also decouple your code from Fugue.
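As a rough illustration of that decoupling, the sketch below writes the column-dropping logic as a plain annotated function with no Fugue import. The function name `drop_column` and its `column` parameter are hypothetical, and the sketch does not show how such a function is handed to Fugue or which annotations Fugue accepts; the Processor Tutorial covers that.

```python
from typing import Any, Dict, List


def drop_column(
    rows: List[Dict[str, Any]], column: str = "b"
) -> List[Dict[str, Any]]:
    """Return copies of the rows without the given column."""
    # Build new dicts so the caller's rows are left untouched
    return [{k: v for k, v in row.items() if k != column} for row in rows]


rows = [{"a": 0, "b": 1}, {"a": 2, "b": 3}]
result = drop_column(rows)
```

Because the function uses only built-in types, it can be unit tested and reused without any workflow or execution engine.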

See also

Please read Processor Tutorial

abstract process(dfs)

Process the collection of dataframes on the driver side

Note

  • It runs on the driver side

  • The dataframes are not necessarily local; for example, one may be a SparkDataFrame

  • It is engine aware: you can put platform-dependent code in it (for example, native pyspark code), but doing so may make your code less portable. If you only use the functions of the general ExecutionEngine, it remains portable.

Parameters:

dfs (DataFrames) – dataframe collection to process

Returns:

the result dataframe

Return type:

DataFrame