fugue.sql

fugue.sql.api

fugue.sql.api.fugue_sql(query, *args, fsql_ignore_case=None, fsql_dialect=None, engine=None, engine_conf=None, as_fugue=False, as_local=False, **kwargs)[source]

Simplified Fugue SQL interface. This function can still take multiple dataframe inputs, but it always returns the last dataframe generated in the SQL workflow, and YIELD should NOT be used with this function. If you want to use Fugue SQL to represent a full workflow, or want to see more Fugue SQL examples, please read fugue_sql_flow().

Parameters:
  • query (str) – the Fugue SQL string (can be a Jinja template)

  • args (Any) – variables related to the SQL string

  • fsql_ignore_case (bool | None) – whether to ignore case when parsing the SQL string, defaults to None (it depends on the engine/global config).

  • fsql_dialect (str | None) – the dialect of this fsql, defaults to None (it depends on the engine/global config).

  • kwargs (Any) – variables related to the SQL string

  • engine (AnyExecutionEngine | None) – an engine like object, defaults to None

  • engine_conf (Any | None) – the configs for the engine, defaults to None

  • as_fugue (bool) – whether to force return a Fugue DataFrame, defaults to False

  • as_local (bool) – whether return a local dataframe, defaults to False

Returns:

the result dataframe

Return type:

AnyDataFrame
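
For example, the engine and output flags can be combined as below (a minimal sketch; the DuckDB engine choice and the sample data are illustrative):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame([[0], [1]], columns=["a"])

# run on DuckDB, but bring the result back as a local dataframe
res = fa.fugue_sql("SELECT * FROM df WHERE a > 0", engine="duckdb", as_local=True)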

Note

This function is different from raw_sql() which directly sends the query to the execution engine to run. This function parses the query based on Fugue SQL syntax, creates a FugueSQLWorkflow which could contain multiple raw SQLs plus other operations, and runs and returns the last dataframe generated in the workflow.

This function allows you to parameterize the SQL in a more elegant way. The data tables referred in the query can either be automatically extracted from the local variables or be specified in the arguments.
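
For example, the two calls below are equivalent (a minimal sketch; src is a hypothetical variable name):

import pandas as pd
import fugue.api as fa

src = pd.DataFrame([[0], [1]], columns=["a"])

# src is extracted from the local scope automatically
res1 = fa.fugue_sql("SELECT * FROM src WHERE a = 0", engine="duckdb")

# or pass the dataframe explicitly through kwargs
res2 = fa.fugue_sql("SELECT * FROM src WHERE a = 0", src=src, engine="duckdb")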

Caution

Currently, we have not unified the dialects of different SQL backends, so there can be slight syntax differences when you switch between backends. In addition, we have not unified the UDFs across different backends, so you should be careful when using uncommon UDFs that belong to a specific backend.

That being said, if you keep your SQL part general and leverage Fugue extensions (transformer, creator, processor, outputter, etc.) appropriately, it should be easy to write backend-agnostic Fugue SQL.

We are working on unifying the dialects of different SQL backends; this should become available in future releases. Regarding unifying UDFs, the effort is still unclear.

import pandas as pd
import fugue.api as fa

# a simple transformer that appends a constant column c
def tr(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(c=2)

input = pd.DataFrame([[0,1],[3,4]], columns=["a","b"])

with fa.engine_context("duckdb"):
    res = fa.fugue_sql('''
    SELECT * FROM input WHERE a<{{x}}
    TRANSFORM USING tr SCHEMA *,c:int
    ''', x=2)
    assert fa.as_array(res) == [[0,1,2]]
fugue.sql.api.fugue_sql_flow(query, *args, fsql_ignore_case=None, fsql_dialect=None, **kwargs)[source]

Fugue SQL fully functional interface. This function allows defining a full workflow using Fugue SQL, and it allows multiple outputs using YIELD.

Parameters:
  • query (str) – the Fugue SQL string (can be a Jinja template)

  • args (Any) – variables related to the SQL string

  • fsql_ignore_case (bool | None) – whether to ignore case when parsing the SQL string, defaults to None (it depends on the engine/global config).

  • fsql_dialect (str | None) – the dialect of this fsql, defaults to None (it depends on the engine/global config).

  • kwargs (Any) – variables related to the SQL string

Returns:

the translated Fugue workflow

Return type:

FugueSQLWorkflow

Note

This function is different from raw_sql() which directly sends the query to the execution engine to run. This function parses the query based on Fugue SQL syntax and creates a FugueSQLWorkflow which could contain multiple raw SQLs plus other operations; you then execute it with run(), optionally specifying an execution engine.

This function allows you to parameterize the SQL in a more elegant way. The data tables referred in the query can either be automatically extracted from the local variables or be specified in the arguments.

Caution

Currently, we have not unified the dialects of different SQL backends, so there can be slight syntax differences when you switch between backends. In addition, we have not unified the UDFs across different backends, so you should be careful when using uncommon UDFs that belong to a specific backend.

That being said, if you keep your SQL part general and leverage Fugue extensions (transformer, creator, processor, outputter, etc.) appropriately, it should be easy to write backend-agnostic Fugue SQL.

We are working on unifying the dialects of different SQL backends; this should become available in future releases. Regarding unifying UDFs, the effort is still unclear.

import pandas as pd

from fugue.api import fugue_sql_flow as fsql
import fugue.api as fa

# Basic case
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run()

# With external data sources
df = pd.DataFrame([[0],[1]], columns=["a"])
fsql('''
SELECT * FROM df WHERE a=0
PRINT
''').run()

# With external variables
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''').run()

# The following is the explicit way to specify variables and dataframes
# (recommended)
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''', df=df, t=t).run()

# Using extensions
def dummy(df:pd.DataFrame) -> pd.DataFrame:
    return df

fsql('''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
''').run()

# It's recommended to provide the full path of the extension inside
# Fugue SQL, so the SQL definition and execution can be more
# independent from the extension definition.

# Run with different execution engines
sql = '''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
'''

fsql(sql).run(spark_session)  # spark_session: an existing SparkSession
fsql(sql).run("dask")

with fa.engine_context("duckdb"):
    fsql(sql).run()

# Passing dataframes between fsql calls
result = fsql('''
CREATE [[0]] SCHEMA a:int
YIELD DATAFRAME AS x

CREATE [[1]] SCHEMA a:int
YIELD DATAFRAME AS y
''').run("dask")

fsql('''
SELECT * FROM x
UNION
SELECT * FROM y
UNION
SELECT * FROM z

PRINT
''', result, z=pd.DataFrame([[2]], columns=["a"])).run()

# Get framework native dataframes
result["x"].native  # Dask dataframe
result["y"].native  # Dask dataframe
result["x"].as_pandas()  # Pandas dataframe

# Use lower case fugue sql
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
select * from df where a={{t}}
print
''', df=df, t=t, fsql_ignore_case=True).run()

fugue.sql.workflow

class fugue.sql.workflow.FugueSQLWorkflow(compile_conf=None)[source]

Bases: FugueWorkflow

Fugue workflow that supports Fugue SQL. Please read the Fugue SQL Tutorial.

Parameters:

compile_conf (Any)

property sql_vars: Dict[str, WorkflowDataFrame]
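
A minimal usage sketch (assuming the construction pattern from the Fugue SQL Tutorial; the SQL content is illustrative): calling the workflow object with Fugue SQL text appends it to the workflow, and run() executes the whole workflow. As its type suggests, sql_vars maps names defined in the SQL to their WorkflowDataFrames.

from fugue.sql.workflow import FugueSQLWorkflow

dag = FugueSQLWorkflow()
dag("""
CREATE [[0],[1]] SCHEMA a:int
PRINT
""")
dag.run()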