百度开源网关BFE源代码阅读4之模块

文章目录

据我所知,所有开源负载均衡都会提供至少一种扩展机制,BFE也不例外,BFE通过模块的选择可以更精细的控制BFE在处理请求中的各个阶段。如果内置模块不能满足自己需求,那么可以自己开发模块,而BFE是用Golang写的,所以开发效率很高。

下面是BFE各个回调点的位置。

BFE转发过程中的回调点

BFE的模块和其他开源产品的一个很大的不同点是,BFE基于Product(租户)维度控制,而其他大多数产品都是基于Host(域名)。

初始化

首先看看BFE是怎么加载和启用模块的。

func StartUp(cfg bfe_conf.BfeConfig, version string, confRoot string) error {
	var err error

	// 1.
	bfe_modules.SetModules()
    
    // 2.
	if err = bfeServer.RegisterModules(cfg.Server.Modules); err != nil {
		log.Logger.Error("StartUp(): RegisterModules():%s", err.Error())
		return err
	}

	// 3.
	if err = bfeServer.InitModules(); err != nil {
		log.Logger.Error("StartUp(): bfeServer.InitModules():%s",
			err.Error())
		return err
	}
}

// 4.
func SetModules() {
	for _, module := range moduleList {
		bfe_module.AddModule(module)
	}
}

// 5.
func (srv *BfeServer) RegisterModules(modules []string) error {
	for _, moduleName := range modules {
		moduleName = strings.TrimSpace(moduleName)
		if err := srv.Modules.RegisterModule(moduleName); err != nil {
			return err
		}
	}

	return nil
}

// 6.
func (bm *BfeModules) RegisterModule(name string) error {
	module, ok := moduleMap[name]
	bm.workModules[name] = module

	return nil
}
// 7.
func (srv *BfeServer) InitModules() error {
	return srv.Modules.Init(srv.CallBacks, srv.Monitor.WebHandlers, srv.ConfRoot)
}

func (bm *BfeModules) Init(cbs *BfeCallbacks, whs *web_monitor.WebHandlers, cr string) error {
    // 8.
	for _, name := range modulesAll {
		module, ok := bm.workModules[name]
		if ok {
			err := module.Init(cbs, whs, cr)
			modulesEnabled = append(modulesEnabled, name)
		}
	}
	return nil
}

代码分解如下:

  1. 加载BFE代码中所有集成的模块
  2. 基于配置文件中的Modules参数依次尝试注册模块
  3. 初始化所有注册的模块
  4. 第一步的具体实现,就是简单的追加切片
  5. 第二步的具体实现,将模块注册到BfeServer对象上
  6. 上一步的具体实现
  7. 第三步的具体实现
  8. 注册时要按代码中定义的顺序来初始化模块,这很重要, 因为模块时按注册的顺序依次执行的。

从上面的的代码可以看到,模块的初始化主要分为三个部分,加载所有可用的模块,注册用户配置的模块,将注册的模块依次初始化,也就是调用其对应的Init方法,BFE的模块初始化流程还是比较简单明了的。

常用模块

下面开始看常见模块的初始化逻辑。

  1. mod_trust_clientip
  2. mod_block
  3. mod_header
  4. mod_rewrite
  5. mod_redirect
  6. mod_logid
  7. mod_tag
  8. mod_trace
  9. mod_access
  10. mod_prison

根据前面的代码,我们可以直接找模块的Init方法。

mod_trust_clientip

这个模块用于设置请求的客户端IP是否是可信的,可信的IP段由用户配置。

// 1.
func (m *ModuleTrustClientIP) Init() error {
	confPath := bfe_module.ModConfPath(cr, m.name)
	if conf, err = ConfLoad(confPath, cr); err != nil {
		return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
	}

	return m.init(conf, cbs, whs)
}

