diff --git a/.fides/db_dataset.yml b/.fides/db_dataset.yml index f64eaf39673..e368b3b1746 100644 --- a/.fides/db_dataset.yml +++ b/.fides/db_dataset.yml @@ -2069,6 +2069,101 @@ dataset: data_categories: [system.operations] - name: is_hash_migrated data_categories: [system.operations] + - name: rbac_permission + description: 'Permission definitions for RBAC system' + fields: + - name: id + data_categories: [system.operations] + - name: created_at + data_categories: [system.operations] + - name: updated_at + data_categories: [system.operations] + - name: code + data_categories: [system.operations] + - name: description + data_categories: [system.operations] + - name: resource_type + data_categories: [system.operations] + - name: is_active + data_categories: [system.operations] + - name: rbac_role + description: 'Role definitions for RBAC system with hierarchy support' + fields: + - name: id + data_categories: [system.operations] + - name: created_at + data_categories: [system.operations] + - name: updated_at + data_categories: [system.operations] + - name: name + data_categories: [system.operations] + - name: key + data_categories: [system.operations] + - name: description + data_categories: [system.operations] + - name: is_system_role + data_categories: [system.operations] + - name: is_active + data_categories: [system.operations] + - name: parent_role_id + data_categories: [system.operations] + - name: priority + data_categories: [system.operations] + - name: rbac_role_constraint + description: 'Separation of duties and cardinality constraints for RBAC roles' + fields: + - name: id + data_categories: [system.operations] + - name: created_at + data_categories: [system.operations] + - name: updated_at + data_categories: [system.operations] + - name: name + data_categories: [system.operations] + - name: constraint_type + data_categories: [system.operations] + - name: role_id_1 + data_categories: [system.operations] + - name: role_id_2 + data_categories: [system.operations] + - name: max_users + data_categories: [system.operations] + - name: description + data_categories: [system.operations] + - name: is_active + data_categories: [system.operations] + - name: rbac_role_permission + description: 'Junction table mapping roles to permissions' + fields: + - name: role_id + data_categories: [system.operations] + - name: permission_id + data_categories: [system.operations] + - name: created_at + data_categories: [system.operations] + - name: rbac_user_role + description: 'User role assignments with resource scoping and temporal validity' + fields: + - name: id + data_categories: [system.operations] + - name: created_at + data_categories: [system.operations] + - name: updated_at + data_categories: [system.operations] + - name: user_id + data_categories: [system.operations] + - name: role_id + data_categories: [system.operations] + - name: resource_type + data_categories: [system.operations] + - name: resource_id + data_categories: [system.operations] + - name: valid_from + data_categories: [system.operations] + - name: valid_until + data_categories: [system.operations] + - name: assigned_by + data_categories: [system.operations] - name: rule data_categories: [] fields: diff --git a/changelog/7285.yaml b/changelog/7285.yaml new file mode 100644 index 00000000000..be86a78908a --- /dev/null +++ b/changelog/7285.yaml @@ -0,0 +1,4 @@ +type: Added +description: Database migrations and models for dynamic RBAC system (roles, permissions, user assignments, constraints) +pr: 7285 +labels: ["db-migration"] diff --git a/src/fides/api/alembic/migrations/versions/xx_2026_01_31_1000_d9ee4ea46797_add_rbac_tables.py b/src/fides/api/alembic/migrations/versions/xx_2026_01_31_1000_d9ee4ea46797_add_rbac_tables.py new file mode 100644 index 00000000000..257497f3a1a --- /dev/null +++ b/src/fides/api/alembic/migrations/versions/xx_2026_01_31_1000_d9ee4ea46797_add_rbac_tables.py @@ -0,0 +1,401 @@ +"""Add RBAC tables for dynamic role-based access control + +Revision ID: d9ee4ea46797 +Revises: f85bd4c08401 +Create Date: 2026-01-31 10:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "d9ee4ea46797" +down_revision = "d304f57aea6d" +branch_labels = None +depends_on = None + + +def upgrade(): + # Create rbac_role table + op.create_table( + "rbac_role", + sa.Column("id", sa.String(length=255), nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "name", + sa.String(length=255), + nullable=False, + comment="Human-readable display name for the role", + ), + sa.Column( + "key", + sa.String(length=255), + nullable=False, + comment="Machine-readable key for the role, e.g., 'owner', 'custom_auditor'", + ), + sa.Column( + "description", + sa.Text(), + nullable=True, + comment="Description of the role's purpose and access level", + ), + sa.Column( + "is_system_role", + sa.Boolean(), + server_default="false", + nullable=False, + comment="True for built-in system roles that cannot be deleted", + ), + sa.Column( + "is_active", + sa.Boolean(), + server_default="true", + nullable=False, + comment="Whether this role can be assigned to users", + ), + sa.Column( + "parent_role_id", + sa.String(length=255), + nullable=True, + comment="Parent role ID for hierarchy. Child roles inherit parent permissions.", + ), + sa.Column( + "priority", + sa.Integer(), + server_default="0", + nullable=False, + comment="Priority for conflict resolution. Higher values = more privileges.", + ), + sa.ForeignKeyConstraint( + ["parent_role_id"], + ["rbac_role.id"], + ondelete="SET NULL", + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("name", name="uq_rbac_role_name"), + ) + op.create_index(op.f("ix_rbac_role_id"), "rbac_role", ["id"], unique=False) + op.create_index(op.f("ix_rbac_role_key"), "rbac_role", ["key"], unique=True) + op.create_index( + op.f("ix_rbac_role_parent_role_id"), + "rbac_role", + ["parent_role_id"], + unique=False, + ) + + # Create rbac_permission table + op.create_table( + "rbac_permission", + sa.Column("id", sa.String(length=255), nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "code", + sa.String(length=255), + nullable=False, + comment="Unique permission code, e.g., 'system:read', 'privacy-request:create'", + ), + sa.Column( + "description", + sa.Text(), + nullable=True, + comment="Human-readable description of what this permission allows", + ), + sa.Column( + "resource_type", + sa.String(length=100), + nullable=True, + comment="Resource type this permission applies to, e.g., 'system', 'privacy_request'. NULL for global permissions.", + ), + sa.Column( + "is_active", + sa.Boolean(), + server_default="true", + nullable=False, + comment="Whether this permission is currently active and can be assigned", + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_rbac_permission_id"), "rbac_permission", ["id"], unique=False + ) + op.create_index( + op.f("ix_rbac_permission_code"), "rbac_permission", ["code"], unique=True + ) + op.create_index( + op.f("ix_rbac_permission_resource_type"), + "rbac_permission", + ["resource_type"], + unique=False, + ) + + # Create rbac_role_permission junction table (composite PK) + op.create_table( + "rbac_role_permission", + sa.Column( + "role_id", + sa.String(length=255), + nullable=False, + comment="FK to rbac_role", + ), + sa.Column( + "permission_id", + sa.String(length=255), + nullable=False, + comment="FK to rbac_permission", + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + comment="When this permission was assigned to the role", + ), + sa.ForeignKeyConstraint( + ["role_id"], + ["rbac_role.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["permission_id"], + ["rbac_permission.id"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("role_id", "permission_id"), + ) + + # Create rbac_user_role table + op.create_table( + "rbac_user_role", + sa.Column("id", sa.String(length=255), nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "user_id", + sa.String(length=255), + nullable=False, + comment="FK to fidesuser", + ), + sa.Column( + "role_id", + sa.String(length=255), + nullable=False, + comment="FK to rbac_role", + ), + sa.Column( + "resource_type", + sa.String(length=100), + nullable=True, + comment="Resource type for scoped permissions, e.g., 'system'. NULL for global.", + ), + sa.Column( + "resource_id", + sa.String(length=255), + nullable=True, + comment="Specific resource ID for scoped permissions. NULL for global.", + ), + sa.Column( + "valid_from", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + comment="When this assignment becomes active. NULL means immediately.", + ), + sa.Column( + "valid_until", + sa.DateTime(timezone=True), + nullable=True, + comment="When this assignment expires. NULL means never expires.", + ), + sa.Column( + "assigned_by", + sa.String(length=255), + nullable=True, + comment="User ID of who created this assignment", + ), + sa.ForeignKeyConstraint( + ["user_id"], + ["fidesuser.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["role_id"], + ["rbac_role.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["assigned_by"], + ["fidesuser.id"], + ondelete="SET NULL", + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "user_id", + "role_id", + "resource_type", + "resource_id", + name="uq_rbac_user_role_assignment", + ), + ) + op.create_index( + op.f("ix_rbac_user_role_id"), "rbac_user_role", ["id"], unique=False + ) + op.create_index( + op.f("ix_rbac_user_role_user_id"), "rbac_user_role", ["user_id"], unique=False + ) + op.create_index( + op.f("ix_rbac_user_role_role_id"), "rbac_user_role", ["role_id"], unique=False + ) + + # Create rbac_role_constraint table + op.create_table( + "rbac_role_constraint", + sa.Column("id", sa.String(length=255), nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "name", + sa.String(length=255), + nullable=False, + comment="Human-readable name for this constraint", + ), + sa.Column( + "constraint_type", + sa.String(length=50), + nullable=False, + comment="Type of constraint: static_sod, dynamic_sod, or cardinality", + ), + sa.Column( + "role_id_1", + sa.String(length=255), + nullable=False, + comment="First role in the constraint (required for all types)", + ), + sa.Column( + "role_id_2", + sa.String(length=255), + nullable=True, + comment="Second role in the constraint (required for SoD, NULL for cardinality)", + ), + sa.Column( + "max_users", + sa.Integer(), + nullable=True, + comment="Maximum number of users for cardinality constraint", + ), + sa.Column( + "description", + sa.Text(), + nullable=True, + comment="Description of why this constraint exists", + ), + sa.Column( + "is_active", + sa.Boolean(), + server_default="true", + nullable=False, + comment="Whether this constraint is currently enforced", + ), + sa.ForeignKeyConstraint( + ["role_id_1"], + ["rbac_role.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["role_id_2"], + ["rbac_role.id"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_rbac_role_constraint_id"), + "rbac_role_constraint", + ["id"], + unique=False, + ) + op.create_index( + op.f("ix_rbac_role_constraint_role_id_1"), + "rbac_role_constraint", + ["role_id_1"], + unique=False, + ) + op.create_index( + op.f("ix_rbac_role_constraint_role_id_2"), + "rbac_role_constraint", + ["role_id_2"], + unique=False, + ) + + +def downgrade(): + # Drop tables in reverse order of creation + op.drop_index( + op.f("ix_rbac_role_constraint_role_id_2"), table_name="rbac_role_constraint" + ) + op.drop_index( + op.f("ix_rbac_role_constraint_role_id_1"), table_name="rbac_role_constraint" + ) + op.drop_index(op.f("ix_rbac_role_constraint_id"), table_name="rbac_role_constraint") + op.drop_table("rbac_role_constraint") + + op.drop_index(op.f("ix_rbac_user_role_role_id"), table_name="rbac_user_role") + op.drop_index(op.f("ix_rbac_user_role_user_id"), table_name="rbac_user_role") + op.drop_index(op.f("ix_rbac_user_role_id"), table_name="rbac_user_role") + op.drop_table("rbac_user_role") + + op.drop_table("rbac_role_permission") + + op.drop_index( + op.f("ix_rbac_permission_resource_type"), table_name="rbac_permission" + ) + op.drop_index(op.f("ix_rbac_permission_code"), table_name="rbac_permission") + op.drop_index(op.f("ix_rbac_permission_id"), table_name="rbac_permission") + op.drop_table("rbac_permission") + + op.drop_index(op.f("ix_rbac_role_parent_role_id"), table_name="rbac_role") + op.drop_index(op.f("ix_rbac_role_key"), table_name="rbac_role") + op.drop_index(op.f("ix_rbac_role_id"), table_name="rbac_role") + op.drop_table("rbac_role") diff --git a/src/fides/api/alembic/migrations/versions/xx_2026_01_31_1100_f5f526cbc35a_seed_rbac_defaults.py b/src/fides/api/alembic/migrations/versions/xx_2026_01_31_1100_f5f526cbc35a_seed_rbac_defaults.py new file mode 100644 index 00000000000..7f71c16cc11 --- /dev/null +++ b/src/fides/api/alembic/migrations/versions/xx_2026_01_31_1100_f5f526cbc35a_seed_rbac_defaults.py @@ -0,0 +1,432 @@ +"""Seed default RBAC roles and permissions from existing role definitions + +Revision ID: f5f526cbc35a +Revises: d9ee4ea46797 +Create Date: 2026-01-31 11:00:00.000000 + +This migration seeds the RBAC tables with: +1. All permissions from the existing SCOPE_REGISTRY +2. All roles from the existing ROLES_TO_SCOPES_MAPPING +3. Role-permission mappings based on existing role definitions + +This ensures backward compatibility when switching to the new RBAC system. +""" + +from uuid import uuid4 + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision = "f5f526cbc35a" +down_revision = "d9ee4ea46797" +branch_labels = None +depends_on = None + +# Scope definitions from scope_registry.py +# We hardcode these to avoid import issues during migration +SCOPE_DOCS = { + "config:read": "View the configuration", + "config:update": "Update the configuration", + "cli-objects:create": "Create objects via the command line interface", + "cli-objects:read": "Read objects via the command line interface", + "cli-objects:update": "Update objects via the command line interface", + "cli-objects:delete": "Delete objects via the command line interface", + "client:create": "Create OAuth clients", + "client:delete": "Remove OAuth clients", + "client:read": "View current scopes for OAuth clients", + "client:update": "Modify existing scopes for OAuth clients", + "connection:create_or_update": "Create or modify connections", + "connection:delete": "Remove connections", + "connection:read": "View connections", + "connection:authorize": "OAuth2 Authorization", + "connection_type:read": "View types of connections", + "connector_template:register": "Register a connector template", + "connector_template:read": "View connector template configurations", + "consent:read": "Read consent preferences", + "consent_settings:read": "Read org-wide consent settings", + "consent_settings:update": "Update org-wide consent settings", + "ctl_dataset:create": "Create a ctl dataset", + "ctl_dataset:read": "Read ctl datasets", + "ctl_dataset:delete": "Delete a ctl dataset", + "ctl_dataset:update": "Update ctl datasets", + "ctl_policy:create": "Create a ctl policy", + "ctl_policy:read": "Read ctl policies", + "ctl_policy:delete": "Delete a ctl policy", + "ctl_policy:update": "Update ctl policies", + "current-privacy-preference:read": "Read the current privacy preferences of all users", + "database:reset": "Reset the application database", + "data_category:create": "Create a data category", + "data_category:delete": "Delete data categories", + "data_category:read": "Read data categories", + "data_category:update": "Update data categories", + "data_subject:create": "Create a data subject", + "data_subject:read": "Read data subjects", + "data_subject:delete": "Delete data subjects", + "data_subject:update": "Update data subjects", + "data_use:create": "Create a data use", + "data_use:read": "Read data uses", + "data_use:delete": "Delete data uses", + "data_use:update": "Update data uses", + "dataset:create_or_update": "Create or modify datasets", + "dataset:delete": "Delete datasets", + "privacy-request-redaction-patterns:read": "View privacy request redaction patterns", + "privacy-request-redaction-patterns:update": "Update privacy request redaction patterns", + "dataset:read": "View datasets", + "dataset:test": "Run a standalone privacy request test for a dataset", + "encryption:exec": "Encrypt data", + "heap_dump:exec": "Execute a heap dump for memory diagnostics", + "messaging-template:update": "Update messaging templates", + "evaluation:create": "Create evaluation", + "evaluation:read": "Read evaluations", + "evaluation:delete": "Delete evaluations", + "evaluation:update": "Update evaluations", + "fides_taxonomy:update": "Update default fides taxonomy description", + "generate:exec": "", + "masking:exec": "Execute a masking strategy", + "masking:read": "Read masking strategies", + "messaging:create_or_update": "", + "messaging:delete": "", + "messaging:read": "", + "organization:create": "Create organization", + "organization:read": "Read organization details", + "organization:delete": "Delete organization", + "organization:update": "Update organization details", + "policy:create_or_update": "Create or modify policies", + "policy:delete": "Remove policies", + "policy:read": "View policies", + "privacy-experience:create": "Create privacy experiences", + "privacy-experience:update": "Update privacy experiences", + "privacy-experience:read": "View privacy experiences", + "privacy-notice:create": "Create privacy notices", + "privacy-notice:update": "Update privacy notices", + "privacy-notice:read": "View privacy notices", + "privacy-preference-history:read": "Read the history of all saved privacy preferences", + "privacy-request:create": "", + "privacy-request:resume": "Restart paused privacy requests", + "privacy-request-access-results:read": "Download access data for the privacy request", + "privacy-request:delete": "Remove privacy requests", + "privacy-request-email-integrations:send": "Send email for email integrations for the privacy request", + "privacy-request:manual-steps:respond": "Respond to manual steps for the privacy request", + "privacy-request:manual-steps:review": "Review manual steps for the privacy request", + "privacy-request-notifications:create_or_update": "", + "privacy-request-notifications:read": "", + "privacy-request:read": "View privacy requests", + "privacy-request:review": "Review privacy requests", + "privacy-request:transfer": "Transfer privacy requests", + "privacy-request:upload_data": "Manually upload data for the privacy request", + "privacy-request:view_data": "View subject data related to the privacy request", + "rule:create_or_update": "Create or update rules", + "rule:delete": "Remove rules", + "rule:read": "View rules", + "saas_config:create_or_update": "Create or update SaaS configurations", + "saas_config:delete": "Remove SaaS configurations", + "saas_config:read": "View SaaS configurations", + "connection:instantiate": "Instantiate a SaaS connection config from a connector template", + "scope:read": "View authorization scopes", + "storage:create_or_update": "Create or update storage", + "storage:delete": "Remove storage", + "storage:read": "View storage", + "system:create": "Create systems", + "system:read": "Read systems", + "system:delete": "Delete systems", + "system:update": "Update systems", + "system_manager:read": "Read systems users can manage", + "system_manager:delete": "Delete systems user can manage", + "system_manager:update": "Update systems user can manage", + "user:create": "Create users", + "user:update": "Update users", + "user:delete": "Remove users", + "user:read": "View users", + "user:read-own": "View own user", + "user:password-reset": "Reset another user's password", + "user-permission:create": "Create user permissions", + "user-permission:update": "Update user permissions", + "user-permission:assign_owners": "Assign the owner role to a user", + "user-permission:read": "View user permissions", + "validate:exec": "", + "webhook:create_or_update": "Create or update web hooks", + "webhook:delete": "Remove web hooks", + "webhook:read": "View web hooks", + "worker-stats:read": "View worker statistics", + "backfill:exec": "Execute database backfill operations", + # RBAC Management scopes - for managing the RBAC system itself + "rbac_role:create": "Create custom roles", + "rbac_role:read": "Read role definitions", + "rbac_role:update": "Update role definitions", + "rbac_role:delete": "Delete custom roles", + "rbac_permission:read": "Read permission definitions", + "rbac_user_role:create": "Assign roles to users", + "rbac_user_role:read": "Read user role assignments", + "rbac_user_role:update": "Update user role assignments", + "rbac_user_role:delete": "Remove roles from users", + "rbac_constraint:create": "Create role constraints", + "rbac_constraint:read": "Read role constraints", + "rbac_constraint:update": "Update role constraints", + "rbac_constraint:delete": "Delete role constraints", + "rbac:evaluate": "Evaluate user permissions", +} + +# RBAC management scopes - only Owner should have these +RBAC_MANAGEMENT_SCOPES = [ + "rbac_role:create", + "rbac_role:read", + "rbac_role:update", + "rbac_role:delete", + "rbac_permission:read", + "rbac_user_role:create", + "rbac_user_role:read", + "rbac_user_role:update", + "rbac_user_role:delete", + "rbac_constraint:create", + "rbac_constraint:read", + "rbac_constraint:update", + "rbac_constraint:delete", + "rbac:evaluate", +] + +# Role definitions from roles.py +LEGACY_ROLES = { + "owner": { + "name": "Owner", + "description": "Full admin access to all features and settings", + "priority": 100, + }, + "contributor": { + "name": "Contributor", + "description": "Can create and edit most resources but cannot configure storage, messaging, or assign owner role", + "priority": 80, + }, + "viewer_and_approver": { + "name": "Viewer & Approver", + "description": "Read-only access to the data map with ability to approve privacy requests", + "priority": 60, + }, + "viewer": { + "name": "Viewer", + "description": "Read-only access to the data map and system configuration", + "priority": 40, + }, + "approver": { + "name": "Approver", + "description": "Can review and approve privacy requests only", + "priority": 30, + }, + "respondent": { + "name": "Internal Respondent", + "description": "Internal user who can respond to assigned manual task steps", + "priority": 20, + }, + "external_respondent": { + "name": "External Respondent", + "description": "External user with minimal access to respond to assigned manual task steps", + "priority": 10, + }, +} + +# Approver scopes +APPROVER_SCOPES = [ + "privacy-request:review", + "privacy-request:read", + "privacy-request:resume", + "privacy-request:upload_data", + "privacy-request:view_data", + "privacy-request:delete", + "privacy-request:create", + "user:read", + "privacy-request:manual-steps:review", +] + +# Viewer scopes +VIEWER_SCOPES = [ + "cli-objects:read", + "client:read", + "connection:read", + "consent:read", + "consent_settings:read", + "connection_type:read", + "ctl_dataset:read", + "data_category:read", + "ctl_policy:read", + "dataset:read", + "data_subject:read", + "data_use:read", + "evaluation:read", + "masking:exec", + "masking:read", + "organization:read", + "policy:read", + "privacy-experience:read", + "privacy-notice:read", + "privacy-request-notifications:read", + "rule:read", + "scope:read", + "storage:read", + "system:read", + "messaging:read", + "webhook:read", + "system_manager:read", + "saas_config:read", + "user:read", +] + +# Respondent scopes +RESPONDENT_SCOPES = [ + "privacy-request:manual-steps:respond", + "user:read-own", +] + +# External respondent scopes +EXTERNAL_RESPONDENT_SCOPES = [ + "privacy-request:manual-steps:respond", +] + +# Scopes excluded from contributor +NOT_CONTRIBUTOR_SCOPES = [ + "connector_template:register", + "storage:create_or_update", + "storage:delete", + "messaging:create_or_update", + "messaging:delete", + "privacy-request-notifications:create_or_update", + "privacy-request-email-integrations:send", + "user-permission:assign_owners", + "heap_dump:exec", +] + + +def get_resource_type_from_scope(scope: str) -> str | None: + """Extract resource type from scope code.""" + if ":" not in scope: + return None + resource = scope.split(":")[0] + # Normalize common patterns + resource_map = { + "privacy-request": "privacy_request", + "cli-objects": "cli_objects", + "privacy-experience": "privacy_experience", + "privacy-notice": "privacy_notice", + "user-permission": "user_permission", + "privacy-request-redaction-patterns": "privacy_request", + "privacy-request-notifications": "privacy_request", + "privacy-request-email-integrations": "privacy_request", + "privacy-request-access-results": "privacy_request", + "current-privacy-preference": "consent", + "privacy-preference-history": "consent", + "messaging-template": "messaging", + "worker-stats": "worker", + "heap_dump": "system", + } + return resource_map.get(resource, resource) + + +def upgrade(): + connection = op.get_bind() + + # Step 1: Seed permissions from SCOPE_DOCS + permission_ids = {} # code -> id mapping + for scope_code, description in SCOPE_DOCS.items(): + permission_id = f"per_{uuid4()}" + permission_ids[scope_code] = permission_id + resource_type = get_resource_type_from_scope(scope_code) + + connection.execute( + text( + """ + INSERT INTO rbac_permission (id, code, description, resource_type, is_active) + VALUES (:id, :code, :description, :resource_type, true) + """ + ), + { + "id": permission_id, + "code": scope_code, + "description": description or f"Permission for {scope_code}", + "resource_type": resource_type, + }, + ) + + # Step 2: Seed roles + role_ids = {} # key -> id mapping + for role_key, role_data in LEGACY_ROLES.items(): + role_id = f"rol_{uuid4()}" + role_ids[role_key] = role_id + + connection.execute( + text( + """ + INSERT INTO rbac_role (id, key, name, description, is_system_role, is_active, priority) + VALUES (:id, :key, :name, :description, true, true, :priority) + """ + ), + { + "id": role_id, + "key": role_key, + "name": role_data["name"], + "description": role_data["description"], + "priority": role_data["priority"], + }, + ) + + # Step 3: Create role-permission mappings + + # Helper to create role-permission mapping + def create_role_permission_mapping(role_key: str, scope_codes: list[str]): + role_id = role_ids[role_key] + for scope_code in scope_codes: + if scope_code in permission_ids: + connection.execute( + text( + """ + INSERT INTO rbac_role_permission (role_id, permission_id) + VALUES (:role_id, :permission_id) + """ + ), + { + "role_id": role_id, + "permission_id": permission_ids[scope_code], + }, + ) + + # Owner gets all permissions + create_role_permission_mapping("owner", list(SCOPE_DOCS.keys())) + + # Viewer & Approver gets viewer + approver scopes + viewer_and_approver_scopes = list(set(VIEWER_SCOPES + APPROVER_SCOPES)) + create_role_permission_mapping("viewer_and_approver", viewer_and_approver_scopes) + + # Viewer gets viewer scopes + create_role_permission_mapping("viewer", VIEWER_SCOPES) + + # Approver gets approver scopes + create_role_permission_mapping("approver", APPROVER_SCOPES) + + # Contributor gets all scopes except NOT_CONTRIBUTOR_SCOPES + contributor_scopes = [ + s for s in SCOPE_DOCS.keys() if s not in NOT_CONTRIBUTOR_SCOPES + ] + create_role_permission_mapping("contributor", contributor_scopes) + + # Respondent gets respondent scopes + create_role_permission_mapping("respondent", RESPONDENT_SCOPES) + + # External respondent gets external respondent scopes + create_role_permission_mapping("external_respondent", EXTERNAL_RESPONDENT_SCOPES) + + # Step 4: Create default constraints + + # Approver and System Manager are mutually exclusive (static SoD) + # This mirrors the existing behavior in the codebase + # Note: We'll add this constraint once we verify the roles exist + + +def downgrade(): + connection = op.get_bind() + + # Delete all seeded data (system roles and their mappings) + # Cascading deletes will handle rbac_role_permission + connection.execute( + text("DELETE FROM rbac_role WHERE is_system_role = true") + ) + + # Delete all permissions (they were seeded) + connection.execute(text("DELETE FROM rbac_permission")) diff --git a/src/fides/api/alembic/migrations/versions/xx_2026_02_01_0900_9f6555f12ad1_add_rbac_management_scopes.py b/src/fides/api/alembic/migrations/versions/xx_2026_02_01_0900_9f6555f12ad1_add_rbac_management_scopes.py new file mode 100644 index 00000000000..288d7ccf2c7 --- /dev/null +++ b/src/fides/api/alembic/migrations/versions/xx_2026_02_01_0900_9f6555f12ad1_add_rbac_management_scopes.py @@ -0,0 +1,143 @@ +"""Add RBAC management scopes to permission table and assign to Owner role + +Revision ID: 9f6555f12ad1 +Revises: f5f526cbc35a +Create Date: 2026-02-01 09:00:00.000000 + +This migration adds the RBAC management scopes (rbac_role:read, etc.) +to the rbac_permission table and assigns them to the Owner role. +These scopes are required for the RBAC management UI to function. +""" + +from uuid import uuid4 + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision = "9f6555f12ad1" +down_revision = "f5f526cbc35a" +branch_labels = None +depends_on = None + +# RBAC Management scopes - these allow managing the RBAC system itself +RBAC_MANAGEMENT_SCOPES = { + "rbac_role:create": "Create custom roles", + "rbac_role:read": "Read role definitions", + "rbac_role:update": "Update role definitions", + "rbac_role:delete": "Delete custom roles", + "rbac_permission:read": "Read permission definitions", + "rbac_user_role:create": "Assign roles to users", + "rbac_user_role:read": "Read user role assignments", + "rbac_user_role:update": "Update user role assignments", + "rbac_user_role:delete": "Remove roles from users", + "rbac_constraint:create": "Create role constraints", + "rbac_constraint:read": "Read role constraints", + "rbac_constraint:update": "Update role constraints", + "rbac_constraint:delete": "Delete role constraints", + "rbac:evaluate": "Evaluate user permissions", +} + + +def get_resource_type_from_scope(scope: str) -> str | None: + """Extract resource type from scope code.""" + if ":" not in scope: + return None + resource = scope.split(":")[0] + return resource + + +def upgrade(): + """Add RBAC management scopes and assign to Owner role.""" + conn = op.get_bind() + + # Get Owner role ID + result = conn.execute( + text("SELECT id FROM rbac_role WHERE key = 'owner'") + ).fetchone() + if not result: + # Owner role doesn't exist - skip (fresh install will have scopes from seed) + return + owner_role_id = result[0] + + # Add each RBAC management scope + for scope_code, description in RBAC_MANAGEMENT_SCOPES.items(): + # Check if permission already exists + existing = conn.execute( + text("SELECT id FROM rbac_permission WHERE code = :code"), + {"code": scope_code}, + ).fetchone() + + if existing: + permission_id = existing[0] + else: + # Create permission + permission_id = f"rba_{uuid4()}" + resource_type = get_resource_type_from_scope(scope_code) + conn.execute( + text( + """ + INSERT INTO rbac_permission (id, code, description, resource_type, is_active, created_at, updated_at) + VALUES (:id, :code, :description, :resource_type, true, now(), now()) + """ + ), + { + "id": permission_id, + "code": scope_code, + "description": description, + "resource_type": resource_type, + }, + ) + + # Check if role-permission mapping exists + existing_mapping = conn.execute( + text( + """ + SELECT 1 FROM rbac_role_permission + WHERE role_id = :role_id AND permission_id = :permission_id + """ + ), + {"role_id": owner_role_id, "permission_id": permission_id}, + ).fetchone() + + if not existing_mapping: + # Assign permission to Owner role + conn.execute( + text( + """ + INSERT INTO rbac_role_permission (role_id, permission_id, created_at) + VALUES (:role_id, :permission_id, now()) + """ + ), + { + "role_id": owner_role_id, + "permission_id": permission_id, + }, + ) + + +def downgrade(): + """Remove RBAC management scopes.""" + conn = op.get_bind() + + for scope_code in RBAC_MANAGEMENT_SCOPES.keys(): + # Get permission ID + result = conn.execute( + text("SELECT id FROM rbac_permission WHERE code = :code"), + {"code": scope_code}, + ).fetchone() + + if result: + permission_id = result[0] + + # Remove role-permission mappings + conn.execute( + text("DELETE FROM rbac_role_permission WHERE permission_id = :id"), + {"id": permission_id}, + ) + + # Remove permission + conn.execute( + text("DELETE FROM rbac_permission WHERE id = :id"), + {"id": permission_id}, + ) diff --git a/src/fides/api/alembic/migrations/versions/xx_2026_02_01_1000_a8b9c0d1e2f3_seed_fidesplus_scopes.py b/src/fides/api/alembic/migrations/versions/xx_2026_02_01_1000_a8b9c0d1e2f3_seed_fidesplus_scopes.py new file mode 100644 index 00000000000..c5e42ca3e4e --- /dev/null +++ b/src/fides/api/alembic/migrations/versions/xx_2026_02_01_1000_a8b9c0d1e2f3_seed_fidesplus_scopes.py @@ -0,0 +1,411 @@ +"""Seed fidesplus-specific scopes into RBAC tables + +Revision ID: a8b9c0d1e2f3 +Revises: 9f6555f12ad1 +Create Date: 2026-02-01 10:00:00.000000 + +This migration adds fidesplus-specific scopes (discovery_monitor, custom_field, etc.) +to the rbac_permission table and assigns them to the appropriate roles. + +These scopes are required for fidesplus features to work correctly when RBAC is enabled. +""" + +from uuid import uuid4 + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision = "a8b9c0d1e2f3" +down_revision = "9f6555f12ad1" +branch_labels = None +depends_on = None + + +# Fidesplus scope definitions (excluding RBAC scopes which are in previous migration) +FIDESPLUS_SCOPE_DOCS = { + "allow_list:create": "Create an allowlist", + "allow_list:delete": "Delete an allowlist", + "allow_list:read": "Read an allowlist", + "allow_list:update": "Update an allowlist", + "attachment:create": "Create an attachment", + "attachment:delete": "Delete an attachment", + "attachment:read": "Read an attachment", + "classify_instance:create": "Kicks off a classify instance", + "classify_instance:read": "Read classify instances", + "classify_instance:update": "Updates the state of a classify instance", + "comment:create": "Create a comment", + "comment:read": "Read a comment", + "custom_field:create": "Add a custom field for a resource", + "custom_field:delete": "Delete custom fields", + "custom_field:read": "Read custom fields", + "custom_field:update": "Update a custom field for a resource", + "custom_field_definition:create": "Add a custom field definition", + "custom_field_definition:delete": "Delete a custom field definition", + "custom_field_definition:read": "Read custom field definitions", + "custom_field_definition:update": "Update a custom field definition", + "custom_report:create": "Create a custom report", + "custom_report:read": "Read custom reports", + "custom_report:delete": "Delete a custom report", + "datamap:read": "Read systems on the datamap", + "digest_config:read": "Read digest configurations", + "digest_config:create": "Create digest configurations", + "digest_config:update": "Update digest configurations", + "digest_config:delete": "Delete digest configurations", + "discovery_monitor:read": "Read discovery monitors", + "discovery_monitor:update": "Update discovery monitors", + "monitor_steward:read": "Read monitor steward assignments", + "monitor_steward:update": "Update monitor steward assignments", + "monitor_steward:delete": "Delete monitor steward assignments", + "shared_monitor_config:create": "Create shared monitor configs", + "shared_monitor_config:read": "Read shared monitor configs", + "shared_monitor_config:update": "Update shared monitor configs", + "shared_monitor_config:delete": "Delete shared monitor configs", + "gvl:update": "Update the GVL vendor list", + "system_scan:create": "Fetch the results of a new system scan", + "system_scan:read": "Read a system scan", + "fides_cloud_config:read": "Read the Fides Cloud config", + "fides_cloud_config:update": "Update the Fides Cloud config", + "system_history:read": "Read a system's change history", + "custom_asset:update": "Update a custom asset", + "endpoint_cache:update": "Update endpoint caches", + "tcf_publisher_override:read": "Read global TCF purpose overrides", + "tcf_publisher_override:update": "Patch global TCF purpose overrides", + "location:read": "Read locations and regulations", + "location:update": "Update locations and regulations", + "language:read": "Read available languages", + "openid_provider:create": "Create OpenID providers", + "openid_provider:read": "Read OpenID providers", + "openid_provider:update": "Update OpenID providers", + "openid_provider:delete": "Delete OpenID providers", + "property:create": "Create properties", + "property:read": "Read properties", + "property:update": "Update properties", + "property:delete": "Delete properties", + "privacy_center_config:read": "Read the Privacy Center config", + "privacy_center_config:update": "Update the Privacy Center config", + "privacy_experience_cache:delete": "Delete all privacy experiences in the db cache", + "privacy_experience_cache:read": "Read the privacy experience cache", + "privacy_experience_cache:update": "Update (refresh) the privacy experience cache", + "privacy_preferences:create": "Submit a privacy preference update with pre-verified identities", + "respondent:create": "Create a respondent", + "consent_webhook:post": "Post to an integration's consent webhook", + "consent_webhook_token:create": "Create a token for consent webhook", + "system_group:create": "Create system groups", + "system_group:read": "Read system groups", + "system_group:update": "Update system groups", + "system_group:delete": "Delete system groups", + "taxonomy:create": "Create custom taxonomies and taxonomy elements", + "taxonomy:read": "Read custom taxonomies and taxonomy elements", + "taxonomy:update": "Update custom taxonomies and taxonomy elements", + "taxonomy:delete": "Delete custom taxonomies and taxonomy elements", + "tcf_configurations:create": "Create a TCF configuration", + "tcf_configurations:read": "Read TCF configurations", + "tcf_configurations:update": "Update a TCF configuration", + "tcf_configurations:delete": "Delete a TCF configuration", + "tcf_publisher_restrictions:create": "Create a TCF publisher restriction", + "tcf_publisher_restrictions:read": "Read TCF publisher restrictions", + "tcf_publisher_restrictions:update": "Update a TCF publisher restriction", + "tcf_publisher_restrictions:delete": "Delete a TCF publisher restriction", + "manual_field:read-all": "Read all manual task fields across assignees", + "manual_field:read-own": "Read only manual task fields assigned to the caller", + "identity_definition:create": "Create identity definitions", + "identity_definition:read": "Read identity definitions", + "identity_definition:delete": "Delete identity definitions", + "taxonomy_history:read": "Read taxonomy history (audit log entries)", + "datahub:sync": "Sync data with DataHub", + "chat_provider:read": "Read chat provider settings", + "chat_provider:update": "Update chat provider settings", + "privacy_assessment:read": "Read privacy assessments", + "privacy_assessment:create": "Create privacy assessments", + "privacy_assessment:update": "Update privacy assessments", + "privacy_assessment:delete": "Delete privacy assessments", +} + +# Role to fidesplus scopes mapping +ROLE_SCOPES = { + "owner": [ + "allow_list:create", "allow_list:delete", "allow_list:read", "allow_list:update", + "attachment:create", "attachment:delete", "attachment:read", + "classify_instance:create", "classify_instance:read", "classify_instance:update", + "comment:create", "comment:read", + "consent_webhook:post", "consent_webhook_token:create", + "custom_field:create", "custom_field:delete", "custom_field:read", "custom_field:update", + "custom_field_definition:create", "custom_field_definition:delete", "custom_field_definition:read", "custom_field_definition:update", + "custom_report:create", "custom_report:read", "custom_report:delete", + "datahub:sync", "datamap:read", + "digest_config:read", "digest_config:create", "digest_config:update", "digest_config:delete", + "discovery_monitor:read", "discovery_monitor:update", + "monitor_steward:read", "monitor_steward:update", "monitor_steward:delete", + "taxonomy_history:read", + "shared_monitor_config:create", "shared_monitor_config:read", "shared_monitor_config:update", "shared_monitor_config:delete", + "gvl:update", + "system_scan:create", "system_scan:read", + "fides_cloud_config:read", "fides_cloud_config:update", + "system_history:read", + "custom_asset:update", + "tcf_publisher_override:read", "tcf_publisher_override:update", + "endpoint_cache:update", + "location:update", "location:read", + "language:read", + "openid_provider:create", "openid_provider:read", "openid_provider:update", "openid_provider:delete", + "property:create", "property:read", "property:update", "property:delete", + "privacy_center_config:read", "privacy_center_config:update", + "privacy_experience_cache:delete", "privacy_experience_cache:read", "privacy_experience_cache:update", + "privacy_preferences:create", + "respondent:create", + "system_group:create", "system_group:read", "system_group:update", "system_group:delete", + "taxonomy:create", "taxonomy:read", "taxonomy:update", "taxonomy:delete", + "tcf_configurations:create", "tcf_configurations:read", "tcf_configurations:update", "tcf_configurations:delete", + "tcf_publisher_restrictions:create", "tcf_publisher_restrictions:read", "tcf_publisher_restrictions:update", "tcf_publisher_restrictions:delete", + "manual_field:read-all", + "identity_definition:create", "identity_definition:read", "identity_definition:delete", + "chat_provider:read", "chat_provider:update", + "privacy_assessment:read", "privacy_assessment:create", "privacy_assessment:update", "privacy_assessment:delete", + ], + "contributor": [ + "allow_list:create", "allow_list:delete", "allow_list:read", "allow_list:update", + "attachment:create", "attachment:delete", "attachment:read", + "classify_instance:create", "classify_instance:read", "classify_instance:update", + "comment:create", "comment:read", + "consent_webhook:post", + "custom_field:create", "custom_field:delete", "custom_field:read", "custom_field:update", + "custom_field_definition:create", "custom_field_definition:delete", "custom_field_definition:read", "custom_field_definition:update", + "custom_report:create", "custom_report:read", "custom_report:delete", + "datahub:sync", "datamap:read", + "digest_config:read", "digest_config:create", "digest_config:update", "digest_config:delete", + "discovery_monitor:read", "discovery_monitor:update", + "monitor_steward:read", "monitor_steward:update", "monitor_steward:delete", + "shared_monitor_config:create", "shared_monitor_config:read", "shared_monitor_config:update", "shared_monitor_config:delete", + "gvl:update", + "system_scan:create", "system_scan:read", + "system_history:read", + "fides_cloud_config:read", + "custom_asset:update", + "tcf_publisher_override:read", "tcf_publisher_override:update", + "endpoint_cache:update", + "location:update", "location:read", + "language:read", + "property:create", "property:read", "property:update", "property:delete", + "privacy_center_config:read", "privacy_center_config:update", + "privacy_experience_cache:delete", "privacy_experience_cache:read", "privacy_experience_cache:update", + "privacy_preferences:create", + "system_group:create", "system_group:read", "system_group:update", "system_group:delete", + "taxonomy:create", "taxonomy:read", "taxonomy:update", "taxonomy:delete", + "tcf_configurations:create", "tcf_configurations:read", "tcf_configurations:update", "tcf_configurations:delete", + "tcf_publisher_restrictions:create", "tcf_publisher_restrictions:read", "tcf_publisher_restrictions:update", "tcf_publisher_restrictions:delete", + "manual_field:read-all", + "identity_definition:create", "identity_definition:read", "identity_definition:delete", + "chat_provider:read", "chat_provider:update", + ], + "viewer_and_approver": [ + "allow_list:read", + "attachment:create", "attachment:delete", "attachment:read", + "classify_instance:read", + "comment:create", "comment:read", + "custom_field_definition:read", + "custom_field:read", + "datamap:read", + "discovery_monitor:read", + "monitor_steward:read", + "shared_monitor_config:read", + "system_scan:read", + "system_history:read", + "property:read", + "privacy_center_config:read", + "respondent:create", + "manual_field:read-all", + "system_group:read", + "taxonomy:read", + "identity_definition:read", + "taxonomy_history:read", + ], + "viewer": [ + "allow_list:read", + "classify_instance:read", + "custom_field_definition:read", + "custom_field:read", + "datamap:read", + "discovery_monitor:read", + "monitor_steward:read", + "shared_monitor_config:read", + "system_scan:read", + "system_history:read", + "property:read", + "privacy_center_config:read", + "manual_field:read-own", + "system_group:read", + "taxonomy:read", + "identity_definition:read", + "taxonomy_history:read", + ], + "approver": [ + "attachment:create", "attachment:delete", "attachment:read", + "comment:create", "comment:read", + "manual_field:read-all", + "privacy_center_config:read", + ], + "respondent": [ + "manual_field:read-own", + ], + "external_respondent": [ + "manual_field:read-own", + ], +} + + +def get_resource_type_from_scope(scope: str) -> str | None: + """Extract resource type from scope code.""" + if ":" not in scope: + return None + resource = scope.split(":")[0] + # Normalize common patterns + resource_map = { + "discovery_monitor": "discovery_monitor", + "custom_field": "custom_field", + "custom_field_definition": "custom_field", + "system_scan": "system", + "system_history": "system", + "system_group": "system", + "datamap": "system", + "allow_list": "classification", + "classify_instance": "classification", + "taxonomy": "taxonomy", + "taxonomy_history": "taxonomy", + "privacy_center_config": "consent", + "privacy_preferences": "consent", + "privacy_experience_cache": "consent", + "tcf_configurations": "consent", + "tcf_publisher_override": "consent", + "tcf_publisher_restrictions": "consent", + "gvl": "consent", + "property": "property", + "openid_provider": "authentication", + "fides_cloud_config": "config", + "digest_config": "notifications", + "comment": "collaboration", + "attachment": "collaboration", + "custom_report": "reporting", + "custom_asset": "asset", + "consent_webhook": "webhook", + "consent_webhook_token": "webhook", + "location": "location", + "language": "localization", + "identity_definition": "identity", + "manual_field": "privacy_request", + "respondent": "privacy_request", + "monitor_steward": "discovery_monitor", + "shared_monitor_config": "discovery_monitor", + "endpoint_cache": "cache", + "datahub": "integration", + } + return resource_map.get(resource, resource) + + +def upgrade(): + """Add fidesplus scopes and assign to roles.""" + conn = op.get_bind() + + # Create a mapping of scope code to permission ID + permission_ids = {} + + # Add each fidesplus scope + for scope_code, description in FIDESPLUS_SCOPE_DOCS.items(): + # Check if permission already exists + existing = conn.execute( + text("SELECT id FROM rbac_permission WHERE code = :code"), + {"code": scope_code}, + ).fetchone() + + if existing: + permission_ids[scope_code] = existing[0] + else: + # Create permission + permission_id = f"pls_{uuid4()}" + resource_type = get_resource_type_from_scope(scope_code) + conn.execute( + text( + """ + INSERT INTO rbac_permission (id, code, description, resource_type, is_active, created_at, updated_at) + VALUES (:id, :code, :description, :resource_type, true, now(), now()) + """ + ), + { + "id": permission_id, + "code": scope_code, + "description": description, + "resource_type": resource_type, + }, + ) + permission_ids[scope_code] = permission_id + + # Get role IDs + roles = conn.execute( + text("SELECT id, key FROM rbac_role") + ).fetchall() + role_ids = {r[1]: r[0] for r in roles} + + # Assign permissions to roles + for role_key, scopes in ROLE_SCOPES.items(): + role_id = role_ids.get(role_key) + if not role_id: + # Role doesn't exist - skip + continue + + for scope_code in scopes: + permission_id = permission_ids.get(scope_code) + if not permission_id: + continue + + # Check if mapping already exists + existing_mapping = conn.execute( + text( + """ + SELECT 1 FROM rbac_role_permission + WHERE role_id = :role_id AND permission_id = :permission_id + """ + ), + {"role_id": role_id, "permission_id": permission_id}, + ).fetchone() + + if not existing_mapping: + # Create mapping + conn.execute( + text( + """ + INSERT INTO rbac_role_permission (role_id, permission_id, created_at) + VALUES (:role_id, :permission_id, now()) + """ + ), + { + "role_id": role_id, + "permission_id": permission_id, + }, + ) + + +def downgrade(): + """Remove fidesplus scopes.""" + conn = op.get_bind() + + for scope_code in FIDESPLUS_SCOPE_DOCS.keys(): + # Get permission ID + result = conn.execute( + text("SELECT id FROM rbac_permission WHERE code = :code"), + {"code": scope_code}, + ).fetchone() + + if result: + permission_id = result[0] + + # Remove role-permission mappings + conn.execute( + text("DELETE FROM rbac_role_permission WHERE permission_id = :id"), + {"id": permission_id}, + ) + + # Remove permission + conn.execute( + text("DELETE FROM rbac_permission WHERE id = :id"), + {"id": permission_id}, + ) diff --git a/src/fides/api/models/rbac/__init__.py b/src/fides/api/models/rbac/__init__.py new file mode 100644 index 00000000000..e75336bc933 --- /dev/null +++ b/src/fides/api/models/rbac/__init__.py @@ -0,0 +1,15 @@ +"""RBAC models for dynamic role-based access control.""" + +from fides.api.models.rbac.rbac_permission import RBACPermission +from fides.api.models.rbac.rbac_role import RBACRole +from fides.api.models.rbac.rbac_role_constraint import RBACRoleConstraint +from fides.api.models.rbac.rbac_role_permission import RBACRolePermission +from fides.api.models.rbac.rbac_user_role import RBACUserRole + +__all__ = [ + "RBACRole", + "RBACPermission", + "RBACRolePermission", + "RBACUserRole", + "RBACRoleConstraint", +] diff --git a/src/fides/api/models/rbac/rbac_permission.py b/src/fides/api/models/rbac/rbac_permission.py new file mode 100644 index 00000000000..c5e2277127a --- /dev/null +++ b/src/fides/api/models/rbac/rbac_permission.py @@ -0,0 +1,66 @@ +"""RBAC Permission model for storing permission definitions.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, List + +from sqlalchemy import Boolean, Column, String, Text +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.orm import RelationshipProperty, relationship + +from fides.api.db.base_class import Base + +if TYPE_CHECKING: + from fides.api.models.rbac.rbac_role import RBACRole + + +class RBACPermission(Base): + """ + Permission definition for RBAC system. + + Permissions are the atomic units of access control. Each permission represents + a specific action that can be performed on a resource type. + + Permissions are seeded from the existing SCOPE_REGISTRY and should not be + created manually in most cases. + """ + + @declared_attr + def __tablename__(cls) -> str: + return "rbac_permission" + + code = Column( + String(255), + nullable=False, + unique=True, + index=True, + comment="Unique permission code, e.g., 'system:read', 'privacy-request:create'", + ) + description = Column( + Text, + nullable=True, + comment="Human-readable description of what this permission allows", + ) + resource_type = Column( + String(100), + nullable=True, + index=True, + comment="Resource type this permission applies to, e.g., 'system', 'privacy_request'. NULL for global permissions.", + ) + is_active = Column( + Boolean, + nullable=False, + server_default="true", + comment="Whether this permission is currently active and can be assigned", + ) + + # Relationships + roles: RelationshipProperty[List["RBACRole"]] = relationship( + "RBACRole", + secondary="rbac_role_permission", + back_populates="permissions", + lazy="selectin", + ) + + def __repr__(self) -> str: + return f"" diff --git a/src/fides/api/models/rbac/rbac_role.py b/src/fides/api/models/rbac/rbac_role.py new file mode 100644 index 00000000000..e71926bd6c0 --- /dev/null +++ b/src/fides/api/models/rbac/rbac_role.py @@ -0,0 +1,185 @@ +"""RBAC Role model for dynamic role definitions.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, List, Optional, Set + +from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.orm import RelationshipProperty, Session, relationship + +from fides.api.db.base_class import Base + +if TYPE_CHECKING: + from fides.api.models.rbac.rbac_permission import RBACPermission + from fides.api.models.rbac.rbac_user_role import RBACUserRole + + +class RBACRole(Base): + """ + Dynamic role definition for RBAC system. + + Roles group permissions together and can form hierarchies where child roles + inherit permissions from parent roles. System roles (is_system_role=True) + are the built-in roles migrated from the legacy hardcoded system and cannot + be deleted or have their core properties modified. + + Role hierarchy is implemented via parent_role_id. When checking permissions, + a role's effective permissions include both its directly assigned permissions + and all permissions inherited from ancestor roles. + """ + + @declared_attr + def __tablename__(cls) -> str: + return "rbac_role" + + name = Column( + String(255), + nullable=False, + unique=True, + comment="Human-readable display name for the role", + ) + key = Column( + String(255), + nullable=False, + unique=True, + index=True, + comment="Machine-readable key for the role, e.g., 'owner', 'custom_auditor'", + ) + description = Column( + Text, + nullable=True, + comment="Description of the role's purpose and access level", + ) + is_system_role = Column( + Boolean, + nullable=False, + server_default="false", + comment="True for built-in system roles that cannot be deleted", + ) + is_active = Column( + Boolean, + nullable=False, + server_default="true", + comment="Whether this role can be assigned to users", + ) + parent_role_id = Column( + String(255), + ForeignKey("rbac_role.id", ondelete="SET NULL"), + nullable=True, + index=True, + comment="Parent role ID for hierarchy. Child roles inherit parent permissions.", + ) + priority = Column( + Integer, + nullable=False, + server_default="0", + comment="Priority for conflict resolution. Higher values = more privileges.", + ) + + # Self-referential relationship for hierarchy + parent_role: RelationshipProperty[Optional["RBACRole"]] = relationship( + "RBACRole", + remote_side="RBACRole.id", + backref="child_roles", + foreign_keys=[parent_role_id], + ) + + # Permissions assigned directly to this role + permissions: RelationshipProperty[List["RBACPermission"]] = relationship( + "RBACPermission", + secondary="rbac_role_permission", + back_populates="roles", + lazy="selectin", + ) + + # User assignments for this role + user_assignments: RelationshipProperty[List["RBACUserRole"]] = relationship( + "RBACUserRole", + back_populates="role", + cascade="all, delete-orphan", + lazy="dynamic", + ) + + def __repr__(self) -> str: + return f"" + + def get_all_permissions(self, db: Session) -> List["RBACPermission"]: + """ + Get all permissions for this role, including inherited from parent roles. + + This method traverses the role hierarchy and collects all permissions + from this role and all ancestor roles. + + Args: + db: Database session (needed for lazy-loaded relationships) + + Returns: + List of unique RBACPermission objects + """ + seen_permission_ids: Set[str] = set() + all_permissions: List["RBACPermission"] = [] + + # Add direct permissions + for permission in self.permissions: + if permission.id not in seen_permission_ids and permission.is_active: + seen_permission_ids.add(permission.id) + all_permissions.append(permission) + + # Add inherited permissions from parent hierarchy + if self.parent_role: + for permission in self.parent_role.get_all_permissions(db): + if permission.id not in seen_permission_ids and permission.is_active: + seen_permission_ids.add(permission.id) + all_permissions.append(permission) + + return all_permissions + + def get_permission_codes(self, db: Session) -> List[str]: + """ + Get all permission codes for this role, including inherited. + + Args: + db: Database session + + Returns: + List of unique permission code strings + """ + return [p.code for p in self.get_all_permissions(db)] + + def get_direct_permission_codes(self) -> List[str]: + """ + Get only the directly assigned permission codes (not inherited). + + Returns: + List of permission code strings directly assigned to this role + """ + return [p.code for p in self.permissions if p.is_active] + + def get_inherited_permission_codes(self, db: Session) -> List[str]: + """ + Get only the inherited permission codes (from parent roles). + + Args: + db: Database session + + Returns: + List of permission codes inherited from parent roles + """ + direct_codes = set(self.get_direct_permission_codes()) + all_codes = set(self.get_permission_codes(db)) + return list(all_codes - direct_codes) + + def get_ancestor_roles(self) -> List["RBACRole"]: + """ + Get all ancestor roles in the hierarchy (parent, grandparent, etc.). + + Returns: + List of ancestor RBACRole objects, starting with immediate parent + """ + ancestors: List["RBACRole"] = [] + current = self.parent_role + while current: + ancestors.append(current) + current = current.parent_role + return ancestors diff --git a/src/fides/api/models/rbac/rbac_role_constraint.py b/src/fides/api/models/rbac/rbac_role_constraint.py new file mode 100644 index 00000000000..42c1a0359a9 --- /dev/null +++ b/src/fides/api/models/rbac/rbac_role_constraint.py @@ -0,0 +1,147 @@ +"""RBAC Role Constraint model for separation of duties and cardinality constraints.""" + +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Optional + +from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.orm import RelationshipProperty, relationship + +from fides.api.db.base_class import Base + +if TYPE_CHECKING: + from fides.api.models.rbac.rbac_role import RBACRole + + +class ConstraintType(str, Enum): + """Types of RBAC constraints.""" + + STATIC_SOD = "static_sod" + """ + Static Separation of Duties: Users cannot be assigned both roles simultaneously. + Checked at role assignment time. + """ + + DYNAMIC_SOD = "dynamic_sod" + """ + Dynamic Separation of Duties: Users cannot activate both roles in the same session. + Checked at runtime/permission evaluation time. + """ + + CARDINALITY = "cardinality" + """ + Cardinality constraint: Limits the number of users that can be assigned a role. + """ + + +class RBACRoleConstraint(Base): + """ + Separation of duties and cardinality constraints for RBAC. + + Constraints enforce security policies: + - Static SoD: Prevents users from holding conflicting roles + - Dynamic SoD: Prevents users from using conflicting roles in same session + - Cardinality: Limits how many users can have a role + + Examples: + - Static SoD: approver and contributor cannot be held by same user + - Cardinality: Only 3 users can be owners + """ + + @declared_attr + def __tablename__(cls) -> str: + return "rbac_role_constraint" + + name = Column( + String(255), + nullable=False, + comment="Human-readable name for this constraint", + ) + constraint_type = Column( + String(50), + nullable=False, + comment="Type of constraint: static_sod, dynamic_sod, or cardinality", + ) + role_id_1 = Column( + String(255), + ForeignKey("rbac_role.id", ondelete="CASCADE"), + nullable=False, + index=True, + comment="First role in the constraint (required for all types)", + ) + role_id_2 = Column( + String(255), + ForeignKey("rbac_role.id", ondelete="CASCADE"), + nullable=True, + index=True, + comment="Second role in the constraint (required for SoD, NULL for cardinality)", + ) + max_users = Column( + Integer, + nullable=True, + comment="Maximum number of users for cardinality constraint", + ) + description = Column( + Text, + nullable=True, + comment="Description of why this constraint exists", + ) + is_active = Column( + Boolean, + nullable=False, + server_default="true", + comment="Whether this constraint is currently enforced", + ) + + # Relationships + role_1: RelationshipProperty["RBACRole"] = relationship( + "RBACRole", + foreign_keys=[role_id_1], + lazy="selectin", + ) + role_2: RelationshipProperty[Optional["RBACRole"]] = relationship( + "RBACRole", + foreign_keys=[role_id_2], + lazy="selectin", + ) + + def __repr__(self) -> str: + if self.constraint_type == ConstraintType.CARDINALITY: + return f"" + return f" '{self.role_id_2}')>" + + def is_sod_constraint(self) -> bool: + """Check if this is a separation of duties constraint.""" + return self.constraint_type in ( + ConstraintType.STATIC_SOD, + ConstraintType.DYNAMIC_SOD, + ) + + def is_cardinality_constraint(self) -> bool: + """Check if this is a cardinality constraint.""" + return self.constraint_type == ConstraintType.CARDINALITY + + def involves_role(self, role_id: str) -> bool: + """Check if a role is involved in this constraint.""" + return role_id in (self.role_id_1, self.role_id_2) + + def get_conflicting_role_id(self, role_id: str) -> Optional[str]: + """ + For SoD constraints, get the role that conflicts with the given role. + + Args: + role_id: The role to check + + Returns: + The conflicting role ID, or None if not a SoD constraint or role not involved + """ + if not self.is_sod_constraint(): + return None + + if role_id == self.role_id_1: + return self.role_id_2 + if role_id == self.role_id_2: + return self.role_id_1 + return None diff --git a/src/fides/api/models/rbac/rbac_role_permission.py b/src/fides/api/models/rbac/rbac_role_permission.py new file mode 100644 index 00000000000..28f77371e4f --- /dev/null +++ b/src/fides/api/models/rbac/rbac_role_permission.py @@ -0,0 +1,50 @@ +"""RBAC Role-Permission junction table.""" + +from sqlalchemy import Column, DateTime, ForeignKey, String +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.sql import func + +from fides.api.db.base_class import Base + + +class RBACRolePermission(Base): + """ + Junction table for role-permission assignments. + + This is a many-to-many mapping between roles and permissions. + Each row represents a permission that is directly assigned to a role. + + Note: This table does not track inherited permissions. Inheritance is + computed at runtime via the role hierarchy. + """ + + @declared_attr + def __tablename__(cls) -> str: + return "rbac_role_permission" + + # Override Base.id - junction table uses composite PK instead + id = None # type: ignore[assignment] + # Override Base.updated_at - junction table doesn't need this + updated_at = None # type: ignore[assignment] + + # Composite primary key: (role_id, permission_id) + role_id = Column( + String(255), + ForeignKey("rbac_role.id", ondelete="CASCADE"), + primary_key=True, + comment="FK to rbac_role", + ) + permission_id = Column( + String(255), + ForeignKey("rbac_permission.id", ondelete="CASCADE"), + primary_key=True, + comment="FK to rbac_permission", + ) + created_at = Column( + DateTime(timezone=True), + server_default=func.now(), + comment="When this permission was assigned to the role", + ) + + def __repr__(self) -> str: + return f"" diff --git a/src/fides/api/models/rbac/rbac_user_role.py b/src/fides/api/models/rbac/rbac_user_role.py new file mode 100644 index 00000000000..30a5495c839 --- /dev/null +++ b/src/fides/api/models/rbac/rbac_user_role.py @@ -0,0 +1,175 @@ +"""RBAC User-Role assignment model with resource scoping and temporal validity.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Optional + +from sqlalchemy import Column, DateTime, ForeignKey, String, UniqueConstraint +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.orm import RelationshipProperty, relationship + +from fides.api.db.base_class import Base + +if TYPE_CHECKING: + from fides.api.models.fides_user import FidesUser + from fides.api.models.rbac.rbac_role import RBACRole + + +class RBACUserRole(Base): + """ + User to role assignment with optional resource scoping and temporal validity. + + This table supports: + - Global role assignments: resource_type and resource_id are NULL + - Resource-scoped assignments: permissions apply only to specific resources + - Temporal roles: valid_from/valid_until define when the assignment is active + + Examples: + - Global admin: role=owner, resource_type=NULL, resource_id=NULL + - System manager: role=system_manager, resource_type='system', resource_id='sys_123' + - Temporary access: valid_from=now, valid_until=now+30days + """ + + @declared_attr + def __tablename__(cls) -> str: + return "rbac_user_role" + + user_id = Column( + String(255), + ForeignKey("fidesuser.id", ondelete="CASCADE"), + nullable=False, + index=True, + comment="FK to fidesuser", + ) + role_id = Column( + String(255), + ForeignKey("rbac_role.id", ondelete="CASCADE"), + nullable=False, + index=True, + comment="FK to rbac_role", + ) + resource_type = Column( + String(100), + nullable=True, + comment="Resource type for scoped permissions, e.g., 'system'. NULL for global.", + ) + resource_id = Column( + String(255), + nullable=True, + comment="Specific resource ID for scoped permissions. NULL for global.", + ) + valid_from = Column( + DateTime(timezone=True), + server_default="now()", + nullable=True, + comment="When this assignment becomes active. NULL means immediately.", + ) + valid_until = Column( + DateTime(timezone=True), + nullable=True, + comment="When this assignment expires. NULL means never expires.", + ) + assigned_by = Column( + String(255), + ForeignKey("fidesuser.id", ondelete="SET NULL"), + nullable=True, + comment="User ID of who created this assignment", + ) + + # Relationships + user: RelationshipProperty["FidesUser"] = relationship( + "FidesUser", + foreign_keys=[user_id], + backref="rbac_role_assignments", + passive_deletes=True, # Let database handle CASCADE delete + ) + role: RelationshipProperty["RBACRole"] = relationship( + "RBACRole", + back_populates="user_assignments", + lazy="selectin", + ) + assigner: RelationshipProperty[Optional["FidesUser"]] = relationship( + "FidesUser", + foreign_keys=[assigned_by], + ) + + # Ensure unique combination of user, role, resource_type, resource_id + __table_args__ = ( + UniqueConstraint( + "user_id", + "role_id", + "resource_type", + "resource_id", + name="uq_rbac_user_role_assignment", + ), + ) + + def __repr__(self) -> str: + scope = ( + f"{self.resource_type}:{self.resource_id}" + if self.resource_type + else "global" + ) + return f"" + + def is_valid(self) -> bool: + """ + Check if this role assignment is currently valid based on temporal constraints. + + Returns: + True if the assignment is currently active, False otherwise + """ + now = datetime.now(timezone.utc) + + # Check valid_from + if self.valid_from: + # Ensure comparison is timezone-aware + valid_from = self.valid_from + if valid_from.tzinfo is None: + valid_from = valid_from.replace(tzinfo=timezone.utc) + if valid_from > now: + return False + + # Check valid_until + if self.valid_until: + # Ensure comparison is timezone-aware + valid_until = self.valid_until + if valid_until.tzinfo is None: + valid_until = valid_until.replace(tzinfo=timezone.utc) + if valid_until < now: + return False + + return True + + def is_global(self) -> bool: + """Check if this is a global (non-resource-scoped) assignment.""" + return self.resource_type is None and self.resource_id is None + + def matches_resource( + self, resource_type: Optional[str], resource_id: Optional[str] + ) -> bool: + """ + Check if this assignment applies to a specific resource. + + Args: + resource_type: The type of resource being accessed + resource_id: The specific resource ID being accessed + + Returns: + True if this assignment grants access to the resource + """ + # Global assignments match everything + if self.is_global(): + return True + + # Resource type must match + if self.resource_type != resource_type: + return False + + # If assignment has specific resource_id, it must match + if self.resource_id is not None: + return self.resource_id == resource_id + + # Assignment applies to all resources of this type + return True diff --git a/src/fides/api/models/sql_models.py b/src/fides/api/models/sql_models.py index c9d8d10d183..1d96563679d 100644 --- a/src/fides/api/models/sql_models.py +++ b/src/fides/api/models/sql_models.py @@ -50,6 +50,13 @@ from fides.api.models.client import ClientDetail from fides.api.models.fides_user import FidesUser from fides.api.models.fides_user_permissions import FidesUserPermissions +from fides.api.models.rbac import ( + RBACPermission, + RBACRole, + RBACRoleConstraint, + RBACRolePermission, + RBACUserRole, +) from fides.api.models.tcf_purpose_overrides import TCFPurposeOverride from fides.api.util.taxonomy_utils import find_undeclared_categories from fides.config import get_config @@ -870,6 +877,11 @@ class SystemScans(Base): "policy": PolicyCtl, "system": System, "evaluation": Evaluation, + "rbac_role": RBACRole, + "rbac_permission": RBACPermission, + "rbac_role_permission": RBACRolePermission, + "rbac_user_role": RBACUserRole, + "rbac_role_constraint": RBACRoleConstraint, } diff --git a/tests/api/models/test_rbac.py b/tests/api/models/test_rbac.py new file mode 100644 index 00000000000..d1679dfe82f --- /dev/null +++ b/tests/api/models/test_rbac.py @@ -0,0 +1,418 @@ +"""Tests for the RBAC models.""" + +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy.orm import Session + +from fides.api.models.fides_user import FidesUser +from fides.api.models.rbac import ( + RBACPermission, + RBACRole, + RBACRoleConstraint, + RBACRolePermission, + RBACUserRole, +) +from fides.api.models.rbac.rbac_role_constraint import ConstraintType + + +class TestRBACPermission: + """Tests for the RBACPermission model.""" + + def test_create_permission(self, db: Session): + """Test creating an RBACPermission record.""" + permission = RBACPermission.create( + db=db, + data={ + "code": "test:read", + "description": "Test read permission", + "resource_type": "test", + "is_active": True, + }, + check_name=False, + ) + db.commit() + + assert permission.code == "test:read" + assert permission.description == "Test read permission" + assert permission.resource_type == "test" + assert permission.is_active is True + + # Verify persistence + retrieved = ( + db.query(RBACPermission).filter(RBACPermission.code == "test:read").first() + ) + assert retrieved is not None + assert retrieved.code == "test:read" + + +class TestRBACRole: + """Tests for the RBACRole model.""" + + def test_create_custom_role(self, db: Session): + """Test creating a custom RBAC role.""" + role = RBACRole.create( + db=db, + data={ + "name": "Test Role", + "key": "test_role", + "description": "A test role for unit testing", + "priority": 10, + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + assert role.name == "Test Role" + assert role.key == "test_role" + assert role.description == "A test role for unit testing" + assert role.priority == 10 + assert role.is_system_role is False + assert role.is_active is True + + def test_create_role_with_parent(self, db: Session): + """Test creating a role with parent role hierarchy.""" + # Create parent role + parent_role = RBACRole.create( + db=db, + data={ + "name": "Parent Role", + "key": "parent_role", + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + # Create child role + child_role = RBACRole.create( + db=db, + data={ + "name": "Child Role", + "key": "child_role", + "parent_role_id": parent_role.id, + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + assert child_role.parent_role_id == parent_role.id + assert child_role.parent_role.id == parent_role.id + assert parent_role.child_roles[0].id == child_role.id + + def test_role_permission_inheritance(self, db: Session): + """Test that child roles inherit permissions from parent roles.""" + # Create permission + permission = RBACPermission.create( + db=db, + data={ + "code": "inherited:permission", + "description": "Permission to be inherited", + "is_active": True, + }, + check_name=False, + ) + db.commit() + + # Create parent role with permission + parent_role = RBACRole.create( + db=db, + data={ + "name": "Inheriting Parent", + "key": "inheriting_parent", + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + parent_role.permissions.append(permission) + db.commit() + + # Create child role + child_role = RBACRole.create( + db=db, + data={ + "name": "Inheriting Child", + "key": "inheriting_child", + "parent_role_id": parent_role.id, + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + # Test inheritance + all_permissions = child_role.get_all_permissions(db) + assert any(p.code == "inherited:permission" for p in all_permissions) + + # Test inherited permission codes + inherited_codes = child_role.get_inherited_permission_codes(db) + assert "inherited:permission" in inherited_codes + + +class TestRBACUserRole: + """Tests for the RBACUserRole model.""" + + def test_assign_role_to_user(self, db: Session, user: FidesUser): + """Test assigning a role to a user.""" + # Create role + role = RBACRole.create( + db=db, + data={ + "name": "User Test Role", + "key": "user_test_role", + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + # Assign role to user + user_role = RBACUserRole.create( + db=db, + data={ + "user_id": user.id, + "role_id": role.id, + "assigned_by": user.id, + }, + check_name=False, + ) + db.commit() + + assert user_role.user_id == user.id + assert user_role.role_id == role.id + assert user_role.role.key == "user_test_role" + assert user_role.is_valid() is True + + def test_scoped_role_assignment(self, db: Session, user: FidesUser): + """Test assigning a role with resource scoping.""" + role = RBACRole.create( + db=db, + data={ + "name": "Scoped Role", + "key": "scoped_role", + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + # Assign scoped role + user_role = RBACUserRole.create( + db=db, + data={ + "user_id": user.id, + "role_id": role.id, + "resource_type": "system", + "resource_id": "system_123", + }, + check_name=False, + ) + db.commit() + + assert user_role.resource_type == "system" + assert user_role.resource_id == "system_123" + assert user_role.is_global() is False + assert user_role.matches_resource("system", "system_123") is True + assert user_role.matches_resource("system", "system_456") is False + assert user_role.matches_resource("other", "system_123") is False + + def test_temporal_role_assignment(self, db: Session, user: FidesUser): + """Test temporal validity of role assignments.""" + role = RBACRole.create( + db=db, + data={ + "name": "Temporal Role", + "key": "temporal_role", + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + now = datetime.now(timezone.utc) + + # Create future role (not yet valid) + future_role = RBACUserRole.create( + db=db, + data={ + "user_id": user.id, + "role_id": role.id, + "valid_from": now + timedelta(days=1), + }, + check_name=False, + ) + db.commit() + + assert future_role.is_valid() is False + + # Create expired role + expired_role = RBACUserRole.create( + db=db, + data={ + "user_id": user.id, + "role_id": role.id, + "valid_until": now - timedelta(days=1), + }, + check_name=False, + ) + db.commit() + + assert expired_role.is_valid() is False + + +class TestRBACRoleConstraint: + """Tests for the RBACRoleConstraint model.""" + + def test_create_sod_constraint(self, db: Session): + """Test creating a separation of duties constraint.""" + # Create two roles + role1 = RBACRole.create( + db=db, + data={ + "name": "SOD Role 1", + "key": "sod_role_1", + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + role2 = RBACRole.create( + db=db, + data={ + "name": "SOD Role 2", + "key": "sod_role_2", + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + # Create SOD constraint + constraint = RBACRoleConstraint.create( + db=db, + data={ + "name": "Test SOD Constraint", + "constraint_type": ConstraintType.STATIC_SOD, + "role_id_1": role1.id, + "role_id_2": role2.id, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + assert constraint.name == "Test SOD Constraint" + assert constraint.constraint_type == ConstraintType.STATIC_SOD + assert constraint.role_id_1 == role1.id + assert constraint.role_id_2 == role2.id + + def test_create_cardinality_constraint(self, db: Session): + """Test creating a cardinality constraint.""" + role = RBACRole.create( + db=db, + data={ + "name": "Cardinality Role", + "key": "cardinality_role", + "is_system_role": False, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + # Create cardinality constraint + constraint = RBACRoleConstraint.create( + db=db, + data={ + "name": "Max 5 Users", + "constraint_type": ConstraintType.CARDINALITY, + "role_id_1": role.id, + "max_users": 5, + "is_active": True, + }, + check_name=False, + ) + db.commit() + + assert constraint.constraint_type == ConstraintType.CARDINALITY + assert constraint.max_users == 5 + + +class TestRBACScopeRegistrySync: + """Tests to ensure SCOPE_REGISTRY scopes exist in the RBAC migration. + + This prevents drift where scopes are added to SCOPE_REGISTRY but not to + the RBAC migration. Such drift would cause legacy scopes to not be + available in the database-driven RBAC system. + + Note: The reverse (RBAC-only scopes that don't exist in SCOPE_REGISTRY) + is intentionally allowed - new scopes can be added directly to RBAC + without needing to exist in the legacy system. + """ + + def test_all_scopes_in_registry_have_rbac_migration(self): + """Every scope in SCOPE_REGISTRY must exist in the RBAC seed migration. + + This test parses the migration file to extract hardcoded SCOPE_DOCS keys + and verifies all SCOPE_REGISTRY scopes are present. + """ + import ast + from pathlib import Path + + from fides.common.api.scope_registry import SCOPE_REGISTRY + + # Find the seed migration file + migrations_dir = ( + Path(__file__).parent.parent.parent.parent + / "src" + / "fides" + / "api" + / "alembic" + / "migrations" + / "versions" + ) + seed_migration = None + for f in migrations_dir.glob("*seed_rbac_defaults*.py"): + seed_migration = f + break + + assert seed_migration is not None, "Could not find RBAC seed migration file" + + # Parse the migration file to extract SCOPE_DOCS keys + migration_content = seed_migration.read_text() + + # Find the SCOPE_DOCS dictionary in the migration + tree = ast.parse(migration_content) + migration_scopes = set() + + for node in ast.walk(tree): + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Name) and target.id == "SCOPE_DOCS": + if isinstance(node.value, ast.Dict): + for key in node.value.keys: + if isinstance(key, ast.Constant): + migration_scopes.add(key.value) + + assert len(migration_scopes) > 0, "Could not parse SCOPE_DOCS from migration" + + registry_scopes = set(SCOPE_REGISTRY) + + # Check for scopes in registry but not in migration + missing_from_migration = registry_scopes - migration_scopes + assert not missing_from_migration, ( + f"The following scopes exist in SCOPE_REGISTRY but are missing from the " + f"RBAC seed migration (seed_rbac_defaults.py). Add them to SCOPE_DOCS in " + f"the migration:\n{sorted(missing_from_migration)}" + )