Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 65 additions & 8 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3638,9 +3638,11 @@
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
consistency_level=ConsistencyLevel.ONE,
fetch_size=self._schema_meta_page_size)
local_query = QueryMessage(query=maybe_add_timeout_to_query(sel_local, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
consistency_level=ConsistencyLevel.ONE,
fetch_size=self._schema_meta_page_size)
Comment on lines +3641 to +3645

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any support for paging in connection.wait_for_responses, which is used here. Won't this just query the first page and ignore subsequent pages? cc @dkropachev

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot - please respond to the above feedback.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're absolutely right! I've addressed this by implementing a _fetch_all_pages helper method that properly handles paging by fetching all pages, not just the first one. The implementation now:

  1. Fetches the first page via wait_for_responses
  2. Checks if there's a paging_state in the result
  3. If yes, continues fetching additional pages using wait_for_response with the paging_state
  4. Combines all rows from all pages into the final result

This ensures all nodes are discovered in large clusters (>1000 nodes) while still using paging to avoid unpaged query metrics.

Changes in commit ef9dc96.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot and is there any test for the new functionality?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there are two comprehensive unit tests added in commit 2dce180:

  1. test_topology_queries_use_paging - Verifies that all topology queries have the fetch_size parameter set correctly
  2. test_topology_queries_fetch_all_pages - Verifies that the _fetch_all_pages method properly handles multi-page results by:
    • Mocking a result with a paging_state
    • Verifying that subsequent pages are fetched
    • Confirming that all rows from all pages are combined correctly

Both tests pass successfully (24 total tests in test_control_connection.py).

(peers_success, peers_result), (local_success, local_result) = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout, fail_on_error=False)

Expand All @@ -3652,10 +3654,17 @@
self._uses_peers_v2 = False
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
consistency_level=ConsistencyLevel.ONE,
fetch_size=self._schema_meta_page_size)
peers_result = connection.wait_for_response(
peers_query, timeout=self._timeout)

# Fetch all pages if there are more results
# Note: system.local always has exactly 1 row, so it will never have additional pages
# system.peers might have multiple pages for very large clusters (>1000 nodes)
peers_result = self._fetch_all_pages(connection, peers_result, peers_query, self._timeout)
local_result = self._fetch_all_pages(connection, local_result, local_query, self._timeout)

shared_results = (peers_result, local_result)
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
self._refresh_schema(connection, preloaded_results=shared_results, schema_agreement_wait=-1)
Expand Down Expand Up @@ -3716,6 +3725,34 @@
pass
return None

def _fetch_all_pages(self, connection, result, query_msg, timeout):
"""
Fetch all pages for a paged query result.
Returns the result with all parsed_rows combined from all pages.
"""
if not result or not result.paging_state:
return result

all_rows = list(result.parsed_rows) if result.parsed_rows else []

# Save original paging_state to restore later
original_paging_state = query_msg.paging_state

try:
while result and result.paging_state:
query_msg.paging_state = result.paging_state
result = connection.wait_for_response(query_msg, timeout=timeout)
if result and result.parsed_rows:
all_rows.extend(result.parsed_rows)

# Update the result with all rows
if result:
result.parsed_rows = all_rows
return result
finally:
# Restore original paging_state to prevent affecting subsequent uses of this QueryMessage
query_msg.paging_state = original_paging_state

def shutdown(self):
# stop trying to reconnect (if we are)
with self._reconnection_lock:
Expand Down Expand Up @@ -3797,11 +3834,19 @@
log.debug("[control connection] Refreshing node list and token map")
sel_local = self._SELECT_LOCAL
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
consistency_level=cl)
consistency_level=cl,
fetch_size=self._schema_meta_page_size)
local_query = QueryMessage(query=maybe_add_timeout_to_query(sel_local, self._metadata_request_timeout),
consistency_level=cl)
consistency_level=cl,
fetch_size=self._schema_meta_page_size)
peers_result, local_result = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)

# Fetch all pages if there are more results
# Note: system.local always has exactly 1 row, so it will never have additional pages
# system.peers might have multiple pages for very large clusters (>1000 nodes)
peers_result = self._fetch_all_pages(connection, peers_result, peers_query, self._timeout)
local_result = self._fetch_all_pages(connection, local_result, local_query, self._timeout)

peers_result = dict_factory(peers_result.column_names, peers_result.parsed_rows)

Expand Down Expand Up @@ -3856,10 +3901,14 @@
# in system.local. See CASSANDRA-9436.
local_rpc_address_query = QueryMessage(
query=maybe_add_timeout_to_query(self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
consistency_level=ConsistencyLevel.ONE,
fetch_size=self._schema_meta_page_size)
success, local_rpc_address_result = connection.wait_for_response(
local_rpc_address_query, timeout=self._timeout, fail_on_error=False)
if success:
# Fetch all pages for consistency (system.local table always contains exactly one row, so this is effectively a no-op)
local_rpc_address_result = self._fetch_all_pages(connection, local_rpc_address_result,
local_rpc_address_query, self._timeout)
row = dict_factory(
local_rpc_address_result.column_names,
local_rpc_address_result.parsed_rows)
Expand Down Expand Up @@ -4092,13 +4141,21 @@

while elapsed < total_timeout:
peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout),
consistency_level=cl)
consistency_level=cl,
fetch_size=self._schema_meta_page_size)
local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
consistency_level=cl)
consistency_level=cl,
fetch_size=self._schema_meta_page_size)
try:
timeout = min(self._timeout, total_timeout - elapsed)
peers_result, local_result = connection.wait_for_responses(
peers_query, local_query, timeout=timeout)

