Skip to content

Commit

Permalink
chore(rust): improve privileged portals outputs, status, and persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Nov 12, 2024
1 parent 04815e5 commit 9004917
Show file tree
Hide file tree
Showing 16 changed files with 204 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,21 @@ pub struct TcpInlet {
bind_addr: SocketAddr,
outlet_addr: MultiAddr,
alias: String,
privileged: bool,
}

impl TcpInlet {
pub fn new(bind_addr: &SocketAddr, outlet_addr: &MultiAddr, alias: &str) -> TcpInlet {
pub fn new(
bind_addr: &SocketAddr,
outlet_addr: &MultiAddr,
alias: &str,
privileged: bool,
) -> TcpInlet {
Self {
bind_addr: *bind_addr,
outlet_addr: outlet_addr.clone(),
alias: alias.to_string(),
privileged,
}
}

Expand All @@ -59,4 +66,8 @@ impl TcpInlet {
pub fn alias(&self) -> String {
self.alias.clone()
}

pub fn privileged(&self) -> bool {
self.privileged
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use ockam_core::Error;
use ockam_core::Result;
use ockam_core::{async_trait, Address};
use ockam_multiaddr::MultiAddr;
use ockam_node::database::Boolean;
use ockam_transport_core::HostnamePort;

#[derive(Clone)]
Expand Down Expand Up @@ -46,14 +47,15 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
) -> ockam_core::Result<()> {
let query = query(
r#"
INSERT INTO tcp_inlet (node_name, bind_addr, outlet_addr, alias)
VALUES ($1, $2, $3, $4)
INSERT INTO tcp_inlet (node_name, bind_addr, outlet_addr, alias, privileged)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING"#,
)
.bind(node_name)
.bind(tcp_inlet.bind_addr().to_string())
.bind(tcp_inlet.outlet_addr().to_string())
.bind(tcp_inlet.alias());
.bind(tcp_inlet.alias())
.bind(tcp_inlet.privileged());
query.execute(&*self.database.pool).await.void()?;
Ok(())
}
Expand All @@ -64,7 +66,7 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
alias: &str,
) -> ockam_core::Result<Option<TcpInlet>> {
let query = query_as(
"SELECT bind_addr, outlet_addr, alias FROM tcp_inlet WHERE node_name = $1 AND alias = $2",
"SELECT bind_addr, outlet_addr, alias, privileged FROM tcp_inlet WHERE node_name = $1 AND alias = $2",
)
.bind(node_name)
.bind(alias);
Expand All @@ -90,14 +92,15 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
) -> ockam_core::Result<()> {
let query = query(
r#"
INSERT INTO tcp_outlet_status (node_name, socket_addr, worker_addr, payload)
VALUES ($1, $2, $3, $4)
INSERT INTO tcp_outlet_status (node_name, socket_addr, worker_addr, payload, privileged)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING"#,
)
.bind(node_name)
.bind(tcp_outlet_status.to.to_string())
.bind(tcp_outlet_status.worker_addr.to_string())
.bind(tcp_outlet_status.payload.as_ref());
.bind(tcp_outlet_status.payload.as_ref())
.bind(tcp_outlet_status.privileged);
query.execute(&*self.database.pool).await.void()?;
Ok(())
}
Expand All @@ -107,7 +110,7 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
node_name: &str,
worker_addr: &Address,
) -> ockam_core::Result<Option<OutletStatus>> {
let query = query_as("SELECT socket_addr, worker_addr, payload FROM tcp_outlet_status WHERE node_name = $1 AND worker_addr = $2")
let query = query_as("SELECT socket_addr, worker_addr, payload, privileged FROM tcp_outlet_status WHERE node_name = $1 AND worker_addr = $2")
.bind(node_name)
.bind(worker_addr.to_string());
let result: Option<TcpOutletStatusRow> = query
Expand Down Expand Up @@ -139,6 +142,7 @@ struct TcpInletRow {
bind_addr: String,
outlet_addr: String,
alias: String,
privileged: Boolean,
}

impl TcpInletRow {
Expand All @@ -157,6 +161,7 @@ impl TcpInletRow {
&self.bind_addr()?,
&self.outlet_addr()?,
&self.alias,
self.privileged.to_bool(),
))
}
}
Expand All @@ -167,6 +172,7 @@ struct TcpOutletStatusRow {
socket_addr: String,
worker_addr: String,
payload: Option<String>,
privileged: Boolean,
}

