kubectl源码阅读2

文章目录

书接上回,继续看kubectl 删除, 更新, 查询操作的源代码。因为之前已经介绍了kubectl一些常用的设计模式和代码结构,所以本文会比较快的阅读其他操作的代码。

删除

首先看删除的代码流程吧。

func NewCmdDelete(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
	deleteFlags := NewDeleteCommandFlags("containing the resource to delete.")

	cmd := &cobra.Command{
		Use:                   "delete ([-f FILENAME] | [-k DIRECTORY] | TYPE [(NAME | -l label | --all)])",
		Run: func(cmd *cobra.Command, args []string) {
            // 1.
            cmdutil.CheckErr(o.Complete(f, args, cmd))
			cmdutil.CheckErr(o.Validate())
            // 2.
			cmdutil.CheckErr(o.RunDelete(f))
		},
	}
	return cmd
}

func (o *DeleteOptions) Complete(f cmdutil.Factory, args []string, cmd *cobra.Command) error {
    // 3.
	if len(o.Raw) == 0 {
		r := f.NewBuilder().
			Unstructured().
			ContinueOnError().
			NamespaceParam(cmdNamespace).DefaultNamespace().
			FilenameParam(enforceNamespace, &o.FilenameOptions).
			LabelSelectorParam(o.LabelSelector).
			FieldSelectorParam(o.FieldSelector).
			SelectAllParam(o.DeleteAll).
			AllNamespaces(o.DeleteAllNamespaces).
			ResourceTypeOrNameArgs(false, args...).RequireObject(false).
			Flatten().
			Do()
		o.Result = r
		o.Mapper, err = f.ToRESTMapper()
		o.DynamicClient, err = f.DynamicClient()

	}

	return nil
}

func (o *DeleteOptions) RunDelete(f cmdutil.Factory) error {
	return o.DeleteResult(o.Result)
}

func (o *DeleteOptions) DeleteResult(r *resource.Result) error {
    // 4.
	err := r.Visit(func(info *resource.Info, err error) error {
		// 5.
		options := &metav1.DeleteOptions{}
		if o.GracePeriod >= 0 {
			options = metav1.NewDeleteOptions(int64(o.GracePeriod))
		}
		options.PropagationPolicy = &o.CascadingStrategy

        // 6.
		response, err := o.deleteResource(info, options)
		resourceLocation := cmdwait.ResourceLocation{
			GroupResource: info.Mapping.Resource.GroupResource(),
			Namespace:     info.Namespace,
			Name:          info.Name,
		}
		if status, ok := response.(*metav1.Status); ok && status.Details != nil {
			uidMap[resourceLocation] = status.Details.UID
			return nil
		}
		responseMetadata, err := meta.Accessor(response)
		uidMap[resourceLocation] = responseMetadata.GetUID()
		return nil
	})
    
	waitOptions := cmdwait.WaitOptions{
		ResourceFinder: genericclioptions.ResourceFinderForResult(resource.InfoListVisitor(deletedInfos)),
		UIDMap:         uidMap,
		DynamicClient:  o.DynamicClient,
		Timeout:        effectiveTimeout,

		Printer:     printers.NewDiscardingPrinter(),
		ConditionFn: cmdwait.IsDeleted,
		IOStreams:   o.IOStreams,
	}
    // 7.
	err = waitOptions.RunWait()
	return err
}

代码分解如下:

  1. 填充参数等
  2. 业务逻辑入口
  3. 删除操作将Result对象的构建过程放在了这,有点不和谐呀,感觉放在DeleteResult中也没啥问题
  4. 遍历解析的infos列表
  5. 设置删除的参数,优雅退出时间,删除策略等。
  6. 删除资源操作,后面的逻辑并不复杂,所以不深入了
  7. 等待删除资源是否删除

这里面稍微比较复杂的是等待的逻辑。

更新

代码如下:

func NewCmdApply(baseName string, f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
	o := NewApplyOptions(ioStreams)

	cmd := &cobra.Command{
		Use:                   "apply (-f FILENAME | -k DIRECTORY)",
		Run: func(cmd *cobra.Command, args []string) {
            // 1.
			cmdutil.CheckErr(o.Complete(f, cmd))
			cmdutil.CheckErr(validateArgs(cmd, args))
			cmdutil.CheckErr(validatePruneAll(o.Prune, o.All, o.Selector))
			cmdutil.CheckErr(o.Run())
		},
	}
}

func (o *ApplyOptions) Run() error {
    // 2.
	infos, err := o.GetObjects()
	for _, info := range infos {
		if err := o.applyOneObject(info); err != nil {
			errs = append(errs, err)
		}
	}
	return nil
}


func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) {
	var err error = nil
	if !o.objectsCached {
        // 3.
		r := o.Builder.
			Unstructured().
			Schema(o.Validator).
			ContinueOnError().
			NamespaceParam(o.Namespace).DefaultNamespace().
			FilenameParam(o.EnforceNamespace, &o.DeleteOptions.FilenameOptions).
			LabelSelectorParam(o.Selector).
			Flatten().
			Do()
		o.objects, err = r.Infos()
		o.objectsCached = true
	}
	return o.objects, err
}

func (o *ApplyOptions) applyOneObject(info *resource.Info) error {
    // 4.
	helper := resource.NewHelper(info.Client, info.Mapping).
		DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
		WithFieldManager(o.FieldManager)
    // 5.
	if o.ServerSideApply {
		// 服务端apply的逻辑, 成功或者出错都退出
	}
	// 6.
	modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
	// 7.
	if err := info.Get(); err != nil {
        // 8.
		if !errors.IsNotFound(err) {
			return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving current configuration of:\n%s\nfrom server for:", info.String()), info.Source, err)
		}
		
        // 9.
		if err := util.CreateApplyAnnotation(info.Object, unstructured.UnstructuredJSONScheme); err != nil {
			return cmdutil.AddSourceToErr("creating", info.Source, err)
		}
		if o.DryRunStrategy != cmdutil.DryRunClient {
			obj, err := helper.Create(info.Namespace, true, info.Object)
			info.Refresh(obj, true)
		}
		return nil
	}

	if err := o.MarkObjectVisited(info); err != nil {
		return err
	}

	if o.DryRunStrategy != cmdutil.DryRunClient {
        // 10.
		patcher, err := newPatcher(o, info, helper)
		patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
		info.Refresh(patchedObject, true)
	
        // 11.
		if string(patchBytes) == "{}" && !o.shouldPrintObject() {
			printer, err := o.ToPrinter("unchanged")
			if err != nil {
				return err
			}
			if err = printer.PrintObj(info.Object, o.Out); err != nil {
				return err
			}
			return nil
		}
	}
	return nil
}

代码分解如下:

  1. 参数补全, 验证,执行等
  2. 解析用户输入
  3. 构造参数解析对象,最后返回Result.Infos对象
  4. 构造一个helper对象,因为有Server Side Apply和Client Side Apply的区别,所以只构造通用的部分
  5. 如果用户选择Server Side Apply就让服务端patch, 即更新与当前对象不同的部分
  6. 构造一个修改后副本, 主要是增加注解(Annotation)
  7. info.Get()会获取服务端的最新版本的对象
  8. 判断错误是否是NotFound, 如果NotFound说明对象还不存在,所以后续创建并退出
  9. 增加对象的注释,并创建对象
  10. 构造一个Patch对象,用于更新对象
  11. 判断对象是否有变更。

默认情况下, kubectl使用的是Client Side Apply, Server Side Apply和Client Side Apply的区别主要是,由谁来比对修改的部分,总是使用前者会对服务端造成一定压力,但是使用后者其实有时候会出现冲突的情况,大家可以参考这篇文章: https://kubernetes.io/docs/reference/using-api/server-side-apply/

因为apply命令会判断对象是否存在,所以可以替代create命令,kubectl最常使用的命令应该就是kubectl apply了。

Patch

最后看看patch的部分

