Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 45 additions & 8 deletions nexus/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,26 @@
from sklearn.cluster import KMeans
import os
from dotenv import load_dotenv
import re

load_dotenv()


def safe_filename(s: str) -> str:
    """Return *s* made safe for use as a filename component.

    Every character outside the ASCII set ``[a-zA-Z0-9_.-]`` is replaced by
    an underscore, so path separators, whitespace and non-ASCII characters
    never reach the filesystem.

    Dots are kept, which means a value such as ``".."`` is returned
    unchanged; combine the result with a fixed prefix/suffix (and a
    containment check on the final path) before using it.

    :param s: arbitrary text, e.g. a user handle.
    :return: the sanitized string; it has the same length as *s* because
        each rejected character maps to exactly one ``_``.
    """
    return re.sub(r'[^a-zA-Z0-9_\.\-]', '_', s)

def _safe_join(base: str, *paths) -> str:
    """Join *paths* onto *base* and refuse results that escape *base*.

    Both the base and the joined result are normalized with
    ``os.path.abspath``. The check is therefore purely lexical: ``..``
    segments are collapsed, but symbolic links are not resolved.

    :param base: directory the result must stay inside.
    :param paths: path components to append to *base*. An absolute
        component makes ``os.path.join`` discard *base*, which the
        containment check below then rejects.
    :return: the absolute joined path (equal to the absolute *base* when no
        components are given).
    :raises ValueError: if the joined path is not *base* itself or a
        descendant of it.
    """
    base_abspath = os.path.abspath(base)
    path = os.path.abspath(os.path.join(base_abspath, *paths))
    # commonpath compares whole path components, so a sibling such as
    # "<base>2" is not mistaken for a child of "<base>".
    if os.path.commonpath([base_abspath, path]) != base_abspath:
        raise ValueError("Detected path traversal attempt (outside configured directory)")
    return path

class Profile:
def __init__(self):
self.utils = Utils()
Expand All @@ -28,11 +44,17 @@ def create_social_profile_tweepy(self, map_name: str, map_description: str, user
:param outdir: specified directory where the tweets (in JSON format) should go
"""
lookup_amount = 10000
# Sanitize and normalize outdir
outdir = os.path.abspath(outdir) if outdir else os.path.abspath("data")
if not os.path.exists(outdir):
os.makedirs(outdir, exist_ok=True)
for user in users:
sanitized_user = safe_filename(user)
tweets = [{"text": p.clean(tweet["full_text"]), "created_at": tweet["created_at"]} for tweet in
self.utils.user_lookup(user, lookup_amount)]
with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
for idx, tweet in enumerate(tweets):
tweet_file_path = _safe_join(outdir, f'{sanitized_user}_tweets.jsonl')
with jsonlines.open(tweet_file_path, mode='a') as writer:
for idx, tweet in enumerate(tweets[:]):
if len(tweet["text"]) < 10:
tweets.pop(idx)
continue
Expand Down Expand Up @@ -66,26 +88,31 @@ def create_social_profile_sns(self,
"""
all_tweets = []

# Sanitize and normalize outdir
outdir = os.path.abspath(outdir) if outdir else os.path.abspath("data")
if not os.path.exists(outdir):
os.makedirs(outdir, exist_ok=True)

for user in tqdm(users):
sanitized_user = safe_filename(user)
try:
logger.info(f"Loading {user}'s tweets from disk")
data_path = os.path.join(outdir, f"{user}_tweets.jsonl")
data_path = _safe_join(outdir, f"{sanitized_user}_tweets.jsonl")
with jsonlines.open(data_path, mode="r") as tweets:
for tweet in tweets:
all_tweets.append(tweet)
except BaseException:
logger.info(f"Not on disk! scraping {users}'s tweets now")
tweets = self.utils.user_lookup_sns(user, 10000)
with jsonlines.open(f'{outdir}/{user}_tweets.jsonl', mode='a') as writer:
tweet_file_path = _safe_join(outdir, f'{sanitized_user}_tweets.jsonl')
with jsonlines.open(tweet_file_path, mode='a') as writer:
for idx, tweet in enumerate(tweets):
tweet["full_text"] = p.clean(tweet["full_text"])
if len(tweet["full_text"]) > 30:
tweet["created_at"] = str(tweet["created_at"])
all_tweets.append(tweet)
writer.write(tweet)


for idx, tweet in enumerate(all_tweets):
tweet["id"] = str(idx)

Expand All @@ -94,7 +121,12 @@ def create_social_profile_sns(self,
for n_clusters in n_cluster_docs:
logger.info(f"computing {n_clusters} cluster layer")
try:
with open(f"data/cluster_labels/{users[0]}_id_to_cluster_label_{n_clusters}", "r") as f:
cluster_labels_dir = os.path.abspath("data/cluster_labels")
if not os.path.exists(cluster_labels_dir):
os.makedirs(cluster_labels_dir, exist_ok=True)
sanitized_user0 = safe_filename(users[0])
cluster_path = _safe_join(cluster_labels_dir, f"{sanitized_user0}_id_to_cluster_label_{n_clusters}")
with open(cluster_path, "r") as f:
id_to_cluster_label = json.load(f)
logger.info("Loaded all resources from disk")
print(id_to_cluster_label[-1])
Expand All @@ -117,7 +149,12 @@ def create_social_profile_sns(self,
for datum, cluster_id in zip(all_tweets, [int(i) for i in list(kmeans.labels_)]):
id_to_cluster_label[datum['id']] = cluster_id

with open(f'data/cluster_labels/{users[0]}_id_to_cluster_label_{n_clusters}', 'w') as f:
cluster_labels_dir = os.path.abspath("data/cluster_labels")
if not os.path.exists(cluster_labels_dir):
os.makedirs(cluster_labels_dir, exist_ok=True)
sanitized_user0 = safe_filename(users[0])
cluster_path = _safe_join(cluster_labels_dir, f"{sanitized_user0}_id_to_cluster_label_{n_clusters}")
with open(cluster_path, 'w') as f:
json.dump(id_to_cluster_label, f)
print(len(all_tweets))
logger.info("Computing Topics")
Expand All @@ -144,4 +181,4 @@ def create_social_profile_sns(self,
map_description="A social profile of the latest POTUS Joe Biden, with Nomic's text embedder created by Yuvanesh Anand",
users=["JoeBiden", "POTUS"],
topics=True,
embedding_path="embeddings/JoeBiden.npy")
embedding_path="embeddings/JoeBiden.npy")