From c7f3cdc0d606ee32fda862b7fc5636ffe585f19c Mon Sep 17 00:00:00 2001 From: Quaternions Date: Mon, 2 Dec 2024 22:09:01 -0800 Subject: [PATCH] the natsy --- validation/src/main.rs | 22 ++++++++++++++++++++-- validation/src/publisher.rs | 25 +++++++++++++++++++++++++ validation/src/validator.rs | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 validation/src/publisher.rs diff --git a/validation/src/main.rs b/validation/src/main.rs index fcd33d8..8bf2d57 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -1,5 +1,23 @@ +mod publisher; mod validator; -#[tokio::main] -async fn main(){ +#[allow(dead_code)] +#[derive(Debug)] +enum StartupError{ + Connect(async_nats::ConnectError), + Subscribe(async_nats::SubscribeError), +} + +#[tokio::main] +async fn main()->Result<(),StartupError>{ + let nasty=async_nats::connect("nats").await.map_err(StartupError::Connect)?; + let (publisher,validator)=tokio::try_join!( + publisher::Publisher::new(nasty.clone()), + validator::Validator::new(nasty) + ).map_err(StartupError::Subscribe)?; + // publisher thread + tokio::spawn(publisher.run()); + // run validator on the main thread indefinitely + validator.run().await; + Ok(()) } diff --git a/validation/src/publisher.rs b/validation/src/publisher.rs new file mode 100644 index 0000000..4cc1135 --- /dev/null +++ b/validation/src/publisher.rs @@ -0,0 +1,25 @@ +use futures::StreamExt; + +enum PublishError{ +} + +pub struct Publisher{ + nats:async_nats::Client, + subscriber:async_nats::Subscriber, +} +impl Publisher{ + pub async fn new(nats:async_nats::Client)->Result{ + Ok(Self{ + subscriber:nats.subscribe("publish").await?, + nats, + }) + } + pub async fn run(mut self){ + while let Some(message)=self.subscriber.next().await{ + self.publish(message).await + } + } + async fn publish(&self,message:async_nats::Message){ + println!("publish {:?}",message); + } +} diff --git a/validation/src/validator.rs b/validation/src/validator.rs index e69de29..339d386 100644 --- a/validation/src/validator.rs +++ b/validation/src/validator.rs @@ -0,0 +1,37 @@ +use futures::StreamExt; +use rbx_dom_weak::WeakDom; + +enum Valid{ + Untouched(WeakDom), + Modified(WeakDom), +} + +enum ValidateError{ +} + + +pub struct Validator{ + nats:async_nats::Client, + subscriber:async_nats::Subscriber, +} +impl Validator{ + pub async fn new(nats:async_nats::Client)->Result{ + Ok(Self{ + subscriber:nats.subscribe("validate").await?, + nats, + }) + } + pub async fn run(mut self){ + while let Some(message)=self.subscriber.next().await{ + self.validate(message).await + } + } + async fn validate(&self,message:async_nats::Message){ + println!("validate {:?}",message); + // download map + // validate map + // validate(dom) + // reply with validity + // Ok(Valid::Untouched(dom)) + } +}