fugue_duckdb

fugue_duckdb.dask

class fugue_duckdb.dask.DuckDaskExecutionEngine(conf=None, connection=None, dask_client=None)[source]

Bases: DuckExecutionEngine

A hybrid engine of DuckDB and Dask. Most operations will be done by DuckDB, but for map, it will use Dask to fully utilize local CPUs. The engine can be used with a real Dask cluster, but practically, this is more useful for local process.

Parameters:
  • conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options

  • connection (DuckDBPyConnection | None) – DuckDB connection

  • dask_client (Client | None)

broadcast(df)[source]

Broadcast the dataframe to all workers for a distributed computing framework

Parameters:

df (DataFrame) – the input dataframe

Returns:

the broadcasted dataframe

Return type:

DataFrame

convert_yield_dataframe(df, as_local)[source]

Convert a yield dataframe to a dataframe that can be used after this execution engine stops.

Parameters:
  • df (DataFrame) – DataFrame

  • as_local (bool) – whether yield a local dataframe

Returns:

another DataFrame that can be used after this execution engine stops

Return type:

DataFrame

Note

By default, the output dataframe is the input dataframe. But it should be overridden if when an engine stops and the input dataframe will become invalid.

For example, if you custom a spark engine where you start and stop the spark session in this engine’s start_engine() and stop_engine(), then the spark dataframe will be invalid. So you may consider converting it to a local dataframe so it can still exist after the engine stops.

create_default_map_engine()[source]

Default MapEngine if user doesn’t specify

Return type:

MapEngine

property dask_client: Client
get_current_parallelism()[source]

Get the current number of parallelism of this engine

Return type:

int

persist(df, lazy=False, **kwargs)[source]

Force materializing and caching the dataframe

Parameters:
  • df (DataFrame) – the input dataframe

  • lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happend immediately. Default to False

  • kwargs (Any) – parameter to pass to the underlying persist implementation

Returns:

the persisted dataframe

Return type:

DataFrame

Note

persist can only guarantee the persisted dataframe will be computed for only once. However this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn’t cause any issue, but if your execution graph is long, it may cause expected problems for example, stack overflow.

repartition(df, partition_spec)[source]

Partition the input dataframe using partition_spec.

Parameters:
  • df (DataFrame) – input dataframe

  • partition_spec (PartitionSpec) – how you want to partition the dataframe

Returns:

repartitioned dataframe

Return type:

DataFrame

Note

Before implementing please read the Partition Tutorial

save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]

Save dataframe to a persistent storage

Parameters:
  • df (DataFrame) – input dataframe

  • path (str) – output path

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to empty

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type:

None

For more details and examples, read Load & Save.

to_df(df, schema=None)[source]

Convert a data structure to this engine compatible DataFrame

Parameters:
  • dataDataFrame, pandas DataFramme or list or iterable of arrays or others that is supported by certain engine implementation

  • schema (Any | None) – Schema like object, defaults to None

  • df (Any)

Returns:

engine compatible dataframe

Return type:

DuckDataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything

fugue_duckdb.dataframe

class fugue_duckdb.dataframe.DuckDataFrame(rel)[source]

Bases: LocalBoundedDataFrame

DataFrame that wraps DuckDB DuckDBPyRelation.

Parameters:

rel (DuckDBPyRelation) – DuckDBPyRelation object

property alias: str
alter_columns(columns)[source]

Change column types

Parameters:

columns (Any) – Schema like object, all columns should be contained by the dataframe schema

Returns:

a new dataframe with altered columns, the order of the original schema will not change

Return type:

DataFrame

as_array(columns=None, type_safe=False)[source]

Convert to 2-dimensional native python array

Parameters:
  • columns (List[str] | None) – columns to extract, defaults to None

  • type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Returns:

2-dimensional native python array

Return type:

List[Any]

Note

If type_safe is False, then the returned values are ‘raw’ values.

as_array_iterable(columns=None, type_safe=False)[source]

Convert to iterable of native python arrays

Parameters:
  • columns (List[str] | None) – columns to extract, defaults to None

  • type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Returns:

iterable of native python arrays

Return type:

Iterable[Any]

Note

If type_safe is False, then the returned values are ‘raw’ values.

as_arrow(type_safe=False)[source]

Convert to pyArrow DataFrame

Parameters:

type_safe (bool)

Return type:

Table

as_dict_iterable(columns=None)[source]

Convert to iterable of python dicts

Parameters:

columns (List[str] | None) – columns to extract, defaults to None

