fugue.collections#

fugue.collections.partition#

class fugue.collections.partition.BagPartitionCursor(physical_partition_no)[source]#

Bases: DatasetPartitionCursor

The cursor pointing at the first bag item of each logical partition inside a physical partition.

It’s important to understand the concept of a partition; please read the Partition Tutorial

Parameters

physical_partition_no (int) – physical partition number passed in by ExecutionEngine

class fugue.collections.partition.DatasetPartitionCursor(physical_partition_no)[source]#

Bases: object

The cursor pointing at the first item of each logical partition inside a physical partition.

It’s important to understand the concept of a partition; please read the Partition Tutorial

Parameters

physical_partition_no (int) – physical partition number passed in by ExecutionEngine

property item: Any#

Get current item

property partition_no: int#

Logical partition number

property physical_partition_no: int#

Physical partition number

set(item, partition_no, slice_no)[source]#

Reset the cursor to an item (which should be the first item of a new logical partition)

Parameters
  • item (Any) – an item of the dataset, or a function generating the item

  • partition_no (int) – logical partition number

  • slice_no (int) – slice number inside the logical partition (to be deprecated)

Return type

None

property slice_no: int#

Slice number (inside the current logical partition); for now it should always be 0
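
A minimal usage sketch, with illustrative values (in practice the ExecutionEngine drives the cursor):

>>> cursor = DatasetPartitionCursor(physical_partition_no=0)
>>> cursor.set("first_item", partition_no=2, slice_no=0)
>>> assert cursor.item == "first_item"
>>> assert cursor.partition_no == 2
>>> assert cursor.physical_partition_no == 0
>>> assert cursor.slice_no == 0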

class fugue.collections.partition.PartitionCursor(schema, spec, physical_partition_no)[source]#

Bases: DatasetPartitionCursor

The cursor pointing at the first row of each logical partition inside a physical partition.

It’s important to understand the concept of a partition; please read the Partition Tutorial

Parameters
  • schema (Schema) – input dataframe schema

  • spec (PartitionSpec) – partition spec

  • physical_partition_no (int) – physical partition number passed in by ExecutionEngine

property key_schema: Schema#

Partition key schema

property key_value_array: List[Any]#

Based on current row, get the partition key values as an array

property key_value_dict: Dict[str, Any]#

Based on current row, get the partition key values as a dict

property row: List[Any]#

Get current row data

property row_schema: Schema#

Schema of the current row

set(row, partition_no, slice_no)[source]#

Reset the cursor to a row (which should be the first row of a new logical partition)

Parameters
  • row (Any) – list-like row data or a function generating a list-like row

  • partition_no (int) – logical partition number

  • slice_no (int) – slice number inside the logical partition (to be deprecated)

Return type

None
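
A minimal sketch of obtaining and reading a cursor (normally the ExecutionEngine does this; the schema and row values are illustrative, and Schema comes from the triad package Fugue builds on):

>>> from triad import Schema
>>> schema = Schema("a:int,b:int,c:int")
>>> spec = PartitionSpec(by=["a"])
>>> cursor = spec.get_cursor(schema, physical_partition_no=0)
>>> cursor.set([1, 2, 3], partition_no=0, slice_no=0)
>>> assert cursor.row == [1, 2, 3]
>>> assert cursor.key_value_dict == {"a": 1}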

class fugue.collections.partition.PartitionSpec(*args, **kwargs)[source]#

Bases: object

Fugue Partition Specification.

Examples

>>> PartitionSpec(num=4)
>>> PartitionSpec(4)  # == PartitionSpec(num=4)
>>> PartitionSpec(num="ROWCOUNT/4 + 3")  # It can be an expression
>>> PartitionSpec(by=["a","b"])
>>> PartitionSpec(["a","b"])  # == PartitionSpec(by=["a","b"])
>>> PartitionSpec(by=["a"], presort="b DESC, c ASC")
>>> PartitionSpec(algo="even", num=4)
>>> p = PartitionSpec(num=4, by=["a"])
>>> p_override = PartitionSpec(p, by=["a","b"], algo="even")
>>> PartitionSpec(by="a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("per_row")  # == PartitionSpec(num="ROWCOUNT", algo="even")

It’s important to understand this concept; please read the Partition Tutorial

A partition spec consists of these fields:

  • algo: can be one of hash (default), rand, even or coarse

  • num or num_partitions: number of physical partitions, either an integer or an expression, e.g. (ROWCOUNT+4) / 3

  • by or partition_by: keys to partition on

  • presort: keys to sort by within each partition, in addition to the partition keys. E.g. a and a asc both mean presort by column a ascending; a,b desc means presort by a ascending and then by b descending.

  • row_limit and size_limit are to be deprecated

Parameters
  • args (Any) –

  • kwargs (Any) –

property algo: str#

Get the algo of the spec, one of hash (default), rand, even or coarse

property empty: bool#

Whether this spec specifies nothing

get_cursor(schema, physical_partition_no)[source]#

Get a PartitionCursor based on the dataframe schema and physical partition number. You normally don’t call this method directly.

Parameters
  • schema (Schema) – the dataframe schema this partition spec operates on

  • physical_partition_no (int) – physical partition number passed in by ExecutionEngine

Returns

PartitionCursor object

Return type

PartitionCursor

get_key_schema(schema)[source]#

Get partition keys schema

Parameters

schema (Schema) – the dataframe schema this partition spec operates on

Returns

the sub-schema only containing partition keys

Return type

Schema
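
For example, a sketch (Schema comes from the triad package Fugue builds on):

>>> from triad import Schema
>>> spec = PartitionSpec(by=["a", "c"])
>>> key_schema = spec.get_key_schema(Schema("a:int,b:str,c:long"))
>>> assert key_schema.names == ["a", "c"]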

get_num_partitions(**expr_map_funcs)[source]#

Convert num_partitions expression to int number

Parameters

expr_map_funcs (Any) – zero-parameter functions mapped from the keywords they resolve (e.g. ROWCOUNT)

Returns

integer value of the partitions

Return type

int

Examples

>>> p = PartitionSpec(num="ROWCOUNT/2")
>>> p.get_num_partitions(ROWCOUNT=lambda: df.count())

get_partitioner(schema)[source]#

Get SchemaedDataPartitioner by input dataframe schema

Parameters

schema (Schema) – the dataframe schema this partition spec operates on

Returns

SchemaedDataPartitioner object

Return type

SchemaedDataPartitioner

get_sorts(schema, with_partition_keys=True)[source]#

Get the keys for sorting in a partition; it’s the combination of the partition keys plus the presort keys

Parameters
  • schema (Schema) – the dataframe schema this partition spec operates on

  • with_partition_keys (bool) – whether to include partition keys

Returns

an ordered dictionary of key, order pairs

Return type

IndexedOrderedDict[str, bool]

Examples

>>> p = PartitionSpec(by=["a"], presort="b , c dESc")
>>> schema = Schema("a:int,b:int,c:int,d:int")
>>> assert p.get_sorts(schema) == {"a": True, "b": True, "c": False}

property jsondict: ParamDict#

Get the JSON-serializable dict of the spec
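
A round-trip sketch (assuming a dict, like the other forms shown in the class examples, is an accepted constructor argument):

>>> p = PartitionSpec(num=4, by=["a"])
>>> p2 = PartitionSpec(p.jsondict)
>>> assert p2.partition_by == ["a"]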

property num_partitions: str#

Number of partitions; it can be a string expression or an integer

property partition_by: List[str]#

Get partition keys of the spec

property presort: IndexedOrderedDict[str, bool]#

Get presort pairs of the spec

Examples

>>> p = PartitionSpec(by=["a"],presort="b,c desc")
>>> assert p.presort == {"b":True, "c":False}

property presort_expr: str#

Get normalized presort expression

Examples

>>> p = PartitionSpec(by=["a"],presort="b , c dESc")
>>> assert p.presort_expr == "b ASC,c DESC"

fugue.collections.partition.parse_presort_exp(presort)[source]#

Parse a presort expression into ordered column sorting directions, where ascending maps to True and descending to False.

Parameters

presort (Any) – a string of comma-separated column and sorting-direction pairs, or a list of tuples of column name and boolean sorting direction

Returns

an ordered mapping from each column to its boolean sorting direction (True for ascending); order matters.

Return type

IndexedOrderedDict[str, bool]

Examples

>>> parse_presort_exp("b desc, c asc")
>>> parse_presort_exp([("b", True), ("c", False))])
both return IndexedOrderedDict([("b", True), ("c", False))])

fugue.collections.sql#

class fugue.collections.sql.StructuredRawSQL(statements, dialect=None)[source]#

Bases: object

The Raw SQL object containing table references and dialect information.

Parameters
  • statements (Iterable[Tuple[bool, str]]) – In each tuple, the first value indicates whether the second value is a dataframe name reference (True), or just a part of the statement (False)

  • dialect (Optional[str]) – the dialect of the statements, defaults to None

Note

dialect None means no transpilation will be done when constructing the final SQL.
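
A minimal construction sketch (the table name is illustrative): the first tuple below is plain statement text, the second is a dataframe name reference to be remapped:

>>> s = StructuredRawSQL([(False, "SELECT * FROM "), (True, "df1")])
>>> assert "tmp_table" in s.construct(name_map={"df1": "tmp_table"})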

construct(name_map=None, dialect=None, log=None)[source]#

Construct the final SQL given the dialect

Parameters
  • name_map (Union[None, Callable[[str], str], Dict[str, str]]) – the name map from the original statement to the expected names, defaults to None. It can be a function or a dictionary

  • dialect (Optional[str]) – the expected dialect, defaults to None

  • log (Optional[Logger]) – the logger to log information, defaults to None

Returns

the final SQL string

property dialect: Optional[str]#

The dialect of this query

static from_expr(sql, prefix='<tmpdf:', suffix='>', dialect=None)[source]#

Parse the StructuredRawSQL from the sql expression. The sql should look like SELECT * FROM <tmpdf:dfname>. This function can identify the tmpdfs with the given syntax, and construct the StructuredRawSQL

Parameters
  • sql (str) – the SQL expression with <tmpdf:?>

  • prefix (str) – the prefix of the temp df

  • suffix (str) – the suffix of the temp df

  • dialect (Optional[str]) – the dialect of the sql expression, defaults to None

Returns

the parsed object

Return type

StructuredRawSQL
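
For example, a sketch of parsing and then reconstructing with renamed references (the resulting string is assumed, not verified):

>>> s = StructuredRawSQL.from_expr("SELECT * FROM <tmpdf:a>")
>>> assert "tbl_a" in s.construct({"a": "tbl_a"})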

class fugue.collections.sql.TempTableName[source]#

Bases: object

Generates a temporary, random and globally unique table name
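
A usage sketch, assuming (based on its pairing with StructuredRawSQL.from_expr) that the object’s string form renders with the default <tmpdf:...> prefix and suffix:

>>> t = TempTableName()
>>> s = StructuredRawSQL.from_expr(f"SELECT * FROM {t}")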

fugue.collections.yielded#

class fugue.collections.yielded.PhysicalYielded(yid, storage_type)[source]#

Bases: Yielded

Physical yielded object from FugueWorkflow. Users shouldn’t create this object directly.

Parameters
  • yid (str) – unique id for determinism

  • storage_type (str) – file or table

property is_set: bool#

Whether the value is set. It can be false if the parent workflow has not been executed.

property name: str#

The name reference of the yield

set_value(name)[source]#

Set the storage name after compute

Parameters

name (str) – name reference of the storage

Return type

None

property storage_type: str#

The storage type of this yield
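
A lifecycle sketch with illustrative values (users normally never construct or set this directly):

>>> y = PhysicalYielded("some_unique_id", storage_type="table")
>>> assert not y.is_set
>>> y.set_value("output_table_name")
>>> assert y.is_set
>>> assert y.name == "output_table_name"
>>> assert y.storage_type == "table"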

class fugue.collections.yielded.Yielded(yid)[source]#

Bases: object

Yields from FugueWorkflow. Users shouldn’t create this object directly.

Parameters

yid (str) – unique id for determinism

property is_set: bool#

Whether the value is set. It can be false if the parent workflow has not been executed.