Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Zahari Dichev <[email protected]>
  • Loading branch information
zaharidichev committed Nov 12, 2024
1 parent b04a692 commit ff2eba4
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 250 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,6 @@ dependencies = [
"linkerd-app-core",
"linkerd-app-test",
"linkerd-distribute",
"linkerd-errno",
"linkerd-http-classify",
"linkerd-http-prom",
"linkerd-http-retry",
Expand Down Expand Up @@ -2549,7 +2548,6 @@ version = "0.1.0"
dependencies = [
"futures",
"linkerd-errno",
"linkerd-error",
"linkerd-io",
"linkerd-metrics",
"linkerd-stack",
Expand Down
1 change: 0 additions & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ tracing = "0.1"

linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-errno = { path = "../../errno" }
linkerd-distribute = { path = "../../distribute" }
linkerd-http-classify = { path = "../../http/classify" }
linkerd-http-prom = { path = "../../http/prom" }
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/outbound/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use linkerd_app_core::{
use std::fmt::Write;

pub(crate) mod error;
pub(crate) mod transport;
pub use linkerd_app_core::{metrics::*, proxy::balance};

/// Holds LEGACY outbound proxy metrics.
Expand Down
271 changes: 271 additions & 0 deletions linkerd/app/outbound/src/metrics/transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
use crate::{opaq, tls};
use linkerd_app_core::{
io,
metrics::prom::{self, encoding::*, registry::Registry, EncodeLabelSetMut, Family},
svc::{layer, NewService, Param, Service, ServiceExt},
Error,
};
use std::{fmt::Debug, hash::Hash};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[derive(Clone, Debug)]
pub(crate) struct TransportRouteMetricsFamily<L> {
open: Family<L, prom::Counter>,
close: Family<ConnectionsClosedLabels<L>, prom::Counter>,
}

#[derive(Clone, Debug)]
struct TransportRouteMetrics {
open: prom::Counter,
close_no_err: prom::Counter,
close_forbidden: prom::Counter,
close_invalid_backend: prom::Counter,
close_invalid_policy: prom::Counter,
close_unexpected: prom::Counter,
}

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
enum ErrorKind {
Forbidden,
InvalidBackend,
InvalidPolicy,
Unexpected,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct ConnectionsClosedLabels<L> {
labels: L,
error: Option<ErrorKind>,
}

#[derive(Clone, Debug)]
pub(crate) struct NewTransportRouteMetrics<N, L: Clone> {
inner: N,
family: TransportRouteMetricsFamily<L>,
}

#[derive(Clone, Debug)]
pub(crate) struct TransportRouteMetricsService<T, N> {
target: T,
inner: N,
metrics: TransportRouteMetrics,
}
// === impl TransportRouteMetricsFamily ===

impl<L> Default for TransportRouteMetricsFamily<L>
where
L: EncodeLabelSetMut + std::fmt::Debug + std::hash::Hash,
L: Eq + Clone,
{
fn default() -> Self {
Self {
open: prom::Family::default(),
close: prom::Family::default(),
}
}
}

impl<L> TransportRouteMetricsFamily<L>
where
L: EncodeLabelSetMut + std::fmt::Debug + std::hash::Hash,
L: Eq + Clone + Send + Sync + 'static,
{
pub(crate) fn register(registry: &mut Registry) -> Self {
let open = prom::Family::<L, prom::Counter>::default();
registry.register("open", "The number of connections opened", open.clone());

let close = prom::Family::<ConnectionsClosedLabels<L>, prom::Counter>::default();
registry.register("close", "The number of connections closed", close.clone());

Self { open, close }
}

fn closed_counter(&self, labels: &L, error: Option<ErrorKind>) -> prom::Counter {
self.close
.get_or_create(&ConnectionsClosedLabels {
labels: labels.clone(),
error,
})
.clone()
}

fn metrics(&self, labels: L) -> TransportRouteMetrics {
TransportRouteMetrics {
open: self.open.get_or_create(&labels).clone(),
close_no_err: self.closed_counter(&labels, None),
close_forbidden: self.closed_counter(&labels, Some(ErrorKind::Forbidden)),
close_invalid_backend: self.closed_counter(&labels, Some(ErrorKind::InvalidBackend)),
close_invalid_policy: self.closed_counter(&labels, Some(ErrorKind::InvalidPolicy)),
close_unexpected: self.closed_counter(&labels, Some(ErrorKind::Unexpected)),
}
}
}

impl ErrorKind {
fn mk(err: &(dyn std::error::Error + 'static)) -> Self {
if err.is::<opaq::TCPForbiddenRoute>() {
ErrorKind::Forbidden
} else if err.is::<opaq::TCPInvalidBackend>() {
ErrorKind::InvalidBackend
} else if err.is::<opaq::TCPInvalidPolicy>() {
ErrorKind::InvalidPolicy
} else if err.is::<tls::TLSForbiddenRoute>() {
ErrorKind::Forbidden
} else if err.is::<tls::TLSInvalidBackend>() {
ErrorKind::InvalidBackend
} else if err.is::<tls::TLSInvalidPolicy>() {
ErrorKind::InvalidPolicy
} else if let Some(e) = err.source() {
Self::mk(e)
} else {
ErrorKind::Unexpected
}
}
}

impl std::fmt::Display for ErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Forbidden => write!(f, "forbidden"),
Self::InvalidBackend => write!(f, "invalid_backend"),
Self::InvalidPolicy => write!(f, "invalid_policy"),
Self::Unexpected => write!(f, "unexpected"),
}
}
}

// === impl ConnectionsClosedLabels ===

impl<L> EncodeLabelSetMut for ConnectionsClosedLabels<L>
where
L: Clone + Hash + Eq + EncodeLabelSetMut + Debug + Send + Sync + 'static,
{
fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result {
self.labels.encode_label_set(enc)?;
match self.error {
Some(error) => ("error", error.to_string()).encode(enc.encode_label())?,
None => ("error", "").encode(enc.encode_label())?,
}

Ok(())
}
}

impl<L> EncodeLabelSet for ConnectionsClosedLabels<L>
where
L: Clone + Hash + Eq + EncodeLabelSetMut + Debug + Send + Sync + 'static,
{
fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result {
self.encode_label_set(&mut enc)
}
}

// === impl NewTransportRouteMetrics ===

impl<N, L: Clone> NewTransportRouteMetrics<N, L> {
pub fn layer(
family: TransportRouteMetricsFamily<L>,
) -> impl layer::Layer<N, Service = Self> + Clone {
layer::mk(move |inner| Self {
inner,
family: family.clone(),
})
}
}

impl<T, N, L> NewService<T> for NewTransportRouteMetrics<N, L>
where
N: Clone,
L: Clone + Hash + Eq + EncodeLabelSetMut + Debug + Send + Sync + 'static,
T: Param<L>,
{
type Service = TransportRouteMetricsService<T, N>;

fn new_service(&self, target: T) -> Self::Service {
let labels: L = target.param();
let metrics = self.family.metrics(labels);
TransportRouteMetricsService::new(target, self.inner.clone(), metrics)
}
}

// === impl TransportRouteMetricsService ===

impl<T, N> TransportRouteMetricsService<T, N> {
fn new(target: T, inner: N, metrics: TransportRouteMetrics) -> Self {
Self {
target,
inner,
metrics,
}
}
}

impl<T, I, N, S> Service<I> for TransportRouteMetricsService<T, N>
where
T: Clone + Send + Sync + 'static,
I: io::AsyncRead + io::AsyncWrite + Debug + Send + Unpin + 'static,
N: NewService<T, Service = S> + Clone + Send + 'static,
S: Service<I> + Send,
S::Error: Into<Error>,
S::Future: Send,
{
type Response = S::Response;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, Error>> + Send + 'static>>;

#[inline]
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, io: I) -> Self::Future {
let target = self.target.clone();
let new_accept = self.inner.clone();
let metrics = self.metrics.clone();

Box::pin(async move {
let svc = new_accept.new_service(target);
metrics.inc_open();

match svc.oneshot(io).await.map_err(Into::into) {
Ok(result) => {
metrics.inc_closed(None);
Ok(result)
}
Err(error) => {
metrics.inc_closed(Some(&*error));
Err(error)
}
}
})
}
}

impl TransportRouteMetrics {
fn inc_open(&self) {
self.open.inc();
}
fn inc_closed(&self, err: Option<&(dyn std::error::Error + 'static)>) {
match err.map(ErrorKind::mk) {
Some(ErrorKind::Forbidden) => {
self.close_forbidden.inc();
}
Some(ErrorKind::InvalidBackend) => {
self.close_invalid_backend.inc();
}
Some(ErrorKind::InvalidPolicy) => {
self.close_invalid_policy.inc();
}
Some(ErrorKind::Unexpected) => {
self.close_unexpected.inc();
}
None => {
self.close_no_err.inc();
}
}
}
}
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/opaq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::sync::watch;
mod concrete;
mod logical;

pub use self::logical::{Concrete, Logical, Routes};
pub use self::logical::{route::filters::errors::*, Concrete, Logical, Routes};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Opaq<T>(T);
Expand Down
17 changes: 7 additions & 10 deletions linkerd/app/outbound/src/opaq/logical/route.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use super::{super::Concrete, Logical};
use crate::{ParentRef, RouteRef};
use linkerd_app_core::{
io,
metrics::prom,
svc,
transport::metrics::tcp::{client::NewInstrumentConnection, TcpMetricsParams},
Addr, Error,
use crate::{
metrics::transport::{NewTransportRouteMetrics, TransportRouteMetricsFamily},
ParentRef, RouteRef,
};
use linkerd_app_core::{io, metrics::prom, svc, Addr, Error};
use linkerd_distribute as distribute;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};

mod filters;
pub(crate) mod filters;

pub type TcpRouteMetrics = TcpMetricsParams<RouteLabels>;
pub type TcpRouteMetrics = TransportRouteMetricsFamily<RouteLabels>;

#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct Backend<T> {
Expand Down Expand Up @@ -110,7 +107,7 @@ where
source,
}
}))
.push(NewInstrumentConnection::layer(metrics.clone()))
.push(NewTransportRouteMetrics::layer(metrics.clone()))
.arc_new_clone_tcp()
.into_inner()
})
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::sync::watch;
mod concrete;
mod logical;

