kubernetes kube-apiserver源码阅读5之路由

文章目录

这里一节主要看kube-apiserver的路由注册和最终映射到后端存储的处理函数是怎么构造的,kube-apiserver中一共有三个组件,apiExtensionsServer,kubeAPIServer, aggregatorServer,每个组件都有路由注册,但其实核心逻辑是差不多的。

路由主要可以分为两个部分

  • 发现路由 比如/apis, 它会列出集群中可用的资源
  • 资源路由 比如/apis/apiextensions.k8s.io/v1/customresourcedefinitions,这段路由具体到了资源

apiExtensionsServer

为了简单起见,我们从apiExtensionsServer开始看。

func CreateServerChain() {
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    // 1.
	apiResourceConfig := c.GenericConfig.MergedResourceConfig
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
    // 2.
	if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
		storage := map[string]rest.Storage{}
		// 3.
		customResourceDefintionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
		if err != nil {
			return nil, err
		}
		storage["customresourcedefinitions"] = customResourceDefintionStorage
		storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)

		apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
	}
	// 4.
	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}
}

func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
    // 5.
	return s.InstallAPIGroups(apiGroupInfo)
}

代码分解如下:

  1. 获取通用的配置
  2. 判断是否启用v1版本和注册将要注册的资源放进apiGroupInfo, 从apiGroupInfo这个命名应该能猜出来,是将一组API组合在一起,即不同版本的API放在同一个组, 这里是v1beta和v1。
  3. 创建一个REST对象 内嵌了Store对象
  4. 将设置好的apiGroupInfo注册到GenericAPIServer
  5. 复用InstallAPIGroups

k8s中对于资源的操作最终都会持久化后端存储中,而对外是一种RESTful的方式提供操作接口,REST对象就是这么一个介于用户请求和后端存储之间的对象,它会将用户的请求以增删改查的方式作用到后端存储。虽然它很重要,但是,我们姑且接近将其看做一个提供了增删改查的黑盒子,等后续在回来看它的具体逻辑。

路由注册

// vendor\k8s.io\apiserver\pkg\endpoints\installer.go
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
	// 1.
	openAPIModels, err := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
	// 2.
	for _, apiGroupInfo := range apiGroupInfos {
        // 3.
		if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
			return fmt.Errorf("unable to install api resources: %v", err)
		}

		// 设置发现资源发现的路由 这里暂时忽略
	}
	return nil
}

func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
	var resourceInfos []*storageversion.ResourceInfo
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
        // 4.
		apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
		apiGroupVersion.OpenAPIModels = openAPIModels
        // 5.
		r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
		resourceInfos = append(resourceInfos, r...)
	}
	return nil
}

// 6.
func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
	storage := make(map[string]rest.Storage)
	for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
		storage[strings.ToLower(k)] = v
	}
	version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
    // /apis
	version.Root = apiPrefix
	version.Storage = storage
	return version
}

// 7.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
    // /apis/apiextensions.k8s.io/v1
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}
	// 8.
	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
    // 9.
	container.Add(ws)
}

代码分解如下:

  1. 获取openapi模型数据, 这里会包含各个字段和参数的类型定义

    这个是通过make gen_openapi命令生成的, 全部数据在pkg/generated/openapi/

  2. 遍历apiGroupInfos

  3. apiGroupInfo继续安装

  4. 基于apiGroupInfo等信息创建一个APIGroupVersion对象

  5. 基于上一步创建的APIGroupVersion对象继续安装

  6. 第4步的具体实现

  7. 第5步的具体实现

  8. 构造一个installer继续安装

  9. 将生成webservice注册到GenericAPIServer.Handler.GoRestfulContainer

上面的调用链还是比较长的,大致如下

sequenceDiagram InstallAPIGroup ->> installAPIResources: installAPIResources ->> InstallREST: InstallREST ->> installer.Install():

在整个调用链过程中,承载storage和路由信息的对象也在不断的变换,依次是APIGroupInfo``, APIGroupVersion, APIInstaller。一层包一层,k8s的老操作了。

最后到了注册的最核心代码。

func (g *APIGroupVersion) InstallREST() {
    // 1.
	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	container.Add(ws)
}

func (a *APIInstaller) Install() () {
	var apiResources []metav1.APIResource
	var resourceInfos []*storageversion.ResourceInfo
	var errors []error
	ws := a.newWebService()

	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
    // 2.
	for _, path := range paths {
		apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
	}
}

代码分解如下:

  1. 路由安装入口
  2. 依次为每个路由注册hanlder

