k8s client-go快速入门教程及源代码阅读之RESTClient

文章目录

之前无论是讲解静态客户端还是动态客户端都可以略过了client-go底层的RESTClient的构造过程,这篇文章主要分析RESTClient的构造和请求过程,k8s的API都遵循RESTFul规范,所以客户端自然可以以对应的约定形式构造,所以叫做RESTClient。

k8s的RESTClient请求过程中有两件比较重要的事情,一是通过GVR/GVK构造请求,二是将响应内容转换成特定的类型,前者无论是静态客户端还是动态客户端都是差不多的,区别只是在于传参的形式,后者的不同主要在于,转换成的目标类型不一样,静态客户端在调用的时候就能确定目标对象是什么,比如DeploymentList, Deployment等对象,而动态客户端就无法确定目标对象了,所以一般情况都是将响应转换成unstructured.UnstructuredList或者unstructured.Unstructured, 后续的是用需要自己断言。

由于类型转换也是一个复杂的过程,所以本文对于类型转化和编解码只是当做一个黑盒子来描述,后续在写文章说明

快速入门

首先通过一个静态客户端的例子来回顾一下client-go的请求过程

func main() {
	var kubeconfig *string
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)

	clientset, err := kubernetes.NewForConfig(config)

	// 传一个空的字符串代表不指定namespace,即所有namespace
	deploymentClient := clientset.AppsV1().Deployments("default")
	deployements, err := deploymentClient.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.Fatal("获取deployment列表失败:", err)
	}
	for _, deployment := range deployements.Items {
		fmt.Println(deployment.Name, deployment.Namespace)
	}

}

请求

基于之前的文章,我们可以很容易定位到以下代码。

func (c *deployments) List(ctx context.Context, opts metav1.ListOptions) (result *v1.DeploymentList, err error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
    // 1.
	result = &v1.DeploymentList{}
    // 2.
	err = c.client.Get().
		Namespace(c.ns).
		Resource("deployments").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
    	// 3.
		Do(ctx).
    	// 4.
		Into(result)
	return
}

虽然代码量不大,但是内容还是很多的,上面的代码可以分解为以下四个部分

  1. 创建一个空的目标对象DeploymentList, 用于后续将结果反序列。
  2. 构造请求, c.client.Get()做的事情比诸如Namespace, Resource等方法做的事情多的多,这里会创建一个新的Request对象, 以及做各种配置。
  3. 发送请求
  4. 将响应转换(反序列)成DeploymentList类型。

第一步自然不需要太多介绍,下面对后面三个部分依次介绍一下

构造请求

如果你熟悉go的net/http库的话,对这个部分应该不会陌生,除了简单的请求,如Get, Post等,都需要创建一个请求对象, 通过这个请求对象可以设置请求头,url参数,超时参数等等。

func (c *RESTClient) Get() *Request {
	return c.Verb("GET")
}

func (c *RESTClient) Verb(verb string) *Request {
	return NewRequest(c).Verb(verb)
}

func (r *Request) Verb(verb string) *Request {
	r.verb = verb
	return r
}

func NewRequest(c *RESTClient) *Request {
	var pathPrefix string
    // 1.
	if c.base != nil {
		pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
	} else {
		pathPrefix = path.Join("/", c.versionedAPIPath)
	}

    // 2.
	var timeout time.Duration
	if c.Client != nil {
		timeout = c.Client.Timeout
	}
	
    // 3.
	r := &Request{
		c:              c,
		rateLimiter:    c.rateLimiter,
		backoff:        backoff,
		timeout:        timeout,
		pathPrefix:     pathPrefix,
		maxRetries:     10,
		warningHandler: c.warningHandler,
	}
	
    // 4.
	switch {
	case len(c.content.AcceptContentTypes) > 0:
		r.SetHeader("Accept", c.content.AcceptContentTypes)
	case len(c.content.ContentType) > 0:
		r.SetHeader("Accept", c.content.ContentType+", */*")
	}
	return r
}
  1. 设置路径前缀, 比如/apis/apps/v1
  2. 设置超时
  3. 创建Request对象。注意,这个Request对象不是net/http里的Request对象
  4. 设置必要的http请求头Content-Type

这里之所以构造client-go自己的Request对象,因为可以基于自定义的类型封装更多的属性,比如回退函数,重试等。

发送请求

发送请求可能是整个请求的核心了。

func (r *Request) Do(ctx context.Context) Result {
	var result Result
    
	err := r.request(ctx, func(req *http.Request, resp *http.Response) {
        // 1.
		result = r.transformResponse(resp, req)
	})
	return result
}

基于上一步的Request, 可以调用request方法发送请求以及注入一个回调函数,这里的回调函数transformResponse是用于类型转换。所以整个逻辑可以分为两步,一是发送请求并获得响应,二是将响应转换成指定类型。

func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
	client := r.c.Client
	if client == nil {
		client = http.DefaultClient
	}

	// 1. 
	if err := r.tryThrottle(ctx); err != nil {
		return err
	}
	if r.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, r.timeout)
		defer cancel()
	}

	// 2. 
	retries := 0
	for {
		// 3.
		url := r.URL().String()
		req, err := http.NewRequest(r.verb, url, r.body)
		req = req.WithContext(ctx)
		req.Header = r.headers
		
        // 4.
		r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
		if retries > 0 {
			if err := r.tryThrottle(ctx); err != nil {
				return err
			}
		}
        // 5.
		resp, err := client.Do(req)
		done := func() bool {
			// 7.
			defer func() {
				const maxBodySlurpSize = 2 << 10
				if resp.ContentLength <= maxBodySlurpSize {
					io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
				}
				resp.Body.Close()
			}()

			retries++

            // 8.
			fn(req, resp)
			return true
		}()
        // 6.
		if done {
			return nil
		}
	}
}

