Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 29 additions & 72 deletions solnlib/splunkenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,24 @@
import os
import os.path as op
import socket
import subprocess
from configparser import ConfigParser
from io import StringIO
from typing import List, Optional, Tuple, Union

try:
from splunk.clilib.cli_common import getAppConf
except ImportError:

def getAppConf(*args, **kwargs):
return None


try:
from splunk.clilib.bundle_paths import make_splunkhome_path as mksplhomepath
except ImportError:

def mksplhomepath(*args, **kwargs):
return None


from .utils import is_true

__all__ = [
Expand All @@ -35,7 +48,7 @@
"get_splunkd_uri",
"get_conf_key_value",
"get_conf_stanza",
"get_conf_stanzas",
"_get_conf_stanzas",
]

ETC_LEAF = "etc"
Expand Down Expand Up @@ -112,41 +125,7 @@ def make_splunkhome_path(parts: Union[List, Tuple]) -> str:
ValueError: Escape from intended parent directories.
"""

relpath = os.path.normpath(os.path.join(*parts))

basepath = None
shared_storage = _get_shared_storage()
if shared_storage:
for candidate in on_shared_storage:
# SPL-100508 On windows if the path is missing the drive letter,
# construct fullpath manually and call relpath
if os.name == "nt" and not _verify_path_prefix(relpath, candidate):
break

if os.path.relpath(relpath, candidate)[0:2] != "..":
basepath = shared_storage
break

if basepath is None:
etc_with_trailing_sep = os.path.join(ETC_LEAF, "")
if relpath == ETC_LEAF or relpath.startswith(etc_with_trailing_sep):
# Redirect $SPLUNK_HOME/etc to $SPLUNK_ETC.
basepath = _splunk_etc()
# Remove leading etc (and path separator, if present). Note: when
# emitting $SPLUNK_ETC exactly, with no additional path parts, we
# set <relpath> to the empty string.
relpath = relpath[4:]
else:
basepath = _splunk_home()

fullpath = os.path.normpath(os.path.join(basepath, relpath))

# Check that we haven't escaped from intended parent directories.
if os.path.relpath(fullpath, basepath)[0:2] == "..":
raise ValueError(
f'Illegal escape from parent directory "{basepath}": {fullpath}'
)
return fullpath
return mksplhomepath(parts)


def get_splunk_host_info() -> Tuple:
Expand Down Expand Up @@ -259,7 +238,7 @@ def get_conf_key_value(
KeyError: If `stanza` or `key` doesn't exist.
"""

stanzas = get_conf_stanzas(conf_name, app_name)
stanzas = _get_conf_stanzas(conf_name, app_name)
return stanzas[stanza][key]


