百度开源网关BFE源代码阅读1之启动

文章目录

BFE是一个功能强大的网关。 因为BFE是用Golang写的,所以值得一看,但是它的用户体验实在是一言难尽。

BFE版本: v1.6.0

环境准备

BFE基于自身设计的原因,配置文件相较于其他产品非常多,所以直接使用BFE的二进制执行文件直接启动不是那么容易的事情,你需要一个配置文件目录,比如conf, 然后里面包含一堆的配置文件才能启动,不过最新的版本,二进制文件已经附带了一个conf目录了。

下面启动前的文件结构

conf/
├── bfe.conf
├── cluster_conf
│   ├── cluster_table.data
│   └── gslb.data
├── mod_access
│   └── mod_access.conf
├── mod_auth_basic
│   ├── auth_basic_rule.data
│   ├── mod_auth_basic.conf
│   └── userfile
# 其他mod_xxxx目录省略
├── server_data_conf
│   ├── cluster_conf.data
│   ├── host_rule.data
│   ├── name_conf.data
│   ├── route_rule.data
│   └── vip_rule.data
└── tls_conf
    ├── certs
    │   ├── example.crt
    │   └── example.key
    ├── client_ca
    │   ├── example_ca.crt
    │   └── example_ca.key
    ├── client_crl
    ├── server_cert_conf.data
    ├── session_ticket_key.data
    └── tls_rule_conf.data

配置文件配置了一些示例路由,这些路由一下是看不懂的,其中一条路由是将域名example.org的请求转发到127.0.0.1:8181

所以为了尝鲜,你可以在本地启动一个监听8181端口的web服务,然后使用以下命令测试

curl 127.0.0.1:8080  -H "Host: example.org"

如果启动不了BFE, 或者测试返回错误也不要紧,因为BFE的配置就是没有其他网关那么明了,这不是使用者的问题,是BFE自身设计的问题。

启动流程

下面通过阅读源代码的方式来理解BFE中的各个概念,只有理解了这些概念才会配置BFE的路由。

func main() {
    // confRoot默认是./conf
    confPath := path.Join(*confRoot, "bfe.conf")
    // 加载配置文件
	config, err = bfe_conf.BfeConfigLoad(confPath, *confRoot)
    // 基于配置文件启动
    if err = bfe_server.StartUp(config, version, *confRoot); err != nil {
		log.Logger.Error("main(): bfe_server.StartUp(): %s", err.Error())
	}
}

让人值得吐槽的一点是,BFE的内部模块命名为啥都要以**bfe_**开头!!!

入口文件不是太复杂,省略掉参数解析和日志初始化,就两个部分,加载配置,启动服务。接下来以这两部分作为入口,继续深入。

加载配置

通过阅读配置加载的代码,我们可以了解到默认的参数和使用限制

func BfeConfigLoad(filePath string, confRoot string) (BfeConfig, error) {
	var cfg BfeConfig
	var err error
	// 1.
	SetDefaultConf(&cfg)
	// 2.
	err = gcfg.ReadFileInto(&cfg, filePath)
	// 3.
	if err = cfg.Server.Check(confRoot); err != nil {
		return cfg, err
	}
    
	// 4.
	if err = cfg.HttpsBasic.Check(confRoot); err != nil {
		return cfg, err
	}
	
    // 5.
	if err = cfg.SessionCache.Check(confRoot); err != nil {
		return cfg, err
	}
	
    // 6.
	if err = cfg.SessionTicket.Check(confRoot); err != nil {
		return cfg, err
	}

	return cfg, nil
}

代码分解如下:

  1. 设置BFE提供的默认参数,比如监听端口,默认配置文件的路径之类的。
  2. 将用户的配置信息加载到cfg
  3. 检查基本的配置,比如端口是否合法,指定的配置文件路径是否合法之类的
  4. 检查HTTPS相关的配置,比如证书目录是否存在,证书版本是否合法之类的。
  5. 会话缓存配置检查 暂时不知道干啥的
  6. 暂时不知道干啥的

