executor.rs 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // mod builder;
  2. mod output_handle;
  3. mod stack;
  4. use alloc::{
  5. boxed::Box,
  6. sync::{Arc, Weak},
  7. };
  8. use core::{
  9. marker::PhantomData,
  10. pin::Pin,
  11. task::{Context, Poll},
  12. };
  13. use eonix_sync::Spin;
  14. pub use output_handle::OutputHandle;
  15. pub use stack::Stack;
/// An `Executor` executes a Future object in a separate thread of execution.
///
/// When the Future is finished, the `Executor` will call the `OutputHandle` to commit the output.
/// Then the `Executor` will release the resources associated with the Future.
///
/// The future is stored type-erased behind a pinned, boxed `TypeErasedExecutor`.
/// The slot holds `Some` while the future is still running and is cleared to
/// `None` by `Executor::poll` once the inner executor reports completion.
pub struct Executor(Option<Pin<Box<dyn TypeErasedExecutor>>>);
/// Object-safe view of a `RealExecutor` that hides the concrete future type,
/// so that `Executor` can store any future behind a single boxed trait object.
trait TypeErasedExecutor: Send {
    /// Drives the wrapped future one step.
    ///
    /// Returns `Poll::Ready(())` once the future has completed (its output is
    /// handed over to the output handle rather than returned here) and
    /// `Poll::Pending` otherwise.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}
/// Concrete executor state for a future of type `F`.
///
/// The `F: Future` bound is required on the struct itself because the
/// `output_handle` field names `F::Output`.
struct RealExecutor<'a, F>
where
    F: Future + Send + 'a,
    F::Output: Send + 'a,
{
    /// The future being driven. It is polled in place through a pinned
    /// projection and is never moved out of the struct.
    future: F,
    /// Weak reference to the handle that receives the future's output.
    /// It is replaced with an empty `Weak::new()` once the output has been
    /// committed.
    output_handle: Weak<Spin<OutputHandle<F::Output>>>,
    /// Zero-sized marker that makes the struct use the lifetime parameter `'a`.
    _phantom: PhantomData<&'a ()>,
}
  33. impl<F> TypeErasedExecutor for RealExecutor<'_, F>
  34. where
  35. F: Future + Send,
  36. F::Output: Send,
  37. {
  38. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
  39. if self.output_handle.as_ptr().is_null() {
  40. return Poll::Ready(());
  41. }
  42. let future = unsafe {
  43. // SAFETY: We don't move the future.
  44. self.as_mut().map_unchecked_mut(|me| &mut me.future)
  45. };
  46. future.poll(cx).map(|output| {
  47. if let Some(output_handle) = self.output_handle.upgrade() {
  48. output_handle.lock().commit_output(output);
  49. unsafe {
  50. // SAFETY: `output_handle` is Unpin.
  51. self.get_unchecked_mut().output_handle = Weak::new();
  52. }
  53. }
  54. })
  55. }
  56. }
  57. impl Executor {
  58. pub fn new<F>(future: F) -> (Self, Arc<Spin<OutputHandle<F::Output>>>)
  59. where
  60. F: Future + Send + 'static,
  61. F::Output: Send + 'static,
  62. {
  63. let output_handle = OutputHandle::new();
  64. (
  65. Executor(Some(Box::pin(RealExecutor {
  66. future,
  67. output_handle: Arc::downgrade(&output_handle),
  68. _phantom: PhantomData,
  69. }))),
  70. output_handle,
  71. )
  72. }
  73. pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
  74. if let Some(executor) = self.0.as_mut() {
  75. executor.as_mut().poll(cx).map(|_| {
  76. self.0.take();
  77. })
  78. } else {
  79. Poll::Ready(())
  80. }
  81. }
  82. }