Source code for fugue.execution.execution_engine

import inspect
import logging
from abc import ABC, abstractmethod
from contextlib import contextmanager
from contextvars import ContextVar
from typing import (
    Any,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)
from uuid import uuid4

from triad import ParamDict, Schema, SerializableRLock, assert_or_throw, to_uuid
from triad.collections.function_wrapper import AnnotatedParam
from triad.exceptions import InvalidOperationError
from triad.utils.convert import to_size

from fugue.bag import Bag, LocalBag
from fugue.collections.partition import (
    BagPartitionCursor,
    PartitionCursor,
    PartitionSpec,
)
from fugue.collections.sql import StructuredRawSQL, TempTableName
from fugue.collections.yielded import PhysicalYielded, Yielded
from fugue.column import (
    ColumnExpr,
    SelectColumns,
    SQLExpressionGenerator,
    all_cols,
    col,
    is_agg,
)
from fugue.constants import _FUGUE_GLOBAL_CONF, FUGUE_SQL_DEFAULT_DIALECT
from fugue.dataframe import AnyDataFrame, DataFrame, DataFrames, fugue_annotated_param
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.dataframe import LocalDataFrame
from fugue.dataframe.utils import deserialize_df, serialize_df
from fugue.exceptions import FugueWorkflowRuntimeError

AnyExecutionEngine = TypeVar("AnyExecutionEngine", object, None)

_FUGUE_EXECUTION_ENGINE_CONTEXT = ContextVar(
    "_FUGUE_EXECUTION_ENGINE_CONTEXT", default=None
)

_CONTEXT_LOCK = SerializableRLock()

_FUGUE_SERIALIZED_BLOB_COL = "__fugue_serialized_blob__"
_FUGUE_SERIALIZED_BLOB_NO_COL = "__fugue_serialized_blob_no__"
_FUGUE_SERIALIZED_BLOB_NAME_COL = "__fugue_serialized_blob_name__"
_FUGUE_SERIALIZED_BLOB_DUMMY_COL = "__fugue_serialized_blob_dummy__"

_FUGUE_SERIALIZED_BLOB_SCHEMA = Schema(
    {
        _FUGUE_SERIALIZED_BLOB_COL: bytes,
        _FUGUE_SERIALIZED_BLOB_NO_COL: int,
        _FUGUE_SERIALIZED_BLOB_NAME_COL: str,
        _FUGUE_SERIALIZED_BLOB_DUMMY_COL: int,
    }
)


class _GlobalExecutionEngineContext:
    def __init__(self):
        self._engine: Optional["ExecutionEngine"] = None

    def set(self, engine: Optional["ExecutionEngine"]):
        with _CONTEXT_LOCK:
            if self._engine is not None:
                self._engine._is_global = False
                self._engine._exit_context()
            self._engine = engine
            if engine is not None:
                engine._enter_context()
                engine._is_global = True

    def get(self) -> Optional["ExecutionEngine"]:
        return self._engine


_FUGUE_GLOBAL_EXECUTION_ENGINE_CONTEXT = _GlobalExecutionEngineContext()
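

# Illustrative sketch (hypothetical helper, not part of the original module):
# how the two holders above can be combined to resolve the "current" engine --
# the context engine (innermost ``as_context``) takes precedence over the
# global engine. Fugue's real resolution logic lives outside this module; this
# only shows the intended precedence.
def _example_current_engine() -> Optional["ExecutionEngine"]:
    ctx_engine = _FUGUE_EXECUTION_ENGINE_CONTEXT.get()
    if ctx_engine is not None:
        return ctx_engine
    return _FUGUE_GLOBAL_EXECUTION_ENGINE_CONTEXT.get()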


class FugueEngineBase(ABC):
    @abstractmethod
    def to_df(self, df: AnyDataFrame, schema: Any = None) -> DataFrame:  # pragma: no cover
        """Convert a data structure to this engine compatible DataFrame

        :param df: :class:`~fugue.dataframe.dataframe.DataFrame`, pandas DataFrame,
          a list or iterable of arrays, or others that are supported by certain
          engine implementations
        :param schema: |SchemaLikeObject|, defaults to None
        :return: engine compatible dataframe

        .. note::

            There are certain conventions to follow for a new implementation:

            * if the input is already in a compatible dataframe type, it should
              return itself
            * all other methods in the engine interface should take arbitrary
              dataframes and call this method to convert before doing anything
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def log(self) -> logging.Logger:  # pragma: no cover
        """Logger of this engine instance"""
        raise NotImplementedError

    @property
    @abstractmethod
    def conf(self) -> ParamDict:  # pragma: no cover
        """All configurations of this engine instance.

        .. note::

            It can contain more than you provided, for example
            in :class:`~.fugue_spark.execution_engine.SparkExecutionEngine`,
            the Spark session can bring in more config, they are all accessible
            using this property.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def is_distributed(self) -> bool:  # pragma: no cover
        """Whether this engine is a distributed engine"""
        raise NotImplementedError

class EngineFacet(FugueEngineBase):
    """The base class for different facets of the execution engines.

    :param execution_engine: the execution engine this facet will run on
    """

    def __init__(self, execution_engine: "ExecutionEngine") -> None:
        tp = self.execution_engine_constraint
        if not isinstance(execution_engine, tp):
            raise TypeError(
                f"{self} expects the engine type to be "
                f"{tp}, but got {type(execution_engine)}"
            )
        self._execution_engine = execution_engine

    @property
    def execution_engine(self) -> "ExecutionEngine":
        """the execution engine this facet will run on"""
        return self._execution_engine

    @property
    def log(self) -> logging.Logger:
        return self.execution_engine.log

    @property
    def conf(self) -> ParamDict:
        return self.execution_engine.conf

    def to_df(self, df: AnyDataFrame, schema: Any = None) -> DataFrame:
        return self.execution_engine.to_df(df, schema)

    @property
    def execution_engine_constraint(self) -> Type["ExecutionEngine"]:
        """This defines the required ExecutionEngine type of this facet

        :return: a subtype of :class:`~.ExecutionEngine`
        """
        return ExecutionEngine

class SQLEngine(EngineFacet):
    """The abstract base class for different SQL execution implementations. Please
    read :ref:`this <tutorial:tutorials/advanced/execution_engine:sqlengine>`
    to understand the concept

    :param execution_engine: the execution engine this sql engine will run on
    """

    def __init__(self, execution_engine: "ExecutionEngine") -> None:
        super().__init__(execution_engine)
        self._uid = "_" + str(uuid4())[:5] + "_"

    @property
    def dialect(self) -> Optional[str]:  # pragma: no cover
        return None

    def encode_name(self, name: str) -> str:
        return self._uid + name

    def encode(
        self, dfs: DataFrames, statement: StructuredRawSQL
    ) -> Tuple[DataFrames, str]:
        d = DataFrames({self.encode_name(k): v for k, v in dfs.items()})
        s = statement.construct(self.encode_name, dialect=self.dialect, log=self.log)
        return d, s

    @abstractmethod
    def select(
        self, dfs: DataFrames, statement: StructuredRawSQL
    ) -> DataFrame:  # pragma: no cover
        """Execute select statement on the sql engine.

        :param dfs: a collection of dataframes that must have keys
        :param statement: the ``SELECT`` statement using the ``dfs`` keys as tables.
        :return: result of the ``SELECT`` statement

        .. admonition:: Examples

            .. code-block:: python

                dfs = DataFrames(a=df1, b=df2)
                sql_engine.select(
                    dfs,
                    [(False, "SELECT * FROM "),
                     (True, "a"),
                     (False, " UNION SELECT * FROM "),
                     (True, "b")])

        .. note::

            There can be tables that are not in ``dfs``. For example you may want to
            select from a hive table without input DataFrames:

            >>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
        """
        raise NotImplementedError

    def table_exists(self, table: str) -> bool:  # pragma: no cover
        """Whether the table exists

        :param table: the table name
        :return: whether the table exists
        """
        raise NotImplementedError

    def load_table(self, table: str, **kwargs: Any) -> DataFrame:  # pragma: no cover
        """Load table as a dataframe

        :param table: the table name
        :return: an engine compatible dataframe
        """
        raise NotImplementedError

    def save_table(
        self,
        df: DataFrame,
        table: str,
        mode: str = "overwrite",
        partition_spec: Optional[PartitionSpec] = None,
        **kwargs: Any,
    ) -> None:  # pragma: no cover
        """Save the dataframe to a table

        :param df: the dataframe to save
        :param table: the table name
        :param mode: can accept ``overwrite``, ``error``, defaults to "overwrite"
        :param partition_spec: how to partition the dataframe before saving,
          defaults to None
        :param kwargs: parameters to pass to the underlying framework
        """
        raise NotImplementedError
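

# Illustrative sketch (hypothetical helper, not part of the original module):
# what ``SQLEngine.encode``/``encode_name`` do to a statement before execution.
# The ``StructuredRawSQL`` constructor form is assumed from the ``select``
# docstring above, and "_ab12c_" stands in for the random uid an engine
# instance would generate.
def _example_encode_sql() -> str:
    statement = StructuredRawSQL(
        [
            (False, "SELECT * FROM "),
            (True, "a"),
            (False, " UNION SELECT * FROM "),
            (True, "b"),
        ]
    )
    # the same kind of name mapping that ``encode`` applies to both the input
    # dataframes and the SQL text, e.g. "a" -> "_ab12c_a"
    return statement.construct(lambda name: "_ab12c_" + name)
    # -> 'SELECT * FROM _ab12c_a UNION SELECT * FROM _ab12c_b'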

class MapEngine(EngineFacet):
    """The abstract base class for different map operation implementations.

    :param execution_engine: the execution engine this map engine will run on
    """

    @abstractmethod
    def map_dataframe(
        self,
        df: DataFrame,
        map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame],
        output_schema: Any,
        partition_spec: PartitionSpec,
        on_init: Optional[Callable[[int, DataFrame], Any]] = None,
        map_func_format_hint: Optional[str] = None,
    ) -> DataFrame:  # pragma: no cover
        """Apply a function to each partition after you partition the dataframe in
        a specified way.

        :param df: input dataframe
        :param map_func: the function to apply on every logical partition
        :param output_schema: |SchemaLikeObject| that can't be None.
          Please also understand :ref:`why we need this
          <tutorial:tutorials/beginner/interface:schema>`
        :param partition_spec: partition specification
        :param on_init: callback function when the physical partition is
          initializing, defaults to None
        :param map_func_format_hint: the preferred data format for ``map_func``, it
          can be ``pandas``, ``pyarrow``, etc, defaults to None. Certain engines can
          provide the most efficient map operations based on the hint.
        :return: the dataframe after the map operation

        .. note::

            Before implementing, you must read
            :ref:`this <tutorial:tutorials/advanced/execution_engine:map>`
            to understand what map is used for and how it should work.
        """
        raise NotImplementedError

    # @abstractmethod
    def map_bag(
        self,
        bag: Bag,
        map_func: Callable[[BagPartitionCursor, LocalBag], LocalBag],
        partition_spec: PartitionSpec,
        on_init: Optional[Callable[[int, Bag], Any]] = None,
    ) -> Bag:  # pragma: no cover
        """Apply a function to each partition after you partition the bag in a
        specified way.

        :param bag: input bag
        :param map_func: the function to apply on every logical partition
        :param partition_spec: partition specification
        :param on_init: callback function when the physical partition is
          initializing, defaults to None
        :return: the bag after the map operation
        """
        raise NotImplementedError
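

# Illustrative sketch (hypothetical helper, not part of the original module):
# how a user-provided ``map_func`` for :meth:`MapEngine.map_dataframe` can look,
# here counting rows per logical partition on Fugue's built-in
# NativeExecutionEngine. Call the function to try it; the import is local to
# avoid a circular import with this module.
def _example_map_dataframe():
    from fugue import NativeExecutionEngine
    from fugue.dataframe import ArrayDataFrame

    engine = NativeExecutionEngine()
    df = engine.to_df([[0, 1], [0, 2], [1, 3]], "a:int,b:int")

    def count_rows(cursor: PartitionCursor, pdf: LocalDataFrame) -> LocalDataFrame:
        # called once per logical partition (one distinct value of "a")
        return ArrayDataFrame(
            [[cursor.key_value_array[0], pdf.count()]], "a:int,ct:long"
        )

    res = engine.map_engine.map_dataframe(
        df, count_rows, "a:int,ct:long", PartitionSpec(by=["a"])
    )
    return res.as_array()  # expected: [[0, 2], [1, 1]] (row order may vary)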

class ExecutionEngine(FugueEngineBase):
    """The abstract base class for execution engines.
    It is the layer that unifies core concepts of distributed computing,
    and separates the underlying computing frameworks from the user's
    higher level logic.

    Please read |ExecutionEngineTutorial|
    to understand this most important Fugue concept

    :param conf: dict-like config, read
      :doc:`this <tutorial:tutorials/advanced/useful_config>` to learn Fugue
      specific options
    """

    def __init__(self, conf: Any):
        _conf = ParamDict(conf)
        self._conf = ParamDict({**_FUGUE_GLOBAL_CONF, **_conf})
        self._compile_conf = ParamDict()
        self._sql_engine: Optional[SQLEngine] = None
        self._map_engine: Optional[MapEngine] = None
        self._ctx_count = 0
        self._is_global = False
        self._private_lock = SerializableRLock()
        self._stop_engine_called = False
    @contextmanager
    def as_context(self) -> Iterator["ExecutionEngine"]:
        """Set this execution engine as the context engine. This function
        is thread safe and async safe.

        .. admonition:: Examples

            .. code-block:: python

                with engine.as_context():
                    transform(df, func)  # will use engine in this transformation
        """
        return self._as_context()

    @property
    def in_context(self) -> bool:
        """Whether this engine is being used as a context engine"""
        with _CONTEXT_LOCK:
            return self._ctx_count > 0
    def set_global(self) -> "ExecutionEngine":
        """Set this execution engine to be the global execution engine.

        .. note::

            Global engine is also considered as a context engine, so
            :meth:`~.ExecutionEngine.in_context` will also become true
            for the global engine.

        .. admonition:: Examples

            .. code-block:: python

                engine1.set_global()
                transform(df, func)  # will use engine1 in this transformation

                with engine2.as_context():
                    transform(df, func)  # will use engine2

                transform(df, func)  # will use engine1
        """
        _FUGUE_GLOBAL_EXECUTION_ENGINE_CONTEXT.set(self)
        return self
    @property
    def is_global(self) -> bool:
        """Whether this engine is being used as THE global engine"""
        return self._is_global
    def on_enter_context(self) -> None:  # pragma: no cover
        """The event hook when calling :func:`~.fugue.api.set_global_engine` or
        :func:`~.fugue.api.engine_context`, defaults to no operation
        """
        return

    def on_exit_context(self) -> None:  # pragma: no cover
        """The event hook when calling :func:`~.fugue.api.clear_global_engine` or
        exiting from :func:`~.fugue.api.engine_context`, defaults to no operation
        """
        return
    def stop(self) -> None:
        """Stop this execution engine, do not override.
        You should customize :meth:`~.stop_engine` if necessary. This function
        ensures :meth:`~.stop_engine` is called only once.

        .. note::

            Once the engine is stopped it should not be used again
        """
        with self._private_lock:
            if not self._stop_engine_called:
                self.stop_engine()
                self._stop_engine_called = True
    def stop_engine(self) -> None:  # pragma: no cover
        """Custom logic to stop the execution engine, defaults to no operation"""
        return

    @property
    def conf(self) -> ParamDict:
        return self._conf

    @property
    def map_engine(self) -> MapEngine:
        """The :class:`~.MapEngine` currently used by this execution engine.
        You should use :meth:`~.set_map_engine` to set a new MapEngine
        instance. If not set, the default is :meth:`~.create_default_map_engine`
        """
        if self._map_engine is None:
            self._map_engine = self.create_default_map_engine()
        return self._map_engine

    @property
    def sql_engine(self) -> SQLEngine:
        """The :class:`~.SQLEngine` currently used by this execution engine.
        You should use :meth:`~.set_sql_engine` to set a new SQLEngine
        instance. If not set, the default is :meth:`~.create_default_sql_engine`
        """
        if self._sql_engine is None:
            self._sql_engine = self.create_default_sql_engine()
        return self._sql_engine

    def set_sql_engine(self, engine: SQLEngine) -> None:
        """Set :class:`~.SQLEngine` for this execution engine.
        If not set, the default is :meth:`~.create_default_sql_engine`

        :param engine: :class:`~.SQLEngine` instance
        """
        self._sql_engine = engine
    @abstractmethod
    def create_default_map_engine(self) -> MapEngine:  # pragma: no cover
        """Default MapEngine if user doesn't specify"""
        raise NotImplementedError

    @abstractmethod
    def create_default_sql_engine(self) -> SQLEngine:  # pragma: no cover
        """Default SQLEngine if user doesn't specify"""
        raise NotImplementedError

    @abstractmethod
    def get_current_parallelism(self) -> int:  # pragma: no cover
        """Get the current degree of parallelism of this engine"""
        raise NotImplementedError
    @abstractmethod
    def repartition(
        self, df: DataFrame, partition_spec: PartitionSpec
    ) -> DataFrame:  # pragma: no cover
        """Partition the input dataframe using ``partition_spec``.

        :param df: input dataframe
        :param partition_spec: how you want to partition the dataframe
        :return: repartitioned dataframe

        .. note::

            Before implementing please read |PartitionTutorial|
        """
        raise NotImplementedError

    @abstractmethod
    def broadcast(self, df: DataFrame) -> DataFrame:  # pragma: no cover
        """Broadcast the dataframe to all workers of a distributed computing framework

        :param df: the input dataframe
        :return: the broadcasted dataframe
        """
        raise NotImplementedError
    @abstractmethod
    def persist(
        self,
        df: DataFrame,
        lazy: bool = False,
        **kwargs: Any,
    ) -> DataFrame:  # pragma: no cover
        """Force materializing and caching the dataframe

        :param df: the input dataframe
        :param lazy: ``True``: first usage of the output will trigger persisting
          to happen; ``False`` (eager): persist is forced to happen immediately.
          Defaults to ``False``
        :param kwargs: parameters to pass to the underlying persist implementation
        :return: the persisted dataframe

        .. note::

            ``persist`` can only guarantee the persisted dataframe is computed
            only once. However this doesn't mean the backend really breaks up the
            execution dependency at the persisting point. Commonly, it doesn't cause
            any issue, but if your execution graph is long, it may cause unexpected
            problems, for example stack overflow.
        """
        raise NotImplementedError
    @abstractmethod
    def join(
        self,
        df1: DataFrame,
        df2: DataFrame,
        how: str,
        on: Optional[List[str]] = None,
    ) -> DataFrame:  # pragma: no cover
        """Join two dataframes

        :param df1: the first dataframe
        :param df2: the second dataframe
        :param how: can accept ``semi``, ``left_semi``, ``anti``, ``left_anti``,
          ``inner``, ``left_outer``, ``right_outer``, ``full_outer``, ``cross``
        :param on: it can always be inferred, but if you provide, it will be
          validated against the inferred keys.
        :return: the joined dataframe

        .. note::

            Please read :func:`~.fugue.dataframe.utils.get_join_schemas`
        """
        raise NotImplementedError
    @abstractmethod
    def union(
        self,
        df1: DataFrame,
        df2: DataFrame,
        distinct: bool = True,
    ) -> DataFrame:  # pragma: no cover
        """Union two dataframes

        :param df1: the first dataframe
        :param df2: the second dataframe
        :param distinct: ``true`` for ``UNION`` (== ``UNION DISTINCT``),
          ``false`` for ``UNION ALL``
        :return: the unioned dataframe

        .. note::

            Currently, the schema of ``df1`` and ``df2`` must be identical, or
            an exception will be thrown.
        """
        raise NotImplementedError

    @abstractmethod
    def subtract(
        self,
        df1: DataFrame,
        df2: DataFrame,
        distinct: bool = True,
    ) -> DataFrame:  # pragma: no cover
        """``df1 - df2``

        :param df1: the first dataframe
        :param df2: the second dataframe
        :param distinct: ``true`` for ``EXCEPT`` (== ``EXCEPT DISTINCT``),
          ``false`` for ``EXCEPT ALL``
        :return: the subtracted dataframe

        .. note::

            Currently, the schema of ``df1`` and ``df2`` must be identical, or
            an exception will be thrown.
        """
        raise NotImplementedError

    @abstractmethod
    def intersect(
        self,
        df1: DataFrame,
        df2: DataFrame,
        distinct: bool = True,
    ) -> DataFrame:  # pragma: no cover
        """Intersect ``df1`` and ``df2``

        :param df1: the first dataframe
        :param df2: the second dataframe
        :param distinct: ``true`` for ``INTERSECT`` (== ``INTERSECT DISTINCT``),
          ``false`` for ``INTERSECT ALL``
        :return: the intersected dataframe

        .. note::

            Currently, the schema of ``df1`` and ``df2`` must be identical, or
            an exception will be thrown.
        """
        raise NotImplementedError
    @abstractmethod
    def distinct(
        self,
        df: DataFrame,
    ) -> DataFrame:  # pragma: no cover
        """Equivalent to ``SELECT DISTINCT * FROM df``

        :param df: dataframe
        :return: the dataframe with duplicated rows removed
        :rtype: DataFrame
        """
        pass

    @abstractmethod
    def dropna(
        self,
        df: DataFrame,
        how: str = "any",
        thresh: int = None,
        subset: List[str] = None,
    ) -> DataFrame:  # pragma: no cover
        """Drop NA records from dataframe

        :param df: DataFrame
        :param how: 'any' or 'all'. 'any' drops rows that contain any nulls.
          'all' drops rows that contain all nulls.
        :param thresh: int, drops rows that have less than thresh non-null values
        :param subset: list of columns to operate on
        :return: DataFrame with NA records dropped
        :rtype: DataFrame
        """
        pass
    @abstractmethod
    def fillna(
        self, df: DataFrame, value: Any, subset: List[str] = None
    ) -> DataFrame:  # pragma: no cover
        """
        Fill ``NULL``, ``NAN``, ``NAT`` values in a dataframe

        :param df: DataFrame
        :param value: if scalar, fills all columns with the same value.
          if dictionary, fills NA using the keys as column names
          and the values as the replacement values.
        :param subset: list of columns to operate on. ignored if value is
          a dictionary
        :return: DataFrame with NA records filled
        :rtype: DataFrame
        """
        pass
    @abstractmethod
    def sample(
        self,
        df: DataFrame,
        n: Optional[int] = None,
        frac: Optional[float] = None,
        replace: bool = False,
        seed: Optional[int] = None,
    ) -> DataFrame:  # pragma: no cover
        """
        Sample dataframe by number of rows or by fraction

        :param df: DataFrame
        :param n: number of rows to sample, one and only one of ``n`` and ``frac``
          must be set
        :param frac: fraction [0,1] to sample, one and only one of ``n`` and ``frac``
          must be set
        :param replace: whether replacement is allowed. With replacement,
          there may be duplicated rows in the result, defaults to False
        :param seed: seed for randomness, defaults to None
        :return: sampled dataframe
        :rtype: DataFrame
        """
        pass

    @abstractmethod
    def take(
        self,
        df: DataFrame,
        n: int,
        presort: str,
        na_position: str = "last",
        partition_spec: Optional[PartitionSpec] = None,
    ) -> DataFrame:  # pragma: no cover
        """
        Get the first n rows of a DataFrame per partition. If a presort is defined,
        use the presort before applying take. presort overrides
        partition_spec.presort. The Fugue implementation of the presort follows
        Pandas convention of specifying NULLs first or NULLs last. This is different
        from the Spark and SQL convention of NULLs as the smallest value.

        :param df: DataFrame
        :param n: number of rows to return
        :param presort: presort expression similar to partition presort
        :param na_position: position of null values during the presort.
          can accept ``first`` or ``last``
        :param partition_spec: PartitionSpec to apply the take operation
        :return: n rows of DataFrame per partition
        :rtype: DataFrame
        """
        pass
    def select(
        self,
        df: DataFrame,
        cols: SelectColumns,
        where: Optional[ColumnExpr] = None,
        having: Optional[ColumnExpr] = None,
    ) -> DataFrame:
        """The functional interface for SQL select statement

        :param df: the dataframe to be operated on
        :param cols: column expressions
        :param where: ``WHERE`` condition expression, defaults to None
        :param having: ``HAVING`` condition expression, defaults to None. It is
          used when ``cols`` contains aggregation columns, defaults to None
        :return: the select result as a dataframe

        .. admonition:: New Since
            :class: hint

            **0.6.0**

        .. attention::

            This interface is experimental, it is subject to change in new versions.

        .. seealso::

            Please find more expression examples in :mod:`fugue.column.sql` and
            :mod:`fugue.column.functions`

        .. admonition:: Examples

            .. code-block:: python

                import fugue.column.functions as f

                # select existing and new columns
                engine.select(df, SelectColumns(col("a"), col("b"), lit(1, "another")))
                engine.select(df, SelectColumns(col("a"), (col("b") + lit(1)).alias("x")))

                # aggregation
                # SELECT COUNT(DISTINCT *) AS x FROM df
                engine.select(
                    df,
                    SelectColumns(f.count_distinct(all_cols()).alias("x")))

                # SELECT a, MAX(b+1) AS x FROM df GROUP BY a
                engine.select(
                    df,
                    SelectColumns(col("a"), f.max(col("b") + lit(1)).alias("x")))

                # SELECT a, MAX(b+1) AS x FROM df
                #   WHERE b<2 AND a>1
                #   GROUP BY a
                #   HAVING MAX(b+1)>0
                engine.select(
                    df,
                    SelectColumns(col("a"), f.max(col("b") + lit(1)).alias("x")),
                    where=(col("b") < 2) & (col("a") > 1),
                    having=f.max(col("b") + lit(1)) > 0
                )
        """
        gen = SQLExpressionGenerator(enable_cast=False)
        df_name = TempTableName()
        sql = StructuredRawSQL(
            gen.select(cols, df_name.key, where=where, having=having),
            dialect=FUGUE_SQL_DEFAULT_DIALECT,
        )
        res = self.sql_engine.select(DataFrames({df_name.key: self.to_df(df)}), sql)
        diff = gen.correct_select_schema(df.schema, cols, res.schema)
        return res if diff is None else res.alter_columns(diff)
    def filter(self, df: DataFrame, condition: ColumnExpr) -> DataFrame:
        """Filter rows by the given condition

        :param df: the dataframe to be filtered
        :param condition: (boolean) column expression
        :return: the filtered dataframe

        .. admonition:: New Since
            :class: hint

            **0.6.0**

        .. seealso::

            Please find more expression examples in :mod:`fugue.column.sql` and
            :mod:`fugue.column.functions`

        .. admonition:: Examples

            .. code-block:: python

                import fugue.column.functions as f

                engine.filter(df, (col("a") > 1) & (col("b") == "x"))
                engine.filter(df, f.coalesce(col("a"), col("b")) > 1)
        """
        return self.select(df, cols=SelectColumns(all_cols()), where=condition)
    def assign(self, df: DataFrame, columns: List[ColumnExpr]) -> DataFrame:
        """Update existing columns with new values and add new columns

        :param df: the dataframe to set columns
        :param columns: column expressions
        :return: the updated dataframe

        .. tip::

            This can be used to cast data types, alter column values or add new
            columns. But you can't use aggregation in ``columns``.

        .. admonition:: New Since
            :class: hint

            **0.6.0**

        .. seealso::

            Please find more expression examples in :mod:`fugue.column.sql` and
            :mod:`fugue.column.functions`

        .. admonition:: Examples

            .. code-block:: python

                # assume df has schema: a:int,b:str

                # add constant column x
                engine.assign(df, [lit(1, "x")])

                # change column b to be a constant integer
                engine.assign(df, [lit(1, "b")])

                # add new x to be a+b
                engine.assign(df, [(col("a") + col("b")).alias("x")])

                # cast column a data type to double
                engine.assign(df, [col("a").cast(float)])
        """
        SelectColumns(
            *columns
        ).assert_no_wildcard().assert_all_with_names().assert_no_agg()

        ck = {v: k for k, v in enumerate(df.columns)}
        cols = [col(n) for n in ck.keys()]
        for c in columns:
            if c.output_name not in ck:
                cols.append(c)
            else:
                cols[ck[c.output_name]] = c
        return self.select(df, SelectColumns(*cols))
    def aggregate(
        self,
        df: DataFrame,
        partition_spec: Optional[PartitionSpec],
        agg_cols: List[ColumnExpr],
    ):
        """Aggregate on dataframe

        :param df: the dataframe to aggregate on
        :param partition_spec: PartitionSpec to specify partition keys
        :param agg_cols: aggregation expressions
        :return: the aggregated result as a dataframe

        .. admonition:: New Since
            :class: hint

            **0.6.0**

        .. seealso::

            Please find more expression examples in :mod:`fugue.column.sql` and
            :mod:`fugue.column.functions`

        .. admonition:: Examples

            .. code-block:: python

                import fugue.column.functions as f

                # SELECT MAX(b) AS b FROM df
                engine.aggregate(
                    df,
                    partition_spec=None,
                    agg_cols=[f.max(col("b"))])

                # SELECT a, MAX(b) AS x FROM df GROUP BY a
                engine.aggregate(
                    df,
                    partition_spec=PartitionSpec(by=["a"]),
                    agg_cols=[f.max(col("b")).alias("x")])
        """
        assert_or_throw(len(agg_cols) > 0, ValueError("agg_cols can't be empty"))
        assert_or_throw(
            all(is_agg(x) for x in agg_cols),
            ValueError("all agg_cols must be aggregation functions"),
        )
        keys: List[ColumnExpr] = []
        if partition_spec is not None and len(partition_spec.partition_by) > 0:
            keys = [col(y) for y in partition_spec.partition_by]
        cols = SelectColumns(*keys, *agg_cols)
        return self.select(df, cols=cols)
    def convert_yield_dataframe(self, df: DataFrame, as_local: bool) -> DataFrame:
        """Convert a yield dataframe to a dataframe that can be used after this
        execution engine stops.

        :param df: DataFrame
        :param as_local: whether yield a local dataframe
        :return: another DataFrame that can be used after this execution engine stops

        .. note::

            By default, the output dataframe is the input dataframe. But it should be
            overridden if the input dataframe becomes invalid when this engine stops.
            For example, if you implement a custom Spark engine where you start and
            stop the Spark session in this engine's :meth:`~.start_engine` and
            :meth:`~.stop_engine`, then the Spark dataframe will become invalid. So
            you may consider converting it to a local dataframe so it can still exist
            after the engine stops.
        """
        return df.as_local() if as_local else df

    def zip(
        self,
        dfs: DataFrames,
        how: str = "inner",
        partition_spec: Optional[PartitionSpec] = None,
        temp_path: Optional[str] = None,
        to_file_threshold: Any = -1,
    ) -> DataFrame:
        """Zip multiple dataframes together with given partition specifications.

        :param dfs: |DataFramesLikeObject|
        :param how: can accept ``inner``, ``left_outer``, ``right_outer``,
          ``full_outer``, ``cross``, defaults to ``inner``
        :param partition_spec: |PartitionLikeObject|, defaults to empty.
        :param temp_path: file path to store the data (used only if the serialized
          data is larger than ``to_file_threshold``), defaults to None
        :param to_file_threshold: file byte size threshold, defaults to -1
        :return: a zipped dataframe, the metadata of the dataframe will indicate
          it's zipped

        .. note::

            * Different from join, dataframes can have common columns that you will
              not use as partition keys.
            * If ``by`` is not specified it will also use the common columns of all
              the dataframes (if it's not a cross zip)
            * For non-cross zip, the dataframes must have common columns, or an
              error will be thrown

        .. note::

            * Please also read :meth:`~.comap`
            * If ``dfs`` is dict like, the zipped dataframe will be dict like,
              if ``dfs`` is list like, the zipped dataframe will be list like
            * It's fine to contain only one dataframe in ``dfs``

        .. seealso::

            For more details and examples, read |ZipComap|
        """
        assert_or_throw(len(dfs) > 0, "can't zip 0 dataframes")
        assert_or_throw(
            how in ["inner", "left_outer", "right_outer", "full_outer", "cross"],
            NotImplementedError(f"unsupported join type {how}"),
        )
        partition_spec = partition_spec or PartitionSpec()
        on = list(partition_spec.partition_by)
        how = how.lower()
        if len(dfs) > 1:
            if len(on) == 0:
                if how != "cross":
                    on = list(
                        set.intersection(*[set(x.schema.names) for x in dfs.values()])
                    )
                    assert_or_throw(len(on) > 0, "no common columns found")
            else:
                assert_or_throw(
                    how != "cross",
                    InvalidOperationError("can't specify keys for cross join"),
                )
            partition_spec = PartitionSpec(partition_spec, by=on)
        else:
            if len(on) == 0:
                partition_spec = PartitionSpec(num=1)
            else:
                partition_spec = PartitionSpec(partition_spec, by=on)
        pairs = list(dfs.items())
        schemas: Dict[Any, Schema] = {}
        ser_dfs: List[DataFrame] = []
        for i in range(len(dfs)):
            ser_dfs.append(
                self._serialize_by_partition(
                    self.to_df(pairs[i][1]),
                    partition_spec,
                    i,
                    pairs[i][0] if dfs.has_key else None,
                    temp_path,
                    to_file_threshold,
                )
            )
            schemas[pairs[i][0] if dfs.has_key else i] = pairs[i][1].schema
        res = ser_dfs[0]
        for i in range(1, len(dfs)):
            res = self.union(res, ser_dfs[i], distinct=False)
        metadata = dict(
            serialized=True,
            schemas=schemas,
            serialized_has_name=dfs.has_key,
            serialized_join_how=how,
        )
        res.reset_metadata(metadata)
        return res

    def comap(
        self,
        df: DataFrame,
        map_func: Callable[[PartitionCursor, DataFrames], LocalDataFrame],
        output_schema: Any,
        partition_spec: PartitionSpec,
        on_init: Optional[Callable[[int, DataFrames], Any]] = None,
    ):
        """Apply a function to each zipped partition on the zipped dataframe.

        :param df: input dataframe, it must be a zipped dataframe (it has to be a
          dataframe output from :meth:`~.zip` or :meth:`~.zip_all`)
        :param map_func: the function to apply on every zipped partition
        :param output_schema: |SchemaLikeObject| that can't be None.
          Please also understand :ref:`why we need this
          <tutorial:tutorials/beginner/interface:schema>`
        :param partition_spec: partition specification for processing the zipped
          dataframe.
        :param on_init: callback function when the physical partition is
          initializing, defaults to None
        :return: the dataframe after the comap operation

        .. note::

            * The input of this method must be an output of :meth:`~.zip` or
              :meth:`~.zip_all`
            * The ``partition_spec`` here is NOT related to how you zipped the
              dataframe; however you set it, it will only affect the processing
              speed, because the partition keys will be overridden by the zipped
              dataframe's partition keys. You may use it in this way to improve the
              efficiency: ``PartitionSpec(algo="even", num="ROWCOUNT")``, this tells
              the execution engine to put each zipped partition into a physical
              partition so it can achieve the best possible load balance.
            * If the input dataframe has keys, the dataframes you get in ``map_func``
              and ``on_init`` will have keys, otherwise you will get list-like
              dataframes
            * the on_init function will get a DataFrames object that has the same
              structure, but has all empty dataframes, you can use the schemas but
              not the data.

        .. seealso::

            For more details and examples, read |ZipComap|
        """
        assert_or_throw(df.metadata["serialized"], ValueError("df is not serialized"))
        key_schema = df.schema - _FUGUE_SERIALIZED_BLOB_SCHEMA
        cs = _Comap(df, key_schema, map_func, output_schema, on_init)
        partition_spec = PartitionSpec(
            partition_spec,
            by=key_schema.names + [_FUGUE_SERIALIZED_BLOB_DUMMY_COL],
            presort=_FUGUE_SERIALIZED_BLOB_NO_COL,
        )
        return self.map_engine.map_dataframe(
            df, cs.run, output_schema, partition_spec, on_init=cs.on_init
        )
    def load_yielded(self, df: Yielded) -> DataFrame:
        """Load yielded dataframe

        :param df: the yielded dataframe
        :return: an engine compatible dataframe
        """
        if isinstance(df, PhysicalYielded):
            if df.storage_type == "file":
                return self.load_df(path=df.name)
            else:
                return self.sql_engine.load_table(table=df.name)
        else:
            return self.to_df(df.result)  # type: ignore
    @abstractmethod
    def load_df(
        self,
        path: Union[str, List[str]],
        format_hint: Any = None,
        columns: Any = None,
        **kwargs: Any,
    ) -> DataFrame:  # pragma: no cover
        """Load dataframe from persistent storage

        :param path: the path to the dataframe
        :param format_hint: can accept ``parquet``, ``csv``, ``json``,
          defaults to None, meaning to infer
        :param columns: list of columns or a |SchemaLikeObject|, defaults to None
        :param kwargs: parameters to pass to the underlying framework
        :return: an engine compatible dataframe

        For more details and examples, read |LoadSave|.
        """
        raise NotImplementedError
    @abstractmethod
    def save_df(
        self,
        df: DataFrame,
        path: str,
        format_hint: Any = None,
        mode: str = "overwrite",
        partition_spec: Optional[PartitionSpec] = None,
        force_single: bool = False,
        **kwargs: Any,
    ) -> None:  # pragma: no cover
        """Save dataframe to a persistent storage

        :param df: input dataframe
        :param path: output path
        :param format_hint: can accept ``parquet``, ``csv``, ``json``,
          defaults to None, meaning to infer
        :param mode: can accept ``overwrite``, ``append``, ``error``,
          defaults to "overwrite"
        :param partition_spec: how to partition the dataframe before saving,
          defaults to empty
        :param force_single: force the output as a single file, defaults to False
        :param kwargs: parameters to pass to the underlying framework

        For more details and examples, read |LoadSave|.
        """
        raise NotImplementedError

    def __copy__(self) -> "ExecutionEngine":
        return self

    def __deepcopy__(self, memo: Any) -> "ExecutionEngine":
        return self

    def _as_context(self) -> Iterator["ExecutionEngine"]:
        """Set this execution engine as the context engine. This function
        is thread safe and async safe.

        .. admonition:: Examples

            .. code-block:: python

                with engine.as_context():
                    transform(df, func)  # will use engine in this transformation
        """
        with _CONTEXT_LOCK:
            self._enter_context()
            token = _FUGUE_EXECUTION_ENGINE_CONTEXT.set(self)  # type: ignore
        try:
            yield self
        finally:
            with _CONTEXT_LOCK:
                _FUGUE_EXECUTION_ENGINE_CONTEXT.reset(token)
                self._exit_context()

    def _enter_context(self):
        self.on_enter_context()
        self._ctx_count += 1

    def _exit_context(self):
        self._ctx_count -= 1
        self.on_exit_context()
        if self._ctx_count == 0:
            self.stop()

    def _serialize_by_partition(
        self,
        df: DataFrame,
        partition_spec: PartitionSpec,
        df_no: int,
        df_name: Optional[str],
        temp_path: Optional[str],
        to_file_threshold: Any,
    ) -> DataFrame:
        to_file_threshold = _get_file_threshold(to_file_threshold)
        on = list(filter(lambda k: k in df.schema, partition_spec.partition_by))
        presort = list(
            filter(lambda p: p[0] in df.schema, partition_spec.presort.items())
        )
        if len(on) == 0:
            _partition_spec = PartitionSpec(
                partition_spec, num=1, by=[], presort=presort
            )
            output_schema = _FUGUE_SERIALIZED_BLOB_SCHEMA
        else:
            _partition_spec = PartitionSpec(partition_spec, by=on, presort=presort)
            output_schema = (
                partition_spec.get_key_schema(df.schema) + _FUGUE_SERIALIZED_BLOB_SCHEMA
            )
        s = _PartitionSerializer(
            output_schema, df_no, df_name, temp_path, to_file_threshold
        )
        return self.map_engine.map_dataframe(df, s.run, output_schema, _partition_spec)
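

# Illustrative sketch (hypothetical helper, not part of the original module):
# end-to-end usage of the column-expression methods defined above
# (``aggregate`` and ``filter``) on Fugue's built-in NativeExecutionEngine.
# Only existing public Fugue APIs are used; the snippet itself is only an
# illustration and the import is local to avoid a circular import.
def _example_column_expressions():
    import fugue.column.functions as f
    from fugue import NativeExecutionEngine

    engine = NativeExecutionEngine()
    df = engine.to_df([[0, 1], [0, 5], [1, 2]], "a:int,b:int")

    # SELECT a, MAX(b) AS mx FROM df GROUP BY a
    agg = engine.aggregate(
        df,
        partition_spec=PartitionSpec(by=["a"]),
        agg_cols=[f.max(col("b")).alias("mx")],
    )
    # SELECT * FROM df WHERE b > 1
    filtered = engine.filter(df, col("b") > 1)
    return agg.as_array(), filtered.as_array()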

@fugue_annotated_param(ExecutionEngine, "e", child_can_reuse_code=True)
class ExecutionEngineParam(AnnotatedParam):
    def __init__(
        self,
        param: Optional[inspect.Parameter],
    ):
        super().__init__(param)
        self._type = self.annotation

    def to_input(self, engine: Any) -> Any:
        assert_or_throw(
            isinstance(engine, self._type),
            FugueWorkflowRuntimeError(f"{engine} is not of type {self._type}"),
        )
        return engine

    def __uuid__(self) -> str:
        return to_uuid(self.code, self.annotation, self._type)

def _get_file_threshold(size: Any) -> int:
    if size is None:
        return -1
    if isinstance(size, int):
        return size
    return to_size(size)


class _PartitionSerializer:
    def __init__(
        self,
        output_schema: Schema,
        no: int,
        name: Optional[str],
        temp_path: Optional[str],
        to_file_threshold: int,
    ):
        self.output_schema = output_schema
        self.no = no
        self.name = name
        self.temp_path = temp_path
        self.to_file_threshold = to_file_threshold

    def run(self, cursor: PartitionCursor, df: LocalDataFrame) -> LocalDataFrame:
        data = serialize_df(df, self.to_file_threshold, self.temp_path)
        row = cursor.key_value_array + [data, self.no, self.name, 1]
        return ArrayDataFrame([row], self.output_schema)


class _Comap:
    def __init__(
        self,
        df: DataFrame,
        key_schema: Schema,
        func: Callable,
        output_schema: Schema,
        on_init: Optional[Callable[[int, DataFrames], Any]],
    ):
        self.schemas = df.metadata["schemas"]
        self.key_schema = key_schema
        self.output_schema = output_schema
        self.dfs_count = len(self.schemas)
        self.named = df.metadata.get_or_throw("serialized_has_name", bool)
        self.func = func
        self.how = df.metadata.get_or_throw("serialized_join_how", str)
        self._on_init = on_init

    def on_init(self, partition_no, df: DataFrame) -> None:
        if self._on_init is None:
            return
        # TODO: currently, get_output_schema only gets empty dataframes
        empty_dfs = _generate_comap_empty_dfs(self.schemas, self.named)
        self._on_init(partition_no, empty_dfs)

    def run(self, cursor: PartitionCursor, df: LocalDataFrame) -> LocalDataFrame:
        data = df.as_dicts()
        if self.how == "inner":
            if len(data) < self.dfs_count:
                return ArrayDataFrame([], self.output_schema)
        elif self.how == "left_outer":
            if data[0][_FUGUE_SERIALIZED_BLOB_NO_COL] > 0:
                return ArrayDataFrame([], self.output_schema)
        elif self.how == "right_outer":
            if data[-1][_FUGUE_SERIALIZED_BLOB_NO_COL] != self.dfs_count - 1:
                return ArrayDataFrame([], self.output_schema)
        dfs = self._get_dfs(data)
        # construct a cursor without the dummy col
        _c = PartitionSpec(by=self.key_schema.names).get_cursor(
            dfs[0].schema, cursor.physical_partition_no
        )
        _c.set(lambda: dfs[0].peek_array(), cursor.partition_no, cursor.slice_no)
        return self.func(_c, dfs)

    def _get_dfs(self, rows: List[Dict[str, Any]]) -> DataFrames:
        tdfs: Dict[Any, DataFrame] = {}
        for row in rows:
            df = deserialize_df(row[_FUGUE_SERIALIZED_BLOB_COL])  # type: ignore
            if df is not None:
                if self.named:
                    tdfs[row[_FUGUE_SERIALIZED_BLOB_NAME_COL]] = df
                else:
                    tdfs[row[_FUGUE_SERIALIZED_BLOB_NO_COL]] = df
        dfs: Dict[Any, DataFrame] = {}
        for k, schema in self.schemas.items():
            if k in tdfs:
                dfs[k] = tdfs[k]
            else:
                dfs[k] = ArrayDataFrame([], schema)
        return DataFrames(dfs) if self.named else DataFrames(list(dfs.values()))


def _generate_comap_empty_dfs(schemas: Any, named: bool) -> DataFrames:
    if named:
        return DataFrames({k: ArrayDataFrame([], v) for k, v in schemas.items()})
    else:
        return DataFrames([ArrayDataFrame([], v) for v in schemas.values()])
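

# Illustrative sketch (hypothetical helper, not part of the original module):
# the public ``zip``/``comap`` flow that ``_PartitionSerializer`` and ``_Comap``
# above implement, run on Fugue's built-in NativeExecutionEngine. The comap
# function receives one DataFrames collection per key group; the import is
# local to avoid a circular import with this module.
def _example_zip_comap():
    from fugue import NativeExecutionEngine

    engine = NativeExecutionEngine()
    df1 = engine.to_df([[0, 1], [0, 2], [1, 3]], "a:int,b:int")
    df2 = engine.to_df([[0, 10], [1, 20]], "a:int,c:int")

    zipped = engine.zip(
        DataFrames(x=df1, y=df2), partition_spec=PartitionSpec(by=["a"])
    )

    def merge(cursor: PartitionCursor, dfs: DataFrames) -> LocalDataFrame:
        # dfs has the same keys as the zipped input: "x" and "y"
        return ArrayDataFrame(
            [[cursor.key_value_array[0], dfs["x"].count(), dfs["y"].count()]],
            "a:int,x_ct:long,y_ct:long",
        )

    res = engine.comap(zipped, merge, "a:int,x_ct:long,y_ct:long", PartitionSpec())
    return res.as_array()  # e.g. [[0, 2, 1], [1, 1, 1]] (row order may vary)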