Source code for fugue.column.expressions

from typing import Any, Dict, Iterable, List, Optional, Union

import pyarrow as pa
from triad import Schema, assert_or_throw, to_uuid
from triad.utils.pyarrow import _type_to_expression, to_pa_datatype


class ColumnExpr:
    """Fugue column expression class. It is inspired from
    :class:`spark:pyspark.sql.Column` and it is a work in progress.

    .. admonition:: New Since
        :class: hint

        **0.6.0**

    .. caution::

        This is a base class of different column classes, and users are not
        supposed to construct this class directly. Use :func:`~.col` and
        :func:`~.lit` instead.
    """

    def __init__(self):
        # Alias assigned by ``alias``; the empty string means "no alias".
        self._as_name = ""
        # Target type assigned by ``cast``; None means "no cast".
        self._as_type: Optional[pa.DataType] = None

    @property
    def name(self) -> str:
        """The original name of this column, default empty

        :return: the name

        .. admonition:: Examples

            .. code-block:: python

                assert "a" == col("a").name
                # aliasing does not change the original name
                assert "a" == col("a").alias("b").name
                assert "" == lit(1).name
                assert "" == (col("a") * 2).name
        """
        return ""

    @property
    def as_name(self) -> str:
        """The name assigned by :meth:`~.alias`

        :return: the alias

        .. admonition:: Examples

            .. code-block:: python

                assert "" == col("a").as_name
                assert "b" == col("a").alias("b").as_name
                assert "x" == (col("a") * 2).alias("x").as_name
        """
        return self._as_name

    @property
    def as_type(self) -> Optional[pa.DataType]:
        """The type assigned by :meth:`~.cast`

        :return: the pyarrow datatype if :meth:`~.cast` was called
          otherwise None

        .. admonition:: Examples

            .. code-block:: python

                import pyarrow as pa

                assert col("a").as_type is None
                assert pa.int64() == col("a").cast(int).as_type
                assert pa.string() == (col("a") * 2).cast(str).as_type
        """
        return self._as_type

    @property
    def output_name(self) -> str:
        """The name assigned by :meth:`~.alias`, but if empty then
        return the original column name

        :return: the alias or the original column name

        .. admonition:: Examples

            .. code-block:: python

                assert "a" == col("a").output_name
                assert "b" == col("a").alias("b").output_name
                assert "x" == (col("a") * 2).alias("x").output_name
        """
        return self.as_name if self.as_name != "" else self.name

    def alias(self, as_name: str) -> "ColumnExpr":  # pragma: no cover
        """Assign or remove alias of a column. To remove, set ``as_name``
        to empty

        :param as_name: the alias to assign, or empty string to remove it
        :return: a new column with the alias value
        :raises NotImplementedError: in this base class, subclasses are
          expected to override it

        .. admonition:: Examples

            .. code-block:: python

                assert "b" == col("a").alias("b").as_name
                assert "x" == (col("a") * 2).alias("x").as_name
                assert "" == col("a").alias("b").alias("").as_name
        """
        raise NotImplementedError

    def infer_alias(self) -> "ColumnExpr":
        """Infer alias of a column. If the column's :meth:`~.output_name`
        is not empty then it returns itself without change. Otherwise it
        tries to infer alias from the underlying columns.

        The base implementation always returns the column itself.

        :return: a column instance with inferred alias

        .. caution::

            Users should not use it directly.

        .. admonition:: Examples

            .. code-block:: python

                import fugue.column.functions as f

                assert "a" == col("a").infer_alias().output_name
                assert "" == (col("a") * 2).infer_alias().output_name
                assert "a" == col("a").is_null().infer_alias().output_name
                assert "a" == f.max(col("a").is_null()).infer_alias().output_name
        """
        return self

    def cast(self, data_type: Any) -> "ColumnExpr":  # pragma: no cover
        """Cast the column to a new data type

        :param data_type: It can be string expressions, python primitive
          types, python `datetime.datetime` and pyarrow types.
          For details read |FugueDataTypes|
        :return: a new column instance with the assigned data type
        :raises NotImplementedError: in this base class, subclasses are
          expected to override it

        .. caution::

            Currently, casting to struct or list type has undefined behavior.

        .. admonition:: Examples

            .. code-block:: python

                import pyarrow as pa

                assert pa.int64() == col("a").cast(int).as_type
                assert pa.string() == col("a").cast(str).as_type
                assert pa.float64() == col("a").cast(float).as_type
                assert pa.bool_() == col("a").cast(bool).as_type

                # string follows the type expression of Triad Schema
                assert pa.int32() == col("a").cast("int").as_type
                assert pa.int32() == col("a").cast("int32").as_type

                assert pa.int32() == col("a").cast(pa.int32()).as_type
        """
        raise NotImplementedError

    def infer_type(self, schema: Schema) -> Optional[pa.DataType]:
        """Infer data type of this column given the input schema

        The base implementation ignores ``schema`` and returns
        :meth:`~.as_type`.

        :param schema: the schema instance to infer from
        :return: a pyarrow datatype or None if failed to infer

        .. caution::

            Users should not use it directly.

        .. admonition:: Examples

            .. code-block:: python

                import pyarrow as pa
                from triad import Schema
                import fugue.column.functions as f

                schema = Schema("a:int,b:str")

                assert pa.int32() == col("a").infer_type(schema)
                assert pa.int32() == (-col("a")).infer_type(schema)
                # due to overflow risk, can't infer certain operations
                assert (col("a")+1).infer_type(schema) is None
                assert (col("a")+col("a")).infer_type(schema) is None
                assert pa.int32() == f.max(col("a")).infer_type(schema)
                assert pa.int32() == f.min(col("a")).infer_type(schema)
                assert f.sum(col("a")).infer_type(schema) is None
        """
        return self.as_type  # pragma: no cover

    def __str__(self) -> str:
        """String expression of the column, this is only used for debug
        purpose. It is not SQL expression.

        :return: the string expression
        """
        res = self.body_str
        # Wrap the body with the cast first, then append the alias
        if self.as_type is not None:
            res = f"CAST({res} AS {_type_to_expression(self.as_type)})"
        if self.as_name != "":
            res = res + " AS " + self.as_name
        return res

    @property
    def body_str(self) -> str:  # pragma: no cover
        """The string expression of this column without cast type and alias.
        This is only used for debug purpose. It is not SQL expression.

        :return: the string expression
        :raises NotImplementedError: in this base class, subclasses are
          expected to override it
        """
        raise NotImplementedError

    def is_null(self) -> "ColumnExpr":
        """Same as SQL ``<col> IS NULL``.

        :return: a new column with the boolean values
        """
        # TODO: should enable infer_schema for this?
        return _UnaryOpExpr("IS_NULL", self)

    def not_null(self) -> "ColumnExpr":
        """Same as SQL ``<col> IS NOT NULL``.

        :return: a new column with the boolean values
        """
        # TODO: should enable infer_schema for this?
        return _UnaryOpExpr("NOT_NULL", self)

    def __neg__(self) -> "ColumnExpr":
        """The negative value of the current column

        :return: a new column with the negative value
        """
        return _InvertOpExpr("-", self)

    def __pos__(self) -> "ColumnExpr":
        """The original value of the current column

        :return: the column itself
        """
        return self

    def __invert__(self) -> "ColumnExpr":
        """Same as SQL ``NOT <col>``

        :return: a new column with the boolean values
        """
        return _NotOpExpr("~", self)

    def __add__(self, other: Any) -> "ColumnExpr":
        """Add with another column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BinaryOpExpr("+", self, other)

    def __radd__(self, other: Any) -> "ColumnExpr":
        """Add with another column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BinaryOpExpr("+", other, self)

    def __sub__(self, other: Any) -> "ColumnExpr":
        """Subtract another column from this column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BinaryOpExpr("-", self, other)

    def __rsub__(self, other: Any) -> "ColumnExpr":
        """Subtract this column from the other column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BinaryOpExpr("-", other, self)

    def __mul__(self, other: Any) -> "ColumnExpr":
        """Multiply with another column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BinaryOpExpr("*", self, other)

    def __rmul__(self, other: Any) -> "ColumnExpr":
        """Multiply with another column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BinaryOpExpr("*", other, self)

    def __truediv__(self, other: Any) -> "ColumnExpr":
        """Divide this column by the other column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BinaryOpExpr("/", self, other)

    def __rtruediv__(self, other: Any) -> "ColumnExpr":
        """Divide the other column by this column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BinaryOpExpr("/", other, self)

    def __and__(self, other: Any) -> "ColumnExpr":
        """``AND`` value of the two columns

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BoolBinaryOpExpr("&", self, other)

    def __rand__(self, other: Any) -> "ColumnExpr":
        """``AND`` value of the two columns

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BoolBinaryOpExpr("&", other, self)

    def __or__(self, other: Any) -> "ColumnExpr":
        """``OR`` value of the two columns

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BoolBinaryOpExpr("|", self, other)

    def __ror__(self, other: Any) -> "ColumnExpr":
        """``OR`` value of the two columns

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the result
        """
        return _BoolBinaryOpExpr("|", other, self)

    def __lt__(self, other: Any) -> "ColumnExpr":
        """Whether this column is less than the other column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the boolean result
        """
        return _BoolBinaryOpExpr("<", self, other)

    def __gt__(self, other: Any) -> "ColumnExpr":
        """Whether this column is greater than the other column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the boolean result
        """
        return _BoolBinaryOpExpr(">", self, other)

    def __le__(self, other: Any) -> "ColumnExpr":
        """Whether this column is less or equal to the other column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the boolean result
        """
        return _BoolBinaryOpExpr("<=", self, other)

    def __ge__(self, other: Any) -> "ColumnExpr":
        """Whether this column is greater or equal to the other column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the boolean result
        """
        return _BoolBinaryOpExpr(">=", self, other)

    # NOTE: ``__eq__`` and ``__ne__`` build expressions instead of returning
    # booleans. Because this class defines ``__eq__`` without ``__hash__``,
    # Python makes its instances unhashable.
    def __eq__(self, other: Any) -> "ColumnExpr":  # type: ignore
        """Whether this column equals the other column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the boolean result
        """
        return _BoolBinaryOpExpr("==", self, other)

    def __ne__(self, other: Any) -> "ColumnExpr":  # type: ignore
        """Whether this column does not equal the other column

        :param other: the other column, if it is not a
          :class:`~.ColumnExpr`, then the value will be converted to
          a literal (`lit(other)`)
        :return: a new column with the boolean result
        """
        return _BoolBinaryOpExpr("!=", self, other)

    def __uuid__(self) -> str:
        """The unique id of this instance

        The id is computed from the concrete class, the alias, the cast type
        and the subclass specific keys returned by ``_uuid_keys``.

        :return: the unique id
        """
        return to_uuid(
            str(type(self)),
            self.as_name,
            self.as_type,
            *self._uuid_keys(),
        )

    def _uuid_keys(self) -> List[Any]:  # pragma: no cover
        # Subclasses return the extra values that identify the expression
        raise NotImplementedError
def lit(obj: Any, alias: str = "") -> ColumnExpr:
    """Wrap a python value into a literal column expression.

    ``obj`` must currently be ``None``, ``int``, ``bool``, ``float`` or
    ``str``, otherwise an exception will be raised.

    :param obj: the value to wrap
    :param alias: the alias of this literal column, defaults to "" (no alias)
    :return: a literal column expression

    .. admonition:: New Since
        :class: hint

        **0.6.0**

    .. admonition:: Examples

        .. code-block:: python

            from fugue.column import lit

            lit("abc")
            lit(100).alias("x")
            lit(100, "x")
    """
    literal = _LiteralColumnExpr(obj)
    if alias == "":
        # no alias requested, the bare literal is the result
        return literal
    return literal.alias(alias)
def null() -> ColumnExpr:
    """Equivalent to ``lit(None)``, the ``NULL`` value

    :return: ``lit(None)``

    .. admonition:: New Since
        :class: hint

        **0.6.0**
    """
    return lit(None)
def col(obj: Union[str, ColumnExpr], alias: str = "") -> ColumnExpr:
    """Convert the ``obj`` to a :class:`~.ColumnExpr` object

    :param obj: a string representing a column name or a
      :class:`~.ColumnExpr` object
    :param alias: the alias of this column, defaults to "" (no alias)
    :return: a column expression
    :raises NotImplementedError: if ``obj`` is neither a string nor a
      :class:`~.ColumnExpr`

    .. admonition:: New Since
        :class: hint

        **0.6.0**

    .. admonition:: Examples

        .. code-block:: python

            from fugue.column import col
            import fugue.column.functions as f

            col("a")
            col("a").alias("x")
            col("a", "x")

            # unary operations
            -col("a")  # negative value of a
            ~col("a")  # NOT a
            col("a").is_null()  # a IS NULL
            col("a").not_null()  # a IS NOT NULL

            # binary operations
            col("a") + 1  # col("a") + lit(1)
            1 - col("a")  # lit(1) - col("a")
            col("a") * col("b")
            col("a") / col("b")

            # binary boolean expressions
            col("a") == 1  # col("a") == lit(1)
            2 != col("a")  # col("a") != lit(2)
            col("a") < 5
            col("a") > 5
            col("a") <= 5
            col("a") >= 5
            (col("a") < col("b")) & (col("b") > 1) | col("c").is_null()

            # with functions
            f.max(col("a"))
            f.max(col("a")+col("b"))
            f.max(col("a")) + f.min(col("b"))
            f.count_distinct(col("a")).alias("dcount")
    """
    # Resolve the expression first, existing expressions are used as they are
    if isinstance(obj, ColumnExpr):
        expr = obj
    elif isinstance(obj, str):
        expr = _NamedColumnExpr(obj)
    else:
        raise NotImplementedError(obj)
    # Then apply the alias, only when one was requested
    if alias == "":
        return expr
    return expr.alias(alias)
def all_cols() -> ColumnExpr:
    """The ``*`` expression in SQL

    :return: a new wildcard column expression
    """
    return _WildcardExpr()
def function(
    name: str, *args: Any, arg_distinct: bool = False, **kwargs: Any
) -> ColumnExpr:
    """Construct a function expression

    :param name: the name of the function
    :param args: the positional arguments of the function
    :param arg_distinct: whether to add ``DISTINCT`` before all arguments,
      defaults to False
    :param kwargs: the keyword arguments of the function
    :return: the function expression

    .. caution::

        Users should not use this directly
    """
    return _FuncExpr(name, *args, arg_distinct=arg_distinct, **kwargs)
def _get_column_mentions(column: ColumnExpr) -> Iterable[str]:
    """Yield the names of the named columns referenced by an expression.

    Function expressions are searched recursively through their positional
    and keyword arguments. Any other kind of object yields nothing. The same
    name is yielded once per mention, so duplicates are possible.

    :param column: the expression to inspect
    :return: an iterable of the referenced column names
    """
    if isinstance(column, _NamedColumnExpr):
        yield column.name
    elif isinstance(column, _FuncExpr):
        for a in column.args:
            yield from _get_column_mentions(a)
        for a in column.kwargs.values():
            yield from _get_column_mentions(a)


def _to_col(obj: Any) -> ColumnExpr:
    """Return ``obj`` itself if it is a column expression, otherwise convert
    it to a literal column with ``lit``.
    """
    if isinstance(obj, ColumnExpr):
        return obj
    return lit(obj)


class _NamedColumnExpr(ColumnExpr):
    """A reference to a column by its name."""

    def __init__(self, name: Any):
        self._name = name
        super().__init__()

    @property
    def body_str(self) -> str:
        return self.name

    @property
    def name(self) -> str:
        return self._name

    @property
    def output_name(self) -> str:
        return super().output_name

    def alias(self, as_name: str) -> ColumnExpr:
        # Expressions are treated as immutable: build a new one and carry
        # over the cast type
        other = _NamedColumnExpr(self.name)
        other._as_name = as_name
        other._as_type = self.as_type
        return other

    def cast(self, data_type: Any) -> "ColumnExpr":
        # ``None`` removes the cast, anything else is converted to pyarrow
        other = _NamedColumnExpr(self.name)
        other._as_name = self.as_name
        other._as_type = None if data_type is None else to_pa_datatype(data_type)
        return other

    def infer_alias(self) -> ColumnExpr:
        # A cast column without alias gets its own name as the explicit alias
        if self.as_name == "" and self.as_type is not None:
            return self.alias(self.output_name)
        return self

    def infer_type(self, schema: Schema) -> Optional[pa.DataType]:
        # An explicit cast always wins; the explicit ``None`` check avoids
        # relying on the truthiness of a pyarrow type object
        if self.as_type is not None:
            return self.as_type
        if self.name not in schema:
            return None
        return schema[self.name].type

    def _uuid_keys(self) -> List[Any]:
        return [self.name]


class _WildcardExpr(ColumnExpr):
    """The ``*`` expression, it has neither a name nor an output name."""

    @property
    def body_str(self) -> str:
        return "*"

    @property
    def name(self) -> str:  # pragma: no cover
        raise NotImplementedError("wildcard column doesn't have a name")

    @property
    def output_name(self) -> str:  # pragma: no cover
        raise NotImplementedError("wildcard column doesn't have an output_name")

    def __uuid__(self) -> str:
        """The unique id of this instance

        :return: the unique id
        """
        # Only the type takes part: alias and cast type are not considered
        return to_uuid(str(type(self)))


class _LiteralColumnExpr(ColumnExpr):
    """A constant value, ``None`` represents ``NULL``."""

    _VALID_TYPES = (int, bool, float, str)

    def __init__(self, value: Any):
        assert_or_throw(
            value is None or isinstance(value, _LiteralColumnExpr._VALID_TYPES),
            lambda: NotImplementedError(f"{value}, type: {type(value)}"),
        )
        self._value = value
        super().__init__()

    @property
    def body_str(self) -> str:
        if self.value is None:
            return "NULL"
        elif isinstance(self.value, str):
            # Escape backslashes and single quotes before quoting
            body = self.value.translate(
                str.maketrans(
                    {  # type: ignore
                        "\\": r"\\",
                        "'": r"\'",
                    }
                )
            )
            return f"'{body}'"
        elif isinstance(self.value, bool):
            # bool must be handled explicitly, str(True) would give "True"
            return "TRUE" if self.value else "FALSE"
        else:
            return str(self.value)

    @property
    def value(self) -> Any:
        return self._value

    def is_null(self) -> ColumnExpr:
        # The answer is known immediately, so it is returned as a new boolean
        # literal (without the alias and cast type of this literal)
        return _LiteralColumnExpr(self.value is None)

    def not_null(self) -> ColumnExpr:
        # See ``is_null``
        return _LiteralColumnExpr(self.value is not None)

    def alias(self, as_name: str) -> ColumnExpr:
        other = _LiteralColumnExpr(self.value)
        other._as_name = as_name
        other._as_type = self.as_type
        return other

    def cast(self, data_type: Any) -> ColumnExpr:
        other = _LiteralColumnExpr(self.value)
        other._as_name = self.as_name
        other._as_type = None if data_type is None else to_pa_datatype(data_type)
        return other

    def infer_type(self, schema: Schema) -> Optional[pa.DataType]:
        # An explicit cast always wins; the type of NULL can't be inferred
        if self.as_type is not None:
            return self.as_type
        if self.value is None:
            return None
        return to_pa_datatype(type(self.value))

    def _uuid_keys(self) -> List[Any]:
        return [self.value]


class _FuncExpr(ColumnExpr):
    """A function call expression.

    :param func: the function name
    :param args: the positional arguments
    :param arg_distinct: whether ``DISTINCT`` is put before the arguments
    :param kwargs: the keyword arguments
    """

    def __init__(
        self,
        func: str,
        *args: Any,
        arg_distinct: bool = False,
        **kwargs: Any,
    ):
        self._distinct = arg_distinct
        self._func = func
        self._args = list(args)
        self._kwargs = dict(kwargs)
        super().__init__()

    @property
    def body_str(self) -> str:
        def to_str(v: Any):
            # Plain (non expression) arguments get a SQL like rendering,
            # expressions are rendered by their own ``__str__``
            if isinstance(v, str):
                return f"'{v}'"
            if isinstance(v, bool):
                return "TRUE" if v else "FALSE"
            return str(v)

        a1 = [to_str(x) for x in self.args]
        a2 = [k + "=" + to_str(v) for k, v in self.kwargs.items()]
        args = ",".join(a1 + a2)
        distinct = "DISTINCT " if self.is_distinct else ""
        return f"{self.func}({distinct}{args})"

    @property
    def func(self) -> str:
        return self._func

    @property
    def is_distinct(self) -> bool:
        return self._distinct

    @property
    def args(self) -> List[Any]:
        return self._args

    @property
    def kwargs(self) -> Dict[str, Any]:
        return self._kwargs

    def alias(self, as_name: str) -> ColumnExpr:
        other = self._copy()
        other._as_name = as_name
        # Set again in case a subclass ``_copy`` does not carry the flag
        other._distinct = self.is_distinct
        other._as_type = self.as_type
        return other

    def cast(self, data_type: Any) -> ColumnExpr:
        other = self._copy()
        other._as_name = self.as_name
        # Set again in case a subclass ``_copy`` does not carry the flag
        other._distinct = self.is_distinct
        other._as_type = None if data_type is None else to_pa_datatype(data_type)
        return other

    def _copy(self) -> "_FuncExpr":
        # Copies function, arguments and the distinct flag, but neither the
        # alias nor the cast type; subclasses override it to keep their type
        return _FuncExpr(
            self.func, *self.args, arg_distinct=self.is_distinct, **self.kwargs
        )

    def _uuid_keys(self) -> List[Any]:
        return [self.func, self.is_distinct, self.args, self.kwargs]


class _UnaryOpExpr(_FuncExpr):
    """An operation on a single column, for example ``IS_NULL``."""

    def __init__(self, op: str, column: ColumnExpr, arg_distinct: bool = False):
        super().__init__(op, column, arg_distinct=arg_distinct)

    @property
    def col(self) -> ColumnExpr:
        return self.args[0]

    @property
    def op(self) -> str:
        return self.func

    def infer_alias(self) -> ColumnExpr:
        # Without a name of its own, borrow the one inferred for the operand
        return (
            self
            if self.output_name != ""
            else self.alias(self.col.infer_alias().output_name)
        )

    def _copy(self) -> _FuncExpr:
        return _UnaryOpExpr(self.op, self.col, arg_distinct=self.is_distinct)


class _InvertOpExpr(_UnaryOpExpr):
    """The arithmetic negation of a column (``-col``)."""

    def _copy(self) -> _FuncExpr:
        return _InvertOpExpr(self.op, self.col, arg_distinct=self.is_distinct)

    def infer_type(self, schema: Schema) -> Optional[pa.DataType]:
        if self.as_type is not None:
            return self.as_type
        tp = self.col.infer_type(schema)
        # The operand type may be unknown (None), the pyarrow type predicates
        # must not be called in that case. Only signed integers and floats
        # keep their type when negated.
        if tp is not None and (
            pa.types.is_signed_integer(tp) or pa.types.is_floating(tp)
        ):
            return tp
        return None


class _NotOpExpr(_UnaryOpExpr):
    """The logical negation of a column (``~col``)."""

    def _copy(self) -> _FuncExpr:
        return _NotOpExpr(self.op, self.col, arg_distinct=self.is_distinct)

    def infer_type(self, schema: Schema) -> Optional[pa.DataType]:
        if self.as_type is not None:
            return self.as_type
        tp = self.col.infer_type(schema)
        # The operand type may be unknown (None), the pyarrow type predicate
        # must not be called in that case
        if tp is not None and pa.types.is_boolean(tp):
            return tp
        return None


class _BinaryOpExpr(_FuncExpr):
    """An operation on two operands, non column operands become literals."""

    def __init__(self, op: str, left: Any, right: Any, arg_distinct: bool = False):
        super().__init__(op, _to_col(left), _to_col(right), arg_distinct=arg_distinct)

    @property
    def left(self) -> ColumnExpr:
        return self.args[0]

    @property
    def right(self) -> ColumnExpr:
        return self.args[1]

    @property
    def op(self) -> str:
        return self.func

    def _copy(self) -> _FuncExpr:
        return _BinaryOpExpr(
            self.op, self.left, self.right, arg_distinct=self.is_distinct
        )


class _BoolBinaryOpExpr(_BinaryOpExpr):
    """A binary operation whose result is boolean unless it is cast."""

    def _copy(self) -> _FuncExpr:
        return _BoolBinaryOpExpr(
            self.op, self.left, self.right, arg_distinct=self.is_distinct
        )

    def infer_type(self, schema: Schema) -> Optional[pa.DataType]:
        # An explicit cast always wins, otherwise the result is boolean
        if self.as_type is not None:
            return self.as_type
        return pa.bool_()