1. 引入 实现一个简单序列化器,把map[string]int 序列化:
// JsonMar1 serializes m into a JSON document of the form {"data":{...}}.
// Map iteration order is random, so the key order in the output is unspecified.
// It always returns a nil error (the signature keeps an error for API symmetry).
func JsonMar1(m map[string]int) (string, error) {
	var buf bytes.Buffer
	buf.WriteString(`{"data":{`)
	first := true
	for k, v := range m {
		if !first {
			buf.WriteByte(',')
		}
		first = false
		// strconv.Quote escapes quotes/control characters in keys; the
		// original concatenated the raw key, producing invalid JSON for
		// keys containing `"` or backslashes.
		buf.WriteString(strconv.Quote(k))
		buf.WriteByte(':')
		buf.WriteString(strconv.Itoa(v))
	}
	// BUG FIXED: the original wrote `"}}` here — the stray quote made every
	// result invalid JSON (e.g. {"data":{"a":1"}}), and the empty-map case
	// produced {"data":{"}}.
	buf.WriteString(`}}`)
	return buf.String(), nil
}
使用这个序列化器,在系统面临大量请求的时候,例如:
1 2 3 4 5 6 7 8 9 10 11 12 func main () { var wg sync.WaitGroup wg.Add(10000 ) for i := 0 ; i <= 10000 ; i++ { _, err := JsonMar1(map [string ]int {"a" : i}) if err != nil { wg.Done() fmt.Printf("i: %d, err: %v\n" , i, err) } } wg.Wait() }
会出现大量创建 bytes.Buffer 的情况,GC 压力和向堆申请的内存会大大增加,而实际上这个 buffer 是可以复用的。如果有一个缓存可以存储这个 buffer,又没有很大的额外开销(不像 Redis 这类外部缓存需要网络往返),那是极好的。我们可以尝试自己手动写一个专门"为序列化 map[string]int 服务"的缓存器:
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"sync"
)

// cache is a simple mutex-guarded free list of *bytes.Buffer values,
// handed out LIFO so recently used (cache-warm) buffers are reused first.
type cache struct {
	mu    sync.Mutex
	pools []*bytes.Buffer
}

// Get pops a buffer from the free list, or allocates a fresh one with
// 1 KiB of capacity when the list is empty. Buffers are reset on Put,
// so the returned buffer is always empty.
func (c *cache) Get() *bytes.Buffer {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.pools) == 0 {
		// BUG FIXED: the original used make([]byte, 1024), which gives the
		// buffer 1024 zero bytes of *content*; we want length 0, capacity 1024.
		return bytes.NewBuffer(make([]byte, 0, 1024))
	}
	buf := c.pools[len(c.pools)-1]
	c.pools = c.pools[:len(c.pools)-1]
	return buf
}

// Put resets buf and returns it to the free list for reuse.
func (c *cache) Put(buf *bytes.Buffer) {
	c.mu.Lock()
	defer c.mu.Unlock()
	buf.Reset()
	c.pools = append(c.pools, buf)
}

var c cache

// main measures allocation/GC deltas for 10000 writes that reuse
// buffers through the cache instead of allocating each time.
func main() {
	runtime.GC()
	var before runtime.MemStats
	runtime.ReadMemStats(&before)

	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// BUG FIXED: the original wrote to an undefined `buf` (did not
			// compile); the intent was clearly to exercise the cache.
			buf := c.Get()
			buf.WriteString("hello world")
			c.Put(buf)
		}()
	}
	wg.Wait()

	runtime.GC()
	var after runtime.MemStats
	runtime.ReadMemStats(&after)
	fmt.Printf("Mallocs 增量: %d -> %d (增量: %d)\n", before.Mallocs, after.Mallocs, after.Mallocs-before.Mallocs)
	fmt.Printf("TotalAlloc 增量: %d -> %d (增量: %d bytes)\n", before.TotalAlloc, after.TotalAlloc, after.TotalAlloc-before.TotalAlloc)
	fmt.Printf("NumGC 增量: %d -> %d (增量: %d)\n", before.NumGC, after.NumGC, after.NumGC-before.NumGC)
}
这样在每次请求的时候都不必重新向堆申请内存,直接从现有缓存中取即可,会极大降低内存消耗:

Mallocs 增量: 272 -> 20696 (增量: 20424)
TotalAlloc 增量: 130464 -> 1132112 (增量: 1001648 bytes)
NumGC 增量: 1 -> 2 (增量: 1)
做下对比:每次都创建新的:
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"sync"
)

