import itertools
import logging
from abc import abstractmethod
from typing import Any, Callable, Dict, List, Optional, Type
import ibis
from ibis import BaseBackend
from triad import assert_or_throw
from fugue import StructuredRawSQL
from fugue.bag import Bag, LocalBag
from fugue.collections.partition import (
BagPartitionCursor,
PartitionCursor,
PartitionSpec,
parse_presort_exp,
)
from fugue.dataframe import DataFrame, DataFrames, LocalDataFrame
from fugue.dataframe.utils import get_join_schemas
from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine
from ._compat import IbisTable
from ._utils import to_ibis_schema
from .dataframe import IbisDataFrame
_JOIN_RIGHT_SUFFIX = "_ibis_y__"
_GEN_TABLE_NAMES = (f"_fugue_temp_table_{i:d}" for i in itertools.count())
[docs]
class IbisSQLEngine(SQLEngine):
"""Ibis SQL backend base implementation.
:param execution_engine: the execution engine this sql engine will run on
"""
@property
@abstractmethod
def backend(self) -> BaseBackend: # pragma: no cover
raise NotImplementedError
[docs]
def select(self, dfs: DataFrames, statement: StructuredRawSQL) -> DataFrame:
return self.to_df(
self.query_to_table(
statement.construct(dialect=self.dialect, log=self.log),
dfs,
)
)
[docs]
def query_to_table(self, statement: str, dfs: Dict[str, Any]) -> IbisTable:
cte: List[str] = []
for k, v in dfs.items():
idf = self.to_df(v)
cte.append(k + " AS (" + idf.to_sql() + ")") # type: ignore
if len(cte) > 0:
sql = "WITH " + ",\n".join(cte) + "\n" + statement
else:
sql = statement
return self.backend.sql(sql)
[docs]
@abstractmethod
def encode_column_name(self, name: str) -> str: # pragma: no cover
raise NotImplementedError
[docs]
def get_temp_table_name(self) -> str:
return next(_GEN_TABLE_NAMES)
[docs]
@abstractmethod
def persist(
self, df: DataFrame, lazy: bool = False, **kwargs: Any
) -> DataFrame: # pragma: no cover
raise NotImplementedError
[docs]
@abstractmethod
def sample(
self,
df: DataFrame,
n: Optional[int] = None,
frac: Optional[float] = None,
replace: bool = False,
seed: Optional[int] = None,
) -> DataFrame: # pragma: no cover
raise NotImplementedError
[docs]
def join(
self,
df1: DataFrame,
df2: DataFrame,
how: str,
on: Optional[List[str]] = None,
) -> DataFrame:
_df1 = self.to_df(df1)
_df2 = self.to_df(df2)
key_schema, end_schema = get_join_schemas(_df1, _df2, how=how, on=on)
on_fields = [_df1.native[k] == _df2.native[k] for k in key_schema]
if ibis.__version__ < "6": # pragma: no cover
suffixes: Dict[str, Any] = dict(suffixes=("", _JOIN_RIGHT_SUFFIX))
else:
# breaking change in ibis 6.0
suffixes = dict(lname="", rname="{name}" + _JOIN_RIGHT_SUFFIX)
if how.lower() == "cross":
tb = _df1.native.cross_join(_df2.native, **suffixes)
elif how.lower() == "right_outer":
tb = _df2.native.left_join(_df1.native, on_fields, **suffixes)
elif how.lower() == "left_outer":
tb = _df1.native.left_join(_df2.native, on_fields, **suffixes)
elif how.lower() == "full_outer":
tb = _df1.native.outer_join(_df2.native, on_fields, **suffixes)
cols: List[Any] = []
for k in end_schema.names:
if k not in key_schema:
cols.append(k)
else:
cols.append(
ibis.coalesce(tb[k], tb[k + _JOIN_RIGHT_SUFFIX]).name(k)
)
tb = tb[cols]
elif how.lower() in ["semi", "left_semi"]:
tb = _df1.native.semi_join(_df2.native, on_fields, **suffixes)
elif how.lower() in ["anti", "left_anti"]:
tb = _df1.native.anti_join(_df2.native, on_fields, **suffixes)
else:
tb = _df1.native.inner_join(_df2.native, on_fields, **suffixes)
return self.to_df(tb[end_schema.names], schema=end_schema)
[docs]
def union(self, df1: DataFrame, df2: DataFrame, distinct: bool = True) -> DataFrame:
_df1 = self.to_df(df1)
_df2 = self.to_df(df2)
tb = _df1.native.union(_df2.native, distinct=distinct)
return self.to_df(tb, df1.schema)
[docs]
def subtract(
self, df1: DataFrame, df2: DataFrame, distinct: bool = True
) -> DataFrame:
_df1 = self.to_df(df1)
_df2 = self.to_df(df2)
tb = _df1.native.difference(_df2.native, distinct=distinct)
return self.to_df(tb, df1.schema)
[docs]
def intersect(
self, df1: DataFrame, df2: DataFrame, distinct: bool = True
) -> DataFrame:
_df1 = self.to_df(df1)
_df2 = self.to_df(df2)
tb = _df1.native.intersect(_df2.native, distinct=distinct)
return self.to_df(tb, df1.schema)
[docs]
def distinct(self, df: DataFrame) -> DataFrame:
_df = self.to_df(df)
tb = _df.native.distinct()
return self.to_df(tb, df.schema)
[docs]
def dropna(
self,
df: DataFrame,
how: str = "any",
thresh: int = None,
subset: Optional[List[str]] = None,
) -> DataFrame:
schema = df.schema
if subset is not None:
schema = schema.extract(subset)
_df = self.to_df(df)
if thresh is None:
tb = _df.native.dropna(subset=subset, how=how)
return self.to_df(tb, df.schema)
assert_or_throw(
how == "any", ValueError("when thresh is set, how must be 'any'")
)
sm = None
for col in schema.names:
expr = _df.native[col].isnull().ifelse(0, 1)
if sm is None:
sm = expr
else:
sm = sm + expr
tb = _df.native.filter(sm >= ibis.literal(thresh))
return self.to_df(tb, df.schema)
[docs]
def fillna(self, df: DataFrame, value: Any, subset: List[str] = None) -> DataFrame:
def _build_value_dict(names: List[str]) -> Dict[str, str]:
if not isinstance(value, dict):
return {n: value for n in names}
else:
return {n: value[n] for n in names}
names = df.columns
if isinstance(value, dict):
# subset should be ignored
names = list(value.keys())
elif subset is not None:
st = set(names)
assert_or_throw(
st.issuperset(subset),
ValueError(f"{names} is not a superset of {subset}"),
)
names = subset
vd = _build_value_dict(names)
assert_or_throw(
all(v is not None for v in vd.values()),
ValueError("fillna value can not be None or contain None"),
)
tb = self.to_df(df).native
cols = [
ibis.coalesce(tb[f], ibis.literal(vd[f])).name(f) if f in names else tb[f]
for f in df.columns
]
return self.to_df(tb[cols], schema=df.schema)
[docs]
def take(
self,
df: DataFrame,
n: int,
presort: str,
na_position: str = "last",
partition_spec: Optional[PartitionSpec] = None,
) -> DataFrame:
partition_spec = partition_spec or PartitionSpec()
assert_or_throw(
isinstance(n, int),
ValueError("n needs to be an integer"),
)
if presort is not None and presort != "":
_presort = parse_presort_exp(presort)
else:
_presort = partition_spec.presort
tbn = "_temp"
idf = self.to_df(df)
if len(_presort) == 0:
if len(partition_spec.partition_by) == 0:
return self.to_df(idf.native.head(n), schema=df.schema)
pcols = ", ".join(
self.encode_column_name(x) for x in partition_spec.partition_by
)
sql = (
f"SELECT * FROM ("
f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {pcols}) "
f"AS __fugue_take_param FROM {tbn}"
f") WHERE __fugue_take_param<={n}"
)
tb = self.query_to_table(sql, {tbn: idf})
return self.to_df(tb[df.columns], schema=df.schema)
sorts: List[str] = []
for k, v in _presort.items():
s = self.encode_column_name(k)
s += " ASC" if v else " DESC"
s += " NULLS FIRST" if na_position == "first" else " NULLS LAST"
sorts.append(s)
sort_expr = "ORDER BY " + ", ".join(sorts)
if len(partition_spec.partition_by) == 0:
sql = f"SELECT * FROM {tbn} {sort_expr} LIMIT {n}"
tb = self.query_to_table(sql, {tbn: idf})
return self.to_df(tb[df.columns], schema=df.schema)
pcols = ", ".join(
self.encode_column_name(x) for x in partition_spec.partition_by
)
sql = (
f"SELECT * FROM ("
f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {pcols} {sort_expr}) "
f"AS __fugue_take_param FROM {tbn}"
f") WHERE __fugue_take_param<={n}"
)
tb = self.query_to_table(sql, {tbn: idf})
return self.to_df(tb[df.columns], schema=df.schema)
[docs]
def table_exists(self, table: str) -> bool:
return table in self.backend.list_tables()
[docs]
def save_table(
self,
df: DataFrame,
table: str,
mode: str = "overwrite",
partition_spec: Optional[PartitionSpec] = None,
**kwargs: Any,
) -> None:
if mode == "overwrite":
self.backend.drop_table(table, force=True)
if isinstance(df, IbisDataFrame):
self.backend.create_table(table, df.native)
else:
self.backend.create_table(
table, df.as_pandas(), schema=to_ibis_schema(df.schema)
)
[docs]
def load_table(self, table: str, **kwargs: Any) -> DataFrame:
return self.to_df(self.backend.table(table))
[docs]
class IbisMapEngine(MapEngine):
"""IbisExecutionEngine's MapEngine, it is a wrapper of the map engine
of :meth:`~.IbisExecutionEngine.non_ibis_engine`
:param execution_engine: the execution engine this map engine will run on
"""
@property
def is_distributed(self) -> bool:
return self._ibis_engine.non_ibis_engine.map_engine.is_distributed
def __init__(self, execution_engine: ExecutionEngine) -> None:
super().__init__(execution_engine)
self._ibis_engine: IbisExecutionEngine = execution_engine # type: ignore
@property
def execution_engine_constraint(self) -> Type[ExecutionEngine]:
return IbisExecutionEngine
[docs]
def map_dataframe(
self,
df: DataFrame,
map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame],
output_schema: Any,
partition_spec: PartitionSpec,
on_init: Optional[Callable[[int, DataFrame], Any]] = None,
map_func_format_hint: Optional[str] = None,
) -> DataFrame:
_df = self._ibis_engine._to_non_ibis_dataframe(df)
return self._ibis_engine.non_ibis_engine.map_engine.map_dataframe(
_df,
map_func=map_func,
output_schema=output_schema,
partition_spec=partition_spec,
on_init=on_init,
map_func_format_hint=map_func_format_hint,
)
[docs]
def map_bag(
self,
bag: Bag,
map_func: Callable[[BagPartitionCursor, LocalBag], LocalBag],
partition_spec: PartitionSpec,
on_init: Optional[Callable[[int, Bag], Any]] = None,
) -> Bag: # pragma: no cover
return self._ibis_engine.non_ibis_engine.map_engine.map_bag(
bag, map_func, partition_spec, on_init
)
[docs]
class IbisExecutionEngine(ExecutionEngine):
"""The base execution engine using Ibis.
Please read |ExecutionEngineTutorial| to understand this important Fugue concept
:param conf: |ParamsLikeObject|, read |FugueConfig| to learn Fugue specific options
"""
def __init__(self, conf: Any):
super().__init__(conf)
self._non_ibis_engine = self.create_non_ibis_execution_engine()
[docs]
@abstractmethod
def create_non_ibis_execution_engine(self) -> ExecutionEngine:
"""Create the execution engine that handles operations beyond SQL"""
raise NotImplementedError # pragma: no cover
[docs]
def is_non_ibis(self, ds: Any) -> bool:
return not isinstance(ds, (IbisDataFrame, IbisTable))
@property
def non_ibis_engine(self) -> ExecutionEngine:
return self._non_ibis_engine
@property
def ibis_sql_engine(self) -> IbisSQLEngine:
return self.sql_engine # type: ignore
[docs]
def create_default_map_engine(self) -> MapEngine:
return IbisMapEngine(self)
@property
def log(self) -> logging.Logger:
return self.non_ibis_engine.log
[docs]
def get_current_parallelism(self) -> int:
return self.non_ibis_engine.get_current_parallelism()
[docs]
def to_df(self, df: Any, schema: Any = None) -> DataFrame:
if self.is_non_ibis(df):
return self._to_non_ibis_dataframe(df, schema)
return self._to_ibis_dataframe(df, schema=schema)
[docs]
def repartition(self, df: DataFrame, partition_spec: PartitionSpec) -> DataFrame:
if self.is_non_ibis(df):
return self.non_ibis_engine.repartition(df, partition_spec=partition_spec)
self.log.warning("%s doesn't respect repartition", self)
return df
[docs]
def broadcast(self, df: DataFrame) -> DataFrame:
if self.is_non_ibis(df):
return self.non_ibis_engine.broadcast(df)
return df
[docs]
def persist(
self,
df: DataFrame,
lazy: bool = False,
**kwargs: Any,
) -> DataFrame:
if self.is_non_ibis(df):
return self.non_ibis_engine.persist(df, lazy=lazy, **kwargs)
return self.ibis_sql_engine.persist(df, lazy=lazy, **kwargs)
[docs]
def join(
self,
df1: DataFrame,
df2: DataFrame,
how: str,
on: Optional[List[str]] = None,
) -> DataFrame:
if self.is_non_ibis(df1) and self.is_non_ibis(df2):
return self.non_ibis_engine.join(df1, df2, how=how, on=on)
return self.ibis_sql_engine.join(df1, df2, how=how, on=on)
[docs]
def union(self, df1: DataFrame, df2: DataFrame, distinct: bool = True) -> DataFrame:
if self.is_non_ibis(df1) and self.is_non_ibis(df2):
return self.non_ibis_engine.union(df1, df2, distinct=distinct)
return self.ibis_sql_engine.union(df1, df2, distinct=distinct)
[docs]
def subtract(
self, df1: DataFrame, df2: DataFrame, distinct: bool = True
) -> DataFrame:
if self.is_non_ibis(df1) and self.is_non_ibis(df2):
return self.non_ibis_engine.subtract(df1, df2, distinct=distinct)
return self.ibis_sql_engine.subtract(df1, df2, distinct=distinct)
[docs]
def intersect(
self, df1: DataFrame, df2: DataFrame, distinct: bool = True
) -> DataFrame:
if self.is_non_ibis(df1) and self.is_non_ibis(df2):
return self.non_ibis_engine.intersect(df1, df2, distinct=distinct)
return self.ibis_sql_engine.intersect(df1, df2, distinct=distinct)
[docs]
def distinct(self, df: DataFrame) -> DataFrame:
if self.is_non_ibis(df):
return self.non_ibis_engine.distinct(df)
return self.ibis_sql_engine.distinct(df)
[docs]
def dropna(
self,
df: DataFrame,
how: str = "any",
thresh: int = None,
subset: Optional[List[str]] = None,
) -> DataFrame:
if self.is_non_ibis(df):
return self.non_ibis_engine.dropna(
df, how=how, thresh=thresh, subset=subset
)
return self.ibis_sql_engine.dropna(df, how=how, thresh=thresh, subset=subset)
[docs]
def fillna(self, df: DataFrame, value: Any, subset: List[str] = None) -> DataFrame:
if self.is_non_ibis(df):
return self.non_ibis_engine.fillna(df, value=value, subset=subset)
return self.ibis_sql_engine.fillna(df, value=value, subset=subset)
[docs]
def sample(
self,
df: DataFrame,
n: Optional[int] = None,
frac: Optional[float] = None,
replace: bool = False,
seed: Optional[int] = None,
) -> DataFrame:
if self.is_non_ibis(df):
return self.non_ibis_engine.sample(
df,
n=n,
frac=frac,
replace=replace,
seed=seed,
)
return self.ibis_sql_engine.sample(
df,
n=n,
frac=frac,
replace=replace,
seed=seed,
)
[docs]
def take(
self,
df: DataFrame,
n: int,
presort: str,
na_position: str = "last",
partition_spec: Optional[PartitionSpec] = None,
) -> DataFrame:
if self.is_non_ibis(df):
return self.non_ibis_engine.take(
df,
n=n,
presort=presort,
na_position=na_position,
partition_spec=partition_spec,
)
return self.ibis_sql_engine.take(
df,
n=n,
presort=presort,
na_position=na_position,
partition_spec=partition_spec,
)
def _to_ibis_dataframe(self, df: Any, schema: Any = None) -> DataFrame:
"""Create ``IbisDataFrame`` from the dataframe like input
:param df: dataframe like object
:param schema: dataframe schema, defaults to None
:return: the IbisDataFrame
"""
return self.sql_engine.to_df(df, schema=schema)
def _to_non_ibis_dataframe(
self, df: Any, schema: Any = None
) -> DataFrame: # pragma: no cover
"""Create ``DataFrame`` for map operations
from the dataframe like input
:param df: dataframe like object
:param schema: dataframe schema, defaults to None
:return: the DataFrame
"""
return self.non_ibis_engine.to_df(df, schema)