kubectl源码阅读2
书接上回,继续看kubectl 删除, 更新, 查询操作的源代码。因为之前已经介绍了kubectl一些常用的设计模式和代码结构,所以本文会比较快的阅读其他操作的代码。
删除
首先看删除的代码流程吧。
func NewCmdDelete(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
deleteFlags := NewDeleteCommandFlags("containing the resource to delete.")
cmd := &cobra.Command{
Use: "delete ([-f FILENAME] | [-k DIRECTORY] | TYPE [(NAME | -l label | --all)])",
Run: func(cmd *cobra.Command, args []string) {
// 1.
cmdutil.CheckErr(o.Complete(f, args, cmd))
cmdutil.CheckErr(o.Validate())
// 2.
cmdutil.CheckErr(o.RunDelete(f))
},
}
return cmd
}
func (o *DeleteOptions) Complete(f cmdutil.Factory, args []string, cmd *cobra.Command) error {
// 3.
if len(o.Raw) == 0 {
r := f.NewBuilder().
Unstructured().
ContinueOnError().
NamespaceParam(cmdNamespace).DefaultNamespace().
FilenameParam(enforceNamespace, &o.FilenameOptions).
LabelSelectorParam(o.LabelSelector).
FieldSelectorParam(o.FieldSelector).
SelectAllParam(o.DeleteAll).
AllNamespaces(o.DeleteAllNamespaces).
ResourceTypeOrNameArgs(false, args...).RequireObject(false).
Flatten().
Do()
o.Result = r
o.Mapper, err = f.ToRESTMapper()
o.DynamicClient, err = f.DynamicClient()
}
return nil
}
func (o *DeleteOptions) RunDelete(f cmdutil.Factory) error {
return o.DeleteResult(o.Result)
}
func (o *DeleteOptions) DeleteResult(r *resource.Result) error {
// 4.
err := r.Visit(func(info *resource.Info, err error) error {
// 5.
options := &metav1.DeleteOptions{}
if o.GracePeriod >= 0 {
options = metav1.NewDeleteOptions(int64(o.GracePeriod))
}
options.PropagationPolicy = &o.CascadingStrategy
// 6.
response, err := o.deleteResource(info, options)
resourceLocation := cmdwait.ResourceLocation{
GroupResource: info.Mapping.Resource.GroupResource(),
Namespace: info.Namespace,
Name: info.Name,
}
if status, ok := response.(*metav1.Status); ok && status.Details != nil {
uidMap[resourceLocation] = status.Details.UID
return nil
}
responseMetadata, err := meta.Accessor(response)
uidMap[resourceLocation] = responseMetadata.GetUID()
return nil
})
waitOptions := cmdwait.WaitOptions{
ResourceFinder: genericclioptions.ResourceFinderForResult(resource.InfoListVisitor(deletedInfos)),
UIDMap: uidMap,
DynamicClient: o.DynamicClient,
Timeout: effectiveTimeout,
Printer: printers.NewDiscardingPrinter(),
ConditionFn: cmdwait.IsDeleted,
IOStreams: o.IOStreams,
}
// 7.
err = waitOptions.RunWait()
return err
}
代码分解如下:
- 填充参数等
- 业务逻辑入口
- 删除操作将
Result
对象的构建过程放在了这,有点不和谐呀,感觉放在DeleteResult中也没啥问题 - 遍历解析的infos列表
- 设置删除的参数,优雅退出时间,删除策略等。
- 删除资源操作,后面的逻辑并不复杂,所以不深入了
- 等待删除资源是否删除
这里面稍微比较复杂的是等待的逻辑。
更新
代码如下:
func NewCmdApply(baseName string, f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
o := NewApplyOptions(ioStreams)
cmd := &cobra.Command{
Use: "apply (-f FILENAME | -k DIRECTORY)",
Run: func(cmd *cobra.Command, args []string) {
// 1.
cmdutil.CheckErr(o.Complete(f, cmd))
cmdutil.CheckErr(validateArgs(cmd, args))
cmdutil.CheckErr(validatePruneAll(o.Prune, o.All, o.Selector))
cmdutil.CheckErr(o.Run())
},
}
}
func (o *ApplyOptions) Run() error {
// 2.
infos, err := o.GetObjects()
for _, info := range infos {
if err := o.applyOneObject(info); err != nil {
errs = append(errs, err)
}
}
return nil
}
func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) {
var err error = nil
if !o.objectsCached {
// 3.
r := o.Builder.
Unstructured().
Schema(o.Validator).
ContinueOnError().
NamespaceParam(o.Namespace).DefaultNamespace().
FilenameParam(o.EnforceNamespace, &o.DeleteOptions.FilenameOptions).
LabelSelectorParam(o.Selector).
Flatten().
Do()
o.objects, err = r.Infos()
o.objectsCached = true
}
return o.objects, err
}
func (o *ApplyOptions) applyOneObject(info *resource.Info) error {
// 4.
helper := resource.NewHelper(info.Client, info.Mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.FieldManager)
// 5.
if o.ServerSideApply {
// 服务端apply的逻辑, 成功或者出错都退出
}
// 6.
modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
// 7.
if err := info.Get(); err != nil {
// 8.
if !errors.IsNotFound(err) {
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving current configuration of:\n%s\nfrom server for:", info.String()), info.Source, err)
}
// 9.
if err := util.CreateApplyAnnotation(info.Object, unstructured.UnstructuredJSONScheme); err != nil {
return cmdutil.AddSourceToErr("creating", info.Source, err)
}
if o.DryRunStrategy != cmdutil.DryRunClient {
obj, err := helper.Create(info.Namespace, true, info.Object)
info.Refresh(obj, true)
}
return nil
}
if err := o.MarkObjectVisited(info); err != nil {
return err
}
if o.DryRunStrategy != cmdutil.DryRunClient {
// 10.
patcher, err := newPatcher(o, info, helper)
patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
info.Refresh(patchedObject, true)
// 11.
if string(patchBytes) == "{}" && !o.shouldPrintObject() {
printer, err := o.ToPrinter("unchanged")
if err != nil {
return err
}
if err = printer.PrintObj(info.Object, o.Out); err != nil {
return err
}
return nil
}
}
return nil
}
代码分解如下:
- 参数补全, 验证,执行等
- 解析用户输入
- 构造参数解析对象,最后返回
Result.Infos
对象 - 构造一个helper对象,因为有Server Side Apply和Client Side Apply的区别,所以只构造通用的部分
- 如果用户选择Server Side Apply就让服务端patch, 即更新与当前对象不同的部分
- 构造一个修改后副本, 主要是增加注解(Annotation)
info.Get()
会获取服务端的最新版本的对象- 判断错误是否是
NotFound
, 如果NotFound
说明对象还不存在,所以后续创建并退出 - 增加对象的注释,并创建对象
- 构造一个
Patch
对象,用于更新对象 - 判断对象是否有变更。
默认情况下, kubectl使用的是Client Side Apply, Server Side Apply和Client Side Apply的区别主要是,由谁来比对修改的部分,总是使用前者会对服务端造成一定压力,但是使用后者其实有时候会出现冲突的情况,大家可以参考这篇文章: https://kubernetes.io/docs/reference/using-api/server-side-apply/
因为apply命令会判断对象是否存在,所以可以替代create命令,kubectl最常使用的命令应该就是kubectl apply
了。
Patch
最后看看patch的部分
func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
var getErr error
// 1.
patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
if p.Retries == 0 {
p.Retries = maxPatchRetry
}
// 2.
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
current, getErr = p.Helper.Get(namespace, name)
patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut)
}
if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force {
patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name)
}
return patchBytes, patchObject, err
}
代码分解如下:
- 这一步是对比最新的对象和用户输入的区别,这一步部分逻辑比较复杂,就不深入了,因为这部分代码几乎是纯算法的代码了
- 判断是否冲突,需要重试,因为有可能其他用户或者controller在更新同一个对象
查询
func NewCmdGet(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
o := NewGetOptions(parent, streams)
cmd := &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
// 1.
cmdutil.CheckErr(o.Complete(f, cmd, args))
cmdutil.CheckErr(o.Validate(cmd))
cmdutil.CheckErr(o.Run(f, cmd, args))
},
}
}
func (o *GetOptions) Run(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
//2.
r := f.NewBuilder().
Unstructured().
NamespaceParam(o.Namespace).DefaultNamespace().AllNamespaces(o.AllNamespaces).
FilenameParam(o.ExplicitNamespace, &o.FilenameOptions).
LabelSelectorParam(o.LabelSelector).
FieldSelectorParam(o.FieldSelector).
RequestChunksOf(chunkSize).
ResourceTypeOrNameArgs(true, args...).
ContinueOnError().
Latest().
Flatten().
// 3.
TransformRequests(o.transformRequests).
Do()
// 4.
infos, err := r.Infos()
objs := make([]runtime.Object, len(infos))
for ix := range infos {
objs[ix] = infos[ix].Object
}
// 5.
var printer printers.ResourcePrinter
var lastMapping *meta.RESTMapping
trackingWriter := &trackingWriterWrapper{Delegate: o.Out}
separatorWriter := &separatorWriterWrapper{Delegate: trackingWriter}
w := printers.GetNewTabWriter(separatorWriter)
// 6.
for ix := range objs {
var mapping *meta.RESTMapping
var info *resource.Info
if positioner != nil {
info = infos[positioner.OriginalPosition(ix)]
mapping = info.Mapping
} else {
info = infos[ix]
mapping = info.Mapping
}
printer.PrintObj(info.Object, w)
}
w.Flush()
return utilerrors.NewAggregate(allErrs)
}
func (o *GetOptions) transformRequests(req *rest.Request) {
// 7.
req.SetHeader("Accept", strings.Join([]string{
fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1beta1.SchemeGroupVersion.Version, metav1beta1.GroupName),
"application/json",
}, ","))
}
代码分解如下:
- 无需多说
- 构造一个
Builder
, 这个也是老模式了 - 加一个转换请求的函数,用来设置
Accept
请求头 - 将对象包装一下, 变成了
runtime.Object
类型 - 构造printer对象,用于输出结果,默认是tablePrinter
- 依次打印
- 设置http请求头
如果你直接获取对象你会发现,只有对象的数据,而没有一些元数据,比如columnDefinitions, 而kubectl get
命令就是通过这个字段来构建输出的表格的,以pod为例,它的columnDefinitions字段的值是[{"name":"Name","type":"string","format":"name","description":"Name must be unique within a namespace. Is required when creating resources, although some resources may allow a client to request the generation of an appropriate name automatically. Name is primarily intended for creation idempotence and configuration definition. Cannot be updated. More info: http://kubernetes.io/docs/user-guide/identifiers#names","priority":0},..省略...]
通过这些信息就可以确定表格的表头了。
总结
kubectl的大多数命令的代码结构是差不多的,首先构造Result
对象,该对象是对用户输入的一个抽象,每个资源对象用Info
对象来表示,在解析过程中会创建对应的mapping
和Client
,mapping
是gvk和gvr的对应关系,gvr可以用来设置请求路径,而gvk可以确定golang中的数据类型, 而client是一个客户端用来跟服务端交互。