kubernetes kube-apiserver源码阅读6之准入控制
之前看过kube-apiserver
请求处理链的相关内容,知道一些通用逻辑,如认证,鉴权,审计之类的内容都以一种链条的方式组合在一起依次调用,但是这些处理函数中并不包含准入的业务逻辑,之所以这样,我想是因为准入只针对修改请求,所以放在通用的处理链中会不太和谐,再者就是准入是比较重要的抽象,所以单独抽离出来了。
初始化
初始化包含两部分,一部分是参数初始化,包括默认插件的注册,命令行参数绑定之类的,另一部分就是如何将参数应用到(ApplyTo
) Config
对象中。k8s的其他组件基本也是这样的模式,构造Options
对象跟命令行参数绑定, 校验没有问题之后就通过ApplyTo
方法应用到Config
对象中。
参数初始化
首先从Options对象找找准入的初始化配置。
func NewAPIServerCommand() *cobra.Command {
// 1.
s := options.NewServerRunOptions()
}
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
// 2.
Admission: kubeoptions.NewAdmissionOptions()
}
}
func NewAdmissionOptions() *AdmissionOptions {
// 3.
options := genericoptions.NewAdmissionOptions()
// 4.
// 注册所有的准入插件
RegisterAllAdmissionPlugins(options.Plugins)
// 设置推荐的插件顺序
options.RecommendedPluginOrder = AllOrderedPlugins
// 设置默认禁用的准入插件
options.DefaultOffPlugins = DefaultOffAdmissionPlugins()
return &AdmissionOptions{
GenericAdmission: options,
}
}
// 5.
func NewAdmissionOptions() *AdmissionOptions {
// 6.
options := &AdmissionOptions{
Plugins: admission.NewPlugins(),
Decorators: admission.Decorators{admission.DecoratorFunc(admissionmetrics.WithControllerMetrics)},
RecommendedPluginOrder: []string{lifecycle.PluginName, mutatingwebhook.PluginName, validatingwebhook.PluginName},
DefaultOffPlugins: sets.NewString(),
}
// 7.
server.RegisterAllAdmissionPlugins(options.Plugins)
return options
}
// 8.
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
lifecycle.Register(plugins)
validatingwebhook.Register(plugins)
mutatingwebhook.Register(plugins)
}
代码分解如下:
Options
对象的初始化入口- 准入控制参数的初始化入口
- 初始化一个
AdmissionOptions
, 它会作为GenericAdmission
字段值传入, 值得注意的是, 这两个AdmissionOptions
名字是一样的。 - 注册所有的插件, 以及覆盖默认的配置
- 第3步的具体实现
- 唯一值得注意的是
Decorators
,它会保证后续的整个准入链。 - 底层
AdmissionOptions
会注册的默认插件 - 上一步具体注册的插件
这里容易让人困扰的点是, AdmissionOptions
需要一个AdmissionOptions
, 在看代码的时候可能会看错,一个是贴近命令行参数,一个是贴近准入插件的配置。
从上面的代码,大致看到了两部分注册的插件,下面看看kube-apiserver
所有支持的插件列表。
admit.Register(plugins) // DEPRECATED as no real meaning
alwayspullimages.Register(plugins)
antiaffinity.Register(plugins)
defaulttolerationseconds.Register(plugins)
defaultingressclass.Register(plugins)
deny.Register(plugins) // DEPRECATED as no real meaning
eventratelimit.Register(plugins)
exec.Register(plugins)
extendedresourcetoleration.Register(plugins)
gc.Register(plugins)
imagepolicy.Register(plugins)
limitranger.Register(plugins)
autoprovision.Register(plugins)
exists.Register(plugins)
noderestriction.Register(plugins)
nodetaint.Register(plugins)
label.Register(plugins) // DEPRECATED, future PVs should not rely on labels for zone topology
podnodeselector.Register(plugins)
podtolerationrestriction.Register(plugins)
runtimeclass.Register(plugins)
resourcequota.Register(plugins)
podsecuritypolicy.Register(plugins)
podpriority.Register(plugins)
scdeny.Register(plugins)
serviceaccount.Register(plugins)
setdefault.Register(plugins)
resize.Register(plugins)
storageobjectinuseprotection.Register(plugins)
certapproval.Register(plugins)
certsigning.Register(plugins)
certsubjectrestriction.Register(plugins)
lifecycle.Register(plugins)
validatingwebhook.Register(plugins)
mutatingwebhook.Register(plugins)
kube-apiserver
准入控制支持的列表还是比较多的,但是最常用的应该是最下面的两个,因为我们扩展kube-apiserver
的准入控制的时候一般就用这两个插件,所以后面着重看这两个插件的代码实现(但是这个初始化和实现还是比较复杂的,需要另开一篇文章),而其他的插件只看ServiceAccount
和NamespaceLifecycle
这两个插件。
这里简单的看一下注册的逻辑, 以NamespaceLifecycle
这个插件为例
// vendor\k8s.io\apiserver\pkg\admission\plugin\namespace\lifecycle\admission.go
func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
return NewLifecycle(sets.NewString(metav1.NamespaceDefault, metav1.NamespaceSystem, metav1.NamespacePublic))
})
}
func (ps *Plugins) Register(name string, plugin Factory) {
if ps.registry != nil {
_, found := ps.registry[name]
if found {
klog.Fatalf("Admission plugin %q was registered twice", name)
}
} else {
ps.registry = map[string]Factory{}
}
ps.registry[name] = plugin
}
注册的逻辑比较简单,就是注册一个实现了type Factory func(config io.Reader) (Interface, error)
签名的函数,内部的存储结构是一个插件名到插件map。
准入的命令行参数有两种,一种是即将弃用的,一种是最新支持的,也许这也是有两个AdmissionOptions
主要原因。
func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&a.PluginNames, "admission-control", a.PluginNames, "Comma-delimited list of: "+strings.Join(a.GenericAdmission.Plugins.Registered(), ", ")+".")
fs.MarkDeprecated("admission-control", "Use --enable-admission-plugins or --disable-admission-plugins instead. Will be removed in a future version.")
fs.Lookup("admission-control").Hidden = false
a.GenericAdmission.AddFlags(fs)
}
func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&a.EnablePlugins, "enable-admission-plugins", a.EnablePlugins, ""+
"admission plugins that should be enabled in addition to default enabled ones ("+
strings.Join(a.defaultEnabledPluginNames(), ", ")+"). "+
"Comma-delimited list of admission plugins: "+strings.Join(a.Plugins.Registered(), ", ")+". "+
"The order of plugins in this flag does not matter.")
fs.StringSliceVar(&a.DisablePlugins, "disable-admission-plugins", a.DisablePlugins, ""+
"admission plugins that should be disabled although they are in the default enabled plugins list ("+
strings.Join(a.defaultEnabledPluginNames(), ", ")+"). "+
"Comma-delimited list of admission plugins: "+strings.Join(a.Plugins.Registered(), ", ")+". "+
"The order of plugins in this flag does not matter.")
fs.StringVar(&a.ConfigFile, "admission-control-config-file", a.ConfigFile,
"File with admission control configuration.")
}
总的来说,你可以通过--admission-control
和 enable-admission-plugins,disable-admission-plugins
两种方式来指定插件或者启用禁用插件。现在基本不会使用前一种方式了,因为它要被弃用了。
应用参数(ApplyTo)
应用参数的调用路径如下:
func CreateServerChain() (*aggregatorapiserver.APIAggregator, error) {
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
}
func CreateKubeAPIServerConfig() {
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
}
func buildGenericConfig() {
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
feature.DefaultFeatureGate,
pluginInitializers...)
}
最前两个函数应该不用多说了,贴过很多遍了,这里主要看看最后一个函数里面关于准入控制的代码。这里首先忽略pluginInitializers
, 因为它里面有很多陌生的方法。
func (a *AdmissionOptions) ApplyTo() error {
// 1.
return a.GenericAdmission.ApplyTo(c, informers, kubeAPIServerClientConfig, features, pluginInitializers...)
}
func (a *AdmissionOptions) ApplyTo() error {
// 2.
pluginNames := a.enabledPluginNames()
// 3.
pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)
// 4.
admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
// 5.
c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)
return nil
}
// 6.
func (a *AdmissionOptions) enabledPluginNames() []string {
allOffPlugins := append(a.DefaultOffPlugins.List(), a.DisablePlugins...)
disabledPlugins := sets.NewString(allOffPlugins...)
enabledPlugins := sets.NewString(a.EnablePlugins...)
disabledPlugins = disabledPlugins.Difference(enabledPlugins)
orderedPlugins := []string{}
for _, plugin := range a.RecommendedPluginOrder {
if !disabledPlugins.Has(plugin) {
orderedPlugins = append(orderedPlugins, plugin)
}
}
return orderedPlugins
}
// 7.
func (ps *Plugins) NewFromPlugins() (Interface, error) {
handlers := []Interface{}
mutationPlugins := []string{}
validationPlugins := []string{}
for _, pluginName := range pluginNames {
pluginConfig, err := configProvider.ConfigFor(pluginName)
plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
if plugin != nil {
if decorator != nil {
handlers = append(handlers, decorator.Decorate(plugin, pluginName))
} else {
handlers = append(handlers, plugin)
}
}
}
// 8.
return newReinvocationHandler(chainAdmissionHandler(handlers)), nil
}
func (ps *Plugins) InitPlugin(name string, config io.Reader, pluginInitializer PluginInitializer) (Interface, error) {
// 9.
plugin, found, err := ps.getPlugin(name, config)
// 10.
pluginInitializer.Initialize(plugin)
// 11.
if err := ValidateInitialization(plugin); err != nil {
return nil, fmt.Errorf("failed to initialize admission plugin %q: %v", name, err)
}
return plugin, nil
}
func (ps *Plugins) getPlugin(name string, config io.Reader) (Interface, bool, error) {
// 12
f, found := ps.registry[name]
if !found {
return nil, false, nil
}
config1, config2, err := splitStream(config)
if !PluginEnabledFn(name, config1) {
return nil, true, nil
}
ret, err := f(config2)
return ret, true, err
}
代码分解如下:
- 将参数应用到
Config
对象上, 便于后续使用创建好的AdmissionControl
对象 - 获取启用的插件列表, 就是将所有注册的插件中默认禁用的插件列表去掉,以及命令行参数中指定的禁用插件列表去掉
- 获取插件配置,一般来说为空,这里不深入
- 基于启用的插件列表生成
admissionChain
, 也就是将所有插件包装成一条链条,和之前的处理链差不多 - 用
WithStepMetrics
在包装一层,用于记录可监控的指标数据 - 第2步的具体实现
- 第4步的具体实现
- 无限套娃,用
newReinvocationHandler(chainAdmissionHandler(
在包装一遍,而handler
也被decorator
包装。 - 通过插件名获取插件对象
- 用
pluginInitializer
初始化插件 - 验证插件
总的来说,最外层的包装应该是admissionmetrics.WithStepMetrics
-> newReinvocationHandler
-> chainAdmissionHandler
-> handlers
。
前面两个看起来只是基于总体的调用,而chainAdmissionHandler
`很明显就是将所有插件聚合起来的函数。
type chainAdmissionHandler []Interface
func (admissionHandler chainAdmissionHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
for _, handler := range admissionHandler {
if !handler.Handles(a.GetOperation()) {
continue
}
if mutator, ok := handler.(MutationInterface); ok {
err := mutator.Admit(ctx, a, o)
if err != nil {
return err
}
}
}
return nil
}
func (admissionHandler chainAdmissionHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
for _, handler := range admissionHandler {
if !handler.Handles(a.GetOperation()) {
continue
}
if validator, ok := handler.(ValidationInterface); ok {
err := validator.Validate(ctx, a, o)
if err != nil {
return err
}
}
}
return nil
}
func (admissionHandler chainAdmissionHandler) Handles(operation Operation) bool {
for _, handler := range admissionHandler {
if handler.Handles(operation) {
return true
}
}
return false
}
通过阅读上面的代码我们知道,这个chainAdmissionHandler
对象实现了Admit, Validate, Handles
三个方法,而这三个方法的实现其实就是依次调用各个插件。所以我们研究插件的具体实现,直接看各个插件的实现即可
这里我们可以简单的认为, Admit代表修改请求,Validate代表校验请求。
值得注意的是,Admit
总是在Validate
之前被调用
实现
这里主要看两个插件的实现
- ServiceAccount
- NamespaceLifecycle
ServiceAccount
一般来说我在创建应用的时候不会指定应用对应的ServiceAccount
, 但是当我查看创建好的应用的pod的时候,会发现它会关联一个ServiceAccount
。
# 其他字段省略
apiVersion: v1
kind: Pod
spec:
serviceAccount: default
serviceAccountName: default
volumes:
- name: default-token-xxxxx
secret:
defaultMode: 420
secretName: default-token-xxxxx
默认情况下是default
,并且会关联一个对应的secrets
. 那么是谁在帮助我们呢?答案不言而喻,那就是k8s内置了许多准入插件,这些准入插件会校验用户的请求也会修改用户的请求,而这一步的内容由ServiceAccount
这个插件负责。
Admit
// plugin\pkg\admission\serviceaccount\admission.go
func (s *Plugin) Admit() (err error) {
pod := a.GetObject().(*api.Pod)
// 忽略镜像pod的代码
// 设置默认的ServiceAccount
if len(pod.Spec.ServiceAccountName) == 0 {
// default
pod.Spec.ServiceAccountName = DefaultServiceAccountName
}
// 获取对应的ServiceAccount对象
serviceAccount, err := s.getServiceAccount(a.GetNamespace(), pod.Spec.ServiceAccountName)
// 如果没有挂载default-token就尝试新增volumes配置
if s.MountServiceAccountToken && shouldAutomount(serviceAccount, pod) {
if err := s.mountServiceAccountToken(serviceAccount, pod); err != nil {
if _, ok := err.(errors.APIStatus); ok {
return err
}
return admission.NewForbidden(a, err)
}
}
// 如果pod没有指定ImagePullSecrets且对应的ServiceAccount有ImagePullSecrets就配置ImagePullSecrets
if len(pod.Spec.ImagePullSecrets) == 0 {
pod.Spec.ImagePullSecrets = make([]api.LocalObjectReference, len(serviceAccount.ImagePullSecrets))
for i := 0; i < len(serviceAccount.ImagePullSecrets); i++ {
pod.Spec.ImagePullSecrets[i].Name = serviceAccount.ImagePullSecrets[i].Name
}
}
return s.Validate(ctx, a, o)
}
Validate
func (s *Plugin) Validate() (err error) {
pod := a.GetObject().(*api.Pod)
// 忽略镜像pod的代码
// 获取serviceAccount
serviceAccount, err := s.getServiceAccount(a.GetNamespace(), pod.Spec.ServiceAccountName)
// 尝试校验pod配置的secrets引用是否合法,一般不校验
if s.enforceMountableSecrets(serviceAccount) {
if err := s.limitSecretReferences(serviceAccount, pod); err != nil {
return admission.NewForbidden(a, err)
}
}
return nil
}
NamespaceLifecycle
如果我们执行kubectl delete kube-system
,会发现报错error: the server doesn't have a resource type "kube-system"
,之所以报错,就是因为这个插件会拒绝这样的请求。
初始化
func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
return NewLifecycle(sets.NewString(metav1.NamespaceDefault, metav1.NamespaceSystem, metav1.NamespacePublic))
})
}
这里初始化的时候注册了三个不能被删除的命名空间,分别是default,kube-system,kube-public
Admit
这个插件只实现的Admit
方法。
func (l *Lifecycle) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
// 阻止删除不能删除的命名空间,即注册时配置的三个命名空间
if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == v1.SchemeGroupVersion.WithKind("Namespace").GroupKind() && l.immortalNamespaces.Has(a.GetName()) {
return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("this namespace may not be deleted"))
}
// 不是命名空间的资源就忽略
if len(a.GetNamespace()) == 0 && a.GetKind().GroupKind() != v1.SchemeGroupVersion.WithKind("Namespace").GroupKind() {
return nil
}
// 还有创建和删除的逻辑,这里省略
return nil
}
其实可以看到,Admit
并不只是代表修改用户请求,它本身可以包含修改和校验两部分的逻辑。
准入控制
最后来看看准入控制是怎么集成到请求的处理逻辑中的。整个调用链还是比较深的,这里就忽略路由注册之前的大部分调用代码,直接进入到installer.Install()
,路由注册的逻辑可以看之前的文章。
installer.Install()
func (a *APIInstaller) Install() {
for _, path := range paths {
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
}
return apiResources, resourceInfos, ws, errors
}
func (a *APIInstaller) registerResourceHandlers() {
// 1.
admit := a.group.Admit
case "POST": // Create a resource.
var handler restful.RouteFunction
// 2.
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
route := ws.POST(action.Path).To(handler)
}
// 3.
func restfulCreateResource() restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}
// 4.
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// 5.
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
// 6.
result, err := finishRequest(timeout, func() (runtime.Object, error) {
// 7.
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// 8.
result, err := requestFunc()
return result, err
})
}
// 9.
func AdmissionToValidateObjectFunc(admit admission.Interface, staticAttributes admission.Attributes, o admission.ObjectInterfaces) ValidateObjectFunc {
validatingAdmission, ok := admit.(admission.ValidationInterface)
if !ok {
return func(ctx context.Context, obj runtime.Object) error { return nil }
}
return func(ctx context.Context, obj runtime.Object) error {
name := staticAttributes.GetName()
finalAttributes := admission.NewAttributesRecord(
obj,
staticAttributes.GetOldObject(),
staticAttributes.GetKind(),
staticAttributes.GetNamespace(),
name,
staticAttributes.GetResource(),
staticAttributes.GetSubresource(),
staticAttributes.GetOperation(),
staticAttributes.GetOperationOptions(),
staticAttributes.IsDryRun(),
staticAttributes.GetUserInfo(),
)
if !validatingAdmission.Handles(finalAttributes.GetOperation()) {
return nil
}
// 10.
return validatingAdmission.Validate(ctx, finalAttributes, o)
}
}
代码分解如下:
- 首先获取
admit
对象也就是上文创建的AdmissionControl
- 将准入控制包装起来
- 上一步的具体实现
- 上一步的具体实现,k8s经典套娃
- 将后端存储的
Create
逻辑用匿名函数包装一下,是为了以后扩展?不太懂 - 将获取后端结果的调用用
finishRequest
封装一下,后者会处理异常和超时。 - 在存储之前先尝试
Admit
一下,这就是为什么之前说Admit
总在valiedate
前的原因。 - 获取存储持久化的结果
- 第5的中
Validate
的封装。
总结
准入控制是k8s提供的一种控制逻辑,它可以修改和验证用户请求,并且这个控制方式可以通过MutatingAdmissionWebhook
, ValidatingAdmissionWebhook
的方式扩展。