kubernetes kube-apiserver源码阅读8之Hook

文章目录

阅读kube-apiserver的过程中,会发现很多的AddPostStartHook的代码,这部分代码用于执行kube-apiserver启动之后的逻辑,因为他们放在启动后执行更适合,所以就提供了两种钩子(Hook), PostStartHookPreShutdownHook。这里只看PostStartHook,并且只看bootstrap-controller对应的钩子函数。

kubernetes service

当集群初次创建的时候,查看k8s的service的时候会发现,有一个叫做kubernetes的service,比如下面这样。

kubectl get service
# 输出如下
NAME               TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
kubernetes         ClusterIP   10.96.0.1        <none>        443/TCP             3s

如果不小心删除了,会发现它又自动创建了,那么它是怎么实现的呢?

bootstrapController

这里简单的列一下之前的调用链

func CreateServerChain(*aggregatorapiserver.APIAggregator, error) {
    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
}

func CreateKubeAPIServer() {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
}

func (c completedConfig) New() {
    if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
}

func (m *Instance) InstallLegacyAPI() error {
    // 1.
	controllerName := "bootstrap-controller"
    // 2.
	coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    // 3.
	bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
    // 4.
	m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    // 5.
	m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
	return nil
}

前面的调用链就不细说了。

代码分解如下:

  1. 设置controller名字
  2. 创建k8s的静态客户端
  3. 创建controller
  4. 加入PostStartHook

然后我们继续这个controller的创建和启动逻辑

func (c *completedConfig) NewBootstrapController() *Controller {
    // 1.
	_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
	// 2. kube-system, kube-public, kube-node-lease
	systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}

	return &Controller{
		EndpointInterval:   c.ExtraConfig.EndpointReconcilerConfig.Interval,
		SystemNamespaces:         systemNamespaces,
		SystemNamespacesInterval: 1 * time.Minute,
		ServiceIP:                 c.ExtraConfig.APIServerServiceIP,
	}
}

// 3.
func (c *Controller) PostStartHook() error {
	c.Start()
	return nil
}

// 4.
func (c *Controller) Start() {
	//  5.
	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    // 6.
	if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
		klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
	}
    
    // 忽略验证/修复服务ip的逻辑
    
	// 7.
	c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
	c.runner.Start()
}

// 8.
func (r *Runner) Start() {
	if r.stop == nil {
		c := make(chan struct{})
		r.stop = &c
		for i := range r.loopFuncs {
			go r.loopFuncs[i](*r.stop)
		}
	}
}

代码分解如下:

  1. 获取服务IP地址端的第一个IP, 比如--service-cluster-ip-range=10.96.0.0/12参数的第一个IP是10.96.0.1
  2. 服务启动后要创建的namespace
  3. 注册到钩子函数里的函数
  4. 上一步的具体实现
  5. 创建Service对应的Endpoints,用于删除对应的Endpoint
  6. 因为kube-apiserver可能还没有启动完成,所以先删除对应的Endpoint(如果有的话)
  7. 创建一个Runner对象,传入一串后续要执行的函数
  8. 上一步的具体启动逻辑

RunKubernetesNamespaces

这个实现比较简单,就顺便一起看看

func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
	wait.Until(func() {
		// Loop the system namespace list, and create them if they do not exist
		for _, ns := range c.SystemNamespaces {
			if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
				runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
			}
		}
	}, c.SystemNamespacesInterval, ch)
}

代码比较简单,就是不断的检查是否有必要创建,默认间隔是每分钟。

RunKubernetesService

func (c *Controller) RunKubernetesService(ch chan struct{}) {
	// 1.
	wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
		var code int
		c.readyzClient.Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
		return code == http.StatusOK, nil
	}, ch)
    
	// 2.
	wait.NonSlidingUntil(func() {
		// 3.
		if err := c.UpdateKubernetesService(false); err != nil {
			runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
		}
	}, c.EndpointInterval, ch)
}

func (c *Controller) UpdateKubernetesService(reconcile bool) error {
	// 4.
	if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
		return err
	}
	return nil
}

代码分解如下:

  1. 等待服务就绪
  2. 重试机制的封装
  3. 不断尝试更新kubernetes service, 如果需要的话
  4. 如果有必要就创建

总结

Kube-apiserver处了处理客户端请求,还会起一堆的controller用于监控必要的资源并镜像相关的操作,这些操作通过注册钩子函数的方式来实现。

参考链接