Thoughts on "A Million WebSockets and Go"
## Background
I once built a long-lived connection service on the WebSocket protocol. A single 4-core/4 GB machine held roughly 32,000 connections. Then I came across "A Million WebSockets and Go", in which the author sustains 3 million connections on one machine. I was astonished, because just creating the per-connection resources (reader, writer, write channel, read buffer, write buffer) should take at least 50–80 GB of memory. With that question in mind I read the whole article, and I'd like to share what I took away.
## Analysis
First, let me list the resources a single WebSocket connection needs:
1. Two goroutines, one for reading and one for writing, and possibly a channel feeding the writer.
code is here
```go
// Packet represents application level data.
type Packet struct {
    ...
}

// Channel wraps user connection.
type Channel struct {
    conn net.Conn    // WebSocket connection.
    send chan Packet // Outgoing packets queue.
}

func NewChannel(conn net.Conn) *Channel {
    c := &Channel{
        conn: conn,
        send: make(chan Packet, N),
    }
    go c.reader()
    go c.writer()
    return c
}
```
At roughly 2 KB of stack per goroutine, two goroutines for each of 3 million connections come to at least 12 GB.
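The ~2 KB figure can be sanity-checked with a small measurement sketch of my own (not from the article); `approxStackPerGoroutine` is a hypothetical helper that compares `runtime.MemStats.StackSys` before and after parking a batch of goroutines:

```go
package main

import (
	"fmt"
	"runtime"
)

// approxStackPerGoroutine spawns n parked goroutines and estimates how much
// stack memory each one costs by diffing the runtime's stack statistics.
func approxStackPerGoroutine(n int) uint64 {
	var before, after runtime.MemStats
	runtime.GC()
	runtime.ReadMemStats(&before)

	block := make(chan struct{})
	for i := 0; i < n; i++ {
		go func() { <-block }() // parked: the goroutine keeps only its stack
	}
	runtime.ReadMemStats(&after)
	close(block)

	return (after.StackSys - before.StackSys) / uint64(n)
}

func main() {
	perG := approxStackPerGoroutine(100_000)
	fmt.Printf("~%d bytes of stack per goroutine\n", perG)
	// At ~2 KB each, 3M connections x 2 goroutines is ~12 GB of stacks alone.
}
```

The exact number varies by Go version (newer runtimes size initial stacks adaptively), but it lands in the low-kilobyte range, which is what the 12 GB estimate assumes.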
2. The reader and writer goroutines each need a buffer object to hold the data being read or written.
code is here
```go
func (c *Channel) reader() {
    // We make a buffered read to reduce read syscalls.
    buf := bufio.NewReader(c.conn)
    for {
        pkt, _ := readPacket(buf)
        c.handle(pkt)
    }
}
```
At 4 KB per buffer object, one for each direction, 3 million connections need at least 24 GB.
3. The allocations made when upgrading the HTTP connection to a WebSocket, where `Hijack` peels the underlying `conn` off the HTTP server.
code is here
```go
// Upgrade upgrades the HTTP server connection to the WebSocket protocol.
//
// The responseHeader is included in the response to the client's upgrade
// request. Use the responseHeader to specify cookies (Set-Cookie) and the
// application negotiated subprotocol (Sec-Websocket-Protocol).
//
// If the upgrade fails, then Upgrade replies to the client with an HTTP error
// response.
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {
    /* other code */
    var (
        netConn net.Conn
        err     error
    )
    h, ok := w.(http.Hijacker)
    if !ok {
        return u.returnError(w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker")
    }
    var brw *bufio.ReadWriter
    netConn, brw, err = h.Hijack()
    if err != nil {
        return u.returnError(w, r, http.StatusInternalServerError, err.Error())
    }

    if brw.Reader.Buffered() > 0 {
        netConn.Close()
        return nil, errors.New("websocket: client sent data before handshake is complete")
    }

    c := newConnBRW(netConn, true, u.ReadBufferSize, u.WriteBufferSize, brw)
    /* other code */
    return c, nil
}
```
Again at 4 KB per object, that is at least another 24 GB for 3 million connections.
This is the baseline cost of simply holding the connections open, before the service does any real work.
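Putting the numbers together, a back-of-envelope sketch using the 2 KB and 4 KB assumptions above:

```go
package main

import "fmt"

func main() {
	const (
		conns      = 3_000_000
		stackPerG  = 2 * 1024 // assumed initial goroutine stack
		bufPerSide = 4 * 1024 // assumed bufio buffer size
	)
	goroutines := conns * 2 * stackPerG // reader + writer stacks: ~12 GB
	buffers := conns * 2 * bufPerSide   // read + write buffers: ~24 GB
	upgrade := conns * 2 * bufPerSide   // handshake-time bufio.ReadWriter: ~24 GB
	total := goroutines + buffers + upgrade
	fmt.Printf("%.1f GiB\n", float64(total)/(1<<30)) // 57.2 GiB
}
```

That lands squarely in the 50–80 GB ballpark mentioned in the background section.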
So what are the author's optimizations?
1. For points 1 and 2 above, the main idea is netpoll (epoll, kqueue): instead of parking a goroutine pair per connection, register the connection with a poller and react only when it actually becomes readable.
   code is here

   ```go
   ch := NewChannel(conn)

   // Make conn to be observed by netpoll instance.
   poller.Start(conn, netpoll.EventRead, func() {
       // We spawn goroutine here to prevent poller wait loop
       // to become locked during receiving packet from ch.
       go Receive(ch)
   })

   // Receive reads a packet from conn and handles it somehow.
   func (ch *Channel) Receive() {
       buf := bufio.NewReader(ch.conn)
       pkt := readPacket(buf)
       ch.handle(pkt)
   }
   ```
   But simply spawning a goroutine on demand has a problem of its own: if many connections send or receive at the same moment, the goroutine count spikes instantly.
   ```go
   package gopool

   type Pool struct {
       work chan func()
       sem  chan struct{}
   }

   func New(size int) *Pool {
       return &Pool{
           work: make(chan func()),
           sem:  make(chan struct{}, size),
       }
   }

   func (p *Pool) Schedule(task func()) {
       select {
       case p.work <- task:
       case p.sem <- struct{}{}:
           go p.worker(task)
       }
   }

   func (p *Pool) worker(task func()) {
       defer func() { <-p.sem }()
       for {
           task()
           task = <-p.work
       }
   }
   ```
   ```go
   pool := gopool.New(128)

   poller.Start(conn, netpoll.EventRead, func() {
       // We will block poller wait loop when
       // all pool workers are busy.
       pool.Schedule(func() {
           Receive(ch)
       })
   })
   ```
   A fixed-size goroutine pool lets goroutines be reused instead of created per event, and the pool size doubles as a throttle on the number of connections being served concurrently.
2. For point 3, reuse the buffers left over from the HTTP handshake during Upgrade.
   code is here

   ```go
   func newConnBRW(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, brw *bufio.ReadWriter) *Conn {
       /* some code */
       if readBufferSize == 0 && brw != nil && brw.Reader != nil {
           // Reuse the supplied bufio.Reader if the buffer has a useful size.
           // This code assumes that peek on a reader returns
           // bufio.Reader.buf[:0].
           brw.Reader.Reset(conn)
           if p, err := brw.Reader.Peek(0); err == nil && cap(p) >= 256 {
               br = brw.Reader
           }
       }
       /* some code */
       if writeBufferSize == 0 && brw != nil && brw.Writer != nil {
           // Use the bufio.Writer's buffer if the buffer has a useful size. This
           // code assumes that bufio.Writer.buf[:1] is passed to the
           // bufio.Writer's underlying writer.
           var wh writeHook
           brw.Writer.Reset(&wh)
           brw.Writer.WriteByte(0)
           brw.Flush()
           if cap(wh.p) >= maxFrameHeaderSize+256 {
               writeBuf = wh.p[:cap(wh.p)]
           }
       }
       return c
   }
   ```
   So in theory, only a single memory allocation is needed for the buffers.
## Summary
To recap the optimizations covered above:
- A reader goroutine with a buffer inside is expensive. **Solution**: netpoll (epoll, kqueue); reuse the buffers.
- A writer goroutine with a buffer inside is expensive. **Solution**: start the goroutine only when necessary; reuse the buffers.
- With a storm of connections, netpoll alone won't cope. **Solution**: reuse goroutines and limit their number.
- `net/http` is not the fastest way to handle an upgrade to WebSocket. **Solution**: use a zero-copy upgrade on the bare TCP connection.
## Further Reading
1. https://github.com/eranyanay/1m-go-websockets
2. https://github.com/golang/go/issues/15735