nats: use subjects

This commit is contained in:
Quaternions 2024-12-11 21:07:51 -08:00
parent 64d6fe6059
commit eea57786ff
7 changed files with 26 additions and 30 deletions

View File

@ -91,19 +91,15 @@ func serve(ctx *cli.Context) error {
if err != nil { if err != nil {
log.WithError(err).Fatal("failed to start jetstream") log.WithError(err).Fatal("failed to start jetstream")
} }
// this should be somewhere else but whatever
_, err = js.AddStream(&nats.StreamConfig{Name:"submissions_validate"}) _, err = js.AddStream(&nats.StreamConfig{
Name:"maptest",
Subjects: []string{"maptest.>"},
})
if err != nil { if err != nil {
log.WithError(err).Fatal("failed to add stream submissions_validate") log.WithError(err).Fatal("failed to add stream")
}
_, err = js.AddStream(&nats.StreamConfig{Name:"submissions_publish_new"})
if err != nil {
log.WithError(err).Fatal("failed to add stream submissions_publish_new")
}
_, err = js.AddStream(&nats.StreamConfig{Name:"submissions_publish_fix"})
if err != nil {
log.WithError(err).Fatal("failed to add stream submissions_publish_fix")
} }
svc := &service.Service{ svc := &service.Service{
DB: db, DB: db,
Nats: js, Nats: js,

View File

@ -309,7 +309,7 @@ func (svc *Service) ActionSubmissionTriggerPublish(ctx context.Context, params a
return err return err
} }
svc.Nats.Publish("submissions_publish_new", []byte(j)) svc.Nats.Publish("maptest.submissions.publish.new", []byte(j))
}else{ }else{
// this is a map fix // this is a map fix
publish_fix_request := model.PublishFixRequest{ publish_fix_request := model.PublishFixRequest{
@ -324,7 +324,7 @@ func (svc *Service) ActionSubmissionTriggerPublish(ctx context.Context, params a
return err return err
} }
svc.Nats.Publish("submissions_publish_fix", []byte(j)) svc.Nats.Publish("maptest.submissions.publish.fix", []byte(j))
} }
return nil return nil
@ -365,7 +365,7 @@ func (svc *Service) ActionSubmissionTriggerValidate(ctx context.Context, params
return err return err
} }
svc.Nats.Publish("submissions_validate", []byte(j)) svc.Nats.Publish("maptest.submissions.validate", []byte(j))
return nil return nil
} }

View File

