Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: Introduce a streaming way to encode BodyChunkIpld. Fixes #498
Browse files Browse the repository at this point in the history
* Introduce a `Scratch` trait, for `Storage` providers to provide a
  temporary read/write space that does not persist.
* Introduce streaming `BodyChunkIpld::encode` and
  `BodyChunkIpld::decode` methods.
* Streaming mechanisms store data in memory until a threshold is hit,
  which then stores to disk.
* Mark non-streaming functions `BodyChunkIpld::load_all_bytes` and
  `BodyChunkIpld::store_bytes` as deprecated.
* Remove `BodyChunkDecoder` (now implemented as `BodyChunkIpld::decode`).
* Promote `bytes` to a workspace dependency.
  • Loading branch information
jsantell committed Aug 18, 2023
1 parent de7d260 commit 615dd41
Show file tree
Hide file tree
Showing 31 changed files with 1,059 additions and 98 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ resolver = "2"
anyhow = { version = "1" }
async-stream = { version = "0.3" }
axum = { version = "^0.6.18" }
bytes = { version = "1" }
cid = { version = "0.10" }
directories = { version = "5" }
fastcdc = { version = "3.1" }
Expand All @@ -31,6 +32,7 @@ libipld = { version = "0.16" }
libipld-core = { version = "0.16" }
libipld-cbor = { version = "0.16" }
pathdiff = { version = "0.2.1" }
rand = { version = "0.8.5" }
sentry-tracing = { version = "0.31.5" }
serde = { version = "^1" }
serde_json = { version = "^1" }
Expand All @@ -42,6 +44,7 @@ tempfile = { version = "^3" }
thiserror = { version = "1" }
tokio = { version = "^1" }
tokio-stream = { version = "~0.1" }
tokio-util = { version = "0.7.7" }
tower = { version = "^0.4.13" }
tower-http = { version = "^0.4.3" }
tracing = { version = "0.1" }
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ noosphere-storage = { version = "0.8.1", path = "../noosphere-storage" }
iroh-car = { workspace = true }
reqwest = { version = "0.11.15", default-features = false, features = ["json", "rustls-tls", "stream"] }
tokio-stream = { workspace = true }
tokio-util = "0.7.7"
tokio-util = { workspace = true }

