
Concurrency Control


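"Limiting concurrency" can mean two different things. In some scenarios it means capping the maximum number of goroutines; in others it means dynamically adjusting the number of consumers of a queue. The former can be done with errgroup.Group (via SetLimit) or with sync.WaitGroup plus a buffered channel used as a semaphore; the latter is the subject of this article.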
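For the first case, here is a minimal sketch of capping the number of running goroutines using only the standard library: a sync.WaitGroup to wait for completion and a buffered channel acting as a counting semaphore. The helper name runLimited and the peak counter are illustrative assumptions, not part of the original article.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs every job in its own goroutine but lets at most limit of
// them execute at the same time. It returns the highest concurrency observed.
// (Hypothetical helper, for illustration only.)
func runLimited(jobs []func(), limit int) int32 {
	var (
		wg      sync.WaitGroup
		running int32 // goroutines currently inside a job
		peak    int32 // highest value running has reached
	)
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore

	for _, job := range jobs {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit jobs are already running
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job is done

			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if we just set one
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt32(&running, -1)
		}(job)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	jobs := make([]func(), 20)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runLimited(jobs, 4))
}
```

The limit here is fixed for the lifetime of the call, which is exactly what separates this case from the dynamic one discussed in the rest of the article.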

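In production we often need to process jobs concurrently, and the level of concurrency has to be controlled dynamically:

  1. Concurrency must not exceed a maximum: too much concurrency tends to overuse system resources.
  2. Concurrency must not fall below a minimum: too little concurrency slows down job processing.
  3. Concurrency must adapt to load: when there are many jobs it should grow to speed up processing; when there are few it should shrink to release resources.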
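The three requirements above can be summed up as "follow the load, but stay within bounds". As a rough sketch, under the assumed policy of one worker per pending job (the function name targetWorkers and this policy are illustrative, not the article's):

```go
package main

import "fmt"

// targetWorkers returns how many workers to run for a given backlog.
// Assumed policy, for illustration only: one worker per pending job,
// clamped to the range [minWorker, maxWorker].
func targetWorkers(pending, minWorker, maxWorker int) int {
	if pending < minWorker {
		return minWorker // requirement 2: never below the minimum
	}
	if pending > maxWorker {
		return maxWorker // requirement 1: never above the maximum
	}
	return pending // requirement 3: follow the load in between
}

func main() {
	for _, pending := range []int{0, 5, 50} {
		fmt.Println(pending, "pending ->", targetWorkers(pending, 2, 10), "workers")
	}
}
```

The implementations below do not compute a target directly; they add or remove one worker at a time and rely on the same two bounds.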

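To meet these requirements, we abstract two components:

  • Worker: processes jobs
  • WorkerManager: controls the level of concurrency

In Go, there are a couple of ways to implement this.

Method 1: Implementing Worker with a channel

If each Worker is a channel, the level of concurrency can be controlled by creating new channels and closing existing ones.

Implementing Worker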

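package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// workerCap is the buffer size of each worker's job queue.
const workerCap = 100

// Worker is a buffered channel of jobs, drained by one goroutine running Work.
type Worker chan any

// Work processes jobs until the channel is closed and empty.
func (w Worker) Work() {
	for job := range w {
		doJob(job)
	}
}

// Receive queues a job; it blocks while the queue is full.
func (w Worker) Receive(job any) {
	w <- job
}

// Close stops the worker once the jobs already queued have been processed.
func (w Worker) Close() {
	close(w)
}

func NewWorker() Worker {
	return make(chan any, workerCap)
}

func doJob(job any) {
	fmt.Println("handling a job")
	time.Sleep(time.Millisecond * 100) // simulate the time a job takes
}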

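Implementing WorkerManager

The WorkerManager controls the Workers and is configured with a maximum and a minimum number of concurrent workers:

type WorkerManger struct {
	maxWorker int // upper bound on the number of workers
	minWorker int // lower bound on the number of workers

	Workers []Worker     // workers currently running
	lock    sync.RWMutex // guards Workers
}

// NewWorkerManager starts minWorker workers plus a background goroutine
// that keeps adjusting the worker count. Note the argument order: max first.
func NewWorkerManager(maxWorker, minWorker int) *WorkerManger {
	manager := &WorkerManger{
		maxWorker: maxWorker,
		minWorker: minWorker,
	}
	for i := 0; i < minWorker; i++ {
		manager.addWorker()
	}
	go manager.scale()
	return manager
}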

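While the service is running, the level of concurrency is controlled by changing the number of workers:

type status int

const (
	idle status = iota
	normal
	busy
)

// scale polls the load every 10ms and adds or removes one worker at a time.
// It runs for the lifetime of the process.
func (wm *WorkerManger) scale() {
	for {
		switch wm.status() {
		case busy:
			wm.addWorker()
		case idle:
			wm.remWorker()
		case normal:
			// load matches the current worker count: nothing to do
		}
		time.Sleep(time.Millisecond * 10)
	}
}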

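Checking the current load (we assume the manager is busy when the number of queued jobs is more than twice the number of workers, and idle when it is less than half the number of workers):

func (wm *WorkerManger) status() status {
	wm.lock.RLock()
	defer wm.lock.RUnlock()

	// Count the jobs waiting in all queues; a job that a worker is
	// currently processing has already left its queue and is not counted.
	jobCnt := 0
	for _, worker := range wm.Workers {
		jobCnt += len(worker)
	}
	if jobCnt > len(wm.Workers)*2 {
		return busy
	}
	if jobCnt < len(wm.Workers)/2 { // integer division
		return idle
	}
	return normal
}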
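To make the thresholds concrete, here is the same rule as a pure function over plain numbers, separated from the manager so it can be tried in isolation. The function name classify and the String method are illustrative assumptions; the two comparisons are the ones used in status above.

```go
package main

import "fmt"

type status int

const (
	idle status = iota
	normal
	busy
)

// String makes the status printable (added for this example only).
func (s status) String() string {
	return [...]string{"idle", "normal", "busy"}[s]
}

// classify applies the article's thresholds: busy when the queued jobs exceed
// twice the worker count, idle when they are below half of it.
func classify(jobCnt, workers int) status {
	if jobCnt > workers*2 {
		return busy
	}
	if jobCnt < workers/2 { // integer division: 3/2 == 1, 1/2 == 0
		return idle
	}
	return normal
}

func main() {
	cases := [][2]int{{9, 4}, {8, 4}, {1, 4}, {0, 3}, {0, 1}}
	for _, c := range cases {
		fmt.Printf("jobs=%d workers=%d -> %v\n", c[0], c[1], classify(c[0], c[1]))
	}
}
```

Because of the integer division, a pool with a single worker can never be classified as idle (0 is not less than 1/2 == 0), and a pool of three workers is idle only when every queue is empty.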

When the load is busy, a worker is added:

func (wm *WorkerManger) addWorker() {
	fmt.Println("adding worker")
	// Hold the write lock across both the check and the append. Checking
	// under a read lock and re-locking afterwards would let another
	// goroutine add a worker in between and exceed maxWorker.
	wm.lock.Lock()
	defer wm.lock.Unlock()

	if len(wm.Workers) >= wm.maxWorker {
		fmt.Println("worker num arrives max")
		return
	}

	worker := NewWorker()
	go worker.Work()
	wm.Workers = append(wm.Workers, worker)
}

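When the load is idle, a worker is removed:

func (wm *WorkerManger) remWorker() {
	fmt.Println("removing worker")
	// Hold the write lock across both the check and the removal, so the
	// worker count cannot drop below minWorker between the two steps.
	wm.lock.Lock()
	defer wm.lock.Unlock()

	if len(wm.Workers) <= wm.minWorker {
		fmt.Println("worker num arrives min")
		return
	}

	// Remove the first worker whose queue is empty.
	for i, worker := range wm.Workers {
		if len(worker) == 0 {
			fmt.Println("removing ", i, "st worker")
			// Senders must hold the read lock while sending, otherwise
			// a send could hit this closed channel and panic.
			worker.Close()
			wm.Workers = append(wm.Workers[:i], wm.Workers[i+1:]...)
			return
		}
	}
}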

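When a job arrives, a worker is picked at random to handle it:

func (wm *WorkerManger) Do(job any) {
	// Hold the read lock until the job has been queued, so that remWorker
	// (which needs the write lock) cannot close the chosen worker between
	// picking it and sending to it. If the chosen queue is full, this
	// blocks scaling until the worker frees a slot.
	wm.lock.RLock()
	defer wm.lock.RUnlock()
	worker := wm.getWorker()
	worker.Receive(job)
}

// getWorker picks a worker at random. The caller must hold wm.lock.
// It panics if there are no workers, which happens when minWorker is 0.
func (wm *WorkerManger) getWorker() Worker {
	idx := rand.Intn(len(wm.Workers))
	return wm.Workers[idx]
}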

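That is all the code. Let's run it:

func main() {
	manager := NewWorkerManager(10, 2) // at most 10 workers, at least 2
	for i := 0; i < 15; i++ {
		manager.Do(i)
	}

	// Give the workers time to finish and the manager time to scale down.
	time.Sleep(time.Second * 5)
}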

Output:

# The manager is created with two workers
adding worker
adding worker
# Right after creation there are no jobs to process
removing worker
worker num arrives min
# Processing jobs; the output is interleaved
handling a job
handling a job
adding worker
adding worker
adding worker
adding worker
adding worker
handling a job
handling a job
handling a job
handling a job
handling a job
handling a job
handling a job
handling a job
handling a job
handling a job
handling a job
# All jobs are done; the manager is idle and removes workers
removing worker
removing  0 st worker
removing worker
removing  1 st worker
handling a job
removing worker
removing  1 st worker
removing worker
removing  1 st worker
handling a job
removing worker
removing  0 st worker
removing worker
# The minimum number of workers has been reached
worker num arrives min

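Method 2: Implementing Worker with goroutines

We can let each Worker report its own status: busy or idle. If some worker is idle, a worker should be released; if no worker is idle, the current load is keeping every worker busy and a new worker should be added.

Implementing Worker

Abstracting Worker as an interface lets the worker manager become a general-purpose component.

type Worker interface {
	// GetDataChannel returns the channel the worker receives data from.
	GetDataChannel() <-chan interface{}
	// HandleData processes one item.
	HandleData(interface{})
	// CloseChannel stops the data source. The manager drains the data
	// channel afterwards, so the channel must end up closed.
	CloseChannel()
}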
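The article does not show an implementation of this interface. The following is a minimal sketch of one, assuming several workers read from a single shared channel; the names source and sumWorker are hypothetical. sync.Once makes CloseChannel safe to call from more than one worker.

```go
package main

import (
	"fmt"
	"sync"
)

// Worker repeats the interface from the article so the example compiles alone.
type Worker interface {
	GetDataChannel() <-chan interface{}
	HandleData(interface{})
	CloseChannel()
}

// source owns the channel that every worker reads from (hypothetical type).
type source struct {
	ch   chan interface{}
	once sync.Once
}

// close closes the channel exactly once, however often it is called.
func (s *source) close() { s.once.Do(func() { close(s.ch) }) }

// sumWorker is a hypothetical Worker that adds up the ints it receives.
type sumWorker struct {
	src *source
	sum int
}

func (w *sumWorker) GetDataChannel() <-chan interface{} { return w.src.ch }

func (w *sumWorker) HandleData(d interface{}) {
	if n, ok := d.(int); ok {
		w.sum += n
	}
}

func (w *sumWorker) CloseChannel() { w.src.close() }

// Compile-time check that sumWorker satisfies the interface.
var _ Worker = (*sumWorker)(nil)

func main() {
	src := &source{ch: make(chan interface{}, 8)}
	w := &sumWorker{src: src}
	for i := 1; i <= 3; i++ {
		src.ch <- i
	}
	w.CloseChannel()
	w.CloseChannel() // second call is a no-op thanks to sync.Once
	for d := range w.GetDataChannel() {
		w.HandleData(d) // drain what was still buffered
	}
	fmt.Println("sum:", w.sum)
}
```

One design point to settle before using a shared channel: the manager calls CloseChannel not only on shutdown but also when it releases a single idle worker, so an implementation like this one would end the stream for every worker at that moment. A real implementation may prefer CloseChannel to detach only the calling worker.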

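Implementing WorkerManager

// status is what a worker reports to the manager. The original listing uses
// status and idleStatus without defining them; this definition is an assumption.
type status int

const idleStatus status = iota

type WorkerManager struct {
	maxWorker        int           // maximum number of concurrent workers
	minWorker        int           // minimum number of concurrent workers
	currentWorkerNum *int32        // current number of workers, accessed atomically
	reportStatus     chan status   // workers report their idle status here
	closeChan        chan struct{} // closed to tell the manager and workers to stop
	newWorkerFnc     func() Worker // factory that creates a new Worker
}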

func NewWorkerManager(maxWorker, minWorker int, newWorkerFnc func() Worker) *WorkerManager {
	// Normalize minWorker first so the range check sees the effective value.
	if minWorker <= 0 {
		minWorker = 1
	}
	if maxWorker < minWorker {
		panic("maxWorker can't be less than minWorker")
	}
	zeroNum := int32(0)
	s := &WorkerManager{
		maxWorker:        maxWorker,
		minWorker:        minWorker,
		reportStatus:     make(chan status, maxWorker),
		closeChan:        make(chan struct{}),
		newWorkerFnc:     newWorkerFnc,
		currentWorkerNum: &zeroNum,
	}

	// Start the minimum number of workers, then the management loop.
	for i := 0; i < minWorker; i++ {
		go s.newWorker()
	}
	go s.Manage()
	return s
}

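Managing the level of concurrency dynamically:

// Manage is the management loop.
// Scale-up: if no worker reports idle for 5 seconds, every worker is assumed
// to be busy, so a new worker is started (up to maxWorker).
// Scale-down happens in newWorker: a worker that times out 3 times in a row
// exits as long as more than minWorker workers are running.
func (s *WorkerManager) Manage() {
	const window = 5 * time.Second
	timer := time.NewTimer(window)
	defer timer.Stop()
	for {
		select {
		case <-s.closeChan:
			return
		case <-s.reportStatus:
			// At least one worker is idle, so there is no need to grow.
			// Stop the timer and drain a tick that may already have fired,
			// so the Reset below starts clean (required before Go 1.23).
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
		case <-timer.C: // no idle report within the window: every worker is busy
			if atomic.LoadInt32(s.currentWorkerNum) < int32(s.maxWorker) {
				go s.newWorker()
			}
		}
		fmt.Println("current num", atomic.LoadInt32(s.currentWorkerNum))
		timer.Reset(window)
	}
}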

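Creating a new worker: a worker that has received no job for a while reports itself as idle.

func (s *WorkerManager) newWorker() {
	fmt.Println("new worker")
	consumer := s.newWorkerFnc()
	timeoutTimer := time.NewTimer(time.Second)
	defer timeoutTimer.Stop()
	timeoutNum := 0 // consecutive idle timeouts
	atomic.AddInt32(s.currentWorkerNum, 1)
	defer func() {
		// Recover so a panic in HandleData or CloseChannel does not
		// crash the whole process.
		if r := recover(); r != nil {
			fmt.Println("recover", r)
		}
		atomic.AddInt32(s.currentWorkerNum, -1)
		fmt.Println("release worker")
	}()
	needClose := false
	for {
		select {
		case data, ok := <-consumer.GetDataChannel():
			if !ok { // consumer channel closed
				return
			}
			consumer.HandleData(data)
			timeoutNum = 0
			// The timer may have fired while the data was being handled.
			// Drain that stale tick so it is not counted as idle time
			// (required before Go 1.23).
			if !timeoutTimer.Stop() {
				select {
				case <-timeoutTimer.C:
				default:
				}
			}
		case <-timeoutTimer.C:
			timeoutNum++
			// Report idle, but do not block forever if the manager has stopped.
			select {
			case s.reportStatus <- idleStatus:
			case <-s.closeChan:
				needClose = true
			}
		case <-s.closeChan:
			needClose = true
		}
		// Release this worker after 3 consecutive timeouts, unless that
		// would take the pool below minWorker.
		if timeoutNum >= 3 && atomic.LoadInt32(s.currentWorkerNum) > int32(s.minWorker) {
			needClose = true
		}
		if needClose {
			// Close the consumer, then drain what is still buffered so no
			// data is lost. Note that CloseChannel is also called when a
			// single idle worker is released, so its implementation decides
			// whether that closes a channel shared with other workers.
			consumer.CloseChannel()
			for data := range consumer.GetDataChannel() {
				consumer.HandleData(data)
			}
			return
		}
		timeoutTimer.Reset(time.Second)
	}
}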

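Pros and cons

  1. Method 1 has to take a lock frequently, while method 2 needs no locks.
  2. Method 1 needs many channels (one per worker), while in method 2 multiple workers share one channel.
  3. Method 1 balances load only probabilistically, by handing each job to a random worker, whereas in method 2 all workers pull from a shared channel. When job processing times are uneven, method 1 can therefore leave jobs piled up behind one worker; method 2 cannot.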