kubernetes kube-apiserver源码阅读7之服务发现
从前文我们已经了解到kube-apiserver
内部有三个组件,分别是apiExtensionsServer
,kubeAPIServer
,aggregatorServer
, 为了方便用户扩展,所以存在apiExtensionsServer
, 因为k8s有自己的核心资源,所以需要kubeAPIServer
, 为了将两者结合起来所以需要aggregatorServer
, 那么aggregatorServer
怎么两者的资源集中在一起并提供给用户查询呢? 本文尝试从k8s的源代码中找到问题的答案。
APIService
所有的k8s资源以及CRD资源都会注册在这里,可以执行以下命令获取。
kubectl get apiservice
结果如下:
NAME SERVICE AVAILABLE AGE
v1. Local True 372d
v1.admissionregistration.k8s.io Local True 372d
v1.apiextensions.k8s.io Local True 372d
v1.apps Local True 372d
v1.authentication.k8s.io Local True 372d
# 省略其他资源
Name的命名方式就是 版本 + 资源组名, 核心资源由于历史原因没有组名,所以第一个组名是空的。
当我们访问http://{server_ip}/apis
的时候其实就是将这些APIService
转换成APIGroup
对象并序列化返回给客户端,如果你访问/apis
你会发现,并没有核心组, 我想这是因为核心组资源有一个/api
的路径,所以就将其对应的资源列表在请求/apis
的时候去掉了。
核心资源注册
这里的核心资源只k8s开箱即用的所有资源。
既然知道了服务发现的机制,那么直接找APIService
就可以了。
func createAggregatorServer() {
// 1.
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
// 2.
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
// 3.
crdRegistrationController := crdregistration.NewCRDRegistrationController(
apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
autoRegistrationController)
// 4.
err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go crdRegistrationController.Run(5, context.StopCh)
go func() {
// 等待CRD资源初始化完成
if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
crdRegistrationController.WaitForInitialSync()
}
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
return aggregatorServer, nil
}
// 5.
func apiServicesToRegister() []*v1.APIService {
apiServices := []*v1.APIService{}
// 遍历kubeAPIServer的路由
for _, curr := range delegateAPIServer.ListedPaths() {
// 核心资源的路由
if curr == "/api/v1" {
// 6.
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
continue
}
if !strings.HasPrefix(curr, "/apis/") {
continue
}
tokens := strings.Split(curr, "/")
if len(tokens) != 4 {
continue
}
// 7.
apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
if apiService == nil {
continue
}
// 8.
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
}
return apiServices
}
// 9.
func makeAPIService(gv schema.GroupVersion) *v1.APIService {
apiServicePriority, ok := apiVersionPriorities[gv]
return &v1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
Spec: v1.APIServiceSpec{
Group: gv.Group,
Version: gv.Version,
GroupPriorityMinimum: apiServicePriority.group,
VersionPriority: apiServicePriority.version,
},
}
}
代码分解如下:
- 创建一个
APIService
资源的控制器,控制器模式在k8s中很常见,这里就不深入了。 - 基于
kubeAPIServer
的注册的路由创建对应的APISerice
(如果没有的话) - 创建一个
CRD
的资源控制器,当用户创建新的CRD
资源,就将对应的资源信息更新到APIservice
- 启动第1,3步创建的控制器,基于
informer
机制实时监控CRD
资源 - 第二步的具体实现
- 对核心组资源单独处理,因为它的组名是空字符串,创建对应的
APIService
并注册 - 其他核心资源,根据注册的路由信息结构组名和版本名,创建对应的
APIService并
注册 - 上面构建
APIService
对象的具体实现。
通过上面的代码了解到了,k8s核心资源的注册,然后在看看CRD
资源的注册逻辑。
CRD资源注册
前面其实已经看到了CRD
资源注册的入口,但是没有深入,这里继续看它的内部逻辑。
func createAggregatorServer() {
// 1.
autoRegistrationController := autoregister.NewAutoRegisterController()
crdRegistrationController := crdregistration.NewCRDRegistrationController(
apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
// 2.
autoRegistrationController)
err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
// 3.
go crdRegistrationController.Run(5, context.StopCh)
go func() {
// 等待CRD资源初始化完成
if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
crdRegistrationController.WaitForInitialSync()
}
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
}
代码分解如下:
- 分别创建
APIService
,CRD
资源的控制器 - 注意
CRD
控制器传入了autoRegistrationController
- 分别启动两个控制器
上面的代码之前大致讲过,这里稍微再详细的说明了一下。
如果了解k8s的informer
机制,会知道informer
提供一种类似回调的机制让控制器
注册回调函数,当资源更新的时候就将调用之前注册的回调函数,那么为了实时的将CRD
的资源更新注册到APIService
, 这里大胆的猜测一下(不大胆也没关系 :) ),更新APISerice
的逻辑在CRD
控制器的业务代码里面。
func NewCRDRegistrationController() *crdRegistrationController {
// 1.
c := &crdRegistrationController{
crdLister: crdinformer.Lister(),
crdSynced: crdinformer.Informer().HasSynced,
apiServiceRegistration: apiServiceRegistration,
syncedInitialSet: make(chan struct{}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_autoregistration_controller"),
}
// 2.
c.syncHandler = c.handleVersionUpdate
// 3.
crdinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cast := obj.(*apiextensionsv1.CustomResourceDefinition)
c.enqueueCRD(cast)
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.enqueueCRD(oldObj.(*apiextensionsv1.CustomResourceDefinition))
c.enqueueCRD(newObj.(*apiextensionsv1.CustomResourceDefinition))
},
DeleteFunc: func(obj interface{}) {
c.enqueueCRD(cast)
},
})
return c
}
// 4.
func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
apiServiceName := groupVersion.Version + "." + groupVersion.Group
// 5.
crds, err := c.crdLister.List(labels.Everything())
// 6.
for _, crd := range crds {
// 7.
if crd.Spec.Group != groupVersion.Group {
continue
}
for _, version := range crd.Spec.Versions {
if version.Name != groupVersion.Version || !version.Served {
continue
}
// 8.
c.apiServiceRegistration.AddAPIServiceToSync(&v1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
Spec: v1.APIServiceSpec{
Group: groupVersion.Group,
Version: groupVersion.Version,
GroupPriorityMinimum: 1000,
VersionPriority: 100,
},
})
return nil
}
}
// 9.
c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName)
return nil
}
代码分解如下:
- 创建
crdRegistrationController
, 重点是apiServiceRegistration
, 后续的更新会调用它 - k8s控制器的惯用模式,所有的更新最终由
syncHandler
统一处理 - 注册回调函数,
informer
与控制器
之间通过队列解耦,也是k8s控制的惯用模式 - 第2步的具体实现
- 因为
CRD
资源一般不会很多,所以直接列出所有CRD
资源,再者Informer
的列出所有资源也不是重新再拉一遍数据,而是使用本地缓存,所以不用担心。 - 遍历
CRD
资源 - 如果不是要处理的资源就跳过
- 添加找到的资源,并返回
- 如果没有在当前
CRD
列表,说明对应的资源已经被删除,所以删除资源
总结
总的来说,Kube-apiserver
可以通过APIService
向客户端展示当前k8s集群所有可用资源,用户创建CRD
资源,也会实时的同步到APIService
。