背景
go中常见的实现协程池方案(极简法)
一般第一种比较直观,但是缺点是总值是固定的
第二种由两个值决定,运用了令牌桶的方式,配合*int甚至可以动态改变当前同时运行的协程数量
代码
package main
import (
"fmt"
"sync"
"time"
)
type Worker interface {
work(num int, cf chan mission)
}
type calFunc func(int, int)
type mission struct {
a, b int
//cf calFunc
}
type PoolF struct {
}
type PoolS struct {
}
func main() {
}
// 两种常见的线程池
func (p *PoolF) work(num int, cf chan mission) {
var wg sync.WaitGroup
var closeChan = make(chan struct{})
// 一次性开启多个协程
for i := 0; i < num; i++ {
go func(m chan mission, end <-chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
defer func() {
wg.Done()
}()
for {
select {
case <-end:
return
case item := <-m:
// doSomething
cal(item.a, item.b)
default:
}
}
}(cf, closeChan, &wg)
}
time.Sleep(time.Second * 3)
wg.Wait()
}
func (p *PoolS) work(num int, cf chan mission) {
maxValue := 10
var wg sync.WaitGroup
var once sync.Once
var queue = make(chan struct{}, maxValue)
var closeChan = make(chan struct{})
// 此处 num 本质上是最后运行过的协程总数,同时最多只有maxValue个协程在运行
for i := 0; i < num; i++ { // 也可以不限定循环次数
<-queue
go func(m <-chan mission, c <-chan struct{}) {
wg.Add(1)
defer func() {
wg.Done()
queue <- struct{}{}
}()
for {
select {
case <-c:
once.Do(func() { // 需要将协程通道一起关闭
close(queue)
})
return
case item := <-cf:
cal(item.a, item.b)
// 也可当任务完成后return,交还令牌
default:
}
}
}(cf, closeChan)
}
}
func cal(a, b int) {
fmt.Println(a * b)
}