百度开源网关BFE源代码阅读1之启动
BFE是一个功能强大的网关。 因为BFE是用Golang写的,所以值得一看,但是它的用户体验实在是一言难尽。
BFE版本: v1.6.0
环境准备
BFE
基于自身设计的原因,配置文件相较于其他产品非常多,所以直接使用BFE
的二进制执行文件直接启动不是那么容易的事情,你需要一个配置文件目录,比如conf
, 然后里面包含一堆的配置文件才能启动,不过最新的版本,二进制文件已经附带了一个conf
目录了。
下面启动前的文件结构
conf/
├── bfe.conf
├── cluster_conf
│ ├── cluster_table.data
│ └── gslb.data
├── mod_access
│ └── mod_access.conf
├── mod_auth_basic
│ ├── auth_basic_rule.data
│ ├── mod_auth_basic.conf
│ └── userfile
# 其他mod_xxxx目录省略
├── server_data_conf
│ ├── cluster_conf.data
│ ├── host_rule.data
│ ├── name_conf.data
│ ├── route_rule.data
│ └── vip_rule.data
└── tls_conf
├── certs
│ ├── example.crt
│ └── example.key
├── client_ca
│ ├── example_ca.crt
│ └── example_ca.key
├── client_crl
├── server_cert_conf.data
├── session_ticket_key.data
└── tls_rule_conf.data
配置文件配置了一些示例路由,这些路由一下是看不懂的,其中一条路由是将域名example.org
的请求转发到127.0.0.1:8181
。
所以为了尝鲜,你可以在本地启动一个监听8181端口的web服务,然后使用以下命令测试
curl 127.0.0.1:8080 -H "Host: example.org"
如果启动不了BFE
, 或者测试返回错误也不要紧,因为BFE的配置就是没有其他网关那么明了,这不是使用者的问题,是BFE
自身设计的问题。
启动流程
下面通过阅读源代码的方式来理解BFE中的各个概念,只有理解了这些概念才会配置BFE的路由。
func main() {
// confRoot默认是./conf
confPath := path.Join(*confRoot, "bfe.conf")
// 加载配置文件
config, err = bfe_conf.BfeConfigLoad(confPath, *confRoot)
// 基于配置文件启动
if err = bfe_server.StartUp(config, version, *confRoot); err != nil {
log.Logger.Error("main(): bfe_server.StartUp(): %s", err.Error())
}
}
让人值得吐槽的一点是,BFE的内部模块命名为啥都要以**bfe_**开头!!!
入口文件不是太复杂,省略掉参数解析和日志初始化,就两个部分,加载配置,启动服务。接下来以这两部分作为入口,继续深入。
加载配置
通过阅读配置加载的代码,我们可以了解到默认的参数和使用限制
func BfeConfigLoad(filePath string, confRoot string) (BfeConfig, error) {
var cfg BfeConfig
var err error
// 1.
SetDefaultConf(&cfg)
// 2.
err = gcfg.ReadFileInto(&cfg, filePath)
// 3.
if err = cfg.Server.Check(confRoot); err != nil {
return cfg, err
}
// 4.
if err = cfg.HttpsBasic.Check(confRoot); err != nil {
return cfg, err
}
// 5.
if err = cfg.SessionCache.Check(confRoot); err != nil {
return cfg, err
}
// 6.
if err = cfg.SessionTicket.Check(confRoot); err != nil {
return cfg, err
}
return cfg, nil
}
代码分解如下:
- 设置BFE提供的默认参数,比如监听端口,默认配置文件的路径之类的。
- 将用户的配置信息加载到
cfg
中 - 检查基本的配置,比如端口是否合法,指定的配置文件路径是否合法之类的
- 检查HTTPS相关的配置,比如证书目录是否存在,证书版本是否合法之类的。
- 会话缓存配置检查 暂时不知道干啥的
- 暂时不知道干啥的
总的来说,配置文件加载分为三个部分。
- 设置默认参数
- 加载用户配置文件
- 检查用户配置参数
当这三步没有问题之后,就会返回一个BfeConfig
对象,基于它可以创建BfeServer
对象。
启动服务
话说BFE的启动流程好清晰呀。
func StartUp(cfg bfe_conf.BfeConfig, version string, confRoot string) error {
var err error
// 1. 加载所有bfe内置的模块
bfe_modules.SetModules()
// 2. 创建 bfe server
bfeServer := NewBfeServer(cfg, confRoot, version)
// 3. 初始化http的负载均衡服务
if err = bfeServer.InitHttp(); err != nil {
return err
}
// 4.初始化https的负载均衡服务
if err = bfeServer.InitHttps(); err != nil {
return err
}
// 5. 加载数据 如host_rule.data,vip_rule.data等配置文件
if err = bfeServer.InitDataLoad(); err != nil {
return err
}
// 6. 注册信号函数,当接受到kill -9, kill -15之类的信号可以做一些收尾操作
bfeServer.InitSignalTable()
// 7. 启动监视器, 提供配置文件的监控,reload和debug
monitorPort := cfg.Server.MonitorPort
if err = bfeServer.InitWebMonitor(monitorPort); err != nil {
return err
}
// 8. 加载用户配置的模块
if err = bfeServer.RegisterModules(cfg.Server.Modules); err != nil {
return err
}
// 9. 初始化模块
if err = bfeServer.InitModules(); err != nil {
return err
}
// 10. 加载插件 似乎被模块替代了
if err = bfeServer.LoadPlugins(cfg.Server.Plugins); err != nil {
return err
}
// 11. 加载插件 似乎被模块替代了
if err = bfeServer.InitPlugins(); err != nil {
return err
}
// 12. 初始化监听器,即tcp端口监听
if err = bfeServer.InitListeners(cfg); err != nil {
return err
}
// 13. 是否启用监听器
if cfg.Server.MonitorEnabled {
bfeServer.Monitor.Start()
}
// 14. 后面就是依次启动http, https服务了。
serveChan := make(chan error)
// start goroutine to accept http connections
for i := 0; i < cfg.Server.AcceptNum; i++ {
go func() {
httpErr := bfeServer.ServeHttp(bfeServer.HttpListener)
serveChan <- httpErr
}()
}
// start goroutine to accept https connections
for i := 0; i < cfg.Server.AcceptNum; i++ {
go func() {
httpsErr := bfeServer.ServeHttps(bfeServer.HttpsListener)
serveChan <- httpsErr
}()
}
err = <-serveChan
return err
}
因为初始化的代码逻辑还是比较清晰的,所以就直接在对应的地方加注释进行说明了,如果你看过BFE的代码会发现,其实我只是简单的翻译了一下。
BFE的初始化过程还是会设计很多配置的,这些只有需要的时候去看才能比较好的理解,所以这里暂时略过各个初始化的过程。
ServeHttp, ServeHttps
ServeHttp
和ServeHttps
其实差别不大。
// ServeHttp accept incoming http connections
func (srv *BfeServer) ServeHttp(ln net.Listener) error {
return srv.Serve(ln, ln, "HTTP")
}
// ServeHttps accept incoming https connections
func (srv *BfeServer) ServeHttps(ln *HttpsListener) error {
return srv.Serve(ln.tlsListener, ln.tcpListener, "HTTPS")
}
func (srv *BfeServer) Serve(l net.Listener, raw net.Listener, proto string) error {
var tempDelay time.Duration // how long to sleep on accept failure
proxyState := srv.serverStatus.ProxyState
for {
// 1.
rw, e := l.Accept()
// 2.
go func(rwc net.Conn, srv *BfeServer) {
// 3.
c, err := newConn(rwc, srv)
// 4.
c.serve()
}(rw, srv)
}
}
代码分解如下:
- 获得
net.Conn
连接 - 每个连接起一个gorouting处理
- 将原始的
net.Conn
对象包装一层,加上自己的各种处理逻辑 - 开始处理请求
可以看到HTTP, HTTPS的请求入口其实是差不多的。
serve
当连接建立之后就可以处理请求了。
func (c *conn) serve() {
var hl *bfe_module.HandlerList
var retVal int
session := c.session
c.server.connWaitGroup.Add(1)
serverStatus := c.server.serverStatus
proxyState := serverStatus.ProxyState
defer func() {
// 1. 异常处理, 捕获panic
}()
defer func() {
// 2. 调用收尾的回调函数和关闭连接
c.finish()
c.close()
}()
// 3.
hl = c.server.CallBacks.GetHandlerList(bfe_module.HandleAccept)
if tlsConn, ok := c.rwc.(*bfe_tls.Conn); ok {
// start tls handshake
// 4.
start := time.Now()
if err := tlsConn.Handshake(); err != nil {
session.SetError(bfe_basic.ErrClientTlsHandshake, err.Error())
return
}
// Callback for HANDLE_HANDSHAKE
// 5.
hl = c.server.CallBacks.GetHandlerList(bfe_module.HandleHandshake)
// upgrade to negotiated protocol
// 6.
proto := tlsState.NegotiatedProtocol
if mandatoryProtocol, ok := c.getMandatoryProtocol(tlsConn); ok {
proto = mandatoryProtocol
}
// 7.
if validNPN(proto) {
if fn := c.server.TLSNextProto[proto]; fn != nil {
c.session.Proto = proto
// process protocol over TLS connection (spdy, http2, etc)
handler := NewProtocolHandler(c, proto)
fn(&c.server.Server, tlsConn, handler)
} else {
// never go here
log.Logger.Info("conn.serve(): unknown negotiated protocol %s over TLS", proto)
}
return
}
}
// 8.
if _, ok := c.rwc.(*bfe_tls.Conn); ok {
c.session.Proto = "https"
} else {
c.session.Proto = "http"
}
firstRequest := true
// 9.
for {
// 10.
if firstRequest {
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
}
// 11.
request, err := c.readRequest()
req := request.HttpRequest
// 12.
w := newResponse(c, req)
// 13
if firstRequest {
// 14
nextProto := checkHttpUpgrade(request)
fn := c.server.HTTPNextProto[nextProto]
switch nextProto {
case bfe_websocket.WebSocket:
fn(&c.server.Server, w, req)
return
default:
log.Logger.Debug("conn.serve(): not upgrade to other protocol over http/https")
}
firstRequest = false
}
// 15.
isKeepAlive := c.serveRequest(w, request)
// 16.
if !isKeepAlive || w.closeAfterReply {
if w.requestBodyLimitHit {
c.closeWriteAndWait()
}
break
}
}
}
代码分解如下:
- 注册延迟函数 异常处理, 捕获panic
- 注册延迟函数 调用收尾的回调函数和关闭连接和恢复
- 调用回调函数
HandleAccept
,如果有的话 - 开始TLS握手
- 调用
HANDLE_HANDSHAKE
回调函数 - 判断是否设置了协议升级 比如
spdy
,http2
之类的 - 判断是否是合法的升级协议 跳过
"", "http/1.1", "http/1.0"
- 设置Proto字段
- 开始处理请求,之所以是一个循环,是因为代理的链接可能是一个长连接
- 如果是第一次请求且配置了
ReadTimeout
, 就设置超时参数。 - 读取客户端发给BFE的请求
- 创建response对象
- 第一次请求时检查是否需要升级到WebSocket
- 具体的逻辑
- 开始代理请求,将客户端的请求转发给后端,并返回该请求是否是长连接
- 如果不是长连接或者配置了
closeAfterReply
参数就关闭连接,清空缓存
至此请求前的工作全部做完,后面就是具体的代理过程了。
ReverseProxy.ServeHTTP
代理要做的事情比较简单,就是将客户端的请求转发给后端。但是具体实现会涉及两个问题。
- 怎么找到对应的后端服务
- 怎么扩展(植入)自己的代码片段
我们看看BFE
是怎么做的。
func (p *ReverseProxy) ServeHTTP() (action int) {
// 1.
srv := p.server
// 2.
setClientAddr(basicReq)
// 3.
hl = srv.CallBacks.GetHandlerList(bfe_module.HandleBeforeLocation)
// 4.
if err := srv.findProduct(basicReq); err != nil {
// 找不到产品时的错误处理
}
// 5.
hl = srv.CallBacks.GetHandlerList(bfe_module.HandleFoundProduct)
// 6.
if err = srv.findCluster(basicReq); err != nil {
// 找不到集群的错误处理
}
clusterName = basicReq.Route.ClusterName
// 7.
serverConf = basicReq.SvrDataConf.(*bfe_route.ServerDataConf)
cluster, err = serverConf.ClusterTable.Lookup(clusterName)
basicReq.Backend.ClusterName = clusterName
// 8.
hl = srv.CallBacks.GetHandlerList(bfe_module.HandleAfterLocation)
// 9.
outreq = new(bfe_http.Request)
*outreq = *req // includes shallow copies of maps, but okay
basicReq.OutRequest = outreq
// invoke cluster to get response
res, action, err = p.clusterInvoke(srv, cluster, basicReq, rw)
basicReq.HttpResponse = res
// 10.
basicReq.SvrDataConf = nil
// 11.
response_got:
// 设置超时参数
// 12.
hl = srv.CallBacks.GetHandlerList(bfe_module.HandleReadResponse)
// 13.
send_response:
// 14.
if !isRedirect && res != nil {
err = p.sendResponse(rw, res, resFlushInterval, cancelOnClientClose)
}
return
}
代码分解如下:
- 获取连接对应的Server对象
- 设置客户端的请求IP,会尝试解析
X-Real-Ip
和X-Forwarded-For
HTTP头 - 调用
HandleBeforeLocation
回调函数 - 先找连接对应的产品
- 调用
HandleFoundProduct
回调函数 - 通过产品在找到对应的集群名
- 在通过集群名找到对应的集群
- 调用
HandleAfterLocation
回调函数 - 拷贝请求对象并加个请求通过集群调用, 集群会包含多个后端
SvrDataConf
对象这时已经没有了,为了减轻GC压力,所以手动删除引用,这样可以快速的被GC了- goto能直接定向的标签 获取响应之后
- 调用HandleReadResponse回调函数
- goto能直接定向的标签 发送响应时
- 将后端发来的响应发回给客户端
上面的代码大致包含两个部分,一是路由的部分,二是回调函数。
因为BFE
多了一个产品的概念,所以直接去看配置文件会有点不太好理解BFE
的路由配置,而回调函数就是BFE
提供的钩子函数,在BFE
各个阶段前后执行,提供开发自己的模块可以介入BFE
的各个阶段。
总结
本文大致讲解了路由的前半部分的代码,没有讲解后端转发的部分,是因为这部分其实具体的逻辑还是比较复杂的,后面单独一篇文章,而想对BFE
有一个大概的认识,路由的前半部分大致可以帮助建立这个意识了,但是想使用还是有点难度的,所以需要继续对路由部分再次深入。