From 8d185748e096f9d5c7a93d6104ed2f538e82d1dc Mon Sep 17 00:00:00 2001 From: "pensarappdev[bot]" <182706286+pensarappdev[bot]@users.noreply.github.com> Date: Wed, 7 May 2025 16:39:25 +0000 Subject: [PATCH] Fix security issue: ML Pipeline Data Poisoning via Unvalidated Tweet Ingestion (CWE-20, ML02) --- nexus/profile.py | 131 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 114 insertions(+), 17 deletions(-) diff --git a/nexus/profile.py b/nexus/profile.py index bceb62d..7d5977e 100644 --- a/nexus/profile.py +++ b/nexus/profile.py @@ -10,6 +10,7 @@ from sklearn.cluster import KMeans import os from dotenv import load_dotenv +import re load_dotenv() @@ -19,6 +20,65 @@ def __init__(self): self.utils = Utils() self.atlas = AtlasClient() + def _is_valid_tweet(self, tweet: dict, min_length: int = 30, max_length: int = 512) -> bool: + """ + Perform validation/sanity checks on ingested tweets to reduce data poisoning risk. + - Enforces min/max length, + - Checks for only-printable UTF-8, + - Rejects tweets with overwhelming URLs, control chars, or binary data, + - Basic anomaly: extremely repetitive/unusual text. + """ + text = tweet.get("full_text") or tweet.get("text") + if not text or not isinstance(text, str): + return False + + # Length checks + if len(text) < min_length or len(text) > max_length: + return False + + # Attempt to encode as UTF-8 and check printable chars + try: + text.encode("utf-8") + except Exception: + return False + + # Reject text containing null bytes or lots of control chars + if "\x00" in text or len([c for c in text if ord(c) < 32 and c not in "\n\r\t"]) > 3: + return False + + # Limit urls + url_count = len(re.findall(r'https?://\S+', text)) + if url_count > 3: + return False + + # High unicode/surrogates + if len([c for c in text if ord(c) > 0xFFFF]) > 5: + return False + + # Very repetitive or single-character spam (anomaly) + if len(set(text.strip())) < 4 and len(text) > 40: + return False + + # For tweets that look like "RT RT RT... [50x]" + if re.match(r'^(\w{1,5})( \1){6,}', text): + return False + + # Passed all checks + return True + + def _deduplicate_tweets(self, tweets: List[dict]) -> List[dict]: + """Deduplicate tweets by full_text/content and created_at field.""" + seen = set() + unique = [] + for tweet in tweets: + text = tweet.get("full_text") or tweet.get("text") + created_at = tweet.get("created_at", "") + k = (text, created_at) + if k not in seen: + seen.add(k) + unique.append(tweet) + return unique + def create_social_profile_tweepy(self, map_name: str, map_description: str, users: List[str], outdir: str): """Create social profile with tweepy as tweet source @@ -29,13 +89,22 @@ def create_social_profile_tweepy(self, map_name: str, map_description: str, user """ lookup_amount = 10000 for user in users: - tweets = [{"text": p.clean(tweet["full_text"]), "created_at": tweet["created_at"]} for tweet in - self.utils.user_lookup(user, lookup_amount)] + tweets = [] + for tweet in self.utils.user_lookup(user, lookup_amount): + cleaned_text = p.clean(tweet["full_text"]) + record = { + "text": cleaned_text, + "created_at": tweet["created_at"] + } + # Apply validation + if not self._is_valid_tweet(record, min_length=10): + continue + record["_provenance"] = "scraped" + tweets.append(record) + + tweets = self._deduplicate_tweets(tweets) with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer: - for idx, tweet in enumerate(tweets): - if len(tweet["text"]) < 10: - tweets.pop(idx) - continue + for tweet in tweets: writer.write(tweet) self.atlas.map_text(data=tweets, @@ -66,28 +135,56 @@ def create_social_profile_sns(self, """ all_tweets = [] - for user in tqdm(users): + found_valid_on_disk = False try: logger.info(f"Loading {user}'s tweets from disk") data_path = os.path.join(outdir, f"{user}_tweets.jsonl") with jsonlines.open(data_path, mode="r") as tweets: for tweet in tweets: + # Ensure every tweet loaded from disk is validated and cleaned! + if "full_text" in tweet and tweet["full_text"]: + tweet["full_text"] = p.clean(tweet["full_text"]) + elif "text" in tweet: + tweet["full_text"] = p.clean(tweet["text"]) + else: + continue + if not self._is_valid_tweet(tweet): + logger.warning(f"Invalid or anomalous tweet from disk for user {user}, skipping.") + continue + tweet["_provenance"] = "disk" + tweet["user"] = user all_tweets.append(tweet) + found_valid_on_disk = True except BaseException: - logger.info(f"Not on disk! scraping {users}'s tweets now") + logger.info(f"Not on disk! scraping {user}'s tweets now") + + if not found_valid_on_disk: + # Scrape tweets tweets = self.utils.user_lookup_sns(user, 10000) + scraped_records = [] + for tweet in tweets: + cleaned_text = p.clean(tweet["full_text"]) + record = { + "full_text": cleaned_text, + "created_at": str(tweet["created_at"]), + "_provenance": "scraped", + "user": user + } + if not self._is_valid_tweet(record): + continue + scraped_records.append(record) + all_tweets.append(record) + scraped_records = self._deduplicate_tweets(scraped_records) with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer: - for idx, tweet in enumerate(tweets): - tweet["full_text"] = p.clean(tweet["full_text"]) - if len(tweet["full_text"]) > 30: - tweet["created_at"] = str(tweet["created_at"]) - all_tweets.append(tweet) - writer.write(tweet) + for tweet in scraped_records: + writer.write(tweet) + # Deduplicate corpus + all_tweets = self._deduplicate_tweets(all_tweets) - for idx, tweet in enumerate(all_tweets): - tweet["id"] = str(idx) + for idx, tweet in enumerate(all_tweets): + tweet["id"] = str(idx) if topics: n_cluster_docs = [40] @@ -144,4 +241,4 @@ def create_social_profile_sns(self, map_description="A social profile of the latest POTUS Joe Biden, with Nomic's text embedder created by Yuvanesh Anand", users=["JoeBiden", "POTUS"], topics=True, - embedding_path="embeddings/JoeBiden.npy") + embedding_path="embeddings/JoeBiden.npy") \ No newline at end of file