from typing import Any, Dict, Iterable, List, Optional, Tuple
import pandas as pd
import pyarrow as pa
from triad import assert_or_throw
from triad.collections.schema import Schema
from triad.utils.pandas_like import PD_UTILS
from triad.utils.pyarrow import pa_batch_to_dicts
from fugue.dataset.api import (
as_fugue_dataset,
as_local,
as_local_bounded,
count,
get_num_partitions,
is_bounded,
is_empty,
is_local,
)
from fugue.exceptions import FugueDataFrameOperationError
from .api import (
as_array,
as_array_iterable,
as_dict_iterable,
as_dicts,
drop_columns,
get_column_names,
get_schema,
head,
is_df,
rename,
select_columns,
)
from .dataframe import DataFrame, LocalBoundedDataFrame, _input_schema
[docs]
class PandasDataFrame(LocalBoundedDataFrame):
"""DataFrame that wraps pandas DataFrame. Please also read
|DataFrameTutorial| to understand this Fugue concept
:param df: 2-dimensional array, iterable of arrays or pandas DataFrame
:param schema: |SchemaLikeObject|
:param pandas_df_wrapper: if this is a simple wrapper, default False
.. admonition:: Examples
>>> PandasDataFrame([[0,'a'],[1,'b']],"a:int,b:str")
>>> PandasDataFrame(schema = "a:int,b:int") # empty dataframe
>>> PandasDataFrame(pd.DataFrame([[0]],columns=["a"]))
>>> PandasDataFrame(ArrayDataFrame([[0]],"a:int).as_pandas())
.. note::
If ``pandas_df_wrapper`` is True, then the constructor will not do any type
check otherwise, it will enforce type according to the input schema after
the construction
"""
def __init__( # noqa: C901
self,
df: Any = None,
schema: Any = None,
pandas_df_wrapper: bool = False,
):
apply_schema = True
if df is None:
schema = _input_schema(schema).assert_not_empty()
pdf = schema.create_empty_pandas_df(
use_extension_types=True, use_arrow_dtype=False
)
apply_schema = False
elif isinstance(df, PandasDataFrame):
# TODO: This is useless if in this way and wrong
pdf = df.native
schema = None
elif isinstance(df, (pd.DataFrame, pd.Series)):
if isinstance(df, pd.Series):
df = df.to_frame()
pdf = df
schema = None if schema is None else _input_schema(schema)
if pandas_df_wrapper and schema is not None:
apply_schema = False
elif isinstance(df, Iterable):
schema = _input_schema(schema).assert_not_empty()
pdf = pd.DataFrame(df, columns=schema.names)
pdf = PD_UTILS.cast_df(
pdf, schema.pa_schema, use_extension_types=True, use_arrow_dtype=False
)
apply_schema = False
else:
raise ValueError(f"{df} is incompatible with PandasDataFrame")
if apply_schema:
pdf, schema = self._apply_schema(pdf, schema)
super().__init__(schema)
self._native = pdf
@property
def native(self) -> pd.DataFrame:
"""Pandas DataFrame"""
return self._native
[docs]
def native_as_df(self) -> pd.DataFrame:
return self._native
@property
def empty(self) -> bool:
return self.native.empty
[docs]
def peek_array(self) -> List[Any]:
self.assert_not_empty()
return self.native.iloc[0].values.tolist()
[docs]
def count(self) -> int:
return self.native.shape[0]
[docs]
def as_pandas(self) -> pd.DataFrame:
return self._native
def _drop_cols(self, cols: List[str]) -> DataFrame:
cols = (self.schema - cols).names
return self._select_cols(cols)
def _select_cols(self, cols: List[Any]) -> DataFrame:
schema = self.schema.extract(cols)
return PandasDataFrame(
self.native[schema.names], schema, pandas_df_wrapper=True
)
[docs]
def rename(self, columns: Dict[str, str]) -> DataFrame:
try:
schema = self.schema.rename(columns)
except Exception as e:
raise FugueDataFrameOperationError from e
df = self.native.rename(columns=columns)
return PandasDataFrame(df, schema, pandas_df_wrapper=True)
[docs]
def alter_columns(self, columns: Any) -> DataFrame:
new_schema = self.schema.alter(columns)
if new_schema == self.schema:
return self
return PandasDataFrame(self.native, new_schema)
[docs]
def as_arrow(self, type_safe: bool = False) -> pa.Table:
return PD_UTILS.as_arrow(self.native, schema=self.schema.pa_schema)
[docs]
def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return list(self.as_array_iterable(columns, type_safe=type_safe))
[docs]
def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
for row in PD_UTILS.as_array_iterable(
self.native,
schema=self.schema.pa_schema,
columns=columns,
type_safe=type_safe,
):
yield row
[docs]
def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for block in _to_dicts(self.native, columns, self.schema):
res += block
return res
[docs]
def as_dict_iterable(
self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
for block in _to_dicts(self.native, columns, self.schema):
yield from block
[docs]
def head(
self, n: int, columns: Optional[List[str]] = None
) -> LocalBoundedDataFrame:
pdf = self.native if columns is None else self.native[columns]
schema = self.schema if columns is None else self.schema.extract(columns)
return PandasDataFrame(pdf.head(n), schema=schema, pandas_df_wrapper=True)
def _apply_schema(
self, pdf: pd.DataFrame, schema: Optional[Schema]
) -> Tuple[pd.DataFrame, Schema]:
PD_UTILS.ensure_compatible(pdf)
pschema = _input_schema(pdf)
if schema is None or pschema == schema:
return pdf, pschema.assert_not_empty()
pdf = pdf[schema.assert_not_empty().names]
return (
PD_UTILS.cast_df(
pdf, schema.pa_schema, use_extension_types=True, use_arrow_dtype=False
),
schema,
)
@as_local.candidate(lambda df: isinstance(df, pd.DataFrame))
def _pd_as_local(df: pd.DataFrame) -> pd.DataFrame:
return df
@as_local_bounded.candidate(lambda df: isinstance(df, pd.DataFrame))
def _pd_as_local_bounded(df: pd.DataFrame) -> pd.DataFrame:
return df
@as_fugue_dataset.candidate(lambda df, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_fugue_df(df: pd.DataFrame, **kwargs: Any) -> "PandasDataFrame":
return PandasDataFrame(df, **kwargs)
@is_df.candidate(lambda df: isinstance(df, pd.DataFrame))
def _pd_is_df(df: pd.DataFrame) -> bool:
return True
@count.candidate(lambda df: isinstance(df, pd.DataFrame))
def _pd_count(df: pd.DataFrame) -> int:
return df.shape[0]
@is_bounded.candidate(lambda df: isinstance(df, pd.DataFrame))
def _pd_is_bounded(df: pd.DataFrame) -> bool:
return True
@is_empty.candidate(lambda df: isinstance(df, pd.DataFrame))
def _pd_is_empty(df: pd.DataFrame) -> bool:
return df.shape[0] == 0
@is_local.candidate(lambda df: isinstance(df, pd.DataFrame))
def _pd_is_local(df: pd.DataFrame) -> bool:
return True
@get_num_partitions.candidate(lambda df: isinstance(df, pd.DataFrame))
def _get_pandas_num_partitions(df: pd.DataFrame) -> int:
return 1
@get_column_names.candidate(lambda df: isinstance(df, pd.DataFrame))
def _get_pandas_dataframe_columns(df: pd.DataFrame) -> List[Any]:
return list(df.columns)
@get_schema.candidate(lambda df: isinstance(df, pd.DataFrame))
def _get_pandas_dataframe_schema(df: pd.DataFrame) -> Schema:
return Schema(df)
@rename.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _rename_pandas_dataframe(
df: pd.DataFrame, columns: Dict[str, Any], as_fugue: bool = False
) -> Any:
if len(columns) == 0:
return df
_assert_no_missing(df, columns.keys())
return _adjust_df(df.rename(columns=columns), as_fugue=as_fugue)
@drop_columns.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _drop_pd_columns(
df: pd.DataFrame, columns: List[str], as_fugue: bool = False
) -> Any:
cols = [x for x in df.columns if x not in columns]
if len(cols) == 0:
raise FugueDataFrameOperationError("cannot drop all columns")
if len(cols) + len(columns) != len(df.columns):
_assert_no_missing(df, columns)
return _adjust_df(df[cols], as_fugue=as_fugue)
@select_columns.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _select_pd_columns(
df: pd.DataFrame, columns: List[Any], as_fugue: bool = False
) -> Any:
if len(columns) == 0:
raise FugueDataFrameOperationError("must select at least one column")
_assert_no_missing(df, columns)
return _adjust_df(df[columns], as_fugue=as_fugue)
@head.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_head(
df: pd.DataFrame,
n: int,
columns: Optional[List[str]] = None,
as_fugue: bool = False,
) -> pd.DataFrame:
if columns is not None:
df = df[columns]
return _adjust_df(df.head(n), as_fugue=as_fugue)
@as_array.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_array(
df: pd.DataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return list(_pd_as_array_iterable(df, columns, type_safe=type_safe))
@as_array_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_array_iterable(
df: pd.DataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
for row in PD_UTILS.as_array_iterable(
df,
columns=columns,
type_safe=type_safe,
):
yield row
@as_dicts.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_dicts(
df: pd.DataFrame, columns: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for block in _to_dicts(df, columns):
res += block
return res
@as_dict_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_dict_iterable(
df: pa.Table, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
for block in _to_dicts(df, columns):
yield from block
def _adjust_df(res: pd.DataFrame, as_fugue: bool):
return res if not as_fugue else PandasDataFrame(res)
def _assert_no_missing(df: pd.DataFrame, columns: Iterable[Any]) -> None:
missing = [x for x in columns if x not in df.columns]
if len(missing) > 0:
raise FugueDataFrameOperationError("found nonexistent columns: {missing}")
def _to_dicts(
df: pd.DataFrame,
columns: Optional[List[str]] = None,
schema: Optional[Schema] = None,
) -> Iterable[List[Dict[str, Any]]]:
cols = list(df.columns) if columns is None else columns
assert_or_throw(len(cols) > 0, ValueError("columns cannot be empty"))
pa_schema = schema.extract(cols).pa_schema if schema is not None else None
adf = PD_UTILS.as_arrow(df[cols], schema=pa_schema)
for batch in adf.to_batches():
if batch.num_rows > 0:
yield pa_batch_to_dicts(batch)