fugue_duckdb
fugue_duckdb.dask
- class fugue_duckdb.dask.DuckDaskExecutionEngine(conf=None, connection=None, dask_client=None)[source]
Bases:
DuckExecutionEngine
A hybrid engine of DuckDB and Dask. Most operations will be done by DuckDB, but for map, it will use Dask to fully utilize local CPUs. The engine can be used with a real Dask cluster, but practically, it is more useful for local processing.
- Parameters:
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
connection (DuckDBPyConnection | None) – DuckDB connection
dask_client (Client | None) – Dask Client, defaults to None
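A minimal construction sketch (not from the source docs): it assumes a local dask.distributed.Client and an in-memory DuckDB connection; all three constructor arguments are optional.
>>> import duckdb
>>> from dask.distributed import Client
>>> from fugue_duckdb.dask import DuckDaskExecutionEngine
>>> client = Client(processes=True)  # hypothetical local Dask cluster
>>> engine = DuckDaskExecutionEngine(connection=duckdb.connect(), dask_client=client)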
- convert_yield_dataframe(df, as_local)[source]
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters:
df (DataFrame) – DataFrame
as_local (bool) – whether to yield a local dataframe
- Returns:
another DataFrame that can be used after this execution engine stops
- Return type:
DataFrame
Note
By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe will become invalid when the engine stops.
For example, if you implement a custom Spark engine where you start and stop the Spark session in this engine's start_engine() and stop_engine(), then the Spark dataframe will become invalid. So you may consider converting it to a local dataframe so it can still be used after the engine stops.
- get_current_parallelism()[source]
Get the current degree of parallelism of this engine
- Return type:
int
- persist(df, lazy=False, **kwargs)[source]
Force materializing and caching the dataframe
- Parameters:
df (DataFrame) – the input dataframe
lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameters to pass to the underlying persist implementation
- Returns:
the persisted dataframe
- Return type:
DataFrame
Note
persist can only guarantee that the persisted dataframe is computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
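For illustration, a hedged sketch of eager vs. lazy persisting, assuming df is any engine compatible DataFrame:
>>> cached = engine.persist(df)             # eager: materialized immediately
>>> cached = engine.persist(df, lazy=True)  # lazy: materialized on first use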
- repartition(df, partition_spec)[source]
Partition the input dataframe using partition_spec.
- Parameters:
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns:
repartitioned dataframe
- Return type:
DataFrame
Note
Before implementing please read the Partition Tutorial
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]
Save dataframe to a persistent storage
- Parameters:
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type:
None
For more details and examples, read Load & Save.
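A usage sketch (the paths are hypothetical; when format_hint is None the format is inferred, typically from the file extension):
>>> engine.save_df(df, "/tmp/output.parquet", mode="overwrite")
>>> engine.save_df(df, "/tmp/output.csv", format_hint="csv", force_single=True)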
- to_df(df, schema=None)[source]
Convert a data structure to this engine compatible DataFrame
- Parameters:
df (Any) – DataFrame, pandas DataFrame, list or iterable of arrays, or others that are supported by certain engine implementations
schema (Any | None) – Schema like object, defaults to None
- Returns:
engine compatible dataframe
- Return type:
DataFrame
Note
There are certain conventions to follow for a new implementation:
if the input is already a compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
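A conversion sketch, assuming a small array input with a Fugue schema expression (pdf stands for a hypothetical pandas DataFrame):
>>> ddf = engine.to_df([[0, "hello"], [1, "world"]], "a:int,b:str")
>>> ddf = engine.to_df(pdf)  # schema inferred from the pandas DataFrame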
fugue_duckdb.dataframe
- class fugue_duckdb.dataframe.DuckDataFrame(rel)[source]
Bases:
LocalBoundedDataFrame
DataFrame that wraps a DuckDB DuckDBPyRelation.
- Parameters:
rel (DuckDBPyRelation) – the DuckDBPyRelation object
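A minimal sketch of wrapping a relation, assuming an in-memory DuckDB connection:
>>> import duckdb
>>> from fugue_duckdb.dataframe import DuckDataFrame
>>> rel = duckdb.connect().query("SELECT 1 AS a, 'x' AS b")
>>> df = DuckDataFrame(rel)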
- property alias: str
- alter_columns(columns)[source]
Change column types
- Parameters:
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns:
a new dataframe with altered columns; the order of the original schema will not change
- Return type:
DataFrame
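For example, casting two hypothetical columns to new types (a sketch; the rest of the schema is unchanged):
>>> df2 = df.alter_columns("a:str,b:double")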
- as_array(columns=None, type_safe=False)[source]
Convert to 2-dimensional native python array
- Parameters:
columns (List[str] | None) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns:
2-dimensional native python array
- Return type:
List[Any]
Note
If type_safe is False, then the returned values are 'raw' values.
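A usage sketch with hypothetical columns a and b:
>>> rows = df.as_array(columns=["a", "b"], type_safe=True)  # e.g. [[1, "x"], [2, "y"]]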
- as_array_iterable(columns=None, type_safe=False)[source]
Convert to iterable of native python arrays
- Parameters:
columns (List[str] | None) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns:
iterable of native python arrays
- Return type:
Iterable[Any]
Note
If type_safe is False, then the returned values are 'raw' values.
- as_arrow(type_safe=False)[source]
Convert to pyArrow DataFrame
- Parameters:
type_safe (bool)
- Return type:
Table
- as_dict_iterable(columns=None)[source]
Convert to iterable of python dicts
- Parameters:
columns (List[str] | None) – columns to extract, defaults to None
- Returns:
iterable of python dicts
- Return type:
Iterable[Dict[str, Any]]
Note
The default implementation enforces type_safe to be True.
- as_dicts(columns=None)[source]
Convert to a list of python dicts
- Parameters:
columns (List[str] | None) – columns to extract, defaults to None
- Returns:
a list of python dicts
- Return type:
List[Dict[str, Any]]
Note
The default implementation enforces type_safe to be True.
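A usage sketch with a hypothetical column a:
>>> records = df.as_dicts(columns=["a"])  # e.g. [{"a": 1}, {"a": 2}]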
- property empty: bool
Whether this dataframe is empty
- head(n, columns=None)[source]
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters:
n (int) – number of rows
columns (List[str] | None) – selected columns, defaults to None (all columns)
- Returns:
a local bounded dataframe
- Return type:
LocalBoundedDataFrame
- property native: DuckDBPyRelation
DuckDB relation object
- native_as_df()[source]
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array; it doesn't contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type:
DuckDBPyRelation
- peek_array()[source]
Peek the first row of the dataframe as array
- Raises:
FugueDatasetEmptyError – if it is empty
- Return type:
List[Any]
fugue_duckdb.execution_engine
- class fugue_duckdb.execution_engine.DuckDBEngine(execution_engine)[source]
Bases:
SQLEngine
DuckDB SQL backend implementation.
- Parameters:
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- property dialect: str | None
- property is_distributed: bool
Whether this engine is a distributed engine
- load_table(table, **kwargs)[source]
Load table as a dataframe
- Parameters:
table (str) – the table name
kwargs (Any)
- Returns:
an engine compatible dataframe
- Return type:
DataFrame
- save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]
Save the dataframe to a table
- Parameters:
df (DataFrame) – the dataframe to save
table (str) – the table name
mode (str) – can accept overwrite, error, defaults to "overwrite"
partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Return type:
None
- select(dfs, statement)[source]
Execute select statement on the sql engine.
- Parameters:
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.
- Returns:
result of the SELECT statement
- Return type:
DataFrame
Examples
>>> dfs = DataFrames(a=df1, b=df2)
>>> sql_engine.select(
...     dfs,
...     [(False, "SELECT * FROM "), (True, "a"),
...      (False, " UNION SELECT * FROM "), (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
- class fugue_duckdb.execution_engine.DuckExecutionEngine(conf=None, connection=None)[source]
Bases:
ExecutionEngine
The execution engine using DuckDB. Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters:
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
connection (DuckDBPyConnection | None) – DuckDB connection
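A minimal construction sketch (not from the source docs); the connection argument is optional and a dict-like conf can also be passed:
>>> import duckdb
>>> from fugue_duckdb.execution_engine import DuckExecutionEngine
>>> engine = DuckExecutionEngine(connection=duckdb.connect())
>>> tdf = engine.to_df([[0, "a"]], "x:long,y:str")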
- property connection: DuckDBPyConnection
- convert_yield_dataframe(df, as_local)[source]
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters:
df (DataFrame) – DataFrame
as_local (bool) – whether to yield a local dataframe
- Returns:
another DataFrame that can be used after this execution engine stops
- Return type:
DataFrame
Note
By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe will become invalid when the engine stops.
For example, if you implement a custom Spark engine where you start and stop the Spark session in this engine's start_engine() and stop_engine(), then the Spark dataframe will become invalid. So you may consider converting it to a local dataframe so it can still be used after the engine stops.
- dropna(df, how='any', thresh=None, subset=None)[source]
Drop NA records from the dataframe
- Parameters:
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (int | None) – drops rows that have fewer than thresh non-null values
subset (List[str] | None) – list of columns to operate on
- Returns:
DataFrame with NA records dropped
- Return type:
DataFrame
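A hedged sketch, assuming df is any engine compatible DataFrame and columns a and b are hypothetical:
>>> cleaned = engine.dropna(df, how="any", subset=["a", "b"])
>>> cleaned = engine.dropna(df, thresh=2)  # keep rows with at least 2 non-null values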
- fillna(df, value, subset=None)[source]
Fill NULL, NAN, NAT values in a dataframe
- Parameters:
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with the same value; if dictionary, fills NA using the keys as column names and the values as the replacement values
subset (List[str] | None) – list of columns to operate on. ignored if value is a dictionary
- Returns:
DataFrame with NA records filled
- Return type:
DataFrame
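A hedged sketch with hypothetical columns a and b:
>>> filled = engine.fillna(df, value=0)                     # same value for all columns
>>> filled = engine.fillna(df, value={"a": 0, "b": "n/a"})  # per-column replacements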
- get_current_parallelism()[source]
Get the current degree of parallelism of this engine
- Return type:
int
- intersect(df1, df2, distinct=True)[source]
Intersect df1 and df2
- Parameters:
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
distinct (bool) – whether to perform a distinct intersect, defaults to True
- Returns:
the intersected dataframe
- Return type:
DataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
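A usage sketch, assuming df1 and df2 are engine compatible DataFrames with identical schemas:
>>> common = engine.intersect(df1, df2, distinct=True)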
- property is_distributed: bool
Whether this engine is a distributed engine
- join(df1, df2, how, on=None)[source]
Join two dataframes
- Parameters:
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (List[str] | None) – it can always be inferred, but if provided, it will be validated against the inferred keys
- Returns:
the joined dataframe
- Return type:
DataFrame
Note
Please read get_join_schemas()
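A usage sketch with a hypothetical join key id:
>>> joined = engine.join(df1, df2, how="inner", on=["id"])
>>> joined = engine.join(df1, df2, how="left_outer")  # join keys inferred from the schemas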
- load_df(path, format_hint=None, columns=None, **kwargs)[source]
Load dataframe from persistent storage
- Parameters:
path (str | List[str]) – the path to the dataframe
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Any | None) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns:
an engine compatible dataframe
- Return type:
DataFrame
For more details and examples, read Load & Save.
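A usage sketch with hypothetical paths and columns:
>>> df = engine.load_df("/tmp/data.parquet", columns=["a", "b"])
>>> df = engine.load_df(["/tmp/part1.csv", "/tmp/part2.csv"], format_hint="csv")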
- property log: Logger
Logger of this engine instance
- persist(df, lazy=False, **kwargs)[source]
Force materializing and caching the dataframe
- Parameters:
df (DataFrame) – the input dataframe
lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameters to pass to the underlying persist implementation
- Returns:
the persisted dataframe
- Return type:
DataFrame
Note
persist can only guarantee that the persisted dataframe is computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
- repartition(df, partition_spec)[source]
Partition the input dataframe using partition_spec.
- Parameters:
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns:
repartitioned dataframe
- Return type:
DataFrame
Note
Before implementing please read the Partition Tutorial
- sample(df, n=None, frac=None, replace=False, seed=None)[source]
Sample dataframe by number of rows or by fraction
- Parameters:
df (DataFrame) – DataFrame
n (int | None) – number of rows to sample, one and only one of n and frac must be set
frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (int | None) – seed for randomness, defaults to None
- Returns:
sampled dataframe
- Return type:
DataFrame
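A usage sketch (set exactly one of n and frac):
>>> s = engine.sample(df, frac=0.1, seed=0)  # roughly 10% of the rows
>>> s = engine.sample(df, n=100, seed=0)     # 100 rows, assuming df has at least that many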
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]
Save dataframe to a persistent storage
- Parameters:
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output as a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type:
None
For more details and examples, read Load & Save.
- stop_engine()[source]
Custom logic to stop the execution engine, defaults to no operation
- Return type:
None
- subtract(df1, df2, distinct=True)[source]
df1 - df2
- Parameters:
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
distinct (bool) – whether to perform a distinct subtract, defaults to True
- Returns:
the subtracted dataframe
- Return type:
DataFrame
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- take(df, n, presort, na_position='last', partition_spec=None)[source]
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters:
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
partition_spec (PartitionSpec | None) – PartitionSpec to apply the take operation
- Returns:
n rows of DataFrame per partition
- Return type:
DataFrame
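A usage sketch with a hypothetical column score:
>>> top = engine.take(df, n=5, presort="score desc", na_position="last")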
- to_df(df, schema=None)[source]
Convert a data structure to this engine compatible DataFrame
- Parameters:
df (Any) – DataFrame, pandas DataFrame, list or iterable of arrays, or others that are supported by certain engine implementations
schema (Any | None) – Schema like object, defaults to None
- Returns:
engine compatible dataframe
- Return type:
DataFrame
Note
There are certain conventions to follow for a new implementation:
if the input is already a compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything