
wait_list: improved waitlist arch

greatbridf committed 9 months ago
commit 689a147b35

+ 1 - 0
Cargo.lock

@@ -129,6 +129,7 @@ name = "eonix_sync"
 version = "0.1.0"
 dependencies = [
  "eonix_preempt",
+ "intrusive-collections",
 ]
 
 [[package]]

+ 1 - 1
Cargo.toml

@@ -49,7 +49,7 @@ opt-level = 0
 opt-level = 0
 
 [profile.dev.package."*"]
-opt-level = 2
+opt-level = "s"
 
 [profile.dev.build-override]
 opt-level = 0

+ 1 - 12
crates/eonix_runtime/src/executor.rs

@@ -61,18 +61,7 @@ where
 
                 match pinned_runnable.as_mut().run(&waker) {
                     RunState::Finished(output) => break output,
-                    RunState::Running => {
-                        eonix_preempt::disable();
-
-                        if !Task::current().state.is_running() {
-                            unsafe {
-                                // SAFETY: Preemption is disabled.
-                                Scheduler::goto_scheduler(&Task::current().execution_context)
-                            };
-                        }
-
-                        eonix_preempt::enable();
-                    }
+                    RunState::Running => Task::park(),
                 }
             };
 

+ 4 - 1
crates/eonix_runtime/src/scheduler.rs

@@ -140,6 +140,9 @@ impl Scheduler {
     }
 
     pub fn activate(&self, task: &Arc<Task>) {
+        // Only one CPU may be activating a given task at any time.
+        // TODO: Add some checks.
+
         if task.on_rq.swap(true, Ordering::Acquire) {
             // Lock the rq and check whether the task is on the rq again.
             let cpuid = task.cpu.load(Ordering::Acquire);
@@ -249,7 +252,7 @@ extern "C" fn local_scheduler() -> ! {
 
                 debug_assert_ne!(previous.id, next.id, "Switching to the same task");
 
-                if previous.state.is_running() {
+                if previous.state.is_running() || !previous.state.try_park() {
                     rq.put(previous);
                 } else {
                     previous.on_rq.store(false, Ordering::Release);
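
The second hunk above is where a racing `unpark` is caught: between a task swapping its state to PARKING and the scheduler taking the run-queue lock, an `unpark` on another CPU may have swapped the state back to RUNNING. `try_park()` (added in task_state.rs below) performs the PARKING -> PARKED compare-exchange, and on failure the scheduler keeps the task on the run queue. A runnable toy of that interleaving, with a plain atomic standing in for the task state:

    use std::sync::atomic::{AtomicU32, Ordering};

    const RUNNING: u32 = 0;
    const PARKING: u32 = 1;
    const PARKED: u32 = 2;

    fn main() {
        let state = AtomicU32::new(RUNNING);

        // Task side: announce the park and head for the scheduler.
        assert_eq!(state.swap(PARKING, Ordering::AcqRel), RUNNING);

        // unpark() on another CPU wins the race before the scheduler runs.
        state.swap(RUNNING, Ordering::AcqRel);

        // Scheduler side: try_park()'s compare-exchange fails with RUNNING,
        // so `rq.put(previous)` re-enqueues the task instead of parking it.
        let parked = state
            .compare_exchange(PARKING, PARKED, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        assert!(!parked);
    }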

+ 67 - 42
crates/eonix_runtime/src/task.rs

@@ -1,6 +1,5 @@
 mod adapter;
 mod task_state;
-mod wait_list;
 
 use crate::{
     context::ExecutionContext,
@@ -13,14 +12,12 @@ use atomic_unique_refcell::AtomicUniqueRefCell;
 use core::{
     pin::Pin,
     sync::atomic::{AtomicBool, AtomicU32, Ordering},
-    task::Waker,
 };
 use eonix_sync::Spin;
 use intrusive_collections::RBTreeAtomicLink;
 use task_state::TaskState;
 
 pub use adapter::TaskAdapter;
-pub use wait_list::TaskWait;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 pub struct TaskId(u32);
@@ -35,30 +32,30 @@ where
 
 /// A `Task` represents a schedulable unit.
 ///
-/// ## Task Sleeping and Waking up
+/// ## Parking and Unparking
+///
+/// Initial: state = Running, unparked = false
 ///
-/// ### Waiters
+/// Task::park() => swap state <- Parking, assert prev == Running
+///              => swap unparked <- false
+///              -> true => store state <- Running => return
+///              -> false => goto scheduler => get rq lock => load state
+///                                                        -> Running => enqueue
+///                                                        -> Parking => cmpxchg Parking -> Parked
+///                                                                   -> Running => enqueue
+///                                                                   -> Parking => on_rq <- false
+///                                                                   -> Parked => ???
 ///
-/// lock => check condition no => save waker => set state sleep => unlock => return pending
-///
-/// executor check state -> if sleeping => goto scheduler => get rq lock => scheduler check state
-///
-///                                                                      -> if sleeping => on_rq = false
-///
-///                                                                      -> if running => enqueue
-///
-///                      -> if running => poll again
-///
-/// ### Wakers
-///
-/// lock => set condition yes => get waker => unlock => if has waker
-///
-/// set state running => swap on_rq true => get rq lock => check on_rq true again => if false enqueue
+/// Task::unpark() => swap unparked <- true
+///                -> true => return
+///                -> false => swap state <- Running
+///                         -> Running => return
+///                         -> Parking | Parked => Scheduler::activate
 pub struct Task {
     /// Unique identifier of the task.
     pub id: TaskId,
     /// Whether the task is on some run queue (a.k.a ready).
     pub(crate) on_rq: AtomicBool,
+    /// Whether someone has called `unpark` on this task.
+    pub(crate) unparked: AtomicBool,
     /// The last cpu that the task was executed on.
     /// If `on_rq` is `false`, we can't assume that this task is still on the cpu.
     pub(crate) cpu: AtomicU32,
@@ -72,15 +69,6 @@ pub struct Task {
     link_task_list: RBTreeAtomicLink,
 }
 
-impl<Output> TaskHandle<Output>
-where
-    Output: Send,
-{
-    pub fn waker(&self) -> Waker {
-        Waker::from(self.task.clone())
-    }
-}
-
 impl Task {
     pub fn new<S, R>(runnable: R) -> TaskHandle<R::Output>
     where
@@ -98,6 +86,7 @@ impl Task {
         let task = Arc::new(Self {
             id: TaskId(ID.fetch_add(1, Ordering::Relaxed)),
             on_rq: AtomicBool::new(false),
+            unparked: AtomicBool::new(false),
             cpu: AtomicU32::new(0),
             state: TaskState::new(TaskState::RUNNING),
             executor: AtomicUniqueRefCell::new(Some(executor)),
@@ -128,9 +117,54 @@ impl Task {
         }
     }
 
-    /// Temporary solution.
-    pub unsafe fn sleep(&self) {
-        self.state.swap(TaskState::SLEEPING);
+    pub fn unpark(self: &Arc<Self>) {
+        if self.unparked.swap(true, Ordering::Release) {
+            return;
+        }
+
+        match self.state.swap(TaskState::RUNNING) {
+            TaskState::RUNNING => return,
+            TaskState::PARKED | TaskState::PARKING => {
+                // The task is parked, or is still in the middle of parking.
+                // Either way, hand it to the scheduler to be re-enqueued.
+                Scheduler::get().activate(self);
+            }
+            _ => unreachable!(),
+        }
+    }
+
+    pub fn park() {
+        eonix_preempt::disable();
+        Self::park_preempt_disabled();
+    }
+
+    /// Park the current task with `preempt::count() == 1`.
+    pub fn park_preempt_disabled() {
+        let task = Task::current();
+
+        let old_state = task.state.swap(TaskState::PARKING);
+        assert_eq!(
+            old_state,
+            TaskState::RUNNING,
+            "Parking a task that is not running."
+        );
+
+        if task.unparked.swap(false, Ordering::Release) {
+            // Someone has called `unpark` on this task previously.
+            let old_state = task.state.swap(TaskState::RUNNING);
+            assert_eq!(
+                old_state,
+                TaskState::PARKING,
+                "We should have just swapped the state to RUNNING."
+            );
+        } else {
+            unsafe {
+                // SAFETY: Preemption is disabled.
+                Scheduler::goto_scheduler(&Task::current().execution_context)
+            };
+        }
+
+        eonix_preempt::enable();
     }
 }
 
@@ -140,15 +174,6 @@ impl Wake for Task {
     }
 
     fn wake_by_ref(self: &Arc<Self>) {
-        // TODO: Check the fast path where we're waking up current.
-
-        // SAFETY: All the operations below should happen after we've read the sleeping state.
-        let old_state = self.state.swap(TaskState::RUNNING);
-        if old_state != TaskState::SLEEPING {
-            return;
-        }
-
-        // If we get here, we should be the only one waking up the task.
-        Scheduler::get().activate(self);
+        self.unpark();
     }
 }
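
The doc comment above is the heart of the change: `park`/`unpark` follow the same single-token discipline as `std::thread::park`/`Thread::unpark`, with the `unparked` flag as the token. A self-contained sketch of the handshake under that reading, with toy types and the scheduler calls stubbed out as comments:

    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

    const RUNNING: u32 = 0;
    const PARKING: u32 = 1;

    struct ToyTask {
        state: AtomicU32,
        unparked: AtomicBool,
    }

    impl ToyTask {
        // Mirrors Task::park(): publish the intent to park, then consume any
        // pending token; only go to the scheduler when there is none.
        fn park(&self) {
            let prev = self.state.swap(PARKING, Ordering::AcqRel);
            assert_eq!(prev, RUNNING, "parking a task that is not running");

            if self.unparked.swap(false, Ordering::AcqRel) {
                // unpark() already posted a token: cancel the park.
                self.state.store(RUNNING, Ordering::Release);
                return;
            }
            // Real code: Scheduler::goto_scheduler(...), after which try_park()
            // completes the PARKING -> PARKED transition (see task_state.rs).
        }

        // Mirrors Task::unpark(): post the token first, then wake the task if
        // it had already begun parking.
        fn unpark(&self) {
            if self.unparked.swap(true, Ordering::AcqRel) {
                return; // A token was already pending; the parker will see it.
            }
            if self.state.swap(RUNNING, Ordering::AcqRel) != RUNNING {
                // Real code: Scheduler::get().activate(self) re-enqueues it.
            }
        }
    }

    fn main() {
        let task = ToyTask {
            state: AtomicU32::new(RUNNING),
            unparked: AtomicBool::new(false),
        };
        task.unpark(); // The token is posted before the park...
        task.park();   // ...so the park consumes it and returns immediately.
        assert_eq!(task.state.load(Ordering::Acquire), RUNNING);
    }

Because the token is posted before the state swap in `unpark`, and consumed after the swap to PARKING in `park`, a wakeup that lands anywhere around the park is never lost: at worst the park degenerates into a no-op.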

+ 15 - 1
crates/eonix_runtime/src/task/task_state.rs

@@ -5,7 +5,8 @@ pub struct TaskState(AtomicU32);
 
 impl TaskState {
     pub const RUNNING: u32 = 0;
-    pub const SLEEPING: u32 = 1;
+    pub const PARKING: u32 = 1;
+    pub const PARKED: u32 = 2;
 
     pub(crate) const fn new(state: u32) -> Self {
         Self(AtomicU32::new(state))
@@ -15,6 +16,19 @@ impl TaskState {
         self.0.swap(state, Ordering::AcqRel)
     }
 
+    pub(crate) fn try_park(&self) -> bool {
+        match self.0.compare_exchange(
+            TaskState::PARKING,
+            TaskState::PARKED,
+            Ordering::AcqRel,
+            Ordering::Acquire,
+        ) {
+            Ok(_) => true,
+            Err(TaskState::RUNNING) => false,
+            Err(_) => unreachable!("Invalid task state while trying to park."),
+        }
+    }
+
     pub(crate) fn is_running(&self) -> bool {
         self.0.load(Ordering::Acquire) == Self::RUNNING
     }

+ 0 - 67
crates/eonix_runtime/src/task/wait_list.rs

@@ -1,67 +0,0 @@
-use super::task_state::TaskState;
-use crate::task::Task;
-use alloc::collections::vec_deque::VecDeque;
-use core::{fmt, task::Waker};
-use eonix_preempt::assert_preempt_enabled;
-use eonix_sync::{sleep, Spin, UnlockableGuard, UnlockedGuard, WaitList};
-
-pub struct TaskWait {
-    waiters: Spin<VecDeque<Waker>>,
-}
-
-impl TaskWait {
-    pub const fn new() -> Self {
-        Self {
-            waiters: Spin::new(VecDeque::new()),
-        }
-    }
-
-    fn wake(waker: &Waker) {
-        waker.wake_by_ref();
-    }
-}
-
-impl WaitList for TaskWait {
-    fn has_waiters(&self) -> bool {
-        !self.waiters.lock().is_empty()
-    }
-
-    fn notify_one(&self) -> bool {
-        self.waiters
-            .lock()
-            .pop_front()
-            .inspect(Self::wake)
-            .is_some()
-    }
-
-    fn notify_all(&self) -> usize {
-        self.waiters.lock().drain(..).inspect(Self::wake).count()
-    }
-
-    fn wait<G>(&self, guard: G) -> impl Future<Output = G> + Send
-    where
-        Self: Sized,
-        G: UnlockableGuard,
-        G::Unlocked: Send,
-    {
-        let waker = Waker::from(Task::current().clone());
-        self.waiters.lock().push_back(waker);
-
-        Task::current().state.swap(TaskState::SLEEPING);
-
-        let unlocked_guard = guard.unlock();
-        assert_preempt_enabled!("TaskWait::wait()");
-
-        async {
-            sleep().await;
-
-            unlocked_guard.relock()
-        }
-    }
-}
-
-impl fmt::Debug for TaskWait {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("WaitList").finish()
-    }
-}

+ 1 - 0
crates/eonix_sync/Cargo.toml

@@ -5,6 +5,7 @@ edition = "2024"
 
 [dependencies]
 eonix_preempt = { path = "../eonix_preempt" }
+intrusive-collections = "0.9.7"
 
 [features]
 default = []

+ 1 - 1
crates/eonix_sync/src/lib.rs

@@ -15,6 +15,6 @@ pub use locked::{AsProof, AsProofMut, Locked, Proof, ProofMut};
 pub use mutex::{Mutex, MutexGuard, Wait as MutexWait};
 pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard, Wait as RwLockWait};
 pub use spin::{LoopRelax, Relax, Spin, SpinGuard, SpinRelax, UnlockedSpinGuard};
-pub use wait_list::{sleep, yield_now, WaitList};
+pub use wait_list::WaitList;
 
 extern crate alloc;

+ 77 - 35
crates/eonix_sync/src/wait_list.rs

@@ -1,41 +1,83 @@
-use crate::UnlockableGuard;
-use core::{future::poll_fn, task::Poll};
-
-pub trait WaitList {
-    fn has_waiters(&self) -> bool;
-    fn notify_one(&self) -> bool;
-    fn notify_all(&self) -> usize;
-
-    fn wait<G>(&self, guard: G) -> impl Future<Output = G> + Send
-    where
-        Self: Sized,
-        G: UnlockableGuard,
-        G::Unlocked: Send;
+mod prepare;
+mod wait_object;
+
+use crate::Spin;
+use core::{fmt, sync::atomic::Ordering};
+use intrusive_collections::LinkedList;
+use wait_object::WaitObjectAdapter;
+
+pub use prepare::Prepare;
+
+pub struct WaitList {
+    waiters: Spin<LinkedList<WaitObjectAdapter>>,
 }
 
-pub async fn yield_now() {
-    let mut yielded = false;
-    poll_fn(|ctx| {
-        if yielded {
-            Poll::Ready(())
-        } else {
-            yielded = true;
-            ctx.waker().wake_by_ref();
-            Poll::Pending
+impl WaitList {
+    pub fn new() -> Self {
+        Self {
+            waiters: Spin::new(LinkedList::new(WaitObjectAdapter::new())),
         }
-    })
-    .await;
-}
+    }
+
+    pub fn has_waiters(&self) -> bool {
+        !self.waiters.lock().is_empty()
+    }
 
-pub async fn sleep() {
-    let mut sleeped = false;
-    poll_fn(|_| {
-        if sleeped {
-            Poll::Ready(())
-        } else {
-            sleeped = true;
-            Poll::Pending
+    pub fn notify_one(&self) -> bool {
+        let mut waiters = self.waiters.lock();
+        let mut waiter = waiters.front_mut();
+        if let Some(waiter) = waiter.get() {
+            // SAFETY: `wait_object` is a valid reference to a `WaitObject` because it
+            //         is not dropped until the waiting thread has been woken up and
+            //         has observed that it is no longer on the list.
+            waiter.woken_up.store(true, Ordering::Release);
+
+            if let Some(waker) = waiter.waker.lock().take() {
+                waker.wake();
+            }
         }
-    })
-    .await;
+
+        // We need to remove the node from the list AFTER we've finished accessing it so
+        // the waiter knows when it is safe to release the wait object node.
+        waiter.remove().is_some()
+    }
+
+    pub fn notify_all(&self) -> usize {
+        let mut waiters = self.waiters.lock().take();
+        let mut waiter = waiters.front_mut();
+        let mut count = 0;
+
+        while !waiter.is_null() {
+            if let Some(waiter) = waiter.get() {
+                // SAFETY: `wait_object` is a valid reference to a `WaitObject` because it
+                //         is not dropped until the waiting thread has been woken up and
+                //         has observed that it is no longer on the list.
+                waiter.woken_up.store(true, Ordering::Release);
+
+                if let Some(waker) = waiter.waker.lock().take() {
+                    waker.wake();
+                }
+            } else {
+                unreachable!("Invalid state.");
+            }
+
+            count += 1;
+
+            // We need to remove the node from the list AFTER we've finished accessing it so
+            // the waiter knows when it is safe to release the wait object node.
+            waiter.remove();
+        }
+
+        count
+    }
+
+    pub fn prepare_to_wait(&self) -> Prepare<'_> {
+        Prepare::new(self)
+    }
+}
+
+impl fmt::Debug for WaitList {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("WaitList").finish()
+    }
 }
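
Together with `Prepare` (next file), the intended calling convention is: pin a `Prepare`, enqueue it while holding the lock that guards the wakeup condition, release the lock, then await. A minimal sketch under those assumptions; the `Spin<bool>` condition and both function names are illustrative, and the serial driver below is the real call site:

    use core::pin::pin;
    use eonix_sync::{Spin, WaitList};

    // Waiter: enqueue under the lock so a notify between the state change
    // and the enqueue cannot be lost, then block with the lock released.
    async fn wait_for_event(waiting: &Spin<bool>, list: &WaitList) {
        let mut wait = pin!(list.prepare_to_wait());
        {
            let mut guard = waiting.lock();
            *guard = true;
            wait.as_mut().add_to_wait_list();
        }
        // Note: `Prepare` asserts on drop that it was woken up, so it must
        // be polled to completion rather than dropped part-way.
        wait.await;
    }

    // Notifier: flip the condition under the same lock, then wake one waiter.
    fn signal_event(waiting: &Spin<bool>, list: &WaitList) {
        *waiting.lock() = false;
        list.notify_one();
    }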

+ 176 - 0
crates/eonix_sync/src/wait_list/prepare.rs

@@ -0,0 +1,176 @@
+use super::{wait_object::WaitObject, WaitList};
+use core::{
+    cell::UnsafeCell,
+    pin::Pin,
+    sync::atomic::Ordering,
+    task::{Context, Poll, Waker},
+};
+use intrusive_collections::UnsafeRef;
+
+pub struct Prepare<'a> {
+    wait_list: &'a WaitList,
+    wait_object: UnsafeCell<WaitObject>,
+    state: State,
+}
+
+#[derive(Debug, PartialEq)]
+enum State {
+    Init,
+    OnList,
+    WakerSet,
+    WokenUp,
+}
+
+struct PrepareSplit<'a> {
+    wait_list: &'a WaitList,
+    state: &'a mut State,
+    wait_object: Pin<&'a WaitObject>,
+}
+
+impl<'a> Prepare<'a> {
+    pub const fn new(wait_list: &'a WaitList) -> Self {
+        Self {
+            wait_list,
+            wait_object: UnsafeCell::new(WaitObject::new()),
+            state: State::Init,
+        }
+    }
+
+    fn wait_object(&self) -> &WaitObject {
+        // SAFETY: We never get mutable references to a `WaitObject`.
+        unsafe { &*self.wait_object.get() }
+    }
+
+    fn split_borrow(self: Pin<&mut Self>) -> PrepareSplit<'_> {
+        unsafe {
+            // SAFETY: `wait_list` and `state` are `Unpin`.
+            let this = self.get_unchecked_mut();
+
+            // SAFETY: `wait_object` is a field of a pinned struct.
+            //         And we never get mutable references to a `WaitObject`.
+            let wait_object = Pin::new_unchecked(&*this.wait_object.get());
+
+            PrepareSplit {
+                wait_list: this.wait_list,
+                state: &mut this.state,
+                wait_object,
+            }
+        }
+    }
+
+    fn set_state(self: Pin<&mut Self>, state: State) {
+        // SAFETY: We only touch `state`, which is `Unpin`.
+        unsafe {
+            let this = self.get_unchecked_mut();
+            this.state = state;
+        }
+    }
+
+    /// # Returns
+    /// Whether we've been woken up or not.
+    fn do_add_to_wait_list(mut self: Pin<&mut Self>, waker: Option<&Waker>) -> bool {
+        let PrepareSplit {
+            wait_list,
+            state,
+            wait_object,
+        } = self.as_mut().split_borrow();
+
+        let wait_object_ref = unsafe {
+            // SAFETY: `wait_object` is a valid reference to a `WaitObject` because it
+            //         is not dropped until the waiting thread has been woken up and
+            //         has observed that it is no longer on the list.
+            //
+            // SAFETY: `wait_object` is a pinned reference to a `WaitObject`, so we can
+            //         safely convert it to a `Pin<UnsafeRef<WaitObject>>`.
+            Pin::new_unchecked(UnsafeRef::from_raw(&raw const *wait_object))
+        };
+
+        match *state {
+            State::Init => {
+                let mut waiters = wait_list.waiters.lock();
+                waiters.push_back(wait_object_ref);
+
+                if let Some(waker) = waker.cloned() {
+                    let old_waker = wait_object.waker.lock().replace(waker);
+                    assert!(old_waker.is_none(), "Waker already set");
+                    *state = State::WakerSet;
+                } else {
+                    *state = State::OnList;
+                }
+
+                return false;
+            }
+            // We are already on the wait list, so we can just set the waker.
+            State::OnList => {
+                // If we are already woken up, we can just return.
+                if wait_object.woken_up.load(Ordering::Acquire) {
+                    *state = State::WokenUp;
+                    return true;
+                }
+
+                if let Some(waker) = waker {
+                    // Lock the waker and check if it is already set.
+                    let mut waker_lock = wait_object.waker.lock();
+                    if wait_object.woken_up.load(Ordering::Acquire) {
+                        *state = State::WokenUp;
+                        return true;
+                    }
+
+                    let old_waker = waker_lock.replace(waker.clone());
+                    assert!(old_waker.is_none(), "Waker already set");
+                    *state = State::WakerSet;
+                }
+
+                return false;
+            }
+            _ => unreachable!("Invalid state."),
+        }
+    }
+
+    pub fn add_to_wait_list(self: Pin<&mut Self>) {
+        self.do_add_to_wait_list(None);
+    }
+}
+
+impl Future for Prepare<'_> {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match self.state {
+            State::Init | State::OnList => {
+                if self.as_mut().do_add_to_wait_list(Some(cx.waker())) {
+                    // Make sure we're off the wait list: the notifier only
+                    // unlinks our node after it has finished touching it, so
+                    // spin until the unlink before letting the node (stored
+                    // inside this future) go away.
+                    while self.wait_object().on_list() {}
+                    Poll::Ready(())
+                } else {
+                    Poll::Pending
+                }
+            }
+            State::WakerSet => {
+                if !self.as_ref().wait_object().woken_up.load(Ordering::Acquire) {
+                    // If we read `woken_up == false`, this wakeup is spurious.
+                    // In that case we must still be on the wait list, so no
+                    // further action is required.
+                    Poll::Pending
+                } else {
+                    // Make sure we're off the wait list.
+                    while self.wait_object().on_list() {}
+
+                    self.set_state(State::WokenUp);
+                    Poll::Ready(())
+                }
+            }
+            State::WokenUp => Poll::Ready(()),
+        }
+    }
+}
+
+impl Drop for Prepare<'_> {
+    fn drop(&mut self) {
+        assert_eq!(
+            self.state,
+            State::WokenUp,
+            "Prepare dropped before woken up."
+        );
+    }
+}

+ 30 - 0
crates/eonix_sync/src/wait_list/wait_object.rs

@@ -0,0 +1,30 @@
+use crate::Spin;
+use core::{marker::PhantomPinned, pin::Pin, sync::atomic::AtomicBool, task::Waker};
+use intrusive_collections::{intrusive_adapter, LinkedListAtomicLink, UnsafeRef};
+
+intrusive_adapter!(
+    pub WaitObjectAdapter = Pin<UnsafeRef<WaitObject>>:
+    WaitObject { link: LinkedListAtomicLink }
+);
+
+pub struct WaitObject {
+    pub(super) woken_up: AtomicBool,
+    pub(super) waker: Spin<Option<Waker>>,
+    link: LinkedListAtomicLink,
+    _pinned: PhantomPinned,
+}
+
+impl WaitObject {
+    pub const fn new() -> Self {
+        Self {
+            woken_up: AtomicBool::new(false),
+            waker: Spin::new(None),
+            link: LinkedListAtomicLink::new(),
+            _pinned: PhantomPinned,
+        }
+    }
+
+    pub fn on_list(&self) -> bool {
+        self.link.is_linked()
+    }
+}

+ 17 - 14
src/driver/serial.rs

@@ -8,9 +8,10 @@ use crate::{
 };
 use alloc::{collections::vec_deque::VecDeque, format, sync::Arc};
 use bitflags::bitflags;
-use eonix_runtime::{run::FutureRun, scheduler::Scheduler, task::TaskWait};
+use core::pin::pin;
+use eonix_runtime::{run::FutureRun, scheduler::Scheduler};
 use eonix_spin_irq::SpinIrq as _;
-use eonix_sync::{yield_now, WaitList as _};
+use eonix_sync::WaitList;
 
 bitflags! {
     struct LineStatus: u8 {
@@ -25,7 +26,7 @@ struct Serial {
     name: Arc<str>,
 
     terminal: Spin<Option<Arc<Terminal>>>,
-    worker_wait: TaskWait,
+    worker_wait: WaitList,
 
     working: Spin<bool>,
     tx_buffer: Spin<VecDeque<u8>>,
@@ -54,7 +55,7 @@ impl Serial {
 
     fn disable_interrupts(&self) {
         // Disable interrupt #0: Received data available
-        self.int_ena.write(0x00);
+        self.int_ena.write(0x02);
     }
 
     fn line_status(&self) -> LineStatus {
@@ -62,16 +63,18 @@ impl Serial {
     }
 
     async fn wait_for_interrupt(&self) {
-        let mut working = {
+        let mut wait = pin!(self.worker_wait.prepare_to_wait());
+
+        {
             let mut working = self.working.lock_irq();
             self.enable_interrupts();
+            wait.as_mut().add_to_wait_list();
             *working = false;
+        };
 
-            self.worker_wait.wait(working)
-        }
-        .await;
+        wait.await;
 
-        *working = true;
+        *self.working.lock_irq() = true;
         self.disable_interrupts();
     }
 
@@ -89,24 +92,24 @@ impl Serial {
 
             let should_wait = {
                 let mut tx_buffer = port.tx_buffer.lock();
+                let mut count = 0;
 
                 // Give it a chance to receive data.
-                let count = tx_buffer.len().min(64);
-                for ch in tx_buffer.drain(..count) {
+                for &ch in tx_buffer.iter().take(64) {
                     if port.line_status().contains(LineStatus::TX_READY) {
                         port.tx_rx.write(ch);
                     } else {
                         break;
                     }
+                    count += 1;
                 }
+                tx_buffer.drain(..count);
 
                 tx_buffer.is_empty()
             };
 
             if should_wait {
                 port.wait_for_interrupt().await;
-            } else {
-                yield_now().await;
             }
         }
     }
@@ -116,7 +119,7 @@ impl Serial {
             id,
             name: Arc::from(format!("ttyS{id}")),
             terminal: Spin::new(None),
-            worker_wait: TaskWait::new(),
+            worker_wait: WaitList::new(),
             working: Spin::new(true),
             tx_buffer: Spin::new(VecDeque::new()),
             tx_rx: Port8::new(base_port),
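
Besides moving to `WaitList`, the tx-loop hunk above closes a small data-loss hole: the old code drained up to 64 bytes from the buffer before checking TX_READY, so bytes drained after the line went busy were silently dropped. The new code peeks, writes while ready, and only then drains what actually went out. A runnable toy of the new shape, with the queue and the readiness check as stand-ins for the port:

    use std::collections::VecDeque;

    fn main() {
        let mut tx_buffer: VecDeque<u8> = (0u8..10).collect();
        let mut ready_writes = 3; // the line goes busy after three bytes
        let mut tx_ready = || {
            let ready = ready_writes > 0;
            ready_writes -= 1;
            ready
        };

        let mut count = 0;
        for &ch in tx_buffer.iter().take(64) {
            if !tx_ready() {
                break;
            }
            let _ = ch; // `port.tx_rx.write(ch)` in the real driver
            count += 1;
        }
        // Only the bytes that actually went out leave the queue.
        tx_buffer.drain(..count);
        assert_eq!(tx_buffer.len(), 7); // unwritten bytes stay queued
    }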

+ 2 - 6
src/kernel/task/signal.rs

@@ -11,7 +11,7 @@ use alloc::collections::{binary_heap::BinaryHeap, btree_map::BTreeMap};
 use arch::{ExtendedContext, InterruptContext};
 use bindings::{EFAULT, EINVAL};
 use core::{cmp::Reverse, task::Waker};
-use eonix_runtime::{scheduler::Scheduler, task::Task};
+use eonix_runtime::task::Task;
 use eonix_sync::AsProof as _;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
@@ -423,15 +423,11 @@ impl SignalList {
                         let mut inner = self.inner.lock();
                         let waker = Waker::from(Task::current().clone());
 
-                        unsafe {
-                            Task::current().sleep();
-                        }
-
                         let old_waker = inner.stop_waker.replace(waker);
                         assert!(old_waker.is_none(), "We should not have a waker here");
                     }
 
-                    Scheduler::schedule();
+                    Task::park_preempt_disabled();
 
                     if let Some(parent) = thread.process.parent.load() {
                         parent.notify(

+ 5 - 9
src/sync/condvar.rs

@@ -1,9 +1,9 @@
 use crate::{kernel::task::Thread, prelude::*};
 use alloc::collections::vec_deque::VecDeque;
-use core::{future::Future, task::Waker};
-use eonix_preempt::{assert_preempt_count_eq, assert_preempt_enabled};
-use eonix_runtime::{scheduler::Scheduler, task::Task};
-use eonix_sync::{sleep, ForceUnlockableGuard, UnlockableGuard, UnlockedGuard as _};
+use core::task::Waker;
+use eonix_preempt::assert_preempt_count_eq;
+use eonix_runtime::task::Task;
+use eonix_sync::ForceUnlockableGuard;
 
 pub struct CondVar<const INTERRUPTIBLE: bool> {
     waiters: Spin<VecDeque<Waker>>,
@@ -62,10 +62,6 @@ impl<const I: bool> CondVar<I> {
 
         self.waiters.lock().push_back(waker.clone());
 
-        unsafe {
-            Task::current().sleep();
-        }
-
         // TODO!!!: Another way to do this:
         //
         // Store a flag in our entry in the waiting list.
@@ -75,7 +71,7 @@ impl<const I: bool> CondVar<I> {
         unsafe { guard.force_unlock() };
 
         assert_preempt_count_eq!(1, "CondVar::wait");
-        Scheduler::schedule();
+        Task::park_preempt_disabled();
 
         if I {
             // Allow the thread to be woken up by a signal again.
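
The notify side of this condvar is not shown in the diff; given the `Wake` impl in task.rs, its assumed shape is something like the sketch below (the method body is inferred, not confirmed by this commit). Waking the stored `Waker` goes through `Task::wake_by_ref`, which now simply calls `unpark`, posting the token that the parked waiter consumes:

    impl<const I: bool> CondVar<I> {
        // Hypothetical sketch: pop one queued waker and wake it.
        pub fn notify_one(&self) {
            if let Some(waker) = self.waiters.lock().pop_front() {
                waker.wake(); // -> Task::wake_by_ref -> unpark -> activate
            }
        }
    }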