k8s client-go快速入门教程及源代码阅读之选举

文章目录

client-go提供了许多有用的工具,其中就包括选举,通过选举的功能,我们可以同时运行多个业务实例,多个实例之间互为备份,并且同一时刻只有一个实例运行。

选举的逻辑并不复杂,复杂的是如何在一个分布式架构选举,由于etcd应该实现了共识算法,所以分布式的一些问题就不需要考虑了(比如脑裂)。选举的逻辑大致可以分为以下几步。

  1. 所有选举者不断尝试获取租约, apiserver(或者说etcd)会保证在同一时刻只有一个用户能够得到租约
  2. 获得租约就成为了Leader, 没有获得租约的用户就不断的重试,当Leader掉线后,其他选举者就可以竞争Leader的位置了
  3. Leader需要按照一个约定的周期不断更新租约,当租约的更新时间大于这个周期就说明Leader无法更新租约,也就是说Leader掉线了,那么那些不断重试的选举者就可以竞争并成为Leader了。

快速入门

这个例子来自client-go官方的例子, 简单的改了一下

源代码地址: https://github.com/kubernetes/client-go/blob/v0.20.2/examples/leader-election/main.go

func main() {
	var kubeconfig *string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "client-go-election", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "default", "the lease lock resource namespace")
	flag.Parse()

	// 1.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	client := clientset.NewForConfigOrDie(config)
    // 2.
	run := func(ctx context.Context) {
		klog.Info("现在我是leader了")
		select {}
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

    // 3.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

    // 4.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// 5.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
        // 6.
		Callbacks: leaderelection.LeaderCallbacks{
            //
			OnStartedLeading: func(ctx context.Context) {
				run(ctx)
			},
            //
			OnStoppedLeading: func() {
                // 
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
            //
			OnNewLeader: func(identity string) {
                // 7. 
				if identity == id {
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

代码分解如下:

  1. 创建rest.Config并构建静态客户端集合
  2. 业务代码逻辑入口,当成功选举成leader之后就会执行该函数
  3. 注册退出信号, 比如ctrl + c或者kill -15 pid这样的退出信号, 当收到信号后就可以关闭context, 而context的关闭可以通知leaderelection,这样leaderelection就会执行一些应该收尾的回调函数,比如用户注册的业务逻辑,或者释放租约
  4. 创建锁对象,指定命名空间,锁的名字,id等, 值得注意的是, configmap, endpoints等对象也能作为租约,不过首选还是租约,名字就很对象专业对口
  5. 开始选举,配置选举相关的配置,如租约的周期,刷新租约的超时时间,重试时间等
  6. 可以注册的各种回调函数
  7. 如果leader是自己就跳过

如果你执行了上述代码,你可以在k8s集群查询到配置的租约lease

kubectl get leases election
# 输出如下
NAME       HOLDER                                 AGE
election   ea0a3e2a-d8ec-4fa8-8581-02a0507337c9   16s

holder对应的是生成的uuid, 即Identity对应的值

源代码

选举并没有太多初始化的操作,通过传入的LeaderElectionConfig直接运行。

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    // 省略相关配置
}
                        
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    // 1.
	le, err := NewLeaderElector(lec)
    // 2.
	le.Run(ctx)
}
                        
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() {
        // 3. 
		le.config.Callbacks.OnStoppedLeading()
	}()
	// 4.
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
    // 5.
	go le.config.Callbacks.OnStartedLeading(ctx)
    // 6.
	le.renew(ctx)
}

代码分解如下:

  1. 创建LeaderElector,创建的过程并不复杂,主要是校验各个参数是否合法
  2. 开始选举
  3. 在结束时,执行注册的回调函数
  4. 尝试不断获取租约,如果获得租约就是Leader了
  5. 并行执行注册的回调函数
  6. 不断更新租约,确保自己一直是Leader

根据上面的代码我们知道,选举的代码存在两个状态。

  1. 非Leader状态,这个状态会不断的尝试获取租约,这时的业务逻辑入口在le.acquire(ctx)
  2. Leader状态,这个状态会不断的更新租约,这里可以告诉其他选举者,Leader一直在线,这时的业务逻辑入口在le.renew(ctx)

非Leader状态

这个状态会不断的尝试获取租约以成为Leader.

func (le *LeaderElector) acquire(ctx context.Context) bool {
    // 1.
    ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	wait.JitterUntil(func() {
        // 2.
		succeeded = le.tryAcquireOrRenew(ctx)
        // 3. 
		le.maybeReportTransition()
        // 4.
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
        // 5. 
        cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

代码分解如下:

  1. 创建一个context, 这个context用于终止wait.JitterUntil
  2. 尝试获取租约
  3. 判断是否换了Leader, 如果是的话就回调OnNewLeader
  4. 没有成功就退出,因为函数是在wait.JitterUntil里执行,所以退出之后还会不断的重试
  5. 成功了就通过contextcancel来停止wait.JitterUntil

client-go里面的k8s.io/apimachinery/pkg/util/wait 有各种重试逻辑的帮助函数,比如这里的wait.JitterUntil

而尝试获取租约的代码如下:

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. 
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
        // 2.
		if !errors.IsNotFound(err) {
			return false
		}
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 3.
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 4.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// 5.
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	return true
}

func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
	var err error
    // 6.
	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
	record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
	recordByte, err := json.Marshal(*record)
	if err != nil {
		return nil, nil, err
	}
	return record, recordByte, nil
}

代码分解如下:

  1. 获取当前租约的状态,这和静态客户端获取资源对象没有太多不同,只是在静态客户端的基础上封装了一层
  2. 如果404错误,说明租约不存在,那么应该创建以便成为Leader
  3. 判断租约是否被其他选举者获得
  4. 判断自己是否获得租约
  5. 如果更新租约失败,可能是由于网络原因,这段逻辑是马上renew一下的意思, 当然了,最主要的原因是作者比较懒,把获取租约和更新租约的代码放在一个函数里面,从它的函数名就能看出来tryAcquireOrRenew.
  6. 第1步的代码逻辑

tryAcquireOrRenew这段逻辑会在称为Leader之前不断的尝试。

Leader状态

如果获取到了租约,那么就会成为Leader,成为了Leader也不是一劳永逸,而是要不断的刷新租约,告诉其他选举者自己还在线,你们这些家伙休想取而代之。

func (le *LeaderElector) Run(ctx context.Context) {
	// 省略获取租约等逻辑
	le.renew(ctx)
}

func (le *LeaderElector) renew(ctx context.Context) {
   	// 1.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
        // 2.
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
        // 3.
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			return le.tryAcquireOrRenew(timeoutCtx), nil
		}, timeoutCtx.Done())
		// 4.
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
        // 5.
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())

	// 6.
	if le.config.ReleaseOnCancel {
		le.release()
	}
}

