Source code for bsb.config._make

import functools
import importlib
import inspect
import os
import sys
import types
import warnings
from collections import defaultdict
from re import sub

import errr

from .._package_spec import warn_missing_packages
from .._util import get_qualified_class_name
from ..exceptions import (
    AttributeOrderError,
    BootError,
    CastError,
    ConfigurationError,
    DynamicClassError,
    DynamicClassInheritanceError,
    DynamicObjectNotFoundError,
    PluginError,
    RequirementError,
    UnfitClassCastError,
    UnresolvedClassCastError,
)
from ..reporting import warn
from ._hooks import overrides


def _has_own_init(meta_subject, kwargs):
    try:
        determined_class = meta_subject.__new__.class_determinant(meta_subject, kwargs)
        return overrides(determined_class, "__init__", mro=True)
    except Exception:
        return overrides(meta_subject, "__init__", mro=True)


def make_metaclass(cls):
    # We make a `NodeMeta` class for each decorated node class, in compliance with any
    # metaclasses they might already have (to prevent metaclass confusion).
    # The purpose of the metaclass is to rewrite `__new__` and `__init__` arguments,
    # and to always call `__new__` and `__init__` in the same manner.
    # The metaclass makes it so that there are 3 overloaded constructor forms:
    #
    # MyNode({ <config dict values> })
    # MyNode(example="attr", values="here")
    # ParentNode(me=MyNode(...))
    #
    # The third makes it that type handling and other types of casting opt out early
    # and keep the object reference that the user gives them
    class ConfigArgRewrite:
        def __call__(meta_subject, *args, _parent=None, _key=None, **kwargs):
            has_own_init = _has_own_init(meta_subject, kwargs)
            # Rewrite the arguments
            primer = args[0] if args else None
            if isinstance(primer, meta_subject):
                _set_pk(primer, _parent, _key)
                return primer
            elif isinstance(primer, dict):
                args = args[1:]
                primed = primer.copy()
                primed.update(kwargs)
                kwargs = primed
            elif primer is not None and not has_own_init:
                # If we're dealing with a typical config node, the primer should be a dict
                # or already precast node. If it is not, we consider it invalid input,
                # unless the user has specified its own `__init__` function and will deal
                # with the input arguments there.
                raise ValueError(f"Unexpected positional argument '{primer}'")
            # Call the base class's new with internal arguments
            instance = meta_subject.__new__(
                meta_subject, *args, _parent=_parent, _key=_key, **kwargs
            )
            instance._config_pos_init = getattr(instance, "_config_pos_init", False)
            # Call the end user's __init__ with the rewritten arguments, if one is defined
            if has_own_init:
                sig = inspect.signature(instance.__init__)
                try:
                    # Check whether the arguments match the signature. We use `sig.bind`
                    # so that the function isn't actually called, as this could mask
                    # `TypeErrors` that occur inside the function.
                    sig.bind(*args, **kwargs)
                except TypeError as e:
                    # Since the user might not know where all these additional arguments
                    # are coming from, inform them that config nodes get passed their
                    # config attrs, and how to correctly override __init__.
                    Param = inspect.Parameter
                    help_params = {"self": Param("self", Param.POSITIONAL_OR_KEYWORD)}
                    help_params.update(sig._parameters)
                    help_params["kwargs"] = Param("kwargs", Param.VAR_KEYWORD)
                    sig._parameters = help_params
                    raise TypeError(
                        f"`{instance.__init__.__module__}.__init__` {e}."
                        + " When overriding `__init__` on config nodes, do not define"
                        + " any positional arguments, and catch any additional"
                        + " configuration attributes that are passed as keyword arguments"
                        + f": e.g. 'def __init__{sig}'"
                    ) from None
                else:
                    instance._config_pos_init = bool(len(args))
                    instance.__init__(*args, **kwargs)
            return instance

    # Avoid metaclass conflicts by prepending our rewrite class to existing metaclass MRO
    class NodeMeta(ConfigArgRewrite, *cls.__class__.__mro__):
        def __new__(cls, *args, **kwargs):
            rcls = super().__new__(cls, *args, **kwargs)
            # `__init_subclass__` refused to be called with correct subclass, so call
            # it ourselves.
            if hasattr(rcls.__bases__[0], "_cfgnode_replaced_ics"):
                rcls.__bases__[0]._cfgnode_replaced_ics(rcls, **kwargs)
            return rcls

    return NodeMeta


