from typing import Any, Callable, Dict, List, Optional, Type, Union
import pyarrow as pa
import ray
from duckdb import DuckDBPyConnection
from packaging import version
from triad import Schema, assert_or_throw, to_uuid
from triad.utils.threading import RunOnce
from fugue import (
ArrowDataFrame,
DataFrame,
ExecutionEngine,
LocalDataFrame,
MapEngine,
PartitionCursor,
PartitionSpec,
)
from fugue.constants import KEYWORD_PARALLELISM, KEYWORD_ROWCOUNT
from fugue_duckdb.dataframe import DuckDataFrame
from fugue_duckdb.execution_engine import DuckExecutionEngine
from ._constants import FUGUE_RAY_DEFAULT_BATCH_SIZE, FUGUE_RAY_ZERO_COPY
from ._utils.cluster import get_default_partitions, get_default_shuffle_partitions
from ._utils.dataframe import add_coarse_partition_key, add_partition_key
from ._utils.io import RayIO
from .dataframe import RayDataFrame
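
# Temporary column name that carries the computed partition key through
# Ray's groupby; it is dropped inside the map UDF before map_func is called.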
_RAY_PARTITION_KEY = "__ray_partition_key__"
class RayMapEngine(MapEngine):
@property
def execution_engine_constraint(self) -> Type[ExecutionEngine]:
return RayExecutionEngine
@property
def is_distributed(self) -> bool:
return True
def map_dataframe(
self,
df: DataFrame,
map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame],
output_schema: Any,
partition_spec: PartitionSpec,
on_init: Optional[Callable[[int, DataFrame], Any]] = None,
map_func_format_hint: Optional[str] = None,
) -> DataFrame:
if len(partition_spec.partition_by) == 0:
return self._map(
df=df,
map_func=map_func,
output_schema=output_schema,
partition_spec=partition_spec,
on_init=on_init,
)
else:
return self._group_map(
df=df,
map_func=map_func,
output_schema=output_schema,
partition_spec=partition_spec,
on_init=on_init,
)
def _group_map(
self,
df: DataFrame,
map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame],
output_schema: Any,
partition_spec: PartitionSpec,
on_init: Optional[Callable[[int, DataFrame], Any]] = None,
) -> DataFrame:
output_schema = Schema(output_schema)
input_schema = df.schema
presort = partition_spec.get_sorts(
input_schema, with_partition_keys=partition_spec.algo == "coarse"
)
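        # convert the presort into pyarrow-style (column, order) tuples; with
        # the coarse algorithm the partition keys are part of the presort so
        # that rows sharing a hash bucket stay grouped by key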
presort_tuples = [
(k, "ascending" if v else "descending") for k, v in presort.items()
]
cursor = partition_spec.get_cursor(input_schema, 0)
on_init_once: Any = (
None
if on_init is None
else RunOnce(
on_init, lambda *args, **kwargs: to_uuid(id(on_init), id(args[0]))
)
)
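
        # Runs remotely on each key group as a pyarrow.Table: drop the
        # temporary partition key column, apply any presort, then invoke
        # map_func on the batch.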
def _udf(adf: pa.Table) -> pa.Table: # pragma: no cover
if adf.shape[0] == 0:
return output_schema.create_empty_arrow_table()
adf = adf.remove_column(len(input_schema)) # remove partition key
if len(partition_spec.presort) > 0:
if version.parse(pa.__version__).major < 7: # pragma: no cover
idx = pa.compute.sort_indices(
adf, options=pa.compute.SortOptions(presort_tuples)
)
adf = adf.take(idx)
else:
adf = adf.sort_by(presort_tuples)
input_df = ArrowDataFrame(adf)
if on_init_once is not None:
on_init_once(0, input_df)
cursor.set(lambda: input_df.peek_array(), 0, 0)
output_df = map_func(cursor, input_df)
return output_df.as_arrow()
_df: RayDataFrame = self.execution_engine._to_ray_df(df) # type: ignore
if partition_spec.num_partitions != "0":
_df = self.execution_engine.repartition(_df, partition_spec) # type: ignore
else:
n = get_default_shuffle_partitions(self.execution_engine)
if n > 0 and n != _df.num_partitions:
                # if n == 0 or n equals the current number of partitions,
                # Fugue skips the repartition; otherwise the dataset is
                # repartitioned to n partitions
_df = self.execution_engine.repartition( # type: ignore
_df, PartitionSpec(num=n)
)
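        # attach a partition key column so that Ray's groupby shuffles all
        # rows of the same logical partition (or hash bucket, for "coarse")
        # into the same group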
if partition_spec.algo != "coarse":
rdf, _ = add_partition_key(
_df.native,
keys=partition_spec.partition_by,
input_schema=input_schema,
output_key=_RAY_PARTITION_KEY,
)
else:
rdf = add_coarse_partition_key(
_df.native,
keys=partition_spec.partition_by,
output_key=_RAY_PARTITION_KEY,
bucket=_df.num_partitions,
)
gdf = rdf.groupby(_RAY_PARTITION_KEY)
sdf = gdf.map_groups(
_udf,
batch_format="pyarrow",
**self.execution_engine._get_remote_args(), # type: ignore
)
return RayDataFrame(sdf, schema=output_schema, internal_schema=True)
def _map(
self,
df: DataFrame,
map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame],
output_schema: Any,
partition_spec: PartitionSpec,
on_init: Optional[Callable[[int, DataFrame], Any]] = None,
) -> DataFrame:
output_schema = Schema(output_schema)
input_schema = df.schema
cursor = partition_spec.get_cursor(input_schema, 0)
on_init_once: Any = (
None
if on_init is None
else RunOnce(
on_init, lambda *args, **kwargs: to_uuid(id(on_init), id(args[0]))
)
)
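
        # Runs remotely on each pyarrow.Table batch; unlike the grouped path,
        # no partition key column is added, so the batch is handed to
        # map_func as-is.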
def _udf(adf: pa.Table) -> pa.Table: # pragma: no cover
if adf.shape[0] == 0:
return output_schema.create_empty_arrow_table()
input_df = ArrowDataFrame(adf)
if on_init_once is not None:
on_init_once(0, input_df)
cursor.set(lambda: input_df.peek_array(), 0, 0)
output_df = map_func(cursor, input_df)
return output_df.as_arrow()
rdf = self.execution_engine._to_ray_df(df) # type: ignore
if not partition_spec.empty:
rdf = self.execution_engine.repartition( # type: ignore
rdf, partition_spec=partition_spec
)
elif rdf.num_partitions <= 1:
n = get_default_partitions(self.execution_engine)
if n > 0 and n != rdf.num_partitions:
                # if n == 0 or n equals the current number of partitions,
                # Fugue skips the repartition; otherwise the dataset is
                # repartitioned to n partitions
rdf = self.execution_engine.repartition( # type: ignore
rdf, PartitionSpec(num=n)
)
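        # build map_batches kwargs: honor the configured batch size, and on
        # Ray>=2.3 pass zero_copy_batch (default True) to avoid copying the
        # Arrow batches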
mb_args: Dict[str, Any] = {}
if FUGUE_RAY_DEFAULT_BATCH_SIZE in self.conf:
mb_args["batch_size"] = self.conf.get_or_throw(
FUGUE_RAY_DEFAULT_BATCH_SIZE, int
)
        if version.parse(ray.__version__) >= version.parse("2.3"):
mb_args["zero_copy_batch"] = self.conf.get(FUGUE_RAY_ZERO_COPY, True)
sdf = rdf.native.map_batches(
_udf,
batch_format="pyarrow",
**mb_args,
**self.execution_engine._get_remote_args(), # type: ignore
)
return RayDataFrame(sdf, schema=output_schema, internal_schema=True)
class RayExecutionEngine(DuckExecutionEngine):
"""A hybrid engine of Ray and DuckDB as Phase 1 of Fugue Ray integration.
Most operations will be done by DuckDB, but for ``map``, it will use Ray.
:param conf: |ParamsLikeObject|, read |FugueConfig| to learn Fugue specific options
:param connection: DuckDB connection
"""
def __init__(
self, conf: Any = None, connection: Optional[DuckDBPyConnection] = None
):
if not ray.is_initialized(): # pragma: no cover
ray.init()
super().__init__(conf, connection)
self._io = RayIO(self)
def __repr__(self) -> str:
return "RayExecutionEngine"
@property
def is_distributed(self) -> bool:
return True
def create_default_map_engine(self) -> MapEngine:
return RayMapEngine(self)
def get_current_parallelism(self) -> int:
res = ray.cluster_resources()
n = res.get("CPU", 0)
if n == 0: # pragma: no cover
            n = res.get("cpu", 0)
return int(n)
def to_df(self, df: Any, schema: Any = None) -> DataFrame:
return self._to_ray_df(df, schema=schema)
def repartition(self, df: DataFrame, partition_spec: PartitionSpec) -> DataFrame:
def _persist_and_count(df: RayDataFrame) -> int:
self.persist(df)
return df.count()
rdf = self._to_ray_df(df)
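        # resolve symbolic partition counts: ROWCOUNT persists and counts the
        # dataframe, PARALLELISM uses the cluster's current parallelism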
num_funcs = {
KEYWORD_ROWCOUNT: lambda: _persist_and_count(rdf),
KEYWORD_PARALLELISM: lambda: self.get_current_parallelism(),
}
num = partition_spec.get_num_partitions(**num_funcs)
pdf = rdf.native
if num > 0:
if partition_spec.algo in ["default", "hash", "even", "coarse"]:
pdf = pdf.repartition(num)
elif partition_spec.algo == "rand":
pdf = pdf.repartition(num, shuffle=True)
else: # pragma: no cover
raise NotImplementedError(partition_spec.algo + " is not supported")
return RayDataFrame(pdf, schema=rdf.schema, internal_schema=True)
def broadcast(self, df: DataFrame) -> DataFrame:
return df
def persist(
self,
df: DataFrame,
lazy: bool = False,
**kwargs: Any,
) -> DataFrame:
df = self._to_auto_df(df)
if isinstance(df, RayDataFrame):
return df.persist(**kwargs)
return df # pragma: no cover
def convert_yield_dataframe(self, df: DataFrame, as_local: bool) -> DataFrame:
if isinstance(df, RayDataFrame):
return df if not as_local else df.as_local()
return super().convert_yield_dataframe(df, as_local)
def union(self, df1: DataFrame, df2: DataFrame, distinct: bool = True) -> DataFrame:
if distinct:
return super().union(df1, df2, distinct)
assert_or_throw(
df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}")
)
tdf1 = self._to_ray_df(df1)
tdf2 = self._to_ray_df(df2)
return RayDataFrame(tdf1.native.union(tdf2.native), df1.schema)
def load_df( # type:ignore
self,
path: Union[str, List[str]],
format_hint: Any = None,
columns: Any = None,
**kwargs: Any,
) -> DataFrame:
return self._io.load_df(
uri=path, format_hint=format_hint, columns=columns, **kwargs
)
def save_df(
self,
df: DataFrame,
path: str,
format_hint: Any = None,
mode: str = "overwrite",
partition_spec: Optional[PartitionSpec] = None,
force_single: bool = False,
**kwargs: Any,
) -> None:
partition_spec = partition_spec or PartitionSpec()
df = self._to_ray_df(df)
self._io.save_df(
df,
uri=path,
format_hint=format_hint,
mode=mode,
partition_spec=partition_spec,
force_single=force_single,
**kwargs,
)
def _to_ray_df(self, df: Any, schema: Any = None) -> RayDataFrame:
# TODO: remove this in phase 2
res = self._to_auto_df(df, schema)
if not isinstance(res, RayDataFrame):
return RayDataFrame(res)
return res
def _to_auto_df(self, df: Any, schema: Any = None) -> DataFrame:
# TODO: remove this in phase 2
if isinstance(df, (DuckDataFrame, RayDataFrame)):
assert_or_throw(
schema is None,
ValueError("schema must be None when df is a DataFrame"),
)
return df
return RayDataFrame(df, schema)
def _get_remote_args(self) -> Dict[str, Any]:
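        # collect configs prefixed with "fugue.ray.remote." and forward them
        # as Ray remote arguments, e.g. fugue.ray.remote.num_cpus=2 becomes
        # num_cpus=2 in map_batches/map_groups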
res: Dict[str, Any] = {}
for k, v in self.conf.items():
if k.startswith("fugue.ray.remote."):
key = k.split(".", 3)[-1]
res[key] = v
return res
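
# A minimal usage sketch (illustrative, not part of this module): with fugue
# and fugue-ray installed, the engine can be selected by the name "ray"
# through Fugue's transform API.
#
#     import pandas as pd
#     import fugue.api as fa
#
#     def add_one(df: pd.DataFrame) -> pd.DataFrame:
#         return df.assign(x=df["x"] + 1)
#
#     result = fa.transform(
#         pd.DataFrame({"x": [1, 2, 3]}),
#         add_one,
#         schema="*",
#         engine="ray",  # resolves to RayExecutionEngine
#     )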