func (m *ModuleTrustClientIP) init(cfg *ConfModTrustClientIP, cbs *bfe_module.BfeCallbacks,
	whs *web_monitor.WebHandlers) error {
	m.configPath = cfg.Basic.DataPath

    // 2.
	m.trustTable = ipdict.NewIPTable()

	// 3.
	if err := m.loadConfData(nil); err != nil {
		return fmt.Errorf("err in loadConfData(): %s", err.Error())
	}

	// 4.
	err := cbs.AddFilter(bfe_module.HandleAccept, m.acceptHandler)

	// 5.
	err = web_monitor.RegisterHandlers(whs, web_monitor.WebHandleMonitor, m.monitorHandlers())

	// 6.
	err = whs.RegisterHandler(web_monitor.WebHandleReload, m.name, m.loadConfData)

	return nil
}

func (m *ModuleTrustClientIP) acceptHandler(session *bfe_basic.Session) int {
	m.state.ConnTotal.Inc(1)
	trusted := m.trustTable.Search(session.RemoteAddr.IP)
	if trusted {
		m.state.ConnTrustClientip.Inc(1)
	}
    // 7.
	session.SetTrustSource(trusted)

	// state for internal remote ip
	if session.RemoteAddr.IP.IsPrivate() {
		m.state.ConnAddrInternal.Inc(1)
		if !trusted {
			m.state.ConnAddrInternalNotTrust.Inc(1)
		}
	}

	return bfe_module.BfeHandlerGoOn
}

代码分解如下:

  1. 入口函数
  2. 构造可以查询客户端IP的对象
  3. 加载配置文件及数据文件
  4. m.acceptHandler注册到回调函数中
  5. 注册监控的回调函数,用于展示内部数据
  6. 注册reload时被调用的函数
  7. 当客户端在配置的IP地址段中,只是简单的设置一个true

这个模块并不会拒绝请求,它只是在请求的Session(会话)对象的isTrustSourcez字段设置一个true。这个布尔值只有在条件匹配时使用条件原语req_cip_trusted才会应用。

mod_block

这个模块主要用于屏蔽指定的IP段,以及根据条件原语来屏蔽。为了更好理解看看它的测试配置文件

{
    "Version": "init version",
    "Config": {
        "example_product": [
            {
                "action": {
                    "cmd": "CLOSE",
                    "params": []
                },
                "name": "example rule",
                "cond": "req_path_in(\"/limit\", false)"            
            }
        ]
    }
}

这个配置文件用于屏蔽匹配的规则

192.168.1.253 192.168.1.254
192.168.1.250
192.168.1.250/20

这个配置文件用于指定要屏蔽的IP段

最后看看代码

func (m *ModuleBlock) Init() error {
    // 1.
	confPath := bfe_module.ModConfPath(cr, m.name)
	if conf, err = ConfLoad(confPath, cr); err != nil {
		return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
	}

    // 2.
	if err = m.loadGlobalIPTable(nil); err != nil {
		return fmt.Errorf("%s: loadGlobalIPTable() err %s", m.name, err.Error())
	}
    // 3.
	if err = m.loadProductRuleConf(nil); err != nil {
		return fmt.Errorf("%s: loadProductRuleConf() err %s", m.name, err.Error())
	}

    // 4.
	err = cbs.AddFilter(bfe_module.HandleAccept, m.globalBlockHandler)
	// 5.
	err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.productBlockHandler)
	return nil
}

// 6.
func (m *ModuleBlock) globalBlockHandler(session *bfe_basic.Session) int {
	clientIP := session.RemoteAddr.IP
	if m.ipTable.Search(clientIP) {
		session.SetError(ErrBlock, "connection blocked")
		return bfe_module.BfeHandlerClose
	}

	return bfe_module.BfeHandlerGoOn
}

// 7.
func (m *ModuleBlock) productBlockHandler() {
    // 8.
	rules, ok := m.ruleTable.Search(bfe_basic.GlobalProduct)
	if ok { // rules found
		retVal, isMatch, resp := m.productRulesProcess(request, rules)
		if isMatch {
			return retVal, resp
		}
	}
    
    // 9.
	rules, ok = m.ruleTable.Search(request.Route.Product)
	if !ok {
		return bfe_module.BfeHandlerGoOn, nil
	}
	retVal, isMatch, resp := m.productRulesProcess(request, rules)
	if !isMatch {
		m.state.ReqAccept.Inc(1)
	}
	return retVal, resp
}

