fugue.extensions.transformer#

fugue.extensions.transformer.constants#

fugue.extensions.transformer.convert#

fugue.extensions.transformer.convert.cotransformer(schema, **validation_rules)[source]#

Decorator for cotransformers

Please read the CoTransformer Tutorial

Parameters
  • schema (Any) –

  • validation_rules (Any) –

Return type

Callable[[Any], fugue.extensions.transformer.convert._FuncAsCoTransformer]

fugue.extensions.transformer.convert.output_cotransformer(**validation_rules)[source]#

Decorator for cotransformers

Please read the CoTransformer Tutorial

Parameters

validation_rules (Any) –

Return type

Callable[[Any], fugue.extensions.transformer.convert._FuncAsCoTransformer]

fugue.extensions.transformer.convert.output_transformer(**validation_rules)[source]#

Decorator for transformers

Please read the Transformer Tutorial

Parameters

validation_rules (Any) –

Return type

Callable[[Any], fugue.extensions.transformer.convert._FuncAsTransformer]

fugue.extensions.transformer.convert.register_output_transformer(alias, obj, on_dup=0)[source]#

Register output transformer with an alias.

Parameters
Return type

None

Tip

Registering an extension with an alias is particularly useful for projects such as libraries. This is because by using alias, users don’t have to import the specific extension, or provide the full path of the extension. It can make user’s code less dependent and easy to understand.

New Since

0.6.0

See also

Please read the Transformer Tutorial

Examples

Here is an example how you setup your project so your users can benefit from this feature. Assume your project name is pn

The transformer implementation in file pn/pn/transformers.py

import pandas as pd

def my_transformer(df:pd.DataFrame) -> None:
    df.to_parquet("<unique_path>")

Then in pn/pn/__init__.py

from .transformers import my_transformer
from fugue import register_transformer

def register_extensions():
    register_transformer("mt", my_transformer)
    # ... register more extensions

register_extensions()

In users code:

import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_transformer by alias
dag.df([[0]],"a:int").out_transform("mt")
dag.run()
fugue.extensions.transformer.convert.register_transformer(alias, obj, on_dup=0)[source]#

Register transformer with an alias.

Parameters
Return type

None

Tip

Registering an extension with an alias is particularly useful for projects such as libraries. This is because by using alias, users don’t have to import the specific extension, or provide the full path of the extension. It can make user’s code less dependent and easy to understand.

New Since

0.6.0

See also

Please read the Transformer Tutorial

Examples

Here is an example how you setup your project so your users can benefit from this feature. Assume your project name is pn

The transformer implementation in file pn/pn/transformers.py

import pandas as pd

# schema: *
def my_transformer(df:pd.DataFrame) -> pd.DataFrame:
    return df

Then in pn/pn/__init__.py

from .transformers import my_transformer
from fugue import register_transformer

def register_extensions():
    register_transformer("mt", my_transformer)
    # ... register more extensions

register_extensions()

In users code:

import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_transformer by alias
dag.df([[0]],"a:int").transform("mt").show()
dag.run()
fugue.extensions.transformer.convert.transformer(schema, **validation_rules)[source]#

Decorator for transformers

Please read the Transformer Tutorial

Parameters
  • schema (Any) –

  • validation_rules (Any) –

Return type

Callable[[Any], fugue.extensions.transformer.convert._FuncAsTransformer]

fugue.extensions.transformer.transformer#

class fugue.extensions.transformer.transformer.CoTransformer[source]#

Bases: fugue.extensions.context.ExtensionContext

The interface to process logical partitions of a zipped dataframe. A dataframe such as SparkDataFrame can be distributed. But this interface is about local process, scalability and throughput is not a concern of CoTransformer.

To implement this class, you should not have __init__, please directly implement the interface functions.

Note

Before implementing this class, do you really need to implement this interface? Do you know the interfaceless feature of Fugue? Commonly, if you don’t need to implement on_init(), you can choose the interfaceless approach which may decouple your code from Fugue.

It’s important to understand Zip & Comap, and please also read the CoTransformer Tutorial

Due to similar issue on spark pickling ABC objects. This class is not ABC. If you encounter the similar issue, possible solution at

get_output_schema(dfs)[source]#

Generate the output schema on the driver side.

Note

  • This is running on driver

  • Currently, dfs is a collection of empty dataframes with the same structure and schemas

  • Normally, you should not consume this dataframe in this step, and you s hould only use its schema

  • You can access all properties except for cursor()

Parameters

dfs (fugue.dataframe.dataframes.DataFrames) – the collection of dataframes you are going to transform. They are empty dataframes with the same structure and schemas

Returns

Schema like object, should not be None or empty

Return type

Any

on_init(dfs)[source]#

Callback for initializing physical partition that contains one or multiple logical partitions. You may put expensive initialization logic here so you will not have to repeat that in transform()

Note

  • This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class

  • You can get physical partition no (if available from the execution egnine) from cursor()

  • Currently, dfs is a collection of empty dataframes with the same structure and schemas

Parameters

dfs (fugue.dataframe.dataframes.DataFrames) – a collection of empty dataframes with the same structure and schemas

