AutoScalingWorker

介绍

传统基于channel的生产者与消费者模式,采用固定消费者个数形式,在面对生产数量猛增情况时,不能做到动态伸缩,容易致使队列积压。基于此 autoscalingworker 采用检测队列积压情况动态创建与销毁worker,来保证消费与避免性能的浪费。

探索

首先我们需要定义我们Worker的上下限,基于多少队列积压情况进行扩容与缩容。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
type AutoScalingWorker struct {
MinWorker int
MaxWorker int
QueueDepth int
CurrentWorker int
Process Process
Queue chan interface{}
Interval time.Duration

stop chan int
workerStop chan int
lock sync.RWMutex
}

有了这些后,我们要做的就是循环检查我们的队列进行扩容与缩容了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (auto *AutoScalingWorker) Start() {
auto.workerStop = make(chan int)
auto.stop = make(chan int)

t := time.NewTicker(auto.Interval)
for {
select {
case <-t.C:
if len(auto.Queue) > auto.QueueDepth {
auto.Expansion()
} else {
auto.Shrinkage()
}
case <-auto.stop:
return
}
}
}

在golang中对worker进行扩容很简单,只需使用关键子go就可以了,但是对以运行的goroutines 并没有kill办法,如果让其退出,只能等待起运行结束。此时,我们就需要借用golang中的另一神器channel了,将我们的close信号传递进goroutines中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (auto *AutoScalingWorker) job() {
auto.lock.Lock()
auto.CurrentWorker = auto.CurrentWorker + 1
auto.lock.Unlock()
for {
select {
case <-auto.workerStop:
goto close

case data, ok := <-auto.Queue:
if !ok {
goto close
}
auto.Process(data)
}
}

close:
auto.lock.Lock()
auto.CurrentWorker = auto.CurrentWorker - 1
auto.lock.Unlock()
}

这样大体结构就完成了,我们使用起来就像这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   c := make(chan interface{}, 1000)

auto := worker.AutoScalingWorker{
MinWorker: 0,
MaxWorker: 10,
QueueDepth: 8,
Interval: time.Second,
Process: func(i interface{}) {
time.Sleep(time.Millisecond * 100)
},
Queue: c,
}

go auto.Start()

输出结果如下,auto worker 表示当前运行的worker,QueueDepth 表示队列中积压的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
auto worker: 0  QueueDepth: 60
auto worker: 2 QueueDepth: 79
auto worker: 3 QueueDepth: 89
auto worker: 4 QueueDepth: 89
auto worker: 5 QueueDepth: 49
auto worker: 5 QueueDepth: 0
auto worker: 3 QueueDepth: 0
auto worker: 2 QueueDepth: 0
auto worker: 1 QueueDepth: 0
auto worker: 0 QueueDepth: 150
auto worker: 1 QueueDepth: 149
auto worker: 2 QueueDepth: 138
auto worker: 3 QueueDepth: 118
auto worker: 4 QueueDepth: 88
auto worker: 5 QueueDepth: 48
auto worker: 4 QueueDepth: 0
auto worker: 3 QueueDepth: 0
auto worker: 2 QueueDepth: 0
auto worker: 1 QueueDepth: 0
auto worker: 0 QueueDepth: 0

完成代码示例请查看这里