Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support NO_PROXY environment variable #2290

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions crates/serde-util/src/authority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use http::uri::Authority;
use serde::Deserialize;
use serde_with::{DeserializeAs, SerializeAs};

/// SerializeAs/DeserializeAs to implement ser/de trait for [Authority]
/// Use it with `#[serde(with = "serde_with::As::<AuthoritySerde>")]`.
pub struct AuthoritySerde;

impl SerializeAs<Authority> for AuthoritySerde {
fn serialize_as<S>(source: &Authority, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(source.as_str())
}
}

impl<'de> DeserializeAs<'de, Authority> for AuthoritySerde {
fn deserialize_as<D>(deserializer: D) -> Result<Authority, D::Error>
where
D: serde::Deserializer<'de>,
{
let buf = String::deserialize(deserializer)?;
Authority::try_from(buf).map_err(serde::de::Error::custom)
}
}
1 change: 1 addition & 0 deletions crates/serde-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod header_map;
#[cfg(feature = "proto")]
mod proto;

pub mod authority;
pub mod default;
mod duration;
pub mod header_value;
Expand Down
2 changes: 2 additions & 0 deletions crates/service-client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ impl HttpClient {
HttpClient {
client: builder.clone().build::<_, BoxBody>(ProxyConnector::new(
options.http_proxy.clone(),
options.no_proxy.clone(),
https_connector,
)),
h2c_prior_knowledge_client: {
builder.http2_only(true);
builder.build::<_, BoxBody>(ProxyConnector::new(
options.http_proxy.clone(),
options.no_proxy.clone(),
http_connector,
))
},
Expand Down
80 changes: 77 additions & 3 deletions crates/service-client/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,87 @@

use hyper::Uri;
use restate_types::config::ProxyUri;
use std::task::{Context, Poll};
use rustls::pki_types::IpAddr;
use std::{
collections::HashSet,
task::{Context, Poll},
};
use tower_service::Service;

#[derive(Clone, Debug)]
pub struct ProxyConnector<C> {
proxy: Option<ProxyUri>,
no_proxy_ips: HashSet<IpAddr>,
no_proxy_domains: Vec<String>,
connector: C,
}

impl<C> ProxyConnector<C> {
pub fn new(proxy: Option<ProxyUri>, connector: C) -> Self {
Self { proxy, connector }
pub fn new(proxy: Option<ProxyUri>, no_proxy: Vec<http::uri::Authority>, connector: C) -> Self {
let mut no_proxy_ips = HashSet::new();
let mut no_proxy_domains = Vec::new();

for no_proxy_authority in no_proxy {
match IpAddr::try_from(no_proxy_authority.as_str()) {
Ok(ip) => {
no_proxy_ips.insert(ip);
}
Err(_) => no_proxy_domains.push(no_proxy_authority.host().to_owned()),
}
}

Self {
proxy,
no_proxy_ips,
no_proxy_domains,
connector,
}
}

fn no_proxy(&self, host: &str) -> bool {
// According to RFC3986, raw IPv6 hosts will be wrapped in []. So we need to strip those off
// the end in order to parse correctly
let authority = if host.starts_with('[') && host.ends_with(']') {
&host[1..host.len() - 1]
} else {
host
};
match IpAddr::try_from(authority) {
// If we can parse an IP addr, then use it, otherwise, assume it is a domain
Ok(ip) => self.no_proxy_ip(ip),
Err(_) => self.no_proxy_domain(authority),
}
}

fn no_proxy_ip(&self, ip: IpAddr) -> bool {
self.no_proxy_ips.contains(&ip)
}

// Copied from reqwest: https://github.com/seanmonstar/reqwest/blob/master/src/proxy.rs <dual-licensed Apache and MIT>
// The following links may be useful to understand the origin of these rules:
// * https://curl.se/libcurl/c/CURLOPT_NOPROXY.html
// * https://github.com/curl/curl/issues/1208
fn no_proxy_domain(&self, domain: &str) -> bool {
let domain_len = domain.len();
for d in &self.no_proxy_domains {
let d = d.as_str();
if d == domain || d.strip_prefix('.') == Some(domain) {
return true;
} else if domain.ends_with(d) {
if d.starts_with('.') {
// If the first character of d is a dot, that means the first character of domain
// must also be a dot, so we are looking at a subdomain of d and that matches
return true;
} else if domain.as_bytes().get(domain_len - d.len() - 1) == Some(&b'.') {
// Given that d is a prefix of domain, if the prior character in domain is a dot
// then that means we must be matching a subdomain of d, and that matches
return true;
}
} else if d == "*" {
return true;
}
}
false
}
}

Expand All @@ -38,6 +107,11 @@ where
}

fn call(&mut self, uri: Uri) -> Self::Future {
if let Some(host) = uri.host() {
if self.no_proxy(host) {
return self.connector.call(uri);
}
}
self.connector.call(match &self.proxy {
Some(proxy) => proxy.dst(uri),
None => uri,
Expand Down
1 change: 1 addition & 0 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ figment = { version = "0.10.8", features = ["env", "toml"] }
flexbuffers = { workspace = true }
hostname = { workspace = true }
http = { workspace = true }
http-serde = { workspace = true }
humantime = { workspace = true }
itertools = { workspace = true }
moka = { workspace = true, features = ["sync", "logging"] }
Expand Down
13 changes: 13 additions & 0 deletions crates/types/src/config/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use http::Uri;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use restate_serde_util::authority::AuthoritySerde;

/// # HTTP client options
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
Expand All @@ -37,6 +39,16 @@ pub struct HttpOptions {
/// Can be overridden by the `HTTP_PROXY` environment variable.
#[cfg_attr(feature = "schemars", schemars(with = "Option<String>"))]
pub http_proxy: Option<ProxyUri>,

/// # No proxy
///
/// HTTP authorities eg `localhost`, `restate.dev`, `127.0.0.1` that should not be proxied by the http_proxy.
/// Ports are ignored. Subdomains are also matched. An entry “*” matches all hostnames.
/// Can be overridden by the `NO_PROXY` environment variable, which supports comma separated values.
#[serde_as(as = "Vec<AuthoritySerde>")]
#[cfg_attr(feature = "schemars", schemars(with = "Vec<String>"))]
pub no_proxy: Vec<http::uri::Authority>,

/// # Connect timeout
///
/// How long to wait for a TCP connection to be established before considering
Expand All @@ -51,6 +63,7 @@ impl Default for HttpOptions {
Self {
http_keep_alive_options: Http2KeepAliveOptions::default(),
http_proxy: None,
no_proxy: Vec::new(),
connect_timeout: HttpOptions::default_connect_timeout(),
}
}
Expand Down
14 changes: 12 additions & 2 deletions crates/types/src/config_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl ConfigLoader {
}

fn merge_with_env(figment: Figment) -> Figment {
figment
let fig = figment
.merge(
Env::prefixed("RESTATE_")
.split("__")
Expand All @@ -78,6 +78,7 @@ impl ConfigLoader {
.only(&["HTTP_PROXY"])
.map(|_| "http-proxy".into()),
)
.merge(Env::raw().only(&["NO_PROXY"]).map(|_| "no-proxy".into()))
.merge(
Env::raw()
.only(&["AWS_EXTERNAL_ID"])
Expand All @@ -87,7 +88,16 @@ impl ConfigLoader {
Env::raw()
.only(&["MEMORY_LIMIT"])
.map(|_| "rocksdb-total-memory-limit".into()),
)
);

if let Some(no_proxy) = Env::var("NO_PROXY") {
fig.join((
"no-proxy",
no_proxy.split(',').map(str::trim).collect::<Vec<_>>(),
))
} else {
fig
}
}

pub fn start(self) {
Expand Down
Loading