fugue.execution

fugue.execution.api

fugue.execution.api.aggregate(df, partition_by=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **agg_kwcols)[source]

Aggregate on dataframe

Parameters:
  • df (AnyDataFrame) – the dataframe to aggregate on

  • partition_by (None | str | List[str]) – partition key(s), defaults to None

  • agg_kwcols (ColumnExpr) – aggregation expressions

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the aggregated result as a dataframe

Return type:

AnyDataFrame

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # SELECT MAX(b) AS b FROM df
    fa.aggregate(df, b=f.max(col("b")))

    # SELECT a, MAX(b) AS x FROM df GROUP BY a
    fa.aggregate(df, "a", x=f.max(col("b")))
fugue.execution.api.anti_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Left anti-join two dataframes. This is a wrapper of join() with how="anti"

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to join

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the joined dataframe

Return type:

AnyDataFrame
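
For illustration, a minimal sketch (the sample data and the shared key column "a" are made up; any Fugue-recognized dataframes such as pandas DataFrames work):

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df2 = pd.DataFrame({"a": [2, 3]})

with fa.engine_context("duckdb"):
    # keep only the rows of df1 whose key "a" has no match in df2
    fa.anti_join(df1, df2)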

fugue.execution.api.assign(df, engine=None, engine_conf=None, as_fugue=False, as_local=False, **columns)[source]

Update existing columns with new values and add new columns

Parameters:
  • df (AnyDataFrame) – the dataframe to set columns

  • columns (Any) – column expressions

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the updated dataframe

Return type:

AnyDataFrame

Tip

This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue.column import col, functions as f
import fugue.api as fa

# assume df has schema: a:int,b:str

with fa.engine_context("duckdb"):
    # add constant column x
    fa.assign(df, x=1)

    # change column b to be a constant integer
    fa.assign(df, b=1)

    # add new x to be a+b
    fa.assign(df, x=col("a")+col("b"))

    # cast column a data type to double
    fa.assign(df, a=col("a").cast(float))
fugue.execution.api.broadcast(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Broadcast the dataframe to all workers of a distributed computing backend

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • engine (AnyExecutionEngine | None) – an engine-like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the broadcasted dataframe

Return type:

AnyDataFrame

fugue.execution.api.clear_global_engine()[source]

Remove the global execution engine (if set)

Return type:

None

fugue.execution.api.cross_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Cross join two dataframes. This is a wrapper of join() with how="cross"

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to join

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the joined dataframe

Return type:

AnyDataFrame

fugue.execution.api.distinct(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Equivalent to SELECT DISTINCT * FROM df

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the result with distinct rows

Return type:

AnyDataFrame

fugue.execution.api.dropna(df, how='any', thresh=None, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Drop NA records from the dataframe

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (int | None) – int, drops rows that have less than thresh non-null values

  • subset (List[str] | None) – list of columns to operate on

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

DataFrame with NA records dropped

Return type:

AnyDataFrame
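
For illustration, a minimal sketch (sample data made up) showing the how and thresh options:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1.0, None, None], "b": ["x", "y", None]})

# drop rows containing any null
fa.dropna(df)

# drop rows only when all values are null
fa.dropna(df, how="all")

# keep rows with at least 2 non-null values
fa.dropna(df, thresh=2)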

fugue.execution.api.engine_context(engine=None, engine_conf=None, infer_by=None)[source]

Make an execution engine and set it as the context engine. This function is thread safe and async safe.

Parameters:
  • engine (AnyExecutionEngine) – an engine like object, defaults to None

  • engine_conf (Any) – the configs for the engine, defaults to None

  • infer_by (List[Any] | None) – a list of objects to infer the engine, defaults to None

Return type:

Iterator[ExecutionEngine]

Note

For more details, please read make_execution_engine()

Examples

import fugue.api as fa

with fa.engine_context(spark_session):
    transform(df, func)  # will use spark in this transformation
fugue.execution.api.fillna(df, value, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Fill NULL, NAN, NAT values in a dataframe

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (List[str] | None) – list of columns to operate on. ignored if value is a dictionary

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

DataFrame with NA records filled

Return type:

AnyDataFrame
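
For illustration, a minimal sketch (sample data made up):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1.0, None], "b": ["x", None]})

# fill column a with 0, leaving b untouched
fa.fillna(df, value=0, subset=["a"])

# fill per column with a dict (subset is ignored in this case)
fa.fillna(df, value={"a": 0.0, "b": "missing"})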

fugue.execution.api.filter(df, condition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Filter rows by the given condition

Parameters:
  • df (AnyDataFrame) – the dataframe to be filtered

  • condition (ColumnExpr) – (boolean) column expression

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the filtered dataframe

Return type:

AnyDataFrame

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    fa.filter(df, (col("a")>1) & (col("b")=="x"))
    fa.filter(df, f.coalesce(col("a"),col("b"))>1)
fugue.execution.api.full_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Full outer join two dataframes. This is a wrapper of join() with how="full_outer"

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to join

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the joined dataframe

Return type:

AnyDataFrame

fugue.execution.api.get_context_engine()[source]

Get the execution engine in the current context. Regarding the order of the logic please read make_execution_engine()

Return type:

ExecutionEngine

fugue.execution.api.get_current_conf()[source]

Get the current configs either in the defined engine context or by the global configs (see register_global_conf())

Return type:

ParamDict

fugue.execution.api.get_current_parallelism()[source]

Get the current parallelism of the current global/context engine. If there is no global/context engine, it creates a temporary engine using make_execution_engine() to get its parallelism

Returns:

the size of the parallelism

Return type:

int
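
For illustration, a minimal sketch (the spark_session is assumed to exist, mirroring the earlier examples):

import fugue.api as fa

# with no global/context engine, a temporary default (native) engine is used
fa.get_current_parallelism()  # typically 1

with fa.engine_context(spark_session):
    fa.get_current_parallelism()  # reflects the Spark cluster's parallelism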

fugue.execution.api.inner_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Inner join two dataframes. This is a wrapper of join() with how="inner"

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to join

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the joined dataframe

Return type:

AnyDataFrame

fugue.execution.api.intersect(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Intersect df1 and df2

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to intersect with

  • distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the intersected dataframe

Return type:

AnyDataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
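
For illustration, a minimal sketch (sample data made up; both dataframes share the same schema as required):

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2, 2]})
df2 = pd.DataFrame({"a": [2, 3]})

with fa.engine_context("duckdb"):
    # rows appearing in both df1 and df2 (INTERSECT DISTINCT)
    fa.intersect(df1, df2)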

fugue.execution.api.join(df1, df2, *dfs, how, on=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Join two dataframes

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to join

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (List[str] | None) – join keys; they can always be inferred from the common columns, but if provided, they will be validated against the inferred keys.

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the joined dataframe

Return type:

AnyDataFrame

Note

Please read get_join_schemas()
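
For illustration, a minimal sketch (sample data made up; the join key "a" is the only common column):

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
df2 = pd.DataFrame({"a": [1, 3], "c": [10, 30]})

# the key "a" is inferred from the common columns
fa.join(df1, df2, how="inner")

# providing on explicitly only validates it against the inferred keys
fa.join(df1, df2, how="left_outer", on=["a"])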

fugue.execution.api.left_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Left outer join two dataframes. This is a wrapper of join() with how="left_outer"

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to join

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the joined dataframe

Return type:

AnyDataFrame

fugue.execution.api.load(path, format_hint=None, columns=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]

Load dataframe from persistent storage

Parameters:
  • path (str | List[str]) – the path to the dataframe

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • columns (Any | None) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

an engine compatible dataframe

Return type:

AnyDataFrame

For more details and examples, read Load & Save.
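
For illustration, a minimal sketch (the paths are placeholders and the files are assumed to exist):

import fugue.api as fa

# the format is inferred from the extension
df = fa.load("/tmp/data.parquet")

# load only two columns from a CSV with a header row
df = fa.load("/tmp/data.csv", header=True, columns=["a", "b"])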

fugue.execution.api.persist(df, lazy=False, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]

Force materializing and caching the dataframe

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • lazy (bool) – True: the first usage of the output will trigger the persisting to happen; False (eager): persisting happens immediately. Defaults to False

  • kwargs (Any) – parameter to pass to the underlying persist implementation

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the persisted dataframe

Return type:

AnyDataFrame
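
For illustration, a minimal sketch (the spark_session is assumed to exist and the path is a placeholder):

import fugue.api as fa

with fa.engine_context(spark_session):
    df = fa.load("/tmp/data.parquet")
    # materialize and cache so downstream steps don't recompute the load
    df = fa.persist(df)
    # or defer the materialization until the first usage
    # df = fa.persist(df, lazy=True)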

fugue.execution.api.repartition(df, partition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Partition the input dataframe using partition.

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • partition (PartitionSpec) – how you want to partition the dataframe

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the repartitioned dataframe

Return type:

AnyDataFrame

Caution

This function is experimental, and may be removed in the future.
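
For illustration, a minimal sketch (assume df is a dataframe recognized by Fugue and spark_session exists):

from fugue import PartitionSpec
import fugue.api as fa

with fa.engine_context(spark_session):
    # partition by column "a" into roughly 8 partitions
    fa.repartition(df, PartitionSpec(by=["a"], num=8))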

fugue.execution.api.right_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Right outer join two dataframes. This is a wrapper of join() with how="right_outer"

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to join

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the joined dataframe

Return type:

AnyDataFrame

fugue.execution.api.run_engine_function(func, engine=None, engine_conf=None, as_fugue=False, as_local=False, infer_by=None)[source]

Run a lambda function based on the engine provided

Parameters:
  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

  • infer_by (List[Any] | None) – a list of objects to infer the engine, defaults to None

  • func (Callable[[ExecutionEngine], Any])

Returns:

None or a Fugue DataFrame if as_fugue is True, otherwise if infer_by contains any Fugue DataFrame, then return the Fugue DataFrame, otherwise it returns the underlying dataframe using native_as_df()

Return type:

Any

Note

This function is for development use. Users should not need it.

fugue.execution.api.sample(df, n=None, frac=None, replace=False, seed=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Sample dataframe by number of rows or by fraction

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • n (int | None) – number of rows to sample, one and only one of n and frac must be set

  • frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (int | None) – seed for randomness, defaults to None

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the sampled dataframe

Return type:

AnyDataFrame
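
For illustration, a minimal sketch (sample data made up):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": range(100)})

# sample 10 rows without replacement, with a fixed seed for reproducibility
fa.sample(df, n=10, seed=0)

# sample 20% of the rows
fa.sample(df, frac=0.2, seed=0)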

fugue.execution.api.save(df, path, format_hint=None, mode='overwrite', partition=None, force_single=False, engine=None, engine_conf=None, **kwargs)[source]

Save dataframe to a persistent storage

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • path (str) – output path

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition (Any | None) – how to partition the dataframe before saving, defaults to None

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

Return type:

None

For more details and examples, read Load & Save.
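
For illustration, a minimal sketch (sample data made up; the paths are placeholders):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# the format is inferred from the extension; existing output is overwritten
fa.save(df, "/tmp/data.parquet")

# write a single CSV file with a header row
fa.save(df, "/tmp/data.csv", header=True, force_single=True)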

fugue.execution.api.select(df, *columns, where=None, having=None, distinct=False, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

The functional interface for SQL select statement

Parameters:
  • df (AnyDataFrame) – the dataframe to be operated on

  • columns (str | ColumnExpr) – column expressions, for strings they will represent the column names

  • where (ColumnExpr | None) – WHERE condition expression, defaults to None

  • having (ColumnExpr | None) – having condition expression, defaults to None. It is used when cols contains aggregation columns, defaults to None

  • distinct (bool) – whether to return distinct result, defaults to False

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the select result as a dataframe

Return type:

AnyDataFrame

Attention

This interface is experimental; it is subject to change in new versions.

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue.column import all_cols, col, lit, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # select existed and new columns
    fa.select(df, col("a"),col("b"),lit(1,"another"))
    fa.select(df, col("a"),(col("b")+lit(1)).alias("x"))

    # aggregation
    # SELECT COUNT(DISTINCT *) AS x FROM df
    fa.select(
        df,
        f.count_distinct(all_cols()).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df GROUP BY a
    fa.select(
        df,
        col("a"),f.max(col("b")+lit(1)).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df
    #   WHERE b<2 AND a>1
    #   GROUP BY a
    #   HAVING MAX(b+1)>0
    fa.select(
        df,
        col("a"),f.max(col("b")+lit(1)).alias("x"),
        where=(col("b")<2) & (col("a")>1),
        having=f.max(col("b")+lit(1))>0
    )
fugue.execution.api.semi_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Left semi-join two dataframes. This is a wrapper of join() with how="semi"

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to join

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the joined dataframe

Return type:

AnyDataFrame

fugue.execution.api.set_global_engine(engine, engine_conf=None)[source]

Make an execution engine and set it as the global execution engine

Parameters:
  • engine (AnyExecutionEngine) – an engine like object, must not be None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

Return type:

ExecutionEngine

Caution

In general, it is not a good practice to set a global engine. You should consider engine_context() instead. The exception is when you iterate in a notebook across cells; then a global engine can simplify the code.

Note

For more details, please read make_execution_engine() and set_global()

Examples

import fugue.api as fa

fa.set_global_engine(spark_session)
transform(df, func)  # will use spark in this transformation
fa.clear_global_engine()  # remove the global setting
fugue.execution.api.subtract(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

df1 - df2

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to subtract

  • distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the subtracted dataframe

Return type:

AnyDataFrame

Note

Currently, the schema of all dataframes must be identical, or an exception will be thrown.

fugue.execution.api.take(df, n, presort, na_position='last', partition=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.

Parameters:
  • df (AnyDataFrame) – an input dataframe that can be recognized by Fugue

  • n (int) – number of rows to return

  • presort (str) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • partition (Any | None) – PartitionSpec to apply the take operation, defaults to None

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

n rows of DataFrame per partition

Return type:

AnyDataFrame
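
For illustration, a minimal sketch (sample data made up):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 4, 5]})

# the single row with the largest b overall
fa.take(df, 1, presort="b desc")

# the row with the largest b within each partition of a
fa.take(df, 1, presort="b desc", partition={"by": ["a"]})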

fugue.execution.api.union(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]

Union two dataframes

Parameters:
  • df1 (AnyDataFrame) – the first dataframe

  • df2 (AnyDataFrame) – the second dataframe

  • dfs (AnyDataFrame) – more dataframes to union

  • distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether to force return a local DataFrame, defaults to False

Returns:

the unioned dataframe

Return type:

AnyDataFrame

Note

Currently, the schema of all dataframes must be identical, or an exception will be thrown.
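
For illustration, a minimal sketch (sample data made up; both dataframes share the same schema as required):

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"a": [2, 3]})

# UNION (distinct) by default
fa.union(df1, df2)

# UNION ALL keeps duplicates
fa.union(df1, df2, distinct=False)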

fugue.execution.execution_engine

class fugue.execution.execution_engine.EngineFacet(execution_engine)[source]

Bases: FugueEngineBase

The base class for different facets of the execution engines.

Parameters:

execution_engine (ExecutionEngine) – the execution engine this sql engine will run on

property conf: ParamDict

All configurations of this engine instance.

Note

It can contain more than what you provided; for example, in SparkExecutionEngine, the Spark session can bring in more configs, and they are all accessible through this property.

property execution_engine: ExecutionEngine

the execution engine this sql engine will run on

property execution_engine_constraint: Type[ExecutionEngine]

This defines the required ExecutionEngine type of this facet

Returns:

a subtype of ExecutionEngine

property log: Logger

Logger of this engine instance

to_df(df, schema=None)[source]

Convert a data structure to this engine compatible DataFrame

Parameters:
  • df (AnyDataFrame) – DataFrame, pandas DataFrame, a list or iterable of arrays, or other objects supported by the specific engine implementation

  • schema (Any | None) – Schema like object, defaults to None

Returns:

engine compatible dataframe

Return type:

DataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything

class fugue.execution.execution_engine.ExecutionEngine(conf)[source]

Bases: FugueEngineBase

The abstract base class for execution engines. It is the layer that unifies core concepts of distributed computing, and separates the underlying computing frameworks from user’s higher level logic.

Please read the ExecutionEngine Tutorial to understand this most important Fugue concept

Parameters:

conf (Any) – dict-like config, read this to learn Fugue specific options

aggregate(df, partition_spec, agg_cols)[source]

Aggregate on dataframe

Parameters:
  • df (DataFrame) – the dataframe to aggregate on

  • partition_spec (PartitionSpec | None) – PartitionSpec to specify partition keys

  • agg_cols (List[ColumnExpr]) – aggregation expressions

Returns:

the aggregated result as a dataframe

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue import PartitionSpec
from fugue.column import col
import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
engine.aggregate(
    df,
    partition_spec=None,
    agg_cols=[f.max(col("b"))])

# SELECT a, MAX(b) AS x FROM df GROUP BY a
engine.aggregate(
    df,
    partition_spec=PartitionSpec(by=["a"]),
    agg_cols=[f.max(col("b")).alias("x")])
as_context()[source]

Set this execution engine as the context engine. This function is thread safe and async safe.

Examples

with engine.as_context():
    transform(df, func)  # will use engine in this transformation
Return type:

Iterator[ExecutionEngine]

assign(df, columns)[source]

Update existing columns with new values and add new columns

Parameters:
  • df (DataFrame) – the dataframe to set columns

  • columns (List[ColumnExpr]) – column expressions

Returns:

the updated dataframe

Return type:

DataFrame

Tip

This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue.column import col, lit

# assume df has schema: a:int,b:str

# add constant column x
engine.assign(df, lit(1,"x"))

# change column b to be a constant integer
engine.assign(df, lit(1,"b"))

# add new x to be a+b
engine.assign(df, (col("a")+col("b")).alias("x"))

# cast column a data type to double
engine.assign(df, col("a").cast(float))
abstract broadcast(df)[source]

Broadcast the dataframe to all workers for a distributed computing framework

Parameters:

df (DataFrame) – the input dataframe

Returns:

the broadcasted dataframe

Return type:

DataFrame

comap(df, map_func, output_schema, partition_spec, on_init=None)[source]

Apply a function to each zipped partition on the zipped dataframe.

Parameters:
  • df (DataFrame) – input dataframe, it must be a zipped dataframe (it has to be a dataframe output from zip() or zip_all())

  • map_func (Callable[[PartitionCursor, DataFrames], LocalDataFrame]) – the function to apply on every zipped partition

  • output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this

  • partition_spec (PartitionSpec) – partition specification for processing the zipped zipped dataframe.

  • on_init (Callable[[int, DataFrames], Any] | None) – callback function when the physical partition is initializing, defaults to None

Returns:

the dataframe after the comap operation

Note

  • The input of this method must be an output of zip() or zip_all()

  • The partition_spec here is NOT related to how you zipped the dataframe; however you set it, it only affects the processing speed. The partition keys will actually be overridden by the zipped dataframe's partition keys. You may use it to improve efficiency: PartitionSpec(algo="even", num="ROWCOUNT") tells the execution engine to put each zipped partition into its own physical partition, achieving the best possible load balance.

  • If input dataframe has keys, the dataframes you get in map_func and on_init will have keys, otherwise you will get list-like dataframes

  • on_init function will get a DataFrames object that has the same structure, but has all empty dataframes, you can use the schemas but not the data.

See also

For more details and examples, read Zip & Comap

property conf: ParamDict

All configurations of this engine instance.

Note

It can contain more than what you provided; for example, in SparkExecutionEngine, the Spark session can bring in more configs, and they are all accessible through this property.

convert_yield_dataframe(df, as_local)[source]

Convert a yield dataframe to a dataframe that can be used after this execution engine stops.

Parameters:
  • df (DataFrame) – DataFrame

  • as_local (bool) – whether yield a local dataframe

Returns:

another DataFrame that can be used after this execution engine stops

Return type:

DataFrame

Note

By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe becomes invalid when the engine stops.

For example, if you build a custom Spark engine that starts and stops the Spark session in its start_engine() and stop_engine(), then the Spark dataframe will become invalid when the engine stops. So you may consider converting it to a local dataframe so it can still be used after the engine stops.

abstract create_default_map_engine()[source]

Default MapEngine if user doesn’t specify

Return type:

MapEngine

abstract create_default_sql_engine()[source]

Default SQLEngine if user doesn’t specify

Return type:

SQLEngine

abstract distinct(df)[source]

Equivalent to SELECT DISTINCT * FROM df

Parameters:

df (DataFrame) – dataframe

Returns:

the result with distinct rows

Return type:

DataFrame

abstract dropna(df, how='any', thresh=None, subset=None)[source]

Drop NA records from the dataframe

Parameters:
  • df (DataFrame) – DataFrame

  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (int | None) – int, drops rows that have less than thresh non-null values

  • subset (List[str] | None) – list of columns to operate on

Returns:

DataFrame with NA records dropped

Return type:

DataFrame

abstract fillna(df, value, subset=None)[source]

Fill NULL, NAN, NAT values in a dataframe

Parameters:
  • df (DataFrame) – DataFrame

  • value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (List[str] | None) – list of columns to operate on. ignored if value is a dictionary

Returns:

DataFrame with NA records filled

Return type:

DataFrame

filter(df, condition)[source]

Filter rows by the given condition

Parameters:
  • df (DataFrame) – the dataframe to be filtered

  • condition (ColumnExpr) – (boolean) column expression

Returns:

the filtered dataframe

Return type:

DataFrame

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue.column import col
import fugue.column.functions as f

engine.filter(df, (col("a")>1) & (col("b")=="x"))
engine.filter(df, f.coalesce(col("a"),col("b"))>1)
abstract get_current_parallelism()[source]

Get the current number of parallelism of this engine

Return type:

int

property in_context: bool

Whether this engine is being used as a context engine

abstract intersect(df1, df2, distinct=True)[source]

Intersect df1 and df2

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL

Returns:

the intersected dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

property is_global: bool

Whether this engine is being used as THE global engine

abstract join(df1, df2, how, on=None)[source]

Join two dataframes

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (List[str] | None) – join keys; they can always be inferred from the common columns, but if provided, they will be validated against the inferred keys.

Returns:

the joined dataframe

Return type:

DataFrame

Note

Please read get_join_schemas()

abstract load_df(path, format_hint=None, columns=None, **kwargs)[source]

Load dataframe from persistent storage

Parameters:
  • path (str | List[str]) – the path to the dataframe

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • columns (Any | None) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Returns:

an engine compatible dataframe

Return type:

DataFrame

For more details and examples, read Load & Save.

load_yielded(df)[source]

Load yielded dataframe

Parameters:

df (Yielded) – the yielded dataframe

Returns:

an engine compatible dataframe

Return type:

DataFrame

property map_engine: MapEngine

The MapEngine currently used by this execution engine. You should use set_map_engine() to set a new MapEngine instance. If not set, the default is create_default_map_engine()

on_enter_context()[source]

The event hook when calling set_global_engine() or engine_context(), defaults to no operation

Return type:

None

on_exit_context()[source]

The event hook when calling clear_global_engine() or exiting from engine_context(), defaults to no operation

Return type:

None

abstract persist(df, lazy=False, **kwargs)[source]

Force materializing and caching the dataframe

Parameters:
  • df (DataFrame) – the input dataframe

  • lazy (bool) – True: the first usage of the output will trigger the persisting to happen; False (eager): persisting happens immediately. Defaults to False

  • kwargs (Any) – parameter to pass to the underlying persist implementation

Returns:

the persisted dataframe

Return type:

DataFrame

Note

persist can only guarantee that the persisted dataframe is computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly this doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.

abstract repartition(df, partition_spec)[source]

Partition the input dataframe using partition_spec.

Parameters:
  • df (DataFrame) – input dataframe

  • partition_spec (PartitionSpec) – how you want to partition the dataframe

Returns:

repartitioned dataframe

Return type:

DataFrame

Note

Before implementing please read the Partition Tutorial

abstract sample(df, n=None, frac=None, replace=False, seed=None)[source]

Sample dataframe by number of rows or by fraction

Parameters:
  • df (DataFrame) – DataFrame

  • n (int | None) – number of rows to sample, one and only one of n and frac must be set

  • frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (int | None) – seed for randomness, defaults to None

Returns:

sampled dataframe

Return type:

DataFrame

abstract save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]

Save dataframe to a persistent storage

Parameters:
  • df (DataFrame) – input dataframe

  • path (str) – output path

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to empty

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type:

None

For more details and examples, read Load & Save.

select(df, cols, where=None, having=None)[source]

The functional interface for SQL select statement

Parameters:
  • df (DataFrame) – the dataframe to be operated on

  • cols (SelectColumns) – column expressions

  • where (ColumnExpr | None) – WHERE condition expression, defaults to None

  • having (ColumnExpr | None) – having condition expression, defaults to None. It is used when cols contains aggregation columns, defaults to None

Returns:

the select result as a dataframe

Return type:

DataFrame

New Since

0.6.0

Attention

This interface is experimental; it is subject to change in new versions.

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

from fugue.column import SelectColumns, all_cols, col, lit
import fugue.column.functions as f

# select existed and new columns
engine.select(df, SelectColumns(col("a"),col("b"),lit(1,"another")))
engine.select(df, SelectColumns(col("a"),(col("b")+lit(1)).alias("x")))

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
engine.select(
    df,
    SelectColumns(f.count_distinct(all_cols()).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
engine.select(
    df,
    SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df
#   WHERE b<2 AND a>1
#   GROUP BY a
#   HAVING MAX(b+1)>0
engine.select(
    df,
    SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)
set_global()[source]

Set this execution engine to be the global execution engine.

Note

Global engine is also considered as a context engine, so in_context() will also become true for the global engine.

Examples

engine1.set_global()
transform(df, func)  # will use engine1 in this transformation

with engine2.as_context():
    transform(df, func)  # will use engine2

transform(df, func)  # will use engine1
Return type:

ExecutionEngine

set_sql_engine(engine)[source]

Set SQLEngine for this execution engine. If not set, the default is create_default_sql_engine()

Parameters:

engine (SQLEngine) – SQLEngine instance

Return type:

None

property sql_engine: SQLEngine

The SQLEngine currently used by this execution engine. You should use set_sql_engine() to set a new SQLEngine instance. If not set, the default is create_default_sql_engine()

stop()[source]

Stop this execution engine. Do not override this method; customize stop_engine() instead if necessary. This function ensures stop_engine() is called only once.

Note

Once the engine is stopped it should not be used again

Return type:

None

stop_engine()[source]

Custom logic to stop the execution engine, defaults to no operation

Return type:

None

abstract subtract(df1, df2, distinct=True)[source]

df1 - df2

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL

Returns:

the subtracted dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

abstract take(df, n, presort, na_position='last', partition_spec=None)[source]

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.

Parameters:
  • df (DataFrame) – DataFrame

  • n (int) – number of rows to return

  • presort (str) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • partition_spec (PartitionSpec | None) – PartitionSpec to apply the take operation

Returns:

n rows of DataFrame per partition

Return type:

DataFrame

abstract union(df1, df2, distinct=True)[source]

Union two dataframes

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL

Returns:

the unioned dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

zip(dfs, how='inner', partition_spec=None, temp_path=None, to_file_threshold=-1)[source]

Zip multiple dataframes together with given partition specifications.

Parameters:
  • dfs (DataFrames) – DataFrames like object

  • how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner

  • partition_spec (PartitionSpec | None) – Partition like object, defaults to empty.

  • temp_path (str | None) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None

  • to_file_threshold (Any) – file byte size threshold, defaults to -1

Returns:

a zipped dataframe; the metadata of the dataframe will indicate that it is zipped

Return type:

DataFrame

Note

  • Different from join, dataframes can have common columns that you will not use as partition keys.

  • If by is not specified it will also use the common columns of all the dataframes (if it’s not a cross zip)

  • For non-cross zip, the dataframes must have common columns, or an error will be thrown

Note

  • Please also read zip()

  • If dfs is dict-like, the zipped dataframe will be dict-like; if dfs is list-like, the zipped dataframe will be list-like

  • It’s fine for dfs to contain only one dataframe

See also

For more details and examples, read Zip & Comap

class fugue.execution.execution_engine.ExecutionEngineParam(param)[source]

Bases: AnnotatedParam

Parameters:

param (Parameter | None)

to_input(engine)[source]
Parameters:

engine (Any)

Return type:

Any

class fugue.execution.execution_engine.FugueEngineBase[source]

Bases: ABC

abstract property conf: ParamDict

All configurations of this engine instance.

Note

It can contain more than what you provided; for example, in SparkExecutionEngine, the Spark session can bring in more configs, and they are all accessible through this property.

abstract property is_distributed: bool

Whether this engine is a distributed engine

abstract property log: Logger

Logger of this engine instance

abstract to_df(df, schema=None)[source]

Convert a data structure to this engine compatible DataFrame

Parameters:
  • df (AnyDataFrame) – DataFrame, pandas DataFrame, a list or iterable of arrays, or other objects supported by the specific engine implementation

  • schema (Any | None) – Schema like object, defaults to None

Returns:

engine compatible dataframe

Return type:

DataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything

class fugue.execution.execution_engine.MapEngine(execution_engine)[source]

Bases: EngineFacet

The abstract base class for different map operation implementations.

Parameters:

execution_engine (ExecutionEngine) – the execution engine this sql engine will run on

map_bag(bag, map_func, partition_spec, on_init=None)[source]

Apply a function to each partition after you partition the bag in a specified way.

Parameters:
  • bag (Bag) – the input bag

  • map_func (Callable[[BagPartitionCursor, LocalBag], LocalBag]) – the function to apply on every logical partition

  • partition_spec (PartitionSpec) – partition specification

  • on_init (Callable[[int, Bag], Any] | None) – callback function when the physical partition is initializing, defaults to None

Returns:

the bag after the map operation

Return type:

Bag

abstract map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]

Apply a function to each partition after you partition the dataframe in a specified way.

Parameters:
  • df (DataFrame) – input dataframe

  • map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition

  • output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this

  • partition_spec (PartitionSpec) – partition specification

  • on_init (Callable[[int, DataFrame], Any] | None) – callback function when the physical partition is initializing, defaults to None

  • map_func_format_hint (str | None) – the preferred data format for map_func, it can be pandas, pyarrow, etc, defaults to None. Certain engines can provide the most efficient map operations based on the hint.

Returns:

the dataframe after the map operation

Return type:

DataFrame

Note

Before implementing, you must read this to understand what map is used for and how it should work.

class fugue.execution.execution_engine.SQLEngine(execution_engine)[source]

Bases: EngineFacet

The abstract base class for different SQL execution implementations. Please read this to understand the concept

Parameters:

execution_engine (ExecutionEngine) – the execution engine this sql engine will run on

property dialect: str | None

encode(dfs, statement)[source]

Parameters:
  • dfs (DataFrames)

  • statement (StructuredRawSQL)

Return type:

Tuple[DataFrames, str]

encode_name(name)[source]
Parameters:

name (str)

Return type:

str

load_table(table, **kwargs)[source]

Load table as a dataframe

Parameters:
  • table (str) – the table name

  • kwargs (Any)

Returns:

an engine compatible dataframe

Return type:

DataFrame

save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]

Save the dataframe to a table

Parameters:
  • df (DataFrame) – the dataframe to save

  • table (str) – the table name

  • mode (str) – can accept overwrite, error, defaults to “overwrite”

  • partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Return type:

None

abstract select(dfs, statement)[source]

Execute select statement on the sql engine.

Parameters:
  • dfs (DataFrames) – a collection of dataframes that must have keys

  • statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.

Returns:

result of the SELECT statement

Return type:

DataFrame

Examples

dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True,"a"),
     (False," UNION SELECT * FROM "),
     (True,"b")])

Note

There can be tables that are not in dfs. For example, you may want to select from Hive without input DataFrames:

>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
table_exists(table)[source]

Whether the table exists

Parameters:

table (str) – the table name

Returns:

whether the table exists

Return type:

bool

fugue.execution.factory

fugue.execution.factory.is_pandas_or(objs, obj_type)[source]

Check whether the input contains at least one obj_type object and the rest are Pandas DataFrames. This function is a utility function for extending infer_execution_engine()

Parameters:
  • objs (List[Any]) – the list of objects to check

  • obj_type (Any)

Returns:

whether all objs are of type obj_type or pandas DataFrame and at least one is of type obj_type

Return type:

bool
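
For illustration, a minimal sketch (MyDataFrame is a hypothetical backend-specific class):

import pandas as pd
from fugue.execution.factory import is_pandas_or

class MyDataFrame:
    pass

# True: one MyDataFrame plus pandas DataFrames
is_pandas_or([MyDataFrame(), pd.DataFrame()], MyDataFrame)

# False: no MyDataFrame in the list
is_pandas_or([pd.DataFrame()], MyDataFrame)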

fugue.execution.factory.make_execution_engine(engine=None, conf=None, infer_by=None, **kwargs)[source]

Create ExecutionEngine with specified engine

Parameters:
  • engine (Any | None) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, or the ExecutionEngine instance , or a tuple of two values where the first value represents execution engine and the second value represents the sql engine (you can use None for either of them to use the default one), defaults to None

  • conf (Any | None) – Parameters like object, defaults to None

  • infer_by (List[Any] | None) – List of objects that can be used to infer the execution engine using infer_execution_engine()

  • kwargs (Any) – additional parameters to initialize the execution engine

Returns:

the ExecutionEngine instance

Return type:

ExecutionEngine

Note

This function finds/constructs the engine in the following order:

  • If engine is None, it first tries to see if there is any defined context engine to use (=> engine)

  • If engine is still empty, then it will try to get the global execution engine. See set_global()

  • If engine is still empty, then if infer_by is given, it will try to infer the execution engine (=> engine)

  • If engine is still empty, then it will construct the default engine defined by register_default_execution_engine() (=> engine)

  • Now, engine must not be empty, if it is an object other than ExecutionEngine, we will use parse_execution_engine() to construct (=> engine)

  • Now, engine must have been an ExecutionEngine object. We update its SQL engine if specified, then update its config using conf and kwargs

Examples

register_default_execution_engine(lambda conf: E1(conf))
register_execution_engine("e2", lambda conf, **kwargs: E2(conf, **kwargs))

register_sql_engine("s", lambda conf: S2(conf))

# E1 + E1.create_default_sql_engine()
make_execution_engine()

# E2 + E2.create_default_sql_engine()
make_execution_engine("e2")

# E1 + S2
make_execution_engine((None, "s"))

# E2(conf, a=1, b=2) + S2
make_execution_engine(("e2", "s"), conf, a=1, b=2)

# SparkExecutionEngine + SparkSQLEngine
make_execution_engine(SparkExecutionEngine)
make_execution_engine(SparkExecutionEngine(spark_session, conf))

# SparkExecutionEngine + S2
make_execution_engine((SparkExecutionEngine, "s"))

# assume object e2_df can infer E2 engine
make_execution_engine(infer_by=[e2_df])  # an E2 engine

# global
e_global = E1(conf)
e_global.set_global()
make_execution_engine()  # e_global

# context
with E2(conf).as_context() as ec:
    make_execution_engine()  # ec
make_execution_engine()  # e_global
fugue.execution.factory.make_sql_engine(engine=None, execution_engine=None, **kwargs)[source]

Create SQLEngine with specified engine

Parameters:
  • engine (Any | None) – it can be empty string or null (use the default SQL engine), a string (use the registered SQL engine), an SQLEngine type, or the SQLEngine instance (you can use None to use the default one), defaults to None

  • execution_engine (ExecutionEngine | None) – the ExecutionEngine instance to create the SQLEngine. Normally you should always provide this value.

  • kwargs (Any) – additional parameters to initialize the sql engine

Returns:

the SQLEngine instance

Return type:

SQLEngine

Note

For users, you normally don’t need to call this function directly. Use make_execution_engine instead

Examples

register_default_sql_engine(lambda conf: S1(conf))
register_sql_engine("s2", lambda conf: S2(conf))

engine = NativeExecutionEngine()

# S1(engine)
make_sql_engine(None, engine)

# S1(engine, a=1)
make_sql_engine(None, engine, a=1)

# S2(engine)
make_sql_engine("s2", engine)
fugue.execution.factory.register_default_execution_engine(func, on_dup='overwrite')[source]

Register ExecutionEngine as the default engine.

Parameters:
  • func (Callable) – a callable taking Parameters like object and **kwargs and returning an ExecutionEngine instance

  • on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting), defaults to “overwrite”.

Return type:

None

Examples

# create a new engine with name my (overwrites if existed)
register_default_execution_engine(lambda conf: MyExecutionEngine(conf))

# the following examples will use MyExecutionEngine

# 0
make_execution_engine()
make_execution_engine(None, {"myconfig":"value"})

# 1
dag = FugueWorkflow()
dag.create([[0]],"a:int").show()
dag.run(None, {"myconfig":"value"})

# 2
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("", {"myconfig":"value"})
fugue.execution.factory.register_default_sql_engine(func, on_dup='overwrite')[source]

Register SQLEngine as the default engine

Parameters:
  • func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance

  • on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting) or “throw” (throw exception), defaults to “overwrite”.

Raises:

KeyError – if on_dup is throw and the name already exists

Return type:

None

Note

You should be careful using this function, because when you set a custom SQL engine as the default, all execution engines you create will use this SQL engine unless you are explicit. For example, if you set the default SQL engine to a Spark-specific one, then a NativeExecutionEngine will try to use it and will throw exceptions.

So it’s always a better idea to use register_sql_engine instead

Examples

# register MySQLEngine as the default SQL engine (overwrites the existing default)
register_default_sql_engine(lambda engine: MySQLEngine(engine))

# create NativeExecutionEngine with MySQLEngine as the default
make_execution_engine()

# create SparkExecutionEngine with MySQLEngine instead of SparkSQLEngine
make_execution_engine("spark")

# NativeExecutionEngine with MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]],"a:int").show()
dag.run()
fugue.execution.factory.register_execution_engine(name_or_type, func, on_dup='overwrite')[source]

Register ExecutionEngine with a given name.

Parameters:
  • name_or_type (str | Type) – alias of the execution engine, or type of an object that can be converted to an execution engine

  • func (Callable) – a callable taking Parameters like object and **kwargs and returning an ExecutionEngine instance

  • on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting), defaults to “overwrite”.

Return type:

None

Examples

Alias registration examples:

# create a new engine with name my (overwrites if existed)
register_execution_engine("my", lambda conf: MyExecutionEngine(conf))

# 0
make_execution_engine("my")
make_execution_engine("my", {"myconfig":"value})

# 1
dag = FugueWorkflow()
dag.create([[0]],"a:int").show()
dag.run("my", {"myconfig":"value})

# 2
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("my")

Type registration examples:

from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine
from fugue import fsql

register_execution_engine(
    SparkSession,
    lambda session, conf: SparkExecutionEngine(session, conf))

spark_session = SparkSession.builder.getOrCreate()

fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run(spark_session)
fugue.execution.factory.register_sql_engine(name, func, on_dup='overwrite')[source]

Register SQLEngine with a given name.

Parameters:
  • name (str) – name of the SQL engine

  • func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance

  • on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting), defaults to “overwrite”.

Return type:

None

Examples

# create a new SQL engine with name "mysql" (overwrites if existed)
register_sql_engine("mysql", lambda engine: MySQLEngine(engine))

# create the default execution engine with MySQLEngine as its SQL engine
make_execution_engine(("", "mysql"))

# create DaskExecutionEngine with MySQLEngine as its SQL engine
make_execution_engine(("dask", "mysql"))

# default execution engine + MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]],"a:int").show()
dag.run(("","mysql"))
fugue.execution.factory.try_get_context_execution_engine()[source]

If the global execution engine is set (see set_global()) or the context is set (see as_context()), then return the engine, else return None

Return type:

ExecutionEngine | None
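
For illustration, here is a minimal sketch (not from the official docs) of how the context and global lookups behave, assuming no engine has been set earlier in the session:

from fugue import NativeExecutionEngine
from fugue.execution.factory import try_get_context_execution_engine

# nothing has been set yet (assuming a fresh session)
assert try_get_context_execution_engine() is None

engine = NativeExecutionEngine()

# inside the context, the context engine is returned
with engine.as_context():
    assert try_get_context_execution_engine() is engine

# after set_global(), the global engine is returned
engine.set_global()
assert try_get_context_execution_engine() is engine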

fugue.execution.native_execution_engine

class fugue.execution.native_execution_engine.NativeExecutionEngine(conf=None)[source]

Bases: ExecutionEngine

The execution engine based on native python and pandas. This execution engine is mainly for prototyping and unit tests.

Please read the ExecutionEngine Tutorial to understand this important Fugue concept

Parameters:

conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
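
As a quick illustration (the config key below is only a placeholder, not a real Fugue option), the engine can be constructed with a plain dict as conf and used to wrap local data:

import pandas as pd
from fugue import NativeExecutionEngine

engine = NativeExecutionEngine(conf={"my.placeholder.conf": "value"})

pdf = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
df = engine.to_df(pdf)               # wrap as an engine compatible dataframe

print(engine.is_distributed)         # False: runs on local pandas
print(engine.get_current_parallelism())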

broadcast(df)[source]

Broadcast the dataframe to all workers for a distributed computing framework

Parameters:

df (DataFrame) – the input dataframe

Returns:

the broadcasted dataframe

Return type:

DataFrame

create_default_map_engine()[source]

Default MapEngine if user doesn’t specify

Return type:

MapEngine

create_default_sql_engine()[source]

Default SQLEngine if user doesn’t specify

Return type:

SQLEngine

distinct(df)[source]

Equivalent to SELECT DISTINCT * FROM df

Parameters:

df (DataFrame) – dataframe

Returns:

the dataframe with duplicate rows removed

Return type:

DataFrame

dropna(df, how='any', thresh=None, subset=None)[source]

Drop NA records from dataframe

Parameters:
  • df (DataFrame) – DataFrame

  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (int | None) – drops rows that have fewer than thresh non-null values

  • subset (List[str] | None) – list of columns to operate on

Returns:

DataFrame with NA records dropped

Return type:

DataFrame
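
An illustrative sketch of the how, thresh and subset options (the sample data is made up):

import pandas as pd
from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame({"a": [1.0, None, None], "b": ["x", "y", None]}))

engine.dropna(df)                # drop rows containing any null
engine.dropna(df, how="all")     # drop rows where all values are null
engine.dropna(df, thresh=2)      # keep rows with at least 2 non-null values
engine.dropna(df, subset=["b"])  # only consider column b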

fillna(df, value, subset=None)[source]

Fill NULL, NAN, NAT values in a dataframe

Parameters:
  • df (DataFrame) – DataFrame

  • value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (List[str] | None) – list of columns to operate on. ignored if value is a dictionary

Returns:

DataFrame with NA records filled

Return type:

DataFrame
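
An illustrative sketch of scalar vs dictionary values (the sample data is made up):

import pandas as pd
from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame({"a": [1.0, None], "b": ["x", None]}))

engine.fillna(df, value=0.0, subset=["a"])       # fill only column a with a scalar
engine.fillna(df, value={"a": 0.0, "b": "n/a"})  # per-column replacement values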

get_current_parallelism()[source]

Get the current number of parallelism of this engine

Return type:

int

intersect(df1, df2, distinct=True)[source]

Intersect df1 and df2

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL

Returns:

the intersected dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

property is_distributed: bool

Whether this engine is a distributed engine

join(df1, df2, how, on=None)[source]

Join two dataframes

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (List[str] | None) – it can always be inferred, but if provided, it will be validated against the inferred keys.

Returns:

the joined dataframe

Return type:

DataFrame

Note

Please read get_join_schemas()
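
A hedged sketch of a few join modes with inferred keys (the data is made up):

from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df1 = engine.to_df([[0, "x"], [1, "y"]], "a:int,b:str")
df2 = engine.to_df([[0, 10], [2, 20]], "a:int,c:int")

# the join key "a" is inferred from the shared column name
engine.join(df1, df2, how="inner")           # a:int,b:str,c:int
engine.join(df1, df2, how="left_outer")      # keeps all rows of df1
engine.join(df1, df2, how="anti", on=["a"])  # rows of df1 with no match in df2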

load_df(path, format_hint=None, columns=None, **kwargs)[source]

Load dataframe from persistent storage

Parameters:
  • path (str | List[str]) – the path to the dataframe

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • columns (Any | None) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Returns:

an engine compatible dataframe

Return type:

LocalBoundedDataFrame

For more details and examples, read Load & Save.

property log: Logger

Logger of this engine instance

persist(df, lazy=False, **kwargs)[source]

Force materializing and caching the dataframe

Parameters:
  • df (DataFrame) – the input dataframe

  • lazy (bool) – True: the first usage of the output will trigger persisting to happen; False (eager): persisting happens immediately. Defaults to False

  • kwargs (Any) – parameter to pass to the underlying persist implementation

Returns:

the persisted dataframe

Return type:

DataFrame

Note

persist can only guarantee that the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly this doesn't cause any issue, but if your execution graph is long, it may cause problems, for example a stack overflow.

property pl_utils: PandasUtils

Pandas-like dataframe utils

repartition(df, partition_spec)[source]

Partition the input dataframe using partition_spec.

Parameters:
  • df (DataFrame) – input dataframe

  • partition_spec (PartitionSpec) – how you want to partition the dataframe

Returns:

repartitioned dataframe

Return type:

DataFrame

Note

Before implementing please read the Partition Tutorial
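
An illustrative sketch (made-up data; note that on this local engine repartition may effectively be a no-op):

from fugue import NativeExecutionEngine, PartitionSpec

engine = NativeExecutionEngine()
df = engine.to_df([[0, 1], [0, 2], [1, 3]], "a:int,b:int")

# partition by column a, presorting by b descending within each partition
spec = PartitionSpec(by=["a"], presort="b desc")
repartitioned = engine.repartition(df, spec)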

sample(df, n=None, frac=None, replace=False, seed=None)[source]

Sample dataframe by number of rows or by fraction

Parameters:
  • df (DataFrame) – DataFrame

  • n (int | None) – number of rows to sample, one and only one of n and frac must be set

  • frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (int | None) – seed for randomness, defaults to None

Returns:

sampled dataframe

Return type:

DataFrame
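
An illustrative sketch of sampling by row count and by fraction (the data is made up):

from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df([[i] for i in range(100)], "a:int")

engine.sample(df, n=10, seed=0)                    # exactly 10 rows
engine.sample(df, frac=0.1, seed=0)                # roughly 10% of the rows
engine.sample(df, frac=0.5, replace=True, seed=0)  # sampling with replacement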

save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]

Save dataframe to a persistent storage

Parameters:
  • df (DataFrame) – input dataframe

  • path (str) – output path

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to empty

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type:

None

For more details and examples, read Load & Save.
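
An illustrative round trip with save_df and load_df (paths and data are made up; the format is inferred from the .parquet extension, which requires a parquet backend such as pyarrow):

import os
import tempfile
from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df([[0, "x"], [1, "y"]], "a:int,b:str")

path = os.path.join(tempfile.mkdtemp(), "data.parquet")
engine.save_df(df, path, mode="overwrite")    # format inferred from the extension
loaded = engine.load_df(path, columns=["a"])  # read back only column a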

subtract(df1, df2, distinct=True)[source]

df1 - df2

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL

Returns:

the subtracted dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

take(df, n, presort, na_position='last', partition_spec=None)[source]

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.

Parameters:
  • df (DataFrame) – DataFrame

  • n (int) – number of rows to return

  • presort (str) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • partition_spec (PartitionSpec | None) – PartitionSpec to apply the take operation

Returns:

n rows of DataFrame per partition

Return type:

DataFrame
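
An illustrative sketch of taking the top row per partition (the data is made up):

from fugue import NativeExecutionEngine, PartitionSpec

engine = NativeExecutionEngine()
df = engine.to_df([[0, 1], [0, 2], [1, 3], [1, 4]], "a:int,b:int")

# highest b per value of a; nulls, if any, would sort last
top1 = engine.take(
    df, n=1, presort="b desc", na_position="last",
    partition_spec=PartitionSpec(by=["a"]),
)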

to_df(df, schema=None)[source]

Convert a data structure to this engine compatible DataFrame

Parameters:
  • df (AnyDataFrame) – DataFrame, pandas DataFrame, list or iterable of arrays, or any other object supported by the engine implementation

  • schema (Any | None) – Schema like object, defaults to None

Returns:

engine compatible dataframe

Return type:

LocalBoundedDataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
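
A short illustration of these conventions (the data is made up):

import pandas as pd
from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()

# pandas DataFrame: the schema is inferred
df1 = engine.to_df(pd.DataFrame({"a": [1, 2]}))

# list of arrays: a Schema like object is required
df2 = engine.to_df([[0, "x"], [1, "y"]], "a:int,b:str")

# per the first convention above, an already compatible dataframe
# should come back unchanged
df3 = engine.to_df(df2)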

union(df1, df2, distinct=True)[source]

Union two dataframes

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL

Returns:

the unioned dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

class fugue.execution.native_execution_engine.PandasMapEngine(execution_engine)[source]

Bases: MapEngine

Parameters:

execution_engine (ExecutionEngine)

property execution_engine_constraint: Type[ExecutionEngine]

This defines the required ExecutionEngine type of this facet

Returns:

a subtype of ExecutionEngine

property is_distributed: bool

Whether this engine is a distributed engine

map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]

Apply a function to each partition after you partition the dataframe in a specified way.

Parameters:
  • df (DataFrame) – input dataframe

  • map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition

  • output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this

  • partition_spec (PartitionSpec) – partition specification

  • on_init (Callable[[int, DataFrame], Any] | None) – callback function when the physical partition is initializing, defaults to None

  • map_func_format_hint (str | None) – the preferred data format for map_func, it can be pandas, pyarrow, etc, defaults to None. Certain engines can provide the most efficient map operations based on the hint.

Returns:

the dataframe after the map operation

Return type:

DataFrame

Note

Before implementing, you must read this to understand what map is used for and how it should work.
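
As a rough, hypothetical sketch (keep_first_row is an illustrative helper, not part of Fugue), a per-partition map that keeps only the first row of each logical partition could look like this:

from fugue import ArrayDataFrame, NativeExecutionEngine, PartitionSpec

engine = NativeExecutionEngine()
map_engine = engine.create_default_map_engine()
df = engine.to_df([[0, 1], [0, 2], [1, 3]], "a:int,b:int")

def keep_first_row(cursor, pdf):
    # cursor describes the current logical partition; pdf is a LocalDataFrame
    return ArrayDataFrame([pdf.as_array()[0]], pdf.schema)

result = map_engine.map_dataframe(
    df,
    map_func=keep_first_row,
    output_schema="a:int,b:int",       # must not be None
    partition_spec=PartitionSpec(by=["a"]),
)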

to_df(df, schema=None)[source]

Convert a data structure to this engine compatible DataFrame

Parameters:
  • df (AnyDataFrame) – DataFrame, pandas DataFrame, list or iterable of arrays, or any other object supported by the engine implementation

  • schema (Any | None) – Schema like object, defaults to None

Returns:

engine compatible dataframe

Return type:

DataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything

class fugue.execution.native_execution_engine.QPDPandasEngine(execution_engine)[source]

Bases: SQLEngine

QPD execution implementation.

Parameters:

execution_engine (ExecutionEngine) – the execution engine this sql engine will run on

property dialect: str | None
property is_distributed: bool

Whether this engine is a distributed engine

select(dfs, statement)[source]

Execute select statement on the sql engine.

Parameters:
  • dfs (DataFrames) – a collection of dataframes that must have keys

  • statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.

Returns:

result of the SELECT statement

Return type:

DataFrame

Examples

dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True,"a"),
     (False," UNION SELECT * FROM "),
     (True,"b")])

Note

There can be tables that are not in dfs. For example you want to select from hive without input DataFrames:

>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
to_df(df, schema=None)[source]

Convert a data structure to this engine compatible DataFrame

Parameters:
  • df (AnyDataFrame) – DataFrame, pandas DataFrame, list or iterable of arrays, or any other object supported by the engine implementation

  • schema (Any | None) – Schema like object, defaults to None

Returns:

engine compatible dataframe

Return type:

DataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything