kubernetes kube-apiserver源码阅读6之准入控制

文章目录

之前看过kube-apiserver请求处理链的相关内容,知道一些通用逻辑,如认证,鉴权,审计之类的内容都以一种链条的方式组合在一起依次调用,但是这些处理函数中并不包含准入的业务逻辑,之所以这样,我想是因为准入只针对修改请求,所以放在通用的处理链中会不太和谐,再者就是准入是比较重要的抽象,所以单独抽离出来了。

初始化

初始化包含两部分,一部分是参数初始化,包括默认插件的注册,命令行参数绑定之类的,另一部分就是如何将参数应用到(ApplyTo) Config对象中。k8s的其他组件基本也是这样的模式,构造Options对象跟命令行参数绑定, 校验没有问题之后就通过ApplyTo方法应用到Config对象中。

参数初始化

首先从Options对象找找准入的初始化配置。

func NewAPIServerCommand() *cobra.Command {
    // 1.
	s := options.NewServerRunOptions()
}

func NewServerRunOptions() *ServerRunOptions {
	s := ServerRunOptions{
        // 2.
		Admission:               kubeoptions.NewAdmissionOptions()
    }
}

func NewAdmissionOptions() *AdmissionOptions {
    // 3.
	options := genericoptions.NewAdmissionOptions()
    // 4.
	// 注册所有的准入插件
	RegisterAllAdmissionPlugins(options.Plugins)
	// 设置推荐的插件顺序
	options.RecommendedPluginOrder = AllOrderedPlugins
	// 设置默认禁用的准入插件
	options.DefaultOffPlugins = DefaultOffAdmissionPlugins()

	return &AdmissionOptions{
		GenericAdmission: options,
	}
}

// 5.
func NewAdmissionOptions() *AdmissionOptions {
    // 6.
	options := &AdmissionOptions{
		Plugins:    admission.NewPlugins(),
		Decorators: admission.Decorators{admission.DecoratorFunc(admissionmetrics.WithControllerMetrics)},
		RecommendedPluginOrder: []string{lifecycle.PluginName, mutatingwebhook.PluginName, validatingwebhook.PluginName},
		DefaultOffPlugins:      sets.NewString(),
	}
    // 7.
	server.RegisterAllAdmissionPlugins(options.Plugins)
	return options
}

// 8.
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
	lifecycle.Register(plugins)
	validatingwebhook.Register(plugins)
	mutatingwebhook.Register(plugins)
}

代码分解如下:

  1. Options对象的初始化入口
  2. 准入控制参数的初始化入口
  3. 初始化一个AdmissionOptions, 它会作为GenericAdmission字段值传入, 值得注意的是, 这两个AdmissionOptions名字是一样的。
  4. 注册所有的插件, 以及覆盖默认的配置
  5. 第3步的具体实现
  6. 唯一值得注意的是Decorators,它会保证后续的整个准入链。
  7. 底层AdmissionOptions会注册的默认插件
  8. 上一步具体注册的插件

这里容易让人困扰的点是, AdmissionOptions需要一个AdmissionOptions, 在看代码的时候可能会看错,一个是贴近命令行参数,一个是贴近准入插件的配置。

从上面的代码,大致看到了两部分注册的插件,下面看看kube-apiserver所有支持的插件列表。

admit.Register(plugins) // DEPRECATED as no real meaning
alwayspullimages.Register(plugins)
antiaffinity.Register(plugins)
defaulttolerationseconds.Register(plugins)
defaultingressclass.Register(plugins)
deny.Register(plugins) // DEPRECATED as no real meaning
eventratelimit.Register(plugins)
exec.Register(plugins)
extendedresourcetoleration.Register(plugins)
gc.Register(plugins)
imagepolicy.Register(plugins)
limitranger.Register(plugins)
autoprovision.Register(plugins)
exists.Register(plugins)
noderestriction.Register(plugins)
nodetaint.Register(plugins)
label.Register(plugins) // DEPRECATED, future PVs should not rely on labels for zone topology
podnodeselector.Register(plugins)
podtolerationrestriction.Register(plugins)
runtimeclass.Register(plugins)
resourcequota.Register(plugins)
podsecuritypolicy.Register(plugins)
podpriority.Register(plugins)
scdeny.Register(plugins)
serviceaccount.Register(plugins)
setdefault.Register(plugins)
resize.Register(plugins)
storageobjectinuseprotection.Register(plugins)
certapproval.Register(plugins)
certsigning.Register(plugins)
certsubjectrestriction.Register(plugins)

