Skip to content

Commit

Permalink
g3proxy: add proxy_protocol support in direct_fixed escaper
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Feb 28, 2025
1 parent 31027a4 commit c933f3f
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 19 deletions.
12 changes: 11 additions & 1 deletion g3proxy/src/config/escaper/direct_fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use g3_types::acl::{AclAction, AclNetworkRuleBuilder};
use g3_types::metrics::{NodeName, StaticMetricsTags};
#[cfg(any(target_os = "linux", target_os = "android"))]
use g3_types::net::InterfaceName;
use g3_types::net::{HappyEyeballsConfig, TcpKeepAliveConfig, TcpMiscSockOpts, UdpMiscSockOpts};
use g3_types::net::{
HappyEyeballsConfig, ProxyProtocolVersion, TcpKeepAliveConfig, TcpMiscSockOpts, UdpMiscSockOpts,
};
use g3_types::resolve::{QueryStrategy, ResolveRedirectionBuilder, ResolveStrategy};
use g3_yaml::YamlDocPosition;

Expand Down Expand Up @@ -54,6 +56,7 @@ pub(crate) struct DirectFixedEscaperConfig {
pub(crate) tcp_misc_opts: TcpMiscSockOpts,
pub(crate) udp_misc_opts: UdpMiscSockOpts,
pub(crate) enable_path_selection: bool,
pub(crate) use_proxy_protocol: Option<ProxyProtocolVersion>,
pub(crate) extra_metrics_tags: Option<Arc<StaticMetricsTags>>,
}

