decode in parallel, download one at a time

This will probably still hit the Roblox rate limit.
This commit is contained in:
Quaternions 2025-01-27 13:30:47 -08:00
parent 072adf1f87
commit 2ce8d4e2f8

@ -3,7 +3,6 @@ use std::io::{Cursor,Read,Seek};
use std::collections::HashSet;
use clap::{Args,Subcommand};
use anyhow::Result as AResult;
use futures::StreamExt;
use rbx_dom_weak::Instance;
use strafesnet_deferred_loader::rbxassetid::RobloxAssetId;
use tokio::io::AsyncReadExt;
@ -157,10 +156,6 @@ async fn unique_assets(path:&Path)->Result<UniqueAssets,UniqueAssetError>{
}
Ok(assets)
}
/// The outcome of running `unique_assets` on one file, paired with the
/// path of the file it was run on.
struct UniqueAssetsResult {
    /// Path of the file that was scanned.
    path: std::path::PathBuf,
    /// Assets referenced by the file, or the error hit while scanning it.
    result: Result<UniqueAssets, UniqueAssetError>,
}
enum DownloadType{
Texture(RobloxAssetId),
Mesh(RobloxAssetId),
@ -182,102 +177,133 @@ impl DownloadType{
}
}
}
/// Running counters describing what happened to each asset handed to
/// `download_retry`. All counters start at zero via `Default`.
#[derive(Default, Debug)]
struct Stats {
    /// Number of assets `download_retry` was asked to handle.
    total_assets: u32,
    /// Assets skipped because their file already existed on disk.
    cached_assets: u32,
    /// Assets whose contents were successfully fetched.
    downloaded_assets: u32,
    /// Assets abandoned after an error other than an HTTP 429 response.
    failed_downloads: u32,
    /// Assets abandoned after exhausting the retries for HTTP 429 responses.
    timed_out_downloads: u32,
}
/// Fetches a single asset and writes it to the path given by
/// `download_instruction`, unless a file already exists at that path.
///
/// `stats.total_assets` is incremented on every call. After that, one of the
/// other counters is incremented depending on the outcome:
/// - `cached_assets`: the target file already exists, nothing is fetched.
/// - `downloaded_assets`: the fetch succeeded (counted before the file write).
/// - `timed_out_downloads`: an HTTP 429 was still returned after 12 retries.
/// - `failed_downloads`: any other fetch error.
///
/// Fetch errors are only printed; the function still returns `Ok(())` for
/// them. `Err` is returned only when the existence check or the file write
/// fails with an I/O error.
async fn download_retry(
    stats: &mut Stats,
    context: &rbx_asset::cookie::CookieContext,
    download_instruction: DownloadType,
) -> Result<(), std::io::Error> {
    // exp(1/3): the wait grows by a factor of e every three retries.
    const BACKOFF_MUL: f32 = 1.3956124250860895286;
    // Number of rate-limited retries allowed before giving up on the asset.
    const MAX_RETRIES: u32 = 12;

    stats.total_assets += 1;

    // Skip the network entirely when the file is already on disk.
    let destination = download_instruction.path();
    if tokio::fs::try_exists(destination.as_path()).await? {
        stats.cached_assets += 1;
        return Ok(());
    }

    let asset_id = download_instruction.asset_id();
    let mut retries_used = 0u32;
    // Wait before the first retry, in milliseconds.
    let mut delay_ms = 1000f32;
    loop {
        let response = context
            .get_asset(rbx_asset::cookie::GetAssetRequest {
                asset_id,
                version: None,
            })
            .await;

        // Success and non-status errors finish here; only a status-code error
        // falls through to the rate-limit handling below.
        let scwuab = match response {
            Ok(contents) => {
                stats.downloaded_assets += 1;
                tokio::fs::write(destination, contents).await?;
                return Ok(());
            }
            Err(rbx_asset::cookie::GetError::Response(
                rbx_asset::ResponseError::StatusCodeWithUrlAndBody(scwuab),
            )) => scwuab,
            Err(e) => {
                stats.failed_downloads += 1;
                println!("sadly error: {e}");
                return Ok(());
            }
        };

        // Any status other than 429 is not retried.
        if scwuab.status_code.as_u16() != 429 {
            stats.failed_downloads += 1;
            println!("weird scuwab error: {scwuab:?}");
            return Ok(());
        }

        if retries_used == MAX_RETRIES {
            println!("Giving up asset download {asset_id}");
            stats.timed_out_downloads += 1;
            return Ok(());
        }

        // Rate limited: wait, lengthen the next wait, and try again.
        println!("Hit roblox rate limit, waiting {:.0}ms...", delay_ms);
        tokio::time::sleep(std::time::Duration::from_millis(delay_ms as u64)).await;
        delay_ms *= BACKOFF_MUL;
        retries_used += 1;
    }
}
async fn download_assets(paths:Vec<PathBuf>,cookie:rbx_asset::cookie::Cookie)->AResult<()>{
tokio::try_join!(
tokio::fs::create_dir_all("downloads/textures"),
tokio::fs::create_dir_all("downloads/meshes"),
tokio::fs::create_dir_all("downloads/unions"),
)?;
let context=rbx_asset::cookie::CookieContext::new(cookie);
// use mpsc
let thread_limit=std::thread::available_parallelism()?.get();
let (send,mut recv)=tokio::sync::mpsc::channel(DOWNLOAD_LIMIT);
// map decode dispatcher
// read files multithreaded
// produce UniqueAssetsResult per file
// insert into global unique assets guy, add to download queue if the asset is globally unique and does not already exist on disk
tokio::spawn(async move{
// move send so it gets dropped when all maps have been decoded
// closing the channel
let mut it=paths.into_iter();
static SEM:tokio::sync::Semaphore=tokio::sync::Semaphore::const_new(0);
SEM.add_permits(thread_limit);
while let (Ok(permit),Some(path))=(SEM.acquire().await,it.next()){
let send=send.clone();
tokio::spawn(async move{
let result=unique_assets(path.as_path()).await;
_=send.send(result).await;
drop(permit);
});
}
});
// download manager
// insert into global unique assets guy
// add to download queue if the asset is globally unique and does not already exist on disk
let mut stats=Stats::default();
let context=rbx_asset::cookie::CookieContext::new(cookie);
let mut globally_unique_assets=UniqueAssets::default();
futures::stream::iter(paths).map(|path|async{
let result=unique_assets(path.as_path()).await;
UniqueAssetsResult{
path,
result,
}
})
.buffer_unordered(thread_limit)
.flat_map(|UniqueAssetsResult{path,result}|{
futures::stream::iter(match result{
Ok(unique_assets)=>{
let mut download_instructions=Vec::new();
for texture_id in unique_assets.textures{
if globally_unique_assets.textures.insert(RobloxAssetId(texture_id.0)){
download_instructions.push(DownloadType::Texture(texture_id));
}
}
for mesh_id in unique_assets.meshes{
if globally_unique_assets.meshes.insert(RobloxAssetId(mesh_id.0)){
download_instructions.push(DownloadType::Mesh(mesh_id));
}
}
for union_id in unique_assets.unions{
if globally_unique_assets.unions.insert(RobloxAssetId(union_id.0)){
download_instructions.push(DownloadType::Union(union_id));
}
}
download_instructions
},
// pop a job = retry_queue.pop_front() or ingest(recv.recv().await)
// SLOW MODE:
// acquire all permits
// drop all permits
// pop one job
// if it succeeds go into fast mode
// FAST MODE:
// acquire one permit
// pop a job
while let Some(result)=recv.recv().await{
let unique_assets=match result{
Ok(unique_assets)=>unique_assets,
Err(e)=>{
println!("file {:?} had error so sad: {e:?}",path.as_path().file_stem());
vec![]
}
})
})
.map(|download_instruction|async{
let download_instruction=download_instruction;
// check if file exists on disk
let path=download_instruction.path();
if tokio::fs::try_exists(path.as_path()).await?{
return Ok::<_,std::io::Error>(());
}
let asset_id=download_instruction.asset_id();
// if not, download file
let mut retry=0;
const BACKOFF_MUL:f32=1.3956124250860895286;//exp(1/3)
let mut backoff=1000f32;
let asset_result=loop{
let asset_result=context.get_asset(rbx_asset::cookie::GetAssetRequest{
asset_id,
version:None,
}).await;
match asset_result{
Ok(asset_result)=>break Some(asset_result),
Err(rbx_asset::cookie::GetError::Response(rbx_asset::ResponseError::StatusCodeWithUrlAndBody(scwuab)))=>{
if scwuab.status_code.as_u16()==429{
if retry==12{
println!("Giving up asset download {asset_id}");
break None;
}
println!("Roblox killing me, waiting {:.0}ms...",backoff);
tokio::time::sleep(std::time::Duration::from_millis(backoff as u64)).await;
backoff*=BACKOFF_MUL;
retry+=1;
}else{
println!("weird scuwab error: {scwuab:?}");
break None;
}
},
Err(e)=>{
println!("sadly error: {e}");
break None;
},
}
println!("error: {e:?}");
continue;
},
};
if let Some(data)=asset_result{
tokio::fs::write(path,data).await?;
for texture_id in unique_assets.textures{
if globally_unique_assets.textures.insert(RobloxAssetId(texture_id.0)){
download_retry(&mut stats,&context,DownloadType::Texture(texture_id)).await?;
}
}
Ok(())
})
.buffer_unordered(DOWNLOAD_LIMIT)
//there's gotta be a better way to just make it run to completion
.for_each(|_|async{}).await;
for mesh_id in unique_assets.meshes{
if globally_unique_assets.meshes.insert(RobloxAssetId(mesh_id.0)){
download_retry(&mut stats,&context,DownloadType::Mesh(mesh_id)).await?;
}
}
for union_id in unique_assets.unions{
if globally_unique_assets.unions.insert(RobloxAssetId(union_id.0)){
download_retry(&mut stats,&context,DownloadType::Union(union_id)).await?;
}
}
}
dbg!(stats);
Ok(())
}