From 70ef070e8644f87864ee0b957237a3267b10a579 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 11 Nov 2024 13:33:08 +0000 Subject: [PATCH] policy: add filters for TCP and TLS policy Signed-off-by: Zahari Dichev --- .../app/outbound/src/opaq/logical/route.rs | 24 ++++++++- .../src/opaq/logical/route/filters.rs | 44 +++++++++++++++ .../app/outbound/src/opaq/logical/router.rs | 34 ++++++------ linkerd/app/outbound/src/tls/logical/route.rs | 24 ++++++++- .../outbound/src/tls/logical/route/filters.rs | 44 +++++++++++++++ .../app/outbound/src/tls/logical/router.rs | 36 +++++++------ linkerd/proxy/client-policy/src/lib.rs | 17 ++++-- linkerd/proxy/client-policy/src/opaq.rs | 50 +++++++++++------ linkerd/proxy/client-policy/src/tls.rs | 53 ++++++++++++------- 9 files changed, 256 insertions(+), 70 deletions(-) create mode 100644 linkerd/app/outbound/src/opaq/logical/route/filters.rs create mode 100644 linkerd/app/outbound/src/tls/logical/route/filters.rs diff --git a/linkerd/app/outbound/src/opaq/logical/route.rs b/linkerd/app/outbound/src/opaq/logical/route.rs index 948bbf8b80..dcff4ba3c2 100644 --- a/linkerd/app/outbound/src/opaq/logical/route.rs +++ b/linkerd/app/outbound/src/opaq/logical/route.rs @@ -2,12 +2,16 @@ use super::{super::Concrete, Logical}; use crate::RouteRef; use linkerd_app_core::{io, svc, Error}; use linkerd_distribute as distribute; -use std::{fmt::Debug, hash::Hash}; +use linkerd_proxy_client_policy as policy; +use std::{fmt::Debug, hash::Hash, sync::Arc}; + +mod filters; #[derive(Debug, PartialEq, Eq, Hash)] pub(crate) struct Backend { pub(crate) route_ref: RouteRef, pub(crate) concrete: Concrete, + pub(super) filters: Arc<[policy::opaq::Filter]>, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -20,6 +24,7 @@ pub(crate) struct Route { pub(super) parent: T, pub(super) logical: Logical, pub(super) route_ref: RouteRef, + pub(super) filters: Arc<[policy::opaq::Filter]>, pub(super) distribution: BackendDistribution, } @@ -42,6 +47,7 @@ impl Clone for Backend { Self { route_ref: self.route_ref.clone(), concrete: self.concrete.clone(), + filters: self.filters.clone(), } } } @@ -71,12 +77,16 @@ where svc::stack(inner) .push_map_target(|t| t) .push_map_target(|b: Backend| b.concrete) + // apply backend filters + .push_filter(filters::apply) .lift_new() .push(NewDistribute::layer()) // The router does not take the backend's availability into // consideration, so we must eagerly fail requests to prevent // leaking tasks onto the runtime. .push_on_service(svc::LoadShed::layer()) + // apply route level filters + .push_filter(filters::apply) .push(svc::NewMapErr::layer_with(|rt: &Self| { let route = rt.params.route_ref.clone(); move |source| RouteError { @@ -95,3 +105,15 @@ impl svc::Param> for MatchedRoute { self.params.distribution.clone() } } + +impl svc::Param> for MatchedRoute { + fn param(&self) -> Arc<[policy::opaq::Filter]> { + self.params.filters.clone() + } +} + +impl svc::Param> for Backend { + fn param(&self) -> Arc<[policy::opaq::Filter]> { + self.filters.clone() + } +} diff --git a/linkerd/app/outbound/src/opaq/logical/route/filters.rs b/linkerd/app/outbound/src/opaq/logical/route/filters.rs new file mode 100644 index 0000000000..3b69953b3e --- /dev/null +++ b/linkerd/app/outbound/src/opaq/logical/route/filters.rs @@ -0,0 +1,44 @@ +use linkerd_app_core::{svc, Error}; +use linkerd_proxy_client_policy::opaq; +use std::{fmt::Debug, sync::Arc}; + +pub(crate) fn apply(t: T) -> Result +where + T: Clone, + T: svc::Param>, +{ + let filters: &[opaq::Filter] = &t.param(); + if let Some(filter) = filters.iter().next() { + match filter { + opaq::Filter::ForbiddenRoute => { + return Err(errors::TCPForbiddenRoute.into()); + } + + opaq::Filter::InvalidBackend(message) => { + return Err(errors::TCPInvalidBackend(message.clone()).into()); + } + + opaq::Filter::InternalError(message) => { + return Err(errors::TCPInvalidPolicy(message).into()); + } + } + } + + Ok(t) +} + +pub mod errors { + use super::*; + + #[derive(Debug, thiserror::Error)] + #[error("forbidden TCP route")] + pub struct TCPForbiddenRoute; + + #[derive(Debug, thiserror::Error)] + #[error("invalid TCP backend: {0}")] + pub struct TCPInvalidBackend(pub Arc); + + #[derive(Debug, thiserror::Error)] + #[error("invalid client policy: {0}")] + pub struct TCPInvalidPolicy(pub &'static str); +} diff --git a/linkerd/app/outbound/src/opaq/logical/router.rs b/linkerd/app/outbound/src/opaq/logical/router.rs index a5a2a374c5..cef2571e4a 100644 --- a/linkerd/app/outbound/src/opaq/logical/router.rs +++ b/linkerd/app/outbound/src/opaq/logical/router.rs @@ -100,6 +100,7 @@ where let concrete = mk_dispatch(&rb.backend); route::Backend { route_ref: route_ref.clone(), + filters: rb.filters.clone(), concrete, } }; @@ -122,21 +123,24 @@ where } }; - let mk_policy = - |policy::RoutePolicy:: { - meta, distribution, .. - }| { - let route_ref = RouteRef(meta); - let logical = logical.clone(); - - let distribution = mk_distribution(&route_ref, &distribution); - route::Route { - logical, - parent: parent.clone(), - route_ref, - distribution, - } - }; + let mk_policy = |policy::RoutePolicy:: { + meta, + distribution, + filters, + .. + }| { + let route_ref = RouteRef(meta); + let logical = logical.clone(); + + let distribution = mk_distribution(&route_ref, &distribution); + route::Route { + logical, + parent: parent.clone(), + route_ref, + filters, + distribution, + } + }; let routes = routes.as_ref().map(|route| opaq_route::Route { policy: mk_policy(route.policy.clone()), diff --git a/linkerd/app/outbound/src/tls/logical/route.rs b/linkerd/app/outbound/src/tls/logical/route.rs index 7f46c9f28e..77a8ae900a 100644 --- a/linkerd/app/outbound/src/tls/logical/route.rs +++ b/linkerd/app/outbound/src/tls/logical/route.rs @@ -2,13 +2,17 @@ use super::super::Concrete; use crate::{ParentRef, RouteRef}; use linkerd_app_core::{io, svc, Addr, Error}; use linkerd_distribute as distribute; +use linkerd_proxy_client_policy as policy; use linkerd_tls_route as tls_route; -use std::{fmt::Debug, hash::Hash}; +use std::{fmt::Debug, hash::Hash, sync::Arc}; + +mod filters; #[derive(Debug, PartialEq, Eq, Hash)] pub(crate) struct Backend { pub(crate) route_ref: RouteRef, pub(crate) concrete: Concrete, + pub(super) filters: Arc<[policy::tls::Filter]>, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -23,6 +27,7 @@ pub(crate) struct Route { pub(super) addr: Addr, pub(super) parent_ref: ParentRef, pub(super) route_ref: RouteRef, + pub(super) filters: Arc<[policy::tls::Filter]>, pub(super) distribution: BackendDistribution, } @@ -45,6 +50,7 @@ impl Clone for Backend { Self { route_ref: self.route_ref.clone(), concrete: self.concrete.clone(), + filters: self.filters.clone(), } } } @@ -74,12 +80,16 @@ where svc::stack(inner) .push_map_target(|t| t) .push_map_target(|b: Backend| b.concrete) + // apply backend filters + .push_filter(filters::apply) .lift_new() .push(NewDistribute::layer()) // The router does not take the backend's availability into // consideration, so we must eagerly fail requests to prevent // leaking tasks onto the runtime. .push_on_service(svc::LoadShed::layer()) + // apply route level filters + .push_filter(filters::apply) .push(svc::NewMapErr::layer_with(|rt: &Self| { let route = rt.params.route_ref.clone(); move |source| RouteError { @@ -98,3 +108,15 @@ impl svc::Param> for MatchedRoute { self.params.distribution.clone() } } + +impl svc::Param> for MatchedRoute { + fn param(&self) -> Arc<[policy::tls::Filter]> { + self.params.filters.clone() + } +} + +impl svc::Param> for Backend { + fn param(&self) -> Arc<[policy::tls::Filter]> { + self.filters.clone() + } +} diff --git a/linkerd/app/outbound/src/tls/logical/route/filters.rs b/linkerd/app/outbound/src/tls/logical/route/filters.rs new file mode 100644 index 0000000000..cc2d598cdb --- /dev/null +++ b/linkerd/app/outbound/src/tls/logical/route/filters.rs @@ -0,0 +1,44 @@ +use linkerd_app_core::{svc, Error}; +use linkerd_proxy_client_policy::tls; +use std::{fmt::Debug, sync::Arc}; + +pub(crate) fn apply(t: T) -> Result +where + T: Clone, + T: svc::Param>, +{ + let filters: &[tls::Filter] = &t.param(); + if let Some(filter) = filters.iter().next() { + match filter { + tls::Filter::ForbiddenRoute => { + return Err(errors::TlSForbiddenRoute.into()); + } + + tls::Filter::InvalidBackend(message) => { + return Err(errors::TLSInvalidBackend(message.clone()).into()); + } + + tls::Filter::InternalError(message) => { + return Err(errors::TLSInvalidPolicy(message).into()); + } + } + } + + Ok(t) +} + +pub mod errors { + use super::*; + + #[derive(Debug, thiserror::Error)] + #[error("forbidden TLS route")] + pub struct TlSForbiddenRoute; + + #[derive(Debug, thiserror::Error)] + #[error("invalid TLS backend: {0}")] + pub struct TLSInvalidBackend(pub Arc); + + #[derive(Debug, thiserror::Error)] + #[error("invalid client policy: {0}")] + pub struct TLSInvalidPolicy(pub &'static str); +} diff --git a/linkerd/app/outbound/src/tls/logical/router.rs b/linkerd/app/outbound/src/tls/logical/router.rs index 60bc6b64cd..398d0d0b8b 100644 --- a/linkerd/app/outbound/src/tls/logical/router.rs +++ b/linkerd/app/outbound/src/tls/logical/router.rs @@ -105,6 +105,7 @@ where let concrete = mk_dispatch(&rb.backend); route::Backend { route_ref: route_ref.clone(), + filters: rb.filters.clone(), concrete, } }; @@ -127,22 +128,25 @@ where } }; - let mk_policy = - |policy::RoutePolicy:: { - meta, distribution, .. - }| { - let route_ref = RouteRef(meta); - let parent_ref = parent_ref.clone(); - - let distribution = mk_distribution(&route_ref, &distribution); - route::Route { - addr: addr.clone(), - parent: parent.clone(), - parent_ref: parent_ref.clone(), - route_ref, - distribution, - } - }; + let mk_policy = |policy::RoutePolicy:: { + meta, + distribution, + filters, + .. + }| { + let route_ref = RouteRef(meta); + let parent_ref = parent_ref.clone(); + + let distribution = mk_distribution(&route_ref, &distribution); + route::Route { + addr: addr.clone(), + parent: parent.clone(), + parent_ref: parent_ref.clone(), + route_ref, + filters, + distribution, + } + }; let routes = routes .iter() diff --git a/linkerd/proxy/client-policy/src/lib.rs b/linkerd/proxy/client-policy/src/lib.rs index 5f4fe1659e..ca8d434121 100644 --- a/linkerd/proxy/client-policy/src/lib.rs +++ b/linkerd/proxy/client-policy/src/lib.rs @@ -172,9 +172,20 @@ impl ClientPolicy { routes: HTTP_ROUTES.clone(), failure_accrual: Default::default(), }, - // TODO(ver): Include a route with a filter that emits errors - // for connections. - opaque: opaq::Opaque { routes: None }, + + opaque: opaq::Opaque { + routes: Some(opaq::Route { + policy: opaq::Policy { + meta: META.clone(), + filters: std::iter::once(opaq::Filter::InternalError( + "invalid client policy configuration", + )) + .collect(), + distribution: RouteDistribution::Empty, + params: (), + }, + }), + }, }, backends: BACKENDS.clone(), } diff --git a/linkerd/proxy/client-policy/src/opaq.rs b/linkerd/proxy/client-policy/src/opaq.rs index c759d6c50a..0cc6070ead 100644 --- a/linkerd/proxy/client-policy/src/opaq.rs +++ b/linkerd/proxy/client-policy/src/opaq.rs @@ -13,7 +13,11 @@ pub struct Opaque { pub struct NonIoErrors; #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub enum Filter {} +pub enum Filter { + ForbiddenRoute, + InvalidBackend(std::sync::Arc), + InternalError(&'static str), +} impl NonIoErrors { pub fn contains(&self, e: &(dyn std::error::Error + 'static)) -> bool { @@ -27,7 +31,7 @@ pub(crate) mod proto { use super::*; use crate::{ proto::{BackendSet, InvalidBackend, InvalidDistribution, InvalidMeta}, - Meta, RouteBackend, RouteDistribution, + Backend, Meta, RouteBackend, RouteDistribution, }; use linkerd2_proxy_api::outbound::{self, opaque_route}; @@ -83,7 +87,7 @@ pub(crate) mod proto { outbound::OpaqueRoute { metadata, rules, - error: _, // TODO + error, }: outbound::OpaqueRoute, ) -> Result { let meta = Arc::new( @@ -100,21 +104,27 @@ pub(crate) mod proto { } let rule = rules.first().cloned().expect("already checked"); - let policy = try_rule(&meta, rule)?; + let policy = try_rule(&meta, rule, error)?; Ok(Route { policy }) } fn try_rule( meta: &Arc, opaque_route::Rule { backends }: opaque_route::Rule, + route_error: Option, ) -> Result { let distribution = backends .ok_or(InvalidOpaqueRoute::Missing("distribution"))? .try_into()?; + let filters = match route_error { + Some(e) => Arc::new([e.into()]), + None => NO_FILTERS.clone(), + }; + Ok(Policy { meta: meta.clone(), - filters: NO_FILTERS.clone(), + filters, params: (), distribution, }) @@ -165,22 +175,30 @@ pub(crate) mod proto { impl TryFrom for RouteBackend { type Error = InvalidBackend; fn try_from( - opaque_route::RouteBackend { - backend, - invalid: _, // TODO - }: opaque_route::RouteBackend, + opaque_route::RouteBackend { backend, invalid }: opaque_route::RouteBackend, ) -> Result { let backend = backend.ok_or(InvalidBackend::Missing("backend"))?; - RouteBackend::try_from_proto(backend, std::iter::empty::<()>()) + + let backend = Backend::try_from(backend)?; + + let filters = match invalid { + Some(invalid) => Arc::new([invalid.into()]), + None => NO_FILTERS.clone(), + }; + + Ok(RouteBackend { filters, backend }) + } + } + + impl From for Filter { + fn from(_: opaque_route::RouteError) -> Self { + Self::ForbiddenRoute } } - // Necessary to satisfy `RouteBackend::try_from_proto` type constraints. - // TODO(eliza): if filters are added to opaque routes, change this to a - // proper `TryFrom` impl... - impl From<()> for Filter { - fn from(_: ()) -> Self { - unreachable!("no filters can be configured on opaque routes yet") + impl From for Filter { + fn from(ib: opaque_route::route_backend::Invalid) -> Self { + Self::InvalidBackend(ib.message.into()) } } } diff --git a/linkerd/proxy/client-policy/src/tls.rs b/linkerd/proxy/client-policy/src/tls.rs index 4018715c77..3481359d71 100644 --- a/linkerd/proxy/client-policy/src/tls.rs +++ b/linkerd/proxy/client-policy/src/tls.rs @@ -1,7 +1,6 @@ use linkerd_tls_route as tls; -use std::sync::Arc; - pub use linkerd_tls_route::{find, sni, RouteMatch}; +use std::sync::Arc; pub type Policy = crate::RoutePolicy; pub type Route = tls::Route; @@ -12,7 +11,11 @@ pub struct Tls { } #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub enum Filter {} +pub enum Filter { + ForbiddenRoute, + InvalidBackend(Arc), + InternalError(&'static str), +} pub fn default(distribution: crate::RouteDistribution) -> Route { Route { @@ -39,7 +42,7 @@ pub(crate) mod proto { use super::*; use crate::{ proto::{BackendSet, InvalidBackend, InvalidDistribution, InvalidMeta}, - Meta, RouteBackend, RouteDistribution, + Backend, Meta, RouteBackend, RouteDistribution, }; use linkerd2_proxy_api::outbound::{self, tls_route}; use linkerd_tls_route::sni::proto::InvalidSniMatch; @@ -98,7 +101,7 @@ pub(crate) mod proto { rules, snis, metadata, - .. + error, } = proto; let meta = Arc::new( metadata @@ -120,7 +123,7 @@ pub(crate) mod proto { let policy = rules .into_iter() - .map(|rule| try_rule(&meta, rule)) + .map(|rule| try_rule(&meta, rule, error.clone())) .next() .ok_or(InvalidTlsRoute::OnlyOneRule(0))??; @@ -130,14 +133,20 @@ pub(crate) mod proto { fn try_rule( meta: &Arc, tls_route::Rule { backends }: tls_route::Rule, + route_error: Option, ) -> Result { let distribution = backends .ok_or(InvalidTlsRoute::Missing("distribution"))? .try_into()?; + let filters = match route_error { + Some(e) => Arc::new([e.into()]), + None => NO_FILTERS.clone(), + }; + Ok(Policy { meta: meta.clone(), - filters: NO_FILTERS.clone(), + filters, params: (), distribution, }) @@ -188,22 +197,30 @@ pub(crate) mod proto { impl TryFrom for RouteBackend { type Error = InvalidBackend; fn try_from( - tls_route::RouteBackend { - backend, - invalid: _, // TODO - }: tls_route::RouteBackend, + tls_route::RouteBackend { backend, invalid }: tls_route::RouteBackend, ) -> Result { let backend = backend.ok_or(InvalidBackend::Missing("backend"))?; - RouteBackend::try_from_proto(backend, std::iter::empty::<()>()) + + let backend = Backend::try_from(backend)?; + + let filters = match invalid { + Some(invalid) => Arc::new([invalid.into()]), + None => NO_FILTERS.clone(), + }; + + Ok(RouteBackend { filters, backend }) + } + } + + impl From for Filter { + fn from(_: tls_route::RouteError) -> Self { + Self::ForbiddenRoute } } - // Necessary to satisfy `RouteBackend::try_from_proto` type constraints. - // TODO(eliza): if filters are added to opaque routes, change this to a - // proper `TryFrom` impl... - impl From<()> for Filter { - fn from(_: ()) -> Self { - unreachable!("no filters can be configured on opaque routes yet") + impl From for Filter { + fn from(ib: tls_route::route_backend::Invalid) -> Self { + Self::InvalidBackend(ib.message.into()) } } }