@zephyr

Rust: implement TCP zero-copy in an asynchronous context

Update: See this library.

Update: Tokio >= 1.9.0 provides a low-level API, try_io, which makes it more efficient to perform custom IO operations and to clear readiness on raw fds.


I’m working on a multi-protocol network relay (written in Rust) these days. I have searched a lot for zero-copy solutions, but there seems to be no ready-made one for async Rust.

Zero-copy matters a lot for a network relay, where the CPU spends nearly all of its time copying the data being transferred. It can cut unnecessary CPU copies and context switches between user space and kernel space, bringing higher throughput and lower latency.

However, Tokio (a popular async framework) does not provide any zero-copy API. There are also a few issues about this on Tokio’s GitHub, saying it is not possible to provide such APIs with the current API set. In our case, however, the problem can be simplified, and it is possible to implement zero-copy while still using Tokio.

Consider an async task that just proxies data between two TCP connections, with no need to share buffers across tasks. If we create the buffer (a pipe) within the task, we can make sure the pipe stays valid for the whole async operation. When the task finishes or is cancelled, the pipe is dropped as usual (of course, we need to implement the Drop trait for it to avoid leaking resources).

```rust
use std::io;

pub struct Pipe(i32, i32);

impl Pipe {
    pub fn create() -> io::Result<Self> {
        use libc::{c_int, O_NONBLOCK};
        let mut pipes = std::mem::MaybeUninit::<[c_int; 2]>::uninit();
        unsafe {
            if libc::pipe2(pipes.as_mut_ptr() as *mut c_int, O_NONBLOCK) < 0 {
                // report the errno set by pipe2
                return Err(io::Error::last_os_error());
            }
            let [r, w] = pipes.assume_init();
            Ok(Pipe(r, w))
        }
    }
}

impl Drop for Pipe {
    fn drop(&mut self) {
        // close both ends so the pipe is not leaked when the task ends or is cancelled
        unsafe {
            libc::close(self.0);
            libc::close(self.1);
        }
    }
}
```

Moreover, we can count the bytes remaining in the pipe and use that count to limit how many bytes each splice call may read or write. This ensures that the pipe side never blocks, so we no longer need to poll the pipe and wait for its readable/writable events.

```rust
pub fn splice_n(r: i32, w: i32, n: usize) -> isize {
    use libc::{loff_t, SPLICE_F_MOVE, SPLICE_F_NONBLOCK};
    unsafe {
        libc::splice(
            r,
            std::ptr::null_mut::<loff_t>(),
            w,
            std::ptr::null_mut::<loff_t>(),
            n,
            SPLICE_F_MOVE | SPLICE_F_NONBLOCK,
        )
    }
}
```
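The bookkeeping behind this can be sketched as a tiny counter. PipeBudget is a hypothetical helper, not part of the post’s code (which keeps the same count in a plain n variable), and its capacity is an assumption that has to match the real pipe size (64 KiB by default on Linux; fcntl with F_GETPIPE_SZ reports the actual value). Under that assumption, a splice into the pipe never asks for more than the free space and a splice out never asks for more than what is buffered.

```rust
/// Hypothetical helper (not from the post): tracks the bytes sitting in the
/// pipe so every splice call can be capped by what the pipe can accept or give.
struct PipeBudget {
    capacity: usize, // assumed to match the real pipe capacity
    in_pipe: usize,  // spliced in from the source, not yet spliced out
}

impl PipeBudget {
    fn new(capacity: usize) -> Self {
        PipeBudget { capacity, in_pipe: 0 }
    }

    /// Upper bound for a socket -> pipe splice: the free space in the pipe.
    fn fill_limit(&self) -> usize {
        self.capacity - self.in_pipe
    }

    /// Upper bound for a pipe -> socket splice: what is currently buffered.
    fn drain_limit(&self) -> usize {
        self.in_pipe
    }

    /// Record a successful socket -> pipe splice of `n` bytes.
    fn filled(&mut self, n: usize) {
        assert!(n <= self.fill_limit(), "spliced in more than the free space");
        self.in_pipe += n;
    }

    /// Record a successful pipe -> socket splice of `n` bytes.
    fn drained(&mut self, n: usize) {
        assert!(n <= self.drain_limit(), "spliced out more than was buffered");
        self.in_pipe -= n;
    }
}
```

In zero_copy this corresponds to PIPE_BUF_SIZE - n as the fill limit and n as the drain limit.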

Then how do we perform IO operations with splice? It is quite different from normal reads and writes, which require implementing AsyncRead or AsyncWrite and using the extension methods provided by AsyncReadExt or AsyncWriteExt (which handle the polling state automatically). Moreover, Tokio’s AsyncRead and AsyncWrite traits work with in-memory byte buffers, whereas zero-copy needs a pipe.

There is no need to create a new type representing a TCP socket, such as ZeroTcpStream. We can keep using TcpStream and rely on its registration with Tokio’s event loop. Instead of calling read/write, we get its underlying fd via as_raw_fd() (TcpStream implements AsRawFd) and call libc::splice on the fd directly.

```rust
// read
splice_n(tcp_read_half.as_ref().as_raw_fd(), pipe_write_half, n);
// write
splice_n(pipe_read_half, tcp_write_half.as_ref().as_raw_fd(), n);
```
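To make the fd-level approach concrete outside Tokio, here is a blocking, std-only sketch for Linux that relays bytes between two TcpStreams through a pipe with the same as_raw_fd() plus splice pattern. To stay self-contained it declares pipe, close and splice itself instead of using the libc crate; Pipe, splice_n and forward here are illustrative names (this Pipe is a simplified blocking variant), and SPLICE_F_MOVE is the value from linux/splice.h.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::os::raw::{c_int, c_uint};
use std::os::unix::io::AsRawFd;
use std::ptr;

// Declared against the C library that std already links on Linux,
// so this sketch needs no `libc` crate. Linux-only.
unsafe extern "C" {
    fn pipe(fds: *mut c_int) -> c_int;
    fn close(fd: c_int) -> c_int;
    fn splice(fd_in: c_int, off_in: *mut i64, fd_out: c_int, off_out: *mut i64,
              len: usize, flags: c_uint) -> isize;
}

const SPLICE_F_MOVE: c_uint = 1; // value from <linux/splice.h>

/// Owns both ends of a (blocking) pipe and closes them on drop.
struct Pipe { r: c_int, w: c_int }

impl Pipe {
    fn new() -> io::Result<Pipe> {
        let mut fds: [c_int; 2] = [0; 2];
        if unsafe { pipe(fds.as_mut_ptr()) } < 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(Pipe { r: fds[0], w: fds[1] })
    }
}

impl Drop for Pipe {
    fn drop(&mut self) {
        unsafe { close(self.r); close(self.w); }
    }
}

/// splice(2) wrapper that reports failures as io::Error instead of raw errno.
fn splice_n(from: c_int, to: c_int, n: usize) -> io::Result<usize> {
    let ret = unsafe { splice(from, ptr::null_mut(), to, ptr::null_mut(), n, SPLICE_F_MOVE) };
    if ret < 0 { Err(io::Error::last_os_error()) } else { Ok(ret as usize) }
}

/// Move exactly `len` bytes from `src` to `dst` through a pipe; the payload
/// never passes through a user-space buffer.
fn forward(src: &TcpStream, dst: &TcpStream, len: usize) -> io::Result<()> {
    let p = Pipe::new()?;
    let mut left = len;
    while left > 0 {
        // socket -> pipe (blocks until some data arrives)
        let got = splice_n(src.as_raw_fd(), p.w, left)?;
        if got == 0 {
            return Err(io::ErrorKind::UnexpectedEof.into());
        }
        // pipe -> socket, until everything spliced in has been spliced out
        let mut in_pipe = got;
        while in_pipe > 0 {
            let sent = splice_n(p.r, dst.as_raw_fd(), in_pipe)?;
            if sent == 0 {
                return Err(io::ErrorKind::WriteZero.into());
            }
            in_pipe -= sent;
        }
        left -= got;
    }
    Ok(())
}
```

Because both sockets are blocking here, each splice simply waits. The async version instead gives up on EAGAIN and lets Tokio wait for readiness, which is where the readiness problem comes from.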

Remember that calling splice directly does not clear the readiness Tokio tracks for the raw fds, which would lead to a busy loop, so we have to clear it manually. This is somewhat tricky, because Tokio does not provide a public API that explicitly clears readiness (there are some pub(crate) ones).

Looking into Tokio’s source code, I found some public APIs that implicitly clear readiness under specific conditions. The simplest is to call the non-blocking try_read/try_write, or to await read/write with an empty buffer. Before doing this we must make sure the kernel has returned EWOULDBLOCK or EAGAIN, otherwise the data would be corrupted.

```rust
pub fn is_wouldblock() -> bool {
    use libc::{EAGAIN, EWOULDBLOCK};
    let errno = unsafe { *libc::__errno_location() };
    errno == EWOULDBLOCK || errno == EAGAIN
}
```
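The empty-buffer trick appears to work because, on Linux, a zero-length read on a non-blocking TCP socket still goes through the kernel’s receive path: it fails with EAGAIN when the receive queue is empty and returns 0 when data (or EOF) is pending, without consuming anything. Tokio clears its cached readiness when a read reports WouldBlock, so an empty read doubles as a safe probe. The std-only sketch below makes this observable without Tokio; probe_readable is a hypothetical name, and the behaviour is specific to Linux TCP sockets.

```rust
use std::io::{self, Read};
use std::net::TcpStream;

/// Hypothetical helper: asks the kernel whether `sock` has data (or EOF)
/// pending by reading into an empty buffer, so nothing is consumed.
/// `sock` must be non-blocking; on a blocking socket the call would wait.
fn probe_readable(sock: &TcpStream) -> io::Result<bool> {
    let mut s = sock; // `&TcpStream` implements `Read`
    let mut empty = [0u8; 0];
    match s.read(&mut empty) {
        Ok(_) => Ok(true), // data or EOF pending
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(false), // EAGAIN / EWOULDBLOCK
        Err(e) => Err(e),
    }
}
```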

Below is the main function (generic so that it supports both TCP and Unix sockets):

```rust
use std::io;
use std::os::unix::io::AsRawFd;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

// Assumed value: the default pipe capacity on Linux (16 pages = 64 KiB).
const PIPE_BUF_SIZE: usize = 65536;

pub async fn zero_copy<X, Y, R, W>(mut r: R, mut w: W) -> io::Result<()>
where
    X: AsRawFd,
    Y: AsRawFd,
    R: AsyncRead + AsRef<X> + Unpin,
    W: AsyncWrite + AsRef<Y> + Unpin,
{
    // create pipe (closed by Drop when the task ends or is cancelled)
    let pipe = Pipe::create()?;
    let (rpipe, wpipe) = (pipe.0, pipe.1);
    // get raw fd
    let rfd = r.as_ref().as_raw_fd();
    let wfd = w.as_ref().as_raw_fd();
    // bytes currently buffered in the pipe
    let mut n: usize = 0;
    let mut done = false;

    loop {
        // read until the socket buffer is empty
        // or the pipe is filled
        // clear readiness (EPOLLIN)
        r.read(&mut [0u8; 0]).await?;
        while n < PIPE_BUF_SIZE {
            match splice_n(rfd, wpipe, PIPE_BUF_SIZE - n) {
                x if x > 0 => n += x as usize,
                0 => {
                    // EOF from the reader
                    done = true;
                    break;
                }
                x if x < 0 && is_wouldblock() => break,
                // report the error instead of silently returning Ok
                _ => return Err(io::Error::last_os_error()),
            }
        }
        // write until the pipe is empty
        while n > 0 {
            // clear readiness (EPOLLOUT)
            w.write(&[0u8; 0]).await?;
            match splice_n(rpipe, wfd, n) {
                x if x > 0 => n -= x as usize,
                x if x < 0 && is_wouldblock() => {}
                x if x < 0 => return Err(io::Error::last_os_error()),
                _ => return Err(io::ErrorKind::WriteZero.into()),
            }
        }
        // complete
        if done {
            break;
        }
    }

    w.shutdown().await?;
    Ok(())
}
```
Author:@zephyr
Link:https://zephyr.moe/2021/07/23/rust-zero-copy/
License:CC BY-NC-SA 3.0 CN