Skip to content

Commit

Permalink
support multi-threaded runtime in worker
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Feb 10, 2025
1 parent fa80c89 commit c87026c
Show file tree
Hide file tree
Showing 20 changed files with 401 additions and 80 deletions.
17 changes: 8 additions & 9 deletions g3bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,19 @@ fn main() -> anyhow::Result<()> {

proc_args.summary();

let _worker_guard = if let Some(worker_config) = proc_args.worker_runtime() {
let guard =
g3bench::worker::spawn_workers(worker_config).context("failed to start workers")?;
Some(guard)
} else {
None
};

let rt = proc_args
.main_runtime()
.start()
.context("failed to start main runtime")?;
rt.block_on(async move {
let _worker_guard = if let Some(worker_config) = proc_args.worker_runtime() {
let guard = g3bench::worker::spawn_workers(worker_config)
.await
.context("failed to start workers")?;
Some(guard)
} else {
None
};

match subcommand {
g3bench::target::h1::COMMAND => g3bench::target::h1::run(&proc_args, sub_args).await,
g3bench::target::h2::COMMAND => g3bench::target::h2::run(&proc_args, sub_args).await,
Expand Down
12 changes: 8 additions & 4 deletions g3bench/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub fn add_global_args(app: Command) -> Command {
.long(GLOBAL_ARG_THREADS)
.global(true)
.num_args(1)
.value_parser(value_parser!(usize)),
.value_parser(value_parser!(NonZeroUsize)),
)
.arg(
Arg::new(GLOBAL_ARG_THREAD_STACK_SIZE)
Expand Down Expand Up @@ -463,9 +463,9 @@ pub fn parse_global_args(args: &ArgMatches) -> anyhow::Result<ProcArgs> {
if args.get_flag(GLOBAL_ARG_UNCONSTRAINED) {
proc_args.task_unconstrained = true;
}
if let Some(n) = args.get_one::<usize>(GLOBAL_ARG_THREADS) {
proc_args.main_runtime.set_thread_number(*n);
proc_args.worker_runtime.set_thread_number(*n);
if let Some(n) = args.get_one::<NonZeroUsize>(GLOBAL_ARG_THREADS) {
proc_args.main_runtime.set_thread_number((*n).get());
proc_args.worker_runtime.set_thread_number_total(*n);
}
if let Some(stack_size) = g3_clap::humanize::get_usize(args, GLOBAL_ARG_THREAD_STACK_SIZE)? {
if stack_size > 0 {
Expand Down Expand Up @@ -547,5 +547,9 @@ pub fn parse_global_args(args: &ArgMatches) -> anyhow::Result<ProcArgs> {
proc_args.requests = Some(1);
}

proc_args
.worker_runtime
.check()
.context("invalid worker runtime config")?;
Ok(proc_args)
}
6 changes: 2 additions & 4 deletions g3bench/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ use g3_types::sync::GlobalInit;

static WORKER_HANDLERS: GlobalInit<Vec<Handle>> = GlobalInit::new(Vec::new());

pub async fn spawn_workers(config: &UnaidedRuntimeConfig) -> anyhow::Result<WorkersGuard> {
let guard = config
.start(|_, handle, _| WORKER_HANDLERS.with_mut(|vec| vec.push(handle)))
.await?;
pub fn spawn_workers(config: &UnaidedRuntimeConfig) -> anyhow::Result<WorkersGuard> {
let guard = config.start(|_, handle, _| WORKER_HANDLERS.with_mut(|vec| vec.push(handle)))?;
Ok(guard)
}

Expand Down
6 changes: 2 additions & 4 deletions g3fcgen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ fn main() -> anyhow::Result<()> {
#[cfg(unix)]
g3_daemon::daemonize::check_enter(&proc_args.daemon_config)?;

let _workers_guard =
g3_daemon::runtime::worker::spawn_workers().context("failed to spawn workers")?;
let ret = tokio_run(&proc_args);

match ret {
Expand All @@ -63,10 +65,6 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {
rt.block_on(async {
g3_daemon::runtime::set_main_handle();

let _workers_guard = g3_daemon::runtime::worker::spawn_workers()
.await
.context("failed to spawn workers")?;

g3fcgen::run(args).await
})
}
5 changes: 2 additions & 3 deletions g3keymess/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ fn main() -> anyhow::Result<()> {
None
};

let _workers_guard =
g3_daemon::runtime::worker::spawn_workers().context("failed to spawn workers")?;
let ret = tokio_run(&proc_args);

if let Some(handlers) = stat_join {
Expand Down Expand Up @@ -132,9 +134,6 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {

g3keymess::signal::register().context("failed to setup signal handler")?;

let _workers_guard = g3_daemon::runtime::worker::spawn_workers()
.await
.context("failed to spawn workers")?;
match load_and_spawn(unique_ctl_path).await {
Ok(_) => g3_daemon::control::upgrade::finish(),
Err(e) => {
Expand Down
5 changes: 2 additions & 3 deletions g3proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ fn main() -> anyhow::Result<()> {
None
};

let _workers_guard =
g3_daemon::runtime::worker::spawn_workers().context("failed to spawn workers")?;
let ret = tokio_run(&proc_args);

if let Some(handlers) = stat_join {
Expand Down Expand Up @@ -140,9 +142,6 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {
g3_daemon::runtime::metrics::add_tokio_stats(stats, "ip-locate".to_string());
}

let _workers_guard = g3_daemon::runtime::worker::spawn_workers()
.await
.context("failed to spawn workers")?;
match load_and_spawn().await {
Ok(_) => g3_daemon::control::upgrade::finish(),
Err(e) => {
Expand Down
5 changes: 2 additions & 3 deletions g3tiles/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ fn main() -> anyhow::Result<()> {
None
};

let _workers_guard =
g3_daemon::runtime::worker::spawn_workers().context("failed to spawn workers")?;
let ret = tokio_run(&proc_args);

if let Some(handlers) = stat_join {
Expand Down Expand Up @@ -115,9 +117,6 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {

g3tiles::signal::register().context("failed to setup signal handler")?;

let _workers_guard = g3_daemon::runtime::worker::spawn_workers()
.await
.context("failed to spawn workers")?;
match load_and_spawn().await {
Ok(_) => g3_daemon::control::upgrade::finish(),
Err(e) => {
Expand Down
22 changes: 10 additions & 12 deletions lib/g3-daemon/src/runtime/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,17 @@ fn build_cpu_core_worker_map() {
let _ = CPU_CORE_WORKER_MAP.set(map);
}

pub async fn spawn_workers() -> anyhow::Result<Option<WorkersGuard>> {
pub fn spawn_workers() -> anyhow::Result<Option<WorkersGuard>> {
if let Some(config) = crate::runtime::config::get_worker_config() {
let guard = config
.start(|id, handle, cpu_affinity| {
super::metrics::add_tokio_stats(handle.metrics(), format!("worker-{id}"));
let worker_handle = WorkerHandle {
handle,
id,
cpu_affinity,
};
WORKER_HANDLERS.with_mut(|vec| vec.push(worker_handle));
})
.await?;
let guard = config.start(|id, handle, cpu_affinity| {
super::metrics::add_tokio_stats(handle.metrics(), format!("worker-{id}"));
let worker_handle = WorkerHandle {
handle,
id,
cpu_affinity,
};
WORKER_HANDLERS.with_mut(|vec| vec.push(worker_handle));
})?;
build_cpu_core_worker_map();
Ok(Some(guard))
} else {
Expand Down
140 changes: 114 additions & 26 deletions lib/g3-runtime/src/unaided/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,35 @@

use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;

use anyhow::anyhow;
use log::{error, trace, warn};
use tokio::runtime::Handle;
use tokio::sync::{oneshot, watch};
use tokio::runtime::{Handle, Runtime};
use tokio::sync::watch;

use g3_compat::CpuAffinity;

#[cfg(feature = "yaml")]
mod yaml;

pub struct WorkersGuard {
/// Guard for multi-thread ("variant M") worker runtimes.
///
/// Exists purely for ownership: it keeps the tokio `Runtime` instances
/// alive while held; dropping it drops the runtimes.
pub struct MvWorkersGuard {
    // Never read — holding the runtimes is the whole point of the guard.
    _rt_list: Vec<Runtime>,
}

/// Guard for current-thread ("variant C") worker runtimes.
///
/// Holds the `watch` sender whose receivers the spawned worker threads
/// subscribe to; dropping the guard closes the channel, which
/// presumably signals those threads to shut down — the thread-side wait
/// is not visible here, so confirm against the worker loop.
pub struct CvWorkersGuard {
    // Never read — dropping it is the shutdown signal.
    _close_sender: watch::Sender<()>,
}

/// Guard returned by `UnaidedRuntimeConfig::start`.
///
/// Keeps the spawned worker runtimes alive for as long as it is held;
/// the variant reflects which runtime flavor was started.
pub enum WorkersGuard {
    /// Current-thread runtimes, one OS thread per runtime.
    VariantC(CvWorkersGuard),
    /// Multi-thread runtimes, several OS threads per runtime.
    VariantM(MvWorkersGuard),
}

pub struct UnaidedRuntimeConfig {
thread_number: Option<NonZeroUsize>,
thread_number_total: NonZeroUsize,
thread_number_per_rt: NonZeroUsize,
thread_stack_size: Option<usize>,
sched_affinity: HashMap<usize, CpuAffinity>,
max_io_events_per_tick: Option<usize>,
Expand All @@ -50,8 +62,11 @@ impl Default for UnaidedRuntimeConfig {

impl UnaidedRuntimeConfig {
pub fn new() -> Self {
let target_thread_number =
std::thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
UnaidedRuntimeConfig {
thread_number: None,
thread_number_total: target_thread_number,
thread_number_per_rt: NonZeroUsize::MIN,
thread_stack_size: None,
sched_affinity: HashMap::new(),
max_io_events_per_tick: None,
Expand All @@ -62,12 +77,12 @@ impl UnaidedRuntimeConfig {
}
}

pub fn set_thread_number(&mut self, num: usize) {
if let Ok(n) = NonZeroUsize::try_from(num) {
self.thread_number = Some(n);
} else {
self.thread_number = None;
}
/// Set how many threads each worker runtime runs with.
///
/// A value of 1 selects per-thread current-thread runtimes; larger
/// values select multi-thread runtimes (see `start`).
pub fn set_thread_number_per_rt(&mut self, num: NonZeroUsize) {
    self.thread_number_per_rt = num;
}

/// Set the total number of worker threads across all runtimes.
///
/// The total must divide evenly by the per-runtime thread number;
/// `check` enforces that invariant.
pub fn set_thread_number_total(&mut self, num: NonZeroUsize) {
    self.thread_number_total = num;
}

pub fn set_thread_stack_size(&mut self, size: usize) {
Expand All @@ -86,7 +101,12 @@ impl UnaidedRuntimeConfig {
target_os = "netbsd",
))]
pub fn set_mapped_sched_affinity(&mut self) -> anyhow::Result<()> {
let n = self.num_threads();
if self.thread_number_per_rt.get() != 1 {
return Err(anyhow!(
"unable to set CPU affinity for multi thread worker runtime"
));
}
let n = self.thread_number_total.get();
for i in 0..n {
let mut cpu = CpuAffinity::default();
cpu.add_id(i)
Expand Down Expand Up @@ -118,21 +138,80 @@ impl UnaidedRuntimeConfig {
}
}

pub async fn start<F>(&self, recv_handle: F) -> anyhow::Result<WorkersGuard>
/// Validate the worker runtime thread configuration.
///
/// # Errors
///
/// Fails when the total thread number is not evenly divisible by the
/// per-runtime thread number, since the workers are started as
/// `total / per_rt` runtimes with `per_rt` threads each and any
/// remainder would silently drop threads.
pub fn check(&mut self) -> anyhow::Result<()> {
    let threads_per_rt = self.thread_number_per_rt.get();
    if self.thread_number_total.get() % threads_per_rt != 0 {
        // "divisible" (was "dividable") — fix the grammar of the
        // user-facing error message.
        return Err(anyhow!(
            "total thread number {} is not divisible by per-runtime thread number {}",
            self.thread_number_total,
            threads_per_rt
        ));
    }
    Ok(())
}

/// Start the "variant M" workers: `rt_num` multi-thread tokio runtimes
/// with `rt_thread_num` worker threads each.
///
/// `recv_handle` is called once per runtime with its index, a clone of
/// its `Handle`, and the CPU affinity configured for that index (if any).
/// Returns a guard that owns the runtimes; dropping it shuts them down.
fn start_variant_m<F>(
    &self,
    recv_handle: F,
    rt_num: usize,
    rt_thread_num: usize,
) -> anyhow::Result<WorkersGuard>
where
    F: Fn(usize, Handle, Option<CpuAffinity>),
{
    let mut rt_list = Vec::with_capacity(rt_num);
    for i in 0..rt_num {
        let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
        rt_builder.worker_threads(rt_thread_num);
        if let Some(stack_size) = self.thread_stack_size {
            rt_builder.thread_stack_size(stack_size);
        }
        if let Some(n) = self.max_io_events_per_tick {
            rt_builder.max_io_events_per_tick(n);
        }
        rt_builder.enable_all();

        // Threads are named "worker-<rt index>#<seq>".
        // NOTE(review): ATOMIC_ID is a single function-local static shared
        // by every instantiation of this closure, so <seq> increments
        // globally across all runtimes instead of restarting at 0 for each
        // runtime — confirm this numbering is intended.
        rt_builder.thread_name_fn(move || {
            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
            let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
            format!("worker-{i}#{id}")
        });

        if let Some(cpu_affinity) = self.sched_affinity.get(&i).cloned() {
            // Applied on each thread this runtime starts; failure is only
            // logged, not fatal.
            rt_builder.on_thread_start(move || {
                if let Err(e) = cpu_affinity.apply_to_local_thread() {
                    warn!("failed to set sched affinity for worker thread {i}: {e}");
                }
            });
        }

        match rt_builder.build() {
            Ok(rt) => {
                let cpu_affinity = self.sched_affinity.get(&i).cloned();
                recv_handle(i, rt.handle().clone(), cpu_affinity);
                rt_list.push(rt);
            }
            Err(e) => return Err(anyhow!("failed to create tokio worker runtime {i}: {e}")),
        }
    }

    // Moving the runtimes into the guard keeps them alive until the
    // caller drops it.
    Ok(WorkersGuard::VariantM(MvWorkersGuard { _rt_list: rt_list }))
}

fn start_variant_c<F>(&self, recv_handle: F, thread_num: usize) -> anyhow::Result<WorkersGuard>
where
F: Fn(usize, Handle, Option<CpuAffinity>),
{
let (close_w, _close_r) = watch::channel(());

let n = self.num_threads();
for i in 0..n {
for i in 0..thread_num {
let mut close_r = close_w.subscribe();
let (sender, receiver) = oneshot::channel();
let (sender, receiver) = mpsc::sync_channel(1);

let mut thread_builder = std::thread::Builder::new().name(format!("worker#{i}"));

if let Some(thread_stack_size) = self.thread_stack_size {
thread_builder = thread_builder.stack_size(thread_stack_size);
if let Some(stack_size) = self.thread_stack_size {
thread_builder = thread_builder.stack_size(stack_size);
}

let cpu_set = self.sched_affinity.get(&i).cloned();
Expand Down Expand Up @@ -190,7 +269,7 @@ impl UnaidedRuntimeConfig {
})
.map_err(|e| anyhow!("failed to spawn worker thread {i}: {e}"))?;

match receiver.await {
match receiver.recv() {
Ok(handle) => {
let cpu_affinity = self.sched_affinity.get(&i).cloned();
recv_handle(i, handle, cpu_affinity)
Expand All @@ -203,15 +282,24 @@ impl UnaidedRuntimeConfig {
}
}

Ok(WorkersGuard {
Ok(WorkersGuard::VariantC(CvWorkersGuard {
_close_sender: close_w,
})
}))
}

fn num_threads(&self) -> usize {
self.thread_number
.or(std::thread::available_parallelism().ok())
.map(|v| v.get())
.unwrap_or(1)
/// Spawn the configured worker runtimes and return a guard that keeps
/// them alive.
///
/// With one thread per runtime this starts `thread_number_total`
/// current-thread runtimes (variant C); otherwise it starts
/// `thread_number_total / thread_number_per_rt` multi-thread runtimes
/// (variant M). `recv_handle` is invoked for each runtime with its
/// index, handle, and optional CPU affinity.
pub fn start<F>(&self, recv_handle: F) -> anyhow::Result<WorkersGuard>
where
    F: Fn(usize, Handle, Option<CpuAffinity>),
{
    let total_threads = self.thread_number_total.get();
    match self.thread_number_per_rt.get() {
        1 => self.start_variant_c(recv_handle, total_threads),
        per_rt => self.start_variant_m(recv_handle, total_threads / per_rt, per_rt),
    }
}
}
Loading

0 comments on commit c87026c

Please sign in to comment.