From 2ee70138d1d29ed724634a1e2162773259a15ebc Mon Sep 17 00:00:00 2001 From: Quaternions <krakow20@gmail.com> Date: Tue, 24 Oct 2023 23:12:43 -0700 Subject: [PATCH] true gamer --- src/worker.rs | 56 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/src/worker.rs b/src/worker.rs index 8dab534..a334681 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -104,17 +104,15 @@ QN = WorkerDescription{ } */ //None Output Worker does all its work internally from the perspective of the work submitter -pub struct QNWorker<Task:Send> { +pub struct QNWorker<'a,Task:Send>{ sender: mpsc::Sender<Task>, + handle:thread::ScopedJoinHandle<'a,()>, } -impl<Task:Send+'static> QNWorker<Task>{ - pub fn new<F:FnMut(Task)+Send+'static>(mut f:F)->Self{ +impl<'a,Task:Send+'a> QNWorker<'a,Task>{ + pub fn new<F:FnMut(Task)+Send+'a>(scope:&'a thread::Scope<'a,'_>,mut f:F)->QNWorker<'a,Task>{ let (sender,receiver)=mpsc::channel::<Task>(); - let ret=Self { - sender, - }; - thread::spawn(move ||{ + let handle=scope.spawn(move ||{ loop { match receiver.recv() { Ok(task)=>f(task), @@ -125,7 +123,10 @@ impl<Task:Send+'static> QNWorker<Task>{ } } }); - ret + Self{ + sender, + handle, + } } pub fn send(&self,task:Task)->Result<(),mpsc::SendError<Task>>{ self.sender.send(task) @@ -139,27 +140,36 @@ IN = WorkerDescription{ } */ //Inputs are dropped if the worker is busy -pub struct INWorker<Task:Clone>{ - input:Arc<(Mutex<Task>,Condvar)>, +pub struct INWorker<'a,Task:Send>{ + sender: mpsc::SyncSender<Task>, + handle:thread::ScopedJoinHandle<'a,()>, } -impl<Task:Clone+Send+'static> INWorker<Task>{ - pub fn new<F:FnMut(Task)+Send+'static>(task:Task,mut f:F)->Self{ - let ret=Self { - input:Arc::new((Mutex::new(task),Condvar::new())), - }; - let input=ret.input.clone(); - thread::spawn(move ||{ +impl<'a,Task:Send+'a> INWorker<'a,Task>{ + pub fn new<F:FnMut(Task)+Send+'a>(scope:&'a thread::Scope<'a,'_>,mut f:F)->INWorker<'a,Task>{ + let (sender,receiver)=mpsc::sync_channel::<Task>(1); + let handle=scope.spawn(move ||{ loop { - input.1.wait(&mut input.0.lock()); - f(input.0.lock().clone()); + match receiver.recv() { + Ok(task)=>f(task), + Err(_)=>{ + println!("Worker stopping.",); + break; + } + } } }); - ret + Self{ + sender, + handle, + } } - pub fn send(&self,task:Task){ - *self.input.0.lock()=task; - self.input.1.notify_one(); + //blocking! + pub fn blocking_send(&self,task:Task)->Result<(), mpsc::SendError<Task>>{ + self.sender.send(task) + } + pub fn send(&self,task:Task)->Result<(), mpsc::TrySendError<Task>>{ + self.sender.try_send(task) } }