1. 引入

实现一个简单序列化器,把map[string]int 序列化:

1
2
3
4
5
6
7
8
9
10
11
12
func JsonMar1(m map[string]int) (string, error) {  
var buf bytes.Buffer
buf.Write([]byte(`{"data":{`))
for k, v := range m {
buf.Write([]byte(`"` + k + `":` + strconv.Itoa(v) + `,`))
}
if len(m) > 0 {
buf.Truncate(buf.Len() - 1)
}
buf.Write([]byte(`"}}`))
return buf.String(), nil
}

使用这个序列化器,如果在系统大量请求的时候,例如

1
2
3
4
5
6
7
8
9
10
11
12
func main() {  
var wg sync.WaitGroup
wg.Add(10000)
for i := 0; i <= 10000; i++ {
_, err := JsonMar1(map[string]int{"a": i})
if err != nil {
wg.Done()
fmt.Printf("i: %d, err: %v\n", i, err)
}
}
wg.Wait()
}

会出现大量地创建bytes.buffer, GC和向堆请求的内存会大大增加,而实际上这个buffer是可以复用的,如果有一个缓存可以存储这个buffer,又没有很大的额外开销(例如redis或者cache),那是极好的,我们可以尝试自己手动写一个专门”为序列化map[string]int“的缓存器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main  

import (
"bytes"
"fmt" "runtime" "sync")

type cache struct {
mu sync.Mutex
pools []*bytes.Buffer
}

func (c *cache) Get() *bytes.Buffer {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.pools) == 0 {
buf := make([]byte, 1024)
return bytes.NewBuffer(buf)
} buf := c.pools[len(c.pools)-1]
c.pools[len(c.pools)-1].Reset()
c.pools = c.pools[:len(c.pools)-1]
return buf
}
func (c *cache) Put(buf *bytes.Buffer) {
c.mu.Lock()
defer c.mu.Unlock()
buf.Reset()
c.pools = append(c.pools, buf)
}

var c cache

func main() {
runtime.GC()

var before runtime.MemStats
runtime.ReadMemStats(&before)

var wg sync.WaitGroup
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
//buf := c.Get()
//buf.WriteString("hello world") //c.Put(buf) var buf bytes.Buffer
buf.WriteString("hello world")
}() } wg.Wait()

runtime.GC()
var after runtime.MemStats
runtime.ReadMemStats(&after)

fmt.Printf("Mallocs 增量: %d -> %d (增量: %d)\n", before.Mallocs, after.Mallocs, after.Mallocs-before.Mallocs)
fmt.Printf("TotalAlloc 增量: %d -> %d (增量: %d bytes)\n", before.TotalAlloc, after.TotalAlloc, after.TotalAlloc-before.TotalAlloc)
fmt.Printf("NumGC 增量: %d -> %d (增量: %d)\n", before.NumGC, after.NumGC, after.NumGC-before.NumGC)
}

这样在每次请求的时候都不必新向堆请求内存,直接从现有的取即可,会极大降低内存消耗:

1
2
3
Mallocs 增量: 272 -> 20696  (增量: 20424)
TotalAlloc 增量: 130464 -> 1132112 (增量: 1001648 bytes)
NumGC 增量: 1 -> 2 (增量: 1)

做下对比:每次都创建新的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main  

import (
"bytes"
"fmt"
"runtime"
"sync"
)
func main() {
runtime.GC()

var before runtime.MemStats
runtime.ReadMemStats(&before)

var wg sync.WaitGroup
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()

// 每次都新创建
buf := make([]byte, 1024)
bf := bytes.NewBuffer(buf)
bf.WriteString("hello world")
}()
}
wg.Wait()

runtime.GC()
var after runtime.MemStats
runtime.ReadMemStats(&after)

fmt.Printf("Mallocs 增量: %d -> %d (增量: %d)\n", before.Mallocs, after.Mallocs, after.Mallocs-before.Mallocs)
fmt.Printf("TotalAlloc 增量: %d -> %d (增量: %d bytes)\n", before.TotalAlloc, after.TotalAlloc, after.TotalAlloc-before.TotalAlloc)
fmt.Printf("NumGC 增量: %d -> %d (增量: %d)\n", before.NumGC, after.NumGC, after.NumGC-before.NumGC)
}
1
2
3
Mallocs 增量: 272 -> 21974  (增量: 21702)
TotalAlloc 增量: 130464 -> 21392032 (增量: 21261568 bytes)
NumGC 增量: 1 -> 8 (增量: 7)

可以看到缓存后内存减少了一个数量级


sync.Pool

上面我们通过“插入一个本地缓存切片”实现了减少内存开销,但是这个缓存只能支持对于bytes.buffer操作, 有没有啥能对于所有对象都能实现本地缓存,类似上面实现的标准包呢?

Go的sync.Pool提供了这一方法:
基础使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main  

import (
"bytes"
"sync"
)

var bufPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}

func main() {
bf := bufPool.Get().(*bytes.Buffer)
bf.WriteString("hello world")
bufPool.Put(bf)
}

这样实现了类似引入中我们自己写的pool,buf主要有Get和put两个使用,类似Java的ThreadLocal, 但是又不大像,且他自然支持线程安全,我们可以创建一个对象实例池来存储需要经常用到的项目对象

板子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
type BufferPool struct {  
mu sync.RWMutex
pools map[string]*sync.Pool
}

func NewBufferPool() *BufferPool {
return &BufferPool{
pools: make(map[string]*sync.Pool),
}
}

// Register 注册一个名为 name 的 pool,newFunc 用于创建新对象(如: func() any { return &bytes.Buffer{} })
func (c *BufferPool) Register(name string, newFunc func() any) {
c.mu.Lock()
defer c.mu.Unlock()
c.pools[name] = &sync.Pool{New: newFunc}
}

// Get 从指定 name 的 pool 取出对象;若不存在则返回 nil(或你可以改为 panic)
func (c *BufferPool) Get(name string) any {
c.mu.RLock()
p, ok := c.pools[name]
c.mu.RUnlock()
// 如果没有注册或者本地没有,返回空
if !ok || p == nil {
return nil
}
return p.Get()
}

// Put 将对象放回指定 pool(不会对对象做额外 Reset, 保持属性)
func (c *BufferPool) Put(name string, v any) {
c.mu.RLock()
p := c.pools[name]
c.mu.RUnlock()

// 如果没有注册,直接忽略
if p == nil {
return
}
p.Put(v)
}

