kubernetes kube-apiserver源码阅读8之Hook
文章目录
阅读kube-apiserver
的过程中,会发现很多的AddPostStartHook
的代码,这部分代码用于执行kube-apiserver
启动之后的逻辑,因为他们放在启动后执行更适合,所以就提供了两种钩子(Hook), PostStartHook
和PreShutdownHook
。这里只看PostStartHook
,并且只看bootstrap-controller
对应的钩子函数。
kubernetes service
当集群初次创建的时候,查看k8s的service的时候会发现,有一个叫做kubernetes
的service,比如下面这样。
kubectl get service
# 输出如下
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 3s
如果不小心删除了,会发现它又自动创建了,那么它是怎么实现的呢?
bootstrapController
这里简单的列一下之前的调用链
func CreateServerChain(*aggregatorapiserver.APIAggregator, error) {
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
}
func CreateKubeAPIServer() {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New() {
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
func (m *Instance) InstallLegacyAPI() error {
// 1.
controllerName := "bootstrap-controller"
// 2.
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
// 3.
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
// 4.
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
// 5.
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
return nil
}
前面的调用链就不细说了。
代码分解如下:
- 设置
controller
名字 - 创建k8s的静态客户端
- 创建
controller
- 加入
PostStartHook
然后我们继续这个controller
的创建和启动逻辑
func (c *completedConfig) NewBootstrapController() *Controller {
// 1.
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
// 2. kube-system, kube-public, kube-node-lease
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
return &Controller{
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
SystemNamespaces: systemNamespaces,
SystemNamespacesInterval: 1 * time.Minute,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
}
}
// 3.
func (c *Controller) PostStartHook() error {
c.Start()
return nil
}
// 4.
func (c *Controller) Start() {
// 5.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
// 6.
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
}
// 忽略验证/修复服务ip的逻辑
// 7.
c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
c.runner.Start()
}
// 8.
func (r *Runner) Start() {
if r.stop == nil {
c := make(chan struct{})
r.stop = &c
for i := range r.loopFuncs {
go r.loopFuncs[i](*r.stop)
}
}
}
代码分解如下:
- 获取服务IP地址端的第一个IP, 比如
--service-cluster-ip-range=10.96.0.0/12
参数的第一个IP是10.96.0.1
- 服务启动后要创建的namespace
- 注册到钩子函数里的函数
- 上一步的具体实现
- 创建
Service
对应的Endpoints
,用于删除对应的Endpoint
- 因为
kube-apiserver
可能还没有启动完成,所以先删除对应的Endpoint
(如果有的话) - 创建一个
Runner
对象,传入一串后续要执行的函数 - 上一步的具体启动逻辑
RunKubernetesNamespaces
这个实现比较简单,就顺便一起看看
func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
wait.Until(func() {
// Loop the system namespace list, and create them if they do not exist
for _, ns := range c.SystemNamespaces {
if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
}
}
}, c.SystemNamespacesInterval, ch)
}
代码比较简单,就是不断的检查是否有必要创建,默认间隔是每分钟。
RunKubernetesService
func (c *Controller) RunKubernetesService(ch chan struct{}) {
// 1.
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.readyzClient.Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil
}, ch)
// 2.
wait.NonSlidingUntil(func() {
// 3.
if err := c.UpdateKubernetesService(false); err != nil {
runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
}
}, c.EndpointInterval, ch)
}
func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// 4.
if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
return err
}
return nil
}
代码分解如下:
- 等待服务就绪
- 重试机制的封装
- 不断尝试更新
kubernetes service
, 如果需要的话 - 如果有必要就创建
总结
Kube-apiserver
处了处理客户端请求,还会起一堆的controller
用于监控必要的资源并镜像相关的操作,这些操作通过注册钩子函数的方式来实现。