task.rs

mod adapter;
mod task_state;

use crate::{
    executor::{Executor, OutputHandle},
    ready_queue::{cpu_rq, ReadyQueue},
};
use alloc::{sync::Arc, task::Wake};
use atomic_unique_refcell::AtomicUniqueRefCell;
use core::{
    future::Future,
    ops::DerefMut,
    sync::atomic::{AtomicU32, Ordering},
    task::{Context, Poll, Waker},
};
use eonix_hal::processor::CPU;
use eonix_sync::{Spin, SpinIrq};
use intrusive_collections::{LinkedListAtomicLink, RBTreeAtomicLink};

pub use adapter::{TaskAdapter, TaskRqAdapter};
pub(crate) use task_state::TaskState;
/// Unique identifier assigned to each task at creation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(u32);
/// Handle returned by [`Task::new`], pairing the task with the slot
/// that will receive the future's output.
pub struct TaskHandle<Output>
where
    Output: Send,
{
    pub(crate) task: Arc<Task>,
    pub(crate) output_handle: Arc<Spin<OutputHandle<Output>>>,
}
pub struct Task {
    /// Unique identifier of the task.
    pub id: TaskId,
    /// The last CPU that the task was executed on.
    /// If `on_rq` is `false`, we can't assume that this task is still on that CPU.
    pub(crate) cpu: AtomicU32,
    /// Task state.
    pub(crate) state: TaskState,
    /// Executor object driving the task's future.
    executor: AtomicUniqueRefCell<Executor>,
    /// Link in the global task list.
    link_task_list: RBTreeAtomicLink,
    /// Link in the ready queue.
    link_ready_queue: LinkedListAtomicLink,
}
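
// Note: `TaskAdapter` and `TaskRqAdapter` (re-exported above) tie the two
// intrusive links to their collections. A minimal sketch of what the `adapter`
// module presumably contains, using the `intrusive_adapter!` macro from
// `intrusive_collections` (the actual definitions may differ, and the RB-tree
// adapter additionally needs a `KeyAdapter` impl, likely keyed on `id`):
//
//     intrusive_adapter!(pub TaskAdapter = Arc<Task>: Task {
//         link_task_list: RBTreeAtomicLink
//     });
//     intrusive_adapter!(pub TaskRqAdapter = Arc<Task>: Task {
//         link_ready_queue: LinkedListAtomicLink
//     });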
impl Task {
    /// Create a new task wrapping `future`. The task starts out `BLOCKED`;
    /// waking it moves it onto a ready queue.
    pub fn new<F>(future: F) -> TaskHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // Monotonically increasing task id, shared across all CPUs.
        static ID: AtomicU32 = AtomicU32::new(0);

        let (executor, output_handle) = Executor::new(future);
        let task = Arc::new(Self {
            id: TaskId(ID.fetch_add(1, Ordering::Relaxed)),
            cpu: AtomicU32::new(CPU::local().cpuid() as u32),
            state: TaskState::new(TaskState::BLOCKED),
            executor: AtomicUniqueRefCell::new(executor),
            link_task_list: RBTreeAtomicLink::new(),
            link_ready_queue: LinkedListAtomicLink::new(),
        });

        TaskHandle {
            task,
            output_handle,
        }
    }
    /// Poll the task's future once, using the task itself as the waker.
    pub fn poll(self: &Arc<Self>) -> Poll<()> {
        let mut executor_borrow = self.executor.borrow();
        let waker = Waker::from(self.clone());
        let mut cx = Context::from_waker(&waker);
        executor_borrow.poll(&mut cx)
    }
    /// Get the stabilized lock for the task's run queue.
    pub fn rq(&self) -> impl DerefMut<Target = dyn ReadyQueue> + 'static {
        loop {
            let cpu = self.cpu.load(Ordering::Relaxed);
            let rq = cpu_rq(cpu as usize).lock_irq();
            // Stabilize `cpu` against the rq lock: if the task migrated to
            // another CPU while we were taking the lock, drop the guard and
            // retry with the new CPU.
            if cpu != self.cpu.load(Ordering::Acquire) {
                continue;
            }
            return rq;
        }
    }
    pub fn is_ready(&self) -> bool {
        self.state.is_ready()
    }
}
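
// Wake-side state transitions, as implemented in `wake_by_ref` below
// (`READY_RUNNING` is presumably the union `READY | RUNNING`):
//
//   BLOCKED       -> READY          enqueue the task on its run queue
//   RUNNING       -> READY_RUNNING  re-poll after the current poll finishes
//   READY         -> (unchanged)    already queued; wake is a no-op
//   READY_RUNNING -> (unchanged)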
impl Wake for Task {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        let Ok(old) = self.state.update(|state| match state {
            TaskState::BLOCKED => Some(TaskState::READY),
            TaskState::RUNNING => Some(TaskState::READY | TaskState::RUNNING),
            TaskState::READY | TaskState::READY_RUNNING => None,
            state => unreachable!("Waking a {state:?} task"),
        }) else {
            return;
        };

        if old == TaskState::BLOCKED {
            // If the task was blocked, we need to put it back on the ready queue.
            self.rq().put(self.clone());
        }
    }
}
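
// A minimal usage sketch (hypothetical call site; the real spawn/schedule path
// lives elsewhere in the crate, and everything below is only illustrative):
//
//     let handle = Task::new(async { 1 + 1 });
//     // Waking a freshly created (BLOCKED) task enqueues it on a run queue.
//     handle.task.clone().wake();
//     // A scheduler loop would later pop the task off its queue and drive it:
//     //     if task.is_ready() { let _ = task.poll(); }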