gopool-头条使用的协程池
原生goroutine使用很方便
1
2
3
|
go func() {
// do something
}
|
但是原生goroutine存在一些问题,包括无法限制使用的goroutine总数。
单个goroutine使用的内存虽然很少2KB,但是大量的goroutine并发可能会导致OOM,所以协程池的使用还是很有必要。
分析下头条开源出来使用的gopool,这里面为了减少gc,使用了很多sync.Pool。同时atomic原子操作的使用也值得借鉴。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func BenchmarkPool(b *testing.B) {
config := NewConfig()
config.ScaleThreshold = 1
p := NewPool("benchmark", int32(runtime.GOMAXPROCS(0)), config)
var wg sync.WaitGroup
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(benchmarkTimes)
for j := 0; j < benchmarkTimes; j++ {
p.Go(func() {
testFunc()
wg.Done()
})
}
wg.Wait()
}
}
|
多使用atomic和sync.Pool
![gopool](https://tva4.sinaimg.cn/large/8e50e345gy1gskxiuntwyj20g50f4wfb.jpg)
对外interface
1
2
3
4
5
6
7
8
9
10
11
12
|
type Pool interface {
// Name returns the corresponding pool name.
Name() string
// SetCap sets the goroutine capacity of the pool.
SetCap(cap int32)
// Go executes f.
Go(f func())
// CtxGo executes f and accepts the context.
CtxGo(ctx context.Context, f func())
// SetPanicHandler sets the panic handler.
SetPanicHandler(f func(context.Context, interface{}))
}
|
taskPool的结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
type pool struct {
// The name of the pool
name string
// capacity of the pool, the maximum number of goroutines that are actually working
cap int32
// Configuration information
config *Config
// linked list of tasks
taskHead *task
taskTail *task
taskLock sync.Mutex
taskCount int32
// Record the number of running workers
workerCount int32
// This method will be called when the worker panic
panicHandler func(context.Context, interface{})
}
type worker struct {
pool *pool
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func (p *pool) CtxGo(ctx context.Context, f func()) {
t := taskPool.Get().(*task)
t.ctx = ctx
t.f = f
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t
}
p.taskLock.Unlock()
atomic.AddInt32(&p.taskCount, 1)
// The following two conditions are met:
// 1. the number of tasks is greater than the threshold.
// 2. The current number of workers is less than the upper limit p.cap.
// or there are currently no workers.
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
p.incWorkerCount()
w := workerPool.Get().(*worker)
w.pool = p
w.run()
}
}
|
github.com/bytedance/gopkg/util/gopool
atomic
乐观锁 悲观锁
CAS compare and swap
https://blog.betacat.io/post/golang-atomic-value-exploration/
绘图
draw.io