Top Level User API Reference#
IO#
- fugue.api.as_fugue_dataset(data, **kwargs)[source]#
Wrap the input as a Dataset
- Parameters
data (AnyDataset) – the dataset to be wrapped
kwargs (Any) –
- Return type
Dataset
- fugue.api.as_fugue_df(df, **kwargs)[source]#
Wrap the object as a Fugue DataFrame.
- Parameters
df (AnyDataFrame) – the object to wrap
kwargs (Any) –
- Return type
DataFrame
- fugue.api.as_fugue_engine_df(engine, df, schema=None)[source]#
Convert a dataframe to a Fugue engine dependent DataFrame. This function is used internally by Fugue and is not recommended for direct use.
- Parameters
engine (ExecutionEngine) – the ExecutionEngine to use, must not be None
df (AnyDataFrame) – a dataframe like object
schema (Any) – the schema of the dataframe, defaults to None
- Returns
the engine dependent DataFrame
- Return type
DataFrame
- fugue.api.load(path, format_hint=None, columns=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
an engine compatible dataframe
- Return type
AnyDataFrame
For more details and examples, read Load & Save.
- fugue.api.save(df, path, format_hint=None, mode='overwrite', partition=None, force_single=False, engine=None, engine_conf=None, **kwargs)[source]#
Save dataframe to persistent storage
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition (Optional[Any]) – how to partition the dataframe before saving, defaults to None
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
- Return type
None
For more details and examples, read Load & Save.
Information#
- fugue.api.count(data)[source]#
The number of elements in the dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
int
- fugue.api.is_bounded(data)[source]#
Whether the dataset is bounded
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
bool
- fugue.api.is_empty(data)[source]#
Whether the dataset is empty
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
bool
- fugue.api.is_local(data)[source]#
Whether the dataset is local
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
bool
- fugue.api.show(data, n=10, with_count=False, title=None)[source]#
Display the Dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
n (int) – number of rows to print, defaults to 10
with_count (bool) – whether to show dataset count, defaults to False
title (Optional[str]) – title of the dataset, defaults to None
- Return type
None
Note
When with_count is True, it can trigger expensive calculation for a distributed dataframe. So if you call this function directly, you may need to fugue.execution.execution_engine.ExecutionEngine.persist() the dataset.
- fugue.api.get_column_names(df)[source]#
A generic function to get column names of any dataframe
- Parameters
df (AnyDataFrame) – the dataframe object
- Returns
the column names
- Return type
List[Any]
Note
In order to support a new type of dataframe, an implementation must be registered.
- fugue.api.get_num_partitions(data)[source]#
Get the number of partitions of the dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
int
- fugue.api.get_schema(df)[source]#
Get the schema of the df
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
- Returns
the Schema object
- Return type
Schema
- fugue.api.is_df(df)[source]#
Whether df is a DataFrame like object
- Parameters
df (Any) –
- Return type
bool
Transformation#
- fugue.api.transform(df, using, schema=None, params=None, partition=None, callback=None, ignore_errors=None, persist=False, as_local=False, save_path=None, checkpoint=False, engine=None, engine_conf=None, as_fugue=False)[source]#
Transform this dataframe using transformer. It's a wrapper of transform() and run(). It will let you do the basic dataframe transformation without using FugueWorkflow and DataFrame. Also, only native types are accepted for both input and output.
Please read the Transformer Tutorial.
- Parameters
df (Any) – DataFrame like object or Yielded or a path string to a parquet file
using (Any) – transformer-like object, can't be a string expression
schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()
partition (Optional[Any]) – Partition like object, defaults to None
callback (Optional[Any]) – RPChandler like object, defaults to None
ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)
engine (Optional[Any]) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, or the ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the SQL engine (you can use None for either of them to use the default one), defaults to None
engine_conf (Optional[Any]) – Parameters like object, defaults to None
as_fugue (bool) – If true, the function will always return a FugueDataFrame, otherwise, if df is in native dataframe types such as pandas dataframe, then the output will also return in its native format. Defaults to False
persist (bool) – Whether to persist (materialize) the dataframe before returning
as_local (bool) – If true, the result will be converted to a LocalDataFrame
save_path (Optional[str]) – Whether to save the output to a file (see the note)
checkpoint (bool) – Whether to add a checkpoint for the output (see the note)
- Returns
the transformed dataframe, if df is a native dataframe (e.g. pd.DataFrame, spark dataframe, etc), the output will be a native dataframe, the type is determined by the execution engine you use. But if df is of type DataFrame, then the output will also be a DataFrame
- Return type
Any
Note
This function may be lazy and return the transformed dataframe.
Note
When you use callback in this function, you must ensure the output dataframe is materialized. Otherwise, if the real compute happens outside the function call, the callback receiver will already be shut down. To do that you can either use persist or as_local; both will materialize the dataframe before the callback receiver shuts down.
Note
When save_path is None and checkpoint is False, then the output will not be saved into a file. The return will be a dataframe.
When save_path is None and checkpoint is True, then the output is saved into the path set by fugue.workflow.checkpoint.path, the name will be randomly chosen, and it is NOT a deterministic checkpoint, so if you run multiple times, the output will be saved into different files. The return will be a dataframe.
When save_path is not None and checkpoint is False, then the output will be saved into save_path. The return will be the value of save_path.
When save_path is not None and checkpoint is True, then the output will be saved into save_path. The return will be the dataframe from save_path.
This function can only take parquet file paths in df and save_path. Csv and other file formats are disallowed.
The checkpoint here is NOT deterministic, so re-run will generate new checkpoints.
If you want to read and write other file formats or if you want to use deterministic checkpoints, please use FugueWorkflow.
- fugue.api.out_transform(df, using, params=None, partition=None, callback=None, ignore_errors=None, engine=None, engine_conf=None)[source]#
Transform this dataframe using transformer. It's a wrapper of out_transform() and run(). It will let you do the basic dataframe transformation without using FugueWorkflow and DataFrame. Only native types are accepted for both input and output.
Please read the Transformer Tutorial.
- Parameters
df (Any) – DataFrame like object or Yielded or a path string to a parquet file
using (Any) – transformer-like object, can't be a string expression
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()
partition (Optional[Any]) – Partition like object, defaults to None
callback (Optional[Any]) – RPChandler like object, defaults to None
ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)
engine (Optional[Any]) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, or the ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the SQL engine (you can use None for either of them to use the default one), defaults to None
engine_conf (Optional[Any]) – Parameters like object, defaults to None
- Return type
None
Note
This function can only take parquet file paths in df. CSV and JSON file formats are disallowed.
This transformation is guaranteed to execute immediately (eager) and return nothing.
- fugue.api.alter_columns(df, columns, as_fugue=False)[source]#
Change column types
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
as_fugue (bool) – whether to return a Fugue DataFrame, defaults to False. If False and the input df is not a Fugue DataFrame, the underlying DataFrame object is returned.
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
AnyDataFrame
- fugue.api.drop_columns(df, columns, as_fugue=False)[source]#
Drop certain columns and return a new dataframe
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (List[str]) – columns to drop
as_fugue (bool) – whether to return a Fugue DataFrame, defaults to False. If False and the input df is not a Fugue DataFrame, the underlying DataFrame object is returned.
- Returns
a new dataframe removing the columns
- Return type
AnyDataFrame
- fugue.api.head(df, n, columns=None, as_fugue=False)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
as_fugue (bool) – whether to return a Fugue DataFrame, defaults to False. If False and the input df is not a Fugue DataFrame, the underlying DataFrame object is returned.
df (AnyDataFrame) – the input dataframe
- Returns
a local bounded dataframe
- Return type
AnyDataFrame
- fugue.api.normalize_column_names(df)[source]#
A generic function to normalize any dataframe’s column names to follow Fugue naming rules
Note
This is a temporary solution before Schema can take arbitrary names
Examples
[0,1] => {"_0":0, "_1":1}
["1a","2b"] => {"_1a":"1a", "_2b":"2b"}
["*a","-a"] => {"_a":"*a", "_a_1":"-a"}
- Parameters
df (AnyDataFrame) – a dataframe object
- Returns
the renamed dataframe and the rename operations as a dict that can undo the change
- Return type
Tuple[AnyDataFrame, Dict[str, Any]]
- fugue.api.rename(df, columns, as_fugue=False)[source]#
A generic function to rename column names of any dataframe
- Parameters
df (AnyDataFrame) – the dataframe object
columns (Dict[str, Any]) – the rename operations as a dict: old name => new name
as_fugue (bool) – whether to return a Fugue DataFrame, defaults to False. If False and the input df is not a Fugue DataFrame, the underlying DataFrame object is returned.
- Returns
the renamed dataframe
- Return type
AnyDataFrame
Note
In order to support a new type of dataframe, an implementation must be registered.
- fugue.api.select_columns(df, columns, as_fugue=False)[source]#
Select certain columns and return a new dataframe
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (List[Any]) – columns to return
as_fugue (bool) – whether to return a Fugue DataFrame, defaults to False. If False and the input df is not a Fugue DataFrame, the underlying DataFrame object is returned.
- Returns
a new dataframe with the selected columns
- Return type
AnyDataFrame
- fugue.api.distinct(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Equivalent to SELECT DISTINCT * FROM df
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the result with distinct rows
- Return type
AnyDataFrame
- fugue.api.dropna(df, how='any', thresh=None, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Drop NA records from the dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have fewer than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
DataFrame with NA records dropped
- Return type
AnyDataFrame
- fugue.api.fillna(df, value, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
DataFrame with NA records filled
- Return type
AnyDataFrame
- fugue.api.sample(df, n=None, frac=None, replace=False, seed=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the sampled dataframe
- Return type
AnyDataFrame
- fugue.api.take(df, n, presort, na_position='last', partition=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept first or last
partition (Optional[Any]) – PartitionSpec to apply the take operation, defaults to None
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
n rows of DataFrame per partition
- Return type
AnyDataFrame
SQL#
- fugue.api.fugue_sql(query, *args, fsql_ignore_case=None, fsql_dialect=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Simplified Fugue SQL interface. This function can still take multiple dataframe inputs but will always return the last generated dataframe in the SQL workflow. And YIELD should NOT be used with this function. If you want to use Fugue SQL to represent the full workflow, or want to see more Fugue SQL examples, please read fugue_sql_flow().
- Parameters
query (str) – the Fugue SQL string (can be a jinja template)
args (Any) – variables related to the SQL string
fsql_ignore_case (Optional[bool]) – whether to ignore case when parsing the SQL string, defaults to None (it depends on the engine/global config).
fsql_dialect (Optional[str]) – the dialect of this fsql, defaults to None (it depends on the engine/global config).
kwargs (Any) – variables related to the SQL string
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether return a local dataframe, defaults to False
- Returns
the result dataframe
- Return type
AnyDataFrame
Note
This function is different from raw_sql() which directly sends the query to the execution engine to run. This function parses the query based on Fugue SQL syntax, creates a FugueSQLWorkflow which could contain multiple raw SQLs plus other operations, and runs and returns the last dataframe generated in the workflow.
This function allows you to parameterize the SQL in a more elegant way. The data tables referred to in the query can either be automatically extracted from the local variables or be specified in the arguments.
Caution
Currently, we have not unified the dialects of different SQL backends, so there can be slight syntax differences when you switch between backends. In addition, we have not unified the UDFs across different backends, so you should be careful when using uncommon UDFs belonging to a certain backend.
That being said, if you keep your SQL part general and leverage Fugue extensions (transformer, creator, processor, outputter, etc.) appropriately, it should be easy to write backend agnostic Fugue SQL.
We are working on unifying the dialects of different SQLs; this should be available in future releases. Regarding unifying UDFs, the effort is still unclear.
import pandas as pd
import fugue.api as fa

def tr(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(c=2)

input = pd.DataFrame([[0,1],[3,4]], columns=["a","b"])
with fa.engine_context("duckdb"):
    res = fa.fugue_sql('''
    SELECT * FROM input WHERE a<{{x}}
    TRANSFORM USING tr SCHEMA *,c:int
    ''', x=2)
    assert fa.as_array(res) == [[0,1,2]]
- fugue.api.fugue_sql_flow(query, *args, fsql_ignore_case=None, fsql_dialect=None, **kwargs)[source]#
Fugue SQL full functional interface. This function allows full workflow definition using Fugue SQL, and it allows multiple outputs using YIELD.
- Parameters
query (str) – the Fugue SQL string (can be a jinja template)
args (Any) – variables related to the SQL string
fsql_ignore_case (Optional[bool]) – whether to ignore case when parsing the SQL string, defaults to None (it depends on the engine/global config).
fsql_dialect (Optional[str]) – the dialect of this fsql, defaults to None (it depends on the engine/global config).
kwargs (Any) – variables related to the SQL string
- Returns
the translated Fugue workflow
- Return type
FugueSQLWorkflow
Note
This function is different from raw_sql() which directly sends the query to the execution engine to run. This function parses the query based on Fugue SQL syntax, creates a FugueSQLWorkflow which could contain multiple raw SQLs plus other operations, and runs and returns the last dataframe generated in the workflow.
This function allows you to parameterize the SQL in a more elegant way. The data tables referred to in the query can either be automatically extracted from the local variables or be specified in the arguments.
Caution
Currently, we have not unified the dialects of different SQL backends, so there can be slight syntax differences when you switch between backends. In addition, we have not unified the UDFs across different backends, so you should be careful when using uncommon UDFs belonging to a certain backend.
That being said, if you keep your SQL part general and leverage Fugue extensions (transformer, creator, processor, outputter, etc.) appropriately, it should be easy to write backend agnostic Fugue SQL.
We are working on unifying the dialects of different SQLs; this should be available in future releases. Regarding unifying UDFs, the effort is still unclear.
from fugue.api import fugue_sql_flow as fsql
import fugue.api as fa
import pandas as pd

# Basic case
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run()

# With external data sources
df = pd.DataFrame([[0],[1]], columns=["a"])
fsql('''
SELECT * FROM df WHERE a=0
PRINT
''').run()

# With external variables
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''').run()

# The following is the explicit way to specify variables and dataframes
# (recommended)
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''', df=df, t=t).run()

# Using extensions
def dummy(df:pd.DataFrame) -> pd.DataFrame:
    return df

fsql('''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
''').run()

# It's recommended to provide the full path of the extension inside
# Fugue SQL, so the SQL definition and execution can be more
# independent from the extension definition.

# Run with different execution engines
sql = '''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
'''
fsql(sql).run(spark_session)
fsql(sql).run("dask")
with fa.engine_context("duckdb"):
    fsql(sql).run()

# Passing dataframes between fsql calls
result = fsql('''
CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x
CREATE [[1]] SCHEMA a:int YIELD DATAFRAME AS y
''').run(DaskExecutionEngine)

fsql('''
SELECT * FROM x UNION SELECT * FROM y UNION SELECT * FROM z
PRINT
''', result, z=pd.DataFrame([[2]], columns=["z"])).run()

# Get framework native dataframes
result["x"].native  # Dask dataframe
result["y"].native  # Dask dataframe
result["x"].as_pandas()  # Pandas dataframe

# Use lower case fugue sql
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
select * from df where a={{t}}
print
''', df=df, t=t, fsql_ignore_case=True).run()
- fugue.api.raw_sql(*statements, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Run raw SQL on the execution engine
- Parameters
statements (Any) – a sequence of sub-statements in string or dataframe-like objects
engine (Optional[Any]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether return a local dataframe, defaults to False
- Returns
the result dataframe
- Return type
AnyDataFrame
Caution
Currently, only SELECT statements are supported
Examples
import pandas as pd
import fugue.api as fa

with fa.engine_context("duckdb"):
    a = fa.as_fugue_df([[0,1]], schema="a:long,b:long")
    b = pd.DataFrame([[0,10]], columns=["a","b"])
    c = fa.raw_sql("SELECT * FROM", a, "UNION SELECT * FROM", b)
    fa.as_pandas(c)
- fugue.api.join(df1, df2, *dfs, how, on=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Join two dataframes
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
Note
Please read get_join_schemas()
- fugue.api.semi_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left semi-join two dataframes. This is a wrapper of join() with how="semi"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.anti_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left anti-join two dataframes. This is a wrapper of join() with how="anti"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.inner_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Inner join two dataframes. This is a wrapper of join() with how="inner"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
how – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.left_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left outer join two dataframes. This is a wrapper of join() with how="left_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.right_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Right outer join two dataframes. This is a wrapper of join() with how="right_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.full_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Full outer join two dataframes. This is a wrapper of join() with how="full_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.cross_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Cross join two dataframes. This is a wrapper of join() with how="cross"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.union(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Union two dataframes
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to union
distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the unioned dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
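The distinct flag follows SQL semantics: UNION ALL keeps duplicate rows, while UNION (DISTINCT) removes them. A stdlib sketch with made-up row tuples (not the Fugue API itself):

```python
rows1 = [(1, "a"), (2, "b")]
rows2 = [(2, "b"), (3, "c")]

# distinct=False -> UNION ALL: keep duplicates
union_all = rows1 + rows2

# distinct=True -> UNION (DISTINCT): remove duplicates
union_distinct = sorted(set(rows1) | set(rows2))
```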
- fugue.api.intersect(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Intersect df1 and df2
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to intersect with
distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the intersected dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
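INTERSECT ALL is a multiset intersection: each row appears min(count in df1, count in df2) times, whereas INTERSECT DISTINCT returns each common row once. A stdlib sketch using collections.Counter (illustrative rows, not the Fugue API):

```python
from collections import Counter

rows1 = [(1,), (2,), (2,), (3,)]
rows2 = [(2,), (2,), (4,)]

# distinct=True -> INTERSECT (DISTINCT): each common row once
intersect_distinct = sorted(set(rows1) & set(rows2))

# distinct=False -> INTERSECT ALL: multiset intersection
intersect_all = sorted((Counter(rows1) & Counter(rows2)).elements())
```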
- fugue.api.subtract(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
df1 - df2
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to subtract
distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the subtracted dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
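EXCEPT ALL subtracts row counts (each row survives count-in-df1 minus count-in-df2 times), while EXCEPT DISTINCT removes every row that appears anywhere in df2. A stdlib sketch with collections.Counter (illustrative rows, not the Fugue API):

```python
from collections import Counter

rows1 = [(1,), (2,), (2,), (3,)]
rows2 = [(2,), (3,)]

# distinct=True -> EXCEPT (DISTINCT): drop any row present in rows2
except_distinct = sorted(set(rows1) - set(rows2))

# distinct=False -> EXCEPT ALL: subtract multiplicities
except_all = sorted((Counter(rows1) - Counter(rows2)).elements())
```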
- fugue.api.assign(df, engine=None, engine_conf=None, as_fugue=False, as_local=False, **columns)[source]#
Update existing columns with new values and add new columns
- Parameters
df (AnyDataFrame) – the dataframe to set columns
columns (Any) – column expressions
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the updated dataframe
- Return type
AnyDataFrame
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in
fugue.column.sql
andfugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

# assume df has schema: a:int,b:str
with fa.engine_context("duckdb"):
    # add constant column x
    fa.assign(df, x=1)
    # change column b to be a constant integer
    fa.assign(df, b=1)
    # add new x to be a+b
    fa.assign(df, x=col("a")+col("b"))
    # cast column a data type to double
    fa.assign(df, a=col("a").cast(float))
- fugue.api.select(df, *columns, where=None, having=None, distinct=False, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
The functional interface for SQL select statement
- Parameters
df (AnyDataFrame) – the dataframe to be operated on
columns (Union[str, ColumnExpr]) – column expressions, for strings they will represent the column names
where (Optional[ColumnExpr]) – WHERE condition expression, defaults to None
having (Optional[ColumnExpr]) – HAVING condition expression, defaults to None. It is used when columns contains aggregation columns
distinct (bool) – whether to return distinct result, defaults to False
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the select result as a dataframe
- Return type
AnyDataFrame
Attention
This interface is experimental; it is subject to change in new versions.
See also
Please find more expression examples in
fugue.column.sql
andfugue.column.functions
Examples
from fugue.column import all_cols, col, lit, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # select existing and new columns
    fa.select(df, col("a"), col("b"), lit(1, "another"))
    fa.select(df, col("a"), (col("b")+lit(1)).alias("x"))

    # aggregation
    # SELECT COUNT(DISTINCT *) AS x FROM df
    fa.select(df, f.count_distinct(all_cols()).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df GROUP BY a
    fa.select(df, col("a"), f.max(col("b")+lit(1)).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df
    #   WHERE b<2 AND a>1
    #   GROUP BY a
    #   HAVING MAX(b+1)>0
    fa.select(
        df,
        col("a"), f.max(col("b")+lit(1)).alias("x"),
        where=(col("b")<2) & (col("a")>1),
        having=f.max(col("b")+lit(1))>0
    )
- fugue.api.filter(df, condition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Filter rows by the given condition
- Parameters
df (AnyDataFrame) – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the filtered dataframe
- Return type
AnyDataFrame
See also
Please find more expression examples in
fugue.column.sql
andfugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    fa.filter(df, (col("a")>1) & (col("b")=="x"))
    fa.filter(df, f.coalesce(col("a"), col("b"))>1)
- fugue.api.aggregate(df, partition_by=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **agg_kwcols)[source]#
Aggregate on dataframe
- Parameters
df (AnyDataFrame) – the dataframe to aggregate on
partition_by (Union[None, str, List[str]]) – partition key(s), defaults to None
agg_kwcols (ColumnExpr) – aggregation expressions
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the aggregated result as a dataframe
- Return type
AnyDataFrame
See also
Please find more expression examples in
fugue.column.sql
andfugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # SELECT MAX(b) AS b FROM df
    fa.aggregate(df, b=f.max(col("b")))

    # SELECT a, MAX(b) AS x FROM df GROUP BY a
    fa.aggregate(df, "a", x=f.max(col("b")))
Conversion#
- fugue.api.as_local(data)[source]#
Convert the dataset to a local dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
AnyDataset
- fugue.api.as_local_bounded(data)[source]#
Convert the dataset to a local bounded dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
AnyDataset
- fugue.api.as_array(df, columns=None, type_safe=False)[source]#
Convert df to 2-dimensional native python array
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- fugue.api.as_array_iterable(df, columns=None, type_safe=False)[source]#
Convert df to iterable of native python arrays
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- fugue.api.as_arrow(df)[source]#
Convert df to a PyArrow Table
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
- Returns
the PyArrow Table
- Return type
- fugue.api.as_dict_iterable(df, columns=None)[source]#
Convert df to iterable of native python dicts
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (Optional[List[str]]) – columns to extract, defaults to None
- Returns
iterable of native python dicts
- Return type
Iterable[Dict[str, Any]]
Note
The default implementation enforces type_safe to be True
- fugue.api.as_pandas(df)[source]#
Convert df to a Pandas DataFrame
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
- Returns
the Pandas DataFrame
- Return type
- fugue.api.get_native_as_df(df)[source]#
Return the dataframe form of the input df. If df is a DataFrame, native_as_df() is called; otherwise, it depends on whether there is a corresponding function handling it.
- Parameters
df (AnyDataFrame) –
- Return type
AnyDataFrame
ExecutionEngine#
- fugue.api.engine_context(engine=None, engine_conf=None, infer_by=None)[source]#
Make an execution engine and set it as the context engine. This function is thread safe and async safe.
- Parameters
engine (AnyExecutionEngine) – an engine like object, defaults to None
engine_conf (Any) – the configs for the engine, defaults to None
infer_by (Optional[List[Any]]) – a list of objects to infer the engine, defaults to None
- Return type
Iterator[ExecutionEngine]
Note
For more details, please read
make_execution_engine()
Examples
import fugue.api as fa

with fa.engine_context(spark_session):
    transform(df, func)  # will use spark in this transformation
- fugue.api.set_global_engine(engine, engine_conf=None)[source]#
Make an execution engine and set it as the global execution engine
- Parameters
engine (AnyExecutionEngine) – an engine like object, must not be None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
- Return type
Caution
In general, it is not a good practice to set a global engine. You should consider engine_context() instead. The exception is when you iterate in a notebook across cells, where a global engine can simplify the code.
Note
For more details, please read
make_execution_engine()
andset_global()
Examples
import fugue.api as fa

fa.set_global_engine(spark_session)
transform(df, func)  # will use spark in this transformation
fa.clear_global_engine()  # remove the global setting
- fugue.api.clear_global_engine()[source]#
Remove the global execution engine (if set)
- Return type
None
- fugue.api.get_context_engine()[source]#
Get the execution engine in the current context. Regarding the order of the logic please read
make_execution_engine()
- Return type
Big Data Operations#
- fugue.api.broadcast(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Broadcast the dataframe to all workers of a distributed computing backend
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (Optional[AnyExecutionEngine]) – an engine-like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the broadcasted dataframe
- Return type
AnyDataFrame
- fugue.api.persist(df, lazy=False, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
lazy (bool) – True: the first usage of the output triggers persisting; False (eager): persist happens immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the persisted dataframe
- Return type
AnyDataFrame
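The difference between lazy and eager persist can be sketched with a plain-Python memoizing closure — this is only an illustration of the semantics under stated assumptions, not Fugue's implementation:

```python
def persist(compute, lazy=False):
    """Sketch: cache the result of compute(); eager runs it now, lazy on first use."""
    cache = {}

    def materialized():
        if "v" not in cache:
            cache["v"] = compute()  # computed at most once
        return cache["v"]

    if not lazy:
        materialized()  # eager: compute immediately
    return materialized

calls = []
p = persist(lambda: calls.append(1) or "data", lazy=True)
assert calls == []      # lazy: nothing computed yet
assert p() == "data"    # first use triggers the computation
assert p() == "data"    # second use hits the cache
assert calls == [1]     # computed exactly once
```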
- fugue.api.repartition(df, partition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Partition the input dataframe using partition.
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
partition (PartitionSpec) – how you want to partition the dataframe
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the repartitioned dataframe
- Return type
AnyDataFrame
Caution
This function is experimental, and may be removed in the future.
Development#
- fugue.api.run_engine_function(func, engine=None, engine_conf=None, as_fugue=False, as_local=False, infer_by=None)[source]#
Run a lambda function based on the engine provided
- Parameters
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
infer_by (Optional[List[Any]]) – a list of objects to infer the engine, defaults to None
func (Callable[[ExecutionEngine], Any]) –
- Returns
None or a Fugue DataFrame if as_fugue is True; otherwise, if infer_by contains any Fugue DataFrame, the result is a Fugue DataFrame; otherwise the underlying dataframe is returned using native_as_df()
- Return type
Any
Note
This function is for development use. Users should not need it.