parent
ffcba57408
commit
3404251c14
@ -411,7 +411,7 @@ func (svc *Service) ActionSubmissionTriggerUpload(ctx context.Context, params ap
|
||||
return err
|
||||
}
|
||||
|
||||
svc.Nats.Publish("maptest.submissions.uploadnew", []byte(j))
|
||||
svc.Nats.Publish("maptest.submissions.upload", []byte(j))
|
||||
} else {
|
||||
// refuse to operate
|
||||
return ErrUploadedAssetIDAlreadyExists
|
||||
|
@ -52,7 +52,7 @@ async fn main()->Result<(),StartupError>{
|
||||
.get_or_create_consumer("validation",async_nats::jetstream::consumer::pull::Config{
|
||||
name:Some("validation".to_owned()),
|
||||
durable_name:Some("validation".to_owned()),
|
||||
filter_subject:"maptest.submissions.>".to_owned(),
|
||||
filter_subject:"maptest.>".to_owned(),
|
||||
..Default::default()
|
||||
}).await.map_err(StartupError::NatsConsumer)?
|
||||
.messages().await.map_err(StartupError::NatsStream)
|
||||
|
@ -44,8 +44,8 @@ impl MessageHandler{
|
||||
let message=message_result.map_err(HandleMessageError::Messages)?;
|
||||
message.double_ack().await.map_err(HandleMessageError::DoubleAck)?;
|
||||
match message.subject.as_str(){
|
||||
"maptest.submissions.uploadnew"=>self.upload_new.upload(from_slice(&message.payload)?).await.map_err(HandleMessageError::UploadNew),
|
||||
"maptest.submissions.uploadfix"=>self.upload_fix.upload(from_slice(&message.payload)?).await.map_err(HandleMessageError::UploadFix),
|
||||
"maptest.mapfixes.upload"=>self.upload_fix.upload(from_slice(&message.payload)?).await.map_err(HandleMessageError::UploadFix),
|
||||
"maptest.submissions.upload"=>self.upload_new.upload(from_slice(&message.payload)?).await.map_err(HandleMessageError::UploadNew),
|
||||
"maptest.submissions.validate"=>self.validator.validate(from_slice(&message.payload)?).await.map_err(HandleMessageError::Validation),
|
||||
other=>Err(HandleMessageError::UnknownSubject(other.to_owned()))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user