fugue_ibis#
fugue_ibis.dataframe#
- class fugue_ibis.dataframe.IbisDataFrame(table, schema=None)[source]#
Bases:
DataFrame
DataFrame that wraps an Ibis
Table
.
- Parameters
table (TableExpr) – the Ibis table to wrap
schema (Any) –
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with altered columns, the order of the original schema will not change
- Return type
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If type_safe is False, then the returned values are 'raw' values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If type_safe is False, then the returned values are 'raw' values.
- as_arrow(type_safe=False)[source]#
Convert to pyArrow DataFrame
- Parameters
type_safe (bool) –
- Return type
- as_local_bounded()[source]#
Convert this dataframe to a
LocalBoundedDataFrame
- Return type
- property columns: List[str]#
The column names of the dataframe
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
- property is_bounded: bool#
Whether this dataframe is bounded
- property is_local: bool#
Whether this dataframe is a local Dataset
- property native: TableExpr#
Ibis Table object
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array; it doesn't contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
TableExpr
- property num_partitions: int#
Number of physical partitions of this dataframe. Please read the Partition Tutorial
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue_ibis.execution_engine#
- class fugue_ibis.execution_engine.IbisExecutionEngine(conf)[source]#
Bases:
ExecutionEngine
The base execution engine using Ibis. Please read the ExecutionEngine Tutorial to understand this important Fugue concept
- Parameters
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
- abstract create_non_ibis_execution_engine()[source]#
Create the execution engine that handles operations beyond SQL
- Return type
- dropna(df, how='any', thresh=None, subset=None)[source]#
Drop NA records from dataframe
- Parameters
df (DataFrame) – DataFrame
how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.
thresh (Optional[int]) – int, drops rows that have less than thresh non-null values
subset (Optional[List[str]]) – list of columns to operate on
- Returns
DataFrame with NA records dropped
- Return type
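The how/thresh/subset semantics above mirror pandas.DataFrame.dropna; a minimal pandas sketch of the same behavior (an illustration of the semantics, not the Ibis implementation):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, None, None], "b": [1.0, 2.0, None]})

any_dropped = df.dropna(how="any")     # keeps only rows with no nulls at all
all_dropped = df.dropna(how="all")     # drops only rows where every value is null
thresh_kept = df.dropna(thresh=1)      # keeps rows with at least 1 non-null value
subset_only = df.dropna(subset=["b"])  # considers only column "b" for nulls

print(len(any_dropped), len(all_dropped), len(thresh_kept), len(subset_only))
```

Note that thresh counts non-null values, so rows with fewer than thresh non-nulls are dropped.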
- fillna(df, value, subset=None)[source]#
Fill NULL, NAN, NAT values in a dataframe
- Parameters
df (DataFrame) – DataFrame
value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.
subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary
- Returns
DataFrame with NA records filled
- Return type
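The scalar-vs-dictionary behavior of value is the same convention pandas uses; a small pandas sketch of the semantics (not the Ibis implementation):

```python
import pandas as pd

df = pd.DataFrame({"a": [1.0, None], "b": [None, 2.0]})

scalar_filled = df.fillna(0)                 # every column's NAs become 0
dict_filled = df.fillna({"a": -1, "b": -2})  # per-column replacement values

print(scalar_filled["a"].tolist(), dict_filled["b"].tolist())
```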
- property fs: FileSystem#
File system of this engine instance
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- property ibis_sql_engine: IbisSQLEngine#
- intersect(df1, df2, distinct=True)[source]#
Intersect df1 and df2
- Parameters
- Returns
the intersected dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
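With distinct=True this corresponds to SQL INTERSECT; a pandas sketch of the same semantics (for illustration only, not the Ibis implementation):

```python
import pandas as pd

df1 = pd.DataFrame({"a": [0, 0, 1, 2]})
df2 = pd.DataFrame({"a": [0, 2, 3]})

# distinct=True behaves like SQL INTERSECT: rows present in both
# inputs, with duplicates removed from the result
intersected = df1.merge(df2).drop_duplicates().reset_index(drop=True)
print(intersected["a"].tolist())
```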
- join(df1, df2, how, on=None)[source]#
Join two dataframes
- Parameters
df1 (DataFrame) – the first dataframe
df2 (DataFrame) – the second dataframe
how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross
on (Optional[List[str]]) – it can always be inferred, but if you provide, it will be validated against the inferred keys.
- Returns
the joined dataframe
- Return type
Note
Please read
get_join_schemas()
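Of the join types above, semi and anti are the least familiar: a semi join keeps the rows of df1 whose keys appear in df2 (without bringing in df2's columns), and an anti join keeps the rows whose keys do not. A pandas sketch of these semantics (not the engine implementation):

```python
import pandas as pd

df1 = pd.DataFrame({"k": [1, 2, 3], "v": ["a", "b", "c"]})
df2 = pd.DataFrame({"k": [2, 3, 4]})

# semi join: rows of df1 whose key appears in df2 (df2 columns not kept)
semi = df1[df1["k"].isin(df2["k"])]

# anti join: rows of df1 whose key does NOT appear in df2
anti = df1[~df1["k"].isin(df2["k"])]

print(semi["k"].tolist(), anti["k"].tolist())
```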
- property log: Logger#
Logger of this engine instance
- property non_ibis_engine: ExecutionEngine#
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameter to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
Note
persist can only guarantee the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example, stack overflow.
- repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
Note
Before implementing please read the Partition Tutorial
- sample(df, n=None, frac=None, replace=False, seed=None)[source]#
Sample dataframe by number of rows or by fraction
- Parameters
df (DataFrame) – DataFrame
n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set
frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set
replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False
seed (Optional[int]) – seed for randomness, defaults to None
- Returns
sampled dataframe
- Return type
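The n/frac/replace/seed parameters follow the common sampling convention also used by pandas; a pandas sketch of the semantics (note that on distributed backends frac-based sampling is typically approximate, while pandas is exact):

```python
import pandas as pd

df = pd.DataFrame({"a": range(10)})

by_n = df.sample(n=3, random_state=42)          # exactly 3 rows
by_frac = df.sample(frac=0.5, random_state=42)  # half the rows
with_rep = df.sample(n=15, replace=True, random_state=42)  # duplicates allowed

print(len(by_n), len(by_frac), len(with_rep))
```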
- subtract(df1, df2, distinct=True)[source]#
df1 - df2
- Parameters
- Returns
the subtracted dataframe
- Return type
Note
Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.
- Parameters
df (DataFrame) – DataFrame
n (int) – number of rows to return
presort (str) – presort expression similar to partition presort
na_position (str) – position of null values during the presort, can accept first or last
partition_spec (Optional[PartitionSpec]) – PartitionSpec to apply the take operation
- Returns
n rows of DataFrame per partition
- Return type
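The "presort then take n per partition, NULLs last" behavior described above can be sketched in pandas terms (an illustration of the Pandas-convention null handling, not the engine implementation):

```python
import pandas as pd

df = pd.DataFrame({
    "g": ["x", "x", "x", "y", "y"],
    "v": [3.0, None, 1.0, 5.0, 4.0],
})

# take(n=1, presort="v asc", na_position="last") with partitioning on "g":
# sort by the presort expression (NULLs last, the pandas convention),
# then keep the first n rows of each partition
taken = (
    df.sort_values("v", na_position="last")
      .groupby("g", as_index=False)
      .head(1)
)
print(sorted(taken["v"].tolist()))
```

With na_position="first" the NaN row would win within group "x" instead, matching the Spark/SQL convention of treating NULL as the smallest value.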
- to_df(df, schema=None)[source]#
Convert a data structure to this engine compatible DataFrame
- Parameters
df (Any) – DataFrame, pandas DataFrame, list or iterable of arrays, or others supported by certain engine implementations
schema (Optional[Any]) – Schema like object, defaults to None
- Returns
engine compatible dataframe
- Return type
Note
There are certain conventions to follow for a new implementation:
if the input is already in compatible dataframe type, it should return itself
all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
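The first convention (return the input unchanged when it is already compatible) can be sketched in pure Python with hypothetical names (MyDataFrame and this to_df are illustrations, not Fugue APIs):

```python
# Hypothetical engine-compatible dataframe type for illustration
class MyDataFrame:
    def __init__(self, rows):
        self.rows = rows

def to_df(data):
    # convention: if already compatible, return the object itself
    if isinstance(data, MyDataFrame):
        return data
    # otherwise convert (here: from a list of rows)
    return MyDataFrame(list(data))

df = to_df([[0], [1]])
print(to_df(df) is df)  # no copy when already compatible
```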
- class fugue_ibis.execution_engine.IbisMapEngine(execution_engine)[source]#
Bases:
MapEngine
IbisExecutionEngine’s MapEngine, it is a wrapper of the map engine of
non_ibis_engine()
- Parameters
execution_engine (ExecutionEngine) – the execution engine this map engine will run on
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of
ExecutionEngine
- property is_distributed: bool#
Whether this engine is a distributed engine
- map_bag(bag, map_func, partition_spec, on_init=None)[source]#
Apply a function to each partition after you partition the bag in a specified way.
- Parameters
df – input dataframe
map_func (Callable[[BagPartitionCursor, LocalBag], LocalBag]) – the function to apply on every logical partition
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, Bag], Any]]) – callback function when the physical partition is initializing, defaults to None
bag (Bag) –
- Returns
the bag after the map operation
- Return type
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide the most efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
Note
Before implementing, you must read this to understand what map is used for and how it should work.
- class fugue_ibis.execution_engine.IbisSQLEngine(execution_engine)[source]#
Bases:
SQLEngine
Ibis SQL backend base implementation.
- Parameters
execution_engine (ExecutionEngine) – the execution engine this sql engine will run on
- abstract property backend: BaseBackend#
- load_table(table, **kwargs)[source]#
Load table as a dataframe
- Parameters
table (str) – the table name
kwargs (Any) –
- Returns
an engine compatible dataframe
- Return type
- query_to_table(statement, dfs)[source]#
- Parameters
statement (str) –
dfs (Dict[str, Any]) –
- Return type
TableExpr
- save_table(df, table, mode='overwrite', partition_spec=None, **kwargs)[source]#
Save the dataframe to a table
- Parameters
df (DataFrame) – the dataframe to save
table (str) – the table name
mode (str) – can accept overwrite or error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
- select(dfs, statement)[source]#
Execute select statement on the sql engine.
- Parameters
dfs (DataFrames) – a collection of dataframes that must have keys
statement (StructuredRawSQL) – the SELECT statement using the dfs keys as tables
- Returns
result of the SELECT statement
- Return type
Examples
dfs = DataFrames(a=df1, b=df2)
sql_engine.select(
    dfs,
    [(False, "SELECT * FROM "), (True, "a"),
     (False, " UNION SELECT * FROM "), (True, "b")])
Note
There can be tables that are not in dfs. For example, you may want to select from hive without input DataFrames:
>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
- table_exists(table)[source]#
Whether the table exists
- Parameters
table (str) – the table name
- Returns
whether the table exists
- Return type
bool
- take(df, n, presort, na_position='last', partition_spec=None)[source]#
- Parameters
df (DataFrame) –
n (int) –
presort (str) –
na_position (str) –
partition_spec (Optional[PartitionSpec]) –
- Return type
fugue_ibis.extensions#
- fugue_ibis.extensions.as_fugue(expr, ibis_engine=None)[source]#
Convert a lazy ibis object to Fugue workflow dataframe
- Parameters
expr (TableExpr) – the actual instance should be LazyIbisObject
ibis_engine (Optional[Any]) –
- Returns
the Fugue workflow dataframe
- Return type
Examples
# non-magical approach
from fugue import FugueWorkflow
from fugue_ibis import as_ibis, as_fugue

dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int")
df2 = dag.df([[1]], "a:int")
idf1 = as_ibis(df1)
idf2 = as_ibis(df2)
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1)
as_fugue(result).show()
# magical approach
from fugue import FugueWorkflow
import fugue_ibis  # must import

dag = FugueWorkflow()
idf1 = dag.df([[0]], "a:int").as_ibis()
idf2 = dag.df([[1]], "a:int").as_ibis()
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1).as_fugue()
result.show()
Note
The magic is that when importing fugue_ibis, the functions as_ibis and as_fugue are added to the corresponding classes, so you can use them as if they were part of the original classes. This is an idea similar to patching. Ibis uses this programming model a lot. Fugue provides this as an option.
Note
The returned object is not really a TableExpr; it's a 'super lazy' object that will be translated into TableExpr at run time. This is because, to compile an ibis execution graph, the input schemas must be known. However, in Fugue, this is not always true. For example, if the previous step is to pivot a table, then the output schema can only be known at runtime. So in order to be a part of Fugue, we need to be able to construct ibis expressions before knowing the input schemas.
- fugue_ibis.extensions.as_ibis(df)[source]#
Convert the Fugue workflow dataframe to an ibis table for ibis operations.
- Parameters
df (WorkflowDataFrame) – the Fugue workflow dataframe
- Returns
the object representing the ibis table
- Return type
TableExpr
Examples
# non-magical approach
from fugue import FugueWorkflow
from fugue_ibis import as_ibis, as_fugue

dag = FugueWorkflow()
df1 = dag.df([[0]], "a:int")
df2 = dag.df([[1]], "a:int")
idf1 = as_ibis(df1)
idf2 = as_ibis(df2)
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1)
as_fugue(result).show()
# magical approach
from fugue import FugueWorkflow
import fugue_ibis  # must import

dag = FugueWorkflow()
idf1 = dag.df([[0]], "a:int").as_ibis()
idf2 = dag.df([[1]], "a:int").as_ibis()
idf3 = idf1.union(idf2)
result = idf3.mutate(b=idf3.a+1).as_fugue()
result.show()
Note
The magic is that when importing fugue_ibis, the functions as_ibis and as_fugue are added to the corresponding classes, so you can use them as if they were part of the original classes. This is an idea similar to patching. Ibis uses this programming model a lot. Fugue provides this as an option.
Note
The returned object is not really a TableExpr; it's a 'super lazy' object that will be translated into TableExpr at run time. This is because, to compile an ibis execution graph, the input schemas must be known. However, in Fugue, this is not always true. For example, if the previous step is to pivot a table, then the output schema can only be known at runtime. So in order to be a part of Fugue, we need to be able to construct ibis expressions before knowing the input schemas.
- fugue_ibis.extensions.run_ibis(ibis_func, ibis_engine=None, **dfs)[source]#
Run an ibis workflow wrapped in ibis_func
- Parameters
ibis_func (Callable[[BaseBackend], TableExpr]) – the function taking in an ibis backend, and returning an Ibis TableExpr
ibis_engine (Optional[Any]) – an object that, together with ExecutionEngine, can determine IbisEngine, defaults to None
dfs (WorkflowDataFrame) – dataframes in the same workflow
- Returns
the output workflow dataframe
- Return type
Examples
from fugue import FugueWorkflow
from fugue_ibis import run_ibis

def func(backend):
    t = backend.table("tb")
    return t.mutate(b=t.a+1)

dag = FugueWorkflow()
df = dag.df([[0]], "a:int")
result = run_ibis(func, tb=df)
result.show()