fugue#

fugue.constants#

fugue.constants.register_global_conf(conf, on_dup=0)[source]#

Register global Fugue configs that can be picked up by any Fugue execution engines as the base configs.

Parameters
  • conf (Any) – Parameters like object containing the configs to register

  • on_dup (int) – how to handle duplicated config keys, defaults to 0 (ParamDict.OVERWRITE); see the note about ParamDict.THROW

Return type

None

Note

When using ParamDict.THROW as on_dup, the update is transactional: if any key in conf is already in the global config with a different value, a ValueError will be thrown and no config will be changed.

Examples

from fugue import register_global_conf, NativeExecutionEngine

register_global_conf({"my.value": 1})

engine = NativeExecutionEngine()
assert 1 == engine.conf["my.value"]

engine = NativeExecutionEngine({"my.value": 2})
assert 2 == engine.conf["my.value"]
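The transactional behavior described in the note can be sketched in plain Python. This is an illustrative model only, not Fugue's actual implementation: every key is validated before any is applied, so a conflict leaves the global config untouched.

```python
def merge_conf(global_conf, new_conf, throw_on_dup):
    """Illustrative model of a transactional config update: validate
    every key first, then apply, so a conflict changes nothing."""
    if throw_on_dup:
        for k, v in new_conf.items():
            if k in global_conf and global_conf[k] != v:
                raise ValueError(f"conflicting value for key {k}")
    global_conf.update(new_conf)
    return global_conf

conf = {"my.value": 1}
merge_conf(conf, {"other": 2}, throw_on_dup=True)
assert conf == {"my.value": 1, "other": 2}

# A conflicting key raises, and the existing value survives intact
try:
    merge_conf(conf, {"my.value": 99}, throw_on_dup=True)
except ValueError:
    pass
assert conf["my.value"] == 1
```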

fugue.exceptions#

exception fugue.exceptions.FugueBug[source]#

Bases: fugue.exceptions.FugueError

Fugue internal bug

exception fugue.exceptions.FugueDataFrameEmptyError[source]#

Bases: fugue.exceptions.FugueDataFrameError

Fugue dataframe is empty

exception fugue.exceptions.FugueDataFrameError[source]#

Bases: fugue.exceptions.FugueError

Fugue dataframe related error

exception fugue.exceptions.FugueDataFrameInitError[source]#

Bases: fugue.exceptions.FugueDataFrameError

Fugue dataframe initialization error

exception fugue.exceptions.FugueDataFrameOperationError[source]#

Bases: fugue.exceptions.FugueDataFrameError

Fugue dataframe invalid operation

exception fugue.exceptions.FugueError[source]#

Bases: Exception

Fugue exceptions

exception fugue.exceptions.FugueInterfacelessError[source]#

Bases: fugue.exceptions.FugueWorkflowCompileError

Fugue interfaceless exceptions

exception fugue.exceptions.FuguePluginsRegistrationError[source]#

Bases: fugue.exceptions.FugueError

Fugue plugins registration error

exception fugue.exceptions.FugueWorkflowCompileError[source]#

Bases: fugue.exceptions.FugueWorkflowError

Fugue workflow compile time error

exception fugue.exceptions.FugueWorkflowCompileValidationError[source]#

Bases: fugue.exceptions.FugueWorkflowCompileError

Fugue workflow compile time validation error

exception fugue.exceptions.FugueWorkflowError[source]#

Bases: fugue.exceptions.FugueError

Fugue workflow exceptions

exception fugue.exceptions.FugueWorkflowRuntimeError[source]#

Bases: fugue.exceptions.FugueWorkflowError

Fugue workflow runtime error

exception fugue.exceptions.FugueWorkflowRuntimeValidationError[source]#

Bases: fugue.exceptions.FugueWorkflowRuntimeError

Fugue workflow runtime validation error

fugue.interfaceless#

fugue.interfaceless.out_transform(df, using, params=None, partition=None, callback=None, ignore_errors=None, engine=None, engine_conf=None)[source]#

Transform this dataframe using transformer. It is a wrapper of out_transform() and run(), letting you do basic dataframe transformations without using FugueWorkflow and DataFrame. The input can be a native type only.

Please read the Transformer Tutorial

Parameters
  • df (Any) – DataFrame like object or Yielded or a path string to a parquet file

  • using (Any) – transformer-like object, can’t be a string expression

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()

  • partition (Optional[Any]) – Partition like object, defaults to None.

  • callback (Optional[Any]) – RPCHandler like object, defaults to None

  • ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)

  • engine (Optional[Any]) – it can be an empty string or None (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first represents the execution engine and the second represents the SQL engine (use None for either to take the default), defaults to None

  • engine_conf (Optional[Any]) – Parameters like object, defaults to None

Return type

None

Note

This function can only take parquet file paths in df. CSV and other file formats are disallowed.

This transformation is guaranteed to execute immediately (eager) and return nothing
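A transformer for out_transform can be a plain Python function with no return value. The sketch below is hypothetical (function and variable names are illustrative) and is called directly rather than through Fugue, so it runs without an engine; in real usage Fugue would invoke it per partition via out_transform(df, save_rows).

```python
from typing import Any, Dict, List

collected: List[Dict[str, Any]] = []

# An output transformer: consumes rows, produces no output dataframe.
def save_rows(df: List[Dict[str, Any]]) -> None:
    for row in df:
        collected.append(dict(row))

# Called directly here to show the row logic outside of Fugue
save_rows([{"a": 1}, {"a": 2}])
assert len(collected) == 2
```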

fugue.interfaceless.transform(df, using, schema=None, params=None, partition=None, callback=None, ignore_errors=None, engine=None, engine_conf=None, force_output_fugue_dataframe=False, persist=False, as_local=False, save_path=None, checkpoint=False)[source]#

Transform this dataframe using transformer. It is a wrapper of transform() and run(), letting you do basic dataframe transformations without using FugueWorkflow and DataFrame. Both input and output can be native types only.

Please read the Transformer Tutorial

Parameters
  • df (Any) – DataFrame like object or Yielded or a path string to a parquet file

  • using (Any) – transformer-like object, can’t be a string expression

  • schema (Optional[Any]) – Schema like object, defaults to None. The transformer will be able to access this value from output_schema()

  • params (Optional[Any]) – Parameters like object to run the processor, defaults to None. The transformer will be able to access this value from params()

  • partition (Optional[Any]) – Partition like object, defaults to None

  • callback (Optional[Any]) – RPCHandler like object, defaults to None

  • ignore_errors (Optional[List[Any]]) – list of exception types the transformer can ignore, defaults to None (empty list)

  • engine (Optional[Any]) – it can be an empty string or None (use the default execution engine), a string (use the registered execution engine), an ExecutionEngine type, an ExecutionEngine instance, or a tuple of two values where the first represents the execution engine and the second represents the SQL engine (use None for either to take the default), defaults to None

  • engine_conf (Optional[Any]) – Parameters like object, defaults to None

  • force_output_fugue_dataframe (bool) – If true, the function will always return a FugueDataFrame, otherwise, if df is in native dataframe types such as pandas dataframe, then the output will also in its native format. Defaults to False

  • persist (bool) – Whether to persist(materialize) the dataframe before returning

  • as_local (bool) – If true, the result will be converted to a LocalDataFrame

  • save_path (Optional[str]) – the file path to save the output to (see the note), defaults to None

  • checkpoint (bool) – Whether to add a checkpoint for the output (see the note)

Returns

the transformed dataframe; if df is a native dataframe (e.g. pd.DataFrame, Spark DataFrame, etc.), the output will also be a native dataframe whose type is determined by the execution engine you use; but if df is of type DataFrame, the output will also be a DataFrame

Return type

Any
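As a sketch of what a transformer callable looks like (hypothetical names, called directly here so it runs without an engine), a plain function taking and returning list-of-dict rows works as a Fugue transformer; in real usage the output schema would be supplied via the schema argument, e.g. transform(input_df, add_b, schema="*,b:int").

```python
from typing import Any, Dict, List

# Hypothetical transformer: adds a derived column "b" to each row.
def add_b(df: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [{**row, "b": row["a"] * 2} for row in df]

# Called directly to show the row logic outside of Fugue
out = add_b([{"a": 1}, {"a": 3}])
assert out == [{"a": 1, "b": 2}, {"a": 3, "b": 6}]
```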

Note

This function may be lazy: the returned dataframe is not guaranteed to be materialized until it is used.

Note

When you use callback in this function, be careful that the output dataframe is materialized. Otherwise, if the real compute happens outside of the function call, the callback receiver will already have shut down. To ensure this, use either persist or as_local; both will materialize the dataframe before the callback receiver shuts down.

Note

  • When save_path is None and checkpoint is False, then the output will not be saved into a file. The return will be a dataframe.

  • When save_path is None and checkpoint is True, then the output will be saved into the path set by fugue.workflow.checkpoint.path, the name will be randomly chosen, and it is NOT a deterministic checkpoint, so if you run multiple times, the output will be saved into different files. The return will be a dataframe.

  • When save_path is not None and checkpoint is False, then the output will be saved into save_path. The return will be the value of save_path

  • When save_path is not None and checkpoint is True, then the output will be saved into save_path. The return will be the dataframe from save_path

This function can only take parquet file paths in df and save_path. CSV and other file formats are disallowed.

The checkpoint here is NOT deterministic, so re-run will generate new checkpoints.

If you want to read and write other file formats or if you want to use deterministic checkpoints, please use FugueWorkflow.
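The four save_path/checkpoint combinations listed above can be modeled in plain Python. This is an illustrative decision table, not Fugue's code; the placeholder path string stands in for the randomly named checkpoint file.

```python
def output_plan(save_path, checkpoint):
    """Illustrative model of the four combinations: returns a tuple
    of (where the output is saved, what kind of value is returned)."""
    if save_path is None and not checkpoint:
        return (None, "dataframe")                      # nothing saved
    if save_path is None and checkpoint:
        return ("<random checkpoint path>", "dataframe")  # non-deterministic
    if not checkpoint:
        return (save_path, "path string")               # returns save_path itself
    return (save_path, "dataframe")                     # read back from save_path

assert output_plan(None, False) == (None, "dataframe")
assert output_plan("/tmp/out.parquet", False) == ("/tmp/out.parquet", "path string")
assert output_plan("/tmp/out.parquet", True) == ("/tmp/out.parquet", "dataframe")
```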

fugue.registry#