Expand All @@ -280,11 +259,11 @@ def get_conf_stanza(
KeyError: If stanza doesn't exist.
"""

stanzas = get_conf_stanzas(conf_name, app_name)
stanzas = _get_conf_stanzas(conf_name, app_name)
return stanzas[stanza]


def get_conf_stanzas(conf_name: str, app_name: Optional[str] = None) -> dict:
def _get_conf_stanzas(conf_name: str, app_name: str, logger=None) -> dict:
"""Get stanzas of `conf_name`

Arguments:
Expand All @@ -295,38 +274,16 @@ def get_conf_stanzas(conf_name: str, app_name: Optional[str] = None) -> dict:
Config stanzas.

Examples:
>>> stanzas = get_conf_stanzas('server')
>>> stanzas = _get_conf_stanzas('server')
>>> return: {'serverName': 'testServer', 'sessionTimeout': '1h', ...}
"""

if conf_name.endswith(".conf"):
conf_name = conf_name[:-5]

# TODO: dynamically calculate SPLUNK_HOME
btool_cli = [
op.join(os.environ["SPLUNK_HOME"], "bin", "splunk"),
"cmd",
"btool",
conf_name,
"list",
]

if app_name:
btool_cli.append(f"--app={app_name}")

p = subprocess.Popen( # nosemgrep: python.lang.security.audit.dangerous-subprocess-use.dangerous-subprocess-use
btool_cli, stdout=subprocess.PIPE, stderr=subprocess.PIPE
if app_name == APP_SYSTEM:
path = make_splunkhome_path(["etc", app_name])
else:
path = make_splunkhome_path(["etc", "apps", app_name])
app_conf = getAppConf(
confName=conf_name, app=app_name, use_btool=False, app_path=path
)
out, _ = p.communicate()

if isinstance(out, bytes):
out = out.decode()

parser = ConfigParser(**{"strict": False})
parser.optionxform = str
parser.read_file(StringIO(out))

out = {}
for section in parser.sections():
out[section] = {item[0]: item[1] for item in parser.items(section, raw=True)}
return out
return app_conf
95 changes: 56 additions & 39 deletions tests/unit/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
# limitations under the License.
#

import os
import os.path as op
import socket
import subprocess

from configparser import ConfigParser
from io import StringIO

from splunklib import binding, client
from splunklib.data import record
Expand All @@ -32,50 +35,64 @@


def mock_splunkhome(monkeypatch):
class MockPopen:
def __init__(
self,
args,
bufsize=0,
executable=None,
stdin=None,
stdout=None,
stderr=None,
preexec_fn=None,
close_fds=False,
shell=False,
cwd=None,
env=None,
universal_newlines=False,
startupinfo=None,
creationflags=0,
):
self._conf = args[3]

def communicate(self, input=None):
if self._conf == "server":
file_path = op.sep.join(
[cur_dir, "data/mock_splunk/etc/system/default/server.conf"]
)
elif self._conf == "inputs":
file_path = op.sep.join(
[
cur_dir,
"data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.conf",
]
)
else:
file_path = op.sep.join(
[cur_dir, "data/mock_splunk/etc/system/default/web.conf"]
def get_app_conf(confName, app, use_btool, app_path):
if confName == "server":
file_path = op.sep.join(
[cur_dir, "data/mock_splunk/etc/system/default/server.conf"]
)
elif confName == "inputs":
file_path = op.sep.join(
[
cur_dir,
"data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.conf",
]
)
else:
file_path = op.sep.join(
[cur_dir, "data/mock_splunk/etc/system/default/web.conf"]
)

with open(file_path) as fp:
data = fp.read(), None

parser = ConfigParser(**{"strict": False})
parser.optionxform = str
parser.read_file(StringIO(data[0]))

out = {}
for section in parser.sections():
out[section] = {
item[0]: item[1] for item in parser.items(section, raw=True)
}
return out

def make_splunk_gome(parts):
relpath = os.path.normpath(os.path.join(*parts))
basepath = ""
etc_with_trailing_sep = os.path.join("etc", "")
if (relpath == "etc") or relpath.startswith(etc_with_trailing_sep):
try:
basepath = os.environ["SPLUNK_ETC"]
except KeyError:
basepath = op.join(os.path.normpath(os.environ["SPLUNK_HOME"]), "etc")
relpath = relpath[4:]
else:
basepath = os.path.normpath(os.environ["SPLUNK_HOME"])
fullpath = os.path.normpath(os.path.join(basepath, relpath))
if os.path.relpath(fullpath, basepath)[0:2] == "..":
raise ValueError(
'Illegal escape from parent directory "{}": {}'.format(
basepath, fullpath
)
)

with open(file_path) as fp:
return fp.read(), None
return fullpath

splunk_home = op.join(cur_dir, "data/mock_splunk/")
monkeypatch.setenv("SPLUNK_HOME", splunk_home)
monkeypatch.setenv("SPLUNK_ETC", op.join(splunk_home, "etc"))
monkeypatch.setattr(subprocess, "Popen", MockPopen)
monkeypatch.setattr("solnlib.splunkenv.getAppConf", get_app_conf)
monkeypatch.setattr("solnlib.splunkenv.mksplhomepath", make_splunk_gome)


def mock_serverinfo(monkeypatch):
Expand Down
Loading