fugue.workflow#

fugue.workflow.module#

fugue.workflow.module.module(func=None, as_method=False, name=None, on_dup='overwrite')[source]#

Decorator for module

Please read the Module Tutorial

Parameters
  • func (Optional[Callable]) –

  • as_method (bool) –

  • name (Optional[str]) –

  • on_dup (str) –

Return type

Any
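
For illustration, here is a minimal sketch of how the decorator might be used, loosely following the Module Tutorial. The function names are made up, and the exact behavior of as_method may vary across Fugue versions.

from fugue import FugueWorkflow, WorkflowDataFrame
from fugue.workflow.module import module

# a module is a reusable piece of workflow logic; its first argument is the
# workflow (or a WorkflowDataFrame) it operates on
@module()
def create_n(wf: FugueWorkflow, n: int = 1) -> WorkflowDataFrame:
    return wf.df([[n]], "a:int")

# with as_method=True the module is also attached to WorkflowDataFrame as a method
@module(as_method=True, name="merge")
def merge_dfs(df1: WorkflowDataFrame, df2: WorkflowDataFrame) -> WorkflowDataFrame:
    return df1.union(df2, distinct=False)

dag = FugueWorkflow()
create_n(dag).merge(create_n(dag, n=2)).show()
dag.run()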

fugue.workflow.utils#

fugue.workflow.utils.is_acceptable_raw_df(df)[source]#

Whether the input df can be converted to WorkflowDataFrame

Parameters

df (Any) – input raw dataframe

Returns

whether this dataframe is convertible

Return type

bool
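
A small usage sketch, assuming pandas is installed:

import pandas as pd
from fugue.workflow.utils import is_acceptable_raw_df

print(is_acceptable_raw_df(pd.DataFrame({"a": [0, 1]})))  # True: pandas dataframes are recognized
print(is_acceptable_raw_df("not a dataframe"))            # False: a plain string is not convertible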

fugue.workflow.utils.register_raw_df_type(df_type)[source]#

Register a base type of dataframe that can be recognized by FugueWorkflow and converted to WorkflowDataFrame

Parameters

df_type (Type) – dataframe type, for example dask.dataframe.DataFrame

Return type

None
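
For example, to let FugueWorkflow recognize Dask dataframes directly, a sketch assuming dask is installed (newer Fugue versions may register supported backends automatically):

import dask.dataframe as dd
from fugue.workflow.utils import register_raw_df_type

# after this call, dd.DataFrame objects can be passed to FugueWorkflow.df() directly
register_raw_df_type(dd.DataFrame)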

fugue.workflow.workflow#

class fugue.workflow.workflow.FugueWorkflow(*args, **kwargs)[source]#

Bases: object

Fugue Workflow, also known as the Fugue Programming Interface.

In Fugue, we use a DAG to represent a workflow. DAG construction and execution are separate steps; this class is mainly used in the construction step, so everything you add to the workflow is only a description, and nothing is executed until you call run()

Read this to learn how to initialize it in different ways, along with the pros and cons of each.

Parameters
  • args (Any) –

  • kwargs (Any) –
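
A minimal sketch showing that construction and execution are separate steps:

from fugue import FugueWorkflow

dag = FugueWorkflow()               # construction only, nothing is executed yet
dag.df([[0], [1]], "a:int").show()  # adds a "show" step to the DAG description
dag.run()                           # the whole DAG is executed here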

add(task, *args, **kwargs)[source]#

This method should not be called directly by users. Use create(), process(), or output() instead.

Parameters
  • task (fugue.workflow._tasks.FugueTask) –

  • args (Any) –

  • kwargs (Any) –

Return type

fugue.workflow.workflow.WorkflowDataFrame

assert_eq(*dfs, **params)[source]#

Compare whether these dataframes are equal. It's for internal unit test purposes only. It will convert all dataframes to LocalBoundedDataFrame, so it assumes all dataframes are small and fast enough to convert. DO NOT use it on critical or expensive tasks.

Parameters
  • dfs (Any) – DataFrames like object

  • digits – precision on float number comparison, defaults to 8

  • check_order – whether to compare the row order, defaults to False

  • check_schema – whether to compare the schemas, defaults to True

  • check_content – whether to compare the row values, defaults to True

  • check_metadata – whether to compare the dataframe metadata, defaults to True

  • no_pandas – if true, it will compare the string representations of the dataframes, otherwise, it will convert both to pandas dataframe to compare, defaults to False

  • params (Any) –

Raises

AssertionError – if not equal

Return type

None

assert_not_eq(*dfs, **params)[source]#

Assert that none of the other dataframes is equal to the first dataframe. It's for internal unit test purposes only. It will convert all dataframes to LocalBoundedDataFrame, so it assumes all dataframes are small and fast enough to convert. DO NOT use it on critical or expensive tasks.

Parameters
  • dfs (Any) – DataFrames like object

  • digits – precision on float number comparison, defaults to 8

  • check_order – whether to compare the row order, defaults to False

  • check_schema – whether to compare the schemas, defaults to True

  • check_content – whether to compare the row values, defaults to True

  • check_metadata – whether to compare the dataframe metadata, defaults to True

  • no_pandas – if true, it will compare the string representations of the dataframes, otherwise, it will convert both to pandas dataframe to compare, defaults to False

  • params (Any) –

Raises

AssertionError – if any dataframe equals to the first dataframe

Return type

None

property conf: triad.collections.dict.ParamDict#

All configs of this workflow and underlying ExecutionEngine (if given)

create(using, schema=None, params=None)[source]#

Run a creator to create a dataframe.

Please read the Creator Tutorial

Parameters
  • using (Any) – creator-like object, if it is a string, then it must be the alias of a registered creator

  • schema (Optional[Any]) – Schema like object, defaults to None. The creator will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the creator, defaults to None. The creator will be able to access this value from params()

  • pre_partition – Partition like object, defaults to None. The creator will be able to access this value from partition_spec()

Returns

result dataframe

Return type

fugue.workflow.workflow.WorkflowDataFrame
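
For illustration, a sketch where a plain function serves as the creator (the function name is made up; the output schema is supplied through the schema argument):

from typing import Any, List
from fugue import FugueWorkflow

def create_rows(n: int = 1) -> List[List[Any]]:
    return [[i] for i in range(n)]

dag = FugueWorkflow()
dag.create(create_rows, schema="a:int", params={"n": 3}).show()
dag.run()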

create_data(data, schema=None, metadata=None, data_determiner=None)[source]#

Create dataframe.

Parameters
  • data (Any) – DataFrame like object or Yielded

  • schema (Optional[Any]) – Schema like object, defaults to None

  • metadata (Optional[Any]) – Parameters like object, defaults to None

  • data_determiner (Optional[Callable[[Any], str]]) – a function to compute unique id from data

Returns

a dataframe of the current workflow

Return type

fugue.workflow.workflow.WorkflowDataFrame

Note

By default, the input data does not affect the determinism of the workflow (but schema and metadata do), because the amount of compute can be unpredictable. But if you want data to affect the determinism of the workflow, you can provide the function to compute the unique id of data using data_determiner

df(data, schema=None, metadata=None, data_determiner=None)[source]#

Create dataframe. Alias of create_data()

Parameters
  • data (Any) – DataFrame like object or Yielded

  • schema (Optional[Any]) – Schema like object, defaults to None

  • metadata (Optional[Any]) – Parameters like object, defaults to None

  • data_determiner (Optional[Callable[[Any], str]]) – a function to compute unique id from data

Returns

a dataframe of the current workflow

Return type

fugue.workflow.workflow.WorkflowDataFrame

Note

By default, the input data does not affect the determinism of the workflow (but schema and metadata do), because the amount of compute can be unpredictable. But if you want data to affect the determinism of the workflow, you can provide the function to compute the unique id of data using data_determiner

get_result(df)[source]#

After run(), get the result of a dataframe defined in the dag

Returns

a calculated dataframe

Parameters

df (fugue.workflow.workflow.WorkflowDataFrame) –

Return type

fugue.dataframe.dataframe.DataFrame

Examples

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int")
dag.run()
dag.get_result(df1).show()

intersect(*dfs, distinct=True)[source]#

Intersect dataframes in dfs.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after intersection, defaults to True

Returns

intersected dataframe

Return type

fugue.workflow.workflow.WorkflowDataFrame

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

join(*dfs, how, on=None)[source]#

Join dataframes. Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

Returns

joined dataframe

Return type

fugue.workflow.workflow.WorkflowDataFrame
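
A small sketch joining two dataframes of the same workflow:

from fugue import FugueWorkflow

dag = FugueWorkflow()
a = dag.df([[0, "x"], [1, "y"]], "a:int,b:str")
b = dag.df([[0, 10]], "a:int,c:int")
dag.join(a, b, how="inner", on=["a"]).show()  # on could be omitted and inferred from the common columns
dag.run()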

load(path, fmt='', columns=None, **kwargs)[source]#

Load dataframe from persistent storage. Read this for details.

Parameters
  • path (str) – file path

  • fmt (str) – format hint can accept parquet, csv, json, defaults to “”, meaning to infer

  • columns (Optional[Any]) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) –

Returns

dataframe from the file

Return type

WorkflowDataFrame
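
For illustration, a sketch with a hypothetical file path (the file must exist when the workflow runs):

from fugue import FugueWorkflow

dag = FugueWorkflow()
dag.load("/tmp/data.parquet", fmt="parquet", columns=["a", "b"]).show()
dag.run()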

out_transform(*dfs, using, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#

Transform dataframes using a transformer. It materializes the execution immediately and returns nothing.

Please read the Transformer Tutorial

Parameters
  • dfs (Any) – DataFrames like object

  • using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered output transformer/cotransformer

  • schema – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the transformer, defaults to None. The transformer will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call out_transform() without this parameter

  • ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list

  • callback (Optional[Any]) – RPChandler like object, defaults to None

Return type

None

Note

transform() can be lazy and returns the transformed dataframe, while out_transform() is guaranteed to execute immediately (eagerly) and returns nothing
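
A sketch of an eager output transformer; the function is hypothetical and relies on Fugue recognizing a function that returns None as an output transformer:

from typing import Any, Dict, Iterable
from fugue import FugueWorkflow

def print_partition(rows: Iterable[Dict[str, Any]]) -> None:
    for row in rows:
        print(row)

dag = FugueWorkflow()
dag.out_transform(dag.df([[0], [1]], "a:int"), using=print_partition)
dag.run()  # the printing happens when the workflow runs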

output(*dfs, using, params=None, pre_partition=None)[source]#

Run an outputter on dataframes.

Please read the Outputter Tutorial

Parameters
  • using (Any) – outputter-like object, if it is a string, then it must be the alias of a registered outputter

  • params (Optional[Any]) – Parameters like object to run the outputter, defaults to None. The outputter will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. The outputter will be able to access this value from partition_spec()

  • dfs (Any) –

Return type

None

process(*dfs, using, schema=None, params=None, pre_partition=None)[source]#

Run a processor on the dataframes.

Please read the Processor Tutorial

Parameters
  • dfs (Any) – DataFrames like object

  • using (Any) – processor-like object, if it is a string, then it must be the alias of a registered processor

  • schema (Optional[Any]) – Schema like object, defaults to None. The processor will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The processor will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. The processor will be able to access this value from partition_spec()

Returns

result dataframe

Return type

fugue.workflow.workflow.WorkflowDataFrame
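
For illustration, a sketch where a plain function over pandas dataframes serves as the processor (the function name is made up; the output schema is inferred from the returned pandas dataframe):

import pandas as pd
from fugue import FugueWorkflow

def concat_dfs(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1, df2]).reset_index(drop=True)

dag = FugueWorkflow()
a = dag.df([[0]], "a:int")
b = dag.df([[1]], "a:int")
dag.process(a, b, using=concat_dfs).show()
dag.run()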

run(*args, **kwargs)[source]#

Execute the workflow and compute all dataframes. If no arguments are given, it will use NativeExecutionEngine to run the workflow.

Examples

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")

dag.run(SparkExecutionEngine)
df1.result.show()
df2.result.show()

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df1.yield_dataframe_as("x")

result = dag.run(SparkExecutionEngine)
result["x"]  # SparkDataFrame

Read this to learn how to run in different ways, along with the pros and cons of each.

Parameters
  • args (Any) –

  • kwargs (Any) –

Return type

fugue.dataframe.dataframes.DataFrames

select(*statements, sql_engine=None, sql_engine_params=None)[source]#

Execute SELECT statement using SQLEngine

Parameters
  • statements (Any) – a list of sub-statements in string or WorkflowDataFrame

  • sql_engine (Optional[Any]) – it can be an empty string or None (to use the default SQL engine), a string (the name of a registered SQL engine), an SQLEngine type, or an SQLEngine instance, defaults to None

  • sql_engine_params (Optional[Any]) –

Returns

result of the SELECT statement

Return type

fugue.workflow.workflow.WorkflowDataFrame

Examples

with FugueWorkflow() as dag:
    a = dag.df([[0,"a"]],a:int,b:str)
    b = dag.df([[0]],a:int)
    c = dag.select("SELECT a FROM",a,"UNION SELECT * FROM",b)

Please read this for more examples

set_op(how, *dfs, distinct=True)[source]#

Union, subtract or intersect dataframes.

Parameters
  • how (str) – can accept union, subtract, intersect

  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after the set operation, defaults to True

Returns

result dataframe of the set operation

Return type

fugue.workflow.workflow.WorkflowDataFrame

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

show(*dfs, rows=10, show_count=False, title=None)[source]#

Show the dataframes. See examples.

Parameters
  • dfs (Any) – DataFrames like object

  • rows (int) – max number of rows, defaults to 10

  • show_count (bool) – whether to show total count, defaults to False

  • title (Optional[str]) – title to display on top of the dataframe, defaults to None

  • best_width – max width for the output table, defaults to 100

Return type

None

Note

  • When you call this method, it means you want the dataframe to be printed when the workflow executes. So the dataframe won’t show until you run the workflow.

  • When show_count is True, it can trigger expensive calculation for a distributed dataframe. So if you call this function directly, you may need to persist() the dataframe. Or you can turn on Auto Persist

spec_uuid()[source]#

UUID of the workflow spec (description)

Return type

str

subtract(*dfs, distinct=True)[source]#

Subtract dfs[1:] from dfs[0].

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after subtraction, defaults to True

Returns

subtracted dataframe

Return type

fugue.workflow.workflow.WorkflowDataFrame

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

transform(*dfs, using, schema=None, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#

Transform dataframes using transformer.

Please read the Transformer Tutorial

Parameters
  • dfs (Any) – DataFrames like object

  • using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered transformer/cotransformer

  • schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the transformer, defaults to None. The transformer will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call transform() without this parameter

  • ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list

  • callback (Optional[Any]) – RPChandler like object, defaults to None

Returns

the transformed dataframe

Return type

fugue.workflow.workflow.WorkflowDataFrame

Note

transform() can be lazy and returns the transformed dataframe, while out_transform() is guaranteed to execute immediately (eagerly) and returns nothing
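
A minimal transformer sketch using a schema hint comment to declare the output schema (the function name is hypothetical):

import pandas as pd
from fugue import FugueWorkflow

# schema: *,b:int
def add_b(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b=df["a"] + 1)

dag = FugueWorkflow()
dag.transform(dag.df([[0], [1]], "a:int"), using=add_b).show()
dag.run()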

union(*dfs, distinct=True)[source]#

Union dataframes in dfs.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after union, defaults to True

Returns

unioned dataframe

Return type

fugue.workflow.workflow.WorkflowDataFrame

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.
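
A small sketch of a union within one workflow:

from fugue import FugueWorkflow

dag = FugueWorkflow()
a = dag.df([[0], [1]], "a:int")
b = dag.df([[1], [2]], "a:int")
dag.union(a, b, distinct=True).show()  # duplicates across a and b are removed
dag.run()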

property yields: Dict[str, fugue.collections.yielded.Yielded]#

zip(*dfs, how='inner', partition=None, temp_path=None, to_file_threshold=-1)[source]#

Zip multiple dataframes together with given partition specifications.

Parameters
  • dfs (Any) – DataFrames like object

  • how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None

  • to_file_threshold (Any) – file byte size threshold, defaults to -1

Returns

a zipped dataframe

Return type

fugue.workflow.workflow.WorkflowDataFrame

Note

  • If dfs is dict like, the zipped dataframe will be dict like; if dfs is list like, the zipped dataframe will be list like

  • It’s fine to contain only one dataframe in dfs

See also

Read CoTransformer and Zip & Comap for details

class fugue.workflow.workflow.WorkflowDataFrame(workflow, task, metadata=None)[source]#

Bases: fugue.dataframe.dataframe.DataFrame

It represents the edges in the graph constructed by FugueWorkflow. In Fugue, we use a DAG to represent a workflow, and the edges are strictly dataframes. DAG construction and execution are separate steps; this class is used in the construction step. Although it inherits from DataFrame, it’s not concrete data, so a lot of the operations are not allowed. If you want to obtain the concrete Fugue DataFrame, use compute() to execute the workflow.

Normally, you don’t construct it yourself; you just use its methods.

Parameters
  • workflow (FugueWorkflow) – the parent workflow it belongs to

  • task (fugue.workflow._tasks.FugueTask) – the task that generates this dataframe

  • metadata (Any) – dict-like metadata, defaults to None

aggregate(*agg_cols, **kwagg_cols)[source]#

Aggregate on dataframe

Parameters
Returns

the aggregated result as a dataframe

Return type

fugue.workflow.workflow.TDF

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
df.aggregate(f.max(col("b")))

# SELECT a, MAX(b) AS x FROM df GROUP BY a
df.partition_by("a").aggregate(f.max(col("b")).alias("x"))
df.partition_by("a").aggregate(x=f.max(col("b")))
alter_columns(columns)[source]#

Change column types

Parameters
  • columns (Any) – Schema like object

  • self (fugue.workflow.workflow.TDF) –

Returns

a new dataframe with the new column types

Return type

WorkflowDataFrame

Note

The output dataframe will not change the order of original schema.

Examples

>>> df.alter_columns("a:int,b:str")

anti_join(*dfs, on=None)[source]#

LEFT ANTI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

as_array(columns=None, type_safe=False)[source]#
Raises

NotImplementedError – don’t call this method

Parameters
  • columns (Optional[List[str]]) –

  • type_safe (bool) –

Return type

List[Any]

as_array_iterable(columns=None, type_safe=False)[source]#
Raises

NotImplementedError – don’t call this method

Parameters
  • columns (Optional[List[str]]) –

  • type_safe (bool) –

Return type

Iterable[Any]

as_local()[source]#
Raises

NotImplementedError – don’t call this method

Return type

fugue.dataframe.dataframe.DataFrame

assert_eq(*dfs, **params)[source]#

Wrapper of fugue.workflow.workflow.FugueWorkflow.assert_eq() to compare this dataframe with other dataframes.

Parameters
  • dfs (Any) – DataFrames like object

  • digits – precision on float number comparison, defaults to 8

  • check_order – whether to compare the row order, defaults to False

  • check_schema – whether to compare the schemas, defaults to True

  • check_content – whether to compare the row values, defaults to True

  • check_metadata – whether to compare the dataframe metadata, defaults to True

  • no_pandas – if true, it will compare the string representations of the dataframes, otherwise, it will convert both to pandas dataframe to compare, defaults to False

  • params (Any) –

Raises

AssertionError – if not equal

Return type

None

assert_not_eq(*dfs, **params)[source]#

Wrapper of fugue.workflow.workflow.FugueWorkflow.assert_not_eq() to compare this dataframe with other dataframes.

Parameters
  • dfs (Any) – DataFrames like object

  • digits – precision on float number comparison, defaults to 8

  • check_order – whether to compare the row order, defaults to False

  • check_schema – whether to compare the schemas, defaults to True

  • check_content – whether to compare the row values, defaults to True

  • check_metadata – whether to compare the dataframe metadata, defaults to True

  • no_pandas – if true, it will compare the string representations of the dataframes, otherwise, it will convert both to pandas dataframe to compare, defaults to False

  • params (Any) –

Raises

AssertionError – if any dataframe is equal to the first dataframe

Return type

None

assign(*args, **kwargs)[source]#

Update existing columns with new values and add new columns

Parameters
  • df – the dataframe to set columns

  • args (fugue.column.expressions.ColumnExpr) – column expressions

  • kwargs (Any) – column expressions to be renamed to the argument names, if a value is not ColumnExpr, it will be treated as a literal

  • self (fugue.workflow.workflow.TDF) –

Returns

a new dataframe with the updated values

Return type

fugue.workflow.workflow.TDF

Tip

This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

# add/set 1 as column x
df.assign(lit(1,"x"))
df.assign(x=1)

# add/set x to be a+b
df.assign((col("a")+col("b")).alias("x"))
df.assign(x=col("a")+col("b"))

# cast column a data type to double
df.assign(col("a").cast(float))

# cast + new columns
df.assign(col("a").cast(float),x=1,y=col("a")+col("b"))
broadcast()[source]#

Broadcast the current dataframe

Returns

the broadcasted dataframe

Return type

WorkflowDataFrame

Parameters

self (fugue.workflow.workflow.TDF) –

checkpoint()[source]#
Parameters

self (fugue.workflow.workflow.TDF) –

Return type

fugue.workflow.workflow.TDF

compute(*args, **kwargs)[source]#

Trigger the parent workflow to run() and to generate and cache the result dataframe this instance represent.

Examples

>>> df = FugueWorkflow().df([[0]],"a:int").transform(a_transformer)
>>> df.compute().as_pandas()  # pandas dataframe
>>> df.compute(SparkExecutionEngine).native  # spark dataframe

Note

Consider using fugue.workflow.workflow.FugueWorkflow.run() instead. Because this method triggers the entire workflow to run, extra time may be taken to compute unrelated dataframes, which can be confusing.

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")

dag.run(SparkExecutionEngine)
df1.result.show()
df2.result.show()

Return type

fugue.dataframe.dataframe.DataFrame

count()[source]#
Raises

NotImplementedError – don’t call this method

Return type

int

cross_join(*dfs)[source]#

CROSS Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

deterministic_checkpoint(lazy=False, partition=None, single=False, namespace=None, **kwargs)[source]#

Cache the dataframe as a temporary file

Parameters
  • lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters for the underlying execution engine function

  • namespace (Optional[Any]) – a value to control determinism, defaults to None.

  • self (fugue.workflow.workflow.TDF) –

Returns

the cached dataframe

Return type

fugue.workflow.workflow.TDF

Note

The difference vs strong_checkpoint() is that this checkpoint is not removed after execution, so it can take effect across executions if the dependent compute logic is not changed.

distinct()[source]#

Get distinct dataframe. Equivalent to SELECT DISTINCT * FROM df

Returns

dataframe with unique records

Parameters

self (fugue.workflow.workflow.TDF) –

Return type

fugue.workflow.workflow.TDF

drop(columns, if_exists=False)[source]#

Drop columns from the dataframe.

Parameters
  • columns (List[str]) – columns to drop

  • if_exists (bool) – if setting to True, it will ignore non-existent columns, defaults to False

  • self (fugue.workflow.workflow.TDF) –

Returns

the dataframe after dropping columns

Return type

WorkflowDataFrame

dropna(how='any', thresh=None, subset=None)[source]#

Drops records containing NA records

Parameters
  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (Optional[int]) – int, drops rows that have less than thresh non-null values

  • subset (Optional[List[str]]) – list of columns to operate on

  • self (fugue.workflow.workflow.TDF) –

Returns

dataframe with incomplete records dropped

Return type

fugue.workflow.workflow.TDF
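
For illustration, a sketch using a nullable string column:

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, None], [1, "x"]], "a:int,b:str")
df.dropna(how="any", subset=["b"]).show()  # keeps only the rows where b is not null
dag.run()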

property empty: bool#
Raises

NotImplementedError – don’t call this method

fillna(value, subset=None)[source]#

Fills NA values with replacement values

Parameters
  • value (Any) – if scalar, fills all columns with the same value; if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary

  • self (fugue.workflow.workflow.TDF) –

Returns

dataframe with NA records filled

Return type

fugue.workflow.workflow.TDF
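
A short sketch, where df is a hypothetical WorkflowDataFrame with a nullable column b:

df.fillna("missing", subset=["b"]).show()  # fill nulls in column b with a scalar
df.fillna({"b": "missing"}).show()         # or provide per-column replacement values as a dict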

filter(condition)[source]#

Filter rows by the given condition

Parameters
Returns

a new filtered dataframe

Return type

fugue.workflow.workflow.TDF

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

df.filter((col("a")>1) & (col("b")=="x"))
df.filter(f.coalesce(col("a"),col("b"))>1)

full_outer_join(*dfs, on=None)[source]#

FULL OUTER Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

inner_join(*dfs, on=None)[source]#

INNER Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

intersect(*dfs, distinct=True)[source]#

Intersect this dataframe with dfs.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after intersection, defaults to True

  • self (fugue.workflow.workflow.TDF) –

Returns

intersected dataframe

Return type

fugue.workflow.workflow.TDF

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

property is_bounded: bool#
Raises

NotImplementedError – don’t call this method

property is_local: bool#
Raises

NotImplementedError – don’t call this method

join(*dfs, how, on=None)[source]#

Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

left_anti_join(*dfs, on=None)[source]#

LEFT ANTI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

left_outer_join(*dfs, on=None)[source]#

LEFT OUTER Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

left_semi_join(*dfs, on=None)[source]#

LEFT SEMI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

property name: str#

Name of its task spec

property num_partitions: int#
Raises

NotImplementedError – don’t call this method

out_transform(using, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#

Transform this dataframe using transformer. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.out_transform()

Please read the Transformer Tutorial

Parameters
  • using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered output transformer/cotransformer

  • params (Optional[Any]) – Parameters like object to run the transformer, defaults to None. The transformer will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call out_transform() without this parameter

  • ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list

  • callback (Optional[Any]) – RPChandler like object, defaults to None

  • self (fugue.workflow.workflow.TDF) –

Return type

None

Note

transform() can be lazy and returns the transformed dataframe, while out_transform() is guaranteed to execute immediately (eagerly) and returns nothing

output(using, params=None, pre_partition=None)[source]#

Run an outputter on this dataframe. It’s a simple wrapper of fugue.workflow.workflow.FugueWorkflow.output()

Please read the Outputter Tutorial

Parameters
  • using (Any) – outputter-like object, if it is a string, then it must be the alias of a registered outputter

  • params (Optional[Any]) – Parameters like object to run the outputter, defaults to None. The outputter will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. The outputter will be able to access this value from partition_spec()

Return type

None

partition(*args, **kwargs)[source]#

Partition the current dataframe. Please read the Partition Tutorial

Parameters
  • args (Any) – Partition like object

  • kwargs (Any) – Partition like object

  • self (fugue.workflow.workflow.TDF) –

Returns

dataframe with the partition hint

Return type

WorkflowDataFrame

Note

Normally this step is fast because it’s to add a partition hint for the next step.

partition_by(*keys, **kwargs)[source]#

Partition the current dataframe by keys. Please read the Partition Tutorial. This is a wrapper of partition()

Parameters
  • keys (str) – partition keys

  • kwargs (Any) – Partition like object excluding by and partition_by

  • self (fugue.workflow.workflow.TDF) –

Returns

dataframe with the partition hint

Return type

WorkflowDataFrame

property partition_spec: fugue.collections.partition.PartitionSpec#

The partition spec set on the dataframe for next steps to use

Examples

dag = FugueWorkflow()
df = dag.df([[0],[1]], "a:int")
assert df.partition_spec.empty
df2 = df.partition(by=["a"])
assert df.partition_spec.empty
assert df2.partition_spec == PartitionSpec(by=["a"])

peek_array()[source]#
Raises

NotImplementedError – don’t call this method

Return type

Any

per_partition_by(*keys)[source]#

Partition the current dataframe by keys so each physical partition contains only one logical partition. Please read the Partition Tutorial. This is a wrapper of partition()

Parameters
  • keys (str) – partition keys

  • self (fugue.workflow.workflow.TDF) –

Returns

dataframe that is both logically and physically partitioned by keys

Return type

WorkflowDataFrame

Note

This is a hint but not enforced, certain execution engines will not respect this hint.

per_row()[source]#

Partition the current dataframe to one row per partition. Please read the Partition Tutorial. This is a wrapper of partition()

Returns

dataframe that is evenly partitioned by row count

Return type

WorkflowDataFrame

Parameters

self (fugue.workflow.workflow.TDF) –

Note

This is a hint but not enforced, certain execution engines will not respect this hint.

persist()[source]#

Persist the current dataframe

Returns

the persisted dataframe

Return type

WorkflowDataFrame

Parameters

self (fugue.workflow.workflow.TDF) –

Note

persist can only guarantee that the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.

The persist method is considered a weak checkpoint. Sometimes, it may be necessary to use a strong checkpoint, which is checkpoint()

process(using, schema=None, params=None, pre_partition=None)[source]#

Run a processor on this dataframe. It’s a simple wrapper of fugue.workflow.workflow.FugueWorkflow.process()

Please read the Processor Tutorial

Parameters
  • using (Any) – processor-like object, if it is a string, then it must be the alias of a registered processor

  • schema (Optional[Any]) – Schema like object, defaults to None. The processor will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The processor will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. The processor will be able to access this value from partition_spec()

  • self (fugue.workflow.workflow.TDF) –

Returns

result dataframe

Return type

WorkflowDataFrame

rename(*args, **kwargs)[source]#

Rename the dataframe using a mapping dict

Parameters
  • args (Any) – list of dicts containing rename maps

  • kwargs (Any) – rename map

  • self (fugue.workflow.workflow.TDF) –

Returns

a new dataframe with the new names

Return type

WorkflowDataFrame

Note

This interface is more flexible than fugue.dataframe.dataframe.DataFrame.rename()

Examples

>>> df.rename({"a": "b"}, c="d", e="f")

property result: fugue.dataframe.dataframe.DataFrame#

The concrete DataFrame obtained from compute(). This property will not trigger compute again; compute() should have been called earlier so the result is already cached.

right_outer_join(*dfs, on=None)[source]#

RIGHT OUTER Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

sample(n=None, frac=None, replace=False, seed=None)[source]#

Sample dataframe by number of rows or by fraction

Parameters
  • n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set

  • frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (Optional[int]) – seed for randomness, defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

sampled dataframe

Return type

fugue.workflow.workflow.TDF
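
A brief sketch, where df is a hypothetical WorkflowDataFrame:

df.sample(n=10, seed=0).show()      # sample 10 rows
df.sample(frac=0.1, seed=0).show()  # or sample roughly 10% of the rows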

save(path, fmt='', mode='overwrite', partition=None, single=False, **kwargs)[source]#

Save this dataframe to a persistent storage

Parameters
  • path (str) – output path

  • fmt (str) – format hint can accept parquet, csv, json, defaults to "", meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition (Optional[Any]) – Partition like object, how to partition the dataframe before saving, defaults to empty

  • single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type

None

For more details and examples, read Save & Load.
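
For illustration, a sketch with hypothetical output paths:

df.save("/tmp/out.parquet", mode="overwrite")                      # format inferred from the extension
df.save("/tmp/out_single.parquet", mode="overwrite", single=True)  # force a single output file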

save_and_use(path, fmt='', mode='overwrite', partition=None, single=False, **kwargs)[source]#

Save this dataframe to a persistent storage and load back to use in the following steps

Parameters
  • path (str) – output path

  • fmt (str) – format hint can accept parquet, csv, json, defaults to "", meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition (Optional[Any]) – Partition like object, how to partition the dataframe before saving, defaults to empty

  • single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

  • self (fugue.workflow.workflow.TDF) –

Return type

fugue.workflow.workflow.TDF

For more details and examples, read Save & Load.

property schema: triad.collections.schema.Schema#
Raises

NotImplementedError – don’t call this method

select(*columns, where=None, having=None, distinct=False)[source]#

The functional interface for SQL select statement

Parameters
Returns

the select result as a new dataframe

Return type

fugue.workflow.workflow.TDF

New Since

0.6.0

Attention

This interface is experimental, it’s subjected to change in new versions.

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

# select existed and new columns
df.select("a","b",lit(1,"another")))
df.select("a",(col("b")+lit(1)).alias("x"))

# select distinct
df.select("a","b",lit(1,"another")),distinct=True)

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
df.select(f.count_distinct(col("*")).alias("x"))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
df.select("a",f.max(col("b")+lit(1)).alias("x"))

# SELECT a, MAX(b+1) AS x FROM df
#   WHERE b<2 AND a>1
#   GROUP BY a
#   HAVING MAX(b+1)>0
df.select(
    "a",f.max(col("b")+lit(1)).alias("x"),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)

semi_join(*dfs, on=None)[source]#

LEFT SEMI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

show(rows=10, show_count=False, title=None, best_width=100)[source]#

Show the dataframe. See examples.

Parameters
  • rows (int) – max number of rows, defaults to 10

  • show_count (bool) – whether to show total count, defaults to False

  • title (Optional[str]) – title to display on top of the dataframe, defaults to None

  • best_width (int) – max width for the output table, defaults to 100

Return type

None

Note

  • When you call this method, it means you want the dataframe to be printed when the workflow executes. So the dataframe won’t show until you run the workflow.

  • When show_count is True, it can trigger expensive calculation for a distributed dataframe. So if you call this function directly, you may need to persist() the dataframe. Or you can turn on Auto Persist

spec_uuid()[source]#

UUID of its task spec

Return type

str

strong_checkpoint(lazy=False, partition=None, single=False, **kwargs)[source]#

Cache the dataframe as a temporary file

Parameters
  • lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters for the underlying execution engine function

  • self (fugue.workflow.workflow.TDF) –

Returns

the cached dataframe

Return type

fugue.workflow.workflow.TDF

Note

Strong checkpoint guarantees the output dataframe compute dependency is from the temporary file. Use strong checkpoint only when weak_checkpoint() can’t be used.

Strong checkpoint file will be removed after the execution of the workflow.

subtract(*dfs, distinct=True)[source]#

Subtract dfs from this dataframe.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after subtraction, defaults to True

  • self (fugue.workflow.workflow.TDF) –

Returns

subtracted dataframe

Return type

fugue.workflow.workflow.TDF

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

take(n, presort=None, na_position='last')[source]#

Get the first n rows of a DataFrame per partition. If a presort is defined, apply the presort before taking. The presort here overrides partition_spec.presort

Parameters
  • n (int) – number of rows to return

  • presort (Optional[str]) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • self (fugue.workflow.workflow.TDF) –

Returns

n rows of DataFrame per partition

Return type

fugue.workflow.workflow.TDF
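
A short sketch, where df is a hypothetical WorkflowDataFrame with columns a and b:

df.take(5, presort="b desc").show()                    # first 5 rows overall, ordered by b descending
df.partition_by("a").take(2, presort="b desc").show()  # top 2 rows within each partition of a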

transform(using, schema=None, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#

Transform this dataframe using transformer. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.transform()

Please read the Transformer Tutorial

Parameters
  • using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered transformer/cotransformer

  • schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the transformer, defaults to None. The transformer will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call transform() without this parameter

  • ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list

  • callback (Optional[Any]) – RPChandler like object, defaults to None

  • self (fugue.workflow.workflow.TDF) –

Returns

the transformed dataframe

Return type

WorkflowDataFrame

Note

transform() can be lazy and returns the transformed dataframe, while out_transform() is guaranteed to execute immediately (eagerly) and returns nothing

union(*dfs, distinct=True)[source]#

Union this dataframe with dfs.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after union, defaults to True

  • self (fugue.workflow.workflow.TDF) –

Returns

unioned dataframe

Return type

fugue.workflow.workflow.TDF

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

weak_checkpoint(lazy=False, **kwargs)[source]#

Cache the dataframe in memory

Parameters
  • lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)

  • kwargs (Any) – parameters for the underlying execution engine function

  • self (fugue.workflow.workflow.TDF) –

Returns

the cached dataframe

Return type

fugue.workflow.workflow.TDF

Note

Weak checkpoint in most cases is the best choice for caching a dataframe to avoid duplicated computation. However, it does not guarantee to break up the compute dependency for this dataframe, so when you have a very complicated compute, you may encounter issues such as stack overflow. Also, weak checkpoint normally caches the dataframe in memory; if memory is a concern, then you should consider strong_checkpoint()

property workflow: fugue.workflow.workflow.FugueWorkflow#

The parent workflow

yield_dataframe_as(name, as_local=False)[source]#

Yield a dataframe that can be accessed without the current execution engine

Parameters
  • name (str) – the name of the yielded dataframe

  • as_local (bool) – yield the local version of the dataframe

  • self (fugue.workflow.workflow.TDF) –

Return type

None

Note

When as_local is True, it can trigger an additional compute to do the conversion. To avoid recompute, you should add persist before yielding.

yield_file_as(name)[source]#

Cache the dataframe in file

Parameters
  • name (str) – the name of the yielded dataframe

  • self (fugue.workflow.workflow.TDF) –

Return type

None

Note

You can yield a file only in the following cases:

  • you have not checkpointed (persisted) the dataframe, for example df.yield_file_as("a")

  • you have used deterministic_checkpoint(), for example df.deterministic_checkpoint().yield_file_as("a")

  • yield is workflow (compile-level) logic

For the first case, the yield will also be a strong checkpoint, so whenever you yield a dataframe as a file, the dataframe has been saved as a file and loaded back as a new dataframe.

zip(*dfs, how='inner', partition=None, temp_path=None, to_file_threshold=-1)[source]#

Zip this data frame with multiple dataframes together with given partition specifications. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.zip().

Parameters
  • dfs (Any) – DataFrames like object

  • how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None

  • to_file_threshold (Any) – file byte size threshold, defaults to -1

  • self (fugue.workflow.workflow.TDF) –

Returns

a zipped dataframe

Return type

WorkflowDataFrame

See also

Read CoTransformer and Zip & Comap for details

class fugue.workflow.workflow.WorkflowDataFrames(*args, **kwargs)[source]#

Bases: fugue.dataframe.dataframes.DataFrames

Ordered dictionary of WorkflowDataFrames. There are two modes: with keys and without keys. Without keys, _<n> will be used as the key for each dataframe, and the collection will be treated as an array in the Fugue framework.

It’s immutable; once initialized, you can’t add or remove elements from it.

It’s a subclass of DataFrames, but unlike DataFrames, in the initialization you should always use WorkflowDataFrame, and they should all come from the same FugueWorkflow.

Examples

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")
dfs1 = WorkflowDataFrames(df1, df2)  # as array
dfs2 = WorkflowDataFrames([df1, df2])  # as array
dfs3 = WorkflowDataFrames(a=df1, b=df2)  # as dict
dfs4 = WorkflowDataFrames(dict(a=df1, b=df2))  # as dict
dfs5 = WorkflowDataFrames(dfs4, c=df2)  # copy and update
dfs5["b"].show()  # how you get element when it's a dict
dfs1[0].show()  # how you get element when it's an array
Parameters
  • args (Any) –

  • kwargs (Any) –

property workflow: fugue.workflow.workflow.FugueWorkflow#

The parent workflow