Skip to content

Commit

Permalink
feat: Add support for event sampling (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 authored Jun 3, 2024
1 parent 18971a3 commit 0d54337
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 49 deletions.
3 changes: 2 additions & 1 deletion launchdarkly-server-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ lazy_static = "1.4.0"
log = "0.4.14"
lru = { version = "0.12.0", default-features = false }
ring = "0.17.5"
launchdarkly-server-sdk-evaluation = { git = "https://github.com/launchdarkly/rust-server-sdk-evaluation", branch = "feat/migrations" }
launchdarkly-server-sdk-evaluation = { git = "https://github.com/launchdarkly/rust-server-sdk-evaluation", branch = "mk/sc-243771/event-sampling" }
serde = { version = "1.0.132", features = ["derive"] }
serde_json = { version = "1.0.73", features = ["float_roundtrip"] }
thiserror = "1.0"
Expand All @@ -40,6 +40,7 @@ triomphe = { version = "<=0.1.10" }
uuid = {version = "1.2.2", features = ["v4"] }
hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24.1" , optional = true}
rand = "0.8.5"

[dev-dependencies]
maplit = "1.0.1"
Expand Down
9 changes: 4 additions & 5 deletions launchdarkly-server-sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,7 @@ mod tests {
match &events[1] {
OutputEvent::MigrationOp(event) => {
assert!(event.invoked.len() == origins.len());
assert!(event.invoked.iter().all(|i| origins.contains(&i)));
assert!(event.invoked.iter().all(|i| origins.contains(i)));
}
_ => panic!("Expected migration event"),
}
Expand Down Expand Up @@ -1813,7 +1813,6 @@ mod tests {
assert!(event
.latency
.values()
.into_iter()
.all(|l| l > &Duration::from_millis(100)));
}
_ => panic!("Expected migration event"),
Expand Down Expand Up @@ -1871,7 +1870,7 @@ mod tests {
match &events[1] {
OutputEvent::MigrationOp(event) => {
assert!(event.errors.len() == origins.len());
assert!(event.errors.iter().all(|i| origins.contains(&i)));
assert!(event.errors.iter().all(|i| origins.contains(i)));
}
_ => panic!("Expected migration event"),
}
Expand Down Expand Up @@ -1929,7 +1928,7 @@ mod tests {
match &events[1] {
OutputEvent::MigrationOp(event) => {
assert!(event.errors.len() == origins.len());
assert!(event.errors.iter().all(|i| origins.contains(&i)));
assert!(event.errors.iter().all(|i| origins.contains(i)));
}
_ => panic!("Expected migration event"),
}
Expand Down Expand Up @@ -2002,7 +2001,7 @@ mod tests {
match &events[1] {
OutputEvent::MigrationOp(event) => {
assert!(event.errors.len() == origins.len());
assert!(event.errors.iter().all(|i| origins.contains(&i)));
assert!(event.errors.iter().all(|i| origins.contains(i)));
}
_ => panic!("Expected migration event"),
}
Expand Down
151 changes: 129 additions & 22 deletions launchdarkly-server-sdk/src/events/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crossbeam_channel::{bounded, select, tick, Receiver, Sender};
use rand::thread_rng;
use std::collections::HashSet;
use std::iter::FromIterator;
use std::time::SystemTime;

use launchdarkly_server_sdk_evaluation::Context;
use lru::LruCache;

use crate::sampler::{Sampler, ThreadRngSampler};

use super::event::FeatureRequestEvent;
use super::sender::EventSenderResult;
use super::{
Expand Down Expand Up @@ -76,6 +79,7 @@ pub(super) struct EventDispatcher {
last_known_time: u128,
disabled: bool,
thread_count: usize,
sampler: Box<dyn Sampler>,
}

impl EventDispatcher {
Expand All @@ -87,6 +91,7 @@ impl EventDispatcher {
last_known_time: 0,
disabled: false,
thread_count: 5,
sampler: Box::new(ThreadRngSampler::new(thread_rng())),
}
}

Expand Down Expand Up @@ -184,11 +189,19 @@ impl EventDispatcher {

fn process_event(&mut self, event: InputEvent) {
match event {
InputEvent::MigrationOp(migration_op) => self
.outbox
.add_event(OutputEvent::MigrationOp(migration_op)),
InputEvent::MigrationOp(migration_op) => {
if self
.sampler
.sample(migration_op.sampling_ratio.unwrap_or(1))
{
self.outbox
.add_event(OutputEvent::MigrationOp(migration_op));
}
}
InputEvent::FeatureRequest(fre) => {
self.outbox.add_to_summary(&fre);
if !fre.exclude_from_summaries {
self.outbox.add_to_summary(&fre);
}

let inlined = fre.clone().into_inline_with_anonymous_redaction(
self.events_configuration.all_attributes_private,
Expand All @@ -209,7 +222,10 @@ impl EventDispatcher {

if let Some(debug_events_until_date) = fre.debug_events_until_date {
let time = u128::from(debug_events_until_date);
if time > now && time > self.last_known_time {
if time > now
&& time > self.last_known_time
&& self.sampler.sample(fre.sampling_ratio.unwrap_or(1))
{
self.outbox
.add_event(OutputEvent::Debug(fre.clone().into_inline(
self.events_configuration.all_attributes_private,
Expand All @@ -218,19 +234,21 @@ impl EventDispatcher {
}
}

if fre.track_events {
if fre.track_events && self.sampler.sample(fre.sampling_ratio.unwrap_or(1)) {
self.outbox.add_event(OutputEvent::FeatureRequest(inlined));
}
}
InputEvent::Identify(identify) => {
self.notice_context(&identify.base.context);
self.outbox
.add_event(OutputEvent::Identify(identify.into_inline(
self.events_configuration.all_attributes_private,
HashSet::from_iter(
self.events_configuration.private_attributes.iter().cloned(),
),
)));
if self.sampler.sample(identify.sampling_ratio.unwrap_or(1)) {
self.outbox
.add_event(OutputEvent::Identify(identify.into_inline(
self.events_configuration.all_attributes_private,
HashSet::from_iter(
self.events_configuration.private_attributes.iter().cloned(),
),
)));
}
}
InputEvent::Custom(custom) => {
if self.notice_context(&custom.base.context) {
Expand All @@ -241,7 +259,9 @@ impl EventDispatcher {
)));
}

self.outbox.add_event(OutputEvent::Custom(custom));
if self.sampler.sample(custom.sampling_ratio.unwrap_or(1)) {
self.outbox.add_event(OutputEvent::Custom(custom));
}
}
}
}
Expand Down Expand Up @@ -362,6 +382,88 @@ mod tests {
assert_eq!(1, dispatcher.outbox.summary.features.len());
}

#[test]
fn dispatcher_ignores_feature_events_with_0_sampling_ratio() {
let (event_sender, _) = create_event_sender();
let events_configuration =
create_events_configuration(event_sender, Duration::from_secs(100));
let mut dispatcher = create_dispatcher(events_configuration);

let context = ContextBuilder::new("context")
.build()
.expect("Failed to create context");
let mut flag = basic_flag("flag");
flag.sampling_ratio = Some(0);
flag.debug_events_until_date = Some(64_060_606_800_000);
flag.track_events = true;

let detail = Detail {
value: Some(FlagValue::from(false)),
variation_index: Some(1),
reason: Reason::Fallthrough {
in_experiment: false,
},
};

let event_factory = EventFactory::new(true);
let feature_request_event = event_factory.new_eval_event(
&flag.key,
context,
&flag,
detail,
FlagValue::from(false),
None,
);

dispatcher.process_event(feature_request_event);
assert_eq!(1, dispatcher.outbox.events.len());
assert_eq!("index", dispatcher.outbox.events[0].kind());
assert_eq!(1, dispatcher.context_keys.len());
assert_eq!(1, dispatcher.outbox.summary.features.len());
}

#[test]
fn dispatcher_can_exclude_feature_event_from_summaries() {
let (event_sender, _) = create_event_sender();
let events_configuration =
create_events_configuration(event_sender, Duration::from_secs(100));
let mut dispatcher = create_dispatcher(events_configuration);

let context = ContextBuilder::new("context")
.build()
.expect("Failed to create context");
let mut flag = basic_flag("flag");
flag.exclude_from_summaries = true;
flag.debug_events_until_date = Some(64_060_606_800_000);
flag.track_events = true;

let detail = Detail {
value: Some(FlagValue::from(false)),
variation_index: Some(1),
reason: Reason::Fallthrough {
in_experiment: false,
},
};

let event_factory = EventFactory::new(true);
let feature_request_event = event_factory.new_eval_event(
&flag.key,
context,
&flag,
detail,
FlagValue::from(false),
None,
);

dispatcher.process_event(feature_request_event);
assert_eq!(3, dispatcher.outbox.events.len());
assert_eq!("index", dispatcher.outbox.events[0].kind());
assert_eq!("debug", dispatcher.outbox.events[1].kind());
assert_eq!("feature", dispatcher.outbox.events[2].kind());
assert_eq!(1, dispatcher.context_keys.len());
assert_eq!(0, dispatcher.outbox.summary.features.len());
}

#[test_case(0, 64_060_606_800_000, vec!["debug", "index", "summary"])]
#[test_case(64_060_606_800_000, 64_060_606_800_000, vec!["index", "summary"])]
#[test_case(64_060_606_800_001, 64_060_606_800_000, vec!["index", "summary"])]
Expand Down Expand Up @@ -398,11 +500,12 @@ mod tests {
create_events_configuration(event_sender, Duration::from_secs(100));
let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);

let mut dispatcher = create_dispatcher(events_configuration);
dispatcher.last_known_time = last_known_time;

let dispatcher_handle = thread::Builder::new()
.spawn(move || dispatcher.start(inbox_rx))
.spawn(move || {
let mut dispatcher = create_dispatcher(events_configuration);
dispatcher.last_known_time = last_known_time;
dispatcher.start(inbox_rx)
})
.unwrap();

inbox_tx
Expand Down Expand Up @@ -478,9 +581,11 @@ mod tests {
create_events_configuration(event_sender, Duration::from_secs(100));
let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);

let mut dispatcher = create_dispatcher(events_configuration);
let dispatcher_handle = thread::Builder::new()
.spawn(move || dispatcher.start(inbox_rx))
.spawn(move || {
let mut dispatcher = create_dispatcher(events_configuration);
dispatcher.start(inbox_rx)
})
.unwrap();

let context = ContextBuilder::new("context")
Expand Down Expand Up @@ -514,9 +619,11 @@ mod tests {
create_events_configuration(event_sender, Duration::from_millis(200));
let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);

let mut dispatcher = create_dispatcher(events_configuration);
let _ = thread::Builder::new()
.spawn(move || dispatcher.start(inbox_rx))
.spawn(move || {
let mut dispatcher = create_dispatcher(events_configuration);
dispatcher.start(inbox_rx)
})
.unwrap();

let context = ContextBuilder::new("context")
Expand Down
Loading

0 comments on commit 0d54337

Please sign in to comment.