126 lines
4.2 KiB
Rust
126 lines
4.2 KiB
Rust
use futures::StreamExt;
|
|
|
|
mod rbx_util;
|
|
mod message_handler;
|
|
mod nats_types;
|
|
mod types;
|
|
mod download;
|
|
mod check;
|
|
mod check_mapfix;
|
|
mod check_submission;
|
|
mod create;
|
|
mod create_mapfix;
|
|
mod create_submission;
|
|
mod upload_mapfix;
|
|
mod upload_submission;
|
|
mod validator;
|
|
mod validate_mapfix;
|
|
mod validate_submission;
|
|
|
|
#[allow(dead_code)]
|
|
#[derive(Debug)]
|
|
pub enum StartupError{
|
|
API(submissions_api::ReqwestError),
|
|
NatsConnect(async_nats::ConnectError),
|
|
NatsGetStream(async_nats::jetstream::context::GetStreamError),
|
|
NatsConsumer(async_nats::jetstream::stream::ConsumerError),
|
|
NatsConsumerUpdate(async_nats::jetstream::stream::ConsumerUpdateError),
|
|
NatsStream(async_nats::jetstream::consumer::StreamError),
|
|
}
|
|
impl std::fmt::Display for StartupError{
|
|
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
|
|
write!(f,"{self:?}")
|
|
}
|
|
}
|
|
impl std::error::Error for StartupError{}
|
|
|
|
pub const PARALLEL_REQUESTS:usize=16;
|
|
|
|
#[tokio::main]
|
|
async fn main()->Result<(),StartupError>{
|
|
let group_id:Option<u64>=match std::env::var("ROBLOX_GROUP_ID"){
|
|
Ok(s)=>match s.as_str(){
|
|
"None"=>None,
|
|
_=>Some(s.parse().expect("ROBLOX_GROUP_ID int parse")),
|
|
},
|
|
Err(e)=>Err(e).expect("ROBLOX_GROUP_ID env required"),
|
|
};
|
|
|
|
// create / upload models through STRAFESNET_CI2 account
|
|
let cookie=std::env::var("RBXCOOKIE").expect("RBXCOOKIE env required");
|
|
let cookie_context=rbx_asset::cookie::Context::new(rbx_asset::cookie::Cookie::new(cookie));
|
|
// download models through cloud api
|
|
let api_key=std::env::var("RBX_API_KEY").expect("RBX_API_KEY env required");
|
|
let cloud_context=rbx_asset::cloud::Context::new(rbx_asset::cloud::ApiKey::new(api_key));
|
|
|
|
// maps-service api
|
|
let api_host_internal=std::env::var("API_HOST_INTERNAL").expect("API_HOST_INTERNAL env required");
|
|
let api=submissions_api::internal::Context::new(api_host_internal).map_err(StartupError::API)?;
|
|
|
|
// nats
|
|
let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required");
|
|
let nats_fut=async{
|
|
const STREAM_NAME:&str="maptest";
|
|
const DURABLE_NAME:&str="validation";
|
|
const FILTER_SUBJECT:&str="maptest.>";
|
|
|
|
let nats_config=async_nats::jetstream::consumer::pull::Config{
|
|
name:Some(DURABLE_NAME.to_owned()),
|
|
durable_name:Some(DURABLE_NAME.to_owned()),
|
|
filter_subject:FILTER_SUBJECT.to_owned(),
|
|
..Default::default()
|
|
};
|
|
|
|
let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?;
|
|
|
|
// use nats jetstream
|
|
let stream=async_nats::jetstream::new(nasty)
|
|
.get_stream(STREAM_NAME).await.map_err(StartupError::NatsGetStream)?;
|
|
|
|
let consumer=stream.get_or_create_consumer(DURABLE_NAME,nats_config.clone()).await.map_err(StartupError::NatsConsumer)?;
|
|
|
|
// check if config matches expected config
|
|
if consumer.cached_info().config.filter_subject!=FILTER_SUBJECT{
|
|
stream.update_consumer(nats_config).await.map_err(StartupError::NatsConsumerUpdate)?;
|
|
}
|
|
|
|
// only need messages
|
|
consumer.messages().await.map_err(StartupError::NatsStream)
|
|
};
|
|
|
|
let message_handler=message_handler::MessageHandler::new(cloud_context,cookie_context,group_id,api);
|
|
|
|
// Create a signal listener for SIGTERM
|
|
let mut sig_term=tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).expect("Failed to create SIGTERM signal listener");
|
|
|
|
// run futures
|
|
let mut messages=nats_fut.await?;
|
|
|
|
// process up to PARALLEL_REQUESTS in parallel
|
|
let main_loop=async move{
|
|
static SEM:tokio::sync::Semaphore=tokio::sync::Semaphore::const_new(PARALLEL_REQUESTS);
|
|
// use memory leak to make static lifetime
|
|
let message_handler=Box::leak(Box::new(message_handler));
|
|
// acquire a permit before attempting to receive a message, exit if either fails
|
|
while let (Ok(permit),Some(message_result))=(SEM.acquire().await,messages.next().await){
|
|
// handle the message on a new thread (mainly to decode the model file)
|
|
tokio::spawn(async{
|
|
match message_handler.handle_message_result(message_result).await{
|
|
Ok(())=>println!("[Validation] Success, hooray!"),
|
|
Err(e)=>println!("[Validation] There was an error, oopsie! {e}"),
|
|
}
|
|
// explicitly call drop to make the move semantics and permit release more obvious
|
|
drop(permit);
|
|
});
|
|
}
|
|
};
|
|
|
|
// race sigkill and main loop termination and then die
|
|
tokio::select!{
|
|
_=sig_term.recv()=>(),
|
|
_=main_loop=>(),
|
|
};
|
|
|
|
Ok(())
|
|
}
|