Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 65 additions & 56 deletions pyrit/common/display_response.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,65 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import io
import logging

from PIL import Image

from pyrit.common.notebook_utils import is_in_ipython_session
from pyrit.memory import CentralMemory
from pyrit.models import AzureBlobStorageIO, DiskStorageIO, MessagePiece

logger = logging.getLogger(__name__)


async def display_image_response(response_piece: MessagePiece) -> None:
    """
    Render the image referenced by a response piece inline in a notebook.

    Nothing is rendered unless the piece has no error, its converted value is an
    image path, and the code is running inside an IPython session. A piece whose
    error is "blocked" only produces an informational log entry.

    Args:
        response_piece (MessagePiece): The response piece to display.

    Note:
        Read failures are logged and the function returns without displaying
        anything. This includes an uninitialized results storage IO: the
        RuntimeError raised for it is handled by the same broad exception
        handler and is not propagated to the caller.
    """
    memory = CentralMemory.get_memory_instance()

    can_display = (
        response_piece.response_error == "none"
        and response_piece.converted_value_data_type == "image_path"
        and is_in_ipython_session()
    )
    if can_display:
        image_path = response_piece.converted_value

        try:
            if memory.results_storage_io is None:
                raise RuntimeError("Storage IO not initialized")
            image_bytes = await memory.results_storage_io.read_file(image_path)
        except Exception as primary_error:
            # Only an Azure-backed store gets a second chance via local disk.
            if not isinstance(memory.results_storage_io, AzureBlobStorageIO):
                logger.error(f"Failed to read image from {image_path}. Full exception: {str(primary_error)}")
                return

            try:
                image_bytes = await DiskStorageIO().read_file(image_path)
            except Exception as fallback_error:
                logger.error(f"Failed to read image from {image_path}. Full exception: {str(fallback_error)}")
                return

        image = Image.open(io.BytesIO(image_bytes))

        # `display` is injected by Jupyter; it is not defined outside notebooks.
        display(image)  # type: ignore[name-defined] # noqa: F821

    if response_piece.response_error == "blocked":
        logger.info("---\nContent blocked, cannot show a response.\n---")
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import io
import logging

from PIL import Image, ImageEnhance

from pyrit.common.notebook_utils import is_in_ipython_session
from pyrit.memory import CentralMemory
from pyrit.models import AzureBlobStorageIO, DiskStorageIO, MessagePiece

logger = logging.getLogger(__name__)


