From 07e85776bba3d2bba7209621b26e9522bf89cb90 Mon Sep 17 00:00:00 2001 From: Quaternions Date: Fri, 20 Oct 2023 18:59:15 -0700 Subject: [PATCH] explore worker pool ideas --- src/worker.rs | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/src/worker.rs b/src/worker.rs index 8dab534..877fed3 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -163,6 +163,83 @@ impl INWorker{ } } +//worker pools work by cloning a mpsc and passing it into the thread, the thread sends its results through that +//worker pools have a master thread that manages the pool so that the work submission thread does not need to implement async + +pub struct QQWorkerPool{ + sender:mpsc::Sender, + queue:Arc>>, +} + +impl QQWorkerPool{ + pub fn newValue+Send+'static>(pool_size:usize,f:F)->Self{ + let (task_sender,task_receiver)=mpsc::channel::(); + let (value_sender,value_receiver)=mpsc::channel::(); + let ret=Self{ + sender:task_sender, + queue:Arc::new(Mutex::new(std::collections::VecDeque::new())), + }; + let mut queue_collect=ret.queue.clone(); + let mut worker_senders=Vec::with_capacity(pool_size); + let mut active_workers_collect=Arc::new(Mutex::new(0usize)); + let mut active_workers_dispatch=active_workers_collect.clone(); + let mut condvar_collect=Arc::new(Condvar::new()); + let mut condvar_dispatch=condvar_collect.clone(); + //task dispatch thread + thread::spawn(move ||{ + loop{ + //block if no workers are available, value collection thread will notify + let n=*active_workers_dispatch.lock(); + if n==pool_size{ + //wait for notifiy + condvar_dispatch.wait(&mut active_workers_dispatch.lock()); + } + match task_receiver.recv(){ + Ok(task)=>{ + *active_workers_dispatch.lock()+=1; + //workers are never full here + //else if workers busy: spawn a new worker + //else: send task to an available worker + } + Err(_)=>{ + println!("Dispatch stopping.",); + break; + } + } + } + }); + //value collection thread (bad idea, put this logic in each thread) + thread::spawn(move ||{ + loop{ + match value_receiver.recv(){ + Ok(value)=>{ + //maybe I can be smart with this as a signal that a worker finished a task + let n=*active_workers_collect.lock(); + *active_workers_collect.lock()-=1; + if n==pool_size{ + condvar_collect.notify_one(); + } + (*queue_collect.lock()).push_front(value); + } + Err(_)=>{ + println!("Collection stopping.",); + break; + } + } + } + }); + ret + } + + pub fn send(&self,task:Task)->Result<(),mpsc::SendError>{ + self.sender.send(task) + } + + pub fn get(&self)->Option{ + (*self.queue.lock()).pop_back() + } +} + #[test]//How to run this test with printing: cargo test --release -- --nocapture fn test_worker() { println!("hiiiii");