validation: detect filter_subject mismatch and update consumer
This commit is contained in:
parent
ec15c1f2e5
commit
2516854d6f
@ -20,6 +20,7 @@ pub enum StartupError{
|
|||||||
NatsConnect(async_nats::ConnectError),
|
NatsConnect(async_nats::ConnectError),
|
||||||
NatsGetStream(async_nats::jetstream::context::GetStreamError),
|
NatsGetStream(async_nats::jetstream::context::GetStreamError),
|
||||||
NatsConsumer(async_nats::jetstream::stream::ConsumerError),
|
NatsConsumer(async_nats::jetstream::stream::ConsumerError),
|
||||||
|
NatsConsumerUpdate(async_nats::jetstream::stream::ConsumerUpdateError),
|
||||||
NatsStream(async_nats::jetstream::consumer::StreamError),
|
NatsStream(async_nats::jetstream::consumer::StreamError),
|
||||||
}
|
}
|
||||||
impl std::fmt::Display for StartupError{
|
impl std::fmt::Display for StartupError{
|
||||||
@ -52,17 +53,32 @@ async fn main()->Result<(),StartupError>{
|
|||||||
// nats
|
// nats
|
||||||
let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required");
|
let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required");
|
||||||
let nats_fut=async{
|
let nats_fut=async{
|
||||||
|
const STREAM_NAME:&str="maptest";
|
||||||
|
const DURABLE_NAME:&str="validation";
|
||||||
|
const FILTER_SUBJECT:&str="maptest.>";
|
||||||
|
|
||||||
|
let nats_config=async_nats::jetstream::consumer::pull::Config{
|
||||||
|
name:Some(DURABLE_NAME.to_owned()),
|
||||||
|
durable_name:Some(DURABLE_NAME.to_owned()),
|
||||||
|
filter_subject:FILTER_SUBJECT.to_owned(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?;
|
let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?;
|
||||||
|
|
||||||
// use nats jetstream
|
// use nats jetstream
|
||||||
async_nats::jetstream::new(nasty)
|
let stream=async_nats::jetstream::new(nasty)
|
||||||
.get_stream("maptest").await.map_err(StartupError::NatsGetStream)?
|
.get_stream(STREAM_NAME).await.map_err(StartupError::NatsGetStream)?;
|
||||||
.get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{
|
|
||||||
name:Some("validation".to_owned()),
|
let consumer=stream.get_or_create_consumer(DURABLE_NAME,nats_config.clone()).await.map_err(StartupError::NatsConsumer)?;
|
||||||
durable_name:Some("validation".to_owned()),
|
|
||||||
filter_subject:"maptest.>".to_owned(),
|
// check if config matches expected config
|
||||||
..Default::default()
|
if consumer.cached_info().config.filter_subject!=FILTER_SUBJECT{
|
||||||
}).await.map_err(StartupError::NatsConsumer)?
|
stream.update_consumer(nats_config).await.map_err(StartupError::NatsConsumerUpdate)?;
|
||||||
.messages().await.map_err(StartupError::NatsStream)
|
}
|
||||||
|
|
||||||
|
// only need messages
|
||||||
|
consumer.messages().await.map_err(StartupError::NatsStream)
|
||||||
};
|
};
|
||||||
|
|
||||||
let message_handler=message_handler::MessageHandler::new(cookie_context,group_id,api);
|
let message_handler=message_handler::MessageHandler::new(cookie_context,group_id,api);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user