fugue_sql

fugue_sql.exceptions

exception fugue_sql.exceptions.FugueSQLError[source]

Bases: fugue.exceptions.FugueWorkflowCompileError

Fugue SQL error

exception fugue_sql.exceptions.FugueSQLRuntimeError[source]

Bases: fugue.exceptions.FugueWorkflowRuntimeError

Fugue SQL runtime error

exception fugue_sql.exceptions.FugueSQLSyntaxError[source]

Bases: fugue_sql.exceptions.FugueSQLError

Fugue SQL syntax error
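
A minimal sketch of handling these exceptions; it assumes an incomplete statement (the schema expression is missing) is rejected at compile time with FugueSQLSyntaxError:

from fugue_sql import fsql
from fugue_sql.exceptions import FugueSQLError, FugueSQLSyntaxError

try:
    fsql('''
    CREATE [[0]] SCHEMA
    PRINT
    ''').run()
except FugueSQLSyntaxError:
    # the more specific subclass must be caught first
    print("the Fugue SQL string failed to parse")
except FugueSQLError:
    print("some other compile-time error")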

fugue_sql.workflow

class fugue_sql.workflow.FugueSQLWorkflow(*args, **kwargs)[source]

Bases: fugue.workflow.workflow.FugueWorkflow

Fugue workflow that supports Fugue SQL. Please read the Fugue SQL Tutorial.

Parameters
  • args (Any) –

  • kwargs (Any) –

property sql_vars: Dict[str, fugue.workflow.workflow.WorkflowDataFrame]
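
A minimal usage sketch, assuming the context-manager style shown in the Fugue SQL Tutorial, where each dag(...) call compiles a Fugue SQL block into the same workflow and the workflow runs on context exit:

from fugue_sql import FugueSQLWorkflow

with FugueSQLWorkflow() as dag:
    dag('''
    CREATE [[0],[1]] SCHEMA a:int
    PRINT
    ''')
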
fugue_sql.workflow.fsql(sql, *args, fsql_ignore_case=False, **kwargs)[source]

Fugue SQL functional interface

Parameters
  • sql (str) – the Fugue SQL string (can be a Jinja template)

  • args (Any) – variables related to the SQL string

  • fsql_ignore_case (bool) – whether to ignore case when parsing the SQL string, defaults to False

  • kwargs (Any) – variables related to the SQL string

Returns

the translated Fugue workflow

Return type

fugue_sql.workflow.FugueSQLWorkflow

import pandas as pd

from fugue_sql import fsql

# Basic case
fsql('''
CREATE [[0]] SCHEMA a:int
PRINT
''').run()

# With external data sources
df = pd.DataFrame([[0],[1]], columns=["a"])
fsql('''
SELECT * FROM df WHERE a=0
PRINT
''').run()

# With external variables
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''').run()

# The following is the explicit way to specify variables and dataframes
# (recommended)
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
SELECT * FROM df WHERE a={{t}}
PRINT
''', df=df, t=t).run()

# Using extensions
# A trivial transformer extension that returns its input unchanged
def dummy(df: pd.DataFrame) -> pd.DataFrame:
    return df

fsql('''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
''').run()

# It's recommended to provide the full path of the extension inside
# Fugue SQL, so the SQL definition and execution can be more
# independent from the extension definition.
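# For example, if dummy lived in a module my_package.udfs (a
# hypothetical path), the SQL could reference it by full path:
#
# fsql('''
# CREATE [[0]] SCHEMA a:int
# TRANSFORM USING my_package.udfs.dummy SCHEMA *
# PRINT
# ''').run()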

# Run with different execution engines
sql = '''
CREATE [[0]] SCHEMA a:int
TRANSFORM USING dummy SCHEMA *
PRINT
'''

from fugue_spark import SparkExecutionEngine
from fugue_dask import DaskExecutionEngine

# user_defined_spark_session() stands for your own helper that
# returns a SparkSession
fsql(sql).run(user_defined_spark_session())
fsql(sql).run(SparkExecutionEngine, {"spark.executor.instances": 10})
fsql(sql).run(DaskExecutionEngine)

# Passing dataframes between fsql calls: dataframes yielded in one
# call become available by name when its result is passed positionally
result = fsql('''
CREATE [[0]] SCHEMA a:int
YIELD DATAFRAME AS x

CREATE [[1]] SCHEMA a:int
YIELD DATAFRAME AS y
''').run(DaskExecutionEngine)

fsql('''
SELECT * FROM x
UNION
SELECT * FROM y
UNION
SELECT * FROM z

PRINT
''', result, z=pd.DataFrame([[2]], columns=["z"])).run()

# Get framework native dataframes
result["x"].native  # Dask dataframe
result["y"].native  # Dask dataframe
result["x"].as_pandas()  # Pandas dataframe

# Use lower-case Fugue SQL (requires fsql_ignore_case=True)
df = pd.DataFrame([[0],[1]], columns=["a"])
t = 1
fsql('''
select * from df where a={{t}}
print
''', df=df, t=t, fsql_ignore_case=True).run()