Return type

None

transform(dfs)[source]#

The transformation logic from a collection of dataframes (with the same partition keys) to a local dataframe.

Note

  • This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class

Parameters

dfs (fugue.dataframe.dataframes.DataFrames) – a collection of dataframes with the same partition keys

Returns

transformed dataframe

Return type

fugue.dataframe.dataframe.LocalDataFrame

class fugue.extensions.transformer.transformer.OutputCoTransformer[source]#

Bases: fugue.extensions.transformer.transformer.CoTransformer

get_output_schema(dfs)[source]#

Generate the output schema on the driver side.

Note

  • This is running on driver

  • Currently, dfs is a collection of empty dataframes with the same structure and schemas

  • Normally, you should not consume this dataframe in this step, and you s hould only use its schema

  • You can access all properties except for cursor()

Parameters

dfs (fugue.dataframe.dataframes.DataFrames) – the collection of dataframes you are going to transform. They are empty dataframes with the same structure and schemas

Returns

Schema like object, should not be None or empty

Return type

Any

process(dfs)[source]#
Parameters

dfs (fugue.dataframe.dataframes.DataFrames) –

Return type

None

transform(dfs)[source]#

The transformation logic from a collection of dataframes (with the same partition keys) to a local dataframe.

Note

  • This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class

Parameters

dfs (fugue.dataframe.dataframes.DataFrames) – a collection of dataframes with the same partition keys

Returns

transformed dataframe

Return type

fugue.dataframe.dataframe.LocalDataFrame

class fugue.extensions.transformer.transformer.OutputTransformer[source]#

Bases: fugue.extensions.transformer.transformer.Transformer

get_output_schema(df)[source]#

Generate the output schema on the driver side.

Note

  • This is running on driver

  • This is the only function in this interface that is facing the entire DataFrame that is not necessarily local, for example a SparkDataFrame

  • Normally, you should not consume this dataframe in this step, and you s hould only use its schema

  • You can access all properties except for cursor()

Parameters

df (fugue.dataframe.dataframe.DataFrame) – the entire dataframe you are going to transform.

Returns

Schema like object, should not be None or empty

Return type

Any

process(df)[source]#
Parameters

df (fugue.dataframe.dataframe.LocalDataFrame) –

Return type

None

transform(df)[source]#

The transformation logic from one local dataframe to another local dataframe.

Note

  • This function operates on logical partition level

  • This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class

  • The input dataframe may be unbounded, but must be empty aware. It’s safe to consume it for ONLY ONCE

  • The input dataframe is never empty. Empty dataframes are skipped

Parameters

df (fugue.dataframe.dataframe.LocalDataFrame) – one logical partition to transform on

Returns

transformed dataframe

Return type

fugue.dataframe.dataframe.LocalDataFrame

class fugue.extensions.transformer.transformer.Transformer[source]#

Bases: fugue.extensions.context.ExtensionContext

The interface to process logical partitions of a dataframe. A dataframe such as SparkDataFrame can be distributed. But this interface is about local process, scalability and throughput is not a concern of Transformer.

To implement this class, you should not have __init__, please directly implement the interface functions.

Note

Before implementing this class, do you really need to implement this interface? Do you know the interfaceless feature of Fugue? Commonly, if you don’t need to implement on_init(), you can choose the interfaceless approach which may decouple your code from Fugue.

It’s important to understand the Partition Tutorial, and please also read the Transformer Tutorial

Due to similar issue on spark pickling ABC objects. This class is not ABC. If you encounter the similar issue, possible solution at

get_output_schema(df)[source]#

Generate the output schema on the driver side.

Note

  • This is running on driver

  • This is the only function in this interface that is facing the entire DataFrame that is not necessarily local, for example a SparkDataFrame

  • Normally, you should not consume this dataframe in this step, and you s hould only use its schema

  • You can access all properties except for cursor()

Parameters

df (fugue.dataframe.dataframe.DataFrame) – the entire dataframe you are going to transform.

Returns

Schema like object, should not be None or empty

Return type

Any

on_init(df)[source]#

Callback for initializing physical partition that contains one or multiple logical partitions. You may put expensive initialization logic here so you will not have to repeat that in transform()

Note

  • This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class

  • You can get physical partition no (if available from the execution egnine) from cursor()

  • The input dataframe may be unbounded, but must be empty aware. That means you must not consume the df by any means, and you can not count. However you can safely peek the first row of the dataframe for multiple times.

  • The input dataframe is never empty. Empty physical partitions are skipped

Parameters

df (fugue.dataframe.dataframe.DataFrame) – the entire dataframe of this physical partition

Return type

None

transform(df)[source]#

The transformation logic from one local dataframe to another local dataframe.

Note

  • This function operates on logical partition level

  • This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class

  • The input dataframe may be unbounded, but must be empty aware. It’s safe to consume it for ONLY ONCE

  • The input dataframe is never empty. Empty dataframes are skipped

Parameters

df (fugue.dataframe.dataframe.LocalDataFrame) – one logical partition to transform on

Returns

transformed dataframe

Return type

fugue.dataframe.dataframe.LocalDataFrame