diff --git a/app/routes/account_settings.py b/app/routes/account_settings.py index 47349a72b..f89ba9935 100755 --- a/app/routes/account_settings.py +++ b/app/routes/account_settings.py @@ -2,30 +2,24 @@ from models import User from utils.delete import delete_user -from utils.log import Log +from utils.route_guards import login_required account_settings_blueprint = Blueprint("account_settings", __name__) @account_settings_blueprint.route("/account-settings", methods=["GET", "POST"]) +@login_required("account settings", redirect_to="/login/redirect=&account-settings") def account_settings(): - if "username" in session: - user = User.query.filter_by(username=session["username"]).first() + user = User.query.filter_by(username=session["username"]).first() - if not user: - return redirect("/") + if not user: + return redirect("/") - if request.method == "POST": - delete_user(user.username) - return redirect("/") + if request.method == "POST": + delete_user(user.username) + return redirect("/") - return render_template( - "account_settings.html", - user=(user.username,), - ) - else: - Log.error( - f"{request.remote_addr} tried to reach account settings without being logged in" - ) - - return redirect("/login/redirect=&account-settings") + return render_template( + "account_settings.html", + user=(user.username,), + ) diff --git a/app/routes/admin_panel.py b/app/routes/admin_panel.py index b5f4633b9..ac1022775 100755 --- a/app/routes/admin_panel.py +++ b/app/routes/admin_panel.py @@ -1,32 +1,16 @@ -from flask import Blueprint, redirect, render_template, request, session +from flask import Blueprint, render_template, session -from models import User from utils.log import Log +from utils.route_guards import admin_required admin_panel_blueprint = Blueprint("admin_panel", __name__) @admin_panel_blueprint.route("/admin") +@admin_required("admin panel") def admin_panel(): - if "username" in session: - user = User.query.filter_by(username=session["username"]).first() + Log.info(f"Admin: {session['username']} reached to the admin panel") - if not user: - return redirect("/") + Log.info("Rendering admin_panel.html: params: None") - if user.role == "admin": - Log.info(f"Admin: {session['username']} reached to the admin panel") - - Log.info("Rendering admin_panel.html: params: None") - - return render_template("admin_panel.html") - else: - Log.error( - f"{request.remote_addr} tried to reach admin panel without being admin" - ) - - return redirect("/") - else: - Log.error(f"{request.remote_addr} tried to reach admin panel being logged in") - - return redirect("/") + return render_template("admin_panel.html") diff --git a/app/routes/admin_panel_comments.py b/app/routes/admin_panel_comments.py index 78dfb02f8..c61319914 100755 --- a/app/routes/admin_panel_comments.py +++ b/app/routes/admin_panel_comments.py @@ -1,52 +1,34 @@ from flask import ( Blueprint, - redirect, render_template, - request, session, ) -from models import Comment, User +from models import Comment from utils.log import Log from utils.paginate import paginate_query +from utils.route_guards import admin_required admin_panel_comments_blueprint = Blueprint("admin_panel_comments", __name__) @admin_panel_comments_blueprint.route("/admin/comments", methods=["GET", "POST"]) +@admin_required("comment admin panel") def admin_panel_comments(): - if "username" in session: - user = User.query.filter_by(username=session["username"]).first() - - if not user or user.role != "admin": - Log.error( - f"{request.remote_addr} tried to reach comment admin panel without being admin" - ) - return redirect("/") - - Log.info(f"Admin: {session['username']} reached to comments admin panel") - - query = Comment.query.order_by(Comment.time_stamp.desc()) - comments_objects, page, total_pages = paginate_query(query) - - comments = [ - (c.id, c.post_id, c.comment, c.username, c.time_stamp) - for c in comments_objects - ] - - Log.info( - f"Rendering admin_panel_comments.html: params: comments={len(comments)}" - ) - - return render_template( - "admin_panel_comments.html", - comments=comments, - page=page, - total_pages=total_pages, - ) - else: - Log.error( - f"{request.remote_addr} tried to reach comment admin panel being logged in" - ) - - return redirect("/") + Log.info(f"Admin: {session['username']} reached to comments admin panel") + + query = Comment.query.order_by(Comment.time_stamp.desc()) + comments_objects, page, total_pages = paginate_query(query) + + comments = [ + (c.id, c.post_id, c.comment, c.username, c.time_stamp) for c in comments_objects + ] + + Log.info(f"Rendering admin_panel_comments.html: params: comments={len(comments)}") + + return render_template( + "admin_panel_comments.html", + comments=comments, + page=page, + total_pages=total_pages, + ) diff --git a/app/routes/admin_panel_posts.py b/app/routes/admin_panel_posts.py index 92cd3f789..78cb28bd5 100755 --- a/app/routes/admin_panel_posts.py +++ b/app/routes/admin_panel_posts.py @@ -1,66 +1,51 @@ from flask import ( Blueprint, - redirect, render_template, - request, session, ) -from models import Post, User +from models import Post from utils.log import Log from utils.paginate import paginate_query +from utils.route_guards import admin_required admin_panel_posts_blueprint = Blueprint("admin_panel_posts", __name__) @admin_panel_posts_blueprint.route("/admin/posts", methods=["GET", "POST"]) +@admin_required("post admin panel") def admin_panel_posts(): - if "username" in session: - user = User.query.filter_by(username=session["username"]).first() - - if not user or user.role != "admin": - Log.error( - f"{request.remote_addr} tried to reach post admin panel without being admin" - ) - return redirect("/") - - Log.info(f"Admin: {session['username']} reached to posts admin panel") - - query = Post.query.order_by(Post.time_stamp.desc()) - posts_objects, page, total_pages = paginate_query(query) - - posts = [ - ( - p.id, - p.title, - p.tags, - p.content, - p.banner, - p.author, - p.views, - p.time_stamp, - p.last_edit_time_stamp, - p.category, - p.url_id, - p.abstract, - ) - for p in posts_objects - ] - - Log.info( - f"Rendering dashboard.html: params: posts={len(posts)} and show_posts=True" + Log.info(f"Admin: {session['username']} reached to posts admin panel") + + query = Post.query.order_by(Post.time_stamp.desc()) + posts_objects, page, total_pages = paginate_query(query) + + posts = [ + ( + p.id, + p.title, + p.tags, + p.content, + p.banner, + p.author, + p.views, + p.time_stamp, + p.last_edit_time_stamp, + p.category, + p.url_id, + p.abstract, ) - - return render_template( - "dashboard.html", - posts=posts, - show_posts=True, - page=page, - total_pages=total_pages, - ) - else: - Log.error( - f"{request.remote_addr} tried to reach post admin panel being logged in" - ) - - return redirect("/") + for p in posts_objects + ] + + Log.info( + f"Rendering dashboard.html: params: posts={len(posts)} and show_posts=True" + ) + + return render_template( + "dashboard.html", + posts=posts, + show_posts=True, + page=page, + total_pages=total_pages, + ) diff --git a/app/routes/admin_panel_users.py b/app/routes/admin_panel_users.py index f003d9a57..12bf8c378 100755 --- a/app/routes/admin_panel_users.py +++ b/app/routes/admin_panel_users.py @@ -1,8 +1,7 @@ from flask import ( Blueprint, - redirect, - render_template, request, + render_template, session, ) @@ -11,77 +10,54 @@ from utils.delete import delete_user from utils.log import Log from utils.paginate import paginate_query +from utils.route_guards import admin_required admin_panel_users_blueprint = Blueprint("admin_panel_users", __name__) @admin_panel_users_blueprint.route("/admin/users", methods=["GET", "POST"]) +@admin_required("user admin panel") def admin_panel_users(): - if "username" in session: - Log.info(f"Admin: {session['username']} reached to users admin panel") - - user = User.query.filter_by(username=session["username"]).first() + Log.info(f"Admin: {session['username']} reached to users admin panel") - if not user: - return redirect("/") - - if user.role != "admin": - Log.error( - f"{request.remote_addr} tried to reach user admin panel without being admin" + if request.method == "POST": + if "user_delete_button" in request.form: + Log.info( + f"Admin: {session['username']} deleted user: {request.form['username']}" ) - return redirect("/") - - if request.method == "POST": - if "user_delete_button" in request.form: - Log.info( - f"Admin: {session['username']} deleted user: {request.form['username']}" - ) - - delete_user(request.form["username"]) - if "user_role_change_button" in request.form: - Log.info( - f"Admin: {session['username']} changed {request.form['username']}'s role" - ) + delete_user(request.form["username"]) - change_user_role(request.form["username"]) - - if user.role == "admin": - query = User.query - users_objects, page, total_pages = paginate_query(query) - - users = [ - ( - u.user_id, - u.username, - u.email, - u.password, - u.profile_picture, - u.role, - u.points, - u.time_stamp, - u.is_verified, - ) - for u in users_objects - ] - - Log.info(f"Rendering admin_panel_users.html: params: users={len(users)}") - - return render_template( - "admin_panel_users.html", - users=users, - page=page, - total_pages=total_pages, - ) - else: - Log.error( - f"{request.remote_addr} tried to reach user admin panel without being admin" + if "user_role_change_button" in request.form: + Log.info( + f"Admin: {session['username']} changed {request.form['username']}'s role" ) - return redirect("/") - else: - Log.error( - f"{request.remote_addr} tried to reach user admin panel being logged in" + change_user_role(request.form["username"]) + + query = User.query + users_objects, page, total_pages = paginate_query(query) + + users = [ + ( + u.user_id, + u.username, + u.email, + u.password, + u.profile_picture, + u.role, + u.points, + u.time_stamp, + u.is_verified, ) + for u in users_objects + ] + + Log.info(f"Rendering admin_panel_users.html: params: users={len(users)}") - return redirect("/") + return render_template( + "admin_panel_users.html", + users=users, + page=page, + total_pages=total_pages, + ) diff --git a/app/routes/change_password.py b/app/routes/change_password.py index da1732e1d..cf255ed8b 100755 --- a/app/routes/change_password.py +++ b/app/routes/change_password.py @@ -12,11 +12,17 @@ from utils.flash_message import flash_message from utils.forms.change_password_form import ChangePasswordForm from utils.log import Log +from utils.route_guards import login_required change_password_blueprint = Blueprint("change_password", __name__) @change_password_blueprint.route("/change-password", methods=["GET", "POST"]) +@login_required( + "change password", + redirect_to="/login/redirect=change-password", + flash_page="change_password", +) def change_password(): """ This function is the route for the change password page. @@ -29,80 +35,68 @@ def change_password(): render_template: a rendered template with the form """ - if "username" in session: - form = ChangePasswordForm(request.form) + form = ChangePasswordForm(request.form) + + if request.method == "POST": + old_password = request.form["old_password"] + password = request.form["password"] + password_confirm = request.form["password_confirm"] - if request.method == "POST": - old_password = request.form["old_password"] - password = request.form["password"] - password_confirm = request.form["password_confirm"] + user = User.query.filter_by(username=session["username"]).first() - user = User.query.filter_by(username=session["username"]).first() + if not user: + flash_message( + page="change_password", + message="login", + category="error", + language=session["language"], + ) + return redirect("/login/redirect=change-password") - if not user: + if encryption.verify(old_password, user.password): + if old_password == password: flash_message( page="change_password", - message="login", + message="same", category="error", language=session["language"], ) - return redirect("/login/redirect=change-password") - - if encryption.verify(old_password, user.password): - if old_password == password: - flash_message( - page="change_password", - message="same", - category="error", - language=session["language"], - ) - - if password != password_confirm: - flash_message( - page="change_password", - message="match", - category="error", - language=session["language"], - ) - - if old_password != password and password == password_confirm: - user.password = encryption.hash(password) - db.session.commit() - - Log.success( - f'User: "{session["username"]}" changed his password', - ) - - session.clear() - flash_message( - page="change_password", - message="success", - category="success", - language=session["language"], - ) - - return redirect("/login/redirect=&") - else: + + if password != password_confirm: flash_message( page="change_password", - message="old", + message="match", category="error", language=session["language"], ) - return render_template( - "change_password.html", - form=form, - ) - else: - Log.error( - f"{request.remote_addr} tried to change his password without logging in" - ) - flash_message( - page="change_password", - message="login", - category="error", - language=session["language"], - ) - - return redirect("/login/redirect=change-password") + if old_password != password and password == password_confirm: + user.password = encryption.hash(password) + db.session.commit() + + Log.success( + f'User: "{session["username"]}" changed his password', + ) + + language = session.get("language", "en") + session.clear() + flash_message( + page="change_password", + message="success", + category="success", + language=language, + ) + + return redirect("/login/redirect=&") + else: + flash_message( + page="change_password", + message="old", + category="error", + language=session["language"], + ) + + return render_template( + "change_password.html", + form=form, + ) diff --git a/app/routes/change_profile_picture.py b/app/routes/change_profile_picture.py index 77e6f85eb..477334d51 100755 --- a/app/routes/change_profile_picture.py +++ b/app/routes/change_profile_picture.py @@ -11,6 +11,7 @@ from utils.flash_message import flash_message from utils.forms.change_profile_picture_form import ChangeProfilePictureForm from utils.log import Log +from utils.route_guards import login_required change_profile_picture_blueprint = Blueprint("change_profile_picture", __name__) @@ -18,6 +19,11 @@ @change_profile_picture_blueprint.route( "/change-profile-picture", methods=["GET", "POST"] ) +@login_required( + "change profile picture", + redirect_to="/login/redirect=change-profile-picture", + flash_page="change_profile_picture", +) def change_profile_picture(): """ This function is the route for the change profile picture page. @@ -30,45 +36,32 @@ def change_profile_picture(): render_template: a rendered template with the form """ - if "username" in session: - form = ChangeProfilePictureForm(request.form) - - if request.method == "POST": - new_profile_picture_seed = request.form["new_profile_picture_seed"] - new_profile_picture = f"https://api.dicebear.com/7.x/identicon/svg?seed={new_profile_picture_seed}&radius=10" + form = ChangeProfilePictureForm(request.form) - user = User.query.filter_by(username=session["username"]).first() + if request.method == "POST": + new_profile_picture_seed = request.form["new_profile_picture_seed"] + new_profile_picture = f"https://api.dicebear.com/7.x/identicon/svg?seed={new_profile_picture_seed}&radius=10" - if user: - user.profile_picture = new_profile_picture - db.session.commit() + user = User.query.filter_by(username=session["username"]).first() - Log.success( - f"User: {session['username']} changed his profile picture", - ) + if user: + user.profile_picture = new_profile_picture + db.session.commit() - flash_message( - page="change_profile_picture", - message="success", - category="success", - language=session["language"], - ) + Log.success( + f"User: {session['username']} changed his profile picture", + ) - return redirect("/account-settings") + flash_message( + page="change_profile_picture", + message="success", + category="success", + language=session["language"], + ) - return render_template( - "change_profile_picture.html", - form=form, - ) - else: - Log.error( - f"{request.remote_addr} tried to change his profile picture without logging in" - ) - flash_message( - page="change_profile_picture", - message="login", - category="error", - language=session["language"], - ) + return redirect("/account-settings") - return redirect("/login/redirect=change-profile-picture") + return render_template( + "change_profile_picture.html", + form=form, + ) diff --git a/app/routes/change_username.py b/app/routes/change_username.py index 2a3adc649..b5c2c1f2a 100755 --- a/app/routes/change_username.py +++ b/app/routes/change_username.py @@ -6,17 +6,23 @@ session, ) from sqlalchemy import func +from utils.forms.change_username_form import ChangeUserNameForm from database import db from models import Comment, Post, User from utils.flash_message import flash_message -from utils.forms.change_user_name_form import ChangeUserNameForm from utils.log import Log +from utils.route_guards import login_required change_username_blueprint = Blueprint("change_username", __name__) @change_username_blueprint.route("/change-username", methods=["GET", "POST"]) +@login_required( + "change username", + redirect_to="/login/redirect=change-username", + flash_page="change_username", +) def change_username(): """ This function is the route for the change username page. @@ -29,73 +35,58 @@ def change_username(): render_template: a rendered template with the form """ - if "username" in session: - form = ChangeUserNameForm(request.form) - - if request.method == "POST": - new_username = request.form["new_username"] - new_username = new_username.replace(" ", "") - - # Check if new username already exists - existing_user = User.query.filter( - func.lower(User.username) == new_username.lower() - ).first() - - if not existing_user: - old_username = session["username"] - - # Update username in users table - user = User.query.filter_by(username=old_username).first() - if user: - user.username = new_username - - # Update author in posts table - Post.query.filter_by(author=old_username).update( - {"author": new_username} - ) - - # Update username in comments table - Comment.query.filter_by(username=old_username).update( - {"username": new_username} - ) - - db.session.commit() - - Log.success( - f"User: {old_username} changed his username to {new_username}", - ) - - session["username"] = new_username - flash_message( - page="change_username", - message="success", - category="success", - language=session["language"], - ) - - return redirect("/account-settings") - else: - Log.error(f'User: "{new_username}" already exists') - flash_message( - page="change_username", - message="exists", - category="error", - language=session["language"], - ) - - return render_template( - "change_username.html", - form=form, - ) - else: - Log.error( - f"{request.remote_addr} tried to change his username without logging in" - ) - flash_message( - page="change_username", - message="login", - category="error", - language=session["language"], - ) - - return redirect("/login/redirect=change-username") + form = ChangeUserNameForm(request.form) + + if request.method == "POST": + new_username = request.form["new_username"] + new_username = new_username.replace(" ", "") + + # Check if new username already exists + existing_user = User.query.filter( + func.lower(User.username) == new_username.lower() + ).first() + + if not existing_user: + old_username = session["username"] + + # Update username in users table + user = User.query.filter_by(username=old_username).first() + if user: + user.username = new_username + + # Update author in posts table + Post.query.filter_by(author=old_username).update({"author": new_username}) + + # Update username in comments table + Comment.query.filter_by(username=old_username).update( + {"username": new_username} + ) + + db.session.commit() + + Log.success( + f"User: {old_username} changed his username to {new_username}", + ) + + session["username"] = new_username + flash_message( + page="change_username", + message="success", + category="success", + language=session["language"], + ) + + return redirect("/account-settings") + else: + Log.error(f'User: "{new_username}" already exists') + flash_message( + page="change_username", + message="taken", + category="error", + language=session["language"], + ) + + return render_template( + "change_username.html", + form=form, + ) diff --git a/app/routes/create_post.py b/app/routes/create_post.py index ef21b8411..978eb76bd 100755 --- a/app/routes/create_post.py +++ b/app/routes/create_post.py @@ -13,12 +13,18 @@ from utils.forms.create_post_form import CreatePostForm from utils.generate_url_id_from_post import generate_url_id from utils.log import Log +from utils.route_guards import login_required from utils.time import current_time_stamp create_post_blueprint = Blueprint("create_post", __name__) @create_post_blueprint.route("/create-post", methods=["GET", "POST"]) +@login_required( + "create post", + redirect_to="/login/redirect=&create-post", + flash_page="create_post", +) def create_post(): """ This function creates a new post for the user. @@ -33,69 +39,59 @@ def create_post(): 401: If the user is not authenticated. """ - if "username" in session: - form = CreatePostForm(request.form) + form = CreatePostForm(request.form) - if request.method == "POST": - post_banner = request.files["post_banner"].read() + if request.method == "POST": + post_banner = request.files["post_banner"].read() - if not form.validate(): - flash_message( - page="create_post", - message="empty", - category="error", - language=session["language"], - ) - Log.error( - f'User: "{session["username"]}" tried to create a post with ' - f"invalid data: {form.errors}", - ) - else: - post_title = form.post_title.data - post_tags = form.post_tags.data - post_abstract = form.post_abstract.data - post_content = form.post_content.data - post_category = form.post_category.data + if not form.validate(): + flash_message( + page="create_post", + message="empty", + category="error", + language=session["language"], + ) + Log.error( + f'User: "{session["username"]}" tried to create a post with ' + f"invalid data: {form.errors}", + ) + else: + post_title = form.post_title.data + post_tags = form.post_tags.data + post_abstract = form.post_abstract.data + post_content = form.post_content.data + post_category = form.post_category.data - new_post = Post( - title=post_title, - tags=post_tags, - content=post_content, - banner=post_banner, - author=session["username"], - views=0, - time_stamp=current_time_stamp(), - last_edit_time_stamp=current_time_stamp(), - category=post_category, - url_id=generate_url_id(), - abstract=post_abstract, - ) - db.session.add(new_post) - db.session.commit() + new_post = Post( + title=post_title, + tags=post_tags, + content=post_content, + banner=post_banner, + author=session["username"], + views=0, + time_stamp=current_time_stamp(), + last_edit_time_stamp=current_time_stamp(), + category=post_category, + url_id=generate_url_id(), + abstract=post_abstract, + ) + db.session.add(new_post) + db.session.commit() - Log.success( - f'Post: "{post_title}" posted by "{session["username"]}"', - ) + Log.success( + f'Post: "{post_title}" posted by "{session["username"]}"', + ) - add_points(20, session["username"]) - flash_message( - page="create_post", - message="success", - category="success", - language=session["language"], - ) - return redirect("/") + add_points(20, session["username"]) + flash_message( + page="create_post", + message="success", + category="success", + language=session["language"], + ) + return redirect("/") - return render_template( - "create_post.html", - form=form, - ) - else: - Log.error(f"{request.remote_addr} tried to create a new post without login") - flash_message( - page="create_post", - message="login", - category="error", - language=session["language"], - ) - return redirect("/login/redirect=&create-post") + return render_template( + "create_post.html", + form=form, + ) diff --git a/app/routes/dashboard.py b/app/routes/dashboard.py index 3b6b39253..e154c7e7e 100755 --- a/app/routes/dashboard.py +++ b/app/routes/dashboard.py @@ -12,94 +12,88 @@ from models import Comment, Post from utils.delete import delete_post -from utils.flash_message import flash_message from utils.log import Log from utils.paginate import paginate_query +from utils.route_guards import login_required dashboard_blueprint = Blueprint("dashboard", __name__) @dashboard_blueprint.route("/dashboard/", methods=["GET", "POST"]) +@login_required( + "dashboard", + redirect_to="/login/redirect=&dashboard&user", + flash_page="dashboard", +) def dashboard(username): - if "username" in session: - if session["username"].lower() == username.lower(): - if request.method == "POST": - if "post_delete_button" in request.form: - delete_post(request.form["post_id"], session.get("username")) - - return ( - redirect(url_for("dashboard.dashboard", username=username)), - 301, - ) - - query = Post.query.filter_by(author=session["username"]).order_by( - Post.time_stamp.desc() - ) - posts_objects, page, total_pages = paginate_query(query) - - posts = [ - [ - p.id, - p.title, - p.tags, - p.content, - p.banner, - p.author, - p.views, - p.time_stamp, - p.last_edit_time_stamp, - p.category, - p.url_id, - p.abstract, - ] - for p in posts_objects + if session["username"].lower() == username.lower(): + if request.method == "POST": + if "post_delete_button" in request.form: + delete_post(request.form["post_id"], session.get("username")) + + return ( + redirect(url_for("dashboard.dashboard", username=username)), + 301, + ) + + query = Post.query.filter_by(author=session["username"]).order_by( + Post.time_stamp.desc() + ) + posts_objects, page, total_pages = paginate_query(query) + + posts = [ + [ + p.id, + p.title, + p.tags, + p.content, + p.banner, + p.author, + p.views, + p.time_stamp, + p.last_edit_time_stamp, + p.category, + p.url_id, + p.abstract, ] + for p in posts_objects + ] - comments_objects = ( - Comment.query.filter(func.lower(Comment.username) == username.lower()) - .order_by(Comment.time_stamp.desc()) - .all() - ) + comments_objects = ( + Comment.query.filter(func.lower(Comment.username) == username.lower()) + .order_by(Comment.time_stamp.desc()) + .all() + ) - comments = [ - (c.id, c.post_id, c.comment, c.username, c.time_stamp) - for c in comments_objects - ] + comments = [ + (c.id, c.post_id, c.comment, c.username, c.time_stamp) + for c in comments_objects + ] + + show_posts = len(posts) > 0 + show_comments = len(comments) > 0 - show_posts = len(posts) > 0 - show_comments = len(comments) > 0 - - language = session.get("language") - translation_file = f"./translations/{language}.json" - - with open(translation_file, "r", encoding="utf-8") as file: - translations = load(file) - - for post in posts: - post[9] = translations["categories"][post[9].lower()] - - return render_template( - "/dashboard.html", - posts=posts, - comments=comments, - show_posts=show_posts, - show_comments=show_comments, - page=page, - total_pages=total_pages, - ) - else: - Log.error( - f'User: "{session["username"]}" tried to login to another users dashboard', - ) - - return redirect(f"/dashboard/{session['username'].lower()}") + language = session.get("language") + translation_file = f"./translations/{language}.json" + + with open(translation_file, "r", encoding="utf-8") as file: + translations = load(file) + + for post in posts: + post[9] = translations["categories"][post[9].lower()] + + return render_template( + "/dashboard.html", + posts=posts, + comments=comments, + show_posts=show_posts, + show_comments=show_comments, + page=page, + total_pages=total_pages, + ) else: - Log.error(f"{request.remote_addr} tried to access the dashboard without login") - flash_message( - page="dashboard", - message="login", - category="error", - language=session["language"], + Log.error( + f'User: "{session["username"]}" tried to login to another users dashboard', ) - return redirect("/login/redirect=&dashboard&user") + return redirect(f"/dashboard/{session['username'].lower()}") diff --git a/app/routes/edit_post.py b/app/routes/edit_post.py index 4c6bf7a8f..9cd682812 100755 --- a/app/routes/edit_post.py +++ b/app/routes/edit_post.py @@ -11,12 +11,18 @@ from utils.flash_message import flash_message from utils.forms.create_post_form import CreatePostForm from utils.log import Log +from utils.route_guards import login_required from utils.time import current_time_stamp edit_post_blueprint = Blueprint("edit_post", __name__) @edit_post_blueprint.route("/edit-post/", methods=["GET", "POST"]) +@login_required( + "edit post", + redirect_to=lambda url_id: f"/login/redirect=&edit-post&{url_id}", + flash_page="edit_post", +) def edit_post(url_id): """ This function handles the edit post route. @@ -32,92 +38,79 @@ def edit_post(url_id): abort(401): if the user is not authorized to edit the post """ - if "username" in session: - post = Post.query.filter_by(url_id=url_id).first() - - if post: - Log.success(f'POST: "{url_id}" FOUND') - - if ( - post.author == session["username"] - or session.get("user_role") == "admin" - ): - form = CreatePostForm(request.form) - form.post_title.data = post.title - form.post_tags.data = post.tags - form.post_abstract.data = post.abstract - form.post_content.data = post.content - form.post_category.data = post.category - - if request.method == "POST": - post_title = request.form["post_title"] - post_tags = request.form["post_tags"] - post_content = request.form["post_content"] - post_abstract = request.form["post_abstract"] - post_category = request.form["post_category"] - post_banner = request.files["post_banner"].read() - - if post_content == "" or post_abstract == "": - flash_message( - page="edit_post", - message="empty", - category="error", - language=session["language"], - ) - Log.error( - f'User: "{session["username"]}" tried to edit a post with empty content', - ) - else: - post.title = post_title - post.tags = post_tags - post.content = post_content - post.abstract = post_abstract - post.category = post_category - - if post_banner != b"": - post.banner = post_banner - - post.last_edit_time_stamp = current_time_stamp() - - db.session.commit() - - Log.success(f'Post: "{post_title}" edited') - flash_message( - page="edit_post", - message="success", - category="success", - language=session["language"], - ) - return redirect(f"/post/{post.url_id}") - - return render_template( - "/edit_post.html", - id=post.id, - title=post.title, - tags=post.tags, - content=post.content, - form=form, - ) - else: - flash_message( - page="edit_post", - message="author", - category="error", - language=session["language"], - ) - Log.error( - f'User: "{session["username"]}" tried to edit another authors post', - ) - return redirect("/") + post = Post.query.filter_by(url_id=url_id).first() + + if post: + Log.success(f'POST: "{url_id}" FOUND') + + if post.author == session["username"] or session.get("user_role") == "admin": + form = CreatePostForm(request.form) + form.post_title.data = post.title + form.post_tags.data = post.tags + form.post_abstract.data = post.abstract + form.post_content.data = post.content + form.post_category.data = post.category + + if request.method == "POST": + post_title = request.form["post_title"] + post_tags = request.form["post_tags"] + post_content = request.form["post_content"] + post_abstract = request.form["post_abstract"] + post_category = request.form["post_category"] + post_banner = request.files["post_banner"].read() + + if post_content == "" or post_abstract == "": + flash_message( + page="edit_post", + message="empty", + category="error", + language=session["language"], + ) + Log.error( + f'User: "{session["username"]}" tried to edit a post with empty content', + ) + else: + post.title = post_title + post.tags = post_tags + post.content = post_content + post.abstract = post_abstract + post.category = post_category + + if post_banner != b"": + post.banner = post_banner + + post.last_edit_time_stamp = current_time_stamp() + + db.session.commit() + + Log.success(f'Post: "{post_title}" edited') + flash_message( + page="edit_post", + message="success", + category="success", + language=session["language"], + ) + return redirect(f"/post/{post.url_id}") + + return render_template( + "/edit_post.html", + id=post.id, + title=post.title, + tags=post.tags, + content=post.content, + form=form, + ) else: - Log.error(f'Post: "{url_id}" not found') - return redirect("/not-found") + flash_message( + page="edit_post", + message="author", + category="error", + language=session["language"], + ) + Log.error( + f'User: "{session["username"]}" tried to edit another authors post', + ) + return redirect("/") else: - Log.error(f"{request.remote_addr} tried to edit post without login") - flash_message( - page="edit_post", - message="login", - category="error", - language=session["language"], - ) - return redirect(f"/login/redirect=&edit-post&{url_id}") + Log.error(f'Post: "{url_id}" not found') + return redirect("/not-found") diff --git a/app/routes/post.py b/app/routes/post.py index 05cf904fe..72f8d27e2 100755 --- a/app/routes/post.py +++ b/app/routes/post.py @@ -18,7 +18,6 @@ from utils.forms.comment_form import CommentForm from utils.generate_url_id_from_post import get_slug_from_post_title from utils.log import Log -from utils.sanitize_for_log import sanitize_for_log from utils.time import current_time_stamp post_blueprint = Blueprint("post", __name__) @@ -43,22 +42,22 @@ def post(url_id=None, slug=None): db.session.commit() if request.method == "POST": + if "username" not in session: + Log.error( + f'{request.remote_addr} attempted to submit to post "{url_id}" without login', + ) + return redirect(f"/login/redirect=&post&{url_id}") + if "post_delete_button" in request.form: if delete_post(post.id, session.get("username")): return redirect("/") + return redirect(url_for("post.post", url_id=url_id, slug=post_slug)) if "comment_delete_button" in request.form: - if delete_comment(request.form["comment_id"], session.get("username")): - return redirect(url_for("post.post", url_id=url_id)), 301 - - if "username" not in session: - safe_remote_addr = sanitize_for_log(request.remote_addr) - safe_url_id = sanitize_for_log(url_id) - Log.error( - f"{safe_remote_addr} tried to comment on post: " - f'"{safe_url_id}" without logging in', - ) - return redirect(f"/login/redirect=&post&{url_id}") + delete_comment(request.form["comment_id"], session.get("username")) + return redirect( + url_for("post.post", url_id=url_id, slug=post_slug) + ), 301 comment_text = escape(request.form["comment"]) diff --git a/app/routes/verify_user.py b/app/routes/verify_user.py index d1123de3f..904c32e1d 100755 --- a/app/routes/verify_user.py +++ b/app/routes/verify_user.py @@ -18,6 +18,7 @@ from utils.flash_message import flash_message from utils.forms.verify_user_form import VerifyUserForm from utils.log import Log +from utils.route_guards import login_required verify_user_blueprint = Blueprint("verify_user", __name__) @@ -25,6 +26,7 @@ @verify_user_blueprint.route( "/verify-user/codesent=", methods=["GET", "POST"] ) +@login_required("verify user") def verify_user(code_sent): """ This function handles the verification of the user's account. @@ -37,67 +39,66 @@ def verify_user(code_sent): """ - if "username" in session: - username = session["username"] - - user = User.query.filter(func.lower(User.username) == username.lower()).first() - - if not user: - return redirect("/") - - if user.is_verified == "True": - return redirect("/") - elif user.is_verified == "False": - form = VerifyUserForm(request.form) - - if code_sent == "true": - if request.method == "POST": - code = request.form["code"] - - if code == session.get("verification_code"): - user.is_verified = "True" - db.session.commit() - - Log.success(f'User: "{username}" has been verified') - flash_message( - page="verify_user", - message="success", - category="success", - language=session["language"], - ) - return redirect("/") - else: - flash_message( - page="verify_user", - message="wrong", - category="error", - language=session["language"], - ) - - return render_template( - "verify_user.html", - form=form, - mail_sent=True, - ) - elif code_sent == "false": - if request.method == "POST": - if user: - context = ssl.create_default_context() - server = smtplib.SMTP(Settings.SMTP_SERVER, Settings.SMTP_PORT) - server.ehlo() - server.starttls(context=context) - server.ehlo() - server.login(Settings.SMTP_MAIL, Settings.SMTP_PASSWORD) - - verification_code = str(randint(1000, 9999)) - session["verification_code"] = verification_code - - message = EmailMessage() - message.set_content( - f"Hi {username},\nHere is your account verification code:\n{verification_code}" - ) - message.add_alternative( - f"""\ + username = session["username"] + + user = User.query.filter(func.lower(User.username) == username.lower()).first() + + if not user: + return redirect("/") + + if user.is_verified == "True": + return redirect("/") + elif user.is_verified == "False": + form = VerifyUserForm(request.form) + + if code_sent == "true": + if request.method == "POST": + code = request.form["code"] + + if code == session.get("verification_code"): + user.is_verified = "True" + db.session.commit() + + Log.success(f'User: "{username}" has been verified') + flash_message( + page="verify_user", + message="success", + category="success", + language=session["language"], + ) + return redirect("/") + else: + flash_message( + page="verify_user", + message="wrong", + category="error", + language=session["language"], + ) + + return render_template( + "verify_user.html", + form=form, + mail_sent=True, + ) + elif code_sent == "false": + if request.method == "POST": + if user: + context = ssl.create_default_context() + server = smtplib.SMTP(Settings.SMTP_SERVER, Settings.SMTP_PORT) + server.ehlo() + server.starttls(context=context) + server.ehlo() + server.login(Settings.SMTP_MAIL, Settings.SMTP_PASSWORD) + + verification_code = str(randint(1000, 9999)) + session["verification_code"] = verification_code + + message = EmailMessage() + message.set_content( + f"Hi {username},\nHere is your account verification code:\n{verification_code}" + ) + message.add_alternative( + f"""\
""", - subtype="html", - ) - message["Subject"] = f"Verify your {Settings.APP_NAME} account!" - message["From"] = Settings.SMTP_MAIL - message["To"] = user.email - - server.send_message(message) - server.quit() - Log.success( - f'Verification code sent to "{user.email}" for user: "{username}"' - ) - - return redirect("/verify-user/codesent=true") - - return render_template( - "verify_user.html", - form=form, - mail_sent=False, - ) - else: - Log.error(f"{request.remote_addr} tried to verify user without being logged in") - return redirect("/") + subtype="html", + ) + message["Subject"] = f"Verify your {Settings.APP_NAME} account!" + message["From"] = Settings.SMTP_MAIL + message["To"] = user.email + + server.send_message(message) + server.quit() + Log.success( + f'Verification code sent to "{user.email}" for user: "{username}"' + ) + + return redirect("/verify-user/codesent=true") + + return render_template( + "verify_user.html", + form=form, + mail_sent=False, + ) diff --git a/app/templates/change_username.html b/app/templates/change_username.html index f20b5fd54..8fe149e1b 100755 --- a/app/templates/change_username.html +++ b/app/templates/change_username.html @@ -1,5 +1,5 @@ {% extends 'layout.html' %} {% block head %} -{{translations.change_user_name.title}} +{{translations.change_username.title}} {% endblock head %} {% block body %}
@@ -8,13 +8,13 @@

- {{translations.change_user_name.title}} + {{translations.change_username.title}}

- - {{ form.new_username(class_="input input-bordered w-full", placeholder=translations.change_user_name.placeholder) }} + + {{ form.new_username(class_="input input-bordered w-full", placeholder=translations.change_username.placeholder) }}
diff --git a/app/translations/de.json b/app/translations/de.json index 688d95517..5764ee3c9 100644 --- a/app/translations/de.json +++ b/app/translations/de.json @@ -83,7 +83,7 @@ "set": "Profilbild festlegen", "placeholder": "Seed für Profilbild eingeben" }, - "change_user_name": { + "change_username": { "title": "Benutzernamen ändern", "placeholder": "Neuer Benutzername", "change": "Benutzernamen ändern" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "Profilbild wurde geändert." }, - "change_user_name": { + "change_username": { "same": "Das ist bereits Ihr Benutzername.", "success": "Benutzername wurde geändert.", "taken": "Dieser Benutzername ist bereits vergeben.", diff --git a/app/translations/en.json b/app/translations/en.json index 762e07a9d..83991a9a7 100644 --- a/app/translations/en.json +++ b/app/translations/en.json @@ -83,7 +83,7 @@ "set": "Set Profile Picture", "placeholder": "Enter seed for profile picture" }, - "change_user_name": { + "change_username": { "title": "Change Username", "placeholder": "New Username", "change": "Change Username" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "Profile picture has been changed." }, - "change_user_name": { + "change_username": { "same": "This is already your username.", "success": "Username has been changed.", "taken": "This username is already taken.", diff --git a/app/translations/es.json b/app/translations/es.json index c144dd332..ca91e74d9 100644 --- a/app/translations/es.json +++ b/app/translations/es.json @@ -83,7 +83,7 @@ "set": "Establecer Foto de Perfil", "placeholder": "Ingrese semilla para la foto de perfil" }, - "change_user_name": { + "change_username": { "title": "Cambiar Nombre de Usuario", "placeholder": "Nuevo Nombre de Usuario", "change": "Cambiar Nombre de Usuario" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "La imagen de perfil ha sido cambiada." }, - "change_user_name": { + "change_username": { "same": "Este ya es tu nombre de usuario.", "success": "El nombre de usuario ha sido cambiado.", "taken": "Este nombre de usuario ya está en uso.", diff --git a/app/translations/fr.json b/app/translations/fr.json index 0bbee1b24..cfd71c8cf 100644 --- a/app/translations/fr.json +++ b/app/translations/fr.json @@ -83,7 +83,7 @@ "set": "Définir la photo de profil", "placeholder": "Entrez la graine pour la photo de profil" }, - "change_user_name": { + "change_username": { "title": "Changer le nom d'utilisateur", "placeholder": "Nouveau nom d'utilisateur", "change": "Changer le nom d'utilisateur" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "La photo de profil a été changée." }, - "change_user_name": { + "change_username": { "same": "C'est déjà votre nom d'utilisateur.", "success": "Le nom d'utilisateur a été modifié.", "taken": "Ce nom d'utilisateur est déjà pris.", diff --git a/app/translations/hi.json b/app/translations/hi.json index 64001ad09..213073c96 100644 --- a/app/translations/hi.json +++ b/app/translations/hi.json @@ -83,7 +83,7 @@ "set": "प्रोफ़ाइल चित्र सेट करें", "placeholder": "प्रोफ़ाइल चित्र के लिए सीड दर्ज करें" }, - "change_user_name": { + "change_username": { "title": "यूज़रनेम बदलें", "placeholder": "नया यूज़रनेम", "change": "यूज़रनेम बदलें" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "प्रोफ़ाइल चित्र बदल दिया गया है।" }, - "change_user_name": { + "change_username": { "same": "यह पहले से ही आपका उपयोगकर्ता नाम है।", "success": "उपयोगकर्ता नाम सफलतापूर्वक बदल दिया गया है।", "taken": "यह उपयोगकर्ता नाम पहले से लिया जा चुका है।", @@ -319,4 +319,4 @@ "contact": "हमसे संपर्क करें", "contact_text": "यदि आपके पास हमारी गोपनीयता नीति के बारे में कोई प्रश्न या सुझाव है, तो कृपया हमसे संपर्क करने में संकोच न करें। मेल:" } -} \ No newline at end of file +} diff --git a/app/translations/ja.json b/app/translations/ja.json index dc0c75f20..757132e5b 100644 --- a/app/translations/ja.json +++ b/app/translations/ja.json @@ -83,7 +83,7 @@ "set": "プロフィール写真を設定", "placeholder": "プロフィール写真のシードを入力" }, - "change_user_name": { + "change_username": { "title": "ユーザー名の変更", "placeholder": "新しいユーザー名", "change": "ユーザー名を変更" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "プロフィール写真が変更されました。" }, - "change_user_name": { + "change_username": { "same": "これはすでにあなたのユーザー名です。", "success": "ユーザー名が変更されました。", "taken": "このユーザー名はすでに使用されています。", diff --git a/app/translations/pl.json b/app/translations/pl.json index 11a52a3c1..17784cff6 100644 --- a/app/translations/pl.json +++ b/app/translations/pl.json @@ -83,7 +83,7 @@ "set": "Ustaw zdjęcie profilowe", "placeholder": "Wprowadź seed dla zdjęcia profilowego" }, - "change_user_name": { + "change_username": { "title": "Zmień nazwę użytkownika", "placeholder": "Nowa nazwa użytkownika", "change": "Zmień nazwę użytkownika" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "Zdjęcie profilowe zostało zmienione." }, - "change_user_name": { + "change_username": { "same": "To jest już twoja nazwa użytkownika.", "success": "Nazwa użytkownika została zmieniona.", "taken": "Ta nazwa użytkownika jest już zajęta.", diff --git a/app/translations/pt.json b/app/translations/pt.json index 76141327e..4931ea30b 100644 --- a/app/translations/pt.json +++ b/app/translations/pt.json @@ -83,7 +83,7 @@ "set": "Definir Foto de Perfil", "placeholder": "Digite a semente para a foto de perfil" }, - "change_user_name": { + "change_username": { "title": "Mudar Nome de Usuário", "placeholder": "Novo Nome de Usuário", "change": "Mudar Nome de Usuário" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "A foto de perfil foi alterada." }, - "change_user_name": { + "change_username": { "same": "Este já é seu nome de usuário.", "success": "O nome de usuário foi alterado.", "taken": "Este nome de usuário já está em uso.", diff --git a/app/translations/ru.json b/app/translations/ru.json index 94d077a72..a89b7a0de 100644 --- a/app/translations/ru.json +++ b/app/translations/ru.json @@ -83,7 +83,7 @@ "set": "Установить изображение профиля", "placeholder": "Введите семя для изображения профиля" }, - "change_user_name": { + "change_username": { "title": "Сменить имя пользователя", "placeholder": "Новое имя пользователя", "change": "Сменить имя пользователя" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "Фото профиля было изменено." }, - "change_user_name": { + "change_username": { "same": "Это уже ваше имя пользователя.", "success": "Имя пользователя было изменено.", "taken": "Это имя пользователя уже занято.", diff --git a/app/translations/tr.json b/app/translations/tr.json index ba6950deb..84a36646e 100644 --- a/app/translations/tr.json +++ b/app/translations/tr.json @@ -83,7 +83,7 @@ "set": "Profil Resmini Ayarla", "placeholder": "Profil resmi için seed girin" }, - "change_user_name": { + "change_username": { "title": "Kullanıcı Adını Değiştir", "placeholder": "Yeni Kullanıcı Adı", "change": "Kullanıcı Adını Değiştir" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "Profil resmi değiştirildi." }, - "change_user_name": { + "change_username": { "same": "Bu zaten sizin kullanıcı adınız.", "success": "Kullanıcı adı değiştirildi.", "taken": "Bu kullanıcı adı zaten alınmış.", diff --git a/app/translations/uk.json b/app/translations/uk.json index f77bcc6f7..7a2f640e8 100644 --- a/app/translations/uk.json +++ b/app/translations/uk.json @@ -83,7 +83,7 @@ "set": "Встановити фотографію профілю", "placeholder": "Введіть насіння для фотографії профілю" }, - "change_user_name": { + "change_username": { "title": "Змінити ім'я користувача", "placeholder": "Нове ім'я користувача", "change": "Змінити ім'я користувача" @@ -224,7 +224,7 @@ "change_profile_picture": { "success": "Фото профілю було змінено." }, - "change_user_name": { + "change_username": { "same": "Це вже ваше ім'я користувача.", "success": "Ім'я користувача було змінено.", "taken": "Це ім'я користувача вже зайняте.", diff --git a/app/translations/zh.json b/app/translations/zh.json index 94ff45460..c5493205e 100644 --- a/app/translations/zh.json +++ b/app/translations/zh.json @@ -82,7 +82,7 @@ "set": "设置头像", "placeholder": "输入头像种子" }, - "change_user_name": { + "change_username": { "title": "更改用户名", "placeholder": "新用户名", "change": "更改用户名" @@ -223,7 +223,7 @@ "change_profile_picture": { "success": "头像已更改。" }, - "change_user_name": { + "change_username": { "same": "这已经是您的用户名。", "success": "用户名已更改。", "taken": "此用户名已被占用。", diff --git a/app/utils/forms/change_user_name_form.py b/app/utils/forms/change_username_form.py similarity index 100% rename from app/utils/forms/change_user_name_form.py rename to app/utils/forms/change_username_form.py diff --git a/app/utils/route_guards.py b/app/utils/route_guards.py new file mode 100644 index 000000000..af33120e2 --- /dev/null +++ b/app/utils/route_guards.py @@ -0,0 +1,73 @@ +from functools import wraps + +from flask import redirect, request, session + +from models import User +from utils.flash_message import flash_message +from utils.log import Log + + +def login_required( + route_name: str, + redirect_to="/", + flash_page: str | None = None, + flash_message_key: str = "login", + flash_category: str = "error", +): + """Ensure the current session has a logged in user.""" + + def decorator(view_func): + @wraps(view_func) + def wrapped_view(*args, **kwargs): + if "username" in session: + return view_func(*args, **kwargs) + + Log.error( + f"{request.remote_addr} tried to reach {route_name} without being logged in" + ) + + if flash_page: + flash_message( + page=flash_page, + message=flash_message_key, + category=flash_category, + language=session.get("language", "en"), + ) + + if callable(redirect_to): + return redirect(redirect_to(*args, **kwargs)) + + return redirect(redirect_to) + + return wrapped_view + + return decorator + + +def admin_required(route_name: str): + """Ensure the current session belongs to an admin user.""" + + def decorator(view_func): + @login_required(route_name) + @wraps(view_func) + def wrapped_view(*args, **kwargs): + username = session["username"] + + user = User.query.filter_by(username=username).first() + if not user: + Log.error( + f'Session user "{username}" was not found while reaching {route_name}' + ) + return redirect("/") + + if user.role != "admin": + Log.error( + f"{request.remote_addr} tried to reach {route_name} without being admin" + ) + return redirect("/") + + return view_func(*args, **kwargs) + + return wrapped_view + + return decorator diff --git a/tests/README.md b/tests/README.md index edc76971e..cf4deb5fa 100644 --- a/tests/README.md +++ b/tests/README.md @@ -4,226 +4,197 @@ End-to-end tests for Flask Blog using Pytest and Playwright. ## Quick Start -### Using Makefile (Recommended) +Use Make targets from the repository root: ```bash -make install # Install all dependencies -make test # Run all tests (parallel) -make test-slow # Run with visible browser (slow-mo, sequential) +make install # Install app + dev + test deps and Playwright browser +make test # Run all E2E tests (parallel) +make test-slow # Run headed browser with slow-mo (sequential) ``` -### Manual Commands +## Run Specific Tests + +If you need targeted runs, execute pytest from `app/`: ```bash cd app -# Install test dependencies -uv sync --extra test -uv run playwright install chromium - -# Run all tests (parallel by default) +# Full E2E suite (parallel by default via pytest.ini) uv run pytest ../tests/e2e/ -v -# Run specific test file -uv run pytest ../tests/e2e/auth/test_login.py -v - -# Run with headed browser (visible) -uv run pytest ../tests/e2e/ --headed +# Specific domain +uv run pytest ../tests/e2e/post/ -v +uv run pytest ../tests/e2e/account/ -v -# Run specific test class +# Specific file / class / test +uv run pytest ../tests/e2e/auth/test_login.py -v uv run pytest ../tests/e2e/auth/test_login.py::TestLoginSuccess -v +uv run pytest ../tests/e2e/post/test_post.py::TestPostComments::test_logged_in_user_can_comment_on_post -v ``` +## Current Suite Coverage + +Current local suite size: **110 tests** across **14 test files**. + +| Suite | Files | Tests | Focus | +| ----- | ----- | ----- | ----- | +| `e2e/auth/` | 3 | 62 | Login, signup, logout, session handling | +| `e2e/account/` | 5 | 17 | Account settings, username/profile updates, password change flow, dashboard, static pages, preferences | +| `e2e/post/` | 1 | 14 | Create/edit/delete post, comments, authorization, admin moderation via protected POST flows | +| `e2e/admin/` | 1 | 8 | Admin access control, users (role + delete), comments management | +| `e2e/search/` | 2 | 6 | Search results and category filtering | +| `e2e/home/` | 1 | 3 | Home rendering and sorting routes | + +Recently added high-impact coverage: + +- Dashboard forged delete requests cannot remove posts owned by other users. +- Admin can delete users from `/admin/users`. +- Non-admin users are blocked from `/admin/comments`. +- Admin can delete other users' posts through the post route with valid CSRF. +- Admin can delete other users' comments through the post route with valid CSRF. + ## Parallel Execution -Tests run in parallel by default using `pytest-xdist` with automatic worker detection (`-n auto`). This uses all available CPU cores to speed up test execution. +Tests run in parallel by default using `pytest-xdist` (`-n auto` from `pytest.ini`). ```bash -# Run with specific number of workers -uv run pytest ../tests/e2e/ -n 4 +cd app -# Run sequentially (disable parallel) -uv run pytest ../tests/e2e/ -n 0 +# Override worker count +uv run pytest ../tests/e2e/ -n 4 -v -# Run with slow motion for debugging (milliseconds) -uv run pytest ../tests/e2e/ --slowmo 500 +# Disable parallel +uv run pytest ../tests/e2e/ -n 0 -v + +# Headed + slow motion for debugging +uv run pytest ../tests/e2e/ --headed --slowmo 500 -n 0 -v ``` -**How it works:** +Parallel behavior: -- A single Flask server is shared across all workers (coordinated via file locks) -- Each test creates unique users with UUIDs to avoid conflicts -- Database is backed up before tests and restored after all workers complete -- Browser contexts are isolated per test for clean state +- One shared Flask server is started with file-lock coordination. +- Database is backed up before the session and restored at the end. +- UUID-based user data avoids collisions between workers. +- Each test gets an isolated browser context and page. -## Structure +## Project Structure -``` +```text tests/ -├── conftest.py # Root fixtures +├── conftest.py # Root fixtures (markers, app_settings) +├── README.md └── e2e/ - ├── conftest.py # E2E fixtures (server, browser, database) - ├── auth/ # Authentication tests + ├── conftest.py # Server, browser, DB coordination + ├── account/ + │ ├── test_account_settings.py + │ ├── test_change_password_flow.py + │ ├── test_dashboard.py + │ ├── test_profile_and_preferences.py + │ └── test_static_pages.py + ├── admin/ + │ └── test_admin.py + ├── auth/ │ ├── test_login.py │ ├── test_logout.py │ └── test_signup.py - ├── pages/ # Page Object Model - │ ├── base_page.py - │ ├── login_page.py - │ ├── signup_page.py - │ └── navbar_component.py - └── helpers/ # Utilities - ├── database_helpers.py - └── test_data.py + ├── home/ + │ └── test_home.py + ├── post/ + │ └── test_post.py + ├── search/ + │ ├── test_category.py + │ └── test_search.py + ├── helpers/ + │ ├── database_helpers.py + │ └── test_data.py + └── pages/ + ├── base_page.py + ├── create_post_page.py + ├── login_page.py + ├── navbar_component.py + ├── post_page.py + └── signup_page.py ``` -## Test Coverage - -### Authentication (`e2e/auth/`) - -#### Login (`test_login.py` - 18 tests) - -| Category | Test | Description | -| --------------- | -------------------------------------------- | ---------------------------------------------- | -| Page Rendering | `test_login_page_renders` | Page loads with all required elements | -| | `test_login_page_has_csrf_token` | CSRF protection enabled | -| | `test_login_page_has_forgot_password_link` | Forgot password link present | -| | `test_login_page_title` | Correct page title | -| Success Flows | `test_login_with_valid_credentials` | Admin login with valid credentials | -| | `test_login_redirect_after_success` | Redirects to home after login | -| | `test_login_case_insensitive_username` | Username is case-insensitive | -| | `test_login_whitespace_trimmed` | Whitespace in username trimmed | -| Error Handling | `test_login_wrong_password` | Wrong password shows error | -| | `test_login_nonexistent_user` | Nonexistent user shows error | -| | `test_login_empty_password` | Empty password validation | -| Session | `test_login_already_logged_in_redirects` | Logged-in user redirected from login page | -| | `test_login_creates_session` | Session created (navbar shows logged-in state) | -| | `test_session_persists_after_navigation` | Session persists across pages | -| Form Validation | `test_login_empty_username_validation` | HTML5 validation for empty username | -| | `test_login_empty_password_validation` | HTML5 validation for empty password | -| | `test_login_form_prevents_double_submission` | No double submission issues | -| Dynamic Users | `test_login_with_test_user` | Dynamic test user can login | -| | `test_login_test_user_creates_session` | Test user login creates session | - -#### Logout (`test_logout.py` - 15 tests) - -| Category | Test | Description | -| ------------- | ------------------------------------------------ | --------------------------------- | -| Basic | `test_logout_clears_session_and_redirects` | Logout redirects to home | -| | `test_logout_shows_success_flash_message` | Success flash after logout | -| | `test_logout_button_not_visible_when_logged_out` | Logout hidden when not logged in | -| Session State | `test_logout_removes_session_navbar_shows_login` | Navbar shows login after logout | -| | `test_logout_session_does_not_persist` | Session cleared after navigation | -| | `test_cannot_access_create_post_after_logout` | Protected pages redirect to login | -| Edge Cases | `test_logout_when_not_logged_in_redirects` | Direct /logout redirects to home | -| | `test_logout_when_not_logged_in_no_flash` | No flash when not logged in | -| | `test_double_logout_does_not_error` | Double logout is safe | -| User Types | `test_logout_admin_user` | Admin can logout | -| | `test_logout_regular_user` | Regular user can logout | -| | `test_logout_and_login_as_different_user` | Can switch users after logout | -| UI Behavior | `test_login_link_appears_after_logout` | Login link visible after logout | -| | `test_profile_avatar_hidden_after_logout` | Avatar hidden after logout | -| | `test_create_post_button_hidden_after_logout` | Create post hidden after logout | - -#### Signup (`test_signup.py` - 22 tests) - -| Category | Test | Description | -| --------------- | ------------------------------------------------- | ----------------------------------------- | -| Page Rendering | `test_signup_page_renders` | Page loads with all elements | -| | `test_signup_page_has_csrf_token` | CSRF protection enabled | -| | `test_signup_page_has_privacy_policy_link` | Privacy policy link present | -| | `test_signup_page_title` | Correct page title | -| Success Flows | `test_signup_with_valid_data` | User can signup with valid data | -| | `test_signup_creates_user_in_database` | User record created in DB | -| | `test_signup_auto_login` | User auto-logged in after signup | -| | `test_signup_awards_points` | 1 point awarded to new user | -| | `test_signup_user_is_unverified` | New user has is_verified='False' | -| Error Handling | `test_signup_duplicate_username` | Existing username rejected | -| | `test_signup_duplicate_username_case_insensitive` | Case-insensitive username check | -| | `test_signup_duplicate_email` | Existing email rejected | -| | `test_signup_duplicate_email_case_insensitive` | Case-insensitive email check | -| | `test_signup_password_mismatch` | Mismatched passwords rejected | -| | `test_signup_non_ascii_username` | Non-ASCII username rejected | -| | `test_signup_both_username_and_email_taken` | Both taken shows error | -| Form Validation | `test_signup_username_too_short` | Username < 4 chars rejected | -| | `test_signup_username_too_long` | Username > 25 chars rejected | -| | `test_signup_email_invalid_format` | Invalid email format rejected | -| | `test_signup_password_too_short` | Password < 8 chars rejected | -| | `test_signup_empty_fields_validation` | Empty fields trigger validation | -| | `test_signup_username_whitespace_stripped` | Whitespace stripped from username | -| Session | `test_signup_when_already_logged_in` | Logged-in user redirected from signup | -| | `test_signup_session_persists_after_navigation` | Session persists after signup | -| | `test_can_access_protected_pages_after_signup` | Can access protected pages after signup | -| Edge Cases | `test_signup_with_special_email_characters` | Plus-addressed email (user+tag@) accepted | -| | `test_signup_minimum_valid_lengths` | Minimum valid lengths work | -| | `test_signup_maximum_valid_lengths` | Maximum valid lengths work | - ## Architecture ### Page Object Model -Pages encapsulate UI interactions: +Page objects encapsulate UI interactions (`tests/e2e/pages/`), including: + +- `LoginPage` +- `SignupPage` +- `CreatePostPage` +- `PostPage` +- `NavbarComponent` + +Example: ```python -from tests.e2e.pages.login_page import LoginPage +from tests.e2e.pages.create_post_page import CreatePostPage -def test_login(page, flask_server): - login_page = LoginPage(page, flask_server["base_url"]) - login_page.navigate() - login_page.login("admin", "admin") - login_page.expect_success_flash() +def test_create_post(page, flask_server): + create_post_page = CreatePostPage(page, flask_server["base_url"]) + create_post_page.navigate() + create_post_page.expect_page_loaded() ``` -### Fixtures +### Key Fixtures -| Fixture | Scope | Purpose | -| ------------------ | -------- | -------------------------------- | -| `flask_server` | session | Starts/stops the Flask app | -| `browser_instance` | session | Single Chromium instance | -| `clean_db` | session | Resets database once at start | -| `page` | function | Fresh page per test | -| `test_user` | function | Creates a unique UUID-based user | -| `logged_in_page` | function | Pre-authenticated page | +| Fixture | Scope | Purpose | +| ------- | ----- | ------- | +| `flask_server` | session | Starts/stops Flask app and shares it across workers | +| `browser_instance` | session | Single Chromium browser instance | +| `context` | function | Fresh isolated browser context per test | +| `page` | function | Fresh page per test | +| `clean_db` | session | One-time DB cleanup before tests | +| `test_user` | function | Creates unique UUID-based user | +| `unverified_test_user` | function | Creates unique unverified user | +| `logged_in_page` | function | Page pre-authenticated as default admin | -### Test Data +### Test Data Helpers -Generate test users with `UserData`: +`UserData` factory: ```python from tests.e2e.helpers.test_data import UserData -user = UserData.generate() # Random user -admin = UserData.admin() # Admin user -unverified = UserData.unverified() # Unverified user +user = UserData.generate() +unverified = UserData.unverified() ``` -### Database Helpers - -Direct database access for test setup: +Database helpers (`tests/e2e/helpers/database_helpers.py`) are used for test setup/assertions, for example: ```python -from tests.e2e.helpers.database_helpers import create_test_user, user_exists +from tests.e2e.helpers.database_helpers import create_test_user, get_user_by_username create_test_user(db_path, "testuser", "test@example.com", "Password123!") -assert user_exists(db_path, "testuser") +assert get_user_by_username(db_path, "testuser") is not None ``` ## Markers -Run tests by category: +Registered markers: + +- `auth` +- `admin` +- `smoke` +- `slow` + +Usage: ```bash -pytest -m auth # Authentication tests -pytest -m smoke # Quick smoke tests -pytest -m admin # Admin-related tests -pytest -m slow # Long-running tests +cd app +uv run pytest ../tests/e2e/ -m auth -v +uv run pytest ../tests/e2e/ -m smoke -v +uv run pytest ../tests/e2e/ -m "admin and not smoke" -v +uv run pytest ../tests/e2e/ -m "not slow" -v ``` -## CI/CD - -Tests run automatically via GitHub Actions on: +## CI -- Push to `main` -- Pull requests +E2E tests run in GitHub Actions (`.github/workflows/e2e-tests.yaml`) on: -See `.github/workflows/e2e-tests.yml` for configuration. +- Push to `main` (when `app/**`, `tests/**`, or workflow file changes) +- Pull requests to `main` +- Manual dispatch (`workflow_dispatch`) with optional `test_path` diff --git a/tests/e2e/account/__init__.py b/tests/e2e/account/__init__.py new file mode 100644 index 000000000..d0feaf097 --- /dev/null +++ b/tests/e2e/account/__init__.py @@ -0,0 +1 @@ +# Account E2E Tests Package diff --git a/tests/e2e/account/test_account_settings.py b/tests/e2e/account/test_account_settings.py new file mode 100644 index 000000000..83d6da651 --- /dev/null +++ b/tests/e2e/account/test_account_settings.py @@ -0,0 +1,107 @@ +""" +E2E tests for account settings, change username, change password, and delete account. +""" + +import re + +import pytest +from playwright.sync_api import expect + +from tests.e2e.helpers.database_helpers import get_user_by_username +from tests.e2e.pages.login_page import LoginPage + + +def _login(page, flask_server, username: str, password: str): + login_page = LoginPage(page, flask_server["base_url"]) + login_page.navigate("/login/redirect=&") + login_page.login(username, password) + page.wait_for_url("**/", timeout=5000) + + +class TestAccountSettings: + """Tests for account settings page access.""" + + @pytest.mark.smoke + @pytest.mark.auth + def test_account_settings_loads_for_logged_in_user( + self, page, flask_server, test_user + ): + """Logged-in user should see account settings with menu links and delete button.""" + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/account-settings", + wait_until="domcontentloaded", + ) + + expect(page.locator('a[href="/change-username"]')).to_be_visible(timeout=5000) + expect(page.locator('a[href="/change-password"]')).to_be_visible(timeout=5000) + expect(page.locator(".btn-error").first).to_be_visible(timeout=5000) + + @pytest.mark.auth + def test_account_settings_requires_login(self, page, flask_server): + """Accessing account settings without login should redirect to login.""" + page.goto( + f"{flask_server['base_url']}/account-settings", + wait_until="domcontentloaded", + ) + + expect(page).to_have_url( + re.compile(rf"^{re.escape(flask_server['base_url'])}/login/.*$"), + timeout=10000, + ) + + +class TestChangePassword: + """Tests for the change password flow.""" + + @pytest.mark.auth + def test_change_password_redirects_to_login(self, page, flask_server, test_user): + """Changing password should log out the user and redirect to login. + + note: the change_password route has a known bug at line 76-81 where + session.clear() is called before accessing session["language"]. this + test may surface a 500 error. if it does, the route needs fixing first. + """ + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/change-password", + wait_until="domcontentloaded", + ) + + new_password = "NewPassword456!" + page.fill('input[name="old_password"]', test_user.password) + page.fill('input[name="password"]', new_password) + page.fill('input[name="password_confirm"]', new_password) + page.click('button[type="submit"]') + + # the route should redirect to /login/ after password change. + # if the session.clear() bug is present, this may result in an error + # page instead - which is also a valid (bug-catching) outcome. + expect(page).to_have_url( + re.compile( + rf"^{re.escape(flask_server['base_url'])}/(login/.*|change-password)$" + ), + timeout=10000, + ) + + +class TestDeleteAccount: + """Tests for account deletion.""" + + @pytest.mark.auth + def test_delete_account_removes_user(self, page, flask_server, test_user, db_path): + """Deleting account should remove user from database and redirect to /.""" + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/account-settings", + wait_until="domcontentloaded", + ) + + page.locator(".btn-error").first.click() + page.wait_for_url(f"{flask_server['base_url']}/", timeout=5000) + + user = get_user_by_username(str(db_path), test_user.username) + assert user is None, "user should be deleted from database" diff --git a/tests/e2e/account/test_change_password_flow.py b/tests/e2e/account/test_change_password_flow.py new file mode 100644 index 000000000..815892e8a --- /dev/null +++ b/tests/e2e/account/test_change_password_flow.py @@ -0,0 +1,59 @@ +""" +E2E test for password change credential rotation flow. +""" + +import re + +import pytest +from playwright.sync_api import expect + +from tests.e2e.pages.login_page import LoginPage +from tests.e2e.pages.navbar_component import NavbarComponent + + +def _login(page, flask_server, username: str, password: str): + login_page = LoginPage(page, flask_server["base_url"]) + login_page.navigate("/login/redirect=&") + login_page.login(username, password) + page.wait_for_url("**/", timeout=5000) + + +class TestChangePasswordCredentialRotation: + """Tests that password changes update real authentication behavior.""" + + @pytest.mark.auth + def test_change_password_invalidates_old_password_and_accepts_new_password( + self, page, flask_server, test_user + ): + """After password change, old password should fail and new password should work.""" + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/change-password", + wait_until="domcontentloaded", + ) + + new_password = "NewPassword456!" + page.fill('input[name="old_password"]', test_user.password) + page.fill('input[name="password"]', new_password) + page.fill('input[name="password_confirm"]', new_password) + page.click('button[type="submit"]') + + expect(page).to_have_url( + re.compile(rf"^{re.escape(flask_server['base_url'])}/login/.*$"), + timeout=10000, + ) + + login_page = LoginPage(page, flask_server["base_url"]) + + login_page.login(test_user.username, test_user.password) + login_page.expect_error_flash() + + login_page.login(test_user.username, new_password) + expect(page).to_have_url( + re.compile(rf"^{re.escape(flask_server['base_url'])}/?$"), + timeout=10000, + ) + + navbar = NavbarComponent(page) + navbar.expect_logged_in() diff --git a/tests/e2e/account/test_dashboard.py b/tests/e2e/account/test_dashboard.py new file mode 100644 index 000000000..73ed4b1c1 --- /dev/null +++ b/tests/e2e/account/test_dashboard.py @@ -0,0 +1,172 @@ +""" +E2E tests for the user dashboard. +""" + +import re +import uuid + +import pytest +from playwright.sync_api import expect + +from tests.e2e.helpers.database_helpers import ( + create_test_comment, + create_test_post, + get_post_by_url_id, +) +from tests.e2e.pages.login_page import LoginPage + + +def _suffix() -> str: + return uuid.uuid4().hex[:8] + + +def _login(page, flask_server, username: str, password: str): + login_page = LoginPage(page, flask_server["base_url"]) + login_page.navigate("/login/redirect=&") + login_page.login(username, password) + page.wait_for_url("**/", timeout=5000) + + +def _get_csrf_token(page) -> str: + token = page.locator('input[name="csrf_token"]').first.get_attribute("value") + assert token is not None and token != "" + return token + + +class TestDashboard: + """Tests for user dashboard functionality.""" + + def test_dashboard_shows_user_posts_and_comments( + self, page, flask_server, test_user, db_path + ): + """Dashboard should display the user's posts and comments.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Dashboard Post {seed}", + content=f"Content for dashboard test {seed}", + abstract=f"Abstract for dashboard test {seed}. " + "A" * 160, + author=test_user.username, + ) + + comment_text = f"Dashboard comment {seed} with enough text to display." + create_test_comment( + db_path=str(db_path), + post_id=post["id"], + comment=comment_text, + username=test_user.username, + ) + + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/dashboard/{test_user.username}", + wait_until="domcontentloaded", + ) + + expect(page.locator("body")).to_contain_text( + f"Dashboard Post {seed}", timeout=5000 + ) + expect(page.locator("body")).to_contain_text(comment_text, timeout=5000) + + @pytest.mark.auth + def test_dashboard_requires_login(self, page, flask_server): + """Accessing dashboard without login should redirect to login page.""" + page.goto( + f"{flask_server['base_url']}/dashboard/someuser", + wait_until="domcontentloaded", + ) + + expect(page).to_have_url( + re.compile(rf"^{re.escape(flask_server['base_url'])}/login/.*$"), + timeout=10000, + ) + + @pytest.mark.auth + def test_dashboard_redirects_to_own_dashboard(self, page, flask_server, test_user): + """Accessing another user's dashboard should redirect to your own.""" + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/dashboard/admin", + wait_until="domcontentloaded", + ) + + expect(page).to_have_url( + re.compile( + rf"^{re.escape(flask_server['base_url'])}/dashboard/{re.escape(test_user.username.lower())}.*$" + ), + timeout=5000, + ) + + def test_dashboard_can_delete_post(self, page, flask_server, test_user, db_path): + """User should be able to delete their own post from the dashboard.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Deletable Dashboard Post {seed}", + content=f"Content for delete test {seed}", + abstract=f"Abstract for delete test {seed}. " + "A" * 160, + author=test_user.username, + ) + + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/dashboard/{test_user.username}", + wait_until="domcontentloaded", + ) + + post_card = page.locator( + ".card.bg-base-200", has_text=f"Deletable Dashboard Post {seed}" + ) + expect(post_card).to_be_visible(timeout=5000) + + post_card.locator('button[name="post_delete_button"]').click() + page.wait_for_load_state("domcontentloaded") + + deleted_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert deleted_post is None, "post should be deleted from database" + + @pytest.mark.auth + def test_dashboard_forged_request_cannot_delete_another_users_post( + self, page, flask_server, test_user, db_path + ): + """Forged dashboard POST must not delete a post owned by another user.""" + seed = _suffix() + protected_post = create_test_post( + db_path=str(db_path), + title=f"Protected Dashboard Post {seed}", + content=f"Protected content {seed}", + abstract=f"Protected abstract {seed}. " + "A" * 160, + author="admin", + ) + + create_test_post( + db_path=str(db_path), + title=f"Owned Dashboard Post {seed}", + content=f"Owned content {seed}", + abstract=f"Owned abstract {seed}. " + "A" * 160, + author=test_user.username, + ) + + _login(page, flask_server, test_user.username, test_user.password) + + dashboard_url = f"{flask_server['base_url']}/dashboard/{test_user.username}" + page.goto(dashboard_url, wait_until="domcontentloaded") + csrf_token = _get_csrf_token(page) + + response = page.request.post( + dashboard_url, + form={ + "csrf_token": csrf_token, + "post_delete_button": "1", + "post_id": str(protected_post["id"]), + }, + ) + assert response.ok + + still_exists = get_post_by_url_id(str(db_path), protected_post["url_id"]) + assert still_exists is not None, ( + "Dashboard delete must not remove posts owned by another user" + ) diff --git a/tests/e2e/account/test_profile_and_preferences.py b/tests/e2e/account/test_profile_and_preferences.py new file mode 100644 index 000000000..dd2e22a28 --- /dev/null +++ b/tests/e2e/account/test_profile_and_preferences.py @@ -0,0 +1,208 @@ +""" +E2E tests for profile/account settings and personalization routes. +""" + +import re +import uuid + +import pytest +from playwright.sync_api import expect + +from tests.e2e.helpers.database_helpers import ( + create_test_comment, + create_test_post, + get_comment_by_id, + get_post_by_url_id, + get_user_by_username, +) +from tests.e2e.pages.login_page import LoginPage + + +def _suffix() -> str: + return uuid.uuid4().hex[:8] + + +def _login(page, flask_server, username: str, password: str): + login_page = LoginPage(page, flask_server["base_url"]) + login_page.navigate("/login/redirect=&") + login_page.login(username, password) + page.wait_for_url("**/", timeout=5000) + + +class TestChangeUsername: + """Tests for the change username flow.""" + + @pytest.mark.auth + def test_change_username_updates_user_posts_comments_and_session( + self, page, flask_server, test_user, db_path + ): + """Changing username should migrate author/comment ownership and update session.""" + original_username = test_user.username + seed = _suffix() + new_username = f"renamed_{seed}" + + post = create_test_post( + db_path=str(db_path), + title=f"Rename Post {seed}", + content=f"Content for rename test {seed}", + abstract=f"Abstract for rename test {seed}. " + "A" * 160, + author=original_username, + ) + comment_id = create_test_comment( + db_path=str(db_path), + post_id=post["id"], + comment=f"Rename comment {seed} with enough text.", + username=original_username, + ) + + _login(page, flask_server, original_username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/change-username", + wait_until="domcontentloaded", + ) + page.fill('input[name="new_username"]', new_username) + page.click('button[type="submit"]') + expect(page).to_have_url( + f"{flask_server['base_url']}/account-settings", timeout=10000 + ) + + updated_user = get_user_by_username(str(db_path), new_username) + assert updated_user is not None, "Renamed user should exist in database" + assert get_user_by_username(str(db_path), original_username) is None, ( + "Old username should no longer exist in users table" + ) + + updated_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert updated_post is not None + assert updated_post["author"] == new_username + + updated_comment = get_comment_by_id(str(db_path), comment_id) + assert updated_comment is not None + assert updated_comment["username"] == new_username + + expect(page.locator(f'a[href="/user/{new_username.lower()}"]')).to_be_visible( + timeout=5000 + ) + + page.goto( + f"{flask_server['base_url']}/dashboard/{new_username}", + wait_until="domcontentloaded", + ) + expect(page).to_have_url( + re.compile( + rf"^{re.escape(flask_server['base_url'])}/dashboard/{re.escape(new_username.lower())}/?$" + ), + timeout=5000, + ) + + @pytest.mark.auth + def test_change_username_rejects_existing_username_case_insensitive( + self, page, flask_server, test_user, app_settings, db_path + ): + """Changing username to an existing username should fail without DB mutation.""" + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/change-username", + wait_until="domcontentloaded", + ) + csrf_token = page.locator('input[name="csrf_token"]').first.get_attribute( + "value" + ) + assert csrf_token is not None and csrf_token != "" + + response = page.request.post( + f"{flask_server['base_url']}/change-username", + form={ + "csrf_token": csrf_token, + "new_username": app_settings["default_admin"]["username"].upper(), + }, + ) + assert response.ok + + page.goto( + f"{flask_server['base_url']}/dashboard/{test_user.username}", + wait_until="domcontentloaded", + ) + expect(page).to_have_url( + re.compile( + rf"^{re.escape(flask_server['base_url'])}/dashboard/{re.escape(test_user.username.lower())}/?$" + ), + timeout=5000, + ) + + still_exists = get_user_by_username(str(db_path), test_user.username) + assert still_exists is not None, "Username should remain unchanged on failure" + + +class TestChangeProfilePicture: + """Tests for profile picture updates.""" + + @pytest.mark.auth + def test_change_profile_picture_persists_new_dicebear_url( + self, page, flask_server, test_user, db_path + ): + """Changing profile picture should store the expected Dicebear URL.""" + seed = f"profile_{_suffix()}" + expected_profile_picture = ( + f"https://api.dicebear.com/7.x/identicon/svg?seed={seed}&radius=10" + ) + + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/change-profile-picture", + wait_until="domcontentloaded", + ) + page.fill('input[name="new_profile_picture_seed"]', seed) + page.click('button[type="submit"]') + + expect(page).to_have_url( + f"{flask_server['base_url']}/account-settings", timeout=10000 + ) + + updated_user = get_user_by_username(str(db_path), test_user.username) + assert updated_user is not None + assert updated_user["profile_picture"] == expected_profile_picture + + +class TestPreferences: + """Tests for language/theme preference routes.""" + + def test_set_language_redirects_and_persists_html_lang(self, page, flask_server): + """Selecting a language should redirect to change-language and persist in session.""" + page.goto( + f"{flask_server['base_url']}/set-language/tr", + wait_until="domcontentloaded", + ) + expect(page).to_have_url(f"{flask_server['base_url']}/change-language") + expect(page.locator("html")).to_have_attribute("lang", "tr", timeout=5000) + + page.goto(f"{flask_server['base_url']}/about", wait_until="domcontentloaded") + expect(page.locator("html")).to_have_attribute("lang", "tr", timeout=5000) + + def test_set_theme_redirects_to_referrer_and_persists_theme( + self, page, flask_server + ): + """Selecting a theme should return to previous page and keep theme across pages.""" + page.goto(f"{flask_server['base_url']}/about", wait_until="domcontentloaded") + page.eval_on_selector("#theme_modal", "modal => modal.showModal()") + page.click('a[href="/set-theme/cupcake"]') + + expect(page).to_have_url(f"{flask_server['base_url']}/about", timeout=5000) + expect(page.locator("html")).to_have_attribute( + "data-theme", + "cupcake", + timeout=5000, + ) + + page.goto( + f"{flask_server['base_url']}/search-bar", + wait_until="domcontentloaded", + ) + expect(page.locator("html")).to_have_attribute( + "data-theme", + "cupcake", + timeout=5000, + ) diff --git a/tests/e2e/account/test_static_pages.py b/tests/e2e/account/test_static_pages.py new file mode 100644 index 000000000..e03c38256 --- /dev/null +++ b/tests/e2e/account/test_static_pages.py @@ -0,0 +1,29 @@ +""" +E2E tests for static pages (about, privacy policy). +""" + +import pytest +from playwright.sync_api import expect + + +class TestStaticPages: + """Tests for static pages that require no authentication or database setup.""" + + @pytest.mark.smoke + def test_about_page_loads(self, page, flask_server): + """About page should render with github button and version badge.""" + page.goto(f"{flask_server['base_url']}/about", wait_until="domcontentloaded") + + expect(page.locator(".btn-primary").first).to_be_visible(timeout=5000) + expect(page.locator(".badge-ghost").first).to_be_visible(timeout=5000) + + @pytest.mark.smoke + def test_privacy_policy_page_loads(self, page, flask_server): + """Privacy policy page should render with article content and heading.""" + page.goto( + f"{flask_server['base_url']}/privacy-policy", + wait_until="domcontentloaded", + ) + + expect(page.locator("article.prose")).to_be_visible(timeout=5000) + expect(page.locator("h1").first).to_be_visible(timeout=5000) diff --git a/tests/e2e/admin/__init__.py b/tests/e2e/admin/__init__.py new file mode 100644 index 000000000..59bbfe2c0 --- /dev/null +++ b/tests/e2e/admin/__init__.py @@ -0,0 +1 @@ +# Admin E2E Tests Package diff --git a/tests/e2e/admin/test_admin.py b/tests/e2e/admin/test_admin.py new file mode 100644 index 000000000..cce9ab0d7 --- /dev/null +++ b/tests/e2e/admin/test_admin.py @@ -0,0 +1,216 @@ +""" +E2E tests for the admin panel. +""" + +import uuid + +import pytest +from playwright.sync_api import expect + +from tests.e2e.helpers.database_helpers import ( + create_test_comment, + create_test_post, + create_test_user, + get_user_by_username, +) +from tests.e2e.pages.login_page import LoginPage + + +def _suffix() -> str: + return uuid.uuid4().hex[:8] + + +def _login(page, flask_server, username: str, password: str): + login_page = LoginPage(page, flask_server["base_url"]) + login_page.navigate("/login/redirect=&") + login_page.login(username, password) + page.wait_for_url("**/", timeout=5000) + + +class TestAdminPanelAccess: + """Tests for admin panel access control.""" + + @pytest.mark.smoke + @pytest.mark.admin + def test_admin_panel_loads_for_admin(self, logged_in_page, flask_server): + """Admin user should see the admin panel with users, posts, and comments links.""" + logged_in_page.goto( + f"{flask_server['base_url']}/admin", wait_until="domcontentloaded" + ) + + expect(logged_in_page.locator('a[href="admin/users"]')).to_be_visible( + timeout=5000 + ) + expect(logged_in_page.locator('a[href="admin/posts"]')).to_be_visible( + timeout=5000 + ) + expect(logged_in_page.locator('a[href="admin/comments"]')).to_be_visible( + timeout=5000 + ) + + @pytest.mark.admin + def test_admin_panel_redirects_non_admin(self, page, flask_server, test_user): + """Non-admin user should be redirected away from admin panel to /.""" + _login(page, flask_server, test_user.username, test_user.password) + + page.goto(f"{flask_server['base_url']}/admin", wait_until="domcontentloaded") + + expect(page).to_have_url(f"{flask_server['base_url']}/", timeout=5000) + + @pytest.mark.admin + def test_admin_posts_redirects_non_admin(self, page, flask_server, test_user): + """Non-admin user should be redirected away from /admin/posts to /.""" + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/admin/posts", wait_until="domcontentloaded" + ) + + expect(page).to_have_url(f"{flask_server['base_url']}/", timeout=5000) + + @pytest.mark.admin + def test_admin_comments_redirects_non_admin(self, page, flask_server, test_user): + """Non-admin user should be redirected away from /admin/comments to /.""" + _login(page, flask_server, test_user.username, test_user.password) + + page.goto( + f"{flask_server['base_url']}/admin/comments", + wait_until="domcontentloaded", + ) + + expect(page).to_have_url(f"{flask_server['base_url']}/", timeout=5000) + + +class TestAdminUsers: + """Tests for admin user management page.""" + + @pytest.mark.admin + def test_admin_users_page_lists_users(self, logged_in_page, flask_server, db_path): + """Admin users page should display user cards with the admin user visible.""" + logged_in_page.goto( + f"{flask_server['base_url']}/admin/users", wait_until="domcontentloaded" + ) + + # admin user is always on page 1 + expect(logged_in_page.locator("body")).to_contain_text("admin", timeout=5000) + expect(logged_in_page.locator(".card.bg-base-200").first).to_be_visible( + timeout=5000 + ) + + @pytest.mark.admin + def test_admin_can_change_user_role( + self, page, flask_server, app_settings, db_path + ): + """Admin should be able to change a user's role from user to admin via POST.""" + seed = _suffix() + username = f"roletest{seed}" + create_test_user( + db_path=str(db_path), + username=username, + email=f"{username}@test.com", + password="TestPassword123!", + role="user", + ) + + _login( + page, + flask_server, + app_settings["default_admin"]["username"], + app_settings["default_admin"]["password"], + ) + + # navigate to admin/users to get a valid csrf token + page.goto( + f"{flask_server['base_url']}/admin/users", wait_until="domcontentloaded" + ) + + csrf_token = page.locator('input[name="csrf_token"]').first.get_attribute( + "value" + ) + + # submit role change via POST directly (avoids pagination) + page.request.post( + f"{flask_server['base_url']}/admin/users", + form={ + "csrf_token": csrf_token, + "username": username, + "user_role_change_button": "1", + }, + ) + + user = get_user_by_username(str(db_path), username) + assert user is not None + assert user["role"] == "admin" + + @pytest.mark.admin + def test_admin_can_delete_user(self, page, flask_server, app_settings, db_path): + """Admin should be able to delete a regular user from /admin/users.""" + seed = _suffix() + username = f"deltest{seed}" + create_test_user( + db_path=str(db_path), + username=username, + email=f"{username}@test.com", + password="TestPassword123!", + role="user", + ) + + _login( + page, + flask_server, + app_settings["default_admin"]["username"], + app_settings["default_admin"]["password"], + ) + + page.goto( + f"{flask_server['base_url']}/admin/users", wait_until="domcontentloaded" + ) + csrf_token = page.locator('input[name="csrf_token"]').first.get_attribute( + "value" + ) + + response = page.request.post( + f"{flask_server['base_url']}/admin/users", + form={ + "csrf_token": csrf_token, + "username": username, + "user_delete_button": "1", + }, + ) + assert response.ok + + deleted_user = get_user_by_username(str(db_path), username) + assert deleted_user is None, "Deleted user should no longer exist in database" + + +class TestAdminContent: + """Tests for admin content management pages.""" + + @pytest.mark.admin + def test_admin_comments_page_shows_comments( + self, logged_in_page, flask_server, db_path + ): + """Admin comments page should display created comments.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Admin Comment Post {seed}", + content=f"Content for admin comment test {seed}", + abstract=f"Abstract for admin comment test {seed}. " + "A" * 160, + ) + + comment_text = f"Admin visible comment {seed} with enough text to display." + create_test_comment( + db_path=str(db_path), + post_id=post["id"], + comment=comment_text, + username="admin", + ) + + logged_in_page.goto( + f"{flask_server['base_url']}/admin/comments", wait_until="domcontentloaded" + ) + + expect(logged_in_page.locator("body")).to_contain_text( + comment_text, timeout=5000 + ) diff --git a/tests/e2e/auth/test_logout.py b/tests/e2e/auth/test_logout.py index dd1b9b643..bfcb4939f 100644 --- a/tests/e2e/auth/test_logout.py +++ b/tests/e2e/auth/test_logout.py @@ -216,7 +216,9 @@ def test_logout_regular_user(self, page, flask_server, test_user): # Login as test user login_page.navigate("/login/redirect=&") login_page.login(test_user.username, test_user.password) - page.wait_for_url("**/", timeout=5000) + expect(page).to_have_url( + re.compile(rf"^{re.escape(flask_server['base_url'])}/?$"), timeout=10000 + ) # Verify logged in navbar.expect_logged_in() diff --git a/tests/e2e/helpers/__init__.py b/tests/e2e/helpers/__init__.py index 06ae7fb75..37fbba44f 100644 --- a/tests/e2e/helpers/__init__.py +++ b/tests/e2e/helpers/__init__.py @@ -1,14 +1,28 @@ # Test Helpers Package from tests.e2e.helpers.database_helpers import ( + create_test_comment, + create_test_post, reset_database, create_test_user, + get_comment_by_id, + get_comment_for_post, + get_post_by_title, + get_post_by_url_id, + get_post_views, get_user_by_username, ) from tests.e2e.helpers.test_data import UserData __all__ = [ + "create_test_comment", + "create_test_post", "reset_database", "create_test_user", + "get_comment_by_id", + "get_comment_for_post", + "get_post_by_title", + "get_post_by_url_id", + "get_post_views", "get_user_by_username", "UserData", ] diff --git a/tests/e2e/helpers/database_helpers.py b/tests/e2e/helpers/database_helpers.py index 6c727d924..b7c56e747 100644 --- a/tests/e2e/helpers/database_helpers.py +++ b/tests/e2e/helpers/database_helpers.py @@ -3,6 +3,8 @@ """ import sqlite3 +import time +import uuid from passlib.hash import sha512_crypt as encryption @@ -137,3 +139,195 @@ def get_user_points(db_path: str, username: str) -> int | None: if user: return user.get("points", 0) return None + + +def create_test_post( + db_path: str, + title: str, + content: str, + abstract: str, + author: str = "admin", + tags: str = "test,post", + category: str = "Technology", + url_id: str | None = None, + banner: bytes | None = None, + views: int = 0, +) -> dict: + """ + Create a test post in the database. + Returns a dictionary with id, url_id, and title. + """ + conn = get_db_connection(db_path) + cursor = conn.cursor() + + try: + now = int(time.time()) + resolved_url_id = url_id or f"testpost_{uuid.uuid4().hex[:12]}" + resolved_banner = banner if banner is not None else b"test-banner-image" + + cursor.execute( + """ + INSERT INTO posts ( + title, + tags, + content, + banner, + author, + views, + time_stamp, + last_edit_time_stamp, + category, + url_id, + abstract + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + title, + tags, + content, + resolved_banner, + author, + views, + now, + now, + category, + resolved_url_id, + abstract, + ), + ) + conn.commit() + + return { + "id": cursor.lastrowid, + "url_id": resolved_url_id, + "title": title, + } + finally: + conn.close() + + +def get_post_by_title(db_path: str, title: str) -> dict | None: + """ + Get post data by exact title match. + Returns a dictionary with post fields or None if not found. + """ + conn = get_db_connection(db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + try: + cursor.execute("SELECT * FROM posts WHERE title = ?", (title,)) + row = cursor.fetchone() + + if row: + return dict(row) + return None + finally: + conn.close() + + +def get_post_by_url_id(db_path: str, url_id: str) -> dict | None: + """ + Get post data by URL ID. + Returns a dictionary with post fields or None if not found. + """ + conn = get_db_connection(db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + try: + cursor.execute("SELECT * FROM posts WHERE url_id = ?", (url_id,)) + row = cursor.fetchone() + + if row: + return dict(row) + return None + finally: + conn.close() + + +def get_post_views(db_path: str, url_id: str) -> int | None: + """ + Get post view count by URL ID. + Returns the views value or None if post not found. + """ + post = get_post_by_url_id(db_path, url_id) + if post: + return post.get("views", 0) + return None + + +def get_comment_for_post(db_path: str, post_id: int, comment_text: str) -> dict | None: + """ + Get a specific comment by post ID and exact comment text. + Returns a dictionary with comment fields or None if not found. + """ + conn = get_db_connection(db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + try: + cursor.execute( + """ + SELECT * FROM comments + WHERE post_id = ? AND comment = ? + ORDER BY time_stamp DESC + LIMIT 1 + """, + (post_id, comment_text), + ) + row = cursor.fetchone() + + if row: + return dict(row) + return None + finally: + conn.close() + + +def create_test_comment( + db_path: str, + post_id: int, + comment: str, + username: str = "admin", +) -> int: + """ + Create a test comment for a post. + Returns the created comment ID. + """ + conn = get_db_connection(db_path) + cursor = conn.cursor() + + try: + cursor.execute( + """ + INSERT INTO comments (post_id, comment, username, time_stamp) + VALUES (?, ?, ?, ?) + """, + (post_id, comment, username, int(time.time())), + ) + conn.commit() + return cursor.lastrowid + finally: + conn.close() + + +def get_comment_by_id(db_path: str, comment_id: int) -> dict | None: + """ + Get comment data by comment ID. + Returns a dictionary with comment fields or None if not found. + """ + conn = get_db_connection(db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + try: + cursor.execute("SELECT * FROM comments WHERE id = ?", (comment_id,)) + row = cursor.fetchone() + + if row: + return dict(row) + return None + finally: + conn.close() diff --git a/tests/e2e/home/__init__.py b/tests/e2e/home/__init__.py new file mode 100644 index 000000000..bb4f9c2f2 --- /dev/null +++ b/tests/e2e/home/__init__.py @@ -0,0 +1 @@ +# Home E2E Tests Package diff --git a/tests/e2e/home/test_home.py b/tests/e2e/home/test_home.py new file mode 100644 index 000000000..14f15f19f --- /dev/null +++ b/tests/e2e/home/test_home.py @@ -0,0 +1,66 @@ +""" +E2E tests for the home page. +""" + +import uuid + +import pytest +from playwright.sync_api import expect + +from tests.e2e.helpers.database_helpers import create_test_post + + +def _suffix() -> str: + return uuid.uuid4().hex[:8] + + +class TestHomePage: + """Tests for home page rendering and sorting.""" + + @pytest.mark.smoke + def test_home_page_loads_and_shows_posts(self, page, flask_server, db_path): + """Home page should display a recently created post when sorted by newest.""" + seed = _suffix() + title = f"Home Test Post {seed}" + create_test_post( + db_path=str(db_path), + title=title, + content=f"Content for home test {seed}", + abstract=f"Abstract for home test {seed}. " + "A" * 160, + ) + + page.goto( + f"{flask_server['base_url']}/by=time_stamp/sort=desc", + wait_until="domcontentloaded", + ) + + expect(page.locator("body")).to_contain_text(title, timeout=5000) + + def test_home_page_sorting_by_views(self, page, flask_server, db_path): + """Posts sorted by views descending should show the highest-views post on page 1.""" + seed = _suffix() + high_title = f"High Views Post {seed}" + + create_test_post( + db_path=str(db_path), + title=high_title, + content=f"Content for high views {seed}", + abstract=f"Abstract for high views {seed}. " + "A" * 160, + views=999999, + ) + + page.goto( + f"{flask_server['base_url']}/by=views/sort=desc", + wait_until="domcontentloaded", + ) + + expect(page.locator("body")).to_contain_text(high_title, timeout=5000) + + def test_home_page_invalid_sort_redirects(self, page, flask_server): + """Invalid sort parameters should redirect back to /.""" + page.goto( + f"{flask_server['base_url']}/by=invalid/sort=desc", + wait_until="domcontentloaded", + ) + + expect(page).to_have_url(f"{flask_server['base_url']}/", timeout=5000) diff --git a/tests/e2e/pages/__init__.py b/tests/e2e/pages/__init__.py index cebc81b9d..0462909f5 100644 --- a/tests/e2e/pages/__init__.py +++ b/tests/e2e/pages/__init__.py @@ -1,7 +1,16 @@ # Page Objects Package from tests.e2e.pages.base_page import BasePage +from tests.e2e.pages.create_post_page import CreatePostPage from tests.e2e.pages.login_page import LoginPage from tests.e2e.pages.signup_page import SignupPage from tests.e2e.pages.navbar_component import NavbarComponent +from tests.e2e.pages.post_page import PostPage -__all__ = ["BasePage", "LoginPage", "SignupPage", "NavbarComponent"] +__all__ = [ + "BasePage", + "CreatePostPage", + "LoginPage", + "SignupPage", + "NavbarComponent", + "PostPage", +] diff --git a/tests/e2e/pages/create_post_page.py b/tests/e2e/pages/create_post_page.py new file mode 100644 index 000000000..f4483d845 --- /dev/null +++ b/tests/e2e/pages/create_post_page.py @@ -0,0 +1,77 @@ +""" +Create Post Page Object for interacting with the create-post form. +""" + +from playwright.sync_api import Page, expect + +from tests.e2e.pages.base_page import BasePage + + +class CreatePostPage(BasePage): + """Page object for the create-post page.""" + + def __init__(self, page: Page, base_url: str): + super().__init__(page, base_url) + + self.title_input = 'input[name="post_title"]' + self.tags_input = 'input[name="post_tags"]' + self.abstract_input = 'textarea[name="post_abstract"]' + self.banner_input = 'input[name="post_banner"]' + self.category_select = 'select[name="post_category"]' + self.content_input = 'textarea[name="post_content"]' + self.submit_button = 'button[type="submit"]' + self.csrf_token = 'input[name="csrf_token"]' + + def navigate(self, path: str = "/create-post"): + """Navigate to the create-post page.""" + return super().navigate(path) + + def fill_title(self, title: str): + self.page.fill(self.title_input, title) + return self + + def fill_tags(self, tags: str): + self.page.fill(self.tags_input, tags) + return self + + def fill_abstract(self, abstract: str): + self.page.fill(self.abstract_input, abstract) + return self + + def fill_content(self, content: str): + self.page.fill(self.content_input, content) + return self + + def select_category(self, category: str): + self.page.select_option(self.category_select, category) + return self + + def click_submit(self): + self.page.click(self.submit_button) + return self + + def create_post( + self, + title: str, + tags: str, + abstract: str, + content: str, + category: str = "Technology", + ): + self.fill_title(title) + self.fill_tags(tags) + self.fill_abstract(abstract) + self.fill_content(content) + self.select_category(category) + self.click_submit() + return self + + def expect_page_loaded(self): + expect(self.page.locator(self.title_input)).to_be_visible() + expect(self.page.locator(self.tags_input)).to_be_visible() + expect(self.page.locator(self.abstract_input)).to_be_visible() + expect(self.page.locator(self.content_input)).to_be_visible() + expect(self.page.locator(self.category_select)).to_be_visible() + expect(self.page.locator(self.submit_button)).to_be_visible() + expect(self.page.locator(self.csrf_token)).to_be_attached() + return self diff --git a/tests/e2e/pages/post_page.py b/tests/e2e/pages/post_page.py new file mode 100644 index 000000000..02a102c38 --- /dev/null +++ b/tests/e2e/pages/post_page.py @@ -0,0 +1,64 @@ +""" +Post Page Object for interacting with post detail and comments. +""" + +from playwright.sync_api import Page, expect + +from tests.e2e.pages.base_page import BasePage + + +class PostPage(BasePage): + """Page object for the post detail page.""" + + def __init__(self, page: Page, base_url: str): + super().__init__(page, base_url) + + self.post_title = "article h1" + self.views_count = "i.ti-eye + span" + self.comment_input = 'textarea[name="comment"]' + self.comment_submit_button = ( + 'form:has(textarea[name="comment"]) button[type="submit"]' + ) + self.post_delete_button = 'button[name="post_delete_button"]' + self.edit_post_link = 'a[href^="/edit-post/"]' + + def navigate(self, url_id: str, slug: str | None = None): + """Navigate to a post by URL ID, optionally with slug.""" + if slug: + return super().navigate(f"/post/{slug}-{url_id}") + return super().navigate(f"/post/{url_id}") + + def expect_page_loaded(self, timeout: int = 5000): + expect(self.page.locator(self.post_title).first).to_be_visible(timeout=timeout) + return self + + def get_views_count(self) -> int: + views_text = self.page.locator(self.views_count).first.inner_text().strip() + return int(views_text) + + def fill_comment(self, comment: str): + self.page.fill(self.comment_input, comment) + return self + + def submit_comment(self): + self.page.click(self.comment_submit_button) + return self + + def add_comment(self, comment: str): + self.fill_comment(comment) + self.submit_comment() + return self + + def expect_comment_visible(self, comment: str, timeout: int = 5000): + expect(self.page.get_by_text(comment, exact=False).first).to_be_visible( + timeout=timeout + ) + return self + + def delete_post(self): + self.page.click(self.post_delete_button) + return self + + def click_edit_post(self): + self.page.click(self.edit_post_link) + return self diff --git a/tests/e2e/post/__init__.py b/tests/e2e/post/__init__.py new file mode 100644 index 000000000..251c5294b --- /dev/null +++ b/tests/e2e/post/__init__.py @@ -0,0 +1 @@ +# Post E2E Tests Package diff --git a/tests/e2e/post/test_post.py b/tests/e2e/post/test_post.py new file mode 100644 index 000000000..80d89e4f4 --- /dev/null +++ b/tests/e2e/post/test_post.py @@ -0,0 +1,505 @@ +""" +E2E tests for post-related functionality. +""" + +import re +import uuid + +import pytest +from playwright.sync_api import expect + +from tests.e2e.helpers.database_helpers import ( + create_test_comment, + create_test_post, + get_comment_by_id, + get_comment_for_post, + get_post_by_title, + get_post_by_url_id, + get_post_views, + get_user_points, +) +from tests.e2e.pages.base_page import BasePage +from tests.e2e.pages.create_post_page import CreatePostPage +from tests.e2e.pages.login_page import LoginPage +from tests.e2e.pages.post_page import PostPage + + +def _suffix() -> str: + return uuid.uuid4().hex[:8] + + +def _valid_abstract(seed: str) -> str: + return (f"Abstract for {seed}. " + ("A" * 170))[:180] + + +def _valid_content(seed: str) -> str: + return f"Content for {seed}. " + ("B" * 140) + + +def _login(page, flask_server, username: str, password: str): + login_page = LoginPage(page, flask_server["base_url"]) + login_page.navigate("/login/redirect=&") + login_page.login(username, password) + page.wait_for_url("**/", timeout=5000) + + +def _get_csrf_token(page) -> str: + token = page.locator('input[name="csrf_token"]').first.get_attribute("value") + assert token is not None and token != "" + return token + + +class TestCreatePostAccess: + """Tests for create-post access and rendering.""" + + @pytest.mark.auth + def test_create_post_requires_login(self, page, flask_server): + """Unauthenticated users should be redirected to login for /create-post.""" + create_post_page = CreatePostPage(page, flask_server["base_url"]) + create_post_page.navigate() + + expect(page).to_have_url( + re.compile(rf"^{re.escape(flask_server['base_url'])}/login/.*$"), + timeout=10000, + ) + + @pytest.mark.smoke + @pytest.mark.auth + def test_create_post_page_renders_for_logged_in_user( + self, logged_in_page, flask_server + ): + """Authenticated users can load the create-post page.""" + create_post_page = CreatePostPage(logged_in_page, flask_server["base_url"]) + create_post_page.navigate() + create_post_page.expect_page_loaded() + + +class TestCreatePostFlow: + """Tests for post creation success and errors.""" + + @pytest.mark.smoke + @pytest.mark.auth + def test_create_post_with_valid_data_persists_in_database( + self, page, flask_server, test_user, db_path + ): + """Valid create-post submission should save a new post and award points.""" + db_path_str = str(db_path) + _login(page, flask_server, test_user.username, test_user.password) + + points_before = get_user_points(db_path_str, test_user.username) + assert points_before is not None + + seed = _suffix() + post_title = f"E2E Post {seed}" + + create_post_page = CreatePostPage(page, flask_server["base_url"]) + create_post_page.navigate() + create_post_page.create_post( + title=post_title, + tags="e2e,test,post", + abstract=_valid_abstract(seed), + content=_valid_content(seed), + category="Technology", + ) + + page.wait_for_url("**/", timeout=5000) + create_post_page.expect_success_flash() + + post = get_post_by_title(db_path_str, post_title) + assert post is not None, "Created post should exist in database" + assert post["author"] == test_user.username + assert post["category"] == "Technology" + assert post["views"] == 0 + + points_after = get_user_points(db_path_str, test_user.username) + assert points_after == points_before + 20 + + @pytest.mark.auth + def test_create_post_with_empty_content_shows_error( + self, page, flask_server, test_user, db_path + ): + """Submitting create-post with empty content should fail and not create a post.""" + db_path_str = str(db_path) + _login(page, flask_server, test_user.username, test_user.password) + + seed = _suffix() + post_title = f"Invalid Empty Content Post {seed}" + + create_post_page = CreatePostPage(page, flask_server["base_url"]) + create_post_page.navigate() + create_post_page.fill_title(post_title) + create_post_page.fill_tags("invalid,e2e") + create_post_page.fill_abstract(_valid_abstract(seed)) + create_post_page.fill_content("") + create_post_page.select_category("Technology") + create_post_page.click_submit() + + create_post_page.expect_error_flash() + post = get_post_by_title(db_path_str, post_title) + assert post is None, "Post should not be created when content is empty" + + +class TestPostRoutingAndViews: + """Tests for post routing and view counting behavior.""" + + @pytest.mark.auth + def test_post_url_redirects_to_canonical_slug(self, page, flask_server, db_path): + """Visiting /post/ should redirect to /post/-.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Canonical URL Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + ) + + page.goto(f"{flask_server['base_url']}/post/{post['url_id']}") + expect(page).to_have_url( + re.compile( + rf"^{re.escape(flask_server['base_url'])}/post/.+-{re.escape(post['url_id'])}/?$" + ), + timeout=10000, + ) + + @pytest.mark.auth + def test_post_views_increment_when_post_is_opened( + self, page, flask_server, db_path + ): + """Opening a post should increment its views by 1.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Views Counter Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + views=7, + ) + + post_page = PostPage(page, flask_server["base_url"]) + post_page.navigate(post["url_id"]) + post_page.expect_page_loaded() + + updated_views = get_post_views(str(db_path), post["url_id"]) + assert updated_views == 8 + assert post_page.get_views_count() == 8 + + +class TestPostComments: + """Tests for post comment behavior.""" + + @pytest.mark.auth + def test_logged_in_user_can_comment_on_post( + self, page, flask_server, test_user, db_path + ): + """Authenticated user can comment on a post and comment is persisted.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Commentable Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + author="admin", + ) + + _login(page, flask_server, test_user.username, test_user.password) + + post_page = PostPage(page, flask_server["base_url"]) + post_page.navigate(post["url_id"]) + post_page.expect_page_loaded() + + comment_text = f"This is a valid E2E comment {seed} with enough characters." + post_page.add_comment(comment_text) + + base_page = BasePage(page, flask_server["base_url"]) + base_page.expect_success_flash() + page.wait_for_url("**/post/**", timeout=5000) + post_page.expect_comment_visible(comment_text) + + stored_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert stored_post is not None + + stored_comment = get_comment_for_post( + db_path=str(db_path), + post_id=stored_post["id"], + comment_text=comment_text, + ) + assert stored_comment is not None, "Comment should be saved in database" + assert stored_comment["username"] == test_user.username + + +class TestPostEditAndDelete: + """Tests for post edit and delete authorization/behavior.""" + + @pytest.mark.auth + def test_author_can_edit_own_post(self, page, flask_server, test_user, db_path): + """Post author can edit their own post.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Editable Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + author=test_user.username, + ) + + _login(page, flask_server, test_user.username, test_user.password) + + editor_page = CreatePostPage(page, flask_server["base_url"]) + editor_page.navigate(f"/edit-post/{post['url_id']}") + editor_page.expect_page_loaded() + + updated_seed = _suffix() + updated_title = f"Updated Post {updated_seed}" + updated_abstract = _valid_abstract(updated_seed) + updated_content = _valid_content(updated_seed) + editor_page.create_post( + title=updated_title, + tags="updated,e2e,post", + abstract=updated_abstract, + content=updated_content, + category="Science", + ) + + expect(page).to_have_url( + re.compile(rf"^{re.escape(flask_server['base_url'])}/post/.*$"), + timeout=10000, + ) + + updated_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert updated_post is not None + assert updated_post["title"] == updated_title + assert updated_post["tags"] == "updated,e2e,post" + assert updated_post["category"] == "Science" + assert updated_post["abstract"] == updated_abstract + + @pytest.mark.auth + def test_non_author_cannot_edit_post(self, page, flask_server, test_user, db_path): + """Non-author user should be redirected away from edit-post page.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Admin Owned Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + author="admin", + ) + + _login(page, flask_server, test_user.username, test_user.password) + page.goto(f"{flask_server['base_url']}/edit-post/{post['url_id']}") + + page.wait_for_url("**/", timeout=5000) + assert "/edit-post/" not in page.url + + base_page = BasePage(page, flask_server["base_url"]) + base_page.expect_error_flash() + + @pytest.mark.auth + def test_author_can_delete_own_post(self, page, flask_server, test_user, db_path): + """Post author can delete their own post from the post page.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Deletable Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + author=test_user.username, + ) + + _login(page, flask_server, test_user.username, test_user.password) + + post_page = PostPage(page, flask_server["base_url"]) + post_page.navigate(post["url_id"]) + post_page.expect_page_loaded() + post_page.delete_post() + + page.wait_for_url("**/", timeout=5000) + deleted_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert deleted_post is None, "Post should be deleted from database" + + +class TestPostAuthorizationEdgeCases: + """Tests for permission checks on forged post/comment deletion requests.""" + + @pytest.mark.auth + def test_non_author_cannot_delete_post_via_forged_request( + self, page, flask_server, test_user, db_path + ): + """Non-author should not be able to delete a post by forging POST payload.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Protected Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + author="admin", + ) + + _login(page, flask_server, test_user.username, test_user.password) + + post_page = PostPage(page, flask_server["base_url"]) + post_page.navigate(post["url_id"]) + post_page.expect_page_loaded() + + csrf_token = _get_csrf_token(page) + canonical_url = page.url + + response = page.request.post( + canonical_url, + form={ + "csrf_token": csrf_token, + "post_delete_button": "1", + }, + ) + assert response.ok + + page.goto(canonical_url) + protected_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert protected_post is not None, "Post must remain when deleted by non-author" + + @pytest.mark.auth + def test_non_owner_cannot_delete_comment_via_forged_request( + self, page, flask_server, test_user, db_path + ): + """Non-owner should not be able to delete another user's comment by forging POST payload.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Comment Protected Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + author="admin", + ) + + saved_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert saved_post is not None + + comment_text = f"Admin owned comment {seed} with enough text to be realistic." + comment_id = create_test_comment( + db_path=str(db_path), + post_id=saved_post["id"], + comment=comment_text, + username="admin", + ) + + _login(page, flask_server, test_user.username, test_user.password) + + post_page = PostPage(page, flask_server["base_url"]) + post_page.navigate(post["url_id"]) + post_page.expect_page_loaded() + post_page.expect_comment_visible(comment_text) + + csrf_token = _get_csrf_token(page) + canonical_url = page.url + + response = page.request.post( + canonical_url, + form={ + "csrf_token": csrf_token, + "comment_delete_button": "1", + "comment_id": str(comment_id), + }, + ) + assert response.ok + + page.goto(canonical_url) + protected_comment = get_comment_by_id(str(db_path), comment_id) + assert protected_comment is not None, ( + "Comment must remain when deleted by non-owner" + ) + + @pytest.mark.auth + @pytest.mark.admin + def test_admin_can_delete_other_users_post_via_forged_request( + self, page, flask_server, test_user, app_settings, db_path + ): + """Admin should be able to delete another user's post via authorized POST.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Admin Deletable Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + author=test_user.username, + ) + + _login( + page, + flask_server, + app_settings["default_admin"]["username"], + app_settings["default_admin"]["password"], + ) + + post_page = PostPage(page, flask_server["base_url"]) + post_page.navigate(post["url_id"]) + post_page.expect_page_loaded() + + csrf_token = _get_csrf_token(page) + canonical_url = page.url + + response = page.request.post( + canonical_url, + form={ + "csrf_token": csrf_token, + "post_delete_button": "1", + }, + ) + assert response.ok + + deleted_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert deleted_post is None, ( + "Admin should be able to remove another user's post" + ) + + @pytest.mark.auth + @pytest.mark.admin + def test_admin_can_delete_other_users_comment_via_forged_request( + self, page, flask_server, test_user, app_settings, db_path + ): + """Admin should be able to delete another user's comment via authorized POST.""" + seed = _suffix() + post = create_test_post( + db_path=str(db_path), + title=f"Admin Comment Delete Post {seed}", + content=_valid_content(seed), + abstract=_valid_abstract(seed), + author="admin", + ) + + saved_post = get_post_by_url_id(str(db_path), post["url_id"]) + assert saved_post is not None + + comment_id = create_test_comment( + db_path=str(db_path), + post_id=saved_post["id"], + comment=f"Comment owned by {test_user.username} {seed} with enough length.", + username=test_user.username, + ) + + _login( + page, + flask_server, + app_settings["default_admin"]["username"], + app_settings["default_admin"]["password"], + ) + + post_page = PostPage(page, flask_server["base_url"]) + post_page.navigate(post["url_id"]) + post_page.expect_page_loaded() + + csrf_token = _get_csrf_token(page) + canonical_url = page.url + + response = page.request.post( + canonical_url, + form={ + "csrf_token": csrf_token, + "comment_delete_button": "1", + "comment_id": str(comment_id), + }, + ) + assert response.ok + + deleted_comment = get_comment_by_id(str(db_path), comment_id) + assert deleted_comment is None, ( + "Admin should be able to remove another user's comment" + ) diff --git a/tests/e2e/search/__init__.py b/tests/e2e/search/__init__.py new file mode 100644 index 000000000..0488f5c97 --- /dev/null +++ b/tests/e2e/search/__init__.py @@ -0,0 +1 @@ +# Search E2E Tests Package diff --git a/tests/e2e/search/test_category.py b/tests/e2e/search/test_category.py new file mode 100644 index 000000000..83c47938f --- /dev/null +++ b/tests/e2e/search/test_category.py @@ -0,0 +1,64 @@ +""" +E2E tests for the category page. +""" + +import uuid + +from playwright.sync_api import expect + +from tests.e2e.helpers.database_helpers import create_test_post + + +def _suffix() -> str: + return uuid.uuid4().hex[:8] + + +class TestCategory: + """Tests for category page filtering and error handling.""" + + def test_category_page_shows_posts_from_category(self, page, flask_server, db_path): + """Category page should display posts belonging to that category.""" + seed = _suffix() + title = f"Science Post {seed}" + create_test_post( + db_path=str(db_path), + title=title, + content=f"Content for category test {seed}", + abstract=f"Abstract for category test {seed}. " + "A" * 160, + category="Science", + ) + + page.goto( + f"{flask_server['base_url']}/category/Science", + wait_until="domcontentloaded", + ) + + expect(page.locator("body")).to_contain_text(title, timeout=5000) + + def test_category_excludes_other_categories(self, page, flask_server, db_path): + """Category page should not display posts from other categories.""" + seed = _suffix() + title = f"Science Only Post {seed}" + create_test_post( + db_path=str(db_path), + title=title, + content=f"Content for exclusion test {seed}", + abstract=f"Abstract for exclusion test {seed}. " + "A" * 160, + category="Science", + ) + + page.goto( + f"{flask_server['base_url']}/category/Technology", + wait_until="domcontentloaded", + ) + + expect(page.locator("body")).not_to_contain_text(title, timeout=5000) + + def test_invalid_category_returns_404(self, page, flask_server): + """Navigating to a nonexistent category should return a 404 page.""" + page.goto( + f"{flask_server['base_url']}/category/nonexistent", + wait_until="domcontentloaded", + ) + + expect(page.locator("h1")).to_contain_text("404", timeout=5000) diff --git a/tests/e2e/search/test_search.py b/tests/e2e/search/test_search.py new file mode 100644 index 000000000..031dd2749 --- /dev/null +++ b/tests/e2e/search/test_search.py @@ -0,0 +1,64 @@ +""" +E2E tests for the search page. +""" + +import uuid + +from playwright.sync_api import expect + +from tests.e2e.helpers.database_helpers import create_test_post, create_test_user + + +def _suffix() -> str: + return uuid.uuid4().hex[:8] + + +class TestSearch: + """Tests for search functionality.""" + + def test_search_finds_post_by_title(self, page, flask_server, db_path): + """Searching for a post title should show the matching post.""" + seed = _suffix() + title = f"Searchable Post {seed}" + create_test_post( + db_path=str(db_path), + title=title, + content=f"Content for search test {seed}", + abstract=f"Abstract for search test {seed}. " + "A" * 160, + ) + + page.goto( + f"{flask_server['base_url']}/search/{title}", + wait_until="domcontentloaded", + ) + + expect(page.locator("body")).to_contain_text(title, timeout=5000) + + def test_search_finds_user_by_username(self, page, flask_server, db_path): + """Searching for a username should show the matching user.""" + seed = _suffix() + username = f"searchuser{seed}" + create_test_user( + db_path=str(db_path), + username=username, + email=f"{username}@test.com", + password="TestPassword123!", + ) + + page.goto( + f"{flask_server['base_url']}/search/{username}", + wait_until="domcontentloaded", + ) + + expect(page.locator("body")).to_contain_text(username, timeout=5000) + + def test_search_no_results_shows_empty_state(self, page, flask_server): + """Searching for a nonexistent term should show an empty state alert.""" + random_query = uuid.uuid4().hex + + page.goto( + f"{flask_server['base_url']}/search/{random_query}", + wait_until="domcontentloaded", + ) + + expect(page.locator(".alert-warning").first).to_be_visible(timeout=5000)