fugue.workflow#

fugue.workflow.api#

fugue.workflow.api.out_transform(df, using, params=None, partition=None, callback=None, ignore_errors=None, engine=None, engine_conf=None)[source]#

Transform this dataframe using a transformer. It’s a wrapper of out_transform() and run(). It lets you do basic dataframe transformations without using FugueWorkflow and DataFrame. Only native types are accepted for both input and output.

Please read the Transformer Tutorial

Parameters
  • df (Any) – DataFrame like object or Yielded or a path string to a parquet file

  • using (Any) – transformer-like object, can’t be a string expression

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()

  • partition (Optional[Any]) – Partition like object, defaults to None

  • callback (Optional[Any]) – RPCHandler like object, defaults to None

  • ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)

  • engine (Optional[Any]) – it can be an empty string or None (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the SQL engine (you can use None for either of them to use the default one), defaults to None

  • engine_conf (Optional[Any]) – Parameters like object, defaults to None

Return type

None

Note

This function can only take parquet file paths in df. CSV and JSON file formats are disallowed.

This transformation is guaranteed to execute immediately (eager) and return nothing
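
For illustration, a minimal sketch of calling this function with a pandas input and a hypothetical output transformer (the function and data below are made up):

import pandas as pd
import fugue.api as fa

# an output transformer: consumes each partition and returns nothing
def show_partition(df: pd.DataFrame) -> None:
    print(df)

pdf = pd.DataFrame({"a": [0, 0, 1], "b": [1, 2, 3]})
fa.out_transform(pdf, show_partition, partition={"by": ["a"]})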

fugue.workflow.api.raw_sql(*statements, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#

Run raw SQL on the execution engine

Parameters
  • statements (Any) – a sequence of sub-statements, each a string or a dataframe-like object

  • engine (Optional[Any]) – an engine like object, defaults to None

  • engine_conf (Optional[Any]) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to return a local dataframe, defaults to False

Returns

the result dataframe

Return type

AnyDataFrame

Caution

Currently, only SELECT statements are supported

Examples

import pandas as pd
import fugue.api as fa

with fa.engine_context("duckdb"):
    a = fa.as_fugue_df([[0,1]], schema="a:long,b:long")
    b = pd.DataFrame([[0,10]], columns=["a","b"])
    c = fa.raw_sql("SELECT * FROM",a,"UNION SELECT * FROM",b)
    fa.as_pandas(c)
fugue.workflow.api.transform(df, using, schema=None, params=None, partition=None, callback=None, ignore_errors=None, persist=False, as_local=False, save_path=None, checkpoint=False, engine=None, engine_conf=None, as_fugue=False)[source]#

Transform this dataframe using a transformer. It’s a wrapper of transform() and run(). It lets you do basic dataframe transformations without using FugueWorkflow and DataFrame. Also, only native types are accepted for both input and output.

Please read the Transformer Tutorial

Parameters
  • df (Any) – DataFrame like object or Yielded or a path string to a parquet file

  • using (Any) – transformer-like object, can’t be a string expression

  • schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()

  • partition (Optional[Any]) – Partition like object, defaults to None

  • callback (Optional[Any]) – RPCHandler like object, defaults to None

  • ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)

  • engine (Optional[Any]) – it can be an empty string or None (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the SQL engine (you can use None for either of them to use the default one), defaults to None

  • engine_conf (Optional[Any]) – Parameters like object, defaults to None

  • as_fugue (bool) – If true, the function will always return a Fugue DataFrame; otherwise, if df is a native dataframe type such as a pandas DataFrame, the output will also be in its native format. Defaults to False

  • persist (bool) – Whether to persist (materialize) the dataframe before returning

  • as_local (bool) – If true, the result will be converted to a LocalDataFrame

  • save_path (Optional[str]) – the path to save the output to (see the note), defaults to None

  • checkpoint (bool) – Whether to add a checkpoint for the output (see the note)

Returns

the transformed dataframe. If df is a native dataframe (e.g. pd.DataFrame, Spark DataFrame, etc.), the output will be a native dataframe whose type is determined by the execution engine you use. But if df is of type DataFrame, then the output will also be a DataFrame

Return type

Any

Note

This function may be lazy and return the transformed dataframe.

Note

When you use callback in this function, you must make sure the output dataframe is materialized. Otherwise, if the real compute happens outside of the function call, the callback receiver will already have shut down. To do that you can use either persist or as_local; both will materialize the dataframe before the callback receiver shuts down.

Note

  • When save_path is None and checkpoint is False, then the output will not be saved into a file. The return will be a dataframe.

  • When save_path is None and checkpoint is True, then the output is saved into the path set by fugue.workflow.checkpoint.path, the name will be randomly chosen, and it is NOT a deterministic checkpoint, so if you run multiple times, the output will be saved into different files. The return will be a dataframe.

  • When save_path is not None and checkpoint is False, then the output will be saved into save_path. The return will be the value of save_path

  • When save_path is not None and checkpoint is True, then the output will be saved into save_path. The return will be the dataframe from save_path

This function can only take parquet file paths in df and save_path. CSV and other file formats are disallowed.

The checkpoint here is NOT deterministic, so re-run will generate new checkpoints.

If you want to read and write other file formats or if you want to use deterministic checkpoints, please use FugueWorkflow.
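
For illustration, a minimal sketch using a pandas input and a hypothetical transformer; the output schema is supplied through schema:

import pandas as pd
import fugue.api as fa

# a transformer adding one derived column
def add_one(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(c=df["b"] + 1)

pdf = pd.DataFrame({"a": [0, 1], "b": [1, 2]})
result = fa.transform(pdf, add_one, schema="*,c:long")
print(result)  # a pandas DataFrame because the input was pandas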

fugue.workflow.input#

fugue.workflow.input.register_raw_df_type(df_type)[source]#

TODO: This function is to be removed before 0.9.0

Deprecated since version 0.8.0: Register using fugue.api.is_df() instead.

Parameters

df_type (Type) –

Return type

None

fugue.workflow.module#

fugue.workflow.module.module(func=None, as_method=False, name=None, on_dup='overwrite')[source]#

Decorator for module

Please read Module Tutorial

Parameters
  • func (Optional[Callable]) –

  • as_method (bool) –

  • name (Optional[str]) –

  • on_dup (str) –

Return type

Any

fugue.workflow.workflow#

class fugue.workflow.workflow.FugueWorkflow(compile_conf=None)[source]#

Bases: object

Fugue Workflow, also known as the Fugue Programming Interface.

In Fugue, we use a DAG to represent workflows. DAG construction and execution are different steps; this class is mainly used in the construction step, so everything you add to the workflow is a description, and nothing is executed until you call run()

Read this to learn how to initialize it in different ways and pros and cons.

Parameters

compile_conf (Any) –
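
A minimal sketch of the construct-then-run pattern described above:

from fugue import FugueWorkflow

dag = FugueWorkflow()                   # construction only, nothing runs yet
df = dag.df([[0, "x"]], "a:int,b:str")  # adds a create step to the DAG
df.show()                               # adds a show step to the DAG
dag.run()                               # everything executes here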

add(task, *args, **kwargs)[source]#

This method should not be called directly by users. Use create(), process(), output() instead

Parameters
  • task (FugueTask) –

  • args (Any) –

  • kwargs (Any) –

Return type

WorkflowDataFrame

assert_eq(*dfs, **params)[source]#

Compare whether these dataframes are equal. It’s for internal unit test purposes only. It will convert both dataframes to LocalBoundedDataFrame, so it assumes all dataframes are small and fast enough to convert. DO NOT use it on critical or expensive tasks.

Parameters
  • dfs (Any) – DataFrames like object

  • digits – precision on float number comparison, defaults to 8

  • check_order – whether to compare the row orders, defaults to False

  • check_schema – whether to compare the schemas, defaults to True

  • check_content – whether to compare the row values, defaults to True

  • no_pandas – if true, it will compare the string representations of the dataframes; otherwise, it will convert both to pandas dataframes to compare, defaults to False

  • params (Any) –

Raises

AssertionError – if not equal

Return type

None

assert_not_eq(*dfs, **params)[source]#

Assert that all other dataframes are not equal to the first dataframe. It’s for internal unit test purposes only. It will convert both dataframes to LocalBoundedDataFrame, so it assumes all dataframes are small and fast enough to convert. DO NOT use it on critical or expensive tasks.

Parameters
  • dfs (Any) – DataFrames like object

  • digits – precision on float number comparison, defaults to 8

  • check_order – whether to compare the row orders, defaults to False

  • check_schema – whether to compare the schemas, defaults to True

  • check_content – whether to compare the row values, defaults to True

  • no_pandas – if true, it will compare the string representations of the dataframes; otherwise, it will convert both to pandas dataframes to compare, defaults to False

  • params (Any) –

Raises

AssertionError – if any dataframe equals to the first dataframe

Return type

None

property conf: ParamDict#

Compile time configs

create(using, schema=None, params=None)[source]#

Run a creator to create a dataframe.

Please read the Creator Tutorial

Parameters
  • using (Any) – creator-like object, if it is a string, then it must be the alias of a registered creator

  • schema (Optional[Any]) – Schema like object, defaults to None. The creator will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the creator, defaults to None. The creator will be able to access this value from params()

  • pre_partition – Partition like object, defaults to None. The creator will be able to access this value from partition_spec()

Returns

result dataframe

Return type

WorkflowDataFrame
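
For illustration, a sketch with a hypothetical creator function that returns a pandas DataFrame (so an explicit schema should not be needed):

import pandas as pd
from fugue import FugueWorkflow

# a creator: takes no dataframes and returns one
def make_df() -> pd.DataFrame:
    return pd.DataFrame({"a": [0, 1]})

dag = FugueWorkflow()
dag.create(make_df).show()
dag.run()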

create_data(data, schema=None, data_determiner=None)[source]#

Create dataframe.

Parameters
  • data (Any) – DataFrame like object or Yielded

  • schema (Optional[Any]) – Schema like object, defaults to None

  • data_determiner (Optional[Callable[[Any], Any]]) – a function to compute unique id from data

Returns

a dataframe of the current workflow

Return type

WorkflowDataFrame

Note

By default, the input data does not affect the determinism of the workflow (but schema and metadata do), because the amount of compute can be unpredictable. But if you want data to affect the determinism of the workflow, you can provide the function to compute the unique id of data using data_determiner

df(data, schema=None, data_determiner=None)[source]#

Create dataframe. Alias of create_data()

Parameters
  • data (Any) – DataFrame like object or Yielded

  • schema (Optional[Any]) – Schema like object, defaults to None

  • data_determiner (Optional[Callable[[Any], str]]) – a function to compute unique id from data

Returns

a dataframe of the current workflow

Return type

WorkflowDataFrame

Note

By default, the input data does not affect the determinism of the workflow (but schema and metadata do), because the amount of compute can be unpredictable. But if you want data to affect the determinism of the workflow, you can provide the function to compute the unique id of data using data_determiner

get_result(df)[source]#

After run(), get the result of a dataframe defined in the dag

Returns

a calculated dataframe

Parameters

df (WorkflowDataFrame) –

Return type

DataFrame

Examples

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int")
dag.run()
dag.get_result(df1).show()
intersect(*dfs, distinct=True)[source]#

Intersect dataframes in dfs.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after intersection, default to True

Returns

intersected dataframe

Return type

WorkflowDataFrame

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

join(*dfs, how, on=None)[source]#

Join dataframes. Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

Returns

joined dataframe

Return type

WorkflowDataFrame
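
A small sketch of an inner join where the key is inferred from the common column (hypothetical data):

from fugue import FugueWorkflow

dag = FugueWorkflow()
a = dag.df([[0, "x"]], "id:int,v1:str")
b = dag.df([[0, "y"]], "id:int,v2:str")
dag.join(a, b, how="inner").show()  # `on` is inferred as ["id"]
dag.run()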

property last_df: Optional[WorkflowDataFrame]#
load(path, fmt='', columns=None, **kwargs)[source]#

Load dataframe from persistent storage. Read this for details.

Parameters
  • path (str) – file path

  • fmt (str) – format hint can accept parquet, csv, json, defaults to “”, meaning to infer

  • columns (Optional[Any]) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) –

Returns

dataframe from the file

Return type

WorkflowDataFrame
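
A sketch assuming a parquet file exists at a placeholder path:

from fugue import FugueWorkflow

dag = FugueWorkflow()
# /tmp/data.parquet is a placeholder path; columns narrows what is read
df = dag.load("/tmp/data.parquet", fmt="parquet", columns=["a", "b"])
df.show()
dag.run()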

out_transform(*dfs, using, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#

Transform dataframes using a transformer. It materializes the execution immediately and returns nothing

Please read the Transformer Tutorial

Parameters
  • dfs (Any) – DataFrames like object

  • using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered output transformer/cotransformer

  • schema – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call out_transform() without this parameter

  • ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list

  • callback (Optional[Any]) – RPCHandler like object, defaults to None

Return type

None

Note

transform() can be lazy and will return the transformed dataframe; out_transform() is guaranteed to execute immediately (eager) and return nothing

output(*dfs, using, params=None, pre_partition=None)[source]#

Run an outputter on the dataframes.

Please read the Outputter Tutorial

Parameters
  • using (Any) – outputter-like object, if it is a string, then it must be the alias of a registered outputter

  • params (Optional[Any]) – Parameters like object to run the outputter, defaults to None. The outputter will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. The outputter will be able to access this value from partition_spec()

  • dfs (Any) –

Return type

None

process(*dfs, using, schema=None, params=None, pre_partition=None)[source]#

Run a processor on the dataframes.

Please read the Processor Tutorial

Parameters
  • dfs (Any) – DataFrames like object

  • using (Any) – processor-like object, if it is a string, then it must be the alias of a registered processor

  • schema (Optional[Any]) – Schema like object, defaults to None. The processor will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The processor will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. The processor will be able to access this value from partition_spec()

Returns

result dataframe

Return type

WorkflowDataFrame

run(engine=None, conf=None, **kwargs)[source]#

Execute the workflow and compute all dataframes.

Note

For inputs, please read engine_context()

Parameters
  • engine (Optional[Any]) – object that can be recognized as an engine, defaults to None

  • conf (Optional[Any]) – engine config, defaults to None

  • kwargs (Any) – additional parameters to initialize the execution engine

Returns

the result set

Return type

FugueWorkflowResult

Examples

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")

dag.run(SparkExecutionEngine)
df1.result.show()
df2.result.show()

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df1.yield_dataframe_as("x")

result = dag.run(SparkExecutionEngine)
result["x"]  # SparkDataFrame

Read this to learn how to run in different ways and pros and cons.

select(*statements, sql_engine=None, sql_engine_params=None, dialect='spark')[source]#

Execute SELECT statement using SQLEngine

Parameters
  • statements (Any) – a list of sub-statements in string or WorkflowDataFrame

  • sql_engine (Optional[Any]) – it can be an empty string or None (use the default SQL engine), a string (use the registered SQL engine), a SQLEngine type, or a SQLEngine instance (you can use None to use the default one), defaults to None

  • sql_engine_params (Optional[Any]) –

  • dialect (Optional[str]) –

Returns

result of the SELECT statement

Return type

WorkflowDataFrame

Examples

with FugueWorkflow() as dag:
    a = dag.df([[0,"a"]], "a:int,b:str")
    b = dag.df([[0]], "a:int")
    c = dag.select("SELECT a FROM",a,"UNION SELECT * FROM",b)
dag.run()

Please read this for more examples

set_op(how, *dfs, distinct=True)[source]#

Union, subtract or intersect dataframes.

Parameters
  • how (str) – can accept union, subtract, intersect

  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after the set operation, default to True

Returns

result dataframe of the set operation

Return type

WorkflowDataFrame

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

show(*dfs, n=10, with_count=False, title=None)[source]#

Show the dataframes. See examples.

Parameters
  • dfs (Any) – DataFrames like object

  • n (int) – max number of rows, defaults to 10

  • with_count (bool) – whether to show total count, defaults to False

  • title (Optional[str]) – title to display on top of the dataframe, defaults to None

  • best_width – max width for the output table, defaults to 100

Return type

None

Note

  • When you call this method, it means you want the dataframe to be printed when the workflow executes. So the dataframe won’t show until you run the workflow.

  • When with_count is True, it can trigger expensive calculation for a distributed dataframe. So if you call this function directly, you may need to persist() the dataframe. Or you can turn on Auto Persist

spec_uuid()[source]#

UUID of the workflow spec (description)

Return type

str

subtract(*dfs, distinct=True)[source]#

Subtract dfs[1:] from dfs[0].

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after subtraction, default to True

Returns

subtracted dataframe

Return type

WorkflowDataFrame

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

transform(*dfs, using, schema=None, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#

Transform dataframes using transformer.

Please read the Transformer Tutorial

Parameters
  • dfs (Any) – DataFrames like object

  • using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered transformer/cotransformer

  • schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call transform() without this parameter

  • ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list

  • callback (Optional[Any]) – RPCHandler like object, defaults to None

Returns

the transformed dataframe

Return type

WorkflowDataFrame

Note

transform() can be lazy and will return the transformed dataframe; out_transform() is guaranteed to execute immediately (eager) and return nothing
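
For illustration, a sketch with a hypothetical transformer and a pre-partition (the same can be expressed by calling partition() first):

import pandas as pd
from fugue import FugueWorkflow

# keep the first row of each logical partition; output schema is supplied via `schema`
def top_row(df: pd.DataFrame) -> pd.DataFrame:
    return df.head(1)

dag = FugueWorkflow()
df = dag.df([[0, 2], [0, 1], [1, 5]], "a:int,b:int")
dag.transform(df, using=top_row, schema="*", pre_partition={"by": ["a"]}).show()
dag.run()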

union(*dfs, distinct=True)[source]#

Union dataframes in dfs.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after union, default to True

Returns

unioned dataframe

Return type

WorkflowDataFrame

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

property yields: Dict[str, Yielded]#
zip(*dfs, how='inner', partition=None, temp_path=None, to_file_threshold=-1)[source]#

Zip multiple dataframes together with given partition specifications.

Parameters
  • dfs (Any) – DataFrames like object

  • how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None

  • to_file_threshold (Any) – file byte size threshold, defaults to -1

Returns

a zipped dataframe

Return type

WorkflowDataFrame

Note

  • If dfs is dict like, the zipped dataframe will be dict like; if dfs is list like, the zipped dataframe will be list like

  • It’s fine to contain only one dataframe in dfs

See also

Read CoTransformer and Zip & Comap for details

class fugue.workflow.workflow.FugueWorkflowResult(yields)[source]#

Bases: DataFrames

The result object of run(). Users should not construct this object.

Parameters
  • yields (Dict[str, Yielded]) – yields of the workflow

property yields: Dict[str, Any]#
class fugue.workflow.workflow.WorkflowDataFrame(workflow, task)[source]#

Bases: DataFrame

It represents the edges in the graph constructed by FugueWorkflow. In Fugue, we use a DAG to represent workflows, and the edges are strictly dataframes. DAG construction and execution are different steps; this class is used in the construction step. Although it inherits from DataFrame, it is not concrete data, so a lot of the operations are not allowed. If you want to obtain the concrete Fugue DataFrame, use compute() to execute the workflow.

Normally, you don’t construct it yourself; you just use its methods.

Parameters
  • workflow (FugueWorkflow) – the parent workflow it belongs to

  • task (FugueTask) – the task that generates this dataframe

aggregate(*agg_cols, **kwagg_cols)[source]#

Aggregate on dataframe

Parameters
  • df – the dataframe to aggregate on

  • agg_cols (ColumnExpr) – aggregation expressions

  • kwagg_cols (ColumnExpr) – aggregation expressions to be renamed to the argument names

  • self (TDF) –

Returns

the aggregated result as a dataframe

Return type

TDF

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
df.aggregate(f.max(col("b")))

# SELECT a, MAX(b) AS x FROM df GROUP BY a
df.partition_by("a").aggregate(f.max(col("b")).alias("x"))
df.partition_by("a").aggregate(x=f.max(col("b")))
alter_columns(columns)[source]#

Change column types

Parameters
  • columns (Any) – Schema like object

  • self (TDF) –

Returns

a new dataframe with the new column types

Return type

WorkflowDataFrame

Note

The output dataframe will not change the order of original schema.

Examples

>>> df.alter_columns("a:int,b:str")
anti_join(*dfs, on=None)[source]#

LEFT ANTI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

as_array(columns=None, type_safe=False)[source]#
Raises

NotImplementedError – don’t call this method

Parameters
  • columns (Optional[List[str]]) –

  • type_safe (bool) –

Return type

List[Any]

as_array_iterable(columns=None, type_safe=False)[source]#
Raises

NotImplementedError – don’t call this method

Parameters
  • columns (Optional[List[str]]) –

  • type_safe (bool) –

Return type

Iterable[Any]

as_local()[source]#
Raises

NotImplementedError – don’t call this method

Return type

DataFrame

as_local_bounded()[source]#
Raises

NotImplementedError – don’t call this method

Return type

DataFrame

assert_eq(*dfs, **params)[source]#

Wrapper of fugue.workflow.workflow.FugueWorkflow.assert_eq() to compare this dataframe with other dataframes.

Parameters
  • dfs (Any) – DataFrames like object

  • digits – precision on float number comparison, defaults to 8

  • check_order – whether to compare the row orders, defaults to False

  • check_schema – whether to compare the schemas, defaults to True

  • check_content – whether to compare the row values, defaults to True

  • no_pandas – if true, it will compare the string representations of the dataframes; otherwise, it will convert both to pandas dataframes to compare, defaults to False

  • params (Any) –

Raises

AssertionError – if not equal

Return type

None

assert_not_eq(*dfs, **params)[source]#

Wrapper of fugue.workflow.workflow.FugueWorkflow.assert_not_eq() to compare this dataframe with other dataframes.

Parameters
  • dfs (Any) – DataFrames like object

  • digits – precision on float number comparison, defaults to 8

  • check_order – whether to compare the row orders, defaults to False

  • check_schema – whether to compare the schemas, defaults to True

  • check_content – whether to compare the row values, defaults to True

  • no_pandas – if true, it will compare the string representations of the dataframes; otherwise, it will convert both to pandas dataframes to compare, defaults to False

  • params (Any) –

Raises

AssertionError – if any dataframe is equal to the first dataframe

Return type

None

assign(*args, **kwargs)[source]#

Update existing columns with new values and add new columns

Parameters
  • df – the dataframe to set columns

  • args (ColumnExpr) – column expressions

  • kwargs (Any) – column expressions to be renamed to the argument names, if a value is not ColumnExpr, it will be treated as a literal

  • self (TDF) –

Returns

a new dataframe with the updated values

Return type

TDF

Tip

This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

# add/set 1 as column x
df.assign(lit(1,"x"))
df.assign(x=1)

# add/set x to be a+b
df.assign((col("a")+col("b")).alias("x"))
df.assign(x=col("a")+col("b"))

# cast column a data type to double
df.assign(col("a").cast(float))

# cast + new columns
df.assign(col("a").cast(float),x=1,y=col("a")+col("b"))
broadcast()[source]#

Broadcast the current dataframe

Returns

the broadcasted dataframe

Return type

WorkflowDataFrame

Parameters

self (TDF) –

checkpoint(storage_type='file')[source]#
Parameters
  • self (TDF) –

  • storage_type (str) –

Return type

TDF

compute(*args, **kwargs)[source]#

Trigger the parent workflow to run() and to generate and cache the result dataframe this instance represents.

Examples

>>> df = FugueWorkflow().df([[0]],"a:int").transform(a_transformer)
>>> df.compute().as_pandas()  # pandas dataframe
>>> df.compute(SparkExecutionEngine).native  # spark dataframe

Note

Consider using fugue.workflow.workflow.FugueWorkflow.run() instead. Because this method actually triggers the entire workflow to run, it may be confusing to use: extra time may be taken to compute unrelated dataframes.

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")

dag.run(SparkExecutionEngine)
df1.result.show()
df2.result.show()
Return type

DataFrame

count()[source]#
Raises

NotImplementedError – don’t call this method

Return type

int

cross_join(*dfs)[source]#

CROSS Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

deterministic_checkpoint(storage_type='file', lazy=False, partition=None, single=False, namespace=None, **kwargs)[source]#

Cache the dataframe as a temporary file

Parameters
  • storage_type (str) – can be either file or table, defaults to file

  • lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters for the underlying execution engine function

  • namespace (Optional[Any]) – a value to control determinism, defaults to None.

  • self (TDF) –

Returns

the cached dataframe

Return type

TDF

Note

The difference vs strong_checkpoint() is that this checkpoint is not removed after execution, so it can take effect across executions if the dependent compute logic is not changed.

distinct()[source]#

Get distinct dataframe. Equivalent to SELECT DISTINCT * FROM df

Returns

dataframe with unique records

Parameters

self (TDF) –

Return type

TDF

drop(columns, if_exists=False)[source]#

Drop columns from the dataframe.

Parameters
  • columns (List[str]) – columns to drop

  • if_exists (bool) – if setting to True, it will ignore non-existent columns, defaults to False

  • self (TDF) –

Returns

the dataframe after dropping columns

Return type

WorkflowDataFrame

dropna(how='any', thresh=None, subset=None)[source]#

Drops records containing NA values

Parameters
  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (Optional[int]) – int, drops rows that have less than thresh non-null values

  • subset (Optional[List[str]]) – list of columns to operate on

  • self (TDF) –

Returns

dataframe with incomplete records dropped

Return type

TDF

property empty: bool#
Raises

NotImplementedError – don’t call this method

fillna(value, subset=None)[source]#

Fills NA values with replacement values

Parameters
  • value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary

  • self (TDF) –

Returns

dataframe with NA records filled

Return type

TDF

filter(condition)[source]#

Filter rows by the given condition

Parameters
  • df – the dataframe to be filtered

  • condition (ColumnExpr) – (boolean) column expression

  • self (TDF) –

Returns

a new filtered dataframe

Return type

TDF

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

df.filter((col("a")>1) & (col("b")=="x"))
df.filter(f.coalesce(col("a"),col("b"))>1)
full_outer_join(*dfs, on=None)[source]#

FULL OUTER Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

head(n, columns=None)[source]#

Get first n rows of the dataframe as a new local bounded dataframe

Parameters
  • n (int) – number of rows

  • columns (Optional[List[str]]) – selected columns, defaults to None (all columns)

Returns

a local bounded dataframe

Return type

LocalBoundedDataFrame

inner_join(*dfs, on=None)[source]#

INNER Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

intersect(*dfs, distinct=True)[source]#

Intersect this dataframe with dfs.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after intersection, default to True

  • self (TDF) –

Returns

intersected dataframe

Return type

TDF

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

property is_bounded: bool#
Raises

NotImplementedError – don’t call this method

property is_local: bool#
Raises

NotImplementedError – don’t call this method

join(*dfs, how, on=None)[source]#

Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

left_anti_join(*dfs, on=None)[source]#

LEFT ANTI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

left_outer_join(*dfs, on=None)[source]#

LEFT OUTER Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

left_semi_join(*dfs, on=None)[source]#

LEFT SEMI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

property name: str#

Name of its task spec

property native: Any#

The native object this Dataset class wraps

native_as_df()[source]#

The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array; it doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.

Return type

Any

property num_partitions: int#
Raises

NotImplementedError – don’t call this method

out_transform(using, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#

Transform this dataframe using transformer. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.out_transform()

Please read the Transformer Tutorial

Parameters
  • using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered output transformer/cotransformer

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call out_transform() without this parameter

  • ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list

  • callback (Optional[Any]) – RPCHandler like object, defaults to None

  • self (TDF) –

Return type

None

Note

transform() can be lazy and will return the transformed dataframe; out_transform() is guaranteed to execute immediately (eager) and return nothing

output(using, params=None, pre_partition=None)[source]#

Run an outputter on this dataframe. It’s a simple wrapper of fugue.workflow.workflow.FugueWorkflow.output()

Please read the Outputter Tutorial

Parameters
  • using (Any) – outputter-like object, if it is a string, then it must be the alias of a registered outputter

  • params (Optional[Any]) – Parameters like object to run the outputter, defaults to None. The outputter will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. The outputter will be able to access this value from partition_spec()

Return type

None

partition(*args, **kwargs)[source]#

Partition the current dataframe. Please read the Partition Tutorial

Parameters
  • args (Any) – Partition like object

  • kwargs (Any) – Partition like object

  • self (TDF) –

Returns

dataframe with the partition hint

Return type

WorkflowDataFrame

Note

Normally this step is fast because it only adds a partition hint for the next step.

partition_by(*keys, **kwargs)[source]#

Partition the current dataframe by keys. Please read the Partition Tutorial. This is a wrapper of partition()

Parameters
  • keys (str) – partition keys

  • kwargs (Any) – Partition like object excluding by and partition_by

  • self (TDF) –

Returns

dataframe with the partition hint

Return type

WorkflowDataFrame

property partition_spec: PartitionSpec#

The partition spec set on the dataframe for next steps to use

Examples

dag = FugueWorkflow()
df = dag.df([[0],[1]], "a:int")
assert df.partition_spec.empty
df2 = df.partition(by=["a"])
assert df.partition_spec.empty
assert df2.partition_spec == PartitionSpec(by=["a"])
peek_array()[source]#
Raises

NotImplementedError – don’t call this method

Return type

List[Any]

per_partition_by(*keys)[source]#

Partition the current dataframe by keys so each physical partition contains only one logical partition. Please read the Partition Tutorial. This is a wrapper of partition()

Parameters
  • keys (str) – partition keys

  • self (TDF) –

Returns

dataframe that is both logically and physically partitioned by keys

Return type

WorkflowDataFrame

Note

This is a hint but not enforced, certain execution engines will not respect this hint.

per_row()[source]#

Partition the current dataframe to one row per partition. Please read the Partition Tutorial. This is a wrapper of partition()

Returns

dataframe that is evenly partitioned by row count

Return type

WorkflowDataFrame

Parameters

self (TDF) –

Note

This is a hint but not enforced, certain execution engines will not respect this hint.

persist()[source]#

Persist the current dataframe

Returns

the persisted dataframe

Return type

WorkflowDataFrame

Parameters

self (TDF) –

Note

persist can only guarantee the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example, stack overflow.

The persist method is considered a weak checkpoint. Sometimes, it may be necessary to use a strong checkpoint, which is checkpoint()

process(using, schema=None, params=None, pre_partition=None)[source]#

Run a processor on this dataframe. It’s a simple wrapper of fugue.workflow.workflow.FugueWorkflow.process()

Please read the Processor Tutorial

Parameters
  • using (Any) – processor-like object, if it is a string, then it must be the alias of a registered processor

  • schema (Optional[Any]) – Schema like object, defaults to None. The processor will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The processor will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. The processor will be able to access this value from partition_spec()

  • self (TDF) –

Returns

result dataframe

Return type

WorkflowDataFrame

rename(*args, **kwargs)[source]#

Rename the dataframe using a mapping dict

Parameters
  • args (Any) – list of dicts containing rename maps

  • kwargs (Any) – rename map

  • self (TDF) –

Returns

a new dataframe with the new names

Return type

WorkflowDataFrame

Note

This interface is more flexible than fugue.dataframe.dataframe.DataFrame.rename()

Examples

>>> df.rename({"a": "b"}, c="d", e="f")
property result: DataFrame#

The concrete DataFrame obtained from compute(). This property will not trigger compute again, but compute should have been called earlier and the result is cached.

right_outer_join(*dfs, on=None)[source]#

RIGHT OUTER Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

sample(n=None, frac=None, replace=False, seed=None)[source]#

Sample dataframe by number of rows or by fraction

Parameters
  • n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set

  • frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (Optional[int]) – seed for randomness, defaults to None

  • self (TDF) –

Returns

sampled dataframe

Return type

TDF
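
A small sketch of sampling by fraction with a fixed seed (hypothetical data):

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[i] for i in range(10)], "a:int")
df.sample(frac=0.3, seed=0).show()  # roughly 30% of rows, reproducible
dag.run()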

save(path, fmt='', mode='overwrite', partition=None, single=False, **kwargs)[source]#

Save this dataframe to a persistent storage

Parameters
  • path (str) – output path

  • fmt (str) – format hint, can accept parquet, csv, json, defaults to “”, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition (Optional[Any]) – Partition like object, how to partition the dataframe before saving, defaults to empty

  • single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type

None

For more details and examples, read Save & Load.
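
A sketch that saves to a placeholder parquet path as a single file:

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, "x"]], "a:int,b:str")
# /tmp/out.parquet is a placeholder path
df.save("/tmp/out.parquet", fmt="parquet", mode="overwrite", single=True)
dag.run()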

save_and_use(path, fmt='', mode='overwrite', partition=None, single=False, **kwargs)[source]#

Save this dataframe to a persistent storage and load back to use in the following steps

Parameters
  • path (str) – output path

  • fmt (str) – format hint, can accept parquet, csv, json, defaults to “”, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition (Optional[Any]) – Partition like object, how to partition the dataframe before saving, defaults to empty

  • single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

  • self (TDF) –

Return type

TDF

For more details and examples, read Save & Load.

property schema: Schema#
Raises

NotImplementedError – don’t call this method

select(*columns, where=None, having=None, distinct=False)[source]#

The functional interface for SQL select statement

Parameters
  • columns (Union[str, ColumnExpr]) – column expressions, for strings they will represent the column names

  • where (Optional[ColumnExpr]) – WHERE condition expression, defaults to None

  • having (Optional[ColumnExpr]) – having condition expression, defaults to None. It is used when cols contains aggregation columns, defaults to None

  • distinct (bool) – whether to return distinct result, defaults to False

  • self (TDF) –

Returns

the select result as a new dataframe

Return type

TDF

New Since

0.6.0

Attention

This interface is experimental; it’s subject to change in new versions.

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

# select existing and new columns
df.select("a","b",lit(1,"another"))
df.select("a",(col("b")+lit(1)).alias("x"))

# select distinct
df.select("a","b",lit(1,"another"),distinct=True)

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
df.select(f.count_distinct(all_cols()).alias("x"))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
df.select("a",f.max(col("b")+lit(1)).alias("x"))

# SELECT a, MAX(b+1) AS x FROM df
#   WHERE b<2 AND a>1
#   GROUP BY a
#   HAVING MAX(b+1)>0
df.select(
    "a",f.max(col("b")+lit(1)).alias("x"),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)
semi_join(*dfs, on=None)[source]#

LEFT SEMI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read Join tutorials on workflow and engine for details

Parameters
  • dfs (Any) – DataFrames like object

  • on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None

  • self (TDF) –

Returns

joined dataframe

Return type

WorkflowDataFrame

show(n=10, with_count=False, title=None, best_width=100)[source]#

Show the dataframe. See examples.

Parameters
  • n (int) – max number of rows, defaults to 10

  • with_count (bool) – whether to show total count, defaults to False

  • title (Optional[str]) – title to display on top of the dataframe, defaults to None

  • best_width (int) – max width for the output table, defaults to 100

Return type

None

Note

  • When you call this method, it means you want the dataframe to be printed when the workflow executes. So the dataframe won’t show until you run the workflow.

  • When with_count is True, it can trigger expensive calculation for a distributed dataframe. So if you call this function directly, you may need to persist() the dataframe. Or you can turn on Auto Persist

spec_uuid()[source]#

UUID of its task spec

Return type

str

strong_checkpoint(storage_type='file', lazy=False, partition=None, single=False, **kwargs)[source]#

Cache the dataframe as a temporary file

Parameters
  • storage_type (str) – can be either file or table, defaults to file

  • lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters for the underlying execution engine function

  • self (TDF) –

Returns

the cached dataframe

Return type

TDF

Note

Strong checkpoint guarantees the output dataframe compute dependency is from the temporary file. Use strong checkpoint only when weak_checkpoint() can’t be used.

Strong checkpoint file will be removed after the execution of the workflow.

subtract(*dfs, distinct=True)[source]#

Subtract dfs from this dataframe.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after subtraction, default to True

  • self (TDF) –

Returns

subtracted dataframe

Return type

TDF

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

take(n, presort=None, na_position='last')[source]#

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort

Parameters
  • n (int) – number of rows to return

  • presort (Optional[str]) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • self (TDF) –

Returns

n rows of DataFrame per partition

Return type

TDF
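
A sketch combining partition_by() with take() to keep the top row per group (hypothetical data):

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, 2], [0, 1], [1, 5]], "a:int,b:int")
# within each partition of `a`, keep the row with the largest `b`
df.partition_by("a").take(1, presort="b desc").show()
dag.run()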

transform(using, schema=None, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#

Transform this dataframe using transformer. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.transform()

Please read the Transformer Tutorial

Parameters
  • using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered transformer/cotransformer

  • schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()

  • pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call transform() without this parameter

  • ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list

  • callback (Optional[Any]) – RPCHandler like object, defaults to None

  • self (TDF) –

Returns

the transformed dataframe

Return type

WorkflowDataFrame

Note

transform() can be lazy and will return the transformed dataframe; out_transform() is guaranteed to execute immediately (eager) and return nothing

union(*dfs, distinct=True)[source]#

Union this dataframe with dfs.

Parameters
  • dfs (Any) – DataFrames like object

  • distinct (bool) – whether to perform distinct after union, default to True

  • self (TDF) –

Returns

unioned dataframe

Return type

TDF

Note

Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.

weak_checkpoint(lazy=False, **kwargs)[source]#

Cache the dataframe in memory

Parameters
  • lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)

  • kwargs (Any) – parameters for the underlying execution engine function

  • self (TDF) –

Returns

the cached dataframe

Return type

TDF

Note

Weak checkpoint in most cases is the best choice for caching a dataframe to avoid duplicated computation. However, it does not guarantee to break up the compute dependency for this dataframe, so when you have very complicated compute, you may encounter issues such as stack overflow. Also, weak checkpoint normally caches the dataframe in memory; if memory is a concern, then you should consider strong_checkpoint()

property workflow: FugueWorkflow#

The parent workflow

yield_dataframe_as(name, as_local=False)[source]#

Yield a dataframe that can be accessed without the current execution engine

Parameters
  • name (str) – the name of the yielded dataframe

  • as_local (bool) – yield the local version of the dataframe

  • self (TDF) –

Return type

None

Note

When as_local is True, it can trigger an additional compute to do the conversion. To avoid recompute, you should add persist before yielding.
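
A sketch of yielding a result so it can be accessed after run() (see also the run() examples above):

from fugue import FugueWorkflow

dag = FugueWorkflow()
dag.df([[0]], "a:int").persist().yield_dataframe_as("x", as_local=True)
result = dag.run()
result["x"]  # a local dataframe, since as_local=True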

yield_file_as(name)[source]#

Cache the dataframe in file

Parameters
  • name (str) – the name of the yielded dataframe

  • self (TDF) –

Return type

None

Note

You can yield a file/table only in the following cases:

  • you have not checkpointed (persisted) the dataframe, for example df.yield_file_as("a")

  • you have used deterministic_checkpoint(), for example df.deterministic_checkpoint().yield_file_as("a")

  • yield is workflow (compile-level) logic

For the first case, the yield will also be a strong checkpoint, so whenever you yield a dataframe as a file, the dataframe has been saved as a file and loaded back as a new dataframe.

yield_table_as(name)[source]#

Cache the dataframe as a table

Parameters
  • name (str) – the name of the yielded dataframe

  • self (TDF) –

Return type

None

Note

You can yield a file/table only in the following cases:

  • you have not checkpointed (persisted) the dataframe, for example df.yield_file_as("a")

  • you have used deterministic_checkpoint(), for example df.deterministic_checkpoint().yield_file_as("a")

  • yield is workflow (compile-level) logic

For the first case, the yield will also be a strong checkpoint, so whenever you yield a dataframe as a file, the dataframe has been saved as a file and loaded back as a new dataframe.

zip(*dfs, how='inner', partition=None, temp_path=None, to_file_threshold=-1)[source]#

Zip this data frame with multiple dataframes together with given partition specifications. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.zip().

Parameters
  • dfs (Any) – DataFrames like object

  • how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None

  • to_file_threshold (Any) – file byte size threshold, defaults to -1

  • self (TDF) –

Returns

a zipped dataframe

Return type

WorkflowDataFrame

See also

Read CoTransformer and Zip & Comap for details

class fugue.workflow.workflow.WorkflowDataFrames(*args, **kwargs)[source]#

Bases: DataFrames

Ordered dictionary of WorkflowDataFrames. There are two modes: with keys and without keys. If without keys, _<n> will be used as the key for each dataframe, and it will be treated as an array in the Fugue framework.

It’s immutable; once initialized, you can’t add or remove elements from it.

It’s a subclass of DataFrames, but different from DataFrames, in the initialization you should always use WorkflowDataFrame, and they should all come from the same FugueWorkflow.

Examples

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")
dfs1 = WorkflowDataFrames(df1, df2)  # as array
dfs2 = WorkflowDataFrames([df1, df2])  # as array
dfs3 = WorkflowDataFrames(a=df1, b=df2)  # as dict
dfs4 = WorkflowDataFrames(dict(a=df1, b=df2))  # as dict
dfs5 = WorkflowDataFrames(dfs4, c=df2)  # copy and update
dfs5["b"].show()  # how you get element when it's a dict
dfs1[0].show()  # how you get element when it's an array
Parameters
  • args (Any) –

  • kwargs (Any) –

property workflow: FugueWorkflow#

The parent workflow