diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..af19793 --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +# Copy this file to .env and fill in the values. +# .env is git-ignored; .env.example is committed. + +# Required — SQLAlchemy database URL +# SQLite (development): +DATABASE_URL=sqlite:///./app.db +# PostgreSQL (production): +# DATABASE_URL=postgresql+psycopg://user:password@localhost:5432/mydb + +# Required — secret key for session signing (use a long random string in production) +SECRET_KEY=change-me-in-production diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..42934f8 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,94 @@ +name: CI + +on: + push: + branches: ["main", "dev", "v0.1"] + pull_request: + branches: ["main"] + +concurrency: + group: ci-${{ github.ref }} + cancel-in-progress: true + +jobs: + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install dev dependencies + run: pip install -e ".[dev]" + + - name: black + run: black --check internal_admin tests + + - name: isort + run: isort --check-only internal_admin tests + + - name: ruff + run: ruff check internal_admin tests + + typecheck: + name: Type check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install dev dependencies + run: pip install -e ".[dev]" + + - name: mypy + run: mypy internal_admin + + test: + name: Test (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + + - name: Install dependencies + run: pip install -e ".[dev]" + + - name: Run tests with coverage + run: | + pytest tests/ \ + --cov=internal_admin \ + --cov-report=xml \ + --cov-report=term-missing \ + -v + env: + DATABASE_URL: sqlite:///./test.db + SECRET_KEY: ci-test-secret-key + + - name: Upload coverage report + uses: codecov/codecov-action@v4 + if: matrix.python-version == '3.12' + with: + file: ./coverage.xml + fail_ci_if_error: false + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..143b72f --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,63 @@ +name: Release + +on: + push: + branches: ["main"] + +# Only one release job runs at a time; queued runs wait rather than cancel. +concurrency: + group: release + cancel-in-progress: false + +jobs: + release: + name: Semantic release + runs-on: ubuntu-latest + + # Prevent release from running on the automated version-bump commit itself. + if: "!contains(github.event.head_commit.message, '[skip ci]')" + + permissions: + id-token: write # required for PyPI trusted publishing + contents: write # required to push the version-bump commit and tag + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 # full history needed for semantic-release + token: ${{ secrets.GH_TOKEN }} + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install dev dependencies + run: pip install -e ".[dev]" + + - name: Configure git identity + run: | + git config --global user.name "github-actions[bot]" + git config --global user.email "github-actions[bot]@users.noreply.github.com" + + # python-semantic-release will: + # 1. Analyse commits since the last tag using Conventional Commits + # 2. Determine the version bump (patch / minor / major) + # 3. Update pyproject.toml and internal_admin/__init__.py + # 4. Update CHANGELOG.md + # 5. Commit, tag, and push + # 6. Build the sdist + wheel + # 7. Create a GitHub release and attach the build artefacts + # 8. Publish to PyPI via trusted publishing (OIDC — no token stored) + - name: Run semantic release + id: semantic + run: semantic-release version + env: + GH_TOKEN: ${{ secrets.GH_TOKEN }} + + - name: Publish to PyPI + if: steps.semantic.outputs.released == 'true' + run: semantic-release publish + env: + GH_TOKEN: ${{ secrets.GH_TOKEN }} diff --git a/.gitignore b/.gitignore index 15f9b4c..1cfe814 100644 --- a/.gitignore +++ b/.gitignore @@ -216,4 +216,12 @@ cookies.txt ai_contribute.md # sqlite db -*.db \ No newline at end of file +*.db + +# Release build artefacts +dist/ +build/ + +# .env — never commit credentials; commit .env.example instead +.env +.DS_Store \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..5fabfe9 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,54 @@ +repos: + # General hygiene + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.6.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-toml + - id: check-json + - id: check-merge-conflict + - id: check-added-large-files + args: ["--maxkb=500"] + - id: debug-statements + - id: mixed-line-ending + args: ["--fix=lf"] + + # Python import sorting + - repo: https://github.com/pycqa/isort + rev: 5.13.2 + hooks: + - id: isort + args: ["--profile", "black"] + + # Python formatting + - repo: https://github.com/psf/black + rev: 24.4.2 + hooks: + - id: black + + # Fast linting + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.4.4 + hooks: + - id: ruff + args: ["--fix"] + + # Conventional commit message enforcement + - repo: https://github.com/compilerla/conventional-pre-commit + rev: v3.4.0 + hooks: + - id: conventional-pre-commit + stages: [commit-msg] + args: + - feat + - fix + - perf + - refactor + - docs + - style + - test + - chore + - ci + - build diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ca7543..650813e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,42 +2,62 @@ All notable changes to this project will be documented in this file. -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +Releases are automated via [python-semantic-release](https://python-semantic-release.readthedocs.io/) +using [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/). -### Added -- Initial project structure -- Core architecture design -- AdminSite, ModelAdmin, AdminConfig public API -- SQLAlchemy 2.0 integration planning -- Authentication system design -- Permission system architecture -- Query engine design -- Form engine planning -- Bootstrap 5 template structure -- Development environment setup +--- -### Changed -- N/A +## [Unreleased] -### Deprecated -- N/A +--- -### Removed -- N/A +## [0.1.0] - 2026-03-04 -### Fixed -- N/A +### Added + +- `AdminSite`, `ModelAdmin`, and `AdminConfig` as the stable public API +- Automatic CRUD route generation per registered SQLAlchemy model + (`GET`/`POST` list, create, edit, delete with confirmation) +- Model registry with validation (primary key presence, declarative model check, + duplicate registration guard) +- Query engine with search, boolean/foreign-key filters, configurable ordering, + and pagination — all in a single pipeline +- Form engine: SQLAlchemy column introspection mapping types to HTML widgets + (text, textarea, checkbox, datetime-local, number, select) +- Session-based authentication with HTTP-only cookies and bcrypt password hashing +- Built-in `AdminUser` model (`admin_users` table) with `username`, `email`, + `is_active`, `is_superuser`, `created_at`, `last_login` +- `internal-admin createsuperuser` CLI command — reads `DATABASE_URL` and + `SECRET_KEY` from environment or `.env` file; no default users are ever seeded +- Activity logging: `ActivityLog` model records create, update, delete, and login + events with user, IP, user-agent, and timestamp +- Dashboard showing registered model counts and the 10 most recent activity + log entries +- Bootstrap 5 server-rendered UI (sidebar, navbar, list, form, confirm-delete + templates) — no JavaScript frameworks, no build pipeline +- Standalone login page (no sidebar/navbar) with client-side field validation, + show/hide password toggle, and loading state +- Logout confirmation modal +- User profile modal accessible from the navbar dropdown (display name, email, + role badge, member-since, last-login) +- `python-dotenv` integration: `.env` file loaded automatically in both the CLI + and demo server +- `demo_web.py` demo server registering `DemoCategory`, `DemoProduct`, and + `AdminUser` in the admin panel +- `CONTRIBUTING.md` and `CODE_OF_CONDUCT.md` ### Security -- N/A -## [0.1.0] - 2026-02-25 +- Passwords are hashed with bcrypt via passlib; plaintext passwords are never + stored or logged +- Session cookies are HTTP-only +- `password_hash` excluded from all admin forms by default when `AdminUser` is + registered -### Added -- Initial project setup -- Project architecture documentation -- Contributing guidelines -- Development environment configuration \ No newline at end of file +--- + +[Unreleased]: https://github.com/ayahaustine/internal-admin/compare/v0.1.0...HEAD +[0.1.0]: https://github.com/ayahaustine/internal-admin/releases/tag/v0.1.0 \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c53fb1b..a5cc1cd 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -55,20 +55,27 @@ source .venv/bin/activate # Install in editable mode with development dependencies pip install -e ".[dev]" + +# Install git hooks (enforces formatting and commit message format locally) +pre-commit install --install-hooks +pre-commit install --hook-type commit-msg ``` -To run the demo server: +To create a superuser and run the demo server: ```bash -python3 demo_web.py +internal-admin createsuperuser +python3 demo.py # Admin interface: http://localhost:8080/admin/ -# Login: admin / password123 ``` To run the test suite: ```bash pytest tests/ -v + +# With coverage +pytest tests/ --cov=internal_admin --cov-report=term-missing ``` --- @@ -197,22 +204,55 @@ Direct merges to `main` are not permitted. ## Versioning and Changelog -This project uses [Semantic Versioning](https://semver.org). +This project uses [Semantic Versioning](https://semver.org) and +[Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/). +**Releases are fully automated** — you do not manually bump the version. + +### Commit message format + +``` +(): + +[optional body] + +[optional footer] +``` + +| Type | Effect | +|------|--------| +| `feat:` | Increments **minor** version (0.x.0) | +| `fix:`, `perf:`, `refactor:` | Increments **patch** version (0.0.x) | +| `feat!:` or `BREAKING CHANGE:` in footer | Increments **major** version (x.0.0) | +| `docs:`, `chore:`, `style:`, `test:`, `ci:`, `build:` | No version bump | + +Examples: + +``` +feat(auth): add password reset via email +fix(query): prevent N+1 on foreign-key filter +feat!: remove deprecated `user_model` kwarg from AdminConfig +``` + +The pre-commit hook (`conventional-pre-commit`) enforces this format locally. +The CI pipeline validates it on every pull request. -| Change type | Version bump | -|---------------------|--------------| -| Bug fix | Patch (0.0.x) | -| New feature | Minor (0.x.0) | -| Breaking change | Major (x.0.0) | +### How a release is triggered -Every pull request that changes behavior must include a changelog entry in -`CHANGELOG.md` under the `[Unreleased]` section. Use the appropriate -category: `Added`, `Changed`, `Deprecated`, `Removed`, `Fixed`, or `Security`. +When a pull request is merged into `main`: -Breaking changes additionally require: +1. The **Release** GitHub Actions workflow runs. +2. `python-semantic-release` reads all commits since the last tag. +3. It determines the version bump from commit types. +4. If a bump is warranted it: + - Updates `version` in `pyproject.toml` + - Updates `__version__` in `internal_admin/__init__.py` + - Rewrites `CHANGELOG.md` + - Commits as `chore(release): [skip ci]` + - Creates and pushes a `v` tag + - Creates a GitHub release with build artefacts + - Publishes the wheel and sdist to PyPI -- A major version increment -- A migration note describing what callers need to change +If no commit since the last tag produces a version bump, no release occurs. --- diff --git a/README.md b/README.md index 4ae9c6c..5691dec 100644 --- a/README.md +++ b/README.md @@ -234,7 +234,7 @@ internal-admin createsuperuser Then start the server: ```bash -python3 demo_web.py +python3 demo.py ``` Open [http://localhost:8080/admin/](http://localhost:8080/admin/) and log in with the diff --git a/demo.py b/demo.py index facc8ac..044dfe8 100644 --- a/demo.py +++ b/demo.py @@ -1,185 +1,176 @@ #!/usr/bin/env python3 """ -Demo script for Internal Admin Framework. +Demo web server for Internal Admin Framework. -This script demonstrates the internal-admin framework setup and usage. +Configuration is read from a .env file or environment variables: + + DATABASE_URL=sqlite:///./demo.db + SECRET_KEY=your-secret-key + +Before starting the server for the first time, create a superuser: + + internal-admin createsuperuser """ -import asyncio -import sys import os -from pathlib import Path -# Add project root to path -project_root = Path(__file__).parent -sys.path.insert(0, str(project_root)) +try: + from dotenv import load_dotenv + load_dotenv(override=False) +except ImportError: + pass + +from fastapi import FastAPI, Request +from fastapi.responses import RedirectResponse +from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, create_engine +from sqlalchemy.orm import declarative_base, relationship, sessionmaker +from datetime import datetime +import uvicorn from internal_admin import AdminSite, AdminConfig, ModelAdmin from internal_admin.auth.models import AdminUser -from internal_admin.auth.security import hash_password -# Import example models from example.py -from example import User, Category, Product, CategoryAdmin, ProductAdmin +class AdminUserAdmin(ModelAdmin): + list_display = ["id", "username", "email", "is_superuser", "is_active", "created_at"] + search_fields = ["username", "email"] + list_filter = ["is_superuser", "is_active"] + ordering = ["username"] + readonly_fields = ["created_at", "last_login"] + exclude_fields = ["password_hash"] -def print_header(title: str): - """Print a formatted header.""" - print("\n" + "=" * 60) - print(f"🎯 {title}") - print("=" * 60) +# Demo models +Base = declarative_base() -def print_success(message: str): - """Print a success message.""" - print(f"✅ {message}") +class DemoCategory(Base): + """Demo category model.""" + __tablename__ = "demo_categories" + + id = Column(Integer, primary_key=True) + name = Column(String(100), nullable=False, unique=True) + description = Column(String(255)) + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=datetime.utcnow) + + products = relationship("DemoProduct", back_populates="category") -def print_info(message: str): - """Print an info message.""" - print(f"ℹ️ {message}") +class DemoProduct(Base): + """Demo product model.""" + __tablename__ = "demo_products" + + id = Column(Integer, primary_key=True) + name = Column(String(100), nullable=False) + description = Column(String(500)) + price = Column(Integer) # Price in cents + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=datetime.utcnow) + category_id = Column(Integer, ForeignKey("demo_categories.id")) + + category = relationship("DemoCategory", back_populates="products") -def print_warning(message: str): - """Print a warning message.""" - print(f"⚠️ {message}") +# Admin classes +class DemoCategoryAdmin(ModelAdmin): + list_display = ["id", "name", "is_active", "created_at"] + search_fields = ["name", "description"] + list_filter = ["is_active"] + ordering = ["name"] -def main(): - """Run the internal-admin demo.""" - - print_header("Internal Admin Framework Demo") - - print_info("This demo shows the setup and capabilities of the internal-admin framework") - print_info("A Django-style admin interface for FastAPI applications") - - # 1. Configuration Demo - print_header("1. Configuration Setup") - +class DemoProductAdmin(ModelAdmin): + list_display = ["id", "name", "price", "description", "is_active"] + search_fields = ["name", "description"] + list_filter = ["is_active", "category_id"] + + +def create_demo_data(session): + """Seed demo categories and products. Never creates users.""" + if session.query(DemoCategory).count() > 0: + return + + electronics = DemoCategory(name="Electronics", description="Electronic devices") + books = DemoCategory(name="Books", description="Books and literature") + session.add_all([electronics, books]) + session.commit() + + products = [ + DemoProduct(name="Laptop", description="Gaming laptop", price=99999, category=electronics), + DemoProduct(name="Phone", description="Smartphone", price=79999, category=electronics), + DemoProduct(name="Python Guide", description="Learn Python programming", price=2999, category=books), + DemoProduct(name="Django Book", description="Web development with Django", price=3499, category=books), + ] + session.add_all(products) + session.commit() + + +def create_demo_app(): + """Create demo FastAPI app with authentication.""" + + app = FastAPI( + title="Internal Admin Demo", + description="Demo of internal-admin framework.", + version="1.0.0", + ) + + # Read config from environment / .env file + database_url = os.environ.get("DATABASE_URL", "sqlite:///./demo.db") + secret_key = os.environ.get("SECRET_KEY", "demo-secret-key-change-in-production") + config = AdminConfig( - database_url="sqlite:///./demo.db", - secret_key="demo-secret-key-change-in-production", - user_model=User, - debug=True + database_url=database_url, + secret_key=secret_key, + user_model=AdminUser, + debug=True, ) - - print_success("AdminConfig created") - print_info(f"Database: {config.database_url}") - print_info(f"Database Type: {'SQLite' if config.is_sqlite else 'PostgreSQL'}") - print_info(f"Debug Mode: {config.debug}") - - # 2. Admin Site Setup - print_header("2. AdminSite Setup") - + admin = AdminSite(config) - print_success("AdminSite created") - - # 3. Model Registration - print_header("3. Model Registration") - - # Register models with their admin classes - admin.register(Category, CategoryAdmin) - admin.register(Product, ProductAdmin) - admin.register(User) # Uses default ModelAdmin - - print_success("Category registered with CategoryAdmin") - print_success("Product registered with ProductAdmin") - print_success("User registered with default ModelAdmin") - - # Show registered models - registered_models = admin.get_registered_models() - print_info(f"Total registered models: {len(registered_models)}") - - for model_class, admin_class in registered_models.items(): - print_info(f" • {model_class.__name__} → {admin_class.__name__}") - - # 4. ModelAdmin Configuration Examples - print_header("4. ModelAdmin Configuration Examples") - - category_admin = admin.get_model_admin(Category) - print_info("CategoryAdmin Configuration:") - print_info(f" • List Display: {category_admin.get_list_display()}") - print_info(f" • Search Fields: {category_admin.get_search_fields()}") - print_info(f" • List Filters: {category_admin.get_list_filter()}") - print_info(f" • Ordering: {category_admin.get_ordering()}") - - product_admin = admin.get_model_admin(Product) - print_info("ProductAdmin Configuration:") - print_info(f" • List Display: {product_admin.get_list_display()}") - print_info(f" • Search Fields: {product_admin.get_search_fields()}") - print_info(f" • List Filters: {product_admin.get_list_filter()}") - print_info(f" • Read-only Fields: {product_admin.get_readonly_fields()}") - - # 5. Architecture Overview - print_header("5. Framework Architecture") - - print_info("🏗️ Architecture Components:") - print_info(" • AdminSite - Central orchestrator") - print_info(" • ModelAdmin - Per-model configuration") - print_info(" • AdminConfig - Configuration container") - print_info(" • Registry - Model registration system") - print_info(" • Query Engine - Database query pipeline") - print_info(" • Form Engine - Form generation and validation") - print_info(" • Router Factory - Dynamic route generation") - print_info(" • Authentication - Session-based auth system") - print_info(" • Permission System - Role-based access control") - - # 6. Generated Routes Overview - print_header("6. Generated Admin Routes") + admin.register(DemoCategory, DemoCategoryAdmin) + admin.register(DemoProduct, DemoProductAdmin) + admin.register(AdminUser, AdminUserAdmin) - print_info("🛣️ Auto-generated Routes per Model:") - print_info(" • GET /admin/{model}/ - List view") - print_info(" • GET /admin/{model}/create/ - Create form") - print_info(" • POST /admin/{model}/create/ - Create submit") - print_info(" • GET /admin/{model}/{id}/ - Edit form") - print_info(" • POST /admin/{model}/{id}/ - Edit submit") - print_info(" • GET /admin/{model}/{id}/delete/ - Delete confirmation") - print_info(" • POST /admin/{model}/{id}/delete/ - Delete submit") + # Mount admin interface + admin.mount(app) - print_info("🔐 Authentication Routes:") - print_info(" • GET /admin/login - Login form") - print_info(" • POST /admin/login - Login submit") - print_info(" • POST /admin/logout - Logout") + # Create database and demo data + from internal_admin.database.admin_tables import create_admin_tables + engine = create_engine(config.database_url) - print_info("📊 Dashboard:") - print_info(" • GET /admin/ - Admin dashboard") - - # 7. Features Summary - print_header("7. Key Features") - - features = [ - "🎨 Django-style admin interface", - "📝 Automatic CRUD generation from SQLAlchemy models", - "🔍 Built-in search and filtering", - "📄 Pagination with configurable page sizes", - "🔐 Session-based authentication", - "🛡️ Role-based permission system", - "📱 Responsive Bootstrap 5 UI", - "🗄️ SQLite & PostgreSQL support", - "⚡ FastAPI integration", - "🎛️ Extensible via ModelAdmin classes", - "🔧 Form validation and type conversion", - "🪝 Model lifecycle hooks (before_save, after_save, etc.)", - "🎯 Zero frontend build tools required", - "📦 Pip-installable package" - ] - - for feature in features: - print_info(f" {feature}") - - # 8. Next Steps - print_header("8. Next Steps") - - print_info("To try the admin interface:") - print_info("1. Run: python create_superuser.py") - print_info("2. Run: python example.py") - print_info("3. Visit: http://localhost:8000/admin/") + # Create both demo tables and admin tables + Base.metadata.create_all(engine) # Demo tables + create_admin_tables(engine) # Admin tables (users, activity logs) + + Session = sessionmaker(bind=engine) + session = Session() + create_demo_data(session) + session.close() - print_warning("Note: This is a demonstration - use proper secrets in production!") + # Root redirect + @app.get("/") + async def root(): + return RedirectResponse(url="/admin/", status_code=302) - print_header("Demo Complete") - print_success("The internal-admin framework is ready to use!") - print_info("Check the example.py file for a complete FastAPI application setup") + return app if __name__ == "__main__": - main() \ No newline at end of file + db = os.environ.get("DATABASE_URL", "sqlite:///./demo.db") + print("Internal Admin Demo Server") + print("=" * 60) + print("Admin Interface: http://localhost:8080/admin/") + print("Database: " + db) + print("=" * 60) + print("No default users are created.") + print("Run `internal-admin createsuperuser` first if you have") + print("not already done so.") + print("=" * 60) + + app = create_demo_app() + + uvicorn.run( + app, + host="127.0.0.1", + port=8080, + reload=False + ) \ No newline at end of file diff --git a/demo_web.py b/demo_web.py deleted file mode 100644 index 044dfe8..0000000 --- a/demo_web.py +++ /dev/null @@ -1,176 +0,0 @@ -#!/usr/bin/env python3 -""" -Demo web server for Internal Admin Framework. - -Configuration is read from a .env file or environment variables: - - DATABASE_URL=sqlite:///./demo.db - SECRET_KEY=your-secret-key - -Before starting the server for the first time, create a superuser: - - internal-admin createsuperuser -""" - -import os - -try: - from dotenv import load_dotenv - load_dotenv(override=False) -except ImportError: - pass - -from fastapi import FastAPI, Request -from fastapi.responses import RedirectResponse -from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, create_engine -from sqlalchemy.orm import declarative_base, relationship, sessionmaker -from datetime import datetime -import uvicorn - -from internal_admin import AdminSite, AdminConfig, ModelAdmin -from internal_admin.auth.models import AdminUser - - -class AdminUserAdmin(ModelAdmin): - list_display = ["id", "username", "email", "is_superuser", "is_active", "created_at"] - search_fields = ["username", "email"] - list_filter = ["is_superuser", "is_active"] - ordering = ["username"] - readonly_fields = ["created_at", "last_login"] - exclude_fields = ["password_hash"] - -# Demo models -Base = declarative_base() - - -class DemoCategory(Base): - """Demo category model.""" - __tablename__ = "demo_categories" - - id = Column(Integer, primary_key=True) - name = Column(String(100), nullable=False, unique=True) - description = Column(String(255)) - is_active = Column(Boolean, default=True) - created_at = Column(DateTime, default=datetime.utcnow) - - products = relationship("DemoProduct", back_populates="category") - - -class DemoProduct(Base): - """Demo product model.""" - __tablename__ = "demo_products" - - id = Column(Integer, primary_key=True) - name = Column(String(100), nullable=False) - description = Column(String(500)) - price = Column(Integer) # Price in cents - is_active = Column(Boolean, default=True) - created_at = Column(DateTime, default=datetime.utcnow) - category_id = Column(Integer, ForeignKey("demo_categories.id")) - - category = relationship("DemoCategory", back_populates="products") - - -# Admin classes -class DemoCategoryAdmin(ModelAdmin): - list_display = ["id", "name", "is_active", "created_at"] - search_fields = ["name", "description"] - list_filter = ["is_active"] - ordering = ["name"] - - -class DemoProductAdmin(ModelAdmin): - list_display = ["id", "name", "price", "description", "is_active"] - search_fields = ["name", "description"] - list_filter = ["is_active", "category_id"] - - -def create_demo_data(session): - """Seed demo categories and products. Never creates users.""" - if session.query(DemoCategory).count() > 0: - return - - electronics = DemoCategory(name="Electronics", description="Electronic devices") - books = DemoCategory(name="Books", description="Books and literature") - session.add_all([electronics, books]) - session.commit() - - products = [ - DemoProduct(name="Laptop", description="Gaming laptop", price=99999, category=electronics), - DemoProduct(name="Phone", description="Smartphone", price=79999, category=electronics), - DemoProduct(name="Python Guide", description="Learn Python programming", price=2999, category=books), - DemoProduct(name="Django Book", description="Web development with Django", price=3499, category=books), - ] - session.add_all(products) - session.commit() - - -def create_demo_app(): - """Create demo FastAPI app with authentication.""" - - app = FastAPI( - title="Internal Admin Demo", - description="Demo of internal-admin framework.", - version="1.0.0", - ) - - # Read config from environment / .env file - database_url = os.environ.get("DATABASE_URL", "sqlite:///./demo.db") - secret_key = os.environ.get("SECRET_KEY", "demo-secret-key-change-in-production") - - config = AdminConfig( - database_url=database_url, - secret_key=secret_key, - user_model=AdminUser, - debug=True, - ) - - admin = AdminSite(config) - admin.register(DemoCategory, DemoCategoryAdmin) - admin.register(DemoProduct, DemoProductAdmin) - admin.register(AdminUser, AdminUserAdmin) - - # Mount admin interface - admin.mount(app) - - # Create database and demo data - from internal_admin.database.admin_tables import create_admin_tables - engine = create_engine(config.database_url) - - # Create both demo tables and admin tables - Base.metadata.create_all(engine) # Demo tables - create_admin_tables(engine) # Admin tables (users, activity logs) - - Session = sessionmaker(bind=engine) - session = Session() - create_demo_data(session) - session.close() - - # Root redirect - @app.get("/") - async def root(): - return RedirectResponse(url="/admin/", status_code=302) - - return app - - -if __name__ == "__main__": - db = os.environ.get("DATABASE_URL", "sqlite:///./demo.db") - print("Internal Admin Demo Server") - print("=" * 60) - print("Admin Interface: http://localhost:8080/admin/") - print("Database: " + db) - print("=" * 60) - print("No default users are created.") - print("Run `internal-admin createsuperuser` first if you have") - print("not already done so.") - print("=" * 60) - - app = create_demo_app() - - uvicorn.run( - app, - host="127.0.0.1", - port=8080, - reload=False - ) \ No newline at end of file diff --git a/internal_admin/__init__.py b/internal_admin/__init__.py index f32ccbe..93f38d7 100644 --- a/internal_admin/__init__.py +++ b/internal_admin/__init__.py @@ -9,15 +9,31 @@ - AdminConfig: Configuration container for admin settings """ +from __future__ import annotations + +# python-semantic-release manages this string directly. +# importlib.metadata is used as the authoritative source at runtime +# once the package is installed, so both stay in sync after a release. __version__ = "0.1.0" -__author__ = "Internal Admin Team" +try: + from importlib.metadata import PackageNotFoundError, version + try: + __version__ = version("internal-admin") + except PackageNotFoundError: + pass # running from source without an editable install +except ImportError: + pass # Python < 3.8 fallback (shouldn't happen given requires-python) + +__author__ = "Ayah Austine" + +from .admin.model_admin import ModelAdmin from .config import AdminConfig from .site import AdminSite -from .admin.model_admin import ModelAdmin __all__ = [ "AdminSite", - "ModelAdmin", + "ModelAdmin", "AdminConfig", -] \ No newline at end of file + "__version__", +] diff --git a/internal_admin/admin/__init__.py b/internal_admin/admin/__init__.py index 975bf48..96a527e 100644 --- a/internal_admin/admin/__init__.py +++ b/internal_admin/admin/__init__.py @@ -9,14 +9,14 @@ - Filter system for list views """ +from .form_engine import FormEngine from .model_admin import ModelAdmin from .query_engine import QueryEngine -from .form_engine import FormEngine from .router_factory import AdminRouterFactory __all__ = [ "ModelAdmin", - "QueryEngine", + "QueryEngine", "FormEngine", "AdminRouterFactory", -] \ No newline at end of file +] diff --git a/internal_admin/admin/filters.py b/internal_admin/admin/filters.py index 1713022..b780877 100644 --- a/internal_admin/admin/filters.py +++ b/internal_admin/admin/filters.py @@ -5,10 +5,11 @@ including field-based filters and search functionality. """ -from typing import Any, Dict, List, Optional, Tuple, Type from abc import ABC, abstractmethod +from typing import Any + +from sqlalchemy import Boolean, Date, DateTime from sqlalchemy.orm import Session -from sqlalchemy import Column, Boolean, Integer, String, DateTime, Date from .model_admin import ModelAdmin @@ -16,45 +17,45 @@ class BaseFilter(ABC): """ Abstract base class for admin filters. - + Filters provide a way to limit the objects shown in list views based on field values or other criteria. """ - - def __init__(self, field_name: str, title: Optional[str] = None) -> None: + + def __init__(self, field_name: str, title: str | None = None) -> None: """ Initialize filter. - + Args: field_name: Name of model field to filter on title: Display title for filter (defaults to field name) """ self.field_name = field_name self.title = title or field_name.replace('_', ' ').title() - + @abstractmethod - def get_choices(self, session: Session, model_class: Type[Any]) -> List[Tuple[Any, str]]: + def get_choices(self, session: Session, model_class: type[Any]) -> list[tuple[Any, str]]: """ Get available filter choices. - + Args: session: SQLAlchemy session model_class: Model class being filtered - + Returns: List of (value, display_name) tuples """ pass - + @abstractmethod def apply_filter(self, query: Any, value: Any) -> Any: """ Apply filter to query. - + Args: query: SQLAlchemy query to modify value: Filter value selected by user - + Returns: Modified query """ @@ -64,38 +65,38 @@ def apply_filter(self, query: Any, value: Any) -> Any: class FieldFilter(BaseFilter): """ Generic filter for model fields. - + Automatically determines filter behavior based on field type. """ - - def get_choices(self, session: Session, model_class: Type[Any]) -> List[Tuple[Any, str]]: + + def get_choices(self, session: Session, model_class: type[Any]) -> list[tuple[Any, str]]: """Get distinct values for the field.""" if not hasattr(model_class, self.field_name): return [] - + field = getattr(model_class, self.field_name) - + # Get distinct non-null values distinct_query = session.query(field).distinct().filter(field.is_not(None)) - + choices = [] for (value,) in distinct_query.all(): display_name = str(value) if value is not None else "N/A" choices.append((value, display_name)) - + # Sort by display name choices.sort(key=lambda x: x[1]) - + return choices - + def apply_filter(self, query: Any, value: Any) -> Any: """Apply exact match filter.""" if not value or not hasattr(query.column_descriptions[0]['type'], self.field_name): return query - + model_class = query.column_descriptions[0]['type'] field = getattr(model_class, self.field_name) - + return query.filter(field == value) @@ -103,28 +104,28 @@ class BooleanFilter(BaseFilter): """ Filter for boolean fields with Yes/No choices. """ - - def get_choices(self, session: Session, model_class: Type[Any]) -> List[Tuple[Any, str]]: + + def get_choices(self, session: Session, model_class: type[Any]) -> list[tuple[Any, str]]: """Return Yes/No choices for boolean field.""" return [ (True, "Yes"), (False, "No"), ] - + def apply_filter(self, query: Any, value: Any) -> Any: """Apply boolean filter.""" if value is None: return query - + model_class = query.column_descriptions[0]['type'] field = getattr(model_class, self.field_name) - + # Convert string values to boolean if isinstance(value, str): bool_value = value.lower() in ('true', '1', 'yes') else: bool_value = bool(value) - + return query.filter(field == bool_value) @@ -132,35 +133,35 @@ class DateRangeFilter(BaseFilter): """ Filter for date fields with predefined ranges. """ - - def get_choices(self, session: Session, model_class: Type[Any]) -> List[Tuple[Any, str]]: + + def get_choices(self, session: Session, model_class: type[Any]) -> list[tuple[Any, str]]: """Return predefined date range choices.""" from datetime import datetime, timedelta - + today = datetime.now().date() - week_ago = today - timedelta(days=7) - month_ago = today - timedelta(days=30) - year_ago = today - timedelta(days=365) - + today - timedelta(days=7) + today - timedelta(days=30) + today - timedelta(days=365) + return [ ("today", "Today"), ("week", "Last 7 days"), ("month", "Last 30 days"), ("year", "Last year"), ] - + def apply_filter(self, query: Any, value: Any) -> Any: """Apply date range filter.""" if not value: return query - + from datetime import datetime, timedelta - + model_class = query.column_descriptions[0]['type'] field = getattr(model_class, self.field_name) - + today = datetime.now().date() - + if value == "today": return query.filter(field == today) elif value == "week": @@ -172,7 +173,7 @@ def apply_filter(self, query: Any, value: Any) -> Any: elif value == "year": year_ago = today - timedelta(days=365) return query.filter(field >= year_ago) - + return query @@ -180,16 +181,16 @@ class ForeignKeyFilter(BaseFilter): """ Filter for foreign key relationships. """ - + def __init__( - self, - field_name: str, - title: Optional[str] = None, + self, + field_name: str, + title: str | None = None, display_field: str = "id" ) -> None: """ Initialize foreign key filter. - + Args: field_name: Name of foreign key field title: Display title @@ -197,39 +198,39 @@ def __init__( """ super().__init__(field_name, title) self.display_field = display_field - - def get_choices(self, session: Session, model_class: Type[Any]) -> List[Tuple[Any, str]]: + + def get_choices(self, session: Session, model_class: type[Any]) -> list[tuple[Any, str]]: """Get choices from related model.""" if not hasattr(model_class, self.field_name): return [] - + # This is simplified - in practice you'd need proper relationship introspection # For now, return empty choices return [] - + def apply_filter(self, query: Any, value: Any) -> Any: """Apply foreign key filter.""" if not value: return query - + model_class = query.column_descriptions[0]['type'] field = getattr(model_class, self.field_name) - + return query.filter(field == value) class FilterManager: """ Manages filters for a ModelAdmin. - + Automatically creates appropriate filters based on model fields and ModelAdmin configuration. """ - + def __init__(self, model_admin: ModelAdmin) -> None: """ Initialize FilterManager. - + Args: model_admin: ModelAdmin instance """ @@ -237,42 +238,42 @@ def __init__(self, model_admin: ModelAdmin) -> None: self.model = model_admin.model self._filters = {} self._create_filters() - + def _create_filters(self) -> None: """Create filters based on ModelAdmin configuration.""" filter_fields = self.model_admin.get_list_filter() - + for field_name in filter_fields: filter_obj = self._create_filter_for_field(field_name) if filter_obj: self._filters[field_name] = filter_obj - - def _create_filter_for_field(self, field_name: str) -> Optional[BaseFilter]: + + def _create_filter_for_field(self, field_name: str) -> BaseFilter | None: """ Create appropriate filter for a field. - + Args: field_name: Name of field to create filter for - + Returns: Filter instance or None """ if not hasattr(self.model, field_name): return None - + # Find the column in the table column = None for col in self.model.__table__.columns: if col.name == field_name: column = col break - + if column is None: return None - + # Create filter based on column type column_type = type(column.type) - + if column_type == Boolean: return BooleanFilter(field_name) elif column_type in (DateTime, Date): @@ -281,49 +282,49 @@ def _create_filter_for_field(self, field_name: str) -> Optional[BaseFilter]: return ForeignKeyFilter(field_name) else: return FieldFilter(field_name) - - def get_filters(self) -> Dict[str, BaseFilter]: + + def get_filters(self) -> dict[str, BaseFilter]: """ Get all configured filters. - + Returns: Dictionary of field_name -> Filter """ return self._filters.copy() - - def get_filter(self, field_name: str) -> Optional[BaseFilter]: + + def get_filter(self, field_name: str) -> BaseFilter | None: """ Get filter for specific field. - + Args: field_name: Name of field - + Returns: Filter instance or None """ return self._filters.get(field_name) - - def get_filter_context(self, session: Session, current_filters: Dict[str, Any]) -> Dict[str, Any]: + + def get_filter_context(self, session: Session, current_filters: dict[str, Any]) -> dict[str, Any]: """ Get template context for filters. - + Args: session: SQLAlchemy session current_filters: Currently applied filter values - + Returns: Template context for filters """ filter_context = {} - + for field_name, filter_obj in self._filters.items(): choices = filter_obj.get_choices(session, self.model) current_value = current_filters.get(field_name) - + filter_context[field_name] = { 'title': filter_obj.title, 'choices': choices, 'current_value': current_value, } - - return filter_context \ No newline at end of file + + return filter_context diff --git a/internal_admin/admin/form_engine.py b/internal_admin/admin/form_engine.py index ac4424d..2858579 100644 --- a/internal_admin/admin/form_engine.py +++ b/internal_admin/admin/form_engine.py @@ -5,10 +5,11 @@ based on SQLAlchemy model metadata. """ -from typing import Any, Dict, List, Optional, Type, Union from dataclasses import dataclass -from datetime import datetime, date -from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, Date, Float +from datetime import datetime +from typing import Any + +from sqlalchemy import Boolean, Column, Date, DateTime, Float, Integer, String, Text from sqlalchemy.orm import Session from sqlalchemy.sql.sqltypes import TypeDecorator @@ -25,15 +26,15 @@ class FormField: field_type: str required: bool default_value: Any = None - choices: Optional[List[tuple]] = None + choices: list[tuple] | None = None readonly: bool = False - help_text: Optional[str] = None + help_text: str | None = None class FormEngine: """ Generates and processes forms based on SQLAlchemy models. - + Responsibilities: - Inspect model columns and generate form fields - Map SQLAlchemy types to HTML input types @@ -41,92 +42,92 @@ class FormEngine: - Handle foreign key relationships - Process form submissions """ - + def __init__(self, model_admin: ModelAdmin) -> None: """ Initialize FormEngine for a ModelAdmin. - + Args: model_admin: ModelAdmin instance """ self.model_admin = model_admin self.model = model_admin.model self._type_mapping = self._get_type_mapping() - - def generate_form_fields(self, session: Session, instance: Optional[Any] = None) -> List[FormField]: + + def generate_form_fields(self, session: Session, instance: Any | None = None) -> list[FormField]: """ Generate form fields for the model. - + Args: session: SQLAlchemy session for foreign key choices instance: Optional existing instance for editing - + Returns: List of FormField objects """ fields = [] form_field_names = self.model_admin.get_form_fields() readonly_fields = self.model_admin.get_readonly_fields() - + for field_name in form_field_names: if not hasattr(self.model, field_name): continue - + column = None for col in self.model.__table__.columns: if col.name == field_name: column = col break - + if column is None: continue - + field = self._create_form_field( column=column, session=session, instance=instance, readonly=field_name in readonly_fields ) - + if field: fields.append(field) - + return fields - + def _create_form_field( self, column: Column, session: Session, - instance: Optional[Any] = None, + instance: Any | None = None, readonly: bool = False - ) -> Optional[FormField]: + ) -> FormField | None: """ Create a FormField from a SQLAlchemy column. - + Args: column: SQLAlchemy column session: Database session instance: Optional model instance for current values readonly: Whether field should be read-only - + Returns: FormField or None if field should be skipped """ field_name = column.name - + # Skip primary key for create forms if column.primary_key and instance is None: return None - + # Get field type mapping field_type = self._map_column_type(column) - + # Create label from field name label = field_name.replace('_', ' ').title() - + # Determine if required required = not column.nullable and column.default is None - + # Get default value default_value = None if instance: @@ -134,13 +135,13 @@ def _create_form_field( elif column.default is not None: if hasattr(column.default, 'arg'): default_value = column.default.arg - + # Handle foreign key relationships choices = None if column.foreign_keys: choices = self._get_foreign_key_choices(column, session) field_type = "select" - + return FormField( name=field_name, label=label, @@ -150,137 +151,135 @@ def _create_form_field( choices=choices, readonly=readonly or column.primary_key ) - + def _map_column_type(self, column: Column) -> str: """ Map SQLAlchemy column type to HTML input type. - + Args: column: SQLAlchemy column - + Returns: HTML input type string """ column_type = type(column.type) - + # Handle type decorators if isinstance(column.type, TypeDecorator): column_type = type(column.type.impl) - + return self._type_mapping.get(column_type, "text") - - def _get_type_mapping(self) -> Dict[Type, str]: + + def _get_type_mapping(self) -> dict[type, str]: """ Get mapping from SQLAlchemy types to HTML input types. - + Returns: Dictionary mapping SQLAlchemy types to HTML input types """ return { String: "text", - Text: "textarea", + Text: "textarea", Integer: "number", Float: "number", Boolean: "checkbox", DateTime: "datetime-local", Date: "date", } - - def _get_foreign_key_choices(self, column: Column, session: Session) -> List[tuple]: + + def _get_foreign_key_choices(self, column: Column, session: Session) -> list[tuple]: """ Get choices for a foreign key field. - + Args: column: Foreign key column session: Database session - + Returns: List of (value, label) tuples """ choices = [("", "-- Select --")] - + # Get the referenced table and model - foreign_key = list(column.foreign_keys)[0] - referenced_table = foreign_key.column.table - + list(column.foreign_keys)[0] + # Find the model class for the referenced table # This is a simplified approach - in practice, you might need # a more sophisticated model registry lookup try: # Try to find model class by table name # This requires models to be registered or discoverable - referenced_model = None - + # For now, skip foreign key choices - can be implemented later # when we have better model discovery return choices - + except Exception: return choices - - def validate_form_data(self, form_data: Dict[str, Any]) -> Dict[str, Any]: + + def validate_form_data(self, form_data: dict[str, Any]) -> dict[str, Any]: """ Validate and convert form data. - + Args: form_data: Raw form data from request - + Returns: Validated and converted data - + Raises: ValueError: If validation fails """ validated_data = {} errors = [] - + form_field_names = self.model_admin.get_form_fields() - + for field_name in form_field_names: if not hasattr(self.model, field_name): continue - + column = None for col in self.model.__table__.columns: if col.name == field_name: column = col break - + if column is None: continue - + # Skip readonly fields if field_name in self.model_admin.get_readonly_fields(): continue - + # Skip primary key for new objects if column.primary_key: continue - + raw_value = form_data.get(field_name) - + try: validated_value = self._convert_field_value(column, raw_value) validated_data[field_name] = validated_value except ValueError as e: errors.append(f"{field_name}: {str(e)}") - + if errors: raise ValueError("; ".join(errors)) - + return validated_data - + def _convert_field_value(self, column: Column, raw_value: Any) -> Any: """ Convert and validate a field value. - + Args: column: SQLAlchemy column raw_value: Raw value from form - + Returns: Converted value - + Raises: ValueError: If conversion fails """ @@ -288,13 +287,13 @@ def _convert_field_value(self, column: Column, raw_value: Any) -> Any: if not column.nullable and column.default is None: raise ValueError("This field is required") return None - + column_type = type(column.type) - + # Handle type decorators if isinstance(column.type, TypeDecorator): column_type = type(column.type.impl) - + try: if column_type == String or column_type == Text: return str(raw_value) @@ -316,18 +315,18 @@ def _convert_field_value(self, column: Column, raw_value: Any) -> Any: return raw_value else: return raw_value - + except (ValueError, TypeError) as e: - raise ValueError(f"Invalid value for {column_type.__name__}: {raw_value}") - - def populate_instance(self, instance: Any, validated_data: Dict[str, Any]) -> None: + raise ValueError(f"Invalid value for {column_type.__name__}: {raw_value}") from e + + def populate_instance(self, instance: Any, validated_data: dict[str, Any]) -> None: """ Populate model instance with validated data. - + Args: instance: Model instance to populate validated_data: Validated form data """ for field_name, value in validated_data.items(): if hasattr(instance, field_name): - setattr(instance, field_name, value) \ No newline at end of file + setattr(instance, field_name, value) diff --git a/internal_admin/admin/model_admin.py b/internal_admin/admin/model_admin.py index 3573f94..bdf646b 100644 --- a/internal_admin/admin/model_admin.py +++ b/internal_admin/admin/model_admin.py @@ -1,11 +1,12 @@ """ ModelAdmin base class for Internal Admin. -ModelAdmin defines the configuration and behavior for admin +ModelAdmin defines the configuration and behavior for admin interface generation for SQLAlchemy models. """ -from typing import List, Optional, Tuple, Any, Dict, Type +from typing import Any + from sqlalchemy.orm import Session from ..auth.permissions import Permission @@ -14,246 +15,246 @@ class ModelAdmin: """ Base class for model-specific admin configuration. - + ModelAdmin defines how a model should be displayed and managed in the admin interface. It provides configuration attributes and hook methods that can be overridden by subclasses. - + This is the primary extension point for customizing admin behavior. """ - + # Display configuration - list_display: List[str] = [] # Fields to show in list view - search_fields: List[str] = [] # Fields to search across - list_filter: List[str] = [] # Fields to filter by - ordering: List[str] = [] # Default ordering - readonly_fields: List[str] = [] # Read-only fields in forms - exclude_fields: List[str] = [] # Fields to exclude from forms - - # Form configuration - form_fields: Optional[List[str]] = None # Explicit field order - + list_display: list[str] = [] # Fields to show in list view + search_fields: list[str] = [] # Fields to search across + list_filter: list[str] = [] # Fields to filter by + ordering: list[str] = [] # Default ordering + readonly_fields: list[str] = [] # Read-only fields in forms + exclude_fields: list[str] = [] # Fields to exclude from forms + + # Form configuration + form_fields: list[str] | None = None # Explicit field order + # Pagination page_size: int = 25 # Items per page - - def __init__(self, model_class: Type[Any]) -> None: + + def __init__(self, model_class: type[Any]) -> None: """ Initialize ModelAdmin for a specific model. - + Args: model_class: SQLAlchemy model class this admin manages """ self.model = model_class self._validate_configuration() - + def _validate_configuration(self) -> None: """Validate ModelAdmin configuration against the model.""" model_columns = [col.name for col in self.model.__table__.columns] - + # Validate list_display fields exist for field in self.list_display: if field not in model_columns: raise ValueError( f"list_display field '{field}' does not exist on model {self.model.__name__}" ) - + # Validate search_fields exist and are text-like for field in self.search_fields: if field not in model_columns: raise ValueError( f"search_fields field '{field}' does not exist on model {self.model.__name__}" ) - + # Validate filter fields exist for field in self.list_filter: if field not in model_columns: raise ValueError( f"list_filter field '{field}' does not exist on model {self.model.__name__}" ) - - def get_list_display(self) -> List[str]: + + def get_list_display(self) -> list[str]: """ Get fields to display in list view. - + Returns: List of field names to display """ if self.list_display: return self.list_display - + # Default: show first few columns columns = [col.name for col in self.model.__table__.columns] return columns[:4] # Show first 4 columns by default - - def get_search_fields(self) -> List[str]: + + def get_search_fields(self) -> list[str]: """ Get fields to search across. - + Returns: List of searchable field names """ return self.search_fields - - def get_list_filter(self) -> List[str]: + + def get_list_filter(self) -> list[str]: """ Get fields available for filtering. - + Returns: List of filterable field names """ return self.list_filter - - def get_ordering(self) -> List[str]: + + def get_ordering(self) -> list[str]: """ Get default ordering for list view. - + Returns: List of field names for ordering (prefix with '-' for desc) """ if self.ordering: return self.ordering - + # Default: order by primary key pk_column = self.model.__table__.primary_key.columns.keys()[0] return [pk_column] - - def get_form_fields(self) -> List[str]: + + def get_form_fields(self) -> list[str]: """ Get fields to include in forms. - + Returns: List of field names for forms """ if self.form_fields is not None: return self.form_fields - + # Default: all columns except excluded ones all_fields = [col.name for col in self.model.__table__.columns] return [f for f in all_fields if f not in self.exclude_fields] - - def get_readonly_fields(self) -> List[str]: + + def get_readonly_fields(self) -> list[str]: """ Get fields that should be read-only in forms. - + Returns: List of read-only field names """ return self.readonly_fields - + def get_page_size(self) -> int: """ Get page size for list view. - + Returns: Number of items per page """ return self.page_size - + # Permission hooks def has_view_permission(self, user: Any) -> bool: """ Check if user can view objects of this model. - + Args: user: User object to check permissions for - + Returns: True if user can view objects """ from ..auth.permissions import has_permission return has_permission(user, self.model, Permission.VIEW) - + def has_create_permission(self, user: Any) -> bool: """ Check if user can create objects of this model. - + Args: user: User object to check permissions for - + Returns: True if user can create objects """ from ..auth.permissions import has_permission return has_permission(user, self.model, Permission.CREATE) - - def has_update_permission(self, user: Any, obj: Optional[Any] = None) -> bool: + + def has_update_permission(self, user: Any, obj: Any | None = None) -> bool: """ Check if user can update objects of this model. - + Args: user: User object to check permissions for obj: Optional specific object for object-level permissions - + Returns: True if user can update objects """ from ..auth.permissions import has_permission return has_permission(user, self.model, Permission.UPDATE, obj) - - def has_delete_permission(self, user: Any, obj: Optional[Any] = None) -> bool: + + def has_delete_permission(self, user: Any, obj: Any | None = None) -> bool: """ Check if user can delete objects of this model. - + Args: user: User object to check permissions for obj: Optional specific object for object-level permissions - + Returns: True if user can delete objects """ from ..auth.permissions import has_permission return has_permission(user, self.model, Permission.DELETE, obj) - + # Query hooks def get_queryset(self, session: Session) -> Any: """ Get base queryset for this model. - + This method can be overridden to customize the base query, such as adding filters, eager loading, or access restrictions. - + Args: session: SQLAlchemy session - + Returns: SQLAlchemy query object """ return session.query(self.model) - + def before_save(self, obj: Any, is_create: bool = False) -> None: """ Hook called before saving an object. - + Args: obj: Model instance being saved is_create: True if creating new object, False if updating """ pass - + def after_save(self, obj: Any, is_create: bool = False) -> None: """ Hook called after saving an object. - + Args: obj: Model instance that was saved is_create: True if created new object, False if updated """ pass - + def before_delete(self, obj: Any) -> None: """ Hook called before deleting an object. - + Args: obj: Model instance being deleted """ pass - + def after_delete(self, obj: Any) -> None: """ Hook called after deleting an object. - + Args: obj: Model instance that was deleted """ - pass \ No newline at end of file + pass diff --git a/internal_admin/admin/query_engine.py b/internal_admin/admin/query_engine.py index 475c178..c16443b 100644 --- a/internal_admin/admin/query_engine.py +++ b/internal_admin/admin/query_engine.py @@ -5,9 +5,10 @@ including filtering, searching, ordering, and pagination. """ -from typing import Any, List, Dict, Optional, Tuple -from sqlalchemy.orm import Session, Query -from sqlalchemy import or_, and_, func +from typing import Any + +from sqlalchemy import or_ +from sqlalchemy.orm import Query, Session from sqlalchemy.orm.strategy_options import selectinload from .model_admin import ModelAdmin @@ -17,17 +18,17 @@ class QueryResult: """ Container for query results with pagination information. """ - + def __init__( - self, - items: List[Any], - total_count: int, - page: int, + self, + items: list[Any], + total_count: int, + page: int, page_size: int ) -> None: """ Initialize QueryResult. - + Args: items: List of model instances for current page total_count: Total number of items across all pages @@ -38,31 +39,31 @@ def __init__( self.total_count = total_count self.page = page self.page_size = page_size - + @property def total_pages(self) -> int: """Calculate total number of pages.""" if self.page_size == 0: return 0 return (self.total_count + self.page_size - 1) // self.page_size - + @property def has_previous(self) -> bool: """Check if there's a previous page.""" return self.page > 1 - + @property def has_next(self) -> bool: """Check if there's a next page.""" return self.page < self.total_pages - + @property - def previous_page(self) -> Optional[int]: + def previous_page(self) -> int | None: """Get previous page number.""" return self.page - 1 if self.has_previous else None - + @property - def next_page(self) -> Optional[int]: + def next_page(self) -> int | None: """Get next page number.""" return self.page + 1 if self.has_next else None @@ -70,7 +71,7 @@ def next_page(self) -> Optional[int]: class QueryEngine: """ Handles database queries for admin list views. - + Provides a pipeline for building complex queries with: - Base queryset from ModelAdmin - Search across multiple fields @@ -78,29 +79,29 @@ class QueryEngine: - Ordering by specified fields - Pagination with count optimization """ - + def __init__(self, model_admin: ModelAdmin) -> None: """ Initialize QueryEngine for a ModelAdmin. - + Args: model_admin: ModelAdmin instance to query for """ self.model_admin = model_admin self.model = model_admin.model - + def execute_query( self, session: Session, - search_query: Optional[str] = None, - filters: Optional[Dict[str, Any]] = None, - ordering: Optional[List[str]] = None, + search_query: str | None = None, + filters: dict[str, Any] | None = None, + ordering: list[str] | None = None, page: int = 1, - page_size: Optional[int] = None + page_size: int | None = None ) -> QueryResult: """ Execute complete query pipeline. - + Args: session: SQLAlchemy session search_query: Text to search for across search_fields @@ -108,101 +109,101 @@ def execute_query( ordering: List of field names for ordering page: Page number (1-based) page_size: Items per page (defaults to ModelAdmin page_size) - + Returns: QueryResult with items and pagination info """ if page_size is None: page_size = self.model_admin.get_page_size() - + # Build query pipeline query = self._get_base_query(session) query = self._apply_search(query, search_query) query = self._apply_filters(query, filters) - + # Get total count before pagination total_count = query.count() - + # Apply ordering and pagination query = self._apply_ordering(query, ordering) query = self._apply_eager_loading(query) items = self._apply_pagination(query, page, page_size) - + return QueryResult( items=items, total_count=total_count, page=page, page_size=page_size ) - + def _get_base_query(self, session: Session) -> Query: """ Get base query from ModelAdmin. - + Args: session: SQLAlchemy session - + Returns: Base query for the model """ return self.model_admin.get_queryset(session) - - def _apply_search(self, query: Query, search_query: Optional[str]) -> Query: + + def _apply_search(self, query: Query, search_query: str | None) -> Query: """ Apply text search across search fields. - + Args: query: Base query to modify search_query: Search string - + Returns: Query with search filters applied """ if not search_query or not search_query.strip(): return query - + search_fields = self.model_admin.get_search_fields() if not search_fields: return query - + search_term = f"%{search_query.strip()}%" search_conditions = [] - + for field_name in search_fields: if hasattr(self.model, field_name): field = getattr(self.model, field_name) # Use ILIKE for case-insensitive search search_conditions.append(field.ilike(search_term)) - + if search_conditions: query = query.filter(or_(*search_conditions)) - + return query - - def _apply_filters(self, query: Query, filters: Optional[Dict[str, Any]]) -> Query: + + def _apply_filters(self, query: Query, filters: dict[str, Any] | None) -> Query: """ Apply field-specific filters. - + Args: query: Query to modify filters: Dictionary of field -> value filters - + Returns: Query with filters applied """ if not filters: return query - + for field_name, value in filters.items(): if not hasattr(self.model, field_name): continue - + field = getattr(self.model, field_name) - + # Skip empty values if value is None or value == "": continue - + # Handle different filter types if isinstance(value, (list, tuple)): # Multiple values - use IN clause @@ -213,22 +214,22 @@ def _apply_filters(self, query: Query, filters: Optional[Dict[str, Any]]) -> Que else: # Exact match filter query = query.filter(field == value) - + return query - - def _apply_ordering(self, query: Query, ordering: Optional[List[str]]) -> Query: + + def _apply_ordering(self, query: Query, ordering: list[str] | None) -> Query: """ Apply ordering to query. - + Args: query: Query to modify ordering: List of field names (prefix '-' for descending) - + Returns: Query with ordering applied """ order_fields = ordering or self.model_admin.get_ordering() - + for field_spec in order_fields: # Handle descending order (field prefixed with '-') if field_spec.startswith('-'): @@ -237,85 +238,85 @@ def _apply_ordering(self, query: Query, ordering: Optional[List[str]]) -> Query: else: field_name = field_spec descending = False - + if hasattr(self.model, field_name): field = getattr(self.model, field_name) if descending: query = query.order_by(field.desc()) else: query = query.order_by(field.asc()) - + return query - + def _apply_eager_loading(self, query: Query) -> Query: """ Apply eager loading for foreign key relationships. - + This helps prevent N+1 queries when displaying related objects. - + Args: query: Query to modify - + Returns: Query with eager loading applied """ # For now, we'll use selectinload for all relationships # This can be optimized based on the fields being displayed - + list_display = self.model_admin.get_list_display() - + for field_name in list_display: if hasattr(self.model, field_name): field = getattr(self.model, field_name) - + # Check if it's a relationship if hasattr(field.property, 'mapper'): # It's a relationship - add selectinload query = query.options(selectinload(getattr(self.model, field_name))) - + return query - - def _apply_pagination(self, query: Query, page: int, page_size: int) -> List[Any]: + + def _apply_pagination(self, query: Query, page: int, page_size: int) -> list[Any]: """ Apply pagination and execute query. - + Args: query: Query to paginate page: Page number (1-based) page_size: Items per page - + Returns: List of model instances for the page """ offset = (page - 1) * page_size return query.offset(offset).limit(page_size).all() - - def get_filter_choices(self, session: Session, field_name: str) -> List[Tuple[Any, str]]: + + def get_filter_choices(self, session: Session, field_name: str) -> list[tuple[Any, str]]: """ Get available choices for a filter field. - + Args: session: SQLAlchemy session field_name: Name of field to get choices for - + Returns: List of (value, display_name) tuples """ if not hasattr(self.model, field_name): return [] - + field = getattr(self.model, field_name) - + # Get distinct values for this field distinct_query = session.query(field).distinct().filter(field.is_not(None)) - + choices = [] for (value,) in distinct_query.all(): # Use string representation as display name display_name = str(value) if value is not None else "N/A" choices.append((value, display_name)) - + # Sort choices by display name choices.sort(key=lambda x: x[1]) - - return choices \ No newline at end of file + + return choices diff --git a/internal_admin/admin/router_factory.py b/internal_admin/admin/router_factory.py index e294311..3ee0014 100644 --- a/internal_admin/admin/router_factory.py +++ b/internal_admin/admin/router_factory.py @@ -4,36 +4,36 @@ Generates FastAPI routes for CRUD operations on registered models. """ -from typing import Any, Dict, Optional, Type -from fastapi import APIRouter, Request, Response, Form, Depends, HTTPException, status +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Request, status from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session +from ..auth.activity import log_create, log_delete, log_update +from ..auth.routes import create_auth_dependency from ..config import AdminConfig -from ..registry import get_registry from ..database.session import get_session -from ..auth.routes import get_current_user, create_auth_dependency -from ..auth.permissions import Permission -from ..auth.activity import log_create, log_update, log_delete +from ..registry import get_registry +from .filters import FilterManager +from .form_engine import FormEngine from .model_admin import ModelAdmin from .query_engine import QueryEngine -from .form_engine import FormEngine -from .filters import FilterManager class AdminRouterFactory: """ Factory for creating FastAPI routers for admin models. - + Generates standardized CRUD routes for each registered model, handling permissions, form processing, and template rendering. """ - + def __init__(self, config: AdminConfig, templates: Jinja2Templates) -> None: """ Initialize AdminRouterFactory. - + Args: config: AdminConfig instance templates: Jinja2Templates for rendering @@ -41,22 +41,22 @@ def __init__(self, config: AdminConfig, templates: Jinja2Templates) -> None: self.config = config self.templates = templates self.registry = get_registry() - + def _get_registered_models(self, user: Any, prefix: str = "/admin") -> list: """ Get registered models for sidebar navigation. - + Args: user: Current authenticated user prefix: URL prefix for admin routes - + Returns: List of model info dictionaries for template context """ registered_models = [] for model_class, model_admin_class in self.registry.get_registered_models().items(): model_admin = model_admin_class(model_class) - + # Check if user has view permission if model_admin.has_view_permission(user): registered_models.append({ @@ -65,40 +65,40 @@ def _get_registered_models(self, user: Any, prefix: str = "/admin") -> list: 'name_plural': f"{model_class.__name__}s", 'url': f"{prefix}/{model_class.__name__.lower()}/", }) - + return registered_models - + def create_model_router( self, - model_class: Type[Any], + model_class: type[Any], model_admin: ModelAdmin ) -> APIRouter: """ Create FastAPI router for a model. - + Args: model_class: SQLAlchemy model class model_admin: ModelAdmin configuration - + Returns: FastAPI router with CRUD endpoints """ model_name = model_class.__name__.lower() router = APIRouter(prefix=f"/{model_name}", tags=[f"admin-{model_name}"]) - + # Initialize engines query_engine = QueryEngine(model_admin) form_engine = FormEngine(model_admin) filter_manager = FilterManager(model_admin) - + # Create authentication dependencies get_current_user_dep, require_auth_dep = create_auth_dependency(self.config) - + @router.get("/", response_class=HTMLResponse, name=f"{model_name}_list") async def list_view( request: Request, page: int = 1, - search: Optional[str] = None, + search: str | None = None, user: Any = Depends(require_auth_dep), db: Session = Depends(get_session) ) -> HTMLResponse: @@ -109,13 +109,13 @@ async def list_view( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied" ) - + # Extract filters from query params filters = {} for key, value in request.query_params.items(): if key not in ('page', 'search') and value: filters[key] = value - + # Execute query result = query_engine.execute_query( session=db, @@ -123,10 +123,10 @@ async def list_view( filters=filters, page=page ) - + # Get filter context filter_context = filter_manager.get_filter_context(db, filters) - + context = { "request": request, "model_name": model_class.__name__, @@ -149,9 +149,9 @@ async def list_view( "registered_models": self._get_registered_models(user), "user": user, } - + return self.templates.TemplateResponse("admin/list.html", context) - + @router.get("/create/", response_class=HTMLResponse, name=f"{model_name}_create") async def create_form( request: Request, @@ -165,10 +165,10 @@ async def create_form( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied" ) - + # Generate form fields form_fields = form_engine.generate_form_fields(db) - + context = { "request": request, "model_name": model_class.__name__, @@ -179,9 +179,9 @@ async def create_form( "registered_models": self._get_registered_models(user), "user": user, } - + return self.templates.TemplateResponse("admin/form.html", context) - + @router.post("/create/", name=f"{model_name}_create_submit") async def create_submit( request: Request, @@ -195,27 +195,27 @@ async def create_submit( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied" ) - + # Get form data form_data = await request.form() form_dict = dict(form_data) - + try: # Validate form data validated_data = form_engine.validate_form_data(form_dict) - + # Create new instance instance = model_class() - + # Call before_save hook model_admin.before_save(instance, is_create=True) - + # Populate instance form_engine.populate_instance(instance, validated_data) - + # Save to database db.add(instance) - + # Log the activity (before commit) try: log_create( @@ -229,25 +229,25 @@ async def create_submit( except Exception: # Don't fail the main operation if logging fails pass - + db.commit() - + # Call after_save hook model_admin.after_save(instance, is_create=True) - + # Redirect to list view return RedirectResponse( url=f"/admin/{model_name}/", status_code=status.HTTP_302_FOUND ) - + except Exception as e: # Handle validation or database errors db.rollback() - + # Re-render form with error form_fields = form_engine.generate_form_fields(db) - + context = { "request": request, "model_name": model_class.__name__, @@ -259,9 +259,9 @@ async def create_submit( "registered_models": self._get_registered_models(user), "user": user, } - + return self.templates.TemplateResponse("admin/form.html", context) - + @router.get("/{item_id}/", response_class=HTMLResponse, name=f"{model_name}_edit") async def edit_form( request: Request, @@ -274,23 +274,23 @@ async def edit_form( obj = db.query(model_class).filter( model_class.id == item_id ).first() - + if not obj: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Object not found" ) - + # Check permissions if not model_admin.has_update_permission(user, obj): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied" ) - + # Generate form fields with current values form_fields = form_engine.generate_form_fields(db, instance=obj) - + # Extract current values for form_data form_data = {} for column in model_class.__table__.columns: @@ -309,9 +309,9 @@ async def edit_form( "registered_models": self._get_registered_models(user), "user": user, } - + return self.templates.TemplateResponse("admin/form.html", context) - + @router.post("/{item_id}/", name=f"{model_name}_edit_submit") async def edit_submit( request: Request, @@ -324,34 +324,34 @@ async def edit_submit( obj = db.query(model_class).filter( model_class.id == item_id ).first() - + if not obj: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Object not found" ) - + # Check permissions if not model_admin.has_update_permission(user, obj): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied" ) - + # Get form data form_data = await request.form() form_dict = dict(form_data) - + try: # Validate form data validated_data = form_engine.validate_form_data(form_dict) - + # Call before_save hook model_admin.before_save(obj, is_create=False) - + # Update instance form_engine.populate_instance(obj, validated_data) - + # Log the activity (before commit) try: log_update( @@ -365,26 +365,26 @@ async def edit_submit( except Exception: # Don't fail the main operation if logging fails pass - + # Save to database db.commit() - + # Call after_save hook model_admin.after_save(obj, is_create=False) - + # Redirect to list view return RedirectResponse( url=f"/admin/{model_name}/", status_code=status.HTTP_302_FOUND ) - + except Exception as e: # Handle validation or database errors db.rollback() - + # Re-render form with error form_fields = form_engine.generate_form_fields(db, instance=obj) - + context = { "request": request, "model_name": model_class.__name__, @@ -398,9 +398,9 @@ async def edit_submit( "registered_models": self._get_registered_models(user), "user": user, } - + return self.templates.TemplateResponse("admin/form.html", context) - + @router.get("/{item_id}/delete/", response_class=HTMLResponse, name=f"{model_name}_delete") async def delete_confirmation( request: Request, @@ -413,20 +413,20 @@ async def delete_confirmation( obj = db.query(model_class).filter( model_class.id == item_id ).first() - + if not obj: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Object not found" ) - + # Check permissions if not model_admin.has_delete_permission(user, obj): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied" ) - + context = { "request": request, "model_name": model_class.__name__, @@ -436,9 +436,9 @@ async def delete_confirmation( "registered_models": self._get_registered_models(user), "user": user, } - + return self.templates.TemplateResponse("admin/confirm_delete.html", context) - + @router.post("/{item_id}/delete/", name=f"{model_name}_delete_submit") async def delete_submit( request: Request, @@ -451,28 +451,28 @@ async def delete_submit( obj = db.query(model_class).filter( model_class.id == item_id ).first() - + if not obj: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Object not found" ) - + # Check permissions if not model_admin.has_delete_permission(user, obj): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied" ) - + try: # Store object info before deletion object_repr = str(obj) object_id = obj.id - + # Call before_delete hook model_admin.before_delete(obj) - + # Log the activity (before delete and commit) try: log_delete( @@ -486,25 +486,25 @@ async def delete_submit( except Exception: # Don't fail the main operation if logging fails pass - + # Delete object db.delete(obj) db.commit() - + # Call after_delete hook model_admin.after_delete(obj) - + # Redirect to list view return RedirectResponse( url=f"/admin/{model_name}/", status_code=status.HTTP_302_FOUND ) - + except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error deleting object: {str(e)}" - ) - - return router \ No newline at end of file + ) from e + + return router diff --git a/internal_admin/auth/__init__.py b/internal_admin/auth/__init__.py index 473e3ef..b4fdc94 100644 --- a/internal_admin/auth/__init__.py +++ b/internal_admin/auth/__init__.py @@ -9,14 +9,14 @@ """ from .models import AdminUser -from .security import SecurityManager, hash_password, verify_password from .permissions import PermissionManager, has_permission +from .security import SecurityManager, hash_password, verify_password __all__ = [ "AdminUser", - "SecurityManager", + "SecurityManager", "hash_password", "verify_password", "PermissionManager", "has_permission", -] \ No newline at end of file +] diff --git a/internal_admin/auth/activity.py b/internal_admin/auth/activity.py index d59af6c..7af9569 100644 --- a/internal_admin/auth/activity.py +++ b/internal_admin/auth/activity.py @@ -4,9 +4,10 @@ Provides functions to log user activities and system events. """ -from typing import Optional, Any -from sqlalchemy.orm import Session +from typing import Any + from fastapi import Request +from sqlalchemy.orm import Session from .models import ActivityLog @@ -14,16 +15,16 @@ def log_activity( session: Session, action: str, - user_id: Optional[int] = None, - model_name: Optional[str] = None, - object_id: Optional[str] = None, - object_repr: Optional[str] = None, - description: Optional[str] = None, - request: Optional[Request] = None, + user_id: int | None = None, + model_name: str | None = None, + object_id: str | None = None, + object_repr: str | None = None, + description: str | None = None, + request: Request | None = None, ) -> ActivityLog: """ Log an activity to the database. - + Args: session: Database session action: Action type (create, update, delete, login, etc.) @@ -33,17 +34,17 @@ def log_activity( object_repr: String representation of the object description: Additional description request: FastAPI request object for IP/user agent - + Returns: Created ActivityLog instance """ ip_address = None user_agent = None - + if request: ip_address = request.client.host if request.client else None user_agent = request.headers.get("user-agent") - + activity = ActivityLog( user_id=user_id, action=action, @@ -54,13 +55,13 @@ def log_activity( ip_address=ip_address, user_agent=user_agent, ) - + session.add(activity) session.flush() # Flush but don't commit - let caller handle commit return activity -def log_login(session: Session, user_id: int, request: Optional[Request] = None) -> ActivityLog: +def log_login(session: Session, user_id: int, request: Request | None = None) -> ActivityLog: """Log a successful login.""" return log_activity( session=session, @@ -71,7 +72,7 @@ def log_login(session: Session, user_id: int, request: Optional[Request] = None) ) -def log_logout(session: Session, user_id: int, request: Optional[Request] = None) -> ActivityLog: +def log_logout(session: Session, user_id: int, request: Request | None = None) -> ActivityLog: """Log a logout.""" return log_activity( session=session, @@ -88,7 +89,7 @@ def log_create( model_name: str, object_id: Any, object_repr: str, - request: Optional[Request] = None, + request: Request | None = None, ) -> ActivityLog: """Log object creation.""" return log_activity( @@ -109,7 +110,7 @@ def log_update( model_name: str, object_id: Any, object_repr: str, - request: Optional[Request] = None, + request: Request | None = None, ) -> ActivityLog: """Log object update.""" return log_activity( @@ -130,7 +131,7 @@ def log_delete( model_name: str, object_id: Any, object_repr: str, - request: Optional[Request] = None, + request: Request | None = None, ) -> ActivityLog: """Log object deletion.""" return log_activity( @@ -148,11 +149,11 @@ def log_delete( def get_recent_activities(session: Session, limit: int = 10) -> list[ActivityLog]: """ Get recent activities for display. - + Args: session: Database session limit: Maximum number of activities to return - + Returns: List of recent ActivityLog instances """ @@ -161,4 +162,4 @@ def get_recent_activities(session: Session, limit: int = 10) -> list[ActivityLog .order_by(ActivityLog.created_at.desc()) .limit(limit) .all() - ) \ No newline at end of file + ) diff --git a/internal_admin/auth/models.py b/internal_admin/auth/models.py index 04600be..8b36ce9 100644 --- a/internal_admin/auth/models.py +++ b/internal_admin/auth/models.py @@ -4,11 +4,10 @@ Provides base user model and authentication-related database models. """ -from typing import Optional -from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text -from sqlalchemy.sql import func +from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship +from sqlalchemy.sql import func Base = declarative_base() @@ -16,68 +15,68 @@ class AdminUser(Base): """ Base admin user model. - + This provides the minimum contract required for authentication. Projects can extend this or provide their own user model that implements the same interface. - + Required attributes: - id: Primary key - password_hash: Hashed password - is_active: Whether user can log in - + Optional attributes: - username: Login username - email: User email - is_superuser: Full admin access - - created_at: Account creation timestamp + - created_at: Account creation timestamp - last_login: Last successful login """ - + __tablename__ = "admin_users" - + # Required fields id = Column(Integer, primary_key=True, index=True) password_hash = Column(String(255), nullable=False) is_active = Column(Boolean, default=True, nullable=False) - + # Common optional fields username = Column(String(50), unique=True, nullable=True, index=True) email = Column(String(255), unique=True, nullable=True, index=True) is_superuser = Column(Boolean, default=False, nullable=False) - + # Timestamps created_at = Column(DateTime(timezone=True), server_default=func.now()) last_login = Column(DateTime(timezone=True), nullable=True) - + def __repr__(self) -> str: identifier = self.username or self.email or f"ID:{self.id}" return f"" - + @property def display_name(self) -> str: """Get a display name for the user.""" return self.username or self.email or f"User {self.id}" - + def has_permission(self, permission: str) -> bool: """ Check if user has a specific permission. - + Base implementation - superusers have all permissions. Projects can override this method for custom permission logic. - + Args: permission: Permission string to check - + Returns: True if user has permission """ if not self.is_active: return False - + if self.is_superuser: return True - + # Base implementation denies all permissions for regular users # Projects should override this for custom permission logic return False @@ -86,13 +85,13 @@ def has_permission(self, permission: str) -> bool: class ActivityLog(Base): """ Activity log model to track admin user actions. - + This model tracks all significant actions performed by admin users including create, update, delete operations and login events. """ - + __tablename__ = "admin_activity_logs" - + id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("admin_users.id"), nullable=True, index=True) action = Column(String(50), nullable=False, index=True) # create, update, delete, login, etc. @@ -103,13 +102,13 @@ class ActivityLog(Base): ip_address = Column(String(45), nullable=True) # User's IP address user_agent = Column(Text, nullable=True) # User's browser/client info created_at = Column(DateTime(timezone=True), server_default=func.now(), index=True) - + # Relationship to user (nullable for system actions) user = relationship("AdminUser", backref="activity_logs") - + def __repr__(self) -> str: return f"" - + @property def display_description(self) -> str: """Get a human-readable description of the activity.""" @@ -124,23 +123,23 @@ def display_description(self) -> str: def validate_user_model(user_model_class) -> None: """ Validate that a user model class meets the required contract. - + Args: user_model_class: User model class to validate - + Raises: ValueError: If model doesn't meet requirements """ required_attributes = ["id", "password_hash", "is_active"] - + for attr in required_attributes: if not hasattr(user_model_class, attr): raise ValueError( f"User model {user_model_class.__name__} must have '{attr}' attribute" ) - + # Check if it has a table (SQLAlchemy model) if not hasattr(user_model_class, "__table__"): raise ValueError( f"User model {user_model_class.__name__} must be a SQLAlchemy model" - ) \ No newline at end of file + ) diff --git a/internal_admin/auth/permissions.py b/internal_admin/auth/permissions.py index 0a716bb..647f4e3 100644 --- a/internal_admin/auth/permissions.py +++ b/internal_admin/auth/permissions.py @@ -4,19 +4,19 @@ Provides role-based access control and permission checking utilities. """ -from typing import Any, Optional from enum import Enum +from typing import Any class Permission(Enum): """ Standard admin permissions. - + These are the basic CRUD operations that can be performed on models in the admin interface. """ VIEW = "view" - CREATE = "create" + CREATE = "create" UPDATE = "update" DELETE = "delete" @@ -24,93 +24,93 @@ class Permission(Enum): class PermissionManager: """ Manages permission checking for Internal Admin. - + Provides centralized permission logic that can be extended by projects for custom authorization rules. """ - + def __init__(self) -> None: """Initialize PermissionManager.""" pass - + def has_model_permission( - self, - user: Any, - model_class: Any, + self, + user: Any, + model_class: Any, permission: Permission ) -> bool: """ Check if user has permission for a model. - + Args: user: User object to check permissions for model_class: SQLAlchemy model class permission: Permission type to check - + Returns: True if user has permission """ if not user or not hasattr(user, "is_active") or not user.is_active: return False - + # Superusers have all permissions if hasattr(user, "is_superuser") and user.is_superuser: return True - + # Check if user has permission method if hasattr(user, "has_permission"): permission_string = f"{model_class.__name__.lower()}_{permission.value}" return user.has_permission(permission_string) - + # Default: deny all permissions for regular users return False - + def has_object_permission( self, user: Any, - obj: Any, + obj: Any, permission: Permission ) -> bool: """ Check if user has permission for a specific object. - + This allows for object-level permissions beyond model-level. - + Args: user: User object to check permissions for obj: Model instance to check permission for permission: Permission type to check - + Returns: True if user has permission for this specific object """ if not user or not hasattr(user, "is_active") or not user.is_active: return False - + # Check model-level permission first if not self.has_model_permission(user, obj.__class__, permission): return False - + # For now, object-level permissions are same as model-level # Projects can override this method for custom object-level logic return True - + def check_permission( self, user: Any, model_class: Any, permission: Permission, - obj: Optional[Any] = None + obj: Any | None = None ) -> None: """ Check permission and raise exception if denied. - + Args: user: User to check permissions for model_class: Model class permission: Permission to check obj: Optional specific object for object-level permissions - + Raises: PermissionError: If user doesn't have permission """ @@ -120,7 +120,7 @@ def check_permission( else: has_perm = self.has_model_permission(user, model_class, permission) context = f"{permission.value} {model_class.__name__} objects" - + if not has_perm: user_display = getattr(user, "display_name", "Anonymous") if user else "Anonymous" raise PermissionError(f"User {user_display} does not have permission to {context}") @@ -136,24 +136,24 @@ def get_permission_manager() -> PermissionManager: def has_permission( - user: Any, - model_class: Any, - permission: Permission, - obj: Optional[Any] = None + user: Any, + model_class: Any, + permission: Permission, + obj: Any | None = None ) -> bool: """ Check if user has permission using global permission manager. - + Args: user: User to check permissions for model_class: Model class - permission: Permission to check + permission: Permission to check obj: Optional specific object - + Returns: True if user has permission """ if obj is not None: return _permission_manager.has_object_permission(user, obj, permission) else: - return _permission_manager.has_model_permission(user, model_class, permission) \ No newline at end of file + return _permission_manager.has_model_permission(user, model_class, permission) diff --git a/internal_admin/auth/routes.py b/internal_admin/auth/routes.py index 77d41e9..00c514c 100644 --- a/internal_admin/auth/routes.py +++ b/internal_admin/auth/routes.py @@ -4,35 +4,36 @@ Provides login, logout, and session management endpoints. """ -from typing import Any, Optional -from fastapi import APIRouter, Request, Response, Form, Depends, HTTPException, status -from fastapi.responses import RedirectResponse, HTMLResponse +from typing import Any + +from fastapi import APIRouter, Depends, Form, HTTPException, Request, Response, status +from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session from ..config import AdminConfig from ..database.session import get_session -from .security import get_security_manager -from .models import validate_user_model from .activity import log_login +from .models import validate_user_model +from .security import get_security_manager def create_auth_router(config: AdminConfig, templates: Jinja2Templates) -> APIRouter: """ Create FastAPI router for authentication endpoints. - + Args: config: AdminConfig with auth settings templates: Jinja2Templates instance for rendering - + Returns: Configured FastAPI router """ router = APIRouter(tags=["auth"]) - + # Validate user model validate_user_model(config.user_model) - + @router.get("/login", response_class=HTMLResponse) async def login_page(request: Request) -> HTMLResponse: """Display login form.""" @@ -43,16 +44,16 @@ async def login_page(request: Request) -> HTMLResponse: user = get_current_user(request, config, db_session) if user is not None: return RedirectResponse(url="/admin/", status_code=302) - except: + except Exception: pass - + context = { "request": request, "title": "Admin Login", "error": request.query_params.get("error"), } return templates.TemplateResponse("auth/login.html", context) - + @router.post("/login") async def login_submit( request: Request, @@ -76,21 +77,21 @@ async def login_submit( security = get_security_manager() username = username.strip() - + # Query user by username or email user_query = db.query(config.user_model) - + if hasattr(config.user_model, "username"): user = user_query.filter(config.user_model.username == username).first() elif hasattr(config.user_model, "email"): user = user_query.filter(config.user_model.email == username).first() else: raise ValueError("User model must have either 'username' or 'email' field") - + # Verify user and password if ( - user is None - or not user.is_active + user is None + or not user.is_active or not security.verify_password(password, user.password_hash) ): # Redirect back to login with error @@ -98,25 +99,25 @@ async def login_submit( url="/admin/login?error=invalid_credentials", status_code=status.HTTP_302_FOUND ) - + # Update last login if field exists if hasattr(user, "last_login"): from datetime import datetime user.last_login = datetime.utcnow() - + # Log the login activity (before commit) try: log_login(session=db, user_id=user.id, request=request) except Exception: # Don't fail login if logging fails pass - + # Commit both login update and activity log db.commit() - + # Create session token session_token = security.create_session_token(user.id) - + # Set secure cookie response = RedirectResponse(url="/admin/", status_code=status.HTTP_302_FOUND) response.set_cookie( @@ -127,9 +128,9 @@ async def login_submit( samesite="lax", max_age=86400, # 24 hours ) - + return response - + @router.post("/logout") async def logout(request: Request, response: Response) -> RedirectResponse: """Handle logout.""" @@ -141,23 +142,23 @@ async def logout(request: Request, response: Response) -> RedirectResponse: samesite="lax" ) return response - + return router def get_current_user( - request: Request, + request: Request, config: AdminConfig, db: Session = Depends(get_session) -) -> Optional[Any]: +) -> Any | None: """ FastAPI dependency to get current authenticated user. - + Args: request: FastAPI request object config: AdminConfig instance db: Database session - + Returns: User object if authenticated, None otherwise """ @@ -165,25 +166,25 @@ def get_current_user( session_token = request.cookies.get(config.session_cookie_name) if not session_token: return None - + # Verify token and get user ID security = get_security_manager() user_id = security.verify_session_token(session_token) if not user_id: return None - + # Load user from database try: user = db.query(config.user_model).filter( config.user_model.id == user_id ).first() - + if user and user.is_active: return user except Exception: # Database error or invalid user model pass - + return None @@ -192,11 +193,11 @@ def create_auth_dependency(config: AdminConfig): def get_current_user_dependency( request: Request, db: Session = Depends(get_session) - ) -> Optional[Any]: + ) -> Any | None: return get_current_user(request, config, db) - + def require_auth_dependency( - user: Optional[Any] = Depends(get_current_user_dependency) + user: Any | None = Depends(get_current_user_dependency) ) -> Any: """Require authentication dependency.""" if user is None: @@ -205,23 +206,23 @@ def require_auth_dependency( detail="Authentication required" ) return user - + return get_current_user_dependency, require_auth_dependency # Legacy function for backward compatibility def require_auth( - user: Optional[Any] = Depends(get_current_user) + user: Any | None = Depends(get_current_user) ) -> Any: """ FastAPI dependency that requires authentication. - + Args: user: Current user from get_current_user dependency - + Returns: Authenticated user object - + Raises: HTTPException: If user is not authenticated """ @@ -230,4 +231,4 @@ def require_auth( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required" ) - return user \ No newline at end of file + return user diff --git a/internal_admin/auth/security.py b/internal_admin/auth/security.py index 94f7837..adebb05 100644 --- a/internal_admin/auth/security.py +++ b/internal_admin/auth/security.py @@ -5,10 +5,10 @@ """ import secrets -from typing import Optional, Any from datetime import datetime, timedelta + +from jose import JWTError, jwt from passlib.context import CryptContext -from jose import jwt, JWTError from ..config import AdminConfig @@ -16,114 +16,114 @@ class SecurityManager: """ Manages security operations for Internal Admin. - + Handles: - Password hashing and verification - Session token generation and validation - Security configuration """ - + def __init__(self, config: AdminConfig) -> None: """ Initialize SecurityManager with configuration. - + Args: config: AdminConfig instance with security settings """ self.config = config # Use bcrypt as primary hasher for reliability self.pwd_context = CryptContext( - schemes=["bcrypt"], + schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12 ) - + # JWT settings for session tokens self.algorithm = "HS256" self.session_expire_hours = 24 - + def hash_password(self, password: str) -> str: """ Hash a password using bcrypt. - + Args: password: Plain text password to hash - + Returns: Hashed password string """ # Ensure password is not too long (bcrypt limitation) if len(password.encode('utf-8')) > 72: password = password[:72] - + return self.pwd_context.hash(password) - + def verify_password(self, plain_password: str, hashed_password: str) -> bool: """ Verify a password against its hash. - + Args: plain_password: Plain text password to verify hashed_password: Stored password hash - + Returns: True if password matches hash """ return self.pwd_context.verify(plain_password, hashed_password) - + def create_session_token(self, user_id: int) -> str: """ Create a session token for a user. - + Args: user_id: User ID to encode in token - + Returns: JWT session token """ expire = datetime.utcnow() + timedelta(hours=self.session_expire_hours) - + payload = { "user_id": user_id, "exp": expire, "iat": datetime.utcnow(), "type": "session" } - + return jwt.encode(payload, self.config.secret_key, algorithm=self.algorithm) - - def verify_session_token(self, token: str) -> Optional[int]: + + def verify_session_token(self, token: str) -> int | None: """ Verify and decode a session token. - + Args: token: JWT session token to verify - + Returns: User ID if token is valid, None otherwise """ try: payload = jwt.decode( - token, - self.config.secret_key, + token, + self.config.secret_key, algorithms=[self.algorithm] ) - + user_id = payload.get("user_id") token_type = payload.get("type") - + if token_type != "session" or user_id is None: return None - + return int(user_id) - + except JWTError: return None - + def generate_csrf_token(self) -> str: """ Generate a CSRF token. - + Returns: Random CSRF token string """ @@ -131,7 +131,7 @@ def generate_csrf_token(self) -> str: # Module-level functions for convenience -_security_manager: Optional[SecurityManager] = None +_security_manager: SecurityManager | None = None def initialize_security(config: AdminConfig) -> None: @@ -143,10 +143,10 @@ def initialize_security(config: AdminConfig) -> None: def get_security_manager() -> SecurityManager: """ Get the global security manager. - + Returns: SecurityManager instance - + Raises: RuntimeError: If security manager not initialized """ @@ -160,10 +160,10 @@ def get_security_manager() -> SecurityManager: def hash_password(password: str) -> str: """ Hash a password using the global security manager. - + Args: password: Plain text password - + Returns: Hashed password """ @@ -173,12 +173,12 @@ def hash_password(password: str) -> str: def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify a password using the global security manager. - + Args: plain_password: Plain text password hashed_password: Stored hash - + Returns: True if password is valid """ - return get_security_manager().verify_password(plain_password, hashed_password) \ No newline at end of file + return get_security_manager().verify_password(plain_password, hashed_password) diff --git a/internal_admin/cli.py b/internal_admin/cli.py index bc57e24..75bd1b5 100644 --- a/internal_admin/cli.py +++ b/internal_admin/cli.py @@ -76,8 +76,8 @@ def cmd_createsuperuser(args: argparse.Namespace) -> int: # Deferred so the module can be imported cheaply (e.g. for --help) try: from sqlalchemy import create_engine - from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError + from sqlalchemy.orm import sessionmaker from internal_admin.auth.models import AdminUser, Base from internal_admin.auth.security import hash_password, initialize_security diff --git a/internal_admin/config.py b/internal_admin/config.py index c78e1e0..9de6cd9 100644 --- a/internal_admin/config.py +++ b/internal_admin/config.py @@ -6,23 +6,23 @@ """ import os -from typing import Optional, Type, Any from dataclasses import dataclass +from typing import Any @dataclass class AdminConfig: """ Configuration container for Internal Admin. - + This class holds all configuration settings required for the admin system. Settings can be provided directly or via environment variables. - + Required settings: - database_url: SQLAlchemy database URL - - secret_key: Secret key for session encryption + - secret_key: Secret key for session encryption - user_model: SQLAlchemy model class for authentication - + Optional settings: - session_cookie_name: Name of session cookie (default: "admin_session") - login_route: Route path for login page (default: "/admin/login") @@ -30,59 +30,59 @@ class AdminConfig: - debug: Enable debug mode (default: False) - page_size: Default page size for lists (default: 25) """ - + database_url: str secret_key: str - user_model: Type[Any] - + user_model: type[Any] + # Optional settings with defaults session_cookie_name: str = "admin_session" login_route: str = "/admin/login" - template_path_override: Optional[str] = None + template_path_override: str | None = None debug: bool = False page_size: int = 25 - + def __post_init__(self) -> None: """Validate configuration after initialization.""" self._validate_required_fields() self._load_from_environment() - + def _validate_required_fields(self) -> None: """Validate that all required fields are provided.""" if not self.database_url: raise ValueError("database_url is required") - + if not self.secret_key: raise ValueError("secret_key is required") - + if self.user_model is None: raise ValueError("user_model is required") - + def _load_from_environment(self) -> None: """Load configuration from environment variables if not set.""" # Override with environment variables if they exist env_database_url = os.getenv("DATABASE_URL") if env_database_url: self.database_url = env_database_url - + env_secret_key = os.getenv("SECRET_KEY") if env_secret_key: self.secret_key = env_secret_key - + env_debug = os.getenv("DEBUG", "").lower() if env_debug in ("true", "1", "yes"): self.debug = True - + env_page_size = os.getenv("ADMIN_PAGE_SIZE") if env_page_size and env_page_size.isdigit(): self.page_size = int(env_page_size) - + @property def is_sqlite(self) -> bool: """Check if using SQLite database.""" return self.database_url.startswith("sqlite") - - @property + + @property def is_postgresql(self) -> bool: """Check if using PostgreSQL database.""" - return "postgresql" in self.database_url or "postgres" in self.database_url \ No newline at end of file + return "postgresql" in self.database_url or "postgres" in self.database_url diff --git a/internal_admin/database/__init__.py b/internal_admin/database/__init__.py index d2e092c..2c1a031 100644 --- a/internal_admin/database/__init__.py +++ b/internal_admin/database/__init__.py @@ -5,12 +5,12 @@ supporting both SQLite and PostgreSQL databases. """ -from .engine import get_engine, create_engine_from_config -from .session import get_session, SessionManager +from .engine import create_engine_from_config, get_engine +from .session import SessionManager, get_session __all__ = [ "get_engine", - "create_engine_from_config", + "create_engine_from_config", "get_session", "SessionManager", -] \ No newline at end of file +] diff --git a/internal_admin/database/admin_tables.py b/internal_admin/database/admin_tables.py index 3e94a77..496e92b 100644 --- a/internal_admin/database/admin_tables.py +++ b/internal_admin/database/admin_tables.py @@ -4,8 +4,7 @@ Provides functions for creating admin-specific database tables. """ -from typing import Any -from sqlalchemy import Engine, MetaData +from sqlalchemy import Engine from sqlalchemy.orm import Session from ..auth.models import Base as AdminBase @@ -14,11 +13,11 @@ def create_admin_tables(engine: Engine) -> None: """ Create all admin-specific database tables. - + This includes: - AdminUser table (if using built-in user model) - ActivityLog table (for activity logging) - + Args: engine: SQLAlchemy engine to create tables on """ @@ -28,12 +27,12 @@ def create_admin_tables(engine: Engine) -> None: def ensure_admin_tables_exist(session: Session) -> None: """ Ensure admin tables exist in the database. - + This is a convenience function that can be called during admin initialization to make sure all required tables exist. - + Args: session: SQLAlchemy session to check tables with """ engine = session.bind - create_admin_tables(engine) \ No newline at end of file + create_admin_tables(engine) diff --git a/internal_admin/database/engine.py b/internal_admin/database/engine.py index cd67498..2a138ab 100644 --- a/internal_admin/database/engine.py +++ b/internal_admin/database/engine.py @@ -5,34 +5,34 @@ for both SQLite and PostgreSQL databases. """ -from typing import Optional -from sqlalchemy import create_engine, Engine + +from sqlalchemy import Engine, create_engine from sqlalchemy.pool import StaticPool from ..config import AdminConfig # Global engine instance -_engine: Optional[Engine] = None +_engine: Engine | None = None def create_engine_from_config(config: AdminConfig) -> Engine: """ Create SQLAlchemy engine from AdminConfig. - + Args: config: AdminConfig instance with database settings - + Returns: Configured SQLAlchemy engine - + Raises: ValueError: If database URL is invalid or unsupported """ database_url = config.database_url - + # Engine configuration based on database type engine_kwargs = {} - + if config.is_sqlite: # SQLite-specific configuration engine_kwargs.update({ @@ -57,7 +57,7 @@ def create_engine_from_config(config: AdminConfig) -> Engine: f"Unsupported database URL: {database_url}. " "Only SQLite and PostgreSQL are supported." ) - + try: engine = create_engine(database_url, **engine_kwargs) return engine @@ -68,10 +68,10 @@ def create_engine_from_config(config: AdminConfig) -> Engine: def get_engine() -> Engine: """ Get the global engine instance. - + Returns: The configured SQLAlchemy engine - + Raises: RuntimeError: If engine has not been initialized """ @@ -92,13 +92,13 @@ def set_engine(engine: Engine) -> None: def initialize_engine(config: AdminConfig) -> Engine: """ Initialize the global engine from config. - + Args: config: AdminConfig instance - + Returns: The initialized engine """ engine = create_engine_from_config(config) set_engine(engine) - return engine \ No newline at end of file + return engine diff --git a/internal_admin/database/session.py b/internal_admin/database/session.py index fbee518..18e9146 100644 --- a/internal_admin/database/session.py +++ b/internal_admin/database/session.py @@ -4,9 +4,10 @@ Provides session factory and context management for database operations. """ -from typing import Generator +from collections.abc import Generator from contextlib import contextmanager -from sqlalchemy.orm import sessionmaker, Session + +from sqlalchemy.orm import Session, sessionmaker from .engine import get_engine @@ -14,14 +15,14 @@ class SessionManager: """ Manages SQLAlchemy session creation and lifecycle. - + Provides session factory and context managers for safe database transaction handling. """ - + def __init__(self) -> None: self._session_factory = None - + def initialize(self) -> None: """Initialize the session factory with the global engine.""" engine = get_engine() @@ -31,14 +32,14 @@ def initialize(self) -> None: autoflush=False, expire_on_commit=False, ) - + def create_session(self) -> Session: """ Create a new database session. - + Returns: New SQLAlchemy session - + Raises: RuntimeError: If session factory not initialized """ @@ -47,15 +48,15 @@ def create_session(self) -> Session: "SessionManager not initialized. Call initialize() first." ) return self._session_factory() - + @contextmanager def get_session(self) -> Generator[Session, None, None]: """ Context manager for database sessions. - + Provides automatic session cleanup and transaction handling. Commits on success, rolls back on exception. - + Yields: Database session """ @@ -87,15 +88,15 @@ def get_session_manager() -> SessionManager: def get_session() -> Generator[Session, None, None]: """ FastAPI dependency for database sessions. - + Usage: @app.get("/endpoint") async def endpoint(db: Session = Depends(get_session)): # Use db session here pass - + Yields: Database session with automatic cleanup """ with _session_manager.get_session() as session: - yield session \ No newline at end of file + yield session diff --git a/internal_admin/registry.py b/internal_admin/registry.py index 6a1439c..5593b91 100644 --- a/internal_admin/registry.py +++ b/internal_admin/registry.py @@ -1,75 +1,74 @@ """ Model registry for Internal Admin. -The registry maintains a mapping of SQLAlchemy models to their +The registry maintains a mapping of SQLAlchemy models to their associated ModelAdmin classes and provides validation. """ -from typing import Dict, Type, Any, Optional import inspect -from sqlalchemy.ext.declarative import DeclarativeMeta +from typing import Any class ModelRegistry: """ Registry for storing registered models and their admin classes. - + Responsibilities: - Store registered models with their ModelAdmin classes - Validate model metadata during registration - Provide access to registered model information """ - + def __init__(self) -> None: - self._registry: Dict[Type[Any], Type[Any]] = {} - - def register(self, model: Type[Any], model_admin_class: Optional[Type[Any]] = None) -> None: + self._registry: dict[type[Any], type[Any]] = {} + + def register(self, model: type[Any], model_admin_class: type[Any] | None = None) -> None: """ Register a model with its ModelAdmin class. - + Args: model: SQLAlchemy model class to register model_admin_class: Optional ModelAdmin subclass for this model - + Raises: ValueError: If model is invalid or already registered """ self._validate_model(model) - + if model in self._registry: raise ValueError(f"Model {model.__name__} is already registered") - + # Import here to avoid circular imports from .admin.model_admin import ModelAdmin - + # Use default ModelAdmin if none provided if model_admin_class is None: model_admin_class = ModelAdmin - + # Validate ModelAdmin class if not issubclass(model_admin_class, ModelAdmin): - raise ValueError(f"model_admin_class must be a subclass of ModelAdmin") - + raise ValueError("model_admin_class must be a subclass of ModelAdmin") + self._registry[model] = model_admin_class - - def get_model_admin(self, model: Type[Any]) -> Type[Any]: + + def get_model_admin(self, model: type[Any]) -> type[Any]: """Get the ModelAdmin class for a registered model.""" if model not in self._registry: raise ValueError(f"Model {model.__name__} is not registered") return self._registry[model] - - def get_registered_models(self) -> Dict[Type[Any], Type[Any]]: + + def get_registered_models(self) -> dict[type[Any], type[Any]]: """Get all registered models and their admin classes.""" return self._registry.copy() - - def is_registered(self, model: Type[Any]) -> bool: + + def is_registered(self, model: type[Any]) -> bool: """Check if a model is registered.""" return model in self._registry - - def _validate_model(self, model: Type[Any]) -> None: + + def _validate_model(self, model: type[Any]) -> None: """ Validate that a model meets requirements for registration. - + Requirements: - Must be a SQLAlchemy declarative model - Must have a primary key @@ -78,15 +77,15 @@ def _validate_model(self, model: Type[Any]) -> None: # Check if it's a SQLAlchemy declarative model if not hasattr(model, "__tablename__"): raise ValueError(f"Model {model.__name__} must have a __tablename__ attribute") - + # Check if it has proper SQLAlchemy metadata if not hasattr(model, "__table__"): raise ValueError(f"Model {model.__name__} must be a SQLAlchemy declarative model") - + # Check if it has a primary key if not hasattr(model.__table__, "primary_key") or not model.__table__.primary_key.columns: raise ValueError(f"Model {model.__name__} must have a primary key") - + # Verify it's a class and not an instance if not inspect.isclass(model): raise ValueError("model must be a class, not an instance") @@ -98,4 +97,4 @@ def _validate_model(self, model: Type[Any]) -> None: def get_registry() -> ModelRegistry: """Get the global model registry instance.""" - return _global_registry \ No newline at end of file + return _global_registry diff --git a/internal_admin/site.py b/internal_admin/site.py index 81664be..5ee2c8b 100644 --- a/internal_admin/site.py +++ b/internal_admin/site.py @@ -5,135 +5,135 @@ It manages model registration, router generation, and FastAPI integration. """ -import os -from typing import Any, Dict, Type, Optional from pathlib import Path -from fastapi import FastAPI, APIRouter, Request, Depends +from typing import Any + +from fastapi import APIRouter, FastAPI, Request from fastapi.responses import HTMLResponse, RedirectResponse -from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from .admin.model_admin import ModelAdmin +from .admin.router_factory import AdminRouterFactory +from .auth.routes import create_auth_router, get_current_user +from .auth.security import initialize_security from .config import AdminConfig -from .registry import get_registry from .database.engine import initialize_engine from .database.session import initialize_session_manager -from .auth.security import initialize_security -from .auth.routes import create_auth_router, create_auth_dependency, get_current_user -from .admin.model_admin import ModelAdmin -from .admin.router_factory import AdminRouterFactory +from .registry import get_registry class AdminSite: """ Central orchestrator for Internal Admin. - + AdminSite is responsible for: - Model registration and validation - Database initialization - Router generation and mounting - Template and static file management - FastAPI integration - + This is the main public API entry point. """ - + def __init__(self, config: AdminConfig) -> None: """ Initialize AdminSite with configuration. - + Args: config: AdminConfig instance with all settings """ self.config = config self.registry = get_registry() self._initialized = False - self._templates: Optional[Jinja2Templates] = None - self._router_factory: Optional[AdminRouterFactory] = None - + self._templates: Jinja2Templates | None = None + self._router_factory: AdminRouterFactory | None = None + # Validate configuration self._validate_config() - + def _validate_config(self) -> None: """Validate AdminConfig settings.""" if not self.config.database_url: raise ValueError("database_url is required in AdminConfig") - + if not self.config.secret_key: raise ValueError("secret_key is required in AdminConfig") - + if self.config.user_model is None: raise ValueError("user_model is required in AdminConfig") - + # Validate user model from .auth.models import validate_user_model validate_user_model(self.config.user_model) - + def register( - self, - model: Type[Any], - model_admin_class: Optional[Type[ModelAdmin]] = None + self, + model: type[Any], + model_admin_class: type[ModelAdmin] | None = None ) -> None: """ Register a model with the admin site. - + Args: model: SQLAlchemy model class to register model_admin_class: Optional ModelAdmin subclass for configuration - + Raises: ValueError: If model is invalid or already registered """ if model_admin_class is not None and not issubclass(model_admin_class, ModelAdmin): raise ValueError("model_admin_class must be a subclass of ModelAdmin") - + # Register with the global registry self.registry.register(model, model_admin_class) - + def mount(self, app: FastAPI, prefix: str = "/admin") -> None: """ Mount admin interface to FastAPI application. - + Args: app: FastAPI application instance prefix: URL prefix for admin routes (default: "/admin") """ if self._initialized: raise RuntimeError("AdminSite already mounted to an application") - + # Initialize components self._initialize_components() - + # Create main admin router admin_router = self._create_admin_router(prefix) - + # Mount static files self._mount_static_files(app, prefix) - + # Include admin router app.include_router(admin_router) - + self._initialized = True - + def _initialize_components(self) -> None: """Initialize all admin components.""" # Initialize database initialize_engine(self.config) initialize_session_manager() - + # Ensure admin tables exist from .database.admin_tables import create_admin_tables from .database.engine import get_engine create_admin_tables(get_engine()) - + # Initialize security initialize_security(self.config) - + # Initialize templates self._initialize_templates() - + # Initialize router factory self._router_factory = AdminRouterFactory(self.config, self._templates) - + def _initialize_templates(self) -> None: """Initialize Jinja2 templates.""" # Get template directory @@ -143,26 +143,26 @@ def _initialize_templates(self) -> None: # Use built-in templates package_dir = Path(__file__).parent template_dir = package_dir / "templates" - + self._templates = Jinja2Templates(directory=str(template_dir)) - + # Add global template functions self._templates.env.globals.update({ "admin_config": self.config, }) - + def _create_admin_router(self, prefix: str) -> APIRouter: """ Create main admin router with all endpoints. - + Args: prefix: URL prefix for admin routes - + Returns: FastAPI router with all admin endpoints """ router = APIRouter(prefix=prefix, tags=["admin"]) - + # Dashboard endpoint @router.get("/", response_class=HTMLResponse, name="admin_dashboard") async def dashboard( @@ -170,25 +170,24 @@ async def dashboard( ) -> HTMLResponse: """Admin dashboard page.""" # Check authentication manually - from .auth.routes import get_current_user - from .database.session import get_session from .auth.activity import get_recent_activities - + from .database.session import get_session + # Get session and check auth session_gen = get_session() db_session = next(session_gen) user = get_current_user(request, self.config, db_session) - + try: # Redirect to login if not authenticated if user is None: return RedirectResponse(url=f"{prefix}/login", status_code=302) - + # Get registered models for navigation registered_models = [] for model_class, model_admin_class in self.registry.get_registered_models().items(): model_admin = model_admin_class(model_class) - + # Check if user has view permission if model_admin.has_view_permission(user): registered_models.append({ @@ -197,10 +196,10 @@ async def dashboard( 'name_plural': f"{model_class.__name__}s", 'url': f"{prefix}/{model_class.__name__.lower()}/", }) - + # Get recent activities for the dashboard recent_activities = get_recent_activities(db_session, limit=10) - + context = { "request": request, "title": "Admin Dashboard", @@ -208,7 +207,7 @@ async def dashboard( "user": user, "recent_activities": recent_activities, } - + return self._templates.TemplateResponse("dashboard.html", context) finally: # Close session properly @@ -216,23 +215,23 @@ async def dashboard( next(session_gen) except StopIteration: pass - + # Include authentication routes auth_router = create_auth_router(self.config, self._templates) router.include_router(auth_router) - + # Create routes for each registered model for model_class, model_admin_class in self.registry.get_registered_models().items(): model_admin = model_admin_class(model_class) model_router = self._router_factory.create_model_router(model_class, model_admin) router.include_router(model_router) - + return router - + def _mount_static_files(self, app: FastAPI, prefix: str) -> None: """ Mount static files for admin interface. - + Args: app: FastAPI application prefix: URL prefix for admin routes @@ -240,50 +239,50 @@ def _mount_static_files(self, app: FastAPI, prefix: str) -> None: # Get static directory package_dir = Path(__file__).parent static_dir = package_dir / "static" - + if static_dir.exists(): app.mount( f"{prefix}/static", StaticFiles(directory=str(static_dir)), name="admin_static" ) - - def get_registered_models(self) -> Dict[Type[Any], Type[ModelAdmin]]: + + def get_registered_models(self) -> dict[type[Any], type[ModelAdmin]]: """ Get all registered models and their admin classes. - + Returns: Dictionary mapping model classes to ModelAdmin classes """ return self.registry.get_registered_models() - - def is_registered(self, model: Type[Any]) -> bool: + + def is_registered(self, model: type[Any]) -> bool: """ Check if a model is registered. - + Args: model: Model class to check - + Returns: True if model is registered """ return self.registry.is_registered(model) - - def get_model_admin(self, model: Type[Any]) -> ModelAdmin: + + def get_model_admin(self, model: type[Any]) -> ModelAdmin: """ Get ModelAdmin instance for a registered model. - + Args: model: Registered model class - + Returns: ModelAdmin instance - + Raises: ValueError: If model is not registered """ if not self.is_registered(model): raise ValueError(f"Model {model.__name__} is not registered") - + model_admin_class = self.registry.get_model_admin(model) - return model_admin_class(model) \ No newline at end of file + return model_admin_class(model) diff --git a/pyproject.toml b/pyproject.toml index 8c5df34..1a21554 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,23 +7,23 @@ name = "internal-admin" version = "0.1.0" description = "A reusable, installable administrative framework for FastAPI applications" readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.10" license = {file = "LICENSE"} authors = [ - {name = "Internal Admin Team"}, + {name = "Ayah Austine"}, ] keywords = ["fastapi", "admin", "sqlalchemy", "crud"] classifiers = [ - "Development Status :: 3 - Alpha", + "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Framework :: FastAPI", + "Topic :: Internet :: WWW/HTTP :: WSGI :: Application", + "Topic :: Software Development :: Libraries :: Application Frameworks", ] dependencies = [ @@ -41,18 +41,19 @@ dependencies = [ dev = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", + "pytest-cov>=4.0.0", "httpx>=0.24.0", "black>=23.0.0", "isort>=5.12.0", - "flake8>=6.0.0", + "ruff>=0.3.0", "mypy>=1.0.0", "pre-commit>=3.0.0", + "build>=1.0.0", + "python-semantic-release>=9.0.0", + "uvicorn>=0.20.0", ] postgresql = [ - "psycopg[binary]>=3.1.0" -] -sqlite = [ - "aiosqlite>=0.19.0" + "psycopg[binary]>=3.1.0", ] [project.scripts] @@ -63,54 +64,122 @@ Homepage = "https://github.com/ayahaustine/internal-admin" Documentation = "https://internal-admin.readthedocs.io" Repository = "https://github.com/ayahaustine/internal-admin.git" "Bug Tracker" = "https://github.com/ayahaustine/internal-admin/issues" +Changelog = "https://github.com/ayahaustine/internal-admin/blob/main/CHANGELOG.md" + +# --------------------------------------------------------------------------- +# Build +# --------------------------------------------------------------------------- [tool.hatch.build.targets.wheel] packages = ["internal_admin"] +[tool.hatch.build.targets.sdist] +include = [ + "/internal_admin", + "/tests", + "/CHANGELOG.md", + "/README.md", + "/LICENSE", +] + +# --------------------------------------------------------------------------- +# Linting / formatting +# --------------------------------------------------------------------------- + [tool.black] line-length = 88 -target-version = ['py38'] -include = '\.pyi?$' -extend-exclude = ''' -/( - # directories - \.eggs - | \.git - | \.hg - | \.mypy_cache - | \.tox - | \.venv - | _build - | buck-out - | build - | dist -)/ -''' +target-version = ["py310"] [tool.isort] profile = "black" multi_line_output = 3 line_length = 88 +[tool.ruff] +line-length = 88 +target-version = "py310" + +[tool.ruff.lint] +select = ["E", "F", "W", "I", "UP", "B", "C4"] +ignore = [ + "E501", + "B008", # FastAPI Depends() in function defaults is the standard pattern +] + +# --------------------------------------------------------------------------- +# Type checking +# --------------------------------------------------------------------------- + [tool.mypy] -python_version = "3.8" +python_version = "3.10" warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true disallow_incomplete_defs = true check_untyped_defs = true -disallow_untyped_decorators = true no_implicit_optional = true warn_redundant_casts = true warn_unused_ignores = true -warn_no_return = true -warn_unreachable = true strict_equality = true +# Third-party stubs not available for all deps — ignore missing +ignore_missing_imports = true + +# --------------------------------------------------------------------------- +# Testing +# --------------------------------------------------------------------------- [tool.pytest.ini_options] testpaths = ["tests"] python_files = "test_*.py" python_classes = "Test*" python_functions = "test_*" +addopts = "-v --tb=short --strict-markers" +asyncio_mode = "auto" + +[tool.coverage.run] +source = ["internal_admin"] +omit = ["tests/*", "demo_web.py", "demo.py", "example.py"] + +[tool.coverage.report] +show_missing = true +skip_covered = false +fail_under = 70 + +# --------------------------------------------------------------------------- +# Semantic release +# --------------------------------------------------------------------------- + +[tool.semantic_release] +version_toml = ["pyproject.toml:project.version"] +version_variables = ["internal_admin/__init__.py:__version__"] +branch = "main" +changelog_file = "CHANGELOG.md" +build_command = "pip install build && python -m build" +dist_path = "dist/" +upload_to_vcs_release = true +upload_to_pypi = true +commit_message = "chore(release): {version} [skip ci]" +tag_format = "v{version}" +major_on_zero = false + +[tool.semantic_release.changelog] +template_dir = "" +changelog_file = "CHANGELOG.md" +exclude_commit_patterns = [ + "^chore\\(release\\):", + "^Merge pull request", + "^Merge branch", +] + +[tool.semantic_release.branches.main] +match = "main" +prerelease = false + +[tool.semantic_release.commit_parser_options] +allowed_tags = ["feat", "fix", "perf", "refactor", "docs", "style", "test", "chore", "ci", "build"] +minor_tags = ["feat"] +patch_tags = ["fix", "perf", "refactor"] +python_classes = "Test*" +python_functions = "test_*" addopts = "-v --tb=short" asyncio_mode = "auto" \ No newline at end of file