import json
from abc import abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union
import pandas as pd
import pyarrow as pa
from triad import SerializableRLock
from triad.collections.schema import Schema
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_or_throw
from triad.utils.pandas_like import PD_UTILS
from triad.utils.pyarrow import cast_pa_table
from .._utils.display import PrettyTable
from ..collections.yielded import Yielded
from ..dataset import (
Dataset,
DatasetDisplay,
as_fugue_dataset,
as_local,
as_local_bounded,
get_dataset_display,
)
from ..exceptions import FugueDataFrameOperationError
AnyDataFrame = TypeVar("AnyDataFrame", "DataFrame", object)
[docs]
class DataFrame(Dataset):
"""Base class of Fugue DataFrame. Please read
|DataFrameTutorial| to understand the concept
:param schema: |SchemaLikeObject|
.. note::
This is an abstract class, and normally you don't construct it by yourself
unless you are
implementing a new :class:`~fugue.execution.execution_engine.ExecutionEngine`
"""
def __init__(self, schema: Any = None):
super().__init__()
if not callable(schema):
schema = _input_schema(schema).assert_not_empty()
schema.set_readonly()
self._schema: Union[Schema, Callable[[], Schema]] = schema
self._schema_discovered = True
else:
self._schema: Union[Schema, Callable[[], Schema]] = schema # type: ignore
self._schema_discovered = False
self._lazy_schema_lock = SerializableRLock()
@property
def schema(self) -> Schema:
"""The schema of the dataframe"""
if self.schema_discovered:
# we must keep it simple because it could be called on every row by a user
assert isinstance(self._schema, Schema)
return self._schema # type: ignore
with self._lazy_schema_lock:
self._schema = _input_schema(
self._schema()
).assert_not_empty() # type: ignore
self._schema.set_readonly()
self._schema_discovered = True
return self._schema
@property
def schema_discovered(self) -> Schema:
"""Whether the schema has been discovered or still a lambda"""
return self._schema_discovered
@property
def columns(self) -> List[str]:
"""The column names of the dataframe"""
return self.schema.names
[docs]
@abstractmethod
def native_as_df(self) -> AnyDataFrame: # pragma: no cover
"""The dataframe form of the native object this Dataset class wraps.
Dataframe form means the object contains schema information. For example
the native an ArrayDataFrame is a python array, it doesn't contain schema
information, and its ``native_as_df`` should be either a pandas dataframe
or an arrow dataframe.
"""
raise NotImplementedError
[docs]
def as_local(self) -> "LocalDataFrame": # pragma: no cover
"""Convert this dataframe to a :class:`.LocalDataFrame`"""
return self.as_local_bounded()
[docs]
@abstractmethod
def as_local_bounded(self) -> "LocalBoundedDataFrame": # pragma: no cover
"""Convert this dataframe to a :class:`.LocalBoundedDataFrame`"""
raise NotImplementedError
[docs]
@abstractmethod
def peek_array(self) -> List[Any]: # pragma: no cover
"""Peek the first row of the dataframe as array
:raises FugueDatasetEmptyError: if it is empty
"""
raise NotImplementedError
[docs]
def peek_dict(self) -> Dict[str, Any]:
"""Peek the first row of the dataframe as dict
:raises FugueDatasetEmptyError: if it is empty
"""
arr = self.peek_array()
return {self.columns[i]: arr[i] for i in range(len(self.columns))}
[docs]
def as_pandas(self) -> pd.DataFrame:
"""Convert to pandas DataFrame"""
pdf = pd.DataFrame(self.as_array(), columns=self.columns)
return PD_UTILS.cast_df(
pdf, self.schema.pa_schema, use_extension_types=True, use_arrow_dtype=False
)
[docs]
def as_arrow(self, type_safe: bool = False) -> pa.Table:
"""Convert to pyArrow DataFrame"""
pdf = pd.DataFrame(self.as_array(), columns=self.columns)
adf = pa.Table.from_pandas(
pdf,
preserve_index=False,
safe=type_safe,
)
return cast_pa_table(adf, self.schema.pa_schema)
[docs]
@abstractmethod
def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]: # pragma: no cover
"""Convert to 2-dimensional native python array
:param columns: columns to extract, defaults to None
:param type_safe: whether to ensure output conforms with its schema,
defaults to False
:return: 2-dimensional native python array
.. note::
If ``type_safe`` is False, then the returned values are 'raw' values.
"""
raise NotImplementedError
[docs]
@abstractmethod
def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]: # pragma: no cover
"""Convert to iterable of native python arrays
:param columns: columns to extract, defaults to None
:param type_safe: whether to ensure output conforms with its schema,
defaults to False
:return: iterable of native python arrays
.. note::
If ``type_safe`` is False, then the returned values are 'raw' values.
"""
raise NotImplementedError
@abstractmethod
def _drop_cols(self, cols: List[str]) -> "DataFrame": # pragma: no cover
raise NotImplementedError
[docs]
@abstractmethod
def rename(self, columns: Dict[str, str]) -> "DataFrame": # pragma: no cover
"""Rename the dataframe using a mapping dict
:param columns: key: the original column name, value: the new name
:return: a new dataframe with the new names
"""
raise NotImplementedError
[docs]
@abstractmethod
def alter_columns(self, columns: Any) -> "DataFrame": # pragma: no cover
"""Change column types
:param columns: |SchemaLikeObject|,
all columns should be contained by the dataframe schema
:return: a new dataframe with altered columns, the order of the
original schema will not change
"""
raise NotImplementedError
@abstractmethod
def _select_cols(self, cols: List[Any]) -> "DataFrame": # pragma: no cover
raise NotImplementedError
[docs]
def drop(self, columns: List[str]) -> "DataFrame":
"""Drop certain columns and return a new dataframe
:param columns: columns to drop
:raises FugueDataFrameOperationError: if
``columns`` are not strictly contained by this dataframe, or it is the
entire dataframe columns
:return: a new dataframe removing the columns
"""
try:
schema = self.schema - columns
except Exception as e:
raise FugueDataFrameOperationError from e
if len(schema) == 0:
raise FugueDataFrameOperationError(
"can't remove all columns of a dataframe"
)
return self._drop_cols(columns)
def __getitem__(self, columns: List[Any]) -> "DataFrame":
"""Get certain columns of the dataframe as a new dataframe
:raises FugueDataFrameOperationError: if ``columns`` are not strictly contained
by this dataframe or it is empty
:return: a new dataframe with these columns
"""
try:
schema = self.schema.extract(columns)
except Exception as e:
raise FugueDataFrameOperationError from e
if len(schema) == 0:
raise FugueDataFrameOperationError("must select at least one column")
return self._select_cols(columns)
[docs]
@abstractmethod
def head(
self, n: int, columns: Optional[List[str]] = None
) -> "LocalBoundedDataFrame": # pragma: no cover
"""Get first n rows of the dataframe as a new local bounded dataframe
:param n: number of rows
:param columns: selected columns, defaults to None (all columns)
:return: a local bounded dataframe
"""
raise NotImplementedError
[docs]
def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Convert to a list of python dicts
:param columns: columns to extract, defaults to None
:return: a list of python dicts
.. note::
The default implementation enforces ``type_safe`` True
"""
if columns is None:
columns = self.columns
idx = range(len(columns))
return [
{columns[i]: x[i] for i in idx}
for x in self.as_array(columns, type_safe=True)
]
[docs]
def as_dict_iterable(
self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
"""Convert to iterable of python dicts
:param columns: columns to extract, defaults to None
:return: iterable of python dicts
.. note::
The default implementation enforces ``type_safe`` True
"""
if columns is None:
columns = self.columns
idx = range(len(columns))
for x in self.as_array_iterable(columns, type_safe=True):
yield {columns[i]: x[i] for i in idx}
[docs]
def get_info_str(self) -> str:
"""Get dataframe information (schema, type, metadata) as json string
:return: json string
"""
return json.dumps(
{
"schema": str(self.schema),
"type": "{}.{}".format(
self.__class__.__module__, self.__class__.__name__
),
"metadata": self.metadata if self.has_metadata else {},
}
)
def __copy__(self) -> "DataFrame":
return self
def __deepcopy__(self, memo: Any) -> "DataFrame":
return self
def _get_altered_schema(self, subschema: Any) -> Schema: # pragma: no cover
# TODO: remove
return self.schema.alter(subschema)
[docs]
class LocalDataFrame(DataFrame):
"""Base class of all local dataframes. Please read
:ref:`this <tutorial:tutorials/advanced/schema_dataframes:dataframe>`
to understand the concept
:param schema: a `schema-like <triad.collections.schema.Schema>`_ object
.. note::
This is an abstract class, and normally you don't construct it by yourself
unless you are
implementing a new :class:`~fugue.execution.execution_engine.ExecutionEngine`
"""
[docs]
def native_as_df(self) -> AnyDataFrame:
return self.as_pandas()
@property
def is_local(self) -> bool:
"""Always True because it's a LocalDataFrame"""
return True
@property
def num_partitions(self) -> int: # pragma: no cover
"""Always 1 because it's a LocalDataFrame"""
return 1
[docs]
class LocalBoundedDataFrame(LocalDataFrame):
"""Base class of all local bounded dataframes. Please read
:ref:`this <tutorial:tutorials/advanced/schema_dataframes:dataframe>`
to understand the concept
:param schema: |SchemaLikeObject|
.. note::
This is an abstract class, and normally you don't construct it by yourself
unless you are
implementing a new :class:`~fugue.execution.execution_engine.ExecutionEngine`
"""
@property
def is_bounded(self) -> bool:
"""Always True because it's a bounded dataframe"""
return True
[docs]
def as_local_bounded(self) -> "LocalBoundedDataFrame":
"""Always True because it's a bounded dataframe"""
return self
[docs]
class LocalUnboundedDataFrame(LocalDataFrame):
"""Base class of all local unbounded dataframes. Read
this <https://fugue-tutorials.readthedocs.io/
en/latest/tutorials/advanced/schema_dataframes.html#DataFrame>`_
to understand the concept
:param schema: |SchemaLikeObject|
.. note::
This is an abstract class, and normally you don't construct it by yourself
unless you are
implementing a new :class:`~fugue.execution.execution_engine.ExecutionEngine`
"""
@property
def is_bounded(self):
"""Always False because it's an unbounded dataframe"""
return False
[docs]
def as_local(self) -> "LocalDataFrame":
return self
[docs]
def count(self) -> int:
"""
:raises InvalidOperationError: You can't count an unbounded dataframe
"""
raise InvalidOperationError("Impossible to count an LocalUnboundedDataFrame")
[docs]
class YieldedDataFrame(Yielded):
"""Yielded dataframe from :class:`~fugue.workflow.workflow.FugueWorkflow`.
Users shouldn't create this object directly.
:param yid: unique id for determinism
"""
def __init__(self, yid: str):
super().__init__(yid)
self._df: Any = None
@property
def is_set(self) -> bool:
return self._df is not None
[docs]
def set_value(self, df: DataFrame) -> None:
"""Set the yielded dataframe after compute. Users should not
call it.
:param path: file path
"""
self._df = df
@property
def result(self) -> DataFrame:
"""The yielded dataframe, it will be set after the parent
workflow is computed
"""
assert_or_throw(self.is_set, "value is not set")
return self._df
[docs]
class DataFrameDisplay(DatasetDisplay):
""":class:`~.DataFrame` plain display class"""
@property
def df(self) -> DataFrame:
"""The target :class:`~.DataFrame`"""
return self._ds # type: ignore
[docs]
def show(
self, n: int = 10, with_count: bool = False, title: Optional[str] = None
) -> None:
best_width = 100
head_rows = self.df.head(n).as_array(type_safe=True)
if len(head_rows) < n:
count = len(head_rows)
else:
count = self.df.count() if with_count else -1
with DatasetDisplay._SHOW_LOCK:
if title is not None and title != "":
print(title)
print(type(self.df).__name__)
tb = PrettyTable(self.df.schema, head_rows, best_width)
print("\n".join(tb.to_string()))
if count >= 0:
print(f"Total count: {count}")
print("")
if self.df.has_metadata:
print("Metadata:")
try:
# try pretty print, but if not convertible to json, print original
print(self.df.metadata.to_json(indent=True))
except Exception: # pragma: no cover
print(self.df.metadata)
print("")
[docs]
def as_fugue_df(df: AnyDataFrame, **kwargs: Any) -> DataFrame:
"""Wrap the object as a Fugue DataFrame.
:param df: the object to wrap
"""
ds = as_fugue_dataset(df, **kwargs)
if isinstance(ds, DataFrame):
return ds
raise TypeError(f"{type(df)} {kwargs} is not recognized as a Fugue DataFrame: {ds}")
@get_dataset_display.candidate(lambda ds: isinstance(ds, DataFrame), priority=0.1)
def _get_dataframe_display(ds: DataFrame):
return DataFrameDisplay(ds)
@as_local.candidate(lambda df: isinstance(df, DataFrame))
def _df_to_local(df: DataFrame) -> LocalDataFrame:
return df.as_local()
@as_local_bounded.candidate(lambda df: isinstance(df, DataFrame))
def _df_to_local_bounded(df: DataFrame) -> LocalBoundedDataFrame:
return df.as_local_bounded()
def _get_schema_change(
orig_schema: Optional[Schema], schema: Any
) -> Tuple[Schema, List[int]]:
if orig_schema is None:
schema = _input_schema(schema).assert_not_empty()
return schema, []
elif schema is None:
return orig_schema.assert_not_empty(), []
if isinstance(schema, (str, Schema)) and orig_schema == schema:
return orig_schema.assert_not_empty(), []
if schema in orig_schema:
# keys list or schema like object that is a subset of orig
schema = orig_schema.extract(schema).assert_not_empty()
pos = [orig_schema.index_of_key(x) for x in schema.names]
if pos == list(range(len(orig_schema))):
pos = []
return schema, pos
# otherwise it has to be a schema like object that must be a subset
# of orig, and that has mismatched types
schema = _input_schema(schema).assert_not_empty()
pos = [orig_schema.index_of_key(x) for x in schema.names]
if pos == list(range(len(orig_schema))):
pos = []
return schema, pos
def _input_schema(schema: Any) -> Schema:
return schema if isinstance(schema, Schema) else Schema(schema)