validation: refactor to use a single consumer
This commit is contained in:
parent
1f96b5facb
commit
8250686477
@ -1,8 +1,10 @@
|
|||||||
mod types;
|
use futures::StreamExt;
|
||||||
|
|
||||||
mod nats_types;
|
mod nats_types;
|
||||||
mod validator;
|
mod validator;
|
||||||
mod publish_new;
|
mod publish_new;
|
||||||
mod publish_fix;
|
mod publish_fix;
|
||||||
|
mod message_handler;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -10,7 +12,8 @@ pub enum StartupError{
|
|||||||
API(api::ReqwestError),
|
API(api::ReqwestError),
|
||||||
NatsConnect(async_nats::ConnectError),
|
NatsConnect(async_nats::ConnectError),
|
||||||
NatsGetStream(async_nats::jetstream::context::GetStreamError),
|
NatsGetStream(async_nats::jetstream::context::GetStreamError),
|
||||||
NatsStartup(types::NatsStartupError),
|
NatsConsumer(async_nats::jetstream::stream::ConsumerError),
|
||||||
|
NatsStream(async_nats::jetstream::consumer::StreamError),
|
||||||
GRPCConnect(tonic::transport::Error),
|
GRPCConnect(tonic::transport::Error),
|
||||||
}
|
}
|
||||||
impl std::fmt::Display for StartupError{
|
impl std::fmt::Display for StartupError{
|
||||||
@ -20,6 +23,9 @@ impl std::fmt::Display for StartupError{
|
|||||||
}
|
}
|
||||||
impl std::error::Error for StartupError{}
|
impl std::error::Error for StartupError{}
|
||||||
|
|
||||||
|
// annoying mile-long type
|
||||||
|
pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClient<tonic::transport::channel::Channel>;
|
||||||
|
|
||||||
pub const GROUP_STRAFESNET:u64=6980477;
|
pub const GROUP_STRAFESNET:u64=6980477;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@ -35,27 +41,35 @@ async fn main()->Result<(),StartupError>{
|
|||||||
// nats
|
// nats
|
||||||
let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required");
|
let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required");
|
||||||
let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; // use nats jetstream
|
let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; // use nats jetstream
|
||||||
let stream=async_nats::jetstream::new(nasty)
|
let mut messages=async_nats::jetstream::new(nasty)
|
||||||
.get_stream("maptest").await.map_err(StartupError::NatsGetStream)?;
|
.get_stream("maptest").await.map_err(StartupError::NatsGetStream)?
|
||||||
|
.get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{
|
||||||
|
name:Some("validation".to_owned()),
|
||||||
|
durable_name:Some("validation".to_owned()),
|
||||||
|
filter_subject:"maptest.submissions.>".to_owned(),
|
||||||
|
..Default::default()
|
||||||
|
}).await.map_err(StartupError::NatsConsumer)?
|
||||||
|
.messages().await.map_err(StartupError::NatsStream)?;
|
||||||
|
|
||||||
// data-service grpc for creating map entries
|
// data-service grpc for creating map entries
|
||||||
let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required");
|
let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required");
|
||||||
let maps_grpc=crate::types::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?;
|
let maps_grpc=crate::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?;
|
||||||
|
|
||||||
// Create a signal listener for SIGTERM
|
// Create a signal listener for SIGTERM
|
||||||
let mut sig_term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).expect("Failed to create SIGTERM signal listener");
|
let mut sig_term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).expect("Failed to create SIGTERM signal listener");
|
||||||
|
|
||||||
// connect to nats
|
// the guy
|
||||||
let (publish_new,publish_fix,validator)=tokio::try_join!(
|
let message_handler=message_handler::MessageHandler::new(cookie_context,api,maps_grpc);
|
||||||
publish_new::Publisher::new(stream.clone(),cookie_context.clone(),api.clone(),maps_grpc),
|
|
||||||
publish_fix::Publisher::new(stream.clone(),cookie_context.clone(),api.clone()),
|
|
||||||
validator::Validator::new(stream,cookie_context,api)
|
|
||||||
).map_err(StartupError::NatsStartup)?;
|
|
||||||
|
|
||||||
// nats consumer threads
|
// nats consumer thread
|
||||||
tokio::spawn(publish_new.run());
|
tokio::spawn(async move{
|
||||||
tokio::spawn(publish_fix.run());
|
while let Some(message_result)=messages.next().await{
|
||||||
tokio::spawn(validator.run());
|
match message_handler.handle_message_result(message_result).await{
|
||||||
|
Ok(())=>println!("[Validation] Success, hooray!"),
|
||||||
|
Err(e)=>println!("[Validation] There was an error, oopsie! {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
sig_term.recv().await;
|
sig_term.recv().await;
|
||||||
|
|
||||||
|
48
validation/src/message_handler.rs
Normal file
48
validation/src/message_handler.rs
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
#[allow(dead_code)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum HandleMessageError{
|
||||||
|
Messages(async_nats::jetstream::consumer::pull::MessagesError),
|
||||||
|
DoubleAck(async_nats::Error),
|
||||||
|
UnknownSubject(String),
|
||||||
|
PublishNew(crate::publish_new::PublishError),
|
||||||
|
PublishFix(crate::publish_fix::PublishError),
|
||||||
|
Validation(crate::validator::ValidateError),
|
||||||
|
}
|
||||||
|
impl std::fmt::Display for HandleMessageError{
|
||||||
|
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
|
||||||
|
write!(f,"{self:?}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl std::error::Error for HandleMessageError{}
|
||||||
|
|
||||||
|
pub type MessageResult=Result<async_nats::jetstream::Message,async_nats::jetstream::consumer::pull::MessagesError>;
|
||||||
|
|
||||||
|
pub struct MessageHandler{
|
||||||
|
publish_new:crate::publish_new::Publisher,
|
||||||
|
publish_fix:crate::publish_fix::Publisher,
|
||||||
|
validator:crate::validator::Validator,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageHandler{
|
||||||
|
pub fn new(
|
||||||
|
cookie_context:rbx_asset::cookie::CookieContext,
|
||||||
|
api:api::Context,
|
||||||
|
maps_grpc:crate::MapsServiceClient,
|
||||||
|
)->Self{
|
||||||
|
Self{
|
||||||
|
publish_new:crate::publish_new::Publisher::new(cookie_context.clone(),api.clone(),maps_grpc),
|
||||||
|
publish_fix:crate::publish_fix::Publisher::new(cookie_context.clone(),api.clone()),
|
||||||
|
validator:crate::validator::Validator::new(cookie_context,api),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn handle_message_result(&self,message_result:MessageResult)->Result<(),HandleMessageError>{
|
||||||
|
let message=message_result.map_err(HandleMessageError::Messages)?;
|
||||||
|
message.double_ack().await.map_err(HandleMessageError::DoubleAck)?;
|
||||||
|
match message.subject.as_str(){
|
||||||
|
"maptest.submissions.publish.new"=>self.publish_new.publish(message).await.map_err(HandleMessageError::PublishNew),
|
||||||
|
"maptest.submissions.publish.fix"=>self.publish_fix.publish(message).await.map_err(HandleMessageError::PublishFix),
|
||||||
|
"maptest.submissions.validate"=>self.validator.validate(message).await.map_err(HandleMessageError::Validation),
|
||||||
|
other=>Err(HandleMessageError::UnknownSubject(other.to_owned()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,12 +1,8 @@
|
|||||||
use futures::StreamExt;
|
|
||||||
use crate::types::{MessageResult,NatsStartupError};
|
|
||||||
use crate::nats_types::PublishFixRequest;
|
use crate::nats_types::PublishFixRequest;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum PublishError{
|
pub enum PublishError{
|
||||||
Messages(async_nats::jetstream::consumer::pull::MessagesError),
|
|
||||||
DoubleAck(async_nats::Error),
|
|
||||||
Get(rbx_asset::cookie::GetError),
|
Get(rbx_asset::cookie::GetError),
|
||||||
Json(serde_json::Error),
|
Json(serde_json::Error),
|
||||||
Upload(rbx_asset::cookie::UploadError),
|
Upload(rbx_asset::cookie::UploadError),
|
||||||
@ -20,43 +16,21 @@ impl std::fmt::Display for PublishError{
|
|||||||
impl std::error::Error for PublishError{}
|
impl std::error::Error for PublishError{}
|
||||||
|
|
||||||
pub struct Publisher{
|
pub struct Publisher{
|
||||||
messages:async_nats::jetstream::consumer::pull::Stream,
|
|
||||||
roblox_cookie:rbx_asset::cookie::CookieContext,
|
roblox_cookie:rbx_asset::cookie::CookieContext,
|
||||||
api:api::Context,
|
api:api::Context,
|
||||||
}
|
}
|
||||||
impl Publisher{
|
impl Publisher{
|
||||||
pub async fn new(
|
pub const fn new(
|
||||||
stream:async_nats::jetstream::stream::Stream,
|
|
||||||
roblox_cookie:rbx_asset::cookie::CookieContext,
|
roblox_cookie:rbx_asset::cookie::CookieContext,
|
||||||
api:api::Context,
|
api:api::Context,
|
||||||
)->Result<Self,NatsStartupError>{
|
)->Self{
|
||||||
Ok(Self{
|
Self{
|
||||||
messages:stream.get_or_create_consumer("publish_fix",async_nats::jetstream::consumer::pull::Config{
|
|
||||||
name:Some("publish_fix".to_owned()),
|
|
||||||
durable_name:Some("publish_fix".to_owned()),
|
|
||||||
filter_subject:"maptest.submissions.publish.fix".to_owned(),
|
|
||||||
..Default::default()
|
|
||||||
}).await.map_err(NatsStartupError::Consumer)?
|
|
||||||
.messages().await.map_err(NatsStartupError::Stream)?,
|
|
||||||
roblox_cookie,
|
roblox_cookie,
|
||||||
api,
|
api,
|
||||||
})
|
|
||||||
}
|
|
||||||
pub async fn run(mut self){
|
|
||||||
while let Some(message_result)=self.messages.next().await{
|
|
||||||
self.publish_supress_error(message_result).await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn publish_supress_error(&self,message_result:MessageResult){
|
pub async fn publish(&self,message:async_nats::jetstream::Message)->Result<(),PublishError>{
|
||||||
match self.publish(message_result).await{
|
println!("publish_fix {:?}",message);
|
||||||
Ok(())=>println!("[PublishFix] Published, hooray!"),
|
|
||||||
Err(e)=>println!("[PublishFix] There was an error, oopsie! {e}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
async fn publish(&self,message_result:MessageResult)->Result<(),PublishError>{
|
|
||||||
println!("publish_fix {:?}",message_result);
|
|
||||||
let message=message_result.map_err(PublishError::Messages)?;
|
|
||||||
message.double_ack().await.map_err(PublishError::DoubleAck)?;
|
|
||||||
// decode json
|
// decode json
|
||||||
let publish_info:PublishFixRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?;
|
let publish_info:PublishFixRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?;
|
||||||
|
|
||||||
|
@ -1,12 +1,8 @@
|
|||||||
use futures::StreamExt;
|
|
||||||
use crate::types::{MessageResult,NatsStartupError};
|
|
||||||
use crate::nats_types::PublishNewRequest;
|
use crate::nats_types::PublishNewRequest;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum PublishError{
|
pub enum PublishError{
|
||||||
Messages(async_nats::jetstream::consumer::pull::MessagesError),
|
|
||||||
DoubleAck(async_nats::Error),
|
|
||||||
Get(rbx_asset::cookie::GetError),
|
Get(rbx_asset::cookie::GetError),
|
||||||
Json(serde_json::Error),
|
Json(serde_json::Error),
|
||||||
Create(rbx_asset::cookie::CreateError),
|
Create(rbx_asset::cookie::CreateError),
|
||||||
@ -22,46 +18,24 @@ impl std::fmt::Display for PublishError{
|
|||||||
impl std::error::Error for PublishError{}
|
impl std::error::Error for PublishError{}
|
||||||
|
|
||||||
pub struct Publisher{
|
pub struct Publisher{
|
||||||
messages:async_nats::jetstream::consumer::pull::Stream,
|
|
||||||
roblox_cookie:rbx_asset::cookie::CookieContext,
|
roblox_cookie:rbx_asset::cookie::CookieContext,
|
||||||
api:api::Context,
|
api:api::Context,
|
||||||
maps_grpc:crate::types::MapsServiceClient,
|
maps_grpc:crate::MapsServiceClient,
|
||||||
}
|
}
|
||||||
impl Publisher{
|
impl Publisher{
|
||||||
pub async fn new(
|
pub const fn new(
|
||||||
stream:async_nats::jetstream::stream::Stream,
|
|
||||||
roblox_cookie:rbx_asset::cookie::CookieContext,
|
roblox_cookie:rbx_asset::cookie::CookieContext,
|
||||||
api:api::Context,
|
api:api::Context,
|
||||||
maps_grpc:crate::types::MapsServiceClient,
|
maps_grpc:crate::MapsServiceClient,
|
||||||
)->Result<Self,NatsStartupError>{
|
)->Self{
|
||||||
Ok(Self{
|
Self{
|
||||||
messages:stream.get_or_create_consumer("publish_new",async_nats::jetstream::consumer::pull::Config{
|
|
||||||
name:Some("publish_new".to_owned()),
|
|
||||||
durable_name:Some("publish_new".to_owned()),
|
|
||||||
filter_subject:"maptest.submissions.publish.new".to_owned(),
|
|
||||||
..Default::default()
|
|
||||||
}).await.map_err(NatsStartupError::Consumer)?
|
|
||||||
.messages().await.map_err(NatsStartupError::Stream)?,
|
|
||||||
roblox_cookie,
|
roblox_cookie,
|
||||||
api,
|
api,
|
||||||
maps_grpc,
|
maps_grpc,
|
||||||
})
|
|
||||||
}
|
|
||||||
pub async fn run(mut self){
|
|
||||||
while let Some(message_result)=self.messages.next().await{
|
|
||||||
self.publish_supress_error(message_result).await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn publish_supress_error(&self,message_result:MessageResult){
|
pub async fn publish(&self,message:async_nats::jetstream::Message)->Result<(),PublishError>{
|
||||||
match self.publish(message_result).await{
|
println!("publish_new {:?}",message);
|
||||||
Ok(())=>println!("[PublishNew] Published, hooray!"),
|
|
||||||
Err(e)=>println!("[PublishNew] There was an error, oopsie! {e}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
async fn publish(&self,message_result:MessageResult)->Result<(),PublishError>{
|
|
||||||
println!("publish_new {:?}",message_result);
|
|
||||||
let message=message_result.map_err(PublishError::Messages)?;
|
|
||||||
message.double_ack().await.map_err(PublishError::DoubleAck)?;
|
|
||||||
// decode json
|
// decode json
|
||||||
let publish_info:PublishNewRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?;
|
let publish_info:PublishNewRequest=serde_json::from_slice(&message.payload).map_err(PublishError::Json)?;
|
||||||
|
|
||||||
|
@ -1,16 +0,0 @@
|
|||||||
|
|
||||||
// annoying mile-long types
|
|
||||||
pub type MapsServiceClient=rust_grpc::maps::maps_service_client::MapsServiceClient<tonic::transport::channel::Channel>;
|
|
||||||
pub type MessageResult=Result<async_nats::jetstream::Message,async_nats::jetstream::consumer::pull::MessagesError>;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum NatsStartupError{
|
|
||||||
Consumer(async_nats::jetstream::stream::ConsumerError),
|
|
||||||
Stream(async_nats::jetstream::consumer::StreamError),
|
|
||||||
}
|
|
||||||
impl std::fmt::Display for NatsStartupError{
|
|
||||||
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
|
|
||||||
write!(f,"{self:?}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl std::error::Error for NatsStartupError{}
|
|
@ -1,5 +1,5 @@
|
|||||||
use futures::{StreamExt,TryStreamExt};
|
use futures::TryStreamExt;
|
||||||
use crate::types::{MessageResult,NatsStartupError};
|
|
||||||
use crate::nats_types::ValidateRequest;
|
use crate::nats_types::ValidateRequest;
|
||||||
|
|
||||||
const SCRIPT_CONCURRENCY:usize=16;
|
const SCRIPT_CONCURRENCY:usize=16;
|
||||||
@ -13,11 +13,9 @@ enum Policy{
|
|||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum ValidateError{
|
pub enum ValidateError{
|
||||||
Blocked,
|
Blocked,
|
||||||
NotAllowed,
|
NotAllowed,
|
||||||
Messages(async_nats::jetstream::consumer::pull::MessagesError),
|
|
||||||
DoubleAck(async_nats::Error),
|
|
||||||
Get(rbx_asset::cookie::GetError),
|
Get(rbx_asset::cookie::GetError),
|
||||||
Json(serde_json::Error),
|
Json(serde_json::Error),
|
||||||
ReadDom(ReadDomError),
|
ReadDom(ReadDomError),
|
||||||
@ -37,45 +35,22 @@ impl std::fmt::Display for ValidateError{
|
|||||||
impl std::error::Error for ValidateError{}
|
impl std::error::Error for ValidateError{}
|
||||||
|
|
||||||
pub struct Validator{
|
pub struct Validator{
|
||||||
messages:async_nats::jetstream::consumer::pull::Stream,
|
|
||||||
roblox_cookie:rbx_asset::cookie::CookieContext,
|
roblox_cookie:rbx_asset::cookie::CookieContext,
|
||||||
api:api::Context,
|
api:api::Context,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Validator{
|
impl Validator{
|
||||||
pub async fn new(
|
pub const fn new(
|
||||||
stream:async_nats::jetstream::stream::Stream,
|
|
||||||
roblox_cookie:rbx_asset::cookie::CookieContext,
|
roblox_cookie:rbx_asset::cookie::CookieContext,
|
||||||
api:api::Context,
|
api:api::Context,
|
||||||
)->Result<Self,NatsStartupError>{
|
)->Self{
|
||||||
Ok(Self{
|
Self{
|
||||||
messages:stream.get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{
|
|
||||||
name:Some("validation".to_owned()),
|
|
||||||
durable_name:Some("validation".to_owned()),
|
|
||||||
ack_policy:async_nats::jetstream::consumer::AckPolicy::Explicit,
|
|
||||||
filter_subject:"maptest.submissions.validate".to_owned(),
|
|
||||||
..Default::default()
|
|
||||||
}).await.map_err(NatsStartupError::Consumer)?
|
|
||||||
.messages().await.map_err(NatsStartupError::Stream)?,
|
|
||||||
roblox_cookie,
|
roblox_cookie,
|
||||||
api,
|
api,
|
||||||
})
|
|
||||||
}
|
|
||||||
pub async fn run(mut self){
|
|
||||||
while let Some(message_result)=self.messages.next().await{
|
|
||||||
self.validate_supress_error(message_result).await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn validate_supress_error(&self,message_result:MessageResult){
|
pub async fn validate(&self,message:async_nats::jetstream::Message)->Result<(),ValidateError>{
|
||||||
match self.validate(message_result).await{
|
println!("validate {:?}",message);
|
||||||
Ok(())=>println!("[Validation] Validated, hooray!"),
|
|
||||||
Err(e)=>println!("[Validation] There was an error, oopsie! {e}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
async fn validate(&self,message_result:MessageResult)->Result<(),ValidateError>{
|
|
||||||
println!("validate {:?}",message_result);
|
|
||||||
let message=message_result.map_err(ValidateError::Messages)?;
|
|
||||||
message.double_ack().await.map_err(ValidateError::DoubleAck)?;
|
|
||||||
// decode json
|
// decode json
|
||||||
let validate_info:ValidateRequest=serde_json::from_slice(&message.payload).map_err(ValidateError::Json)?;
|
let validate_info:ValidateRequest=serde_json::from_slice(&message.payload).map_err(ValidateError::Json)?;
|
||||||
|
|
||||||
@ -205,7 +180,7 @@ impl Validator{
|
|||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum ReadDomError{
|
pub enum ReadDomError{
|
||||||
Binary(rbx_binary::DecodeError),
|
Binary(rbx_binary::DecodeError),
|
||||||
Xml(rbx_xml::DecodeError),
|
Xml(rbx_xml::DecodeError),
|
||||||
Read(std::io::Error),
|
Read(std::io::Error),
|
||||||
|
Loading…
Reference in New Issue
Block a user