lifecycle.Register(plugins)
validatingwebhook.Register(plugins)
mutatingwebhook.Register(plugins)

kube-apiserver准入控制支持的列表还是比较多的,但是最常用的应该是最下面的两个,因为我们扩展kube-apiserver的准入控制的时候一般就用这两个插件,所以后面着重看这两个插件的代码实现(但是这个初始化和实现还是比较复杂的,需要另开一篇文章),而其他的插件只看ServiceAccountNamespaceLifecycle这两个插件。

这里简单的看一下注册的逻辑, 以NamespaceLifecycle这个插件为例

// vendor\k8s.io\apiserver\pkg\admission\plugin\namespace\lifecycle\admission.go
func Register(plugins *admission.Plugins) {
	plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
		return NewLifecycle(sets.NewString(metav1.NamespaceDefault, metav1.NamespaceSystem, metav1.NamespacePublic))
	})
}

func (ps *Plugins) Register(name string, plugin Factory) {
	if ps.registry != nil {
		_, found := ps.registry[name]
		if found {
			klog.Fatalf("Admission plugin %q was registered twice", name)
		}
	} else {
		ps.registry = map[string]Factory{}
	}
	ps.registry[name] = plugin
}

注册的逻辑比较简单,就是注册一个实现了type Factory func(config io.Reader) (Interface, error)签名的函数,内部的存储结构是一个插件名到插件map。

准入的命令行参数有两种,一种是即将弃用的,一种是最新支持的,也许这也是有两个AdmissionOptions主要原因。

func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) {
	fs.StringSliceVar(&a.PluginNames, "admission-control", a.PluginNames, "Comma-delimited list of: "+strings.Join(a.GenericAdmission.Plugins.Registered(), ", ")+".")
	fs.MarkDeprecated("admission-control", "Use --enable-admission-plugins or --disable-admission-plugins instead. Will be removed in a future version.")
	fs.Lookup("admission-control").Hidden = false
	a.GenericAdmission.AddFlags(fs)
}

func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) {
	fs.StringSliceVar(&a.EnablePlugins, "enable-admission-plugins", a.EnablePlugins, ""+
		"admission plugins that should be enabled in addition to default enabled ones ("+
		strings.Join(a.defaultEnabledPluginNames(), ", ")+"). "+
		"Comma-delimited list of admission plugins: "+strings.Join(a.Plugins.Registered(), ", ")+". "+
		"The order of plugins in this flag does not matter.")
	fs.StringSliceVar(&a.DisablePlugins, "disable-admission-plugins", a.DisablePlugins, ""+
		"admission plugins that should be disabled although they are in the default enabled plugins list ("+
		strings.Join(a.defaultEnabledPluginNames(), ", ")+"). "+
		"Comma-delimited list of admission plugins: "+strings.Join(a.Plugins.Registered(), ", ")+". "+
		"The order of plugins in this flag does not matter.")
	fs.StringVar(&a.ConfigFile, "admission-control-config-file", a.ConfigFile,
		"File with admission control configuration.")
}

总的来说,你可以通过--admission-controlenable-admission-plugins,disable-admission-plugins 两种方式来指定插件或者启用禁用插件。现在基本不会使用前一种方式了,因为它要被弃用了。

应用参数(ApplyTo)

应用参数的调用路径如下:

func CreateServerChain() (*aggregatorapiserver.APIAggregator, error) {
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
}

func CreateKubeAPIServerConfig() {
	genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
}

func buildGenericConfig() {
    admissionConfig := &kubeapiserveradmission.Config{
		ExternalInformers:    versionedInformers,
		LoopbackClientConfig: genericConfig.LoopbackClientConfig,
		CloudConfigFile:      s.CloudProvider.CloudConfigFile,
	}
    pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
    err = s.Admission.ApplyTo(
		genericConfig,
		versionedInformers,
		kubeClientConfig,
		feature.DefaultFeatureGate,
		pluginInitializers...)
}

