Skip to content

Commit

Permalink
Add settings.
Browse files Browse the repository at this point in the history
  • Loading branch information
milesj committed Jan 17, 2025
1 parent 0bc216f commit 0eede1b
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 40 deletions.
34 changes: 20 additions & 14 deletions .moon/workspace.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,23 @@ docker:
include:
- '*.config.js'
- '*.json'
# unstable_remote:
# host: 'http://0.0.0.0:8080'
# # host: 'grpc://0.0.0.0:9092'
# cache:
# compression: 'zstd'
# mtls:
# caCert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
# clientCert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
# clientKey: 'crates/remote/tests/__fixtures__/certs-local/client.key'
# domain: 'localhost'
# tls:
# # assumeHttp2: true
# cert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
# # domain: 'localhost'
unstable_remote:
# host: 'http://0.0.0.0:8080'
# host: 'grpcs://0.0.0.0:9092'
host: 'grpcs://cache.depot.dev'
auth:
token: 'DEPOT_TOKEN'
headers:
'X-Depot-Org': '1xtpjd084j'
'X-Depot-Project': '90xxfkst9n'
# cache:
# compression: 'zstd'
# mtls:
# caCert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
# clientCert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
# clientKey: 'crates/remote/tests/__fixtures__/certs-local/client.key'
# domain: 'localhost'
# tls:
# # assumeHttp2: true
# cert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
# domain: 'localhost'
17 changes: 16 additions & 1 deletion crates/config/src/workspace/remote_config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::portable_path::FilePath;
use rustc_hash::FxHashMap;
use schematic::{derive_enum, validate, Config, ConfigEnum, ValidateError, ValidateResult};

