Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outbound): TCP/TLS route metrics #3355

Merged
merged 13 commits into from
Nov 13, 2024
1 change: 1 addition & 0 deletions linkerd/app/outbound/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use linkerd_app_core::{
use std::fmt::Write;

pub(crate) mod error;
pub(crate) mod transport;
pub use linkerd_app_core::{metrics::*, proxy::balance};

/// Holds LEGACY outbound proxy metrics.
Expand Down
263 changes: 263 additions & 0 deletions linkerd/app/outbound/src/metrics/transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
use crate::{opaq, tls};
use linkerd_app_core::{
io,
metrics::prom::{self, encoding::*, registry::Registry, EncodeLabelSetMut, Family},
svc::{layer, NewService, Param, Service},
Error,
};
use std::{fmt::Debug, hash::Hash};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// Counter families for route-level transport connections, keyed by a label
/// set `L`.
///
/// Opened connections are keyed by `L` alone; closed connections are keyed by
/// `L` combined with an optional error kind (`ConnectionsClosedLabels<L>`).
#[derive(Clone, Debug)]
pub(crate) struct TransportRouteMetricsFamily<L> {
/// Per-label-set counters of opened connections.
open: Family<L, prom::Counter>,
/// Per-label-set (and per-error-kind) counters of closed connections.
close: Family<ConnectionsClosedLabels<L>, prom::Counter>,
}

/// The counters resolved for one label set: a single `open` counter plus one
/// `close` counter for each possible close outcome (no error, or one of the
/// `ErrorKind` variants).
#[derive(Clone, Debug)]
struct TransportRouteMetrics {
/// Incremented when a connection is opened.
open: prom::Counter,
/// Incremented when a connection closes without an error.
close_no_err: prom::Counter,
/// Incremented when a connection closes with `ErrorKind::Forbidden`.
close_forbidden: prom::Counter,
/// Incremented when a connection closes with `ErrorKind::InvalidBackend`.
close_invalid_backend: prom::Counter,
/// Incremented when a connection closes with `ErrorKind::InvalidPolicy`.
close_invalid_policy: prom::Counter,
/// Incremented when a connection closes with `ErrorKind::Unexpected`.
close_unexpected: prom::Counter,
}

/// Classification of the error a connection was closed with.
///
/// The `Display` form of each variant is used as the value of the `error`
/// label on the close counters.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
enum ErrorKind {
/// A TCP or TLS "forbidden route" error was found in the error chain.
Forbidden,
/// A TCP or TLS "invalid backend" error was found in the error chain.
InvalidBackend,
/// A TCP or TLS "invalid policy" error was found in the error chain.
InvalidPolicy,
/// None of the known route error types was found in the error chain.
Unexpected,
}

/// Label set for the close counters: the caller-provided labels plus an
/// `error` label describing how the connection ended.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct ConnectionsClosedLabels<L> {
/// The caller-provided label set, encoded first.
labels: L,
/// The close outcome; `None` is encoded as an empty `error` label value.
error: Option<ErrorKind>,
}

/// A `NewService` wrapper that instruments every service built by `inner`
/// with open/close counters looked up from `family`.
#[derive(Clone, Debug)]
pub(crate) struct NewTransportRouteMetrics<N, L: Clone> {
/// The wrapped service factory.
inner: N,
/// The counter families from which per-target counters are resolved.
family: TransportRouteMetricsFamily<L>,
}

/// A service wrapper that counts a connection as opened when `call` is
/// invoked and as closed when the inner service's future resolves.
#[derive(Clone, Debug)]
pub(crate) struct TransportRouteMetricsService<S> {
/// The wrapped service.
inner: S,
/// The counters updated by this service.
metrics: TransportRouteMetrics,
}
// === impl TransportRouteMetricsFamily ===

impl<L> Default for TransportRouteMetricsFamily<L>
where
    L: Clone + Hash + Eq,
{
    /// Builds a pair of empty counter families without touching any registry.
    fn default() -> Self {
        let open = prom::Family::default();
        let close = prom::Family::default();
        Self { open, close }
    }
}

