Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: clean tokio Runtime usage #8755

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 12 additions & 10 deletions implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use tokio::sync::broadcast::{channel, Receiver, Sender};
use ockam::SqlxDatabase;
use ockam_core::env::get_env_with_default;
use ockam_node::database::{DatabaseConfiguration, DatabaseType, OCKAM_SQLITE_IN_MEMORY};
use ockam_node::Executor;

use crate::cli_state::error::Result;
use crate::cli_state::CliStateError;
Expand Down Expand Up @@ -51,11 +50,6 @@ pub struct CliState {
}

impl CliState {
/// Create a new CliState in a given directory
pub fn new(mode: CliStateMode) -> Result<Self> {
Executor::execute_future(Self::create(mode))?
}

pub fn dir(&self) -> Result<PathBuf> {
match &self.mode {
CliStateMode::Persistent(dir) => Ok(dir.to_path_buf()),
Expand Down Expand Up @@ -127,14 +121,14 @@ impl CliState {
impl CliState {
/// Return a new CliState using a default directory to store its data or
/// using an in-memory storage if the OCKAM_SQLITE_IN_MEMORY environment variable is set to true
pub fn from_env() -> Result<Self> {
pub async fn from_env() -> Result<Self> {
let in_memory = get_env_with_default::<bool>(OCKAM_SQLITE_IN_MEMORY, false)?;
let mode = if in_memory {
CliStateMode::InMemory
} else {
CliStateMode::with_default_dir()?
};
Self::new(mode)
Self::create(mode).await
}

/// Stop nodes and remove all the directories storing state
Expand Down Expand Up @@ -182,7 +176,7 @@ impl CliState {
/// Backup and reset is used to save aside
/// some corrupted local state for later inspection and then reset the state.
/// The database is backed-up only if it is a SQLite database.
pub fn backup_and_reset() -> Result<()> {
pub async fn backup_and_reset() -> Result<()> {
let dir = Self::default_dir()?;

// Reset backup directory
Expand All @@ -202,7 +196,7 @@ impl CliState {

// Reset state
Self::delete_at(&dir)?;
Self::new(CliStateMode::Persistent(dir.clone()))?;
Self::create(CliStateMode::Persistent(dir.clone())).await?;

let backup_dir = CliState::backup_default_dir()?;
eprintln!("The {dir:?} directory has been reset and has been backed up to {backup_dir:?}");
Expand Down Expand Up @@ -230,19 +224,26 @@ impl CliState {
impl CliState {
/// Create a new CliState where the data is stored at a given path
pub async fn create(mode: CliStateMode) -> Result<Self> {
// log("Point 4.0");
if let CliStateMode::Persistent(ref dir) = mode {
std::fs::create_dir_all(dir.as_path())?;
}
// log("Point 4.1");
let database = SqlxDatabase::create(&Self::make_database_configuration(&mode)?).await?;
// log("Point 4.2");
let configuration = Self::make_application_database_configuration(&mode)?;
// log("Point 4.3");
// FIXME
let application_database =
SqlxDatabase::create_application_database(&configuration).await?;
// log("Point 4.4");
debug!("Opened the main database with options {:?}", database);
debug!(
"Opened the application database with options {:?}",
application_database
);
let (notifications, _) = channel::<Notification>(NOTIFICATIONS_CHANNEL_CAPACITY);
// log("Point 4.5");
let state = Self {
mode,
database,
Expand All @@ -254,6 +255,7 @@ impl CliState {
exporting_enabled: ExportingEnabled::Off,
notifications,
};
// log("Point 4.6");
Ok(state)
}

Expand Down
7 changes: 7 additions & 0 deletions implementations/rust/ockam/ockam_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,10 @@ pub use session::connection_status::ConnectionStatus;
pub use ui::*;
pub use util::*;
pub use version::*;

#[inline]
pub fn log(str: &str) {
let now = chrono::Utc::now();
let now = now.to_rfc3339();
println!("{}: {}", now, str);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::logs::ExportingEnabled;
use crate::CliState;
use ockam_core::env::{get_env_with_default, FromString};
use ockam_core::errcode::{Kind, Origin};
use ockam_node::Executor;
use std::env::current_exe;
use std::fmt::{Display, Formatter};
use std::net::{SocketAddr, ToSocketAddrs};
Expand Down Expand Up @@ -116,14 +115,15 @@ impl ExportingConfiguration {

/// Create a tracing configuration for a user command running in the foreground.
/// (meaning that the process will shut down once the command has been executed)
pub fn foreground(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state)? {
pub async fn foreground(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state).await? {
None => ExportingConfiguration::off(),
Some(endpoint) => Ok(ExportingConfiguration {
enabled: exporting_enabled(
&endpoint,
opentelemetry_endpoint_foreground_connection_timeout()?,
)?,
)
.await?,
span_export_timeout: span_export_timeout()?,
log_export_timeout: log_export_timeout()?,
span_export_scheduled_delay: foreground_span_export_scheduled_delay()?,
Expand All @@ -139,14 +139,15 @@ impl ExportingConfiguration {
}

/// Create a tracing configuration for a background node
pub fn background(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state)? {
pub async fn background(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state).await? {
None => ExportingConfiguration::off(),
Some(endpoint) => Ok(ExportingConfiguration {
enabled: exporting_enabled(
&endpoint,
opentelemetry_endpoint_background_connection_timeout()?,
)?,
)
.await?,
span_export_timeout: span_export_timeout()?,
log_export_timeout: log_export_timeout()?,
span_export_scheduled_delay: background_span_export_scheduled_delay()?,
Expand Down Expand Up @@ -254,11 +255,11 @@ fn print_debug(message: impl Into<String>) {
/// - Exporting has not been deactivated by the user
/// - The opentelemetry endpoint is accessible
///
fn exporting_enabled(
async fn exporting_enabled(
endpoint: &OpenTelemetryEndpoint,
connection_check_timeout: Duration,
) -> ockam_core::Result<ExportingEnabled> {
if is_endpoint_accessible(&endpoint.url(), connection_check_timeout) {
if is_endpoint_accessible(&endpoint.url(), connection_check_timeout).await {
print_debug("Exporting is enabled");
Ok(ExportingEnabled::On)
} else {
Expand All @@ -275,23 +276,37 @@ fn exporting_enabled(
}

/// Return true if the endpoint can be accessed with a TCP connection
fn is_endpoint_accessible(url: &Url, connection_check_timeout: Duration) -> bool {
async fn is_endpoint_accessible(url: &Url, connection_check_timeout: Duration) -> bool {
match to_socket_addr(url) {
Some(address) => {
let retries = FibonacciBackoff::from_millis(100);
let now = Instant::now();

// TODO: Not sure we need to retry really, also maybe it could happen in the background
// to not slow things down
for timeout_duration in retries {
print_debug(format!(
"trying to connect to {address} in {timeout_duration:?}"
));
if std::net::TcpStream::connect_timeout(&address, timeout_duration).is_ok() {
return true;
} else {
if now.elapsed() >= connection_check_timeout {
return false;
};
std::thread::sleep(timeout_duration);

let res = tokio::time::timeout(
timeout_duration,
tokio::net::TcpStream::connect(&address),
)
.await;

match res {
Ok(res) => {
if res.is_ok() {
return true;
}
}
Err(_) => {
if now.elapsed() >= connection_check_timeout {
return false;
};
tokio::time::sleep(timeout_duration).await;
}
}
}
false
Expand Down Expand Up @@ -324,36 +339,34 @@ fn to_socket_addr(url: &Url) -> Option<SocketAddr> {
/// Return the tracing endpoint, defined by an environment variable
/// If the endpoint can be established with an Ockam portal to the opentelemetry-relay created in the project
/// use that URL, otherwise use the HTTPS endpoint
fn opentelemetry_endpoint(state: &CliState) -> ockam_core::Result<Option<OpenTelemetryEndpoint>> {
async fn opentelemetry_endpoint(
state: &CliState,
) -> ockam_core::Result<Option<OpenTelemetryEndpoint>> {
if !is_exporting_set()? {
print_debug("Exporting is turned off");
Ok(None)
} else {
let state = state.clone();
match Executor::execute_future(async move {
let res = {
// if a project is defined try to use the OpenTelemetry portal
// and if we allow traces to be exported via a portal
if state.projects().get_default_project().await.is_ok()
&& is_exporting_via_portal_set()?
{
print_debug("A default project exists. Getting the project export endpoint");
get_project_endpoint_url(&state).await
get_project_endpoint_url(state).await
} else {
print_debug("A default project does not exist. Getting the default HTTPs endpoint");
get_https_endpoint()
}
}) {
Ok(Ok(url)) => Ok(Some(url)),
Ok(Err(e)) => {
};
match res {
Ok(url) => Ok(Some(url)),
Err(e) => {
print_debug(format!(
"There was an issue when setting up the exporting of traces: {e:?}"
));
Ok(None)
}
Err(e) => {
print_debug(format!("There was an issue when running the code setting up the exporting of traces: {e:?}"));
Ok(None)
}
}
}
}
Expand Down
Loading
Loading