在Go语言中,协程(goroutine)是轻量级的线程,由Go运行时(runtime)管理。虽然Go标准库中没有直接提供协程池(goroutine pool)的实现,但我们可以自己实现一个基本的协程池来复用goroutine,减少创建和销毁goroutine的开销。
自己实现一个Golang的协程池
首先我们应该定义以下几个关键点:
定义协程池结构体:包含用于存储空闲goroutine的channel,以及用于控制goroutine数量的信号量(或计数器)。
初始化协程池:预分配一定数量的goroutine到空闲列表中。
任务提交:将任务发送到协程池,如果空闲goroutine不足,则可以选择阻塞等待或拒绝新任务。
任务执行:空闲goroutine从channel中取出任务并执行。
关闭协程池:等待所有任务完成,并安全关闭所有goroutine。
package pool
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
type GoroPool struct {
size int
jobs chan func()
wg sync.WaitGroup
isReleased bool
releaseChan chan bool
}
func NewGoroPool(size int, queueSize int) *GoroPool {
return &GoroPool{
size: size,
jobs: make(chan func(), queueSize),
isReleased: false,
releaseChan: make(chan bool),
}
}
func (p *GoroPool) start() {
for i := 0; i < p.size; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-p.releaseChan:
return
case job := <-p.jobs:
job()
}
}
}()
}
}
func (p *GoroPool) submit(job func()) {
if job == nil {
fmt.Println("job is nil, cannot submit job")
return
}
if p.isReleased {
fmt.Println("pool is released, cannot submit job")
return
}
select {
case p.jobs <- job:
default:
fmt.Println("job queue is full, cannot submit job")
}
}
func (p *GoroPool) release() {
p.isReleased = true
close(p.releaseChan)
p.wg.Wait()
close(p.jobs)
}
var num int32
func addNum(i int32) {
atomic.AddInt32(&num, i)
time.Sleep(time.Millisecond)
}
func TestMyGoroPool(t *testing.T) {
p := NewGoroPool(100, 10000)
defer p.release()
p.start()
var wg sync.WaitGroup
f := func() {
addNum(1)
wg.Done()
}
// submit jobs 10000 times
runTimes := 10001
for i := 0; i < runTimes; i++ {
wg.Add(1)
p.submit(f)
}
wg.Wait()
fmt.Printf("num = %d \n ", num)
}
ants协程池的使用
ants协程池是Golang中一个广泛使用的开源项目,它提供了丰富的协程池管理功能,允许开发者在并发程序中限制goroutine的数量,复用资源,以达到更高效执行任务的效果。以下是ants协程池的使用方法和一些关键点的介绍:
1. 引入ants库
首先,你需要在你的Golang项目中引入ants库。如果你使用的是Go modules,可以通过在go.mod
文件中添加相应的依赖来实现。ants库在GitHub上的地址为github.com/panjf2000/ants/v2
,请确保你使用的是最新版本。
2. 协程池的创建
ants提供了两种主要的协程池创建方式:
使用
ants.NewPool(size int, options ...Option) (*Pool, error)
创建一个普通的协程池,其中size
表示协程池的大小,options
是可选的配置项。使用
ants.NewPoolWithFunc(size int, fn func(interface{}), options ...Option) (*PoolWithFunc, error)
创建一个带函数的协程池,其中fn
是协程池将执行的函数,该函数接受一个interface{}
类型的参数。
3. 提交任务
对于普通协程池,你可以使用
Submit(task func()) error
方法提交一个无参数的任务。对于带函数的协程池,你可以使用
Invoke(args interface{}) error
方法提交任务,并传入相应的参数。
4. 等待任务完成
如果你的任务需要等待所有协程执行完成,你可以使用Go的sync.WaitGroup
或其他同步机制来等待。在提交任务时,对WaitGroup
的Add(1)
进行调用,在任务执行完成时调用Done()
。
5. 协程池的释放
当协程池不再需要时,应该调用Release()
方法释放协程池资源,以避免内存泄漏。对于带函数的协程池,应调用p.Release()
;对于普通协程池,应调用pool.Release()
。
6. 其他重要功能
动态调整协程池大小:ants协程池支持在运行时动态调整其大小,通过调用
p.Resize(newSize int)
方法可以实现。获取运行中的协程数量:你可以使用
Running()
方法获取当前协程池中正在运行的协程数量。优雅处理panic:ants协程池能够优雅地处理任务中的panic,防止程序崩溃,并通过配置panic处理函数来自定义panic时的行为。
示例代码
以下是一个使用ants协程池的简单示例:
package main
import (
"fmt"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
func main() {
// 创建一个带函数的协程池
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
fmt.Println("Handling", i)
})
defer p.Release()
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
_ = p.Invoke(i)
}
wg.Wait()
// 创建一个普通协程池
pool, _ := ants.NewPool(5)
defer pool.Release()
for i := 0; i < 10; i++ {
_ = pool.Submit(func() {
fmt.Println("Executing task")
time.Sleep(1 * time.Second) // 模拟耗时任务
})
}
// 等待一段时间以确保所有任务完成
time.Sleep(2 * time.Second)
}
在这个示例中,我们创建了一个带函数的协程池和一个普通协程池,分别提交了不同的任务,并使用了sync.WaitGroup
来等待所有任务完成。最后,我们通过调用Release()
方法来释放协程池资源。
评论区