先思考一个问题:

go team 说我们可以创建成千上万个 goroutine 用于处理并发,那么我们还需要 goroutine pool 吗?

go team 这么说的底气在于其调度模型 G-P-M,确实

但是,没有什么是绝对的,比如100w goroutines or more and more

那么一旦 goroutine 数量过多就会导致资源侵占,CPU暴涨,那么怎么办?

池化! 复用goroutine

ants是一个高性能的协程池,实现了对大规模goroutine的调度管理、goroutine复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果。

提纲

  • 池化技术
  • ants 如何在项目中使用
  • *ants 代码设计
  • 学习到的优秀源码

池化技术

池化技术是一种通过创建池子来提高对象复用的常用技术,减少重复创建、销毁的开销。常用池化技术有内存池、对象池、线程池、连接池等。在go语言,goroutine的复用可以理解为线程池技术。ants是一个优秀的代码库,被广泛用于生产环境。

ants 的使用

在其官方的README文件有详细的用法说明,这里简单介绍下基础使用

p, err := ants.NewPool(workerNum)
if err != nil {
  // 错误处理
}
defer p.Release() // 关闭时释放
// 工作任务
worker := func() {
		xxx
}
// 提交任务
err := p.Submit(worker)
if err != nil {
	// 错误处理
}

代码设计

  • 核心代码结构

    |-- pool.go
    |-- worker.go
    
  • 核心数据结构

    type Pool struct {
    	// 协程池的容量
    	capacity int32
      // 每个 worker 的过期时间
      expiryDuration time.Duration
    	// 存放可使用的worker
    	workers []*Worker
    }
    
    type Worker struct {
      // 所属的 pool
    	pool *Pool
    	// job
    	task chan func()
      // 被重新放回pool时更新
    	recycleTime time.Time
    }
    
  • 核心逻辑代码,隐藏了一些细节,关注宏观实现

    // NewUltimatePool 生成带有自定义定时任务的协程池实例
    func NewUltimatePool(size, expiry int, preAlloc bool) (*Pool, error) {
    	var p *Pool
      // 是否预先分配
    	if preAlloc {
    		p = &Pool{
    			capacity:       int32(size),
    			expiryDuration: time.Duration(expiry) * time.Second,
    			workers:        make([]*Worker, 0, size),
    		}
    	} else {
    		p = &Pool{
    			capacity:       int32(size),
    			expiryDuration: time.Duration(expiry) * time.Second,
    		}
    	}
      // 运行一个goroutine用于tick,回收过期的worker
    	go p.periodicallyPurge()
    	return p, nil
    }
    
    // Submit 提交一个任务
    func (p *Pool) Submit(task func()) error {
    	if w := p.retrieveWorker(); w == nil {
        // 没有空闲 worker
    		return ErrPoolOverload
    	} else {
    		w.task <- task
    	}
    	return nil
    }
    
    // retrieveWorker 返回一个空闲worker运行task
    func (p *Pool) retrieveWorker() *Worker {
    	var w *Worker
    	spawnWorker := func() {
        ... 省略 cacheWorker
    		w = &Worker{
    			pool: p,
    			task: make(chan func(), workerChanCap),
    		}
    		w.run()
    	}
    
    	p.lock.Lock()
    	idleWorkers := p.workers
    	n := len(idleWorkers) - 1
    	if n >= 0 {
        // 从队尾拿出
    		w = idleWorkers[n]
    		idleWorkers[n] = nil
    		p.workers = idleWorkers[:n]
    		p.lock.Unlock()
    	} else if p.Running() < p.Cap() {
    		p.lock.Unlock()
    		spawnWorker()
    	} else {
        // 阻塞情况会自旋等待获取一个worker,非阻塞则 return
    		...
    	}
    	return w
    }
    
    
    // 执行函数调用
    func (w *Worker) run() {
    	go func() {
        ... panicHandle
    		for f := range w.task {
    			if f == nil {
    				w.pool.decRunning()
    				w.pool.workerCache.Put(w)
    				return
    			}
          // 执行任务
    			f()
          // 回收worker
    			if ok := w.pool.revertWorker(w); !ok {
    				break
    			}
    		}
    	}()
    }
    

总结

  1. ants 池化核心——让一个 goroutine 多处理几个任务。
  2. ants 的实现和我们的线程池不一样,线程池是创建一定数量的线程,阻塞等待任务;而ants之所以不用这么做,是因为基于go G-P-M调度器,只需要限制goroutine数量,尽可能的复用goroutine。
  3. 这段代码读起来比较简单,对于培养代码阅读习惯有帮助。

参考资料