validation: detect nats filter_subject mismatch and update consumer
This commit is contained in:
parent
ec15c1f2e5
commit
66890ccd44
@ -20,6 +20,7 @@ pub enum StartupError{
|
||||
NatsConnect(async_nats::ConnectError),
|
||||
NatsGetStream(async_nats::jetstream::context::GetStreamError),
|
||||
NatsConsumer(async_nats::jetstream::stream::ConsumerError),
|
||||
NatsConsumerUpdate(async_nats::jetstream::stream::ConsumerUpdateError),
|
||||
NatsStream(async_nats::jetstream::consumer::StreamError),
|
||||
}
|
||||
impl std::fmt::Display for StartupError{
|
||||
@ -52,17 +53,32 @@ async fn main()->Result<(),StartupError>{
|
||||
// nats
|
||||
let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required");
|
||||
let nats_fut=async{
|
||||
const STREAM_NAME:&str="maptest";
|
||||
const DURABLE_NAME:&str="validation";
|
||||
const FILTER_SUBJECT:&str="maptest.>";
|
||||
|
||||
let nats_config=async_nats::jetstream::consumer::pull::Config{
|
||||
name:Some(DURABLE_NAME.to_owned()),
|
||||
durable_name:Some(DURABLE_NAME.to_owned()),
|
||||
filter_subject:FILTER_SUBJECT.to_owned(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?;
|
||||
|
||||
// use nats jetstream
|
||||
async_nats::jetstream::new(nasty)
|
||||
.get_stream("maptest").await.map_err(StartupError::NatsGetStream)?
|
||||
.get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{
|
||||
name:Some("validation".to_owned()),
|
||||
durable_name:Some("validation".to_owned()),
|
||||
filter_subject:"maptest.>".to_owned(),
|
||||
..Default::default()
|
||||
}).await.map_err(StartupError::NatsConsumer)?
|
||||
.messages().await.map_err(StartupError::NatsStream)
|
||||
let stream=async_nats::jetstream::new(nasty)
|
||||
.get_stream(STREAM_NAME).await.map_err(StartupError::NatsGetStream)?;
|
||||
|
||||
let consumer=stream.get_or_create_consumer(DURABLE_NAME,nats_config.clone()).await.map_err(StartupError::NatsConsumer)?;
|
||||
|
||||
// check if config matches expected config
|
||||
if consumer.cached_info().config.filter_subject!=FILTER_SUBJECT{
|
||||
stream.update_consumer(nats_config).await.map_err(StartupError::NatsConsumerUpdate)?;
|
||||
}
|
||||
|
||||
// only need messages
|
||||
consumer.messages().await.map_err(StartupError::NatsStream)
|
||||
};
|
||||
|
||||
let message_handler=message_handler::MessageHandler::new(cookie_context,group_id,api);
|
||||
|
Loading…
x
Reference in New Issue
Block a user