目录

Channel 应用

1. 使用反射操作 Channel

在学习如何使用 Channel 之前,我们来看看如何通过反射的方式执行 select 语句,这在处理很多的 case clause,尤其是不定长的 case clause 的时候,非常有用。

为了便于操作 Select,reflect 提供了如下几个函数:

  1. func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool):
    • 参数: SelectCase 表示 Select 语句的一个分支
    • 返回值:
      • chosen: select 是伪随机的,它在执行的 case 中随机选择一个 case,并把选择的这个 case 的索引(chosen)返回
      • recv: 如果 select 选中的 recv case,recvValue 表示接收的元素
      • recvOK: 表示是否有 case 成功被选择,false 表示没有可用的 case 返回
  2. SelectCase: struct 表示一个 select case 分支
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
const (
    SelectSend    // case Chan <- Send
    SelectRecv    // case <-Chan:
    SelectDefault // default
)


type SelectCase struct {
    Dir  SelectDir // case的方向
    Chan Value     // 使用的通道(收/发)
    Send Value     // 用于发送的值
}

type SelectDir int

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

下面是动态创建 Select 的一个示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

func main() {
    var ch1 = make(chan int, 10)
    var ch2 = make(chan int, 10)

    // 创建SelectCase
    var cases = createCases(ch1, ch2)

    // 执行10次select
    for i := 0; i < 10; i++ {
        chosen, recv, ok := reflect.Select(cases)
        if recv.IsValid() { // recv case
            fmt.Println("recv:", cases[chosen].Dir, recv, ok)
        } else { // send case
            fmt.Println("send:", cases[chosen].Dir, ok)
        }
    }
}

func createCases(chs ...chan int) []reflect.SelectCase {
    var cases []reflect.SelectCase


    // 创建recv case
    for _, ch := range chs {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        })
    }

    // 创建send case
    for i, ch := range chs {
        v := reflect.ValueOf(i)
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectSend,
            Chan: reflect.ValueOf(ch),
            Send: v,
        })
    }

    return cases
}

上一节我们说了 Channel 的五种使用场景:

  1. 数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)
  2. 数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
  3. 信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine
  4. 任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能
  5. 锁:利用 Channel 也可以实现互斥锁的机制

接下来我们一一举例说明。

2.消息交流

从 chan 的内部实现看,它是以一个循环队列的方式存放数据,所以,它有时候也会被当成线程安全的队列和 buffer 使用。我们来看几个例子。

2.1 worker 池

Marcio Castilho 在 使用 Go 每分钟处理百万请求 这篇文章中,就介绍了他们应对大并发请求的设计。他们将用户的请求放在一个 chan Job 中,这个 chan Job 就相当于一个待处理任务队列。除此之外,还有一个 chan chan Job 队列,用来存放可以处理任务的 worker 的缓存队列。具体的实现参见 Go Work Pool

3. 数据传递

下面是一个数据传递(任务编排)的例子,让四个 goroutine 顺序打印 1,2,3,4

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

type Token struct{}

func newWorker(id int, ch chan Token, nextCh chan Token) {
    for {
        token := <-ch         // 取得令牌
        fmt.Println((id + 1)) // id从1开始
        time.Sleep(time.Second)
        nextCh <- token
    }
}
func main() {
    chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}

    // 创建4个worker
    for i := 0; i < 4; i++ {
        go newWorker(i, chs[i], chs[(i+1)%4])
    }

    //首先把令牌交给第一个worker
    chs[0] <- struct{}{}
  
    select {}
}

这类场景有一个特点,就是当前持有数据的 goroutine 都有一个信箱,信箱使用 chan 实现,goroutine 只需要关注自己的信箱中的数据,处理完毕后,就把结果发送到下一家的信箱中。

4. 信号通知

chan 类型有这样一个特点:chan 如果为空,那么,receiver 接收数据的时候就会阻塞等待,直到 chan 被关闭或者有新的数据到来。利用这个机制,我们可以实现 wait/notify 的设计模式。

除了正常的业务处理时的 wait/notify,我们经常碰到的一个场景,就是程序关闭的时候,我们需要在退出之前做一些清理(doCleanup 方法)的动作。这个时候,我们经常要使用 chan。

比如,使用 chan 实现程序的 graceful shutdown,在退出之前执行一些连接关闭、文件 close、缓存落盘等一些动作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func main() {
  go func() {
      ...... // 执行业务处理
    }()

  // 处理CTRL+C等中断信号
  termChan := make(chan os.Signal)
  signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
  <-termChan 

  // 执行退出之前的清理动作
    doCleanup()
  
  fmt.Println("优雅退出")
}

有时候,doCleanup 可能是一个很耗时的操作,我们需要设置一个最长的等待时间。只要超过了这个时间,程序就不再等待,可以直接退出。所以,退出的时候分为两个阶段:

  1. closing,代表程序退出,但是清理工作还没做;
  2. closed,代表清理工作已经做完。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

func main() {
    var closing = make(chan struct{})
    var closed = make(chan struct{})

    go func() {
        // 模拟业务处理
        for {
            select {
            case <-closing:
                return
            default:
                // ....... 业务计算
                time.Sleep(100 * time.Millisecond)
            }
        }
    }()

    // 处理CTRL+C等中断信号
    termChan := make(chan os.Signal)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
    <-termChan

    close(closing)
    // 执行退出之前的清理动作
    go doCleanup(closed)

    select {
    case <-closed:
    case <-time.After(time.Second):
        fmt.Println("清理超时,不等了")
    }
    fmt.Println("优雅退出")
}

func doCleanup(closed chan struct{}) {
    time.Sleep((time.Minute))
    close(closed)
}

5. 锁

使用 chan 也可以实现互斥锁。在 chan 的内部实现中,就有一把互斥锁保护着它的所有字段。从外在表现上,chan 的发送和接收之间也存在着 happens-before 的关系,保证元素放进去之后,receiver 才能读取到。

要想使用 chan 实现互斥锁,至少有两种方式。一种方式是先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获取了这把锁。另一种方式是,先初始化一个 capacity 等于 1 的 Channel,它的“空槽”代表锁,谁能成功地把元素发送到这个 Channel,谁就获取了这把锁。

我们以第一种为例实现一个锁:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

// 使用chan实现互斥锁
type Mutex struct {
    ch chan struct{}
}

// 使用锁需要初始化
func NewMutex() *Mutex {
    mu := &Mutex{make(chan struct{}, 1)}
    mu.ch <- struct{}{}
    return mu
}

// 请求锁,直到获取到
func (m *Mutex) Lock() {
    <-m.ch
}

// 解锁
func (m *Mutex) Unlock() {
    select {
    case m.ch <- struct{}{}:
    default:
        panic("unlock of unlocked mutex")
    }
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
    select {
    case <-m.ch:
        return true
    default:
    }
    return false
}

// 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
    timer := time.NewTimer(timeout)
    select {
    case <-m.ch:
        timer.Stop()
        return true
    case <-timer.C:
    }
    return false
}

// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
    return len(m.ch) == 0
}


func main() {
    m := NewMutex()
    ok := m.TryLock()
    fmt.Printf("locked v %v\n", ok)
    ok = m.TryLock()
    fmt.Printf("locked %v\n", ok)
}

6. 任务编排

消息交流的场景是一个特殊的任务编排的场景:

  1. 前面我们介绍的顺序答应1,2,3,4 s 也被称为流水线模式
  2. WaitGroup 可以实现等待模式,Channel 也可以实现这种等待模式

任务编排既指安排 goroutine 按照指定的顺序执行,也指多个 chan 按照指定的方式组合处理的方式。我们通过编排数据在 channel 之间的流转,就可以控制 goroutine 的执行。接下来我们着重介绍 channel 编排的五种典型模式L

  1. Or-Done
  2. 扇入
  3. 扇出
  4. Stream
  5. map-reduce

6.1 Or-Done 模式

Or-Done 模式是信号通知模式中更宽泛的一种模式。我们会使用“信号通知”实现某个任务执行完成后的通知机制,在实现时:

  1. 我们为这个任务定义一个类型为 chan struct{}类型的 done 变量
  2. 等任务结束后,我们就可以 close 这个变量,然后,其它 receiver 就会收到这个通知。

这是有一个任务的情况,如果有多个任务,只要有任意一个任务执行完,我们就想获得这个信号,这就是 Or-Done 模式。比如发送同一个请求到多个微服务节点,只要任意一个微服务返回结果就算成功。下面是Or-Done 的一个实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

func or(channels ...<-chan interface{}) <-chan interface{} {
    // 特殊情况,只有零个或者1个chan
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)

        switch len(channels) {
        case 2: // 2个也是一种特殊情况
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default: //超过两个,二分法递归处理
            m := len(channels) / 2
            select {
            case <-or(channels[:m]...):
            case <-or(channels[m:]...):
            }
        }
    }()

    return orDone
}

这里的实现使用了一个巧妙的方式,当 chan 的数量大于 2 时,使用递归的方式等待信号。在 chan 数量比较多的情况下,递归并不是一个很好的解决方式,根据这一讲最开始介绍的反射的方法,我们也可以实现 Or-Done 模式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

func or(channels ...<-chan interface{}) <-chan interface{} {
    //特殊情况,只有0个或者1个
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        // 利用反射构建SelectCase
        var cases []reflect.SelectCase
        for _, c := range channels {
            cases = append(cases, reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(c),
            })
        }

        // 随机选择一个可用的case
        reflect.Select(cases)
    }()


    return orDone
}

6.2 扇入

Channel 扇入模式来说,是指有多个源 Channel 输入、一个目的 Channel 输出的情况。每个源 Channel 的元素都会发送给目标 Channel,相当于目标 Channel 的 receiver 只需要监听目标 Channel,就可以接收所有发送给源 Channel 的数据。即合并多个 channel 为一个 channel

扇入模式也可以使用反射、递归,或者是用最笨的每个 goroutine 处理一个 Channel 的方式来实现。

