fugue_dask#

fugue_dask.dataframe#

class fugue_dask.dataframe.DaskDataFrame(df=None, schema=None, metadata=None, num_partitions=0, type_safe=True)[source]#

Bases: fugue.dataframe.dataframe.DataFrame

DataFrame that wraps a Dask DataFrame. Please also read the DataFrame Tutorial to understand this Fugue concept.

Parameters
  • df (Any) – dask.dataframe.DataFrame, pandas DataFrame or list or iterable of arrays

  • schema (Any) – Schema like object or pyspark.sql.types.StructType, defaults to None.

  • metadata (Any) – Parameters like object, defaults to None

  • num_partitions (int) – initial number of partitions for the dask dataframe, defaults to 0 to get the value from fugue.dask.dataframe.default.partitions

  • type_safe – whether to cast input data to ensure type safety, defaults to True

Note

For dask.dataframe.DataFrame, schema must be None

alter_columns(columns)[source]#

Change column types

Parameters

columns (Any) – Schema like object, all columns should be contained by the dataframe schema

Returns

a new dataframe with altered columns, the order of the original schema will not change

Return type

fugue.dataframe.dataframe.DataFrame

as_array(columns=None, type_safe=False)[source]#

Convert to 2-dimensional native python array

Parameters
  • columns (Optional[List[str]]) – columns to extract, defaults to None

  • type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Returns

2-dimensional native python array

Return type

List[Any]

Note

If type_safe is False, then the returned values are ‘raw’ values.

as_array_iterable(columns=None, type_safe=False)[source]#

Convert to iterable of native python arrays

Parameters
  • columns (Optional[List[str]]) – columns to extract, defaults to None

  • type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Returns

iterable of native python arrays

Return type

Iterable[Any]

Note

If type_safe is False, then the returned values are ‘raw’ values.

as_local()[source]#

Convert this dataframe to a LocalDataFrame

Return type

fugue.dataframe.dataframe.LocalDataFrame

as_pandas()[source]#

Convert to pandas DataFrame

Return type

pandas.core.frame.DataFrame

count()[source]#

Get number of rows of this dataframe

Return type

int

property empty: bool#

Whether this dataframe is empty

head(n, columns=None)[source]#

Get the first n rows of the dataframe as a 2-dimensional array

Parameters
  • n (int) – number of rows

  • columns (Optional[List[str]]) – selected columns, defaults to None (all columns)

Returns

2-dimensional array

Return type

List[Any]

property is_bounded: bool#

Whether this dataframe is bounded

property is_local: bool#

Whether this dataframe is a LocalDataFrame

property native: dask.dataframe.core.DataFrame#

The wrapped Dask DataFrame

Return type

dask.dataframe.DataFrame

property num_partitions: int#

Number of physical partitions of this dataframe. Please read the Partition Tutorial

peek_array()[source]#

Peek the first row of the dataframe as array

Raises

FugueDataFrameEmptyError – if it is empty

Return type

Any

persist(**kwargs)[source]#
Parameters

kwargs (Any) –

Return type

fugue_dask.dataframe.DaskDataFrame

rename(columns)[source]#

Rename the dataframe using a mapping dict

Parameters

columns (Dict[str, str]) – key: the original column name, value: the new name

Returns

a new dataframe with the new names

Return type

fugue.dataframe.dataframe.DataFrame

fugue_dask.execution_engine#

class fugue_dask.execution_engine.DaskExecutionEngine(dask_client=None, conf=None)[source]#

Bases: fugue.execution.execution_engine.ExecutionEngine

The execution engine based on Dask.

Please read the ExecutionEngine Tutorial to understand this important Fugue concept

Parameters
  • dask_client (Optional[distributed.client.Client]) – Dask distributed client, defaults to None. If None, then it will try to get the current active global client. If there is no active client, it will create and use a global Client(processes=True)

  • conf (Any) – Parameters like object, defaults to None; read the Fugue Configuration Tutorial to learn Fugue specific options

Note

You should set up the Dask single machine or distributed environment in the common way before initializing DaskExecutionEngine

broadcast(df)[source]#

Broadcast the dataframe to all workers for a distributed computing framework