class NodeKwargs(dict):
    """
    The raw input kwargs of a node, plus a handle to the node being built.

    Passed to ``required=`` callables. ``partial_node`` is the node instance
    under construction; use it to inspect identity, ``_config_parent`` and
    ``_config_key``.

    .. warning::

        A node is only partially built while its ``required=`` checkers run, so
        a checker cannot rely on attribute values being set. The node's own
        attributes are not assigned yet, and a parent attribute is available
        only if it is declared before the attribute currently being built.
        Build order follows attribute declaration order, not the order the keys
        appear in the user's configuration, so it is at least deterministic.
    """

    def __init__(self, partial_node, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_shortform = getattr(partial_node, "_config_pos_init", False)
        self.partial_node = partial_node


[docs] def compose_nodes(*node_classes): """ Create a composite mixin class of the given classes. Inherit from the returned class to inherit from more than one node class. """ meta = type("ComposedMetaclass", tuple(type(cls) for cls in node_classes), {}) return meta("CompositionMixin", node_classes, {})
def compile_class(cls): cls_dict = dict(cls.__dict__) if "__dict__" in cls_dict: del cls_dict["__dict__"] if "__weakref__" in cls_dict: del cls_dict["__weakref__"] ncls = make_metaclass(cls)(cls.__name__, cls.__bases__, cls_dict) for method in ncls.__dict__.values(): _replace_closure_cells(method, cls, ncls) # Shitty hack, for some reason I couldn't find a way to override the first argument # of `__init_subclass__` methods, that would otherwise work on other classmethods, # so we noop the actual `__init_subclass__` and we call `__init_subclass__` ourselves # from the metaclass' `__new__` method, where the argument replacement works as usual. if ( hasattr(ncls, "__init_subclass__") and "__init_subclass__" in ncls.__dict__ and not isinstance(ncls.__init_subclass__, types.BuiltinFunctionType) ): ncls._cfgnode_replaced_ics = ncls.__init_subclass__.__func__ ncls.__init_subclass__ = lambda *args, **kwargs: None classmap = getattr(ncls, "_config_dynamic_classmap", None) if classmap is not None: # Replace the reference to the old class with the new class. # The auto classmap entry is added in `__init_subclass__`, which happens before # we replace the class. for k, v in classmap.items(): if v is cls: classmap[k] = ncls return ncls def _replace_closure_cells(method, old, new): if old is new: return cl = getattr(method, "__closure__", None) or [] for cell in cl: if cell.cell_contents is old: cell.cell_contents = new elif inspect.isfunction(cell.cell_contents): # WARNING: If you end up with an infinitely recursive call here, you probably # have a decorator factory somewhere that stores references to node class # methods in a closure. I haven't figured out completely why it leads to # infinite recursion. The solution is to split the factory into 2 pieces so # that instead of a closure, the reference to the node class method can be # stored as a function call argument. Split the factory into an outer function # that retrieves the node class method and passes it to an inner function that # produces the decorator and calls the original function. See # `bsb.reporting._instrument_node` for an example of this. _replace_closure_cells(cell.cell_contents, old, new) def compile_isc(node_cls, dynamic_config): if not dynamic_config or not dynamic_config.auto_classmap: return node_cls.__init_subclass__ from ._hooks import overrides def dud(*args, **kwargs): pass f = node_cls.__init_subclass__ if overrides(node_cls, "__init_subclass__") else dud def __init_subclass__(cls, classmap_entry=MISSING, **kwargs): super(node_cls, cls).__init_subclass__(**kwargs) if classmap_entry is MISSING: classmap_entry = _snake_case(cls.__name__) if classmap_entry is not None: node_cls._config_dynamic_classmap[classmap_entry] = cls f(**kwargs) return classmethod(__init_subclass__) def _snake_case(s): return "_".join( sub("([A-Z][a-z]+)", r" \1", sub("([A-Z]+)", r" \1", s.replace("-", " "))).split() ).lower() def _node_determinant(cls, kwargs): return cls def compile_new(node_cls, dynamic=False, pluggable=False, root=False): if pluggable: class_determinant = _get_pluggable_class elif dynamic: class_determinant = _get_dynamic_class else: class_determinant = _node_determinant def __new__(_cls, *args, _parent=None, _key=None, **kwargs): ncls = class_determinant(_cls, kwargs) instance = object.__new__(ncls) instance._config_pos_init = bool(len(args)) _set_pk(instance, _parent, _key) if root: instance._config_isfinished = False instance.__post_new__(**kwargs) if _cls is not ncls: instance.__init__(*args, **kwargs) return instance __new__.class_determinant = class_determinant return __new__ def _set_pk(obj, parent, key): obj._config_parent = parent obj._config_key = key if not hasattr(obj, "_config_attr_order"): obj._config_attr_order = [] if not hasattr(obj, "_config_state"): obj._config_state = {} for a in get_config_attributes(obj.__class__).values(): if a.key: from ._attrs import _setattr _setattr(obj, a.attr_name, key) def _missing_requirements(instance, attr, kwargs): # We use `self.__class__`, not `cls`, to get the proper child class. cls = instance.__class__ dynamic_root = getattr(cls, "_config_dynamic_root", None) kwargs = NodeKwargs(instance, kwargs) if dynamic_root is not None: dynamic_attr = dynamic_root._config_dynamic_attr # If we are checking the dynamic attribute, but we're already a dynamic subclass, # we skip the required check. return ( attr.attr_name == dynamic_attr and cls is dynamic_root and attr.required(kwargs) ) or (attr.attr_name != dynamic_attr and attr.required(kwargs)) else: return attr.required(kwargs) def compile_postnew(cls): def __post_new__(self, _parent=None, _key=None, **kwargs): attrs = get_config_attributes(self.__class__) self._config_attr_order = list(kwargs.keys()) catch_attrs = [a for a in attrs.values() if hasattr(a, "__catch__")] leftovers = kwargs.copy() values = {} for attr in attrs.values(): name = attr.attr_name value = values[name] = leftovers.pop(name, None) try: if _missing_requirements(self, attr, kwargs) and value is None: raise RequirementError(f"Missing required attribute '{name}'") except RequirementError as e: # Catch both our own and possible `attr.required` RequirementErrors # and set the node detail before passing it on e.node = self raise for attr in attrs.values(): name = attr.attr_name if attr.key and attr.attr_name not in kwargs: # If this is a "key" attribute, and the user didn't overwrite it, # set the attribute to the config key setattr(self, name, self._config_key) attr.flag_pristine(self) elif (value := values[name]) is None: if _is_settable_attr(attr): setattr(self, name, attr.get_default()) attr.flag_pristine(self) else: setattr(self, name, value) attr.flag_dirty(self) for key, value in leftovers.items(): try: _try_catch_attrs(self, catch_attrs, key, value) except UncaughtAttributeError: try: setattr(self, key, value) except AttributeError: raise AttributeError( f"Configuration attribute key '{key}' conflicts with" + f" readonly class attribute on `{self.__class__.__module__}" + f".{self.__class__.__name__}`." ) from None raise ConfigurationError(f"Unknown attribute: '{key}'") from None return __post_new__ def wrap_root_postnew(post_new): def __post_new__(self, *args, _parent=None, _key=None, _store=None, **kwargs): from ._build_context import build_context if not hasattr(self, "_meta"): self._meta = {"path": None, "produced": True} with build_context(): try: # Root node bootstrapping sequence _bootstrap_components(kwargs.get("components", []), file_store=_store) warn_missing_packages(kwargs.get("packages", [])) except Exception as e: raise BootError("Failed to bootstrap configuration.") from e try: with warnings.catch_warnings(record=True) as log: try: post_new(self, *args, _parent=None, _key=None, **kwargs) except (CastError, RequirementError) as e: _bubble_up_exc(e, self._meta) self._config_isfinished = True _resolve_references(self) finally: _bubble_up_warnings(log) return __post_new__ def _is_settable_attr(attr): return not hasattr(attr, "fget") or attr.fset def _bubble_up_exc(exc, meta): if hasattr(exc, "node") and exc.node is not None: node = " in " + exc.node.get_node_name() else: node = "" attr = f".{exc.attr}" if hasattr(exc, "attr") and exc.attr else "" errr.wrap(type(exc), exc, append=node + attr) def _bubble_up_warnings(log): for w in log: m = w.message if hasattr(m, "node"): # Unpack the inner Warning that was passed instead of the warning msg attr = f".{m.attr.attr_name}" if hasattr(m, "attr") else "" warn(str(m) + " in " + m.node.get_node_name() + attr, type(m), stacklevel=4) else: warn(str(m), w.category, stacklevel=4) def _bootstrap_components(components, file_store=None): from ..storage._files import CodeDependencyNode for component in components: component_node = CodeDependencyNode(component) component_node.file_store = file_store component_node.load_object()
[docs] def get_config_attributes(cls): """Collect a node class's configuration attributes in build order. Walks the MRO base-first and threads each class's declared attributes into the order inherited so far, applying these rules: * Every class's declaration order is preserved as a subsequence. * An overridden attribute keeps its inherited slot; only its value is replaced by the most-derived declaration. * A new attribute is spliced in just before the next attribute the class declares after it that is already present, or appended when the class declares no such following attribute. * A subclass may reorder inherited attributes by redeclaring them, but only if it also redeclares every attribute caught between the ones it moves; otherwise the requested order is ambiguous and an :exc:`~bsb.exceptions.AttributeOrderError` is raised naming the attributes to redeclare. """ from ._attrs import ConfigurationAttribute if not isinstance(cls, type): cls = cls.__class__ order = [] values = {} unset = set() for p_cls in reversed(cls.__mro__): # base to most-derived decl = [ key for key, attr in vars(p_cls).items() if isinstance(attr, ConfigurationAttribute) ] for key in decl: # A more-derived override wins the value and cancels a prior unset. values[key] = vars(p_cls)[key] unset.discard(key) in_order = set(order) # Honor a reordering of inherited attributes only when it is # unambiguous, i.e. the class redeclares every attribute that sits # between the ones it moves. redeclared = [key for key in decl if key in in_order] if redeclared != [key for key in order if key in decl]: positions = [order.index(key) for key in redeclared] lo, hi = min(positions), max(positions) blocked = [key for key in order[lo : hi + 1] if key not in decl] if blocked: raise AttributeOrderError( f"`{cls.__name__}` redeclares {redeclared} in an order" f" conflicting with its parents, but {blocked} lie between" f" them. Redeclare {blocked} as well to define the order" " unambiguously.", redeclared, blocked, ) order = order[:lo] + redeclared + order[hi + 1 :] # Splice this class's new attributes into the inherited order: each lands # just before the next already-present attribute the class declares after # it (its following anchor), or at the end when there is none. following_anchor = {} anchor = None for key in reversed(decl): if key in in_order: anchor = key else: following_anchor[key] = anchor groups = {} for key in decl: if key not in in_order: groups.setdefault(following_anchor[key], []).append(key) if groups: spliced = [] for key in order: spliced.extend(groups.get(key, ())) spliced.append(key) spliced.extend(groups.get(None, ())) order = spliced for key in getattr(p_cls, "_config_unset", []): values.pop(key, None) if key in order: order.remove(key) unset.add(key) # Catch attributes registered on a class's merged set but not present as a # class attribute (e.g. injected onto a dynamic subclass), without # resurrecting any that a subclass unset. for p_cls in reversed(cls.__mro__): for key, attr in getattr(p_cls, "_config_attrs", {}).items(): if key not in values and key not in unset: values[key] = attr order.append(key) return {key: values[key] for key in order}
def _get_node_name(self): name = ".<missing>" if getattr(self, "attr_name", None) is not None: name = "." + str(self.attr_name) if getattr(self, "_config_key", None) is not None: name = "." + str(self._config_key) if hasattr(self, "_config_index"): if self._config_index is None: return "{removed}" else: name = "[" + str(self._config_index) + "]" if getattr(self, "name", None) is not None: name = "." + self.name if getattr(self, "_config_parent", None): return self._config_parent.get_node_name() + name else: return "{standalone}" + name def make_get_node_name(node_cls, root): if root: node_cls.get_node_name = lambda self: r"{root}" else: node_cls.get_node_name = _get_node_name class UncaughtAttributeError(Exception): pass def _try_catch_attrs(node, catchers, key, value): # See if any of the attributes in the node can catch the value of an unknown key in # the configuration section. If none of them catch the value, raise an # `UncaughtAttributeError` for attr in catchers: try: _try_catch(attr.__catch__, node, key, value) break except UncaughtAttributeError: pass else: raise UncaughtAttributeError() def _try_catch(catch, node, key, value): try: return catch(node, key, value) except Exception: raise UncaughtAttributeError() from None def _get_dynamic_class(node_cls, kwargs): if node_cls is not node_cls._config_dynamic_root: # When the node is already a subclass of its dynamic root, we don't need to cast # it anymore. return node_cls attr_name = node_cls._config_dynamic_attr dynamic_attr = getattr(node_cls, attr_name) if attr_name in kwargs: loaded_cls_name = kwargs[attr_name] elif dynamic_attr.required(kwargs): raise RequirementError(f"Dynamic node must contain a '{attr_name}' attribute") elif dynamic_attr.should_call_default(): # pragma: nocover loaded_cls_name = dynamic_attr.default() else: # Fall back to the default value, or the current class. loaded_cls_name = dynamic_attr.default or node_cls.__name__ module_path = ["__main__", node_cls.__module__] classmap = get_classmap(node_cls) interface = node_cls._config_dynamic_root try: dynamic_cls = _load_class( loaded_cls_name, module_path, interface=interface, classmap=classmap ) except DynamicClassInheritanceError: mapped_class_msg = _get_mapped_class_msg(loaded_cls_name, classmap) raise UnfitClassCastError( f"'{loaded_cls_name}'{mapped_class_msg} is not a valid class as it does not" f" inherit from {node_cls.__name__}" ) from None except DynamicClassError: mapped_class_msg = _get_mapped_class_msg(loaded_cls_name, classmap) raise UnresolvedClassCastError( f"Could not resolve '{loaded_cls_name}'{mapped_class_msg} to a class" ) from None return dynamic_cls def _get_pluggable_class(node_cls, kwargs): plugin_label = node_cls._config_plugin_name or node_cls.__name__ if node_cls._config_plugin_key not in kwargs: raise CastError( f"Pluggable node must contain a '{node_cls._config_plugin_key}' attribute " f"to select a {plugin_label}" ) plugin_name = kwargs[node_cls._config_plugin_key] plugins = node_cls.__plugins__() if plugin_name not in plugins: raise PluginError(f"Unknown {plugin_label} '{plugin_name}'") plugin_cls = plugins[plugin_name] # TODO: Enforce class inheritance return plugin_cls def _get_mapped_class_msg(loaded_cls_name, classmap): if classmap and loaded_cls_name in classmap: return f" (mapped to '{classmap[loaded_cls_name]}')" else: return "" def _load_class(cfg_classname, module_path, interface=None, classmap=None): if classmap and cfg_classname in classmap: cfg_classname = classmap[cfg_classname] if inspect.isclass(cfg_classname): class_ref = cfg_classname class_name = cfg_classname.__name__ else: class_ref = _load_object(cfg_classname, module_path) class_name = class_ref.__name__ def qualname(cls): return cls.__module__ + "." + cls.__name__ if interface and not issubclass(class_ref, interface): raise DynamicClassInheritanceError( f"Dynamic class '{class_name}' must derive from {qualname(interface)}" ) return class_ref def _load_object(object_path, module_path): class_parts = object_path.split(".") object_name = class_parts[-1] module_name = ".".join(class_parts[:-1]) if not module_name: object_ref = _search_module_path(object_name, module_path, object_path) else: object_ref = _get_module_object(object_name, module_name, object_path) return object_ref def _search_module_path(class_name, module_path, cfg_classname): for module_name in module_path: module_dict = sys.modules[module_name].__dict__ if class_name in module_dict: return module_dict[class_name] raise DynamicObjectNotFoundError("Class not found: " + cfg_classname) def _get_module_object(object_name, module_name, object_path): sys.path.append(os.getcwd()) try: module_ref = importlib.import_module(module_name) finally: tmp = list(reversed(sys.path)) tmp.remove(os.getcwd()) sys.path = list(reversed(tmp)) try: return getattr(module_ref, object_name) except Exception: raise DynamicObjectNotFoundError(f"'{object_path}' not found.") from None def make_dictable(node_cls): def __contains__(self, attr): return attr in get_config_attributes(self.__class__) def __getitem__(self, attr): if attr in get_config_attributes(self.__class__): return getattr(self, attr) else: raise KeyError(attr) def __iter__(self): return (attr for attr in get_config_attributes(self.__class__)) node_cls.__contains__ = __contains__ node_cls.__getitem__ = __getitem__ node_cls.__iter__ = __iter__ def make_tree(node_cls): def get_tree(instance): if hasattr(instance, "__inv__") and not getattr(instance, "_config_inv", None): instance._config_inv = True inv = instance.__inv__() instance._config_inv = False return inv attrs = get_config_attributes(instance.__class__) catch_attrs = [a for a in attrs.values() if hasattr(a, "__catch__")] tree = {} for name in instance._config_attr_order: if name in attrs: attr = attrs[name] value = attr.tree(instance) if attr.is_dirty(instance) else None else: for catcher in catch_attrs: if catcher.contains(instance, name): value = catcher.tree_callback(instance, name) break else: value = getattr(instance, name, None) if value is not None: tree[name] = value return tree node_cls.__tree__ = get_tree def make_copyable(node_cls): def loc_copy(instance, memo=None): return type(instance)(instance.__tree__()) node_cls.__copy__ = loc_copy node_cls.__deepcopy__ = loc_copy
[docs] def walk_node_attributes(node): """ Walk over all the child configuration nodes and attributes of ``node``. :returns: attribute, node, parents :rtype: Tuple[:class:`~.config.ConfigurationAttribute`, Any, Tuple] """ attrs = get_config_attributes(node) if not attrs: if hasattr(node, "_config_attr"): attrs = _get_walkable_iterator(node) else: return for attr in attrs.values(): yield node, attr # Yield but don't follow references. if hasattr(attr, "__ref__"): continue child = attr.__get__(node, node.__class__) yield from walk_node_attributes(child)
[docs] def walk_nodes(node): """ Walk over all the child configuration nodes of ``node``. :returns: node generator :rtype: Any """ if hasattr(node.__class__, "_config_attrs"): attrs = node.__class__._config_attrs elif hasattr(node, "_config_attr"): attrs = _get_walkable_iterator(node) else: return yield node for attr in attrs.values(): # Yield but don't follow references. if hasattr(attr, "__ref__"): continue child = attr.__get__(node, node.__class__) yield from walk_nodes(child)
def walk_node_values(start_node): for node, attr in walk_node_attributes(start_node): yield node, attr.attr_name, attr.__get__(node, node.__class__) def _resolve_references(root): from ._attrs import _setattr if root._config_isfinished: for node, attr in walk_node_attributes(root): if hasattr(attr, "__ref__"): ref = attr.__ref__(node, root) if ref is not None or getattr(node, attr.attr_name, None) is None: _setattr(node, attr.attr_name, ref) class WalkIterDescriptor: def __init__(self, n, v): self.attr_name = n self.v = v def __get__(self, instance, cls): return self.v def _get_walkable_iterator(node): if isinstance(node, dict): walkiter = {} for name, value in node.items(): walkiter[name] = WalkIterDescriptor(name, value) return walkiter elif isinstance(node, list): walkiter = {} for i, value in enumerate(node): walkiter[i] = WalkIterDescriptor(i, value) return walkiter _classmap_registry = defaultdict(dict) @functools.cache def load_component_plugins(): from ..plugins import discover plugins = discover("components") for plugin in plugins.values(): if isinstance(plugin, dict): for class_name, classmap in plugin.items(): register_classmap(class_name, classmap) return plugins def register_classmap(cls_name, classmap): _classmap_registry[cls_name].update(classmap) def get_classmap(cls): load_component_plugins() classmap = getattr(cls, "_config_dynamic_classmap", {}) classmap.update(_classmap_registry[get_qualified_class_name(cls)]) return classmap MISSING = object()