diff --git a/validation/src/main.rs b/validation/src/main.rs index 30b443d..5e59e43 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -20,6 +20,7 @@ pub enum StartupError{ NatsConnect(async_nats::ConnectError), NatsGetStream(async_nats::jetstream::context::GetStreamError), NatsConsumer(async_nats::jetstream::stream::ConsumerError), + NatsConsumerUpdate(async_nats::jetstream::stream::ConsumerUpdateError), NatsStream(async_nats::jetstream::consumer::StreamError), } impl std::fmt::Display for StartupError{ @@ -52,17 +53,32 @@ async fn main()->Result<(),StartupError>{ // nats let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required"); let nats_fut=async{ + const STREAM_NAME:&str="maptest"; + const DURABLE_NAME:&str="validation"; + const FILTER_SUBJECT:&str="maptest.>"; + + let nats_config=async_nats::jetstream::consumer::pull::Config{ + name:Some(DURABLE_NAME.to_owned()), + durable_name:Some(DURABLE_NAME.to_owned()), + filter_subject:FILTER_SUBJECT.to_owned(), + ..Default::default() + }; + let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; + // use nats jetstream - async_nats::jetstream::new(nasty) - .get_stream("maptest").await.map_err(StartupError::NatsGetStream)? - .get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{ - name:Some("validation".to_owned()), - durable_name:Some("validation".to_owned()), - filter_subject:"maptest.>".to_owned(), - ..Default::default() - }).await.map_err(StartupError::NatsConsumer)? - .messages().await.map_err(StartupError::NatsStream) + let stream=async_nats::jetstream::new(nasty) + .get_stream(STREAM_NAME).await.map_err(StartupError::NatsGetStream)?; + + let consumer=stream.get_or_create_consumer(DURABLE_NAME,nats_config.clone()).await.map_err(StartupError::NatsConsumer)?; + + // check if config matches expected config + if consumer.cached_info().config.filter_subject!=FILTER_SUBJECT{ + stream.update_consumer(nats_config).await.map_err(StartupError::NatsConsumerUpdate)?; + } + + // only need messages + consumer.messages().await.map_err(StartupError::NatsStream) }; let message_handler=message_handler::MessageHandler::new(cookie_context,group_id,api);