From b6ac6ce47f38a3cdb250ffb7e603a2a05e40f5ea Mon Sep 17 00:00:00 2001 From: Rhys Lloyd Date: Fri, 6 Mar 2026 09:49:53 -0800 Subject: [PATCH 1/2] validator: switch futures to leaner futures-util --- Cargo.lock | 38 +-------------------- Cargo.toml | 1 + validation/Cargo.toml | 2 +- validation/src/main.rs | 2 +- validation/src/release_submissions_batch.rs | 7 ++-- validation/src/validator.rs | 5 +-- 6 files changed, 11 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd36ae9..f0fad33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1484,21 +1484,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" -[[package]] -name = "futures" -version = "0.3.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.32" @@ -1506,7 +1491,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -1515,23 +1499,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" -[[package]] -name = "futures-executor" -version = "0.3.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" - [[package]] name = "futures-macro" version = "0.3.32" @@ -1561,13 +1528,10 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ - "futures-channel", "futures-core", - "futures-io", "futures-macro", "futures-sink", "futures-task", - "memchr", "pin-project-lite", "slab", ] @@ -2411,7 +2375,7 @@ name = "maps-validation" version = "0.1.1" dependencies = [ "async-nats", - "futures", + "futures-util", "heck", "rbx_asset", "rbx_binary", diff --git a/Cargo.toml b/Cargo.toml index c390c6f..cc88b5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ resolver = "2" [workspace.dependencies] async-nats = "0.46.0" +futures-util = "0.3.31" rbx_asset = { version = "0.5.0", features = ["gzip", "rustls-tls"], default-features = false, registry = "strafesnet" } rbx_binary = "2.0.1" rbx_dom_weak = "4.1.0" diff --git a/validation/Cargo.toml b/validation/Cargo.toml index bf92b7d..58fbde9 100644 --- a/validation/Cargo.toml +++ b/validation/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] async-nats.workspace = true -futures = "0.3.31" +futures-util.workspace = true rbx_asset.workspace = true rbx_binary.workspace = true rbx_dom_weak.workspace = true diff --git a/validation/src/main.rs b/validation/src/main.rs index cfc3670..442fe0f 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -1,4 +1,4 @@ -use futures::StreamExt; +use futures_util::StreamExt; mod download; mod grpc; diff --git a/validation/src/release_submissions_batch.rs b/validation/src/release_submissions_batch.rs index 50cfcbf..9dda599 100644 --- a/validation/src/release_submissions_batch.rs +++ b/validation/src/release_submissions_batch.rs @@ -1,4 +1,5 @@ -use futures::StreamExt; +use futures_util::stream::iter as stream_iter; +use futures_util::StreamExt; use crate::download::download_asset_version; use crate::nats_types::ReleaseSubmissionsBatchRequest; @@ -92,7 +93,7 @@ async fn release_inner( .collect(); // fut_download - let fut_download=futures::stream::iter(asset_versions) + let fut_download=stream_iter(asset_versions) .map(|(index,asset_version)|async move{ let modes=download_fut(cloud_context,asset_version).await; (index,modes) @@ -137,7 +138,7 @@ async fn release_inner( } // concurrently dispatch results - let release_results:Vec<_> =futures::stream::iter( + let release_results:Vec<_> =stream_iter( release_info .Submissions .into_iter() diff --git a/validation/src/validator.rs b/validation/src/validator.rs index 68e0766..664f44f 100644 --- a/validation/src/validator.rs +++ b/validation/src/validator.rs @@ -1,4 +1,5 @@ -use futures::TryStreamExt; +use futures_util::stream::iter as stream_iter; +use futures_util::TryStreamExt; use rust_grpc::validator::Policy; use crate::download::download_asset_version; @@ -153,7 +154,7 @@ impl crate::message_handler::MessageHandler{ } // send all script hashes to REST endpoint and retrieve the replacements - futures::stream::iter(script_map.iter_mut().map(Ok)) + stream_iter(script_map.iter_mut().map(Ok)) .try_for_each_concurrent(Some(SCRIPT_CONCURRENCY),|(source,NamePolicy{policy,name})|async{ // get the hash let hash=hash_source(source.as_str()); -- 2.49.1 From d26126c9d3fc7124c908925026079a67ddec942c Mon Sep 17 00:00:00 2001 From: Rhys Lloyd Date: Fri, 6 Mar 2026 10:07:19 -0800 Subject: [PATCH 2/2] combobulator: use up to 16 parallel requests --- Cargo.lock | 1 + combobulator/Cargo.toml | 1 + combobulator/src/process.rs | 72 +++++++++++++++++++++++++++---------- 3 files changed, 56 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0fad33..0a87795 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2356,6 +2356,7 @@ dependencies = [ "async-nats", "aws-config", "aws-sdk-s3", + "futures-util", "map-tool", "rbx_asset", "rbx_binary", diff --git a/combobulator/Cargo.toml b/combobulator/Cargo.toml index c532b4a..83c4452 100644 --- a/combobulator/Cargo.toml +++ b/combobulator/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" async-nats.workspace = true aws-config = { version = "1", features = ["behavior-version-latest"] } aws-sdk-s3 = "1" +futures-util.workspace = true map-tool = { version = "3.0.0", registry = "strafesnet", features = ["roblox"], default-features = false } rbx_asset.workspace = true rbx_binary.workspace = true diff --git a/combobulator/src/process.rs b/combobulator/src/process.rs index a229fd8..b2b3a50 100644 --- a/combobulator/src/process.rs +++ b/combobulator/src/process.rs @@ -3,8 +3,12 @@ use std::io::Cursor; use crate::nats_types::ReleaseMapfixRequest; use crate::s3::S3Cache; +use futures_util::stream::iter as stream_iter; +use futures_util::{StreamExt,TryStreamExt}; use strafesnet_deferred_loader::deferred_loader::LoadFailureMode; +const CONCURRENT_REQUESTS:usize=16; + #[expect(dead_code)] #[derive(Debug)] pub enum ConvertError{ @@ -121,10 +125,9 @@ impl Processor{ let assets=map_tool::roblox::get_unique_assets(&dom); // place textures into 'loader' - let mut texture_loader=crate::loader::TextureLoader::new(); - + let texture_loader=crate::loader::TextureLoader::new(); // process textures: download, cache, convert to DDS - for &id in &assets.textures{ + let texture_loader=stream_iter(assets.textures).map(async|id|{ let asset_id=id.0; let dds_key=S3Cache::texture_dds_key(asset_id); @@ -138,7 +141,9 @@ impl Processor{ map_tool::roblox::convert_texture_to_dds(&data) }else{ println!("[combobulator] Downloading texture {asset_id}"); - let Some(data)=self.download_asset(asset_id).await? else{continue}; + let Some(data)=self.download_asset(asset_id).await? else{ + return Ok(None); + }; // decode while we have ownership let dds_result=map_tool::roblox::convert_texture_to_dds(&data); @@ -152,7 +157,7 @@ impl Processor{ Ok(dds)=>dds, Err(e)=>{ println!("[combobulator] Texture {asset_id} convert error: {e}"); - continue; + return Ok(None); } }; @@ -162,12 +167,19 @@ impl Processor{ }; println!("[combobulator] Texture {asset_id} processed"); - texture_loader.insert(id,dds); - } + Ok(Some((id,dds))) + }) + .buffer_unordered(CONCURRENT_REQUESTS) + .try_fold(texture_loader,async|mut texture_loader,maybe_loaded_texture|{ + if let Some((id,dds))=maybe_loaded_texture{ + texture_loader.insert(id,dds); + } + Ok(texture_loader) + }).await?; - let mut mesh_loader=crate::loader::MeshLoader::new(); + let mesh_loader=crate::loader::MeshLoader::new(); // process meshes - for &id in &assets.meshes{ + let mesh_loader=stream_iter(assets.meshes).map(async|id|{ let asset_id=id.0; let mesh_key=S3Cache::mesh_key(asset_id); @@ -175,7 +187,9 @@ impl Processor{ strafesnet_rbx_loader::mesh::convert(&data) }else{ println!("[combobulator] Downloading mesh {asset_id}"); - let Some(data)=self.download_asset(asset_id).await? else{continue}; + let Some(data)=self.download_asset(asset_id).await? else{ + return Ok(None); + }; // decode while we have ownership let mesh_result=strafesnet_rbx_loader::mesh::convert(&data); @@ -187,13 +201,23 @@ impl Processor{ // handle error after cacheing data match mesh_result{ - Ok(mesh)=>mesh_loader.insert_mesh(id,mesh), - Err(e)=>println!("[combobulator] Mesh {asset_id} convert error: {e}"), + Ok(mesh)=>Ok(Some((id,mesh))), + Err(e)=>{ + println!("[combobulator] Mesh {asset_id} convert error: {e}"); + Ok(None) + }, } - } + }) + .buffer_unordered(CONCURRENT_REQUESTS) + .try_fold(mesh_loader,async|mut mesh_loader,maybe_loaded_mesh|{ + if let Some((id,mesh))=maybe_loaded_mesh{ + mesh_loader.insert_mesh(id,mesh); + } + Ok(mesh_loader) + }).await?; // process unions - for &id in &assets.unions{ + let mesh_loader=stream_iter(assets.unions).map(async|id|{ let asset_id=id.0; let union_key=S3Cache::union_key(asset_id); @@ -201,7 +225,9 @@ impl Processor{ rbx_binary::from_reader(data.as_slice()) }else{ println!("[combobulator] Downloading union {asset_id}"); - let Some(data)=self.download_asset(asset_id).await? else{continue}; + let Some(data)=self.download_asset(asset_id).await? else{ + return Ok(None); + }; // decode the data while we have ownership let union_result=rbx_binary::from_reader(data.as_slice()); @@ -213,10 +239,20 @@ impl Processor{ // handle error after cacheing data match union_result{ - Ok(union)=>mesh_loader.insert_union(id,union), - Err(e)=>println!("[combobulator] Union {asset_id} convert error: {e}"), + Ok(union)=>Ok(Some((id,union))), + Err(e)=>{ + println!("[combobulator] Union {asset_id} convert error: {e}"); + Ok(None) + }, } - } + }) + .buffer_unordered(CONCURRENT_REQUESTS) + .try_fold(mesh_loader,async|mut mesh_loader,maybe_loaded_union|{ + if let Some((id,union))=maybe_loaded_union{ + mesh_loader.insert_union(id,union); + } + Ok(mesh_loader) + }).await?; // convert to SNF and upload println!("[combobulator] Converting to SNF"); -- 2.49.1