Skip to content

Commit

Permalink
Merge pull request #68 from HerodotusDev/develop
Browse files Browse the repository at this point in the history
v0.4.9 - SQLiteStore huge queries support
  • Loading branch information
beeinger authored Oct 30, 2024
2 parents 4d6ca22 + 5a7ceea commit da7a79c
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "accumulators"
version = "0.4.8"
version = "0.4.9"
edition = "2021"
license-file = "LICENSE"
description = "Complete package of multiple Accumulators with Stores and hashing functions (Hashers)"
Expand Down
83 changes: 53 additions & 30 deletions src/store/stores/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub struct SQLiteStore {
db: Mutex<Pool<Sqlite>>,
}

//? SQLite's default maximum number of variables per statement is 999.
//? We use a smaller number to be safe.
const MAX_VARIABLE_NUMBER: usize = 900;

impl SQLiteStore {
pub async fn new(
path: &str,
Expand Down Expand Up @@ -66,7 +70,6 @@ impl Store for SQLiteStore {
.fetch_optional(&*pool)
.await?;

// Extract the value from the row, if it exists
if let Some(row) = row {
let value: String = row.try_get("value")?;
Ok(Some(value))
Expand All @@ -77,23 +80,27 @@ impl Store for SQLiteStore {

async fn get_many(&self, keys: Vec<&str>) -> Result<HashMap<String, String>, StoreError> {
let pool = self.db.lock().await;
let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let query_statement = format!(
"SELECT key, value FROM store WHERE key IN ({})",
placeholders
);
let mut query = sqlx::query(&query_statement);

for key in &keys {
query = query.bind(key);
}

let rows = query.fetch_all(&*pool).await?;
let mut map = HashMap::new();
for row in rows {
let key: String = row.get("key");
let value: String = row.get("value");
map.insert(key, value);

for key_chunk in keys.chunks(MAX_VARIABLE_NUMBER) {
let placeholders = key_chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let query_statement = format!(
"SELECT key, value FROM store WHERE key IN ({})",
placeholders
);

let mut query = sqlx::query(&query_statement);

for key in key_chunk {
query = query.bind(*key);
}

let rows = query.fetch_all(&*pool).await?;
for row in rows {
let key: String = row.get("key");
let value: String = row.get("value");
map.insert(key, value);
}
}

Ok(map)
Expand All @@ -114,12 +121,25 @@ impl Store for SQLiteStore {
let pool = self.db.lock().await;
let mut transaction = pool.begin().await?;

for (key, value) in entries.iter() {
sqlx::query("INSERT OR REPLACE INTO store (key, value) VALUES (?, ?)")
.bind(key)
.bind(value)
.execute(&mut *transaction)
.await?;
for entry_chunk in entries
.iter()
.collect::<Vec<_>>()
.chunks(MAX_VARIABLE_NUMBER)
{
let mut query = String::from("INSERT OR REPLACE INTO store (key, value) VALUES ");
let placeholders = entry_chunk
.iter()
.map(|_| "(?, ?)")
.collect::<Vec<_>>()
.join(", ");
query.push_str(&placeholders);

let mut sqlx_query = sqlx::query(&query);
for (key, value) in entry_chunk {
sqlx_query = sqlx_query.bind(key).bind(value);
}

sqlx_query.execute(&mut *transaction).await?;
}

transaction.commit().await?;
Expand All @@ -138,16 +158,19 @@ impl Store for SQLiteStore {

async fn delete_many(&self, keys: Vec<&str>) -> Result<(), StoreError> {
let pool = self.db.lock().await;
let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");

let query_statement = format!("DELETE FROM store WHERE key IN ({})", placeholders);
let mut query = sqlx::query(&query_statement);
for key_chunk in keys.chunks(MAX_VARIABLE_NUMBER) {
let placeholders = key_chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let query_statement = format!("DELETE FROM store WHERE key IN ({})", placeholders);

for key in &keys {
query = query.bind(key);
}
let mut query = sqlx::query(&query_statement);

for key in key_chunk {
query = query.bind(*key);
}

query.execute(&*pool).await?;
query.execute(&*pool).await?;
}

Ok(())
}
Expand Down
22 changes: 22 additions & 0 deletions tests/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,25 @@ async fn test_get_some_in_store_table() {
.unwrap();
assert_eq!(value.unwrap(), "value1".to_string());
}

#[tokio::test]
async fn test_batch_insertion_and_retrieval() {
let store = InMemoryStore::default();
let store = Arc::new(store);

let mut entries = HashMap::new();
for i in 0..10_000 {
entries.insert(format!("key{}", i), format!("value{}", i));
}

store.set_many(entries.clone()).await.unwrap();

let keys: Vec<_> = entries.keys().map(|k| k.as_str()).collect();
let values = store.get_many(keys).await.unwrap();

for i in 0..10_000 {
let key = format!("key{}", i);
let value = format!("value{}", i);
assert_eq!(values.get(&key), Some(&value));
}
}
25 changes: 25 additions & 0 deletions tests/store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,28 @@ async fn test_get_some_in_store_table() {
.unwrap();
assert_eq!(value.unwrap(), "value1".to_string());
}

#[tokio::test]
async fn test_batch_insertion_and_retrieval() {
let store = SQLiteStore::new(":memory:", None, Some("test"))
.await
.unwrap();

let store = Arc::new(store);

let mut entries = HashMap::new();
for i in 0..10_000 {
entries.insert(format!("key{}", i), format!("value{}", i));
}

store.set_many(entries.clone()).await.unwrap();

let keys: Vec<_> = entries.keys().map(|k| k.as_str()).collect();
let values = store.get_many(keys).await.unwrap();

for i in 0..10_000 {
let key = format!("key{}", i);
let value = format!("value{}", i);
assert_eq!(values.get(&key), Some(&value));
}
}

0 comments on commit da7a79c

Please sign in to comment.