Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions backend/api_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3567,6 +3567,79 @@ def _handle_routes(db):
# ADMIN ROUTES
# ═══════════════════════════════════════════════════════════════════════════

elif path == "/admin/marketplace-ops" and method == "GET":
user = authenticate(db)
if not user or not user['is_admin']:
return error_response("Admin access required", 403)

limit = min(max(1, int(params.get("limit", 20))), 100)
job_rows = db.execute(
"""SELECT j.*, u.name as employer_name,
COUNT(DISTINCT a.id) as application_count
FROM jobs j
JOIN users u ON u.id = j.employer_id
LEFT JOIN applications a ON a.job_id = j.id
GROUP BY j.id
ORDER BY j.created_at DESC
LIMIT ?""",
[limit]
).fetchall()

recent_jobs = []
for job in job_rows:
job_dict = row_to_dict(job)
job_id = job_dict["id"]
job_link = f"#/jobs/{job_id}"
notification_rows = db.execute(
"""SELECT n.id, n.user_id, n.type, n.title, n.link, n.is_read, n.created_at,
u.name as user_name
FROM notifications n
JOIN users u ON u.id = n.user_id
WHERE n.link = ? AND n.type = 'job_match'
ORDER BY n.created_at DESC""",
[job_link]
).fetchall()
application_rows = db.execute(
"""SELECT a.id, a.worker_id, a.status, a.created_at,
u.name as worker_name
FROM applications a
JOIN users u ON u.id = a.worker_id
WHERE a.job_id = ?
ORDER BY a.created_at DESC""",
[job_id]
).fetchall()
matching_worker_rows = db.execute(
"""SELECT DISTINCT s.worker_id, u.name as worker_name, COUNT(s.id) as matching_service_count
FROM services s
JOIN users u ON u.id = s.worker_id
WHERE s.category = ? AND s.status = 'active' AND s.worker_id != ?
GROUP BY s.worker_id, u.name
ORDER BY matching_service_count DESC, u.name ASC
LIMIT 50""",
[job_dict["category"], job_dict["employer_id"]]
).fetchall()
job_dict["job_match_notifications"] = [row_to_dict(n) for n in notification_rows]
job_dict["job_match_notification_count"] = len(notification_rows)
job_dict["applications"] = [row_to_dict(a) for a in application_rows]
job_dict["matching_workers"] = [row_to_dict(w) for w in matching_worker_rows]
job_dict["matching_worker_count"] = len(matching_worker_rows)
recent_jobs.append(job_dict)

summary = {
"total_users": db.execute("SELECT COUNT(*) as c FROM users").fetchone()['c'],
"workers_registered": db.execute("SELECT COUNT(*) as c FROM worker_profiles").fetchone()['c'],
"employers_registered": db.execute("SELECT COUNT(*) as c FROM employer_profiles").fetchone()['c'],
"active_services": db.execute("SELECT COUNT(*) as c FROM services WHERE status='active'").fetchone()['c'],
"open_jobs": db.execute("SELECT COUNT(*) as c FROM jobs WHERE status='open'").fetchone()['c'],
"total_applications": db.execute("SELECT COUNT(*) as c FROM applications").fetchone()['c'],
"job_match_notifications_24h": db.execute("SELECT COUNT(*) as c FROM notifications WHERE type='job_match' AND datetime(created_at) >= datetime('now','-1 day')").fetchone()['c'],
}
return json_response({
"summary": summary,
"recent_jobs": recent_jobs,
"limit": limit,
})

elif path == "/admin/dashboard" and method == "GET":
user = authenticate(db)
if not user or not user['is_admin']:
Expand Down
56 changes: 56 additions & 0 deletions backend/test_deep_audit_regressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,62 @@ def test_job_creation_notifies_matching_service_workers(self):
finally:
db.close()

def test_admin_marketplace_ops_requires_admin(self):
self.module._request_ctx.request_method = "GET"
self.module._request_ctx.path_info = "/api/v1/admin/marketplace-ops"
self.module._request_ctx.query_string = ""
self.module._request_ctx.http_authorization = ""
self.module._request_ctx.http_x_api_key = ""
self.module._request_ctx.stdin_data = ""
self.module._request_ctx.content_type = ""
self.module._request_ctx.content_length = "0"
self.module._request_ctx.remote_addr = "127.0.0.1"
with contextlib.redirect_stdout(io.StringIO()) as out:
self.module.handle_request()
status, body = parse_cgi_output(out.getvalue())
self.assertEqual(status, 403, body)

def test_admin_marketplace_ops_surfaces_job_notifications_and_applications(self):
db = self.module.get_db()
token = "tok-admin"
try:
db.execute("INSERT INTO users (id,email,password_hash,name,is_admin) VALUES (1,'admin@example.com','x','Admin',1)")
db.execute("INSERT INTO users (id,email,password_hash,name) VALUES (2,'worker@example.com','x','Worker')")
db.execute("INSERT INTO worker_profiles (user_id) VALUES (2)")
db.execute("INSERT INTO users (id,email,password_hash,name) VALUES (3,'employer@example.com','x','Employer')")
db.execute("INSERT INTO employer_profiles (user_id) VALUES (3)")
db.execute("INSERT INTO sessions (user_id,token,expires_at) VALUES (1,?,datetime('now','+1 day'))", [token])
db.execute("INSERT INTO services (id,worker_id,title,description,category,pricing_type,price,status) VALUES (1,2,'Testing Svc','Desc','testing','fixed',25,'active')")
db.execute("INSERT INTO jobs (id,employer_id,title,description,category,budget_type,budget_amount,status) VALUES (7,3,'QA Job','Desc','testing','fixed',25,'open')")
db.execute("INSERT INTO notifications (user_id,type,title,message,link,is_read) VALUES (2,'job_match','New job','Msg','#/jobs/7',0)")
db.execute("INSERT INTO applications (job_id,worker_id,cover_message,status) VALUES (7,2,'I can help','pending')")
db.commit()
finally:
db.close()

self.module._request_ctx.request_method = "GET"
self.module._request_ctx.path_info = "/api/v1/admin/marketplace-ops"
self.module._request_ctx.query_string = "limit=5"
self.module._request_ctx.http_authorization = f"Bearer {token}"
self.module._request_ctx.http_x_api_key = ""
self.module._request_ctx.stdin_data = ""
self.module._request_ctx.content_type = ""
self.module._request_ctx.content_length = "0"
self.module._request_ctx.remote_addr = "127.0.0.1"
with contextlib.redirect_stdout(io.StringIO()) as out:
self.module.handle_request()
status, body = parse_cgi_output(out.getvalue())
self.assertEqual(status, 200, body)
self.assertEqual(body["summary"]["open_jobs"], 1)
self.assertEqual(body["summary"]["job_match_notifications_24h"], 1)
job = body["recent_jobs"][0]
self.assertEqual(job["id"], 7)
self.assertEqual(job["application_count"], 1)
self.assertEqual(job["job_match_notification_count"], 1)
self.assertEqual(job["job_match_notifications"][0]["user_id"], 2)
self.assertEqual(job["applications"][0]["worker_id"], 2)
self.assertEqual(job["matching_workers"][0]["worker_id"], 2)

def test_public_pricing_info_uses_connector_fee_language(self):
self.module._request_ctx.request_method = "GET"
self.module._request_ctx.path_info = "/pricing/info"
Expand Down
Loading