Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement notifications handling for extension logs #1415

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions crates/goose/src/agents/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ impl Capabilities {
} => {
let transport = StdioTransport::new(cmd, args.to_vec(), envs.get_env());
let handle = transport.start().await?;
eprintln!("added this extension");

// Set up logging in the default cache directory
println!("Setting up logging for extension {}", config.name());
let log_path = format!("{}/.cache/goose/logs/{}.log",
std::env::var("HOME").unwrap_or_else(|_| "~".to_string()),
config.name());
println!("Will attempt to create log file at: {}", log_path);
if let Err(e) = handle.enable_file_logging(&log_path).await {
println!("Failed to enable file logging for {}: {}", config.name(), e);
} else {
println!("Enabled logging for {} at {}", config.name(), log_path);
// Verify the file exists
if std::path::Path::new(&log_path).exists() {
println!("Confirmed log file was created at {}", log_path);
} else {
println!("Log file does not exist at {} after setup", log_path);
}
}

let service = McpService::with_timeout(handle, Duration::from_secs(300));
Box::new(McpClient::new(service))
}
Expand Down
1 change: 1 addition & 0 deletions crates/mcp-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower = { version = "0.4", features = ["timeout", "util"] }
tower-service = "0.3"
rand = "0.8"
chrono = "0.4.39"

[dev-dependencies]
54 changes: 54 additions & 0 deletions crates/mcp-client/src/transport/logging/file_logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use chrono::Local;

use super::LogMessage;

pub struct FileLogger {
file: Arc<Mutex<File>>,
}

impl FileLogger {
pub fn new(path: PathBuf) -> std::io::Result<Self> {
println!("Creating new FileLogger for path: {:?}", path);
// Create parent directories if they don't exist
if let Some(parent) = path.parent() {
println!("Creating parent directories: {:?}", parent);
std::fs::create_dir_all(parent)?;
}

println!("Opening log file...");
let file = OpenOptions::new()
.create(true)
.append(true)
.write(true)
.open(&path)?;

println!("FileLogger successfully created");
Ok(Self {
file: Arc::new(Mutex::new(file)),
})
}

pub async fn log(&self, message: &LogMessage) -> std::io::Result<()> {
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S%.3f");
let log_line = format!(
"[{}] [{}] {}: {}\n",
timestamp,
message.level,
message.logger.as_deref().unwrap_or("unknown"),
message.message
);

println!("Writing log line: {}", log_line.trim());
let mut file = self.file.lock().await;
file.write_all(log_line.as_bytes())?;
file.flush()?;
println!("Successfully wrote and flushed log line");

Ok(())
}
}
215 changes: 215 additions & 0 deletions crates/mcp-client/src/transport/logging/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::transport::Error;
use mcp_core::protocol::{JsonRpcMessage, JsonRpcNotification, JsonRpcRequest};

/// Log levels supported by the MCP protocol
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
Debug,
Info,
Notice,
Warning,
Error,
Critical,
Alert,
Emergency,
}

impl std::fmt::Display for LogLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogLevel::Debug => write!(f, "DEBUG"),
LogLevel::Info => write!(f, "INFO"),
LogLevel::Notice => write!(f, "NOTICE"),
LogLevel::Warning => write!(f, "WARN"),
LogLevel::Error => write!(f, "ERROR"),
LogLevel::Critical => write!(f, "CRIT"),
LogLevel::Alert => write!(f, "ALERT"),
LogLevel::Emergency => write!(f, "EMERG"),
}
}
}

/// Logging capability for client/server capability negotiation
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LoggingCapability {}

/// Parameters for setting the log level
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetLevelParams {
pub level: LogLevel,
}

/// Parameters for logging message notifications
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoggingMessageParams {
pub level: LogLevel,
pub logger: Option<String>,
pub data: Value,
}

/// A log message received from the server
#[derive(Debug, Clone)]
pub struct LogMessage {
pub level: LogLevel,
pub logger: Option<String>,
pub message: String,
}

/// Handler type for log messages
pub type LogHandler = Box<dyn Fn(LogMessage) + Send + Sync>;