总的来说,配置文件加载分为三个部分。

  1. 设置默认参数
  2. 加载用户配置文件
  3. 检查用户配置参数

当这三步没有问题之后,就会返回一个BfeConfig对象,基于它可以创建BfeServer对象。

启动服务

话说BFE的启动流程好清晰呀。

func StartUp(cfg bfe_conf.BfeConfig, version string, confRoot string) error {
	var err error

	// 1. 加载所有bfe内置的模块
	bfe_modules.SetModules()

	// 2. 创建 bfe server
	bfeServer := NewBfeServer(cfg, confRoot, version)

	// 3. 初始化http的负载均衡服务
	if err = bfeServer.InitHttp(); err != nil {
		return err
	}

	// 4.初始化https的负载均衡服务
	if err = bfeServer.InitHttps(); err != nil {
		return err
	}

	// 5. 加载数据  如host_rule.data,vip_rule.data等配置文件
	if err = bfeServer.InitDataLoad(); err != nil {
		return err
	}

	// 6. 注册信号函数,当接受到kill -9, kill -15之类的信号可以做一些收尾操作
	bfeServer.InitSignalTable()

	// 7. 启动监视器, 提供配置文件的监控,reload和debug
	monitorPort := cfg.Server.MonitorPort
	if err = bfeServer.InitWebMonitor(monitorPort); err != nil {
		return err
	}

	// 8. 加载用户配置的模块
	if err = bfeServer.RegisterModules(cfg.Server.Modules); err != nil {
		return err
	}

	// 9. 初始化模块
	if err = bfeServer.InitModules(); err != nil {
		return err
	}

	// 10. 加载插件 似乎被模块替代了
	if err = bfeServer.LoadPlugins(cfg.Server.Plugins); err != nil {
		return err
	}

	// 11.  加载插件 似乎被模块替代了
	if err = bfeServer.InitPlugins(); err != nil {
		return err
	}

	// 12. 初始化监听器,即tcp端口监听
	if err = bfeServer.InitListeners(cfg); err != nil {
		return err
	}

	// 13. 是否启用监听器
	if cfg.Server.MonitorEnabled {
		bfeServer.Monitor.Start()
	}

    // 14. 后面就是依次启动http, https服务了。
	serveChan := make(chan error)
	// start goroutine to accept http connections
	for i := 0; i < cfg.Server.AcceptNum; i++ {
		go func() {
			httpErr := bfeServer.ServeHttp(bfeServer.HttpListener)
			serveChan <- httpErr
		}()
	}

	// start goroutine to accept https connections
	for i := 0; i < cfg.Server.AcceptNum; i++ {
		go func() {
			httpsErr := bfeServer.ServeHttps(bfeServer.HttpsListener)
			serveChan <- httpsErr
		}()
	}

	err = <-serveChan
	return err
}

因为初始化的代码逻辑还是比较清晰的,所以就直接在对应的地方加注释进行说明了,如果你看过BFE的代码会发现,其实我只是简单的翻译了一下。

BFE的初始化过程还是会设计很多配置的,这些只有需要的时候去看才能比较好的理解,所以这里暂时略过各个初始化的过程。

ServeHttp, ServeHttps

ServeHttpServeHttps其实差别不大。

// ServeHttp accept incoming http connections
func (srv *BfeServer) ServeHttp(ln net.Listener) error {
	return srv.Serve(ln, ln, "HTTP")
}

// ServeHttps accept incoming https connections
func (srv *BfeServer) ServeHttps(ln *HttpsListener) error {
	return srv.Serve(ln.tlsListener, ln.tcpListener, "HTTPS")
}

func (srv *BfeServer) Serve(l net.Listener, raw net.Listener, proto string) error {
	var tempDelay time.Duration // how long to sleep on accept failure
	proxyState := srv.serverStatus.ProxyState

	for {
		// 1.
		rw, e := l.Accept()

		// 2.
		go func(rwc net.Conn, srv *BfeServer) {
			// 3.
			c, err := newConn(rwc, srv)
			// 4.
			c.serve()
		}(rw, srv)
	}
}

