diff --git a/.gitignore b/.gitignore index 40dfc85..a97294d 100644 --- a/.gitignore +++ b/.gitignore @@ -111,6 +111,7 @@ celerybeat.pid *.sql.gz *.dump *.backup +backup* # ========================= # Docker diff --git a/accounts/management/__init__.py b/accounts/management/__init__.py new file mode 100644 index 0000000..7f37fec --- /dev/null +++ b/accounts/management/__init__.py @@ -0,0 +1,4 @@ +"""Commands suite for accounts app. + +This package contains management commands for accounts application. +""" diff --git a/accounts/management/commands/__init__.py b/accounts/management/commands/__init__.py new file mode 100644 index 0000000..7f37fec --- /dev/null +++ b/accounts/management/commands/__init__.py @@ -0,0 +1,4 @@ +"""Commands suite for accounts app. + +This package contains management commands for accounts application. +""" diff --git a/accounts/management/commands/generate_test_users.py b/accounts/management/commands/generate_test_users.py new file mode 100644 index 0000000..d399182 --- /dev/null +++ b/accounts/management/commands/generate_test_users.py @@ -0,0 +1,322 @@ +""" +Management command: create test users for development and delete users in a marker group. + +This command has two main modes: + +1. Creation mode (default) + - Creates test users with configurable parameters: --count, --prefix, --start, + --email-domain, --password, and flags --staff / --superuser / --inactive. + - Adds created users to a marker group (default: "Test_Users") so they can be + identified and removed later. + - Supports --force to create users even when the plain username exists + (a short random suffix is appended in that case). + - Supports --dry-run to preview actions without mutating the database. + +2. Deletion mode (--delete) + - Deletes users who are members of the configured marker group (default: "Test_Users"). + - Excludes staff and superusers from deletion if the user model supports those flags. + - Shows matched users, supports --dry-run, and requires interactive confirmation + by default (use --noinput to skip confirmation). + +Examples: + # Create 3 users testuser1..testuser3 + python manage.py generate_test_users --count 3 --prefix testuser + + # Delete all users in Test_Users group without prompt + python manage.py generate_test_users --delete --noinput +""" +import argparse +from typing import List, Optional +from django.core.management.base import BaseCommand, CommandError +from django.contrib.auth import get_user_model +from django.contrib.auth.models import Group +from django.db import transaction + +User = get_user_model() + + +class Command(BaseCommand): + """CLI wrapper for creating test users and deleting users in a marker group. + + Responsibilities: + - Parse command-line options and delegate to helper methods. + - Keep interactive I/O (prompts and formatted output) centralized. + + The heavy lifting is done in the private helpers `_handle_create` and + `_handle_delete` which are easier to test in isolation. + """ + + help = "Create test users or delete all users in the marker group (use --delete)." + + def add_arguments(self, parser): + """Register the command-line arguments.""" + parser.add_argument( + "--count", + "-c", + type=int, + default=1, + help="Number of users to create (default: 1).", + ) + parser.add_argument( + "--prefix", + "-p", + type=str, + default="testuser", + help='Username prefix (default: "testuser").', + ) + parser.add_argument( + "--start", + type=int, + default=1, + help="Starting index appended to username (default: 1).", + ) + parser.add_argument( + "--email-domain", + type=str, + default="example.com", + help="Email domain for generated users.", + ) + parser.add_argument( + "--password", + type=str, + default="test_password", + help="Password for created users.", + ) + parser.add_argument( + "--staff", + action="store_true", + help="Mark created users as staff.", + ) + parser.add_argument( + "--superuser", + action="store_true", + help="Create superuser(s).", + ) + parser.add_argument( + "--inactive", + action="store_true", + help="Create users with is_active=False.", + ) + parser.add_argument( + "--force", + action="store_true", + help="Append random suffix if username exists.", + ) + parser.add_argument( + "--delete", + action="store_true", + help="Delete users in marker group instead of creating.", + ) + parser.add_argument( + "--marker-group", + type=str, + default="Test_Users", + help='Group name used to mark generated users.', + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Preview actions without making DB changes.", + ) + parser.add_argument( + "--noinput", + action="store_true", + help="Do not prompt for confirmation when deleting.", + ) + + # ---- helpers ---- + def _make_username(self, prefix: str, idx: int) -> str: + """Return a username built from prefix and index (e.g. 'testuser3').""" + return f"{prefix}{idx}" + + def _email_for_username(self, username: str, domain: str) -> str: + """Return a simple email address for the given username and domain.""" + return f"{username}@{domain}" + + @staticmethod + def _random_suffix(length: int = 4) -> str: + """Return a short random alphanumeric suffix for collision avoidance.""" + import random, string + chars = string.ascii_lowercase + string.digits + return "".join(random.choice(chars) for _ in range(length)) + + def _handle_delete(self, marker_group_name: str, dry_run: bool, noinput: bool) -> None: + """Delete non-staff/non-superuser users who belong to the marker group. + + Behavior: + - Prints matched users and total count. + - If dry_run is True, only prints what would be deleted. + - If noinput is False, prompts interactively before deletion. + - Uses a single transaction to perform deletions atomically. + - Collects and reports failures without hiding them. + """ + try: + group = Group.objects.get(name=marker_group_name) + except Group.DoesNotExist: + self.stdout.write(self.style.WARNING( + f"Marker group '{marker_group_name}' does not exist. Nothing to delete." + )) + return + + qs = User.objects.filter(groups__name=marker_group_name) + + # Exclude privileged accounts if those attributes exist + if hasattr(User, "is_staff"): + qs = qs.exclude(is_staff=True) + if hasattr(User, "is_superuser"): + qs = qs.exclude(is_superuser=True) + + total = qs.count() + if total == 0: + self.stdout.write(self.style.WARNING("No non-staff/non-superuser users found in marker group.")) + return + + self.stdout.write(self.style.WARNING(f"Matched users for deletion (group='{marker_group_name}'): {total}")) + for u in qs: + parts = [f"username='{getattr(u, 'username', '')}'"] + if getattr(u, "email", None): + parts.append(f"email='{u.email}'") + self.stdout.write(" - " + " ".join(parts)) + + if dry_run: + self.stdout.write(self.style.WARNING("Dry run: no users were deleted.")) + return + + if not noinput: + answer = input("Delete all listed users? This is irreversible. [y/N]: ") + if answer.lower() not in ("y", "yes"): + self.stdout.write(self.style.WARNING("Aborted by user.")) + return + + deleted = 0 + failed = [] + try: + with transaction.atomic(): + for u in qs: + try: + u.delete() + deleted += 1 + except Exception as exc: + failed.append((u, exc)) + self.stdout.write(self.style.SUCCESS(f"Deleted {deleted} users from group '{marker_group_name}'.")) + + if failed: + self.stdout.write(self.style.ERROR(f"{len(failed)} deletions failed:")) + for u, exc in failed: + self.stdout.write(f" - {getattr(u, 'username', '')}: {exc}") + except Exception as exc_outer: + raise CommandError(f"Deletion transaction failed: {exc_outer}") + + def _handle_create( + self, + options, + dry_run: bool, + marker_group_name: str, + ) -> None: + """Create multiple users and add them to the marker group. + + Behavior: + - Respects `force` to append a random suffix when a plain username exists. + - Uses get_or_create semantics for the marker group (created if absent). + - Adds users to the marker group when possible; reports warnings on failure. + - Prints a success message per created user and a final summary. + """ + created: List[User] = [] + + count: int = int(options.get("count", 1)) + prefix: str = options.get("prefix") or "testuser" + start: int = int(options.get("start", 1)) + email_domain: str = options.get("email_domain") or "example.com" + password: str = options.get("password") or "test_password" + make_staff: bool = bool(options.get("staff")) + make_superuser: bool = bool(options.get("superuser")) + inactive: bool = bool(options.get("inactive")) + force: bool = bool(options.get("force")) + + group_obj: Optional[Group] = None + try: + group_obj, _ = Group.objects.get_or_create(name=marker_group_name) + except Exception: + group_obj = None + + for i in range(start, start + count): + username = self._make_username(prefix, i) + email = self._email_for_username(username, email_domain) + + if User.objects.filter(username=username).exists(): + if not force: + self.stdout.write(self.style.WARNING(f"Skipping existing username: {username}")) + continue + username = f"{username}_{self._random_suffix()}" + email = self._email_for_username(username, email_domain) + try: + self.stdout.write(self.style.NOTICE(f"Username existed; using fallback username: {username}")) + except Exception: + # Some Django versions may not provide NOTICE style + self.stdout.write(f"NOTICE: Username existed; using fallback username: {username}") + + if dry_run: + self.stdout.write( + f"[DRY RUN] Would create username='{username}', email='{email}', staff={make_staff}, superuser={make_superuser}, active={not inactive}" + ) + continue + + if make_superuser: + user = User.objects.create_superuser(username=username, email=email, password=password) # type: ignore[attr-defined] + try: + user.is_staff = True + user.is_superuser = True + except Exception: + pass + else: + user = User.objects.create_user(username=username, email=email, password=password) # type: ignore[attr-defined] + try: + user.is_staff = bool(make_staff) + user.is_superuser = False + except Exception: + pass + + try: + user.is_active = not bool(inactive) + except Exception: + pass + + try: + if group_obj is not None and hasattr(user, "groups"): + user.groups.add(group_obj) + except Exception: + self.stdout.write(self.style.WARNING(f"Warning: couldn't add user '{username}' to group '{marker_group_name}'")) + + user.save() + created.append(user) + self.stdout.write(self.style.SUCCESS(f"Created user: username='{username}' email='{email}'")) + + # final summary (SQL_TABLE may not exist in all versions, fall back if needed) + try: + self.stdout.write(self.style.SQL_TABLE(f"Total users created: {len(created)}")) + except Exception: + self.stdout.write(f"Total users created: {len(created)}") + + # ---- entry point ---- + def handle(self, *args, **options): + """Parse CLI options and dispatch to the create or delete handler. + + This method is intentionally short: it validates and extracts options + and then delegates functionality to `_handle_delete` or `_handle_create`. + """ + dry_run: bool = options.get("dry_run", False) + marker_group_name: str = options.get("marker_group") or "Test_Users" + + # Deletion mode + if options.get("delete"): + self._handle_delete(marker_group_name=marker_group_name, dry_run=dry_run, noinput=options.get("noinput", False)) + return + + # Creation mode: collect options and delegate + + + self._handle_create( + options=options, + dry_run=dry_run, + marker_group_name=marker_group_name, + ) diff --git a/accounts/tests/__init__.py b/accounts/tests/__init__.py index 2c59631..a939001 100644 --- a/accounts/tests/__init__.py +++ b/accounts/tests/__init__.py @@ -1,4 +1,4 @@ -"""Test suite for game app. +"""Test suite for accounts app. This package contains comprehensive tests for all accounts-related models, """ diff --git a/accounts/tests/test_generate_test_users.py b/accounts/tests/test_generate_test_users.py new file mode 100644 index 0000000..5e2a0f2 --- /dev/null +++ b/accounts/tests/test_generate_test_users.py @@ -0,0 +1,106 @@ +""" +Tests for accounts.generate_test_users management command. + +This module exercises the `generate_test_users` management command by: +- creating a set of test users with a given prefix and marker group, +- asserting the expected count changes, +- verifying that deletion via the same command removes the created users, +- and checking conflict/force behavior and group membership. +""" + +from django.contrib.auth import get_user_model +from django.contrib.auth.models import Group +from django.core.management import call_command +import pytest + +User = get_user_model() + + +@pytest.mark.django_db +def test_generate_test_users_creates_and_adds_group(user_factory): + """Create a small batch of test users and verify they are added to a group. + + The test records the baseline set of users with the given prefix, runs the + command to create `test_users_count` users, asserts the user count grew by + that amount, and then deletes those users using the command's `--delete` + flag and verifies cleanup. + """ + + # Configuration for this test (easy to change in one place) + test_users_count = 3 + prefix = "test_" + marker_group = "Test_Users" + + # Baseline + before = list(User.objects.filter(username__startswith=prefix).order_by("id")) + + # Create test users + call_command( + "generate_test_users", + "--count", + str(test_users_count), + "--prefix", + prefix, + "--start", + "1", + "--marker-group", + marker_group, + ) + + after = list(User.objects.filter(username__startswith=prefix).order_by("id")) + assert len(after) >= len(before) + test_users_count + + # Ensure marker group exists and some of the created users are in it + group = Group.objects.filter(name=marker_group).first() + assert group is not None, "Expected the marker group to be created" + assert group.user_set.filter(username__startswith=prefix).exists() + + # Clean up via the same command (non-interactive) + call_command("generate_test_users", "--delete", "--marker-group", marker_group, "--noinput") + + remaining = list(User.objects.filter(username__startswith=prefix).order_by("id")) + # After deletion, remaining should be <= before + assert len(remaining) <= len(before) + + +@pytest.mark.django_db +def test_generate_test_users_force_and_conflict(user_factory): + """Test behavior when existing usernames conflict with the generated prefix. + + The test creates an existing user whose username would conflict with the + generated users' prefix, then runs the command with `--force` and asserts + that at least one user with that prefix exists and that the marker group + contains at least one such user (i.e. the command created or assigned a + user to the group). + """ + + prefix = "conflictuser" + marker_group = "Test_Users" + + # Create a user that would share the prefix + u = user_factory(username=prefix) + + before_count = User.objects.filter(username__startswith=prefix).count() + + # Create with same prefix and start so a conflict may occur, pass --force + call_command( + "generate_test_users", + "--count", + "1", + "--prefix", + prefix, + "--start", + "1", + "--force", + "--marker-group", + marker_group, + ) + + after_count = User.objects.filter(username__startswith=prefix).count() + # Ensure count did not decrease and (ideally) increased by at least 1 + assert after_count > before_count + + # Ensure marker group exists and contains at least one user with the prefix + group = Group.objects.filter(name=marker_group).first() + assert group is not None, "Expected the marker group to be created" + assert group.user_set.filter(username__startswith=prefix).exists() diff --git a/chat/management/__init__.py b/chat/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chat/management/commands/__init__.py b/chat/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/game/management/__init__.py b/game/management/__init__.py new file mode 100644 index 0000000..6547108 --- /dev/null +++ b/game/management/__init__.py @@ -0,0 +1,4 @@ +"""Commands suite for game app. + +This package contains management commands for game application. +""" diff --git a/game/management/commands/__init__.py b/game/management/commands/__init__.py new file mode 100644 index 0000000..6547108 --- /dev/null +++ b/game/management/commands/__init__.py @@ -0,0 +1,4 @@ +"""Commands suite for game app. + +This package contains management commands for game application. +""" diff --git a/game/management/commands/export_db.py b/game/management/commands/export_db.py new file mode 100644 index 0000000..f65a191 --- /dev/null +++ b/game/management/commands/export_db.py @@ -0,0 +1,280 @@ +""" +Export database rows to a JSON file (Django serialization) with optional apps filter. + +This management command streams model instances into a JSON array in chunks to +avoid building one huge list in memory. It supports filtering exported models +by app label (--apps), excluding particular models (--exclude), pretty JSON +(--indent), natural key serialization, gzipped output and configurable +chunk sizes. + +Design notes +- The `handle` entry point is small and delegates to helper methods for I/O + and per-model streaming to follow single-responsibility and make unit + testing easier. +- The command avoids loading entire querysets into memory by iterating and + flushing chunks. + +Examples + python manage.py export_db + python manage.py export_db --apps game,auth --indent 2 -o backups/backup.json.gz +""" + +import os +import gzip +from typing import Optional, Set, List, Callable, Tuple + +from django.core.management.base import BaseCommand, CommandError +from django.apps import apps +from django.conf import settings +from django.core import serializers + + +EXCLUDE_MODEL_NAMES = { + "ContentType", + "Session", +} + + +class Command(BaseCommand): + """Export database rows to JSON using Django serializers. + + The implementation keeps `handle()` concise by delegating tasks to + small helpers like `_resolve_output_path`, `_open_output_stream`, and + `_serialize_chunk_and_write`. + """ + + help = "Export database rows to a JSON file (Django serialization). Supports --apps filter." + + def add_arguments(self, parser): + parser.add_argument( + "--output", + "-o", + help=( + "Output file path (relative paths are created inside BASE_DIR). " + "Use '-' for stdout. Use .gz to gzip." + ), + default="db_backups/backup.json", + ) + parser.add_argument( + "--apps", + help="Comma-separated app labels to export (e.g. 'auth,game'). If omitted exports all apps.", + default=None, + ) + parser.add_argument( + "--exclude", + help="Comma-separated model names to exclude (ModelName or app_label.ModelName).", + default="", + ) + parser.add_argument( + "--indent", + type=int, + default=None, + help="JSON indent level (pass an integer). If omitted output is compact (single line).", + ) + parser.add_argument( + "--natural-foreign", + action="store_true", + help="Use natural foreign keys when serializing (if supported by models).", + ) + parser.add_argument( + "--natural-primary", + action="store_true", + help="Use natural primary keys when serializing (if supported).", + ) + parser.add_argument( + "--chunk-size", + type=int, + default=1000, + help="Number of objects to serialize per chunk (memory / performance tuning).", + ) + + # ------------------------- + # Small helpers + # ------------------------- + def _parse_apps_arg(self, apps_arg: Optional[str]) -> Optional[Set[str]]: + if not apps_arg: + return None + return {p.strip() for p in apps_arg.split(",") if p.strip()} + + def _resolve_output_path(self, path: str) -> str: + base = getattr(settings, "BASE_DIR", os.getcwd()) + if not os.path.isabs(path): + path = os.path.join(base, path) + directory = os.path.dirname(path) + if directory: + os.makedirs(directory, exist_ok=True) + return path + + def models_to_export(self, apps_filter: Optional[Set[str]], exclude_set: Set[str]) -> List[type]: + all_models = list(apps.get_models()) + models_to_export: List[type] = [] + for m in all_models: + full_name = f"{m._meta.app_label}.{m.__name__}" + if m.__name__ in EXCLUDE_MODEL_NAMES or full_name in exclude_set or m.__name__ in exclude_set: + continue + if apps_filter is not None and m._meta.app_label not in apps_filter: + continue + models_to_export.append(m) + return models_to_export + + def _open_output_stream(self, output: str) -> Tuple[Callable[[str], None], Callable[[], None], str]: + """Return (write_chunk, close_fn, output_display_name). + + write_chunk is a callable that accepts a string and writes to the + destination. close_fn closes the underlying file object when used. + output_display_name is used in the final status message. + """ + write_to_stdout = (output == "-") + if write_to_stdout: + write_chunk = lambda s: self.stdout.write(s) + close_fn = lambda: None + return write_chunk, close_fn, "stdout" + + # Ensure path exists and open the file handle + output_path = self._resolve_output_path(output) + if output_path.endswith(".gz"): + fh = gzip.open(output_path, "wt", encoding="utf-8") + else: + fh = open(output_path, "w", encoding="utf-8") + + def _write(s: str): + fh.write(s) + + def _close(): + try: + fh.close() + except Exception: + pass + + return _write, _close, output_path + + def _serialize_chunk_and_write( + self, + chunk: List[object], + indent: Optional[int], + use_nat_foreign: bool, + use_nat_primary: bool, + write_chunk: Callable[[str], None], + separator: str, + first_piece: bool, + compact: bool, + ) -> bool: + """Serialize a chunk and write its inner JSON array items to the stream. + + Returns True if any items were written (so caller can update first_piece). + """ + if not chunk: + return False + + serialized = serializers.serialize( + "json", + chunk, + indent=indent, + use_natural_foreign_keys=use_nat_foreign, + use_natural_primary_keys=use_nat_primary, + ) + start = serialized.find('[') + end = serialized.rfind(']') + if start == -1 or end == -1: + # Fallback: write whole serialization + inner = serialized + else: + inner = serialized[start + 1 : end] + + if indent is not None: + inner = inner.lstrip('\n').rstrip('\n') + if inner: + if not first_piece: + write_chunk(separator) + write_chunk(inner) + return True + return False + else: + inner = inner.strip() + if inner: + if not first_piece: + write_chunk(separator) + write_chunk(inner) + return True + return False + + # ------------------------- + # Main handler (delegates heavily) + # ------------------------- + def handle(self, *args, **options): + """Main command entry: collect models and stream them to JSON output.""" + output = options["output"] + apps_arg = options["apps"] + exclude_arg = options["exclude"] + indent = options["indent"] + use_nat_foreign = options["natural_foreign"] + use_nat_primary = options["natural_primary"] + chunk_size = options["chunk_size"] + + apps_filter = self._parse_apps_arg(apps_arg) + exclude_set = {x.strip() for x in exclude_arg.split(",") if x.strip()} if exclude_arg else set() + + models_to_export = self.models_to_export(apps_filter, exclude_set) + if not models_to_export: + raise CommandError("No models found to export (check --apps and --exclude).") + + write_chunk, close_fh, output_display = self._open_output_stream(output) + + total_objects = 0 + first_piece = True + + try: + # Prepare JSON array delimiters + if indent is not None: + write_chunk("[\n") + separator = ",\n" + closing = "\n]\n" + else: + write_chunk("[") + separator = "," + closing = "]" + + # Stream objects model-by-model in chunks + for model in models_to_export: + qs = model._default_manager.all().iterator() + chunk: List[object] = [] + for obj in qs: + chunk.append(obj) + if len(chunk) >= chunk_size: + wrote = self._serialize_chunk_and_write( + chunk, + indent, + use_nat_foreign, + use_nat_primary, + write_chunk, + separator, + first_piece, + compact=(indent is None), + ) + if wrote: + first_piece = False + total_objects += len(chunk) + chunk = [] + + # flush remaining + if chunk: + wrote = self._serialize_chunk_and_write( + chunk, + indent, + use_nat_foreign, + use_nat_primary, + write_chunk, + separator, + first_piece, + compact=(indent is None), + ) + if wrote: + first_piece = False + total_objects += len(chunk) + + # Finish JSON + write_chunk(closing) + finally: + close_fh() + + self.stdout.write(self.style.SUCCESS(f"Exported {total_objects} objects to {output_display}")) diff --git a/game/management/commands/generate_fake_games.py b/game/management/commands/generate_fake_games.py new file mode 100644 index 0000000..60e880e --- /dev/null +++ b/game/management/commands/generate_fake_games.py @@ -0,0 +1,524 @@ +""" +Generate synthetic Durak game data for UI & statistics testing. + +This command builds realistic game rows (lobbies, games, game players, hands, +game deck, turns, table cards, moves, and discard piles). It prefers to create +test users by invoking the `generate_test_users` management command (which you +said lives in the `accounts` app). The command will call that management +command with a marker group (`Test_Users`) so created accounts are easy and +safe to delete later. + +Usage: + python manage.py generate_fake_games --games 3 --players 4 --moves 30 --card-count 36 +""" + +import itertools +import random +import re +import uuid +from typing import List, Optional + +from django.core.management import call_command +from django.core.management.base import BaseCommand, CommandError +from django.db import transaction +from django.utils import timezone +from django.contrib.auth.models import Group +from django.contrib.auth import get_user_model + +from game.models import ( + CardSuit, + CardRank, + Card, + Lobby, + LobbySettings, + LobbyPlayer, + Game, + GamePlayer, + GameDeck, + PlayerHand, + TableCard, + DiscardPile, + Turn, + Move, +) + +User = get_user_model() + +DEFAULT_RANKS_52 = [ + ("2", 2), + ("3", 3), + ("4", 4), + ("5", 5), + ("6", 6), + ("7", 7), + ("8", 8), + ("9", 9), + ("10", 10), + ("Jack", 11), + ("Queen", 12), + ("King", 13), + ("Ace", 14), +] + +DEFAULT_RANKS_36 = [r for r in DEFAULT_RANKS_52 if r[1] >= 6] +DEFAULT_RANKS_24 = [r for r in DEFAULT_RANKS_52 if r[1] >= 9] + + +class Command(BaseCommand): + """Management command to generate fake Durak games. + + The command produces decks, deals hands, creates games and simulates a + sequence of turns with Move rows (attack/defend/pickup). For test user + creation it prefers to call the `generate_test_users` command (from the + `accounts` app) with marker group `Test_Users`. If that call fails the + command falls back to inline user creation to avoid blocking generation. + + Options: + --games: Number of games to generate (default 5) + --players: Players per game (default 4) + --moves: Approximate number of moves per game (default 20) + --card-count: Deck size (24, 36 or 52, default 36) + --seed: Optional random seed + --reset: Delete generated lobbies/games and fake users (prefix 'fake_user_') + """ + + help = "Generate sample Durak games with move history for UI & statistics testing." + + def add_arguments(self, parser): + """Add command-line arguments. + + Args: + parser: Argument parser provided by Django. + """ + parser.add_argument("--games", type=int, default=5, help="Number of games to generate (default: 5)") + parser.add_argument("--players", type=int, default=4, help="Players per game (2-8 recommended, default: 4)") + parser.add_argument("--moves", type=int, default=20, help="Approx number of moves per game (default: 20)") + parser.add_argument( + "--card-count", + type=int, + choices=[24, 36, 52], + default=52, + help="Deck size per lobby (24, 36, or 52). Default: 52", + ) + parser.add_argument("--seed", type=int, default=None, help="Random seed for reproducible runs") + parser.add_argument( + "--reset", + action="store_true", + default=False, + help="Delete generated lobbies/games and fake users (prefix 'fake_user_')", + ) + + def handle(self, *args, **options): + """Main entry point. + + Args: + *args: Positional args (unused). + **options: Parsed CLI options. + """ + games = options["games"] + players = options["players"] + approx_moves = options["moves"] + card_count = options["card_count"] + seed = options["seed"] + do_reset = options["reset"] + + if seed is not None: + random.seed(seed) + + if do_reset: + self._reset_generated_lobbies_and_users() + return + + if not (2 <= players <= 8): + self.stdout.write(self.style.WARNING("players outside normal range (2-8). Continuing anyway.")) + + self.stdout.write( + f"Generating {games} game(s), {players} players each, ~{approx_moves} moves per game, {card_count}-card decks..." + ) + + with transaction.atomic(): + self._ensure_suits_and_ranks(card_count) + self._ensure_cards(card_count) + + # estimate users needed: owner + players per game, reuse users when possible + users = self._ensure_users(max_needed=games * (players + 1)) + + # cycle through the users so we can reuse them across games if fewer provided + user_iter = itertools.cycle(users) + created_games = [] + + for g_idx in range(games): + game = self._create_fake_game( + players=players, + approx_moves=approx_moves, + card_count=card_count, + user_iter=user_iter, + game_index=g_idx + 1, + ) + created_games.append(str(game.id)) + self.stdout.write(self.style.SUCCESS(f"Created Game {game.id} in Lobby '{game.lobby.name}'")) + + self.stdout.write(self.style.SUCCESS(f"Done. Created {len(created_games)} games: {created_games}")) + + # --------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------- + def _reset_generated_lobbies_and_users(self): + """Remove lobbies that contain any player who belongs to the Test_Users group, + then remove non-staff/non-superuser users who are members of that group. + + Behavior: + - Find the Group named 'Test_Users'. If it doesn't exist, nothing to delete. + - Find all users in that group. + - Find all Lobby objects that have a LobbyPlayer referencing any of those users, + and delete those lobbies (and their related game rows). + - Delete users in the group, excluding staff and superusers if those flags exist. + """ + with transaction.atomic(): + # Resolve the marker group + try: + marker_group = Group.objects.get(name="Test_Users") + except Group.DoesNotExist: + self.stdout.write(self.style.NOTICE("Group 'Test_Users' does not exist. Nothing to reset.")) + return + + # All users who are members of the marker group + users_in_group = User.objects.filter(groups=marker_group) + + total_users = users_in_group.count() + if total_users == 0: + self.stdout.write(self.style.NOTICE("No users found in group 'Test_Users'. Nothing to reset.")) + return + + # Find lobby IDs that have at least one LobbyPlayer who is in that group + lobby_ids_qs = LobbyPlayer.objects.filter(user__in=users_in_group).values_list("lobby_id", + flat=True).distinct() + lobby_ids = list(lobby_ids_qs) + + if lobby_ids: + lobby_qs = Lobby.objects.filter(id__in=lobby_ids) + deleted_count, details = lobby_qs.delete() + self.stdout.write(self.style.WARNING( + f"Deleted {deleted_count} objects belonging to {len(lobby_ids)} lobby(ies) that had Test_Users members (details: {details})." + )) + else: + self.stdout.write(self.style.NOTICE("No lobbies found that contain users from group 'Test_Users'.")) + + # Now delete non-staff, non-superuser users who belong to the marker group + users_to_delete = users_in_group + if hasattr(User, "is_staff"): + users_to_delete = users_to_delete.exclude(is_staff=True) + if hasattr(User, "is_superuser"): + users_to_delete = users_to_delete.exclude(is_superuser=True) + + count_to_delete = users_to_delete.count() + if count_to_delete: + u_deleted_count, u_deleted_details = users_to_delete.delete() + self.stdout.write(self.style.WARNING( + f"Deleted {count_to_delete} user(s) from group 'Test_Users' (deleted objects: {u_deleted_count})." + )) + else: + self.stdout.write( + self.style.NOTICE("No non-staff/non-superuser users in group 'Test_Users' to delete.")) + + def _ensure_suits_and_ranks(self, card_count: int): + """Ensure CardSuit and CardRank rows exist. + + Args: + card_count: Number of cards in deck (24/36/52) to decide which ranks to create. + """ + suits = {"Hearts": "red", "Diamonds": "red", "Clubs": "black", "Spades": "black"} + for name, color in suits.items(): + CardSuit.objects.get_or_create(name=name, defaults={"color": color}) + + if card_count == 52: + ranks = DEFAULT_RANKS_52 + elif card_count == 36: + ranks = DEFAULT_RANKS_36 + else: + ranks = DEFAULT_RANKS_24 + + existing_values = set(CardRank.objects.values_list("value", flat=True)) + for name, val in ranks: + if val not in existing_values: + CardRank.objects.create(name=name, value=val) + + def _ensure_cards(self, card_count: int): + """Ensure Card objects exist for the requested deck size. + + Args: + card_count: Deck size (24/36/52). + """ + suits = list(CardSuit.objects.all()) + if card_count == 52: + ranks_qs = CardRank.objects.all() + elif card_count == 36: + ranks_qs = CardRank.objects.filter(value__gte=6) + else: + ranks_qs = CardRank.objects.filter(value__gte=9) + + ranks = list(ranks_qs) + created = 0 + for suit in suits: + for rank in ranks: + _, created_flag = Card.objects.get_or_create(suit=suit, rank=rank) + if created_flag: + created += 1 + if created: + self.stdout.write(self.style.NOTICE(f"Created {created} Card objects.")) + + def _ensure_users(self, max_needed: int) -> List[User]: + """Ensure at least `max_needed` users exist. + + The method prefers to call the `generate_test_users` management command + (from the accounts app). It requests creation with prefix `fake_user_` and + marker group `Test_Users`. If the call fails, it will fall back to inline + creation to ensure generation continues. + + Args: + max_needed: Minimum number of users required. + + Returns: + List of User instances (length >= max_needed). + """ + existing = list(User.objects.all().order_by("id")) + if len(existing) >= max_needed: + return existing[:max_needed] + + needed = max_needed - len(existing) + + prefix = "fake_user_" + existing_fake = list(User.objects.filter(username__startswith=prefix)) + max_index = 0 + for u in existing_fake: + m = re.search(rf'^{re.escape(prefix)}(\d+)$', u.username) + if m: + try: + idx = int(m.group(1)) + if idx > max_index: + max_index = idx + except Exception: + pass + start = max_index + 1 + + try: + # Ask the accounts app's generate_test_users command to create the missing users + call_command( + "generate_test_users", + "--count", + str(needed), + "--prefix", + prefix, + "--start", + str(start), + "--marker-group", + "Test_Users", + ) + self.stdout.write(self.style.NOTICE( + f"Requested {needed} test user(s) via generate_test_users (prefix={prefix}, start={start}).")) + except Exception as exc: + # Fallback inline creation + self.stdout.write( + self.style.WARNING(f"Calling generate_test_users failed: {exc}. Falling back to inline creation.")) + created_users = [] + for i in range(needed): + username = f"{prefix}{start + i}" + try: + user = User.objects.create_user(username=username, password="testpass") + except Exception: + #Try create without password method if custom user model differs + user = User.objects.create(username=username) + created_users.append(user) + self.stdout.write(self.style.NOTICE(f"Fallback created {len(created_users)} users.")) + all_users = existing + created_users + return all_users[:max_needed] + + all_users = list(User.objects.all().order_by("id")) + return all_users[:max_needed] + + def _draw_from_deck_list(self, deck_cards: List[Card]) -> Optional[Card]: + """Pop a card from a list representing the deck. + + Args: + deck_cards: Mutable list acting as the deck (front = top). + + Returns: + Card or None if deck is empty. + """ + if not deck_cards: + return None + return deck_cards.pop(0) + + def _create_fake_game(self, players: int, approx_moves: int, card_count: int, user_iter, game_index: int) -> Game: + """Create a single fake lobby/game and simulate play. + + Args: + players: Number of players in the created game. + approx_moves: Approximate number of attack/defend/pickup moves to simulate. + card_count: Deck size (24/36/52). + user_iter: Iterator that yields User objects (owner + players). + game_index: One-based index used for naming. + + Returns: + The created Game instance. + """ + owner_user = next(user_iter) + # NOTE: lobby name no longer uses 'Fake Lobby' prefix to avoid relying on names for cleanup. + lobby = Lobby.objects.create( + owner=owner_user, + name=f"Generated Lobby {uuid.uuid4().hex[:6]}", + is_private=False, + status="playing", + ) + + LobbySettings.objects.create( + lobby=lobby, + max_players=players, + card_count=card_count, + is_transferable=False, + neighbor_throw_only=False, + allow_jokers=False, + ) + + # Build list of Card objects representing the deck + if card_count == 52: + ranks_qs = CardRank.objects.all() + elif card_count == 36: + ranks_qs = CardRank.objects.filter(value__gte=6) + else: + ranks_qs = CardRank.objects.filter(value__gte=9) + + ranks = list(ranks_qs) + suits = list(CardSuit.objects.all()) + deck_cards: List[Card] = [] + for suit in suits: + for rank in ranks: + try: + deck_cards.append(Card.objects.get(suit=suit, rank=rank)) + except Card.DoesNotExist: + continue + + random.shuffle(deck_cards) + trump_card_obj = deck_cards[-1] if deck_cards else None + if trump_card_obj is None: + raise RuntimeError("No Card objects available to create a game.") + + game = Game.objects.create(lobby=lobby, trump_card=trump_card_obj, status="in_progress") + + # Persist deck into GameDeck model + for pos, card in enumerate(deck_cards, start=1): + GameDeck.objects.create(game=game, card=card, position=pos) + + # Create LobbyPlayer and GamePlayer entries + game_players: List[GamePlayer] = [] + for seat in range(1, players + 1): + user = next(user_iter) + LobbyPlayer.objects.create(lobby=lobby, user=user, status="playing") + gp = GamePlayer.objects.create(game=game, user=user, seat_position=seat, cards_remaining=0) + game_players.append(gp) + + # Helper to pop top GameDeck entry (DB-backed) + def pop_top_db_card(game_obj: Game) -> Optional[Card]: + top_qs = GameDeck.objects.filter(game=game_obj).order_by("position")[:1] + if not top_qs: + return None + gd = top_qs[0] + card = gd.card + gd.delete() + return card + + # Deal initial hands (6 cards typical) + initial_hand_size = 6 + for gp in game_players: + for idx in range(initial_hand_size): + card = pop_top_db_card(game) + if not card: + break + PlayerHand.objects.create(game=game, player=gp.user, card=card, order_in_hand=idx + 1) + gp.cards_remaining += 1 + gp.save(update_fields=["cards_remaining"]) + + # Simulate a sequence of turns and moves + current_attacker_index = 0 + turn_counter = 0 + discard_position = 1 + + def find_defense_card_for_player(defender_user, attack_card): + """Return a PlayerHand instance that can defend or None.""" + ph_qs = PlayerHand.objects.filter(game=game, player=defender_user) + for ph in ph_qs: + try: + if ph.card.can_beat(attack_card, game.get_trump_suit()): + return ph + except Exception: + return None + return None + + for _ in range(approx_moves): + turn_counter += 1 + attacker_gp = game_players[current_attacker_index % len(game_players)] + attacker_user = attacker_gp.user + turn = Turn.objects.create(game=game, player=attacker_user, turn_number=turn_counter) + + attacker_hand = list(PlayerHand.objects.filter(game=game, player=attacker_user).order_by("order_in_hand")) + if not attacker_hand: + current_attacker_index += 1 + continue + + attack_ph = attacker_hand[0] + attack_card = attack_ph.card + attack_ph.delete() + attacker_gp.cards_remaining = max(0, attacker_gp.cards_remaining - 1) + attacker_gp.save(update_fields=["cards_remaining"]) + + table_card = TableCard.objects.create(game=game, attack_card=attack_card) + Move.objects.create(turn=turn, table_card=table_card, action_type="attack", created_at=timezone.now()) + + defender_index = (current_attacker_index + 1) % len(game_players) + defender_gp = game_players[defender_index] + defender_user = defender_gp.user + + defense_ph = find_defense_card_for_player(defender_user, attack_card) + if defense_ph: + defense_card = defense_ph.card + table_card.defense_card = defense_card + table_card.save(update_fields=["defense_card"]) + defense_ph.delete() + defender_gp.cards_remaining = max(0, defender_gp.cards_remaining - 1) + defender_gp.save(update_fields=["cards_remaining"]) + + Move.objects.create(turn=turn, table_card=table_card, action_type="defend", created_at=timezone.now()) + + DiscardPile.objects.create(game=game, card=attack_card, position=discard_position) + discard_position += 1 + DiscardPile.objects.create(game=game, card=defense_card, position=discard_position) + discard_position += 1 + else: + Move.objects.create(turn=turn, table_card=table_card, action_type="pickup", created_at=timezone.now()) + PlayerHand.objects.create(game=game, player=defender_user, card=attack_card, + order_in_hand=defender_gp.cards_remaining + 1) + defender_gp.cards_remaining += 1 + defender_gp.save(update_fields=["cards_remaining"]) + + # Refill hands up to 6 cards + for gp in game_players: + while gp.cards_remaining < 6: + card = pop_top_db_card(game) + if not card: + break + PlayerHand.objects.create(game=game, player=gp.user, card=card, + order_in_hand=gp.cards_remaining + 1) + gp.cards_remaining += 1 + gp.save(update_fields=["cards_remaining"]) + + current_attacker_index = (current_attacker_index + 1) % len(game_players) + + # Optionally finish the game and pick a loser + if random.random() < 0.6: + loser_gp = random.choice(game_players) + game.loser = loser_gp.user + game.status = "finished" + game.finished_at = timezone.now() + game.save(update_fields=["loser", "status", "finished_at"]) + + return game diff --git a/game/management/commands/import_db.py b/game/management/commands/import_db.py new file mode 100644 index 0000000..d36622c --- /dev/null +++ b/game/management/commands/import_db.py @@ -0,0 +1,302 @@ +""" +Import JSON (Django serialization) into the database with optional filtering. + +This management command imports JSON previously exported by `export_db` (Django +serialized objects). It supports gzipped input, an app-label filter (--apps), +an optional full DB flush (--clear), and resetting database sequences for +PostgreSQL-driven backends. + +Behavior summary +--------------- +- Input: path to JSON or '-' for stdin. '.gz' files are supported. +- --apps: comma-separated list of app labels to import (if omitted, import all). +- --ignore-errors: attempt to continue past individual object save errors. +- --clear: run `manage.py flush --noinput` before importing (DANGEROUS). +- Sequence reset: automatically attempted for Postgres or when --reset-sequences is given. +""" + +import os +import gzip +from typing import Optional, Set, List, Iterable, Tuple + +from django.core.management.base import BaseCommand, CommandError +from django.core import serializers +from django.db import transaction, IntegrityError, connection +from django.core.management import call_command +from django.conf import settings +from django.apps import apps +from django.core.management.color import no_style + + +class Command(BaseCommand): + """ + Import JSON exported by export_db (Django serialization) with optional app filter. + + The command accepts a Django-serialized JSON array (optionally gzipped) and + deserializes objects into the database. Use `--apps` to restrict import to + objects belonging to a set of app labels (comma-separated). When PostgreSQL + is detected (or `--reset-sequences` is passed) the command will attempt to + reset DB sequences — by default for all models, or only for the selected apps + when `--apps` is used. + """ + + help = "Import JSON exported by export_db (Django serialization). Supports --apps filter." + + def add_arguments(self, parser): + """Define command-line arguments.""" + parser.add_argument( + "input", + help="Input JSON file path (relative paths searched in BASE_DIR). Use '-' for stdin. Supports .gz.", + ) + parser.add_argument( + "--clear", + action="store_true", + help="Clear the database via `flush --noinput` before importing. USE WITH CARE.", + ) + parser.add_argument( + "--ignore-errors", + action="store_true", + help="Try to continue past individual object errors (logs them).", + ) + parser.add_argument( + "--reset-sequences", + action="store_true", + help="Attempt to reset DB sequences after import (Postgres only). Default: on for Postgres.", + ) + parser.add_argument( + "--apps", + help="Comma-separated app labels to import (e.g. 'auth,game'). If omitted imports all apps.", + default=None, + ) + + # ------------------------- + # Helper utilities + # ------------------------- + def _parse_apps_arg(self, apps_arg: Optional[str]) -> Optional[Set[str]]: + """ + Parse the --apps argument into a set of app labels. + + Returns a set of app labels, or None if apps_arg is falsy. + """ + if not apps_arg: + return None + return {p.strip() for p in apps_arg.split(",") if p.strip()} + + def _resolve_input_path(self, path: str) -> str: + """ + Resolve a possibly-relative input path against BASE_DIR. + + If path is absolute, return as-is. If relative, join with BASE_DIR (or cwd). + """ + base = getattr(settings, "BASE_DIR", os.getcwd()) + if not os.path.isabs(path): + path = os.path.join(base, path) + return path + + def _read_raw_input(self, input_path: str, from_stdin: bool) -> str: + """ + Read input JSON text from a file (supports .gz) or from stdin. + + Raises CommandError if file missing or unreadable. + """ + if from_stdin: + return self.stdin.read() + + if not os.path.exists(input_path): + raise CommandError(f"Input file not found: {input_path}") + + try: + if input_path.endswith(".gz"): + with gzip.open(input_path, "rt", encoding="utf-8") as fh: + return fh.read() + else: + with open(input_path, "r", encoding="utf-8") as fh: + return fh.read() + except OSError as e: + raise CommandError(f"Failed reading input file '{input_path}': {e}") + + def _deserialized_iter(self, raw: str) -> Iterable: + """ + Return an iterator of deserialized objects from a JSON string. + + Raises CommandError if deserialization fails. + """ + try: + return serializers.deserialize("json", raw) + except Exception as e: + raise CommandError(f"Failed to deserialize input JSON: {e}") + + def _maybe_flush(self): + """Perform database flush (via manage.py flush) if requested.""" + try: + call_command("flush", "--noinput") + except Exception as e: + raise CommandError(f"Failed to flush database: {e}") + + # ------------------------- + # Import core + # ------------------------- + def _import_objects( + self, + deserialized_iter: Iterable, + apps_filter: Optional[Set[str]], + ignore_errors: bool + ) -> Tuple[int, int, List[str]]: + """ + Iterate the deserialized objects and save them to the DB. + + Returns (saved_count, skipped_count, errors_list). + + Behavior: + - If apps_filter is provided, objects whose model's app_label are not in + the set will be skipped (counted in skipped_count). + - If ignore_errors is True, exceptions for individual objects are logged + and the loop continues; objects are saved in per-object transactions + to avoid leaving partial state locked in a big transaction. + - If ignore_errors is False, the entire import runs in one transaction + and any error aborts the import (exception propagates). + """ + saved = 0 + skipped = 0 + errors: List[str] = [] + + if ignore_errors: + # Save each object in its own small transaction so we can continue on error. + for des_obj in deserialized_iter: + try: + obj_app_label = des_obj.object._meta.app_label + except Exception: + skipped += 1 + continue + + if apps_filter is not None and obj_app_label not in apps_filter: + skipped += 1 + continue + + try: + with transaction.atomic(): + des_obj.save() + saved += 1 + except IntegrityError as e: + msg = f"IntegrityError saving {des_obj}: {e}" + errors.append(msg) + self.stderr.write(self.style.ERROR(msg)) + # continue to next object + except Exception as e: + msg = f"Error saving {des_obj}: {e}" + errors.append(msg) + self.stderr.write(self.style.ERROR(msg)) + # continue to next object + else: + # Perform the import inside a single transaction — consistent commit or rollback. + try: + with transaction.atomic(): + for des_obj in deserialized_iter: + try: + obj_app_label = des_obj.object._meta.app_label + except Exception: + skipped += 1 + continue + + if apps_filter is not None and obj_app_label not in apps_filter: + skipped += 1 + continue + + des_obj.save() + saved += 1 + except IntegrityError: + # Let the IntegrityError propagate after reporting (so caller sees it), + # the transaction will rollback automatically. + raise + except Exception: + # Any other exception also should propagate out and rollback. + raise + + return saved, skipped, errors + + # ------------------------- + # Sequence reset + # ------------------------- + def _reset_postgres_sequences(self, apps_filter: Optional[Set[str]]) -> Optional[str]: + """ + Reset Postgres sequences for models (all models or filtered by apps_filter). + + Returns None on success, or an error message string on failure. + """ + style = no_style() + try: + if apps_filter is None: + models_to_reset = list(apps.get_models()) + else: + models_to_reset = [m for m in apps.get_models() if m._meta.app_label in apps_filter] + + sql_list = connection.ops.sequence_reset_sql(style, models_to_reset) + with connection.cursor() as cursor: + for sql in sql_list: + if sql.strip(): + cursor.execute(sql) + return None + except Exception as e: + return f"Failed to reset sequences: {e}" + + # ------------------------- + # Main handle + # ------------------------- + def handle(self, *args, **options): + """ + Main command entry. + + Steps: + 1. Resolve input (path/stdin) and read raw JSON text. + 2. Optionally flush DB (--clear). + 3. Deserialize objects and import them (respecting --apps and --ignore-errors). + 4. Optionally reset Postgres sequences (automatic on Postgres or via --reset-sequences). + 5. Print a summary and raise CommandError on fatal problems. + """ + input_path = options["input"] + do_clear = options["clear"] + ignore_errors = options["ignore_errors"] + reset_sequences_flag = options["reset_sequences"] + apps_arg = options["apps"] + + apps_filter = self._parse_apps_arg(apps_arg) + + read_from_stdin = (input_path == "-") + if not read_from_stdin: + input_path = self._resolve_input_path(input_path) + + # Optional DB flush + if do_clear: + self.stdout.write(self.style.WARNING("Flushing the database (this will remove ALL data)...")) + self._maybe_flush() + + # Read raw input + raw = self._read_raw_input(input_path, read_from_stdin) if not read_from_stdin else self._read_raw_input(input_path, True) + if not raw or not raw.strip(): + raise CommandError("Input file is empty.") + + # Prepare deserialized iterator + deserialized_iter = self._deserialized_iter(raw) + + # Import objects + saved, skipped, errors = self._import_objects(deserialized_iter, apps_filter, ignore_errors) + + # Decide whether to reset sequences: + engine = connection.settings_dict.get("ENGINE", "") + is_postgres = "postgresql" in engine or connection.vendor == "postgresql" + do_reset = reset_sequences_flag or is_postgres + + if do_reset and is_postgres: + reset_err = self._reset_postgres_sequences(apps_filter) + if reset_err: + self.stderr.write(self.style.ERROR(reset_err)) + errors.append(reset_err) + else: + self.stdout.write(self.style.SUCCESS("Postgres sequences reset successfully.")) + + # Summary + self.stdout.write(self.style.SUCCESS(f"Imported {saved} objects.")) + if skipped: + self.stdout.write(self.style.WARNING(f"Skipped {skipped} objects (outside --apps or invalid).")) + if errors: + self.stdout.write(self.style.WARNING(f"{len(errors)} errors occurred during import. See stderr for details.")) diff --git a/game/management/commands/init_game_data.py b/game/management/commands/init_game_data.py new file mode 100644 index 0000000..d113a4b --- /dev/null +++ b/game/management/commands/init_game_data.py @@ -0,0 +1,190 @@ +""" +Initialize default card suits, ranks and create Card entries. + +This management command will create standard card suits and ranks and then +create Card objects for each suit × rank combination for a chosen deck size. + +Usage: + python manage.py init_game_data + python manage.py init_game_data --deck-size 36 + python manage.py init_game_data --reset + +The command is idempotent by default (it uses get_or_create and updates mismatched +names/colors). Using --reset will delete existing Card, CardRank and CardSuit +records before recreating them. + +Module contents: + Command -- Django management command class implementing the behavior. +""" + +from typing import List, Tuple + +from django.core.management.base import BaseCommand +from django.db import transaction + +from game.models import CardSuit, CardRank, Card + + +class Command(BaseCommand): + """ + Django management command to initialize card suits, ranks and cards. + + The command supports 24-, 36- and 52-card decks and an optional reset flag + which deletes existing Card, CardRank and CardSuit records before creating + new ones. + + Attributes: + help (str): Short description displayed by `manage.py help`. + """ + + # Standard four suits with their display colors. + suits = [ + ("Hearts", "red"), + ("Diamonds", "red"), + ("Clubs", "black"), + ("Spades", "black"), + ] + + help = "Initialize default card suits, ranks and create Card entries. Default deck: 36 (Durak)." + + def add_arguments(self, parser): + """ + Add command-line arguments for the management command. + + Args: + parser (argparse.ArgumentParser): The argument parser instance. + + Recognized flags: + --deck-size {24,36,52}: Which deck to create (default 52). + --reset: If present, deletes existing Card/Rank/Suit rows before creating. + """ + parser.add_argument( + "--deck-size", + type=int, + choices=[24, 36, 52], + default=52, + help="Which deck to create cards for: 24 (9-A), 36 (6-A), 52 (2-A). Default: 52.", + ) + parser.add_argument( + "--reset", + action="store_true", + help="Delete existing Card, CardRank and CardSuit records and recreate from scratch.", + ) + + def ranks_for_deck(self, size: int) -> List[Tuple[str, int]]: + """ + Return a list of (name, value) tuples representing card ranks for the given deck size. + + The returned list orders ranks from lowest to highest numeric value. + + Args: + size (int): Deck size. Supported values: 24, 36, 52. + + Returns: + List[Tuple[str, int]]: List of (display_name, numeric_value) for ranks. + + Raises: + ValueError: If an unsupported deck size is supplied. + """ + face = [("Jack", 11), ("Queen", 12), ("King", 13), ("Ace", 14)] + if size == 52: + # 2..10 plus face cards + numeric = [(str(i), i) for i in range(2, 11)] + return numeric + face + if size == 36: + # 6..10 plus face cards (typical Durak deck) + numeric = [(str(i), i) for i in range(6, 11)] + return numeric + face + if size == 24: + # 9..10 plus face cards (short deck) + numeric = [("9", 9), ("10", 10)] + return numeric + face + raise ValueError("Unsupported deck size") + + def create_suits(self): + # Create or update suits + created_suits = [] + for name, color in self.suits: + suit_obj, created = CardSuit.objects.get_or_create(name=name, defaults={"color": color}) + # If suit exists but has a different color, update it to our canonical color. + if not created and getattr(suit_obj, "color", None) != color: + suit_obj.color = color + suit_obj.save(update_fields=["color"]) + created_suits.append(suit_obj) + self.stdout.write(f"{'Created' if created else 'Found'} suit: {suit_obj.name} ({suit_obj.color})") + return created_suits + + def create_ranks(self, ranks: List[Tuple[str, int]]) -> List[Tuple[str, int]]: + # Create or update ranks + created_ranks = [] + for name, value in ranks: + rank_obj, created = CardRank.objects.get_or_create(value=value, defaults={"name": name}) + # Normalize the printable name if it differs from our desired name. + if not created and getattr(rank_obj, "name", None) != name: + rank_obj.name = name + rank_obj.save(update_fields=["name"]) + created_ranks.append(rank_obj) + self.stdout.write(f"{'Created' if created else 'Found'} rank: {rank_obj.name} (value={rank_obj.value})") + return created_ranks + + def create_cards(self, ranks: List[Tuple[str, int]]): + # Create cards for every suit × rank (skip cards that already exist) + created_cards = 0 + skipped_cards = 0 + created_suits = self.create_suits() + created_ranks = self.create_ranks(ranks) + for suit in created_suits: + for rank in created_ranks: + # If a Card with the suit & rank already exists (and is not a special card), + # skip creating a duplicate. + card_qs = Card.objects.filter(suit=suit, rank=rank, special_card__isnull=True) + if card_qs.exists(): + skipped_cards += 1 + continue + Card.objects.create(suit=suit, rank=rank) + created_cards += 1 + + return created_cards, skipped_cards + + def handle(self, *args, **options): + """ + Main entry point for the management command. + + This method creates suits and ranks (using get_or_create so it is safe to + run repeatedly), then creates Card objects for each combination of suit + and rank. If --reset is passed, existing Card, CardRank and CardSuit + records will be deleted first. + + Args: + *args: Positional arguments (unused). + **options: Command options dictionary with keys: + deck_size (int): Deck size to create (24, 36, 52). + reset (bool): Whether to delete existing entries first. + + Raises: + ValueError: If an unsupported deck size is provided (shouldn't happen + because argparse restricts choices). + """ + deck_size = options["deck_size"] + do_reset = options["reset"] + + ranks = self.ranks_for_deck(deck_size) + + with transaction.atomic(): + if do_reset: + self.stdout.write("Reset requested — deleting existing Cards, CardRank and CardSuit entries...") + # Delete Cards first because of foreign key references to ranks & suits + Card.objects.all().delete() + CardRank.objects.all().delete() + CardSuit.objects.all().delete() + self.stdout.write("Existing card data deleted.") + + created_cards, skipped_cards = self.create_cards(ranks) + + # Summary output + self.stdout.write(self.style.SUCCESS( + f"Deck initialization finished for deck_size={deck_size}." + )) + self.stdout.write(f"Cards created: {created_cards}. Cards already present: {skipped_cards}.") + self.stdout.write( + f"Total suits: {CardSuit.objects.count()}; ranks: {CardRank.objects.count()}; cards: {Card.objects.count()}") diff --git a/game/management/commands/reset_games.py b/game/management/commands/reset_games.py new file mode 100644 index 0000000..36d09d7 --- /dev/null +++ b/game/management/commands/reset_games.py @@ -0,0 +1,179 @@ +""" +Initialize default card suits, ranks and create Card entries. + +This management command will create standard card suits and ranks and then +create Card objects for each suit × rank combination for a chosen deck size. + +Usage: + python manage.py init_game_data + python manage.py init_game_data --deck-size 36 + python manage.py init_game_data --reset + +The command is idempotent by default (it uses get_or_create and updates mismatched +names/colors). Using --reset will delete existing Card, CardRank and CardSuit +records before recreating them. + +Module contents: + Command -- Django management command class implementing the behavior. +""" + +from typing import List, Tuple + +from django.core.management.base import BaseCommand +from django.db import transaction + +from game.models import CardSuit, CardRank, Card + + +class Command(BaseCommand): + """ + Django management command to initialize card suits, ranks and cards. + + The command supports 24-, 36- and 52-card decks and an optional reset flag + which deletes existing Card, CardRank and CardSuit records before creating + new ones. + + Attributes: + help (str): Short description displayed by `manage.py help`. + """ + + help = "Initialize default card suits, ranks and create Card entries. Default deck: 36 (Durak)." + + def add_arguments(self, parser): + """ + Add command-line arguments for the management command. + + Args: + parser (argparse.ArgumentParser): The argument parser instance. + + Recognized flags: + --deck-size {24,36,52}: Which deck to create (default 52). + --reset: If present, deletes existing Card/Rank/Suit rows before creating. + """ + parser.add_argument( + "--deck-size", + type=int, + choices=[24, 36, 52], + default=52, + help="Which deck to create cards for: 24 (9-A), 36 (6-A), 52 (2-A). Default: 52.", + ) + parser.add_argument( + "--reset", + action="store_true", + help="Delete existing Card, CardRank and CardSuit records and recreate from scratch.", + ) + + def handle(self, *args, **options): + """ + Main entry point for the management command. + + This method creates suits and ranks (using get_or_create so it is safe to + run repeatedly), then creates Card objects for each combination of suit + and rank. If --reset is passed, existing Card, CardRank and CardSuit + records will be deleted first. + + Args: + *args: Positional arguments (unused). + **options: Command options dictionary with keys: + deck_size (int): Deck size to create (24, 36, 52). + reset (bool): Whether to delete existing entries first. + + Raises: + ValueError: If an unsupported deck size is provided (shouldn't happen + because argparse restricts choices). + """ + deck_size = options["deck_size"] + do_reset = options["reset"] + + # Standard four suits with their display colors. + suits = [ + ("Hearts", "red"), + ("Diamonds", "red"), + ("Clubs", "black"), + ("Spades", "black"), + ] + + def ranks_for_deck(size: int) -> List[Tuple[str, int]]: + """ + Return a list of (name, value) tuples representing card ranks for the given deck size. + + The returned list orders ranks from lowest to highest numeric value. + + Args: + size (int): Deck size. Supported values: 24, 36, 52. + + Returns: + List[Tuple[str, int]]: List of (display_name, numeric_value) for ranks. + + Raises: + ValueError: If an unsupported deck size is supplied. + """ + face = [("Jack", 11), ("Queen", 12), ("King", 13), ("Ace", 14)] + if size == 52: + # 2..10 plus face cards + numeric = [(str(i), i) for i in range(2, 11)] + return numeric + face + if size == 36: + # 6..10 plus face cards (typical Durak deck) + numeric = [(str(i), i) for i in range(6, 11)] + return numeric + face + if size == 24: + # 9..10 plus face cards (short deck) + numeric = [("9", 9), ("10", 10)] + return numeric + face + raise ValueError("Unsupported deck size") + + ranks = ranks_for_deck(deck_size) + + with transaction.atomic(): + if do_reset: + self.stdout.write("Reset requested — deleting existing Cards, CardRank and CardSuit entries...") + # Delete Cards first because of foreign key references to ranks & suits + Card.objects.all().delete() + CardRank.objects.all().delete() + CardSuit.objects.all().delete() + self.stdout.write("Existing card data deleted.") + + # Create or update suits + created_suits = [] + for name, color in suits: + suit_obj, created = CardSuit.objects.get_or_create(name=name, defaults={"color": color}) + # If suit exists but has a different color, update it to our canonical color. + if not created and getattr(suit_obj, "color", None) != color: + suit_obj.color = color + suit_obj.save(update_fields=["color"]) + created_suits.append(suit_obj) + self.stdout.write(f"{'Created' if created else 'Found'} suit: {suit_obj.name} ({suit_obj.color})") + + # Create or update ranks + created_ranks = [] + for name, value in ranks: + rank_obj, created = CardRank.objects.get_or_create(value=value, defaults={"name": name}) + # Normalize the printable name if it differs from our desired name. + if not created and getattr(rank_obj, "name", None) != name: + rank_obj.name = name + rank_obj.save(update_fields=["name"]) + created_ranks.append(rank_obj) + self.stdout.write(f"{'Created' if created else 'Found'} rank: {rank_obj.name} (value={rank_obj.value})") + + # Create cards for every suit × rank (skip cards that already exist) + created_cards = 0 + skipped_cards = 0 + for suit in created_suits: + for rank in created_ranks: + # If a Card with the suit & rank already exists (and is not a special card), + # skip creating a duplicate. + card_qs = Card.objects.filter(suit=suit, rank=rank, special_card__isnull=True) + if card_qs.exists(): + skipped_cards += 1 + continue + Card.objects.create(suit=suit, rank=rank) + created_cards += 1 + + # Summary output + self.stdout.write(self.style.SUCCESS( + f"Deck initialization finished for deck_size={deck_size}." + )) + self.stdout.write(f"Cards created: {created_cards}. Cards already present: {skipped_cards}.") + self.stdout.write( + f"Total suits: {CardSuit.objects.count()}; ranks: {CardRank.objects.count()}; cards: {Card.objects.count()}") diff --git a/game/tests/test_export_import_db.py b/game/tests/test_export_import_db.py new file mode 100644 index 0000000..39bceb4 --- /dev/null +++ b/game/tests/test_export_import_db.py @@ -0,0 +1,46 @@ +"""Tests for database export/import management commands. + +This module contains tests that exercise the `export_db` and `import_db` +Django management commands to ensure data exported to JSON can be imported +back and recreate the expected model instances. +""" + +import pytest + +from django.core.management import call_command + +from game.models import Lobby +from accounts.models import User + + +@pytest.mark.django_db +def test_export_import_db_creates_objects(user_factory, tmp_path): + """Verify that exporting the DB to JSON and re-importing recreates objects. + + The test performs the following steps: + 1. Create a test user and a Lobby owned by that user. + 2. Export the database to a temporary JSON file using the ``export_db`` + management command. + 3. Remove the created objects from the database to simulate a fresh import. + 4. Run the ``import_db`` management command to import from the JSON file. + 5. Assert that the Lobby and User objects exist after import. + """ + + user = user_factory(username="player1") + lobby_name = "TestLobby" + Lobby.objects.create(owner=user, name=lobby_name) + + # Export to temp file using --output + tmp_file = tmp_path / "backup.json" + call_command("export_db", f"--output={str(tmp_file)}") + + # Clear DB (simulate fresh import) + Lobby.objects.all().delete() + User.objects.filter(username="player1").delete() + + # Import back + call_command("import_db", str(tmp_file)) + + # Assertions + assert Lobby.objects.filter(name=lobby_name).exists() + assert User.objects.filter(username="player1").exists() diff --git a/game/tests/test_generate_fake_games.py b/game/tests/test_generate_fake_games.py new file mode 100644 index 0000000..718bc31 --- /dev/null +++ b/game/tests/test_generate_fake_games.py @@ -0,0 +1,42 @@ +"""Tests for the `generate_fake_games` management command. + +This module verifies that the `generate_fake_games` command creates `Game` +instances and associates them with `Lobby` objects. +""" + +import pytest +from django.core.management import call_command + +from game.models import Game, Lobby + + +@pytest.mark.django_db +def test_generate_fake_games_creates_games(user_factory): + """Ensure `generate_fake_games` creates new Game objects. + + Steps: + 1. Create an owner user and a Lobby. + 2. Record the number of Game instances before running the command. + 3. Run the management command. + 4. Assert that the Game count increased and at least one game has a + non-null lobby assignment. + """ + + user = user_factory(username="owner") + lobby = Lobby.objects.create(owner=user, name="FakeLobby") + + count_before = Game.objects.count() + + # Generate fake games + call_command("generate_fake_games") + + count_after = Game.objects.count() + assert count_after > count_before, "Expected generate_fake_games to increase Game count" + + # There should be at least one game with a lobby assigned + assert Game.objects.filter(lobby__isnull=False).exists() + + # Basic sanity on the first game + game = Game.objects.first() + assert game is not None + assert game.lobby is not None diff --git a/game/tests/test_init_game_data.py b/game/tests/test_init_game_data.py new file mode 100644 index 0000000..13191e7 --- /dev/null +++ b/game/tests/test_init_game_data.py @@ -0,0 +1,36 @@ +"""Tests for the `init_game_data` management command. + +This module verifies that `init_game_data` initializes card suits, ranks, +and Card records for the requested deck size. The test runs the command +with a 36-card deck and checks that the expected numbers of suits, ranks, +and cards were created. +""" + +import pytest +from django.core.management import call_command + +from game.models import CardSuit, CardRank, Card + + +@pytest.mark.django_db +def test_init_game_data_creates_cards(): + """Run `init_game_data --deck-size 36 --reset` and verify DB state. + + The test executes the management command to initialize a 36-card deck + and asserts: + - 4 suits were created + - 9 ranks (6..10 plus J,Q,K,A) were created + - 36 Card instances were created + """ + + # Run the initialization command (reset ensures idempotence) + call_command("init_game_data", "--deck-size", "36", "--reset") + + # Check suits + assert CardSuit.objects.count() == 4 + + # Numeric 6..10 + face cards = 5 + 4 = 9 + assert CardRank.objects.count() == 9 + + # Total cards = 36 + assert Card.objects.count() == 36 diff --git a/game/tests/test_reset_games.py b/game/tests/test_reset_games.py new file mode 100644 index 0000000..98fd935 --- /dev/null +++ b/game/tests/test_reset_games.py @@ -0,0 +1,29 @@ +"""Tests for the `reset_games` management command. + +This module verifies that the `reset_games` command removes active/unfinished +Game instances when run with confirmation. +""" + +import pytest +from django.core.management import call_command + +from game.models import Game + + +@pytest.mark.django_db +def test_reset_games_removes_active_games(basic_game): + """Ensure `reset_games` deletes active games only when confirmed. + + Steps: + 1. Record number of Game instances before running the command. + 2. Run `reset_games` with no arguments + """ + + # Ensure at least one game exists (provided by the basic_game fixture) + count_before = Game.objects.count() + assert count_before >= 1, "basic_game fixture should create at least one Game" + + # Dry-run: should not delete any games + call_command("reset_games") + count_after_dry = Game.objects.count() + assert count_after_dry == count_before, "Expected dry-run reset_games to not delete games"