validation: nats jetstream

This commit is contained in:
Quaternions 2024-12-10 18:04:02 -08:00
parent 7c2e8f00ad
commit ecaff95ca7
5 changed files with 91 additions and 46 deletions

View File

@ -1,3 +1,4 @@
mod types;
mod nats_types; mod nats_types;
mod validator; mod validator;
mod publish_new; mod publish_new;
@ -5,10 +6,10 @@ mod publish_fix;
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug)] #[derive(Debug)]
enum StartupError{ pub enum StartupError{
API(api::ReqwestError), API(api::ReqwestError),
NatsConnect(async_nats::ConnectError), NatsConnect(async_nats::ConnectError),
NatsSubscribe(async_nats::SubscribeError), NatsStartup(types::NatsStartupError),
GRPCConnect(tonic::transport::Error), GRPCConnect(tonic::transport::Error),
} }
impl std::fmt::Display for StartupError{ impl std::fmt::Display for StartupError{
@ -20,9 +21,6 @@ impl std::error::Error for StartupError{}
pub const GROUP_STRAFESNET:u64=6980477; pub const GROUP_STRAFESNET:u64=6980477;
// annoying mile-long type
pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClient<tonic::transport::channel::Channel>;
#[tokio::main] #[tokio::main]
async fn main()->Result<(),StartupError>{ async fn main()->Result<(),StartupError>{
// talk to roblox through STRAFESNET_CI2 account // talk to roblox through STRAFESNET_CI2 account
@ -39,16 +37,18 @@ async fn main()->Result<(),StartupError>{
// data-service grpc for creating map entries // data-service grpc for creating map entries
let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required"); let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required");
let maps_grpc=MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; let maps_grpc=crate::types::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?;
// use nats jetstream
let jetstream=async_nats::jetstream::new(nasty);
// connect to nats // connect to nats
let (publish_new,publish_fix,validator)=tokio::try_join!( let (publish_new,publish_fix,validator)=tokio::try_join!(
publish_new::Publisher::new(nasty.clone(),cookie_context.clone(),api.clone(),maps_grpc), publish_new::Publisher::new(jetstream.clone(),cookie_context.clone(),api.clone(),maps_grpc),
publish_fix::Publisher::new(nasty.clone(),cookie_context.clone(),api.clone()), publish_fix::Publisher::new(jetstream.clone(),cookie_context.clone(),api.clone()),
// clone nats here because it's dropped within the function scope, // clone nats here because it's dropped within the function scope,
// meanining the last reference is dropped... // meanining the last reference is dropped...
validator::Validator::new(nasty.clone(),cookie_context,api) validator::Validator::new(jetstream.clone(),cookie_context,api)
).map_err(StartupError::NatsSubscribe)?; ).map_err(StartupError::NatsStartup)?;
// publisher threads // publisher threads
tokio::spawn(publish_new.run()); tokio::spawn(publish_new.run());

View File

@ -1,10 +1,12 @@
use futures::StreamExt; use futures::StreamExt;
use crate::types::{MessageResult,NatsStartupError};
use crate::nats_types::PublishFixRequest; use crate::nats_types::PublishFixRequest;
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug)] #[derive(Debug)]
enum PublishError{ enum PublishError{
Messages(async_nats::jetstream::consumer::pull::MessagesError),
DoubleAck(async_nats::Error),
Get(rbx_asset::cookie::GetError), Get(rbx_asset::cookie::GetError),
Json(serde_json::Error), Json(serde_json::Error),
Upload(rbx_asset::cookie::UploadError), Upload(rbx_asset::cookie::UploadError),
@ -18,35 +20,42 @@ impl std::fmt::Display for PublishError{
impl std::error::Error for PublishError{} impl std::error::Error for PublishError{}
pub struct Publisher{ pub struct Publisher{
subscriber:async_nats::Subscriber, messages:async_nats::jetstream::consumer::pull::Stream,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
} }
impl Publisher{ impl Publisher{
pub async fn new( pub async fn new(
nats:async_nats::Client, nats:async_nats::jetstream::Context,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
)->Result<Self,async_nats::SubscribeError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
subscriber:nats.subscribe("publish_fix").await?, messages:nats.get_stream("submissions_publish_new").await.map_err(NatsStartupError::GetStream)?
.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{
durable_name:Some("pull".to_owned()),
..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)?
.messages().await.map_err(NatsStartupError::Stream)?,
roblox_cookie, roblox_cookie,
api, api,
}) })
} }
pub async fn run(mut self){ pub async fn run(mut self){
while let Some(message)=self.subscriber.next().await{ while let Some(message_result)=self.messages.next().await{
self.publish_supress_error(message).await self.publish_supress_error(message_result).await
} }
} }
async fn publish_supress_error(&self,message:async_nats::Message){ async fn publish_supress_error(&self,message_result:MessageResult){
match self.publish(message).await{ match self.publish(message_result).await{
Ok(())=>println!("[PublishFix] Published, hooray!"), Ok(())=>println!("[PublishFix] Published, hooray!"),
Err(e)=>println!("[PublishFix] There was an error, oopsie! {e}"), Err(e)=>println!("[PublishFix] There was an error, oopsie! {e}"),
} }
} }
async fn publish(&self,message:async_nats::Message)->Result<(),PublishError>{ async fn publish(&self,message_result:MessageResult)->Result<(),PublishError>{
println!("publish_fix {:?}",message); println!("publish_fix {:?}",message_result);
let message=message_result.map_err(PublishError::Messages)?;
message.double_ack().await.map_err(PublishError::DoubleAck)?;
// decode json // decode json
let publish_info:PublishFixRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?; let publish_info:PublishFixRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?;

View File

@ -1,10 +1,12 @@
use futures::StreamExt; use futures::StreamExt;
use crate::types::{MessageResult,NatsStartupError};
use crate::nats_types::PublishNewRequest; use crate::nats_types::PublishNewRequest;
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug)] #[derive(Debug)]
enum PublishError{ enum PublishError{
Messages(async_nats::jetstream::consumer::pull::MessagesError),
DoubleAck(async_nats::Error),
Get(rbx_asset::cookie::GetError), Get(rbx_asset::cookie::GetError),
Json(serde_json::Error), Json(serde_json::Error),
Create(rbx_asset::cookie::CreateError), Create(rbx_asset::cookie::CreateError),
@ -20,38 +22,45 @@ impl std::fmt::Display for PublishError{
impl std::error::Error for PublishError{} impl std::error::Error for PublishError{}
pub struct Publisher{ pub struct Publisher{
subscriber:async_nats::Subscriber, messages:async_nats::jetstream::consumer::pull::Stream,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
maps_grpc:crate::MapsServiceClient, maps_grpc:crate::types::MapsServiceClient,
} }
impl Publisher{ impl Publisher{
pub async fn new( pub async fn new(
nats:async_nats::Client, nats:async_nats::jetstream::Context,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
maps_grpc:crate::MapsServiceClient, maps_grpc:crate::types::MapsServiceClient,
)->Result<Self,async_nats::SubscribeError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
subscriber:nats.subscribe("publish_new").await?, messages:nats.get_stream("submissions_publish_new").await.map_err(NatsStartupError::GetStream)?
.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{
durable_name:Some("pull".to_owned()),
..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)?
.messages().await.map_err(NatsStartupError::Stream)?,
roblox_cookie, roblox_cookie,
api, api,
maps_grpc, maps_grpc,
}) })
} }
pub async fn run(mut self){ pub async fn run(mut self){
while let Some(message)=self.subscriber.next().await{ while let Some(message_result)=self.messages.next().await{
self.publish_supress_error(message).await self.publish_supress_error(message_result).await
} }
} }
async fn publish_supress_error(&self,message:async_nats::Message){ async fn publish_supress_error(&self,message_result:MessageResult){
match self.publish(message).await{ match self.publish(message_result).await{
Ok(())=>println!("[PublishNew] Published, hooray!"), Ok(())=>println!("[PublishNew] Published, hooray!"),
Err(e)=>println!("[PublishNew] There was an error, oopsie! {e}"), Err(e)=>println!("[PublishNew] There was an error, oopsie! {e}"),
} }
} }
async fn publish(&self,message:async_nats::Message)->Result<(),PublishError>{ async fn publish(&self,message_result:MessageResult)->Result<(),PublishError>{
println!("publish_new {:?}",message); println!("publish_new {:?}",message_result);
let message=message_result.map_err(PublishError::Messages)?;
message.double_ack().await.map_err(PublishError::DoubleAck)?;
// decode json // decode json
let publish_info:PublishNewRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?; let publish_info:PublishNewRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?;

17
validation/src/types.rs Normal file
View File

@ -0,0 +1,17 @@
// annoying mile-long types
pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClient<tonic::transport::channel::Channel>;
pub type MessageResult=Result<async_nats::jetstream::Message,async_nats::jetstream::consumer::pull::MessagesError>;
#[derive(Debug)]
pub enum NatsStartupError{
GetStream(async_nats::jetstream::context::GetStreamError),
ConsumerCreateStrict(async_nats::jetstream::stream::ConsumerCreateStrictError),
Stream(async_nats::jetstream::consumer::StreamError),
}
impl std::fmt::Display for NatsStartupError{
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
write!(f,"{self:?}")
}
}
impl std::error::Error for NatsStartupError{}

View File

@ -1,4 +1,5 @@
use futures::{StreamExt,TryStreamExt}; use futures::{StreamExt,TryStreamExt};
use crate::types::{MessageResult,NatsStartupError};
use crate::nats_types::ValidateRequest; use crate::nats_types::ValidateRequest;
const SCRIPT_CONCURRENCY:usize=16; const SCRIPT_CONCURRENCY:usize=16;
@ -15,6 +16,8 @@ enum Policy{
enum ValidateError{ enum ValidateError{
Blocked, Blocked,
NotAllowed, NotAllowed,
Messages(async_nats::jetstream::consumer::pull::MessagesError),
DoubleAck(async_nats::Error),
Get(rbx_asset::cookie::GetError), Get(rbx_asset::cookie::GetError),
Json(serde_json::Error), Json(serde_json::Error),
ReadDom(ReadDomError), ReadDom(ReadDomError),
@ -34,36 +37,43 @@ impl std::fmt::Display for ValidateError{
impl std::error::Error for ValidateError{} impl std::error::Error for ValidateError{}
pub struct Validator{ pub struct Validator{
subscriber:async_nats::Subscriber, messages:async_nats::jetstream::consumer::pull::Stream,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
} }
impl Validator{ impl Validator{
pub async fn new( pub async fn new(
nats:async_nats::Client, nats:async_nats::jetstream::Context,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
)->Result<Self,async_nats::SubscribeError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
subscriber:nats.subscribe("validate").await?, messages:nats.get_stream("submissions_validate").await.map_err(NatsStartupError::GetStream)?
.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{
durable_name:Some("pull".to_owned()),
..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)?
.messages().await.map_err(NatsStartupError::Stream)?,
roblox_cookie, roblox_cookie,
api, api,
}) })
} }
pub async fn run(mut self){ pub async fn run(mut self){
while let Some(message)=self.subscriber.next().await{ while let Some(message_result)=self.messages.next().await{
self.validate_supress_error(message).await self.validate_supress_error(message_result).await
} }
} }
async fn validate_supress_error(&self,message:async_nats::Message){ async fn validate_supress_error(&self,message_result:MessageResult){
match self.validate(message).await{ match self.validate(message_result).await{
Ok(())=>println!("Validated, hooray!"), Ok(())=>println!("[Validation] Validated, hooray!"),
Err(e)=>println!("There was an error, oopsie! {e}"), Err(e)=>println!("[Validation] There was an error, oopsie! {e}"),
} }
} }
async fn validate(&self,message:async_nats::Message)->Result<(),ValidateError>{ async fn validate(&self,message_result:MessageResult)->Result<(),ValidateError>{
println!("validate {:?}",message); println!("validate {:?}",message_result);
let message=message_result.map_err(ValidateError::Messages)?;
message.double_ack().await.map_err(ValidateError::DoubleAck)?;
// decode json // decode json
let validate_info:ValidateRequest=serde_json::from_slice(&message.payload).map_err(ValidateError::Json)?; let validate_info:ValidateRequest=serde_json::from_slice(&message.payload).map_err(ValidateError::Json)?;