代码分解如下:

  1. 获得net.Conn连接
  2. 每个连接起一个gorouting处理
  3. 将原始的net.Conn对象包装一层,加上自己的各种处理逻辑
  4. 开始处理请求

可以看到HTTP, HTTPS的请求入口其实是差不多的。

serve

当连接建立之后就可以处理请求了。

func (c *conn) serve() {
	var hl *bfe_module.HandlerList
	var retVal int
	session := c.session
	c.server.connWaitGroup.Add(1)
	serverStatus := c.server.serverStatus
	proxyState := serverStatus.ProxyState

	defer func() {
		// 1. 异常处理, 捕获panic
	}()

	defer func() {
		// 2. 调用收尾的回调函数和关闭连接
		c.finish()
		c.close()
	}()

	// 3.
	hl = c.server.CallBacks.GetHandlerList(bfe_module.HandleAccept)

	if tlsConn, ok := c.rwc.(*bfe_tls.Conn); ok {
		// start tls handshake
        // 4.
		start := time.Now()
		if err := tlsConn.Handshake(); err != nil {
			session.SetError(bfe_basic.ErrClientTlsHandshake, err.Error())
			return
		}

		// Callback for HANDLE_HANDSHAKE
        // 5.
		hl = c.server.CallBacks.GetHandlerList(bfe_module.HandleHandshake)
        
		// upgrade to negotiated protocol
        // 6.
		proto := tlsState.NegotiatedProtocol
		if mandatoryProtocol, ok := c.getMandatoryProtocol(tlsConn); ok {
			proto = mandatoryProtocol
		}
        
        // 7.
		if validNPN(proto) {
			if fn := c.server.TLSNextProto[proto]; fn != nil {
				c.session.Proto = proto
				// process protocol over TLS connection (spdy, http2, etc)
				handler := NewProtocolHandler(c, proto)
				fn(&c.server.Server, tlsConn, handler)
			} else {
				// never go here
				log.Logger.Info("conn.serve(): unknown negotiated protocol %s over TLS", proto)
			}
			return
		}
	}

    // 8.
	if _, ok := c.rwc.(*bfe_tls.Conn); ok {
		c.session.Proto = "https"
	} else {
		c.session.Proto = "http"
	}
    
	firstRequest := true
    // 9.
	for {
        // 10.
		if firstRequest {
			if d := c.server.ReadTimeout; d != 0 {
				c.rwc.SetReadDeadline(time.Now().Add(d))
			}
		}
		// 11.
		request, err := c.readRequest()
		req := request.HttpRequest

        // 12.
		w := newResponse(c, req)

         // 13
		if firstRequest {           
            // 14
			nextProto := checkHttpUpgrade(request)
			fn := c.server.HTTPNextProto[nextProto]
			switch nextProto {
			case bfe_websocket.WebSocket:
				fn(&c.server.Server, w, req)
				return
			default:
				log.Logger.Debug("conn.serve(): not upgrade to other protocol over http/https")
			}
			firstRequest = false
		}
		
        // 15.
		isKeepAlive := c.serveRequest(w, request)


        // 16.
		if !isKeepAlive || w.closeAfterReply {
			if w.requestBodyLimitHit {
				c.closeWriteAndWait()
			}
			break
		}
	}
}

代码分解如下:

  1. 注册延迟函数 异常处理, 捕获panic
  2. 注册延迟函数 调用收尾的回调函数和关闭连接和恢复
  3. 调用回调函数HandleAccept,如果有的话
  4. 开始TLS握手
  5. 调用HANDLE_HANDSHAKE回调函数
  6. 判断是否设置了协议升级 比如spdy, http2之类的
  7. 判断是否是合法的升级协议 跳过"", "http/1.1", "http/1.0"
  8. 设置Proto字段
  9. 开始处理请求,之所以是一个循环,是因为代理的链接可能是一个长连接
  10. 如果是第一次请求且配置了ReadTimeout, 就设置超时参数。
  11. 读取客户端发给BFE的请求
  12. 创建response对象
  13. 第一次请求时检查是否需要升级到WebSocket
  14. 具体的逻辑
  15. 开始代理请求,将客户端的请求转发给后端,并返回该请求是否是长连接
  16. 如果不是长连接或者配置了closeAfterReply参数就关闭连接,清空缓存

