Compare commits


8 Commits

SHA1        Message                       Date
f9fb1fb23c  inline very thin function     2024-12-30 00:40:36 -08:00
4116eaf829  comment code                  2024-12-28 21:10:10 -08:00
c4508480c1  reuse hash source function    2024-12-26 19:59:27 -08:00
a6b8b326f1  tidy id from filename code    2024-12-26 19:44:44 -08:00
3eb39f2c6c  publish api                   2024-12-26 19:44:44 -08:00
af2cf4b7a8  fix api                       2024-12-26 19:44:44 -08:00
bc11f918aa  uniformity                    2024-12-26 19:44:44 -08:00
a16e8faf8b  upload scripts                2024-12-26 19:44:44 -08:00
3 changed files with 193 additions and 95 deletions

Cargo.lock (generated)

@@ -1144,6 +1144,8 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
 [[package]]
 name = "submissions-api"
 version = "0.3.0"
+source = "sparse+https://git.itzana.me/api/packages/strafesnet/cargo/"
+checksum = "72e67dbf479fc6a5e22514208d533534a2e0543eab7c6c1b8860ee3f6f0c6290"
 dependencies = [
  "reqwest",
  "serde",


@@ -1,26 +0,0 @@
-use clap::{Args,Parser,Subcommand};
-#[derive(Parser)]
-#[command(author,version,about,long_about=None)]
-#[command(propagate_version=true)]
-pub struct Cli{
-	#[command(subcommand)]
-	command:Commands,
-}
-#[derive(Subcommand)]
-pub enum Commands{
-	Review(ReviewCommand),
-	UploadScripts(UploadScriptsCommand),
-}
-#[derive(Args)]
-struct ReviewCommand{
-	#[arg(long)]
-	cookie:String,
-}
-#[derive(Args)]
-struct UploadScriptsCommand{
-	#[arg(long)]
-	session_id:PathBuf,
-}


@@ -1,16 +1,50 @@
-mod cmd;
-use cmd::{Cli,Commands};
+use clap::{Args,Parser,Subcommand};
 use futures::{StreamExt,TryStreamExt};
+const READ_CONCURRENCY:usize=16;
+const REMOTE_CONCURRENCY:usize=16;
+#[derive(Parser)]
+#[command(author,version,about,long_about=None)]
+#[command(propagate_version=true)]
+struct Cli{
+	#[command(subcommand)]
+	command:Commands,
+}
+#[derive(Subcommand)]
+enum Commands{
+	Review(ReviewCommand),
+	UploadScripts(UploadScriptsCommand),
+}
+#[derive(Args)]
+struct ReviewCommand{
+	#[arg(long)]
+	session_id:String,
+	#[arg(long)]
+	api_url:String,
+}
+#[derive(Args)]
+struct UploadScriptsCommand{
+	#[arg(long)]
+	session_id:String,
+	#[arg(long)]
+	api_url:String,
+}
 #[tokio::main]
 async fn main(){
 	let cli=Cli::parse();
 	match cli.command{
 		Commands::Review(command)=>review(ReviewConfig{
-			cookie:command.cookie,
+			session_id:command.session_id,
+			api_url:command.api_url,
+		}).await.unwrap(),
+		Commands::UploadScripts(command)=>upload_scripts(UploadConfig{
+			session_id:command.session_id,
+			api_url:command.api_url,
 		}).await.unwrap(),
-		Commands::UploadScripts(command)=>upload_scripts(command.session_id).await.unwrap(),
 	}
 }
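
Not part of the diff, but useful context for the new flags: with clap's derive API, `#[arg(long)]` turns a snake_case field into a kebab-case flag, so `session_id` is passed as `--session-id`. A minimal sketch (binary name and argument values are placeholders) of how the new subcommand definitions parse:

```rust
use clap::{Args,Parser,Subcommand};

#[derive(Parser)]
struct Cli{
	#[command(subcommand)]
	command:Commands,
}

#[derive(Subcommand)]
enum Commands{
	Review(ReviewCommand),
}

#[derive(Args)]
struct ReviewCommand{
	// #[arg(long)] derives --session-id / --api-url from the field names
	#[arg(long)]
	session_id:String,
	#[arg(long)]
	api_url:String,
}

fn main(){
	// placeholder argv; a real invocation would come from the command line
	let cli=Cli::parse_from([
		"tool","review",
		"--session-id","example-session",
		"--api-url","http://localhost:8083",
	]);
	let Commands::Review(cmd)=cli.command;
	assert_eq!(cmd.api_url,"http://localhost:8083");
}
```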
@@ -51,14 +85,15 @@ enum ReviewError{
 }
 struct ReviewConfig{
-	cookie:String,
+	session_id:String,
+	api_url:String,
 }
 async fn review(config:ReviewConfig)->Result<(),ReviewError>{
 	// download unreviewed policies
 	// review them
-	let cookie=submissions_api::Cookie::new(&config.cookie).map_err(ReviewError::Cookie)?;
-	let api=submissions_api::external::Context::new("http://localhost:8083".to_owned(),cookie).map_err(ReviewError::Reqwest)?;
+	let cookie=submissions_api::Cookie::new(&config.session_id).map_err(ReviewError::Cookie)?;
+	let api=submissions_api::external::Context::new(config.api_url,cookie).map_err(ReviewError::Reqwest)?;
 	let unreviewed_policies=api.get_script_policies(submissions_api::types::GetScriptPoliciesRequest{
 		Page:1,
@@ -104,9 +139,7 @@ async fn review(config:ReviewConfig)->Result<(),ReviewError>{
 			submissions_api::types::Policy::Allowed
 		}else{
 			// compute hash
-			let mut hasher=siphasher::sip::SipHasher::new();
-			std::hash::Hasher::write(&mut hasher,source.as_bytes());
-			let hash=std::hash::Hasher::finish(&hasher);
+			let hash=hash_source(source.as_str());
 			// check if modified script already exists
 			let maybe_script_response=api.get_script_from_hash(submissions_api::types::HashRequest{
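
The three inline hashing lines removed here are the same computation as the `hash_source` helper that appears further down in this diff. For reference, a self-contained sketch of that pattern (the input string and the hex formatting are illustrative; the project's own `hash_format` is not shown in this diff):

```rust
use std::hash::Hasher;

// SipHash-2-4 over the raw source bytes; SipHasher::new() uses a fixed zero key,
// so the result is stable across runs and machines.
fn hash_source(source:&str)->u64{
	let mut hasher=siphasher::sip::SipHasher::new();
	hasher.write(source.as_bytes());
	hasher.finish()
}

fn main(){
	let hash=hash_source("print(\"hello\")"); // placeholder script source
	// fixed-width lowercase hex is one plausible way to format the hash for lookups
	println!("{:016x}",hash);
}
```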
@@ -134,7 +167,7 @@ async fn review(config:ReviewConfig)->Result<(),ReviewError>{
 		// update policy
 		api.update_script_policy(submissions_api::types::UpdateScriptPolicyRequest{
-			ScriptPolicyID:unreviewed_policy.ID,
+			ID:unreviewed_policy.ID,
 			FromScriptID:None,
 			ToScriptID:to_script_id,
 			Policy:Some(reviewed_policy),
@@ -144,6 +177,87 @@ async fn review(config:ReviewConfig)->Result<(),ReviewError>{
 	Ok(())
 }
+#[allow(dead_code)]
+#[derive(Debug)]
+enum ScriptUploadError{
+	Cookie(submissions_api::CookieError),
+	Reqwest(submissions_api::ReqwestError),
+	AllowedSet(std::io::Error),
+	AllowedMap(GetMapError),
+	ReplaceMap(GetMapError),
+	BlockedSet(std::io::Error),
+	GOC(GOCError),
+	GOCPolicyReplace(GOCError),
+	GOCPolicyAllowed(GOCError),
+	GOCPolicyBlocked(GOCError),
+}
+fn read_dir_stream(dir:tokio::fs::ReadDir)->impl futures::stream::Stream<Item=std::io::Result<tokio::fs::DirEntry>>{
+	futures::stream::unfold(dir,|mut dir|async{
+		match dir.next_entry().await{
+			Ok(Some(entry))=>Some((Ok(entry),dir)),
+			Ok(None)=>None, // End of directory
+			Err(e)=>Some((Err(e),dir)), // Error encountered
+		}
+	})
+}
+async fn get_set_from_file(path:impl AsRef<std::path::Path>)->std::io::Result<std::collections::HashSet<String>>{
+	read_dir_stream(tokio::fs::read_dir(path).await?)
+	.map(|dir_entry|async{
+		tokio::fs::read_to_string(dir_entry?.path()).await
+	})
+	.buffer_unordered(READ_CONCURRENCY)
+	.try_collect().await
+}
+async fn get_allowed_set()->std::io::Result<std::collections::HashSet<String>>{
+	get_set_from_file("scripts/allowed").await
+}
+async fn get_blocked_set()->std::io::Result<std::collections::HashSet<String>>{
+	get_set_from_file("scripts/blocked").await
+}
+#[allow(dead_code)]
+#[derive(Debug)]
+enum GetMapError{
+	IO(std::io::Error),
+	FileStem,
+	ToStr,
+	ParseInt(std::num::ParseIntError),
+}
+async fn get_allowed_map()->Result<std::collections::HashMap::<u32,String>,GetMapError>{
+	read_dir_stream(tokio::fs::read_dir("scripts/allowed").await.map_err(GetMapError::IO)?)
+	.map(|dir_entry|async{
+		let path=dir_entry.map_err(GetMapError::IO)?.path();
+		let id:u32=path
+			.file_stem().ok_or(GetMapError::FileStem)?
+			.to_str().ok_or(GetMapError::ToStr)?
+			.parse().map_err(GetMapError::ParseInt)?;
+		let source=tokio::fs::read_to_string(path).await.map_err(GetMapError::IO)?;
+		Ok((id,source))
+	})
+	.buffer_unordered(READ_CONCURRENCY)
+	.try_collect().await
+}
+async fn get_replace_map()->Result<std::collections::HashMap::<String,u32>,GetMapError>{
+	read_dir_stream(tokio::fs::read_dir("scripts/replace").await.map_err(GetMapError::IO)?)
+	.map(|dir_entry|async{
+		let path=dir_entry.map_err(GetMapError::IO)?.path();
+		let id:u32=path
+			.file_stem().ok_or(GetMapError::FileStem)?
+			.to_str().ok_or(GetMapError::ToStr)?
+			.parse().map_err(GetMapError::ParseInt)?;
+		let source=tokio::fs::read_to_string(path).await.map_err(GetMapError::IO)?;
+		Ok((source,id))
+	})
+	.buffer_unordered(READ_CONCURRENCY)
+	.try_collect().await
+}
 fn hash_source(source:&str)->u64{
 	let mut hasher=siphasher::sip::SipHasher::new();
 	std::hash::Hasher::write(&mut hasher,source.as_bytes());
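
The directory-loading helpers added above all share one pattern: wrap `tokio::fs::ReadDir` (which is not itself a `Stream`) with `futures::stream::unfold`, map each entry to an async file read, and collect with bounded concurrency via `buffer_unordered`. A minimal standalone sketch of that pattern, assuming a directory of regular files and a placeholder concurrency of 16:

```rust
use futures::{StreamExt,TryStreamExt};

// Adapt tokio's ReadDir into a Stream of directory entries.
fn read_dir_stream(dir:tokio::fs::ReadDir)->impl futures::stream::Stream<Item=std::io::Result<tokio::fs::DirEntry>>{
	futures::stream::unfold(dir,|mut dir|async{
		match dir.next_entry().await{
			Ok(Some(entry))=>Some((Ok(entry),dir)),
			Ok(None)=>None,
			Err(e)=>Some((Err(e),dir)),
		}
	})
}

#[tokio::main]
async fn main()->std::io::Result<()>{
	// Read every file in "scripts/allowed" with at most 16 reads in flight.
	let sources:std::collections::HashSet<String>=
		read_dir_stream(tokio::fs::read_dir("scripts/allowed").await?)
		.map(|entry|async{tokio::fs::read_to_string(entry?.path()).await})
		.buffer_unordered(16)
		.try_collect().await?;
	println!("read {} files",sources.len());
	Ok(())
}
```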
@@ -197,41 +311,40 @@ async fn check_or_create_script_poicy(
 	Ok(())
 }
-async fn do_policy(
-	api:&submissions_api::external::Context,
-	script_ids:&std::collections::HashMap<&str,submissions_api::types::ScriptID>,
-	source:&str,
-	to_script_id:submissions_api::types::ScriptID,
-	policy:submissions_api::types::Policy,
-)->Result<(),GOCError>{
-	let hash=hash_format(hash_source(source));
-	check_or_create_script_poicy(api,hash.as_str(),submissions_api::types::CreateScriptPolicyRequest{
-		FromScriptID:script_ids[source],
-		ToScriptID:to_script_id,
-		Policy:policy,
-	}).await
+struct UploadConfig{
+	session_id:String,
+	api_url:String,
 }
-async fn upload_scripts(session_id:PathBuf)->Result<()>{
-	let cookie={
-		let mut cookie=String::new();
-		std::fs::File::open(session_id)?.read_to_string(&mut cookie)?;
-		submissions_api::Cookie::new(&cookie)?
-	};
-	let api=&submissions_api::external::Context::new("http://localhost:8083".to_owned(),cookie)?;
-	let allowed_set=get_allowed_set()?;
-	let allowed_map=get_allowed_map()?;
-	let replace_map=get_replace_map()?;
-	let blocked=get_blocked()?;
+async fn upload_scripts(config:UploadConfig)->Result<(),ScriptUploadError>{
+	let cookie=submissions_api::Cookie::new(&config.session_id).map_err(ScriptUploadError::Cookie)?;
+	let api=&submissions_api::external::Context::new(config.api_url,cookie).map_err(ScriptUploadError::Reqwest)?;
+	// load all script files
+	let (
+		allowed_set_result,
+		allowed_map_result,
+		replace_map_result,
+		blocked_set_result,
+	)=tokio::join!(
+		get_allowed_set(),
+		get_allowed_map(),
+		get_replace_map(),
+		get_blocked_set(),
+	);
+	let allowed_set=allowed_set_result.map_err(ScriptUploadError::AllowedSet)?;
+	let allowed_map=allowed_map_result.map_err(ScriptUploadError::AllowedMap)?;
+	let replace_map=replace_map_result.map_err(ScriptUploadError::ReplaceMap)?;
+	let blocked_set=blocked_set_result.map_err(ScriptUploadError::BlockedSet)?;
 	// create a unified deduplicated set of all scripts
 	let script_set:std::collections::HashSet<&str>=allowed_set.iter()
-		.map(|s|s.as_str())
+		.map(String::as_str)
 		.chain(
-			replace_map.keys().map(|s|s.as_str())
+			replace_map.keys().map(String::as_str)
 		).chain(
-			blocked.iter().map(|s|s.as_str())
+			blocked_set.iter().map(String::as_str)
 		).collect();
 	// get or create every unique script
@@ -239,48 +352,57 @@ async fn upload_scripts(session_id:PathBuf)->Result<()>{
 	futures::stream::iter(script_set)
 	.map(|source|async move{
 		let script_id=get_or_create_script(api,source).await?;
-		Ok::<_,GOCError>((source,script_id))
+		Ok((source,script_id))
 	})
-	.buffer_unordered(16)
-	.try_collect().await?;
+	.buffer_unordered(REMOTE_CONCURRENCY)
+	.try_collect().await.map_err(ScriptUploadError::GOC)?;
 	// get or create policy for each script in each category
 	//
 	// replace
-	futures::stream::iter(replace_map.iter().map(Ok))
-	.try_for_each_concurrent(Some(16),|(source,id)|async{
-		do_policy(
-			api,
-			&script_ids,
-			source,
-			script_ids[allowed_map[id].as_str()],
-			submissions_api::types::Policy::Replace
-		).await
-	}).await?;
+	let replace_fut=futures::stream::iter(replace_map.iter().map(Ok))
+	.try_for_each_concurrent(Some(REMOTE_CONCURRENCY),|(source,id)|async{
+		check_or_create_script_poicy(
+			api,
+			hash_format(hash_source(source)).as_str(),
+			submissions_api::types::CreateScriptPolicyRequest{
+				FromScriptID:script_ids[source.as_str()],
+				ToScriptID:script_ids[allowed_map[id].as_str()],
+				Policy:submissions_api::types::Policy::Replace,
+			}
+		).await.map_err(ScriptUploadError::GOCPolicyReplace)
+	});
 	// allowed
-	futures::stream::iter(allowed_set.iter().map(Ok))
-	.try_for_each_concurrent(Some(16),|source|async{
-		do_policy(
-			api,
-			&script_ids,
-			source,
-			script_ids[source.as_str()],
-			submissions_api::types::Policy::Allowed
-		).await
-	}).await?;
+	let allowed_fut=futures::stream::iter(allowed_set.iter().map(Ok))
+	.try_for_each_concurrent(Some(REMOTE_CONCURRENCY),|source|async{
+		check_or_create_script_poicy(
+			api,
+			hash_format(hash_source(source)).as_str(),
+			submissions_api::types::CreateScriptPolicyRequest{
+				FromScriptID:script_ids[source.as_str()],
+				ToScriptID:script_ids[source.as_str()],
+				Policy:submissions_api::types::Policy::Allowed,
+			}
+		).await.map_err(ScriptUploadError::GOCPolicyAllowed)
+	});
 	// blocked
-	futures::stream::iter(blocked.iter().map(Ok))
-	.try_for_each_concurrent(Some(16),|source|async{
-		do_policy(
-			api,
-			&script_ids,
-			source,
-			script_ids[source.as_str()],
-			submissions_api::types::Policy::Blocked
-		).await
-	}).await?;
+	let blocked_fut=futures::stream::iter(blocked_set.iter().map(Ok))
+	.try_for_each_concurrent(Some(REMOTE_CONCURRENCY),|source|async{
+		check_or_create_script_poicy(
+			api,
+			hash_format(hash_source(source)).as_str(),
+			submissions_api::types::CreateScriptPolicyRequest{
+				FromScriptID:script_ids[source.as_str()],
+				ToScriptID:script_ids[source.as_str()],
+				Policy:submissions_api::types::Policy::Blocked,
+			}
+		).await.map_err(ScriptUploadError::GOCPolicyBlocked)
+	});
+	// run futures
+	tokio::try_join!(replace_fut,allowed_fut,blocked_fut)?;
 	Ok(())
 }
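
The rewritten `upload_scripts` builds each policy pass as a future with per-stream bounded concurrency and then drives all of them together. A minimal sketch of that shape, with placeholder items and work standing in for the API calls:

```rust
use futures::TryStreamExt;

const REMOTE_CONCURRENCY:usize=16;

// Placeholder for the real per-item work (a submissions API call in the diff).
async fn process(item:u32)->Result<(),String>{
	if item==0{
		return Err("zero is not allowed".to_owned());
	}
	Ok(())
}

#[tokio::main]
async fn main()->Result<(),String>{
	let first:Vec<u32>=(1..=100).collect();
	let second:Vec<u32>=(101..=200).collect();
	// Each pass keeps at most REMOTE_CONCURRENCY items in flight;
	// the passes themselves run concurrently under try_join!.
	let first_fut=futures::stream::iter(first.into_iter().map(Ok))
		.try_for_each_concurrent(Some(REMOTE_CONCURRENCY),|item|async move{
			process(item).await
		});
	let second_fut=futures::stream::iter(second.into_iter().map(Ok))
		.try_for_each_concurrent(Some(REMOTE_CONCURRENCY),|item|async move{
			process(item).await
		});
	tokio::try_join!(first_fut,second_fut)?;
	Ok(())
}
```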