Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions relay-server/src/processing/profiles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ impl Processor for ProfilesProcessor {
profiles: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
relay_statsd::metric!(
counter(RelayCounters::StandaloneItem) += profiles.profiles.len() as u64,
processor = "new",
item_type = "profile",
);

filter::feature_flag(ctx).reject(&profiles)?;

let profile = process::expand(profiles)?;
Expand Down
15 changes: 0 additions & 15 deletions relay-server/src/processing/transactions/types/output.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
#[cfg(test)]
use relay_event_schema::protocol::Event;
#[cfg(test)]
use relay_protocol::Annotated;
use relay_quotas::DataCategory;

use crate::Envelope;
Expand All @@ -26,17 +22,6 @@ pub enum TransactionOutput {
Indexed(Managed<Box<ExpandedTransaction<Indexed>>>),
}

impl TransactionOutput {
#[cfg(test)]
pub fn event(self) -> Option<Annotated<Event>> {
match self {
TransactionOutput::Full(managed) => Some(managed.accept(|x| x).event),
TransactionOutput::Profile(_) => None,
TransactionOutput::Indexed(managed) => Some(managed.accept(|x| x).event),
}
}
}

impl Forward for TransactionOutput {
fn serialize_envelope(
self,
Expand Down
6 changes: 0 additions & 6 deletions relay-server/src/processing/user_reports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,6 @@ impl Processor for UserReportsProcessor {
mut reports: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
relay_statsd::metric!(
counter(RelayCounters::StandaloneItem) += reports.reports.len() as u64,
processor = "new",
item_type = "user_report",
);

process::process(&mut reports);

let reports = self.limiter.enforce_quotas(reports, ctx).await?;
Expand Down
87 changes: 5 additions & 82 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,11 @@ use {
pub mod event;
mod metrics;
mod nel;
mod profile;
mod report;
#[cfg(feature = "processing")]
mod span;

#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;
mod standalone;

/// Creates the block only if used with `processing` feature.
///
Expand Down Expand Up @@ -169,13 +166,6 @@ processing_group!(TransactionGroup, Transaction);
processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(
StandaloneGroup,
Standalone,
StandaloneAttachments,
StandaloneUserReports,
StandaloneProfiles
);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
Expand Down Expand Up @@ -208,9 +198,6 @@ pub enum ProcessingGroup {
Error,
/// Session events.
Session,
/// Standalone items which can be sent alone without any event attached to it in the current
/// envelope e.g. some attachments, user reports.
Standalone,
/// Standalone attachments
///
/// Attachments that are send without an item that creates an event in the same envelope.
Expand Down Expand Up @@ -404,20 +391,6 @@ impl ProcessingGroup {
}
}

// Extract all standalone items.
//
// Note: only if there are no items in the envelope which can create events, otherwise they
// will be in the same envelope with all require event items.
if !envelope.items().any(Item::creates_event) {
let standalone_items = envelope.take_items_by(Item::requires_event);
if !standalone_items.is_empty() {
grouped_envelopes.push((
ProcessingGroup::Standalone,
Envelope::from_parts(headers.clone(), standalone_items),
))
}
};

// Make sure we create separate envelopes for each `RawSecurity` report.
let security_reports_items = envelope
.take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
Expand Down Expand Up @@ -479,7 +452,6 @@ impl ProcessingGroup {
ProcessingGroup::Transaction => "transaction",
ProcessingGroup::Error => "error",
ProcessingGroup::Session => "session",
ProcessingGroup::Standalone => "standalone",
ProcessingGroup::StandaloneAttachments => "standalone_attachment",
ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
ProcessingGroup::StandaloneProfiles => "standalone_profiles",
Expand All @@ -506,7 +478,6 @@ impl From<ProcessingGroup> for AppFeature {
ProcessingGroup::Transaction => AppFeature::Transactions,
ProcessingGroup::Error => AppFeature::Errors,
ProcessingGroup::Session => AppFeature::Sessions,
ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
Expand Down Expand Up @@ -604,6 +575,9 @@ pub enum ProcessingError {
#[cfg(feature = "processing")]
#[error("invalid attachment reference")]
InvalidAttachmentRef,

#[error("could not determine processing group for envelope items")]
NoProcessingGroup,
}

impl ProcessingError {
Expand Down Expand Up @@ -650,6 +624,7 @@ impl ProcessingError {
Self::InvalidAttachmentRef => {
Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
}
Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
}
}

Expand Down Expand Up @@ -845,17 +820,6 @@ fn send_metrics(
}
}

/// Function for on-off switches that filter specific item types (profiles, spans)
/// based on a feature flag.
///
/// If the project config did not come from the upstream, we keep the items.
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
match config.relay_mode() {
RelayMode::Proxy => false,
RelayMode::Managed => !project_info.has_feature(feature),
}
}

/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
Expand Down Expand Up @@ -1348,44 +1312,6 @@ impl EnvelopeProcessorService {
} else { Ok(cached_result.event) })
}

/// Processes standalone items that require an event ID, but do not have an event on the same envelope.
async fn process_standalone(
&self,
managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
ctx: processing::Context<'_>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
for item in managed_envelope.envelope().items() {
let attachment_type_tag = match item.attachment_type() {
Some(t) => &t.to_string(),
None => "",
};
relay_statsd::metric!(
counter(RelayCounters::StandaloneItem) += 1,
processor = "old",
item_type = item.ty().name(),
attachment_type = attachment_type_tag,
);
}

let mut extracted_metrics = ProcessingExtractedMetrics::new();

standalone::process(managed_envelope);

profile::filter(managed_envelope, ctx.config, ctx.project_info);

self.enforce_quotas(
managed_envelope,
Annotated::empty(),
&mut extracted_metrics,
ctx,
)
.await?;

report::process_user_reports(managed_envelope);

Ok(Some(extracted_metrics))
}

async fn process_nel(
&self,
mut managed_envelope: ManagedEnvelope,
Expand Down Expand Up @@ -1552,7 +1478,6 @@ impl EnvelopeProcessorService {
self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
.await
}
ProcessingGroup::Standalone => run!(process_standalone, ctx),
ProcessingGroup::StandaloneAttachments => {
self.process_with_processor(
&self.inner.processing.attachments,
Expand Down Expand Up @@ -1647,9 +1572,7 @@ impl EnvelopeProcessorService {
"could not identify the processing group based on the envelope's items"
);

Ok(ProcessingResult::no_metrics(
managed_envelope.into_processed(),
))
Err(ProcessingError::NoProcessingGroup)
Comment on lines -1650 to +1575
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The one "new" logic: We don't forward ungrouped but rather error out here.

}
// Leave this group unchanged.
//
Expand Down
Loading
Loading