并发任务的固定模式(定时任务并发)

小编:大闷头 更新时间:2022-05-19

虽然我们可以自己使用 chan 来实现很多并发的框架和模式,但实际应用的时候先查查 sync 包总是没错的,比如 sync/atomic 包对原子操作进行了支持,上一个章节的 *x = *x +1,其实可以直接使用 atomic.AddUint64(&x, 1) 来实现。

标准库 sync.Once 可以实现只执行一次的功能,比如你指向创建一个对象(单件)。下面这个 born 函数永远只会造同一个人:

type person struct{} var instance *person var once sync.Once func born() *person { once.Do(func() { fmt.Println("构建 instance 对象") instance = &person{} }) return instance }

我们启动 10 个 goroutine 来测试一下:

func getInstance() { wg := sync.WaitGroup{} wg.Add(10) for i := 0; i < 10; i++ { go func() { defer wg.Done() instance := born() fmt.Printf("%p\n", instance) // 0x1254790 }() } wg.Wait() }

每次获取到的 instance 的地址都是一样的,说明是同一个对象,它的实现机制如下:

func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 { o.doSlow(f) } } func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }

首先是对通过 mutex 和 atomic.StoreUint32 配合来实现,看到这里 doSlow 函数就奇怪了,你不是有 mutex 吗? 为什么还要来一发 aotmic 的函数操作呢? doSlow 函数名其实已经说明了一切。doSlow 说明这个函数很慢,因为 mutex 是一个成本很高的操作,而整数的原子读写却轻量很多,所以 atomic.LoadUint32 先判断了一下 o.done 是不是为 0,不是的话就滚蛋了,少量的竞争者最后能到达 if o.done == 0 的判断,这个提前的判断提高了性能(请注意 defer 语句执行的顺序,对 mutex 解锁 Unlock 比设置 o.done 为 1 的操作后执行。

sync/atomic 不仅能对基本的数值类型进行原子操作,atomic.Value 还能对 interface{} 数据类型进行操作。

既然系统包这么好用,为什么不把所有的应用模式封装完呢? 有时候系统包并不好用,看下面的 mutex 的例子:

func main() { var mu sync.Mutex go func(){ fmt.Println("NO.1中国人") mu.Lock() }() mu.Unlock() }

程序的意图简洁清晰明了,看起来好像没有什么问题,但是执行大概率是报错:

fatal error: sync: unlock of unlocked mutex

因为 go func 的 goroutine 和 main 的 goroutine 执行顺序并不能保证谁先谁后,mu.Unlock 的时候 mu.Lock 还未执行,所以 Unlock 直接抛出了错误。要修改它也很麻烦,要这样写:

func main() { var mu sync.Mutex mu.Lock go func(){ fmt.Println("NO.1中国人") mu.Unock() }() mu.lock() }

是不是不好理解? 第二个 mu.Lock 的时候会等着 mu.Unlock 先执行,有点绕口。但是如果使用无缓冲的通道来重构代码不但简单而且很可靠:

func main() { exit := make(chan int) go func() { fmt.Println("NO.1中国人") <-exit }() exit <- 1 }

为什么很可靠? 因为 exit 没有缓冲,不管 exit <- 1 先执行到 还是 <-exit 先执行到,都得等待同步交接,咱们谁先到都见面聊。这个例子中 <- exit 和 exit <- 1 的位置可以交换,他们的读写本身并没有意义,chan 的类型也是无意义的,用 string、struct{} 都行,但是 chan 完成了比 mutex 很易懂的代码。把这个例子扩展一下,启动指定数量的任务:

func lockBatch(taskCount int) { waitGroup := make(chan struct{}, taskCount) for i := 0; i < cap(waitGroup); i++ { go func(index int) { fmt.Printf("NO.1中国人%d\n", index) waitGroup <- struct{}{} }(i) } for i := 0; i < cap(waitGroup); i++ { <-waitGroup } }

这不就是 sync.WaitGroup 干的事情吗? 这就是框架,或者说模式。

这个模式可以启动 N 个协程,来做 N 件事情,每个协程和任务的比例是 1:1,不过太多的协程可能不利于管理和回收,能不能抽象出 N 个协程做 M 件事情的模式呢? 为了让 goroutine 可以不断的做任务,可以使用一个有缓冲的通道来保存任务,极端的情况是只有一个 goroutine 来做任务,它不断的从缓冲通道里取出任务执行,知道通道关闭为止,由于不同的 goroutine 可以安全的从通道中读取任务,因此可以随意调节 goroutine 的数目而不用修改代码。定一个 handle 函数表示做任务的工人:

var wg sync.WaitGroup func handle(tasks chan string, index int) { defer wg.Done() for { // 读取关闭的无缓冲通道,返回零值和false // 读取关闭的有缓冲通道,先把缓存数据读完后,再读取返回零值和false task, ok := <-tasks if !ok { // 任务通道关闭了,不会再有新任务了 fmt.Printf("handle %d over\n", index) return } fmt.Printf("干活儿 %s By 工人 %d\n", task, index) } }

tasks 表示任务的需求,如果只需要一个工人,直接 go handle 就行了,多个工人只需一个循环,工人下班的标志依靠 wg 来保证,干完活就能下班,而 tasks 通道用来接收任务。

func createTasks(goroutines, taskCount int) { tasks := make(chan string, taskCount) // 启动 N wg.Add(goroutines) for i := 0; i < goroutines; i++ { go handle(tasks, i) } // 设置 M for k := 0; k <= taskCount; k++ { tasks <- fmt.Sprintf("任务 %d", k) } close(tasks) wg.Wait() }

参数 goroutines 是工人的个数(协程的个数),而 taskCount 是任务的数量。需要注意的时候,在通道 close 关闭后,通道中的数据依然可以读取直到读完后再次读取事 task 是零值(这里是空字符串,因为 chan 的类型是 string),ok 为 false,也就是没有任务了,工人都可以下班了(wg.Done),而 wg.Wait 会等所有工人都下班后,才把工厂的门关上(函数返回)。如此,就完成了 N:M 的任务模型,这里 N 和 M 可以通过调用参数传递数量,而你不用修改内部的代码。


进一步延伸这个例子,就是生产者和消费者,而生产者和消费者的数量可以是任何比例。

func producer(name string, tasks chan<- string) { index := 0 for { tasks <- fmt.Sprintf("%s-task-%d", name, index) time.Sleep(50 * time.Millisecond) index++ } } func consumer(name string, tasks <-chan string) { for task := range tasks { fmt.Printf("%s-%s has been token\n", name, task) time.Sleep(50 * time.Millisecond) } }

producer 生产者是个是循环,源源不断的造东西,而 consumer 一直从 tasks 里取货,模拟一个工厂的代码:

func createFactory(stockSize, producerCount, consumerCount int) { tasks := make(chan string, stockSize) for i := 0; i <= producerCount; i++ { go producer(fmt.Sprintf("p%d", i), tasks) } for i := 0; i <= consumerCount; i++ { go consumer(fmt.Sprintf("c%d", i), tasks) } exit := make(chan os.Signal, 1) // ctrl+c 中断程序 signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM) <-exit }

stockSize 是车间大小,表示了最多能存多少货,producerCount 是生产者个数,consumerCount 是消费者个数,当生产者 pdoducer 足够多的时候,把仓库 stockSize 塞满了就阻塞了没法再生产了,除非消费者 consumer 来拿走一些;反之如果消费者足够多的时候,仓库里没货也只能等着了,除非生产者 producer 造点货出来。

为了达到平衡,如果希望生产者以稳定的效率工作,不多不少,如何控制呢? 非常简单:

func producer2(max int, tasks chan<- string, doSth func() string) { lines := make(chan int, max) for { lines <- 1 tasks <- doSth() <-lines } }

max 表示最大的生产线,每次开一个生产线,就往通道里计数一个,生产完了,再从通道取出一个,因为通道最大是 max,满了就阻塞了,随时都保证了只有 max 条生产线存在。


下面的这个例子实现的功能是,等待一个对象完成自己的工作,和前面不同的是,它把 chan 的状态作为对象的一部分:

type request struct { data []interface{} // 模拟要处理的数据 finish chan struct{} // 完成后的标志 } func newRequest(data ...interface{}) *request { return &request{data, make(chan struct{}, 1)} } func doWork(req *request) { time.Sleep(1 * time.Second) // 模拟耗时 fmt.Println(req.data) // 工作内容就是打印 close(req.finish) // req.finish <- struct{}{} 也行 } func createWork() { req := newRequest(1, "string", true) go doWork(req) for i := 0; i < 100; i++ { time.Sleep(50 * time.Millisecond) fmt.Println("继续做事,我很忙") } <-req.finish }

createWork 函数通过并发提高了处理效率,单独开了一条线去处理 request,同时它可以继续往后做事,但是它保证在函数返回前 req 对象肯定被处理了。关于 chan 和并发,还有很多有用的固定模式,希望这几个例子可以抛砖引玉。


本章节的代码 https://github.com/developdeveloper/go-demo/tree/master/16-concurrent-pattern