Returns:

iterable of python dicts

Return type:

Iterable[Dict[str, Any]]

Note

The default implementation enforces type_safe True

as_dicts(columns=None)[source]

Convert to a list of python dicts

Parameters:

columns (List[str] | None) – columns to extract, defaults to None

Returns:

a list of python dicts

Return type:

List[Dict[str, Any]]

Note

The default implementation enforces type_safe True

as_local_bounded()[source]

Always True because it’s a bounded dataframe

Return type:

LocalBoundedDataFrame

as_pandas()[source]

Convert to pandas DataFrame

Return type:

DataFrame

count()[source]

Get number of rows of this dataframe

Return type:

int

property empty: bool

Whether this dataframe is empty

head(n, columns=None)[source]

Get first n rows of the dataframe as a new local bounded dataframe

Parameters:
  • n (int) – number of rows

  • columns (List[str] | None) – selected columns, defaults to None (all columns)

Returns:

a local bounded dataframe

Return type:

LocalBoundedDataFrame

property native: DuckDBPyRelation

DuckDB relation object

native_as_df()[source]

The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example the native an ArrayDataFrame is a python array, it doesn’t contain schema information, and its native_as_df should be either a pandas dataframe or an arrow dataframe.

Return type:

DuckDBPyRelation

peek_array()[source]

Peek the first row of the dataframe as array

Raises:

FugueDatasetEmptyError – if it is empty

Return type:

List[Any]

rename(columns)[source]

Rename the dataframe using a mapping dict

Parameters:

columns (Dict[str, str]) – key: the original column name, value: the new name

Returns:

a new dataframe with the new names

Return type:

DataFrame

fugue_duckdb.execution_engine

class fugue_duckdb.execution_engine.DuckDBEngine(execution_engine)[source]

Bases: SQLEngine

DuckDB SQL backend implementation.

Parameters:

execution_engine (ExecutionEngine) – the execution engine this sql engine will run on

property dialect: str | None
property is_distributed: bool

Whether this engine is a distributed engine

load_table(table, **kwargs)[source]

Load table as a dataframe

Parameters:
  • table (str) – the table name

  • kwargs (Any)

Returns:

an engine compatible dataframe

Return type:

DataFrame

save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]

Save the dataframe to a table

Parameters:
  • df (DataFrame) – the dataframe to save

  • table (str) – the table name

  • mode (str) – can accept overwrite, error, defaults to “overwrite”

  • partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults None

  • kwargs (Any) – parameters to pass to the underlying framework

Return type:

None

select(dfs, statement)[source]

Execute select statement on the sql engine.

Parameters:
  • dfs (DataFrames) – a collection of dataframes that must have keys

  • statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.

Returns:

result of the SELECT statement

Return type:

DataFrame

Examples

dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True,"a"),
     (False," UNION SELECT * FROM "),
     (True,"b")])

Note

There can be tables that is not in dfs. For example you want to select from hive without input DataFrames:

>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
table_exists(table)[source]

Whether the table exists

Parameters:

table (str) – the table name

Returns:

whether the table exists

Return type:

bool

class fugue_duckdb.execution_engine.DuckExecutionEngine(conf=None, connection=None)[source]

Bases: ExecutionEngine

The execution engine using DuckDB. Please read the ExecutionEngine Tutorial to understand this important Fugue concept

Parameters:
  • conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options

  • connection (DuckDBPyConnection | None) – DuckDB connection

broadcast(df)[source]

Broadcast the dataframe to all workers for a distributed computing framework

Parameters:

df (DataFrame) – the input dataframe

Returns:

the broadcasted dataframe

Return type:

DataFrame

property connection: DuckDBPyConnection
convert_yield_dataframe(df, as_local)[source]

Convert a yield dataframe to a dataframe that can be used after this execution engine stops.

Parameters:
  • df (DataFrame) – DataFrame

  • as_local (bool) – whether yield a local dataframe

Returns:

another DataFrame that can be used after this execution engine stops

Return type:

DataFrame

Note

By default, the output dataframe is the input dataframe. But it should be overridden if when an engine stops and the input dataframe will become invalid.

For example, if you custom a spark engine where you start and stop the spark session in this engine’s start_engine() and stop_engine(), then the spark dataframe will be invalid. So you may consider converting it to a local dataframe so it can still exist after the engine stops.

create_default_map_engine()[source]

Default MapEngine if user doesn’t specify

Return type:

MapEngine

