diff --git a/.github/workflows/run_tests.yml b/.github/workflows/run_tests.yml index cc5e466..5a765d6 100644 --- a/.github/workflows/run_tests.yml +++ b/.github/workflows/run_tests.yml @@ -34,3 +34,6 @@ jobs: TSS_SECRET_PATH: ${{ secrets.TSS_SECRET_PATH }} TSS_FOLDER_ID: ${{ secrets.TSS_FOLDER_ID }} TSS_FOLDER_PATH: ${{ secrets.TSS_FOLDER_PATH }} + TSS_PLATFORM_USERNAME: ${{ secrets.TSS_PLATFORM_USERNAME }} + TSS_PLATFORM_PASSWORD: ${{ secrets.TSS_PLATFORM_PASSWORD }} + TSS_PLATFORM_BASE_URL: ${{ secrets.TSS_PLATFORM_BASE_URL }} diff --git a/README.md b/README.md index 2f34aee..2fc090f 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,12 @@ ![PyPI Version](https://img.shields.io/pypi/v/python-tss-sdk) ![License](https://img.shields.io/github/license/DelineaXPM/python-tss-sdk) ![Python Versions](https://img.shields.io/pypi/pyversions/python-tss-sdk) -The [Delinea](https://delinea.com/) [Secret Server](https://delinea.com/products/secret-server/) Python SDK contains classes that interact with Secret Server via the REST API. + +The [Delinea](https://delinea.com/) [Secret Server](https://delinea.com/products/secret-server/) Python SDK contains classes that interact with Secret Server via their REST APIs. + +## Authentication Support + +This SDK supports both Secret Server and Platform authentication. You can use the same authorizer classes for both systems and instantiate either a Secret Server or Platform client as needed. For Secret Server, you need to create an application user with the required permissions for authentication. For Platform, you need to create a service user with the appropriate permissions for authentication. ## Install @@ -26,17 +31,26 @@ There are three ways in which you can authorize the `SecretServer` and `SecretSe #### Password Authorization -If using traditional `username` and `password` authentication to log in to your Secret Server, you can pass the `PasswordGrantAuthorizer` into the `SecretServer` class at instantiation. The `PasswordGrantAuthorizer` requires a `base_url`, `username`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`. +If using traditional `username` and `password` authentication to log in to your Secret Server either directly or through Platform, you can pass the `PasswordGrantAuthorizer` into the `SecretServer` class at instantiation. The `PasswordGrantAuthorizer` requires a `base_url`, `username`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token` or `/identity/api/oauth2/token/xpmplatform`, depending on whether a secret server or platform is used for authentication. +##### With Secret Server ```python from delinea.secrets.server import PasswordGrantAuthorizer authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password")") ``` +##### With Platform + +```python +from delinea.secrets.server import PasswordGrantAuthorizer + +authorizer = PasswordGrantAuthorizer("https://platform.delinea.app", os.getenv("myusername"), os.getenv("password")) +``` + #### Domain Authorization -To use a domain credential, use the `DomainPasswordGrantAuthorizer`. It requires a `base_url`, `username`, `domain`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`. +To use a domain credential, use the `DomainPasswordGrantAuthorizer`. It requires a `base_url`, `username`, `domain`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`. It is applicable only when authentication is done using a secret server. ```python from delinea.secrets.server import DomainPasswordGrantAuthorizer @@ -46,7 +60,7 @@ authorizer = DomainPasswordGrantAuthorizer("https://hostname/SecretServer", os.g #### Access Token Authorization -If you already have an `access_token`, you can pass directly via the `AccessTokenAuthorizer`. +If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. ```python from delinea.secrets.server import AccessTokenAuthorizer @@ -56,14 +70,15 @@ authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...") ## Secret Server Cloud -The SDK API requires an `Authorizer` and a `tenant`. +The SDK API requires an `Authorizer` and either a `tenant` or a `base_url`. In the case of plaform authentication, only a `base_url` is supported. `tenant` simplifies the configuration when using Secret Server Cloud by assuming the default folder structure and creating the _base URL_ from a template that takes the `tenant` and an optional top-level domain (TLD) that defaults to `com`, as parameters. ### Useage -Instantiate the `SecretServerCloud` class with `tenant` and an `Authorizer` (optionally include a `tld`). To retrieve a secret, pass an integer `id` to `get_secret()` which will return the secret as a JSON encoded string. +Instantiate the `SecretServerCloud` class with `tenant` or `base_url`, along with an `Authorizer` (when providing `tenant`, yoou may optionally include a `tld`). To retrieve a secret, pass an integer `id` to `get_secret()` which will return the secret as a JSON encoded string. +##### With Secret Server ```python from delinea.secrets.server import SecretServerCloud @@ -76,6 +91,20 @@ serverSecret = ServerSecret(**secret) print(f"username: {serverSecret.fields['username'].value}\npassword: {serverSecret.fields['password'].value}") ``` +##### With Platform + +```python +from delinea.secrets.server import SecretServerCloud + +secret_server = SecretServerCloud(authorizer=authorizer, base_url="https://platform.delinea.app") + +secret = secret_server.get_secret(os.getenv("TSS_SECRET_ID")) + +serverSecret = ServerSecret(**secret) + +print(f"username: {serverSecret.fields['username'].value}\npassword: {serverSecret.fields['password'].value}") +``` + The SDK API also contains a `Secret` `@dataclass` containing a subset of the Secret's attributes and a dictionary of all the fields keyed by the Secret's `slug`. ## Initializing SecretServer @@ -86,12 +115,21 @@ The SDK API also contains a `Secret` `@dataclass` containing a subset of the Sec To instantiate the `SecretServer` class, it requires a `base_url`, an `Authorizer` object (see above), and an optional `api_path_uri` (defaults to `"/api/v1"`) +##### With Secret Server ```python from delinea.secrets.server import SecretServer secret_server = SecretServer("https://hostname/SecretServer", authorizer=authorizer) ``` +##### With Platform + +```python +from delinea.secrets.server import SecretServer + +secret_server = SecretServer(base_url="https://platform.delinea.app", authorizer=authorizer) +``` + Secrets can be fetched using the `get_secret` method, which takes an integer `id` of the secret and, returns a `json` object: ```python diff --git a/conftest.py b/conftest.py index 90fc0b7..910fad3 100644 --- a/conftest.py +++ b/conftest.py @@ -19,6 +19,19 @@ def env_vars(): } +@pytest.fixture +def platform_env_vars(): + return { + "platform_username": os.getenv("TSS_PLATFORM_USERNAME"), + "platform_password": os.getenv("TSS_PLATFORM_PASSWORD"), + "platform_base_url": os.getenv("TSS_PLATFORM_BASE_URL"), + "secret_id": os.getenv("TSS_SECRET_ID"), + "secret_path": os.getenv("TSS_SECRET_PATH"), + "folder_id": os.getenv("TSS_FOLDER_ID"), + "folder_path": os.getenv("TSS_FOLDER_PATH"), + } + + @pytest.fixture def authorizer(env_vars): return PasswordGrantAuthorizer( @@ -28,6 +41,24 @@ def authorizer(env_vars): ) +@pytest.fixture +def platform_authorizer(platform_env_vars): + from delinea.secrets.server import PasswordGrantAuthorizer + + return PasswordGrantAuthorizer( + platform_env_vars["platform_base_url"], + platform_env_vars["platform_username"], + platform_env_vars["platform_password"], + ) + + @pytest.fixture def secret_server(env_vars, authorizer): return SecretServerCloud(env_vars["tenant"], authorizer) + + +@pytest.fixture +def platform_server(platform_env_vars, platform_authorizer): + from delinea.secrets.server import SecretServer + + return SecretServer(platform_env_vars["platform_base_url"], platform_authorizer) diff --git a/delinea/__init__.py b/delinea/__init__.py index 284d0d8..34d6518 100644 --- a/delinea/__init__.py +++ b/delinea/__init__.py @@ -1,3 +1,3 @@ """The Delinea Secret Server Python SDK""" -__version__ = "1.2.3" +__version__ = "2.0.0" diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 29e3a0d..9695c9c 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -198,8 +198,9 @@ class AccessTokenAuthorizer(Authorizer): def get_access_token(self): return self.access_token - def __init__(self, access_token): + def __init__(self, access_token, server_type="secret_server"): self.access_token = access_token + self._server_type = server_type.lower() class PasswordGrantAuthorizer(Authorizer): @@ -208,6 +209,38 @@ class PasswordGrantAuthorizer(Authorizer): """ TOKEN_PATH_URI = "/oauth2/token" + PLATFORM_TOKEN_PATH_URI = "/identity/api/oauth2/token/xpmplatform" + + def _detect_server_type(self): + """Detects if the server is Secret Server or Platform by health check endpoints, using _check_json_response.""" + ss_health_url = self.base_url.rstrip("/") + "/api/v1/healthcheck" + platform_health_url = self.base_url.rstrip("/") + "/health" + if self._check_json_response(ss_health_url): + self._server_type = "secret_server" + return + if self._check_json_response(platform_health_url): + self._server_type = "platform" + return + raise SecretServerError( + "Unable to detect server type via health check endpoints." + ) + + def _check_json_response(self, url): + """Python equivalent of Go checkJSONResponse for health check detection.""" + try: + resp = requests.get(url, timeout=60) + except Exception: + return False + try: + body = resp.content + except Exception: + return False + try: + data = resp.json() + healthy = data.get("Healthy", False) + return healthy + except Exception: + return b"Healthy" in body or b"healthy" in body @staticmethod def get_access_grant(token_url, grant_request): @@ -241,18 +274,53 @@ def _refresh(self, seconds_of_drift=300): ): return else: - self.access_grant = self.get_access_grant( - self.token_url, self.grant_request - ) - self.access_grant_refreshed = datetime.now() - - def __init__(self, base_url, username, password, token_path_uri=TOKEN_PATH_URI): - self.token_url = base_url.rstrip("/") + "/" + token_path_uri.strip("/") - self.grant_request = { - "username": username, - "password": password, - "grant_type": "password", - } + # Detect server type if not already done + if not hasattr(self, "_server_type"): + self._detect_server_type() + # Decide token_path_uri if not provided + if not self.token_path_uri: + if self._server_type == "secret_server": + self.token_path_uri = self.TOKEN_PATH_URI + elif self._server_type == "platform": + self.token_path_uri = self.PLATFORM_TOKEN_PATH_URI + else: + raise SecretServerError("Unknown server type for token request.") + if self._server_type == "secret_server": + self.token_url = ( + self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + ) + grant_request = { + "username": self.username, + "password": self.password, + "grant_type": "password", + } + if hasattr(self, "domain") and self.domain: + grant_request["domain"] = self.domain + self.access_grant = self.get_access_grant(self.token_url, grant_request) + self.access_grant_refreshed = datetime.now() + elif self._server_type == "platform": + self.token_url = ( + self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/") + ) + grant_request = { + "client_id": self.username, + "client_secret": self.password, + "grant_type": "client_credentials", + "scope": "xpmheadless", + } + self.access_grant = self.get_access_grant(self.token_url, grant_request) + self.access_grant_refreshed = datetime.now() + else: + raise SecretServerError("Unknown server type for token request.") + + def __init__(self, base_url, username, password, token_path_uri=None, domain=None): + self.base_url = base_url.rstrip("/") + self.username = username + self.password = password + self.domain = domain + self.token_path_uri = token_path_uri # May be None, will decide in _refresh + self.token_url = None + self.grant_request = None def get_access_token(self): self._refresh() @@ -268,10 +336,11 @@ def __init__( username, domain, password, - token_path_uri=PasswordGrantAuthorizer.TOKEN_PATH_URI, + token_path_uri=None, ): - super().__init__(base_url, username, password, token_path_uri=token_path_uri) - self.grant_request["domain"] = domain + super().__init__( + base_url, username, password, token_path_uri=token_path_uri, domain=domain + ) class SecretServer: @@ -330,10 +399,46 @@ def __init__( :param api_path_uri: Defaults to ``/api/v1`` :type api_path_uri: str """ - self.base_url = base_url.rstrip("/") + self.platform_url = self.base_url self.authorizer = authorizer - self.api_url = f"{self.base_url}/{api_path_uri.strip('/')}" + self._api_path_uri = api_path_uri + + @property + def api_url(self): + return f"{self.base_url}/{self._api_path_uri.strip('/')}" + + def ensure_vault_url(self): + """For platform, fetch and set the vault URL before making API calls.""" + # Only needed for platform scenario + if ( + hasattr(self.authorizer, "_server_type") + and self.authorizer._server_type == "platform" + ): + if not hasattr(self, "_vault_url_fetched") or not self._vault_url_fetched: + access_token = self.authorizer.get_access_token() + vaults_endpoint = self.platform_url + "/vaultbroker/api/vaults" + headers = {"Authorization": f"Bearer {access_token}"} + resp = requests.get(vaults_endpoint, headers=headers, timeout=60) + if resp.status_code != 200: + raise SecretServerError( + f"Failed to fetch vault details: HTTP {resp.status_code} - {resp.text}" + ) + try: + data = resp.json() + except Exception as ex: + raise SecretServerError(f"Failed to parse vault details: {ex}") + for vault in data.get("vaults", []): + if vault.get("isDefault") and vault.get("isActive"): + conn = vault.get("connection", {}) + url = conn.get("url") + if url: + self.base_url = url.rstrip("/") + self._vault_url_fetched = True + return + raise SecretServerError( + "No configured default and active vault found in vault details." + ) def get_secret_json(self, id, query_params=None): """Gets a Secret from Secret Server @@ -349,18 +454,20 @@ def get_secret_json(self, id, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + headers = self.headers() + self.ensure_vault_url() endpoint_url = f"{self.api_url}/secrets/{id}" if query_params is None: return self.process( - requests.get(endpoint_url, headers=self.headers(), timeout=60) + requests.get(endpoint_url, headers=headers, timeout=60) ).text else: return self.process( requests.get( endpoint_url, params=query_params, - headers=self.headers(), + headers=headers, timeout=60, ) ).text @@ -379,19 +486,21 @@ def get_folder_json(self, id, query_params=None, get_all_children=True): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + headers = self.headers() + self.ensure_vault_url() endpoint_url = f"{self.api_url}/folders/{id}" if get_all_children: query_params["getAllChildren"] = "true" if query_params is None: - return self.process(requests.get(endpoint_url, headers=self.headers())).text + return self.process(requests.get(endpoint_url, headers=headers)).text else: return self.process( requests.get( endpoint_url, params=query_params, - headers=self.headers(), + headers=headers, ) ).text @@ -520,18 +629,20 @@ def search_secrets(self, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + headers = self.headers() + self.ensure_vault_url() endpoint_url = f"{self.api_url}/secrets" if query_params is None: return self.process( - requests.get(endpoint_url, headers=self.headers(), timeout=60) + requests.get(endpoint_url, headers=headers, timeout=60) ).text else: return self.process( requests.get( endpoint_url, params=query_params, - headers=self.headers(), + headers=headers, timeout=60, ) ).text @@ -548,16 +659,18 @@ def lookup_folders(self, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + headers = self.headers() + self.ensure_vault_url() endpoint_url = f"{self.api_url}/folders/lookup" if query_params is None: - return self.process(requests.get(endpoint_url, headers=self.headers())).text + return self.process(requests.get(endpoint_url, headers=headers)).text else: return self.process( requests.get( endpoint_url, params=query_params, - headers=self.headers(), + headers=headers, ) ).text @@ -573,13 +686,12 @@ def get_secret_ids_by_folderid(self, folder_id): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - + headers = self.headers() + self.ensure_vault_url() params = {"filter.folderId": folder_id} endpoint_url = f"{self.api_url}/secrets/search-total" params["take"] = self.process( - requests.get( - endpoint_url, params=params, headers=self.headers(), timeout=60 - ) + requests.get(endpoint_url, params=params, headers=headers, timeout=60) ).text response = self.search_secrets(query_params=params) @@ -605,7 +717,8 @@ def get_child_folder_ids_by_folderid(self, folder_id): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - + headers = self.headers() + self.ensure_vault_url() params = { "filter.parentFolderId": folder_id, "filter.limitToDirectDescendents": True, @@ -614,7 +727,7 @@ def get_child_folder_ids_by_folderid(self, folder_id): endpoint_url = f"{self.api_url}/folders/lookup" params["take"] = self.process( - requests.get(endpoint_url, params=params, headers=self.headers()) + requests.get(endpoint_url, params=params, headers=headers) ).json()["total"] # Handle result of zero child folders if params["take"] != 0: @@ -651,13 +764,11 @@ def __init__( username, password, api_path_uri=SecretServer.API_PATH_URI, - token_path_uri=PasswordGrantAuthorizer.TOKEN_PATH_URI, + token_path_uri=None, ): super().__init__( base_url, - PasswordGrantAuthorizer( - f"{base_url}/{token_path_uri.strip('/')}", username, password - ), + PasswordGrantAuthorizer(f"{base_url}", username, password, token_path_uri), api_path_uri, ) @@ -676,5 +787,15 @@ class SecretServerCloud(SecretServer): DEFAULT_TLD = "com" URL_TEMPLATE = "https://{}.secretservercloud.{}" - def __init__(self, tenant, authorizer: Authorizer, tld=DEFAULT_TLD): - super().__init__(self.URL_TEMPLATE.format(tenant, tld), authorizer) + def __init__(self, tenant=None, authorizer=None, tld=DEFAULT_TLD, base_url=None): + if authorizer is None or not isinstance(authorizer, Authorizer): + raise ValueError( + "authorizer must be provided and must be of type Authorizer" + ) + if tenant: + url = self.URL_TEMPLATE.format(tenant, tld) + elif base_url: + url = base_url.rstrip("/") + else: + raise ValueError("Must provide either tenant or base_url") + super().__init__(url, authorizer) diff --git a/tests/test_server.py b/tests/test_server.py index 5675f51..9b76a35 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -20,6 +20,7 @@ def test_bad_url(env_vars, authorizer): def test_token_url(env_vars, authorizer): + authorizer.get_access_token() assert ( authorizer.token_url == f"https://{env_vars['tenant']}.secretservercloud.com/oauth2/token" @@ -77,3 +78,81 @@ def test_server_child_folder_ids_by_folderid(env_vars, secret_server): type(secret_server.get_child_folder_ids_by_folderid(env_vars["folder_id"])) is list ) + + +def test_platform_bad_url(platform_env_vars, platform_authorizer): + bad_server = SecretServer( + f"{platform_env_vars['platform_base_url']}/nonexistent", + platform_authorizer, + ) + with pytest.raises(SecretServerError): + bad_server.get_secret(platform_env_vars["secret_id"]) + + +def test_platform_token_url(platform_env_vars, platform_authorizer): + platform_authorizer.get_access_token() + assert ( + platform_authorizer.token_url + == f"{platform_env_vars['platform_base_url']}/identity/api/oauth2/token/xpmplatform" + ) + + +def test_platform_api_url(platform_server, platform_env_vars): + assert platform_server.api_url == f"{platform_env_vars['platform_base_url']}/api/v1" + + +def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer): + assert SecretServer( + platform_env_vars["platform_base_url"], + AccessTokenAuthorizer(platform_authorizer.get_access_token(), "platform"), + ).get_secret(platform_env_vars["secret_id"])["id"] == int( + platform_env_vars["secret_id"] + ) + + +def test_platform_server_secret(platform_env_vars, platform_server): + assert ServerSecret( + **platform_server.get_secret(platform_env_vars["secret_id"]) + ).id == int(platform_env_vars["secret_id"]) + + +def test_platform_server_secret_by_path(platform_env_vars, platform_server): + assert ServerSecret( + **platform_server.get_secret_by_path(platform_env_vars["secret_path"]) + ).id == int(platform_env_vars["secret_id"]) + + +def test_platform_server_folder_by_path(platform_env_vars, platform_server): + assert ServerFolder( + **platform_server.get_folder_by_path(platform_env_vars["folder_path"]) + ).id == int(platform_env_vars["folder_id"]) + + +def test_platform_nonexistent_secret(platform_server): + with pytest.raises(SecretServerClientError): + platform_server.get_secret(1000) + + +def test_platform_nonexistent_folder(platform_server): + with pytest.raises(SecretServerClientError): + platform_server.get_folder(1000) + + +def test_platform_server_secret_ids_by_folderid(platform_env_vars, platform_server): + assert ( + type(platform_server.get_secret_ids_by_folderid(platform_env_vars["folder_id"])) + is list + ) + + +def test_platform_server_child_folder_ids_by_folderid( + platform_env_vars, platform_server +): + assert ( + type( + platform_server.get_child_folder_ids_by_folderid( + platform_env_vars["folder_id"] + ) + ) + is list + ) diff --git a/tox.ini b/tox.ini index 1489270..2420de2 100644 --- a/tox.ini +++ b/tox.ini @@ -6,7 +6,7 @@ # Docs for tox config -> https://tox.readthedocs.io/en/latest/config.html [tox] -envlist = 3.8, 3.9, 3.10, 3.11 +envlist = 3.8, 3.9, 3.10, 3.11, 3.12 isolated_build = True skipsdist = True @@ -23,5 +23,8 @@ passenv = TSS_SECRET_PATH TSS_FOLDER_ID TSS_FOLDER_PATH + TSS_PLATFORM_USERNAME + TSS_PLATFORM_PASSWORD + TSS_PLATFORM_BASE_URL commands = pytest