Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions arroyo/backends/kafka/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No
int_latency = broker_stats.get("int_latency", {})
if int_latency:
p99_latency_ms = int_latency.get("p99", 0) / 1000.0
metrics.timing(
metrics.gauge(
"arroyo.producer.librdkafka.p99_int_latency",
p99_latency_ms,
tags=broker_tags,
Expand All @@ -75,7 +75,7 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No
outbuf_latency = broker_stats.get("outbuf_latency", {})
if outbuf_latency:
p99_latency_ms = outbuf_latency.get("p99", 0) / 1000.0
metrics.timing(
metrics.gauge(
"arroyo.producer.librdkafka.p99_outbuf_latency",
p99_latency_ms,
tags=broker_tags,
Expand All @@ -84,7 +84,7 @@ def producer_stats_callback(stats_json: str, producer_name: Optional[str]) -> No
rtt = broker_stats.get("rtt", {})
if rtt:
p99_rtt_ms = rtt.get("p99", 0) / 1000.0
metrics.timing(
metrics.gauge(
"arroyo.producer.librdkafka.p99_rtt",
p99_rtt_ms,
tags=broker_tags,
Expand Down
14 changes: 6 additions & 8 deletions rust-arroyo/src/backends/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::backends::{
};
use crate::counter;
use crate::gauge;
use crate::timer;
use crate::types::TopicOrPartition;
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
Expand All @@ -16,7 +15,6 @@ use rdkafka::producer::{
DeliveryResult, FutureProducer, ProducerContext as RdkafkaProducerContext, ThreadedProducer,
};
use rdkafka::Statistics;
use std::time::Duration;

pub struct ProducerContext {
producer_name: String,
Expand All @@ -42,29 +40,29 @@ impl ClientContext for ProducerContext {
// Record broker latency metrics
if let Some(int_latency) = &broker_stats.int_latency {
let p99_latency_ms = int_latency.p99 as f64 / 1000.0;
timer!(
gauge!(
"arroyo.producer.librdkafka.p99_int_latency",
Duration::from_millis(p99_latency_ms as u64),
p99_latency_ms as u64,
"broker_id" => broker_id_str.clone(),
"producer_name" => producer_name
);
}

if let Some(outbuf_latency) = &broker_stats.outbuf_latency {
let p99_latency_ms = outbuf_latency.p99 as f64 / 1000.0;
timer!(
gauge!(
"arroyo.producer.librdkafka.p99_outbuf_latency",
Duration::from_millis(p99_latency_ms as u64),
p99_latency_ms as u64,
"broker_id" => broker_id_str.clone(),
"producer_name" => producer_name
);
}

if let Some(rtt) = &broker_stats.rtt {
let p99_rtt_ms = rtt.p99 as f64 / 1000.0;
timer!(
gauge!(
"arroyo.producer.librdkafka.p99_rtt",
Duration::from_millis(p99_rtt_ms as u64),
p99_rtt_ms as u64,
"broker_id" => broker_id_str.clone(),
"producer_name" => producer_name
);
Expand Down
18 changes: 9 additions & 9 deletions tests/backends/test_kafka_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ def test_producer_stats_callback_with_both_latencies(

producer_stats_callback(stats_json, None)

assert mock_metrics.timing.call_count == 2
mock_metrics.timing.assert_any_call(
assert mock_metrics.gauge.call_count == 2
mock_metrics.gauge.assert_any_call(
"arroyo.producer.librdkafka.p99_int_latency",
2.0,
tags={"broker_id": "1", "producer_name": "unknown"},
)
mock_metrics.timing.assert_any_call(
mock_metrics.gauge.assert_any_call(
"arroyo.producer.librdkafka.p99_outbuf_latency",
4.0,
tags={"broker_id": "1", "producer_name": "unknown"},
Expand All @@ -46,7 +46,7 @@ def test_producer_stats_callback_no_brokers(mock_get_metrics: mock.Mock) -> None

producer_stats_callback(stats_json, None)

mock_metrics.timing.assert_not_called()
mock_metrics.gauge.assert_not_called()


@mock.patch("arroyo.backends.kafka.configuration.get_metrics")
Expand All @@ -60,7 +60,7 @@ def test_producer_stats_callback_empty_broker_stats(

producer_stats_callback(stats_json, None)

mock_metrics.timing.assert_not_called()
mock_metrics.gauge.assert_not_called()


@mock.patch("arroyo.backends.kafka.configuration.get_metrics")
Expand All @@ -82,18 +82,18 @@ def test_producer_stats_callback_with_all_metrics(mock_get_metrics: mock.Mock) -

producer_stats_callback(stats_json, None)

assert mock_metrics.timing.call_count == 3
mock_metrics.timing.assert_any_call(
assert mock_metrics.gauge.call_count == 3
mock_metrics.gauge.assert_any_call(
"arroyo.producer.librdkafka.p99_int_latency",
2.0,
tags={"broker_id": "1", "producer_name": "unknown"},
)
mock_metrics.timing.assert_any_call(
mock_metrics.gauge.assert_any_call(
"arroyo.producer.librdkafka.p99_outbuf_latency",
4.0,
tags={"broker_id": "1", "producer_name": "unknown"},
)
mock_metrics.timing.assert_any_call(
mock_metrics.gauge.assert_any_call(
"arroyo.producer.librdkafka.p99_rtt",
1.5,
tags={"broker_id": "1", "producer_name": "unknown"},
Expand Down
Loading