代码分解如下:

  1. 利用上下文退出,golang的常见操作
  2. context套娃,构建一个会超时的context
  3. 开始刷新租约,并传入设置了超时时间的context,在指定时间内没有renew成功,说明不能继续胜任Leader了,需要下岗了。
  4. 判断是否刚上岗,如果刚上岗,回调一下回调函数
  5. 记录日志,取消后续的renew操作
  6. 如果有回调就触发回调

如果不熟悉k8s的部署,可能会有个问题,Leader掉线之后岂不是少了一个参选者? 如果网络环境差的话,,岂不是参选者越来越少?那么最后岂不是一个选举者都没了,怎么做到高可用?

为了避免这种情况,应该有两个前提

  1. 业务代码应该将整个逻辑放在OnStartedLeading,这样从Leader状态掉线后就会终止程序
  2. 程序应该部署在会重启的环境中,比如以Deployment部署在k8s环境, 挂了会重启,那么重启之后就能继续参选了, 这样选举者会一直保持一定的数量,当然了,用client-go却不跑在k8s环境里似乎是一件很怪异的事情。

总结

通过client-go和k8s环境我们可以很容易写一个高可用的应用,这样的高可用应用可以保证总是有一个实例处理业务,因为只有一个实例处理业务,那么可以避免一些并发冲突的问题。

参考链接