@ -9,6 +9,7 @@ mod publish_fix;
pub enum StartupError{ pub enum StartupError{
API(api::ReqwestError), API(api::ReqwestError),
NatsConnect(async_nats::ConnectError), NatsConnect(async_nats::ConnectError),
NatsGetStream(async_nats::jetstream::context::GetStreamError),
NatsStartup(types::NatsStartupError), NatsStartup(types::NatsStartupError),
GRPCConnect(tonic::transport::Error), GRPCConnect(tonic::transport::Error),
} }
@ -33,21 +34,21 @@ async fn main()->Result<(),StartupError>{
// nats // nats
let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required"); let nats_host=std::env::var("NATS_HOST").expect("NATS_HOST env required");
let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; let nasty=async_nats::connect(nats_host).await.map_err(StartupError::NatsConnect)?; // use nats jetstream
let stream=async_nats::jetstream::new(nasty)
.get_stream("maptest").await.map_err(StartupError::NatsGetStream)?;
// data-service grpc for creating map entries // data-service grpc for creating map entries
let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required"); let data_host=std::env::var("DATA_HOST").expect("DATA_HOST env required");
let maps_grpc=crate::types::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?; let maps_grpc=crate::types::MapsServiceClient::connect(data_host).await.map_err(StartupError::GRPCConnect)?;
// use nats jetstream
let jetstream=async_nats::jetstream::new(nasty);
// connect to nats // connect to nats
let (publish_new,publish_fix,validator)=tokio::try_join!( let (publish_new,publish_fix,validator)=tokio::try_join!(
publish_new::Publisher::new(jetstream.clone(),cookie_context.clone(),api.clone(),maps_grpc), publish_new::Publisher::new(stream.clone(),cookie_context.clone(),api.clone(),maps_grpc),
publish_fix::Publisher::new(jetstream.clone(),cookie_context.clone(),api.clone()), publish_fix::Publisher::new(stream.clone(),cookie_context.clone(),api.clone()),
// clone nats here because it's dropped within the function scope, // clone nats here because it's dropped within the function scope,
// meanining the last reference is dropped... // meanining the last reference is dropped...
validator::Validator::new(jetstream.clone(),cookie_context,api) validator::Validator::new(stream.clone(),cookie_context,api)
).map_err(StartupError::NatsStartup)?; ).map_err(StartupError::NatsStartup)?;
// publisher threads // publisher threads

View File

@ -26,13 +26,13 @@ pub struct Publisher{
} }
impl Publisher{ impl Publisher{
pub async fn new( pub async fn new(
nats:async_nats::jetstream::Context, stream:async_nats::jetstream::stream::Stream,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
)->Result<Self,NatsStartupError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
messages:nats.get_stream("submissions_publish_new").await.map_err(NatsStartupError::GetStream)? messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{
.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ filter_subject:"maptest.submissions.publish.fix".to_owned(),
..Default::default() ..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)? }).await.map_err(NatsStartupError::ConsumerCreateStrict)?
.messages().await.map_err(NatsStartupError::Stream)?, .messages().await.map_err(NatsStartupError::Stream)?,

View File

@ -29,14 +29,14 @@ pub struct Publisher{
} }
impl Publisher{ impl Publisher{
pub async fn new( pub async fn new(
nats:async_nats::jetstream::Context, stream:async_nats::jetstream::stream::Stream,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
maps_grpc:crate::types::MapsServiceClient, maps_grpc:crate::types::MapsServiceClient,
)->Result<Self,NatsStartupError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
messages:nats.get_stream("submissions_publish_new").await.map_err(NatsStartupError::GetStream)? messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{
.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ filter_subject:"maptest.submissions.publish.new".to_owned(),
..Default::default() ..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)? }).await.map_err(NatsStartupError::ConsumerCreateStrict)?
.messages().await.map_err(NatsStartupError::Stream)?, .messages().await.map_err(NatsStartupError::Stream)?,

View File

@ -5,7 +5,6 @@ pub type MessageResult=Result<async_nats::jetstream::Message,async_nats::jetstre
#[derive(Debug)] #[derive(Debug)]
pub enum NatsStartupError{ pub enum NatsStartupError{
GetStream(async_nats::jetstream::context::GetStreamError),
ConsumerCreateStrict(async_nats::jetstream::stream::ConsumerCreateStrictError), ConsumerCreateStrict(async_nats::jetstream::stream::ConsumerCreateStrictError),
Stream(async_nats::jetstream::consumer::StreamError), Stream(async_nats::jetstream::consumer::StreamError),
} }

View File

@ -44,13 +44,13 @@ pub struct Validator{
impl Validator{ impl Validator{
pub async fn new( pub async fn new(
nats:async_nats::jetstream::Context, stream:async_nats::jetstream::stream::Stream,
roblox_cookie:rbx_asset::cookie::CookieContext, roblox_cookie:rbx_asset::cookie::CookieContext,
api:api::Context, api:api::Context,
)->Result<Self,NatsStartupError>{ )->Result<Self,NatsStartupError>{
Ok(Self{ Ok(Self{
messages:nats.get_stream("submissions_validate").await.map_err(NatsStartupError::GetStream)? messages:stream.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{
.create_consumer_strict(async_nats::jetstream::consumer::pull::Config{ filter_subject:"maptest.submissions.validate".to_owned(),
..Default::default() ..Default::default()
}).await.map_err(NatsStartupError::ConsumerCreateStrict)? }).await.map_err(NatsStartupError::ConsumerCreateStrict)?
.messages().await.map_err(NatsStartupError::Stream)?, .messages().await.map_err(NatsStartupError::Stream)?,