Преглед на файлове

serial: improve irq handle. add worker support

greatbridf преди 10 месеца
родител
ревизия
b29d448a4f
променени са 9 файла, в които са добавени 226 реда и са изтрити 75 реда
  1. 110 32
      src/driver/serial.rs
  2. 23 0
      src/kernel/task/scheduler.rs
  3. 24 26
      src/kernel/task/signal.rs
  4. 50 8
      src/kernel/task/task.rs
  5. 4 0
      src/kernel/task/task/runnable.rs
  6. 5 0
      src/kernel/task/thread.rs
  7. 5 6
      src/kernel/terminal.rs
  8. 2 3
      src/kernel/timer.rs
  9. 3 0
      src/sync/condvar.rs

+ 110 - 32
src/driver/serial.rs

@@ -1,22 +1,39 @@
-use alloc::{format, sync::Arc};
+use core::task::Waker;
+
+use alloc::{collections::vec_deque::VecDeque, format, sync::Arc};
+use atomic_unique_refcell::AtomicUniqueRefCell;
 use bindings::EIO;
+use bitflags::bitflags;
 
 use crate::{
     kernel::{
-        block::make_device, interrupt::register_irq_handler, CharDevice, CharDeviceType, Console,
-        Terminal, TerminalDevice,
+        block::make_device,
+        interrupt::register_irq_handler,
+        task::{FutureRunnable, Scheduler, Task},
+        CharDevice, CharDeviceType, Console, Terminal, TerminalDevice,
     },
     prelude::*,
 };
 
 use super::Port8;
 
