Skip to content

Commit

Permalink
fix(windows): improve process termination and port cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: David Anyatonwu <[email protected]>
  • Loading branch information
onyedikachi-david committed Jan 15, 2025
1 parent 5c50996 commit 48226a1
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 63 deletions.
1 change: 1 addition & 0 deletions screenpipe-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,5 @@ nix = { version = "0.29", features = ["signal"] }
windows = { version = "0.58", features = [
"Win32_System_Threading",
"Win32_Foundation",
"Win32_Networking_WinSock",
] }
246 changes: 183 additions & 63 deletions screenpipe-server/src/auto_destruct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,89 +2,209 @@ use std::process::Command;
use std::time::Duration;
use tokio::time::sleep;
#[cfg(target_os = "windows")]
use tracing::debug;
use tracing::info;
#[cfg(target_os = "windows")]
use windows::Win32::Foundation::{CloseHandle, HANDLE, STILL_ACTIVE};
#[cfg(target_os = "windows")]
use windows::Win32::System::Threading::{
GetExitCodeProcess, OpenProcess, PROCESS_QUERY_INFORMATION,
CreateJobObjectW, AssignProcessToJobObject, GetExitCodeProcess, OpenProcess, TerminateProcess,

Check failure on line 8 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-windows

unresolved imports `windows::Win32::System::Threading::CreateJobObjectW`, `windows::Win32::System::Threading::AssignProcessToJobObject`

Check failure on line 8 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-windows

unresolved imports `windows::Win32::System::Threading::CreateJobObjectW`, `windows::Win32::System::Threading::AssignProcessToJobObject`
PROCESS_QUERY_INFORMATION, PROCESS_TERMINATE, PROCESS_ALL_ACCESS,
};
use tracing::{debug, error, info, warn};

Check warning on line 11 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-linux

unused imports: `debug`, `error`, and `warn`

Check warning on line 11 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-macos

unused imports: `debug`, `error`, and `warn`

Check warning on line 11 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-ubuntu

unused imports: `debug`, `error`, and `warn`
use std::io::Error as IoError;

Check warning on line 12 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-linux

unused import: `std::io::Error as IoError`

Check warning on line 12 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-macos

unused import: `std::io::Error as IoError`

Check warning on line 12 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-ubuntu

unused import: `std::io::Error as IoError`
use thiserror::Error;

Check warning on line 13 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-linux

unused import: `thiserror::Error`

Check warning on line 13 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-macos

unused import: `thiserror::Error`

Check warning on line 13 in screenpipe-server/src/auto_destruct.rs

View workflow job for this annotation

GitHub Actions / test-ubuntu

unused import: `thiserror::Error`

#[cfg(target_os = "windows")]
#[derive(Error, Debug)]
pub enum ProcessError {
#[error("Failed to open process: {0}")]
OpenProcess(IoError),
#[error("Failed to terminate process: {0}")]
TerminateProcess(IoError),
#[error("Process became unresponsive")]
Unresponsive,
#[error("Port is still in use")]
PortInUse,
}

#[cfg(target_os = "windows")]
#[derive(Debug)]
pub struct ProcessStatus {
pub terminated: bool,
pub exit_code: Option<u32>,
pub port_released: bool,
}