代码分解如下:

  1. 加载配置文件
  2. 构造全局的IP屏蔽规则
  3. 加载按Product租户分类的屏蔽的规则
  4. 注册回调函数globalBlockHandler
  5. 注册回调函数productBlockHandler
  6. 第4步的具体实现,如果客户端IP在设定的IP段中就屏蔽
  7. 第5步的具体实现
  8. 看看有没有叫做globalproduct租户名,这个特殊的租户名会全局生效
  9. 看看有没有对应的产品规则,没有的话直接放行

mod_header

这个模块主要使用用来修改头信息,为了更好理解看看它的测试配置文件

{
    "Version": "init version",
    "Config": {
        "example_product": [
            {
                "cond": "req_path_prefix_in(\"/header\", false)",
                "actions": [
                    {
                        "cmd": "RSP_HEADER_SET",
                        "params": [
                            "X-Proxied-By",
                            "bfe"
                        ]
                    }
                ],
                "last": true
            }
        ]
    }
}

配置的意思就是, 当请求的路径前缀包含/header, 就设置X-Proxied-By: bfe这样的HTTP头信息。

func (m *ModuleHeader) Init(cbs *bfe_module.BfeCallbacks, whs *web_monitor.WebHandlers,
	// 1.
	confPath := bfe_module.ModConfPath(cr, m.name)
	if conf, err = ConfLoad(confPath, cr); err != nil {
		return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
	}

	return m.init(conf, cbs, whs)
}
                            
func (m *ModuleHeader) init() error {
	// 2.
	if err := m.loadConfData(nil); err != nil {
		return fmt.Errorf("err in loadConfData(): %s", err.Error())
	}

	// 3.
	err := cbs.AddFilter(bfe_module.HandleAfterLocation, m.reqHeaderHandler)
	err = cbs.AddFilter(bfe_module.HandleReadResponse, m.rspHeaderHandler)
	return nil
}

代码分解如下:

  1. 加载配置文件
  2. 加载数据文件
  3. 注册请求前和响应后的回调函数

这个模块的功能逻辑比较直观,具体的实现代码就不看了。

mod_rewrite

这个模块用来改写请求,可以修改三部分的数据,源码如下:

var allowActions = map[string]interface{}{
	// host actions
	action.ActionHostSetFromPathPrefix: nil, // set host from path prefix
	action.ActionHostSet:               nil, //set host
	action.ActionHostSuffixReplace:     nil, // replace host suffix

	// path actions
	action.ActionPathSet:        nil, // set path
	action.ActionPathPrefixAdd:  nil, // add path prefix
	action.ActionPathPrefixTrim: nil, // trim path prefix

	// query actions
	action.ActionQueryAdd:          nil, // add query
	action.ActionQueryDel:          nil, // del query
	action.ActionQueryRename:       nil, // rename query
	action.ActionQueryDelAllExcept: nil, // del query except given query key
}

根据名字应该就能看出来, Host, Path, Query三个字段的值。

下面是一个示例配置文件

{
    "Version": "init version",
    "Config": {
        "youerning_product": [
            {
                "Cond": "req_path_prefix_in(\"/rewrite\", false)",
                "Actions": [
                    {
                        "Cmd": "PATH_PREFIX_ADD",
                        "Params": [
                            "/bfe/"
                        ]
                    }
                ],
                "Last": true
            }
        ]
    }
}

上面配置的意思是,在URL路径上加上/bfe/, 如果路径的前缀是/rewrite的话。

测试如下:

curl "http://127.0.0.1:8080/rewrite" -H "Host: youerning.top"
ClusterB Backend1 /bfe/rewrite

其实模块的初始化代码都差不多,后面都会简单的过一下初始化过程,主要是看看它的回调点在哪。