Parameters

df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

Returns

the broadcasted dataframe

Return type

fugue.dataframe.dataframe.DataFrame

property dask_client: distributed.client.Client#

The Dask Client associated with this engine

property default_sql_engine: fugue.execution.execution_engine.SQLEngine#

Default SQLEngine if user doesn’t specify

distinct(df, metadata=None)[source]#

Equivalent to SELECT DISTINCT * FROM df

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

the distinct dataframe

Return type

DataFrame

dropna(df, how='any', thresh=None, subset=None, metadata=None)[source]#

Drop NA records from dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (Optional[int]) – drops rows that have fewer than thresh non-null values

  • subset (Optional[List[str]]) – list of columns to operate on

  • metadata (Any, optional) – dict-like object to add to the result dataframe, defaults to None

Returns

DataFrame with NA records dropped

Return type

DataFrame

fillna(df, value, subset=None, metadata=None)[source]#

Fill NULL, NAN, NAT values in a dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • value (Any) – if scalar, fills all columns with the same value; if dictionary, fills NA using the keys as column names and the values as the replacement values

  • subset (Optional[List[str]]) – list of columns to operate on; ignored if value is a dictionary

  • metadata (Any, optional) – dict-like object to add to the result dataframe, defaults to None

Returns

DataFrame with NA records filled

Return type

DataFrame

property fs: triad.collections.fs.FileSystem#

File system of this engine instance

intersect(df1, df2, distinct=True, metadata=None)[source]#

Intersect df1 and df2

Parameters
  • df1 (fugue.dataframe.dataframe.DataFrame) – the first dataframe

  • df2 (fugue.dataframe.dataframe.DataFrame) – the second dataframe

  • distinct (bool) – True for INTERSECT (== INTERSECT DISTINCT), False for INTERSECT ALL, defaults to True

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

the intersected dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

join(df1, df2, how, on=[], metadata=None)[source]#

Join two dataframes

