nats: work queue policy

This commit is contained in:
Quaternions 2024-12-11 22:58:48 -08:00
parent eea57786ff
commit 95e9b4d8b3
5 changed files with 15 additions and 7 deletions

View File

@ -95,6 +95,7 @@ func serve(ctx *cli.Context) error {
_, err = js.AddStream(&nats.StreamConfig{ _, err = js.AddStream(&nats.StreamConfig{
Name:"maptest", Name:"maptest",
Subjects: []string{"maptest.>"}, Subjects: []string{"maptest.>"},
Retention: nats.WorkQueuePolicy,
}) })
if err != nil { if err != nil {
log.WithError(err).Fatal("failed to add stream") log.WithError(err).Fatal("failed to add stream")

View File

@ -31,10 +31,12 @@ impl Publisher{
api:api::Context, api:api::Context,
)->Result<Self,NatsStartupError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ messages:stream.get_or_create_consumer("publish_fix",async_nats::jetstream::consumer::pull::Config{
name:Some("publish_fix".to_owned()),
durable_name:Some("publish_fix".to_owned()),
filter_subject:"maptest.submissions.publish.fix".to_owned(), filter_subject:"maptest.submissions.publish.fix".to_owned(),
..Default::default() ..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)? }).await.map_err(NatsStartupError::Consumer)?
.messages().await.map_err(NatsStartupError::Stream)?, .messages().await.map_err(NatsStartupError::Stream)?,
roblox_cookie, roblox_cookie,
api, api,

View File

@ -35,10 +35,12 @@ impl Publisher{
maps_grpc:crate::types::MapsServiceClient, maps_grpc:crate::types::MapsServiceClient,
)->Result<Self,NatsStartupError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ messages:stream.get_or_create_consumer("publish_new",async_nats::jetstream::consumer::pull::Config{
name:Some("publish_new".to_owned()),
durable_name:Some("publish_new".to_owned()),
filter_subject:"maptest.submissions.publish.new".to_owned(), filter_subject:"maptest.submissions.publish.new".to_owned(),
..Default::default() ..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)? }).await.map_err(NatsStartupError::Consumer)?
.messages().await.map_err(NatsStartupError::Stream)?, .messages().await.map_err(NatsStartupError::Stream)?,
roblox_cookie, roblox_cookie,
api, api,

View File

@ -5,7 +5,7 @@ pub type MessageResult=Result<async_nats::jetstream::Message,async_nats::jetstre
#[derive(Debug)] #[derive(Debug)]
pub enum NatsStartupError{ pub enum NatsStartupError{
ConsumerCreateStrict(async_nats::jetstream::stream::ConsumerCreateStrictError), Consumer(async_nats::jetstream::stream::ConsumerError),
Stream(async_nats::jetstream::consumer::StreamError), Stream(async_nats::jetstream::consumer::StreamError),
} }
impl std::fmt::Display for NatsStartupError{ impl std::fmt::Display for NatsStartupError{

View File

@ -49,10 +49,13 @@ impl Validator{
api:api::Context, api:api::Context,
)->Result<Self,NatsStartupError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ messages:stream.get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{
name:Some("validation".to_owned()),
durable_name:Some("validation".to_owned()),
ack_policy:async_nats::jetstream::consumer::AckPolicy::Explicit,
filter_subject:"maptest.submissions.validate".to_owned(), filter_subject:"maptest.submissions.validate".to_owned(),
..Default::default() ..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)? }).await.map_err(NatsStartupError::Consumer)?
.messages().await.map_err(NatsStartupError::Stream)?, .messages().await.map_err(NatsStartupError::Stream)?,
roblox_cookie, roblox_cookie,
api, api,