From 2aacb4c87c33c0a5200437f844078468b4e8f75a Mon Sep 17 00:00:00 2001 From: Quaternions Date: Fri, 6 Dec 2024 20:31:28 -0800 Subject: [PATCH] split publishing for new maps and map fixes --- validation/src/main.rs | 13 ++++--- .../src/{publisher.rs => publish_fix.rs} | 2 +- validation/src/publish_new.rs | 35 +++++++++++++++++++ 3 files changed, 44 insertions(+), 6 deletions(-) rename validation/src/{publisher.rs => publish_fix.rs} (93%) create mode 100644 validation/src/publish_new.rs diff --git a/validation/src/main.rs b/validation/src/main.rs index cc906e6..097207c 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -1,6 +1,7 @@ mod nats_types; -mod publisher; mod validator; +mod publish_new; +mod publish_fix; #[allow(dead_code)] #[derive(Debug)] @@ -28,15 +29,17 @@ async fn main()->Result<(),StartupError>{ let nasty=async_nats::connect("nats").await.map_err(StartupError::Connect)?; // connect to nats - let (publisher,validator)=tokio::try_join!( - publisher::Publisher::new(nasty.clone(),cookie_context.clone()), + let (publish_new,publish_fix,validator)=tokio::try_join!( + publish_new::Publisher::new(nasty.clone(),cookie_context.clone()), + publish_fix::Publisher::new(nasty.clone(),cookie_context.clone()), // clone nats here because it's dropped within the function scope, // meanining the last reference is dropped... validator::Validator::new(nasty.clone(),cookie_context,api) ).map_err(StartupError::Subscribe)?; - // publisher thread - tokio::spawn(publisher.run()); + // publisher threads + tokio::spawn(publish_new.run()); + tokio::spawn(publish_fix.run()); // run validator on the main thread indefinitely validator.run().await; diff --git a/validation/src/publisher.rs b/validation/src/publish_fix.rs similarity index 93% rename from validation/src/publisher.rs rename to validation/src/publish_fix.rs index 2f9f5b9..31423fd 100644 --- a/validation/src/publisher.rs +++ b/validation/src/publish_fix.rs @@ -20,7 +20,7 @@ impl Publisher{ roblox_cookie:rbx_asset::cookie::CookieContext, )->Result{ Ok(Self{ - subscriber:nats.subscribe("publish").await?, + subscriber:nats.subscribe("publish_fix").await?, roblox_cookie, }) } diff --git a/validation/src/publish_new.rs b/validation/src/publish_new.rs new file mode 100644 index 0000000..f9c0530 --- /dev/null +++ b/validation/src/publish_new.rs @@ -0,0 +1,35 @@ +use futures::StreamExt; + +#[derive(Debug)] +enum PublishError{ +} +impl std::fmt::Display for PublishError{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for PublishError{} + +pub struct Publisher{ + subscriber:async_nats::Subscriber, + roblox_cookie:rbx_asset::cookie::CookieContext, +} +impl Publisher{ + pub async fn new( + nats:async_nats::Client, + roblox_cookie:rbx_asset::cookie::CookieContext, + )->Result{ + Ok(Self{ + subscriber:nats.subscribe("publish_new").await?, + roblox_cookie, + }) + } + pub async fn run(mut self){ + while let Some(message)=self.subscriber.next().await{ + self.publish(message).await + } + } + async fn publish(&self,message:async_nats::Message){ + println!("publish {:?}",message); + } +}