diff --git a/pkg/cmds/serve.go b/pkg/cmds/serve.go index 2713f1a..5003cc6 100644 --- a/pkg/cmds/serve.go +++ b/pkg/cmds/serve.go @@ -95,6 +95,7 @@ func serve(ctx *cli.Context) error { _, err = js.AddStream(&nats.StreamConfig{ Name:"maptest", Subjects: []string{"maptest.>"}, + Retention: nats.WorkQueuePolicy, }) if err != nil { log.WithError(err).Fatal("failed to add stream") diff --git a/validation/src/publish_fix.rs b/validation/src/publish_fix.rs index e89c449..a6dca00 100644 --- a/validation/src/publish_fix.rs +++ b/validation/src/publish_fix.rs @@ -31,10 +31,12 @@ impl Publisher{ api:api::Context, )->Result{ Ok(Self{ - messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + messages:stream.get_or_create_consumer("publish_fix",async_nats::jetstream::consumer::pull::Config{ + name:Some("publish_fix".to_owned()), + durable_name:Some("publish_fix".to_owned()), filter_subject:"maptest.submissions.publish.fix".to_owned(), ..Default::default() - }).await.map_err(NatsStartupError::ConsumerCreateStrict)? + }).await.map_err(NatsStartupError::Consumer)? .messages().await.map_err(NatsStartupError::Stream)?, roblox_cookie, api, diff --git a/validation/src/publish_new.rs b/validation/src/publish_new.rs index b0642a1..48ab774 100644 --- a/validation/src/publish_new.rs +++ b/validation/src/publish_new.rs @@ -35,10 +35,12 @@ impl Publisher{ maps_grpc:crate::types::MapsServiceClient, )->Result{ Ok(Self{ - messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + messages:stream.get_or_create_consumer("publish_new",async_nats::jetstream::consumer::pull::Config{ + name:Some("publish_new".to_owned()), + durable_name:Some("publish_new".to_owned()), filter_subject:"maptest.submissions.publish.new".to_owned(), ..Default::default() - }).await.map_err(NatsStartupError::ConsumerCreateStrict)? + }).await.map_err(NatsStartupError::Consumer)? .messages().await.map_err(NatsStartupError::Stream)?, roblox_cookie, api, diff --git a/validation/src/types.rs b/validation/src/types.rs index 7403331..e41f9b4 100644 --- a/validation/src/types.rs +++ b/validation/src/types.rs @@ -5,7 +5,7 @@ pub type MessageResult=ResultResult{ Ok(Self{ - messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ + messages:stream.get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{ + name:Some("validation".to_owned()), + durable_name:Some("validation".to_owned()), + ack_policy:async_nats::jetstream::consumer::AckPolicy::Explicit, filter_subject:"maptest.submissions.validate".to_owned(), ..Default::default() - }).await.map_err(NatsStartupError::ConsumerCreateStrict)? + }).await.map_err(NatsStartupError::Consumer)? .messages().await.map_err(NatsStartupError::Stream)?, roblox_cookie, api,