forked from StrafesNET/strafe-project
explore worker pool ideas
This commit is contained in:
parent
2356286dc2
commit
07e85776bb
@ -163,6 +163,83 @@ impl<Task:Clone+Send+'static> INWorker<Task>{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//worker pools work by cloning a mpsc and passing it into the thread, the thread sends its results through that
|
||||||
|
//worker pools have a master thread that manages the pool so that the work submission thread does not need to implement async
|
||||||
|
|
||||||
|
pub struct QQWorkerPool<Task:Send,Value:Send>{
|
||||||
|
sender:mpsc::Sender<Task>,
|
||||||
|
queue:Arc<Mutex<std::collections::VecDeque<Value>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Task:Send+'static,Value:Send+'static> QQWorkerPool<Task,Value>{
|
||||||
|
pub fn new<F:Fn(Task)->Value+Send+'static>(pool_size:usize,f:F)->Self{
|
||||||
|
let (task_sender,task_receiver)=mpsc::channel::<Task>();
|
||||||
|
let (value_sender,value_receiver)=mpsc::channel::<Value>();
|
||||||
|
let ret=Self{
|
||||||
|
sender:task_sender,
|
||||||
|
queue:Arc::new(Mutex::new(std::collections::VecDeque::new())),
|
||||||
|
};
|
||||||
|
let mut queue_collect=ret.queue.clone();
|
||||||
|
let mut worker_senders=Vec::with_capacity(pool_size);
|
||||||
|
let mut active_workers_collect=Arc::new(Mutex::new(0usize));
|
||||||
|
let mut active_workers_dispatch=active_workers_collect.clone();
|
||||||
|
let mut condvar_collect=Arc::new(Condvar::new());
|
||||||
|
let mut condvar_dispatch=condvar_collect.clone();
|
||||||
|
//task dispatch thread
|
||||||
|
thread::spawn(move ||{
|
||||||
|
loop{
|
||||||
|
//block if no workers are available, value collection thread will notify
|
||||||
|
let n=*active_workers_dispatch.lock();
|
||||||
|
if n==pool_size{
|
||||||
|
//wait for notifiy
|
||||||
|
condvar_dispatch.wait(&mut active_workers_dispatch.lock());
|
||||||
|
}
|
||||||
|
match task_receiver.recv(){
|
||||||
|
Ok(task)=>{
|
||||||
|
*active_workers_dispatch.lock()+=1;
|
||||||
|
//workers are never full here
|
||||||
|
//else if workers busy: spawn a new worker
|
||||||
|
//else: send task to an available worker
|
||||||
|
}
|
||||||
|
Err(_)=>{
|
||||||
|
println!("Dispatch stopping.",);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
//value collection thread (bad idea, put this logic in each thread)
|
||||||
|
thread::spawn(move ||{
|
||||||
|
loop{
|
||||||
|
match value_receiver.recv(){
|
||||||
|
Ok(value)=>{
|
||||||
|
//maybe I can be smart with this as a signal that a worker finished a task
|
||||||
|
let n=*active_workers_collect.lock();
|
||||||
|
*active_workers_collect.lock()-=1;
|
||||||
|
if n==pool_size{
|
||||||
|
condvar_collect.notify_one();
|
||||||
|
}
|
||||||
|
(*queue_collect.lock()).push_front(value);
|
||||||
|
}
|
||||||
|
Err(_)=>{
|
||||||
|
println!("Collection stopping.",);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(&self,task:Task)->Result<(),mpsc::SendError<Task>>{
|
||||||
|
self.sender.send(task)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self)->Option<Value>{
|
||||||
|
(*self.queue.lock()).pop_back()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]//How to run this test with printing: cargo test --release -- --nocapture
|
#[test]//How to run this test with printing: cargo test --release -- --nocapture
|
||||||
fn test_worker() {
|
fn test_worker() {
|
||||||
println!("hiiiii");
|
println!("hiiiii");
|
||||||
|
Loading…
Reference in New Issue
Block a user