侧边栏壁纸
博主头像
ZHD的小窝博主等级

行动起来,活在当下

  • 累计撰写 79 篇文章
  • 累计创建 53 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

Golang的协程池

江南的风
2023-05-10 / 0 评论 / 0 点赞 / 19 阅读 / 6600 字 / 正在检测是否收录...

在Go语言中,协程(goroutine)是轻量级的线程,由Go运行时(runtime)管理。虽然Go标准库中没有直接提供协程池(goroutine pool)的实现,但我们可以自己实现一个基本的协程池来复用goroutine,减少创建和销毁goroutine的开销。

自己实现一个Golang的协程池

首先我们应该定义以下几个关键点:

  1. 定义协程池结构体:包含用于存储空闲goroutine的channel,以及用于控制goroutine数量的信号量(或计数器)。

  2. 初始化协程池:预分配一定数量的goroutine到空闲列表中。

  3. 任务提交:将任务发送到协程池,如果空闲goroutine不足,则可以选择阻塞等待或拒绝新任务。

  4. 任务执行:空闲goroutine从channel中取出任务并执行。

  5. 关闭协程池:等待所有任务完成,并安全关闭所有goroutine。

package pool

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

type GoroPool struct {
	size        int
	jobs        chan func()
	wg          sync.WaitGroup
	isReleased  bool
	releaseChan chan bool
}

func NewGoroPool(size int, queueSize int) *GoroPool {
	return &GoroPool{
		size:        size,
		jobs:        make(chan func(), queueSize),
		isReleased:  false,
		releaseChan: make(chan bool),
	}
}

func (p *GoroPool) start() {
	for i := 0; i < p.size; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-p.releaseChan:
					return
				case job := <-p.jobs:
					job()
				}
			}
		}()
	}
}

func (p *GoroPool) submit(job func()) {
	if job == nil {
		fmt.Println("job is nil, cannot submit job")
		return
	}
	if p.isReleased {
		fmt.Println("pool is released, cannot submit job")
		return
	}
	select {
	case p.jobs <- job:
	default:
		fmt.Println("job queue is full, cannot submit job")
	}
}

func (p *GoroPool) release() {
	p.isReleased = true
	close(p.releaseChan)
	p.wg.Wait()
	close(p.jobs)
}

var num int32

func addNum(i int32) {
	atomic.AddInt32(&num, i)
	time.Sleep(time.Millisecond)
}

func TestMyGoroPool(t *testing.T) {
	p := NewGoroPool(100, 10000)
	defer p.release()
	p.start()
	var wg sync.WaitGroup
	f := func() {
		addNum(1)
		wg.Done()
	}
	// submit jobs 10000 times
	runTimes := 10001
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		p.submit(f)
	}
	wg.Wait()
	fmt.Printf("num = %d \n ", num)
}

ants协程池的使用

ants协程池是Golang中一个广泛使用的开源项目,它提供了丰富的协程池管理功能,允许开发者在并发程序中限制goroutine的数量,复用资源,以达到更高效执行任务的效果。以下是ants协程池的使用方法和一些关键点的介绍:

1. 引入ants库

首先,你需要在你的Golang项目中引入ants库。如果你使用的是Go modules,可以通过在go.mod文件中添加相应的依赖来实现。ants库在GitHub上的地址为github.com/panjf2000/ants/v2,请确保你使用的是最新版本。

2. 协程池的创建

ants提供了两种主要的协程池创建方式:

  • 使用ants.NewPool(size int, options ...Option) (*Pool, error)创建一个普通的协程池,其中size表示协程池的大小,options是可选的配置项。

  • 使用ants.NewPoolWithFunc(size int, fn func(interface{}), options ...Option) (*PoolWithFunc, error)创建一个带函数的协程池,其中fn是协程池将执行的函数,该函数接受一个interface{}类型的参数。

3. 提交任务

  • 对于普通协程池,你可以使用Submit(task func()) error方法提交一个无参数的任务。

  • 对于带函数的协程池,你可以使用Invoke(args interface{}) error方法提交任务,并传入相应的参数。

4. 等待任务完成

如果你的任务需要等待所有协程执行完成,你可以使用Go的sync.WaitGroup或其他同步机制来等待。在提交任务时,对WaitGroupAdd(1)进行调用,在任务执行完成时调用Done()

5. 协程池的释放

当协程池不再需要时,应该调用Release()方法释放协程池资源,以避免内存泄漏。对于带函数的协程池,应调用p.Release();对于普通协程池,应调用pool.Release()

6. 其他重要功能

  • 动态调整协程池大小:ants协程池支持在运行时动态调整其大小,通过调用p.Resize(newSize int)方法可以实现。

  • 获取运行中的协程数量:你可以使用Running()方法获取当前协程池中正在运行的协程数量。

  • 优雅处理panic:ants协程池能够优雅地处理任务中的panic,防止程序崩溃,并通过配置panic处理函数来自定义panic时的行为。

示例代码

以下是一个使用ants协程池的简单示例:

package main  
  
import (  
	"fmt"  
	"sync"  
	"time"  
	"github.com/panjf2000/ants/v2"  
)  
  
func main() {  
	// 创建一个带函数的协程池  
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {  
		fmt.Println("Handling", i)  
	})  
	defer p.Release()  
  
	var wg sync.WaitGroup  
	for i := 0; i < 20; i++ {  
		wg.Add(1)  
		_ = p.Invoke(i)  
	}  
	wg.Wait()  
  
	// 创建一个普通协程池  
	pool, _ := ants.NewPool(5)  
	defer pool.Release()  
  
	for i := 0; i < 10; i++ {  
		_ = pool.Submit(func() {  
			fmt.Println("Executing task")  
			time.Sleep(1 * time.Second) // 模拟耗时任务  
		})  
	}  
  
	// 等待一段时间以确保所有任务完成  
	time.Sleep(2 * time.Second)  
}

在这个示例中,我们创建了一个带函数的协程池和一个普通协程池,分别提交了不同的任务,并使用了sync.WaitGroup来等待所有任务完成。最后,我们通过调用Release()方法来释放协程池资源。

0

评论区