kubernetes kube-apiserver源码阅读7之服务发现

文章目录

从前文我们已经了解到kube-apiserver内部有三个组件,分别是apiExtensionsServer,kubeAPIServer,aggregatorServer, 为了方便用户扩展,所以存在apiExtensionsServer, 因为k8s有自己的核心资源,所以需要kubeAPIServer, 为了将两者结合起来所以需要aggregatorServer, 那么aggregatorServer怎么两者的资源集中在一起并提供给用户查询呢? 本文尝试从k8s的源代码中找到问题的答案。

APIService

所有的k8s资源以及CRD资源都会注册在这里,可以执行以下命令获取。

kubectl get apiservice

结果如下:

NAME                                   SERVICE   AVAILABLE   AGE
v1.                                    Local     True        372d
v1.admissionregistration.k8s.io        Local     True        372d
v1.apiextensions.k8s.io                Local     True        372d
v1.apps                                Local     True        372d
v1.authentication.k8s.io               Local     True        372d
# 省略其他资源

Name的命名方式就是 版本 + 资源组名, 核心资源由于历史原因没有组名,所以第一个组名是空的。

当我们访问http://{server_ip}/apis的时候其实就是将这些APIService转换成APIGroup对象并序列化返回给客户端,如果你访问/apis你会发现,并没有核心组, 我想这是因为核心组资源有一个/api的路径,所以就将其对应的资源列表在请求/apis的时候去掉了。

核心资源注册

这里的核心资源只k8s开箱即用的所有资源。

既然知道了服务发现的机制,那么直接找APIService就可以了。

func createAggregatorServer() {
	// 1.
	autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
    // 2.
	apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
    // 3.
	crdRegistrationController := crdregistration.NewCRDRegistrationController(
		apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
		autoRegistrationController)
	// 4.
	err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
		go crdRegistrationController.Run(5, context.StopCh)
		go func() {
			// 等待CRD资源初始化完成
			if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
				crdRegistrationController.WaitForInitialSync()
			}
			autoRegistrationController.Run(5, context.StopCh)
		}()
		return nil
	})

	return aggregatorServer, nil
}

// 5.
func apiServicesToRegister() []*v1.APIService {
	apiServices := []*v1.APIService{}
	// 遍历kubeAPIServer的路由
	for _, curr := range delegateAPIServer.ListedPaths() {
        // 核心资源的路由
		if curr == "/api/v1" {
            // 6.
			apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
			registration.AddAPIServiceToSyncOnStart(apiService)
			apiServices = append(apiServices, apiService)
			continue
		}

		if !strings.HasPrefix(curr, "/apis/") {
			continue
		}

		tokens := strings.Split(curr, "/")
		if len(tokens) != 4 {
			continue
		}
		// 7.
		apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
		if apiService == nil {
			continue
		}
        // 8.
		registration.AddAPIServiceToSyncOnStart(apiService)
		apiServices = append(apiServices, apiService)
	}

	return apiServices
}

// 9.
func makeAPIService(gv schema.GroupVersion) *v1.APIService {
	apiServicePriority, ok := apiVersionPriorities[gv]
	return &v1.APIService{
		ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
		Spec: v1.APIServiceSpec{
			Group:                gv.Group,
			Version:              gv.Version,
			GroupPriorityMinimum: apiServicePriority.group,
			VersionPriority:      apiServicePriority.version,
		},
	}
}

代码分解如下:

  1. 创建一个APIService资源的控制器,控制器模式在k8s中很常见,这里就不深入了。
  2. 基于kubeAPIServer的注册的路由创建对应的APISerice(如果没有的话)
  3. 创建一个CRD的资源控制器,当用户创建新的CRD资源,就将对应的资源信息更新到APIservice
  4. 启动第1,3步创建的控制器,基于informer机制实时监控CRD资源
  5. 第二步的具体实现
  6. 对核心组资源单独处理,因为它的组名是空字符串,创建对应的APIService并注册
  7. 其他核心资源,根据注册的路由信息结构组名和版本名,创建对应的APIService并注册
  8. 上面构建APIService对象的具体实现。