至此请求前的工作全部做完,后面就是具体的代理过程了。

ReverseProxy.ServeHTTP

代理要做的事情比较简单,就是将客户端的请求转发给后端。但是具体实现会涉及两个问题。

  1. 怎么找到对应的后端服务
  2. 怎么扩展(植入)自己的代码片段

我们看看BFE是怎么做的。

func (p *ReverseProxy) ServeHTTP() (action int) {
    // 1.
	srv := p.server

	// 2.
	setClientAddr(basicReq)

	// 3.
	hl = srv.CallBacks.GetHandlerList(bfe_module.HandleBeforeLocation)

    // 4.
	if err := srv.findProduct(basicReq); err != nil {
		// 找不到产品时的错误处理
	}

    // 5.
	hl = srv.CallBacks.GetHandlerList(bfe_module.HandleFoundProduct)

	// 6.
	if err = srv.findCluster(basicReq); err != nil {
        // 找不到集群的错误处理
	}
	clusterName = basicReq.Route.ClusterName

	// 7.
	serverConf = basicReq.SvrDataConf.(*bfe_route.ServerDataConf)
	cluster, err = serverConf.ClusterTable.Lookup(clusterName)
	basicReq.Backend.ClusterName = clusterName
	
    
	// 8.
	hl = srv.CallBacks.GetHandlerList(bfe_module.HandleAfterLocation)

	// 9.
	outreq = new(bfe_http.Request)
	*outreq = *req // includes shallow copies of maps, but okay
	basicReq.OutRequest = outreq
	// invoke cluster to get response
	res, action, err = p.clusterInvoke(srv, cluster, basicReq, rw)
	basicReq.HttpResponse = res

    
    // 10. 
	basicReq.SvrDataConf = nil

// 11.
response_got:
	// 设置超时参数
    
	// 12.
	hl = srv.CallBacks.GetHandlerList(bfe_module.HandleReadResponse)

// 13.
send_response:
    // 14.
	if !isRedirect && res != nil {
		err = p.sendResponse(rw, res, resFlushInterval, cancelOnClientClose)
	}
	return
}

代码分解如下:

  1. 获取连接对应的Server对象
  2. 设置客户端的请求IP,会尝试解析X-Real-IpX-Forwarded-For HTTP头
  3. 调用HandleBeforeLocation回调函数
  4. 先找连接对应的产品
  5. 调用HandleFoundProduct回调函数
  6. 通过产品在找到对应的集群名
  7. 在通过集群名找到对应的集群
  8. 调用HandleAfterLocation回调函数
  9. 拷贝请求对象并加个请求通过集群调用, 集群会包含多个后端
  10. SvrDataConf对象这时已经没有了,为了减轻GC压力,所以手动删除引用,这样可以快速的被GC了
  11. goto能直接定向的标签 获取响应之后
  12. 调用HandleReadResponse回调函数
  13. goto能直接定向的标签 发送响应时
  14. 将后端发来的响应发回给客户端

上面的代码大致包含两个部分,一是路由的部分,二是回调函数。

因为BFE多了一个产品的概念,所以直接去看配置文件会有点不太好理解BFE的路由配置,而回调函数就是BFE提供的钩子函数,在BFE各个阶段前后执行,提供开发自己的模块可以介入BFE的各个阶段。

总结

本文大致讲解了路由的前半部分的代码,没有讲解后端转发的部分,是因为这部分其实具体的逻辑还是比较复杂的,后面单独一篇文章,而想对BFE有一个大概的认识,路由的前半部分大致可以帮助建立这个意识了,但是想使用还是有点难度的,所以需要继续对路由部分再次深入。

参考链接