fat concurrent script fetching

This commit is contained in:
Quaternions 2024-12-06 18:57:26 -08:00
parent 0977943834
commit 56fefbe52d

View File

@ -1,6 +1,8 @@
use futures::StreamExt; use futures::{StreamExt,TryStreamExt};
use crate::nats_types::ValidateRequest; use crate::nats_types::ValidateRequest;
const SCRIPT_CONCURRENCY:usize=16;
struct ModelVersion{ struct ModelVersion{
model_id:u64, model_id:u64,
model_version:u64, model_version:u64,
@ -99,14 +101,21 @@ impl Validator{
} }
} }
} }
// send all scripts to REST endpoint and receive the replacements // send all scripts to REST endpoint and receive the replacements
for (source,replacement) in &mut script_map{ futures::stream::iter(script_map.iter_mut().map(Ok))
.try_for_each_concurrent(Some(SCRIPT_CONCURRENCY),|(source,replacement)|async{
// get the hash
let mut hasher=siphasher::sip::SipHasher::new(); let mut hasher=siphasher::sip::SipHasher::new();
std::hash::Hasher::write(&mut hasher,source.as_bytes()); std::hash::Hasher::write(&mut hasher,source.as_bytes());
let hash=std::hash::Hasher::finish(&hasher); let hash=std::hash::Hasher::finish(&hasher);
// fetch the script policy
let script_policy=self.api.get_script_policy_from_hash(api::ScriptPolicyHashRequest{ let script_policy=self.api.get_script_policy_from_hash(api::ScriptPolicyHashRequest{
hash:format!("{:x}",hash), hash:format!("{:x}",hash),
}).await.map_err(ValidateError::ApiGetScriptPolicy)?; }).await.map_err(ValidateError::ApiGetScriptPolicy)?;
// write the policy to the script_map, fetching the replacement code if necessary
*replacement=match script_policy.Policy{ *replacement=match script_policy.Policy{
api::Policy::Allowed=>Policy::Allowed, api::Policy::Allowed=>Policy::Allowed,
api::Policy::Blocked=>Policy::Blocked, api::Policy::Blocked=>Policy::Blocked,
@ -118,7 +127,10 @@ impl Validator{
Policy::Replace(script.Source) Policy::Replace(script.Source)
}, },
}; };
}
Ok(())
})
.await?;
// make the replacements // make the replacements
let mut modified=false; let mut modified=false;