#[cfg(target_os = "windows")]
fn is_process_alive(pid: u32) -> bool {
unsafe {
let process: HANDLE = match OpenProcess(PROCESS_QUERY_INFORMATION, false, pid) {
Ok(handle) => handle,
Err(e) => {
debug!("Failed to open process with PID ({}): {:?}", pid, e);
return false;
pub struct ProcessManager {
pid: u32,
port: u16,
job_handle: HANDLE,
}

#[cfg(target_os = "windows")]
impl ProcessManager {
pub fn new(pid: u32, port: u16) -> windows::core::Result<Self> {
unsafe {
let job_handle = CreateJobObjectW(None, None)?;
let process_handle = OpenProcess(PROCESS_ALL_ACCESS, false, pid)?;
AssignProcessToJobObject(job_handle, process_handle)?;
CloseHandle(process_handle).expect("Failed to close process handle");
Ok(Self {
pid,
port,
job_handle,
})
}
}

fn is_process_alive(&self) -> Result<bool, ProcessError> {
unsafe {
let process = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_TERMINATE, false, self.pid)
.map_err(|e| ProcessError::OpenProcess(IoError::from_raw_os_error(e.code().0)))?;

if process.is_invalid() {
return Ok(false);
}
};
if process.is_invalid() {
return false;

let mut exit_code: u32 = 0;
GetExitCodeProcess(process, &mut exit_code).map_err(|_| ProcessError::Unresponsive)?;
CloseHandle(process).expect("Failed to close process handle");

Ok(exit_code == STILL_ACTIVE.0 as u32)
}
let mut exit_code: u32 = 0;
let result = GetExitCodeProcess(process, &mut exit_code);
CloseHandle(process).expect("Failed to close process handle");
if result.is_err() {
debug!("Failed to get exit code for process with PID ({})", pid);
return false;
}

fn is_port_in_use(&self) -> bool {
let netstat_output = Command::new("netstat")
.args(&["-ano"])
.output()
.expect("failed to execute netstat");

let output = String::from_utf8_lossy(&netstat_output.stdout);
output.contains(&format!(":{}", self.port))
}

async fn try_graceful_shutdown(&self) -> Result<(), ProcessError> {
for _ in 0..3 {
if !self.is_process_alive()? {
return Ok(());
}
sleep(Duration::from_secs(1)).await;
}
exit_code == STILL_ACTIVE.0 as u32
Err(ProcessError::Unresponsive)
}
}

pub async fn watch_pid(pid: u32) -> bool {
info!("starting to watch for app termination (pid: {})", pid);
pub async fn force_terminate(&self) -> Result<(), ProcessError> {
unsafe {
let process = OpenProcess(PROCESS_TERMINATE, false, self.pid)
.map_err(|e| ProcessError::OpenProcess(IoError::from_raw_os_error(e.code().0)))?;

loop {
#[cfg(target_os = "windows")]
{
// Try Windows API first
if !is_process_alive(pid) {
debug!("Process ({}) not found via windows api", pid);
return true;
if !process.is_invalid() {
TerminateProcess(process, 1)
.map_err(|e| ProcessError::TerminateProcess(IoError::from_raw_os_error(e.code().0)))?;
CloseHandle(process).expect("Failed to close process handle");
}

Ok(())
}
}

pub async fn watch_and_cleanup(&self) -> Result<ProcessStatus, ProcessError> {
info!("starting enhanced process monitoring (pid: {}, port: {})", self.pid, self.port);

let mut consecutive_fails = 0;
let max_fails = 3;

loop {
match self.is_process_alive() {
Ok(false) => {
debug!("Process ({}) not found via Windows API", self.pid);
break;
}
Ok(true) => {
consecutive_fails = 0;
}
Err(e) => {
consecutive_fails += 1;
error!("Error checking process status: {:?}", e);
if consecutive_fails >= max_fails {
return Err(ProcessError::Unresponsive);
}
}
}

// Fallback to Command approach
let pid_output = Command::new("tasklist")
.args(&["/FI", &format!("PID eq {}", pid), "/NH", "/FO", "CSV"])
.output()
.expect("failed to check pid");

let app_output = Command::new("tasklist")
.args(&[
"/FI",
"IMAGENAME eq screenpipe-app.exe",
"/NH",
"/FO",
"CSV",
])
.output()
.expect("failed to check app name");

let pid_alive = String::from_utf8_lossy(&pid_output.stdout).contains(&pid.to_string());
let app_alive = !String::from_utf8_lossy(&app_output.stdout).is_empty();

info!("pid alive: {}, app alive: {}", pid_alive, app_alive);

if !pid_alive || !app_alive {
return true;
sleep(Duration::from_secs(1)).await;
}

if let Err(e) = self.try_graceful_shutdown().await {
warn!("Graceful shutdown failed: {:?}, attempting force terminate", e);
self.force_terminate().await?;
}

let mut port_released = false;
for _ in 0..5 {
if !self.is_port_in_use() {
port_released = true;
break;
}
sleep(Duration::from_secs(1)).await;
}

#[cfg(not(target_os = "windows"))]
{
let output = Command::new("ps")
.args(&["-p", &pid.to_string()])
.output()
.expect("failed to execute process check command");
Ok(ProcessStatus {
terminated: true,
exit_code: None,
port_released,
})
}
}

if !output.status.success() || output.stdout.is_empty() {
return true;
#[cfg(target_os = "windows")]
impl Drop for ProcessManager {
fn drop(&mut self) {
unsafe {
CloseHandle(self.job_handle).expect("Failed to close job handle");
}
}
}

#[cfg(target_os = "windows")]
pub async fn watch_pid(pid: u32) -> bool {
let port = 3030;
match ProcessManager::new(pid, port) {
Ok(manager) => {
match manager.watch_and_cleanup().await {
Ok(status) => {
info!(
"Process terminated successfully. Port released: {}",
status.port_released
);
true
}
Err(e) => {
error!("Failed to manage process: {:?}", e);
false
}
}
}
Err(e) => {
error!("Failed to create process manager: {:?}", e);
false
}
}
}

#[cfg(not(target_os = "windows"))]
pub async fn watch_pid(pid: u32) -> bool {
info!("starting to watch for app termination (pid: {})", pid);

loop {
let output = Command::new("ps")
.args(&["-p", &pid.to_string()])
.output()
.expect("failed to execute process check command");

if !output.status.success() || output.stdout.is_empty() {
return true;
}

sleep(Duration::from_secs(1)).await;
}
Expand Down

0 comments on commit 48226a1

Please sign in to comment.