Kubectl源码阅读

文章目录

如果不知道怎么用client-go,那就看看kubectl的代码吧,kubectl除了是一个比较优秀的命令行工具之外,还提供了比较好的代码实现,通过看kubectl的代码我们可以抄到很多有用的代码片段。

kubernetes代码版本: v1.20.2

本文主要讲解kubectl增删改查的代码。

快速入门

一般来说kubectl最常用的操作就是对资源的增删改查。

# 创建资源
kubectl create -f test.yaml

# 删除资源
kubectl delete -f test.yaml

# 更新资源, 我们也可以用apply来创建资源
kubectl apply -f test.yaml

# 获取pod列表
kubectl get pods

代码结构

如果使用过cobra,那么对kubectl的代码结构不会陌生,因为kubectl就是使用cobra来组织各个命令的。

当然了,kubernetes/cmd下的所有项目,几乎都是使用的cobra,所以代码入口基本都是一致的。

func main() {
    // 1.
	command := cmd.NewDefaultKubectlCommand()
    // 2.
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

func NewDefaultKubectlCommand() *cobra.Command {
	return NewDefaultKubectlCommandWithArgs(NewDefaultPluginHandler(plugin.ValidPluginFilenamePrefixes), os.Args, os.Stdin, os.Stdout, os.Stderr)
}


func NewDefaultKubectlCommandWithArgs(pluginHandler PluginHandler, args []string, in io.Reader, out, errout io.Writer) *cobra.Command {
	cmd := NewKubectlCommand(in, out, errout)
    // 检查是否是调用插件..., 
}


func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
    // 3.
	cmds := &cobra.Command{
		Use:   "kubectl",
		Short: i18n.T("kubectl controls the Kubernetes cluster manager"),,
		Run: runHelp,
		BashCompletionFunction: bashCompletionFunc,
	}
	// 4.
	flags := cmds.PersistentFlags()
	kubeConfigFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag()
	kubeConfigFlags.AddFlags(flags)
    // k8s老传统了一层套一层,即使很小的改变也不会直接修改原对象
	matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
	matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())
    // 5.
	f := cmdutil.NewFactory(matchVersionKubeConfigFlags)
	ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}
    
    // 6.
	groups := templates.CommandGroups{
		{
			Message: "Basic Commands (Beginner):",
			Commands: []*cobra.Command{
				create.NewCmdCreate(f, ioStreams),
			},
		},
		{
			Message: "Basic Commands (Intermediate):",
			Commands: []*cobra.Command{
				get.NewCmdGet("kubectl", f, ioStreams),
				delete.NewCmdDelete(f, ioStreams),
			},
		},
		{
			Message: "Advanced Commands:",
			Commands: []*cobra.Command{
				apply.NewCmdApply("kubectl", f, ioStreams),
			},
		}
	}
	groups.Add(cmds)
	return cmds
}

代码分解如下:

  1. 初始化命令行
  2. 执行命令
  3. kubectl根命令的配置, 作为所有子命令的父命令
  4. 获取cobra.CommandFlagSet对象, 用于后续在它上面追加各种参数,比如这里的--kubeconfig参数
  5. 将获取kubeconfigConfigFlags再次封装,以及封装标准输出和输入
  6. 将子命令分组并添加到kubectl命令中

如果使用过kubectl或者二次开发过k8s,应该对kubeconfig不默认,一般来说我们总会准备一个kubeconfig来连接集群, kubectl也不例外,它会提供一个--kubeconfig的可选参数用于配置kubeconfig, 默认情况下会读取~/.kube/config

总的来说kubectl的根命令做了两件事情,一是将所有命令组织起来,二是配置全局参数(比如--kubeconfig)。

而这个--kubeconfig参数对应的kubeConfigFlags会负责加载kubeconfig并提供一系列的帮助函数,比如

// 查询k8s集群资源列表
func (*ConfigFlags).ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error)
// 可以生成client的rest.Config
func (*ConfigFlags).ToRESTConfig() (*rest.Config, error)
// 用于映射gvr到gvk的对象
func (*ConfigFlags).ToRESTMapper() (meta.RESTMapper, error)

有了这三个方法,就可以很方便的跟集群交互了。

如果对上述三个对象还是不太熟悉的同学,可以看看我之前client-go的系列文章: https://youerning.top/tags/client-go

创建

基于上面的代码结构,我们可以很容易的找到create命令对应的代码.

func NewCmdCreate(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
    // 1.
	o := NewCreateOptions(ioStreams)
	cmd := &cobra.Command{
		Use:                   "create -f FILENAME",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Create a resource from a file or from stdin."),
		Long:                  createLong,
		Example:               createExample,
        // 2.
		Run: func(cmd *cobra.Command, args []string) {
            // cmdutil.CheckErr的逻辑主要就是判断是否出错并退出。
            // 3.
			cmdutil.CheckErr(o.Complete(f, cmd))
            // 4.
			cmdutil.CheckErr(o.ValidateArgs(cmd, args))
            // 5.
			cmdutil.CheckErr(o.RunCreate(f, cmd))
		},
    }
    
    // 增加子命令和参数等
}

