Skip to content

Commit

Permalink
fix(outbound): fall back to profile-provided authority (#3342)
Browse files Browse the repository at this point in the history
To fix a regression in 7eadac4, where authority labels are not properly
set on legacy TCP metrics, this commit uses the profile-provided authority
when it is set.
  • Loading branch information
olix0r authored Nov 8, 2024
1 parent 7eadac4 commit 5c635e6
Showing 1 changed file with 30 additions and 27 deletions.
57 changes: 30 additions & 27 deletions linkerd/app/outbound/src/opaq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use linkerd_app_core::{
},
svc,
transport::addrs::*,
Addr, Error,
Addr, Error, NameAddr,
};
use once_cell::sync::Lazy;
use std::{fmt::Debug, hash::Hash, sync::Arc};
Expand Down Expand Up @@ -86,16 +86,6 @@ impl OpaqMetrics {
}
}

fn should_override_opaq_policy(
rx: &watch::Receiver<profiles::Profile>,
) -> Option<profiles::LogicalAddr> {
let p = rx.borrow();
if p.has_targets() {
p.addr.clone()
} else {
None
}
}
/// Given both profiles and policy information, this function constructs `opaq::Routes``.
/// The decision on whether profiles or policy should be used is made by inspecting the
/// returned profiles and checking whether there are any targets defined. This is done
Expand All @@ -106,18 +96,34 @@ pub fn routes_from_discovery(
profile: Option<profiles::Receiver>,
mut policy: policy::Receiver,
) -> watch::Receiver<Routes> {
if let Some(mut profile) = profile.map(watch::Receiver::from) {
if let Some(paddr) = should_override_opaq_policy(&profile) {
tracing::debug!("Using ServiceProfile");
let init = routes_from_profile(paddr.clone(), &profile.borrow_and_update());

return spawn_routes(profile, init, move |profile: &profiles::Profile| {
Some(routes_from_profile(paddr.clone(), profile))
});
let profile_addr = if let Some(mut rx) = profile.map(watch::Receiver::from) {
let profile = rx.borrow_and_update();
if let Some(profiles::LogicalAddr(addr)) = profile.addr.clone() {
if profile.has_targets() {
tracing::debug!(%addr, "Using ServiceProfile");
let init = routes_from_profile(addr.clone(), &profile);
drop(profile);
return spawn_routes(rx, init, move |profile: &profiles::Profile| {
Some(routes_from_profile(addr.clone(), profile))
});
}
}
}

tracing::debug!("Using ClientPolicy routes");
profile
.addr
.clone()
.map(|profiles::LogicalAddr(pa)| Addr::Name(pa))
} else {
None
};

// Prefer a named address if the given address is a socket address.
let addr = match addr {
Addr::Name(na) => Addr::Name(na),
addr => profile_addr.unwrap_or(addr),
};

tracing::debug!(%addr, "Using ClientPolicy routes");
let init = routes_from_policy(addr.clone(), &policy.borrow_and_update())
.expect("initial policy must be opaque");

Expand Down Expand Up @@ -147,10 +153,7 @@ fn routes_from_policy(addr: Addr, policy: &policy::ClientPolicy) -> Option<Route
})
}

fn routes_from_profile(
profiles::LogicalAddr(profiles_addr): profiles::LogicalAddr,
profile: &profiles::Profile,
) -> Routes {
fn routes_from_profile(addr: NameAddr, profile: &profiles::Profile) -> Routes {
// TODO: make configurable
let queue = {
policy::Queue {
Expand All @@ -165,7 +168,7 @@ fn routes_from_profile(
});

// TODO(ver) use resource metadata from the profile response.
let parent_meta = service_meta(&profiles_addr).unwrap_or_else(|| UNKNOWN_META.clone());
let parent_meta = service_meta(&addr).unwrap_or_else(|| UNKNOWN_META.clone());

let backends: Vec<(policy::RouteBackend<policy::opaq::Filter>, u32)> = profile
.targets
Expand Down Expand Up @@ -207,7 +210,7 @@ fn routes_from_profile(

Routes {
logical: Logical {
addr: profiles_addr.into(),
addr: addr.into(),
meta: ParentRef(parent_meta),
},
backends: backends.into_iter().map(|(b, _)| b.backend).collect(),
Expand Down

0 comments on commit 5c635e6

Please sign in to comment.