// PutBuffer 是对 bytes.Buffer 的安全封装:Reset 后再放回
func (c *BufferPool) PutBuffer(name string, b *bytes.Buffer) {
if b == nil {
return
}
b.Reset()
c.Put(name, b)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {  
bufPool := NewBufferPool()
bufPool.Register("buf", func() any {
return &bytes.Buffer{}
})
// 取出 buffer,使用后用 PutBuffer 放回(会 Reset)
v := bufPool.Get("buf")
if v == nil {
fmt.Println("pool not found or nil")
return
}
b := v.(*bytes.Buffer)
b.WriteString("hello world")
fmt.Println(b.String())

// Reset 并放回
bufPool.PutBuffer("buf", b)
}

性能对比

对比下使用和不使用pool的GC情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main  

import (
"fmt"
"runtime"
"strconv"
"sync"
"time"
)

const (
goroutines = 50 // 并发 goroutine 数
iters = 20000 // 每个 goroutine 的迭代次数(总操作 = goroutines * iters)
bufCap = 1024 // 每个缓冲的容量
)

func main() {
fmt.Printf("总操作: %d (goroutines=%d * iters=%d)\n\n", goroutines*iters, goroutines, iters)

runtime.GC()

fmt.Println("=== 使用 sync.Pool ===")
pool := &sync.Pool{
New: func() interface{} {
// 预分配一个容量为 bufCap 的字节切片
return make([]byte, 0, bufCap)
},
}
runAndMeasure(pool)

fmt.Println("\n=== 不使用 sync.Pool(每次 make) ===")
runAndMeasure(nil)
}

// runAndMeasure: 如果 pool 非 nil 则使用 pool;否则每次都 make 新的 []byte
func runAndMeasure(pool *sync.Pool) {
// 触发 GC 并读取 baseline memstats
runtime.GC()
var before runtime.MemStats
runtime.ReadMemStats(&before)

start := time.Now()
var wg sync.WaitGroup
wg.Add(goroutines)

for g := 0; g < goroutines; g++ {
go func(gid int) {
defer wg.Done()
// 模拟每次需要一个临时缓冲并写入一些文本
for i := 0; i < iters; i++ {
var buf []byte
if pool != nil {
// 从池里拿一个
v := pool.Get()
buf = v.([]byte)
// 重置长度为 0(保留容量)
buf = buf[:0]
} else {
// 每次重新分配
buf = make([]byte, 0, bufCap)
}
// 模拟写入一些内容到 buf(使用 copy 避免从 string -> []byte 的额外分配)
s := "hello-" + strconv.Itoa(gid) + "-" + strconv.Itoa(i)
n := copy(buf, s)
buf = buf[:n]

// (这里可以做更多工作,例如格式化、计算哈希等)
_ = len(buf) // 使用一下,防止被优化掉

// 使用完放回 pool(如果有)
if pool != nil {
// 注意:放回前不要让其他 goroutine 继续使用这个 buf
pool.Put(buf)
}
}
}(g)
}
wg.Wait()
elapsed := time.Since(start)

runtime.GC()
var after runtime.MemStats
runtime.ReadMemStats(&after)

fmt.Printf("耗时: %v\n", elapsed)
fmt.Printf("Mallocs 增量: %d -> %d (增量: %d)\n", before.Mallocs, after.Mallocs, after.Mallocs-before.Mallocs)
fmt.Printf("TotalAlloc 增量: %d -> %d (增量: %d bytes)\n", before.TotalAlloc, after.TotalAlloc, after.TotalAlloc-before.TotalAlloc)
fmt.Printf("NumGC 增量: %d -> %d (增量: %d)\n", before.NumGC, after.NumGC, after.NumGC-before.NumGC)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
总操作: 1000000 (goroutines=50 * iters=20000)

=== 使用 sync.Pool ===
耗时: 540.4799ms
Mallocs 增量: 283 -> 1995584 (增量: 1995301)
TotalAlloc 增量: 133216 -> 28840120 (增量: 28706904 bytes)
NumGC 增量: 2 -> 6 (增量: 4)

=== 不使用 sync.Pool(每次 make) ===
耗时: 223.6563ms
Mallocs 增量: 1995600 -> 3991085 (增量: 1995485)
TotalAlloc 增量: 28841752 -> 1057526712 (增量: 1028684960 bytes)
NumGC 增量: 7 -> 150 (增量: 143)

可见极大降低GC和内存


使用注意

要用在短生命周期、高频次可复用对象

使用pool时,不要存生命周期太长的对象,容易造成GC泄露,举个例子,存储一个TCP实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
"crypto/tls"
"fmt"
"net"
"runtime"
"sync"
"time"
)

type Pool struct {
p *sync.Pool
}

func main() {
pool := &Pool{
p: &sync.Pool{},
}

// 建立 TCP 连接
conn, err := tls.Dial("tcp", "www.baidu.com:443", nil)
if err != nil {
panic(err)
}
fmt.Println("建立连接:", conn.LocalAddr())

// 放入 sync.Pool
pool.p.Put(conn)

// 疯狂制造内存分配,诱发 GC
go func() {
for {
_ = make([]byte, 1024*1024) // 1MB
time.Sleep(10 * time.Millisecond)
}
}()

// 主动触发 GC(更稳定复现)
for i := 0; i < 5; i++ {
runtime.GC()
time.Sleep(200 * time.Millisecond)
}

// 尝试从 pool 中取回连接
v := pool.p.Get()
if v == nil {
fmt.Println("TCP 连接被 GC 吃掉了(pool.Get() == nil)")
return
}

c := v.(net.Conn)
fmt.Println("取回连接:", c.LocalAddr())

// 再试一次读写
_, err = c.Write([]byte("ping\n"))
fmt.Println("write err:", err)
}

实际上在操作一半的时候TCP链接已经断了,而GC还在不断继续且TCP不是正常断链,资源未被回收造成内存泄露
且实际业务排查不好找

sunc.Pool本质是解决GC分配问题,而TCP链接是长时间的,稀疏的链接,并不涉及大量的GC分配,所以存储TCP也没有意义,应该专门的网络连接池管理

对象内部是有状态的

  • 带缓冲的 channel

  • 包含 mutex / condition 的对象

  • 自定义缓存结构(map/slice)但没有 Reset

这些本身是有一定状态的对象,放回pool后可能被随时回收,造成不可预料结果, 如channel或者内部状态呗删除,导致数据丢失,因为pool内会自动回收对象