fugue_ray#

fugue_ray.dataframe#

class fugue_ray.dataframe.RayDataFrame(df=None, schema=None, internal_schema=False)[source]#

Bases: fugue.dataframe.dataframe.DataFrame

DataFrame that wraps a Ray Dataset. Please also read the DataFrame Tutorial to understand this Fugue concept

Parameters
  • df (Any) – ray.data.Dataset, pyarrow.Table, pandas.DataFrame, Fugue DataFrame, or list or iterable of arrays

  • schema (Any) – Schema like object, defaults to None. If the schema is different from the df schema, then type casts will happen.

  • internal_schema (bool) – when True, the schema is guaranteed by the provider to be consistent with the schema of df, so no type cast will happen. Defaults to False. This is for internal use only.
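
A minimal construction sketch; the values and the ray.init call are illustrative, based on the parameter description above:

    import ray
    import pandas as pd
    from fugue_ray import RayDataFrame

    ray.init(ignore_reinit_error=True)

    # from a list of arrays plus an explicit schema
    df1 = RayDataFrame([[0, "a"], [1, "b"]], "x:long,y:str")

    # from a pandas DataFrame (schema inferred)
    df2 = RayDataFrame(pd.DataFrame({"x": [0, 1], "y": ["a", "b"]}))

    # from an existing Ray Dataset
    df3 = RayDataFrame(ray.data.from_pandas(pd.DataFrame({"x": [0, 1]})))

    print(df1.schema)  # x:long,y:str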

alter_columns(columns)[source]#

Change column types

Parameters

columns (Any) – Schema like object, all columns should be contained by the dataframe schema

Returns

a new dataframe with altered columns; the order of the original schema will not change

Return type

fugue.dataframe.dataframe.DataFrame
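
A small sketch of the expected behavior, with illustrative values:

    from fugue_ray import RayDataFrame

    df = RayDataFrame([[0, "a"], [1, "b"]], "x:long,y:str")
    altered = df.alter_columns("x:str")  # cast x from long to str
    print(altered.schema)  # x:str,y:str -- column order is unchanged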

as_array(columns=None, type_safe=False)[source]#

Convert to 2-dimensional native python array

Parameters
  • columns (Optional[List[str]]) – columns to extract, defaults to None

  • type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Returns

2-dimensional native python array

Return type

List[Any]

Note

If type_safe is False, then the returned values are ‘raw’ values.
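
For example, with illustrative values:

    from fugue_ray import RayDataFrame

    df = RayDataFrame([[0, "a"], [1, "b"]], "x:long,y:str")
    print(df.as_array())               # [[0, 'a'], [1, 'b']]
    print(df.as_array(columns=["y"]))  # [['a'], ['b']]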

as_array_iterable(columns=None, type_safe=False)[source]#

Convert to iterable of native python arrays

Parameters
  • columns (Optional[List[str]]) – columns to extract, defaults to None

  • type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Returns

iterable of native python arrays

Return type

Iterable[Any]

Note

If type_safe is False, then the returned values are ‘raw’ values.

as_arrow(type_safe=False)[source]#

Convert to a PyArrow Table

Parameters

type_safe (bool) – whether to ensure output conforms with its schema, defaults to False

Return type

pyarrow.lib.Table

as_local()[source]#

Convert this dataframe to a LocalDataFrame

Return type

fugue.dataframe.dataframe.LocalDataFrame

as_pandas()[source]#

Convert to pandas DataFrame

Return type

pandas.core.frame.DataFrame
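
A quick sketch of the conversion methods above, with illustrative values:

    from fugue_ray import RayDataFrame

    df = RayDataFrame([[0, "a"]], "x:long,y:str")
    adf = df.as_arrow()   # pyarrow.Table
    pdf = df.as_pandas()  # pandas.DataFrame
    ldf = df.as_local()   # a fugue LocalDataFrame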

count()[source]#

Get number of rows of this dataframe

Return type

int

property empty: bool#

Whether this dataframe is empty

head(n, columns=None)[source]#

Get first n rows of the dataframe as a new local bounded dataframe

Parameters
  • n (int) – number of rows

  • columns (Optional[List[str]]) – selected columns, defaults to None (all columns)

Returns

a local bounded dataframe

Return type

fugue.dataframe.dataframe.LocalBoundedDataFrame
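
For example, with illustrative values:

    from fugue_ray import RayDataFrame

    df = RayDataFrame([[0, "a"], [1, "b"], [2, "c"]], "x:long,y:str")
    top = df.head(2, columns=["x"])
    print(top.as_array())  # [[0], [1]]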

property is_bounded: bool#

Whether this dataframe is bounded

property is_local: bool#

Whether this dataframe is a LocalDataFrame

property native: ray.data.dataset.Dataset#

The wrapped ray Dataset

property num_partitions: int#

Number of physical partitions of this dataframe. Please read the Partition Tutorial

peek_array()[source]#

Peek the first row of the dataframe as an array

Raises

FugueDatasetEmptyError – if it is empty

Return type

Any

persist(**kwargs)[source]#

Persist this dataframe

Parameters

kwargs (Any) –

Return type

fugue_ray.dataframe.RayDataFrame

rename(columns)[source]#

Rename the dataframe using a mapping dict

Parameters

columns (Dict[str, str]) – key: the original column name, value: the new name

Returns

a new dataframe with the new names

Return type

fugue.dataframe.dataframe.DataFrame
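
For example, with illustrative values:

    from fugue_ray import RayDataFrame

    df = RayDataFrame([[0, "a"]], "x:long,y:str")
    renamed = df.rename({"x": "xx"})
    print(renamed.schema)  # xx:long,y:str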

fugue_ray.execution_engine#

class fugue_ray.execution_engine.RayExecutionEngine(conf=None, connection=None)[source]#

Bases: fugue_duckdb.execution_engine.DuckExecutionEngine

A hybrid engine of Ray and DuckDB as Phase 1 of the Fugue Ray integration. Most operations are done by DuckDB, but map operations use Ray.

Parameters
  • conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options

  • connection (Optional[duckdb.DuckDBPyConnection]) – DuckDB connection
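
A minimal usage sketch: with fugue_ray installed, the engine can be selected by name in fugue.transform. The function and data below are illustrative, and a running Ray session is assumed:

    import pandas as pd
    from fugue import transform

    def add_one(df: pd.DataFrame) -> pd.DataFrame:
        df["x"] = df["x"] + 1
        return df

    # "ray" resolves to RayExecutionEngine once fugue_ray is installed
    result = transform(
        pd.DataFrame({"x": [0, 1, 2]}),
        add_one,
        schema="*",
        engine="ray",
    )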

broadcast(df)[source]#

Broadcast the dataframe to all workers for a distributed computing framework

Parameters

df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

Returns

the broadcasted dataframe

Return type

fugue.dataframe.dataframe.DataFrame

convert_yield_dataframe(df, as_local)[source]#

Convert a yield dataframe to a dataframe that can be used after this execution engine stops.

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – the yielded dataframe

  • as_local (bool) – whether to return a local dataframe

Returns

another DataFrame that can be used after this execution engine stops

Return type

fugue.dataframe.dataframe.DataFrame

Note

By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe becomes invalid when this engine stops.

For example, if you write a custom Spark engine that starts and stops the Spark session in this engine’s start_engine() and stop_engine(), then Spark dataframes become invalid after the engine stops. In that case, you may consider converting them to local dataframes so they can still be used after the engine stops.

create_default_map_engine()[source]#

The default MapEngine if the user doesn’t specify one

Return type

fugue.execution.execution_engine.MapEngine

load_df(path, format_hint=None, columns=None, **kwargs)[source]#

Load dataframe from persistent storage

Parameters
  • path (Union[str, List[str]]) – the path to the dataframe

  • format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer

  • columns (Optional[Any]) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Returns

an engine compatible dataframe

Return type

fugue.dataframe.dataframe.DataFrame

For more details and examples, read Load & Save.

persist(df, lazy=False, **kwargs)[source]#

Force materializing and caching the dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

  • lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persisting happens immediately. Defaults to False

  • kwargs (Any) – parameters to pass to the underlying persist implementation

Returns

the persisted dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

persist can only guarantee that the persisted dataframe is computed once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly this doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example, stack overflow.
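
A small sketch of eager vs. lazy persisting; the engine construction and values are illustrative, and a running Ray session is assumed:

    from fugue_ray import RayExecutionEngine

    engine = RayExecutionEngine()
    df = engine.to_df([[0, "a"], [1, "b"]], "x:long,y:str")
    cached = engine.persist(df)                # eager: materialized immediately
    deferred = engine.persist(df, lazy=True)   # materialized on first use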

repartition(df, partition_spec)[source]#

Partition the input dataframe using partition_spec.

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – input dataframe

  • partition_spec (fugue.collections.partition.PartitionSpec) – how to partition the dataframe

Returns

repartitioned dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Before implementing, please read the Partition Tutorial
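
For example, with illustrative values:

    from fugue import PartitionSpec
    from fugue_ray import RayExecutionEngine

    engine = RayExecutionEngine()
    df = engine.to_df([[0], [1], [2], [3]], "x:long")
    parted = engine.repartition(df, PartitionSpec(num=2))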

save_df(df, path, format_hint=None, mode='overwrite', partition_spec=PartitionSpec(num='0', by=[], presort=''), force_single=False, **kwargs)[source]#

Save dataframe to a persistent storage

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – input dataframe

  • path (str) – output path

  • format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition_spec (fugue.collections.partition.PartitionSpec) – how to partition the dataframe before saving, defaults to empty

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type

None

For more details and examples, read Load & Save.
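
A round-trip sketch; the path and values are illustrative:

    from fugue_ray import RayExecutionEngine

    engine = RayExecutionEngine()
    df = engine.to_df([[0, "a"], [1, "b"]], "x:long,y:str")

    engine.save_df(df, "/tmp/demo.parquet", force_single=True)
    loaded = engine.load_df("/tmp/demo.parquet", columns=["x"])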

to_df(df, schema=None)[source]#

Convert a data structure to this engine compatible DataFrame

Parameters
  • df (Any) – DataFrame, pandas DataFrame, list or iterable of arrays, or any other data structure supported by this engine implementation

  • schema (Optional[Any]) – Schema like object, defaults to None

Returns

engine compatible dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
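
A sketch of both conventions above, with illustrative values:

    import pandas as pd
    from fugue_ray import RayExecutionEngine

    engine = RayExecutionEngine()
    rdf = engine.to_df(pd.DataFrame({"x": [0, 1]}))  # converted to a RayDataFrame
    same = engine.to_df(rdf)  # already compatible, so returned as-is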

class fugue_ray.execution_engine.RayMapEngine(execution_engine)[source]#

Bases: fugue.execution.execution_engine.MapEngine

Parameters

execution_engine (ExecutionEngine) –

Return type

None

map_dataframe(df, map_func, output_schema, partition_spec, on_init=None)[source]#

Apply a function to each partition after you partition the dataframe in a specified way.

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – input dataframe

  • map_func (Callable) – the function to apply on every logical partition

  • output_schema (Any) – Schema like object of the dataframe after the map operation

  • partition_spec (fugue.collections.partition.PartitionSpec) – how to partition the dataframe before applying map_func

  • on_init (Optional[Callable]) – callback function when a physical partition is initializing, defaults to None

Returns

the dataframe after the map operation

Return type

fugue.dataframe.dataframe.DataFrame

Note

Before implementing, you must read the Fugue tutorial on map to understand what map is used for and how it should work.

fugue_ray.registry#