diff --git a/Cargo.lock b/Cargo.lock index 3d3f6033a4d47..8445589ae07fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -157,6 +157,7 @@ dependencies = [ "tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)", "tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)", + "tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-util 0.1.0 (git+https://github.com/tower-rs/tower)", "url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -997,7 +998,7 @@ dependencies = [ [[package]] name = "tower" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045" +source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8" dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1005,7 +1006,7 @@ dependencies = [ [[package]] name = "tower-balance" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045" +source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8" dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1018,7 +1019,7 @@ dependencies = [ [[package]] name = "tower-buffer" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045" +source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8" dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "tower 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1027,7 +1028,7 @@ dependencies = [ [[package]] name = "tower-discover" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045" +source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8" dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "tower 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1073,10 +1074,29 @@ dependencies = [ "tower 0.1.0 (git+https://github.com/tower-rs/tower)", ] +[[package]] +name = "tower-in-flight-limit" +version = "0.1.0" +source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8" +dependencies = [ + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "tower 0.1.0 (git+https://github.com/tower-rs/tower)", + "tower-ready-service 0.1.0 (git+https://github.com/tower-rs/tower)", +] + +[[package]] +name = "tower-ready-service" +version = "0.1.0" +source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8" +dependencies = [ + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "tower 0.1.0 (git+https://github.com/tower-rs/tower)", +] + [[package]] name = "tower-reconnect" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045" +source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8" dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1086,10 +1106,11 @@ dependencies = [ [[package]] name = "tower-util" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045" +source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8" dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "tower 0.1.0 (git+https://github.com/tower-rs/tower)", + "tower-ready-service 0.1.0 (git+https://github.com/tower-rs/tower)", ] [[package]] @@ -1333,6 +1354,8 @@ dependencies = [ "checksum tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)" = "" "checksum tower-grpc-build 0.1.0 (git+https://github.com/tower-rs/tower-grpc)" = "" "checksum tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)" = "" +"checksum tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)" = "" +"checksum tower-ready-service 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-util 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 8d557c9c8b923..6d446c0d97d7a 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -36,15 +36,16 @@ ns-dns-tokio = "0.4" #futures-watch = { git = "https://github.com/carllerche/better-future" } -tokio-connect = { git = "https://github.com/carllerche/tokio-connect" } -tower = { git = "https://github.com/tower-rs/tower" } -tower-balance = { git = "https://github.com/tower-rs/tower" } -tower-buffer = { git = "https://github.com/tower-rs/tower" } -tower-discover = { git = "https://github.com/tower-rs/tower" } -tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" } -tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } -tower-reconnect = { git = "https://github.com/tower-rs/tower" } -tower-util = { git = "https://github.com/tower-rs/tower" } +tokio-connect = { git = "https://github.com/carllerche/tokio-connect" } +tower = { git = "https://github.com/tower-rs/tower" } +tower-balance = { git = "https://github.com/tower-rs/tower" } +tower-buffer = { git = "https://github.com/tower-rs/tower" } +tower-discover = { git = "https://github.com/tower-rs/tower" } +tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" } +tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } +tower-reconnect = { git = "https://github.com/tower-rs/tower" } +tower-in-flight-limit = { git = "https://github.com/tower-rs/tower" } +tower-util = { git = "https://github.com/tower-rs/tower" } [target.'cfg(target_os = "linux")'.dependencies] libc = "0.2" diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index d30eda20604de..f51bd8ef3497a 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use http; use tower; use tower_buffer::{self, Buffer}; +use tower_in_flight_limit::{self, InFlightLimit}; use tower_h2; use conduit_proxy_router::Recognize; @@ -17,6 +18,8 @@ pub struct Inbound { bind: Bind, } +const MAX_IN_FLIGHT: usize = 10_000; + // ===== impl Inbound ===== impl Inbound { @@ -34,12 +37,14 @@ where { type Request = http::Request; type Response = bind::HttpResponse; - type Error = tower_buffer::Error< - as tower::Service>::Error + type Error = tower_in_flight_limit::Error< + tower_buffer::Error< + as tower::Service>::Error + > >; type Key = (SocketAddr, bind::Protocol); type RouteError = (); - type Service = Buffer>; + type Service = InFlightLimit>>; fn recognize(&self, req: &Self::Request) -> Option { let key = req.extensions() @@ -72,11 +77,11 @@ where let &(ref addr, proto) = key; debug!("building inbound {:?} client to {}", proto, addr); - // Wrap with buffering. This currently is an unbounded buffer, which - // is not ideal. - // - // TODO: Don't use unbounded buffering. - Buffer::new(self.bind.bind_service(addr, proto), self.bind.executor()).map_err(|_| {}) + Buffer::new(self.bind.bind_service(addr, proto), self.bind.executor()) + .map(|buffer| { + InFlightLimit::new(buffer, MAX_IN_FLIGHT) + }) + .map_err(|_| {}) } } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 8e345d2c3018e..46b33483c8fc4 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -41,6 +41,7 @@ extern crate tower_h2; extern crate tower_reconnect; extern crate conduit_proxy_router; extern crate tower_util; +extern crate tower_in_flight_limit; extern crate url; use futures::*; diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 5eb6322c9c05c..5d1c5bf94ef94 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -8,6 +8,7 @@ use tower; use tower_balance::{self, choose, load, Balance}; use tower_buffer::Buffer; use tower_discover::{Change, Discover}; +use tower_in_flight_limit::InFlightLimit; use tower_h2; use conduit_proxy_router::Recognize; @@ -26,6 +27,8 @@ pub struct Outbound { default_zone: Option, } +const MAX_IN_FLIGHT: usize = 10_000; + // ===== impl Outbound ===== impl Outbound { @@ -56,10 +59,10 @@ where type Error = ::Error; type Key = (Destination, Protocol); type RouteError = (); - type Service = Buffer>, choose::PowerOfTwoChoices, - >>; + >>>; fn recognize(&self, req: &Self::Request) -> Option { let local = req.uri().authority_part().and_then(|authority| { @@ -128,11 +131,11 @@ where let balance = tower_balance::power_of_two_choices(loaded, rand::thread_rng()); - // Wrap with buffering. This currently is an unbounded buffer, - // which is not ideal. - // - // TODO: Don't use unbounded buffering. - Buffer::new(balance, self.bind.executor()).map_err(|_| {}) + Buffer::new(balance, self.bind.executor()) + .map(|buffer| { + InFlightLimit::new(buffer, MAX_IN_FLIGHT) + }) + .map_err(|_| {}) } }