我们在 Go 第四部分 Go 并发系列的 sync.Pool 和 channel 提到了很多用于 Go 协程池的第三方库,今天我们就来详细介绍它们的使用和实现。
1. worker 池
我们的第一个示例来自Marcio Castilho 在 使用 Go 每分钟处理百万请求 这篇文章中,就介绍了他们应对大并发请求的设计。这不是一个库,但是 worker pool 的实现很具有代表性:
- 他们将用户的请求放在一个 chan Job 中,这个 chan Job 就相当于一个待处理任务队列
- 除此之外,还有一个 chan chan Job 队列,用来存放可以处理任务的 worker 的缓存队列
- 每个 goroutine 都监听 chan chan Job 中的一个 chan Job
下面核心代码实现。
1.1 核心对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
// 1. 定义Worker 池和消息队列的长度
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// 2. 定义任务队列
type Job struct {
Payload Payload
}
var JobQueue chan Job
// 3. 定义 Worker
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job // 接收任务
quit chan bool // Worker 退出
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
// 这个每个 worker 监听的任务队列
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// 4. 创建 Worker Pool
type Dispatcher struct {
// 这是 worker 池,也是任务传递的中介
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
|
1.2 任务启动,创建 worker pool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
// 4.1 将任务从 JobQueue 放入到 Worker Pool 中某一个 Workder 的 JobChannel 中,来调用 Worker
func (d *Dispatcher) dispatch() {
for {
select {
// JobQueue 就是一个待处理任务队列
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
// 3.1 启动任务
func (w Worker) Start() {
go func() {
for {
// 重要: 把当前 worker 注册到 worker 池中
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// 3.2 停止任务
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
|
1.3 使用 worker pool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
// 5. 使用 Worker Pool
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
}
|
1.4 个人理解
这个 worker pool 第一看优点迷惑,有很多 channel,但是如果厘清里面的实现逻辑就感觉非常清晰,重点理解 Worker 这个 struct:
- 首先每个 worker 有各自监听的任务队列,而且一个 worker 就是一个 goroutine
- 然后他包含的 WorkerPool,就是 worker 挂载的 worker pool,worker 在启动时,需要把自己注册到 worker pool 中:
w.WorkerPool <- w.JobChannel
- Job 的流转过程是:
- 调用方把 Job 发送到全局的 JobQueue 中
- dispatch 从 JobQueue 拿到一个 Job,然后从 worker pool 拿到一个 worker 的 chan Job,即一个 goroutine
- 把 job 发送到worker 的 chan Job 中,让 worker 开始执行
这个实现中我有点不理解的地方是,为什么 Worker 的方法传递的是 Worker 的值,而不是指针。因为 Worker 的成员都是 chan,chan 在内部实现上就是指向 hchan 的指针,所以不会有内部状态复制的问题。