// main is the baseline measurement: every goroutine allocates a brand-new
// buffer, so Mallocs/TotalAlloc/NumGC deltas show the cost of not reusing.
func main() {
	runtime.GC()
	var before runtime.MemStats
	runtime.ReadMemStats(&before)

	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// BUG FIXED: make([]byte, 1024) gave the buffer 1024 zero bytes
			// of content before the write; the intent was 1 KiB of capacity.
			bf := bytes.NewBuffer(make([]byte, 0, 1024))
			bf.WriteString("hello world")
		}()
	}
	wg.Wait()

	runtime.GC()
	var after runtime.MemStats
	runtime.ReadMemStats(&after)
	fmt.Printf("Mallocs 增量: %d -> %d (增量: %d)\n", before.Mallocs, after.Mallocs, after.Mallocs-before.Mallocs)
	fmt.Printf("TotalAlloc 增量: %d -> %d (增量: %d bytes)\n", before.TotalAlloc, after.TotalAlloc, after.TotalAlloc-before.TotalAlloc)
	fmt.Printf("NumGC 增量: %d -> %d (增量: %d)\n", before.NumGC, after.NumGC, after.NumGC-before.NumGC)
}
Mallocs 增量: 272 -> 21974 (增量: 21702)
TotalAlloc 增量: 130464 -> 21392032 (增量: 21261568 bytes)
NumGC 增量: 1 -> 8 (增量: 7)
可以看到缓存后内存减少了一个数量级
sync.Pool 上面我们通过"维护一个本地缓存切片"减少了内存开销,但是这个缓存只支持 bytes.Buffer 一种对象。标准库中有没有能对任意对象实现这种本地缓存、类似我们上面手写实现的组件呢?
// Go's sync.Pool provides exactly this. Basic usage:

package main

import (
	"bytes"
	"sync"
)

// bufPool reuses bytes.Buffer values; New is invoked only when the
// pool has nothing cached to hand out.
var bufPool = sync.Pool{
	New: func() any {
		return new(bytes.Buffer)
	},
}

func main() {
	bf := bufPool.Get().(*bytes.Buffer)
	// FIX: reset before use — Put does not clear the buffer, so a value
	// fetched from the pool may still hold a previous user's bytes.
	bf.Reset()
	bf.WriteString("hello world")
	bufPool.Put(bf)
}
这样就实现了类似引入中我们自己写的 pool。sync.Pool 主要有 Get 和 Put 两个方法,有点类似 Java 的 ThreadLocal,但又不完全相同,且它天然支持并发安全。我们可以创建一个对象实例池来存储需要经常用到的项目对象。
板子:
// BufferPool is a registry of named sync.Pool instances. The RWMutex only
// guards the registry map; each sync.Pool is concurrency-safe on its own.
type BufferPool struct {
	mu    sync.RWMutex
	pools map[string]*sync.Pool
}

// NewBufferPool returns an empty registry, ready for Register calls.
func NewBufferPool() *BufferPool {
	return &BufferPool{pools: make(map[string]*sync.Pool)}
}

// Register installs (or replaces) the pool stored under name, using
// newFunc to create fresh objects when the pool runs dry.
func (c *BufferPool) Register(name string, newFunc func() any) {
	c.mu.Lock()
	c.pools[name] = &sync.Pool{New: newFunc}
	c.mu.Unlock()
}

// Get fetches an object from the named pool, or nil when the name
// was never registered.
func (c *BufferPool) Get(name string) any {
	c.mu.RLock()
	p, ok := c.pools[name]
	c.mu.RUnlock()
	if !ok || p == nil {
		return nil
	}
	return p.Get()
}

// Put hands v back to the named pool; unknown names are silently ignored.
func (c *BufferPool) Put(name string, v any) {
	c.mu.RLock()
	p := c.pools[name]
	c.mu.RUnlock()
	if p == nil {
		return
	}
	p.Put(v)
}

// PutBuffer resets b before returning it, so the next Get observes an
// empty buffer. A nil b is a no-op.
func (c *BufferPool) PutBuffer(name string, b *bytes.Buffer) {
	if b == nil {
		return
	}
	b.Reset()
	c.Put(name, b)
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func main () { bufPool := NewBufferPool() bufPool.Register("buf" , func () any { return &bytes.Buffer{} }) v := bufPool.Get("buf" ) if v == nil { fmt.Println("pool not found or nil" ) return } b := v.(*bytes.Buffer) b.WriteString("hello world" ) fmt.Println(b.String()) bufPool.PutBuffer("buf" , b) }
性能对比 对比下使用和不使用pool的GC情况
package main

import (
	"fmt"
	"runtime"
	"strconv"
	"sync"
	"time"
)

const (
	goroutines = 50
	iters      = 20000
	bufCap     = 1024
)

func main() {
	fmt.Printf("总操作: %d (goroutines=%d * iters=%d)\n\n", goroutines*iters, goroutines, iters)
	runtime.GC()

	fmt.Println("=== 使用 sync.Pool ===")
	// FIX (staticcheck SA6002): pool *[]byte, not []byte. Putting a bare
	// slice into a sync.Pool boxes the slice header into an interface and
	// allocates on every Put, which defeats the pool entirely.
	pool := &sync.Pool{
		New: func() interface{} {
			b := make([]byte, 0, bufCap)
			return &b
		},
	}
	runAndMeasure(pool)

	fmt.Println("\n=== 不使用 sync.Pool(每次 make) ===")
	runAndMeasure(nil)
}

// runAndMeasure runs goroutines*iters buffer-formatting operations —
// reusing buffers from pool when it is non-nil, allocating fresh ones
// otherwise — and prints elapsed time plus allocation/GC deltas.
func runAndMeasure(pool *sync.Pool) {
	runtime.GC()
	var before runtime.MemStats
	runtime.ReadMemStats(&before)
	start := time.Now()

	var wg sync.WaitGroup
	wg.Add(goroutines)
	for g := 0; g < goroutines; g++ {
		go func(gid int) {
			defer wg.Done()
			for i := 0; i < iters; i++ {
				var bp *[]byte
				if pool != nil {
					bp = pool.Get().(*[]byte)
				} else {
					b := make([]byte, 0, bufCap)
					bp = &b
				}
				// FIX: the original did copy(buf, s) into a zero-length
				// slice — copy is bounded by len(dst), so it copied nothing
				// and the benchmark body was a no-op. append actually writes.
				buf := append((*bp)[:0], "hello-"...)
				buf = strconv.AppendInt(buf, int64(gid), 10)
				buf = append(buf, '-')
				buf = strconv.AppendInt(buf, int64(i), 10)
				_ = len(buf)
				if pool != nil {
					*bp = buf // keep any capacity growth for the next user
					pool.Put(bp)
				}
			}
		}(g)
	}
	wg.Wait()
	elapsed := time.Since(start)

	runtime.GC()
	var after runtime.MemStats
	runtime.ReadMemStats(&after)
	fmt.Printf("耗时: %v\n", elapsed)
	fmt.Printf("Mallocs 增量: %d -> %d (增量: %d)\n", before.Mallocs, after.Mallocs, after.Mallocs-before.Mallocs)
	fmt.Printf("TotalAlloc 增量: %d -> %d (增量: %d bytes)\n", before.TotalAlloc, after.TotalAlloc, after.TotalAlloc-before.TotalAlloc)
	fmt.Printf("NumGC 增量: %d -> %d (增量: %d)\n", before.NumGC, after.NumGC, after.NumGC-before.NumGC)
}
总操作: 1000000 (goroutines=50 * iters=20000)

=== 使用 sync.Pool ===
耗时: 540.4799ms
Mallocs 增量: 283 -> 1995584 (增量: 1995301)
TotalAlloc 增量: 133216 -> 28840120 (增量: 28706904 bytes)
NumGC 增量: 2 -> 6 (增量: 4)

=== 不使用 sync.Pool(每次 make) ===
耗时: 223.6563ms
Mallocs 增量: 1995600 -> 3991085 (增量: 1995485)
TotalAlloc 增量: 28841752 -> 1057526712 (增量: 1028684960 bytes)
NumGC 增量: 7 -> 150 (增量: 143)
可见使用 sync.Pool 后 GC 次数和总分配量(TotalAlloc)都大幅降低。不过注意:本例中使用 pool 的版本 Mallocs 几乎没有减少、耗时反而更高——把 []byte 直接放入 sync.Pool 会因接口装箱在每次 Put 时产生一次额外分配(staticcheck SA6002),生产代码中应当存入 *[]byte 等指针类型。
// Usage note: sync.Pool is for short-lived, frequently reused objects.
// Do not store long-lived objects in a pool — the GC may reclaim them at
// any time, leaking resources that needed an orderly shutdown. The program
// below is a deliberate anti-pattern demo: it stashes a live TLS/TCP
// connection in a pool and watches the GC eat it.

package main

import (
	"crypto/tls"
	"fmt"
	"net"
	"runtime"
	"sync"
	"time"
)

// Pool wraps a bare sync.Pool (no New func, so Get can return nil).
type Pool struct {
	p *sync.Pool
}

func main() {
	pool := &Pool{
		p: &sync.Pool{},
	}

	conn, err := tls.Dial("tcp", "www.baidu.com:443", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("建立连接:", conn.LocalAddr())

	// Anti-pattern: any GC cycle may now drop the only reference.
	pool.p.Put(conn)

	// Allocation pressure so the GC actually runs during the demo.
	go func() {
		for {
			_ = make([]byte, 1024*1024)
			time.Sleep(10 * time.Millisecond)
		}
	}()

	for i := 0; i < 5; i++ {
		runtime.GC()
		time.Sleep(200 * time.Millisecond)
	}

	v := pool.p.Get()
	if v == nil {
		fmt.Println("TCP 连接被 GC 吃掉了(pool.Get() == nil)")
		return
	}
	c := v.(net.Conn)
	fmt.Println("取回连接:", c.LocalAddr())
	_, err = c.Write([]byte("ping\n"))
	fmt.Println("write err:", err)
}
实际上在运行到一半的时候 TCP 连接就已经被回收掉了,而 GC 还在继续;由于 TCP 不是正常断连,相关资源未被妥善释放,容易造成泄漏,且在实际业务中很难排查。
sync.Pool 本质上是为了解决高频 GC 分配问题,而 TCP 连接是长生命周期、低频创建的资源,并不涉及大量的 GC 分配,所以把它放进 sync.Pool 没有意义,应该交由专门的网络连接池管理。
对象内部是有状态的情况,例如:
这些本身带有内部状态的对象,放回 pool 后可能随时被回收,造成不可预料的结果,例如 channel 或其内部状态被删除,导致数据丢失——因为 pool 中的对象会被 GC 自动回收。