From 727e358cf9a604b02a7a0ec73ef1ea39a9a236ae Mon Sep 17 00:00:00 2001 From: Quaternions Date: Thu, 12 Dec 2024 17:40:40 -0800 Subject: [PATCH] validation: connect to nats and grpc concurrently --- validation/src/main.rs | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/validation/src/main.rs b/validation/src/main.rs index fec0420..11dff45 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -40,26 +40,32 @@ async fn main()->Result<(),StartupError>{ // nats let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required"); - let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; // use nats jetstream - let mut messages=async_nats::jetstream::new(nasty) - .get_stream("maptest").await.map_err(StartupError::NatsGetStream)? - .get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{ - name:Some("validation".to_owned()), - durable_name:Some("validation".to_owned()), - filter_subject:"maptest.submissions.>".to_owned(), - ..Default::default() - }).await.map_err(StartupError::NatsConsumer)? - .messages().await.map_err(StartupError::NatsStream)?; + let nats_fut=async{ + let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; + // use nats jetstream + async_nats::jetstream::new(nasty) + .get_stream("maptest").await.map_err(StartupError::NatsGetStream)? + .get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{ + name:Some("validation".to_owned()), + durable_name:Some("validation".to_owned()), + filter_subject:"maptest.submissions.>".to_owned(), + ..Default::default() + }).await.map_err(StartupError::NatsConsumer)? + .messages().await.map_err(StartupError::NatsStream) + }; // data-service grpc for creating map entries let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required"); - let maps_grpc=crate::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; + let message_handler_fut=async{ + let maps_grpc=crate::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; + Ok(message_handler::MessageHandler::new(cookie_context,api,maps_grpc)) + }; // Create a signal listener for SIGTERM - let mut sig_term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).expect("Failed to create SIGTERM signal listener"); + let mut sig_term=tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).expect("Failed to create SIGTERM signal listener"); - // the guy - let message_handler=message_handler::MessageHandler::new(cookie_context,api,maps_grpc); + // run futures + let (mut messages,message_handler)=tokio::try_join!(nats_fut,message_handler_fut)?; // nats consumer thread tokio::spawn(async move{