kubernetes kube-apiserver源码阅读2之GenericAPIServer

文章目录

kubernetes代码版本: v1.20.2

前一节大致看了一下apiserver 的启动流程,以及组成kube-apiserver的三个组件,这一节看看三个组件都会用到的一个非常重要的对象GenericAPIServer, 它是一个HTTP Server的抽象, 虽然这么说很抽象。它会提供注册路由的入口以及各种钩子函数的注册入口。

GenericAPIServer的构建

其实apiserver中的三个组件都会创建GenericAPIServer,看哪个其实都差不多,所以这些选择了apiExtensionsServer中的创建代码。

func CreateServerChain() (*aggregatorapiserver.APIAggregator, error) {
    // 1.
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
}

func createAPIExtensionsServer(/*参数省略*/) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
    // 2.
	return apiextensionsConfig.Complete().New(delegateAPIServer)
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    //3.
	genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
	return s, nil
}


func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	
    // 4. 
	handlerChainBuilder := func(handler http.Handler) http.Handler {
		return c.BuildHandlerChainFunc(handler, c.Config)
	}
     //5.
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
	
    // 6.
	s := &GenericAPIServer{
		Handler: apiServerHandler,
		listedPathProvider: apiServerHandler,
	}
	return s, nil
}

// 7. 
func NewAPIServerHandler() *APIServerHandler {
    // 8.
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
    // 9.
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

    // 10. goreset
	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) 
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

    // 11
	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}
	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}

代码分解如下:

  1. 创建apiExtensionsServer对象
  2. 上一步的具体实现
  3. 创建一个GenericAPIServer对象
  4. 处理链的构造,这条链会依次负责认证,授权,审计等功能,这里只是简单的包装了一下BuildHandlerChainFunc
  5. 创建apiServerHandler, 这是一个Handler, 它实现了ServeHTTP方法
  6. 基于上一步的apiServerHandler构建GenericAPIServer
  7. 第5步的具体实现
  8. 构造nonGoRestfulMux路由对象,与gorestfulContainer相对应
  9. 通过第1步知道,这里的notFoundHandlernil, 所以apiExtensionsServer,所以这里不会设置notFoundHandler, 而是使用NewPathRecorderMux里的http.NotFoundHandler()
  10. 构造go-restfulContainer对象, go-restful是一个面向RESTful的go web框架, k8s的资源几乎都注册在它上面
  11. 构造一个director对象,它负责在nonGoRestfulMuxgorestfulContainer之间路由,最后返回APIServerHandler

上一节说过三个组件的是通过委托默认来调用的,即aggregatorServer ->kubeAPIServer -> aggregatorServer -> apiExtensionsServer,那么apiExtensionsServer如果也无法响应怎么办呢?这里的代码给出了回答, 它如果无法响应就会将请求委托给genericapiserver.NewEmptyDelegate(), 而emptyDelegateUnprotectedHandler方法返回nil,所以最终会使用http.NotFoundHandler()作为apiserver的最终兜底方案,即返回404.

这里提到了nonGoRestfulMuxgorestfulContainer两个对象,前者用于注册一些非RESTful风格的路由,比如/health, /metrics等接口,而/apis/apps/v1, /api/v1等都是RESTful风格的路由,所以两者使用不同的对象来注册路由。

PrepareRun

前文简单的提及了一下PrepareRun, 这里再简单重复一下

func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
      // 可以看到GenericServer占据了C位
      prepared := s.GenericAPIServer.PrepareRun()
    return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}

func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
	s.delegationTarget.PrepareRun()
	return preparedGenericAPIServer{s}
}

代码并不复杂,最终是通过delegationTarget.PrepareRun()ServerChain上的各个组件都执行到,最后用preparedGenericAPIServer包装一下。

Run

从前文知道APIAggregator最终是运行Run方法,这里深入一下

func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
    // 1.
	delayedStopCh := make(chan struct{})
    go func() {
		defer close(delayedStopCh)
		<-stopCh
		close(s.readinessStopCh)

		time.Sleep(s.ShutdownDelayDuration)
	}()

	// 2.
	stoppedCh, err := s.NonBlockingRun(delayedStopCh)
    // 3.
	<-stopCh
	err = s.RunPreShutdownHooks()
    // 4.
    <-stoppedCh
	s.HandlerChainWaitGroup.Wait()
	return nil
}

