Source code for fugue.dataframe.iterable_dataframe

from typing import Any, Dict, Iterable, List, Optional

from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.dataframe import (
from fugue.exceptions import FugueDataFrameOperationError
from triad.collections.schema import Schema
from triad.utils.iter import EmptyAwareIterable, make_empty_aware
from triad.utils.pyarrow import apply_schema

[docs] class IterableDataFrame(LocalUnboundedDataFrame): """DataFrame that wraps native python iterable of arrays. Please read |DataFrameTutorial| to understand the concept :param df: 2-dimensional array, iterable of arrays, or :class:`~fugue.dataframe.dataframe.DataFrame` :param schema: |SchemaLikeObject| .. admonition:: Examples >>> a = IterableDataFrame([[0,'a'],[1,'b']],"a:int,b:str") >>> b = IterableDataFrame(a) .. note:: It's ok to peek the dataframe, it will not affect the iteration, but it's invalid operation to count """ def __init__(self, df: Any = None, schema: Any = None): # noqa: C901 if df is None: idf: Iterable[Any] = [] orig_schema: Optional[Schema] = None elif isinstance(df, IterableDataFrame): idf = df.native orig_schema = df.schema elif isinstance(df, DataFrame): idf = df.as_array_iterable(type_safe=False) orig_schema = df.schema elif isinstance(df, Iterable): idf = df orig_schema = None else: raise ValueError(f"{df} is incompatible with IterableDataFrame") schema, pos = _get_schema_change(orig_schema, schema) super().__init__(schema) self._pos = pos self._native = make_empty_aware(self._preprocess(idf)) @property def native(self) -> EmptyAwareIterable[Any]: """Iterable of native python arrays""" return self._native @property def empty(self) -> bool: return self.native.empty
[docs] def peek_array(self) -> List[Any]: self.assert_not_empty() return list(self.native.peek())
[docs] def head( self, n: int, columns: Optional[List[str]] = None ) -> LocalBoundedDataFrame: res: List[Any] = [] for row in self.as_array_iterable(columns, type_safe=True): if n < 1: break res.append(list(row)) n -= 1 return ArrayDataFrame( res, self.schema if columns is None else self.schema.extract(columns) )
def _drop_cols(self, cols: List[str]) -> DataFrame: return IterableDataFrame(self, self.schema - cols) def _select_cols(self, keys: List[Any]) -> DataFrame: return IterableDataFrame(self, self.schema.extract(keys))
[docs] def rename(self, columns: Dict[str, str]) -> DataFrame: try: schema = self.schema.rename(columns) except Exception as e: raise FugueDataFrameOperationError from e return IterableDataFrame(self.native, schema)
[docs] def alter_columns(self, columns: Any) -> DataFrame: new_schema = self.schema.alter(columns) if new_schema == self.schema: return self return IterableDataFrame(self.native, new_schema)
[docs] def as_local_bounded(self) -> LocalBoundedDataFrame: return ArrayDataFrame(self.as_array(), schema=self.schema)
[docs] def as_array( self, columns: Optional[List[str]] = None, type_safe: bool = False ) -> List[Any]: return list(self.as_array_iterable(columns, type_safe=type_safe))
[docs] def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]: return list(self.as_dict_iterable(columns))
[docs] def as_array_iterable( self, columns: Optional[List[str]] = None, type_safe: bool = False ) -> Iterable[Any]: if columns is None: if not type_safe: for item in self.native: yield item else: for item in apply_schema( self.schema.pa_schema, self.native, copy=True, deep=True, str_as_json=True, ): yield item else: df = IterableDataFrame(self, self.schema.extract(columns)) for item in df.as_array_iterable(type_safe=type_safe): yield item
def _preprocess(self, raw_data: Iterable[Any]) -> Iterable[Any]: if len(self._pos) == 0: for x in raw_data: yield x else: for x in raw_data: yield [x[i] for i in self._pos]