diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py index 65338ab3..c7fac93d 100644 --- a/arroyo/backends/kafka/configuration.py +++ b/arroyo/backends/kafka/configuration.py @@ -66,7 +66,7 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No int_latency = broker_stats.get("int_latency", {}) if int_latency: p99_latency_ms = int_latency.get("p99", 0) / 1000.0 - metrics.timing( + metrics.gauge( "arroyo.producer.librdkafka.p99_int_latency", p99_latency_ms, tags=broker_tags, @@ -75,7 +75,7 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No outbuf_latency = broker_stats.get("outbuf_latency", {}) if outbuf_latency: p99_latency_ms = outbuf_latency.get("p99", 0) / 1000.0 - metrics.timing( + metrics.gauge( "arroyo.producer.librdkafka.p99_outbuf_latency", p99_latency_ms, tags=broker_tags, @@ -84,7 +84,7 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No rtt = broker_stats.get("rtt", {}) if rtt: p99_rtt_ms = rtt.get("p99", 0) / 1000.0 - metrics.timing( + metrics.gauge( "arroyo.producer.librdkafka.p99_rtt", p99_rtt_ms, tags=broker_tags, diff --git a/rust-arroyo/src/backends/kafka/producer.rs b/rust-arroyo/src/backends/kafka/producer.rs index 8bb45cfc..2ddaf680 100644 --- a/rust-arroyo/src/backends/kafka/producer.rs +++ b/rust-arroyo/src/backends/kafka/producer.rs @@ -7,7 +7,6 @@ use crate::backends::{ }; use crate::counter; use crate::gauge; -use crate::timer; use crate::types::TopicOrPartition; use rdkafka::client::ClientContext; use rdkafka::config::ClientConfig; @@ -16,7 +15,6 @@ use rdkafka::producer::{ DeliveryResult, FutureProducer, ProducerContext as RdkafkaProducerContext, ThreadedProducer, }; use rdkafka::Statistics; -use std::time::Duration; pub struct ProducerContext { producer_name: String, @@ -42,9 +40,9 @@ impl ClientContext for ProducerContext { // Record broker latency metrics if let Some(int_latency) = &broker_stats.int_latency { let p99_latency_ms = int_latency.p99 as f64 / 1000.0; - timer!( + gauge!( "arroyo.producer.librdkafka.p99_int_latency", - Duration::from_millis(p99_latency_ms as u64), + p99_latency_ms as u64, "broker_id" => broker_id_str.clone(), "producer_name" => producer_name ); @@ -52,9 +50,9 @@ impl ClientContext for ProducerContext { if let Some(outbuf_latency) = &broker_stats.outbuf_latency { let p99_latency_ms = outbuf_latency.p99 as f64 / 1000.0; - timer!( + gauge!( "arroyo.producer.librdkafka.p99_outbuf_latency", - Duration::from_millis(p99_latency_ms as u64), + p99_latency_ms as u64, "broker_id" => broker_id_str.clone(), "producer_name" => producer_name ); @@ -62,9 +60,9 @@ impl ClientContext for ProducerContext { if let Some(rtt) = &broker_stats.rtt { let p99_rtt_ms = rtt.p99 as f64 / 1000.0; - timer!( + gauge!( "arroyo.producer.librdkafka.p99_rtt", - Duration::from_millis(p99_rtt_ms as u64), + p99_rtt_ms as u64, "broker_id" => broker_id_str.clone(), "producer_name" => producer_name ); diff --git a/tests/backends/test_kafka_producer.py b/tests/backends/test_kafka_producer.py index b0a6920d..a93affef 100644 --- a/tests/backends/test_kafka_producer.py +++ b/tests/backends/test_kafka_producer.py @@ -24,13 +24,13 @@ def test_producer_stats_callback_with_both_latencies( producer_stats_callback(stats_json, None) - assert mock_metrics.timing.call_count == 2 - mock_metrics.timing.assert_any_call( + assert mock_metrics.gauge.call_count == 2 + mock_metrics.gauge.assert_any_call( "arroyo.producer.librdkafka.p99_int_latency", 2.0, tags={"broker_id": "1", "producer_name": "unknown"}, ) - mock_metrics.timing.assert_any_call( + mock_metrics.gauge.assert_any_call( "arroyo.producer.librdkafka.p99_outbuf_latency", 4.0, tags={"broker_id": "1", "producer_name": "unknown"}, @@ -46,7 +46,7 @@ def test_producer_stats_callback_no_brokers(mock_get_metrics: mock.Mock) -> None producer_stats_callback(stats_json, None) - mock_metrics.timing.assert_not_called() + mock_metrics.gauge.assert_not_called() @mock.patch("arroyo.backends.kafka.configuration.get_metrics") @@ -60,7 +60,7 @@ def test_producer_stats_callback_empty_broker_stats( producer_stats_callback(stats_json, None) - mock_metrics.timing.assert_not_called() + mock_metrics.gauge.assert_not_called() @mock.patch("arroyo.backends.kafka.configuration.get_metrics") @@ -82,18 +82,18 @@ def test_producer_stats_callback_with_all_metrics(mock_get_metrics: mock.Mock) - producer_stats_callback(stats_json, None) - assert mock_metrics.timing.call_count == 3 - mock_metrics.timing.assert_any_call( + assert mock_metrics.gauge.call_count == 3 + mock_metrics.gauge.assert_any_call( "arroyo.producer.librdkafka.p99_int_latency", 2.0, tags={"broker_id": "1", "producer_name": "unknown"}, ) - mock_metrics.timing.assert_any_call( + mock_metrics.gauge.assert_any_call( "arroyo.producer.librdkafka.p99_outbuf_latency", 4.0, tags={"broker_id": "1", "producer_name": "unknown"}, ) - mock_metrics.timing.assert_any_call( + mock_metrics.gauge.assert_any_call( "arroyo.producer.librdkafka.p99_rtt", 1.5, tags={"broker_id": "1", "producer_name": "unknown"},