Expand All @@ -79,6 +82,7 @@ impl DirectFixedEscaperConfig {
tcp_misc_opts: Default::default(),
udp_misc_opts: Default::default(),
enable_path_selection: false,
use_proxy_protocol: None,
extra_metrics_tags: None,
}
}
Expand Down Expand Up @@ -195,6 +199,12 @@ impl DirectFixedEscaperConfig {
.context(format!("invalid happy eyeballs config value for key {k}"))?;
Ok(())
}
"use_proxy_protocol" => {
let version = g3_yaml::value::as_proxy_protocol_version(v)
.context(format!("invalid ProxyProtocolVersion value for key {k}"))?;
self.use_proxy_protocol = Some(version);
Ok(())
}
_ => Err(anyhow!("invalid key {k}")),
}
}
Expand Down
6 changes: 5 additions & 1 deletion g3proxy/src/escape/direct_fixed/ftp_connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ impl DirectFixedEscaper {
task_notes: &ServerTaskNotes,
task_stats: ArcFtpTaskRemoteControlStats,
) -> Result<BoxFtpRemoteConnection, TcpConnectError> {
let stream = self
let mut stream = self
.tcp_connect_to(task_conf, tcp_notes, task_notes)
.await?;
if let Some(version) = self.config.use_proxy_protocol {
self.send_tcp_proxy_protocol_header(version, &mut stream, task_notes, false)
.await?;

Check warning on line 43 in g3proxy/src/escape/direct_fixed/ftp_connect/mod.rs

View check run for this annotation

Codecov / codecov/patch

g3proxy/src/escape/direct_fixed/ftp_connect/mod.rs#L42-L43

Added lines #L42 - L43 were not covered by tests
}

let mut wrapper_stats = FtpControlRemoteWrapperStats::new(&self.stats, task_stats);
wrapper_stats.push_user_io_stats(self.fetch_user_upstream_io_stats(task_notes));
Expand Down
6 changes: 5 additions & 1 deletion g3proxy/src/escape/direct_fixed/http_forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ impl DirectFixedEscaper {
task_notes: &ServerTaskNotes,
task_stats: ArcHttpForwardTaskRemoteStats,
) -> Result<BoxHttpForwardConnection, TcpConnectError> {
let stream = self
let mut stream = self
.tcp_connect_to(task_conf, tcp_notes, task_notes)
.await?;
if let Some(version) = self.config.use_proxy_protocol {
self.send_tcp_proxy_protocol_header(version, &mut stream, task_notes, false)
.await?;

Check warning on line 51 in g3proxy/src/escape/direct_fixed/http_forward/mod.rs

View check run for this annotation

Codecov / codecov/patch

g3proxy/src/escape/direct_fixed/http_forward/mod.rs#L50-L51

Added lines #L50 - L51 were not covered by tests
}

let (ups_r, ups_w) = stream.into_split();

Expand Down
31 changes: 30 additions & 1 deletion g3proxy/src/escape/direct_fixed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use slog::Logger;
use tokio::io::{AsyncWrite, AsyncWriteExt};

use g3_daemon::stat::remote::ArcTcpConnectionTaskRemoteStats;
use g3_resolver::ResolveError;
use g3_socket::BindAddr;
use g3_socket::util::AddressFamily;
use g3_types::acl::AclNetworkRule;
use g3_types::metrics::NodeName;
use g3_types::net::{Host, UpstreamAddr};
use g3_types::net::{Host, ProxyProtocolEncoder, ProxyProtocolVersion, UpstreamAddr};
use g3_types::resolve::{ResolveRedirection, ResolveStrategy};

use super::{
Expand Down Expand Up @@ -255,6 +256,34 @@ impl DirectFixedEscaper {
}
}

async fn send_tcp_proxy_protocol_header<W>(
&self,
version: ProxyProtocolVersion,
writer: &mut W,
task_notes: &ServerTaskNotes,
do_flush: bool,
) -> Result<(), TcpConnectError>
where
W: AsyncWrite + Unpin,
{
let mut encoder = ProxyProtocolEncoder::new(version);
let bytes = encoder
.encode_tcp(task_notes.client_addr(), task_notes.server_addr())
.map_err(TcpConnectError::ProxyProtocolEncodeError)?;
writer
.write_all(bytes) // no need to flush data
.await
.map_err(TcpConnectError::ProxyProtocolWriteFailed)?;
self.stats.tcp.io.add_out_bytes(bytes.len() as u64);
if do_flush {
writer
.flush()
.await
.map_err(TcpConnectError::ProxyProtocolWriteFailed)?;
}

Check warning on line 283 in g3proxy/src/escape/direct_fixed/mod.rs

View check run for this annotation

Codecov / codecov/patch

g3proxy/src/escape/direct_fixed/mod.rs#L283

Added line #L283 was not covered by tests
Ok(())
}

fn fetch_user_upstream_io_stats(
&self,
task_notes: &ServerTaskNotes,
Expand Down
6 changes: 5 additions & 1 deletion g3proxy/src/escape/direct_fixed/tcp_connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,13 @@ impl DirectFixedEscaper {
task_notes: &ServerTaskNotes,
task_stats: ArcTcpConnectionTaskRemoteStats,
) -> TcpConnectResult {
let stream = self
let mut stream = self
.tcp_connect_to(task_conf, tcp_notes, task_notes)
.await?;
if let Some(version) = self.config.use_proxy_protocol {
self.send_tcp_proxy_protocol_header(version, &mut stream, task_notes, true)
.await?;
}
let (r, w) = stream.into_split();

let mut wrapper_stats = TcpConnectRemoteWrapperStats::new(&self.stats, task_stats);
Expand Down
6 changes: 5 additions & 1 deletion g3proxy/src/escape/direct_fixed/tls_connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ impl DirectFixedEscaper {
task_notes: &ServerTaskNotes,
tls_application: TlsApplication,
) -> Result<SslStream<impl AsyncRead + AsyncWrite + use<>>, TcpConnectError> {
let stream = self
let mut stream = self
.tcp_connect_to(&task_conf.tcp, tcp_notes, task_notes)
.await?;
if let Some(version) = self.config.use_proxy_protocol {
self.send_tcp_proxy_protocol_header(version, &mut stream, task_notes, false)
.await?;

Check warning on line 48 in g3proxy/src/escape/direct_fixed/tls_connect/mod.rs

View check run for this annotation

Codecov / codecov/patch

g3proxy/src/escape/direct_fixed/tls_connect/mod.rs#L47-L48

Added lines #L47 - L48 were not covered by tests
}

// set limit config and add escaper stats, do not count in task stats
let limit_config = &self.config.general.tcp_sock_speed_limit;
Expand Down
53 changes: 53 additions & 0 deletions scripts/coverage/g3proxy/0003_chain_tcp_stream/g3proxy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
---

log: discard

stat:
target:
udp: 127.0.0.1:8125

resolver:
- name: default
type: c-ares
server:
- 127.0.0.1

escaper:
- name: default
type: direct_fixed
resolver: default
egress_net_filter:
default: allow
allow: 127.0.0.1
- name: to_inner
type: direct_fixed
resolver: default
egress_net_filter:
default: allow
allow: 127.0.0.1
use_proxy_protocol: 2

server:
- name: tcp
type: tcp_stream
listen: 127.0.0.1:8080
escaper: to_inner
upstream: 127.0.0.1:8081
- name: tls
type: tls_stream
escaper: to_inner
listen: 127.0.0.1:8443
tls_server:
cert_pairs:
certificate: ../httpbin.local.pem
private-key: ../httpbin.local-key.pem
upstream: 127.0.0.1:8081
- name: inner_tcp_server
type: tcp_stream
escaper: default
upstream: 127.0.0.1:80
- name: inner_tcp_port
type: plain_tcp_port
listen: 127.0.0.1:8081
server: inner_tcp_server
proxy_protocol: 2
5 changes: 5 additions & 0 deletions scripts/coverage/g3proxy/0003_chain_tcp_stream/testcases.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/sh

python3 "${PROJECT_DIR}/g3proxy/ci/python3+curl/test_httpbin.py" -T http://httpbin.local:8080

python3 "${PROJECT_DIR}/g3proxy/ci/python3+curl/test_httpbin.py" -T https://httpbin.local:8443 --ca-cert "${TEST_CA_CERT_FILE}"
11 changes: 11 additions & 0 deletions sphinx/g3proxy/configuration/escapers/direct_fixed.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,14 @@ Weather we should enable path selection.
.. note:: Path selection on server side should be open, or this option will have no effects.

**default**: false

use_proxy_protocol
------------------

**optional**, **type**: :ref:`proxy protocol version <conf_value_proxy_protocol_version>`

Set the version of PROXY protocol we use for outgoing tcp connections except for FTP data connections.

**default**: not set, which means PROXY protocol won't be used

.. versionadded:: 1.11.3
11 changes: 0 additions & 11 deletions sphinx/g3proxy/configuration/escapers/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,6 @@ to the real username, the password field set to our package name (g3proxy if not

.. note:: This will conflict with the real auth of next proxy.

.. _conf_escaper_common_use_proxy_protocol:

use_proxy_protocol
------------------

**optional**, **type**: :ref:`proxy protocol version <conf_value_proxy_protocol_version>`

Set the version of PROXY protocol we use for outgoing tcp connections.

**default**: not set, which means PROXY protocol won't be used

.. _conf_escaper_common_peer_negotiation_timeout:

peer_negotiation_timeout
Expand Down
10 changes: 9 additions & 1 deletion sphinx/g3proxy/configuration/escapers/proxy_http.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ The following common keys are supported:
* :ref:`happy eyeballs <conf_escaper_common_happy_eyeballs>`
* :ref:`tcp_misc_opts <conf_escaper_common_tcp_misc_opts>`
* :ref:`pass_proxy_userid <conf_escaper_common_pass_proxy_userid>`
* :ref:`use_proxy_protocol <conf_escaper_common_use_proxy_protocol>`
* :ref:`peer negotiation timeout <conf_escaper_common_peer_negotiation_timeout>`
* :ref:`extra_metrics_tags <conf_escaper_common_extra_metrics_tags>`

Expand Down Expand Up @@ -113,3 +112,12 @@ Set tcp keepalive.
The tcp keepalive set in user config won't be taken into account.

**default**: no keepalive set

use_proxy_protocol
------------------

**optional**, **type**: :ref:`proxy protocol version <conf_value_proxy_protocol_version>`

Set the version of PROXY protocol to use after TCP connected to the peer.

**default**: not set, which means PROXY protocol won't be used
10 changes: 9 additions & 1 deletion sphinx/g3proxy/configuration/escapers/proxy_https.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ The following common keys are supported:
* :ref:`happy eyeballs <conf_escaper_common_happy_eyeballs>`
* :ref:`tcp_misc_opts <conf_escaper_common_tcp_misc_opts>`
* :ref:`pass_proxy_userid <conf_escaper_common_pass_proxy_userid>`
* :ref:`use_proxy_protocol <conf_escaper_common_use_proxy_protocol>`
* :ref:`peer negotiation timeout <conf_escaper_common_peer_negotiation_timeout>`
* :ref:`extra_metrics_tags <conf_escaper_common_extra_metrics_tags>`

Expand Down Expand Up @@ -132,3 +131,12 @@ Set tcp keepalive.
The tcp keepalive set in user config won't be taken into account.

**default**: no keepalive set

use_proxy_protocol
------------------

**optional**, **type**: :ref:`proxy protocol version <conf_value_proxy_protocol_version>`

Set the version of PROXY protocol to use after TCP connected to the peer.

**default**: not set, which means PROXY protocol won't be used

0 comments on commit c933f3f

Please sign in to comment.