From 8ba09341df6a7a593d9d1641975a683df678dc4e Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Mon, 25 Aug 2025 06:33:11 -0400 Subject: [PATCH 01/10] Added platform token support --- delinea/secrets/server.py | 184 ++++++++++++++++++++++++++++++-------- 1 file changed, 146 insertions(+), 38 deletions(-) diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 29e3a0d..c28869d 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -208,6 +208,36 @@ class PasswordGrantAuthorizer(Authorizer): """ TOKEN_PATH_URI = "/oauth2/token" + PLATFORM_TOKEN_PATH_URI = "/identity/api/oauth2/token/xpmplatform" + + def _detect_server_type(self): + """Detects if the server is Secret Server or Platform by health check endpoints, using _check_json_response.""" + ss_health_url = self.base_url.rstrip("/") + "/api/v1/healthcheck" + platform_health_url = self.base_url.rstrip("/") + "/health" + if self._check_json_response(ss_health_url): + self._server_type = "secret_server" + return + if self._check_json_response(platform_health_url): + self._server_type = "platform" + return + raise SecretServerError("Unable to detect server type via health check endpoints.") + + def _check_json_response(self, url): + """Python equivalent of Go checkJSONResponse for health check detection.""" + try: + resp = requests.get(url, timeout=60) + except Exception: + return False + try: + body = resp.content + except Exception: + return False + try: + data = resp.json() + healthy = data.get("Healthy", False) + return healthy + except Exception: + return b"Healthy" in body or b"healthy" in body @staticmethod def get_access_grant(token_url, grant_request): @@ -241,18 +271,49 @@ def _refresh(self, seconds_of_drift=300): ): return else: - self.access_grant = self.get_access_grant( - self.token_url, self.grant_request - ) - self.access_grant_refreshed = datetime.now() - - def __init__(self, base_url, username, password, token_path_uri=TOKEN_PATH_URI): - self.token_url = base_url.rstrip("/") + "/" + token_path_uri.strip("/") - self.grant_request = { - "username": username, - "password": password, - "grant_type": "password", - } + # Detect server type if not already done + if not hasattr(self, "_server_type"): + self._detect_server_type() + # Decide token_path_uri if not provided + if not self.token_path_uri: + if self._server_type == "secret_server": + self.token_path_uri = self.TOKEN_PATH_URI + elif self._server_type == "platform": + self.token_path_uri = self.PLATFORM_TOKEN_PATH_URI + else: + raise SecretServerError("Unknown server type for token request.") + if self._server_type == "secret_server": + token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + grant_request = { + "username": self.username, + "password": self.password, + "grant_type": "password", + } + if hasattr(self, "domain") and self.domain: + grant_request["domain"] = self.domain + self.access_grant = self.get_access_grant(token_url, grant_request) + self.access_grant_refreshed = datetime.now() + elif self._server_type == "platform": + token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + grant_request = { + "client_id": self.username, + "client_secret": self.password, + "grant_type": "client_credentials", + "scope": "xpmheadless", + } + self.access_grant = self.get_access_grant(token_url, grant_request) + self.access_grant_refreshed = datetime.now() + else: + raise SecretServerError("Unknown server type for token request.") + + def __init__(self, base_url, username, password, token_path_uri=None, domain=None): + self.base_url = base_url.rstrip("/") + self.username = username + self.password = password + self.domain = domain + self.token_path_uri = token_path_uri # May be None, will decide in _refresh + self.token_url = None + self.grant_request = None def get_access_token(self): self._refresh() @@ -268,10 +329,9 @@ def __init__( username, domain, password, - token_path_uri=PasswordGrantAuthorizer.TOKEN_PATH_URI, + token_path_uri=None, ): - super().__init__(base_url, username, password, token_path_uri=token_path_uri) - self.grant_request["domain"] = domain + super().__init__(base_url, username, password, token_path_uri=token_path_uri, domain=domain) class SecretServer: @@ -317,11 +377,11 @@ def headers(self): return self.authorizer.headers() def __init__( - self, - base_url, - authorizer: Authorizer, - api_path_uri=API_PATH_URI, - ): + self, + base_url, + authorizer: Authorizer, + api_path_uri=API_PATH_URI, + ): """ :param base_url: The base URL e.g. ``http://localhost/SecretServer`` :type base_url: str @@ -330,10 +390,40 @@ def __init__( :param api_path_uri: Defaults to ``/api/v1`` :type api_path_uri: str """ - self.base_url = base_url.rstrip("/") + self.platform_url = self.base_url self.authorizer = authorizer - self.api_url = f"{self.base_url}/{api_path_uri.strip('/')}" + self._api_path_uri = api_path_uri + + @property + def api_url(self): + return f"{self.base_url}/{self._api_path_uri.strip('/')}" + + def ensure_vault_url(self): + """For platform, fetch and set the vault URL before making API calls.""" + # Only needed for platform scenario + if hasattr(self.authorizer, "_server_type") and self.authorizer._server_type == "platform": + if not hasattr(self, "_vault_url_fetched") or not self._vault_url_fetched: + access_token = self.authorizer.get_access_token() + vaults_endpoint = self.platform_url + "/vaultbroker/api/vaults" + headers = {"Authorization": f"Bearer {access_token}"} + resp = requests.get(vaults_endpoint, headers=headers, timeout=60) + if resp.status_code != 200: + raise SecretServerError(f"Failed to fetch vault details: HTTP {resp.status_code} - {resp.text}") + try: + data = resp.json() + except Exception as ex: + raise SecretServerError(f"Failed to parse vault details: {ex}") + for vault in data.get("vaults", []): + if vault.get("isDefault") and vault.get("isActive"): + conn = vault.get("connection", {}) + url = conn.get("url") + if url: + self.base_url = url.rstrip("/") + self._vault_url_fetched = True + return + raise SecretServerError("No configured default and active vault found in vault details.") + def get_secret_json(self, id, query_params=None): """Gets a Secret from Secret Server @@ -349,18 +439,20 @@ def get_secret_json(self, id, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + headers=self.headers() + self.ensure_vault_url() endpoint_url = f"{self.api_url}/secrets/{id}" if query_params is None: return self.process( - requests.get(endpoint_url, headers=self.headers(), timeout=60) + requests.get(endpoint_url, headers=headers, timeout=60) ).text else: return self.process( requests.get( endpoint_url, params=query_params, - headers=self.headers(), + headers=headers, timeout=60, ) ).text @@ -379,19 +471,21 @@ def get_folder_json(self, id, query_params=None, get_all_children=True): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + headers=self.headers() + self.ensure_vault_url() endpoint_url = f"{self.api_url}/folders/{id}" if get_all_children: query_params["getAllChildren"] = "true" if query_params is None: - return self.process(requests.get(endpoint_url, headers=self.headers())).text + return self.process(requests.get(endpoint_url, headers=headers)).text else: return self.process( requests.get( endpoint_url, params=query_params, - headers=self.headers(), + headers=headers, ) ).text @@ -520,18 +614,20 @@ def search_secrets(self, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + headers=self.headers() + self.ensure_vault_url() endpoint_url = f"{self.api_url}/secrets" if query_params is None: return self.process( - requests.get(endpoint_url, headers=self.headers(), timeout=60) + requests.get(endpoint_url, headers=headers, timeout=60) ).text else: return self.process( requests.get( endpoint_url, params=query_params, - headers=self.headers(), + headers=headers, timeout=60, ) ).text @@ -548,16 +644,18 @@ def lookup_folders(self, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + headers=self.headers() + self.ensure_vault_url() endpoint_url = f"{self.api_url}/folders/lookup" if query_params is None: - return self.process(requests.get(endpoint_url, headers=self.headers())).text + return self.process(requests.get(endpoint_url, headers=headers)).text else: return self.process( requests.get( endpoint_url, params=query_params, - headers=self.headers(), + headers=headers, ) ).text @@ -573,12 +671,13 @@ def get_secret_ids_by_folderid(self, folder_id): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - + headers=self.headers() + self.ensure_vault_url() params = {"filter.folderId": folder_id} endpoint_url = f"{self.api_url}/secrets/search-total" params["take"] = self.process( requests.get( - endpoint_url, params=params, headers=self.headers(), timeout=60 + endpoint_url, params=params, headers=headers, timeout=60 ) ).text response = self.search_secrets(query_params=params) @@ -605,7 +704,8 @@ def get_child_folder_ids_by_folderid(self, folder_id): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - + headers=self.headers() + self.ensure_vault_url() params = { "filter.parentFolderId": folder_id, "filter.limitToDirectDescendents": True, @@ -614,7 +714,7 @@ def get_child_folder_ids_by_folderid(self, folder_id): endpoint_url = f"{self.api_url}/folders/lookup" params["take"] = self.process( - requests.get(endpoint_url, params=params, headers=self.headers()) + requests.get(endpoint_url, params=params, headers=headers) ).json()["total"] # Handle result of zero child folders if params["take"] != 0: @@ -651,12 +751,12 @@ def __init__( username, password, api_path_uri=SecretServer.API_PATH_URI, - token_path_uri=PasswordGrantAuthorizer.TOKEN_PATH_URI, + token_path_uri=None, ): super().__init__( base_url, PasswordGrantAuthorizer( - f"{base_url}/{token_path_uri.strip('/')}", username, password + f"{base_url}", username, password, token_path_uri ), api_path_uri, ) @@ -676,5 +776,13 @@ class SecretServerCloud(SecretServer): DEFAULT_TLD = "com" URL_TEMPLATE = "https://{}.secretservercloud.{}" - def __init__(self, tenant, authorizer: Authorizer, tld=DEFAULT_TLD): - super().__init__(self.URL_TEMPLATE.format(tenant, tld), authorizer) + def __init__(self, tenant=None, authorizer=None, tld=DEFAULT_TLD, base_url=None): + if authorizer is None or not isinstance(authorizer, Authorizer): + raise ValueError("authorizer must be provided and must be of type Authorizer") + if tenant: + url = self.URL_TEMPLATE.format(tenant, tld) + elif base_url: + url = base_url.rstrip("/") + else: + raise ValueError("Must provide either tenant or base_url") + super().__init__(url, authorizer) From 12968a3f883c52cffc8da3c33b721efd186c06b9 Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Thu, 28 Aug 2025 08:46:10 -0400 Subject: [PATCH 02/10] Added unit test cases for platform token. --- conftest.py | 24 ++++++++++++++++ delinea/secrets/server.py | 11 +++---- tests/test_server.py | 60 +++++++++++++++++++++++++++++++++++++++ tox.ini | 5 +++- 4 files changed, 94 insertions(+), 6 deletions(-) diff --git a/conftest.py b/conftest.py index 90fc0b7..bb1047c 100644 --- a/conftest.py +++ b/conftest.py @@ -18,6 +18,17 @@ def env_vars(): "folder_path": os.getenv("TSS_FOLDER_PATH"), } +@pytest.fixture +def platform_env_vars(): + return { + "username": os.getenv("PLATFORM_USERNAME"), + "password": os.getenv("PLATFORM_PASSWORD"), + "base_url": os.getenv("PLATFORM_BASEURL"), + "secret_id": os.getenv("TSS_SECRET_ID"), + "secret_path": os.getenv("TSS_SECRET_PATH"), + "folder_id": os.getenv("TSS_FOLDER_ID"), + "folder_path": os.getenv("TSS_FOLDER_PATH"), + } @pytest.fixture def authorizer(env_vars): @@ -27,7 +38,20 @@ def authorizer(env_vars): env_vars["password"], ) +@pytest.fixture +def platform_authorizer(platform_env_vars): + from delinea.secrets.server import PasswordGrantAuthorizer + return PasswordGrantAuthorizer( + platform_env_vars["base_url"], + platform_env_vars["username"], + platform_env_vars["password"], + ) @pytest.fixture def secret_server(env_vars, authorizer): return SecretServerCloud(env_vars["tenant"], authorizer) + +@pytest.fixture +def platform_server(platform_env_vars, platform_authorizer): + from delinea.secrets.server import SecretServer + return SecretServer(platform_env_vars["base_url"], platform_authorizer) diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index c28869d..53df298 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -198,8 +198,9 @@ class AccessTokenAuthorizer(Authorizer): def get_access_token(self): return self.access_token - def __init__(self, access_token): + def __init__(self, access_token, server_type='secret_server'): self.access_token = access_token + self._server_type = server_type.lower() class PasswordGrantAuthorizer(Authorizer): @@ -283,7 +284,7 @@ def _refresh(self, seconds_of_drift=300): else: raise SecretServerError("Unknown server type for token request.") if self._server_type == "secret_server": - token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + self.token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") grant_request = { "username": self.username, "password": self.password, @@ -291,17 +292,17 @@ def _refresh(self, seconds_of_drift=300): } if hasattr(self, "domain") and self.domain: grant_request["domain"] = self.domain - self.access_grant = self.get_access_grant(token_url, grant_request) + self.access_grant = self.get_access_grant(self.token_url, grant_request) self.access_grant_refreshed = datetime.now() elif self._server_type == "platform": - token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + self.token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") grant_request = { "client_id": self.username, "client_secret": self.password, "grant_type": "client_credentials", "scope": "xpmheadless", } - self.access_grant = self.get_access_grant(token_url, grant_request) + self.access_grant = self.get_access_grant(self.token_url, grant_request) self.access_grant_refreshed = datetime.now() else: raise SecretServerError("Unknown server type for token request.") diff --git a/tests/test_server.py b/tests/test_server.py index 5675f51..4bf8dd0 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -20,6 +20,7 @@ def test_bad_url(env_vars, authorizer): def test_token_url(env_vars, authorizer): + authorizer.get_access_token() assert ( authorizer.token_url == f"https://{env_vars['tenant']}.secretservercloud.com/oauth2/token" @@ -77,3 +78,62 @@ def test_server_child_folder_ids_by_folderid(env_vars, secret_server): type(secret_server.get_child_folder_ids_by_folderid(env_vars["folder_id"])) is list ) + +def test_platform_bad_url(platform_env_vars, platform_authorizer): + bad_server = SecretServer( + f"{platform_env_vars['base_url']}/nonexistent", + platform_authorizer, + ) + with pytest.raises(SecretServerError): + bad_server.get_secret(platform_env_vars["secret_id"]) + +def test_platform_token_url(platform_env_vars, platform_authorizer): + platform_authorizer.get_access_token() + assert ( + platform_authorizer.token_url + == f"{platform_env_vars['base_url']}/identity/api/oauth2/token/xpmplatform" + ) + +def test_platform_api_url(platform_server, platform_env_vars): + assert ( + platform_server.api_url + == f"{platform_env_vars['base_url']}/api/v1" + ) + +def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer): + assert SecretServer( + platform_env_vars["base_url"], + AccessTokenAuthorizer(platform_authorizer.get_access_token(), 'platform'), + ).get_secret(platform_env_vars["secret_id"])["id"] == int(platform_env_vars["secret_id"]) + +def test_platform_server_secret(platform_env_vars, platform_server): + assert ServerSecret(**platform_server.get_secret(platform_env_vars["secret_id"])).id == int( + platform_env_vars["secret_id"] + ) + +def test_platform_server_secret_by_path(platform_env_vars, platform_server): + assert ServerSecret( + **platform_server.get_secret_by_path(platform_env_vars["secret_path"]) + ).id == int(platform_env_vars["secret_id"]) + +def test_platform_server_folder_by_path(platform_env_vars, platform_server): + assert ServerFolder( + **platform_server.get_folder_by_path(platform_env_vars["folder_path"]) + ).id == int(platform_env_vars["folder_id"]) + +def test_platform_nonexistent_secret(platform_server): + with pytest.raises(SecretServerClientError): + platform_server.get_secret(1000) + +def test_platform_nonexistent_folder(platform_server): + with pytest.raises(SecretServerClientError): + platform_server.get_folder(1000) + +def test_platform_server_secret_ids_by_folderid(platform_env_vars, platform_server): + assert type(platform_server.get_secret_ids_by_folderid(platform_env_vars["folder_id"])) is list + +def test_platform_server_child_folder_ids_by_folderid(platform_env_vars, platform_server): + assert ( + type(platform_server.get_child_folder_ids_by_folderid(platform_env_vars["folder_id"])) + is list + ) diff --git a/tox.ini b/tox.ini index 1489270..3f7d2a2 100644 --- a/tox.ini +++ b/tox.ini @@ -6,7 +6,7 @@ # Docs for tox config -> https://tox.readthedocs.io/en/latest/config.html [tox] -envlist = 3.8, 3.9, 3.10, 3.11 +envlist = 3.8, 3.9, 3.10, 3.11, 3.12 isolated_build = True skipsdist = True @@ -23,5 +23,8 @@ passenv = TSS_SECRET_PATH TSS_FOLDER_ID TSS_FOLDER_PATH + PLATFORM_USERNAME + PLATFORM_PASSWORD + PLATFORM_BASEURL commands = pytest From 2fdbdedc3ddce69c0f08e3470b9ee1fe6e0335d7 Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Mon, 1 Sep 2025 02:24:57 -0400 Subject: [PATCH 03/10] Updated README.md file --- README.md | 50 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 2f34aee..2fc090f 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,12 @@ ![PyPI Version](https://img.shields.io/pypi/v/python-tss-sdk) ![License](https://img.shields.io/github/license/DelineaXPM/python-tss-sdk) ![Python Versions](https://img.shields.io/pypi/pyversions/python-tss-sdk) -The [Delinea](https://delinea.com/) [Secret Server](https://delinea.com/products/secret-server/) Python SDK contains classes that interact with Secret Server via the REST API. + +The [Delinea](https://delinea.com/) [Secret Server](https://delinea.com/products/secret-server/) Python SDK contains classes that interact with Secret Server via their REST APIs. + +## Authentication Support + +This SDK supports both Secret Server and Platform authentication. You can use the same authorizer classes for both systems and instantiate either a Secret Server or Platform client as needed. For Secret Server, you need to create an application user with the required permissions for authentication. For Platform, you need to create a service user with the appropriate permissions for authentication. ## Install @@ -26,17 +31,26 @@ There are three ways in which you can authorize the `SecretServer` and `SecretSe #### Password Authorization -If using traditional `username` and `password` authentication to log in to your Secret Server, you can pass the `PasswordGrantAuthorizer` into the `SecretServer` class at instantiation. The `PasswordGrantAuthorizer` requires a `base_url`, `username`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`. +If using traditional `username` and `password` authentication to log in to your Secret Server either directly or through Platform, you can pass the `PasswordGrantAuthorizer` into the `SecretServer` class at instantiation. The `PasswordGrantAuthorizer` requires a `base_url`, `username`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token` or `/identity/api/oauth2/token/xpmplatform`, depending on whether a secret server or platform is used for authentication. +##### With Secret Server ```python from delinea.secrets.server import PasswordGrantAuthorizer authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password")") ``` +##### With Platform + +```python +from delinea.secrets.server import PasswordGrantAuthorizer + +authorizer = PasswordGrantAuthorizer("https://platform.delinea.app", os.getenv("myusername"), os.getenv("password")) +``` + #### Domain Authorization -To use a domain credential, use the `DomainPasswordGrantAuthorizer`. It requires a `base_url`, `username`, `domain`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`. +To use a domain credential, use the `DomainPasswordGrantAuthorizer`. It requires a `base_url`, `username`, `domain`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`. It is applicable only when authentication is done using a secret server. ```python from delinea.secrets.server import DomainPasswordGrantAuthorizer @@ -46,7 +60,7 @@ authorizer = DomainPasswordGrantAuthorizer("https://hostname/SecretServer", os.g #### Access Token Authorization -If you already have an `access_token`, you can pass directly via the `AccessTokenAuthorizer`. +If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. ```python from delinea.secrets.server import AccessTokenAuthorizer @@ -56,14 +70,15 @@ authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...") ## Secret Server Cloud -The SDK API requires an `Authorizer` and a `tenant`. +The SDK API requires an `Authorizer` and either a `tenant` or a `base_url`. In the case of plaform authentication, only a `base_url` is supported. `tenant` simplifies the configuration when using Secret Server Cloud by assuming the default folder structure and creating the _base URL_ from a template that takes the `tenant` and an optional top-level domain (TLD) that defaults to `com`, as parameters. ### Useage -Instantiate the `SecretServerCloud` class with `tenant` and an `Authorizer` (optionally include a `tld`). To retrieve a secret, pass an integer `id` to `get_secret()` which will return the secret as a JSON encoded string. +Instantiate the `SecretServerCloud` class with `tenant` or `base_url`, along with an `Authorizer` (when providing `tenant`, yoou may optionally include a `tld`). To retrieve a secret, pass an integer `id` to `get_secret()` which will return the secret as a JSON encoded string. +##### With Secret Server ```python from delinea.secrets.server import SecretServerCloud @@ -76,6 +91,20 @@ serverSecret = ServerSecret(**secret) print(f"username: {serverSecret.fields['username'].value}\npassword: {serverSecret.fields['password'].value}") ``` +##### With Platform + +```python +from delinea.secrets.server import SecretServerCloud + +secret_server = SecretServerCloud(authorizer=authorizer, base_url="https://platform.delinea.app") + +secret = secret_server.get_secret(os.getenv("TSS_SECRET_ID")) + +serverSecret = ServerSecret(**secret) + +print(f"username: {serverSecret.fields['username'].value}\npassword: {serverSecret.fields['password'].value}") +``` + The SDK API also contains a `Secret` `@dataclass` containing a subset of the Secret's attributes and a dictionary of all the fields keyed by the Secret's `slug`. ## Initializing SecretServer @@ -86,12 +115,21 @@ The SDK API also contains a `Secret` `@dataclass` containing a subset of the Sec To instantiate the `SecretServer` class, it requires a `base_url`, an `Authorizer` object (see above), and an optional `api_path_uri` (defaults to `"/api/v1"`) +##### With Secret Server ```python from delinea.secrets.server import SecretServer secret_server = SecretServer("https://hostname/SecretServer", authorizer=authorizer) ``` +##### With Platform + +```python +from delinea.secrets.server import SecretServer + +secret_server = SecretServer(base_url="https://platform.delinea.app", authorizer=authorizer) +``` + Secrets can be fetched using the `get_secret` method, which takes an integer `id` of the secret and, returns a `json` object: ```python From 56cd31fa1c86f20a3105c1a8e6ad563ed59c764e Mon Sep 17 00:00:00 2001 From: Lint Action Date: Mon, 1 Sep 2025 07:27:09 +0000 Subject: [PATCH 04/10] Fix code style issues with Black --- conftest.py | 7 ++++ delinea/secrets/server.py | 68 +++++++++++++++++++++++---------------- tests/test_server.py | 43 ++++++++++++++++++------- 3 files changed, 78 insertions(+), 40 deletions(-) diff --git a/conftest.py b/conftest.py index bb1047c..3045b3f 100644 --- a/conftest.py +++ b/conftest.py @@ -18,6 +18,7 @@ def env_vars(): "folder_path": os.getenv("TSS_FOLDER_PATH"), } + @pytest.fixture def platform_env_vars(): return { @@ -30,6 +31,7 @@ def platform_env_vars(): "folder_path": os.getenv("TSS_FOLDER_PATH"), } + @pytest.fixture def authorizer(env_vars): return PasswordGrantAuthorizer( @@ -38,20 +40,25 @@ def authorizer(env_vars): env_vars["password"], ) + @pytest.fixture def platform_authorizer(platform_env_vars): from delinea.secrets.server import PasswordGrantAuthorizer + return PasswordGrantAuthorizer( platform_env_vars["base_url"], platform_env_vars["username"], platform_env_vars["password"], ) + @pytest.fixture def secret_server(env_vars, authorizer): return SecretServerCloud(env_vars["tenant"], authorizer) + @pytest.fixture def platform_server(platform_env_vars, platform_authorizer): from delinea.secrets.server import SecretServer + return SecretServer(platform_env_vars["base_url"], platform_authorizer) diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 53df298..9695c9c 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -198,7 +198,7 @@ class AccessTokenAuthorizer(Authorizer): def get_access_token(self): return self.access_token - def __init__(self, access_token, server_type='secret_server'): + def __init__(self, access_token, server_type="secret_server"): self.access_token = access_token self._server_type = server_type.lower() @@ -221,7 +221,9 @@ def _detect_server_type(self): if self._check_json_response(platform_health_url): self._server_type = "platform" return - raise SecretServerError("Unable to detect server type via health check endpoints.") + raise SecretServerError( + "Unable to detect server type via health check endpoints." + ) def _check_json_response(self, url): """Python equivalent of Go checkJSONResponse for health check detection.""" @@ -284,7 +286,9 @@ def _refresh(self, seconds_of_drift=300): else: raise SecretServerError("Unknown server type for token request.") if self._server_type == "secret_server": - self.token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + self.token_url = ( + self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + ) grant_request = { "username": self.username, "password": self.password, @@ -295,7 +299,9 @@ def _refresh(self, seconds_of_drift=300): self.access_grant = self.get_access_grant(self.token_url, grant_request) self.access_grant_refreshed = datetime.now() elif self._server_type == "platform": - self.token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + self.token_url = ( + self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + ) grant_request = { "client_id": self.username, "client_secret": self.password, @@ -332,7 +338,9 @@ def __init__( password, token_path_uri=None, ): - super().__init__(base_url, username, password, token_path_uri=token_path_uri, domain=domain) + super().__init__( + base_url, username, password, token_path_uri=token_path_uri, domain=domain + ) class SecretServer: @@ -378,11 +386,11 @@ def headers(self): return self.authorizer.headers() def __init__( - self, - base_url, - authorizer: Authorizer, - api_path_uri=API_PATH_URI, - ): + self, + base_url, + authorizer: Authorizer, + api_path_uri=API_PATH_URI, + ): """ :param base_url: The base URL e.g. ``http://localhost/SecretServer`` :type base_url: str @@ -399,18 +407,23 @@ def __init__( @property def api_url(self): return f"{self.base_url}/{self._api_path_uri.strip('/')}" - + def ensure_vault_url(self): """For platform, fetch and set the vault URL before making API calls.""" # Only needed for platform scenario - if hasattr(self.authorizer, "_server_type") and self.authorizer._server_type == "platform": + if ( + hasattr(self.authorizer, "_server_type") + and self.authorizer._server_type == "platform" + ): if not hasattr(self, "_vault_url_fetched") or not self._vault_url_fetched: access_token = self.authorizer.get_access_token() vaults_endpoint = self.platform_url + "/vaultbroker/api/vaults" headers = {"Authorization": f"Bearer {access_token}"} resp = requests.get(vaults_endpoint, headers=headers, timeout=60) if resp.status_code != 200: - raise SecretServerError(f"Failed to fetch vault details: HTTP {resp.status_code} - {resp.text}") + raise SecretServerError( + f"Failed to fetch vault details: HTTP {resp.status_code} - {resp.text}" + ) try: data = resp.json() except Exception as ex: @@ -423,8 +436,9 @@ def ensure_vault_url(self): self.base_url = url.rstrip("/") self._vault_url_fetched = True return - raise SecretServerError("No configured default and active vault found in vault details.") - + raise SecretServerError( + "No configured default and active vault found in vault details." + ) def get_secret_json(self, id, query_params=None): """Gets a Secret from Secret Server @@ -440,7 +454,7 @@ def get_secret_json(self, id, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - headers=self.headers() + headers = self.headers() self.ensure_vault_url() endpoint_url = f"{self.api_url}/secrets/{id}" @@ -472,7 +486,7 @@ def get_folder_json(self, id, query_params=None, get_all_children=True): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - headers=self.headers() + headers = self.headers() self.ensure_vault_url() endpoint_url = f"{self.api_url}/folders/{id}" @@ -615,7 +629,7 @@ def search_secrets(self, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - headers=self.headers() + headers = self.headers() self.ensure_vault_url() endpoint_url = f"{self.api_url}/secrets" @@ -645,7 +659,7 @@ def lookup_folders(self, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - headers=self.headers() + headers = self.headers() self.ensure_vault_url() endpoint_url = f"{self.api_url}/folders/lookup" @@ -672,14 +686,12 @@ def get_secret_ids_by_folderid(self, folder_id): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - headers=self.headers() + headers = self.headers() self.ensure_vault_url() params = {"filter.folderId": folder_id} endpoint_url = f"{self.api_url}/secrets/search-total" params["take"] = self.process( - requests.get( - endpoint_url, params=params, headers=headers, timeout=60 - ) + requests.get(endpoint_url, params=params, headers=headers, timeout=60) ).text response = self.search_secrets(query_params=params) @@ -705,7 +717,7 @@ def get_child_folder_ids_by_folderid(self, folder_id): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - headers=self.headers() + headers = self.headers() self.ensure_vault_url() params = { "filter.parentFolderId": folder_id, @@ -756,9 +768,7 @@ def __init__( ): super().__init__( base_url, - PasswordGrantAuthorizer( - f"{base_url}", username, password, token_path_uri - ), + PasswordGrantAuthorizer(f"{base_url}", username, password, token_path_uri), api_path_uri, ) @@ -779,7 +789,9 @@ class SecretServerCloud(SecretServer): def __init__(self, tenant=None, authorizer=None, tld=DEFAULT_TLD, base_url=None): if authorizer is None or not isinstance(authorizer, Authorizer): - raise ValueError("authorizer must be provided and must be of type Authorizer") + raise ValueError( + "authorizer must be provided and must be of type Authorizer" + ) if tenant: url = self.URL_TEMPLATE.format(tenant, tld) elif base_url: diff --git a/tests/test_server.py b/tests/test_server.py index 4bf8dd0..84f2171 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -79,6 +79,7 @@ def test_server_child_folder_ids_by_folderid(env_vars, secret_server): is list ) + def test_platform_bad_url(platform_env_vars, platform_authorizer): bad_server = SecretServer( f"{platform_env_vars['base_url']}/nonexistent", @@ -87,6 +88,7 @@ def test_platform_bad_url(platform_env_vars, platform_authorizer): with pytest.raises(SecretServerError): bad_server.get_secret(platform_env_vars["secret_id"]) + def test_platform_token_url(platform_env_vars, platform_authorizer): platform_authorizer.get_access_token() assert ( @@ -94,46 +96,63 @@ def test_platform_token_url(platform_env_vars, platform_authorizer): == f"{platform_env_vars['base_url']}/identity/api/oauth2/token/xpmplatform" ) + def test_platform_api_url(platform_server, platform_env_vars): - assert ( - platform_server.api_url - == f"{platform_env_vars['base_url']}/api/v1" - ) + assert platform_server.api_url == f"{platform_env_vars['base_url']}/api/v1" + def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer): assert SecretServer( platform_env_vars["base_url"], - AccessTokenAuthorizer(platform_authorizer.get_access_token(), 'platform'), - ).get_secret(platform_env_vars["secret_id"])["id"] == int(platform_env_vars["secret_id"]) - -def test_platform_server_secret(platform_env_vars, platform_server): - assert ServerSecret(**platform_server.get_secret(platform_env_vars["secret_id"])).id == int( + AccessTokenAuthorizer(platform_authorizer.get_access_token(), "platform"), + ).get_secret(platform_env_vars["secret_id"])["id"] == int( platform_env_vars["secret_id"] ) + +def test_platform_server_secret(platform_env_vars, platform_server): + assert ServerSecret( + **platform_server.get_secret(platform_env_vars["secret_id"]) + ).id == int(platform_env_vars["secret_id"]) + + def test_platform_server_secret_by_path(platform_env_vars, platform_server): assert ServerSecret( **platform_server.get_secret_by_path(platform_env_vars["secret_path"]) ).id == int(platform_env_vars["secret_id"]) + def test_platform_server_folder_by_path(platform_env_vars, platform_server): assert ServerFolder( **platform_server.get_folder_by_path(platform_env_vars["folder_path"]) ).id == int(platform_env_vars["folder_id"]) + def test_platform_nonexistent_secret(platform_server): with pytest.raises(SecretServerClientError): platform_server.get_secret(1000) + def test_platform_nonexistent_folder(platform_server): with pytest.raises(SecretServerClientError): platform_server.get_folder(1000) + def test_platform_server_secret_ids_by_folderid(platform_env_vars, platform_server): - assert type(platform_server.get_secret_ids_by_folderid(platform_env_vars["folder_id"])) is list + assert ( + type(platform_server.get_secret_ids_by_folderid(platform_env_vars["folder_id"])) + is list + ) + -def test_platform_server_child_folder_ids_by_folderid(platform_env_vars, platform_server): +def test_platform_server_child_folder_ids_by_folderid( + platform_env_vars, platform_server +): assert ( - type(platform_server.get_child_folder_ids_by_folderid(platform_env_vars["folder_id"])) + type( + platform_server.get_child_folder_ids_by_folderid( + platform_env_vars["folder_id"] + ) + ) is list ) From 2a8b91371f74f0740f51bcdf99fb8606af78cf0b Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Tue, 2 Sep 2025 00:14:06 -0400 Subject: [PATCH 05/10] Updated environment variables --- conftest.py | 6 +++--- tox.ini | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/conftest.py b/conftest.py index bb1047c..8008495 100644 --- a/conftest.py +++ b/conftest.py @@ -21,9 +21,9 @@ def env_vars(): @pytest.fixture def platform_env_vars(): return { - "username": os.getenv("PLATFORM_USERNAME"), - "password": os.getenv("PLATFORM_PASSWORD"), - "base_url": os.getenv("PLATFORM_BASEURL"), + "username": os.getenv("TSS_PLATFORM_USERNAME"), + "password": os.getenv("TSS_PLATFORM_PASSWORD"), + "base_url": os.getenv("TSS_PLATFORM_BASE_URL"), "secret_id": os.getenv("TSS_SECRET_ID"), "secret_path": os.getenv("TSS_SECRET_PATH"), "folder_id": os.getenv("TSS_FOLDER_ID"), diff --git a/tox.ini b/tox.ini index 3f7d2a2..2420de2 100644 --- a/tox.ini +++ b/tox.ini @@ -23,8 +23,8 @@ passenv = TSS_SECRET_PATH TSS_FOLDER_ID TSS_FOLDER_PATH - PLATFORM_USERNAME - PLATFORM_PASSWORD - PLATFORM_BASEURL + TSS_PLATFORM_USERNAME + TSS_PLATFORM_PASSWORD + TSS_PLATFORM_BASE_URL commands = pytest From 1e932e2b70c95dd2c25cc81e97840b3d9b9a47c3 Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Tue, 2 Sep 2025 00:23:54 -0400 Subject: [PATCH 06/10] updated environment variables --- conftest.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/conftest.py b/conftest.py index 30343f5..e71463e 100644 --- a/conftest.py +++ b/conftest.py @@ -22,9 +22,9 @@ def env_vars(): @pytest.fixture def platform_env_vars(): return { - "username": os.getenv("TSS_PLATFORM_USERNAME"), - "password": os.getenv("TSS_PLATFORM_PASSWORD"), - "base_url": os.getenv("TSS_PLATFORM_BASE_URL"), + "platform_username": os.getenv("TSS_PLATFORM_USERNAME"), + "platform_password": os.getenv("TSS_PLATFORM_PASSWORD"), + "platform_base_url": os.getenv("TSS_PLATFORM_BASE_URL"), "secret_id": os.getenv("TSS_SECRET_ID"), "secret_path": os.getenv("TSS_SECRET_PATH"), "folder_id": os.getenv("TSS_FOLDER_ID"), @@ -46,9 +46,9 @@ def platform_authorizer(platform_env_vars): from delinea.secrets.server import PasswordGrantAuthorizer return PasswordGrantAuthorizer( - platform_env_vars["base_url"], - platform_env_vars["username"], - platform_env_vars["password"], + platform_env_vars["platform_base_url"], + platform_env_vars["platform_username"], + platform_env_vars["platform_password"], ) From b0fea0b34a0bd20004c5629cd689409e81d0ba1c Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Tue, 2 Sep 2025 00:58:34 -0400 Subject: [PATCH 07/10] Modifued test configuration setting --- conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conftest.py b/conftest.py index e71463e..910fad3 100644 --- a/conftest.py +++ b/conftest.py @@ -61,4 +61,4 @@ def secret_server(env_vars, authorizer): def platform_server(platform_env_vars, platform_authorizer): from delinea.secrets.server import SecretServer - return SecretServer(platform_env_vars["base_url"], platform_authorizer) + return SecretServer(platform_env_vars["platform_base_url"], platform_authorizer) From 206b04248c2698a1a419d94ac5562c2940bb7cbf Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Tue, 2 Sep 2025 01:05:12 -0400 Subject: [PATCH 08/10] Changed environment variable name for base url --- tests/test_server.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_server.py b/tests/test_server.py index 84f2171..9b76a35 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -82,7 +82,7 @@ def test_server_child_folder_ids_by_folderid(env_vars, secret_server): def test_platform_bad_url(platform_env_vars, platform_authorizer): bad_server = SecretServer( - f"{platform_env_vars['base_url']}/nonexistent", + f"{platform_env_vars['platform_base_url']}/nonexistent", platform_authorizer, ) with pytest.raises(SecretServerError): @@ -93,17 +93,17 @@ def test_platform_token_url(platform_env_vars, platform_authorizer): platform_authorizer.get_access_token() assert ( platform_authorizer.token_url - == f"{platform_env_vars['base_url']}/identity/api/oauth2/token/xpmplatform" + == f"{platform_env_vars['platform_base_url']}/identity/api/oauth2/token/xpmplatform" ) def test_platform_api_url(platform_server, platform_env_vars): - assert platform_server.api_url == f"{platform_env_vars['base_url']}/api/v1" + assert platform_server.api_url == f"{platform_env_vars['platform_base_url']}/api/v1" def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer): assert SecretServer( - platform_env_vars["base_url"], + platform_env_vars["platform_base_url"], AccessTokenAuthorizer(platform_authorizer.get_access_token(), "platform"), ).get_secret(platform_env_vars["secret_id"])["id"] == int( platform_env_vars["secret_id"] From a40591774adbb1c8a6a5e8d45667b5d481df56bd Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Tue, 2 Sep 2025 01:56:26 -0400 Subject: [PATCH 09/10] Updated run_tests workflow file --- .github/workflows/run_tests.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/run_tests.yml b/.github/workflows/run_tests.yml index cc5e466..5a765d6 100644 --- a/.github/workflows/run_tests.yml +++ b/.github/workflows/run_tests.yml @@ -34,3 +34,6 @@ jobs: TSS_SECRET_PATH: ${{ secrets.TSS_SECRET_PATH }} TSS_FOLDER_ID: ${{ secrets.TSS_FOLDER_ID }} TSS_FOLDER_PATH: ${{ secrets.TSS_FOLDER_PATH }} + TSS_PLATFORM_USERNAME: ${{ secrets.TSS_PLATFORM_USERNAME }} + TSS_PLATFORM_PASSWORD: ${{ secrets.TSS_PLATFORM_PASSWORD }} + TSS_PLATFORM_BASE_URL: ${{ secrets.TSS_PLATFORM_BASE_URL }} From 2c68a718d8000cf9fc59e650776d0abf106d834b Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Tue, 2 Sep 2025 03:21:10 -0400 Subject: [PATCH 10/10] Updated veriion number --- delinea/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delinea/__init__.py b/delinea/__init__.py index 284d0d8..34d6518 100644 --- a/delinea/__init__.py +++ b/delinea/__init__.py @@ -1,3 +1,3 @@ """The Delinea Secret Server Python SDK""" -__version__ = "1.2.3" +__version__ = "2.0.0"