func (m *ModuleReWrite) Init() error {
    // 加载配置文件
	confPath := bfe_module.ModConfPath(cr, m.name)
	if conf, err = ConfLoad(confPath, cr); err != nil {
		return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
	}

	return m.init(conf, cbs, whs)
}

func (m *ModuleReWrite) init() error {
    // HandleAfterLocation的回调会被调用
	err := cbs.AddFilter(bfe_module.HandleAfterLocation, m.rewriteHandler)

	return nil
}

mod_redirect

这个模块的名字也很直观,就是重定向,我们通过配置文件来学习。

{
    "Version": "init version",
    "Config": {
        "youerning_product": [
            {
                "Cond": "req_path_prefix_in(\"/redirect\", false)",
                "Actions": [
                    {
                        "Cmd": "URL_SET",
                        "Params": ["http://127.0.0.1:18002"]
                    }
                ],
                "Status": 301
            }
        ]
    }
}

上面配置的意思是,重定向到http://127.0.0.1:18002, 如果路径的前缀是/redirect的话。

测试如下:

curl "http://127.0.0.1:8080/redirect" -H "Host: youerning.top" -i -L
HTTP/1.1 301 Moved Permanently
Location: http://127.0.0.1:18002
Server: bfe
Date: Tue, 05 Sep 2023 01:40:53 GMT
Content-Length: 57
Content-Type: text/html; charset=utf-8

HTTP/1.1 200 OK
Date: Tue, 05 Sep 2023 01:40:53 GMT
Content-Length: 20
Content-Type: text/plain; charset=utf-8

ClusterA Backend2 /

可以看到,最先得到的响应时301,被重定向的位置是http://127.0.0.1:18002

mod_logid

这个模块用来生成一个随机的日志ID, 不需要配置文件,暂时不知道干啥的。

func (m *ModuleLogId) Init(cbs *bfe_module.BfeCallbacks, whs *web_monitor.WebHandlers,
	cr string) error {
	// 分别在HandleAccept, HandleBeforeLocation创建logid
	err := cbs.AddFilter(bfe_module.HandleAccept, m.sessionIdHandler)
	err = cbs.AddFilter(bfe_module.HandleBeforeLocation, m.requestIdHandler)

	return nil
}

func (m *ModuleLogId) sessionIdHandler(session *bfe_basic.Session) int {
    // 在session上设置logid
	session.SessionId = genLogId()
	return bfe_module.BfeHandlerGoOn
}

func (m *ModuleLogId) requestIdHandler() {
	// 在HTTP头 X-Bfe-Log-Id 上设置logid
	req.LogId = genLogId()
	req.HttpRequest.Header.Set(bfe_basic.HeaderBfeLogId, req.LogId)
	return bfe_module.BfeHandlerGoOn, nil
}

// 随机生成logid
func genLogId() string {
	b := make([]byte, 16)
	_, err := rand.Read(b)
	if err != nil {
		return ""
	}
	return hex.EncodeToString(b)
}

mod_tag

这个也不知道是干啥的,在请求上添加了一个tag

func (m *ModuleTag) init() error {
    // 加载数据文件
	_, err = m.loadRuleData(nil)
    // 注册回调点
	err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.tagHandler)
	return nil
}

func (m *ModuleTag) tagHandler(request *bfe_basic.Request) (int, *bfe_http.Response) {
	rules, ok := m.ruleTable.Search(request.Route.Product)
	for _, rule := range rules {
		if rule.Cond.Match(request) {
            //在请求对象上加上tags
			request.AddTags(rule.Param.TagName, []string{rule.Param.TagValue})
		}
	}

	return bfe_module.BfeHandlerGoOn, nil
}

// 具体的实现逻辑
func (req *Request) AddTags(name string, ntags []string) {
	tags := req.Tags.TagTable[name]
	tags = append(tags, ntags...)
	req.Tags.TagTable[name] = tags
}

mod_trace

这个还是比较有用的,用来追踪某个Product(租户)的耗时,但是需要配置额外的trace agent ,比如Zipkin,Jaeger, elasticsearch等服务,我没配置过,所以只过一下代码主流程。