为了对registerResourceHandlers这个函数以示尊重,单独看它的代码, 这个函数完整的代码800多行,这里仅粘贴一部分。

func (a *APIInstaller) registerResourceHandlers() () {
    // 1.
    resource, subresource, err := splitSubresource(path)
    
	// 2.
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	// 省略其他断言和判断
    
    var apiResource metav1.APIResource
	switch {
    // 3.
	case !namespaceScoped:
		// Handle non-namespace scoped resources like nodes.
		resourcePath := resource
		resourceParams := params
		itemPath := resourcePath + "/{name}"
		nameParams := append(params, nameParam)
		proxyParams := append(nameParams, pathParam)
		suffix := ""
		if isSubresource {
			suffix = "/" + subresource
			itemPath = itemPath + suffix
			resourcePath = itemPath
			resourceParams = nameParams
		}
		apiResource.Name = path
		apiResource.Namespaced = false
		apiResource.Kind = resourceKind
		namer := handlers.ContextBasedNaming{
			SelfLinker:         a.group.Linker,
			ClusterScoped:      true,
			SelfLinkPathPrefix: gpath.Join(a.prefix, resource) + "/",
			SelfLinkPathSuffix: suffix,
		}
		// 4.
        actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		// 省略其他actions,如LIST, POST等
	default:
        // 命名空间作用于的添加业务逻辑,区别在于,请求路径中会有namespaces/{namepsace}
	}

	// 5.
	for _, action := range actions {
		switch action.Verb {
		case "GET": // Get a resource.
            // 6.
            // 这里去除了各种判断条件
            handler = restfulGetResource(getter, exporter, reqScope)
			}

            // 7.
			route := ws.GET(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			routes = append(routes, route)
        default:
        	return nil, nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
        }
        // 为啥这里要用routes,直接用route感觉也一样呀。。。奇怪的设计
        // 8.
        for _, route := range routes {
			route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
				Group:   reqScope.Kind.Group,
				Version: reqScope.Kind.Version,
				Kind:    reqScope.Kind.Kind,
			})
			route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
			ws.Route(route)
		}
	}

	return &apiResource, resourceInfo, nil
}

代码分解如下:

  1. 分割资源名称
  2. 断言各种类型,用于判断是否实现了create, update, list等接口, 如果实现了就可以生成对应的hanlder
  3. 分别对命名空间级别和非命名空间级别的判断,CRD是非命名空间级别资源
  4. 增加action对象
  5. 遍历所有支持的action对象
  6. 生成handler对象
  7. 使用restful.WebServiceroute方法生成路由
  8. 将路由加到restful.WebService对象中,这个对象后续会注册到GenericAPIServer.Handler.GoRestfulContainer

在去掉干扰项,上述的代码还是比较好理解的,至于go-restful的使用方法大家当做一个黑盒子即可

handler

在路由注册中有一个非常重要的对象,那就是handler, 它负责处理请求,接下来我们看看hanlder的生成

// 1.
handler = restfulGetResource(getter, exporter, reqScope)

func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
        // 2.
		handlers.GetResource(r, e, &scope)(res.ResponseWriter, req.Request)
	}
}

// vendor\k8s.io\apiserver\pkg\endpoints\handlers\get.go
func GetResource(r rest.Getter, e rest.Exporter, scope *RequestScope) http.HandlerFunc {
	return getResourceHandler(scope,
		func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
			// check for export
			options := metav1.GetOptions{}
			if values := req.URL.Query(); len(values) > 0 {
                // 3.
				if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
					err = errors.NewBadRequest(err.Error())
					return nil, err
				}
			}
            // 4.
			return r.Get(ctx, name, &options)
		})
}

代码分解如下:

  1. handler构造入口
  2. 上一步的具体实现
  3. 解析请求参数
  4. 调用REST对象的Get方法

增删改查的的差别其实差别不大,比较大的区别是增删改查中除了查,其他功能还会调用准入控制(Admit)方法。从上面的代码我们知道,最终处理增删改查的还是REST对象,或者说Store(Storage)对象(这三个词有时候差别不大,因为REST对象内嵌了Store对象)

REST

让我们回到最初创建REST对象的代码

// vendor\k8s.io\apiextensions-apiserver\pkg\apiserver\apiserver.go
// 注意: RESTOptionsGetter来自GenericConfig
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)

func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
    store := &genericregistry.Store{/*省略参数*/}
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
    // 1.
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &REST{store}, nil
}