async def display_image_response(response_piece: MessagePiece, safe_outputs: bool = False) -> None:
    """
    Display response images if running in notebook environment.

    Nothing is rendered unless the piece has no error, its converted value is an
    image path, and the code is running inside an IPython session. A piece whose
    error is "blocked" only produces an informational log entry.

    Args:
        response_piece (MessagePiece): The response piece to display.
        safe_outputs (bool): Whether to sanitize image outputs before displaying them.
            When True the image is downscaled to half size, fully desaturated and
            rotated by 90 degrees. Defaults to False.

    Note:
        Read failures are logged and the function returns without displaying
        anything. This includes an uninitialized results storage IO: the
        RuntimeError raised for it is handled by the same broad exception
        handler and is not propagated to the caller.
    """
    memory = CentralMemory.get_memory_instance()
    if (
        response_piece.response_error == "none"
        and response_piece.converted_value_data_type == "image_path"
        and is_in_ipython_session()
    ):
        image_location = response_piece.converted_value

        try:
            if memory.results_storage_io is None:
                raise RuntimeError("Storage IO not initialized")
            image_bytes = await memory.results_storage_io.read_file(image_location)
        except Exception as e:
            if isinstance(memory.results_storage_io, AzureBlobStorageIO):
                try:
                    # Fallback to reading from disk if the storage IO fails
                    image_bytes = await DiskStorageIO().read_file(image_location)
                except Exception as exc:
                    logger.error("Failed to read image from %s. Full exception: %s", image_location, exc)
                    return
            else:
                logger.error("Failed to read image from %s. Full exception: %s", image_location, e)
                return

        image_stream = io.BytesIO(image_bytes)
        image: Image.Image = Image.open(image_stream)

        if safe_outputs:
            # Palette and bilevel images cannot be blended by ImageEnhance, so
            # move them to RGB before any further processing.
            if image.mode in ("1", "P"):
                image = image.convert("RGB")

            # Clamp to one pixel: halving a 1-pixel dimension would otherwise
            # yield 0, which resize() rejects.
            new_width = max(1, image.width // 2)
            new_height = max(1, image.height // 2)
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

            # A factor of 0.0 removes all colour, leaving a grayscale rendition.
            image = ImageEnhance.Color(image).enhance(0.0)
            image = image.rotate(90.0, expand=True, fillcolor=(255, 255, 255))

        # Jupyter built-in display function only works in notebooks.
        display(image)  # type: ignore[name-defined] # noqa: F821
    if response_piece.response_error == "blocked":
        logger.info("---\nContent blocked, cannot show a response.\n---")
9 changes: 7 additions & 2 deletions pyrit/executor/attack/printer/console_printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class ConsoleAttackResultPrinter(AttackResultPrinter):
for consoles that don't support ANSI characters.
"""

def __init__(self, *, width: int = 100, indent_size: int = 2, enable_colors: bool = True):
def __init__(
self, *, width: int = 100, indent_size: int = 2, enable_colors: bool = True, safe_outputs: bool = False
):
"""
Initialize the console printer.

Expand All @@ -34,6 +36,8 @@ def __init__(self, *, width: int = 100, indent_size: int = 2, enable_colors: boo
Defaults to 2.
enable_colors (bool): Whether to enable ANSI color output. When False,
all output will be plain text without colors. Defaults to True.
safe_outputs (bool): Whether to sanitize image outputs before displaying them.
Defaults to False.

Raises:
ValueError: If width <= 0 or indent_size < 0.
Expand All @@ -42,6 +46,7 @@ def __init__(self, *, width: int = 100, indent_size: int = 2, enable_colors: boo
self._width = width
self._indent = " " * indent_size
self._enable_colors = enable_colors
self._safe_outputs = safe_outputs

def _print_colored(self, text: str, *colors: str) -> None:
"""
Expand Down Expand Up @@ -227,7 +232,7 @@ async def print_messages_async(
self._print_wrapped_text(piece.converted_value, Fore.YELLOW)

# Display images if present
await display_image_response(piece)
await display_image_response(response_piece=piece, safe_outputs=self._safe_outputs)

# Print scores with better formatting (only if scores are requested)
if include_scores:
Expand Down
11 changes: 10 additions & 1 deletion pyrit/executor/attack/printer/markdown_printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import os
from datetime import datetime, timezone
from pathlib import Path

from pyrit.executor.attack.printer.attack_result_printer import AttackResultPrinter
from pyrit.memory import CentralMemory
Expand All @@ -18,17 +19,20 @@ class MarkdownAttackResultPrinter(AttackResultPrinter):
markdown formatting that should be properly rendered.
"""

def __init__(self, *, display_inline: bool = True):
def __init__(self, *, display_inline: bool = True, output_file_path: Path | None = None):
"""
Initialize the markdown printer.

Args:
display_inline (bool): If True, uses IPython.display to render markdown
inline in Jupyter notebooks. If False, prints markdown strings.
Defaults to True.
output_file_path (Path | None): If set, markdown output is appended to this
file instead of being displayed or printed. Defaults to None.
"""
self._memory = CentralMemory.get_memory_instance()
self._display_inline = display_inline
self._output_file_path = output_file_path

def _render_markdown(self, markdown_lines: list[str]) -> None:
"""
Expand All @@ -42,6 +46,11 @@ def _render_markdown(self, markdown_lines: list[str]) -> None:
"""
full_markdown = "\n".join(markdown_lines)

if self._output_file_path:
with open(self._output_file_path, "a", encoding="utf-8") as f:
f.write(full_markdown + "\n")
return

if self._display_inline:
try:
from IPython.display import Markdown, display
Expand Down
110 changes: 75 additions & 35 deletions pyrit/scenario/scenarios/airt/jailbreak.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def __init__(
num_templates: Optional[int] = None,
num_attempts: int = 1,
jailbreak_names: list[str] | None = None,
jailbreak_paths: list[str] | None = None,
) -> None:
"""
Initialize the jailbreak scenario.
Expand All @@ -134,26 +135,32 @@ def __init__(
objective_scorer (Optional[TrueFalseScorer]): Scorer for detecting successful jailbreaks
(non-refusal). If not provided, defaults to an inverted refusal scorer.
include_baseline (bool): Whether to include a baseline atomic attack that sends all
objectives without modifications. Defaults to True.
objectives without modifications. Defaults to False.
scenario_result_id (Optional[str]): Optional ID of an existing scenario result to resume.
num_templates (Optional[int]): Choose num_templates random jailbreaks rather than using all of them.
num_attempts (Optional[int]): Number of times to try each jailbreak.
jailbreak_names (Optional[List[str]]): List of jailbreak names from the template list under datasets.
to use.
jailbreak_names (Optional[List[str]]): List of jailbreak names from the template list under datasets
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to hold off on the jailbreak changes.

We're grabbing techniques in a different way that would be more extensible. But we should use this to do the same thing.

We can do this @fdubut; we can update it here or elsewhere. But we should use #1622

to use. Mutually exclusive with jailbreak_paths and num_templates.
jailbreak_paths (Optional[List[str]]): List of absolute or relative paths to YAML jailbreak
template files to use. Mutually exclusive with jailbreak_names and num_templates.

Raises:
ValueError: If both jailbreak_names and num_templates are provided, as random selection
is incompatible with a predetermined list.
ValueError: If the jailbreak_names list contains a jailbreak that isn't in the listed
templates.
ValueError: If more than one of jailbreak_names, jailbreak_paths, or num_templates is provided,
as the three selection modes are mutually exclusive.
ValueError: If the jailbreak_names list contains a name that isn't in the discovered templates.
ValueError: If any path in jailbreak_paths does not exist on disk.

"""
if jailbreak_names is None:
jailbreak_names = []
if jailbreak_names and num_templates:
if jailbreak_paths is None:
jailbreak_paths = []

provided_sources = [bool(jailbreak_names), bool(jailbreak_paths), bool(num_templates)]
if sum(provided_sources) > 1:
raise ValueError(
"Please provide only one of `num_templates` (random selection)"
" or `jailbreak_names` (specific selection)."
"Please provide only one of `num_templates` (random selection),"
" `jailbreak_names` (selection by name), or `jailbreak_paths` (selection by path)."
)

self._objective_scorer: TrueFalseScorer = (
Expand All @@ -163,22 +170,30 @@ def __init__(
self._num_templates = num_templates
self._num_attempts = num_attempts
self._adversarial_target: Optional[OpenAIChatTarget] = None

# Note that num_templates and jailbreak_names are mutually exclusive.
# If self._num_templates is None, then this returns all discoverable jailbreak templates.
# If self._num_templates has some value, then all_templates is a subset of all available
# templates, but jailbreak_names is guaranteed to be [], so diff = {}.
all_templates = TextJailBreak.get_jailbreak_templates(num_templates=self._num_templates)

# Example: if jailbreak_names is {'a', 'b', 'c'}, and all_templates is {'b', 'c', 'd'},
# then diff = {'a'}, which raises the error as 'a' was not discovered in all_templates.
diff = set(jailbreak_names) - set(all_templates)
if len(diff) > 0:
raise ValueError(f"Error: could not find templates `{diff}`!")

# If jailbreak_names has some value, then `if jailbreak_names` passes, and self._jailbreaks
# is set to jailbreak_names. Otherwise we use all_templates.
self._jailbreaks = jailbreak_names if jailbreak_names else all_templates
self._jailbreak_paths = jailbreak_paths
self._jailbreaks = jailbreak_names

if jailbreak_paths:
missing = [p for p in jailbreak_paths if not Path(p).exists()]
if missing:
raise ValueError(f"Jailbreak template paths not found: {missing}")
else:
# Note that num_templates and jailbreak_names are mutually exclusive.
# If self._num_templates is None, then this returns all discoverable jailbreak templates.
# If self._num_templates has some value, then all_templates is a subset of all available
# templates, but jailbreak_names is guaranteed to be [], so diff = {}.
all_templates = TextJailBreak.get_jailbreak_templates(num_templates=self._num_templates)

# Example: if jailbreak_names is {'a', 'b', 'c'}, and all_templates is {'b', 'c', 'd'},
# then diff = {'a'}, which raises the error as 'a' was not discovered in all_templates.
diff = set(jailbreak_names) - set(all_templates)
if len(diff) > 0:
raise ValueError(f"Error: could not find templates `{diff}`!")

# If jailbreak_names has some value, then `if jailbreak_names` passes, and self._jailbreaks
# is set to jailbreak_names. Otherwise we use all_templates.
if not jailbreak_names:
self._jailbreaks = all_templates

super().__init__(
version=self.VERSION,
Expand Down Expand Up @@ -236,31 +251,50 @@ def _resolve_seed_groups(self) -> list[SeedAttackGroup]:
return list(seed_groups)

async def _get_atomic_attack_from_strategy_async(
self, *, strategy: str, jailbreak_template_name: str
self,
*,
strategy: str,
jailbreak_template_name: Optional[str] = None,
jailbreak_template_path: Optional[str] = None,
) -> AtomicAttack:
"""
Create an atomic attack for a specific jailbreak template.

Exactly one of jailbreak_template_name or jailbreak_template_path must be provided.

Args:
strategy (str): JailbreakStrategy to use.
jailbreak_template_name (str): Name of the jailbreak template file.
jailbreak_template_name (Optional[str]): Name of the jailbreak template file (resolved
from the predefined templates directory).
jailbreak_template_path (Optional[str]): Absolute or relative path to a YAML jailbreak
template file.

Returns:
AtomicAttack: An atomic attack using the specified jailbreak template.

Raises:
ValueError: If scenario is not properly initialized.
ValueError: If neither or both template source arguments are provided.
"""
# objective_target is guaranteed to be non-None by parent class validation
if self._objective_target is None:
raise ValueError(
"Scenario not properly initialized. Call await scenario.initialize_async() before running."
)

# Create the jailbreak converter
jailbreak_converter = TextJailbreakConverter(
jailbreak_template=TextJailBreak(template_file_name=jailbreak_template_name)
)
if not jailbreak_template_name and not jailbreak_template_path:
raise ValueError("One of jailbreak_template_name or jailbreak_template_path must be provided.")

if jailbreak_template_name and jailbreak_template_path:
raise ValueError("Only one of jailbreak_template_name or jailbreak_template_path can be provided.")

# Create the jailbreak converter from name or path
if jailbreak_template_path:
jailbreak_template = TextJailBreak(template_path=jailbreak_template_path)
else:
jailbreak_template = TextJailBreak(template_file_name=jailbreak_template_name)

jailbreak_converter = TextJailbreakConverter(jailbreak_template=jailbreak_template)

# Create converter configuration
converter_config = AttackConverterConfig(
Expand Down Expand Up @@ -292,11 +326,11 @@ async def _get_atomic_attack_from_strategy_async(
if not attack:
raise ValueError(f"Attack cannot be None!")

# Extract template name without extension for the atomic attack name
template_name = Path(jailbreak_template_name).stem
# Extract template stem from whichever source was provided
template_stem = Path(jailbreak_template_path or jailbreak_template_name).stem

return AtomicAttack(
atomic_attack_name=f"jailbreak_{template_name}",
atomic_attack_name=f"jailbreak_{template_stem}",
attack_technique=AttackTechnique(attack=attack),
seed_groups=self._seed_groups or [],
)
Expand Down Expand Up @@ -324,5 +358,11 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]:
strategy=strategy, jailbreak_template_name=template_name
)
atomic_attacks.append(atomic_attack)
for template_path in self._jailbreak_paths:
for _ in range(self._num_attempts):
atomic_attack = await self._get_atomic_attack_from_strategy_async(
strategy=strategy, jailbreak_template_path=template_path
)
atomic_attacks.append(atomic_attack)

return atomic_attacks
Loading
Loading