executor.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. mod builder;
  2. mod execute_status;
  3. mod output_handle;
  4. mod stack;
  5. use crate::{
  6. run::{Contexted, PinRun, RunState},
  7. scheduler::Scheduler,
  8. task::Task,
  9. };
  10. use alloc::sync::Weak;
  11. use core::{
  12. pin::Pin,
  13. sync::atomic::{compiler_fence, fence, AtomicBool, Ordering},
  14. task::Waker,
  15. };
  16. use eonix_sync::Spin;
  17. pub use builder::ExecutorBuilder;
  18. pub use execute_status::ExecuteStatus;
  19. pub use output_handle::OutputHandle;
  20. pub use stack::Stack;
/// An `Executor` executes a `PinRun` object in a separate thread of execution
/// where we have a dedicated stack and context.
pub trait Executor: Send {
    /// Drives the executor forward and reports the resulting status.
    ///
    /// The implementation in this file returns `ExecuteStatus::Finished` once
    /// the runnable has produced its output and the executor has marked itself
    /// finished, and `ExecuteStatus::Executing` otherwise.
    fn progress(&self) -> ExecuteStatus;
}
  26. struct RealExecutor<S, R>
  27. where
  28. R: PinRun + Send + Contexted + 'static,
  29. R::Output: Send,
  30. {
  31. _stack: S,
  32. runnable: R,
  33. output_handle: Weak<Spin<OutputHandle<R::Output>>>,
  34. finished: AtomicBool,
  35. }
  36. impl<S, R> RealExecutor<S, R>
  37. where
  38. R: PinRun + Send + Contexted + 'static,
  39. R::Output: Send,
  40. {
  41. extern "C" fn execute(self: Pin<&Self>) -> ! {
  42. // We get here with preempt count == 1.
  43. eonix_preempt::enable();
  44. {
  45. let waker = Waker::from(Task::current().clone());
  46. let output_data = loop {
  47. // TODO!!!!!!: CHANGE THIS.
  48. let runnable_pointer = &raw const self.get_ref().runnable;
  49. // SAFETY: We don't move the runnable object and we MIGHT not be using the
  50. // part that is used in `pinned_run` in the runnable...?
  51. let mut pinned_runnable =
  52. unsafe { Pin::new_unchecked(&mut *(runnable_pointer as *mut R)) };
  53. match pinned_runnable.as_mut().pinned_run(&waker) {
  54. RunState::Finished(output) => break output,
  55. RunState::Running => {
  56. if Task::current().is_runnable() {
  57. continue;
  58. }
  59. // We need to set the preempt count to 0 to allow preemption.
  60. eonix_preempt::disable();
  61. // SAFETY: We are in the scheduler context and preemption is disabled.
  62. unsafe { Scheduler::goto_scheduler(&Task::current().execution_context) };
  63. eonix_preempt::enable();
  64. }
  65. }
  66. };
  67. if let Some(output_handle) = self.output_handle.upgrade() {
  68. output_handle.lock().commit_output(output_data);
  69. }
  70. }
  71. // SAFETY: We are on the same CPU as the task.
  72. self.finished.store(true, Ordering::Relaxed);
  73. unsafe {
  74. // SAFETY: `preempt::count()` == 1.
  75. eonix_preempt::disable();
  76. Scheduler::goto_scheduler_noreturn()
  77. }
  78. }
  79. }
  80. impl<S, R> Executor for RealExecutor<S, R>
  81. where
  82. S: Send,
  83. R: PinRun + Contexted + Send,
  84. R::Output: Send,
  85. {
  86. fn progress(&self) -> ExecuteStatus {
  87. // TODO!!!: If the task comes from another cpu, we need to sync.
  88. //
  89. // The other cpu should see the changes of kernel stack of the target thread
  90. // made in this cpu.
  91. //
  92. // Can we find a better way other than `fence`s?
  93. //
  94. // An alternative way is to use an atomic variable to store the cpu id of
  95. // the current task. Then we can use acquire release swap to ensure that the
  96. // other cpu sees the changes.
  97. fence(Ordering::SeqCst);
  98. compiler_fence(Ordering::SeqCst);
  99. // TODO!!!: We should load the context only if the previous task is
  100. // different from the current task.
  101. self.runnable.load_running_context();
  102. unsafe {
  103. // SAFETY: We are in the scheduler context and we are not preempted.
  104. Scheduler::go_from_scheduler(&Task::current().execution_context);
  105. }
  106. self.runnable.restore_running_context();
  107. compiler_fence(Ordering::SeqCst);
  108. fence(Ordering::SeqCst);
  109. if self.finished.load(Ordering::Acquire) {
  110. ExecuteStatus::Finished
  111. } else {
  112. ExecuteStatus::Executing
  113. }
  114. }
  115. }