From 1ea5d4bdaf32520b79349ed8cb772bfae3c81745 Mon Sep 17 00:00:00 2001 From: Cameron Bender Date: Thu, 24 Jul 2025 12:12:01 -0400 Subject: [PATCH 1/8] feat: fingerprint algorithm added --- ignore_helpers/adaptive_randomizer.py | 11 ++ ignore_helpers/attack_sim.py | 68 +++++++++++ ignore_helpers/fingerprinting.py | 88 ++++++++++++++ ignore_helpers/poison.py | 98 +++++++++++++++ ignore_helpers/verification_workflow.py | 13 ++ ignore_notes | 2 + ignore_tests/test_fingerprinting.py | 144 +++++++++++++++++++++++ ignore_tests/test_model.py | 27 +++++ models/defense/QueryBasedVerification.py | 84 +++++++++++++ models/defense/__init__.py | 4 +- 10 files changed, 538 insertions(+), 1 deletion(-) create mode 100644 ignore_helpers/adaptive_randomizer.py create mode 100644 ignore_helpers/attack_sim.py create mode 100644 ignore_helpers/fingerprinting.py create mode 100644 ignore_helpers/poison.py create mode 100644 ignore_helpers/verification_workflow.py create mode 100644 ignore_notes create mode 100644 ignore_tests/test_fingerprinting.py create mode 100644 ignore_tests/test_model.py create mode 100644 models/defense/QueryBasedVerification.py diff --git a/ignore_helpers/adaptive_randomizer.py b/ignore_helpers/adaptive_randomizer.py new file mode 100644 index 0000000..3b5d957 --- /dev/null +++ b/ignore_helpers/adaptive_randomizer.py @@ -0,0 +1,11 @@ +class AdaptiveRandomizer: + def __init__(self, candidate_nodes): + self.candidates = candidate_nodes + + def sample_candidates(self, sample_size): + # Randomly sample a subset before selecting fingerprints + pass + + def apply_random_label(self, labels): + # Randomly shuffle/mutate fingerprint labels for adaptive defense + pass diff --git a/ignore_helpers/attack_sim.py b/ignore_helpers/attack_sim.py new file mode 100644 index 0000000..eea1214 --- /dev/null +++ b/ignore_helpers/attack_sim.py @@ -0,0 +1,68 @@ +import torch +import numpy as np +import random + +def true_bit_flip(tensor, index=None, bit=0): + """ + Flips a single bit (bit index) of a float32 tensor element at a specified index. + bit=0: least significant bit (LSB) + """ + # Copy as numpy array for bit manipulation + a = tensor.detach().cpu().numpy().copy() + flat = a.ravel() + if index is None: + index = np.random.randint(0, flat.size) + old_val = flat[index] + # Get float as int + int_view = np.frombuffer(flat[index].tobytes(), dtype=np.uint32)[0] + # Flip the bit + int_view ^= (1 << bit) + # Back to float + new_val = np.frombuffer(np.uint32(int_view).tobytes(), dtype=np.float32)[0] + flat[index] = new_val + # Restore to tensor + a = flat.reshape(a.shape) + tensor.data = torch.from_numpy(a).to(tensor.device) + return old_val, new_val, index + +class BitFlipAttack: + def __init__(self, model, attack_type='random', bit=0): + """ + attack_type: 'random' (any param), 'BFA-F' (first layer), 'BFA-L' (last layer) + bit: which bit to flip (0 = LSB, 23 = start of mantissa, 30 = exponent, etc.) 
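+               (IEEE-754 float32 layout: bits 0-22 are the mantissa, bits 23-30 the
+               exponent, bit 31 the sign; flipping a high exponent bit such as bit 30
+               produces the largest change in the weight)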
+ """ + self.model = model + self.attack_type = attack_type + self.bit = bit + + def _get_target_params(self): + params = [p for p in self.model.parameters() if p.requires_grad and p.numel() > 0] + if self.attack_type == 'random': + return params + elif self.attack_type == 'BFA-F': # First layer only + return [params[0]] # Assumes first param is first layer (usually weights) + elif self.attack_type == 'BFA-L': # Last layer only + return [params[-1]] # Assumes last param is last layer (usually bias or weights) + else: + raise ValueError(f"Unknown attack_type {self.attack_type}") + + def apply(self): + """ + Apply the bit-flip attack in-place. + Returns: (layer_idx, param_idx, old_val, new_val) + """ + params = self._get_target_params() + with torch.no_grad(): + layer_idx = random.randrange(len(params)) + param = params[layer_idx] + idx = random.randrange(param.numel()) + old_val, new_val, actual_idx = true_bit_flip(param, index=idx, bit=self.bit) + return { + 'layer': layer_idx, + 'param_idx': actual_idx, + 'old_val': old_val, + 'new_val': new_val, + 'bit': self.bit, + 'attack_type': self.attack_type + } + diff --git a/ignore_helpers/fingerprinting.py b/ignore_helpers/fingerprinting.py new file mode 100644 index 0000000..cd68bb3 --- /dev/null +++ b/ignore_helpers/fingerprinting.py @@ -0,0 +1,88 @@ +import torch +import torch.nn.functional as F + +class TransductiveFingerprintGenerator: + def __init__(self, model, dataset, candidate_fraction=1.0, random_seed=None, device='cpu'): + """ + Args: + model: Trained GNN model (PyTorch, implements forward(graph, features)) + dataset: PyGIP Dataset object with .graph, .features, .labels + candidate_fraction: float, what fraction of nodes to consider as candidates (default 1.0 = all) + random_seed: int, seed for reproducibility (optional) + device: device string (cpu/cuda) + """ + self.model = model.to(device) + self.dataset = dataset + self.candidate_fraction = candidate_fraction + self.random_seed = random_seed + self.device = device + + def get_candidate_nodes(self): + all_nodes = torch.arange(self.dataset.graph.num_nodes()) + if self.candidate_fraction < 1.0: + num_candidates = int(len(all_nodes) * self.candidate_fraction) + generator = torch.Generator(device=self.device) + if self.random_seed is not None: + generator.manual_seed(self.random_seed) + idx = torch.randperm(len(all_nodes), generator=generator)[:num_candidates] + return all_nodes[idx] + return all_nodes + + def compute_fingerprint_scores_full(self, candidate_nodes): + """ + Full model knowledge (Transductive-F): uses gradient norms. + """ + self.model.eval() + scores = [] + logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + for node in candidate_nodes: + logit = logits[node] + label = logit.argmax().item() + loss = F.nll_loss(F.log_softmax(logit.unsqueeze(0), dim=1), torch.tensor([label], device=self.device)) + self.model.zero_grad() + loss.backward(retain_graph=True) + # Sum of gradient norms for all parameters + grad_norm = 0.0 + for p in self.model.parameters(): + if p.grad is not None: + grad_norm += (p.grad ** 2).sum().item() + scores.append(grad_norm) + return torch.tensor(scores, device=self.device) + + def compute_fingerprint_scores_limited(self, candidate_nodes): + """ + Limited model knowledge (Transductive-L): uses confidence. 
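+        Score for node v: 1 - max_c P(c | v), so low-confidence nodes near the
+        decision boundary score highest (Eq. 6 in Wu et al., 2023).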
+ """ + self.model.eval() + logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + probs = F.softmax(logits, dim=1) + labels = probs.argmax(dim=1) + # Score is 1 - confidence of the predicted class (Eq. 6) + scores = 1.0 - probs[candidate_nodes, labels[candidate_nodes]] + return scores + + def select_top_fingerprints(self, scores, candidate_nodes, k): + topk = torch.topk(scores, k) + return candidate_nodes[topk.indices], topk.values + + def generate_fingerprints(self, k=5, method='full'): + """ + Args: + k: Number of fingerprints to generate + method: 'full' for Transductive-F, 'limited' for Transductive-L + Returns: + List of (node_id, label) tuples + """ + candidate_nodes = self.get_candidate_nodes().to(self.device) + if method == 'full': + scores = self.compute_fingerprint_scores_full(candidate_nodes) + elif method == 'limited': + scores = self.compute_fingerprint_scores_limited(candidate_nodes) + else: + raise ValueError("method must be 'full' or 'limited'") + fingerprint_nodes, _ = self.select_top_fingerprints(scores, candidate_nodes, k) + # Use model to get labels for fingerprint nodes + logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + labels = logits.argmax(dim=1) + fingerprints = [(int(n), int(labels[n])) for n in fingerprint_nodes] + return fingerprints diff --git a/ignore_helpers/poison.py b/ignore_helpers/poison.py new file mode 100644 index 0000000..764071a --- /dev/null +++ b/ignore_helpers/poison.py @@ -0,0 +1,98 @@ +import copy +import random +import torch + +def random_edge_addition_poisoning(dataset, perturb_frac, random_seed=None): + """ + Returns a new DGLGraph with random edges added. + + Args: + dataset: Dataset object (with .graph as DGLGraph) + perturb_frac: Fraction of edges to add (e.g., 0.01 = 1%) + random_seed: Optional integer for reproducibility + + Returns: + poisoned_graph: DGLGraph (deepcopy of original with new edges) + """ + import dgl + + if random_seed is not None: + random.seed(random_seed) + torch.manual_seed(random_seed) + + orig_graph = dataset.graph + poisoned_graph = copy.deepcopy(orig_graph) + num_nodes = poisoned_graph.num_nodes() + num_edges_to_add = int(perturb_frac * orig_graph.num_edges()) + + # Build set of all existing edges (as (u,v) pairs) + existing_edges = set(zip( + orig_graph.edges()[0].tolist(), + orig_graph.edges()[1].tolist() + )) + + # Generate candidate node pairs (exclude self-loops and duplicates) + candidate_pairs = [ + (i, j) + for i in range(num_nodes) + for j in range(num_nodes) + if i != j and (i, j) not in existing_edges + ] + + if len(candidate_pairs) < num_edges_to_add: + raise ValueError("Perturbation budget too large: not enough candidate edges.") + + new_edges = random.sample(candidate_pairs, num_edges_to_add) + src, dst = zip(*new_edges) + poisoned_graph.add_edges(src, dst) + + return poisoned_graph + +def retrain_poisoned_model(dataset, poisoned_graph, defense_class, device='cpu'): + """ + Retrain target GCN using the poisoned graph structure. 
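+    Only the graph structure is swapped out; features, labels, and masks are
+    reused unchanged from the original dataset.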
+ + Args: + dataset: Original Dataset object (provides features, labels, masks) + poisoned_graph: DGLGraph (with new random edges added) + defense_class: The defense class to use for model training (e.g., QueryBasedVerificationDefense) + device: 'cpu' or 'cuda' + + Returns: + model: Trained GCN model + """ + # Create a shallow copy and swap in the poisoned graph + dataset_poisoned = copy.copy(dataset) + dataset_poisoned.graph = poisoned_graph + + # If Dataset is more complex, you may want to rebuild it from scratch + defense = defense_class(dataset=dataset_poisoned, attack_node_fraction=0.1) + model = defense._train_target_model() + return model + +def evaluate_accuracy(model, dataset, device='cpu'): + """ + Evaluates test accuracy of the given model on the dataset. + + Args: + model: Trained GCN model + dataset: Dataset object (provides features, labels, test_mask, graph) + device: 'cpu' or 'cuda' + + Returns: + accuracy: float (test accuracy, 0-1) + """ + model.eval() + features = dataset.features.to(device) + labels = dataset.labels.to(device) + test_mask = dataset.test_mask + + with torch.no_grad(): + logits = model(dataset.graph.to(device), features) + pred = logits.argmax(dim=1) + correct = (pred[test_mask] == labels[test_mask]).float() + accuracy = correct.sum().item() / test_mask.sum().item() + return accuracy + +# (Optional) If you plan to support more attack types, you could add: +# def mettack_poisoning(...): ... diff --git a/ignore_helpers/verification_workflow.py b/ignore_helpers/verification_workflow.py new file mode 100644 index 0000000..25bc7f2 --- /dev/null +++ b/ignore_helpers/verification_workflow.py @@ -0,0 +1,13 @@ +class VerificationWorkflow: + def __init__(self, model, graph, labels, fingerprinting_args): + self.fingerprinter = Fingerprinting(model, graph, labels, **fingerprinting_args) + self.fingerprints = None + + def offline_phase(self): + # 1. Generate fingerprints and record expected outputs + self.fingerprints = self.fingerprinter.select_fingerprints() + + def online_phase(self, queried_model): + # 2. Query fingerprint nodes, compare predictions + # 3. 
Return detection result (True if any mismatch) + pass diff --git a/ignore_notes b/ignore_notes new file mode 100644 index 0000000..18aa2b9 --- /dev/null +++ b/ignore_notes @@ -0,0 +1,2 @@ +random poisoning implemented, need to test fingerprints on these then +mettack next diff --git a/ignore_tests/test_fingerprinting.py b/ignore_tests/test_fingerprinting.py new file mode 100644 index 0000000..a9fdad7 --- /dev/null +++ b/ignore_tests/test_fingerprinting.py @@ -0,0 +1,144 @@ +import torch +from datasets import Cora +from models.defense import QueryBasedVerificationDefense +from ignore_helpers import fingerprinting, attack_sim, poison +import torch.nn.functional as F +import copy # Python's deepcopy + + +def evaluate_fingerprints(model, dataset, fingerprints, device='cpu'): + model.eval() + logits = model(dataset.graph.to(device), dataset.features.to(device)) + pred_labels = logits.argmax(dim=1).cpu() + changed = [] + for node_id, clean_label in fingerprints: + if pred_labels[node_id] != clean_label: + changed.append((node_id, clean_label, int(pred_labels[node_id]))) + return changed + + + +def main_poisoning(num_trials=50, poison_frac=0.01): + + device = 'cuda' if torch.cuda.is_available() else 'cpu' + dataset = Cora() + + print("Training clean target model...") + defense = QueryBasedVerificationDefense(dataset=dataset, attack_node_fraction=0.1) + base_model = defense._train_target_model() + + # Accuracy before any poisoning + clean_acc = poison.evaluate_accuracy(base_model, dataset, device=device) + print(f"Clean model test accuracy: {clean_acc:.4f}") + + # # If/when you want to test fingerprints: + # generator = fingerprinting.TransductiveFingerprintGenerator(base_model, dataset, candidate_fraction=1.0, random_seed=42, device=device) + # fingerprints_full = generator.generate_fingerprints(k=k, method='full') + # fingerprints_limited = generator.generate_fingerprints(k=k, method='limited') + + poisoned_accuracies = [] + + for trial in range(num_trials): + poisoned_graph = poison.random_edge_addition_poisoning( + dataset=dataset, + perturb_frac=poison_frac, + random_seed=trial + ) + + # Make a dataset copy with the poisoned graph + dataset_poisoned = copy.copy(dataset) + dataset_poisoned.graph = poisoned_graph + + poisoned_model = poison.retrain_poisoned_model( + dataset=dataset_poisoned, # Use the poisoned dataset + poisoned_graph=poisoned_graph, + defense_class=QueryBasedVerificationDefense, + device=device + ) + + # Evaluate on the poisoned dataset + poisoned_acc = poison.evaluate_accuracy(poisoned_model, dataset_poisoned, device=device) + poisoned_accuracies.append(poisoned_acc) + + if trial == 0: + print(f"Example poisoned test accuracy: {poisoned_acc:.4f}") + + if (trial + 1) % 10 == 0: + print(f"Poison Trial {trial+1}/{num_trials}") + + + # # Evaluate fingerprints (disabled for now) + # changed_full = evaluate_fingerprints(poisoned_model, dataset, fingerprints_full, device=device) + # changed_limited = evaluate_fingerprints(poisoned_model, dataset, fingerprints_limited, device=device) + # if changed_full: + # detected_full += 1 + # if changed_limited: + # detected_limited += 1 + + # Final stats + avg_poisoned_acc = sum(poisoned_accuracies) / len(poisoned_accuracies) + print("\n==== Poisoning Results ====") + print(f"Average clean model test accuracy: {clean_acc:.4f}") + print(f"Average poisoned model test accuracy: {avg_poisoned_acc:.4f}") + print(f"Average accuracy drop: {clean_acc - avg_poisoned_acc:.4f}") + # print("\n==== Poisoning Detection Rate Results ====") + # 
print(f"Transductive-F (full knowledge) DR: {detected_full/num_trials:.3f}") + # print(f"Transductive-L (limited knowledge) DR: {detected_limited/num_trials:.3f}") + + + +def main(num_trials=100, k=5, attack_type='random', bit=0): + """ + :param num_trials: Number of attack rounds + :param k: Number of fingerprints + :param attack_type: 'random', 'BFA-F', 'BFA-L' + :param bit: Which bit to flip (0 = LSB, 23 = mantissa, 30 = exponent, etc.) + """ + device = 'cuda' if torch.cuda.is_available() else 'cpu' + dataset = Cora() + print("Training target model (baseline)...") + defense = QueryBasedVerificationDefense(dataset=dataset, attack_node_fraction=0.1) + base_model = defense._train_target_model() # Train ONCE + + generator = fingerprinting.TransductiveFingerprintGenerator(base_model, dataset, candidate_fraction=1.0, random_seed=42, device=device) + fingerprints_full = generator.generate_fingerprints(k=k, method='full') + fingerprints_limited = generator.generate_fingerprints(k=k, method='limited') + + detected_full = 0 + detected_limited = 0 + + for trial in range(num_trials): + attacked_model = copy.deepcopy(base_model) + attack = attack_sim.BitFlipAttack(attacked_model, attack_type=attack_type, bit=bit) + attack_result = attack.apply() + if trial < 5: + def float_to_bits(val): + import struct + [d] = struct.unpack(">L", struct.pack(">f", val)) + return f"{d:032b}" + old_val = attack_result['old_val'] + new_val = attack_result['new_val'] + bit_idx = attack_result['bit'] + print(f"Trial {trial+1} bit-flip details:") + print(f" Flipped bit: {bit_idx}") + print(f" Old value: {old_val} ({float_to_bits(old_val)})") + print(f" New value: {new_val} ({float_to_bits(new_val)})") + + changed_full = evaluate_fingerprints(attacked_model, dataset, fingerprints_full, device=device) + changed_limited = evaluate_fingerprints(attacked_model, dataset, fingerprints_limited, device=device) + if changed_full: + detected_full += 1 + if changed_limited: + detected_limited += 1 + if (trial + 1) % 10 == 0: + print(f"Trial {trial+1}/{num_trials}: F={detected_full} L={detected_limited}") + + + + print("\n==== Detection Rate Results ====") + print(f"Transductive-F (full knowledge) DR: {detected_full/num_trials:.3f}") + print(f"Transductive-L (limited knowledge) DR: {detected_limited/num_trials:.3f}") + +if __name__ == '__main__': + main_poisoning(num_trials=50, poison_frac=0.01) + diff --git a/ignore_tests/test_model.py b/ignore_tests/test_model.py new file mode 100644 index 0000000..6e325c4 --- /dev/null +++ b/ignore_tests/test_model.py @@ -0,0 +1,27 @@ +import torch +from datasets import Cora +from models.defense import QueryBasedVerificationDefense + +def test_train_target_model(): + # Load dataset + dataset = Cora() # substitute with your actual Dataset class if different + print("Dataset loaded.") + print(f"Features: {dataset.features.shape}, Labels: {dataset.labels.shape}") + + # Initialize defense object + defense = QueryBasedVerificationDefense(dataset=dataset, attack_node_fraction=0.1) + + # Train model + model = defense._train_target_model() + + # Test model outputs shape + model.eval() + with torch.no_grad(): + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + logits = model(dataset.graph.to(device), dataset.features.to(device)) + print("Logits shape:", logits.shape) + # Optionally: check output for a few nodes + print("First 5 node predictions:", logits[:5].argmax(dim=1).cpu().numpy()) + +if __name__ == "__main__": + test_train_target_model() diff --git 
a/models/defense/QueryBasedVerification.py b/models/defense/QueryBasedVerification.py new file mode 100644 index 0000000..02c2117 --- /dev/null +++ b/models/defense/QueryBasedVerification.py @@ -0,0 +1,84 @@ +from .base import BaseDefense +import torch +import torch.nn.functional as F +from torch.optim import Adam +from models.nn import GCN + +device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + + +class QueryBasedVerificationDefense(BaseDefense): + def __init__(self, dataset, attack_node_fraction, model_path=None): + super().__init__(dataset, attack_node_fraction) + self.model_path = model_path + + + def defend(self, *args, **kwargs): + """ + Main defense workflow for query-based verification. + For now, this is just a stub for testing and must be filled in later. + """ + print("defend() method called. Not implemented yet.") + model = self._load_model(self.model_path) if self.model_path else self._train_target_model() + # ... fingerprinting/verification logic ... + + def _train_target_model(self): + """ + Trains target GCN model according to protocol in + Wu et al. (2023), Section 6.1 for graph node classification. + + Returns + ------- + model : torch.nn.Module + The trained GCN model. + """ + model = GCN( + feature_number=self.dataset.feature_number, + label_number=self.dataset.label_number + ).to(device) + print(f"Training target model on device: {device} ...") + + optimizer = Adam(model.parameters(), lr=0.02) + loss_fn = torch.nn.NLLLoss() + + features = self.dataset.features.to(device) + labels = self.dataset.labels.to(device) + train_mask = self.dataset.train_mask.to(device) + # Use test_mask for validation monitoring if val_mask is not available + val_mask = getattr(self.dataset, "val_mask", None) + if val_mask is None: + val_mask = self.dataset.test_mask + val_mask = val_mask.to(device) + + for epoch in range(200): + model.train() + logits = model(self.dataset.graph.to(device), features) + log_probs = F.log_softmax(logits, dim=1) + loss = loss_fn(log_probs[train_mask], labels[train_mask]) + + optimizer.zero_grad() + loss.backward() + optimizer.step() + + if (epoch + 1) % 10 == 0 or epoch == 0: + model.eval() + with torch.no_grad(): + val_logits = model(self.dataset.graph.to(device), features) + val_log_probs = F.log_softmax(val_logits, dim=1) + val_pred = val_log_probs[val_mask].max(1)[1] + val_acc = (val_pred == labels[val_mask]).float().mean().item() + print(f"Epoch {epoch+1}: Loss={loss.item():.4f} | Val Acc={val_acc:.4f}") + + return model + + def _load_model(self, model_path): + # Load model weights if path is given + model = GCN( + in_feats=self.dataset.feature_number, + hidden_feats=16, + out_feats=self.dataset.label_number + ) + model.load_state_dict(torch.load(model_path)) + return model + + # ... _train_defense_model(), _train_surrogate_model() as needed ... 
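+
+    # Example usage (illustrative sketch; mirrors ignore_tests/test_model.py):
+    #
+    #     from datasets import Cora
+    #     defense = QueryBasedVerificationDefense(dataset=Cora(), attack_node_fraction=0.1)
+    #     model = defense._train_target_model()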
diff --git a/models/defense/__init__.py b/models/defense/__init__.py index a507949..7432a8f 100644 --- a/models/defense/__init__.py +++ b/models/defense/__init__.py @@ -1,5 +1,6 @@ from .base import BaseDefense from .SurviveWM2 import OptimizedWatermarkDefense +from .QueryBasedVerification import QueryBasedVerificationDefense from .WatermarkDefense import ( WatermarkByRandomGraph, ) @@ -9,5 +10,6 @@ __all__ = [ 'BaseDefense', 'WatermarkByRandomGraph', - 'OptimizedWatermarkDefense' + 'OptimizedWatermarkDefense', + 'QueryBasedVerificationDefense' ] From c8735ef0480d1a145ac4c2f3bd39ae19f19cc725 Mon Sep 17 00:00:00 2001 From: Cameron Bender Date: Sun, 3 Aug 2025 19:22:27 -0400 Subject: [PATCH 2/8] Consolidated into PyGIP --- models/defense/QueryBasedVerification.py | 1057 +++++++++++++++++++++- 1 file changed, 1046 insertions(+), 11 deletions(-) diff --git a/models/defense/QueryBasedVerification.py b/models/defense/QueryBasedVerification.py index 02c2117..6468b6b 100644 --- a/models/defense/QueryBasedVerification.py +++ b/models/defense/QueryBasedVerification.py @@ -3,6 +3,18 @@ import torch.nn.functional as F from torch.optim import Adam from models.nn import GCN +import numpy as np +import random +from collections import Counter +from torch_geometric.utils import to_networkx, from_networkx, to_undirected +import networkx as nx +import copy +import torch.optim as optim +import dgl +from itertools import combinations +from tqdm import tqdm + + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') @@ -11,18 +23,69 @@ class QueryBasedVerificationDefense(BaseDefense): def __init__(self, dataset, attack_node_fraction, model_path=None): super().__init__(dataset, attack_node_fraction) self.model_path = model_path + self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + - def defend(self, *args, **kwargs): + def defend(self, num_trials=10, k=5, attack_type='mettack', knowledge='full', mode='transductive', verbose=True, **kwargs): """ - Main defense workflow for query-based verification. - For now, this is just a stub for testing and must be filled in later. + Main defense routine. Generates fingerprints, runs attacks, and verifies integrity. + Returns a dict with per-trial and average metrics. """ - print("defend() method called. Not implemented yet.") - model = self._load_model(self.model_path) if self.model_path else self._train_target_model() - # ... fingerprinting/verification logic ... 
+ trial_results = [] + for trial in range(num_trials): + if verbose: + print(f"\n=== Trial {trial+1}/{num_trials} ===") + + # Step 1: Train target model + model_clean = self._train_target_model() + acc_clean = self._evaluate_accuracy(model_clean, self.dataset) + + # Step 2: Fingerprint it + fingerprints = self._generate_fingerprints(model_clean, mode=mode, knowledge=knowledge, k=k, **kwargs) + + # Step 3: Attack the model + poisoned_model, attack_info = self._run_attack(model_clean, attack_type=attack_type, knowledge=knowledge, **kwargs) + poisoned_dataset = copy.deepcopy(self.dataset) + if 'graph' in attack_info: + poisoned_dataset.graph = attack_info['graph'] + acc_poisoned = self._evaluate_accuracy(poisoned_model, poisoned_dataset) + + + # Step 4: Detect fingerprint flips + flipped_info = self._evaluate_fingerprints(poisoned_model, fingerprints) + + flip_rate = flipped_info['flip_rate'] + acc_drop = acc_clean - acc_poisoned + + if verbose: + print(f"Clean Accuracy: {acc_clean:.4f}") + print(f"Poisoned Accuracy: {acc_poisoned:.4f}") + print(f"Accuracy Drop: {acc_drop:.4f}") + print(f"Flip Rate: {flip_rate:.4f}") + + trial_results.append({ + 'flip_rate': flip_rate, + 'accuracy_drop': acc_drop, + }) + + # Compute averages + avg_flip_rate = sum(r['flip_rate'] for r in trial_results) / num_trials + avg_acc_drop = sum(r['accuracy_drop'] for r in trial_results) / num_trials + + print(f"Clean Graph NumEdges: {self.dataset.graph.num_edges()}") + print(f"Poisoned Graph NumEdges: {poisoned_model.graph.num_edges() if hasattr(poisoned_model, 'graph') else 'N/A'}") + - def _train_target_model(self): + return { + 'trial_results': trial_results, + 'average_flip_rate': avg_flip_rate, + 'average_accuracy_drop': avg_acc_drop, + } + + + + def _train_target_model(self, epochs=200): """ Trains target GCN model according to protocol in Wu et al. (2023), Section 6.1 for graph node classification. @@ -44,13 +107,12 @@ def _train_target_model(self): features = self.dataset.features.to(device) labels = self.dataset.labels.to(device) train_mask = self.dataset.train_mask.to(device) - # Use test_mask for validation monitoring if val_mask is not available val_mask = getattr(self.dataset, "val_mask", None) if val_mask is None: val_mask = self.dataset.test_mask val_mask = val_mask.to(device) - for epoch in range(200): + for epoch in range(epochs): model.train() logits = model(self.dataset.graph.to(device), features) log_probs = F.log_softmax(logits, dim=1) @@ -72,7 +134,6 @@ def _train_target_model(self): return model def _load_model(self, model_path): - # Load model weights if path is given model = GCN( in_feats=self.dataset.feature_number, hidden_feats=16, @@ -81,4 +142,978 @@ def _load_model(self, model_path): model.load_state_dict(torch.load(model_path)) return model - # ... _train_defense_model(), _train_surrogate_model() as needed ... + + def _generate_fingerprints(self, model, mode='transductive', knowledge='full', k=5, **kwargs): + """ + Wrapper for fingerprint generation based on mode and knowledge level. 
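+        In 'transductive' mode, fingerprints are nodes of the training graph itself;
+        in 'inductive' mode, they are nodes of a shadow graph that the verifier queries.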
+ Returns: + List of fingerprints + """ + if mode == 'transductive': + generator = TransductiveFingerprintGenerator( + model=model, + dataset=self.dataset, + candidate_fraction=kwargs.get('candidate_fraction', 1.0), + random_seed=kwargs.get('random_seed', None), + device=self.device, + randomize=kwargs.get('randomize', True), + ) + fingerprints = generator.generate_fingerprints(k=k, method=knowledge) + + unified_fingerprints = [(self.dataset.graph, node_id, label) for (node_id, label) in fingerprints] + + elif mode == 'inductive': + generator = InductiveFingerprintGenerator( + model=model, + shadow_graph=self.dataset.graph, + knowledge=knowledge, + candidate_fraction=kwargs.get('candidate_fraction', 0.3), + num_fingerprints=k, + randomize=kwargs.get('randomize', True), + random_seed=kwargs.get('random_seed', None), + device=self.device, + perturb_fingerprints=kwargs.get('perturb_fingerprints', False), + perturb_budget=kwargs.get('perturb_budget', 5), + ) + fingerprints = generator.generate_fingerprints(method=knowledge) + unified_fingerprints = fingerprints + + else: + raise ValueError("Unknown fingerprinting mode. Use 'transductive' or 'inductive'.") + + return unified_fingerprints + + def _evaluate_fingerprints(self, model, fingerprints): + """ + Checks if fingerprinted nodes have changed labels under the given model. + + Args: + model: The model to evaluate. + fingerprints: List of (graph, node_id, label) tuples. + + Returns: + results: { + 'flipped': List[Tuple[node_id, old_label, new_label]], + 'flip_rate': float + } + """ + model.eval() + flipped = [] + + with torch.no_grad(): + for graph, node_id, expected_label in fingerprints: + x = graph.ndata['feat'] if hasattr(graph, 'ndata') else graph.x + logits = model(graph.to(self.device), x.to(self.device)) + pred = logits[node_id].argmax().item() + if pred != expected_label: + flipped.append((node_id, expected_label, pred)) + + return { + 'flipped': flipped, + 'flip_rate': len(flipped) / len(fingerprints) if fingerprints else 0.0 + } + + + def _run_attack(self, model, attack_type='mettack', knowledge='full', **kwargs): + """ + Run the specified attack on the model. 
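+        Supported attack_type values: 'bitflip' (in-place weight corruption),
+        'random' (random edge addition, then retraining), and 'mettack'
+        (meta-gradient structure attack, then retraining).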
+        Returns:
+            poisoned_model: torch.nn.Module
+            metadata: dict with info about the attack
+        """
+        if attack_type == 'bitflip':
+            attacker = BitFlipAttack(model=model, attack_type=kwargs.get('bitflip_type', 'random'), bit=kwargs.get('bit', 0))
+            info = attacker.apply()
+            return model, {'type': 'bitflip', 'info': info}
+
+        elif attack_type == 'random':
+            perturbed_graph = self._random_edge_addition_poisoning(
+                perturb_frac=kwargs.get('perturb_frac', 0.01),
+                random_seed=kwargs.get('random_seed', None),
+            )
+            poisoned_model = self._retrain_poisoned_model(
+                poisoned_graph=perturbed_graph,
+                epochs=kwargs.get('epochs', 200),
+            )
+            return poisoned_model, {'type': 'random_poison', 'graph': perturbed_graph}
+
+        elif attack_type == 'mettack':
+            helper = MettackHelper(
+                graph=self.dataset.graph,
+                features=self.dataset.features,
+                labels=self.dataset.labels,
+                train_mask=self.dataset.train_mask,
+                val_mask=getattr(self.dataset, 'val_mask', None),
+                test_mask=self.dataset.test_mask,
+                n_perturbations=kwargs.get('n_perturbations', 5),
+                device=self.device,
+                max_perturbations=kwargs.get('max_perturbations', 50),
+                surrogate_epochs=kwargs.get('surrogate_epochs', 30),
+                candidate_sample_size=kwargs.get('candidate_sample_size', 20),
+            )
+            poisoned_graph, attack_metrics = helper.run()
+            poisoned_model = self._retrain_poisoned_model(
+                poisoned_graph=poisoned_graph,
+                epochs=kwargs.get('epochs', 200),
+            )
+            return poisoned_model, {'type': 'mettack', 'metrics': attack_metrics, 'graph': poisoned_graph}
+
+        else:
+            raise ValueError(f"Unsupported attack_type: {attack_type}")
+
+    def _random_edge_addition_poisoning(self, perturb_frac, random_seed=None):
+        """
+        Returns a new DGLGraph with random edges added to self.dataset.graph.
+
+        Args:
+            perturb_frac: Fraction of edges to add (e.g., 0.01 = 1%)
+            random_seed: Optional integer for reproducibility
+
+        Returns:
+            poisoned_graph: DGLGraph (deepcopy of original with new edges)
+        """
+        if random_seed is not None:
+            random.seed(random_seed)
+            torch.manual_seed(random_seed)
+
+        orig_graph = self.dataset.graph
+        poisoned_graph = copy.deepcopy(orig_graph)
+        num_nodes = poisoned_graph.num_nodes()
+        num_edges_to_add = int(perturb_frac * orig_graph.num_edges())
+
+        existing_edges = set(zip(
+            orig_graph.edges()[0].tolist(),
+            orig_graph.edges()[1].tolist()
+        ))
+
+        candidate_pairs = [
+            (i, j)
+            for i in range(num_nodes)
+            for j in range(num_nodes)
+            if i != j and (i, j) not in existing_edges
+        ]
+
+        if len(candidate_pairs) < num_edges_to_add:
+            raise ValueError("Perturbation budget too large: not enough candidate edges.")
+
+        new_edges = random.sample(candidate_pairs, num_edges_to_add)
+        src, dst = zip(*new_edges)
+        poisoned_graph.add_edges(src, dst)
+
+        return poisoned_graph
+
+    def _retrain_poisoned_model(self, poisoned_graph, epochs=200):
+        """
+        Retrain target GCN using the poisoned graph structure.
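+        A fresh defense instance is built around a deep copy of the dataset, so
+        the clean graph held by self.dataset is never mutated.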
+ + Args: + dataset: Original Dataset object (provides features, labels, masks) + poisoned_graph: DGLGraph (with new random edges added) + defense_class: The defense class to use for model training (e.g., QueryBasedVerificationDefense) + device: 'cpu' or 'cuda' + + Returns: + model: Trained GCN model + """ + dataset_poisoned = copy.deepcopy(self.dataset) + dataset_poisoned.graph = poisoned_graph + + defense = QueryBasedVerificationDefense(dataset=dataset_poisoned, attack_node_fraction=0.1) + model = defense._train_target_model(epochs=epochs) + return model + + + def _evaluate_accuracy(self, model, dataset): + """ + Evaluates test accuracy of the given model on the dataset. + + Args: + model: Trained GCN model + dataset: Dataset object (provides features, labels, test_mask, graph) + device: 'cpu' or 'cuda' + + Returns: + accuracy: float (test accuracy, 0-1) + """ + model.eval() + features = dataset.features.to(device) + labels = dataset.labels.to(device) + test_mask = dataset.test_mask + + with torch.no_grad(): + logits = model(dataset.graph.to(device), features) + pred = logits.argmax(dim=1) + correct = (pred[test_mask] == labels[test_mask]).float() + accuracy = correct.sum().item() / test_mask.sum().item() + return accuracy + + def run_full_pipeline(self, attack_type='random', mode='transductive', knowledge='full', k=5, trials=1, **kwargs): + """ + Runs the full fingerprinting + attack + evaluation pipeline. + + Parameters: + attack_type: 'random', 'bitflip', or 'mettack' + mode: 'transductive' or 'inductive' + knowledge: 'full' or 'limited' + k: number of fingerprints + trials: number of repeated trials + kwargs: extra params for attack or fingerprinting + + Prints per-trial results and summary statistics. + """ + flip_rates = [] + acc_drops = [] + + for trial in range(trials): + print(f"\n=== Trial {trial+1}/{trials} ===") + + model_clean = self._train_target_model() + acc_clean = self._evaluate_accuracy(model_clean, self.dataset) + print(f"Clean model accuracy: {acc_clean:.4f}") + + fingerprints = self._generate_fingerprints(model_clean, mode=mode, knowledge=knowledge, k=k, **kwargs) + + model_poisoned, attack_meta = self._run_attack(model_clean, attack_type=attack_type, knowledge=knowledge, **kwargs) + acc_poisoned = self._evaluate_accuracy(model_poisoned, self.dataset) + print(f"Poisoned model accuracy: {acc_poisoned:.4f}") + + eval_result = self._evaluate_fingerprints(model_poisoned, fingerprints) + flip_rate = eval_result['flip_rate'] + print(f"Fingerprint flip rate: {flip_rate:.4f}") + for (nid, old, new) in eval_result['flipped']: + print(f" Node {nid}: {old} → {new}") + + flip_rates.append(flip_rate) + acc_drops.append(acc_clean - acc_poisoned) + + print("\n=== Summary ===") + print(f"Avg Accuracy Drop: {np.mean(acc_drops):.4f}") + print(f"Avg Fingerprint Flip Rate: {np.mean(flip_rates):.4f}") + + + + +class TransductiveFingerprintGenerator: + def __init__(self, model, dataset, candidate_fraction=1.0, random_seed=None, device='cpu', randomize=True): + self.model = model.to(device) + self.dataset = dataset + self.candidate_fraction = candidate_fraction + self.random_seed = random_seed + self.device = device + self.randomize = randomize + + def get_candidate_nodes(self): + """ + Step 1: Randomly sample a subset of nodes as candidates (for robustness). + Step 2: Return that set for scoring. 
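+        Subsampling happens only when randomize is set and candidate_fraction < 1.0;
+        otherwise every node is a candidate. Randomizing the pool makes the selected
+        fingerprints harder for an adaptive attacker to anticipate.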
+ """ + all_nodes = torch.arange(self.dataset.graph.num_nodes()) + num_candidates = max(1, int(len(all_nodes) * self.candidate_fraction)) + + if self.randomize and self.candidate_fraction < 1.0: + generator = torch.Generator(device=self.device) + if self.random_seed is not None: + generator.manual_seed(self.random_seed) + idx = torch.randperm(len(all_nodes), generator=generator)[:num_candidates] + candidates = all_nodes[idx] + print(f"[DEBUG] Trial {self.random_seed}: Sampled candidates = {candidates.tolist()[:5]}") + else: + candidates = all_nodes + + return candidates + + + + def compute_fingerprint_scores_full(self, candidate_nodes): + self.model.eval() + scores = [] + logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + + for node in candidate_nodes: + self.model.zero_grad() + logit = logits[node] + label = logit.argmax().item() + loss = F.cross_entropy(logit.unsqueeze(0), torch.tensor([label], device=self.device)) + loss.backward(retain_graph=True) + grad_norm = sum((p.grad ** 2).sum().item() for p in self.model.parameters() if p.grad is not None) + scores.append(grad_norm) + + scores_tensor = torch.tensor(scores, device=self.device) + print(f"[FULL] Fingerprint scores: mean={scores_tensor.mean():.4f}, std={scores_tensor.std():.4f}, max={scores_tensor.max():.4f}, min={scores_tensor.min():.4f}") + return scores_tensor + + + def compute_fingerprint_scores_limited(self, candidate_nodes): + self.model.eval() + with torch.no_grad(): + logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + probs = F.softmax(logits, dim=1) + labels = probs.argmax(dim=1) + scores = 1.0 - probs[candidate_nodes, labels[candidate_nodes]] + + print(f"[LIMITED] Fingerprint scores: mean={scores.mean():.4f}, std={scores.std():.4f}, max={scores.max():.4f}, min={scores.min():.4f}") + return scores + + + def select_top_fingerprints(self, scores, candidate_nodes, k, method='full'): + """ + Selects top-k fingerprint nodes after filtering out extreme score outliers. 
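+        In 'full' mode, scores above the 99th percentile are dropped because
+        gradient norms can blow up; 'limited' scores are bounded in [0, 1), so no
+        filtering is applied there (q = 1.0).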
+ """ + q = 0.99 if method == 'full' else 1.0 + threshold = torch.quantile(scores, q) + mask = scores <= threshold + + filtered_scores = scores[mask] + filtered_candidates = candidate_nodes[mask] + + if filtered_scores.size(0) < k: + print(f"[WARN] Only {filtered_scores.size(0)} candidates left after filtering, reducing k to fit.") + k = filtered_scores.size(0) + + topk = torch.topk(filtered_scores, k) + selected_nodes = filtered_candidates[topk.indices] + selected_scores = topk.values + + return selected_nodes, selected_scores + + + def generate_fingerprints(self, k=5, method='full'): + candidate_nodes = self.get_candidate_nodes().to(self.device) + + with torch.no_grad(): + logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + labels = logits.argmax(dim=1) + + if method == 'full': + scores = self.compute_fingerprint_scores_full(candidate_nodes) + elif method == 'limited': + scores = self.compute_fingerprint_scores_limited(candidate_nodes) + else: + raise ValueError("method must be 'full' or 'limited'") + + class_to_candidates = {} + for i, node in enumerate(candidate_nodes): + cls = int(labels[node]) + if cls not in class_to_candidates: + class_to_candidates[cls] = [] + class_to_candidates[cls].append((node.item(), scores[i].item())) + + rng = random.Random(self.random_seed) + + class_list = list(class_to_candidates.keys()) + rng.shuffle(class_list) + + fingerprints = [] + for cls in class_list: + + class_nodes = sorted(class_to_candidates[cls], key=lambda x: x[1], reverse=True) + top_node = class_nodes[0][0] + fingerprints.append((top_node, cls)) + if len(fingerprints) >= k: + break + + if len(fingerprints) < k: + + fingerprint_nodes, _ = self.select_top_fingerprints(scores, candidate_nodes, k, method=method) + fingerprints = [(int(n), int(labels[n])) for n in fingerprint_nodes] + + + labels_only = [label for (_, label) in fingerprints] + nodes_only = [node for (node, _) in fingerprints] + + print(f"[{method.upper()}] Fingerprint label distribution: {Counter(labels_only)}") + print(f"[{method.upper()}] Fingerprint node IDs: {nodes_only}") + + return fingerprints + + + +class InductiveFingerprintGenerator: + """ + Implements inductive fingerprint generation for both Full ('full') and Limited ('limited') + knowledge settings, as described in Wu et al. (2023) Sections 4.2, 4.2.2, and 5.2. + Supports randomized candidate selection for robustness against adaptive attackers. + """ + + def __init__(self, model, shadow_graph, knowledge='limited', + candidate_fraction=0.3, num_fingerprints=5, + randomize=True, random_seed=None, device='cpu', + perturb_fingerprints=False, perturb_budget=5): + """ + Args: + model: GNN model to be fingerprinted. + shadow_graph: PyG/DGL graph object for querying (shadow/inference graph). + knowledge: 'full' for gradient-based (requires model weights), 'limited' for output-based. + candidate_fraction: Fraction of nodes considered as candidates for fingerprinting. + num_fingerprints: Number of fingerprint nodes to select. + randomize: Whether to randomly sample candidate nodes (default True). + random_seed: Optional seed for reproducibility. + device: Torch device string (e.g., 'cpu' or 'cuda'). + perturb_fingerprints: Whether to greedily perturb fingerprint nodes' features/edges to increase sensitivity. + perturb_budget: Max number of perturbation steps per fingerprint node (default 5). 
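+
+        Example (illustrative):
+            gen = InductiveFingerprintGenerator(model, shadow_graph, knowledge='limited')
+            fingerprints = gen.generate_fingerprints(method='limited')
+            # each fingerprint is a (graph, node_id, expected_label) tuple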
+ + """ + self.model = model.to(device) + self.shadow_graph = shadow_graph + self.knowledge = knowledge + self.candidate_fraction = candidate_fraction + self.num_fingerprints = num_fingerprints + self.randomize = randomize + self.random_seed = random_seed + self.device = device + self.perturb_fingerprints = perturb_fingerprints + self.perturb_budget = perturb_budget + + if self.random_seed is not None: + torch.manual_seed(self.random_seed) + random.seed(self.random_seed) + + def get_candidate_nodes(self): + """ + Step 1: Randomly sample a subset of nodes as candidates (for robustness). + Step 2: Score and select top-k from this set. + """ + all_nodes = torch.arange(self.shadow_graph.num_nodes()) + num_candidates = max(1, int(len(all_nodes) * self.candidate_fraction)) + + if self.randomize and self.candidate_fraction < 1.0: + generator = torch.Generator(device=self.device) + if self.random_seed is not None: + generator.manual_seed(self.random_seed) + idx = torch.randperm(len(all_nodes), generator=generator)[:num_candidates] + candidates = all_nodes[idx] + print(f"[DEBUG] Trial {self.random_seed}: Sampled candidates = {candidates.tolist()[:5]}") + else: + candidates = all_nodes + + return candidates + + + def compute_fingerprint_score(self, node_idx): + """ + Computes the fingerprint score for a given node according to knowledge mode. + Returns: float: Sensitivity score for the node. + """ + features = self.shadow_graph.ndata['feat'] if hasattr(self.shadow_graph, 'ndata') else self.shadow_graph.x + features = features.to(self.device) + self.model.eval() + + if self.knowledge == 'limited': + with torch.no_grad(): + logits = self.model(self.shadow_graph.to(self.device), features) + probs = torch.softmax(logits[node_idx], dim=0) + pred_class = probs.argmax().item() + score = 1 - probs[pred_class].item() + return score + + elif self.knowledge == 'full': + # Full knowledge: compute gradient norm wrt input features of the node + features.requires_grad_(True) + logits = self.model(self.shadow_graph.to(self.device), features) + pred = logits[node_idx] + label = pred.argmax().item() + + self.model.zero_grad() + loss = torch.nn.functional.nll_loss( + torch.log_softmax(pred.unsqueeze(0), dim=1), + torch.tensor([label], device=self.device) + ) + loss.backward(retain_graph=True) + # For simplicity, we use grad wrt features (could be extended to model params) + grad = features.grad[node_idx] + grad_norm_sq = (grad ** 2).sum().item() + features.requires_grad_(False) + features.grad = None # Clean up + return grad_norm_sq + + else: + raise ValueError("knowledge must be 'limited' or 'full'") + + + def generate_fingerprint_nodes(self): + """ + Step 3: Identifies and returns the top-k (num_fingerprints) nodes with the highest + fingerprint scores from the candidate set. (Section 4.2.2) + + Returns: + List[int]: Indices of selected fingerprint nodes. + """ + candidates = self.get_candidate_nodes() + scores = [] + for idx in candidates: + score = self.compute_fingerprint_score(idx) + scores.append((score, int(idx))) + # Sort candidates by score, descending + scores.sort(reverse=True) + selected = [idx for (_, idx) in scores[:self.num_fingerprints]] + return selected + + def save_fingerprint_tuples(self, node_indices): + """ + Step 4: Creates the final fingerprint set, storing the expected label for each + selected fingerprint node. Tuples (graph, node_id, label) will be used + during online verification. + + Args: + node_indices: List[int] of selected fingerprint node indices. 
+ + Returns: + List[Tuple[graph, node_id, label]]: The fingerprints for online checking. + """ + self.model.eval() + with torch.no_grad(): + features = self.shadow_graph.ndata['feat'] if hasattr(self.shadow_graph, 'ndata') else self.shadow_graph.x + logits = self.model(self.shadow_graph.to(self.device), features.to(self.device)) + labels = logits.argmax(dim=1).cpu().numpy() + fingerprints = [(self.shadow_graph, int(idx), int(labels[idx])) for idx in node_indices] + return fingerprints + + def generate_fingerprints(self, method='full'): + """ + Generate inductive fingerprints for model watermarking. + + Parameters: + method (str): 'full' for gradient-based or 'limited' for output-based + + Returns: + List of fingerprints + """ + if method == 'full': + return self._generate_full() + elif method == 'limited': + return self._generate_limited() + else: + raise ValueError(f"Invalid fingerprinting method: '{method}'") + + def _generate_full(self): + """ + Implements full knowledge fingerprint generation (gradient-based). + Based on Section 4.2.1 and 5.2 of Wu et al. (2023). + """ + self.knowledge = 'full' + print("[Fingerprint] Generating FULL knowledge fingerprints...") + fingerprint_nodes = self.generate_fingerprint_nodes() + + if self.perturb_fingerprints: + print("[Fingerprint] Applying greedy feature perturbation (FULL)...") + self.greedy_perturb_fingerprints(fingerprint_nodes) + + return self.save_fingerprint_tuples(fingerprint_nodes) + + def _generate_limited(self): + """ + Implements limited knowledge fingerprint generation (output-based). + Based on Section 4.2.2 and 5.2 of Wu et al. (2023). + """ + self.knowledge = 'limited' + print("[Fingerprint] Generating LIMITED knowledge fingerprints...") + fingerprint_nodes = self.generate_fingerprint_nodes() + + if self.perturb_fingerprints: + print("[Fingerprint] Applying greedy feature perturbation (LIMITED)...") + self.greedy_perturb_fingerprints(fingerprint_nodes) + + return self.save_fingerprint_tuples(fingerprint_nodes) + + + def greedy_perturb_fingerprints(self, node_indices): + """ + Greedily perturbs each fingerprint node's features (not edges) to increase its + fingerprint score, without changing the predicted label. + + - For each node, for each feature dimension: + - Add or subtract a small epsilon. + - Accept change if predicted label stays the same and fingerprint score increases. + - Stop after perturb_budget attempts or no improvement. + + Returns: + List[int]: Indices of perturbed fingerprint nodes (features in shadow_graph are updated in-place). 
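+        Note: compute_fingerprint_score reads features from self.shadow_graph, which
+        is only written back after the loop, so the per-step score checks are
+        approximate until the final update.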
+ """ + epsilon = 0.01 # Perturbation magnitude; you may want to tune this + features = self.shadow_graph.ndata['feat'] if hasattr(self.shadow_graph, 'ndata') else self.shadow_graph.x + features = features.clone().detach().to(self.device) + self.shadow_graph = self.shadow_graph.to(self.device) + + for idx in node_indices: + num_tries = 0 + improved = True + while num_tries < self.perturb_budget and improved: + improved = False + current_score = self.compute_fingerprint_score(idx) + # Get current prediction + self.model.eval() + with torch.no_grad(): + logits = self.model(self.shadow_graph, features) + pred_label = logits[idx].argmax().item() + original_features = features[idx].clone() + for dim in range(features.shape[1]): + for direction in [+1, -1]: + features[idx][dim] += direction * epsilon + # Get new prediction and score + self.model.eval() + with torch.no_grad(): + logits_new = self.model(self.shadow_graph, features) + new_pred_label = logits_new[idx].argmax().item() + new_score = self.compute_fingerprint_score(idx) + # Accept if label unchanged and score increased + if new_pred_label == pred_label and new_score > current_score: + current_score = new_score + improved = True + num_tries += 1 + else: + features[idx][dim] = original_features[dim] # Revert + if num_tries >= self.perturb_budget: + break + if num_tries >= self.perturb_budget: + break + # Optionally, update self.shadow_graph features (depends on your data structure) + if hasattr(self.shadow_graph, 'ndata'): + self.shadow_graph.ndata['feat'] = features + else: + self.shadow_graph.x = features + return node_indices + + +class BitFlipAttack: + def __init__(self, model, attack_type='random', bit=0): + self.model = model + self.attack_type = attack_type + self.bit = bit + + def _get_target_params(self): + params = [p for p in self.model.parameters() if p.requires_grad and p.numel() > 0] + if self.attack_type == 'random': + return params + elif self.attack_type == 'BFA-F': + return [params[0]] + elif self.attack_type == 'BFA-L': + return [params[-1]] + else: + raise ValueError(f"Unknown attack_type {self.attack_type}") + + def _true_bit_flip(self, tensor, index=None, bit=0): + a = tensor.detach().cpu().numpy().copy() + flat = a.ravel() + if index is None: + index = np.random.randint(0, flat.size) + old_val = flat[index] + int_view = np.frombuffer(flat[index].tobytes(), dtype=np.uint32)[0] + int_view ^= (1 << bit) + new_val = np.frombuffer(np.uint32(int_view).tobytes(), dtype=np.float32)[0] + flat[index] = new_val + a = flat.reshape(a.shape) + tensor.data = torch.from_numpy(a).to(tensor.device) + return old_val, new_val, index + + def apply(self): + params = self._get_target_params() + with torch.no_grad(): + layer_idx = random.randrange(len(params)) + param = params[layer_idx] + idx = random.randrange(param.numel()) + old_val, new_val, actual_idx = self._true_bit_flip(param, index=idx, bit=self.bit) + return { + 'layer': layer_idx, + 'param_idx': actual_idx, + 'old_val': old_val, + 'new_val': new_val, + 'bit': self.bit, + 'attack_type': self.attack_type + } + + +class MettackHelper: + def __init__(self, graph, features, labels, train_mask, val_mask, test_mask, + n_perturbations=5, device='cpu', max_perturbations=50, + surrogate_epochs=30, candidate_sample_size=20): + # Add self-loops to the original graph to prevent zero in-degree issues + self.graph = dgl.add_self_loop(graph).to(device) + self.features = features.to(device) + self.labels = labels.to(device) + self.train_mask = train_mask.to(device) + self.surrogate_epochs = 
surrogate_epochs + self.candidate_sample_size = candidate_sample_size + # Handle case where val_mask might be None + if val_mask is not None: + self.val_mask = val_mask.to(device) + else: + # Create a validation mask from a subset of training data + self.val_mask = self._create_val_mask_from_train(train_mask).to(device) + + self.test_mask = test_mask.to(device) + + # Cap the number of perturbations to a reasonable limit + self.n_perturbations = min(n_perturbations, max_perturbations) + self.device = device + + # Surrogate GCN, matches the victim model structure from the paper (Sec. 6.1) + in_feats = features.shape[1] + n_classes = int(labels.max().item()) + 1 + self.surrogate = GCN(in_feats, n_classes).to(device) + + # For reproducibility (optional) + torch.manual_seed(42) + np.random.seed(42) + + # Track current edge modifications if desired + self.modified_edges = set() + + # Store original adjacency for candidate generation (without self-loops for edge candidates) + original_graph_no_self_loop = dgl.remove_self_loop(graph) + self.original_edges = set(zip(original_graph_no_self_loop.edges()[0].cpu().numpy(), + original_graph_no_self_loop.edges()[1].cpu().numpy())) + + # Pre-compute candidate edges for efficiency + self.candidate_edges = self._get_candidate_edges() + + def _create_val_mask_from_train(self, train_mask): + """ + Create a validation mask by taking a subset of training nodes. + This is needed when the dataset doesn't provide a validation mask. + """ + train_indices = torch.where(train_mask)[0] + n_val = min(500, len(train_indices) // 4) # Use 25% of training data or 500, whichever is smaller + + # Randomly select validation indices from training indices + perm = torch.randperm(len(train_indices)) + val_indices = train_indices[perm[:n_val]] + + # Create validation mask + val_mask = torch.zeros_like(train_mask, dtype=torch.bool) + val_mask[val_indices] = True + + # Update training mask to exclude validation nodes + self.train_mask = train_mask.clone() + self.train_mask[val_indices] = False + + return val_mask + + def run(self): + """ + Main entrypoint to run the Mettack algorithm. + Returns: + poisoned_graph (DGLGraph): The perturbed graph with edges changed. + metrics (dict): Metrics for before/after attack, for evaluation. + """ + print("Starting Mettack attack...") + + # 1. Train surrogate GCN on the clean graph + print("Training surrogate model...") + self._train_surrogate() + + # 2. Run bi-level optimization to find edge perturbations + print("Applying structure attack...") + poisoned_graph = self._apply_structure_attack() + + # 3. (Optional) Retrain model on poisoned_graph and collect metrics + print("Evaluating attack results...") + metrics = self._evaluate(poisoned_graph) + + return poisoned_graph, metrics + + def _train_surrogate(self): + """ + Trains a surrogate GCN on the clean graph. 
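+        The surrogate stands in for the unknown victim during the bi-level search:
+        candidate edge flips are scored by how much they raise its validation loss.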
+ (Matches Wu et al., Section 6.1) + """ + optimizer = optim.Adam(self.surrogate.parameters(), lr=0.01, weight_decay=5e-4) + self.surrogate.train() + + # Standard GCN training loop + for epoch in range(self.surrogate_epochs): + optimizer.zero_grad() + logits = self.surrogate(self.graph, self.features) + loss = F.cross_entropy(logits[self.train_mask], self.labels[self.train_mask]) + loss.backward() + optimizer.step() + + if epoch % 50 == 0: + self.surrogate.eval() + with torch.no_grad(): + val_logits = self.surrogate(self.graph, self.features) + val_acc = self._compute_accuracy(val_logits[self.val_mask], + self.labels[self.val_mask]) + print(f"Surrogate epoch {epoch}: Val Acc = {val_acc:.4f}") + self.surrogate.train() + + def _apply_structure_attack(self): + """ + Runs the Mettack structure perturbation loop (bi-level optimization). + - At each step, modify the adjacency matrix (add/remove an edge). + - Select the perturbation that maximizes surrogate model loss on the validation nodes. + - Repeat up to n_perturbations times. + Returns a new DGLGraph with edges modified. + (See Appendix A.2 in Wu et al.) + """ + current_graph = copy.deepcopy(self.graph) + perturbed_edges = set() + + for step in range(self.n_perturbations): + print(f"Perturbation step {step + 1}/{self.n_perturbations}") + + best_edge = None + best_loss = -float('inf') + best_action = None # 'add' or 'remove' + + # Sample candidate edges for efficiency (reduced for speed) + candidate_sample = np.random.choice(len(self.candidate_edges), + min(self.candidate_sample_size, len(self.candidate_edges)), + replace=False) + + + for idx in tqdm(candidate_sample, desc="Evaluating candidates"): + edge = self.candidate_edges[idx] + + # Skip if already perturbed + if edge in perturbed_edges or (edge[1], edge[0]) in perturbed_edges: + continue + + # Try both add and remove operations + for action in ['add', 'remove']: + if action == 'add' and edge in self.original_edges: + continue + if action == 'remove' and edge not in self.original_edges: + continue + + # Create temporary graph with this perturbation + temp_graph = self._apply_single_perturbation(current_graph, edge, action) + + # Evaluate attack loss on this perturbed graph + attack_loss = self._compute_attack_loss(temp_graph) + + if attack_loss > best_loss: + best_loss = attack_loss + best_edge = edge + best_action = action + + # Apply the best perturbation + if best_edge is not None: + current_graph = self._apply_single_perturbation(current_graph, best_edge, best_action) + perturbed_edges.add(best_edge) + self.modified_edges.add((best_edge, best_action)) + print(f"Applied {best_action} edge {best_edge} with loss increase: {best_loss:.4f}") + else: + print("No beneficial perturbation found, stopping early.") + break + + return current_graph + + def _get_candidate_edges(self): + """ + Generate candidate edges for perturbation. + Includes both existing edges (for removal) and non-existing edges (for addition). + """ + n_nodes = self.graph.num_nodes() + + # Get all possible edges (excluding self-loops for undirected graphs) + all_possible_edges = [] + for i in range(n_nodes): + for j in range(i + 1, n_nodes): # Assume undirected graph + all_possible_edges.append((i, j)) + + # Convert to set for faster lookup + return all_possible_edges[:min(10000, len(all_possible_edges))] # Limit for efficiency + + def _apply_single_perturbation(self, graph, edge, action): + """ + Apply a single edge perturbation (add or remove) to the graph. 
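+        Edges are added and removed in both directions, since the graph is treated
+        as undirected; self-loops are re-added afterwards so no node is left with
+        zero in-degree.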
+ """ + temp_graph = copy.deepcopy(graph) + + if action == 'add': + # Add edge in both directions for undirected graph + temp_graph.add_edges([edge[0], edge[1]], [edge[1], edge[0]]) + elif action == 'remove': + # Find and remove the edge + src, dst = temp_graph.edges() + edge_ids = [] + + for i, (s, d) in enumerate(zip(src.cpu().numpy(), dst.cpu().numpy())): + if (s == edge[0] and d == edge[1]) or (s == edge[1] and d == edge[0]): + edge_ids.append(i) + + if edge_ids: + temp_graph.remove_edges(edge_ids) + + # Add self-loops to handle zero in-degree nodes + temp_graph = dgl.add_self_loop(temp_graph) + + return temp_graph + + def _compute_attack_loss(self, perturbed_graph): + """ + Compute the attack loss on a perturbed graph. + This measures how much the surrogate model's performance degrades. + Uses proper bi-level optimization as in the original Mettack paper. + """ + # Create a temporary surrogate model copy + temp_surrogate = copy.deepcopy(self.surrogate) + temp_surrogate.train() + + # Fine-tune on perturbed graph for a few steps (bi-level optimization) + optimizer = optim.Adam(temp_surrogate.parameters(), lr=0.01) + + for _ in range(5): # Reduced from 10 for efficiency but still doing proper retraining + optimizer.zero_grad() + logits = temp_surrogate(perturbed_graph, self.features) + loss = F.cross_entropy(logits[self.train_mask], self.labels[self.train_mask]) + loss.backward() + optimizer.step() + + # Evaluate on validation set - higher loss means better attack + temp_surrogate.eval() + with torch.no_grad(): + val_logits = temp_surrogate(perturbed_graph, self.features) + val_loss = F.cross_entropy(val_logits[self.val_mask], self.labels[self.val_mask]) + + return val_loss.item() + + def _evaluate(self, poisoned_graph): + """ + Evaluates GCN accuracy before/after poisoning, etc. 
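+        Returns a metrics dict with keys 'clean_test_acc', 'poisoned_test_acc',
+        'accuracy_drop', and 'num_perturbations'.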
+ """ + metrics = {} + + # Evaluate surrogate on clean graph + self.surrogate.eval() + with torch.no_grad(): + clean_logits = self.surrogate(self.graph, self.features) + clean_acc = self._compute_accuracy(clean_logits[self.test_mask], + self.labels[self.test_mask]) + metrics['clean_test_acc'] = clean_acc + + # Train new model on poisoned graph + poisoned_model = GCN(self.features.shape[1], + int(self.labels.max().item()) + 1).to(self.device) + optimizer = optim.Adam(poisoned_model.parameters(), lr=0.01, weight_decay=5e-4) + + poisoned_model.train() + for epoch in range(200): + optimizer.zero_grad() + logits = poisoned_model(poisoned_graph, self.features) + loss = F.cross_entropy(logits[self.train_mask], self.labels[self.train_mask]) + loss.backward() + optimizer.step() + + # Evaluate poisoned model + poisoned_model.eval() + with torch.no_grad(): + poisoned_logits = poisoned_model(poisoned_graph, self.features) + poisoned_acc = self._compute_accuracy(poisoned_logits[self.test_mask], + self.labels[self.test_mask]) + metrics['poisoned_test_acc'] = poisoned_acc + + metrics['accuracy_drop'] = clean_acc - poisoned_acc + metrics['num_perturbations'] = len(self.modified_edges) + + + return metrics + + def _compute_accuracy(self, logits, labels): + """Helper function to compute accuracy.""" + _, predicted = torch.max(logits, 1) + correct = (predicted == labels).sum().item() + return correct / len(labels) From 7d23e765371c572df150c9d9a453e05fc6640374 Mon Sep 17 00:00:00 2001 From: Cameron Bender Date: Tue, 5 Aug 2025 18:50:42 -0400 Subject: [PATCH 3/8] Implementation results --- README.md | 384 ++++++----------------- models/defense/QueryBasedVerification.py | 381 ++++++++++++++++------ 2 files changed, 370 insertions(+), 395 deletions(-) diff --git a/README.md b/README.md index d6bad67..a0accde 100644 --- a/README.md +++ b/README.md @@ -1,320 +1,122 @@ -# PyGIP -PyGIP is a Python library designed for experimenting with graph-based model extraction attacks and defenses. It provides -a modular framework to implement and test attack and defense strategies on graph datasets. +**QueryBasedVerification** is a defense module implemented under the **PyGIP** framework that replicates the core defense proposed in the paper _"Securing Graph Neural Networks in MLaaS: A Comprehensive Realization of Query-based Integrity Verification"_ (Wu et al., 2023). -## Installation -To get started with PyGIP, set up your environment by installing the required dependencies: +## Experimental Parameters** -```bash -pip install -r reqs.txt -``` +#### Common Parameters +| Parameter | Value Used | Paper Value | Notes | +| ---------------------- | ---------------------------------------- | ------------------------------------------ | +| `attack_node_fraction` | `0.1` | `0.3` | Lowered to reduce impact and runtime | +| `k` (num fingerprints) | `5` | `10` | Halved to reduce query overhead while maintaining effectiveness | +| `attack_trial_map` | `bitflip: 20`, `random: 5`, `mettack: 5` | Paper uses 400 trials for BFA | Reduced for faster experimentation | +| `bit_position` | `30` | Unspecified, but paper flips exponent bits | Matches intent of BFA attack | -Ensure you have Python installed (version 3.8 or higher recommended) along with the necessary libraries listed -in `reqs.txt`. +#### Bit Flip Attack (BFA) Specific Parameters -Specifically, using following command to install `dgl 2.2.1` and ensure your `pytorch==2.3.0`. 
+| Parameter | Value Used | Paper Value | Notes | +| ------------- | ----------------------- | ----------- | ------------------------------ | +| `num_trials` | `20` | `400` | Downsampled to speed up runs | -```shell -pip install dgl==2.2.1 -f https://data.dgl.ai/wheels/torch-2.3/repo.html -``` +#### Random Poisoning Attack -## Quick Start +| Parameter | Value Used | Paper Value | Notes | +| ------------- | ---------- | ------------------------- | -------------------------- | +| `num_trials` | `5` | Not directly specified | Chosen for time-efficiency | -Here’s a simple example to launch a model extraction attack using PyGIP: +#### Mettack Poisoning Attack -```python -from datasets import Cora -from models.attack import ModelExtractionAttack0 +| Parameter | Value Used | Paper Value | Notes | +| ----------------------- | ---------- | ------------------------------ | ----------------------------------------------- | +| `poison_frac` | `0.005` | ~0.01 (for 100 perturbations) | Halved to ~50 perturbations for faster runtime | +| `epochs` | `30` | `200` | Reduced to speed up training | +| `surrogate_epochs` | `20` | `200` | Reduced for surrogate model efficiency | +| `candidate_sample_size` | `50` | `100` (default) | Smaller pool for runtime reasons | -# Load the Cora dataset -dataset = Cora() -# Initialize the attack with a sampling ratio of 0.25 -mea = ModelExtractionAttack0(dataset, 0.25) +## Results -# Execute the attack -mea.attack() -``` +**Cora Dataset** -This code loads the Cora dataset, initializes a basic model extraction attack (`ModelExtractionAttack0`), and runs the -attack with a specified sampling ratio. -Here’s an expanded and detailed version of the "Contribute to Code" section for your README.md, incorporating the -specifics of `BaseAttack` and `Dataset` you provided. This version is thorough, clear, and tailored for contributors: +### Transductive-F Detection Rate Comparison +| Attack | Our Detection Rate | Paper Detection Rate | +|-----------|--------------------|----------------------| -## Implementation +| BFA | 0.69 | 0.711 | +| BFA-F | 0.67 | 0.96 | +| BFA-L | 0.7 | 0.5 | +| random | 0.52 | 0.647 | +| mettack | 0.84 | 0.588 | -PyGIP is built to be modular and extensible, allowing contributors to implement their own attack and defense strategies. -Below, we detail how to extend the framework by implementing custom attack and defense classes, with a focus on how to -leverage the provided dataset structure. +### Transductive-L Detection Rate Comparison -### Implementing Attack +| Attack | Our Detection Rate | Paper Detection Rate | +|-----------|--------------------|----------------------| +| BFA | 0.63 | 0.982 | +| BFA-F | 0.77 | 0.81 | +| BFA-L | 0.74 | 1.0 | +| random | 0.72 | 0.353 | +| mettack | 0.88 | 0.598 | -To create a custom attack, you need to extend the abstract base class `BaseAttack`. 
Here’s the structure -of `BaseAttack`: +### Inductive-F Detection Rate Comparison -```python -class BaseAttack(ABC): - def __init__(self, dataset: Dataset, attack_node_fraction: float, model_path: str = None): - """Base class for all attack implementations.""" - self.dataset = dataset - self.graph = dataset.graph # Access the DGL-based graph directly - # Additional initialization can go here +| Attack | Our Detection Rate | Paper Detection Rate | +|-----------|--------------------|----------------------| +| BFA | 0.58 | 0.667 | +| BFA-F | 0.7 | 1.0 | +| BFA-L | 0.66 | 0.382 | +| random | 0.68 | 1.0 | +| mettack | 1.0 | 1.0 | - @abstractmethod - def attack(self): - raise NotImplementedError +### Inductive-L Detection Rate Comparison - def _train_target_model(self): - raise NotImplementedError +| Attack | Our Detection Rate | Paper Detection Rate | +|-----------|--------------------|----------------------| +| BFA | 0.73 | 0.688 | +| BFA-F | 0.62 | 0.989 | +| BFA-L | 0.63 | 0.348 | +| random | 0.44 | 1.0 | +| mettack | 0.68 | 1.0 | - def _train_attack_model(self): - raise NotImplementedError - def _load_model(self, model_path): - raise NotImplementedError -``` +**Citeseer Dataset** -To implement your own attack: +### Transductive-F Detection Rate Comparison -1. **Inherit from `BaseAttack`**: - Create a new class that inherits from `BaseAttack`. You’ll need to provide the following required parameters in the - constructor: +| Attack | Our Detection Rate | Paper Detection Rate | +|-----------|--------------------|----------------------| +| BFA | 0.63 | 0.586 | +| BFA-F | 0.70 | 0.430 | +| BFA-L | 0.56 | 0.529 | +| random | 0.60 | 0.412 | +| mettack | 0.68 | 0.353 | -- `dataset`: An instance of the `Dataset` class (see below for details). -- `attack_node_fraction`: A float between 0 and 1 representing the fraction of nodes to attack. -- `model_path` (optional): A string specifying the path to a pre-trained model (defaults to `None`). +### Transductive-L Detection Rate Comparison -You need to implement following methods: +| Attack | Our Detection Rate | Paper Detection Rate | +|-----------|--------------------|----------------------| +| BFA | 0.56 | 0.289 | +| BFA-F | 0.61 | 0.430 | +| BFA-L | 0.53 | 0.133 | +| random | 0.76 | 0.824 | +| mettack | 0.68 | 0.235 | -- `attack()`: Add main attack logic here. If multiple attack types are supported, define the attack type as an optional - argument to this function. - For each specific attack type, implement a corresponding helper function such as `_attack_type1()` - or `_attack_type2()`, - and call the appropriate helper inside `attack()` based on the given method name. -- `_load_model()`: Load victim model. -- `_train_target_model()`: Train victim model. -- `_train_attack_model()`: Train attack model. -- `_helper_func()`(optional): Add your helper functions based on your needs, but keep the methods private. +### Inductive-F Detection Rate Comparison + +| Attack | Our Detection Rate | Paper Detection Rate | +|-----------|--------------------|----------------------| +| BFA | 0.48 | 0.941 | +| BFA-F | 0.57 | 0.882 | +| BFA-L | 0.68 | 0.529 | +| random | 0.64 | 1.0 | +| mettack | 0.92 | 1.0 | -2. **Implement the `attack()` Method**: - Override the abstract `attack()` method with your attack logic, and return a dict of results. 
For example: - -```python -class MyCustomAttack(BaseAttack): - def __init__(self, dataset: Dataset, attack_node_fraction: float, model_path: str = None): - super().__init__(dataset, attack_node_fraction, model_path) - # Additional initialization if needed - - def attack(self): - # Example: Access the graph and perform an attack - print(f"Attacking {self.attack_node_fraction * 100}% of nodes") - num_nodes = self.graph.num_nodes() - print(f"Graph has {num_nodes} nodes") - # Add your attack logic here - return { - 'metric1': 'metric1 here', - 'metric2': 'metric2 here' - } - - def _load_model(self): - # add your logic here - pass - - def _train_target_model(self): - # add your logic here - pass - - def _train_attack_model(self): - # add your logic here - pass -``` - -### Implementing Defense - -To create a custom defense, you need to extend the abstract base class `BaseDefense`. Here’s the structure -of `BaseDefense`: - -```python -class BaseDefense(ABC): - def __init__(self, dataset: Dataset, attack_node_fraction: float): - """Base class for all defense implementations.""" - # add initialization here - - @abstractmethod - def defend(self): - raise NotImplementedError - - def _load_model(self): - raise NotImplementedError - - def _train_target_model(self): - raise NotImplementedError - - def _train_defense_model(self): - raise NotImplementedError - - def _train_surrogate_model(self): - raise NotImplementedError -``` - -To implement your own defense: - -1. **Inherit from `BaseDefense`**: - Create a new class that inherits from `BaseDefense`. You’ll need to provide the following required parameters in the - constructor: - -- `dataset`: An instance of the `Dataset` class (see below for details). -- `attack_node_fraction`: A float between 0 and 1 representing the fraction of nodes to attack. -- `model_path` (optional): A string specifying the path to a pre-trained model (defaults to `None`). - -You need to implement following methods: - -- `defense()`: Add main defense logic here. If multiple defense types are supported, define the defense type as an - optional argument to this function. - For each specific defense type, implement a corresponding helper function such as `_defense_type1()` - or `_defense_type2()`, - and call the appropriate helper inside `defense()` based on the given method name. -- `_load_model()`: Load victim model. -- `_train_target_model()`: Train victim model. -- `_train_defense_model()`: Train defense model. -- `_train_surrogate_model()`: Train attack model. -- `_helper_func()`(optional): Add your helper functions based on your needs, but keep the methods private. - - -2. **Implement the `defense()` Method**: - Override the abstract `defense()` method with your defense logic, and return a dict of results. 
For example: - -```python -class MyCustomDefense(BaseDefense): - def defend(self): - # Step 1: Train target model - target_model = self._train_target_model() - # Step 2: Attack target model - attack = MyCustomAttack(self.dataset, attack_node_fraction=0.3) - attack.attack(target_model) - # Step 3: Train defense model - defense_model = self._train_defense_model() - # Step 4: Test defense against attack - attack = MyCustomAttack(self.dataset, attack_node_fraction=0.3) - attack.attack(defense_model) - # Print performance metrics - - def _load_model(self): - # add your logic here - pass - - def _train_target_model(self): - # add your logic here - pass - - def _train_defense_model(self): - # add your logic here - pass - - def _train_surrogate_model(self): - # add your logic here - pass -``` - -### Understanding the Dataset Class - -The `Dataset` class standardizes the data format across PyGIP. Here’s its structure: - -```python -class Dataset(object): - def __init__(self, api_type='pyg', path='./downloads/'): - self.api_type = api_type # Set to 'pyg' for torch_geometric-based graphs - self.path = path # Directory for dataset storage - self.dataset_name = "" # Name of the dataset (e.g., "Cora") - - # Graph properties - self.node_number = 0 # Number of nodes - self.feature_number = 0 # Number of features per node - self.label_number = 0 # Number of label classes - - # Core data - self.graph = None # PyG graph object - self.features = None # Node features - self.labels = None # Node labels - - # Data splits - self.train_mask = None # Boolean mask for training nodes - self.val_mask = None # Boolean mask for validation nodes - self.test_mask = None # Boolean mask for test nodes -``` - -- **Importance**: We are currently using the default api_type='pyg' to load the data. It is important to note that when - api_type='pyg', `self.graph` should be an instance of `torch_geometric.data.Data`. In your implementation, make sure to - use our defined Dataset class to build your code. -- Additional attributes like `self.dataset.features` (node features), `self.dataset.labels` (node labels), - and `self.dataset.train_mask` (training split) are also available if your logic requires them. - -### Miscellaneous Tips - -- **Reference Implementation**: The `ModelExtractionAttack0` class is a fully implemented attack example. Study it for - inspiration or as a template. -- **Flexibility**: Add as many helper functions as needed within your class to keep your code clean and modular. -- **Backbone Models**: We provide several basic backbone models like `GCN, GraphSAGE`. You can use or add more - at `from models.nn import GraphSAGE`. - -By following these guidelines, you can seamlessly integrate your custom attack or defense strategies into PyGIP. Happy -coding! - -## Internal Code Submission Guideline - -For internal team members with write access to the repository: - -1. Always Use Feature/Fix Branches - -- Never commit directly to the main or develop branch. -- Create a new branch for each feature, bug fix. - -```shell -git checkout -b feat/your-feature-name -``` - -```shell -git checkout -b fix/your-fix-name -``` - -2. Keep Commits Clean & Meaningful - -- feat: add data loader for graph dataset -- fix: resolve crash on edge cases - -Use clear commit messages following the format: - -```shell -: -``` - -3. Test Before Pushing - -- Test your implementation in `example.py`, and compare the performance with the results in original paper. - -4. 
Push to Internal Branch - -- Always run `git pull origin pygip-release` before pushing your changes -- Submit a pull request targeting the `pygip-release` branch -- Write a brief summary describing the features you’ve added, how to run your method, and how to evaluate its - performance - -Push to the remote feature branch. - -```shell -git push origin feat/your-feature-name -``` - -## External Pull Request Guideline - -Refer to [guidline](.github/CONTRIBUTING.md) - -## License - -MIT License - -## Contact - -For questions or contributions, please contact blshen@fsu.edu. +### Inductive-L Detection Rate Comparison + +| Attack | Our Detection Rate | Paper Detection Rate | +|-----------|--------------------|----------------------| +| BFA | 0.59 | 0.901 | +| BFA-F | 0.53 | 0.852 | +| BFA-L | 0.67 | 0.569 | +| random | 0.72 | 1.0 | +| mettack | 0.92 | 1.0 | diff --git a/models/defense/QueryBasedVerification.py b/models/defense/QueryBasedVerification.py index 6468b6b..4b170e9 100644 --- a/models/defense/QueryBasedVerification.py +++ b/models/defense/QueryBasedVerification.py @@ -27,7 +27,9 @@ def __init__(self, dataset, attack_node_fraction, model_path=None): - def defend(self, num_trials=10, k=5, attack_type='mettack', knowledge='full', mode='transductive', verbose=True, **kwargs): + def defend(self, fingerprint_mode='inductive', knowledge='full', attack_type='bitflip', + k=5, num_trials=10, use_edge_perturbation=False, verbose=True, **kwargs): + """ Main defense routine. Generates fingerprints, runs attacks, and verifies integrity. Returns a dict with per-trial and average metrics. @@ -37,50 +39,69 @@ def defend(self, num_trials=10, k=5, attack_type='mettack', knowledge='full', mo if verbose: print(f"\n=== Trial {trial+1}/{num_trials} ===") - # Step 1: Train target model + model_clean = self._train_target_model() acc_clean = self._evaluate_accuracy(model_clean, self.dataset) - # Step 2: Fingerprint it - fingerprints = self._generate_fingerprints(model_clean, mode=mode, knowledge=knowledge, k=k, **kwargs) - # Step 3: Attack the model - poisoned_model, attack_info = self._run_attack(model_clean, attack_type=attack_type, knowledge=knowledge, **kwargs) + fingerprints = self._generate_fingerprints(model_clean, mode=fingerprint_mode, knowledge=knowledge, k=k, + perturb_fingerprints=use_edge_perturbation, + perturb_budget=kwargs.get('perturb_budget', 5), + **kwargs) + + + bit = kwargs.pop('bit', 30) + bfa_variant = kwargs.pop('bfa_variant', 'BFA') + + poisoned_model, attack_info = self._run_attack( + model_clean, + attack_type=attack_type, + knowledge=knowledge, + bit=bit, + bfa_variant=bfa_variant, + **kwargs + ) + poisoned_dataset = copy.deepcopy(self.dataset) if 'graph' in attack_info: poisoned_dataset.graph = attack_info['graph'] acc_poisoned = self._evaluate_accuracy(poisoned_model, poisoned_dataset) - # Step 4: Detect fingerprint flips flipped_info = self._evaluate_fingerprints(poisoned_model, fingerprints) flip_rate = flipped_info['flip_rate'] acc_drop = acc_clean - acc_poisoned + num_flipped = len(flipped_info['flipped']) + num_total = len(fingerprints) + detection_rate = num_flipped / num_total if num_total > 0 else 0.0 if verbose: print(f"Clean Accuracy: {acc_clean:.4f}") print(f"Poisoned Accuracy: {acc_poisoned:.4f}") print(f"Accuracy Drop: {acc_drop:.4f}") print(f"Flip Rate: {flip_rate:.4f}") + print(f"Detection Rate: {detection_rate:.4f}") + trial_results.append({ 'flip_rate': flip_rate, 'accuracy_drop': acc_drop, + 'detection_rate': detection_rate }) - # Compute averages + avg_flip_rate 
= sum(r['flip_rate'] for r in trial_results) / num_trials avg_acc_drop = sum(r['accuracy_drop'] for r in trial_results) / num_trials + avg_detection_rate = sum(r['detection_rate'] for r in trial_results) / num_trials - print(f"Clean Graph NumEdges: {self.dataset.graph.num_edges()}") - print(f"Poisoned Graph NumEdges: {poisoned_model.graph.num_edges() if hasattr(poisoned_model, 'graph') else 'N/A'}") return { 'trial_results': trial_results, 'average_flip_rate': avg_flip_rate, 'average_accuracy_drop': avg_acc_drop, + 'average_detection_rate': avg_detection_rate } @@ -176,6 +197,16 @@ def _generate_fingerprints(self, model, mode='transductive', knowledge='full', k perturb_budget=kwargs.get('perturb_budget', 5), ) fingerprints = generator.generate_fingerprints(method=knowledge) + if kwargs.get('perturb_fingerprints', False): + for i, (graph, node_idx, label) in enumerate(fingerprints): + generator.shadow_graph = graph + generator.greedy_edge_perturbation( + node_idx=node_idx, + perturb_budget=kwargs.get('perturb_budget', 5), + knowledge=knowledge + ) + fingerprints[i] = (generator.shadow_graph, node_idx, label) + unified_fingerprints = fingerprints else: @@ -222,13 +253,16 @@ def _run_attack(self, model, attack_type='mettack', knowledge='full', **kwargs): metadata: dict with info about the attack """ if attack_type == 'bitflip': - attacker = BitFlipAttack(model=model, attack_type=kwargs.get('bitflip_type', 'random'), bit=kwargs.get('bit', 0)) - info = attacker.apply() - return model, {'type': 'bitflip', 'info': info} + bit = kwargs.get('bit', 30) + bfa_variant = kwargs.get('bfa_variant', 'BFA') + attacker = BitFlipAttack(model, attack_type=bfa_variant, bit=bit) + attack_info = attacker.apply() + return model, attack_info elif attack_type == 'random': perturbed_graph = self._random_edge_addition_poisoning( - perturb_frac=kwargs.get('perturb_frac', 0.01), + node_fraction=kwargs.get('node_fraction', 0.1), + edges_per_node=kwargs.get('edges_per_node', 5), random_seed=kwargs.get('random_seed', None), ) poisoned_model = self._retrain_poisoned_model( @@ -238,6 +272,10 @@ def _run_attack(self, model, attack_type='mettack', knowledge='full', **kwargs): return poisoned_model, {'type': 'random_poison', 'graph': perturbed_graph} elif attack_type == 'mettack': + num_edges = self.dataset.graph.num_edges() + poison_frac = kwargs.get('poison_frac', 0.05) + n_perturbations = int(poison_frac * num_edges) + helper = MettackHelper( graph=self.dataset.graph, features=self.dataset.features, @@ -245,7 +283,7 @@ def _run_attack(self, model, attack_type='mettack', knowledge='full', **kwargs): train_mask=self.dataset.train_mask, val_mask=getattr(self.dataset, 'val_mask', None), test_mask=self.dataset.test_mask, - n_perturbations=kwargs.get('n_perturbations', 5), + n_perturbations=n_perturbations, device=self.device, max_perturbations=kwargs.get('max_perturbations', 50), surrogate_epochs=kwargs.get('surrogate_epochs', 30), @@ -262,49 +300,46 @@ def _run_attack(self, model, attack_type='mettack', knowledge='full', **kwargs): raise ValueError(f"Unsupported attack_type: {attack_type}") - def _random_edge_addition_poisoning(dataset, perturb_frac, random_seed=None): + def _random_edge_addition_poisoning(self, node_fraction=0.1, edges_per_node=5, random_seed=None): """ - Returns a new DGLGraph with random edges added. + Poison a fraction of nodes by adding random edges. 
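+
+        For each poisoned node, up to `edges_per_node` random undirected edges
+        are attached (added in both directions), skipping self-loops and
+        edges that already exist.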
         Args:
-            dataset: Dataset object (with .graph as DGLGraph)
-            perturb_frac: Fraction of edges to add (e.g., 0.01 = 1%)
-            random_seed: Optional integer for reproducibility
+            node_fraction: Fraction of nodes to poison (e.g., 0.1 = 10%)
+            edges_per_node: Number of random edges to add per poisoned node
+            random_seed: Optional seed
+            (the clean base graph is read from self.dataset.graph)

         Returns:
-            poisoned_graph: DGLGraph (deepcopy of original with new edges)
+            poisoned_graph: DGLGraph
         """
-
         if random_seed is not None:
             random.seed(random_seed)
             torch.manual_seed(random_seed)

-        orig_graph = dataset.graph
-        poisoned_graph = copy.deepcopy(orig_graph)
+        poisoned_graph = copy.deepcopy(self.dataset.graph)
         num_nodes = poisoned_graph.num_nodes()
-        num_edges_to_add = int(perturb_frac * orig_graph.num_edges())
+        num_poisoned_nodes = int(node_fraction * num_nodes)
+        poisoned_nodes = random.sample(range(num_nodes), num_poisoned_nodes)

-        existing_edges = set(zip(
-            orig_graph.edges()[0].tolist(),
-            orig_graph.edges()[1].tolist()
-        ))
+        new_edges = []

-        candidate_pairs = [
-            (i, j)
-            for i in range(num_nodes)
-            for j in range(num_nodes)
-            if i != j and (i, j) not in existing_edges
-        ]
+        for src in poisoned_nodes:
+            for _ in range(edges_per_node):
+                dst = random.randint(0, num_nodes - 1)
+                if src != dst and \
+                   not poisoned_graph.has_edges_between(src, dst) and \
+                   not poisoned_graph.has_edges_between(dst, src):
+                    new_edges.append((src, dst))
+                    new_edges.append((dst, src))

-        if len(candidate_pairs) < num_edges_to_add:
-            raise ValueError("Perturbation budget too large: not enough candidate edges.")
-
-        new_edges = random.sample(candidate_pairs, num_edges_to_add)
-        src, dst = zip(*new_edges)
-        poisoned_graph.add_edges(src, dst)
+        if new_edges:
+            src, dst = zip(*new_edges)
+            poisoned_graph.add_edges(src, dst)

         return poisoned_graph

+
     def _retrain_poisoned_model(self, poisoned_graph, epochs=200):
         """
         Retrain target GCN using the poisoned graph structure.
@@ -397,7 +432,7 @@ def run_full_pipeline(self, attack_type='random', mode='transductive', knowledge class TransductiveFingerprintGenerator: - def __init__(self, model, dataset, candidate_fraction=1.0, random_seed=None, device='cpu', randomize=True): + def __init__(self, model, dataset, candidate_fraction=0.3, random_seed=None, device='cpu', randomize=True): self.model = model.to(device) self.dataset = dataset self.candidate_fraction = candidate_fraction @@ -419,7 +454,6 @@ def get_candidate_nodes(self): generator.manual_seed(self.random_seed) idx = torch.randperm(len(all_nodes), generator=generator)[:num_candidates] candidates = all_nodes[idx] - print(f"[DEBUG] Trial {self.random_seed}: Sampled candidates = {candidates.tolist()[:5]}") else: candidates = all_nodes @@ -442,7 +476,6 @@ def compute_fingerprint_scores_full(self, candidate_nodes): scores.append(grad_norm) scores_tensor = torch.tensor(scores, device=self.device) - print(f"[FULL] Fingerprint scores: mean={scores_tensor.mean():.4f}, std={scores_tensor.std():.4f}, max={scores_tensor.max():.4f}, min={scores_tensor.min():.4f}") return scores_tensor @@ -454,7 +487,6 @@ def compute_fingerprint_scores_limited(self, candidate_nodes): labels = probs.argmax(dim=1) scores = 1.0 - probs[candidate_nodes, labels[candidate_nodes]] - print(f"[LIMITED] Fingerprint scores: mean={scores.mean():.4f}, std={scores.std():.4f}, max={scores.max():.4f}, min={scores.min():.4f}") return scores @@ -470,7 +502,6 @@ def select_top_fingerprints(self, scores, candidate_nodes, k, method='full'): filtered_candidates = candidate_nodes[mask] if filtered_scores.size(0) < k: - print(f"[WARN] Only {filtered_scores.size(0)} candidates left after filtering, reducing k to fit.") k = filtered_scores.size(0) topk = torch.topk(filtered_scores, k) @@ -521,12 +552,6 @@ def generate_fingerprints(self, k=5, method='full'): fingerprints = [(int(n), int(labels[n])) for n in fingerprint_nodes] - labels_only = [label for (_, label) in fingerprints] - nodes_only = [node for (node, _) in fingerprints] - - print(f"[{method.upper()}] Fingerprint label distribution: {Counter(labels_only)}") - print(f"[{method.upper()}] Fingerprint node IDs: {nodes_only}") - return fingerprints @@ -585,7 +610,6 @@ def get_candidate_nodes(self): generator.manual_seed(self.random_seed) idx = torch.randperm(len(all_nodes), generator=generator)[:num_candidates] candidates = all_nodes[idx] - print(f"[DEBUG] Trial {self.random_seed}: Sampled candidates = {candidates.tolist()[:5]}") else: candidates = all_nodes @@ -610,7 +634,7 @@ def compute_fingerprint_score(self, node_idx): return score elif self.knowledge == 'full': - # Full knowledge: compute gradient norm wrt input features of the node + features.requires_grad_(True) logits = self.model(self.shadow_graph.to(self.device), features) pred = logits[node_idx] @@ -622,11 +646,11 @@ def compute_fingerprint_score(self, node_idx): torch.tensor([label], device=self.device) ) loss.backward(retain_graph=True) - # For simplicity, we use grad wrt features (could be extended to model params) + grad = features.grad[node_idx] grad_norm_sq = (grad ** 2).sum().item() features.requires_grad_(False) - features.grad = None # Clean up + features.grad = None return grad_norm_sq else: @@ -646,7 +670,7 @@ def generate_fingerprint_nodes(self): for idx in candidates: score = self.compute_fingerprint_score(idx) scores.append((score, int(idx))) - # Sort candidates by score, descending + scores.sort(reverse=True) selected = [idx for (_, idx) in scores[:self.num_fingerprints]] return 
selected @@ -732,7 +756,7 @@ def greedy_perturb_fingerprints(self, node_indices): Returns: List[int]: Indices of perturbed fingerprint nodes (features in shadow_graph are updated in-place). """ - epsilon = 0.01 # Perturbation magnitude; you may want to tune this + epsilon = 0.01 features = self.shadow_graph.ndata['feat'] if hasattr(self.shadow_graph, 'ndata') else self.shadow_graph.x features = features.clone().detach().to(self.device) self.shadow_graph = self.shadow_graph.to(self.device) @@ -743,7 +767,7 @@ def greedy_perturb_fingerprints(self, node_indices): while num_tries < self.perturb_budget and improved: improved = False current_score = self.compute_fingerprint_score(idx) - # Get current prediction + self.model.eval() with torch.no_grad(): logits = self.model(self.shadow_graph, features) @@ -752,24 +776,24 @@ def greedy_perturb_fingerprints(self, node_indices): for dim in range(features.shape[1]): for direction in [+1, -1]: features[idx][dim] += direction * epsilon - # Get new prediction and score + self.model.eval() with torch.no_grad(): logits_new = self.model(self.shadow_graph, features) new_pred_label = logits_new[idx].argmax().item() new_score = self.compute_fingerprint_score(idx) - # Accept if label unchanged and score increased + if new_pred_label == pred_label and new_score > current_score: current_score = new_score improved = True num_tries += 1 else: - features[idx][dim] = original_features[dim] # Revert + features[idx][dim] = original_features[dim] if num_tries >= self.perturb_budget: break if num_tries >= self.perturb_budget: break - # Optionally, update self.shadow_graph features (depends on your data structure) + if hasattr(self.shadow_graph, 'ndata'): self.shadow_graph.ndata['feat'] = features else: @@ -777,6 +801,174 @@ def greedy_perturb_fingerprints(self, node_indices): return node_indices + def greedy_edge_perturbation(self, node_idx, perturb_budget=5, knowledge='full'): + """ + Dispatch to greedy edge perturbation strategy based on verifier knowledge level. + + Args: + node_idx (int): Fingerprint node index. + perturb_budget (int): Number of edge perturbations allowed. + knowledge (str): 'full' or 'limited' + """ + if knowledge == 'full': + self._greedy_edge_perturbation_f(node_idx, perturb_budget) + elif knowledge == 'limited': + self._greedy_edge_perturbation_l(node_idx, perturb_budget) + else: + raise ValueError("knowledge must be 'full' or 'limited'") + + + def _greedy_edge_perturbation_f(self, node_idx, perturb_budget): + """ + Full knowledge edge perturbation (Inductive-F). + Increases fingerprint score using model gradients while preserving prediction. 
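+        At each step, all single-edge flips incident to the fingerprint node
+        (adding a non-neighbor, removing a neighbor) are tried; the flip with
+        the largest score gain that leaves the node's predicted label
+        unchanged is applied, up to perturb_budget edits.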
+ """ + import copy + from torch_geometric.utils import to_networkx, from_networkx + import torch + + g_nx = to_networkx(self.shadow_graph.to('cpu'), to_undirected=True) + x = self.dataset.features.to(self.device) + self.model.eval() + + with torch.no_grad(): + original_pred = self.model(self.shadow_graph.to(self.device), x)[node_idx].argmax().item() + + def score_fn(modified_graph): + return self._fingerprint_score(node_idx, modified_graph.to(self.device), x) + + neighbors = list(g_nx.neighbors(node_idx)) + non_neighbors = list(set(range(self.dataset.graph.num_nodes())) - set(neighbors) - {node_idx}) + + applied = 0 + while applied < perturb_budget: + best_delta = 0 + best_graph = None + best_action = None + + + for nbr in non_neighbors: + temp_g = copy.deepcopy(g_nx) + temp_g.add_edge(node_idx, nbr) + g_temp = from_networkx(temp_g).to(self.device) + with torch.no_grad(): + pred = self.model(g_temp, x)[node_idx].argmax().item() + if pred != original_pred: + continue + score = score_fn(g_temp) + delta = score - score_fn(self.shadow_graph) + if delta > best_delta: + best_delta = delta + best_graph = g_temp + best_action = ('add', nbr) + + + for nbr in neighbors: + temp_g = copy.deepcopy(g_nx) + if temp_g.has_edge(node_idx, nbr): + temp_g.remove_edge(node_idx, nbr) + g_temp = from_networkx(temp_g).to(self.device) + with torch.no_grad(): + pred = self.model(g_temp, x)[node_idx].argmax().item() + if pred != original_pred: + continue + score = score_fn(g_temp) + delta = score - score_fn(self.shadow_graph) + if delta > best_delta: + best_delta = delta + best_graph = g_temp + best_action = ('remove', nbr) + + if best_graph is None: + break + self.shadow_graph = best_graph + g_nx = to_networkx(best_graph.to('cpu'), to_undirected=True) + + if best_action[0] == 'add': + non_neighbors.remove(best_action[1]) + neighbors.append(best_action[1]) + else: + neighbors.remove(best_action[1]) + non_neighbors.append(best_action[1]) + + applied += 1 + + def _greedy_edge_perturbation_l(self, node_idx, perturb_budget): + """ + Limited knowledge edge perturbation (Inductive-L). + Uses confidence margin (1 - confidence) as proxy for fingerprint sensitivity. 
+ """ + import copy + from torch_geometric.utils import to_networkx, from_networkx + import torch + import torch.nn.functional as F + + g_nx = to_networkx(self.shadow_graph.to('cpu'), to_undirected=True) + x = self.dataset.features.to(self.device) + self.model.eval() + + with torch.no_grad(): + logits = self.model(self.shadow_graph.to(self.device), x) + original_pred = logits[node_idx].argmax().item() + original_conf = F.softmax(logits[node_idx], dim=0)[original_pred].item() + original_score = 1 - original_conf + + def score_fn(modified_graph): + with torch.no_grad(): + logits = self.model(modified_graph.to(self.device), x) + pred = logits[node_idx].argmax().item() + if pred != original_pred: + return -1 + conf = F.softmax(logits[node_idx], dim=0)[pred].item() + return 1 - conf + + neighbors = list(g_nx.neighbors(node_idx)) + non_neighbors = list(set(range(self.dataset.graph.num_nodes())) - set(neighbors) - {node_idx}) + + applied = 0 + while applied < perturb_budget: + best_delta = 0 + best_graph = None + best_action = None + + for nbr in non_neighbors: + temp_g = copy.deepcopy(g_nx) + temp_g.add_edge(node_idx, nbr) + g_temp = from_networkx(temp_g).to(self.device) + new_score = score_fn(g_temp) + delta = new_score - original_score + if new_score >= 0 and delta > best_delta: + best_delta = delta + best_graph = g_temp + best_action = ('add', nbr) + + for nbr in neighbors: + temp_g = copy.deepcopy(g_nx) + if temp_g.has_edge(node_idx, nbr): + temp_g.remove_edge(node_idx, nbr) + g_temp = from_networkx(temp_g).to(self.device) + new_score = score_fn(g_temp) + delta = new_score - original_score + if new_score >= 0 and delta > best_delta: + best_delta = delta + best_graph = g_temp + best_action = ('remove', nbr) + + if best_graph is None: + break + self.shadow_graph = best_graph + g_nx = to_networkx(best_graph.to('cpu'), to_undirected=True) + + if best_action[0] == 'add': + non_neighbors.remove(best_action[1]) + neighbors.append(best_action[1]) + else: + neighbors.remove(best_action[1]) + non_neighbors.append(best_action[1]) + + applied += 1 + + class BitFlipAttack: def __init__(self, model, attack_type='random', bit=0): self.model = model @@ -785,7 +977,7 @@ def __init__(self, model, attack_type='random', bit=0): def _get_target_params(self): params = [p for p in self.model.parameters() if p.requires_grad and p.numel() > 0] - if self.attack_type == 'random': + if self.attack_type in ['random', 'BFA']: return params elif self.attack_type == 'BFA-F': return [params[0]] @@ -824,49 +1016,42 @@ def apply(self): 'attack_type': self.attack_type } + class MettackHelper: def __init__(self, graph, features, labels, train_mask, val_mask, test_mask, n_perturbations=5, device='cpu', max_perturbations=50, surrogate_epochs=30, candidate_sample_size=20): - # Add self-loops to the original graph to prevent zero in-degree issues self.graph = dgl.add_self_loop(graph).to(device) self.features = features.to(device) self.labels = labels.to(device) self.train_mask = train_mask.to(device) self.surrogate_epochs = surrogate_epochs self.candidate_sample_size = candidate_sample_size - # Handle case where val_mask might be None if val_mask is not None: self.val_mask = val_mask.to(device) else: - # Create a validation mask from a subset of training data self.val_mask = self._create_val_mask_from_train(train_mask).to(device) self.test_mask = test_mask.to(device) - # Cap the number of perturbations to a reasonable limit - self.n_perturbations = min(n_perturbations, max_perturbations) + self.n_perturbations = n_perturbations 
self.device = device - # Surrogate GCN, matches the victim model structure from the paper (Sec. 6.1) in_feats = features.shape[1] n_classes = int(labels.max().item()) + 1 self.surrogate = GCN(in_feats, n_classes).to(device) - # For reproducibility (optional) torch.manual_seed(42) np.random.seed(42) - # Track current edge modifications if desired + self.modified_edges = set() - # Store original adjacency for candidate generation (without self-loops for edge candidates) original_graph_no_self_loop = dgl.remove_self_loop(graph) self.original_edges = set(zip(original_graph_no_self_loop.edges()[0].cpu().numpy(), original_graph_no_self_loop.edges()[1].cpu().numpy())) - # Pre-compute candidate edges for efficiency self.candidate_edges = self._get_candidate_edges() def _create_val_mask_from_train(self, train_mask): @@ -875,17 +1060,16 @@ def _create_val_mask_from_train(self, train_mask): This is needed when the dataset doesn't provide a validation mask. """ train_indices = torch.where(train_mask)[0] - n_val = min(500, len(train_indices) // 4) # Use 25% of training data or 500, whichever is smaller - - # Randomly select validation indices from training indices + n_val = min(500, len(train_indices) // 4) + perm = torch.randperm(len(train_indices)) val_indices = train_indices[perm[:n_val]] - # Create validation mask + val_mask = torch.zeros_like(train_mask, dtype=torch.bool) val_mask[val_indices] = True - # Update training mask to exclude validation nodes + self.train_mask = train_mask.clone() self.train_mask[val_indices] = False @@ -900,15 +1084,14 @@ def run(self): """ print("Starting Mettack attack...") - # 1. Train surrogate GCN on the clean graph + print("Training surrogate model...") self._train_surrogate() - # 2. Run bi-level optimization to find edge perturbations + print("Applying structure attack...") poisoned_graph = self._apply_structure_attack() - # 3. 
(Optional) Retrain model on poisoned_graph and collect metrics print("Evaluating attack results...") metrics = self._evaluate(poisoned_graph) @@ -922,7 +1105,7 @@ def _train_surrogate(self): optimizer = optim.Adam(self.surrogate.parameters(), lr=0.01, weight_decay=5e-4) self.surrogate.train() - # Standard GCN training loop + for epoch in range(self.surrogate_epochs): optimizer.zero_grad() logits = self.surrogate(self.graph, self.features) @@ -956,9 +1139,9 @@ def _apply_structure_attack(self): best_edge = None best_loss = -float('inf') - best_action = None # 'add' or 'remove' + best_action = None - # Sample candidate edges for efficiency (reduced for speed) + candidate_sample = np.random.choice(len(self.candidate_edges), min(self.candidate_sample_size, len(self.candidate_edges)), replace=False) @@ -967,21 +1150,17 @@ def _apply_structure_attack(self): for idx in tqdm(candidate_sample, desc="Evaluating candidates"): edge = self.candidate_edges[idx] - # Skip if already perturbed if edge in perturbed_edges or (edge[1], edge[0]) in perturbed_edges: continue - # Try both add and remove operations for action in ['add', 'remove']: if action == 'add' and edge in self.original_edges: continue if action == 'remove' and edge not in self.original_edges: continue - # Create temporary graph with this perturbation temp_graph = self._apply_single_perturbation(current_graph, edge, action) - # Evaluate attack loss on this perturbed graph attack_loss = self._compute_attack_loss(temp_graph) if attack_loss > best_loss: @@ -989,7 +1168,6 @@ def _apply_structure_attack(self): best_edge = edge best_action = action - # Apply the best perturbation if best_edge is not None: current_graph = self._apply_single_perturbation(current_graph, best_edge, best_action) perturbed_edges.add(best_edge) @@ -1008,14 +1186,12 @@ def _get_candidate_edges(self): """ n_nodes = self.graph.num_nodes() - # Get all possible edges (excluding self-loops for undirected graphs) all_possible_edges = [] for i in range(n_nodes): - for j in range(i + 1, n_nodes): # Assume undirected graph + for j in range(i + 1, n_nodes): all_possible_edges.append((i, j)) - # Convert to set for faster lookup - return all_possible_edges[:min(10000, len(all_possible_edges))] # Limit for efficiency + return all_possible_edges[:min(10000, len(all_possible_edges))] def _apply_single_perturbation(self, graph, edge, action): """ @@ -1024,10 +1200,8 @@ def _apply_single_perturbation(self, graph, edge, action): temp_graph = copy.deepcopy(graph) if action == 'add': - # Add edge in both directions for undirected graph temp_graph.add_edges([edge[0], edge[1]], [edge[1], edge[0]]) elif action == 'remove': - # Find and remove the edge src, dst = temp_graph.edges() edge_ids = [] @@ -1038,7 +1212,6 @@ def _apply_single_perturbation(self, graph, edge, action): if edge_ids: temp_graph.remove_edges(edge_ids) - # Add self-loops to handle zero in-degree nodes temp_graph = dgl.add_self_loop(temp_graph) return temp_graph @@ -1049,21 +1222,21 @@ def _compute_attack_loss(self, perturbed_graph): This measures how much the surrogate model's performance degrades. Uses proper bi-level optimization as in the original Mettack paper. 
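+        A copy of the surrogate is fine-tuned for a few steps on the perturbed
+        graph, and its cross-entropy loss on the validation mask is returned;
+        higher loss indicates a more damaging perturbation.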
""" - # Create a temporary surrogate model copy + temp_surrogate = copy.deepcopy(self.surrogate) temp_surrogate.train() - # Fine-tune on perturbed graph for a few steps (bi-level optimization) + optimizer = optim.Adam(temp_surrogate.parameters(), lr=0.01) - for _ in range(5): # Reduced from 10 for efficiency but still doing proper retraining + for _ in range(5): optimizer.zero_grad() logits = temp_surrogate(perturbed_graph, self.features) loss = F.cross_entropy(logits[self.train_mask], self.labels[self.train_mask]) loss.backward() optimizer.step() - # Evaluate on validation set - higher loss means better attack + temp_surrogate.eval() with torch.no_grad(): val_logits = temp_surrogate(perturbed_graph, self.features) @@ -1077,7 +1250,7 @@ def _evaluate(self, poisoned_graph): """ metrics = {} - # Evaluate surrogate on clean graph + self.surrogate.eval() with torch.no_grad(): clean_logits = self.surrogate(self.graph, self.features) @@ -1085,7 +1258,7 @@ def _evaluate(self, poisoned_graph): self.labels[self.test_mask]) metrics['clean_test_acc'] = clean_acc - # Train new model on poisoned graph + poisoned_model = GCN(self.features.shape[1], int(self.labels.max().item()) + 1).to(self.device) optimizer = optim.Adam(poisoned_model.parameters(), lr=0.01, weight_decay=5e-4) @@ -1098,7 +1271,7 @@ def _evaluate(self, poisoned_graph): loss.backward() optimizer.step() - # Evaluate poisoned model + poisoned_model.eval() with torch.no_grad(): poisoned_logits = poisoned_model(poisoned_graph, self.features) From 3d2ef825532ce1753e79885c9a92b0f0b9ccb3c4 Mon Sep 17 00:00:00 2001 From: cameronbender3 Date: Tue, 5 Aug 2025 18:52:51 -0400 Subject: [PATCH 4/8] Update README.md --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index a0accde..b9d3a40 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,6 @@ ### Transductive-F Detection Rate Comparison | Attack | Our Detection Rate | Paper Detection Rate | |-----------|--------------------|----------------------| - | BFA | 0.69 | 0.711 | | BFA-F | 0.67 | 0.96 | | BFA-L | 0.7 | 0.5 | From f2e74c3a63c47f578faf52d5e185e3034eb52fd7 Mon Sep 17 00:00:00 2001 From: cameronbender3 Date: Tue, 5 Aug 2025 19:16:14 -0400 Subject: [PATCH 5/8] Delete ignore_helpers directory --- ignore_helpers/adaptive_randomizer.py | 11 --- ignore_helpers/attack_sim.py | 68 ----------------- ignore_helpers/fingerprinting.py | 88 ---------------------- ignore_helpers/poison.py | 98 ------------------------- ignore_helpers/verification_workflow.py | 13 ---- 5 files changed, 278 deletions(-) delete mode 100644 ignore_helpers/adaptive_randomizer.py delete mode 100644 ignore_helpers/attack_sim.py delete mode 100644 ignore_helpers/fingerprinting.py delete mode 100644 ignore_helpers/poison.py delete mode 100644 ignore_helpers/verification_workflow.py diff --git a/ignore_helpers/adaptive_randomizer.py b/ignore_helpers/adaptive_randomizer.py deleted file mode 100644 index 3b5d957..0000000 --- a/ignore_helpers/adaptive_randomizer.py +++ /dev/null @@ -1,11 +0,0 @@ -class AdaptiveRandomizer: - def __init__(self, candidate_nodes): - self.candidates = candidate_nodes - - def sample_candidates(self, sample_size): - # Randomly sample a subset before selecting fingerprints - pass - - def apply_random_label(self, labels): - # Randomly shuffle/mutate fingerprint labels for adaptive defense - pass diff --git a/ignore_helpers/attack_sim.py b/ignore_helpers/attack_sim.py deleted file mode 100644 index eea1214..0000000 --- a/ignore_helpers/attack_sim.py +++ /dev/null @@ -1,68 +0,0 @@ 
-import torch -import numpy as np -import random - -def true_bit_flip(tensor, index=None, bit=0): - """ - Flips a single bit (bit index) of a float32 tensor element at a specified index. - bit=0: least significant bit (LSB) - """ - # Copy as numpy array for bit manipulation - a = tensor.detach().cpu().numpy().copy() - flat = a.ravel() - if index is None: - index = np.random.randint(0, flat.size) - old_val = flat[index] - # Get float as int - int_view = np.frombuffer(flat[index].tobytes(), dtype=np.uint32)[0] - # Flip the bit - int_view ^= (1 << bit) - # Back to float - new_val = np.frombuffer(np.uint32(int_view).tobytes(), dtype=np.float32)[0] - flat[index] = new_val - # Restore to tensor - a = flat.reshape(a.shape) - tensor.data = torch.from_numpy(a).to(tensor.device) - return old_val, new_val, index - -class BitFlipAttack: - def __init__(self, model, attack_type='random', bit=0): - """ - attack_type: 'random' (any param), 'BFA-F' (first layer), 'BFA-L' (last layer) - bit: which bit to flip (0 = LSB, 23 = start of mantissa, 30 = exponent, etc.) - """ - self.model = model - self.attack_type = attack_type - self.bit = bit - - def _get_target_params(self): - params = [p for p in self.model.parameters() if p.requires_grad and p.numel() > 0] - if self.attack_type == 'random': - return params - elif self.attack_type == 'BFA-F': # First layer only - return [params[0]] # Assumes first param is first layer (usually weights) - elif self.attack_type == 'BFA-L': # Last layer only - return [params[-1]] # Assumes last param is last layer (usually bias or weights) - else: - raise ValueError(f"Unknown attack_type {self.attack_type}") - - def apply(self): - """ - Apply the bit-flip attack in-place. - Returns: (layer_idx, param_idx, old_val, new_val) - """ - params = self._get_target_params() - with torch.no_grad(): - layer_idx = random.randrange(len(params)) - param = params[layer_idx] - idx = random.randrange(param.numel()) - old_val, new_val, actual_idx = true_bit_flip(param, index=idx, bit=self.bit) - return { - 'layer': layer_idx, - 'param_idx': actual_idx, - 'old_val': old_val, - 'new_val': new_val, - 'bit': self.bit, - 'attack_type': self.attack_type - } - diff --git a/ignore_helpers/fingerprinting.py b/ignore_helpers/fingerprinting.py deleted file mode 100644 index cd68bb3..0000000 --- a/ignore_helpers/fingerprinting.py +++ /dev/null @@ -1,88 +0,0 @@ -import torch -import torch.nn.functional as F - -class TransductiveFingerprintGenerator: - def __init__(self, model, dataset, candidate_fraction=1.0, random_seed=None, device='cpu'): - """ - Args: - model: Trained GNN model (PyTorch, implements forward(graph, features)) - dataset: PyGIP Dataset object with .graph, .features, .labels - candidate_fraction: float, what fraction of nodes to consider as candidates (default 1.0 = all) - random_seed: int, seed for reproducibility (optional) - device: device string (cpu/cuda) - """ - self.model = model.to(device) - self.dataset = dataset - self.candidate_fraction = candidate_fraction - self.random_seed = random_seed - self.device = device - - def get_candidate_nodes(self): - all_nodes = torch.arange(self.dataset.graph.num_nodes()) - if self.candidate_fraction < 1.0: - num_candidates = int(len(all_nodes) * self.candidate_fraction) - generator = torch.Generator(device=self.device) - if self.random_seed is not None: - generator.manual_seed(self.random_seed) - idx = torch.randperm(len(all_nodes), generator=generator)[:num_candidates] - return all_nodes[idx] - return all_nodes - - def 
compute_fingerprint_scores_full(self, candidate_nodes): - """ - Full model knowledge (Transductive-F): uses gradient norms. - """ - self.model.eval() - scores = [] - logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) - for node in candidate_nodes: - logit = logits[node] - label = logit.argmax().item() - loss = F.nll_loss(F.log_softmax(logit.unsqueeze(0), dim=1), torch.tensor([label], device=self.device)) - self.model.zero_grad() - loss.backward(retain_graph=True) - # Sum of gradient norms for all parameters - grad_norm = 0.0 - for p in self.model.parameters(): - if p.grad is not None: - grad_norm += (p.grad ** 2).sum().item() - scores.append(grad_norm) - return torch.tensor(scores, device=self.device) - - def compute_fingerprint_scores_limited(self, candidate_nodes): - """ - Limited model knowledge (Transductive-L): uses confidence. - """ - self.model.eval() - logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) - probs = F.softmax(logits, dim=1) - labels = probs.argmax(dim=1) - # Score is 1 - confidence of the predicted class (Eq. 6) - scores = 1.0 - probs[candidate_nodes, labels[candidate_nodes]] - return scores - - def select_top_fingerprints(self, scores, candidate_nodes, k): - topk = torch.topk(scores, k) - return candidate_nodes[topk.indices], topk.values - - def generate_fingerprints(self, k=5, method='full'): - """ - Args: - k: Number of fingerprints to generate - method: 'full' for Transductive-F, 'limited' for Transductive-L - Returns: - List of (node_id, label) tuples - """ - candidate_nodes = self.get_candidate_nodes().to(self.device) - if method == 'full': - scores = self.compute_fingerprint_scores_full(candidate_nodes) - elif method == 'limited': - scores = self.compute_fingerprint_scores_limited(candidate_nodes) - else: - raise ValueError("method must be 'full' or 'limited'") - fingerprint_nodes, _ = self.select_top_fingerprints(scores, candidate_nodes, k) - # Use model to get labels for fingerprint nodes - logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) - labels = logits.argmax(dim=1) - fingerprints = [(int(n), int(labels[n])) for n in fingerprint_nodes] - return fingerprints diff --git a/ignore_helpers/poison.py b/ignore_helpers/poison.py deleted file mode 100644 index 764071a..0000000 --- a/ignore_helpers/poison.py +++ /dev/null @@ -1,98 +0,0 @@ -import copy -import random -import torch - -def random_edge_addition_poisoning(dataset, perturb_frac, random_seed=None): - """ - Returns a new DGLGraph with random edges added. 
- - Args: - dataset: Dataset object (with .graph as DGLGraph) - perturb_frac: Fraction of edges to add (e.g., 0.01 = 1%) - random_seed: Optional integer for reproducibility - - Returns: - poisoned_graph: DGLGraph (deepcopy of original with new edges) - """ - import dgl - - if random_seed is not None: - random.seed(random_seed) - torch.manual_seed(random_seed) - - orig_graph = dataset.graph - poisoned_graph = copy.deepcopy(orig_graph) - num_nodes = poisoned_graph.num_nodes() - num_edges_to_add = int(perturb_frac * orig_graph.num_edges()) - - # Build set of all existing edges (as (u,v) pairs) - existing_edges = set(zip( - orig_graph.edges()[0].tolist(), - orig_graph.edges()[1].tolist() - )) - - # Generate candidate node pairs (exclude self-loops and duplicates) - candidate_pairs = [ - (i, j) - for i in range(num_nodes) - for j in range(num_nodes) - if i != j and (i, j) not in existing_edges - ] - - if len(candidate_pairs) < num_edges_to_add: - raise ValueError("Perturbation budget too large: not enough candidate edges.") - - new_edges = random.sample(candidate_pairs, num_edges_to_add) - src, dst = zip(*new_edges) - poisoned_graph.add_edges(src, dst) - - return poisoned_graph - -def retrain_poisoned_model(dataset, poisoned_graph, defense_class, device='cpu'): - """ - Retrain target GCN using the poisoned graph structure. - - Args: - dataset: Original Dataset object (provides features, labels, masks) - poisoned_graph: DGLGraph (with new random edges added) - defense_class: The defense class to use for model training (e.g., QueryBasedVerificationDefense) - device: 'cpu' or 'cuda' - - Returns: - model: Trained GCN model - """ - # Create a shallow copy and swap in the poisoned graph - dataset_poisoned = copy.copy(dataset) - dataset_poisoned.graph = poisoned_graph - - # If Dataset is more complex, you may want to rebuild it from scratch - defense = defense_class(dataset=dataset_poisoned, attack_node_fraction=0.1) - model = defense._train_target_model() - return model - -def evaluate_accuracy(model, dataset, device='cpu'): - """ - Evaluates test accuracy of the given model on the dataset. - - Args: - model: Trained GCN model - dataset: Dataset object (provides features, labels, test_mask, graph) - device: 'cpu' or 'cuda' - - Returns: - accuracy: float (test accuracy, 0-1) - """ - model.eval() - features = dataset.features.to(device) - labels = dataset.labels.to(device) - test_mask = dataset.test_mask - - with torch.no_grad(): - logits = model(dataset.graph.to(device), features) - pred = logits.argmax(dim=1) - correct = (pred[test_mask] == labels[test_mask]).float() - accuracy = correct.sum().item() / test_mask.sum().item() - return accuracy - -# (Optional) If you plan to support more attack types, you could add: -# def mettack_poisoning(...): ... diff --git a/ignore_helpers/verification_workflow.py b/ignore_helpers/verification_workflow.py deleted file mode 100644 index 25bc7f2..0000000 --- a/ignore_helpers/verification_workflow.py +++ /dev/null @@ -1,13 +0,0 @@ -class VerificationWorkflow: - def __init__(self, model, graph, labels, fingerprinting_args): - self.fingerprinter = Fingerprinting(model, graph, labels, **fingerprinting_args) - self.fingerprints = None - - def offline_phase(self): - # 1. Generate fingerprints and record expected outputs - self.fingerprints = self.fingerprinter.select_fingerprints() - - def online_phase(self, queried_model): - # 2. Query fingerprint nodes, compare predictions - # 3. 
Return detection result (True if any mismatch) - pass From c2e7e3ee4df46b5143872f43ae6eda715d9c60b0 Mon Sep 17 00:00:00 2001 From: cameronbender3 Date: Tue, 5 Aug 2025 19:16:25 -0400 Subject: [PATCH 6/8] Delete ignore_tests directory --- ignore_tests/test_fingerprinting.py | 144 ---------------------------- ignore_tests/test_model.py | 27 ------ 2 files changed, 171 deletions(-) delete mode 100644 ignore_tests/test_fingerprinting.py delete mode 100644 ignore_tests/test_model.py diff --git a/ignore_tests/test_fingerprinting.py b/ignore_tests/test_fingerprinting.py deleted file mode 100644 index a9fdad7..0000000 --- a/ignore_tests/test_fingerprinting.py +++ /dev/null @@ -1,144 +0,0 @@ -import torch -from datasets import Cora -from models.defense import QueryBasedVerificationDefense -from ignore_helpers import fingerprinting, attack_sim, poison -import torch.nn.functional as F -import copy # Python's deepcopy - - -def evaluate_fingerprints(model, dataset, fingerprints, device='cpu'): - model.eval() - logits = model(dataset.graph.to(device), dataset.features.to(device)) - pred_labels = logits.argmax(dim=1).cpu() - changed = [] - for node_id, clean_label in fingerprints: - if pred_labels[node_id] != clean_label: - changed.append((node_id, clean_label, int(pred_labels[node_id]))) - return changed - - - -def main_poisoning(num_trials=50, poison_frac=0.01): - - device = 'cuda' if torch.cuda.is_available() else 'cpu' - dataset = Cora() - - print("Training clean target model...") - defense = QueryBasedVerificationDefense(dataset=dataset, attack_node_fraction=0.1) - base_model = defense._train_target_model() - - # Accuracy before any poisoning - clean_acc = poison.evaluate_accuracy(base_model, dataset, device=device) - print(f"Clean model test accuracy: {clean_acc:.4f}") - - # # If/when you want to test fingerprints: - # generator = fingerprinting.TransductiveFingerprintGenerator(base_model, dataset, candidate_fraction=1.0, random_seed=42, device=device) - # fingerprints_full = generator.generate_fingerprints(k=k, method='full') - # fingerprints_limited = generator.generate_fingerprints(k=k, method='limited') - - poisoned_accuracies = [] - - for trial in range(num_trials): - poisoned_graph = poison.random_edge_addition_poisoning( - dataset=dataset, - perturb_frac=poison_frac, - random_seed=trial - ) - - # Make a dataset copy with the poisoned graph - dataset_poisoned = copy.copy(dataset) - dataset_poisoned.graph = poisoned_graph - - poisoned_model = poison.retrain_poisoned_model( - dataset=dataset_poisoned, # Use the poisoned dataset - poisoned_graph=poisoned_graph, - defense_class=QueryBasedVerificationDefense, - device=device - ) - - # Evaluate on the poisoned dataset - poisoned_acc = poison.evaluate_accuracy(poisoned_model, dataset_poisoned, device=device) - poisoned_accuracies.append(poisoned_acc) - - if trial == 0: - print(f"Example poisoned test accuracy: {poisoned_acc:.4f}") - - if (trial + 1) % 10 == 0: - print(f"Poison Trial {trial+1}/{num_trials}") - - - # # Evaluate fingerprints (disabled for now) - # changed_full = evaluate_fingerprints(poisoned_model, dataset, fingerprints_full, device=device) - # changed_limited = evaluate_fingerprints(poisoned_model, dataset, fingerprints_limited, device=device) - # if changed_full: - # detected_full += 1 - # if changed_limited: - # detected_limited += 1 - - # Final stats - avg_poisoned_acc = sum(poisoned_accuracies) / len(poisoned_accuracies) - print("\n==== Poisoning Results ====") - print(f"Average clean model test accuracy: {clean_acc:.4f}") - 
print(f"Average poisoned model test accuracy: {avg_poisoned_acc:.4f}") - print(f"Average accuracy drop: {clean_acc - avg_poisoned_acc:.4f}") - # print("\n==== Poisoning Detection Rate Results ====") - # print(f"Transductive-F (full knowledge) DR: {detected_full/num_trials:.3f}") - # print(f"Transductive-L (limited knowledge) DR: {detected_limited/num_trials:.3f}") - - - -def main(num_trials=100, k=5, attack_type='random', bit=0): - """ - :param num_trials: Number of attack rounds - :param k: Number of fingerprints - :param attack_type: 'random', 'BFA-F', 'BFA-L' - :param bit: Which bit to flip (0 = LSB, 23 = mantissa, 30 = exponent, etc.) - """ - device = 'cuda' if torch.cuda.is_available() else 'cpu' - dataset = Cora() - print("Training target model (baseline)...") - defense = QueryBasedVerificationDefense(dataset=dataset, attack_node_fraction=0.1) - base_model = defense._train_target_model() # Train ONCE - - generator = fingerprinting.TransductiveFingerprintGenerator(base_model, dataset, candidate_fraction=1.0, random_seed=42, device=device) - fingerprints_full = generator.generate_fingerprints(k=k, method='full') - fingerprints_limited = generator.generate_fingerprints(k=k, method='limited') - - detected_full = 0 - detected_limited = 0 - - for trial in range(num_trials): - attacked_model = copy.deepcopy(base_model) - attack = attack_sim.BitFlipAttack(attacked_model, attack_type=attack_type, bit=bit) - attack_result = attack.apply() - if trial < 5: - def float_to_bits(val): - import struct - [d] = struct.unpack(">L", struct.pack(">f", val)) - return f"{d:032b}" - old_val = attack_result['old_val'] - new_val = attack_result['new_val'] - bit_idx = attack_result['bit'] - print(f"Trial {trial+1} bit-flip details:") - print(f" Flipped bit: {bit_idx}") - print(f" Old value: {old_val} ({float_to_bits(old_val)})") - print(f" New value: {new_val} ({float_to_bits(new_val)})") - - changed_full = evaluate_fingerprints(attacked_model, dataset, fingerprints_full, device=device) - changed_limited = evaluate_fingerprints(attacked_model, dataset, fingerprints_limited, device=device) - if changed_full: - detected_full += 1 - if changed_limited: - detected_limited += 1 - if (trial + 1) % 10 == 0: - print(f"Trial {trial+1}/{num_trials}: F={detected_full} L={detected_limited}") - - - - print("\n==== Detection Rate Results ====") - print(f"Transductive-F (full knowledge) DR: {detected_full/num_trials:.3f}") - print(f"Transductive-L (limited knowledge) DR: {detected_limited/num_trials:.3f}") - -if __name__ == '__main__': - main_poisoning(num_trials=50, poison_frac=0.01) - diff --git a/ignore_tests/test_model.py b/ignore_tests/test_model.py deleted file mode 100644 index 6e325c4..0000000 --- a/ignore_tests/test_model.py +++ /dev/null @@ -1,27 +0,0 @@ -import torch -from datasets import Cora -from models.defense import QueryBasedVerificationDefense - -def test_train_target_model(): - # Load dataset - dataset = Cora() # substitute with your actual Dataset class if different - print("Dataset loaded.") - print(f"Features: {dataset.features.shape}, Labels: {dataset.labels.shape}") - - # Initialize defense object - defense = QueryBasedVerificationDefense(dataset=dataset, attack_node_fraction=0.1) - - # Train model - model = defense._train_target_model() - - # Test model outputs shape - model.eval() - with torch.no_grad(): - device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') - logits = model(dataset.graph.to(device), dataset.features.to(device)) - print("Logits shape:", logits.shape) - # Optionally: 
check output for a few nodes - print("First 5 node predictions:", logits[:5].argmax(dim=1).cpu().numpy()) - -if __name__ == "__main__": - test_train_target_model() From ce492db79978a39f2ba6d6efe0c7ede5c3761770 Mon Sep 17 00:00:00 2001 From: cameronbender3 Date: Tue, 5 Aug 2025 19:16:44 -0400 Subject: [PATCH 7/8] Delete ignore_notes --- ignore_notes | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 ignore_notes diff --git a/ignore_notes b/ignore_notes deleted file mode 100644 index 18aa2b9..0000000 --- a/ignore_notes +++ /dev/null @@ -1,2 +0,0 @@ -random poisoning implemented, need to test fingerprints on these then -mettack next From 6d65146ca7770e7bbd8ba8b59649211e0c346b77 Mon Sep 17 00:00:00 2001 From: cameronbender3 Date: Sun, 10 Aug 2025 14:26:46 -0400 Subject: [PATCH 8/8] Update dataset and device handling --- models/defense/QueryBasedVerification.py | 255 +++++++++-------------- 1 file changed, 98 insertions(+), 157 deletions(-) diff --git a/models/defense/QueryBasedVerification.py b/models/defense/QueryBasedVerification.py index 4b170e9..51bc7a1 100644 --- a/models/defense/QueryBasedVerification.py +++ b/models/defense/QueryBasedVerification.py @@ -15,15 +15,12 @@ from tqdm import tqdm - -device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') - - class QueryBasedVerificationDefense(BaseDefense): + supported_api_types = {"dgl"} + supported_datasets = {} def __init__(self, dataset, attack_node_fraction, model_path=None): super().__init__(dataset, attack_node_fraction) self.model_path = model_path - self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') @@ -64,7 +61,7 @@ def defend(self, fingerprint_mode='inductive', knowledge='full', attack_type='bi poisoned_dataset = copy.deepcopy(self.dataset) if 'graph' in attack_info: - poisoned_dataset.graph = attack_info['graph'] + poisoned_dataset.graph_data = attack_info['graph'] acc_poisoned = self._evaluate_accuracy(poisoned_model, poisoned_dataset) @@ -105,6 +102,9 @@ def defend(self, fingerprint_mode='inductive', knowledge='full', attack_type='bi } + def _get_features(self): + return self.graph_data.ndata['feat'] if hasattr(self.graph_data, 'ndata') else self.graph_data.x + def _train_target_model(self, epochs=200): """ @@ -119,23 +119,23 @@ def _train_target_model(self, epochs=200): model = GCN( feature_number=self.dataset.feature_number, label_number=self.dataset.label_number - ).to(device) - print(f"Training target model on device: {device} ...") + ).to(self.device) + print(f"Training target model on device: {self.device} ...") optimizer = Adam(model.parameters(), lr=0.02) loss_fn = torch.nn.NLLLoss() - features = self.dataset.features.to(device) - labels = self.dataset.labels.to(device) - train_mask = self.dataset.train_mask.to(device) + features = self._get_features().to(self.device) + labels = self.dataset.labels.to(self.device) + train_mask = self.dataset.train_mask.to(self.device) val_mask = getattr(self.dataset, "val_mask", None) if val_mask is None: val_mask = self.dataset.test_mask - val_mask = val_mask.to(device) + val_mask = val_mask.to(self.device) for epoch in range(epochs): model.train() - logits = model(self.dataset.graph.to(device), features) + logits = model(self.graph_data.to(self.device), features) log_probs = F.log_softmax(logits, dim=1) loss = loss_fn(log_probs[train_mask], labels[train_mask]) @@ -146,7 +146,7 @@ def _train_target_model(self, epochs=200): if (epoch + 1) % 10 == 0 or epoch == 0: model.eval() with torch.no_grad(): - val_logits = 
model(self.dataset.graph.to(device), features) + val_logits = model(self.graph_data.to(self.device), features) val_log_probs = F.log_softmax(val_logits, dim=1) val_pred = val_log_probs[val_mask].max(1)[1] val_acc = (val_pred == labels[val_mask]).float().mean().item() @@ -181,12 +181,12 @@ def _generate_fingerprints(self, model, mode='transductive', knowledge='full', k ) fingerprints = generator.generate_fingerprints(k=k, method=knowledge) - unified_fingerprints = [(self.dataset.graph, node_id, label) for (node_id, label) in fingerprints] + unified_fingerprints = [(self.graph_data, node_id, label) for (node_id, label) in fingerprints] elif mode == 'inductive': generator = InductiveFingerprintGenerator( model=model, - shadow_graph=self.dataset.graph, + shadow_graph=self.dataset.graph_data, knowledge=knowledge, candidate_fraction=kwargs.get('candidate_fraction', 0.3), num_fingerprints=k, @@ -272,13 +272,13 @@ def _run_attack(self, model, attack_type='mettack', knowledge='full', **kwargs): return poisoned_model, {'type': 'random_poison', 'graph': perturbed_graph} elif attack_type == 'mettack': - num_edges = self.dataset.graph.num_edges() + num_edges = self.graph_data.num_edges() poison_frac = kwargs.get('poison_frac', 0.05) n_perturbations = int(poison_frac * num_edges) helper = MettackHelper( - graph=self.dataset.graph, - features=self.dataset.features, + graph=self.graph_data, + features=self._get_features(), labels=self.dataset.labels, train_mask=self.dataset.train_mask, val_mask=getattr(self.dataset, 'val_mask', None), @@ -317,7 +317,7 @@ def _random_edge_addition_poisoning(self, node_fraction=0.1, edges_per_node=5, r random.seed(random_seed) torch.manual_seed(random_seed) - poisoned_graph = copy.deepcopy(self.dataset.graph) + poisoned_graph = copy.deepcopy(self.graph_data) num_nodes = poisoned_graph.num_nodes() num_poisoned_nodes = int(node_fraction * num_nodes) poisoned_nodes = random.sample(range(num_nodes), num_poisoned_nodes) @@ -354,7 +354,7 @@ def _retrain_poisoned_model(self, poisoned_graph, epochs=200): model: Trained GCN model """ dataset_poisoned = copy.deepcopy(self.dataset) - dataset_poisoned.graph = poisoned_graph + dataset_poisoned.graph_data = poisoned_graph defense = QueryBasedVerificationDefense(dataset=dataset_poisoned, attack_node_fraction=0.1) model = defense._train_target_model(epochs=epochs) @@ -368,18 +368,17 @@ def _evaluate_accuracy(self, model, dataset): Args: model: Trained GCN model dataset: Dataset object (provides features, labels, test_mask, graph) - device: 'cpu' or 'cuda' Returns: accuracy: float (test accuracy, 0-1) """ model.eval() - features = dataset.features.to(device) - labels = dataset.labels.to(device) + features = self._get_features().to(self.device) + labels = dataset.labels.to(self.device) test_mask = dataset.test_mask with torch.no_grad(): - logits = model(dataset.graph.to(device), features) + logits = model(dataset.graph_data.to(self.device), features) pred = logits.argmax(dim=1) correct = (pred[test_mask] == labels[test_mask]).float() accuracy = correct.sum().item() / test_mask.sum().item() @@ -433,19 +432,21 @@ def run_full_pipeline(self, attack_type='random', mode='transductive', knowledge class TransductiveFingerprintGenerator: def __init__(self, model, dataset, candidate_fraction=0.3, random_seed=None, device='cpu', randomize=True): - self.model = model.to(device) + self.device = torch.device(device) + self.model = model.to(self.device) self.dataset = dataset + self.graph_data = dataset.graph_data self.candidate_fraction = 
candidate_fraction self.random_seed = random_seed - self.device = device self.randomize = randomize + def _get_features(self): + """Backend-agnostic feature getter (DGL or PyG).""" + return self.graph_data.ndata['feat'] if hasattr(self.graph_data, 'ndata') else self.graph_data.x + def get_candidate_nodes(self): - """ - Step 1: Randomly sample a subset of nodes as candidates (for robustness). - Step 2: Return that set for scoring. - """ - all_nodes = torch.arange(self.dataset.graph.num_nodes()) + """Randomly sample a subset of nodes as candidates.""" + all_nodes = torch.arange(self.graph_data.num_nodes()) num_candidates = max(1, int(len(all_nodes) * self.candidate_fraction)) if self.randomize and self.candidate_fraction < 1.0: @@ -453,18 +454,15 @@ def get_candidate_nodes(self): if self.random_seed is not None: generator.manual_seed(self.random_seed) idx = torch.randperm(len(all_nodes), generator=generator)[:num_candidates] - candidates = all_nodes[idx] - else: - candidates = all_nodes - - return candidates - - + return all_nodes[idx] + return all_nodes def compute_fingerprint_scores_full(self, candidate_nodes): + """Full-knowledge fingerprint scores (gradient-based).""" self.model.eval() scores = [] - logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + x = self._get_features().to(self.device) + logits = self.model(self.graph_data.to(self.device), x) for node in candidate_nodes: self.model.zero_grad() @@ -475,26 +473,22 @@ def compute_fingerprint_scores_full(self, candidate_nodes): grad_norm = sum((p.grad ** 2).sum().item() for p in self.model.parameters() if p.grad is not None) scores.append(grad_norm) - scores_tensor = torch.tensor(scores, device=self.device) - return scores_tensor - + return torch.tensor(scores, device=self.device) def compute_fingerprint_scores_limited(self, candidate_nodes): + """Limited-knowledge fingerprint scores (confidence margin).""" self.model.eval() + x = self._get_features().to(self.device) with torch.no_grad(): - logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + logits = self.model(self.graph_data.to(self.device), x) probs = F.softmax(logits, dim=1) labels = probs.argmax(dim=1) scores = 1.0 - probs[candidate_nodes, labels[candidate_nodes]] - return scores - def select_top_fingerprints(self, scores, candidate_nodes, k, method='full'): - """ - Selects top-k fingerprint nodes after filtering out extreme score outliers. 
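
The _get_features pattern introduced in this patch, in one line: DGL graphs expose node features as g.ndata['feat'], PyG Data objects as data.x, so a single getter keeps callers backend-agnostic. A minimal sketch:

    def node_features(g):
        # DGL stores features in g.ndata; PyG stores them on the .x attribute.
        return g.ndata['feat'] if hasattr(g, 'ndata') else g.x
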
- """ - q = 0.99 if method == 'full' else 1.0 + """Selects top-k fingerprint nodes after filtering out extreme score outliers.""" + q = 0.99 if method == 'full' else 1.0 threshold = torch.quantile(scores, q) mask = scores <= threshold @@ -505,17 +499,14 @@ def select_top_fingerprints(self, scores, candidate_nodes, k, method='full'): k = filtered_scores.size(0) topk = torch.topk(filtered_scores, k) - selected_nodes = filtered_candidates[topk.indices] - selected_scores = topk.values - - return selected_nodes, selected_scores - + return filtered_candidates[topk.indices], topk.values def generate_fingerprints(self, k=5, method='full'): candidate_nodes = self.get_candidate_nodes().to(self.device) + x = self._get_features().to(self.device) with torch.no_grad(): - logits = self.model(self.dataset.graph.to(self.device), self.dataset.features.to(self.device)) + logits = self.model(self.graph_data.to(self.device), x) labels = logits.argmax(dim=1) if method == 'full': @@ -528,18 +519,14 @@ def generate_fingerprints(self, k=5, method='full'): class_to_candidates = {} for i, node in enumerate(candidate_nodes): cls = int(labels[node]) - if cls not in class_to_candidates: - class_to_candidates[cls] = [] - class_to_candidates[cls].append((node.item(), scores[i].item())) + class_to_candidates.setdefault(cls, []).append((node.item(), scores[i].item())) rng = random.Random(self.random_seed) - class_list = list(class_to_candidates.keys()) rng.shuffle(class_list) fingerprints = [] for cls in class_list: - class_nodes = sorted(class_to_candidates[cls], key=lambda x: x[1], reverse=True) top_node = class_nodes[0][0] fingerprints.append((top_node, cls)) @@ -547,48 +534,26 @@ def generate_fingerprints(self, k=5, method='full'): break if len(fingerprints) < k: - fingerprint_nodes, _ = self.select_top_fingerprints(scores, candidate_nodes, k, method=method) fingerprints = [(int(n), int(labels[n])) for n in fingerprint_nodes] - return fingerprints - class InductiveFingerprintGenerator: - """ - Implements inductive fingerprint generation for both Full ('full') and Limited ('limited') - knowledge settings, as described in Wu et al. (2023) Sections 4.2, 4.2.2, and 5.2. - Supports randomized candidate selection for robustness against adaptive attackers. - """ - - def __init__(self, model, shadow_graph, knowledge='limited', + def __init__(self, model, dataset, shadow_graph=None, knowledge='limited', candidate_fraction=0.3, num_fingerprints=5, randomize=True, random_seed=None, device='cpu', perturb_fingerprints=False, perturb_budget=5): - """ - Args: - model: GNN model to be fingerprinted. - shadow_graph: PyG/DGL graph object for querying (shadow/inference graph). - knowledge: 'full' for gradient-based (requires model weights), 'limited' for output-based. - candidate_fraction: Fraction of nodes considered as candidates for fingerprinting. - num_fingerprints: Number of fingerprint nodes to select. - randomize: Whether to randomly sample candidate nodes (default True). - random_seed: Optional seed for reproducibility. - device: Torch device string (e.g., 'cpu' or 'cuda'). - perturb_fingerprints: Whether to greedily perturb fingerprint nodes' features/edges to increase sensitivity. - perturb_budget: Max number of perturbation steps per fingerprint node (default 5). 
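
What select_top_fingerprints does, shown on toy data: in full-knowledge mode, candidates above the 99th score percentile are treated as outliers and dropped before the top-k pick. The values below are illustrative only.

    import torch

    scores = torch.tensor([0.1, 0.4, 0.3, 9.9, 0.2])   # 9.9 is an outlier
    candidates = torch.arange(5)
    mask = scores <= torch.quantile(scores, 0.99)      # filters out the 9.9 entry
    kept_scores, kept_nodes = scores[mask], candidates[mask]
    top = torch.topk(kept_scores, k=min(2, kept_scores.numel()))
    print(kept_nodes[top.indices].tolist())            # [1, 2]: best two survivors
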
- - """ - self.model = model.to(device) - self.shadow_graph = shadow_graph + self.device = torch.device(device) + self.model = model.to(self.device) + self.dataset = dataset + self.shadow_graph = shadow_graph if shadow_graph is not None else dataset.graph_data self.knowledge = knowledge self.candidate_fraction = candidate_fraction self.num_fingerprints = num_fingerprints self.randomize = randomize self.random_seed = random_seed - self.device = device self.perturb_fingerprints = perturb_fingerprints self.perturb_budget = perturb_budget @@ -596,11 +561,12 @@ def __init__(self, model, shadow_graph, knowledge='limited', torch.manual_seed(self.random_seed) random.seed(self.random_seed) + + def _get_features(self): + return self.shadow_graph.ndata['feat'] if hasattr(self.shadow_graph, 'ndata') else self.shadow_graph.x + + def get_candidate_nodes(self): - """ - Step 1: Randomly sample a subset of nodes as candidates (for robustness). - Step 2: Score and select top-k from this set. - """ all_nodes = torch.arange(self.shadow_graph.num_nodes()) num_candidates = max(1, int(len(all_nodes) * self.candidate_fraction)) @@ -616,27 +582,25 @@ def get_candidate_nodes(self): return candidates - def compute_fingerprint_score(self, node_idx): + def compute_fingerprint_score(self, node_idx, graph_override=None): """ Computes the fingerprint score for a given node according to knowledge mode. - Returns: float: Sensitivity score for the node. + If graph_override is provided, scoring is done on that graph instead of shadow_graph. """ - features = self.shadow_graph.ndata['feat'] if hasattr(self.shadow_graph, 'ndata') else self.shadow_graph.x - features = features.to(self.device) + graph = graph_override if graph_override is not None else self.shadow_graph + x = (graph.ndata['feat'] if hasattr(graph, 'ndata') else graph.x).to(self.device) self.model.eval() if self.knowledge == 'limited': with torch.no_grad(): - logits = self.model(self.shadow_graph.to(self.device), features) + logits = self.model(graph.to(self.device), x) probs = torch.softmax(logits[node_idx], dim=0) pred_class = probs.argmax().item() - score = 1 - probs[pred_class].item() - return score + return 1 - probs[pred_class].item() elif self.knowledge == 'full': - - features.requires_grad_(True) - logits = self.model(self.shadow_graph.to(self.device), features) + x.requires_grad_(True) + logits = self.model(graph.to(self.device), x) pred = logits[node_idx] label = pred.argmax().item() @@ -647,12 +611,11 @@ def compute_fingerprint_score(self, node_idx): ) loss.backward(retain_graph=True) - grad = features.grad[node_idx] + grad = x.grad[node_idx] grad_norm_sq = (grad ** 2).sum().item() - features.requires_grad_(False) - features.grad = None + x.requires_grad_(False) + x.grad = None return grad_norm_sq - else: raise ValueError("knowledge must be 'limited' or 'full'") @@ -675,25 +638,15 @@ def generate_fingerprint_nodes(self): selected = [idx for (_, idx) in scores[:self.num_fingerprints]] return selected - def save_fingerprint_tuples(self, node_indices): - """ - Step 4: Creates the final fingerprint set, storing the expected label for each - selected fingerprint node. Tuples (graph, node_id, label) will be used - during online verification. - - Args: - node_indices: List[int] of selected fingerprint node indices. - Returns: - List[Tuple[graph, node_id, label]]: The fingerprints for online checking. 
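
The full-knowledge inductive score above, reduced to its essentials: the squared L2 norm of the loss gradient with respect to the queried node's own input features. A sketch assuming the forward(graph, features) convention used throughout this module:

    import torch
    import torch.nn.functional as F

    def input_grad_score(model, graph, x, node_idx):
        x = x.clone().detach().requires_grad_(True)
        logits = model(graph, x)
        label = logits[node_idx].argmax()
        loss = F.nll_loss(F.log_softmax(logits[node_idx].unsqueeze(0), dim=1),
                          label.unsqueeze(0))
        loss.backward()
        # High sensitivity = small input changes move the loss a lot.
        return (x.grad[node_idx] ** 2).sum().item()
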
- """ + def save_fingerprint_tuples(self, node_indices): self.model.eval() + x = self._get_features().to(self.device) with torch.no_grad(): - features = self.shadow_graph.ndata['feat'] if hasattr(self.shadow_graph, 'ndata') else self.shadow_graph.x - logits = self.model(self.shadow_graph.to(self.device), features.to(self.device)) + logits = self.model(self.shadow_graph.to(self.device), x) labels = logits.argmax(dim=1).cpu().numpy() - fingerprints = [(self.shadow_graph, int(idx), int(labels[idx])) for idx in node_indices] - return fingerprints + return [(self.shadow_graph, int(idx), int(labels[idx])) for idx in node_indices] + def generate_fingerprints(self, method='full'): """ @@ -757,8 +710,7 @@ def greedy_perturb_fingerprints(self, node_indices): List[int]: Indices of perturbed fingerprint nodes (features in shadow_graph are updated in-place). """ epsilon = 0.01 - features = self.shadow_graph.ndata['feat'] if hasattr(self.shadow_graph, 'ndata') else self.shadow_graph.x - features = features.clone().detach().to(self.device) + features = self._get_features().clone().detach().to(self.device) self.shadow_graph = self.shadow_graph.to(self.device) for idx in node_indices: @@ -766,12 +718,13 @@ def greedy_perturb_fingerprints(self, node_indices): improved = True while num_tries < self.perturb_budget and improved: improved = False - current_score = self.compute_fingerprint_score(idx) + current_score = self.compute_fingerprint_score(idx, graph_override=self.shadow_graph) self.model.eval() with torch.no_grad(): logits = self.model(self.shadow_graph, features) pred_label = logits[idx].argmax().item() + original_features = features[idx].clone() for dim in range(features.shape[1]): for direction in [+1, -1]: @@ -781,7 +734,7 @@ def greedy_perturb_fingerprints(self, node_indices): with torch.no_grad(): logits_new = self.model(self.shadow_graph, features) new_pred_label = logits_new[idx].argmax().item() - new_score = self.compute_fingerprint_score(idx) + new_score = self.compute_fingerprint_score(idx, graph_override=self.shadow_graph) if new_pred_label == pred_label and new_score > current_score: current_score = new_score @@ -789,6 +742,7 @@ def greedy_perturb_fingerprints(self, node_indices): num_tries += 1 else: features[idx][dim] = original_features[dim] + if num_tries >= self.perturb_budget: break if num_tries >= self.perturb_budget: @@ -800,7 +754,6 @@ def greedy_perturb_fingerprints(self, node_indices): self.shadow_graph.x = features return node_indices - def greedy_edge_perturbation(self, node_idx, perturb_budget=5, knowledge='full'): """ Dispatch to greedy edge perturbation strategy based on verifier knowledge level. @@ -823,22 +776,19 @@ def _greedy_edge_perturbation_f(self, node_idx, perturb_budget): Full knowledge edge perturbation (Inductive-F). Increases fingerprint score using model gradients while preserving prediction. 
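
The core move of greedy_perturb_fingerprints above: nudge one feature dimension by ±epsilon, and keep the nudge only if the fingerprint score improves while the predicted label is preserved. score_fn and label_fn here are hypothetical closures over the model and graph.

    def try_nudge(features, idx, dim, direction, epsilon, score_fn, label_fn):
        before_label, before_score = label_fn(idx), score_fn(idx)
        features[idx, dim] += direction * epsilon
        if label_fn(idx) == before_label and score_fn(idx) > before_score:
            return True                                # keep the perturbation
        features[idx, dim] -= direction * epsilon      # otherwise revert it
        return False
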
""" - import copy - from torch_geometric.utils import to_networkx, from_networkx - import torch g_nx = to_networkx(self.shadow_graph.to('cpu'), to_undirected=True) - x = self.dataset.features.to(self.device) + x = self._get_features().to(self.device) self.model.eval() with torch.no_grad(): original_pred = self.model(self.shadow_graph.to(self.device), x)[node_idx].argmax().item() def score_fn(modified_graph): - return self._fingerprint_score(node_idx, modified_graph.to(self.device), x) + return self.compute_fingerprint_score(node_idx, graph_override=modified_graph) neighbors = list(g_nx.neighbors(node_idx)) - non_neighbors = list(set(range(self.dataset.graph.num_nodes())) - set(neighbors) - {node_idx}) + non_neighbors = list(set(range(self.shadow_graph.num_nodes())) - set(neighbors) - {node_idx}) applied = 0 while applied < perturb_budget: @@ -846,7 +796,6 @@ def score_fn(modified_graph): best_graph = None best_action = None - for nbr in non_neighbors: temp_g = copy.deepcopy(g_nx) temp_g.add_edge(node_idx, nbr) @@ -855,14 +804,12 @@ def score_fn(modified_graph): pred = self.model(g_temp, x)[node_idx].argmax().item() if pred != original_pred: continue - score = score_fn(g_temp) - delta = score - score_fn(self.shadow_graph) + delta = score_fn(g_temp) - score_fn(self.shadow_graph) if delta > best_delta: best_delta = delta best_graph = g_temp best_action = ('add', nbr) - for nbr in neighbors: temp_g = copy.deepcopy(g_nx) if temp_g.has_edge(node_idx, nbr): @@ -872,15 +819,14 @@ def score_fn(modified_graph): pred = self.model(g_temp, x)[node_idx].argmax().item() if pred != original_pred: continue - score = score_fn(g_temp) - delta = score - score_fn(self.shadow_graph) + delta = score_fn(g_temp) - score_fn(self.shadow_graph) if delta > best_delta: best_delta = delta best_graph = g_temp best_action = ('remove', nbr) if best_graph is None: - break + break self.shadow_graph = best_graph g_nx = to_networkx(best_graph.to('cpu'), to_undirected=True) @@ -892,19 +838,15 @@ def score_fn(modified_graph): non_neighbors.append(best_action[1]) applied += 1 - + def _greedy_edge_perturbation_l(self, node_idx, perturb_budget): """ Limited knowledge edge perturbation (Inductive-L). Uses confidence margin (1 - confidence) as proxy for fingerprint sensitivity. 
""" - import copy - from torch_geometric.utils import to_networkx, from_networkx - import torch - import torch.nn.functional as F g_nx = to_networkx(self.shadow_graph.to('cpu'), to_undirected=True) - x = self.dataset.features.to(self.device) + x = self._get_features().to(self.device) self.model.eval() with torch.no_grad(): @@ -918,12 +860,12 @@ def score_fn(modified_graph): logits = self.model(modified_graph.to(self.device), x) pred = logits[node_idx].argmax().item() if pred != original_pred: - return -1 + return -1 conf = F.softmax(logits[node_idx], dim=0)[pred].item() return 1 - conf neighbors = list(g_nx.neighbors(node_idx)) - non_neighbors = list(set(range(self.dataset.graph.num_nodes())) - set(neighbors) - {node_idx}) + non_neighbors = list(set(range(self.shadow_graph.num_nodes())) - set(neighbors) - {node_idx}) applied = 0 while applied < perturb_budget: @@ -968,7 +910,6 @@ def score_fn(modified_graph): applied += 1 - class BitFlipAttack: def __init__(self, model, attack_type='random', bit=0): self.model = model @@ -1022,25 +963,25 @@ class MettackHelper: def __init__(self, graph, features, labels, train_mask, val_mask, test_mask, n_perturbations=5, device='cpu', max_perturbations=50, surrogate_epochs=30, candidate_sample_size=20): - self.graph = dgl.add_self_loop(graph).to(device) - self.features = features.to(device) - self.labels = labels.to(device) - self.train_mask = train_mask.to(device) + self.device = device + self.graph = dgl.add_self_loop(graph).to(self.device) + self.features = features.to(self.device) + self.labels = labels.to(self.device) + self.train_mask = train_mask.to(self.device) self.surrogate_epochs = surrogate_epochs self.candidate_sample_size = candidate_sample_size if val_mask is not None: - self.val_mask = val_mask.to(device) + self.val_mask = val_mask.to(self.device) else: - self.val_mask = self._create_val_mask_from_train(train_mask).to(device) + self.val_mask = self._create_val_mask_from_train(train_mask).to(self.device) - self.test_mask = test_mask.to(device) + self.test_mask = test_mask.to(self.device) self.n_perturbations = n_perturbations - self.device = device in_feats = features.shape[1] n_classes = int(labels.max().item()) + 1 - self.surrogate = GCN(in_feats, n_classes).to(device) + self.surrogate = GCN(in_feats, n_classes).to(self.device) torch.manual_seed(42) np.random.seed(42)