/// Manages logging state and handlers
#[derive(Clone)]
pub struct LoggingManager {
handlers: Arc<RwLock<Vec<LogHandler>>>,
}

impl Default for LoggingManager {
fn default() -> Self {
Self::new()
}
}

impl LoggingManager {
pub fn new() -> Self {
Self {
handlers: Arc::new(RwLock::new(Vec::new())),
}
}

/// Register a new handler for log messages
pub async fn add_handler<F>(&self, handler: F)
where
F: Fn(LogMessage) + Send + Sync + 'static,
{
println!("Adding new log handler");
self.handlers.write().await.push(Box::new(handler));
let count = self.handlers.read().await.len();
println!("Now have {} log handlers registered", count);
}

/// Handle a logging notification message
pub async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<(), Error> {
println!("LoggingManager handling notification: {:?}", notification);
// Parse notification parameters
let params: LoggingMessageParams = serde_json::from_value(
notification.params.ok_or_else(|| Error::UnsupportedMessage)?,
)
.map_err(|e| Error::Serialization(e))?;

// Convert data to string - handle both string and structured data
let message = match params.data {
Value::String(s) => s,
_ => serde_json::to_string(&params.data)
.map_err(|e| Error::Serialization(e))?,
};

let log_message = LogMessage {
level: params.level,
logger: params.logger.clone(),
message: message.clone(),
};

println!(
"Created log message: level={:?}, logger={:?}, message={}",
log_message.level,
log_message.logger,
log_message.message
);

let handler_count = self.handlers.read().await.len();
println!("About to notify {} registered handlers", handler_count);

// Notify all registered handlers
for (idx, handler) in self.handlers.read().await.iter().enumerate() {
println!("Calling handler {}", idx);
handler(log_message.clone());
println!("Handler {} completed", idx);
}

Ok(())
}
}

/// Helper function to handle incoming notifications in the transport
pub async fn handle_notification(
notification: JsonRpcNotification,
logging_manager: &LoggingManager,
) -> Result<(), Error> {
match notification.method.as_str() {
"notifications/message" => {
logging_manager.handle_notification(notification).await?;
}
_ => {
// Ignore other notification types
println!("Ignoring unknown notification: {}", notification.method);
}
}
Ok(())
}

/// Helper function to create a setLevel request
pub fn create_set_level_request(level: LogLevel) -> JsonRpcMessage {
JsonRpcMessage::Request(JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: Some(0), // The transport will set the actual ID
method: "logging/setLevel".to_string(),
params: Some(serde_json::to_value(SetLevelParams { level }).unwrap()),
})
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};

#[tokio::test]
async fn test_logging_manager() {
let manager = LoggingManager::new();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();

// Add a test handler
manager
.add_handler(move |msg| {
assert_eq!(msg.level, LogLevel::Info);
assert_eq!(msg.message, "test message");
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.await;

// Create a test notification
let notification = JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/message".to_string(),
params: Some(serde_json::json!({
"level": "info",
"data": "test message"
})),
};

// Handle the notification
manager.handle_notification(notification).await.unwrap();

// Verify the handler was called
assert_eq!(counter.load(Ordering::SeqCst), 1);
}

#[test]
fn test_create_set_level_request() {
let request = create_set_level_request(LogLevel::Debug);
match request {
JsonRpcMessage::Request(req) => {
assert_eq!(req.method, "logging/setLevel");
assert!(req.params.is_some());
}
_ => panic!("Expected Request"),
}
}
}
8 changes: 8 additions & 0 deletions crates/mcp-client/src/transport/logging/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pub(crate) mod file_logger;
mod manager;

pub use file_logger::FileLogger;
pub use manager::{
LogLevel, LogMessage, LoggingCapability, LoggingManager, LoggingMessageParams, SetLevelParams,
create_set_level_request, handle_notification,
};
3 changes: 3 additions & 0 deletions crates/mcp-client/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,6 @@ pub use stdio::StdioTransport;

pub mod sse;
pub use sse::SseTransport;

pub mod logging;
pub use logging::{LogLevel, LogMessage, LoggingManager};
Loading
Loading