k8s client-go快速入门教程及源代码阅读之Informer

文章目录

只要读k8s源代码一定会读informer的代码的,因为informer相当优秀,大多数分布式项目(比如OpenStack)在解决组件间通信的问题时都会选择如kafka,rabbitmaq之类的消息队列,但是k8s不走寻常路,选择了自己解决,解决的方案是informer。

假设我们没有informer,那么我们应该如何从api server获取数据?

一般而言,我们有两种方式, 一是全量获取,二是增量获取,两种都各有优缺点,前者优点是,每次可以获取全量的最新状态, 逻辑简单,但是缺点很明显,如果请求频次过于频繁,就会有比较大的性能消耗, 如果频次过低就不够实时,但是依旧有比较大的性能消耗,想象一个100节点的集群,1000个deployment, 1000个ReplicaSet, 5000千个pod, 加个每个对象都只占5k, 就接近50MB, 这显然会占用比较多的带宽,这是让人难以接受的,而且数据的时效性不够高也是难以接受的,所以对于一个中大型集群而言,不能使用这种方式。

第二种方式是增量获取更新,这种方式的优点是时效性高占用资源低,但是相较于第一种方式而言,实现起来稍显复杂,复杂度在于两点,一是我们需要有健壮的容错机制,比如出错怎么办? 如果跳过可能导致状态不一致, 比如漏掉一个更新的请求, 那么对应的资源一直得不到正确的处理, 所以我们需要一种重试机制, 二是, 我们需要缓存全量的数据用于快速的检索, 比如定时轮训的检查资源,但我们不可能总是等收到增量更新才开始业务逻辑,所以增量更新的逻辑比较复杂, 并且增量更新不能单独存在, 因为我们需要全量的资源, 所以需要配合第一种方式。

那么怎么平衡这两种获取资源的方式呢? k8s的选择是,我全都要!!!

我全要图片

快速入门

一般来说informer会跟workque, controller在一起,这点从k8s的源代码可以很明显的看到,不过为了简单起见,这里只看informer的部分。

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"k8s.io/client-go/util/homedir"
	"k8s.io/klog/v2"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	var kubeconfig *string
	var master string

	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.StringVar(&master, "master", "", "master url")
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}

	// 1.
	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

	// 2.
	_, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
        // 3.
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Add: ", key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Update: ", key)
			}
		},
		DeleteFunc: func(obj interface{}) {
            // 4.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Delete: ", key)
			}
		},
	})
    // 5. 
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()
    // 6.
	informer.Run(ctx.Done())
}

输出如下:

2023-06-03 14:19:21 Add:  default/example-fcsjwbzzf2
2023-06-03 14:20:21 Update:  default/example-fcsjwbzzf2
2023-06-03 14:21:21 Update:  default/example-fcsjwbzzf2

注意: 每分钟以Update的形式再次调用UpdateFunc

实名吐槽Golang的时间格式化!!!

代码分解如下:

  1. 创建ListWatch对象,用于获取资源最新列表及后续更新
  2. 创建informer对象,传入必要的参数
  3. 注册各种回调函数, 如AddFunc等
  4. 删除的对象和其他对象不同,所以需要不同的方法来获取key
  5. 设置退出信号量,k8s的惯用操作了
  6. 启动informer

通过上面的代码可以知道,创建informer有两件比较重要的事情,一是创建ListWatch,二是注册回调函数。

ListWatch

ListWatch就如名字指明的那样,List,Watch,前者是拉取指定资源的资源列表,比如default命名空间下的所有Pod资源,Watch是在前者拉取完成之后开始监听之后所有的资源变化(前者会得到一个版本号,watch可以借助这个版本号,只获取版本号之后的资源),比如新增,更新,删除等变化。

podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
    // 1.
	optionsModifier := func(options *metav1.ListOptions) {
		options.FieldSelector = fieldSelector.String()
	}
	return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}

func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
    // 2.
	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Do(context.TODO()).
			Get()
	}
    // 3.
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(context.TODO())
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

代码分解如下:

  1. 设置Options, Options以函数的形式传入也是k8s一个比较常用的模式了。
  2. 简单的在静态客户端的Get方法上包装一层函数
  3. 简单的在静态客户端的Watch方法上包装一层函数

可以看到ListWatch的内部构造并不复杂,仅仅是将GetWatch方法组合起来而已。

Informer

因为本文主要分析informer,所以会略过其中Store的部分,我们暂且将其作为一个存储的黑盒子即可,以后有文章再详细说明。

_, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
    // 略过代码部分
}

func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// 1.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}
                                 
func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
) Controller {
	// 2.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,
		// 3.
		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Replaced, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return New(cfg)
}
                                 
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

代码分解如下:

  1. 创建一个Store, 它用来存储informer获取到的资源
  2. 将Store再包装一层(k8s的传统操作了),提供先入先出(fifo)的功能
  3. informer处理的主函数,根据对象类型调用对应的回调函数,以及将对象更新到绑定的Store

这里有一个值得注意的点,informer是一个符合Controller接口的对象,阅读过k8s源代码或者写过operator的对controller应该不会陌生,这是k8s比较重要的对象了,或者说模式。