func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
	var getErr error
    // 1.
	patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
	if p.Retries == 0 {
		p.Retries = maxPatchRetry
	}
    // 2.
	for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
		current, getErr = p.Helper.Get(namespace, name)
		patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut)
	}
	if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force {
		patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name)
	}
	return patchBytes, patchObject, err
}

代码分解如下:

  1. 这一步是对比最新的对象和用户输入的区别,这一步部分逻辑比较复杂,就不深入了,因为这部分代码几乎是纯算法的代码了
  2. 判断是否冲突,需要重试,因为有可能其他用户或者controller在更新同一个对象

查询

func NewCmdGet(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
	o := NewGetOptions(parent, streams)
	cmd := &cobra.Command{
		Run: func(cmd *cobra.Command, args []string) {
            // 1.
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.Validate(cmd))
			cmdutil.CheckErr(o.Run(f, cmd, args))
		},
	}
}

func (o *GetOptions) Run(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
    //2.
	r := f.NewBuilder().
		Unstructured().
    NamespaceParam(o.Namespace).DefaultNamespace().AllNamespaces(o.AllNamespaces).
		FilenameParam(o.ExplicitNamespace, &o.FilenameOptions).
		LabelSelectorParam(o.LabelSelector).
		FieldSelectorParam(o.FieldSelector).
		RequestChunksOf(chunkSize).
		ResourceTypeOrNameArgs(true, args...).
		ContinueOnError().
		Latest().
		Flatten().
        // 3.
		TransformRequests(o.transformRequests).
		Do()

    // 4.
	infos, err := r.Infos()
	objs := make([]runtime.Object, len(infos))
	for ix := range infos {
		objs[ix] = infos[ix].Object
	}
	
    // 5.
	var printer printers.ResourcePrinter
	var lastMapping *meta.RESTMapping
	trackingWriter := &trackingWriterWrapper{Delegate: o.Out}
	separatorWriter := &separatorWriterWrapper{Delegate: trackingWriter}
	w := printers.GetNewTabWriter(separatorWriter)
    
    // 6.
	for ix := range objs {
		var mapping *meta.RESTMapping
		var info *resource.Info
		if positioner != nil {
			info = infos[positioner.OriginalPosition(ix)]
			mapping = info.Mapping
		} else {
			info = infos[ix]
			mapping = info.Mapping
		}
		printer.PrintObj(info.Object, w)
	}
	w.Flush()
	return utilerrors.NewAggregate(allErrs)
}


func (o *GetOptions) transformRequests(req *rest.Request) {
    // 7.
	req.SetHeader("Accept", strings.Join([]string{
		fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
		fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1beta1.SchemeGroupVersion.Version, metav1beta1.GroupName),
		"application/json",
	}, ","))
}

代码分解如下:

  1. 无需多说
  2. 构造一个Builder, 这个也是老模式了
  3. 加一个转换请求的函数,用来设置Accept请求头
  4. 将对象包装一下, 变成了runtime.Object类型
  5. 构造printer对象,用于输出结果,默认是tablePrinter
  6. 依次打印
  7. 设置http请求头

如果你直接获取对象你会发现,只有对象的数据,而没有一些元数据,比如columnDefinitions, 而kubectl get命令就是通过这个字段来构建输出的表格的,以pod为例,它的columnDefinitions字段的值是[{"name":"Name","type":"string","format":"name","description":"Name must be unique within a namespace. Is required when creating resources, although some resources may allow a client to request the generation of an appropriate name automatically. Name is primarily intended for creation idempotence and configuration definition. Cannot be updated. More info: http://kubernetes.io/docs/user-guide/identifiers#names","priority":0},..省略...]

通过这些信息就可以确定表格的表头了。

总结

kubectl的大多数命令的代码结构是差不多的,首先构造Result对象,该对象是对用户输入的一个抽象,每个资源对象用Info对象来表示,在解析过程中会创建对应的mappingClientmapping是gvk和gvr的对应关系,gvr可以用来设置请求路径,而gvk可以确定golang中的数据类型, 而client是一个客户端用来跟服务端交互。