diff --git a/validation/src/main.rs b/validation/src/main.rs index e504733..fec0420 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -1,8 +1,10 @@ -mod types; +use futures::StreamExt; + mod nats_types; mod validator; mod publish_new; mod publish_fix; +mod message_handler; #[allow(dead_code)] #[derive(Debug)] @@ -10,7 +12,8 @@ pub enum StartupError{ API(api::ReqwestError), NatsConnect(async_nats::ConnectError), NatsGetStream(async_nats::jetstream::context::GetStreamError), - NatsStartup(types::NatsStartupError), + NatsConsumer(async_nats::jetstream::stream::ConsumerError), + NatsStream(async_nats::jetstream::consumer::StreamError), GRPCConnect(tonic::transport::Error), } impl std::fmt::Display for StartupError{ @@ -20,6 +23,9 @@ impl std::fmt::Display for StartupError{ } impl std::error::Error for StartupError{} +// annoying mile-long type +pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClient; + pub const GROUP_STRAFESNET:u64=6980477; #[tokio::main] @@ -35,27 +41,35 @@ async fn main()->Result<(),StartupError>{ // nats let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required"); let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; // use nats jetstream - let stream=async_nats::jetstream::new(nasty) - .get_stream("maptest").await.map_err(StartupError::NatsGetStream)?; + let mut messages=async_nats::jetstream::new(nasty) + .get_stream("maptest").await.map_err(StartupError::NatsGetStream)? + .get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{ + name:Some("validation".to_owned()), + durable_name:Some("validation".to_owned()), + filter_subject:"maptest.submissions.>".to_owned(), + ..Default::default() + }).await.map_err(StartupError::NatsConsumer)? + .messages().await.map_err(StartupError::NatsStream)?; // data-service grpc for creating map entries let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required"); - let maps_grpc=crate::types::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; + let maps_grpc=crate::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; // Create a signal listener for SIGTERM let mut sig_term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).expect("Failed to create SIGTERM signal listener"); - // connect to nats - let (publish_new,publish_fix,validator)=tokio::try_join!( - publish_new::Publisher::new(stream.clone(),cookie_context.clone(),api.clone(),maps_grpc), - publish_fix::Publisher::new(stream.clone(),cookie_context.clone(),api.clone()), - validator::Validator::new(stream,cookie_context,api) - ).map_err(StartupError::NatsStartup)?; + // the guy + let message_handler=message_handler::MessageHandler::new(cookie_context,api,maps_grpc); - // nats consumer threads - tokio::spawn(publish_new.run()); - tokio::spawn(publish_fix.run()); - tokio::spawn(validator.run()); + // nats consumer thread + tokio::spawn(async move{ + while let Some(message_result)=messages.next().await{ + match message_handler.handle_message_result(message_result).await{ + Ok(())=>println!("[Validation] Success, hooray!"), + Err(e)=>println!("[Validation] There was an error, oopsie! {e}"), + } + } + }); sig_term.recv().await; diff --git a/validation/src/message_handler.rs b/validation/src/message_handler.rs new file mode 100644 index 0000000..514d3b8 --- /dev/null +++ b/validation/src/message_handler.rs @@ -0,0 +1,48 @@ +#[allow(dead_code)] +#[derive(Debug)] +pub enum HandleMessageError{ + Messages(async_nats::jetstream::consumer::pull::MessagesError), + DoubleAck(async_nats::Error), + UnknownSubject(String), + PublishNew(crate::publish_new::PublishError), + PublishFix(crate::publish_fix::PublishError), + Validation(crate::validator::ValidateError), +} +impl std::fmt::Display for HandleMessageError{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for HandleMessageError{} + +pub type MessageResult=Result; + +pub struct MessageHandler{ + publish_new:crate::publish_new::Publisher, + publish_fix:crate::publish_fix::Publisher, + validator:crate::validator::Validator, +} + +impl MessageHandler{ + pub fn new( + cookie_context:rbx_asset::cookie::CookieContext, + api:api::Context, + maps_grpc:crate::MapsServiceClient, + )->Self{ + Self{ + publish_new:crate::publish_new::Publisher::new(cookie_context.clone(),api.clone(),maps_grpc), + publish_fix:crate::publish_fix::Publisher::new(cookie_context.clone(),api.clone()), + validator:crate::validator::Validator::new(cookie_context,api), + } + } + pub async fn handle_message_result(&self,message_result:MessageResult)->Result<(),HandleMessageError>{ + let message=message_result.map_err(HandleMessageError::Messages)?; + message.double_ack().await.map_err(HandleMessageError::DoubleAck)?; + match message.subject.as_str(){ + "maptest.submissions.publish.new"=>self.publish_new.publish(message).await.map_err(HandleMessageError::PublishNew), + "maptest.submissions.publish.fix"=>self.publish_fix.publish(message).await.map_err(HandleMessageError::PublishFix), + "maptest.submissions.validate"=>self.validator.validate(message).await.map_err(HandleMessageError::Validation), + other=>Err(HandleMessageError::UnknownSubject(other.to_owned())) + } + } +} diff --git a/validation/src/publish_fix.rs b/validation/src/publish_fix.rs index a6dca00..586021a 100644 --- a/validation/src/publish_fix.rs +++ b/validation/src/publish_fix.rs @@ -1,12 +1,8 @@ -use futures::StreamExt; -use crate::types::{MessageResult,NatsStartupError}; use crate::nats_types::PublishFixRequest; #[allow(dead_code)] #[derive(Debug)] -enum PublishError{ - Messages(async_nats::jetstream::consumer::pull::MessagesError), - DoubleAck(async_nats::Error), +pub enum PublishError{ Get(rbx_asset::cookie::GetError), Json(serde_json::Error), Upload(rbx_asset::cookie::UploadError), @@ -20,43 +16,21 @@ impl std::fmt::Display for PublishError{ impl std::error::Error for PublishError{} pub struct Publisher{ - messages:async_nats::jetstream::consumer::pull::Stream, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, } impl Publisher{ - pub async fn new( - stream:async_nats::jetstream::stream::Stream, + pub const fn new( roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, - )->Result{ - Ok(Self{ - messages:stream.get_or_create_consumer("publish_fix",async_nats::jetstream::consumer::pull::Config{ - name:Some("publish_fix".to_owned()), - durable_name:Some("publish_fix".to_owned()), - filter_subject:"maptest.submissions.publish.fix".to_owned(), - ..Default::default() - }).await.map_err(NatsStartupError::Consumer)? - .messages().await.map_err(NatsStartupError::Stream)?, + )->Self{ + Self{ roblox_cookie, api, - }) - } - pub async fn run(mut self){ - while let Some(message_result)=self.messages.next().await{ - self.publish_supress_error(message_result).await } } - async fn publish_supress_error(&self,message_result:MessageResult){ - match self.publish(message_result).await{ - Ok(())=>println!("[PublishFix] Published, hooray!"), - Err(e)=>println!("[PublishFix] There was an error, oopsie! {e}"), - } - } - async fn publish(&self,message_result:MessageResult)->Result<(),PublishError>{ - println!("publish_fix {:?}",message_result); - let message=message_result.map_err(PublishError::Messages)?; - message.double_ack().await.map_err(PublishError::DoubleAck)?; + pub async fn publish(&self,message:async_nats::jetstream::Message)->Result<(),PublishError>{ + println!("publish_fix {:?}",message); // decode json let publish_info:PublishFixRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?; diff --git a/validation/src/publish_new.rs b/validation/src/publish_new.rs index 48ab774..4da6e1c 100644 --- a/validation/src/publish_new.rs +++ b/validation/src/publish_new.rs @@ -1,12 +1,8 @@ -use futures::StreamExt; -use crate::types::{MessageResult,NatsStartupError}; use crate::nats_types::PublishNewRequest; #[allow(dead_code)] #[derive(Debug)] -enum PublishError{ - Messages(async_nats::jetstream::consumer::pull::MessagesError), - DoubleAck(async_nats::Error), +pub enum PublishError{ Get(rbx_asset::cookie::GetError), Json(serde_json::Error), Create(rbx_asset::cookie::CreateError), @@ -22,46 +18,24 @@ impl std::fmt::Display for PublishError{ impl std::error::Error for PublishError{} pub struct Publisher{ - messages:async_nats::jetstream::consumer::pull::Stream, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, - maps_grpc:crate::types::MapsServiceClient, + maps_grpc:crate::MapsServiceClient, } impl Publisher{ - pub async fn new( - stream:async_nats::jetstream::stream::Stream, + pub const fn new( roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, - maps_grpc:crate::types::MapsServiceClient, - )->Result{ - Ok(Self{ - messages:stream.get_or_create_consumer("publish_new",async_nats::jetstream::consumer::pull::Config{ - name:Some("publish_new".to_owned()), - durable_name:Some("publish_new".to_owned()), - filter_subject:"maptest.submissions.publish.new".to_owned(), - ..Default::default() - }).await.map_err(NatsStartupError::Consumer)? - .messages().await.map_err(NatsStartupError::Stream)?, + maps_grpc:crate::MapsServiceClient, + )->Self{ + Self{ roblox_cookie, api, maps_grpc, - }) - } - pub async fn run(mut self){ - while let Some(message_result)=self.messages.next().await{ - self.publish_supress_error(message_result).await } } - async fn publish_supress_error(&self,message_result:MessageResult){ - match self.publish(message_result).await{ - Ok(())=>println!("[PublishNew] Published, hooray!"), - Err(e)=>println!("[PublishNew] There was an error, oopsie! {e}"), - } - } - async fn publish(&self,message_result:MessageResult)->Result<(),PublishError>{ - println!("publish_new {:?}",message_result); - let message=message_result.map_err(PublishError::Messages)?; - message.double_ack().await.map_err(PublishError::DoubleAck)?; + pub async fn publish(&self,message:async_nats::jetstream::Message)->Result<(),PublishError>{ + println!("publish_new {:?}",message); // decode json let publish_info:PublishNewRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?; diff --git a/validation/src/types.rs b/validation/src/types.rs deleted file mode 100644 index e41f9b4..0000000 --- a/validation/src/types.rs +++ /dev/null @@ -1,16 +0,0 @@ - -// annoying mile-long types -pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClient; -pub type MessageResult=Result; - -#[derive(Debug)] -pub enum NatsStartupError{ - Consumer(async_nats::jetstream::stream::ConsumerError), - Stream(async_nats::jetstream::consumer::StreamError), -} -impl std::fmt::Display for NatsStartupError{ - fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ - write!(f,"{self:?}") - } -} -impl std::error::Error for NatsStartupError{} diff --git a/validation/src/validator.rs b/validation/src/validator.rs index e7e6905..61c277a 100644 --- a/validation/src/validator.rs +++ b/validation/src/validator.rs @@ -1,5 +1,5 @@ -use futures::{StreamExt,TryStreamExt}; -use crate::types::{MessageResult,NatsStartupError}; +use futures::TryStreamExt; + use crate::nats_types::ValidateRequest; const SCRIPT_CONCURRENCY:usize=16; @@ -13,11 +13,9 @@ enum Policy{ #[allow(dead_code)] #[derive(Debug)] -enum ValidateError{ +pub enum ValidateError{ Blocked, NotAllowed, - Messages(async_nats::jetstream::consumer::pull::MessagesError), - DoubleAck(async_nats::Error), Get(rbx_asset::cookie::GetError), Json(serde_json::Error), ReadDom(ReadDomError), @@ -37,45 +35,22 @@ impl std::fmt::Display for ValidateError{ impl std::error::Error for ValidateError{} pub struct Validator{ - messages:async_nats::jetstream::consumer::pull::Stream, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, } impl Validator{ - pub async fn new( - stream:async_nats::jetstream::stream::Stream, + pub const fn new( roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, - )->Result{ - Ok(Self{ - messages:stream.get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{ - name:Some("validation".to_owned()), - durable_name:Some("validation".to_owned()), - ack_policy:async_nats::jetstream::consumer::AckPolicy::Explicit, - filter_subject:"maptest.submissions.validate".to_owned(), - ..Default::default() - }).await.map_err(NatsStartupError::Consumer)? - .messages().await.map_err(NatsStartupError::Stream)?, + )->Self{ + Self{ roblox_cookie, api, - }) - } - pub async fn run(mut self){ - while let Some(message_result)=self.messages.next().await{ - self.validate_supress_error(message_result).await } } - async fn validate_supress_error(&self,message_result:MessageResult){ - match self.validate(message_result).await{ - Ok(())=>println!("[Validation] Validated, hooray!"), - Err(e)=>println!("[Validation] There was an error, oopsie! {e}"), - } - } - async fn validate(&self,message_result:MessageResult)->Result<(),ValidateError>{ - println!("validate {:?}",message_result); - let message=message_result.map_err(ValidateError::Messages)?; - message.double_ack().await.map_err(ValidateError::DoubleAck)?; + pub async fn validate(&self,message:async_nats::jetstream::Message)->Result<(),ValidateError>{ + println!("validate {:?}",message); // decode json let validate_info:ValidateRequest=serde_json::from_slice(&message.payload).map_err(ValidateError::Json)?; @@ -205,7 +180,7 @@ impl Validator{ #[allow(dead_code)] #[derive(Debug)] -enum ReadDomError{ +pub enum ReadDomError{ Binary(rbx_binary::DecodeError), Xml(rbx_xml::DecodeError), Read(std::io::Error),