fugue.collections

fugue.collections.partition

class fugue.collections.partition.BagPartitionCursor(physical_partition_no)[source]

Bases: DatasetPartitionCursor

The cursor pointing at the first bag item of each logical partition inside a physical partition.

It’s important to understand the concept of partition; please read the Partition Tutorial

Parameters:

physical_partition_no (int) – physical partition number passed in by ExecutionEngine

class fugue.collections.partition.DatasetPartitionCursor(physical_partition_no)[source]

Bases: object

The cursor pointing at the first item of each logical partition inside a physical partition.

It’s important to understand the concept of partition; please read the Partition Tutorial

Parameters:

physical_partition_no (int) – physical partition number passed in by ExecutionEngine

property item: Any

Get current item

property partition_no: int

Logical partition number

property physical_partition_no: int

Physical partition number

set(item, partition_no, slice_no)[source]

Reset the cursor to an item (which should be the first item of a new logical partition)

Parameters:
  • item (Any) – an item of the dataset, or a function generating the item

  • partition_no (int) – logical partition number

  • slice_no (int) – slice number inside the logical partition (to be deprecated)

Return type:

None

property slice_no: int

Slice number (inside the current logical partition); for now it should always be 0

class fugue.collections.partition.PartitionCursor(schema, spec, physical_partition_no)[source]

Bases: DatasetPartitionCursor

The cursor pointing at the first row of each logical partition inside a physical partition.

It’s important to understand the concept of partition; please read the Partition Tutorial

Parameters:
  • schema (Schema) – input dataframe schema

  • spec (PartitionSpec) – partition spec

  • physical_partition_no (int) – physical partition number passed in by ExecutionEngine

property key_schema: Schema

Partition key schema

property key_value_array: List[Any]

Based on current row, get the partition key values as an array

property key_value_dict: Dict[str, Any]

Based on current row, get the partition key values as a dict

property row: List[Any]

Get current row data

property row_schema: Schema

Schema of the current row

set(row, partition_no, slice_no)[source]

Reset the cursor to a row (which should be the first row of a new logical partition)

Parameters:
  • row (Any) – list-like row data or a function generating a list-like row

  • partition_no (int) – logical partition number

  • slice_no (int) – slice number inside the logical partition (to be deprecated)

Return type:

None

class fugue.collections.partition.PartitionSpec(*args, **kwargs)[source]

Bases: object

Fugue Partition Specification.

Examples

>>> PartitionSpec(num=4)
>>> PartitionSpec(4)  # == PartitionSpec(num=4)
>>> PartitionSpec(num="ROWCOUNT/4 + 3")  # It can be an expression
>>> PartitionSpec(by=["a","b"])
>>> PartitionSpec(["a","b"])  # == PartitionSpec(by=["a","b"])
>>> PartitionSpec(by=["a"], presort="b DESC, c ASC")
>>> PartitionSpec(algo="even", num=4)
>>> p = PartitionSpec(num=4, by=["a"])
>>> p_override = PartitionSpec(p, by=["a","b"], algo="even")
>>> PartitionSpec(by="a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("per_row")  # == PartitionSpec(num="ROWCOUNT", algo="even")

It’s important to understand this concept; please read the Partition Tutorial

A partition spec consists of these fields:

  • algo: can be one of default, hash, rand, even or coarse

  • num or num_partitions: number of physical partitions; it can be an integer or an expression, e.g. (ROWCOUNT+4) / 3

  • by or partition_by: keys to partition on

  • presort: keys to sort by within each partition, in addition to the partition keys. E.g. a and a asc both mean presort by column a in ascending order; a,b desc means presort by a in ascending order and then by b in descending order.

  • row_limit and size_limit are to be deprecated

Parameters:
  • args (Any) –

  • kwargs (Any) –

property algo: str

Get algo of the spec, one of default, hash, rand, even or coarse

property empty: bool

Whether this spec specifies nothing

get_cursor(schema, physical_partition_no)[source]

Get PartitionCursor based on dataframe schema and physical partition number. You normally don’t call this method directly

Parameters:
  • schema (Schema) – the dataframe schema this partition spec operates on

  • physical_partition_no (int) – physical partition number passed in by ExecutionEngine

Returns:

PartitionCursor object

Return type:

PartitionCursor

get_key_schema(schema)[source]

Get partition keys schema

Parameters:

schema (Schema) – the dataframe schema this partition spec operates on

Returns:

the sub-schema only containing partition keys

Return type:

Schema

get_num_partitions(**expr_map_funcs)[source]

Convert num_partitions expression to int number

Parameters:

expr_map_funcs (Any) – lambda functions (no parameter) for keywords

Returns:

integer value of the partitions

Return type:

int

Examples

>>> p = PartitionSpec(num="ROWCOUNT/2")
>>> p.get_num_partitions(ROWCOUNT=lambda: df.count())
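The example above can be imitated without Fugue installed. The helper below is a hypothetical sketch of how a num_partitions expression could be resolved against keyword lambdas, not Fugue's actual code: each keyword is replaced by the result of its zero-argument function, then the purely numeric expression is evaluated.

```python
# Hypothetical sketch of resolving a num_partitions expression such as
# "ROWCOUNT/2" against keyword lambdas (not Fugue's implementation).
import math
import re
from typing import Any, Callable


def resolve_num(expr: str, **expr_map_funcs: Callable[[], Any]) -> int:
    def substitute(match: "re.Match") -> str:
        word = match.group(0)
        if word in expr_map_funcs:
            return str(expr_map_funcs[word]())  # call the zero-arg lambda
        raise KeyError(f"unknown keyword {word}")

    # replace every keyword (e.g. ROWCOUNT) with its concrete value
    resolved = re.sub(r"[A-Za-z_]+", substitute, expr)
    # evaluate the now purely numeric expression and floor to an int
    return int(math.floor(eval(resolved)))
```

The zero-argument lambdas keep the expensive computation (e.g. counting a dataframe) lazy: it only runs if the expression actually mentions the keyword.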
get_partitioner(schema)[source]

Get SchemaedDataPartitioner by input dataframe schema

Parameters:

schema (Schema) – the dataframe schema this partition spec operates on

Returns:

SchemaedDataPartitioner object

Return type:

SchemaedDataPartitioner

get_sorts(schema, with_partition_keys=True)[source]

Get the keys for sorting within a partition; it’s the combination of the partition keys and the presort keys

Parameters:
  • schema (Schema) – the dataframe schema this partition spec operates on

  • with_partition_keys (bool) – whether to include partition keys

Returns:

an ordered dictionary of key, order pairs

Return type:

IndexedOrderedDict[str, bool]

Examples

>>> p = PartitionSpec(by=["a"],presort="b , c dESc")
>>> schema = Schema("a:int,b:int,c:int,d:int")
>>> assert p.get_sorts(schema) == {"a":True, "b":True, "c": False}
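The combination rule shown in the example can be sketched in plain Python. This is a simplified stand-in, not Fugue's implementation: partition keys sort ascending and come first, then the presort pairs follow in their declared order.

```python
# Simplified sketch of get_sorts: partition keys (ascending) first,
# then presort pairs, preserving order (not Fugue's implementation).
from typing import Dict, Iterable


def combine_sorts(
    partition_by: Iterable[str],
    presort: Dict[str, bool],
    with_partition_keys: bool = True,
) -> Dict[str, bool]:
    result: Dict[str, bool] = {}
    if with_partition_keys:
        for key in partition_by:
            result[key] = True  # partition keys always sort ascending
    result.update(presort)  # presort pairs follow, keeping their order
    return result
```

Python dicts preserve insertion order, so the result plays the role of the IndexedOrderedDict returned by the real method.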
property jsondict: ParamDict

Get a JSON-serializable dict of the spec

property num_partitions: str

Number of partitions; it can be a string expression or an int

property partition_by: List[str]

Get partition keys of the spec

property presort: IndexedOrderedDict[str, bool]

Get presort pairs of the spec

Examples

>>> p = PartitionSpec(by=["a"],presort="b,c desc")
>>> assert p.presort == {"b":True, "c":False}
property presort_expr: str

Get normalized presort expression

Examples

>>> p = PartitionSpec(by=["a"],presort="b , c dESc")
>>> assert p.presort_expr == "b ASC,c DESC"
fugue.collections.partition.parse_presort_exp(presort)[source]

Returns the ordered column sorting directions, where ascending order is represented as True and descending as False.

Parameters:

presort (Any) – string that contains column and sorting direction or list of tuple that contains column and boolean sorting direction

Returns:

column and boolean sorting direction of column, order matters.

Return type:

IndexedOrderedDict[str, bool]

Examples

>>> parse_presort_exp("b asc, c desc")
>>> parse_presort_exp([("b", True), ("c", False)])
both return IndexedOrderedDict([("b", True), ("c", False)])
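The parsing rule can be sketched as follows. This is a simplified, hypothetical reimplementation for illustration (it omits the real function's schema-aware validation): a string is split on commas, each part is a column optionally followed by asc or desc, and asc (the default) maps to True.

```python
# Simplified sketch of presort parsing (not Fugue's implementation):
# "b desc, c asc" or [(col, bool), ...] -> ordered {col: ascending} mapping.
from typing import Any, Dict


def parse_presort_sketch(presort: Any) -> Dict[str, bool]:
    result: Dict[str, bool] = {}
    if isinstance(presort, str):
        for part in presort.split(","):
            tokens = part.strip().split()
            if not tokens:
                continue
            col = tokens[0]
            direction = tokens[1].lower() if len(tokens) > 1 else "asc"
            assert direction in ("asc", "desc"), f"bad direction: {direction}"
            assert col not in result, f"duplicate column: {col}"
            result[col] = direction == "asc"  # ascending maps to True
    else:
        for col, asc in presort:
            result[col] = bool(asc)
    return result
```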

fugue.collections.sql

class fugue.collections.sql.StructuredRawSQL(statements, dialect=None)[source]

Bases: object

The Raw SQL object containing table references and dialect information.

Parameters:
  • statements (Iterable[Tuple[bool, str]]) – In each tuple, the first value indicates whether the second value is a dataframe name reference (True), or just a part of the statement (False)

  • dialect (str | None) – the dialect of the statements, defaults to None

Note

dialect None means no transpilation will be done when constructing the final SQL.

construct(name_map=None, dialect=None, log=None)[source]

Construct the final SQL given the dialect

Parameters:
  • name_map (None | Callable[[str], str] | Dict[str, str]) – the name map from the original statement to the expected names, defaults to None. It can be a function or a dictionary

  • dialect (str | None) – the expected dialect, defaults to None

  • log (Logger | None) – the logger to log information, defaults to None

Returns:

the final SQL string
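Setting the dialect aside, the assembly step can be sketched in plain Python. The helper below is a hypothetical illustration, not the library's code: dataframe name references (tuples flagged True) are passed through name_map, which may be a dict or a function, while plain fragments are kept as-is.

```python
# Hypothetical sketch of assembling the final SQL from the statements
# tuples (not StructuredRawSQL's actual code): name references (True)
# go through name_map, plain fragments (False) are kept verbatim.
from typing import Callable, Dict, Iterable, Tuple, Union


def construct_sql(
    statements: Iterable[Tuple[bool, str]],
    name_map: Union[None, Callable[[str], str], Dict[str, str]] = None,
) -> str:
    if name_map is None:
        mapper: Callable[[str], str] = lambda x: x
    elif isinstance(name_map, dict):
        mapper = lambda x: name_map.get(x, x)
    else:
        mapper = name_map
    return "".join(
        mapper(text) if is_name else text for is_name, text in statements
    )
```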

property dialect: str | None

The dialect of this query

static from_expr(sql, prefix='<tmpdf:', suffix='>', dialect=None)[source]

Parse the StructuredRawSQL from the sql expression. The sql should look like SELECT * FROM <tmpdf:dfname>. This function can identify the tmpdfs with the given syntax, and construct the StructuredRawSQL

Parameters:
  • sql (str) – the SQL expression with <tmpdf:?>

  • prefix (str) – the prefix of the temp df

  • suffix (str) – the suffix of the temp df

  • dialect (str | None) – the dialect of the sql expression, defaults to None

Returns:

the parsed object

Return type:

StructuredRawSQL
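The parsing described above can be sketched with a regular expression. This is a hypothetical illustration, not the library's code: the SQL string is split into (is_dataframe_name, text) tuples matching the statements format documented for the constructor.

```python
# Hypothetical sketch of parsing "SELECT * FROM <tmpdf:dfname>" into
# (is_dataframe_name, text) tuples (not StructuredRawSQL's actual code).
import re
from typing import List, Tuple


def parse_raw_sql(
    sql: str, prefix: str = "<tmpdf:", suffix: str = ">"
) -> List[Tuple[bool, str]]:
    pattern = re.compile(re.escape(prefix) + r"(.+?)" + re.escape(suffix))
    parts: List[Tuple[bool, str]] = []
    pos = 0
    for m in pattern.finditer(sql):
        if m.start() > pos:  # plain SQL fragment before this reference
            parts.append((False, sql[pos : m.start()]))
        parts.append((True, m.group(1)))  # a temp dataframe name reference
        pos = m.end()
    if pos < len(sql):  # trailing fragment after the last reference
        parts.append((False, sql[pos:]))
    return parts
```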

class fugue.collections.sql.TempTableName[source]

Bases: object

Generates a temporary, random and globally unique table name
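A globally unique name like this is typically built from a UUID. The helper below is a hedged sketch; the prefix and exact format are assumptions, not the naming scheme TempTableName actually uses.

```python
# Hypothetical sketch of generating a random, globally unique table name;
# the "_temp_" prefix is an assumption, not TempTableName's real format.
import uuid


def make_temp_table_name(prefix: str = "_temp_") -> str:
    return prefix + uuid.uuid4().hex
```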

fugue.collections.yielded

class fugue.collections.yielded.PhysicalYielded(yid, storage_type)[source]

Bases: Yielded

Physical yielded object from FugueWorkflow. Users shouldn’t create this object directly.

Parameters:
  • yid (str) – unique id for determinism

  • storage_type (str) – file or table

property is_set: bool

Whether the value is set. It can be false if the parent workflow has not been executed.

property name: str

The name reference of the yield

set_value(name)[source]

Set the storage name after compute

Parameters:

name (str) – name reference of the storage

Return type:

None

property storage_type: str

The storage type of this yield

class fugue.collections.yielded.Yielded(yid)[source]

Bases: object

Yields from FugueWorkflow. Users shouldn’t create this object directly.

Parameters:

yid (str) – unique id for determinism

property is_set: bool

Whether the value is set. It can be false if the parent workflow has not been executed.