rcu.rs

use core::{
    marker::PhantomData,
    ops::Deref,
    ptr::NonNull,
    sync::atomic::{AtomicPtr, Ordering},
};

use crate::{
    prelude::*,
    sync::{lock::Guard, semaphore::RwSemaphoreStrategy},
};

use alloc::sync::Arc;
use lazy_static::lazy_static;
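
/// Guard for an RCU read-side critical section.
///
/// While the guard is alive it holds the global RCU semaphore in shared
/// mode, so `rcu_sync()` (which takes it exclusively) cannot return until
/// the guard is dropped.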
pub struct RCUReadGuard<'data, T: 'data> {
    value: T,
    guard: Guard<'data, (), RwSemaphoreStrategy, false>,
    _phantom: PhantomData<&'data T>,
}
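
// Global semaphore backing all RCU read-side critical sections: readers
// hold it shared, `rcu_sync()` takes it exclusively to wait them out.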
lazy_static! {
    static ref GLOBAL_RCU_SEM: RwSemaphore<()> = RwSemaphore::new(());
}

impl<'data, T: 'data> RCUReadGuard<'data, T> {
    fn lock(value: T) -> Self {
        Self {
            value,
            guard: GLOBAL_RCU_SEM.lock_shared(),
            _phantom: PhantomData,
        }
    }
}

impl<'data, T: 'data> Deref for RCUReadGuard<'data, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}
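
/// Wait for an RCU grace period: blocks until every `RCUReadGuard` taken
/// before this call has been dropped.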
pub fn rcu_sync() {
    GLOBAL_RCU_SEM.lock();
}
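
/// Intrusive-list hooks for types stored in an `RCUList`. Implementors
/// expose the atomic `prev`/`next` pointers embedded in the node itself.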
pub trait RCUNode<MySelf> {
    fn rcu_prev(&self) -> &AtomicPtr<MySelf>;
    fn rcu_next(&self) -> &AtomicPtr<MySelf>;
}
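
/// A doubly linked, intrusive RCU list.
///
/// Readers traverse the list under a shared lock and never block each
/// other; writers serialize through `update_lock` and wait for readers
/// only when tearing down a removed or replaced node's links.
///
/// A minimal sketch of a node type (the `Entry` type and its fields are
/// hypothetical, shown only to illustrate the `RCUNode` contract):
///
/// ```ignore
/// struct Entry {
///     data: u32,
///     prev: AtomicPtr<Entry>,
///     next: AtomicPtr<Entry>,
/// }
///
/// impl RCUNode<Entry> for Entry {
///     fn rcu_prev(&self) -> &AtomicPtr<Entry> {
///         &self.prev
///     }
///
///     fn rcu_next(&self) -> &AtomicPtr<Entry> {
///         &self.next
///     }
/// }
/// ```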
pub struct RCUList<T: RCUNode<T>> {
    head: AtomicPtr<T>,
    reader_lock: RwSemaphore<()>,
    update_lock: Mutex<()>,
}

impl<T: RCUNode<T>> RCUList<T> {
    pub fn new() -> Self {
        Self {
            head: AtomicPtr::new(core::ptr::null_mut()),
            reader_lock: RwSemaphore::new(()),
            update_lock: Mutex::new(()),
        }
    }
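
    /// Insert `new_node` at the head of the list.
    ///
    /// The list takes (leaked) strong references to the node via
    /// `Arc::into_raw`; they are reclaimed when the node is removed.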
    pub fn insert(&self, new_node: Arc<T>) {
        let _lck = self.update_lock.lock();
        let old_head = self.head.load(Ordering::Acquire);

        new_node
            .rcu_prev()
            .store(core::ptr::null_mut(), Ordering::Release);
        new_node.rcu_next().store(old_head, Ordering::Release);

        if let Some(old_head) = unsafe { old_head.as_ref() } {
            // The old head's prev link takes a strong reference to the new node.
            old_head
                .rcu_prev()
                .store(Arc::into_raw(new_node.clone()) as *mut _, Ordering::Release);
        }

        // Publish the new node as the list head, which also holds a strong reference.
        self.head
            .store(Arc::into_raw(new_node) as *mut _, Ordering::Release);
    }
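
    /// Unlink `node` from the list, then wait for all readers currently
    /// traversing the list before clearing the node's own links.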
    pub fn remove(&self, node: &Arc<T>) {
        let _lck = self.update_lock.lock();
        let prev = node.rcu_prev().load(Ordering::Acquire);
        let next = node.rcu_next().load(Ordering::Acquire);

        if let Some(next) = unsafe { next.as_ref() } {
            // Reclaim the strong reference that `next`'s prev link held on us.
            let me = next.rcu_prev().swap(prev, Ordering::AcqRel);
            debug_assert!(me == Arc::as_ptr(node) as *mut _);
            unsafe { Arc::from_raw(me) };
        }

        {
            // Reclaim the strong reference held through our predecessor's
            // next link (or through `head` if we were the first node).
            let prev_next =
                unsafe { prev.as_ref().map(|rcu| rcu.rcu_next()) }.unwrap_or(&self.head);
            let me = prev_next.swap(next, Ordering::AcqRel);
            debug_assert!(me == Arc::as_ptr(node) as *mut _);
            unsafe { Arc::from_raw(me) };
        }

        // Wait until no reader is iterating before tearing down our links.
        let _lck = self.reader_lock.lock();
        node.rcu_prev()
            .store(core::ptr::null_mut(), Ordering::Release);
        node.rcu_next()
            .store(core::ptr::null_mut(), Ordering::Release);
    }
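
    /// Substitute `new_node` for `old_node` in the list, waiting for
    /// current readers before clearing `old_node`'s links.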
    pub fn replace(&self, old_node: &Arc<T>, new_node: Arc<T>) {
        let _lck = self.update_lock.lock();
        let prev = old_node.rcu_prev().load(Ordering::Acquire);
        let next = old_node.rcu_next().load(Ordering::Acquire);

        new_node.rcu_prev().store(prev, Ordering::Release);
        new_node.rcu_next().store(next, Ordering::Release);

        {
            // Point the predecessor's next link (or `head`) at the new node,
            // handing it a fresh strong reference and reclaiming the one it
            // held on the old node.
            let prev_next =
                unsafe { prev.as_ref().map(|rcu| rcu.rcu_next()) }.unwrap_or(&self.head);
            let old = prev_next.swap(Arc::into_raw(new_node.clone()) as *mut _, Ordering::AcqRel);
            debug_assert!(old == Arc::as_ptr(old_node) as *mut _);
            unsafe { Arc::from_raw(old) };
        }

        if let Some(next) = unsafe { next.as_ref() } {
            // Likewise for the successor's prev link.
            let old = next
                .rcu_prev()
                .swap(Arc::into_raw(new_node.clone()) as *mut _, Ordering::AcqRel);
            debug_assert!(old == Arc::as_ptr(old_node) as *mut _);
            unsafe { Arc::from_raw(old) };
        }

        // Wait until no reader is iterating before tearing down the old
        // node's links.
        let _lck = self.reader_lock.lock();
        old_node
            .rcu_prev()
            .store(core::ptr::null_mut(), Ordering::Release);
        old_node
            .rcu_next()
            .store(core::ptr::null_mut(), Ordering::Release);
    }
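
    /// Iterate over the list while holding the shared reader lock, which
    /// keeps the visited nodes alive for the lifetime of the iterator.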
    pub fn iter(&self) -> RCUIterator<T> {
        let _lck = self.reader_lock.lock_shared();

        RCUIterator {
            // SAFETY: We have a read lock, so the node is still alive.
            cur: self.head.load(Ordering::SeqCst),
            _lock: _lck,
        }
    }
}
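
/// Iterator over an `RCUList`, holding the list's reader lock in shared
/// mode for as long as it exists.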
pub struct RCUIterator<'lt, T: RCUNode<T>> {
    cur: *const T,
    _lock: Guard<'lt, (), RwSemaphoreStrategy, false>,
}

impl<'lt, T: RCUNode<T>> Iterator for RCUIterator<'lt, T> {
    type Item = BorrowedArc<'lt, T>;

    fn next(&mut self) -> Option<Self::Item> {
        match unsafe { self.cur.as_ref() } {
            None => None,
            Some(real) => {
                // SAFETY: We have a read lock, so the node is still alive.
                let ret = self.cur;
                self.cur = real.rcu_next().load(Ordering::SeqCst);
                Some(BorrowedArc::from_raw(ret))
            }
        }
    }
}
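
/// An RCU-protected pointer to an `Arc`-managed value.
///
/// Readers borrow the value under the global RCU read lock via `load`;
/// writers replace the pointer with `swap` and must delay freeing the old
/// value until a grace period has elapsed (see `rcu_sync`).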
pub struct RCUPointer<T>(AtomicPtr<T>);

impl<T: core::fmt::Debug> core::fmt::Debug for RCUPointer<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match NonNull::new(self.0.load(Ordering::Acquire)) {
            Some(pointer) => {
                let borrowed = BorrowedArc::from_raw(pointer.as_ptr());
                f.write_str("RCUPointer of ")?;
                borrowed.fmt(f)
            }
            None => f.debug_tuple("NULL RCUPointer").finish(),
        }
    }
}

impl<T> RCUPointer<T> {
    pub fn new_with(value: Arc<T>) -> Self {
        Self(AtomicPtr::new(Arc::into_raw(value) as *mut _))
    }

    pub fn empty() -> Self {
        Self(AtomicPtr::new(core::ptr::null_mut()))
    }
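
    /// Load the pointer for reading. Returns a guard that keeps the RCU
    /// read-side critical section open while the borrowed value is in use,
    /// or `None` if the pointer is null.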
    pub fn load<'lt>(&self) -> Option<RCUReadGuard<'lt, BorrowedArc<'lt, T>>> {
        let ptr = self.0.load(Ordering::Acquire);

        if ptr.is_null() {
            None
        } else {
            Some(RCUReadGuard::lock(BorrowedArc::from_raw(ptr)))
        }
    }

    /// # Safety
    /// Caller must ensure that no writer is concurrently updating the pointer.
    pub unsafe fn load_locked<'lt>(&self) -> Option<BorrowedArc<'lt, T>> {
        let ptr = self.0.load(Ordering::Acquire);

        if ptr.is_null() {
            None
        } else {
            Some(BorrowedArc::from_raw(ptr))
        }
    }

    /// Replace the stored pointer with `new`, returning the previous value.
    ///
    /// # Safety
    /// Caller must ensure that the returned value is freed only after all
    /// readers are done (e.g. after `rcu_sync()`).
    pub unsafe fn swap(&self, new: Option<Arc<T>>) -> Option<Arc<T>> {
        let new = new
            .map(|arc| Arc::into_raw(arc) as *mut T)
            .unwrap_or(core::ptr::null_mut());
        let old = self.0.swap(new, Ordering::AcqRel);

        if old.is_null() {
            None
        } else {
            Some(unsafe { Arc::from_raw(old) })
        }
    }
}
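
// Dropping an `RCUPointer` releases its strong reference; if that is the
// last one, wait for a grace period before the value is actually freed.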
impl<T> Drop for RCUPointer<T> {
    fn drop(&mut self) {
        // SAFETY: Before the value can be freed, we call `rcu_sync()` to
        // ensure that all readers are done.
        if let Some(arc) = unsafe { self.swap(None) } {
            // Only wait if this is the last strong reference; otherwise the
            // value outlives us and no reader is left dangling.
            if Arc::strong_count(&arc) == 1 {
                rcu_sync();
            }
        }
    }
}