func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    // 2.
    // DefaultQualifiedResource => customresourcedefinitions
    opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
    
    // 3.
	if e.Storage.Storage == nil {
		e.Storage.Codec = opts.StorageConfig.Codec
		var err error
        // 4.
		e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
		e.StorageVersioner = opts.StorageConfig.EncodeVersioner
	}

	return nil
}

func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	obj := e.NewFunc()
	key, err := e.KeyFunc(ctx, name)
    
	if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	if e.Decorator != nil {
		if err := e.Decorator(obj); err != nil {
			return nil, err
		}
	}
	return obj, nil
}

代码分解如下:

  1. 补全Store对象
  2. 判断是否有Storage对象
  3. 创建Storage对象

可以看到后端存储的进行了三层封装,REST跟请求绑定,然后Store在Storage上做了一层封装,而Store的Storage负责具体的存储交互。这三个对象还是比较清晰的。

Storage(Store)

从上面的代码知道,Storage对象来自Options对象, 所以继续往前翻代码,不过想从ServerRunOptions找到RESTOptions注定徒劳无功,因为前者只有ETCD相关的配置,虽然k8s在ETCD之上加了一层抽象,理论上可以自己实现存储的接口就可以以替换ETCD, 但是几乎没人替换,除了一些k8s分支,比如k3s。

初始化

所以最前的初始化应该是ETCD配置的初始化,代码如下:

// cmd\kube-apiserver\app\options\options.go
func NewServerRunOptions() *ServerRunOptions {
	s := ServerRunOptions{
        // 1.
		Etcd:                    genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil)),
    }
}

// cmd\kube-apiserver\app\server.go
func CreateKubeAPIServerConfig()  {
	buildGenericConfig(s.ServerRunOptions, proxyTransport)
}

func buildGenericConfig() {
    	// 2.
		if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
		return
	}
}

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
    // 3.
	c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
	return nil
}

// 4.
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)

	ret := generic.RESTOptions{
		StorageConfig:           storageConfig,
        // 5.
		Decorator:               generic.UndecoratedStorage,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
	}

	return ret, nil
}

代码分解如下:

  1. 创建ETCDOptions, 这些参数会负责跟命令行参数绑定
  2. ETCDOptions的参数应用到Config对象
  3. StorageFactoryRestOptionsFactory包装一层,比较重要的是它会实现GetRESTOptions方法。
  4. GetRESTOptions方法的具体实现
  5. 后续的ETCD client由这个方法实现

至此,Storage的参数初始化基本完成了,让我们看看后面Storage是怎么基于这些参数创建的。

创建

我们知道REST对象是和Storage绑定的,所以再次回到NewREST的代码

func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
	store := &genericregistry.Store{}
    // 1.
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &REST{store}, nil
}

func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    if e.Storage.Storage == nil {
		var err error
        // 2.
		e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
	}
}
// vendor\k8s.io\apiserver\pkg\registry\generic\storage_decorator.go
// 3.
func UndecoratedStorage() (storage.Interface, factory.DestroyFunc, error) {
	return NewRawStorage(config, newFunc)
}

func NewRawStorage() (storage.Interface, factory.DestroyFunc, error) {
	return factory.Create(*config, newFunc)
}

func Create(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
    // 4.
	switch c.Type {
	case "etcd2":
		return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		return newETCD3Storage(c, newFunc)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}

func newETCD3Storage() (storage.Interface, DestroyFunc, error) {
    // 5.
	client, err := newETCD3Client(c.Transport)
    // 6.
	return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil
}

// 7.
func New() storage.Interface {
	return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer)
}

func newStore() *store {
    // 8.
	result := &store{
		client:        c,
		codec:         codec,
		versioner:     versioner,
		transformer:   transformer,
		pagingEnabled: pagingEnabled,
		pathPrefix:   path.Join("/", prefix),
		watcher:      newWatcher(c, codec, newFunc, versioner, transformer),
		leaseManager: newDefaultLeaseManager(c),
	}
	return result
}

代码分解如下:

  1. optsGetter就是StorageFactoryRestOptionsFactory
  2. 使用之前初始化的Decorator创建Storage对象
  3. 上一步的具体实现
  4. 创建ETCD客户端的具体入口
  5. 创建etcd3 客户端
  6. 将etcd3客户端用store对象包装一层
  7. 上一步的具体实现
  8. 构造store的具体代码

ETCD客户端肯定不会实现k8s的storage接口的,所以包一层并不意外,至此,我们可以看看后端存储的具体实现,即客户端的请求是怎么持久化到后端的,这里我们只看GetCreate方法对应的实现。

