Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions tests/integration/_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,39 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, TypeVar

from crawlee._utils.crypto import crypto_random_object_id

from apify import Actor

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable

T = TypeVar('T')


async def call_with_exp_backoff(fn: Callable[[], Awaitable[T]], *, max_retries: int = 3) -> T | None:
    """Call an async callable with exponential backoff retries until it returns a truthy value.

    In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests
    become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with
    exponential backoff to handle that delay in integration tests.

    Args:
        fn: A zero-argument async callable. It is re-invoked while it returns a falsy value.
        max_retries: The maximum number of times `fn` is invoked.

    Returns:
        The first truthy value returned by `fn`, or the (falsy) result of the last attempt once
        all retries are exhausted.
    """
    result = None

    for attempt in range(max_retries):
        result = await fn()

        if result:
            return result

        # Only back off when another attempt follows - sleeping (and logging "retrying")
        # after the final attempt would waste up to 2**(max_retries-1) seconds per call.
        if attempt < max_retries - 1:
            delay = 2**attempt
            Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
            await asyncio.sleep(delay)

    return result


def generate_unique_resource_name(label: str) -> str:
"""Generates a unique resource name, which will contain the given label."""
Expand Down
144 changes: 93 additions & 51 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from crawlee import service_locator
from crawlee.crawlers import BasicCrawler

from ._utils import generate_unique_resource_name
from ._utils import call_with_exp_backoff, generate_unique_resource_name
from apify import Actor, Request
from apify.storage_clients import ApifyStorageClient
from apify.storage_clients._apify import ApifyRequestQueueClient
Expand All @@ -26,25 +26,12 @@
from apify.storage_clients._apify._models import ApifyRequestQueueMetadata


async def fetch_next_request_with_exp_backoff(rq: RequestQueue, max_retries: int = 5) -> Request | None:
    """Fetch the next request with exponential backoff retries.

    In shared request queue mode, there is a propagation delay before newly added or reclaimed requests become visible
    (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle
    that delay in integration tests.

    Args:
        rq: The request queue to fetch from.
        max_retries: The maximum number of fetch attempts.

    Returns:
        The next request, or `None` if every attempt returned `None`.
    """
    for attempt in range(max_retries):
        result = await rq.fetch_next_request()
        if result is not None:
            return result
        # Only back off when another attempt follows - sleeping (and logging "retrying")
        # after the final attempt would waste up to 2**(max_retries-1) seconds per call.
        if attempt < max_retries - 1:
            delay = 2**attempt
            Actor.log.info(f'fetch_next_request returned None, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
            await asyncio.sleep(delay)
    return None


async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None:
async def test_add_and_fetch_requests(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test basic functionality of adding and fetching requests."""
rq_access_mode = request.node.callspec.params.get('request_queue_apify')

desired_request_count = 100
Actor.log.info('Opening request queue...')
Expand All @@ -70,12 +57,19 @@ async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None
f'desired_request_count={desired_request_count}',
)
Actor.log.info('Waiting for queue to be finished...')
is_finished = await rq.is_finished()
if rq_access_mode == 'shared':
is_finished = await call_with_exp_backoff(rq.is_finished)
else:
is_finished = await rq.is_finished()
assert is_finished is True, f'is_finished={is_finished}'


async def test_add_requests_in_batches(request_queue_apify: RequestQueue) -> None:
async def test_add_requests_in_batches(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test adding multiple requests in a single batch operation."""
rq_access_mode = request.node.callspec.params.get('request_queue_apify')

desired_request_count = 100
rq = request_queue_apify
Expand All @@ -101,12 +95,19 @@ async def test_add_requests_in_batches(request_queue_apify: RequestQueue) -> Non
f'handled_request_count={handled_request_count}',
f'desired_request_count={desired_request_count}',
)
is_finished = await rq.is_finished()
if rq_access_mode == 'shared':
is_finished = await call_with_exp_backoff(rq.is_finished)
else:
is_finished = await rq.is_finished()
assert is_finished is True, f'is_finished={is_finished}'


async def test_add_non_unique_requests_in_batch(request_queue_apify: RequestQueue) -> None:
async def test_add_non_unique_requests_in_batch(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test adding requests with duplicate unique keys in batch."""
rq_access_mode = request.node.callspec.params.get('request_queue_apify')

desired_request_count = 100
rq = request_queue_apify
Expand Down Expand Up @@ -137,7 +138,10 @@ async def test_add_non_unique_requests_in_batch(request_queue_apify: RequestQueu
f'handled_request_count={handled_request_count}',
f'expected_count={expected_count}',
)
is_finished = await rq.is_finished()
if rq_access_mode == 'shared':
is_finished = await call_with_exp_backoff(rq.is_finished)
else:
is_finished = await rq.is_finished()
Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}')
assert is_finished is True, f'is_finished={is_finished}'

Expand Down Expand Up @@ -255,7 +259,7 @@ async def test_request_reclaim_functionality(
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
# (see https://github.com/apify/apify-sdk-python/issues/808).
if rq_access_mode == 'shared':
request2 = await fetch_next_request_with_exp_backoff(rq)
request2 = await call_with_exp_backoff(rq.fetch_next_request)
else:
request2 = await rq.fetch_next_request()

Expand All @@ -266,7 +270,10 @@ async def test_request_reclaim_functionality(

# Mark as handled this time
await rq.mark_request_as_handled(request2)
is_finished = await rq.is_finished()
if rq_access_mode == 'shared':
is_finished = await call_with_exp_backoff(rq.is_finished)
else:
is_finished = await rq.is_finished()
assert is_finished is True


Expand Down Expand Up @@ -300,7 +307,7 @@ async def test_request_reclaim_with_forefront(
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
# (see https://github.com/apify/apify-sdk-python/issues/808).
if rq_access_mode == 'shared':
next_request = await fetch_next_request_with_exp_backoff(rq)
next_request = await call_with_exp_backoff(rq.fetch_next_request)
else:
next_request = await rq.fetch_next_request()

Expand Down Expand Up @@ -427,8 +434,12 @@ async def test_metadata_tracking(request_queue_apify: RequestQueue) -> None:
assert final_handled == 3, f'final_handled={final_handled}'


async def test_batch_operations_performance(request_queue_apify: RequestQueue) -> None:
async def test_batch_operations_performance(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test batch operations vs individual operations."""
rq_access_mode = request.node.callspec.params.get('request_queue_apify')

rq = request_queue_apify
Actor.log.info('Request queue opened')
Expand Down Expand Up @@ -459,12 +470,20 @@ async def test_batch_operations_performance(request_queue_apify: RequestQueue) -
Actor.log.info(f'Processing completed. Total processed: {processed_count}')
assert processed_count == 50, f'processed_count={processed_count}'

is_finished = await rq.is_finished()
if rq_access_mode == 'shared':
is_finished = await call_with_exp_backoff(rq.is_finished)
else:
is_finished = await rq.is_finished()

assert is_finished is True, f'is_finished={is_finished}'


async def test_state_consistency(request_queue_apify: RequestQueue) -> None:
async def test_state_consistency(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test queue state consistency during concurrent operations."""
rq_access_mode = request.node.callspec.params.get('request_queue_apify')

rq = request_queue_apify
Actor.log.info('Request queue opened')
Expand All @@ -482,14 +501,14 @@ async def test_state_consistency(request_queue_apify: RequestQueue) -> None:
reclaimed_requests = []

for i in range(5):
request = await rq.fetch_next_request()
if request:
next_request = await rq.fetch_next_request()
if next_request:
if i % 2 == 0: # Process even indices
await rq.mark_request_as_handled(request)
processed_requests.append(request)
await rq.mark_request_as_handled(next_request)
processed_requests.append(next_request)
else: # Reclaim odd indices
await rq.reclaim_request(request)
reclaimed_requests.append(request)
await rq.reclaim_request(next_request)
reclaimed_requests.append(next_request)

Actor.log.info(f'Processed {len(processed_requests)} requests, reclaimed {len(reclaimed_requests)}')

Expand All @@ -514,7 +533,10 @@ async def test_state_consistency(request_queue_apify: RequestQueue) -> None:
await rq.mark_request_as_handled(next_request)

Actor.log.info(f'Processed {remaining_count} remaining requests')
is_finished = await rq.is_finished()
if rq_access_mode == 'shared':
is_finished = await call_with_exp_backoff(rq.is_finished)
else:
is_finished = await rq.is_finished()
assert is_finished is True, f'is_finished={is_finished}'


Expand Down Expand Up @@ -549,8 +571,12 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue) -> None:
assert metadata.pending_request_count == 0, f'metadata.pending_request_count={metadata.pending_request_count}'


async def test_large_batch_operations(request_queue_apify: RequestQueue) -> None:
async def test_large_batch_operations(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test handling large batches of requests."""
rq_access_mode = request.node.callspec.params.get('request_queue_apify')

rq = request_queue_apify
Actor.log.info('Request queue opened')
Expand All @@ -571,18 +597,21 @@ async def test_large_batch_operations(request_queue_apify: RequestQueue) -> None
processed_count = 0

while not await rq.is_empty():
request = await rq.fetch_next_request()
next_request = await rq.fetch_next_request()

# The RQ is_empty should ensure we don't get None
assert request is not None, f'request={request}'
assert next_request is not None, f'next_request={next_request}'

await rq.mark_request_as_handled(request)
await rq.mark_request_as_handled(next_request)
processed_count += 1

Actor.log.info(f'Processing completed. Total processed: {processed_count}')
assert processed_count == 500, f'processed_count={processed_count}'

is_finished = await rq.is_finished()
if rq_access_mode == 'shared':
is_finished = await call_with_exp_backoff(rq.is_finished)
else:
is_finished = await rq.is_finished()
assert is_finished is True, f'is_finished={is_finished}'


Expand Down Expand Up @@ -993,26 +1022,33 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
assert apify_metadata.stats.write_count == add_request_count


async def test_rq_long_url(request_queue_apify: RequestQueue) -> None:
async def test_rq_long_url(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
"""Test handling of requests with long URLs and extended unique keys."""
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
rq = request_queue_apify
request = Request.from_url(
long_url_request = Request.from_url(
'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',
use_extended_unique_key=True,
always_enqueue=True,
)

request_id = unique_key_to_request_id(request.unique_key)
request_id = unique_key_to_request_id(long_url_request.unique_key)

processed_request = await rq.add_request(request)
processed_request = await rq.add_request(long_url_request)
assert processed_request.id == request_id

request_obtained = await rq.fetch_next_request()
assert request_obtained is not None

await rq.mark_request_as_handled(request_obtained)

is_finished = await rq.is_finished()
if rq_access_mode == 'shared':
is_finished = await call_with_exp_backoff(rq.is_finished)
else:
is_finished = await rq.is_finished()
assert is_finished


Expand Down Expand Up @@ -1061,18 +1097,24 @@ async def test_force_cloud(

async def test_request_queue_is_finished(
request_queue_apify: RequestQueue,
request: pytest.FixtureRequest,
) -> None:
rq_access_mode = request.node.callspec.params.get('request_queue_apify')

await request_queue_apify.add_request(Request.from_url('http://example.com'))
assert not await request_queue_apify.is_finished()

request = await request_queue_apify.fetch_next_request()
assert request is not None
fetched = await request_queue_apify.fetch_next_request()
assert fetched is not None
assert not await request_queue_apify.is_finished(), (
'RequestQueue should not be finished unless the request is marked as handled.'
)

await request_queue_apify.mark_request_as_handled(request)
assert await request_queue_apify.is_finished()
await request_queue_apify.mark_request_as_handled(fetched)
if rq_access_mode == 'shared':
assert await call_with_exp_backoff(request_queue_apify.is_finished)
else:
assert await request_queue_apify.is_finished()


async def test_request_queue_deduplication_unprocessed_requests(
Expand Down