Source code for fugue.dataframe.utils

import pickle
from typing import Any, Dict, Iterable, List, Optional, Tuple

import pandas as pd
import pyarrow as pa
from fsspec import AbstractFileSystem
from triad import Schema, assert_or_throw
from triad.collections.schema import SchemaError
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_arg_not_none
from triad.utils.assertion import assert_or_throw as aot
from triad.utils.io import url_to_fs
from triad.utils.pyarrow import pa_batch_to_dicts

from .api import as_fugue_df, get_column_names, normalize_column_names, rename
from .dataframe import DataFrame, LocalBoundedDataFrame

# For backward compatibility, TODO: remove!
get_dataframe_column_names = get_column_names
normalize_dataframe_column_names = normalize_column_names
rename_dataframe_column_names = rename


def _pa_type_eq(t1: pa.DataType, t2: pa.DataType) -> bool:
    # should ignore the name difference of list
    # e.g. list<item: string> == list<l: string>
    if pa.types.is_list(t1) and pa.types.is_list(t2):  # pragma: no cover
        return _pa_type_eq(t1.value_type, t2.value_type)
    return t1 == t2


def _schema_eq(s1: Schema, s2: Schema) -> bool:
    if s1 == s2:
        return True
    return s1.names == s2.names and all(
        _pa_type_eq(f1.type, f2.type) for f1, f2 in zip(s1.fields, s2.fields)
    )


def _df_eq(
    df: DataFrame,
    data: Any,
    schema: Any = None,
    digits=8,
    check_order: bool = False,
    check_schema: bool = True,
    check_content: bool = True,
    no_pandas: bool = False,
    throw=False,
) -> bool:
    """Compare if two dataframes are equal. Is for internal, unit test
    purpose only. It will convert both dataframes to
    :class:`~fugue.dataframe.dataframe.LocalBoundedDataFrame`, so it assumes
    both dataframes are small and fast enough to convert. DO NOT use it
    on critical or expensive tasks.

    :param df: first data frame
    :param data: :ref:`DataFrame like
      <tutorial:tutorials/advanced/x-like:dataframe>` object
    :param schema: :ref:`Schema like
      <tutorial:tutorials/advanced/x-like:schema>` object, defaults to None
    :param digits: precision on float number comparison, defaults to 8
    :param check_order: if to compare the row orders, defaults to False
    :param check_schema: if compare schemas, defaults to True
    :param check_content: if to compare the row values, defaults to True
    :param no_pandas: if true, it will compare the string representations of the
      dataframes, otherwise, it will convert both to pandas dataframe to compare,
      defaults to False
    :param throw: if to throw error if not equal, defaults to False
    :return: if they equal
    """
    df1 = as_fugue_df(df).as_local_bounded()
    if schema is not None:
        df2 = as_fugue_df(data, schema=schema).as_local_bounded()
    else:
        df2 = as_fugue_df(data).as_local_bounded()
    try:
        assert (
            df1.count() == df2.count()
        ), f"count mismatch {df1.count()}, {df2.count()}"
        assert not check_schema or _schema_eq(
            df.schema, df2.schema
        ), f"schema mismatch {df.schema.pa_schema}, {df2.schema.pa_schema}"
        if not check_content:
            return True
        cols: Any = df1.columns
        if no_pandas:
            dd1 = [[x.__repr__()] for x in df1.as_array_iterable(type_safe=True)]
            dd2 = [[x.__repr__()] for x in df2.as_array_iterable(type_safe=True)]
            d1 = pd.DataFrame(dd1, columns=["data"])
            d2 = pd.DataFrame(dd2, columns=["data"])
            cols = ["data"]
        else:
            d1 = df1.as_pandas()
            d2 = df2.as_pandas()
        if not check_order:
            d1 = d1.sort_values(cols)
            d2 = d2.sort_values(cols)
        d1 = d1.reset_index(drop=True)
        d2 = d2.reset_index(drop=True)
        pd.testing.assert_frame_equal(
            d1, d2, rtol=0, atol=10 ** (-digits), check_dtype=False, check_exact=False
        )
        return True
    except AssertionError:
        if throw:
            raise
        return False


