From eea57786fffa018a8c08e34888b21b3cfda551df Mon Sep 17 00:00:00 2001 From: Quaternions Date: Wed, 11 Dec 2024 21:07:51 -0800 Subject: [PATCH] nats: use subjects --- pkg/cmds/serve.go | 18 +++++++----------- pkg/service/submissions.go | 6 +++--- validation/src/main.rs | 13 +++++++------ validation/src/publish_fix.rs | 6 +++--- validation/src/publish_new.rs | 6 +++--- validation/src/types.rs | 1 - validation/src/validator.rs | 6 +++--- 7 files changed, 26 insertions(+), 30 deletions(-) diff --git a/pkg/cmds/serve.go b/pkg/cmds/serve.go index 108c270..2713f1a 100644 --- a/pkg/cmds/serve.go +++ b/pkg/cmds/serve.go @@ -91,19 +91,15 @@ func serve(ctx *cli.Context) error { if err != nil { log.WithError(err).Fatal("failed to start jetstream") } - // this should be somewhere else but whatever - _, err = js.AddStream(&nats.StreamConfig{Name:"submissions_validate"}) + + _, err = js.AddStream(&nats.StreamConfig{ + Name:"maptest", + Subjects: []string{"maptest.>"}, + }) if err != nil { - log.WithError(err).Fatal("failed to add stream submissions_validate") - } - _, err = js.AddStream(&nats.StreamConfig{Name:"submissions_publish_new"}) - if err != nil { - log.WithError(err).Fatal("failed to add stream submissions_publish_new") - } - _, err = js.AddStream(&nats.StreamConfig{Name:"submissions_publish_fix"}) - if err != nil { - log.WithError(err).Fatal("failed to add stream submissions_publish_fix") + log.WithError(err).Fatal("failed to add stream") } + svc := &service.Service{ DB: db, Nats: js, diff --git a/pkg/service/submissions.go b/pkg/service/submissions.go index 40fa5f2..ac529d6 100644 --- a/pkg/service/submissions.go +++ b/pkg/service/submissions.go @@ -309,7 +309,7 @@ func (svc *Service) ActionSubmissionTriggerPublish(ctx context.Context, params a return err } - svc.Nats.Publish("submissions_publish_new", []byte(j)) + svc.Nats.Publish("maptest.submissions.publish.new", []byte(j)) }else{ // this is a map fix publish_fix_request := model.PublishFixRequest{ @@ -324,7 +324,7 @@ func (svc *Service) ActionSubmissionTriggerPublish(ctx context.Context, params a return err } - svc.Nats.Publish("submissions_publish_fix", []byte(j)) + svc.Nats.Publish("maptest.submissions.publish.fix", []byte(j)) } return nil @@ -365,7 +365,7 @@ func (svc *Service) ActionSubmissionTriggerValidate(ctx context.Context, params return err } - svc.Nats.Publish("submissions_validate", []byte(j)) + svc.Nats.Publish("maptest.submissions.validate", []byte(j)) return nil } diff --git a/validation/src/main.rs b/validation/src/main.rs index bf3461f..137f1ae 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -9,6 +9,7 @@ mod publish_fix; pub enum StartupError{ API(api::ReqwestError), NatsConnect(async_nats::ConnectError), + NatsGetStream(async_nats::jetstream::context::GetStreamError), NatsStartup(types::NatsStartupError), GRPCConnect(tonic::transport::Error), } @@ -33,21 +34,21 @@ async fn main()->Result<(),StartupError>{ // nats let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required"); - let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; + let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; // use nats jetstream + let stream=async_nats::jetstream::new(nasty) + .get_stream("maptest").await.map_err(StartupError::NatsGetStream)?; // data-service grpc for creating map entries let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required"); let maps_grpc=crate::types::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; - // use nats jetstream - let jetstream=async_nats::jetstream::new(nasty); // connect to nats let (publish_new,publish_fix,validator)=tokio::try_join!( - publish_new::Publisher::new(jetstream.clone(),cookie_context.clone(),api.clone(),maps_grpc), - publish_fix::Publisher::new(jetstream.clone(),cookie_context.clone(),api.clone()), + publish_new::Publisher::new(stream.clone(),cookie_context.clone(),api.clone(),maps_grpc), + publish_fix::Publisher::new(stream.clone(),cookie_context.clone(),api.clone()), // clone nats here because it's dropped within the function scope, // meanining the last reference is dropped... - validator::Validator::new(jetstream.clone(),cookie_context,api) + validator::Validator::new(stream.clone(),cookie_context,api) ).map_err(StartupError::NatsStartup)?; // publisher threads diff --git a/validation/src/publish_fix.rs b/validation/src/publish_fix.rs index c119942..e89c449 100644 --- a/validation/src/publish_fix.rs +++ b/validation/src/publish_fix.rs @@ -26,13 +26,13 @@ pub struct Publisher{ } impl Publisher{ pub async fn new( - nats:async_nats::jetstream::Context, + stream:async_nats::jetstream::stream::Stream, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, )->Result{ Ok(Self{ - messages:nats.get_stream("submissions_publish_new").await.map_err(NatsStartupError::GetStream)? - .create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + filter_subject:"maptest.submissions.publish.fix".to_owned(), ..Default::default() }).await.map_err(NatsStartupError::ConsumerCreateStrict)? .messages().await.map_err(NatsStartupError::Stream)?, diff --git a/validation/src/publish_new.rs b/validation/src/publish_new.rs index 1a81c62..b0642a1 100644 --- a/validation/src/publish_new.rs +++ b/validation/src/publish_new.rs @@ -29,14 +29,14 @@ pub struct Publisher{ } impl Publisher{ pub async fn new( - nats:async_nats::jetstream::Context, + stream:async_nats::jetstream::stream::Stream, roblox_cookie:rbx_asset::cookie::CookieContext, api:api::Context, maps_grpc:crate::types::MapsServiceClient, )->Result{ Ok(Self{ - messages:nats.get_stream("submissions_publish_new").await.map_err(NatsStartupError::GetStream)? - .create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + filter_subject:"maptest.submissions.publish.new".to_owned(), ..Default::default() }).await.map_err(NatsStartupError::ConsumerCreateStrict)? .messages().await.map_err(NatsStartupError::Stream)?, diff --git a/validation/src/types.rs b/validation/src/types.rs index 61016b1..7403331 100644 --- a/validation/src/types.rs +++ b/validation/src/types.rs @@ -5,7 +5,6 @@ pub type MessageResult=ResultResult{ Ok(Self{ - messages:nats.get_stream("submissions_validate").await.map_err(NatsStartupError::GetStream)? - .create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + filter_subject:"maptest.submissions.validate".to_owned(), ..Default::default() }).await.map_err(NatsStartupError::ConsumerCreateStrict)? .messages().await.map_err(NatsStartupError::Stream)?,