fugue.extensions.transformer#
fugue.extensions.transformer.constants#
fugue.extensions.transformer.convert#
- fugue.extensions.transformer.convert.cotransformer(schema, **validation_rules)[source]#
Decorator for cotransformers
Please read the CoTransformer Tutorial
- Parameters
schema (Any) –
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsCoTransformer]
- fugue.extensions.transformer.convert.output_cotransformer(**validation_rules)[source]#
Decorator for output cotransformers
Please read the CoTransformer Tutorial
- Parameters
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsCoTransformer]
- fugue.extensions.transformer.convert.output_transformer(**validation_rules)[source]#
Decorator for output transformers
Please read the Transformer Tutorial
- Parameters
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsTransformer]
- fugue.extensions.transformer.convert.register_output_transformer(alias, obj, on_dup=0)[source]#
Register output transformer with an alias.
- Parameters
alias (str) – alias of the transformer
obj (Any) – the object that can be converted to OutputTransformer or OutputCoTransformer
on_dup (int) – see triad.collections.dict.ParamDict.update(), defaults to ParamDict.OVERWRITE
- Return type
None
Tip
Registering an extension with an alias is particularly useful for projects such as libraries. With an alias, users don't have to import the specific extension or provide its full path, which makes user code less dependent on internals and easier to understand.
New Since
0.6.0
See also
Please read the Transformer Tutorial
Examples
Here is an example of how to set up your project so your users can benefit from this feature. Assume your project name is pn.

The transformer implementation in file pn/pn/transformers.py:

```python
import pandas as pd

def my_transformer(df: pd.DataFrame) -> None:
    df.to_parquet("<unique_path>")
```

Then in pn/pn/__init__.py:

```python
from .transformers import my_transformer
from fugue import register_output_transformer

def register_extensions():
    register_output_transformer("mt", my_transformer)
    # ... register more extensions

register_extensions()
```

In the user's code:

```python
import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_transformer by alias
dag.df([[0]], "a:int").out_transform("mt")
dag.run()
```
- fugue.extensions.transformer.convert.register_transformer(alias, obj, on_dup=0)[source]#
Register transformer with an alias.
- Parameters
alias (str) – alias of the transformer
obj (Any) – the object that can be converted to Transformer or CoTransformer
on_dup (int) – see triad.collections.dict.ParamDict.update(), defaults to ParamDict.OVERWRITE
- Return type
None
Tip
Registering an extension with an alias is particularly useful for projects such as libraries. With an alias, users don't have to import the specific extension or provide its full path, which makes user code less dependent on internals and easier to understand.
New Since
0.6.0
See also
Please read the Transformer Tutorial
Examples
Here is an example of how to set up your project so your users can benefit from this feature. Assume your project name is pn.

The transformer implementation in file pn/pn/transformers.py:

```python
import pandas as pd

# schema: *
def my_transformer(df: pd.DataFrame) -> pd.DataFrame:
    return df
```

Then in pn/pn/__init__.py:

```python
from .transformers import my_transformer
from fugue import register_transformer

def register_extensions():
    register_transformer("mt", my_transformer)
    # ... register more extensions

register_extensions()
```

In the user's code:

```python
import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_transformer by alias
dag.df([[0]], "a:int").transform("mt").show()
dag.run()
```
- fugue.extensions.transformer.convert.transformer(schema, **validation_rules)[source]#
Decorator for transformers
Please read the Transformer Tutorial
- Parameters
schema (Any) –
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsTransformer]
fugue.extensions.transformer.transformer#
- class fugue.extensions.transformer.transformer.CoTransformer[source]#
Bases:
ExtensionContext
The interface to process logical partitions of a zipped dataframe. A dataframe such as SparkDataFrame can be distributed, but this interface is about local processing; scalability and throughput are not concerns of CoTransformer.
To implement this class, you should not define __init__; implement the interface functions directly.
Note
Before implementing this class, ask whether you really need this interface. Do you know the interfaceless feature of Fugue? Commonly, if you don't need to implement on_init(), you can choose the interfaceless approach, which may decouple your code from Fugue.
It's important to understand Zip & Comap, and please also read the CoTransformer Tutorial.
Because of a similar issue with Spark pickling ABC objects, this class is not an ABC. If you encounter a similar issue, a possible solution is at
- get_format_hint()[source]#
Get the transformer's preferred data format, for example pandas, pyarrow, or None. This helps the execution engine choose the most efficient way to execute the logic.
- Return type
Optional[str]
- get_output_schema(dfs)[source]#
Generate the output schema on the driver side.
Note
This is running on driver
Currently, dfs is a collection of empty dataframes with the same structure and schemas. Normally, you should not consume these dataframes in this step; you should only use their schemas.
You can access all properties except for
cursor()
- Parameters
dfs (DataFrames) – the collection of dataframes you are going to transform. They are empty dataframes with the same structure and schemas
- Returns
Schema like object, should not be None or empty
- Return type
Any
- on_init(dfs)[source]#
Callback for initializing physical partition that contains one or multiple logical partitions. You may put expensive initialization logic here so you will not have to repeat that in
transform()
Note
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class
You can get the physical partition number (if available from the execution engine) from
cursor()
Currently,
dfs
is a collection of empty dataframes with the same structure and schemas
- Parameters
dfs (DataFrames) – a collection of empty dataframes with the same structure and schemas
- Return type
None
- transform(dfs)[source]#
The transformation logic from a collection of dataframes (with the same partition keys) to a local dataframe.
Note
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the
context
from the properties of this class
- Parameters
dfs (DataFrames) – a collection of dataframes with the same partition keys
- Returns
transformed dataframe
- Return type
LocalDataFrame
- class fugue.extensions.transformer.transformer.OutputCoTransformer[source]#
Bases:
CoTransformer
- get_output_schema(dfs)[source]#
Generate the output schema on the driver side.
Note
This is running on driver
Currently, dfs is a collection of empty dataframes with the same structure and schemas. Normally, you should not consume these dataframes in this step; you should only use their schemas.
You can access all properties except for
cursor()
- Parameters
dfs (DataFrames) – the collection of dataframes you are going to transform. They are empty dataframes with the same structure and schemas
- Returns
Schema like object, should not be None or empty
- Return type
Any
- process(dfs)[source]#
- Parameters
dfs (DataFrames) –
- Return type
None
- transform(dfs)[source]#
The transformation logic from a collection of dataframes (with the same partition keys) to a local dataframe.
Note
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the
context
from the properties of this class
- Parameters
dfs (DataFrames) – a collection of dataframes with the same partition keys
- Returns
transformed dataframe
- Return type
LocalDataFrame
- class fugue.extensions.transformer.transformer.OutputTransformer[source]#
Bases:
Transformer
- get_output_schema(df)[source]#
Generate the output schema on the driver side.
Note
This is running on driver
This is the only function in this interface that is facing the entire DataFrame that is not necessarily local, for example a SparkDataFrame
Normally, you should not consume this dataframe in this step; you should only use its schema
You can access all properties except for
cursor()
- Parameters
df (DataFrame) – the entire dataframe you are going to transform.
- Returns
Schema like object, should not be None or empty
- Return type
Any
- process(df)[source]#
- Parameters
df (LocalDataFrame) –
- Return type
None
- transform(df)[source]#
The transformation logic from one local dataframe to another local dataframe.
Note
This function operates on logical partition level
This call can be on a random machine (depending on the ExecutionEngine you use); you should get the context from the properties of this class.
The input dataframe may be unbounded, but it must be empty-aware. It is safe to consume it ONLY ONCE.
The input dataframe is never empty; empty dataframes are skipped.
- Parameters
df (LocalDataFrame) – one logical partition to transform on
- Returns
transformed dataframe
- Return type
LocalDataFrame
- class fugue.extensions.transformer.transformer.Transformer[source]#
Bases:
ExtensionContext
The interface to process logical partitions of a dataframe. A dataframe such as SparkDataFrame can be distributed, but this interface is about local processing; scalability and throughput are not concerns of Transformer.
To implement this class, you should not define __init__; implement the interface functions directly.
Note
Before implementing this class, ask whether you really need this interface. Do you know the interfaceless feature of Fugue? Commonly, if you don't need to implement on_init(), you can choose the interfaceless approach, which may decouple your code from Fugue.
It's important to understand the Partition Tutorial, and please also read the Transformer Tutorial.
Because of a similar issue with Spark pickling ABC objects, this class is not an ABC. If you encounter a similar issue, a possible solution is at
- get_format_hint()[source]#
Get the transformer's preferred data format, for example pandas, pyarrow, or None. This helps the execution engine choose the most efficient way to execute the logic.
- Return type
Optional[str]
- get_output_schema(df)[source]#
Generate the output schema on the driver side.
Note
This is running on driver
This is the only function in this interface that is facing the entire DataFrame that is not necessarily local, for example a SparkDataFrame
Normally, you should not consume this dataframe in this step; you should only use its schema
You can access all properties except for
cursor()
- Parameters
df (DataFrame) – the entire dataframe you are going to transform.
- Returns
Schema like object, should not be None or empty
- Return type
Any
- on_init(df)[source]#
Callback for initializing physical partition that contains one or multiple logical partitions. You may put expensive initialization logic here so you will not have to repeat that in
transform()
Note
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class
You can get the physical partition number (if available from the execution engine) from
cursor()
The input dataframe may be unbounded, but it must be empty-aware. That means you must not consume df by any means and cannot count it; however, you can safely peek at the first row of the dataframe multiple times.
The input dataframe is never empty. Empty physical partitions are skipped
- Parameters
df (DataFrame) – the entire dataframe of this physical partition
- Return type
None
- transform(df)[source]#
The transformation logic from one local dataframe to another local dataframe.
Note
This function operates on logical partition level
This call can be on a random machine (depending on the ExecutionEngine you use); you should get the context from the properties of this class.
The input dataframe may be unbounded, but it must be empty-aware. It is safe to consume it ONLY ONCE.
The input dataframe is never empty; empty dataframes are skipped.
- Parameters
df (LocalDataFrame) – one logical partition to transform on
- Returns
transformed dataframe
- Return type
LocalDataFrame