Source code for fugue.execution.native_execution_engine

import logging
import os
from typing import Any, Callable, Dict, List, Optional, Type, Union

import numpy as np
import pandas as pd
from triad import Schema
from triad.collections.dict import IndexedOrderedDict
from triad.utils.assertion import assert_or_throw
from triad.utils.io import makedirs
from triad.utils.pandas_like import PandasUtils

from fugue._utils.io import load_df, save_df
from fugue._utils.misc import import_fsql_dependency
from fugue.collections.partition import (
    PartitionCursor,
    PartitionSpec,
    parse_presort_exp,
)
from fugue.collections.sql import StructuredRawSQL
from fugue.constants import KEYWORD_PARALLELISM, KEYWORD_ROWCOUNT
from fugue.dataframe import (
    AnyDataFrame,
    DataFrame,
    DataFrames,
    LocalBoundedDataFrame,
    LocalDataFrame,
    PandasDataFrame,
    fugue_annotated_param,
)
from fugue.dataframe.dataframe import as_fugue_df
from fugue.dataframe.utils import get_join_schemas

from .execution_engine import (
    ExecutionEngine,
    ExecutionEngineParam,
    MapEngine,
    SQLEngine,
)


class QPDPandasEngine(SQLEngine):
    """QPD execution implementation.

    :param execution_engine: the execution engine this sql engine will run on
    """

    @property
    def dialect(self) -> Optional[str]:
        return "spark"

    def to_df(self, df: AnyDataFrame, schema: Any = None) -> DataFrame:
        return _to_native_execution_engine_df(df, schema)

    @property
    def is_distributed(self) -> bool:
        return False

    def select(self, dfs: DataFrames, statement: StructuredRawSQL) -> DataFrame:
        qpd_pandas = import_fsql_dependency("qpd_pandas")

        _dfs, _sql = self.encode(dfs, statement)
        _dd = {k: self.to_df(v).as_pandas() for k, v in _dfs.items()}  # type: ignore
        df = qpd_pandas.run_sql_on_pandas(_sql, _dd, ignore_case=True)
        return self.to_df(df)
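A rough usage sketch (not part of the module): running SQL over pandas data through this engine. It assumes the optional qpd_pandas dependency is installed, and the sample data, the table name t, and the direct construction of StructuredRawSQL from (is_table, text) segments are illustrative assumptions, not documented API usage.

import pandas as pd

from fugue.collections.sql import StructuredRawSQL
from fugue.dataframe import DataFrames
from fugue.execution.native_execution_engine import (
    NativeExecutionEngine,
    QPDPandasEngine,
)

engine = NativeExecutionEngine()
sql_engine = QPDPandasEngine(engine)
df = engine.to_df(pd.DataFrame({"a": [1, 2, 3]}), "a:long")
# StructuredRawSQL holds (is_table_name, text) segments; marking "t" as a
# table name lets encode() map it to the DataFrame passed in DataFrames below
stmt = StructuredRawSQL(
    [(False, "SELECT a FROM "), (True, "t"), (False, " WHERE a > 1")]
)
res = sql_engine.select(DataFrames(t=df), stmt)
print(res.as_pandas())  # rows where a > 1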
class PandasMapEngine(MapEngine):
    @property
    def execution_engine_constraint(self) -> Type[ExecutionEngine]:
        return NativeExecutionEngine

    def to_df(self, df: AnyDataFrame, schema: Any = None) -> DataFrame:
        return _to_native_execution_engine_df(df, schema)

    @property
    def is_distributed(self) -> bool:
        return False
    def map_dataframe(
        self,
        df: DataFrame,
        map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame],
        output_schema: Any,
        partition_spec: PartitionSpec,
        on_init: Optional[Callable[[int, DataFrame], Any]] = None,
        map_func_format_hint: Optional[str] = None,
    ) -> DataFrame:
        # if partition_spec.num_partitions != "0":
        #     self.log.warning(
        #         "%s doesn't respect num_partitions %s",
        #         self,
        #         partition_spec.num_partitions,
        #     )
        is_coarse = partition_spec.algo == "coarse"
        presort = partition_spec.get_sorts(df.schema, with_partition_keys=is_coarse)
        presort_keys = list(presort.keys())
        presort_asc = list(presort.values())
        output_schema = Schema(output_schema)
        cursor = partition_spec.get_cursor(df.schema, 0)
        if on_init is not None:
            on_init(0, df)
        if (
            len(partition_spec.partition_by) == 0 or partition_spec.algo == "coarse"
        ):  # no partition
            if len(partition_spec.presort) > 0:
                pdf = (
                    df.as_pandas()
                    .sort_values(presort_keys, ascending=presort_asc)
                    .reset_index(drop=True)
                )
                input_df: LocalDataFrame = PandasDataFrame(
                    pdf, df.schema, pandas_df_wrapper=True
                )
            else:
                input_df = df.as_local()
            if (
                len(partition_spec.partition_by) == 0
                and partition_spec.num_partitions != "0"
            ):
                partitions = partition_spec.get_num_partitions(
                    **{
                        KEYWORD_ROWCOUNT: lambda: df.count(),  # type: ignore
                        KEYWORD_PARALLELISM: lambda: 1,
                    }
                )
                dfs: List[DataFrame] = []
                for p, subdf in enumerate(
                    np.array_split(input_df.as_pandas(), partitions)
                ):
                    if len(subdf) > 0:
                        tdf = PandasDataFrame(subdf, df.schema, pandas_df_wrapper=True)
                        cursor.set(lambda: tdf.peek_array(), p, 0)
                        dfs.append(map_func(cursor, tdf).as_pandas())
                output_df: LocalDataFrame = PandasDataFrame(
                    pd.concat(dfs, ignore_index=True),
                    schema=output_schema,
                    pandas_df_wrapper=True,
                )
            else:
                cursor.set(lambda: input_df.peek_array(), 0, 0)
                output_df = map_func(cursor, input_df)
            if (
                isinstance(output_df, PandasDataFrame)
                and output_df.schema != output_schema
            ):  # pragma: no cover
                output_df = PandasDataFrame(output_df.native, output_schema)
            assert_or_throw(
                output_df.schema == output_schema,
                lambda: f"map output {output_df.schema} "
                f"mismatches given {output_schema}",
            )
            return self.to_df(output_df)  # type: ignore

        def _map(pdf: pd.DataFrame) -> pd.DataFrame:
            if len(partition_spec.presort) > 0:
                pdf = pdf.sort_values(presort_keys, ascending=presort_asc).reset_index(
                    drop=True
                )
            input_df = PandasDataFrame(pdf, df.schema, pandas_df_wrapper=True)
            cursor.set(lambda: input_df.peek_array(), cursor.partition_no + 1, 0)
            output_df = map_func(cursor, input_df)
            return output_df.as_pandas()

        result = self.execution_engine.pl_utils.safe_groupby_apply(  # type: ignore
            df.as_pandas(), partition_spec.partition_by, _map
        )
        return PandasDataFrame(result, output_schema)
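A minimal sketch of the per-partition map path (not part of the module): partitioning by a key with a presort routes execution through safe_groupby_apply above. The sample frame, schema, and the take_first function are illustrative.

import pandas as pd

from fugue.collections.partition import PartitionSpec
from fugue.dataframe import LocalDataFrame, PandasDataFrame
from fugue.execution.native_execution_engine import (
    NativeExecutionEngine,
    PandasMapEngine,
)

engine = NativeExecutionEngine()
map_engine = PandasMapEngine(engine)
df = engine.to_df(pd.DataFrame({"g": [0, 0, 1], "v": [3, 1, 2]}), "g:long,v:long")

def take_first(cursor, pdf: LocalDataFrame) -> LocalDataFrame:
    # each partition arrives already sorted by the presort, so head(1)
    # keeps the smallest v within each group g
    return PandasDataFrame(pdf.as_pandas().head(1), pdf.schema)

res = map_engine.map_dataframe(
    df,
    take_first,
    output_schema="g:long,v:long",
    partition_spec=PartitionSpec(by=["g"], presort="v asc"),
)
print(res.as_pandas())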
class NativeExecutionEngine(ExecutionEngine):
    """The execution engine based on native python and pandas. This execution
    engine is mainly for prototyping and unit tests. Please read
    |ExecutionEngineTutorial| to understand this important Fugue concept.

    :param conf: |ParamsLikeObject|, read |FugueConfig| to learn Fugue specific
      options
    """

    def __init__(self, conf: Any = None):
        super().__init__(conf)
        self._log = logging.getLogger()

    def __repr__(self) -> str:
        return "NativeExecutionEngine"

    @property
    def log(self) -> logging.Logger:
        return self._log

    @property
    def is_distributed(self) -> bool:
        return False
    def create_default_sql_engine(self) -> SQLEngine:
        return QPDPandasEngine(self)

    def create_default_map_engine(self) -> MapEngine:
        return PandasMapEngine(self)

    def get_current_parallelism(self) -> int:
        return 1

    @property
    def pl_utils(self) -> PandasUtils:
        """Pandas-like dataframe utils"""
        return PandasUtils()

    def to_df(self, df: AnyDataFrame, schema: Any = None) -> LocalBoundedDataFrame:
        return _to_native_execution_engine_df(df, schema)  # type: ignore

    def repartition(
        self, df: DataFrame, partition_spec: PartitionSpec
    ) -> DataFrame:  # pragma: no cover
        # self.log.warning("%s doesn't respect repartition", self)
        return df

    def broadcast(self, df: DataFrame) -> DataFrame:
        return self.to_df(df)

    def persist(
        self,
        df: DataFrame,
        lazy: bool = False,
        **kwargs: Any,
    ) -> DataFrame:
        return self.to_df(df)
    def join(
        self,
        df1: DataFrame,
        df2: DataFrame,
        how: str,
        on: Optional[List[str]] = None,
    ) -> DataFrame:
        key_schema, output_schema = get_join_schemas(df1, df2, how=how, on=on)
        d = self.pl_utils.join(
            df1.as_pandas(), df2.as_pandas(), join_type=how, on=key_schema.names
        )
        return PandasDataFrame(d.reset_index(drop=True), output_schema)

    def union(
        self,
        df1: DataFrame,
        df2: DataFrame,
        distinct: bool = True,
    ) -> DataFrame:
        assert_or_throw(
            df1.schema == df2.schema,
            lambda: ValueError(f"{df1.schema} != {df2.schema}"),
        )
        d = self.pl_utils.union(df1.as_pandas(), df2.as_pandas(), unique=distinct)
        return PandasDataFrame(d.reset_index(drop=True), df1.schema)

    def subtract(
        self,
        df1: DataFrame,
        df2: DataFrame,
        distinct: bool = True,
    ) -> DataFrame:
        assert_or_throw(
            distinct, NotImplementedError("EXCEPT ALL for NativeExecutionEngine")
        )
        assert_or_throw(
            df1.schema == df2.schema,
            lambda: ValueError(f"{df1.schema} != {df2.schema}"),
        )
        d = self.pl_utils.except_df(df1.as_pandas(), df2.as_pandas(), unique=distinct)
        return PandasDataFrame(d.reset_index(drop=True), df1.schema)

    def intersect(
        self,
        df1: DataFrame,
        df2: DataFrame,
        distinct: bool = True,
    ) -> DataFrame:
        assert_or_throw(
            distinct, NotImplementedError("INTERSECT ALL for NativeExecutionEngine")
        )
        assert_or_throw(
            df1.schema == df2.schema,
            lambda: ValueError(f"{df1.schema} != {df2.schema}"),
        )
        d = self.pl_utils.intersect(df1.as_pandas(), df2.as_pandas(), unique=distinct)
        return PandasDataFrame(d.reset_index(drop=True), df1.schema)

    def distinct(
        self,
        df: DataFrame,
    ) -> DataFrame:
        d = self.pl_utils.drop_duplicates(df.as_pandas())
        return PandasDataFrame(d.reset_index(drop=True), df.schema)
    def dropna(
        self,
        df: DataFrame,
        how: str = "any",
        thresh: Optional[int] = None,
        subset: Optional[List[str]] = None,
    ) -> DataFrame:
        kwargs: Dict[str, Any] = dict(axis=0, subset=subset, inplace=False)
        if thresh is None:
            kwargs["how"] = how
        else:
            kwargs["thresh"] = thresh
        d = df.as_pandas().dropna(**kwargs)
        return PandasDataFrame(d.reset_index(drop=True), df.schema)

    def fillna(
        self,
        df: DataFrame,
        value: Any,
        subset: Optional[List[str]] = None,
    ) -> DataFrame:
        assert_or_throw(
            (not isinstance(value, list)) and (value is not None),
            ValueError("fillna value can not be None or a list"),
        )
        if isinstance(value, dict):
            assert_or_throw(
                (None not in value.values()) and (any(value.values())),
                ValueError(
                    "fillna dict can not contain None and needs at least one value"
                ),
            )
            mapping = value
        else:
            # if subset is None, apply to all columns
            subset = subset or df.columns
            mapping = {col: value for col in subset}
        d = df.as_pandas().fillna(mapping, inplace=False)
        return PandasDataFrame(d.reset_index(drop=True), df.schema)
    def sample(
        self,
        df: DataFrame,
        n: Optional[int] = None,
        frac: Optional[float] = None,
        replace: bool = False,
        seed: Optional[int] = None,
    ) -> DataFrame:
        assert_or_throw(
            (n is None and frac is not None) or (n is not None and frac is None),
            ValueError("one and only one of n and frac should be set"),
        )
        d = df.as_pandas().sample(n=n, frac=frac, replace=replace, random_state=seed)
        return PandasDataFrame(d.reset_index(drop=True), df.schema)

    def take(
        self,
        df: DataFrame,
        n: int,
        presort: str,
        na_position: str = "last",
        partition_spec: Optional[PartitionSpec] = None,
    ) -> DataFrame:
        partition_spec = partition_spec or PartitionSpec()
        assert_or_throw(
            isinstance(n, int),
            ValueError("n needs to be an integer"),
        )
        d = df.as_pandas()

        # use presort over partition_spec.presort if possible
        if presort:
            presort = parse_presort_exp(presort)
        _presort: IndexedOrderedDict = presort or partition_spec.presort

        if len(_presort.keys()) > 0:
            d = d.sort_values(
                list(_presort.keys()),
                ascending=list(_presort.values()),
                na_position=na_position,
            )

        if len(partition_spec.partition_by) == 0:
            d = d.head(n)
        else:
            d = d.groupby(by=partition_spec.partition_by, dropna=False).head(n)

        return PandasDataFrame(
            d.reset_index(drop=True), df.schema, pandas_df_wrapper=True
        )
    def load_df(
        self,
        path: Union[str, List[str]],
        format_hint: Any = None,
        columns: Any = None,
        **kwargs: Any,
    ) -> LocalBoundedDataFrame:
        return self.to_df(
            load_df(path, format_hint=format_hint, columns=columns, **kwargs)
        )

    def save_df(
        self,
        df: DataFrame,
        path: str,
        format_hint: Any = None,
        mode: str = "overwrite",
        partition_spec: Optional[PartitionSpec] = None,
        force_single: bool = False,
        **kwargs: Any,
    ) -> None:
        partition_spec = partition_spec or PartitionSpec()
        if not force_single and not partition_spec.empty:
            kwargs["partition_cols"] = partition_spec.partition_by
        makedirs(os.path.dirname(path), exist_ok=True)
        df = self.to_df(df)
        save_df(df, path, format_hint=format_hint, mode=mode, **kwargs)
@fugue_annotated_param(NativeExecutionEngine)
class _NativeExecutionEngineParam(ExecutionEngineParam):
    pass


def _to_native_execution_engine_df(df: AnyDataFrame, schema: Any = None) -> DataFrame:
    fdf = as_fugue_df(df) if schema is None else as_fugue_df(df, schema=schema)
    return fdf.as_local_bounded()
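An end-to-end sketch of the engine itself (not part of the module): everything runs locally on pandas, so join and take return eagerly. The sample frames and schemas are illustrative.

import pandas as pd

from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
a = engine.to_df(pd.DataFrame({"k": [1, 2], "x": [10, 20]}), "k:long,x:long")
b = engine.to_df(pd.DataFrame({"k": [2, 3], "y": [7, 8]}), "k:long,y:long")

joined = engine.join(a, b, how="inner", on=["k"])  # schema: k:long,x:long,y:long
top = engine.take(joined, n=1, presort="x desc")   # row with the largest x
print(top.as_pandas())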