反射实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        // 构造SelectCase slice
        var cases []reflect.SelectCase
        for _, c := range chans {
            cases = append(cases, reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(c),
            })
        }
        
        // 循环,从cases中选择一个可用的
        for len(cases) > 0 {
            i, v, ok := reflect.Select(cases)
            if !ok { // 此channel已经close
                cases = append(cases[:i], cases[i+1:]...)
                continue
            }
            out <- v.Interface()
        }
    }()
    return out
}

递归实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
    switch len(chans) {
    case 0:
        c := make(chan interface{})
        close(c)
        return c
    case 1:
        return chans[0]
    case 2:
        return mergeTwo(chans[0], chans[1])
    default:
        m := len(chans) / 2
        return mergeTwo(
            fanInRec(chans[:m]...),
            fanInRec(chans[m:]...))
    }
}


func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        for a != nil || b != nil { //只要还有可读的chan
            select {
            case v, ok := <-a:
                if !ok { // a 已关闭,设置为nil
                    a = nil
                    continue
                }
                c <- v
            case v, ok := <-b:
                if !ok { // b 已关闭,设置为nil
                    b = nil
                    continue
                }
                c <- v
            }
        }
    }()
    return c
}

6.3 扇出

扇出模式只有一个输入源 Channel,有多个目标 Channel,经常用在设计模式中的观察者模式中,将一个一个对象的状态变化通知到多个观察者中。扇入模式也可以使用反射、递归,或者是用最笨的每个 goroutine 处理一个 Channel 的方式来实现。即消息的广播

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
    go func() {
        defer func() { //退出时关闭所有的输出chan
            for i := 0; i < len(out); i++ {
                close(out[i])
            }
        }()

        for v := range ch { // 从输入chan中读取数据
            v := v
            for i := 0; i < len(out); i++ {
                // 注意这里的赋值
                i := i
                if async { //异步
                    go func() {
                        out[i] <- v // 放入到输出chan中,异步方式
                    }()
                } else {
                    out[i] <- v // 放入到输出chan中,同步方式
                }
            }
        }
    }()
}

6.4 Stream

把 Channel 当作流式管道使用,提供跳过几个元素,或者是只取其中的几个元素等方法。在下面的实现中,我们首先创建流,然后为流定义以下方法:

  1. takeN:只取流中的前 n 个数据;
  2. takeFn:筛选流中的数据,只保留满足条件的数据;
  3. takeWhile:只取前面满足条件的数据,一旦不满足条件,就不再取;
  4. skipN:跳过流中前几个数据;
  5. skipFn:跳过满足条件的数据;
  6. skipWhile:跳过前面满足条件的数据,一旦不满足条件,当前这个元素和以后的元素都会输出给 Channel 的 receiver。

这些方法的实现很类似,我们以 takeN 为例。

创建流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15

func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
    s := make(chan interface{}) //创建一个unbuffered的channel
    go func() { // 启动一个goroutine,往s中塞数据
        defer close(s) // 退出时关闭chan
        for _, v := range values { // 遍历数组
            select {
            case <-done:
                return
            case s <- v: // 将数组元素塞入到chan中
            }
        }
    }()
    return s
}

流上的方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15

func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
    takeStream := make(chan interface{}) // 创建输出流
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ { // 只读取前num个元素
            select {
            case <-done:
                return
            case takeStream <- <-valueStream: //从输入流中读取元素
            }
        }
    }()
    return takeStream
}

6.5 map-reduce

这里我们要讲的是单机单进程的 map-reduce 方法。map-reduce 分为两个步骤,第一步是映射(map),处理队列中的数据,第二步是规约(reduce),把列表中的每一个元素按照一定的处理方式处理成结果,放入到结果队列中。

map

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17

func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
    out := make(chan interface{}) //创建一个输出chan
    if in == nil { // 异常检查
        close(out)
        return out
    }

    go func() { // 启动一个goroutine,实现map的主要逻辑
        defer close(out)
        for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
            out <- fn(v)
        }
    }()

    return out
}

reduce

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13

func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
    if in == nil { // 异常检查
        return nil
    }

    out := <-in // 先读取第一个元素
    for v := range in { // 实现reduce的主要逻辑
        out = fn(out, v)
    }

    return out
}

应用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

// 生成一个数据流
func asStream(done <-chan struct{}) <-chan interface{} {
    s := make(chan interface{})
    values := []int{1, 2, 3, 4, 5}
    go func() {
        defer close(s)
        for _, v := range values { // 从数组生成
            select {
            case <-done:
                return
            case s <- v:
            }
        }
    }()
    return s
}

func main() {
    in := asStream(nil)

    // map操作: 乘以10
    mapFn := func(v interface{}) interface{} {
        return v.(int) * 10
    }

    // reduce操作: 对map的结果进行累加
    reduceFn := func(r, v interface{}) interface{} {
        return r.(int) + v.(int)
    }

    sum := reduce(mapChan(in, mapFn), reduceFn) //返回累加结果
    fmt.Println(sum)
}

参考

本文内容摘录自:

  1. 极客专栏-鸟叔的 Go 并发编程实战