diff --git a/validation/src/main.rs b/validation/src/main.rs index ac53b8c..bf3461f 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -1,3 +1,4 @@ +mod types; mod nats_types; mod validator; mod publish_new; @@ -5,10 +6,10 @@ mod publish_fix; #[allow(dead_code)] #[derive(Debug)] -enum StartupError{ +pub enum StartupError{ API(api::ReqwestError), NatsConnect(async_nats::ConnectError), - NatsSubscribe(async_nats::SubscribeError), + NatsStartup(types::NatsStartupError), GRPCConnect(tonic::transport::Error), } impl std::fmt::Display for StartupError{ @@ -20,9 +21,6 @@ impl std::error::Error for StartupError{} pub const GROUP_STRAFESNET:u64=6980477; -// annoying mile-long type -pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClient; - #[tokio::main] async fn main()->Result<(),StartupError>{ // talk to roblox through STRAFESNET_CI2 account @@ -39,16 +37,18 @@ async fn main()->Result<(),StartupError>{ // data-service grpc for creating map entries let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required"); - let maps_grpc=MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; + let maps_grpc=crate::types::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; + // use nats jetstream + let jetstream=async_nats::jetstream::new(nasty); // connect to nats let (publish_new,publish_fix,validator)=tokio::try_join!( - publish_new::Publisher::new(nasty.clone(),cookie_context.clone(),api.clone(),maps_grpc), - publish_fix::Publisher::new(nasty.clone(),cookie_context.clone(),api.clone()), + publish_new::Publisher::new(jetstream.clone(),cookie_context.clone(),api.clone(),maps_grpc), + publish_fix::Publisher::new(jetstream.clone(),cookie_context.clone(),api.clone()), // clone nats here because it's dropped within the function scope, // meanining the last reference is dropped... - validator::Validator::new(nasty.clone(),cookie_context,api) - ).map_err(StartupError::NatsSubscribe)?; + validator::Validator::new(jetstream.clone(),cookie_context,api) + ).map_err(StartupError::NatsStartup)?; // publisher threads tokio::spawn(publish_new.run()); diff --git a/validation/src/publish_fix.rs b/validation/src/publish_fix.rs index 5377115..d38a5ae 100644 --- a/validation/src/publish_fix.rs +++ b/validation/src/publish_fix.rs @@ -1,10 +1,12 @@ use futures::StreamExt; - +use crate::types::{MessageResult,NatsStartupError}; use crate::nats_types::PublishFixRequest; #[allow(dead_code)] #[derive(Debug)] enum PublishError{ + Messages(async_nats::jetstream::consumer::pull::MessagesError), + DoubleAck(async_nats::Error), Get(rbx_asset::cookie::GetError), Json(serde_json::Error), Upload(rbx_asset::cookie::UploadError), @@ -18,35 +20,42 @@ impl std::fmt::Display for PublishError{ impl std::error::Error for PublishError{} pub struct Publisher{ - subscriber:async_nats::Subscriber, + messages:async_nats::jetstream::consumer::pull::Stream, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, } impl Publisher{ pub async fn new( - nats:async_nats::Client, + nats:async_nats::jetstream::Context, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, - )->Result{ + )->Result{ Ok(Self{ - subscriber:nats.subscribe("publish_fix").await?, + messages:nats.get_stream("submissions_publish_new").await.map_err(NatsStartupError::GetStream)? + .create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + durable_name:Some("pull".to_owned()), + ..Default::default() + }).await.map_err(NatsStartupError::ConsumerCreateStrict)? + .messages().await.map_err(NatsStartupError::Stream)?, roblox_cookie, api, }) } pub async fn run(mut self){ - while let Some(message)=self.subscriber.next().await{ - self.publish_supress_error(message).await + while let Some(message_result)=self.messages.next().await{ + self.publish_supress_error(message_result).await } } - async fn publish_supress_error(&self,message:async_nats::Message){ - match self.publish(message).await{ + async fn publish_supress_error(&self,message_result:MessageResult){ + match self.publish(message_result).await{ Ok(())=>println!("[PublishFix] Published, hooray!"), Err(e)=>println!("[PublishFix] There was an error, oopsie! {e}"), } } - async fn publish(&self,message:async_nats::Message)->Result<(),PublishError>{ - println!("publish_fix {:?}",message); + async fn publish(&self,message_result:MessageResult)->Result<(),PublishError>{ + println!("publish_fix {:?}",message_result); + let message=message_result.map_err(PublishError::Messages)?; + message.double_ack().await.map_err(PublishError::DoubleAck)?; // decode json let publish_info:PublishFixRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?; diff --git a/validation/src/publish_new.rs b/validation/src/publish_new.rs index 220dcd7..07f6456 100644 --- a/validation/src/publish_new.rs +++ b/validation/src/publish_new.rs @@ -1,10 +1,12 @@ use futures::StreamExt; - +use crate::types::{MessageResult,NatsStartupError}; use crate::nats_types::PublishNewRequest; #[allow(dead_code)] #[derive(Debug)] enum PublishError{ + Messages(async_nats::jetstream::consumer::pull::MessagesError), + DoubleAck(async_nats::Error), Get(rbx_asset::cookie::GetError), Json(serde_json::Error), Create(rbx_asset::cookie::CreateError), @@ -20,38 +22,45 @@ impl std::fmt::Display for PublishError{ impl std::error::Error for PublishError{} pub struct Publisher{ - subscriber:async_nats::Subscriber, + messages:async_nats::jetstream::consumer::pull::Stream, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, - maps_grpc:crate::MapsServiceClient, + maps_grpc:crate::types::MapsServiceClient, } impl Publisher{ pub async fn new( - nats:async_nats::Client, + nats:async_nats::jetstream::Context, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, - maps_grpc:crate::MapsServiceClient, - )->Result{ + maps_grpc:crate::types::MapsServiceClient, + )->Result{ Ok(Self{ - subscriber:nats.subscribe("publish_new").await?, + messages:nats.get_stream("submissions_publish_new").await.map_err(NatsStartupError::GetStream)? + .create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + durable_name:Some("pull".to_owned()), + ..Default::default() + }).await.map_err(NatsStartupError::ConsumerCreateStrict)? + .messages().await.map_err(NatsStartupError::Stream)?, roblox_cookie, api, maps_grpc, }) } pub async fn run(mut self){ - while let Some(message)=self.subscriber.next().await{ - self.publish_supress_error(message).await + while let Some(message_result)=self.messages.next().await{ + self.publish_supress_error(message_result).await } } - async fn publish_supress_error(&self,message:async_nats::Message){ - match self.publish(message).await{ + async fn publish_supress_error(&self,message_result:MessageResult){ + match self.publish(message_result).await{ Ok(())=>println!("[PublishNew] Published, hooray!"), Err(e)=>println!("[PublishNew] There was an error, oopsie! {e}"), } } - async fn publish(&self,message:async_nats::Message)->Result<(),PublishError>{ - println!("publish_new {:?}",message); + async fn publish(&self,message_result:MessageResult)->Result<(),PublishError>{ + println!("publish_new {:?}",message_result); + let message=message_result.map_err(PublishError::Messages)?; + message.double_ack().await.map_err(PublishError::DoubleAck)?; // decode json let publish_info:PublishNewRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?; diff --git a/validation/src/types.rs b/validation/src/types.rs new file mode 100644 index 0000000..61016b1 --- /dev/null +++ b/validation/src/types.rs @@ -0,0 +1,17 @@ + +// annoying mile-long types +pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClient; +pub type MessageResult=Result; + +#[derive(Debug)] +pub enum NatsStartupError{ + GetStream(async_nats::jetstream::context::GetStreamError), + ConsumerCreateStrict(async_nats::jetstream::stream::ConsumerCreateStrictError), + Stream(async_nats::jetstream::consumer::StreamError), +} +impl std::fmt::Display for NatsStartupError{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for NatsStartupError{} diff --git a/validation/src/validator.rs b/validation/src/validator.rs index 23aebd9..c171a3c 100644 --- a/validation/src/validator.rs +++ b/validation/src/validator.rs @@ -1,4 +1,5 @@ use futures::{StreamExt,TryStreamExt}; +use crate::types::{MessageResult,NatsStartupError}; use crate::nats_types::ValidateRequest; const SCRIPT_CONCURRENCY:usize=16; @@ -15,6 +16,8 @@ enum Policy{ enum ValidateError{ Blocked, NotAllowed, + Messages(async_nats::jetstream::consumer::pull::MessagesError), + DoubleAck(async_nats::Error), Get(rbx_asset::cookie::GetError), Json(serde_json::Error), ReadDom(ReadDomError), @@ -34,36 +37,43 @@ impl std::fmt::Display for ValidateError{ impl std::error::Error for ValidateError{} pub struct Validator{ - subscriber:async_nats::Subscriber, + messages:async_nats::jetstream::consumer::pull::Stream, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, } impl Validator{ pub async fn new( - nats:async_nats::Client, + nats:async_nats::jetstream::Context, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, - )->Result{ + )->Result{ Ok(Self{ - subscriber:nats.subscribe("validate").await?, + messages:nats.get_stream("submissions_validate").await.map_err(NatsStartupError::GetStream)? + .create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + durable_name:Some("pull".to_owned()), + ..Default::default() + }).await.map_err(NatsStartupError::ConsumerCreateStrict)? + .messages().await.map_err(NatsStartupError::Stream)?, roblox_cookie, api, }) } pub async fn run(mut self){ - while let Some(message)=self.subscriber.next().await{ - self.validate_supress_error(message).await + while let Some(message_result)=self.messages.next().await{ + self.validate_supress_error(message_result).await } } - async fn validate_supress_error(&self,message:async_nats::Message){ - match self.validate(message).await{ - Ok(())=>println!("Validated, hooray!"), - Err(e)=>println!("There was an error, oopsie! {e}"), + async fn validate_supress_error(&self,message_result:MessageResult){ + match self.validate(message_result).await{ + Ok(())=>println!("[Validation] Validated, hooray!"), + Err(e)=>println!("[Validation] There was an error, oopsie! {e}"), } } - async fn validate(&self,message:async_nats::Message)->Result<(),ValidateError>{ - println!("validate {:?}",message); + async fn validate(&self,message_result:MessageResult)->Result<(),ValidateError>{ + println!("validate {:?}",message_result); + let message=message_result.map_err(ValidateError::Messages)?; + message.double_ack().await.map_err(ValidateError::DoubleAck)?; // decode json let validate_info:ValidateRequest=serde_json::from_slice(&message.payload).map_err(ValidateError::Json)?;