impl<L> TransportRouteMetricsFamily<L>
where
    L: Clone + Hash + Eq + EncodeLabelSetMut + Debug + Send + Sync + 'static,
{
    /// Creates the `open` and `close` counter families and registers a handle
    /// to each with `registry`, returning the families for later lookups.
    pub(crate) fn register(registry: &mut Registry) -> Self {
        let open: prom::Family<L, prom::Counter> = prom::Family::default();
        registry.register("open", "The number of connections opened", open.clone());

        let close: prom::Family<ConnectionsClosedLabels<L>, prom::Counter> =
            prom::Family::default();
        registry.register("close", "The number of connections closed", close.clone());

        Self { open, close }
    }

    /// Returns a handle to the close counter for `labels` combined with the
    /// given close outcome, creating the counter if it does not exist yet.
    fn closed_counter(&self, labels: &L, error: Option<ErrorKind>) -> prom::Counter {
        let key = ConnectionsClosedLabels {
            labels: labels.clone(),
            error,
        };
        self.close.get_or_create(&key).clone()
    }

    /// Resolves (creating as needed) every counter associated with `labels`:
    /// the open counter and one close counter per close outcome.
    fn metrics(&self, labels: L) -> TransportRouteMetrics {
        let open = self.open.get_or_create(&labels).clone();
        let closed = |kind: Option<ErrorKind>| self.closed_counter(&labels, kind);

        TransportRouteMetrics {
            open,
            close_no_err: closed(None),
            close_forbidden: closed(Some(ErrorKind::Forbidden)),
            close_invalid_backend: closed(Some(ErrorKind::InvalidBackend)),
            close_invalid_policy: closed(Some(ErrorKind::InvalidPolicy)),
            close_unexpected: closed(Some(ErrorKind::Unexpected)),
        }
    }
}

impl ErrorKind {
    /// Classifies `err` by walking it and its chain of sources, outermost
    /// first, and returning the kind of the first known route error found.
    ///
    /// Returns `ErrorKind::Unexpected` when the chain is exhausted without
    /// finding a known error type.
    fn mk(err: &(dyn std::error::Error + 'static)) -> Self {
        let mut cursor = Some(err);
        while let Some(current) = cursor {
            // A concrete error has exactly one type, so grouping the TCP and
            // TLS variants of each kind cannot change the outcome.
            if current.is::<opaq::TCPForbiddenRoute>() || current.is::<tls::TLSForbiddenRoute>() {
                return Self::Forbidden;
            }
            if current.is::<opaq::TCPInvalidBackend>() || current.is::<tls::TLSInvalidBackend>() {
                return Self::InvalidBackend;
            }
            if current.is::<opaq::TCPInvalidPolicy>() || current.is::<tls::TLSInvalidPolicy>() {
                return Self::InvalidPolicy;
            }
            cursor = current.source();
        }
        Self::Unexpected
    }
}

impl std::fmt::Display for ErrorKind {
    /// Writes the snake_case name of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Forbidden => "forbidden",
            Self::InvalidBackend => "invalid_backend",
            Self::InvalidPolicy => "invalid_policy",
            Self::Unexpected => "unexpected",
        };
        f.write_str(name)
    }
}

// === impl ConnectionsClosedLabels ===

impl<L> EncodeLabelSetMut for ConnectionsClosedLabels<L>
where
    L: Clone + Hash + Eq + EncodeLabelSetMut + Debug + Send + Sync + 'static,
{
    /// Encodes the wrapped labels followed by an `error` label whose value is
    /// the error kind's display string, or empty when there was no error.
    fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result {
        self.labels.encode_label_set(enc)?;

        let error = self
            .error
            .map_or_else(String::new, |kind| kind.to_string());
        ("error", error.as_str()).encode(enc.encode_label())
    }
}

impl<L> EncodeLabelSet for ConnectionsClosedLabels<L>
where
L: Clone + Hash + Eq + EncodeLabelSetMut + Debug + Send + Sync + 'static,
{
fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result {
self.encode_label_set(&mut enc)
}
}

// === impl NewTransportRouteMetrics ===

impl<N, L: Clone> NewTransportRouteMetrics<N, L> {
    /// Returns a layer that wraps an inner `N` in a
    /// `NewTransportRouteMetrics`, giving each wrapper its own clone of
    /// `family`.
    pub fn layer(
        family: TransportRouteMetricsFamily<L>,
    ) -> impl layer::Layer<N, Service = Self> + Clone {
        layer::mk(move |inner| {
            let family = family.clone();
            Self { inner, family }
        })
    }
}