fn path_is_required<D, C>(
Expand All @@ -14,6 +15,16 @@ fn path_is_required<D, C>(
Ok(())
}

/// Configures basic HTTP authentication.
#[derive(Clone, Config, Debug)]
pub struct RemoteAuthConfig {
/// HTTP headers to inject into every request.
pub headers: FxHashMap<String, String>,

/// The name of an environment variable to use as a bearer token.
pub token: Option<String>,
}

derive_enum!(
#[derive(Copy, ConfigEnum, Default)]
pub enum RemoteCompression {
Expand Down Expand Up @@ -81,6 +92,10 @@ pub struct RemoteMtlsConfig {
/// Configures the remote service, powered by the Bazel Remote Execution API.
#[derive(Clone, Config, Debug)]
pub struct RemoteConfig {
/// Connect to the host using basic HTTP authentication.
#[setting(nested)]
pub auth: Option<RemoteAuthConfig>,

/// Configures the action cache (AC) and content addressable cache (CAS).
#[setting(nested)]
pub cache: RemoteCacheConfig,
Expand All @@ -106,6 +121,6 @@ impl RemoteConfig {
}

pub fn is_secure(&self) -> bool {
self.tls.is_some() || self.mtls.is_some()
self.auth.is_some() || self.tls.is_some() || self.mtls.is_some()
}
}
122 changes: 97 additions & 25 deletions crates/remote/src/grpc_remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use bazel_remote_apis::build::bazel::remote::execution::v2::{
GetCapabilitiesRequest, ServerCapabilities, UpdateActionResultRequest,
};
use moon_common::color;
use moon_config::{RemoteCompression, RemoteConfig};
use std::{error::Error, path::Path};
use moon_config::RemoteConfig;
use std::{env, error::Error, path::Path};
use tonic::{
metadata::{MetadataKey, MetadataValue},
transport::{Channel, Endpoint},
Code,
Code, Request, Status,
};
use tracing::{trace, warn};

Expand All @@ -26,6 +27,8 @@ fn map_transport_error(error: tonic::transport::Error) -> RemoteError {
}

fn map_status_error(error: tonic::Status) -> RemoteError {
dbg!(&error);

match error.source() {
Some(src) => RemoteError::GrpcCallFailedViaSource {
error: src.to_string(),
Expand All @@ -39,8 +42,46 @@ fn map_status_error(error: tonic::Status) -> RemoteError {
#[derive(Default)]
pub struct GrpcRemoteClient {
channel: Option<Channel>,
compression: RemoteCompression,
instance_name: String,
config: RemoteConfig,
}

impl GrpcRemoteClient {
fn inject_auth_headers(&self, mut req: Request<()>) -> Result<Request<()>, Status> {
if self.config.mtls.is_some() || self.config.tls.is_some() {
return Ok(req);
}

if let Some(auth) = &self.config.auth {
let headers = req.metadata_mut();

for (key, value) in &auth.headers {
headers.insert(
MetadataKey::from_bytes(key.as_bytes()).unwrap(),
MetadataValue::try_from(value).unwrap(),
);
}

if let Some(token_name) = &auth.token {
let token = env::var(token_name).unwrap_or_default();

if token.is_empty() {
warn!(
"Remote service auth token {} does not exist, unable to authorize",
color::property(token_name)
);
} else {
headers.insert(
"Authorization",
MetadataValue::try_from(format!("Bearer {token}")).unwrap(),
);
}
}
}

dbg!(&req);

Ok(req)
}
}

#[async_trait::async_trait]
Expand All @@ -60,12 +101,22 @@ impl RemoteClient for GrpcRemoteClient {
"(with mTLS)"
} else if config.tls.is_some() {
"(with TLS)"
} else if config.auth.is_some() {
"(with auth)"
} else {
"(insecure)"
}
);

let mut endpoint = Endpoint::from_shared(host.to_owned())
// Although we use a grpc(s) protocol for the host,
// tonic only supports http(s), so change it
let url = if let Some(suffix) = host.strip_prefix("grpc") {
format!("http{suffix}")
} else {
host.to_owned()
};

let mut endpoint = Endpoint::from_shared(url)
.map_err(map_transport_error)?
.user_agent("moon")
.map_err(map_transport_error)?
Expand All @@ -92,22 +143,31 @@ impl RemoteClient for GrpcRemoteClient {
);
}

self.channel = Some(endpoint.connect().await.map_err(map_transport_error)?);
self.compression = config.cache.compression;
self.instance_name = config.cache.instance_name.clone();
self.config = config.to_owned();

// We can't inject auth headers into this initial connection,
// so defer the connection until a client is used
if self.config.auth.is_some() {
self.channel = Some(endpoint.connect_lazy());
} else {
self.channel = Some(endpoint.connect().await.map_err(map_transport_error)?);
}

Ok(())
}

// https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L452
async fn load_capabilities(&self) -> miette::Result<ServerCapabilities> {
let mut client = CapabilitiesClient::new(self.channel.clone().unwrap());
let mut client =
CapabilitiesClient::with_interceptor(self.channel.clone().unwrap(), |req| {
self.inject_auth_headers(req)
});

trace!("Loading remote execution API capabilities from gRPC server");

let response = client
.get_capabilities(GetCapabilitiesRequest {
instance_name: self.instance_name.clone(),
instance_name: self.config.cache.instance_name.clone(),
})
.await
.map_err(map_status_error)?;
Expand All @@ -117,13 +177,16 @@ impl RemoteClient for GrpcRemoteClient {

// https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L170
async fn get_action_result(&self, digest: &Digest) -> miette::Result<Option<ActionResult>> {
let mut client = ActionCacheClient::new(self.channel.clone().unwrap());
let mut client =
ActionCacheClient::with_interceptor(self.channel.clone().unwrap(), |req| {
self.inject_auth_headers(req)
});

trace!(hash = &digest.hash, "Checking for a cached action result");

match client
.get_action_result(GetActionResultRequest {
instance_name: self.instance_name.clone(),
instance_name: self.config.cache.instance_name.clone(),
action_digest: Some(digest.to_owned()),
inline_stderr: true,
inline_stdout: true,
Expand Down Expand Up @@ -164,7 +227,10 @@ impl RemoteClient for GrpcRemoteClient {
digest: &Digest,
result: ActionResult,
) -> miette::Result<Option<ActionResult>> {
let mut client = ActionCacheClient::new(self.channel.clone().unwrap());
let mut client =
ActionCacheClient::with_interceptor(self.channel.clone().unwrap(), |req| {
self.inject_auth_headers(req)
});

trace!(
hash = &digest.hash,
Expand All @@ -177,7 +243,7 @@ impl RemoteClient for GrpcRemoteClient {

match client
.update_action_result(UpdateActionResultRequest {
instance_name: self.instance_name.clone(),
instance_name: self.config.cache.instance_name.clone(),
action_digest: Some(digest.to_owned()),
action_result: Some(result),
digest_function: digest_function::Value::Sha256 as i32,
Expand Down Expand Up @@ -222,19 +288,22 @@ impl RemoteClient for GrpcRemoteClient {
digest: &Digest,
blob_digests: Vec<Digest>,
) -> miette::Result<Vec<Blob>> {
let mut client = ContentAddressableStorageClient::new(self.channel.clone().unwrap());
let mut client = ContentAddressableStorageClient::with_interceptor(
self.channel.clone().unwrap(),
|req| self.inject_auth_headers(req),
);

trace!(
hash = &digest.hash,
compression = self.compression.to_string(),
compression = self.config.cache.compression.to_string(),
"Downloading {} output blobs",
blob_digests.len()
);

let response = match client
.batch_read_blobs(BatchReadBlobsRequest {
acceptable_compressors: get_acceptable_compressors(self.compression),
instance_name: self.instance_name.clone(),
acceptable_compressors: get_acceptable_compressors(self.config.cache.compression),
instance_name: self.config.cache.instance_name.clone(),
digests: blob_digests,
digest_function: digest_function::Value::Sha256 as i32,
})
Expand Down Expand Up @@ -272,7 +341,7 @@ impl RemoteClient for GrpcRemoteClient {
if let Some(digest) = download.digest {
blobs.push(Blob {
digest,
bytes: decompress_blob(self.compression, download.data)?,
bytes: decompress_blob(self.config.cache.compression, download.data)?,
});
}

Expand All @@ -295,11 +364,14 @@ impl RemoteClient for GrpcRemoteClient {
digest: &Digest,
blobs: Vec<Blob>,
) -> miette::Result<Vec<Option<Digest>>> {
let mut client = ContentAddressableStorageClient::new(self.channel.clone().unwrap());
let mut client = ContentAddressableStorageClient::with_interceptor(
self.channel.clone().unwrap(),
|req| self.inject_auth_headers(req),
);

trace!(
hash = &digest.hash,
compression = self.compression.to_string(),
compression = self.config.cache.compression.to_string(),
"Uploading {} output blobs",
blobs.len()
);
Expand All @@ -309,14 +381,14 @@ impl RemoteClient for GrpcRemoteClient {
for blob in blobs {
requests.push(batch_update_blobs_request::Request {
digest: Some(blob.digest),
data: compress_blob(self.compression, blob.bytes)?,
compressor: get_compressor(self.compression),
data: compress_blob(self.config.cache.compression, blob.bytes)?,
compressor: get_compressor(self.config.cache.compression),
});
}

let response = match client
.batch_update_blobs(BatchUpdateBlobsRequest {
instance_name: self.instance_name.clone(),
instance_name: self.config.cache.instance_name.clone(),
requests,
digest_function: digest_function::Value::Sha256 as i32,
})
Expand Down

0 comments on commit 0eede1b

Please sign in to comment.