maps-service: nats jetstream

This commit is contained in:
Quaternions 2024-12-10 19:30:14 -08:00
parent ecaff95ca7
commit 7f1054d509
3 changed files with 22 additions and 5 deletions

View File

@ -87,9 +87,26 @@ func serve(ctx *cli.Context) error {
if err != nil { if err != nil {
log.WithError(err).Fatal("failed to connect nats") log.WithError(err).Fatal("failed to connect nats")
} }
js, err := nc.JetStream()
if err != nil {
log.WithError(err).Fatal("failed to start jetstream")
}
// this should be somewhere else but whatever
_, err = js.AddStream(&nats.StreamConfig{Name:"submissions_validate"})
if err != nil {
log.WithError(err).Fatal("failed to add stream submissions_validate")
}
_, err = js.AddStream(&nats.StreamConfig{Name:"submissions_publish_new"})
if err != nil {
log.WithError(err).Fatal("failed to add stream submissions_publish_new")
}
_, err = js.AddStream(&nats.StreamConfig{Name:"submissions_publish_fix"})
if err != nil {
log.WithError(err).Fatal("failed to add stream submissions_publish_fix")
}
svc := &service.Service{ svc := &service.Service{
DB: db, DB: db,
Nats: nc, Nats: js,
} }
conn, err := grpc.Dial(ctx.String("auth-rpc-host"), grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := grpc.Dial(ctx.String("auth-rpc-host"), grpc.WithTransportCredentials(insecure.NewCredentials()))

View File

@ -17,7 +17,7 @@ var (
type Service struct { type Service struct {
DB datastore.Datastore DB datastore.Datastore
Nats *nats.Conn Nats nats.JetStreamContext
} }
// NewError creates *ErrorStatusCode from error returned by handler. // NewError creates *ErrorStatusCode from error returned by handler.

View File

@ -309,7 +309,7 @@ func (svc *Service) ActionSubmissionTriggerPublish(ctx context.Context, params a
return err return err
} }
svc.Nats.Publish("publish_new", []byte(j)) svc.Nats.Publish("submissions_publish_new", []byte(j))
}else{ }else{
// this is a map fix // this is a map fix
publish_fix_request := model.PublishFixRequest{ publish_fix_request := model.PublishFixRequest{
@ -324,7 +324,7 @@ func (svc *Service) ActionSubmissionTriggerPublish(ctx context.Context, params a
return err return err
} }
svc.Nats.Publish("publish_fix", []byte(j)) svc.Nats.Publish("submissions_publish_fix", []byte(j))
} }
return nil return nil
@ -365,7 +365,7 @@ func (svc *Service) ActionSubmissionTriggerValidate(ctx context.Context, params
return err return err
} }
svc.Nats.Publish("validate", []byte(j)) svc.Nats.Publish("submissions_validate", []byte(j))
return nil return nil
} }