Skip to content

Commit

Permalink
Proxy: Limit the max number of in-flight requests. (#398)
Browse files Browse the repository at this point in the history
Currently, the max number of in-flight requests in the proxy is
unbounded. This is due to the `Buffer` middleware being unbounded.

This is resolved by adding an instance of `InFlightLimit` around
`Buffer`, capping the max number of in-flight requests for a given
endpoint.

Currently, the limit is hardcoded to 10,000. However, this will
eventually become a configuration value.

Fixes #287

Signed-off-by: Carl Lerche <[email protected]>
  • Loading branch information
carllerche authored Feb 21, 2018
1 parent c579a8f commit 2871288
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 30 deletions.
35 changes: 29 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ ns-dns-tokio = "0.4"

#futures-watch = { git = "https://github.com/carllerche/better-future" }

tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
tower = { git = "https://github.com/tower-rs/tower" }
tower-balance = { git = "https://github.com/tower-rs/tower" }
tower-buffer = { git = "https://github.com/tower-rs/tower" }
tower-discover = { git = "https://github.com/tower-rs/tower" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
tower-reconnect = { git = "https://github.com/tower-rs/tower" }
tower-util = { git = "https://github.com/tower-rs/tower" }
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
tower = { git = "https://github.com/tower-rs/tower" }
tower-balance = { git = "https://github.com/tower-rs/tower" }
tower-buffer = { git = "https://github.com/tower-rs/tower" }
tower-discover = { git = "https://github.com/tower-rs/tower" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
tower-reconnect = { git = "https://github.com/tower-rs/tower" }
tower-in-flight-limit = { git = "https://github.com/tower-rs/tower" }
tower-util = { git = "https://github.com/tower-rs/tower" }

[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2"
Expand Down
21 changes: 13 additions & 8 deletions proxy/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use http;
use tower;
use tower_buffer::{self, Buffer};
use tower_in_flight_limit::{self, InFlightLimit};
use tower_h2;
use conduit_proxy_router::Recognize;

Expand All @@ -17,6 +18,8 @@ pub struct Inbound<B> {
bind: Bind<B>,
}

const MAX_IN_FLIGHT: usize = 10_000;

// ===== impl Inbound =====

impl<B> Inbound<B> {
Expand All @@ -34,12 +37,14 @@ where
{
type Request = http::Request<B>;
type Response = bind::HttpResponse;
type Error = tower_buffer::Error<
<bind::Service<B> as tower::Service>::Error
type Error = tower_in_flight_limit::Error<
tower_buffer::Error<
<bind::Service<B> as tower::Service>::Error
>
>;
type Key = (SocketAddr, bind::Protocol);
type RouteError = ();
type Service = Buffer<bind::Service<B>>;
type Service = InFlightLimit<Buffer<bind::Service<B>>>;

fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
let key = req.extensions()
Expand Down Expand Up @@ -72,11 +77,11 @@ where
let &(ref addr, proto) = key;
debug!("building inbound {:?} client to {}", proto, addr);

// Wrap with buffering. This currently is an unbounded buffer, which
// is not ideal.
//
// TODO: Don't use unbounded buffering.
Buffer::new(self.bind.bind_service(addr, proto), self.bind.executor()).map_err(|_| {})
Buffer::new(self.bind.bind_service(addr, proto), self.bind.executor())
.map(|buffer| {
InFlightLimit::new(buffer, MAX_IN_FLIGHT)
})
.map_err(|_| {})
}
}

Expand Down
1 change: 1 addition & 0 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extern crate tower_h2;
extern crate tower_reconnect;
extern crate conduit_proxy_router;
extern crate tower_util;
extern crate tower_in_flight_limit;
extern crate url;

use futures::*;
Expand Down
17 changes: 10 additions & 7 deletions proxy/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tower;
use tower_balance::{self, choose, load, Balance};
use tower_buffer::Buffer;
use tower_discover::{Change, Discover};
use tower_in_flight_limit::InFlightLimit;
use tower_h2;
use conduit_proxy_router::Recognize;

Expand All @@ -26,6 +27,8 @@ pub struct Outbound<B> {
default_zone: Option<String>,
}

const MAX_IN_FLIGHT: usize = 10_000;

// ===== impl Outbound =====

impl<B> Outbound<B> {
Expand Down Expand Up @@ -56,10 +59,10 @@ where
type Error = <Self::Service as tower::Service>::Error;
type Key = (Destination, Protocol);
type RouteError = ();
type Service = Buffer<Balance<
type Service = InFlightLimit<Buffer<Balance<
load::WithPendingRequests<Discovery<B>>,
choose::PowerOfTwoChoices<rand::ThreadRng>,
>>;
>>>;

fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
let local = req.uri().authority_part().and_then(|authority| {
Expand Down Expand Up @@ -128,11 +131,11 @@ where

let balance = tower_balance::power_of_two_choices(loaded, rand::thread_rng());

// Wrap with buffering. This currently is an unbounded buffer,
// which is not ideal.
//
// TODO: Don't use unbounded buffering.
Buffer::new(balance, self.bind.executor()).map_err(|_| {})
Buffer::new(balance, self.bind.executor())
.map(|buffer| {
InFlightLimit::new(buffer, MAX_IN_FLIGHT)
})
.map_err(|_| {})
}
}

Expand Down

0 comments on commit 2871288

Please sign in to comment.