From 991e763bfe1042e29960937197f84c1a0675a56e Mon Sep 17 00:00:00 2001 From: profilesearch <222277199+profilesearch@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:52:18 -0700 Subject: [PATCH] feat: add admin marketplace ops visibility --- backend/api_core.py | 73 ++++++++++++++++++++++++++ backend/test_deep_audit_regressions.py | 56 ++++++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/backend/api_core.py b/backend/api_core.py index 38f8bb6..ef9a9fc 100644 --- a/backend/api_core.py +++ b/backend/api_core.py @@ -3567,6 +3567,79 @@ def _handle_routes(db): # ADMIN ROUTES # ═══════════════════════════════════════════════════════════════════════════ + elif path == "/admin/marketplace-ops" and method == "GET": + user = authenticate(db) + if not user or not user['is_admin']: + return error_response("Admin access required", 403) + + limit = min(max(1, int(params.get("limit", 20))), 100) + job_rows = db.execute( + """SELECT j.*, u.name as employer_name, + COUNT(DISTINCT a.id) as application_count + FROM jobs j + JOIN users u ON u.id = j.employer_id + LEFT JOIN applications a ON a.job_id = j.id + GROUP BY j.id + ORDER BY j.created_at DESC + LIMIT ?""", + [limit] + ).fetchall() + + recent_jobs = [] + for job in job_rows: + job_dict = row_to_dict(job) + job_id = job_dict["id"] + job_link = f"#/jobs/{job_id}" + notification_rows = db.execute( + """SELECT n.id, n.user_id, n.type, n.title, n.link, n.is_read, n.created_at, + u.name as user_name + FROM notifications n + JOIN users u ON u.id = n.user_id + WHERE n.link = ? AND n.type = 'job_match' + ORDER BY n.created_at DESC""", + [job_link] + ).fetchall() + application_rows = db.execute( + """SELECT a.id, a.worker_id, a.status, a.created_at, + u.name as worker_name + FROM applications a + JOIN users u ON u.id = a.worker_id + WHERE a.job_id = ? + ORDER BY a.created_at DESC""", + [job_id] + ).fetchall() + matching_worker_rows = db.execute( + """SELECT DISTINCT s.worker_id, u.name as worker_name, COUNT(s.id) as matching_service_count + FROM services s + JOIN users u ON u.id = s.worker_id + WHERE s.category = ? AND s.status = 'active' AND s.worker_id != ? + GROUP BY s.worker_id, u.name + ORDER BY matching_service_count DESC, u.name ASC + LIMIT 50""", + [job_dict["category"], job_dict["employer_id"]] + ).fetchall() + job_dict["job_match_notifications"] = [row_to_dict(n) for n in notification_rows] + job_dict["job_match_notification_count"] = len(notification_rows) + job_dict["applications"] = [row_to_dict(a) for a in application_rows] + job_dict["matching_workers"] = [row_to_dict(w) for w in matching_worker_rows] + job_dict["matching_worker_count"] = len(matching_worker_rows) + recent_jobs.append(job_dict) + + summary = { + "total_users": db.execute("SELECT COUNT(*) as c FROM users").fetchone()['c'], + "workers_registered": db.execute("SELECT COUNT(*) as c FROM worker_profiles").fetchone()['c'], + "employers_registered": db.execute("SELECT COUNT(*) as c FROM employer_profiles").fetchone()['c'], + "active_services": db.execute("SELECT COUNT(*) as c FROM services WHERE status='active'").fetchone()['c'], + "open_jobs": db.execute("SELECT COUNT(*) as c FROM jobs WHERE status='open'").fetchone()['c'], + "total_applications": db.execute("SELECT COUNT(*) as c FROM applications").fetchone()['c'], + "job_match_notifications_24h": db.execute("SELECT COUNT(*) as c FROM notifications WHERE type='job_match' AND datetime(created_at) >= datetime('now','-1 day')").fetchone()['c'], + } + return json_response({ + "summary": summary, + "recent_jobs": recent_jobs, + "limit": limit, + }) + elif path == "/admin/dashboard" and method == "GET": user = authenticate(db) if not user or not user['is_admin']: diff --git a/backend/test_deep_audit_regressions.py b/backend/test_deep_audit_regressions.py index b10a128..3914f4f 100644 --- a/backend/test_deep_audit_regressions.py +++ b/backend/test_deep_audit_regressions.py @@ -165,6 +165,62 @@ def test_job_creation_notifies_matching_service_workers(self): finally: db.close() + def test_admin_marketplace_ops_requires_admin(self): + self.module._request_ctx.request_method = "GET" + self.module._request_ctx.path_info = "/api/v1/admin/marketplace-ops" + self.module._request_ctx.query_string = "" + self.module._request_ctx.http_authorization = "" + self.module._request_ctx.http_x_api_key = "" + self.module._request_ctx.stdin_data = "" + self.module._request_ctx.content_type = "" + self.module._request_ctx.content_length = "0" + self.module._request_ctx.remote_addr = "127.0.0.1" + with contextlib.redirect_stdout(io.StringIO()) as out: + self.module.handle_request() + status, body = parse_cgi_output(out.getvalue()) + self.assertEqual(status, 403, body) + + def test_admin_marketplace_ops_surfaces_job_notifications_and_applications(self): + db = self.module.get_db() + token = "tok-admin" + try: + db.execute("INSERT INTO users (id,email,password_hash,name,is_admin) VALUES (1,'admin@example.com','x','Admin',1)") + db.execute("INSERT INTO users (id,email,password_hash,name) VALUES (2,'worker@example.com','x','Worker')") + db.execute("INSERT INTO worker_profiles (user_id) VALUES (2)") + db.execute("INSERT INTO users (id,email,password_hash,name) VALUES (3,'employer@example.com','x','Employer')") + db.execute("INSERT INTO employer_profiles (user_id) VALUES (3)") + db.execute("INSERT INTO sessions (user_id,token,expires_at) VALUES (1,?,datetime('now','+1 day'))", [token]) + db.execute("INSERT INTO services (id,worker_id,title,description,category,pricing_type,price,status) VALUES (1,2,'Testing Svc','Desc','testing','fixed',25,'active')") + db.execute("INSERT INTO jobs (id,employer_id,title,description,category,budget_type,budget_amount,status) VALUES (7,3,'QA Job','Desc','testing','fixed',25,'open')") + db.execute("INSERT INTO notifications (user_id,type,title,message,link,is_read) VALUES (2,'job_match','New job','Msg','#/jobs/7',0)") + db.execute("INSERT INTO applications (job_id,worker_id,cover_message,status) VALUES (7,2,'I can help','pending')") + db.commit() + finally: + db.close() + + self.module._request_ctx.request_method = "GET" + self.module._request_ctx.path_info = "/api/v1/admin/marketplace-ops" + self.module._request_ctx.query_string = "limit=5" + self.module._request_ctx.http_authorization = f"Bearer {token}" + self.module._request_ctx.http_x_api_key = "" + self.module._request_ctx.stdin_data = "" + self.module._request_ctx.content_type = "" + self.module._request_ctx.content_length = "0" + self.module._request_ctx.remote_addr = "127.0.0.1" + with contextlib.redirect_stdout(io.StringIO()) as out: + self.module.handle_request() + status, body = parse_cgi_output(out.getvalue()) + self.assertEqual(status, 200, body) + self.assertEqual(body["summary"]["open_jobs"], 1) + self.assertEqual(body["summary"]["job_match_notifications_24h"], 1) + job = body["recent_jobs"][0] + self.assertEqual(job["id"], 7) + self.assertEqual(job["application_count"], 1) + self.assertEqual(job["job_match_notification_count"], 1) + self.assertEqual(job["job_match_notifications"][0]["user_id"], 2) + self.assertEqual(job["applications"][0]["worker_id"], 2) + self.assertEqual(job["matching_workers"][0]["worker_id"], 2) + def test_public_pricing_info_uses_connector_fee_language(self): self.module._request_ctx.request_method = "GET" self.module._request_ctx.path_info = "/pricing/info"