Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 74 additions & 20 deletions packages/agent/src/argus_agent/collectors/process_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,72 @@ def get_process_list(
sort_by: str = "cpu_percent",
limit: int = 50,
) -> list[dict[str, Any]]:
"""Get current process list snapshot (synchronous, for tools)."""
"""Get current process list snapshot (synchronous, for tools).

Uses `ps` for a complete listing (no AccessDenied gaps),
falls back to psutil if `ps` is unavailable.
"""
processes = _get_process_list_ps()
if processes is None:
processes = _get_process_list_psutil()

# Sort
if sort_by in ("cpu_percent", "memory_percent", "pid"):
processes.sort(key=lambda p: p.get(sort_by, 0), reverse=sort_by != "pid")
else:
processes.sort(key=lambda p: p.get("cpu_percent", 0), reverse=True)

return processes[:limit]


def _get_process_list_ps() -> list[dict[str, Any]] | None:
"""List processes via `ps -eo`. Returns None if ps fails."""
import subprocess

try:
result = subprocess.run(
["ps", "-eo", "pid,user,%cpu,%mem,stat,comm,args"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
return None
except (FileNotFoundError, subprocess.TimeoutExpired):
return None

lines = result.stdout.strip().splitlines()
if len(lines) < 2:
return None

processes: list[dict[str, Any]] = []
# Skip header line
for line in lines[1:]:
parts = line.split(None, 6)
if len(parts) < 6:
continue
try:
pid = int(parts[0])
cpu = float(parts[2])
mem = float(parts[3])
except (ValueError, IndexError):
continue
processes.append({
"pid": pid,
"name": parts[5],
"status": parts[4],
"cpu_percent": cpu,
"memory_percent": round(mem, 2),
"username": parts[1],
"cmdline": parts[6][:200] if len(parts) > 6 else "",
})

return processes


def _get_process_list_psutil() -> list[dict[str, Any]]:
"""Fallback: list processes via psutil (may miss AccessDenied processes)."""
processes: list[dict[str, Any]] = []
for proc in psutil.process_iter(
["pid", "name", "status", "cpu_percent", "memory_percent", "username", "cmdline"]
):
Expand All @@ -255,24 +318,15 @@ def get_process_list(
if not info:
continue
cmdline = info.get("cmdline") or []
processes.append(
{
"pid": info["pid"],
"name": info.get("name", ""),
"status": info.get("status", ""),
"cpu_percent": info.get("cpu_percent", 0.0) or 0.0,
"memory_percent": round(info.get("memory_percent", 0.0) or 0.0, 2),
"username": info.get("username", ""),
"cmdline": " ".join(cmdline)[:200] if cmdline else "",
}
)
processes.append({
"pid": info["pid"],
"name": info.get("name", ""),
"status": info.get("status", ""),
"cpu_percent": info.get("cpu_percent", 0.0) or 0.0,
"memory_percent": round(info.get("memory_percent", 0.0) or 0.0, 2),
"username": info.get("username", ""),
"cmdline": " ".join(cmdline)[:200] if cmdline else "",
})
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue

# Sort
if sort_by in ("cpu_percent", "memory_percent", "pid"):
processes.sort(key=lambda p: p.get(sort_by, 0), reverse=sort_by != "pid")
else:
processes.sort(key=lambda p: p.get("cpu_percent", 0), reverse=True)

return processes[:limit]
return processes
Loading