impl TcpOutletStatusRow {
Expand All @@ -178,6 +184,7 @@ impl TcpOutletStatusRow {
to,
worker_addr,
payload: self.payload.clone(),
privileged: self.privileged.to_bool(),
})
}
}
Expand All @@ -197,6 +204,7 @@ mod tests {
&SocketAddr::from_str("127.0.0.1:80").unwrap(),
&MultiAddr::from_str("/node/outlet").unwrap(),
"alias",
true,
);
repository.store_tcp_inlet("node_name", &tcp_inlet).await?;
let actual = repository.get_tcp_inlet("node_name", "alias").await?;
Expand All @@ -211,6 +219,7 @@ mod tests {
HostnamePort::from_str("127.0.0.1:80").unwrap(),
worker_addr.clone(),
Some("payload".to_string()),
true,
);
repository
.store_tcp_outlet("node_name", &tcp_outlet_status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ impl CliState {
bind_addr: &SocketAddr,
outlet_addr: &MultiAddr,
alias: &str,
privileged: bool,
) -> Result<TcpInlet> {
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addr, alias);
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addr, alias, privileged);
self.tcp_portals_repository()
.store_tcp_inlet(node_name, &tcp_inlet)
.await?;
Expand Down Expand Up @@ -56,8 +57,10 @@ impl CliState {
to: &HostnamePort,
worker_addr: &Address,
payload: &Option<String>,
privileged: bool,
) -> Result<OutletStatus> {
let tcp_outlet_status = OutletStatus::new(to.clone(), worker_addr.clone(), payload.clone());
let tcp_outlet_status =
OutletStatus::new(to.clone(), worker_addr.clone(), payload.clone(), privileged);

self.tcp_portals_repository()
.store_tcp_outlet(node_name, &tcp_outlet_status)
Expand Down
35 changes: 32 additions & 3 deletions implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ockam_core::{Address, IncomingAccessControl, OutgoingAccessControl, Route};
use ockam_multiaddr::MultiAddr;
use serde::{Deserialize, Serialize};

use crate::colors::color_primary;
use crate::colors::{color_primary, color_primary_alt};
use crate::error::ApiError;

use crate::output::Output;
Expand Down Expand Up @@ -208,6 +208,7 @@ pub struct InletStatus {
#[n(5)] pub outlet_route: Option<String>,
#[n(6)] pub status: ConnectionStatus,
#[n(7)] pub outlet_addr: String,
#[n(8)] pub privileged: bool,
}

impl InletStatus {
Expand All @@ -220,6 +221,7 @@ impl InletStatus {
outlet_route: impl Into<Option<String>>,
status: ConnectionStatus,
outlet_addr: impl Into<String>,
privileged: bool,
) -> Self {
Self {
bind_addr: bind_addr.into(),
Expand All @@ -229,6 +231,7 @@ impl InletStatus {
outlet_route: outlet_route.into(),
status,
outlet_addr: outlet_addr.into(),
privileged,
}
}
}
Expand Down Expand Up @@ -261,6 +264,14 @@ impl Display for InletStatus {
fmt::INDENTATION,
color_primary(&self.outlet_addr)
)?;
if self.privileged {
writeln!(
f,
"{}This Inlet is operating in {} mode",
fmt::INDENTATION,
color_primary_alt("privileged".to_string())
)?;
}
Ok(())
}
}
Expand All @@ -280,14 +291,21 @@ pub struct OutletStatus {
#[n(2)] pub worker_addr: Address,
/// An optional status payload
#[n(3)] pub payload: Option<String>,
#[n(4)] pub privileged: bool,
}

impl OutletStatus {
pub fn new(to: HostnamePort, worker_addr: Address, payload: impl Into<Option<String>>) -> Self {
pub fn new(
to: HostnamePort,
worker_addr: Address,
payload: impl Into<Option<String>>,
privileged: bool,
) -> Self {
Self {
to,
worker_addr,
payload: payload.into(),
privileged,
}
}

Expand Down Expand Up @@ -316,7 +334,18 @@ impl Display for OutletStatus {
.to_string()
),
color_primary(self.to.to_string()),
)
)?;

if self.privileged {
writeln!(
f,
"{}This Inlet is operating in {} mode",
fmt::INDENTATION,
color_primary_alt("privileged".to_string())
)?;
}

Ok(())
}
}

Expand Down
20 changes: 16 additions & 4 deletions implementations/rust/ockam/ockam_api/src/nodes/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,21 @@ pub(crate) struct InletInfo {
pub(crate) bind_addr: String,
pub(crate) outlet_addr: MultiAddr,
pub(crate) session: Arc<Mutex<Session>>,
pub(crate) privileged: bool,
}

impl InletInfo {
pub(crate) fn new(bind_addr: &str, outlet_addr: MultiAddr, session: Session) -> Self {
pub(crate) fn new(
bind_addr: &str,
outlet_addr: MultiAddr,
session: Session,
privileged: bool,
) -> Self {
Self {
bind_addr: bind_addr.to_owned(),
outlet_addr,
session: Arc::new(Mutex::new(session)),
privileged,
}
}
}
Expand All @@ -143,15 +150,20 @@ impl InletInfo {
pub struct OutletInfo {
pub(crate) to: HostnamePort,
pub(crate) worker_addr: Address,
pub(crate) privileged: bool,
}

impl OutletInfo {
pub(crate) fn new(to: HostnamePort, worker_addr: Option<&Address>) -> Self {
pub(crate) fn new(to: HostnamePort, worker_addr: Option<&Address>, privileged: bool) -> Self {
let worker_addr = match worker_addr {
Some(addr) => addr.clone(),
None => Address::from_string(""),
};
Self { to, worker_addr }
Self {
to,
worker_addr,
privileged,
}
}
}

Expand Down Expand Up @@ -334,6 +346,6 @@ mod tests {
}

fn outlet_info(worker_addr: Address) -> OutletInfo {
OutletInfo::new(HostnamePort::new("127.0.0.1", 0), Some(&worker_addr))
OutletInfo::new(HostnamePort::new("127.0.0.1", 0), Some(&worker_addr), true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,14 @@ impl NodeManager {
.entries()
.await
.iter()
.map(|(_, info)| OutletStatus::new(info.to.clone(), info.worker_addr.clone(), None))
.map(|(_, info)| {
OutletStatus::new(
info.to.clone(),
info.worker_addr.clone(),
None,
info.privileged,
)
})
.collect()
}

Expand Down
Loading

0 comments on commit 9004917

Please sign in to comment.