diff --git a/pyrit/common/display_response.py b/pyrit/common/display_response.py index 6a97af39cc..1d1d84cc23 100644 --- a/pyrit/common/display_response.py +++ b/pyrit/common/display_response.py @@ -1,56 +1,65 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT license. - -import io -import logging - -from PIL import Image - -from pyrit.common.notebook_utils import is_in_ipython_session -from pyrit.memory import CentralMemory -from pyrit.models import AzureBlobStorageIO, DiskStorageIO, MessagePiece - -logger = logging.getLogger(__name__) - - -async def display_image_response(response_piece: MessagePiece) -> None: - """ - Display response images if running in notebook environment. - - Args: - response_piece (MessagePiece): The response piece to display. - - Raises: - RuntimeError: If storage IO is not initialized. - """ - memory = CentralMemory.get_memory_instance() - if ( - response_piece.response_error == "none" - and response_piece.converted_value_data_type == "image_path" - and is_in_ipython_session() - ): - image_location = response_piece.converted_value - - try: - if memory.results_storage_io is None: - raise RuntimeError("Storage IO not initialized") - image_bytes = await memory.results_storage_io.read_file(image_location) - except Exception as e: - if isinstance(memory.results_storage_io, AzureBlobStorageIO): - try: - # Fallback to reading from disk if the storage IO fails - image_bytes = await DiskStorageIO().read_file(image_location) - except Exception as exc: - logger.error(f"Failed to read image from {image_location}. Full exception: {str(exc)}") - return - else: - logger.error(f"Failed to read image from {image_location}. Full exception: {str(e)}") - return - - image_stream = io.BytesIO(image_bytes) - image = Image.open(image_stream) - - # Jupyter built-in display function only works in notebooks. - display(image) # type: ignore[name-defined] # noqa: F821 - if response_piece.response_error == "blocked": - logger.info("---\nContent blocked, cannot show a response.\n---") +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import io +import logging + +from PIL import Image, ImageEnhance + +from pyrit.common.notebook_utils import is_in_ipython_session +from pyrit.memory import CentralMemory +from pyrit.models import AzureBlobStorageIO, DiskStorageIO, MessagePiece + +logger = logging.getLogger(__name__) + + +async def display_image_response(response_piece: MessagePiece, safe_outputs: bool = False) -> None: + """ + Display response images if running in notebook environment. + + Args: + response_piece (MessagePiece): The response piece to display. + safe_outputs (bool): Whether to sanitize image outputs before displaying them. + + Raises: + RuntimeError: If storage IO is not initialized. + """ + memory = CentralMemory.get_memory_instance() + if ( + response_piece.response_error == "none" + and response_piece.converted_value_data_type == "image_path" + and is_in_ipython_session() + ): + image_location = response_piece.converted_value + + try: + if memory.results_storage_io is None: + raise RuntimeError("Storage IO not initialized") + image_bytes = await memory.results_storage_io.read_file(image_location) + except Exception as e: + if isinstance(memory.results_storage_io, AzureBlobStorageIO): + try: + # Fallback to reading from disk if the storage IO fails + image_bytes = await DiskStorageIO().read_file(image_location) + except Exception as exc: + logger.error(f"Failed to read image from {image_location}. Full exception: {str(exc)}") + return + else: + logger.error(f"Failed to read image from {image_location}. Full exception: {str(e)}") + return + + image_stream = io.BytesIO(image_bytes) + image: Image.Image = Image.open(image_stream) + + if safe_outputs: + new_width = int(image.width * 0.5) + new_height = int(image.height * 0.5) + image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) + + image = ImageEnhance.Color(image).enhance(0.0) + image = image.rotate(90.0, expand=True, fillcolor=(255, 255, 255)) + + # Jupyter built-in display function only works in notebooks. + display(image) # type: ignore[name-defined] # noqa: F821 + if response_piece.response_error == "blocked": + logger.info("---\nContent blocked, cannot show a response.\n---") diff --git a/pyrit/executor/attack/printer/console_printer.py b/pyrit/executor/attack/printer/console_printer.py index ff1cce42f9..c1117fbf29 100644 --- a/pyrit/executor/attack/printer/console_printer.py +++ b/pyrit/executor/attack/printer/console_printer.py @@ -23,7 +23,9 @@ class ConsoleAttackResultPrinter(AttackResultPrinter): for consoles that don't support ANSI characters. """ - def __init__(self, *, width: int = 100, indent_size: int = 2, enable_colors: bool = True): + def __init__( + self, *, width: int = 100, indent_size: int = 2, enable_colors: bool = True, safe_outputs: bool = False + ): """ Initialize the console printer. @@ -34,6 +36,8 @@ def __init__(self, *, width: int = 100, indent_size: int = 2, enable_colors: boo Defaults to 2. enable_colors (bool): Whether to enable ANSI color output. When False, all output will be plain text without colors. Defaults to True. + safe_outputs (bool): Whether to sanitize image outputs before displaying them. + Defaults to False. Raises: ValueError: If width <= 0 or indent_size < 0. @@ -42,6 +46,7 @@ def __init__(self, *, width: int = 100, indent_size: int = 2, enable_colors: boo self._width = width self._indent = " " * indent_size self._enable_colors = enable_colors + self._safe_outputs = safe_outputs def _print_colored(self, text: str, *colors: str) -> None: """ @@ -227,7 +232,7 @@ async def print_messages_async( self._print_wrapped_text(piece.converted_value, Fore.YELLOW) # Display images if present - await display_image_response(piece) + await display_image_response(response_piece=piece, safe_outputs=self._safe_outputs) # Print scores with better formatting (only if scores are requested) if include_scores: diff --git a/pyrit/executor/attack/printer/markdown_printer.py b/pyrit/executor/attack/printer/markdown_printer.py index 5946ce985c..624540a17e 100644 --- a/pyrit/executor/attack/printer/markdown_printer.py +++ b/pyrit/executor/attack/printer/markdown_printer.py @@ -3,6 +3,7 @@ import os from datetime import datetime, timezone +from pathlib import Path from pyrit.executor.attack.printer.attack_result_printer import AttackResultPrinter from pyrit.memory import CentralMemory @@ -18,7 +19,7 @@ class MarkdownAttackResultPrinter(AttackResultPrinter): markdown formatting that should be properly rendered. """ - def __init__(self, *, display_inline: bool = True): + def __init__(self, *, display_inline: bool = True, output_file_path: Path | None = None): """ Initialize the markdown printer. @@ -26,9 +27,12 @@ def __init__(self, *, display_inline: bool = True): display_inline (bool): If True, uses IPython.display to render markdown inline in Jupyter notebooks. If False, prints markdown strings. Defaults to True. + output_file_path (Path | None): If set, markdown output is appended to this + file instead of being displayed or printed. Defaults to None. """ self._memory = CentralMemory.get_memory_instance() self._display_inline = display_inline + self._output_file_path = output_file_path def _render_markdown(self, markdown_lines: list[str]) -> None: """ @@ -42,6 +46,11 @@ def _render_markdown(self, markdown_lines: list[str]) -> None: """ full_markdown = "\n".join(markdown_lines) + if self._output_file_path: + with open(self._output_file_path, "a", encoding="utf-8") as f: + f.write(full_markdown + "\n") + return + if self._display_inline: try: from IPython.display import Markdown, display diff --git a/pyrit/scenario/scenarios/airt/jailbreak.py b/pyrit/scenario/scenarios/airt/jailbreak.py index c09927def8..1077f924b6 100644 --- a/pyrit/scenario/scenarios/airt/jailbreak.py +++ b/pyrit/scenario/scenarios/airt/jailbreak.py @@ -126,6 +126,7 @@ def __init__( num_templates: Optional[int] = None, num_attempts: int = 1, jailbreak_names: list[str] | None = None, + jailbreak_paths: list[str] | None = None, ) -> None: """ Initialize the jailbreak scenario. @@ -134,26 +135,32 @@ def __init__( objective_scorer (Optional[TrueFalseScorer]): Scorer for detecting successful jailbreaks (non-refusal). If not provided, defaults to an inverted refusal scorer. include_baseline (bool): Whether to include a baseline atomic attack that sends all - objectives without modifications. Defaults to True. + objectives without modifications. Defaults to False. scenario_result_id (Optional[str]): Optional ID of an existing scenario result to resume. num_templates (Optional[int]): Choose num_templates random jailbreaks rather than using all of them. num_attempts (Optional[int]): Number of times to try each jailbreak. - jailbreak_names (Optional[List[str]]): List of jailbreak names from the template list under datasets. - to use. + jailbreak_names (Optional[List[str]]): List of jailbreak names from the template list under datasets + to use. Mutually exclusive with jailbreak_paths and num_templates. + jailbreak_paths (Optional[List[str]]): List of absolute or relative paths to YAML jailbreak + template files to use. Mutually exclusive with jailbreak_names and num_templates. Raises: - ValueError: If both jailbreak_names and num_templates are provided, as random selection - is incompatible with a predetermined list. - ValueError: If the jailbreak_names list contains a jailbreak that isn't in the listed - templates. + ValueError: If more than one of jailbreak_names, jailbreak_paths, or num_templates is provided, + as the three selection modes are mutually exclusive. + ValueError: If the jailbreak_names list contains a name that isn't in the discovered templates. + ValueError: If any path in jailbreak_paths does not exist on disk. """ if jailbreak_names is None: jailbreak_names = [] - if jailbreak_names and num_templates: + if jailbreak_paths is None: + jailbreak_paths = [] + + provided_sources = [bool(jailbreak_names), bool(jailbreak_paths), bool(num_templates)] + if sum(provided_sources) > 1: raise ValueError( - "Please provide only one of `num_templates` (random selection)" - " or `jailbreak_names` (specific selection)." + "Please provide only one of `num_templates` (random selection)," + " `jailbreak_names` (selection by name), or `jailbreak_paths` (selection by path)." ) self._objective_scorer: TrueFalseScorer = ( @@ -163,22 +170,30 @@ def __init__( self._num_templates = num_templates self._num_attempts = num_attempts self._adversarial_target: Optional[OpenAIChatTarget] = None - - # Note that num_templates and jailbreak_names are mutually exclusive. - # If self._num_templates is None, then this returns all discoverable jailbreak templates. - # If self._num_templates has some value, then all_templates is a subset of all available - # templates, but jailbreak_names is guaranteed to be [], so diff = {}. - all_templates = TextJailBreak.get_jailbreak_templates(num_templates=self._num_templates) - - # Example: if jailbreak_names is {'a', 'b', 'c'}, and all_templates is {'b', 'c', 'd'}, - # then diff = {'a'}, which raises the error as 'a' was not discovered in all_templates. - diff = set(jailbreak_names) - set(all_templates) - if len(diff) > 0: - raise ValueError(f"Error: could not find templates `{diff}`!") - - # If jailbreak_names has some value, then `if jailbreak_names` passes, and self._jailbreaks - # is set to jailbreak_names. Otherwise we use all_templates. - self._jailbreaks = jailbreak_names if jailbreak_names else all_templates + self._jailbreak_paths = jailbreak_paths + self._jailbreaks = jailbreak_names + + if jailbreak_paths: + missing = [p for p in jailbreak_paths if not Path(p).exists()] + if missing: + raise ValueError(f"Jailbreak template paths not found: {missing}") + else: + # Note that num_templates and jailbreak_names are mutually exclusive. + # If self._num_templates is None, then this returns all discoverable jailbreak templates. + # If self._num_templates has some value, then all_templates is a subset of all available + # templates, but jailbreak_names is guaranteed to be [], so diff = {}. + all_templates = TextJailBreak.get_jailbreak_templates(num_templates=self._num_templates) + + # Example: if jailbreak_names is {'a', 'b', 'c'}, and all_templates is {'b', 'c', 'd'}, + # then diff = {'a'}, which raises the error as 'a' was not discovered in all_templates. + diff = set(jailbreak_names) - set(all_templates) + if len(diff) > 0: + raise ValueError(f"Error: could not find templates `{diff}`!") + + # If jailbreak_names has some value, then `if jailbreak_names` passes, and self._jailbreaks + # is set to jailbreak_names. Otherwise we use all_templates. + if not jailbreak_names: + self._jailbreaks = all_templates super().__init__( version=self.VERSION, @@ -236,20 +251,30 @@ def _resolve_seed_groups(self) -> list[SeedAttackGroup]: return list(seed_groups) async def _get_atomic_attack_from_strategy_async( - self, *, strategy: str, jailbreak_template_name: str + self, + *, + strategy: str, + jailbreak_template_name: Optional[str] = None, + jailbreak_template_path: Optional[str] = None, ) -> AtomicAttack: """ Create an atomic attack for a specific jailbreak template. + Exactly one of jailbreak_template_name or jailbreak_template_path must be provided. + Args: strategy (str): JailbreakStrategy to use. - jailbreak_template_name (str): Name of the jailbreak template file. + jailbreak_template_name (Optional[str]): Name of the jailbreak template file (resolved + from the predefined templates directory). + jailbreak_template_path (Optional[str]): Absolute or relative path to a YAML jailbreak + template file. Returns: AtomicAttack: An atomic attack using the specified jailbreak template. Raises: ValueError: If scenario is not properly initialized. + ValueError: If neither or both template source arguments are provided. """ # objective_target is guaranteed to be non-None by parent class validation if self._objective_target is None: @@ -257,10 +282,19 @@ async def _get_atomic_attack_from_strategy_async( "Scenario not properly initialized. Call await scenario.initialize_async() before running." ) - # Create the jailbreak converter - jailbreak_converter = TextJailbreakConverter( - jailbreak_template=TextJailBreak(template_file_name=jailbreak_template_name) - ) + if not jailbreak_template_name and not jailbreak_template_path: + raise ValueError("One of jailbreak_template_name or jailbreak_template_path must be provided.") + + if jailbreak_template_name and jailbreak_template_path: + raise ValueError("Only one of jailbreak_template_name or jailbreak_template_path can be provided.") + + # Create the jailbreak converter from name or path + if jailbreak_template_path: + jailbreak_template = TextJailBreak(template_path=jailbreak_template_path) + else: + jailbreak_template = TextJailBreak(template_file_name=jailbreak_template_name) + + jailbreak_converter = TextJailbreakConverter(jailbreak_template=jailbreak_template) # Create converter configuration converter_config = AttackConverterConfig( @@ -292,11 +326,11 @@ async def _get_atomic_attack_from_strategy_async( if not attack: raise ValueError(f"Attack cannot be None!") - # Extract template name without extension for the atomic attack name - template_name = Path(jailbreak_template_name).stem + # Extract template stem from whichever source was provided + template_stem = Path(jailbreak_template_path or jailbreak_template_name).stem return AtomicAttack( - atomic_attack_name=f"jailbreak_{template_name}", + atomic_attack_name=f"jailbreak_{template_stem}", attack_technique=AttackTechnique(attack=attack), seed_groups=self._seed_groups or [], ) @@ -324,5 +358,11 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: strategy=strategy, jailbreak_template_name=template_name ) atomic_attacks.append(atomic_attack) + for template_path in self._jailbreak_paths: + for _ in range(self._num_attempts): + atomic_attack = await self._get_atomic_attack_from_strategy_async( + strategy=strategy, jailbreak_template_path=template_path + ) + atomic_attacks.append(atomic_attack) return atomic_attacks diff --git a/tests/unit/executor/attack/core/test_markdown_printer.py b/tests/unit/executor/attack/core/test_markdown_printer.py index fc0ff0adbf..deb0bb82a0 100644 --- a/tests/unit/executor/attack/core/test_markdown_printer.py +++ b/tests/unit/executor/attack/core/test_markdown_printer.py @@ -3,6 +3,7 @@ import os import uuid +from pathlib import Path from unittest.mock import MagicMock, patch import pytest @@ -36,6 +37,11 @@ def markdown_printer(patch_central_database): return MarkdownAttackResultPrinter(display_inline=False) +@pytest.fixture +def markdown_printer_to_file(patch_central_database, tmp_path): + return MarkdownAttackResultPrinter(output_file_path=Path(tmp_path) / "output.md") + + @pytest.fixture def sample_boolean_score(): return Score( @@ -247,3 +253,40 @@ async def test_print_summary_async(markdown_printer, sample_attack_result, capsy assert "Test objective" in captured.out assert "TestAttack" in captured.out assert "test-conv-123" in captured.out + + +@pytest.mark.asyncio +async def test_output_file_path_appends_to_file(markdown_printer_to_file, sample_attack_result, capsys): + """Test that output_file_path writes markdown to file and produces no stdout.""" + await markdown_printer_to_file.print_result_async(sample_attack_result) + + content = markdown_printer_to_file._output_file_path.read_text(encoding="utf-8") + assert "Attack Result: SUCCESS" in content + assert "## Attack Summary" in content + + captured = capsys.readouterr() + assert captured.out == "" + + +@pytest.mark.asyncio +async def test_output_file_path_appends_multiple_calls(markdown_printer_to_file, sample_attack_result): + """Test that calling print twice appends both reports to the same file.""" + await markdown_printer_to_file.print_result_async(sample_attack_result) + await markdown_printer_to_file.print_result_async(sample_attack_result) + + content = markdown_printer_to_file._output_file_path.read_text(encoding="utf-8") + assert content.count("Attack Result: SUCCESS") == 2 + + +@pytest.mark.asyncio +async def test_output_file_path_none_does_not_write( + markdown_printer, sample_attack_result, mock_memory, tmp_path, capsys +): + """Test that default output_file_path=None prints to stdout and writes no file.""" + await markdown_printer.print_result_async(sample_attack_result) + + captured = capsys.readouterr() + assert "Attack Result: SUCCESS" in captured.out + + # No file should have been created in tmp_path + assert list(tmp_path.iterdir()) == [] diff --git a/tests/unit/scenario/test_jailbreak.py b/tests/unit/scenario/test_jailbreak.py index 46ee07439a..54d83d2619 100644 --- a/tests/unit/scenario/test_jailbreak.py +++ b/tests/unit/scenario/test_jailbreak.py @@ -26,6 +26,15 @@ def mock_templates() -> list[str]: return ["aim", "dan_1", "tuo"] +@pytest.fixture +def mock_jailbreak_paths() -> list[str]: + """Two real jailbreak template paths for path-based selection tests.""" + return [ + str(JAILBREAK_TEMPLATES_PATH / "dan_1.yaml"), + str(JAILBREAK_TEMPLATES_PATH / "aim.yaml"), + ] + + @pytest.fixture def mock_random_num_attempts() -> int: """Mock constant for n-many attempts per jailbreak.""" @@ -174,12 +183,31 @@ def test_init_with_num_attempts(self, mock_random_num_attempts): scenario = Jailbreak(num_attempts=mock_random_num_attempts) assert scenario._num_attempts == mock_random_num_attempts - def test_init_raises_exception_when_both_num_and_which_jailbreaks(self, mock_random_num_templates, mock_templates): + def test_init_with_jailbreak_paths(self, mock_jailbreak_paths, mock_memory_seed_groups): + """Test initialization with explicit jailbreak file paths.""" + with patch.object(Jailbreak, "_resolve_seed_groups", return_value=mock_memory_seed_groups): + scenario = Jailbreak(jailbreak_paths=mock_jailbreak_paths) + assert scenario._jailbreak_paths == mock_jailbreak_paths + assert scenario._jailbreaks == [] + + def test_init_raises_exception_when_both_num_and_names(self, mock_random_num_templates, mock_templates): """Test failure on providing mutually exclusive arguments.""" with pytest.raises(ValueError): Jailbreak(num_templates=mock_random_num_templates, jailbreak_names=mock_templates) + def test_init_raises_exception_when_both_num_and_paths(self, mock_jailbreak_paths, mock_random_num_templates): + """Test failure when num_templates and jailbreak_paths are both provided.""" + with pytest.raises(ValueError): + Jailbreak(num_templates=mock_random_num_templates, jailbreak_paths=mock_jailbreak_paths) + + def test_init_raises_exception_when_both_paths_and_names( + self, mock_jailbreak_paths, mock_templates, mock_memory_seed_groups + ): + """Test failure when jailbreak_paths and jailbreak_names are both provided.""" + with pytest.raises(ValueError): + Jailbreak(jailbreak_paths=mock_jailbreak_paths, jailbreak_names=mock_templates) + def test_init_accepts_subdirectory_jailbreak_names(self, mock_objective_scorer, mock_memory_seed_groups): """Test that explicit jailbreak names can reference templates stored in subdirectories.""" # Pick a template that lives in a subdirectory (not top-level) @@ -203,6 +231,11 @@ async def test_init_raises_exception_when_no_datasets_available(self, mock_objec with pytest.raises(ValueError, match="DatasetConfiguration has no seed_groups"): await scenario.initialize_async(objective_target=mock_objective_target) + def test_init_raises_exception_when_path_not_found(self): + """Test failure when a jailbreak path does not exist on disk.""" + with pytest.raises(ValueError, match="not found"): + Jailbreak(jailbreak_paths=["/nonexistent/path/template.yaml"]) + @pytest.mark.usefixtures(*FIXTURES) class TestJailbreakAttackGeneration: