fugue.workflow
fugue.workflow.api
- fugue.workflow.api.out_transform(df, using, params=None, partition=None, callback=None, ignore_errors=None, engine=None, engine_conf=None)[source]
Transform this dataframe using a transformer. It is a wrapper of out_transform() and run(). It lets you do basic dataframe transformations without using FugueWorkflow and DataFrame. Only native types are accepted for both input and output. Please read the Transformer Tutorial.
- Parameters:
df (Any) – DataFrame like object or Yielded or a path string to a parquet file
using (Any) – transformer-like object, can’t be a string expression
params (Any | None) – Parameters like object to run the transformer, defaults to None. The transformer will be able to access this value from params()
partition (Any | None) – Partition like object, defaults to None
callback (Any | None) – RPCHandler like object, defaults to None
ignore_errors (List[Any] | None) – list of exception types the transformer can ignore, defaults to None (empty list)
engine (Any | None) – it can be an empty string or None (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first represents the execution engine and the second represents the SQL engine (you can use None for either to use the default one), defaults to None
engine_conf (Any | None) – Parameters like object, defaults to None
- Return type:
None
Note
This function can only take parquet file paths in df. CSV and JSON file formats are disallowed.
This transformation is guaranteed to execute immediately (eager) and return nothing
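To make the behavior concrete, here is a minimal sketch (the function, data and partition spec below are illustrative assumptions, not part of the source): an output transformer can be any callable that consumes native data and returns nothing, and out_transform runs it eagerly.

import pandas as pd
import fugue.api as fa

# an output transformer: consumes each partition and returns nothing
def show_partition(df: pd.DataFrame) -> None:
    print(df)

# runs eagerly on the default (pandas-based) engine and returns None
fa.out_transform(
    pd.DataFrame({"a": [0, 0, 1], "b": [1, 2, 3]}),
    show_partition,
    partition={"by": "a"},
)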
- fugue.workflow.api.raw_sql(*statements, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Run raw SQL on the execution engine
- Parameters:
statements (Any) – a sequence of sub-statements in string or dataframe-like objects
engine (Any | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to return a local dataframe, defaults to False
- Returns:
the result dataframe
- Return type:
AnyDataFrame
Caution
Currently, only SELECT statements are supported
Examples
import pandas as pd
import fugue.api as fa

with fa.engine_context("duckdb"):
    a = fa.as_fugue_df([[0,1]], schema="a:long,b:long")
    b = pd.DataFrame([[0,10]], columns=["a","b"])
    c = fa.raw_sql("SELECT * FROM",a,"UNION SELECT * FROM",b)
    fa.as_pandas(c)
- fugue.workflow.api.transform(df, using, schema=None, params=None, partition=None, callback=None, ignore_errors=None, persist=False, as_local=False, save_path=None, checkpoint=False, engine=None, engine_conf=None, as_fugue=False)[source]
Transform this dataframe using a transformer. It is a wrapper of transform() and run(). It lets you do basic dataframe transformations without using FugueWorkflow and DataFrame. Only native types are accepted for both input and output. Please read the Transformer Tutorial.
- Parameters:
df (Any) – DataFrame like object or Yielded or a path string to a parquet file
using (Any) – transformer-like object, can’t be a string expression
schema (Any | None) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()
params (Any | None) – Parameters like object to run the transformer, defaults to None. The transformer will be able to access this value from params()
partition (Any | None) – Partition like object, defaults to None
callback (Any | None) – RPCHandler like object, defaults to None
ignore_errors (List[Any] | None) – list of exception types the transformer can ignore, defaults to None (empty list)
engine (Any | None) – it can be an empty string or None (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first represents the execution engine and the second represents the SQL engine (you can use None for either to use the default one), defaults to None
engine_conf (Any | None) – Parameters like object, defaults to None
as_fugue (bool) – if True, the function will always return a Fugue DataFrame; otherwise, if df is a native dataframe type such as a pandas dataframe, the output will also be returned in its native format. Defaults to False
persist (bool) – whether to persist (materialize) the dataframe before returning
as_local (bool) – if True, the result will be converted to a LocalDataFrame
save_path (str | None) – the file path to save the output to (see the note), defaults to None
checkpoint (bool) – Whether to add a checkpoint for the output (see the note)
- Returns:
the transformed dataframe. If df is a native dataframe (e.g. pd.DataFrame, Spark DataFrame, etc.), the output will be a native dataframe whose type is determined by the execution engine you use. But if df is of type DataFrame, then the output will also be a DataFrame
- Return type:
Any
Note
This function may be lazy and return the transformed dataframe.
Note
When you use a callback in this function, you must be careful that the output dataframe is materialized. Otherwise, if the real compute happens outside of the function call, the callback receiver will already be shut down. To do that you can use either persist or as_local; both will materialize the dataframe before the callback receiver shuts down.
Note
When save_path is None and checkpoint is False, then the output will not be saved into a file. The return will be a dataframe.
When save_path is None and checkpoint is True, then the output is saved into the path set by fugue.workflow.checkpoint.path, the name will be randomly chosen, and it is NOT a deterministic checkpoint, so if you run multiple times, the output will be saved into different files. The return will be a dataframe.
When save_path is not None and checkpoint is False, then the output will be saved into save_path. The return will be the value of save_path
When save_path is not None and checkpoint is True, then the output will be saved into save_path. The return will be the dataframe from save_path
This function can only take parquet file paths in df and save_path. CSV and other file formats are disallowed.
The checkpoint here is NOT deterministic, so re-run will generate new checkpoints.
If you want to read and write other file formats, or if you want to use deterministic checkpoints, please use FugueWorkflow.
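As a minimal sketch (the transformer function and schema below are illustrative assumptions, not part of the source), a transformer can be a plain Python function whose output schema is declared through the schema argument:

import pandas as pd
import fugue.api as fa

# a transformer: adds a column; the output schema is declared at the call site
def add_one(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b=df["a"] + 1)

# runs on the default (pandas-based) engine; pass engine="spark" or similar to scale out
result = fa.transform(
    pd.DataFrame({"a": [0, 1, 2]}),
    add_one,
    schema="*,b:long",
)
print(result)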
fugue.workflow.input
- fugue.workflow.input.register_raw_df_type(df_type)[source]
TODO: This function is to be removed before 0.9.0
Deprecated since version 0.8.0: Register using fugue.api.is_df() instead.
- Parameters:
df_type (Type)
- Return type:
None
fugue.workflow.module
fugue.workflow.workflow
- class fugue.workflow.workflow.FugueWorkflow(compile_conf=None)[source]
Bases:
object
Fugue Workflow, also known as the Fugue Programming Interface.
In Fugue, we use a DAG to represent a workflow. DAG construction and execution are separate steps; this class is mainly used in the construction step, so everything you add to the workflow is a description, and nothing is executed until you call run().
Read this to learn how to initialize it in different ways, with pros and cons.
- Parameters:
compile_conf (Any)
- add(task, *args, **kwargs)[source]
This method should not be called directly by users. Use create(), process() or output() instead
- Parameters:
task (FugueTask)
args (Any)
kwargs (Any)
- Return type:
- assert_eq(*dfs, **params)[source]
Compare whether these dataframes are equal. It is for internal unit-test purposes only. It will convert all dataframes to LocalBoundedDataFrame, so it assumes all dataframes are small and fast enough to convert. DO NOT use it on critical or expensive tasks.
- Parameters:
dfs (Any) – DataFrames like object
digits – precision of float number comparison, defaults to 8
check_order – whether to compare row order, defaults to False
check_schema – whether to compare schemas, defaults to True
check_content – whether to compare row values, defaults to True
no_pandas – if True, it will compare the string representations of the dataframes, otherwise it will convert both to pandas dataframes to compare, defaults to False
params (Any)
- Raises:
AssertionError – if not equal
- Return type:
None
- assert_not_eq(*dfs, **params)[source]
Assert that all other dataframes are not equal to the first dataframe. It is for internal unit-test purposes only. It will convert all dataframes to LocalBoundedDataFrame, so it assumes all dataframes are small and fast enough to convert. DO NOT use it on critical or expensive tasks.
- Parameters:
dfs (Any) – DataFrames like object
digits – precision of float number comparison, defaults to 8
check_order – whether to compare row order, defaults to False
check_schema – whether to compare schemas, defaults to True
check_content – whether to compare row values, defaults to True
no_pandas – if True, it will compare the string representations of the dataframes, otherwise it will convert both to pandas dataframes to compare, defaults to False
params (Any)
- Raises:
AssertionError – if any dataframe is equal to the first dataframe
- Return type:
None
- create(using, schema=None, params=None)[source]
Run a creator to create a dataframe.
Please read the Creator Tutorial
- Parameters:
using (Any) – creator-like object, if it is a string, then it must be the alias of a registered creator
schema (Any | None) – Schema like object, defaults to None. The creator will be able to access this value from
output_schema()
params (Any | None) – Parameters like object to run the creator, defaults to None. The creator will be able to access this value from
params()
pre_partition – Partition like object, defaults to None. The creator will be able to access this value from
partition_spec()
- Returns:
result dataframe
- Return type:
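As a minimal sketch (the creator function below is an illustrative assumption), a creator can be a plain function that produces rows, with the schema supplied at the call site:

from typing import Any, List
from fugue import FugueWorkflow

# a creator: takes no dataframe input and produces rows
def make_rows() -> List[List[Any]]:
    return [[0, "x"], [1, "y"]]

dag = FugueWorkflow()
dag.create(make_rows, schema="a:int,b:str").show()
dag.run()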
- create_data(data, schema=None, data_determiner=None)[source]
Create dataframe.
- Parameters:
data (Any) – DataFrame like object or
Yielded
schema (Any | None) – Schema like object, defaults to None
data_determiner (Callable[[Any], Any] | None) – a function to compute unique id from
data
- Returns:
a dataframe of the current workflow
- Return type:
Note
By default, the input data does not affect the determinism of the workflow (but schema and metadata do), because the amount of compute can be unpredictable. But if you want data to affect the determinism of the workflow, you can provide a function to compute the unique id of data using data_determiner
- df(data, schema=None, data_determiner=None)[source]
Create dataframe. Alias of
create_data()
- Parameters:
data (Any) – DataFrame like object or
Yielded
schema (Any | None) – Schema like object, defaults to None
data_determiner (Callable[[Any], str] | None) – a function to compute unique id from
data
- Returns:
a dataframe of the current workflow
- Return type:
Note
By default, the input data does not affect the determinism of the workflow (but schema and metadata do), because the amount of compute can be unpredictable. But if you want data to affect the determinism of the workflow, you can provide a function to compute the unique id of data using data_determiner
- get_result(df)[source]
After run(), get the result of a dataframe defined in the DAG
- Returns:
a calculated dataframe
- Parameters:
df (WorkflowDataFrame)
- Return type:
Examples
dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int")
dag.run()
dag.get_result(df1).show()
- intersect(*dfs, distinct=True)[source]
Intersect dataframes in dfs.
- Parameters:
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after intersection, default to True
- Returns:
intersected dataframe
- Return type:
Note
Currently, all dataframes in
dfs
must have identical schemas, otherwise an exception will be thrown.
- join(*dfs, how, on=None)[source]
Join dataframes. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Iterable[str] | None) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None
- Returns:
joined dataframe
- Return type:
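For illustration (the data below is an assumption, not from the source), join keys are inferred from the common columns when on is not provided:

from fugue import FugueWorkflow

dag = FugueWorkflow()
a = dag.df([[0, "x"], [1, "y"]], "a:int,b:str")
b = dag.df([[0, 10]], "a:int,c:int")
# joins on the inferred common key "a"
dag.join(a, b, how="inner").show()
dag.run()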
- property last_df: WorkflowDataFrame | None
- load(path, fmt='', columns=None, **kwargs)[source]
Load dataframe from persistent storage. Read this for details.
- Parameters:
path (str) – file path
fmt (str) – format hint, can accept parquet, csv, json, defaults to "", meaning to infer
columns (Any | None) – list of columns or a Schema like object, defaults to None
kwargs (Any)
- Returns:
dataframe from the file
- Return type:
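A minimal sketch of loading and saving within one workflow (the file paths are placeholders, and save() is documented further below):

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.load("/tmp/input.parquet", fmt="parquet")
df.save("/tmp/output.parquet", fmt="parquet", mode="overwrite")
dag.run()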
- out_transform(*dfs, using, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]
Transform dataframes using a transformer; it materializes the execution immediately and returns nothing
Please read the Transformer Tutorial
- Parameters:
dfs (Any) – DataFrames like object
using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered output transformer/cotransformer
schema – Schema like object, defaults to None. The transformer will be able to access this value from
output_schema()
params (Any | None) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from
params()
pre_partition (Any | None) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call out_transform() without this parameter
ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list
callback (Any | None) – RPChandler like object, defaults to None
- Return type:
None
Note
transform() can be lazy and will return the transformed dataframe, while out_transform() is guaranteed to execute immediately (eager) and return nothing
- output(*dfs, using, params=None, pre_partition=None)[source]
Run an outputter on dataframes.
Please read the Outputter Tutorial
- Parameters:
using (Any) – outputter-like object, if it is a string, then it must be the alias of a registered outputter
params (Any | None) – Parameters like object to run the outputter, defaults to None. The outputter will be able to access this value from
params()
pre_partition (Any | None) – Partition like object, defaults to None. The outputter will be able to access this value from
partition_spec()
dfs (Any)
- Return type:
None
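A minimal sketch of an outputter (the function and data are illustrative assumptions): it consumes dataframes and returns nothing.

import pandas as pd
from fugue import FugueWorkflow

# an outputter: consumes the input and returns nothing
def print_count(df: pd.DataFrame) -> None:
    print(len(df))

dag = FugueWorkflow()
dag.output(dag.df([[0], [1]], "a:int"), using=print_count)
dag.run()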
- process(*dfs, using, schema=None, params=None, pre_partition=None)[source]
Run a processor on the dataframes.
Please read the Processor Tutorial
- Parameters:
dfs (Any) – DataFrames like object
using (Any) – processor-like object, if it is a string, then it must be the alias of a registered processor
schema (Any | None) – Schema like object, defaults to None. The processor will be able to access this value from
output_schema()
params (Any | None) – Parameters like object to run the processor, defaults to None. The processor will be able to access this value from
params()
pre_partition (Any | None) – Partition like object, defaults to None. The processor will be able to access this value from
partition_spec()
- Returns:
result dataframe
- Return type:
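A minimal sketch of a processor (the function and data are illustrative assumptions): it takes one or more dataframes and returns a dataframe.

import pandas as pd
from fugue import FugueWorkflow

# a processor: combines two dataframes into one
def concat(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1, df2]).reset_index(drop=True)

dag = FugueWorkflow()
a = dag.df([[0]], "a:int")
b = dag.df([[1]], "a:int")
dag.process(a, b, using=concat).show()
dag.run()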
- run(engine=None, conf=None, **kwargs)[source]
Execute the workflow and compute all dataframes.
Note
For inputs, please read
engine_context()
- Parameters:
engine (Any | None) – object that can be recognized as an engine, defaults to None
conf (Any | None) – engine config, defaults to None
kwargs (Any) – additional parameters to initialize the execution engine
- Returns:
the result set
- Return type:
Examples
dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")
dag.run(SparkExecutionEngine)
df1.result.show()
df2.result.show()

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df1.yield_dataframe_as("x")

result = dag.run(SparkExecutionEngine)
result["x"]  # SparkDataFrame
Read this to learn how to run in different ways and pros and cons.
- select(*statements, sql_engine=None, sql_engine_params=None, dialect='spark')[source]
Execute a SELECT statement using SQLEngine
- Parameters:
statements (Any) – a list of sub-statements in string or WorkflowDataFrame
sql_engine (Any | None) – it can be an empty string or None (use the default SQL engine), a string (use the registered SQL engine), an SQLEngine type, or an SQLEngine instance (you can use None to use the default one), defaults to None
sql_engine_params (Any | None)
dialect (str | None)
- Returns:
result of the SELECT statement
- Return type:
Examples
with FugueWorkflow() as dag:
    a = dag.df([[0,"a"]],"a:int,b:str")
    b = dag.df([[0]],"a:int")
    c = dag.select("SELECT a FROM",a,"UNION SELECT * FROM",b)
    dag.run()
Please read this for more examples
- set_op(how, *dfs, distinct=True)[source]
Union, subtract or intersect dataframes.
- Parameters:
how (str) – can accept union, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after the set operation, default to True
- Returns:
result dataframe of the set operation
- Return type:
Note
Currently, all dataframes in
dfs
must have identical schemas, otherwise an exception will be thrown.
- show(*dfs, n=10, with_count=False, title=None)[source]
Show the dataframes. See examples.
- Parameters:
dfs (Any) – DataFrames like object
n (int) – max number of rows, defaults to 10
with_count (bool) – whether to show total count, defaults to False
title (str | None) – title to display on top of the dataframe, defaults to None
best_width – max width for the output table, defaults to 100
- Return type:
None
Note
When you call this method, it means you want the dataframe to be printed when the workflow executes. So the dataframe won’t show until you run the workflow.
When with_count is True, it can trigger an expensive calculation for a distributed dataframe. So if you call this function directly, you may need to persist() the dataframe. Or you can turn on Auto Persist
- subtract(*dfs, distinct=True)[source]
Subtract dfs[1:] from dfs[0].
- Parameters:
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after subtraction, default to True
- Returns:
subtracted dataframe
- Return type:
Note
Currently, all dataframes in
dfs
must have identical schemas, otherwise an exception will be thrown.
- transform(*dfs, using, schema=None, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]
Transform dataframes using transformer.
Please read the Transformer Tutorial
- Parameters:
dfs (Any) – DataFrames like object
using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered transformer/cotransformer
schema (Any | None) – Schema like object, defaults to None. The transformer will be able to access this value from
output_schema()
params (Any | None) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from
params()
pre_partition (Any | None) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call transform() without this parameter
ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list
callback (Any | None) – RPChandler like object, defaults to None
- Returns:
the transformed dataframe
- Return type:
Note
transform() can be lazy and will return the transformed dataframe, while out_transform() is guaranteed to execute immediately (eager) and return nothing
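A minimal sketch of a transformer inside a workflow (the function, data and schema are illustrative assumptions), using the recommended partition_by() chaining instead of pre_partition:

import pandas as pd
from fugue import FugueWorkflow

# a transformer applied per partition; output schema is declared at the call site
def top_row(df: pd.DataFrame) -> pd.DataFrame:
    return df.sort_values("b", ascending=False).head(1)

dag = FugueWorkflow()
df = dag.df([[0, 1], [0, 2], [1, 3]], "a:int,b:int")
df.partition_by("a").transform(top_row, schema="*").show()
dag.run()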
- union(*dfs, distinct=True)[source]
Union dataframes in dfs.
- Parameters:
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after union, default to True
- Returns:
unioned dataframe
- Return type:
Note
Currently, all dataframes in
dfs
must have identical schemas, otherwise an exception will be thrown.
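For illustration (the data values are assumptions), the set operations behave like their SQL counterparts, with distinct applied by default:

from fugue import FugueWorkflow

dag = FugueWorkflow()
a = dag.df([[0], [1], [1]], "a:int")
b = dag.df([[1], [2]], "a:int")
dag.union(a, b).show()                  # UNION (distinct)
dag.union(a, b, distinct=False).show()  # UNION ALL
dag.intersect(a, b).show()              # INTERSECT
dag.subtract(a, b).show()               # a EXCEPT b
dag.run()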
- zip(*dfs, how='inner', partition=None, temp_path=None, to_file_threshold=-1)[source]
Zip multiple dataframes together with given partition specifications.
- Parameters:
dfs (Any) – DataFrames like object
how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner
partition (Any | None) – Partition like object, defaults to None.
temp_path (str | None) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None
to_file_threshold (Any) – file byte size threshold, defaults to -1
- Returns:
a zipped dataframe
- Return type:
Note
If dfs is dict like, the zipped dataframe will be dict like; if dfs is list like, the zipped dataframe will be list like
It’s fine to contain only one dataframe in dfs
See also
Read CoTransformer and Zip & Comap for details
- class fugue.workflow.workflow.FugueWorkflowResult(yields)[source]
Bases:
DataFrames
The result object of run(). Users should not construct this object.
- Parameters:
DataFrames – yields of the workflow
yields (Dict[str, Yielded])
- property yields: Dict[str, Any]
- class fugue.workflow.workflow.WorkflowDataFrame(workflow, task)[source]
Bases:
DataFrame
It represents the edges in the graph constructed by FugueWorkflow. In Fugue, we use a DAG to represent a workflow, and the edges are strictly dataframes. DAG construction and execution are separate steps; this class is used in the construction step. Although it inherits from DataFrame, it’s not concrete data, so a lot of the operations are not allowed. If you want to obtain the concrete Fugue DataFrame, use compute() to execute the workflow.
Normally, you don’t construct it yourself; you just use its methods.
- Parameters:
workflow (FugueWorkflow) – the parent workflow it belongs to
task (FugueTask) – the task that generates this dataframe
- aggregate(*agg_cols, **kwagg_cols)[source]
Aggregate on dataframe
- Parameters:
df – the dataframe to aggregate on
agg_cols (ColumnExpr) – aggregation expressions
kwagg_cols (ColumnExpr) – aggregation expressions to be renamed to the argument names
self (TDF)
- Returns:
the aggregated result as a dataframe
- Return type:
TDF
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
df.aggregate(f.max(col("b")))

# SELECT a, MAX(b) AS x FROM df GROUP BY a
df.partition_by("a").aggregate(f.max(col("b")).alias("x"))
df.partition_by("a").aggregate(x=f.max(col("b")))
- alter_columns(columns)[source]
Change column types
- Parameters:
columns (Any) – Schema like object
self (TDF)
- Returns:
a new dataframe with the new column types
- Return type:
Note
The output dataframe will not change the order of original schema.
Examples
>>> df.alter_columns("a:int,b:str")
- anti_join(*dfs, on=None)[source]
LEFT ANTI Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
on (Iterable[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- as_array(columns=None, type_safe=False)[source]
- Raises:
NotImplementedError – don’t call this method
- Parameters:
columns (List[str] | None)
type_safe (bool)
- Return type:
List[Any]
- as_array_iterable(columns=None, type_safe=False)[source]
- Raises:
NotImplementedError – don’t call this method
- Parameters:
columns (List[str] | None)
type_safe (bool)
- Return type:
Iterable[Any]
- assert_eq(*dfs, **params)[source]
Wrapper of
fugue.workflow.workflow.FugueWorkflow.assert_eq()
to compare this dataframe with other dataframes.
- Parameters:
dfs (Any) – DataFrames like object
digits – precision of float number comparison, defaults to 8
check_order – whether to compare row order, defaults to False
check_schema – whether to compare schemas, defaults to True
check_content – whether to compare row values, defaults to True
no_pandas – if true, it will compare the string representations of the dataframes, otherwise it will convert both to pandas dataframes to compare, defaults to False
params (Any)
- Raises:
AssertionError – if not equal
- Return type:
None
- assert_not_eq(*dfs, **params)[source]
Wrapper of
fugue.workflow.workflow.FugueWorkflow.assert_not_eq()
to compare this dataframe with other dataframes.
- Parameters:
dfs (Any) – DataFrames like object
digits – precision of float number comparison, defaults to 8
check_order – whether to compare row order, defaults to False
check_schema – whether to compare schemas, defaults to True
check_content – whether to compare row values, defaults to True
no_pandas – if true, it will compare the string representations of the dataframes, otherwise it will convert both to pandas dataframes to compare, defaults to False
params (Any)
- Raises:
AssertionError – if any dataframe is equal to the first dataframe
- Return type:
None
- assign(*args, **kwargs)[source]
Update existing columns with new values and add new columns
- Parameters:
df – the dataframe to set columns
args (ColumnExpr) – column expressions
kwargs (Any) – column expressions to be renamed to the argument names, if a value is not ColumnExpr, it will be treated as a literal
self (TDF)
- Returns:
a new dataframe with the updated values
- Return type:
TDF
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

# add/set 1 as column x
df.assign(lit(1,"x"))
df.assign(x=1)

# add/set x to be a+b
df.assign((col("a")+col("b")).alias("x"))
df.assign(x=col("a")+col("b"))

# cast column a data type to double
df.assign(col("a").cast(float))

# cast + new columns
df.assign(col("a").cast(float),x=1,y=col("a")+col("b"))
- broadcast()[source]
Broadcast the current dataframe
- Returns:
the broadcasted dataframe
- Return type:
- Parameters:
self (TDF)
- compute(*args, **kwargs)[source]
Trigger the parent workflow to run() and to generate and cache the result dataframe this instance represents.
Examples
>>> df = FugueWorkflow().df([[0]],"a:int").transform(a_transformer)
>>> df.compute().as_pandas()  # pandas dataframe
>>> df.compute(SparkExecutionEngine).native  # spark dataframe
Note
Consider using
fugue.workflow.workflow.FugueWorkflow.run()
instead, because this method actually triggers the entire workflow to run; it may be confusing to use this method since extra time may be taken to compute unrelated dataframes.

dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")
dag.run(SparkExecutionEngine)
df1.result.show()
df2.result.show()
- Return type:
- cross_join(*dfs)[source]
CROSS Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
self (TDF)
- Returns:
joined dataframe
- Return type:
- deterministic_checkpoint(storage_type='file', lazy=False, partition=None, single=False, namespace=None, **kwargs)[source]
Cache the dataframe as a temporary file
- Parameters:
storage_type (str) – can be either file or table, defaults to file
lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)
partition (Any | None) – Partition like object, defaults to None.
single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters for the underlying execution engine function
namespace (Any | None) – a value to control determinism, defaults to None.
self (TDF)
- Returns:
the cached dataframe
- Return type:
TDF
Note
The difference vs
strong_checkpoint()
is that this checkpoint is not removed after execution, so it can take effect across executions as long as the dependent compute logic is not changed.
- distinct()[source]
Get distinct dataframe. Equivalent to
SELECT DISTINCT * FROM df
- Returns:
dataframe with unique records
- Parameters:
self (TDF)
- Return type:
TDF
- drop(columns, if_exists=False)[source]
Drop columns from the dataframe.
- Parameters:
columns (List[str]) – columns to drop
if_exists (bool) – if setting to True, it will ignore non-existent columns, defaults to False
self (TDF)
- Returns:
the dataframe after dropping columns
- Return type:
- dropna(how='any', thresh=None, subset=None)[source]
Drop records containing NA values
- Parameters:
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (int | None) – int, drops rows that have less than thresh non-null values
subset (List[str] | None) – list of columns to operate on
self (TDF)
- Returns:
dataframe with incomplete records dropped
- Return type:
TDF
- property empty: bool
- Raises:
NotImplementedError – don’t call this method
- fillna(value, subset=None)[source]
Fills NA values with replacement values
- Parameters:
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (List[str] | None) – list of columns to operate on. ignored if value is a dictionary
self (TDF)
- Returns:
dataframe with NA records filled
- Return type:
TDF
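For illustration (the data values are assumptions), dropna and fillna mirror their pandas counterparts:

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, None], [1, "x"]], "a:int,b:str")
df.dropna(how="any").show()         # keep only complete rows
df.fillna({"b": "missing"}).show()  # fill NAs per column
dag.run()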
- filter(condition)[source]
Filter rows by the given condition
- Parameters:
df – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
self (TDF)
- Returns:
a new filtered dataframe
- Return type:
TDF
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

df.filter((col("a")>1) & (col("b")=="x"))
df.filter(f.coalesce(col("a"),col("b"))>1)
- full_outer_join(*dfs, on=None)[source]
FULL OUTER Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
on (Iterable[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- head(n, columns=None)[source]
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters:
n (int) – number of rows
columns (List[str] | None) – selected columns, defaults to None (all columns)
- Returns:
a local bounded dataframe
- Return type:
- inner_join(*dfs, on=None)[source]
INNER Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
on (Iterable[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- intersect(*dfs, distinct=True)[source]
Intersect this dataframe with
dfs
.
- Parameters:
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after intersection, default to True
self (TDF)
- Returns:
intersected dataframe
- Return type:
TDF
Note
Currently, all dataframes in
dfs
must have identical schemas, otherwise an exception will be thrown.
- property is_bounded: bool
- Raises:
NotImplementedError – don’t call this method
- property is_local: bool
- Raises:
NotImplementedError – don’t call this method
- join(*dfs, how, on=None)[source]
Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Iterable[str] | None) – it can always be inferred, but if you provide it, it will be validated against the inferred keys. Defaults to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- left_anti_join(*dfs, on=None)[source]
LEFT ANTI Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
on (Iterable[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- left_outer_join(*dfs, on=None)[source]
LEFT OUTER Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
on (Iterable[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- left_semi_join(*dfs, on=None)[source]
LEFT SEMI Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
on (Iterable[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- property name: str
Name of its task spec
- property native: Any
The native object this Dataset class wraps
- native_as_df()[source]
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array, which doesn’t contain schema information, so its
native_as_df
should be either a pandas dataframe or an arrow dataframe.- Return type:
Any
- property num_partitions: int
- Raises:
NotImplementedError – don’t call this method
- out_transform(using, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]
Transform this dataframe using transformer. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.out_transform()
Please read the Transformer Tutorial
- Parameters:
using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered output transformer/cotransformer
params (Any | None) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from
params()
pre_partition (Any | None) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call out_transform() without this parameter
ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list
callback (Any | None) – RPChandler like object, defaults to None
self (TDF)
- Return type:
None
Note
transform() can be lazy and will return the transformed dataframe, while out_transform() is guaranteed to execute immediately (eager) and return nothing
- output(using, params=None, pre_partition=None)[source]
Run an outputter on this dataframe. It’s a simple wrapper of
fugue.workflow.workflow.FugueWorkflow.output()
Please read the Outputter Tutorial
- Parameters:
using (Any) – outputter-like object, if it is a string, then it must be the alias of a registered outputter
params (Any | None) – Parameters like object to run the outputter, defaults to None. The outputter will be able to access this value from
params()
pre_partition (Any | None) – Partition like object, defaults to None. The outputter will be able to access this value from
partition_spec()
- Return type:
None
- partition(*args, **kwargs)[source]
Partition the current dataframe. Please read the Partition Tutorial
- Parameters:
args (Any) – Partition like object
kwargs (Any) – Partition like object
self (TDF)
- Returns:
dataframe with the partition hint
- Return type:
Note
Normally this step is fast because it only adds a partition hint for the next step.
- partition_by(*keys, **kwargs)[source]
Partition the current dataframe by keys. Please read the Partition Tutorial. This is a wrapper of
partition()
- Parameters:
keys (str) – partition keys
kwargs (Any) – Partition like object excluding by and partition_by
self (TDF)
- Returns:
dataframe with the partition hint
- Return type:
- property partition_spec: PartitionSpec
The partition spec set on the dataframe for next steps to use
Examples
dag = FugueWorkflow()
df = dag.df([[0],[1]], "a:int")
assert df.partition_spec.empty

df2 = df.partition(by=["a"])
assert df.partition_spec.empty
assert df2.partition_spec == PartitionSpec(by=["a"])
- per_partition_by(*keys)[source]
Partition the current dataframe by keys so each physical partition contains only one logical partition. Please read the Partition Tutorial. This is a wrapper of
partition()
- Parameters:
keys (str) – partition keys
self (TDF)
- Returns:
dataframe that is both logically and physically partitioned by
keys
- Return type:
Note
This is a hint but not enforced, certain execution engines will not respect this hint.
- per_row()[source]
Partition the current dataframe to one row per partition. Please read the Partition Tutorial. This is a wrapper of
partition()
- Returns:
dataframe that is evenly partitioned by row count
- Return type:
- Parameters:
self (TDF)
Note
This is a hint but not enforced, certain execution engines will not respect this hint.
- persist()[source]
Persist the current dataframe
- Returns:
the persisted dataframe
- Return type:
- Parameters:
self (TDF)
Note
persist can only guarantee the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
The persist method is considered a weak checkpoint. Sometimes it may be necessary to use a strong checkpoint, which is checkpoint()
- process(using, schema=None, params=None, pre_partition=None)[source]
Run a processor on this dataframe. It’s a simple wrapper of
fugue.workflow.workflow.FugueWorkflow.process()
Please read the Processor Tutorial
- Parameters:
using (Any) – processor-like object, if it is a string, then it must be the alias of a registered processor
schema (Any | None) – Schema like object, defaults to None. The processor will be able to access this value from
output_schema()
params (Any | None) – Parameters like object to run the processor, defaults to None. The processor will be able to access this value from
params()
pre_partition (Any | None) – Partition like object, defaults to None. The processor will be able to access this value from
partition_spec()
self (TDF)
- Returns:
result dataframe
- Return type:
- rename(*args, **kwargs)[source]
Rename the dataframe using a mapping dict
- Parameters:
args (Any) – list of dicts containing rename maps
kwargs (Any) – rename map
self (TDF)
- Returns:
a new dataframe with the new names
- Return type:
Note
This interface is more flexible than
fugue.dataframe.dataframe.DataFrame.rename()
Examples
>>> df.rename({"a": "b"}, c="d", e="f")
- property result: DataFrame
The concrete DataFrame obtained from
compute()
. This property will not trigger compute again, but compute should have been called earlier and the result is cached.
- right_outer_join(*dfs, on=None)[source]
RIGHT OUTER Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
on (Iterable[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- sample(n=None, frac=None, replace=False, seed=None)[source]
Sample dataframe by number of rows or by fraction
- Parameters:
n (int | None) – number of rows to sample, one and only one of n and frac must be set
frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (int | None) – seed for randomness, defaults to None
self (TDF)
- Returns:
sampled dataframe
- Return type:
TDF
- save(path, fmt='', mode='overwrite', partition=None, single=False, **kwargs)[source]
Save this dataframe to a persistent storage
- Parameters:
path (str) – output path
fmt (str) – format hint, can accept parquet, csv, json, defaults to "", meaning to infer
mode (str) – can accept overwrite, append, error, defaults to “overwrite”
partition (Any | None) – Partition like object, how to partition the dataframe before saving, defaults to empty
single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type:
None
For more details and examples, read Save & Load.
- save_and_use(path, fmt='', mode='overwrite', partition=None, single=False, **kwargs)[source]
Save this dataframe to a persistent storage and load back to use in the following steps
- Parameters:
path (str) – output path
fmt (str) – format hint, can accept parquet, csv, json, defaults to "", meaning to infer
mode (str) – can accept overwrite, append, error, defaults to “overwrite”
partition (Any | None) – Partition like object, how to partition the dataframe before saving, defaults to empty
single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
self (TDF)
- Return type:
TDF
For more details and examples, read Save & Load.
- select(*columns, where=None, having=None, distinct=False)[source]
The functional interface for SQL select statement
- Parameters:
columns (str | ColumnExpr) – column expressions, for strings they will represent the column names
where (ColumnExpr | None) – WHERE condition expression, defaults to None
having (ColumnExpr | None) – HAVING condition expression, defaults to None. It is used when columns contains aggregation columns
distinct (bool) – whether to return distinct result, defaults to False
self (TDF)
- Returns:
the select result as a new dataframe
- Return type:
TDF
New Since
0.6.0
Attention
This interface is experimental; it’s subject to change in new versions.
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

# select existing and new columns
df.select("a","b",lit(1,"another"))
df.select("a",(col("b")+lit(1)).alias("x"))

# select distinct
df.select("a","b",lit(1,"another"),distinct=True)

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
df.select(f.count_distinct(all_cols()).alias("x"))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
df.select("a",f.max(col("b")+lit(1)).alias("x"))

# SELECT a, MAX(b+1) AS x FROM df
#   WHERE b<2 AND a>1
#   GROUP BY a
#   HAVING MAX(b+1)>0
df.select(
    "a",f.max(col("b")+lit(1)).alias("x"),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)
- semi_join(*dfs, on=None)[source]
LEFT SEMI Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details
- Parameters:
dfs (Any) – DataFrames like object
on (Iterable[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF)
- Returns:
joined dataframe
- Return type:
- show(n=10, with_count=False, title=None, best_width=100)[source]
Show the dataframe. See examples.
- Parameters:
n (int) – max number of rows, defaults to 10
with_count (bool) – whether to show total count, defaults to False
title (str | None) – title to display on top of the dataframe, defaults to None
best_width (int) – max width for the output table, defaults to 100
- Return type:
None
Note
When you call this method, it means you want the dataframe to be printed when the workflow executes. So the dataframe won’t show until you run the workflow.
When with_count is True, it can trigger an expensive calculation for a distributed dataframe. So if you call this function directly, you may need to persist() the dataframe. Or you can turn on Auto Persist
- strong_checkpoint(storage_type='file', lazy=False, partition=None, single=False, **kwargs)[source]
Cache the dataframe as a temporary file
- Parameters:
storage_type (str) – can be either file or table, defaults to file
lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)
partition (Any | None) – Partition like object, defaults to None.
single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters for the underlying execution engine function
self (TDF)
- Returns:
the cached dataframe
- Return type:
TDF
Note
Strong checkpoint guarantees the output dataframe compute dependency is from the temporary file. Use strong checkpoint only when
weak_checkpoint()
can’t be used. The strong checkpoint file will be removed after the execution of the workflow.
- subtract(*dfs, distinct=True)[source]
Subtract
dfs
from this dataframe.
- Parameters:
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after subtraction, default to True
self (TDF)
- Returns:
subtracted dataframe
- Return type:
TDF
Note
Currently, all dataframes in
dfs
must have identical schemas, otherwise an exception will be thrown.
- take(n, presort=None, na_position='last')[source]
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort
- Parameters:
n (int) – number of rows to return
presort (str | None) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
self (TDF)
- Returns:
n rows of DataFrame per partition
- Return type:
TDF
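For illustration (the data and presort expression are assumptions), take combined with partition_by returns the top rows per group:

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, 1], [0, 2], [1, 3]], "a:int,b:int")
# top row per value of a, ordered by b descending
df.partition_by("a").take(1, presort="b desc").show()
dag.run()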
- transform(using, schema=None, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]
Transform this dataframe using transformer. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.transform()
Please read the Transformer Tutorial
- Parameters:
using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered transformer/cotransformer
schema (Any | None) – Schema like object, defaults to None. The transformer will be able to access this value from
output_schema()
params (Any | None) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from
params()
pre_partition (Any | None) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call transform() without this parameter
ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list
callback (Any | None) – RPChandler like object, defaults to None
self (TDF)
- Returns:
the transformed dataframe
- Return type:
Note
transform() can be lazy and will return the transformed dataframe, while out_transform() is guaranteed to execute immediately (eager) and return nothing
- union(*dfs, distinct=True)[source]
Union this dataframe with
dfs
.
- Parameters:
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after union, default to True
self (TDF)
- Returns:
unioned dataframe
- Return type:
TDF
Note
Currently, all dataframes in
dfs
must have identical schemas, otherwise an exception will be thrown.
- weak_checkpoint(lazy=False, **kwargs)[source]
Cache the dataframe in memory
- Parameters:
lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)
kwargs (Any) – parameters for the underlying execution engine function
self (TDF)
- Returns:
the cached dataframe
- Return type:
TDF
Note
Weak checkpoint in most cases is the best choice for caching a dataframe to avoid duplicated computation. However, it does not guarantee to break up the compute dependency for this dataframe, so when you have a very complicated compute, you may encounter issues such as stack overflow. Also, weak checkpoint normally caches the dataframe in memory; if memory is a concern, then you should consider
strong_checkpoint()
- property workflow: FugueWorkflow
The parent workflow
- yield_dataframe_as(name, as_local=False)[source]
Yield a dataframe that can be accessed without the current execution engine
- Parameters:
name (str) – the name of the yielded dataframe
as_local (bool) – yield the local version of the dataframe
self (TDF)
- Return type:
None
Note
When
as_local
is True, it can trigger an additional compute to do the conversion. To avoid recompute, you should add persist
before yielding.
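For illustration (the name "x" is an arbitrary choice), a yielded dataframe can be retrieved from the run() result after execution:

from fugue import FugueWorkflow

dag = FugueWorkflow()
dag.df([[0], [1]], "a:int").yield_dataframe_as("x", as_local=True)
result = dag.run()   # pass an engine here to run distributed
local_df = result["x"]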
- yield_file_as(name)[source]
Cache the dataframe in file
- Parameters:
name (str) – the name of the yielded dataframe
self (TDF)
- Return type:
None
Note
You can yield a file/table only in the following cases:
you have not checkpointed (persisted) the dataframe, for example
df.yield_file_as("a")
you have used
deterministic_checkpoint()
, for exampledf.deterministic_checkpoint().yield_file_as("a")
yield is workflow, compile level logic
For the first case, the yield will also be a strong checkpoint so whenever you yield a dataframe as a file, the dataframe has been saved as a file and loaded back as a new dataframe.
- yield_table_as(name)[source]
Cache the dataframe as a table
- Parameters:
name (str) – the name of the yielded dataframe
self (TDF)
- Return type:
None
Note
You can yield a file/table only in the following cases:
you have not checkpointed (persisted) the dataframe, for example
df.yield_file_as("a")
you have used
deterministic_checkpoint()
, for exampledf.deterministic_checkpoint().yield_file_as("a")
yield is workflow, compile level logic
For the first case, the yield will also be a strong checkpoint so whenever you yield a dataframe as a file, the dataframe has been saved as a file and loaded back as a new dataframe.
- zip(*dfs, how='inner', partition=None, temp_path=None, to_file_threshold=-1)[source]
Zip this data frame with multiple dataframes together with given partition specifications. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.zip()
.- Parameters:
dfs (Any) – DataFrames like object
how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner
partition (Any | None) – Partition like object, defaults to None.
temp_path (str | None) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None
to_file_threshold (Any) – file byte size threshold, defaults to -1
self (TDF)
- Returns:
a zipped dataframe
- Return type:
Note
dfs
must be list like, and the zipped dataframe will be list like
dfs is fine to be empty
If you want dict-like zip, use
fugue.workflow.workflow.FugueWorkflow.zip()
See also
Read CoTransformer and Zip & Comap for details
- class fugue.workflow.workflow.WorkflowDataFrames(*args, **kwargs)[source]
Bases:
DataFrames
Ordered dictionary of WorkflowDataFrames. There are two modes: with keys and without keys. If without keys, _<n> will be used as the key for each dataframe, and it will be treated as an array in the Fugue framework.
It’s immutable; once initialized, you can’t add or remove elements from it.
It’s a subclass of DataFrames, but different from DataFrames, in the initialization you should always use WorkflowDataFrame, and they should all come from the same FugueWorkflow.
Examples
dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")

dfs1 = WorkflowDataFrames(df1, df2)  # as array
dfs2 = WorkflowDataFrames([df1, df2])  # as array
dfs3 = WorkflowDataFrames(a=df1, b=df2)  # as dict
dfs4 = WorkflowDataFrames(dict(a=df1, b=df2))  # as dict
dfs5 = WorkflowDataFrames(dfs4, c=df2)  # copy and update

dfs5["b"].show()  # how you get an element when it's a dict
dfs1[0].show()    # how you get an element when it's an array
- Parameters:
args (Any)
kwargs (Any)
- property workflow: FugueWorkflow
The parent workflow