Skip to content

Commit

Permalink
feat: Add support for event sampling (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 committed Jul 11, 2024
1 parent c3b872c commit 6cf1854
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 46 deletions.
1 change: 1 addition & 0 deletions launchdarkly-server-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ triomphe = { version = "<=0.1.10" }
uuid = {version = "1.2.2", features = ["v4"] }
hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24.1" , optional = true}
rand = "0.8.5"

[dev-dependencies]
maplit = "1.0.1"
Expand Down
9 changes: 4 additions & 5 deletions launchdarkly-server-sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,7 @@ mod tests {
match &events[1] {
OutputEvent::MigrationOp(event) => {
assert!(event.invoked.len() == origins.len());
assert!(event.invoked.iter().all(|i| origins.contains(&i)));
assert!(event.invoked.iter().all(|i| origins.contains(i)));
}
_ => panic!("Expected migration event"),
}
Expand Down Expand Up @@ -1813,7 +1813,6 @@ mod tests {
assert!(event
.latency
.values()
.into_iter()
.all(|l| l > &Duration::from_millis(100)));
}
_ => panic!("Expected migration event"),
Expand Down Expand Up @@ -1871,7 +1870,7 @@ mod tests {
match &events[1] {
OutputEvent::MigrationOp(event) => {
assert!(event.errors.len() == origins.len());
assert!(event.errors.iter().all(|i| origins.contains(&i)));
assert!(event.errors.iter().all(|i| origins.contains(i)));
}
_ => panic!("Expected migration event"),
}
Expand Down Expand Up @@ -1929,7 +1928,7 @@ mod tests {
match &events[1] {
OutputEvent::MigrationOp(event) => {
assert!(event.errors.len() == origins.len());
assert!(event.errors.iter().all(|i| origins.contains(&i)));
assert!(event.errors.iter().all(|i| origins.contains(i)));
}
_ => panic!("Expected migration event"),
}
Expand Down Expand Up @@ -2002,7 +2001,7 @@ mod tests {
match &events[1] {
OutputEvent::MigrationOp(event) => {
assert!(event.errors.len() == origins.len());
assert!(event.errors.iter().all(|i| origins.contains(&i)));
assert!(event.errors.iter().all(|i| origins.contains(i)));
}
_ => panic!("Expected migration event"),
}
Expand Down
147 changes: 127 additions & 20 deletions launchdarkly-server-sdk/src/events/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crossbeam_channel::{bounded, select, tick, Receiver, Sender};
use rand::thread_rng;
use std::time::SystemTime;

use launchdarkly_server_sdk_evaluation::Context;
use lru::LruCache;

use super::event::{BaseEvent, FeatureRequestEvent, IndexEvent};
use crate::sampler::{Sampler, ThreadRngSampler};

use super::sender::EventSenderResult;
use super::{
event::{EventSummary, InputEvent, OutputEvent},
Expand Down Expand Up @@ -74,6 +77,7 @@ pub(super) struct EventDispatcher {
last_known_time: u128,
disabled: bool,
thread_count: usize,
sampler: Box<dyn Sampler>,
}

impl EventDispatcher {
Expand All @@ -85,6 +89,7 @@ impl EventDispatcher {
last_known_time: 0,
disabled: false,
thread_count: 5,
sampler: Box::new(ThreadRngSampler::new(thread_rng())),
}
}

Expand Down Expand Up @@ -182,11 +187,19 @@ impl EventDispatcher {

fn process_event(&mut self, event: InputEvent) {
match event {
InputEvent::MigrationOp(migration_op) => self
.outbox
.add_event(OutputEvent::MigrationOp(migration_op)),
InputEvent::MigrationOp(migration_op) => {
if self
.sampler
.sample(migration_op.sampling_ratio.unwrap_or(1))
{
self.outbox
.add_event(OutputEvent::MigrationOp(migration_op));
}
}
InputEvent::FeatureRequest(fre) => {
self.outbox.add_to_summary(&fre);
if !fre.exclude_from_summaries {
self.outbox.add_to_summary(&fre);
}

let inlined = fre.clone().into_inline_with_anonymous_redaction(
self.events_configuration.all_attributes_private,
Expand All @@ -209,7 +222,10 @@ impl EventDispatcher {

if let Some(debug_events_until_date) = fre.debug_events_until_date {
let time = u128::from(debug_events_until_date);
if time > now && time > self.last_known_time {
if time > now
&& time > self.last_known_time
&& self.sampler.sample(fre.sampling_ratio.unwrap_or(1))
{
self.outbox
.add_event(OutputEvent::Debug(fre.clone().into_inline(
self.events_configuration.all_attributes_private,
Expand All @@ -218,7 +234,7 @@ impl EventDispatcher {
}
}

if fre.track_events {
if fre.track_events && self.sampler.sample(fre.sampling_ratio.unwrap_or(1)) {
self.outbox.add_event(OutputEvent::FeatureRequest(inlined));
}
}
Expand All @@ -231,11 +247,13 @@ impl EventDispatcher {
}

self.notice_context(&identify.base.context);
self.outbox
.add_event(OutputEvent::Identify(identify.into_inline(
self.events_configuration.all_attributes_private,
self.events_configuration.private_attributes.clone(),
)));
if self.sampler.sample(identify.sampling_ratio.unwrap_or(1)) {
self.outbox
.add_event(OutputEvent::Identify(identify.into_inline(
self.events_configuration.all_attributes_private,
self.events_configuration.private_attributes.clone(),
)));
}
}
InputEvent::Custom(custom) => {
if let Some(context) = self.get_indexable_context(&custom.base) {
Expand All @@ -247,7 +265,9 @@ impl EventDispatcher {
.add_event(OutputEvent::Index(IndexEvent::from(base)));
}

self.outbox.add_event(OutputEvent::Custom(custom));
if self.sampler.sample(custom.sampling_ratio.unwrap_or(1)) {
self.outbox.add_event(OutputEvent::Custom(custom));
}
}
}
}
Expand Down Expand Up @@ -429,6 +449,88 @@ mod tests {
assert_eq!(1, dispatcher.outbox.summary.features.len());
}

#[test]
fn dispatcher_ignores_feature_events_with_0_sampling_ratio() {
let (event_sender, _) = create_event_sender();
let events_configuration =
create_events_configuration(event_sender, Duration::from_secs(100));
let mut dispatcher = create_dispatcher(events_configuration);

let context = ContextBuilder::new("context")
.build()
.expect("Failed to create context");
let mut flag = basic_flag("flag");
flag.sampling_ratio = Some(0);
flag.debug_events_until_date = Some(64_060_606_800_000);
flag.track_events = true;

let detail = Detail {
value: Some(FlagValue::from(false)),
variation_index: Some(1),
reason: Reason::Fallthrough {
in_experiment: false,
},
};

let event_factory = EventFactory::new(true);
let feature_request_event = event_factory.new_eval_event(
&flag.key,
context,
&flag,
detail,
FlagValue::from(false),
None,
);

dispatcher.process_event(feature_request_event);
assert_eq!(1, dispatcher.outbox.events.len());
assert_eq!("index", dispatcher.outbox.events[0].kind());
assert_eq!(1, dispatcher.context_keys.len());
assert_eq!(1, dispatcher.outbox.summary.features.len());
}

#[test]
fn dispatcher_can_exclude_feature_event_from_summaries() {
let (event_sender, _) = create_event_sender();
let events_configuration =
create_events_configuration(event_sender, Duration::from_secs(100));
let mut dispatcher = create_dispatcher(events_configuration);

let context = ContextBuilder::new("context")
.build()
.expect("Failed to create context");
let mut flag = basic_flag("flag");
flag.exclude_from_summaries = true;
flag.debug_events_until_date = Some(64_060_606_800_000);
flag.track_events = true;

let detail = Detail {
value: Some(FlagValue::from(false)),
variation_index: Some(1),
reason: Reason::Fallthrough {
in_experiment: false,
},
};

let event_factory = EventFactory::new(true);
let feature_request_event = event_factory.new_eval_event(
&flag.key,
context,
&flag,
detail,
FlagValue::from(false),
None,
);

dispatcher.process_event(feature_request_event);
assert_eq!(3, dispatcher.outbox.events.len());
assert_eq!("index", dispatcher.outbox.events[0].kind());
assert_eq!("debug", dispatcher.outbox.events[1].kind());
assert_eq!("feature", dispatcher.outbox.events[2].kind());
assert_eq!(1, dispatcher.context_keys.len());
assert_eq!(0, dispatcher.outbox.summary.features.len());
}

#[test_case(0, 64_060_606_800_000, vec!["debug", "index", "summary"])]
#[test_case(64_060_606_800_000, 64_060_606_800_000, vec!["index", "summary"])]
#[test_case(64_060_606_800_001, 64_060_606_800_000, vec!["index", "summary"])]
Expand Down Expand Up @@ -465,11 +567,12 @@ mod tests {
create_events_configuration(event_sender, Duration::from_secs(100));
let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);

let mut dispatcher = create_dispatcher(events_configuration);
dispatcher.last_known_time = last_known_time;

let dispatcher_handle = thread::Builder::new()
.spawn(move || dispatcher.start(inbox_rx))
.spawn(move || {
let mut dispatcher = create_dispatcher(events_configuration);
dispatcher.last_known_time = last_known_time;
dispatcher.start(inbox_rx)
})
.unwrap();

inbox_tx
Expand Down Expand Up @@ -656,9 +759,11 @@ mod tests {
create_events_configuration(event_sender, Duration::from_secs(100));
let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);

let mut dispatcher = create_dispatcher(events_configuration);
let dispatcher_handle = thread::Builder::new()
.spawn(move || dispatcher.start(inbox_rx))
.spawn(move || {
let mut dispatcher = create_dispatcher(events_configuration);
dispatcher.start(inbox_rx)
})
.unwrap();

let context = ContextBuilder::new("context")
Expand Down Expand Up @@ -692,9 +797,11 @@ mod tests {
create_events_configuration(event_sender, Duration::from_millis(200));
let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);

let mut dispatcher = create_dispatcher(events_configuration);
let _ = thread::Builder::new()
.spawn(move || dispatcher.start(inbox_rx))
.spawn(move || {
let mut dispatcher = create_dispatcher(events_configuration);
dispatcher.start(inbox_rx)
})
.unwrap();

let context = ContextBuilder::new("context")
Expand Down
Loading

0 comments on commit 6cf1854

Please sign in to comment.