
task: new task state management method

We remove the `ISLEEP` and `USLEEP` states; there is now only a single
`Sleeping` state. To do an interruptible sleep, we simply store an extra
copy of the waker in `SignalList::signal_waker`.
greatbridf, 10 months ago
Parent commit c1c82197b2
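
The idea in the commit message, as a minimal standalone sketch (hypothetical, simplified stand-ins for the kernel's types; the real code routes wakers through its wait-list and signal machinery): there is only one `Sleeping` state, and an "interruptible" sleep just hands an extra copy of the same waker to the signal list so a pending signal can wake the task too.

```rust
use std::task::Waker;

// Hypothetical, simplified stand-ins for the kernel's types.
struct SignalList {
    signal_waker: Option<Waker>,
}

struct WaitList {
    waiters: Vec<Waker>,
}

fn sleep_uninterruptible(wait: &mut WaitList, waker: Waker) {
    // Only a notify on the wait list can wake us.
    wait.waiters.push(waker);
}

fn sleep_interruptible(wait: &mut WaitList, signals: &mut SignalList, waker: Waker) {
    // Extra copy of the waker: a pending signal can wake us as well.
    signals.signal_waker = Some(waker.clone());
    wait.waiters.push(waker);
}

fn main() {
    let mut wait = WaitList { waiters: Vec::new() };
    let mut signals = SignalList { signal_waker: None };
    // `Waker::noop()` stands in for a real task waker here.
    sleep_interruptible(&mut wait, &mut signals, Waker::noop().clone());
    assert!(signals.signal_waker.is_some());
}
```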

+ 12 - 13
crates/eonix_runtime/src/executor.rs

@@ -4,7 +4,7 @@ mod output_handle;
 mod stack;
 
 use crate::{
-    run::{Contexted, PinRun, RunState},
+    run::{Contexted, Run, RunState},
     scheduler::Scheduler,
     task::Task,
 };
@@ -21,7 +21,7 @@ pub use execute_status::ExecuteStatus;
 pub use output_handle::OutputHandle;
 pub use stack::Stack;
 
-/// An `Executor` executes a `PinRun` object in a separate thread of execution
+/// An `Executor` executes a `Run` object in a separate thread of execution
 /// where we have a dedicated stack and context.
 pub trait Executor: Send {
     fn progress(&self) -> ExecuteStatus;
@@ -29,7 +29,7 @@ pub trait Executor: Send {
 
 struct RealExecutor<S, R>
 where
-    R: PinRun + Send + Contexted + 'static,
+    R: Run + Send + Contexted + 'static,
     R::Output: Send,
 {
     _stack: S,
@@ -40,7 +40,7 @@ where
 
 impl<S, R> RealExecutor<S, R>
 where
-    R: PinRun + Send + Contexted + 'static,
+    R: Run + Send + Contexted + 'static,
     R::Output: Send,
 {
     extern "C" fn execute(self: Pin<&Self>) -> ! {
@@ -59,18 +59,17 @@ where
                 let mut pinned_runnable =
                     unsafe { Pin::new_unchecked(&mut *(runnable_pointer as *mut R)) };
 
-                match pinned_runnable.as_mut().pinned_run(&waker) {
+                match pinned_runnable.as_mut().run(&waker) {
                     RunState::Finished(output) => break output,
                     RunState::Running => {
-                        if Task::current().is_runnable() {
-                            continue;
-                        }
-
-                        // We need to set the preempt count to 0 to allow preemption.
                         eonix_preempt::disable();
 
-                        // SAFETY: We are in the scheduler context and preemption is disabled.
-                        unsafe { Scheduler::goto_scheduler(&Task::current().execution_context) };
+                        if !Task::current().state.is_running() {
+                            unsafe {
+                                // SAFETY: Preemption is disabled.
+                                Scheduler::goto_scheduler(&Task::current().execution_context)
+                            };
+                        }
 
                         eonix_preempt::enable();
                     }
@@ -96,7 +95,7 @@ where
 impl<S, R> Executor for RealExecutor<S, R>
 where
     S: Send,
-    R: PinRun + Contexted + Send,
+    R: Run + Contexted + Send,
     R::Output: Send,
 {
     fn progress(&self) -> ExecuteStatus {

+ 2 - 2
crates/eonix_runtime/src/executor/builder.rs

@@ -1,7 +1,7 @@
 use super::{Executor, OutputHandle, RealExecutor, Stack};
 use crate::{
     context::ExecutionContext,
-    run::{Contexted, PinRun},
+    run::{Contexted, Run},
 };
 use alloc::{boxed::Box, sync::Arc};
 use core::{pin::Pin, sync::atomic::AtomicBool};
@@ -15,7 +15,7 @@ pub struct ExecutorBuilder<S, R> {
 impl<S, R> ExecutorBuilder<S, R>
 where
     S: Stack,
-    R: PinRun + Contexted + Send + 'static,
+    R: Run + Contexted + Send + 'static,
     R::Output: Send,
 {
     pub fn new() -> Self {

+ 5 - 1
crates/eonix_runtime/src/ready_queue.rs

@@ -3,7 +3,7 @@ use alloc::{collections::VecDeque, sync::Arc};
 use eonix_sync::Spin;
 
 #[arch::define_percpu_shared]
-static READYQUEUE: FifoReadyQueue = FifoReadyQueue::new();
+static READYQUEUE: Spin<FifoReadyQueue> = Spin::new(FifoReadyQueue::new());
 
 pub trait ReadyQueue {
     fn get(&mut self) -> Option<Arc<Task>>;
@@ -35,3 +35,7 @@ impl ReadyQueue for FifoReadyQueue {
 pub fn local_rq() -> &'static Spin<dyn ReadyQueue> {
     &*READYQUEUE
 }
+
+pub fn cpu_rq(cpuid: usize) -> &'static Spin<dyn ReadyQueue> {
+    READYQUEUE.get_for_cpu(cpuid).expect("CPU not found")
+}
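
Wrapping the per-CPU queue in `Spin<FifoReadyQueue>` is what lets a waker lock a *remote* CPU's queue through the new `cpu_rq`. A rough single-file approximation of the shape, using a fixed CPU count and std's `Mutex` in place of the kernel's per-CPU `Spin` (names and setup here are hypothetical):

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

const NR_CPUS: usize = 4;

struct Task {
    id: u32,
}

// One lockable ready queue per CPU, addressable from any CPU.
static READY_QUEUES: [Mutex<VecDeque<Task>>; NR_CPUS] =
    [const { Mutex::new(VecDeque::new()) }; NR_CPUS];

fn cpu_rq(cpuid: usize) -> &'static Mutex<VecDeque<Task>> {
    &READY_QUEUES[cpuid]
}

fn main() {
    // Any caller can enqueue onto CPU 1's queue after taking its lock.
    cpu_rq(1).lock().unwrap().push_back(Task { id: 7 });
    let task = cpu_rq(1).lock().unwrap().pop_front().unwrap();
    assert_eq!(task.id, 7);
}
```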

+ 3 - 29
crates/eonix_runtime/src/run.rs

@@ -21,40 +21,14 @@ pub trait Contexted {
 pub trait Run {
     type Output;
 
-    fn run(&mut self, waker: &Waker) -> RunState<Self::Output>;
+    fn run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output>;
 
-    fn join(&mut self, waker: &Waker) -> Self::Output {
+    fn join(mut self: Pin<&mut Self>, waker: &Waker) -> Self::Output {
         loop {
-            match self.run(waker) {
+            match self.as_mut().run(waker) {
                 RunState::Running => continue,
                 RunState::Finished(output) => break output,
             }
         }
     }
 }
-
-pub trait PinRun {
-    type Output;
-
-    fn pinned_run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output>;
-
-    fn pinned_join(mut self: Pin<&mut Self>, waker: &Waker) -> Self::Output {
-        loop {
-            match self.as_mut().pinned_run(waker) {
-                RunState::Running => continue,
-                RunState::Finished(output) => break output,
-            }
-        }
-    }
-}
-
-impl<R> Run for R
-where
-    R: PinRun + Unpin,
-{
-    type Output = R::Output;
-
-    fn run(&mut self, waker: &Waker) -> RunState<Self::Output> {
-        Pin::new(self).pinned_run(waker)
-    }
-}
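
With `PinRun` folded into `Run`, there is a single entry point taking `self: Pin<&mut Self>`. A self-contained toy showing the trait shape and the `join` drive loop (the `Countdown` runnable is invented for illustration):

```rust
use std::pin::Pin;
use std::task::Waker;

enum RunState<T> {
    Running,
    Finished(T),
}

trait Run {
    type Output;

    fn run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output>;

    // Drive the runnable to completion, exactly like the default `join`.
    fn join(mut self: Pin<&mut Self>, waker: &Waker) -> Self::Output {
        loop {
            match self.as_mut().run(waker) {
                RunState::Running => continue,
                RunState::Finished(output) => break output,
            }
        }
    }
}

// A toy runnable that needs three polls before finishing.
struct Countdown(u32);

impl Run for Countdown {
    type Output = u32;

    fn run(mut self: Pin<&mut Self>, _waker: &Waker) -> RunState<u32> {
        if self.0 == 0 {
            RunState::Finished(42)
        } else {
            self.0 -= 1;
            RunState::Running
        }
    }
}

fn main() {
    let mut countdown = Countdown(3);
    let out = Pin::new(&mut countdown).join(Waker::noop());
    assert_eq!(out, 42);
}
```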

+ 3 - 3
crates/eonix_runtime/src/run/future_run.rs

@@ -1,4 +1,4 @@
-use super::{Contexted, PinRun, RunState};
+use super::{Contexted, Run, RunState};
 use core::{
     pin::Pin,
     task::{Context, Poll, Waker},
@@ -16,13 +16,13 @@ where
 }
 
 impl<F> Contexted for FutureRun<F> where F: Future {}
-impl<F> PinRun for FutureRun<F>
+impl<F> Run for FutureRun<F>
 where
     F: Future + 'static,
 {
     type Output = F::Output;
 
-    fn pinned_run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output> {
+    fn run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output> {
         let mut future = unsafe { self.map_unchecked_mut(|me| &mut me.0) };
         let mut context = Context::from_waker(waker);
 

+ 50 - 74
crates/eonix_runtime/src/scheduler.rs

@@ -1,18 +1,16 @@
 use crate::{
     context::ExecutionContext,
     executor::{ExecuteStatus, OutputHandle, Stack},
-    ready_queue::{local_rq, ReadyQueue},
-    run::{Contexted, PinRun},
+    ready_queue::{cpu_rq, local_rq},
+    run::{Contexted, Run},
     task::{Task, TaskAdapter, TaskHandle},
 };
 use alloc::sync::Arc;
 use core::{
-    future::Future,
     mem::forget,
-    pin::Pin,
     ptr::NonNull,
     sync::atomic::{compiler_fence, Ordering},
-    task::{Context, Poll, Waker},
+    task::Waker,
 };
 use eonix_log::println_trace;
 use eonix_preempt::assert_preempt_count_eq;
@@ -137,22 +135,36 @@ impl Scheduler {
         unsafe { TASKS.lock().cursor_mut_from_ptr(task as *const _).remove() };
     }
 
-    fn select_rq_for_task(&self, _task: &Task) -> &'static Spin<dyn ReadyQueue> {
-        // TODO: Select an appropriate ready queue.
-        local_rq()
+    fn select_cpu_for_task(&self, task: &Task) -> usize {
+        task.cpu.load(Ordering::Relaxed) as _
     }
 
     pub fn activate(&self, task: &Arc<Task>) {
-        if !task.on_rq.swap(true, Ordering::AcqRel) {
-            let rq = self.select_rq_for_task(&task);
-            rq.lock_irq().put(task.clone());
+        if task.on_rq.swap(true, Ordering::Acquire) {
+            // Lock the rq and re-check whether the task is still on it.
+            let cpuid = task.cpu.load(Ordering::Acquire);
+            let mut rq = cpu_rq(cpuid as _).lock_irq();
+
+            if !task.on_rq.load(Ordering::Acquire) {
+                // The task has just come off the rq. Put it back.
+                rq.put(task.clone());
+            } else {
+                // Task is already on the rq. Do nothing.
+                return;
+            }
+        } else {
+            // The task is not on any rq. Select one and put the task there.
+            let cpu = self.select_cpu_for_task(&task);
+            let mut rq = cpu_rq(cpu).lock_irq();
+            task.cpu.store(cpu as _, Ordering::Release);
+            rq.put(task.clone());
         }
     }
 
     pub fn spawn<S, R>(&self, runnable: R) -> JoinHandle<R::Output>
     where
         S: Stack + 'static,
-        R: PinRun + Contexted + Send + 'static,
+        R: Run + Contexted + Send + 'static,
         R::Output: Send + 'static,
     {
         let TaskHandle {
@@ -188,81 +200,44 @@ impl Scheduler {
         }
         eonix_preempt::enable();
     }
-
-    pub async fn yield_now() {
-        struct Yield(bool);
-
-        impl Future for Yield {
-            type Output = ();
-
-            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-                match *self {
-                    Yield(true) => Poll::Ready(()),
-                    Yield(false) => {
-                        self.set(Yield(true));
-                        cx.waker().wake_by_ref();
-                        Poll::Pending
-                    }
-                }
-            }
-        }
-
-        Yield(false).await
-    }
-
-    pub async fn sleep() {
-        struct Sleep(bool);
-
-        impl Future for Sleep {
-            type Output = ();
-
-            fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
-                match *self {
-                    Sleep(true) => Poll::Ready(()),
-                    Sleep(false) => {
-                        self.set(Sleep(true));
-                        Poll::Pending
-                    }
-                }
-            }
-        }
-
-        Sleep(false).await
-    }
 }
 
 extern "C" fn local_scheduler() -> ! {
     loop {
         assert_preempt_count_eq!(1, "Scheduler::idle_task");
+        let mut rq = local_rq().lock_irq();
+
         let previous_task = CURRENT_TASK
             .get()
             .map(|ptr| unsafe { Arc::from_raw(ptr.as_ptr()) });
-        let next_task = local_rq().lock().get();
+        let next_task = rq.get();
 
         match (previous_task, next_task) {
             (None, None) => {
                 // Nothing to do, halt the cpu and rerun the loop.
+                drop(rq);
                 arch::halt();
                 continue;
             }
             (None, Some(next)) => {
                 CURRENT_TASK.set(NonNull::new(Arc::into_raw(next) as *mut _));
             }
-            (Some(previous), None) if previous.is_runnable() => {
-                // Previous thread is `Running`, return to the current running thread.
-                println_trace!(
-                    "trace_scheduler",
-                    "Returning to task id({}) without doing context switch",
-                    previous.id
-                );
-
-                CURRENT_TASK.set(NonNull::new(Arc::into_raw(previous) as *mut _));
-            }
             (Some(previous), None) => {
-                // Nothing to do, halt the cpu and rerun the loop.
-                CURRENT_TASK.set(NonNull::new(Arc::into_raw(previous) as *mut _));
-                arch::halt();
-                continue;
+                if previous.state.is_running() {
+                    // Previous thread is `Running`, return to the current running thread.
+                    println_trace!(
+                        "trace_scheduler",
+                        "Returning to task id({}) without doing context switch",
+                        previous.id
+                    );
+                    CURRENT_TASK.set(NonNull::new(Arc::into_raw(previous) as *mut _));
+                } else {
+                    // Nothing to do, halt the cpu and rerun the loop.
+                    CURRENT_TASK.set(NonNull::new(Arc::into_raw(previous) as *mut _));
+                    drop(rq);
+                    arch::halt();
+                    continue;
+                }
             }
             (Some(previous), Some(next)) => {
                 println_trace!(
@@ -274,13 +249,9 @@ extern "C" fn local_scheduler() -> ! {
 
                 debug_assert_ne!(previous.id, next.id, "Switching to the same task");
 
-                let mut rq = local_rq().lock();
-                if previous.is_runnable() {
+                if previous.state.is_running() {
                     rq.put(previous);
                 } else {
-                    // TODO!!!!!!!!!: There is a race condition here if we reach here and there
-                    // is another thread waking the task up. They might read `on_rq` == true so
-                    // the task will never be waken up.
                     previous.on_rq.store(false, Ordering::Release);
                 }
 
@@ -288,9 +259,14 @@ extern "C" fn local_scheduler() -> ! {
             }
         }
 
+        drop(rq);
         // TODO: We can move the release of finished tasks to some worker thread.
         if let ExecuteStatus::Finished = Task::current().run() {
-            Scheduler::remove_task(&Task::current());
+            let current = CURRENT_TASK
+                .swap(None)
+                .map(|ptr| unsafe { Arc::from_raw(ptr.as_ptr()) })
+                .expect("Current task should be present");
+            Scheduler::remove_task(&current);
         }
     }
 }
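
The old `TODO!!!!!!!!!` about the wake/dequeue race is resolved by making the rq lock the arbiter: the scheduler clears `on_rq` only while holding the lock, and `activate` re-checks under that same lock. A simplified, runnable model of the double-check (single queue; orderings and the per-CPU bookkeeping are condensed here):

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

struct Task {
    on_rq: AtomicBool,
}

fn activate(rq: &Mutex<VecDeque<Arc<Task>>>, task: &Arc<Task>) {
    if task.on_rq.swap(true, Ordering::AcqRel) {
        // The flag was already set: the task is either still queued or the
        // scheduler is dequeuing it right now. Decide under the rq lock,
        // because the scheduler only clears `on_rq` while holding it.
        let mut rq = rq.lock().unwrap();
        if !task.on_rq.load(Ordering::Acquire) {
            // The task just came off the rq; put it back.
            task.on_rq.store(true, Ordering::Release);
            rq.push_back(task.clone());
        }
        // Otherwise it is still on the rq; nothing to do.
    } else {
        // The flag was clear: we own the enqueue.
        rq.lock().unwrap().push_back(task.clone());
    }
}

fn main() {
    let rq = Mutex::new(VecDeque::new());
    let task = Arc::new(Task { on_rq: AtomicBool::new(false) });
    activate(&rq, &task);
    assert_eq!(rq.lock().unwrap().len(), 1);
    activate(&rq, &task); // already on the rq: no double enqueue
    assert_eq!(rq.lock().unwrap().len(), 1);
}
```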

+ 44 - 61
crates/eonix_runtime/src/task.rs

@@ -1,10 +1,11 @@
 mod adapter;
 mod task_state;
+mod wait_list;
 
 use crate::{
     context::ExecutionContext,
     executor::{ExecuteStatus, Executor, ExecutorBuilder, OutputHandle, Stack},
-    run::{Contexted, PinRun},
+    run::{Contexted, Run},
     scheduler::Scheduler,
 };
 use alloc::{boxed::Box, sync::Arc, task::Wake};
@@ -19,12 +20,11 @@ use intrusive_collections::RBTreeAtomicLink;
 use task_state::TaskState;
 
 pub use adapter::TaskAdapter;
+pub use wait_list::TaskWait;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 pub struct TaskId(u32);
 
-pub struct UniqueWaker(Arc<Task>);
-
 pub struct TaskHandle<Output>
 where
     Output: Send,
@@ -34,17 +34,40 @@ where
 }
 
 /// A `Task` represents a schedulable unit.
+///
+/// ## Task Sleeping and Waking up
+///
+/// ### Waiters
+///
+/// lock => check condition: not met => save waker => set state `Sleeping` => unlock => return `Pending`
+///
+/// The executor then checks the task state:
+///
+/// - if sleeping => goto scheduler => take rq lock => the scheduler checks the state again:
+///   - if sleeping => set `on_rq = false`
+///   - if running => enqueue
+/// - if running => poll again
+///
+/// ### Wakers
+///
+/// lock => set condition: met => take waker => unlock => if there is a waker:
+///
+/// set state `Running` => swap `on_rq` to `true` => take rq lock => re-check `on_rq` => if now `false`, enqueue
 pub struct Task {
     /// Unique identifier of the task.
     pub id: TaskId,
-    /// Whether the task is on some run queue.
+    /// Whether the task is on some run queue (a.k.a. ready).
     pub(crate) on_rq: AtomicBool,
+    /// The last cpu that the task was executed on.
+    /// If `on_rq` is `false`, we can't assume that this task is still on the cpu.
+    pub(crate) cpu: AtomicU32,
+    /// Task state.
+    pub(crate) state: TaskState,
     /// Task execution context.
     pub(crate) execution_context: ExecutionContext,
     /// Executor object.
     executor: AtomicUniqueRefCell<Option<Pin<Box<dyn Executor>>>>,
-    /// Task state.
-    state: TaskState,
     /// Link in the global task list.
     link_task_list: RBTreeAtomicLink,
 }
@@ -62,7 +85,7 @@ impl Task {
     pub fn new<S, R>(runnable: R) -> TaskHandle<R::Output>
     where
         S: Stack + 'static,
-        R: PinRun + Contexted + Send + 'static,
+        R: Run + Contexted + Send + 'static,
         R::Output: Send + 'static,
     {
         static ID: AtomicU32 = AtomicU32::new(0);
@@ -75,9 +98,10 @@ impl Task {
         let task = Arc::new(Self {
             id: TaskId(ID.fetch_add(1, Ordering::Relaxed)),
             on_rq: AtomicBool::new(false),
+            cpu: AtomicU32::new(0),
+            state: TaskState::new(TaskState::RUNNING),
             executor: AtomicUniqueRefCell::new(Some(executor)),
             execution_context,
-            state: TaskState::new(TaskState::RUNNING),
             link_task_list: RBTreeAtomicLink::new(),
         });
 
@@ -87,41 +111,6 @@ impl Task {
         }
     }
 
-    pub fn is_runnable(&self) -> bool {
-        self.state.is_runnable()
-    }
-
-    pub(super) fn set_usleep(&self) {
-        let prev_state = self.state.swap(TaskState::USLEEP);
-        assert_eq!(
-            prev_state,
-            TaskState::RUNNING,
-            "Trying to set task {} usleep that is not running",
-            self.id.0
-        );
-    }
-
-    pub fn usleep(self: &Arc<Self>) -> Arc<UniqueWaker> {
-        // No need to dequeue. We have proved that the task is running so not in the queue.
-        self.set_usleep();
-
-        Arc::new(UniqueWaker(self.clone()))
-    }
-
-    pub fn isleep(self: &Arc<Self>) -> Arc<Self> {
-        // No need to dequeue. We have proved that the task is running so not in the queue.
-        let prev_state = self.state.cmpxchg(TaskState::RUNNING, TaskState::ISLEEP);
-
-        assert_eq!(
-            prev_state,
-            TaskState::RUNNING,
-            "Trying to sleep task {} that is not running",
-            self.id.0
-        );
-
-        self.clone()
-    }
-
     pub fn run(&self) -> ExecuteStatus {
         let mut executor_borrow = self.executor.borrow();
 
@@ -133,39 +122,33 @@ impl Task {
 
         if let ExecuteStatus::Finished = executor.progress() {
             executor_borrow.take();
-            self.set_usleep();
             ExecuteStatus::Finished
         } else {
             ExecuteStatus::Executing
         }
     }
-}
 
-impl Wake for Task {
-    fn wake(self: Arc<Self>) {
-        self.wake_by_ref();
-    }
-
-    fn wake_by_ref(self: &Arc<Self>) {
-        match self.state.cmpxchg(TaskState::ISLEEP, TaskState::RUNNING) {
-            TaskState::RUNNING | TaskState::USLEEP => return,
-            TaskState::ISLEEP => Scheduler::get().activate(self),
-            state => panic!("Invalid transition from state {:?} to `Running`", state),
-        }
+    /// Temporary solution.
+    pub unsafe fn sleep(&self) {
+        self.state.swap(TaskState::SLEEPING);
     }
 }
 
-impl Wake for UniqueWaker {
+impl Wake for Task {
     fn wake(self: Arc<Self>) {
         self.wake_by_ref();
     }
 
     fn wake_by_ref(self: &Arc<Self>) {
-        let Self(task) = &**self;
+        // TODO: Check the fast path where we're waking up current.
 
-        let prev_state = task.state.swap(TaskState::RUNNING);
-        assert_eq!(prev_state, TaskState::USLEEP);
+        // SAFETY: All the operations below should happen after we've read the sleeping state.
+        let old_state = self.state.swap(TaskState::RUNNING);
+        if old_state != TaskState::SLEEPING {
+            return;
+        }
 
-        Scheduler::get().activate(task);
+        // If we get here, we should be the only one waking up the task.
+        Scheduler::get().activate(self);
     }
 }
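
The doc comment above reduces the waker side to a single state swap, and that swap is what makes `UniqueWaker` unnecessary: out of any number of concurrent wakers, exactly one observes `SLEEPING` and is therefore the one that calls `activate`. A small thread-based demonstration of that property (toy constants in place of `TaskState`):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

const RUNNING: u32 = 0;
const SLEEPING: u32 = 1;

fn main() {
    let state = Arc::new(AtomicU32::new(SLEEPING));

    // Eight racing wakers, each doing the same swap as `wake_by_ref`.
    let wakers: Vec<_> = (0..8)
        .map(|_| {
            let state = state.clone();
            thread::spawn(move || state.swap(RUNNING, Ordering::AcqRel) == SLEEPING)
        })
        .collect();

    let activations = wakers
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .filter(|&won| won)
        .count();

    assert_eq!(activations, 1); // the task is re-enqueued exactly once
}
```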

+ 1 - 2
crates/eonix_runtime/src/task/adapter.rs

@@ -1,8 +1,7 @@
+use super::{Task, TaskId};
 use alloc::sync::Arc;
 use intrusive_collections::{intrusive_adapter, KeyAdapter, RBTreeAtomicLink};
 
-use super::{Task, TaskId};
-
 intrusive_adapter!(pub TaskAdapter = Arc<Task>: Task { link_task_list: RBTreeAtomicLink });
 
 impl<'a> KeyAdapter<'a> for TaskAdapter {

+ 4 - 11
crates/eonix_runtime/src/task/task_state.rs

@@ -5,24 +5,17 @@ pub struct TaskState(AtomicU32);
 
 impl TaskState {
     pub const RUNNING: u32 = 0;
-    pub const ISLEEP: u32 = 1;
-    pub const USLEEP: u32 = 2;
+    pub const SLEEPING: u32 = 1;
 
-    pub const fn new(state: u32) -> Self {
+    pub(crate) const fn new(state: u32) -> Self {
         Self(AtomicU32::new(state))
     }
 
-    pub fn swap(&self, state: u32) -> u32 {
+    pub(crate) fn swap(&self, state: u32) -> u32 {
         self.0.swap(state, Ordering::AcqRel)
     }
 
-    pub fn cmpxchg(&self, current: u32, new: u32) -> u32 {
-        self.0
-            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Relaxed)
-            .unwrap_or_else(|x| x)
-    }
-
-    pub fn is_runnable(&self) -> bool {
+    pub(crate) fn is_running(&self) -> bool {
         self.0.load(Ordering::Acquire) == Self::RUNNING
     }
 }

+ 67 - 0
crates/eonix_runtime/src/task/wait_list.rs

@@ -0,0 +1,67 @@
+use super::task_state::TaskState;
+use crate::task::Task;
+use alloc::collections::vec_deque::VecDeque;
+use core::{fmt, task::Waker};
+use eonix_preempt::assert_preempt_enabled;
+use eonix_sync::{sleep, Spin, UnlockableGuard, UnlockedGuard, WaitList};
+
+pub struct TaskWait {
+    waiters: Spin<VecDeque<Waker>>,
+}
+
+impl TaskWait {
+    pub const fn new() -> Self {
+        Self {
+            waiters: Spin::new(VecDeque::new()),
+        }
+    }
+
+    fn wake(waker: &Waker) {
+        waker.wake_by_ref();
+    }
+}
+
+impl WaitList for TaskWait {
+    fn has_waiters(&self) -> bool {
+        !self.waiters.lock().is_empty()
+    }
+
+    fn notify_one(&self) -> bool {
+        self.waiters
+            .lock()
+            .pop_front()
+            .inspect(Self::wake)
+            .is_some()
+    }
+
+    fn notify_all(&self) -> usize {
+        self.waiters.lock().drain(..).inspect(Self::wake).count()
+    }
+
+    fn wait<G>(&self, guard: G) -> impl Future<Output = G> + Send
+    where
+        Self: Sized,
+        G: UnlockableGuard,
+        G::Unlocked: Send,
+    {
+        let waker = Waker::from(Task::current().clone());
+        self.waiters.lock().push_back(waker);
+
+        Task::current().state.swap(TaskState::SLEEPING);
+
+        let unlocked_guard = guard.unlock();
+        assert_preempt_enabled!("TaskWait::wait()");
+
+        async {
+            sleep().await;
+
+            unlocked_guard.relock()
+        }
+    }
+}
+
+impl fmt::Debug for TaskWait {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("WaitList").finish()
+    }
+}
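
`TaskWait::wait` is careful to enqueue the waker and set `Sleeping` *before* unlocking the guard, so a notification that races with going to sleep cannot be lost. The same discipline, modeled with threads: `Thread` handles stand in for `Waker`s and park/unpark for the scheduler (an unpark delivered before the park is remembered, just as a wake delivered before the final `Pending` is); all names here are illustrative.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::{self, Thread};

struct TaskWait {
    waiters: Mutex<VecDeque<Thread>>,
}

impl TaskWait {
    fn new() -> Self {
        Self { waiters: Mutex::new(VecDeque::new()) }
    }

    fn notify_one(&self) -> bool {
        match self.waiters.lock().unwrap().pop_front() {
            Some(thread) => {
                thread.unpark();
                true
            }
            None => false,
        }
    }

    // Mirrors `WaitList::wait`: register first, then unlock, sleep, relock.
    fn wait<'a, T>(&self, guard: MutexGuard<'a, T>, lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
        self.waiters.lock().unwrap().push_back(thread::current());
        drop(guard); // release the condition lock only after registering
        thread::park(); // "sleep" until someone unparks us
        lock.lock().unwrap() // relock, like `UnlockedGuard::relock`
    }
}

fn main() {
    let ready = Arc::new(Mutex::new(false));
    let wait = Arc::new(TaskWait::new());

    let waiter = {
        let (ready, wait) = (ready.clone(), wait.clone());
        thread::spawn(move || {
            let mut guard = ready.lock().unwrap();
            while !*guard {
                guard = wait.wait(guard, &ready);
            }
        })
    };

    // The lock serializes us against the waiter: either it registered before
    // we set the condition (notify finds it), or it sees `true` and never sleeps.
    *ready.lock().unwrap() = true;
    wait.notify_one();
    waiter.join().unwrap();
}
```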

+ 13 - 0
crates/eonix_sync/src/guard.rs

@@ -25,4 +25,17 @@ pub trait ForceUnlockableGuard {
     /// # Safety
     /// Calling this function twice on a force unlocked guard will cause deadlocks.
     unsafe fn force_relock(&mut self);
+
+    fn do_unlocked(&mut self, f: impl FnOnce())
+    where
+        Self: Sized,
+    {
+        // SAFETY: We unlock the lock before calling the function and relock it after
+        // calling the function. So we will end up with the lock being held again.
+        unsafe {
+            self.force_unlock();
+            f();
+            self.force_relock();
+        }
+    }
 }
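
`do_unlocked` is a safe convenience wrapper over the unsafe `force_unlock`/`force_relock` pair: the caller keeps the guard, but the lock is released for the duration of the closure. A toy spinlock showing the pattern end to end (illustrative only; the real trait lives in `eonix_sync`):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Spin {
    locked: AtomicBool,
}

impl Spin {
    const fn new() -> Self {
        Spin { locked: AtomicBool::new(false) }
    }

    fn lock(&self) -> Guard<'_> {
        while self.locked.swap(true, Ordering::Acquire) {}
        Guard { lock: self }
    }
}

struct Guard<'a> {
    lock: &'a Spin,
}

impl Guard<'_> {
    // Release the lock, run the closure, then take the lock again, so the
    // caller still holds a valid guard afterwards.
    fn do_unlocked(&mut self, f: impl FnOnce()) {
        self.lock.locked.store(false, Ordering::Release); // force_unlock
        f(); // the lock is NOT held in here
        while self.lock.locked.swap(true, Ordering::Acquire) {} // force_relock
    }
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.lock.locked.store(false, Ordering::Release);
    }
}

fn main() {
    let lock = Spin::new();
    let mut guard = lock.lock();
    guard.do_unlocked(|| {
        // Safe to block or take other locks here without holding `lock`.
        assert!(!lock.locked.load(Ordering::Relaxed));
    });
    // The guard holds the lock again here.
    assert!(lock.locked.load(Ordering::Relaxed));
}
```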

+ 4 - 0
crates/eonix_sync/src/lib.rs

@@ -7,6 +7,7 @@ pub mod marker;
 mod mutex;
 mod rwlock;
 mod spin;
+mod wait_list;
 
 pub use guard::{ForceUnlockableGuard, UnlockableGuard, UnlockedGuard};
 pub use lazy_lock::LazyLock;
@@ -14,3 +15,6 @@ pub use locked::{AsProof, AsProofMut, Locked, Proof, ProofMut};
 pub use mutex::{Mutex, MutexGuard, Wait as MutexWait};
 pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard, Wait as RwLockWait};
 pub use spin::{LoopRelax, Relax, Spin, SpinGuard, SpinRelax, UnlockedSpinGuard};
+pub use wait_list::{sleep, yield_now, WaitList};
+
+extern crate alloc;

+ 41 - 0
crates/eonix_sync/src/wait_list.rs

@@ -0,0 +1,41 @@
+use crate::UnlockableGuard;
+use core::{future::poll_fn, task::Poll};
+
+pub trait WaitList {
+    fn has_waiters(&self) -> bool;
+    fn notify_one(&self) -> bool;
+    fn notify_all(&self) -> usize;
+
+    fn wait<G>(&self, guard: G) -> impl Future<Output = G> + Send
+    where
+        Self: Sized,
+        G: UnlockableGuard,
+        G::Unlocked: Send;
+}
+
+pub async fn yield_now() {
+    let mut yielded = false;
+    poll_fn(|ctx| {
+        if yielded {
+            Poll::Ready(())
+        } else {
+            yielded = true;
+            ctx.waker().wake_by_ref();
+            Poll::Pending
+        }
+    })
+    .await;
+}
+
+pub async fn sleep() {
+    let mut slept = false;
+    poll_fn(|_| {
+        if slept {
+            Poll::Ready(())
+        } else {
+            slept = true;
+            Poll::Pending
+        }
+    })
+    .await;
+}
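
The difference between these two helpers is the whole point: `yield_now` wakes itself before returning `Pending`, so the task stays runnable and gets polled again; `sleep` returns `Pending` without waking anyone, relying on a waker registered beforehand (e.g. by `TaskWait::wait`). A runnable check of `yield_now` using a toy spin-polling `block_on` (the executor here is invented for the demo):

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::task::{Context, Poll, Waker};

async fn yield_now() {
    let mut yielded = false;
    poll_fn(|cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            // Wake ourselves so the executor knows to poll again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await;
}

// A toy executor that just polls in a loop. Real executors only re-poll
// after the waker fires, which is why `sleep` (which never self-wakes)
// parks the task until someone else wakes it.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let mut cx = Context::from_waker(Waker::noop());
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            break out;
        }
    }
}

fn main() {
    block_on(async {
        yield_now().await; // resolves on the second poll
    });
}
```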

+ 7 - 7
src/driver/serial.rs

@@ -5,12 +5,12 @@ use crate::{
         task::KernelStack, CharDevice, CharDeviceType, Terminal, TerminalDevice,
     },
     prelude::*,
-    sync::UCondVar,
 };
 use alloc::{collections::vec_deque::VecDeque, format, sync::Arc};
 use bitflags::bitflags;
-use eonix_runtime::{run::FutureRun, scheduler::Scheduler};
+use eonix_runtime::{run::FutureRun, scheduler::Scheduler, task::TaskWait};
 use eonix_spin_irq::SpinIrq as _;
+use eonix_sync::{yield_now, WaitList as _};
 
 bitflags! {
     struct LineStatus: u8 {
@@ -25,7 +25,7 @@ struct Serial {
     name: Arc<str>,
 
     terminal: Spin<Option<Arc<Terminal>>>,
-    cv_worker: UCondVar,
+    worker_wait: TaskWait,
 
     working: Spin<bool>,
     tx_buffer: Spin<VecDeque<u8>>,
@@ -67,7 +67,7 @@ impl Serial {
             self.enable_interrupts();
             *working = false;
 
-            self.cv_worker.async_wait(working)
+            self.worker_wait.wait(working)
         }
         .await;
 
@@ -106,7 +106,7 @@ impl Serial {
             if should_wait {
                 port.wait_for_interrupt().await;
             } else {
-                Scheduler::yield_now().await;
+                yield_now().await;
             }
         }
     }
@@ -116,7 +116,7 @@ impl Serial {
             id,
             name: Arc::from(format!("ttyS{id}")),
             terminal: Spin::new(None),
-            cv_worker: UCondVar::new(),
+            worker_wait: TaskWait::new(),
             working: Spin::new(true),
             tx_buffer: Spin::new(VecDeque::new()),
             tx_rx: Port8::new(base_port),
@@ -150,7 +150,7 @@ impl Serial {
     fn wakeup_worker(&self) {
         let working = self.working.lock_irq();
         if !*working {
-            self.cv_worker.notify_all();
+            self.worker_wait.notify_one();
         }
     }
 

+ 9 - 6
src/kernel/task/signal.rs

@@ -263,7 +263,7 @@ impl SignalListInner {
                 self.stop_waker.take().map(|waker| waker.wake());
             }
             _ => {
-                // If we don't have a waker here, we might be at initialization step.
+                // If we don't have a waker here, we are not permitted to be woken up.
                 // We would run in the end anyway.
                 self.signal_waker
                     .as_ref()
@@ -328,11 +328,9 @@ impl SignalList {
             .unwrap_or_else(SignalAction::default_action)
     }
 
-    // TODO!!!: Find a better way.
-    pub fn set_signal_waker(&self, waker: Waker) {
+    pub fn set_signal_waker(&self, waker: Option<Waker>) {
         let mut inner = self.inner.lock();
-        let old_waker = inner.signal_waker.replace(waker);
-        assert!(old_waker.is_none(), "We should not have a waker here");
+        inner.signal_waker = waker;
     }
 
     /// Clear all signals except for `SIG_IGN`.
@@ -423,7 +421,12 @@ impl SignalList {
                     // SAFETY: Preempt disabled above.
                     {
                         let mut inner = self.inner.lock();
-                        let waker = Waker::from(Task::current().usleep());
+                        let waker = Waker::from(Task::current().clone());
+
+                        unsafe {
+                            Task::current().sleep();
+                        }
+
                         let old_waker = inner.stop_waker.replace(waker);
                         assert!(old_waker.is_none(), "We should not have a waker here");
                     }

+ 6 - 4
src/kernel/task/thread.rs

@@ -23,7 +23,7 @@ use core::{
 };
 use eonix_runtime::{
     context::ExecutionContext,
-    run::{Contexted, PinRun, RunState},
+    run::{Contexted, Run, RunState},
 };
 use eonix_sync::AsProofMut as _;
 use pointers::BorrowedArc;
@@ -360,16 +360,18 @@ impl Contexted for ThreadRunnable {
     }
 }
 
-impl PinRun for ThreadRunnable {
+impl Run for ThreadRunnable {
     type Output = ();
 
-    fn pinned_run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output> {
+    fn run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output> {
         let mut task_context = ExecutionContext::new();
         task_context.set_interrupt(false);
         task_context.set_ip(_arch_fork_return as _);
         task_context.set_sp(&self.interrupt_context as *const _ as _);
 
-        self.thread.signal_list.set_signal_waker(waker.clone());
+        self.thread
+            .signal_list
+            .set_signal_waker(Some(waker.clone()));
 
         eonix_preempt::disable();
 

+ 20 - 44
src/sync/condvar.rs

@@ -1,9 +1,9 @@
-use crate::prelude::*;
+use crate::{kernel::task::Thread, prelude::*};
 use alloc::collections::vec_deque::VecDeque;
 use core::{future::Future, task::Waker};
 use eonix_preempt::{assert_preempt_count_eq, assert_preempt_enabled};
 use eonix_runtime::{scheduler::Scheduler, task::Task};
-use eonix_sync::{ForceUnlockableGuard, UnlockableGuard, UnlockedGuard as _};
+use eonix_sync::{sleep, ForceUnlockableGuard, UnlockableGuard, UnlockedGuard as _};
 
 pub struct CondVar<const INTERRUPTIBLE: bool> {
     waiters: Spin<VecDeque<Waker>>,
@@ -34,18 +34,6 @@ impl<const I: bool> CondVar<I> {
         waker.wake();
     }
 
-    fn sleep() -> Waker {
-        let task = Task::current();
-
-        let waker = if I {
-            Waker::from(task.isleep())
-        } else {
-            Waker::from(task.usleep())
-        };
-
-        waker
-    }
-
     pub fn notify_one(&self) {
         if let Some(waker) = self.waiters.lock().pop_front() {
             Self::wake(waker);
@@ -64,8 +52,19 @@ impl<const I: bool> CondVar<I> {
     /// This function **might sleep**, so call it in a preemptible context.
     pub fn wait(&self, guard: &mut impl ForceUnlockableGuard) {
         eonix_preempt::disable();
-        let waker = Self::sleep();
-        self.waiters.lock().push_back(waker);
+        let waker = Waker::from(Task::current().clone());
+        if I {
+            // Prohibit the thread from being woken up by a signal.
+            Thread::current()
+                .signal_list
+                .set_signal_waker(Some(waker.clone()));
+        }
+
+        self.waiters.lock().push_back(waker.clone());
+
+        unsafe {
+            Task::current().sleep();
+        }
 
         // TODO!!!: Another way to do this:
         //
@@ -78,34 +77,11 @@ impl<const I: bool> CondVar<I> {
         assert_preempt_count_eq!(1, "CondVar::wait");
         Scheduler::schedule();
 
-        unsafe { guard.force_relock() };
-
-        assert!(Task::current().is_runnable());
-    }
-
-    /// Unlock the `guard`. Then wait until being waken up. Relock the `guard` and return it.
-    pub fn async_wait<G>(&self, guard: G) -> impl Future<Output = G> + Send
-    where
-        G: UnlockableGuard,
-        G::Unlocked: Send,
-    {
-        let waker = Self::sleep();
-        self.waiters.lock().push_back(waker);
-
-        // TODO!!!: Another way to do this:
-        //
-        // Store a flag in our entry in the waiting list.
-        // Check the flag before doing `schedule()` but after we've unlocked the `guard`.
-        // If the flag is already set, we don't need to sleep.
-
-        let guard = guard.unlock();
-        assert_preempt_enabled!("CondVar::async_wait");
-
-        async {
-            Scheduler::sleep().await;
-
-            assert!(Task::current().is_runnable());
-            guard.relock()
+        if I {
+            // Allow the thread to be woken up by a signal again.
+            Thread::current().signal_list.set_signal_waker(None);
         }
+
+        unsafe { guard.force_relock() };
     }
 }