代码分解如下:

  1. 请求限流, 默认是5 rps, 最大爆发量10 rps。
  2. 开启一个重试循环,成功就退出循环
  3. 创建http.Request对象,用于请求
  4. 回退,如果不是第一请求
  5. 开始发送请求,并获得响应
  6. 判断是否读取响应成功
  7. 保证响应内容被完全读取,这样才能安全的复用TCP连接
  8. 调用回调函数,这里是transformResponse

这里的第6步是一个十分常见的模式,将一系列逻辑封装在一个匿名函数中统一处理,这样做得好处可以减少判断的代码,这有点数据库中原子概念的意思,除非全部成功,否则就是失败,这种模式的另一个常见的地方是错误处理,即将一系列可能出错的逻辑封装在一个匿名函数中,这样就只需要判断一次了,两者异曲同工。

转换类型(反序列化)

类型转换在上一步作为另一个回调函数注入到了请求过程中,它的代码如下。

func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
	var body []byte
	if resp.Body != nil {
        // 1.
		data, err := ioutil.ReadAll(resp.Body)
		switch err.(type) {
		case nil:
			body = data
        // 2. 
		case http2.StreamError:
			streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err)
			return Result{
				err: streamErr,
			}
		default:
			unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %v", err)
			return Result{
				err: unexpectedErr,
			}
		}
	}

	var decoder runtime.Decoder
	contentType := resp.Header.Get("Content-Type")
	if len(contentType) > 0 {
		var err error
		mediaType, params, err := mime.ParseMediaType(contentType)
        // 3. 
		decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
		if err != nil {
			return Result{
				body:        body,
				contentType: contentType,
				statusCode:  resp.StatusCode,
				warnings:    handleWarnings(resp.Header, r.warningHandler),
			}
		}
	}

	return Result{
		body:        body,
		contentType: contentType,
		statusCode:  resp.StatusCode,
		decoder:     decoder,
		warnings:    handleWarnings(resp.Header, r.warningHandler),
	}
}

代码分解如下:

  1. 读取响应体所有内容
  2. 非正常情况下的错误处理
  3. 获取decoder

前面两步其实倒不难理解,调用链比较长的是获取decoder, 而这个r.c.content.Negotiator对象是依赖Scheme, Scheme是k8s中各类型转换的核心,这里暂时略过,就当做一个黑盒子使用即可,它提供编解码的操作。

至此,类型转化完成了一半,即读取响应内容(字节流), 获得decoder。后续的代码在Into

err = c.client.Get().
		Namespace(c.ns).
		Resource("deployments").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do(ctx).
		Into(result)

func (r Result) Into(obj runtime.Object) error {
	out, _, err := r.decoder.Decode(r.body, nil, obj)
	return nil
}

k8s的转换机制是十分强大的,给一个目标对象就能还你一个对应类型的对象。这段代码不复杂是因为转换的逻辑都被隐藏了。

动态客户端

动态客户端与静态客户端的主要区别就是返回的时候无法直接返回特定类型对象,所以动态客户端的结果是封装在unstructured.UnstructuredListunstructured.Unstructured中的。

func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
    // 1.
	result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)

	retBytes, err := result.Raw()
    // 2.
	uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
    // 3.
	if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
		return list, nil
	}
	list, err := uncastObj.(*unstructured.Unstructured).ToList()
	return list, nil
}

代码分解如下:

  1. 这一步和动态客户端差不多,都是读取响应内容
  2. 解码
  3. 判断转换结果并返回

动态客户端的解码因为不需要判断要解码的目标类型,所以统一解码到UnstructuredListUnstructured

首先看看这两个对象的类型定义。

type UnstructuredList struct {
	Object map[string]interface{}
	Items []Unstructured `json:"items"`
}

type Unstructured struct {
	Object map[string]interface{}
}

可以看到,两者存储数据的底层都是map, 所以可以将任何合法对象(映射类型)反序列化。

解码过程如下:

// runtime.Decode
func Decode(d Decoder, data []byte) (Object, error) {
	obj, _, err := d.Decode(data, nil, nil)
	return obj, err
}
// d.Decode
func (s unstructuredJSONScheme) Decode(data []byte, _ *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	var err error
	obj, err = s.decode(data)
	// 尝试获取gvk
	gvk := obj.GetObjectKind().GroupVersionKind()
	return obj, &gvk, nil
}

func (s unstructuredJSONScheme) decode(data []byte) (runtime.Object, error) {
	type detector struct {
		Items gojson.RawMessage
	}
	var det detector
    // 1.
	if err := json.Unmarshal(data, &det); err != nil {
		return nil, err
	}
	// 2. 
	if det.Items != nil {
		list := &UnstructuredList{}
		err := s.decodeToList(data, list)
		return list, err
	}

	// 3.
	unstruct := &Unstructured{}
	err := s.decodeToUnstructured(data, unstruct)
	return unstruct, err
}

代码分解如下:

  1. 反序列化
  2. 判断是否有items对象,如果有自然是List对象
  3. 不是List自然是Item

这里有一个非常有用的知识点,那就是gojson.RawMessage, **这个对象可以用来反序列化任何对象!!!!**也就是说,任何你不确定的数据结构都可以用它来作为占位符。

总结

虽然k8s提供了静态客户端和动态客户端,但是两者底层使用的都是RESTClient, 因为静态和动态的不同,两者的类型转换机制(序列化,反序列化)会稍显不同。

参考链接