ucan = { workspace = true }
ucan-key-support = { workspace = true }
Expand Down
10 changes: 6 additions & 4 deletions rust/noosphere-cli/src/native/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result};
use cid::Cid;
use globset::{Glob, GlobSet, GlobSetBuilder};
use noosphere_core::data::{BodyChunkIpld, ContentType};
use noosphere_storage::{BlockStore, MemoryStore};
use noosphere_storage::{BlockStore, MemoryStore, Scratch};
use pathdiff::diff_paths;
use std::collections::{BTreeMap, BTreeSet};
use subtext::util::to_slug;
Expand Down Expand Up @@ -84,7 +84,10 @@ impl Content {
/// provided store.
// TODO(#556): This is slow; we could probably do a concurrent traversal
// similar to how we traverse when rendering files to disk
pub async fn read_all<S: BlockStore>(paths: &SpherePaths, store: &mut S) -> Result<Content> {
pub async fn read_all<S: BlockStore + Scratch>(
paths: &SpherePaths,
store: &mut S,
) -> Result<Content> {
let root_path = paths.root();
let mut directories = vec![(None, tokio::fs::read_dir(root_path).await?)];

Expand Down Expand Up @@ -144,7 +147,7 @@ impl Content {
};

let file_bytes = fs::read(path).await?;
let body_cid = BodyChunkIpld::store_bytes(&file_bytes, store).await?;
let body_cid = BodyChunkIpld::encode(file_bytes.as_ref(), store, None).await?;

content.matched.insert(
slug,
Expand Down Expand Up @@ -172,7 +175,6 @@ impl Content {
let mut new_blocks = MemoryStore::default();
let file_content =
Content::read_all(workspace.require_sphere_paths()?, &mut new_blocks).await?;

let sphere_context = workspace.sphere_context().await?;
let walker = SphereWalker::from(&sphere_context);

Expand Down
15 changes: 13 additions & 2 deletions rust/noosphere-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ sentry = ["dep:sentry-tracing"]
helpers = []

[dependencies]
bytes = { workspace = true }
tempfile = { workspace = true }
tracing = { workspace = true }
cid = { workspace = true }
url = { workspace = true }
Expand All @@ -34,14 +36,14 @@ async-stream = { workspace = true }
async-once-cell = "~0.4"
anyhow = { workspace = true }
thiserror = { workspace = true }
fastcdc = { workspace = true }
fastcdc = { workspace = true, features = ["tokio"] }
futures = "~0.3"
serde = { workspace = true }
serde_json = { workspace = true }
byteorder = "^1.4"
base64 = "0.21"
ed25519-zebra = "^3"
rand = "~0.8"
rand = { workspace = true }
once_cell = "^1"
tiny-bip39 = "^1"
tokio-stream = { workspace = true }
Expand All @@ -60,6 +62,7 @@ sentry-tracing = { workspace = true, optional = true }
[dev-dependencies]
wasm-bindgen-test = { workspace = true }
serde_bytes = "~0.11"
tokio-util = { workspace = true, features = ["io-util"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["full"] }
Expand All @@ -71,3 +74,11 @@ getrandom = { version = "~0.2", features = ["js"] }
tokio = { workspace = true, features = ["sync", "macros"] }
console_error_panic_hook = "0.1"
tracing-wasm = "~0.2"

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen = { workspace = true }
web-time = { version = "0.2.0" }

[[example]]
name = "encoding"
path = "examples/encoding.rs"
187 changes: 187 additions & 0 deletions rust/noosphere-core/examples/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
//! WIP on cross-platform benchmarking of our encoder.
//!
//! wasm32 builds use `wasm_bindgen_test` and run as if they
//! were tests, hence the `wasm_bindgen_test` attribute on the
//! benchmark functions. Native builds run as a normal binary.
use async_stream::try_stream;
use bytes::Bytes;
use cid::Cid;
use noosphere_core::data::{BodyChunkIpld, BufferStrategy};
use noosphere_core::tracing::initialize_tracing;
use noosphere_storage::{helpers::make_disposable_storage, SphereDb, Storage};
use std::collections::HashMap;
use tokio::{self, io::AsyncRead};
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;

#[cfg(target_arch = "wasm32")]
use web_time::Instant;

#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;

#[derive(PartialEq, Debug)]
enum BenchmarkPosition {
    /// Marks the beginning of a named time range.
    Start,
    /// Marks the end of a named time range.
    End,
}

/// Simple timer util to record duration of processing.
/// Does not support nested, overlapping, or duplicate time ranges.
struct EncodingBenchmark {
    // Human-readable label for this benchmark run.
    name: String,
    // Ordered log of (Start|End, range name, capture instant) marks.
    timestamps: Vec<(BenchmarkPosition, String, Instant)>,
}

impl EncodingBenchmark {
    /// Creates a new, empty benchmark with the given display name.
    pub fn new(name: &str) -> Self {
        EncodingBenchmark {
            name: name.to_owned(),
            timestamps: vec![],
        }
    }

    /// Returns the benchmark's display name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Records the start of the time range `name`.
    pub fn start(&mut self, name: &str) {
        self.timestamps
            .push((BenchmarkPosition::Start, name.to_owned(), Instant::now()))
    }

    /// Records the end of the time range `name`.
    pub fn end(&mut self, name: &str) {
        self.timestamps
            .push((BenchmarkPosition::End, name.to_owned(), Instant::now()))
    }

    /// Pairs up Start/End marks and returns a map from range name to a
    /// formatted millisecond duration.
    ///
    /// # Errors
    /// Returns an error if the same range name was recorded more than once.
    ///
    /// # Panics
    /// Panics if the recorded marks do not strictly alternate as
    /// Start/End pairs with matching names.
    pub fn results(&self) -> anyhow::Result<HashMap<String, String>> {
        let mut current: Option<&(BenchmarkPosition, String, Instant)> = None;
        let mut results = HashMap::default();
        for timestamp in self.timestamps.iter() {
            if let Some(current_timestamp) = current {
                assert!(timestamp.0 == BenchmarkPosition::End);
                assert_eq!(timestamp.1, current_timestamp.1);
                // Measure from the recorded Start mark to the recorded End
                // mark. (Using `current_timestamp.2.elapsed()` here would
                // instead measure up to the moment `results()` is called,
                // inflating every range by time spent before reporting.)
                let duration = timestamp
                    .2
                    .duration_since(current_timestamp.2)
                    .as_millis();
                if results
                    .insert(timestamp.1.to_owned(), format!("{}ms", duration))
                    .is_some()
                {
                    return Err(anyhow::anyhow!("Duplicate entry for {}", timestamp.1));
                }
                current = None;
            } else {
                assert!(timestamp.0 == BenchmarkPosition::Start);
                current = Some(timestamp);
            }
        }
        Ok(results)
    }
}

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::wasm_bindgen_test;
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[cfg(not(target_arch = "wasm32"))]
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
    initialize_tracing(None);
    // Run each benchmark sequentially; each one logs its own timing
    // results via `tracing` when it completes.
    bench_100_x_1kb().await?;
    bench_500_x_2kb().await?;
    bench_4_x_256kb().await?;
    bench_10_x_1mb().await?;
    bench_10000_x_1kb().await?;
    Ok(())
}

// On wasm32 the benchmark functions run as `wasm_bindgen_test` cases
// instead (see the attributes on each bench function), so `main` only
// initializes tracing.
#[cfg(target_arch = "wasm32")]
pub fn main() {
    initialize_tracing(None);
}

/// Benchmarks encoding/decoding 100 chunks of 1KiB each, fully in memory.
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
async fn bench_100_x_1kb() -> anyhow::Result<()> {
    run_bench("100 x 1kb", 1024, 100, 0).await
}

/// Benchmarks encoding/decoding 500 chunks of 2KiB each, fully in memory.
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
async fn bench_500_x_2kb() -> anyhow::Result<()> {
    run_bench("500 x 2kb", 1024 * 2, 500, 0).await
}

/// Benchmarks encoding/decoding 4 chunks of 256KiB each, fully in memory.
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
async fn bench_4_x_256kb() -> anyhow::Result<()> {
    run_bench("4 x 256kb", 1024 * 256, 4, 0).await
}

/// Benchmarks encoding/decoding 10 chunks of 1MiB each, fully in memory.
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
async fn bench_10_x_1mb() -> anyhow::Result<()> {
    run_bench("10 x 1mb", 1024 * 1024, 10, 0).await
}

/// Benchmarks encoding/decoding 10,000 chunks of 1KiB each, fully in memory.
///
/// Previously the label read "10^3 x 1kb" (i.e. 1,000) while the actual
/// chunk count was `1024 * 10` (10,240) and the function name said
/// 10,000 — three mutually inconsistent values. The count and label now
/// both match the function name.
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
async fn bench_10000_x_1kb() -> anyhow::Result<()> {
    run_bench("10000 x 1kb", 1024, 10_000, 0).await
}

/// Drives one end-to-end benchmark: streams `chunk_count` chunks of
/// `chunk_size` bytes through `BodyChunkIpld::encode` into a disposable
/// store, decodes them back, and logs the measured timings.
async fn run_bench(
    name: &str,
    chunk_size: u32,
    chunk_count: usize,
    memory_limit: u64,
) -> anyhow::Result<()> {
    let mut benchmark = EncodingBenchmark::new(name);
    let storage = make_disposable_storage().await?;
    let db = SphereDb::new(&storage).await?;

    // The total payload must exceed the in-memory limit so that the
    // spill-over behavior is actually exercised.
    let expected_bytes = chunk_size * u32::try_from(chunk_count).unwrap();
    assert!(expected_bytes as u64 > memory_limit);

    let byte_stream = make_stream(chunk_size, chunk_count);
    let content = StreamReader::new(byte_stream);

    benchmark.start("encode");
    let cid = encode_stream(content, &db, memory_limit).await?;
    benchmark.end("encode");

    benchmark.start("decode");
    let decoded_bytes = decode_stream(&cid, &db).await?;
    benchmark.end("decode");

    // Everything that went in must come back out.
    assert_eq!(decoded_bytes, expected_bytes);

    tracing::info!("{}: {:#?}", benchmark.name(), benchmark.results());
    Ok(())
}

fn make_stream<'a>(
chunk_size: u32,
chunk_count: usize,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Unpin + 'a {
Box::pin(try_stream! {
for n in 1..=chunk_count {
let chunk: Vec<u8> = vec![n as u8; <u32 as TryInto<usize>>::try_into(chunk_size).unwrap()];
yield Bytes::from(chunk);
}
})
}

/// Encodes everything read from `content` as a `BodyChunkIpld` chain
/// stored in `db`, keeping at most `memory_limit` bytes buffered in
/// memory (via `BufferStrategy::Limit`) before spilling, and returns
/// the root [`Cid`] of the encoded chain.
async fn encode_stream<S, R>(content: R, db: &SphereDb<S>, memory_limit: u64) -> anyhow::Result<Cid>
where
    R: AsyncRead + Unpin,
    S: Storage,
{
    BodyChunkIpld::encode(content, db, Some(BufferStrategy::Limit(memory_limit))).await
}

/// Streams the `BodyChunkIpld` chain rooted at `cid` out of `db` and
/// returns the total number of bytes decoded.
///
/// NOTE(review): the accumulator is a `u32`, so totals over ~4GiB would
/// overflow in debug builds / wrap in release — fine for these benchmark
/// sizes, but worth confirming if larger payloads are ever used.
async fn decode_stream<S>(cid: &Cid, db: &SphereDb<S>) -> anyhow::Result<u32>
where
    S: Storage,
{
    let stream = BodyChunkIpld::decode(cid, db);
    // `decode` returns an unnamed stream type; pin it on the stack so
    // `try_next` can be called in a loop.
    tokio::pin!(stream);
    let mut bytes_read: u32 = 0;
    while let Some(chunk) = stream.try_next().await? {
        bytes_read += chunk.len() as u32;
    }
    Ok(bytes_read)
}
Loading

0 comments on commit 615dd41

Please sign in to comment.