diff --git a/src/worker.rs b/src/worker.rs index e16164a..5779a64 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,59 +1,65 @@ use std::thread; use std::sync::{mpsc,Arc}; use parking_lot::{Mutex,Condvar}; -use std::sync::atomic::{AtomicBool, Ordering}; -struct Worker { - id: usize, - receiver: Arc<(Mutex>, Condvar)>, - is_active: Arc, +struct Worker { + sender: mpsc::Sender, + receiver: Arc<(Mutex>,Condvar)>, } -impl Worker { - fn new(id: usize, receiver: Arc<(Mutex>, Condvar)>, is_active: Arc) -> Worker { - Worker { id, receiver, is_active } +impl Worker { + fn new() -> Self { + let (sender, receiver) = mpsc::channel::(); + Self { + sender, + receiver:Arc::new((Mutex::new(receiver),Condvar::new())), + } } - fn start(self) { + fn send(&self,task:Task)->Result<(), mpsc::SendError>{ + let ret=self.sender.send(task); + self.receiver.1.notify_one(); + ret + } + + fn start(&self) { + let receiver=self.receiver.clone(); thread::spawn(move || { - loop { - match self.receiver.0.lock().recv() { - Ok(task) => { - println!("Worker {} got a task: {}", self.id, task); - // Process the task - } - Err(_) => { - println!("Worker {} stopping.", self.id); - break; + loop{ + loop { + match receiver.0.lock().recv() { + Ok(_task) => { + println!("Worker got a task"); + // Process the task + } + Err(_) => { + println!("Worker stopping.",); + break; + } } } + receiver.1.wait(&mut receiver.0.lock()); } - - // Set is_active to false when the worker is done - self.is_active.store(false, Ordering::SeqCst); - self.receiver.1.notify_one(); }); } } -type Task = String; - #[test] fn test_worker() { - let (sender, receiver) = mpsc::channel::(); - let receiver = Arc::new((Mutex::new(receiver), Condvar::new())); - let is_active = Arc::new(AtomicBool::new(true)); // Create the worker thread - let worker = Worker::new(1, Arc::clone(&receiver), Arc::clone(&is_active)); + let worker = Worker::new(); // Start the worker thread worker.start(); // Send tasks to the worker for i in 0..5 { - let task = format!("Task {}", i); - sender.send(task).unwrap(); + let task = crate::instruction::TimedInstruction{ + time:0, + instruction:crate::body::PhysicsInstruction::StrafeTick, + }; + worker.send(task).unwrap(); } // Optional: Signal the worker to stop (in a real-world scenario) @@ -62,17 +68,10 @@ fn test_worker() { // Sleep to allow the worker thread to finish processing thread::sleep(std::time::Duration::from_secs(2)); - // Check if the worker is still active - let is_worker_active = is_active.load(Ordering::SeqCst); - - if !is_worker_active { - // If the worker is done, signal it to process a new task - is_active.store(true, Ordering::SeqCst); - thread::sleep(std::time::Duration::from_secs(2)); - - // Send a new task - sender.send("New Task".to_string()).unwrap(); - } else { - println!("Worker is still active. Skipping new task."); - } + // Send a new task + let task = crate::instruction::TimedInstruction{ + time:0, + instruction:crate::body::PhysicsInstruction::StrafeTick, + }; + worker.send(task).unwrap(); }