diff --git a/nexus/profile.py b/nexus/profile.py
index bceb62d..1f26684 100644
--- a/nexus/profile.py
+++ b/nexus/profile.py
@@ -10,10 +10,38 @@ from sklearn.cluster import KMeans
 import os
+import re
 from dotenv import load_dotenv
 
 
 load_dotenv()
 
 
+def safe_filename(s: str) -> str:
+    """Return ``s`` with every character outside [a-zA-Z0-9_.-] replaced by ``_``.
+
+    Results that would be empty or made only of dots (``""``, ``"."``, ``".."``)
+    get a leading ``_`` so the value is always usable as a single file name.
+    """
+    cleaned = re.sub(r'[^a-zA-Z0-9_\.\-]', '_', s)
+    return cleaned if cleaned.strip('.') else f'_{cleaned}'
+
+
+def _safe_join(base: str, *paths: str) -> str:
+    """Join ``paths`` onto ``base`` and return the resolved absolute path.
+
+    Symbolic links are resolved before the containment check, so a link inside
+    ``base`` that points somewhere else is rejected as well.
+
+    :param base: directory the result must stay inside
+    :param paths: path components to append to ``base``
+    :raises ValueError: if the resolved path is outside ``base``
+    """
+    base_realpath = os.path.realpath(base)
+    path = os.path.realpath(os.path.join(base_realpath, *paths))
+    if os.path.commonpath([base_realpath, path]) != base_realpath:
+        raise ValueError("Detected path traversal attempt (outside configured directory)")
+    return path
+
+
 class Profile:
     def __init__(self):
         self.utils = Utils()
@@ -28,11 +56,19 @@ def create_social_profile_tweepy(self, map_name: str, map_description: str, user
-        :param outdir: specified directory of where the tweets(in JSON format) shoudl go
+        :param outdir: specified directory of where the tweets(in JSON format) should go
         """
         lookup_amount = 10000
+        # Resolve outdir once (default: ./data); makedirs is a no-op if it already exists.
+        outdir = os.path.abspath(outdir) if outdir else os.path.abspath("data")
+        os.makedirs(outdir, exist_ok=True)
         for user in users:
+            sanitized_user = safe_filename(user)
             tweets = [{"text": p.clean(tweet["full_text"]), "created_at": tweet["created_at"]} for tweet in
                       self.utils.user_lookup(user, lookup_amount)]
+            # Filter short tweets before the loop: popping by index while iterating
+            # skips entries (or raises IndexError when iterating over a copy).
+            tweets = [tweet for tweet in tweets if len(tweet["text"]) >= 10]
-            with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
+            tweet_file_path = _safe_join(outdir, f'{sanitized_user}_tweets.jsonl')
+            with jsonlines.open(tweet_file_path, mode='a') as writer:
                 for idx, tweet in enumerate(tweets):
                     if len(tweet["text"]) < 10:
                         tweets.pop(idx)
                         continue
@@ -66,18 +102,22 @@ def create_social_profile_sns(self,
         """
 
         all_tweets = []
+        # Resolve outdir once (default: ./data); makedirs is a no-op if it already exists.
+        outdir = os.path.abspath(outdir) if outdir else os.path.abspath("data")
+        os.makedirs(outdir, exist_ok=True)
 
         for user in tqdm(users):
+            # One validated path serves both the cached read and the scrape fallback.
+            data_path = _safe_join(outdir, f"{safe_filename(user)}_tweets.jsonl")
             try:
                 logger.info(f"Loading {user}'s tweets from disk")
-                data_path = os.path.join(outdir, f"{user}_tweets.jsonl")
                 with jsonlines.open(data_path, mode="r") as tweets:
                     for tweet in tweets:
                         all_tweets.append(tweet)
             except BaseException:
-                logger.info(f"Not on disk! scraping {users}'s tweets now")
+                logger.info(f"Not on disk! scraping {user}'s tweets now")
                 tweets = self.utils.user_lookup_sns(user, 10000)
-                with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
+                with jsonlines.open(data_path, mode='a') as writer:
                     for idx, tweet in enumerate(tweets):
                         tweet["full_text"] = p.clean(tweet["full_text"])
                         if len(tweet["full_text"]) > 30:
@@ -85,7 +125,6 @@ def create_social_profile_sns(self,
                             all_tweets.append(tweet)
                             writer.write(tweet)
 
-
         for idx, tweet in enumerate(all_tweets):
             tweet["id"] = str(idx)
 
@@ -94,7 +133,10 @@ def create_social_profile_sns(self,
         for n_clusters in n_cluster_docs:
             logger.info(f"computing {n_clusters} cluster layer")
             try:
-                with open(f"data/cluster_labels/{users[0]}_id_to_cluster_label_{n_clusters}", "r") as f:
+                # Read path only: do not create the directory just to probe for a cached file.
+                sanitized_user0 = safe_filename(users[0])
+                cluster_path = _safe_join("data/cluster_labels", f"{sanitized_user0}_id_to_cluster_label_{n_clusters}")
+                with open(cluster_path, "r") as f:
                     id_to_cluster_label = json.load(f)
                 logger.info("Loaded all resources from disk")
                 print(id_to_cluster_label[-1])
@@ -117,7 +159,11 @@ def create_social_profile_sns(self,
                 for datum, cluster_id in zip(all_tweets, [int(i) for i in list(kmeans.labels_)]):
                     id_to_cluster_label[datum['id']] = cluster_id
 
-                with open(f'data/cluster_labels/{users[0]}_id_to_cluster_label_{n_clusters}', 'w') as f:
+                cluster_labels_dir = os.path.abspath("data/cluster_labels")
+                os.makedirs(cluster_labels_dir, exist_ok=True)
+                sanitized_user0 = safe_filename(users[0])
+                cluster_path = _safe_join(cluster_labels_dir, f"{sanitized_user0}_id_to_cluster_label_{n_clusters}")
+                with open(cluster_path, 'w') as f:
                     json.dump(id_to_cluster_label, f)
         print(len(all_tweets))
         logger.info("Computing Topics")