impl<T, N, L, S> NewService<T> for NewTransportRouteMetrics<N, L>
where
    N: NewService<T, Service = S>,
    L: Clone + Hash + Eq + EncodeLabelSetMut + Debug + Send + Sync + 'static,
    T: Param<L> + Clone,
{
    type Service = TransportRouteMetricsService<S>;

    /// Builds the inner service for `target` and wraps it with the counters
    /// resolved from the label set that `target` provides.
    fn new_service(&self, target: T) -> Self::Service {
        // Resolve the counters before `target` is moved into the inner stack.
        let route_labels: L = target.param();
        let metrics = self.family.metrics(route_labels);
        TransportRouteMetricsService::new(self.inner.new_service(target), metrics)
    }
}

// === impl TransportRouteMetricsService ===

impl<S> TransportRouteMetricsService<S> {
fn new(inner: S, metrics: TransportRouteMetrics) -> Self {
Self { inner, metrics }
}
}

impl<I, S> Service<I> for TransportRouteMetricsService<S>
where
I: io::AsyncRead + io::AsyncWrite + Send + 'static,
S: Service<I> + Send + Clone + 'static,
S::Error: Into<Error>,
S::Future: Send,
{
type Response = S::Response;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, Error>> + Send + 'static>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, io: I) -> Self::Future {
let metrics = self.metrics.clone();

self.metrics.inc_open();
let call = self.inner.call(io);

Box::pin(async move {
match call.await.map_err(Into::into) {
Ok(result) => {
metrics.inc_closed(None);
Ok(result)
}
Err(error) => {
metrics.inc_closed(Some(&*error));
Err(error)
}
}
})
}
}

// === impl TransportRouteMetrics ===

impl TransportRouteMetrics {
    /// Records that a connection was opened.
    fn inc_open(&self) {
        self.open.inc();
    }

    /// Records that a connection was closed, incrementing the close counter
    /// that matches the classification of `err` (or the no-error counter when
    /// `err` is `None`).
    fn inc_closed(&self, err: Option<&(dyn std::error::Error + 'static)>) {
        let counter = match err.map(ErrorKind::mk) {
            None => &self.close_no_err,
            Some(ErrorKind::Forbidden) => &self.close_forbidden,
            Some(ErrorKind::InvalidBackend) => &self.close_invalid_backend,
            Some(ErrorKind::InvalidPolicy) => &self.close_invalid_policy,
            Some(ErrorKind::Unexpected) => &self.close_unexpected,
        };
        counter.inc();
    }
}
7 changes: 5 additions & 2 deletions linkerd/app/outbound/src/opaq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ use tokio::sync::watch;
mod concrete;
mod logical;

pub use self::logical::{Concrete, Logical, Routes};
pub use self::logical::{route::filters::errors::*, Concrete, Logical, Routes};

/// Module-private newtype wrapping a value of type `T`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Opaq<T>(T);

/// Prometheus metrics held by the `opaq` module.
#[derive(Clone, Debug, Default)]
pub struct OpaqMetrics {
/// Balancer metrics, registered under the `balancer` sub-registry prefix.
balance: concrete::BalancerMetrics,
/// TCP route metrics, registered under the `route` sub-registry prefix.
route: logical::route::TcpRouteMetrics,
}

// === impl Outbound ===
Expand Down Expand Up @@ -82,7 +83,9 @@ impl OpaqMetrics {
pub fn register(registry: &mut prom::Registry) -> Self {
let balance =
concrete::BalancerMetrics::register(registry.sub_registry_with_prefix("balancer"));
Self { balance }
let route =
logical::route::TcpRouteMetrics::register(registry.sub_registry_with_prefix("route"));
Self { balance, route }
}
}

Expand Down
6 changes: 4 additions & 2 deletions linkerd/app/outbound/src/opaq/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ impl<N> Outbound<N> {
NSvc::Future: Send,
NSvc::Error: Into<Error>,
{
self.map_stack(|_config, _, concrete| {
self.map_stack(|_config, rt, concrete| {
let metrics = rt.metrics.prom.opaq.route.clone();

concrete
.lift_new()
.push_on_service(router::Router::layer())
.push_on_service(router::Router::layer(metrics.clone()))
.push_on_service(svc::NewMapErr::layer_from_target::<LogicalError, _>())
// Rebuild the inner router stack every time the watch changes.
.push(svc::NewSpawnWatch::<Routes, _>::layer_into::<
Expand Down
Loading
Loading