先思考一个问题:
go team 说我们可以创建成千上万个 goroutine 用于处理并发,那么我们还需要 goroutine pool 吗?
go team 这么说的底气在于其调度模型 G-P-M,确实
但是,没有什么是绝对的,比如100w goroutines or more and more
那么一旦 goroutine 数量过多就会导致资源侵占,CPU暴涨,那么怎么办?
池化! 复用goroutine
ants
是一个高性能的协程池,实现了对大规模goroutine的调度管理、goroutine复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果。
提纲
- 池化技术
- ants 如何在项目中使用
- *ants 代码设计
- 学习到的优秀源码
池化技术
池化技术是一种通过创建池子来提高对象复用的常用技术,减少重复创建、销毁的开销。常用池化技术有内存池、对象池、线程池、连接池等。在go语言,goroutine的复用可以理解为线程池技术。ants
是一个优秀的代码库,被广泛用于生产环境。
ants 的使用
在其官方的README文件有详细的用法说明,这里简单介绍下基础使用
p, err := ants.NewPool(workerNum)
if err != nil {
// 错误处理
}
defer p.Release() // 关闭时释放
// 工作任务
worker := func() {
xxx
}
// 提交任务
err := p.Submit(worker)
if err != nil {
// 错误处理
}
代码设计
-
核心代码结构
|-- pool.go |-- worker.go
-
核心数据结构
type Pool struct { // 协程池的容量 capacity int32 // 每个 worker 的过期时间 expiryDuration time.Duration // 存放可使用的worker workers []*Worker } type Worker struct { // 所属的 pool pool *Pool // job task chan func() // 被重新放回pool时更新 recycleTime time.Time }
-
核心逻辑代码,隐藏了一些细节,关注宏观实现
// NewUltimatePool 生成带有自定义定时任务的协程池实例 func NewUltimatePool(size, expiry int, preAlloc bool) (*Pool, error) { var p *Pool // 是否预先分配 if preAlloc { p = &Pool{ capacity: int32(size), expiryDuration: time.Duration(expiry) * time.Second, workers: make([]*Worker, 0, size), } } else { p = &Pool{ capacity: int32(size), expiryDuration: time.Duration(expiry) * time.Second, } } // 运行一个goroutine用于tick,回收过期的worker go p.periodicallyPurge() return p, nil }
// Submit 提交一个任务 func (p *Pool) Submit(task func()) error { if w := p.retrieveWorker(); w == nil { // 没有空闲 worker return ErrPoolOverload } else { w.task <- task } return nil } // retrieveWorker 返回一个空闲worker运行task func (p *Pool) retrieveWorker() *Worker { var w *Worker spawnWorker := func() { ... 省略 cacheWorker w = &Worker{ pool: p, task: make(chan func(), workerChanCap), } w.run() } p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - 1 if n >= 0 { // 从队尾拿出 w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() spawnWorker() } else { // 阻塞情况会自旋等待获取一个worker,非阻塞则 return ... } return w }
// 执行函数调用 func (w *Worker) run() { go func() { ... panicHandle for f := range w.task { if f == nil { w.pool.decRunning() w.pool.workerCache.Put(w) return } // 执行任务 f() // 回收worker if ok := w.pool.revertWorker(w); !ok { break } } }() }
总结
- ants 池化核心——让一个 goroutine 多处理几个任务。
- ants 的实现和我们的线程池不一样,线程池是创建一定数量的线程,阻塞等待任务;而ants之所以不用这么做,是因为基于go G-P-M调度器,只需要限制goroutine数量,尽可能的复用goroutine。
- 这段代码读起来比较简单,对于培养代码阅读习惯有帮助。