Skip to content

ref(metrics): Remove Unused librdkafka Metrics#531

Merged
george-sentry merged 6 commits intomainfrom
george/remove-unused-librdkafka-metrics
Apr 1, 2026
Merged

ref(metrics): Remove Unused librdkafka Metrics#531
george-sentry merged 6 commits intomainfrom
george/remove-unused-librdkafka-metrics

Conversation

@george-sentry
Copy link
Copy Markdown
Member

@george-sentry george-sentry commented Mar 28, 2026

Completes STREAM-840 by removing...

  • producer.librdkafka.message_size
  • producer.librdkafka.message_size_max
  • producer.librdkafka.txmsgs

Completes STREAM-841 by removing...

  • producer.librdkafka.broker_disconnects
  • producer.librdkafka.broker_outbuf_requests
  • producer.librdkafka.broker_outbuf_messages

Completes STREAM-842 by removing...

  • arroyo.consumer.librdkafka.fetch_queue_size

I did not remove...

  • arroyo.consumer.librdkafka.total_queue_size
  • arroyo.consumer.librdkafka.fetch_queue_count

@george-sentry george-sentry requested review from a team as code owners March 28, 2026 20:54
Copy link
Copy Markdown
Member

@volokluev volokluev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not remove the librdkafka fetch_queue_count

Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Two consumer metrics not removed as PR intends
    • Removed the remaining total_queue_size and fetch_queue_count consumer metrics from Rust emission and Python configuration/metric definitions to fully align with STREAM-842.

Create PR

Or push these changes by commenting:

@cursor push 4dd01d3ee2
Preview (4dd01d3ee2)
diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py
--- a/arroyo/backends/kafka/configuration.py
+++ b/arroyo/backends/kafka/configuration.py
@@ -43,13 +43,6 @@
     return broker_config
 
 
-def stats_callback(stats_json: str) -> None:
-    stats = json.loads(stats_json)
-    get_metrics().gauge(
-        "arroyo.consumer.librdkafka.total_queue_size", stats.get("replyq", 0)
-    )
-
-
 def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> None:
     stats = json.loads(stats_json)
     metrics = get_metrics()
@@ -224,8 +217,6 @@
         "queued.max.messages.kbytes": queued_max_messages_kbytes,
         "queued.min.messages": queued_min_messages,
         "enable.partition.eof": False,
-        "statistics.interval.ms": STATS_COLLECTION_FREQ_MS,
-        "stats_cb": stats_callback,
     }
 
     broker_config.update(config_update)

diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py
--- a/arroyo/utils/metric_defs.py
+++ b/arroyo/utils/metric_defs.py
@@ -95,8 +95,6 @@
     "arroyo.consumer.resume",
     # Counter: Incremented when the consumer main thread is stuck and not processing messages.
     "arroyo.consumer.stuck",
-    # Gauge: Queue size of background queue that librdkafka uses to prefetch messages.
-    "arroyo.consumer.librdkafka.total_queue_size",
     # Counter: Counter metric to measure how often the healthcheck file has been touched.
     "arroyo.processing.strategies.healthcheck.touch",
     # Counter: Number of messages dropped in the FilterStep strategy

diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs
--- a/rust-arroyo/src/backends/kafka/mod.rs
+++ b/rust-arroyo/src/backends/kafka/mod.rs
@@ -4,7 +4,6 @@
 use super::Consumer as ArroyoConsumer;
 use super::ConsumerError;
 use crate::backends::kafka::types::KafkaPayload;
-use crate::gauge;
 use crate::types::{BrokerMessage, Partition, Topic};
 use chrono::{DateTime, Utc};
 use parking_lot::Mutex;
@@ -168,21 +167,7 @@
         })
     }
 
-    fn stats(&self, stats: Statistics) {
-        gauge!(
-            "arroyo.consumer.librdkafka.total_queue_size",
-            stats.replyq as u64,
-        );
-        for (topic_name, topic) in stats.topics.iter() {
-            for (partition_num, partition) in topic.partitions.iter() {
-                gauge!(
-                    "arroyo.consumer.librdkafka.fetch_queue_count",
-                    partition.fetchq_cnt as u64,
-                    "topic" => topic_name,
-                    "partition" => partition_num.to_string()
-                );
-            }
-        }
+    fn stats(&self, _stats: Statistics) {
     }
 }

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Fix All in Cursor

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Removed metrics still defined in metric definitions file
    • Removed the six stale librdkafka producer metric entries from arroyo/utils/metric_defs.py so the type definitions and generated metric docs only include emitted metrics.

Create PR

Or push these changes by commenting:

@cursor push cf7aaeff14
Preview (cf7aaeff14)
diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py
--- a/arroyo/utils/metric_defs.py
+++ b/arroyo/utils/metric_defs.py
@@ -137,33 +137,15 @@
     # Gauge: Maximum producer message count from librdkafka statistics
     # Tagged by producer_name
     "arroyo.producer.librdkafka.message_count_max",
-    # Gauge: Producer message size from librdkafka statistics
-    # Tagged by producer_name
-    "arroyo.producer.librdkafka.message_size",
-    # Gauge: Maximum producer message size from librdkafka statistics
-    # Tagged by producer_name
-    "arroyo.producer.librdkafka.message_size_max",
-    # Gauge: Total number of messages transmitted from librdkafka statistics
-    # Tagged by producer_name
-    "arroyo.producer.librdkafka.txmsgs",
     # Gauge: Total number of transmission requests from librdkafka statistics
     # Tagged by broker_id, producer_name
     "arroyo.producer.librdkafka.broker_tx",
     # Gauge: Total number of bytes transmitted from librdkafka statistics
     # Tagged by broker_id, producer_name
     "arroyo.producer.librdkafka.broker_txbytes",
-    # Gauge: Number of requests awaiting transmission to broker from librdkafka statistics
-    # Tagged by broker_id, producer_name
-    "arroyo.producer.librdkafka.broker_outbuf_requests",
-    # Gauge: Number of messages awaiting transmission to broker from librdkafka statistics
-    # Tagged by broker_id, producer_name
-    "arroyo.producer.librdkafka.broker_outbuf_messages",
     # Gauge: Number of connection attempts to broker from librdkafka statistics
     # Tagged by broker_id, producer_name
     "arroyo.producer.librdkafka.broker_connects",
-    # Gauge: Number of disconnections from broker from librdkafka statistics
-    # Tagged by broker_id, producer_name
-    "arroyo.producer.librdkafka.broker_disconnects",
     # Gauge: Total number of transmission errors from librdkafka statistics
     # Tagged by broker_id, producer_name
     "arroyo.producer.librdkafka.broker_txerrs",

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

@george-sentry george-sentry requested a review from volokluev March 30, 2026 21:18
@george-sentry george-sentry merged commit e31dd03 into main Apr 1, 2026
17 checks passed
@george-sentry george-sentry deleted the george/remove-unused-librdkafka-metrics branch April 1, 2026 19:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants