From 072adf1f87a22afa1804be8f3a49d5b836c61429 Mon Sep 17 00:00:00 2001
From: Quaternions <krakow20@gmail.com>
Date: Mon, 27 Jan 2025 10:28:37 -0800
Subject: [PATCH] download_assets with naive exponential backoff

---
 src/main.rs   |   7 +-
 src/roblox.rs | 183 +++++++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 161 insertions(+), 29 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index e54d7b1..8e0555d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -22,10 +22,11 @@ enum Commands{
 	ConvertTextures,
 }
 
-fn main() -> AResult<()> {
-	let cli = Cli::parse();
+#[tokio::main]
+async fn main()->AResult<()>{
+	let cli=Cli::parse();
 	match cli.command{
-		Commands::Roblox(commands)=>commands.run(),
+		Commands::Roblox(commands)=>commands.run().await,
 		Commands::Source(commands)=>commands.run(),
 		Commands::ConvertTextures=>common::convert_textures(),
 	}
diff --git a/src/roblox.rs b/src/roblox.rs
index b22b62f..e70b153 100644
--- a/src/roblox.rs
+++ b/src/roblox.rs
@@ -3,8 +3,12 @@ use std::io::{Cursor,Read,Seek};
 use std::collections::HashSet;
 use clap::{Args,Subcommand};
 use anyhow::Result as AResult;
+use futures::StreamExt;
 use rbx_dom_weak::Instance;
 use strafesnet_deferred_loader::rbxassetid::RobloxAssetId;
+use tokio::io::AsyncReadExt;
+
+const DOWNLOAD_LIMIT:usize=16;
 
 #[derive(Subcommand)]
 pub enum Commands{
@@ -21,27 +25,40 @@ pub struct RobloxToSNFSubcommand {
 }
 #[derive(Args)]
 pub struct DownloadAssetsSubcommand{
-	#[arg(long,required=true)]
-	roblox_files:Vec<PathBuf>
+	#[arg(required=true)]
+	roblox_files:Vec<PathBuf>,
+	// #[arg(long)]
+	// cookie_file:Option<String>,
 }
 
 impl Commands{
-	pub fn run(self)->AResult<()>{
+	pub async fn run(self)->AResult<()>{
 		match self{
 			Commands::RobloxToSNF(subcommand)=>roblox_to_snf(subcommand.input_files,subcommand.output_folder),
-			Commands::DownloadAssets(subcommand)=>download_assets(subcommand.roblox_files),
+			Commands::DownloadAssets(subcommand)=>download_assets(
+				subcommand.roblox_files,
+				rbx_asset::cookie::Cookie::new("".to_string()),
+			).await,
 		}
 	}
 }
 
-fn load_dom<R:Read+Seek>(mut input:R)->AResult<rbx_dom_weak::WeakDom>{
+#[allow(unused)]
+#[derive(Debug)]
+enum LoadDomError{
+	IO(std::io::Error),
+	Binary(rbx_binary::DecodeError),
+	Xml(rbx_xml::DecodeError),
+	UnknownFormat,
+}
+fn load_dom<R:Read+Seek>(mut input:R)->Result<rbx_dom_weak::WeakDom,LoadDomError>{
 	let mut first_8=[0u8;8];
-	input.read_exact(&mut first_8)?;
-	input.rewind()?;
+	input.read_exact(&mut first_8).map_err(LoadDomError::IO)?;
+	input.rewind().map_err(LoadDomError::IO)?;
 	match &first_8{
-		b"<roblox!"=>rbx_binary::from_reader(input).map_err(anyhow::Error::msg),
-		b"<roblox "=>rbx_xml::from_reader(input,rbx_xml::DecodeOptions::default()).map_err(anyhow::Error::msg),
-		_=>Err(anyhow::Error::msg("unsupported file type")),
+		b"<roblox!"=>rbx_binary::from_reader(input).map_err(LoadDomError::Binary),
+		b"<roblox "=>rbx_xml::from_reader(input,rbx_xml::DecodeOptions::default()).map_err(LoadDomError::Xml),
+		_=>Err(LoadDomError::UnknownFormat),
 	}
 }
 
@@ -80,13 +97,13 @@ fn accumulate_content_id(content_list:&mut HashSet<RobloxAssetId>,object:&Instan
 			println!("Content failed to parse into AssetID: {:?}",content);
 		}
 	}else{
-		println!("property={} does not exist for class={}",object.class.as_str(),property);
+		println!("property={} does not exist for class={}",property,object.class.as_str());
 	}
 }
-fn read_entire_file(path:impl AsRef<Path>)->Result<Cursor<Vec<u8>>,std::io::Error>{
-	let mut file=std::fs::File::open(path)?;
+async fn read_entire_file(path:impl AsRef<Path>)->Result<Cursor<Vec<u8>>,std::io::Error>{
+	let mut file=tokio::fs::File::open(path).await?;
 	let mut data=Vec::new();
-	file.read_to_end(&mut data)?;
+	file.read_to_end(&mut data).await?;
 	Ok(Cursor::new(data))
 }
 #[derive(Default)]
@@ -123,11 +140,18 @@ impl UniqueAssets{
 		}
 	}
 }
-fn unique_assets(path:&Path)->AResult<UniqueAssets>{
+
+#[allow(unused)]
+#[derive(Debug)]
+enum UniqueAssetError{
+	IO(std::io::Error),
+	LoadDom(LoadDomError),
+}
+async fn unique_assets(path:&Path)->Result<UniqueAssets,UniqueAssetError>{
 	// read entire file
 	let mut assets=UniqueAssets::default();
-	let data=read_entire_file(path)?;
-	let dom=load_dom(data)?;
+	let data=read_entire_file(path).await.map_err(UniqueAssetError::IO)?;
+	let dom=load_dom(data).map_err(UniqueAssetError::LoadDom)?;
 	for object in dom.into_raw().1.into_values(){
 		assets.collect(&object);
 	}
@@ -135,18 +159,125 @@ fn unique_assets(path:&Path)->AResult<UniqueAssets>{
 }
 struct UniqueAssetsResult{
 	path:std::path::PathBuf,
-	result:AResult<UniqueAssets>,
+	result:Result<UniqueAssets,UniqueAssetError>,
 }
-fn do_thread(path:std::path::PathBuf,send:std::sync::mpsc::Sender<UniqueAssetsResult>){
-	std::thread::spawn(move ||{
-		let result=unique_assets(path.as_path());
-		send.send(UniqueAssetsResult{
+enum DownloadType{
+	Texture(RobloxAssetId),
+	Mesh(RobloxAssetId),
+	Union(RobloxAssetId),
+}
+impl DownloadType{
+	fn path(&self)->PathBuf{
+		match self{
+			DownloadType::Texture(asset_id)=>format!("downloads/textures/{}",asset_id.0.to_string()).into(),
+			DownloadType::Mesh(asset_id)=>format!("downloads/meshes/{}",asset_id.0.to_string()).into(),
+			DownloadType::Union(asset_id)=>format!("downloads/unions/{}",asset_id.0.to_string()).into(),
+		}
+	}
+	fn asset_id(&self)->u64{
+		match self{
+			DownloadType::Texture(asset_id)=>asset_id.0,
+			DownloadType::Mesh(asset_id)=>asset_id.0,
+			DownloadType::Union(asset_id)=>asset_id.0,
+		}
+	}
+}
+async fn download_assets(paths:Vec<PathBuf>,cookie:rbx_asset::cookie::Cookie)->AResult<()>{
+	tokio::try_join!(
+		tokio::fs::create_dir_all("downloads/textures"),
+		tokio::fs::create_dir_all("downloads/meshes"),
+		tokio::fs::create_dir_all("downloads/unions"),
+	)?;
+	let context=rbx_asset::cookie::CookieContext::new(cookie);
+	let thread_limit=std::thread::available_parallelism()?.get();
+	// read files multithreaded
+	// produce UniqueAssetsResult per file
+	// insert into global unique assets guy, add to download queue if the asset is globally unique and does not already exist on disk
+	let mut globally_unique_assets=UniqueAssets::default();
+	futures::stream::iter(paths).map(|path|async{
+		let result=unique_assets(path.as_path()).await;
+		UniqueAssetsResult{
 			path,
 			result,
-		}).unwrap();
-	});
-}
-fn download_assets(paths:Vec<PathBuf>)->AResult<()>{
+		}
+	})
+	.buffer_unordered(thread_limit)
+	.flat_map(|UniqueAssetsResult{path,result}|{
+		futures::stream::iter(match result{
+			Ok(unique_assets)=>{
+				let mut download_instructions=Vec::new();
+				for texture_id in unique_assets.textures{
+					if globally_unique_assets.textures.insert(RobloxAssetId(texture_id.0)){
+						download_instructions.push(DownloadType::Texture(texture_id));
+					}
+				}
+				for mesh_id in unique_assets.meshes{
+					if globally_unique_assets.meshes.insert(RobloxAssetId(mesh_id.0)){
+						download_instructions.push(DownloadType::Mesh(mesh_id));
+					}
+				}
+				for union_id in unique_assets.unions{
+					if globally_unique_assets.unions.insert(RobloxAssetId(union_id.0)){
+						download_instructions.push(DownloadType::Union(union_id));
+					}
+				}
+				download_instructions
+			},
+			Err(e)=>{
+				println!("file {:?} had error so sad: {e:?}",path.as_path().file_stem());
+				vec![]
+			}
+		})
+	})
+	.map(|download_instruction|async{
+		let download_instruction=download_instruction;
+		// check if file exists on disk
+		let path=download_instruction.path();
+		if tokio::fs::try_exists(path.as_path()).await?{
+			return Ok::<_,std::io::Error>(());
+		}
+		let asset_id=download_instruction.asset_id();
+		// if not, download file
+		let mut retry=0;
+		const BACKOFF_MUL:f32=1.3956124250860895286;//exp(1/3)
+		let mut backoff=1000f32;
+		let asset_result=loop{
+			let asset_result=context.get_asset(rbx_asset::cookie::GetAssetRequest{
+				asset_id,
+				version:None,
+			}).await;
+			match asset_result{
+				Ok(asset_result)=>break Some(asset_result),
+				Err(rbx_asset::cookie::GetError::Response(rbx_asset::ResponseError::StatusCodeWithUrlAndBody(scwuab)))=>{
+					if scwuab.status_code.as_u16()==429{
+						if retry==12{
+							println!("Giving up asset download {asset_id}");
+							break None;
+						}
+						println!("Roblox killing me, waiting {:.0}ms...",backoff);
+						tokio::time::sleep(std::time::Duration::from_millis(backoff as u64)).await;
+						backoff*=BACKOFF_MUL;
+						retry+=1;
+					}else{
+						println!("weird scuwab error: {scwuab:?}");
+						break None;
+					}
+				},
+				Err(e)=>{
+					println!("sadly error: {e}");
+					break None;
+				},
+			}
+		};
+
+		if let Some(data)=asset_result{
+			tokio::fs::write(path,data).await?;
+		}
+		Ok(())
+	})
+	.buffer_unordered(DOWNLOAD_LIMIT)
+	//there's gotta be a better way to just make it run to completion
+	.for_each(|_|async{}).await;
 	Ok(())
 }