Skip to content

Commit

Permalink
use fill_wait_data instead of fill_wait_eof
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Dec 12, 2024
1 parent d082dfe commit 0a7d825
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 76 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions g3proxy/src/inspect/http/v1/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub(crate) enum H1InterceptionError {
InvalidRequestHeader(HttpRequestParseError),
#[error("closed by upstream")]
ClosedByUpstream,
#[error("unexpected data from upstream")]
UnexpectedUpstreamData,
#[error("upstream closed with error: {0:?}")]
UpstreamClosedWithError(io::Error),
#[error("invalid upgrade protocol: {0}")]
Expand Down
5 changes: 3 additions & 2 deletions g3proxy/src/inspect/http/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,11 @@ where
Some(r) => r,
None => return Ok(None),
},
r = rsp_io.ups_r.fill_wait_eof() => {
r = rsp_io.ups_r.fill_wait_data() => {
req_acceptor.close();
return match r {
Ok(_) => Err(H1InterceptionError::ClosedByUpstream),
Ok(true) => Err(H1InterceptionError::UnexpectedUpstreamData),
Ok(false) => Err(H1InterceptionError::ClosedByUpstream),
Err(e) => Err(H1InterceptionError::UpstreamClosedWithError(e)),
};
}
Expand Down
4 changes: 2 additions & 2 deletions g3proxy/src/module/http_forward/connection/eof_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ impl HttpConnectionEofCheck {
tokio::select! {
biased;

_ = conn.1.fill_wait_eof() => {
// close early to avoid waiting at other side
_ = conn.1.fill_wait_data() => {
// close early when EOF or unexpected data, to avoid waiting at other side
wait_channel.close();
// make sure we correctly shutdown tls connection
// FIXME use async drop at escaper side when supported
Expand Down
7 changes: 5 additions & 2 deletions g3proxy/src/serve/http_proxy/task/forward/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,10 +764,13 @@ impl<'a> HttpProxyForwardTask<'a> {
CDW: AsyncWrite + Send + Unpin,
{
if self.http_notes.reused_connection {
if let Some(r) = ups_c.1.fill_wait_eof().now_or_never() {
if let Some(r) = ups_c.1.fill_wait_data().now_or_never() {
self.http_notes.retry_new_connection = true;
return match r {
Ok(_) => Err(ServerTaskError::ClosedByUpstream),
Ok(true) => Err(ServerTaskError::UpstreamAppError(anyhow!(
"unexpected data found when polling IDLE connection"
))),
Ok(false) => Err(ServerTaskError::ClosedByUpstream),
Err(e) => Err(ServerTaskError::UpstreamReadFailed(e)),
};
}
Expand Down
8 changes: 6 additions & 2 deletions g3proxy/src/serve/http_rproxy/task/forward/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::sync::Arc;

use anyhow::anyhow;
use futures_util::FutureExt;
use http::header;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt};
Expand Down Expand Up @@ -649,10 +650,13 @@ impl<'a> HttpRProxyForwardTask<'a> {
CDW: AsyncWrite + Unpin,
{
if self.http_notes.reused_connection {
if let Some(r) = ups_c.1.fill_wait_eof().now_or_never() {
if let Some(r) = ups_c.1.fill_wait_data().now_or_never() {
self.http_notes.retry_new_connection = true;
return match r {
Ok(_) => Err(ServerTaskError::ClosedByUpstream),
Ok(true) => Err(ServerTaskError::UpstreamAppError(anyhow!(
"unexpected data found when polling IDLE connection"
))),
Ok(false) => Err(ServerTaskError::ClosedByUpstream),
Err(e) => Err(ServerTaskError::UpstreamReadFailed(e)),
};
}
Expand Down
1 change: 1 addition & 0 deletions lib/g3-icap-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rust-version = "1.75.0"
[dependencies]
anyhow.workspace = true
thiserror.workspace = true
log.workspace = true
memchr.workspace = true
atoi.workspace = true
url.workspace = true
Expand Down
13 changes: 12 additions & 1 deletion lib/g3-icap-client/src/service/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::Context;
use log::debug;
use tokio::io::BufReader;
use tokio::net::TcpStream;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -141,7 +142,17 @@ impl IcapConnectionEofPoller {

pub(super) async fn into_running(mut self) {
tokio::select! {
_ = self.conn.1.fill_wait_eof() => {}
r = self.conn.1.fill_wait_data() => {
match r {
Ok(true) => {
debug!("unexpected data found in this ICAP connection, drop it now");
}
Ok(false) => {}
Err(e) => {
debug!("ICAP connection closed with error {e}");
}
}
}
r = self.req_receiver.recv_async() => {
if let Ok(req) = r {
let IcapConnectionPollRequest {
Expand Down
57 changes: 0 additions & 57 deletions lib/g3-io-ext/src/io/ext/fill_wait_eof.rs

This file was deleted.

13 changes: 4 additions & 9 deletions lib/g3-io-ext/src/io/ext/limited_buf_read_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use bytes::BytesMut;
use tokio::io::AsyncBufRead;

use super::fill_wait_data::FillWaitData;
use super::fill_wait_eof::FillWaitEof;
use super::limited_read_buf_until::LimitedReadBufUntil;
use super::limited_read_until::LimitedReadUntil;
use super::limited_skip_until::LimitedSkipUntil;
Expand Down Expand Up @@ -55,15 +54,11 @@ pub trait LimitedBufReadExt: AsyncBufRead {
LimitedSkipUntil::new(self, delimiter, max_len)
}

/// return Poll::Ready(Ok(())) if read ready but no data can be read
/// Wait for data on Buffered IO Reader
///
/// return Poll::Ready(Ok(true)) if some data can be read
/// return Poll::Ready(Ok(false)) if read ready but no data can be read
/// return Poll::Ready(Err(e)) if read io error
fn fill_wait_eof(&mut self) -> FillWaitEof<Self>
where
Self: Unpin,
{
FillWaitEof::new(self)
}

fn fill_wait_data(&mut self) -> FillWaitData<Self>
where
Self: Unpin,
Expand Down
1 change: 0 additions & 1 deletion lib/g3-io-ext/src/io/ext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

mod fill_wait_data;
mod fill_wait_eof;
mod limited_read_buf_until;
mod limited_read_until;
mod limited_skip_until;
Expand Down

0 comments on commit 0a7d825

Please sign in to comment.