diff --git a/README.md b/README.md index 9df4097..7c749a3 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ The default configuration file comes with a sample configuration, making it easy connections = True # IP connections metrics connection_stats = False # Open IP connections metrics + connection_traffic = False # Open IP connections traffic metrics (high cardinality) interface = True # Interfaces traffic metrics wireguard_peers = False # Wireguard peers metrics @@ -512,7 +513,7 @@ With many connected devices everywhere, one can often only guess where do they g ``` connection_stats = False # Open IP connections metrics ``` -Setting this to `True` obviously enables the feature and allows to see something like that: +Setting this to `True` obviously enables the feature and allows to see something like that. Both IPv4 and IPv6 connections are included: conns @@ -529,6 +530,24 @@ Let's go check on that in the dashboard, or just get the info right from the com ``` *A few quick checks show all of the destination IPs relate to AWS instances, so supposedly it's legit... but let's remain vigilant, to know better :)* +The exporter reports these connection totals: + +- `mktxp_ip_connections_total` for combined IPv4 and IPv6 totals +- `mktxp_ipv4_connections_total` for IPv4-only totals +- `mktxp_ipv6_connections_total` for IPv6-only totals + +If you need per-connection byte counters for currently active src/dst/protocol tuples, enable the following option: + +``` +connection_traffic = False # Open IP connections traffic metrics (high cardinality) +``` + +This produces the following metrics for both IPv4 and IPv6 connection tracking tables: + +- `mktxp_connection_upload_bytes` +- `mktxp_connection_download_bytes` +- `mktxp_connection_total_bytes` + ### Parallel routers fetch Concurrent exports across multiple devices can considerably speed up things for slow network connections. This feature can be turned on and configured with the following [system options](https://github.com/akpw/mktxp/blob/main/README.md#mktxp-system-configuration): diff --git a/deploy/kubernetes/secret.yaml b/deploy/kubernetes/secret.yaml index 3c69b9d..cbe0a32 100644 --- a/deploy/kubernetes/secret.yaml +++ b/deploy/kubernetes/secret.yaml @@ -87,6 +87,7 @@ stringData: connections = True # IP connections metrics connection_stats = False # Open IP connections metrics + connection_traffic = False # Open IP connections traffic metrics (high cardinality) interface = True # Interfaces traffic metrics wireguard_peers = False # Wireguard peers metrics diff --git a/mktxp/cli/config/config.py b/mktxp/cli/config/config.py index 44e6aef..e8507a7 100755 --- a/mktxp/cli/config/config.py +++ b/mktxp/cli/config/config.py @@ -90,6 +90,7 @@ class MKTXPConfigKeys: FE_DHCP_LEASE_KEY = 'dhcp_lease' FE_IP_CONNECTIONS_KEY = 'connections' FE_CONNECTION_STATS_KEY = 'connection_stats' + FE_CONNECTION_TRAFFIC_KEY = 'connection_traffic' FE_INTERFACE_KEY = 'interface' FE_WG_PEER_KEY = 'wireguard_peers' @@ -213,7 +214,7 @@ class MKTXPConfigKeys: BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, FE_CHECK_FOR_UPDATES, FE_KID_CONTROL_DEVICE, FE_KID_CONTROL_DYNAMIC,FE_WG_PEER_KEY, - SSL_CERTIFICATE_VERIFY, FE_IPV6_ROUTE_KEY, FE_IPV6_DHCP_POOL_KEY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY, FE_CONNECTION_STATS_KEY, FE_BFD_KEY, FE_BGP_KEY, + SSL_CERTIFICATE_VERIFY, FE_IPV6_ROUTE_KEY, FE_IPV6_DHCP_POOL_KEY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY, FE_CONNECTION_STATS_KEY, FE_CONNECTION_TRAFFIC_KEY, FE_BFD_KEY, FE_BGP_KEY, FE_EOIP_KEY, FE_GRE_KEY, FE_IPIP_KEY, FE_IPSEC_KEY, FE_LTE_KEY, FE_SWITCH_PORT_KEY, FE_ROUTING_STATS_KEY, FE_CERTIFICATE_KEY, FE_DNS_KEY, FE_CONTAINER_KEY, FE_W60G_KEY, FE_MODULE_ONLY_KEY, FE_BRIDGE_VLAN_KEY} # Feature keys enabled by default @@ -248,7 +249,7 @@ class ConfigEntry: MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY, MKTXPConfigKeys.SSL_CHECK_HOSTNAME, MKTXPConfigKeys.SSL_CA_FILE, MKTXPConfigKeys.PLAINTEXT_LOGIN_KEY, MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_HEALTH_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY,MKTXPConfigKeys.FE_WG_PEER_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_W60G_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY, - MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CONNECTION_STATS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, + MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CONNECTION_STATS_KEY, MKTXPConfigKeys.FE_CONNECTION_TRAFFIC_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY, MKTXPConfigKeys.FE_INTERFACE_NAME_FORMAT, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_ADDRESS_LIST_KEY, MKTXPConfigKeys.FE_NEIGHBOR_KEY, MKTXPConfigKeys.FE_DNS_KEY, MKTXPConfigKeys.FE_IPV6_ROUTE_KEY, MKTXPConfigKeys.FE_IPV6_DHCP_POOL_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_ADDRESS_LIST_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY, diff --git a/mktxp/cli/config/mktxp.conf b/mktxp/cli/config/mktxp.conf index 3b2aa57..f25e749 100644 --- a/mktxp/cli/config/mktxp.conf +++ b/mktxp/cli/config/mktxp.conf @@ -43,6 +43,7 @@ connections = True # IP connections metrics connection_stats = False # Open IP connections metrics + connection_traffic = False # Open IP connections traffic metrics (high cardinality) interface = True # Interfaces traffic metrics wireguard_peers = False # Wireguard peers metrics diff --git a/mktxp/collector/connection_collector.py b/mktxp/collector/connection_collector.py index 62c4e9a..9ea16f5 100644 --- a/mktxp/collector/connection_collector.py +++ b/mktxp/collector/connection_collector.py @@ -14,7 +14,7 @@ from mktxp.collector.base_collector import BaseCollector from mktxp.flow.processor.output import BaseOutputProcessor -from mktxp.datasource.connection_ds import IPConnectionDatasource, IPConnectionStatsDatasource +from mktxp.datasource.connection_ds import IPConnectionDatasource, IPConnectionStatsDatasource, IPConnectionTrafficDatasource class IPConnectionCollector(BaseCollector): @@ -23,10 +23,16 @@ class IPConnectionCollector(BaseCollector): @staticmethod def collect(router_entry): if router_entry.config_entry.connections: - connection_records = IPConnectionDatasource.metric_records(router_entry) + connection_records = IPConnectionDatasource.metric_records(router_entry, include_stack_counts = True) if connection_records: - connection_metrics = BaseCollector.gauge_collector('ip_connections_total', 'Number of IP connections', connection_records, 'count',) - yield connection_metrics + connection_metrics = ( + ('ip_connections_total', 'Number of IP connections', 'count'), + ('ipv4_connections_total', 'Number of IPv4 connections', 'ipv4_count'), + ('ipv6_connections_total', 'Number of IPv6 connections', 'ipv6_count'), + ) + + for metric_name, metric_desc, metric_key in connection_metrics: + yield BaseCollector.gauge_collector(metric_name, metric_desc, connection_records, metric_key,) if router_entry.config_entry.connection_stats: connection_stats_records = IPConnectionStatsDatasource.metric_records(router_entry) @@ -39,3 +45,19 @@ def collect(router_entry): connection_stats_records, 'connection_count', connection_stats_labels) yield connection_stats_metrics_gauge + if router_entry.config_entry.connection_traffic: + connection_traffic_records = IPConnectionTrafficDatasource.metric_records(router_entry) + if connection_traffic_records: + for connection_traffic_record in connection_traffic_records: + BaseOutputProcessor.augment_record(router_entry, connection_traffic_record, id_key = 'src_address') + + connection_traffic_labels = ['src_address', 'dst_address', 'protocol', 'dhcp_name'] + connection_traffic_metrics = ( + ('connection_upload_bytes', 'Observed uploaded bytes on active src to dst connections', 'upload_bytes'), + ('connection_download_bytes', 'Observed downloaded bytes on active src to dst connections', 'download_bytes'), + ('connection_total_bytes', 'Observed total bytes on active src to dst connections', 'total_bytes'), + ) + + for metric_name, metric_desc, metric_key in connection_traffic_metrics: + yield BaseCollector.gauge_collector(metric_name, metric_desc, + connection_traffic_records, metric_key, connection_traffic_labels) diff --git a/mktxp/datasource/connection_ds.py b/mktxp/datasource/connection_ds.py index fd5357e..381e43a 100644 --- a/mktxp/datasource/connection_ds.py +++ b/mktxp/datasource/connection_ds.py @@ -18,46 +18,120 @@ class IPConnectionDatasource: ''' IP connections data provider - ''' + ''' + _RESOURCE_PATHS = (('ipv4', '/ip/firewall/connection/'), ('ipv6', '/ipv6/firewall/connection/')) + @staticmethod - def metric_records(router_entry, *, metric_labels = None): + def metric_records(router_entry, *, metric_labels = None, include_stack_counts = False): if metric_labels is None: - metric_labels = [] + metric_labels = [] try: - res = router_entry.api_connection.router_api().get_resource('/ip/firewall/connection/').call('print', {'count-only': ''}) - # result processing as described at: https://github.com/socialwifi/RouterOS-api/issues/79#issuecomment-2089744809 - cnt_str = res.done_message.get('ret') - try: - count = int(cnt_str) - except (ValueError, TypeError): - cnt_str = '0' - records = [{'count': cnt_str}] + count_by_family = IPConnectionDatasource._count_connection_records_by_family(router_entry) + records = [{'count': str(sum(count_by_family.values()))}] + if include_stack_counts: + records[0]['ipv4_count'] = str(count_by_family['ipv4']) + records[0]['ipv6_count'] = str(count_by_family['ipv6']) return BaseDSProcessor.trimmed_records(router_entry, router_records = records, metric_labels = metric_labels) except Exception as exc: print(f'Error getting IP connection info from router {router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}') return None + @staticmethod + def _count_connection_records_by_family(router_entry): + router_api = router_entry.api_connection.router_api() + count_by_family = {'ipv4': 0, 'ipv6': 0} + last_exc = None + has_success = False + + for family, resource_path in IPConnectionDatasource._RESOURCE_PATHS: + try: + res = router_api.get_resource(resource_path).call('print', {'count-only': ''}) + cnt_str = res.done_message.get('ret') + try: + count_by_family[family] = int(cnt_str) + except (ValueError, TypeError): + pass + has_success = True + except Exception as exc: + last_exc = exc + + if not has_success: + raise last_exc if last_exc else RuntimeError('Unable to read connection counters') + + return count_by_family + + @staticmethod + def _read_connection_records(router_entry, *, proplist): + router_api = router_entry.api_connection.router_api() + records = [] + last_exc = None + has_success = False + + for _, resource_path in IPConnectionDatasource._RESOURCE_PATHS: + try: + connection_records = router_api.get_resource(resource_path).call('print', {'proplist': proplist}) + records.extend(connection_records) + has_success = True + except Exception as exc: + last_exc = exc + + if not has_success: + raise last_exc if last_exc else RuntimeError('Unable to read connection records') + + return records + + @staticmethod + def _normalize_address(address, port = None): + if not address: + return '' + + if port: + bracketed_suffix = f']:{port}' + plain_suffix = f':{port}' + + if address.startswith('[') and address.endswith(bracketed_suffix): + address = address[1:-len(bracketed_suffix)] + elif address.endswith(plain_suffix): + address = address[:-len(plain_suffix)] + elif address.startswith('[') and address.endswith(']'): + address = address[1:-1] + + return address + + @staticmethod + def _format_address(address, port = None): + address = IPConnectionDatasource._normalize_address(address, port) + if not address or not port: + return address + + if ':' in address: + return f'[{address}]:{port}' + + return f'{address}:{port}' + class IPConnectionStatsDatasource: ''' IP connections stats data provider - ''' + ''' @staticmethod def metric_records(router_entry, *, metric_labels = None, add_router_id = True): if metric_labels is None: - metric_labels = [] + metric_labels = [] try: # First, check if there are any connections count_records = IPConnectionDatasource.metric_records(router_entry) if count_records[0].get('count', 0) == '0': return [] - connection_records = router_entry.api_connection.router_api().get_resource('/ip/firewall/connection/').call('print', \ - {'proplist':'src-address,dst-address,protocol'}) - # calculate number of connections per src-address + connection_records = IPConnectionDatasource._read_connection_records( + router_entry, proplist = 'src-address,src-port,dst-address,dst-port,protocol') + # calculate number of connections per src-address connections_per_src_address = {} for connection_record in connection_records: - address = connection_record['src-address'].split(':')[0] - destination = f"{connection_record.get('dst-address')}({connection_record.get('protocol')})" + address = IPConnectionDatasource._normalize_address( + connection_record.get('src-address'), connection_record.get('src-port')) + destination = f"{IPConnectionDatasource._format_address(connection_record.get('dst-address'), connection_record.get('dst-port'))}" \ + f"({connection_record.get('protocol')})" count, destinations = 0, set() if connections_per_src_address.get(address): @@ -70,14 +144,65 @@ def metric_records(router_entry, *, metric_labels = None, add_router_id = True): records = [] for key, entry in connections_per_src_address.items(): record = {'src_address': key, 'connection_count': entry.count, 'dst_addresses': ', '.join(entry.destinations)} - if add_router_id: + if add_router_id: for router_key, router_value in router_entry.router_id.items(): record[router_key] = router_value records.append(record) - return records + return records except Exception as exc: print(f'Error getting IP connection stats info from router {router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}') return None -ConnStatsEntry = namedtuple('ConnStatsEntry', ['count', 'destinations']) \ No newline at end of file +class IPConnectionTrafficDatasource: + ''' IP connection traffic data provider + ''' + @staticmethod + def metric_records(router_entry, *, metric_labels = None, add_router_id = True): + if metric_labels is None: + metric_labels = [] + try: + count_records = IPConnectionDatasource.metric_records(router_entry) + if count_records[0].get('count', 0) == '0': + return [] + + connection_records = IPConnectionDatasource._read_connection_records( + router_entry, proplist = 'src-address,src-port,dst-address,dst-port,protocol,orig-bytes,repl-bytes') + traffic_per_connection = {} + for connection_record in connection_records: + src_address = IPConnectionDatasource._normalize_address( + connection_record.get('src-address'), connection_record.get('src-port')) + dst_address = IPConnectionDatasource._normalize_address( + connection_record.get('dst-address'), connection_record.get('dst-port')) + protocol = connection_record.get('protocol', '') + key = (src_address, dst_address, protocol) + + upload_bytes = int(connection_record.get('orig-bytes') or 0) + download_bytes = int(connection_record.get('repl-bytes') or 0) + + entry = traffic_per_connection.get(key, ConnTrafficEntry(0, 0)) + traffic_per_connection[key] = ConnTrafficEntry( + entry.upload_bytes + upload_bytes, + entry.download_bytes + download_bytes, + ) + + records = [] + for (src_address, dst_address, protocol), entry in traffic_per_connection.items(): + record = {'src_address': src_address, + 'dst_address': dst_address, + 'protocol': protocol, + 'upload_bytes': entry.upload_bytes, + 'download_bytes': entry.download_bytes, + 'total_bytes': entry.upload_bytes + entry.download_bytes} + if add_router_id: + for router_key, router_value in router_entry.router_id.items(): + record[router_key] = router_value + records.append(record) + return BaseDSProcessor.trimmed_records(router_entry, router_records = records, metric_labels = metric_labels) + except Exception as exc: + print(f'Error getting IP connection traffic info from router {router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}') + return None + + +ConnStatsEntry = namedtuple('ConnStatsEntry', ['count', 'destinations']) +ConnTrafficEntry = namedtuple('ConnTrafficEntry', ['upload_bytes', 'download_bytes']) diff --git a/mktxp/flow/collector_handler.py b/mktxp/flow/collector_handler.py index 33be87e..44d18b9 100644 --- a/mktxp/flow/collector_handler.py +++ b/mktxp/flow/collector_handler.py @@ -14,7 +14,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from timeit import default_timer from datetime import datetime -from threading import Event, Timer +from threading import Event, Lock, Timer from mktxp.cli.config.config import config_handler from mktxp.cli.config.config import MKTXPConfigKeys @@ -26,6 +26,8 @@ def __init__(self, entries_handler, collector_registry): self.entries_handler = entries_handler self.collector_registry = collector_registry self.last_collect_timestamp = 0 + self._metrics_cache = [] + self._cache_lock = Lock() def collect_sync(self): @@ -118,19 +120,32 @@ def timeout(timeout_event): def collect(self): if not self._valid_collect_interval(): + # Within minimal_collect_interval: replay the last successful collection + # so concurrent or closely-spaced scrapers do not see an empty registry + # (which would otherwise drop every mktxp_* series for that scrape and + # surface as phantom "down" gaps in dashboards). + with self._cache_lock: + cached = list(self._metrics_cache) + yield from cached return # bandwidth collector - yield from self.collector_registry.bandwidthCollector.collect() + collected = list(self.collector_registry.bandwidthCollector.collect()) # all other collectors # Check whether to run in parallel by looking at the mktxp system configuration parallel = config_handler.system_entry.fetch_routers_in_parallel max_worker_threads = config_handler.system_entry.max_worker_threads if parallel: - yield from self.collect_async(max_worker_threads=max_worker_threads) + collected.extend(self.collect_async(max_worker_threads=max_worker_threads)) else: - yield from self.collect_sync() + collected.extend(self.collect_sync()) + + if collected: + with self._cache_lock: + self._metrics_cache = collected + + yield from collected def _valid_collect_interval(self): now = datetime.now().timestamp() diff --git a/tests/collector/test_connection_collector.py b/tests/collector/test_connection_collector.py index ffc265c..99bca29 100644 --- a/tests/collector/test_connection_collector.py +++ b/tests/collector/test_connection_collector.py @@ -12,55 +12,196 @@ # GNU General Public License for more details. import pytest -from unittest.mock import MagicMock -from mktxp.datasource.connection_ds import IPConnectionStatsDatasource +from unittest.mock import MagicMock, patch + +from mktxp.collector.connection_collector import IPConnectionCollector +from mktxp.datasource.connection_ds import IPConnectionDatasource, IPConnectionStatsDatasource, IPConnectionTrafficDatasource from mktxp.flow.router_entry import RouterEntry -@pytest.mark.parametrize("connection_count_str, should_make_stats_call", [ - ('0', False), # Scenario with zero connections - ('123', True), # Scenario with non-zero connections -]) -def test_ip_connection_stats_datasource_checks_count_first(connection_count_str, should_make_stats_call): - """ - Verifies that IPConnectionStatsDatasource checks the connection count and avoids fetching the full stats list if the count is 0 - """ - # Mocking the router_entry and its components + +def _build_mock_router_entry(): mock_router_entry = MagicMock(spec=RouterEntry) mock_router_entry.router_name = "TestRouter" mock_router_entry.config_entry = MagicMock() mock_router_entry.config_entry.hostname = "testhost" + mock_router_entry.config_entry.connections = False + mock_router_entry.config_entry.connection_stats = False + mock_router_entry.config_entry.connection_traffic = False mock_router_entry.api_connection = MagicMock() mock_router_entry.router_id = {'routerboard_name': 'test_router'} + return mock_router_entry + - # Mock the API call & responces +def _configure_connection_api(mock_router_entry, *, ipv4_count = '0', ipv6_count = '0', + ipv4_records = None, ipv6_records = None, proplist = ''): mock_api = MagicMock() mock_router_entry.api_connection.router_api.return_value = mock_api - call_mock = mock_api.get_resource.return_value.call - count_response = MagicMock() - count_response.done_message = {'ret': connection_count_str} - - stats_response = [{'src-address': '1.1.1.1:123', 'dst-address': '2.2.2.2:80', 'protocol': 'tcp'}] - - # Side effect function to route calls based on arguments - def api_call_router(*args, **kwargs): - params = args[1] - if params.get('count-only') == '': - return count_response - elif params.get('proplist') == 'src-address,dst-address,protocol': - return stats_response - return MagicMock() - - call_mock.side_effect = api_call_router - - # Test the method of focus + + if ipv4_records is None: + ipv4_records = [] + if ipv6_records is None: + ipv6_records = [] + + def get_resource_side_effect(path): + mock_resource = MagicMock() + + def call_side_effect(command, params): + assert command == 'print' + if params.get('count-only') == '': + count_response = MagicMock() + count_response.done_message = {'ret': ipv4_count if path == '/ip/firewall/connection/' else ipv6_count} + return count_response + + assert params.get('proplist') == proplist + return ipv4_records if path == '/ip/firewall/connection/' else ipv6_records + + mock_resource.call.side_effect = call_side_effect + return mock_resource + + mock_api.get_resource.side_effect = get_resource_side_effect + return mock_api + + +@pytest.mark.parametrize("ipv4_count, ipv6_count, should_make_stats_call", [ + ('0', '0', False), + ('123', '0', True), + ('0', '7', True), +]) +def test_ip_connection_stats_datasource_checks_count_first(ipv4_count, ipv6_count, should_make_stats_call): + """ + Verifies that IPConnectionStatsDatasource checks both IPv4 and IPv6 connection counters and + avoids fetching the full stats list if both are empty. + """ + mock_router_entry = _build_mock_router_entry() + mock_api = _configure_connection_api( + mock_router_entry, + ipv4_count = ipv4_count, + ipv6_count = ipv6_count, + ipv4_records = [{'src-address': '1.1.1.1:123', 'src-port': '123', 'dst-address': '2.2.2.2:80', 'dst-port': '80', 'protocol': 'tcp'}], + ipv6_records = [{'src-address': '[2001:db8::10]:5353', 'src-port': '5353', 'dst-address': '[2001:db8::20]:443', 'dst-port': '443', 'protocol': 'tcp'}], + proplist = 'src-address,src-port,dst-address,dst-port,protocol' + ) + result = IPConnectionStatsDatasource.metric_records(mock_router_entry) + if should_make_stats_call: - # This one should have been called twice, once for count and once for the stats - assert call_mock.call_count == 2 + assert mock_api.get_resource.call_count == 4 assert result is not None assert len(result) > 0 - assert result[0]['src_address'] == '1.1.1.1' else: - # And this just once for the count - call_mock.assert_called_once() + assert mock_api.get_resource.call_count == 2 assert result == [] + + +def test_ip_connection_stats_datasource_supports_ipv4_and_ipv6(): + """ + Verifies that IPConnectionStatsDatasource keeps IPv6 addresses intact while still stripping + transport ports from both IPv4 and IPv6 source addresses. + """ + mock_router_entry = _build_mock_router_entry() + _configure_connection_api( + mock_router_entry, + ipv4_count = '1', + ipv6_count = '1', + ipv4_records = [{'src-address': '1.1.1.1:123', 'src-port': '123', 'dst-address': '2.2.2.2:80', 'dst-port': '80', 'protocol': 'tcp'}], + ipv6_records = [{'src-address': '[2001:db8::10]:5353', 'src-port': '5353', + 'dst-address': '[2001:db8::20]:443', 'dst-port': '443', 'protocol': 'tcp'}], + proplist = 'src-address,src-port,dst-address,dst-port,protocol' + ) + + result = IPConnectionStatsDatasource.metric_records(mock_router_entry) + records_by_src = {record['src_address']: record for record in result} + + assert records_by_src['1.1.1.1']['dst_addresses'] == '2.2.2.2:80(tcp)' + assert records_by_src['2001:db8::10']['dst_addresses'] == '[2001:db8::20]:443(tcp)' + + +def test_ip_connection_datasource_exposes_total_ipv4_and_ipv6_counts(): + """ + Verifies that IPConnectionDatasource returns combined and per-family connection totals. + """ + mock_router_entry = _build_mock_router_entry() + _configure_connection_api( + mock_router_entry, + ipv4_count = '123', + ipv6_count = '7', + ) + + result = IPConnectionDatasource.metric_records(mock_router_entry, include_stack_counts = True) + + assert result[0]['count'] == '130' + assert result[0]['ipv4_count'] == '123' + assert result[0]['ipv6_count'] == '7' + + +def test_ip_connection_traffic_datasource_supports_ipv4_and_ipv6(): + """ + Verifies that IPConnectionTrafficDatasource aggregates active connection byte counters for both + IPv4 and IPv6 records. + """ + mock_router_entry = _build_mock_router_entry() + _configure_connection_api( + mock_router_entry, + ipv4_count = '2', + ipv6_count = '1', + ipv4_records = [ + {'src-address': '1.1.1.1:123', 'src-port': '123', 'dst-address': '2.2.2.2:80', 'dst-port': '80', + 'protocol': 'tcp', 'orig-bytes': '10', 'repl-bytes': '20'}, + {'src-address': '1.1.1.1:123', 'src-port': '123', 'dst-address': '2.2.2.2:80', 'dst-port': '80', + 'protocol': 'tcp', 'orig-bytes': '1', 'repl-bytes': '2'}, + ], + ipv6_records = [ + {'src-address': '[2001:db8::10]:5353', 'src-port': '5353', 'dst-address': '[2001:db8::20]:443', 'dst-port': '443', + 'protocol': 'tcp', 'orig-bytes': '100', 'repl-bytes': '200'}, + ], + proplist = 'src-address,src-port,dst-address,dst-port,protocol,orig-bytes,repl-bytes' + ) + + result = IPConnectionTrafficDatasource.metric_records(mock_router_entry) + records_by_key = {(record['src_address'], record['dst_address'], record['protocol']): record for record in result} + + assert records_by_key[('1.1.1.1', '2.2.2.2', 'tcp')]['upload_bytes'] == 11 + assert records_by_key[('1.1.1.1', '2.2.2.2', 'tcp')]['download_bytes'] == 22 + assert records_by_key[('1.1.1.1', '2.2.2.2', 'tcp')]['total_bytes'] == 33 + assert records_by_key[('2001:db8::10', '2001:db8::20', 'tcp')]['total_bytes'] == 300 + + +def test_ip_connection_collector_collects_connection_traffic_metrics(): + """ + Verifies that IPConnectionCollector emits the per-connection traffic metrics only when the + dedicated feature flag is enabled. + """ + mock_router_entry = _build_mock_router_entry() + mock_router_entry.config_entry.connection_traffic = True + + connection_traffic_records = [{'src_address': '1.1.1.1', + 'dst_address': '2.2.2.2', + 'protocol': 'tcp', + 'upload_bytes': 10, + 'download_bytes': 20, + 'total_bytes': 30}] + + with patch('mktxp.collector.connection_collector.IPConnectionTrafficDatasource.metric_records', + return_value = connection_traffic_records), \ + patch('mktxp.collector.connection_collector.BaseOutputProcessor.augment_record'): + metrics = list(IPConnectionCollector.collect(mock_router_entry)) + + metric_names = [metric.name for metric in metrics] + assert metric_names == ['mktxp_connection_upload_bytes', 'mktxp_connection_download_bytes', 'mktxp_connection_total_bytes'] + + +def test_ip_connection_collector_collects_total_ipv4_and_ipv6_metrics(): + """ + Verifies that IPConnectionCollector emits combined and per-family connection totals. + """ + mock_router_entry = _build_mock_router_entry() + mock_router_entry.config_entry.connections = True + + connection_records = [{'count': '130', 'ipv4_count': '123', 'ipv6_count': '7'}] + + with patch('mktxp.collector.connection_collector.IPConnectionDatasource.metric_records', + return_value = connection_records): + metrics = list(IPConnectionCollector.collect(mock_router_entry)) + + metric_names = [metric.name for metric in metrics] + assert metric_names == ['mktxp_ip_connections_total', 'mktxp_ipv4_connections_total', 'mktxp_ipv6_connections_total'] diff --git a/tests/flow/test_collector_handler.py b/tests/flow/test_collector_handler.py index 7aba4b7..7aa1478 100644 --- a/tests/flow/test_collector_handler.py +++ b/tests/flow/test_collector_handler.py @@ -59,3 +59,94 @@ def test_collect_sync_lifecycle(is_ready, is_done_called, mock_entries_handler, mock_router_entry.is_done.assert_called_once() else: mock_router_entry.is_done.assert_not_called() + + + +class _StubMetric: + """Identifiable placeholder for a yielded metric family.""" + def __init__(self, label): + self.label = label + def __eq__(self, other): + return isinstance(other, _StubMetric) and other.label == self.label + def __hash__(self): + return hash((type(self), self.label)) + def __repr__(self): + return f'_StubMetric({self.label!r})' + + +def _system_entry_stub(mci): + """Build a stand-in for config_handler.system_entry that the handler reads.""" + return type('SysEntry', (), { + 'minimal_collect_interval': mci, + 'fetch_routers_in_parallel': False, + 'max_worker_threads': 1, + 'verbose_mode': False, + })() + + +def _make_handler(per_call_metrics): + from mktxp.flow.collector_handler import CollectorHandler + + router_entry = MagicMock() + router_entry.is_ready.return_value = True + router_entry.time_spent = {} + + entries_handler = MagicMock() + entries_handler.router_entries = [router_entry] + + registry = MagicMock() + registry.bandwidthCollector.collect.return_value = [] + + iterator = iter(per_call_metrics) + def collect_func(_entry): + try: + yield from next(iterator) + except StopIteration: + return + + registry.registered_collectors = {'mock_collector': collect_func} + return CollectorHandler(entries_handler, registry) + + +def test_collect_caches_and_replays_on_mci_defer(monkeypatch): + """When MCI defers, the handler must yield the cached metric families + instead of an empty registry — otherwise Prometheus drops every mktxp_* + series for that scrape and dashboards show false-down.""" + from mktxp.flow import collector_handler as ch_mod + + fresh = [_StubMetric('a'), _StubMetric('b')] + handler = _make_handler([fresh]) + + fake_cfg = MagicMock() + fake_cfg.system_entry = _system_entry_stub(mci=5) + monkeypatch.setattr(ch_mod, 'config_handler', fake_cfg) + + first = list(handler.collect()) + assert first == fresh, 'first scrape should yield freshly collected metrics' + + # Second call lands well within MCI; the inner collector iterator is + # exhausted, so without the cache the result would be empty. + second = list(handler.collect()) + assert second == fresh, 'within-MCI scrape must replay the cached metrics' + + +def test_empty_collection_does_not_clobber_cache(monkeypatch): + """If a fresh collection produces nothing (e.g., all routers not_ready), + the cache must be preserved so the next within-MCI scrape still has data.""" + from mktxp.flow import collector_handler as ch_mod + + seed = [_StubMetric('seed')] + handler = _make_handler([seed, []]) + + fake_cfg = MagicMock() + fake_cfg.system_entry = _system_entry_stub(mci=0) + monkeypatch.setattr(ch_mod, 'config_handler', fake_cfg) + + assert list(handler.collect()) == seed + + # Force a successful but empty collection (MCI=0 → always runs fresh). + assert list(handler.collect()) == [] + + # Bump MCI so the next call defers; cache should still hold the seed. + fake_cfg.system_entry = _system_entry_stub(mci=60) + assert list(handler.collect()) == seed