热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

golang协程池设计详解

这篇文章主要介绍了golang协程池设计详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

Why Pool

go自从出生就身带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量

for {
    // 监听tcp
    rw, e := l.Accept()
    if e != nil {
      .......
    }
    tempDelay = 0
    c := srv.newConn(rw)
    c.setState(c.rwc, StateNew) // before Serve can return
    // 启动协程处理上下文
    go c.serve(ctx)
}

虽然创建一个groutine占用的内存极小(大约2KB左右,线程通常2M左右),但是在实际生产环境无限制的开启协程显然是不科学的,比如上图的逻辑,如果来几千万个请求就会开启几千万个groutine,当没有更多内存可用时,go的调度器就会阻塞groutine最终导致内存溢出乃至严重的崩溃,所以本文将通过实现一个简单的协程池,以及剖析几个开源的协程池源码来探讨一下对groutine的并发控制以及多路复用的设计和实现。

一个简单的协程池

过年前做过一波小需求,是将主播管理系统中信息不完整的主播找出来然后再到其相对应的直播平台爬取完整信息并补全,当时考虑到每一个主播的数据都要访问一次直播平台所以就用应对每一个主播开启一个groutine去抓取数据,虽然这个业务量还远远远远达不到能造成groutine性能瓶颈的地步,但是心里总是不舒服,于是放假回来后将其优化成从协程池中控制groutine数量再开启爬虫进行数据抓取。思路其实非常简单,用一个channel当做任务队列,初始化groutine池时确定好并发量,然后以设置好的并发量开启groutine同时读取channel中的任务并执行, 模型如下图

实现

type SimplePool struct {
  wg  sync.WaitGroup
  work chan func() //任务队列
}

