介绍
传统基于channel的生产者与消费者模式,采用固定消费者个数形式,在面对生产数量猛增情况时,不能做到动态伸缩,容易致使队列积压。基于此 autoscalingworker 采用检测队列积压情况动态创建与销毁worker,来保证消费与避免性能的浪费。
探索
首先我们需要定义我们Worker的上下限,基于多少队列积压情况进行扩容与缩容。代码如下:
1 | type AutoScalingWorker struct { |
有了这些后,我们要做的就是循环检查我们的队列进行扩容与缩容了。
1 | func (auto *AutoScalingWorker) Start() { |
在golang中对worker进行扩容很简单,只需使用关键子go
就可以了,但是对以运行的goroutines
并没有kill办法,如果让其退出,只能等待起运行结束。此时,我们就需要借用golang中的另一神器channel
了,将我们的close信号传递进goroutines中。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22func (auto *AutoScalingWorker) job() {
auto.lock.Lock()
auto.CurrentWorker = auto.CurrentWorker + 1
auto.lock.Unlock()
for {
select {
case <-auto.workerStop:
goto close
case data, ok := <-auto.Queue:
if !ok {
goto close
}
auto.Process(data)
}
}
close:
auto.lock.Lock()
auto.CurrentWorker = auto.CurrentWorker - 1
auto.lock.Unlock()
}
这样大体结构就完成了,我们使用起来就像这样1
2
3
4
5
6
7
8
9
10
11
12
13
14 c := make(chan interface{}, 1000)
auto := worker.AutoScalingWorker{
MinWorker: 0,
MaxWorker: 10,
QueueDepth: 8,
Interval: time.Second,
Process: func(i interface{}) {
time.Sleep(time.Millisecond * 100)
},
Queue: c,
}
go auto.Start()
输出结果如下,auto worker 表示当前运行的worker,QueueDepth 表示队列中积压的消息1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20auto worker: 0 QueueDepth: 60
auto worker: 2 QueueDepth: 79
auto worker: 3 QueueDepth: 89
auto worker: 4 QueueDepth: 89
auto worker: 5 QueueDepth: 49
auto worker: 5 QueueDepth: 0
auto worker: 3 QueueDepth: 0
auto worker: 2 QueueDepth: 0
auto worker: 1 QueueDepth: 0
auto worker: 0 QueueDepth: 150
auto worker: 1 QueueDepth: 149
auto worker: 2 QueueDepth: 138
auto worker: 3 QueueDepth: 118
auto worker: 4 QueueDepth: 88
auto worker: 5 QueueDepth: 48
auto worker: 4 QueueDepth: 0
auto worker: 3 QueueDepth: 0
auto worker: 2 QueueDepth: 0
auto worker: 1 QueueDepth: 0
auto worker: 0 QueueDepth: 0
完成代码示例请查看这里