Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5778340
init commit for trace-agent poc
ajgajg1134 Apr 28, 2026
9df644d
fmt and clippy
ajgajg1134 Apr 28, 2026
c39b138
fix weird constant
ajgajg1134 Apr 28, 2026
cee140b
connect to blackhole for testing
ajgajg1134 Apr 30, 2026
c2f2a91
add v1 sampler and onboarding components
ajgajg1134 Apr 30, 2026
49c7156
add more samplers and return sampling rates
ajgajg1134 May 1, 2026
799ae2e
add stats and obfuscation
ajgajg1134 May 1, 2026
d1fae5f
fixups from review and encoder
ajgajg1134 May 1, 2026
d976db3
idx tracer payload
ajgajg1134 May 1, 2026
d1ca26a
migrate OTLP trace encoder to unified Trace fields (step 8)
ajgajg1134 May 11, 2026
75aad20
unify Event::Trace type across APM and OTLP paths (steps 1-7, 9)
ajgajg1134 May 11, 2026
1925ab8
delete v1_trace_obfuscation and route APM pipeline through shared tra…
ajgajg1134 May 11, 2026
ddcd59b
complete step 10: topology wiring, v1 types source-local, remove reso…
ajgajg1134 May 11, 2026
61b5c76
add Span.attributes and migrate from meta/metrics/meta_struct
ajgajg1134 May 11, 2026
801d5a6
merge v1_trace_sampler into TraceSamplerConfiguration, remove TraceSa…
ajgajg1134 May 11, 2026
4cfe163
post-migration corrections: SpanLink attributes typed, remove Span.tr…
ajgajg1134 May 12, 2026
26dd376
widen AttributeValue, consolidate to idx encoder, fix ottl resource a…
ajgajg1134 May 12, 2026
2d10788
Fix rates response
ajgajg1134 May 12, 2026
2996c6c
fix span kind and top_level
ajgajg1134 May 12, 2026
afcbbec
Add todo for 429s
ajgajg1134 May 12, 2026
477bc39
todo for client dropped p0 weights
ajgajg1134 May 12, 2026
a5eff09
must use streaming strings for decoding msgpack
ajgajg1134 May 12, 2026
38556f9
unify the priority samplers
ajgajg1134 May 12, 2026
c4551c0
just use the raresampler the v1 implementation was wrong
ajgajg1134 May 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ tracing-appender = { version = "0.2", default-features = false }
base64 = { version = "0.22.1", default-features = false }
treediff = { version = "5", default-features = false }
argh = { version = "0.1", default-features = false }
rmp = { version = "0.8" }
rmp-serde = { version = "1.3", default-features = false }
serde_bytes = { version = "0.11.19", default-features = false }
num-traits = { version = "0.2", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions bin/agent-data-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fips = ["saluki-app/tls-fips"]
[dependencies]
argh = { workspace = true, features = ["help"] }
async-trait = { workspace = true }
axum = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
colored = { workspace = true }
Expand All @@ -30,6 +31,7 @@ memory-accounting = { workspace = true }
metrics = { workspace = true }
ottl = { workspace = true }
papaya = { workspace = true }
rmp = { workspace = true }
prometheus-exposition = { workspace = true }
prost-types = { workspace = true }
saluki-api = { workspace = true }
Expand Down
134 changes: 108 additions & 26 deletions bin/agent-data-plane/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use saluki_components::{
encoders::{
BufferedIncrementalConfiguration, DatadogApmStatsEncoderConfiguration, DatadogEventsConfiguration,
DatadogLogsConfiguration, DatadogMetricsConfiguration, DatadogServiceChecksConfiguration,
DatadogTraceConfiguration,
V1DatadogTraceConfiguration,
},
forwarders::{DatadogConfiguration, OtlpForwarderConfiguration},
relays::otlp::OtlpRelayConfiguration,
sources::{DogStatsDConfiguration, OtlpConfiguration},
sources::{apm::sampling_rates::V1SamplingRatesHandle, ApmReceiverConfiguration, DogStatsDConfiguration, OtlpConfiguration},
transforms::{
AggregateConfiguration, ApmStatsTransformConfiguration, ChainedConfiguration, DogStatsDMapperConfiguration,
DogStatsDPrefixFilterConfiguration, HostEnrichmentConfiguration, HostTagsConfiguration,
Expand All @@ -41,6 +41,7 @@ use crate::{
components::{
apm_onboarding::ApmOnboardingConfiguration, ottl_filter_processor::OttlFilterConfiguration,
ottl_transform_processor::OttlTransformConfiguration, tag_filterlist::TagFilterlistConfiguration,
v1_apm_onboarding::V1ApmOnboardingConfiguration,
},
internal::{create_internal_supervisor, remote_agent::RemoteAgentBootstrap},
};
Expand Down Expand Up @@ -135,25 +136,14 @@ pub async fn handle_run_command(

let dsd_stats_config = DogStatsDStatisticsConfiguration::new();

// Create our primary data topology and spawn any internal processes, which will ensure all relevant components are
// registered and accounted for in terms of memory usage.
let blueprint = create_topology(
&config,
&dp_config,
&env_provider,
&component_registry,
dsd_stats_config.clone(),
)
.await?;

// Create the internal supervisor (control plane + observability)
let mut internal_supervisor = create_internal_supervisor(
&config,
&dp_config,
&component_registry,
health_registry.clone(),
env_provider,
dsd_stats_config,
env_provider.clone(),
dsd_stats_config.clone(),
ra_bootstrap,
)
.await
Expand All @@ -178,19 +168,31 @@ pub async fn handle_run_command(
}
}

// Bounds validation succeeded, so now we'll build and spawn the topology.
let built_topology = blueprint.build().await?;
let mut running_topology = built_topology.spawn(&health_registry, memory_limiter).await?;
// Build and spawn the topology only when at least one data pipeline needs it.
// Some pipelines (e.g. the APM receiver) run entirely as control-plane workers and
// produce no topology components, so there is nothing to build or wait on.
let mut running_topology = if dp_config.topology_required() {
let blueprint = create_topology(
&config,
&dp_config,
&env_provider,
&component_registry,
dsd_stats_config,
)
.await?;

let built_topology = blueprint.build().await?;
Some(built_topology.spawn(&health_registry, memory_limiter).await?)
} else {
None
};

let startup_time = started.elapsed();

// Emit the startup metrics for the application.
emit_startup_metrics();

info!(
init_time_ms = startup_time.as_millis(),
"Topology running. Waiting for interrupt..."
);
info!(init_time_ms = startup_time.as_millis(), "Waiting for interrupt...");

// Wait for all components to become ready.
tokio::spawn(async move {
Expand Down Expand Up @@ -239,7 +241,12 @@ pub async fn handle_run_command(
}
}
}
_ = running_topology.wait_for_unexpected_finish() => {
_ = async {
match running_topology.as_mut() {
Some(t) => t.wait_for_unexpected_finish().await,
None => std::future::pending().await,
}
} => {
error!("Topology component unexpectedly finished. Shutting down...");
topology_failed = true;
},
Expand All @@ -248,8 +255,11 @@ pub async fn handle_run_command(
}
}

// Shutdown the primary topology
let topology_result = running_topology.shutdown_with_timeout(Duration::from_secs(30)).await;
// Shutdown the primary topology if one was running.
let topology_result = match running_topology {
Some(t) => t.shutdown_with_timeout(Duration::from_secs(30)).await,
None => Ok(()),
};

// Signal the internal supervisor to shutdown (if still running) and drive it to completion.
// If the supervisor already exited (i.e., the select! above matched its branch), both the send
Expand Down Expand Up @@ -302,6 +312,7 @@ async fn create_topology(
if dp_config.metrics_pipeline_required()
|| dp_config.logs_pipeline_required()
|| dp_config.traces_pipeline_required()
|| dp_config.apm_pipeline_required()
{
let dd_forwarder_config =
DatadogConfiguration::from_configuration(config).error_context("Failed to configure Datadog forwarder.")?;
Expand Down Expand Up @@ -329,9 +340,80 @@ async fn create_topology(
add_otlp_pipeline_to_blueprint(&mut blueprint, config, dp_config, env_provider)?;
}

if dp_config.apm_pipeline_required() {
add_apm_pipeline_to_blueprint(&mut blueprint, config, dp_config, env_provider).await?;
}

Ok(blueprint)
}

async fn add_apm_pipeline_to_blueprint(
blueprint: &mut TopologyBlueprint, config: &GenericConfiguration, dp_config: &DataPlaneConfiguration,
env_provider: &ADPEnvironmentProvider,
) -> Result<(), GenericError> {
let sampling_rates = V1SamplingRatesHandle::new();

let apm_receiver_config = ApmReceiverConfiguration::from_configuration(config)
.error_context("Failed to configure APM receiver.")?
.with_sampling_rates(sampling_rates.clone());

let v1_trace_obfuscation_config = TraceObfuscationConfiguration::from_apm_configuration(config)
.error_context("Failed to configure trace obfuscation.")?;

let v1_trace_sampler_config = TraceSamplerConfiguration::from_configuration(config)
.error_context("Failed to configure V1 trace sampler.")?
.with_sampling_rates(sampling_rates.clone());

let v1_traces_enrich_config = ChainedConfiguration::default()
.with_transform_builder("v1_apm_onboarding", V1ApmOnboardingConfiguration)
.with_transform_builder("trace_obfuscation", v1_trace_obfuscation_config)
.with_transform_builder("v1_trace_sampler", v1_trace_sampler_config);

let v1_dd_traces_config = V1DatadogTraceConfiguration::from_configuration(config)
.error_context("Failed to configure V1 Datadog Traces encoder.")?
.with_environment_provider(env_provider.clone())
.await?;

let apm_stats_config = ApmStatsTransformConfiguration::from_configuration(config)
.error_context("Failed to configure APM stats transform.")?
.with_environment_provider(env_provider.clone())
.await?;

blueprint
.add_source("apm_in", apm_receiver_config)?
.add_transform("v1_traces_enrich", v1_traces_enrich_config)?
.add_transform("apm_dd_apm_stats", apm_stats_config)?
.add_encoder("v1_dd_traces_encode", v1_dd_traces_config)?
.connect_component("v1_traces_enrich", ["apm_in.traces"])?
.connect_component("v1_dd_traces_encode", ["v1_traces_enrich"])?
.connect_component("apm_dd_apm_stats", ["v1_traces_enrich"])?
.connect_component("dd_out", ["v1_dd_traces_encode"])?;

// `dd_stats_encode` is shared with the OTLP traces pipeline when both are active.
//
// APM-only: we own the encoder — register it first, then connect apm_dd_apm_stats
// as its input and dd_out as its output.
//
// OTLP+APM: the encoder already exists (registered by add_baseline_traces_pipeline)
// and dd_out is already wired to it; we only need to add apm_dd_apm_stats
// as a second upstream. Adding the dd_out edge again would create a
// duplicate graph edge that forwards every stats payload twice.
if !dp_config.traces_pipeline_required() {
let dd_apm_stats_encoder = DatadogApmStatsEncoderConfiguration::from_configuration(config)
.error_context("Failed to configure Datadog APM Stats encoder.")?
.with_environment_provider(env_provider.clone())
.await?;
blueprint
.add_encoder("dd_stats_encode", dd_apm_stats_encoder)?
.connect_component("dd_stats_encode", ["apm_dd_apm_stats"])?
.connect_component("dd_out", ["dd_stats_encode"])?;
} else {
blueprint.connect_component("dd_stats_encode", ["apm_dd_apm_stats"])?;
}

Ok(())
}

async fn add_baseline_metrics_pipeline_to_blueprint(
blueprint: &mut TopologyBlueprint, config: &GenericConfiguration, dp_config: &DataPlaneConfiguration,
env_provider: &ADPEnvironmentProvider,
Expand Down Expand Up @@ -381,7 +463,7 @@ async fn add_baseline_logs_pipeline_to_blueprint(
async fn add_baseline_traces_pipeline_to_blueprint(
blueprint: &mut TopologyBlueprint, config: &GenericConfiguration, env_provider: &ADPEnvironmentProvider,
) -> Result<(), GenericError> {
let dd_traces_config = DatadogTraceConfiguration::from_configuration(config)
let dd_traces_config = V1DatadogTraceConfiguration::from_configuration(config)
.error_context("Failed to configure Datadog Traces encoder.")?
.with_environment_provider(env_provider.clone())
.await?;
Expand Down
12 changes: 5 additions & 7 deletions bin/agent-data-plane/src/components/apm_onboarding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use saluki_common::{
};
use saluki_core::{
components::{transforms::*, ComponentContext},
data_model::event::trace::{Span, Trace},
data_model::event::trace::{AttributeValue, Span, Trace},
topology::EventsBuffer,
};
use saluki_error::GenericError;
use stringtheory::MetaString;
use tracing::debug;

mod install_info;
use self::install_info::InstallInfo;
use super::install_info::InstallInfo;

static META_TAG_INSTALL_ID: MetaString = MetaString::from_static("_dd.install.id");
static META_TAG_INSTALL_TYPE: MetaString = MetaString::from_static("_dd.install.type");
Expand Down Expand Up @@ -102,6 +101,7 @@ impl SynchronousTransform for ApmOnboarding {
}

fn get_root_span_from_trace_mut(trace: &mut Trace) -> Option<&mut Span> {
let trace_id_low = trace.trace_id_low;
let spans = trace.spans_mut();
if spans.is_empty() {
return None;
Expand Down Expand Up @@ -130,7 +130,7 @@ fn get_root_span_from_trace_mut(trace: &mut Trace) -> Option<&mut Span> {

if parent_to_child.len() != 1 {
debug!(
trace_id = spans[0].trace_id(),
trace_id = trace_id_low,
"Failed to reliably identify a root span for a trace."
);
}
Expand All @@ -155,7 +155,5 @@ fn add_onboarding_metadata_to_span(span: &mut Span, install_info: &InstallInfo)
}

fn add_meta_entry_if_missing(span: &mut Span, key: &MetaString, value: &MetaString) {
if !span.meta().contains_key(key) {
span.meta_mut().insert(key.clone(), value.clone());
}
span.attributes.entry(key.clone()).or_insert_with(|| AttributeValue::String(value.clone()));
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ impl InstallInfo {
pub async fn load_or_create() -> Result<Self, GenericError> {
let path = PlatformSettings::get_config_dir_path().join("install.json");

// See if the file exists, and load it if so.
let (install_info, should_write) = match tokio::fs::read(&path).await {
Ok(data) => {
// Try and decode the installation info.
//
// If we fail, we don't try to update it.
let install_info = serde_json::from_slice(&data).with_error_context(|| {
format!(
"Failed to decode installation info file '{}'.",
Expand All @@ -57,10 +53,8 @@ impl InstallInfo {
}

Err(e) => match e.kind() {
// If the file doesn't exist, then _we'll_ try and create it.
ErrorKind::NotFound => (Self::from_environment(), true),

// There was a legitimate error so we bail out.
_ => {
return Err(e).with_error_context(|| {
format!("Failed to read installation info file '{}'.", path.as_path().display())
Expand All @@ -69,9 +63,6 @@ impl InstallInfo {
},
};

// Write it out if we were the ones to create it.
//
// If we fail to write it out, then we also just bail out.
if should_write {
let install_info_json =
serde_json::to_vec(&install_info).error_context("Failed to serialize installation info to JSON.")?;
Expand Down
2 changes: 2 additions & 0 deletions bin/agent-data-plane/src/components/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod apm_onboarding;
mod install_info;
pub mod ottl_filter_processor;
pub mod ottl_transform_processor;
pub mod tag_filterlist;
pub mod v1_apm_onboarding;
Loading
Loading