split publishing for new maps and map fixes

This commit is contained in:
Quaternions 2024-12-06 20:31:28 -08:00
parent f46df3a8eb
commit 2aacb4c87c
3 changed files with 44 additions and 6 deletions

View File

@ -1,6 +1,7 @@
mod nats_types; mod nats_types;
mod publisher;
mod validator; mod validator;
mod publish_new;
mod publish_fix;
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug)] #[derive(Debug)]
@ -28,15 +29,17 @@ async fn main()->Result<(),StartupError>{
let nasty=async_nats::connect("nats").await.map_err(StartupError::Connect)?; let nasty=async_nats::connect("nats").await.map_err(StartupError::Connect)?;
// connect to nats // connect to nats
let (publisher,validator)=tokio::try_join!( let (publish_new,publish_fix,validator)=tokio::try_join!(
publisher::Publisher::new(nasty.clone(),cookie_context.clone()), publish_new::Publisher::new(nasty.clone(),cookie_context.clone()),
publish_fix::Publisher::new(nasty.clone(),cookie_context.clone()),
// clone nats here because it's dropped within the function scope, // clone nats here because it's dropped within the function scope,
// meanining the last reference is dropped... // meanining the last reference is dropped...
validator::Validator::new(nasty.clone(),cookie_context,api) validator::Validator::new(nasty.clone(),cookie_context,api)
).map_err(StartupError::Subscribe)?; ).map_err(StartupError::Subscribe)?;
// publisher thread // publisher threads
tokio::spawn(publisher.run()); tokio::spawn(publish_new.run());
tokio::spawn(publish_fix.run());
// run validator on the main thread indefinitely // run validator on the main thread indefinitely
validator.run().await; validator.run().await;

View File

@ -20,7 +20,7 @@ impl Publisher{
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
)->Result<Self,async_nats::SubscribeError>{ )->Result<Self,async_nats::SubscribeError>{
Ok(Self{ Ok(Self{
subscriber:nats.subscribe("publish").await?, subscriber:nats.subscribe("publish_fix").await?,
roblox_cookie, roblox_cookie,
}) })
} }

View File

@ -0,0 +1,35 @@
use futures::StreamExt;
#[derive(Debug)]
enum PublishError{
}
impl std::fmt::Display for PublishError{
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
write!(f,"{self:?}")
}
}
impl std::error::Error for PublishError{}
pub struct Publisher{
subscriber:async_nats::Subscriber,
roblox_cookie:rbx_asset::cookie::CookieContext,
}
impl Publisher{
pub async fn new(
nats:async_nats::Client,
roblox_cookie:rbx_asset::cookie::CookieContext,
)->Result<Self,async_nats::SubscribeError>{
Ok(Self{
subscriber:nats.subscribe("publish_new").await?,
roblox_cookie,
})
}
pub async fn run(mut self){
while let Some(message)=self.subscriber.next().await{
self.publish(message).await
}
}
async fn publish(&self,message:async_nats::Message){
println!("publish {:?}",message);
}
}