Skip to content

Commit

Permalink
Merge pull request #66 from HerodotusDev/batching-added-to-sqlite-que…
Browse files Browse the repository at this point in the history
…ries

Batching added to sqlite queries
  • Loading branch information
maaasyn authored Oct 30, 2024
2 parents 0df2e5e + 5c0fe03 commit c82ce0f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "accumulators"
version = "0.4.8"
version = "0.4.9"
edition = "2021"
license-file = "LICENSE"
description = "Complete package of multiple Accumulators with Stores and hashing functions (Hashers)"
Expand Down
74 changes: 51 additions & 23 deletions src/store/stores/sqlite.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use sqlx::Error;
use sqlx::{sqlite::SqliteConnectOptions, Pool, Row, Sqlite, SqlitePool};
use std::cmp::min;
use std::collections::HashMap;
use tokio::sync::Mutex;

Expand All @@ -15,6 +16,10 @@ pub struct SQLiteStore {
db: Mutex<Pool<Sqlite>>,
}

//? SQLite's default maximum number of variables per statement is 999.
//? We use a smaller number to be safe.
const MAX_VARIABLE_NUMBER: usize = 900;

impl SQLiteStore {
pub async fn new(
path: &str,
Expand Down Expand Up @@ -77,23 +82,35 @@ impl Store for SQLiteStore {

async fn get_many(&self, keys: Vec<&str>) -> Result<HashMap<String, String>, StoreError> {
let pool = self.db.lock().await;
let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let query_statement = format!(
"SELECT key, value FROM store WHERE key IN ({})",
placeholders
);
let mut query = sqlx::query(&query_statement);

for key in &keys {
query = query.bind(key);
}

let rows = query.fetch_all(&*pool).await?;
let mut map = HashMap::new();
for row in rows {
let key: String = row.get("key");
let value: String = row.get("value");
map.insert(key, value);

let total_keys = keys.len();
let mut offset = 0;

while offset < total_keys {
let end = min(offset + MAX_VARIABLE_NUMBER, total_keys);
let key_slice = &keys[offset..end];

let placeholders = key_slice.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let query_statement = format!(
"SELECT key, value FROM store WHERE key IN ({})",
placeholders
);

let mut query = sqlx::query(&query_statement);

for key in key_slice {
query = query.bind(*key);
}

let rows = query.fetch_all(&*pool).await?;
for row in rows {
let key: String = row.get("key");
let value: String = row.get("value");
map.insert(key, value);
}

offset = end;
}

Ok(map)
Expand Down Expand Up @@ -138,16 +155,27 @@ impl Store for SQLiteStore {

async fn delete_many(&self, keys: Vec<&str>) -> Result<(), StoreError> {
let pool = self.db.lock().await;
let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");

let query_statement = format!("DELETE FROM store WHERE key IN ({})", placeholders);
let mut query = sqlx::query(&query_statement);
let total_keys = keys.len();
let mut offset = 0;

for key in &keys {
query = query.bind(key);
}
while offset < total_keys {
let end = min(offset + MAX_VARIABLE_NUMBER, total_keys);
let key_slice = &keys[offset..end];

let placeholders = key_slice.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let query_statement = format!("DELETE FROM store WHERE key IN ({})", placeholders);

query.execute(&*pool).await?;
let mut query = sqlx::query(&query_statement);

for key in key_slice {
query = query.bind(*key);
}

query.execute(&*pool).await?;

offset = end;
}

Ok(())
}
Expand Down

0 comments on commit c82ce0f

Please sign in to comment.