Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 56 additions & 61 deletions hookwise/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
from datetime import date, datetime, timedelta, timezone
from datetime import time as dtime
from typing import Any, Tuple, cast
from typing import Any, Dict, Tuple, cast

from flask import Response, current_app, flash, jsonify, redirect, render_template, request, session, url_for
from prometheus_client import CONTENT_TYPE_LATEST, Gauge, generate_latest
Expand All @@ -21,11 +21,59 @@
QUEUE_SIZE = Gauge("hookwise_celery_queue_size", "Approximate number of tasks in queue")


def _format_webhook_log(log: WebhookLog) -> Dict[str, Any]:
"""Format a WebhookLog for the activity history API."""
# Reconstruct the message based on status and action
# This mimics the log_to_web calls in tasks.py
message = log.error_message or "Processed"
level = "info"

if log.status == "failed":
message = log.error_message or "Unknown error"
level = "error"
elif log.status == "skipped":
err_msg = log.error_message or "No action required"
prefix = "Skipped: "
# Prevent double-prefixing if the message already starts with "Skipped:"
if err_msg.strip().startswith("Skipped:"):
message = err_msg
else:
message = f"{prefix}{err_msg}"
level = "info"
elif log.status == "processed":
if log.action == "create":
message = f"Created NEW ticket (ID: {log.ticket_id})"
level = "warning"
elif log.action == "update":
if not log.error_message:
message = f"Updated existing ticket (ID: {log.ticket_id})"
level = "info"
elif log.action == "close":
message = f"Closed ticket (ID: {log.ticket_id})"
level = "success"

payload_data: Any = {"raw": log.payload}
if log.payload and log.payload.startswith(("{", "[")):
try:
payload_data = json.loads(log.payload)
except (json.JSONDecodeError, TypeError):
pass

return {
"timestamp": log.created_at.isoformat(),
"message": message,
"level": level,
"config_name": log.config.name if log.config else "System",
"payload": payload_data,
"ticket_id": log.ticket_id,
}


def _register() -> None:
from .routes import main_bp

# --- History & Logs ---

@main_bp.route("/api/activity/history")
@auth_required
def get_activity_history() -> Any:
Expand All @@ -35,75 +83,22 @@
.limit(50)
.all()
)
history = []
for log in logs:
# Reconstruct the message based on status and action
# This mimics the log_to_web calls in tasks.py
message = log.error_message or "Processed"
level = "info"

if log.status == "failed":
message = log.error_message or "Unknown error"
level = "error"
elif log.status == "skipped":
err_msg = log.error_message or "No action required"
prefix = "Skipped: "
# Prevent double-prefixing if the message already starts with "Skipped:"
if err_msg.strip().startswith("Skipped:"):
message = err_msg
else:
message = f"{prefix}{err_msg}"
level = "info"
elif log.status == "processed":
if log.action == "create":
message = f"Created NEW ticket (ID: {log.ticket_id})"
level = "warning"
elif log.action == "update":
if not log.error_message:
message = f"Updated existing ticket (ID: {log.ticket_id})"
level = "info"
elif log.action == "close":
message = f"Closed ticket (ID: {log.ticket_id})"
level = "success"
# Removed the dead 'skipped' action branch as it's handled by log.status

payload_data = {"raw": log.payload}
if log.payload and log.payload.startswith(("{", "[")):
try:
payload_data = json.loads(log.payload)
except (json.JSONDecodeError, TypeError):
pass

history.append({
"timestamp": log.created_at.isoformat(),
"message": message,
"level": level,
"config_name": log.config.name if log.config else "System",
"payload": payload_data,
"ticket_id": log.ticket_id
})
return jsonify(history)
return jsonify([_format_webhook_log(log) for log in logs])

@main_bp.route("/api/activity/trigger-timeout-check", methods=["POST"])
@auth_required
def trigger_timeout_check() -> Any:
from .tasks import check_webhook_timeouts

try:
# Trigger the task in the background
task = check_webhook_timeouts.delay()
return jsonify({
"status": "success",
"message": "Manual timeout check triggered in background.",
"task_id": task.id
})
return jsonify(
{"status": "success", "message": "Manual timeout check triggered in background.", "task_id": task.id}
)
except Exception as e:
current_app.logger.error(f"Failed to enqueue timeout check: {e}")
return jsonify({
"status": "error",
"message": "Failed to enqueue timeout check",
"details": str(e)
}), 503

return jsonify({"status": "error", "message": "Failed to enqueue timeout check", "details": str(e)}), 503

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

@main_bp.route("/history")
@auth_required
Expand Down
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,17 @@
def setup_test_env():
# Already set at top level, but kept for clarity
pass

from unittest.mock import MagicMock
import sys

@pytest.fixture(autouse=True)
def mock_redis(monkeypatch):
mock = MagicMock()
# Mocking in hookwise.extensions
monkeypatch.setattr("hookwise.extensions.redis_client", mock)
# Mocking in hookwise.tasks
monkeypatch.setattr("hookwise.tasks.redis_client", mock)
# Mocking in hookwise.api
monkeypatch.setattr("hookwise.api.redis_client", mock)
return mock
78 changes: 78 additions & 0 deletions tests/test_api_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from datetime import datetime, timezone

import pytest

from hookwise import create_app
from hookwise.extensions import db
from hookwise.models import User, WebhookConfig, WebhookLog


@pytest.fixture
def app():
app = create_app()
app.config["TESTING"] = True
app.config["WTF_CSRF_ENABLED"] = False
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
app.config["GUI_PASSWORD"] = "testpass"
return app


@pytest.fixture
def client(app):
with app.app_context():
db.create_all()
yield app.test_client()
db.session.remove()
db.drop_all()


def test_get_activity_history(client, app):
"""Test the get_activity_history API endpoint."""
with app.app_context():
# Setup test data
config = WebhookConfig(id="test-config", name="Test Config")
db.session.add(config)

user = User(username="admin", password_hash="hash")
db.session.add(user)
db.session.commit()
user_id = user.id

log1 = WebhookLog(
config_id="test-config",
request_id="req-1",
payload='{"key": "val"}',
status="processed",
action="create",
ticket_id=123,
created_at=datetime.now(timezone.utc),
)
log2 = WebhookLog(
config_id="test-config",
request_id="req-2",
payload="invalid-json",
status="failed",
error_message="Something went wrong",
created_at=datetime.now(timezone.utc),
)
db.session.add_all([log1, log2])
db.session.commit()

with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["username"] = "admin"

response = client.get("/api/activity/history")
assert response.status_code == 200
data = response.json
assert len(data) >= 2

processed_log = next(log_item for log_item in data if log_item.get("ticket_id") == 123)
assert processed_log["message"] == "Created NEW ticket (ID: 123)"
assert processed_log["level"] == "warning"
assert processed_log["payload"] == {"key": "val"}
assert processed_log["config_name"] == "Test Config"

failed_log = next(log_item for log_item in data if log_item["message"] == "Something went wrong")
assert failed_log["level"] == "error"
assert failed_log["payload"] == {"raw": "invalid-json"}
Loading