Source code for fugue.dataframe.array_dataframe

from typing import Any, Dict, Iterable, List, Optional

from fugue.dataframe.dataframe import (
    DataFrame,
    LocalBoundedDataFrame,
    _get_schema_change,
    as_fugue_dataset,
)
from fugue.exceptions import FugueDataFrameOperationError
from triad.utils.assertion import assert_or_throw
from triad.utils.pyarrow import apply_schema


class ArrayDataFrame(LocalBoundedDataFrame):
    """Local, bounded dataframe backed by a plain python list of rows.

    Please read |DataFrameTutorial| to understand the concept

    :param df: 2-dimensional array, iterable of arrays, or
      :class:`~fugue.dataframe.dataframe.DataFrame`
    :param schema: |SchemaLikeObject|

    .. admonition:: Examples

        >>> a = ArrayDataFrame([[0,'a'],[1,'b']],"a:int,b:str")
        >>> b = ArrayDataFrame(a)
    """

    def __init__(self, df: Any = None, schema: Any = None):  # noqa: C901
        # Case 1: nothing given -> empty frame carrying only the schema.
        if df is None:
            super().__init__(schema)
            self._native = []
            return

        # Case 2: another Fugue dataframe -> pull its rows out as an array,
        # optionally restricted to the columns named by ``schema``.
        if isinstance(df, DataFrame):
            if schema is not None:
                schema, _ = _get_schema_change(df.schema, schema)
                super().__init__(schema)
                self._native = df.as_array(schema.names, type_safe=False)
            else:
                super().__init__(df.schema)
                self._native = df.as_array(type_safe=False)
            return

        # Case 3: anything that is not iterable cannot be wrapped.
        if not isinstance(df, Iterable):
            raise ValueError(f"{df} is incompatible with ArrayDataFrame")

        # Case 4: a generic iterable. A list is kept by reference (no copy);
        # any other iterable is materialized into a new list.
        super().__init__(schema)
        self._native = df if isinstance(df, List) else list(df)

    @property
    def native(self) -> List[Any]:
        """2-dimensional native python array"""
        return self._native

    @property
    def empty(self) -> bool:
        """Whether :meth:`count` reports zero rows."""
        row_count = self.count()
        return row_count == 0

    def peek_array(self) -> List[Any]:
        """Return a list copy of the first row.

        ``self.assert_not_empty()`` is called first, so whatever it raises
        for an empty dataframe propagates to the caller.
        """
        self.assert_not_empty()
        first_row = self.native[0]
        return list(first_row)

    def count(self) -> int:
        """Return the number of rows held in :attr:`native`."""
        rows = self.native
        return len(rows)

    def _drop_cols(self, cols: List[str]) -> DataFrame:
        """Build a new dataframe whose schema is this schema minus ``cols``."""
        remaining = self.schema - cols
        return ArrayDataFrame(self, remaining)

    def _select_cols(self, keys: List[Any]) -> DataFrame:
        """Build a new dataframe whose schema is extracted by ``keys``."""
        selected = self.schema.extract(keys)
        return ArrayDataFrame(self, selected)

    def rename(self, columns: Dict[str, str]) -> DataFrame:
        """Return a dataframe over the same rows with renamed columns.

        :param columns: mapping passed to ``self.schema.rename``
        :raises FugueDataFrameOperationError: if renaming the schema fails
          for any reason (the original error is chained as the cause)
        :return: a new :class:`ArrayDataFrame` wrapping the same native rows
        """
        try:
            renamed = self.schema.rename(columns)
        except Exception as e:
            raise FugueDataFrameOperationError from e
        else:
            return ArrayDataFrame(self.native, renamed)

    def alter_columns(self, columns: Any) -> DataFrame:
        """Return a dataframe whose column types follow ``columns``.

        :param columns: passed to ``self.schema.alter``
        :return: ``self`` when the altered schema equals the current one,
          otherwise a new :class:`ArrayDataFrame` built from the type safe
          array produced under the altered schema
        """
        target = self.schema.alter(columns)
        if target == self.schema:
            return self
        # Re-wrap the existing rows under the target schema, then let the
        # type safe conversion produce the rows for the final dataframe.
        reinterpreted = ArrayDataFrame(self.native, target)
        converted = reinterpreted.as_array(type_safe=True)
        return ArrayDataFrame(converted, target)

    def as_array(
        self, columns: Optional[List[str]] = None, type_safe: bool = False
    ) -> List[Any]:
        """Return the rows as a list.

        :param columns: columns to keep, ``None`` for all columns
        :param type_safe: whether to run rows through the schema conversion
        :return: the underlying native list itself (not a copy) when no
          columns are requested and ``type_safe`` is false, otherwise a new
          list built from :meth:`as_array_iterable`
        """
        if columns is None and not type_safe:
            return self.native
        row_iter = self.as_array_iterable(columns, type_safe=type_safe)
        return list(row_iter)

    def as_array_iterable(
        self, columns: Optional[List[str]] = None, type_safe: bool = False
    ) -> Iterable[Any]:
        """Lazily yield rows, optionally projected and schema-converted.

        This is a generator: nothing (including the column validation)
        runs until iteration starts.

        :param columns: columns to keep, ``None`` for all columns; when set
          it must not be empty
        :param type_safe: when true, rows are passed through
          ``apply_schema`` using the pyarrow schema of the selected columns
        """
        positions: List[int] = []
        if columns is not None:
            positions = [self.schema.index_of_key(k) for k in columns]
            assert_or_throw(len(positions) > 0, "columns if set must be non empty")

        if not type_safe:
            yield from self._iter_cols(positions)
            return

        if columns is None:
            sub = self.schema
        else:
            sub = self.schema.extract(columns)
        yield from apply_schema(
            sub.pa_schema,
            self._iter_cols(positions),
            copy=True,
            deep=True,
            str_as_json=True,
        )

    def head(
        self, n: int, columns: Optional[List[str]] = None
    ) -> LocalBoundedDataFrame:
        """Return a dataframe over the ``self.native[:n]`` slice.

        :param n: slice bound applied to the native rows
        :param columns: if given, the result is indexed by these columns
        """
        top = ArrayDataFrame(self.native[:n], self.schema)
        if columns is None:
            return top
        return top[columns]  # type: ignore

    def _iter_cols(self, pos: List[int]) -> Iterable[List[Any]]:
        """Yield rows, projected onto the positions in ``pos``.

        An empty ``pos`` means "all columns": the original row objects are
        yielded as is. Otherwise a new list is built for every row.
        """
        if len(pos) > 0:
            for row in self.native:
                yield [row[idx] for idx in pos]
        else:
            yield from self.native
@as_fugue_dataset.candidate(
    lambda df, **kwargs: isinstance(df, list),
    priority=0.9,
)
def _arr_to_fugue(df: List[Any], **kwargs: Any) -> ArrayDataFrame:
    """Wrap a python list as an :class:`ArrayDataFrame`.

    Registered as an ``as_fugue_dataset`` candidate that matches when the
    input is a ``list``; all keyword arguments are forwarded to the
    :class:`ArrayDataFrame` constructor.
    """
    wrapped = ArrayDataFrame(df, **kwargs)
    return wrapped