diff --git a/scripts/consumer_performance_benchmark.py b/scripts/consumer_performance_benchmark.py
new file mode 100644
index 0000000000..e8379ec724
--- /dev/null
+++ b/scripts/consumer_performance_benchmark.py
@@ -0,0 +1,21 @@
+from concurrent.futures import ProcessPoolExecutor
+from logging import getLogger
+
+from snuba.cli.rust_consumer import rust_consumer_impl
+
+NUM_CONSUMERS = 4
+
+logger = getLogger(__name__)
+
+if __name__ == "__main__":
+    with ProcessPoolExecutor(max_workers=NUM_CONSUMERS) as executor:
+        for i in range(NUM_CONSUMERS):
+            logger.info(f"starting consumer {i}")
+            executor.submit(
+                rust_consumer_impl,
+                ("eap_items",),
+                "eap_items_group",
+                no_strict_offset_reset=True,
+                auto_offset_reset="latest",
+                enforce_schema=True,
+            )
diff --git a/snuba/cli/rust_consumer.py b/snuba/cli/rust_consumer.py
index e58f684715..4ecde1bb70 100644
--- a/snuba/cli/rust_consumer.py
+++ b/snuba/cli/rust_consumer.py
@@ -1,7 +1,7 @@
 import json
 import sys
 from dataclasses import asdict
-from typing import Optional, Sequence
+from typing import Sequence
 
 import click
 
@@ -26,7 +26,6 @@
 )
 @click.option(
     "--auto-offset-reset",
-    default="earliest",
     type=click.Choice(["error", "earliest", "latest"]),
     help="Kafka consumer auto offset reset.",
 )
@@ -37,13 +36,11 @@
 )
 @click.option(
     "--queued-max-messages-kbytes",
-    default=settings.DEFAULT_QUEUED_MAX_MESSAGE_KBYTES,
     type=int,
     help="Maximum number of kilobytes per topic+partition in the local consumer queue.",
 )
 @click.option(
     "--queued-min-messages",
-    default=settings.DEFAULT_QUEUED_MIN_MESSAGES,
     type=int,
     help="Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.",
 )
@@ -76,13 +73,11 @@
 )
 @click.option(
     "--max-batch-size",
-    default=settings.DEFAULT_MAX_BATCH_SIZE,
     type=int,
     help="Max number of messages to batch in memory before writing to Kafka.",
 )
 @click.option(
     "--max-batch-time-ms",
-    default=settings.DEFAULT_MAX_BATCH_TIME_MS,
     type=int,
     help="Max length of time to buffer messages in memory before writing to Kafka.",
 )
@@ -91,7 +86,6 @@
     "log_level",
     type=click.Choice(["error", "warn", "info", "debug", "trace"], False),
     help="Logging level to use.",
-    default="info",
 )
 @click.option(
     "--concurrency",
@@ -107,40 +101,33 @@
     "use_rust_processor",
     is_flag=True,
     help="Use the Rust (if available) or Python message processor",
-    default=True,
 )
 @click.option(
     "--group-instance-id",
     type=str,
-    default=None,
     help="Kafka group instance id. passing a value here will run kafka with static membership.",
 )
 @click.option(
     "--python-max-queue-depth",
     type=int,
-    default=None,
     help="How many messages should be queued up in the Python message processor before backpressure kicks in. Defaults to the number of processes.",
 )
 @click.option(
     "--max-poll-interval-ms",
     type=int,
-    default=30000,
 )
 @click.option(
     "--async-inserts",
     is_flag=True,
-    default=False,
     help="Enable async inserts for ClickHouse",
 )
 @click.option(
     "--max-dlq-buffer-length",
     type=int,
-    default=25000,
     help="Set a per-partition limit to the length of the DLQ buffer",
 )
 @click.option(
     "--health-check-file",
-    default=None,
     type=str,
     help="Arroyo will touch this file at intervals to indicate health. If not provided, no health check is performed.",
 )
@@ -148,7 +135,6 @@
     "--enforce-schema",
     type=bool,
     is_flag=True,
-    default=False,
     help="Enforce schema on the raw events topic.",
 )
 @click.option(
@@ -159,36 +145,30 @@
 @click.option(
     "--batch-write-timeout-ms",
     type=int,
-    default=None,
     help="Optional timeout for batch writer client connecting and sending request to Clickhouse",
 )
 @click.option(
     "--quantized-rebalance-consumer-group-delay-secs",
     type=int,
-    default=None,
     help="Quantized rebalancing means that during deploys, rebalancing is triggered across all pods within a consumer group at the same time. The value is used by the pods to align their group join/leave activity to some multiple of the delay",
 )
 @click.option(
     "--join-timeout-ms",
     type=int,
-    default=1000,
     help="number of milliseconds to wait for the current batch to be flushed by the consumer in case of rebalance",
 )
 @click.option(
     "--health-check",
-    default="arroyo",
     type=click.Choice(["snuba", "arroyo"]),
     help="Specify which health check to use for the consumer. If not specified, the default Arroyo health check is used.",
 )
 @click.option(
     "--use-row-binary",
     is_flag=True,
-    default=False,
     help="Use RowBinary format for ClickHouse inserts instead of JSONEachRow. Currently only supported for EAPItemsProcessor.",
 )
 @click.option(
     "--consumer-version",
-    default="v2",
     type=click.Choice(["v1", "v2"]),
     help="DEPRECATED: value is ignored.",
 )
@@ -200,32 +180,103 @@ def rust_consumer(
     no_strict_offset_reset: bool,
     queued_max_messages_kbytes: int,
     queued_min_messages: int,
-    raw_events_topic: Optional[str],
-    commit_log_topic: Optional[str],
-    replacements_topic: Optional[str],
+    raw_events_topic: str | None,
+    commit_log_topic: str | None,
+    replacements_topic: str | None,
     bootstrap_servers: Sequence[str],
     commit_log_bootstrap_servers: Sequence[str],
     replacement_bootstrap_servers: Sequence[str],
     max_batch_size: int,
     max_batch_time_ms: int,
     log_level: str,
-    concurrency: Optional[int],
-    clickhouse_concurrency: Optional[int],
+    concurrency: int | None,
+    clickhouse_concurrency: int | None,
     use_rust_processor: bool,
-    group_instance_id: Optional[str],
+    group_instance_id: str | None,
     max_poll_interval_ms: int,
     async_inserts: bool,
     health_check: str,
-    python_max_queue_depth: Optional[int],
-    health_check_file: Optional[str],
+    python_max_queue_depth: int | None,
+    health_check_file: str | None,
     enforce_schema: bool,
-    stop_at_timestamp: Optional[int],
-    batch_write_timeout_ms: Optional[int],
-    max_dlq_buffer_length: Optional[int],
-    quantized_rebalance_consumer_group_delay_secs: Optional[int],
-    join_timeout_ms: Optional[int],
+    stop_at_timestamp: int | None,
+    batch_write_timeout_ms: int | None,
+    max_dlq_buffer_length: int | None,
+    quantized_rebalance_consumer_group_delay_secs: int | None,
+    join_timeout_ms: int | None,
     use_row_binary: bool,
-    consumer_version: Optional[str],
+    consumer_version: str | None,
+) -> None:
+    """
+    Experimental alternative to `snuba consumer`
+    """
+    rust_consumer_impl(
+        storage_names=storage_names,
+        consumer_group=consumer_group,
+        auto_offset_reset=auto_offset_reset,
+        no_strict_offset_reset=no_strict_offset_reset,
+        queued_max_messages_kbytes=queued_max_messages_kbytes,
+        queued_min_messages=queued_min_messages,
+        raw_events_topic=raw_events_topic,
+        commit_log_topic=commit_log_topic,
+        replacements_topic=replacements_topic,
+        bootstrap_servers=bootstrap_servers,
+        commit_log_bootstrap_servers=commit_log_bootstrap_servers,
+        replacement_bootstrap_servers=replacement_bootstrap_servers,
+        max_batch_size=max_batch_size,
+        max_batch_time_ms=max_batch_time_ms,
+        log_level=log_level,
+        concurrency=concurrency,
+        clickhouse_concurrency=clickhouse_concurrency,
+        use_rust_processor=use_rust_processor,
+        group_instance_id=group_instance_id,
+        max_poll_interval_ms=max_poll_interval_ms,
+        async_inserts=async_inserts,
+        health_check=health_check,
+        python_max_queue_depth=python_max_queue_depth,
+        health_check_file=health_check_file,
+        enforce_schema=enforce_schema,
+        stop_at_timestamp=stop_at_timestamp,
+        batch_write_timeout_ms=batch_write_timeout_ms,
+        max_dlq_buffer_length=max_dlq_buffer_length,
+        quantized_rebalance_consumer_group_delay_secs=quantized_rebalance_consumer_group_delay_secs,
+        join_timeout_ms=join_timeout_ms,
+        use_row_binary=use_row_binary,
+    )
+
+
+def rust_consumer_impl(
+    storage_names: Sequence[str],
+    consumer_group: str,
+    auto_offset_reset: str = "earliest",
+    no_strict_offset_reset: bool = False,
+    queued_max_messages_kbytes: int = settings.DEFAULT_QUEUED_MAX_MESSAGE_KBYTES,
+    queued_min_messages: int = settings.DEFAULT_QUEUED_MIN_MESSAGES,
+    raw_events_topic: str | None = None,
+    commit_log_topic: str | None = None,
+    replacements_topic: str | None = None,
+    bootstrap_servers: Sequence[str] = (),
+    commit_log_bootstrap_servers: Sequence[str] = (),
+    replacement_bootstrap_servers: Sequence[str] = (),
+    max_batch_size: int = settings.DEFAULT_MAX_BATCH_SIZE,
+    max_batch_time_ms: int = settings.DEFAULT_MAX_BATCH_TIME_MS,
+    log_level: str = "info",
+    concurrency: int | None = None,
+    clickhouse_concurrency: int | None = None,
+    use_rust_processor: bool | None = True,
+    group_instance_id: str | None = None,
+    max_poll_interval_ms: int | None = 30000,
+    async_inserts: bool | None = False,
+    health_check: str = "arroyo",
+    python_max_queue_depth: int | None = None,
+    health_check_file: str | None = None,
+    enforce_schema: bool = False,
+    stop_at_timestamp: int | None = None,
+    batch_write_timeout_ms: int | None = None,
+    max_dlq_buffer_length: int | None = 25000,
+    quantized_rebalance_consumer_group_delay_secs: int | None = None,
+    join_timeout_ms: int | None = 1000,
+    use_row_binary: bool = False,
 ) -> None:
     """
     Experimental alternative to `snuba consumer`