kubernetes kube-apiserver源码阅读2之GenericAPIServer
kubernetes代码版本: v1.20.2
前一节大致看了一下apiserver 的启动流程,以及组成kube-apiserver
的三个组件,这一节看看三个组件都会用到的一个非常重要的对象GenericAPIServer
, 它是一个HTTP Server的抽象, 虽然这么说很抽象。它会提供注册路由的入口以及各种钩子函数的注册入口。
GenericAPIServer的构建
其实apiserver中的三个组件都会创建GenericAPIServer
,看哪个其实都差不多,所以这些选择了apiExtensionsServer
中的创建代码。
func CreateServerChain() (*aggregatorapiserver.APIAggregator, error) {
// 1.
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
}
func createAPIExtensionsServer(/*参数省略*/) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
// 2.
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
//3.
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
return s, nil
}
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// 4.
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
//5.
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
// 6.
s := &GenericAPIServer{
Handler: apiServerHandler,
listedPathProvider: apiServerHandler,
}
return s, nil
}
// 7.
func NewAPIServerHandler() *APIServerHandler {
// 8.
nonGoRestfulMux := mux.NewPathRecorderMux(name)
// 9.
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
// 10. goreset
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{})
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
// 11
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
代码分解如下:
- 创建
apiExtensionsServer
对象 - 上一步的具体实现
- 创建一个
GenericAPIServer
对象 - 处理链的构造,这条链会依次负责认证,授权,审计等功能,这里只是简单的包装了一下
BuildHandlerChainFunc
。 - 创建
apiServerHandler
, 这是一个Handler, 它实现了ServeHTTP
方法 - 基于上一步的
apiServerHandler
构建GenericAPIServer
- 第5步的具体实现
- 构造
nonGoRestfulMux
路由对象,与gorestfulContainer
相对应 - 通过第1步知道,这里的
notFoundHandler
是nil
, 所以apiExtensionsServer
,所以这里不会设置notFoundHandler
, 而是使用NewPathRecorderMux
里的http.NotFoundHandler()
- 构造
go-restful
的Container
对象,go-restful
是一个面向RESTful的go web框架, k8s的资源几乎都注册在它上面 - 构造一个
director
对象,它负责在nonGoRestfulMux
和gorestfulContainer
之间路由,最后返回APIServerHandler
上一节说过三个组件的是通过委托默认来调用的,即aggregatorServer
->kubeAPIServer
-> aggregatorServer
-> apiExtensionsServer
,那么apiExtensionsServer
如果也无法响应怎么办呢?这里的代码给出了回答, 它如果无法响应就会将请求委托给genericapiserver.NewEmptyDelegate()
, 而emptyDelegate
的UnprotectedHandler
方法返回nil
,所以最终会使用http.NotFoundHandler()
作为apiserver的最终兜底方案,即返回404.
这里提到了nonGoRestfulMux
,gorestfulContainer
两个对象,前者用于注册一些非RESTful风格的路由,比如/health
, /metrics
等接口,而/apis/apps/v1
, /api/v1
等都是RESTful风格的路由,所以两者使用不同的对象来注册路由。
PrepareRun
前文简单的提及了一下PrepareRun
, 这里再简单重复一下
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// 可以看到GenericServer占据了C位
prepared := s.GenericAPIServer.PrepareRun()
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
s.delegationTarget.PrepareRun()
return preparedGenericAPIServer{s}
}
代码并不复杂,最终是通过delegationTarget.PrepareRun()
将ServerChain
上的各个组件都执行到,最后用preparedGenericAPIServer
包装一下。
Run
从前文知道APIAggregator
最终是运行Run方法,这里深入一下
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// 1.
delayedStopCh := make(chan struct{})
go func() {
defer close(delayedStopCh)
<-stopCh
close(s.readinessStopCh)
time.Sleep(s.ShutdownDelayDuration)
}()
// 2.
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
// 3.
<-stopCh
err = s.RunPreShutdownHooks()
// 4.
<-stoppedCh
s.HandlerChainWaitGroup.Wait()
return nil
}
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
// 5.
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
}
// 6.
s.RunPostStartHooks(stopCh)
return stoppedCh, nil
}
代码分解如下:
- 构造一个延迟的关闭通道,在收到关闭信号之前睡眠一段时间
- 启动HTTP Server的入口
- 收到关闭信号(外部)之后执行注册的钩子函数
- 收到关闭信号(内部)之后等待处理函数链条完成,比如一些长时间的等待请求
- 启动HTTP服务的入口
- 运行启动后的钩子函数
这里主要是运行各种钩子函数,而NonBlockingRun
也比较好比较理解,以非阻塞的方式运行,所以后面的等待都是通过通道来阻塞主进程,后面就应该是具体的接受客户端请求的逻辑了。
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
// 1.
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
// 2.
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
}
// 设置http服务器的各种参数,比如读写数据流的帧(FrameSize)大小
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
func RunServer(
server *http.Server,
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, error) {
go func() {
defer utilruntime.HandleCrash()
var listener net.Listener
listener = tcpKeepAliveListener{ln}
// 3.
err := server.Serve(listener)
select {
case <-stopCh:
klog.Info(msg)
default:
panic(fmt.Sprintf("%s due to error: %v", msg, err))
}
}()
return stoppedCh, nil
}
代码分解如下:
- 用go标准库的
net/http
构造http server - 重点是传入的handler,也就是
GenericAPIServer.Handler
, 这个方法会负责处理请求 - go net/http标准库的启动方法
GenericAPIServer的启动分为了两部分,一部分是自己业务相关的设置和钩子函数执行,一部分是http标准库的相关设置,然后我们就可以回过头来看看GenericAPIServer.Handler
Handler
在此之前让我们回顾一下GenericAPIServer
和APIServerHandler
的数据结构
type GenericAPIServer struct {
Handler *APIServerHandler
}
type APIServerHandler struct {
FullHandlerChain http.Handler
GoRestfulContainer *restful.Container
NonGoRestfulMux *mux.PathRecorderMux
Director http.Handler
}
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
// 1.
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 2.
a.FullHandlerChain.ServeHTTP(w, r)
}
代码分解如下:
- 在
director
外面用handlerChainBuilder
包一层, 也就是将通用的处理方法(比如认证, 授权等)构造成一条链,作为公共对象,因为三个组件都会用到。 - 因为上一步的操作,所以不直接调用director,即在真正处理前,使用请求链对象(handlerChain)处理认证,授权,审计等操作。
处理链对象(handlerChain)的各项操作以后单独再说,这里先看director
对象是怎么处理请求的。
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
// 1.
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
switch {
case ws.RootPath() == "/apis":
if path == "/apis" || path == "/apis/" {
d.goRestfulContainer.Dispatch(w, req)
return
}
case strings.HasPrefix(path, ws.RootPath()):
// ensure an exact match or a path boundary match
if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
d.goRestfulContainer.Dispatch(w, req)
return
}
}
}
// 2.
d.nonGoRestfulMux.ServeHTTP(w, req)
}
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 3.
if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
exactHandler.ServeHTTP(w, r)
return
}
for _, prefixHandler := range h.prefixHandlers {
if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
prefixHandler.handler.ServeHTTP(w, r)
return
}
}
// 4.
h.notFoundHandler.ServeHTTP(w, r)
}
代码分解如下:
- 首先处理k8s RESTful风格资源的请求
- 上一步不匹配的交给
nonGoRestfulMux
- 尝试找到匹配的路由并处理
- 找不到就委托给下一级,比如
aggregatorServer
的下一级是kubeAPIServer
。
总结
至此我们大致了解了kube-apiserver
中的三个组件的核心组件GenericAPIServer
是怎么将请求链接起来并移交请求的,以及内部的路由数据流,但我们还不了解怎么处理一些通用操作,比如认证,鉴权,审计等,以及k8s的api资源是怎么注册路由的,这两个问题我们放在后面的文章来说明。