From 86232b26558e715bf48e1c4eec8f3e5ec057ba88 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 14 Apr 2026 10:46:41 +0000 Subject: [PATCH 1/3] Initial plan From 9dc2a748eec449534bf21cfb5dfd44635bec78fb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 14 Apr 2026 10:54:19 +0000 Subject: [PATCH 2/3] Implement complete NetworkzeroMonitor application - network_monitor.py: Core monitoring engine (NetworkMonitor + PiHoleMonitor) - Ping host with RTT parsing - DNS resolution with dnspython + socket fallback - Internet connectivity quality check - Local network info + interface stats via psutil - Per-interface traffic counters - Port scanner - Pi-hole API integration (status/summary/top blocked) - networkzero_cli.py: Full CLI with 7 commands - ping, dns, status, ports, traffic, pihole, monitor - networkzero_gui.py: Tkinter GUI with 7 tabs - Dashboard, Ping, DNS Lookup, Pi-hole, Live Monitor, Port Scanner, Traffic - Supporting files: setup.sh/bat, run_cli.sh/bat, run_gui.sh/bat, config.ini - test_networkzero.py: 25 unit tests (all passing) - README.md: Full documentation Agent-Logs-Url: https://github.com/AntwerpDesignsIonity/NetworkzeroMonitor/sessions/b4accdbc-220d-4921-b773-5a6231dde98e Co-authored-by: AntwerpDesignsIonity <211600625+AntwerpDesignsIonity@users.noreply.github.com> --- .gitignore | 47 ++++ README.md | 203 +++++++++++++- config.ini | 42 +++ network_monitor.py | 461 +++++++++++++++++++++++++++++++ networkzero_cli.py | 316 ++++++++++++++++++++++ networkzero_gui.py | 644 ++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 10 + run_cli.bat | 13 + run_cli.sh | 13 + run_gui.bat | 13 + run_gui.sh | 13 + setup.bat | 52 ++++ setup.sh | 55 ++++ test_networkzero.py | 277 +++++++++++++++++++ 14 files changed, 2158 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 config.ini create mode 100644 network_monitor.py create mode 100644 networkzero_cli.py create mode 100644 networkzero_gui.py create mode 100644 requirements.txt create mode 100644 run_cli.bat create mode 100755 run_cli.sh create mode 100644 run_gui.bat create mode 100755 run_gui.sh create mode 100644 setup.bat create mode 100755 setup.sh create mode 100644 test_networkzero.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bb10fdc --- /dev/null +++ b/.gitignore @@ -0,0 +1,47 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +venv/ +env/ +ENV/ +.venv + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +*.cover + +# Logs +*.log + +# Temporary files +*.tmp diff --git a/README.md b/README.md index 42de84c..b2adab8 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,203 @@ # NetworkzeroMonitor -NetworkZeroMonitor + +> **Ionity (Pty) Ltd** — [www.ionity.today](https://www.ionity.today) + +A cross-platform, Python-based network monitoring tool with a full **GUI** and a rich **CLI**. +Monitor pings, DNS, active ports, interface traffic and Pi-hole statistics — all from one place. + +--- + +## ✨ Features + +| Feature | CLI | GUI | +|---|---|---| +| Ping host (with RTT stats) | ✓ | ✓ | +| DNS resolution (custom DNS server) | ✓ | ✓ | +| DNS comparison across all servers | — | ✓ | +| Network information (hostname, IP, interfaces) | ✓ | ✓ | +| Internet connectivity quality check | ✓ | ✓ | +| Port scanner | ✓ | ✓ | +| Interface traffic counters | ✓ | ✓ | +| Pi-hole status / summary / top blocked | ✓ | ✓ | +| Live continuous monitoring | ✓ | ✓ | + +--- + +## 🚀 Quick Start + +### Prerequisites + +- Python 3.8 or higher +- `pip` (bundled with Python) +- `tkinter` (for GUI — pre-installed on most systems; on Ubuntu: `sudo apt install python3-tk`) + +### 1. Clone or download the repository + +```bash +git clone https://github.com/AntwerpDesignsIonity/NetworkzeroMonitor.git +cd NetworkzeroMonitor +``` + +### 2. Run setup (creates a virtual environment and installs dependencies) + +**Unix / Linux / macOS:** +```bash +chmod +x setup.sh run_cli.sh run_gui.sh +./setup.sh +``` + +**Windows:** +```bat +setup.bat +``` + +### 3. Launch + +**GUI:** +```bash +./run_gui.sh # Unix +run_gui.bat # Windows +``` + +**CLI:** +```bash +./run_cli.sh --help # Unix +run_cli.bat --help # Windows +``` + +--- + +## 📟 CLI Reference + +``` +networkzero_cli.py [options] +``` + +| Command | Description | +|---|---| +| `ping ` | Ping a host | +| `dns ` | DNS lookup (optionally via a specific server) | +| `status` | Show network info and connectivity | +| `ports` | Scan common ports on a host | +| `traffic` | Show interface traffic counters | +| `pihole ` | Pi-hole status / summary / top blocked | +| `monitor` | Continuous live monitoring | + +### Examples + +```bash +# Ping with 10 packets +./run_cli.sh ping 8.8.8.8 -c 10 + +# DNS lookup using Cloudflare +./run_cli.sh dns github.com -s 1.1.1.1 + +# Verbose network status +./run_cli.sh status -v + +# Scan specific ports +./run_cli.sh ports --host 192.168.1.1 --ports 22 80 443 + +# Show interface traffic +./run_cli.sh traffic + +# Pi-hole summary +./run_cli.sh pihole summary --url http://192.168.1.1 --api-key YOUR_KEY + +# Continuous monitoring every 3 seconds +./run_cli.sh monitor --host 8.8.8.8 --interval 3 +``` + +--- + +## 🖥️ GUI Overview + +The GUI is organised into seven tabs: + +1. **📊 Dashboard** — hostname, IP addresses, interfaces, connectivity quality +2. **🏓 Ping** — interactive ping with RTT display +3. **🌐 DNS Lookup** — single lookup or compare across all configured DNS servers +4. **🕳 Pi-hole** — status, daily statistics, top blocked domains +5. **📡 Live Monitor** — scrolling live log of connectivity and ping results +6. **🔍 Port Scanner** — scan a host for open/closed ports +7. **📶 Traffic** — per-interface traffic counters (bytes sent/received, errors) + +--- + +## ⚙️ Configuration + +Edit `config.ini` to change defaults: + +```ini +[Network] +default_ping_host = 8.8.8.8 +test_domains = google.com,cloudflare.com,github.com +dns_servers = 8.8.8.8,1.1.1.1,208.67.222.222 +monitor_interval = 5 +ping_count = 4 +ping_timeout = 2 + +[PiHole] +# url = http://192.168.1.1 +# api_key = your_api_key_here + +[UI] +window_width = 1024 +window_height = 720 +theme = clam +``` + +--- + +## 🧪 Running Tests + +```bash +# Activate the virtual environment first +source venv/bin/activate # Unix +venv\Scripts\activate.bat # Windows + +# Run unit tests +python test_networkzero.py + +# Or run the manual demo +python test_networkzero.py --demo +``` + +--- + +## 📦 Dependencies + +| Package | Purpose | +|---|---| +| `dnspython` | DNS resolution with custom servers | +| `psutil` | Interface statistics and traffic counters | +| `requests` | Public IP lookup, Pi-hole API | +| `matplotlib` | (optional) Future chart support | + +--- + +## 📁 Project Structure + +``` +NetworkzeroMonitor/ +├── network_monitor.py # Core monitoring engine +├── networkzero_cli.py # Command-line interface +├── networkzero_gui.py # Graphical user interface (tkinter) +├── test_networkzero.py # Unit + integration tests +├── config.ini # Application configuration +├── requirements.txt # Python dependencies +├── setup.sh # Unix setup script +├── setup.bat # Windows setup script +├── run_cli.sh / run_cli.bat +└── run_gui.sh / run_gui.bat +``` + +--- + +## 📜 License + +MIT License — see [LICENSE](LICENSE) for details. + +--- + +*NetworkzeroMonitor is a product of **Ionity (Pty) Ltd** — [www.ionity.today](https://www.ionity.today)* diff --git a/config.ini b/config.ini new file mode 100644 index 0000000..c6f6a5d --- /dev/null +++ b/config.ini @@ -0,0 +1,42 @@ +# NetworkzeroMonitor Configuration +# Ionity (Pty) Ltd - www.ionity.today + +[General] +# Application name +app_name = NetworkzeroMonitor +version = 1.0.0 + +[Network] +# Default hosts to monitor +default_ping_host = 8.8.8.8 +test_domains = google.com,cloudflare.com,github.com + +# DNS servers to use for lookups +dns_servers = 8.8.8.8,1.1.1.1,208.67.222.222 + +# Monitoring interval (seconds) +monitor_interval = 5 + +# Ping settings +ping_count = 4 +ping_timeout = 2 + +[PiHole] +# Pi-hole server configuration (optional) +# Uncomment and configure to enable Pi-hole monitoring +# url = http://192.168.1.1 +# api_key = your_api_key_here + +[UI] +# GUI window size +window_width = 900 +window_height = 700 + +# Theme +# Options: default, clam, alt, classic +theme = clam + +[Branding] +# Company information +company_name = Ionity (Pty) Ltd +website = www.ionity.today diff --git a/network_monitor.py b/network_monitor.py new file mode 100644 index 0000000..9a3d59c --- /dev/null +++ b/network_monitor.py @@ -0,0 +1,461 @@ +""" +NetworkzeroMonitor - Core Network Monitoring Module +Ionity (Pty) Ltd - www.ionity.today + +This module provides core network monitoring functionality including: +- Ping monitoring +- DNS resolution checks +- Network connectivity tests +- Network interface statistics (via psutil) +- Pi-hole integration +""" + +import socket +import subprocess +import platform +import time +import configparser +import os +from typing import Dict, List, Optional +import psutil +import requests + +try: + import dns.resolver + DNS_AVAILABLE = True +except ImportError: + DNS_AVAILABLE = False + +# Load configuration +_config = configparser.ConfigParser() +_config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini') +if os.path.exists(_config_path): + _config.read(_config_path) + + +class NetworkMonitor: + """Core network monitoring class""" + + def __init__(self): + self.dns_servers = ['8.8.8.8', '1.1.1.1', '208.67.222.222'] # Google, Cloudflare, OpenDNS + self.test_domains = ['google.com', 'cloudflare.com', 'github.com'] + + # Override defaults from config if available + if _config.has_option('Network', 'dns_servers'): + self.dns_servers = [s.strip() for s in _config.get('Network', 'dns_servers').split(',')] + if _config.has_option('Network', 'test_domains'): + self.test_domains = [d.strip() for d in _config.get('Network', 'test_domains').split(',')] + + def ping_host(self, host: str, count: int = 4, timeout: int = 2) -> Dict: + """ + Ping a host and return statistics. + + Args: + host: Hostname or IP address to ping + count: Number of ping requests + timeout: Timeout in seconds + + Returns: + Dictionary with ping results + """ + system = platform.system().lower() + if system == 'windows': + param = '-n' + timeout_param = '-w' + timeout_val = str(timeout * 1000) + else: + param = '-c' + timeout_param = '-W' + timeout_val = str(timeout) + + command = ['ping', param, str(count), timeout_param, timeout_val, host] + + try: + start_time = time.time() + output = subprocess.run( + command, + capture_output=True, + text=True, + timeout=timeout * count + 5 + ) + elapsed = time.time() - start_time + + success = output.returncode == 0 + stdout = output.stdout if success else (output.stdout + output.stderr) + + result = { + 'host': host, + 'success': success, + 'output': stdout, + 'elapsed_time': round(elapsed, 2), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + if success: + lines_lower = stdout.lower() + result['reachable'] = ( + 'packet loss' in lines_lower or + 'received' in lines_lower or + 'bytes from' in lines_lower + ) + else: + result['reachable'] = False + + # Parse min/avg/max RTT where possible + for line in stdout.splitlines(): + line_l = line.lower() + if 'min/avg/max' in line_l or 'minimum' in line_l: + result['rtt_line'] = line.strip() + break + + return result + + except subprocess.TimeoutExpired: + return { + 'host': host, + 'success': False, + 'reachable': False, + 'error': 'Timeout', + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + except FileNotFoundError: + return { + 'host': host, + 'success': False, + 'reachable': False, + 'error': 'ping command not found', + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + except Exception as exc: + return { + 'host': host, + 'success': False, + 'reachable': False, + 'error': str(exc), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + def check_dns_resolution(self, domain: str, dns_server: Optional[str] = None) -> Dict: + """ + Check DNS resolution for a domain. + + Args: + domain: Domain name to resolve + dns_server: Optional DNS server to use + + Returns: + Dictionary with DNS resolution results + """ + if DNS_AVAILABLE: + return self._dns_via_dnspython(domain, dns_server) + else: + return self._dns_via_socket(domain) + + def _dns_via_dnspython(self, domain: str, dns_server: Optional[str]) -> Dict: + """DNS resolution using dnspython.""" + import dns.resolver # noqa: PLC0415 + + resolver = dns.resolver.Resolver() + if dns_server: + resolver.nameservers = [dns_server] + + try: + start_time = time.time() + answers = resolver.resolve(domain, 'A') + elapsed = time.time() - start_time + + return { + 'domain': domain, + 'success': True, + 'ip_addresses': [str(r) for r in answers], + 'dns_server': dns_server or 'system default', + 'query_time': round(elapsed * 1000, 2), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + except Exception as exc: + return { + 'domain': domain, + 'success': False, + 'error': str(exc), + 'dns_server': dns_server or 'system default', + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + def _dns_via_socket(self, domain: str) -> Dict: + """Fallback DNS resolution using the system socket library.""" + try: + start_time = time.time() + results = socket.getaddrinfo(domain, None) + elapsed = time.time() - start_time + ips = list({r[4][0] for r in results}) + return { + 'domain': domain, + 'success': True, + 'ip_addresses': ips, + 'dns_server': 'system default', + 'query_time': round(elapsed * 1000, 2), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + except Exception as exc: + return { + 'domain': domain, + 'success': False, + 'error': str(exc), + 'dns_server': 'system default', + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + def check_internet_connectivity(self) -> Dict: + """ + Check overall internet connectivity by pinging test domains. + + Returns: + Dictionary with connectivity status + """ + results = [] + for domain in self.test_domains: + result = self.ping_host(domain, count=1, timeout=2) + results.append({ + 'domain': domain, + 'reachable': result.get('reachable', False), + }) + + successful = sum(1 for r in results if r['reachable']) + total = len(results) + + if successful == 0: + quality = 'No Connection' + elif successful < total: + quality = 'Degraded' + else: + quality = 'Excellent' + + return { + 'connected': successful > 0, + 'quality': quality, + 'tests': results, + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + def get_network_info(self) -> Dict: + """ + Get local network information including interface stats. + + Returns: + Dictionary with network information + """ + try: + hostname = socket.gethostname() + try: + local_ip = socket.gethostbyname(hostname) + except Exception: + local_ip = '127.0.0.1' + + # Collect interface statistics via psutil + interfaces = {} + net_if_addrs = psutil.net_if_addrs() + net_if_stats = psutil.net_if_stats() + net_io = psutil.net_io_counters(pernic=True) + + for iface, addrs in net_if_addrs.items(): + ipv4 = [a.address for a in addrs if a.family == socket.AF_INET] + stats = net_if_stats.get(iface) + io = net_io.get(iface) + interfaces[iface] = { + 'ipv4': ipv4, + 'is_up': stats.isup if stats else False, + 'speed_mbps': stats.speed if stats else 0, + 'bytes_sent': io.bytes_sent if io else 0, + 'bytes_recv': io.bytes_recv if io else 0, + } + + # Try to get public IP + public_ip = None + try: + response = requests.get('https://api.ipify.org?format=json', timeout=5) + if response.status_code == 200: + public_ip = response.json().get('ip') + except Exception: + pass + + return { + 'hostname': hostname, + 'local_ip': local_ip, + 'public_ip': public_ip or 'Unavailable', + 'platform': platform.system(), + 'platform_version': platform.version(), + 'interfaces': interfaces, + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + except Exception as exc: + return { + 'error': str(exc), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + def get_interface_traffic(self) -> Dict: + """ + Get current per-interface traffic counters. + + Returns: + Dictionary mapping interface name to traffic stats + """ + counters = psutil.net_io_counters(pernic=True) + result = {} + for iface, io in counters.items(): + result[iface] = { + 'bytes_sent': io.bytes_sent, + 'bytes_recv': io.bytes_recv, + 'packets_sent': io.packets_sent, + 'packets_recv': io.packets_recv, + 'errin': io.errin, + 'errout': io.errout, + 'dropin': io.dropin, + 'dropout': io.dropout, + } + return result + + def get_open_ports(self, host: str = '127.0.0.1', ports: Optional[List[int]] = None) -> Dict: + """ + Check which of the given ports are open on host. + + Args: + host: Host to scan + ports: List of port numbers to check (defaults to common ports) + + Returns: + Dictionary mapping port -> open (bool) + """ + if ports is None: + ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 3306, 5432, 8080, 8443] + + results = {} + for port in ports: + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) + connected = sock.connect_ex((host, port)) == 0 + sock.close() + results[port] = connected + except Exception: + results[port] = False + + return { + 'host': host, + 'ports': results, + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + +class PiHoleMonitor: + """Pi-hole monitoring integration""" + + def __init__(self, pihole_url: str, api_key: Optional[str] = None): + """ + Initialize Pi-hole monitor. + + Args: + pihole_url: URL of Pi-hole instance (e.g., http://192.168.1.1) + api_key: Optional API key for authenticated requests + """ + self.pihole_url = pihole_url.rstrip('/') + self.api_key = api_key + + def _api_get(self, params: str) -> Dict: + """Helper to call the Pi-hole admin API.""" + url = f"{self.pihole_url}/admin/api.php?{params}" + if self.api_key: + url += f"&auth={self.api_key}" + response = requests.get(url, timeout=5) + response.raise_for_status() + return response.json() + + def get_summary(self) -> Dict: + """ + Get Pi-hole summary statistics. + + Returns: + Dictionary with Pi-hole stats + """ + try: + data = self._api_get('summary') + return { + 'success': True, + 'ads_blocked_today': data.get('ads_blocked_today', 0), + 'dns_queries_today': data.get('dns_queries_today', 0), + 'ads_percentage_today': data.get('ads_percentage_today', 0), + 'domains_being_blocked': data.get('domains_being_blocked', 0), + 'status': data.get('status', 'unknown'), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + except Exception as exc: + return { + 'success': False, + 'error': str(exc), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + def get_top_blocked(self, count: int = 10) -> Dict: + """ + Get top blocked domains. + + Args: + count: Number of top domains to return + + Returns: + Dictionary with top blocked domains + """ + try: + data = self._api_get(f'topItems={count}') + return { + 'success': True, + 'top_ads': data.get('top_ads', {}), + 'top_queries': data.get('top_queries', {}), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + except Exception as exc: + return { + 'success': False, + 'error': str(exc), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + def check_status(self) -> Dict: + """ + Check Pi-hole enabled/disabled status. + + Returns: + Dictionary with Pi-hole status + """ + try: + data = self._api_get('status') + return { + 'success': True, + 'enabled': data.get('status') == 'enabled', + 'status': data.get('status', 'unknown'), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + except Exception as exc: + return { + 'success': False, + 'error': str(exc), + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + } + + +if __name__ == '__main__': + monitor = NetworkMonitor() + print("NetworkzeroMonitor - Core Module Test") + print("Ionity (Pty) Ltd - www.ionity.today") + print("=" * 50) + + print("\nNetwork Info:") + info = monitor.get_network_info() + print(f" Hostname : {info.get('hostname', 'N/A')}") + print(f" Local IP : {info.get('local_ip', 'N/A')}") + print(f" Platform : {info.get('platform', 'N/A')}") + + print("\nPing Test (localhost):") + result = monitor.ping_host('127.0.0.1', count=2) + print(f" Reachable: {result.get('reachable', False)}") diff --git a/networkzero_cli.py b/networkzero_cli.py new file mode 100644 index 0000000..bf06eea --- /dev/null +++ b/networkzero_cli.py @@ -0,0 +1,316 @@ +#!/usr/bin/env python3 +""" +NetworkzeroMonitor - Command Line Interface +Ionity (Pty) Ltd - www.ionity.today + +CLI for network monitoring and diagnostics. + +Usage: + networkzero_cli.py ping [-c COUNT] [-t TIMEOUT] [-v] + networkzero_cli.py dns [-s SERVER] + networkzero_cli.py status [-v] + networkzero_cli.py ports [--host HOST] [--ports PORT [PORT ...]] + networkzero_cli.py traffic + networkzero_cli.py pihole --url URL [--api-key KEY] [--count N] + networkzero_cli.py monitor [--host HOST] [--interval SEC] +""" + +import argparse +import sys +import time + +from network_monitor import NetworkMonitor, PiHoleMonitor + + +BANNER = """ +╔══════════════════════════════════════════════════════════╗ +║ NetworkzeroMonitor · Ionity (Pty) Ltd ║ +║ www.ionity.today ║ +╚══════════════════════════════════════════════════════════╝ +""" + + +def print_header(): + print(BANNER) + + +def _bytes_human(n: int) -> str: + for unit in ('B', 'KB', 'MB', 'GB', 'TB'): + if n < 1024: + return f"{n:.1f} {unit}" + n /= 1024 + return f"{n:.1f} PB" + + +# --------------------------------------------------------------------------- +# Command handlers +# --------------------------------------------------------------------------- + +def cmd_ping(args) -> int: + monitor = NetworkMonitor() + result = monitor.ping_host(args.host, count=args.count, timeout=args.timeout) + + print(f"\nPing Results for {args.host}") + print("-" * 44) + print(f" Reachable : {result.get('reachable', False)}") + print(f" Elapsed Time : {result.get('elapsed_time', 'N/A')} s") + if 'rtt_line' in result: + print(f" RTT : {result['rtt_line']}") + print(f" Timestamp : {result['timestamp']}") + + if args.verbose and 'output' in result: + print("\nRaw Output:") + print(result['output']) + + if 'error' in result: + print(f" Error : {result['error']}") + + return 0 if result.get('reachable', False) else 1 + + +def cmd_dns(args) -> int: + monitor = NetworkMonitor() + result = monitor.check_dns_resolution(args.domain, args.server) + + print(f"\nDNS Resolution for {args.domain}") + print("-" * 44) + print(f" DNS Server : {result['dns_server']}") + + if result['success']: + print(f" IP Addresses: {', '.join(result['ip_addresses'])}") + print(f" Query Time : {result['query_time']} ms") + else: + print(f" Error : {result.get('error')}") + + print(f" Timestamp : {result['timestamp']}") + return 0 if result['success'] else 1 + + +def cmd_status(args) -> int: + monitor = NetworkMonitor() + + print("\nNetwork Information") + print("-" * 44) + info = monitor.get_network_info() + print(f" Hostname : {info.get('hostname', 'N/A')}") + print(f" Local IP : {info.get('local_ip', 'N/A')}") + print(f" Public IP : {info.get('public_ip', 'N/A')}") + print(f" Platform : {info.get('platform', 'N/A')}") + + if args.verbose: + ifaces = info.get('interfaces', {}) + if ifaces: + print("\n Network Interfaces:") + for iface, data in ifaces.items(): + status = "UP" if data['is_up'] else "down" + ips = ', '.join(data['ipv4']) if data['ipv4'] else 'no IPv4' + print(f" {iface:<20} {status:<6} {ips}") + + print("\nInternet Connectivity") + print("-" * 44) + connectivity = monitor.check_internet_connectivity() + print(f" Connected : {connectivity['connected']}") + print(f" Quality : {connectivity['quality']}") + + if args.verbose: + for test in connectivity['tests']: + symbol = "✓" if test['reachable'] else "✗" + print(f" {symbol} {test['domain']}") + + return 0 if connectivity['connected'] else 1 + + +def cmd_ports(args) -> int: + monitor = NetworkMonitor() + ports = args.ports if args.ports else None + result = monitor.get_open_ports(host=args.host, ports=ports) + + print(f"\nPort Scan: {result['host']}") + print("-" * 44) + for port, is_open in sorted(result['ports'].items()): + state = "OPEN" if is_open else "closed" + print(f" Port {port:<6}: {state}") + print(f"\n Timestamp: {result['timestamp']}") + return 0 + + +def cmd_traffic(args) -> int: # noqa: ARG001 + monitor = NetworkMonitor() + traffic = monitor.get_interface_traffic() + + print("\nNetwork Interface Traffic") + print("-" * 60) + print(f" {'Interface':<20} {'Sent':>12} {'Received':>12} {'Err In':>8} {'Err Out':>8}") + print(" " + "-" * 56) + for iface, data in traffic.items(): + print( + f" {iface:<20} " + f"{_bytes_human(data['bytes_sent']):>12} " + f"{_bytes_human(data['bytes_recv']):>12} " + f"{data['errin']:>8} " + f"{data['errout']:>8}" + ) + return 0 + + +def cmd_pihole(args) -> int: + pihole = PiHoleMonitor(args.url, getattr(args, 'api_key', None)) + + if args.action == 'status': + result = pihole.check_status() + print(f"\nPi-hole Status ({args.url})") + print("-" * 44) + if result['success']: + print(f" Status : {result['status']}") + print(f" Enabled : {result['enabled']}") + else: + print(f" Error : {result.get('error')}") + + elif args.action == 'summary': + result = pihole.get_summary() + print(f"\nPi-hole Summary ({args.url})") + print("-" * 44) + if result['success']: + print(f" DNS Queries Today : {result['dns_queries_today']:,}") + print(f" Ads Blocked Today : {result['ads_blocked_today']:,}") + print(f" Percentage Blocked : {result['ads_percentage_today']:.1f}%") + print(f" Domains on Blocklist : {result['domains_being_blocked']:,}") + print(f" Status : {result['status']}") + else: + print(f" Error : {result.get('error')}") + + elif args.action == 'blocked': + count = getattr(args, 'count', 10) + result = pihole.get_top_blocked(count=count) + print(f"\nTop {count} Blocked Domains ({args.url})") + print("-" * 44) + if result['success']: + for i, (domain, hits) in enumerate(result['top_ads'].items(), 1): + print(f" {i:2d}. {domain:<40} {hits:,} blocks") + else: + print(f" Error : {result.get('error')}") + else: + result = {'success': False} + + print(f"\n Timestamp: {result['timestamp']}") + return 0 if result.get('success', False) else 1 + + +def cmd_monitor(args) -> int: + monitor = NetworkMonitor() + + print(f"\nContinuous Network Monitoring (host: {args.host}, interval: {args.interval}s)") + print("Press Ctrl+C to stop") + print("-" * 60) + + try: + while True: + connectivity = monitor.check_internet_connectivity() + ping_result = monitor.ping_host(args.host, count=1, timeout=2) + + net_sym = "✓" if connectivity['connected'] else "✗" + ping_sym = "✓" if ping_result.get('reachable', False) else "✗" + ts = time.strftime('%H:%M:%S') + + print( + f"[{ts}] Internet: {net_sym} {connectivity['quality']:<12} | " + f"{args.host}: {ping_sym}" + ) + time.sleep(args.interval) + + except KeyboardInterrupt: + print("\nMonitoring stopped.") + + return 0 + + +# --------------------------------------------------------------------------- +# Argument parser +# --------------------------------------------------------------------------- + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog='networkzero_cli', + description='NetworkzeroMonitor CLI — Ionity (Pty) Ltd · www.ionity.today', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s ping 8.8.8.8 + %(prog)s dns github.com -s 8.8.8.8 + %(prog)s status -v + %(prog)s ports --host 127.0.0.1 --ports 22 80 443 + %(prog)s traffic + %(prog)s pihole status --url http://192.168.1.1 + %(prog)s pihole summary --url http://192.168.1.1 --api-key YOUR_KEY + %(prog)s pihole blocked --url http://192.168.1.1 --count 20 + %(prog)s monitor --host 8.8.8.8 --interval 5 + """, + ) + + sub = parser.add_subparsers(dest='command', metavar='COMMAND', help='Available commands') + + # --- ping --- + p_ping = sub.add_parser('ping', help='Ping a host') + p_ping.add_argument('host', help='Hostname or IP address to ping') + p_ping.add_argument('-c', '--count', type=int, default=4, help='Number of pings (default: 4)') + p_ping.add_argument('-t', '--timeout', type=int, default=2, help='Timeout per ping in seconds (default: 2)') + p_ping.add_argument('-v', '--verbose', action='store_true', help='Show raw ping output') + + # --- dns --- + p_dns = sub.add_parser('dns', help='DNS lookup') + p_dns.add_argument('domain', help='Domain name to resolve') + p_dns.add_argument('-s', '--server', metavar='SERVER', help='DNS server to use (e.g. 8.8.8.8)') + + # --- status --- + p_status = sub.add_parser('status', help='Show network status and info') + p_status.add_argument('-v', '--verbose', action='store_true', help='Show detailed interface info') + + # --- ports --- + p_ports = sub.add_parser('ports', help='Scan common ports on a host') + p_ports.add_argument('--host', default='127.0.0.1', help='Host to scan (default: 127.0.0.1)') + p_ports.add_argument('--ports', type=int, nargs='+', metavar='PORT', + help='Ports to check (defaults to common ports)') + + # --- traffic --- + sub.add_parser('traffic', help='Show network interface traffic counters') + + # --- pihole --- + p_pi = sub.add_parser('pihole', help='Pi-hole monitoring') + p_pi.add_argument('action', choices=['status', 'summary', 'blocked'], help='Pi-hole action') + p_pi.add_argument('--url', required=True, help='Pi-hole URL (e.g. http://192.168.1.1)') + p_pi.add_argument('--api-key', dest='api_key', help='Pi-hole API key (optional)') + p_pi.add_argument('--count', type=int, default=10, help='Number of blocked domains to list (default: 10)') + + # --- monitor --- + p_mon = sub.add_parser('monitor', help='Continuous live monitoring') + p_mon.add_argument('--host', default='8.8.8.8', help='Host to monitor (default: 8.8.8.8)') + p_mon.add_argument('--interval', type=int, default=5, help='Check interval in seconds (default: 5)') + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + + print_header() + + if not args.command: + parser.print_help() + return 1 + + dispatch = { + 'ping': cmd_ping, + 'dns': cmd_dns, + 'status': cmd_status, + 'ports': cmd_ports, + 'traffic': cmd_traffic, + 'pihole': cmd_pihole, + 'monitor': cmd_monitor, + } + + return dispatch[args.command](args) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/networkzero_gui.py b/networkzero_gui.py new file mode 100644 index 0000000..73a248a --- /dev/null +++ b/networkzero_gui.py @@ -0,0 +1,644 @@ +#!/usr/bin/env python3 +""" +NetworkzeroMonitor - Graphical User Interface +Ionity (Pty) Ltd - www.ionity.today + +GUI for network monitoring and diagnostics. +Tabs: Dashboard · Ping · DNS Lookup · Pi-hole · Live Monitor · Port Scanner · Traffic +""" + +import configparser +import os +import threading +import time +import tkinter as tk +from tkinter import messagebox, scrolledtext, ttk + +from network_monitor import NetworkMonitor, PiHoleMonitor + +# --------------------------------------------------------------------------- +# Load config +# --------------------------------------------------------------------------- +_cfg = configparser.ConfigParser() +_cfg_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini') +if os.path.exists(_cfg_path): + _cfg.read(_cfg_path) + +WINDOW_WIDTH = int(_cfg.get('UI', 'window_width', fallback='1024')) +WINDOW_HEIGHT = int(_cfg.get('UI', 'window_height', fallback='720')) +THEME = _cfg.get('UI', 'theme', fallback='clam') + +IONITY_BLUE = '#0057B7' +IONITY_DARK = '#1a1a2e' +IONITY_LIGHT = '#e8f4fd' + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + +def _bytes_human(n: int) -> str: + for unit in ('B', 'KB', 'MB', 'GB', 'TB'): + if n < 1024: + return f"{n:.1f} {unit}" + n /= 1024 + return f"{n:.1f} PB" + + +# --------------------------------------------------------------------------- +# Main Application +# --------------------------------------------------------------------------- + +class NetworkzeroGUI: + """Main GUI application for NetworkzeroMonitor.""" + + def __init__(self, root: tk.Tk): + self.root = root + self.root.title("NetworkzeroMonitor — Ionity (Pty) Ltd") + self.root.geometry(f"{WINDOW_WIDTH}x{WINDOW_HEIGHT}") + self.root.minsize(800, 600) + + self.monitor = NetworkMonitor() + self.pihole: PiHoleMonitor | None = None + self.monitoring_active = False + self._monitor_thread: threading.Thread | None = None + + # Apply ttk theme + style = ttk.Style() + try: + style.theme_use(THEME) + except tk.TclError: + pass + style.configure('Header.TLabel', font=('Arial', 18, 'bold'), foreground=IONITY_BLUE) + style.configure('Brand.TLabel', font=('Arial', 10), foreground=IONITY_BLUE) + style.configure('Status.TLabel', font=('Arial', 10), relief=tk.SUNKEN) + + self._build_ui() + # Initial data load (non-blocking) + threading.Thread(target=self._refresh_dashboard_data, daemon=True).start() + + # ------------------------------------------------------------------ + # UI construction + # ------------------------------------------------------------------ + + def _build_ui(self): + self.root.columnconfigure(0, weight=1) + self.root.rowconfigure(1, weight=1) + + self._build_header() + self._build_notebook() + self._build_statusbar() + + def _build_header(self): + hdr = ttk.Frame(self.root, padding=(10, 6, 10, 2)) + hdr.grid(row=0, column=0, sticky='ew') + hdr.columnconfigure(1, weight=1) + + ttk.Label(hdr, text="NetworkzeroMonitor", style='Header.TLabel').grid( + row=0, column=0, sticky='w') + ttk.Label(hdr, text="Ionity (Pty) Ltd · www.ionity.today", + style='Brand.TLabel').grid(row=1, column=0, sticky='w') + ttk.Button(hdr, text="⟳ Refresh All", command=self._refresh_all).grid( + row=0, column=2, rowspan=2, padx=(10, 0)) + + def _build_notebook(self): + self.nb = ttk.Notebook(self.root) + self.nb.grid(row=1, column=0, sticky='nsew', padx=8, pady=4) + + self._tab_dashboard() + self._tab_ping() + self._tab_dns() + self._tab_pihole() + self._tab_monitor() + self._tab_ports() + self._tab_traffic() + + def _build_statusbar(self): + bar = ttk.Frame(self.root) + bar.grid(row=2, column=0, sticky='ew') + bar.columnconfigure(0, weight=1) + self.status_var = tk.StringVar(value="Ready") + ttk.Label(bar, textvariable=self.status_var, style='Status.TLabel').grid( + row=0, column=0, sticky='ew', padx=4, pady=2) + + # ------------------------------------------------------------------ + # Tab: Dashboard + # ------------------------------------------------------------------ + + def _tab_dashboard(self): + tab = ttk.Frame(self.nb, padding=6) + self.nb.add(tab, text="📊 Dashboard") + tab.columnconfigure(0, weight=1) + tab.rowconfigure(1, weight=1) + + # Network info + info_f = ttk.LabelFrame(tab, text="Network Information", padding=6) + info_f.grid(row=0, column=0, sticky='ew', pady=(0, 4)) + info_f.columnconfigure(0, weight=1) + self.dash_info = scrolledtext.ScrolledText(info_f, height=8, state='disabled', wrap='word') + self.dash_info.grid(row=0, column=0, sticky='ew') + + # Connectivity + conn_f = ttk.LabelFrame(tab, text="Internet Connectivity", padding=6) + conn_f.grid(row=1, column=0, sticky='nsew', pady=(0, 4)) + conn_f.columnconfigure(0, weight=1) + conn_f.rowconfigure(1, weight=1) + + self.conn_label = ttk.Label(conn_f, text="Status: Checking…", font=('Arial', 12, 'bold')) + self.conn_label.grid(row=0, column=0, pady=4, sticky='w') + self.conn_details = scrolledtext.ScrolledText(conn_f, height=6, state='disabled', wrap='word') + self.conn_details.grid(row=1, column=0, sticky='nsew') + + ttk.Button(tab, text="Refresh Dashboard", + command=lambda: threading.Thread( + target=self._refresh_dashboard_data, daemon=True).start() + ).grid(row=2, column=0, pady=4) + + def _refresh_dashboard_data(self): + self._status("Refreshing dashboard…") + info = self.monitor.get_network_info() + connectivity = self.monitor.check_internet_connectivity() + self.root.after(0, self._update_dashboard, info, connectivity) + + def _update_dashboard(self, info, connectivity): + # Network info panel + self.dash_info.config(state='normal') + self.dash_info.delete('1.0', 'end') + self.dash_info.insert('end', "Network Information\n") + self.dash_info.insert('end', "═" * 48 + "\n\n") + for key, val in info.items(): + if key == 'interfaces': + self.dash_info.insert('end', "Interfaces:\n") + for iface, data in val.items(): + up = "UP" if data['is_up'] else "down" + ips = ', '.join(data['ipv4']) or 'no IPv4' + self.dash_info.insert('end', f" {iface}: {up} {ips}\n") + elif key != 'platform_version': + label = key.replace('_', ' ').title() + self.dash_info.insert('end', f"{label}: {val}\n") + self.dash_info.config(state='disabled') + + # Connectivity panel + connected = connectivity['connected'] + quality = connectivity['quality'] + self.conn_label.config( + text=f"Status: {'Connected' if connected else 'Disconnected'} ({quality})", + foreground='green' if connected else 'red', + ) + self.conn_details.config(state='normal') + self.conn_details.delete('1.0', 'end') + for test in connectivity['tests']: + sym = "✓" if test['reachable'] else "✗" + self.conn_details.insert('end', f" {sym} {test['domain']}\n") + self.conn_details.config(state='disabled') + self._status("Dashboard refreshed") + + # ------------------------------------------------------------------ + # Tab: Ping + # ------------------------------------------------------------------ + + def _tab_ping(self): + tab = ttk.Frame(self.nb, padding=6) + self.nb.add(tab, text="🏓 Ping") + tab.columnconfigure(0, weight=1) + tab.rowconfigure(1, weight=1) + + inp = ttk.Frame(tab) + inp.grid(row=0, column=0, sticky='ew', pady=4) + + ttk.Label(inp, text="Host:").grid(row=0, column=0, sticky='w') + self.ping_host = ttk.Entry(inp, width=36) + self.ping_host.grid(row=0, column=1, padx=4) + self.ping_host.insert(0, "8.8.8.8") + + ttk.Label(inp, text="Count:").grid(row=0, column=2, padx=(12, 0)) + self.ping_count = ttk.Spinbox(inp, from_=1, to=20, width=6) + self.ping_count.grid(row=0, column=3, padx=4) + self.ping_count.set(4) + + ttk.Label(inp, text="Timeout (s):").grid(row=0, column=4, padx=(12, 0)) + self.ping_timeout = ttk.Spinbox(inp, from_=1, to=10, width=6) + self.ping_timeout.grid(row=0, column=5, padx=4) + self.ping_timeout.set(2) + + ttk.Button(inp, text="Ping", command=self._do_ping).grid(row=0, column=6, padx=8) + + res_f = ttk.LabelFrame(tab, text="Results", padding=6) + res_f.grid(row=1, column=0, sticky='nsew', pady=4) + res_f.columnconfigure(0, weight=1) + res_f.rowconfigure(0, weight=1) + self.ping_results = scrolledtext.ScrolledText(res_f, wrap='word') + self.ping_results.grid(row=0, column=0, sticky='nsew') + + def _do_ping(self): + host = self.ping_host.get().strip() + if not host: + messagebox.showwarning("Input Error", "Please enter a host.") + return + count = int(self.ping_count.get()) + timeout = int(self.ping_timeout.get()) + self._status(f"Pinging {host}…") + threading.Thread( + target=self._run_ping, args=(host, count, timeout), daemon=True).start() + + def _run_ping(self, host, count, timeout): + result = self.monitor.ping_host(host, count=count, timeout=timeout) + self.root.after(0, self._show_ping, result) + + def _show_ping(self, result): + self.ping_results.delete('1.0', 'end') + self.ping_results.insert('end', f"Ping Results for {result['host']}\n") + self.ping_results.insert('end', "─" * 50 + "\n") + self.ping_results.insert('end', f"Reachable : {result.get('reachable', False)}\n") + self.ping_results.insert('end', f"Elapsed Time : {result.get('elapsed_time', 'N/A')} s\n") + if 'rtt_line' in result: + self.ping_results.insert('end', f"RTT : {result['rtt_line']}\n") + self.ping_results.insert('end', f"Timestamp : {result['timestamp']}\n") + if 'output' in result: + self.ping_results.insert('end', "\nRaw Output:\n" + result['output']) + if 'error' in result: + self.ping_results.insert('end', f"\nError: {result['error']}\n") + self._status("Ping complete") + + # ------------------------------------------------------------------ + # Tab: DNS Lookup + # ------------------------------------------------------------------ + + def _tab_dns(self): + tab = ttk.Frame(self.nb, padding=6) + self.nb.add(tab, text="🌐 DNS Lookup") + tab.columnconfigure(0, weight=1) + tab.rowconfigure(1, weight=1) + + inp = ttk.Frame(tab) + inp.grid(row=0, column=0, sticky='ew', pady=4) + + ttk.Label(inp, text="Domain:").grid(row=0, column=0, sticky='w') + self.dns_domain = ttk.Entry(inp, width=36) + self.dns_domain.grid(row=0, column=1, padx=4) + self.dns_domain.insert(0, "ionity.today") + + ttk.Label(inp, text="DNS Server:").grid(row=1, column=0, sticky='w', pady=4) + self.dns_server = ttk.Entry(inp, width=20) + self.dns_server.grid(row=1, column=1, padx=4, sticky='w') + + ttk.Label(inp, text="(leave blank for system default)").grid(row=1, column=2, sticky='w') + + btn_f = ttk.Frame(inp) + btn_f.grid(row=0, column=3, rowspan=2, padx=8) + ttk.Button(btn_f, text="Lookup", command=self._do_dns).grid(row=0, column=0, pady=2) + ttk.Button(btn_f, text="Try All DNS", command=self._do_dns_all).grid(row=1, column=0) + + res_f = ttk.LabelFrame(tab, text="Results", padding=6) + res_f.grid(row=1, column=0, sticky='nsew', pady=4) + res_f.columnconfigure(0, weight=1) + res_f.rowconfigure(0, weight=1) + self.dns_results = scrolledtext.ScrolledText(res_f, wrap='word') + self.dns_results.grid(row=0, column=0, sticky='nsew') + + def _do_dns(self): + domain = self.dns_domain.get().strip() + if not domain: + messagebox.showwarning("Input Error", "Please enter a domain.") + return + server = self.dns_server.get().strip() or None + self._status(f"Resolving {domain}…") + threading.Thread(target=self._run_dns, args=(domain, server), daemon=True).start() + + def _run_dns(self, domain, server): + result = self.monitor.check_dns_resolution(domain, server) + self.root.after(0, self._show_dns, [result]) + + def _do_dns_all(self): + domain = self.dns_domain.get().strip() + if not domain: + messagebox.showwarning("Input Error", "Please enter a domain.") + return + self._status(f"Resolving {domain} via all DNS servers…") + threading.Thread(target=self._run_dns_all, args=(domain,), daemon=True).start() + + def _run_dns_all(self, domain): + results = [] + for server in self.monitor.dns_servers: + results.append(self.monitor.check_dns_resolution(domain, server)) + self.root.after(0, self._show_dns, results) + + def _show_dns(self, results): + self.dns_results.delete('1.0', 'end') + for result in results: + self.dns_results.insert('end', f"DNS Resolution: {result['domain']}\n") + self.dns_results.insert('end', "─" * 50 + "\n") + self.dns_results.insert('end', f" Server : {result['dns_server']}\n") + if result['success']: + self.dns_results.insert('end', f" IP Addresses: {', '.join(result['ip_addresses'])}\n") + self.dns_results.insert('end', f" Query Time : {result['query_time']} ms\n") + else: + self.dns_results.insert('end', f" Error : {result.get('error', 'Unknown')}\n") + self.dns_results.insert('end', f" Timestamp : {result['timestamp']}\n\n") + self._status("DNS lookup complete") + + # ------------------------------------------------------------------ + # Tab: Pi-hole + # ------------------------------------------------------------------ + + def _tab_pihole(self): + tab = ttk.Frame(self.nb, padding=6) + self.nb.add(tab, text="🕳 Pi-hole") + tab.columnconfigure(0, weight=1) + tab.rowconfigure(1, weight=1) + + cfg_f = ttk.LabelFrame(tab, text="Pi-hole Configuration", padding=6) + cfg_f.grid(row=0, column=0, sticky='ew', pady=(0, 4)) + + ttk.Label(cfg_f, text="URL:").grid(row=0, column=0, sticky='w') + self.ph_url = ttk.Entry(cfg_f, width=36) + self.ph_url.grid(row=0, column=1, padx=4) + self.ph_url.insert(0, "http://192.168.1.1") + + ttk.Label(cfg_f, text="API Key:").grid(row=1, column=0, sticky='w', pady=4) + self.ph_key = ttk.Entry(cfg_f, width=36, show="*") + self.ph_key.grid(row=1, column=1, padx=4, pady=4) + + ttk.Label(cfg_f, text="(optional)").grid(row=1, column=2, sticky='w') + + btn_f = ttk.Frame(cfg_f) + btn_f.grid(row=2, column=0, columnspan=3, pady=4) + for text, cmd in [ + ("Status", self._ph_status), + ("Summary", self._ph_summary), + ("Top Blocked", self._ph_blocked), + ]: + ttk.Button(btn_f, text=text, command=cmd).pack(side='left', padx=4) + + res_f = ttk.LabelFrame(tab, text="Pi-hole Information", padding=6) + res_f.grid(row=1, column=0, sticky='nsew', pady=4) + res_f.columnconfigure(0, weight=1) + res_f.rowconfigure(0, weight=1) + self.ph_results = scrolledtext.ScrolledText(res_f, wrap='word') + self.ph_results.grid(row=0, column=0, sticky='nsew') + + def _pihole_instance(self) -> PiHoleMonitor: + url = self.ph_url.get().strip() + key = self.ph_key.get().strip() or None + return PiHoleMonitor(url, key) + + def _ph_status(self): + self._status("Checking Pi-hole status…") + threading.Thread(target=lambda: self.root.after( + 0, self._show_ph, self._pihole_instance().check_status()), daemon=True).start() + + def _ph_summary(self): + self._status("Getting Pi-hole summary…") + threading.Thread(target=lambda: self.root.after( + 0, self._show_ph_summary, self._pihole_instance().get_summary()), daemon=True).start() + + def _ph_blocked(self): + self._status("Getting top blocked domains…") + threading.Thread(target=lambda: self.root.after( + 0, self._show_ph_blocked, self._pihole_instance().get_top_blocked(10)), daemon=True).start() + + def _show_ph(self, result): + self.ph_results.delete('1.0', 'end') + self.ph_results.insert('end', f"Pi-hole Status: {self.ph_url.get()}\n") + self.ph_results.insert('end', "─" * 50 + "\n") + if result['success']: + self.ph_results.insert('end', f" Status : {result['status']}\n") + self.ph_results.insert('end', f" Enabled : {result['enabled']}\n") + else: + self.ph_results.insert('end', f" Error : {result.get('error', 'Unknown')}\n") + self.ph_results.insert('end', f"\n Timestamp: {result['timestamp']}\n") + self._status("Pi-hole status retrieved") + + def _show_ph_summary(self, result): + self.ph_results.delete('1.0', 'end') + self.ph_results.insert('end', f"Pi-hole Summary: {self.ph_url.get()}\n") + self.ph_results.insert('end', "─" * 50 + "\n") + if result['success']: + self.ph_results.insert('end', f" DNS Queries Today : {result['dns_queries_today']:,}\n") + self.ph_results.insert('end', f" Ads Blocked Today : {result['ads_blocked_today']:,}\n") + self.ph_results.insert('end', f" Percentage Blocked : {result['ads_percentage_today']:.1f}%\n") + self.ph_results.insert('end', f" Domains on Blocklist : {result['domains_being_blocked']:,}\n") + self.ph_results.insert('end', f" Status : {result['status']}\n") + else: + self.ph_results.insert('end', f" Error : {result.get('error', 'Unknown')}\n") + self.ph_results.insert('end', f"\n Timestamp: {result['timestamp']}\n") + self._status("Pi-hole summary retrieved") + + def _show_ph_blocked(self, result): + self.ph_results.delete('1.0', 'end') + self.ph_results.insert('end', f"Top Blocked Domains: {self.ph_url.get()}\n") + self.ph_results.insert('end', "─" * 50 + "\n") + if result['success']: + for i, (domain, count) in enumerate(result['top_ads'].items(), 1): + self.ph_results.insert('end', f" {i:2d}. {domain:<40} {count:,} blocks\n") + else: + self.ph_results.insert('end', f" Error : {result.get('error', 'Unknown')}\n") + self.ph_results.insert('end', f"\n Timestamp: {result['timestamp']}\n") + self._status("Top blocked domains retrieved") + + # ------------------------------------------------------------------ + # Tab: Live Monitor + # ------------------------------------------------------------------ + + def _tab_monitor(self): + tab = ttk.Frame(self.nb, padding=6) + self.nb.add(tab, text="📡 Live Monitor") + tab.columnconfigure(0, weight=1) + tab.rowconfigure(1, weight=1) + + ctrl = ttk.Frame(tab) + ctrl.grid(row=0, column=0, sticky='ew', pady=4) + + ttk.Label(ctrl, text="Monitor Host:").grid(row=0, column=0, sticky='w') + self.mon_host = ttk.Entry(ctrl, width=24) + self.mon_host.grid(row=0, column=1, padx=4) + self.mon_host.insert(0, "8.8.8.8") + + ttk.Label(ctrl, text="Interval (s):").grid(row=0, column=2, padx=(12, 0)) + self.mon_interval = ttk.Spinbox(ctrl, from_=1, to=60, width=6) + self.mon_interval.grid(row=0, column=3, padx=4) + self.mon_interval.set(5) + + self.mon_btn = ttk.Button(ctrl, text="▶ Start", command=self._toggle_monitoring) + self.mon_btn.grid(row=0, column=4, padx=8) + + ttk.Button(ctrl, text="Clear Log", + command=lambda: self.mon_display.delete('1.0', 'end')).grid(row=0, column=5) + + live_f = ttk.LabelFrame(tab, text="Live Activity", padding=6) + live_f.grid(row=1, column=0, sticky='nsew', pady=4) + live_f.columnconfigure(0, weight=1) + live_f.rowconfigure(0, weight=1) + self.mon_display = scrolledtext.ScrolledText(live_f, wrap='word') + self.mon_display.grid(row=0, column=0, sticky='nsew') + + def _toggle_monitoring(self): + if self.monitoring_active: + self.monitoring_active = False + self.mon_btn.config(text="▶ Start") + self._status("Monitoring stopped") + else: + self.monitoring_active = True + self.mon_btn.config(text="⏹ Stop") + interval = int(self.mon_interval.get()) + host = self.mon_host.get().strip() or '8.8.8.8' + self._monitor_thread = threading.Thread( + target=self._run_monitor, args=(host, interval), daemon=True) + self._monitor_thread.start() + self._status(f"Monitoring {host} every {interval}s…") + + def _run_monitor(self, host: str, interval: int): + while self.monitoring_active: + connectivity = self.monitor.check_internet_connectivity() + ping = self.monitor.ping_host(host, count=1, timeout=2) + net_sym = "✓" if connectivity['connected'] else "✗" + ping_sym = "✓" if ping.get('reachable', False) else "✗" + ts = time.strftime('%H:%M:%S') + line = ( + f"[{ts}] Internet: {net_sym} {connectivity['quality']:<12} | " + f"{host}: {ping_sym}\n" + ) + self.root.after(0, self._append_monitor, line) + time.sleep(interval) + + def _append_monitor(self, line: str): + self.mon_display.insert('end', line) + self.mon_display.see('end') + + # ------------------------------------------------------------------ + # Tab: Port Scanner + # ------------------------------------------------------------------ + + def _tab_ports(self): + tab = ttk.Frame(self.nb, padding=6) + self.nb.add(tab, text="🔍 Port Scanner") + tab.columnconfigure(0, weight=1) + tab.rowconfigure(1, weight=1) + + inp = ttk.Frame(tab) + inp.grid(row=0, column=0, sticky='ew', pady=4) + + ttk.Label(inp, text="Host:").grid(row=0, column=0, sticky='w') + self.port_host = ttk.Entry(inp, width=30) + self.port_host.grid(row=0, column=1, padx=4) + self.port_host.insert(0, "127.0.0.1") + + ttk.Label(inp, text="Ports (comma-separated):").grid(row=0, column=2, padx=(12, 0)) + self.port_list = ttk.Entry(inp, width=40) + self.port_list.grid(row=0, column=3, padx=4) + self.port_list.insert(0, "21,22,23,25,53,80,110,143,443,3306,5432,8080,8443") + + ttk.Button(inp, text="Scan", command=self._do_port_scan).grid(row=0, column=4, padx=8) + + res_f = ttk.LabelFrame(tab, text="Results", padding=6) + res_f.grid(row=1, column=0, sticky='nsew', pady=4) + res_f.columnconfigure(0, weight=1) + res_f.rowconfigure(0, weight=1) + self.port_results = scrolledtext.ScrolledText(res_f, wrap='word') + self.port_results.grid(row=0, column=0, sticky='nsew') + + def _do_port_scan(self): + host = self.port_host.get().strip() + raw = self.port_list.get().strip() + try: + ports = [int(p.strip()) for p in raw.split(',') if p.strip()] + except ValueError: + messagebox.showwarning("Input Error", "Ports must be comma-separated integers.") + return + if not host: + messagebox.showwarning("Input Error", "Please enter a host.") + return + self._status(f"Scanning ports on {host}…") + threading.Thread(target=self._run_port_scan, args=(host, ports), daemon=True).start() + + def _run_port_scan(self, host, ports): + result = self.monitor.get_open_ports(host=host, ports=ports) + self.root.after(0, self._show_ports, result) + + def _show_ports(self, result): + self.port_results.delete('1.0', 'end') + self.port_results.insert('end', f"Port Scan: {result['host']}\n") + self.port_results.insert('end', "─" * 50 + "\n") + open_ports = [p for p, o in sorted(result['ports'].items()) if o] + for port, is_open in sorted(result['ports'].items()): + state = "OPEN ✓" if is_open else "closed" + self.port_results.insert('end', f" Port {port:<6}: {state}\n") + self.port_results.insert('end', f"\n Open ports: {len(open_ports)}/{len(result['ports'])}\n") + self.port_results.insert('end', f" Timestamp : {result['timestamp']}\n") + self._status("Port scan complete") + + # ------------------------------------------------------------------ + # Tab: Traffic + # ------------------------------------------------------------------ + + def _tab_traffic(self): + tab = ttk.Frame(self.nb, padding=6) + self.nb.add(tab, text="📶 Traffic") + tab.columnconfigure(0, weight=1) + tab.rowconfigure(1, weight=1) + + ctrl = ttk.Frame(tab) + ctrl.grid(row=0, column=0, sticky='ew', pady=4) + ttk.Button(ctrl, text="⟳ Refresh Traffic", command=self._refresh_traffic).pack(side='left', padx=4) + + res_f = ttk.LabelFrame(tab, text="Interface Traffic Counters", padding=6) + res_f.grid(row=1, column=0, sticky='nsew', pady=4) + res_f.columnconfigure(0, weight=1) + res_f.rowconfigure(0, weight=1) + self.traffic_results = scrolledtext.ScrolledText(res_f, wrap='word', font=('Courier', 10)) + self.traffic_results.grid(row=0, column=0, sticky='nsew') + + # Load immediately + self._refresh_traffic() + + def _refresh_traffic(self): + self._status("Reading traffic counters…") + threading.Thread(target=self._run_traffic, daemon=True).start() + + def _run_traffic(self): + traffic = self.monitor.get_interface_traffic() + self.root.after(0, self._show_traffic, traffic) + + def _show_traffic(self, traffic): + self.traffic_results.delete('1.0', 'end') + hdr = f"{'Interface':<22} {'Sent':>12} {'Received':>12} {'ErrIn':>8} {'ErrOut':>8}\n" + self.traffic_results.insert('end', hdr) + self.traffic_results.insert('end', "─" * 66 + "\n") + for iface, data in traffic.items(): + self.traffic_results.insert( + 'end', + f"{iface:<22} " + f"{_bytes_human(data['bytes_sent']):>12} " + f"{_bytes_human(data['bytes_recv']):>12} " + f"{data['errin']:>8} " + f"{data['errout']:>8}\n" + ) + self.traffic_results.insert('end', f"\n Updated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n") + self._status("Traffic counters updated") + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _status(self, msg: str): + self.status_var.set(msg) + self.root.update_idletasks() + + def _refresh_all(self): + threading.Thread(target=self._refresh_dashboard_data, daemon=True).start() + self._refresh_traffic() + self._status("Refreshing all data…") + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main(): + root = tk.Tk() + app = NetworkzeroGUI(root) # noqa: F841 + root.mainloop() + + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fe2599a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +# NetworkzeroMonitor Dependencies +# Ionity (Pty) Ltd - www.ionity.today + +# Core networking libraries +dnspython>=2.4.2 +psutil>=5.9.6 +requests>=2.31.0 + +# Data visualization +matplotlib>=3.8.2 diff --git a/run_cli.bat b/run_cli.bat new file mode 100644 index 0000000..00e9a44 --- /dev/null +++ b/run_cli.bat @@ -0,0 +1,13 @@ +@echo off +REM NetworkzeroMonitor - Launch CLI (Windows) +REM Ionity (Pty) Ltd - www.ionity.today + +cd /d "%~dp0" + +if not exist "venv\Scripts\python.exe" ( + echo Virtual environment not found. Run setup.bat first. + pause + exit /b 1 +) + +venv\Scripts\python.exe networkzero_cli.py %* diff --git a/run_cli.sh b/run_cli.sh new file mode 100755 index 0000000..d910f00 --- /dev/null +++ b/run_cli.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +# NetworkzeroMonitor - Launch CLI (Unix/Linux/macOS) +# Ionity (Pty) Ltd - www.ionity.today + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd "$SCRIPT_DIR" + +if [ ! -f "venv/bin/python" ]; then + echo "Virtual environment not found. Run ./setup.sh first." + exit 1 +fi + +exec venv/bin/python networkzero_cli.py "$@" diff --git a/run_gui.bat b/run_gui.bat new file mode 100644 index 0000000..a0e22c1 --- /dev/null +++ b/run_gui.bat @@ -0,0 +1,13 @@ +@echo off +REM NetworkzeroMonitor - Launch GUI (Windows) +REM Ionity (Pty) Ltd - www.ionity.today + +cd /d "%~dp0" + +if not exist "venv\Scripts\python.exe" ( + echo Virtual environment not found. Run setup.bat first. + pause + exit /b 1 +) + +venv\Scripts\python.exe networkzero_gui.py diff --git a/run_gui.sh b/run_gui.sh new file mode 100755 index 0000000..91b32d1 --- /dev/null +++ b/run_gui.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +# NetworkzeroMonitor - Launch GUI (Unix/Linux/macOS) +# Ionity (Pty) Ltd - www.ionity.today + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd "$SCRIPT_DIR" + +if [ ! -f "venv/bin/python" ]; then + echo "Virtual environment not found. Run ./setup.sh first." + exit 1 +fi + +exec venv/bin/python networkzero_gui.py diff --git a/setup.bat b/setup.bat new file mode 100644 index 0000000..39b4a63 --- /dev/null +++ b/setup.bat @@ -0,0 +1,52 @@ +@echo off +REM NetworkzeroMonitor - Windows Setup Script +REM Ionity (Pty) Ltd - www.ionity.today + +echo ============================================ +echo NetworkzeroMonitor . Ionity (Pty) Ltd +echo Setup Script +echo ============================================ +echo. + +cd /d "%~dp0" + +REM Check Python +python --version >nul 2>&1 +if errorlevel 1 ( + echo ERROR: Python is not installed or not in PATH. + echo Please install Python 3.8 or higher from https://www.python.org/ + pause + exit /b 1 +) + +for /f "tokens=2" %%v in ('python --version 2^>^&1') do set PY_VER=%%v +echo Found Python %PY_VER% + +echo. +echo Creating virtual environment in .\venv ... +python -m venv venv + +echo Activating virtual environment... +call venv\Scripts\activate.bat + +echo Upgrading pip... +pip install --upgrade pip --quiet + +echo Installing dependencies from requirements.txt... +pip install -r requirements.txt --quiet + +echo. +echo ============================================ +echo Setup complete! +echo ============================================ +echo. +echo Activate the environment: +echo venv\Scripts\activate.bat +echo. +echo Launch the GUI: +echo run_gui.bat +echo. +echo Launch the CLI: +echo run_cli.bat --help +echo. +pause diff --git a/setup.sh b/setup.sh new file mode 100755 index 0000000..bce1a4f --- /dev/null +++ b/setup.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# NetworkzeroMonitor - Unix/Linux/macOS Setup Script +# Ionity (Pty) Ltd - www.ionity.today +set -euo pipefail + +echo "════════════════════════════════════════════" +echo " NetworkzeroMonitor · Ionity (Pty) Ltd" +echo " Setup Script" +echo "════════════════════════════════════════════" +echo "" + +# Determine script directory so the venv is created next to the scripts +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd "$SCRIPT_DIR" + +# Check Python 3 +if ! command -v python3 &> /dev/null; then + echo "ERROR: Python 3 is not installed." + echo "Please install Python 3.8 or higher and re-run this script." + exit 1 +fi + +PY_VER=$(python3 -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')") +echo "Found Python $PY_VER" + +# Create virtual environment +echo "" +echo "Creating virtual environment in ./venv …" +python3 -m venv venv + +# Activate +source venv/bin/activate + +# Upgrade pip +echo "Upgrading pip…" +pip install --upgrade pip --quiet + +# Install dependencies +echo "Installing dependencies from requirements.txt…" +pip install -r requirements.txt --quiet + +echo "" +echo "════════════════════════════════════════════" +echo " Setup complete!" +echo "════════════════════════════════════════════" +echo "" +echo " Activate the environment:" +echo " source venv/bin/activate" +echo "" +echo " Launch the GUI:" +echo " ./run_gui.sh" +echo "" +echo " Launch the CLI:" +echo " ./run_cli.sh --help" +echo "" diff --git a/test_networkzero.py b/test_networkzero.py new file mode 100644 index 0000000..6dbf448 --- /dev/null +++ b/test_networkzero.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 +""" +NetworkzeroMonitor - Test Suite +Ionity (Pty) Ltd - www.ionity.today + +Tests all core functionality without requiring external network access. +""" + +import sys +import time +import unittest +from unittest.mock import MagicMock, patch + +from network_monitor import NetworkMonitor, PiHoleMonitor + + +class TestNetworkMonitorInfo(unittest.TestCase): + """Tests for get_network_info()""" + + def test_returns_hostname(self): + monitor = NetworkMonitor() + info = monitor.get_network_info() + self.assertIn('hostname', info) + self.assertIsInstance(info['hostname'], str) + self.assertTrue(len(info['hostname']) > 0) + + def test_returns_local_ip(self): + monitor = NetworkMonitor() + info = monitor.get_network_info() + self.assertIn('local_ip', info) + + def test_returns_platform(self): + monitor = NetworkMonitor() + info = monitor.get_network_info() + self.assertIn('platform', info) + + def test_returns_interfaces(self): + monitor = NetworkMonitor() + info = monitor.get_network_info() + self.assertIn('interfaces', info) + self.assertIsInstance(info['interfaces'], dict) + + def test_returns_timestamp(self): + monitor = NetworkMonitor() + info = monitor.get_network_info() + self.assertIn('timestamp', info) + + +class TestNetworkMonitorPing(unittest.TestCase): + """Tests for ping_host()""" + + def test_ping_localhost(self): + monitor = NetworkMonitor() + result = monitor.ping_host('127.0.0.1', count=1, timeout=2) + self.assertIn('host', result) + self.assertEqual(result['host'], '127.0.0.1') + self.assertIn('success', result) + self.assertIn('timestamp', result) + + def test_ping_unknown_host_returns_failure(self): + monitor = NetworkMonitor() + # An invalid hostname should fail gracefully + result = monitor.ping_host('this.host.does.not.exist.invalid', count=1, timeout=1) + self.assertIn('success', result) + self.assertIn('timestamp', result) + + def test_ping_result_has_elapsed_time(self): + monitor = NetworkMonitor() + result = monitor.ping_host('127.0.0.1', count=1, timeout=2) + if result['success']: + self.assertIn('elapsed_time', result) + self.assertGreaterEqual(result['elapsed_time'], 0) + + def test_ping_reachable_key_present(self): + monitor = NetworkMonitor() + result = monitor.ping_host('127.0.0.1', count=1, timeout=2) + self.assertIn('reachable', result) + + +class TestNetworkMonitorDNS(unittest.TestCase): + """Tests for check_dns_resolution()""" + + def test_dns_result_structure(self): + monitor = NetworkMonitor() + result = monitor.check_dns_resolution('localhost') + self.assertIn('domain', result) + self.assertIn('success', result) + self.assertIn('timestamp', result) + + def test_dns_failure_has_error(self): + monitor = NetworkMonitor() + result = monitor.check_dns_resolution('this.domain.does.not.exist.invalid') + self.assertFalse(result['success']) + self.assertIn('error', result) + + @patch('network_monitor.DNS_AVAILABLE', False) + def test_dns_fallback_socket(self): + monitor = NetworkMonitor() + result = monitor._dns_via_socket('localhost') + self.assertIn('success', result) + + +class TestNetworkMonitorTraffic(unittest.TestCase): + """Tests for get_interface_traffic()""" + + def test_returns_dict(self): + monitor = NetworkMonitor() + traffic = monitor.get_interface_traffic() + self.assertIsInstance(traffic, dict) + + def test_interface_has_required_keys(self): + monitor = NetworkMonitor() + traffic = monitor.get_interface_traffic() + for iface, data in traffic.items(): + for key in ('bytes_sent', 'bytes_recv', 'packets_sent', 'packets_recv'): + self.assertIn(key, data, f"Missing '{key}' for interface '{iface}'") + + +class TestNetworkMonitorPorts(unittest.TestCase): + """Tests for get_open_ports()""" + + def test_loopback_port_scan(self): + monitor = NetworkMonitor() + result = monitor.get_open_ports(host='127.0.0.1', ports=[22, 80]) + self.assertIn('host', result) + self.assertIn('ports', result) + self.assertIn('timestamp', result) + self.assertIn(22, result['ports']) + self.assertIn(80, result['ports']) + + def test_port_values_are_bool(self): + monitor = NetworkMonitor() + result = monitor.get_open_ports(host='127.0.0.1', ports=[9999]) + for port, is_open in result['ports'].items(): + self.assertIsInstance(is_open, bool) + + +class TestNetworkMonitorConnectivity(unittest.TestCase): + """Tests for check_internet_connectivity()""" + + def test_result_has_connected_key(self): + monitor = NetworkMonitor() + result = monitor.check_internet_connectivity() + self.assertIn('connected', result) + self.assertIsInstance(result['connected'], bool) + + def test_result_has_quality(self): + monitor = NetworkMonitor() + result = monitor.check_internet_connectivity() + self.assertIn('quality', result) + + def test_result_has_tests(self): + monitor = NetworkMonitor() + result = monitor.check_internet_connectivity() + self.assertIn('tests', result) + self.assertIsInstance(result['tests'], list) + + +class TestPiHoleMonitor(unittest.TestCase): + """Tests for PiHoleMonitor (mocked HTTP)""" + + def _make_pihole(self): + return PiHoleMonitor('http://127.0.0.1', api_key=None) + + def test_check_status_returns_dict(self): + pihole = self._make_pihole() + result = pihole.check_status() + self.assertIn('success', result) + self.assertIn('timestamp', result) + + def test_check_status_failure_on_connection_error(self): + pihole = self._make_pihole() + result = pihole.check_status() + # Pi-hole is not running locally so it should fail gracefully + self.assertFalse(result['success']) + self.assertIn('error', result) + + def test_get_summary_returns_dict(self): + pihole = self._make_pihole() + result = pihole.get_summary() + self.assertIn('success', result) + + def test_get_top_blocked_returns_dict(self): + pihole = self._make_pihole() + result = pihole.get_top_blocked(count=5) + self.assertIn('success', result) + + @patch('network_monitor.requests.get') + def test_get_summary_success(self, mock_get): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'ads_blocked_today': 100, + 'dns_queries_today': 1000, + 'ads_percentage_today': 10.0, + 'domains_being_blocked': 5000, + 'status': 'enabled', + } + mock_get.return_value = mock_response + + pihole = PiHoleMonitor('http://192.168.1.1') + result = pihole.get_summary() + + self.assertTrue(result['success']) + self.assertEqual(result['ads_blocked_today'], 100) + self.assertEqual(result['status'], 'enabled') + + @patch('network_monitor.requests.get') + def test_check_status_success(self, mock_get): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {'status': 'enabled'} + mock_get.return_value = mock_response + + pihole = PiHoleMonitor('http://192.168.1.1') + result = pihole.check_status() + + self.assertTrue(result['success']) + self.assertTrue(result['enabled']) + self.assertEqual(result['status'], 'enabled') + + +def run_manual_demo(): + """Quick manual demo that prints results to stdout.""" + print("=" * 60) + print("NetworkzeroMonitor - Manual Demo") + print("Ionity (Pty) Ltd - www.ionity.today") + print("=" * 60) + + monitor = NetworkMonitor() + + print("\n[1] Network Information") + print("-" * 40) + info = monitor.get_network_info() + print(f" Hostname : {info.get('hostname', 'N/A')}") + print(f" Local IP : {info.get('local_ip', 'N/A')}") + print(f" Platform : {info.get('platform', 'N/A')}") + + print("\n[2] Ping localhost") + print("-" * 40) + ping = monitor.ping_host('127.0.0.1', count=2) + print(f" Reachable: {ping.get('reachable', False)}") + + print("\n[3] DNS (localhost)") + print("-" * 40) + dns = monitor.check_dns_resolution('localhost') + if dns['success']: + print(f" IPs: {', '.join(dns['ip_addresses'])}") + else: + print(f" Error: {dns.get('error')}") + + print("\n[4] Interface Traffic") + print("-" * 40) + traffic = monitor.get_interface_traffic() + for iface, data in list(traffic.items())[:3]: + print(f" {iface}: sent={data['bytes_sent']:,} B, recv={data['bytes_recv']:,} B") + + print("\n[5] Port Scan (127.0.0.1, ports 22/80/443)") + print("-" * 40) + ports = monitor.get_open_ports('127.0.0.1', ports=[22, 80, 443]) + for p, is_open in sorted(ports['ports'].items()): + print(f" Port {p}: {'OPEN' if is_open else 'closed'}") + + print("\n[6] Pi-hole (localhost — expected to fail)") + print("-" * 40) + pihole = PiHoleMonitor('http://127.0.0.1') + result = pihole.check_status() + print(f" Success: {result['success']}") + if not result['success']: + print(f" (Expected) Error: {result.get('error')}") + + +if __name__ == '__main__': + if '--demo' in sys.argv: + run_manual_demo() + else: + unittest.main(verbosity=2) From 1e18a5dae395369d4d233c5525e03c5eff0f23b5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 14 Apr 2026 10:57:19 +0000 Subject: [PATCH 3/3] Fix security and code quality issues from review - network_monitor.py: Sanitize Pi-hole API error messages to redact auth token values that may appear in urllib3 exception strings - config.ini: Align window size with GUI defaults (1024x720) - networkzero_gui.py: Remove unused 'app' variable in main() - test_networkzero.py: Test DNS fallback via public check_dns_resolution() instead of the private _dns_via_socket() method Agent-Logs-Url: https://github.com/AntwerpDesignsIonity/NetworkzeroMonitor/sessions/b4accdbc-220d-4921-b773-5a6231dde98e Co-authored-by: AntwerpDesignsIonity <211600625+AntwerpDesignsIonity@users.noreply.github.com> --- config.ini | 4 ++-- network_monitor.py | 23 ++++++++++++++--------- networkzero_gui.py | 2 +- test_networkzero.py | 2 +- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/config.ini b/config.ini index c6f6a5d..30fac07 100644 --- a/config.ini +++ b/config.ini @@ -29,8 +29,8 @@ ping_timeout = 2 [UI] # GUI window size -window_width = 900 -window_height = 700 +window_width = 1024 +window_height = 720 # Theme # Options: default, clam, alt, classic diff --git a/network_monitor.py b/network_monitor.py index 9a3d59c..8e91412 100644 --- a/network_monitor.py +++ b/network_monitor.py @@ -362,12 +362,20 @@ def __init__(self, pihole_url: str, api_key: Optional[str] = None): self.pihole_url = pihole_url.rstrip('/') self.api_key = api_key + @staticmethod + def _sanitize_error(exc: Exception) -> str: + """Return an error string with any auth token value redacted.""" + import re + return re.sub(r'(auth=)[^&\s\'"]+', r'\1[REDACTED]', str(exc)) + def _api_get(self, params: str) -> Dict: """Helper to call the Pi-hole admin API.""" - url = f"{self.pihole_url}/admin/api.php?{params}" + base_url = f"{self.pihole_url}/admin/api.php?{params}" if self.api_key: - url += f"&auth={self.api_key}" - response = requests.get(url, timeout=5) + base_url += "&auth=" + # Build final URL separately so the key value isn't in exception context + request_url = (base_url + self.api_key) if self.api_key else base_url + response = requests.get(request_url, timeout=5) response.raise_for_status() return response.json() @@ -392,7 +400,7 @@ def get_summary(self) -> Dict: except Exception as exc: return { 'success': False, - 'error': str(exc), + 'error': self._sanitize_error(exc), 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), } @@ -417,7 +425,7 @@ def get_top_blocked(self, count: int = 10) -> Dict: except Exception as exc: return { 'success': False, - 'error': str(exc), + 'error': self._sanitize_error(exc), 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), } @@ -439,12 +447,9 @@ def check_status(self) -> Dict: except Exception as exc: return { 'success': False, - 'error': str(exc), + 'error': self._sanitize_error(exc), 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), } - - -if __name__ == '__main__': monitor = NetworkMonitor() print("NetworkzeroMonitor - Core Module Test") print("Ionity (Pty) Ltd - www.ionity.today") diff --git a/networkzero_gui.py b/networkzero_gui.py index 73a248a..e120336 100644 --- a/networkzero_gui.py +++ b/networkzero_gui.py @@ -636,7 +636,7 @@ def _refresh_all(self): def main(): root = tk.Tk() - app = NetworkzeroGUI(root) # noqa: F841 + NetworkzeroGUI(root) root.mainloop() diff --git a/test_networkzero.py b/test_networkzero.py index 6dbf448..fa15b67 100644 --- a/test_networkzero.py +++ b/test_networkzero.py @@ -96,7 +96,7 @@ def test_dns_failure_has_error(self): @patch('network_monitor.DNS_AVAILABLE', False) def test_dns_fallback_socket(self): monitor = NetworkMonitor() - result = monitor._dns_via_socket('localhost') + result = monitor.check_dns_resolution('localhost') self.assertIn('success', result)