From 58a0809f24b0a94f3978cf2d26c3204e2b9b0749 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 13 Jun 2024 10:32:45 -0400 Subject: [PATCH 1/6] chore: Polish --- contract-tests/src/client_entity.rs | 28 +++---- contract-tests/src/command_params.rs | 4 +- launchdarkly-server-sdk/src/client.rs | 8 +- launchdarkly-server-sdk/src/events/event.rs | 6 +- launchdarkly-server-sdk/src/lib.rs | 2 +- .../src/migrations/migrator.rs | 78 ++++++++----------- launchdarkly-server-sdk/src/migrations/mod.rs | 9 ++- .../src/migrations/tracker.rs | 4 +- 8 files changed, 58 insertions(+), 81 deletions(-) diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index 0968b0a..79befcb 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -1,6 +1,6 @@ use futures::future::FutureExt; use launchdarkly_server_sdk::{ - Context, ContextBuilder, ExecutionOrder, MigratorBuilder, MultiContextBuilder, Reference, + Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference, }; use std::sync::Arc; use std::time::Duration; @@ -242,22 +242,13 @@ impl ClientEntity { let mut builder = MigratorBuilder::new(self.client.clone()); - let execution_order = match params.read_execution_order.as_str() { - "serial" => ExecutionOrder::Serial, - "random" => ExecutionOrder::Random, - _ => ExecutionOrder::Parallel, - }; - let old_endpoint = params.old_endpoint.clone(); - - let new_endpoint = params.new_endpoint.clone(); - builder = builder - .read_execution_order(execution_order) + .read_execution_order(params.read_execution_order) .track_errors(params.track_errors) .track_latency(params.track_latency) .read( |payload: &Option| { - let old_endpoint = old_endpoint.clone(); + let old_endpoint = params.old_endpoint.clone(); async move { let result = send_payload(&old_endpoint, payload.clone()).await; match result { @@ -268,7 +259,7 @@ impl ClientEntity { .boxed() }, |payload| { - let new_endpoint = new_endpoint.clone(); + let new_endpoint = params.new_endpoint.clone(); async move { let result = send_payload(&new_endpoint, payload.clone()).await; match result { @@ -286,7 +277,7 @@ impl ClientEntity { ) .write( |payload| { - let old_endpoint = old_endpoint.clone(); + let old_endpoint = params.old_endpoint.clone(); async move { let result = send_payload(&old_endpoint, payload.clone()).await; match result { @@ -297,7 +288,7 @@ impl ClientEntity { .boxed() }, |payload| { - let new_endpoint = new_endpoint.clone(); + let new_endpoint = params.new_endpoint.clone(); async move { let result = send_payload(&new_endpoint, payload.clone()).await; match result { @@ -309,7 +300,7 @@ impl ClientEntity { }, ); - let migrator = builder.build().expect("builder failed"); + let mut migrator = builder.build().expect("builder failed"); match params.operation { launchdarkly_server_sdk::Operation::Read => { let result = migrator @@ -349,7 +340,10 @@ impl ClientEntity { MigrationOperationResponse { result: payload }, ))) } - _ => todo!(), + _ => Err(format!( + "Invalid operation requested: {:?}", + params.operation + )), } } command => Err(format!("Invalid command requested: {}", command)), diff --git a/contract-tests/src/command_params.rs b/contract-tests/src/command_params.rs index fded244..a02819d 100644 --- a/contract-tests/src/command_params.rs +++ b/contract-tests/src/command_params.rs @@ -1,5 +1,5 @@ use launchdarkly_server_sdk::{ - AttributeValue, Context, FlagDetail, FlagValue, Operation, Reason, Stage, + AttributeValue, Context, ExecutionOrder, FlagDetail, FlagValue, 
Operation, Reason, Stage, }; use serde::{self, Deserialize, Serialize}; use std::collections::HashMap; @@ -157,7 +157,7 @@ pub struct MigrationOperationParams { pub key: String, pub context: Context, pub default_stage: Stage, - pub read_execution_order: String, + pub read_execution_order: ExecutionOrder, pub operation: Operation, pub old_endpoint: String, pub new_endpoint: String, diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index ff348d5..3f9a435 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -1741,7 +1741,7 @@ mod tests { ) .expect("patch should apply"); - let migrator = MigratorBuilder::new(client.clone()) + let mut migrator = MigratorBuilder::new(client.clone()) .read( |_| async move { Ok(serde_json::Value::Null) }.boxed(), |_| async move { Ok(serde_json::Value::Null) }.boxed(), @@ -1855,7 +1855,7 @@ mod tests { ) .expect("patch should apply"); - let migrator = MigratorBuilder::new(client.clone()) + let mut migrator = MigratorBuilder::new(client.clone()) .track_latency(true) .read( |_| { @@ -1957,7 +1957,7 @@ mod tests { ) .expect("patch should apply"); - let migrator = MigratorBuilder::new(client.clone()) + let mut migrator = MigratorBuilder::new(client.clone()) .track_latency(true) .read( |_| async move { Err("fail".into()) }.boxed(), @@ -2202,7 +2202,7 @@ mod tests { ) .expect("patch should apply"); - let migrator = MigratorBuilder::new(client.clone()) + let mut migrator = MigratorBuilder::new(client.clone()) .track_latency(true) .read( |_| { diff --git a/launchdarkly-server-sdk/src/events/event.rs b/launchdarkly-server-sdk/src/events/event.rs index d1679cf..4a19eb5 100644 --- a/launchdarkly-server-sdk/src/events/event.rs +++ b/launchdarkly-server-sdk/src/events/event.rs @@ -95,7 +95,7 @@ impl BaseEvent { } } -/// MigrationOpEventData is generated through the migration op tracker provided through the SDK. +/// A MigrationOpEvent is generated through the migration op tracker provided through the SDK. #[derive(Clone, Debug)] pub struct MigrationOpEvent { pub(crate) base: BaseEvent, @@ -131,21 +131,17 @@ impl Serialize for MigrationOpEvent { key: self.key.clone(), value: self.evaluation.value, default: self.default_stage, - // QUESTION: In the ruby implementation, this can be nil. Why not here? 
reason: self.evaluation.reason.clone(), variation_index: self.evaluation.variation_index, version: self.version, }; state.serialize_field("evaluation", &evaluation)?; - // TODO: Add sampling here if it is set and not 1 - let mut measurements = vec![]; if !self.invoked.is_empty() { measurements.push(MigrationOpMeasurement::Invoked(&self.invoked)); } - // TODO: There is something here to do with consistency check ratio if let Some(consistency_check) = self.consistency_check { measurements.push(MigrationOpMeasurement::ConsistencyCheck( consistency_check, diff --git a/launchdarkly-server-sdk/src/lib.rs b/launchdarkly-server-sdk/src/lib.rs index 8f42967..06c791f 100644 --- a/launchdarkly-server-sdk/src/lib.rs +++ b/launchdarkly-server-sdk/src/lib.rs @@ -42,7 +42,7 @@ pub use feature_requester_builders::{ }; pub use launchdarkly_server_sdk_evaluation::{Flag, Segment, Versioned}; pub use migrations::{ - ExecutionOrder, MigrationOpTracker, MigratorBuilder, Operation, Origin, Stage, + ExecutionOrder, MigrationOpTracker, Migrator, MigratorBuilder, Operation, Origin, Stage, }; pub use service_endpoints::ServiceEndpointsBuilder; pub use stores::persistent_store::{PersistentDataStore, PersistentStoreError}; diff --git a/launchdarkly-server-sdk/src/migrations/migrator.rs b/launchdarkly-server-sdk/src/migrations/migrator.rs index f9cb57d..56297d4 100644 --- a/launchdarkly-server-sdk/src/migrations/migrator.rs +++ b/launchdarkly-server-sdk/src/migrations/migrator.rs @@ -1,6 +1,3 @@ -// TODO: Remove this when subsequent PRs have added the required implementations. -#![allow(dead_code)] - use std::sync::Arc; use std::sync::Mutex; use std::time::Instant; @@ -9,8 +6,11 @@ use futures::future::join_all; use futures::future::BoxFuture; use futures::future::FutureExt; use launchdarkly_server_sdk_evaluation::Context; +use rand::thread_rng; use serde::Serialize; +use crate::sampler::Sampler; +use crate::sampler::ThreadRngSampler; use crate::{Client, ExecutionOrder, MigrationOpTracker, Operation, Origin, Stage}; #[derive(Serialize)] @@ -54,27 +54,6 @@ where compare: Option>, } -/// Migrator represents the interface through which migration support is executed. -// pub trait Migrator { -// /// read uses the provided flag key and context to execute a migration-backed read operation. -// fn read( -// &self, -// key: String, -// context: Context, -// default_stage: Stage, -// payload: serde_json::Value, -// ) -> impl Future; -// -// /// write uses the provided flag key and context to execute a migration-backed write operation. -// fn write( -// &self, -// key: String, -// context: Context, -// default_stage: Stage, -// payload: serde_json::Value, -// ) -> impl Future; -// } - /// The migration builder is used to configure and construct an instance of a [Migrator]. This /// migrator can be used to perform LaunchDarkly assisted technology migrations through the use of /// migration-based feature flags. @@ -107,7 +86,7 @@ where pub fn new(client: Arc) -> Self { MigratorBuilder { client, - read_execution_order: ExecutionOrder::Parallel, + read_execution_order: ExecutionOrder::Concurrent, measure_latency: true, measure_errors: true, read_config: None, @@ -115,7 +94,7 @@ where } } - /// The read execution order influences the parallelism and execution order for read operations + /// The read execution order influences the concurrency and execution order for read operations /// involving multiple origins. 
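// Illustrative sketch, not from this patch: one way application code might map
// its own configuration onto the execution orders described above. The function
// name and the configuration strings are assumptions made for the example; only
// the ExecutionOrder variants come from the SDK.
use launchdarkly_server_sdk::ExecutionOrder;

fn execution_order_from_config(name: &str) -> ExecutionOrder {
    match name {
        // Serial finishes the authoritative read before starting the other one.
        "serial" => ExecutionOrder::Serial,
        // Random decides per call which origin is read first.
        "random" => ExecutionOrder::Random,
        // Concurrent starts both reads together and waits for both to finish.
        _ => ExecutionOrder::Concurrent,
    }
}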
pub fn read_execution_order(mut self, order: ExecutionOrder) -> Self { self.read_execution_order = order; @@ -150,7 +129,7 @@ where } /// Write can be used to configure the migration-write behavior of the resulting - /// [Migrations::Migrator] instance. + /// [crate::Migrator] instance. /// /// Users are required to provide two different write methods -- one to write to the old /// migration origin, and one to write to the new origin. Not every stage requires @@ -165,9 +144,7 @@ where self } - // TODO: Do we make this a real error type? - - /// Build constructs a [Migrations::Migrator] instance to support migration-based reads and + /// Build constructs a [crate::Migrator] instance to support migration-based reads and /// writes. A string describing any failure conditions will be returned if the build fails. pub fn build(self) -> Result, String> { let read_config = self.read_config.ok_or("read configuration not provided")?; @@ -186,6 +163,9 @@ where } } +/// The migrator is the primary interface for executing migration operations. It is configured +/// through the [MigratorBuilder] and can be used to perform LaunchDarkly assisted technology +/// migrations through the use of migration-based feature flags. pub struct Migrator where T: Send + Sync, @@ -200,6 +180,7 @@ where measure_errors: bool, read_config: MigrationConfig, write_config: MigrationConfig, + sampler: Box, } impl Migrator @@ -225,11 +206,13 @@ where measure_errors, read_config, write_config, + sampler: Box::new(ThreadRngSampler::new(thread_rng())), } } + /// Uses the provided flag key and context to execute a migration-backed read operation. pub async fn read( - &self, + &mut self, key: String, context: Context, default_stage: Stage, @@ -269,6 +252,7 @@ where self.read_config.compare, self.read_execution_order, tracker.clone(), + self.sampler.as_mut(), ) .await } @@ -279,6 +263,7 @@ where self.read_config.compare, self.read_execution_order, tracker.clone(), + self.sampler.as_mut(), ) .await } @@ -291,6 +276,7 @@ where result } + /// Uses the provided flag key and context to execute a migration-backed write operation. 
pub async fn write( &self, key: String, @@ -349,6 +335,7 @@ async fn read_both( compare: Option>, execution_order: ExecutionOrder, tracker: Arc>, + sampler: &mut dyn Sampler, ) -> MigrationOriginResult where T: Send + Sync, @@ -359,7 +346,7 @@ where let nonauthoritative_result: MigrationOriginResult; match execution_order { - ExecutionOrder::Parallel => { + ExecutionOrder::Concurrent => { let auth_handle = authoritative.run().boxed(); let nonauth_handle = nonauthoritative.run().boxed(); let handles = vec![auth_handle, nonauth_handle]; @@ -378,12 +365,11 @@ where result: Err("Failed to execute authoritative read".into()), }); } - // TODO: Add a sampler.sample(2) style call - ExecutionOrder::Random => { + ExecutionOrder::Random if sampler.sample(2) => { nonauthoritative_result = nonauthoritative.run().await; authoritative_result = authoritative.run().await; } - ExecutionOrder::Serial => { + _ => { authoritative_result = authoritative.run().await; nonauthoritative_result = nonauthoritative.run().await; } @@ -530,7 +516,7 @@ mod tests { let (sender, receiver) = mpsc::channel(); let old_sender = sender.clone(); let new_sender = sender.clone(); - let migrator = MigratorBuilder::new(client) + let mut migrator = MigratorBuilder::new(client) .track_latency(false) .track_errors(false) .write( @@ -664,7 +650,7 @@ mod tests { let (sender, receiver) = mpsc::channel(); let old_sender = sender.clone(); let new_sender = sender.clone(); - let migrator = MigratorBuilder::new(client) + let mut migrator = MigratorBuilder::new(client) .track_latency(false) .track_errors(false) .write( @@ -721,7 +707,7 @@ mod tests { } #[tokio::test] - async fn read_handles_parallel_execution() { + async fn read_handles_concurrent_execution() { let config = ConfigBuilder::new("sdk-key") .offline(true) .build() @@ -730,14 +716,14 @@ mod tests { let client = Arc::new(Client::build(config).expect("client failed to build")); client.start_with_default_executor(); - let migrator = MigratorBuilder::new(client) + let mut migrator = MigratorBuilder::new(client) .track_latency(false) .track_errors(false) .write( |_| async move { Ok(()) }.boxed(), |_| async move { Ok(()) }.boxed(), ) - .read_execution_order(ExecutionOrder::Parallel) + .read_execution_order(ExecutionOrder::Concurrent) .read( |_| { async move { @@ -774,12 +760,12 @@ mod tests { } #[tokio::test] - async fn read_handles_nonparallel_execution() { - read_handles_nonparallel_execution_driver(ExecutionOrder::Serial).await; - read_handles_nonparallel_execution_driver(ExecutionOrder::Random).await; + async fn read_handles_nonconcurrent_execution() { + read_handles_nonconcurrent_execution_driver(ExecutionOrder::Serial).await; + read_handles_nonconcurrent_execution_driver(ExecutionOrder::Random).await; } - async fn read_handles_nonparallel_execution_driver(execution_order: ExecutionOrder) { + async fn read_handles_nonconcurrent_execution_driver(execution_order: ExecutionOrder) { let config = ConfigBuilder::new("sdk-key") .offline(true) .build() @@ -788,7 +774,7 @@ mod tests { let client = Arc::new(Client::build(config).expect("client failed to build")); client.start_with_default_executor(); - let migrator = MigratorBuilder::new(client) + let mut migrator = MigratorBuilder::new(client) .track_latency(false) .track_errors(false) .write( @@ -999,7 +985,7 @@ mod tests { #[test_case(ExecutionOrder::Serial)] #[test_case(ExecutionOrder::Random)] - #[test_case(ExecutionOrder::Parallel)] + #[test_case(ExecutionOrder::Concurrent)] fn can_modify_execution_order(execution_order: ExecutionOrder) 
{ let config = ConfigBuilder::new("sdk-key") .offline(true) diff --git a/launchdarkly-server-sdk/src/migrations/mod.rs b/launchdarkly-server-sdk/src/migrations/mod.rs index f83cf8e..91f9541 100644 --- a/launchdarkly-server-sdk/src/migrations/mod.rs +++ b/launchdarkly-server-sdk/src/migrations/mod.rs @@ -88,7 +88,7 @@ impl TryFrom for Stage { } #[non_exhaustive] -#[derive(Debug, Copy, Clone, Serialize)] +#[derive(Debug, Copy, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] /// ExecutionOrder represents the various execution modes this SDK can operate under while /// performing migration-assisted reads. @@ -98,11 +98,12 @@ pub enum ExecutionOrder { Serial, /// Random execution randomly decides if the authoritative read should execute first or second. Random, - /// Parallel executes both reads in separate threads, and waits until both calls have - /// finished before proceeding. - Parallel, + /// Concurrent executes both concurrently, waiting until both calls have finished before + /// proceeding. + Concurrent, } +pub use migrator::Migrator; pub use migrator::MigratorBuilder; pub use tracker::MigrationOpTracker; diff --git a/launchdarkly-server-sdk/src/migrations/tracker.rs b/launchdarkly-server-sdk/src/migrations/tracker.rs index c763b9b..8fdd369 100644 --- a/launchdarkly-server-sdk/src/migrations/tracker.rs +++ b/launchdarkly-server-sdk/src/migrations/tracker.rs @@ -14,8 +14,8 @@ use crate::{ use super::{Operation, Origin, Stage}; -/// An MigrationOpTracker is responsible for managing the collection of measurements that which a user might wish to record -/// throughout a migration-assisted operation. +/// A MigrationOpTracker is responsible for managing the collection of measurements that a user +/// might wish to record throughout a migration-assisted operation. /// /// Example measurements include latency, errors, and consistency. 
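// Illustrative sketch, not from this patch: the kind of per-origin measurements
// the tracker described above collects. The helper name and the five-millisecond
// latency are made up; the method names are taken from this file, with the
// `latency` signature inferred from the patch.
use launchdarkly_server_sdk::{MigrationOpTracker, Origin};
use std::time::Duration;

fn record_old_origin_measurements(tracker: &mut MigrationOpTracker) {
    // Record that the old origin was invoked at all.
    tracker.invoked(Origin::Old);
    // Record how long the old origin call took.
    tracker.latency(Origin::Old, Duration::from_millis(5));
    // Record that the old origin call reported an error.
    tracker.error(Origin::Old);
}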
pub struct MigrationOpTracker { From 7a4b6d75ba1c1651db2dce904b73d64748a662ac Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 18 Jun 2024 00:17:35 -0400 Subject: [PATCH 2/6] Borrow context and change parameter order --- contract-tests/src/client_entity.rs | 4 +- launchdarkly-server-sdk/src/client.rs | 16 +++---- .../src/migrations/migrator.rs | 48 +++++++++---------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index 79befcb..25d0d51 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -305,8 +305,8 @@ impl ClientEntity { launchdarkly_server_sdk::Operation::Read => { let result = migrator .read( + ¶ms.context, params.key, - params.context, params.default_stage, params.payload, ) @@ -324,8 +324,8 @@ impl ClientEntity { launchdarkly_server_sdk::Operation::Write => { let result = migrator .write( + ¶ms.context, params.key, - params.context, params.default_stage, params.payload, ) diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index 3f9a435..3ab8db7 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -1761,8 +1761,8 @@ mod tests { if let Operation::Read = operation { migrator .read( + &context, "stage-flag".into(), - context, Stage::Off, serde_json::Value::Null, ) @@ -1770,8 +1770,8 @@ mod tests { } else { migrator .write( + &context, "stage-flag".into(), - context, Stage::Off, serde_json::Value::Null, ) @@ -1900,8 +1900,8 @@ mod tests { if let Operation::Read = operation { migrator .read( + &context, "stage-flag".into(), - context, Stage::Off, serde_json::Value::Null, ) @@ -1909,8 +1909,8 @@ mod tests { } else { migrator .write( + &context, "stage-flag".into(), - context, Stage::Off, serde_json::Value::Null, ) @@ -1977,8 +1977,8 @@ mod tests { migrator .read( + &context, "stage-flag".into(), - context, Stage::Off, serde_json::Value::Null, ) @@ -2046,8 +2046,8 @@ mod tests { migrator .write( + &context, "stage-flag".into(), - context, Stage::Off, serde_json::Value::Null, ) @@ -2155,8 +2155,8 @@ mod tests { migrator .write( + &context, "stage-flag".into(), - context, Stage::Off, serde_json::Value::Null, ) @@ -2246,8 +2246,8 @@ mod tests { migrator .read( + &context, "stage-flag".into(), - context, Stage::Off, serde_json::Value::Null, ) diff --git a/launchdarkly-server-sdk/src/migrations/migrator.rs b/launchdarkly-server-sdk/src/migrations/migrator.rs index 56297d4..9bda19f 100644 --- a/launchdarkly-server-sdk/src/migrations/migrator.rs +++ b/launchdarkly-server-sdk/src/migrations/migrator.rs @@ -213,14 +213,14 @@ where /// Uses the provided flag key and context to execute a migration-backed read operation. pub async fn read( &mut self, - key: String, - context: Context, + context: &Context, + flag_key: String, default_stage: Stage, payload: T, ) -> MigrationOriginResult { - let (stage, mut tracker) = self - .client - .migration_variation(&context, &key, default_stage); + let (stage, mut tracker) = + self.client + .migration_variation(context, &flag_key, default_stage); tracker.operation(Operation::Read); let tracker = Arc::new(Mutex::new(tracker)); @@ -279,14 +279,14 @@ where /// Uses the provided flag key and context to execute a migration-backed write operation. 
pub async fn write( &self, - key: String, - context: Context, + context: &Context, + flag_key: String, default_stage: Stage, payload: T, ) -> MigrationWriteResult { - let (stage, mut tracker) = self - .client - .migration_variation(&context, &key, default_stage); + let (stage, mut tracker) = + self.client + .migration_variation(context, &flag_key, default_stage); tracker.operation(Operation::Write); let tracker = Arc::new(Mutex::new(tracker)); @@ -548,10 +548,10 @@ mod tests { let _result = migrator .read( - "migration-key".into(), - ContextBuilder::new("user-key") + &ContextBuilder::new("user-key") .build() .expect("context failed to build"), + "migration-key".into(), crate::Stage::Shadow, 1, ) @@ -608,10 +608,10 @@ mod tests { let _result = migrator .write( - "migration-key".into(), - ContextBuilder::new("user-key") + &ContextBuilder::new("user-key") .build() .expect("context failed to build"), + "migration-key".into(), crate::Stage::Shadow, 1, ) @@ -682,10 +682,10 @@ mod tests { let _result = migrator .read( - "migration-key".into(), - ContextBuilder::new("user-key") + &ContextBuilder::new("user-key") .build() .expect("context failed to build"), + "migration-key".into(), stage, "payload", ) @@ -747,10 +747,10 @@ mod tests { let start = Instant::now(); let _result = migrator .read( - "migration-key".into(), - ContextBuilder::new("user-key") + &ContextBuilder::new("user-key") .build() .expect("context failed to build"), + "migration-key".into(), crate::Stage::Shadow, (), ) @@ -805,10 +805,10 @@ mod tests { let start = Instant::now(); let _result = migrator .read( - "migration-key".into(), - ContextBuilder::new("user-key") + &ContextBuilder::new("user-key") .build() .expect("context failed to build"), + "migration-key".into(), crate::Stage::Shadow, (), ) @@ -874,10 +874,10 @@ mod tests { let _result = migrator .write( - "migration-key".into(), - ContextBuilder::new("user-key") + &ContextBuilder::new("user-key") .build() .expect("context failed to build"), + "migration-key".into(), stage, (), ) @@ -959,10 +959,10 @@ mod tests { let _result = migrator .write( - "migration-key".into(), - ContextBuilder::new("user-key") + &ContextBuilder::new("user-key") .build() .expect("context failed to build"), + "migration-key".into(), stage, (), ) From 70aaf0be85db66d6b9d3b8de94b4a43bf0b92050 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 20 Jun 2024 10:22:46 -0400 Subject: [PATCH 3/6] Payload and return can vary in type --- launchdarkly-server-sdk/src/client.rs | 2 +- .../src/migrations/migrator.rs | 129 ++++++++++-------- 2 files changed, 74 insertions(+), 57 deletions(-) diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index 3ab8db7..779e982 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -1962,7 +1962,7 @@ mod tests { .read( |_| async move { Err("fail".into()) }.boxed(), |_| async move { Err("fail".into()) }.boxed(), - Some(|_, _| true), + Some(|_: &String, _: &String| true), ) .write( |_| async move { Err("fail".into()) }.boxed(), diff --git a/launchdarkly-server-sdk/src/migrations/migrator.rs b/launchdarkly-server-sdk/src/migrations/migrator.rs index 9bda19f..8e49b8f 100644 --- a/launchdarkly-server-sdk/src/migrations/migrator.rs +++ b/launchdarkly-server-sdk/src/migrations/migrator.rs @@ -43,44 +43,49 @@ pub struct MigrationWriteResult { // provided results are equal, this method will return true and false otherwise. 
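// Illustrative sketch, not from this patch: a comparison function of this shape
// simply reports whether the two origins returned the same result. The name
// `results_match` and the use of `String` as the result type are assumptions.
fn results_match(old_result: &String, new_result: &String) -> bool {
    old_result == new_result
}
// It would be supplied as the optional third argument to `MigratorBuilder::read`,
// e.g. `.read(read_old, read_new, Some(results_match))`, and is only consulted
// when a stage causes both origins to be read.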
 type MigrationComparisonFn<T> = fn(&T, &T) -> bool;
-struct MigrationConfig<T, FO, FN>
+struct MigrationConfig<P, T, FO, FN>
 where
+    P: Send + Sync,
     T: Send + Sync,
-    FO: Fn(&T) -> BoxFuture> + Sync + Send,
-    FN: Fn(&T) -> BoxFuture> + Sync + Send,
+    FO: Fn(&P) -> BoxFuture> + Sync + Send,
+    FN: Fn(&P) -> BoxFuture> + Sync + Send,
 {
     old: FO,
     new: FN,
     compare: Option<MigrationComparisonFn<T>>,
+
+    _p: std::marker::PhantomData<P>
, } /// The migration builder is used to configure and construct an instance of a [Migrator]. This /// migrator can be used to perform LaunchDarkly assisted technology migrations through the use of /// migration-based feature flags. -pub struct MigratorBuilder +pub struct MigratorBuilder where + P: Send + Sync, T: Send + Sync, - FRO: Fn(&T) -> BoxFuture> + Sync + Send, - FRN: Fn(&T) -> BoxFuture> + Sync + Send, - FWO: Fn(&T) -> BoxFuture> + Sync + Send, - FWN: Fn(&T) -> BoxFuture> + Sync + Send, + FRO: Fn(&P) -> BoxFuture> + Sync + Send, + FRN: Fn(&P) -> BoxFuture> + Sync + Send, + FWO: Fn(&P) -> BoxFuture> + Sync + Send, + FWN: Fn(&P) -> BoxFuture> + Sync + Send, { client: Arc, read_execution_order: ExecutionOrder, measure_latency: bool, measure_errors: bool, - read_config: Option>, - write_config: Option>, + read_config: Option>, + write_config: Option>, } -impl MigratorBuilder +impl MigratorBuilder where + P: Send + Sync, T: Send + Sync, - FRO: Fn(&T) -> BoxFuture> + Sync + Send, - FRN: Fn(&T) -> BoxFuture> + Sync + Send, - FWO: Fn(&T) -> BoxFuture> + Sync + Send, - FWN: Fn(&T) -> BoxFuture> + Sync + Send, + FRO: Fn(&P) -> BoxFuture> + Sync + Send, + FRN: Fn(&P) -> BoxFuture> + Sync + Send, + FWO: Fn(&P) -> BoxFuture> + Sync + Send, + FWN: Fn(&P) -> BoxFuture> + Sync + Send, { /// Create a new migrator builder instance with the provided client. pub fn new(client: Arc) -> Self { @@ -124,7 +129,12 @@ where /// /// Depending on the migration stage, one or both of these read methods may be called. pub fn read(mut self, old: FRO, new: FRN, compare: Option>) -> Self { - self.read_config = Some(MigrationConfig { old, new, compare }); + self.read_config = Some(MigrationConfig { + old, + new, + compare, + _p: std::marker::PhantomData, + }); self } @@ -140,13 +150,14 @@ where old, new, compare: None, + _p: std::marker::PhantomData, }); self } /// Build constructs a [crate::Migrator] instance to support migration-based reads and /// writes. A string describing any failure conditions will be returned if the build fails. - pub fn build(self) -> Result, String> { + pub fn build(self) -> Result, String> { let read_config = self.read_config.ok_or("read configuration not provided")?; let write_config = self .write_config @@ -166,38 +177,40 @@ where /// The migrator is the primary interface for executing migration operations. It is configured /// through the [MigratorBuilder] and can be used to perform LaunchDarkly assisted technology /// migrations through the use of migration-based feature flags. 
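// Illustrative sketch, not from this patch: wiring the builder and migrator
// together from application code, as the API looks at the end of this patch
// series. It assumes `client` is an already configured and started SDK client;
// the flag key, the String payload and result types, and the trivial handlers
// stand in for real old/new data-store calls.
use futures::future::FutureExt;
use launchdarkly_server_sdk::{Client, ContextBuilder, ExecutionOrder, MigratorBuilder, Stage};
use std::sync::Arc;

async fn example_migration(client: Arc<Client>) {
    let mut migrator = MigratorBuilder::new(client)
        .read_execution_order(ExecutionOrder::Concurrent)
        .track_latency(true)
        .track_errors(true)
        .read(
            // Read handler for the old origin.
            |payload: &String| {
                let payload = payload.clone();
                async move { Ok::<String, String>(payload) }.boxed()
            },
            // Read handler for the new origin.
            |payload: &String| {
                let payload = payload.clone();
                async move { Ok::<String, String>(payload) }.boxed()
            },
            // Optional consistency check, used when both origins are read.
            Some(|old: &String, new: &String| old == new),
        )
        .write(
            // Write handler for the old origin.
            |payload: &String| {
                let payload = payload.clone();
                async move { Ok::<String, String>(payload) }.boxed()
            },
            // Write handler for the new origin.
            |payload: &String| {
                let payload = payload.clone();
                async move { Ok::<String, String>(payload) }.boxed()
            },
        )
        .build()
        .expect("migrator failed to build");

    let context = ContextBuilder::new("example-user")
        .build()
        .expect("context failed to build");

    // After this series the context comes first and is borrowed, followed by
    // the flag key, the default stage, and the payload.
    let _read_result = migrator
        .read(&context, "migration-flag".into(), Stage::Off, "item-123".into())
        .await;
    let _write_result = migrator
        .write(&context, "migration-flag".into(), Stage::Off, "item-123".into())
        .await;
}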
-pub struct Migrator +pub struct Migrator where + P: Send + Sync, T: Send + Sync, - FRO: Fn(&T) -> BoxFuture> + Sync + Send, - FRN: Fn(&T) -> BoxFuture> + Sync + Send, - FWO: Fn(&T) -> BoxFuture> + Sync + Send, - FWN: Fn(&T) -> BoxFuture> + Sync + Send, + FRO: Fn(&P) -> BoxFuture> + Sync + Send, + FRN: Fn(&P) -> BoxFuture> + Sync + Send, + FWO: Fn(&P) -> BoxFuture> + Sync + Send, + FWN: Fn(&P) -> BoxFuture> + Sync + Send, { client: Arc, read_execution_order: ExecutionOrder, measure_latency: bool, measure_errors: bool, - read_config: MigrationConfig, - write_config: MigrationConfig, + read_config: MigrationConfig, + write_config: MigrationConfig, sampler: Box, } -impl Migrator +impl Migrator where + P: Send + Sync, T: Send + Sync, - FRO: Fn(&T) -> BoxFuture> + Sync + Send, - FRN: Fn(&T) -> BoxFuture> + Sync + Send, - FWO: Fn(&T) -> BoxFuture> + Sync + Send, - FWN: Fn(&T) -> BoxFuture> + Sync + Send, + FRO: Fn(&P) -> BoxFuture> + Sync + Send, + FRN: Fn(&P) -> BoxFuture> + Sync + Send, + FWO: Fn(&P) -> BoxFuture> + Sync + Send, + FWN: Fn(&P) -> BoxFuture> + Sync + Send, { fn new( client: Arc, read_execution_order: ExecutionOrder, measure_latency: bool, measure_errors: bool, - read_config: MigrationConfig, - write_config: MigrationConfig, + read_config: MigrationConfig, + write_config: MigrationConfig, ) -> Self { Migrator { client, @@ -216,7 +229,7 @@ where context: &Context, flag_key: String, default_stage: Stage, - payload: T, + payload: P, ) -> MigrationOriginResult { let (stage, mut tracker) = self.client @@ -282,7 +295,7 @@ where context: &Context, flag_key: String, default_stage: Stage, - payload: T, + payload: P, ) -> MigrationWriteResult { let (stage, mut tracker) = self.client @@ -329,18 +342,19 @@ where } } -async fn read_both( - mut authoritative: Executor<'_, T, FA>, - mut nonauthoritative: Executor<'_, T, FB>, +async fn read_both( + mut authoritative: Executor<'_, P, T, FA>, + mut nonauthoritative: Executor<'_, P, T, FB>, compare: Option>, execution_order: ExecutionOrder, tracker: Arc>, sampler: &mut dyn Sampler, ) -> MigrationOriginResult where + P: Send + Sync, T: Send + Sync, - FA: Fn(&T) -> BoxFuture> + Sync + Send, - FB: Fn(&T) -> BoxFuture> + Sync + Send, + FA: Fn(&P) -> BoxFuture> + Sync + Send, + FB: Fn(&P) -> BoxFuture> + Sync + Send, { let authoritative_result: MigrationOriginResult; let nonauthoritative_result: MigrationOriginResult; @@ -391,14 +405,15 @@ where authoritative_result } -async fn write_both( - mut authoritative: Executor<'_, T, FA>, - mut nonauthoritative: Executor<'_, T, FB>, +async fn write_both( + mut authoritative: Executor<'_, P, T, FA>, + mut nonauthoritative: Executor<'_, P, T, FB>, ) -> MigrationWriteResult where + P: Send + Sync, T: Send + Sync, - FA: Fn(&T) -> BoxFuture> + Sync + Send, - FB: Fn(&T) -> BoxFuture> + Sync + Send, + FA: Fn(&P) -> BoxFuture> + Sync + Send, + FB: Fn(&P) -> BoxFuture> + Sync + Send, { let authoritative_result = authoritative.run().await; @@ -417,23 +432,25 @@ where } } -struct Executor<'a, T, F> +struct Executor<'a, P, T, F> where + P: Send + Sync, T: Send + Sync, - F: Fn(&T) -> BoxFuture> + Sync + Send, + F: Fn(&P) -> BoxFuture> + Sync + Send, { origin: Origin, function: &'a F, tracker: Arc>, measure_latency: bool, measure_errors: bool, - payload: &'a T, + payload: &'a P, } -impl<'a, T, F> Executor<'a, T, F> +impl<'a, P, T, F> Executor<'a, P, T, F> where + P: Send + Sync, T: Send + Sync, - F: Fn(&T) -> BoxFuture> + Sync + Send, + F: Fn(&P) -> BoxFuture> + Sync + Send, { async fn run(&mut self) -> MigrationOriginResult 
{ let start = Instant::now(); @@ -490,13 +507,13 @@ mod tests { .track_latency(false) .track_errors(false) .read( - |_| async move { Ok(()) }.boxed(), - |_| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), Some(|_, _| true), ) .write( - |_| async move { Ok(()) }.boxed(), - |_| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), ) .build(); @@ -997,13 +1014,13 @@ mod tests { .track_latency(false) .track_errors(false) .read( - |_| async move { Ok(()) }.boxed(), - |_| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), Some(|_, _| true), ) .write( - |_| async move { Ok(()) }.boxed(), - |_| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), ) .read_execution_order(execution_order) .build(); From 2b4cb3864b4d7b29e38a7ce3f0bfdd9e62ad1b06 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 20 Jun 2024 10:44:15 -0400 Subject: [PATCH 4/6] remove mutex from tracker --- .../src/migrations/tracker.rs | 48 +++---------------- 1 file changed, 6 insertions(+), 42 deletions(-) diff --git a/launchdarkly-server-sdk/src/migrations/tracker.rs b/launchdarkly-server-sdk/src/migrations/tracker.rs index 8fdd369..2ff747f 100644 --- a/launchdarkly-server-sdk/src/migrations/tracker.rs +++ b/launchdarkly-server-sdk/src/migrations/tracker.rs @@ -1,6 +1,5 @@ use std::{ collections::{HashMap, HashSet}, - sync::Mutex, time::Duration, }; @@ -24,8 +23,6 @@ pub struct MigrationOpTracker { context: Context, detail: Detail, default_stage: Stage, - - mutex: Mutex<()>, operation: Option, invoked: HashSet, consistent: Option, @@ -56,7 +53,6 @@ impl MigrationOpTracker { context, detail, default_stage, - mutex: Mutex::new(()), operation: None, invoked: HashSet::new(), consistent: None, @@ -68,22 +64,12 @@ impl MigrationOpTracker { /// Sets the migration related operation associated with these tracking measurements. pub fn operation(&mut self, operation: Operation) { - match self.mutex.lock() { - Ok(_guard) => self.operation = Some(operation), - Err(e) => { - error!("failed to acquire lock for operation tracking: {}", e); - } - } + self.operation = Some(operation); } /// Allows recording which origins were called during a migration. pub fn invoked(&mut self, origin: Origin) { - match self.mutex.lock() { - Ok(_guard) => _ = self.invoked.insert(origin), - Err(e) => { - error!("failed to acquire lock for invocation tracking: {}", e); - } - } + self.invoked.insert(origin); } /// This method accepts a callable which should take no parameters and return a single boolean @@ -92,26 +78,14 @@ impl MigrationOpTracker { /// A callable is provided in case sampling rules do not require consistency checking to run. /// In this case, we can avoid the overhead of a function by not using the callable. pub fn consistent(&mut self, is_consistent: impl Fn() -> bool) { - match self.mutex.lock() { - Ok(_guard) => { - if ThreadRngSampler::new(thread_rng()).sample(self.consistent_ratio.unwrap_or(1)) { - self.consistent = Some(is_consistent()); - } - } - Err(e) => { - error!("failed to acquire lock for consistency tracking: {}", e); - } + if ThreadRngSampler::new(thread_rng()).sample(self.consistent_ratio.unwrap_or(1)) { + self.consistent = Some(is_consistent()); } } /// Allows recording which origins were called during a migration. 
pub fn error(&mut self, origin: Origin) { - match self.mutex.lock() { - Ok(_guard) => _ = self.errors.insert(origin), - Err(e) => { - error!("failed to acquire lock for invocation tracking: {}", e); - } - } + self.errors.insert(origin); } /// Allows tracking the recorded latency for an individual operation. @@ -120,23 +94,13 @@ impl MigrationOpTracker { return; } - match self.mutex.lock() { - Ok(_guard) => _ = self.latencies.insert(origin, latency), - Err(e) => { - error!("failed to acquire lock for latency tracking: {}", e); - } - } + self.latencies.insert(origin, latency); } /// Creates an instance of [crate::MigrationOpEvent]. This event data can be /// provided to the [crate::Client::track_migration_op] method to rely this metric /// information upstream to LaunchDarkly services. pub fn build(&self) -> Result { - let _guard = self - .mutex - .lock() - .map_err(|e| format!("failed to acquire lock for building event: {:?}", e))?; - let operation = self .operation .ok_or_else(|| "operation not provided".to_string())?; From 5543941627e3dc6afea861bef71385a17b32a898 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 20 Jun 2024 10:55:04 -0400 Subject: [PATCH 5/6] Return tracker as arc mutex --- launchdarkly-server-sdk/src/client.rs | 7 +++-- .../src/migrations/migrator.rs | 26 ++++++++++++------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index 779e982..5f9dfc4 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -641,7 +641,7 @@ impl Client { context: &Context, flag_key: &str, default_stage: Stage, - ) -> (Stage, MigrationOpTracker) { + ) -> (Stage, Arc>) { let (detail, flag) = self.variation_internal(context, flag_key, default_stage, &self.events_default); @@ -655,7 +655,10 @@ impl Client { default_stage, ); - (migration_detail.value.unwrap_or(default_stage), tracker) + ( + migration_detail.value.unwrap_or(default_stage), + Arc::new(Mutex::new(tracker)), + ) } /// Reports that a context has performed an event. diff --git a/launchdarkly-server-sdk/src/migrations/migrator.rs b/launchdarkly-server-sdk/src/migrations/migrator.rs index 8e49b8f..f550875 100644 --- a/launchdarkly-server-sdk/src/migrations/migrator.rs +++ b/launchdarkly-server-sdk/src/migrations/migrator.rs @@ -231,12 +231,15 @@ where default_stage: Stage, payload: P, ) -> MigrationOriginResult { - let (stage, mut tracker) = - self.client - .migration_variation(context, &flag_key, default_stage); - tracker.operation(Operation::Read); + let (stage, tracker) = self + .client + .migration_variation(context, &flag_key, default_stage); - let tracker = Arc::new(Mutex::new(tracker)); + if let Ok(mut tracker) = tracker.lock() { + tracker.operation(Operation::Read); + } else { + error!("Failed to acquire tracker lock. Cannot track migration write."); + } let mut old = Executor { origin: Origin::Old, @@ -297,12 +300,15 @@ where default_stage: Stage, payload: P, ) -> MigrationWriteResult { - let (stage, mut tracker) = - self.client - .migration_variation(context, &flag_key, default_stage); - tracker.operation(Operation::Write); + let (stage, tracker) = self + .client + .migration_variation(context, &flag_key, default_stage); - let tracker = Arc::new(Mutex::new(tracker)); + if let Ok(mut tracker) = tracker.lock() { + tracker.operation(Operation::Write); + } else { + error!("Failed to acquire tracker lock. 
Cannot track migration write."); + } let mut old = Executor { origin: Origin::Old, From 15bf18955070cd39057e090bfbdfd663b41f2f55 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 20 Jun 2024 11:10:01 -0400 Subject: [PATCH 6/6] write is mut like read for future proofing --- launchdarkly-server-sdk/src/client.rs | 4 ++-- launchdarkly-server-sdk/src/migrations/migrator.rs | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index 5f9dfc4..43c36ca 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -2029,7 +2029,7 @@ mod tests { ) .expect("patch should apply"); - let migrator = MigratorBuilder::new(client.clone()) + let mut migrator = MigratorBuilder::new(client.clone()) .track_latency(true) .read( |_| async move { Ok(serde_json::Value::Null) }.boxed(), @@ -2120,7 +2120,7 @@ mod tests { ) .expect("patch should apply"); - let migrator = MigratorBuilder::new(client.clone()) + let mut migrator = MigratorBuilder::new(client.clone()) .track_latency(true) .read( |_| async move { Ok(serde_json::Value::Null) }.boxed(), diff --git a/launchdarkly-server-sdk/src/migrations/migrator.rs b/launchdarkly-server-sdk/src/migrations/migrator.rs index f550875..4e00268 100644 --- a/launchdarkly-server-sdk/src/migrations/migrator.rs +++ b/launchdarkly-server-sdk/src/migrations/migrator.rs @@ -294,7 +294,7 @@ where /// Uses the provided flag key and context to execute a migration-backed write operation. pub async fn write( - &self, + &mut self, context: &Context, flag_key: String, default_stage: Stage, @@ -600,7 +600,7 @@ mod tests { let (sender, receiver) = mpsc::channel(); let old_sender = sender.clone(); let new_sender = sender.clone(); - let migrator = MigratorBuilder::new(client) + let mut migrator = MigratorBuilder::new(client) .track_latency(false) .track_errors(false) .read( @@ -866,7 +866,7 @@ mod tests { let (sender, receiver) = mpsc::channel(); let old_sender = sender.clone(); let new_sender = sender.clone(); - let migrator = MigratorBuilder::new(client) + let mut migrator = MigratorBuilder::new(client) .track_latency(false) .track_errors(false) .read( @@ -951,7 +951,7 @@ mod tests { let (sender, receiver) = mpsc::channel(); let old_sender = sender.clone(); let new_sender = sender.clone(); - let migrator = MigratorBuilder::new(client) + let mut migrator = MigratorBuilder::new(client) .track_latency(false) .track_errors(false) .read(