Kubectl源码阅读
如果不知道怎么用client-go,那就看看kubectl的代码吧,kubectl除了是一个比较优秀的命令行工具之外,还提供了比较好的代码实现,通过看kubectl的代码我们可以抄到很多有用的代码片段。
kubernetes代码版本: v1.20.2
本文主要讲解kubectl增删改查的代码。
快速入门
一般来说kubectl最常用的操作就是对资源的增删改查。
# 创建资源
kubectl create -f test.yaml
# 删除资源
kubectl delete -f test.yaml
# 更新资源, 我们也可以用apply来创建资源
kubectl apply -f test.yaml
# 获取pod列表
kubectl get pods
代码结构
如果使用过cobra,那么对kubectl的代码结构不会陌生,因为kubectl就是使用cobra来组织各个命令的。
当然了,kubernetes/cmd下的所有项目,几乎都是使用的cobra,所以代码入口基本都是一致的。
func main() {
// 1.
command := cmd.NewDefaultKubectlCommand()
// 2.
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
func NewDefaultKubectlCommand() *cobra.Command {
return NewDefaultKubectlCommandWithArgs(NewDefaultPluginHandler(plugin.ValidPluginFilenamePrefixes), os.Args, os.Stdin, os.Stdout, os.Stderr)
}
func NewDefaultKubectlCommandWithArgs(pluginHandler PluginHandler, args []string, in io.Reader, out, errout io.Writer) *cobra.Command {
cmd := NewKubectlCommand(in, out, errout)
// 检查是否是调用插件...,
}
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
// 3.
cmds := &cobra.Command{
Use: "kubectl",
Short: i18n.T("kubectl controls the Kubernetes cluster manager"),,
Run: runHelp,
BashCompletionFunction: bashCompletionFunc,
}
// 4.
flags := cmds.PersistentFlags()
kubeConfigFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag()
kubeConfigFlags.AddFlags(flags)
// k8s老传统了一层套一层,即使很小的改变也不会直接修改原对象
matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())
// 5.
f := cmdutil.NewFactory(matchVersionKubeConfigFlags)
ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}
// 6.
groups := templates.CommandGroups{
{
Message: "Basic Commands (Beginner):",
Commands: []*cobra.Command{
create.NewCmdCreate(f, ioStreams),
},
},
{
Message: "Basic Commands (Intermediate):",
Commands: []*cobra.Command{
get.NewCmdGet("kubectl", f, ioStreams),
delete.NewCmdDelete(f, ioStreams),
},
},
{
Message: "Advanced Commands:",
Commands: []*cobra.Command{
apply.NewCmdApply("kubectl", f, ioStreams),
},
}
}
groups.Add(cmds)
return cmds
}
代码分解如下:
- 初始化命令行
- 执行命令
- kubectl根命令的配置, 作为所有子命令的父命令
- 获取
cobra.Command
的FlagSet
对象, 用于后续在它上面追加各种参数,比如这里的--kubeconfig
参数 - 将获取
kubeconfig
的ConfigFlags
再次封装,以及封装标准输出和输入 - 将子命令分组并添加到kubectl命令中
如果使用过kubectl或者二次开发过k8s,应该对kubeconfig不默认,一般来说我们总会准备一个kubeconfig来连接集群, kubectl也不例外,它会提供一个--kubeconfig
的可选参数用于配置kubeconfig, 默认情况下会读取~/.kube/config
。
总的来说kubectl的根命令做了两件事情,一是将所有命令组织起来,二是配置全局参数(比如--kubeconfig
)。
而这个--kubeconfig
参数对应的kubeConfigFlags
会负责加载kubeconfig并提供一系列的帮助函数,比如
// 查询k8s集群资源列表
func (*ConfigFlags).ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error)
// 可以生成client的rest.Config
func (*ConfigFlags).ToRESTConfig() (*rest.Config, error)
// 用于映射gvr到gvk的对象
func (*ConfigFlags).ToRESTMapper() (meta.RESTMapper, error)
有了这三个方法,就可以很方便的跟集群交互了。
如果对上述三个对象还是不太熟悉的同学,可以看看我之前client-go的系列文章: https://youerning.top/tags/client-go
创建
基于上面的代码结构,我们可以很容易的找到create命令对应的代码.
func NewCmdCreate(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
// 1.
o := NewCreateOptions(ioStreams)
cmd := &cobra.Command{
Use: "create -f FILENAME",
DisableFlagsInUseLine: true,
Short: i18n.T("Create a resource from a file or from stdin."),
Long: createLong,
Example: createExample,
// 2.
Run: func(cmd *cobra.Command, args []string) {
// cmdutil.CheckErr的逻辑主要就是判断是否出错并退出。
// 3.
cmdutil.CheckErr(o.Complete(f, cmd))
// 4.
cmdutil.CheckErr(o.ValidateArgs(cmd, args))
// 5.
cmdutil.CheckErr(o.RunCreate(f, cmd))
},
}
// 增加子命令和参数等
}
代码分解如下:
- 几乎所有kubectl子命令都会构建一个Options对象用于绑定命令行参数
- cobra代码运行入口
- 补全参数以及构建必要的对象
- 验证命令行参数是否合法
- 创建资源的代码入口
func (o *CreateOptions) RunCreate(f cmdutil.Factory, cmd *cobra.Command) error {
// 1.
schema, err := f.Validator(cmdutil.GetFlagBool(cmd, "validate"))
cmdNamespace, enforceNamespace, err := f.ToRawKubeConfigLoader().Namespace()
// 2.
r := f.NewBuilder().
Unstructured().
Schema(schema).
ContinueOnError().
NamespaceParam(cmdNamespace).DefaultNamespace().
FilenameParam(enforceNamespace, &o.FilenameOptions).
LabelSelectorParam(o.Selector).
Flatten().
Do()
count := 0
// 3.
err = r.Visit(func(info *resource.Info, err error) error {
// 4.
if err := util.CreateOrUpdateAnnotation(cmdutil.GetFlagBool(cmd, cmdutil.ApplyAnnotationsFlag), info.Object, scheme.DefaultJSONEncoder()); err != nil {
return cmdutil.AddSourceToErr("creating", info.Source, err)
}
// DryRun试运行, 可以在不修改目标对象的情况下验证功能
if o.DryRunStrategy != cmdutil.DryRunClient {
// 5.
obj, err := resource.
NewHelper(info.Client, info.Mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.fieldManager).
Create(info.Namespace, true, info.Object)
// 6.
info.Refresh(obj, true)
}
count++
// 7.
return o.PrintObj(info.Object)
})
return nil
}
代码分解如下:
- 获取资源验证器(scheme), 通过集群的/openapi/v2地址获取
json schema validator
, 可以用来验证输入是否合法 - 构建者(
builder
)的设计模式,将参数通过各个方法传入,最后Do
方法用来执行,返回一个访问者vistor
设计模式的对象, 它提供Visit
方法来访问构造的对象 - 通过
Visit
方法遍历info
对象 - 更新注释:
kubectl.kubernetes.io/last-applied-configuration
- 创建对象
- 刷新对象,用于后续打印结果
- 打印结果
这段代码有两个对象比较重要,Builder, Visitor, 两者分别对应构建者设计模式和访问者设计模式,如果对这两个设计模式不太熟悉的同学,可以先搜索并学习一下,这里就不讲两者的代码思路了。
Builder
r := f.NewBuilder().
Unstructured().
Schema(schema).
ContinueOnError().
NamespaceParam(cmdNamespace).DefaultNamespace().
// 除了FilenameParam, 其他方法都是简单的设置一个熟悉
FilenameParam(enforceNamespace, &o.FilenameOptions).
LabelSelectorParam(o.Selector).
Flatten().
Do()
func (b *Builder) FilenameParam(enforceNamespace bool, filenameOptions *FilenameOptions) *Builder {
recursive := filenameOptions.Recursive
paths := filenameOptions.Filenames
for _, s := range paths {
switch {
case s == "-":
b.Stdin()
case strings.Index(s, "http://") == 0 || strings.Index(s, "https://") == 0:
url, err := url.Parse(s)
b.URL(defaultHttpGetAttempts, url)
default:
if !recursive {
b.singleItemImplied = true
}
// 1.
b.Path(recursive, s)
}
}
return b
}
func (b *Builder) Path(recursive bool, paths ...string) *Builder {
for _, p := range paths {
//检查文件是否存在
// 2.
visitors, err := ExpandPathsToFileVisitors(b.mapper, p, recursive, FileExtensions, b.schema)
b.paths = append(b.paths, visitors...)
}
return b
}
func ExpandPathsToFileVisitors(mapper *mapper, paths string, recursive bool, extensions []string, schema ContentValidator) ([]Visitor, error) {
var visitors []Visitor
err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
// 3.
visitor := &FileVisitor{
Path: path,
StreamVisitor: NewStreamVisitor(nil, mapper, path, schema),
}
visitors = append(visitors, visitor)
return nil
})
return visitors, nil
}
代码分解如下:
- 解析传入的文件路径
- 将路径包装一下
- 最终包装成
FileVisitor
,该对象在调用Visit方法时会解析并验证传入的yaml文件。
Visitor
由于k8s支持比较多的场景,所以visitor包了一层又一层。
func (b *Builder) Do() *Result {
// 返回一个Result对象, 它也备包裹了许多层
r := b.visitorResult()
if b.flatten {
r.visitor = NewFlattenListVisitor(r.visitor, b.objectTyper, b.mapper)
}
helpers := []VisitorFunc{}
// 设置namespace,如果没有的话
if b.defaultNamespace {
helpers = append(helpers, SetNamespace(b.namespace))
}
// 检查namespace是否一致
if b.requireNamespace {
helpers = append(helpers, RequireNamespace(b.namespace))
}
// 忽略没有namespace作用于的对象,即kubectl api-resources的NAMESPACED字段
helpers = append(helpers, FilterNamespace)
// RetrieveLazy当对象是空的时候回去请求对象
if b.requireObject {
helpers = append(helpers, RetrieveLazy)
}
// 遇到错误是否继续的包装器
if b.continueOnError {
r.visitor = NewDecoratedVisitor(ContinueOnErrorVisitor{r.visitor}, helpers...)
} else {
r.visitor = NewDecoratedVisitor(r.visitor, helpers...)
}
}
可以看到visitor被包装了非常多的层,所以如果调用Visit
会是这样的一个调用链
可以看到,真正干活的StreamVisitor
调用路径非常深。
为了简单起见,我们可以直接跳过前面的控制流程的visitor, 那么代码如下:
err = r.Visit(func(info *resource.Info, err error) error {
// 获取StreamVisitor解析的info对象并创建对应的资源,后文详细说明
}
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
for {
// 1.
ext := runtime.RawExtension{}
if err := d.Decode(&ext); err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("error parsing %s: %v", v.Source, err)
}
ext.Raw = bytes.TrimSpace(ext.Raw)
// 2.
if err := ValidateSchema(ext.Raw, v.Schema); err != nil {
return fmt.Errorf("error validating %q: %v", v.Source, err)
}
// 3.
info, err := v.infoForData(ext.Raw, v.Source)
if err := fn(info, nil); err != nil {
return err
}
}
}
func (m *mapper) infoForData(data []byte, source string) (*Info, error) {
// 4.
obj, gvk, err := m.decoder.Decode(data, nil, nil)
name, _ := metadataAccessor.Name(obj)
namespace, _ := metadataAccessor.Namespace(obj)
resourceVersion, _ := metadataAccessor.ResourceVersion(obj)
ret := &Info{
Source: source,
Namespace: namespace,
Name: name,
ResourceVersion: resourceVersion,
Object: obj,
}
if m.localFn == nil || !m.localFn() {
// 5.
restMapper, err := m.restMapperFn()
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
ret.Mapping = mapping
client, err := m.clientFn(gvk.GroupVersion())
ret.Client = client
}
return ret, nil
}
代码分解如下:
-
尝试解析yaml文件,并读取第一个资源定义对象
因为yaml文件可以同时定义多个资源, 所以是一个循环来解析用户输入的yaml文件
-
验证yaml文件定义的各个字段是否合法
-
将yaml文件转成一个info对象
-
首先解码成一个
runtime.Object
, 这里应该是Unstructured
-
得到restmapper对象并获取对应的mapping, 即gvr和gvk的映射关系, 还有就是绑定一个client
最后就是创建的核心逻辑
obj, err := resource.
NewHelper(info.Client, info.Mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.fieldManager).
// 除了Create方法,其他都是构造方法,简单的设置对应的字段值而已
Create(info.Namespace, true, info.Object)
func (m *Helper) Create(namespace string, modify bool, obj runtime.Object) (runtime.Object, error) {
return m.CreateWithOptions(namespace, modify, obj, nil)
}
func (m *Helper) CreateWithOptions(namespace string, modify bool, obj runtime.Object, options *metav1.CreateOptions) (runtime.Object, error) {
if options == nil {
options = &metav1.CreateOptions{}
}
return m.createResource(m.RESTClient, m.Resource, namespace, obj, options)
}
func (m *Helper) createResource(c RESTClient, resource, namespace string, obj runtime.Object, options *metav1.CreateOptions) (runtime.Object, error) {
return c.Post().
NamespaceIfScoped(namespace, m.NamespaceScoped).
Resource(resource).
VersionedParams(options, metav1.ParameterCodec).
Body(obj).
Do(context.TODO()).
Get()
}
上面的代码还是比较清晰的,最后还是回到RESTClient
的构造,如果熟悉client-go的话应该不陌生。至此,整个调用过程就结束了。
由于文章太长了,所以在此截断。