func (m *ModuleTrace) init() error {
	// 初始化trace对象
	globalTrace, err = trace.NewTrace(conf.Basic.ServiceName, conf.GetTraceConfig())
    // 
	_, err = m.loadRuleData(nil)

    // 分别在发现产品后和结束请求后
	err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.startTrace)
	err = cbs.AddFilter(bfe_module.HandleRequestFinish, m.finishTrace)


	return nil
}

mod_access

这个是用来配置请求日志的,也略过。

mod_prison

这个模块比较有用,但是官方的配置参数有一些错误,下面是一个正确的配置文件

{
	"Version": "20190101000000",
	"Config": {
		"youerning_product": [{
			"Name": "youerning_prison",
			"Cond": "req_path_prefix_in(\"/prison\", false)",
			"accessSignConf": {
				"UseClientIP": false,
				"UseSocketIP": false,
				"UseConnectID": false,
				"UseUrl": false,
				"UseHost": false,
				"UsePath": false,
				"UseHeaders": false,
				"UrlRegexp": false,
				"Query": [],
				"Header": [],
				"Cookie": []
			},
			"action": {
				"cmd": "CLOSE",
				"params": []
			},
			"checkPeriod": 1,
			"stayPeriod": 10,
			"threshold": 2,
			"accessDictSize": 1000,
			"prisonDictSize": 1000
		}]
	}
}

官方代码仓库里的配置文件使用的字段是url, path等,这些参数不会解析成功。

测试如下:

 for i in `seq 1 4`;do curl "http://127.0.0.1:8080/prison" -H "Host: youerning.top" -i ;done

HTTP/1.1 200 OK
Content-Length: 26
Content-Type: text/plain; charset=utf-8
Date: Tue, 05 Sep 2023 03:19:41 GMT

ClusterB Backend1 /prison
HTTP/1.1 200 OK
Content-Length: 26
Content-Type: text/plain; charset=utf-8
Date: Tue, 05 Sep 2023 03:19:42 GMT

ClusterA Backend1 /prison
curl: (52) Empty reply from server
curl: (52) Empty reply from server

可以看到,处理前面两个请求成功之外,其他的请求都失败了,这是因为上面的配置参数是checkPeriod: 1, threshold:2, 意思是一秒钟超过两次请求就是直接关闭连接,关闭的持续时间是stayPeriod: 10, 也就是10秒。

一般来说,我们不会对某个地址直接限流,而是基于客户端IP, 所以我们可以配置clientIP: true启用基于客户端IP限流的功能。

值得注意的是, bfemod_prison模块使用的不是漏桶算法,所以checkPeriod设置的太大可能导致后端服务崩溃,即使是1秒,其实也有可能会崩溃,假设1秒分为5个区间,然后设置的阈值是10,当第一个区间请求来了1,那么第5个区间的最大请求量能到9,而下一秒开始的时候又能最大请求到10,也就说,最坏的情况下一秒的请求是可以超过设置的阈值的,并且接近阈值的两倍!!!。示意图如下

第一秒
[1,2,3,4,5]
 1,0,0,0,9
           第二秒
           [1, 2, 3, 4, 5]
            10,0, 0, 0, 0

所以在不自己开发一个其他的限流模块的情况下,不建议这个checkPeriod设置的太大。

最后来看看代码吧

func (m *ModulePrison) Init() error {
	// 1.
	confPath := bfe_module.ModConfPath(cr, m.name)
	conf, err := ConfLoad(confPath, cr)
	m.productConfPath = conf.Basic.ProductRulePath
	openDebug = conf.Log.OpenDebug

	// 2.
	if _, err := m.loadProductRuleTable(nil); err != nil {
		return fmt.Errorf("%s.Init():loadProductRuleTable(): %s", m.name, err.Error())
	}

	// 3. 
	err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.prisonHandler)

	return nil
}

