fugue.collections#

fugue.collections.partition#

class fugue.collections.partition.PartitionCursor(schema, spec, physical_partition_no)[source]#

Bases: object

The cursor pointing at the first row of each logical partition inside a physical partition.

It’s important to understand the concept of a partition; please read the Partition Tutorial

Parameters
  • schema (triad.collections.schema.Schema) – the dataframe schema this cursor operates on

  • spec (PartitionSpec) – the partition spec that created this cursor

  • physical_partition_no (int) – physical partition number
property key_schema: triad.collections.schema.Schema#

Partition key schema

property key_value_array: List[Any]#

Based on current row, get the partition key values as an array

property key_value_dict: Dict[str, Any]#

Based on current row, get the partition key values as a dict

property partition_no: int#

Logical partition number

property physical_partition_no: int#

Physical partition number

property row: List[Any]#

Get current row data

property row_schema: triad.collections.schema.Schema#

Schema of the current row

set(row, partition_no, slice_no)[source]#

Reset the cursor to a row (which should be the first row of a new logical partition)

Parameters
  • row (Any) – list-like row data

  • partition_no (int) – logical partition number

  • slice_no (int) – slice number inside the logical partition (to be deprecated)

Return type

None

property slice_no: int#

Slice number (inside the current logical partition), for now it should always be 0
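To illustrate the role of the cursor, here is a minimal pure-Python sketch (not Fugue's actual classes; `MiniCursor` and `scan` are hypothetical) of an engine scanning one physical partition whose rows are grouped by the partition keys, resetting the cursor at each logical partition boundary:

```python
# Hypothetical sketch: a cursor pointing at the first row of each logical
# partition while scanning a physical partition.
class MiniCursor:
    def __init__(self, key_columns):
        self.key_columns = key_columns  # indices of the partition key columns
        self.row = None
        self.partition_no = -1

    def set(self, row, partition_no):
        # point at the first row of a new logical partition
        self.row = row
        self.partition_no = partition_no

    @property
    def key_value_array(self):
        return [self.row[i] for i in self.key_columns]


def scan(rows, key_columns):
    # rows must be pre-sorted/grouped by the key columns
    cursor = MiniCursor(key_columns)
    keys_seen = []
    last_key = None
    for row in rows:
        key = [row[i] for i in key_columns]
        if key != last_key:  # boundary of a new logical partition
            cursor.set(row, cursor.partition_no + 1)
            keys_seen.append(cursor.key_value_array)
            last_key = key
    return keys_seen


assert scan([(1, "x"), (1, "y"), (2, "z")], [0]) == [[1], [2]]
```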

class fugue.collections.partition.PartitionSpec(*args, **kwargs)[source]#

Bases: object

Fugue Partition Specification.

Examples

>>> PartitionSpec(num=4)
>>> PartitionSpec(num="ROWCOUNT/4 + 3")  # It can be an expression
>>> PartitionSpec(by=["a","b"])
>>> PartitionSpec(by=["a"], presort="b DESC, c ASC")
>>> PartitionSpec(algo="even", num=4)
>>> p = PartitionSpec(num=4, by=["a"])
>>> p_override = PartitionSpec(p, by=["a","b"], algo="even")
>>> PartitionSpec(by="a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("per_row")  # == PartitionSpec(num="ROWCOUNT", algo="even")

It’s important to understand this concept; please read the Partition Tutorial

A partition spec consists of these fields:

  • algo: can be one of hash (default), rand and even

  • num or num_partitions: number of physical partitions; it can be an integer or an expression, e.g. (ROWCOUNT+4) / 3

  • by or partition_by: keys to partition on

  • presort: keys to sort by within each partition, in addition to the partition keys. E.g. a and a asc both mean presort by column a ascending; a,b desc means presort by a ascending and then by b descending.

  • row_limit and size_limit are to be deprecated

Parameters
  • args (Any) –

  • kwargs (Any) –

property algo: str#

Get algo of the spec, one of hash (default), rand and even

property empty: bool#

Whether this spec specifies nothing (is empty)

get_cursor(schema, physical_partition_no)[source]#

Get PartitionCursor based on dataframe schema and physical partition number. You normally don’t call this method directly

Parameters
  • schema (triad.collections.schema.Schema) – the dataframe schema this partition spec operates on

  • physical_partition_no (int) – physical partition number

Returns

PartitionCursor object

Return type

fugue.collections.partition.PartitionCursor

get_key_schema(schema)[source]#

Get partition keys schema

Parameters

schema (triad.collections.schema.Schema) – the dataframe schema this partition spec operates on

Returns

the sub-schema only containing partition keys

Return type

triad.collections.schema.Schema

get_num_partitions(**expr_map_funcs)[source]#

Convert the num_partitions expression to an integer

Parameters

expr_map_funcs (Any) – zero-argument functions that resolve the keywords in the expression

Returns

integer value of the partitions

Return type

int

Examples

>>> p = PartitionSpec(num="ROWCOUNT/2")
>>> p.get_num_partitions(ROWCOUNT=lambda: df.count())
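The expression resolution above can be pictured with a small pure-Python sketch; `resolve_num` and its keyword substitution are illustrative assumptions, not Fugue's implementation:

```python
# Hypothetical sketch of resolving a num_partitions expression.
# Each keyword (e.g. ROWCOUNT) maps to a zero-argument function that
# supplies its value only when the expression actually needs it.
def resolve_num(expr, **expr_map_funcs):
    if isinstance(expr, int):
        return expr
    for keyword, func in expr_map_funcs.items():
        if keyword in expr:
            expr = expr.replace(keyword, str(func()))
    return int(eval(expr))  # the expression must evaluate to a number


assert resolve_num("ROWCOUNT/2", ROWCOUNT=lambda: 10) == 5
assert resolve_num(4) == 4
```

Passing the values as functions (rather than plain numbers) lets a potentially expensive computation like `df.count()` be skipped when the expression never mentions its keyword.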
get_partitioner(schema)[source]#

Get SchemaedDataPartitioner by input dataframe schema

Parameters

schema (triad.collections.schema.Schema) – the dataframe schema this partition spec operates on

Returns

SchemaedDataPartitioner object

Return type

triad.utils.pyarrow.SchemaedDataPartitioner

get_sorts(schema)[source]#

Get keys for sorting in a partition; it’s the combination of partition keys plus the presort keys

Parameters

schema (triad.collections.schema.Schema) – the dataframe schema this partition spec operates on

Returns

an ordered dictionary of key, order pairs

Return type

triad.collections.dict.IndexedOrderedDict[str, bool]

Examples

>>> p = PartitionSpec(by=["a"],presort="b , c dESc")
>>> schema = Schema("a:int,b:int,c:int,d:int")
>>> assert p.get_sorts(schema) == {"a":True, "b":True, "c": False}
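The combination logic can be sketched in a few lines of plain Python (an illustrative `combine_sorts` helper, not Fugue's code): partition keys sort ascending first, then the presort pairs follow.

```python
# Illustrative sketch: sort keys are the partition keys (ascending)
# followed by the presort (column, ascending) pairs.
def combine_sorts(partition_by, presort):
    sorts = {col: True for col in partition_by}  # partition keys sort ascending
    sorts.update(presort)                        # then append the presort pairs
    return sorts


assert combine_sorts(["a"], {"b": True, "c": False}) == {
    "a": True, "b": True, "c": False
}
```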
property jsondict: triad.collections.dict.ParamDict#

Get json serializable dict of the spec

property num_partitions: str#

Number of partitions, it can be a string expression or int

property partition_by: List[str]#

Get partition keys of the spec

property presort: triad.collections.dict.IndexedOrderedDict[str, bool]#

Get presort pairs of the spec

Examples

>>> p = PartitionSpec(by=["a"],presort="b,c desc")
>>> assert p.presort == {"b":True, "c":False}
property presort_expr: str#

Get normalized presort expression

Examples

>>> p = PartitionSpec(by=["a"],presort="b , c dESc")
>>> assert p.presort_expr == "b ASC,c DESC"
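The normalization can be sketched as follows (a hypothetical `to_presort_expr` helper over plain dicts, not Fugue's code):

```python
# Illustrative sketch: normalize (column, ascending) pairs back into a
# canonical presort expression string.
def to_presort_expr(presort):
    return ",".join(
        f"{col} {'ASC' if asc else 'DESC'}" for col, asc in presort.items()
    )


assert to_presort_expr({"b": True, "c": False}) == "b ASC,c DESC"
```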
fugue.collections.partition.parse_presort_exp(presort)[source]#

Returns an ordered mapping from column to sorting direction, where ascending order maps to True and descending to False.

Parameters

presort (Any) – a string of columns and sorting directions, or a list of (column, ascending) tuples

Returns

an ordered mapping from column to boolean sorting direction; order matters.

Return type

IndexedOrderedDict[str, bool]

Examples

>>> parse_presort_exp("b desc, c asc")
>>> parse_presort_exp([("b", False), ("c", True)])
both return IndexedOrderedDict([("b", False), ("c", True)])
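As a rough illustration of the parsing behavior, here is a minimal pure-Python sketch; it uses a plain dict instead of IndexedOrderedDict and skips validation, and `parse_presort` is a hypothetical helper, not Fugue's implementation:

```python
# Hypothetical sketch of presort parsing: "asc" (or no direction) maps
# to True, "desc" maps to False.
def parse_presort(presort):
    if not isinstance(presort, str):
        return dict(presort)  # already (column, ascending) pairs
    result = {}
    for item in presort.split(","):
        tokens = item.strip().split()
        direction = tokens[1].lower() if len(tokens) > 1 else "asc"
        result[tokens[0]] = direction == "asc"
    return result  # dicts preserve insertion order in Python 3.7+


assert parse_presort("b desc, c asc") == {"b": False, "c": True}
assert parse_presort([("b", False), ("c", True)]) == {"b": False, "c": True}
```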

fugue.collections.yielded#

class fugue.collections.yielded.Yielded(yid)[source]#

Bases: object

A value yielded from a FugueWorkflow. Users shouldn’t create this object directly.

Parameters

yid (str) – unique id for determinism

property is_set: bool#

Whether the value is set. It can be false if the parent workflow has not been executed.

class fugue.collections.yielded.YieldedFile(yid)[source]#

Bases: fugue.collections.yielded.Yielded

Yielded file from FugueWorkflow. Users shouldn’t create this object directly.

Parameters

yid (str) – unique id for determinism

property is_set: bool#

Whether the value is set. It can be false if the parent workflow has not been executed.

property path: str#

File path of the yield

set_value(path)[source]#

Set the yielded path after compute

Parameters

path (str) – file path

Return type

None
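The lifecycle of a yielded file can be sketched with a minimal stand-in class (`MiniYieldedFile` is illustrative, not Fugue's implementation): the object exists before the workflow runs, and the path becomes available only after compute finishes.

```python
# Hypothetical sketch of the yielded-file pattern: created empty,
# filled in by set_value after the parent workflow executes.
class MiniYieldedFile:
    def __init__(self, yid):
        self._yid = yid   # unique id for determinism
        self._path = ""

    @property
    def is_set(self):
        # False until the parent workflow has been executed
        return self._path != ""

    def set_value(self, path):
        self._path = path

    @property
    def path(self):
        if not self.is_set:
            raise ValueError("parent workflow has not been executed")
        return self._path


y = MiniYieldedFile("id1")
assert not y.is_set
y.set_value("/tmp/out.parquet")
assert y.is_set and y.path == "/tmp/out.parquet"
```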