Source code for saltext.salt_describe.runners.salt_describe

# Copyright 2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
Runner module for generating Salt state (SLS) files and adding them to the top file

.. versionadded:: 3006

"""
import logging
import pathlib
from inspect import getargspec
from inspect import Parameter
from inspect import signature

import salt.daemons.masterapi  # pylint: disable=import-error
import salt.utils.files  # pylint: disable=import-error
import yaml
from saltext.salt_describe.utils.init import ret_info


# Name returned by __virtual__() below; the CLI examples in this file call the
# functions as ``describe.<name>``.
__virtualname__ = "describe"

# Maps the Python names ``all_`` / ``top_`` to the names ``all`` / ``top``
# (presumably consumed by the Salt loader so they are callable as
# ``describe.all`` / ``describe.top`` -- confirm against loader docs).
__func_alias__ = {"all_": "all", "top_": "top"}


# Module-level logger named after this module.
log = logging.getLogger(__name__)


def __virtual__():
    """
    Return the virtual name of this module (``__virtualname__``).
    """
    return __virtualname__


def _exclude_from_all(func):
    """
    Flag ``func`` so that it is skipped when ``all`` collects describe methods.

    The flag is stored as the ``__all_excluded__`` attribute on the function
    object itself; the very same object is returned, so this can be applied
    as a decorator.
    """
    setattr(func, "__all_excluded__", True)
    return func


def _get_all_single_describe_methods():
    """
    Get all methods that should be run in `all`

    Scans ``__salt__`` for entries whose name starts with ``describe.`` and
    returns a dict mapping the bare function name (the part after the
    ``describe.`` prefix) to the loaded function. Functions carrying a truthy
    ``__all_excluded__`` attribute (set by ``_exclude_from_all``) are left out.
    """
    prefix = "describe."
    return {
        # Slice off only the leading module prefix. Matching on the full
        # "describe." prefix (with the dot) keeps functions of unrelated
        # modules whose name merely begins with "describe" out of the result.
        name[len(prefix) :]: loaded_func
        for name, loaded_func in __salt__.items()
        if name.startswith(prefix) and not getattr(loaded_func, "__all_excluded__", False)
    }


@_exclude_from_all
def all_(tgt, top=True, include=None, exclude=None, config_system="salt", **kwargs):
    """
    Run all describe methods against target.

    Either ``exclude`` or ``include`` (but not both) can be given to specify
    which functions to run. Each can be either a string or a python list.

    CLI Example:

    .. code-block:: bash

        salt-run describe.all minion-tgt exclude='["file", "user"]'

    You can supply args and kwargs to functions that require them as well.
    These are passed as explicit kwargs.

    CLI Example:

    .. code-block:: bash

        salt-run describe.all minion-tgt include='["file", "pip"]' paths='["/tmp/testfile", "/tmp/testfile2"]'

    If two functions take an arg or kwarg of the same name, you can differentiate them
    by prefixing the argument name.

    CLI Example:

    .. code-block:: bash

        salt-run describe.all minion-tgt include='["file", "pip"]' file_paths='["/tmp/testfile", "/tmp/testfile2"]'

    Returns ``False`` when both ``include`` and ``exclude`` are given, or when
    the arguments collected for a describe function cannot be bound to its
    signature. Otherwise returns ``ret_info(...)`` of the collected SLS files.
    When ``top`` is true, ``describe.top`` is called for ``tgt`` afterwards.
    """
    if exclude and include:
        log.error("Only one of exclude and include can be provided")
        return False

    all_methods = _get_all_single_describe_methods()

    # Sanitize the include and exclude to the extremes if none are given
    if exclude is None:
        exclude = set()
    elif isinstance(exclude, str):
        exclude = {exclude}
    elif isinstance(exclude, (list, tuple)):
        exclude = set(exclude)

    if include is None:
        include = set(all_methods)
    elif isinstance(include, str):
        include = {include}
    elif isinstance(include, (list, tuple)):
        include = set(include)

    # The set difference gives us all the allowed methods here
    allowed_method_names = include - exclude
    allowed_methods = {
        name: func for name, func in all_methods.items() if name in allowed_method_names
    }
    log.debug("Allowed methods in all: %s", allowed_methods)

    def _get_arg_for_func(p_name, func_name, kwargs):
        """
        Return the argument value and whether or not it failed to find
        """
        # Allow more specific arg to take precedence
        spec_name = f"{func_name}_{p_name}"
        if spec_name in kwargs:
            return kwargs.get(spec_name), False
        if p_name in kwargs:
            return kwargs.get(p_name), False
        return None, True

    # tgt and config_system are looked up by name like any other argument
    kwargs["tgt"] = tgt
    kwargs["config_system"] = config_system

    positional_kinds = (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD)
    sls_files = []
    for name, func in allowed_methods.items():
        sig = signature(func)
        call_args = []
        call_kwargs = {}

        # Positional parameters without a default, i.e. the required ones.
        # Derived from the signature instead of inspect.getargspec(), which
        # was removed in Python 3.11, rejects functions with keyword-only
        # parameters or annotations, and reports ``defaults`` as None (not an
        # empty tuple) for functions that have no defaults at all.
        args = {
            p_name
            for p_name, p_obj in sig.parameters.items()
            if p_obj.kind in positional_kinds and p_obj.default is Parameter.empty
        }

        misg_req_arg = False
        for p_name, p_obj in sig.parameters.items():
            p_value, failed = _get_arg_for_func(p_name, name, kwargs)

            # Take care of required args and kwargs.
            # NOTE: a required positional arg that was supplied with a falsy
            # value (e.g. an empty list) is treated the same as a missing one.
            if (failed and p_obj.kind == Parameter.POSITIONAL_ONLY) or (
                p_name in args and not p_value
            ):
                log.error("Missing positional arg %s for describe.%s", p_name, name)
                misg_req_arg = True
                # we can still continue trying to generate other SLS files even if a
                # required arg is not available for a specific module
                break
            if (
                failed
                and p_obj.kind == Parameter.KEYWORD_ONLY
                and p_obj.default is Parameter.empty
            ):
                log.error("Missing required keyword arg %s for describe.%s", p_name, name)
                misg_req_arg = True
                # we can still continue trying to generate other SLS files even if a
                # required arg is not available for a specific module
                break

            # We can fail to find some args
            if failed:
                continue

            if p_obj.kind in positional_kinds and p_name in args:
                call_args.append(p_value)
            elif p_obj.kind == Parameter.VAR_POSITIONAL:
                if not isinstance(p_value, list):
                    log.error("%s must be a Python list", p_name)
                    return False
                call_args.extend(p_value)
            elif p_obj.kind == Parameter.KEYWORD_ONLY:
                call_kwargs[p_name] = p_value
            elif p_obj.kind == Parameter.VAR_KEYWORD:
                if not isinstance(p_value, dict):
                    log.error("%s must be a Python dictionary", p_name)
                    return False
                call_kwargs.update(p_value)
            elif p_name not in args:
                # Optional positional-or-keyword parameter: pass by keyword
                call_kwargs[p_name] = p_value

        if misg_req_arg:
            continue

        try:
            bound_sig = sig.bind(*call_args, **call_kwargs)
        except TypeError:
            log.error(
                "Invalid args, kwargs for signature of %s: %s, %s",
                name,
                call_args,
                call_kwargs,
            )
            return False

        log.debug(
            "Running describe.%s in all -- tgt: %s\targs: %s\tkwargs: %s",
            name,
            tgt,
            bound_sig.args,
            bound_sig.kwargs,
        )

        try:
            # This follows the unwritten standard that the minion target must be the first argument
            log.debug("Generating SLS for %s module", name)
            ret = __salt__[f"describe.{name}"](*bound_sig.args, **bound_sig.kwargs)
            if isinstance(ret, dict) and ret:
                # The SLS file list is the first value of the returned dict.
                # Plain ``+`` (not extend) so a non-list value raises TypeError
                # and is reported below instead of being silently iterated.
                sls_files = sls_files + next(iter(ret.values()))
            else:
                log.error("Could not generate the SLS file for %s", name)
        except TypeError as err:
            log.error("%s", err)

    # generate the top file
    if top:
        __salt__["describe.top"](tgt)

    return ret_info(sls_files)
@_exclude_from_all
def top_(tgt, tgt_type="glob", env="base"):
    """
    Add the generated states to top.sls

    For every minion matched by ``tgt``/``tgt_type``, each ``*.sls`` file
    (except ``init.sls``) found in ``<file_root>/<minion>/`` is added to that
    minion's entry under ``env`` in ``<file_root>/top.sls`` unless it is
    already listed there.

    Returns ``False`` if a minion's directory does not exist, a single-key
    dict when nothing had to be added, and ``ret_info(...)`` for the top file
    otherwise.

    CLI Example:

    .. code-block:: bash

        salt-run describe.top minion-tgt
    """
    # Gather minions based on tgt and tgt_type arguments
    masterapi = salt.daemons.masterapi.RemoteFuncs(__opts__)
    minions = masterapi.local.gather_minions(tgt, tgt_type)

    # NOTE(review): the file root is always taken from the "base" environment,
    # independent of ``env`` -- confirm this is intended.
    state_file_root = pathlib.Path(__salt__["config.get"]("file_roots:base")[0])
    top_file = state_file_root / "top.sls"

    if not top_file.is_file():
        top_file.touch()

    with salt.utils.files.fopen(top_file, "r") as fp_:
        # An empty top file loads as None
        top_file_dict = yaml.safe_load(fp_.read()) or {}

    # Also covers an ``env:`` key that is present but has no content (None)
    if not top_file_dict.get(env):
        top_file_dict[env] = {}
    env_top = top_file_dict[env]

    # Track changes across ALL minions so that additions for one minion are
    # written even when a later minion has nothing new, and so that an empty
    # minion list is handled without touching the file.
    changed = False
    for minion in minions:
        minion_file_root = state_file_root / minion
        if not minion_file_root.exists():
            log.error(f"The file root path {minion_file_root} does not exist")
            return False

        # Sorted so the generated top file does not depend on directory order
        sls_files = sorted(
            minion + "." + file.stem
            for file in minion_file_root.iterdir()
            if file.suffix == ".sls" and file.stem != "init"
        )

        # Check to see if the SLS file already exists in top file
        existing = env_top.get(minion) or []
        add_top = [sls for sls in sls_files if sls not in existing]
        if not add_top:
            continue

        if existing:
            env_top[minion].extend(add_top)
        else:
            env_top[minion] = add_top
        changed = True

    if not changed:
        return {"Top file was not changed, alread contains correct SLS files": str(top_file)}

    with salt.utils.files.fopen(top_file, "w") as fp_:
        fp_.write(yaml.dump(top_file_dict))

    return ret_info(str(top_file), mod="top file")
@_exclude_from_all
def pillar_top(tgt, tgt_type="glob", env="base"):
    """
    Add the generated pillars to top.sls

    For every minion matched by ``tgt``/``tgt_type``, each ``*.sls`` file
    (except ``init.sls``) found in ``<pillar_root>/<minion>/`` is added to
    that minion's entry under ``env`` in ``<pillar_root>/top.sls`` unless it
    is already listed there.

    Returns ``False`` if a minion's pillar directory does not exist,
    ``True`` otherwise.

    CLI Example:

    .. code-block:: bash

        salt-run describe.pillar_top minion-tgt
    """
    # Gather minions based on tgt and tgt_type arguments
    masterapi = salt.daemons.masterapi.RemoteFuncs(__opts__)
    minions = masterapi.local.gather_minions(tgt, tgt_type)

    pillar_file_root = pathlib.Path(__salt__["config.get"]("pillar_roots:base")[0])
    top_file = pillar_file_root / "top.sls"

    if not top_file.is_file():
        top_file.touch()

    with salt.utils.files.fopen(top_file, "r") as fp_:
        # A freshly touched (empty) top file loads as None, which would make
        # the membership test below fail; fall back to an empty mapping.
        top_file_dict = yaml.safe_load(fp_.read()) or {}

    # Also covers an ``env:`` key that is present but has no content (None)
    if not top_file_dict.get(env):
        top_file_dict[env] = {}
    env_top = top_file_dict[env]

    changed = False
    for minion in minions:
        minion_pillar_root = pillar_file_root / minion
        if not minion_pillar_root.exists():
            # Same handling as top_ for a missing minion directory
            log.error("The pillar root path %s does not exist", minion_pillar_root)
            return False

        # Sorted so the generated top file does not depend on directory order
        pillar_files = sorted(
            minion + "." + file.stem
            for file in minion_pillar_root.iterdir()
            if file.suffix == ".sls" and file.stem != "init"
        )

        # Only add entries that are not listed yet so re-runs do not duplicate
        existing = env_top.get(minion) or []
        add_top = [sls for sls in pillar_files if sls not in existing]
        if not add_top:
            continue

        if existing:
            # extend, not append: the entries belong in the minion's list
            # itself rather than in a nested list
            env_top[minion].extend(add_top)
        else:
            env_top[minion] = add_top
        changed = True

    if changed:
        with salt.utils.files.fopen(top_file, "w") as fp_:
            fp_.write(yaml.dump(top_file_dict))

    return True