func (m *ModulePrison) prisonHandler(req *bfe_basic.Request) (
	int, *bfe_http.Response) {
	// 4.
	product := bfe_basic.GlobalProduct
	ret, res := m.processProductRules(req, product)
	if ret != bfe_module.BfeHandlerGoOn {
		return ret, res
	}
	
	// 5.
	product = req.Route.Product
	ret, res = m.processProductRules(req, product)
	return ret, res
}

func (m *ModulePrison) processProductRules(req *bfe_basic.Request, product string) (int, *bfe_http.Response) {
	rules, ok := m.productTable.getRules(product)
	if !ok {
		if openDebug {
			log.Logger.Debug("product[%s] without prison rules, pass", product)
		}
		return bfe_module.BfeHandlerGoOn, nil
	}
	// 6.
	return m.processRules(req, rules)
}

func (m *ModulePrison) processRules(req *bfe_basic.Request, rules *prisonRules) (int, *bfe_http.Response) {
	for _, rule := range rules.ruleList {
		if !rule.cond.Match(req) {
			continue
		}
		// 7.        
		if !rule.recordAndCheck(req) {
			continue
		}
		// 如果被屏蔽了,就根据设置的action进行返回
		switch rule.action.Cmd {
		case action.ActionClose:
			req.ErrCode = ErrPrison
			return bfe_module.BfeHandlerClose, nil
		case action.ActionFinish:
			req.ErrCode = ErrPrison
			return bfe_module.BfeHandlerFinish, nil
		default:
			rule.action.Do(req)
		}
	}

	return bfe_module.BfeHandlerGoOn, nil
}

func (r *prisonRule) recordAndCheck(req *bfe_basic.Request) bool {
    // 8.
	sign, err := r.accessSigner.Sign(r.condStr, req)

    // 9.
	if deny := r.shouldDeny(sign, req); deny {
		return deny
	}

    // 10
	r.recordAccess(sign)
	return r.shouldDeny(sign, req)
}

代码分解如下:

  1. 加载配置文件
  2. 加载数据文件,并生成规则列表
  3. 增加回调函数,因为这个模块是以Product租户为维度,所以放在找到Product租户之后,倒是没有问题,但是限流感觉放在HandleAccept阶段效率更高。
  4. 先尝试全局规则,没有的话
  5. 尝试匹配对应Product租户对应的规则
  6. 找到对应规则并开始匹配规则
  7. 记录并检查
  8. 获得一个标识用户的签名
  9. 用这个签名作为KEY来检查是否已经被屏蔽
  10. 没有屏蔽的话就增加计数再次检查一下。

下面看看签名的逻辑

func (s *AccessSigner) Sign(label string, req *bfe_basic.Request) (AccessSign, error) {
	// 1.从请求中生成要作为签名的数据
	data, err := s.prepareData(label, req)

	// 2. 基于上一步的数据使用md5算法计算出一个唯一标识
	return AccessSign(md5.Sum(data)), nil
}

func (s *AccessSigner) prepareData(label string, req *bfe_basic.Request) ([]byte, error) {
	var buf bytes.Buffer

	// label就是配置的Cond字段的值
	buildKeyValue(&buf, "label", label)
	// 如果启用clientIP签名,就将数据追加到这个buf里面
	if s.UseClientIP {
		if req.ClientAddr == nil {
			return nil, errors.New("request without client ip")

		}
		buildKeyValue(&buf, "clientIP", req.ClientAddr.IP.String())
	}
	// 和clientIP类似
	if s.UseUrl {
		buildKeyValue(&buf, "url", req.HttpRequest.RequestURI)
	}
    // 省略其他字段的设置
    return buf.Bytes(), nil
}

从上面的代码知道,如果我们的accessSignConf不启用任何其他标识的话,那么就是作用整个规则,也就是说不同的用户都受制于这个限流规则,一般情况下我们会至少加个UseClientIP, 然后可以根据自己的应用需求增加其他的字段标识用户。

当我们得到了标识请求的签名,就可以基于这个KEY来判断用户是否可以继续访问了。


