From 7f1054d509eee9465bab94b8e335b7e4f9f264d2 Mon Sep 17 00:00:00 2001 From: Quaternions Date: Tue, 10 Dec 2024 19:30:14 -0800 Subject: [PATCH] maps-service: nats jetstream --- pkg/cmds/serve.go | 19 ++++++++++++++++++- pkg/service/service.go | 2 +- pkg/service/submissions.go | 6 +++--- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/cmds/serve.go b/pkg/cmds/serve.go index 6e21946..108c270 100644 --- a/pkg/cmds/serve.go +++ b/pkg/cmds/serve.go @@ -87,9 +87,26 @@ func serve(ctx *cli.Context) error { if err != nil { log.WithError(err).Fatal("failed to connect nats") } + js, err := nc.JetStream() + if err != nil { + log.WithError(err).Fatal("failed to start jetstream") + } + // this should be somewhere else but whatever + _, err = js.AddStream(&nats.StreamConfig{Name:"submissions_validate"}) + if err != nil { + log.WithError(err).Fatal("failed to add stream submissions_validate") + } + _, err = js.AddStream(&nats.StreamConfig{Name:"submissions_publish_new"}) + if err != nil { + log.WithError(err).Fatal("failed to add stream submissions_publish_new") + } + _, err = js.AddStream(&nats.StreamConfig{Name:"submissions_publish_fix"}) + if err != nil { + log.WithError(err).Fatal("failed to add stream submissions_publish_fix") + } svc := &service.Service{ DB: db, - Nats: nc, + Nats: js, } conn, err := grpc.Dial(ctx.String("auth-rpc-host"), grpc.WithTransportCredentials(insecure.NewCredentials())) diff --git a/pkg/service/service.go b/pkg/service/service.go index 7268aa5..c632969 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -17,7 +17,7 @@ var ( type Service struct { DB datastore.Datastore - Nats *nats.Conn + Nats nats.JetStreamContext } // NewError creates *ErrorStatusCode from error returned by handler. diff --git a/pkg/service/submissions.go b/pkg/service/submissions.go index af93ca0..40fa5f2 100644 --- a/pkg/service/submissions.go +++ b/pkg/service/submissions.go @@ -309,7 +309,7 @@ func (svc *Service) ActionSubmissionTriggerPublish(ctx context.Context, params a return err } - svc.Nats.Publish("publish_new", []byte(j)) + svc.Nats.Publish("submissions_publish_new", []byte(j)) }else{ // this is a map fix publish_fix_request := model.PublishFixRequest{ @@ -324,7 +324,7 @@ func (svc *Service) ActionSubmissionTriggerPublish(ctx context.Context, params a return err } - svc.Nats.Publish("publish_fix", []byte(j)) + svc.Nats.Publish("submissions_publish_fix", []byte(j)) } return nil @@ -365,7 +365,7 @@ func (svc *Service) ActionSubmissionTriggerValidate(ctx context.Context, params return err } - svc.Nats.Publish("validate", []byte(j)) + svc.Nats.Publish("submissions_validate", []byte(j)) return nil }