from typing import Any, Dict
from triad import ParamDict
KEYWORD_ROWCOUNT = "ROWCOUNT"
KEYWORD_PARALLELISM = "CONCURRENCY"
FUGUE_ENTRYPOINT = "fugue.plugins"
FUGUE_SQL_DEFAULT_DIALECT = "spark"
FUGUE_CONF_WORKFLOW_CONCURRENCY = "fugue.workflow.concurrency"
FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH = "fugue.workflow.checkpoint.path"
FUGUE_CONF_WORKFLOW_AUTO_PERSIST = "fugue.workflow.auto_persist"
FUGUE_CONF_WORKFLOW_AUTO_PERSIST_VALUE = "fugue.workflow.auto_persist_value"
FUGUE_CONF_CACHE_PATH = "fugue.workflow.cache.path"
FUGUE_CONF_WORKFLOW_EXCEPTION_HIDE = "fugue.workflow.exception.hide"
FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT = "fugue.workflow.exception.inject"
FUGUE_CONF_WORKFLOW_EXCEPTION_OPTIMIZE = "fugue.workflow.exception.optimize"
FUGUE_CONF_SQL_IGNORE_CASE = "fugue.sql.compile.ignore_case"
FUGUE_CONF_SQL_DIALECT = "fugue.sql.compile.dialect"
FUGUE_CONF_DEFAULT_PARTITIONS = "fugue.default.partitions"
FUGUE_COMPILE_TIME_CONFIGS = set(
[
FUGUE_CONF_WORKFLOW_AUTO_PERSIST,
FUGUE_CONF_WORKFLOW_AUTO_PERSIST_VALUE,
FUGUE_CONF_WORKFLOW_EXCEPTION_HIDE,
FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT,
FUGUE_CONF_WORKFLOW_EXCEPTION_OPTIMIZE,
FUGUE_CONF_SQL_IGNORE_CASE,
FUGUE_CONF_SQL_DIALECT,
]
)
_FUGUE_GLOBAL_CONF = ParamDict(
{
FUGUE_CONF_WORKFLOW_CONCURRENCY: 1,
FUGUE_CONF_WORKFLOW_AUTO_PERSIST: False,
FUGUE_CONF_WORKFLOW_EXCEPTION_HIDE: "fugue.,six,adagio.,pandas,"
"fugue_dask.,dask.,fugue_spark.,pyspark.,antlr4,_qpd_antlr,qpd,triad,"
"fugue_notebook.,ipython.,jupyter.,ipykernel,_pytest,pytest,fugue_ibis.",
FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 3,
FUGUE_CONF_WORKFLOW_EXCEPTION_OPTIMIZE: True,
FUGUE_CONF_SQL_IGNORE_CASE: False,
FUGUE_CONF_SQL_DIALECT: FUGUE_SQL_DEFAULT_DIALECT,
FUGUE_CONF_DEFAULT_PARTITIONS: -1,
}
)
[docs]
def register_global_conf(
conf: Dict[str, Any], on_dup: int = ParamDict.OVERWRITE
) -> None:
"""Register global Fugue configs that can be picked up by any
Fugue execution engines as the base configs.
:param conf: the config dictionary
:param on_dup: see :meth:`triad.collections.dict.ParamDict.update`
, defaults to ``ParamDict.OVERWRITE``
.. note::
When using ``ParamDict.THROW`` or ``on_dup``, it's transactional.
If any key in ``conf`` is already in global config and the value
is different from the new value, then ValueError will be thrown.
.. admonition:: Examples
.. code-block:: python
from fugue import register_global_conf, NativeExecutionEngine
register_global_conf({"my.value",1})
engine = NativeExecutionEngine()
assert 1 == engine.conf["my.value"]
engine = NativeExecutionEngine({"my.value",2})
assert 2 == engine.conf["my.value"]
"""
if on_dup == ParamDict.THROW:
# be transactional
for k, v in conf.items():
if k in _FUGUE_GLOBAL_CONF:
vv = _FUGUE_GLOBAL_CONF[k]
if vv != v:
raise ValueError(
f"for global config {k}, the existed "
f"value is {vv} and can't take new value {v}"
)
on_dup = ParamDict.OVERWRITE
_FUGUE_GLOBAL_CONF.update(conf, on_dup=on_dup, deep=True)