
runtime: new task sleep-wakeup method and some adaptation

We use RUNNING to indicate that the task is currently on a CPU, and READY to
indicate that the task can be run again and should therefore be put back into
the ready queue after the poll() call.

When a task is taken from the ready queue and put onto a CPU, it is marked
RUNNING only, so it gets suspended once the poll() call returns Poll::Pending.
If we (or anyone else) call Waker::wake() while the task is running, the READY
flag is set as well. When we return from the poll() call, we can detect that
with a CAS and put the task back onto the ready queue.

We've also done some adaptation work on the rest of the kernel, mainly to
remove *SOME* of the Task::block_on calls. Removing them completely is not
possible for now; we should address that in the next few commits.
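
The whole protocol boils down to two atomic state transitions. Below is a
minimal sketch of that state machine (illustrative only, not the kernel code
itself; wake_state/unrun_state are made-up names, while the constants mirror
the ones introduced in task_state.rs further down):

    use core::sync::atomic::{AtomicU32, Ordering};

    // State constants matching task_state.rs in this commit.
    const BLOCKED: u32 = 0;
    const READY: u32 = 1;
    const RUNNING: u32 = 2;
    const READY_RUNNING: u32 = READY | RUNNING;

    struct TaskState(AtomicU32);

    impl TaskState {
        // Waker::wake() path: a BLOCKED task becomes READY (the caller then puts
        // it on a ready queue); a RUNNING task gets the READY flag added so the
        // scheduler re-enqueues it after poll() returns; READY tasks need nothing.
        fn wake_state(&self) -> Result<u32, u32> {
            self.0
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| match state {
                    BLOCKED => Some(READY),
                    RUNNING => Some(READY_RUNNING),
                    _ => None,
                })
        }

        // Scheduler path after poll() returned Pending: if READY was set while
        // the task was on the cpu, it goes straight back to the ready queue;
        // otherwise it parks as BLOCKED until someone wakes it.
        fn unrun_state(&self) -> u32 {
            self.0
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| match state {
                    READY_RUNNING => Some(READY),
                    RUNNING => Some(BLOCKED),
                    _ => None,
                })
                .expect("a task leaving the cpu must be RUNNING or READY|RUNNING")
        }
    }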

Signed-off-by: greatbridf <greatbridf@icloud.com>
greatbridf authored 6 months ago
commit e23c9eb1f2

+ 14 - 25
crates/eonix_runtime/src/executor.rs

@@ -23,9 +23,7 @@ pub use stack::Stack;
 pub struct Executor(Option<Pin<Box<dyn TypeErasedExecutor>>>);
 
 trait TypeErasedExecutor: Send {
-    /// # Returns
-    /// Whether the executor has finished.
-    fn run(self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool;
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
 }
 
 struct RealExecutor<'a, F>
@@ -43,9 +41,9 @@ where
     F: Future + Send,
     F::Output: Send,
 {
-    fn run(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
         if self.output_handle.as_ptr().is_null() {
-            return true;
+            return Poll::Ready(());
         }
 
         let future = unsafe {
@@ -53,21 +51,16 @@ where
             self.as_mut().map_unchecked_mut(|me| &mut me.future)
         };
 
-        match future.poll(cx) {
-            Poll::Ready(output) => {
-                if let Some(output_handle) = self.output_handle.upgrade() {
-                    output_handle.lock().commit_output(output);
+        future.poll(cx).map(|output| {
+            if let Some(output_handle) = self.output_handle.upgrade() {
+                output_handle.lock().commit_output(output);
 
-                    unsafe {
-                        // SAFETY: `output_handle` is Unpin.
-                        self.get_unchecked_mut().output_handle = Weak::new();
-                    }
+                unsafe {
+                    // SAFETY: `output_handle` is Unpin.
+                    self.get_unchecked_mut().output_handle = Weak::new();
                 }
-
-                true
             }
-            Poll::Pending => false,
-        }
+        })
     }
 }
 
@@ -79,7 +72,6 @@ impl Executor {
     {
         let output_handle = OutputHandle::new();
 
-        // TODO: accept futures with non 'static lifetimes.
         (
             Executor(Some(Box::pin(RealExecutor {
                 future,
@@ -90,16 +82,13 @@ impl Executor {
         )
     }
 
-    pub fn run(&mut self, cx: &mut Context<'_>) -> bool {
+    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
         if let Some(executor) = self.0.as_mut() {
-            let finished = executor.as_mut().run(cx);
-            if finished {
+            executor.as_mut().poll(cx).map(|_| {
                 self.0.take();
-            }
-
-            finished
+            })
         } else {
-            true
+            Poll::Ready(())
         }
     }
 }

+ 57 - 72
crates/eonix_runtime/src/scheduler.rs

@@ -1,14 +1,13 @@
 use crate::{
     executor::OutputHandle,
-    ready_queue::{cpu_rq, local_rq, ReadyQueue},
+    ready_queue::{local_rq, ReadyQueue},
     task::{Task, TaskAdapter, TaskHandle, TaskState},
 };
-use alloc::sync::Arc;
+use alloc::{sync::Arc, task::Wake};
 use core::{
     ops::{Deref, DerefMut},
     ptr::NonNull,
-    sync::atomic::Ordering,
-    task::Waker,
+    task::{Context, Poll, Waker},
 };
 use eonix_hal::processor::halt;
 use eonix_sync::{LazyLock, Spin, SpinIrq as _};
@@ -64,35 +63,6 @@ where
 }
 
 impl Runtime {
-    fn select_cpu_for_task(&self, task: &Task) -> usize {
-        task.cpu.load(Ordering::Relaxed) as _
-    }
-
-    pub fn activate(&self, task: &Arc<Task>) {
-        // Only one cpu can be activating the task at a time.
-        // TODO: Add some checks.
-
-        if task.on_rq.swap(true, Ordering::Acquire) {
-            // Lock the rq and check whether the task is on the rq again.
-            let cpuid = task.cpu.load(Ordering::Acquire);
-            let mut rq = cpu_rq(cpuid as _).lock_irq();
-
-            if !task.on_rq.load(Ordering::Acquire) {
-                // Task has just got off the rq. Put it back.
-                rq.put(task.clone());
-            } else {
-                // Task is already on the rq. Do nothing.
-                return;
-            }
-        } else {
-            // Task not on some rq. Select one and put it here.
-            let cpu = self.select_cpu_for_task(&task);
-            let mut rq = cpu_rq(cpu).lock_irq();
-            task.cpu.store(cpu as _, Ordering::Release);
-            rq.put(task.clone());
-        }
-    }
-
     pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
     where
         F: Future + Send + 'static,
@@ -104,36 +74,11 @@ impl Runtime {
         } = Task::new(future);
 
         self.add_task(task.clone());
-        self.activate(&task);
+        task.wake_by_ref();
 
         JoinHandle(output_handle)
     }
 
-    // /// Go to idle task. Call this with `preempt_count == 1`.
-    // /// The preempt count will be decremented by this function.
-    // ///
-    // /// # Safety
-    // /// We might never return from here.
-    // /// Drop all variables that take ownership of some resource before calling this function.
-    // pub fn schedule() {
-    //     assert_preempt_count_eq!(1, "Scheduler::schedule");
-
-    //     // Make sure all works are done before scheduling.
-    //     compiler_fence(Ordering::SeqCst);
-
-    //     // TODO!!!!!: Use of reference here needs further consideration.
-    //     //
-    //     // Since we might never return to here, we can't take ownership of `current()`.
-    //     // Is it safe to believe that `current()` will never change across calls?
-    //     unsafe {
-    //         // SAFETY: Preemption is disabled.
-    //         Scheduler::goto_scheduler(&Task::current().execution_context);
-    //     }
-    //     eonix_preempt::enable();
-    // }
-}
-
-impl Runtime {
     fn add_task(&self, task: Arc<Task>) {
         TASKS.lock_irq().insert(task);
     }
@@ -158,12 +103,18 @@ impl Runtime {
             return;
         };
 
-        match current.state.cmpxchg(TaskState::RUNNING, TaskState::READY) {
-            Ok(_) => {
+        match current.state.update(|state| match state {
+            TaskState::READY_RUNNING => Some(TaskState::READY),
+            TaskState::RUNNING => Some(TaskState::BLOCKED),
+            _ => {
+                unreachable!("Current task should be at least in RUNNING state, but got {state:?}")
+            }
+        }) {
+            Ok(TaskState::READY_RUNNING) => {
                 let current = unsafe {
                     Arc::from_raw(
                         CURRENT_TASK
-                            .get()
+                            .swap(None)
                             .expect("Current task should be present")
                             .as_ptr(),
                     )
@@ -171,14 +122,40 @@ impl Runtime {
 
                 rq.put(current);
             }
-            Err(old) => {
-                assert_eq!(
-                    old,
-                    TaskState::PARKED,
-                    "Current task should be in PARKED state"
-                );
+            Ok(_) => {}
+            _ => unreachable!(),
+        }
+    }
+
+    pub fn block_till_woken(set_waker: impl FnOnce(&Waker)) -> impl Future<Output = ()> {
+        struct BlockTillWoken<F: FnOnce(&Waker)> {
+            set_waker: Option<F>,
+            slept: bool,
+        }
+
+        impl<F: FnOnce(&Waker)> Future for BlockTillWoken<F> {
+            type Output = ();
+
+            fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+                if self.slept {
+                    Poll::Ready(())
+                } else {
+                    let (set_waker, slept) = unsafe {
+                        let me = self.get_unchecked_mut();
+                        (me.set_waker.take().unwrap(), &mut me.slept)
+                    };
+
+                    set_waker(cx.waker());
+                    *slept = true;
+                    Poll::Pending
+                }
             }
         }
+
+        BlockTillWoken {
+            set_waker: Some(set_waker),
+            slept: false,
+        }
     }
 
     /// Enter the runtime with an "init" future and run till its completion.
@@ -204,15 +181,23 @@ impl Runtime {
                 "Next task should be in READY state"
                 "Next task should be in READY state"
             );
             );
 
 
-            CURRENT_TASK.set(NonNull::new(Arc::into_raw(next) as *mut _));
+            unsafe {
+                CURRENT_TASK.set(Some(NonNull::new_unchecked(Arc::into_raw(next) as *mut _)));
+            }
+
             drop(rq);
 
             // TODO: MAYBE we can move the release of finished tasks to some worker thread.
-            if Task::current().run() {
-                Task::current().state.set(TaskState::DEAD);
-                CURRENT_TASK.set(None);
+            if Task::current().poll().is_ready() {
+                let old_state = Task::current().state.swap(TaskState::DEAD);
+                assert!(
+                    old_state & TaskState::RUNNING != 0,
+                    "Current task should be at least in RUNNING state"
+                );
 
                 self.remove_task(&Task::current());
+
+                CURRENT_TASK.set(None);
             }
         }
     }

+ 27 - 25
crates/eonix_runtime/src/task.rs

@@ -10,7 +10,7 @@ use atomic_unique_refcell::AtomicUniqueRefCell;
 use core::{
     ops::DerefMut,
     sync::atomic::{AtomicU32, Ordering},
-    task::{Context, Waker},
+    task::{Context, Poll, Waker},
 };
 use eonix_hal::processor::CPU;
 use eonix_sync::{Spin, SpinIrq};
@@ -59,7 +59,7 @@ impl Task {
         let task = Arc::new(Self {
             id: TaskId(ID.fetch_add(1, Ordering::Relaxed)),
             cpu: AtomicU32::new(CPU::local().cpuid() as u32),
-            state: TaskState::new(TaskState::RUNNING),
+            state: TaskState::new(TaskState::BLOCKED),
             executor: AtomicUniqueRefCell::new(executor),
             link_task_list: RBTreeAtomicLink::new(),
             link_ready_queue: LinkedListAtomicLink::new(),
@@ -71,31 +71,35 @@ impl Task {
         }
     }
 
-    /// # Returns
-    /// Whether the task has finished.
-    pub fn run(self: &Arc<Self>) -> bool {
+    pub fn poll(self: &Arc<Self>) -> Poll<()> {
         let mut executor_borrow = self.executor.borrow();
         let waker = Waker::from(self.clone());
         let mut cx = Context::from_waker(&waker);
 
-        executor_borrow.run(&mut cx)
+        executor_borrow.poll(&mut cx)
     }
 
     /// Get the stabilized lock for the task's run queue.
-    fn rq(&self) -> Option<impl DerefMut<Target = dyn ReadyQueue> + 'static> {
+    pub fn rq(&self) -> impl DerefMut<Target = dyn ReadyQueue> + 'static {
         loop {
             let cpu = self.cpu.load(Ordering::Relaxed);
             let rq = cpu_rq(cpu as usize).lock_irq();
 
-            if cpu == self.cpu.load(Ordering::Acquire) {
-                if self.link_ready_queue.is_linked() {
-                    return Some(rq);
-                } else {
-                    return None;
-                }
+            // We stabilize the task cpu with the cpu rq here for now.
+            if cpu != self.cpu.load(Ordering::Acquire) {
+                continue;
             }
+
+            return rq;
         }
     }
+
+    pub fn block_on<F>(future: F) -> F::Output
+    where
+        F: Future,
+    {
+        todo!()
+    }
 }
 
 impl Wake for Task {
@@ -104,20 +108,18 @@ impl Wake for Task {
     }
 
     fn wake_by_ref(self: &Arc<Self>) {
-        if self
-            .state
-            .cmpxchg(TaskState::PARKED, TaskState::READY)
-            .is_err()
-        {
+        let Ok(old) = self.state.update(|state| match state {
+            TaskState::BLOCKED => Some(TaskState::READY),
+            TaskState::RUNNING => Some(TaskState::READY | TaskState::RUNNING),
+            TaskState::READY | TaskState::READY_RUNNING => None,
+            state => unreachable!("Waking a {state:?} task"),
+        }) else {
             return;
-        }
-
-        if let Some(mut rq) = self.rq() {
-            if self.state.get() != TaskState::PARKED {
-                return;
-            }
+        };
 
-            rq.put(self.clone());
+        if old == TaskState::BLOCKED {
+            // If the task was blocked, we need to put it back to the ready queue.
+            self.rq().put(self.clone());
         }
     }
 }

+ 6 - 13
crates/eonix_runtime/src/task/task_state.rs

@@ -4,9 +4,10 @@ use core::sync::atomic::{AtomicU32, Ordering};
 pub struct TaskState(AtomicU32);
 
 impl TaskState {
-    pub const READY: u32 = 0;
-    pub const RUNNING: u32 = 1;
-    pub const PARKED: u32 = 2;
+    pub const BLOCKED: u32 = 0;
+    pub const READY: u32 = 1;
+    pub const RUNNING: u32 = 2;
+    pub const READY_RUNNING: u32 = TaskState::READY | TaskState::RUNNING;
     pub const DEAD: u32 = 1 << 31;
 
     pub(crate) const fn new(state: u32) -> Self {
@@ -17,16 +18,8 @@ impl TaskState {
         self.0.swap(state, Ordering::SeqCst)
     }
 
-    pub(crate) fn set(&self, state: u32) {
-        self.0.store(state, Ordering::SeqCst);
-    }
-
-    pub(crate) fn get(&self) -> u32 {
-        self.0.load(Ordering::SeqCst)
-    }
-
-    pub(crate) fn cmpxchg(&self, current: u32, new: u32) -> Result<u32, u32> {
+    pub(crate) fn update(&self, func: impl FnMut(u32) -> Option<u32>) -> Result<u32, u32> {
         self.0
-            .compare_exchange(current, new, Ordering::SeqCst, Ordering::SeqCst)
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, func)
     }
 }

+ 3 - 3
src/driver/serial.rs

@@ -3,14 +3,14 @@ mod io;
 use crate::{
     kernel::{
         block::make_device, console::set_console, constants::EIO, interrupt::register_irq_handler,
-        task::KernelStack, CharDevice, CharDeviceType, Terminal, TerminalDevice,
+        CharDevice, CharDeviceType, Terminal, TerminalDevice,
     },
     prelude::*,
 };
 use alloc::{collections::vec_deque::VecDeque, format, sync::Arc};
 use bitflags::bitflags;
 use core::pin::pin;
-use eonix_runtime::{run::FutureRun, scheduler::Scheduler};
+use eonix_runtime::scheduler::RUNTIME;
 use eonix_sync::{SpinIrq as _, WaitList};
 use io::SerialIO;
 
@@ -161,7 +161,7 @@ impl Serial {
             })?;
         }
 
-        Scheduler::get().spawn::<KernelStack, _>(FutureRun::new(Self::worker(port.clone())));
+        RUNTIME.spawn(Self::worker(port.clone()));
 
         let _ = set_console(terminal.clone());
         eonix_log::set_console(terminal.clone());

+ 2 - 10
src/driver/virtio/riscv64.rs

@@ -1,23 +1,15 @@
 use super::virtio_blk::HAL;
-use crate::kernel::{
-    block::{make_device, BlockDevice},
-    mem::{AsMemoryBlock, MemoryBlock, Page},
-};
+use crate::kernel::block::{make_device, BlockDevice};
 use alloc::{sync::Arc, vec::Vec};
-use core::num::NonZero;
 use eonix_hal::arch_exported::fdt::FDT;
 use eonix_hal::mm::ArchPhysAccess;
 use eonix_log::{println_info, println_warn};
-use eonix_mm::{
-    address::{Addr, PAddr, PhysAccess},
-    paging::PFN,
-};
+use eonix_mm::address::{PAddr, PhysAccess};
 use eonix_runtime::task::Task;
 use eonix_sync::Spin;
 use virtio_drivers::{
     device::blk::VirtIOBlk,
     transport::{mmio::MmioTransport, Transport},
-    Hal,
 };
 
 
 pub fn init() {
 pub fn init() {

+ 2 - 11
src/kernel/interrupt.rs

@@ -1,5 +1,5 @@
 use super::mem::handle_kernel_page_fault;
-use super::timer::{should_reschedule, timer_interrupt};
+use super::timer::timer_interrupt;
 use crate::kernel::constants::EINVAL;
 use crate::prelude::*;
 use alloc::sync::Arc;
@@ -7,7 +7,6 @@ use eonix_hal::traits::fault::Fault;
 use eonix_hal::traits::trap::{RawTrapContext, TrapType};
 use eonix_hal::trap::TrapContext;
 use eonix_mm::address::{Addr as _, VAddr};
-use eonix_runtime::scheduler::Scheduler;
 use eonix_sync::SpinIrq as _;
 
 static IRQ_HANDLERS: Spin<[Vec<Arc<dyn Fn() + Send + Sync>>; 16]> =
@@ -51,15 +50,7 @@ pub fn interrupt_handler(trap_ctx: &mut TrapContext) {
         TrapType::Syscall { no, .. } => unreachable!("Syscall {} in kernel space.", no),
         TrapType::Fault(fault) => default_fault_handler(fault, trap_ctx),
         TrapType::Irq { callback } => callback(default_irq_handler),
-        TrapType::Timer { callback } => {
-            callback(timer_interrupt);
-
-            if eonix_preempt::count() == 0 && should_reschedule() {
-                // To make scheduler satisfied.
-                eonix_preempt::disable();
-                Scheduler::schedule();
-            }
-        }
+        TrapType::Timer { callback } => callback(timer_interrupt),
     }
 }
 

+ 0 - 1
src/kernel/mem/page_alloc/raw_page.rs

@@ -6,7 +6,6 @@ use core::{
     sync::atomic::{AtomicU32, AtomicUsize, Ordering},
 };
 use eonix_hal::mm::ArchPhysAccess;
-use eonix_mm::paging::PAGE_SIZE;
 use eonix_mm::{
     address::{PAddr, PhysAccess as _},
     paging::{RawPage as RawPageTrait, PFN},

+ 1 - 1
src/kernel/syscall/mm.rs

@@ -1,6 +1,6 @@
 use super::FromSyscallArg;
 use crate::fs::shm::{gen_shm_id, ShmFlags, IPC_PRIVATE, SHM_MANAGER};
-use crate::kernel::constants::{EBADF, EEXIST, EINVAL, ENOENT, ENOMEM};
+use crate::kernel::constants::{EBADF, EEXIST, EINVAL, ENOENT};
 use crate::kernel::mem::FileMapping;
 use crate::kernel::task::Thread;
 use crate::kernel::vfs::filearray::FD;

+ 1 - 1
src/kernel/task.rs

@@ -18,4 +18,4 @@ pub use process_group::ProcessGroup;
 pub use process_list::ProcessList;
 pub use session::Session;
 pub use signal::SignalAction;
-pub use thread::{new_thread_runnable, yield_now, Thread, ThreadBuilder};
+pub use thread::{yield_now, Thread, ThreadBuilder};

+ 3 - 6
src/kernel/task/clone.rs

@@ -1,10 +1,7 @@
 use crate::{
     kernel::{
         syscall::procops::parse_user_tls,
-        task::{
-            alloc_pid, new_thread_runnable, KernelStack, ProcessBuilder, ProcessList, Thread,
-            ThreadBuilder,
-        },
+        task::{alloc_pid, ProcessBuilder, ProcessList, Thread, ThreadBuilder},
         user::UserPointerMut,
     },
     KResult,
@@ -12,7 +9,7 @@ use crate::{
 use bitflags::bitflags;
 use core::num::NonZero;
 use eonix_hal::processor::UserTLS;
-use eonix_runtime::{scheduler::Scheduler, task::Task};
+use eonix_runtime::{scheduler::RUNTIME, task::Task};
 use eonix_sync::AsProof;
 use posix_types::signal::Signal;
 
@@ -166,7 +163,7 @@ pub fn do_clone(thread: &Thread, clone_args: CloneArgs) -> KResult<u32> {
         UserPointerMut::new(parent_tid_ptr as *mut u32)?.write(new_pid)?
     }
 
-    Scheduler::get().spawn::<KernelStack, _>(new_thread_runnable(new_thread));
+    RUNTIME.spawn(new_thread.run());
 
     Ok(new_pid)
 }

+ 8 - 4
src/kernel/task/process.rs

@@ -350,7 +350,11 @@ impl Process {
         trace_continue: bool,
     ) -> KResult<Option<WaitObject>> {
         let wait_object = {
-            let mut waits = self.wait_list.entry(wait_id, trace_stop, trace_continue);
+            let mut waits = self
+                .wait_list
+                .entry(wait_id, trace_stop, trace_continue)
+                .await;
+
             loop {
                 if let Some(object) = waits.get() {
                     break object;
@@ -377,7 +381,7 @@ impl Process {
             Ok(Some(wait_object))
         } else {
             let mut procs = ProcessList::get().write().await;
-            procs.remove_process(wait_object.pid);
+            procs.remove_process(wait_object.pid).await;
             assert!(self
                 .inner
                 .access_mut(procs.prove_mut())
@@ -572,9 +576,9 @@ impl WaitList {
     /// # Safety
     /// Locks `ProcessList` and `WaitList` at the same time. When `wait` is called,
     /// releases the lock on `ProcessList` and `WaitList` and waits on `cv_wait_procs`.
-    pub fn entry(&self, wait_id: WaitId, want_stop: bool, want_continue: bool) -> Entry {
+    pub async fn entry(&self, wait_id: WaitId, want_stop: bool, want_continue: bool) -> Entry {
         Entry {
-            process_list: Task::block_on(ProcessList::get().read()),
+            process_list: ProcessList::get().read().await,
             wait_procs: self.wait_procs.lock(),
             cv: &self.cv_wait_procs,
             want_stop,

+ 4 - 6
src/kernel/task/process_list.rs

@@ -9,7 +9,6 @@ use alloc::{
     collections::btree_map::BTreeMap,
     sync::{Arc, Weak},
 };
-use eonix_runtime::task::Task;
 use eonix_sync::{AsProof as _, AsProofMut as _, RwLock};
 
 pub struct ProcessList {
@@ -54,7 +53,7 @@ impl ProcessList {
         self.threads.insert(thread.tid, thread.clone());
     }
 
-    pub fn remove_process(&mut self, pid: u32) {
+    pub async fn remove_process(&mut self, pid: u32) {
         // Thread group leader has the same tid as the pid.
         if let Some(thread) = self.threads.remove(&pid) {
             self.processes.remove(&pid);
@@ -64,7 +63,7 @@ impl ProcessList {
             let pgroup = unsafe { thread.process.pgroup.swap(None) }.unwrap();
             let _parent = unsafe { thread.process.parent.swap(None) }.unwrap();
             pgroup.remove_member(pid, self.prove_mut());
-            Task::block_on(rcu_sync());
+            rcu_sync().await;
 
             if Arc::strong_count(&pgroup) == 1 {
                 self.pgroups.remove(&pgroup.pgid);
@@ -154,10 +153,9 @@ impl ProcessList {
 
             // If we are the session leader, we should drop the control terminal.
             if process.session(self.prove()).sid == process.pid {
-                if let Some(terminal) =
-                    Task::block_on(process.session(self.prove()).drop_control_terminal())
+                if let Some(terminal) = process.session(self.prove()).drop_control_terminal().await
                 {
-                    terminal.drop_session();
+                    terminal.drop_session().await;
                 }
             }
 

+ 2 - 2
src/kernel/task/session.rs

@@ -87,14 +87,14 @@ impl Session {
     ) -> KResult<()> {
         let mut job_control = self.job_control.write().await;
         if let Some(_) = job_control.control_terminal.as_ref() {
-            if let Some(session) = terminal.session().as_ref() {
+            if let Some(session) = terminal.session().await.as_ref() {
                 if session.sid == self.sid {
                     return Ok(());
                 }
             }
             return Err(EPERM);
         }
-        terminal.set_session(self, forced)?;
+        terminal.set_session(self, forced).await?;
         job_control.control_terminal = Some(terminal.clone());
         job_control.foreground = Arc::downgrade(&Thread::current().process.pgroup(procs));
         Ok(())

+ 5 - 8
src/kernel/task/signal.rs

@@ -9,7 +9,7 @@ use core::{cmp::Reverse, task::Waker};
 use eonix_hal::fpu::FpuState;
 use eonix_hal::traits::trap::RawTrapContext;
 use eonix_hal::trap::TrapContext;
-use eonix_runtime::task::Task;
+use eonix_runtime::scheduler::Runtime;
 use eonix_sync::AsProof as _;
 use intrusive_collections::UnsafeRef;
 use posix_types::signal::{SigSet, Signal};
@@ -226,15 +226,12 @@ impl SignalList {
 
                     // `SIGSTOP` can only be waken up by `SIGCONT` or `SIGKILL`.
                     // SAFETY: Preempt disabled above.
-                    {
+                    Runtime::block_till_woken(|waker| {
                         let mut inner = self.inner.lock();
-                        let waker = Waker::from(Task::current().clone());
-
-                        let old_waker = inner.stop_waker.replace(waker);
+                        let old_waker = inner.stop_waker.replace(waker.clone());
                         assert!(old_waker.is_none(), "We should not have a waker here");
-                    }
-
-                    Task::park_preempt_disabled();
+                    })
+                    .await;
 
                     if let Some(parent) = thread.process.parent.load() {
                         parent.notify(

+ 32 - 76
src/kernel/task/thread.rs

@@ -17,10 +17,10 @@ use alloc::sync::Arc;
 use atomic_unique_refcell::AtomicUniqueRefCell;
 use core::{
     future::Future,
-    pin::Pin,
+    pin::{pin, Pin},
     ptr::NonNull,
     sync::atomic::{AtomicBool, Ordering},
-    task::{Context, Poll, Waker},
+    task::{Context, Poll},
 };
 use eonix_hal::{
     fpu::FpuState,
@@ -33,7 +33,6 @@ use eonix_hal::{
     trap::{disable_irqs_save, TrapContext},
 };
 use eonix_mm::address::{Addr as _, VAddr};
-use eonix_runtime::run::{Contexted, Run, RunState};
 use eonix_sync::AsProofMut as _;
 use pointers::BorrowedArc;
 use posix_types::signal::Signal;
@@ -41,11 +40,6 @@ use posix_types::signal::Signal;
 #[eonix_percpu::define_percpu]
 static CURRENT_THREAD: Option<NonNull<Thread>> = None;
 
-pub struct ThreadRunnable<F: Future> {
-    thread: Arc<Thread>,
-    future: F,
-}
-
 pub struct ThreadBuilder {
     tid: Option<u32>,
     name: Option<Arc<[u8]>>,
@@ -421,28 +415,44 @@ impl Thread {
         }
     }
 
-    pub async fn run(self: Arc<Thread>) {
-        struct ContextedRun<'a, F: Future>(F, &'a Thread);
+    pub fn run(self: Arc<Thread>) -> impl Future<Output = ()> + Send + 'static {
+        async fn real_run_with_context(me: &Arc<Thread>) {
+            let mut future = pin!(me.real_run());
+
+            core::future::poll_fn(|cx| {
+                me.process.mm_list.activate();
 
-        impl<F: Future> Future for ContextedRun<'_, F> {
-            type Output = F::Output;
+                CURRENT_THREAD.set(NonNull::new(Arc::as_ptr(me) as *mut _));
+
+                unsafe {
+                    // SAFETY: Preemption is disabled.
+                    me.load_thread_area32();
+                }
+
+                unsafe {
+                    let trap_ctx_ptr: *const TrapContext = &raw const *me.trap_ctx.borrow();
+                    // SAFETY:
+                    CPU::local()
+                        .as_mut()
+                        .load_interrupt_stack(trap_ctx_ptr as u64);
+                }
 
-            fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
                 let irq_state = disable_irqs_save();
-                let (future, _) = unsafe {
-                    // SAFETY: We construct a pinned future and `&Thread` is `Unpin`.
-                    let me = self.as_mut().get_unchecked_mut();
-                    (Pin::new_unchecked(&mut me.0), me.1)
-                };
 
-                let retval = future.poll(ctx);
+                let result = future.as_mut().poll(cx);
 
                 irq_state.restore();
-                retval
-            }
+
+                me.process.mm_list.deactivate();
+
+                CURRENT_THREAD.set(None);
+
+                result
+            })
+            .await
         }
 
-        ContextedRun(self.real_run(), &self).await
+        async move { real_run_with_context(&self).await }
     }
 }
 
@@ -467,57 +477,3 @@ pub async fn yield_now() {
 
     Yield { yielded: false }.await;
 }
-
-pub fn new_thread_runnable(
-    thread: Arc<Thread>,
-) -> ThreadRunnable<impl Future<Output = impl Send + 'static> + Send + 'static> {
-    ThreadRunnable {
-        thread: thread.clone(),
-        future: thread.run(),
-    }
-}
-
-impl<F: Future> Contexted for ThreadRunnable<F> {
-    fn load_running_context(&self) {
-        self.thread.process.mm_list.activate();
-
-        let raw_ptr: *const Thread = &raw const *self.thread;
-        CURRENT_THREAD.set(NonNull::new(raw_ptr as *mut _));
-
-        unsafe {
-            // SAFETY: Preemption is disabled.
-            self.thread.load_thread_area32();
-        }
-
-        unsafe {
-            let trap_ctx_ptr: *const TrapContext = &raw const *self.thread.trap_ctx.borrow();
-            // SAFETY:
-            CPU::local()
-                .as_mut()
-                .load_interrupt_stack(trap_ctx_ptr as u64);
-        }
-    }
-
-    fn restore_running_context(&self) {
-        self.thread.process.mm_list.deactivate();
-
-        CURRENT_THREAD.set(None);
-    }
-}
-
-impl<F: Future> Run for ThreadRunnable<F> {
-    type Output = F::Output;
-
-    fn run(mut self: Pin<&mut Self>, waker: &Waker) -> RunState<Self::Output> {
-        let mut ctx = Context::from_waker(waker);
-
-        match unsafe {
-            self.as_mut()
-                .map_unchecked_mut(|me| &mut me.future)
-                .poll(&mut ctx)
-        } {
-            Poll::Ready(output) => RunState::Finished(output),
-            Poll::Pending => RunState::Running,
-        }
-    }
-}

+ 9 - 9
src/kernel/terminal.rs

@@ -623,12 +623,12 @@ impl Terminal {
                 ptr.write(window_size)
             }
             TerminalIORequest::GetTermios(ptr) => {
-                let termios = Task::block_on(self.inner.lock()).termio.get_user();
+                let termios = self.inner.lock().await.termio.get_user();
                 ptr.write(termios)
             }
             TerminalIORequest::SetTermios(ptr) => {
                 let user_termios = ptr.read()?;
-                let mut inner = Task::block_on(self.inner.lock());
+                let mut inner = self.inner.lock().await;
 
                 // TODO: We ignore unknown bits for now.
                 inner.termio.iflag = TermioIFlags::from_bits_truncate(user_termios.iflag as u16);
@@ -644,13 +644,13 @@ impl Terminal {
     }
 
     /// Assign the `session` to this terminal. Drop the previous session if `forced` is true.
-    pub fn set_session(&self, session: &Arc<Session>, forced: bool) -> KResult<()> {
-        let mut inner = Task::block_on(self.inner.lock());
+    pub async fn set_session(&self, session: &Arc<Session>, forced: bool) -> KResult<()> {
+        let mut inner = self.inner.lock().await;
         if let Some(session) = inner.session.upgrade() {
             if !forced {
                 Err(EPERM)
             } else {
-                Task::block_on(session.drop_control_terminal());
+                session.drop_control_terminal().await;
                 inner.session = Arc::downgrade(&session);
                 Ok(())
             }
@@ -661,12 +661,12 @@ impl Terminal {
         }
     }
 
-    pub fn drop_session(&self) {
-        Task::block_on(self.inner.lock()).session = Weak::new();
+    pub async fn drop_session(&self) {
+        self.inner.lock().await.session = Weak::new();
     }
 
-    pub fn session(&self) -> Option<Arc<Session>> {
-        Task::block_on(self.inner.lock()).session.upgrade()
+    pub async fn session(&self) -> Option<Arc<Session>> {
+        self.inner.lock().await.session.upgrade()
     }
 }
 

+ 34 - 29
src/lib.rs

@@ -24,19 +24,19 @@ use crate::kernel::task::alloc_pid;
 use alloc::{ffi::CString, sync::Arc};
 use core::{
     hint::spin_loop,
-    sync::atomic::{AtomicBool, Ordering},
+    sync::atomic::{AtomicBool, AtomicUsize, Ordering},
 };
 use eonix_hal::{
-    arch_exported::bootstrap::shutdown, processor::CPU, traits::trap::IrqState,
+    arch_exported::bootstrap::shutdown,
+    processor::{halt, CPU, CPU_COUNT},
+    traits::trap::IrqState,
     trap::disable_irqs_save,
 };
 use eonix_mm::address::PRange;
-use eonix_runtime::{run::FutureRun, scheduler::Scheduler, task::Task};
+use eonix_runtime::scheduler::RUNTIME;
 use kernel::{
     mem::GlobalPageAlloc,
-    task::{
-        new_thread_runnable, KernelStack, ProcessBuilder, ProcessList, ProgramLoader, ThreadBuilder,
-    },
+    task::{ProcessBuilder, ProcessList, ProgramLoader, ThreadBuilder},
     vfs::{
         dentry::Dentry,
         mount::{do_mount, MS_NOATIME, MS_NODEV, MS_NOSUID, MS_RDONLY},
@@ -80,6 +80,25 @@ fn panic(info: &core::panic::PanicInfo) -> ! {
 }
 
 static BSP_OK: AtomicBool = AtomicBool::new(false);
+static CPU_SHUTTING_DOWN: AtomicUsize = AtomicUsize::new(0);
+
+fn shutdown_system() -> ! {
+    let cpu_count = CPU_COUNT.load(Ordering::Relaxed);
+
+    if CPU_SHUTTING_DOWN.fetch_add(1, Ordering::AcqRel) + 1 == cpu_count {
+        println_info!("All CPUs are shutting down. Gracefully powering off...");
+        shutdown();
+    } else {
+        println_info!(
+            "CPU {} is shutting down. Waiting for other CPUs...",
+            CPU::local().cpuid()
+        );
+
+        loop {
+            halt();
+        }
+    }
+}
 
 #[eonix_hal::main]
 fn kernel_init(mut data: eonix_hal::bootstrap::BootStrapData) -> ! {
@@ -90,22 +109,14 @@ fn kernel_init(mut data: eonix_hal::bootstrap::BootStrapData) -> ! {
         driver::sbi_console::init_console();
     }
 
-    // To satisfy the `Scheduler` "preempt count == 0" assertion.
-    eonix_preempt::disable();
-
-    // We need root dentry to be present in constructor of `FsContext`.
-    // So call `init_vfs` first, then `init_multitasking`.
-    Scheduler::init_local_scheduler::<KernelStack>();
-
-    Scheduler::get().spawn::<KernelStack, _>(FutureRun::new(init_process(data.get_early_stack())));
-
     BSP_OK.store(true, Ordering::Release);
 
+    RUNTIME.spawn(init_process(data.get_early_stack()));
+
     drop(data);
-    unsafe {
-        // SAFETY: `preempt::count()` == 1.
-        Scheduler::goto_scheduler_noreturn()
-    }
+
+    RUNTIME.enter();
+    shutdown_system();
 }
 
 #[eonix_hal::ap_main]
@@ -115,16 +126,10 @@ fn kernel_ap_main(_stack_range: PRange) -> ! {
         spin_loop();
     }
 
-    Scheduler::init_local_scheduler::<KernelStack>();
     println_debug!("AP{} started", CPU::local().cpuid());
 
-    eonix_preempt::disable();
-
-    // TODO!!!!!: Free the stack after having switched to idle task.
-    unsafe {
-        // SAFETY: `preempt::count()` == 1.
-        Scheduler::goto_scheduler_noreturn()
-    }
+    RUNTIME.enter();
+    shutdown_system();
 }
 
 async fn init_process(early_kstack: PRange) {
@@ -223,7 +228,7 @@ async fn init_process(early_kstack: PRange) {
         .name(Arc::from(&b"busybox"[..]))
         .entry(load_info.entry_ip, load_info.sp);
 
-    let mut process_list = Task::block_on(ProcessList::get().write());
+    let mut process_list = ProcessList::get().write().await;
     let (thread, process) = ProcessBuilder::new()
         .pid(alloc_pid())
         .mm_list(load_info.mm_list)
@@ -235,5 +240,5 @@ async fn init_process(early_kstack: PRange) {
     // TODO!!!: Remove this.
     thread.files.open_console();
 
-    Scheduler::get().spawn::<KernelStack, _>(new_thread_runnable(thread));
+    RUNTIME.spawn(thread.run());
 }