func NewSimplePoll(workers int) *SimplePool {
  p := &SimplePool{
    wg:  sync.WaitGroup{},
    work: make(chan func()),
  }
  p.wg.Add(workers)
  //根据指定的并发量去读取管道并执行
  for i := 0; i 

测试

测试设定为在并发数量为20的协程池中并发抓取一百个人的信息, 因为代码包含较多业务逻辑所以sleep 1秒模拟爬虫过程,理论上执行时间为5秒

func TestSimplePool(t *testing.T) {
  p := NewSimplePoll(20)
  for i := 0; i <100; i++ {
    p.Add(parseTask(i))
  }
  p.Run()
}

func parseTask(i int) func() {
  return func() {
    // 模拟抓取数据的过程
    time.Sleep(time.Second * 1)
    fmt.Println("finish parse ", i)
  }
}

这样一来最简单的一个groutine池就完成了

go-playground/pool

上面的groutine池虽然简单,但是对于每一个并发任务的状态,pool的状态缺少控制,所以又去看了一下go-playground/pool的源码实现,先从每一个需要执行的任务入手,该库中对并发单元做了如下的结构体,可以看到除工作单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操作值来标识其运行状态。

// 需要加入pool 中执行的任务
type WorkFunc func(wu WorkUnit) (interface{}, error)

// 工作单元
type workUnit struct {
  value   interface{}  // 任务结果 
  err    error     // 任务的报错
  done    chan struct{} // 通知任务完成
  fn     WorkFunc  
  cancelled atomic.Value  // 任务是否被取消
  cancelling atomic.Value  // 是否正在取消任务
  writing  atomic.Value  // 任务是否正在执行
}

接下来看Pool的结构

type limitedPool struct {
  workers uint      // 并发量 
  work  chan *workUnit // 任务channel
  cancel chan struct{}  // 用于通知结束的channel
  closed bool      // 是否关闭
  m    sync.RWMutex  // 读写锁,主要用来保证 closed值的并发安全
}

初始化groutine池, 以及启动设定好数量的groutine

// 初始化pool,设定并发量
func NewLimited(workers uint) Pool {
  if workers == 0 {
    panic("invalid workers '0'")
  }
  p := &limitedPool{
    workers: workers,
  }
  p.initialize()
  return p
}

func (p *limitedPool) initialize() {
  p.work = make(chan *workUnit, p.workers*2)
  p.cancel = make(chan struct{})
  p.closed = false
  for i := 0; i 

往POOL中添加任务,并检查pool是否关闭

func (p *limitedPool) Queue(fn WorkFunc) WorkUnit {
  w := &workUnit{
    done: make(chan struct{}),
    fn:  fn,
  }

  go func() {
    p.m.RLock()
    if p.closed {
      w.err = &ErrPoolClosed{s: errClosed}
      if w.cancelled.Load() == nil {
        close(w.done)
      }
      p.m.RUnlock()
      return
    }
    // 将工作单元写入workChannel, pool启动后将由上面newWorker函数中读取执行
    p.work <- w
    p.m.RUnlock()
  }()

  return w
}

在go-playground/pool包中, limitedPool的批量并发执行还需要借助batch.go来完成

// batch contains all information for a batch run of WorkUnits
type batch struct {
  pool  Pool     // 上面的limitedPool实现了Pool interface
  m    sync.Mutex  // 互斥锁,用来判断closed
  units  []WorkUnit  // 工作单元的slice, 这个主要用在不设并发限制的场景,这里忽略
  results chan WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取
  done  chan struct{} // 通知batch是否完成
  closed bool
  wg   *sync.WaitGroup
}
// go-playground/pool 中有设置并发量和不设并发量的批量任务,都实现Pool interface,初始化batch批量任务时会将之前创建好的Pool传入newBatch
func newBatch(p Pool) Batch {
  return &batch{
    pool:  p,
    units:  make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times.
    results: make(chan WorkUnit),
    done:  make(chan struct{}),
    wg:   new(sync.WaitGroup),
  }
}

// 往批量任务中添加workFunc任务
func (b *batch) Queue(fn WorkFunc) {

  b.m.Lock()
  if b.closed {
    b.m.Unlock()
    return
  }
  //往上述的limitPool中添加workFunc
  wu := b.pool.Queue(fn)

  b.units = append(b.units, wu) // keeping a reference for cancellation purposes
  b.wg.Add(1)
  b.m.Unlock()
  
  // 执行完后将workUnit写入结果集channel
  go func(b *batch, wu WorkUnit) {
    wu.Wait()
    b.results <- wu
    b.wg.Done()
  }(b, wu)
}

// 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞
func (b *batch) QueueComplete() {
  b.m.Lock()
  b.closed = true
  close(b.done)
  b.m.Unlock()
}

// 获取批量任务结果集
func (b *batch) Results() <-chan WorkUnit {
  go func(b *batch) {
    <-b.done
    b.m.Lock()
    b.wg.Wait()
    b.m.Unlock()
    close(b.results)
  }(b)
  return b.results
}

测试

func SendMail(int int) pool.WorkFunc {
  fn := func(wu pool.WorkUnit) (interface{}, error) {
    // sleep 1s 模拟发邮件过程
    time.Sleep(time.Second * 1)
    // 模拟异常任务需要取消
    if int == 17 {
      wu.Cancel()
    }
    if wu.IsCancelled() {
      return false, nil
    }
    fmt.Println("send to", int)
    return true, nil
  }
  return fn
}

func TestBatchWork(t *testing.T) {
  // 初始化groutine数量为20的pool
  p := pool.NewLimited(20)
  defer p.Close()
  batch := p.Batch()
  // 设置一个批量任务的过期超时时间
  t := time.After(10 * time.Second)
  go func() {
    for i := 0; i <100; i++ {
      batch.Queue(SendMail(i))
    }
    batch.QueueComplete()
  }()
  // 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行
  r := batch.Results()
LOOP:
  for {
    select {
    case <-t:
    // 登台超时通知
      fmt.Println("recived timeout")
      break LOOP
   
    case email, ok := <-r:
    // 读取结果集
      if ok {
        if err := email.Error(); err != nil {
          fmt.Println("err", err.Error())
        }
        fmt.Println(email.Value())
      } else {
        fmt.Println("finish")
        break LOOP
      }
    }
  }
}

  



接近理论值5s, 通知模拟被取消的work也正常取消

go-playground/pool在比起之前简单的协程池的基础上, 对pool, worker的状态有了很好的管理。但是,但是问题来了,在第一个实现的简单groutine池和go-playground/pool中,都是先启动预定好的groutine来完成任务执行,在并发量远小于任务量的情况下确实能够做到groutine的复用,如果任务量不多则会导致任务分配到每个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而导致浪费,而且对于协程池也没有动态的扩容和缩小。所以我又去看了一下ants的设计和实现。

ants

ants是一个受fasthttp启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍,其快速高性能的原因之一就是采用了各种池化技术(这个日后再开新坑去读源码), ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。

先看一下ants的pool 结构体 (pool.go)

type Pool struct {
  // 协程池的容量 (groutine数量的上限)
  capacity int32
  // 正在执行中的groutine
  running int32
  // 过期清理间隔时间
  expiryDuration time.Duration
  // 当前可用空闲的groutine
  workers []*Worker
  // 表示pool是否关闭
  release int32
  // lock for synchronous operation.
  lock sync.Mutex
  // 用于控制pool等待获取可用的groutine
  cond *sync.Cond
  // 确保pool只被关闭一次
  once sync.Once
  // worker临时对象池,在复用worker时减少新对象的创建并加速worker从pool中的获取速度
  workerCache sync.Pool
  // pool引发panic时的执行函数
  PanicHandler func(interface{})
}

接下来看pool的工作单元 worker (worker.go)

type Worker struct {
  // worker 所属的poo;
  pool *Pool
  // 任务队列
  task chan func()
  // 回收时间,即该worker的最后一次结束运行的时间
  recycleTime time.Time
}

执行worker的代码 (worker.go)

func (w *Worker) run() {
  // pool中正在执行的worker数+1
  w.pool.incRunning()
  go func() {
    defer func() {
      if p := recover(); p != nil {
        //若worker因各种问题引发panic, 
        //pool中正在执行的worker数 -1,     
        //如果设置了Pool中的PanicHandler,此时会被调用
        w.pool.decRunning()
        if w.pool.PanicHandler != nil {
          w.pool.PanicHandler(p)
        } else {
          log.Printf("worker exits from a panic: %v", p)
        }
      }
    }()
    
    // worker 执行任务队列
    for f := range w.task {
      //任务队列中的函数全部被执行完后,
      //pool中正在执行的worker数 -1, 
      //将worker 放回对象池
      if f == nil {
        w.pool.decRunning()
        w.pool.workerCache.Put(w)
        return
      }
      f()
      //worker 执行完任务后放回Pool 
      //使得其余正在阻塞的任务可以获取worker
      w.pool.revertWorker(w)
    }
  }()
}

了解了工作单元worker如何执行任务以及与pool交互后,回到pool中查看其实现, pool的核心就是取出可用worker提供给任务执行 (pool.go)

// 向pool提交任务
func (p *Pool) Submit(task func()) error {
  if 1 == atomic.LoadInt32(&p.release) {
    return ErrPoolClosed
  }
  // 获取pool中的可用worker并向其任务队列中写入任务
  p.retrieveWorker().task <- task
  return nil
}


// **核心代码** 获取可用worker
func (p *Pool) retrieveWorker() *Worker {
  var w *Worker

  p.lock.Lock()
  idleWorkers := p.workers
  n := len(idleWorkers) - 1
 // 当前pool中有可用worker, 取出(队尾)worker并执行
  if n >= 0 {
    w = idleWorkers[n]
    idleWorkers[n] = nil
    p.workers = idleWorkers[:n]
    p.lock.Unlock()
  } else if p.Running() 

在批量并发任务的执行过程中, 如果有超过5纳秒(ants中默认worker过期时间为5ns)的worker未被分配新的任务,则将其作为过期worker清理掉,从而保证pool中可用的worker都能发挥出最大的作用以及将任务分配得更均匀
(pool.go)

// 该函数会在pool初始化后在协程中启动
func (p *Pool) periodicallyPurge() {
  // 创建一个5ns定时的心跳
  heartbeat := time.NewTicker(p.expiryDuration)
  defer heartbeat.Stop()

  for range heartbeat.C {
    currentTime := time.Now()
    p.lock.Lock()
    idleWorkers := p.workers
    if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {
      p.lock.Unlock()
      return
    }
    n := -1
    for i, w := range idleWorkers {
      // 因为pool 的worker队列是先进后出的,所以正序遍历可用worker时前面的往往里当前时间越久
      if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
        break
      }  
      // 如果worker最后一次运行时间距现在超过5纳秒,视为过期,worker收到nil, 执行上述worker.go中 if n == nil 的操作
      n = i
      w.task <- nil
      idleWorkers[i] = nil
    }
    if n > -1 {
      // 全部过期
      if n >= len(idleWorkers)-1 {
        p.workers = idleWorkers[:0]
      } else {
      // 部分过期
        p.workers = idleWorkers[n+1:]
      }
    }
    p.lock.Unlock()
  }
}

测试

func TestAnts(t *testing.T) {
  wg := sync.WaitGroup{}
  pool, _ := ants.NewPool(20)
  defer pool.Release()
  for i := 0; i <100; i++ {
    wg.Add(1)
    pool.Submit(sendMail(i, &wg))
  }
  wg.Wait()
}

func sendMail(i int, wg *sync.WaitGroup) func() {
  return func() {
    time.Sleep(time.Second * 1)
    fmt.Println("send mail to ", i)
    wg.Done()
  }
}


这里虽只简单的测试批量并发任务的场景, 如果大家有兴趣可以去看看ants的压力测试, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍, 可谓是协程池中的神器。

借用ants作者的原话来说:

然而又有多少场景是单台机器需要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,可是世界上没有龙啊!也是无情…

Over

一口气从简单到复杂总结了三个协程池的实现,受益匪浅, 感谢各开源库的作者, 虽然世界上没有龙,但是屠龙技是必须练的,因为它就像存款,不一定要全部都用了,但是一定不能没有!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 2018年人工智能大数据的爆发,学Java还是Python?
    本文介绍了2018年人工智能大数据的爆发以及学习Java和Python的相关知识。在人工智能和大数据时代,Java和Python这两门编程语言都很优秀且火爆。选择学习哪门语言要根据个人兴趣爱好来决定。Python是一门拥有简洁语法的高级编程语言,容易上手。其特色之一是强制使用空白符作为语句缩进,使得新手可以快速上手。目前,Python在人工智能领域有着广泛的应用。如果对Java、Python或大数据感兴趣,欢迎加入qq群458345782。 ... [详细]
  • 生成对抗式网络GAN及其衍生CGAN、DCGAN、WGAN、LSGAN、BEGAN介绍
    一、GAN原理介绍学习GAN的第一篇论文当然由是IanGoodfellow于2014年发表的GenerativeAdversarialNetworks(论文下载链接arxiv:[h ... [详细]
  • 如何使用代理服务器进行网页抓取?
    本文介绍了如何使用代理服务器进行网页抓取,并探讨了数据驱动对竞争优势的重要性。通过网页抓取,企业可以快速获取并分析大量与需求相关的数据,从而制定营销战略。同时,网页抓取还可以帮助电子商务公司在竞争对手的网站上下载数百页的有用数据,提高销售增长和毛利率。 ... [详细]
  • 背景应用安全领域,各类攻击长久以来都危害着互联网上的应用,在web应用安全风险中,各类注入、跨站等攻击仍然占据着较前的位置。WAF(Web应用防火墙)正是为防御和阻断这类攻击而存在 ... [详细]
  • 【爬虫】关于企业信用信息公示系统加速乐最新反爬虫机制
    ( ̄▽ ̄)~又得半夜修仙了,作为一个爬虫小白,花了3天时间写好的程序,才跑了一个月目标网站就更新了,是有点悲催,还是要只有一天的时间重构。升级后网站的层次结构并没有太多变化,表面上 ... [详细]
  • 开发笔记:Python之路第一篇:初识Python
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Python之路第一篇:初识Python相关的知识,希望对你有一定的参考价值。Python简介& ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 本文介绍了新款奇骏的两个让人上瘾的功能,分别是智能互联系统和BOSE音响。通过对新款奇骏的配置和功能进行评测,探讨了这两个新增功能的使用体验和优势。此外,还介绍了新款奇骏的其他配置和改进,如增加的座椅和驾驶辅助系统,以及内饰的舒适性提升。对于喜欢音响的消费者来说,BOSE音响的升级也是一个亮点。最后,文章提到了BOSE音响的数字还原能力,以及7座版无法配备BOSE音响的原因。 ... [详细]
  • 本文介绍了adg架构设置在企业数据治理中的应用。随着信息技术的发展,企业IT系统的快速发展使得数据成为企业业务增长的新动力,但同时也带来了数据冗余、数据难发现、效率低下、资源消耗等问题。本文讨论了企业面临的几类尖锐问题,并提出了解决方案,包括确保库表结构与系统测试版本一致、避免数据冗余、快速定位问题等。此外,本文还探讨了adg架构在大版本升级、上云服务和微服务治理方面的应用。通过本文的介绍,读者可以了解到adg架构设置的重要性及其在企业数据治理中的应用。 ... [详细]
  • 本文详细介绍了云服务器API接口的概念和作用,以及如何使用API接口管理云上资源和开发应用程序。通过创建实例API、调整实例配置API、关闭实例API和退还实例API等功能,可以实现云服务器的创建、配置修改和销毁等操作。对于想要学习云服务器API接口的人来说,本文提供了详细的入门指南和使用方法。如果想进一步了解相关知识或阅读更多相关文章,请关注编程笔记行业资讯频道。 ... [详细]
  • 《51CTO编辑部的外传》——剧本篇(下)阴天MMA咏春电动车时隔《51CTO编辑部的外传》——剧本篇(上)的出炉ÿ ... [详细]
  • Python入门后,想要从事自由职业可以做哪方面工作?1.爬虫很多人入门Python的必修课之一就是web开发和爬虫。但是这两项想要赚钱的话 ... [详细]
  • 本人学习笔记,知识点均摘自于网络,用于学习和交流(如未注明出处,请提醒,将及时更正,谢谢)OS:我学习是为了上 ... [详细]
author-avatar
加勒比海盗530
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有