fugue_ibis#

fugue_ibis.dataframe#

class fugue_ibis.dataframe.IbisDataFrame(table, schema=None)[source]#

Bases: DataFrame

DataFrame that wraps an Ibis Table.

Parameters
  • table (TableExpr) – the Ibis Table object to wrap

  • schema (Any) – Schema like object, defaults to None

alter_columns(columns)[source]#

Change column types

Parameters

columns (Any) – Schema like object, all columns should be contained by the dataframe schema

Returns

a new dataframe with altered columns, the order of the original schema will not change

Return type

DataFrame
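
A minimal usage sketch (the variable names and schemas here are illustrative, not part of the API):

idf2 = idf.alter_columns("b:double")  # assuming idf has schema a:int,b:str; cast b to double
print(idf2.schema)                    # a:int,b:double -- original column order preserved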

as_array(columns=None, type_safe=False)[source]#

Convert to 2-dimensional native python array

Parameters
  • columns (Optional[List[str]]) – columns to extract, defaults to None

  • type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Returns

2-dimensional native python array

Return type

List[Any]

Note

If type_safe is False, then the returned values are ‘raw’ values.
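
For example (illustrative; assumes idf wraps a table with schema a:int,b:str):

rows = idf.as_array()                 # e.g. [[0, "x"], [1, "y"]]
a_only = idf.as_array(columns=["a"])  # extract only the selected columns
safe = idf.as_array(type_safe=True)   # values coerced to conform with the schema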

as_array_iterable(columns=None, type_safe=False)[source]#

Convert to iterable of native python arrays

Parameters
  • columns (Optional[List[str]]) – columns to extract, defaults to None

  • type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Returns

iterable of native python arrays

Return type

Iterable[Any]

Note

If type_safe is False, then the returned values are ‘raw’ values.

as_arrow(type_safe=False)[source]#

Convert to a PyArrow Table

Parameters

type_safe (bool) –

Return type

Table

as_local_bounded()[source]#

Convert this dataframe to a LocalBoundedDataFrame

Return type

LocalBoundedDataFrame

as_pandas()[source]#

Convert to pandas DataFrame

Return type

DataFrame

property columns: List[str]#

The column names of the dataframe

count()[source]#

Get number of rows of this dataframe

Return type

int

property empty: bool#

Whether this dataframe is empty

head(n, columns=None)[source]#

Get first n rows of the dataframe as a new local bounded dataframe

Parameters
  • n (int) – number of rows

  • columns (Optional[List[str]]) – selected columns, defaults to None (all columns)

Returns

a local bounded dataframe

Return type

LocalBoundedDataFrame
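
For example (illustrative):

top = idf.head(5)                   # a LocalBoundedDataFrame of the first 5 rows
top_a = idf.head(5, columns=["a"])  # first 5 rows, selected columns only
print(top.as_array())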

property is_bounded: bool#

Whether this dataframe is bounded

property is_local: bool#

Whether this dataframe is a local Dataset

property native: TableExpr#

Ibis Table object

native_as_df()[source]#

The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a Python array that doesn’t contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.

Return type

TableExpr

property num_partitions: int#

Number of physical partitions of this dataframe. Please read the Partition Tutorial

peek_array()[source]#

Peek the first row of the dataframe as array

Raises

FugueDatasetEmptyError – if it is empty

Return type

List[Any]

rename(columns)[source]#

Rename the dataframe using a mapping dict

Parameters

columns (Dict[str, str]) – key: the original column name, value: the new name

Returns

a new dataframe with the new names

Return type

DataFrame
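
For example (illustrative; assumes idf has schema a:int,b:str):

renamed = idf.rename({"a": "aa"})  # schema becomes aa:int,b:str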

abstract to_sql()[source]#

Compile the underlying Ibis Table to SQL

Return type

str
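
On a concrete subclass (the method is abstract here), a sketch of the usage:

sql = idf.to_sql()  # the SQL string the underlying backend would execute
print(sql)          # backend specific output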

fugue_ibis.execution_engine#

class fugue_ibis.execution_engine.IbisExecutionEngine(conf)[source]#

Bases: ExecutionEngine

The base execution engine using Ibis. Please read the ExecutionEngine Tutorial to understand this important Fugue concept

Parameters

conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options

broadcast(df)[source]#

Broadcast the dataframe to all workers for a distributed computing framework

Parameters

df (DataFrame) – the input dataframe

Returns

the broadcasted dataframe

Return type

DataFrame

create_default_map_engine()[source]#

Default MapEngine if user doesn’t specify

Return type

MapEngine

abstract create_non_ibis_execution_engine()[source]#

Create the execution engine that handles operations beyond SQL

Return type

ExecutionEngine

distinct(df)[source]#

Equivalent to SELECT DISTINCT * FROM df

Parameters

df (DataFrame) – dataframe

Returns

the dataframe with duplicated rows removed

Return type

DataFrame

dropna(df, how='any', thresh=None, subset=None)[source]#

Drop NA records from the dataframe

Parameters
  • df (DataFrame) – DataFrame

  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (Optional[int]) – int, drops rows that have fewer than thresh non-null values

  • subset (Optional[List[str]]) – list of columns to operate on

Returns

DataFrame with NA records dropped

Return type

DataFrame
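
A sketch of the semantics (engine stands for a concrete IbisExecutionEngine instance and df for an engine compatible dataframe with columns a and b; both are assumptions):

r1 = engine.dropna(df, how="any")     # drop rows containing any null
r2 = engine.dropna(df, how="all")     # drop rows where every value is null
r3 = engine.dropna(df, thresh=2)      # keep rows with at least 2 non-null values
r4 = engine.dropna(df, subset=["a"])  # only consider column a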

fillna(df, value, subset=None)[source]#

Fill NULL, NAN, NAT values in a dataframe

Parameters
  • df (DataFrame) – DataFrame

  • value (Any) – if scalar, fills all columns with the same value; if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary

Returns

DataFrame with NA records filled

Return type

DataFrame
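
A sketch of the semantics (same assumptions as above for engine and df):

r1 = engine.fillna(df, value=0)                   # fill NAs in all columns with 0
r2 = engine.fillna(df, value=0, subset=["a"])     # only fill column a
r3 = engine.fillna(df, value={"a": 0, "b": "x"})  # per-column values; subset is ignored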

property fs: FileSystem#

File system of this engine instance

get_current_parallelism()[source]#

Get the current number of parallelism of this engine

Return type

int

property ibis_sql_engine: IbisSQLEngine#

intersect(df1, df2, distinct=True)[source]#

Intersect df1 and df2

Parameters
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for INTERSECT (== INTERSECT DISTINCT), false for INTERSECT ALL

Returns

the intersected dataframe

Return type

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

is_non_ibis(ds)[source]#
Parameters

ds (Any) –

Return type

bool

join(df1, df2, how, on=None)[source]#

Join two dataframes

Parameters
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (Optional[List[str]]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.

Returns

the joined dataframe

Return type

DataFrame

Note

Please read get_join_schemas()
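
A sketch (engine, df1 and df2 are assumed; the two dataframes share only column a):

j1 = engine.join(df1, df2, how="inner")                 # join keys inferred: a
j2 = engine.join(df1, df2, how="left_outer", on=["a"])  # explicit keys, validated against the inferred ones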

property log: Logger#

Logger of this engine instance

property non_ibis_engine: ExecutionEngine#

persist(df, lazy=False, **kwargs)[source]#

Force materializing and caching the dataframe

Parameters
  • df (DataFrame) – the input dataframe

  • lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persisting is forced to happen immediately. Defaults to False

  • kwargs (Any) – parameter to pass to the underlying persist implementation

Returns

the persisted dataframe

Return type

DataFrame

Note

persist can only guarantee the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.

repartition(df, partition_spec)[source]#

Partition the input dataframe using partition_spec.

Parameters
  • df (DataFrame) – input dataframe

  • partition_spec (PartitionSpec) – how you want to partition the dataframe

Returns

repartitioned dataframe

Return type

DataFrame

Note

Before implementing please read the Partition Tutorial

sample(df, n=None, frac=None, replace=False, seed=None)[source]#

Sample dataframe by number of rows or by fraction

Parameters
  • df (DataFrame) – DataFrame

  • n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set

  • frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (Optional[int]) – seed for randomness, defaults to None

Returns

sampled dataframe

Return type

DataFrame
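
A sketch (engine and df are assumed):

s1 = engine.sample(df, n=100, seed=0)                   # sample a fixed number of rows
s2 = engine.sample(df, frac=0.1, replace=True, seed=0)  # sample 10%, with replacement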

subtract(df1, df2, distinct=True)[source]#

df1 - df2

Parameters
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for EXCEPT (== EXCEPT DISTINCT), false for EXCEPT ALL

Returns

the subtracted dataframe

Return type

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

take(df, n, presort, na_position='last', partition_spec=None)[source]#

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.

Parameters
  • df (DataFrame) – DataFrame

  • n (int) – number of rows to return

  • presort (str) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation

Returns

n rows of DataFrame per partition

Return type

DataFrame
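
A sketch (engine and df are assumed; df has columns a and b):

from fugue import PartitionSpec

t1 = engine.take(df, n=2, presort="b desc")  # global top 2 rows by b descending, nulls last
t2 = engine.take(df, n=1, presort="b", na_position="first",
                 partition_spec=PartitionSpec(by=["a"]))  # first row per group of a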

to_df(df, schema=None)[source]#

Convert a data structure to this engine compatible DataFrame

Parameters
  • df (Any) – DataFrame, pandas DataFrame, list or iterable of arrays, or any other object supported by the engine implementation

  • schema (Optional[Any]) – Schema like object, defaults to None

Returns

engine compatible dataframe

Return type

DataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
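
For example (illustrative; engine stands for a concrete IbisExecutionEngine):

cdf = engine.to_df([[0, "x"], [1, "y"]], "a:int,b:str")  # raw data plus a schema like object
same = engine.to_df(cdf)  # already engine compatible, so it is returned as is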

union(df1, df2, distinct=True)[source]#

Union two dataframes

Parameters
  • df1 (DataFrame) – the first dataframe

  • df2 (DataFrame) – the second dataframe

  • distinct (bool) – true for UNION (== UNION DISTINCT), false for UNION ALL

Returns

the unioned dataframe

Return type

DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
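
A sketch (engine, df1 and df2 are assumed and must have identical schemas); intersect and subtract follow the same pattern:

u_all = engine.union(df1, df2, distinct=False)  # UNION ALL
u_dis = engine.union(df1, df2)                  # UNION (== UNION DISTINCT)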

class fugue_ibis.execution_engine.IbisMapEngine(execution_engine)[source]#

Bases: MapEngine

IbisExecutionEngine’s MapEngine; it is a wrapper of the map engine of non_ibis_engine()

Parameters

execution_engine (ExecutionEngine) – the execution engine this map engine will run on

property execution_engine_constraint: Type[ExecutionEngine]#

This defines the required ExecutionEngine type of this facet

Returns

a subtype of ExecutionEngine

property is_distributed: bool#

Whether this engine is a distributed engine

map_bag(bag, map_func, partition_spec, on_init=None)[source]#

Apply a function to each partition after you partition the bag in a specified way.

Parameters
  • bag (Bag) – the input bag

  • map_func (Callable[[BagPartitionCursor, LocalBag], LocalBag]) – the function to apply on every logical partition

  • partition_spec (PartitionSpec) – partition specification

  • on_init (Optional[Callable[[int, Bag], Any]]) – callback function when the physical partition is initializing, defaults to None

Returns

the bag after the map operation

Return type

Bag

map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#

Apply a function to each partition after you partition the dataframe in a specified way.

Parameters
  • df (DataFrame) – input dataframe

  • map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition

  • output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this

  • partition_spec (PartitionSpec) – partition specification

  • on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None

  • map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc, defaults to None. Certain engines can provide the most efficient map operations based on the hint.

Returns

the dataframe after the map operation

Return type

DataFrame

Note

Before implementing, you must read this to understand what map is used for and how it should work.
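
A minimal sketch of a conforming map_func and a hedged call (map_engine, df, and the schemas are assumptions for illustration, not part of the API):

from fugue import ArrayDataFrame, PartitionSpec

def add_one(cursor, pdf):  # cursor: PartitionCursor, pdf: LocalDataFrame
    # build the output rows from the local partition and return a local dataframe
    rows = [[r[0] + 1] for r in pdf.as_array()]
    return ArrayDataFrame(rows, "a:int")

res = map_engine.map_dataframe(
    df, add_one, output_schema="a:int",
    partition_spec=PartitionSpec(by=["a"]))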

class fugue_ibis.execution_engine.IbisSQLEngine(execution_engine)[source]#

Bases: SQLEngine

Ibis SQL backend base implementation.

Parameters

execution_engine (ExecutionEngine) – the execution engine this sql engine will run on

abstract property backend: BaseBackend#

distinct(df)[source]#
Parameters

df (DataFrame) –

Return type

DataFrame

dropna(df, how='any', thresh=None, subset=None)[source]#
Parameters
  • df (DataFrame) –

  • how (str) –

  • thresh (Optional[int]) –

  • subset (Optional[List[str]]) –

Return type

DataFrame

abstract encode_column_name(name)[source]#
Parameters

name (str) –

Return type

str

fillna(df, value, subset=None)[source]#
Parameters
  • df (DataFrame) –

  • value (Any) –

  • subset (Optional[List[str]]) –

Return type

DataFrame

get_temp_table_name()[source]#
Return type

str

intersect(df1, df2, distinct=True)[source]#
Parameters
  • df1 (DataFrame) –

  • df2 (DataFrame) –

  • distinct (bool) –
Return type

DataFrame

join(df1, df2, how, on=None)[source]#
Parameters
  • df1 (DataFrame) –

  • df2 (DataFrame) –

  • how (str) –

  • on (Optional[List[str]]) –
Return type

DataFrame

load_table(table, **kwargs)[source]#

Load table as a dataframe

Parameters
  • table (str) – the table name

  • kwargs (Any) –

Returns

an engine compatible dataframe

Return type

DataFrame

abstract persist(df, lazy=False, **kwargs)[source]#
Parameters
  • df (DataFrame) –

  • lazy (bool) –

  • kwargs (Any) –

Return type

DataFrame

query_to_table(statement, dfs)[source]#
Parameters
  • statement (str) –

  • dfs (Dict[str, Any]) –

Return type

TableExpr

abstract sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Parameters
  • df (DataFrame) –

  • n (Optional[int]) –

  • frac (Optional[float]) –

  • replace (bool) –

  • seed (Optional[int]) –

Return type

DataFrame

save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]#

Save the dataframe to a table

Parameters
  • df (DataFrame) – the dataframe to save

  • table (str) – the table name

  • mode (str) – can accept overwrite, error, defaults to “overwrite”

  • partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Return type

None
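
A sketch (sql_engine stands for a concrete IbisSQLEngine; the table name is hypothetical):

sql_engine.save_table(df, "my_table", mode="overwrite")
if sql_engine.table_exists("my_table"):
    df_back = sql_engine.load_table("my_table")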

select(dfs, statement)[source]#

Execute select statement on the sql engine.

Parameters
  • dfs (DataFrames) – a collection of dataframes that must have keys

  • statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables.

Returns

result of the SELECT statement

Return type

DataFrame

Examples

dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "),
     (True,"a"),
     (False," UNION SELECT * FROM "),
     (True,"b")])

Note

There can be tables that are not in dfs. For example, you may want to select from a Hive table without input DataFrames:

>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
subtract(df1, df2, distinct=True)[source]#
Parameters
  • df1 (DataFrame) –

  • df2 (DataFrame) –

  • distinct (bool) –
Return type

DataFrame

table_exists(table)[source]#

Whether the table exists

Parameters

table (str) – the table name

Returns

whether the table exists

Return type

bool

take(df, n, presort, na_position='last', partition_spec=None)[source]#
Parameters
  • df (DataFrame) –

  • n (int) –

  • presort (str) –

  • na_position (str) –

  • partition_spec (Optional[PartitionSpec]) –

Return type

DataFrame

union(df1, df2, distinct=True)[source]#
Parameters
  • df1 (DataFrame) –

  • df2 (DataFrame) –

  • distinct (bool) –
Return type

DataFrame

fugue_ibis.extensions#

fugue_ibis.extensions.as_fugue(expr, ibis_engine=None)[source]#

Convert a lazy ibis object to Fugue workflow dataframe

Parameters
  • expr (TableExpr) – the actual instance should be LazyIbisObject

  • ibis_engine (Optional[Any]) –

Returns

the Fugue workflow dataframe

Return type

WorkflowDataFrame

Examples

# non-magical approach
from fugue import FugueWorkflow
from fugue_ibis import as_ibis, as_fugue

dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int")
df2 = dag.df([[1]], "a:int")
idf1 = as_ibis(df1)
idf2 = as_ibis(df2)
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1)
as_fugue(result).show()
# magical approach
from fugue import FugueWorkflow
import fugue_ibis  # must import

dag = FugueWorkflow()
idf1 = dag.df([[0]], "a:int").as_ibis()
idf2 = dag.df([[1]], "a:int").as_ibis()
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1).as_fugue()
result.show()

Note

The magic is that when importing fugue_ibis, the functions as_ibis and as_fugue are added to the corresponding classes, so you can use them as if they were part of the original classes.

This is an idea similar to patching. Ibis uses this programming model a lot. Fugue provides this as an option.

Note

The returned object is not really a TableExpr, it’s a ‘super lazy’ object that will be translated into a TableExpr at run time. This is because compiling an ibis execution graph requires the input schemas to be known. However, in Fugue, this is not always true. For example, if the previous step pivots a table, then the output schema can only be known at runtime. So in order to be a part of Fugue, we need to be able to construct ibis expressions before knowing the input schemas.

fugue_ibis.extensions.as_ibis(df)[source]#

Convert the Fugue workflow dataframe to an ibis table for ibis operations.

Parameters

df (WorkflowDataFrame) – the Fugue workflow dataframe

Returns

the object representing the ibis table

Return type

TableExpr

Examples

# non-magical approach
from fugue import FugueWorkflow
from fugue_ibis import as_ibis, as_fugue

dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int")
df2 = dag.df([[1]], "a:int")
idf1 = as_ibis(df1)
idf2 = as_ibis(df2)
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1)
as_fugue(result).show()
# magical approach
from fugue import FugueWorkflow
import fugue_ibis  # must import

dag = FugueWorkflow()
idf1 = dag.df([[0]], "a:int").as_ibis()
idf2 = dag.df([[1]], "a:int").as_ibis()
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1).as_fugue()
result.show()

Note

The magic is that when importing fugue_ibis, the functions as_ibis and as_fugue are added to the corresponding classes, so you can use them as if they were part of the original classes.

This is an idea similar to patching. Ibis uses this programming model a lot. Fugue provides this as an option.

Note

The returned object is not really a TableExpr, it’s a ‘super lazy’ object that will be translated into a TableExpr at run time. This is because compiling an ibis execution graph requires the input schemas to be known. However, in Fugue, this is not always true. For example, if the previous step pivots a table, then the output schema can only be known at runtime. So in order to be a part of Fugue, we need to be able to construct ibis expressions before knowing the input schemas.

fugue_ibis.extensions.run_ibis(ibis_func, ibis_engine=None, **dfs)[source]#

Run an ibis workflow wrapped in ibis_func

Parameters
  • ibis_func (Callable[[BaseBackend], TableExpr]) – the function taking in an ibis backend, and returning an Ibis TableExpr

  • ibis_engine (Optional[Any]) – an object that together with ExecutionEngine can determine IbisEngine, defaults to None

  • dfs (WorkflowDataFrame) – dataframes in the same workflow

Returns

the output workflow dataframe

Return type

WorkflowDataFrame

Examples

from fugue import FugueWorkflow
from fugue_ibis import run_ibis

def func(backend):
    t = backend.table("tb")
    return t.mutate(b=t.a+1)

dag = FugueWorkflow()
df = dag.df([[0]], "a:int")
result = run_ibis(func, tb=df)
result.show()