代码分解如下:

  1. 几乎所有kubectl子命令都会构建一个Options对象用于绑定命令行参数
  2. cobra代码运行入口
  3. 补全参数以及构建必要的对象
  4. 验证命令行参数是否合法
  5. 创建资源的代码入口
func (o *CreateOptions) RunCreate(f cmdutil.Factory, cmd *cobra.Command) error {
	// 1.
	schema, err := f.Validator(cmdutil.GetFlagBool(cmd, "validate"))
	cmdNamespace, enforceNamespace, err := f.ToRawKubeConfigLoader().Namespace()

    // 2.
	r := f.NewBuilder().
		Unstructured().
		Schema(schema).
		ContinueOnError().
		NamespaceParam(cmdNamespace).DefaultNamespace().
		FilenameParam(enforceNamespace, &o.FilenameOptions).
		LabelSelectorParam(o.Selector).
		Flatten().
		Do()

	count := 0
    // 3.
	err = r.Visit(func(info *resource.Info, err error) error {
        // 4.
		if err := util.CreateOrUpdateAnnotation(cmdutil.GetFlagBool(cmd, cmdutil.ApplyAnnotationsFlag), info.Object, scheme.DefaultJSONEncoder()); err != nil {
			return cmdutil.AddSourceToErr("creating", info.Source, err)
		}

        // DryRun试运行, 可以在不修改目标对象的情况下验证功能
		if o.DryRunStrategy != cmdutil.DryRunClient {
            // 5.
			obj, err := resource.
				NewHelper(info.Client, info.Mapping).
				DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
				WithFieldManager(o.fieldManager).
				Create(info.Namespace, true, info.Object)
            // 6.
			info.Refresh(obj, true)
		}

		count++
        // 7.
		return o.PrintObj(info.Object)
	})
	return nil
}

代码分解如下:

  1. 获取资源验证器(scheme), 通过集群的/openapi/v2地址获取json schema validator, 可以用来验证输入是否合法
  2. 构建者(builder)的设计模式,将参数通过各个方法传入,最后Do方法用来执行,返回一个访问者vistor设计模式的对象, 它提供Visit方法来访问构造的对象
  3. 通过Visit方法遍历info对象
  4. 更新注释: kubectl.kubernetes.io/last-applied-configuration
  5. 创建对象
  6. 刷新对象,用于后续打印结果
  7. 打印结果

这段代码有两个对象比较重要,Builder, Visitor, 两者分别对应构建者设计模式和访问者设计模式,如果对这两个设计模式不太熟悉的同学,可以先搜索并学习一下,这里就不讲两者的代码思路了。

Builder

r := f.NewBuilder().
		Unstructured().
		Schema(schema).
		ContinueOnError().
		NamespaceParam(cmdNamespace).DefaultNamespace().
		// 除了FilenameParam, 其他方法都是简单的设置一个熟悉
		FilenameParam(enforceNamespace, &o.FilenameOptions).
		LabelSelectorParam(o.Selector).
		Flatten().
		Do()


func (b *Builder) FilenameParam(enforceNamespace bool, filenameOptions *FilenameOptions) *Builder {
	recursive := filenameOptions.Recursive
	paths := filenameOptions.Filenames
	for _, s := range paths {
		switch {
		case s == "-":
			b.Stdin()
		case strings.Index(s, "http://") == 0 || strings.Index(s, "https://") == 0:
			url, err := url.Parse(s)
			b.URL(defaultHttpGetAttempts, url)
		default:
			if !recursive {
				b.singleItemImplied = true
			}
            // 1.
			b.Path(recursive, s)
		}
	}
	return b
}

func (b *Builder) Path(recursive bool, paths ...string) *Builder {
	for _, p := range paths {
        //检查文件是否存在
        
        // 2.
		visitors, err := ExpandPathsToFileVisitors(b.mapper, p, recursive, FileExtensions, b.schema)
		b.paths = append(b.paths, visitors...)
	}
	return b
}

func ExpandPathsToFileVisitors(mapper *mapper, paths string, recursive bool, extensions []string, schema ContentValidator) ([]Visitor, error) {
	var visitors []Visitor
	err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
        
        // 3.
		visitor := &FileVisitor{
			Path:          path,
			StreamVisitor: NewStreamVisitor(nil, mapper, path, schema),
		}

		visitors = append(visitors, visitor)
		return nil
	})

	return visitors, nil
}

代码分解如下:

  1. 解析传入的文件路径
  2. 将路径包装一下
  3. 最终包装成FileVisitor,该对象在调用Visit方法时会解析并验证传入的yaml文件。

Visitor

由于k8s支持比较多的场景,所以visitor包了一层又一层。

