fugue.execution#

fugue.execution.execution_engine#

class fugue.execution.execution_engine.ExecutionEngine(conf)[source]#

Bases: abc.ABC

The abstract base class for execution engines. It is the layer that unifies core concepts of distributed computing, and separates the underlying computing frameworks from user’s higher level logic.

Please read the ExecutionEngine Tutorial to understand this most important Fugue concept

Parameters

conf (Any) – dict-like config, read this to learn Fugue specific options

aggregate(df, partition_spec, agg_cols, metadata=None)[source]#

Aggregate on dataframe

Parameters
Returns

the aggregated result as a dataframe

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f

# SELECT MAX(b) AS b FROM df
engine.aggregate(
    df,
    partition_spec=None,
    agg_cols=[f.max(col("b"))])

# SELECT a, MAX(b) AS x FROM df GROUP BY a
engine.aggregate(
    df,
    partition_spec=PartitionSpec(by=["a"]),
    agg_cols=[f.max(col("b")).alias("x")])
assign(df, columns, metadata=None)[source]#

Update existing columns with new values and add new columns

Parameters
Returns

the updated dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Tip

This can be used to cast data types, alter column values or add new columns. But you can’t use aggregation in columns.

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

# assume df has schema: a:int,b:str

# add constant column x
engine.assign(df, lit(1,"x"))

# change column b to be a constant integer
engine.assign(df, lit(1,"b"))

# add new x to be a+b
engine.assign(df, (col("a")+col("b")).alias("x"))

# cast column a data type to double
engine.assign(df, col("a").cast(float))
abstract broadcast(df)[source]#

Broadcast the dataframe to all workers for a distributed computing framework

Parameters

df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

Returns

the broadcasted dataframe

Return type

fugue.dataframe.dataframe.DataFrame
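
For example, a small lookup dataframe can be broadcast before joining it with a large dataframe. A minimal sketch, assuming engine is an ExecutionEngine instance and small_df / large_df are illustrative dataframes already converted by to_df():

# broadcast the small dataframe so every worker holds a full copy
small_bc = engine.broadcast(small_df)

# later operations, such as a join, can then avoid shuffling the small side
result = engine.join(large_df, small_bc, how="inner", on=["key"])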

comap(df, map_func, output_schema, partition_spec, metadata=None, on_init=None)[source]#

Apply a function to each zipped partition on the zipped dataframe.

Parameters
Returns

the dataframe after the comap operation

Note

  • The input of this method must be an output of zip() or zip_all()

  • The partition_spec here is NOT related to how you zipped the dataframe; however you set it, it only affects processing speed, because the partition keys will be overridden by the zipped dataframe’s partition keys. You may use it to improve efficiency: PartitionSpec(algo="even", num="ROWCOUNT") tells the execution engine to put each zipped partition into a physical partition so it can achieve the best possible load balance.

  • If the input dataframe has keys, the dataframes you get in map_func and on_init will have keys, otherwise you will get list-like dataframes

  • The on_init function will get a DataFrames object with the same structure, but all of its dataframes are empty; you can use the schemas but not the data.

See also

For more details and examples, read Zip & Comap
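
A minimal sketch of the zip/comap pattern, assuming engine, df1 and df2 already exist, both dataframes share column a, and the helper names below are illustrative:

from fugue.collections.partition import PartitionSpec
from fugue.dataframe import ArrayDataFrame

def merge_partitions(cursor, dfs):
    # dfs holds the matching partition of each zipped dataframe
    total = sum(len(d.as_array()) for d in dfs.values())
    return ArrayDataFrame(
        [[cursor.key_value_array[0], total]], "a:int,total:long")

zipped = engine.zip(df1, df2, partition_spec=PartitionSpec(by=["a"]))
result = engine.comap(
    zipped,
    merge_partitions,
    output_schema="a:int,total:long",
    partition_spec=PartitionSpec(algo="even", num="ROWCOUNT"))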

property compile_conf: triad.collections.dict.ParamDict#

Compile time (workflow level) configurations; this is always a superset of conf

Note

Users normally don’t need to use this property. It is for internal use.

property conf: triad.collections.dict.ParamDict#

All configurations of this engine instance.

Note

It can contain more than what you provided; for example in SparkExecutionEngine, the Spark session can bring in extra config, and it is all accessible through this property.

convert_yield_dataframe(df, as_local)[source]#

Convert a yield dataframe to a dataframe that can be used after this execution engine stops.

Parameters
Returns

another DataFrame that can be used after this execution engine stops

Return type

fugue.dataframe.dataframe.DataFrame

Note

By default, the output dataframe is the input dataframe. But it should be overridden if the input dataframe will become invalid when this engine stops.

For example, if you write a custom Spark engine that starts and stops the Spark session in its start_engine() and stop_engine(), then the Spark dataframe will become invalid after the engine stops. So you may consider converting it to a local dataframe so it can still be used after the engine stops.

abstract property default_sql_engine: fugue.execution.execution_engine.SQLEngine#

Default SQLEngine if user doesn’t specify

abstract distinct(df, metadata=None)[source]#

Equivalent to SELECT DISTINCT * FROM df

Parameters
Returns

the dataframe with distinct rows

Return type

DataFrame

abstract dropna(df, how='any', thresh=None, subset=None, metadata=None)[source]#

Drop NA records from the dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (Optional[int]) – int, drops rows that have less than thresh non-null values

  • subset (Optional[List[str]]) – list of columns to operate on

  • metadata (Any, optional) – dict-like object to add to the result dataframe, defaults to None

Returns

DataFrame with NA records dropped

Return type

DataFrame
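
A minimal sketch, assuming engine and df already exist:

# drop rows containing any null
clean = engine.dropna(df, how="any")

# drop rows only when both a and b are null
clean = engine.dropna(df, how="all", subset=["a", "b"])

# keep rows with at least 2 non-null values
clean = engine.dropna(df, thresh=2)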

abstract fillna(df, value, subset=None, metadata=None)[source]#

Fill NULL, NAN, NAT values in a dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary

  • metadata (Any, optional) – dict-like object to add to the result dataframe, defaults to None

Returns

DataFrame with NA records filled

Return type

DataFrame
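
A minimal sketch, assuming engine and df already exist:

# fill nulls in every column with 0
filled = engine.fillna(df, 0)

# fill per column: nulls in a become 0, nulls in b become "x"
filled = engine.fillna(df, {"a": 0, "b": "x"})

# fill only column a with 0
filled = engine.fillna(df, 0, subset=["a"])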

filter(df, condition, metadata=None)[source]#

Filter rows by the given condition

Parameters
Returns

the filtered dataframe

Return type

fugue.dataframe.dataframe.DataFrame

New Since

0.6.0

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f

engine.filter(df, (col("a")>1) & (col("b")=="x"))
engine.filter(df, f.coalesce(col("a"),col("b"))>1)
abstract property fs: triad.collections.fs.FileSystem#

File system of this engine instance

abstract intersect(df1, df2, distinct=True, metadata=None)[source]#

Intersect df1 and df2

Parameters
Returns

the intersected dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

abstract join(df1, df2, how, on=[], metadata=None)[source]#

Join two dataframes

Parameters
  • df1 (fugue.dataframe.dataframe.DataFrame) – the first dataframe

  • df2 (fugue.dataframe.dataframe.DataFrame) – the second dataframe

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (List[str]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

the joined dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Please read this
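
A minimal sketch, assuming engine, df1 and df2 already exist and share column a:

# inner join; the join keys are inferred from the common columns
joined = engine.join(df1, df2, how="inner")

# left outer join with explicit validation of the join keys
joined = engine.join(df1, df2, how="left_outer", on=["a"])

# cross join needs no keys
crossed = engine.join(df1, df2, how="cross")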

abstract load_df(path, format_hint=None, columns=None, **kwargs)[source]#

Load dataframe from persistent storage

Parameters
  • path (Union[str, List[str]]) – the path to the dataframe

  • format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer

  • columns (Optional[Any]) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Returns

an engine compatible dataframe

Return type

fugue.dataframe.dataframe.DataFrame

For more details and examples, read Load & Save.
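
A minimal sketch, assuming engine already exists; the paths and schema are illustrative, and extra kwargs such as header are passed through to the underlying csv reader:

# infer the format from the file extension
df = engine.load_df("/tmp/data.parquet")

# load a csv with an explicit schema and a subset of columns
df = engine.load_df(
    "/tmp/data.csv",
    format_hint="csv",
    columns="a:int,b:str",
    header=True)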

abstract property log: logging.Logger#

Logger of this engine instance

abstract map(df, map_func, output_schema, partition_spec, metadata=None, on_init=None)[source]#

Apply a function to each partition after you partition the data in a specified way.

Parameters
Returns

the dataframe after the map operation

Return type

fugue.dataframe.dataframe.DataFrame

Note

Before implementing, you must read this to understand what map is used for and how it should work.

abstract persist(df, lazy=False, **kwargs)[source]#

Force materializing and caching the dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

  • lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False

  • args – parameter to pass to the underlying persist implementation

  • kwargs (Any) – parameter to pass to the underlying persist implementation

Returns

the persisted dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

persist can only guarantee that the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example a stack overflow.
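
A minimal sketch, assuming engine and df already exist:

# eager persist: materialize and cache immediately
cached = engine.persist(df)

# lazy persist: materialization happens on the first use of the output
cached = engine.persist(df, lazy=True)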

abstract repartition(df, partition_spec)[source]#

Partition the input dataframe using partition_spec.

Parameters
Returns

repartitioned dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Before implementing please read the Partition Tutorial

property rpc_server: fugue.rpc.base.RPCServer#

RPCServer of this execution engine

abstract sample(df, n=None, frac=None, replace=False, seed=None, metadata=None)[source]#

Sample dataframe by number of rows or by fraction

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set

  • frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (Optional[int]) – seed for randomness, defaults to None

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

sampled dataframe

Return type

DataFrame
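
A minimal sketch, assuming engine and df already exist:

# sample roughly 10% of the rows, reproducibly
s1 = engine.sample(df, frac=0.1, seed=0)

# sample 100 rows with replacement
s2 = engine.sample(df, n=100, replace=True)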

abstract save_df(df, path, format_hint=None, mode='overwrite', partition_spec=PartitionSpec(num='0', by=[], presort=''), force_single=False, **kwargs)[source]#

Save dataframe to a persistent storage

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – input dataframe

  • path (str) – output path

  • format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition_spec (fugue.collections.partition.PartitionSpec) – how to partition the dataframe before saving, defaults to empty

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type

None

For more details and examples, read Load & Save.
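
A minimal sketch, assuming engine and df already exist; the paths are illustrative:

# overwrite a parquet output, format inferred from the extension
engine.save_df(df, "/tmp/output.parquet")

# append to a csv output, forcing a single output file
engine.save_df(
    df,
    "/tmp/output.csv",
    format_hint="csv",
    mode="append",
    force_single=True)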

select(df, cols, where=None, having=None, metadata=None)[source]#

The functional interface for SQL select statement

Parameters
Returns

the select result as a dataframe

Return type

fugue.dataframe.dataframe.DataFrame

New Since

0.6.0

Attention

This interface is experimental; it’s subject to change in new versions.

See also

Please find more expression examples in fugue.column.sql and fugue.column.functions

Examples

import fugue.column.functions as f

# select existing and new columns
engine.select(df, SelectColumns(col("a"),col("b"),lit(1,"another")))
engine.select(df, SelectColumns(col("a"),(col("b")+lit(1)).alias("x")))

# aggregation
# SELECT COUNT(DISTINCT *) AS x FROM df
engine.select(
    df,
    SelectColumns(f.count_distinct(col("*")).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df GROUP BY a
engine.select(
    df,
    SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")))

# SELECT a, MAX(b+1) AS x FROM df
#   WHERE b<2 AND a>1
#   GROUP BY a
#   HAVING MAX(b+1)>0
engine.select(
    df,
    SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")),
    where=(col("b")<2) & (col("a")>1),
    having=f.max(col("b")+lit(1))>0
)
set_sql_engine(engine)[source]#

Set SQLEngine for this execution engine. If not set, the default is default_sql_engine()

Parameters

engine (fugue.execution.execution_engine.SQLEngine) – SQLEngine instance

Return type

None
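
A minimal sketch, assuming SqliteEngine is the desired SQL backend for this engine instance:

from fugue.execution.native_execution_engine import SqliteEngine

engine.set_sql_engine(SqliteEngine(engine))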

property sql_engine: fugue.execution.execution_engine.SQLEngine#

The SQLEngine currently used by this execution engine. You should use set_sql_engine() to set a new SQLEngine instance. If not set, the default is default_sql_engine()

start()[source]#

Start this execution engine, do not override. You should customize start_engine() if necessary.

Return type

None

start_engine()[source]#

Custom logic to start the execution engine, defaults to no operation

Return type

None

stop()[source]#

Stop this execution engine, do not override. You should customize stop_engine() if necessary.

Return type

None

stop_engine()[source]#

Custom logic to stop the execution engine, defaults to no operation

Return type

None

abstract subtract(df1, df2, distinct=True, metadata=None)[source]#

df1 - df2

Parameters
Returns

the subtracted dataframe (df1 - df2)

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

abstract take(df, n, presort, na_position='last', partition_spec=PartitionSpec(num='0', by=[], presort=''), metadata=None)[source]#

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • n (int) – number of rows to return

  • presort (str) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • partition_spec (fugue.collections.partition.PartitionSpec) – PartitionSpec to apply the take operation

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

n rows of DataFrame per partition

Return type

DataFrame
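
A minimal sketch, assuming engine and df already exist with schema a:int,b:int:

from fugue.collections.partition import PartitionSpec

# globally: the top 2 rows ordered by b descending
top2 = engine.take(df, n=2, presort="b desc")

# per partition: the first row of each value of a, ordered by b ascending,
# with nulls sorted first
first_per_key = engine.take(
    df,
    n=1,
    presort="b asc",
    na_position="first",
    partition_spec=PartitionSpec(by=["a"]))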

abstract to_df(data, schema=None, metadata=None)[source]#

Convert a data structure to this engine compatible DataFrame

Parameters
  • data (Any) – DataFrame, pandas DataFrame, list or iterable of arrays, or other data supported by the specific engine implementation

  • schema (Optional[Any]) – Schema like object, defaults to None

  • metadata (Optional[Any]) – Parameters like object, defaults to None

Returns

engine compatible dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything
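
A minimal sketch, assuming engine already exists:

import pandas as pd

# from a list of rows plus a schema
fdf = engine.to_df([[0, "x"], [1, "y"]], schema="a:int,b:str")

# from a pandas DataFrame
fdf = engine.to_df(pd.DataFrame({"a": [0, 1], "b": ["x", "y"]}))

# an already compatible dataframe is returned as is
same = engine.to_df(fdf)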

abstract union(df1, df2, distinct=True, metadata=None)[source]#

Union two dataframes

Parameters
Returns

the unioned dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.
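
A minimal sketch, assuming engine, df1 and df2 already exist with identical schemas; the same distinct flag also applies to intersect() and subtract():

u1 = engine.union(df1, df2)                  # UNION: drops duplicates
u2 = engine.union(df1, df2, distinct=False)  # UNION ALL: keeps duplicates

i1 = engine.intersect(df1, df2)              # INTERSECT
s1 = engine.subtract(df1, df2)               # EXCEPT (df1 - df2)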

zip(df1, df2, how='inner', partition_spec=PartitionSpec(num='0', by=[], presort=''), temp_path=None, to_file_threshold=- 1, df1_name=None, df2_name=None)[source]#

Partition the two dataframes in the same way with partition_spec and zip the partitions together on the partition keys.

Parameters
  • df1 (fugue.dataframe.dataframe.DataFrame) – the first dataframe

  • df2 (fugue.dataframe.dataframe.DataFrame) – the second dataframe

  • how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner

  • partition_spec (PartitionSpec, optional) – partition spec to partition each dataframe, defaults to empty.

  • temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None

  • to_file_threshold (Any) – file byte size threshold, defaults to -1

  • df1_name (Optional[str]) – df1’s name in the zipped dataframe, defaults to None

  • df2_name (Optional[str]) – df2’s name in the zipped dataframe, defaults to None

Returns

a zipped dataframe, the metadata of the dataframe will indicate it’s zipped

Note

  • Different from join, df1 and df2 can have common columns that you will not use as partition keys.

  • If partition keys are not specified, the common columns of the two dataframes will be used (unless it is a cross zip)

  • For a non-cross zip, the two dataframes must have common columns, or an error will be thrown

See also

For more details and examples, read Zip & Comap.
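
A minimal sketch, assuming engine, df1 and df2 already exist and share column a:

from fugue.collections.partition import PartitionSpec

# zip on the common column a; each zipped partition pairs the matching
# partitions of df1 and df2
zipped = engine.zip(df1, df2, how="inner",
                    partition_spec=PartitionSpec(by=["a"]))

# name the sides so comap() receives a dict-like DataFrames
zipped = engine.zip(df1, df2,
                    partition_spec=PartitionSpec(by=["a"]),
                    df1_name="x", df2_name="y")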

zip_all(dfs, how='inner', partition_spec=PartitionSpec(num='0', by=[], presort=''), temp_path=None, to_file_threshold=- 1)[source]#

Zip multiple dataframes together with given partition specifications.

Parameters
  • dfs (fugue.dataframe.dataframes.DataFrames) – DataFrames like object

  • how (str) – can accept inner, left_outer, right_outer, full_outer, cross, defaults to inner

  • partition_spec (fugue.collections.partition.PartitionSpec) – Partition like object, defaults to empty.

  • temp_path (Optional[str]) – file path to store the data (used only if the serialized data is larger than to_file_threshold), defaults to None

  • to_file_threshold (Any) – file byte size threshold, defaults to -1

Returns

a zipped dataframe, the metadata of the dataframe will indicate it’s zipped

Return type

fugue.dataframe.dataframe.DataFrame

Note

  • Please also read zip()

  • If dfs is dict-like, the zipped dataframe will be dict-like; if dfs is list-like, the zipped dataframe will be list-like

  • It’s fine for dfs to contain only one dataframe

See also

For more details and examples, read Zip & Comap
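
A minimal sketch, assuming engine, df1, df2 and df3 already exist and share column a:

from fugue.collections.partition import PartitionSpec
from fugue.dataframe.dataframes import DataFrames

# dict-like input produces a dict-like zipped dataframe
zipped = engine.zip_all(
    DataFrames(x=df1, y=df2, z=df3),
    partition_spec=PartitionSpec(by=["a"]))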

class fugue.execution.execution_engine.SQLEngine(execution_engine)[source]#

Bases: abc.ABC

The abstract base class for different SQL execution implementations. Please read this to understand the concept

Parameters

execution_engine (ExecutionEngine) – the execution engine this sql engine will run on

Return type

None

property execution_engine: fugue.execution.execution_engine.ExecutionEngine#

the execution engine this sql engine will run on

abstract select(dfs, statement)[source]#

Execute select statement on the sql engine.

Parameters
Returns

result of the SELECT statement

Return type

fugue.dataframe.dataframe.DataFrame

Examples

>>> dfs = DataFrames(a=df1, b=df2)
>>> sql_engine.select(dfs, "SELECT * FROM a UNION SELECT * FROM b")

Note

There can be tables that are not in dfs. For example, you may want to select from Hive without any input DataFrames:

>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")

fugue.execution.factory#

fugue.execution.factory.make_execution_engine(engine=None, conf=None, **kwargs)[source]#

Create ExecutionEngine with specified engine

Parameters
  • engine (Optional[Any]) – it can be empty string or null (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, or the ExecutionEngine instance, or a tuple of two values where the first value represents the execution engine and the second value represents the sql engine (you can use None for either of them to use the default one), defaults to None

  • conf (Optional[Any]) – Parameters like object, defaults to None

  • kwargs (Any) – additional parameters to initialize the execution engine

Returns

the ExecutionEngine instance

Return type

fugue.execution.execution_engine.ExecutionEngine

Examples

register_default_execution_engine(lambda conf: E1(conf))
register_execution_engine("e2", lambda conf, **kwargs: E2(conf, **kwargs))

register_sql_engine("s", lambda conf: S2(conf))

# E1 + E1.default_sql_engine
make_execution_engine()

# E2 + E2.default_sql_engine
make_execution_engine("e2")

# E1 + S2
make_execution_engine((None, "s"))

# E2(conf, a=1, b=2) + S2
make_execution_engine(("e2", "s"), conf, a=1, b=2)

# SparkExecutionEngine + SparkSQLEngine
make_execution_engine(SparkExecutionEngine)
make_execution_engine(SparkExecutionEngine(spark_session, conf))

# SparkExecutionEngine + S2
make_execution_engine((SparkExecutionEngine, "s"))
fugue.execution.factory.make_sql_engine(engine=None, execution_engine=None, **kwargs)[source]#

Create SQLEngine with specified engine

Parameters
  • engine (Optional[Any]) – it can be empty string or null (use the default SQL engine), a string (use the registered SQL engine), an SQLEngine type, or the SQLEngine instance (you can use None to use the default one), defaults to None

  • execution_engine (Optional[fugue.execution.execution_engine.ExecutionEngine]) – the ExecutionEngine instance to create the SQLEngine. Normally you should always provide this value.

  • kwargs (Any) – additional parameters to initialize the sql engine

Returns

the SQLEngine instance

Return type

fugue.execution.execution_engine.SQLEngine

Note

For users, you normally don’t need to call this function directly. Use make_execution_engine instead

Examples

register_default_sql_engine(lambda conf: S1(conf))
register_sql_engine("s2", lambda conf: S2(conf))

engine = NativeExecutionEngine()

# S1(engine)
make_sql_engine(None, engine)

# S1(engine, a=1)
make_sql_engine(None, engine, a=1)

# S2(engine)
make_sql_engine("s2", engine)

# SqliteEngine(engine)
make_sql_engine(SqliteEngine)
fugue.execution.factory.register_default_execution_engine(func, on_dup='overwrite')[source]#

Register ExecutionEngine as the default engine.

Parameters
  • func (Callable) – a callable taking Parameters like object and **kwargs and returning an ExecutionEngine instance

  • on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting) or “throw” (throw exception), defaults to “overwrite”.

Return type

None

Examples

# register the default execution engine (overwrites the existing default)
register_default_execution_engine(lambda conf: MyExecutionEngine(conf))

# the following examples will use MyExecutionEngine

# 0
make_execution_engine()
make_execution_engine(None, {"myconfig":"value"})

# 1
with FugueWorkflow() as dag:
    dag.create([[0]],"a:int").show()

# 2
dag = FugueWorkflow()
dag.create([[0]],"a:int").show()
dag.run(None, {"myconfig":"value"})

# 3
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("", {"myconfig":"value})
fugue.execution.factory.register_default_sql_engine(func, on_dup='overwrite')[source]#

Register SQLEngine as the default engine

Parameters
  • func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance

  • on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting) or “throw” (throw exception), defaults to “overwrite”.

Raises

KeyError – if on_dup is throw and the name already exists

Return type

None

Note

Be careful when using this function: when you set a custom SQL engine as default, all execution engines you create will use this SQL engine unless you explicitly specify another one. For example, if you set the default SQL engine to a Spark specific one and then start a NativeExecutionEngine, it will try to use the Spark SQL engine and will throw exceptions.

So it’s always a better idea to use register_sql_engine instead

Examples

# register the default SQL engine (overwrites the existing default)
register_default_sql_engine(lambda engine: MySQLEngine(engine))

# create NativeExecutionEngine with MySQLEngine as the default
make_execution_engine()

# create SparkExecutionEngine with MySQLEngine instead of SparkSQLEngine
make_execution_engine("spark")

# NativeExecutionEngine with MySQLEngine
with FugueWorkflow() as dag:
    dag.create([[0]],"a:int").show()
fugue.execution.factory.register_execution_engine(name_or_type, func, on_dup='overwrite')[source]#

Register ExecutionEngine with a given name.

Parameters
  • name_or_type (Union[str, Type]) – alias of the execution engine, or type of an object that can be converted to an execution engine

  • func (Callable) – a callable taking Parameters like object and **kwargs and returning an ExecutionEngine instance

  • on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting) or “throw” (throw exception), defaults to “overwrite”.

Return type

None

Examples

Alias registration examples:

# create a new engine with name my (overwrites if it already exists)
register_execution_engine("my", lambda conf: MyExecutionEngine(conf))

# 0
make_execution_engine("my")
make_execution_engine("my", {"myconfig":"value})

# 1
with FugueWorkflow("my") as dag:
    dag.create([[0]],"a:int").show()

# 2
dag = FugueWorkflow()
dag.create([[0]],"a:int").show()
dag.run("my", {"myconfig":"value})

# 3
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run("my")

Type registration examples:

from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine
from fugue_sql import fsql

register_execution_engine(
    SparkSession,
    lambda session, conf: SparkExecutionEngine(session, conf))

spark_session = SparkSession.builder.getOrCreate()

fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run(spark_session)
fugue.execution.factory.register_sql_engine(name, func, on_dup='overwrite')[source]#

Register SQLEngine with a given name.

Parameters
  • name (str) – name of the SQL engine

  • func (Callable) – a callable taking ExecutionEngine and **kwargs and returning a SQLEngine instance

  • on_dup – action on duplicated name. It can be “overwrite”, “ignore” (not overwriting) or “throw” (throw exception), defaults to “overwrite”.

Return type

None

Examples

# create a new SQL engine with name mysql (overwrites if it already exists)
register_sql_engine("mysql", lambda engine: MySQLEngine(engine))

# create execution engine with MySQLEngine as the default
make_execution_engine(("", "mysql"))

# create DaskExecutionEngine with MySQLEngine as the default
make_execution_engine(("dask", "mysql"))

# default execution engine + MySQLEngine
with FugueWorkflow(("","mysql")) as dag:
    dag.create([[0]],"a:int").show()

fugue.execution.native_execution_engine#

class fugue.execution.native_execution_engine.NativeExecutionEngine(conf=None)[source]#

Bases: fugue.execution.execution_engine.ExecutionEngine

The execution engine based on native Python and pandas. It is mainly for prototyping and unit tests.

Please read the ExecutionEngine Tutorial to understand this important Fugue concept

Parameters

conf (Any) – Parameters like object, read the Fugue Configuration Tutorial to learn Fugue specific options

broadcast(df)[source]#

Broadcast the dataframe to all workers for a distributed computing framework

Parameters

df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

Returns

the broadcasted dataframe

Return type

fugue.dataframe.dataframe.DataFrame

property default_sql_engine: fugue.execution.execution_engine.SQLEngine#

Default SQLEngine if user doesn’t specify

distinct(df, metadata=None)[source]#

Equivalent to SELECT DISTINCT * FROM df

Parameters
Returns

the dataframe with distinct rows

Return type

DataFrame

dropna(df, how='any', thresh=None, subset=None, metadata=None)[source]#

Drop NA records from the dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • how (str) – ‘any’ or ‘all’. ‘any’ drops rows that contain any nulls. ‘all’ drops rows that contain all nulls.

  • thresh (Optional[int]) – int, drops rows that have less than thresh non-null values

  • subset (Optional[List[str]]) – list of columns to operate on

  • metadata (Any, optional) – dict-like object to add to the result dataframe, defaults to None

Returns

DataFrame with NA records dropped

Return type

DataFrame

fillna(df, value, subset=None, metadata=None)[source]#

Fill NULL, NAN, NAT values in a dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • value (Any) – if scalar, fills all columns with same value. if dictionary, fills NA using the keys as column names and the values as the replacement values.

  • subset (Optional[List[str]]) – list of columns to operate on. ignored if value is a dictionary

  • metadata (Any, optional) – dict-like object to add to the result dataframe, defaults to None

Returns

DataFrame with NA records filled

Return type

DataFrame

property fs: triad.collections.fs.FileSystem#

File system of this engine instance

intersect(df1, df2, distinct=True, metadata=None)[source]#

Intersect df1 and df2

Parameters
Returns

the intersected dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

join(df1, df2, how, on=[], metadata=None)[source]#

Join two dataframes

Parameters
  • df1 (fugue.dataframe.dataframe.DataFrame) – the first dataframe

  • df2 (fugue.dataframe.dataframe.DataFrame) – the second dataframe

  • how (str) – can accept semi, left_semi, anti, left_anti, inner, left_outer, right_outer, full_outer, cross

  • on (List[str]) – it can always be inferred, but if you provide it, it will be validated against the inferred keys.

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

the joined dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Please read this

load_df(path, format_hint=None, columns=None, **kwargs)[source]#

Load dataframe from persistent storage

Parameters
  • path (Union[str, List[str]]) – the path to the dataframe

  • format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer

  • columns (Optional[Any]) – list of columns or a Schema like object, defaults to None

  • kwargs (Any) – parameters to pass to the underlying framework

Returns

an engine compatible dataframe

Return type

fugue.dataframe.dataframe.LocalBoundedDataFrame

For more details and examples, read Load & Save.

property log: logging.Logger#

Logger of this engine instance

map(df, map_func, output_schema, partition_spec, metadata=None, on_init=None)[source]#

Apply a function to each partition after you partition the data in a specified way.

Parameters
Returns

the dataframe after the map operation

Return type

fugue.dataframe.dataframe.DataFrame

Note

Before implementing, you must read this to understand what map is used for and how it should work.

persist(df, lazy=False, **kwargs)[source]#

Force materializing and caching the dataframe

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – the input dataframe

  • lazy (bool) – True: first usage of the output will trigger persisting to happen; False (eager): persist is forced to happen immediately. Defaults to False

  • args – parameter to pass to the underlying persist implementation

  • kwargs (Any) – parameter to pass to the underlying persist implementation

Returns

the persisted dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

persist can only guarantee that the persisted dataframe will be computed only once. However, this doesn’t mean the backend really breaks up the execution dependency at the persisting point. Commonly, it doesn’t cause any issue, but if your execution graph is long, it may cause unexpected problems, for example a stack overflow.

property pl_utils: qpd_pandas.engine.PandasUtils#

Pandas-like dataframe utils

repartition(df, partition_spec)[source]#

Partition the input dataframe using partition_spec.

Parameters
Returns

repartitioned dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Before implementing please read the Partition Tutorial

sample(df, n=None, frac=None, replace=False, seed=None, metadata=None)[source]#

Sample dataframe by number of rows or by fraction

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • n (Optional[int]) – number of rows to sample, one and only one of n and frac must be set

  • frac (Optional[float]) – fraction [0,1] to sample, one and only one of n and frac must be set

  • replace (bool) – whether replacement is allowed. With replacement, there may be duplicated rows in the result, defaults to False

  • seed (Optional[int]) – seed for randomness, defaults to None

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

sampled dataframe

Return type

DataFrame

save_df(df, path, format_hint=None, mode='overwrite', partition_spec=PartitionSpec(num='0', by=[], presort=''), force_single=False, **kwargs)[source]#

Save dataframe to a persistent storage

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – input dataframe

  • path (str) – output path

  • format_hint (Optional[Any]) – can accept parquet, csv, json, defaults to None, meaning to infer

  • mode (str) – can accept overwrite, append, error, defaults to “overwrite”

  • partition_spec (fugue.collections.partition.PartitionSpec) – how to partition the dataframe before saving, defaults to empty

  • force_single (bool) – force the output as a single file, defaults to False

  • kwargs (Any) – parameters to pass to the underlying framework

Return type

None

For more details and examples, read Load & Save.

subtract(df1, df2, distinct=True, metadata=None)[source]#

df1 - df2

Parameters
Returns

the subtracted dataframe (df1 - df2)

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

take(df, n, presort, na_position='last', partition_spec=PartitionSpec(num='0', by=[], presort=''), metadata=None)[source]#

Get the first n rows of a DataFrame per partition. If a presort is defined, use the presort before applying take. presort overrides partition_spec.presort. The Fugue implementation of the presort follows Pandas convention of specifying NULLs first or NULLs last. This is different from the Spark and SQL convention of NULLs as the smallest value.

Parameters
  • df (fugue.dataframe.dataframe.DataFrame) – DataFrame

  • n (int) – number of rows to return

  • presort (str) – presort expression similar to partition presort

  • na_position (str) – position of null values during the presort. can accept first or last

  • partition_spec (fugue.collections.partition.PartitionSpec) – PartitionSpec to apply the take operation

  • metadata (Optional[Any]) – dict-like object to add to the result dataframe, defaults to None

Returns

n rows of DataFrame per partition

Return type

DataFrame

to_df(df, schema=None, metadata=None)[source]#

Convert a data structure to this engine compatible DataFrame

Parameters
  • df (Any) – DataFrame, pandas DataFrame, list or iterable of arrays, or other data supported by this engine implementation

  • schema (Optional[Any]) – Schema like object, defaults to None

  • metadata (Optional[Any]) – Parameters like object, defaults to None

Returns

engine compatible dataframe

Return type

fugue.dataframe.dataframe.LocalBoundedDataFrame

Note

There are certain conventions to follow for a new implementation:

  • if the input is already in compatible dataframe type, it should return itself

  • all other methods in the engine interface should take arbitrary dataframes and call this method to convert before doing anything

union(df1, df2, distinct=True, metadata=None)[source]#

Union two dataframes

Parameters
Returns

the unioned dataframe

Return type

fugue.dataframe.dataframe.DataFrame

Note

Currently, the schema of df1 and df2 must be identical, or an exception will be thrown.

class fugue.execution.native_execution_engine.QPDPandasEngine(execution_engine)[source]#

Bases: fugue.execution.execution_engine.SQLEngine

QPD execution implementation.

Parameters

execution_engine (fugue.execution.execution_engine.ExecutionEngine) – the execution engine this sql engine will run on

select(dfs, statement)[source]#

Execute select statement on the sql engine.

Parameters
Returns

result of the SELECT statement

Return type

fugue.dataframe.dataframe.DataFrame

Examples

>>> dfs = DataFrames(a=df1, b=df2)
>>> sql_engine.select(dfs, "SELECT * FROM a UNION SELECT * FROM b")

Note

There can be tables that are not in dfs. For example, you may want to select from Hive without any input DataFrames:

>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")
class fugue.execution.native_execution_engine.SqliteEngine(execution_engine)[source]#

Bases: fugue.execution.execution_engine.SQLEngine

Sqlite execution implementation.

Parameters

execution_engine (fugue.execution.execution_engine.ExecutionEngine) – the execution engine this sql engine will run on

select(dfs, statement)[source]#

Execute select statement on the sql engine.

Parameters
Returns

result of the SELECT statement

Return type

fugue.dataframe.dataframe.DataFrame

Examples

>>> dfs = DataFrames(a=df1, b=df2)
>>> sql_engine.select(dfs, "SELECT * FROM a UNION SELECT * FROM b")

Note

There can be tables that are not in dfs. For example, you may want to select from Hive without any input DataFrames:

>>> sql_engine.select(DataFrames(), "SELECT * FROM hive.a.table")