总的来说,informer的初始化过程还是比较清晰的,主要分为两步,创建队列(fifo),配置处理逻辑(Process),既然初始化不复杂,那么复杂的就是Run方法。

Run

那么看看informer怎么运行的吧

informer.Run(ctx.Done())

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
    // 1.
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
    // 2.
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}
	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()
	var wg wait.Group
	// 3.
	wg.StartWithChannel(stopCh, r.Run)
    // 4.
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

代码分解如下:

  1. 创建Reflector, reflector负责和apiserver通信,不断的将数据同步给informer
  2. 配置Reflector的各项参数
  3. 启动Reflector
  4. 启动informer的主循环

由于processLoop比较简单,我们先看看它的源代码。

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

代码很简单,应该不需要特别的说明,就是传入之前的Process方法用于处理队列传入的各个对象。

Reflector

Reflector的初始化并不复杂,代码如下

r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
)

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	// 省略其他代码
	r.setExpectedType(expectedType)
	return r
}

func (r *Reflector) setExpectedType(expectedType interface{}) {
	r.expectedType = reflect.TypeOf(expectedType)
	if obj, ok := expectedType.(*unstructured.Unstructured); ok {
		gvk := obj.GroupVersionKind()
		r.expectedGVK = &gvk
		r.expectedTypeName = gvk.String()
	}
}

上面的代码唯一值得提的是setExpectedType,k8s的对象总是要知道gvk的。

然后就是Reflector的运行逻辑


func (r *Reflector) Run(stopCh <-chan struct{}) {
    // 1.
	wait.BackoffUntil(func() {
        // 2.
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
}

代码分解如下:

  1. client-go提供的重试帮助函数,只要没有收到终止信号就会不断的重试传入的方法
  2. List And Watch, 获取列表并监听资源更新

重头戏就是ListAndWatch了。

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	var resourceVersion string
    // 1.
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    // 2.
	if err := func() error {
		var list runtime.Object
		var paginatedResult bool
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		go func() {
            // 3.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
            
			// 4.
			list, paginatedResult, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		// 5.
		items, err := meta.ExtractList(list)
        // 6.
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("unable to sync list result: %v", err)
		}
        // 7.
		r.setLastSyncResourceVersion(resourceVersion)
		return nil
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
    // 8.
	go func() {
		resyncCh, cleanup := r.resyncChan()
        if r.ShouldResync == nil || r.ShouldResync() {
            klog.V(4).Infof("%s: forcing resync", r.name)
            if err := r.store.Resync(); err != nil {
                resyncerrc <- err
                return
            }
        }
        cleanup()
		resyncCh, cleanup = r.resyncChan()
		}
	}()
	// 9.
	for {
        // 10.
		w, err := r.listerWatcher.Watch(options)
		// 11.
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			return nil
		}
	}
}

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}

代码分解如下:

  1. 设置资源版本号ResourceVersion, k8s提供了一种以版本号过滤的资源的方式,如果是首次,那么是0,如果因为网络等原因重试,就可以增量的获取遗落的资源列表,而不需再次全量的获取一遍
  2. 将List的逻辑放在一个匿名函数中统一处理错误,常见操作了。
  3. 构建一个分页器,分批获取资源列表。
  4. 开始获取,这里的List其实就是调用之前传入的lw.List
  5. 获取列表,这一步会检查列表对象是否合法以及做一定的转换。
  6. 将数据同步到Store,使用它的Replace方法,这可以在上面源代码的最后看到具体操作。
  7. 设置ResourceVersion,如果后续出错,就可以从这个资源版本开始了
  8. resync, 就是将Store里面的数据以Update的事件形式再次传入informer,会触发UpdateFunc回调函数。
  9. 监听的循环
  10. 通过之前传入的lw的Watch方法,获得watch.Interface,这个接口会不断的给出变更对象
  11. 处理上一步传来的事件。

最后就是Reflector的核心方法了。

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
        // 1.
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
            // 2.
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
            // 3.
			meta, err := meta.Accessor(event.Object)
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
			case watch.Modified:
				err := r.store.Update(event.Object)
			case watch.Deleted:
				err := r.store.Delete(event.Object)
			case watch.Bookmark:
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			if rvu, ok := r.store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(newResourceVersion)
			}
			eventCount++
		}
	}
	return nil
}

代码分解如下:

  1. 获取监听到的事件
  2. 判断事件是否正常,是否符合预期的GVK等
  3. 不同的事件以不同方法更新,这样可以触发不同的回调函数

总的来说,reflector做的事情就是将数据更新到Store里面,而Informer会不断的从Store里面读取数据,当读到数据后就调用对应的回调函数。

总结

Informer是k8s里面非常重要的数据同步机制,理解了Informer就可以很容易找到k8s相关组件的主要业务逻辑了,可以想象的到,业务逻辑一定注册在回调函数中,不过真实的代码要多了一层抽象,因为k8s源代码里回调函数逻辑一般是判断一下就扔进workqueue了。

client-go的代码部分差不多结束了,后面阅读k8s的源代码。

参考链接