+bitflags! {
+    struct LineStatus: u8 {
+        const RX_READY = 0x01;
+        const TX_READY = 0x20;
+    }
+}
+
 #[allow(dead_code)]
 struct Serial {
     id: u32,
     name: Arc<str>,
 
-    terminal: Option<Arc<Terminal>>,
+    terminal: Spin<Option<Arc<Terminal>>>,
+    worker: AtomicUniqueRefCell<Option<Waker>>,
+
+    working: Spin<bool>,
+    tx_buffer: Spin<VecDeque<u8>>,
 
     tx_rx: Port8,
     int_ena: Port8,
@@ -37,14 +54,76 @@ impl Serial {
 
     fn enable_interrupts(&self) {
         // Enable interrupt #0: Received data available
-        self.int_ena.write(0x01);
+        self.int_ena.write(0x03);
+    }
+
+    fn disable_interrupts(&self) {
+        // Disable interrupt #0: Received data available
+        self.int_ena.write(0x00);
+    }
+
+    fn line_status(&self) -> LineStatus {
+        LineStatus::from_bits_truncate(self.line_status.read())
+    }
+
+    async fn wait_for_interrupt(&self) {
+        {
+            let mut working = self.working.lock_irq();
+            self.enable_interrupts();
+            *working = false;
+            Task::current().isleep();
+        }
+
+        Scheduler::sleep().await;
+
+        *self.working.lock_irq() = true;
+        self.disable_interrupts();
+    }
+
+    async fn worker(port: Arc<Self>) {
+        let terminal = port.terminal.lock().clone();
+
+        loop {
+            while port.line_status().contains(LineStatus::RX_READY) {
+                let ch = port.tx_rx.read();
+
+                if let Some(terminal) = terminal.as_ref() {
+                    terminal.commit_char(ch);
+                }
+            }
+
+            let should_wait = {
+                let mut tx_buffer = port.tx_buffer.lock();
+
+                // Give it a chance to receive data.
+                let count = tx_buffer.len().min(64);
+                for ch in tx_buffer.drain(..count) {
+                    if port.line_status().contains(LineStatus::TX_READY) {
+                        port.tx_rx.write(ch);
+                    } else {
+                        break;
+                    }
+                }
+
+                tx_buffer.is_empty()
+            };
+
+            if should_wait {
+                port.wait_for_interrupt().await;
+            } else {
+                Scheduler::yield_now().await;
+            }
+        }
     }
 
     pub fn new(id: u32, base_port: u16) -> KResult<Self> {
         let port = Self {
             id,
             name: Arc::from(format!("ttyS{id}")),
-            terminal: None,
+            terminal: Spin::new(None),
+            worker: AtomicUniqueRefCell::new(None),
+            working: Spin::new(true),
+            tx_buffer: Spin::new(VecDeque::new()),
             tx_rx: Port8::new(base_port),
             int_ena: Port8::new(base_port + 1),
             int_ident: Port8::new(base_port + 2),
@@ -73,24 +152,30 @@ impl Serial {
         Ok(port)
     }
 
-    fn irq_handler(&self) {
-        let terminal = self.terminal.as_ref();
-        while self.line_status.read() & 0x01 != 0 {
-            let ch = self.tx_rx.read();
-
-            if let Some(terminal) = terminal {
-                terminal.commit_char(ch);
-            }
+    fn wakeup_worker(&self) {
+        let working = self.working.lock_irq();
+        if !*working {
+            self.worker
+                .borrow()
+                .as_ref()
+                .expect("Worker not initialized")
+                .wake_by_ref();
         }
     }
 
+    fn irq_handler(&self) {
+        // Read the interrupt ID register to clear the interrupt.
+        self.int_ident.read();
+        self.wakeup_worker();
+    }
+
     fn register_char_device(port: Self) -> KResult<()> {
-        let mut port = Arc::new(port);
+        let port = Arc::new(port);
         let terminal = Terminal::new(port.clone());
+        let task = Task::new(FutureRunnable::new(Self::worker(port.clone())));
 
-        // TODO!!!!!!: This is unsafe, we should find a way to avoid this.
-        //             Under smp, we should make the publish of terminal atomic.
-        unsafe { Arc::get_mut_unchecked(&mut port) }.terminal = Some(terminal.clone());
+        port.terminal.lock().replace(terminal.clone());
+        port.worker.borrow().replace(task.waker());
 
         {
             let port = port.clone();
@@ -104,7 +189,8 @@ impl Serial {
                 port.irq_handler();
             })?;
         }
-        port.enable_interrupts();
+
+        Scheduler::get().spawn(task);
         dont_check!(Console::register_terminal(&terminal));
 
         CharDevice::register(
@@ -119,26 +205,18 @@ impl Serial {
 
 impl TerminalDevice for Serial {
     fn putchar(&self, ch: u8) {
-        loop {
-            // If we poll the status and get the corresponding bit, we should handle the action.
-            let status = self.line_status.read();
-            if status & 0x20 != 0 {
-                self.tx_rx.write(ch);
-                return;
-            }
-        }
+        let mut tx_buffer = self.tx_buffer.lock();
+        tx_buffer.push_back(ch);
+        self.wakeup_worker();
     }
 }
 
 pub fn init() -> KResult<()> {
-    let com0 = Serial::new(0, Serial::COM0_BASE);
-    let com1 = Serial::new(1, Serial::COM1_BASE);
-
-    if let Ok(port) = com0 {
+    if let Ok(port) = Serial::new(0, Serial::COM0_BASE) {
         Serial::register_char_device(port)?;
     }
 
-    if let Ok(port) = com1 {
+    if let Ok(port) = Serial::new(1, Serial::COM1_BASE) {
         Serial::register_char_device(port)?;
     }
 

+ 23 - 0
src/kernel/task/scheduler.rs

@@ -175,6 +175,26 @@ impl Scheduler {
 
         Yield(false).await
     }
+
+    pub async fn sleep() {
+        struct Sleep(bool);
+
+        impl Future for Sleep {
+            type Output = ();
+
+            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+                match *self {
+                    Sleep(true) => Poll::Ready(()),
+                    Sleep(false) => {
+                        self.set(Sleep(true));
+                        Poll::Pending
+                    }
+                }
+            }
+        }
+
+        Sleep(false).await
+    }
 }
 
 async fn idle_task() {
@@ -223,6 +243,9 @@ async fn idle_task() {
                     if task.is_runnable() {
                         rq.put(task);
                     } else {
+                        // TODO!!!!!!!!!: There is a race condition here if we reach here and there
+                        // is another thread waking the task up. They might read `on_rq` == true so
+                        // the task will never be waken up.
                         task.on_rq.store(false, Ordering::Release);
                     }
                 }

+ 24 - 26
src/kernel/task/signal.rs

@@ -78,7 +78,6 @@ struct SignalListInner {
 
 #[derive(Debug)]
 pub struct SignalList {
-    /// We might use this inside interrupt handler, so we need to use `lock_irq`.
     inner: Spin<SignalListInner>,
 }
 
@@ -266,12 +265,11 @@ impl SignalListInner {
                 self.stop_waker.take().map(|waker| waker.wake());
             }
             _ => {
-                let waker = self
-                    .signal_waker
+                // If we don't have a waker here, we might be at initialization step.
+                // We would run in the end anyway.
+                self.signal_waker
                     .as_ref()
-                    .expect("We should have a signal waker");
-
-                waker.wake_by_ref();
+                    .inspect(|waker| waker.wake_by_ref());
             }
         }
 
@@ -293,19 +291,19 @@ impl SignalList {
     }
 
     pub fn get_mask(&self) -> u64 {
-        self.inner.lock_irq().get_mask()
+        self.inner.lock().get_mask()
     }
 
     pub fn set_mask(&self, mask: u64) {
-        self.inner.lock_irq().set_mask(mask)
+        self.inner.lock().set_mask(mask)
     }
 
     pub fn mask(&self, mask: u64) {
-        self.inner.lock_irq().set_mask(mask)
+        self.inner.lock().set_mask(mask)
     }
 
     pub fn unmask(&self, mask: u64) {
-        self.inner.lock_irq().unmask(mask)
+        self.inner.lock().unmask(mask)
     }
 
     pub fn set_handler(&self, signal: Signal, action: &SignalAction) -> KResult<()> {
@@ -313,7 +311,7 @@ impl SignalList {
             return Err(EINVAL);
         }
 
-        let mut inner = self.inner.lock_irq();
+        let mut inner = self.inner.lock();
         if action.is_default() {
             inner.handlers.remove(&signal);
         } else {
@@ -325,7 +323,7 @@ impl SignalList {
 
     pub fn get_handler(&self, signal: Signal) -> SignalAction {
         self.inner
-            .lock_irq()
+            .lock()
             .handlers
             .get(&signal)
             .cloned()
@@ -334,7 +332,7 @@ impl SignalList {
 
     // TODO!!!: Find a better way.
     pub fn set_signal_waker(&self, waker: Waker) {
-        let mut inner = self.inner.lock_irq();
+        let mut inner = self.inner.lock();
         let old_waker = inner.signal_waker.replace(waker);
         assert!(old_waker.is_none(), "We should not have a waker here");
     }
@@ -343,7 +341,7 @@ impl SignalList {
     /// This is used when `execve` is called.
     pub fn clear_non_ignore(&self) {
         self.inner
-            .lock_irq()
+            .lock()
             .handlers
             .retain(|_, action| action.is_ignore());
     }
@@ -351,16 +349,16 @@ impl SignalList {
     /// Clear all pending signals.
     /// This is used when `fork` is called.
     pub fn clear_pending(&self) {
-        self.inner.lock_irq().pending.clear()
+        self.inner.lock().pending.clear()
     }
 
     pub fn has_pending_signal(&self) -> bool {
-        !self.inner.lock_irq().pending.is_empty()
+        !self.inner.lock().pending.is_empty()
     }
 
     /// Do not use this, use `Thread::raise` instead.
     pub(super) fn raise(&self, signal: Signal) -> RaiseResult {
-        self.inner.lock_irq().raise(signal)
+        self.inner.lock().raise(signal)
     }
 
     /// Handle signals in the context of `Thread::current()`.
@@ -371,27 +369,27 @@ impl SignalList {
     pub fn handle(&self, int_stack: &mut InterruptContext, ext_ctx: &mut ExtendedContext) {
         loop {
             let signal = {
-                let signal = match self.inner.lock_irq().pop() {
+                let signal = match self.inner.lock().pop() {
                     Some(signal) => signal,
                     None => return,
                 };
 
-                let handler = self.inner.lock_irq().handlers.get(&signal).cloned();
+                let handler = self.inner.lock().handlers.get(&signal).cloned();
                 if let Some(handler) = handler {
                     if !signal.is_now() {
                         let old_mask = {
-                            let mut inner = self.inner.lock_irq();
+                            let mut inner = self.inner.lock();
                             let old_mask = inner.mask;
                             inner.mask(handler.sa_mask as u64);
                             old_mask
                         };
                         let result = handler.handle(signal, old_mask, int_stack, ext_ctx);
                         if result.is_err() {
-                            self.inner.lock_irq().set_mask(old_mask);
+                            self.inner.lock().set_mask(old_mask);
                         }
                         match result {
-                            Err(EFAULT) => self.inner.lock_irq().raise(Signal::SIGSEGV),
-                            Err(_) => self.inner.lock_irq().raise(Signal::SIGSYS),
+                            Err(EFAULT) => self.inner.lock().raise(Signal::SIGSEGV),
+                            Err(_) => self.inner.lock().raise(Signal::SIGSYS),
                             Ok(()) => return,
                         };
                         continue;
@@ -403,7 +401,7 @@ impl SignalList {
                 // Default actions include stopping the thread, continuing the thread and
                 // terminating the process. All these actions will block the thread or return
                 // to the thread immediately. So we can unmask these signals now.
-                self.inner.lock_irq().unmask(signal.to_mask());
+                self.inner.lock().unmask(signal.to_mask());
                 signal
             };
 
@@ -426,7 +424,7 @@ impl SignalList {
                     // `SIGSTOP` can only be waken up by `SIGCONT` or `SIGKILL`.
                     // SAFETY: Preempt disabled above.
                     {
-                        let mut inner = self.inner.lock_irq();
+                        let mut inner = self.inner.lock();
                         let waker = Waker::from(Task::current().usleep());
                         let old_waker = inner.stop_waker.replace(waker);
                         assert!(old_waker.is_none(), "We should not have a waker here");
@@ -471,7 +469,7 @@ impl SignalList {
         *ext_ctx = UserPointer::<ExtendedContext>::new_vaddr(old_mmxregs_vaddr)?.read()?;
         *int_stack = UserPointer::<InterruptContext>::new_vaddr(old_int_stack_vaddr)?.read()?;
 
-        self.inner.lock_irq().set_mask(old_mask);
+        self.inner.lock().set_mask(old_mask);
         Ok(int_stack.rax as usize)
     }
 }

+ 50 - 8
src/kernel/task/task.rs

@@ -11,7 +11,7 @@ use kstack::KernelStack;
 use core::{
     future::Future,
     pin::Pin,
-    sync::atomic::{fence, AtomicBool, AtomicU32, Ordering},
+    sync::atomic::{compiler_fence, fence, AtomicBool, AtomicU32, Ordering},
     task::{Context, Poll, Waker},
 };
 
@@ -49,6 +49,15 @@ pub struct TaskOutput<Output: Send> {
     inner: TaskOutputState<Output>,
 }
 
+impl<Output> TaskHandle<Output>
+where
+    Output: Send,
+{
+    pub fn waker(&self) -> Waker {
+        Waker::from(self.task.clone())
+    }
+}
+
 impl<Output> TaskOutput<Output>
 where
     Output: Send,
@@ -137,7 +146,7 @@ impl TaskState {
 
     pub fn cmpxchg(&self, current: u32, new: u32) -> u32 {
         self.0
-            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
+            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Relaxed)
             .unwrap_or_else(|x| x)
     }
 
@@ -190,7 +199,12 @@ impl Task {
 
     pub(super) fn set_usleep(&self) {
         let prev_state = self.state.swap(TaskState::USLEEP);
-        assert_eq!(prev_state, TaskState::RUNNING);
+        assert_eq!(
+            prev_state,
+            TaskState::RUNNING,
+            "Trying to set task {} usleep that is not running",
+            self.id.0
+        );
     }
 
     pub fn usleep(self: &Arc<Self>) -> Arc<UniqueWaker> {
@@ -202,8 +216,14 @@ impl Task {
 
     pub fn isleep(self: &Arc<Self>) -> Arc<Self> {
         // No need to dequeue. We have proved that the task is running so not in the queue.
-        let prev_state = self.state.swap(TaskState::ISLEEP);
-        assert_eq!(prev_state, TaskState::RUNNING);
+        let prev_state = self.state.cmpxchg(TaskState::RUNNING, TaskState::ISLEEP);
+
+        assert_eq!(
+            prev_state,
+            TaskState::RUNNING,
+            "Trying to sleep task {} that is not running",
+            self.id.0
+        );
 
         self.clone()
     }
@@ -227,14 +247,33 @@ impl Task {
         // We get here with preempt count == 1.
         preempt::enable();
 
-        let output = Weak::from_raw(output);
         let executor = unsafe { executor.get_unchecked_mut() };
-        let runnable = unsafe { Pin::new_unchecked(&mut executor.runnable) };
+        let mut runnable = unsafe { Pin::new_unchecked(&mut executor.runnable) };
 
         {
             let waker = Waker::from(Task::current().clone());
-            let output_data = runnable.pinned_join(&waker);
 
+            let output_data = loop {
+                match runnable.as_mut().pinned_run(&waker) {
+                    RunState::Finished(output) => break output,
+                    RunState::Running => {
+                        if Task::current().is_runnable() {
+                            continue;
+                        }
+
+                        // We need to set the preempt count to 0 to allow preemption.
+                        preempt::disable();
+                        compiler_fence(Ordering::Release);
+
+                        Task::switch(&Task::current(), &Task::idle());
+
+                        compiler_fence(Ordering::Acquire);
+                        preempt::enable();
+                    }
+                }
+            };
+
+            let output = Weak::from_raw(output);
             if let Some(output) = output.upgrade() {
                 output.lock().commit_output(output_data);
             }
@@ -341,6 +380,8 @@ where
 
         fence(Ordering::SeqCst);
 
+        executor.runnable.restore_running_context();
+
         if executor.finished.load(Ordering::Relaxed) {
             return Poll::Ready(());
         }
@@ -362,6 +403,7 @@ where
 
 impl<F: Future + 'static> Contexted for FutureRunnable<F> {
     fn load_running_context(&mut self) {}
+    fn restore_running_context(&mut self) {}
 }
 
 impl<F: Future + 'static> PinRunnable for FutureRunnable<F> {

+ 4 - 0
src/kernel/task/task/runnable.rs

@@ -9,6 +9,10 @@ pub trait Contexted {
     /// # Safety
     /// This function will be called in a preemption disabled context.
     fn load_running_context(&mut self);
+
+    /// # Safety
+    /// This function will be called in a preemption disabled context.
+    fn restore_running_context(&mut self);
 }
 
 pub trait Runnable {

+ 5 - 0
src/kernel/task/thread.rs

@@ -12,6 +12,7 @@ use crate::{
 };
 
 use alloc::sync::Arc;
+use bindings::KERNEL_PML4;
 
 use super::{
     signal::{RaiseResult, Signal, SignalList},
@@ -340,6 +341,10 @@ impl Contexted for ThreadRunnable {
             thread.load_thread_area32();
         }
     }
+
+    fn restore_running_context(&mut self) {
+        arch::set_root_page_table(KERNEL_PML4 as usize);
+    }
 }
 
 impl PinRunnable for ThreadRunnable {

+ 5 - 6
src/kernel/terminal.rs

@@ -358,8 +358,7 @@ struct TerminalInner {
 }
 
 pub struct Terminal {
-    /// Lock with IRQ disabled. We might use this in IRQ context.
-    inner: Spin<TerminalInner>,
+    inner: Mutex<TerminalInner>,
     device: Arc<dyn TerminalDevice>,
     cv: CondVar,
 }
@@ -401,7 +400,7 @@ impl core::fmt::Debug for Terminal {
 impl Terminal {
     pub fn new(device: Arc<dyn TerminalDevice>) -> Arc<Self> {
         Arc::new(Self {
-            inner: Spin::new(TerminalInner {
+            inner: Mutex::new(TerminalInner {
                 termio: Termios::new_standard(),
                 session: Weak::new(),
                 buffer: VecDeque::with_capacity(BUFFER_SIZE),
@@ -486,7 +485,7 @@ impl Terminal {
 
     // TODO: Find a better way to handle this.
     pub fn commit_char(&self, ch: u8) {
-        let mut inner = self.inner.lock_irq();
+        let mut inner = self.inner.lock();
         if inner.termio.isig() {
             match ch {
                 0xff => {}
@@ -534,7 +533,7 @@ impl Terminal {
     }
 
     pub fn poll_in(&self) -> KResult<()> {
-        let mut inner = self.inner.lock_irq();
+        let mut inner = self.inner.lock();
         if inner.buffer.is_empty() {
             self.cv.wait(&mut inner);
 
@@ -553,7 +552,7 @@ impl Terminal {
                 break 'block &tmp_buffer[..0];
             }
 
-            let mut inner = self.inner.lock_irq();
+            let mut inner = self.inner.lock();
             if inner.buffer.is_empty() {
                 self.cv.wait(&mut inner);
 

+ 2 - 3
src/kernel/timer.rs

@@ -28,14 +28,13 @@ impl Ticks {
 }
 
 pub fn timer_interrupt() {
+    end_of_interrupt();
     TICKS.fetch_add(1, Ordering::Relaxed);
+
     if preempt::count() == 0 {
         // To make scheduler satisfied.
         preempt::disable();
-        end_of_interrupt();
         Scheduler::schedule();
-    } else {
-        end_of_interrupt();
     }
 }
 

+ 3 - 0
src/sync/condvar.rs

@@ -83,7 +83,10 @@ impl<const I: bool> CondVar<I> {
         // If the flag is already set, we don't need to sleep.
 
         unsafe { guard.force_unlock() };
+
+        might_sleep!(1);
         Scheduler::schedule();
+
         unsafe { guard.force_relock() };
 
         assert!(Task::current().is_runnable());