Skip to content

Commit

Permalink
need to add a flag
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju committed Jan 17, 2025
1 parent d62e31a commit cb1cfb8
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 187 deletions.
2 changes: 2 additions & 0 deletions cli/lib/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
worker_type: args.worker_type,
stdio: stdio.clone(),
cache_storage_dir,
use_lsc_cache: false,
strace_ops: shared.options.strace_ops.clone(),
close_on_idle: args.close_on_idle,
maybe_worker_metadata: args.maybe_worker_metadata,
Expand Down Expand Up @@ -462,6 +463,7 @@ impl<TSys: DenoLibSys> LibMainWorkerFactory<TSys> {
should_wait_for_inspector_session: shared.options.inspect_wait,
strace_ops: shared.options.strace_ops.clone(),
cache_storage_dir,
use_lsc_cache: false,
origin_storage_dir,
stdio,
skip_op_registration: shared.options.skip_op_registration,
Expand Down
215 changes: 161 additions & 54 deletions ext/cache/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
// Copyright 2018-2025 the Deno authors. MIT license.

use std::borrow::Cow;
use std::cell::RefCell;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use deno_core::op2;
use deno_core::serde::Deserialize;
use deno_core::serde::Serialize;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::ByteString;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_error::JsErrorBox;
use futures::Stream;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;

mod lsc_shard;
mod lscache;
Expand All @@ -22,6 +30,7 @@ mod sqlite;
pub use lsc_shard::CacheShard;
pub use lscache::LscBackend;
pub use sqlite::SqliteBackedCache;
use tokio_util::io::StreamReader;

#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum CacheError {
Expand Down Expand Up @@ -83,24 +92,21 @@ pub enum CacheError {
}

#[derive(Clone)]
pub struct CreateCache<C: Cache + 'static>(
pub Arc<dyn Fn() -> Result<C, CacheError>>,
);
pub struct CreateCache(pub Arc<dyn Fn() -> Result<CacheImpl, CacheError>>);