最前两个函数应该不用多说了,贴过很多遍了,这里主要看看最后一个函数里面关于准入控制的代码。这里首先忽略pluginInitializers, 因为它里面有很多陌生的方法。

func (a *AdmissionOptions) ApplyTo() error {
    // 1.
	return a.GenericAdmission.ApplyTo(c, informers, kubeAPIServerClientConfig, features, pluginInitializers...)
}

func (a *AdmissionOptions) ApplyTo() error {
    // 2.
	pluginNames := a.enabledPluginNames()
	// 3.  
	pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)
    // 4.
	admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
    // 5.
	c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)
	return nil
}


// 6.
func (a *AdmissionOptions) enabledPluginNames() []string {
	allOffPlugins := append(a.DefaultOffPlugins.List(), a.DisablePlugins...)
	disabledPlugins := sets.NewString(allOffPlugins...)
	enabledPlugins := sets.NewString(a.EnablePlugins...)
	disabledPlugins = disabledPlugins.Difference(enabledPlugins)

	orderedPlugins := []string{}
	for _, plugin := range a.RecommendedPluginOrder {
		if !disabledPlugins.Has(plugin) {
			orderedPlugins = append(orderedPlugins, plugin)
		}
	}

	return orderedPlugins
}

// 7.
func (ps *Plugins) NewFromPlugins() (Interface, error) {
	handlers := []Interface{}
	mutationPlugins := []string{}
	validationPlugins := []string{}
	for _, pluginName := range pluginNames {
		pluginConfig, err := configProvider.ConfigFor(pluginName)

		plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
		if plugin != nil {
			if decorator != nil {
				handlers = append(handlers, decorator.Decorate(plugin, pluginName))
			} else {
				handlers = append(handlers, plugin)
			}
		}
	}
	// 8.
	return newReinvocationHandler(chainAdmissionHandler(handlers)), nil
}

func (ps *Plugins) InitPlugin(name string, config io.Reader, pluginInitializer PluginInitializer) (Interface, error) {
    // 9.
	plugin, found, err := ps.getPlugin(name, config)
    // 10.
	pluginInitializer.Initialize(plugin)
	// 11.
	if err := ValidateInitialization(plugin); err != nil {
		return nil, fmt.Errorf("failed to initialize admission plugin %q: %v", name, err)
	}

	return plugin, nil
}

func (ps *Plugins) getPlugin(name string, config io.Reader) (Interface, bool, error) {
    // 12
	f, found := ps.registry[name]
	if !found {
		return nil, false, nil
	}

	config1, config2, err := splitStream(config)
	if !PluginEnabledFn(name, config1) {
		return nil, true, nil
	}

	ret, err := f(config2)
	return ret, true, err
}