create_default_sql_engine()[source]

Default SQLEngine if user doesn’t specify

Return type:

SQLEngine

distinct(df)[source]

Equivalent to SELECT DISTINCT * FROM df

Parameters:

df (DataFrame) – dataframe

Returns:

[description]

Return type:

DataFrame

dropna(df, how='any', thresh=None, subset=None)[source]

Drop NA recods from dataframe

Parameters:
  • df (DataFrame) – DataFrame

  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (int | None) – int, drops rows that have less than thresh non-null values

  • subset (List[str] | None) – list of columns to operate on

Returns:

DataFrame with NA records dropped

Return type:

DataFrame

fillna(df, value, subset=None)[source]

Fill NULL, NAN, NAT values in a dataframe

Parameters:
  • df (DataFrame) – DataFrame

  • value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (List[str] | None) – list of columns to operate on. ignored if value is a dictionary

Returns:

DataFrame with NA records filled

Return type:

DataFrame

get_current_parallelism()[source]

Get the current number of parallelism of this engine

Return type:

int

intersect(df1, df2, distinct=True)[source]

Intersect df1 and df2

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL

Returns:

the unioned dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

property is_distributed: bool

Whether this engine is a distributed engine

join(df1, df2, how, on=None)[source]

Join two dataframes

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (List[str] | None) – it can always be inferred, but if you provide, it will be validated against the inferred keys.

Returns:

the joined dataframe

Return type:

DataFrame

Note

Please read get_join_schemas()

load_df(path, format_hint=None, columns=None, **kwargs)[source]

Load dataframe from persistent storage

Parameters:
  • path (str | List[str]) – the path to the dataframe

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • columns (Any | None) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Returns:

an engine compatible dataframe

Return type:

LocalBoundedDataFrame

For more details and examples, read Zip & Comap.

property log: Logger

Logger of this engine instance

persist(df, lazy=False, **kwargs)[source]

Force materializing and caching the dataframe

Parameters:
  • df (DataFrame) – the input dataframe

  • lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happend immediately. Default to False

  • kwargs (Any) – parameter to pass to the underlying persist implementation

Returns:

the persisted dataframe

Return type:

DataFrame

Note

persist can only guarantee the persisted dataframe will be computed for only once. However this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn’t cause any issue, but if your execution graph is long, it may cause expected problems for example, stack overflow.

repartition(df, partition_spec)[source]

Partition the input dataframe using partition_spec.

Parameters:
  • df (DataFrame) – input dataframe

  • partition_spec (PartitionSpec) – how you want to partition the dataframe

Returns:

repartitioned dataframe

Return type:

DataFrame

Note

Before implementing please read the Partition Tutorial

sample(df, n=None, frac=None, replace=False, seed=None)[source]

Sample dataframe by number of rows or by fraction

Parameters:
  • df (DataFrame) – DataFrame

  • n (int | None) – number of rows to sample, one and only one of n and frac must be set

  • frac (float | None) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (int | None) – seed for randomness, defaults to None

Returns:

sampled dataframe

Return type:

DataFrame

save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]

Save dataframe to a persistent storage

Parameters:
  • df (DataFrame) – input dataframe

  • path (str) – output path

  • format_hint (Any | None) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition_spec (PartitionSpec | None) – how to partition the dataframe before saving, defaults to empty

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type:

None

For more details and examples, read Load & Save.

stop_engine()[source]

Custom logic to stop the execution engine, defaults to no operation

Return type:

None

subtract(df1, df2, distinct=True)[source]

df1 - df2

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL

Returns:

the unioned dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

take(df, n, presort, na_position='last', partition_spec=None)[source]

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.

Parameters:
  • df (DataFrame) – DataFrame

  • n (int) – number of rows to return

  • presort (str) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • partition_spec (PartitionSpec | None) – PartitionSpec to apply the take operation

Returns:

n rows of DataFrame per partition

Return type:

DataFrame

to_df(df, schema=None)[source]

Convert a data structure to this engine compatible DataFrame

Parameters:
  • dataDataFrame, pandas DataFramme or list or iterable of arrays or others that is supported by certain engine implementation

  • schema (Any | None) – Schema like object, defaults to None

  • df (Any)

Returns:

engine compatible dataframe

Return type:

DataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything

union(df1, df2, distinct=True)[source]

Join two dataframes

Parameters:
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL

Returns:

the unioned dataframe

Return type:

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

fugue_duckdb.ibis_engine

fugue_duckdb.registry