Parameters
  • df1 (fugue.dataframe.dataframe.DataFrame) – the first dataframe

  • df2 (fugue.dataframe.dataframe.DataFrame) – the second dataframe

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (List[str]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

the joined dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Please read this

load_df(path, format_hint=None, columns=None, **kwargs)[source]#

Load dataframe from persistent storage

Parameters
  • path (Union[str, List[str]]) – the path to the dataframe

  • format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer

  • columns (Optional[Any]) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Returns

an engine compatible dataframe

Return type

fugue_dask.dataframe.DaskDataFrame

For more details and examples, read Load & Save.

property log: logging.Logger#

Logger of this engine instance

map(df, map_func, output_schema, partition_spec, metadata=None, on_init=None)[source]#

Apply a function to each partition after you partition the data in a specified way.

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

  • map_func (Callable) – the function to apply on every logical partition

  • output_schema (Any) – Schema like object of the dataframe after the map operation

  • partition_spec (fugue.collections.partition.PartitionSpec) – how to partition the dataframe before the map operation

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

  • on_init (Optional[Callable]) – callback on the first input of every physical partition, defaults to None

Returns

the dataframe after the map operation

Return type

fugue.dataframe.dataframe.DataFrame

Note

Before implementing, you must read this to understand what map is used for and how it should work.

persist(df, lazy=False, **kwargs)[source]#

Force materializing and caching the dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

  • lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately, defaults to False

  • args – parameter to pass to the underlying persist implementation

  • kwargs (Any) – parameter to pass to the underlying persist implementation

Returns

the persisted dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

persist can only guarantee the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.

property pl_utils: fugue_dask._utils.DaskUtils#

Pandas-like dataframe utils

repartition(df, partition_spec)[source]#

Partition the input dataframe using partition_spec.

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

  • partition_spec (fugue.collections.partition.PartitionSpec) – how to partition the dataframe

Returns

repartitioned dataframe

Return type

fugue_dask.dataframe.DaskDataFrame

Note

Before implementing please read the Partition Tutorial

sample(df, n=None, frac=None, replace=False, seed=None, metadata=None)[source]#

Sample dataframe by number of rows or by fraction

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set

  • frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (Optional[int]) – seed for randomness, defaults to None

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

sampled dataframe

Return type

DataFrame

save_df(df, path, format_hint=None, mode='overwrite', partition_spec=PartitionSpec(num='0', by=[], presort=''), force_single=False, **kwargs)[source]#

Save dataframe to a persistent storage

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – input dataframe

  • path (str) – output path

  • format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition_spec (fugue.collections.partition.PartitionSpec) – how to partition the dataframe before saving, defaults to empty

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type

None

For more details and examples, read Load & Save.

subtract(df1, df2, distinct=True, metadata=None)[source]#

df1 - df2

Parameters
  • df1 (fugue.dataframe.dataframe.DataFrame) – the first dataframe

  • df2 (fugue.dataframe.dataframe.DataFrame) – the second dataframe

  • distinct (bool) – True for EXCEPT (== EXCEPT DISTINCT), False for EXCEPT ALL, defaults to True

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

the subtracted dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

take(df, n, presort, na_position='last', partition_spec=PartitionSpec(num='0', by=[], presort=''), metadata=None)[source]#

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • n (int) – number of rows to return

  • presort (str) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • partition_spec (fugue.collections.partition.PartitionSpec) – PartitionSpec to apply the take operation

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

n rows of DataFrame per partition

Return type

DataFrame

to_df(df, schema=None, metadata=None)[source]#

Convert a data structure to DaskDataFrame

Parameters
  • df (Any) – DataFrame, dask.dataframe.DataFrame, pandas DataFrame, or list or iterable of arrays

  • schema (Optional[Any]) – Schema like object, defaults to None

  • metadata (Optional[Any]) – Parameters like object, defaults to None

Returns

engine compatible dataframe

Return type

fugue_dask.dataframe.DaskDataFrame

Note

  • if the input is already DaskDataFrame, it should return itself

  • For list or iterable of arrays, schema must be specified

  • When schema is not None, a potential type cast may happen to ensure the dataframe’s schema.

  • all other methods in the engine can take arbitrary dataframes and call this method to convert before doing anything

union(df1, df2, distinct=True, metadata=None)[source]#

Union df1 and df2

Parameters
  • df1 (fugue.dataframe.dataframe.DataFrame) – the first dataframe

  • df2 (fugue.dataframe.dataframe.DataFrame) – the second dataframe

  • distinct (bool) – True for UNION (== UNION DISTINCT), False for UNION ALL, defaults to True

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

the unioned dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

class fugue_dask.execution_engine.QPDDaskEngine(execution_engine)[source]#

Bases: fugue.execution.execution_engine.SQLEngine

QPD execution implementation.

Parameters

execution_engine (fugue.execution.execution_engine.ExecutionEngine) – the execution engine this sql engine will run on

select(dfs, statement)[source]#

Execute select statement on the sql engine.

Parameters
  • dfs (fugue.dataframe.dataframes.DataFrames) – a collection of dataframes that the statement can refer to by their keys

  • statement (str) – the SELECT statement using the dfs keys as table names

Returns

result of the SELECT statement

Return type

fugue.dataframe.dataframe.DataFrame

Examples

>>> dfs = DataFrames(a=df1, b=df2)
>>> sql_engine.select(dfs, "SELECT * FROM a UNION SELECT * FROM b")

Note

There can be tables that are not in dfs. For example, you may want to select from hive without input DataFrames:

>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")

fugue_dask.ibis_engine#

class fugue_dask.ibis_engine.DaskIbisEngine(execution_engine)[source]#

Bases: fugue_ibis.execution.ibis_engine.IbisEngine

Parameters

execution_engine (fugue.execution.execution_engine.ExecutionEngine) –

Return type

None

select(dfs, ibis_func)[source]#

Execute the ibis select expression.

Parameters
  • dfs (fugue.dataframe.dataframes.DataFrames) – a collection of dataframes to be used in the ibis function

  • ibis_func (Callable) – the function that takes an ibis backend and returns an ibis table expression

Returns

result of the ibis function

Return type

fugue.dataframe.dataframe.DataFrame

Note

This interface is experimental, so it is subject to change.

fugue_dask.registry#