deno_core::extension!(deno_cache,
deps = [ deno_webidl, deno_web, deno_url, deno_fetch ],
parameters=[CA: Cache],
ops = [
op_cache_storage_open<CA>,
op_cache_storage_has<CA>,
op_cache_storage_delete<CA>,
op_cache_put<CA>,
op_cache_match<CA>,
op_cache_delete<CA>,
op_cache_storage_open,
op_cache_storage_has,
op_cache_storage_delete,
op_cache_put,
op_cache_match,
op_cache_delete,
],
esm = [ "01_cache.js" ],
options = {
maybe_create_cache: Option<CreateCache<CA>>,
maybe_create_cache: Option<CreateCache>,
},
state = |state, options| {
if let Some(create_cache) = options.maybe_create_cache {
Expand Down Expand Up @@ -184,52 +190,160 @@ pub trait Cache: Clone + 'static {
) -> Result<bool, CacheError>;
}

#[derive(Clone)]
pub enum CacheImpl {
Sqlite(SqliteBackedCache),
Lsc(LscBackend),
}

#[async_trait(?Send)]
impl Cache for CacheImpl {
type CacheMatchResourceType = CacheResponseResource;

async fn storage_open(&self, cache_name: String) -> Result<i64, CacheError> {
match self {
Self::Sqlite(cache) => cache.storage_open(cache_name).await,
Self::Lsc(cache) => cache.storage_open(cache_name).await,
}
}

async fn storage_has(&self, cache_name: String) -> Result<bool, CacheError> {
match self {
Self::Sqlite(cache) => cache.storage_has(cache_name).await,
Self::Lsc(cache) => cache.storage_has(cache_name).await,
}
}

async fn storage_delete(
&self,
cache_name: String,
) -> Result<bool, CacheError> {
match self {
Self::Sqlite(cache) => cache.storage_delete(cache_name).await,
Self::Lsc(cache) => cache.storage_delete(cache_name).await,
}
}

async fn put(
&self,
request_response: CachePutRequest,
resource: Option<Rc<dyn Resource>>,
) -> Result<(), CacheError> {
match self {
Self::Sqlite(cache) => cache.put(request_response, resource).await,
Self::Lsc(cache) => cache.put(request_response, resource).await,
}
}

async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
Option<(CacheMatchResponseMeta, Option<Self::CacheMatchResourceType>)>,
CacheError,
> {
match self {
Self::Sqlite(cache) => cache.r#match(request).await,
Self::Lsc(cache) => cache.r#match(request).await,
}
}

async fn delete(
&self,
request: CacheDeleteRequest,
) -> Result<bool, CacheError> {
match self {
Self::Sqlite(cache) => cache.delete(request).await,
Self::Lsc(cache) => cache.delete(request).await,
}
}
}

pub enum CacheResponseResource {
Sqlite(AsyncRefCell<tokio::fs::File>),
Lsc(AsyncRefCell<Pin<Box<dyn AsyncRead>>>),
}

impl CacheResponseResource {
fn sqlite(file: tokio::fs::File) -> Self {
Self::Sqlite(AsyncRefCell::new(file))
}

fn lsc(
body: impl Stream<Item = Result<Bytes, std::io::Error>> + 'static,
) -> Self {
Self::Lsc(AsyncRefCell::new(Box::pin(StreamReader::new(body))))
}

async fn read(
self: Rc<Self>,
data: &mut [u8],
) -> Result<usize, std::io::Error> {
let nread = match &*self {
CacheResponseResource::Sqlite(_) => {
let resource = deno_core::RcRef::map(&self, |r| match r {
Self::Sqlite(r) => r,
_ => unreachable!(),
});
let mut file = resource.borrow_mut().await;
file.read(data).await?
}
CacheResponseResource::Lsc(_) => {
let resource = deno_core::RcRef::map(&self, |r| match r {
Self::Lsc(r) => r,
_ => unreachable!(),
});
let mut file = resource.borrow_mut().await;
file.read(data).await?
}
};

Ok(nread)
}
}

impl Resource for CacheResponseResource {
deno_core::impl_readable_byob!();

fn name(&self) -> Cow<str> {
"CacheResponseResource".into()
}
}

#[op2(async)]
#[number]
pub async fn op_cache_storage_open<CA>(
pub async fn op_cache_storage_open(
state: Rc<RefCell<OpState>>,
#[string] cache_name: String,
) -> Result<i64, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<i64, CacheError> {
let cache = get_cache(&state)?;
cache.storage_open(cache_name).await
}

#[op2(async)]
pub async fn op_cache_storage_has<CA>(
pub async fn op_cache_storage_has(
state: Rc<RefCell<OpState>>,
#[string] cache_name: String,
) -> Result<bool, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<bool, CacheError> {
let cache = get_cache(&state)?;
cache.storage_has(cache_name).await
}

#[op2(async)]
pub async fn op_cache_storage_delete<CA>(
pub async fn op_cache_storage_delete(
state: Rc<RefCell<OpState>>,
#[string] cache_name: String,
) -> Result<bool, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<bool, CacheError> {
let cache = get_cache(&state)?;
cache.storage_delete(cache_name).await
}

#[op2(async)]
pub async fn op_cache_put<CA>(
pub async fn op_cache_put(
state: Rc<RefCell<OpState>>,
#[serde] request_response: CachePutRequest,
) -> Result<(), CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<(), CacheError> {
let cache = get_cache(&state)?;
let resource = match request_response.response_rid {
Some(rid) => Some(
state
Expand All @@ -245,14 +359,11 @@ where

#[op2(async)]
#[serde]
pub async fn op_cache_match<CA>(
pub async fn op_cache_match(
state: Rc<RefCell<OpState>>,
#[serde] request: CacheMatchRequest,
) -> Result<Option<CacheMatchResponse>, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<Option<CacheMatchResponse>, CacheError> {
let cache = get_cache(&state)?;
match cache.r#match(request).await? {
Some((meta, None)) => Ok(Some(CacheMatchResponse(meta, None))),
Some((meta, Some(resource))) => {
Expand All @@ -264,28 +375,24 @@ where
}

#[op2(async)]
pub async fn op_cache_delete<CA>(
pub async fn op_cache_delete(
state: Rc<RefCell<OpState>>,
#[serde] request: CacheDeleteRequest,
) -> Result<bool, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<bool, CacheError> {
let cache = get_cache(&state)?;
cache.delete(request).await
}

pub fn get_cache<CA>(state: &Rc<RefCell<OpState>>) -> Result<CA, CacheError>
where
CA: Cache,
{
pub fn get_cache(
state: &Rc<RefCell<OpState>>,
) -> Result<CacheImpl, CacheError> {
let mut state = state.borrow_mut();
if let Some(cache) = state.try_borrow::<CA>() {
if let Some(cache) = state.try_borrow::<CacheImpl>() {
Ok(cache.clone())
} else if let Some(create_cache) = state.try_borrow::<CreateCache<CA>>() {
} else if let Some(create_cache) = state.try_borrow::<CreateCache>() {
let cache = create_cache.0()?;
state.put(cache);
Ok(state.borrow::<CA>().clone())
Ok(state.borrow::<CacheImpl>().clone())
} else {
Err(CacheError::ContextUnsupported)
}
Expand Down
Loading

0 comments on commit cb1cfb8

Please sign in to comment.