From 2f7941a17c8bc2dd401ff08a6d6062653b0dc554 Mon Sep 17 00:00:00 2001 From: sriram veeraghanta Date: Wed, 10 Jun 2026 11:32:13 +0530 Subject: [PATCH 1/2] fix(api): sanitize XLSX export cells to prevent formula injection (#9224) User-controlled values (work item titles, labels, etc.) were written raw into openpyxl worksheet cells, so values beginning with = were stored as live formula cells in exported XLSX files. Apply the same formula-trigger sanitization already used for CSV exports to XLSX cell values and header rows in both export formatters, and sanitize CSV header rows in the porters formatter for parity. --- .../utils/test_csv_export_sanitization.py | 34 ++++++ .../utils/test_xlsx_export_sanitization.py | 115 ++++++++++++++++++ apps/api/plane/utils/csv_utils.py | 6 +- apps/api/plane/utils/exporters/formatters.py | 2 +- apps/api/plane/utils/porters/formatters.py | 8 +- 5 files changed, 157 insertions(+), 8 deletions(-) create mode 100644 apps/api/plane/tests/unit/utils/test_csv_export_sanitization.py create mode 100644 apps/api/plane/tests/unit/utils/test_xlsx_export_sanitization.py diff --git a/apps/api/plane/tests/unit/utils/test_csv_export_sanitization.py b/apps/api/plane/tests/unit/utils/test_csv_export_sanitization.py new file mode 100644 index 00000000000..1290d5e270d --- /dev/null +++ b/apps/api/plane/tests/unit/utils/test_csv_export_sanitization.py @@ -0,0 +1,34 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +import csv +from io import StringIO + +import pytest + +from plane.utils.porters.formatters import CSVFormatter + + +def _read_rows(content): + return list(csv.reader(StringIO(content))) + + +@pytest.mark.unit +class TestPorterCSVFormatterSanitization: + """CSV exports must not emit formula-triggering values, including in header rows.""" + + def test_data_values_are_sanitized(self): + content = CSVFormatter().encode([{"name": "=1+2"}]) + rows = _read_rows(content) + assert rows[1][0] == "'=1+2" + + def test_prettified_headers_are_sanitized(self): + content = CSVFormatter().encode([{"=evil_header": "value"}]) + rows = _read_rows(content) + assert rows[0][0] == "'=Evil Header" + + def test_raw_headers_are_sanitized(self): + content = CSVFormatter(prettify_headers=False).encode([{"=evil": "value"}]) + rows = _read_rows(content) + assert rows[0][0] == "'=evil" diff --git a/apps/api/plane/tests/unit/utils/test_xlsx_export_sanitization.py b/apps/api/plane/tests/unit/utils/test_xlsx_export_sanitization.py new file mode 100644 index 00000000000..e1fd874f112 --- /dev/null +++ b/apps/api/plane/tests/unit/utils/test_xlsx_export_sanitization.py @@ -0,0 +1,115 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +from io import BytesIO + +import pytest +from openpyxl import load_workbook + +from plane.utils.exporters.formatters import XLSXFormatter as SchemaXLSXFormatter +from plane.utils.porters.formatters import XLSXFormatter as PorterXLSXFormatter + +# Characters that trigger formula evaluation in spreadsheet applications. +# See: https://owasp.org/www-community/attacks/CSV_Injection +FORMULA_TRIGGERS = ["=", "+", "-", "@", "\t", "\r", "\n"] + +HYPERLINK_PAYLOAD = '=HYPERLINK("https://example.com/poc","click")' + + +def _load_cells(xlsx_bytes): + """Load XLSX bytes and return the active worksheet's cells (formulas not evaluated).""" + wb = load_workbook(filename=BytesIO(xlsx_bytes)) + return wb.active + + +@pytest.mark.unit +class TestPorterXLSXFormatterSanitization: + """XLSX issue exports must not store user-controlled values as formula cells.""" + + def test_formula_payload_is_stored_as_text(self): + content = PorterXLSXFormatter().encode([{"name": HYPERLINK_PAYLOAD}]) + ws = _load_cells(content) + cell = ws.cell(row=2, column=1) + assert cell.data_type != "f" + assert cell.value == "'" + HYPERLINK_PAYLOAD + + @pytest.mark.parametrize("trigger", FORMULA_TRIGGERS) + def test_all_formula_trigger_characters_are_escaped(self, trigger): + payload = trigger + "1+2" + content = PorterXLSXFormatter().encode([{"name": payload}]) + ws = _load_cells(content) + cell = ws.cell(row=2, column=1) + assert cell.data_type != "f" + assert cell.value == "'" + payload + + def test_safe_string_is_unchanged(self): + content = PorterXLSXFormatter().encode([{"name": "Fix login bug"}]) + ws = _load_cells(content) + assert ws.cell(row=2, column=1).value == "Fix login bug" + + def test_non_string_values_are_preserved(self): + content = PorterXLSXFormatter().encode([{"estimate": 5}]) + ws = _load_cells(content) + cell = ws.cell(row=2, column=1) + assert cell.value == 5 + assert cell.data_type == "n" + + def test_list_value_joining_to_formula_is_escaped(self): + content = PorterXLSXFormatter().encode([{"labels": ["=cmd", "bug"]}]) + ws = _load_cells(content) + cell = ws.cell(row=2, column=1) + assert cell.data_type != "f" + assert cell.value == "'=cmd, bug" + + def test_headers_are_sanitized(self): + content = PorterXLSXFormatter().encode([{"=evil_header": "value"}]) + ws = _load_cells(content) + header = ws.cell(row=1, column=1) + assert header.data_type != "f" + assert header.value == "'=Evil Header" + + +class _FakeField: + def __init__(self, label=None): + self.label = label + + +class _FakeSchema: + _declared_fields = { + "name": _FakeField("Name"), + "estimate": _FakeField("=Estimate"), + } + + +@pytest.mark.unit +class TestSchemaXLSXFormatterSanitization: + """Schema-based XLSX exports must not store user-controlled values as formula cells.""" + + def test_formula_payload_is_stored_as_text(self): + _, content = SchemaXLSXFormatter().format("export", [{"name": HYPERLINK_PAYLOAD, "estimate": 5}], _FakeSchema) + ws = _load_cells(content) + cell = ws.cell(row=2, column=1) + assert cell.data_type != "f" + assert cell.value == "'" + HYPERLINK_PAYLOAD + + @pytest.mark.parametrize("trigger", FORMULA_TRIGGERS) + def test_all_formula_trigger_characters_are_escaped(self, trigger): + payload = trigger + "1+2" + _, content = SchemaXLSXFormatter().format("export", [{"name": payload, "estimate": 5}], _FakeSchema) + ws = _load_cells(content) + cell = ws.cell(row=2, column=1) + assert cell.data_type != "f" + assert cell.value == "'" + payload + + def test_safe_string_is_unchanged(self): + _, content = SchemaXLSXFormatter().format("export", [{"name": "Fix login bug", "estimate": 5}], _FakeSchema) + ws = _load_cells(content) + assert ws.cell(row=2, column=1).value == "Fix login bug" + + def test_headers_are_sanitized(self): + _, content = SchemaXLSXFormatter().format("export", [{"name": "ok", "estimate": 5}], _FakeSchema) + ws = _load_cells(content) + header = ws.cell(row=1, column=2) + assert header.data_type != "f" + assert header.value == "'=Estimate" diff --git a/apps/api/plane/utils/csv_utils.py b/apps/api/plane/utils/csv_utils.py index 26c6e893752..304f70fe598 100644 --- a/apps/api/plane/utils/csv_utils.py +++ b/apps/api/plane/utils/csv_utils.py @@ -2,13 +2,13 @@ # SPDX-License-Identifier: AGPL-3.0-only # See the LICENSE file for details. -# CSV utility functions for safe export +# Utility functions for safe spreadsheet exports (CSV and XLSX) # Characters that trigger formula evaluation in spreadsheet applications _CSV_FORMULA_TRIGGERS = frozenset(("=", "+", "-", "@", "\t", "\r", "\n")) def sanitize_csv_value(value): - """Sanitize a value for CSV export to prevent formula injection. + """Sanitize a value for CSV/XLSX export to prevent formula injection. Prefixes string values starting with formula-triggering characters with a single quote so spreadsheet applications treat them as text @@ -22,5 +22,5 @@ def sanitize_csv_value(value): def sanitize_csv_row(row): - """Sanitize all values in a CSV row.""" + """Sanitize all values in a CSV/XLSX row.""" return [sanitize_csv_value(v) for v in row] diff --git a/apps/api/plane/utils/exporters/formatters.py b/apps/api/plane/utils/exporters/formatters.py index 611a60fca4f..912c2b0a68c 100644 --- a/apps/api/plane/utils/exporters/formatters.py +++ b/apps/api/plane/utils/exporters/formatters.py @@ -175,7 +175,7 @@ def _create_xlsx_file(self, data: List[List[str]]) -> bytes: wb = Workbook() sh = wb.active for row in data: - sh.append(row) + sh.append(sanitize_csv_row(row)) out = io.BytesIO() wb.save(out) out.seek(0) diff --git a/apps/api/plane/utils/porters/formatters.py b/apps/api/plane/utils/porters/formatters.py index 461a6a5e427..c0f1f9b8e96 100644 --- a/apps/api/plane/utils/porters/formatters.py +++ b/apps/api/plane/utils/porters/formatters.py @@ -128,14 +128,14 @@ def encode(self, data: List[Dict]) -> str: # Write pretty headers manually, then write data rows writer = csv.writer(output, delimiter=self.delimiter) - writer.writerow(pretty_headers) + writer.writerow(sanitize_csv_row(pretty_headers)) # Write data rows in the same field order for row in data: writer.writerow(sanitize_csv_row([row.get(key, "") for key in fieldnames])) else: writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=self.delimiter) - writer.writeheader() + writer.writerow({k: sanitize_csv_value(k) for k in fieldnames}) for row in data: writer.writerow({k: sanitize_csv_value(row.get(k, "")) for k in fieldnames}) @@ -219,11 +219,11 @@ def encode(self, data: List[Dict]) -> bytes: headers = [self._prettify_header(key) for key in fieldnames] else: headers = fieldnames - ws.append(headers) + ws.append(sanitize_csv_row(headers)) # Write data rows for row in data: - ws.append([self._format_value(row.get(key, "")) for key in fieldnames]) + ws.append(sanitize_csv_row([self._format_value(row.get(key, "")) for key in fieldnames])) output = BytesIO() wb.save(output) From fd16d033fccb37f18e8794df739c8c24ba9d7939 Mon Sep 17 00:00:00 2001 From: sriram veeraghanta Date: Wed, 10 Jun 2026 11:36:45 +0530 Subject: [PATCH 2/2] fix(api): reject API key auth for deactivated user accounts (#9225) The custom API key authentication only verified that the APIToken row was active and unexpired; it never checked the owning user's is_active flag. DRF's IsAuthenticated only checks user.is_authenticated (always True for a real User), so a user whose account was deactivated could keep using a previously issued API key indefinitely. Add user__is_active=True to the validate_api_token() lookup so a token tied to a disabled account is treated as invalid (a generic AuthenticationFailed, avoiding account-state disclosure). Applied to both the external API middleware (plane/api) and the identical, currently unused copy in plane/app to prevent the gap from being reintroduced. Adds unit coverage on validate_api_token and an end-to-end contract test proving GET /api/v1/users/me/ is denied once the account is deactivated. --- .../api/middleware/api_authentication.py | 1 + .../app/middleware/api_authentication.py | 1 + .../tests/contract/api/test_authentication.py | 44 ++++++++++++++++++ .../middleware/test_api_authentication.py | 46 +++++++++++++++++++ 4 files changed, 92 insertions(+) create mode 100644 apps/api/plane/tests/contract/api/test_authentication.py create mode 100644 apps/api/plane/tests/unit/middleware/test_api_authentication.py diff --git a/apps/api/plane/api/middleware/api_authentication.py b/apps/api/plane/api/middleware/api_authentication.py index abd81398573..ed03bbb8d44 100644 --- a/apps/api/plane/api/middleware/api_authentication.py +++ b/apps/api/plane/api/middleware/api_authentication.py @@ -32,6 +32,7 @@ def validate_api_token(self, token): Q(Q(expired_at__gt=timezone.now()) | Q(expired_at__isnull=True)), token=token, is_active=True, + user__is_active=True, ) except APIToken.DoesNotExist: raise AuthenticationFailed("Given API token is not valid") diff --git a/apps/api/plane/app/middleware/api_authentication.py b/apps/api/plane/app/middleware/api_authentication.py index abd81398573..ed03bbb8d44 100644 --- a/apps/api/plane/app/middleware/api_authentication.py +++ b/apps/api/plane/app/middleware/api_authentication.py @@ -32,6 +32,7 @@ def validate_api_token(self, token): Q(Q(expired_at__gt=timezone.now()) | Q(expired_at__isnull=True)), token=token, is_active=True, + user__is_active=True, ) except APIToken.DoesNotExist: raise AuthenticationFailed("Given API token is not valid") diff --git a/apps/api/plane/tests/contract/api/test_authentication.py b/apps/api/plane/tests/contract/api/test_authentication.py new file mode 100644 index 00000000000..d240567cccf --- /dev/null +++ b/apps/api/plane/tests/contract/api/test_authentication.py @@ -0,0 +1,44 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +""" +Contract tests for external API key authentication. + +End-to-end proof that an API key cannot be used to access the API once the +owning user account has been deactivated. +""" + +import pytest +from rest_framework import status + + +@pytest.mark.contract +class TestAPIKeyAuthenticationContract: + """Test API key authentication behaviour against a live endpoint.""" + + USERS_ME_URL = "/api/v1/users/me/" + + @pytest.mark.django_db + def test_active_user_can_access_with_api_key(self, api_key_client): + response = api_key_client.get(self.USERS_ME_URL) + + assert response.status_code == status.HTTP_200_OK + + @pytest.mark.django_db + def test_deactivated_user_cannot_access_with_api_key( + self, api_key_client, create_user + ): + # The account is disabled after the API key was generated. + create_user.is_active = False + create_user.save() + + response = api_key_client.get(self.USERS_ME_URL) + + # Access is denied once the account is deactivated. APIKeyAuthentication + # does not set a WWW-Authenticate header, so DRF surfaces the + # AuthenticationFailed as 403 Forbidden rather than 401. + assert response.status_code in ( + status.HTTP_401_UNAUTHORIZED, + status.HTTP_403_FORBIDDEN, + ) diff --git a/apps/api/plane/tests/unit/middleware/test_api_authentication.py b/apps/api/plane/tests/unit/middleware/test_api_authentication.py new file mode 100644 index 00000000000..5f67537c228 --- /dev/null +++ b/apps/api/plane/tests/unit/middleware/test_api_authentication.py @@ -0,0 +1,46 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +""" +Unit tests for APIKeyAuthentication. + +Covers the access-control guarantees of the external API key authentication: +- a valid token belonging to an active user authenticates successfully +- a valid token is rejected once the underlying user account is deactivated + (prevents an authentication bypass via a disabled account that still holds + a previously generated API key) +""" + +import pytest +from rest_framework.exceptions import AuthenticationFailed + +from plane.api.middleware.api_authentication import APIKeyAuthentication +from plane.db.models import APIToken + + +@pytest.mark.unit +class TestAPIKeyAuthentication: + @pytest.mark.django_db + def test_validate_api_token_authenticates_active_user(self, create_user): + token = APIToken.objects.create( + user=create_user, label="Active Token", token="active-user-token" + ) + + user, returned_token = APIKeyAuthentication().validate_api_token(token.token) + + assert user == create_user + assert returned_token == token.token + + @pytest.mark.django_db + def test_validate_api_token_rejects_deactivated_user(self, create_user): + token = APIToken.objects.create( + user=create_user, label="Stale Token", token="deactivated-user-token" + ) + + # Account is deactivated by an administrator after the token was issued. + create_user.is_active = False + create_user.save() + + with pytest.raises(AuthenticationFailed): + APIKeyAuthentication().validate_api_token(token.token)