From 3d24b40fa552fb5a9e5525f02ae6ee8d8e87b8f1 Mon Sep 17 00:00:00 2001 From: sgoral Date: Wed, 14 May 2025 11:30:42 +0200 Subject: [PATCH 1/3] fix: init commit, remove btool from solnlib --- solnlib/splunkenv.py | 84 ++++++++------------------------------------ 1 file changed, 14 insertions(+), 70 deletions(-) diff --git a/solnlib/splunkenv.py b/solnlib/splunkenv.py index d27ef882..9fd40c54 100644 --- a/solnlib/splunkenv.py +++ b/solnlib/splunkenv.py @@ -19,10 +19,9 @@ import os import os.path as op import socket -import subprocess -from configparser import ConfigParser -from io import StringIO from typing import List, Optional, Tuple, Union +from splunk.clilib.cli_common import getAppConf +from splunk.clilib.bundle_paths import make_splunkhome_path from .utils import is_true @@ -112,41 +111,7 @@ def make_splunkhome_path(parts: Union[List, Tuple]) -> str: ValueError: Escape from intended parent directories. """ - relpath = os.path.normpath(os.path.join(*parts)) - - basepath = None - shared_storage = _get_shared_storage() - if shared_storage: - for candidate in on_shared_storage: - # SPL-100508 On windows if the path is missing the drive letter, - # construct fullpath manually and call relpath - if os.name == "nt" and not _verify_path_prefix(relpath, candidate): - break - - if os.path.relpath(relpath, candidate)[0:2] != "..": - basepath = shared_storage - break - - if basepath is None: - etc_with_trailing_sep = os.path.join(ETC_LEAF, "") - if relpath == ETC_LEAF or relpath.startswith(etc_with_trailing_sep): - # Redirect $SPLUNK_HOME/etc to $SPLUNK_ETC. - basepath = _splunk_etc() - # Remove leading etc (and path separator, if present). Note: when - # emitting $SPLUNK_ETC exactly, with no additional path parts, we - # set to the empty string. - relpath = relpath[4:] - else: - basepath = _splunk_home() - - fullpath = os.path.normpath(os.path.join(basepath, relpath)) - - # Check that we haven't escaped from intended parent directories. - if os.path.relpath(fullpath, basepath)[0:2] == "..": - raise ValueError( - f'Illegal escape from parent directory "{basepath}": {fullpath}' - ) - return fullpath + make_splunkhome_path(parts) def get_splunk_host_info() -> Tuple: @@ -284,7 +249,7 @@ def get_conf_stanza( return stanzas[stanza] -def get_conf_stanzas(conf_name: str, app_name: Optional[str] = None) -> dict: +def get_conf_stanzas(conf_name: str, app_name: str, logger = None) -> dict: """Get stanzas of `conf_name` Arguments: @@ -299,34 +264,13 @@ def get_conf_stanzas(conf_name: str, app_name: Optional[str] = None) -> dict: >>> return: {'serverName': 'testServer', 'sessionTimeout': '1h', ...} """ - if conf_name.endswith(".conf"): - conf_name = conf_name[:-5] - - # TODO: dynamically calculate SPLUNK_HOME - btool_cli = [ - op.join(os.environ["SPLUNK_HOME"], "bin", "splunk"), - "cmd", - "btool", - conf_name, - "list", - ] - - if app_name: - btool_cli.append(f"--app={app_name}") - - p = subprocess.Popen( # nosemgrep: python.lang.security.audit.dangerous-subprocess-use.dangerous-subprocess-use - btool_cli, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - out, _ = p.communicate() - - if isinstance(out, bytes): - out = out.decode() - - parser = ConfigParser(**{"strict": False}) - parser.optionxform = str - parser.read_file(StringIO(out)) - - out = {} - for section in parser.sections(): - out[section] = {item[0]: item[1] for item in parser.items(section, raw=True)} - return out + path = make_splunkhome_path(["etc", "apps", app_name]) + app_conf = getAppConf(confName=conf_name, app=app_name, use_btool=False, app_path=path) + + if logger: + logger.info(f"akakakakakkakakakaka 21 path: {path}") + + if logger: + logger.info(f"akakakakakkakakakaka 22 app_conf={type(app_conf)} app_conf: {app_conf}") + + return app_conf From b96835319e2500f985041aa40aba8fe06d97921e Mon Sep 17 00:00:00 2001 From: sgoral Date: Wed, 14 May 2025 11:31:45 +0200 Subject: [PATCH 2/3] fix: init commit, remove btool from solnlib --- solnlib/splunkenv.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/solnlib/splunkenv.py b/solnlib/splunkenv.py index 9fd40c54..f9e2e631 100644 --- a/solnlib/splunkenv.py +++ b/solnlib/splunkenv.py @@ -21,7 +21,7 @@ import socket from typing import List, Optional, Tuple, Union from splunk.clilib.cli_common import getAppConf -from splunk.clilib.bundle_paths import make_splunkhome_path +from splunk.clilib.bundle_paths import make_splunkhome_path as mksplhomepath from .utils import is_true @@ -111,7 +111,7 @@ def make_splunkhome_path(parts: Union[List, Tuple]) -> str: ValueError: Escape from intended parent directories. """ - make_splunkhome_path(parts) + return mksplhomepath(parts) def get_splunk_host_info() -> Tuple: @@ -249,7 +249,7 @@ def get_conf_stanza( return stanzas[stanza] -def get_conf_stanzas(conf_name: str, app_name: str, logger = None) -> dict: +def get_conf_stanzas(conf_name: str, app_name: str, logger=None) -> dict: """Get stanzas of `conf_name` Arguments: @@ -265,12 +265,16 @@ def get_conf_stanzas(conf_name: str, app_name: str, logger = None) -> dict: """ path = make_splunkhome_path(["etc", "apps", app_name]) - app_conf = getAppConf(confName=conf_name, app=app_name, use_btool=False, app_path=path) + app_conf = getAppConf( + confName=conf_name, app=app_name, use_btool=False, app_path=path + ) if logger: logger.info(f"akakakakakkakakakaka 21 path: {path}") if logger: - logger.info(f"akakakakakkakakakaka 22 app_conf={type(app_conf)} app_conf: {app_conf}") + logger.info( + f"akakakakakkakakakaka 22 app_conf={type(app_conf)} app_conf: {app_conf}" + ) return app_conf From 35502364cedf0852e53fbf1aea76af60c530b82b Mon Sep 17 00:00:00 2001 From: sgoral Date: Thu, 15 May 2025 10:16:09 +0200 Subject: [PATCH 3/3] tests: add unit tests --- solnlib/splunkenv.py | 41 +++++++++++-------- tests/unit/common.py | 95 ++++++++++++++++++++++++++------------------ 2 files changed, 81 insertions(+), 55 deletions(-) diff --git a/solnlib/splunkenv.py b/solnlib/splunkenv.py index f9e2e631..9eecc359 100644 --- a/solnlib/splunkenv.py +++ b/solnlib/splunkenv.py @@ -20,8 +20,22 @@ import os.path as op import socket from typing import List, Optional, Tuple, Union -from splunk.clilib.cli_common import getAppConf -from splunk.clilib.bundle_paths import make_splunkhome_path as mksplhomepath + +try: + from splunk.clilib.cli_common import getAppConf +except ImportError: + + def getAppConf(*args, **kwargs): + return None + + +try: + from splunk.clilib.bundle_paths import make_splunkhome_path as mksplhomepath +except ImportError: + + def mksplhomepath(*args, **kwargs): + return None + from .utils import is_true @@ -34,7 +48,7 @@ "get_splunkd_uri", "get_conf_key_value", "get_conf_stanza", - "get_conf_stanzas", + "_get_conf_stanzas", ] ETC_LEAF = "etc" @@ -224,7 +238,7 @@ def get_conf_key_value( KeyError: If `stanza` or `key` doesn't exist. """ - stanzas = get_conf_stanzas(conf_name, app_name) + stanzas = _get_conf_stanzas(conf_name, app_name) return stanzas[stanza][key] @@ -245,11 +259,11 @@ def get_conf_stanza( KeyError: If stanza doesn't exist. """ - stanzas = get_conf_stanzas(conf_name, app_name) + stanzas = _get_conf_stanzas(conf_name, app_name) return stanzas[stanza] -def get_conf_stanzas(conf_name: str, app_name: str, logger=None) -> dict: +def _get_conf_stanzas(conf_name: str, app_name: str, logger=None) -> dict: """Get stanzas of `conf_name` Arguments: @@ -260,21 +274,16 @@ def get_conf_stanzas(conf_name: str, app_name: str, logger=None) -> dict: Config stanzas. Examples: - >>> stanzas = get_conf_stanzas('server') + >>> stanzas = _get_conf_stanzas('server') >>> return: {'serverName': 'testServer', 'sessionTimeout': '1h', ...} """ - path = make_splunkhome_path(["etc", "apps", app_name]) + if app_name == APP_SYSTEM: + path = make_splunkhome_path(["etc", app_name]) + else: + path = make_splunkhome_path(["etc", "apps", app_name]) app_conf = getAppConf( confName=conf_name, app=app_name, use_btool=False, app_path=path ) - if logger: - logger.info(f"akakakakakkakakakaka 21 path: {path}") - - if logger: - logger.info( - f"akakakakakkakakakaka 22 app_conf={type(app_conf)} app_conf: {app_conf}" - ) - return app_conf diff --git a/tests/unit/common.py b/tests/unit/common.py index fa79619f..9e98f2cd 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -14,9 +14,12 @@ # limitations under the License. # +import os import os.path as op import socket -import subprocess + +from configparser import ConfigParser +from io import StringIO from splunklib import binding, client from splunklib.data import record @@ -32,50 +35,64 @@ def mock_splunkhome(monkeypatch): - class MockPopen: - def __init__( - self, - args, - bufsize=0, - executable=None, - stdin=None, - stdout=None, - stderr=None, - preexec_fn=None, - close_fds=False, - shell=False, - cwd=None, - env=None, - universal_newlines=False, - startupinfo=None, - creationflags=0, - ): - self._conf = args[3] - - def communicate(self, input=None): - if self._conf == "server": - file_path = op.sep.join( - [cur_dir, "data/mock_splunk/etc/system/default/server.conf"] - ) - elif self._conf == "inputs": - file_path = op.sep.join( - [ - cur_dir, - "data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.conf", - ] - ) - else: - file_path = op.sep.join( - [cur_dir, "data/mock_splunk/etc/system/default/web.conf"] + def get_app_conf(confName, app, use_btool, app_path): + if confName == "server": + file_path = op.sep.join( + [cur_dir, "data/mock_splunk/etc/system/default/server.conf"] + ) + elif confName == "inputs": + file_path = op.sep.join( + [ + cur_dir, + "data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.conf", + ] + ) + else: + file_path = op.sep.join( + [cur_dir, "data/mock_splunk/etc/system/default/web.conf"] + ) + + with open(file_path) as fp: + data = fp.read(), None + + parser = ConfigParser(**{"strict": False}) + parser.optionxform = str + parser.read_file(StringIO(data[0])) + + out = {} + for section in parser.sections(): + out[section] = { + item[0]: item[1] for item in parser.items(section, raw=True) + } + return out + + def make_splunk_gome(parts): + relpath = os.path.normpath(os.path.join(*parts)) + basepath = "" + etc_with_trailing_sep = os.path.join("etc", "") + if (relpath == "etc") or relpath.startswith(etc_with_trailing_sep): + try: + basepath = os.environ["SPLUNK_ETC"] + except KeyError: + basepath = op.join(os.path.normpath(os.environ["SPLUNK_HOME"]), "etc") + relpath = relpath[4:] + else: + basepath = os.path.normpath(os.environ["SPLUNK_HOME"]) + fullpath = os.path.normpath(os.path.join(basepath, relpath)) + if os.path.relpath(fullpath, basepath)[0:2] == "..": + raise ValueError( + 'Illegal escape from parent directory "{}": {}'.format( + basepath, fullpath ) + ) - with open(file_path) as fp: - return fp.read(), None + return fullpath splunk_home = op.join(cur_dir, "data/mock_splunk/") monkeypatch.setenv("SPLUNK_HOME", splunk_home) monkeypatch.setenv("SPLUNK_ETC", op.join(splunk_home, "etc")) - monkeypatch.setattr(subprocess, "Popen", MockPopen) + monkeypatch.setattr("solnlib.splunkenv.getAppConf", get_app_conf) + monkeypatch.setattr("solnlib.splunkenv.mksplhomepath", make_splunk_gome) def mock_serverinfo(monkeypatch):