diff --git a/hookwise/auth.py b/hookwise/auth.py index 0389d54..2990b23 100644 --- a/hookwise/auth.py +++ b/hookwise/auth.py @@ -11,7 +11,7 @@ from .extensions import db, limiter from .models import User -from .utils import auth_required, log_audit +from .utils import auth_required, decrypt_string, encrypt_string, log_audit def _bp() -> Any: @@ -40,7 +40,7 @@ def login() -> Any: otp = request.form.get("otp") user = User.query.get(pending_user_id) - if user and pyotp.TOTP(cast(str, user.otp_secret)).verify(cast(str, otp)): + if user and pyotp.TOTP(decrypt_string(cast(str, user.otp_secret))).verify(cast(str, otp)): # Success session["user_id"] = user.id session["username"] = user.username @@ -93,7 +93,7 @@ def setup_2fa() -> Any: otp = request.form.get("otp") secret = session.get("pending_otp_secret") if secret and pyotp.TOTP(cast(str, secret)).verify(cast(str, otp)): - user.otp_secret = secret + user.otp_secret = encrypt_string(cast(str, secret)) user.is_2fa_enabled = True db.session.commit() session.pop("pending_otp_secret") diff --git a/hookwise/models.py b/hookwise/models.py index 549f2f1..f911a14 100644 --- a/hookwise/models.py +++ b/hookwise/models.py @@ -19,7 +19,7 @@ class User(Base): id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4())) username = db.Column(db.String(100), unique=True, nullable=False) password_hash = db.Column(db.String(256), nullable=False) - otp_secret = db.Column(db.String(32)) + otp_secret = db.Column(db.String(256)) is_2fa_enabled = db.Column(db.Boolean, default=False, nullable=False) role = db.Column(db.String(20), default="user") # admin, user created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc)) diff --git a/tests/conftest.py b/tests/conftest.py index 5399d8a..5482412 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import os +from unittest.mock import patch os.environ["SOCKETIO_ASYNC_MODE"] = "threading" os.environ["SECRET_KEY"] = "test-secret" @@ -6,11 +7,21 @@ os.environ["DATABASE_URL"] = "sqlite:///:memory:" os.environ["TESTING"] = "true" os.environ["LIMITER_STORAGE_URI"] = "memory://" +os.environ["GUI_PASSWORD"] = "admin" # Set GUI_PASSWORD to avoid startup error import pytest - @pytest.fixture(autouse=True, scope="session") def setup_test_env(): # Already set at top level, but kept for clarity pass + +@pytest.fixture(autouse=True) +def mock_redis(): + """Mock Redis to avoid connection errors in before_request check_maintenance.""" + # Need to patch in all modules that might use it + with patch("hookwise.tasks.redis_client") as m1, patch("hookwise.api.redis_client") as m2, patch("hookwise.extensions.redis_client") as m3: + m1.get.return_value = None + m2.get.return_value = None + m3.get.return_value = None + yield (m1, m2, m3) diff --git a/tests/test_auth_flow.py b/tests/test_auth_flow.py index 3174dac..b7e165b 100644 --- a/tests/test_auth_flow.py +++ b/tests/test_auth_flow.py @@ -7,6 +7,7 @@ from hookwise import create_app from hookwise.extensions import db from hookwise.models import User +from hookwise.utils import encrypt_string @pytest.fixture @@ -46,7 +47,7 @@ def sample_users(app): secret = pyotp.random_base32() u2 = User(username="user2", password_hash=generate_password_hash("pass2")) u2.is_2fa_enabled = True - u2.otp_secret = secret + u2.otp_secret = encrypt_string(secret) db.session.add_all([u1, u2]) db.session.commit() diff --git a/tests/test_otp_encryption.py b/tests/test_otp_encryption.py new file mode 100644 index 0000000..37a9a50 --- /dev/null +++ b/tests/test_otp_encryption.py @@ -0,0 +1,96 @@ +import pyotp +import pytest +from unittest.mock import patch +from werkzeug.security import generate_password_hash +from hookwise import create_app +from hookwise.extensions import db +from hookwise.models import User +from hookwise.utils import decrypt_string, encrypt_string + +@pytest.fixture +def app(): + app = create_app() + app.config["TESTING"] = True + app.config["WTF_CSRF_ENABLED"] = False + app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" + app.config["ENCRYPTION_KEY"] = "8_I5vGgB2kZ3T6q9xWp-1uY4sN7rL0cI2mP5vA8eK1M=" + return app + +@pytest.fixture +def client(app): + with app.app_context(): + db.create_all() + yield app.test_client() + db.session.remove() + db.drop_all() + +@pytest.fixture(autouse=True) +def mock_redis(): + """Mock Redis to avoid connection errors in before_request check_maintenance.""" + # Need to patch in all modules that might use it + with patch("hookwise.tasks.redis_client") as m1, patch("hookwise.api.redis_client") as m2, patch("hookwise.extensions.redis_client") as m3: + m1.get.return_value = None + m2.get.return_value = None + m3.get.return_value = None + yield (m1, m2, m3) + +def test_otp_secret_is_encrypted_in_db(client, app): + """Verify that OTP secret is encrypted when saved and decrypted when read.""" + with app.app_context(): + # Setup a user + u = User(username="testuser", password_hash=generate_password_hash("testpass")) + db.session.add(u) + db.session.commit() + + # Login + client.post("/login", data={"username": "testuser", "password": "testpass"}) + + # Enable 2FA + # 1. Get the secret from session (simulating setup_2fa GET) + with client.session_transaction() as sess: + secret = pyotp.random_base32() + sess["pending_otp_secret"] = secret + sess["user_id"] = u.id + + # 2. Verify with OTP + totp = pyotp.TOTP(secret) + otp = totp.now() + client.post("/settings/2fa/setup", data={"otp": otp}) + + # Check database + user_in_db = User.query.filter_by(username="testuser").first() + assert user_in_db.otp_secret != secret + assert decrypt_string(user_in_db.otp_secret) == secret + + # Verify authentication still works + # Logout first + client.get("/logout") + + # Login with credentials -> should get 2FA prompt + resp = client.post("/login", data={"username": "testuser", "password": "testpass"}) + assert b"Verify Code" in resp.data + + # Submit OTP + resp = client.post("/login", data={"otp": otp}, follow_redirects=True) + assert resp.status_code == 200 + assert b"Logout" in resp.data + +def test_backward_compatibility(client, app): + """Verify that plaintext secrets still work (for migration period).""" + with app.app_context(): + secret = pyotp.random_base32() + u = User(username="legacyuser", password_hash=generate_password_hash("testpass")) + u.is_2fa_enabled = True + u.otp_secret = secret # Plaintext + db.session.add(u) + db.session.commit() + + # Login + client.post("/login", data={"username": "legacyuser", "password": "testpass"}) + + # Verify OTP + totp = pyotp.TOTP(secret) + otp = totp.now() + resp = client.post("/login", data={"otp": otp}, follow_redirects=True) + assert resp.status_code == 200 + assert b"Logout" in resp.data