diff --git a/validation/src/validator.rs b/validation/src/validator.rs index 08414df..258c853 100644 --- a/validation/src/validator.rs +++ b/validation/src/validator.rs @@ -1,6 +1,8 @@ -use futures::StreamExt; +use futures::{StreamExt,TryStreamExt}; use crate::nats_types::ValidateRequest; +const SCRIPT_CONCURRENCY:usize=16; + struct ModelVersion{ model_id:u64, model_version:u64, @@ -99,14 +101,21 @@ impl Validator{ } } } + // send all scripts to REST endpoint and receive the replacements - for (source,replacement) in &mut script_map{ + futures::stream::iter(script_map.iter_mut().map(Ok)) + .try_for_each_concurrent(Some(SCRIPT_CONCURRENCY),|(source,replacement)|async{ + // get the hash let mut hasher=siphasher::sip::SipHasher::new(); std::hash::Hasher::write(&mut hasher,source.as_bytes()); let hash=std::hash::Hasher::finish(&hasher); + + // fetch the script policy let script_policy=self.api.get_script_policy_from_hash(api::ScriptPolicyHashRequest{ hash:format!("{:x}",hash), }).await.map_err(ValidateError::ApiGetScriptPolicy)?; + + // write the policy to the script_map, fetching the replacement code if necessary *replacement=match script_policy.Policy{ api::Policy::Allowed=>Policy::Allowed, api::Policy::Blocked=>Policy::Blocked, @@ -118,7 +127,10 @@ impl Validator{ Policy::Replace(script.Source) }, }; - } + + Ok(()) + }) + .await?; // make the replacements let mut modified=false;