[docs] def serialize_df( df: Optional[DataFrame], threshold: int = -1, file_path: Optional[str] = None, ) -> Optional[bytes]: """Serialize input dataframe to base64 string or to file if it's larger than threshold :param df: input DataFrame :param threshold: file byte size threshold, defaults to -1 :param file_path: file path to store the data (used only if the serialized data is larger than ``threshold``), defaults to None :raises InvalidOperationError: if file is large but ``file_path`` is not provided :return: a pickled blob either containing the data or the file path """ if df is None: return None data = pickle.dumps(df.as_local_bounded()) size = len(data) if threshold < 0 or size <= threshold: return data else: if file_path is None: raise InvalidOperationError("file_path is not provided") fs, path = url_to_fs(file_path) with fs.open(path, "wb") as f: f.write(data) return pickle.dumps(file_path)
[docs] def deserialize_df( data: Optional[bytes], fs: Optional[AbstractFileSystem] = None ) -> Optional[LocalBoundedDataFrame]: """Deserialize json string to :class:`~fugue.dataframe.dataframe.LocalBoundedDataFrame` :param json_str: json string containing the base64 data or a file path :param fs: the file system to use, defaults to None :raises ValueError: if the json string is invalid, not generated from :func:`~.serialize_df` :return: :class:`~fugue.dataframe.dataframe.LocalBoundedDataFrame` if ``json_str`` contains a dataframe or None if its valid but contains no data """ if data is None: return None obj = pickle.loads(data) if isinstance(obj, LocalBoundedDataFrame): return obj elif isinstance(obj, str): fs, path = url_to_fs(obj) with fs.open(path, "rb") as f: return pickle.load(f) raise ValueError("data is invalid")
[docs] def get_join_schemas( df1: DataFrame, df2: DataFrame, how: str, on: Optional[Iterable[str]] ) -> Tuple[Schema, Schema]: """Get :class:`~triad:triad.collections.schema.Schema` object after joining ``df1`` and ``df2``. If ``on`` is not empty, it's mainly for validation purpose. :param df1: first dataframe :param df2: second dataframe :param how: can accept ``semi``, ``left_semi``, ``anti``, ``left_anti``, ``inner``, ``left_outer``, ``right_outer``, ``full_outer``, ``cross`` :param on: it can always be inferred, but if you provide, it will be validated agained the inferred keys. :return: the pair key schema and schema after join .. note:: In Fugue, joined schema can always be inferred because it always uses the input dataframes' common keys as the join keys. So you must make sure to :meth:`~fugue.dataframe.dataframe.DataFrame.rename` to input dataframes so they follow this rule. """ assert_arg_not_none(how, "how") how = how.lower() aot( how in [ "semi", "left_semi", "anti", "left_anti", "inner", "left_outer", "right_outer", "full_outer", "cross", ], ValueError(f"{how} is not a valid join type"), ) on = list(on) if on is not None else [] aot(len(on) == len(set(on)), f"{on} has duplication") if how != "cross" and len(on) == 0: other = set(df2.columns) on = [c for c in df1.columns if c in other] aot( len(on) > 0, lambda: SchemaError( f"no common columns between {df1.columns} and {df2.columns}" ), ) schema2 = df2.schema aot( how != "outer", ValueError( "'how' must use left_outer, right_outer, full_outer for outer joins" ), ) if how in ["semi", "left_semi", "anti", "left_anti"]: schema2 = schema2.extract(on) aot( on in df1.schema and on in schema2, lambda: SchemaError( f"{on} is not the intersection of {df1.schema} & {df2.schema}" ), ) cm = df1.schema.intersect(on) if how == "cross": cs = df1.schema.intersect(schema2) aot( len(cs) == 0, SchemaError(f"invalid cross join, two dataframes have common columns {cs}"), ) else: aot(len(on) > 0, SchemaError("join on columns must be specified")) return cm, (df1.schema.union(schema2))
[docs] def pa_table_as_array_iterable( df: pa.Table, columns: Optional[List[str]] = None ) -> Iterable[List[List[Any]]]: """Convert a pyarrow table to an iterable of list :param df: pyarrow table :param columns: if not None, only these columns will be returned, defaults to None :return: an iterable of list """ assert_or_throw(columns is None or len(columns) > 0, ValueError("empty columns")) _df = df if columns is None or len(columns) == 0 else df.select(columns) for batch in _df.to_batches(): for x in zip(*batch.to_pydict().values()): yield list(x)
[docs] def pa_table_as_array( df: pa.Table, columns: Optional[List[str]] = None ) -> List[List[List[Any]]]: """Convert a pyarrow table to a list of list :param df: pyarrow table :param columns: if not None, only these columns will be returned, defaults to None :return: a list of list """ return list(pa_table_as_array_iterable(df, columns=columns))
[docs] def pa_table_as_dict_iterable( df: pa.Table, columns: Optional[List[str]] = None ) -> Iterable[Dict[str, Any]]: """Convert a pyarrow table to an iterable of dict :param df: pyarrow table :param columns: if not None, only these columns will be returned, defaults to None :return: an iterable of dict """ for ck in _pa_table_as_dicts_chunks(df, columns=columns): yield from ck
[docs] def pa_table_as_dicts( df: pa.Table, columns: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """Convert a pyarrow table to a list of dict :param df: pyarrow table :param columns: if not None, only these columns will be returned, defaults to None :return: a list of dict """ res: List[Dict[str, Any]] = [] for ck in _pa_table_as_dicts_chunks(df, columns=columns): res += ck return res
def _pa_table_as_dicts_chunks( df: pa.Table, columns: Optional[List[str]] = None ) -> Iterable[List[Dict[str, Any]]]: assert_or_throw(columns is None or len(columns) > 0, ValueError("empty columns")) _df = df if columns is None or len(columns) == 0 else df.select(columns) for batch in _df.to_batches(): yield pa_batch_to_dicts(batch)