Fugue API Docs#
Fugue is a unified interface for distributed computing that lets users execute Python, pandas, and SQL code on Spark, Dask, and Ray with minimal rewrites.
This page is mainly an API reference. To learn more about Fugue, the GitHub repo README and the tutorials are the best places to start; the API reference is intended for users looking up specific functions and methods.
Installation#
Fugue is available on both pip and conda. Detailed instructions can be found on the README.
Community#
Please join the Fugue Slack to ask questions. We will try to reply as soon as possible.
To contribute, start with the contributing guide.
Fugue Tutorials#
To directly read the tutorials without running them:
You may launch a Fugue tutorial notebook environment on Binder.
Note that it runs slowly on Binder because the machines there are not powerful enough for a distributed framework such as Spark. Parallel executions can become sequential, so some of the performance comparison examples will not give you the correct numbers.
Alternatively, you should get decent performance by running the Docker image on your own machine:
docker run -p 8888:8888 fugueproject/tutorials:latest
Top Level User API Reference#
IO#
- fugue.api.as_fugue_dataset(data, **kwargs)[source]#
Wrap the input as a Dataset
- Parameters
data (AnyDataset) – the dataset to be wrapped
kwargs (Any) –
- Return type
Dataset
- fugue.api.as_fugue_df(df, **kwargs)[source]#
Wrap the object as a Fugue DataFrame.
- Parameters
df (AnyDataFrame) – the object to wrap
kwargs (Any) –
- Return type
DataFrame
- fugue.api.as_fugue_engine_df(engine, df, schema=None)[source]#
Convert a dataframe to a Fugue engine dependent DataFrame. This function is used internally by Fugue and is not recommended for direct use.
- Parameters
engine (ExecutionEngine) – the ExecutionEngine to use, must not be None
df (AnyDataFrame) – a dataframe like object
schema (Any) – the schema of the dataframe, defaults to None
- Returns
the engine dependent DataFrame
- Return type
DataFrame
- fugue.api.load(path, format_hint=None, columns=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
an engine compatible dataframe
- Return type
AnyDataFrame
For more details and examples, read Load & Save.
- fugue.api.save(df, path, format_hint=None, mode='overwrite', partition=None, force_single=False, engine=None, engine_conf=None, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition (Optional[Any]) – how to partition the dataframe before saving, defaults to None
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
- Return type
None
For more details and examples, read Load & Save.
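A minimal usage sketch of save() and load(), assuming pandas with parquet support is installed; the path below is just a placeholder:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [0, 1], "b": ["x", "y"]})

# save with the default engine; the format is inferred from the file extension
fa.save(df, "/tmp/fugue_example.parquet", mode="overwrite")

# load it back, optionally selecting a subset of columns
loaded = fa.load("/tmp/fugue_example.parquet", columns=["a"])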
Information#
- fugue.api.count(data)[source]#
The number of elements in the dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
int
- fugue.api.is_bounded(data)[source]#
Whether the dataset is bounded
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
bool
- fugue.api.is_empty(data)[source]#
Whether the dataset is empty
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
bool
- fugue.api.is_local(data)[source]#
Whether the dataset is local
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
bool
- fugue.api.show(data, n=10, with_count=False, title=None)[source]#
Display the Dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
n (int) – number of rows to print, defaults to 10
with_count (bool) – whether to show dataset count, defaults to False
title (Optional[str]) – title of the dataset, defaults to None
- Return type
None
Note
When with_count is True, it can trigger expensive calculation for a distributed dataframe. So if you call this function directly, you may need to fugue.execution.execution_engine.ExecutionEngine.persist() the dataset.
- fugue.api.get_column_names(df)[source]#
A generic function to get column names of any dataframe
- Parameters
df (AnyDataFrame) – the dataframe object
- Returns
the column names
- Return type
List[Any]
Note
In order to support a new type of dataframe, an implementation must be registered.
- fugue.api.get_num_partitions(data)[source]#
Get the number of partitions of the dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
int
- fugue.api.get_schema(df)[source]#
Get the schema of the df
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
- Returns
the Schema object
- Return type
Schema
- fugue.api.is_df(df)[source]#
Whether df is a DataFrame like object
- Parameters
df (Any) –
- Return type
bool
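A quick sketch of these information functions on a pandas dataframe, assuming the default non-distributed engine:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

fa.is_df(df)             # True
fa.count(df)             # 2
fa.is_local(df)          # True for a pandas dataframe
fa.get_column_names(df)  # ["a", "b"]
fa.get_schema(df)        # Schema object: a:long,b:str
fa.show(df, n=5, with_count=True, title="sample")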
Transformation#
- fugue.api.transform(df, using, schema=None, params=None, partition=None, callback=None, ignore_errors=None, persist=False, as_local=False, save_path=None, checkpoint=False, engine=None, engine_conf=None, as_fugue=False)[source]#
Transform this dataframe using transformer. It’s a wrapper of transform() and run(). It will let you do the basic dataframe transformation without using FugueWorkflow and DataFrame. Also, only native types are accepted for both input and output.
Please read the Transformer Tutorial
- Parameters
df (Any) – DataFrame like object or Yielded or a path string to a parquet file
using (Any) – transformer-like object, can’t be a string expression
schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()
partition (Optional[Any]) – Partition like object, defaults to None
callback (Optional[Any]) – RPChandler like object, defaults to None
ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)
engine (Optional[Any]) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, or the ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the sql engine (you can use None for either of them to use the default one), defaults to None
engine_conf (Optional[Any]) – Parameters like object, defaults to None
as_fugue (bool) – If true, the function will always return a FugueDataFrame, otherwise, if df is in native dataframe types such as pandas dataframe, then the output will also be in its native format. Defaults to False
persist (bool) – Whether to persist (materialize) the dataframe before returning
as_local (bool) – If true, the result will be converted to a LocalDataFrame
save_path (Optional[str]) – Whether to save the output to a file (see the note)
checkpoint (bool) – Whether to add a checkpoint for the output (see the note)
- Returns
the transformed dataframe, if df is a native dataframe (e.g. pd.DataFrame, spark dataframe, etc), the output will be a native dataframe, the type is determined by the execution engine you use. But if df is of type DataFrame, then the output will also be a DataFrame
- Return type
Any
Note
This function may be lazy and return the transformed dataframe.
Note
When you use a callback in this function, you must make sure the output dataframe is materialized. Otherwise, if the real compute happens outside of the function call, the callback receiver will already be shut down. To do that you can use either persist or as_local; both will materialize the dataframe before the callback receiver shuts down.
Note
When save_path is None and checkpoint is False, then the output will not be saved into a file. The return will be a dataframe.
When save_path is None and checkpoint is True, then the output is saved into the path set by fugue.workflow.checkpoint.path, the name will be randomly chosen, and it is NOT a deterministic checkpoint, so if you run multiple times, the output will be saved into different files. The return will be a dataframe.
When save_path is not None and checkpoint is False, then the output will be saved into save_path. The return will be the value of save_path.
When save_path is not None and checkpoint is True, then the output will be saved into save_path. The return will be the dataframe from save_path.
This function can only take parquet file paths in df and save_path. CSV and other file formats are disallowed.
The checkpoint here is NOT deterministic, so re-run will generate new checkpoints.
If you want to read and write other file formats or if you want to use deterministic checkpoints, please use FugueWorkflow.
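A minimal sketch of transform(), assuming only pandas is installed; the same call can be pointed at a distributed backend through the engine argument (e.g. engine="spark", which requires the Spark extra):

import pandas as pd
import fugue.api as fa

# a simple transformer; the output schema is declared in the transform call
def add_one(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b=df["a"] + 1)

df = pd.DataFrame({"a": [0, 1, 2]})

# runs on the default engine and returns a pandas DataFrame
res = fa.transform(df, add_one, schema="*,b:long")

# the identical call could run distributedly, e.g.:
# res = fa.transform(df, add_one, schema="*,b:long", engine="spark")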
- fugue.api.out_transform(df, using, params=None, partition=None, callback=None, ignore_errors=None, engine=None, engine_conf=None)[source]#
Transform this dataframe using transformer. It’s a wrapper of out_transform() and run(). It will let you do the basic dataframe transformation without using FugueWorkflow and DataFrame. Only native types are accepted for both input and output.
Please read the Transformer Tutorial
- Parameters
df (Any) – DataFrame like object or Yielded or a path string to a parquet file
using (Any) – transformer-like object, can’t be a string expression
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()
partition (Optional[Any]) – Partition like object, defaults to None
callback (Optional[Any]) – RPChandler like object, defaults to None
ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)
engine (Optional[Any]) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, or the ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the sql engine (you can use None for either of them to use the default one), defaults to None
engine_conf (Optional[Any]) – Parameters like object, defaults to None
- Return type
None
Note
This function can only take parquet file paths in df. CSV and JSON file formats are disallowed.
This transformation is guaranteed to execute immediately (eager) and return nothing
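A minimal sketch of out_transform(), assuming pandas; the transformer returns nothing and runs eagerly once per partition:

import pandas as pd
import fugue.api as fa

# an output-style transformer: consumes each partition, returns nothing
def show_partition(df: pd.DataFrame) -> None:
    print(df.head(2))

df = pd.DataFrame({"g": ["x", "x", "y"], "v": [1, 2, 3]})

# run the side-effect transformer on each partition of g
fa.out_transform(df, show_partition, partition={"by": "g"})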
- fugue.api.alter_columns(df, columns, as_fugue=False)[source]#
Change column types
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
as_fugue (bool) – whether to return a Fugue DataFrame, default to False. If False, then if the input df is not a Fugue DataFrame then it will return the underlying DataFrame object.
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
AnyDataFrame
- fugue.api.drop_columns(df, columns, as_fugue=False)[source]#
Drop certain columns and return a new dataframe
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (List[str]) – columns to drop
as_fugue (bool) – whether to return a Fugue DataFrame, default to False. If False, then if the input df is not a Fugue DataFrame then it will return the underlying DataFrame object.
- Returns
a new dataframe removing the columns
- Return type
AnyDataFrame
- fugue.api.head(df, n, columns=None, as_fugue=False)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
as_fugue (bool) – whether to return a Fugue DataFrame, default to False. If False, then if the input df is not a Fugue DataFrame then it will return the underlying DataFrame object.
df (AnyDataFrame) –
- Returns
a local bounded dataframe
- Return type
AnyDataFrame
- fugue.api.normalize_column_names(df)[source]#
A generic function to normalize any dataframe’s column names to follow Fugue naming rules
Note
This is a temporary solution before Schema can take arbitrary names
Examples
[0,1] => {"_0":0, "_1":1}
["1a","2b"] => {"_1a":"1a", "_2b":"2b"}
["*a","-a"] => {"_a":"*a", "_a_1":"-a"}
- Parameters
df (AnyDataFrame) – a dataframe object
- Returns
the renamed dataframe and the rename operations as a dict that can undo the change
- Return type
Tuple[AnyDataFrame, Dict[str, Any]]
See also
- fugue.api.rename(df, columns, as_fugue=False)[source]#
A generic function to rename column names of any dataframe
- Parameters
df (AnyDataFrame) – the dataframe object
columns (Dict[str, Any]) – the rename operations as a dict:
old name => new name
as_fugue (bool) – whether to return a Fugue DataFrame, default to False. If False, then if the input df is not a Fugue DataFrame then it will return the underlying DataFrame object.
- Returns
the renamed dataframe
- Return type
AnyDataFrame
Note
In order to support a new type of dataframe, an implementation must be registered.
- fugue.api.select_columns(df, columns, as_fugue=False)[source]#
Select certain columns and return a new dataframe
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (List[Any]) – columns to return
as_fugue (bool) – whether to return a Fugue DataFrame, default to False. If False, then if the input df is not a Fugue DataFrame then it will return the underlying DataFrame object.
- Returns
a new dataframe with the selected columns
- Return type
AnyDataFrame
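A minimal sketch combining the column-level helpers above on a pandas dataframe (default engine assumed):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"], "c": [0.1, 0.2]})

fa.rename(df, {"a": "aa"})         # rename column a to aa
fa.drop_columns(df, ["c"])         # remove column c
fa.select_columns(df, ["a", "b"])  # keep only a and b
fa.alter_columns(df, "a:str")      # cast column a to string
fa.head(df, 1, columns=["a"])      # first row, column a only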
- fugue.api.distinct(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Equivalent to SELECT DISTINCT * FROM df
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the result with distinct rows
- Return type
AnyDataFrame
- fugue.api.dropna(df, how='any', thresh=None, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Drop NA records from dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
DataFrame with NA records dropped
- Return type
AnyDataFrame
- fugue.api.fillna(df, value, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
DataFrame with NA records filled
- Return type
AnyDataFrame
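A minimal sketch of dropna() and fillna() on a pandas dataframe (default engine assumed):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1.0, None, 3.0], "b": ["x", None, None]})

fa.dropna(df)                            # drop rows containing any null
fa.dropna(df, how="all")                 # drop rows where every value is null
fa.dropna(df, thresh=2)                  # keep rows with at least 2 non-null values
fa.fillna(df, 0, subset=["a"])           # fill nulls in column a with 0
fa.fillna(df, {"a": 0, "b": "missing"})  # per-column replacement values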
- fugue.api.sample(df, n=None, frac=None, replace=False, seed=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the sampled dataframe
- Return type
AnyDataFrame
- fugue.api.take(df, n, presort, na_position='last', partition=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
partition (Optional[Any]) – PartitionSpec to apply the take operation, defaults to None
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
n rows of DataFrame per partition
- Return type
AnyDataFrame
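A minimal sketch of sample() and take() with a per-partition presort (pandas, default engine assumed):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"g": ["x", "x", "y", "y"], "v": [3, 1, 4, 2]})

# roughly half of the rows, reproducible via the seed
fa.sample(df, frac=0.5, seed=0)

# the top row per group g, ordered by v descending within each group
fa.take(df, 1, presort="v desc", partition={"by": "g"})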
SQL#
- fugue.api.fugue_sql(query, *args, fsql_ignore_case=None, fsql_dialect=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Simplified Fugue SQL interface. This function can still take multiple dataframe inputs but will always return the last generated dataframe in the SQL workflow. And YIELD should NOT be used with this function. If you want to use Fugue SQL to represent the full workflow, or want to see more Fugue SQL examples, please read fugue_sql_flow().
- Parameters
query (str) – the Fugue SQL string (can be a jinja template)
args (Any) – variables related to the SQL string
fsql_ignore_case (Optional[bool]) – whether to ignore case when parsing the SQL string, defaults to None (it depends on the engine/global config).
fsql_dialect (Optional[str]) – the dialect of this fsql, defaults to None (it depends on the engine/global config).
kwargs (Any) – variables related to the SQL string
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether return a local dataframe, defaults to False
- Returns
the result dataframe
- Return type
AnyDataFrame
Note
This function is different from raw_sql() which directly sends the query to the execution engine to run. This function parses the query based on Fugue SQL syntax, creates a FugueSQLWorkflow which could contain multiple raw SQLs plus other operations, and runs and returns the last dataframe generated in the workflow.
This function allows you to parameterize the SQL in a more elegant way. The data tables referred to in the query can either be automatically extracted from the local variables or be specified in the arguments.
Caution
Currently, we have not unified the dialects of different SQL backends, so there can be some slight syntax differences when you switch between backends. In addition, we have not unified the UDFs across different backends, so you should be careful when using uncommon UDFs belonging to a certain backend.
That being said, if you keep your SQL part general and leverage Fugue extensions (transformer, creator, processor, outputter, etc.) appropriately, it should be easy to write backend agnostic Fugue SQL.
We are working on unifying the dialects of different SQLs; it should be available in future releases. Regarding unifying UDFs, the effort is still unclear.
import pandas as pd
import fugue.api as fa

def tr(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(c=2)

input = pd.DataFrame([[0,1],[3,4]], columns=["a","b"])

with fa.engine_context("duckdb"):
    res = fa.fugue_sql('''
    SELECT * FROM input WHERE a<{{x}}
    TRANSFORM USING tr SCHEMA *,c:int
    ''', x=2)
    assert fa.as_array(res) == [[0,1,2]]
- fugue.api.fugue_sql_flow(query, *args, fsql_ignore_case=None, fsql_dialect=None, **kwargs)[source]#
Fugue SQL full functional interface. This function allows full workflow definition using Fugue SQL, and it allows multiple outputs using YIELD.
- Parameters
query (str) – the Fugue SQL string (can be a jinja template)
args (Any) – variables related to the SQL string
fsql_ignore_case (Optional[bool]) – whether to ignore case when parsing the SQL string, defaults to None (it depends on the engine/global config).
fsql_dialect (Optional[str]) – the dialect of this fsql, defaults to None (it depends on the engine/global config).
kwargs (Any) – variables related to the SQL string
- Returns
the translated Fugue workflow
- Return type
Note
This function is different from raw_sql() which directly sends the query to the execution engine to run. This function parses the query based on Fugue SQL syntax, creates a FugueSQLWorkflow which could contain multiple raw SQLs plus other operations, and runs and returns the last dataframe generated in the workflow.
This function allows you to parameterize the SQL in a more elegant way. The data tables referred to in the query can either be automatically extracted from the local variables or be specified in the arguments.
Caution
Currently, we have not unified the dialects of different SQL backends, so there can be some slight syntax differences when you switch between backends. In addition, we have not unified the UDFs across different backends, so you should be careful when using uncommon UDFs belonging to a certain backend.
That being said, if you keep your SQL part general and leverage Fugue extensions (transformer, creator, processor, outputter, etc.) appropriately, it should be easy to write backend agnostic Fugue SQL.
We are working on unifying the dialects of different SQLs; it should be available in future releases. Regarding unifying UDFs, the effort is still unclear.
from fugue.api import fugue_sql_flow as fsql
import fugue.api as fa
import pandas as pd

# Basic case
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run()

# With external data sources
df = pd.DataFrame([[0],[1]], columns=["a"])
fsql('''
SELECT * FROM df WHERE a=0
PRINT
''').run()

# With external variables
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''').run()

# The following is the explicit way to specify variables and dataframes
# (recommended)
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''', df=df, t=t).run()

# Using extensions
def dummy(df:pd.DataFrame) -> pd.DataFrame:
    return df

fsql('''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
''').run()

# It's recommended to provide the full path of the extension inside
# Fugue SQL, so the SQL definition and execution can be more
# independent from the extension definition.

# Run with different execution engines
sql = '''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
'''
fsql(sql).run(spark_session)
fsql(sql).run("dask")
with fa.engine_context("duckdb"):
    fsql(sql).run()

# Passing dataframes between fsql calls
result = fsql('''
CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x
CREATE [[1]] SCHEMA a:int YIELD DATAFRAME AS y
''').run(DaskExecutionEngine)

fsql('''
SELECT * FROM x
UNION
SELECT * FROM y
UNION
SELECT * FROM z
PRINT
''', result, z=pd.DataFrame([[2]], columns=["a"])).run()

# Get framework native dataframes
result["x"].native       # Dask dataframe
result["y"].native       # Dask dataframe
result["x"].as_pandas()  # Pandas dataframe

# Use lower case fugue sql
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
select * from df where a={{t}}
print
''', df=df, t=t, fsql_ignore_case=True).run()
- fugue.api.raw_sql(*statements, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Run raw SQL on the execution engine
- Parameters
statements (Any) – a sequence of sub-statements in string or dataframe-like objects
engine (Optional[Any]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether return a local dataframe, defaults to False
- Returns
the result dataframe
- Return type
AnyDataFrame
Caution
Currently, only SELECT statements are supported
Examples
import pandas as pd
import fugue.api as fa

with fa.engine_context("duckdb"):
    a = fa.as_fugue_df([[0,1]], schema="a:long,b:long")
    b = pd.DataFrame([[0,10]], columns=["a","b"])
    c = fa.raw_sql("SELECT * FROM",a,"UNION SELECT * FROM",b)
    fa.as_pandas(c)
- fugue.api.join(df1, df2, *dfs, how, on=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Join two dataframes
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
Note
Please read get_join_schemas()
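A minimal sketch of join() and its wrappers on pandas dataframes (default engine assumed); the join keys are inferred from the shared column a:

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [0, 1], "b": ["x", "y"]})
df2 = pd.DataFrame({"a": [1, 2], "c": [10, 20]})

fa.join(df1, df2, how="inner")                 # keys inferred from the common column a
fa.join(df1, df2, how="left_outer", on=["a"])  # explicit keys are validated
fa.inner_join(df1, df2)                        # wrapper equivalent to how="inner"
fa.anti_join(df1, df2)                         # rows of df1 with no match in df2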
- fugue.api.semi_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left semi-join two dataframes. This is a wrapper of join() with how="semi"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.anti_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left anti-join two dataframes. This is a wrapper of join() with how="anti"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.inner_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Inner join two dataframes. This is a wrapper of join() with how="inner"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
how – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.left_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left outer join two dataframes. This is a wrapper of join() with how="left_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.right_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Right outer join two dataframes. This is a wrapper of join() with how="right_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.full_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Full outer join two dataframes. This is a wrapper of join() with how="full_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.cross_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Cross join two dataframes. This is a wrapper of join() with how="cross"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.api.union(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Union two dataframes
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to union
distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the unioned dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
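A minimal sketch of union(); the inputs must share the same schema (pandas, default engine assumed):

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [0, 1]})
df2 = pd.DataFrame({"a": [1, 2]})

fa.union(df1, df2)                  # UNION (distinct rows)
fa.union(df1, df2, distinct=False)  # UNION ALL (keeps duplicates)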
- fugue.api.intersect(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Intersect df1 and df2
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to intersect with
distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the intersected dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- fugue.api.subtract(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
df1 - df2
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to subtract
distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the subtracted dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
- fugue.api.assign(df, engine=None, engine_conf=None, as_fugue=False, as_local=False, **columns)[source]#
Update existing columns with new values and add new columns
- Parameters
df (AnyDataFrame) – the dataframe to set columns
columns (Any) – column expressions
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the updated dataframe
- Return type
AnyDataFrame
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

# assume df has schema: a:int,b:str

with fa.engine_context("duckdb"):
    # add constant column x
    fa.assign(df, x=1)

    # change column b to be a constant integer
    fa.assign(df, b=1)

    # add new x to be a+b
    fa.assign(df, x=col("a")+col("b"))

    # cast column a data type to double
    fa.assign(df, a=col("a").cast(float))
- fugue.api.select(df, *columns, where=None, having=None, distinct=False, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
The functional interface for SQL select statement
- Parameters
df (AnyDataFrame) – the dataframe to be operated on
columns (Union[str, ColumnExpr]) – column expressions, for strings they will represent the column names
where (Optional[ColumnExpr]) – WHERE condition expression, defaults to None
having (Optional[ColumnExpr]) – HAVING condition expression, defaults to None. It is used when cols contains aggregation columns, defaults to None
distinct (bool) – whether to return distinct result, defaults to False
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the select result as a dataframe
- Return type
AnyDataFrame
Attention
This interface is experimental; it’s subject to change in new versions.
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import all_cols, col, lit, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # select existing and new columns
    fa.select(df, col("a"), col("b"), lit(1,"another"))
    fa.select(df, col("a"), (col("b")+lit(1)).alias("x"))

    # aggregation
    # SELECT COUNT(DISTINCT *) AS x FROM df
    fa.select(df, f.count_distinct(all_cols()).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df GROUP BY a
    fa.select(df, col("a"), f.max(col("b")+lit(1)).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df
    #   WHERE b<2 AND a>1
    #   GROUP BY a
    #   HAVING MAX(b+1)>0
    fa.select(
        df,
        col("a"), f.max(col("b")+lit(1)).alias("x"),
        where=(col("b")<2) & (col("a")>1),
        having=f.max(col("b")+lit(1))>0
    )
- fugue.api.filter(df, condition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Filter rows by the given condition
- Parameters
df (AnyDataFrame) – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the filtered dataframe
- Return type
AnyDataFrame
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    fa.filter(df, (col("a")>1) & (col("b")=="x"))
    fa.filter(df, f.coalesce(col("a"),col("b"))>1)
- fugue.api.aggregate(df, partition_by=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **agg_kwcols)[source]#
Aggregate on dataframe
- Parameters
df (AnyDataFrame) – the dataframe to aggregate on
partition_by (Union[None, str, List[str]]) – partition key(s), defaults to None
agg_kwcols (ColumnExpr) – aggregation expressions
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the aggregated result as a dataframe
- Return type
AnyDataFrame
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # SELECT MAX(b) AS b FROM df
    fa.aggregate(df, b=f.max(col("b")))

    # SELECT a, MAX(b) AS x FROM df GROUP BY a
    fa.aggregate(df, "a", x=f.max(col("b")))
Conversion#
- fugue.api.as_local(data)[source]#
Convert the dataset to a local dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
AnyDataset
- fugue.api.as_local_bounded(data)[source]#
Convert the dataset to a local bounded dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
- Return type
AnyDataset
- fugue.api.as_array(df, columns=None, type_safe=False)[source]#
Convert df to 2-dimensional native python array
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If type_safe is False, then the returned values are ‘raw’ values.
- fugue.api.as_array_iterable(df, columns=None, type_safe=False)[source]#
Convert df to iterable of native python arrays
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If type_safe is False, then the returned values are ‘raw’ values.
- fugue.api.as_arrow(df)[source]#
Convert df to a PyArrow Table
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
- Returns
the PyArrow Table
- Return type
- fugue.api.as_dict_iterable(df, columns=None)[source]#
Convert df to iterable of native python dicts
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
columns (Optional[List[str]]) – columns to extract, defaults to None
- Returns
iterable of native python dicts
- Return type
Iterable[Dict[str, Any]]
Note
The default implementation enforces type_safe=True
- fugue.api.as_pandas(df)[source]#
Convert df to a Pandas DataFrame
- Parameters
df (AnyDataFrame) – the object that can be recognized as a dataframe by Fugue
- Returns
the Pandas DataFrame
- Return type
- fugue.api.get_native_as_df(df)[source]#
Return the dataframe form of the input df. If df is a DataFrame, then call the native_as_df(), otherwise, it depends on whether there is a corresponding function handling it.
- Parameters
df (AnyDataFrame) –
- Return type
AnyDataFrame
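A minimal sketch of the conversion helpers, starting from a Fugue DataFrame built with as_fugue_df (pandas and pyarrow assumed installed):

import fugue.api as fa

df = fa.as_fugue_df([[0, "x"], [1, "y"]], schema="a:long,b:str")

fa.as_pandas(df)               # pandas DataFrame
fa.as_arrow(df)                # pyarrow Table
fa.as_array(df)                # [[0, "x"], [1, "y"]]
list(fa.as_dict_iterable(df))  # [{"a": 0, "b": "x"}, {"a": 1, "b": "y"}]
fa.get_native_as_df(df)        # the underlying engine-native dataframe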
ExecutionEngine#
- fugue.api.engine_context(engine=None, engine_conf=None, infer_by=None)[source]#
Make an execution engine and set it as the context engine. This function is thread safe and async safe.
- Parameters
engine (AnyExecutionEngine) – an engine like object, defaults to None
engine_conf (Any) – the configs for the engine, defaults to None
infer_by (Optional[List[Any]]) – a list of objects to infer the engine, defaults to None
- Return type
Iterator[ExecutionEngine]
Note
For more details, please read make_execution_engine()
Examples
import fugue.api as fa

with fa.engine_context(spark_session):
    transform(df, func)  # will use spark in this transformation
- fugue.api.set_global_engine(engine, engine_conf=None)[source]#
Make an execution engine and set it as the global execution engine
- Parameters
engine (AnyExecutionEngine) – an engine like object, must not be None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
- Return type
Caution
In general, it is not a good practice to set a global engine. You should consider engine_context() instead. The exception is when you iterate in a notebook across cells, where this could simplify the code.
Note
For more details, please read make_execution_engine() and set_global()
Examples
import fugue.api as fa

fa.set_global_engine(spark_session)
transform(df, func)  # will use spark in this transformation
fa.clear_global_engine()  # remove the global setting
- fugue.api.clear_global_engine()[source]#
Remove the global execution engine (if set)
- Return type
None
- fugue.api.get_context_engine()[source]#
Get the execution engine in the current context. Regarding the order of the logic, please read make_execution_engine()
- Return type
Big Data Operations#
- fugue.api.broadcast(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Broadcast the dataframe to all workers of a distributed computing backend
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (Optional[AnyExecutionEngine]) – an engine-like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the broadcasted dataframe
- Return type
AnyDataFrame
- fugue.api.persist(df, lazy=False, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Default to False
kwargs (Any) – parameter to pass to the underlying persist implementation
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the persisted dataframe
- Return type
AnyDataFrame
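A minimal sketch with pandas and the default engine, where these calls are inexpensive; the same code becomes meaningful caching and broadcast hints on Spark, Dask or Ray:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [0, 1, 2]})

cached = fa.persist(df)                  # materialize and cache eagerly
lazy_cached = fa.persist(df, lazy=True)  # cache on first usage of the output
small = fa.broadcast(df)                 # mark the dataframe for broadcasting to workers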
- fugue.api.repartition(df, partition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Partition the input dataframe using partition.
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
partition (PartitionSpec) – how you want to partition the dataframe
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the repartitioned dataframe
- Return type
AnyDataFrame
Caution
This function is experimental, and may be removed in the future.
Development#
- fugue.api.run_engine_function(func, engine=None, engine_conf=None, as_fugue=False, as_local=False, infer_by=None)[source]#
Run a lambda function based on the engine provided
- Parameters
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
infer_by (Optional[List[Any]]) – a list of objects to infer the engine, defaults to None
func (Callable[[ExecutionEngine], Any]) –
- Returns
None or a Fugue DataFrame if as_fugue is True, otherwise if infer_by contains any Fugue DataFrame, then return the Fugue DataFrame, otherwise it returns the underlying dataframe using native_as_df()
- Return type
Any
Note
This function is for development use. Users should not need it.
API Reference#
fugue#
fugue.bag#
fugue.bag.array_bag#
- class fugue.bag.array_bag.ArrayBag(data, copy=True)[source]#
Bases:
LocalBoundedBag
- Parameters
data (Any) –
copy (bool) –
- as_array()[source]#
Convert to a native python array
- Returns
the native python array
- Return type
List[Any]
- property empty: bool#
Whether this dataframe is empty
- head(n)[source]#
Take the first n elements
- Returns
the python array of the first n elements
- Parameters
n (int) –
- Return type
- property native: List[Any]#
The underlying Python list object
- peek()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
Any
fugue.bag.bag#
- class fugue.bag.bag.Bag[source]#
Bases:
Dataset
The base class of Fugue Bags. Bag contains a collection of unordered objects.
- abstract as_array()[source]#
Convert to a native python array
- Returns
the native python array
- Return type
List[Any]
- abstract as_local_bounded()[source]#
Convert this bag to a
LocalBoundedBag
- Return type
- abstract head(n)[source]#
Take the first n elements
- Returns
the python array of the first n elements
- Parameters
n (int) –
- Return type
- abstract peek()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
Any
- class fugue.bag.bag.BagDisplay(ds)[source]#
Bases:
DatasetDisplay
Bag plain display class
- Parameters
ds (Dataset) –
- class fugue.bag.bag.LocalBag[source]#
Bases:
Bag
- property is_local: bool#
Whether this dataframe is a local Dataset
- property num_partitions: int#
Number of physical partitions of this dataframe. Please read the Partition Tutorial
fugue.collections#
fugue.collections.partition#
- class fugue.collections.partition.BagPartitionCursor(physical_partition_no)[source]#
Bases:
DatasetPartitionCursor
The cursor pointing at the first bag item of each logical partition inside a physical partition.
It’s important to understand the concept of partition, please read the Partition Tutorial
- Parameters
physical_partition_no (int) – physical partition number passed in by
ExecutionEngine
- class fugue.collections.partition.DatasetPartitionCursor(physical_partition_no)[source]#
Bases:
object
The cursor pointing at the first item of each logical partition inside a physical partition.
It’s important to understand the concept of partition, please read the Partition Tutorial
- Parameters
physical_partition_no (int) – physical partition number passed in by
ExecutionEngine
- property item: Any#
Get current item
- property partition_no: int#
Logical partition number
- property physical_partition_no: int#
Physical partition number
- set(item, partition_no, slice_no)[source]#
reset the cursor to a row (which should be the first row of a new logical partition)
- Parameters
item (Any) – an item of the dataset, or an function generating the item
partition_no (int) – logical partition number
slice_no (int) – slice number inside the logical partition (to be deprecated)
- Return type
None
- property slice_no: int#
Slice number (inside the current logical partition), for now it should always be 0
- class fugue.collections.partition.PartitionCursor(schema, spec, physical_partition_no)[source]#
Bases:
DatasetPartitionCursor
The cursor pointing at the first row of each logical partition inside a physical partition.
It’s important to understand the concept of partition, please read the Partition Tutorial
- Parameters
schema (Schema) – input dataframe schema
spec (PartitionSpec) – partition spec
physical_partition_no (int) – physical partition number passed in by
ExecutionEngine
- property key_value_array: List[Any]#
Based on current row, get the partition key values as an array
- property key_value_dict: Dict[str, Any]#
Based on current row, get the partition key values as a dict
- property row: List[Any]#
Get current row data
- set(row, partition_no, slice_no)[source]#
reset the cursor to a row (which should be the first row of a new logical partition)
- Parameters
row (Any) – list-like row data or a function generating a list-like row
partition_no (int) – logical partition number
slice_no (int) – slice number inside the logical partition (to be deprecated)
- Return type
None
- class fugue.collections.partition.PartitionSpec(*args, **kwargs)[source]#
Bases:
object
Fugue Partition Specification.
Examples
>>> PartitionSpec(num=4)
>>> PartitionSpec(4)  # == PartitionSpec(num=4)
>>> PartitionSpec(num="ROWCOUNT/4 + 3")  # It can be an expression
>>> PartitionSpec(by=["a","b"])
>>> PartitionSpec(["a","b"])  # == PartitionSpec(by=["a","b"])
>>> PartitionSpec(by=["a"], presort="b DESC, c ASC")
>>> PartitionSpec(algo="even", num=4)
>>> p = PartitionSpec(num=4, by=["a"])
>>> p_override = PartitionSpec(p, by=["a","b"], algo="even")
>>> PartitionSpec(by="a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("per_row")  # == PartitionSpec(num="ROWCOUNT", algo="even")
It’s important to understand this concept, please read the Partition Tutorial
Partition consists of these specs:
algo: can be one of hash (default), rand, even or coarse
num or num_partitions: number of physical partitions, it can be an expression or integer numbers, e.g. (ROWCOUNT+4) / 3
by or partition_by: keys to partition on
presort: keys to sort other than partition keys. E.g. a and a asc mean presort by column a ascendingly, a,b desc means presort by a ascendingly and then by b descendingly
row_limit and size_limit are to be deprecated
- Parameters
args (Any) –
kwargs (Any) –
- property algo: str#
Get algo of the spec, one of hash (default), rand, even or coarse
- property empty: bool#
Whether this spec didn’t specify anything
- get_cursor(schema, physical_partition_no)[source]#
Get PartitionCursor based on dataframe schema and physical partition number. You normally don’t call this method directly
- Parameters
schema (Schema) – the dataframe schema this partition spec to operate on
physical_partition_no (int) – physical partition no passed in by
ExecutionEngine
- Returns
PartitionCursor object
- Return type
PartitionCursor
- get_num_partitions(**expr_map_funcs)[source]#
Convert num_partitions expression to int number
- Parameters
expr_map_funcs (Any) – lambda functions (no parameter) for keywords
- Returns
integer value of the partitions
- Return type
int
Examples
>>> p = PartitionSpec(num="ROWCOUNT/2")
>>> p.get_num_partitions(ROWCOUNT=lambda: df.count())
- get_partitioner(schema)[source]#
Get SchemaedDataPartitioner by input dataframe schema
- Parameters
schema (Schema) – the dataframe schema this partition spec to operate on
- Returns
SchemaedDataPartitioner object
- Return type
SchemaedDataPartitioner
- get_sorts(schema, with_partition_keys=True)[source]#
Get keys for sorting in a partition, it’s the combination of partition keys plus the presort keys
- Parameters
schema (Schema) – the dataframe schema this partition spec to operate on
with_partition_keys (bool) – whether to include partition keys
- Returns
an ordered dictionary of key, order pairs
- Return type
IndexedOrderedDict[str, bool]
Examples
>>> p = PartitionSpec(by=["a"], presort="b , c dESc")
>>> schema = Schema("a:int,b:int,c:int,d:int")
>>> assert p.get_sorts(schema) == {"a":True, "b":True, "c": False}
- property num_partitions: str#
Number of partitions, it can be a string expression or int
- property partition_by: List[str]#
Get partition keys of the spec
- property presort: IndexedOrderedDict[str, bool]#
Get presort pairs of the spec
Examples
>>> p = PartitionSpec(by=["a"], presort="b,c desc")
>>> assert p.presort == {"b":True, "c":False}
- property presort_expr: str#
Get normalized presort expression
Examples
>>> p = PartitionSpec(by=["a"], presort="b , c dESc")
>>> assert p.presort_expr == "b ASC,c DESC"
- fugue.collections.partition.parse_presort_exp(presort)[source]#
Returns the ordered column sorting directions, where ascending order is returned as True and descending as False.
- Parameters
presort (Any) – string that contains column and sorting direction or list of tuple that contains column and boolean sorting direction
- Returns
column and boolean sorting direction of column, order matters.
- Return type
IndexedOrderedDict[str, bool]
Examples
>>> parse_presort_exp("b asc, c desc")
>>> parse_presort_exp([("b", True), ("c", False)])
both return IndexedOrderedDict([("b", True), ("c", False)])
fugue.collections.sql#
- class fugue.collections.sql.StructuredRawSQL(statements, dialect=None)[source]#
Bases:
object
The Raw SQL object containing table references and dialect information.
- Parameters
statements (Iterable[Tuple[bool, str]]) – In each tuple, the first value indicates whether the second value is a dataframe name reference (True), or just a part of the statement (False)
dialect (Optional[str]) – the dialect of the statements, defaults to None
Note
A dialect of None means no transpilation will be done when constructing the final SQL.
- construct(name_map=None, dialect=None, log=None)[source]#
Construct the final SQL given the dialect
- Parameters
name_map (Union[None, Callable[[str], str], Dict[str, str]]) – the name map from the original statement to the expected names, defaults to None. It can be a function or a dictionary
dialect (Optional[str]) – the expected dialect, defaults to None
log (Optional[Logger]) – the logger to log information, defaults to None
- Returns
the final SQL string
- property dialect: Optional[str]#
The dialect of this query
- static from_expr(sql, prefix='<tmpdf:', suffix='>', dialect=None)[source]#
Parse the
StructuredRawSQL
from the sql expression. The sql should look like SELECT * FROM <tmpdf:dfname>. This function can identify the tmpdfs with the given syntax, and construct the StructuredRawSQL
- Parameters
sql (str) – the SQL expression with
<tmpdf:?>
prefix (str) – the prefix of the temp df
suffix (str) – the suffix of the temp df
dialect (Optional[str]) – the dialect of the sql expression, defaults to None
- Returns
the parsed object
- Return type
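As a rough illustration of from_expr() and construct() together (a minimal sketch based only on the signatures above; the exact output formatting depends on the dialect handling):

from fugue.collections.sql import StructuredRawSQL

# parse a statement that references a temp dataframe using the <tmpdf:?> syntax
raw = StructuredRawSQL.from_expr("SELECT * FROM <tmpdf:df1> WHERE a > 1")

# render the final SQL, mapping the temp name to a real table name;
# with dialect=None no transpilation is attempted
sql = raw.construct(name_map={"df1": "my_table"})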
fugue.collections.yielded#
- class fugue.collections.yielded.PhysicalYielded(yid, storage_type)[source]#
Bases:
Yielded
Physical yielded object from
FugueWorkflow
. Users shouldn’t create this object directly.
- Parameters
yid (str) – unique id for determinism
storage_type (str) –
file
ortable
- property is_set: bool#
Whether the value is set. It can be false if the parent workflow has not been executed.
- property name: str#
The name reference of the yield
- set_value(name)[source]#
Set the storage name after compute
- Parameters
name (str) – name reference of the storage
- Return type
None
- property storage_type: str#
The storage type of this yield
- class fugue.collections.yielded.Yielded(yid)[source]#
Bases:
object
Yields from
FugueWorkflow
. Users shouldn’t create this object directly.- Parameters
yid (str) – unique id for determinism
- property is_set: bool#
Whether the value is set. It can be false if the parent workflow has not been executed.
fugue.column#
fugue.column.expressions#
- class fugue.column.expressions.ColumnExpr[source]#
Bases:
object
Fugue column expression class. It is inspired by
pyspark.sql.Column
and it is a work in progress.
New Since
0.6.0
Caution
This is a base class of different column classes, and users are not supposed to construct this class directly. Use
col()
and lit() instead.
- alias(as_name)[source]#
Assign or remove alias of a column. To remove, set
as_name
to empty
- Returns
a new column with the alias value
- Parameters
as_name (str) –
- Return type
Examples
assert "b" == col("a").alias("b").as_name assert "x" == (col("a") * 2).alias("x").as_name assert "" == col("a").alias("b").alias("").as_name
- property as_name: str#
The name assigned by
alias()
- Returns
the alias
Examples
assert "" == col("a").as_name assert "b" == col("a").alias("b").as_name assert "x" == (col("a") * 2).alias("x").as_name
- property as_type: Optional[DataType]#
The type assigned by
cast()
- Returns
the pyarrow datatype if
cast()
was called, otherwise None
Examples
import pyarrow as pa

assert col("a").as_type is None
assert pa.int64() == col("a").cast(int).as_type
assert pa.string() == (col("a") * 2).cast(str).as_type
- property body_str: str#
The string expression of this column without cast type and alias. This is only used for debugging purposes. It is not a SQL expression.
- Returns
the string expression
- cast(data_type)[source]#
Cast the column to a new data type
- Parameters
data_type (Any) – It can be string expressions, python primitive types, python datetime.datetime and pyarrow types. For details read Fugue Data Types
- Returns
a new column instance with the assigned data type
- Return type
Caution
Currently, casting to struct or list type has undefined behavior.
Examples
import pyarrow as pa

assert pa.int64() == col("a").cast(int).as_type
assert pa.string() == col("a").cast(str).as_type
assert pa.float64() == col("a").cast(float).as_type
assert pa.bool_() == col("a").cast(bool).as_type

# string follows the type expression of Triad Schema
assert pa.int32() == col("a").cast("int").as_type
assert pa.int32() == col("a").cast("int32").as_type

assert pa.int32() == col("a").cast(pa.int32()).as_type
- infer_alias()[source]#
Infer alias of a column. If the column’s
output_name()
is not empty then it returns itself without change. Otherwise it tries to infer alias from the underlying columns.
- Returns
a column instance with inferred alias
- Return type
Caution
Users should not use it directly.
Examples
import fugue.column.functions as f

assert "a" == col("a").infer_alias().output_name
assert "" == (col("a") * 2).infer_alias().output_name
assert "a" == col("a").is_null().infer_alias().output_name
assert "a" == f.max(col("a").is_null()).infer_alias().output_name
- infer_type(schema)[source]#
Infer data type of this column given the input schema
- Parameters
schema (Schema) – the schema instance to infer from
- Returns
a pyarrow datatype or None if failed to infer
- Return type
Optional[DataType]
Caution
Users should not use it directly.
Examples
import pyarrow as pa
from triad import Schema
import fugue.column.functions as f

schema = Schema("a:int,b:str")

assert pa.int32() == col("a").infer_type(schema)
assert pa.int32() == (-col("a")).infer_type(schema)

# due to overflow risk, can't infer certain operations
assert (col("a")+1).infer_type(schema) is None
assert (col("a")+col("a")).infer_type(schema) is None

assert pa.int32() == f.max(col("a")).infer_type(schema)
assert pa.int32() == f.min(col("a")).infer_type(schema)
assert f.sum(col("a")).infer_type(schema) is None
- is_null()[source]#
Same as SQL
<col> IS NULL
.
- Returns
a new column with the boolean values
- Return type
- property name: str#
The original name of this column, default empty
- Returns
the name
Examples
assert "a" == col("a").name assert "b" == col("a").alias("b").name assert "" == lit(1).name assert "" == (col("a") * 2).name
- fugue.column.expressions.col(obj, alias='')[source]#
Convert the
obj
to a ColumnExpr object
- Parameters
obj (Union[str, ColumnExpr]) – a string representing a column name or a
ColumnExpr
object
alias (str) – the alias of this column, defaults to "" (no alias)
- Returns
a column expression
- Return type
New Since
0.6.0
Examples
from fugue.column import col
import fugue.column.functions as f

col("a")
col("a").alias("x")
col("a", "x")

# unary operations
-col("a")           # negative value of a
~col("a")           # NOT a
col("a").is_null()  # a IS NULL
col("a").not_null() # a IS NOT NULL

# binary operations
col("a") + 1  # col("a") + lit(1)
1 - col("a")  # lit(1) - col("a")
col("a") * col("b")
col("a") / col("b")

# binary boolean expressions
col("a") == 1   # col("a") == lit(1)
2 != col("a")   # col("a") != lit(2)
col("a") < 5
col("a") > 5
col("a") <= 5
col("a") >= 5
(col("a") < col("b")) & (col("b") > 1) | col("c").is_null()

# with functions
f.max(col("a"))
f.max(col("a")+col("b"))
f.max(col("a")) + f.min(col("b"))
f.count_distinct(col("a")).alias("dcount")
- fugue.column.expressions.function(name, *args, arg_distinct=False, **kwargs)[source]#
Construct a function expression
- Parameters
name (str) – the name of the function
arg_distinct (bool) – whether to add
DISTINCT
before all arguments, defaults to False
args (Any) –
- Returns
the function expression
- Return type
Caution
Users should not use this directly
- fugue.column.expressions.lit(obj, alias='')[source]#
Convert the
obj
to a literal column. Currently obj must be int, bool, float or str, otherwise an exception will be raised
- Parameters
obj (Any) – an arbitrary value
alias (str) – the alias of this literal column, defaults to “” (no alias)
- Returns
a literal column expression
- Return type
New Since
0.6.0
Examples
from fugue.column import lit

lit("abc")
lit(100).alias("x")
lit(100, "x")
fugue.column.functions#
- fugue.column.functions.avg(col)[source]#
SQL
AVG
function (aggregation)
- Parameters
col (ColumnExpr) – the column to find average
- Return type
Note
this function cannot infer type from
col
type
this function can infer alias from
col
’s inferred alias
New Since
0.6.0
Examples
import fugue.column.functions as f

f.avg(col("a"))  # AVG(a) AS a

# you can specify explicitly
# CAST(AVG(a) AS double) AS a
f.avg(col("a")).cast(float)
- fugue.column.functions.coalesce(*args)[source]#
SQL
COALESCE
function
- Parameters
args (Any) – If a value is not
ColumnExpr
then it’s converted to a literal column by col()
- Return type
Note
this function can infer neither type nor alias
New Since
0.6.0
Examples
import fugue.column.functions as f

f.coalesce(col("a"), col("b")+col("c"), 1)
- fugue.column.functions.count(col)[source]#
SQL
COUNT
function (aggregation)
- Parameters
col (ColumnExpr) – the column to find count
- Return type
Note
this function cannot infer type from
col
type
this function can infer alias from
col
’s inferred alias
New Since
0.6.0
Examples
import fugue.column.functions as f

f.count(all_cols())  # COUNT(*)
f.count(col("a"))    # COUNT(a) AS a

# you can specify explicitly
# CAST(COUNT(a) AS double) AS a
f.count(col("a")).cast(float)
- fugue.column.functions.count_distinct(col)[source]#
SQL
COUNT DISTINCT
function (aggregation)
- Parameters
col (ColumnExpr) – the column to find distinct element count
- Return type
Note
this function cannot infer type from
col
type
this function can infer alias from
col
’s inferred alias
New Since
0.6.0
Examples
import fugue.column.functions as f

f.count_distinct(all_cols())  # COUNT(DISTINCT *)
f.count_distinct(col("a"))    # COUNT(DISTINCT a) AS a

# you can specify explicitly
# CAST(COUNT(DISTINCT a) AS double) AS a
f.count_distinct(col("a")).cast(float)
- fugue.column.functions.first(col)[source]#
SQL
FIRST
function (aggregation)
- Parameters
col (ColumnExpr) – the column to find first
- Return type
Note
this function can infer type from
col
type
this function can infer alias from
col
’s inferred alias
New Since
0.6.0
Examples
import fugue.column.functions as f

# assume col a has type double
f.first(col("a"))   # CAST(FIRST(a) AS double) AS a
f.first(-col("a"))  # CAST(FIRST(-a) AS double) AS a

# neither type nor alias can be inferred in the following cases
f.first(col("a")+1)
f.first(col("a")+col("b"))

# you can specify explicitly
# CAST(FIRST(a+b) AS int) AS x
f.first(col("a")+col("b")).cast(int).alias("x")
- fugue.column.functions.is_agg(column)[source]#
Check if a column contains aggregation operation
- Parameters
column (Any) – the column to check
- Returns
whether the column is
ColumnExpr
and contains aggregation operations
- Return type
bool
New Since
0.6.0
Examples
import fugue.column.functions as f

assert not f.is_agg(1)
assert not f.is_agg(col("a"))
assert not f.is_agg(col("a")+lit(1))

assert f.is_agg(f.max(col("a")))
assert f.is_agg(-f.max(col("a")))
assert f.is_agg(f.max(col("a")+1))
assert f.is_agg(f.max(col("a"))+f.min(col("a")))
- fugue.column.functions.last(col)[source]#
SQL
LAST
function (aggregation)
- Parameters
col (ColumnExpr) – the column to find last
- Return type
Note
this function can infer type from
col
type
this function can infer alias from
col
’s inferred alias
New Since
0.6.0
Examples
import fugue.column.functions as f

# assume col a has type double
f.last(col("a"))   # CAST(LAST(a) AS double) AS a
f.last(-col("a"))  # CAST(LAST(-a) AS double) AS a

# neither type nor alias can be inferred in the following cases
f.last(col("a")+1)
f.last(col("a")+col("b"))

# you can specify explicitly
# CAST(LAST(a+b) AS int) AS x
f.last(col("a")+col("b")).cast(int).alias("x")
- fugue.column.functions.max(col)[source]#
SQL
MAX
function (aggregation)
- Parameters
col (ColumnExpr) – the column to find max
- Return type
Note
this function can infer type from
col
type
this function can infer alias from
col
’s inferred alias
New Since
0.6.0
Examples
import fugue.column.functions as f

# assume col a has type double
f.max(col("a"))   # CAST(MAX(a) AS double) AS a
f.max(-col("a"))  # CAST(MAX(-a) AS double) AS a

# neither type nor alias can be inferred in the following cases
f.max(col("a")+1)
f.max(col("a")+col("b"))

# you can specify explicitly
# CAST(MAX(a+b) AS int) AS x
f.max(col("a")+col("b")).cast(int).alias("x")
- fugue.column.functions.min(col)[source]#
SQL
MIN
function (aggregation)
- Parameters
col (ColumnExpr) – the column to find min
- Return type
Note
this function can infer type from
col
type
this function can infer alias from
col
’s inferred alias
New Since
0.6.0
Examples
import fugue.column.functions as f

# assume col a has type double
f.min(col("a"))   # CAST(MIN(a) AS double) AS a
f.min(-col("a"))  # CAST(MIN(-a) AS double) AS a

# neither type nor alias can be inferred in the following cases
f.min(col("a")+1)
f.min(col("a")+col("b"))

# you can specify explicitly
# CAST(MIN(a+b) AS int) AS x
f.min(col("a")+col("b")).cast(int).alias("x")
- fugue.column.functions.sum(col)[source]#
SQL
SUM
function (aggregation)
- Parameters
col (ColumnExpr) – the column to find sum
- Return type
Note
this function cannot infer type from
col
type
this function can infer alias from
col
’s inferred alias
New Since
0.6.0
Examples
import fugue.column.functions as f

f.sum(col("a"))  # SUM(a) AS a

# you can specify explicitly
# CAST(SUM(a) AS double) AS a
f.sum(col("a")).cast(float)
fugue.column.sql#
- class fugue.column.sql.SQLExpressionGenerator(enable_cast=True)[source]#
Bases:
object
SQL generator for
SelectColumns
- Parameters
enable_cast (bool) – whether to convert
cast
into the statement, defaults to True
New Since
0.6.0
- add_func_handler(name, handler)[source]#
Add special function handler.
- Parameters
name (str) – name of the function
handler (Callable[[_FuncExpr], Iterable[str]]) – the function to convert the function expression to SQL clause
- Returns
the instance itself
- Return type
Caution
Users should not use this directly
- correct_select_schema(input_schema, select, output_schema)[source]#
Do partial schema inference from
input_schema
andselect
columns, then compare with the SQL output dataframe schema, and return the different part as a new schema, or None if there is no difference
- Parameters
input_schema (Schema) – input dataframe schema for the select statement
select (SelectColumns) – the collection of select columns
output_schema (Schema) – schema of the output dataframe after executing the SQL
- Returns
the difference as a new schema or None if no difference
- Return type
Optional[Schema]
Tip
This is particularly useful when the SQL engine messed up the schema of the output. For example,
SELECT *
should return a dataframe with the same schema of the input. However, for example a column a:int could become a:long in the output dataframe because of information loss. This function is designed to make corrections on column types when they can be inferred. This may not be perfect but it can solve major discrepancies.
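A hypothetical correction flow might look like the sketch below (a minimal sketch assuming triad's Schema and the col() helper; what exactly is returned depends on what can be inferred):

from triad import Schema
from fugue.column import col
from fugue.column.sql import SQLExpressionGenerator, SelectColumns

gen = SQLExpressionGenerator()
input_schema = Schema("a:int,b:str")
select = SelectColumns(col("a"), col("b"))

# suppose the SQL engine returned a:long instead of a:int
output_schema = Schema("a:long,b:str")

# the difference that should be corrected (roughly a:int), or None
diff = gen.correct_select_schema(input_schema, select, output_schema)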
- generate(expr)[source]#
Convert
ColumnExpr
to SQL clause- Parameters
expr (ColumnExpr) – the column expression to convert
- Returns
the SQL clause for this expression
- Return type
str
- select(columns, table, where=None, having=None)[source]#
Construct the full
SELECT
statement on a single table
- Parameters
columns (SelectColumns) – columns to select; it may contain aggregations, and if so, the group keys are inferred. See
group_keys()
table (str) – table name to select from
where (Optional[ColumnExpr]) –
WHERE
condition, defaults to None
having (Optional[ColumnExpr]) –
HAVING
condition, defaults to None. It is used only when there is aggregation
- Returns
the full
SELECT
statement
- Return type
Iterable[Tuple[bool, str]]
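A minimal sketch of using select() (note that, per the signature above, the result is an iterable of (is_table_reference, text) tuples rather than a single string; joining the text parts only approximates the final statement):

from fugue.column import col, functions as f
from fugue.column.sql import SQLExpressionGenerator, SelectColumns

gen = SQLExpressionGenerator()
cols = SelectColumns(col("a"), f.max(col("b")).alias("m"))

# roughly SELECT a, MAX(b) AS m FROM tb GROUP BY a HAVING MAX(b)>0
parts = list(gen.select(cols, "tb", having=f.max(col("b")) > 0))
sql = " ".join(text for _, text in parts)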
- where(condition, table)[source]#
Generate a
SELECT *
statement with the given where clause
- Parameters
condition (ColumnExpr) – column expression for
WHERE
table (str) – table name for
FROM
- Returns
the SQL statement
- Raises
ValueError – if
condition
contains aggregation
- Return type
Iterable[Tuple[bool, str]]
Examples
gen = SQLExpressionGenerator(enable_cast=False)

# SELECT * FROM tb WHERE a>1 AND b IS NULL
gen.where((col("a")>1) & col("b").is_null(), "tb")
- class fugue.column.sql.SelectColumns(*cols, arg_distinct=False)[source]#
Bases:
object
SQL
SELECT
columns collection.
- Parameters
cols (ColumnExpr) – collection of
ColumnExpr
arg_distinct (bool) – whether this is
SELECT DISTINCT
, defaults to False
New Since
0.6.0
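A small sketch of how a SelectColumns collection can be inspected, based on the properties and assertions documented below:

from fugue.column import col, functions as f
from fugue.column.sql import SelectColumns

sc = SelectColumns(col("a"), f.max(col("b")).alias("x"))

assert sc.has_agg               # contains an aggregation
assert len(sc.group_keys) == 1  # "a" is inferred as the group key
sc.assert_all_with_names()      # raises if any column has no (inferrable) alias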
- property agg_funcs: List[ColumnExpr]#
All columns with aggregation operations
- property all_cols: List[ColumnExpr]#
All columns (with inferred aliases)
- assert_all_with_names()[source]#
Assert every column has an explicit alias or the alias can be inferred (a non-empty value). It will also validate that there are no duplicated aliases
- Raises
ValueError – if there are columns without alias, or there are duplicated names.
- Returns
the instance itself
- Return type
- assert_no_agg()[source]#
Assert there is no aggregation operation on any column.
- Raises
AssertionError – if there is any aggregation in the collection.
- Returns
the instance itself
- Return type
See also
Go to
is_agg()
to see how the aggregations are detected.
- assert_no_wildcard()[source]#
Assert there is no
*
on first level columns
- Raises
AssertionError – if
all_cols()
exists- Returns
the instance itself
- Return type
- property group_keys: List[ColumnExpr]#
Group keys inferred from the columns.
Note
if there is no aggregation, the result will be empty
it is
simple_cols()
plus non_agg_funcs()
- property has_agg: bool#
Whether this select is an aggregation
- property has_literals: bool#
Whether this select contains literal columns
- property is_distinct: bool#
Whether this is a
SELECT DISTINCT
- property literals: List[ColumnExpr]#
All literal columns
- property non_agg_funcs: List[ColumnExpr]#
All columns with non-aggregation operations
- replace_wildcard(schema)[source]#
Replace wildcard
*
with explicit column names
- Parameters
schema (Schema) – the schema used to parse the wildcard
- Returns
a new instance containing only explicit columns
- Return type
Note
It only replaces the top level *. For example, count_distinct(all_cols()) will not be transformed because this * is not first level.
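For example (a sketch assuming triad's Schema and that all_cols() is importable from fugue.column next to col()):

from triad import Schema
from fugue.column import all_cols
from fugue.column.sql import SelectColumns

sc = SelectColumns(all_cols())
explicit = sc.replace_wildcard(Schema("a:int,b:str"))  # roughly SELECT a, b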
- property simple: bool#
Whether this select contains only simple column representations
- property simple_cols: List[ColumnExpr]#
All columns directly representing column names
fugue.dataframe#
fugue.dataframe.api#
- fugue.dataframe.api.get_native_as_df(df)[source]#
Return the dataframe form of the input
df
. Ifdf
is aDataFrame
, then call thenative_as_df()
, otherwise it depends on whether there is a corresponding function handling it.
- Parameters
df (AnyDataFrame) –
- Return type
AnyDataFrame
- fugue.dataframe.api.normalize_column_names(df)[source]#
A generic function to normalize any dataframe’s column names to follow Fugue naming rules
Note
This is a temporary solution before
Schema
can take arbitrary names
Examples
[0,1]
=>{"_0":0, "_1":1}
["1a","2b"]
=>{"_1a":"1a", "_2b":"2b"}
["*a","-a"]
=>{"_a":"*a", "_a_1":"-a"}
- Parameters
df (AnyDataFrame) – a dataframe object
- Returns
the renamed dataframe and the rename operations as a dict that can undo the change
- Return type
Tuple[AnyDataFrame, Dict[str, Any]]
See also
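A minimal sketch of the renaming round trip (assuming a pandas DataFrame input; the returned dict maps the new names back to the originals so the change can be undone):

import pandas as pd
from fugue.dataframe.api import normalize_column_names

pdf = pd.DataFrame([[0, 1]], columns=["1a", "2b"])
renamed, undo = normalize_column_names(pdf)
# renamed has columns ["_1a", "_2b"]; undo == {"_1a": "1a", "_2b": "2b"}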
fugue.dataframe.array_dataframe#
- class fugue.dataframe.array_dataframe.ArrayDataFrame(df=None, schema=None)[source]#
Bases:
LocalBoundedDataFrame
DataFrame that wraps native python 2-dimensional arrays. Please read the DataFrame Tutorial to understand the concept
- Parameters
df (Any) – 2-dimensional array, iterable of arrays, or
DataFrame
schema (Any) – Schema like object
Examples
>>> a = ArrayDataFrame([[0,'a'],[1,'b']],"a:int,b:str")
>>> b = ArrayDataFrame(a)
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property native: List[Any]#
2-dimensional native python array
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue.dataframe.arrow_dataframe#
- class fugue.dataframe.arrow_dataframe.ArrowDataFrame(df=None, schema=None)[source]#
Bases:
LocalBoundedDataFrame
DataFrame that wraps
pyarrow.Table
. Please also read the DataFrame Tutorial to understand this Fugue concept- Parameters
df (Any) – 2-dimensional array, iterable of arrays,
pyarrow.Table
or pandas DataFrame
schema (Any) – Schema like object
Examples
>>> ArrowDataFrame([[0,'a'],[1,'b']],"a:int,b:str")
>>> ArrowDataFrame(schema = "a:int,b:int")  # empty dataframe
>>> ArrowDataFrame(pd.DataFrame([[0]],columns=["a"]))
>>> ArrowDataFrame(ArrayDataFrame([[0]],"a:int").as_arrow())
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_arrow(type_safe=False)[source]#
Convert to pyArrow DataFrame
- Parameters
type_safe (bool) –
- Return type
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array; it doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
- peek_dict()[source]#
Peek the first row of the dataframe as dict
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
Dict[str, Any]
fugue.dataframe.dataframe#
- class fugue.dataframe.dataframe.DataFrame(schema=None)[source]#
Bases:
Dataset
Base class of Fugue DataFrame. Please read the DataFrame Tutorial to understand the concept
- Parameters
schema (Any) – Schema like object
Note
This is an abstract class, and normally you don’t construct it by yourself unless you are implementing a new
ExecutionEngine
- abstract alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- abstract as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- abstract as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_arrow(type_safe=False)[source]#
Convert to pyArrow DataFrame
- Parameters
type_safe (bool) –
- Return type
- as_dict_iterable(columns=None)[source]#
Convert to iterable of native python dicts
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
- Returns
iterable of native python dicts
- Return type
Iterable[Dict[str, Any]]
Note
The default implementation enforces type_safe=True
- as_local()[source]#
Convert this dataframe to a
LocalDataFrame
- Return type
- abstract as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- property columns: List[str]#
The column names of the dataframe
- drop(columns)[source]#
Drop certain columns and return a new dataframe
- Parameters
columns (List[str]) – columns to drop
- Raises
FugueDataFrameOperationError – if
columns
are not strictly contained by this dataframe, or they are the entire set of the dataframe’s columns
- Returns
a new dataframe removing the columns
- Return type
- get_info_str()[source]#
Get dataframe information (schema, type, metadata) as json string
- Returns
json string
- Return type
str
- abstract head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- abstract native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array; it doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
AnyDataFrame
- abstract peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
- peek_dict()[source]#
Peek the first row of the dataframe as dict
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
Dict[str, Any]
- class fugue.dataframe.dataframe.DataFrameDisplay(ds)[source]#
Bases:
DatasetDisplay
DataFrame
plain display class
- Parameters
ds (Dataset) –
- class fugue.dataframe.dataframe.LocalBoundedDataFrame(schema=None)[source]#
Bases:
LocalDataFrame
Base class of all local bounded dataframes. Please read this to understand the concept
- Parameters
schema (Any) – Schema like object
Note
This is an abstract class, and normally you don’t construct it by yourself unless you are implementing a new
ExecutionEngine
- property is_bounded: bool#
Always True because it’s a bounded dataframe
- class fugue.dataframe.dataframe.LocalDataFrame(schema=None)[source]#
Bases:
DataFrame
Base class of all local dataframes. Please read this to understand the concept
- Parameters
schema (Any) – a schema-like object
Note
This is an abstract class, and normally you don’t construct it by yourself unless you are implementing a new
ExecutionEngine
- property is_local: bool#
Always True because it’s a LocalDataFrame
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array; it doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
AnyDataFrame
- property num_partitions: int#
Always 1 because it’s a LocalDataFrame
- class fugue.dataframe.dataframe.LocalUnboundedDataFrame(schema=None)[source]#
Bases:
LocalDataFrame
Base class of all local unbounded dataframes. Please read https://fugue-tutorials.readthedocs.io/en/latest/tutorials/advanced/schema_dataframes.html#DataFrame to understand the concept
- Parameters
schema (Any) – Schema like object
Note
This is an abstract class, and normally you don’t construct it by yourself unless you are implementing a new
ExecutionEngine
- as_local()[source]#
Convert this dataframe to a
LocalDataFrame
- Return type
- count()[source]#
- Raises
InvalidOperationError – You can’t count an unbounded dataframe
- Return type
int
- property is_bounded#
Always False because it’s an unbounded dataframe
- class fugue.dataframe.dataframe.YieldedDataFrame(yid)[source]#
Bases:
Yielded
Yielded dataframe from
FugueWorkflow
. Users shouldn’t create this object directly.
- Parameters
yid (str) – unique id for determinism
- property is_set: bool#
Whether the value is set. It can be false if the parent workflow has not been executed.
fugue.dataframe.dataframe_iterable_dataframe#
- class fugue.dataframe.dataframe_iterable_dataframe.IterableArrowDataFrame(df=None, schema=None)[source]#
Bases:
LocalDataFrameIterableDataFrame
- Parameters
df (Any) –
schema (Any) –
- class fugue.dataframe.dataframe_iterable_dataframe.IterablePandasDataFrame(df=None, schema=None)[source]#
Bases:
LocalDataFrameIterableDataFrame
- Parameters
df (Any) –
schema (Any) –
- as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- class fugue.dataframe.dataframe_iterable_dataframe.LocalDataFrameIterableDataFrame(df=None, schema=None)[source]#
Bases:
LocalUnboundedDataFrame
DataFrame that wraps an iterable of local dataframes
- Parameters
df (Any) – an iterable of
DataFrame
. If any is not local, they will be converted to LocalDataFrame by as_local()
schema (Any) – Schema like object, if it is provided, it must match the schema of the dataframes
Examples
def get_dfs():
    yield IterableDataFrame([], "a:int,b:int")
    yield IterableDataFrame([[1, 10]], "a:int,b:int")
    yield ArrayDataFrame([], "a:int,b:str")

df = LocalDataFrameIterableDataFrame(get_dfs())
for subdf in df.native:
    subdf.show()
Note
It’s ok to peek the dataframe; it will not affect the iteration, but it is invalid to count.
schema can be used when the iterable contains no dataframe. But if there is any dataframe, schema must match the schema of the dataframes.
For the iterable of dataframes, if there is any empty dataframe, it will be skipped and its schema will not matter. However, if all dataframes in the iterable are empty, then the last empty dataframe will be used to set the schema.
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_arrow(type_safe=False)[source]#
Convert to pyArrow DataFrame
- Parameters
type_safe (bool) –
- Return type
- as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property native: EmptyAwareIterable[LocalDataFrame]#
Iterable of dataframes
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue.dataframe.dataframes#
- class fugue.dataframe.dataframes.DataFrames(*args, **kwargs)[source]#
Bases:
IndexedOrderedDict
[str, DataFrame]
Ordered dictionary of DataFrames. There are two modes: with keys and without keys. If without key
_<n>
will be used as the key for each dataframe, and it will be treated as an array in the Fugue framework.
It’s a subclass of dict, so it supports all dict operations. It’s also ordered, so you can trust the order of keys and values.
The initialization is flexible
>>> df1 = ArrayDataFrame([[0]],"a:int")
>>> df2 = ArrayDataFrame([[1]],"a:int")
>>> dfs = DataFrames(df1,df2)  # init as [df1, df2]
>>> assert not dfs.has_key
>>> assert df1 is dfs[0] and df2 is dfs[1]
>>> dfs_array = list(dfs.values())
>>> dfs = DataFrames(a=df1,b=df2)  # init as {a:df1, b:df2}
>>> assert dfs.has_key
>>> assert df1 is dfs[0] and df2 is dfs[1]  # order is guaranteed
>>> df3 = ArrayDataFrame([[1]],"b:int")
>>> dfs2 = DataFrames(dfs, c=df3)  # {a:df1, b:df2, c:df3}
>>> dfs2 = DataFrames(dfs, df3)  # invalid, because dfs has key, df3 doesn't
>>> dfs2 = DataFrames(dict(a=df1,b=df2))  # init as {a:df1, b:df2}
>>> dfs2 = DataFrames([df1,df2],df3)  # init as [df1, df2, df3]
- Parameters
args (Any) –
kwargs (Any) –
- convert(func)[source]#
Create another DataFrames with the same structure, but all converted by
func
- Returns
the new DataFrames
- Parameters
- Return type
Examples
>>> dfs2 = dfs.convert(lambda df: df.as_local()) # convert all to local
- property has_key#
If this collection has key (dict-like) or not (list-like)
fugue.dataframe.function_wrapper#
- class fugue.dataframe.function_wrapper.DataFrameFunctionWrapper(func, params_re='.*', return_re='.*')[source]#
Bases:
FunctionWrapper
- Parameters
func (Callable) –
params_re (str) –
return_re (str) –
- property need_output_schema: Optional[bool]#
- class fugue.dataframe.function_wrapper.DataFrameParam(param)[source]#
Bases:
_DataFrameParamBase
- Parameters
param (Optional[Parameter]) –
- class fugue.dataframe.function_wrapper.LocalDataFrameParam(param)[source]#
Bases:
DataFrameParam
- Parameters
param (Optional[Parameter]) –
- count(df)[source]#
- Parameters
df (LocalDataFrame) –
- Return type
int
- to_output_df(output, schema, ctx)[source]#
- Parameters
output (LocalDataFrame) –
schema (Any) –
ctx (Any) –
- Return type
fugue.dataframe.iterable_dataframe#
- class fugue.dataframe.iterable_dataframe.IterableDataFrame(df=None, schema=None)[source]#
Bases:
LocalUnboundedDataFrame
DataFrame that wraps native python iterable of arrays. Please read the DataFrame Tutorial to understand the concept
- Parameters
df (Any) – 2-dimensional array, iterable of arrays, or
DataFrame
schema (Any) – Schema like object
Examples
>>> a = IterableDataFrame([[0,'a'],[1,'b']],"a:int,b:str")
>>> b = IterableDataFrame(a)
Note
It’s ok to peek the dataframe; it will not affect the iteration, but it is an invalid operation to count
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property native: EmptyAwareIterable[Any]#
Iterable of native python arrays
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue.dataframe.pandas_dataframe#
- class fugue.dataframe.pandas_dataframe.PandasDataFrame(df=None, schema=None, pandas_df_wrapper=False)[source]#
Bases:
LocalBoundedDataFrame
DataFrame that wraps pandas DataFrame. Please also read the DataFrame Tutorial to understand this Fugue concept
- Parameters
df (Any) – 2-dimensional array, iterable of arrays or pandas DataFrame
schema (Any) – Schema like object
pandas_df_wrapper (bool) – if this is a simple wrapper, default False
Examples
>>> PandasDataFrame([[0,'a'],[1,'b']],"a:int,b:str")
>>> PandasDataFrame(schema = "a:int,b:int")  # empty dataframe
>>> PandasDataFrame(pd.DataFrame([[0]],columns=["a"]))
>>> PandasDataFrame(ArrayDataFrame([[0]],"a:int").as_pandas())
Note
If
pandas_df_wrapper
is True, then the constructor will not do any type check; otherwise, it will enforce types according to the input schema after the construction
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array; it doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue.dataframe.utils#
- fugue.dataframe.utils.deserialize_df(json_str, fs=None)[source]#
Deserialize json string to
LocalBoundedDataFrame
- Parameters
json_str (str) – json string containing the base64 data or a file path
fs (Optional[FileSystem]) –
FileSystem
, defaults to None
- Raises
ValueError – if the json string is invalid, not generated from
serialize_df()
- Returns
LocalBoundedDataFrame
if json_str contains a dataframe, or None if it’s valid but contains no data
- Return type
Optional[LocalBoundedDataFrame]
- fugue.dataframe.utils.get_join_schemas(df1, df2, how, on)[source]#
Get
Schema
object after joining df1 and df2. If on is not empty, it’s mainly for validation purposes.
- Parameters
df1 (DataFrame) – first dataframe
df2 (DataFrame) – second dataframe
how (str) – can accept
semi
,left_semi
,anti
,left_anti
,inner
,left_outer
,right_outer
,full_outer
,cross
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.
- Returns
the pair key schema and schema after join
- Return type
Note
In Fugue, joined schema can always be inferred because it always uses the input dataframes’ common keys as the join keys. So you must make sure to
rename()
the input dataframes so they follow this rule.
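A rough sketch (the function returns the pair of key schema and post-join schema described above; ArrayDataFrame is assumed to be importable from fugue.dataframe):

from fugue.dataframe import ArrayDataFrame
from fugue.dataframe.utils import get_join_schemas

df1 = ArrayDataFrame([[0, "x"]], "a:int,b:str")
df2 = ArrayDataFrame([[0, 1.1]], "a:int,c:double")

# the join key is the common column "a"; "on" is only validated
key_schema, joined_schema = get_join_schemas(df1, df2, how="inner", on=["a"])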
- fugue.dataframe.utils.pickle_df(df)[source]#
Pickles a dataframe to a bytes array. It first converts the dataframe to a local bounded dataframe, and then serializes the underlying data.
- Parameters
df (DataFrame) – input DataFrame
- Returns
pickled binary data
- Return type
bytes
Note
Be careful using this on large dataframes or non-local, un-materialized dataframes; it can be slow. You should always use
unpickle_df()
to deserialize.
- fugue.dataframe.utils.serialize_df(df, threshold=-1, file_path=None, fs=None)[source]#
Serialize input dataframe to base64 string or to file if it’s larger than threshold
- Parameters
df (Optional[DataFrame]) – input DataFrame
threshold (int) – file byte size threshold, defaults to -1
file_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than
threshold
), defaults to Nonefs (Optional[FileSystem]) –
FileSystem
, defaults to None
- Raises
InvalidOperationError – if file is large but
file_path
is not provided
- Returns
a json string either containing the base64 data or the file path
- Return type
str
Note
If fs is not provided but it needs to write to disk, then it will use
open_fs()
to try to open the file to write.
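A minimal round-trip sketch, keeping everything in memory by leaving threshold at -1:

from fugue.dataframe import ArrayDataFrame
from fugue.dataframe.utils import serialize_df, deserialize_df

df = ArrayDataFrame([[0, "a"]], "a:int,b:str")
json_str = serialize_df(df)     # json string containing the base64 payload
df2 = deserialize_df(json_str)  # back to a LocalBoundedDataFrame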
- fugue.dataframe.utils.unpickle_df(stream)[source]#
Unpickles a dataframe from bytes array.
- Parameters
stream (bytes) – binary data
- Returns
unpickled dataframe
- Return type
Note
The data must be serialized by
pickle_df()
to deserialize.
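Similarly, a pickle round trip might look like this sketch:

from fugue.dataframe import ArrayDataFrame
from fugue.dataframe.utils import pickle_df, unpickle_df

data = pickle_df(ArrayDataFrame([[0]], "a:int"))  # bytes
df = unpickle_df(data)                            # back to a local bounded dataframe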
fugue.dataset#
fugue.dataset.api#
- fugue.dataset.api.show(data, n=10, with_count=False, title=None)[source]#
Display the Dataset
- Parameters
data (AnyDataset) – the dataset that can be recognized by Fugue
n (int) – number of rows to print, defaults to 10
with_count (bool) – whether to show dataset count, defaults to False
title (Optional[str]) – title of the dataset, defaults to None
- Return type
None
Note
When
with_count
is True, it can trigger expensive calculation for a distributed dataframe. So if you call this function directly, you may need to fugue.execution.execution_engine.ExecutionEngine.persist()
the dataset.
fugue.dataset.dataset#
- class fugue.dataset.dataset.Dataset[source]#
Bases:
ABC
The base class of Fugue
DataFrame
and Bag.
Note
This is for internal use only.
- assert_not_empty()[source]#
Assert this dataframe is not empty
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
None
- abstract property empty: bool#
Whether this dataframe is empty
- property has_metadata: bool#
Whether this dataframe contains any metadata
- abstract property is_bounded: bool#
Whether this dataframe is bounded
- abstract property is_local: bool#
Whether this dataframe is a local Dataset
- abstract property native: Any#
The native object this Dataset class wraps
- abstract property num_partitions: int#
Number of physical partitions of this dataframe. Please read the Partition Tutorial
- show(n=10, with_count=False, title=None)[source]#
Display the Dataset
- Parameters
n (int) – number of rows to print, defaults to 10
with_count (bool) – whether to show dataset count, defaults to False
title (Optional[str]) – title of the dataset, defaults to None
- Return type
None
Note
When
with_count
is True, it can trigger expensive calculation for a distributed dataframe. So if you call this function directly, you may need to fugue.execution.execution_engine.ExecutionEngine.persist()
the dataset.
- class fugue.dataset.dataset.DatasetDisplay(ds)[source]#
Bases:
ABC
The base class for display handlers of
Dataset
- Parameters
ds (Dataset) – the Dataset
- repr()[source]#
The string representation of the
Dataset
- Returns
the string representation
- Return type
str
fugue.execution#
fugue.execution.api#
- fugue.execution.api.aggregate(df, partition_by=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **agg_kwcols)[source]#
Aggregate on dataframe
- Parameters
df (AnyDataFrame) – the dataframe to aggregate on
partition_by (Union[None, str, List[str]]) – partition key(s), defaults to None
agg_kwcols (ColumnExpr) – aggregation expressions
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the aggregated result as a dataframe
- Return type
AnyDataFrame
See also
Please find more expression examples in
fugue.column.sql
and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # SELECT MAX(b) AS b FROM df
    fa.aggregate(df, b=f.max(col("b")))

    # SELECT a, MAX(b) AS x FROM df GROUP BY a
    fa.aggregate(df, "a", x=f.max(col("b")))
- fugue.execution.api.anti_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left anti-join two dataframes. This is a wrapper of
join()
with how="anti"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
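For example, a sketch using pandas dataframes (assuming anti_join is exposed on fugue.api like the other functions used in the examples of this section):

import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2, 3]})
df2 = pd.DataFrame({"a": [2, 3]})

# keep the rows of df1 whose inferred key "a" has no match in df2
result = fa.anti_join(df1, df2)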
- fugue.execution.api.assign(df, engine=None, engine_conf=None, as_fugue=False, as_local=False, **columns)[source]#
Update existing columns with new values and add new columns
- Parameters
df (AnyDataFrame) – the dataframe to set columns
columns (Any) – column expressions
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the updated dataframe
- Return type
AnyDataFrame
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in
fugue.column.sql
and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

# assume df has schema: a:int,b:str

with fa.engine_context("duckdb"):
    # add constant column x
    fa.assign(df, x=1)

    # change column b to be a constant integer
    fa.assign(df, b=1)

    # add new x to be a+b
    fa.assign(df, x=col("a")+col("b"))

    # cast column a data type to double
    fa.assign(df, a=col("a").cast(float))
- fugue.execution.api.broadcast(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Broadcast the dataframe to all workers of a distributed computing backend
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (Optional[AnyExecutionEngine]) – an engine-like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the broadcasted dataframe
- Return type
AnyDataFrame
- fugue.execution.api.clear_global_engine()[source]#
Remove the global execution engine (if set)
- Return type
None
- fugue.execution.api.cross_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Cross join two dataframes. This is a wrapper of
join()
with how="cross"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.distinct(df, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Equivalent to
SELECT DISTINCT * FROM df
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the result with distinct rows
- Return type
AnyDataFrame
- fugue.execution.api.dropna(df, how='any', thresh=None, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Drop NA records from dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
DataFrame with NA records dropped
- Return type
AnyDataFrame
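For example, a sketch on a pandas dataframe (assuming dropna is exposed on fugue.api):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, None, 3], "b": ["x", "y", None]})

fa.dropna(df)             # drop rows containing any null
fa.dropna(df, how="all")  # drop rows where all values are null
fa.dropna(df, thresh=2)   # keep rows with at least 2 non-null values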
- fugue.execution.api.engine_context(engine=None, engine_conf=None, infer_by=None)[source]#
Make an execution engine and set it as the context engine. This function is thread safe and async safe.
- Parameters
engine (AnyExecutionEngine) – an engine like object, defaults to None
engine_conf (Any) – the configs for the engine, defaults to None
infer_by (Optional[List[Any]]) – a list of objects to infer the engine, defaults to None
- Return type
Iterator[ExecutionEngine]
Note
For more details, please read
make_execution_engine()
Examples
import fugue.api as fa

with fa.engine_context(spark_session):
    transform(df, func)  # will use spark in this transformation
- fugue.execution.api.fillna(df, value, subset=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Fill
NULL
,NAN
,NAT
values in a dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
DataFrame with NA records filled
- Return type
AnyDataFrame
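For example, a sketch on a pandas dataframe (assuming fillna is exposed on fugue.api):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, None], "b": ["x", None]})

fa.fillna(df, value=0, subset=["a"])           # fill column a with 0
fa.fillna(df, value={"a": 0, "b": "missing"})  # per-column replacement values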
- fugue.execution.api.filter(df, condition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Filter rows by the given condition
- Parameters
df (AnyDataFrame) – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the filtered dataframe
- Return type
AnyDataFrame
See also
Please find more expression examples in
fugue.column.sql
and fugue.column.functions
Examples
from fugue.column import col, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    fa.filter(df, (col("a")>1) & (col("b")=="x"))
    fa.filter(df, f.coalesce(col("a"),col("b"))>1)
- fugue.execution.api.full_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Full outer join two dataframes. This is a wrapper of
join()
with how="full_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.get_context_engine()[source]#
Get the execution engine in the current context. Regarding the order of the logic please read
make_execution_engine()
- Return type
- fugue.execution.api.get_current_conf()[source]#
Get the current configs either in the defined engine context or by the global configs (see
register_global_conf()
)- Return type
- fugue.execution.api.get_current_parallelism()[source]#
Get the current parallelism of the current global/context engine. If there is no global/context engine, it creates a temporary engine using
make_execution_engine()
to get its parallelism- Returns
the size of the parallelism
- Return type
int
- fugue.execution.api.inner_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Inner join two dataframes. This is a wrapper of
join()
with how="inner"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
how – can accept
semi
,left_semi
,anti
,left_anti
,inner
,left_outer
,right_outer
,full_outer
,cross
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.intersect(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Intersect
df1
and df2
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to intersect with
distinct (bool) –
true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the intersected dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
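A small sketch of intersect, assuming fugue.api exposure and identical schemas (pandas input used only for illustration):
import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2, 2]})
df2 = pd.DataFrame({"a": [2, 3]})

fa.intersect(df1, df2)                  # INTERSECT DISTINCT -> rows with a=2
fa.intersect(df1, df2, distinct=False)  # INTERSECT ALL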
- fugue.execution.api.join(df1, df2, *dfs, how, on=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Join two dataframes
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
Note
Please read get_join_schemas()
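A short sketch of the generic join, assuming fugue.api exposure; the on list is optional and, when given, is validated against the inferred keys:
import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
df2 = pd.DataFrame({"a": [2, 3], "c": ["u", "v"]})

left = fa.join(df1, df2, how="left_outer", on=["a"])
full = fa.join(df1, df2, how="full_outer")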
- fugue.execution.api.left_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left outer join two dataframes. This is a wrapper of join() with how="left_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.load(path, format_hint=None, columns=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
an engine compatible dataframe
- Return type
AnyDataFrame
For more details and examples, read Load & Save.
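A hedged sketch of load; the file path is hypothetical and the format is inferred from the extension:
import fugue.api as fa

# /tmp/data.parquet is a hypothetical path
df = fa.load("/tmp/data.parquet", columns=["a", "b"], as_local=True)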
- fugue.execution.api.persist(df, lazy=False, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the persisted dataframe
- Return type
AnyDataFrame
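A minimal sketch of persist inside an engine context (the duckdb engine name follows the earlier examples; pandas input is only for illustration):
import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": range(5)})

with fa.engine_context("duckdb"):
    cached = fa.persist(df)                  # eager caching
    lazy_cached = fa.persist(df, lazy=True)  # materialized on first use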
- fugue.execution.api.repartition(df, partition, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Partition the input dataframe using partition.
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
partition (PartitionSpec) – how you want to partition the dataframe
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the repartitioned dataframe
- Return type
AnyDataFrame
Caution
This function is experimental, and may be removed in the future.
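Since the function is experimental, treat the following only as a hedged illustration; it assumes PartitionSpec is importable from the top-level fugue package:
import pandas as pd
import fugue.api as fa
from fugue import PartitionSpec  # assumed top-level export

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": range(4)})

# ask for (up to) 2 partitions keyed by column "a"
repartitioned = fa.repartition(df, PartitionSpec(by=["a"], num=2))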
- fugue.execution.api.right_outer_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Right outer join two dataframes. This is a wrapper of join() with how="right_outer"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.run_engine_function(func, engine=None, engine_conf=None, as_fugue=False, as_local=False, infer_by=None)[source]#
Run a lambda function based on the engine provided
- Parameters
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
infer_by (Optional[List[Any]]) – a list of objects to infer the engine, defaults to None
func (Callable[[ExecutionEngine], Any]) –
- Returns
None or a Fugue DataFrame if as_fugue is True; otherwise, if infer_by contains any Fugue DataFrame, the Fugue DataFrame is returned; otherwise it returns the underlying dataframe using native_as_df()
- Return type
Any
Note
This function is for development use. Users should not need it.
- fugue.execution.api.sample(df, n=None, frac=None, replace=False, seed=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the sampled dataframe
- Return type
AnyDataFrame
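A short sketch of sample (pandas input only for illustration; exactly one of n and frac is set per call):
import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": range(100)})

fa.sample(df, n=10, seed=0)                    # 10 rows
fa.sample(df, frac=0.1, replace=True, seed=0)  # ~10% with replacement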
- fugue.execution.api.save(df, path, format_hint=None, mode='overwrite', partition=None, force_single=False, engine=None, engine_conf=None, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition (Optional[Any]) – how to partition the dataframe before saving, defaults to None
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
- Return type
None
For more details and examples, read Load & Save.
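A hedged round-trip sketch combining save and load (the output path is hypothetical):
import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

fa.save(df, "/tmp/demo.parquet", mode="overwrite")
back = fa.load("/tmp/demo.parquet", as_local=True)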
- fugue.execution.api.select(df, *columns, where=None, having=None, distinct=False, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
The functional interface for SQL select statement
- Parameters
df (AnyDataFrame) – the dataframe to be operated on
columns (Union[str, ColumnExpr]) – column expressions, for strings they will represent the column names
where (Optional[ColumnExpr]) – WHERE condition expression, defaults to None
having (Optional[ColumnExpr]) – HAVING condition expression, used when columns contains aggregation columns, defaults to None
distinct (bool) – whether to return distinct result, defaults to False
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the select result as a dataframe
- Return type
AnyDataFrame
Attention
This interface is experimental; it's subject to change in new versions.
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
from fugue.column import col, lit, functions as f
import fugue.api as fa

with fa.engine_context("duckdb"):
    # select existed and new columns
    fa.select(df, col("a"), col("b"), lit(1,"another"))
    fa.select(df, col("a"), (col("b")+lit(1)).alias("x"))

    # aggregation
    # SELECT COUNT(DISTINCT *) AS x FROM df
    fa.select(
        df,
        f.count_distinct(all_cols()).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df GROUP BY a
    fa.select(
        df,
        col("a"), f.max(col("b")+lit(1)).alias("x"))

    # SELECT a, MAX(b+1) AS x FROM df
    # WHERE b<2 AND a>1
    # GROUP BY a
    # HAVING MAX(b+1)>0
    fa.select(
        df,
        col("a"), f.max(col("b")+lit(1)).alias("x"),
        where=(col("b")<2) & (col("a")>1),
        having=f.max(col("b")+lit(1))>0
    )
- fugue.execution.api.semi_join(df1, df2, *dfs, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Left semi-join two dataframes. This is a wrapper of join() with how="semi"
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to join
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the joined dataframe
- Return type
AnyDataFrame
- fugue.execution.api.set_global_engine(engine, engine_conf=None)[source]#
Make an execution engine and set it as the global execution engine
- Parameters
engine (AnyExecutionEngine) – an engine like object, must not be None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
- Return type
Caution
In general, it is not a good practice to set a global engine. You should consider engine_context() instead. The exception is when you iterate in a notebook across cells, where this can simplify the code.
Note
For more details, please read make_execution_engine() and set_global()
Examples
import fugue.api as fa

fa.set_global_engine(spark_session)
transform(df, func)       # will use spark in this transformation
fa.clear_global_engine()  # remove the global setting
- fugue.execution.api.subtract(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
df1 - df2
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to subtract
distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the subtracted dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
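A small sketch of subtract, assuming identical schemas (pandas input only for illustration):
import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2, 2, 3]})
df2 = pd.DataFrame({"a": [2]})

fa.subtract(df1, df2)                  # EXCEPT DISTINCT -> a=1, a=3
fa.subtract(df1, df2, distinct=False)  # EXCEPT ALL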
- fugue.execution.api.take(df, n, presort, na_position='last', partition=None, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (AnyDataFrame) – an input dataframe that can be recognized by Fugue
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
partition (Optional[Any]) – PartitionSpec to apply the take operation, defaults to None
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
n rows of DataFrame per partition
- Return type
AnyDataFrame
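A hedged sketch of take; the presort string follows the partition presort syntax (column name plus asc/desc), and PartitionSpec is assumed to be importable from the top-level fugue package:
import pandas as pd
import fugue.api as fa
from fugue import PartitionSpec  # assumed top-level export

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]})

# top row per group of "a", ordered by b descending within each group
top = fa.take(df, 1, presort="b desc", partition=PartitionSpec(by=["a"]))

# without a partition: first 2 rows after the presort
first2 = fa.take(df, 2, presort="b desc")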
- fugue.execution.api.union(df1, df2, *dfs, distinct=True, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Union two dataframes
- Parameters
df1 (AnyDataFrame) – the first dataframe
df2 (AnyDataFrame) – the second dataframe
dfs (AnyDataFrame) – more dataframes to union
distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether to force return a local DataFrame, defaults to False
- Returns
the unioned dataframe
- Return type
AnyDataFrame
Note
Currently, the schema of all dataframes must be identical, or an exception will be thrown.
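A small sketch of union, assuming identical schemas (pandas input only for illustration):
import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"a": [2, 3]})

fa.union(df1, df2)                  # UNION DISTINCT -> 1, 2, 3
fa.union(df1, df2, distinct=False)  # UNION ALL -> 1, 2, 2, 3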
fugue.execution.execution_engine#
- class fugue.execution.execution_engine.EngineFacet(execution_engine)[source]#
Bases:
FugueEngineBase
The base class for different facets of the execution engines.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property conf: ParamDict#
All configurations of this engine instance.
Note
It can contain more than you provide; for example, in SparkExecutionEngine, the Spark session can bring in more config, and it is all accessible using this property.
- property execution_engine: ExecutionEngine#
the execution engine this sql engine will run on
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of
ExecutionEngine
- property log: Logger#
Logger of this engine instance
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or a list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (AnyDataFrame) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.execution_engine.ExecutionEngine(conf)[source]#
Bases:
FugueEngineBase
The abstract base class for execution engines. It is the layer that unifies core concepts of distributed computing, and separates the underlying computing frameworks from user’s higher level logic.
Please read the ExecutionEngine Tutorial to understand this most important Fugue concept
- Parameters
conf (Any) – dict-like config, read this to learn Fugue specific options
- aggregate(df, partition_spec, agg_cols)[source]#
Aggregate on dataframe
- Parameters
df (DataFrame) – the dataframe to aggregate on
partition_spec (Optional[PartitionSpec]) – PartitionSpec to specify partition keys
agg_cols (List[ColumnExpr]) – aggregation expressions
- Returns
the aggregated result as a dataframe
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
engine.aggregate(
    df,
    partition_spec=None,
    agg_cols=[f.max(col("b"))])

# SELECT a, MAX(b) AS x FROM df GROUP BY a
engine.aggregate(
    df,
    partition_spec=PartitionSpec(by=["a"]),
    agg_cols=[f.max(col("b")).alias("x")])
- as_context()[source]#
Set this execution engine as the context engine. This function is thread safe and async safe.
Examples
with engine.as_context():
    transform(df, func)  # will use engine in this transformation
- Return type
Iterator[ExecutionEngine]
- assign(df, columns)[source]#
Update existing columns with new values and add new columns
- Parameters
df (DataFrame) – the dataframe to set columns
columns (List[ColumnExpr]) – column expressions
- Returns
the updated dataframe
- Return type
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
# assume df has schema: a:int,b:str

# add constant column x
engine.assign(df, lit(1,"x"))

# change column b to be a constant integer
engine.assign(df, lit(1,"b"))

# add new x to be a+b
engine.assign(df, (col("a")+col("b")).alias("x"))

# cast column a data type to double
engine.assign(df, col("a").cast(float))
- abstract broadcast(df)[source]#
Broadcast the dataframe to all workers for a distributed computing framework
- comap(df, map_func, output_schema, partition_spec, on_init=None)[source]#
Apply a function to each zipped partition on the zipped dataframe.
- Parameters
df (DataFrame) – input dataframe, it must be a zipped dataframe (it has to be a dataframe output from zip() or zip_all())
map_func (Callable[[PartitionCursor, DataFrames], LocalDataFrame]) – the function to apply on every zipped partition
output_schema (Any) – Schema like object that can't be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification for processing the zipped dataframe.
on_init (Optional[Callable[[int, DataFrames], Any]]) – callback function when the physical partition is initializing, defaults to None
- Returns
the dataframe after the comap operation
Note
The input of this method must be an output of zip() or zip_all()
The partition_spec here is NOT related to how you zipped the dataframe; however you set it, it will only affect the processing speed. The partition keys will actually be overridden by the zipped dataframe's partition keys. You may use it to improve efficiency: PartitionSpec(algo="even", num="ROWCOUNT") tells the execution engine to put each zipped partition into a physical partition so it can achieve the best possible load balance.
If the input dataframe has keys, the dataframes you get in map_func and on_init will have keys; otherwise you will get list-like dataframes
The on_init function will get a DataFrames object that has the same structure but contains all empty dataframes; you can use the schemas but not the data.
See also
For more details and examples, read Zip & Comap
- property conf: ParamDict#
All configurations of this engine instance.
Note
It can contain more than you provide; for example, in SparkExecutionEngine, the Spark session can bring in more config, and it is all accessible using this property.
- convert_yield_dataframe(df, as_local)[source]#
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters
df (DataFrame) – DataFrame
as_local (bool) – whether yield a local dataframe
- Returns
another DataFrame that can be used after this execution engine stops
- Return type
Note
By default, the output dataframe is the input dataframe. But it should be overridden when the input dataframe will become invalid after the engine stops.
For example, if you customize a Spark engine that starts and stops the Spark session in this engine's start_engine() and stop_engine(), then the Spark dataframe will be invalid. So you may consider converting it to a local dataframe so it can still exist after the engine stops.
- abstract dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
- abstract fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
- filter(df, condition)[source]#
Filter rows by the given condition
- Parameters
df (DataFrame) – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
- Returns
the filtered dataframe
- Return type
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

engine.filter(df, (col("a")>1) & (col("b")=="x"))
engine.filter(df, f.coalesce(col("a"),col("b"))>1)
- abstract property fs: FileSystem#
File system of this engine instance
- abstract get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- property in_context: bool#
Whether this engine is being used as a context engine
- abstract intersect(df1, df2, distinct=True)[source]#
Intersect df1 and df2
- Parameters
- Returns
the intersected dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- property is_global: bool#
Whether this engine is being used as THE global engine
- abstract join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
Note
Please read
get_join_schemas()
- abstract load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
For more details and examples, read Load & Save.
- property map_engine: MapEngine#
The MapEngine currently used by this execution engine. You should use set_map_engine() to set a new MapEngine instance. If not set, the default is create_default_map_engine()
- on_enter_context()[source]#
The event hook when calling set_global_engine() or engine_context(), defaults to no operation
- Return type
None
- on_exit_context()[source]#
The event hook when calling clear_global_engine() or exiting from engine_context(), defaults to no operation
- Return type
None
- abstract persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist can only guarantee the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
- abstract repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- abstract sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
sampled dataframe
- Return type
- abstract save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
- select(df, cols, where=None, having=None)[source]#
The functional interface for SQL select statement
- Parameters
df (DataFrame) – the dataframe to be operated on
cols (SelectColumns) – column expressions
where (Optional[ColumnExpr]) – WHERE condition expression, defaults to None
having (Optional[ColumnExpr]) – HAVING condition expression, used when cols contains aggregation columns, defaults to None
- Returns
the select result as a dataframe
- Return type
New Since
0.6.0
Attention
This interface is experimental; it's subject to change in new versions.
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

# select existed and new columns
engine.select(df, SelectColumns(col("a"),col("b"),lit(1,"another")))
engine.select(df, SelectColumns(col("a"),(col("b")+lit(1)).alias("x")))

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
engine.select(
    df,
    SelectColumns(f.count_distinct(all_cols()).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
engine.select(
    df,
    SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df
# WHERE b<2 AND a>1
# GROUP BY a
# HAVING MAX(b+1)>0
engine.select(
    df,
    SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)
- set_global()[source]#
Set this execution engine to be the global execution engine.
Note
Global engine is also considered as a context engine, so in_context() will also become true for the global engine.
Examples
engine1.set_global()
transform(df, func)  # will use engine1 in this transformation

with engine2.as_context():
    transform(df, func)  # will use engine2

transform(df, func)  # will use engine1
- Return type
- set_sql_engine(engine)[source]#
Set SQLEngine for this execution engine. If not set, the default is create_default_sql_engine()
- property sql_engine: SQLEngine#
The SQLEngine currently used by this execution engine. You should use set_sql_engine() to set a new SQLEngine instance. If not set, the default is create_default_sql_engine()
- stop()[source]#
Stop this execution engine. Do not override this method; customize stop_engine() if necessary. This function ensures stop_engine() is called only once.
Note
Once the engine is stopped it should not be used again
- Return type
None
- stop_engine()[source]#
Custom logic to stop the execution engine, defaults to no operation
- Return type
None
- abstract subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
- Returns
the subtracted dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- abstract take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
- abstract union(df1, df2, distinct=True)[source]#
Union two dataframes
- Parameters
- Returns
the unioned dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- zip(df1, df2, how='inner', partition_spec=None, temp_path=None, to_file_threshold=-1, df1_name=None, df2_name=None)[source]#
Partition the two dataframes in the same way with partition_spec and zip the partitions together on the partition keys.
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner
partition_spec (PartitionSpec, optional) – partition spec to partition each dataframe, defaults to empty.
temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None
to_file_threshold (Any) – file byte size threshold, defaults to -1
df1_name (Optional[str]) – df1’s name in the zipped dataframe, defaults to None
df2_name (Optional[str]) – df2’s name in the zipped dataframe, defaults to None
- Returns
a zipped dataframe, the metadata of the dataframe will indicate it’s zipped
Note
Different from join, df1 and df2 can have common columns that you will not use as partition keys.
If on is not specified, it will also use the common columns of the two dataframes (if it's not a cross zip)
For non-cross zip, the two dataframes must have common columns, or an error will be thrown
See also
For more details and examples, read Zip & Comap.
- zip_all(dfs, how='inner', partition_spec=None, temp_path=None, to_file_threshold=-1)[source]#
Zip multiple dataframes together with given partition specifications.
- Parameters
dfs (DataFrames) – DataFrames like object
how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner
partition_spec (Optional[PartitionSpec]) – Partition like object, defaults to empty.
temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None
to_file_threshold (Any) – file byte size threshold, defaults to -1
- Returns
a zipped dataframe, the metadata of the dataframe will indicate it's zipped
- Return type
Note
Please also read zip()
If dfs is dict like, the zipped dataframe will be dict like; if dfs is list like, the zipped dataframe will be list like
It's fine to contain only one dataframe in dfs
See also
For more details and examples, read Zip & Comap
- class fugue.execution.execution_engine.ExecutionEngineParam(param)[source]#
Bases:
AnnotatedParam
- Parameters
param (Optional[Parameter]) –
- class fugue.execution.execution_engine.FugueEngineBase[source]#
Bases:
ABC
- abstract property conf: ParamDict#
All configurations of this engine instance.
Note
It can contain more than you provide; for example, in SparkExecutionEngine, the Spark session can bring in more config, and it is all accessible using this property.
- abstract property is_distributed: bool#
Whether this engine is a distributed engine
- abstract property log: Logger#
Logger of this engine instance
- abstract to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or a list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (AnyDataFrame) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.execution_engine.MapEngine(execution_engine)[source]#
Bases:
EngineFacet
The abstract base class for different map operation implementations.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- map_bag(bag, map_func, partition_spec, on_init=None)[source]#
Apply a function to each partition after you partition the bag in a specified way.
- Parameters
df – input dataframe
map_func (Callable[[BagPartitionCursor, LocalBag], LocalBag]) – the function to apply on every logical partition
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, Bag], Any]]) – callback function when the physical partition is initializing, defaults to None
bag (Bag) –
- Returns
the bag after the map operation
- Return type
- abstract map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
Note
Before implementing, you must read this to understand what map is used for and how it should work.
- class fugue.execution.execution_engine.SQLEngine(execution_engine)[source]#
Bases:
EngineFacet
The abstract base class for different SQL execution implementations. Please read this to understand the concept
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property dialect: Optional[str]#
- encode(dfs, statement)[source]#
- Parameters
dfs (DataFrames) –
statement (StructuredRawSQL) –
- Return type
Tuple[DataFrames, str]
- load_table(table, **kwargs)[source]#
Load table as a dataframe
- Parameters
table (str) – the table name
kwargs (Any) –
- Returns
an engine compatible dataframe
- Return type
- save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]#
Save the dataframe to a table
- Parameters
df (DataFrame) – the dataframe to save
table (str) – the table name
mode (str) – can accept overwrite, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
- abstract select(dfs, statement)[source]#
Execute select statement on the sql engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns
result of the SELECT statement
- Return type
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True, "a"),
     (False, " UNION SELECT * FROM "),
     (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from Hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
fugue.execution.factory#
- fugue.execution.factory.is_pandas_or(objs, obj_type)[source]#
Check whether the input contains at least one obj_type object and the rest are Pandas DataFrames. This function is a utility for extending infer_execution_engine()
- Parameters
objs (List[Any]) – the list of objects to check
obj_type (Any) –
- Returns
whether all objs are of type obj_type or pandas DataFrame and at least one is of type obj_type
- Return type
bool
- fugue.execution.factory.make_execution_engine(engine=None, conf=None, infer_by=None, **kwargs)[source]#
Create ExecutionEngine with specified engine
- Parameters
engine (Optional[Any]) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, or the ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the sql engine (you can use None for either of them to use the default one), defaults to None
conf (Optional[Any]) – Parameters like object, defaults to None
infer_by (Optional[List[Any]]) – List of objects that can be used to infer the execution engine using infer_execution_engine()
kwargs (Any) – additional parameters to initialize the execution engine
- Returns
the ExecutionEngine instance
- Return type
Note
This function finds/constructs the engine in the following order:
If engine is None, it first tries to see if there is any defined context engine to use (=> engine)
If engine is still empty, then it will try to get the global execution engine. See set_global()
If engine is still empty, then if infer_by is given, it will try to infer the execution engine (=> engine)
If engine is still empty, then it will construct the default engine defined by register_default_execution_engine() (=> engine)
Now, engine must not be empty; if it is an object other than ExecutionEngine, we will use parse_execution_engine() to construct it (=> engine)
Now, engine must have been an ExecutionEngine object. We update its SQL engine if specified, then update its config using conf and kwargs
Examples
register_default_execution_engine(lambda conf: E1(conf))
register_execution_engine("e2", lambda conf, **kwargs: E2(conf, **kwargs))
register_sql_engine("s", lambda conf: S2(conf))

# E1 + E1.create_default_sql_engine()
make_execution_engine()
# E2 + E2.create_default_sql_engine()
make_execution_engine(e2)
# E1 + S2
make_execution_engine((None, "s"))
# E2(conf, a=1, b=2) + S2
make_execution_engine(("e2", "s"), conf, a=1, b=2)

# SparkExecutionEngine + SparkSQLEngine
make_execution_engine(SparkExecutionEngine)
make_execution_engine(SparkExecutionEngine(spark_session, conf))
# SparkExecutionEngine + S2
make_execution_engine((SparkExecutionEngine, "s"))

# assume object e2_df can infer E2 engine
make_execution_engine(infer_by=[e2_df])  # an E2 engine

# global
e_global = E1(conf)
e_global.set_global()
make_execution_engine()  # e_global

# context
with E2(conf).as_context() as ec:
    make_execution_engine()  # ec
make_execution_engine()  # e_global
- fugue.execution.factory.make_sql_engine(engine=None, execution_engine=None, **kwargs)[source]#
Create SQLEngine with specified engine
- Parameters
engine (Optional[Any]) – it can be empty string or null (use the default SQL engine), a string (use the registered SQL engine), an SQLEngine type, or the SQLEngine instance (you can use None to use the default one), defaults to None
execution_engine (Optional[ExecutionEngine]) – the ExecutionEngine instance to create the SQLEngine. Normally you should always provide this value.
kwargs (Any) – additional parameters to initialize the sql engine
- Returns
the SQLEngine instance
- Return type
Note
For users, you normally don't need to call this function directly. Use make_execution_engine instead
Examples
register_default_sql_engine(lambda conf: S1(conf))
register_sql_engine("s2", lambda conf: S2(conf))
engine = NativeExecutionEngine()

# S1(engine)
make_sql_engine(None, engine)
# S1(engine, a=1)
make_sql_engine(None, engine, a=1)
# S2(engine)
make_sql_engine("s2", engine)
- fugue.execution.factory.register_default_execution_engine(func, on_dup='overwrite')[source]#
Register ExecutionEngine as the default engine.
- Parameters
func (Callable) – a callable taking a Parameters like object and **kwargs and returning an ExecutionEngine instance
on_dup – action on duplicated name. It can be "overwrite" or "ignore" (not overwriting), defaults to "overwrite".
- Return type
None
Examples
# create a new engine with name my (overwrites if existed)
register_default_execution_engine(lambda conf: MyExecutionEngine(conf))

# the following examples will use MyExecutionEngine

# 0
make_execution_engine()
make_execution_engine(None, {"myconfig": "value"})

# 1
dag = FugueWorkflow()
dag.create([[0]], "a:int").show()
dag.run(None, {"myconfig": "value"})

# 2
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("", {"myconfig": "value"})
- fugue.execution.factory.register_default_sql_engine(func, on_dup='overwrite')[source]#
Register SQLEngine as the default engine
- Parameters
func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance
on_dup – action on duplicated name. It can be "overwrite", "ignore" (not overwriting) or "throw" (throw exception), defaults to "overwrite".
- Raises
KeyError – if on_dup is throw and the name already exists
- Return type
None
Note
You should be careful to use this function, because when you set a custom SQL engine as default, all execution engines you create will use this SQL engine unless you are explicit. For example if you set the default SQL engine to be a Spark specific one, then if you start a NativeExecutionEngine, it will try to use it and will throw exceptions.
So it's always a better idea to use register_sql_engine instead
Examples
# create a new engine with name my (overwrites if existed)
register_default_sql_engine(lambda engine: MySQLEngine(engine))

# create NativeExecutionEngine with MySQLEngine as the default
make_execution_engine()

# create SparkExecutionEngine with MySQLEngine instead of SparkSQLEngine
make_execution_engine("spark")

# NativeExecutionEngine with MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]], "a:int").show()
dag.run()
- fugue.execution.factory.register_execution_engine(name_or_type, func, on_dup='overwrite')[source]#
Register ExecutionEngine with a given name.
- Parameters
name_or_type (Union[str, Type]) – alias of the execution engine, or type of an object that can be converted to an execution engine
func (Callable) – a callable taking a Parameters like object and **kwargs and returning an ExecutionEngine instance
on_dup – action on duplicated name. It can be "overwrite" or "ignore" (not overwriting), defaults to "overwrite".
- Return type
None
Examples
Alias registration examples:
# create a new engine with name my (overwrites if existed)
register_execution_engine("my", lambda conf: MyExecutionEngine(conf))

# 0
make_execution_engine("my")
make_execution_engine("my", {"myconfig": "value"})

# 1
dag = FugueWorkflow()
dag.create([[0]], "a:int").show()
dag.run("my", {"myconfig": "value"})

# 2
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("my")
Type registration examples:
from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine
from fugue import fsql

register_execution_engine(
    SparkSession,
    lambda session, conf: SparkExecutionEngine(session, conf))

spark_session = SparkSession.builder.getOrCreate()

fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run(spark_session)
- fugue.execution.factory.register_sql_engine(name, func, on_dup='overwrite')[source]#
Register SQLEngine with a given name.
- Parameters
name (str) – name of the SQL engine
func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance
on_dup – action on duplicated name. It can be "overwrite" or "ignore" (not overwriting), defaults to "overwrite".
- Return type
None
Examples
# create a new engine with name my (overwrites if existed)
register_sql_engine("mysql", lambda engine: MySQLEngine(engine))

# create execution engine with MySQLEngine as the default
make_execution_engine(("", "mysql"))

# create DaskExecutionEngine with MySQLEngine as the default
make_execution_engine(("dask", "mysql"))

# default execution engine + MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]], "a:int").show()
dag.run(("", "mysql"))
- fugue.execution.factory.try_get_context_execution_engine()[source]#
If the global execution engine is set (see set_global()) or the context is set (see as_context()), then return the engine, else return None
- Return type
Optional[ExecutionEngine]
fugue.execution.native_execution_engine#
- class fugue.execution.native_execution_engine.NativeExecutionEngine(conf=None)[source]#
Bases:
ExecutionEngine
The execution engine based on native python and pandas. This execution engine is mainly for prototyping and unit tests.
Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
- dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
- fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
- property fs: FileSystem#
File system of this engine instance
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- intersect(df1, df2, distinct=True)[source]#
Intersect df1 and df2
- Parameters
- Returns
the intersected dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- property is_distributed: bool#
Whether this engine is a distributed engine
- join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
Note
Please read
get_join_schemas()
- load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
For more details and examples, read Load & Save.
- property log: Logger#
Logger of this engine instance
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist can only guarantee the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
- property pl_utils: PandasUtils#
Pandas-like dataframe utils
- repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
sampled dataframe
- Return type
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
- subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
- Returns
the subtracted dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or a list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (AnyDataFrame) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.native_execution_engine.PandasMapEngine(execution_engine)[source]#
Bases:
MapEngine
- Parameters
execution_engine (ExecutionEngine) –
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of
ExecutionEngine
- property is_distributed: bool#
Whether this engine is a distributed engine
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
Note
Before implementing, you must read this to understand what map is used for and how it should work.
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or a list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (AnyDataFrame) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue.execution.native_execution_engine.QPDPandasEngine(execution_engine)[source]#
Bases:
SQLEngine
QPD execution implementation.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property dialect: Optional[str]#
- property is_distributed: bool#
Whether this engine is a distributed engine
- select(dfs, statement)[source]#
Execute select statement on the sql engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns
result of the SELECT statement
- Return type
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True, "a"),
     (False, " UNION SELECT * FROM "),
     (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from Hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or a list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (AnyDataFrame) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
fugue.extensions#
fugue.extensions.creator#
- fugue.extensions.creator.convert.creator(schema=None)[source]#
Decorator for creators
Please read Creator Tutorial
- Parameters
schema (Optional[Any]) –
- Return type
Callable[[Any], _FuncAsCreator]
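As a hedged usage sketch (not from the Fugue tutorials), a function-based creator might look like the following; the names make_rows and n are made up for illustration:
from typing import Any, List
from fugue import FugueWorkflow, creator

@creator("a:int,b:int")
def make_rows(n: int = 3) -> List[List[Any]]:
    # return rows matching the schema declared in the decorator
    return [[i, i * i] for i in range(n)]

dag = FugueWorkflow()
dag.create(make_rows, params={"n": 5}).show()
dag.run()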
- fugue.extensions.creator.convert.register_creator(alias, obj, on_dup=0)[source]#
Register creator with an alias. This is a simplified version of
parse_creator()
- Parameters
alias (str) – alias of the creator
obj (Any) – the object that can be converted to Creator
on_dup (int) – see triad.collections.dict.ParamDict.update(), defaults to ParamDict.OVERWRITE
- Return type
None
Tip
Registering an extension with an alias is particularly useful for projects such as libraries: by using an alias, users don’t have to import the specific extension or provide its full path, which makes their code less dependent and easier to understand.
New Since
0.6.0
See also
Please read Creator Tutorial
Examples
Here is an example of how to set up your project so your users can benefit from this feature. Assume your project name is pn.
The creator implementation in file pn/pn/creators.py:
import pandas as pd

def my_creator() -> pd.DataFrame:
    return pd.DataFrame()
Then in
pn/pn/__init__.py
from .creators import my_creator
from fugue import register_creator

def register_extensions():
    register_creator("mc", my_creator)
    # ... register more extensions

register_extensions()
In users code:
import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
dag.create("mc").show()  # use my_creator by alias
dag.run()
- class fugue.extensions.creator.creator.Creator[source]#
Bases:
ExtensionContext
,ABC
The interface is to generate single DataFrame from params. For example reading data from file should be a type of Creator. Creator is task level extension, running on driver, and execution engine aware.
To implement this class, you should not define __init__; directly implement the interface functions instead.
Note
Before implementing this class, do you really need to implement this interface? Do you know the interfaceless feature of Fugue? Implementing Creator is commonly unnecessary. You can choose the interfaceless approach which may decouple your code from Fugue.
See also
Please read Creator Tutorial
- abstract create()[source]#
Create DataFrame on driver side
Note
It runs on driver side
The output dataframe is not necessarily local, for example a SparkDataFrame
It is engine aware, you can put platform dependent code in it (for example native pyspark code) but by doing so your code may not be portable. If you only use the functions of the general ExecutionEngine interface, it’s still portable.
- Returns
result dataframe
- Return type
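A hedged sketch of a class-based Creator (the class name SeedData and the parameter n are illustrative assumptions):
import pandas as pd
from fugue import DataFrame, FugueWorkflow
from fugue.extensions import Creator

class SeedData(Creator):
    def create(self) -> DataFrame:
        # runs on the driver; read a parameter with a default value
        n = self.params.get("n", 3)
        pdf = pd.DataFrame({"a": range(n)})
        # stay portable by converting through the general engine interface
        return self.execution_engine.to_df(pdf)

dag = FugueWorkflow()
dag.create(SeedData, params={"n": 4}).show()
dag.run()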
fugue.extensions.outputter#
- fugue.extensions.outputter.convert.outputter(**validation_rules)[source]#
Decorator for outputters
Please read Outputter Tutorial
- Parameters
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsOutputter]
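A hedged usage sketch (names are illustrative): a function-based outputter that prints the head of the incoming dataframe:
import pandas as pd
from fugue import FugueWorkflow, outputter

@outputter()
def print_head(df: pd.DataFrame, n: int = 5) -> None:
    # outputters must return None; they only produce side effects
    print(df.head(n))

dag = FugueWorkflow()
dag.df([[0], [1]], "a:int").output(print_head, params={"n": 1})
dag.run()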
- fugue.extensions.outputter.convert.register_outputter(alias, obj, on_dup=0)[source]#
Register outputter with an alias.
- Parameters
alias (str) – alias of the outputter
obj (Any) – the object that can be converted to Outputter
on_dup (int) – see triad.collections.dict.ParamDict.update(), defaults to ParamDict.OVERWRITE
- Return type
None
Tip
Registering an extension with an alias is particularly useful for projects such as libraries: by using an alias, users don’t have to import the specific extension or provide its full path, which makes their code less dependent and easier to understand.
New Since
0.6.0
See also
Please read Outputter Tutorial
Examples
Here is an example of how to set up your project so your users can benefit from this feature. Assume your project name is pn.
The outputter implementation in file pn/pn/outputters.py:
from fugue import DataFrame

def my_outputter(df: DataFrame) -> None:
    print(df)
Then in
pn/pn/__init__.py
from .outputters import my_outputter
from fugue import register_outputter

def register_extensions():
    register_outputter("mo", my_outputter)
    # ... register more extensions

register_extensions()
In users code:
import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_outputter by alias
dag.df([[0]], "a:int").output("mo")
dag.run()
- class fugue.extensions.outputter.outputter.Outputter[source]#
Bases:
ExtensionContext
,ABC
The interface to process one or multiple incoming dataframes without returning anything. For example printing or saving dataframes should be a type of Outputter. Outputter is task level extension, running on driver, and execution engine aware.
To implement this class, you should not define __init__; directly implement the interface functions instead.
Note
Before implementing this class, do you really need to implement this interface? Do you know the interfaceless feature of Fugue? Implementing Outputter is commonly unnecessary. You can choose the interfaceless approach which may decouple your code from Fugue.
See also
Please read Outputter Tutorial
- abstract process(dfs)[source]#
Process the collection of dataframes on driver side
Note
It runs on driver side
The dataframes are not necessarily local, for example a SparkDataFrame
It is engine aware, you can put platform dependent code in it (for example native pyspark code) but by doing so your code may not be portable. If you only use the functions of the general ExecutionEngine, it’s still portable.
- Parameters
dfs (DataFrames) – dataframe collection to process
- Return type
None
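A hedged sketch of a class-based Outputter (the class name ShowCount is an illustrative assumption):
from fugue import DataFrames, FugueWorkflow
from fugue.extensions import Outputter

class ShowCount(Outputter):
    def process(self, dfs: DataFrames) -> None:
        # runs on the driver; dfs may wrap one or more dataframes
        for df in dfs.values():
            print(df.count())

dag = FugueWorkflow()
dag.df([[0], [1]], "a:int").output(ShowCount)
dag.run()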
fugue.extensions.processor#
- fugue.extensions.processor.convert.processor(schema=None, **validation_rules)[source]#
Decorator for processors
Please read Processor Tutorial
- Parameters
schema (Optional[Any]) –
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsProcessor]
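A hedged usage sketch (names are illustrative): a function-based processor whose output schema comes from the decorator:
from typing import Any, List
import pandas as pd
from fugue import FugueWorkflow, processor

@processor("a:int,doubled:int")
def double(df: pd.DataFrame) -> List[List[Any]]:
    # driver-side logic; the returned rows must match the declared schema
    return [[int(a), int(a) * 2] for a in df["a"]]

dag = FugueWorkflow()
dag.df([[1], [2]], "a:int").process(double).show()
dag.run()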
- fugue.extensions.processor.convert.register_processor(alias, obj, on_dup=0)[source]#
Register processor with an alias.
- Parameters
alias (str) – alias of the processor
obj (Any) – the object that can be converted to Processor
on_dup (int) – see triad.collections.dict.ParamDict.update(), defaults to ParamDict.OVERWRITE
- Return type
None
Tip
Registering an extension with an alias is particularly useful for projects such as libraries: by using an alias, users don’t have to import the specific extension or provide its full path, which makes their code less dependent and easier to understand.
New Since
0.6.0
See also
Please read Processor Tutorial
Examples
Here is an example of how to set up your project so your users can benefit from this feature. Assume your project name is pn.
The processor implementation in file pn/pn/processors.py:
from fugue import DataFrame

def my_processor(df: DataFrame) -> DataFrame:
    return df
Then in
pn/pn/__init__.py
from .processors import my_processor
from fugue import register_processor

def register_extensions():
    register_processor("mp", my_processor)
    # ... register more extensions

register_extensions()
In users code:
import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_processor by alias
dag.df([[0]], "a:int").process("mp").show()
dag.run()
- class fugue.extensions.processor.processor.Processor[source]#
Bases:
ExtensionContext
,ABC
The interface to process one or multiple incoming dataframes and return one DataFrame. For example dropping a column of df should be a type of Processor. Processor is task level extension, running on driver, and execution engine aware.
To implement this class, you should not define __init__; directly implement the interface functions instead.
Note
Before implementing this class, do you really need to implement this interface? Do you know the interfaceless feature of Fugue? Implementing Processor is commonly unnecessary. You can choose the interfaceless approach which may decouple your code from Fugue.
See also
Please read Processor Tutorial
- abstract process(dfs)[source]#
Process the collection of dataframes on driver side
Note
It runs on driver side
The dataframes are not necessarily local, for example a SparkDataFrame
It is engine aware, you can put platform dependent code in it (for example native pyspark code) but by doing so your code may not be portable. If you only use the functions of the general ExecutionEngine, it’s still portable.
- Parameters
dfs (DataFrames) – dataframe collection to process
- Returns
the result dataframe
- Return type
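A hedged sketch of a class-based Processor that relies only on the general ExecutionEngine interface (the class name UnionAll is an illustrative assumption):
from fugue import DataFrame, DataFrames, FugueWorkflow
from fugue.extensions import Processor

class UnionAll(Processor):
    def process(self, dfs: DataFrames) -> DataFrame:
        # driver-side: fold all inputs into one dataframe via the engine API
        result = None
        for df in dfs.values():
            result = df if result is None else self.execution_engine.union(
                result, df, distinct=False)
        return result

dag = FugueWorkflow()
a = dag.df([[0]], "a:int")
b = dag.df([[1]], "a:int")
dag.process(a, b, using=UnionAll).show()
dag.run()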
fugue.extensions.transformer#
- fugue.extensions.transformer.convert.cotransformer(schema, **validation_rules)[source]#
Decorator for cotransformers
Please read the CoTransformer Tutorial
- Parameters
schema (Any) –
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsCoTransformer]
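A hedged usage sketch (dataframe contents and names are illustrative): zipping two dataframes by key and cotransforming each logical partition:
from typing import Any, List
import pandas as pd
from fugue import FugueWorkflow, cotransformer

@cotransformer("key:int,ct1:long,ct2:long")
def count_both(df1: pd.DataFrame, df2: pd.DataFrame) -> List[List[Any]]:
    # runs per logical partition; both inputs share the same key value
    return [[int(df1["key"].iloc[0]), len(df1), len(df2)]]

dag = FugueWorkflow()
a = dag.df([[0, 1], [0, 2], [1, 3]], "key:int,v:int")
b = dag.df([[0, 10], [1, 20]], "key:int,w:int")
dag.zip(a, b, partition={"by": ["key"]}).transform(count_both).show()
dag.run()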
- fugue.extensions.transformer.convert.output_cotransformer(**validation_rules)[source]#
Decorator for output cotransformers
Please read the CoTransformer Tutorial
- Parameters
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsCoTransformer]
- fugue.extensions.transformer.convert.output_transformer(**validation_rules)[source]#
Decorator for output transformers
Please read the Transformer Tutorial
- Parameters
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsTransformer]
- fugue.extensions.transformer.convert.register_output_transformer(alias, obj, on_dup=0)[source]#
Register output transformer with an alias.
- Parameters
alias (str) – alias of the transformer
obj (Any) – the object that can be converted to OutputTransformer or OutputCoTransformer
on_dup (int) – see triad.collections.dict.ParamDict.update(), defaults to ParamDict.OVERWRITE
- Return type
None
Tip
Registering an extension with an alias is particularly useful for projects such as libraries: by using an alias, users don’t have to import the specific extension or provide its full path, which makes their code less dependent and easier to understand.
New Since
0.6.0
See also
Please read the Transformer Tutorial
Examples
Here is an example of how to set up your project so your users can benefit from this feature. Assume your project name is pn.
The transformer implementation in file pn/pn/transformers.py:
import pandas as pd

def my_transformer(df: pd.DataFrame) -> None:
    df.to_parquet("<unique_path>")
Then in
pn/pn/__init__.py
from .transformers import my_transformer
from fugue import register_transformer

def register_extensions():
    register_transformer("mt", my_transformer)
    # ... register more extensions

register_extensions()
In users code:
import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_transformer by alias
dag.df([[0]], "a:int").out_transform("mt")
dag.run()
- fugue.extensions.transformer.convert.register_transformer(alias, obj, on_dup=0)[source]#
Register transformer with an alias.
- Parameters
alias (str) – alias of the transformer
obj (Any) – the object that can be converted to Transformer or CoTransformer
on_dup (int) – see triad.collections.dict.ParamDict.update(), defaults to ParamDict.OVERWRITE
- Return type
None
Tip
Registering an extension with an alias is particularly useful for projects such as libraries: by using an alias, users don’t have to import the specific extension or provide its full path, which makes their code less dependent and easier to understand.
New Since
0.6.0
See also
Please read the Transformer Tutorial
Examples
Here is an example of how to set up your project so your users can benefit from this feature. Assume your project name is pn.
The transformer implementation in file pn/pn/transformers.py:
import pandas as pd

# schema: *
def my_transformer(df: pd.DataFrame) -> pd.DataFrame:
    return df
Then in
pn/pn/__init__.py
from .transformers import my_transformer
from fugue import register_transformer

def register_extensions():
    register_transformer("mt", my_transformer)
    # ... register more extensions

register_extensions()
In users code:
import pn  # register_extensions will be called
from fugue import FugueWorkflow

dag = FugueWorkflow()
# use my_transformer by alias
dag.df([[0]], "a:int").transform("mt").show()
dag.run()
- fugue.extensions.transformer.convert.transformer(schema, **validation_rules)[source]#
Decorator for transformers
Please read the Transformer Tutorial
- Parameters
schema (Any) –
validation_rules (Any) –
- Return type
Callable[[Any], _FuncAsTransformer]
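A hedged usage sketch (names are illustrative): a function-based transformer that appends a per-partition aggregate:
import pandas as pd
from fugue import FugueWorkflow, transformer

@transformer("*,b_sum:double")
def add_partition_sum(df: pd.DataFrame) -> pd.DataFrame:
    # runs once per logical partition
    return df.assign(b_sum=df["b"].sum())

dag = FugueWorkflow()
df = dag.df([[0, 1.0], [0, 2.0], [1, 3.0]], "a:int,b:double")
df.partition(by=["a"]).transform(add_partition_sum).show()
dag.run()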
- class fugue.extensions.transformer.transformer.CoTransformer[source]#
Bases:
ExtensionContext
The interface to process logical partitions of a zipped dataframe. A dataframe such as SparkDataFrame can be distributed, but this interface handles local processing, so scalability and throughput are not concerns of CoTransformer.
To implement this class, you should not define __init__; directly implement the interface functions instead.
Note
Before implementing this class, ask whether you really need to. With Fugue’s interfaceless feature, if you don’t need to implement on_init(), you can choose the interfaceless approach, which also decouples your code from Fugue.
It’s important to understand Zip & Comap, and please also read the CoTransformer Tutorial
Due to a similar issue with Spark pickling ABC objects, this class is not an ABC. If you encounter a similar issue, see the linked discussion for a possible solution.
- get_format_hint()[source]#
Get the transformer’s preferred data format, for example pandas, pyarrow, or None. This helps the execution engine use the most efficient way to execute the logic.
- Return type
Optional[str]
- get_output_schema(dfs)[source]#
Generate the output schema on the driver side.
Note
This is running on driver
Currently,
dfs
is a collection of empty dataframes with the same structure and schemas
Normally, you should not consume these dataframes in this step; you should only use their schemas
You can access all properties except for
cursor()
- Parameters
dfs (DataFrames) – the collection of dataframes you are going to transform. They are empty dataframes with the same structure and schemas
- Returns
Schema like object, should not be None or empty
- Return type
Any
- on_init(dfs)[source]#
Callback for initializing physical partition that contains one or multiple logical partitions. You may put expensive initialization logic here so you will not have to repeat that in
transform()
Note
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class
You can get the physical partition number (if available from the execution engine) from
cursor()
Currently,
dfs
is a collection of empty dataframes with the same structure and schemas
- Parameters
dfs (DataFrames) – a collection of empty dataframes with the same structure and schemas
- Return type
None
- transform(dfs)[source]#
The transformation logic from a collection of dataframes (with the same partition keys) to a local dataframe.
Note
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the
context
from the properties of this class
- Parameters
dfs (DataFrames) – a collection of dataframes with the same partition keys
- Returns
transformed dataframe
- Return type
- class fugue.extensions.transformer.transformer.OutputCoTransformer[source]#
Bases:
CoTransformer
- get_output_schema(dfs)[source]#
Generate the output schema on the driver side.
Note
This is running on driver
Currently,
dfs
is a collection of empty dataframes with the same structure and schemas
Normally, you should not consume these dataframes in this step; you should only use their schemas
You can access all properties except for
cursor()
- Parameters
dfs (DataFrames) – the collection of dataframes you are going to transform. They are empty dataframes with the same structure and schemas
- Returns
Schema like object, should not be None or empty
- Return type
Any
- process(dfs)[source]#
- Parameters
dfs (DataFrames) –
- Return type
None
- transform(dfs)[source]#
The transformation logic from a collection of dataframes (with the same partition keys) to a local dataframe.
Note
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the
context
from the properties of this class
- Parameters
dfs (DataFrames) – a collection of dataframes with the same partition keys
- Returns
transformed dataframe
- Return type
- class fugue.extensions.transformer.transformer.OutputTransformer[source]#
Bases:
Transformer
- get_output_schema(df)[source]#
Generate the output schema on the driver side.
Note
This is running on driver
This is the only function in this interface that is facing the entire DataFrame that is not necessarily local, for example a SparkDataFrame
Normally, you should not consume this dataframe in this step; you should only use its schema
You can access all properties except for
cursor()
- Parameters
df (DataFrame) – the entire dataframe you are going to transform.
- Returns
Schema like object, should not be None or empty
- Return type
Any
- process(df)[source]#
- Parameters
df (LocalDataFrame) –
- Return type
None
- transform(df)[source]#
The transformation logic from one local dataframe to another local dataframe.
Note
This function operates on logical partition level
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the
context
from the properties of this class
The input dataframe may be unbounded, but must be empty aware. It’s safe to consume it ONLY ONCE
The input dataframe is never empty. Empty dataframes are skipped
- Parameters
df (LocalDataFrame) – one logical partition to transform on
- Returns
transformed dataframe
- Return type
- class fugue.extensions.transformer.transformer.Transformer[source]#
Bases:
ExtensionContext
The interface to process logical partitions of a dataframe. A dataframe such as SparkDataFrame can be distributed, but this interface handles local processing, so scalability and throughput are not concerns of Transformer.
To implement this class, you should not define __init__; directly implement the interface functions instead.
Note
Before implementing this class, ask whether you really need to. With Fugue’s interfaceless feature, if you don’t need to implement on_init(), you can choose the interfaceless approach, which also decouples your code from Fugue.
It’s important to understand the Partition Tutorial, and please also read the Transformer Tutorial
Due to a similar issue with Spark pickling ABC objects, this class is not an ABC. If you encounter a similar issue, see the linked discussion for a possible solution.
- get_format_hint()[source]#
Get the transformer’s preferred data format, for example pandas, pyarrow, or None. This helps the execution engine use the most efficient way to execute the logic.
- Return type
Optional[str]
- get_output_schema(df)[source]#
Generate the output schema on the driver side.
Note
This is running on driver
This is the only function in this interface that is facing the entire DataFrame that is not necessarily local, for example a SparkDataFrame
Normally, you should not consume this dataframe in this step; you should only use its schema
You can access all properties except for
cursor()
- Parameters
df (DataFrame) – the entire dataframe you are going to transform.
- Returns
Schema like object, should not be None or empty
- Return type
Any
- on_init(df)[source]#
Callback for initializing physical partition that contains one or multiple logical partitions. You may put expensive initialization logic here so you will not have to repeat that in
transform()
Note
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the context from the properties of this class
You can get the physical partition number (if available from the execution engine) from
cursor()
The input dataframe may be unbounded, but must be empty aware. That means you must not consume the df by any means and you cannot even count it; however, you can safely peek at the first row of the dataframe multiple times.
The input dataframe is never empty. Empty physical partitions are skipped
- Parameters
df (DataFrame) – the entire dataframe of this physical partition
- Return type
None
- transform(df)[source]#
The transformation logic from one local dataframe to another local dataframe.
Note
This function operates on logical partition level
This call can be on a random machine (depending on the ExecutionEngine you use), you should get the
context
from the properties of this class
The input dataframe may be unbounded, but must be empty aware. It’s safe to consume it ONLY ONCE
The input dataframe is never empty. Empty dataframes are skipped
- Parameters
df (LocalDataFrame) – one logical partition to transform on
- Returns
transformed dataframe
- Return type
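A hedged sketch of a class-based Transformer implementing get_output_schema and transform (the class name ZScore and the parameter column are illustrative assumptions):
from typing import Any
import pandas as pd
from fugue import DataFrame, FugueWorkflow, LocalDataFrame, PandasDataFrame
from fugue.extensions import Transformer

class ZScore(Transformer):
    def get_output_schema(self, df: DataFrame) -> Any:
        # driver side: only the schema of df is used here
        return df.schema + "z:double"

    def transform(self, df: LocalDataFrame) -> LocalDataFrame:
        # worker side, once per logical partition
        col = self.params.get_or_throw("column", str)
        pdf = df.as_pandas()
        pdf["z"] = (pdf[col] - pdf[col].mean()) / pdf[col].std()
        return PandasDataFrame(pdf, schema=self.output_schema)

dag = FugueWorkflow()
data = dag.df([[0, 1.0], [0, 2.0], [1, 3.0]], "a:int,b:double")
data.partition(by=["a"]).transform(ZScore, params={"column": "b"}).show()
dag.run()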
fugue.extensions.context#
- class fugue.extensions.context.ExtensionContext[source]#
Bases:
object
Context variables that extensions can access. It’s also the base class of all extensions.
- property callback: RPCClient#
RPC client to talk to driver, this is for transformers only, and available on both driver and workers
- property cursor: PartitionCursor#
Cursor of the current logical partition, this is for transformers only, and only available on worker side
- property execution_engine: ExecutionEngine#
Execution engine for the current execution, this is only available on driver side
- property has_callback: bool#
Whether this transformer has callback
- property key_schema: Schema#
Partition keys schema, this is for transformers only, and available on both driver and workers
- property output_schema: Schema#
Output schema of the operation. This is accessible for all extensions (if defined), and on both driver and workers
- property params: ParamDict#
Parameters set for using this extension.
Examples
>>> FugueWorkflow().df(...).transform(using=dummy, params={"a": 1})
You will get {"a": 1} as params in the dummy transformer
- property partition_spec: PartitionSpec#
Partition specification, this is for all extensions except for creators, and available on both driver and workers
- property rpc_server: RPCServer#
RPC client to talk to driver, this is for transformers only, and available on both driver and workers
- validate_on_runtime(data)[source]#
- Parameters
data (Union[DataFrame, DataFrames]) –
- Return type
None
- property validation_rules: Dict[str, Any]#
Extension input validation rules defined by user
- property workflow_conf: ParamDict#
Workflow level configs, this is accessible even in Transformer and CoTransformer
Examples
>>> dag = FugueWorkflow().df(...).transform(using=dummy)
>>> dag.run(NativeExecutionEngine(conf={"b": 10}))
You will get {"b": 10} as workflow_conf in the dummy transformer on both driver and workers.
fugue.rpc#
fugue.rpc.base#
- class fugue.rpc.base.EmptyRPCHandler[source]#
Bases:
RPCHandler
The class representing empty
RPCHandler
- class fugue.rpc.base.NativeRPCClient(server, key)[source]#
Bases:
RPCClient
Native RPC Client that can only be used locally. Use make_client() to create this instance.
- Parameters
server (NativeRPCServer) – the parent
NativeRPCServer
key (str) – the unique key for the handler and this client
- class fugue.rpc.base.NativeRPCServer(conf)[source]#
Bases:
RPCServer
Native RPC Server that can only be used locally.
- Parameters
conf (Any) – the Fugue Configuration Tutorial
- make_client(handler)[source]#
Add handler and the corresponding NativeRPCClient
- Parameters
handler (Any) – RPChandler like object
- Returns
the native RPC client
- Return type
- class fugue.rpc.base.RPCFunc(func)[source]#
Bases:
RPCHandler
RPCHandler wrapping a python function.
- Parameters
func (Callable) – a python function
- class fugue.rpc.base.RPCHandler[source]#
Bases:
RPCClient
RPC handler hosting the real logic on driver side
- property running: bool#
Whether the handler is in running state
- start()[source]#
Start the handler, wrapping
start_handler()
- Returns
the instance itself
- Return type
- stop()[source]#
Stop the handler, wrapping
stop_handler()
- Return type
None
- class fugue.rpc.base.RPCServer(conf)[source]#
Bases:
RPCHandler
,ABC
Server abstract class hosting multiple RPCHandler instances.
- Parameters
conf (Any) – the Fugue Configuration Tutorial
- invoke(key, *args, **kwargs)[source]#
Invoke the correspondent handler
- Parameters
key (str) – key of the handler
args (Any) –
kwargs (Any) –
- Returns
the return value of the handler
- Return type
Any
- abstract make_client(handler)[source]#
Make a RPCHandler and return the corresponding RPCClient
- Parameters
handler (Any) – RPChandler like object
- Returns
the client connecting to the handler
- Return type
- register(handler)[source]#
Register the handler into the server
- Parameters
handler (Any) – RPChandler like object
- Returns
the unique key of the handler
- Return type
str
- start_handler()[source]#
Wrapper to start the server, do not override or call directly
- Return type
None
- fugue.rpc.base.make_rpc_server(conf)[source]#
Make RPCServer based on configuration. If fugue.rpc.server is set, then the value will be used as the server type for the initialization. Otherwise, a NativeRPCServer instance will be returned
- Parameters
conf (Any) – the Fugue Configuration Tutorial
- Returns
the RPC server
- Return type
- fugue.rpc.base.to_rpc_handler(obj)[source]#
Convert object to RPCHandler. If the object is already an RPCHandler, then the original instance will be returned. If the object is None, then EmptyRPCHandler will be returned. If the object is a python function, then RPCFunc will be returned.
- Parameters
obj (Any) – RPChandler like object
- Returns
the RPC handler
- Return type
fugue.rpc.flask#
- class fugue.rpc.flask.FlaskRPCClient(key, host, port, timeout_sec)[source]#
Bases:
RPCClient
Flask RPC Client that can be used in a distributed environment. Use make_client() to create this instance.
- Parameters
key (str) – the unique key for the handler and this client
host (str) – the host address of the flask server
port (int) – the port of the flask server
timeout_sec (float) – timeout seconds for flask clients
- class fugue.rpc.flask.FlaskRPCServer(conf)[source]#
Bases:
RPCServer
Flask RPC server that can be used in a distributed environment. It’s required to set fugue.rpc.flask_server.host and fugue.rpc.flask_server.port. If fugue.rpc.flask_server.timeout is not set, then the client could hang until the connection is closed.
- Parameters
conf (Any) – the Fugue Configuration Tutorial
Examples
conf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",
    "fugue.rpc.flask_server.host": "127.0.0.1",
    "fugue.rpc.flask_server.port": "1234",
    "fugue.rpc.flask_server.timeout": "2 sec",
}

with make_rpc_server(conf).start() as server:
    server...
- make_client(handler)[source]#
Add handler and the corresponding FlaskRPCClient
- Parameters
handler (Any) – RPChandler like object
- Returns
the flask RPC client that can be distributed
- Return type
fugue.sql#
fugue.sql.api#
- fugue.sql.api.fugue_sql(query, *args, fsql_ignore_case=None, fsql_dialect=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]#
Simplified Fugue SQL interface. This function can still take multiple dataframe inputs but will always return the last generated dataframe in the SQL workflow, and YIELD should NOT be used with this function. If you want to use Fugue SQL to represent the full workflow, or want to see more Fugue SQL examples, please read fugue_sql_flow().
- Parameters
query (str) – the Fugue SQL string (can be a jinja template)
args (Any) – variables related to the SQL string
fsql_ignore_case (Optional[bool]) – whether to ignore case when parsing the SQL string, defaults to None (it depends on the engine/global config).
fsql_dialect (Optional[str]) – the dialect of this fsql, defaults to None (it depends on the engine/global config).
kwargs (Any) – variables related to the SQL string
engine (Optional[AnyExecutionEngine]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether return a local dataframe, defaults to False
- Returns
the result dataframe
- Return type
AnyDataFrame
Note
This function is different from raw_sql(), which directly sends the query to the execution engine to run. This function parses the query based on Fugue SQL syntax, creates a FugueSQLWorkflow (which could contain multiple raw SQLs plus other operations), and runs and returns the last dataframe generated in the workflow.
This function allows you to parameterize the SQL in a more elegant way. The data tables referred to in the query can either be automatically extracted from the local variables or be specified in the arguments.
Caution
Currently, we have not unified the dialects of different SQL backends, so there can be slight syntax differences when you switch between backends. In addition, we have not unified UDFs across different backends, so be careful when using uncommon UDFs that belong to a specific backend.
That being said, if you keep your SQL part general and leverage Fugue extensions (transformer, creator, processor, outputter, etc.) appropriately, it should be easy to write backend agnostic Fugue SQL.
We are working on unifying the dialects of different SQL backends; this should be available in future releases. The effort to unify UDFs is still unclear.
import pandas as pd
import fugue.api as fa

def tr(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(c=2)

input = pd.DataFrame([[0, 1], [3, 4]], columns=["a", "b"])

with fa.engine_context("duckdb"):
    res = fa.fugue_sql('''
    SELECT * FROM input WHERE a<{{x}}
    TRANSFORM USING tr SCHEMA *,c:int
    ''', x=2)
    assert fa.as_array(res) == [[0, 1, 2]]
- fugue.sql.api.fugue_sql_flow(query, *args, fsql_ignore_case=None, fsql_dialect=None, **kwargs)[source]#
Fugue SQL full functional interface. This function allows full workflow definition using Fugue SQL, and it allows multiple outputs using
YIELD
.- Parameters
query (str) – the Fugue SQL string (can be a jinja template)
args (Any) – variables related to the SQL string
fsql_ignore_case (Optional[bool]) – whether to ignore case when parsing the SQL string, defaults to None (it depends on the engine/global config).
fsql_dialect (Optional[str]) – the dialect of this fsql, defaults to None (it depends on the engine/global config).
kwargs (Any) – variables related to the SQL string
- Returns
the translated Fugue workflow
- Return type
Note
This function is different from raw_sql(), which directly sends the query to the execution engine to run. This function parses the query based on Fugue SQL syntax, creates a FugueSQLWorkflow (which could contain multiple raw SQLs plus other operations), and runs and returns the last dataframe generated in the workflow.
This function allows you to parameterize the SQL in a more elegant way. The data tables referred to in the query can either be automatically extracted from the local variables or be specified in the arguments.
Caution
Currently, we have not unified the dialects of different SQL backends, so there can be slight syntax differences when you switch between backends. In addition, we have not unified UDFs across different backends, so be careful when using uncommon UDFs that belong to a specific backend.
That being said, if you keep your SQL part general and leverage Fugue extensions (transformer, creator, processor, outputter, etc.) appropriately, it should be easy to write backend agnostic Fugue SQL.
We are working on unifying the dialects of different SQL backends; this should be available in future releases. The effort to unify UDFs is still unclear.
from fugue.api import fugue_sql_flow as fsql
import fugue.api as fa
import pandas as pd

# Basic case
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run()

# With external data sources
df = pd.DataFrame([[0],[1]], columns=["a"])
fsql('''
SELECT * FROM df WHERE a=0
PRINT
''').run()

# With external variables
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''').run()

# The following is the explicit way to specify variables and dataframes
# (recommended)
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''', df=df, t=t).run()

# Using extensions
def dummy(df: pd.DataFrame) -> pd.DataFrame:
    return df

fsql('''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
''').run()

# It's recommended to provide the full path of the extension inside
# Fugue SQL, so the SQL definition and execution can be more
# independent from the extension definition.

# Run with different execution engines
sql = '''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
'''
fsql(sql).run(spark_session)
fsql(sql).run("dask")
with fa.engine_context("duckdb"):
    fsql(sql).run()

# Passing dataframes between fsql calls
result = fsql('''
CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x
CREATE [[1]] SCHEMA a:int YIELD DATAFRAME AS y
''').run(DaskExecutionEngine)

fsql('''
SELECT * FROM x UNION SELECT * FROM y UNION SELECT * FROM z
PRINT
''', result, z=pd.DataFrame([[2]], columns=["z"])).run()

# Get framework native dataframes
result["x"].native  # Dask dataframe
result["y"].native  # Dask dataframe
result["x"].as_pandas()  # Pandas dataframe

# Use lower case fugue sql
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
select * from df where a={{t}}
print
''', df=df, t=t, fsql_ignore_case=True).run()
fugue.sql.workflow#
- class fugue.sql.workflow.FugueSQLWorkflow(compile_conf=None)[source]#
Bases:
FugueWorkflow
Fugue workflow that supports Fugue SQL. Please read the Fugue SQL Tutorial.
- Parameters
compile_conf (Any) –
- property sql_vars: Dict[str, WorkflowDataFrame]#
fugue.workflow#
fugue.workflow.api#
- fugue.workflow.api.out_transform(df, using, params=None, partition=None, callback=None, ignore_errors=None, engine=None, engine_conf=None)[source]#
Transform this dataframe using a transformer. It’s a wrapper of out_transform() and run(). It lets you do basic dataframe transformations without using FugueWorkflow and DataFrame. Only native types are accepted for both input and output.
Please read the Transformer Tutorial
- Parameters
df (Any) – DataFrame like object or Yielded or a path string to a parquet file
using (Any) – transformer-like object, can’t be a string expression
params (Optional[Any]) – Parameters like object to run the transformer, defaults to None. The transformer will be able to access this value from params()
partition (Optional[Any]) – Partition like object, defaults to None
callback (Optional[Any]) – RPChandler like object, defaults to None
ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)
engine (Optional[Any]) – it can be an empty string or None (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the SQL engine (you can use None for either of them to use the default one), defaults to None
engine_conf (Optional[Any]) – Parameters like object, defaults to None
- Return type
None
Note
This function can only take parquet file paths in df. CSV and JSON file formats are disallowed.
This transformation is guaranteed to execute immediately (eager) and return nothing
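A hedged usage sketch of this function with the default engine (the function print_partition is made up for illustration):
import pandas as pd
import fugue.api as fa

def print_partition(df: pd.DataFrame) -> None:
    # an output transformer: consumes one partition and returns nothing
    print(df.shape)

fa.out_transform(
    pd.DataFrame({"a": [0, 0, 1], "b": [1, 2, 3]}),
    print_partition,
    partition={"by": ["a"]},
)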
- fugue.workflow.api.raw_sql(*statements, engine=None, engine_conf=None, as_fugue=False, as_local=False)[source]#
Run raw SQL on the execution engine
- Parameters
statements (Any) – a sequence of sub-statements in string or dataframe-like objects
engine (Optional[Any]) – an engine like object, defaults to None
engine_conf (Optional[Any]) – the configs for the engine, defaults to None
as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False
as_local (bool) – whether return a local dataframe, defaults to False
- Returns
the result dataframe
- Return type
AnyDataFrame
Caution
Currently, only SELECT statements are supported
Examples
import pandas as pd
import fugue.api as fa

with fa.engine_context("duckdb"):
    a = fa.as_fugue_df([[0, 1]], schema="a:long,b:long")
    b = pd.DataFrame([[0, 10]], columns=["a", "b"])
    c = fa.raw_sql("SELECT * FROM", a, "UNION SELECT * FROM", b)
    fa.as_pandas(c)
- fugue.workflow.api.transform(df, using, schema=None, params=None, partition=None, callback=None, ignore_errors=None, persist=False, as_local=False, save_path=None, checkpoint=False, engine=None, engine_conf=None, as_fugue=False)[source]#
Transform this dataframe using a transformer. It’s a wrapper of transform() and run(). It lets you do basic dataframe transformations without using FugueWorkflow and DataFrame. Also, only native types are accepted for both input and output.
Please read the Transformer Tutorial
- Parameters
df (Any) – DataFrame like object or Yielded or a path string to a parquet file
using (Any) – transformer-like object, can’t be a string expression
schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()
params (Optional[Any]) – Parameters like object to run the transformer, defaults to None. The transformer will be able to access this value from params()
partition (Optional[Any]) – Partition like object, defaults to None
callback (Optional[Any]) – RPChandler like object, defaults to None
ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)
engine (Optional[Any]) – it can be an empty string or None (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the SQL engine (you can use None for either of them to use the default one), defaults to None
engine_conf (Optional[Any]) – Parameters like object, defaults to None
as_fugue (bool) – If true, the function will always return a FugueDataFrame; otherwise, if df is a native dataframe type such as a pandas dataframe, then the output will also be returned in its native format. Defaults to False
persist (bool) – Whether to persist (materialize) the dataframe before returning
as_local (bool) – If true, the result will be converted to a LocalDataFrame
save_path (Optional[str]) – Whether to save the output to a file (see the note)
checkpoint (bool) – Whether to add a checkpoint for the output (see the note)
- Returns
the transformed dataframe; if df is a native dataframe (e.g. pd.DataFrame, Spark dataframe, etc.), the output will be a native dataframe whose type is determined by the execution engine you use, but if df is of type DataFrame, then the output will also be a DataFrame
- Return type
Any
Note
This function may be lazy and return the transformed dataframe.
Note
When you use a callback in this function, you must make sure the output dataframe is materialized. Otherwise, if the real compute happens outside of the function call, the callback receiver will already have shut down. To do that, you can use either persist or as_local; both will materialize the dataframe before the callback receiver shuts down.
Note
When save_path is None and checkpoint is False, then the output will not be saved into a file. The return will be a dataframe.
When save_path is None and checkpoint is True, then the output is saved into the path set by fugue.workflow.checkpoint.path, the name will be randomly chosen, and it is NOT a deterministic checkpoint, so if you run multiple times, the output will be saved into different files. The return will be a dataframe.
When save_path is not None and checkpoint is False, then the output will be saved into save_path. The return will be the value of save_path
When save_path is not None and checkpoint is True, then the output will be saved into save_path. The return will be the dataframe from save_path
This function can only take parquet file paths in df and save_path. Csv and other file formats are disallowed.
The checkpoint here is NOT deterministic, so re-run will generate new checkpoints.
If you want to read and write other file formats or if you want to use deterministic checkpoints, please use
FugueWorkflow
.
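A hedged usage sketch: the same transformer function run on the default engine and, assuming fugue[dask] is installed, on Dask:
import pandas as pd
import fugue.api as fa

# schema: *,b_doubled:long
def double_b(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b_doubled=df["b"] * 2)

pdf = pd.DataFrame({"a": [0, 1], "b": [1, 2]})

# default (native) engine: the output stays a pandas DataFrame
local_result = fa.transform(pdf, double_b)

# the same code distributed on Dask; as_local converts the result back
dask_result = fa.transform(pdf, double_b, engine="dask", as_local=True)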
fugue.workflow.input#
- fugue.workflow.input.register_raw_df_type(df_type)[source]#
TODO: This function is to be removed before 0.9.0
Deprecated since version 0.8.0: Register using
fugue.api.is_df()
instead.- Parameters
df_type (Type) –
- Return type
None
fugue.workflow.module#
fugue.workflow.workflow#
- class fugue.workflow.workflow.FugueWorkflow(compile_conf=None)[source]#
Bases:
object
Fugue Workflow, also known as the Fugue Programming Interface.
In Fugue, we use a DAG to represent workflows. DAG construction and execution are different steps; this class is mainly used in the construction step, so everything you add to the workflow is a description, and nothing is executed until you call run().
Read this to learn how to initialize it in different ways and pros and cons.
- Parameters
compile_conf (Any) –
- add(task, *args, **kwargs)[source]#
This method should not be called directly by users. Use create(), process(), output() instead
- Parameters
task (FugueTask) –
args (Any) –
kwargs (Any) –
- Return type
- assert_eq(*dfs, **params)[source]#
Compare if these dataframes are equal. It’s for internal unit test purposes only. It will convert both dataframes to LocalBoundedDataFrame, so it assumes all dataframes are small and fast enough to convert. DO NOT use it on critical or expensive tasks.
- Parameters
dfs (Any) – DataFrames like object
digits – precision on float number comparison, defaults to 8
check_order – if to compare the row orders, defaults to False
check_schema – if compare schemas, defaults to True
check_content – if to compare the row values, defaults to True
no_pandas – if true, it will compare the string representations of the dataframes, otherwise, it will convert both to pandas dataframe to compare, defaults to False
params (Any) –
- Raises
AssertionError – if not equal
- Return type
None
- assert_not_eq(*dfs, **params)[source]#
Assert if all dataframes are not equal to the first dataframe. It’s for internal unit test purposes only. It will convert both dataframes to LocalBoundedDataFrame, so it assumes all dataframes are small and fast enough to convert. DO NOT use it on critical or expensive tasks.
- Parameters
dfs (Any) – DataFrames like object
digits – precision on float number comparison, defaults to 8
check_order – if to compare the row orders, defaults to False
check_schema – if compare schemas, defaults to True
check_content – if to compare the row values, defaults to True
no_pandas – if true, it will compare the string representations of the dataframes, otherwise, it will convert both to pandas dataframe to compare, defaults to False
params (Any) –
- Raises
AssertionError – if any dataframe equals to the first dataframe
- Return type
None
- create(using, schema=None, params=None)[source]#
Run a creator to create a dataframe.
Please read the Creator Tutorial
- Parameters
using (Any) – creator-like object, if it is a string, then it must be the alias of a registered creator
schema (Optional[Any]) – Schema like object, defaults to None. The creator will be able to access this value from
output_schema()
params (Optional[Any]) – Parameters like object to run the creator, defaults to None. The creator will be able to access this value from
params()
pre_partition – Partition like object, defaults to None. The creator will be able to access this value from
partition_spec()
- Returns
result dataframe
- Return type
- create_data(data, schema=None, data_determiner=None)[source]#
Create dataframe.
- Parameters
data (Any) – DataFrame like object or
Yielded
schema (Optional[Any]) – Schema like object, defaults to None
data_determiner (Optional[Callable[[Any], Any]]) – a function to compute unique id from
data
- Returns
a dataframe of the current workflow
- Return type
Note
By default, the input data does not affect the determinism of the workflow (but schema and metadata do), because the amount of compute can be unpredictable. But if you want data to affect the determinism of the workflow, you can provide a function to compute the unique id of data using data_determiner
- df(data, schema=None, data_determiner=None)[source]#
Create dataframe. Alias of
create_data()
- Parameters
data (Any) – DataFrame like object or
Yielded
schema (Optional[Any]) – Schema like object, defaults to None
data_determiner (Optional[Callable[[Any], str]]) – a function to compute unique id from
data
- Returns
a dataframe of the current workflow
- Return type
Note
By default, the input data does not affect the determinism of the workflow (but schema and metadata do), because the amount of compute can be unpredictable. But if you want data to affect the determinism of the workflow, you can provide a function to compute the unique id of data using data_determiner
- get_result(df)[source]#
After
run()
, get the result of a dataframe defined in the dag- Returns
a calculated dataframe
- Parameters
df (WorkflowDataFrame) –
- Return type
Examples
dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int")
dag.run()
dag.get_result(df1).show()
- intersect(*dfs, distinct=True)[source]#
Intersect dataframes in dfs.
- Parameters
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after intersection, default to True
- Returns
intersected dataframe
- Return type
Note
Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.
- join(*dfs, how, on=None)[source]#
Join dataframes. Read Join tutorials on workflow and engine for details
- Parameters
dfs (Any) – DataFrames like object
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
- Returns
joined dataframe
- Return type
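A hedged usage sketch: joining two workflow dataframes on the inferred key a:
from fugue import FugueWorkflow

dag = FugueWorkflow()
left = dag.df([[0, "x"], [1, "y"]], "a:int,b:str")
right = dag.df([[0, 10]], "a:int,c:int")
dag.join(left, right, how="inner").show()
dag.run()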
- property last_df: Optional[WorkflowDataFrame]#
- load(path, fmt='', columns=None, **kwargs)[source]#
Load dataframe from persistent storage. Read this for details.
- Parameters
path (str) – file path
fmt (str) – format hint, can accept parquet, csv, json, defaults to "", meaning to infer
kwargs (Any) –
- Returns
dataframe from the file
- Return type
- out_transform(*dfs, using, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#
Transform dataframes using transformer, it materializes the execution immediately and returns nothing
Please read the Transformer Tutorial
- Parameters
dfs (Any) – DataFrames like object
using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered output transformer/cotransformer
schema – Schema like object, defaults to None. The transformer will be able to access this value from
output_schema()
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from
params()
pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call out_transform() without this parameter
ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list
callback (Optional[Any]) – RPChandler like object, defaults to None
- Return type
None
Note
transform() can be lazy and will return the transformed dataframe, while out_transform() is guaranteed to execute immediately (eagerly) and return nothing
- output(*dfs, using, params=None, pre_partition=None)[source]#
Run a outputter on dataframes.
Please read the Outputter Tutorial
- Parameters
using (Any) – outputter-like object, if it is a string, then it must be the alias of a registered outputter
params (Optional[Any]) – Parameters like object to run the outputter, defaults to None. The outputter will be able to access this value from
params()
pre_partition (Optional[Any]) – Partition like object, defaults to None. The outputter will be able to access this value from
partition_spec()
dfs (Any) –
- Return type
None
- process(*dfs, using, schema=None, params=None, pre_partition=None)[source]#
Run a processor on the dataframes.
Please read the Processor Tutorial
- Parameters
dfs (Any) – DataFrames like object
using (Any) – processor-like object, if it is a string, then it must be the alias of a registered processor
schema (Optional[Any]) – Schema like object, defaults to None. The processor will be able to access this value from
output_schema()
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The processor will be able to access this value from
params()
pre_partition (Optional[Any]) – Partition like object, defaults to None. The processor will be able to access this value from
partition_spec()
- Returns
result dataframe
- Return type
- run(engine=None, conf=None, **kwargs)[source]#
Execute the workflow and compute all dataframes.
Note
For inputs, please read
engine_context()
- Parameters
engine (Optional[Any]) – object that can be recognized as an engine, defaults to None
conf (Optional[Any]) – engine config, defaults to None
kwargs (Any) – additional parameters to initialize the execution engine
- Returns
the result set
- Return type
Examples
dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int").transform(a_transformer)
df2 = dag.df([[0]], "b:int")
dag.run(SparkExecutionEngine)
df1.result.show()
df2.result.show()

dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int").transform(a_transformer)
df1.yield_dataframe_as("x")
result = dag.run(SparkExecutionEngine)
result["x"]  # SparkDataFrame
Read this to learn how to run in different ways and pros and cons.
- select(*statements, sql_engine=None, sql_engine_params=None, dialect='spark')[source]#
Execute a SELECT statement using the SQLEngine
- Parameters
statements (Any) – a list of sub-statements in string or
WorkflowDataFrame
sql_engine (Optional[Any]) – it can be an empty string or None (use the default SQL engine), a string (use the registered SQL engine), an SQLEngine type, or an SQLEngine instance (you can use None to use the default one), defaults to None
sql_engine_params (Optional[Any]) –
dialect (Optional[str]) –
- Returns
result of the SELECT statement
- Return type
Examples
with FugueWorkflow() as dag:
    a = dag.df([[0, "a"]], "a:int,b:str")
    b = dag.df([[0]], "a:int")
    c = dag.select("SELECT a FROM", a, "UNION SELECT * FROM", b)
    dag.run()
Please read this for more examples
- set_op(how, *dfs, distinct=True)[source]#
Union, subtract or intersect dataframes.
- Parameters
how (str) – can accept union, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after the set operation, default to True
- Returns
result dataframe of the set operation
- Return type
Note
Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.
- show(*dfs, n=10, with_count=False, title=None)[source]#
Show the dataframes. See examples.
- Parameters
dfs (Any) – DataFrames like object
n (int) – max number of rows, defaults to 10
with_count (bool) – whether to show total count, defaults to False
title (Optional[str]) – title to display on top of the dataframe, defaults to None
best_width – max width for the output table, defaults to 100
- Return type
None
Note
When you call this method, it means you want the dataframe to be printed when the workflow executes. So the dataframe won’t show until you run the workflow.
When with_count is True, it can trigger an expensive calculation for a distributed dataframe. So if you call this function directly, you may need to persist() the dataframe, or you can turn on Auto Persist
- subtract(*dfs, distinct=True)[source]#
Subtract dfs[1:] from dfs[0].
- Parameters
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after subtraction, default to True
- Returns
subtracted dataframe
- Return type
Note
Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.
- transform(*dfs, using, schema=None, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#
Transform dataframes using transformer.
Please read the Transformer Tutorial
- Parameters
dfs (Any) – DataFrames like object
using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered transformer/cotransformer
schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from
output_schema()
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from
params()
pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent way, which is to call partition() and then call transform() without this parameter
ignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list
callback (Optional[Any]) – RPChandler like object, defaults to None
- Returns
the transformed dataframe
- Return type
Note
transform() can be lazy and will return the transformed dataframe, while out_transform() is guaranteed to execute immediately (eagerly) and return nothing
- union(*dfs, distinct=True)[source]#
Union dataframes in dfs.
- Parameters
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after union, default to True
- Returns
unioned dataframe
- Return type
Note
Currently, all dataframes in dfs must have identical schemas, otherwise an exception will be thrown.
- zip(*dfs, how='inner', partition=None, temp_path=None, to_file_threshold=-1)[source]#
Zip multiple dataframes together with given partition specifications.
- Parameters
dfs (Any) – DataFrames like object
how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner
partition (Optional[Any]) – Partition like object, defaults to None.
temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None
to_file_threshold (Any) – file byte size threshold, defaults to -1
- Returns
a zipped dataframe
- Return type
Note
If dfs is dict like, the zipped dataframe will be dict like; if dfs is list like, the zipped dataframe will be list like
It’s fine to contain only one dataframe in dfs
See also
Read CoTransformer and Zip & Comap for details
- class fugue.workflow.workflow.FugueWorkflowResult(yields)[source]#
Bases:
DataFrames
The result object of run(). Users should not construct this object.
- Parameters
DataFrames – yields of the workflow
yields (Dict[str, Yielded]) –
- property yields: Dict[str, Any]#
- class fugue.workflow.workflow.WorkflowDataFrame(workflow, task)[source]#
Bases:
DataFrame
It represents the edges in the graph constructed by FugueWorkflow. In Fugue, we use a DAG to represent workflows, and the edges are strictly dataframes. DAG construction and execution are different steps; this class is used in the construction step. Although it inherits from DataFrame, it is not concrete data, so a lot of the operations are not allowed. If you want to obtain the concrete Fugue DataFrame, use compute() to execute the workflow.
Normally, you don’t construct it yourself; you will just use its methods.
- Parameters
workflow (FugueWorkflow) – the parent workflow it belongs to
task (FugueTask) – the task that generates this dataframe
- aggregate(*agg_cols, **kwagg_cols)[source]#
Aggregate on dataframe
- Parameters
df – the dataframe to aggregate on
agg_cols (ColumnExpr) – aggregation expressions
kwagg_cols (ColumnExpr) – aggregation expressions to be renamed to the argument names
self (TDF) –
- Returns
the aggregated result as a dataframe
- Return type
TDF
New Since
0.6.0
See also
Please find more expression examples in fugue.column.sql and fugue.column.functions
Examples
import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
df.aggregate(f.max(col("b")))

# SELECT a, MAX(b) AS x FROM df GROUP BY a
df.partition_by("a").aggregate(f.max(col("b")).alias("x"))
df.partition_by("a").aggregate(x=f.max(col("b")))
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object
self (TDF) –
- Returns
a new dataframe with the new column types
- Return type
Note
The output dataframe will not change the order of original schema.
Examples
>>> df.alter_columns("a:int,b:str")
- anti_join(*dfs, on=None)[source]#
LEFT ANTI Join this dataframe with dataframes. It’s a wrapper of fugue.workflow.workflow.FugueWorkflow.join(). Read the Join tutorials on workflow and engine for details
- Parameters
dfs (Any) – DataFrames like object
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
- as_array(columns=None, type_safe=False)[source]#
- Raises
NotImplementedError – don’t call this method
- Parameters
columns (Optional[List[str]]) –
type_safe (bool) –
- Return type
List[Any]
- as_array_iterable(columns=None, type_safe=False)[source]#
- Raises
NotImplementedError – don’t call this method
- Parameters
columns (Optional[List[str]]) –
type_safe (bool) –
- Return type
Iterable[Any]
- assert_eq(*dfs, **params)[source]#
Wrapper of
fugue.workflow.workflow.FugueWorkflow.assert_eq()
to compare this dataframe with other dataframes.- Parameters
dfs (Any) – DataFrames like object
digits – precision on float number comparison, defaults to 8
check_order – if to compare the row orders, defaults to False
check_schema – if compare schemas, defaults to True
check_content – if to compare the row values, defaults to True
no_pandas – if true, it will compare the string representations of the dataframes, otherwise, it will convert both to pandas dataframe to compare, defaults to False
params (Any) –
- Raises
AssertionError – if not equal
- Return type
None
- assert_not_eq(*dfs, **params)[source]#
Wrapper of
fugue.workflow.workflow.FugueWorkflow.assert_not_eq()
to compare this dataframe with other dataframes.- Parameters
dfs (Any) – DataFrames like object
digits – precision on float number comparison, defaults to 8
check_order – if to compare the row orders, defaults to False
check_schema – if compare schemas, defaults to True
check_content – if to compare the row values, defaults to True
no_pandas – if true, it will compare the string representations of the dataframes, otherwise, it will convert both to pandas dataframe to compare, defaults to False
params (Any) –
- Raises
AssertionError – if any dataframe is equal to the first dataframe
- Return type
None
- assign(*args, **kwargs)[source]#
Update existing columns with new values and add new columns
- Parameters
df – the dataframe to set columns
args (ColumnExpr) – column expressions
kwargs (Any) – column expressions to be renamed to the argument names, if a value is not ColumnExpr, it will be treated as a literal
self (TDF) –
- Returns
a new dataframe with the updated values
- Return type
TDF
Tip
This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.
New Since
0.6.0
See also
Please find more expression examples in
fugue.column.sql
andfugue.column.functions
Examples
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

# add/set 1 as column x
df.assign(lit(1,"x"))
df.assign(x=1)

# add/set x to be a+b
df.assign((col("a")+col("b")).alias("x"))
df.assign(x=col("a")+col("b"))

# cast column a data type to double
df.assign(col("a").cast(float))

# cast + new columns
df.assign(col("a").cast(float),x=1,y=col("a")+col("b"))
- broadcast()[source]#
Broadcast the current dataframe
- Returns
the broadcasted dataframe
- Return type
- Parameters
self (TDF) –
- checkpoint(storage_type='file')[source]#
- Parameters
self (TDF) –
storage_type (str) –
- Return type
TDF
- compute(*args, **kwargs)[source]#
Trigger the parent workflow to
run()
and to generate and cache the result dataframe this instance represent.Examples
>>> df = FugueWorkflow().df([[0]],"a:int").transform(a_transformer)
>>> df.compute().as_pandas()  # pandas dataframe
>>> df.compute(SparkExecutionEngine).native  # spark dataframe
Note
Consider using fugue.workflow.workflow.FugueWorkflow.run() instead. Because this method triggers the entire workflow to run, it may be confusing to use, and extra time may be taken to compute unrelated dataframes.
dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")
dag.run(SparkExecutionEngine)
df1.result.show()
df2.result.show()
- Return type
- cross_join(*dfs)[source]#
CROSS Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
self (TDF) –
- Returns
joined dataframe
- Return type
- deterministic_checkpoint(storage_type='file', lazy=False, partition=None, single=False, namespace=None, **kwargs)[source]#
Cache the dataframe as a temporary file
- Parameters
storage_type (str) – can be either file or table, defaults to file
lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)
partition (Optional[Any]) – Partition like object, defaults to None.
single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters for the underlying execution engine function
namespace (Optional[Any]) – a value to control determinism, defaults to None.
self (TDF) –
- Returns
the cached dataframe
- Return type
TDF
Note
The difference vs
strong_checkpoint()
is that this checkpoint is not removed after execution, so it can take effect cross execution if the dependent compute logic is not changed.
- distinct()[source]#
Get distinct dataframe. Equivalent to
SELECT DISTINCT * FROM df
- Returns
dataframe with unique records
- Parameters
self (TDF) –
- Return type
TDF
- drop(columns, if_exists=False)[source]#
Drop columns from the dataframe.
- Parameters
columns (List[str]) – columns to drop
if_exists (bool) – if setting to True, it will ignore non-existent columns, defaults to False
self (TDF) –
- Returns
the dataframe after dropping columns
- Return type
- dropna(how='any', thresh=None, subset=None)[source]#
Drops records containing NA records
- Parameters
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
self (TDF) –
- Returns
dataframe with incomplete records dropped
- Return type
TDF
- property empty: bool#
- Raises
NotImplementedError – don’t call this method
- fillna(value, subset=None)[source]#
Fills NA values with replacement values
- Parameters
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
self (TDF) –
- Returns
dataframe with NA records filled
- Return type
TDF
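As a quick sketch (the toy schema and values below are made up), dropna and fillna chain like any other WorkflowDataFrame method:
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, None], [1, 2.0], [None, None]], "a:double,b:double")
df.dropna(how="all").show()               # drop rows where every column is null
df.dropna(thresh=1, subset=["a"]).show()  # keep rows with at least 1 non-null in "a"
df.fillna(0).show()                       # fill every column with 0
df.fillna({"b": -1.0}).show()             # fill only column "b"
dag.run()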
- filter(condition)[source]#
Filter rows by the given condition
- Parameters
df – the dataframe to be filtered
condition (ColumnExpr) – (boolean) column expression
self (TDF) –
- Returns
a new filtered dataframe
- Return type
TDF
New Since
0.6.0
See also
Please find more expression examples in
fugue.column.sql
andfugue.column.functions
Examples
import fugue.column.functions as f
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

df.filter((col("a")>1) & (col("b")=="x"))
df.filter(f.coalesce(col("a"),col("b"))>1)
- full_outer_join(*dfs, on=None)[source]#
FULL OUTER Join this dataframe with dataframes. It's a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- inner_join(*dfs, on=None)[source]#
INNER Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
- intersect(*dfs, distinct=True)[source]#
Intersect this dataframe with
dfs
.- Parameters
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after intersection, default to True
self (TDF) –
- Returns
intersected dataframe
- Return type
TDF
Note
Currently, all dataframes in
dfs
must have identical schema, otherwise exception will be thrown.
- property is_bounded: bool#
- Raises
NotImplementedError – don’t call this method
- property is_local: bool#
- Raises
NotImplementedError – don’t call this method
- join(*dfs, how, on=None)[source]#
Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
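A small sketch of the join family (toy schemas; the join key id is inferred from the common column name):
from fugue import FugueWorkflow

dag = FugueWorkflow()
left = dag.df([[0, "x"], [1, "y"]], "id:int,v:str")
right = dag.df([[0, 10]], "id:int,w:int")

left.inner_join(right).show()             # keeps only id=0
left.left_outer_join(right).show()        # keeps both rows, w is null for id=1
left.join(right, how="left_anti").show()  # rows in left with no match in right
dag.run()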
- left_anti_join(*dfs, on=None)[source]#
LEFT ANTI Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
- left_outer_join(*dfs, on=None)[source]#
LEFT OUTER Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
- left_semi_join(*dfs, on=None)[source]#
LEFT SEMI Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
- property name: str#
Name of its task spec
- property native: Any#
The native object this Dataset class wraps
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a Python array; it doesn't contain schema information, and its
native_as_df
should be either a pandas dataframe or an arrow dataframe.- Return type
Any
- property num_partitions: int#
- Raises
NotImplementedError – don’t call this method
- out_transform(using, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#
Transform this dataframe using transformer. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.out_transform()
Please read the Transformer Tutorial
- Parameters
using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered output transformer/cotransformer
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from
params()
pre_partition (Optional[Any]) – Partition like object, defaults to None. It's recommended to use the equivalent way, which is to call
partition()
and then calltransform()
without this parameterignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list
callback (Optional[Any]) – RPChandler like object, defaults to None
self (TDF) –
- Return type
None
Note
transform() can be lazy and will return the transformed dataframe, while out_transform() is guaranteed to execute immediately (eager) and return nothing
- output(using, params=None, pre_partition=None)[source]#
Run a outputter on this dataframe. It’s a simple wrapper of
fugue.workflow.workflow.FugueWorkflow.output()
Please read the Outputter Tutorial
- Parameters
using (Any) – outputter-like object, if it is a string, then it must be the alias of a registered outputter
params (Optional[Any]) – Parameters like object to run the outputter, defaults to None. The outputter will be able to access this value from
params()
pre_partition (Optional[Any]) – Partition like object, defaults to None. The outputter will be able to access this value from
partition_spec()
- Return type
None
- partition(*args, **kwargs)[source]#
Partition the current dataframe. Please read the Partition Tutorial
- Parameters
args (Any) – Partition like object
kwargs (Any) – Partition like object
self (TDF) –
- Returns
dataframe with the partition hint
- Return type
Note
Normally this step is fast because it’s to add a partition hint for the next step.
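For illustration (a sketch; largest_two is a hypothetical transformer), the partition hint is consumed by the following step, such as a transform:
import pandas as pd
from fugue import FugueWorkflow

# schema: *
def largest_two(df: pd.DataFrame) -> pd.DataFrame:
    # runs once per logical partition, i.e. per value of "a"
    return df.nlargest(2, "b")

dag = FugueWorkflow()
df = dag.df([[0, 1], [0, 2], [0, 3], [1, 4]], "a:int,b:int")
df.partition(by=["a"], presort="b desc").transform(largest_two).show()
dag.run()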
- partition_by(*keys, **kwargs)[source]#
Partition the current dataframe by keys. Please read the Partition Tutorial. This is a wrapper of
partition()
- Parameters
keys (str) – partition keys
kwargs (Any) – Partition like object excluding
by
andpartition_by
self (TDF) –
- Returns
dataframe with the partition hint
- Return type
- property partition_spec: PartitionSpec#
The partition spec set on the dataframe for next steps to use
Examples
dag = FugueWorkflow()
df = dag.df([[0],[1]], "a:int")
assert df.partition_spec.empty

df2 = df.partition(by=["a"])
assert df.partition_spec.empty
assert df2.partition_spec == PartitionSpec(by=["a"])
- per_partition_by(*keys)[source]#
Partition the current dataframe by keys so each physical partition contains only one logical partition. Please read the Partition Tutorial. This is a wrapper of
partition()
- Parameters
keys (str) – partition keys
self (TDF) –
- Returns
dataframe that is both logically and physically partitioned by
keys
- Return type
Note
This is a hint but not enforced, certain execution engines will not respect this hint.
- per_row()[source]#
Partition the current dataframe to one row per partition. Please read the Partition Tutorial. This is a wrapper of
partition()
- Returns
dataframe that is evenly partitioned by row count
- Return type
- Parameters
self (TDF) –
Note
This is a hint but not enforced, certain execution engines will not respect this hint.
- persist()[source]#
Persist the current dataframe
- Returns
the persisted dataframe
- Return type
- Parameters
self (TDF) –
Note
persist
can only guarantee the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
The persist method is considered a weak checkpoint. Sometimes, it may be necessary to use a strong checkpoint, which is checkpoint()
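A sketch of the typical use case: a dataframe reused by several downstream steps is persisted once so it is not recomputed per branch (a_transformer is a placeholder transformer):
from fugue import FugueWorkflow

dag = FugueWorkflow()
shared = dag.df([[0]], "a:int").transform(a_transformer).persist()
shared.transform(a_transformer).show()  # both branches reuse the persisted result
shared.take(1).show()
dag.run()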
- process(using, schema=None, params=None, pre_partition=None)[source]#
Run a processor on this dataframe. It’s a simple wrapper of
fugue.workflow.workflow.FugueWorkflow.process()
Please read the Processor Tutorial
- Parameters
using (Any) – processor-like object, if it is a string, then it must be the alias of a registered processor
schema (Optional[Any]) – Schema like object, defaults to None. The processor will be able to access this value from
output_schema()
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The processor will be able to access this value from
params()
pre_partition (Optional[Any]) – Partition like object, defaults to None. The processor will be able to access this value from
partition_spec()
self (TDF) –
- Returns
result dataframe
- Return type
- rename(*args, **kwargs)[source]#
Rename the dataframe using a mapping dict
- Parameters
args (Any) – list of dicts containing rename maps
kwargs (Any) – rename map
self (TDF) –
- Returns
a new dataframe with the new names
- Return type
Note
This interface is more flexible than
fugue.dataframe.dataframe.DataFrame.rename()
Examples
>>> df.rename({"a": "b"}, c="d", e="f")
- property result: DataFrame#
The concrete DataFrame obtained from
compute()
. This property will not trigger compute again, but compute should have been called earlier and the result is cached.
- right_outer_join(*dfs, on=None)[source]#
RIGHT OUTER Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
- sample(n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
self (TDF) –
- Returns
sampled dataframe
- Return type
TDF
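A quick sketch showing both sampling modes; exactly one of n and frac may be set per call:
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[i] for i in range(100)], "a:int")
df.sample(n=5, seed=0).show()        # sample a fixed number of rows
df.sample(frac=0.1, seed=0).show()   # sample roughly 10% of the rows
dag.run()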
- save(path, fmt='', mode='overwrite', partition=None, single=False, **kwargs)[source]#
Save this dataframe to a persistent storage
- Parameters
path (str) – output path
fmt (str) – format hint, can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition (Optional[Any]) – Partition like object, how to partition the dataframe before saving, defaults to empty
single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Save & Load.
- save_and_use(path, fmt='', mode='overwrite', partition=None, single=False, **kwargs)[source]#
Save this dataframe to a persistent storage and load back to use in the following steps
- Parameters
path (str) – output path
fmt (str) – format hint, can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition (Optional[Any]) – Partition like object, how to partition the dataframe before saving, defaults to empty
single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
self (TDF) –
- Return type
TDF
For more details and examples, read Save & Load.
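A sketch of save versus save_and_use (the /tmp path is only an example):
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, "x"]], "a:int,b:str")
df.save("/tmp/example.parquet", mode="overwrite")
# save, then keep building the workflow on top of the saved copy
df.save_and_use("/tmp/example.parquet", mode="overwrite").show()
dag.run()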
- select(*columns, where=None, having=None, distinct=False)[source]#
The functional interface for SQL select statement
- Parameters
columns (Union[str, ColumnExpr]) – column expressions, for strings they will represent the column names
where (Optional[ColumnExpr]) – WHERE condition expression, defaults to None
having (Optional[ColumnExpr]) – HAVING condition expression, defaults to None. It is used when columns contains aggregation columns
distinct (bool) – whether to return distinct result, defaults to False
self (TDF) –
- Returns
the select result as a new dataframe
- Return type
TDF
New Since
0.6.0
Attention
This interface is experimental and subject to change in future versions.
See also
Please find more expression examples in
fugue.column.sql
andfugue.column.functions
Examples
import fugue.column.functions as f
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df(pandas_df)

# select existing and new columns
df.select("a","b",lit(1,"another"))
df.select("a",(col("b")+lit(1)).alias("x"))

# select distinct
df.select("a","b",lit(1,"another"),distinct=True)

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
df.select(f.count_distinct(all_cols()).alias("x"))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
df.select("a",f.max(col("b")+lit(1)).alias("x"))

# SELECT a, MAX(b+1) AS x FROM df
#   WHERE b<2 AND a>1
#   GROUP BY a
#   HAVING MAX(b+1)>0
df.select(
    "a",f.max(col("b")+lit(1)).alias("x"),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)
- semi_join(*dfs, on=None)[source]#
LEFT SEMI Join this dataframe with dataframes. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.join()
. Read Join tutorials on workflow and engine for details- Parameters
dfs (Any) – DataFrames like object
on (Optional[Iterable[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys. Default to None
self (TDF) –
- Returns
joined dataframe
- Return type
- show(n=10, with_count=False, title=None, best_width=100)[source]#
Show the dataframe. See examples.
- Parameters
n (int) – max number of rows, defaults to 10
with_count (bool) – whether to show total count, defaults to False
title (Optional[str]) – title to display on top of the dataframe, defaults to None
best_width (int) – max width for the output table, defaults to 100
- Return type
None
Note
When you call this method, it means you want the dataframe to be printed when the workflow executes. So the dataframe won’t show until you run the workflow.
When with_count is True, it can trigger an expensive calculation for a distributed dataframe. So if you call this function directly, you may need to persist() the dataframe, or you can turn on auto persist (see the Useful Config tutorial).
- strong_checkpoint(storage_type='file', lazy=False, partition=None, single=False, **kwargs)[source]#
Cache the dataframe as a temporary file
- Parameters
storage_type (str) – can be either file or table, defaults to file
lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)
partition (Optional[Any]) – Partition like object, defaults to None.
single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters for the underlying execution engine function
self (TDF) –
- Returns
the cached dataframe
- Return type
TDF
Note
Strong checkpoint guarantees the output dataframe compute dependency is from the temporary file. Use strong checkpoint only when
weak_checkpoint()
can’t be used.Strong checkpoint file will be removed after the execution of the workflow.
- subtract(*dfs, distinct=True)[source]#
Subtract
dfs
from this dataframe.- Parameters
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after subtraction, default to True
self (TDF) –
- Returns
subtracted dataframe
- Return type
TDF
Note
Currently, all dataframes in
dfs
must have identical schema, otherwise exception will be thrown.
- take(n, presort=None, na_position='last')[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort
- Parameters
n (int) – number of rows to return
presort (Optional[str]) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept
first
orlast
self (TDF) –
- Returns
n rows of DataFrame per partition
- Return type
TDF
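For example (a sketch), take combines naturally with a partition hint and a presort:
from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, 1], [0, 9], [1, 5]], "a:int,b:int")
df.take(2).show()                                        # first 2 rows overall
df.partition(by=["a"]).take(1, presort="b desc").show()  # largest b per value of a
dag.run()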
- transform(using, schema=None, params=None, pre_partition=None, ignore_errors=[], callback=None)[source]#
Transform this dataframe using transformer. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.transform()
Please read the Transformer Tutorial
- Parameters
using (Any) – transformer-like object, if it is a string, then it must be the alias of a registered transformer/cotransformer
schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from
output_schema()
params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from
params()
pre_partition (Optional[Any]) – Partition like object, defaults to None. It’s recommended to use the equivalent wayt, which is to call
partition()
and then calltransform()
without this parameterignore_errors (List[Any]) – list of exception types the transformer can ignore, defaults to empty list
callback (Optional[Any]) – RPChandler like object, defaults to None
self (TDF) –
- Returns
the transformed dataframe
- Return type
Note
transform() can be lazy and will return the transformed dataframe, while out_transform() is guaranteed to execute immediately (eager) and return nothing
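A minimal transformer sketch using a schema hint comment; see the Transformer Tutorial for the full set of supported function signatures:
import pandas as pd
from fugue import FugueWorkflow

# schema: *,b_doubled:int
def double_b(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b_doubled=df["b"] * 2)

dag = FugueWorkflow()
dag.df([[0, 1], [1, 2]], "a:int,b:int").transform(double_b).show()
dag.run()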
- union(*dfs, distinct=True)[source]#
Union this dataframe with
dfs
.- Parameters
dfs (Any) – DataFrames like object
distinct (bool) – whether to perform distinct after union, default to True
self (TDF) –
- Returns
unioned dataframe
- Return type
TDF
Note
Currently, all dataframes in
dfs
must have identical schema, otherwise exception will be thrown.
- weak_checkpoint(lazy=False, **kwargs)[source]#
Cache the dataframe in memory
- Parameters
lazy (bool) – whether it is a lazy checkpoint, defaults to False (eager)
kwargs (Any) – parameters for the underlying execution engine function
self (TDF) –
- Returns
the cached dataframe
- Return type
TDF
Note
Weak checkpoint is in most cases the best choice for caching a dataframe to avoid duplicated computation. However, it does not guarantee to break up the compute dependency for this dataframe, so when you have very complicated compute, you may encounter issues such as stack overflow. Also, weak checkpoint normally caches the dataframe in memory; if memory is a concern, then you should consider
strong_checkpoint()
- property workflow: FugueWorkflow#
The parent workflow
- yield_dataframe_as(name, as_local=False)[source]#
Yield a dataframe that can be accessed without the current execution engine
- Parameters
name (str) – the name of the yielded dataframe
as_local (bool) – yield the local version of the dataframe
self (TDF) –
- Return type
None
Note
When as_local is True, it can trigger an additional compute to do the conversion. To avoid recompute, you should add persist before yielding.
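A sketch of yielding a result out of the workflow; assuming the yielded dataframe can then be retrieved by name from the FugueWorkflowResult returned by run():
from fugue import FugueWorkflow

dag = FugueWorkflow()
dag.df([[0]], "a:int").persist().yield_dataframe_as("result", as_local=True)
res = dag.run()
print(res["result"].as_pandas())  # access the yielded dataframe by name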
- yield_file_as(name)[source]#
Cache the dataframe in file
- Parameters
name (str) – the name of the yielded dataframe
self (TDF) –
- Return type
None
Note
You can yield a file/table only in the following cases:
you have not checkpointed (persisted) the dataframe, for example df.yield_file_as("a")
you have used deterministic_checkpoint(), for example df.deterministic_checkpoint().yield_file_as("a")
yield is workflow (compile-level) logic
For the first case, the yield will also be a strong checkpoint, so whenever you yield a dataframe as a file, the dataframe has been saved as a file and loaded back as a new dataframe.
- yield_table_as(name)[source]#
Cache the dataframe as a table
- Parameters
name (str) – the name of the yielded dataframe
self (TDF) –
- Return type
None
Note
You can yield a file/table only in the following cases:
you have not checkpointed (persisted) the dataframe, for example df.yield_file_as("a")
you have used deterministic_checkpoint(), for example df.deterministic_checkpoint().yield_file_as("a")
yield is workflow (compile-level) logic
For the first case, the yield will also be a strong checkpoint, so whenever you yield a dataframe as a file, the dataframe has been saved as a file and loaded back as a new dataframe.
- zip(*dfs, how='inner', partition=None, temp_path=None, to_file_threshold=-1)[source]#
Zip this data frame with multiple dataframes together with given partition specifications. It’s a wrapper of
fugue.workflow.workflow.FugueWorkflow.zip()
.- Parameters
dfs (Any) – DataFrames like object
how (str) – can accept
inner
,left_outer
,right_outer
,full_outer
,cross
, defaults toinner
partition (Optional[Any]) – Partition like object, defaults to None.
temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than
to_file_threshold
), defaults to Noneto_file_threshold (Any) – file byte size threshold, defaults to -1
self (TDF) –
- Returns
a zipped dataframe
- Return type
Note
dfs must be list like, and the zipped dataframe will be list like
dfs is fine to be empty
If you want dict-like zip, use fugue.workflow.workflow.FugueWorkflow.zip()
See also
Read CoTransformer and Zip & Comap for details
- class fugue.workflow.workflow.WorkflowDataFrames(*args, **kwargs)[source]#
Bases:
DataFrames
Ordered dictionary of WorkflowDataFrames. There are two modes: with keys and without keys. Without keys, _<n> will be used as the key for each dataframe, and the collection will be treated as an array in the Fugue framework.
It's immutable; once initialized, you can't add or remove elements from it.
It’s a subclass of
DataFrames
, but different from DataFrames, in the initialization you should always useWorkflowDataFrame
, and they should all come from the sameFugueWorkflow
.Examples
dag = FugueWorkflow()
df1 = dag.df([[0]],"a:int").transform(a_transformer)
df2 = dag.df([[0]],"b:int")

dfs1 = WorkflowDataFrames(df1, df2)  # as array
dfs2 = WorkflowDataFrames([df1, df2])  # as array
dfs3 = WorkflowDataFrames(a=df1, b=df2)  # as dict
dfs4 = WorkflowDataFrames(dict(a=df1, b=df2))  # as dict
dfs5 = WorkflowDataFrames(dfs4, c=df2)  # copy and update

dfs5["b"].show()  # how you get an element when it's a dict
dfs1[0].show()  # how you get an element when it's an array
- Parameters
args (Any) –
kwargs (Any) –
- property workflow: FugueWorkflow#
The parent workflow
fugue.api#
fugue.constants#
- fugue.constants.register_global_conf(conf, on_dup=0)[source]#
Register global Fugue configs that can be picked up by any Fugue execution engines as the base configs.
- Parameters
conf (Dict[str, Any]) – the config dictionary
on_dup (int) – see
triad.collections.dict.ParamDict.update()
, defaults toParamDict.OVERWRITE
- Return type
None
Note
When using ParamDict.THROW or on_dup, it's transactional. If any key in conf is already in the global config and the value is different from the new value, then ValueError will be thrown.
from fugue import register_global_conf, NativeExecutionEngine

register_global_conf({"my.value": 1})

engine = NativeExecutionEngine()
assert 1 == engine.conf["my.value"]

engine = NativeExecutionEngine({"my.value": 2})
assert 2 == engine.conf["my.value"]
fugue.dev#
All modules for developing and extending Fugue
fugue.exceptions#
- exception fugue.exceptions.FugueBug[source]#
Bases:
FugueError
Fugue internal bug
- exception fugue.exceptions.FugueDataFrameError[source]#
Bases:
FugueError
Fugue dataframe related error
- exception fugue.exceptions.FugueDataFrameInitError[source]#
Bases:
FugueDataFrameError
Fugue dataframe initialization error
- exception fugue.exceptions.FugueDataFrameOperationError[source]#
Bases:
FugueDataFrameError
Fugue dataframe invalid operation
- exception fugue.exceptions.FugueDatasetEmptyError[source]#
Bases:
FugueDataFrameError
Fugue dataframe is empty
- exception fugue.exceptions.FugueInterfacelessError[source]#
Bases:
FugueWorkflowCompileError
Fugue interfaceless exceptions
- exception fugue.exceptions.FugueInvalidOperation[source]#
Bases:
FugueError
Invalid operation on the Fugue framework
- exception fugue.exceptions.FuguePluginsRegistrationError[source]#
Bases:
FugueError
Fugue plugins registration error
- exception fugue.exceptions.FugueSQLError[source]#
Bases:
FugueWorkflowCompileError
Fugue SQL error
- exception fugue.exceptions.FugueSQLRuntimeError[source]#
Bases:
FugueWorkflowRuntimeError
Fugue SQL runtime error
- exception fugue.exceptions.FugueSQLSyntaxError[source]#
Bases:
FugueSQLError
Fugue SQL syntax error
- exception fugue.exceptions.FugueWorkflowCompileError[source]#
Bases:
FugueWorkflowError
Fugue workflow compile time error
- exception fugue.exceptions.FugueWorkflowCompileValidationError[source]#
Bases:
FugueWorkflowCompileError
Fugue workflow compile time validation error
- exception fugue.exceptions.FugueWorkflowError[source]#
Bases:
FugueError
Fugue workflow exceptions
- exception fugue.exceptions.FugueWorkflowRuntimeError[source]#
Bases:
FugueWorkflowError
Fugue workflow runtime error
- exception fugue.exceptions.FugueWorkflowRuntimeValidationError[source]#
Bases:
FugueWorkflowRuntimeError
Fugue workflow runtime validation error
fugue.plugins#
fugue.registry#
fugue_sql#
fugue_sql.exceptions#
fugue_duckdb#
fugue_duckdb.dask#
- class fugue_duckdb.dask.DuckDaskExecutionEngine(conf=None, connection=None, dask_client=None)[source]#
Bases:
DuckExecutionEngine
A hybrid engine of DuckDB and Dask. Most operations will be done by DuckDB, but for map, it will use Dask to fully utilize local CPUs. The engine can be used with a real Dask cluster, but practically, it is more useful for local processing.
- Parameters
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
connection (Optional[DuckDBPyConnection]) – DuckDB connection
dask_client (Optional[Client]) –
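A sketch of plugging this engine into a workflow (assumes duckdb and dask are installed; with no arguments the engine falls back to default connections):
from fugue import FugueWorkflow
from fugue_duckdb.dask import DuckDaskExecutionEngine

engine = DuckDaskExecutionEngine()  # default DuckDB connection; Dask client optional
dag = FugueWorkflow()
dag.df([[0, 1]], "a:int,b:int").show()
dag.run(engine)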
- convert_yield_dataframe(df, as_local)[source]#
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters
df (DataFrame) – DataFrame
as_local (bool) – whether yield a local dataframe
- Returns
another DataFrame that can be used after this execution engine stops
- Return type
Note
By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe will become invalid when the engine stops.
For example, if you create a custom Spark engine that starts and stops the Spark session in this engine's start_engine() and stop_engine(), then the Spark dataframe will be invalid. So you may consider converting it to a local dataframe so it can still exist after the engine stops.
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist
can only guarantee the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
- repartition(df, partition_spec)[source]#
Partition the input dataframe using
partition_spec
.- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or a list or iterable of arrays, or other objects supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (Any) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
fugue_duckdb.dataframe#
- class fugue_duckdb.dataframe.DuckDataFrame(rel)[source]#
Bases:
LocalBoundedDataFrame
DataFrame that wraps DuckDB
DuckDBPyRelation
.- Parameters
rel (DuckDBPyRelation) –
DuckDBPyRelation
object
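A sketch of wrapping a DuckDB relation directly (normally Fugue constructs this for you through the DuckDB engine); the exact schema string shown in the comment is indicative only:
import duckdb
import pandas as pd
from fugue_duckdb.dataframe import DuckDataFrame

con = duckdb.connect()
rel = con.from_df(pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}))
fdf = DuckDataFrame(rel)
print(fdf.schema)      # e.g. a:long,b:str
print(fdf.as_array())  # [[1, 'x'], [2, 'y']]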
- property alias: str#
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_arrow(type_safe=False)[source]#
Convert to pyArrow DataFrame
- Parameters
type_safe (bool) –
- Return type
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property native: DuckDBPyRelation#
DuckDB relation object
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a Python array; it doesn't contain schema information, and its
native_as_df
should be either a pandas dataframe or an arrow dataframe.- Return type
DuckDBPyRelation
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue_duckdb.execution_engine#
- class fugue_duckdb.execution_engine.DuckDBEngine(execution_engine)[source]#
Bases:
SQLEngine
DuckDB SQL backend implementation.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property dialect: Optional[str]#
- property is_distributed: bool#
Whether this engine is a distributed engine
- load_table(table, **kwargs)[source]#
Load table as a dataframe
- Parameters
table (str) – the table name
kwargs (Any) –
- Returns
an engine compatible dataframe
- Return type
- save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]#
Save the dataframe to a table
- Parameters
df (DataFrame) – the dataframe to save
table (str) – the table name
mode (str) – can accept overwrite, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
- select(dfs, statement)[source]#
Execute select statement on the sql engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the
SELECT
statement using thedfs
keys as tables.
- Returns
result of the
SELECT
statement- Return type
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True,"a"),
     (False," UNION SELECT * FROM "),
     (True,"b")])
Note
There can be tables that are not in dfs. For example, you may want to select from Hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
- class fugue_duckdb.execution_engine.DuckExecutionEngine(conf=None, connection=None)[source]#
Bases:
ExecutionEngine
The execution engine using DuckDB. Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
connection (Optional[DuckDBPyConnection]) – DuckDB connection
- property connection: DuckDBPyConnection#
- convert_yield_dataframe(df, as_local)[source]#
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters
df (DataFrame) – DataFrame
as_local (bool) – whether yield a local dataframe
- Returns
another DataFrame that can be used after this execution engine stops
- Return type
Note
By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe will become invalid when the engine stops.
For example, if you create a custom Spark engine that starts and stops the Spark session in this engine's start_engine() and stop_engine(), then the Spark dataframe will be invalid. So you may consider converting it to a local dataframe so it can still exist after the engine stops.
- dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from the dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
- fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
- property fs: FileSystem#
File system of this engine instance
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- intersect(df1, df2, distinct=True)[source]#
Intersect
df1
anddf2
- Parameters
- Returns
the intersected dataframe
- Return type
Note
Currently, the schema of
df1
anddf2
must be identical, or an exception will be thrown.
- property is_distributed: bool#
Whether this engine is a distributed engine
- join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
Note
Please read
get_join_schemas()
- load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
For more details and examples, read Load & Save.
- property log: Logger#
Logger of this engine instance
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist
can only guarantee the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
- repartition(df, partition_spec)[source]#
Partition the input dataframe using
partition_spec
.- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of
n
andfrac
must be setfrac (Optional[float]) – fraction [0,1] to sample, one and only one of
n
andfrac
must be setreplace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
sampled dataframe
- Return type
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
- stop_engine()[source]#
Custom logic to stop the execution engine, defaults to no operation
- Return type
None
- subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
- Returns
the subtracted dataframe
- Return type
Note
Currently, the schema of
df1
anddf2
must be identical, or an exception will be thrown.
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept
first
orlast
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or a list or iterable of arrays, or other objects supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (Any) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
fugue_duckdb.ibis_engine#
- class fugue_duckdb.ibis_engine.DuckDBIbisEngine(execution_engine)[source]#
Bases:
IbisEngine
- Parameters
execution_engine (ExecutionEngine) –
- select(dfs, ibis_func)[source]#
Execute the ibis select expression.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
ibis_func (Callable[[BaseBackend], TableExpr]) – the ibis compute function
- Returns
result of the ibis function
- Return type
Note
This interface is experimental, so it is subject to change.
fugue_duckdb.registry#
fugue_spark#
fugue_spark.dataframe#
- class fugue_spark.dataframe.SparkDataFrame(df=None, schema=None)[source]#
Bases:
DataFrame
DataFrame that wraps Spark DataFrame. Please also read the DataFrame Tutorial to understand this Fugue concept
- Parameters
df (Any) –
pyspark.sql.DataFrame
schema (Any) – Schema like object or
pyspark.sql.types.StructType
, defaults to None.
Note
You should use fugue_spark.execution_engine.SparkExecutionEngine.to_df() instead of constructing it by yourself.
If schema is set, then there will be a type cast on the Spark DataFrame if the schema is different.
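A sketch of the recommended path through the engine's to_df (assumes an available Spark session):
from pyspark.sql import SparkSession
from fugue_spark.execution_engine import SparkExecutionEngine

spark = SparkSession.builder.getOrCreate()
engine = SparkExecutionEngine(spark)

fdf = engine.to_df([[0, "x"]], "a:int,b:str")  # returns a SparkDataFrame
print(fdf.schema)
spark_df = fdf.native  # the wrapped pyspark.sql.DataFrame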
- property alias: str#
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property is_bounded: bool#
Whether this dataframe is bounded
- property is_local: bool#
Whether this dataframe is a local Dataset
- property native: DataFrame#
The wrapped Spark DataFrame
- Return type
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a Python array; it doesn't contain schema information, and its
native_as_df
should be either a pandas dataframe or an arrow dataframe.- Return type
DataFrame
- property num_partitions: int#
Number of physical partitions of this dataframe. Please read the Partition Tutorial
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue_spark.execution_engine#
- class fugue_spark.execution_engine.SparkExecutionEngine(spark_session=None, conf=None)[source]#
Bases:
ExecutionEngine
The execution engine based on
SparkSession
.Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters
spark_session (Optional[SparkSession]) – Spark session, defaults to None to get the Spark session by
getOrCreate()
conf (Any) – Parameters like object defaults to None, read the Fugue Configuration Tutorial to learn Fugue specific options
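A sketch of running a workflow on this engine (plus_one is an illustrative transformer using a schema hint; see the Transformer Tutorial):
import pandas as pd
from pyspark.sql import SparkSession
from fugue import FugueWorkflow
from fugue_spark.execution_engine import SparkExecutionEngine

# schema: *
def plus_one(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(a=df["a"] + 1)

spark = SparkSession.builder.getOrCreate()
engine = SparkExecutionEngine(spark)

dag = FugueWorkflow()
dag.df([[0], [1]], "a:int").transform(plus_one).show()
dag.run(engine)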
- broadcast(df)[source]#
Broadcast the dataframe to all workers for a distributed computing framework
- Parameters
df (DataFrame) – the input dataframe
- Returns
the broadcasted dataframe
- Return type
- dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from the dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
- fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
- property fs: FileSystem#
File system of this engine instance
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- intersect(df1, df2, distinct=True)[source]#
Intersect
df1
anddf2
- Parameters
- Returns
the intersected dataframe
- Return type
Note
Currently, the schema of
df1
anddf2
must be identical, or an exception will be thrown.
- property is_distributed: bool#
Whether this engine is a distributed engine
- property is_spark_connect: bool#
- join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
Note
Please read
get_join_schemas()
- load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
For more details and examples, read Load & Save.
- property log: Logger#
Logger of this engine instance
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist
can only guarantee the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
- register(df, name)[source]#
Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
- Parameters
df (DataFrame) –
name (str) –
- Return type
- repartition(df, partition_spec)[source]#
Partition the input dataframe using
partition_spec
.- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of
n
andfrac
must be setfrac (Optional[float]) – fraction [0,1] to sample, one and only one of
n
andfrac
must be setreplace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
sampled dataframe
- Return type
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
- property spark_session: SparkSession#
- Returns
The wrapped spark session
- Return type
- subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
- Returns
the subtracted dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
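For example (a minimal sketch with illustrative data, assuming an active SparkSession named spark):

from fugue import ArrayDataFrame
from fugue_spark import SparkExecutionEngine

engine = SparkExecutionEngine(spark)  # assumes an existing SparkSession `spark`
df1 = engine.to_df(ArrayDataFrame([[1], [2], [2], [3]], "a:int"))
df2 = engine.to_df(ArrayDataFrame([[2]], "a:int"))

# distinct=True (default) behaves like SQL EXCEPT (set difference)
diff = engine.subtract(df1, df2)
print(sorted(diff.as_array()))  # [[1], [3]]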
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept
first
orlast
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
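A hedged sketch of taking the top row per group with a descending presort (assuming an active SparkSession named spark; data and column names are illustrative):

from fugue import ArrayDataFrame, PartitionSpec
from fugue_spark import SparkExecutionEngine

engine = SparkExecutionEngine(spark)  # assumes an existing SparkSession `spark`
sdf = engine.to_df(ArrayDataFrame(
    [["a", 1.0], ["a", 3.0], ["b", 2.0], ["b", None]], "grp:str,val:double"))

# one row per `grp`, ordered by `val` descending, NULLs placed last
top1 = engine.take(
    sdf, n=1, presort="val desc",
    na_position="last",
    partition_spec=PartitionSpec(by=["grp"]))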
- to_df(df, schema=None)[source]#
Convert a data structure to
SparkDataFrame
- Parameters
data – DataFrame, pyspark.sql.DataFrame, pyspark.RDD, pandas DataFrame, or list or iterable of arrays
schema (Optional[Any]) – Schema like object or pyspark.sql.types.StructType, defaults to None
df (Any) –
- Returns
engine compatible dataframe
- Return type
Note
if the input is already a SparkDataFrame, it should return itself
for RDD, list or iterable of arrays, schema must be specified
when schema is not None, a potential type cast may happen to ensure the dataframe’s schema
all other methods in the engine can take arbitrary dataframes and call this method to convert before doing anything
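As a hedged sketch of the conversion rules above (assuming an active SparkSession named spark):

import pandas as pd
from fugue_spark import SparkExecutionEngine

engine = SparkExecutionEngine(spark)  # assumes an existing SparkSession `spark`

sdf1 = engine.to_df(pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}))  # schema inferred
sdf2 = engine.to_df([[1, "x"], [2, "y"]], schema="a:int,b:str")    # schema required for arrays
print(sdf1.schema, sdf2.schema)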
- class fugue_spark.execution_engine.SparkMapEngine(execution_engine)[source]#
Bases:
MapEngine
- Parameters
execution_engine (ExecutionEngine) –
- property is_distributed: bool#
Whether this engine is a distributed engine
- property is_spark_connect: bool#
Whether the spark session is created by spark connect
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
Note
Before implementing, you must read this to understand what map is used for and how it should work.
- class fugue_spark.execution_engine.SparkSQLEngine(execution_engine)[source]#
Bases:
SQLEngine
Spark SQL execution implementation.
- Parameters
execution_engine (ExecutionEngine) – it must be SparkExecutionEngine
- Raises
ValueError – if the engine is not SparkExecutionEngine
- property dialect: Optional[str]#
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of
ExecutionEngine
- property is_distributed: bool#
Whether this engine is a distributed engine
- select(dfs, statement)[source]#
Execute select statement on the sql engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns
result of the SELECT statement
- Return type
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "), (True, "a"), (False, " UNION SELECT * FROM "), (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from a Hive table without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
fugue_spark.ibis_engine#
- class fugue_spark.ibis_engine.SparkIbisEngine(execution_engine)[source]#
Bases:
IbisEngine
- Parameters
execution_engine (ExecutionEngine) –
- select(dfs, ibis_func)[source]#
Execute the ibis select expression.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
ibis_func (Callable[[BaseBackend], TableExpr]) – the ibis compute function
- Returns
result of the ibis function
- Return type
Note
This interface is experimental, so it is subject to change.
fugue_spark.registry#
fugue_dask#
fugue_dask.dataframe#
- class fugue_dask.dataframe.DaskDataFrame(df=None, schema=None, num_partitions=0, type_safe=True)[source]#
Bases:
DataFrame
DataFrame that wraps Dask DataFrame. Please also read the DataFrame Tutorial to understand this Fugue concept
- Parameters
df (Any) – dask.dataframe.DataFrame, pandas DataFrame, or list or iterable of arrays
schema (Any) – Schema like object, defaults to None
num_partitions (int) – initial number of partitions for the dask dataframe, defaults to 0, meaning to get the value from fugue.dask.default.partitions
type_safe – whether to cast input data to ensure type safety, defaults to True
Note
For dask.dataframe.DataFrame, schema must be None
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
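A minimal sketch of wrapping a pandas DataFrame and altering a column type (the data and names are illustrative):

import pandas as pd
from fugue_dask import DaskDataFrame

ddf = DaskDataFrame(pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}))
ddf2 = ddf.alter_columns("a:str")  # cast column `a` to string; column order is preserved
print(ddf2.schema)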
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property is_bounded: bool#
Whether this dataframe is bounded
- property is_local: bool#
Whether this dataframe is a local Dataset
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array, which doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
- property num_partitions: int#
Number of physical partitions of this dataframe. Please read the Partition Tutorial
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue_dask.execution_engine#
- class fugue_dask.execution_engine.DaskExecutionEngine(dask_client=None, conf=None)[source]#
Bases:
ExecutionEngine
The execution engine based on Dask.
Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters
dask_client (Optional[Client]) – Dask distributed client, defaults to None. If None, then it will try to get the current active global client. If there is no active client, it will create and use a global Client(processes=True)
conf (Any) – Parameters like object defaults to None, read the Fugue Configuration Tutorial to learn Fugue specific options
Note
You should set up the Dask single-machine or distributed environment in the common way before initializing DaskExecutionEngine
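A hedged construction sketch (the Client settings and the partitions config value are illustrative):

from dask.distributed import Client
from fugue_dask import DaskExecutionEngine

client = Client(processes=True)  # set up Dask yourself in the usual way
engine = DaskExecutionEngine(
    dask_client=client,
    conf={"fugue.dask.default.partitions": 4})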
- dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
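For example (a minimal sketch; the engine will pick up or create a Dask client as described above):

import pandas as pd
from fugue_dask import DaskExecutionEngine

engine = DaskExecutionEngine()
df = engine.to_df(pd.DataFrame({"a": [1.0, None, 3.0], "b": ["x", None, None]}))

no_nulls = engine.dropna(df, how="any")                  # drop rows with any null
b_all_null = engine.dropna(df, how="all", subset=["b"])  # consider only column b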
- fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
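A hedged sketch of scalar versus per-column filling (data is illustrative):

import pandas as pd
from fugue_dask import DaskExecutionEngine

engine = DaskExecutionEngine()
df = engine.to_df(pd.DataFrame({"a": [1.0, None], "b": ["x", None]}))

filled_a = engine.fillna(df, value=0.0, subset=["a"])      # scalar fill on column a only
filled_map = engine.fillna(df, value={"a": 0.0, "b": "?"})  # per-column replacement values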
- property fs: FileSystem#
File system of this engine instance
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- intersect(df1, df2, distinct=True)[source]#
Intersect df1 and df2
- Parameters
- Returns
the intersected dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- property is_distributed: bool#
Whether this engine is a distributed engine
- join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
Note
Please read
get_join_schemas()
- load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
For more details and examples, read Load & Save.
- property log: Logger#
Logger of this engine instance
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persist happens immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist can only guarantee that the persisted dataframe is computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly this doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example a stack overflow.
- property pl_utils: DaskUtils#
Pandas-like dataframe utils
- repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
sampled dataframe
- Return type
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to “overwrite”
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
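A hedged round-trip sketch (the output path is hypothetical; the format is inferred from the extension since format_hint is not given):

import pandas as pd
from fugue_dask import DaskExecutionEngine

engine = DaskExecutionEngine()
df = engine.to_df(pd.DataFrame({"a": [1, 2, 3]}))

engine.save_df(df, "/tmp/fugue_demo.parquet", mode="overwrite", force_single=True)
back = engine.load_df("/tmp/fugue_demo.parquet", columns=["a"])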
- subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
- Returns
the subtracted dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept
first
orlast
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
- to_df(df, schema=None)[source]#
Convert a data structure to
DaskDataFrame
- Parameters
data – DataFrame, dask.dataframe.DataFrame, pandas DataFrame, or list or iterable of arrays
schema (Optional[Any]) – Schema like object, defaults to None
df (Any) –
- Returns
engine compatible dataframe
- Return type
Note
if the input is already a DaskDataFrame, it should return itself
for list or iterable of arrays, schema must be specified
when schema is not None, a potential type cast may happen to ensure the dataframe’s schema
all other methods in the engine can take arbitrary dataframes and call this method to convert before doing anything
- class fugue_dask.execution_engine.DaskMapEngine(execution_engine)[source]#
Bases:
MapEngine
- Parameters
execution_engine (ExecutionEngine) –
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of
ExecutionEngine
- property is_distributed: bool#
Whether this engine is a distributed engine
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
Note
Before implementing, you must read this to understand what map is used for and how it should work.
- class fugue_dask.execution_engine.QPDDaskEngine(execution_engine)[source]#
Bases:
SQLEngine
QPD execution implementation.
- Parameters
execution_engine (ExecutionEngine) –
- property dialect: Optional[str]#
- property is_distributed: bool#
Whether this engine is a distributed engine
- select(dfs, statement)[source]#
Execute select statement on the sql engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns
result of the SELECT statement
- Return type
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "), (True, "a"), (False, " UNION SELECT * FROM "), (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from a Hive table without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (AnyDataFrame) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- fugue_dask.execution_engine.to_dask_engine_df(df, schema=None)[source]#
Convert a data structure to
DaskDataFrame
- Parameters
data – DataFrame, dask.dataframe.DataFrame, pandas DataFrame, or list or iterable of arrays
schema (Optional[Any]) – Schema like object, defaults to None
df (Any) –
- Returns
engine compatible dataframe
- Return type
Note
if the input is already a DaskDataFrame, it should return itself
for list or iterable of arrays, schema must be specified
when schema is not None, a potential type cast may happen to ensure the dataframe’s schema
all other methods in the engine can take arbitrary dataframes and call this method to convert before doing anything
fugue_dask.ibis_engine#
- class fugue_dask.ibis_engine.DaskIbisEngine(execution_engine)[source]#
Bases:
IbisEngine
- Parameters
execution_engine (ExecutionEngine) –
- select(dfs, ibis_func)[source]#
Execute the ibis select expression.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
ibis_func (Callable[[BaseBackend], TableExpr]) – the ibis compute function
- Returns
result of the ibis function
- Return type
Note
This interface is experimental, so it is subject to change.
fugue_dask.registry#
fugue_ray#
fugue_ray.dataframe#
- class fugue_ray.dataframe.RayDataFrame(df=None, schema=None, internal_schema=False)[source]#
Bases:
DataFrame
DataFrame that wraps Ray DataSet. Please also read the DataFrame Tutorial to understand this Fugue concept
- Parameters
df (Any) – ray.data.Dataset, pyarrow.Table, pandas.DataFrame, Fugue DataFrame, or list or iterable of arrays
schema (Any) – Schema like object, defaults to None. If the schema is different from the df schema, then type casts will happen.
internal_schema (bool) – if True, the schema is guaranteed by the provider to be consistent with the schema of df, so no type cast will happen. Defaults to False. This is for internal use only.
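A minimal construction sketch (assuming a local Ray runtime is acceptable; data is illustrative):

import pandas as pd
import ray
from fugue_ray import RayDataFrame

ray.init(ignore_reinit_error=True)

rdf1 = RayDataFrame(pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}))  # schema inferred
rdf2 = RayDataFrame([[1, "x"], [2, "y"]], schema="a:int,b:str")    # explicit schema for arrays
print(rdf2.schema, rdf2.num_partitions)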
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_arrow(type_safe=False)[source]#
Convert to pyArrow DataFrame
- Parameters
type_safe (bool) –
- Return type
- as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property is_bounded: bool#
Whether this dataframe is bounded
- property is_local: bool#
Whether this dataframe is a local Dataset
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array, which doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
- property num_partitions: int#
Number of physical partitions of this dataframe. Please read the Partition Tutorial
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue_ray.execution_engine#
- class fugue_ray.execution_engine.RayExecutionEngine(conf=None, connection=None)[source]#
Bases:
DuckExecutionEngine
A hybrid engine of Ray and DuckDB as Phase 1 of the Fugue Ray integration. Most operations will be done by DuckDB, but for map it will use Ray.
- Parameters
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
connection (Optional[DuckDBPyConnection]) – DuckDB connection
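A hedged construction sketch (assuming a local Ray runtime; conf and a DuckDB connection are optional):

import ray
from fugue_ray import RayExecutionEngine

ray.init(ignore_reinit_error=True)
engine = RayExecutionEngine()  # optionally pass conf=... and/or connection=...
print(engine.is_distributed, engine.get_current_parallelism())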
- convert_yield_dataframe(df, as_local)[source]#
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters
df (DataFrame) – DataFrame
as_local (bool) – whether yield a local dataframe
- Returns
another DataFrame that can be used after this execution engine stops
- Return type
Note
By default, the output dataframe is the input dataframe. But it should be overridden when the input dataframe will become invalid after the engine stops.
For example, if you customize a spark engine where you start and stop the spark session in this engine’s start_engine() and stop_engine(), then the spark dataframe will be invalid after the engine stops. So you may consider converting it to a local dataframe so it can still exist after the engine stops.
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- property is_distributed: bool#
Whether this engine is a distributed engine
- load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
For more details and examples, read Load & Save.
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persist happens immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist can only guarantee that the persisted dataframe is computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly this doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example a stack overflow.
- repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
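For example, a hedged sketch of repartitioning into a fixed number of physical partitions (assuming a local Ray runtime and that PartitionSpec accepts a num argument for the target partition count):

import pandas as pd
import ray
from fugue import PartitionSpec
from fugue_ray import RayExecutionEngine

ray.init(ignore_reinit_error=True)
engine = RayExecutionEngine()
df = engine.to_df(pd.DataFrame({"a": list(range(8))}))

df4 = engine.repartition(df, PartitionSpec(num=4))
print(df4.num_partitions)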
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to “overwrite”
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (Any) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue_ray.execution_engine.RayMapEngine(execution_engine)[source]#
Bases:
MapEngine
- Parameters
execution_engine (ExecutionEngine) –
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of
ExecutionEngine
- property is_distributed: bool#
Whether this engine is a distributed engine
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
Note
Before implementing, you must read this to understand what map is used for and how it should work.
fugue_ray.registry#
fugue_ibis#
fugue_ibis.execution#
fugue_ibis.execution.ibis_engine#
- class fugue_ibis.execution.ibis_engine.IbisEngine(execution_engine)[source]#
Bases:
EngineFacet
The abstract base class for different ibis execution implementations.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this ibis engine will run on
- property is_distributed: bool#
Whether this engine is a distributed engine
- abstract select(dfs, ibis_func)[source]#
Execute the ibis select expression.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
ibis_func (Callable[[BaseBackend], TableExpr]) – the ibis compute function
- Returns
result of the ibis function
- Return type
Note
This interface is experimental, so it is subject to change.
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (AnyDataFrame) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
fugue_ibis.execution.pandas_backend#
- class fugue_ibis.execution.pandas_backend.PandasIbisEngine(execution_engine)[source]#
Bases:
IbisEngine
- Parameters
execution_engine (ExecutionEngine) –
- select(dfs, ibis_func)[source]#
Execute the ibis select expression.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
ibis_func (Callable[[BaseBackend], TableExpr]) – the ibis compute function
- Returns
result of the ibis function
- Return type
Note
This interface is experimental, so it is subject to change.
fugue_ibis.dataframe#
- class fugue_ibis.dataframe.IbisDataFrame(table, schema=None)[source]#
Bases:
DataFrame
DataFrame that wraps an Ibis Table.
- Parameters
table (TableExpr) – the Ibis table to wrap
schema (Any) – Schema like object, defaults to None
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If
type_safe
is False, then the returned values are ‘raw’ values.
- as_arrow(type_safe=False)[source]#
Convert to pyArrow DataFrame
- Parameters
type_safe (bool) –
- Return type
- as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- property columns: List[str]#
The column names of the dataframe
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property is_bounded: bool#
Whether this dataframe is bounded
- property is_local: bool#
Whether this dataframe is a local Dataset
- property native: TableExpr#
Ibis Table object
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array, which doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
TableExpr
- property num_partitions: int#
Number of physical partitions of this dataframe. Please read the Partition Tutorial
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue_ibis.execution_engine#
- class fugue_ibis.execution_engine.IbisExecutionEngine(conf)[source]#
Bases:
ExecutionEngine
The base execution engine using Ibis. Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
- abstract create_non_ibis_execution_engine()[source]#
Create the execution engine that handles operations beyond SQL
- Return type
- dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
- fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
- property fs: FileSystem#
File system of this engine instance
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- property ibis_sql_engine: IbisSQLEngine#
- intersect(df1, df2, distinct=True)[source]#
Intersect df1 and df2
- Parameters
- Returns
the intersected dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
Note
Please read
get_join_schemas()
- property log: Logger#
Logger of this engine instance
- property non_ibis_engine: ExecutionEngine#
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persist happens immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist can only guarantee that the persisted dataframe is computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly this doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example a stack overflow.
- repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
sampled dataframe
- Return type
- subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
- Returns
the subtracted dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort. can accept
first
orlast
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
data – DataFrame, pandas DataFrame, or list or iterable of arrays, or others that are supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
df (Any) –
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue_ibis.execution_engine.IbisMapEngine(execution_engine)[source]#
Bases:
MapEngine
IbisExecutionEngine’s MapEngine, it is a wrapper of the map engine of
non_ibis_engine()
- Parameters
execution_engine (ExecutionEngine) – the execution engine this map engine will run on
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of
ExecutionEngine
- property is_distributed: bool#
Whether this engine is a distributed engine
- map_bag(bag, map_func, partition_spec, on_init=None)[source]#
Apply a function to each partition after you partition the bag in a specified way.
- Parameters
df – input dataframe
map_func (Callable[[BagPartitionCursor, LocalBag], LocalBag]) – the function to apply on every logical partition
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, Bag], Any]]) – callback function when the physical partition is initializing, defaults to None
bag (Bag) –
- Returns
the bag after the map operation
- Return type
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
Note
Before implementing, you must read this to understand what map is used for and how it should work.
- class fugue_ibis.execution_engine.IbisSQLEngine(execution_engine)[source]#
Bases:
SQLEngine
Ibis SQL backend base implementation.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- abstract property backend: BaseBackend#
- load_table(table, **kwargs)[source]#
Load table as a dataframe
- Parameters
table (str) – the table name
kwargs (Any) –
- Returns
an engine compatible dataframe
- Return type
- query_to_table(statement, dfs)[source]#
- Parameters
statement (str) –
dfs (Dict[str, Any]) –
- Return type
TableExpr
- save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]#
Save the dataframe to a table
- Parameters
df (DataFrame) – the dataframe to save
table (str) – the table name
mode (str) – can accept overwrite, error, defaults to “overwrite”
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
- select(dfs, statement)[source]#
Execute select statement on the sql engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns
result of the SELECT statement
- Return type
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "), (True, "a"), (False, " UNION SELECT * FROM "), (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from a Hive table without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
- table_exists(table)[source]#
Whether the table exists
- Parameters
table (str) – the table name
- Returns
whether the table exists
- Return type
bool
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
- Parameters
df (DataFrame) –
n (int) –
presort (str) –
na_position (str) –
partition_spec (Optional[PartitionSpec]) –
- Return type
fugue_ibis.extensions#
- fugue_ibis.extensions.as_fugue(expr, ibis_engine=None)[source]#
Convert a lazy ibis object to Fugue workflow dataframe
- Parameters
expr (TableExpr) – the actual instance should be LazyIbisObject
ibis_engine (Optional[Any]) –
- Returns
the Fugue workflow dataframe
- Return type
Examples
# non-magical approach
from fugue import FugueWorkflow
from fugue_ibis import as_ibis, as_fugue

dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int")
df2 = dag.df([[1]], "a:int")
idf1 = as_ibis(df1)
idf2 = as_ibis(df2)
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1)
as_fugue(result).show()
# magical approach
from fugue import FugueWorkflow
import fugue_ibis  # must import

dag = FugueWorkflow()
idf1 = dag.df([[0]], "a:int").as_ibis()
idf2 = dag.df([[1]], "a:int").as_ibis()
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1).as_fugue()
result.show()
Note
The magic is that when importing fugue_ibis, the functions as_ibis and as_fugue are added to the corresponding classes, so you can use them as if they were part of the original classes.
This is an idea similar to patching. Ibis uses this programming model a lot. Fugue provides this as an option.
Note
The returned object is not really a TableExpr; it’s a ‘super lazy’ object that will be translated into a TableExpr at run time. This is because, to compile an ibis execution graph, the input schemas must be known. However, in Fugue this is not always true. For example, if the previous step pivots a table, then the output schema can only be known at runtime. So, in order to be a part of Fugue, we need to be able to construct ibis expressions before knowing the input schemas.
- fugue_ibis.extensions.as_ibis(df)[source]#
Convert the Fugue workflow dataframe to an ibis table for ibis operations.
- Parameters
df (WorkflowDataFrame) – the Fugue workflow dataframe
- Returns
the object representing the ibis table
- Return type
TableExpr
Examples
# non-magical approach
from fugue import FugueWorkflow
from fugue_ibis import as_ibis, as_fugue

dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int")
df2 = dag.df([[1]], "a:int")
idf1 = as_ibis(df1)
idf2 = as_ibis(df2)
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1)
as_fugue(result).show()
# magical approach
from fugue import FugueWorkflow
import fugue_ibis  # must import

dag = FugueWorkflow()
idf1 = dag.df([[0]], "a:int").as_ibis()
idf2 = dag.df([[1]], "a:int").as_ibis()
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1).as_fugue()
result.show()
Note
The magic is that when importing fugue_ibis, the functions as_ibis and as_fugue are added to the corresponding classes, so you can use them as if they were part of the original classes.
This is an idea similar to patching. Ibis uses this programming model a lot. Fugue provides this as an option.
Note
The returned object is not really a TableExpr; it’s a ‘super lazy’ object that will be translated into a TableExpr at run time. This is because, to compile an ibis execution graph, the input schemas must be known. However, in Fugue this is not always true. For example, if the previous step pivots a table, then the output schema can only be known at runtime. So, in order to be a part of Fugue, we need to be able to construct ibis expressions before knowing the input schemas.
- fugue_ibis.extensions.run_ibis(ibis_func, ibis_engine=None, **dfs)[source]#
Run an ibis workflow wrapped in
ibis_func
- Parameters
ibis_func (Callable[[BaseBackend], TableExpr]) – the function taking in an ibis backend, and returning an Ibis TableExpr
ibis_engine (Optional[Any]) – an object that, together with ExecutionEngine, can determine the IbisEngine, defaults to None
dfs (WorkflowDataFrame) – dataframes in the same workflow
- Returns
the output workflow dataframe
- Return type
Examples
from fugue import FugueWorkflow
from fugue_ibis import run_ibis

def func(backend):
    t = backend.table("tb")
    return t.mutate(b=t.a+1)

dag = FugueWorkflow()
df = dag.df([[0]], "a:int")
result = run_ibis(func, tb=df)
result.show()