func (b *Builder) Do() *Result {
    // 返回一个Result对象, 它也备包裹了许多层
    r := b.visitorResult()
    if b.flatten {
		r.visitor = NewFlattenListVisitor(r.visitor, b.objectTyper, b.mapper)
	}
	helpers := []VisitorFunc{}
    // 设置namespace,如果没有的话
	if b.defaultNamespace {
		helpers = append(helpers, SetNamespace(b.namespace))
	}
    // 检查namespace是否一致
	if b.requireNamespace {
		helpers = append(helpers, RequireNamespace(b.namespace))
	}
    // 忽略没有namespace作用于的对象,即kubectl api-resources的NAMESPACED字段
	helpers = append(helpers, FilterNamespace)
    // RetrieveLazy当对象是空的时候回去请求对象
	if b.requireObject {
		helpers = append(helpers, RetrieveLazy)
	}
    // 遇到错误是否继续的包装器
	if b.continueOnError {
		r.visitor = NewDecoratedVisitor(ContinueOnErrorVisitor{r.visitor}, helpers...)
	} else {
		r.visitor = NewDecoratedVisitor(r.visitor, helpers...)
	}
}

可以看到visitor被包装了非常多的层,所以如果调用Visit会是这样的一个调用链

vistor调用链

可以看到,真正干活的StreamVisitor调用路径非常深。

为了简单起见,我们可以直接跳过前面的控制流程的visitor, 那么代码如下:

err = r.Visit(func(info *resource.Info, err error) error {
	// 获取StreamVisitor解析的info对象并创建对应的资源,后文详细说明
}
              
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
	d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
	for {
        // 1.
		ext := runtime.RawExtension{}
		if err := d.Decode(&ext); err != nil {
			if err == io.EOF {
				return nil
			}
			return fmt.Errorf("error parsing %s: %v", v.Source, err)
		}
		ext.Raw = bytes.TrimSpace(ext.Raw)
        // 2.
		if err := ValidateSchema(ext.Raw, v.Schema); err != nil {
			return fmt.Errorf("error validating %q: %v", v.Source, err)
		}
        // 3.
		info, err := v.infoForData(ext.Raw, v.Source)
		if err := fn(info, nil); err != nil {
			return err
		}
	}
}

func (m *mapper) infoForData(data []byte, source string) (*Info, error) {
    // 4.
	obj, gvk, err := m.decoder.Decode(data, nil, nil)
	name, _ := metadataAccessor.Name(obj)
	namespace, _ := metadataAccessor.Namespace(obj)
	resourceVersion, _ := metadataAccessor.ResourceVersion(obj)

	ret := &Info{
		Source:          source,
		Namespace:       namespace,
		Name:            name,
		ResourceVersion: resourceVersion,
		Object: obj,
	}

	if m.localFn == nil || !m.localFn() {
        // 5.
		restMapper, err := m.restMapperFn()
		mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
		ret.Mapping = mapping
		client, err := m.clientFn(gvk.GroupVersion())
		ret.Client = client
	}

	return ret, nil
}

代码分解如下:

  1. 尝试解析yaml文件,并读取第一个资源定义对象

    因为yaml文件可以同时定义多个资源, 所以是一个循环来解析用户输入的yaml文件

  2. 验证yaml文件定义的各个字段是否合法

  3. 将yaml文件转成一个info对象

  4. 首先解码成一个runtime.Object, 这里应该是Unstructured

  5. 得到restmapper对象并获取对应的mapping, 即gvr和gvk的映射关系, 还有就是绑定一个client

最后就是创建的核心逻辑

obj, err := resource.
    NewHelper(info.Client, info.Mapping).
    DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
    WithFieldManager(o.fieldManager).
    // 除了Create方法,其他都是构造方法,简单的设置对应的字段值而已
    Create(info.Namespace, true, info.Object)

func (m *Helper) Create(namespace string, modify bool, obj runtime.Object) (runtime.Object, error) {
	return m.CreateWithOptions(namespace, modify, obj, nil)
}

func (m *Helper) CreateWithOptions(namespace string, modify bool, obj runtime.Object, options *metav1.CreateOptions) (runtime.Object, error) {
	if options == nil {
		options = &metav1.CreateOptions{}
	}
	return m.createResource(m.RESTClient, m.Resource, namespace, obj, options)
}

func (m *Helper) createResource(c RESTClient, resource, namespace string, obj runtime.Object, options *metav1.CreateOptions) (runtime.Object, error) {
	return c.Post().
		NamespaceIfScoped(namespace, m.NamespaceScoped).
		Resource(resource).
		VersionedParams(options, metav1.ParameterCodec).
		Body(obj).
		Do(context.TODO()).
		Get()
}

上面的代码还是比较清晰的,最后还是回到RESTClient的构造,如果熟悉client-go的话应该不陌生。至此,整个调用过程就结束了。

由于文章太长了,所以在此截断。