Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url", "experimental_async_runtime"]
logs = ["opentelemetry/logs"]
metrics = ["opentelemetry/metrics"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "tokio/sync"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "tokio/macros", "tokio/rt-multi-thread"]
experimental_async_runtime = []
rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
internal-logs = ["opentelemetry/internal-logs"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics", "experimental_async_runtime"]
spec_unstable_metrics_views = ["metrics"]
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@
//! metrics aggregation can be added via the following flags:
//!
//! * `experimental_async_runtime`: Enables the experimental `Runtime` trait and related functionality.
//! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s multi-thread runtime.
//! * `rt-tokio-current-thread`: Spawn telemetry tasks on a separate runtime so that the main runtime won't be blocked.
//! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s runtime. Automatically detects the runtime
//! flavor (multi-threaded or current-thread) and uses the appropriate spawning strategy to avoid
//! deadlocks.
//!
//! [tokio]: https://crates.io/crates/tokio
#![warn(
Expand Down
99 changes: 44 additions & 55 deletions opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ use std::{
};
use std::{
sync::atomic::{AtomicUsize, Ordering},
sync::Mutex,
time::Duration,
};

use super::{BatchConfig, LogProcessor};
#[cfg(feature = "experimental_async_runtime")]
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
use crate::runtime::{to_interval_stream, JoinError, JoinHandle, RuntimeChannel, TrySend};
use futures_channel::oneshot;
use futures_util::{
future::{self, Either},
Expand All @@ -33,7 +34,8 @@ enum BatchMessage {
/// preconfigured interval or a call to the `force_push` function.
Flush(Option<oneshot::Sender<OTelSdkResult>>),
/// Shut down the worker thread, push all logs in buffer to the backend.
Shutdown(oneshot::Sender<OTelSdkResult>),
/// The result is returned through the worker's join handle.
Shutdown,
/// Set the resource for the exporter.
SetResource(Arc<Resource>),
}
Expand All @@ -43,6 +45,9 @@ enum BatchMessage {
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,

/// Handle to the background worker task. Used to join on shutdown.
worker_handle: Arc<Mutex<Option<R::SpawnHandle<OTelSdkResult>>>>,

// Track dropped logs - we'll log this at shutdown
dropped_logs_count: AtomicUsize,

Expand Down Expand Up @@ -98,14 +103,30 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}
let (res_sender, res_receiver) = oneshot::channel();

self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
.try_send(BatchMessage::Shutdown)
.map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}")))?;

futures_executor::block_on(res_receiver)
.map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}")))
.and_then(std::convert::identity)
// Take the worker handle and join it to get the shutdown result
let handle = self
.worker_handle
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Lock poisoned: {e}")))?
.take();

match handle {
Some(h) => h.join().map_err(|e| match e {
JoinError::Panic(_) => {
OTelSdkError::InternalFailure("Worker task panicked".to_string())
}
#[cfg(feature = "rt-tokio")]
JoinError::Cancelled => {
OTelSdkError::InternalFailure("Worker task was cancelled".to_string())
}
})?,
None => Ok(()), // Already shut down
}
}

fn set_resource(&mut self, resource: &Resource) {
Expand All @@ -124,9 +145,10 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
let inner_runtime = runtime.clone();
let max_queue_size = config.max_queue_size;

// Spawn worker process via user-defined spawn function.
runtime.spawn(async move {
let worker_handle = runtime.spawn(async move {
// Timer will take a reference to the current runtime, so it's important we do this within the
// runtime.spawn()
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
Expand Down Expand Up @@ -179,7 +201,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
}
// Stream has terminated or processor is shut down, return to finish execution.
BatchMessage::Shutdown(ch) => {
BatchMessage::Shutdown => {
let result = export_with_timeout(
config.max_export_timeout,
&mut exporter,
Expand All @@ -189,27 +211,23 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
.await;

let _ = exporter.shutdown(); //TODO - handle error

if let Err(send_error) = ch.send(result) {
otel_debug!(
name: "BatchLogProcessor.Shutdown.SendResultError",
error = format!("{:?}", send_error),
);
}
break;
return result;
}
// propagate the resource
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
}
}
}
Ok(()) // Channel closed without explicit shutdown
});

// Return batch processor with link to worker
BatchLogProcessor {
message_sender,
worker_handle: Arc::new(Mutex::new(Some(worker_handle))),
dropped_logs_count: AtomicUsize::new(0),
max_queue_size: config.max_queue_size,
max_queue_size,
}
}

Expand Down Expand Up @@ -551,40 +569,17 @@ mod tests {
#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);

processor.shutdown().unwrap();
}

#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread(
) {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
let processor =
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

//
// deadlock happens in shutdown with tokio current_thread runtime
//
processor.shutdown().unwrap();
}

#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
let processor =
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
processor.shutdown().unwrap();
}

Expand Down Expand Up @@ -827,11 +822,8 @@ mod tests {
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
{
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
let processor =
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

processor.shutdown().unwrap();
}
Expand All @@ -848,11 +840,8 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
let processor =
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

processor.shutdown().unwrap();
}
Expand Down
Loading
Loading