Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@ toml==0.10.2
twilio==7.15.0
typing-extensions==4.12.2
versioneer==0.19
fideslang==3.1.2
fideslang==3.1.3a0
3 changes: 3 additions & 0 deletions src/fides/api/graph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
from fides.api.schemas.partitioning.time_based_partitioning import (
validate_partitioning_list,
)
from fides.api.schemas.query_hints.base import QueryHints
from fides.api.util.collection_util import merge_dicts
from fides.api.util.querytoken import QueryToken

Expand Down Expand Up @@ -464,6 +465,8 @@ class Collection(BaseModel):
data_categories: Set[FidesKey] = set()
masking_strategy_override: Optional[MaskingStrategyOverride] = None
partitioning: Optional[Union[List[TimeBasedPartitioning], Dict[str, Any]]] = None
# Query hints for optimizing database queries (e.g., MAXDOP for MSSQL)
query_hints: Optional[QueryHints] = None

@property
def field_dict(self) -> Dict[FieldPath, Field]:
Expand Down
20 changes: 20 additions & 0 deletions src/fides/api/models/datasetconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
)
from fides.api.graph.data_type import parse_data_type_string
from fides.api.models.connectionconfig import ConnectionConfig, ConnectionType
from fides.api.schemas.query_hints.base import QueryHints
from fides.api.service.masking.strategy.masking_strategy import MaskingStrategy
from fides.api.util.saas_util import merge_datasets

