diff --git a/src/roblox.rs b/src/roblox.rs index e70b153..a39a9d1 100644 --- a/src/roblox.rs +++ b/src/roblox.rs @@ -3,7 +3,6 @@ use std::io::{Cursor,Read,Seek}; use std::collections::HashSet; use clap::{Args,Subcommand}; use anyhow::Result as AResult; -use futures::StreamExt; use rbx_dom_weak::Instance; use strafesnet_deferred_loader::rbxassetid::RobloxAssetId; use tokio::io::AsyncReadExt; @@ -157,10 +156,6 @@ async fn unique_assets(path:&Path)->Result<UniqueAssets,UniqueAssetError>{ } Ok(assets) } -struct UniqueAssetsResult{ - path:std::path::PathBuf, - result:Result<UniqueAssets,UniqueAssetError>, -} enum DownloadType{ Texture(RobloxAssetId), Mesh(RobloxAssetId), @@ -182,102 +177,133 @@ impl DownloadType{ } } } +#[derive(Default,Debug)] +struct Stats{ + total_assets:u32, + cached_assets:u32, + downloaded_assets:u32, + failed_downloads:u32, + timed_out_downloads:u32, +} +async fn download_retry(stats:&mut Stats,context:&rbx_asset::cookie::CookieContext,download_instruction:DownloadType)->Result<(),std::io::Error>{ + stats.total_assets+=1; + let download_instruction=download_instruction; + // check if file exists on disk + let path=download_instruction.path(); + if tokio::fs::try_exists(path.as_path()).await?{ + stats.cached_assets+=1; + return Ok(()); + } + let asset_id=download_instruction.asset_id(); + // if not, download file + let mut retry=0; + const BACKOFF_MUL:f32=1.3956124250860895286;//exp(1/3) + let mut backoff=1000f32; + loop{ + let asset_result=context.get_asset(rbx_asset::cookie::GetAssetRequest{ + asset_id, + version:None, + }).await; + match asset_result{ + Ok(asset_result)=>{ + stats.downloaded_assets+=1; + tokio::fs::write(path,asset_result).await?; + break; + }, + Err(rbx_asset::cookie::GetError::Response(rbx_asset::ResponseError::StatusCodeWithUrlAndBody(scwuab)))=>{ + if scwuab.status_code.as_u16()==429{ + if retry==12{ + println!("Giving up asset download {asset_id}"); + stats.timed_out_downloads+=1; + break; + } + println!("Hit roblox rate limit, waiting {:.0}ms...",backoff); + tokio::time::sleep(std::time::Duration::from_millis(backoff as u64)).await; + backoff*=BACKOFF_MUL; + retry+=1; + }else{ + stats.failed_downloads+=1; + println!("weird scuwab error: {scwuab:?}"); + break; + } + }, + Err(e)=>{ + stats.failed_downloads+=1; + println!("sadly error: {e}"); + break; + }, + } + } + Ok(()) +} async fn download_assets(paths:Vec<PathBuf>,cookie:rbx_asset::cookie::Cookie)->AResult<()>{ tokio::try_join!( tokio::fs::create_dir_all("downloads/textures"), tokio::fs::create_dir_all("downloads/meshes"), tokio::fs::create_dir_all("downloads/unions"), )?; - let context=rbx_asset::cookie::CookieContext::new(cookie); + // use mpsc let thread_limit=std::thread::available_parallelism()?.get(); + let (send,mut recv)=tokio::sync::mpsc::channel(DOWNLOAD_LIMIT); + // map decode dispatcher // read files multithreaded // produce UniqueAssetsResult per file - // insert into global unique assets guy, add to download queue if the asset is globally unique and does not already exist on disk + tokio::spawn(async move{ + // move send so it gets dropped when all maps have been decoded + // closing the channel + let mut it=paths.into_iter(); + static SEM:tokio::sync::Semaphore=tokio::sync::Semaphore::const_new(0); + SEM.add_permits(thread_limit); + while let (Ok(permit),Some(path))=(SEM.acquire().await,it.next()){ + let send=send.clone(); + tokio::spawn(async move{ + let result=unique_assets(path.as_path()).await; + _=send.send(result).await; + drop(permit); + }); + } + }); + // download manager + // insert into global unique assets guy + // add to download queue if the asset is globally unique and does not already exist on disk + let mut stats=Stats::default(); + let context=rbx_asset::cookie::CookieContext::new(cookie); let mut globally_unique_assets=UniqueAssets::default(); - futures::stream::iter(paths).map(|path|async{ - let result=unique_assets(path.as_path()).await; - UniqueAssetsResult{ - path, - result, - } - }) - .buffer_unordered(thread_limit) - .flat_map(|UniqueAssetsResult{path,result}|{ - futures::stream::iter(match result{ - Ok(unique_assets)=>{ - let mut download_instructions=Vec::new(); - for texture_id in unique_assets.textures{ - if globally_unique_assets.textures.insert(RobloxAssetId(texture_id.0)){ - download_instructions.push(DownloadType::Texture(texture_id)); - } - } - for mesh_id in unique_assets.meshes{ - if globally_unique_assets.meshes.insert(RobloxAssetId(mesh_id.0)){ - download_instructions.push(DownloadType::Mesh(mesh_id)); - } - } - for union_id in unique_assets.unions{ - if globally_unique_assets.unions.insert(RobloxAssetId(union_id.0)){ - download_instructions.push(DownloadType::Union(union_id)); - } - } - download_instructions - }, + // pop a job = retry_queue.pop_front() or ingest(recv.recv().await) + // SLOW MODE: + // acquire all permits + // drop all permits + // pop one job + // if it succeeds go into fast mode + // FAST MODE: + // acquire one permit + // pop a job + while let Some(result)=recv.recv().await{ + let unique_assets=match result{ + Ok(unique_assets)=>unique_assets, Err(e)=>{ - println!("file {:?} had error so sad: {e:?}",path.as_path().file_stem()); - vec![] - } - }) - }) - .map(|download_instruction|async{ - let download_instruction=download_instruction; - // check if file exists on disk - let path=download_instruction.path(); - if tokio::fs::try_exists(path.as_path()).await?{ - return Ok::<_,std::io::Error>(()); - } - let asset_id=download_instruction.asset_id(); - // if not, download file - let mut retry=0; - const BACKOFF_MUL:f32=1.3956124250860895286;//exp(1/3) - let mut backoff=1000f32; - let asset_result=loop{ - let asset_result=context.get_asset(rbx_asset::cookie::GetAssetRequest{ - asset_id, - version:None, - }).await; - match asset_result{ - Ok(asset_result)=>break Some(asset_result), - Err(rbx_asset::cookie::GetError::Response(rbx_asset::ResponseError::StatusCodeWithUrlAndBody(scwuab)))=>{ - if scwuab.status_code.as_u16()==429{ - if retry==12{ - println!("Giving up asset download {asset_id}"); - break None; - } - println!("Roblox killing me, waiting {:.0}ms...",backoff); - tokio::time::sleep(std::time::Duration::from_millis(backoff as u64)).await; - backoff*=BACKOFF_MUL; - retry+=1; - }else{ - println!("weird scuwab error: {scwuab:?}"); - break None; - } - }, - Err(e)=>{ - println!("sadly error: {e}"); - break None; - }, - } + println!("error: {e:?}"); + continue; + }, }; - - if let Some(data)=asset_result{ - tokio::fs::write(path,data).await?; + for texture_id in unique_assets.textures{ + if globally_unique_assets.textures.insert(RobloxAssetId(texture_id.0)){ + download_retry(&mut stats,&context,DownloadType::Texture(texture_id)).await?; + } } - Ok(()) - }) - .buffer_unordered(DOWNLOAD_LIMIT) - //there's gotta be a better way to just make it run to completion - .for_each(|_|async{}).await; + for mesh_id in unique_assets.meshes{ + if globally_unique_assets.meshes.insert(RobloxAssetId(mesh_id.0)){ + download_retry(&mut stats,&context,DownloadType::Mesh(mesh_id)).await?; + } + } + for union_id in unique_assets.unions{ + if globally_unique_assets.unions.insert(RobloxAssetId(union_id.0)){ + download_retry(&mut stats,&context,DownloadType::Union(union_id)).await?; + } + } + } + + dbg!(stats); Ok(()) }