1. 分布式并发原语概述
在前面的课程里,我们学习的并发原语都是在进程内使用的,即一个运行程序为了控制共享资源、实现任务编排和进行消息传递而提供的控制类型。布式的并发原语实现更加复杂,因为在分布式环境中,网络不可靠、时钟不一致、以及不可预测的程序暂停,使得节点之间的通信相对于内存具有非常的不确定性。不过还好有相应的软件系统去做这些事情。这些软件系统会专门去处理这些节点之间的协调和异常情况,并且保证数据的一致性。我们要做的就是在它们的基础上实现我们的业务。etcd 就提供了非常好的分布式并发原语,比如分布式互斥锁、分布式读写锁、Leader 选举,等等。所以,今天,我就以 etcd 为基础,给你介绍几种分布式并发原语。
2. etcd 单节点集群安装
etcd 集群的安装配置参考其官方文档。本地独立集群的安装如下:
1
2
3
|
yum install etcd
etcd -listen-client-urls=http://192.168.108.55:2379 --advertise-client-urls=http://192.168.108.55:2379
|
2.1 etcd go 模块安装
1
2
3
|
# 1. 安装 gRPC
go get -u google.golang.org/grpc
go get github.com/grpc-ecosystem/go-grpc-middleware
|
3. Leader 选举
Leader 选举常常用在主从架构的系统中。主从架构中的服务节点分为主(Leader、Master)和从(Follower、Slave)两种角色,实际节点包括 1 主 n 从,一共是 n+1 个节点。主节点常常执行写操作,从节点常常执行读操作,如果读写都在主节点,从节点只是提供一个备份功能的话,那么,主从架构就会退化成主备模式架构。
主从架构中最重要的是如何确定节点的角色,在同一时刻,系统中不能有两个主节点,否则,如果两个节点都是主,都执行写操作的话,就有可能出现数据不一致的情况,所以,我们需要一个选主机制,选择一个节点作为主节点,这个过程就是 Leader 选举。当主节点宕机或者是不可用时,就需要新一轮的选举,从其它的从节点中选择出一个节点,让它作为新主节点,宕机的原主节点恢复后,可以变为从节点,或者被摘掉。
接下来,我们将介绍业务开发中跟 Leader 选举相关的选举、查询、Leader 变动监控等功能。
3.1 选举
如果业务集群还没有主节点,或者主节点宕机了,就需要发起新一轮的选主操作,主要会用到 Campaign 和 Proclaim。如果你需要主节点放弃主的角色,让其它从节点有机会成为主节点,就可以调用 Resign 方法。
- Campaign:
- 作用:把一个节点选举为主节点,并且会设置一个值
- 签名:
func (e *Election) Campaign(ctx context.Context, val string) error
- 说明: 这是一个阻塞方法,在调用它的时候会被阻塞,直到满足下面的三个条件之一,才会取消阻塞
- Proclaim:
- 作用: 重新设置 Leader 的值,但是不会重新选主
- 返回: 新值设置成功或者失败的信息
- 签名:
func (e *Election) Proclaim(ctx context.Context, val string) error
- Resign:
- 作用: 开始新一次选举
- 返回: 新的选举成功或者失败的信息
- 签名:
func (e *Election) Resign(ctx context.Context) (err error)
3.2 查询
程序在启动的过程中,或者在运行的时候,还有可能需要查询当前的主节点是哪一个节点?主节点的值是什么?此外查询主节点以便把读写请求发往相应的主从节点上。etcd 提供了查询当前主几点的 Leader 的方法
- Leader:
- 作用:查询当前的主节点
- 返回: 如果当前还没有 Leader,就返回一个错误
- 签名:
func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error)
- Rev:
- 作用: 查询版本号信息
- 说明: 每次主节点的变动都会生成一个新的版本号
- 签名:
func (e *Election) Rev() int64
3.3 监控
有了选举和查询方法,我们还需要一个监控方法。毕竟,如果主节点变化了,我们需要得到最新的主节点信息。我们可以通过 Observe 来监控主的变化:
- Observe
- 作用: 监控主节点的变化
- 签名:
func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse
- 返回: 一个 chan,显示主节点的变动信息,它不会返回主节点的全部历史变动信息,而是只返回最近的一条变动信息以及之后的变动信息。
3.4 使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
package main
import (
"bufio"
"context"
"flag"
"fmt"
"log"
"os"
"strings"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/clientv3"
)
var (
nodeID = flag.Int("id", 0, "node ID")
addr = flag.String("addr", "http://192.168.108.55:2379", "etcd address")
electName = flag.String("name", "my-test-elect", "election name")
count int
)
func main() {
endpoints := strings.Split(*addr, ",")
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
session, err := concurrency.NewSession(cli)
defer session.Close()
el := concurrency.NewElection(session, *electName)
consolescanner := bufio.NewScanner(os.Stdin)
for consolescanner.Scan() {
action := consolescanner.Text()
switch action {
case "elect":
go elect(el, *electName)
case "proclaim":
proclaim(el, *electName)
case "resign":
resign(el, *electName)
case "watch":
go watch(el, *electName)
case "query":
query(el, *electName)
case "rev":
rev(el, *electName)
default:
fmt.Println("unkonw error")
}
}
}
// 选主
func elect(e1 *concurrency.Election, electName string) {
log.Println("acampaigning for ID:", *nodeID)
// 调用Campaign方法选主,主的值为value-<主节点ID>-<count>
if err := e1.Campaign(context.Background(), fmt.Sprintf("value-%d-%d", *nodeID, count)); err != nil {
log.Println(err)
}
log.Println("campaigned for ID:", *nodeID)
count++
}
// 为主设置新值
func proclaim(e1 *concurrency.Election, electName string) {
log.Println("proclaiming for ID:", *nodeID)
// 调用Proclaim方法设置新值,新值为value-<主节点ID>-<count>
if err := e1.Proclaim(context.Background(), fmt.Sprintf("value-%d-%d", *nodeID, count)); err != nil {
log.Println(err)
}
log.Println("proclaimed for ID:", *nodeID)
count++
}
// 重新选主,有可能另外一个节点被选为了主
func resign(e1 *concurrency.Election, electName string) {
log.Println("resigning for ID:", *nodeID)
// 调用Resign重新选主
if err := e1.Resign(context.TODO()); err != nil {
log.Println(err)
}
log.Println("resigned for ID:", *nodeID)
}
// 查询主的信息
func query(e1 *concurrency.Election, electName string) {
// 调用Leader返回主的信息,包括key和value等信息
resp, err := e1.Leader(context.Background())
if err != nil {
log.Printf("failed to get the current leader: %v", err)
}
log.Println("current leader:", string(resp.Kvs[0].Key), string(resp.Kvs[0].Value))
}
// 可以直接查询主的rev信息
func rev(e1 *concurrency.Election, electName string) {
rev := e1.Rev()
log.Println("current rev:", rev)
}
// 监控主节点的变化
func watch(e1 *concurrency.Election, electName string) {
ch := e1.Observe(context.TODO())
log.Println("start to watch for ID:", *nodeID)
for i := 0; i < 10; i++ {
resp := <-ch
log.Println("leader changed to", string(resp.Kvs[0].Key), string(resp.Kvs[0].Value))
}
}
|
4. 互斥锁
前面说的互斥锁都是用来保护同一进程内的共享资源的,今天我们要重点学习下分布在不同机器中的不同进程内的 goroutine,如何利用分布式互斥锁来保护共享资源。
互斥锁的应用场景和主从架构的应用场景不太一样。使用互斥锁的不同节点是没有主从这样的角色的,所有的节点都是一样的,只不过在同一时刻,只允许其中的一个节点持有锁。
4.1 Locker
etcd 提供了一个简单的 Locker 原语,它类似于 Go 标准库中的 sync.Locker 接口,也提供了 Lock/UnLock 的机制:
1
|
func NewLocker(s *Session, pfx string) sync.Locker
|
下面的代码是一个使用 Locker 并发原语的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
package main
import (
"flag"
"log"
"math/rand"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
)
var (
addr = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
lockName = flag.String("name", "my-test-lock", "lock name")
)
func main() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
// etcd地址
endpoints := strings.Split(*addr, ",")
// 生成一个etcd client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
useLock(cli) // 测试锁
}
func useLock(cli *clientv3.Client) {
// 为锁生成session
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
//得到一个分布式锁
locker := concurrency.NewLocker(s1, *lockName)
// 请求锁
log.Println("acquiring lock")
locker.Lock()
log.Println("acquired lock")
// 等待一段时间
time.Sleep(time.Duration(rand.Intn(30)) * time.Second)
locker.Unlock() // 释放锁
log.Println("released lock")
}
|
4.2 Mutex
Locker 是基于 Mutex 实现的,只不过,Mutex 提供了查询 Mutex 的 key 的信息的功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func useMutex(cli *clientv3.Client) {
// 为锁生成session
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, *lockName)
//在请求锁之前查询key
log.Printf("before acquiring. key: %s", m1.Key())
// 请求锁
log.Println("acquiring lock")
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
log.Printf("acquired lock. key: %s", m1.Key())
//等待一段时间
time.Sleep(time.Duration(rand.Intn(30)) * time.Second)
// 释放锁
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
log.Println("released lock")
}
|
可以看到,Mutex 并没有实现 sync.Locker 接口,它的 Lock/Unlock 方法需要提供一个 context.Context 实例做参数,这也就意味着,在请求锁的时候,你可以设置超时时间,或者主动取消请求。
5. 读写锁
etcd 也提供了分布式的读写锁。不过,互斥锁 Mutex 是在 github.com/coreos/etcd/clientv3/concurrency 包中提供的,读写锁 RWMutex 却是在 github.com/coreos/etcd/contrib/recipes 包中提供的。
etcd 提供的分布式读写锁的功能和标准库的读写锁的功能是一样的。只不过,etcd 提供的读写锁,可以在分布式环境中的不同的节点使用。它提供的方法也和标准库中的读写锁的方法一致,分别提供了 RLock/RUnlock、Lock/Unlock 方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package main
import (
"bufio"
"flag"
"fmt"
"log"
"math/rand"
"os"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
recipe "github.com/coreos/etcd/contrib/recipes"
)
var (
addr = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
lockName = flag.String("name", "my-test-lock", "lock name")
action = flag.String("rw", "w", "r means acquiring read lock, w means acquiring write lock")
)
func main() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
// 解析etcd地址
endpoints := strings.Split(*addr, ",")
// 创建etcd的client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建session
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := recipe.NewRWMutex(s1, *lockName)
// 从命令行读取命令
consolescanner := bufio.NewScanner(os.Stdin)
for consolescanner.Scan() {
action := consolescanner.Text()
switch action {
case "w": // 请求写锁
testWriteLocker(m1)
case "r": // 请求读锁
testReadLocker(m1)
default:
fmt.Println("unknown action")
}
}
}
func testWriteLocker(m1 *recipe.RWMutex) {
// 请求写锁
log.Println("acquiring write lock")
if err := m1.Lock(); err != nil {
log.Fatal(err)
}
log.Println("acquired write lock")
// 等待一段时间
time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
// 释放写锁
if err := m1.Unlock(); err != nil {
log.Fatal(err)
}
log.Println("released write lock")
}
func testReadLocker(m1 *recipe.RWMutex) {
// 请求读锁
log.Println("acquiring read lock")
if err := m1.RLock(); err != nil {
log.Fatal(err)
}
log.Println("acquired read lock")
// 等待一段时间
time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
// 释放写锁
if err := m1.RUnlock(); err != nil {
log.Fatal(err)
}
log.Println("released read lock")
}
|
参考
本文内容摘录自:
- 极客专栏-鸟叔的 Go 并发编程实战