From 22086e772c49c28f67da9940d48c71c50180dd0b Mon Sep 17 00:00:00 2001 From: Quaternions Date: Thu, 12 Dec 2024 16:41:34 -0800 Subject: [PATCH] validation: listen for sigkill --- validation/Cargo.lock | 10 ++++++++++ validation/Cargo.toml | 2 +- validation/src/main.rs | 9 ++++++--- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/validation/Cargo.lock b/validation/Cargo.lock index 0f2dd18..1413935 100644 --- a/validation/Cargo.lock +++ b/validation/Cargo.lock @@ -1925,6 +1925,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "signatory" version = "0.27.1" @@ -2149,6 +2158,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", diff --git a/validation/Cargo.toml b/validation/Cargo.toml index cf1f620..cebba9f 100644 --- a/validation/Cargo.toml +++ b/validation/Cargo.toml @@ -16,5 +16,5 @@ rust-grpc = { version = "1.0.3", registry = "strafesnet" } serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.133" siphasher = "1.0.1" -tokio = { version = "1.41.1", features = ["macros", "rt-multi-thread", "fs"] } +tokio = { version = "1.41.1", features = ["macros", "rt-multi-thread", "fs", "signal"] } tonic = "0.12.3" diff --git a/validation/src/main.rs b/validation/src/main.rs index 137f1ae..63e535f 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -42,6 +42,9 @@ async fn main()->Result<(),StartupError>{ let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required"); let maps_grpc=crate::types::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; + // Create a signal listener for SIGTERM + let mut sig_term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).expect("Failed to create SIGTERM signal listener"); + // connect to nats let (publish_new,publish_fix,validator)=tokio::try_join!( publish_new::Publisher::new(stream.clone(),cookie_context.clone(),api.clone(),maps_grpc), @@ -51,12 +54,12 @@ async fn main()->Result<(),StartupError>{ validator::Validator::new(stream.clone(),cookie_context,api) ).map_err(StartupError::NatsStartup)?; - // publisher threads + // nats consumer threads tokio::spawn(publish_new.run()); tokio::spawn(publish_fix.run()); + tokio::spawn(validator.run()); - // run validator on the main thread indefinitely - validator.run().await; + sig_term.recv().await; Ok(()) }