Переглянути джерело

feat: add Stream used for write operations

greatbridf 7 місяців тому
батько
коміт
485546e8d0

+ 8 - 4
src/driver/sbi_console.rs

@@ -11,12 +11,16 @@ impl ConsoleWrite for SbiConsole {
 }
 
 impl TerminalDevice for SbiConsole {
-    fn putchar(&self, ch: u8) {
-        eonix_hal::bootstrap::early_console_putchar(ch);
+    fn write(&self, data: &[u8]) {
+        for &ch in data {
+            eonix_hal::bootstrap::early_console_putchar(ch);
+        }
     }
 
-    fn putchar_direct(&self, ch: u8) {
-        eonix_hal::bootstrap::early_console_putchar(ch);
+    fn write_direct(&self, data: &[u8]) {
+        for &ch in data {
+            eonix_hal::bootstrap::early_console_putchar(ch);
+        }
     }
 }
 

+ 6 - 4
src/driver/serial.rs

@@ -200,14 +200,16 @@ impl Serial {
 }
 
 impl TerminalDevice for Serial {
-    fn putchar(&self, ch: u8) {
+    fn write(&self, data: &[u8]) {
         let mut tx_buffer = self.tx_buffer.lock();
-        tx_buffer.push_back(ch);
+        tx_buffer.extend(data.iter().copied());
         self.wakeup_worker();
     }
 
-    fn putchar_direct(&self, ch: u8) {
-        self.tx_rx.write(ch);
+    fn write_direct(&self, data: &[u8]) {
+        for &ch in data {
+            self.tx_rx.write(ch);
+        }
     }
 }
 

+ 22 - 11
src/fs/tmpfs.rs

@@ -1,3 +1,4 @@
+use crate::io::Stream;
 use crate::kernel::constants::{EINVAL, EIO, EISDIR};
 use crate::{
     io::Buffer,
@@ -274,32 +275,42 @@ impl Inode for FileInode {
         }
     }
 
-    fn write(&self, buffer: &[u8], offset: WriteOffset) -> KResult<usize> {
+    fn write(&self, stream: &mut dyn Stream, offset: WriteOffset) -> KResult<usize> {
         // TODO: We don't need that strong guarantee, find some way to avoid locks
         let lock = Task::block_on(self.rwsem.write());
         let filedata = self.filedata.access_mut(lock.prove_mut());
 
+        let mut store_new_end = None;
         let offset = match offset {
             WriteOffset::Position(offset) => offset,
-            // SAFETY: `lock` has done the synchronization
             WriteOffset::End(end) => {
-                let size = self.size.load(Ordering::Relaxed) as usize;
-                *end = size + buffer.len();
+                store_new_end = Some(end);
 
-                size
+                // SAFETY: `lock` has done the synchronization
+                self.size.load(Ordering::Relaxed) as usize
             }
         };
 
-        if filedata.len() < offset + buffer.len() {
-            filedata.resize(offset + buffer.len(), 0);
+        let mut pos = offset;
+        loop {
+            if pos >= filedata.len() {
+                filedata.resize(pos + 4096, 0);
+            }
+
+            match stream.poll_data(&mut filedata[pos..])? {
+                Some(data) => pos += data.len(),
+                None => break,
+            }
         }
 
-        filedata[offset..offset + buffer.len()].copy_from_slice(&buffer);
+        filedata.resize(pos, 0);
+        if let Some(store_end) = store_new_end {
+            *store_end = pos;
+        }
 
         // SAFETY: `lock` has done the synchronization
-        self.size.store(filedata.len() as u64, Ordering::Relaxed);
-
-        Ok(buffer.len())
+        self.size.store(pos as u64, Ordering::Relaxed);
+        Ok(pos - offset)
     }
 
     fn truncate(&self, length: usize) -> KResult<()> {

+ 96 - 0
src/io.rs

@@ -48,6 +48,53 @@ pub trait Buffer {
     }
 }
 
+pub trait Stream {
+    fn poll_data<'a>(&mut self, buf: &'a mut [u8]) -> KResult<Option<&'a mut [u8]>>;
+    fn ignore(&mut self, len: usize) -> KResult<Option<usize>>;
+}
+
+pub trait IntoStream {
+    type Stream: Stream;
+
+    fn into_stream(self) -> Self::Stream;
+}
+
+pub trait StreamRead {
+    fn read_till_end(
+        &mut self,
+        buffer: &mut [u8],
+        func: impl Fn(&mut [u8]) -> KResult<()>,
+    ) -> KResult<usize>;
+
+    fn ignore_all(&mut self) -> KResult<usize>;
+}
+
+impl<T> StreamRead for T
+where
+    T: Stream + ?Sized,
+{
+    fn read_till_end(
+        &mut self,
+        buffer: &mut [u8],
+        func: impl Fn(&mut [u8]) -> KResult<()>,
+    ) -> KResult<usize> {
+        let mut total = 0;
+        while let Some(data) = self.poll_data(buffer)? {
+            func(data)?;
+            total += data.len();
+        }
+        Ok(total)
+    }
+
+    fn ignore_all(&mut self) -> KResult<usize> {
+        let mut total = 0;
+        while let Some(len) = self.ignore(usize::MAX)? {
+            total += len;
+        }
+        Ok(total)
+    }
+}
+
 pub trait BufferFill<T: Copy> {
     fn copy(&mut self, object: &T) -> KResult<FillResult>;
 }
@@ -221,3 +268,52 @@ impl Iterator for Chunks {
         Some((start, len))
     }
 }
+
+pub struct ByteStream<'a> {
+    data: &'a [u8],
+    cur: usize,
+}
+
+impl<'a> ByteStream<'a> {
+    pub fn new(data: &'a [u8]) -> Self {
+        Self { data, cur: 0 }
+    }
+}
+
+impl<'a> Stream for ByteStream<'a> {
+    fn poll_data<'b>(&mut self, buf: &'b mut [u8]) -> KResult<Option<&'b mut [u8]>> {
+        if self.cur >= self.data.len() {
+            return Ok(None);
+        }
+
+        let end = core::cmp::min(self.data.len(), self.cur + buf.len());
+
+        let data = &self.data[self.cur..end];
+        let buf = &mut buf[..data.len()];
+
+        buf.copy_from_slice(data);
+        self.cur += data.len();
+
+        Ok(Some(buf))
+    }
+
+    fn ignore(&mut self, len: usize) -> KResult<Option<usize>> {
+        if self.cur >= self.data.len() {
+            return Ok(None);
+        }
+
+        let remaining = self.data.len() - self.cur;
+        let ignored = core::cmp::min(remaining, len);
+        self.cur += ignored;
+
+        Ok(Some(ignored))
+    }
+}
+
+impl<'a> IntoStream for &'a [u8] {
+    type Stream = ByteStream<'a>;
+
+    fn into_stream(self) -> Self::Stream {
+        ByteStream::new(self)
+    }
+}

+ 20 - 19
src/kernel/chardev.rs

@@ -9,7 +9,10 @@ use super::{
         DevId,
     },
 };
