Source code for fugue.dataframe.arrow_dataframe

from typing import Any, Dict, Iterable, List, Optional

import pandas as pd
import pyarrow as pa
from triad.collections.schema import Schema
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_or_throw
from triad.utils.pyarrow import cast_pa_table, pa_table_to_pandas

from fugue.dataset.api import (
    as_fugue_dataset,
    as_local,
    as_local_bounded,
    count,
    get_num_partitions,
    is_bounded,
    is_empty,
    is_local,
)
from fugue.exceptions import FugueDataFrameOperationError

from .api import (
    alter_columns,
    as_array,
    as_array_iterable,
    as_dict_iterable,
    as_dicts,
    as_pandas,
    drop_columns,
    get_column_names,
    get_schema,
    is_df,
    rename,
    select_columns,
)
from .dataframe import DataFrame, LocalBoundedDataFrame, _input_schema
from .utils import (
    pa_table_as_array,
    pa_table_as_array_iterable,
    pa_table_as_dict_iterable,
    pa_table_as_dicts,
)


class ArrowDataFrame(LocalBoundedDataFrame):
    """DataFrame that wraps :func:`pyarrow.Table <pa:pyarrow.table>`. Please also read
    |DataFrameTutorial| to understand this Fugue concept

    :param df: 2-dimensional array, iterable of arrays,
      :func:`pyarrow.Table <pa:pyarrow.table>` or pandas DataFrame
    :param schema: |SchemaLikeObject|

    .. admonition:: Examples

        >>> ArrowDataFrame([[0,'a'],[1,'b']],"a:int,b:str")
        >>> ArrowDataFrame(schema = "a:int,b:int")  # empty dataframe
        >>> ArrowDataFrame(pd.DataFrame([[0]],columns=["a"]))
        >>> ArrowDataFrame(ArrayDataFrame([[0]],"a:int").as_arrow())
    """

    def __init__(  # noqa: C901
        self,
        df: Any = None,
        schema: Any = None,
    ):
        if df is None:
            schema = _input_schema(schema).assert_not_empty()
            self._native: pa.Table = schema.create_empty_arrow_table()
            super().__init__(schema)
            return
        elif isinstance(df, pa.Table):
            assert_or_throw(
                schema is None,
                InvalidOperationError("can't reset schema for pa.Table"),
            )
            self._native = df
            super().__init__(Schema(df.schema))
            return
        elif isinstance(df, (pd.DataFrame, pd.Series)):
            if isinstance(df, pd.Series):
                df = df.to_frame()
            pdf = df
            if schema is None:
                self._native = pa.Table.from_pandas(
                    pdf,
                    schema=Schema(pdf).pa_schema,
                    preserve_index=False,
                    safe=True,
                )
                schema = Schema(self._native.schema)
            else:
                schema = _input_schema(schema).assert_not_empty()
                if pdf.shape[0] == 0:
                    self._native = schema.create_empty_arrow_table()
                else:
                    self._native = pa.Table.from_pandas(
                        pdf,
                        schema=schema.pa_schema,
                        preserve_index=False,
                        safe=True,
                    )
            super().__init__(schema)
            return
        elif isinstance(df, Iterable):
            schema = _input_schema(schema).assert_not_empty()
            pdf = pd.DataFrame(df, columns=schema.names)
            if pdf.shape[0] == 0:
                self._native = schema.create_empty_arrow_table()
            else:
                for f in schema.fields:
                    if pa.types.is_timestamp(f.type) or pa.types.is_date(f.type):
                        pdf[f.name] = pd.to_datetime(pdf[f.name])
                schema = _input_schema(schema).assert_not_empty()
                self._native = pa.Table.from_pandas(
                    pdf, schema=schema.pa_schema, preserve_index=False, safe=True
                )
            super().__init__(schema)
            return
        else:
            raise ValueError(f"{df} is incompatible with ArrowDataFrame")

    @property
    def native(self) -> pa.Table:
        """:func:`pyarrow.Table <pa:pyarrow.table>`"""
        return self._native

    def native_as_df(self) -> pa.Table:
        return self._native

    @property
    def empty(self) -> bool:
        return self.count() == 0

    def peek_array(self) -> List[Any]:
        self.assert_not_empty()
        data = self.native.take([0]).to_pydict()
        return [v[0] for v in data.values()]

    def peek_dict(self) -> Dict[str, Any]:
        self.assert_not_empty()
        data = self.native.take([0]).to_pydict()
        return {k: v[0] for k, v in data.items()}

    def count(self) -> int:
        return self.native.shape[0]

    def as_pandas(self) -> pd.DataFrame:
        return _pa_table_as_pandas(self.native)

    def head(
        self, n: int, columns: Optional[List[str]] = None
    ) -> LocalBoundedDataFrame:
        adf = self.native if columns is None else self.native.select(columns)
        n = min(n, self.count())
        if n == 0:
            schema = self.schema if columns is None else self.schema.extract(columns)
            return ArrowDataFrame(None, schema=schema)
        return ArrowDataFrame(adf.take(list(range(n))))

    def _drop_cols(self, cols: List[str]) -> DataFrame:
        return ArrowDataFrame(self.native.drop(cols))

    def _select_cols(self, keys: List[Any]) -> DataFrame:
        return ArrowDataFrame(self.native.select(keys))

    def rename(self, columns: Dict[str, str]) -> DataFrame:
        try:
            cols = dict(columns)
            new_cols = [cols.pop(c, c) for c in self.columns]
            assert_or_throw(len(cols) == 0)
        except Exception as e:
            raise FugueDataFrameOperationError from e
        return ArrowDataFrame(self.native.rename_columns(new_cols))

    def alter_columns(self, columns: Any) -> DataFrame:
        adf = _pa_table_alter_columns(self.native, columns)
        if adf is self.native:
            return self
        return ArrowDataFrame(adf)

    def as_arrow(self, type_safe: bool = False) -> pa.Table:
        return self.native

    def as_array(
        self, columns: Optional[List[str]] = None, type_safe: bool = False
    ) -> List[Any]:
        return pa_table_as_array(self.native, columns=columns)

    def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        return pa_table_as_dicts(self.native, columns=columns)

    def as_array_iterable(
        self, columns: Optional[List[str]] = None, type_safe: bool = False
    ) -> Iterable[Any]:
        yield from pa_table_as_array_iterable(self.native, columns=columns)

    def as_dict_iterable(
        self, columns: Optional[List[str]] = None
    ) -> Iterable[Dict[str, Any]]:
        yield from pa_table_as_dict_iterable(self.native, columns=columns)
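
For reference, here is a minimal usage sketch of the class above. It is separate from the module source, and the values shown in the comments are illustrative:

    import pandas as pd

    from fugue.dataframe.arrow_dataframe import ArrowDataFrame

    # build from rows plus a schema expression
    df = ArrowDataFrame([[0, "a"], [1, "b"]], "a:int,b:str")
    assert df.count() == 2 and not df.empty
    print(df.peek_dict())         # {'a': 0, 'b': 'a'}
    print(df.head(1).as_array())  # [[0, 'a']]

    # build from pandas; the schema is inferred from the dtypes when not given
    pdf = ArrowDataFrame(pd.DataFrame([[0]], columns=["a"]))
    print(pdf.schema)             # a:long

The module-level functions below register pa.Table handlers for Fugue's general-purpose dataframe and dataset functions, so the same operations also work on a bare pyarrow.Table.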

@as_local.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_as_local(df: pa.Table) -> pa.Table:
    return df


@as_local_bounded.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_as_local_bounded(df: pa.Table) -> pa.Table:
    return df


@as_pandas.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_as_pandas(df: pa.Table) -> pd.DataFrame:
    return pa_table_to_pandas(
        df,
        use_extension_types=True,
        use_arrow_dtype=False,
        use_threads=False,
        date_as_object=False,
    )


@as_array.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_array(
    df: pa.Table, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
    return pa_table_as_array(df, columns=columns)


@as_array_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_array_iterable(
    df: pa.Table, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
    yield from pa_table_as_array_iterable(df, columns=columns)


@as_dicts.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_dicts(
    df: pa.Table, columns: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    return pa_table_as_dicts(df, columns=columns)


@as_dict_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_dict_iterable(
    df: pa.Table, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
    yield from pa_table_as_dict_iterable(df, columns=columns)


@alter_columns.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_alter_columns(
    df: pa.Table, columns: Any, as_fugue: bool = False
) -> pa.Table:
    schema = Schema(df.schema)
    new_schema = schema.alter(columns)
    if schema != new_schema:
        df = cast_pa_table(df, new_schema.pa_schema)
    return df if not as_fugue else ArrowDataFrame(df)


@as_fugue_dataset.candidate(lambda df, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_fugue_df(df: pa.Table, **kwargs: Any) -> "ArrowDataFrame":
    return ArrowDataFrame(df, **kwargs)


@is_df.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_is_df(df: pa.Table) -> bool:
    return True


@count.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_count(df: pa.Table) -> int:
    return df.shape[0]


@is_bounded.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_is_bounded(df: pa.Table) -> bool:
    return True


@is_empty.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_is_empty(df: pa.Table) -> bool:
    return df.shape[0] == 0


@is_local.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_is_local(df: pa.Table) -> bool:
    return True


@get_num_partitions.candidate(lambda df: isinstance(df, pa.Table))
def _pa_table_get_num_partitions(df: pa.Table) -> int:
    return 1


@get_column_names.candidate(lambda df: isinstance(df, pa.Table))
def _get_pyarrow_table_columns(df: pa.Table) -> List[Any]:
    return [f.name for f in df.schema]


@get_schema.candidate(lambda df: isinstance(df, pa.Table))
def _get_pyarrow_table_schema(df: pa.Table) -> Schema:
    return Schema(df.schema)


@rename.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _rename_pyarrow_dataframe(df: pa.Table, columns: Dict[str, Any]) -> pa.Table:
    if len(columns) == 0:
        return df
    _assert_no_missing(df, columns.keys())
    return df.rename_columns([columns.get(f.name, f.name) for f in df.schema])


@drop_columns.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _drop_pa_columns(df: pa.Table, columns: List[str]) -> pa.Table:
    cols = [x for x in df.schema.names if x not in columns]
    if len(cols) == 0:
        raise FugueDataFrameOperationError("cannot drop all columns")
    if len(cols) + len(columns) != len(df.columns):
        _assert_no_missing(df, columns)
    return df.select(cols)


@select_columns.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _select_pa_columns(df: pa.Table, columns: List[Any]) -> pa.Table:
    if len(columns) == 0:
        raise FugueDataFrameOperationError("must select at least one column")
    _assert_no_missing(df, columns=columns)
    return df.select(columns)


def _build_empty_arrow(schema: Schema) -> pa.Table:  # pragma: no cover
    # TODO: remove
    return schema.create_empty_arrow_table()


def _assert_no_missing(df: pa.Table, columns: Iterable[Any]) -> None:
    missing = [x for x in columns if x not in df.schema.names]
    if len(missing) > 0:
        raise FugueDataFrameOperationError(f"found nonexistent columns: {missing}")
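
Thanks to those registrations, a sketch like the following should work directly on a pyarrow.Table without wrapping it in ArrowDataFrame first; again, the printed values in the comments are illustrative:

    import pyarrow as pa

    from fugue.dataframe.api import as_pandas, get_column_names, get_schema
    from fugue.dataset.api import count, is_bounded, is_empty

    tbl = pa.table({"a": [1, 2], "b": ["x", "y"]})

    print(get_schema(tbl))        # a:long,b:str
    print(get_column_names(tbl))  # ['a', 'b']
    print(count(tbl))             # 2
    print(is_bounded(tbl))        # True
    print(is_empty(tbl))          # False
    print(as_pandas(tbl).shape)   # (2, 2)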