k8s client-go快速入门教程及源代码阅读之选举
client-go提供了许多有用的工具,其中就包括选举,通过选举的功能,我们可以同时运行多个业务实例,多个实例之间互为备份,并且同一时刻只有一个实例运行。
选举的逻辑并不复杂,复杂的是如何在一个分布式架构选举,由于etcd应该实现了共识算法,所以分布式的一些问题就不需要考虑了(比如脑裂)。选举的逻辑大致可以分为以下几步。
- 所有选举者不断尝试获取租约, apiserver(或者说etcd)会保证在同一时刻只有一个用户能够得到租约
- 获得租约就成为了Leader, 没有获得租约的用户就不断的重试,当Leader掉线后,其他选举者就可以竞争Leader的位置了
- Leader需要按照一个约定的周期不断更新租约,当租约的更新时间大于这个周期就说明Leader无法更新租约,也就是说Leader掉线了,那么那些不断重试的选举者就可以竞争并成为Leader了。
快速入门
这个例子来自client-go官方的例子, 简单的改了一下
源代码地址: https://github.com/kubernetes/client-go/blob/v0.20.2/examples/leader-election/main.go
func main() {
var kubeconfig *string
var leaseLockName string
var leaseLockNamespace string
var id string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
flag.StringVar(&leaseLockName, "lease-lock-name", "client-go-election", "the lease lock resource name")
flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "default", "the lease lock resource namespace")
flag.Parse()
// 1.
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
client := clientset.NewForConfigOrDie(config)
// 2.
run := func(ctx context.Context) {
klog.Info("现在我是leader了")
select {}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 3.
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()
// 4.
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: leaseLockNamespace,
},
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}
// 5.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
// 6.
Callbacks: leaderelection.LeaderCallbacks{
//
OnStartedLeading: func(ctx context.Context) {
run(ctx)
},
//
OnStoppedLeading: func() {
//
klog.Infof("leader lost: %s", id)
os.Exit(0)
},
//
OnNewLeader: func(identity string) {
// 7.
if identity == id {
return
}
klog.Infof("new leader elected: %s", identity)
},
},
})
}
代码分解如下:
- 创建
rest.Config
并构建静态客户端集合 - 业务代码逻辑入口,当成功选举成leader之后就会执行该函数
- 注册退出信号, 比如
ctrl + c
或者kill -15 pid
这样的退出信号, 当收到信号后就可以关闭context, 而context的关闭可以通知leaderelection,这样leaderelection就会执行一些应该收尾的回调函数,比如用户注册的业务逻辑,或者释放租约 - 创建锁对象,指定命名空间,锁的名字,id等, 值得注意的是, configmap, endpoints等对象也能作为租约,不过首选还是租约,名字就很对象专业对口
- 开始选举,配置选举相关的配置,如租约的周期,刷新租约的超时时间,重试时间等
- 可以注册的各种回调函数
- 如果leader是自己就跳过
如果你执行了上述代码,你可以在k8s集群查询到配置的租约lease
kubectl get leases election
# 输出如下
NAME HOLDER AGE
election ea0a3e2a-d8ec-4fa8-8581-02a0507337c9 16s
holder对应的是生成的uuid, 即
Identity
对应的值
源代码
选举并没有太多初始化的操作,通过传入的LeaderElectionConfig
直接运行。
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
// 省略相关配置
}
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
// 1.
le, err := NewLeaderElector(lec)
// 2.
le.Run(ctx)
}
func (le *LeaderElector) Run(ctx context.Context) {
defer runtime.HandleCrash()
defer func() {
// 3.
le.config.Callbacks.OnStoppedLeading()
}()
// 4.
if !le.acquire(ctx) {
return // ctx signalled done
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// 5.
go le.config.Callbacks.OnStartedLeading(ctx)
// 6.
le.renew(ctx)
}
代码分解如下:
- 创建
LeaderElector
,创建的过程并不复杂,主要是校验各个参数是否合法 - 开始选举
- 在结束时,执行注册的回调函数
- 尝试不断获取租约,如果获得租约就是Leader了
- 并行执行注册的回调函数
- 不断更新租约,确保自己一直是Leader
根据上面的代码我们知道,选举的代码存在两个状态。
- 非Leader状态,这个状态会不断的尝试获取租约,这时的业务逻辑入口在
le.acquire(ctx)
- Leader状态,这个状态会不断的更新租约,这里可以告诉其他选举者,Leader一直在线,这时的业务逻辑入口在
le.renew(ctx)
非Leader状态
这个状态会不断的尝试获取租约以成为Leader.
func (le *LeaderElector) acquire(ctx context.Context) bool {
// 1.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
succeeded := false
wait.JitterUntil(func() {
// 2.
succeeded = le.tryAcquireOrRenew(ctx)
// 3.
le.maybeReportTransition()
// 4.
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
return
}
// 5.
cancel()
}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
return succeeded
}
代码分解如下:
- 创建一个context, 这个context用于终止
wait.JitterUntil
- 尝试获取租约
- 判断是否换了Leader, 如果是的话就回调
OnNewLeader
- 没有成功就退出,因为函数是在
wait.JitterUntil
里执行,所以退出之后还会不断的重试 - 成功了就通过
context
的cancel
来停止wait.JitterUntil
。
client-go里面的k8s.io/apimachinery/pkg/util/wait 有各种重试逻辑的帮助函数,比如这里的
wait.JitterUntil
而尝试获取租约的代码如下:
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}
// 1.
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
// 2.
if !errors.IsNotFound(err) {
return false
}
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
return true
}
// 3.
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
!le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 4.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
// 5.
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}
return true
}
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var err error
// 6.
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
recordByte, err := json.Marshal(*record)
if err != nil {
return nil, nil, err
}
return record, recordByte, nil
}
代码分解如下:
- 获取当前租约的状态,这和静态客户端获取资源对象没有太多不同,只是在静态客户端的基础上封装了一层
- 如果404错误,说明租约不存在,那么应该创建以便成为Leader
- 判断租约是否被其他选举者获得
- 判断自己是否获得租约
- 如果更新租约失败,可能是由于网络原因,这段逻辑是马上renew一下的意思, 当然了,最主要的原因是作者比较懒,把获取租约和更新租约的代码放在一个函数里面,从它的函数名就能看出来
tryAcquireOrRenew
. - 第1步的代码逻辑
tryAcquireOrRenew
这段逻辑会在称为Leader之前不断的尝试。
Leader状态
如果获取到了租约,那么就会成为Leader,成为了Leader也不是一劳永逸,而是要不断的刷新租约,告诉其他选举者自己还在线,你们这些家伙休想取而代之。
func (le *LeaderElector) Run(ctx context.Context) {
// 省略获取租约等逻辑
le.renew(ctx)
}
func (le *LeaderElector) renew(ctx context.Context) {
// 1.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wait.Until(func() {
// 2.
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
// 3.
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(timeoutCtx), nil
}, timeoutCtx.Done())
// 4.
le.maybeReportTransition()
desc := le.config.Lock.Describe()
if err == nil {
klog.V(5).Infof("successfully renewed lease %v", desc)
return
}
// 5.
le.config.Lock.RecordEvent("stopped leading")
le.metrics.leaderOff(le.config.Name)
klog.Infof("failed to renew lease %v: %v", desc, err)
cancel()
}, le.config.RetryPeriod, ctx.Done())
// 6.
if le.config.ReleaseOnCancel {
le.release()
}
}
代码分解如下:
- 利用上下文退出,golang的常见操作
- context套娃,构建一个会超时的context
- 开始刷新租约,并传入设置了超时时间的context,在指定时间内没有renew成功,说明不能继续胜任Leader了,需要下岗了。
- 判断是否刚上岗,如果刚上岗,回调一下回调函数
- 记录日志,取消后续的renew操作
- 如果有回调就触发回调
如果不熟悉k8s的部署,可能会有个问题,Leader掉线之后岂不是少了一个参选者? 如果网络环境差的话,,岂不是参选者越来越少?那么最后岂不是一个选举者都没了,怎么做到高可用?
为了避免这种情况,应该有两个前提
- 业务代码应该将整个逻辑放在
OnStartedLeading
,这样从Leader状态掉线后就会终止程序 - 程序应该部署在会重启的环境中,比如以Deployment部署在k8s环境, 挂了会重启,那么重启之后就能继续参选了, 这样选举者会一直保持一定的数量,当然了,用client-go却不跑在k8s环境里似乎是一件很怪异的事情。
总结
通过client-go和k8s环境我们可以很容易写一个高可用的应用,这样的高可用应用可以保证总是有一个实例处理业务,因为只有一个实例处理业务,那么可以避免一些并发冲突的问题。