Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: Introduce a streaming way to encode BodyChunkIpld. Fixes #498
Browse files Browse the repository at this point in the history
* Introduce a `Scratch` trait, for `Storage` providers to provide a
  temporary read/write space that does not persist.
* Introduce streaming `BodyChunkIpld::encode` and
  `BodyChunkIpld::decode` methods.
* Streaming mechanisms store data in memory until a threshold is hit,
  which then stores to disk.
* Mark non-streaming functions `BodyChunkIpld::load_all_bytes` and
  `BodyChunkIpld::store_bytes` as deprecated.
* Remove `BodyChunkDecoder` (now implemented as `BodyChunkIpld::decode`.
* Promote `bytes` to a workspace dependency.
  • Loading branch information
jsantell committed Aug 15, 2023
1 parent de7d260 commit 7c557f2
Show file tree
Hide file tree
Showing 29 changed files with 873 additions and 94 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ resolver = "2"
anyhow = { version = "1" }
async-stream = { version = "0.3" }
axum = { version = "^0.6.18" }
bytes = { version = "1" }
cid = { version = "0.10" }
directories = { version = "5" }
fastcdc = { version = "3.1" }
Expand All @@ -31,6 +32,7 @@ libipld = { version = "0.16" }
libipld-core = { version = "0.16" }
libipld-cbor = { version = "0.16" }
pathdiff = { version = "0.2.1" }
rand = { version = "0.8.5" }
sentry-tracing = { version = "0.31.5" }
serde = { version = "^1" }
serde_json = { version = "^1" }
Expand Down
10 changes: 6 additions & 4 deletions rust/noosphere-cli/src/native/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result};
use cid::Cid;
use globset::{Glob, GlobSet, GlobSetBuilder};
use noosphere_core::data::{BodyChunkIpld, ContentType};
use noosphere_storage::{BlockStore, MemoryStore};
use noosphere_storage::{BlockStore, MemoryStore, Scratch};
use pathdiff::diff_paths;
use std::collections::{BTreeMap, BTreeSet};
use subtext::util::to_slug;
Expand Down Expand Up @@ -84,7 +84,10 @@ impl Content {
/// provided store.
// TODO(#556): This is slow; we could probably do a concurrent traversal
// similar to how we traverse when rendering files to disk
pub async fn read_all<S: BlockStore>(paths: &SpherePaths, store: &mut S) -> Result<Content> {
pub async fn read_all<S: BlockStore + Scratch>(
paths: &SpherePaths,
store: &mut S,
) -> Result<Content> {
let root_path = paths.root();
let mut directories = vec![(None, tokio::fs::read_dir(root_path).await?)];

Expand Down Expand Up @@ -144,7 +147,7 @@ impl Content {
};

let file_bytes = fs::read(path).await?;
let body_cid = BodyChunkIpld::store_bytes(&file_bytes, store).await?;
let body_cid = BodyChunkIpld::encode(file_bytes.as_ref(), store).await?;

content.matched.insert(
slug,
Expand Down Expand Up @@ -172,7 +175,6 @@ impl Content {
let mut new_blocks = MemoryStore::default();
let file_content =
Content::read_all(workspace.require_sphere_paths()?, &mut new_blocks).await?;

let sphere_context = workspace.sphere_context().await?;
let walker = SphereWalker::from(&sphere_context);

Expand Down
6 changes: 4 additions & 2 deletions rust/noosphere-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ sentry = ["dep:sentry-tracing"]
helpers = []

[dependencies]
bytes = { workspace = true }
tempfile = { workspace = true }
tracing = { workspace = true }
cid = { workspace = true }
url = { workspace = true }
Expand All @@ -34,14 +36,14 @@ async-stream = { workspace = true }
async-once-cell = "~0.4"
anyhow = { workspace = true }
thiserror = { workspace = true }
fastcdc = { workspace = true }
fastcdc = { workspace = true, features = ["tokio"] }
futures = "~0.3"
serde = { workspace = true }
serde_json = { workspace = true }
byteorder = "^1.4"
base64 = "0.21"
ed25519-zebra = "^3"
rand = "~0.8"
rand = { workspace = true }
once_cell = "^1"
tiny-bip39 = "^1"
tokio-stream = { workspace = true }
Expand Down
154 changes: 149 additions & 5 deletions rust/noosphere-core/src/data/body_chunk.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use crate::stream::reverse_stream;
use anyhow::{anyhow, Result};
use async_stream::try_stream;
use bytes::Bytes;
use cid::Cid;
use fastcdc::v2020::FastCDC;
use fastcdc::v2020::{AsyncStreamCDC, FastCDC};
use libipld_cbor::DagCborCodec;
use noosphere_storage::{BlockStore, Scratch};
use serde::{Deserialize, Serialize};

use noosphere_storage::BlockStore;
use tokio::io::AsyncRead;
use tokio_stream::{Stream, StreamExt};

pub const BODY_CHUNK_MAX_SIZE: u32 = 1024 * 1024; // ~1mb/chunk worst case, ~.5mb/chunk average case
/// Encoding content larger than `CONTENT_STORAGE_MEMORY_LIMIT` will
/// use disk-storage rather than memory storage.
pub const CONTENT_STORAGE_MEMORY_LIMIT: u64 = 1024 * 1024 * 5; // 5mb

/// A body chunk is a simplified flexible byte layout used for linking
/// chunks of bytes. This is necessary to support cases when body contents
Expand All @@ -21,7 +28,7 @@ pub struct BodyChunkIpld {
}

impl BodyChunkIpld {
// TODO(#498): Re-write to address potentially unbounded memory overhead
#[deprecated(note = "Use `BodyChunkIpld::encode` instead for a streaming interface")]
pub async fn store_bytes<S: BlockStore>(bytes: &[u8], store: &mut S) -> Result<Cid> {
let chunks = FastCDC::new(
bytes,
Expand Down Expand Up @@ -56,7 +63,7 @@ impl BodyChunkIpld {
next_chunk_cid.ok_or_else(|| anyhow!("No CID; did you try to store zero bytes?"))
}

// TODO(#498): Re-write to address potentially unbounded memory overhead
#[deprecated(note = "Use `BodyChunkIpld::decode` instead for a streaming interface")]
pub async fn load_all_bytes<S: BlockStore>(&self, store: &S) -> Result<Vec<u8>> {
let mut all_bytes = self.bytes.clone();
let mut next_cid = self.next;
Expand All @@ -70,4 +77,141 @@ impl BodyChunkIpld {

Ok(all_bytes)
}

/// Encode `content` as a [BodyChunkIpld] chain in streaming fashion,
/// returning a [Stream] that yields a [Cid] and [BodyChunkIpld] tuple
/// in reverse order.
pub async fn encode_streaming<'a, R, S>(
content: R,
store: &'a S,
) -> impl Stream<Item = Result<(Cid, BodyChunkIpld), anyhow::Error>> + Unpin + 'a
where
R: AsyncRead + Unpin + 'a,
S: Scratch + BlockStore,
{
Box::pin(try_stream! {
let mut chunker = AsyncStreamCDC::new(
content,
fastcdc::v2020::MINIMUM_MIN,
BODY_CHUNK_MAX_SIZE / 2,
BODY_CHUNK_MAX_SIZE,
);
let stream = chunker.as_stream().map(|chunk_data| chunk_data.and_then(|chunk_data| Ok(chunk_data.data)));
let stream = reverse_stream(stream, store, CONTENT_STORAGE_MEMORY_LIMIT);
tokio::pin!(stream);

let mut store = store.to_owned();
let mut next_chunk_cid = None;
while let Some(chunk) = stream.try_next().await? {
let chunk = BodyChunkIpld {
bytes: chunk,
next: next_chunk_cid,
};

let cid = store.save::<DagCborCodec, _>(&chunk).await?;
yield (cid, chunk);
next_chunk_cid = Some(cid);
}
})
}

/// Encode `content` as a [BodyChunkIpld] chain in streaming fashion,
/// returning the root [Cid] upon completion.
pub async fn encode<R, S>(content: R, store: &S) -> Result<Cid>
where
R: AsyncRead + Unpin,
S: Scratch + BlockStore,
{
let stream = BodyChunkIpld::encode_streaming(content, store).await;
tokio::pin!(stream);

let mut head_cid = None;
while let Some((cid, _)) = stream.try_next().await? {
head_cid = Some(cid);
}
match head_cid {
Some(cid) => Ok(cid),
None => Err(anyhow!("Could not encode empty buffer.")),
}
}

/// Decode a [BodyChunkIpld] chain via [Cid] into a [Bytes] stream.
pub fn decode<S>(
cid: &Cid,
store: &S,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Unpin
where
S: BlockStore,
{
let mut next = Some(*cid);
let store = store.clone();
Box::pin(try_stream! {
while let Some(cid) = next {
debug!("Unpacking block {}...", cid);
let chunk = store.load::<DagCborCodec, BodyChunkIpld>(&cid).await.map_err(|error| {
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string())
})?;
yield Bytes::from(chunk.bytes);
next = chunk.next;
}
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use noosphere_storage::{helpers::make_disposable_storage, MemoryStore, SphereDb};
use tokio_stream::StreamExt;

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::wasm_bindgen_test;
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn it_reads_and_writes_chunks_nonstreaming() -> Result<()> {
let mut store = MemoryStore::default();
let chunk1 = vec![1; BODY_CHUNK_MAX_SIZE.try_into().unwrap()];
let chunk2 = vec![2; BODY_CHUNK_MAX_SIZE.try_into().unwrap()];
let chunk3 = vec![3; <u32 as TryInto::<usize>>::try_into(BODY_CHUNK_MAX_SIZE).unwrap() / 2];
let bytes = [chunk1.clone(), chunk2.clone(), chunk3.clone()].concat();

#[allow(deprecated)]
let cid = BodyChunkIpld::store_bytes(&bytes, &mut store).await?;
let ipld_chunk = store.load::<DagCborCodec, BodyChunkIpld>(&cid).await?;

#[allow(deprecated)]
let output = ipld_chunk.load_all_bytes(&store).await?;
assert_eq!(output, bytes);
Ok(())
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn it_reads_and_writes_chunks_streaming() -> Result<()> {
let provider = make_disposable_storage().await?;
let db = SphereDb::new(&provider).await?;
let mut chunks = vec![];
for n in 1..10 {
let mut chunk: Vec<u8> = vec![0; BODY_CHUNK_MAX_SIZE.try_into().unwrap()];
chunk.fill(n);
chunks.push(chunk);
}
let bytes = chunks.concat();
assert!(bytes.len() as u64 > CONTENT_STORAGE_MEMORY_LIMIT);

let cid = BodyChunkIpld::encode(bytes.as_ref(), &db).await?;

let stream = BodyChunkIpld::decode(&cid, &db);
drop(db);
tokio::pin!(stream);
let mut output: Vec<Vec<u8>> = vec![];
while let Some(chunk) = stream.try_next().await? {
output.push(chunk.into());
}
assert_eq!(output.concat(), bytes);
Ok(())
}
}
1 change: 1 addition & 0 deletions rust/noosphere-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod view;

pub mod error;
pub mod tracing;
pub mod stream;

#[cfg(any(test, feature = "helpers"))]
pub mod helpers;
Loading

0 comments on commit 7c557f2

Please sign in to comment.