From f77dd14ac9ebeb32e815bb612b3c983d2270065e Mon Sep 17 00:00:00 2001 From: Quaternions Date: Thu, 12 Dec 2024 18:14:36 -0800 Subject: [PATCH] validation: parallel request processing --- validation/src/main.rs | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/validation/src/main.rs b/validation/src/main.rs index 11dff45..54a1cdd 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -28,6 +28,8 @@ pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClie pub const GROUP_STRAFESNET:u64=6980477; +pub const PARALLEL_REQUESTS:usize=16; + #[tokio::main] async fn main()->Result<(),StartupError>{ // talk to roblox through STRAFESNET_CI2 account @@ -67,17 +69,30 @@ async fn main()->Result<(),StartupError>{ // run futures let (mut messages,message_handler)=tokio::try_join!(nats_fut,message_handler_fut)?; - // nats consumer thread - tokio::spawn(async move{ - while let Some(message_result)=messages.next().await{ - match message_handler.handle_message_result(message_result).await{ - Ok(())=>println!("[Validation] Success, hooray!"), - Err(e)=>println!("[Validation] There was an error, oopsie! {e}"), - } + // process up to PARALLEL_REQUESTS in parallel + let main_loop=async move{ + static SEM:tokio::sync::Semaphore=tokio::sync::Semaphore::const_new(PARALLEL_REQUESTS); + // use memory leak to make static lifetime + let message_handler=Box::leak(Box::new(message_handler)); + // acquire a permit before attempting to receive a message, exit if either fails + while let (Ok(permit),Some(message_result))=(SEM.acquire().await,messages.next().await){ + // handle the message on a new thread (mainly to decode the model file) + tokio::spawn(async{ + match message_handler.handle_message_result(message_result).await{ + Ok(())=>println!("[Validation] Success, hooray!"), + Err(e)=>println!("[Validation] There was an error, oopsie! {e}"), + } + // explicitly call drop to make the move semantics and permit release more obvious + core::mem::drop(permit); + }); } - }); + }; - sig_term.recv().await; + // race sigkill and main loop termination and then die + tokio::select!{ + _=sig_term.recv()=>(), + _=main_loop=>(), + }; Ok(()) }