diff --git a/weeb_cli/i18n.py b/weeb_cli/i18n.py index 137179e..dcd770b 100644 --- a/weeb_cli/i18n.py +++ b/weeb_cli/i18n.py @@ -77,12 +77,19 @@ def __init__(self) -> None: self.translations: Dict[str, Any] = {} self.load_translations() + SUPPORTED_LANGUAGES = {"en", "tr", "de", "pl"} + def set_language(self, language_code: str) -> None: """Set the active language and reload translations. Args: language_code: Language code (e.g., 'en', 'tr', 'de', 'pl'). + + Raises: + ValueError: If language_code is not a supported language. """ + if language_code not in self.SUPPORTED_LANGUAGES: + raise ValueError(f"Unsupported language: {language_code}. Supported: {self.SUPPORTED_LANGUAGES}") self.language = language_code config.set("language", language_code) self.load_translations() diff --git a/weeb_cli/providers/registry.py b/weeb_cli/providers/registry.py index 1a5dcee..7d91d9f 100644 --- a/weeb_cli/providers/registry.py +++ b/weeb_cli/providers/registry.py @@ -39,8 +39,8 @@ class MyProvider(BaseProvider): """ import importlib +import json import pkgutil -import pickle from pathlib import Path from typing import Dict, List, Type, Optional @@ -53,8 +53,8 @@ class MyProvider(BaseProvider): _provider_meta: Dict[str, dict] = {} _initialized: bool = False -# Cache file location -PROVIDER_CACHE_FILE = CONFIG_DIR / "provider_cache.pkl" +# Cache file location (JSON instead of pickle for security) +PROVIDER_CACHE_FILE = CONFIG_DIR / "provider_cache.json" def register_provider(name: str, lang: str = "tr", region: str = "TR", disabled: bool = False): @@ -143,7 +143,7 @@ def _discover_providers() -> None: def _load_from_cache() -> bool: - """Load provider registry from cache file. + """Load provider registry from cache file (JSON-based). Returns: True if cache was loaded successfully, False otherwise. @@ -151,14 +151,21 @@ def _load_from_cache() -> bool: global _providers, _provider_meta if not PROVIDER_CACHE_FILE.exists(): + # Clean up old pickle cache if it exists + old_pickle = PROVIDER_CACHE_FILE.with_suffix(".pkl") + if old_pickle.exists(): + try: + old_pickle.unlink() + except OSError: + pass return False try: - with open(PROVIDER_CACHE_FILE, 'rb') as f: - cache_data = pickle.load(f) + with open(PROVIDER_CACHE_FILE, 'r', encoding='utf-8') as f: + cache_data = json.load(f) # Validate cache structure - if not isinstance(cache_data, dict) or 'providers' not in cache_data: + if not isinstance(cache_data, dict) or 'modules' not in cache_data: return False # Import all provider modules to register classes @@ -170,33 +177,30 @@ def _load_from_cache() -> bool: return False return True - except Exception as e: + except (json.JSONDecodeError, OSError) as e: debug(f"[Registry] Failed to load provider cache: {e}") return False def _save_to_cache() -> None: - """Save provider registry to cache file.""" + """Save provider registry to cache file (JSON-based).""" try: PROVIDER_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True) # Collect module paths for all registered providers modules = set() for name, cls in _providers.items(): - module_path = cls.__module__ - modules.add(module_path) + modules.add(cls.__module__) cache_data = { - 'providers': list(_providers.keys()), - 'modules': list(modules), - 'meta': _provider_meta + 'modules': sorted(modules), } - with open(PROVIDER_CACHE_FILE, 'wb') as f: - pickle.dump(cache_data, f) + with open(PROVIDER_CACHE_FILE, 'w', encoding='utf-8') as f: + json.dump(cache_data, f, indent=2) debug(f"[Registry] Saved {len(_providers)} providers to cache") - except Exception as e: + except OSError as e: debug(f"[Registry] Failed to save provider cache: {e}") diff --git a/weeb_cli/services/dependency_manager.py b/weeb_cli/services/dependency_manager.py index d960d71..7e45a03 100644 --- a/weeb_cli/services/dependency_manager.py +++ b/weeb_cli/services/dependency_manager.py @@ -273,7 +273,7 @@ def _extract_and_install(self, archive_path, target_files, tool_name): z.extractall(path=temp_extract) elif archive_path.endswith((".tar.gz", ".tar.xz", ".tar.bz2", ".tgz")): with tarfile.open(archive_path, "r:*") as tar_ref: - tar_ref.extractall(temp_extract) + tar_ref.extractall(temp_extract, filter='data') else: raise Exception("Unsupported format") diff --git a/weeb_cli/services/notifier.py b/weeb_cli/services/notifier.py index 90e2f93..f8af806 100644 --- a/weeb_cli/services/notifier.py +++ b/weeb_cli/services/notifier.py @@ -1,7 +1,8 @@ import platform +import re import subprocess +import shutil import threading -from typing import Optional def send_notification(title: str, message: str) -> None: @@ -26,6 +27,10 @@ def _send_notification_sync(title: str, message: str) -> None: pass +def _sanitize_for_shell(text: str) -> str: + return re.sub(r'[^\w\s\-.,!?:()\'"/]', '', text)[:256] + + def _notify_windows(title: str, message: str) -> None: if _try_winotify(title, message): return @@ -64,15 +69,17 @@ def _try_win10toast(title: str, message: str) -> bool: def _try_powershell(title: str, message: str) -> None: try: - ps_script = f''' - [Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null - $template = [Windows.UI.Notifications.ToastNotificationManager]::GetTemplateContent([Windows.UI.Notifications.ToastTemplateType]::ToastText02) - $textNodes = $template.GetElementsByTagName("text") - $textNodes.Item(0).AppendChild($template.CreateTextNode("{title}")) | Out-Null - $textNodes.Item(1).AppendChild($template.CreateTextNode("{message}")) | Out-Null - $toast = [Windows.UI.Notifications.ToastNotification]::new($template) - [Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier("Weeb CLI").Show($toast) - ''' + safe_title = _sanitize_for_shell(title) + safe_message = _sanitize_for_shell(message) + ps_script = ( + '[Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null\n' + '$template = [Windows.UI.Notifications.ToastNotificationManager]::GetTemplateContent([Windows.UI.Notifications.ToastTemplateType]::ToastText02)\n' + '$textNodes = $template.GetElementsByTagName("text")\n' + f'$textNodes.Item(0).AppendChild($template.CreateTextNode("{safe_title}")) | Out-Null\n' + f'$textNodes.Item(1).AppendChild($template.CreateTextNode("{safe_message}")) | Out-Null\n' + '$toast = [Windows.UI.Notifications.ToastNotification]::new($template)\n' + '[Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier("Weeb CLI").Show($toast)' + ) subprocess.run( ["powershell", "-Command", ps_script], capture_output=True, @@ -83,10 +90,14 @@ def _try_powershell(title: str, message: str) -> None: def _notify_macos(title: str, message: str) -> None: - script = f'display notification "{message}" with title "{title}"' - subprocess.run(["osascript", "-e", script], capture_output=True) + safe_title = _sanitize_for_shell(title) + safe_message = _sanitize_for_shell(message) + script = f'display notification "{safe_message}" with title "{safe_title}"' + subprocess.run(["osascript", "-e", script], capture_output=True, timeout=5) def _notify_linux(title: str, message: str) -> None: - subprocess.run(["notify-send", title, message], capture_output=True) + if not shutil.which("notify-send"): + return + subprocess.run(["notify-send", title, message], capture_output=True, timeout=5) diff --git a/weeb_cli/services/updater.py b/weeb_cli/services/updater.py index 2cfb3a1..135be4e 100644 --- a/weeb_cli/services/updater.py +++ b/weeb_cli/services/updater.py @@ -112,11 +112,13 @@ def download_exe(url, filename): console.print(f"[dim]{i18n.t('update.location')}: {new_exe_path}[/dim]") if getattr(sys, 'frozen', False): + safe_current = current_exe.replace('^', '^^').replace('&', '^&').replace('|', '^|').replace('<', '^<').replace('>', '^>').replace('%', '%%') + safe_new = new_exe_path.replace('^', '^^').replace('&', '^&').replace('|', '^|').replace('<', '^<').replace('>', '^>').replace('%', '%%') batch_content = f'''@echo off timeout /t 2 /nobreak >nul -del "{current_exe}" -move "{new_exe_path}" "{current_exe}" -start "" "{current_exe}" +del "{safe_current}" +move "{safe_new}" "{safe_current}" +start "" "{safe_current}" del "%~f0" ''' batch_path = os.path.join(download_dir, "update.bat")