fugue.execution
fugue.execution.api
- fugue.execution.api.aggregate(df, partition_by=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **agg_kwcols)[source]
Aggregate on dataframe
- Parameters:
df (AnyDataFrame) – the dataframe to aggregate on
partition_by (None | str | List[str]) – partition key(s), defaults to None
agg_kwcols (ColumnExpr) – aggregation expressions
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the aggregated result as a dataframe
- Return type:
AnyDataFrame
See also
Please find more expression examples in
fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # SELECT MAX(b) AS b FROM df
    fa.aggregate(df, b=f.max(col("b")))

    # SELECT a, MAX(b) AS x FROM df GROUP BY a
    fa.aggregate(df, "a", x=f.max(col("b")))
- fugue.execution.api.anti_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Left anti-join two dataframes. This is a wrapper of join() with how="anti"
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the joined dataframe
- Return type:
AnyDataFrame
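A minimal usage sketch, not part of the original docstring; it assumes pandas DataFrames as inputs and the fugue.api import style used in the other examples on this page:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})
df2 = pd.DataFrame({"a": [2, 3]})

# keep only rows of df1 whose inferred key "a" has no match in df2
res = fa.anti_join(df1, df2, as_local=True)  # the row with a==1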
- fugue.execution.api.assign(df, engine=None, engine_conf=None, as_fugue=False, as_local=False, **columns)[source]
Update existing columns with new values and add new columns
- Parameters:
df (AnyDataFrame) – the dataframe to set columns
columns (Any) – column expressions
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the updated dataframe
- Return type:
AnyDataFrame
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in
fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

# assume df has schema: a:int,b:str

with fa.engine_context("duckdb"):
    # add constant column x
    fa.assign(df, x=1)

    # change column b to be a constant integer
    fa.assign(df, b=1)

    # add new x to be a+b
    fa.assign(df, x=col("a")+col("b"))

    # cast column a data type to double
    fa.assign(df, a=col("a").cast(float))
- fugue.execution.api.broadcast(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Broadcast the dataframe to all workers of a distributed computing backend
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (AnyExecutionEngine | None) – an engine-like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the broadcasted dataframe
- Return type:
AnyDataFrame
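A hedged sketch of how broadcast is typically combined with a join; large_df, small_df and spark_session are placeholders, and on a purely local engine the call is effectively a no-op:

import fugue.api as fa

with fa.engine_context(spark_session):
    small = fa.broadcast(small_df)        # hint: replicate the small side to all workers
    joined = fa.inner_join(large_df, small)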
- fugue.execution.api.clear_global_engine()[source]
Remove the global execution engine (if set)
- Return type:
None
- fugue.execution.api.cross_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Cross join two dataframes. This is a wrapper of join() with how="cross"
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the joined dataframe
- Return type:
AnyDataFrame
- fugue.execution.api.distinct(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Equivalent to
SELECT DISTINCT * FROM df
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the result with distinct rows
- Return type:
AnyDataFrame
- fugue.execution.api.dropna(df, how='any', thresh=None, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Drop NA records from dataframe
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (int | None) – int, drops rows that have less than thresh non-null values
subset (List[str] | None) – list of columns to operate on
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
DataFrame with NA records dropped
- Return type:
AnyDataFrame
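A minimal usage sketch, not part of the original docstring; it assumes a small pandas DataFrame as input:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, None, None], "b": ["x", "y", None]})

fa.dropna(df, how="any", as_local=True)   # keeps only the first row
fa.dropna(df, how="all", as_local=True)   # drops only the all-null third row
fa.dropna(df, thresh=1, subset=["a", "b"], as_local=True)  # rows with >=1 non-null in a,b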
- fugue.execution.api.engine_context(engine=None, engine_conf=None, infer_by=None)[source]
Make an execution engine and set it as the context engine. This function is thread safe and async safe.
- Parameters:
engine (AnyExecutionEngine) – an engine like object, defaults to None
engine_conf (Any) – the configs for the engine, defaults to None
infer_by (List[Any] | None) – a list of objects to infer the engine, defaults to None
- Return type:
Iterator[ExecutionEngine]
Note
For more details, please read
make_execution_engine()
Examples
import fugue.api as fa

with fa.engine_context(spark_session):
    transform(df, func)  # will use spark in this transformation
- fugue.execution.api.fillna(df, value, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Fill NULL, NAN, NAT values in a dataframe
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (List[str] | None) – list of columns to operate on. ignored if value is a dictionary
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
DataFrame with NA records filled
- Return type:
AnyDataFrame
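A minimal usage sketch, not part of the original docstring; it assumes a small pandas DataFrame as input:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1.0, None], "b": ["x", None]})

fa.fillna(df, value=0, subset=["a"], as_local=True)            # only column a is filled
fa.fillna(df, value={"a": 0, "b": "missing"}, as_local=True)   # per-column fill values, subset ignored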
- fugue.execution.api.filter(df, condition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Filter rows by the given condition
- Parameters:
df (AnyDataFrame) – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the filtered dataframe
- Return type:
AnyDataFrame
See also
Please find more expression examples in
fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f import fugue.api as fa with fa.engine_context("duckdb"): fa.filter(df, (col("a")>1) & (col("b")=="x")) fa.filter(df, f.coalesce(col("a"),col("b"))>1)
- fugue.execution.api.full_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Full outer join two dataframes. This is a wrapper of join() with how="full_outer"
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the joined dataframe
- Return type:
AnyDataFrame
- fugue.execution.api.get_context_engine()[source]
Get the execution engine in the current context. Regarding the order of the logic please read
make_execution_engine()
- Return type:
ExecutionEngine
- fugue.execution.api.get_current_conf()[source]
Get the current configs either in the defined engine context or by the global configs (see register_global_conf())
- Return type:
ParamDict
- fugue.execution.api.get_current_parallelism()[source]
Get the current parallelism of the current global/context engine. If there is no global/context engine, it creates a temporary engine using make_execution_engine() to get its parallelism.
- Returns:
the size of the parallelism
- Return type:
int
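A minimal usage sketch, not part of the original docstring; spark_session is a placeholder for a distributed backend:

import fugue.api as fa

fa.get_current_parallelism()  # 1 on the default NativeExecutionEngine

# within an engine context, the engine's own parallelism is reported instead
with fa.engine_context(spark_session):
    fa.get_current_parallelism()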
- fugue.execution.api.inner_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Inner join two dataframes. This is a wrapper of join() with how="inner"
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the joined dataframe
- Return type:
AnyDataFrame
- fugue.execution.api.intersect(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Intersect df1 and df2
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to intersect with
distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the intersected dataframe
- Return type:
AnyDataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
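A minimal usage sketch, not part of the original docstring; it assumes pandas DataFrames with identical schemas:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2, 2, 3]})
df2 = pd.DataFrame({"a": [2, 2, 3, 4]})

fa.intersect(df1, df2, as_local=True)                  # INTERSECT DISTINCT -> 2, 3
fa.intersect(df1, df2, distinct=False, as_local=True)  # INTERSECT ALL, where the engine supports it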
- fugue.execution.api.join(df1, df2, *dfs, how, on=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Join two dataframes
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (List[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the joined dataframe
- Return type:
AnyDataFrame
Note
Please read
get_join_schemas()
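A minimal usage sketch, not part of the original docstring; it assumes pandas DataFrames sharing the key column a:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
df2 = pd.DataFrame({"a": [2, 3], "c": [20, 30]})

# join keys are inferred from the common columns (here: a); on= only validates them
fa.join(df1, df2, how="inner", on=["a"], as_local=True)   # one row: a=2
fa.join(df1, df2, how="left_outer", as_local=True)        # two rows, c is NULL for a=1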
- fugue.execution.api.left_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Left outer join two dataframes. This is a wrapper of join() with how="left_outer"
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the joined dataframe
- Return type:
AnyDataFrame
- fugue.execution.api.load(path, format_hint=None, columns=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]
Load dataframe from persistent storage
- Parameters:
path (str | List[str]) – the path to the dataframe
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Any | None) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
an engine compatible dataframe
- Return type:
AnyDataFrame
For more details and examples, read Zip & Comap.
- fugue.execution.api.persist(df, lazy=False, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]
Force materializing and caching the dataframe
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Default to False
kwargs (Any) – parameter to pass to the underlying persist implementation
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the persisted dataframe
- Return type:
AnyDataFrame
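A hedged sketch of a typical persist pattern; df1 and df2 are placeholders for any Fugue-recognizable dataframes:

import fugue.api as fa

with fa.engine_context("duckdb"):
    mid = fa.inner_join(df1, df2)
    mid = fa.persist(mid)            # eager by default: materialize and cache now
    fa.save(mid, "/tmp/a.parquet")   # placeholder output paths
    fa.save(mid, "/tmp/b.parquet")   # the cached result is reused instead of recomputed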
- fugue.execution.api.repartition(df, partition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Partition the input dataframe using partition.
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
partition (PartitionSpec) – how you want to partition the dataframe
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the repartitioned dataframe
- Return type:
AnyDataFrame
Caution
This function is experimental, and may be removed in the future.
- fugue.execution.api.right_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Right outer join two dataframes. This is a wrapper of join() with how="right_outer"
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the joined dataframe
- Return type:
AnyDataFrame
- fugue.execution.api.run_engine_function(func, engine=None, engine_conf=None, as_fugue=False, as_local=False, infer_by=None)[source]
Run a lambda function based on the engine provided
- Parameters:
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
infer_by (List[Any] | None) – a list of objects to infer the engine, defaults to None
func (Callable[[ExecutionEngine], Any])
- Returns:
None or a Fugue DataFrame if as_fugue is True; otherwise, if infer_by contains any Fugue DataFrame, the result is a Fugue DataFrame; otherwise it returns the underlying dataframe using native_as_df()
- Return type:
Any
Note
This function is for development use. Users should not need it.
- fugue.execution.api.sample(df, n=None, frac=None, replace=False, seed=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Sample dataframe by number of rows or by fraction
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (int | None) – number of rows to sample, one and only one of n and frac must be set
frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (int | None) – seed for randomness, defaults to None
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the sampled dataframe
- Return type:
AnyDataFrame
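A minimal usage sketch, not part of the original docstring; it assumes a small pandas DataFrame as input:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": range(100)})

fa.sample(df, n=5, seed=0, as_local=True)                     # 5 rows (approximate on some distributed engines)
fa.sample(df, frac=0.1, replace=True, seed=0, as_local=True)  # ~10% of rows, with replacement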
- fugue.execution.api.save(df, path, format_hint=None, mode='overwrite', partition=None, force_single=False, engine=None, engine_conf=None, **kwargs)[source]
Save dataframe to a persistent storage
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
path (str) – output path
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to “overwrite”
partition (Any | None) – how to partition the dataframe before saving, defaults to None
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
- Return type:
None
For more details and examples, read Load & Save.
- fugue.execution.api.select(df, *columns, where=None, having=None, distinct=False, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
The functional interface for SQL select statement
- Parameters:
df (AnyDataFrame) – the dataframe to be operated on
columns (str | ColumnExpr) – column expressions, for strings they will represent the column names
where (ColumnExpr | None) – WHERE condition expression, defaults to None
having (ColumnExpr | None) – HAVING condition expression, defaults to None. It is used when columns contains aggregation columns
distinct (bool) – whether to return distinct result, defaults to False
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the select result as a dataframe
- Return type:
AnyDataFrame
Attention
This interface is experimental, it is subject to change in new versions.
See also
Please find more expression examples in
fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, lit, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # select existing and new columns
    fa.select(df, col("a"), col("b"), lit(1, "another"))
    fa.select(df, col("a"), (col("b")+lit(1)).alias("x"))

    # aggregation
    # SELECT COUNT(DISTINCT *) AS x FROM df
    fa.select(df, f.count_distinct(all_cols()).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df GROUP BY a
    fa.select(df, col("a"), f.max(col("b")+lit(1)).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df
    # WHERE b<2 AND a>1
    # GROUP BY a
    # HAVING MAX(b+1)>0
    fa.select(
        df,
        col("a"), f.max(col("b")+lit(1)).alias("x"),
        where=(col("b")<2) & (col("a")>1),
        having=f.max(col("b")+lit(1))>0
    )
- fugue.execution.api.semi_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Left semi-join two dataframes. This is a wrapper of join() with how="semi"
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the joined dataframe
- Return type:
AnyDataFrame
- fugue.execution.api.set_global_engine(engine, engine_conf=None)[source]
Make an execution engine and set it as the global execution engine
- Parameters:
engine (AnyExecutionEngine) – an engine like object, must not be None
engine_conf (Any | None) – the configs for the engine, defaults to None
- Return type:
Caution
In general, it is not a good practice to set a global engine. You should consider engine_context() instead. The exception is when you iterate in a notebook across cells, where this can simplify the code.
Note
For more details, please read make_execution_engine() and set_global()
Examples
import fugue.api as fa

fa.set_global_engine(spark_session)
transform(df, func)        # will use spark in this transformation
fa.clear_global_engine()   # remove the global setting
- fugue.execution.api.subtract(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
df1 - df2
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to subtract
distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the subtracted dataframe
- Return type:
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
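A minimal usage sketch, not part of the original docstring; it assumes pandas DataFrames with identical schemas:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 1, 2, 3]})
df2 = pd.DataFrame({"a": [2, 3]})

fa.subtract(df1, df2, as_local=True)                  # EXCEPT DISTINCT -> 1
fa.subtract(df1, df2, distinct=False, as_local=True)  # EXCEPT ALL, where the engine supports it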
- fugue.execution.api.take(df, n, presort, na_position='last', partition=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters:
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept first or last
partition (Any | None) – PartitionSpec to apply the take operation, defaults to None
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
n rows of DataFrame per partition
- Return type:
AnyDataFrame
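A minimal usage sketch, not part of the original docstring; it assumes a small pandas DataFrame as input:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 1, 2], "b": [3, None, 5]})

# top row overall, ordered by b descending with NULLs last
fa.take(df, 1, presort="b desc", na_position="last", as_local=True)

# top row per partition of a
fa.take(df, 1, presort="b desc", partition={"by": ["a"]}, as_local=True)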
- fugue.execution.api.union(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]
Union two dataframes
- Parameters:
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to union
distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL
engine (AnyExecutionEngine | None) – an engine like object, defaults to None
engine_conf (Any | None) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns:
the unioned dataframe
- Return type:
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
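A minimal usage sketch, not part of the original docstring; it assumes pandas DataFrames with identical schemas:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"a": [2, 3]})

fa.union(df1, df2, as_local=True)                  # UNION (distinct) -> 1, 2, 3
fa.union(df1, df2, distinct=False, as_local=True)  # UNION ALL -> 1, 2, 2, 3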
fugue.execution.execution_engine
- class fugue.execution.execution_engine.EngineFacet(execution_engine)[source]
Bases:
FugueEngineBase
The base class for different facets of the execution engines.
- Parameters:
execution_engine (ExecutionEngine) – the execution engine this facet will run on
- property conf: ParamDict
All configurations of this engine instance.
Note
It can contain more than you provided, for example in SparkExecutionEngine, the Spark session can bring in more config, all of which are accessible through this property.
- property execution_engine: ExecutionEngine
the execution engine this sql engine will run on
- property execution_engine_constraint: Type[ExecutionEngine]
This defines the required ExecutionEngine type of this facet
- Returns:
a subtype of
ExecutionEngine
- property log: Logger
Logger of this engine instance
- to_df(df, schema=None)[source]
Convert a data structure to this engine compatible DataFrame
- Parameters:
df (AnyDataFrame) – DataFrame, pandas DataFrame, or a list or iterable of arrays, or others supported by the specific engine implementation
schema (Any | None) – Schema like object, defaults to None
- Returns:
engine compatible dataframe
- Return type:
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.execution_engine.ExecutionEngine(conf)[source]
Bases:
FugueEngineBase
The abstract base class for execution engines. It is the layer that unifies core concepts of distributed computing, and separates the underlying computing frameworks from user’s higher level logic.
Please read the ExecutionEngine Tutorial to understand this most important Fugue concept
- Parameters:
conf (Any) – dict-like config, read this to learn Fugue specific options
- aggregate(df, partition_spec, agg_cols)[source]
Aggregate on dataframe
- Parameters:
df (DataFrame) – the dataframe to aggregate on
partition_spec (PartitionSpec | None) – PartitionSpec to specify partition keys
agg_cols (List[ColumnExpr]) – aggregation expressions
- Returns:
the aggregated result as a dataframe
New Since
0.6.0
See also
Please find more expression examples in
fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
engine.aggregate(
    df,
    partition_spec=None,
    agg_cols=[f.max(col("b"))])

# SELECT a, MAX(b) AS x FROM df GROUP BY a
engine.aggregate(
    df,
    partition_spec=PartitionSpec(by=["a"]),
    agg_cols=[f.max(col("b")).alias("x")])
- as_context()[source]
Set this execution engine as the context engine. This function is thread safe and async safe.
Examples
with engine.as_context():
    transform(df, func)  # will use engine in this transformation
- Return type:
Iterator[ExecutionEngine]
- assign(df, columns)[source]
Update existing columns with new values and add new columns
- Parameters:
df (DataFrame) – the dataframe to set columns
columns (List[ColumnExpr]) – column expressions
- Returns:
the updated dataframe
- Return type:
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in
fugue.column.sql and fugue.column.functions
Examples
# assume df has schema: a:int,b:str

# add constant column x
engine.assign(df, lit(1,"x"))

# change column b to be a constant integer
engine.assign(df, lit(1,"b"))

# add new x to be a+b
engine.assign(df, (col("a")+col("b")).alias("x"))

# cast column a data type to double
engine.assign(df, col("a").cast(float))
- abstract broadcast(df)[source]
Broadcast the dataframe to all workers for a distributed computing framework
- comap(df, map_func, output_schema, partition_spec, on_init=None)[source]
Apply a function to each zipped partition on the zipped dataframe.
- Parameters:
df (DataFrame) – input dataframe, it must be a zipped dataframe (it has to be a dataframe output from zip() or zip_all())
map_func (Callable[[PartitionCursor, DataFrames], LocalDataFrame]) – the function to apply on every zipped partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification for processing the zipped dataframe.
on_init (Callable[[int, DataFrames], Any] | None) – callback function when the physical partition is initializing, defaults to None
- Returns:
the dataframe after the comap operation
Note
The input of this method must be an output of zip() or zip_all()
The partition_spec here is NOT related to how you zipped the dataframe; however you set it, it only affects processing speed, because the partition keys will be overridden by the zipped dataframe's partition keys. You may use it to improve efficiency: PartitionSpec(algo="even", num="ROWCOUNT") tells the execution engine to put each zipped partition into its own physical partition, achieving the best possible load balance.
If the input dataframe has keys, the dataframes you get in map_func and on_init will have keys, otherwise you will get list-like dataframes
The on_init function will get a DataFrames object that has the same structure, but with all empty dataframes; you can use the schemas but not the data.
See also
For more details and examples, read Zip & Comap
- property conf: ParamDict
All configurations of this engine instance.
Note
It can contain more than you provided, for example in SparkExecutionEngine, the Spark session can bring in more config, all of which are accessible through this property.
- convert_yield_dataframe(df, as_local)[source]
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters:
df (DataFrame) – DataFrame
as_local (bool) – whether yield a local dataframe
- Returns:
another DataFrame that can be used after this execution engine stops
- Return type:
Note
By default, the output dataframe is the input dataframe. But it should be overridden when the engine stops and the input dataframe becomes invalid.
For example, if you customize a Spark engine where you start and stop the Spark session in this engine’s start_engine() and stop_engine(), then the Spark dataframe will become invalid. So you may consider converting it to a local dataframe so it can still exist after the engine stops.
- abstract create_default_map_engine()[source]
Default MapEngine if user doesn’t specify
- Return type:
- abstract create_default_sql_engine()[source]
Default SQLEngine if user doesn’t specify
- Return type:
- abstract dropna(df, how='any', thresh=None, subset=None)[source]
Drop NA records from dataframe
- Parameters:
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (int | None) – int, drops rows that have less than thresh non-null values
subset (List[str] | None) – list of columns to operate on
- Returns:
DataFrame with NA records dropped
- Return type:
- abstract fillna(df, value, subset=None)[source]
Fill NULL, NAN, NAT values in a dataframe
- Parameters:
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (List[str] | None) – list of columns to operate on. ignored if value is a dictionary
- Returns:
DataFrame with NA records filled
- Return type:
- filter(df, condition)[source]
Filter rows by the given condition
- Parameters:
df (DataFrame) – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
- Returns:
the filtered dataframe
- Return type:
New Since
0.6.0
See also
Please find more expression examples in
fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

engine.filter(df, (col("a")>1) & (col("b")=="x"))
engine.filter(df, f.coalesce(col("a"),col("b"))>1)
- abstract get_current_parallelism()[source]
Get the current number of parallelism of this engine
- Return type:
int
- property in_context: bool
Whether this engine is being used as a context engine
- abstract intersect(df1, df2, distinct=True)[source]
Intersect df1 and df2
- Parameters:
- Returns:
the intersected dataframe
- Return type:
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- property is_global: bool
Whether this engine is being used as THE global engine
- abstract join(df1, df2, how, on=None)[source]
Join two dataframes
- Parameters:
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (List[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
- Returns:
the joined dataframe
- Return type:
Note
Please read
get_join_schemas()
- abstract load_df(path, format_hint=None, columns=None, **kwargs)[source]
Load dataframe from persistent storage
- Parameters:
path (str | List[str]) – the path to the dataframe
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Any | None) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns:
an engine compatible dataframe
- Return type:
For more details and examples, read Zip & Comap.
- property map_engine: MapEngine
The MapEngine currently used by this execution engine. You should use set_map_engine() to set a new MapEngine instance. If not set, the default is create_default_map_engine()
- on_enter_context()[source]
The event hook when calling set_global_engine() or engine_context(), defaults to no operation
- Return type:
None
- on_exit_context()[source]
The event hook when calling clear_global_engine() or exiting from engine_context(), defaults to no operation
- Return type:
None
- abstract persist(df, lazy=False, **kwargs)[source]
Force materializing and caching the dataframe
- Parameters:
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Default to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns:
the persisted dataframe
- Return type:
Note
persist can only guarantee the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
- abstract repartition(df, partition_spec)[source]
Partition the input dataframe using partition_spec.
- Parameters:
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns:
repartitioned dataframe
- Return type:
Note
Before implementing please read the Partition Tutorial
- abstract sample(df, n=None, frac=None, replace=False, seed=None)[source]
Sample dataframe by number of rows or by fraction
- Parameters:
df (DataFrame) – DataFrame
n (int | None) – number of rows to sample, one and only one of n and frac must be set
frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (int | None) – seed for randomness, defaults to None
- Returns:
sampled dataframe
- Return type:
- abstract save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]
Save dataframe to a persistent storage
- Parameters:
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to “overwrite”
partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type:
None
For more details and examples, read Load & Save.
- select(df, cols, where=None, having=None)[source]
The functional interface for SQL select statement
- Parameters:
df (DataFrame) – the dataframe to be operated on
cols (SelectColumns) – column expressions
where (ColumnExpr | None) – WHERE condition expression, defaults to None
having (ColumnExpr | None) – HAVING condition expression, defaults to None. It is used when cols contains aggregation columns
- Returns:
the select result as a dataframe
- Return type:
New Since
0.6.0
Attention
This interface is experimental, it is subject to change in new versions.
See also
Please find more expression examples in
fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

# select existing and new columns
engine.select(df, SelectColumns(col("a"), col("b"), lit(1, "another")))
engine.select(df, SelectColumns(col("a"), (col("b")+lit(1)).alias("x")))

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
engine.select(
    df, SelectColumns(f.count_distinct(all_cols()).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
engine.select(
    df, SelectColumns(col("a"), f.max(col("b")+lit(1)).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df
# WHERE b<2 AND a>1
# GROUP BY a
# HAVING MAX(b+1)>0
engine.select(
    df,
    SelectColumns(col("a"), f.max(col("b")+lit(1)).alias("x")),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)
- set_global()[source]
Set this execution engine to be the global execution engine.
Note
The global engine is also considered as a context engine, so in_context() will also become true for the global engine.
Examples
engine1.set_global()
transform(df, func)  # will use engine1 in this transformation

with engine2.as_context():
    transform(df, func)  # will use engine2

transform(df, func)  # will use engine1
- Return type:
- set_sql_engine(engine)[source]
Set the SQLEngine for this execution engine. If not set, the default is create_default_sql_engine()
- property sql_engine: SQLEngine
The SQLEngine currently used by this execution engine. You should use set_sql_engine() to set a new SQLEngine instance. If not set, the default is create_default_sql_engine()
- stop()[source]
Stop this execution engine. Do not override this method; customize stop_engine() if necessary. This function ensures stop_engine() is called only once.
Note
Once the engine is stopped it should not be used again
- Return type:
None
- stop_engine()[source]
Custom logic to stop the execution engine, defaults to no operation
- Return type:
None
- abstract subtract(df1, df2, distinct=True)[source]
df1 - df2
- Parameters:
- Returns:
the subtracted dataframe
- Return type:
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- abstract take(df, n, presort, na_position='last', partition_spec=None)[source]
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters:
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept first or last
partition_spec (PartitionSpec | None) – PartitionSpec to apply the take operation
- Returns:
n rows of DataFrame per partition
- Return type:
- abstract union(df1, df2, distinct=True)[source]
Union two dataframes
- Parameters:
- Returns:
the unioned dataframe
- Return type:
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- zip(dfs, how='inner', partition_spec=None, temp_path=None, to_file_threshold=-1)[source]
Zip multiple dataframes together with given partition specifications.
- Parameters:
dfs (DataFrames) – DataFrames like object
how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner
partition_spec (PartitionSpec | None) – Partition like object, defaults to empty.
temp_path (str | None) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None
to_file_threshold (Any) – file byte size threshold, defaults to -1
- Returns:
a zipped dataframe, the metadata of the dataframe will indicate it is zipped
- Return type:
Note
Different from join, dataframes can have common columns that you will not use as partition keys.
If by is not specified, it will also use the common columns of all the dataframes (if it’s not a cross zip)
For non-cross zip, the dataframes must have common columns, or an error will be thrown
Note
Please also read zip()
If dfs is dict like, the zipped dataframe will be dict like; if dfs is list like, the zipped dataframe will be list like
It’s fine to contain only one dataframe in dfs
See also
For more details and examples, read Zip & Comap
- class fugue.execution.execution_engine.ExecutionEngineParam(param)[source]
Bases:
AnnotatedParam
- Parameters:
param (Parameter | None)
- class fugue.execution.execution_engine.FugueEngineBase[source]
Bases:
ABC
- abstract property conf: ParamDict
All configurations of this engine instance.
Note
It can contain more than you provided, for example in SparkExecutionEngine, the Spark session can bring in more config, all of which are accessible through this property.
- abstract property is_distributed: bool
Whether this engine is a distributed engine
- abstract property log: Logger
Logger of this engine instance
- abstract to_df(df, schema=None)[source]
Convert a data structure to this engine compatible DataFrame
- Parameters:
df (AnyDataFrame) – DataFrame, pandas DataFrame, or a list or iterable of arrays, or others supported by the specific engine implementation
schema (Any | None) – Schema like object, defaults to None
- Returns:
engine compatible dataframe
- Return type:
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.execution_engine.MapEngine(execution_engine)[source]
Bases:
EngineFacet
The abstract base class for different map operation implementations.
- Parameters:
execution_engine (ExecutionEngine) – the execution engine this map engine will run on
- map_bag(bag, map_func, partition_spec, on_init=None)[source]
Apply a function to each partition after you partition the bag in a specified way.
- Parameters:
bag (Bag) – input bag
map_func (Callable[[BagPartitionCursor, LocalBag], LocalBag]) – the function to apply on every logical partition
partition_spec (PartitionSpec) – partition specification
on_init (Callable[[int, Bag], Any] | None) – callback function when the physical partition is initializing, defaults to None
- Returns:
the bag after the map operation
- Return type:
- abstract map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters:
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Callable[[int, DataFrame], Any] | None) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (str | None) – the preferred data format for map_func, it can be pandas, pyarrow, etc, defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns:
the dataframe after the map operation
- Return type:
Note
Before implementing, you must read this to understand what map is used for and how it should work.
- class fugue.execution.execution_engine.SQLEngine(execution_engine)[source]
Bases:
EngineFacet
The abstract base class for different SQL execution implementations. Please read this to understand the concept
- Parameters:
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property dialect: str | None
- encode(dfs, statement)[source]
- Parameters:
dfs (DataFrames)
statement (StructuredRawSQL)
- Return type:
Tuple[DataFrames, str]
- load_table(table, **kwargs)[source]
Load table as a dataframe
- Parameters:
table (str) – the table name
kwargs (Any)
- Returns:
an engine compatible dataframe
- Return type:
- save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]
Save the dataframe to a table
- Parameters:
df (DataFrame) – the dataframe to save
table (str) – the table name
mode (str) – can accept overwrite, error, defaults to “overwrite”
partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Return type:
None
- abstract select(dfs, statement)[source]
Execute select statement on the sql engine.
- Parameters:
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns:
result of the SELECT statement
- Return type:
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True, "a"),
     (False, " UNION SELECT * FROM "),
     (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from Hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
fugue.execution.factory
- fugue.execution.factory.is_pandas_or(objs, obj_type)[source]
Check whether the input contains at least one obj_type object and the rest are Pandas DataFrames. This function is a utility function for extending infer_execution_engine()
- Parameters:
objs (List[Any]) – the list of objects to check
obj_type (Any)
- Returns:
whether all objs are of type obj_type or pandas DataFrame, and at least one is of type obj_type
- Return type:
bool
- fugue.execution.factory.make_execution_engine(engine=None, conf=None, infer_by=None, **kwargs)[source]
Create ExecutionEngine with the specified engine
- Parameters:
engine (Any | None) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, or an ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the sql engine (you can use None for either of them to use the default one), defaults to None
conf (Any | None) – Parameters like object, defaults to None
infer_by (List[Any] | None) – List of objects that can be used to infer the execution engine using infer_execution_engine()
kwargs (Any) – additional parameters to initialize the execution engine
- Returns:
the ExecutionEngine instance
- Return type:
Note
This function finds/constructs the engine in the following order:
If engine is None, it first tries to see if there is any defined context engine to use (=> engine)
If engine is still empty, then it will try to get the global execution engine. See set_global()
If engine is still empty, then if infer_by is given, it will try to infer the execution engine (=> engine)
If engine is still empty, then it will construct the default engine defined by register_default_execution_engine() (=> engine)
Now, engine must not be empty; if it is an object other than ExecutionEngine, we will use parse_execution_engine() to construct it (=> engine)
Now, engine must be an ExecutionEngine object. We update its SQL engine if specified, then update its config using conf and kwargs
Examples
register_default_execution_engine(lambda conf: E1(conf))
register_execution_engine("e2", lambda conf, **kwargs: E2(conf, **kwargs))
register_sql_engine("s", lambda conf: S2(conf))

# E1 + E1.create_default_sql_engine()
make_execution_engine()

# E2 + E2.create_default_sql_engine()
make_execution_engine("e2")

# E1 + S2
make_execution_engine((None, "s"))

# E2(conf, a=1, b=2) + S2
make_execution_engine(("e2", "s"), conf, a=1, b=2)

# SparkExecutionEngine + SparkSQLEngine
make_execution_engine(SparkExecutionEngine)
make_execution_engine(SparkExecutionEngine(spark_session, conf))

# SparkExecutionEngine + S2
make_execution_engine((SparkExecutionEngine, "s"))

# assume object e2_df can infer E2 engine
make_execution_engine(infer_by=[e2_df])  # an E2 engine

# global
e_global = E1(conf)
e_global.set_global()
make_execution_engine()  # e_global

# context
with E2(conf).as_context() as ec:
    make_execution_engine()  # ec
make_execution_engine()  # e_global
- fugue.execution.factory.make_sql_engine(engine=None, execution_engine=None, **kwargs)[source]
Create SQLEngine with the specified engine
- Parameters:
engine (Any | None) – it can be empty string or null (use the default SQL engine), a string (use the registered SQL engine), an SQLEngine type, or an SQLEngine instance (you can use None to use the default one), defaults to None
execution_engine (ExecutionEngine | None) – the ExecutionEngine instance to create the SQLEngine with. Normally you should always provide this value.
kwargs (Any) – additional parameters to initialize the sql engine
- Returns:
the SQLEngine instance
- Return type:
Note
For users, you normally don’t need to call this function directly. Use make_execution_engine instead.
Examples
register_default_sql_engine(lambda conf: S1(conf))
register_sql_engine("s2", lambda conf: S2(conf))

engine = NativeExecutionEngine()

# S1(engine)
make_sql_engine(None, engine)

# S1(engine, a=1)
make_sql_engine(None, engine, a=1)

# S2(engine)
make_sql_engine("s2", engine)
- fugue.execution.factory.register_default_execution_engine(func, on_dup='overwrite')[source]
Register ExecutionEngine as the default engine.
- Parameters:
func (Callable) – a callable taking a Parameters like object and **kwargs and returning an ExecutionEngine instance
on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting), defaults to “overwrite”.
- Return type:
None
Examples
# register a new default engine (overwrites if one existed)
register_default_execution_engine(lambda conf: MyExecutionEngine(conf))

# the following examples will use MyExecutionEngine

# 0
make_execution_engine()
make_execution_engine(None, {"myconfig": "value"})

# 1
dag = FugueWorkflow()
dag.create([[0]], "a:int").show()
dag.run(None, {"myconfig": "value"})

# 2
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("", {"myconfig": "value"})
- fugue.execution.factory.register_default_sql_engine(func, on_dup='overwrite')[source]
Register SQLEngine as the default engine
- Parameters:
func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance
on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting) or “throw” (throw exception), defaults to “overwrite”.
- Raises:
KeyError – if on_dup is throw and the name already exists
- Return type:
None
Note
You should be careful when using this function, because when you set a custom SQL engine as the default, all execution engines you create will use this SQL engine unless you are explicit. For example, if you set the default SQL engine to be a Spark specific one, then if you start a NativeExecutionEngine, it will try to use that Spark SQL engine and will throw exceptions.
So it’s always a better idea to use register_sql_engine instead.
Examples
# register a new default SQL engine (overwrites if one existed)
register_default_sql_engine(lambda engine: MySQLEngine(engine))

# create NativeExecutionEngine with MySQLEngine as the default
make_execution_engine()

# create SparkExecutionEngine with MySQLEngine instead of SparkSQLEngine
make_execution_engine("spark")

# NativeExecutionEngine with MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]], "a:int").show()
dag.run()
- fugue.execution.factory.register_execution_engine(name_or_type, func, on_dup='overwrite')[source]
Register ExecutionEngine with a given name.
- Parameters:
name_or_type (str | Type) – alias of the execution engine, or type of an object that can be converted to an execution engine
func (Callable) – a callable taking a Parameters like object and **kwargs and returning an ExecutionEngine instance
on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting), defaults to “overwrite”.
- Return type:
None
Examples
Alias registration examples:
# create a new engine with name my (overwrites if it existed)
register_execution_engine("my", lambda conf: MyExecutionEngine(conf))

# 0
make_execution_engine("my")
make_execution_engine("my", {"myconfig": "value"})

# 1
dag = FugueWorkflow()
dag.create([[0]], "a:int").show()
dag.run("my", {"myconfig": "value"})

# 2
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("my")
Type registration examples:
from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine
from fugue import fsql

register_execution_engine(
    SparkSession,
    lambda session, conf: SparkExecutionEngine(session, conf))

spark_session = SparkSession.builder.getOrCreate()

fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run(spark_session)
- fugue.execution.factory.register_sql_engine(name, func, on_dup='overwrite')[source]
Register SQLEngine with a given name.
- Parameters:
name (str) – name of the SQL engine
func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance
on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting), defaults to “overwrite”.
- Return type:
None
Examples
# create a new SQL engine with name mysql (overwrites if it existed)
register_sql_engine("mysql", lambda engine: MySQLEngine(engine))

# create execution engine with MySQLEngine as the default
make_execution_engine(("", "mysql"))

# create DaskExecutionEngine with MySQLEngine as the default
make_execution_engine(("dask", "mysql"))

# default execution engine + MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]], "a:int").show()
dag.run(("", "mysql"))
- fugue.execution.factory.try_get_context_execution_engine()[source]
If the global execution engine is set (see set_global()) or the context is set (see as_context()), then return the engine, else return None
- Return type:
ExecutionEngine | None
fugue.execution.native_execution_engine
- class fugue.execution.native_execution_engine.NativeExecutionEngine(conf=None)[source]
Bases:
ExecutionEngine
The execution engine based on native python and pandas. This execution engine is mainly for prototyping and unit tests.
Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters:
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
- dropna(df, how='any', thresh=None, subset=None)[source]
Drop NA records from dataframe
- Parameters:
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (int | None) – int, drops rows that have fewer than thresh non-null values
subset (List[str] | None) – list of columns to operate on
- Returns:
DataFrame with NA records dropped
- Return type:
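A short illustrative sketch (the data is made up for this example):

import pandas as pd
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame(dict(a=[1.0, None, 3.0], b=["x", None, "z"])))

# drop rows containing any null value
engine.dropna(df, how="any").show()

# drop rows only when column b is null
engine.dropna(df, how="any", subset=["b"]).show()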
- fillna(df, value, subset=None)[source]
Fill NULL, NAN, NAT values in a dataframe
- Parameters:
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with the same value; if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (List[str] | None) – list of columns to operate on. Ignored if value is a dictionary
- Returns:
DataFrame with NA records filled
- Return type:
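A hedged sketch, continuing with the same kind of dataframe as the dropna example above:

# fill nulls in column a with a scalar
engine.fillna(df, value=0, subset=["a"]).show()

# fill per column using a dictionary (subset is ignored when value is a dict)
engine.fillna(df, value={"a": 0, "b": "missing"}).show()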
- get_current_parallelism()[source]
Get the current degree of parallelism of this engine
- Return type:
int
- intersect(df1, df2, distinct=True)[source]
Intersect df1 and df2
- Parameters:
- Returns:
the intersected dataframe
- Return type:
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
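An illustrative sketch with made-up data; note the two inputs share an identical schema, as required:

import pandas as pd
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
df1 = engine.to_df(pd.DataFrame(dict(a=[0, 1, 1])))
df2 = engine.to_df(pd.DataFrame(dict(a=[1, 2])))

# INTERSECT DISTINCT: the common rows, deduplicated
engine.intersect(df1, df2, distinct=True).show()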
- property is_distributed: bool
Whether this engine is a distributed engine
- join(df1, df2, how, on=None)[source]
Join two dataframes
- Parameters:
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (List[str] | None) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.
- Returns:
the joined dataframe
- Return type:
Note
Please read
get_join_schemas()
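A hedged sketch of the inferred-key behavior (the data is made up):

import pandas as pd
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
df1 = engine.to_df(pd.DataFrame(dict(a=[1, 2], b=["x", "y"])))
df2 = engine.to_df(pd.DataFrame(dict(a=[2, 3], c=["u", "v"])))

# the join key "a" is inferred from the common column
engine.join(df1, df2, how="inner").show()

# providing "on" explicitly is only validated against the inferred keys
engine.join(df1, df2, how="left_outer", on=["a"]).show()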
- load_df(path, format_hint=None, columns=None, **kwargs)[source]
Load dataframe from persistent storage
- Parameters:
path (str | List[str]) – the path to the dataframe
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Any | None) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns:
an engine compatible dataframe
- Return type:
For more details and examples, read Load & Save.
- property log: Logger
Logger of this engine instance
- persist(df, lazy=False, **kwargs)[source]
Force materializing and caching the dataframe
- Parameters:
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameters to pass to the underlying persist implementation
- Returns:
the persisted dataframe
- Return type:
Note
persist can only guarantee the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly this doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
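A minimal sketch of the call pattern (for this local pandas-backed engine persisting is close to a no-op, but the API is the same across engines):

import pandas as pd
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame(dict(a=[0, 1])))

eager = engine.persist(df)                 # materialize immediately
deferred = engine.persist(df, lazy=True)   # materialize on first use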
- property pl_utils: PandasUtils
Pandas-like dataframe utils
- repartition(df, partition_spec)[source]
Partition the input dataframe using partition_spec.
- Parameters:
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns:
repartitioned dataframe
- Return type:
Note
Before implementing please read the Partition Tutorial
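A hedged sketch of the call shape only; since NativeExecutionEngine is single-machine, the repartition may effectively return the input unchanged, whereas distributed engines would shuffle:

import pandas as pd
from fugue import PartitionSpec
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame(dict(a=[1, 1, 2], b=[3, 4, 5])))

# request partitioning by column a
repartitioned = engine.repartition(df, PartitionSpec(by=["a"]))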
- sample(df, n=None, frac=None, replace=False, seed=None)[source]
Sample dataframe by number of rows or by fraction
- Parameters:
df (DataFrame) – DataFrame
n (int | None) – number of rows to sample, one and only one of n and frac must be set
frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (int | None) – seed for randomness, defaults to None
- Returns:
sampled dataframe
- Return type:
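An illustrative sketch (made-up data); exactly one of n and frac can be provided:

import pandas as pd
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame(dict(a=range(100))))

engine.sample(df, n=5, seed=0).show()             # 5 rows without replacement
engine.sample(df, frac=0.1, replace=True).show()  # ~10% of rows, duplicates allowed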
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]
Save dataframe to a persistent storage
- Parameters:
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to “overwrite”
partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type:
None
For more details and examples, read Load & Save.
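A hedged round-trip sketch combining save_df with load_df above; the output path is hypothetical:

import pandas as pd
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame(dict(a=[0, 1], b=["x", "y"])))

# the format is inferred from the file extension
engine.save_df(df, "/tmp/example_df.parquet", mode="overwrite")

# load it back, optionally selecting a subset of columns
engine.load_df("/tmp/example_df.parquet", columns=["a"]).show()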
- subtract(df1, df2, distinct=True)[source]
df1 - df2
- Parameters:
- Returns:
the subtracted dataframe (df1 - df2)
- Return type:
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- take(df, n, presort, na_position='last', partition_spec=None)[source]
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters:
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. Can accept first or last
partition_spec (PartitionSpec | None) – PartitionSpec to apply the take operation
- Returns:
n rows of DataFrame per partition
- Return type:
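An illustrative sketch (made-up data): the top row per partition of column a, ordered by b descending with nulls last:

import pandas as pd
from fugue import PartitionSpec
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame(dict(a=[1, 1, 2], b=[3, 4, 5])))

engine.take(df, n=1, presort="b desc",
            partition_spec=PartitionSpec(by=["a"])).show()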
- to_df(df, schema=None)[source]
Convert a data structure to this engine compatible DataFrame
- Parameters:
data – DataFrame, pandas DataFrame, a list or iterable of arrays, or anything else supported by a certain engine implementation
schema (Any | None) – Schema like object, defaults to None
df (AnyDataFrame)
- Returns:
engine compatible dataframe
- Return type:
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.native_execution_engine.PandasMapEngine(execution_engine)[source]
Bases:
MapEngine
- Parameters:
execution_engine (ExecutionEngine)
- property execution_engine_constraint: Type[ExecutionEngine]
This defines the required ExecutionEngine type of this facet
- Returns:
a subtype of
ExecutionEngine
- property is_distributed: bool
Whether this engine is a distributed engine
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters:
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Callable[[int, DataFrame], Any] | None) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (str | None) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns:
the dataframe after the map operation
- Return type:
Note
Before implementing, you must read this to understand what map is used for and how it should work.
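A hedged sketch of the calling convention (the helper name and data are made up): map_func receives a PartitionCursor and a LocalDataFrame per logical partition and must return a LocalDataFrame matching output_schema:

import pandas as pd
from fugue import PandasDataFrame, PartitionSpec
from fugue.execution.native_execution_engine import (
    NativeExecutionEngine, PandasMapEngine)

engine = NativeExecutionEngine()
map_engine = PandasMapEngine(engine)
df = engine.to_df(pd.DataFrame(dict(a=[1, 1, 2], b=[3, 4, 5])))

def keep_max_b(cursor, data):
    # keep the row(s) with the largest b within this logical partition
    pdf = data.as_pandas()
    return PandasDataFrame(pdf[pdf.b == pdf.b.max()], "a:long,b:long")

map_engine.map_dataframe(
    df, keep_max_b, "a:long,b:long", PartitionSpec(by=["a"])).show()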
- to_df(df, schema=None)[source]
Convert a data structure to this engine compatible DataFrame
- Parameters:
data – DataFrame, pandas DataFrame, a list or iterable of arrays, or anything else supported by a certain engine implementation
schema (Any | None) – Schema like object, defaults to None
df (AnyDataFrame)
- Returns:
engine compatible dataframe
- Return type:
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.native_execution_engine.QPDPandasEngine(execution_engine)[source]
Bases:
SQLEngine
QPD execution implementation.
- Parameters:
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property dialect: str | None
- property is_distributed: bool
Whether this engine is a distributed engine
- select(dfs, statement)[source]
Execute select statement on the sql engine.
- Parameters:
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns:
result of the SELECT statement
- Return type:
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True, "a"),
     (False, " UNION SELECT * FROM "),
     (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from Hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
- to_df(df, schema=None)[source]
Convert a data structure to this engine compatible DataFrame
- Parameters:
data – DataFrame, pandas DataFrame, a list or iterable of arrays, or anything else supported by a certain engine implementation
schema (Any | None) – Schema like object, defaults to None
df (AnyDataFrame)
- Returns:
engine compatible dataframe
- Return type:
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything