In day-to-day development we often run into batch tasks: batch create, batch query, batch whatever. If they are coded serially, the task queue executes only one task at a time, which is painfully slow. Below is a simple worker-style approach I put together (it's a personal toy implementation; corrections are welcome).
Controller layer: register this method

```go
package controller

import (
	"awesomeProject/server"
	"fmt"
)

func BatchJob() {
	req := &server.BatchJobReq{
		IdList: []int{
			1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
			21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
			41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55,
		},
	}
	resp, err := server.BatchCreate(req)
	if err != nil {
		fmt.Println("error")
		return
	}
	fmt.Println("\n\nOver, response is", resp)
}
```
Dao layer: the task's detail struct

```go
package dao

type BatchJob struct {
	Id int
}
```
Service layer

1. Approach one: no ordering — when pulling the data, let a capped number of worker goroutines hammer away at it. Each item to process gets its own goroutine, and a semaphore channel `sem` caps the maximum number of goroutines writing concurrently, so the batch is handled asynchronously and far faster. (With N tasks of 1 second each, serial execution takes N seconds; with a cap of, say, 20 concurrent goroutines, it theoretically takes only 1 + (N-1)/20 seconds, i.e. ⌈N/20⌉ — a huge speedup.)
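To make the arithmetic above concrete, here is a tiny standalone sketch (the 1-second-per-task cost and the worker caps are the assumptions from the text, not measurements) that computes the theoretical wall-clock time:

```go
package main

import "fmt"

// theoreticalSeconds returns the ideal wall-clock time, in seconds, for n
// one-second tasks when at most `workers` of them run concurrently.
// 1 + (n-1)/workers in integer arithmetic is exactly ceil(n/workers).
func theoreticalSeconds(n, workers int) int {
	if n == 0 {
		return 0
	}
	return 1 + (n-1)/workers
}

func main() {
	fmt.Println(theoreticalSeconds(55, 1))  // serial: 55 seconds
	fmt.Println(theoreticalSeconds(55, 20)) // cap of 20 goroutines: 3 seconds
	fmt.Println(theoreticalSeconds(55, 16)) // cap of 16 (what the code below uses): 4 seconds
}
```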
After all items have been processed concurrently, the results are concatenated and returned:
```go
package server

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

type BatchJobReq struct {
	IdList []int
}

type BatchJobResp struct {
	message []string
}

func BatchCreate(BJ *BatchJobReq) (BatchJobResp, error) {
	var wg sync.WaitGroup
	resultCh := make(chan string, len(BJ.IdList))
	sem := make(chan struct{}, 16) // semaphore: at most 16 goroutines in flight

	for _, id := range BJ.IdList {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks while 16 tasks are running
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			fmt.Println(id, "is Running")
			time.Sleep(time.Second) // simulate 1 second of real work
			fmt.Println(id, "is finished")
			resultCh <- strconv.Itoa(id) + " success"
		}(id)
	}

	// Close resultCh once every worker has finished, so the range below ends.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	var result BatchJobResp
	for msg := range resultCh {
		result.message = append(result.message, msg)
	}
	return result, nil
}
```
2. Worker pool + ordered processing. Approach one is fast, but the results come back in no fixed order, which is less stable. At the cost of a tiny bit of performance (in practice almost no difference), we can use a worker pool instead: create an appropriate number of workers up front, then hand them jobs to execute asynchronously. This also strengthens our ability to handle errors from upstream/downstream interactions promptly.
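The core of the pattern can be shown in a minimal, self-contained sketch (the IDs and the 3-worker pool size here are made up for illustration): a fixed set of workers drains a jobs channel, and each result is written to its own slot in a pre-sized slice, so the output keeps the input order no matter which worker finishes first.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// process runs ids through a pool of `workers` goroutines and returns
// the results in the same order as the input.
func process(ids []int, workers int) []string {
	type job struct {
		idx int // position in the input slice
		id  int
	}
	jobs := make(chan job, len(ids))
	results := make([]string, len(ids)) // one slot per input keeps order stable

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for jb := range jobs {
				// Each job writes only to its own slot, so no lock is needed.
				results[jb.idx] = strconv.Itoa(jb.id) + " success"
			}
		}()
	}

	for i, id := range ids {
		jobs <- job{idx: i, id: id}
	}
	close(jobs) // lets the workers' range loops terminate
	wg.Wait()
	return results
}

func main() {
	fmt.Println(process([]int{7, 8, 9}, 3)) // → [7 success 8 success 9 success]
}
```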
```go
package server

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

type BatchJobReq struct {
	IdList []int
}

type BatchJobResp struct {
	Message []string
}

type InsertFunc func(ctx context.Context, id int) error

func BatchCreateOptimized(
	ctx context.Context,
	req *BatchJobReq,
	workers int,
	maxRetries int,
	insert InsertFunc,
) (BatchJobResp, error) {
	n := len(req.IdList)
	if n == 0 {
		return BatchJobResp{Message: []string{}}, nil
	}
	if workers <= 0 {
		workers = 1
	}
	if workers > n {
		workers = n
	}
	if maxRetries < 0 {
		maxRetries = 0
	}

	type job struct {
		idx int
		id  int
	}

	jobs := make(chan job, n)
	results := make([]string, n) // indexed by input position, so output order matches input order

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	finish := make(chan struct{})
	var doneCount int32 // successful jobs so far (not returned, useful for metrics/logging)

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for jb := range jobs {
				select {
				case <-ctx.Done():
					return
				default:
				}
				var lastErr error
				for attempt := 0; attempt <= maxRetries; attempt++ {
					select {
					case <-ctx.Done():
						return
					default:
					}
					if err := insert(ctx, jb.id); err != nil {
						lastErr = err
						if attempt < maxRetries {
							// Exponential backoff: 100ms, 200ms, 400ms, ...
							backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
							time.Sleep(backoff)
							continue
						}
						// All retries exhausted: report the first error (non-blocking).
						select {
						case errCh <- fmt.Errorf("id=%d idx=%d failed after %d attempts: %w",
							jb.id, attempt+1, lastErr):
						default:
						}
					} else {
						results[jb.idx] = strconv.Itoa(jb.id) + " success"
						atomic.AddInt32(&doneCount, 1)
						break
					}
				}
				if lastErr != nil {
					return // stop this worker after an unrecoverable failure
				}
			}
		}(w)
	}

	// Producer: feed all jobs, then close the channel so workers drain and exit.
	go func() {
		defer close(jobs)
		for i, id := range req.IdList {
			select {
			case <-ctx.Done():
				return // note: a bare `break` here would only exit the select, not the loop
			default:
			}
			jobs <- job{idx: i, id: id}
		}
	}()

	go func() {
		wg.Wait()
		close(finish)
	}()

	select {
	case <-ctx.Done():
		return BatchJobResp{Message: results}, ctx.Err()
	case err := <-errCh:
		return BatchJobResp{Message: results}, err
	case <-finish:
		return BatchJobResp{Message: results}, nil
	}
}
```
Actual running time: Plan B comes out roughly equal to Plan A, with only a marginal difference even at N = 1000 — while plain serial processing at that scale crawls like a turtle. Prefer the second approach whenever you can.