func (r *prisonRule) recordAndCheck(req *bfe_basic.Request) bool {
	sign, err := r.accessSigner.Sign(r.condStr, req)
	if deny := r.shouldDeny(sign, req); deny {
		return deny
	}
	r.recordAccess(sign)
	return r.shouldDeny(sign, req)
}

func (r *prisonRule) shouldDeny(sign AccessSign, req *bfe_basic.Request) bool {
    // 1.
	freeTimeNs, ok := r.prisonDict.Get(sign)
	if !ok {
		return false
	}

	// 2.
	if time.Now().UnixNano() < freeTimeNs.(int64) {
		prisonInfo := &PrisonInfo{
			PrisonType: ModPrison,
			PrisonName: r.name,
			FreeTime:   time.Unix(0, freeTimeNs.(int64)),
			IsExpired:  false,
			Action:     r.action.Cmd,
		}
		req.SetContext(ReqCtxPrisonInfo, prisonInfo)
		return true
	}

	// remove prison record if expired
    // 3.
	r.prisonDict.Del(sign)
	return false
}


func (r *prisonRule) recordAccess(sign AccessSign) {
	var f *AccessCounter
    // 4.
	value, ok := r.accessDict.Get(sign)
	if !ok {
		f = NewAccessCounter()
		r.accessDict.Add(sign, f)
	} else {
		f = value.(*AccessCounter)
	}

    // 5.
	if block, restTimeNs := f.IncAndCheck(r.checkPeriodNs, r.threshold); block {
		freeTimeNs := r.stayPeriodNs + restTimeNs + time.Now().UnixNano()
		r.prisonDict.Add(sign, freeTimeNs)
		r.accessDict.Del(sign)
	}
}

func (c *AccessCounter) IncAndCheck(checkPeriodNs int64, threshold int32) (bool, int64) {
    // 6.
	now := time.Now().UnixNano()
	stime := atomic.LoadInt64(&c.startTime)
	if stime+checkPeriodNs < now { // reset count
		c.reset()
	}

    // 7.
	count := atomic.AddInt32(&c.count, 1)

    // 8.
	stime = atomic.LoadInt64(&c.startTime)
	return count > threshold, stime + checkPeriodNs - now
}

代码分解如下:

  1. 查看是否已经在屏蔽的列表里面,如果没有直接返回false
  2. 检查对应的记录的释放时间, 如果小于要释放的时间点,说明还没解禁,那么返回true,说明需要屏蔽
  3. 如果到了这里,说明该记录已经到了该释放的时间点,所以删除对应的记录
  4. 尝试记录新来的请求
  5. 增加计数并检查是否达到阈值,如果block为true则增加到屏蔽列表中, 并从访问列表中删除。
  6. 检查记录的最开始访问时间是否已经超过检查时间间隔,如果是,则重新计数
  7. 增加计数,**注意: 为了高并发需要使用atomic.AddInt32**方法,这样能保证原子性。
  8. 检查计数器,计算到下一个间隔点的剩余时间,用于设置释放时间。

mod_prison有两个LRUCache对象, accessDict用于存储所有请求信息,prisonDict用于存储被限流的信息。

为什么用LRUCache这样的缓存对象呢?

因为随着请求的不断积累,内存会无限膨胀,所以需要限制记录的数据大小,官方代码仓库的配置是1000, 即"accessDictSize": 1000,"prisonDictSize": 1000两个配置参数,大家可以根据自己的实际情况配置,一个AccessCounter占用内存14字节, 1000 * 14差不多13KB 左右, 如果算上底层存储的interface{}, list.Element的抽象可能差不多56字节一条记录,10MB内存可以记录差不多1872457条记录,所以配置的时候可以适当设置大一点,不用担心。

参考链接

总结

很多模块都用到了两个通用的部分CondAction, 前者用于匹配请求或者响应中的条件,在BFE的官方文档称之为原语,这个需要查阅官方文档或者源代码,大多数原语的命名都是比较直观的,所以这个不会过于深入,Action则是一个动作,当条件匹配之后可以执行,比如拒绝。

还有就是很多模块都支持global的设置,即对所有产品生效,这个做全局配置很有用。

参考链接