func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
	internalStopCh := make(chan struct{})
	var stoppedCh <-chan struct{}
	if s.SecureServingInfo != nil && s.Handler != nil {
		var err error
        // 5.
		stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
	}
	// 6.
	s.RunPostStartHooks(stopCh)
	return stoppedCh, nil
}

代码分解如下:

  1. 构造一个延迟的关闭通道,在收到关闭信号之前睡眠一段时间
  2. 启动HTTP Server的入口
  3. 收到关闭信号(外部)之后执行注册的钩子函数
  4. 收到关闭信号(内部)之后等待处理函数链条完成,比如一些长时间的等待请求
  5. 启动HTTP服务的入口
  6. 运行启动后的钩子函数

这里主要是运行各种钩子函数,而NonBlockingRun也比较好比较理解,以非阻塞的方式运行,所以后面的等待都是通过通道来阻塞主进程,后面就应该是具体的接受客户端请求的逻辑了。

func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
    // 1.
	secureServer := &http.Server{
		Addr:           s.Listener.Addr().String(),
        // 2.
		Handler:        handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig:      tlsConfig,
	}
    // 设置http服务器的各种参数,比如读写数据流的帧(FrameSize)大小
    
	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}

func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, error) {
	go func() {
		defer utilruntime.HandleCrash()
		var listener net.Listener
		listener = tcpKeepAliveListener{ln}
        // 3.
		err := server.Serve(listener)
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()

	return stoppedCh, nil
}

代码分解如下:

  1. 用go标准库的net/http构造http server
  2. 重点是传入的handler,也就是GenericAPIServer.Handler, 这个方法会负责处理请求
  3. go net/http标准库的启动方法

GenericAPIServer的启动分为了两部分,一部分是自己业务相关的设置和钩子函数执行,一部分是http标准库的相关设置,然后我们就可以回过头来看看GenericAPIServer.Handler

Handler

在此之前让我们回顾一下GenericAPIServerAPIServerHandler的数据结构

type GenericAPIServer struct {
    Handler *APIServerHandler
}

type APIServerHandler struct {
	FullHandlerChain http.Handler
	GoRestfulContainer *restful.Container
	NonGoRestfulMux *mux.PathRecorderMux
	Director http.Handler
}

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	return &APIServerHandler{
        // 1.
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}

func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // 2.
	a.FullHandlerChain.ServeHTTP(w, r)
}

代码分解如下:

  1. director外面用handlerChainBuilder包一层, 也就是将通用的处理方法(比如认证, 授权等)构造成一条链,作为公共对象,因为三个组件都会用到。
  2. 因为上一步的操作,所以不直接调用director,即在真正处理前,使用请求链对象(handlerChain)处理认证,授权,审计等操作。

处理链对象(handlerChain)的各项操作以后单独再说,这里先看director对象是怎么处理请求的。

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path

    // 1. 
	for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
		switch {
		case ws.RootPath() == "/apis":
			if path == "/apis" || path == "/apis/" {
				d.goRestfulContainer.Dispatch(w, req)
				return
			}

		case strings.HasPrefix(path, ws.RootPath()):
			// ensure an exact match or a path boundary match
			if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
				d.goRestfulContainer.Dispatch(w, req)
				return
			}
		}
	}
	// 2.
	d.nonGoRestfulMux.ServeHTTP(w, req)
}

func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}

func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // 3.
	if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
		exactHandler.ServeHTTP(w, r)
		return
	}
	for _, prefixHandler := range h.prefixHandlers {
		if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
			prefixHandler.handler.ServeHTTP(w, r)
			return
		}
	}

    // 4.
	h.notFoundHandler.ServeHTTP(w, r)
}

代码分解如下:

  1. 首先处理k8s RESTful风格资源的请求
  2. 上一步不匹配的交给nonGoRestfulMux
  3. 尝试找到匹配的路由并处理
  4. 找不到就委托给下一级,比如aggregatorServer的下一级是kubeAPIServer

总结

至此我们大致了解了kube-apiserver中的三个组件的核心组件GenericAPIServer是怎么将请求链接起来并移交请求的,以及内部的路由数据流,但我们还不了解怎么处理一些通用操作,比如认证,鉴权,审计等,以及k8s的api资源是怎么注册路由的,这两个问题我们放在后面的文章来说明。

参考链接