limit concurrent downloads using JoinSet

This commit is contained in:
Quaternions 2024-01-12 17:07:06 -08:00
parent 1b3a8be142
commit 0d0f1b1792

View File

@ -362,10 +362,12 @@ async fn download_history(config:DownloadHistoryConfig)->AResult<()>{
let asset_id_string=config.asset_id.to_string(); let asset_id_string=config.asset_id.to_string();
//limit concurrent downloads
let mut join_set=tokio::task::JoinSet::new();
//poll paged list of all asset versions //poll paged list of all asset versions
let mut cursor:Option<String>=None; let mut cursor:Option<String>=None;
let mut asset_list=Vec::new(); let mut asset_list=Vec::new();
let mut join_handles=Vec::new();
loop{ loop{
let mut page=download_page(&client,config.cookie.as_str(),config.asset_id,cursor).await?; let mut page=download_page(&client,config.cookie.as_str(),config.asset_id,cursor).await?;
let mut cancel_paging=false; let mut cancel_paging=false;
@ -375,11 +377,14 @@ async fn download_history(config:DownloadHistoryConfig)->AResult<()>{
cancel_paging=true; cancel_paging=true;
continue;//don't trust roblox returned order continue;//don't trust roblox returned order
} }
while CONCURRENT_REQUESTS<=join_set.len(){
join_set.join_next().await.unwrap()??;
}
let client=client.clone(); let client=client.clone();
let cookie=config.cookie.clone(); let cookie=config.cookie.clone();
let asset_id_str=asset_id_string.clone(); let asset_id_str=asset_id_string.clone();
let output_folder=config.output_folder.clone(); let output_folder=config.output_folder.clone();
join_handles.push(tokio::spawn(async move{ join_set.spawn(async move{
let resp=download_asset_version(&client,cookie.as_str(),asset_id_str.as_str(),version_number.to_string().as_str()).await?; let resp=download_asset_version(&client,cookie.as_str(),asset_id_str.as_str(),version_number.to_string().as_str()).await?;
let contents=match maybe_gzip_decode(std::io::Cursor::new(resp.bytes().await?))?{ let contents=match maybe_gzip_decode(std::io::Cursor::new(resp.bytes().await?))?{
ReaderType::GZip(readable)=>read_readable(readable)?, ReaderType::GZip(readable)=>read_readable(readable)?,
@ -392,7 +397,7 @@ async fn download_history(config:DownloadHistoryConfig)->AResult<()>{
tokio::fs::write(path,contents).await?; tokio::fs::write(path,contents).await?;
Ok::<_,anyhow::Error>(()) Ok::<_,anyhow::Error>(())
})); });
} }
if page.nextPageCursor.is_none()||cancel_paging{ if page.nextPageCursor.is_none()||cancel_paging{
for asset_version in page.data.into_iter(){ for asset_version in page.data.into_iter(){
@ -413,8 +418,8 @@ async fn download_history(config:DownloadHistoryConfig)->AResult<()>{
path.set_file_name("versions.json"); path.set_file_name("versions.json");
tokio::fs::write(path,serde_json::to_string(&asset_list)?).await?; tokio::fs::write(path,serde_json::to_string(&asset_list)?).await?;
for join_handle in join_handles{ while let Some(result)=join_set.join_next().await{
join_handle.await??; result??;
} }
Ok(()) Ok(())