import json
from typing import Any, Dict, List, Tuple
from triad.collections.dict import IndexedOrderedDict, ParamDict
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw as aot
from triad.utils.convert import to_size
from triad.utils.hash import to_uuid
from triad.utils.pyarrow import SchemaedDataPartitioner
from triad.utils.schema import safe_split_out_of_quote, unquote_name
[docs]
def parse_presort_exp(presort: Any) -> IndexedOrderedDict[str, bool]: # noqa [C901]
"""Returns ordered column sorting direction where ascending order
would return as true, and descending as false.
:param presort: string that contains column and sorting direction or
list of tuple that contains column and boolean sorting direction
:type presort: Any
:return: column and boolean sorting direction of column, order matters.
:rtype: IndexedOrderedDict[str, bool]
.. admonition:: Examples
>>> parse_presort_exp("b desc, c asc")
>>> parse_presort_exp([("b", True), ("c", False))])
both return IndexedOrderedDict([("b", True), ("c", False))])
"""
if isinstance(presort, IndexedOrderedDict):
return presort
presort_list: List[Tuple[str, bool]] = []
res: IndexedOrderedDict[str, bool] = IndexedOrderedDict()
if presort is None:
return res
elif isinstance(presort, str):
presort = presort.strip()
if presort == "":
return res
for p in safe_split_out_of_quote(presort, ","):
pp = safe_split_out_of_quote(p.strip(), " ", max_split=1)
key = unquote_name(pp[0].strip())
if len(pp) == 1:
presort_list.append((key, True))
elif len(pp) == 2:
if pp[1].strip().lower() == "asc":
presort_list.append((key, True))
elif pp[1].strip().lower() == "desc":
presort_list.append((key, False))
else:
raise SyntaxError(f"Invalid expression {presort}")
else: # pragma: no cover
# impossible
raise SyntaxError(f"Invalid expression {presort}")
elif isinstance(presort, list):
for p in presort:
if isinstance(p, str):
presort_list.append((p, True))
else:
aot(len(p) == 2, SyntaxError(f"Invalid expression {presort}"))
aot(
isinstance(p, tuple)
& (isinstance(p[0], str) & (isinstance(p[1], bool))),
SyntaxError(f"Invalid expression {presort}"),
)
presort_list.append((p[0], p[1]))
for key, value in presort_list:
if key in res:
raise SyntaxError(f"Invalid expression {presort} duplicated key {key}")
res[key] = value
return res
[docs]
class PartitionSpec(object):
"""Fugue Partition Specification.
.. admonition:: Examples
>>> PartitionSepc(num=4)
>>> PartitionSepc(4) # == PartitionSepc(num=4)
>>> PartitionSepc(num="ROWCOUNT/4 + 3") # It can be an expression
>>> PartitionSepc(by=["a","b"])
>>> PartitionSepc(["a","b"]) # == PartitionSepc(by=["a","b"])
>>> PartitionSpec(by=["a"], presort="b DESC, c ASC")
>>> PartitionSpec(algo="even", num=4)
>>> p = PartitionSpec(num=4, by=["a"])
>>> p_override = PartitionSpec(p, by=["a","b"], algo="even")
>>> PartitionSpec(by="a") # == PartitionSpec(by=["a"])
>>> PartitionSpec("a") # == PartitionSpec(by=["a"])
>>> PartitionSpec("per_row") # == PartitionSpec(num="ROWCOUNT", algo="even")
It's important to understand this concept, please read |PartitionTutorial|
Partition consists for these specs:
* **algo**: can be one of ``default``, ``hash``, ``rand``, ``even`` or ``coarse``
* **num** or **num_partitions**: number of physical partitions, it can be an
expression or integer numbers, e.g ``(ROWCOUNT+4) / 3``
* **by** or **partition_by**: keys to partition on
* **presort**: keys to sort other than partition keys. E.g. ``a``
and ``a asc`` means presort by column a ascendingly, ``a,b desc``
means presort by a ascendingly and then by b descendingly.
* row_limit and size_limit are to be deprecated
"""
def __init__(self, *args: Any, **kwargs: Any): # noqa: C901
p = ParamDict()
if len(args) == 1 and len(kwargs) == 0:
if isinstance(args[0], str):
if args[0].lower() == "per_row":
p["algo"] = "even"
p["num_partitions"] = "ROWCOUNT"
elif not args[0].startswith("{"):
p["partition_by"] = [args[0]]
elif isinstance(args[0], int):
p["num_partitions"] = str(args[0])
elif isinstance(args[0], (list, tuple)):
p["partition_by"] = args[0]
if len(p) == 0: # the first condition had no match
for a in args:
if a is None:
continue
elif isinstance(a, PartitionSpec):
self._update_dict(p, a.jsondict)
elif isinstance(a, Dict):
self._update_dict(p, a)
elif isinstance(a, str):
self._update_dict(p, json.loads(a))
else:
raise TypeError(f"{a} is not supported")
self._update_dict(p, kwargs)
self._num_partitions = p.get("num_partitions", "0")
self._algo = p.get("algo", "").lower()
if "partition_by" not in p:
self._partition_by: List[str] = []
elif isinstance(p["partition_by"], str):
self._partition_by = [p["partition_by"]]
elif isinstance(p["partition_by"], (list, tuple)):
self._partition_by = list(p["partition_by"])
else:
raise SyntaxError(p["partition_by"])
aot(
len(self._partition_by) == len(set(self._partition_by)),
SyntaxError(f"{self._partition_by} has duplicated keys"),
)
self._presort = parse_presort_exp(p.get_or_none("presort", object))
if any(x in self._presort for x in self._partition_by):
raise SyntaxError(
"partition by overlap with presort: "
+ f"{self._partition_by}, {self._presort}"
)
# TODO: currently, size limit not in use
self._size_limit = to_size(p.get("size_limit", "0"))
self._row_limit = p.get("row_limit", 0)
def __repr__(self) -> str:
return (
f"PartitionSpec(num='{self._num_partitions}', "
+ f"by={self._partition_by}, presort='{self.presort_expr}')"
)
def __eq__(self, other: Any) -> bool:
if other is self:
return True
if not isinstance(other, PartitionSpec):
other = PartitionSpec(other)
return self.jsondict == other.jsondict
@property
def empty(self) -> bool:
"""Whether this spec didn't specify anything"""
return (
self._num_partitions == "0"
and self._algo == ""
and len(self._partition_by) == 0
and len(self._presort) == 0
and self._size_limit == 0
and self._row_limit == 0
)
@property
def num_partitions(self) -> str:
"""Number of partitions, it can be a string expression or int"""
return self._num_partitions
[docs]
def get_num_partitions(self, **expr_map_funcs: Any) -> int:
"""Convert ``num_partitions`` expression to int number
:param expr_map_funcs: lambda functions (no parameter) for keywords
:return: integer value of the partitions
.. admonition:: Examples
>>> p = PartitionSpec(num="ROWCOUNT/2")
>>> p.get_num_partitions(ROWCOUNT=lambda: df.count())
"""
expr = self.num_partitions
for k, v in expr_map_funcs.items():
if k in expr:
value = str(v())
expr = expr.replace(k, value)
return int(eval(expr)) # pylint: disable=W0123
@property
def algo(self) -> str:
"""Get algo of the spec, one of ``default``, ``hash``,
``rand`` ``even`` or ``coarse``
"""
return self._algo if self._algo != "" else "default"
@property
def partition_by(self) -> List[str]:
"""Get partition keys of the spec"""
return self._partition_by
@property
def presort(self) -> IndexedOrderedDict[str, bool]:
"""Get presort pairs of the spec
.. admonition:: Examples
>>> p = PartitionSpec(by=["a"],presort="b,c desc")
>>> assert p.presort == {"b":True, "c":False}
"""
return self._presort
@property
def presort_expr(self) -> str:
"""Get normalized presort expression
.. admonition:: Examples
>>> p = PartitionSpec(by=["a"],presort="b , c dESc")
>>> assert p.presort_expr == "b ASC,c DESC"
"""
return ",".join(
k + " " + ("ASC" if v else "DESC") for k, v in self.presort.items()
)
@property
def jsondict(self) -> ParamDict:
"""Get json serializeable dict of the spec"""
return ParamDict(
dict(
num_partitions=self._num_partitions,
algo=self._algo,
partition_by=self._partition_by,
presort=self.presort_expr,
size_limit=self._size_limit,
row_limit=self._row_limit,
)
)
def __uuid__(self) -> str:
"""Get deterministic unique id of this object"""
return to_uuid(self.jsondict)
[docs]
def get_sorts(
self, schema: Schema, with_partition_keys: bool = True
) -> IndexedOrderedDict[str, bool]:
"""Get keys for sorting in a partition, it's the combination of partition
keys plus the presort keys
:param schema: the dataframe schema this partition spec to operate on
:param with_partition_keys: whether to include partition keys
:return: an ordered dictionary of key, order pairs
.. admonition:: Examples
>>> p = PartitionSpec(by=["a"],presort="b , c dESc")
>>> schema = Schema("a:int,b:int,c:int,d:int"))
>>> assert p.get_sorts(schema) == {"a":True, "b":True, "c": False}
"""
d: IndexedOrderedDict[str, bool] = IndexedOrderedDict()
if with_partition_keys:
for p in self.partition_by:
aot(p in schema, lambda: KeyError(f"{p} not in {schema}"))
d[p] = True
for p, v in self.presort.items():
aot(p in schema, lambda: KeyError(f"{p} not in {schema}"))
d[p] = v
return d
[docs]
def get_key_schema(self, schema: Schema) -> Schema:
"""Get partition keys schema
:param schema: the dataframe schema this partition spec to operate on
:return: the sub-schema only containing partition keys
"""
return schema.extract(self.partition_by)
[docs]
def get_cursor(
self, schema: Schema, physical_partition_no: int
) -> "PartitionCursor":
"""Get :class:`.PartitionCursor` based on
dataframe schema and physical partition number. You normally don't call
this method directly
:param schema: the dataframe schema this partition spec to operate on
:param physical_partition_no: physical partition no passed in by
:class:`~fugue.execution.execution_engine.ExecutionEngine`
:return: PartitionCursor object
"""
return PartitionCursor(schema, self, physical_partition_no)
[docs]
def get_partitioner(self, schema: Schema) -> SchemaedDataPartitioner:
"""Get :class:`~triad.utils.pyarrow.SchemaedDataPartitioner` by input
dataframe schema
:param schema: the dataframe schema this partition spec to operate on
:return: SchemaedDataPartitioner object
"""
pos = [schema.index_of_key(key) for key in self.partition_by]
return SchemaedDataPartitioner(
schema.pa_schema,
pos,
sizer=None,
row_limit=self._row_limit,
size_limit=self._size_limit,
)
def _update_dict(self, d: Dict[str, Any], u: Dict[str, Any]) -> None:
for k, v in u.items():
if k == "by":
k = "partition_by"
if k == "num":
k = "num_partitions"
d[k] = v
[docs]
class DatasetPartitionCursor:
"""The cursor pointing at the first item of each logical partition inside
a physical partition.
It's important to understand the concept of partition, please read
|PartitionTutorial|
:param physical_partition_no: physical partition number passed in by
:class:`~fugue.execution.execution_engine.ExecutionEngine`
"""
def __init__(self, physical_partition_no: int):
self._physical_partition_no = physical_partition_no
# the following will be set by the framework
self._partition_no = 0
self._slice_no = 0
[docs]
def set(self, item: Any, partition_no: int, slice_no: int) -> None:
"""reset the cursor to a row (which should be the first row of a
new logical partition)
:param item: an item of the dataset, or an function generating the item
:param partition_no: logical partition number
:param slice_no: slice number inside the logical partition (to be deprecated)
"""
self._item = item
self._partition_no = partition_no
self._slice_no = slice_no
@property
def item(self) -> Any:
"""Get current item"""
if callable(self._item):
self._item = self._item()
return self._item
@property
def partition_no(self) -> int:
"""Logical partition number"""
return self._partition_no
@property
def physical_partition_no(self) -> int:
"""Physical partition number"""
return self._physical_partition_no
@property
def slice_no(self) -> int:
"""Slice number (inside the current logical partition), for now
it should always be 0
"""
return self._slice_no
[docs]
class BagPartitionCursor(DatasetPartitionCursor):
"""The cursor pointing at the first bag item of each logical partition inside
a physical partition.
It's important to understand the concept of partition, please read
|PartitionTutorial|
:param physical_partition_no: physical partition number passed in by
:class:`~fugue.execution.execution_engine.ExecutionEngine`
"""
pass
[docs]
class PartitionCursor(DatasetPartitionCursor):
"""The cursor pointing at the first row of each logical partition inside
a physical partition.
It's important to understand the concept of partition, please read
|PartitionTutorial|
:param schema: input dataframe schema
:param spec: partition spec
:param physical_partition_no: physical partition number passed in by
:class:`~fugue.execution.execution_engine.ExecutionEngine`
"""
def __init__(self, schema: Schema, spec: PartitionSpec, physical_partition_no: int):
self._orig_schema = schema
self._key_index = [schema.index_of_key(key) for key in spec.partition_by]
self._schema = schema.extract(spec.partition_by)
self._physical_partition_no = physical_partition_no
super().__init__(physical_partition_no=physical_partition_no)
[docs]
def set(self, row: Any, partition_no: int, slice_no: int) -> None:
"""reset the cursor to a row (which should be the first row of a
new logical partition)
:param row: list-like row data or a function generating a list-like row
:param partition_no: logical partition number
:param slice_no: slice number inside the logical partition (to be deprecated)
"""
super().set(
list(row) if not callable(row) else lambda: list(row()),
partition_no=partition_no,
slice_no=slice_no,
)
@property
def row(self) -> List[Any]:
"""Get current row data"""
return self.item
@property
def row_schema(self) -> Schema:
"""Schema of the current row"""
return self._orig_schema
@property
def key_schema(self) -> Schema:
"""Partition key schema"""
return self._schema
@property
def key_value_dict(self) -> Dict[str, Any]:
"""Based on current row, get the partition key values as a dict"""
return {self.row_schema.names[i]: self.row[i] for i in self._key_index}
@property
def key_value_array(self) -> List[Any]:
"""Based on current row, get the partition key values as an array"""
return [self.row[i] for i in self._key_index]
def __getitem__(self, key: str) -> Any:
"""Get value by key from the current row
:param key: column name
:return: value in the column
"""
return self.row[self.row_schema.index_of_key(key)]