代码分解如下:

  1. 将参数应用到Config对象上, 便于后续使用创建好的AdmissionControl对象
  2. 获取启用的插件列表, 就是将所有注册的插件中默认禁用的插件列表去掉,以及命令行参数中指定的禁用插件列表去掉
  3. 获取插件配置,一般来说为空,这里不深入
  4. 基于启用的插件列表生成admissionChain, 也就是将所有插件包装成一条链条,和之前的处理链差不多
  5. WithStepMetrics在包装一层,用于记录可监控的指标数据
  6. 第2步的具体实现
  7. 第4步的具体实现
  8. 无限套娃,用newReinvocationHandler(chainAdmissionHandler(在包装一遍,而handler也被decorator包装。
  9. 通过插件名获取插件对象
  10. pluginInitializer初始化插件
  11. 验证插件

总的来说,最外层的包装应该是admissionmetrics.WithStepMetrics -> newReinvocationHandler -> chainAdmissionHandler -> handlers

前面两个看起来只是基于总体的调用,而chainAdmissionHandler `很明显就是将所有插件聚合起来的函数。

type chainAdmissionHandler []Interface

func (admissionHandler chainAdmissionHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
	for _, handler := range admissionHandler {
		if !handler.Handles(a.GetOperation()) {
			continue
		}
		if mutator, ok := handler.(MutationInterface); ok {
			err := mutator.Admit(ctx, a, o)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

func (admissionHandler chainAdmissionHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
	for _, handler := range admissionHandler {
		if !handler.Handles(a.GetOperation()) {
			continue
		}
		if validator, ok := handler.(ValidationInterface); ok {
			err := validator.Validate(ctx, a, o)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

func (admissionHandler chainAdmissionHandler) Handles(operation Operation) bool {
	for _, handler := range admissionHandler {
		if handler.Handles(operation) {
			return true
		}
	}
	return false
}

通过阅读上面的代码我们知道,这个chainAdmissionHandler对象实现了Admit, Validate, Handles三个方法,而这三个方法的实现其实就是依次调用各个插件。所以我们研究插件的具体实现,直接看各个插件的实现即可

这里我们可以简单的认为, Admit代表修改请求,Validate代表校验请求。

值得注意的是,Admit总是在Validate之前被调用

实现

这里主要看两个插件的实现

  • ServiceAccount
  • NamespaceLifecycle

ServiceAccount

一般来说我在创建应用的时候不会指定应用对应的ServiceAccount, 但是当我查看创建好的应用的pod的时候,会发现它会关联一个ServiceAccount

# 其他字段省略
apiVersion: v1
kind: Pod
spec:
  serviceAccount: default
  serviceAccountName: default
  volumes:
  - name: default-token-xxxxx
    secret:
      defaultMode: 420
      secretName: default-token-xxxxx

默认情况下是default,并且会关联一个对应的secrets. 那么是谁在帮助我们呢?答案不言而喻,那就是k8s内置了许多准入插件,这些准入插件会校验用户的请求也会修改用户的请求,而这一步的内容由ServiceAccount这个插件负责。

Admit

// plugin\pkg\admission\serviceaccount\admission.go
func (s *Plugin) Admit() (err error) {
	pod := a.GetObject().(*api.Pod)

    // 忽略镜像pod的代码
    
	// 设置默认的ServiceAccount
	if len(pod.Spec.ServiceAccountName) == 0 {
        // default
		pod.Spec.ServiceAccountName = DefaultServiceAccountName
	}
	// 获取对应的ServiceAccount对象
	serviceAccount, err := s.getServiceAccount(a.GetNamespace(), pod.Spec.ServiceAccountName)

    // 如果没有挂载default-token就尝试新增volumes配置
	if s.MountServiceAccountToken && shouldAutomount(serviceAccount, pod) {
		if err := s.mountServiceAccountToken(serviceAccount, pod); err != nil {
			if _, ok := err.(errors.APIStatus); ok {
				return err
			}
			return admission.NewForbidden(a, err)
		}
	}
    // 如果pod没有指定ImagePullSecrets且对应的ServiceAccount有ImagePullSecrets就配置ImagePullSecrets
	if len(pod.Spec.ImagePullSecrets) == 0 {
		pod.Spec.ImagePullSecrets = make([]api.LocalObjectReference, len(serviceAccount.ImagePullSecrets))
		for i := 0; i < len(serviceAccount.ImagePullSecrets); i++ {
			pod.Spec.ImagePullSecrets[i].Name = serviceAccount.ImagePullSecrets[i].Name
		}
	}

	return s.Validate(ctx, a, o)
}

Validate

func (s *Plugin) Validate() (err error) {
	pod := a.GetObject().(*api.Pod)

	// 忽略镜像pod的代码

	// 获取serviceAccount
	serviceAccount, err := s.getServiceAccount(a.GetNamespace(), pod.Spec.ServiceAccountName)

	// 尝试校验pod配置的secrets引用是否合法,一般不校验
	if s.enforceMountableSecrets(serviceAccount) {
		if err := s.limitSecretReferences(serviceAccount, pod); err != nil {
			return admission.NewForbidden(a, err)
		}
	}

	return nil
}

NamespaceLifecycle

如果我们执行kubectl delete kube-system,会发现报错error: the server doesn't have a resource type "kube-system",之所以报错,就是因为这个插件会拒绝这样的请求。

初始化

func Register(plugins *admission.Plugins) {
	plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
		return NewLifecycle(sets.NewString(metav1.NamespaceDefault, metav1.NamespaceSystem, metav1.NamespacePublic))
	})
}

这里初始化的时候注册了三个不能被删除的命名空间,分别是default,kube-system,kube-public

Admit

这个插件只实现的Admit方法。

func (l *Lifecycle) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
	// 阻止删除不能删除的命名空间,即注册时配置的三个命名空间
	if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == v1.SchemeGroupVersion.WithKind("Namespace").GroupKind() && l.immortalNamespaces.Has(a.GetName()) {
		return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("this namespace may not be deleted"))
	}

	// 不是命名空间的资源就忽略
	if len(a.GetNamespace()) == 0 && a.GetKind().GroupKind() != v1.SchemeGroupVersion.WithKind("Namespace").GroupKind() {
		return nil
	}
	
    // 还有创建和删除的逻辑,这里省略

	return nil
}

其实可以看到,Admit并不只是代表修改用户请求,它本身可以包含修改和校验两部分的逻辑。

准入控制

最后来看看准入控制是怎么集成到请求的处理逻辑中的。整个调用链还是比较深的,这里就忽略路由注册之前的大部分调用代码,直接进入到installer.Install(),路由注册的逻辑可以看之前的文章。

installer.Install()

func (a *APIInstaller) Install() {
	for _, path := range paths {
		apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)

	}
	return apiResources, resourceInfos, ws, errors
}

func (a *APIInstaller) registerResourceHandlers() {
    // 1.
	admit := a.group.Admit
    
    case "POST": // Create a resource.
        var handler restful.RouteFunction
    	// 2.
        if isNamedCreater {
            handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
        } else {
            handler = restfulCreateResource(creater, reqScope, admit)
        }
        route := ws.POST(action.Path).To(handler)
}

// 3.
func restfulCreateResource() restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
	}
}

// 4.
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
	return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}

func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
       // 5.
       requestFunc := func() (runtime.Object, error) {
			return r.Create(
				ctx,
				name,
				obj,
				rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
				options,
			)
		}
        // 6.
        result, err := finishRequest(timeout, func() (runtime.Object, error) {
            // 7.
			if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
				if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
					return nil, err
				}
			}
			// 8.
			result, err := requestFunc()
			return result, err
		})
}

// 9.
func AdmissionToValidateObjectFunc(admit admission.Interface, staticAttributes admission.Attributes, o admission.ObjectInterfaces) ValidateObjectFunc {
	validatingAdmission, ok := admit.(admission.ValidationInterface)
	if !ok {
		return func(ctx context.Context, obj runtime.Object) error { return nil }
	}
	return func(ctx context.Context, obj runtime.Object) error {
		name := staticAttributes.GetName()

		finalAttributes := admission.NewAttributesRecord(
			obj,
			staticAttributes.GetOldObject(),
			staticAttributes.GetKind(),
			staticAttributes.GetNamespace(),
			name,
			staticAttributes.GetResource(),
			staticAttributes.GetSubresource(),
			staticAttributes.GetOperation(),
			staticAttributes.GetOperationOptions(),
			staticAttributes.IsDryRun(),
			staticAttributes.GetUserInfo(),
		)
		if !validatingAdmission.Handles(finalAttributes.GetOperation()) {
			return nil
		}
        // 10.
		return validatingAdmission.Validate(ctx, finalAttributes, o)
	}
}

代码分解如下:

  1. 首先获取admit对象也就是上文创建的AdmissionControl
  2. 将准入控制包装起来
  3. 上一步的具体实现
  4. 上一步的具体实现,k8s经典套娃
  5. 将后端存储的Create逻辑用匿名函数包装一下,是为了以后扩展?不太懂
  6. 将获取后端结果的调用用finishRequest封装一下,后者会处理异常和超时。
  7. 在存储之前先尝试Admit一下,这就是为什么之前说Admit总在valiedate前的原因。
  8. 获取存储持久化的结果
  9. 第5的中Validate的封装。

总结

准入控制是k8s提供的一种控制逻辑,它可以修改和验证用户请求,并且这个控制方式可以通过MutatingAdmissionWebhook, ValidatingAdmissionWebhook的方式扩展。

参考链接