Source code for fugue.dataframe.dataframe_iterable_dataframe

from typing import Any, Dict, Iterable, List, Optional

import pandas as pd
import pyarrow as pa
from triad import Schema, assert_or_throw
from triad.utils.iter import EmptyAwareIterable, make_empty_aware

from fugue.exceptions import FugueDataFrameInitError

from .array_dataframe import ArrayDataFrame
from .arrow_dataframe import ArrowDataFrame
from .dataframe import (
    DataFrame,
    LocalBoundedDataFrame,
    LocalDataFrame,
    LocalUnboundedDataFrame,
)
from .pandas_dataframe import PandasDataFrame


class LocalDataFrameIterableDataFrame(LocalUnboundedDataFrame):
    """DataFrame that wraps an iterable of local dataframes

    :param df: an iterable of
      :class:`~fugue.dataframe.dataframe.DataFrame`. If any is not local,
      they will be converted to
      :class:`~fugue.dataframe.dataframe.LocalDataFrame` by
      :meth:`~fugue.dataframe.dataframe.DataFrame.as_local`
    :param schema: |SchemaLikeObject|, if it is provided, it must match the
      schema of the dataframes

    .. admonition:: Examples

        .. code-block:: python

            def get_dfs():
                yield IterableDataFrame([], "a:int,b:int")
                yield IterableDataFrame([[1, 10]], "a:int,b:int")
                yield ArrayDataFrame([], "a:int,b:str")

            df = LocalDataFrameIterableDataFrame(get_dfs())
            for subdf in df.native:
                subdf.show()

    .. note::

        It's ok to peek the dataframe, it will not affect the iteration, but
        it's invalid to count.

        ``schema`` can be used when the iterable contains no dataframe. But if
        there is any dataframe, ``schema`` must match the schema of the
        dataframes.

        For the iterable of dataframes, if there is any empty dataframe, they
        will be skipped and their schema will not matter. However, if all
        dataframes in the iterable are empty, then the last empty dataframe
        will be used to set the schema.
    """

    def __init__(self, df: Any = None, schema: Any = None):  # noqa: C901
        """Wrap ``df`` and resolve the schema of the resulting dataframe.

        :param df: an iterable of dataframes
        :param schema: optional expected schema
        :raises ValueError: if ``df`` is not an iterable
        :raises FugueDataFrameInitError: if the iterable yields nothing and
          ``schema`` is not provided
        """
        if isinstance(df, Iterable):
            self._native = make_empty_aware(self._dfs_wrapper(df))
            orig_schema: Optional[Schema] = None
            if not self._native.empty:
                # Peeking does not consume the first dataframe.
                orig_schema = self._native.peek().schema
        else:
            raise ValueError(
                f"{df} is incompatible with LocalDataFrameIterableDataFrame"
            )
        if orig_schema is None and schema is None:
            raise FugueDataFrameInitError(
                "schema is not provided and the input is empty"
            )
        elif orig_schema is None and schema is not None:
            # Nothing to compare against: trust the provided schema.
            pass
        elif orig_schema is not None and schema is None:
            schema = orig_schema
        else:
            schema = Schema(schema) if not isinstance(schema, Schema) else schema
            assert_or_throw(
                orig_schema == schema,
                lambda: f"iterable schema {orig_schema} is different from {schema}",
            )
        super().__init__(schema)

    def _dfs_wrapper(self, dfs: Iterable[DataFrame]) -> Iterable[LocalDataFrame]:
        """Yield the non-empty dataframes of ``dfs`` as local dataframes.

        Empty dataframes are skipped. The schema of every non-empty dataframe
        must equal the schema of the first non-empty one. If no non-empty
        dataframe is found, the last empty dataframe (if any) is yielded so
        that its schema can still be used.
        """
        last_empty: Any = None
        last_schema: Any = None
        yielded = False
        for df in dfs:
            if df.empty:
                last_empty = df
            else:
                assert_or_throw(
                    last_schema is None or df.schema == last_schema,
                    # Report the expected schema (last_schema), not the
                    # offending one twice.
                    lambda: FugueDataFrameInitError(
                        f"encountered schema {df.schema} doesn't match"
                        f" the original schema {last_schema}"
                    ),
                )
                if last_schema is None:
                    last_schema = df.schema
                yield df.as_local()
                yielded = True
        if not yielded and last_empty is not None:
            # Keep the contract that every yielded dataframe is local.
            yield last_empty.as_local()

    @property
    def native(self) -> EmptyAwareIterable[LocalDataFrame]:
        """Iterable of dataframes"""
        return self._native

    @property
    def empty(self) -> bool:
        """Whether there is no dataframe left, or the next one is empty"""
        return self.native.empty or self.native.peek().empty

    def peek_array(self) -> List[Any]:
        """Return the first row of the next dataframe without consuming it"""
        self.assert_not_empty()
        return self.native.peek().peek_array()

    def _select_cols(self, keys: List[Any]) -> DataFrame:
        """Select ``keys`` from each sub-dataframe as the result is iterated"""
        if self.empty:
            return ArrayDataFrame([], self.schema)[keys]

        def _transform():
            for df in self.native:
                yield df[keys]

        return LocalDataFrameIterableDataFrame(_transform())

    def rename(self, columns: Dict[str, str]) -> DataFrame:
        """Rename columns of each sub-dataframe as the result is iterated"""
        if self.empty:
            return ArrayDataFrame([], self.schema).rename(columns)

        def _transform() -> Iterable[DataFrame]:
            for df in self.native:
                yield df.rename(columns)

        return LocalDataFrameIterableDataFrame(_transform())

    def alter_columns(self, columns: Any) -> DataFrame:
        """Alter column types of each sub-dataframe as the result is iterated"""
        if self.empty:
            return ArrayDataFrame([], self.schema).alter_columns(columns)

        def _transform() -> Iterable[DataFrame]:
            for df in self.native:
                yield df.alter_columns(columns)

        return LocalDataFrameIterableDataFrame(_transform())

    def as_local_bounded(self) -> "LocalBoundedDataFrame":
        """Consume the iterable and return it as one ``ArrowDataFrame``"""
        return ArrowDataFrame(self.as_arrow())

    def as_array(
        self, columns: Optional[List[str]] = None, type_safe: bool = False
    ) -> List[Any]:
        """Consume the iterable and return all rows as a single list"""
        rows: List[Any] = []
        # extend keeps this linear; sum(lists, []) copies the accumulated
        # list on every addition.
        for df in self.native:
            rows.extend(df.as_array(columns=columns, type_safe=type_safe))
        return rows

    def as_array_iterable(
        self, columns: Optional[List[str]] = None, type_safe: bool = False
    ) -> Iterable[Any]:
        """Iterate the rows of all sub-dataframes, one sub-dataframe at a time"""
        for df in self.native:
            yield from df.as_array_iterable(columns=columns, type_safe=type_safe)

    def as_pandas(self) -> pd.DataFrame:
        """Consume the iterable and concatenate it into one pandas dataframe"""
        if self.empty:
            return PandasDataFrame(schema=self.schema).as_pandas()

        return pd.concat(df.as_pandas() for df in self.native)

    def as_arrow(self, type_safe: bool = False) -> pa.Table:
        """Consume the iterable and concatenate it into one arrow table"""
        # NOTE(review): ``type_safe`` is accepted but not forwarded to the
        # sub-dataframes; confirm whether that is intended.
        if self.empty:
            return ArrayDataFrame([], self.schema).as_arrow()

        return pa.concat_tables(df.as_arrow() for df in self.native)

    def head(
        self, n: int, columns: Optional[List[str]] = None
    ) -> LocalBoundedDataFrame:
        """Return up to the first ``n`` rows as an ``ArrayDataFrame``.

        :param n: maximum number of rows; values below 1 give an empty result
        :param columns: optional subset of columns to return
        """
        res: List[Any] = []
        if n > 0:
            for row in self.as_array_iterable(columns, type_safe=True):
                res.append(list(row))
                # Stop right after the n-th row so that no extra row (or
                # sub-dataframe) is pulled from the one-pass iterable.
                if len(res) >= n:
                    break
        return ArrayDataFrame(
            res, self.schema if columns is None else self.schema.extract(columns)
        )

    def _drop_cols(self, cols: List[str]) -> DataFrame:
        """Drop ``cols`` from each sub-dataframe as the result is iterated"""
        if self.empty:
            return ArrayDataFrame([], self.schema)._drop_cols(cols)

        def _transform() -> Iterable[DataFrame]:
            for df in self.native:
                yield df._drop_cols(cols)

        return LocalDataFrameIterableDataFrame(_transform())
class IterablePandasDataFrame(LocalDataFrameIterableDataFrame):
    """Iterable dataframe whose bounded form is a ``PandasDataFrame``"""

    def as_local_bounded(self) -> "LocalBoundedDataFrame":
        """Materialize via :meth:`as_pandas` and wrap as ``PandasDataFrame``"""
        combined = self.as_pandas()
        return PandasDataFrame(combined, schema=self.schema)
class IterableArrowDataFrame(LocalDataFrameIterableDataFrame):
    """Iterable dataframe that adds nothing to its base class"""