后端存储

最后再重新梳理一遍整个请求的编解码和调用过程。

GET

// vendor\k8s.io\apiserver\pkg\endpoints\handlers\get.go
// 1.
func GetResource() http.HandlerFunc {
	return getResourceHandler(scope,
		func() (runtime.Object, error) {
			options := metav1.GetOptions{}
            // 2.
			return r.Get(ctx, name, &options)
		})
}

func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
    // 3.
	obj := e.NewFunc()
	key, err := e.KeyFunc(ctx, name)
    // 4.
	if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	return obj, nil
}

func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
    // 5.
	key = path.Join(s.pathPrefix, key)
	getResp, err := s.client.KV.Get(ctx, key)
	metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
	if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
		return err
	}
	// 6.
	if len(getResp.Kvs) == 0 {
		if opts.IgnoreNotFound {
			return runtime.SetZeroValue(out)
		}
		return storage.NewKeyNotFoundError(key, 0)
	}
	kv := getResp.Kvs[0]
	// 7.
	data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
	// 8.
	return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

代码分解如下:

  1. GET hanlder的构造函数
  2. 调用Store的Get方法
  3. 使用NewFunc构造要填充的对象,这里新建的__internal版本的对象
  4. 调用Storage的Get方法
  5. 构造key然后用etcd客户端获取
  6. 检查是否存在对应的键值对,不存在就返回404
  7. 如果指定了--encryption-provider-config参数, 会在读写数据的时候加解密数据,一般不设置。
  8. 将数据解码, 将__internal版本解码成客户端请求的版本

Storage的其他方法基本都差不多,对于ETCD客户端的包装主要在于编解码,持久化的时候将数据转换成__internal版本的对象并存储,获取的时候则将__internal版本的数据解码成用户指定版本的数据类型,或者默认的数据类型。

值得注意的是, 整个请求链的关系是 ResourceHandler -> REST -> Store -> store。

CREATE

func (a *APIInstaller) registerResourceHandlers() {
    // 1.
    fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer)
}
	// 2.
	reqScope := handlers.RequestScope{
		Kind:        fqKindToRegister,
        HubGroupVersion: schema.GroupVersion{Group: fqKindToRegister.Group, Version: runtime.APIVersionInternal},
    }
	// 3.
    handler = restfulCreateResource(creater, reqScope, admit)
}
// 4.
func restfulCreateResource() restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
	}
}


func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		// 5.
		decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
		// 6.
		body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)

        // 7.
        outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
        
        // 8.
		obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
        
        // 9.
		requestFunc := func() (runtime.Object, error) {
			return r.Create(
				ctx,
				name,
				obj,
				rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
				options,
			)
		}
		// 9.
		result, err := finishRequest(timeout, func() (runtime.Object, error) {
			result, err := requestFunc()
			return result, err
		})
		// 10.
		transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
	}
}

func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	name, err := e.ObjectNameFunc(obj)
	key, err := e.KeyFunc(ctx, name)
	out := e.NewFunc()
	//  11.
	if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
		return nil, err
	}
	return out, nil
}

// 12.
func (s *store) Create() error {
	// 12.
	txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key),
	).Then(
        // 13.
		clientv3.OpPut(key, string(newData), opts...),
	).Commit()
	
    // 14.
	if out != nil {
		putResp := txnResp.Responses[0].GetResponsePut()
		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
	}
	return nil
}

代码分解如下:

  1. 获取注册的默认GVR, 默认是__internal版本的GVR
  2. 设置reqscope变量,这个变量设置了一些比较重要的变量,比如HubGroupVersion, 它的值是versionruntime.APIVersionInterna
  3. hanlder构造方法
  4. 上一步具体调用
  5. 创建decoder, 注意: 这个decoder指定的版本是__internal
  6. 获取请求体
  7. 构造一个MediaTypeOptions, 它生成一个Convert方法,用于将结果转换成用户期望的那样。
  8. 将请求体解析成对应版本的对象
  9. Create方法用匿名方法封装起来
  10. 获取底层存储的调用结果
  11. 尝试转换结果
  12. 开启ETCD客户端的事务
  13. 调用ETCD客户端的create方法,将对象持久化到ETCD。
  14. 将ETCD调用结果转码返回。

总结

几乎所有的业务都是围绕着数据来开发的,k8s也不意外,所以对于资源的操作最终都会落到存储上。

参考链接