Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 254 additions & 22 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ def _handle_requests(self):
"message": "Consumption reset successfully",
}
response_type = ZMQRequestType.RESET_CONSUMPTION_RESPONSE
elif request_msg.request_type == ZMQRequestType.KV_RETRIEVE_META:
response_body = self._mock_kv_retrieve_meta(request_msg.body)
response_type = ZMQRequestType.KV_RETRIEVE_META_RESPONSE
elif request_msg.request_type == ZMQRequestType.KV_RETRIEVE_KEYS:
response_body = self._mock_kv_retrieve_keys(request_msg.body)
response_type = ZMQRequestType.KV_RETRIEVE_KEYS_RESPONSE
Expand Down Expand Up @@ -193,7 +196,7 @@ def _mock_batch_meta(self, request_body):

return {"metadata": metadata}

def _mock_kv_retrieve_keys(self, request_body):
def _mock_kv_retrieve_meta(self, request_body):
"""Mock KV retrieve keys response."""
keys = request_body.get("keys", [])
create = request_body.get("create", False)
Expand Down Expand Up @@ -250,6 +253,42 @@ def _mock_kv_list(self, request_body):

return {"partition_info": {partition_id: {k: {} for k in keys}}, "message": "success"}

def _mock_kv_retrieve_keys(self, request_body):
"""Mock KV retrieve indexes response."""
global_indexes = request_body.get("global_indexes", [])
partition_id = request_body.get("partition_id", "")

# Initialize key tracking if not exists
if not hasattr(self, "_kv_partition_keys"):
self._kv_partition_keys = {}

# Initialize index to key mapping if not exists
if not hasattr(self, "_kv_index_to_key"):
self._kv_index_to_key = {}

# Get keys for this partition
partition_keys = self._kv_partition_keys.get(partition_id, [])

# Build reverse mapping from index to key if needed
if not hasattr(self, "_kv_partition_index_map"):
self._kv_partition_index_map = {}

if partition_id not in self._kv_partition_index_map:
# Build the mapping from stored keys
start_idx = self._get_next_kv_index(partition_id) - len(partition_keys)
self._kv_partition_index_map[partition_id] = {}
for i, key in enumerate(partition_keys):
self._kv_partition_index_map[partition_id][start_idx + i] = key

index_map = self._kv_partition_index_map.get(partition_id, {})

# Retrieve keys for the given global_indexes
keys = []
for idx in global_indexes:
keys.append(index_map.get(idx, None))

return {"keys": keys}

def _get_next_kv_index(self, partition_id):
"""Get next available index for KV keys in partition."""
if not hasattr(self, "_kv_index_map"):
Expand Down Expand Up @@ -970,12 +1009,12 @@ class TestClientKVInterface:
"""Tests for client KV interface methods."""

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_single(self, client_setup):
"""Test async_kv_retrieve_keys with single key."""
async def test_async_kv_retrieve_meta_single(self, client_setup):
"""Test async_kv_retrieve_meta with single key."""
client, _, _ = client_setup

# Test async_kv_retrieve_keys with single key
metadata = await client.async_kv_retrieve_keys(
# Test async_kv_retrieve_meta with single key
metadata = await client.async_kv_retrieve_meta(
keys="test_key_1",
partition_id="test_partition",
create=True,
Expand All @@ -988,13 +1027,13 @@ async def test_async_kv_retrieve_keys_single(self, client_setup):
assert metadata.size == 1

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_multiple(self, client_setup):
"""Test async_kv_retrieve_keys with multiple keys."""
async def test_async_kv_retrieve_meta_multiple(self, client_setup):
"""Test async_kv_retrieve_meta with multiple keys."""
client, _, _ = client_setup

# Test async_kv_retrieve_keys with multiple keys
# Test async_kv_retrieve_meta with multiple keys
keys = ["key_a", "key_b", "key_c"]
metadata = await client.async_kv_retrieve_keys(
metadata = await client.async_kv_retrieve_meta(
keys=keys,
partition_id="test_partition",
create=True,
Expand All @@ -1007,19 +1046,19 @@ async def test_async_kv_retrieve_keys_multiple(self, client_setup):
assert metadata.size == 3

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_create_false(self, client_setup):
"""Test async_kv_retrieve_keys with create=False (retrieve existing keys)."""
async def test_async_kv_retrieve_meta_create_false(self, client_setup):
"""Test async_kv_retrieve_meta with create=False (retrieve existing keys)."""
client, _, _ = client_setup

# create some keys
await client.async_kv_retrieve_keys(
await client.async_kv_retrieve_meta(
keys="existing_key",
partition_id="existing_partition",
create=True,
)

# Then retrieve them with create=False
metadata = await client.async_kv_retrieve_keys(
metadata = await client.async_kv_retrieve_meta(
keys="existing_key",
partition_id="existing_partition",
create=False,
Expand All @@ -1030,13 +1069,13 @@ async def test_async_kv_retrieve_keys_create_false(self, client_setup):
assert metadata.size == 1

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_invalid_keys_type(self, client_setup):
"""Test async_kv_retrieve_keys raises error with invalid keys type."""
async def test_async_kv_retrieve_meta_invalid_keys_type(self, client_setup):
"""Test async_kv_retrieve_meta raises error with invalid keys type."""
client, _, _ = client_setup

# Test with invalid keys type (not string or list)
with pytest.raises(TypeError):
await client.async_kv_retrieve_keys(
await client.async_kv_retrieve_meta(
keys=123, # Invalid type
partition_id="test_partition",
create=True,
Expand All @@ -1048,7 +1087,7 @@ async def test_async_kv_list_with_keys(self, client_setup):
client, mock_controller, _ = client_setup

# First register some keys
await client.async_kv_retrieve_keys(
await client.async_kv_retrieve_meta(
keys=["key_1", "key_2"],
partition_id="kv_partition",
create=True,
Expand All @@ -1069,12 +1108,12 @@ async def test_async_kv_list_multiple_partitions(self, client_setup):
client, _, _ = client_setup

# Create keys in different partitions
await client.async_kv_retrieve_keys(
await client.async_kv_retrieve_meta(
keys="partition_a_key",
partition_id="partition_a",
create=True,
)
await client.async_kv_retrieve_keys(
await client.async_kv_retrieve_meta(
keys="partition_b_key",
partition_id="partition_b",
create=True,
Expand All @@ -1096,19 +1135,212 @@ async def test_async_kv_list_multiple_partitions(self, client_setup):
assert list(partition_a["partition_a"].values()) == [{}]
assert list(partition_b["partition_b"].values()) == [{}]

def test_kv_retrieve_keys_type_validation(self, client_setup):
"""Test synchronous kv_retrieve_keys type validation."""
def test_kv_retrieve_meta_type_validation(self, client_setup):
"""Test synchronous kv_retrieve_meta type validation."""
import asyncio

client, _, _ = client_setup

# Test with non-string element in list
async def test_invalid_list():
with pytest.raises(TypeError):
await client.async_kv_retrieve_keys(
await client.async_kv_retrieve_meta(
keys=["valid_key", 123], # Invalid: 123 is not a string
partition_id="test_partition",
create=True,
)

asyncio.run(test_invalid_list())

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_single(self, client_setup):
"""Test async_kv_retrieve_keys with single global_index."""
client, _, _ = client_setup
partition_id = "test_partition_idx"

# First create a key using kv_retrieve_meta
await client.async_kv_retrieve_meta(
keys=["test_key"],
partition_id=partition_id,
create=True,
)

# Now retrieve the key using global_index 0
keys = await client.async_kv_retrieve_keys(
global_indexes=[0],
partition_id=partition_id,
)

assert keys == ["test_key"]

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_multiple(self, client_setup):
"""Test async_kv_retrieve_keys with multiple global_indexes."""
client, _, _ = client_setup
partition_id = "test_partition_idx"

# First create keys using kv_retrieve_meta
keys_to_create = ["key_a", "key_b", "key_c"]
await client.async_kv_retrieve_meta(
keys=keys_to_create,
partition_id=partition_id,
create=True,
)

# Retrieve keys using global_indexes [0, 1, 2]
keys = await client.async_kv_retrieve_keys(
global_indexes=[0, 1, 2],
partition_id=partition_id,
)

assert keys == ["key_a", "key_b", "key_c"]

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_partial(self, client_setup):
"""Test async_kv_retrieve_keys with subset of global_indexes."""
client, _, _ = client_setup
partition_id = "test_partition_idx"

# First create keys using kv_retrieve_meta
await client.async_kv_retrieve_meta(
keys=["first_key", "second_key", "third_key"],
partition_id=partition_id,
create=True,
)

# Retrieve only first and third keys
keys = await client.async_kv_retrieve_keys(
global_indexes=[0, 2],
partition_id=partition_id,
)

assert keys == ["first_key", "third_key"]

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_single_int(self, client_setup):
"""Test async_kv_retrieve_keys accepts a single int."""
client, _, _ = client_setup
partition_id = "test_partition_idx"

# First create a key using kv_retrieve_meta
await client.async_kv_retrieve_meta(
keys=["single_key"],
partition_id=partition_id,
create=True,
)

# Now retrieve the key using a single int (not a list)
keys = await client.async_kv_retrieve_keys(
global_indexes=0,
partition_id=partition_id,
)

assert keys == ["single_key"]

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_invalid_type(self, client_setup):
"""Test async_kv_retrieve_keys raises error with invalid global_indexes type."""
client, _, _ = client_setup

# Test with invalid type (string instead of int)
with pytest.raises(TypeError):
await client.async_kv_retrieve_keys(
global_indexes=["not_an_int"],
partition_id="test_partition",
)

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_empty_list(self, client_setup):
"""Test async_kv_retrieve_keys raises error with empty list."""
client, _, _ = client_setup

with pytest.raises(ValueError):
await client.async_kv_retrieve_keys(
global_indexes=[],
partition_id="test_partition",
)

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_non_existent(self, client_setup):
"""Test async_kv_retrieve_keys returns None for non-existent global_indexes."""
client, _, _ = client_setup
partition_id = "test_partition_idx"

# First create a key using kv_retrieve_meta
await client.async_kv_retrieve_meta(
keys=["existing_key"],
partition_id=partition_id,
create=True,
)

# Try to retrieve a non-existent global_index
keys = await client.async_kv_retrieve_keys(
global_indexes=[99],
partition_id=partition_id,
)
assert keys == [None]

@pytest.mark.asyncio
async def test_async_kv_retrieve_keys_multiple_partitions(self, client_setup):
"""Test async_kv_retrieve_keys returns keys from the correct partition."""
client, _, _ = client_setup
partition_1 = "partition_1"
partition_2 = "partition_2"

# Create keys in both partitions
await client.async_kv_retrieve_meta(
keys=["key_1"],
partition_id=partition_1,
create=True,
)
await client.async_kv_retrieve_meta(
keys=["key_2"],
partition_id=partition_2,
create=True,
)

# Retrieve key from partition_1 (global_index 0)
keys_1 = await client.async_kv_retrieve_keys(
global_indexes=[0],
partition_id=partition_1,
)

# Retrieve key from partition_2 (global_index 0)
keys_2 = await client.async_kv_retrieve_keys(
global_indexes=[0],
partition_id=partition_2,
)

assert keys_1 == ["key_1"]
assert keys_2 == ["key_2"]

def test_kv_retrieve_keys_sync(self, client_setup):
"""Test synchronous kv_retrieve_keys."""
client, _, _ = client_setup
partition_id = "test_partition_sync"

# First create a key using kv_retrieve_meta
client.kv_retrieve_meta(
keys=["sync_key"],
partition_id=partition_id,
create=True,
)

# Now retrieve the key using global_index
keys = client.kv_retrieve_keys(
global_indexes=[0],
partition_id=partition_id,
)

assert keys == ["sync_key"]

def test_kv_retrieve_keys_type_validation(self, client_setup):
"""Test synchronous kv_retrieve_keys type validation."""
client, _, _ = client_setup

# Test with non-int element in list
with pytest.raises(TypeError):
client.kv_retrieve_keys(
global_indexes=[0, "invalid"],
partition_id="test_partition",
)
Loading