通过上面的代码了解到了,k8s核心资源的注册,然后在看看CRD资源的注册逻辑。

CRD资源注册

前面其实已经看到了CRD资源注册的入口,但是没有深入,这里继续看它的内部逻辑。

func createAggregatorServer() {
	// 1.
	autoRegistrationController := autoregister.NewAutoRegisterController()
    crdRegistrationController := crdregistration.NewCRDRegistrationController(
        apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
        // 2. 
        autoRegistrationController)
	err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
        // 3.
		go crdRegistrationController.Run(5, context.StopCh)
		go func() {
			// 等待CRD资源初始化完成
			if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
				crdRegistrationController.WaitForInitialSync()
			}
			autoRegistrationController.Run(5, context.StopCh)
		}()
		return nil
	})
}

代码分解如下:

  1. 分别创建APIService, CRD资源的控制器
  2. 注意CRD控制器传入了autoRegistrationController
  3. 分别启动两个控制器

上面的代码之前大致讲过,这里稍微再详细的说明了一下。

如果了解k8s的informer机制,会知道informer提供一种类似回调的机制让控制器注册回调函数,当资源更新的时候就将调用之前注册的回调函数,那么为了实时的将CRD的资源更新注册到APIService, 这里大胆的猜测一下(不大胆也没关系 :) ),更新APISerice的逻辑在CRD控制器的业务代码里面。

func NewCRDRegistrationController() *crdRegistrationController {
    // 1.
	c := &crdRegistrationController{
		crdLister:              crdinformer.Lister(),
		crdSynced:              crdinformer.Informer().HasSynced,
		apiServiceRegistration: apiServiceRegistration,
		syncedInitialSet:       make(chan struct{}),
		queue:                  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_autoregistration_controller"),
	}
    
    // 2.
	c.syncHandler = c.handleVersionUpdate

    // 3.
	crdinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			cast := obj.(*apiextensionsv1.CustomResourceDefinition)
			c.enqueueCRD(cast)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			c.enqueueCRD(oldObj.(*apiextensionsv1.CustomResourceDefinition))
			c.enqueueCRD(newObj.(*apiextensionsv1.CustomResourceDefinition))
		},
		DeleteFunc: func(obj interface{}) {
			c.enqueueCRD(cast)
		},
	})

	return c
}

// 4.
func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
	apiServiceName := groupVersion.Version + "." + groupVersion.Group
    // 5.
	crds, err := c.crdLister.List(labels.Everything())
    
    // 6.
	for _, crd := range crds {
        // 7.
		if crd.Spec.Group != groupVersion.Group {
			continue
		}
		for _, version := range crd.Spec.Versions {
			if version.Name != groupVersion.Version || !version.Served {
				continue
			}
			
            // 8.
			c.apiServiceRegistration.AddAPIServiceToSync(&v1.APIService{
				ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
				Spec: v1.APIServiceSpec{
					Group:                groupVersion.Group,
					Version:              groupVersion.Version,
					GroupPriorityMinimum: 1000, 
					VersionPriority:      100, 
				},
			})
			return nil
		}
	}
	// 9.
	c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName)
	return nil
}

代码分解如下:

  1. 创建crdRegistrationController, 重点是apiServiceRegistration, 后续的更新会调用它
  2. k8s控制器的惯用模式,所有的更新最终由syncHandler统一处理
  3. 注册回调函数,informer控制器之间通过队列解耦,也是k8s控制的惯用模式
  4. 第2步的具体实现
  5. 因为CRD资源一般不会很多,所以直接列出所有CRD资源,再者Informer的列出所有资源也不是重新再拉一遍数据,而是使用本地缓存,所以不用担心。
  6. 遍历CRD资源
  7. 如果不是要处理的资源就跳过
  8. 添加找到的资源,并返回
  9. 如果没有在当前CRD列表,说明对应的资源已经被删除,所以删除资源

总结

总的来说,Kube-apiserver可以通过APIService向客户端展示当前k8s集群所有可用资源,用户创建CRD资源,也会实时的同步到APIService

参考链接