fugue.collections
fugue.collections.partition
- class fugue.collections.partition.BagPartitionCursor(physical_partition_no)
Bases: DatasetPartitionCursor
The cursor pointing at the first bag item of each logical partition inside a physical partition.
It is important to understand the concept of partition; please read the Partition Tutorial.
- Parameters
physical_partition_no (int) – physical partition number passed in by ExecutionEngine
- class fugue.collections.partition.DatasetPartitionCursor(physical_partition_no)
Bases: object
The cursor pointing at the first item of each logical partition inside a physical partition.
It is important to understand the concept of partition; please read the Partition Tutorial.
- Parameters
physical_partition_no (int) – physical partition number passed in by ExecutionEngine
- property item: Any
Get the current item
- property partition_no: int
Logical partition number
- property physical_partition_no: int
Physical partition number
- set(item, partition_no, slice_no)
Reset the cursor to an item (which should be the first item of a new logical partition)
- Parameters
item (Any) – an item of the dataset, or a function generating the item
partition_no (int) – logical partition number
slice_no (int) – slice number inside the logical partition (to be deprecated)
- Return type
None
- property slice_no: int
Slice number (inside the current logical partition); for now it should always be 0
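To make the contract of set and item concrete, here is a minimal sketch in plain Python. ToyDatasetCursor is a hypothetical stand-in, not Fugue's implementation; in particular, generating a callable item lazily on first access and caching the result is an assumption based on the description of the item parameter.

```python
from typing import Any, Dict, List


class ToyDatasetCursor:
    """Hypothetical stand-in for DatasetPartitionCursor (not Fugue's code)."""

    def __init__(self, physical_partition_no: int) -> None:
        self._physical_partition_no = physical_partition_no
        self._item: Any = None
        self._partition_no = 0
        self._slice_no = 0

    def set(self, item: Any, partition_no: int, slice_no: int) -> None:
        # Point the cursor at the first item of a new logical partition.
        self._item = item
        self._partition_no = partition_no
        self._slice_no = slice_no

    @property
    def item(self) -> Any:
        # Assumption: a callable item is generated on first access, then cached.
        if callable(self._item):
            self._item = self._item()
        return self._item

    @property
    def partition_no(self) -> int:
        return self._partition_no

    @property
    def physical_partition_no(self) -> int:
        return self._physical_partition_no

    @property
    def slice_no(self) -> int:
        return self._slice_no


calls: List[int] = []


def make_item() -> Dict[str, int]:
    # Records each call so we can see when the item is actually generated.
    calls.append(1)
    return {"id": 7}


cursor = ToyDatasetCursor(physical_partition_no=2)
cursor.set(make_item, partition_no=0, slice_no=0)
print(len(calls))   # 0: nothing generated yet
print(cursor.item)  # {'id': 7}
print(len(calls))   # 1
```

Deferring the generation this way means an engine can reset the cursor cheaply for every logical partition and only pay for building the item when user code actually reads it.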
- class fugue.collections.partition.PartitionCursor(schema, spec, physical_partition_no)
Bases: DatasetPartitionCursor
The cursor pointing at the first row of each logical partition inside a physical partition.
It is important to understand the concept of partition; please read the Partition Tutorial.
- Parameters
schema (Schema) – input dataframe schema
spec (PartitionSpec) – partition spec
physical_partition_no (int) – physical partition number passed in by ExecutionEngine
- property key_value_array: List[Any]
Get the partition key values of the current row as an array
- property key_value_dict: Dict[str, Any]
Get the partition key values of the current row as a dict
- property row: List[Any]
Get the current row data
- set(row, partition_no, slice_no)
Reset the cursor to a row (which should be the first row of a new logical partition)
- Parameters
row (Any) – list-like row data or a function generating a list-like row
partition_no (int) – logical partition number
slice_no (int) – slice number inside the logical partition (to be deprecated)
- Return type
None
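The following plain-Python sketch shows how an engine might drive a row cursor over one physical partition whose rows are already sorted by the partition key, resetting it at the first row of each logical partition. ToyRowCursor is hypothetical: the schema is modeled as a list of column names, and slice_no is omitted because it is slated for deprecation.

```python
from itertools import groupby
from typing import Any, Dict, List


class ToyRowCursor:
    """Hypothetical stand-in for PartitionCursor (not Fugue's code)."""

    def __init__(self, columns: List[str], partition_by: List[str], physical_partition_no: int) -> None:
        self._key_names = list(partition_by)
        self._key_index = [columns.index(k) for k in partition_by]  # positions of key columns
        self.physical_partition_no = physical_partition_no
        self.partition_no = 0
        self.row: List[Any] = []

    def set(self, row: List[Any], partition_no: int) -> None:
        self.row = list(row)
        self.partition_no = partition_no

    @property
    def key_value_array(self) -> List[Any]:
        return [self.row[i] for i in self._key_index]

    @property
    def key_value_dict(self) -> Dict[str, Any]:
        return dict(zip(self._key_names, self.key_value_array))


columns = ["a", "b"]
# Rows of one physical partition, already sorted by the partition key "a".
rows = [[1, "x"], [1, "y"], [2, "z"]]

cursor = ToyRowCursor(columns, partition_by=["a"], physical_partition_no=0)
key_pos = columns.index("a")
seen = []
for partition_no, (_, group) in enumerate(groupby(rows, key=lambda r: r[key_pos])):
    group_rows = list(group)
    cursor.set(group_rows[0], partition_no)  # reset at the first row of each logical partition
    seen.append((cursor.partition_no, cursor.key_value_dict, len(group_rows)))

print(seen)  # [(0, {'a': 1}, 2), (1, {'a': 2}, 1)]
```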
- class fugue.collections.partition.PartitionSpec(*args, **kwargs)
Bases: object
Fugue Partition Specification.
Examples
>>> from fugue.collections.partition import PartitionSpec
>>> PartitionSpec(num=4)
>>> PartitionSpec(4)  # == PartitionSpec(num=4)
>>> PartitionSpec(num="ROWCOUNT/4 + 3")  # It can be an expression
>>> PartitionSpec(by=["a","b"])
>>> PartitionSpec(["a","b"])  # == PartitionSpec(by=["a","b"])
>>> PartitionSpec(by=["a"], presort="b DESC, c ASC")
>>> PartitionSpec(algo="even", num=4)
>>> p = PartitionSpec(num=4, by=["a"])
>>> p_override = PartitionSpec(p, by=["a","b"], algo="even")
>>> PartitionSpec(by="a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("a")  # == PartitionSpec(by=["a"])
>>> PartitionSpec("per_row")  # == PartitionSpec(num="ROWCOUNT", algo="even")
It is important to understand this concept; please read the Partition Tutorial.
A partition spec consists of:
algo: one of default, hash, rand, even or coarse
num or num_partitions: number of physical partitions; it can be an integer or an expression, e.g. (ROWCOUNT+4) / 3
by or partition_by: keys to partition on
presort: keys to sort by within each partition, other than the partition keys. E.g. a and a asc both mean presort by column a ascending; a,b desc means presort by a ascending and then by b descending.
row_limit and size_limit are to be deprecated.
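- Parameters
args (Any) – positional specs: a number of partitions, partition keys, an existing PartitionSpec to override, or "per_row" (see Examples)
kwargs (Any) – keyword specs such as algo, num or num_partitions, by or partition_by, and presort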
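The documented ways of building a spec can be summarized with a small sketch. normalize_spec is a hypothetical helper that maps the inputs shown in the Examples onto a plain dict; it is not the PartitionSpec constructor, handles only the documented cases, and treats a dict argument as the spec being overridden (an assumption standing in for passing an existing PartitionSpec).

```python
from typing import Any, Dict


def normalize_spec(*args: Any, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical sketch: map documented PartitionSpec inputs to a plain dict."""
    spec: Dict[str, Any] = {}
    for arg in args:
        if isinstance(arg, dict):
            spec.update(arg)  # assumption: an earlier spec (as a dict) to override
        elif isinstance(arg, int):
            spec["num"] = str(arg)  # PartitionSpec(4) == PartitionSpec(num=4)
        elif arg == "per_row":
            spec.update(num="ROWCOUNT", algo="even")
        elif isinstance(arg, str):
            spec["by"] = [arg]  # PartitionSpec("a") == PartitionSpec(by=["a"])
        elif isinstance(arg, list):
            spec["by"] = list(arg)
        else:
            raise TypeError(f"unsupported positional spec: {arg!r}")
    aliases = {"num_partitions": "num", "partition_by": "by"}
    for key, value in kwargs.items():
        key = aliases.get(key, key)
        if key == "by" and isinstance(value, str):
            value = [value]
        elif key == "num":
            value = str(value)  # num is kept as a string expression
        spec[key] = value  # keyword specs override positional ones
    return spec


base = normalize_spec(num=4, by=["a"])
print(normalize_spec(base, by=["a", "b"], algo="even"))
# {'num': '4', 'by': ['a', 'b'], 'algo': 'even'}
print(normalize_spec() == {})  # True: nothing specified, i.e. an "empty" spec
```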
- property algo: str
Get algo of the spec, one of default, hash, rand, even or coarse
- property empty: bool
Whether this spec didn’t specify anything
- get_cursor(schema, physical_partition_no)
Get a PartitionCursor based on the dataframe schema and physical partition number. You normally don’t call this method directly.
- Parameters
schema (Schema) – the dataframe schema this partition spec operates on
physical_partition_no (int) – physical partition number passed in by ExecutionEngine
- Returns
PartitionCursor object
- Return type
PartitionCursor
- get_num_partitions(**expr_map_funcs)
Convert the num_partitions expression to an integer.
- Parameters
expr_map_funcs (Any) – zero-argument lambda functions for the keywords in the expression
- Returns
the number of partitions as an integer
- Return type
int
Examples
>>> p = PartitionSpec(num="ROWCOUNT/2")
>>> p.get_num_partitions(ROWCOUNT=lambda: df.count())  # df is the input dataframe
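One way to evaluate such an expression safely is sketched below. eval_num_partitions is a hypothetical helper, not Fugue's implementation: it walks the Python AST, supports only numbers, keywords and the operators + - * / //, calls each keyword function only when its keyword appears in the expression, and truncates the result with int(), which is an assumption about rounding.

```python
import ast
import operator
from typing import Any, Callable, Dict

# Binary operators the sketch allows in a num_partitions expression.
_OPS: Dict[type, Callable[[Any, Any], Any]] = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
}


def eval_num_partitions(expr: str, **expr_map_funcs: Callable[[], Any]) -> int:
    """Hypothetical sketch: evaluate a num_partitions expression to an int."""
    resolved: Dict[str, Any] = {}

    def visit(node: ast.AST) -> Any:
        if isinstance(node, ast.Expression):
            return visit(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Name):
            if node.id not in expr_map_funcs:
                raise ValueError(f"unknown keyword: {node.id}")
            # Lazy: a keyword function runs only if the keyword appears, and at most once.
            if node.id not in resolved:
                resolved[node.id] = expr_map_funcs[node.id]()
            return resolved[node.id]
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](visit(node.left), visit(node.right))
        raise ValueError(f"unsupported syntax in {expr!r}")

    return int(visit(ast.parse(expr.strip(), mode="eval")))


print(eval_num_partitions("ROWCOUNT/4 + 3", ROWCOUNT=lambda: 10))  # 5
```

Passing zero-argument functions rather than values lets an expensive quantity such as a row count be computed only when the expression actually needs it.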
- get_partitioner(schema)
Get a SchemaedDataPartitioner by the input dataframe schema.
- Parameters
schema (Schema) – the dataframe schema this partition spec operates on
- Returns
SchemaedDataPartitioner object
- Return type
SchemaedDataPartitioner
- get_sorts(schema, with_partition_keys=True)
Get the keys for sorting in a partition: the partition keys (when with_partition_keys is True) followed by the presort keys.
- Parameters
schema (Schema) – the dataframe schema this partition spec operates on
with_partition_keys (bool) – whether to include partition keys
- Returns
an ordered dictionary of key, order pairs
- Return type
IndexedOrderedDict[str, bool]
Examples
>>> from triad.collections.schema import Schema
>>> p = PartitionSpec(by=["a"], presort="b , c dESc")
>>> schema = Schema("a:int,b:int,c:int,d:int")
>>> assert p.get_sorts(schema) == {"a": True, "b": True, "c": False}
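The combination rule can be sketched in plain Python. combine_sorts is hypothetical: it models the schema as a list of column names, sorts partition keys ascending (matching the example above), and assumes presort keys do not repeat partition keys.

```python
from collections import OrderedDict
from typing import Dict, List


def combine_sorts(
    columns: List[str],
    partition_by: List[str],
    presort: Dict[str, bool],
    with_partition_keys: bool = True,
) -> "OrderedDict[str, bool]":
    """Hypothetical sketch of get_sorts; True means ascending."""
    sorts: "OrderedDict[str, bool]" = OrderedDict()
    if with_partition_keys:
        for key in partition_by:
            sorts[key] = True  # partition keys come first, ascending
    for key, asc in presort.items():
        sorts[key] = asc  # then the presort keys, in their given order
    missing = [key for key in sorts if key not in columns]
    if missing:
        raise KeyError(f"sort keys not in schema: {missing}")
    return sorts


columns = ["a", "b", "c", "d"]
print(dict(combine_sorts(columns, ["a"], {"b": True, "c": False})))
# {'a': True, 'b': True, 'c': False}
```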
- property num_partitions: str
Number of partitions; it can be a string expression or an int
- property partition_by: List[str]
Get partition keys of the spec
- property presort: IndexedOrderedDict[str, bool]
Get presort pairs of the spec
Examples
>>> p = PartitionSpec(by=["a"], presort="b,c desc")
>>> assert p.presort == {"b": True, "c": False}
- property presort_expr: str
Get normalized presort expression
Examples
>>> p = PartitionSpec(by=["a"], presort="b , c dESc")
>>> assert p.presort_expr == "b ASC,c DESC"
- fugue.collections.partition.parse_presort_exp(presort)
Parse a presort expression into an ordered mapping of column name to sorting direction, where True means ascending and False means descending.
- Parameters
presort (Any) – a string of columns with optional sorting directions, or a list of (column, bool) tuples
- Returns
each column mapped to its boolean sorting direction; the order matters
- Return type
IndexedOrderedDict[str, bool]
Examples
>>> parse_presort_exp("b desc, c asc")
>>> parse_presort_exp([("b", False), ("c", True)])
Both return IndexedOrderedDict([("b", False), ("c", True)]).
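A minimal sketch of the parsing, and of the normalized form returned by presort_expr, in plain Python. parse_presort and presort_to_expr are hypothetical helpers, not Fugue's code; rejecting duplicated keys and malformed segments with ValueError is an assumption.

```python
from collections import OrderedDict
from typing import Dict, Iterable, List, Tuple, Union


def parse_presort(presort: Union[str, Iterable[Tuple[str, bool]]]) -> "OrderedDict[str, bool]":
    """Hypothetical sketch; True means ascending, False means descending."""
    pairs: List[Tuple[str, bool]] = []
    if not isinstance(presort, str):
        pairs = [(name, bool(asc)) for name, asc in presort]
    elif presort.strip():
        for segment in presort.split(","):
            tokens = segment.split()
            if len(tokens) == 1:
                pairs.append((tokens[0], True))  # ascending by default
            elif len(tokens) == 2 and tokens[1].lower() in ("asc", "desc"):
                pairs.append((tokens[0], tokens[1].lower() == "asc"))  # case-insensitive
            else:
                raise ValueError(f"invalid presort segment: {segment!r}")
    result: "OrderedDict[str, bool]" = OrderedDict()
    for name, asc in pairs:
        if name in result:
            raise ValueError(f"duplicated presort key: {name}")
        result[name] = asc
    return result


def presort_to_expr(presort: Dict[str, bool]) -> str:
    # Normalized form: "col ASC" or "col DESC", comma-separated without spaces.
    return ",".join(f"{name} {'ASC' if asc else 'DESC'}" for name, asc in presort.items())


print(presort_to_expr(parse_presort("b , c dESc")))  # b ASC,c DESC
```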
fugue.collections.sql
- class fugue.collections.sql.StructuredRawSQL(statements, dialect=None)
Bases: object
The Raw SQL object containing table references and dialect information.
- Parameters
statements (Iterable[Tuple[bool, str]]) – In each tuple, the first value indicates whether the second value is a dataframe name reference (True), or just a part of the statement (False)
dialect (Optional[str]) – the dialect of the statements, defaults to None
Note
A dialect of None means no transpilation will be done when constructing the final SQL.
- construct(name_map=None, dialect=None, log=None)
Construct the final SQL given the dialect
- Parameters
name_map (Union[None, Callable[[str], str], Dict[str, str]]) – the name map from the original statement to the expected names, defaults to None. It can be a function or a dictionary
dialect (Optional[str]) – the expected dialect, defaults to None
log (Optional[Logger]) – the logger to log information, defaults to None
- Returns
the final SQL string
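To show how the (is_reference, text) tuples and name_map fit together, here is a plain-Python sketch. construct_sql is hypothetical: it ignores dialect and log (so no transpilation happens), and keeping names that are missing from a dict name_map unchanged is an assumption.

```python
from typing import Callable, Dict, Iterable, Tuple, Union

NameMap = Union[None, Callable[[str], str], Dict[str, str]]


def construct_sql(statements: Iterable[Tuple[bool, str]], name_map: NameMap = None) -> str:
    """Hypothetical sketch of construct() without dialect transpilation."""

    def rename(name: str) -> str:
        if name_map is None:
            return name
        if isinstance(name_map, dict):
            return name_map.get(name, name)  # assumption: unmapped names are kept
        return name_map(name)

    # True marks a dataframe name reference; False marks literal SQL text.
    return "".join(rename(text) if is_ref else text for is_ref, text in statements)


stmts = [(False, "SELECT * FROM "), (True, "df1"), (False, " WHERE a > 1")]
print(construct_sql(stmts, {"df1": "tmp_001"}))  # SELECT * FROM tmp_001 WHERE a > 1
print(construct_sql(stmts, str.upper))           # SELECT * FROM DF1 WHERE a > 1
```

Only the reference pieces pass through name_map, so literal SQL text such as column names is never renamed by accident.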
- property dialect: Optional[str]
The dialect of this query
- static from_expr(sql, prefix='<tmpdf:', suffix='>', dialect=None)
Parse the StructuredRawSQL from the sql expression. The sql should look like SELECT * FROM <tmpdf:dfname>. This function identifies the temp dataframes marked with the given syntax and constructs the StructuredRawSQL.
- Parameters
sql (str) – the SQL expression containing <tmpdf:?> references
prefix (str) – the prefix of the temp df
suffix (str) – the suffix of the temp df
dialect (Optional[str]) – the dialect of the sql expression, defaults to None
- Returns
the parsed object
- Return type
StructuredRawSQL
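The splitting step can be sketched with a regular expression. parse_tmpdf_expr is hypothetical, not Fugue's parser: it returns the (is_reference, text) tuples described for the statements parameter and does not handle dialects or quoting, so a prefix appearing inside a string literal would also be treated as a reference.

```python
import re
from typing import List, Tuple


def parse_tmpdf_expr(sql: str, prefix: str = "<tmpdf:", suffix: str = ">") -> List[Tuple[bool, str]]:
    """Hypothetical sketch: split sql into (is_dataframe_reference, text) tuples."""
    pattern = re.compile(re.escape(prefix) + r"(.*?)" + re.escape(suffix))
    statements: List[Tuple[bool, str]] = []
    pos = 0
    for match in pattern.finditer(sql):
        if match.start() > pos:
            statements.append((False, sql[pos:match.start()]))  # literal SQL text
        statements.append((True, match.group(1)))  # dataframe name inside the markers
        pos = match.end()
    if pos < len(sql):
        statements.append((False, sql[pos:]))  # trailing literal text
    return statements


print(parse_tmpdf_expr("SELECT * FROM <tmpdf:a> JOIN <tmpdf:b> ON a.id = b.id"))
# [(False, 'SELECT * FROM '), (True, 'a'), (False, ' JOIN '), (True, 'b'), (False, ' ON a.id = b.id')]
```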
fugue.collections.yielded
- class fugue.collections.yielded.PhysicalYielded(yid, storage_type)
Bases: Yielded
Physical yielded object from FugueWorkflow. Users shouldn’t create this object directly.
- Parameters
yid (str) – unique id for determinism
storage_type (str) – file or table
- property is_set: bool
Whether the value is set. It can be false if the parent workflow has not been executed.
- property name: str
The name reference of the yield
- set_value(name)
Set the storage name after compute
- Parameters
name (str) – name reference of the storage
- Return type
None
- property storage_type: str
The storage type of this yield
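The lifecycle of a physical yield (created unset, then filled in by the workflow after compute) can be sketched as a small class. ToyPhysicalYielded is hypothetical; validating storage_type and raising when name is read before set_value are assumptions, not documented Fugue behavior.

```python
from typing import Optional


class ToyPhysicalYielded:
    """Hypothetical stand-in for PhysicalYielded (not Fugue's implementation)."""

    def __init__(self, yid: str, storage_type: str) -> None:
        # Assumption: only the two documented storage types are accepted.
        if storage_type not in ("file", "table"):
            raise ValueError(f"storage_type must be 'file' or 'table', got {storage_type!r}")
        self._yid = yid
        self._storage_type = storage_type
        self._name: Optional[str] = None

    @property
    def is_set(self) -> bool:
        return self._name is not None

    def set_value(self, name: str) -> None:
        # Called by the workflow after compute, not by users.
        self._name = name

    @property
    def name(self) -> str:
        # Assumption: reading the name before the workflow runs is an error.
        if self._name is None:
            raise ValueError("value is not set; run the parent workflow first")
        return self._name

    @property
    def storage_type(self) -> str:
        return self._storage_type


yielded = ToyPhysicalYielded("abc", "file")
print(yielded.is_set)  # False
yielded.set_value("/tmp/abc.parquet")  # hypothetical path chosen by the workflow
print(yielded.is_set, yielded.name)  # True /tmp/abc.parquet
```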
- class fugue.collections.yielded.Yielded(yid)
Bases: object
Yielded object from FugueWorkflow. Users shouldn’t create this object directly.
- Parameters
yid (str) – unique id for determinism
- property is_set: bool
Whether the value is set. It can be false if the parent workflow has not been executed.