task.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. mod adapter;
  2. mod task_state;
  3. use crate::{
  4. context::ExecutionContext,
  5. executor::{ExecuteStatus, Executor, ExecutorBuilder, OutputHandle, Stack},
  6. run::{Contexted, Run},
  7. scheduler::Scheduler,
  8. };
  9. use alloc::{boxed::Box, sync::Arc, task::Wake};
  10. use atomic_unique_refcell::AtomicUniqueRefCell;
  11. use core::{
  12. pin::{pin, Pin},
  13. sync::atomic::{AtomicBool, AtomicU32, Ordering},
  14. task::{Context, Poll, Waker},
  15. };
  16. use eonix_preempt::assert_preempt_enabled;
  17. use eonix_sync::Spin;
  18. use intrusive_collections::RBTreeAtomicLink;
  19. use task_state::TaskState;
  20. pub use adapter::TaskAdapter;
  21. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
  22. pub struct TaskId(u32);
  23. pub struct TaskHandle<Output>
  24. where
  25. Output: Send,
  26. {
  27. pub(crate) task: Arc<Task>,
  28. pub(crate) output_handle: Arc<Spin<OutputHandle<Output>>>,
  29. }
  30. /// A `Task` represents a schedulable unit.
  31. ///
  32. /// Initial: state = Running, unparked = false
  33. ///
  34. /// Task::park() => swap state <- Parking, assert prev == Running
  35. /// => swap unparked <- false
  36. /// -> true => store state <- Running => return
  37. /// -> false => goto scheduler => get rq lock => load state
  38. /// -> Running => enqueue
  39. /// -> Parking => cmpxchg Parking -> Parked
  40. /// -> Running => enqueue
  41. /// -> Parking => on_rq <- false
  42. /// -> Parked => ???
  43. ///
  44. /// Task::unpark() => swap unparked <- true
  45. /// -> true => return
  46. /// -> false => swap state <- Running
  47. /// -> Running => return
  48. /// -> Parking | Parked => Scheduler::activate
  49. pub struct Task {
  50. /// Unique identifier of the task.
  51. pub id: TaskId,
  52. /// Whether the task is on some run queue (a.k.a ready).
  53. pub(crate) on_rq: AtomicBool,
  54. /// Whether someone has called `unpark` on this task.
  55. pub(crate) unparked: AtomicBool,
  56. /// The last cpu that the task was executed on.
  57. /// If `on_rq` is `false`, we can't assume that this task is still on the cpu.
  58. pub(crate) cpu: AtomicU32,
  59. /// Task state.
  60. pub(crate) state: TaskState,
  61. /// Task execution context.
  62. pub(crate) execution_context: ExecutionContext,
  63. /// Executor object.
  64. executor: AtomicUniqueRefCell<Option<Pin<Box<dyn Executor>>>>,
  65. /// Link in the global task list.
  66. link_task_list: RBTreeAtomicLink,
  67. }
  68. impl Task {
  69. pub fn new<S, R>(runnable: R) -> TaskHandle<R::Output>
  70. where
  71. S: Stack + 'static,
  72. R: Run + Contexted + Send + 'static,
  73. R::Output: Send + 'static,
  74. {
  75. static ID: AtomicU32 = AtomicU32::new(0);
  76. let (executor, execution_context, output) = ExecutorBuilder::new()
  77. .stack(S::new())
  78. .runnable(runnable)
  79. .build();
  80. let task = Arc::new(Self {
  81. id: TaskId(ID.fetch_add(1, Ordering::Relaxed)),
  82. on_rq: AtomicBool::new(false),
  83. unparked: AtomicBool::new(false),
  84. cpu: AtomicU32::new(0),
  85. state: TaskState::new(TaskState::RUNNING),
  86. executor: AtomicUniqueRefCell::new(Some(executor)),
  87. execution_context,
  88. link_task_list: RBTreeAtomicLink::new(),
  89. });
  90. TaskHandle {
  91. task,
  92. output_handle: output,
  93. }
  94. }
  95. pub fn run(&self) -> ExecuteStatus {
  96. let mut executor_borrow = self.executor.borrow();
  97. let executor = executor_borrow
  98. .as_ref()
  99. .expect("Executor should be present")
  100. .as_ref()
  101. .get_ref();
  102. if let ExecuteStatus::Finished = executor.progress() {
  103. executor_borrow.take();
  104. ExecuteStatus::Finished
  105. } else {
  106. ExecuteStatus::Executing
  107. }
  108. }
  109. pub fn unpark(self: &Arc<Self>) {
  110. if self.unparked.swap(true, Ordering::Release) {
  111. return;
  112. }
  113. eonix_preempt::disable();
  114. match self.state.swap(TaskState::RUNNING) {
  115. TaskState::RUNNING => {}
  116. TaskState::PARKED | TaskState::PARKING => {
  117. // We are waking up from sleep or someone else is parking this task.
  118. // Try to wake it up.
  119. Scheduler::get().activate(self);
  120. }
  121. _ => unreachable!(),
  122. }
  123. eonix_preempt::enable();
  124. }
  125. pub fn park() {
  126. eonix_preempt::disable();
  127. Self::park_preempt_disabled();
  128. }
  129. /// Park the current task with `preempt::count() == 1`.
  130. pub fn park_preempt_disabled() {
  131. let task = Task::current();
  132. let old_state = task.state.swap(TaskState::PARKING);
  133. assert_eq!(
  134. old_state,
  135. TaskState::RUNNING,
  136. "Parking a task that is not running."
  137. );
  138. if task.unparked.swap(false, Ordering::AcqRel) {
  139. // Someone has called `unpark` on this task previously.
  140. task.state.swap(TaskState::RUNNING);
  141. } else {
  142. unsafe {
  143. // SAFETY: Preemption is disabled.
  144. Scheduler::goto_scheduler(&Task::current().execution_context)
  145. };
  146. assert!(task.unparked.swap(false, Ordering::Acquire));
  147. }
  148. eonix_preempt::enable();
  149. }
  150. pub fn block_on<F>(future: F) -> F::Output
  151. where
  152. F: Future,
  153. {
  154. assert_preempt_enabled!("block_on() must be called with preemption enabled");
  155. let waker = Waker::from(Task::current().clone());
  156. let mut context = Context::from_waker(&waker);
  157. let mut future = pin!(future);
  158. loop {
  159. if let Poll::Ready(output) = future.as_mut().poll(&mut context) {
  160. break output;
  161. }
  162. Task::park();
  163. }
  164. }
  165. }
  166. impl Wake for Task {
  167. fn wake(self: Arc<Self>) {
  168. self.wake_by_ref();
  169. }
  170. fn wake_by_ref(self: &Arc<Self>) {
  171. self.unpark();
  172. }
  173. }