Просмотр исходного кода

temporary solution: use `Task::block_on` everywhere

greatbridf 9 месяцев назад
Родитель
Сommit
a2ec93f2e3

+ 25 - 19
src/driver/ahci/port.rs

@@ -1,3 +1,5 @@
+use core::pin::pin;
+
 use super::command::{Command, IdentifyCommand, ReadLBACommand};
 use super::{
     vread, vwrite, CommandHeader, PRDTEntry, FISH2D, PORT_CMD_CR, PORT_CMD_FR, PORT_CMD_FRE,
@@ -7,11 +9,11 @@ use crate::kernel::block::{BlockDeviceRequest, BlockRequestQueue};
 use crate::kernel::mem::paging::Page;
 use crate::kernel::mem::phys::{NoCachePP, PhysPtr};
 use crate::prelude::*;
-use crate::sync::UCondVar;
 use alloc::collections::vec_deque::VecDeque;
 use bindings::{EINVAL, EIO};
-use eonix_preempt::assert_preempt_enabled;
+use eonix_runtime::task::Task;
 use eonix_spin_irq::SpinIrq as _;
+use eonix_sync::WaitList;
 
 fn spinwait_clear(refval: *const u32, mask: u32) -> KResult<()> {
     const SPINWAIT_MAX: usize = 1000;
@@ -106,7 +108,7 @@ impl CommandSlotInner {
 
 struct CommandSlot {
     inner: Spin<CommandSlotInner>,
-    cv: UCondVar,
+    wait_list: WaitList,
 }
 
 impl CommandSlot {
@@ -116,7 +118,7 @@ impl CommandSlot {
                 state: SlotState::Idle,
                 cmdheader,
             }),
-            cv: UCondVar::new(),
+            wait_list: WaitList::new(),
         }
     }
 }
@@ -153,7 +155,7 @@ pub struct AdapterPort {
     page: Page,
     slots: [CommandSlot; 32],
     free_list: Spin<FreeList>,
-    free_list_cv: UCondVar,
+    free_list_wait: WaitList,
 
     /// Statistics for this port
     pub stats: Spin<AdapterPortStats>,
@@ -176,7 +178,7 @@ impl AdapterPort {
                 CommandSlot::new(unsafe { cmdheaders_start.offset(index as isize) })
             }),
             free_list: Spin::new(FreeList::new()),
-            free_list_cv: UCondVar::new(),
+            free_list_wait: WaitList::new(),
             page,
             stats: Spin::default(),
         }
@@ -217,13 +219,17 @@ impl AdapterPort {
     }
 
     fn get_free_slot(&self) -> u32 {
-        let mut free_list = self.free_list.lock_irq();
-
         loop {
-            match free_list.free.pop_front() {
-                Some(slot) => break slot,
-                None => self.free_list_cv.wait(&mut free_list),
-            };
+            let mut free_list = self.free_list.lock_irq();
+            let free_slot = free_list.free.pop_front();
+            if let Some(slot) = free_slot {
+                return slot;
+            }
+            let mut wait = pin!(self.free_list_wait.prepare_to_wait());
+            wait.as_mut().add_to_wait_list();
+            drop(free_list);
+
+            Task::block_on(wait);
         }
     }
 
