Go 与异步 IO - io_uring 的思考
Golang 中并发 IO 的现状
对于 Go 这种本身便是为并发而生的语言来说,使用 这种系统级异步接口也不是那么的迫切
比如对于普通文件的读写以及 socket 的操作都会通过 来进行优化,当文件/套接字可读可写时 netpoll 便会唤醒相应 goroutine 来对文件进行读写
而对于可能会阻塞的系统调用,在 syscall 底层调用 时配合 runtime 判断是否将P 和 G 绑定的 M 解绑,然后将 P 交给其他 M 来使用,通过这种机制可以减少系统调用从用户态切换到内核态对整个程序带来的损耗
Go runtime 实际上已经实现了用户态的并发 IO,现在 Linux 内核提供了新的异步 IO 接口,那又该如何去利用这种新的技术呢
我们首先先看一下当前 Go 是如何做到异步 IO 的
IO 与 netpoll
文件 IO 与 netpoll
// src/os/file.go
func OpenFile(name string, flag int, perm FileMode) (*File, error) {
f, err := openFileNolog(name, flag, perm)
}
// src/os/file_unix.go
func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
// ...
r, e = syscall.Open(name, flag|syscall.O_CLOEXEC, syscallMode(perm))
// ...
return newFile(uintptr(r), name, kindOpenFile), nil
}
// src/os/file_unix.go
func newFile(fd uintptr, name string, kind newFileKind) *File {
fdi := int(fd)
f := &File{&file{
pfd: poll.FD{
Sysfd: fdi,
IsStream: true,
ZeroReadIsEOF: true,
},
name: name,
stdoutOrErr: fdi == 1 || fdi == 2,
}}
pollable := kind == kindOpenFile || kind == kindPipe || kind == kindNonBlock
// ...
if err := f.pfd.Init("file", pollable); err != nil {
// ...
} else if pollable {
if err := syscall.SetNonblock(fdi, true); err == nil {
f.nonblock = true
}
return f
}
}
从 到 ,可以看到文件的 被放到 进行初始化了, 便是将注册到 中
需要注意当文件被注册到 后,会将它置为非阻塞模式(),因为 采用的是边缘触发模式
比如说非阻塞文件描述符中有可读事件时, 只会通知一次(除非有新的数据被写入文件会再次通知),也就说需要所有数据读出来直到返回 ,对于阻塞模式的socket文件,当从socket中读取数据时就可能会阻塞等待,这样也就失去了 epoll 的意义
我们可以再看一下 是如何利用 进行读取的
// src/internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
// ...
for {
n, err := ignoringEINTR(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
可以看到 中调用 读取文件,如果出现 ,那么就调用 来等待数据可读
// src/internal/poll/fd_unix.go
type FD struct {
// ...
Sysfd int
pd pollDesc
}
Go 对 的抽象
// src/internal/poll/fd_poll_runtime.go
func runtime_pollServerInit()
func runtime_pollOpen(fd uintptr)(uintptr, int)
func runtime_pollClose(ctx uintptr)
// ...
type pollDesc struct {
runtimeCtx uintptr
}
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
// ...
pd.runtimeCtx = ctx
return nil
}
这些函数才是真正的 netpoll,而这些函数是在
// src/runtime/netpoll.go
// go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
// ...
}
根据具体的平台来实现 poller,对于 Linux,便是使用
// src/runtime/netpoll_epoll.go
// 注册文件到 netpoll 中
func netpolllopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
// ...
return -epollctr(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
添加新的文件描述符时,可以发现是以 的方式注册到 中
socket IO 与 netpoll
从 这个名字上就可以看出, 是 Go 为了高性能的异步网络而实现的
看一下创建 TCPListener socket 的流程
// src/net/tcpsock.go
type TCPListener struct {
fd *netFD
// ...
}
// src/net/fd_posix.go
type netFD struct {
pfd poll.FD
// ...
}
// 1.
// src/net/tcpsock.go
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
sl := &sysListener{network: network, address: laddr.String()}
ln, err := sl.listenTCP(context.Background(), laddr)
// ...
}
// 2.
// src/net/tcpsock_posix.go
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
// ...
return &TCPListener{fd: fd, lc, sl.ListenConfig}, nil
}
// 3.
// src/net/ipsock_posix.go
func internelSocket(ctx context.Context, ...) (fd *netFD, err error) {
// ...
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, radddr, ctrlFn)
}
// 4.
// src/sock_posix.go
func socket(...) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto)
// ...
fd, err = newFD(s, family, sotype, net)
}
// 5.
// src/fd_unix.go
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
// ...
},
// ...
}
return ret, nil
}
创建 TCPListener 链路还是挺长的,不过在第四步 函数中可以看到调用 来返回 实例,而 便是 , 而对 的读写和文件IO一样便都会通过 来利用 。
netpoll 唤醒 goroutine
挂起 goroutine
通过 将文件描述符加入到 后,当对文件描述符进行读写时,如果 返回 的话就需要调用 来等待可读可写
// src/internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitRead(isFile bool) error{
return pd.wait('r', isFile)
}
func (pd *pollDesc) waitWrite(isFile bool) error{
return pd.wait('w', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error{
// ...
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
// src/runtime/netpoll.go
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...
for !netpollblock(pd, int32(mode), false) {
// ...
}
// ...
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
// ...
// 状态检查
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// ...
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
// ...
}
等待文件可读写,最终会调用 函数,并不会直接调用 epoll wait 的系统调用,而是挂起当前 goroutine, 并等待唤醒
唤醒 goroutine
// src/runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
// ...
var waitms int32
// 计算 waitms,大概规则:
// delay < 0, waitms = -1,阻塞等待
// delay == 0, waitms = 0, 不阻塞
// delay > 0, delay 以纳秒为单位作为 waitms
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
// ...
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
// ...
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
netpollready(&toRun, pd, mode)
}
}
return toRun
}
会调用 来获取epoll事件,而在 runtime 中很多地方都会调用 函数
监控函数
// src/runtime/proc.go
func sysmon() {
// ....
for {
// ...
list := netpoll(0)
if !list.empty() {
//...
injectglist(&list) // 将 goroutine 放到 runable 队列中
}
}
}
查找可运行的 goroutine
// src/runtime/proc.go
func findrunable() (gp *g, inheritTIme bool) {
top:
// ...
if list := netpoll(0); !list.empty() {
gp := list.pop()
injectglist(&list)
// ...
return gp, false
}
// ....
stop:
// ...
list := netpoll(delta) // block until new work is available
// ...
}
GC 时调用
// src/runtime/proc.go
func startTheWorld() {
systemstack(func() {startTheWorldWithSema(false)})
// ...
}
func startTheWorldWithSema(emitTraceEvent bool) int64 {
// ...
list := netpoll(0)
injectglist(&list)
// ...
}
通常获取可用的 goroutine 时都可能有机会去调用 ,然后再调用 将可运行 goroutine 加入到队列中
系统级异步接口 —— io_uring
Linux kernel 5.1 新增了异步接口 ,它是 Jens Axboe 以高效,可扩展,易用为目的设计的一种全新异步接口,为什么是全新呢,因为 Linux 已经提供了异步 IO 接口 —— AIO,不过就连 Linus 都对它一阵吐槽
So I think this is ridiculously ugly.
AIO is a horrible ad-hoc design, with the main excuse being "other,
less gifted people, made that design, and we are implementing it for
compatibility because database people - who seldom have any shred of
taste - actually use it".
But AIO was always really really ugly.
io_uring 提供的异步接口,不仅仅可以使用 文件 IO,套接字 IO,甚至未来可以扩展加入其它系统调用
而且 采用应用程序和内核共享内存的方式,来提交请求和获取完成事件
使用共享内存的方式可能是内核对接口优化的一种趋势
io_uring 名称的意思便是
SubmissionQueueEntry
我们来看一下 io_uring 用来提交请求的结构
struct io_uring_sqe {
__u8 opcode; /* 请求的操作类型 */
__u8 flags; /* IOSQE_ flags */
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* 用于 IO 的文件描述符 */
union {
__u64 off; /* offset into file */
__u64 addr2;
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
/*
* 用于特定操作的字段
*/
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
};
__u64 user_data; /* 用来关联请求的完成事件 */
union {
struct {
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
__s32 splice_fd_in;
};
__u64 __pad2[3];
};
};
一个非常复杂的结构,最核心的三个字段 ,,
指定具体是什么操作,比如 ,,,现在支持 35 种操作
表示用于 IO 操作的文件描述符
当请求操作完成后,io_uring 会生成一个完成事件放到 中,而 便是用来和中的事件进行绑定的,他会原封不动的复制到中
字段用来实现链式请求之类的功能
可以发现 是uint8,也就是现在来看最多支持 256 个系统调用,但是整个 还为未来预留了一些空间
通过 结合结合其他 的字段,便实现了扩展性极强的 接口
CompletionQueueEvent
的结构就比较简单了,主要是表示提交的异步操作执行的结果
struct io_uring_cqe {
__u64 user_data; /* 直接复制 sqe->data */
__s32 res; /* 异步操作的结果 */
__u32 flags;
};
便是从 直接复制过来了,可以通过 绑定到对应的
便是异步操作执行的结果,如果 res < 0,通常说明操作执行错误
暂时没有使用
io_uring_setup
io_uring 使用共享内存的方式,来提交请求和获取执行结果,减少内存拷贝带来的损耗
接口会接受一个指定提交队列大小的 类型参数和一个 对象
#include
int io_uring_setup(u32 entries, struct io_uring_params *p);
调用成功会返回一个,用于后续的操作
io_uring_params
struct io_uring_params {
__u32 sq_entries;
__u32 cq_entries;
__u32 flags;
__u32 sq_thread_cpu;
__u32 sq_thread_idle;
__u32 features;
__u32 wq_fd;
__u32 resv[3];
struct io_sqring_offsets sq_off;
struct io_cqring_offsets cq_off;
};
不只用来配置 实例,内核也会填充 中关于 实例的信息,比如用来映射共享内存的请求队列和完成队列字段的偏移量 - 和
配置 io_uring
以位掩码的方式,结合相应 , ,, 字段来配置 io_uring 实例
/*
* io_uring_setup() flags
*/
#define IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */
#define IORING_SETUP_SQPOLL (1U << 1) /* SQ poll thread */
#define IORING_SETUP_SQ_AFF (1U << 2) /* sq_thread_cpu is valid */
#define IORING_SETUP_CQSIZE (1U << 3) /* app defines CQ size */
#define IORING_SETUP_CLAMP (1U << 4) /* clamp SQ/CQ ring sizes */
#define IORING_SETUP_ATTACH_WQ (1U << 5) /* attach to existing wq */
#define IORING_SETUP_R_DISABLED (1U << 6) /* start with ring disabled */
通常 cq_entries 为 sq_entries 的两倍,通过 指定 ,然后设置 字段为指定大小
cq_entries 不能小于 sq_entries
提供了初始化 io_uring 对象时的配置函数,可以看一下这些函数的具体实现
type IOURingOption func(*IOURing)
func New(entries uint, opts ...IOURingOption) (iour *IOURing, err error)
func WithParams(params *iouring_syscall.IOURingParams) IOURingOption
func WithAsync() IOURingOption
func WithDisableRing() IOURingOption
func WithCQSize(size uint32) IOURingOption
func WithSQPoll() IOURingOption
func WithSQPollThreadCPU(cpu uint32) IOURingOption
func WithSQPollThreadIdle(idle time.Duration) IOURingOption
内核填充信息
内核会向 填充跟 io_uring 实例相关的信息
请求队列的大小, 会传递请求队列的大小 ,io_uring 会根据 设置 为 2 的次方大小
完成队列的大小,通常为 的两倍,即使通过 flag 设置了 ,内核依然会以 2 的次方重新计算出 的大小
记录了当前内核版本支持的一些功能
/*
* io_uring_params->features flags
*/
#define IORING_FEAT_SINGLE_MMAP (1U << 0)
#define IORING_FEAT_NODROP (1U << 1)
#define IORING_FEAT_SUBMIT_STABLE (1U << 2)
#define IORING_FEAT_RW_CUR_POS (1U << 3)
#define IORING_FEAT_CUR_PERSONALITY (1U << 4)
#define IORING_FEAT_FAST_POLL (1U << 5)
#define IORING_FEAT_POLL_32BITS (1U << 6)
#define IORING_FEAT_SQPOLL_NONFIXED (1U << 7)
和 便是和在共享内存中的偏移量
struct io_sqring_offsets {
__u32 head;
__u32 tail;
__u32 ring_mask;
__u32 ring_entries;
__u32 flags;
__u32 dropped;
__u32 array;
__u32 resv1;
__u64 resv2;
};
struct io_cqring_offsets {
__u32 head;
__u32 tail;
__u32 ring_mask;
__u32 ring_entries;
__u32 overflow;
__u32 cqes;
__u32 flags;
__u32 resv1;
__u64 resv2;
};
根据这些偏移量便可以调用 mmap 来映射 SQ 和 CQ
ptr = mmap(0, sq_off.array + sq_entries * sizeof(__u32),
PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE,
ring_fd, IORING_OFF_SQ_RING);
// iouring-go/iouring.go
func New(entries uint, opts ...IOURingOption) (*IOURing, error) {
iour := &IOURing{
params: &iouring_syscall.IOURingParams{},
userDatas: make(map[uint64]*UserData),
cqeSign: make(chan struct{}, 1),
closer: make(chan struct{}),
closed: make(chan struct{}),
}
for _, opt := range opts {
opt(iour)
}
var err error
iour.fd, err = iouring_syscall.IOURingSetup(entries, iour.params)
if err != nil {
return nil, err
}
if err := mmapIOURing(iour); err != nil {
munmapIOURing(iour)
return nil, err
}
// ...
}
中实现了对请求队列以及完成队列的内存映射
// iouring-go/mmap.go
func mmapIOURing(iour *IOURing) (err error) {
defer func() {
if err != nil {
munmapIOURing(iour)
}
}()
iour.sq = new(SubmissionQueue)
iour.cq = new(CompletionQueue)
if err = mmapSQ(iour); err != nil {
return err
}
if (iour.params.Features & iouring_syscall.IORING_FEAT_SINGLE_MMAP) != 0 {
iour.cq.ptr = iour.sq.ptr
}
if err = mmapCQ(iour); err != nil {
return err
}
if err = mmapSQEs(iour); err != nil {
return err
}
return nil
}
io_uring 的功能
这里简单介绍一下 提供的一些功能,以及在 Go 中如何去使用
顺序执行
设置 的 标记,这样只有当该 之前所有的 都完成后,才会执行该 ,而后续的 也会在该 完成后才会执行
iour := iouring.New(8, WithDrain())
针对单一请求设置 ,保证请求会在之前所有的请求都完成才会执行,而后续的请求也都会在该请求完成之后才会开始执行
request, err := iour.SubmitRequest(iouring.Read(fd, buf).WithDrain(), nil)
链式请求
提供了一组请求的链式/顺序执行的方法,可以让链中的请求会在上一个请求执行完成后才会执行,而且不影响链外其他请求的并发执行
设置 的 标记后,下一个 和当前 自动组成新链或者当前 的链中,链中没有设置 的 便是链尾
如果链中的有请求执行失败了,那么链中后续的 都会被取消( 为 )
还提供了以外一种设置链式请求的方式,设置 为 flag,这种方式会让链中的请求忽略之前请求的结果,也就是说即使链中之前的请求执行失败了,也不会取消链中后边的请求
preps := []iouring.PrepRequest{ iouring.Read(fd1, buf), iouring.Write(fd2, buf) }
requests, err := iour.SubmitLinkRequest(preps, nil)
请求取消
当请求提交后,还可以提交取消请求的请求,这样如果请求还没有执行或者请求的操作可以被中断(比如 socket IO),那么就可以被异步的取消,而对于已经启动的磁盘IO请求则无法取消
在 中,提交请求后会返回一个 对象,通过 方法就可以取消请求
request, err := iour.SubmitRequest(iouring.Timeout(1 * time.Second), nil)
cancelRequest, err := request.Cancel()
方法会返回一个 cancelRequest 对象,表示提交的取消请求
可以监听 的执行是否失败,并且失败原因是否为
<- request.Done()
if err := request.Err(); if err != nil {
if err == iouring.ErrRequestCanceled {
fmt.Println("request is canceled")
}
}
也可去监听 cancelRequest 的执行结果,如果 方法返回 ,便是可能成功取消了,注意是可能取消了,因为一些操作是无法被取消的
<- cancelRequest.Done()
if err := cancelRequest.Err(); if err != nil{
if err == iouring.ErrRequestNotFound(){
fmt.Println("canceled request is not found")
}
// do something
}
定时和请求完成计数
提供了 请求,可以用来提交超时请求
超时请求可以分为三种:
相对时间超时
绝对时间超时
对请求完成计数,到达指定的完成事件数量后,超时请求就会完成
now := time.Now()
request, err := iouring.SubmitRequest(iouring.Timeout(2 * time.Second), nil)
if err != nil {
panic(err)
}
<- request.Done()
fmt.Println(time.Now().Sub(now))
根据 提供的超时请求,可以实现系统级的异步定时器
请求超时
通过 将一个请求和 请求链接在一起,那么就可以做到请求的超时控制
preps := iouring.Read(fd, buf).WithTimeout()
注册文件
的一些 IO 操作需要提供文件描述符,而频繁的将文件和内核之间进行映射也会导致一定的性能损耗,所以可以使用 的 接口来提前注册文件描述符
也提供了文件描述符的注册功能,而且对于已经注册的文件描述符会自动使用
func (iour *IOURing) RegisterFile(file *os.File) error
func (iour *IOURing) RegisterFiles(files []*os.File) error
func (iour *IOURing) UnregisterFile(file *os.File) error
func (iour *IOURing) UnregisterFiles(files []*os.File) error
当 的文件描述符被关闭后,这些注册的文件会自动注销
注册文件描述符在 Go 中带来的并发问题
type fileRegister struct {
lock sync.Mutex
iouringFd int
fds []int32
sparseindexs map[int]int
registered bool
indexs sync.Map
}
需要注意由于存在对索引 的并发读写,所以使用 ,也就意味着,使用注册文件描述符,会带来一定的并发问题,经过简单的测试, 带来的性能损耗导致注册文件描述符带来的优势并没有那么大的
在 Go 中使用 的最大问题便是对 实例的竞争问题,而通过 Go 暴露给外部使用的并发机制,并不能让 带来的异步 IO 发挥最大的性能
将 融入 runtime 中,才是最终的解决方案
注册缓冲区
和注册文件描述符类似, 为了减少 IO 请求中缓冲区的映射,同样可以使用 来注册缓冲区
如果要在请求中使用缓冲区的话,需要使用 或者 请求
内核侧请求队列轮询
将请求放到的环形缓冲区后,需要调用 来通知内核有请求需要处理
io_uring 为了进一步减少系统调用,可以在 是设置 的 flags,内核就会创建一个轮询请求队列的线程
可以通过 命令查看用来轮询的内核线程
ps --ppid 2 | grep io_uring-sq
需要注意在 5.10 之前的版本,需要使用特权用户来执行,而 5.10 以后只需 权限即可
并且 5.10 之前,SQPoll 需要配合注册的文件描述符一起使用,而 5.10 以后则不需要,可以通过查看内核填充的 是否设置了
// iouring-go/iouring.go
func (iour *IOURing) doRequest(sqe *iouring_syscall.SubmissionQueueEntry, request PrepRequest, ch chan<- Result) (*UserData, error) {
// ...
if sqe.Fd() >= 0 {
if index, ok := iour.fileRegister.GetFileIndex(int32(sqe.Fd())); ok {
sqe.SetFdIndex(int32(index))
} else if iour.Flags&iouring_syscall.IORING_SETUP_SQPOLL != 0 &&
iour.Features&iouring_syscall.IORING_FEAT_SQPOLL_NONFIXED == 0 {
return nil, ErrUnregisteredFile
}
}
// ...
}
iouring-go 同样提供了开启 SQPoll 的 以及设置与 SQPoll 内核线程的相关配置 和
iour, err := iouring.New(8, iouring.WithSQPoll())
但是在 Go 简单的设置 并不能正常的工作,可能是由于 Go 的 GMP 模型导致的一些问题。暂时还在思考解决方案
注册 eventfd,利用 epoll
通过 可以将 注册到 io_uring 实例中,然后将 加入到 中,如果当 io_uring 中有完成事件时,便会通知
type IOURing struct {
eventfd int
cqeSign chan struct{}
// ...
}
func New() (*IOURing, error) {
// ....
if err := iour.registerEventfd(); err != nil {
return nil, err
}
if err := registerIOURing(iour); err != nil {
return nil, err
}
// ...
}
func (iour *IOURing) registerEventfd() error {
eventfd, err := unix.Eventfd(0, unix.EFD_NONBLOCK|unix.FD_CLOEXEC)
if err != nil {
return os.NewSyscallError("eventfd", err)
}
iour.eventfd = eventfd
return iouring_syscall.IOURingRegister(
iour.fd,
iouring_syscall.IOURING_REGISTER_EVENTFD,
unsafe.Pointer(&iour.eventfd), 1,
)
}
func registerIOURing(iour *IOURing) error {
if err := initpoller(); err != nil {
return err
}
if err := unix.EpollCtl(poller.fd, unix.EPOLL_CTL_ADD, iour.eventfd,
&unix.EpollEvent{Fd: int32(iour.eventfd), Events: unix.EPOLLIN | unix.EPOLLET},
); err != nil {
return os.NewSyscallError("epoll_ctl_add", err)
}
poller.Lock()
poller.iours[iour.eventfd] = iour
poller.Unlock()
return nil
}
会调用 EpollWait 等待中有完成事件,并通知相应的 IOURing 对象
// iouring-go/iouring.go
func (iour *IOURing) getCQEvent(wait bool) (cqe *iouring_syscall.CompletionQueueEvent, err error) {
var tryPeeks int
for {
if cqe = iour.cq.peek(); cqe != nil {
iour.cq.advance(1)
return
}
if !wait && !iour.sq.cqOverflow() {
err = syscall.EAGAIN
return
}
if iour.sq.cqOverflow() {
_, err = iouring_syscall.IOURingEnter(iour.fd, 0, 0, iouring_syscall.IORING_ENTER_FLAGS_GETEVENTS, nil)
if err != nil {
return
}
continue
}
if tryPeeks++; tryPeeks < 3 {
runtime.Gosched()
continue
}
select {
case <-iour.cqeSign:
case <-iour.closer:
return nil, ErrIOURingClosed
}
}
}
// iouring-go/poller.go
func (poller *iourPoller) run() {
for {
n, err := unix.EpollWait(poller.fd, poller.events, -1)
if err != nil {
continue
}
for i := 0; i < n; i++ {
fd := int(poller.events[i].Fd)
poller.Lock()
iour, ok := poller.iours[fd]
poller.Unlock()
if !ok {
continue
}
select {
case iour.cqeSign <- struct{}{}:
default:
}
}
poller.adjust()
}
}
保证数据不丢失
默认情况下, CQ 环的大小是 SQ 环的 两倍,为什么 SQ 环的大小会小于 CQ 环,是因为 SQ 环中的 sqe 一旦被内核发现,便会被内核消耗掉,也就意味着 sqe 的生命周期很短,而请求的完成事件都会放到 CQ 环中
我们也可以通过 或者 的 Option 里设置 CQ 环的大小
但是依然会存在 CQ 环溢出的情况,而内核会在内部存储溢出的时间,直到 CQ 环有空间容纳更多事件。
可以通过 io_uring_params->features 是否设置 IORING_FEAT_NODROP 来判断当前内核是否支持该功能
如果 CQ 环溢出,那么提交请求时可能会以 错误失败,需要重新提交
并且当 CQ 环中数据被消耗后,需要调用 来通知内核 CQ 环中有空余空间
func (iour *IOURing) getCQEvent(wait bool) (cqe *iouring_syscall.CompletionQueueEvent, err error) {
// ...
if iour.sq.cqOverflow() {
_, err := iour.syscall.IOURingEnter(iour.fd, 0, 0, iouring_syscall.IORING_ENTER_FLAGS_GETEVENTS, nil)
if err != nil{
return
}
continue
}
// ...
}
io_uring 与 Go —— iouring-go
竞争问题
对于 CQ 环的竞争是使用单一的 CQ 环消费 goroutine 来完成 的消费
func New(entries int, opts ...IOURingOption) (iour *IOURing, err error) {
iour := &IOURing{...}
// ...
go iour.run()
return
}
func (iour *IOURing) run() {
for {
cqe, err := iour.getCQEvent(true)
// ...
}
}
SQ 环的解决方案有两种
第一种方式听起来使用 channel 更优雅一些,但是 channel 内部依然使用锁的方式以及额外的内存复制
另外最大的弊端就是将 的(
当多个 向 channel 发送的请求的顺序无法保证,这样链式请求就无法实现(除非对于链式请求再次加锁)
第二种方式,采用加锁的方式,保证了同一时间只有一个提交函数在处理 SQ 环,并且可以立即是否真正提交成功(
真正去解决这个问题的方式,估计可能只有 runtime 才能给出答案,为每一个 P 创建一个 io_uring 实例在 runtime 内部解决竞争问题,内部使用 eventfd 注册到 中来获取通知
io_uring 与 channel
对于 设计比较好的地方,我感觉便是对 channel 的利用,异步 IO 加上 channel,可以将异步在并发的程序中发挥出最大的作用
func (iour *IOURing) SubmitRequest(request PrepRequest, ch chan<- Result) (Request, error)
方法接收一个 channel,当请求完成后,会将结果发送到 channel 中,这样通过多个请求复用同一个 channel,程序便可以监听一组请求的完成情况
func (iour *IOURing) run() {
for {
cqe, err := iour.getCQEvent(true)
// ...
userData := iour.userData[cqe.UserData]
// ...
userData.request.complate(cqe)
if userData.resulter != nil {
userData.resulter <- userData.request
}
}
}
而 方法同样会返回一个 Request 接口对象,通过 Request 我们同样可以去查看请求的是否完成已经它的完成结果
type Request interface {
Result
Cancel() (Request, error)
Done() <-chan struct{}
GetRes() (int, error)
// Can Only be used in ResultResolver
SetResult(r0, r1 interface{}, err error) error
}
type Result interface {
Fd() int
Opcode() uint8
GetRequestBuffer() (b0, b1 []byte)
GetRequestBuffers() [][]byte
GetRequestInfo() interface{}
FreeRequestBuffer()
Err() error
ReturnValue0() interface{}
ReturnValue1() interface{}
ReturnFd() (int, error)
ReturnInt() (int, error)
}
利用 channel 便可以完成对异步 IO 的异步监听和同步监听
channel 带来的问题
当然使用 channel 又会带来其他的问题,比如 channel 满了以后,对 io_uring 完成队列的消费便会阻塞在向 channel 发送数据,阻塞时间过长也会导致 CQ 环溢出
比较好的解决方案是,在 channel 上抽象出一层 , 会对完成事件进行自动缓冲,当然这也会带来一定的代码复杂度,所以 便将 channel 阻塞的问题交给使用者,要求 channel 的消费端尽快消费掉数据
思考 io_uring 在 Go 中的发展
在 Linux 平台下使用了 ,而且 在使用上并没有竞争问题,当然如果要使用 来替代 来实现 的话并不是不可能,只是这样对于工作很好的 epoll 来说并没有什么必要,而且是否能够带来可观的性能收益也都是不确定的
在高并发的情况下,有限的 SQ 环和 CQ 环,对于请求数量大于完成事件的消费速度的情况,CQ 环的大量溢出带来对内核的压力以及新的请求提交带来的错误处理,都会提高真正利用 的难度
对于 SQ 环和 CQ 环的大小限制,也许需要通过 Pool 的方式来解决,初始化多个 io_uring 实例,当一个实例的 SQ 环满,那么就使用另外的实例来提交请求
而使用 Pool 又会增加一定的复杂度
的功能实际可以覆盖了 的,比如提交的阻塞 IO 请求便相当于 + ,另外 还提供了超时设置和请求的超时控制,相当于实现了系统级的定时器以及 的 deadline
但是 epoll 自身的优势,比如没有竞争问题,没有监听文件描述符的数量限制,都让 epoll 在实际的使用中更加好用,而这些问题对于 在本身设计上就会导致的问题
比如竞争问题,使用环形缓冲区可以协调应用和内核对请求队列的访问,但是应用中多个线程或者 goroutine 就会引发对环形缓冲区的竞争问题
而请求数量的限制,那么就需要考虑到请求完成事件的溢出问题,内核不能无限制的去保存溢出的完成事件,当然这个问题通过应用中在 实例上抽象出 的方式来解决
使用 来实现异步网络框架,对已有的网络模型会是非常大的冲击,怎么去使用 来发挥最大的能力依然处于探索阶段,毕竟 是一个出现才 1 年的技术
而对于普通的磁盘 IO 来说, 还是有很大的发挥空间的,利用 Go 中已有的并发机制,结合具体的性能评估,对于文件服务器来说,也许会带来极大的提升
另外一个问题便是,对于 5.1 引入,5.6 开始功能变得丰富成熟的 来说,现在大量的环境处于 3.X,4.X,甚至 2.X , 仍然需要等待时机才能去发挥它真正的作用,而这段时间便是留给我们去探讨怎么让 更好用