pub use self::logical::{Concrete, Routes};
pub use self::logical::{route::filters::errors::*, Concrete, Routes};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Tls<T> {
Expand Down
18 changes: 7 additions & 11 deletions linkerd/app/outbound/src/tls/logical/route.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
use super::super::Concrete;
use crate::{ParentRef, RouteRef};
use linkerd_app_core::{
io,
metrics::prom,
svc,
tls::ServerName,
transport::metrics::tcp::{client::NewInstrumentConnection, TcpMetricsParams},
Addr, Error,
use crate::{
metrics::transport::{NewTransportRouteMetrics, TransportRouteMetricsFamily},
ParentRef, RouteRef,
};
use linkerd_app_core::{io, metrics::prom, svc, tls::ServerName, Addr, Error};
use linkerd_distribute as distribute;
use linkerd_proxy_client_policy as policy;
use linkerd_tls_route as tls_route;
use std::{fmt::Debug, hash::Hash, sync::Arc};

mod filters;
pub(crate) mod filters;

pub type TlsRouteMetrics = TcpMetricsParams<RouteLabels>;
pub type TlsRouteMetrics = TransportRouteMetricsFamily<RouteLabels>;

#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct Backend<T> {
Expand Down Expand Up @@ -115,7 +111,7 @@ where
source,
}
}))
.push(NewInstrumentConnection::layer(metrics.clone()))
.push(NewTransportRouteMetrics::layer(metrics.clone()))
.arc_new_clone_tcp()
.into_inner()
})
Expand Down
Loading

0 comments on commit ff2eba4

Please sign in to comment.