fugue.execution#
fugue.execution.api#
- fugue.execution.api.aggregate(df, partition_by=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **agg_kwcols)[source]#
Aggregate on dataframe
- Parameters
df (AnyDataFrame) – the dataframe to aggregate on
partition_by (Union[None, str, List[str]]) – partition key(s), defaults to None
agg_kwcols (ColumnExpr) – aggregation expressions
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the aggregated result as a dataframe
- Return type
AnyDataFrame
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # SELECT MAX(b) AS b FROM df
    fa.aggregate(df, b=f.max(col("b")))

    # SELECT a, MAX(b) AS x FROM df GROUP BY a
    fa.aggregate(df, "a", x=f.max(col("b")))
- fugue.execution.api.anti_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left anti-join two dataframes. This is a wrapper of join() with how="anti"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
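For illustration, a minimal sketch with pandas inputs on the default native engine (the dataframes here are hypothetical):

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df2 = pd.DataFrame({"a": [2, 3]})

# join keys are inferred from the common columns (here ["a"]);
# only the rows of df1 with no match in df2 are kept
res = fa.anti_join(df1, df2, as_local=True)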
- fugue.execution.api.assign(df, engine=None, engine_conf=None, as_fugue=False, as_local=False, **columns)[source]#
Update existing columns with new values and add new columns
- Parameters
df (AnyDataFrame) – the dataframe to set columns
columns (Any) – column expressions
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the updated dataframe
- Return type
AnyDataFrame
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

# assume df has schema: a:int,b:str

with fa.engine_context("duckdb"):
    # add constant column x
    fa.assign(df, x=1)

    # change column b to be a constant integer
    fa.assign(df, b=1)

    # add new x to be a+b
    fa.assign(df, x=col("a")+col("b"))

    # cast column a data type to double
    fa.assign(df, a=col("a").cast(float))
- fugue.execution.api.broadcast(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Broadcast the dataframe to all workers of a distributed computing backend
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (Optional[AnyExecutionEngine]) – an engine-like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the broadcasted dataframe
- Return type
AnyDataFrame
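A minimal sketch, assuming the default native engine (where broadcasting is a no-op); on a distributed engine such as Spark, this marks the dataframe for broadcasting to all workers:

import pandas as pd
import fugue.api as fa

small = pd.DataFrame({"a": [1, 2]})
bdf = fa.broadcast(small)  # typically applied to a small dataframe before a join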
- fugue.execution.api.clear_global_engine()[source]#
Remove the global execution engine (if set)
- Return type
None
- fugue.execution.api.cross_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Cross join two dataframes. This is a wrapper of join() with how="cross"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.distinct(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Equivalent to
SELECT DISTINCT * FROM df
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the result with distinct rows
- Return type
AnyDataFrame
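A minimal sketch with a pandas input on the default engine (the dataframe is hypothetical):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 1, 2], "b": ["x", "x", "y"]})
res = fa.distinct(df, as_local=True)  # the duplicated (1, "x") row is removed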
- fugue.execution.api.dropna(df, how='any', thresh=None, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Drop NA records from dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
DataFrame with NA records dropped
- Return type
AnyDataFrame
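A minimal sketch of the three dropping modes, assuming a pandas input and the default engine:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1.0, None, 3.0], "b": ["x", None, None]})
fa.dropna(df)             # drops rows containing any null
fa.dropna(df, how="all")  # drops only rows where all values are null
fa.dropna(df, thresh=2)   # drops rows with fewer than 2 non-null values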
- fugue.execution.api.engine_context(engine=None, engine_conf=None, infer_by=None)[source]#
Make an execution engine and set it as the context engine. This function is thread safe and async safe.
- Parameters
engine (AnyExecutionEngine) – an engine like object, defaults to None
engine_conf (Any) – the configs for the engine, defaults to None
infer_by (Optional[List[Any]]) – a list of objects to infer the engine, defaults to None
- Return type
Iterator[ExecutionEngine]
Note
For more details, please read
make_execution_engine()
Examples
import fugue.api as fa

with fa.engine_context(spark_session):
    transform(df, func)  # will use spark in this transformation
- fugue.execution.api.fillna(df, value, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
DataFrame with NA records filled
- Return type
AnyDataFrame
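A minimal sketch of scalar and per-column fills, assuming a pandas input and the default engine:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1.0, None], "b": ["x", None]})
fa.fillna(df, value=0, subset=["a"])           # fill nulls in column a with 0
fa.fillna(df, value={"a": 0, "b": "missing"})  # per-column values; subset is ignored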
- fugue.execution.api.filter(df, condition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Filter rows by the given condition
- Parameters
df (AnyDataFrame) – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the filtered dataframe
- Return type
AnyDataFrame
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    fa.filter(df, (col("a")>1) & (col("b")=="x"))
    fa.filter(df, f.coalesce(col("a"),col("b"))>1)
- fugue.execution.api.full_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Full outer join two dataframes. This is a wrapper of join() with how="full_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.get_context_engine()[source]#
Get the execution engine in the current context. Regarding the order of the logic, please read make_execution_engine()
- Return type
ExecutionEngine
- fugue.execution.api.get_current_conf()[source]#
Get the current configs, either from the defined engine context or from the global configs (see register_global_conf())
- Return type
ParamDict
- fugue.execution.api.get_current_parallelism()[source]#
Get the current parallelism of the current global/context engine. If there is no global/context engine, it creates a temporary engine using make_execution_engine() to get its parallelism
- Returns
the size of the parallelism
- Return type
int
- fugue.execution.api.inner_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Inner join two dataframes. This is a wrapper of join() with how="inner"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.intersect(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Intersect df1 and df2
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to intersect with
distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the intersected dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- fugue.execution.api.join(df1, df2, *dfs, how, on=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Join two dataframes
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – the join keys; they can always be inferred, but if you provide them, they will be validated against the inferred keys.
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
Note
Please read
get_join_schemas()
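A minimal sketch of inferred and explicit join keys, assuming pandas inputs and the default engine:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
df2 = pd.DataFrame({"a": [2, 3], "c": [10, 20]})
fa.join(df1, df2, how="inner")                 # join keys inferred: ["a"]
fa.join(df1, df2, how="left_outer", on=["a"])  # explicit keys are validated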
- fugue.execution.api.left_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left outer join two dataframes. This is a wrapper of
join()
withhow="left_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.load(path, format_hint=None, columns=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
an engine compatible dataframe
- Return type
AnyDataFrame
For more details and examples, read Load & Save.
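A minimal sketch, assuming the default engine; the paths are hypothetical and extra kwargs go to the underlying reader:

import fugue.api as fa

df = fa.load("/tmp/data.parquet", columns=["a", "b"], as_local=True)  # format inferred
df2 = fa.load("/tmp/data.csv", header=True)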
- fugue.execution.api.persist(df, lazy=False, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameters to pass to the underlying persist implementation
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the persisted dataframe
- Return type
AnyDataFrame
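A minimal sketch of eager and lazy persisting, assuming a pandas input (on the native engine this is effectively a no-op; on Spark or Dask it materializes and caches):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 2, 3]})
cached = fa.persist(df)                  # eager: materialize and cache immediately
lazy_cached = fa.persist(df, lazy=True)  # cache on the first downstream usage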
- fugue.execution.api.repartition(df, partition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Partition the input dataframe using partition.
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
partition (PartitionSpec) – how you want to partition the dataframe
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the repartitioned dataframe
- Return type
AnyDataFrame
Caution
This function is experimental, and may be removed in the future.
- fugue.execution.api.right_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Right outer join two dataframes. This is a wrapper of join() with how="right_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.run_engine_function(func, engine=None, engine_conf=None, as_fugue=False, as_local=False, infer_by=None)[source]#
Run a lambda function based on the engine provided
- Parameters
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
infer_by (Optional[List[Any]]) – a list of objects to infer the engine, defaults to None
func (Callable[[ExecutionEngine], Any]) – the function to run, taking the resolved execution engine as its argument
- Returns
None or a Fugue DataFrame if as_fugue is True; otherwise, if infer_by contains any Fugue DataFrame, a Fugue DataFrame is returned; otherwise the underlying dataframe is returned using native_as_df()
- Return type
Any
Note
This function is for development use. Users should not need it.
- fugue.execution.api.sample(df, n=None, frac=None, replace=False, seed=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the sampled dataframe
- Return type
AnyDataFrame
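A minimal sketch of the two mutually exclusive sampling modes, assuming a pandas input and the default engine:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": range(100)})
fa.sample(df, n=10, seed=0)                    # sample by row count
fa.sample(df, frac=0.1, replace=True, seed=0)  # sample 10% with replacement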
- fugue.execution.api.save(df, path, format_hint=None, mode='overwrite', partition=None, force_single=False, engine=None, engine_conf=None, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition (Optional[Any]) – how to partition the dataframe before saving, defaults to None
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
- Return type
None
For more details and examples, read Load & Save.
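A minimal sketch, assuming a pandas input and the default engine; the output paths are hypothetical and partition takes a PartitionSpec like object:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
fa.save(df, "/tmp/out.parquet")  # format inferred from the extension
fa.save(df, "/tmp/out.csv", mode="overwrite", header=True)
fa.save(df, "/tmp/out", format_hint="parquet", partition={"by": ["a"]})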
- fugue.execution.api.select(df, *columns, where=None, having=None, distinct=False, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
The functional interface for SQL select statement
- Parameters
df (AnyDataFrame) – the dataframe to be operated on
columns (Union[str, ColumnExpr]) – column expressions; strings represent column names
where (Optional[ColumnExpr]) – WHERE condition expression, defaults to None
having (Optional[ColumnExpr]) – HAVING condition expression, defaults to None. It is used when columns contains aggregation columns
distinct (bool) – whether to return distinct result, defaults to False
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the select result as a dataframe
- Return type
AnyDataFrame
Attention
This interface is experimental; it is subject to change in new versions.
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, lit, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # select existing and new columns
    fa.select(df, col("a"), col("b"), lit(1,"another"))
    fa.select(df, col("a"), (col("b")+lit(1)).alias("x"))

    # aggregation
    # SELECT COUNT(DISTINCT *) AS x FROM df
    fa.select(df, f.count_distinct(all_cols()).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df GROUP BY a
    fa.select(df, col("a"), f.max(col("b")+lit(1)).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df
    # WHERE b<2 AND a>1
    # GROUP BY a
    # HAVING MAX(b+1)>0
    fa.select(
        df,
        col("a"), f.max(col("b")+lit(1)).alias("x"),
        where=(col("b")<2) & (col("a")>1),
        having=f.max(col("b")+lit(1))>0
    )
- fugue.execution.api.semi_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left semi-join two dataframes. This is a wrapper of join() with how="semi"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.set_global_engine(engine, engine_conf=None)[source]#
Make an execution engine and set it as the global execution engine
- Parameters
engine (AnyExecutionEngine) – an engine like object, must not be None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
- Return type
ExecutionEngine
Caution
In general, it is not a good practice to set a global engine. You should consider engine_context() instead. The exception is when you iterate in a notebook across cells, where a global engine can simplify the code.
Note
For more details, please read make_execution_engine() and set_global()
Examples
import fugue.api as fa

fa.set_global_engine(spark_session)
transform(df, func)  # will use spark in this transformation
fa.clear_global_engine()  # remove the global setting
- fugue.execution.api.subtract(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
df1 - df2
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to subtract
distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the subtracted dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
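A minimal sketch contrasting EXCEPT DISTINCT and EXCEPT ALL, assuming pandas inputs and the default engine:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 1, 2, 3]})
df2 = pd.DataFrame({"a": [2]})
fa.subtract(df1, df2)                  # EXCEPT DISTINCT -> rows 1, 3
fa.subtract(df1, df2, distinct=False)  # EXCEPT ALL -> rows 1, 1, 3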
- fugue.execution.api.take(df, n, presort, na_position='last', partition=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
partition (Optional[Any]) – PartitionSpec to apply the take operation, defaults to None
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
n rows of DataFrame per partition
- Return type
AnyDataFrame
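A minimal sketch of global and per-partition take, assuming a pandas input and the default engine; partition takes a PartitionSpec like object:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 2, 1]})
fa.take(df, n=1, presort="b desc")                           # single top row by b
fa.take(df, n=1, presort="b desc", partition={"by": ["a"]})  # top row per value of a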
- fugue.execution.api.union(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Union two dataframes
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to union
distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the unioned dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
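A minimal sketch contrasting UNION DISTINCT and UNION ALL, assuming pandas inputs and the default engine:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"a": [2, 3]})
fa.union(df1, df2)                  # UNION DISTINCT -> 1, 2, 3
fa.union(df1, df2, distinct=False)  # UNION ALL -> 1, 2, 2, 3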
fugue.execution.execution_engine#
- class fugue.execution.execution_engine.EngineFacet(execution_engine)[source]#
Bases:
FugueEngineBase
The base class for different facets of the execution engines.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this facet will run on
- property conf: ParamDict#
All configurations of this engine instance.
Note
It can contain more than what you provide. For example, in SparkExecutionEngine, the Spark session can bring in more configs, and they are all accessible through this property.
- property execution_engine: ExecutionEngine#
the execution engine this facet will run on
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of ExecutionEngine
- property log: Logger#
Logger of this engine instance
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
df (AnyDataFrame) – DataFrame, pandas DataFrame, a list or iterable of arrays, or other data structures supported by the engine implementation
schema (Optional[Any]) – Schema like object, defaults to None
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.execution_engine.ExecutionEngine(conf)[source]#
Bases:
FugueEngineBase
The abstract base class for execution engines. It is the layer that unifies core concepts of distributed computing, and separates the underlying computing frameworks from user’s higher level logic.
Please read the ExecutionEngine Tutorial to understand this most important Fugue concept
- Parameters
conf (Any) – dict-like config, read this to learn Fugue specific options
- aggregate(df, partition_spec, agg_cols)[source]#
Aggregate on dataframe
- Parameters
df (DataFrame) – the dataframe to aggregate on
partition_spec (Optional[PartitionSpec]) – PartitionSpec to specify partition keys
agg_cols (List[ColumnExpr]) – aggregation expressions
- Returns
the aggregated result as a dataframe
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
engine.aggregate(
    df,
    partition_spec=None,
    agg_cols=[f.max(col("b"))])

# SELECT a, MAX(b) AS x FROM df GROUP BY a
engine.aggregate(
    df,
    partition_spec=PartitionSpec(by=["a"]),
    agg_cols=[f.max(col("b")).alias("x")])
- as_context()[source]#
Set this execution engine as the context engine. This function is thread safe and async safe.
Examples
with engine.as_context():
    transform(df, func)  # will use engine in this transformation
- Return type
Iterator[ExecutionEngine]
- assign(df, columns)[source]#
Update existing columns with new values and add new columns
- Parameters
df (DataFrame) – the dataframe to set columns
columns (List[ColumnExpr]) – column expressions
- Returns
the updated dataframe
- Return type
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
# assume df has schema: a:int,b:str

# add constant column x
engine.assign(df, lit(1,"x"))

# change column b to be a constant integer
engine.assign(df, lit(1,"b"))

# add new x to be a+b
engine.assign(df, (col("a")+col("b")).alias("x"))

# cast column a data type to double
engine.assign(df, col("a").cast(float))
- abstract broadcast(df)[source]#
Broadcast the dataframe to all workers for a distributed computing framework
- comap(df, map_func, output_schema, partition_spec, on_init=None)[source]#
Apply a function to each zipped partition on the zipped dataframe.
- Parameters
df (DataFrame) – input dataframe, it must be a zipped dataframe (it has to be a dataframe output from zip() or zip_all())
map_func (Callable[[PartitionCursor, DataFrames], LocalDataFrame]) – the function to apply on every zipped partition
output_schema (Any) – Schema like object that can't be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification for processing the zipped dataframe
on_init (Optional[Callable[[int, DataFrames], Any]]) – callback function when the physical partition is initializing, defaults to None
- Returns
the dataframe after the comap operation
Note
The input of this method must be an output of zip() or zip_all()
The partition_spec here is NOT related to how you zipped the dataframe; however you set it, it will only affect the processing speed. The partition keys will actually be overridden by the zipped dataframe's partition keys. You may use it in this way to improve efficiency: PartitionSpec(algo="even", num="ROWCOUNT") tells the execution engine to put each zipped partition into a physical partition so it can achieve the best possible load balance.
If the input dataframe has keys, the dataframes you get in map_func and on_init will have keys, otherwise you will get list-like dataframes
The on_init function will get a DataFrames object that has the same structure, but has all empty dataframes; you can use the schemas but not the data.
See also
For more details and examples, read Zip & Comap
- property conf: ParamDict#
All configurations of this engine instance.
Note
It can contain more than what you provide. For example, in SparkExecutionEngine, the Spark session can bring in more configs, and they are all accessible through this property.
- convert_yield_dataframe(df, as_local)[source]#
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters
df (DataFrame) – DataFrame
as_local (bool) – whether yield a local dataframe
- Returns
another DataFrame that can be used after this execution engine stops
- Return type
Note
By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe will become invalid when this engine stops.
For example, if you implement a custom Spark engine where you start and stop the Spark session in this engine's start_engine() and stop_engine(), then the Spark dataframe will become invalid after the engine stops. So you may consider converting it to a local dataframe so it can still exist after the engine stops.
- abstract dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
- abstract fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
- filter(df, condition)[source]#
Filter rows by the given condition
- Parameters
df (DataFrame) – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
- Returns
the filtered dataframe
- Return type
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

engine.filter(df, (col("a")>1) & (col("b")=="x"))
engine.filter(df, f.coalesce(col("a"),col("b"))>1)
- abstract property fs: FileSystem#
File system of this engine instance
- abstract get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- property in_context: bool#
Whether this engine is being used as a context engine
- abstract intersect(df1, df2, distinct=True)[source]#
Intersect df1 and df2
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL
- Returns
the intersected dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- property is_global: bool#
Whether this engine is being used as THE global engine
- abstract join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – the join keys; they can always be inferred, but if you provide them, they will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
Note
Please read
get_join_schemas()
- abstract load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
For more details and examples, read Load & Save.
- property map_engine: MapEngine#
The MapEngine currently used by this execution engine. You should use set_map_engine() to set a new MapEngine instance. If not set, the default is create_default_map_engine()
- on_enter_context()[source]#
The event hook when calling set_global_engine() or engine_context(), defaults to no operation
- Return type
None
- on_exit_context()[source]#
The event hook when calling clear_global_engine() or exiting from engine_context(), defaults to no operation
- Return type
None
- abstract persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameters to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist can only guarantee the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example, stack overflow.
- abstract repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- abstract sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
sampled dataframe
- Return type
- abstract save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
- select(df, cols, where=None, having=None)[source]#
The functional interface for SQL select statement
- Parameters
df (DataFrame) – the dataframe to be operated on
cols (SelectColumns) – column expressions
where (Optional[ColumnExpr]) – WHERE condition expression, defaults to None
having (Optional[ColumnExpr]) – HAVING condition expression, defaults to None. It is used when cols contains aggregation columns
- Returns
the select result as a dataframe
- Return type
New Since
0.6.0
Attention
This interface is experimental; it is subject to change in new versions.
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

# select existing and new columns
engine.select(df, SelectColumns(col("a"),col("b"),lit(1,"another")))
engine.select(df, SelectColumns(col("a"),(col("b")+lit(1)).alias("x")))

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
engine.select(
    df,
    SelectColumns(f.count_distinct(all_cols()).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
engine.select(
    df,
    SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df
# WHERE b<2 AND a>1
# GROUP BY a
# HAVING MAX(b+1)>0
engine.select(
    df,
    SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)
- set_global()[source]#
Set this execution engine to be the global execution engine.
Note
Global engine is also considered as a context engine, so in_context() will also become true for the global engine.
Examples
engine1.set_global()
transform(df, func)  # will use engine1 in this transformation

with engine2.as_context():
    transform(df, func)  # will use engine2

transform(df, func)  # will use engine1
- Return type
ExecutionEngine
- set_sql_engine(engine)[source]#
Set SQLEngine for this execution engine. If not set, the default is create_default_sql_engine()
- property sql_engine: SQLEngine#
The SQLEngine currently used by this execution engine. You should use set_sql_engine() to set a new SQLEngine instance. If not set, the default is create_default_sql_engine()
- stop()[source]#
Stop this execution engine. Do not override this method; customize stop_engine() if necessary. This function ensures stop_engine() is called only once
Note
Once the engine is stopped it should not be used again
- Return type
None
- stop_engine()[source]#
Custom logic to stop the execution engine, defaults to no operation
- Return type
None
- abstract subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL
- Returns
the subtracted dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- abstract take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
- abstract union(df1, df2, distinct=True)[source]#
Union two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL
- Returns
the unioned dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- zip(df1, df2, how='inner', partition_spec=None, temp_path=None, to_file_threshold=-1, df1_name=None, df2_name=None)[source]#
Partition the two dataframes in the same way with partition_spec and zip the partitions together on the partition keys.
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner
partition_spec (PartitionSpec, optional) – partition spec to partition each dataframe, defaults to empty.
temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None
to_file_threshold (Any) – file byte size threshold, defaults to -1
df1_name (Optional[str]) – df1’s name in the zipped dataframe, defaults to None
df2_name (Optional[str]) – df2’s name in the zipped dataframe, defaults to None
- Returns
a zipped dataframe, the metadata of the dataframe will indicate it’s zipped
Note
Different from join, df1 and df2 can have common columns that you will not use as partition keys.
If on is not specified, it will also use the common columns of the two dataframes (if it's not a cross zip)
For non-cross zip, the two dataframes must have common columns, or an error will be thrown
See also
For more details and examples, read Zip & Comap.
- zip_all(dfs, how='inner', partition_spec=None, temp_path=None, to_file_threshold=-1)[source]#
Zip multiple dataframes together with given partition specifications.
- Parameters
dfs (DataFrames) – DataFrames like object
how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner
partition_spec (Optional[PartitionSpec]) – Partition like object, defaults to empty.
temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None
to_file_threshold (Any) – file byte size threshold, defaults to -1
- Returns
a zipped dataframe, the metadata of the dataframe will indicate it's zipped
- Return type
Note
Please also read zip()
If dfs is dict like, the zipped dataframe will be dict like; if dfs is list like, the zipped dataframe will be list like
It's fine to contain only one dataframe in dfs
See also
For more details and examples, read Zip & Comap
- class fugue.execution.execution_engine.ExecutionEngineParam(param)[source]#
Bases:
AnnotatedParam
- Parameters
param (Optional[Parameter]) –
- class fugue.execution.execution_engine.FugueEngineBase[source]#
Bases:
ABC
- abstract property conf: ParamDict#
All configurations of this engine instance.
Note
It can contain more than what you provide. For example, in SparkExecutionEngine, the Spark session can bring in more configs, and they are all accessible through this property.
- abstract property is_distributed: bool#
Whether this engine is a distributed engine
- abstract property log: Logger#
Logger of this engine instance
- abstract to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
df (AnyDataFrame) – DataFrame, pandas DataFrame, a list or iterable of arrays, or other data structures supported by the engine implementation
schema (Optional[Any]) – Schema like object, defaults to None
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.execution_engine.MapEngine(execution_engine)[source]#
Bases:
EngineFacet
The abstract base class for different map operation implementations.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this map engine will run on
- map_bag(bag, map_func, partition_spec, on_init=None)[source]#
Apply a function to each partition after you partition the bag in a specified way.
- Parameters
bag (Bag) – input bag
map_func (Callable[[BagPartitionCursor, LocalBag], LocalBag]) – the function to apply on every logical partition
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, Bag], Any]]) – callback function when the physical partition is initializing, defaults to None
- Returns
the bag after the map operation
- Return type
- abstract map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
Note
Before implementing, you must read this to understand what map is used for and how it should work.
- class fugue.execution.execution_engine.SQLEngine(execution_engine)[source]#
Bases:
EngineFacet
The abstract base class for different SQL execution implementations. Please read this to understand the concept
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property dialect: Optional[str]#
- encode(dfs, statement)[source]#
- Parameters
dfs (DataFrames) –
statement (StructuredRawSQL) –
- Return type
Tuple[DataFrames, str]
- load_table(table, **kwargs)[source]#
Load table as a dataframe
- Parameters
table (str) – the table name
kwargs (Any) –
- Returns
an engine compatible dataframe
- Return type
- save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]#
Save the dataframe to a table
- Parameters
df (DataFrame) – the dataframe to save
table (str) – the table name
mode (str) – can accept overwrite, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
- abstract select(dfs, statement)[source]#
Execute select statement on the sql engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns
result of the SELECT statement
- Return type
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True, "a"),
     (False, " UNION SELECT * FROM "),
     (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from Hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
fugue.execution.factory#
- fugue.execution.factory.is_pandas_or(objs, obj_type)[source]#
Check whether the input contains at least one obj_type object and the rest are Pandas DataFrames. This is a utility function for extending infer_execution_engine()
- Parameters
objs (List[Any]) – the list of objects to check
obj_type (Any) –
- Returns
whether all objs are of type obj_type or pandas DataFrame, and at least one is of type obj_type
- Return type
bool
- fugue.execution.factory.make_execution_engine(engine=None, conf=None, infer_by=None, **kwargs)[source]#
Create ExecutionEngine with the specified engine
- Parameters
engine (Optional[Any]) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the sql engine (you can use None for either of them to use the default one), defaults to None
conf (Optional[Any]) – Parameters like object, defaults to None
infer_by (Optional[List[Any]]) – List of objects that can be used to infer the execution engine using infer_execution_engine()
kwargs (Any) – additional parameters to initialize the execution engine
- Returns
the ExecutionEngine instance
- Return type
ExecutionEngine
Note
This function finds/constructs the engine in the following order:
If engine is None, it first tries to see if there is any defined context engine to use (=> engine)
If engine is still empty, then it will try to get the global execution engine. See set_global()
If engine is still empty, then if infer_by is given, it will try to infer the execution engine (=> engine)
If engine is still empty, then it will construct the default engine defined by register_default_execution_engine() (=> engine)
Now, engine must not be empty; if it is an object other than ExecutionEngine, we will use parse_execution_engine() to construct it (=> engine)
Now, engine must be an ExecutionEngine object. We update its SQL engine if specified, then update its config using conf and kwargs
Examples
register_default_execution_engine(lambda conf: E1(conf))
register_execution_engine("e2", lambda conf, **kwargs: E2(conf, **kwargs))
register_sql_engine("s", lambda conf: S2(conf))

# E1 + E1.create_default_sql_engine()
make_execution_engine()

# E2 + E2.create_default_sql_engine()
make_execution_engine("e2")

# E1 + S2
make_execution_engine((None, "s"))

# E2(conf, a=1, b=2) + S2
make_execution_engine(("e2", "s"), conf, a=1, b=2)

# SparkExecutionEngine + SparkSQLEngine
make_execution_engine(SparkExecutionEngine)
make_execution_engine(SparkExecutionEngine(spark_session, conf))

# SparkExecutionEngine + S2
make_execution_engine((SparkExecutionEngine, "s"))

# assume object e2_df can infer E2 engine
make_execution_engine(infer_by=[e2_df])  # an E2 engine

# global
e_global = E1(conf)
e_global.set_global()
make_execution_engine()  # e_global

# context
with E2(conf).as_context() as ec:
    make_execution_engine()  # ec
make_execution_engine()  # e_global
- fugue.execution.factory.make_sql_engine(engine=None, execution_engine=None, **kwargs)[source]#
Create SQLEngine with the specified engine
- Parameters
engine (Optional[Any]) – it can be empty string or null (use the default SQL engine), a string (use the registered SQL engine), an SQLEngine type, or an SQLEngine instance (you can use None to use the default one), defaults to None
execution_engine (Optional[ExecutionEngine]) – the ExecutionEngine instance to create the SQLEngine. Normally you should always provide this value.
kwargs (Any) – additional parameters to initialize the sql engine
- Returns
the SQLEngine instance
- Return type
SQLEngine
Note
For users, you normally don't need to call this function directly. Use make_execution_engine instead
Examples
register_default_sql_engine(lambda conf: S1(conf))
register_sql_engine("s2", lambda conf: S2(conf))

engine = NativeExecutionEngine()

# S1(engine)
make_sql_engine(None, engine)

# S1(engine, a=1)
make_sql_engine(None, engine, a=1)

# S2(engine)
make_sql_engine("s2", engine)
- fugue.execution.factory.register_default_execution_engine(func, on_dup='overwrite')[source]#
Register ExecutionEngine as the default engine.
- Parameters
func (Callable) – a callable taking a Parameters like object and **kwargs and returning an ExecutionEngine instance
on_dup – action on duplicated name. It can be "overwrite" or "ignore" (not overwriting), defaults to "overwrite".
- Return type
None
Examples
# register a new default engine (overwrites if one exists)
register_default_execution_engine(lambda conf: MyExecutionEngine(conf))

# the following examples will use MyExecutionEngine

# 0
make_execution_engine()
make_execution_engine(None, {"myconfig": "value"})

# 1
dag = FugueWorkflow()
dag.create([[0]], "a:int").show()
dag.run(None, {"myconfig": "value"})

# 2
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("", {"myconfig": "value"})
- fugue.execution.factory.register_default_sql_engine(func, on_dup='overwrite')[source]#
Register SQLEngine as the default engine
- Parameters
func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance
on_dup – action on duplicated name. It can be "overwrite", "ignore" (not overwriting) or "throw" (throw exception), defaults to "overwrite".
- Raises
KeyError – if on_dup is throw and the name already exists
- Return type
None
Note
You should be careful to use this function, because when you set a custom SQL engine as default, all execution engines you create will use this SQL engine unless you are explicit. For example if you set the default SQL engine to be a Spark specific one, then if you start a NativeExecutionEngine, it will try to use it and will throw exceptions.
So it's always a better idea to use register_sql_engine instead
Examples
# register a new default SQL engine (overwrites if one exists)
register_default_sql_engine(lambda engine: MySQLEngine(engine))

# create NativeExecutionEngine with MySQLEngine as the default
make_execution_engine()

# create SparkExecutionEngine with MySQLEngine instead of SparkSQLEngine
make_execution_engine("spark")

# NativeExecutionEngine with MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]], "a:int").show()
dag.run()
- fugue.execution.factory.register_execution_engine(name_or_type, func, on_dup='overwrite')[source]#
Register ExecutionEngine with a given name.
- Parameters
name_or_type (Union[str, Type]) – alias of the execution engine, or type of an object that can be converted to an execution engine
func (Callable) – a callable taking a Parameters like object and **kwargs and returning an ExecutionEngine instance
on_dup – action on duplicated name. It can be "overwrite" or "ignore" (not overwriting), defaults to "overwrite".
- Return type
None
Examples
Alias registration examples:
# create a new engine with name "my" (overwrites if it exists)
register_execution_engine("my", lambda conf: MyExecutionEngine(conf))

# 0
make_execution_engine("my")
make_execution_engine("my", {"myconfig": "value"})

# 1
dag = FugueWorkflow()
dag.create([[0]], "a:int").show()
dag.run("my", {"myconfig": "value"})

# 2
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("my")
Type registration examples:
from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine
from fugue import fsql

register_execution_engine(
    SparkSession,
    lambda session, conf: SparkExecutionEngine(session, conf))

spark_session = SparkSession.builder.getOrCreate()

fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run(spark_session)
- fugue.execution.factory.register_sql_engine(name, func, on_dup='overwrite')[source]#
Register SQLEngine with a given name.
- Parameters
name (str) – name of the SQL engine
func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance
on_dup – action on duplicated name. It can be "overwrite" or "ignore" (not overwriting), defaults to "overwrite".
- Return type
None
Examples
# create a new SQL engine with name "mysql" (overwrites if it exists)
register_sql_engine("mysql", lambda engine: MySQLEngine(engine))

# create execution engine with MySQLEngine as the default
make_execution_engine(("", "mysql"))

# create DaskExecutionEngine with MySQLEngine as the default
make_execution_engine(("dask", "mysql"))

# default execution engine + MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]], "a:int").show()
dag.run(("", "mysql"))
- fugue.execution.factory.try_get_context_execution_engine()[source]#
If the global execution engine is set (see set_global()) or the context is set (see as_context()), then return the engine, else return None
- Return type
Optional[ExecutionEngine]
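As a quick illustration, the following sketch (assuming a standard pandas-backed Fugue installation and no global engine already set) shows how the result changes inside and outside an engine context:

import fugue.api as fa
from fugue.execution.factory import try_get_context_execution_engine

# outside any context (and with no global engine set) nothing is returned
assert try_get_context_execution_engine() is None

with fa.engine_context("native"):
    # inside the context, the context's engine is returned
    engine = try_get_context_execution_engine()
    assert engine is not None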
fugue.execution.native_execution_engine#
- class fugue.execution.native_execution_engine.NativeExecutionEngine(conf=None)[source]#
Bases: ExecutionEngine
The execution engine based on native Python and Pandas. This execution engine is mainly for prototyping and unit tests.
Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
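As a minimal sketch (assuming a standard pandas-backed installation), the engine can be constructed directly and used to convert raw rows into an engine-compatible DataFrame:

from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()  # conf is optional
df = engine.to_df([[0, "x"], [1, "y"]], "a:int,b:str")
print(engine.is_distributed)             # False: a single-machine engine
print(engine.get_current_parallelism())  # 1 for the native engine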
- dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from the dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
DataFrame
- fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with the same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
DataFrame
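A minimal sketch of dropna and fillna on the native engine (the values and column names are illustrative):

import pandas as pd
from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df(pd.DataFrame({"a": [1.0, None, 3.0], "b": [None, None, 6.0]}))

dropped = engine.dropna(df, how="any")   # keeps only the fully non-null row
kept = engine.dropna(df, thresh=1)       # keeps rows with at least 1 non-null value
filled = engine.fillna(df, value=0)      # fill every column with 0
per_col = engine.fillna(df, value={"a": 0, "b": -1})  # per-column replacements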
- property fs: FileSystem#
File system of this engine instance
- get_current_parallelism()[source]#
Get the current parallelism of this engine
- Return type
int
- intersect(df1, df2, distinct=True)[source]#
Intersect df1 and df2
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL
- Returns
the intersected dataframe
- Return type
DataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
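A minimal sketch of intersect, assuming both inputs share the schema a:int:

from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df1 = engine.to_df([[0], [1], [1]], "a:int")
df2 = engine.to_df([[1], [2]], "a:int")
# INTERSECT DISTINCT: only the row [1] survives; pass distinct=False to
# request INTERSECT ALL (support may vary by engine)
result = engine.intersect(df1, df2)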
- property is_distributed: bool#
Whether this engine is a distributed engine
- join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
DataFrame
Note
Please read get_join_schemas()
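A minimal sketch of join; the join key a is inferred from the two schemas:

from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df1 = engine.to_df([[0, "x"], [1, "y"]], "a:int,b:str")
df2 = engine.to_df([[1, 10], [2, 20]], "a:int,c:int")
inner = engine.join(df1, df2, how="inner")                # only a == 1 survives
left = engine.join(df1, df2, how="left_outer", on=["a"])  # keeps all rows of df1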
- load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load a dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
DataFrame
For more details and examples, read Load & Save.
- property log: Logger#
Logger of this engine instance
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persisting is forced to happen immediately. Defaults to False
kwargs (Any) – parameters to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
DataFrame
Note
persist can only guarantee that the persisted dataframe is computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
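A minimal sketch of persist; on the native engine this is close to a no-op, while distributed backends use it to cache the intermediate result:

from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df([[0]], "a:int")
eager = engine.persist(df)                # materialized immediately
deferred = engine.persist(df, lazy=True)  # materialized on first use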
- property pl_utils: PandasUtils#
Pandas-like dataframe utils
- repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
DataFrame
Note
Before implementing please read the Partition Tutorial
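A minimal sketch of repartition; on the single-machine native engine the spec mainly drives later per-partition operations, while distributed engines may physically shuffle the data:

from fugue import NativeExecutionEngine, PartitionSpec

engine = NativeExecutionEngine()
df = engine.to_df([[0, 1], [1, 2], [1, 3]], "a:int,b:int")
parts = engine.repartition(df, PartitionSpec(by=["a"]))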
- sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
the sampled dataframe
- Return type
DataFrame
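A minimal sketch of sample: set exactly one of n and frac:

from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df([[i] for i in range(100)], "a:int")
ten_rows = engine.sample(df, n=10, seed=0)       # sample by row count
one_tenth = engine.sample(df, frac=0.1, seed=0)  # sample by fraction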
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save the dataframe to persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to “overwrite”
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
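A minimal save/load round trip (the parquet format is inferred from the file extension, so no format_hint is needed; the temp path is illustrative):

import os, tempfile
from fugue import NativeExecutionEngine

engine = NativeExecutionEngine()
df = engine.to_df([[0, "x"], [1, "y"]], "a:int,b:str")
path = os.path.join(tempfile.gettempdir(), "fugue_demo.parquet")
engine.save_df(df, path, mode="overwrite")
back = engine.load_df(path, columns=["a"])  # load only column a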
- subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL
- Returns
the subtracted dataframe
- Return type
DataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows the Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of treating NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept first or last
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
DataFrame
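A minimal sketch of take with a presort: keep the row with the largest b within each partition of a (na_position follows the Pandas convention described above):

from fugue import NativeExecutionEngine, PartitionSpec

engine = NativeExecutionEngine()
df = engine.to_df([[0, 1], [0, 5], [1, 2]], "a:int,b:int")
top1 = engine.take(df, n=1, presort="b desc",
                   partition_spec=PartitionSpec(by=["a"]))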
- to_df(df, schema=None)[source]#
Convert a data structure to a DataFrame compatible with this engine
- Parameters
df (AnyDataFrame) – DataFrame, pandas DataFrame, list or iterable of arrays, or other data structures supported by the engine implementation
schema (Optional[Any]) – Schema like object, defaults to None
- Returns
engine compatible dataframe
- Return type
DataFrame
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.native_execution_engine.PandasMapEngine(execution_engine)[source]#
Bases: MapEngine
- Parameters
execution_engine (ExecutionEngine) –
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of ExecutionEngine
- property is_distributed: bool#
Whether this engine is a distributed engine
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc, defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
DataFrame
Note
Before implementing, you must read this to understand what map is used for and how it should work.
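A hypothetical sketch of calling map_dataframe directly; the map_func signature follows the parameter docs above, and the helper take_first is illustrative, not part of the API:

from fugue import NativeExecutionEngine, PartitionSpec
from fugue.dataframe import ArrayDataFrame
from fugue.execution.native_execution_engine import PandasMapEngine

engine = NativeExecutionEngine()
map_engine = PandasMapEngine(engine)
df = engine.to_df([[0, 1], [0, 2], [1, 3]], "a:int,b:int")

def take_first(cursor, pdf):
    # keep only the first row of each logical partition
    return ArrayDataFrame(pdf.as_array()[:1], pdf.schema)

res = map_engine.map_dataframe(
    df, take_first, output_schema="a:int,b:int",
    partition_spec=PartitionSpec(by=["a"]))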
- to_df(df, schema=None)[source]#
Convert a data structure to a DataFrame compatible with this engine
- Parameters
df (AnyDataFrame) – DataFrame, pandas DataFrame, list or iterable of arrays, or other data structures supported by the engine implementation
schema (Optional[Any]) – Schema like object, defaults to None
- Returns
engine compatible dataframe
- Return type
DataFrame
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.native_execution_engine.QPDPandasEngine(execution_engine)[source]#
Bases: SQLEngine
QPD execution implementation.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property dialect: Optional[str]#
- property is_distributed: bool#
Whether this engine is a distributed engine
- select(dfs, statement)[source]#
Execute a select statement on the SQL engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables
- Returns
result of the SELECT statement
- Return type
DataFrame
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True, "a"),
     (False, " UNION SELECT * FROM "),
     (True, "b")])
Note
There can be tables that are not in dfs. For example you may want to select from hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
- to_df(df, schema=None)[source]#
Convert a data structure to a DataFrame compatible with this engine
- Parameters
df (AnyDataFrame) – DataFrame, pandas DataFrame, list or iterable of arrays, or other data structures supported by the engine implementation
schema (Optional[Any]) – Schema like object, defaults to None
- Returns
engine compatible dataframe
- Return type
DataFrame
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything