ref(metrics): Remove Unused librdkafka Metrics#531
Conversation
volokluev
left a comment
There was a problem hiding this comment.
do not remove the librdkafka fetch_queue_count
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Two consumer metrics not removed as PR intends
- Removed the remaining
total_queue_sizeandfetch_queue_countconsumer metrics from Rust emission and Python configuration/metric definitions to fully align with STREAM-842.
- Removed the remaining
Or push these changes by commenting:
@cursor push 4dd01d3ee2
Preview (4dd01d3ee2)
diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py
--- a/arroyo/backends/kafka/configuration.py
+++ b/arroyo/backends/kafka/configuration.py
@@ -43,13 +43,6 @@
return broker_config
-def stats_callback(stats_json: str) -> None:
- stats = json.loads(stats_json)
- get_metrics().gauge(
- "arroyo.consumer.librdkafka.total_queue_size", stats.get("replyq", 0)
- )
-
-
def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> None:
stats = json.loads(stats_json)
metrics = get_metrics()
@@ -224,8 +217,6 @@
"queued.max.messages.kbytes": queued_max_messages_kbytes,
"queued.min.messages": queued_min_messages,
"enable.partition.eof": False,
- "statistics.interval.ms": STATS_COLLECTION_FREQ_MS,
- "stats_cb": stats_callback,
}
broker_config.update(config_update)
diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py
--- a/arroyo/utils/metric_defs.py
+++ b/arroyo/utils/metric_defs.py
@@ -95,8 +95,6 @@
"arroyo.consumer.resume",
# Counter: Incremented when the consumer main thread is stuck and not processing messages.
"arroyo.consumer.stuck",
- # Gauge: Queue size of background queue that librdkafka uses to prefetch messages.
- "arroyo.consumer.librdkafka.total_queue_size",
# Counter: Counter metric to measure how often the healthcheck file has been touched.
"arroyo.processing.strategies.healthcheck.touch",
# Counter: Number of messages dropped in the FilterStep strategy
diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs
--- a/rust-arroyo/src/backends/kafka/mod.rs
+++ b/rust-arroyo/src/backends/kafka/mod.rs
@@ -4,7 +4,6 @@
use super::Consumer as ArroyoConsumer;
use super::ConsumerError;
use crate::backends::kafka::types::KafkaPayload;
-use crate::gauge;
use crate::types::{BrokerMessage, Partition, Topic};
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
@@ -168,21 +167,7 @@
})
}
- fn stats(&self, stats: Statistics) {
- gauge!(
- "arroyo.consumer.librdkafka.total_queue_size",
- stats.replyq as u64,
- );
- for (topic_name, topic) in stats.topics.iter() {
- for (partition_num, partition) in topic.partitions.iter() {
- gauge!(
- "arroyo.consumer.librdkafka.fetch_queue_count",
- partition.fetchq_cnt as u64,
- "topic" => topic_name,
- "partition" => partition_num.to_string()
- );
- }
- }
+ fn stats(&self, _stats: Statistics) {
}
}This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Removed metrics still defined in metric definitions file
- Removed the six stale librdkafka producer metric entries from
arroyo/utils/metric_defs.pyso the type definitions and generated metric docs only include emitted metrics.
- Removed the six stale librdkafka producer metric entries from
Or push these changes by commenting:
@cursor push cf7aaeff14
Preview (cf7aaeff14)
diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py
--- a/arroyo/utils/metric_defs.py
+++ b/arroyo/utils/metric_defs.py
@@ -137,33 +137,15 @@
# Gauge: Maximum producer message count from librdkafka statistics
# Tagged by producer_name
"arroyo.producer.librdkafka.message_count_max",
- # Gauge: Producer message size from librdkafka statistics
- # Tagged by producer_name
- "arroyo.producer.librdkafka.message_size",
- # Gauge: Maximum producer message size from librdkafka statistics
- # Tagged by producer_name
- "arroyo.producer.librdkafka.message_size_max",
- # Gauge: Total number of messages transmitted from librdkafka statistics
- # Tagged by producer_name
- "arroyo.producer.librdkafka.txmsgs",
# Gauge: Total number of transmission requests from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_tx",
# Gauge: Total number of bytes transmitted from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_txbytes",
- # Gauge: Number of requests awaiting transmission to broker from librdkafka statistics
- # Tagged by broker_id, producer_name
- "arroyo.producer.librdkafka.broker_outbuf_requests",
- # Gauge: Number of messages awaiting transmission to broker from librdkafka statistics
- # Tagged by broker_id, producer_name
- "arroyo.producer.librdkafka.broker_outbuf_messages",
# Gauge: Number of connection attempts to broker from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_connects",
- # Gauge: Number of disconnections from broker from librdkafka statistics
- # Tagged by broker_id, producer_name
- "arroyo.producer.librdkafka.broker_disconnects",
# Gauge: Total number of transmission errors from librdkafka statistics
# Tagged by broker_id, producer_name
"arroyo.producer.librdkafka.broker_txerrs",This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.


Completes STREAM-840 by removing...
producer.librdkafka.message_sizeproducer.librdkafka.message_size_maxproducer.librdkafka.txmsgsCompletes STREAM-841 by removing...
producer.librdkafka.broker_disconnectsproducer.librdkafka.broker_outbuf_requestsproducer.librdkafka.broker_outbuf_messagesCompletes STREAM-842 by removing...
arroyo.consumer.librdkafka.fetch_queue_sizeI did not remove...
arroyo.consumer.librdkafka.total_queue_sizearroyo.consumer.librdkafka.fetch_queue_count