page_cache.rs

use super::{paging::AllocZeroed, Page};
use crate::{
    io::{Buffer, FillResult, Stream},
    kernel::mem::page_alloc::RawPagePtr,
    prelude::KResult,
    GlobalPageAlloc,
};
use align_ext::AlignExt;
use alloc::boxed::Box;
use alloc::{collections::btree_map::BTreeMap, sync::Weak};
use async_trait::async_trait;
use core::{future::Future, mem::ManuallyDrop};
use eonix_hal::mm::ArchPhysAccess;
use eonix_mm::{
    address::{PAddr, PhysAccess},
    paging::{PageAlloc, RawPage, PAGE_SIZE, PAGE_SIZE_BITS, PFN},
};
use eonix_sync::Mutex;

pub struct PageCache {
    pages: Mutex<BTreeMap<usize, CachePage>>,
    backend: Weak<dyn PageCacheBackend>,
}

unsafe impl Send for PageCache {}
unsafe impl Sync for PageCache {}

#[derive(Clone, Copy)]
pub struct CachePage(RawPagePtr);

unsafe impl Send for CachePage {}

impl Buffer for CachePage {
    fn total(&self) -> usize {
        PAGE_SIZE
    }

    fn wrote(&self) -> usize {
        self.valid_size()
    }

    fn fill(&mut self, data: &[u8]) -> KResult<FillResult> {
        let valid_size = self.valid_size();
        let available = &mut self.all_mut()[valid_size..];
        if available.is_empty() {
            return Ok(FillResult::Full);
        }
        let len = core::cmp::min(data.len(), available.len());
        available[..len].copy_from_slice(&data[..len]);
        *self.0.valid_size() += len;
        if len < data.len() {
            Ok(FillResult::Partial(len))
        } else {
            Ok(FillResult::Done(len))
        }
    }
}
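
// Example (illustrative sketch, not used by this module): `fill` appends after
// the currently valid bytes, so repeated calls build a page up incrementally
// until it reports `FillResult::Full`. This assumes `cache_init` leaves a
// fresh page with a valid size of 0.
//
//     let mut page = CachePage::new_zeroed();
//     page.fill(b"hello")?;        // valid bytes are now b"hello"
//     page.fill(b" world")?;       // appended after the first chunk
//     assert_eq!(page.valid_data(), &b"hello world"[..]);
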
impl CachePage {
    pub fn new() -> Self {
        let page = GlobalPageAlloc.alloc().unwrap();
        page.cache_init();
        Self(page)
    }

    pub fn new_zeroed() -> Self {
        let page = Page::zeroed();
        let raw_page_ptr = RawPagePtr::from(page.into_raw());
        raw_page_ptr.cache_init();
        Self(raw_page_ptr)
    }

    pub fn valid_size(&self) -> usize {
        *self.0.valid_size()
    }

    pub fn set_valid_size(&mut self, valid_size: usize) {
        *self.0.valid_size() = valid_size;
    }

    pub fn all(&self) -> &[u8] {
        // SAFETY: The page is exclusively owned by us, so we can safely access its data.
        unsafe {
            core::slice::from_raw_parts(
                ArchPhysAccess::as_ptr(PAddr::from(PFN::from(self.0))).as_ptr(),
                PAGE_SIZE,
            )
        }
    }

    pub fn all_mut(&mut self) -> &mut [u8] {
        // SAFETY: The page is exclusively owned by us, so we can safely access its data.
        unsafe {
            core::slice::from_raw_parts_mut(
                ArchPhysAccess::as_ptr(PAddr::from(PFN::from(self.0))).as_ptr(),
                PAGE_SIZE,
            )
        }
    }

    pub fn valid_data(&self) -> &[u8] {
        &self.all()[..self.valid_size()]
    }

    pub fn is_dirty(&self) -> bool {
        self.0.is_dirty()
    }

    pub fn set_dirty(&self) {
        self.0.set_dirty();
    }

    pub fn clear_dirty(&self) {
        self.0.clear_dirty();
    }
}
impl PageCache {
    pub fn new(backend: Weak<dyn PageCacheBackend>) -> Self {
        Self {
            pages: Mutex::new(BTreeMap::new()),
            backend,
        }
    }

    pub async fn read(&self, buffer: &mut dyn Buffer, mut offset: usize) -> KResult<usize> {
        let mut pages = self.pages.lock().await;
        let size = self.backend.upgrade().unwrap().size();
        while offset < size {
            let page_id = offset >> PAGE_SIZE_BITS;
            match pages.get(&page_id) {
                Some(page) => {
                    let inner_offset = offset % PAGE_SIZE;
                    let available_in_file = size.saturating_sub(offset);
                    // TODO: This still causes unnecessary IO if valid_size < PAGE_SIZE
                    // and the fill result is Done.
                    let page_data = &page.valid_data()[inner_offset..];
                    let read_size = page_data.len().min(available_in_file);
                    if read_size == 0
                        || buffer.fill(&page_data[..read_size])?.should_stop()
                        || buffer.available() == 0
                    {
                        break;
                    }
                    offset += read_size;
                }
                None => {
                    let mut new_page = CachePage::new();
                    self.backend
                        .upgrade()
                        .unwrap()
                        .read_page(&mut new_page, offset.align_down(PAGE_SIZE))
                        .await?;
                    pages.insert(page_id, new_page);
                }
            }
        }
        Ok(buffer.wrote())
    }
    pub async fn write(&self, stream: &mut dyn Stream, mut offset: usize) -> KResult<usize> {
        let mut pages = self.pages.lock().await;
        let old_size = self.backend.upgrade().unwrap().size();
        let mut wrote = 0;
        loop {
            let page_id = offset >> PAGE_SIZE_BITS;
            match pages.get_mut(&page_id) {
                Some(page) => {
                    let inner_offset = offset % PAGE_SIZE;
                    let cursor_end = match stream.poll_data(&mut page.all_mut()[inner_offset..])? {
                        Some(buf) => {
                            wrote += buf.len();
                            inner_offset + buf.len()
                        }
                        None => break,
                    };
                    if page.valid_size() < cursor_end {
                        page.set_valid_size(cursor_end);
                    }
                    page.set_dirty();
                    offset += PAGE_SIZE - inner_offset;
                }
                None => {
                    let new_page = if (offset >> PAGE_SIZE_BITS) > (old_size >> PAGE_SIZE_BITS) {
                        CachePage::new_zeroed()
                    } else {
                        let mut new_page = CachePage::new();
                        self.backend
                            .upgrade()
                            .unwrap()
                            .read_page(&mut new_page, offset.align_down(PAGE_SIZE))
                            .await?;
                        new_page
                    };
                    pages.insert(page_id, new_page);
                }
            }
        }
        Ok(wrote)
    }
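
    // Example (hedged sketch): how an owner of a `PageCache` might drive it.
    // `cache`, `user_buffer`, `user_stream` and `file_offset` are hypothetical
    // names; the point is that readers hand in a `Buffer` to be filled,
    // writers hand in a `Stream` to be drained, and both pass a byte offset
    // into the backing object. Both calls report the number of bytes moved.
    //
    //     let read_bytes = cache.read(&mut user_buffer, file_offset).await?;
    //     let written_bytes = cache.write(&mut user_stream, file_offset).await?;
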
    pub async fn fsync(&self) -> KResult<()> {
        let pages = self.pages.lock().await;
        for (page_id, page) in pages.iter() {
            if page.is_dirty() {
                self.backend
                    .upgrade()
                    .unwrap()
                    .write_page(&mut CachePageStream::new(*page), page_id << PAGE_SIZE_BITS)
                    .await?;
                page.clear_dirty();
            }
        }
        Ok(())
    }
    // This function is used for extending writes and truncation.
    pub async fn resize(&self, new_size: usize) -> KResult<()> {
        let mut pages = self.pages.lock().await;
        let old_size = self.backend.upgrade().unwrap().size();
        if new_size < old_size {
            let begin = new_size.align_down(PAGE_SIZE) >> PAGE_SIZE_BITS;
            let end = old_size.align_up(PAGE_SIZE) >> PAGE_SIZE_BITS;
            for page_id in begin..end {
                pages.remove(&page_id);
            }
        } else if new_size > old_size {
            let begin = old_size.align_down(PAGE_SIZE) >> PAGE_SIZE_BITS;
            let end = new_size.align_up(PAGE_SIZE) >> PAGE_SIZE_BITS;
            pages.remove(&begin);
            for page_id in begin..end {
                let mut new_page = CachePage::new_zeroed();
                if page_id != end - 1 {
                    new_page.set_valid_size(PAGE_SIZE);
                } else {
                    // The last page may be partially valid; when `new_size` is
                    // page-aligned, the whole page is valid.
                    new_page.set_valid_size(new_size - (page_id << PAGE_SIZE_BITS));
                }
                new_page.set_dirty();
                pages.insert(page_id, new_page);
            }
        }
        Ok(())
    }
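
    // Example (hedged sketch, assuming PAGE_SIZE is 4096): shrinking from
    // 10000 to 5000 bytes evicts the cached pages that overlap the dropped
    // range (page ids 1 and 2); growing from 5000 back to 10000 bytes replaces
    // the page containing the old end of file and every page after it (again
    // page ids 1 and 2) with zero-filled, dirty pages, the last of which is
    // only partially valid.
    //
    //     cache.resize(5000).await?;   // truncate
    //     cache.resize(10000).await?;  // extend
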
    pub async fn with_page<F, O>(&self, offset: usize, func: F) -> KResult<Option<O>>
    where
        F: FnOnce(&Page, &CachePage) -> O,
    {
        let offset_aligned = offset.align_down(PAGE_SIZE);
        let page_id = offset_aligned >> PAGE_SIZE_BITS;
        let size = self.backend.upgrade().unwrap().size();
        if offset_aligned > size {
            return Ok(None);
        }
        let mut pages = self.pages.lock().await;
        let raw_page_ptr = match pages.get(&page_id) {
            Some(CachePage(raw_page_ptr)) => *raw_page_ptr,
            None => {
                let mut new_page = CachePage::new();
                self.backend
                    .upgrade()
                    .unwrap()
                    .read_page(&mut new_page, offset_aligned)
                    .await?;
                pages.insert(page_id, new_page);
                new_page.0
            }
        };
        unsafe {
            let page = ManuallyDrop::new(Page::from_raw_unchecked(PFN::from(raw_page_ptr)));
            Ok(Some(func(&page, &CachePage(raw_page_ptr))))
        }
    }
}
pub struct CachePageStream {
    page: CachePage,
    cur: usize,
}

impl CachePageStream {
    pub fn new(page: CachePage) -> Self {
        Self { page, cur: 0 }
    }

    pub fn remaining(&self) -> usize {
        self.page.valid_size().saturating_sub(self.cur)
    }

    pub fn is_drained(&self) -> bool {
        self.cur >= self.page.valid_size()
    }
}

impl Stream for CachePageStream {
    fn poll_data<'a>(&mut self, buf: &'a mut [u8]) -> KResult<Option<&'a mut [u8]>> {
        if self.cur >= self.page.valid_size() {
            return Ok(None);
        }
        let page_data = &self.page.all()[self.cur..self.page.valid_size()];
        let to_read = buf.len().min(page_data.len());
        buf[..to_read].copy_from_slice(&page_data[..to_read]);
        self.cur += to_read;
        Ok(Some(&mut buf[..to_read]))
    }

    fn ignore(&mut self, len: usize) -> KResult<Option<usize>> {
        if self.cur >= self.page.valid_size() {
            return Ok(None);
        }
        let to_ignore = len.min(self.page.valid_size() - self.cur);
        self.cur += to_ignore;
        Ok(Some(to_ignore))
    }
}
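
// Example (hedged sketch): draining a cached page through the `Stream`
// interface, the way `fsync` hands a dirty page to the backend. Only the
// first `valid_size()` bytes are ever produced; `device` is a hypothetical
// sink used here for illustration.
//
//     let mut stream = CachePageStream::new(page);
//     let mut chunk = [0u8; 512];
//     while let Some(filled) = stream.poll_data(&mut chunk)? {
//         device.write_all(filled)?;
//     }
//     assert!(stream.is_drained());
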
// With this trait, "page cache" and "block cache" are unified:
// - for a filesystem, `offset` is the file offset (floor-aligned to PAGE_SIZE);
// - for a block device, `offset` is the block index (floor-aligned to PAGE_SIZE / BLK_SIZE).
// Note: this can still create unnecessary cache entries.
pub trait PageCacheBackendOps: Sized {
    fn read_page(
        &self,
        page: &mut CachePage,
        offset: usize,
    ) -> impl Future<Output = KResult<usize>> + Send;

    fn write_page(
        &self,
        page: &mut CachePageStream,
        offset: usize,
    ) -> impl Future<Output = KResult<usize>> + Send;

    fn size(&self) -> usize;
}
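
// Example (hedged sketch): what a backend is expected to provide. `BlockFile`
// and its helper methods are hypothetical; the contract is that `read_page`
// fills the given `CachePage` (e.g. through its `Buffer` impl) for the page
// starting at `offset`, `write_page` drains the `CachePageStream` back to
// storage, and `size` reports the current length of the backing object in
// bytes. The blanket impl below then exposes such a type as a
// `dyn PageCacheBackend`.
//
//     impl PageCacheBackendOps for BlockFile {
//         async fn read_page(&self, page: &mut CachePage, offset: usize) -> KResult<usize> {
//             self.read_at(offset, page).await
//         }
//
//         async fn write_page(&self, page: &mut CachePageStream, offset: usize) -> KResult<usize> {
//             self.write_at(offset, page).await
//         }
//
//         fn size(&self) -> usize {
//             self.len()
//         }
//     }
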
#[async_trait]
pub trait PageCacheBackend: Send + Sync {
    async fn read_page(&self, page: &mut CachePage, offset: usize) -> KResult<usize>;
    async fn write_page(&self, page: &mut CachePageStream, offset: usize) -> KResult<usize>;
    fn size(&self) -> usize;
}
#[async_trait]
impl<T> PageCacheBackend for T
where
    T: PageCacheBackendOps + Send + Sync + 'static,
{
    // Delegate with fully qualified calls so they resolve to the
    // `PageCacheBackendOps` methods rather than back to this impl.
    async fn read_page(&self, page: &mut CachePage, offset: usize) -> KResult<usize> {
        PageCacheBackendOps::read_page(self, page, offset).await
    }

    async fn write_page(&self, page: &mut CachePageStream, offset: usize) -> KResult<usize> {
        PageCacheBackendOps::write_page(self, page, offset).await
    }

    fn size(&self) -> usize {
        PageCacheBackendOps::size(self)
    }
}
pub trait PageCacheRawPage: RawPage {
    fn valid_size(&self) -> &mut usize;
    fn is_dirty(&self) -> bool;
    fn set_dirty(&self);
    fn clear_dirty(&self);
    fn cache_init(&self);
}
impl Drop for PageCache {
    fn drop(&mut self) {
        // NOTE: `fsync` is an `async fn`, so this only constructs the future
        // and drops it unpolled; no writeback actually happens here.
        let _ = self.fsync();
    }
}