//! Executor: drives a `Run` object to completion on a dedicated stack and
//! execution context, cooperating with the scheduler via `progress`.
  1. mod builder;
  2. mod execute_status;
  3. mod output_handle;
  4. mod stack;
  5. use crate::{
  6. run::{Contexted, Run, RunState},
  7. scheduler::Scheduler,
  8. task::Task,
  9. };
  10. use alloc::sync::Weak;
  11. use core::{
  12. pin::Pin,
  13. sync::atomic::{compiler_fence, fence, AtomicBool, Ordering},
  14. task::Waker,
  15. };
  16. use eonix_sync::Spin;
  17. pub use builder::ExecutorBuilder;
  18. pub use execute_status::ExecuteStatus;
  19. pub use output_handle::OutputHandle;
  20. pub use stack::Stack;
/// An `Executor` executes a `Run` object in a separate thread of execution
/// where we have a dedicated stack and context.
pub trait Executor: Send {
    /// Switch into the executor's context and let the runnable make progress
    /// until it yields back to the scheduler or finishes.
    ///
    /// Returns [`ExecuteStatus::Finished`] once the runnable has produced its
    /// output, [`ExecuteStatus::Executing`] otherwise.
    fn progress(&self) -> ExecuteStatus;
}
/// Concrete [`Executor`] implementation: owns the stack the runnable executes
/// on, the runnable itself, and the channel back to whoever awaits the output.
struct RealExecutor<S, R>
where
    R: Run + Send + Contexted + 'static,
    R::Output: Send,
{
    // Held only to keep the execution stack alive for the executor's
    // lifetime; never read directly.
    _stack: S,
    // The object being driven to completion on this executor's stack.
    runnable: R,
    // Where the final output is committed. `Weak` so that a consumer that
    // dropped its handle does not keep the output slot alive.
    output_handle: Weak<Spin<OutputHandle<R::Output>>>,
    // Set to `true` by `execute` right before its final switch back to the
    // scheduler; read by `progress` to report completion.
    finished: AtomicBool,
}
impl<S, R> RealExecutor<S, R>
where
    R: Run + Send + Contexted + 'static,
    R::Output: Send,
{
    /// Entry point running on the executor's dedicated stack.
    ///
    /// Repeatedly polls the runnable, parking the current task whenever it
    /// reports [`RunState::Running`], until it finishes. The output — if a
    /// consumer still holds the handle — is committed, the `finished` flag is
    /// set, and control returns to the scheduler permanently (hence `-> !`).
    extern "C" fn execute(self: Pin<&Self>) -> ! {
        // We get here with preempt count == 1.
        eonix_preempt::enable();
        {
            // Waker built from the current task so the runnable can have us
            // unparked when it becomes ready again.
            let waker = Waker::from(Task::current().clone());
            let output_data = loop {
                // TODO!!!!!!: CHANGE THIS.
                //
                // NOTE(review): casting the `*const R` obtained from a shared
                // reference to `*mut R` and mutating through it is undefined
                // behavior unless `runnable` sits behind interior mutability
                // (e.g. `UnsafeCell`) — confirm and fix alongside the TODO.
                let runnable_pointer = &raw const self.get_ref().runnable;
                // SAFETY: We don't move the runnable object and we MIGHT not be using the
                // part that is used in `pinned_run` in the runnable...?
                let mut pinned_runnable =
                    unsafe { Pin::new_unchecked(&mut *(runnable_pointer as *mut R)) };
                match pinned_runnable.as_mut().run(&waker) {
                    // Final value produced: leave the polling loop.
                    RunState::Finished(output) => break output,
                    // Not ready yet: sleep until the waker reschedules us.
                    RunState::Running => Task::park(),
                }
            };
            // Deliver the output only if some consumer still holds the handle.
            if let Some(output_handle) = self.output_handle.upgrade() {
                output_handle.lock().commit_output(output_data);
            }
        }
        // SAFETY: We are on the same CPU as the task.
        //
        // NOTE(review): `progress` loads this flag with `Ordering::Acquire`;
        // a `Relaxed` store pairs with that only through the SeqCst fences in
        // `progress` — if cross-CPU handoff is possible (see the TODO there),
        // a `Release` store may be needed. Verify.
        self.finished.store(true, Ordering::Relaxed);
        unsafe {
            // SAFETY: `preempt::count()` == 1.
            eonix_preempt::disable();
            Scheduler::goto_scheduler_noreturn()
        }
    }
}
impl<S, R> Executor for RealExecutor<S, R>
where
    S: Send,
    R: Run + Contexted + Send,
    R::Output: Send,
{
    /// Called from scheduler context: loads the runnable's running context,
    /// switches to the task's saved execution context, and — once the task
    /// switches back — reports whether the runnable has finished.
    fn progress(&self) -> ExecuteStatus {
        // TODO!!!: If the task comes from another cpu, we need to sync.
        //
        // The other cpu should see the changes of kernel stack of the target thread
        // made in this cpu.
        //
        // Can we find a better way other than `fence`s?
        //
        // An alternative way is to use an atomic variable to store the cpu id of
        // the current task. Then we can use acquire release swap to ensure that the
        // other cpu sees the changes.
        fence(Ordering::SeqCst);
        compiler_fence(Ordering::SeqCst);
        // TODO!!!: We should load the context only if the previous task is
        // different from the current task.
        self.runnable.load_running_context();
        unsafe {
            // SAFETY: We are in the scheduler context and we are not preempted.
            Scheduler::go_from_scheduler(&Task::current().execution_context);
        }
        // Back in scheduler context: the task yielded or ran to completion.
        self.runnable.restore_running_context();
        compiler_fence(Ordering::SeqCst);
        fence(Ordering::SeqCst);
        // `execute` sets `finished` just before its final switch back here,
        // so observing `true` means the output (if any) has been committed.
        if self.finished.load(Ordering::Acquire) {
            ExecuteStatus::Finished
        } else {
            ExecuteStatus::Executing
        }
    }
}