wip: validator

This commit is contained in:
Quaternions 2024-12-02 22:52:01 -08:00
parent 92fe223d96
commit 1124f8adea
3 changed files with 118 additions and 11 deletions

View File

@ -8,17 +8,33 @@ enum StartupError{
Connect(async_nats::ConnectError),
Subscribe(async_nats::SubscribeError),
}
impl std::fmt::Display for StartupError{
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
write!(f,"{self:?}")
}
}
impl std::error::Error for StartupError{}
#[tokio::main]
async fn main()->Result<(),StartupError>{
// cookies and clouds
let cookie_context=rbx_asset::cookie::CookieContext::new(rbx_asset::cookie::Cookie::new("".to_owned()));
let cloud_context=rbx_asset::cloud::CloudContext::new(rbx_asset::cloud::ApiKey::new("".to_owned()));
// nats
let nasty=async_nats::connect("nats").await.map_err(StartupError::Connect)?;
// connect to nats
let (publisher,validator)=tokio::try_join!(
publisher::Publisher::new(nasty.clone()),
validator::Validator::new(nasty)
publisher::Publisher::new(nasty.clone(),cookie_context.clone(),cloud_context),
validator::Validator::new(nasty,cookie_context)
).map_err(StartupError::Subscribe)?;
// publisher thread
tokio::spawn(publisher.run());
// run validator on the main thread indefinitely
validator.run().await;
Ok(())
}

View File

@ -1,17 +1,32 @@
use futures::StreamExt;
#[derive(Debug)]
enum PublishError{
}
impl std::fmt::Display for PublishError{
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
write!(f,"{self:?}")
}
}
impl std::error::Error for PublishError{}
pub struct Publisher{
nats:async_nats::Client,
subscriber:async_nats::Subscriber,
roblox_cookie:rbx_asset::cookie::CookieContext,
roblox_cloud:rbx_asset::cloud::CloudContext,
}
impl Publisher{
pub async fn new(nats:async_nats::Client)->Result<Self,async_nats::SubscribeError>{
pub async fn new(
nats:async_nats::Client,
roblox_cookie:rbx_asset::cookie::CookieContext,
roblox_cloud:rbx_asset::cloud::CloudContext,
)->Result<Self,async_nats::SubscribeError>{
Ok(Self{
subscriber:nats.subscribe("publish").await?,
nats,
roblox_cookie,
roblox_cloud,
})
}
pub async fn run(mut self){

View File

@ -1,37 +1,113 @@
use futures::StreamExt;
use rbx_dom_weak::WeakDom;
use crate::nats_types::ValidateRequest;
struct ModelVersion{
model_id:u64,
model_version:u64,
}
enum Valid{
Untouched(WeakDom),
Modified(WeakDom),
Untouched,
Modified(ModelVersion),
}
#[allow(dead_code)]
#[derive(Debug)]
enum ValidateError{
Get(rbx_asset::cookie::GetError),
Json(serde_json::Error),
ReadDom(ReadDomError),
}
impl std::fmt::Display for ValidateError{
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
write!(f,"{self:?}")
}
}
impl std::error::Error for ValidateError{}
pub struct Validator{
nats:async_nats::Client,
subscriber:async_nats::Subscriber,
roblox_cookie:rbx_asset::cookie::CookieContext,
}
impl Validator{
pub async fn new(nats:async_nats::Client)->Result<Self,async_nats::SubscribeError>{
pub async fn new(
nats:async_nats::Client,
roblox_cookie:rbx_asset::cookie::CookieContext,
)->Result<Self,async_nats::SubscribeError>{
Ok(Self{
subscriber:nats.subscribe("validate").await?,
nats,
roblox_cookie,
})
}
pub async fn run(mut self){
while let Some(message)=self.subscriber.next().await{
self.validate(message).await
self.validate_supress_error(message).await
}
}
async fn validate(&self,message:async_nats::Message){
async fn validate_supress_error(&self,message:async_nats::Message){
match self.validate(message).await{
Ok(valid)=>{
unimplemented!();
// self.nats.publish("validated","yo it validated".into()).await.unwrap();
},
Err(e)=>{
println!("there was an error, oopsie! {e}");
}
}
}
async fn validate(&self,message:async_nats::Message)->Result<Valid,ValidateError>{
println!("validate {:?}",message);
// decode json
let validate_info:ValidateRequest=serde_json::from_slice(&message.payload).map_err(ValidateError::Json)?;
// download map
let data=self.roblox_cookie.get_asset(rbx_asset::cookie::GetAssetRequest{
asset_id:validate_info.model_id,
version:Some(validate_info.model_version),
}).await.map_err(ValidateError::Get)?;
// decode dom (slow!)
let mut dom=read_dom(&mut std::io::Cursor::new(data)).map_err(ValidateError::ReadDom)?;
// validate map
// validate(dom)
// reply with validity
// Ok(Valid::Untouched(dom))
Ok(Valid::Untouched)
}
}
#[allow(dead_code)]
#[derive(Debug)]
enum ReadDomError{
Binary(rbx_binary::DecodeError),
Xml(rbx_xml::DecodeError),
Read(std::io::Error),
Seek(std::io::Error),
UnknownFormat([u8;8]),
}
impl std::fmt::Display for ReadDomError{
fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{
write!(f,"{self:?}")
}
}
impl std::error::Error for ReadDomError{}
fn read_dom<R:std::io::Read+std::io::Seek>(input:&mut R)->Result<rbx_dom_weak::WeakDom,ReadDomError>{
let mut first_8=[0u8;8];
std::io::Read::read_exact(input,&mut first_8).map_err(ReadDomError::Read)?;
std::io::Seek::rewind(input).map_err(ReadDomError::Seek)?;
match &first_8[0..4]{
b"<rob"=>{
match &first_8[4..8]{
b"lox!"=>rbx_binary::from_reader(input).map_err(ReadDomError::Binary),
b"lox "=>rbx_xml::from_reader(input,rbx_xml::DecodeOptions::default()).map_err(ReadDomError::Xml),
_=>Err(ReadDomError::UnknownFormat(first_8)),
}
},
_=>Err(ReadDomError::UnknownFormat(first_8)),
}
}