Thoughts on "A Million WebSockets and Go"

## Background

I once built a long-lived connection service on the WebSocket protocol. A single machine (4C4G) held roughly 32K connections. Then I came across "A Million WebSockets and Go", in which the author establishes 3 million connections on a single machine. I was astonished: just the per-connection resources (the read and write goroutines, the write channel, the read and write buffers, and so on) should need at least 50–80 GB of memory. With that question in mind I read the whole article, and I'd like to share what I learned.

## Analysis

First, let me list the resources a single WebSocket connection needs:

1. Two goroutines per connection, one for reading and one for writing, plus possibly a channel for outgoing writes:

   ```go
   // Packet represents application level data.
   type Packet struct {
       ...
   }

   // Channel wraps user connection.
   type Channel struct {
       conn net.Conn    // WebSocket connection.
       send chan Packet // Outgoing packets queue.
   }

   func NewChannel(conn net.Conn) *Channel {
       c := &Channel{
           conn: conn,
           send: make(chan Packet, N),
       }
       go c.reader()
       go c.writer()
       return c
   }
   ```

   Figuring 2 KB of stack per goroutine, 3 million connections need at least 12 GB.

2. The read and write goroutines each need a buffer object to hold the data they read and write:

   ```go
   func (c *Channel) reader() {
       // We make a buffered read to reduce read syscalls.
       buf := bufio.NewReader(c.conn)
       for {
           pkt, _ := readPacket(buf)
           c.handle(pkt)
       }
   }
   ```

   Figuring 4 KB per buffer object, 3 million connections need at least 24 GB.

3. The memory allocated when the HTTP connection is upgraded to WebSocket, where the hijack mechanism peels the raw conn out of the HTTP server:

   ```go
   // Upgrade upgrades the HTTP server connection to the WebSocket protocol.
   //
   // The responseHeader is included in the response to the client's upgrade
   // request. Use the responseHeader to specify cookies (Set-Cookie) and the
   // application negotiated subprotocol (Sec-Websocket-Protocol).
   //
   // If the upgrade fails, then Upgrade replies to the client with an HTTP error
   // response.
   func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {
       /*other code*/
       var (
           netConn net.Conn
           err     error
       )
       h, ok := w.(http.Hijacker)
       if !ok {
           return u.returnError(w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker")
       }
       var brw *bufio.ReadWriter
       netConn, brw, err = h.Hijack()
       if err != nil {
           return u.returnError(w, r, http.StatusInternalServerError, err.Error())
       }
       if brw.Reader.Buffered() > 0 {
           netConn.Close()
           return nil, errors.New("websocket: client sent data before handshake is complete")
       }
       c := newConnBRW(netConn, true, u.ReadBufferSize, u.WriteBufferSize, brw)
       /*other code*/
       return c, nil
   }
   ```

   Figuring 4 KB per object again, 3 million connections need at least 24 GB.

That is roughly what merely holding the connections open costs, before the service does anything else.
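As a sanity check on the 2 KB-per-goroutine figure above, here is a small self-contained sketch of mine (not from the article) that parks a batch of goroutines, the way idle connection readers are parked, and measures how much memory the runtime obtains from the OS:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// perGoroutineBytes spawns n parked goroutines (like idle connection
// readers blocked in conn.Read) and returns the approximate growth in
// memory obtained from the OS, divided by n.
func perGoroutineBytes(n int) uint64 {
	var before, after runtime.MemStats
	runtime.GC()
	runtime.ReadMemStats(&before)

	var ready sync.WaitGroup
	block := make(chan struct{})
	ready.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			ready.Done()
			<-block // park the goroutine indefinitely
		}()
	}
	ready.Wait()

	runtime.ReadMemStats(&after)
	close(block)
	return (after.Sys - before.Sys) / uint64(n)
}

func main() {
	// With 10k parked goroutines, the per-goroutine cost comes out in
	// the low kilobytes, consistent with Go's small starting stack.
	fmt.Printf("approx bytes per goroutine: %d\n", perGoroutineBytes(10000))
}
```

The exact number varies by Go version and platform, but it makes the "goroutines are cheap individually, expensive by the million" arithmetic concrete.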

So what is the author's optimization approach?

1. For points 1 and 2 above, the main idea is netpoll (epoll, kqueue): register the connection with a poller and run code only when an event arrives, instead of parking a goroutine on every read:

   ```go
   ch := NewChannel(conn)

   // Make conn to be observed by netpoll instance.
   poller.Start(conn, netpoll.EventRead, func() {
       // We spawn goroutine here to prevent poller wait loop
       // to become locked during receiving packet from ch.
       go ch.Receive()
   })

   // Receive reads a packet from conn and handles it somehow.
   func (ch *Channel) Receive() {
       buf := bufio.NewReader(ch.conn)
       pkt, _ := readPacket(buf)
       ch.handle(pkt)
   }
   ```
   But simply spawning a fresh goroutine on every event has its own problem: a burst of sends or receives across many connections causes an instant goroutine spike. The answer is a fixed-size goroutine pool:

   ```go
   package gopool

   func New(size int) *Pool {
       return &Pool{
           work: make(chan func()),
           sem:  make(chan struct{}, size),
       }
   }

   func (p *Pool) Schedule(task func()) {
       select {
       case p.work <- task:
       case p.sem <- struct{}{}:
           go p.worker(task)
       }
   }

   func (p *Pool) worker(task func()) {
       defer func() { <-p.sem }()
       for {
           task()
           task = <-p.work
       }
   }
   ```
   The poller callback then schedules work on the pool instead of spawning directly:

   ```go
   pool := gopool.New(128)

   poller.Start(conn, netpoll.EventRead, func() {
       // We will block poller wait loop when
       // all pool workers are busy.
       pool.Schedule(func() {
           ch.Receive()
       })
   })
   ```
   A fixed-size goroutine pool solves goroutine reuse, and its size also serves as a cap on the number of connections being served at once.

2. For point 3, reuse the buffers during Upgrade:
   ```go
   func newConnBRW(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, brw *bufio.ReadWriter) *Conn {
       /*some code*/
       if readBufferSize == 0 && brw != nil && brw.Reader != nil {
           // Reuse the supplied bufio.Reader if the buffer has a useful size.
           // This code assumes that peek on a reader returns
           // bufio.Reader.buf[:0].
           brw.Reader.Reset(conn)
           if p, err := brw.Reader.Peek(0); err == nil && cap(p) >= 256 {
               br = brw.Reader
           }
       }
       /*some code*/
       if writeBufferSize == 0 && brw != nil && brw.Writer != nil {
           // Use the bufio.Writer's buffer if the buffer has a useful size. This
           // code assumes that bufio.Writer.buf[:1] is passed to the
           // bufio.Writer's underlying writer.
           var wh writeHook
           brw.Writer.Reset(&wh)
           brw.Writer.WriteByte(0)
           brw.Flush()
           if cap(wh.p) >= maxFrameHeaderSize+256 {
               writeBuf = wh.p[:cap(wh.p)]
           }
       }
       /*some code*/
       return c
   }
   ```
   So in theory only a single buffer allocation is needed.

## Summary

Let us recap the optimizations described:

- A read goroutine with a buffer inside is expensive. **Solution**: netpoll (epoll, kqueue); reuse the buffers.
- A write goroutine with a buffer inside is expensive. **Solution**: start the goroutine only when necessary; reuse the buffers.
- With a storm of connections, netpoll alone won't keep up. **Solution**: reuse goroutines and limit their number.
- `net/http` is not the fastest way to handle an upgrade to WebSocket. **Solution**: use a zero-copy upgrade on the bare TCP connection.

## Recommended reading

1. https://github.com/eranyanay/1m-go-websockets
2. https://github.com/golang/go/issues/15735