-use crate::{io::Buffer, prelude::*};
+use crate::{
+    io::{Buffer, Stream, StreamRead},
+    prelude::*,
+};
 use alloc::{
     boxed::Box,
     collections::btree_map::{BTreeMap, Entry},
@@ -20,7 +23,7 @@ use eonix_sync::AsProof as _;
 
 pub trait VirtualCharDevice: Send + Sync {
     fn read(&self, buffer: &mut dyn Buffer) -> KResult<usize>;
-    fn write(&self, data: &[u8]) -> KResult<usize>;
+    fn write(&self, stream: &mut dyn Stream) -> KResult<usize>;
 }
 
 pub enum CharDeviceType {
@@ -44,15 +47,13 @@ impl CharDevice {
         }
     }
 
-    pub fn write(&self, data: &[u8]) -> KResult<usize> {
+    pub fn write(&self, stream: &mut dyn Stream) -> KResult<usize> {
         match &self.device {
-            CharDeviceType::Virtual(device) => device.write(data),
-            CharDeviceType::Terminal(terminal) => {
-                for &ch in data.iter() {
-                    terminal.show_char(ch);
-                }
-                Ok(data.len())
-            }
+            CharDeviceType::Virtual(device) => device.write(stream),
+            CharDeviceType::Terminal(terminal) => stream.read_till_end(&mut [0; 128], |data| {
+                terminal.write(data);
+                Ok(())
+            }),
         }
     }
 
@@ -99,8 +100,8 @@ impl VirtualCharDevice for NullDevice {
         Ok(0)
     }
 
-    fn write(&self, _data: &[u8]) -> KResult<usize> {
-        Ok(_data.len())
+    fn write(&self, stream: &mut dyn Stream) -> KResult<usize> {
+        stream.ignore_all()
     }
 }
 
@@ -112,8 +113,8 @@ impl VirtualCharDevice for ZeroDevice {
         Ok(buffer.wrote())
     }
 
-    fn write(&self, _data: &[u8]) -> KResult<usize> {
-        Ok(_data.len())
+    fn write(&self, stream: &mut dyn Stream) -> KResult<usize> {
+        stream.ignore_all()
     }
 }
 
@@ -124,12 +125,12 @@ impl VirtualCharDevice for ConsoleDevice {
         Task::block_on(console_terminal.read(buffer))
     }
 
-    fn write(&self, data: &[u8]) -> KResult<usize> {
+    fn write(&self, stream: &mut dyn Stream) -> KResult<usize> {
         let console_terminal = get_console().ok_or(EIO)?;
-        for &ch in data.iter() {
-            console_terminal.show_char(ch);
-        }
-        Ok(data.len())
+        stream.read_till_end(&mut [0; 128], |data| {
+            console_terminal.write(data);
+            Ok(())
+        })
     }
 }
 

+ 23 - 25
src/kernel/syscall/file_rw.rs

@@ -1,4 +1,5 @@
 use super::FromSyscallArg;
+use crate::io::IntoStream;
 use crate::kernel::constants::{
     EBADF, EFAULT, EINVAL, ENOENT, ENOTDIR, SEEK_CUR, SEEK_END, SEEK_SET, S_IFBLK, S_IFCHR,
 };
@@ -73,9 +74,10 @@ fn read(fd: FD, buffer: *mut u8, bufsize: usize) -> KResult<usize> {
 
 #[eonix_macros::define_syscall(SYS_WRITE)]
 fn write(fd: FD, buffer: *const u8, count: usize) -> KResult<usize> {
-    let data = unsafe { core::slice::from_raw_parts(buffer, count) };
+    let buffer = CheckedUserPointer::new(buffer, count)?;
+    let mut stream = buffer.into_stream();
 
-    Task::block_on(thread.files.get(fd).ok_or(EBADF)?.write(data))
+    Task::block_on(thread.files.get(fd).ok_or(EBADF)?.write(&mut stream))
 }
 
 #[eonix_macros::define_syscall(SYS_OPENAT)]
@@ -295,7 +297,7 @@ struct IoVec32 {
 fn readv(fd: FD, iov_user: *const IoVec32, iovcnt: u32) -> KResult<usize> {
     let file = thread.files.get(fd).ok_or(EBADF)?;
 
-    let mut iov_user = UserPointer::new(iov_user as *mut IoVec32)?;
+    let mut iov_user = UserPointer::new(iov_user)?;
     let iov_buffers = (0..iovcnt)
         .map(|_| {
             let iov_result = iov_user.read()?;
@@ -324,35 +326,31 @@ fn readv(fd: FD, iov_user: *const IoVec32, iovcnt: u32) -> KResult<usize> {
 }
 
 #[eonix_macros::define_syscall(SYS_WRITEV)]
-fn writev(fd: FD, iov_user: *const u8, iovcnt: u32) -> KResult<usize> {
+fn writev(fd: FD, iov_user: *const IoVec32, iovcnt: u32) -> KResult<usize> {
     let file = thread.files.get(fd).ok_or(EBADF)?;
 
-    // TODO: Rewrite this with `UserPointer`.
-    let iov_user =
-        CheckedUserPointer::new(iov_user, iovcnt as usize * core::mem::size_of::<IoVec32>())?;
-    let mut iov_user_copied: Vec<IoVec32> = vec![];
-    iov_user_copied.resize(iovcnt as usize, IoVec32::default());
-
-    iov_user.read(
-        iov_user_copied.as_mut_ptr() as *mut (),
-        iov_user_copied.len() * core::mem::size_of::<IoVec32>(),
-    )?;
-
-    let iov_blocks = iov_user_copied
-        .into_iter()
-        .filter(|iov| iov.len != 0)
-        .map(|iov| CheckedUserPointer::new(iov.base as *mut u8, iov.len as usize))
+    let mut iov_user = UserPointer::new(iov_user)?;
+    let iov_streams = (0..iovcnt)
+        .map(|_| {
+            let iov_result = iov_user.read()?;
+            iov_user = iov_user.offset(1)?;
+            Ok(iov_result)
+        })
+        .filter_map(|iov_result| match iov_result {
+            Err(err) => Some(Err(err)),
+            Ok(IoVec32 { len: 0, .. }) => None,
+            Ok(IoVec32 { base, len }) => Some(
+                CheckedUserPointer::new(base as *mut u8, len as usize).map(|ptr| ptr.into_stream()),
+            ),
+        })
         .collect::<KResult<Vec<_>>>()?;
 
     let mut tot = 0usize;
-    for block in iov_blocks.into_iter() {
-        // TODO!!!: atomic `writev`
-        // TODO!!!!!: copy from user
-        let slice = block.as_slice();
-        let nread = Task::block_on(file.write(slice))?;
+    for mut stream in iov_streams.into_iter() {
+        let nread = Task::block_on(file.write(&mut stream))?;
         tot += nread;
 
-        if nread == 0 || nread != slice.len() {
+        if nread == 0 || !stream.is_drained() {
             break;
         }
     }

+ 12 - 22
src/kernel/terminal.rs

@@ -345,9 +345,8 @@ impl Termios {
 }
 
 pub trait TerminalDevice: Send + Sync {
-    fn putchar(&self, ch: u8);
-
-    fn putchar_direct(&self, ch: u8);
+    fn write_direct(&self, data: &[u8]);
+    fn write(&self, data: &[u8]);
 }
 
 struct TerminalInner {
@@ -414,9 +413,8 @@ impl Terminal {
         inner.buffer.clear();
     }
 
-    // TODO: Buffer terminal writes.
-    pub fn show_char(&self, ch: u8) {
-        self.device.putchar(ch)
+    pub fn write(&self, data: &[u8]) {
+        self.device.write(data);
     }
 
     fn erase(&self, inner: &mut TerminalInner, echo: bool) -> Option<u8> {
@@ -432,9 +430,7 @@ impl Terminal {
         let back = inner.buffer.pop_back();
 
         if echo && inner.termio.echo() && inner.termio.echoe() {
-            self.show_char(CTRL!('H')); // Backspace
-            self.show_char(b' '); // Space
-            self.show_char(CTRL!('H')); // Backspace
+            self.write(&[CTRL!('H'), b' ', CTRL!('H')]); // Backspace, Space, Backspace
         }
 
         return back;
@@ -442,15 +438,11 @@ impl Terminal {
 
     fn echo_char(&self, inner: &mut TerminalInner, ch: u8) {
         match ch {
-            b'\t' | b'\n' | CTRL!('Q') | CTRL!('S') => self.show_char(ch),
-            c if c >= 32 => self.show_char(ch),
-            _ if !inner.termio.echo() => self.show_char(ch),
-            _ if !inner.termio.echoctl() => self.show_char(ch),
-            _ if !inner.termio.iexten() => self.show_char(ch),
-            _ => {
-                self.show_char(b'^');
-                self.show_char(ch + 0x40);
-            }
+            b'\t' | b'\n' | CTRL!('Q') | CTRL!('S') | 32.. => self.write(&[ch]),
+            _ if !inner.termio.echo() => self.write(&[ch]),
+            _ if !inner.termio.echoctl() => self.write(&[ch]),
+            _ if !inner.termio.iexten() => self.write(&[ch]),
+            _ => self.write(&[b'^', ch + 0x40]),
         }
     }
 
@@ -513,7 +505,7 @@ impl Terminal {
                 ch if ch == inner.termio.vkill() => {
                     if inner.termio.echok() {
                         while self.erase(&mut inner, false).is_some() {}
-                        self.show_char(b'\n');
+                        self.write(&[b'\n']);
                     } else if inner.termio.echoke() && inner.termio.iexten() {
                         while self.erase(&mut inner, true).is_some() {}
                     }
@@ -679,8 +671,6 @@ impl Terminal {
 
 impl ConsoleWrite for Terminal {
     fn write(&self, s: &str) {
-        for &ch in s.as_bytes() {
-            self.device.putchar_direct(ch);
-        }
+        self.device.write_direct(s.as_bytes());
     }
 }

+ 81 - 22
src/kernel/user/dataflow.rs

@@ -1,5 +1,8 @@
-use crate::kernel::constants::{EFAULT, EINVAL};
-use core::{arch::asm, ffi::CStr};
+use crate::{
+    io::{IntoStream, Stream},
+    kernel::constants::{EFAULT, EINVAL},
+};
+use core::{arch::asm, ffi::CStr, marker::PhantomData};
 use eonix_preempt::assert_preempt_enabled;
 
 use crate::{
@@ -7,36 +10,40 @@ use crate::{
     prelude::*,
 };
 
-pub struct CheckedUserPointer {
+pub struct CheckedUserPointer<'a> {
     ptr: *const u8,
     len: usize,
+    _phantom: PhantomData<&'a ()>,
 }
 
-pub struct UserBuffer<'lt> {
-    ptr: CheckedUserPointer,
+pub struct UserBuffer<'a> {
+    ptr: CheckedUserPointer<'a>,
     size: usize,
     cur: usize,
-    _phantom: core::marker::PhantomData<&'lt ()>,
 }
 
-pub struct UserString<'lt> {
-    ptr: CheckedUserPointer,
+pub struct UserString<'a> {
+    ptr: CheckedUserPointer<'a>,
     len: usize,
-    _phantom: core::marker::PhantomData<&'lt ()>,
 }
 
 pub struct UserPointer<'a, T: Copy, const CONST: bool> {
-    pointer: CheckedUserPointer,
-    _phantom: core::marker::PhantomData<&'a T>,
+    pointer: CheckedUserPointer<'a>,
+    _phantom: PhantomData<T>,
+}
+
+pub struct UserStream<'a> {
+    pointer: CheckedUserPointer<'a>,
+    cur: usize,
 }
 
-impl<'a, T: Copy, const CONST: bool> UserPointer<'a, T, CONST> {
+impl<T: Copy, const CONST: bool> UserPointer<'_, T, CONST> {
     pub fn new(ptr: *const T) -> KResult<Self> {
         let pointer = CheckedUserPointer::new(ptr as *const u8, core::mem::size_of::<T>())?;
 
         Ok(Self {
             pointer,
-            _phantom: core::marker::PhantomData,
+            _phantom: PhantomData,
         })
     }
 
@@ -65,14 +72,18 @@ impl<'a, T: Copy> UserPointer<'a, T, false> {
     }
 }
 
-impl CheckedUserPointer {
+impl CheckedUserPointer<'_> {
     pub fn new(ptr: *const u8, len: usize) -> KResult<Self> {
         const USER_MAX_ADDR: usize = 0x7ff_fff_fff_fff;
         let end = (ptr as usize).checked_add(len);
         if ptr.is_null() || end.ok_or(EFAULT)? > USER_MAX_ADDR {
             Err(EFAULT)
         } else {
-            Ok(Self { ptr, len })
+            Ok(Self {
+                ptr,
+                len,
+                _phantom: PhantomData,
+            })
         }
     }
 
@@ -275,12 +286,7 @@ impl UserBuffer<'_> {
     pub fn new(ptr: *mut u8, size: usize) -> KResult<Self> {
         let ptr = CheckedUserPointer::new(ptr, size)?;
 
-        Ok(Self {
-            ptr,
-            size,
-            cur: 0,
-            _phantom: core::marker::PhantomData,
-        })
+        Ok(Self { ptr, size, cur: 0 })
     }
 
     fn remaining(&self) -> usize {
@@ -382,7 +388,6 @@ impl<'lt> UserString<'lt> {
             Ok(Self {
                 ptr,
                 len: MAX_LEN - result,
-                _phantom: core::marker::PhantomData,
             })
         }
     }
@@ -396,3 +401,57 @@ impl<'lt> UserString<'lt> {
         }
     }
 }
+
+impl UserStream<'_> {
+    pub fn len(&self) -> usize {
+        self.pointer.len
+    }
+
+    pub fn remaining(&self) -> usize {
+        self.pointer.len - self.cur
+    }
+
+    pub fn is_drained(&self) -> bool {
+        self.cur >= self.pointer.len
+    }
+}
+
+impl Stream for UserStream<'_> {
+    fn poll_data<'a>(&mut self, buf: &'a mut [u8]) -> KResult<Option<&'a mut [u8]>> {
+        assert_preempt_enabled!("UserStream::poll_data");
+
+        if self.cur >= self.pointer.len {
+            return Ok(None);
+        }
+
+        let to_read = buf.len().min(self.pointer.len - self.cur);
+
+        self.pointer.read(buf.as_mut_ptr() as *mut (), to_read)?;
+
+        self.pointer.forward(to_read);
+        self.cur += to_read;
+        Ok(Some(&mut buf[..to_read]))
+    }
+
+    fn ignore(&mut self, len: usize) -> KResult<Option<usize>> {
+        if self.cur >= self.pointer.len {
+            return Ok(None);
+        }
+        let to_ignore = len.min(self.pointer.len - self.cur);
+
+        self.pointer.forward(to_ignore);
+        self.cur += to_ignore;
+        Ok(Some(to_ignore))
+    }
+}
+
+impl<'a> IntoStream for CheckedUserPointer<'a> {
+    type Stream = UserStream<'a>;
+
+    fn into_stream(self) -> Self::Stream {
+        UserStream {
+            pointer: self,
+            cur: 0,
+        }
+    }
+}

+ 7 - 4
src/kernel/vfs/dentry.rs

@@ -4,7 +4,6 @@ use super::{
     inode::{Ino, Inode, Mode, WriteOffset},
     s_isblk, s_ischr, s_isdir, s_isreg, DevId, FsContext,
 };
-use crate::kernel::constants::{EEXIST, EINVAL, EISDIR, ELOOP, ENOENT, ENOTDIR, EPERM, ERANGE};
 use crate::{
     hash::KernelHasher,
     io::{Buffer, ByteBuffer},
@@ -13,6 +12,10 @@ use crate::{
     prelude::*,
     rcu::{RCUNode, RCUPointer},
 };
+use crate::{
+    io::Stream,
+    kernel::constants::{EEXIST, EINVAL, EISDIR, ELOOP, ENOENT, ENOTDIR, EPERM, ERANGE},
+};
 use alloc::sync::Arc;
 use core::{
     fmt,
@@ -384,14 +387,14 @@ impl Dentry {
         }
     }
 
-    pub fn write(&self, buffer: &[u8], offset: WriteOffset) -> KResult<usize> {
+    pub fn write(&self, stream: &mut dyn Stream, offset: WriteOffset) -> KResult<usize> {
         let inode = self.get_inode()?;
         // Safety: Changing mode alone will have no effect on the file's contents
         match inode.mode.load(Ordering::Relaxed) {
             mode if s_isdir(mode) => Err(EISDIR),
-            mode if s_isreg(mode) => inode.write(buffer, offset),
+            mode if s_isreg(mode) => inode.write(stream, offset),
             mode if s_isblk(mode) => Err(EINVAL), // TODO
-            mode if s_ischr(mode) => CharDevice::get(inode.devid()?).ok_or(EPERM)?.write(buffer),
+            mode if s_ischr(mode) => CharDevice::get(inode.devid()?).ok_or(EPERM)?.write(stream),
             _ => Err(EINVAL),
         }
     }

+ 29 - 64
src/kernel/vfs/file.rs

@@ -3,11 +3,8 @@ use super::{
     inode::{Mode, WriteOffset},
     s_isblk, s_isdir, s_isreg,
 };
-use crate::kernel::constants::{
-    EBADF, EFAULT, EINTR, EINVAL, ENOTDIR, ENOTTY, EOVERFLOW, EPIPE, ESPIPE, S_IFMT,
-};
 use crate::{
-    io::{Buffer, BufferFill, ByteBuffer, Chunks},
+    io::{Buffer, BufferFill, ByteBuffer, Chunks, IntoStream},
     kernel::{
         constants::{TCGETS, TCSETS, TIOCGPGRP, TIOCGWINSZ, TIOCSPGRP},
         mem::{paging::Page, AsMemoryBlock as _},
@@ -19,6 +16,12 @@ use crate::{
     prelude::*,
     sync::CondVar,
 };
+use crate::{
+    io::{Stream, StreamRead},
+    kernel::constants::{
+        EBADF, EFAULT, EINTR, EINVAL, ENOTDIR, ENOTTY, EOVERFLOW, EPIPE, ESPIPE, S_IFMT,
+    },
+};
 use alloc::{collections::vec_deque::VecDeque, sync::Arc};
 use bitflags::bitflags;
 use core::{ops::ControlFlow, sync::atomic::Ordering};
@@ -193,54 +196,17 @@ impl Pipe {
         return Ok(data.len());
     }
 
-    async fn write_non_atomic(&self, data: &[u8]) -> KResult<usize> {
-        let mut inner = self.inner.lock().await;
-
-        if inner.read_closed {
-            send_sigpipe_to_current();
-            return Err(EPIPE);
-        }
-
-        let mut remaining = data;
-        while !remaining.is_empty() {
-            let space = inner.buffer.capacity() - inner.buffer.len();
-
-            if space != 0 {
-                let to_write = remaining.len().min(space);
-                inner.buffer.extend(&remaining[..to_write]);
-                remaining = &remaining[to_write..];
-
-                self.cv_read.notify_all();
-            }
-
-            if remaining.is_empty() {
+    async fn write(&self, stream: &mut dyn Stream) -> KResult<usize> {
+        let mut buffer = [0; Self::PIPE_SIZE];
+        let mut total = 0;
+        while let Some(data) = stream.poll_data(&mut buffer)? {
+            let nwrote = self.write_atomic(data).await?;
+            total += nwrote;
+            if nwrote != data.len() {
                 break;
             }
-
-            inner = self.cv_write.wait(inner).await;
-            if Thread::current().signal_list.has_pending_signal() {
-                if data.len() != remaining.len() {
-                    break;
-                }
-                return Err(EINTR);
-            }
-
-            if inner.read_closed {
-                send_sigpipe_to_current();
-                return Err(EPIPE);
-            }
-        }
-
-        Ok(data.len() - remaining.len())
-    }
-
-    async fn write(&self, data: &[u8]) -> KResult<usize> {
-        // Writes those are smaller than the pipe size are atomic.
-        if data.len() <= Self::PIPE_SIZE {
-            self.write_atomic(data).await
-        } else {
-            self.write_non_atomic(data).await
         }
+        Ok(total)
     }
 }
 
@@ -311,7 +277,7 @@ impl InodeFile {
         Ok(new_cursor)
     }
 
-    fn write(&self, buffer: &[u8]) -> KResult<usize> {
+    fn write(&self, stream: &mut dyn Stream) -> KResult<usize> {
         if !self.write {
             return Err(EBADF);
         }
@@ -320,11 +286,11 @@ impl InodeFile {
 
         // TODO!!!: use `UserBuffer`
         if self.append {
-            let nwrote = self.dentry.write(buffer, WriteOffset::End(&mut cursor))?;
+            let nwrote = self.dentry.write(stream, WriteOffset::End(&mut cursor))?;
 
             Ok(nwrote)
         } else {
-            let nwrote = self.dentry.write(buffer, WriteOffset::Position(*cursor))?;
+            let nwrote = self.dentry.write(stream, WriteOffset::Position(*cursor))?;
 
             *cursor += nwrote;
             Ok(nwrote)
@@ -413,12 +379,11 @@ impl TerminalFile {
         self.terminal.read(buffer).await
     }
 
-    fn write(&self, buffer: &[u8]) -> KResult<usize> {
-        for &ch in buffer.iter() {
-            self.terminal.show_char(ch);
-        }
-
-        Ok(buffer.len())
+    fn write(&self, stream: &mut dyn Stream) -> KResult<usize> {
+        stream.read_till_end(&mut [0; 128], |data| {
+            self.terminal.write(data);
+            Ok(())
+        })
     }
 
     async fn poll(&self, event: PollEvent) -> KResult<PollEvent> {
@@ -467,12 +432,12 @@ impl File {
     //     }
     // }
 
-    pub async fn write(&self, buffer: &[u8]) -> KResult<usize> {
+    pub async fn write(&self, stream: &mut dyn Stream) -> KResult<usize> {
         match self {
-            File::Inode(inode) => inode.write(buffer),
-            File::PipeWrite(pipe) => pipe.pipe.write(buffer).await,
-            File::TTY(tty) => tty.write(buffer),
-            File::CharDev(device) => device.write(buffer),
+            File::Inode(inode) => inode.write(stream),
+            File::PipeWrite(pipe) => pipe.pipe.write(stream).await,
+            File::TTY(tty) => tty.write(stream),
+            File::CharDev(device) => device.write(stream),
             _ => Err(EBADF),
         }
     }
@@ -518,7 +483,7 @@ impl File {
                 break;
             }
 
-            let nwrote = dest_file.write(&buffer[..nread]).await?;
+            let nwrote = dest_file.write(&mut buffer[..nread].into_stream()).await?;
             nsent += nwrote;
 
             if nwrote != len {

+ 2 - 1
src/kernel/vfs/inode.rs

@@ -1,4 +1,5 @@
 use super::{dentry::Dentry, s_isblk, s_ischr, vfs::Vfs, DevId, TimeSpec};
+use crate::io::Stream;
 use crate::kernel::constants::{
     EINVAL, EISDIR, ENOTDIR, EPERM, STATX_ATIME, STATX_BLOCKS, STATX_CTIME, STATX_GID, STATX_INO,
     STATX_MODE, STATX_MTIME, STATX_NLINK, STATX_SIZE, STATX_TYPE, STATX_UID, S_IFDIR, S_IFMT,
@@ -115,7 +116,7 @@ pub trait Inode: Send + Sync + InodeInner {
         Err(if self.is_dir() { EISDIR } else { EINVAL })
     }
 
-    fn write(&self, buffer: &[u8], offset: WriteOffset) -> KResult<usize> {
+    fn write(&self, stream: &mut dyn Stream, offset: WriteOffset) -> KResult<usize> {
         Err(if self.is_dir() { EISDIR } else { EINVAL })
     }