download_assets with naive exponential backoff

Quaternions 2025-01-27 10:28:37 -08:00
parent db3ab1ec4b
commit 072adf1f87
2 changed files with 161 additions and 29 deletions

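The retry loop added below backs off on HTTP 429 by sleeping, starting at 1000ms and multiplying the wait by exp(1/3) each time, and gives up after 12 retries. A minimal sketch of that schedule (backoff_ms is an illustrative helper, not part of the commit):

fn backoff_ms(retry: u32) -> u64 {
    // 1000 * exp(retry/3): 1000ms, ~1.4s, ~1.9s, ... ~39s for the 12th and final wait
    (1000f32 * (retry as f32 / 3.0).exp()) as u64
}

Summed over all 12 waits this is roughly 2.3 minutes before an asset download is abandoned.
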
@@ -22,10 +22,11 @@ enum Commands{
ConvertTextures,
}
fn main() -> AResult<()> {
let cli = Cli::parse();
#[tokio::main]
async fn main()->AResult<()>{
let cli=Cli::parse();
match cli.command{
Commands::Roblox(commands)=>commands.run(),
Commands::Roblox(commands)=>commands.run().await,
Commands::Source(commands)=>commands.run(),
Commands::ConvertTextures=>common::convert_textures(),
}

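The first hunk turns main into an async fn under #[tokio::main]. Roughly, the attribute builds a multi-thread runtime and blocks on the async body; a sketch of the equivalent expansion (not the literal macro output):

fn main() -> AResult<()> {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?
        .block_on(async {
            let cli = Cli::parse();
            match cli.command {
                Commands::Roblox(commands) => commands.run().await,
                Commands::Source(commands) => commands.run(),
                Commands::ConvertTextures => common::convert_textures(),
            }
        })
}
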
@@ -3,8 +3,12 @@ use std::io::{Cursor,Read,Seek};
use std::collections::HashSet;
use clap::{Args,Subcommand};
use anyhow::Result as AResult;
use futures::StreamExt;
use rbx_dom_weak::Instance;
use strafesnet_deferred_loader::rbxassetid::RobloxAssetId;
use tokio::io::AsyncReadExt;
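// Upper bound on in-flight asset downloads; applied via buffer_unordered below.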
const DOWNLOAD_LIMIT:usize=16;
#[derive(Subcommand)]
pub enum Commands{
@@ -21,27 +25,40 @@ pub struct RobloxToSNFSubcommand {
}
#[derive(Args)]
pub struct DownloadAssetsSubcommand{
#[arg(long,required=true)]
roblox_files:Vec<PathBuf>
#[arg(required=true)]
roblox_files:Vec<PathBuf>,
// #[arg(long)]
// cookie_file:Option<String>,
}
impl Commands{
pub fn run(self)->AResult<()>{
pub async fn run(self)->AResult<()>{
match self{
Commands::RobloxToSNF(subcommand)=>roblox_to_snf(subcommand.input_files,subcommand.output_folder),
Commands::DownloadAssets(subcommand)=>download_assets(subcommand.roblox_files),
Commands::DownloadAssets(subcommand)=>download_assets(
subcommand.roblox_files,
rbx_asset::cookie::Cookie::new("".to_string()),
).await,
}
}
}
fn load_dom<R:Read+Seek>(mut input:R)->AResult<rbx_dom_weak::WeakDom>{
#[allow(unused)]
#[derive(Debug)]
enum LoadDomError{
IO(std::io::Error),
Binary(rbx_binary::DecodeError),
Xml(rbx_xml::DecodeError),
UnknownFormat,
}
fn load_dom<R:Read+Seek>(mut input:R)->Result<rbx_dom_weak::WeakDom,LoadDomError>{
let mut first_8=[0u8;8];
input.read_exact(&mut first_8)?;
input.rewind()?;
input.read_exact(&mut first_8).map_err(LoadDomError::IO)?;
input.rewind().map_err(LoadDomError::IO)?;
match &first_8{
b"<roblox!"=>rbx_binary::from_reader(input).map_err(anyhow::Error::msg),
b"<roblox "=>rbx_xml::from_reader(input,rbx_xml::DecodeOptions::default()).map_err(anyhow::Error::msg),
_=>Err(anyhow::Error::msg("unsupported file type")),
b"<roblox!"=>rbx_binary::from_reader(input).map_err(LoadDomError::Binary),
b"<roblox "=>rbx_xml::from_reader(input,rbx_xml::DecodeOptions::default()).map_err(LoadDomError::Xml),
_=>Err(LoadDomError::UnknownFormat),
}
}
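load_dom now reports a typed error instead of anyhow, sniffing the first 8 bytes to pick a decoder: binary files start with "<roblox!", XML files with "<roblox " (trailing space). A hypothetical caller bridging LoadDomError back into anyhow (the wrapper name is illustrative, not from the commit):

fn load_dom_anyhow<R: Read + Seek>(input: R) -> AResult<rbx_dom_weak::WeakDom> {
    // LoadDomError only derives Debug, so format it through anyhow explicitly.
    load_dom(input).map_err(|e| anyhow::anyhow!("failed to decode dom: {e:?}"))
}
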
@@ -80,13 +97,13 @@ fn accumulate_content_id(content_list:&mut HashSet<RobloxAssetId>,object:&Instan
println!("Content failed to parse into AssetID: {:?}",content);
}
}else{
println!("property={} does not exist for class={}",object.class.as_str(),property);
println!("property={} does not exist for class={}",property,object.class.as_str());
}
}
fn read_entire_file(path:impl AsRef<Path>)->Result<Cursor<Vec<u8>>,std::io::Error>{
let mut file=std::fs::File::open(path)?;
async fn read_entire_file(path:impl AsRef<Path>)->Result<Cursor<Vec<u8>>,std::io::Error>{
let mut file=tokio::fs::File::open(path).await?;
let mut data=Vec::new();
file.read_to_end(&mut data)?;
file.read_to_end(&mut data).await?;
Ok(Cursor::new(data))
}
#[derive(Default)]
@@ -123,11 +140,18 @@ impl UniqueAssets{
}
}
}
fn unique_assets(path:&Path)->AResult<UniqueAssets>{
#[allow(unused)]
#[derive(Debug)]
enum UniqueAssetError{
IO(std::io::Error),
LoadDom(LoadDomError),
}
async fn unique_assets(path:&Path)->Result<UniqueAssets,UniqueAssetError>{
// read entire file
let mut assets=UniqueAssets::default();
let data=read_entire_file(path)?;
let dom=load_dom(data)?;
let data=read_entire_file(path).await.map_err(UniqueAssetError::IO)?;
let dom=load_dom(data).map_err(UniqueAssetError::LoadDom)?;
for object in dom.into_raw().1.into_values(){
assets.collect(&object);
}
@@ -135,18 +159,125 @@ fn unique_assets(path:&Path)->AResult<UniqueAssets>{
}
struct UniqueAssetsResult{
path:std::path::PathBuf,
result:AResult<UniqueAssets>,
result:Result<UniqueAssets,UniqueAssetError>,
}
fn do_thread(path:std::path::PathBuf,send:std::sync::mpsc::Sender<UniqueAssetsResult>){
std::thread::spawn(move ||{
let result=unique_assets(path.as_path());
send.send(UniqueAssetsResult{
enum DownloadType{
Texture(RobloxAssetId),
Mesh(RobloxAssetId),
Union(RobloxAssetId),
}
impl DownloadType{
fn path(&self)->PathBuf{
match self{
DownloadType::Texture(asset_id)=>format!("downloads/textures/{}",asset_id.0.to_string()).into(),
DownloadType::Mesh(asset_id)=>format!("downloads/meshes/{}",asset_id.0.to_string()).into(),
DownloadType::Union(asset_id)=>format!("downloads/unions/{}",asset_id.0.to_string()).into(),
}
}
fn asset_id(&self)->u64{
match self{
DownloadType::Texture(asset_id)=>asset_id.0,
DownloadType::Mesh(asset_id)=>asset_id.0,
DownloadType::Union(asset_id)=>asset_id.0,
}
}
}
async fn download_assets(paths:Vec<PathBuf>,cookie:rbx_asset::cookie::Cookie)->AResult<()>{
tokio::try_join!(
tokio::fs::create_dir_all("downloads/textures"),
tokio::fs::create_dir_all("downloads/meshes"),
tokio::fs::create_dir_all("downloads/unions"),
)?;
let context=rbx_asset::cookie::CookieContext::new(cookie);
let thread_limit=std::thread::available_parallelism()?.get();
// read files multithreaded
// produce UniqueAssetsResult per file
// insert into global unique assets guy, add to download queue if the asset is globally unique and does not already exist on disk
let mut globally_unique_assets=UniqueAssets::default();
futures::stream::iter(paths).map(|path|async{
let result=unique_assets(path.as_path()).await;
UniqueAssetsResult{
path,
result,
}).unwrap();
});
}
fn download_assets(paths:Vec<PathBuf>)->AResult<()>{
}
})
.buffer_unordered(thread_limit)
.flat_map(|UniqueAssetsResult{path,result}|{
futures::stream::iter(match result{
Ok(unique_assets)=>{
let mut download_instructions=Vec::new();
for texture_id in unique_assets.textures{
if globally_unique_assets.textures.insert(RobloxAssetId(texture_id.0)){
download_instructions.push(DownloadType::Texture(texture_id));
}
}
for mesh_id in unique_assets.meshes{
if globally_unique_assets.meshes.insert(RobloxAssetId(mesh_id.0)){
download_instructions.push(DownloadType::Mesh(mesh_id));
}
}
for union_id in unique_assets.unions{
if globally_unique_assets.unions.insert(RobloxAssetId(union_id.0)){
download_instructions.push(DownloadType::Union(union_id));
}
}
download_instructions
},
Err(e)=>{
println!("file {:?} had error so sad: {e:?}",path.as_path().file_stem());
vec![]
}
})
})
.map(|download_instruction|async{
let download_instruction=download_instruction;
// check if file exists on disk
let path=download_instruction.path();
if tokio::fs::try_exists(path.as_path()).await?{
return Ok::<_,std::io::Error>(());
}
let asset_id=download_instruction.asset_id();
// if not, download file
let mut retry=0;
const BACKOFF_MUL:f32=1.3956124250860895286;//exp(1/3)
let mut backoff=1000f32;
let asset_result=loop{
let asset_result=context.get_asset(rbx_asset::cookie::GetAssetRequest{
asset_id,
version:None,
}).await;
match asset_result{
Ok(asset_result)=>break Some(asset_result),
Err(rbx_asset::cookie::GetError::Response(rbx_asset::ResponseError::StatusCodeWithUrlAndBody(scwuab)))=>{
if scwuab.status_code.as_u16()==429{
if retry==12{
println!("Giving up asset download {asset_id}");
break None;
}
println!("Roblox killing me, waiting {:.0}ms...",backoff);
tokio::time::sleep(std::time::Duration::from_millis(backoff as u64)).await;
backoff*=BACKOFF_MUL;
retry+=1;
}else{
println!("weird scuwab error: {scwuab:?}");
break None;
}
},
Err(e)=>{
println!("sadly error: {e}");
break None;
},
}
};
if let Some(data)=asset_result{
tokio::fs::write(path,data).await?;
}
Ok(())
})
.buffer_unordered(DOWNLOAD_LIMIT)
//there's gotta be a better way to just make it run to completion
.for_each(|_|async{}).await;
Ok(())
}
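
On the closing comment about running the stream to completion: every item the download stage yields is a Result<(), std::io::Error>, so one hedged alternative (not in this commit) is futures::TryStreamExt::try_collect, which drives the stream to completion and surfaces the first error instead of discarding it in for_each:

use futures::TryStreamExt;
// download_stream stands for the chain above, ending at .buffer_unordered(DOWNLOAD_LIMIT).
download_stream.try_collect::<()>().await?;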