Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/api/plane/api/middleware/api_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def validate_api_token(self, token):
Q(Q(expired_at__gt=timezone.now()) | Q(expired_at__isnull=True)),
token=token,
is_active=True,
user__is_active=True,
)
except APIToken.DoesNotExist:
raise AuthenticationFailed("Given API token is not valid")
Expand Down
1 change: 1 addition & 0 deletions apps/api/plane/app/middleware/api_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def validate_api_token(self, token):
Q(Q(expired_at__gt=timezone.now()) | Q(expired_at__isnull=True)),
token=token,
is_active=True,
user__is_active=True,
)
except APIToken.DoesNotExist:
raise AuthenticationFailed("Given API token is not valid")
Expand Down
44 changes: 44 additions & 0 deletions apps/api/plane/tests/contract/api/test_authentication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

"""
Contract tests for external API key authentication.

End-to-end proof that an API key cannot be used to access the API once the
owning user account has been deactivated.
"""

import pytest
from rest_framework import status


@pytest.mark.contract
class TestAPIKeyAuthenticationContract:
"""Test API key authentication behaviour against a live endpoint."""

USERS_ME_URL = "/api/v1/users/me/"

@pytest.mark.django_db
def test_active_user_can_access_with_api_key(self, api_key_client):
response = api_key_client.get(self.USERS_ME_URL)

assert response.status_code == status.HTTP_200_OK

@pytest.mark.django_db
def test_deactivated_user_cannot_access_with_api_key(
self, api_key_client, create_user
):
# The account is disabled after the API key was generated.
create_user.is_active = False
create_user.save()

response = api_key_client.get(self.USERS_ME_URL)

# Access is denied once the account is deactivated. APIKeyAuthentication
# does not set a WWW-Authenticate header, so DRF surfaces the
# AuthenticationFailed as 403 Forbidden rather than 401.
assert response.status_code in (
status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN,
)
46 changes: 46 additions & 0 deletions apps/api/plane/tests/unit/middleware/test_api_authentication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

"""
Unit tests for APIKeyAuthentication.

Covers the access-control guarantees of the external API key authentication:
- a valid token belonging to an active user authenticates successfully
- a valid token is rejected once the underlying user account is deactivated
(prevents an authentication bypass via a disabled account that still holds
a previously generated API key)
"""

import pytest
from rest_framework.exceptions import AuthenticationFailed

from plane.api.middleware.api_authentication import APIKeyAuthentication
from plane.db.models import APIToken


@pytest.mark.unit
class TestAPIKeyAuthentication:
@pytest.mark.django_db
def test_validate_api_token_authenticates_active_user(self, create_user):
token = APIToken.objects.create(
user=create_user, label="Active Token", token="active-user-token"
)

user, returned_token = APIKeyAuthentication().validate_api_token(token.token)

assert user == create_user
assert returned_token == token.token

@pytest.mark.django_db
def test_validate_api_token_rejects_deactivated_user(self, create_user):
token = APIToken.objects.create(
user=create_user, label="Stale Token", token="deactivated-user-token"
)

# Account is deactivated by an administrator after the token was issued.
create_user.is_active = False
create_user.save()

with pytest.raises(AuthenticationFailed):
APIKeyAuthentication().validate_api_token(token.token)
34 changes: 34 additions & 0 deletions apps/api/plane/tests/unit/utils/test_csv_export_sanitization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

import csv
from io import StringIO

import pytest

from plane.utils.porters.formatters import CSVFormatter


def _read_rows(content):
return list(csv.reader(StringIO(content)))


@pytest.mark.unit
class TestPorterCSVFormatterSanitization:
"""CSV exports must not emit formula-triggering values, including in header rows."""

def test_data_values_are_sanitized(self):
content = CSVFormatter().encode([{"name": "=1+2"}])
rows = _read_rows(content)
assert rows[1][0] == "'=1+2"

def test_prettified_headers_are_sanitized(self):
content = CSVFormatter().encode([{"=evil_header": "value"}])
rows = _read_rows(content)
assert rows[0][0] == "'=Evil Header"

def test_raw_headers_are_sanitized(self):
content = CSVFormatter(prettify_headers=False).encode([{"=evil": "value"}])
rows = _read_rows(content)
assert rows[0][0] == "'=evil"
115 changes: 115 additions & 0 deletions apps/api/plane/tests/unit/utils/test_xlsx_export_sanitization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

from io import BytesIO

import pytest
from openpyxl import load_workbook

from plane.utils.exporters.formatters import XLSXFormatter as SchemaXLSXFormatter
from plane.utils.porters.formatters import XLSXFormatter as PorterXLSXFormatter

# Characters that trigger formula evaluation in spreadsheet applications.
# See: https://owasp.org/www-community/attacks/CSV_Injection
FORMULA_TRIGGERS = ["=", "+", "-", "@", "\t", "\r", "\n"]

HYPERLINK_PAYLOAD = '=HYPERLINK("https://example.com/poc","click")'


def _load_cells(xlsx_bytes):
"""Load XLSX bytes and return the active worksheet's cells (formulas not evaluated)."""
wb = load_workbook(filename=BytesIO(xlsx_bytes))
return wb.active


@pytest.mark.unit
class TestPorterXLSXFormatterSanitization:
"""XLSX issue exports must not store user-controlled values as formula cells."""

def test_formula_payload_is_stored_as_text(self):
content = PorterXLSXFormatter().encode([{"name": HYPERLINK_PAYLOAD}])
ws = _load_cells(content)
cell = ws.cell(row=2, column=1)
assert cell.data_type != "f"
assert cell.value == "'" + HYPERLINK_PAYLOAD

@pytest.mark.parametrize("trigger", FORMULA_TRIGGERS)
def test_all_formula_trigger_characters_are_escaped(self, trigger):
payload = trigger + "1+2"
content = PorterXLSXFormatter().encode([{"name": payload}])
ws = _load_cells(content)
cell = ws.cell(row=2, column=1)
assert cell.data_type != "f"
assert cell.value == "'" + payload

def test_safe_string_is_unchanged(self):
content = PorterXLSXFormatter().encode([{"name": "Fix login bug"}])
ws = _load_cells(content)
assert ws.cell(row=2, column=1).value == "Fix login bug"

def test_non_string_values_are_preserved(self):
content = PorterXLSXFormatter().encode([{"estimate": 5}])
ws = _load_cells(content)
cell = ws.cell(row=2, column=1)
assert cell.value == 5
assert cell.data_type == "n"

def test_list_value_joining_to_formula_is_escaped(self):
content = PorterXLSXFormatter().encode([{"labels": ["=cmd", "bug"]}])
ws = _load_cells(content)
cell = ws.cell(row=2, column=1)
assert cell.data_type != "f"
assert cell.value == "'=cmd, bug"

def test_headers_are_sanitized(self):
content = PorterXLSXFormatter().encode([{"=evil_header": "value"}])
ws = _load_cells(content)
header = ws.cell(row=1, column=1)
assert header.data_type != "f"
assert header.value == "'=Evil Header"


class _FakeField:
def __init__(self, label=None):
self.label = label


class _FakeSchema:
_declared_fields = {
"name": _FakeField("Name"),
"estimate": _FakeField("=Estimate"),
}


@pytest.mark.unit
class TestSchemaXLSXFormatterSanitization:
"""Schema-based XLSX exports must not store user-controlled values as formula cells."""

def test_formula_payload_is_stored_as_text(self):
_, content = SchemaXLSXFormatter().format("export", [{"name": HYPERLINK_PAYLOAD, "estimate": 5}], _FakeSchema)
ws = _load_cells(content)
cell = ws.cell(row=2, column=1)
assert cell.data_type != "f"
assert cell.value == "'" + HYPERLINK_PAYLOAD

@pytest.mark.parametrize("trigger", FORMULA_TRIGGERS)
def test_all_formula_trigger_characters_are_escaped(self, trigger):
payload = trigger + "1+2"
_, content = SchemaXLSXFormatter().format("export", [{"name": payload, "estimate": 5}], _FakeSchema)
ws = _load_cells(content)
cell = ws.cell(row=2, column=1)
assert cell.data_type != "f"
assert cell.value == "'" + payload

def test_safe_string_is_unchanged(self):
_, content = SchemaXLSXFormatter().format("export", [{"name": "Fix login bug", "estimate": 5}], _FakeSchema)
ws = _load_cells(content)
assert ws.cell(row=2, column=1).value == "Fix login bug"

def test_headers_are_sanitized(self):
_, content = SchemaXLSXFormatter().format("export", [{"name": "ok", "estimate": 5}], _FakeSchema)
ws = _load_cells(content)
header = ws.cell(row=1, column=2)
assert header.data_type != "f"
assert header.value == "'=Estimate"
6 changes: 3 additions & 3 deletions apps/api/plane/utils/csv_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

# CSV utility functions for safe export
# Utility functions for safe spreadsheet exports (CSV and XLSX)
# Characters that trigger formula evaluation in spreadsheet applications
_CSV_FORMULA_TRIGGERS = frozenset(("=", "+", "-", "@", "\t", "\r", "\n"))


def sanitize_csv_value(value):
"""Sanitize a value for CSV export to prevent formula injection.
"""Sanitize a value for CSV/XLSX export to prevent formula injection.

Prefixes string values starting with formula-triggering characters
with a single quote so spreadsheet applications treat them as text
Expand All @@ -22,5 +22,5 @@ def sanitize_csv_value(value):


def sanitize_csv_row(row):
"""Sanitize all values in a CSV row."""
"""Sanitize all values in a CSV/XLSX row."""
return [sanitize_csv_value(v) for v in row]
2 changes: 1 addition & 1 deletion apps/api/plane/utils/exporters/formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _create_xlsx_file(self, data: List[List[str]]) -> bytes:
wb = Workbook()
sh = wb.active
for row in data:
sh.append(row)
sh.append(sanitize_csv_row(row))
out = io.BytesIO()
wb.save(out)
out.seek(0)
Expand Down
8 changes: 4 additions & 4 deletions apps/api/plane/utils/porters/formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ def encode(self, data: List[Dict]) -> str:

# Write pretty headers manually, then write data rows
writer = csv.writer(output, delimiter=self.delimiter)
writer.writerow(pretty_headers)
writer.writerow(sanitize_csv_row(pretty_headers))

# Write data rows in the same field order
for row in data:
writer.writerow(sanitize_csv_row([row.get(key, "") for key in fieldnames]))
else:
writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=self.delimiter)
writer.writeheader()
writer.writerow({k: sanitize_csv_value(k) for k in fieldnames})
for row in data:
writer.writerow({k: sanitize_csv_value(row.get(k, "")) for k in fieldnames})

Expand Down Expand Up @@ -219,11 +219,11 @@ def encode(self, data: List[Dict]) -> bytes:
headers = [self._prettify_header(key) for key in fieldnames]
else:
headers = fieldnames
ws.append(headers)
ws.append(sanitize_csv_row(headers))

# Write data rows
for row in data:
ws.append([self._format_value(row.get(key, "")) for key in fieldnames])
ws.append(sanitize_csv_row([self._format_value(row.get(key, "")) for key in fieldnames]))

output = BytesIO()
wb.save(output)
Expand Down
Loading