在日常开发中时常会遇到批量任务: 批量创建, 批量查询, 批量xxx, 如果按照串行编码, 那么任务队列每次都只有一个任务被执行,十分缓慢,我们考虑使用以下一个简易Worker线程工厂(个人简易实现有错误请指出)

Controller层:

注册下这个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package controller  

import (
"awesomeProject/server"
"fmt")

func BatchJob() {
req := &server.BatchJobReq{
IdList: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55},
}

resp, err := server.BatchCreate(req)
if err != nil {
fmt.Println("error")
return
}

fmt.Println("\n\nOver, resposne is", resp)
}

Dao层:

任务详细数据结构体

1
2
3
4
5
package dao  

type BatchJob struct {
Id int
}

service层

1. 处理方式一: 无顺序,那在取数据的时候用限制数量个worker疯狂操作数据

对要处理的数据, 每个数据分配一个Gorountine去处理他, 使用限流器sem 设置最大的并发写入Gorountine数量,这样异步处理批量任务速度极大加倍 (
假设有N 个任务, 每个任务 1 秒,则一共需要 N 秒, 分配多个协程处理可以做到异步处理,假设分配最大协程数量是20,理论上只需要 1 + (N - 1) / 20秒, 极大加快了操作速度
)。

在并发处理读入数据后将所有数据拼接后返回结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package server  

import (
"fmt"
"strconv"
"sync"
"time"
)

type BatchJobReq struct {
IdList []int
}

type BatchJobResp struct {
message []string
}

func BatchCreate(BJ *BatchJobReq) (BatchJobResp, error) {
var wg sync.WaitGroup
resultCh := make(chan string, len(BJ.IdList))
sem := make(chan struct{}, 16) // 限流器

for _, id := range BJ.IdList {
wg.Add(1)
sem <- struct{}{}

go func(id int) {
defer wg.Done()
defer func() {
<-sem
}()

fmt.Println(id, "is Running")
time.Sleep(time.Second)

fmt.Println(id, "is finished")
resultCh <- strconv.Itoa(id) + " success"
}(id)
}

go func() {
wg.Wait()
close(resultCh)
}()

var result BatchJobResp
for msg := range resultCh {
result.message = append(result.message, msg)
}

return result, nil
}

2. worker pool + 有序操作

方法一中速度虽然快,但是操作是无序的,不够稳定,我们考虑牺牲一小点性能(实际上几乎没差距),使用worker pool:, 创建措大数量的工作者,之后分配任务让worker异步执行,同时加强了于上下游交互错误的即使处理能力

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package server

import (
"context"
"errors"
"fmt"
"math"
"strconv"
"sync"
"sync/atomic"
"time"
)

type BatchJobReq struct {
IdList []int
}

type BatchJobResp struct {
Message []string
}

// InsertFunc 是用户传入的实际执行插入的函数(可替换为 DB Exec 等)。
// 返回 nil 表示成功,非 nil 表示失败(会触发重试逻辑)。
type InsertFunc func(ctx context.Context, id int) error

// BatchCreateOptimized:
// - ctx: 支持超时/取消
// - req: 待插入 ID 列表
// - workers: 并发 worker 数量(建议与 DB 连接池大小相匹配)
// - maxRetries: 每个任务的最大重试次数
// - insert: 实际插入函数(同步调用)
// 返回:按输入顺序的结果切片或第一个不可恢复错误。
func BatchCreateOptimized(
ctx context.Context,
req *BatchJobReq,
workers int,
maxRetries int,
insert InsertFunc,
) (BatchJobResp, error) {
n := len(req.IdList)
if n == 0 {
return BatchJobResp{Message: []string{}}, nil
}
if workers <= 0 {
workers = 1
}
if workers > n {
workers = n
}
if maxRetries < 0 {
maxRetries = 0
}

type job struct {
idx int
id int
}

jobs := make(chan job, n)
results := make([]string, n) // 按索引写回(每个索引只写一次,无需锁)
var wg sync.WaitGroup

// 用于返回第一个严重错误,并取消所有 worker
errCh := make(chan error, 1)
// 用于检测全部完成
finish := make(chan struct{})

var doneCount int32

// worker
for w := 0; w < workers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for jb := range jobs {
// 优先检查外部取消(例如上游 ctx.Done)
select {
case <-ctx.Done():
return
default:
}

var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
// 如果外部已取消,提前退出
select {
case <-ctx.Done():
return
default:
}

// 执行插入(由调用方提供真实 DB 操作)
if err := insert(ctx, jb.id); err != nil {
lastErr = err
// 简单判断是否可重试:这里全部重试,实际可根据错误类型判断
if attempt < maxRetries {
// 指数退避(基础 100ms)
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
time.Sleep(backoff)
continue
}
// 超过重试次数,记录并向 errCh 报错(非阻塞)
select {
case errCh <- fmt.Errorf("id=%d idx=%d failed after %d attempts: %w", jb.id, attempt+1, lastErr):
default:
}
} else {
// 成功
results[jb.idx] = strconv.Itoa(jb.id) + " success"
atomic.AddInt32(&doneCount, 1)
break
}
}
// 如果最后一次仍然有错误,直接返回(让调用方决定是否要失败)
if lastErr != nil {
// 通知并退出 worker
return
}
}
}(w)
}

// 分发任务
go func() {
for i, id := range req.IdList {
select {
case <-ctx.Done():
break
default:
}
jobs <- job{idx: i, id: id}
}
close(jobs)
}()

// 等待所有 worker 完成或出现错误/取消
go func() {
wg.Wait()
close(finish)
}()

select {
case <-ctx.Done():
// 上游取消/超时
return BatchJobResp{Message: results}, ctx.Err()
case err := <-errCh:
// 遇到严重错误,返回
return BatchJobResp{Message: results}, err
case <-finish:
// 正常完成
return BatchJobResp{Message: results}, nil
}
}

实际运行时间 : Plan B 约等于 Plan A但是稍微小于(N = 1000)的时候, 此时用直接串行读取数据方式会慢如乌龟,尽量使用第二种方式