Expand Down Expand Up @@ -336,6 +337,24 @@ def convert_dataset_to_graph(
if collection.fides_meta and collection.fides_meta.partitioning:
collection_partitioning = collection.fides_meta.partitioning

# Extract query hints from collection metadata if present
collection_query_hints = None
if (
collection.fides_meta
and hasattr(collection.fides_meta, "query_hints")
and collection.fides_meta.query_hints
):
try:
collection_query_hints = QueryHints(
hints=collection.fides_meta.query_hints
)
except Exception:
logger.warning(
"Invalid query_hints on collection {}.{}, ignoring",
dataset_name,
collection.name,
)

graph_collection = Collection(
name=collection.name,
fields=graph_fields,
Expand All @@ -347,6 +366,7 @@ def convert_dataset_to_graph(
set(collection.data_categories) if collection.data_categories else set()
),
partitioning=collection_partitioning,
query_hints=collection_query_hints,
)
graph_collections.append(graph_collection)
logger.debug(
Expand Down
11 changes: 11 additions & 0 deletions src/fides/api/schemas/query_hints/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Query hints schemas for database-specific query optimization."""

from fides.api.schemas.query_hints.base import QueryHint, QueryHints
from fides.api.schemas.query_hints.mssql_query_hints import MSSQLHintType, MSSQLQueryHint

# Public API of the query_hints package: the generic hint base classes plus
# the Microsoft SQL Server hint type and model imported above.
__all__ = [
"QueryHint",
"QueryHints",
"MSSQLHintType",
"MSSQLQueryHint",
]
100 changes: 100 additions & 0 deletions src/fides/api/schemas/query_hints/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Base classes for query hints."""

from abc import ABC, abstractmethod
from typing import Any, ClassVar, Dict, List, Optional, Set, Type

from loguru import logger
from pydantic import BaseModel


class QueryHint(BaseModel, ABC):
    """
    Abstract pydantic model describing one database-specific query hint.

    A concrete subclass lists the connection types it serves in the
    ``connection_types`` class variable. Defining the subclass is enough to
    register it: ``__init_subclass__`` records it in a shared registry so it
    can later be looked up by connection type. Subclasses must also implement
    ``to_sql_option`` to render themselves as SQL.
    """

    # Shared lookup table: connection type -> hint class serving that type
    _implementations: ClassVar[Dict[str, Type["QueryHint"]]] = {}

    # Connection types a concrete subclass handles (empty on the base class)
    connection_types: ClassVar[Set[str]] = set()

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Record the new subclass under every connection type it declares."""
        super().__init_subclass__(**kwargs)
        cls._implementations.update(
            {declared_type: cls for declared_type in cls.connection_types}
        )

    @classmethod
    def get_implementation(cls, connection_type: str) -> Optional[Type["QueryHint"]]:
        """Return the hint class registered for ``connection_type``, or None."""
        return cls._implementations.get(connection_type)

    @classmethod
    def get_supported_connection_types(cls) -> Set[str]:
        """Return a new set holding every connection type with a registered hint class."""
        return set(cls._implementations)

    @abstractmethod
    def to_sql_option(self) -> str:
        """
        Render this hint as one component of a SQL OPTION clause.

        The result excludes the surrounding OPTION() wrapper, e.g. "MAXDOP 1".
        """


class QueryHints(BaseModel):
    """
    Bundle of raw query hint definitions attached to a Collection.

    Hints are kept as plain dictionaries and are only parsed into a concrete
    ``QueryHint`` implementation when requested for a given connection type.

    Example YAML:
        fides_meta:
          query_hints:
            - hint_type: maxdop
              value: 1
    """

    hints: List[Dict[str, Any]] = []

    def get_hints_for_connection_type(self, connection_type: str) -> List[QueryHint]:
        """
        Build the hint objects that apply to ``connection_type``.

        Returns an empty list when no implementation is registered for the
        connection type. Entries that fail to validate against the registered
        implementation are logged at debug level and left out of the result.
        """
        hint_model = QueryHint.get_implementation(connection_type)
        if hint_model is None:
            return []

        parsed: List[QueryHint] = []
        for raw_hint in self.hints:
            try:
                hint = hint_model.model_validate(raw_hint)
            except Exception as exc:
                # Best effort: a hint that cannot be built for this connection
                # type is dropped rather than failing the whole lookup.
                logger.debug(
                    "Skipping invalid query hint for connection type {}: {}",
                    connection_type,
                    exc,
                )
            else:
                parsed.append(hint)

        return parsed

    def to_sql_option_clause(self, connection_type: str) -> Optional[str]:
        """
        Render the complete SQL OPTION clause for ``connection_type``.

        Returns None when no valid hints exist for the connection type,
        otherwise a string such as "OPTION (MAXDOP 1)".
        """
        rendered = [
            hint.to_sql_option()
            for hint in self.get_hints_for_connection_type(connection_type)
        ]
        if not rendered:
            return None

        joined = ", ".join(rendered)
        return f"OPTION ({joined})"
64 changes: 64 additions & 0 deletions src/fides/api/schemas/query_hints/mssql_query_hints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Microsoft SQL Server specific query hints."""

from enum import Enum
from typing import ClassVar, Optional, Set

from pydantic import model_validator

from fides.api.schemas.query_hints.base import QueryHint


class MSSQLHintType(str, Enum):
    """
    Allow-list of Microsoft SQL Server query hints that can be configured.

    Reference: https://learn.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-query

    Only safe, performance-related hints are enumerated. Accepting nothing but
    these known hint types guards against SQL injection through hint names.
    """

    # Parallelism
    MAXDOP = "maxdop"

    # Candidates for future support (add members here when needed):
    #   RECOMPILE = "recompile"
    #   OPTIMIZE_FOR_UNKNOWN = "optimize_for_unknown"
    #   FAST = "fast"
    #   MAXRECURSION = "maxrecursion"


class MSSQLQueryHint(QueryHint):
    """
    A single query hint for Microsoft SQL Server connections.

    Example usage in Dataset YAML:
        fides_meta:
          query_hints:
            - hint_type: maxdop
              value: 1
    """

    connection_types: ClassVar[Set[str]] = {"mssql"}

    hint_type: MSSQLHintType
    value: Optional[int] = None

    @model_validator(mode="after")
    def validate_hint_value(self) -> "MSSQLQueryHint":
        """
        Check that ``value`` is acceptable for the chosen ``hint_type``.

        Raises ValueError when a MAXDOP hint has no value or its value is
        outside the 0-64 range.
        """
        if self.hint_type != MSSQLHintType.MAXDOP:
            return self

        if self.value is None:
            raise ValueError("MAXDOP hint requires a value")

        within_bounds = isinstance(self.value, int) and 0 <= self.value <= 64
        if not within_bounds:
            raise ValueError("MAXDOP value must be an integer between 0 and 64")

        return self

    def to_sql_option(self) -> str:
        """
        Render the hint as an OPTION clause component, e.g. "MAXDOP 1".

        Raises ValueError for a hint type that has no rendering defined.
        """
        # Only MAXDOP is supported today; additional hint types would be
        # rendered here once they are added to MSSQLHintType.
        if self.hint_type != MSSQLHintType.MAXDOP:
            raise ValueError(f"Unknown hint type: {self.hint_type}")

        return f"MAXDOP {self.value}"
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import List

from fides.api.service.connectors.query_configs.query_config import (
QueryStringWithoutTuplesOverrideQueryConfig,
)
Expand All @@ -7,3 +9,27 @@ class MicrosoftSQLServerQueryConfig(QueryStringWithoutTuplesOverrideQueryConfig)
"""
Generates SQL valid for SQLServer.
"""

def get_formatted_query_string(
    self,
    field_list: str,
    clauses: List[str],
) -> str:
    """
    Build the SELECT statement for this node's collection.

    The clauses are OR-ed together in the WHERE condition. When the
    collection carries query hints that produce an OPTION clause for MSSQL,
    that clause is appended to the statement. Example output:
        SELECT a, b FROM table WHERE x IN (:x) OPTION (MAXDOP 1)
    """
    table_name = self.node.collection.name
    where_condition = " OR ".join(clauses)
    select_statement = f"SELECT {field_list} FROM {table_name} WHERE {where_condition}"

    # No hints configured on the collection: the plain statement is the answer
    configured_hints = self.node.collection.query_hints
    if not configured_hints:
        return select_statement

    # The hints container may still yield nothing usable for MSSQL
    option_clause = configured_hints.to_sql_option_clause("mssql")
    if not option_clause:
        return select_statement

    return f"{select_statement} {option_clause}"
116 changes: 116 additions & 0 deletions tests/ops/schemas/test_query_hints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Tests for query hints schemas."""

import pytest
from pydantic import ValidationError

from fides.api.schemas.query_hints.base import QueryHint, QueryHints
from fides.api.schemas.query_hints.mssql_query_hints import MSSQLHintType, MSSQLQueryHint


class TestMSSQLQueryHint:
    """Checks for the Microsoft SQL Server query hint model."""

    def test_maxdop_hint_valid(self):
        """A MAXDOP hint with a valid value is built and rendered correctly."""
        maxdop = MSSQLQueryHint(hint_type=MSSQLHintType.MAXDOP, value=1)
        assert (maxdop.hint_type, maxdop.value) == (MSSQLHintType.MAXDOP, 1)
        assert maxdop.to_sql_option() == "MAXDOP 1"

    def test_maxdop_hint_zero(self):
        """MAXDOP 0 is accepted (unlimited parallelism)."""
        rendered = MSSQLQueryHint(
            hint_type=MSSQLHintType.MAXDOP, value=0
        ).to_sql_option()
        assert rendered == "MAXDOP 0"

    def test_maxdop_hint_max_value(self):
        """MAXDOP 64, the highest allowed value, is accepted."""
        rendered = MSSQLQueryHint(
            hint_type=MSSQLHintType.MAXDOP, value=64
        ).to_sql_option()
        assert rendered == "MAXDOP 64"

    def test_maxdop_hint_missing_value(self):
        """A MAXDOP hint without a value fails validation."""
        with pytest.raises(ValidationError, match="MAXDOP hint requires a value"):
            MSSQLQueryHint(hint_type=MSSQLHintType.MAXDOP)

    def test_maxdop_hint_negative_value(self):
        """A negative MAXDOP value fails validation."""
        with pytest.raises(
            ValidationError,
            match="MAXDOP value must be an integer between 0 and 64",
        ):
            MSSQLQueryHint(hint_type=MSSQLHintType.MAXDOP, value=-1)

    def test_maxdop_hint_value_too_high(self):
        """A MAXDOP value above 64 fails validation."""
        with pytest.raises(
            ValidationError,
            match="MAXDOP value must be an integer between 0 and 64",
        ):
            MSSQLQueryHint(hint_type=MSSQLHintType.MAXDOP, value=65)

    def test_from_dict(self):
        """A hint can be built from a plain dictionary, as loaded from YAML."""
        parsed = MSSQLQueryHint.model_validate({"hint_type": "maxdop", "value": 1})
        assert parsed.hint_type == MSSQLHintType.MAXDOP
        assert parsed.value == 1


class TestQueryHint:
    """Checks for the implementation registry on the base QueryHint class."""

    def test_mssql_implementation_registered(self):
        """The MSSQL hint class is registered under the "mssql" type."""
        assert QueryHint.get_implementation("mssql") == MSSQLQueryHint

    def test_unknown_connection_type_returns_none(self):
        """Looking up an unregistered connection type yields None."""
        assert QueryHint.get_implementation("unknown_db") is None

    def test_supported_connection_types(self):
        """The supported connection types include "mssql"."""
        assert "mssql" in QueryHint.get_supported_connection_types()


class TestQueryHints:
    """Checks for the QueryHints container."""

    def test_empty_hints(self):
        """An empty container yields no hints and no OPTION clause."""
        container = QueryHints(hints=[])
        assert container.get_hints_for_connection_type("mssql") == []
        assert container.to_sql_option_clause("mssql") is None

    def test_single_mssql_hint(self):
        """A single MSSQL hint is parsed and rendered."""
        container = QueryHints(hints=[{"hint_type": "maxdop", "value": 1}])
        parsed = container.get_hints_for_connection_type("mssql")
        assert len(parsed) == 1
        assert parsed[0].to_sql_option() == "MAXDOP 1"

    def test_to_sql_option_clause(self):
        """The full OPTION clause is generated from the hints."""
        container = QueryHints(hints=[{"hint_type": "maxdop", "value": 1}])
        assert container.to_sql_option_clause("mssql") == "OPTION (MAXDOP 1)"

    def test_invalid_hints_skipped(self):
        """Hints that fail validation are dropped without raising."""
        valid_hint = {"hint_type": "maxdop", "value": 1}
        unknown_type = {"hint_type": "invalid_hint", "value": 99}
        bad_value = {"hint_type": "maxdop", "value": -1}
        container = QueryHints(hints=[valid_hint, unknown_type, bad_value])

        parsed = container.get_hints_for_connection_type("mssql")

        # Only the valid hint survives
        assert len(parsed) == 1
        assert parsed[0].value == 1

    def test_hints_for_unsupported_connection_type(self):
        """A connection type without an implementation yields nothing."""
        container = QueryHints(hints=[{"hint_type": "maxdop", "value": 1}])
        assert container.get_hints_for_connection_type("postgres") == []
        assert container.to_sql_option_clause("postgres") is None
Loading
Loading