k8s client-go快速入门教程及源代码阅读之Scheme编解码

文章目录

本文主要讲解k8s源码中基于Scheme的编解码机制, Scheme就像一个类型大管家,内部注册了各种GVK, 所以使用静态客户端的时候,我们才能无感的直接得到静态类型的指定对象,而不是像动态客户端得到一个Unstructured

scheme能做的事有很多,除了为编解码提供支持,还有类型转换(比如, v1转_internal), 比如默认值设置等。

本文除了讲解Scheme还会讲讲Unstructuredruntime.Object

快速入门

下面展示一个通过scheme获取对象GVK, 以及通过GVK获取对象实例的例子

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"reflect"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/kubernetes/scheme"
)

func printJson(obj interface{}) {
	data, err := json.MarshalIndent(obj, "", "  ")
	if err != nil {
		log.Fatal("序列化失败!", err)
	}
	fmt.Println(string(data))
}

func main() {
	deployment := &appsv1.Deployment{}
	s := scheme.Scheme
	// 通过类型找gvk
	gvks, _, err := s.ObjectKinds(deployment)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("gvks:")
	printJson(gvks)

	// 通过gvk创建对应的类型实例
	obj, err := s.New(gvks[0])
	if err != nil {
		log.Fatal(err)
	}
	t := reflect.TypeOf(obj)
	fmt.Println("type name: ", t.Elem().Name())
}

输出如下:

gvks:
[{"Group":"apps","Version":"v1","Kind":"Deployment"}]
type name:  Deployment

虽然scheme很有用,但是我们一般不会直接使用scheme,因为它比较底层。

下面在看一个更贴近真实使用场景的例子

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: "demo-deployment",
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: int32Ptr(2),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": "demo",
				},
			},
			Template: apiv1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"app": "demo",
					},
				},
				Spec: apiv1.PodSpec{
					Containers: []apiv1.Container{
						{
							Name:  "web",
							Image: "nginx:1.12",
							Ports: []apiv1.ContainerPort{
								{
									Name:          "http",
									Protocol:      apiv1.ProtocolTCP,
									ContainerPort: 80,
								},
							},
						},
					},
				},
			},
		},
	}
    // 1.
	negotiator := runtime.NewClientNegotiator(scheme.Codecs.WithoutConversion(), schema.GroupVersion{Group: "apps", Version: "v1"})
    // 2.
	encoder, err := negotiator.Encoder("application/json", nil)
	if err != nil {
		log.Fatal("初始化eecoder失败", err)
	}

	out := bytes.NewBuffer(nil)
    // 3.
	if err := encoder.Encode(deployment, out); err != nil {
		log.Fatal("编码失败", err)
	}
	data, err := io.ReadAll(out)
	if err != nil {
		log.Fatal("读取失败", err)
	}
	fmt.Println(string(data))
	// 4.
	decoder, err := negotiator.Decoder("application/json", nil)
	if err != nil {
		log.Fatal("初始化decoder失败", err)
	}
	// 5.
	obj, gvk, err := decoder.Decode(data, nil, nil)
	if err != nil {
		log.Fatal("解码失败", err)
	}
	deploy, ok := obj.(*appsv1.Deployment)
	if !ok {
		log.Fatal("解码不符合预期")
	}
	fmt.Println(deploy.Name)
	fmt.Printf("%#v\n", gvk)
}

输出如下:

{"kind":"Deployment","apiVersion":"apps/v1","metadata":{"name":"demo-deployment","creationTimestamp":null},"spec":{"replicas":2,"selector":{"matchLabels":{"app":"demo"}},"template":{"metadata":{"creationTimestamp":null,"labels":{"app":"demo"}},"spec":{"containers":[{"name":"web","image":"nginx:1.12","ports":[{"name":"http","containerPort":80,"protocol":"TCP"}],"resources":{}}]}},"strategy":{}},"status":{}}

demo-deployment
&schema.GroupVersionKind{Group:"apps", Version:"v1", Kind:"Deployment"}

代码分解如下

  1. 创建一个持有scheme的编解码对象, 这一部分在源代码会继续深入
  2. 基于应用类型创建encoder
  3. 编码
  4. 基于应用类型创建decoder
  5. 解码