@@ -233,7 +239,7 @@ impl AdapterPort {
 
     fn release_free_slot(&self, slot: u32) {
         self.free_list.lock().free.push_back(slot);
-        self.free_list_cv.notify_one();
+        self.free_list_wait.notify_one();
     }
 
     pub fn handle_interrupt(&self) {
@@ -253,7 +259,7 @@ impl AdapterPort {
             let mut slot_inner = slot.inner.lock();
             debug_assert_eq!(slot_inner.state, SlotState::Working);
             slot_inner.state = SlotState::Finished;
-            slot.cv.notify_all();
+            slot.wait_list.notify_all();
             self.stats.lock().int_fired += 1;
 
             false
@@ -281,11 +287,7 @@ impl AdapterPort {
         Ok(())
     }
 
-    /// # Might Sleep
-    /// This function **might sleep**, so call it in a preemptible context
     fn send_command(&self, cmd: &impl Command) -> KResult<()> {
-        assert_preempt_enabled!("AdapterPort::send_command");
-
         let pages = cmd.pages();
         let cmdtable_page = Page::alloc_one();
 
@@ -321,7 +323,11 @@ impl AdapterPort {
                     saved = true;
                     self.save_working(slot_index as u32);
                 }
-                slot_object.cv.wait(&mut slot);
+                let mut wait = pin!(slot_object.wait_list.prepare_to_wait());
+                wait.as_mut().add_to_wait_list();
+                drop(slot);
+                Task::block_on(wait);
+                slot = slot_object.inner.lock_irq();
             }
         } else {
             // TODO: check error

+ 3 - 1
src/driver/e1000e.rs

@@ -119,7 +119,9 @@ impl netdev::Netdev for E1000eDev {
         // setup interrupt handler
         let device = netdev::get_netdev(self.id).unwrap();
         let handler = move || {
-            device.lock().fire().unwrap();
+            eonix_runtime::task::Task::block_on(device.lock())
+                .fire()
+                .unwrap();
         };
 
         register_irq_handler(0xb, handler)?;

+ 5 - 1
src/driver/serial.rs

@@ -86,7 +86,7 @@ impl Serial {
                 let ch = port.tx_rx.read();
 
                 if let Some(terminal) = terminal.as_ref() {
-                    terminal.commit_char(ch);
+                    terminal.commit_char(ch).await;
                 }
             }
 
@@ -203,6 +203,10 @@ impl TerminalDevice for Serial {
         tx_buffer.push_back(ch);
         self.wakeup_worker();
     }
+
+    fn putchar_direct(&self, ch: u8) {
+        self.tx_rx.write(ch);
+    }
 }
 
 pub fn init() -> KResult<()> {

+ 6 - 5
src/fs/fat32.rs

@@ -16,7 +16,6 @@ use crate::{
         },
     },
     prelude::*,
-    sync::rwlock_new,
     KResult,
 };
 use alloc::{
@@ -27,6 +26,8 @@ use alloc::{
 use bindings::EIO;
 use core::{ops::ControlFlow, sync::atomic::Ordering};
 use dir::Dirs as _;
+use eonix_runtime::task::Task;
+use eonix_sync::RwLock;
 use file::ClusterRead;
 
 type ClusterNo = u32;
@@ -133,7 +134,7 @@ impl FatFs {
             sectors_per_cluster: 0,
             rootdir_cluster: 0,
             data_start: 0,
-            fat: rwlock_new(Vec::new()),
+            fat: RwLock::new(Vec::new()),
             weak: weak.clone(),
             icache: BTreeMap::new(),
             volume_label: [0; 11],
@@ -245,7 +246,7 @@ impl Inode for FileInode {
     fn read(&self, buffer: &mut dyn Buffer, offset: usize) -> KResult<usize> {
         let vfs = self.vfs.upgrade().ok_or(EIO)?;
         let vfs = vfs.as_any().downcast_ref::<FatFs>().unwrap();
-        let fat = vfs.fat.read();
+        let fat = Task::block_on(vfs.fat.read());
 
         if self.size.load(Ordering::Relaxed) as usize == 0 {
             return Ok(0);
@@ -286,7 +287,7 @@ impl Inode for DirInode {
     fn lookup(&self, dentry: &Arc<Dentry>) -> KResult<Option<Arc<dyn Inode>>> {
         let vfs = self.vfs.upgrade().ok_or(EIO)?;
         let vfs = vfs.as_any().downcast_ref::<FatFs>().unwrap();
-        let fat = vfs.fat.read();
+        let fat = Task::block_on(vfs.fat.read());
 
         let mut entries = ClusterIterator::new(fat.as_ref(), self.ino as ClusterNo)
             .read(vfs, 0)
@@ -317,7 +318,7 @@ impl Inode for DirInode {
     ) -> KResult<usize> {
         let vfs = self.vfs.upgrade().ok_or(EIO)?;
         let vfs = vfs.as_any().downcast_ref::<FatFs>().unwrap();
-        let fat = vfs.fat.read();
+        let fat = Task::block_on(vfs.fat.read());
 
         let cluster_iter = ClusterIterator::new(fat.as_ref(), self.ino as ClusterNo)
             .read(vfs, offset)

+ 5 - 4
src/fs/procfs.rs

@@ -16,6 +16,7 @@ use crate::{
 use alloc::sync::{Arc, Weak};
 use bindings::{EACCES, ENOTDIR};
 use core::{ops::ControlFlow, sync::atomic::Ordering};
+use eonix_runtime::task::Task;
 use eonix_sync::{AsProof as _, AsProofMut as _, LazyLock, Locked};
 use itertools::Itertools;
 
@@ -129,7 +130,7 @@ impl DirInode {
 
 impl Inode for DirInode {
     fn lookup(&self, dentry: &Arc<Dentry>) -> KResult<Option<Arc<dyn Inode>>> {
-        let lock = self.rwsem.read();
+        let lock = Task::block_on(self.rwsem.read());
         Ok(self
             .entries
             .access(lock.prove())
@@ -146,7 +147,7 @@ impl Inode for DirInode {
         offset: usize,
         callback: &mut dyn FnMut(&[u8], Ino) -> KResult<ControlFlow<(), ()>>,
     ) -> KResult<usize> {
-        let lock = self.rwsem.read();
+        let lock = Task::block_on(self.rwsem.read());
         self.entries
             .access(lock.prove())
             .iter()
@@ -229,7 +230,7 @@ pub fn creat(
     let inode = FileInode::new(ino, Arc::downgrade(&fs), file);
 
     {
-        let lock = parent.idata.rwsem.write();
+        let lock = Task::block_on(parent.idata.rwsem.write());
         parent
             .entries
             .access_mut(lock.prove_mut())
@@ -253,7 +254,7 @@ pub fn mkdir(parent: &ProcFsNode, name: &[u8]) -> KResult<ProcFsNode> {
 
     parent
         .entries
-        .access_mut(inode.rwsem.write().prove_mut())
+        .access_mut(Task::block_on(inode.rwsem.write()).prove_mut())
         .push((Arc::from(name), ProcFsNode::Dir(inode.clone())));
 
     Ok(ProcFsNode::Dir(inode))

+ 12 - 11
src/fs/tmpfs.rs

@@ -14,6 +14,7 @@ use crate::{
 use alloc::sync::{Arc, Weak};
 use bindings::{EINVAL, EIO, EISDIR};
 use core::{ops::ControlFlow, sync::atomic::Ordering};
+use eonix_runtime::task::Task;
 use eonix_sync::{AsProof as _, AsProofMut as _, Locked, ProofMut};
 use itertools::Itertools;
 
@@ -86,7 +87,7 @@ impl Inode for DirectoryInode {
         offset: usize,
         callback: &mut dyn FnMut(&[u8], Ino) -> KResult<ControlFlow<(), ()>>,
     ) -> KResult<usize> {
-        let lock = self.rwsem.read();
+        let lock = Task::block_on(self.rwsem.read());
         self.entries
             .access(lock.prove())
             .iter()
@@ -101,7 +102,7 @@ impl Inode for DirectoryInode {
         let vfs = acquire(&self.vfs)?;
         let vfs = astmp(&vfs);
 
-        let rwsem = self.rwsem.write();
+        let rwsem = Task::block_on(self.rwsem.write());
 
         let ino = vfs.assign_ino();
         let file = FileInode::new(ino, self.vfs.clone(), mode);
@@ -118,7 +119,7 @@ impl Inode for DirectoryInode {
         let vfs = acquire(&self.vfs)?;
         let vfs = astmp(&vfs);
 
-        let rwsem = self.rwsem.write();
+        let rwsem = Task::block_on(self.rwsem.write());
 
         let ino = vfs.assign_ino();
         let file = NodeInode::new(
@@ -136,7 +137,7 @@ impl Inode for DirectoryInode {
         let vfs = acquire(&self.vfs)?;
         let vfs = astmp(&vfs);
 
-        let rwsem = self.rwsem.write();
+        let rwsem = Task::block_on(self.rwsem.write());
 
         let ino = vfs.assign_ino();
         let file = SymlinkInode::new(ino, self.vfs.clone(), target.into());
@@ -149,7 +150,7 @@ impl Inode for DirectoryInode {
         let vfs = acquire(&self.vfs)?;
         let vfs = astmp(&vfs);
 
-        let rwsem = self.rwsem.write();
+        let rwsem = Task::block_on(self.rwsem.write());
 
         let ino = vfs.assign_ino();
         let newdir = DirectoryInode::new(ino, self.vfs.clone(), mode);
@@ -161,7 +162,7 @@ impl Inode for DirectoryInode {
     fn unlink(&self, at: &Arc<Dentry>) -> KResult<()> {
         let _vfs = acquire(&self.vfs)?;
 
-        let dlock = self.rwsem.write();
+        let dlock = Task::block_on(self.rwsem.write());
 
         let file = at.get_inode()?;
         let _flock = file.rwsem.write();
@@ -205,7 +206,7 @@ impl Inode for DirectoryInode {
 
     fn chmod(&self, mode: Mode) -> KResult<()> {
         let _vfs = acquire(&self.vfs)?;
-        let _lock = self.rwsem.write();
+        let _lock = Task::block_on(self.rwsem.write());
 
         // SAFETY: `rwsem` has done the synchronization
         let old = self.mode.load(Ordering::Relaxed);
@@ -265,7 +266,7 @@ impl FileInode {
 impl Inode for FileInode {
     fn read(&self, buffer: &mut dyn Buffer, offset: usize) -> KResult<usize> {
         // TODO: We don't need that strong guarantee, find some way to avoid locks
-        let lock = self.rwsem.read();
+        let lock = Task::block_on(self.rwsem.read());
 
         match self.filedata.access(lock.prove()).split_at_checked(offset) {
             Some((_, data)) => buffer.fill(data).map(|result| result.allow_partial()),
@@ -275,7 +276,7 @@ impl Inode for FileInode {
 
     fn write(&self, buffer: &[u8], offset: WriteOffset) -> KResult<usize> {
         // TODO: We don't need that strong guarantee, find some way to avoid locks
-        let lock = self.rwsem.write();
+        let lock = Task::block_on(self.rwsem.write());
         let filedata = self.filedata.access_mut(lock.prove_mut());
 
         let offset = match offset {
@@ -303,7 +304,7 @@ impl Inode for FileInode {
 
     fn truncate(&self, length: usize) -> KResult<()> {
         // TODO: We don't need that strong guarantee, find some way to avoid locks
-        let lock = self.rwsem.write();
+        let lock = Task::block_on(self.rwsem.write());
         let filedata = self.filedata.access_mut(lock.prove_mut());
 
         // SAFETY: `lock` has done the synchronization
@@ -315,7 +316,7 @@ impl Inode for FileInode {
 
     fn chmod(&self, mode: Mode) -> KResult<()> {
         let _vfs = acquire(&self.vfs)?;
-        let _lock = self.rwsem.write();
+        let _lock = Task::block_on(self.rwsem.write());
 
         // SAFETY: `rwsem` has done the synchronization
         let old = self.mode.load(Ordering::Relaxed);

+ 9 - 4
src/kernel/chardev.rs

@@ -15,6 +15,7 @@ use alloc::{
     collections::btree_map::{BTreeMap, Entry},
     sync::Arc,
 };
+use eonix_runtime::task::Task;
 use eonix_sync::AsProof as _;
 
 pub trait VirtualCharDevice: Send + Sync {
@@ -38,7 +39,7 @@ static CHAR_DEVICES: Spin<BTreeMap<DevId, Arc<CharDevice>>> = Spin::new(BTreeMap
 impl CharDevice {
     pub fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
         match &self.device {
-            CharDeviceType::Terminal(terminal) => terminal.read(buffer),
+            CharDeviceType::Terminal(terminal) => Task::block_on(terminal.read(buffer)),
             CharDeviceType::Virtual(device) => device.read(buffer),
         }
     }
@@ -72,13 +73,17 @@ impl CharDevice {
     pub fn open(self: &Arc<Self>) -> KResult<Arc<File>> {
         Ok(match &self.device {
             CharDeviceType::Terminal(terminal) => {
-                let procs = ProcessList::get().read();
+                let procs = Task::block_on(ProcessList::get().read());
                 let current = Thread::current();
                 let session = current.process.session(procs.prove());
                 // We only set the control terminal if the process is the session leader.
                 if session.sid == Thread::current().process.pid {
                     // Silently fail if we can't set the control terminal.
-                    dont_check!(session.set_control_terminal(&terminal, false, procs.prove()));
+                    dont_check!(Task::block_on(session.set_control_terminal(
+                        &terminal,
+                        false,
+                        procs.prove()
+                    )));
                 }
 
                 TerminalFile::new(terminal.clone())
@@ -116,7 +121,7 @@ struct ConsoleDevice;
 impl VirtualCharDevice for ConsoleDevice {
     fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
         let console_terminal = get_console().ok_or(EIO)?;
-        console_terminal.read(buffer)
+        Task::block_on(console_terminal.read(buffer))
     }
 
     fn write(&self, data: &[u8]) -> KResult<usize> {

+ 1 - 0
src/kernel/constants.rs

@@ -23,6 +23,7 @@ pub const ENOENT: u32 = 2;
 pub const EIO: u32 = 5;
 pub const ENXIO: u32 = 6;
 pub const ENOEXEC: u32 = 8;
+pub const EFAULT: u32 = 14;
 pub const EEXIST: u32 = 17;
 pub const ENOSYS: u32 = 38;
 

+ 13 - 16
src/kernel/mem/mm_list.rs

@@ -2,16 +2,15 @@ mod page_fault;
 
 use super::{MMArea, Page, PageTable, VAddr, VRange};
 use crate::kernel::vfs::dentry::Dentry;
-use crate::{
-    prelude::*,
-    sync::{mutex_new, ArcSwap},
-};
+use crate::{prelude::*, sync::ArcSwap};
 use alloc::{collections::btree_set::BTreeSet, sync::Arc};
 use bindings::{EEXIST, EFAULT, EINVAL, ENOMEM, KERNEL_PML4};
 use core::{
     ops::Sub as _,
     sync::atomic::{AtomicUsize, Ordering},
 };
+use eonix_runtime::task::Task;
+use eonix_sync::Mutex;
 
 pub use page_fault::handle_page_fault;
 
@@ -197,7 +196,7 @@ impl MMList {
         let page_table = PageTable::new();
         Self {
             root_page_table: AtomicUsize::from(page_table.root_page_table()),
-            inner: ArcSwap::new(mutex_new(MMListInner {
+            inner: ArcSwap::new(Mutex::new(MMListInner {
                 areas: BTreeSet::new(),
                 page_table,
                 break_start: None,
@@ -208,12 +207,12 @@ impl MMList {
 
     pub fn new_cloned(&self) -> Self {
         let inner = self.inner.borrow();
-        let inner = inner.lock();
+        let inner = Task::block_on(inner.lock());
 
         let page_table = PageTable::new();
         let list = Self {
             root_page_table: AtomicUsize::from(page_table.root_page_table()),
-            inner: ArcSwap::new(mutex_new(MMListInner {
+            inner: ArcSwap::new(Mutex::new(MMListInner {
                 areas: inner.areas.clone(),
                 page_table,
                 break_start: inner.break_start,
@@ -223,7 +222,7 @@ impl MMList {
 
         {
             let list_inner = list.inner.borrow();
-            let list_inner = list_inner.lock();
+            let list_inner = Task::block_on(list_inner.lock());
 
             for area in list_inner.areas.iter() {
                 let new_iter = list_inner.page_table.iter_user(area.range()).unwrap();
@@ -284,7 +283,7 @@ impl MMList {
 
     /// No need to do invalidation manually, `PageTable` already does it.
     pub fn unmap(&self, start: VAddr, len: usize) -> KResult<()> {
-        self.inner.borrow().lock().unmap(start, len)
+        Task::block_on(self.inner.borrow().lock()).unmap(start, len)
     }
 
     pub fn mmap_hint(
@@ -295,7 +294,7 @@ impl MMList {
         permission: Permission,
     ) -> KResult<VAddr> {
         let inner = self.inner.borrow();
-        let mut inner = inner.lock();
+        let mut inner = Task::block_on(inner.lock());
 
         if hint == VAddr::NULL {
             let at = inner.find_available(hint, len).ok_or(ENOMEM)?;
@@ -321,16 +320,14 @@ impl MMList {
         mapping: Mapping,
         permission: Permission,
     ) -> KResult<VAddr> {
-        self.inner
-            .borrow()
-            .lock()
+        Task::block_on(self.inner.borrow().lock())
             .mmap(at, len, mapping.clone(), permission)
             .map(|_| at)
     }
 
     pub fn set_break(&self, pos: Option<VAddr>) -> VAddr {
         let inner = self.inner.borrow();
-        let mut inner = inner.lock();
+        let mut inner = Task::block_on(inner.lock());
 
         // SAFETY: `set_break` is only called in syscalls, where program break should be valid.
         assert!(inner.break_start.is_some() && inner.break_pos.is_some());
@@ -380,7 +377,7 @@ impl MMList {
     /// This should be called only **once** for every thread.
     pub fn register_break(&self, start: VAddr) {
         let inner = self.inner.borrow();
-        let mut inner = inner.lock();
+        let mut inner = Task::block_on(inner.lock());
         assert!(inner.break_start.is_none() && inner.break_pos.is_none());
 
         inner.break_start = Some(start.into());
@@ -400,7 +397,7 @@ impl MMList {
         }
 
         let inner = self.inner.borrow();
-        let inner = inner.lock();
+        let inner = Task::block_on(inner.lock());
 
         let mut offset = 0;
         let mut remaining = len;

+ 5 - 6
src/kernel/mem/mm_list/page_fault.rs

@@ -1,11 +1,10 @@
-use arch::InterruptContext;
-use bitflags::bitflags;
-
+use super::{MMList, VAddr};
 use crate::kernel::mem::{Mapping, VRange};
 use crate::kernel::task::{ProcessList, Signal, Thread};
 use crate::prelude::*;
-
-use super::{MMList, VAddr};
+use arch::InterruptContext;
+use bitflags::bitflags;
+use eonix_runtime::task::Task;
 
 bitflags! {
     pub struct PageFaultError: u64 {
@@ -35,7 +34,7 @@ impl MMList {
         error: PageFaultError,
     ) -> Result<(), Signal> {
         let inner = self.inner.borrow();
-        let inner = inner.lock();
+        let inner = Task::block_on(inner.lock());
 
         let area = match inner.areas.get(&VRange::from(addr)) {
             Some(area) => area,

+ 7 - 6
src/kernel/syscall/file_rw.rs

@@ -4,6 +4,7 @@ use bindings::{
     statx, AT_FDCWD, AT_STATX_SYNC_AS_STAT, AT_STATX_SYNC_TYPE, AT_SYMLINK_NOFOLLOW, EBADF, EFAULT,
     EINVAL, ENOENT, SEEK_CUR, SEEK_END, SEEK_SET, S_IFBLK, S_IFCHR,
 };
+use eonix_runtime::task::Task;
 
 use crate::{
     io::{Buffer, BufferFill},
@@ -31,14 +32,14 @@ fn do_read(fd: u32, buffer: *mut u8, bufsize: usize) -> KResult<usize> {
     let mut buffer = UserBuffer::new(buffer, bufsize)?;
     let files = FileArray::get_current();
 
-    files.get(fd).ok_or(EBADF)?.read(&mut buffer)
+    Task::block_on(files.get(fd).ok_or(EBADF)?.read(&mut buffer))
 }
 
 fn do_write(fd: u32, buffer: *const u8, count: usize) -> KResult<usize> {
     let data = unsafe { core::slice::from_raw_parts(buffer, count) };
     let files = FileArray::get_current();
 
-    files.get(fd).ok_or(EBADF)?.write(data)
+    Task::block_on(files.get(fd).ok_or(EBADF)?.write(data))
 }
 
 fn do_open(path: *const u8, flags: u32, mode: u32) -> KResult<u32> {
@@ -246,7 +247,7 @@ fn do_readv(fd: u32, iov_user: *const IoVec32, iovcnt: u32) -> KResult<usize> {
     let mut tot = 0usize;
     for mut buffer in iov_buffers.into_iter() {
         // TODO!!!: `readv`
-        let nread = file.read(&mut buffer)?;
+        let nread = Task::block_on(file.read(&mut buffer))?;
         tot += nread;
 
         if nread != buffer.total() {
@@ -283,7 +284,7 @@ fn do_writev(fd: u32, iov_user: *const u8, iovcnt: u32) -> KResult<usize> {
         // TODO!!!: atomic `writev`
         // TODO!!!!!: copy from user
         let slice = block.as_slice();
-        let nread = file.write(slice)?;
+        let nread = Task::block_on(file.write(slice))?;
         tot += nread;
 
         if nread == 0 || nread != slice.len() {
@@ -324,7 +325,7 @@ fn do_sendfile64(out_fd: u32, in_fd: u32, offset: *mut u8, count: usize) -> KRes
         unimplemented!("sendfile64 with offset");
     }
 
-    in_file.sendfile(&out_file, count)
+    Task::block_on(in_file.sendfile(&out_file, count))
 }
 
 fn do_ioctl(fd: u32, request: usize, arg3: usize) -> KResult<usize> {
@@ -359,7 +360,7 @@ fn do_poll(fds: *mut UserPollFd, nfds: u32, _timeout: u32) -> KResult<u32> {
             let mut fd = fds.read()?;
 
             let file = Thread::current().files.get(fd.fd).ok_or(EBADF)?;
-            fd.revents = file.poll(PollEvent::from_bits_retain(fd.events))?.bits();
+            fd.revents = Task::block_on(file.poll(PollEvent::from_bits_retain(fd.events)))?.bits();
 
             fds.write(fd)?;
             Ok(1)

+ 11 - 12
src/kernel/syscall/procops.rs

@@ -22,6 +22,7 @@ use arch::{ExtendedContext, InterruptContext};
 use bindings::{EINVAL, ENOENT, ENOTDIR, ERANGE, ESRCH};
 use bitflags::bitflags;
 use eonix_runtime::scheduler::Scheduler;
+use eonix_runtime::task::Task;
 use eonix_sync::AsProof as _;
 
 fn do_umask(mask: u32) -> KResult<u32> {
@@ -159,14 +160,13 @@ fn sys_exit(int_stack: &mut InterruptContext, _: &mut ExtendedContext) -> usize
     let status = int_stack.rbx as u32;
 
     unsafe {
-        let mut procs = ProcessList::get().write();
-        eonix_preempt::disable();
-
-        // SAFETY: Preemption is disabled.
+        let mut procs = Task::block_on(ProcessList::get().write());
         procs.do_kill_process(&Thread::current().process, WaitType::Exited(status));
     }
 
     unsafe {
+        eonix_preempt::disable();
+
         // SAFETY: Preempt count == 1.
         Thread::exit();
     }
@@ -189,11 +189,11 @@ fn do_waitpid(_waitpid: u32, arg1: *mut u32, options: u32) -> KResult<u32> {
         Some(options) => options,
     };
 
-    let wait_object = Thread::current().process.wait(
+    let wait_object = Task::block_on(Thread::current().process.wait(
         options.contains(UserWaitOptions::WNOHANG),
         options.contains(UserWaitOptions::WUNTRACED),
         options.contains(UserWaitOptions::WCONTINUED),
-    )?;
+    ))?;
 
     match wait_object {
         None => Ok(0),
@@ -234,7 +234,7 @@ fn do_getsid(pid: u32) -> KResult<u32> {
     if pid == 0 {
         Ok(Thread::current().process.session_rcu().sid)
     } else {
-        let procs = ProcessList::get().read();
+        let procs = Task::block_on(ProcessList::get().read());
         procs
             .try_find_process(pid)
             .map(|proc| proc.session(procs.prove()).sid)
@@ -246,7 +246,7 @@ fn do_getpgid(pid: u32) -> KResult<u32> {
     if pid == 0 {
         Ok(Thread::current().process.pgroup_rcu().pgid)
     } else {
-        let procs = ProcessList::get().read();
+        let procs = Task::block_on(ProcessList::get().read());
         procs
             .try_find_process(pid)
             .map(|proc| proc.pgroup(procs.prove()).pgid)
@@ -324,7 +324,7 @@ fn do_prctl(option: u32, arg2: usize) -> KResult<()> {
 }
 
 fn do_kill(pid: i32, sig: u32) -> KResult<()> {
-    let procs = ProcessList::get().read();
+    let procs = Task::block_on(ProcessList::get().read());
     match pid {
         // Send signal to every process for which the calling process has
         // permission to send signals.
@@ -350,8 +350,7 @@ fn do_kill(pid: i32, sig: u32) -> KResult<()> {
 }
 
 fn do_tkill(tid: u32, sig: u32) -> KResult<()> {
-    ProcessList::get()
-        .read()
+    Task::block_on(ProcessList::get().read())
         .try_find_thread(tid)
         .ok_or(ESRCH)?
         .raise(Signal::try_from(sig)?);
@@ -582,7 +581,7 @@ fn sys_vfork(int_stack: &mut InterruptContext, ext: &mut ExtendedContext) -> usi
 }
 
 fn sys_fork(int_stack: &mut InterruptContext, _: &mut ExtendedContext) -> usize {
-    let mut procs = ProcessList::get().write();
+    let mut procs = Task::block_on(ProcessList::get().write());
 
     let current = Thread::current();
     let current_process = current.process.clone();

+ 28 - 20
src/kernel/task/process.rs

@@ -6,7 +6,7 @@ use crate::{
     kernel::mem::MMList,
     prelude::*,
     rcu::{rcu_sync, RCUPointer, RCUReadGuard},
-    sync::{CondVar, RwLockReadGuard},
+    sync::CondVar,
 };
 use alloc::{
     collections::{btree_map::BTreeMap, vec_deque::VecDeque},
@@ -14,8 +14,10 @@ use alloc::{
 };
 use bindings::{ECHILD, EINTR, EPERM, ESRCH};
 use core::sync::atomic::{AtomicU32, Ordering};
+use eonix_runtime::task::Task;
 use eonix_sync::{
-    AsProof as _, AsProofMut as _, ForceUnlockableGuard as _, Locked, Proof, ProofMut, SpinGuard,
+    AsProof as _, AsProofMut as _, Locked, Proof, ProofMut, RwLockReadGuard, SpinGuard,
+    UnlockableGuard as _, UnlockedGuard as _,
 };
 use pointers::BorrowedArc;
 
@@ -258,7 +260,7 @@ impl Process {
             .is_none());
     }
 
-    pub fn wait(
+    pub async fn wait(
         &self,
         no_block: bool,
         trace_stop: bool,
@@ -284,14 +286,14 @@ impl Process {
                     return Ok(None);
                 }
 
-                waits.wait()?;
+                waits = waits.wait().await?;
             }
         };
 
         if wait_object.stopped().is_some() || wait_object.is_continue() {
             Ok(Some(wait_object))
         } else {
-            let mut procs = ProcessList::get().write();
+            let mut procs = ProcessList::get().write().await;
             procs.remove_process(wait_object.pid);
             assert!(self
                 .inner
@@ -306,7 +308,7 @@ impl Process {
 
     /// Create a new session for the process.
     pub fn setsid(self: &Arc<Self>) -> KResult<u32> {
-        let mut process_list = ProcessList::get().write();
+        let mut process_list = Task::block_on(ProcessList::get().write());
         // If there exists a session that has the same sid as our pid, we can't create a new
         // session. The standard says that we should create a new process group and be the
         // only process in the new process group and session.
@@ -323,7 +325,7 @@ impl Process {
             let _old_session = unsafe { self.session.swap(Some(session.clone())) }.unwrap();
             let old_pgroup = unsafe { self.pgroup.swap(Some(pgroup.clone())) }.unwrap();
             old_pgroup.remove_member(self.pid, process_list.prove_mut());
-            rcu_sync();
+            Task::block_on(rcu_sync());
         }
 
         Ok(pgroup.pgid)
@@ -372,7 +374,7 @@ impl Process {
         pgroup.remove_member(self.pid, procs.prove_mut());
         {
             let _old_pgroup = unsafe { self.pgroup.swap(Some(new_pgroup)) }.unwrap();
-            rcu_sync();
+            Task::block_on(rcu_sync());
         }
 
         Ok(())
@@ -383,7 +385,7 @@ impl Process {
     /// This function should be called on the process that issued the syscall in order to do
     /// permission checks.
     pub fn setpgid(self: &Arc<Self>, pid: u32, pgid: u32) -> KResult<()> {
-        let mut procs = ProcessList::get().write();
+        let mut procs = Task::block_on(ProcessList::get().write());
         // We may set pgid of either the calling process or a child process.
         if pid == self.pid {
             self.do_setpgid(pgid, &mut procs)
@@ -485,7 +487,7 @@ impl WaitList {
     /// releases the lock on `ProcessList` and `WaitList` and waits on `cv_wait_procs`.
     pub fn entry(&self, want_stop: bool, want_continue: bool) -> Entry {
         Entry {
-            process_list: ProcessList::get().read(),
+            process_list: Task::block_on(ProcessList::get().read()),
             wait_procs: self.wait_procs.lock(),
             cv: &self.cv_wait_procs,
             want_stop,
@@ -518,19 +520,25 @@ impl Entry<'_, '_, '_> {
         }
     }
 
-    pub fn wait(&mut self) -> KResult<()> {
-        // SAFETY: We will lock it again after returning from `cv.wait`.
-        unsafe { self.wait_procs.force_unlock() };
+    pub fn wait(self) -> impl core::future::Future<Output = KResult<Self>> {
+        let wait_procs = self.wait_procs.unlock();
 
-        self.cv.wait(&mut self.process_list);
+        async move {
+            let process_list = self.cv.wait(self.process_list).await;
+            let wait_procs = wait_procs.relock().await;
 
-        // SAFETY: We will lock it again.
-        unsafe { self.wait_procs.force_relock() };
-
-        if Thread::current().signal_list.has_pending_signal() {
-            return Err(EINTR);
+            if Thread::current().signal_list.has_pending_signal() {
+                Err(EINTR)
+            } else {
+                Ok(Self {
+                    wait_procs,
+                    process_list,
+                    cv: self.cv,
+                    want_stop: self.want_stop,
+                    want_continue: self.want_continue,
+                })
+            }
         }
-        Ok(())
     }
 }
 

+ 18 - 9
src/kernel/task/process_list.rs

@@ -1,11 +1,12 @@
 use super::{Process, ProcessGroup, Session, Signal, Thread, WaitObject, WaitType};
-use crate::{prelude::*, rcu::rcu_sync, sync::rwlock_new};
+use crate::{prelude::*, rcu::rcu_sync};
 use alloc::{
     collections::btree_map::BTreeMap,
     sync::{Arc, Weak},
 };
 use bindings::KERNEL_PML4;
-use eonix_sync::{AsProof as _, AsProofMut as _};
+use eonix_runtime::task::Task;
+use eonix_sync::{AsProof as _, AsProofMut as _, RwLock};
 
 pub struct ProcessList {
     /// The init process.
@@ -20,7 +21,7 @@ pub struct ProcessList {
     sessions: BTreeMap<u32, Weak<Session>>,
 }
 
-static GLOBAL_PROC_LIST: RwLock<ProcessList> = rwlock_new(ProcessList {
+static GLOBAL_PROC_LIST: RwLock<ProcessList> = RwLock::new(ProcessList {
     init: None,
     threads: BTreeMap::new(),
     processes: BTreeMap::new(),
@@ -51,14 +52,14 @@ impl ProcessList {
 
     pub fn kill_current(signal: Signal) -> ! {
         unsafe {
-            let mut process_list = ProcessList::get().write();
-            eonix_preempt::disable();
+            let mut process_list = Task::block_on(ProcessList::get().write());
 
-            // SAFETY: Preemption disabled.
             process_list.do_kill_process(&Thread::current().process, WaitType::Signaled(signal));
         }
 
         unsafe {
+            eonix_preempt::disable();
+
             // SAFETY: Preempt count == 1.
             Thread::exit();
         }
@@ -74,7 +75,7 @@ impl ProcessList {
             let pgroup = unsafe { thread.process.pgroup.swap(None) }.unwrap();
             let _parent = unsafe { thread.process.parent.swap(None) }.unwrap();
             pgroup.remove_member(pid, self.prove_mut());
-            rcu_sync();
+            Task::block_on(rcu_sync());
 
             if Arc::strong_count(&pgroup) == 1 {
                 self.pgroups.remove(&pgroup.pgid);
@@ -115,7 +116,9 @@ impl ProcessList {
 
     /// Make the process a zombie and notify the parent.
     /// # Safety
-    /// This function needs to be called with preemption disabled.
+    /// This function will destroy the process and all its threads.
+    /// It is the caller's responsibility to ensure that the process is not
+    /// running or will not run after this function is called.
     pub unsafe fn do_kill_process(&mut self, process: &Arc<Process>, status: WaitType) {
         if process.pid == 1 {
             panic!("init exited");
@@ -132,11 +135,15 @@ impl ProcessList {
 
         // If we are the session leader, we should drop the control terminal.
         if process.session(self.prove()).sid == process.pid {
-            if let Some(terminal) = process.session(self.prove()).drop_control_terminal() {
+            if let Some(terminal) =
+                Task::block_on(process.session(self.prove()).drop_control_terminal())
+            {
                 terminal.drop_session();
             }
         }
 
+        eonix_preempt::disable();
+
         // Release the MMList as well as the page table.
         // Before we release the page table, we need to switch to the kernel page table.
         arch::set_root_page_table(KERNEL_PML4 as usize);
@@ -144,6 +151,8 @@ impl ProcessList {
             process.mm_list.release();
         }
 
+        eonix_preempt::enable();
+
         // Make children orphans (adopted by init)
         {
             let init = self.init_process();

+ 18 - 14
src/kernel/task/session.rs

@@ -1,11 +1,11 @@
 use super::{Process, ProcessGroup, ProcessList, Signal, Thread};
-use crate::{kernel::Terminal, prelude::*, sync::rwlock_new};
+use crate::{kernel::Terminal, prelude::*};
 use alloc::{
     collections::btree_map::BTreeMap,
     sync::{Arc, Weak},
 };
 use bindings::EPERM;
-use eonix_sync::{AsProof as _, AsProofMut as _, Locked, Proof, ProofMut};
+use eonix_sync::{AsProof as _, AsProofMut as _, Locked, Proof, ProofMut, RwLock};
 
 #[derive(Debug)]
 struct SessionJobControl {
@@ -30,7 +30,7 @@ impl Session {
         let session = Arc::new(Self {
             sid: leader.pid,
             leader: Arc::downgrade(leader),
-            job_control: rwlock_new(SessionJobControl {
+            job_control: RwLock::new(SessionJobControl {
                 foreground: Weak::new(),
                 control_terminal: None,
             }),
@@ -55,15 +55,19 @@ impl Session {
         assert!(self.groups.access_mut(procs).remove(&pgid).is_some());
     }
 
-    pub fn foreground(&self) -> Option<Arc<ProcessGroup>> {
-        self.job_control.read().foreground.upgrade()
+    pub async fn foreground(&self) -> Option<Arc<ProcessGroup>> {
+        self.job_control.read().await.foreground.upgrade()
     }
 
     /// Set the foreground process group identified by `pgid`.
     /// The process group must belong to the session.
-    pub fn set_foreground_pgid(&self, pgid: u32, procs: Proof<'_, ProcessList>) -> KResult<()> {
+    pub async fn set_foreground_pgid(
+        &self,
+        pgid: u32,
+        procs: Proof<'_, ProcessList>,
+    ) -> KResult<()> {
         if let Some(group) = self.groups.access(procs).get(&pgid) {
-            self.job_control.write().foreground = group.clone();
+            self.job_control.write().await.foreground = group.clone();
             Ok(())
         } else {
             // TODO: Check if the process group refers to an existing process group.
@@ -74,13 +78,13 @@ impl Session {
 
     /// Only session leaders can set the control terminal.
     /// Make sure we've checked that before calling this function.
-    pub fn set_control_terminal(
+    pub async fn set_control_terminal(
         self: &Arc<Self>,
         terminal: &Arc<Terminal>,
         forced: bool,
         procs: Proof<'_, ProcessList>,
     ) -> KResult<()> {
-        let mut job_control = self.job_control.write();
+        let mut job_control = self.job_control.write().await;
         if let Some(_) = job_control.control_terminal.as_ref() {
             if let Some(session) = terminal.session().as_ref() {
                 if session.sid == self.sid {
@@ -97,15 +101,15 @@ impl Session {
 
     /// Drop the control terminal reference inside the session.
     /// DO NOT TOUCH THE TERMINAL'S SESSION FIELD.
-    pub fn drop_control_terminal(&self) -> Option<Arc<Terminal>> {
-        let mut inner = self.job_control.write();
+    pub async fn drop_control_terminal(&self) -> Option<Arc<Terminal>> {
+        let mut inner = self.job_control.write().await;
         inner.foreground = Weak::new();
         inner.control_terminal.take()
     }
 
-    pub fn raise_foreground(&self, signal: Signal) {
-        if let Some(fg) = self.foreground() {
-            let procs = ProcessList::get().read();
+    pub async fn raise_foreground(&self, signal: Signal) {
+        if let Some(fg) = self.foreground().await {
+            let procs = ProcessList::get().read().await;
             fg.raise(signal, procs.prove());
         }
     }

+ 8 - 9
src/kernel/task/signal.rs

@@ -13,6 +13,7 @@ use bindings::{EFAULT, EINVAL};
 use core::{cmp::Reverse, task::Waker};
 use eonix_runtime::task::Task;
 use eonix_sync::AsProof as _;
+use intrusive_collections::UnsafeRef;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 pub struct Signal(u32);
@@ -62,19 +63,17 @@ pub struct SignalAction {
     pub sa_mask: usize,
 }
 
-#[derive(Debug)]
 struct SignalListInner {
     mask: u64,
     pending: BinaryHeap<Reverse<Signal>>,
 
-    signal_waker: Option<Waker>,
+    signal_waker: Option<UnsafeRef<dyn Fn() + Send + Sync>>,
     stop_waker: Option<Waker>,
 
     // TODO!!!!!: Signal disposition should be per-process.
     handlers: BTreeMap<Signal, SignalAction>,
 }
 
-#[derive(Debug)]
 pub struct SignalList {
     inner: Spin<SignalListInner>,
 }
@@ -265,9 +264,9 @@ impl SignalListInner {
             _ => {
                 // If we don't have a waker here, we are not permitted to be woken up.
                 // We would run in the end anyway.
-                self.signal_waker
-                    .as_ref()
-                    .inspect(|waker| waker.wake_by_ref());
+                if let Some(waker) = self.signal_waker.take() {
+                    waker();
+                }
             }
         }
 
@@ -328,7 +327,7 @@ impl SignalList {
             .unwrap_or_else(SignalAction::default_action)
     }
 
-    pub fn set_signal_waker(&self, waker: Option<Waker>) {
+    pub fn set_signal_waker(&self, waker: Option<UnsafeRef<dyn Fn() + Send + Sync>>) {
         let mut inner = self.inner.lock();
         inner.signal_waker = waker;
     }
@@ -411,7 +410,7 @@ impl SignalList {
                                 pid: thread.process.pid,
                                 code: WaitType::Stopped(signal),
                             },
-                            ProcessList::get().read().prove(),
+                            Task::block_on(ProcessList::get().read()).prove(),
                         );
                     }
 
@@ -435,7 +434,7 @@ impl SignalList {
                                 pid: thread.process.pid,
                                 code: WaitType::Continued,
                             },
-                            ProcessList::get().read().prove(),
+                            Task::block_on(ProcessList::get().read()).prove(),
                         );
                     }
                 }

+ 32 - 34
src/kernel/terminal.rs

@@ -2,11 +2,7 @@ use super::{
     task::{ProcessList, Session, Signal, Thread},
     user::{UserPointer, UserPointerMut},
 };
-use crate::{
-    io::Buffer,
-    prelude::*,
-    sync::{mutex_new, CondVar},
-};
+use crate::{io::Buffer, prelude::*, sync::CondVar};
 use alloc::{
     collections::vec_deque::VecDeque,
     sync::{Arc, Weak},
@@ -14,7 +10,8 @@ use alloc::{
 use bindings::{EINTR, ENOTTY, EPERM};
 use bitflags::bitflags;
 use eonix_log::ConsoleWrite;
-use eonix_sync::AsProof as _;
+use eonix_runtime::task::Task;
+use eonix_sync::{AsProof as _, Mutex};
 
 const BUFFER_SIZE: usize = 4096;
 
@@ -349,6 +346,8 @@ impl Termios {
 
 pub trait TerminalDevice: Send + Sync {
     fn putchar(&self, ch: u8);
+
+    fn putchar_direct(&self, ch: u8);
 }
 
 struct TerminalInner {
@@ -400,7 +399,7 @@ impl core::fmt::Debug for Terminal {
 impl Terminal {
     pub fn new(device: Arc<dyn TerminalDevice>) -> Arc<Self> {
         Arc::new(Self {
-            inner: mutex_new(TerminalInner {
+            inner: Mutex::new(TerminalInner {
                 termio: Termios::new_standard(),
                 session: Weak::new(),
                 buffer: VecDeque::with_capacity(BUFFER_SIZE),
@@ -457,7 +456,7 @@ impl Terminal {
 
     fn signal(&self, inner: &mut TerminalInner, signal: Signal) {
         if let Some(session) = inner.session.upgrade() {
-            session.raise_foreground(signal);
+            Task::block_on(session.raise_foreground(signal));
         }
         if !inner.termio.noflsh() {
             self.clear_read_buffer(inner);
@@ -484,8 +483,8 @@ impl Terminal {
     }
 
     // TODO: Find a better way to handle this.
-    pub fn commit_char(&self, ch: u8) {
-        let mut inner = self.inner.lock();
+    pub async fn commit_char(&self, ch: u8) {
+        let mut inner = self.inner.lock().await;
         if inner.termio.isig() {
             match ch {
                 0xff => {}
@@ -532,10 +531,10 @@ impl Terminal {
         }
     }
 
-    pub fn poll_in(&self) -> KResult<()> {
-        let mut inner = self.inner.lock();
+    pub async fn poll_in(&self) -> KResult<()> {
+        let inner = self.inner.lock().await;
         if inner.buffer.is_empty() {
-            self.cv.wait(&mut inner);
+            let _inner = self.cv.wait(inner).await;
 
             if Thread::current().signal_list.has_pending_signal() {
                 return Err(EINTR);
@@ -544,7 +543,7 @@ impl Terminal {
         Ok(())
     }
 
-    pub fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
+    pub async fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
         let mut tmp_buffer = [0u8; 32];
 
         let data = 'block: {
@@ -552,9 +551,9 @@ impl Terminal {
                 break 'block &tmp_buffer[..0];
             }
 
-            let mut inner = self.inner.lock();
+            let mut inner = self.inner.lock().await;
             if inner.buffer.is_empty() {
-                self.cv.wait(&mut inner);
+                inner = self.cv.wait(inner).await;
 
                 if Thread::current().signal_list.has_pending_signal() {
                     return Err(EINTR);
@@ -595,27 +594,26 @@ impl Terminal {
         buffer.fill(data).map(|result| result.allow_partial())
     }
 
-    pub fn ioctl(&self, request: TerminalIORequest) -> KResult<()> {
+    pub async fn ioctl(&self, request: TerminalIORequest<'_>) -> KResult<()> {
         match request {
             TerminalIORequest::GetProcessGroup(pgid_pointer) => {
-                let session = self.inner.lock().session.upgrade();
-                let pgroup = session.map(|session| session.foreground()).flatten();
-
-                if let Some(pgroup) = pgroup {
-                    pgid_pointer.write(pgroup.pgid)
-                } else {
-                    Err(ENOTTY)
+                if let Some(session) = self.inner.lock().await.session.upgrade() {
+                    if let Some(pgroup) = session.foreground().await {
+                        return pgid_pointer.write(pgroup.pgid);
+                    }
                 }
+
+                Err(ENOTTY)
             }
             TerminalIORequest::SetProcessGroup(pgid) => {
                 let pgid = pgid.read()?;
 
-                let procs = ProcessList::get().read();
-                let inner = self.inner.lock();
+                let procs = ProcessList::get().read().await;
+                let inner = self.inner.lock().await;
                 let session = inner.session.upgrade();
 
                 if let Some(session) = session {
-                    session.set_foreground_pgid(pgid, procs.prove())
+                    session.set_foreground_pgid(pgid, procs.prove()).await
                 } else {
                     Err(ENOTTY)
                 }
@@ -632,12 +630,12 @@ impl Terminal {
                 ptr.write(window_size)
             }
             TerminalIORequest::GetTermios(ptr) => {
-                let termios = self.inner.lock().termio.get_user();
+                let termios = Task::block_on(self.inner.lock()).termio.get_user();
                 ptr.write(termios)
             }
             TerminalIORequest::SetTermios(ptr) => {
                 let user_termios = ptr.read()?;
-                let mut inner = self.inner.lock();
+                let mut inner = Task::block_on(self.inner.lock());
 
                 // TODO: We ignore unknown bits for now.
                 inner.termio.iflag = TermioIFlags::from_bits_truncate(user_termios.iflag as u16);
@@ -654,12 +652,12 @@ impl Terminal {
 
     /// Assign the `session` to this terminal. Drop the previous session if `forced` is true.
     pub fn set_session(&self, session: &Arc<Session>, forced: bool) -> KResult<()> {
-        let mut inner = self.inner.lock();
+        let mut inner = Task::block_on(self.inner.lock());
         if let Some(session) = inner.session.upgrade() {
             if !forced {
                 Err(EPERM)
             } else {
-                session.drop_control_terminal();
+                Task::block_on(session.drop_control_terminal());
                 inner.session = Arc::downgrade(&session);
                 Ok(())
             }
@@ -671,18 +669,18 @@ impl Terminal {
     }
 
     pub fn drop_session(&self) {
-        self.inner.lock().session = Weak::new();
+        Task::block_on(self.inner.lock()).session = Weak::new();
     }
 
     pub fn session(&self) -> Option<Arc<Session>> {
-        self.inner.lock().session.upgrade()
+        Task::block_on(self.inner.lock()).session.upgrade()
     }
 }
 
 impl ConsoleWrite for Terminal {
     fn write(&self, s: &str) {
         for &ch in s.as_bytes() {
-            self.show_char(ch);
+            self.device.putchar_direct(ch);
         }
     }
 }

+ 40 - 38
src/kernel/vfs/file.rs

@@ -14,7 +14,7 @@ use crate::{
         CharDevice,
     },
     prelude::*,
-    sync::{mutex_new, CondVar},
+    sync::CondVar,
 };
 use alloc::{collections::vec_deque::VecDeque, sync::Arc};
 use bindings::{
@@ -22,6 +22,8 @@ use bindings::{
 };
 use bitflags::bitflags;
 use core::{ops::ControlFlow, sync::atomic::Ordering};
+use eonix_runtime::task::Task;
+use eonix_sync::Mutex;
 
 pub struct InodeFile {
     read: bool,
@@ -107,7 +109,7 @@ impl Pipe {
 
     pub fn new() -> Arc<Self> {
         Arc::new(Self {
-            inner: mutex_new(PipeInner {
+            inner: Mutex::new(PipeInner {
                 buffer: VecDeque::with_capacity(Self::PIPE_SIZE),
                 read_closed: false,
                 write_closed: false,
@@ -127,7 +129,7 @@ impl Pipe {
     }
 
     fn close_read(&self) {
-        let mut inner = self.inner.lock();
+        let mut inner = Task::block_on(self.inner.lock());
         if inner.read_closed {
             return;
         }
@@ -137,7 +139,7 @@ impl Pipe {
     }
 
     fn close_write(&self) {
-        let mut inner = self.inner.lock();
+        let mut inner = Task::block_on(self.inner.lock());
         if inner.write_closed {
             return;
         }
@@ -146,11 +148,11 @@ impl Pipe {
         self.cv_read.notify_all();
     }
 
-    fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
-        let mut inner = self.inner.lock();
+    async fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
+        let mut inner = self.inner.lock().await;
 
         while !inner.write_closed && inner.buffer.is_empty() {
-            self.cv_read.wait(&mut inner);
+            inner = self.cv_read.wait(inner).await;
             if Thread::current().signal_list.has_pending_signal() {
                 return Err(EINTR);
             }
@@ -164,8 +166,8 @@ impl Pipe {
         Ok(nread)
     }
 
-    fn write_atomic(&self, data: &[u8]) -> KResult<usize> {
-        let mut inner = self.inner.lock();
+    async fn write_atomic(&self, data: &[u8]) -> KResult<usize> {
+        let mut inner = self.inner.lock().await;
 
         if inner.read_closed {
             send_sigpipe_to_current();
@@ -173,7 +175,7 @@ impl Pipe {
         }
 
         while inner.buffer.len() + data.len() > Self::PIPE_SIZE {
-            self.cv_write.wait(&mut inner);
+            inner = self.cv_write.wait(inner).await;
             if Thread::current().signal_list.has_pending_signal() {
                 return Err(EINTR);
             }
@@ -190,8 +192,8 @@ impl Pipe {
         return Ok(data.len());
     }
 
-    fn write_non_atomic(&self, data: &[u8]) -> KResult<usize> {
-        let mut inner = self.inner.lock();
+    async fn write_non_atomic(&self, data: &[u8]) -> KResult<usize> {
+        let mut inner = self.inner.lock().await;
 
         if inner.read_closed {
             send_sigpipe_to_current();
@@ -214,7 +216,7 @@ impl Pipe {
                 break;
             }
 
-            self.cv_write.wait(&mut inner);
+            inner = self.cv_write.wait(inner).await;
             if Thread::current().signal_list.has_pending_signal() {
                 if data.len() != remaining.len() {
                     break;
@@ -231,12 +233,12 @@ impl Pipe {
         Ok(data.len() - remaining.len())
     }
 
-    fn write(&self, data: &[u8]) -> KResult<usize> {
+    async fn write(&self, data: &[u8]) -> KResult<usize> {
         // Writes those are smaller than the pipe size are atomic.
         if data.len() <= Self::PIPE_SIZE {
-            self.write_atomic(data)
+            self.write_atomic(data).await
         } else {
-            self.write_non_atomic(data)
+            self.write_non_atomic(data).await
         }
     }
 }
@@ -287,12 +289,12 @@ impl InodeFile {
             write: rwa.1,
             append: rwa.2,
             mode: cached_mode,
-            cursor: mutex_new(0),
+            cursor: Mutex::new(0),
         }))
     }
 
     fn seek(&self, option: SeekOption) -> KResult<usize> {
-        let mut cursor = self.cursor.lock();
+        let mut cursor = Task::block_on(self.cursor.lock());
 
         let new_cursor = match option {
             SeekOption::Current(off) => cursor.checked_add_signed(off).ok_or(EOVERFLOW)?,
@@ -313,7 +315,7 @@ impl InodeFile {
             return Err(EBADF);
         }
 
-        let mut cursor = self.cursor.lock();
+        let mut cursor = Task::block_on(self.cursor.lock());
 
         // TODO!!!: use `UserBuffer`
         if self.append {
@@ -333,7 +335,7 @@ impl InodeFile {
             return Err(EBADF);
         }
 
-        let mut cursor = self.cursor.lock();
+        let mut cursor = Task::block_on(self.cursor.lock());
 
         let nread = self.dentry.read(buffer, *cursor)?;
 
@@ -342,7 +344,7 @@ impl InodeFile {
     }
 
     fn getdents64(&self, buffer: &mut dyn Buffer) -> KResult<()> {
-        let mut cursor = self.cursor.lock();
+        let mut cursor = Task::block_on(self.cursor.lock());
 
         let nread = self.dentry.readdir(*cursor, |filename, ino| {
             // Filename length + 1 for padding '\0'
@@ -372,7 +374,7 @@ impl InodeFile {
     }
 
     fn getdents(&self, buffer: &mut dyn Buffer) -> KResult<()> {
-        let mut cursor = self.cursor.lock();
+        let mut cursor = Task::block_on(self.cursor.lock());
 
         let nread = self.dentry.readdir(*cursor, |filename, ino| {
             // + 1 for filename length padding '\0', + 1 for d_type.
@@ -406,8 +408,8 @@ impl TerminalFile {
         Arc::new(File::TTY(TerminalFile { terminal: tty }))
     }
 
-    fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
-        self.terminal.read(buffer)
+    async fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
+        self.terminal.read(buffer).await
     }
 
     fn write(&self, buffer: &[u8]) -> KResult<usize> {
@@ -418,32 +420,32 @@ impl TerminalFile {
         Ok(buffer.len())
     }
 
-    fn poll(&self, event: PollEvent) -> KResult<PollEvent> {
+    async fn poll(&self, event: PollEvent) -> KResult<PollEvent> {
         if !event.contains(PollEvent::Readable) {
             unimplemented!("Poll event not supported.")
         }
 
-        self.terminal.poll_in().map(|_| PollEvent::Readable)
+        self.terminal.poll_in().await.map(|_| PollEvent::Readable)
     }
 
     fn ioctl(&self, request: usize, arg3: usize) -> KResult<()> {
-        self.terminal.ioctl(match request as u32 {
+        Task::block_on(self.terminal.ioctl(match request as u32 {
             TCGETS => TerminalIORequest::GetTermios(UserPointerMut::new_vaddr(arg3)?),
             TCSETS => TerminalIORequest::SetTermios(UserPointer::new_vaddr(arg3)?),
             TIOCGPGRP => TerminalIORequest::GetProcessGroup(UserPointerMut::new_vaddr(arg3)?),
             TIOCSPGRP => TerminalIORequest::SetProcessGroup(UserPointer::new_vaddr(arg3)?),
             TIOCGWINSZ => TerminalIORequest::GetWindowSize(UserPointerMut::new_vaddr(arg3)?),
             _ => return Err(EINVAL),
-        })
+        }))
     }
 }
 
 impl File {
-    pub fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
+    pub async fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize> {
         match self {
             File::Inode(inode) => inode.read(buffer),
-            File::PipeRead(pipe) => pipe.pipe.read(buffer),
-            File::TTY(tty) => tty.read(buffer),
+            File::PipeRead(pipe) => pipe.pipe.read(buffer).await,
+            File::TTY(tty) => tty.read(buffer).await,
             File::CharDev(device) => device.read(buffer),
             _ => Err(EBADF),
         }
@@ -464,10 +466,10 @@ impl File {
     //     }
     // }
 
-    pub fn write(&self, buffer: &[u8]) -> KResult<usize> {
+    pub async fn write(&self, buffer: &[u8]) -> KResult<usize> {
         match self {
             File::Inode(inode) => inode.write(buffer),
-            File::PipeWrite(pipe) => pipe.pipe.write(buffer),
+            File::PipeWrite(pipe) => pipe.pipe.write(buffer).await,
             File::TTY(tty) => tty.write(buffer),
             File::CharDev(device) => device.write(buffer),
             _ => Err(EBADF),
@@ -495,7 +497,7 @@ impl File {
         }
     }
 
-    pub fn sendfile(&self, dest_file: &Self, count: usize) -> KResult<usize> {
+    pub async fn sendfile(&self, dest_file: &Self, count: usize) -> KResult<usize> {
         let buffer_page = Page::alloc_one();
 
         match self {
@@ -518,13 +520,13 @@ impl File {
             let slice = &mut buffer_page.as_mut_slice()[..batch_size];
             let mut buffer = ByteBuffer::new(slice);
 
-            let nwrote = self.read(&mut buffer)?;
+            let nwrote = self.read(&mut buffer).await?;
 
             if nwrote == 0 {
                 break;
             }
 
-            tot += dest_file.write(&slice[..nwrote])?;
+            tot += dest_file.write(&slice[..nwrote]).await?;
         }
 
         Ok(tot)
@@ -537,10 +539,10 @@ impl File {
         }
     }
 
-    pub fn poll(&self, event: PollEvent) -> KResult<PollEvent> {
+    pub async fn poll(&self, event: PollEvent) -> KResult<PollEvent> {
         match self {
             File::Inode(_) => Ok(event),
-            File::TTY(tty) => tty.poll(event),
+            File::TTY(tty) => tty.poll(event).await,
             _ => unimplemented!("Poll event not supported."),
         }
     }

+ 16 - 13
src/kernel/vfs/filearray.rs

@@ -90,9 +90,12 @@ impl FileArray {
     }
 
     pub fn close(&self, fd: FD) -> KResult<()> {
-        let mut inner = self.inner.lock();
-        inner.files.remove(&fd).ok_or(EBADF)?;
-        inner.release_fd(fd);
+        let _file = {
+            let mut inner = self.inner.lock();
+            let file = inner.files.remove(&fd).ok_or(EBADF)?;
+            inner.release_fd(fd);
+            file
+        };
         Ok(())
     }
 
@@ -136,10 +139,12 @@ impl FileArray {
             Entry::Vacant(_) => {}
             Entry::Occupied(entry) => {
                 let new_file = entry.into_mut();
+                let mut file_swap = new_file_data;
 
                 new_file.flags = flags;
-                new_file.file = new_file_data;
+                core::mem::swap(&mut file_swap, &mut new_file.file);
 
+                drop(inner);
                 return Ok(new_fd);
             }
         }
@@ -196,21 +201,19 @@ impl FileArray {
             }
         }
 
-        let mut inner = self.inner.lock();
-        let fd = inner.next_fd();
+        let file;
 
         if s_ischr(filemode) {
             let device = CharDevice::get(inode.devid()?).ok_or(ENXIO)?;
-            let file = device.open()?;
-            inner.do_insert(fd, fdflag as u64, file);
+            file = device.open()?;
         } else {
-            inner.do_insert(
-                fd,
-                fdflag as u64,
-                InodeFile::new(dentry, (can_read, can_write, append)),
-            );
+            file = InodeFile::new(dentry, (can_read, can_write, append));
         }
 
+        let mut inner = self.inner.lock();
+        let fd = inner.next_fd();
+        inner.do_insert(fd, fdflag as u64, file);
+
         Ok(fd)
     }
 

+ 5 - 3
src/kernel/vfs/inode.rs

@@ -1,5 +1,5 @@
 use super::{dentry::Dentry, s_isblk, s_ischr, vfs::Vfs, DevId, TimeSpec};
-use crate::{io::Buffer, prelude::*, sync::rwlock_new};
+use crate::{io::Buffer, prelude::*};
 use alloc::sync::{Arc, Weak};
 use bindings::{
     statx, EINVAL, EISDIR, ENOTDIR, EPERM, STATX_ATIME, STATX_BLOCKS, STATX_CTIME, STATX_GID,
@@ -12,6 +12,8 @@ use core::{
     ptr::addr_of_mut,
     sync::atomic::{AtomicU32, AtomicU64, Ordering},
 };
+use eonix_runtime::task::Task;
+use eonix_sync::RwLock;
 
 pub type Ino = u64;
 pub type AtomicIno = AtomicU64;
@@ -56,7 +58,7 @@ impl InodeData {
             atime: Spin::new(TimeSpec::default()),
             ctime: Spin::new(TimeSpec::default()),
             mtime: Spin::new(TimeSpec::default()),
-            rwsem: rwlock_new(()),
+            rwsem: RwLock::new(()),
             size: AtomicU64::new(0),
             nlink: AtomicNlink::new(0),
             uid: AtomicUid::new(0),
@@ -248,7 +250,7 @@ pub trait Inode: Send + Sync + InodeInner {
         f(
             uninit_mut.as_mut_ptr(),
             // SAFETY: `idata` is initialized and we will never move the lock.
-            &unsafe { idata.assume_init_ref() }.rwsem.read(),
+            &Task::block_on(unsafe { idata.assume_init_ref() }.rwsem.read()),
         );
 
         // Safety: `uninit` is initialized

+ 2 - 2
src/lib.rs

@@ -28,7 +28,7 @@ mod sync;
 use alloc::{ffi::CString, sync::Arc};
 use core::alloc::{GlobalAlloc, Layout};
 use elf::ParsedElf32;
-use eonix_runtime::{run::FutureRun, scheduler::Scheduler};
+use eonix_runtime::{run::FutureRun, scheduler::Scheduler, task::Task};
 use kernel::{
     cpu::init_localcpu,
     mem::Page,
@@ -178,7 +178,7 @@ async fn init_process(early_kstack_pfn: usize) {
 
     let thread_builder = ThreadBuilder::new().name(Arc::from(*b"busybox"));
 
-    let mut process_list = ProcessList::get().write();
+    let mut process_list = Task::block_on(ProcessList::get().write());
     let (thread, process) = ProcessBuilder::new()
         .mm_list(mm_list)
         .thread_builder(thread_builder)

+ 12 - 15
src/net/netdev.rs

@@ -1,5 +1,11 @@
-use crate::{bindings::root::EFAULT, prelude::*, sync::mutex_new};
-use alloc::{collections::btree_map::BTreeMap, sync::Arc};
+use core::sync::atomic::{AtomicU32, Ordering};
+
+use crate::kernel::constants::EFAULT;
+use alloc::{
+    collections::btree_map::{BTreeMap, Entry},
+    sync::Arc,
+};
+use eonix_sync::{Mutex, Spin};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum LinkStatus {
@@ -50,26 +56,17 @@ impl Ord for dyn Netdev {
     }
 }
 
-static NETDEVS_ID: Spin<u32> = Spin::new(0);
+static NETDEVS_ID: AtomicU32 = AtomicU32::new(0);
 static NETDEVS: Spin<BTreeMap<u32, Arc<Mutex<dyn Netdev>>>> = Spin::new(BTreeMap::new());
 
 pub fn alloc_id() -> u32 {
-    let mut id = NETDEVS_ID.lock();
-    let retval = *id;
-
-    *id += 1;
-    retval
+    NETDEVS_ID.fetch_add(1, Ordering::SeqCst)
 }
 
 pub fn register_netdev(netdev: impl Netdev + 'static) -> Result<Arc<Mutex<dyn Netdev>>, u32> {
-    let devid = netdev.id();
-
-    let mut netdevs = NETDEVS.lock();
-
-    use alloc::collections::btree_map::Entry;
-    match netdevs.entry(devid) {
+    match NETDEVS.lock().entry(netdev.id()) {
         Entry::Vacant(entry) => {
-            let netdev = Arc::new(mutex_new(netdev));
+            let netdev = Arc::new(Mutex::new(netdev));
             entry.insert(netdev.clone());
             Ok(netdev)
         }

+ 11 - 12
src/rcu.rs

@@ -1,13 +1,12 @@
-use crate::{
-    prelude::*,
-    sync::{mutex_new, rwlock_new, RwLockReadGuard},
-};
+use crate::prelude::*;
 use alloc::sync::Arc;
 use core::{
     ops::Deref,
     ptr::NonNull,
     sync::atomic::{AtomicPtr, Ordering},
 };
+use eonix_runtime::task::Task;
+use eonix_sync::{Mutex, RwLock, RwLockReadGuard};
 use pointers::BorrowedArc;
 
 pub struct RCUReadGuard<'data, T: 'data> {
@@ -16,13 +15,13 @@ pub struct RCUReadGuard<'data, T: 'data> {
     _phantom: PhantomData<&'data T>,
 }
 
-static GLOBAL_RCU_SEM: RwLock<()> = rwlock_new(());
+static GLOBAL_RCU_SEM: RwLock<()> = RwLock::new(());
 
 impl<'data, T: 'data> RCUReadGuard<'data, T> {
     fn lock(value: T) -> Self {
         Self {
             value,
-            _guard: GLOBAL_RCU_SEM.read(),
+            _guard: Task::block_on(GLOBAL_RCU_SEM.read()),
             _phantom: PhantomData,
         }
     }
@@ -36,9 +35,9 @@ impl<'data, T: 'data> Deref for RCUReadGuard<'data, T> {
     }
 }
 
-pub fn rcu_sync() {
+pub async fn rcu_sync() {
     // Lock the global RCU semaphore to ensure that all readers are done.
-    let _ = GLOBAL_RCU_SEM.write();
+    let _ = GLOBAL_RCU_SEM.write().await;
 }
 
 pub trait RCUNode<MySelf> {
@@ -57,8 +56,8 @@ impl<T: RCUNode<T>> RCUList<T> {
     pub const fn new() -> Self {
         Self {
             head: AtomicPtr::new(core::ptr::null_mut()),
-            reader_lock: rwlock_new(()),
-            update_lock: mutex_new(()),
+            reader_lock: RwLock::new(()),
+            update_lock: Mutex::new(()),
         }
     }
 
@@ -147,7 +146,7 @@ impl<T: RCUNode<T>> RCUList<T> {
     }
 
     pub fn iter(&self) -> RCUIterator<T> {
-        let _lck = self.reader_lock.read();
+        let _lck = Task::block_on(self.reader_lock.read());
 
         RCUIterator {
             // SAFETY: We have a read lock, so the node is still alive.
@@ -233,7 +232,7 @@ impl<T> Drop for RCUPointer<T> {
         if let Some(arc) = unsafe { self.swap(None) } {
             // We only wait if there are other references.
             if Arc::strong_count(&arc) == 1 {
-                rcu_sync();
+                Task::block_on(rcu_sync());
             }
         }
     }