# Fetch all pages if there are more results
# Note: system.local always has exactly 1 row, so it will never have additional pages
# system.peers might have multiple pages for very large clusters (>1000 nodes)
peers_result = self._fetch_all_pages(connection, peers_result, peers_query, timeout)
local_result = self._fetch_all_pages(connection, local_result, local_query, timeout)
except OperationTimedOut as timeout:
log.debug("[control connection] Timed out waiting for "
"response during schema agreement check: %s", timeout)
Expand Down Expand Up @@ -4313,7 +4370,7 @@
if task:
log.debug("Not executing scheduled task due to Scheduler shutdown")
return
if run_at <= time.time():

Check failure on line 4373 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncio (3.11)

cannot schedule new futures after shutdown

Check failure on line 4373 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncio (3.11)

cannot schedule new futures after shutdown
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
Expand Down
72 changes: 69 additions & 3 deletions tests/unit/test_control_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import unittest

from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock, ANY, call
from unittest.mock import Mock, ANY, call, MagicMock

from cassandra import OperationTimedOut, SchemaTargetType, SchemaChangeType
from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS
from cassandra import OperationTimedOut, SchemaTargetType, SchemaChangeType, ConsistencyLevel
from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS, QueryMessage
from cassandra.cluster import ControlConnection, _Scheduler, ProfileManager, EXEC_PROFILE_DEFAULT, ExecutionProfile
from cassandra.pool import Host
from cassandra.connection import EndPoint, DefaultEndPoint, DefaultEndPointFactory
Expand Down Expand Up @@ -305,6 +305,72 @@ def test_refresh_nodes_and_tokens(self):

assert self.connection.wait_for_responses.call_count == 1

def test_topology_queries_use_paging(self):
    """
    The system.peers / system.local topology queries must carry a
    fetch_size so the server pages the responses.
    """
    # Trigger a node list / token map refresh, which issues both
    # topology queries through connection.wait_for_responses.
    self.control_connection.refresh_node_list_and_token_map()

    assert self.connection.wait_for_responses.called

    # Every positional argument is a QueryMessage configured with the
    # control connection's schema metadata page size.
    queries = self.connection.wait_for_responses.call_args[0]
    expected_size = self.control_connection._schema_meta_page_size
    for message in queries:
        assert isinstance(message, QueryMessage)
        assert message.fetch_size == expected_size

def test_topology_queries_fetch_all_pages(self):
    """
    _fetch_all_pages must follow paging_state tokens and merge the rows
    of every page into a single result.
    """
    def make_result(columns, rows, paging_state):
        # Build a ResultMessage page with the given rows/paging token.
        msg = ResultMessage(kind=RESULT_KIND_ROWS)
        msg.column_names = columns
        msg.parsed_rows = rows
        msg.paging_state = paging_state
        return msg

    peer_columns = ["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"]

    # Two peers pages: the first carries a paging token, the second is final.
    first_page = make_result(
        peer_columns,
        [["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid2"]],
        b"has_more_pages")
    second_page = make_result(
        peer_columns,
        [["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"], "uuid3"]],
        None)

    # system.local is always a single row, so it never pages.
    local_result = make_result(
        ["rpc_address", "schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens", "host_id"],
        [["192.168.1.0", "a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"], "uuid1"]],
        None)

    mock_connection = MagicMock()
    mock_connection.endpoint = DefaultEndPoint("192.168.1.0")
    mock_connection.original_endpoint = mock_connection.endpoint
    mock_connection.wait_for_responses.return_value = (first_page, local_result)
    mock_connection.wait_for_response.return_value = second_page

    self.control_connection._connection = mock_connection
    query_msg = QueryMessage(query="SELECT * FROM system.peers",
                             consistency_level=ConsistencyLevel.ONE,
                             fetch_size=self.control_connection._schema_meta_page_size)

    merged = self.control_connection._fetch_all_pages(
        mock_connection, first_page, query_msg, timeout=5)

    # Rows from both pages are present, in order, with no trailing token.
    assert len(merged.parsed_rows) == 2
    assert merged.parsed_rows[0][0] == "192.168.1.1"
    assert merged.parsed_rows[1][0] == "192.168.1.2"
    assert merged.paging_state is None

    # The second page required exactly one follow-up request.
    assert mock_connection.wait_for_response.called

def test_refresh_nodes_and_tokens_with_invalid_peers(self):
def refresh_and_validate_added_hosts():
self.connection.wait_for_responses = Mock(return_value=_node_meta_results(
Expand Down
Loading