From 1f31e1f15fdab91991f82f11ae6822510d5204c7 Mon Sep 17 00:00:00 2001 From: Mark Rattle Date: Wed, 23 Oct 2024 12:04:17 -0400 Subject: [PATCH] fix quiet mode parsing for meta_set --- src/lib.rs | 48 ++++-- src/parser/meta.rs | 21 ++- tests/integration_tests.rs | 302 ++++++++++++++++++------------------- 3 files changed, 208 insertions(+), 163 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fed758c..7ac8be8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -358,8 +358,9 @@ impl Client { K: AsRef<[u8]>, V: AsMemcachedValue, { - let kr = key.as_ref(); + let kr = Self::validate_key_length(key.as_ref())?; let vr = value.as_bytes(); + let mut quiet_mode = false; self.conn.write_all(b"ms ").await?; self.conn.write_all(kr).await?; @@ -371,23 +372,48 @@ impl Client { if let Some(meta_flags) = meta_flags { self.conn.write_all(b" ").await?; self.conn.write_all(meta_flags.join(" ").as_bytes()).await?; + if meta_flags.contains(&"q") { + quiet_mode = true; + } } self.conn.write_all(b"\r\n").await?; self.conn.write_all(vr.as_ref()).await?; self.conn.write_all(b"\r\n").await?; + + if quiet_mode { + self.conn.write_all(b"mn\r\n").await?; + } + self.conn.flush().await?; - match self.drive_receive(parse_meta_set_response).await? { - Response::Status(Status::Stored) => Ok(None), - Response::Status(s) => Err(s.into()), - Response::Data(d) => d - .map(|mut items| { - let item = items.remove(0); - Ok(item) - }) - .transpose(), - _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), + if quiet_mode { + match self.drive_receive(parse_meta_set_response).await? { + Response::Status(Status::NoOp) => { + println!("got Status:: NoOp"); + Ok(None) + } + Response::Status(s) => Err(s.into()), + Response::Data(d) => d + .map(|mut items| { + let item = items.remove(0); + Ok(item) + }) + .transpose(), + _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), + } + } else { + match self.drive_receive(parse_meta_set_response).await? { + Response::Status(Status::Stored) => Ok(None), + Response::Status(s) => Err(s.into()), + Response::Data(d) => d + .map(|mut items| { + let item = items.remove(0); + Ok(item) + }) + .transpose(), + _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), + } } } diff --git a/src/parser/meta.rs b/src/parser/meta.rs index 0fb7fd5..a4eb3bb 100644 --- a/src/parser/meta.rs +++ b/src/parser/meta.rs @@ -152,14 +152,20 @@ fn parse_meta_get_data_value(buf: &[u8]) -> IResult<&[u8], Response> { } fn parse_meta_set_data_value(buf: &[u8]) -> IResult<&[u8], Response> { + println!("buf: {:?}", std::str::from_utf8(buf).unwrap()); + println!("this is triggering before status parsing"); let (input, status) = parse_meta_set_status(buf)?; + println!("this is triggering after status parsing"); match status { // match arm for "HD" response Response::Status(Status::Stored) => { + println!("Status:: Stored this is triggering before meta flag parsing"); // no value (data block) or size in this case, potentially just flags let (input, meta_values_array) = parse_meta_flag_values_as_slice(input) .map_err(|_| nom::Err::Failure(nom::error::Error::new(buf, Fail)))?; + println!("this is triggering before crlf parsing"); let (input, _) = crlf(input)?; // consume the trailing crlf and leave the buffer empty + println!("this is triggering after crlf parsing"); // early return if there were no flags passed in if meta_values_array.is_empty() { @@ -176,11 +182,13 @@ fn parse_meta_set_data_value(buf: &[u8]) -> IResult<&[u8], Response> { } // match arm for "NS" response Response::Status(Status::NotStored) => { + println!("Status:: NotStored this is triggering before meta flag parsing"); // no value (data block) or size in this case, potentially just flags let (input, meta_values_array) = parse_meta_flag_values_as_slice(input) .map_err(|_| nom::Err::Failure(nom::error::Error::new(buf, Fail)))?; + println!("this is triggering before crlf parsing"); let (input, _) = crlf(input)?; // consume the trailing crlf and leave the buffer empty - + println!("this is triggering after crlf parsing"); // early return if there were no flags passed in if meta_values_array.is_empty() { return Ok((input, Response::Status(Status::NotStored))); @@ -196,9 +204,12 @@ fn parse_meta_set_data_value(buf: &[u8]) -> IResult<&[u8], Response> { } // match arm for "EX" response Response::Status(Status::Exists) => { + println!("Status:: Exists this is triggering before meta flag parsing"); // no value (data block) or size in this case, potentially just flags let (input, meta_values_array) = parse_meta_flag_values_as_slice(input)?; + println!("this is triggering before crlf parsing"); let (input, _) = crlf(input)?; // consume the trailing crlf and leave the buffer empty + println!("this is triggering after crlf parsing"); // early return if there were no flags passed in if meta_values_array.is_empty() { @@ -215,9 +226,12 @@ fn parse_meta_set_data_value(buf: &[u8]) -> IResult<&[u8], Response> { } // match arm for "NF" response Response::Status(Status::NotFound) => { + println!("Status:: NotFound this is triggering before meta flag parsing"); // no value (data block) or size in this case, potentially just flags let (input, meta_values_array) = parse_meta_flag_values_as_slice(input)?; + println!("this is triggering before crlf parsing"); let (input, _) = crlf(input)?; // consume the trailing crlf and leave the buffer empty + println!("this is triggering after crlf parsing"); // early return if there were no flags passed in if meta_values_array.is_empty() { @@ -232,6 +246,11 @@ fn parse_meta_set_data_value(buf: &[u8]) -> IResult<&[u8], Response> { Ok((input, Response::Data(Some(vec![value])))) } + Response::Status(Status::NoOp) => { + println!("Status:: NoOp"); + println!("input: {:?}", std::str::from_utf8(input).unwrap()); + Ok((input, Response::Status(Status::NoOp))) + } _ => Err(nom::Err::Error(nom::error::Error::new( input, nom::error::ErrorKind::Eof, diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 1186869..21b150e 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1275,157 +1275,157 @@ async fn test_meta_set_nonexistent_key_in_replace_mode() { ); } -// #[ignore = "Relies on a running memcached server"] -// #[tokio::test] -// #[parallel] -// async fn test_quiet_mode_meta_set() { -// // NOTE: This test should hang for now. -// let key = "quiet-mode-meta-set-test-key"; -// let value = "test-value"; - -// let mut client = setup_client(&[key]).await; - -// let flags = ["k", "q"]; - -// let result = client.meta_set(key, value, Some(&flags)).await; - -// assert!(result.is_ok()); -// } - -// #[ignore = "Relies on a running memcached server"] -// #[tokio::test] -// #[parallel] -// async fn test_quiet_mode_meta_set_with_cas_match_on_key_that_exists() { -// // TODO: This test should hang for now. -// let key = "quiet-mode-meta-set-cas-match-on-key-that-exists-test-key"; -// let value = "test-value"; -// let mut client = setup_client(&[key]).await; - -// // Set the key using meta_set to prepopulate, use E flag to set CAS value -// let meta_flags = ["MS", "E12321", "q"]; -// let set_result = client.meta_set(key, value, Some(&meta_flags)).await; -// assert!( -// set_result.is_ok(), -// "Failed to set key using meta_set: {:?}", -// set_result -// ); - -// let get_flags = ["v", "c"]; -// let get_result = client.meta_get(key, Some(&get_flags)).await.unwrap(); -// assert!(get_result.is_some(), "Key not found after meta_set"); - -// let get_value = get_result.unwrap(); - -// assert_eq!( -// std::str::from_utf8(&get_value.data.unwrap()).unwrap(), -// value -// ); -// assert_eq!(get_value.cas.unwrap(), 12321); - -// // Set the key again using the force-set CAS value via C flag -// let meta_flags = ["C12321", "q"]; -// let new_value = "new-value"; - -// let set_result = client.meta_set(key, new_value, Some(&meta_flags)).await; -// assert!( -// set_result.is_ok(), -// "meta_set should set a new value when C flag token matches existing CAS value" -// ); - -// let get_flags = ["v", "c"]; -// let get_result = client.meta_get(key, Some(&get_flags)).await.unwrap(); -// assert!(get_result.is_some(), "Key not found after meta_set"); - -// let get_value = get_result.unwrap(); -// assert_eq!( -// std::str::from_utf8(&get_value.data.unwrap()).unwrap(), -// new_value -// ); -// // CAS value should be reset to a new atomic counter value -// assert!(get_value.cas.unwrap() != 12321); -// } - -// #[ignore = "Relies on a running memcached server"] -// #[tokio::test] -// #[parallel] -// async fn test_quiet_mode_meta_set_with_cas_semantics_on_nonexistent_key() { -// // NOTE: This test should proceed as normal because an error is returned. -// let key = "quiet-mode-meta-set-cas-semantics-on-nonexistent-key-test-key"; -// let value = "test-value"; -// let mut client = setup_client(&[key]).await; - -// let meta_flags = ["C12321", "q"]; - -// let set_result = client.meta_set(key, value, Some(&meta_flags)).await; -// assert!( -// set_result.is_err(), -// "meta_set should have returned an error for a nonexistent key with CAS semantics" -// ); -// } - -// #[ignore = "Relies on a running memcached server"] -// #[tokio::test] -// #[parallel] -// async fn test_quiet_mode_meta_set_with_cas_mismatch_on_key_that_exists() { -// // TODO: This test proceeds as normal because an error is returned. -// let key = "quiet-mode-meta-set-cas-mismatch-on-key-that-exists-test-key"; -// let value = "test-value"; -// let mut client = setup_client(&[key]).await; - -// // Set the key using meta_set to prepopulate, use E flag to set CAS value -// let meta_flags = ["MS", "E99999"]; -// let set_result = client.meta_set(key, value, Some(&meta_flags)).await; -// assert!( -// set_result.is_ok(), -// "Failed to set key using meta_set: {:?}", -// set_result -// ); - -// let get_flags = ["v", "c"]; -// let get_result = client.meta_get(key, Some(&get_flags)).await.unwrap(); -// assert!(get_result.is_some(), "Key not found after meta_set"); - -// let get_value = get_result.unwrap(); - -// assert_eq!( -// std::str::from_utf8(&get_value.data.unwrap()).unwrap(), -// value -// ); -// assert_eq!(get_value.cas.unwrap(), 99999); - -// // Try to set the key again with a CAS value that doesn't match -// let meta_flags = ["C12321", "q"]; -// let new_value = "new-value"; - -// let set_result = client.meta_set(key, new_value, Some(&meta_flags)).await; -// assert!( -// set_result.is_err(), -// "meta_set should err when C token doesn't match existing CAS value" -// ); -// } - -// #[ignore = "Relies on a running memcached server"] -// #[tokio::test] -// #[parallel] -// async fn test_quiet_mode_meta_set_nonexistent_key_in_replace_mode() { -// // NOTE: This test proceeds as normal because an error is returned. -// let key = "quiet-mode-meta-set-replace-non-existent-key"; -// let original_value = "test-value"; - -// let mut client = setup_client(&[key]).await; - -// let meta_flags = ["MR", "F24", "q"]; - -// // Set the key using meta_set to pre-populate (in replace mode) -// let set_result = client -// .meta_set(key, original_value, Some(&meta_flags)) -// .await; -// assert!( -// set_result.is_err(), -// "Should have received Err(Protocol(NotStored)) but got: {:?}", -// set_result -// ); -// } +#[ignore = "Relies on a running memcached server"] +#[tokio::test] +#[parallel] +async fn test_quiet_mode_meta_set() { + // TODO: This test should hang for now. + let key = "quiet-mode-meta-set-test-key"; + let value = "test-value"; + + let mut client = setup_client(&[key]).await; + + let flags = ["k", "q"]; + + let result = client.meta_set(key, value, Some(&flags)).await; + + assert!(result.is_ok()); +} + +#[ignore = "Relies on a running memcached server"] +#[tokio::test] +#[parallel] +async fn test_quiet_mode_meta_set_with_cas_match_on_key_that_exists() { + // TODO: This test should hang for now. + let key = "quiet-mode-meta-set-cas-match-on-key-that-exists-test-key"; + let value = "test-value"; + let mut client = setup_client(&[key]).await; + + // Set the key using meta_set to prepopulate, use E flag to set CAS value + let meta_flags = ["MS", "E12321", "q"]; + let set_result = client.meta_set(key, value, Some(&meta_flags)).await; + assert!( + set_result.is_ok(), + "Failed to set key using meta_set: {:?}", + set_result + ); + + let get_flags = ["v", "c"]; + let get_result = client.meta_get(key, Some(&get_flags)).await.unwrap(); + assert!(get_result.is_some(), "Key not found after meta_set"); + + let get_value = get_result.unwrap(); + + assert_eq!( + std::str::from_utf8(&get_value.data.unwrap()).unwrap(), + value + ); + assert_eq!(get_value.cas.unwrap(), 12321); + + // Set the key again using the force-set CAS value via C flag + let meta_flags = ["C12321", "q"]; + let new_value = "new-value"; + + let set_result = client.meta_set(key, new_value, Some(&meta_flags)).await; + assert!( + set_result.is_ok(), + "meta_set should set a new value when C flag token matches existing CAS value" + ); + + let get_flags = ["v", "c"]; + let get_result = client.meta_get(key, Some(&get_flags)).await.unwrap(); + assert!(get_result.is_some(), "Key not found after meta_set"); + + let get_value = get_result.unwrap(); + assert_eq!( + std::str::from_utf8(&get_value.data.unwrap()).unwrap(), + new_value + ); + // CAS value should be reset to a new atomic counter value + assert!(get_value.cas.unwrap() != 12321); +} + +#[ignore = "Relies on a running memcached server"] +#[tokio::test] +#[parallel] +async fn test_quiet_mode_meta_set_with_cas_semantics_on_nonexistent_key() { + // NOTE: This test should proceed as normal because an error is returned. + let key = "quiet-mode-meta-set-cas-semantics-on-nonexistent-key-test-key"; + let value = "test-value"; + let mut client = setup_client(&[key]).await; + + let meta_flags = ["C12321", "q"]; + + let set_result = client.meta_set(key, value, Some(&meta_flags)).await; + assert!( + set_result.is_err(), + "meta_set should have returned an error for a nonexistent key with CAS semantics" + ); +} + +#[ignore = "Relies on a running memcached server"] +#[tokio::test] +#[parallel] +async fn test_quiet_mode_meta_set_with_cas_mismatch_on_key_that_exists() { + // TODO: This test proceeds as normal because an error is returned. + let key = "quiet-mode-meta-set-cas-mismatch-on-key-that-exists-test-key"; + let value = "test-value"; + let mut client = setup_client(&[key]).await; + + // Set the key using meta_set to prepopulate, use E flag to set CAS value + let meta_flags = ["MS", "E99999"]; + let set_result = client.meta_set(key, value, Some(&meta_flags)).await; + assert!( + set_result.is_ok(), + "Failed to set key using meta_set: {:?}", + set_result + ); + + let get_flags = ["v", "c"]; + let get_result = client.meta_get(key, Some(&get_flags)).await.unwrap(); + assert!(get_result.is_some(), "Key not found after meta_set"); + + let get_value = get_result.unwrap(); + + assert_eq!( + std::str::from_utf8(&get_value.data.unwrap()).unwrap(), + value + ); + assert_eq!(get_value.cas.unwrap(), 99999); + + // Try to set the key again with a CAS value that doesn't match + let meta_flags = ["C12321", "q"]; + let new_value = "new-value"; + + let set_result = client.meta_set(key, new_value, Some(&meta_flags)).await; + assert!( + set_result.is_err(), + "meta_set should err when C token doesn't match existing CAS value" + ); +} + +#[ignore = "Relies on a running memcached server"] +#[tokio::test] +#[parallel] +async fn test_quiet_mode_meta_set_nonexistent_key_in_replace_mode() { + // NOTE: This test proceeds as normal because an error is returned. + let key = "quiet-mode-meta-set-replace-non-existent-key"; + let original_value = "test-value"; + + let mut client = setup_client(&[key]).await; + + let meta_flags = ["MR", "F24", "q"]; + + // Set the key using meta_set to pre-populate (in replace mode) + let set_result = client + .meta_set(key, original_value, Some(&meta_flags)) + .await; + assert!( + set_result.is_err(), + "Should have received Err(Protocol(NotStored)) but got: {:?}", + set_result + ); +} #[ignore = "Relies on a running memcached server"] #[tokio::test]