
runtime: rework the whole runtime arch. (partial)

Remove old Scheduler. Add Runtime as replacement.

Use stackless coroutines as the low-level tasking mechanism and build
stackful tasks on top of them.

Redesign the task state system. Rework the executor.

Remove Run trait and anything related.

Signed-off-by: greatbridf <greatbridf@icloud.com>
greatbridf, 6 months ago
Parent commit: e89a286104
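
At a glance, the new model replaces the `Run`-based stackful execution path with plain `Future`s driven by a global `Runtime`. A minimal caller-side sketch, assuming only the `RUNTIME` static, `spawn`, and `enter` introduced in the diff below (the `cpu_main` wrapper and the async body are illustrative, not part of this commit):

    use eonix_runtime::scheduler::RUNTIME;

    fn cpu_main() {
        // Tasks are now ordinary `Future`s; no `Run` impl or dedicated
        // stack is needed at spawn time.
        let _handle = RUNTIME.spawn(async {
            // ... boot-time work would go here ...
        });

        // Each CPU drives its local ready queue by entering the runtime
        // loop; with the queue empty it halts until the next wakeup.
        RUNTIME.enter();
    }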

+ 69 - 89
crates/eonix_runtime/src/executor.rs

@@ -1,125 +1,105 @@
-mod builder;
-mod execute_status;
+// mod builder;
 mod output_handle;
 mod stack;
 
-use crate::{
-    run::{Contexted, Run, RunState},
-    scheduler::Scheduler,
-    task::Task,
+use alloc::{
+    boxed::Box,
+    sync::{Arc, Weak},
 };
-use alloc::sync::Weak;
 use core::{
+    marker::PhantomData,
     pin::Pin,
-    sync::atomic::{compiler_fence, fence, AtomicBool, Ordering},
-    task::Waker,
+    task::{Context, Poll},
 };
 use eonix_sync::Spin;
 
-pub use builder::ExecutorBuilder;
-pub use execute_status::ExecuteStatus;
 pub use output_handle::OutputHandle;
 pub use stack::Stack;
 
-/// An `Executor` executes a `Run` object in a separate thread of execution
-/// where we have a dedicated stack and context.
-pub trait Executor: Send {
-    fn progress(&self) -> ExecuteStatus;
+/// An `Executor` executes a Future object in a separate thread of execution.
+///
+/// When the Future is finished, the `Executor` will call the `OutputHandle` to commit the output.
+/// Then the `Executor` will release the resources associated with the Future.
+pub struct Executor(Option<Pin<Box<dyn TypeErasedExecutor>>>);
+
+trait TypeErasedExecutor: Send {
+    /// # Returns
+    /// Whether the executor has finished.
+    fn run(self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool;
 }
 
-struct RealExecutor<S, R>
+struct RealExecutor<'a, F>
 where
-    R: Run + Send + Contexted + 'static,
-    R::Output: Send,
+    F: Future + Send + 'a,
+    F::Output: Send + 'a,
 {
-    _stack: S,
-    runnable: R,
-    output_handle: Weak<Spin<OutputHandle<R::Output>>>,
-    finished: AtomicBool,
+    future: F,
+    output_handle: Weak<Spin<OutputHandle<F::Output>>>,
+    _phantom: PhantomData<&'a ()>,
 }
 
-impl<S, R> RealExecutor<S, R>
+impl<F> TypeErasedExecutor for RealExecutor<'_, F>
 where
-    R: Run + Send + Contexted + 'static,
-    R::Output: Send,
+    F: Future + Send,
+    F::Output: Send,
 {
-    extern "C" fn execute(self: Pin<&Self>) -> ! {
-        // We get here with preempt count == 1.
-        eonix_preempt::enable();
-
-        {
-            let waker = Waker::from(Task::current().clone());
+    fn run(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
+        if self.output_handle.as_ptr().is_null() {
+            return true;
+        }
 
-            let output_data = loop {
-                // TODO!!!!!!: CHANGE THIS.
-                let runnable_pointer = &raw const self.get_ref().runnable;
+        let future = unsafe {
+            // SAFETY: We don't move the future.
+            self.as_mut().map_unchecked_mut(|me| &mut me.future)
+        };
 
-                // SAFETY: We don't move the runnable object and we MIGHT not be using the
-                //         part that is used in `pinned_run` in the runnable...?
-                let mut pinned_runnable =
-                    unsafe { Pin::new_unchecked(&mut *(runnable_pointer as *mut R)) };
+        match future.poll(cx) {
+            Poll::Ready(output) => {
+                if let Some(output_handle) = self.output_handle.upgrade() {
+                    output_handle.lock().commit_output(output);
 
-                match pinned_runnable.as_mut().run(&waker) {
-                    RunState::Finished(output) => break output,
-                    RunState::Running => Task::park(),
+                    unsafe {
+                        // SAFETY: `output_handle` is Unpin.
+                        self.get_unchecked_mut().output_handle = Weak::new();
+                    }
                 }
-            };
 
-            if let Some(output_handle) = self.output_handle.upgrade() {
-                output_handle.lock().commit_output(output_data);
+                true
             }
-        }
-
-        // SAFETY: We are on the same CPU as the task.
-        self.finished.store(true, Ordering::Relaxed);
-
-        unsafe {
-            // SAFETY: `preempt::count()` == 1.
-            eonix_preempt::disable();
-            Scheduler::goto_scheduler_noreturn()
+            Poll::Pending => false,
         }
     }
 }
 
-impl<S, R> Executor for RealExecutor<S, R>
-where
-    S: Send,
-    R: Run + Contexted + Send,
-    R::Output: Send,
-{
-    fn progress(&self) -> ExecuteStatus {
-        // TODO!!!: If the task comes from another cpu, we need to sync.
-        //
-        // The other cpu should see the changes of kernel stack of the target thread
-        // made in this cpu.
-        //
-        // Can we find a better way other than `fence`s?
-        //
-        // An alternative way is to use an atomic variable to store the cpu id of
-        // the current task. Then we can use acquire release swap to ensure that the
-        // other cpu sees the changes.
-        fence(Ordering::SeqCst);
-        compiler_fence(Ordering::SeqCst);
-
-        // TODO!!!: We should load the context only if the previous task is
-        // different from the current task.
-
-        self.runnable.load_running_context();
-
-        unsafe {
-            // SAFETY: We are in the scheduler context and we are not preempted.
-            Scheduler::go_from_scheduler(&Task::current().execution_context);
-        }
-
-        self.runnable.restore_running_context();
+impl Executor {
+    pub fn new<F>(future: F) -> (Self, Arc<Spin<OutputHandle<F::Output>>>)
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        let output_handle = OutputHandle::new();
+
+        // TODO: accept futures with non 'static lifetimes.
+        (
+            Executor(Some(Box::pin(RealExecutor {
+                future,
+                output_handle: Arc::downgrade(&output_handle),
+                _phantom: PhantomData,
+            }))),
+            output_handle,
+        )
+    }
 
-        compiler_fence(Ordering::SeqCst);
-        fence(Ordering::SeqCst);
+    pub fn run(&mut self, cx: &mut Context<'_>) -> bool {
+        if let Some(executor) = self.0.as_mut() {
+            let finished = executor.as_mut().run(cx);
+            if finished {
+                self.0.take();
+            }
 
-        if self.finished.load(Ordering::Acquire) {
-            ExecuteStatus::Finished
+            finished
         } else {
-            ExecuteStatus::Executing
+            true
         }
     }
 }
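
`Executor` is now a type-erased, heap-pinned future plus a weak reference to its output slot; once the future completes, the output is committed and the box is dropped. A minimal driving sketch, assuming `core::task::Waker::noop()` is available (illustration only; the real runtime builds its waker from `Arc<Task>`, see task.rs below):

    use core::task::{Context, Waker};

    let (mut executor, output_handle) = Executor::new(async { 1 + 1 });
    let mut cx = Context::from_waker(Waker::noop());

    // `run` returns `true` once the future has completed; by then the
    // result has been committed through `output_handle`, the inner box
    // has been dropped, and further calls are no-ops returning `true`.
    while !executor.run(&mut cx) {}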

+ 1 - 5
crates/eonix_runtime/src/executor/builder.rs

@@ -1,8 +1,5 @@
 use super::{Executor, OutputHandle, RealExecutor, Stack};
-use crate::{
-    context::ExecutionContext,
-    run::{Contexted, Run},
-};
+use crate::context::ExecutionContext;
 use alloc::{boxed::Box, sync::Arc};
 use core::{pin::Pin, sync::atomic::AtomicBool};
 use eonix_sync::Spin;
@@ -15,7 +12,6 @@ pub struct ExecutorBuilder<S, R> {
 impl<S, R> ExecutorBuilder<S, R>
 where
     S: Stack,
-    R: Run + Contexted + Send + 'static,
     R::Output: Send,
 {
     pub fn new() -> Self {

+ 0 - 4
crates/eonix_runtime/src/executor/execute_status.rs

@@ -1,4 +0,0 @@
-pub enum ExecuteStatus {
-    Executing,
-    Finished,
-}

+ 0 - 1
crates/eonix_runtime/src/lib.rs

@@ -3,7 +3,6 @@
 pub mod context;
 pub mod executor;
 mod ready_queue;
-pub mod run;
 pub mod scheduler;
 pub mod task;
 

+ 0 - 34
crates/eonix_runtime/src/run.rs

@@ -1,34 +0,0 @@
-mod future_run;
-
-use core::{pin::Pin, task::Waker};
-pub use future_run::FutureRun;
-
-pub enum RunState<Output> {
-    Running,
-    Finished(Output),
-}
-
-pub trait Contexted {
-    /// # Safety
-    /// This function should be called in a preemption disabled context.
-    fn load_running_context(&self) {}
-
-    /// # Safety
-    /// This function should be called in a preemption disabled context.
-    fn restore_running_context(&self) {}
-}
-
-pub trait Run {
-    type Output;
-
-    fn run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output>;
-
-    fn join(mut self: Pin<&mut Self>, waker: &Waker) -> Self::Output {
-        loop {
-            match self.as_mut().run(waker) {
-                RunState::Running => continue,
-                RunState::Finished(output) => break output,
-            }
-        }
-    }
-}

+ 0 - 34
crates/eonix_runtime/src/run/future_run.rs

@@ -1,34 +0,0 @@
-use super::{Contexted, Run, RunState};
-use core::{
-    pin::Pin,
-    task::{Context, Poll, Waker},
-};
-
-pub struct FutureRun<F: Future>(F);
-
-impl<F> FutureRun<F>
-where
-    F: Future,
-{
-    pub const fn new(future: F) -> Self {
-        Self(future)
-    }
-}
-
-impl<F> Contexted for FutureRun<F> where F: Future {}
-impl<F> Run for FutureRun<F>
-where
-    F: Future + 'static,
-{
-    type Output = F::Output;
-
-    fn run(self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output> {
-        let mut future = unsafe { self.map_unchecked_mut(|me| &mut me.0) };
-        let mut context = Context::from_waker(waker);
-
-        match future.as_mut().poll(&mut context) {
-            Poll::Ready(output) => RunState::Finished(output),
-            Poll::Pending => RunState::Running,
-        }
-    }
-}

+ 104 - 161
crates/eonix_runtime/src/scheduler.rs

@@ -1,20 +1,16 @@
 use crate::{
-    context::ExecutionContext,
-    executor::{ExecuteStatus, OutputHandle, Stack},
-    ready_queue::{cpu_rq, local_rq},
-    run::{Contexted, Run},
-    task::{Task, TaskAdapter, TaskHandle},
+    executor::OutputHandle,
+    ready_queue::{cpu_rq, local_rq, ReadyQueue},
+    task::{Task, TaskAdapter, TaskHandle, TaskState},
 };
 use alloc::sync::Arc;
 use core::{
-    mem::forget,
+    ops::{Deref, DerefMut},
     ptr::NonNull,
-    sync::atomic::{compiler_fence, Ordering},
+    sync::atomic::Ordering,
     task::Waker,
 };
 use eonix_hal::processor::halt;
-use eonix_log::println_trace;
-use eonix_preempt::assert_preempt_count_eq;
 use eonix_sync::{LazyLock, Spin, SpinIrq as _};
 use intrusive_collections::RBTree;
 use pointers::BorrowedArc;
@@ -22,13 +18,12 @@ use pointers::BorrowedArc;
 #[eonix_percpu::define_percpu]
 static CURRENT_TASK: Option<NonNull<Task>> = None;
 
-#[eonix_percpu::define_percpu]
-static LOCAL_SCHEDULER_CONTEXT: ExecutionContext = ExecutionContext::new();
-
 static TASKS: LazyLock<Spin<RBTree<TaskAdapter>>> =
     LazyLock::new(|| Spin::new(RBTree::new(TaskAdapter::new())));
 
-pub struct Scheduler;
+pub static RUNTIME: Runtime = Runtime();
+
+pub struct Runtime();
 
 pub struct JoinHandle<Output>(Arc<Spin<OutputHandle<Output>>>)
 where
@@ -68,74 +63,7 @@ where
     }
 }
 
-impl Scheduler {
-    /// `Scheduler` might be used in various places. Do not hold it for a long time.
-    ///
-    /// # Safety
-    /// The lock returned by this function should be locked with `lock_irq` to prevent
-    /// rescheduling during access to the scheduler. Disabling preemption will do the same.
-    ///
-    /// Drop the lock before calling `schedule`.
-    pub fn get() -> &'static Self {
-        static GLOBAL_SCHEDULER: Scheduler = Scheduler;
-        &GLOBAL_SCHEDULER
-    }
-
-    pub fn init_local_scheduler<S>()
-    where
-        S: Stack,
-    {
-        let stack = S::new();
-
-        unsafe {
-            eonix_preempt::disable();
-            // SAFETY: Preemption is disabled.
-            let context: &mut ExecutionContext = LOCAL_SCHEDULER_CONTEXT.as_mut();
-            context.set_ip(local_scheduler as _);
-            context.set_sp(stack.get_bottom().addr().get() as usize);
-            context.set_interrupt(true);
-            eonix_preempt::enable();
-        }
-
-        // We don't need to keep the stack around.
-        forget(stack);
-    }
-
-    /// # Safety
-    /// This function must not be called inside the scheduler context.
-    ///
-    /// The caller must ensure that `preempt::count` == 1.
-    pub unsafe fn go_from_scheduler(to: &ExecutionContext) {
-        // SAFETY: Preemption is disabled.
-        unsafe { LOCAL_SCHEDULER_CONTEXT.as_ref() }.switch_to(to);
-    }
-
-    /// # Safety
-    /// This function must not be called inside the scheduler context.
-    ///
-    /// The caller must ensure that `preempt::count` == 1.
-    pub unsafe fn goto_scheduler(from: &ExecutionContext) {
-        // SAFETY: Preemption is disabled.
-        from.switch_to(unsafe { LOCAL_SCHEDULER_CONTEXT.as_ref() });
-    }
-
-    /// # Safety
-    /// This function must not be called inside the scheduler context.
-    ///
-    /// The caller must ensure that `preempt::count` == 1.
-    pub unsafe fn goto_scheduler_noreturn() -> ! {
-        // SAFETY: Preemption is disabled.
-        unsafe { LOCAL_SCHEDULER_CONTEXT.as_ref().switch_noreturn() }
-    }
-
-    fn add_task(task: Arc<Task>) {
-        TASKS.lock().insert(task);
-    }
-
-    fn remove_task(task: &Task) {
-        unsafe { TASKS.lock().cursor_mut_from_ptr(task as *const _).remove() };
-    }
-
+impl Runtime {
     fn select_cpu_for_task(&self, task: &Task) -> usize {
         task.cpu.load(Ordering::Relaxed) as _
     }
@@ -165,112 +93,127 @@ impl Scheduler {
         }
     }
 
-    pub fn spawn<S, R>(&self, runnable: R) -> JoinHandle<R::Output>
+    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
     where
-        S: Stack + 'static,
-        R: Run + Contexted + Send + 'static,
-        R::Output: Send + 'static,
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
     {
         let TaskHandle {
             task,
             output_handle,
-        } = Task::new::<S, _>(runnable);
+        } = Task::new(future);
 
-        Self::add_task(task.clone());
+        self.add_task(task.clone());
         self.activate(&task);
 
         JoinHandle(output_handle)
     }
 
-    /// Go to idle task. Call this with `preempt_count == 1`.
-    /// The preempt count will be decremented by this function.
-    ///
-    /// # Safety
-    /// We might never return from here.
-    /// Drop all variables that take ownership of some resource before calling this function.
-    pub fn schedule() {
-        assert_preempt_count_eq!(1, "Scheduler::schedule");
+    // /// Go to idle task. Call this with `preempt_count == 1`.
+    // /// The preempt count will be decremented by this function.
+    // ///
+    // /// # Safety
+    // /// We might never return from here.
+    // /// Drop all variables that take ownership of some resource before calling this function.
+    // pub fn schedule() {
+    //     assert_preempt_count_eq!(1, "Scheduler::schedule");
+
+    //     // Make sure all works are done before scheduling.
+    //     compiler_fence(Ordering::SeqCst);
+
+    //     // TODO!!!!!: Use of reference here needs further consideration.
+    //     //
+    //     // Since we might never return to here, we can't take ownership of `current()`.
+    //     // Is it safe to believe that `current()` will never change across calls?
+    //     unsafe {
+    //         // SAFETY: Preemption is disabled.
+    //         Scheduler::goto_scheduler(&Task::current().execution_context);
+    //     }
+    //     eonix_preempt::enable();
+    // }
+}
 
-        // Make sure all works are done before scheduling.
-        compiler_fence(Ordering::SeqCst);
+impl Runtime {
+    fn add_task(&self, task: Arc<Task>) {
+        TASKS.lock_irq().insert(task);
+    }
 
-        // TODO!!!!!: Use of reference here needs further consideration.
-        //
-        // Since we might never return to here, we can't take ownership of `current()`.
-        // Is it safe to believe that `current()` will never change across calls?
+    fn remove_task(&self, task: &impl Deref<Target = Arc<Task>>) {
         unsafe {
-            // SAFETY: Preemption is disabled.
-            Scheduler::goto_scheduler(&Task::current().execution_context);
+            TASKS
+                .lock_irq()
+                .cursor_mut_from_ptr(Arc::as_ptr(task))
+                .remove();
         }
-        eonix_preempt::enable();
     }
-}
-
-extern "C" fn local_scheduler() -> ! {
-    loop {
-        assert_preempt_count_eq!(1, "Scheduler::idle_task");
-        let mut rq = local_rq().lock_irq();
 
-        let previous_task = CURRENT_TASK
+    fn current(&self) -> Option<BorrowedArc<Task>> {
+        CURRENT_TASK
             .get()
-            .map(|ptr| unsafe { Arc::from_raw(ptr.as_ptr()) });
-        let next_task = rq.get();
+            .map(|ptr| unsafe { BorrowedArc::from_raw(ptr) })
+    }
+
+    fn remove_and_enqueue_current(&self, rq: &mut impl DerefMut<Target = dyn ReadyQueue>) {
+        let Some(current) = self.current() else {
+            return;
+        };
+
+        match current.state.cmpxchg(TaskState::RUNNING, TaskState::READY) {
+            Ok(_) => {
+                let current = unsafe {
+                    Arc::from_raw(
+                        CURRENT_TASK
+                            .get()
+                            .expect("Current task should be present")
+                            .as_ptr(),
+                    )
+                };
+
+                rq.put(current);
+            }
+            Err(old) => {
+                assert_eq!(
+                    old,
+                    TaskState::PARKED,
+                    "Current task should be in PARKED state"
+                );
+            }
+        }
+    }
 
-        match (previous_task, next_task) {
-            (None, None) => {
-                // Nothing to do, halt the cpu and rerun the loop.
+    /// Enter the runtime with an "init" future and run till its completion.
+    ///
+    /// The "init" future has the highest priority and when it completes,
+    /// the runtime will exit immediately and yield its output.
+    pub fn enter(&self) {
+        loop {
+            let mut rq = local_rq().lock_irq();
+
+            self.remove_and_enqueue_current(&mut rq);
+
+            let Some(next) = rq.get() else {
                 drop(rq);
                 halt();
                 continue;
-            }
-            (None, Some(next)) => {
-                CURRENT_TASK.set(NonNull::new(Arc::into_raw(next) as *mut _));
-            }
-            (Some(previous), None) => {
-                if previous.state.is_running() {
-                    // Previous thread is `Running`, return to the current running thread.
-                    println_trace!(
-                        "trace_scheduler",
-                        "Returning to task id({}) without doing context switch",
-                        previous.id
-                    );
-                    CURRENT_TASK.set(NonNull::new(Arc::into_raw(previous) as *mut _));
-                } else {
-                    // Nothing to do, halt the cpu and rerun the loop.
-                    CURRENT_TASK.set(NonNull::new(Arc::into_raw(previous) as *mut _));
-                    drop(rq);
-                    halt();
-                    continue;
-                }
-            }
-            (Some(previous), Some(next)) => {
-                println_trace!(
-                    "trace_scheduler",
-                    "Switching from task id({}) to task id({})",
-                    previous.id,
-                    next.id
-                );
+            };
 
-                debug_assert_ne!(previous.id, next.id, "Switching to the same task");
+            let old_state = next.state.swap(TaskState::RUNNING);
+            assert_eq!(
+                old_state,
+                TaskState::READY,
+                "Next task should be in READY state"
+            );
 
-                if previous.state.is_running() || !previous.state.try_park() {
-                    rq.put(previous);
-                } else {
-                    previous.on_rq.store(false, Ordering::Release);
-                }
+            CURRENT_TASK.set(NonNull::new(Arc::into_raw(next) as *mut _));
+            drop(rq);
 
-                CURRENT_TASK.set(NonNull::new(Arc::into_raw(next) as *mut _));
-            }
-        }
+            // TODO: MAYBE we can move the release of finished tasks to some worker thread.
+            if Task::current().run() {
+                Task::current().state.set(TaskState::DEAD);
+                CURRENT_TASK.set(None);
 
-        drop(rq);
-        // TODO: We can move the release of finished tasks to some worker thread.
-        if let ExecuteStatus::Finished = Task::current().run() {
-            let current = CURRENT_TASK
-                .swap(None)
-                .map(|ptr| unsafe { Arc::from_raw(ptr.as_ptr()) })
-                .expect("Current task should be present");
-            Scheduler::remove_task(&current);
+                self.remove_task(&Task::current());
+            }
         }
     }
 }
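
One consequence of the new loop: a task that returns `Poll::Pending` while its state is still RUNNING is re-enqueued as READY by `remove_and_enqueue_current`, which gives a natural cooperative yield point. A hedged sketch of a `yield_now`-style future relying on that behavior (no such helper is added in this commit):

    use core::future::Future;
    use core::pin::Pin;
    use core::task::{Context, Poll};

    /// Illustrative only: completes on the second poll. Returning
    /// `Pending` without parking leaves the task RUNNING, so `enter()`
    /// puts it back on the ready queue and polls it again later.
    struct YieldNow(bool);

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            if self.0 {
                Poll::Ready(())
            } else {
                self.0 = true;
                Poll::Pending
            }
        }
    }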

+ 52 - 130
crates/eonix_runtime/src/task.rs

@@ -2,25 +2,22 @@ mod adapter;
 mod task_state;
 
 use crate::{
-    context::ExecutionContext,
-    executor::{ExecuteStatus, Executor, ExecutorBuilder, OutputHandle, Stack},
-    run::{Contexted, Run},
-    scheduler::Scheduler,
+    executor::{Executor, OutputHandle},
+    ready_queue::{cpu_rq, ReadyQueue},
 };
-use alloc::{boxed::Box, sync::Arc, task::Wake};
+use alloc::{sync::Arc, task::Wake};
 use atomic_unique_refcell::AtomicUniqueRefCell;
 use core::{
-    pin::{pin, Pin},
-    sync::atomic::{AtomicBool, AtomicU32, Ordering},
-    task::{Context, Poll, Waker},
+    ops::DerefMut,
+    sync::atomic::{AtomicU32, Ordering},
+    task::{Context, Waker},
 };
 use eonix_hal::processor::CPU;
-use eonix_preempt::assert_preempt_enabled;
-use eonix_sync::Spin;
-use intrusive_collections::RBTreeAtomicLink;
-use task_state::TaskState;
+use eonix_sync::{Spin, SpinIrq};
+use intrusive_collections::{LinkedListAtomicLink, RBTreeAtomicLink};
 
-pub use adapter::TaskAdapter;
+pub use adapter::{TaskAdapter, TaskRqAdapter};
+pub(crate) use task_state::TaskState;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 pub struct TaskId(u32);
@@ -33,159 +30,70 @@ where
     pub(crate) output_handle: Arc<Spin<OutputHandle<Output>>>,
 }
 
-/// A `Task` represents a schedulable unit.
-///
-/// Initial: state = Running, unparked = false
-///
-/// Task::park() => swap state <- Parking, assert prev == Running
-///              => swap unparked <- false
-///              -> true => store state <- Running => return
-///              -> false => goto scheduler => get rq lock => load state
-///                                                        -> Running => enqueue
-///                                                        -> Parking => cmpxchg Parking -> Parked
-///                                                                   -> Running => enqueue
-///                                                                   -> Parking => on_rq <- false
-///                                                                   -> Parked => ???
-///
-/// Task::unpark() => swap unparked <- true
-///                -> true => return
-///                -> false => swap state <- Running
-///                         -> Running => return
-///                         -> Parking | Parked => Scheduler::activate
 pub struct Task {
     /// Unique identifier of the task.
     pub id: TaskId,
-    /// Whether the task is on some run queue (a.k.a ready).
-    pub(crate) on_rq: AtomicBool,
-    /// Whether someone has called `unpark` on this task.
-    pub(crate) unparked: AtomicBool,
     /// The last cpu that the task was executed on.
     /// If `on_rq` is `false`, we can't assume that this task is still on the cpu.
     pub(crate) cpu: AtomicU32,
     /// Task state.
     pub(crate) state: TaskState,
-    /// Task execution context.
-    pub(crate) execution_context: ExecutionContext,
     /// Executor object.
-    executor: AtomicUniqueRefCell<Option<Pin<Box<dyn Executor>>>>,
+    executor: AtomicUniqueRefCell<Executor>,
     /// Link in the global task list.
     link_task_list: RBTreeAtomicLink,
+    /// Link in the ready queue.
+    link_ready_queue: LinkedListAtomicLink,
 }
 
 impl Task {
-    pub fn new<S, R>(runnable: R) -> TaskHandle<R::Output>
+    pub fn new<F>(future: F) -> TaskHandle<F::Output>
     where
-        S: Stack + 'static,
-        R: Run + Contexted + Send + 'static,
-        R::Output: Send + 'static,
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
     {
         static ID: AtomicU32 = AtomicU32::new(0);
 
-        let (executor, execution_context, output) = ExecutorBuilder::new()
-            .stack(S::new())
-            .runnable(runnable)
-            .build();
+        let (executor, output_handle) = Executor::new(future);
 
         let task = Arc::new(Self {
             id: TaskId(ID.fetch_add(1, Ordering::Relaxed)),
-            on_rq: AtomicBool::new(false),
-            unparked: AtomicBool::new(false),
             cpu: AtomicU32::new(CPU::local().cpuid() as u32),
             state: TaskState::new(TaskState::RUNNING),
-            executor: AtomicUniqueRefCell::new(Some(executor)),
-            execution_context,
+            executor: AtomicUniqueRefCell::new(executor),
             link_task_list: RBTreeAtomicLink::new(),
+            link_ready_queue: LinkedListAtomicLink::new(),
         });
 
         TaskHandle {
             task,
-            output_handle: output,
+            output_handle,
         }
     }
 
-    pub fn run(&self) -> ExecuteStatus {
+    /// # Returns
+    /// Whether the task has finished.
+    pub fn run(self: &Arc<Self>) -> bool {
         let mut executor_borrow = self.executor.borrow();
+        let waker = Waker::from(self.clone());
+        let mut cx = Context::from_waker(&waker);
 
-        let executor = executor_borrow
-            .as_ref()
-            .expect("Executor should be present")
-            .as_ref()
-            .get_ref();
-
-        if let ExecuteStatus::Finished = executor.progress() {
-            executor_borrow.take();
-            ExecuteStatus::Finished
-        } else {
-            ExecuteStatus::Executing
-        }
-    }
-
-    pub fn unpark(self: &Arc<Self>) {
-        if self.unparked.swap(true, Ordering::Release) {
-            return;
-        }
-
-        eonix_preempt::disable();
-
-        match self.state.swap(TaskState::RUNNING) {
-            TaskState::RUNNING => {}
-            TaskState::PARKED | TaskState::PARKING => {
-                // We are waking up from sleep or someone else is parking this task.
-                // Try to wake it up.
-                Scheduler::get().activate(self);
-            }
-            _ => unreachable!(),
-        }
-
-        eonix_preempt::enable();
+        executor_borrow.run(&mut cx)
     }
 
-    pub fn park() {
-        eonix_preempt::disable();
-        Self::park_preempt_disabled();
-    }
-
-    /// Park the current task with `preempt::count() == 1`.
-    pub fn park_preempt_disabled() {
-        let task = Task::current();
-
-        let old_state = task.state.swap(TaskState::PARKING);
-        assert_eq!(
-            old_state,
-            TaskState::RUNNING,
-            "Parking a task that is not running."
-        );
-
-        if task.unparked.swap(false, Ordering::AcqRel) {
-            // Someone has called `unpark` on this task previously.
-            task.state.swap(TaskState::RUNNING);
-        } else {
-            unsafe {
-                // SAFETY: Preemption is disabled.
-                Scheduler::goto_scheduler(&Task::current().execution_context)
-            };
-            assert!(task.unparked.swap(false, Ordering::Acquire));
-        }
-
-        eonix_preempt::enable();
-    }
-
-    pub fn block_on<F>(future: F) -> F::Output
-    where
-        F: Future,
-    {
-        assert_preempt_enabled!("block_on() must be called with preemption enabled");
-
-        let waker = Waker::from(Task::current().clone());
-        let mut context = Context::from_waker(&waker);
-        let mut future = pin!(future);
-
+    /// Get the stabilized lock for the task's run queue.
+    fn rq(&self) -> Option<impl DerefMut<Target = dyn ReadyQueue> + 'static> {
         loop {
-            if let Poll::Ready(output) = future.as_mut().poll(&mut context) {
-                break output;
+            let cpu = self.cpu.load(Ordering::Relaxed);
+            let rq = cpu_rq(cpu as usize).lock_irq();
+
+            if cpu == self.cpu.load(Ordering::Acquire) {
+                if self.link_ready_queue.is_linked() {
+                    return Some(rq);
+                } else {
+                    return None;
+                }
             }
-
-            Task::park();
         }
     }
 }
@@ -196,6 +104,20 @@ impl Wake for Task {
     }
 
     fn wake_by_ref(self: &Arc<Self>) {
-        self.unpark();
+        if self
+            .state
+            .cmpxchg(TaskState::PARKED, TaskState::READY)
+            .is_err()
+        {
+            return;
+        }
+
+        if let Some(mut rq) = self.rq() {
+            if self.state.get() != TaskState::PARKED {
+                return;
+            }
+
+            rq.put(self.clone());
+        }
     }
 }
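
Because `Task` now implements `alloc::task::Wake`, the runtime gets a standard `Waker` for free instead of going through `unpark`. A hedged sketch of the wake path (names from the diff; the re-enqueue logic belongs to the rework this commit marks as partial):

    use alloc::sync::Arc;
    use core::task::Waker;
    use eonix_runtime::task::Task;

    fn wake_example(task: Arc<Task>) {
        // Backed by the `Wake` impl above.
        let waker = Waker::from(task);

        // Attempts the PARKED -> READY transition; on success the task is
        // meant to be put back on its last run queue for `enter()` to find.
        waker.wake();
    }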

+ 2 - 1
crates/eonix_runtime/src/task/adapter.rs

@@ -1,8 +1,9 @@
 use super::{Task, TaskId};
 use alloc::sync::Arc;
-use intrusive_collections::{intrusive_adapter, KeyAdapter, RBTreeAtomicLink};
+use intrusive_collections::{intrusive_adapter, KeyAdapter, LinkedListAtomicLink, RBTreeAtomicLink};
 
 intrusive_adapter!(pub TaskAdapter = Arc<Task>: Task { link_task_list: RBTreeAtomicLink });
+intrusive_adapter!(pub TaskRqAdapter = Arc<Task>: Task { link_ready_queue: LinkedListAtomicLink });
 
 impl<'a> KeyAdapter<'a> for TaskAdapter {
     type Key = TaskId;

+ 13 - 16
crates/eonix_runtime/src/task/task_state.rs

@@ -4,32 +4,29 @@ use core::sync::atomic::{AtomicU32, Ordering};
 pub struct TaskState(AtomicU32);
 
 impl TaskState {
-    pub const RUNNING: u32 = 0;
-    pub const PARKING: u32 = 1;
+    pub const READY: u32 = 0;
+    pub const RUNNING: u32 = 1;
     pub const PARKED: u32 = 2;
+    pub const DEAD: u32 = 1 << 31;
 
     pub(crate) const fn new(state: u32) -> Self {
         Self(AtomicU32::new(state))
     }
 
     pub(crate) fn swap(&self, state: u32) -> u32 {
-        self.0.swap(state, Ordering::AcqRel)
+        self.0.swap(state, Ordering::SeqCst)
     }
 
-    pub(crate) fn try_park(&self) -> bool {
-        match self.0.compare_exchange(
-            TaskState::PARKING,
-            TaskState::PARKED,
-            Ordering::AcqRel,
-            Ordering::Acquire,
-        ) {
-            Ok(_) => true,
-            Err(TaskState::RUNNING) => false,
-            Err(_) => unreachable!("Invalid task state while trying to park."),
-        }
+    pub(crate) fn set(&self, state: u32) {
+        self.0.store(state, Ordering::SeqCst);
     }
 
-    pub(crate) fn is_running(&self) -> bool {
-        self.0.load(Ordering::Acquire) == Self::RUNNING
+    pub(crate) fn get(&self) -> u32 {
+        self.0.load(Ordering::SeqCst)
+    }
+
+    pub(crate) fn cmpxchg(&self, current: u32, new: u32) -> Result<u32, u32> {
+        self.0
+            .compare_exchange(current, new, Ordering::SeqCst, Ordering::SeqCst)
     }
 }
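
A hedged sketch of the lifecycle these helpers express, reconstructed from their call sites in scheduler.rs and task.rs above (crate-internal pseudo-usage; the methods are `pub(crate)`):

    let state = TaskState::new(TaskState::READY);

    // enter(): claim a dequeued task, asserting it was READY.
    assert_eq!(state.swap(TaskState::RUNNING), TaskState::READY);

    // The task blocks on something: it parks and yields `Pending`.
    state.set(TaskState::PARKED);

    // wake_by_ref(): only a PARKED task moves back to READY.
    assert!(state.cmpxchg(TaskState::PARKED, TaskState::READY).is_ok());

    // enter(), once the executor reports completion: mark DEAD.
    state.set(TaskState::DEAD);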