fugue_ray#
fugue_ray.dataframe#
- class fugue_ray.dataframe.RayDataFrame(df=None, schema=None, internal_schema=False)[source]#
Bases: DataFrame
DataFrame that wraps a Ray Dataset. Please also read the DataFrame Tutorial to understand this Fugue concept.
- Parameters
df (Any) – ray.data.Dataset, pyarrow.Table, pandas.DataFrame, Fugue DataFrame, or list or iterable of arrays
schema (Any) – Schema like object, defaults to None. If the schema is different from the df schema, then type casts will happen.
internal_schema (bool) – for internal use only. It means the schema is guaranteed by the provider to be consistent with the schema of df, so no type cast will happen. Defaults to False.
- alter_columns(columns)[source]#
Change column types
- Parameters
columns (Any) – Schema like object, all columns should be contained by the dataframe schema
- Returns
a new dataframe with the altered columns; the order of the original schema will not change
- Return type
DataFrame
- as_array(columns=None, type_safe=False)[source]#
Convert to 2-dimensional native python array
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
2-dimensional native python array
- Return type
List[Any]
Note
If type_safe is False, then the returned values are 'raw' values.
- as_array_iterable(columns=None, type_safe=False)[source]#
Convert to iterable of native python arrays
- Parameters
columns (Optional[List[str]]) – columns to extract, defaults to None
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Returns
iterable of native python arrays
- Return type
Iterable[Any]
Note
If type_safe is False, then the returned values are 'raw' values.
- as_arrow(type_safe=False)[source]#
Convert to a PyArrow Table
- Parameters
type_safe (bool) – whether to ensure output conforms with its schema, defaults to False
- Return type
Table
- as_local_bounded()[source]#
Convert this dataframe to a LocalBoundedDataFrame
- Return type
LocalBoundedDataFrame
- property empty: bool#
Whether this dataframe is empty
- head(n, columns=None)[source]#
Get first n rows of the dataframe as a new local bounded dataframe
- Parameters
n (int) – number of rows
columns (Optional[List[str]]) – selected columns, defaults to None (all columns)
- Returns
a local bounded dataframe
- Return type
LocalBoundedDataFrame
- property is_bounded: bool#
Whether this dataframe is bounded
- property is_local: bool#
Whether this dataframe is a local Dataset
- native_as_df()[source]#
The dataframe form of the native object this Dataset class wraps. Dataframe form means the object contains schema information. For example, the native object of an ArrayDataFrame is a python array, which doesn't contain schema information, so its native_as_df should be either a pandas dataframe or an arrow dataframe.
- Return type
- property num_partitions: int#
Number of physical partitions of this dataframe. Please read the Partition Tutorial
- peek_array()[source]#
Peek the first row of the dataframe as array
- Raises
FugueDatasetEmptyError – if it is empty
- Return type
List[Any]
fugue_ray.execution_engine#
- class fugue_ray.execution_engine.RayExecutionEngine(conf=None, connection=None)[source]#
Bases:
DuckExecutionEngine
A hybrid engine of Ray and DuckDB as Phase 1 of the Fugue Ray integration. Most operations are done by DuckDB, but map uses Ray.
- Parameters
conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options
connection (Optional[DuckDBPyConnection]) – DuckDB connection
- convert_yield_dataframe(df, as_local)[source]#
Convert a yield dataframe to a dataframe that can be used after this execution engine stops.
- Parameters
df (DataFrame) – DataFrame
as_local (bool) – whether yield a local dataframe
- Returns
another DataFrame that can be used after this execution engine stops
- Return type
DataFrame
Note
By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe will become invalid when the engine stops. For example, if you implement a custom Spark engine that starts and stops the Spark session in this engine's start_engine() and stop_engine(), then the Spark dataframe will become invalid. So you may consider converting it to a local dataframe so it can still be used after the engine stops.
- get_current_parallelism()[source]#
Get the current number of parallelism of this engine
- Return type
int
- property is_distributed: bool#
Whether this engine is a distributed engine
- load_df(path, format_hint=None, columns=None, **kwargs)[source]#
Load dataframe from persistent storage
- Parameters
path (Union[str, List[str]]) – the path to the dataframe
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
columns (Optional[Any]) – list of columns or a Schema like object, defaults to None
kwargs (Any) – parameters to pass to the underlying framework
- Returns
an engine compatible dataframe
- Return type
DataFrame
For more details and examples, read Load & Save.
- persist(df, lazy=False, **kwargs)[source]#
Force materializing and caching the dataframe
- Parameters
df (DataFrame) – the input dataframe
lazy (bool) – True: the first usage of the output will trigger persisting; False (eager): persist is forced to happen immediately. Defaults to False
kwargs (Any) – parameters to pass to the underlying persist implementation
- Returns
the persisted dataframe
- Return type
DataFrame
Note
persist can only guarantee that the persisted dataframe will be computed only once. However, this doesn't mean the backend really breaks up the execution dependency at the persisting point. Commonly it doesn't cause any issue, but if your execution graph is long, it may cause unexpected problems, for example stack overflow.
- repartition(df, partition_spec)[source]#
Partition the input dataframe using partition_spec.
- Parameters
df (DataFrame) – input dataframe
partition_spec (PartitionSpec) – how you want to partition the dataframe
- Returns
repartitioned dataframe
- Return type
DataFrame
Note
Before implementing please read the Partition Tutorial
- save_df(df, path, format_hint=None, mode='overwrite', partition_spec=None, force_single=False, **kwargs)[source]#
Save dataframe to a persistent storage
- Parameters
df (DataFrame) – input dataframe
path (str) – output path
format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer
mode (str) – can accept overwrite, append, error, defaults to "overwrite"
partition_spec (Optional[PartitionSpec]) – how to partition the dataframe before saving, defaults to empty
force_single (bool) – force the output to be a single file, defaults to False
kwargs (Any) – parameters to pass to the underlying framework
- Return type
None
For more details and examples, read Load & Save.
- to_df(df, schema=None)[source]#
Convert a data structure to a DataFrame compatible with this engine
- Parameters
df (Any) – DataFrame, pandas DataFrame, list or iterable of arrays, or others supported by the engine implementation
schema (Optional[Any]) – Schema like object, defaults to None
- Returns
engine compatible dataframe
- Return type
DataFrame
Note
There are certain conventions to follow for a new implementation:
- if the input is already a compatible dataframe type, it should return itself
- all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
- class fugue_ray.execution_engine.RayMapEngine(execution_engine)[source]#
Bases:
MapEngine
- Parameters
execution_engine (ExecutionEngine) –
- property execution_engine_constraint: Type[ExecutionEngine]#
This defines the required ExecutionEngine type of this facet
- Returns
a subtype of ExecutionEngine
- property is_distributed: bool#
Whether this engine is a distributed engine
- map_dataframe(df, map_func, output_schema, partition_spec, on_init=None, map_func_format_hint=None)[source]#
Apply a function to each partition after you partition the dataframe in a specified way.
- Parameters
df (DataFrame) – input dataframe
map_func (Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame]) – the function to apply on every logical partition
output_schema (Any) – Schema like object that can’t be None. Please also understand why we need this
partition_spec (PartitionSpec) – partition specification
on_init (Optional[Callable[[int, DataFrame], Any]]) – callback function when the physical partition is initializing, defaults to None
map_func_format_hint (Optional[str]) – the preferred data format for map_func, it can be pandas, pyarrow, etc., defaults to None. Certain engines can provide more efficient map operations based on the hint.
- Returns
the dataframe after the map operation
- Return type
DataFrame
Note
Before implementing, you must read this to understand what map is used for and how it should work.