validation: parallel request processing

This commit is contained in:
Quaternions 2024-12-12 18:14:36 -08:00
parent 727e358cf9
commit f77dd14ac9

View File

@ -28,6 +28,8 @@ pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClie
pub const GROUP_STRAFESNET:u64=6980477; pub const GROUP_STRAFESNET:u64=6980477;
pub const PARALLEL_REQUESTS:usize=16;
#[tokio::main] #[tokio::main]
async fn main()->Result<(),StartupError>{ async fn main()->Result<(),StartupError>{
// talk to roblox through STRAFESNET_CI2 account // talk to roblox through STRAFESNET_CI2 account
@ -67,17 +69,30 @@ async fn main()->Result<(),StartupError>{
// run futures // run futures
let (mut messages,message_handler)=tokio::try_join!(nats_fut,message_handler_fut)?; let (mut messages,message_handler)=tokio::try_join!(nats_fut,message_handler_fut)?;
// nats consumer thread // process up to PARALLEL_REQUESTS in parallel
tokio::spawn(async move{ let main_loop=async move{
while let Some(message_result)=messages.next().await{ static SEM:tokio::sync::Semaphore=tokio::sync::Semaphore::const_new(PARALLEL_REQUESTS);
match message_handler.handle_message_result(message_result).await{ // use memory leak to make static lifetime
Ok(())=>println!("[Validation] Success, hooray!"), let message_handler=Box::leak(Box::new(message_handler));
Err(e)=>println!("[Validation] There was an error, oopsie! {e}"), // acquire a permit before attempting to receive a message, exit if either fails
} while let (Ok(permit),Some(message_result))=(SEM.acquire().await,messages.next().await){
// handle the message on a new thread (mainly to decode the model file)
tokio::spawn(async{
match message_handler.handle_message_result(message_result).await{
Ok(())=>println!("[Validation] Success, hooray!"),
Err(e)=>println!("[Validation] There was an error, oopsie! {e}"),
}
// explicitly call drop to make the move semantics and permit release more obvious
core::mem::drop(permit);
});
} }
}); };
sig_term.recv().await; // race sigkill and main loop termination and then die
tokio::select!{
_=sig_term.recv()=>(),
_=main_loop=>(),
};
Ok(()) Ok(())
} }