Source code for fugue.extensions.context

from typing import Any, Dict, Union

from fugue.collections.partition import PartitionCursor, PartitionSpec
from fugue.dataframe import DataFrame, DataFrames
from fugue.execution.execution_engine import ExecutionEngine
from fugue.extensions._utils import validate_input_schema, validate_partition_spec
from fugue.rpc import RPCClient, RPCServer
from triad.collections import ParamDict, Schema
from triad.utils.convert import get_full_type_path
from triad.utils.hash import to_uuid


class ExtensionContext:  # pylint: disable=E1101
    """Context variables that extensions can access. It's also the base
    class of all extensions.
    """

    # NOTE(review): every property below reads a private attribute
    # (``_params``, ``_execution_engine``, ``_cursor``, ...) that is not
    # assigned anywhere in this class; presumably the framework injects them
    # before the extension is used -- confirm against the callers.

    @property
    def params(self) -> ParamDict:
        """Parameters set for using this extension.

        .. admonition:: Examples

            >>> FugueWorkflow().df(...).transform(using=dummy, params={"a": 1})

            You will get ``{"a": 1}`` as `params` in the ``dummy`` transformer
        """
        return self._params  # type: ignore

    @property
    def workflow_conf(self) -> ParamDict:
        """Workflow level configs, this is accessible even in
        :class:`~fugue.extensions.transformer.transformer.Transformer` and
        :class:`~fugue.extensions.transformer.transformer.CoTransformer`

        .. admonition:: Examples

            >>> dag = FugueWorkflow().df(...).transform(using=dummy)
            >>> dag.run(NativeExecutionEngine(conf={"b": 10}))

            You will get ``{"b": 10}`` as `workflow_conf` in the ``dummy``
            transformer on both driver and workers.
        """
        # An explicitly attached ``_workflow_conf`` on the instance wins;
        # otherwise fall back to the conf of the execution engine.
        if "_workflow_conf" in self.__dict__:
            return self._workflow_conf  # type: ignore
        return self.execution_engine.conf

    @property
    def execution_engine(self) -> ExecutionEngine:
        """Execution engine for the current execution, this is only available
        on driver side
        """
        return self._execution_engine  # type: ignore

    @property
    def output_schema(self) -> Schema:
        """Output schema of the operation. This is accessible for all
        extensions (if defined), and on both driver and workers
        """
        return self._output_schema  # type: ignore

    @property
    def key_schema(self) -> Schema:
        """Partition keys schema, this is for transformers only, and available
        on both driver and workers
        """
        return self._key_schema  # type: ignore

    @property
    def partition_spec(self) -> PartitionSpec:
        """Partition specification, this is for all extensions except for
        creators, and available on both driver and workers
        """
        return self._partition_spec  # type: ignore

    @property
    def cursor(self) -> PartitionCursor:
        """Cursor of the current logical partition, this is for transformers
        only, and only available on worker side
        """
        return self._cursor  # type: ignore

    @property
    def has_callback(self) -> bool:
        """Whether this transformer has callback"""
        # Only the instance ``__dict__`` is consulted, so an instance on which
        # the flag was never set reports ``False`` instead of raising
        # ``AttributeError``. ``bool()`` guarantees the declared return type
        # even when the stored flag is merely truthy/falsy.
        return bool(
            "_has_rpc_client" in self.__dict__
            and self._has_rpc_client  # type: ignore
        )

    @property
    def callback(self) -> RPCClient:
        """RPC client to talk to driver, this is for transformers only,
        and available on both driver and workers
        """
        return self._rpc_client  # type: ignore

    @property
    def rpc_server(self) -> RPCServer:
        """RPC server attached to this extension (the ``_rpc_server``
        attribute).
        """
        # NOTE(review): the previous docstring was a copy of ``callback``'s
        # ("RPC client to talk to driver ... available on both driver and
        # workers"); where the server is actually available is not visible
        # from this class -- confirm before documenting it.
        return self._rpc_server  # type: ignore

    @property
    def validation_rules(self) -> Dict[str, Any]:
        """Extension input validation rules defined by user

        The base implementation returns an empty dict.
        """
        return {}

    def validate_on_compile(self) -> None:
        """Check :attr:`partition_spec` against :attr:`validation_rules`.

        The check is delegated to ``validate_partition_spec``; any error it
        raises propagates to the caller.
        """
        validate_partition_spec(self.partition_spec, self.validation_rules)

    def validate_on_runtime(self, data: Union[DataFrame, DataFrames]) -> None:
        """Check the schema(s) of ``data`` against :attr:`validation_rules`.

        :param data: a single ``DataFrame``, or a ``DataFrames`` collection in
          which case the schema of every contained dataframe is checked

        The check is delegated to ``validate_input_schema``; any error it
        raises propagates to the caller.
        """
        # Treat the single-dataframe case as a one-element collection so that
        # both input kinds share the same validation loop.
        frames = [data] if isinstance(data, DataFrame) else data.values()
        for df in frames:
            validate_input_schema(df.schema, self.validation_rules)

    def __uuid__(self) -> str:
        """Return an id computed by ``to_uuid`` from the full type path of
        this object (``get_full_type_path(self)``).
        """
        return to_uuid(get_full_type_path(self))