静态客户端编解码器的构造过程

client-go里面的编解码初始化和上面的代码差不多, 编解码对象在创建RESTClient对象时构建。

clientset, err := kubernetes.NewForConfig(config)
cs.appsV1, err = appsv1.NewForConfig(&configShallowCopy)
func NewForConfig(c *rest.Config) (*AppsV1Client, error) {
	config := *c
    // 1. 
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
    // 3. 
	client, err := rest.RESTClientFor(&config)
	if err != nil {
		return nil, err
	}
	return &AppsV1Client{client}, nil
}

func setConfigDefaults(config *rest.Config) error {
	gv := v1.SchemeGroupVersion
	config.GroupVersion = &gv
	config.APIPath = "/apis"
    // 2. 
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
	return nil
}


func RESTClientFor(config *Config) (*RESTClient, error) {
	baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
	transport, err := TransportFor(config)

	var gv schema.GroupVersion
	if config.GroupVersion != nil {
		gv = *config.GroupVersion
	}
	clientContent := ClientContentConfig{
		AcceptContentTypes: config.AcceptContentTypes,
		ContentType:        config.ContentType,
		GroupVersion:       gv,
        // 4.
		Negotiator:         runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
	}

	restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
	return restClient, err
}

代码分解如下:

  1. 加载rest.Config,
  2. 创建NegotiatedSerializer, 该对象是一个工厂对象
  3. 创建RESTClient
  4. 基于gv创建Negotiator

构造过程和前面的例子差不多,所以不重复讲解了。

最后贴一下静态客户端的构建过程。

scheme.Codecs.WithoutConversion()

// k8s.io\client-go\kubernetes\scheme\register.go
var Codecs = serializer.NewCodecFactory(Scheme)

func NewCodecFactory(scheme *runtime.Scheme, mutators ...CodecFactoryOptionsMutator) CodecFactory {
	options := CodecFactoryOptions{Pretty: true}
	for _, fn := range mutators {
		fn(&options)
	}
	serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory, options)
	return newCodecFactory(scheme, serializers)
}

func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []serializerType {
	jsonSerializer := json.NewSerializerWithOptions(
		mf, scheme, scheme,
		json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
	)
	jsonSerializerType := serializerType{
		AcceptContentTypes: []string{runtime.ContentTypeJSON},
		ContentType:        runtime.ContentTypeJSON,
		FileExtensions:     []string{"json"},
		EncodesAsText:      true,
		Serializer:         jsonSerializer,

		Framer:           json.Framer,
		StreamSerializer: jsonSerializer,
	}

	yamlSerializer := json.NewSerializerWithOptions(
		mf, scheme, scheme,
		json.SerializerOptions{Yaml: true, Pretty: false, Strict: options.Strict},
	)
	protoSerializer := protobuf.NewSerializer(scheme, scheme)
	protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme)

	serializers := []serializerType{
		jsonSerializerType,
		{
			AcceptContentTypes: []string{runtime.ContentTypeYAML},
			ContentType:        runtime.ContentTypeYAML,
			FileExtensions:     []string{"yaml"},
			EncodesAsText:      true,
			Serializer:         yamlSerializer,
		},
		{
			AcceptContentTypes: []string{runtime.ContentTypeProtobuf},
			ContentType:        runtime.ContentTypeProtobuf,
			FileExtensions:     []string{"pb"},
			Serializer:         protoSerializer,

			Framer:           protobuf.LengthDelimitedFramer,
			StreamSerializer: protoRawSerializer,
		},
	}

	for _, fn := range serializerExtensions {
		if serializer, ok := fn(scheme); ok {
			serializers = append(serializers, serializer)
		}
	}
	return serializers
}

从代码可以看到, 每个serializer(json, yaml, protobuf)都绑定到了给定的scheme, 其承担着类型映射及创建的任务。

client-go会默认在kubernetes\scheme\register.go给内置的类型全部注册到上面的scheme

总结

了解编解码的使用和构造过程可以让我们在解析k8s的数据结果时更游刃有余。

参考链接