Skip to content

feat(server): Ingest and normalize sample rates #910

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

- Extract crashpad annotations into contexts. ([#892](https://github.com/getsentry/relay/pull/892))
- Normalize user reports during ingestion and create empty fields. ([#903](https://github.com/getsentry/relay/pull/903))
- Ingest and normalize sample rates from envelope item headers. ([#910](https://github.com/getsentry/relay/pull/910))

## 20.12.1

Expand Down
29 changes: 28 additions & 1 deletion relay-general/src/protocol/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
use crate::processor::ProcessValue;
use crate::types::Annotated;
use crate::types::{Annotated, Array};

/// A single sample rate that was applied to an event.
///
/// Transported in envelope item headers (`sample_rates`) and persisted into
/// `Metrics::sample_rates` during processing.
#[derive(Clone, Debug, Default, Empty, PartialEq, FromValue, ToValue)]
#[cfg_attr(feature = "jsonschema", derive(JsonSchema))]
pub struct SampleRate {
    /// The unique identifier of the sampling rule or mechanism.
    ///
    /// For client-side sampling, this identifies the sampling mechanism:
    /// - `client_rate`: Default base sample rate configured in client options. Only reported in
    ///   the absence of the traces sampler callback.
    /// - `client_sampler`: Return value from the traces sampler callback during runtime. Always
    ///   overrides the `client_rate`.
    ///
    /// For server-side sampling, this identifies the dynamic sampling rule.
    id: Annotated<String>,

    /// The effective sample rate in the range `(0..1]`.
    ///
    /// While allowed in the protocol, a value of `0` can never occur in practice since such events
    /// would never be reported to Sentry and thus never generate this metric.
    rate: Annotated<f64>,
}

/// Metrics captured during event ingestion and processing.
///
Expand Down Expand Up @@ -130,6 +151,12 @@ pub struct Metrics {
/// This metric is measured in Sentry and should be reported in all processing tasks.
#[metastructure(field = "flag.processing.fatal")]
pub flag_processing_fatal: Annotated<bool>,

/// A list of cumulative sample rates applied to this event.
///
/// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The
/// effective sample rate is multiplied.
pub sample_rates: Annotated<Array<SampleRate>>,
}

// Do not process Metrics
Expand Down
2 changes: 1 addition & 1 deletion relay-general/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub use self::fingerprint::Fingerprint;
pub use self::logentry::{LogEntry, Message};
pub use self::measurements::Measurements;
pub use self::mechanism::{CError, MachException, Mechanism, MechanismMeta, PosixSignal};
pub use self::metrics::Metrics;
pub use self::metrics::{Metrics, SampleRate};
pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Request};
#[cfg(feature = "jsonschema")]
pub use self::schema::event_json_schema;
Expand Down
44 changes: 37 additions & 7 deletions relay-server/src/actors/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use relay_general::protocol::{
LenientString, Metrics, SecurityReportType, SessionUpdate, Timestamp, UserReport, Values,
};
use relay_general::store::ClockDriftProcessor;
use relay_general::types::{Annotated, Array, Object, ProcessingAction, Value};
use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value};
use relay_log::LogError;
use relay_quotas::RateLimits;
use relay_redis::RedisPool;
Expand Down Expand Up @@ -202,6 +202,12 @@ struct ProcessEnvelopeState {
/// persisted into the Event. All modifications afterwards will have no effect.
metrics: Metrics,

/// A list of cumulative sample rates applied to this event.
///
/// This element is obtained from the event or transaction item and re-serialized into the
/// resulting item.
sample_rates: Option<Value>,

/// Rate limits returned in processing mode.
///
/// The rate limiter is invoked in processing mode, after which the resulting limits are stored
Expand Down Expand Up @@ -463,6 +469,7 @@ impl EventProcessor {
envelope,
event: Annotated::empty(),
metrics: Metrics::default(),
sample_rates: None,
rate_limits: RateLimits::new(),
project_state,
project_id,
Expand Down Expand Up @@ -749,22 +756,25 @@ impl EventProcessor {
return Err(ProcessingError::DuplicateItem(duplicate.ty()));
}

let (event, event_len) = if let Some(item) = event_item.or(security_item) {
let (event, event_len) = if let Some(mut item) = event_item.or(security_item) {
relay_log::trace!("processing json event");
state.sample_rates = item.take_sample_rates();
metric!(timer(RelayTimers::EventProcessingDeserialize), {
// Event items can never include transactions, so retain the event type and let
// inference deal with this during store normalization.
self.event_from_json_payload(item, None)?
})
} else if let Some(item) = transaction_item {
} else if let Some(mut item) = transaction_item {
relay_log::trace!("processing json transaction");
state.sample_rates = item.take_sample_rates();
metric!(timer(RelayTimers::EventProcessingDeserialize), {
// Transaction items can only contain transaction events. Force the event type to
// hint to normalization that we're dealing with a transaction now.
self.event_from_json_payload(item, Some(EventType::Transaction))?
})
} else if let Some(item) = raw_security_item {
} else if let Some(mut item) = raw_security_item {
relay_log::trace!("processing security report");
state.sample_rates = item.take_sample_rates();
self.event_from_security_report(item)?
} else if attachment_item.is_some() || breadcrumbs1.is_some() || breadcrumbs2.is_some() {
relay_log::trace!("extracting attached event data");
Expand Down Expand Up @@ -847,17 +857,31 @@ impl EventProcessor {
// In processing mode, also write metrics into the event. Most metrics have already been
// collected at this state, except for the combined size of all attachments.
if self.config.processing_enabled() {
let mut metrics = std::mem::take(&mut state.metrics);

let attachment_size = envelope
.items()
.filter(|item| item.attachment_type() == Some(AttachmentType::Attachment))
.map(|item| item.len() as u64)
.sum::<u64>();

if attachment_size > 0 {
state.metrics.bytes_ingested_event_attachment = Annotated::new(attachment_size);
metrics.bytes_ingested_event_attachment = Annotated::new(attachment_size);
}

let sample_rates = state
.sample_rates
.take()
.and_then(|value| Array::from_value(Annotated::new(value)).into_value());

if let Some(rates) = sample_rates {
metrics
.sample_rates
.get_or_insert_with(Array::new)
.extend(rates)
}

event._metrics = Annotated::new(std::mem::take(&mut state.metrics));
event._metrics = Annotated::new(metrics);
}

// TODO: Temporary workaround before processing. Experimental SDKs relied on a buggy
Expand Down Expand Up @@ -1076,6 +1100,13 @@ impl EventProcessor {
let event_type = state.event_type().unwrap_or_default();
let mut event_item = Item::new(ItemType::from_event_type(event_type));
event_item.set_payload(ContentType::Json, data);

// If there are sample rates, write them back to the envelope. In processing mode, sample
// rates have been removed from the state and burnt into the event via `finalize_event`.
if let Some(sample_rates) = state.sample_rates.take() {
event_item.set_sample_rates(sample_rates);
}

state.envelope.add_item(event_item);

Ok(())
Expand Down Expand Up @@ -1127,7 +1158,6 @@ impl EventProcessor {
});

self.finalize_event(&mut state)?;

self.sample_event(&mut state)?;

if_processing!({
Expand Down
20 changes: 20 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,13 @@ pub struct ItemHeaders {
#[serde(default, skip)]
rate_limited: bool,

/// A list of cumulative sample rates applied to this event.
///
/// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The
/// effective sample rate is multiplied.
#[serde(default, skip_serializing_if = "Option::is_none")]
sample_rates: Option<Value>,

/// Other attributes for forward compatibility.
#[serde(flatten)]
other: BTreeMap<String, Value>,
Expand All @@ -371,6 +378,7 @@ impl Item {
content_type: None,
filename: None,
rate_limited: false,
sample_rates: None,
other: BTreeMap::new(),
},
payload: Bytes::new(),
Expand Down Expand Up @@ -456,6 +464,18 @@ impl Item {
self.headers.rate_limited = rate_limited;
}

/// Takes the sample rates out of the item headers, leaving `None` in their place.
pub fn take_sample_rates(&mut self) -> Option<Value> {
    std::mem::take(&mut self.headers.sample_rates)
}

/// Sets sample rates for this item.
///
/// Only a non-empty array is stored in the headers; any other value is
/// silently discarded.
pub fn set_sample_rates(&mut self, sample_rates: Value) {
    if let Value::Array(ref entries) = sample_rates {
        if !entries.is_empty() {
            self.headers.sample_rates = Some(sample_rates);
        }
    }
}

/// Returns the specified header value, if present.
pub fn get_header<K>(&self, name: &K) -> Option<&Value>
where
Expand Down
76 changes: 39 additions & 37 deletions tests/integration/test_envelope.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,12 @@ def generate_transaction_item():
def test_normalize_measurement_interface(
mini_sentry, relay_with_processing, transactions_consumer
):

# set up relay

relay = relay_with_processing()
mini_sentry.add_basic_project_config(42)

events_consumer = transactions_consumer()

# construct envelope

transaction_item = generate_transaction_item()

transaction_item.update(
{
"measurements": {
Expand All @@ -79,15 +73,9 @@ def test_normalize_measurement_interface(

envelope = Envelope()
envelope.add_transaction(transaction_item)

# ingest envelope

relay.send_envelope(42, envelope)

event, _ = events_consumer.try_get_event()

# test actual output

assert event["transaction"] == "/organizations/:orgId/performance/:eventSlug/"
assert "trace" in event["contexts"]
assert "measurements" in event, event
Expand All @@ -102,48 +90,29 @@ def test_normalize_measurement_interface(


def test_empty_measurement_interface(mini_sentry, relay_chain):
    """An empty `measurements` object is stripped from transactions entirely."""

    # set up relay

    relay = relay_chain()
    mini_sentry.add_basic_project_config(42)

    # construct envelope with an explicitly empty measurements interface

    transaction_item = generate_transaction_item()

    transaction_item.update({"measurements": {}})

    envelope = Envelope()
    envelope.add_transaction(transaction_item)

    # ingest envelope

    relay.send_envelope(42, envelope)

    envelope = mini_sentry.captured_events.get(timeout=1)

    event = envelope.get_transaction_event()

    # test actual output: normalization drops the empty measurements object

    assert event["transaction"] == "/organizations/:orgId/performance/:eventSlug/"
    assert "measurements" not in event, event


def test_strip_measurement_interface(
    mini_sentry, relay_with_processing, events_consumer
):
    """Measurements attached to a non-transaction event are stripped in processing."""

    # set up relay

    relay = relay_with_processing()
    mini_sentry.add_basic_project_config(42)

    events_consumer = events_consumer()

    # construct envelope: a plain message event carrying a measurements object

    envelope = Envelope()
    envelope.add_event(
        {
            "message": "Hello, World!",
            "measurements": {
                "LCP": {"value": 420.69},
                "   lcp_final.element-Size123  ": {"value": 1},
                "fid": {"value": 2020},
            },
        }
    )

    # ingest envelope

    relay.send_envelope(42, envelope)

    # NOTE(review): `events_consumer` was already invoked above (rebinding the
    # name to the consumer object), so this second call would invoke the
    # consumer itself. This looks like a removed/added pair from a diff
    # rendered without markers — confirm against the merged file.
    events_consumer = events_consumer()
    event, _ = events_consumer.try_get_event()

    # test actual output

    assert event["logentry"] == {"formatted": "Hello, World!"}

    # expect measurements interface object to be stripped out since it's attached to a non-transaction event
    assert "measurements" not in event, event


def test_sample_rates(mini_sentry, relay_chain):
    """Sample rates set on envelope item headers survive a relay-chain round trip.

    Relay must forward the `sample_rates` item header unchanged when not in
    processing mode.
    """
    relay = relay_chain()
    mini_sentry.add_basic_project_config(42)

    sample_rates = [
        {"id": "client_sampler", "rate": 0.01},
        # fixed typo: was "dyanmic_user"; the value is an arbitrary rule id,
        # asserted against the same local list below.
        {"id": "dynamic_user", "rate": 0.5},
    ]

    envelope = Envelope()
    envelope.add_event({"message": "hello, world!"})
    envelope.items[0].headers["sample_rates"] = sample_rates
    relay.send_envelope(42, envelope)

    # The captured envelope must carry the identical sample_rates header.
    envelope = mini_sentry.captured_events.get(timeout=1)
    assert envelope.items[0].headers["sample_rates"] == sample_rates


def test_sample_rates_metrics(mini_sentry, relay_with_processing, events_consumer):
    """In processing mode, item-header sample rates are written into `_metrics`.

    The processed event emitted to the consumer must contain the sample rates
    under `event["_metrics"]["sample_rates"]`.
    """
    relay = relay_with_processing()
    mini_sentry.add_basic_project_config(42)

    sample_rates = [
        {"id": "client_sampler", "rate": 0.01},
        # fixed typo: was "dyanmic_user"; the value is an arbitrary rule id,
        # asserted against the same local list below.
        {"id": "dynamic_user", "rate": 0.5},
    ]

    envelope = Envelope()
    envelope.add_event({"message": "hello, world!"})
    envelope.items[0].headers["sample_rates"] = sample_rates
    relay.send_envelope(42, envelope)

    # NOTE(review): the consumer is created after sending; this assumes the
    # fixture subscribes from the earliest offset — confirm, otherwise the
    # event could be missed.
    events_consumer = events_consumer()
    event, _ = events_consumer.try_get_event()

    assert event["_metrics"]["sample_rates"] == sample_rates