Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/capacium_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
VerificationRecord,
SearchQuery,
)
from .trust import TrustState, TrustMachine
from .trust import TrustState, TrustMachine, TRUST_BADGES, get_trust_badge
from .search import ExchangeSearch

__all__ = [
Expand All @@ -37,4 +37,6 @@
"VerificationRecord",
"SearchQuery",
"ExchangeSearch",
"TRUST_BADGES",
"get_trust_badge",
]
51 changes: 49 additions & 2 deletions src/capacium_models/trust.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
Legacy states (indexed, claimed, audited) are mapped via normalize_legacy_state().
"""

from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List

Expand Down Expand Up @@ -159,6 +159,53 @@ def transition(

listing.trust_state = to_state.value
listing.trust_history.append(transition.to_dict())
listing.updated_at = datetime.utcnow().isoformat()
listing.updated_at = datetime.now(timezone.utc).isoformat()

return listing


# ── V2 Trust Badge Dictionary (single source of truth for all surfaces) ──

TRUST_BADGES: Dict[str, Dict[str, str]] = {
"discovered": {
"label": "Discovered",
"symbol": "○",
"color": "grey",
"description": "Found by crawler, minimal validation",
"order": 0,
},
"pending_review": {
"label": "Pending Review",
"symbol": "◉",
"color": "yellow",
"description": "Schema validated, composite score computed",
"order": 1,
},
"verified": {
"label": "Verified",
"symbol": "✓",
"color": "green",
"description": "Publisher claimed and ownership verified",
"order": 2,
},
"signed": {
"label": "Signed",
"symbol": "✦",
"color": "blue",
"description": "Cryptographic signature present and valid",
"order": 3,
},
"deprecated": {
"label": "Deprecated",
"symbol": "✗",
"color": "red",
"description": "End-of-life, superseded, or abandoned",
"order": 4,
},
}


def get_trust_badge(trust_state: str) -> Dict[str, str]:
"""Return the badge definition for a trust state. Falls back to 'discovered' for unknown states."""
normalized = normalize_legacy_state(trust_state)
return TRUST_BADGES.get(normalized, TRUST_BADGES["discovered"])
72 changes: 36 additions & 36 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,27 @@
class TestTrustState:
def test_values(self):
assert TrustState.DISCOVERED.value == "discovered"
assert TrustState.AUDITED.value == "audited"
assert TrustState.PENDING_REVIEW.value == "pending_review"
assert TrustState.VERIFIED.value == "verified"
assert TrustState.SIGNED.value == "signed"

def test_ordering(self):
ordering = TrustState.ordering()
assert len(ordering) == 4
assert len(ordering) == 5
assert ordering[0] == TrustState.DISCOVERED
assert ordering[-1] == TrustState.SIGNED
assert ordering[-1] == TrustState.DEPRECATED

def test_from_string(self):
assert TrustState("discovered") == TrustState.DISCOVERED
assert TrustState("verified") == TrustState.VERIFIED
assert TrustState("signed") == TrustState.SIGNED

def test_comparison(self):
assert TrustState.DISCOVERED < TrustState.AUDITED
assert TrustState.AUDITED < TrustState.VERIFIED
assert TrustState.DISCOVERED < TrustState.PENDING_REVIEW
assert TrustState.PENDING_REVIEW < TrustState.VERIFIED
assert TrustState.VERIFIED < TrustState.SIGNED
assert TrustState.VERIFIED >= TrustState.VERIFIED
assert TrustState.SIGNED >= TrustState.AUDITED
assert TrustState.SIGNED >= TrustState.PENDING_REVIEW


class TestTrustMachine:
Expand All @@ -60,54 +60,54 @@ def setup_method(self):
# -- Valid transitions --

def test_can_transition_valid_forward(self):
assert TrustMachine.can_transition(TrustState.DISCOVERED, TrustState.AUDITED) is True
assert TrustMachine.can_transition(TrustState.AUDITED, TrustState.VERIFIED) is True
assert TrustMachine.can_transition(TrustState.DISCOVERED, TrustState.PENDING_REVIEW) is True
assert TrustMachine.can_transition(TrustState.PENDING_REVIEW, TrustState.VERIFIED) is True
assert TrustMachine.can_transition(TrustState.VERIFIED, TrustState.SIGNED) is True

def test_can_transition_valid_downgrade(self):
assert TrustMachine.can_transition(TrustState.AUDITED, TrustState.DISCOVERED) is True
assert TrustMachine.can_transition(TrustState.VERIFIED, TrustState.AUDITED) is True
assert TrustMachine.can_transition(TrustState.PENDING_REVIEW, TrustState.DISCOVERED) is True
assert TrustMachine.can_transition(TrustState.VERIFIED, TrustState.PENDING_REVIEW) is True
assert TrustMachine.can_transition(TrustState.SIGNED, TrustState.VERIFIED) is True

# -- Invalid transitions --

def test_can_transition_invalid(self):
assert TrustMachine.can_transition(TrustState.DISCOVERED, TrustState.VERIFIED) is False
assert TrustMachine.can_transition(TrustState.DISCOVERED, TrustState.SIGNED) is False
assert TrustMachine.can_transition(TrustState.AUDITED, TrustState.SIGNED) is False
assert TrustMachine.can_transition(TrustState.PENDING_REVIEW, TrustState.SIGNED) is False

# -- Transition execution --

def test_transition_discovered_to_audited(self):
def test_transition_discovered_to_pending_review(self):
result = TrustMachine.transition(
self.listing, TrustState.AUDITED, "Schema validated"
self.listing, TrustState.PENDING_REVIEW, "Schema validated"
)
assert result.trust_state == "audited"
assert result.trust_state == "pending_review"
assert len(result.trust_history) == 1
assert result.trust_history[0]["from_state"] == "discovered"
assert result.trust_history[0]["to_state"] == "audited"
assert result.trust_history[0]["to_state"] == "pending_review"
# indexed_at no longer set in v2 trust model (indexed state removed)

def test_transition_audited_to_verified_requires_publisher(self):
def test_transition_pending_review_to_verified_requires_publisher(self):
listing = Listing(
canonical_name="org/test",
canonical_source_url="https://github.com/org/test",
short_description="test",
primary_category="tools",
)
TrustMachine.transition(listing, TrustState.AUDITED, "ok")
TrustMachine.transition(listing, TrustState.PENDING_REVIEW, "ok")
with pytest.raises(TrustTransitionError, match="publisher_id"):
TrustMachine.transition(listing, TrustState.VERIFIED, "no publisher")

def test_transition_audited_to_verified_with_publisher(self):
def test_transition_pending_review_to_verified_with_publisher(self):
listing = Listing(
canonical_name="org/test",
canonical_source_url="https://github.com/org/test",
short_description="test",
primary_category="tools",
publisher_id="pub-1",
)
TrustMachine.transition(listing, TrustState.AUDITED, "ok")
TrustMachine.transition(listing, TrustState.PENDING_REVIEW, "ok")
result = TrustMachine.transition(listing, TrustState.VERIFIED, "publisher verified")
assert result.trust_state == "verified"

Expand All @@ -119,7 +119,7 @@ def test_transition_verified_to_signed_requires_signature(self):
primary_category="tools",
publisher_id="pub-1",
)
TrustMachine.transition(listing, TrustState.AUDITED, "ok")
TrustMachine.transition(listing, TrustState.PENDING_REVIEW, "ok")
TrustMachine.transition(listing, TrustState.VERIFIED, "ok")
with pytest.raises(TrustTransitionError, match="fingerprint"):
TrustMachine.transition(listing, TrustState.SIGNED, "no signature")
Expand All @@ -134,7 +134,7 @@ def test_transition_verified_to_signed_with_signature(self):
fingerprint="sha256:abc123",
signature_value="sig-xyz",
)
TrustMachine.transition(listing, TrustState.AUDITED, "ok")
TrustMachine.transition(listing, TrustState.PENDING_REVIEW, "ok")
TrustMachine.transition(listing, TrustState.VERIFIED, "ok")
result = TrustMachine.transition(listing, TrustState.SIGNED, "signed")
assert result.trust_state == "signed"
Expand All @@ -149,21 +149,21 @@ def test_transition_invalid_raises(self):

# -- Downgrade transitions --

def test_downgrade_verified_to_audited(self):
def test_downgrade_verified_to_pending_review(self):
listing = Listing(
canonical_name="org/test",
canonical_source_url="https://github.com/org/test",
short_description="test",
primary_category="tools",
publisher_id="pub-1",
)
TrustMachine.transition(listing, TrustState.AUDITED, "ok")
TrustMachine.transition(listing, TrustState.PENDING_REVIEW, "ok")
TrustMachine.transition(listing, TrustState.VERIFIED, "ok")
result = TrustMachine.transition(listing, TrustState.AUDITED, "revoked")
assert result.trust_state == "audited"
result = TrustMachine.transition(listing, TrustState.PENDING_REVIEW, "revoked")
assert result.trust_state == "pending_review"

def test_downgrade_audited_to_discovered(self):
TrustMachine.transition(self.listing, TrustState.AUDITED, "ok")
def test_downgrade_pending_review_to_discovered(self):
TrustMachine.transition(self.listing, TrustState.PENDING_REVIEW, "ok")
result = TrustMachine.transition(self.listing, TrustState.DISCOVERED, "invalidated")
assert result.trust_state == "discovered"

Expand All @@ -179,23 +179,23 @@ def test_full_lifecycle(self):
fingerprint="sha256:abc",
signature_value="sig-abc",
)
TrustMachine.transition(listing, TrustState.AUDITED, "schema ok")
TrustMachine.transition(listing, TrustState.PENDING_REVIEW, "schema ok")
TrustMachine.transition(listing, TrustState.VERIFIED, "publisher verified")
TrustMachine.transition(listing, TrustState.SIGNED, "cryptographic sig")
assert listing.trust_state == "signed"
assert len(listing.trust_history) == 3

# -- Entry criteria --

def test_entry_criteria_audited(self):
def test_entry_criteria_pending_review(self):
lst = Listing(canonical_name="", trust_state="discovered")
errors = TrustMachine.validate_entry_criteria(lst, TrustState.AUDITED)
errors = TrustMachine.validate_entry_criteria(lst, TrustState.PENDING_REVIEW)
assert any("canonical_name" in e for e in errors)
assert any("short_description" in e for e in errors)
assert any("canonical_source_url" in e for e in errors)

def test_entry_criteria_verified_requires_publisher(self):
lst = Listing(trust_state="audited")
lst = Listing(trust_state="pending_review")
errors = TrustMachine.validate_entry_criteria(lst, TrustState.VERIFIED)
assert any("publisher_id" in e for e in errors)

Expand All @@ -208,14 +208,14 @@ def test_entry_criteria_signed_requires_fingerprint_and_signature(self):
# -- Backward compatibility --

def test_normalize_legacy_state_indexed(self):
assert TrustMachine.normalize_legacy_state("indexed") == "audited"
assert TrustMachine.normalize_legacy_state("indexed") == "pending_review"

def test_normalize_legacy_state_claimed(self):
assert TrustMachine.normalize_legacy_state("claimed") == "verified"

def test_normalize_legacy_state_passthrough(self):
assert TrustMachine.normalize_legacy_state("discovered") == "discovered"
assert TrustMachine.normalize_legacy_state("audited") == "audited"
assert TrustMachine.normalize_legacy_state("pending_review") == "pending_review"
assert TrustMachine.normalize_legacy_state("verified") == "verified"
assert TrustMachine.normalize_legacy_state("signed") == "signed"

Expand Down Expand Up @@ -366,22 +366,22 @@ def test_from_json_empty(self):

class TestTrustTransition:
def test_defaults(self):
t = TrustTransition(from_state="discovered", to_state="audited")
t = TrustTransition(from_state="discovered", to_state="pending_review")
assert t.timestamp != ""
assert t.triggered_by == "system"
assert t.reason == ""

def test_roundtrip(self):
t = TrustTransition(
from_state="discovered",
to_state="audited",
to_state="pending_review",
reason="Schema validated",
triggered_by="crawler",
)
d = t.to_dict()
t2 = TrustTransition.from_dict(d)
assert t2.from_state == "discovered"
assert t2.to_state == "audited"
assert t2.to_state == "pending_review"
assert t2.reason == "Schema validated"
assert t2.triggered_by == "crawler"

Expand Down
Loading