百度开源网关BFE源代码阅读4之模块
据我所知,所有开源负载均衡都会提供至少一种扩展机制,BFE
也不例外,BFE
通过模块的选择可以更精细的控制BFE
在处理请求中的各个阶段。如果内置模块不能满足自己需求,那么可以自己开发模块,而BFE
是用Golang
写的,所以开发效率很高。
下面是BFE
各个回调点的位置。
BFE
的模块和其他开源产品的一个很大的不同点是,BFE
基于Product
(租户)维度控制,而其他大多数产品都是基于Host
(域名)。
初始化
首先看看BFE
是怎么加载和启用模块的。
func StartUp(cfg bfe_conf.BfeConfig, version string, confRoot string) error {
var err error
// 1.
bfe_modules.SetModules()
// 2.
if err = bfeServer.RegisterModules(cfg.Server.Modules); err != nil {
log.Logger.Error("StartUp(): RegisterModules():%s", err.Error())
return err
}
// 3.
if err = bfeServer.InitModules(); err != nil {
log.Logger.Error("StartUp(): bfeServer.InitModules():%s",
err.Error())
return err
}
}
// 4.
func SetModules() {
for _, module := range moduleList {
bfe_module.AddModule(module)
}
}
// 5.
func (srv *BfeServer) RegisterModules(modules []string) error {
for _, moduleName := range modules {
moduleName = strings.TrimSpace(moduleName)
if err := srv.Modules.RegisterModule(moduleName); err != nil {
return err
}
}
return nil
}
// 6.
func (bm *BfeModules) RegisterModule(name string) error {
module, ok := moduleMap[name]
bm.workModules[name] = module
return nil
}
// 7.
func (srv *BfeServer) InitModules() error {
return srv.Modules.Init(srv.CallBacks, srv.Monitor.WebHandlers, srv.ConfRoot)
}
func (bm *BfeModules) Init(cbs *BfeCallbacks, whs *web_monitor.WebHandlers, cr string) error {
// 8.
for _, name := range modulesAll {
module, ok := bm.workModules[name]
if ok {
err := module.Init(cbs, whs, cr)
modulesEnabled = append(modulesEnabled, name)
}
}
return nil
}
代码分解如下:
- 加载
BFE
代码中所有集成的模块 - 基于配置文件中的
Modules
参数依次尝试注册模块 - 初始化所有注册的模块
- 第一步的具体实现,就是简单的追加切片
- 第二步的具体实现,将模块注册到
BfeServer
对象上 - 上一步的具体实现
- 第三步的具体实现
- 注册时要按代码中定义的顺序来初始化模块,这很重要, 因为模块时按注册的顺序依次执行的。
从上面的的代码可以看到,模块的初始化主要分为三个部分,加载所有可用的模块,注册用户配置的模块,将注册的模块依次初始化,也就是调用其对应的Init
方法,BFE
的模块初始化流程还是比较简单明了的。
常用模块
下面开始看常见模块的初始化逻辑。
- mod_trust_clientip
- mod_block
- mod_header
- mod_rewrite
- mod_redirect
- mod_logid
- mod_tag
- mod_trace
- mod_access
- mod_prison
根据前面的代码,我们可以直接找模块的Init
方法。
mod_trust_clientip
这个模块用于设置请求的客户端IP是否是可信的,可信的IP段由用户配置。
// 1.
func (m *ModuleTrustClientIP) Init() error {
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleTrustClientIP) init(cfg *ConfModTrustClientIP, cbs *bfe_module.BfeCallbacks,
whs *web_monitor.WebHandlers) error {
m.configPath = cfg.Basic.DataPath
// 2.
m.trustTable = ipdict.NewIPTable()
// 3.
if err := m.loadConfData(nil); err != nil {
return fmt.Errorf("err in loadConfData(): %s", err.Error())
}
// 4.
err := cbs.AddFilter(bfe_module.HandleAccept, m.acceptHandler)
// 5.
err = web_monitor.RegisterHandlers(whs, web_monitor.WebHandleMonitor, m.monitorHandlers())
// 6.
err = whs.RegisterHandler(web_monitor.WebHandleReload, m.name, m.loadConfData)
return nil
}
func (m *ModuleTrustClientIP) acceptHandler(session *bfe_basic.Session) int {
m.state.ConnTotal.Inc(1)
trusted := m.trustTable.Search(session.RemoteAddr.IP)
if trusted {
m.state.ConnTrustClientip.Inc(1)
}
// 7.
session.SetTrustSource(trusted)
// state for internal remote ip
if session.RemoteAddr.IP.IsPrivate() {
m.state.ConnAddrInternal.Inc(1)
if !trusted {
m.state.ConnAddrInternalNotTrust.Inc(1)
}
}
return bfe_module.BfeHandlerGoOn
}
代码分解如下:
- 入口函数
- 构造可以查询客户端IP的对象
- 加载配置文件及数据文件
- 将
m.acceptHandler
注册到回调函数中 - 注册监控的回调函数,用于展示内部数据
- 注册
reload
时被调用的函数 - 当客户端在配置的IP地址段中,只是简单的设置一个
true
。
这个模块并不会拒绝请求,它只是在请求的Session
(会话)对象的isTrustSource
z字段设置一个true
。这个布尔值只有在条件匹配时使用条件原语req_cip_trusted
才会应用。
mod_block
这个模块主要用于屏蔽指定的IP段,以及根据条件原语来屏蔽。为了更好理解看看它的测试配置文件
{
"Version": "init version",
"Config": {
"example_product": [
{
"action": {
"cmd": "CLOSE",
"params": []
},
"name": "example rule",
"cond": "req_path_in(\"/limit\", false)"
}
]
}
}
这个配置文件用于屏蔽匹配的规则
192.168.1.253 192.168.1.254
192.168.1.250
192.168.1.250/20
这个配置文件用于指定要屏蔽的IP段
最后看看代码
func (m *ModuleBlock) Init() error {
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
// 2.
if err = m.loadGlobalIPTable(nil); err != nil {
return fmt.Errorf("%s: loadGlobalIPTable() err %s", m.name, err.Error())
}
// 3.
if err = m.loadProductRuleConf(nil); err != nil {
return fmt.Errorf("%s: loadProductRuleConf() err %s", m.name, err.Error())
}
// 4.
err = cbs.AddFilter(bfe_module.HandleAccept, m.globalBlockHandler)
// 5.
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.productBlockHandler)
return nil
}
// 6.
func (m *ModuleBlock) globalBlockHandler(session *bfe_basic.Session) int {
clientIP := session.RemoteAddr.IP
if m.ipTable.Search(clientIP) {
session.SetError(ErrBlock, "connection blocked")
return bfe_module.BfeHandlerClose
}
return bfe_module.BfeHandlerGoOn
}
// 7.
func (m *ModuleBlock) productBlockHandler() {
// 8.
rules, ok := m.ruleTable.Search(bfe_basic.GlobalProduct)
if ok { // rules found
retVal, isMatch, resp := m.productRulesProcess(request, rules)
if isMatch {
return retVal, resp
}
}
// 9.
rules, ok = m.ruleTable.Search(request.Route.Product)
if !ok {
return bfe_module.BfeHandlerGoOn, nil
}
retVal, isMatch, resp := m.productRulesProcess(request, rules)
if !isMatch {
m.state.ReqAccept.Inc(1)
}
return retVal, resp
}
代码分解如下:
- 加载配置文件
- 构造全局的IP屏蔽规则
- 加载按
Product
租户分类的屏蔽的规则 - 注册回调函数
globalBlockHandler
- 注册回调函数
productBlockHandler
- 第4步的具体实现,如果客户端IP在设定的IP段中就屏蔽
- 第5步的具体实现
- 看看有没有叫做
global
的product
租户名,这个特殊的租户名会全局生效 - 看看有没有对应的产品规则,没有的话直接放行
mod_header
这个模块主要使用用来修改头信息,为了更好理解看看它的测试配置文件
{
"Version": "init version",
"Config": {
"example_product": [
{
"cond": "req_path_prefix_in(\"/header\", false)",
"actions": [
{
"cmd": "RSP_HEADER_SET",
"params": [
"X-Proxied-By",
"bfe"
]
}
],
"last": true
}
]
}
}
配置的意思就是, 当请求的路径前缀包含/header
, 就设置X-Proxied-By: bfe
这样的HTTP头信息。
func (m *ModuleHeader) Init(cbs *bfe_module.BfeCallbacks, whs *web_monitor.WebHandlers,
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleHeader) init() error {
// 2.
if err := m.loadConfData(nil); err != nil {
return fmt.Errorf("err in loadConfData(): %s", err.Error())
}
// 3.
err := cbs.AddFilter(bfe_module.HandleAfterLocation, m.reqHeaderHandler)
err = cbs.AddFilter(bfe_module.HandleReadResponse, m.rspHeaderHandler)
return nil
}
代码分解如下:
- 加载配置文件
- 加载数据文件
- 注册请求前和响应后的回调函数
这个模块的功能逻辑比较直观,具体的实现代码就不看了。
mod_rewrite
这个模块用来改写请求,可以修改三部分的数据,源码如下:
var allowActions = map[string]interface{}{
// host actions
action.ActionHostSetFromPathPrefix: nil, // set host from path prefix
action.ActionHostSet: nil, //set host
action.ActionHostSuffixReplace: nil, // replace host suffix
// path actions
action.ActionPathSet: nil, // set path
action.ActionPathPrefixAdd: nil, // add path prefix
action.ActionPathPrefixTrim: nil, // trim path prefix
// query actions
action.ActionQueryAdd: nil, // add query
action.ActionQueryDel: nil, // del query
action.ActionQueryRename: nil, // rename query
action.ActionQueryDelAllExcept: nil, // del query except given query key
}
根据名字应该就能看出来, Host, Path, Query三个字段的值。
下面是一个示例配置文件
{
"Version": "init version",
"Config": {
"youerning_product": [
{
"Cond": "req_path_prefix_in(\"/rewrite\", false)",
"Actions": [
{
"Cmd": "PATH_PREFIX_ADD",
"Params": [
"/bfe/"
]
}
],
"Last": true
}
]
}
}
上面配置的意思是,在URL路径上加上/bfe/
, 如果路径的前缀是/rewrite
的话。
测试如下:
curl "http://127.0.0.1:8080/rewrite" -H "Host: youerning.top"
ClusterB Backend1 /bfe/rewrite
其实模块的初始化代码都差不多,后面都会简单的过一下初始化过程,主要是看看它的回调点在哪。
func (m *ModuleReWrite) Init() error {
// 加载配置文件
confPath := bfe_module.ModConfPath(cr, m.name)
if conf, err = ConfLoad(confPath, cr); err != nil {
return fmt.Errorf("%s: conf load err %s", m.name, err.Error())
}
return m.init(conf, cbs, whs)
}
func (m *ModuleReWrite) init() error {
// HandleAfterLocation的回调会被调用
err := cbs.AddFilter(bfe_module.HandleAfterLocation, m.rewriteHandler)
return nil
}
mod_redirect
这个模块的名字也很直观,就是重定向,我们通过配置文件来学习。
{
"Version": "init version",
"Config": {
"youerning_product": [
{
"Cond": "req_path_prefix_in(\"/redirect\", false)",
"Actions": [
{
"Cmd": "URL_SET",
"Params": ["http://127.0.0.1:18002"]
}
],
"Status": 301
}
]
}
}
上面配置的意思是,重定向到http://127.0.0.1:18002
, 如果路径的前缀是/redirect
的话。
测试如下:
curl "http://127.0.0.1:8080/redirect" -H "Host: youerning.top" -i -L
HTTP/1.1 301 Moved Permanently
Location: http://127.0.0.1:18002
Server: bfe
Date: Tue, 05 Sep 2023 01:40:53 GMT
Content-Length: 57
Content-Type: text/html; charset=utf-8
HTTP/1.1 200 OK
Date: Tue, 05 Sep 2023 01:40:53 GMT
Content-Length: 20
Content-Type: text/plain; charset=utf-8
ClusterA Backend2 /
可以看到,最先得到的响应时301,被重定向的位置是http://127.0.0.1:18002
mod_logid
这个模块用来生成一个随机的日志ID, 不需要配置文件,暂时不知道干啥的。
func (m *ModuleLogId) Init(cbs *bfe_module.BfeCallbacks, whs *web_monitor.WebHandlers,
cr string) error {
// 分别在HandleAccept, HandleBeforeLocation创建logid
err := cbs.AddFilter(bfe_module.HandleAccept, m.sessionIdHandler)
err = cbs.AddFilter(bfe_module.HandleBeforeLocation, m.requestIdHandler)
return nil
}
func (m *ModuleLogId) sessionIdHandler(session *bfe_basic.Session) int {
// 在session上设置logid
session.SessionId = genLogId()
return bfe_module.BfeHandlerGoOn
}
func (m *ModuleLogId) requestIdHandler() {
// 在HTTP头 X-Bfe-Log-Id 上设置logid
req.LogId = genLogId()
req.HttpRequest.Header.Set(bfe_basic.HeaderBfeLogId, req.LogId)
return bfe_module.BfeHandlerGoOn, nil
}
// 随机生成logid
func genLogId() string {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
return ""
}
return hex.EncodeToString(b)
}
mod_tag
这个也不知道是干啥的,在请求上添加了一个tag
func (m *ModuleTag) init() error {
// 加载数据文件
_, err = m.loadRuleData(nil)
// 注册回调点
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.tagHandler)
return nil
}
func (m *ModuleTag) tagHandler(request *bfe_basic.Request) (int, *bfe_http.Response) {
rules, ok := m.ruleTable.Search(request.Route.Product)
for _, rule := range rules {
if rule.Cond.Match(request) {
//在请求对象上加上tags
request.AddTags(rule.Param.TagName, []string{rule.Param.TagValue})
}
}
return bfe_module.BfeHandlerGoOn, nil
}
// 具体的实现逻辑
func (req *Request) AddTags(name string, ntags []string) {
tags := req.Tags.TagTable[name]
tags = append(tags, ntags...)
req.Tags.TagTable[name] = tags
}
mod_trace
这个还是比较有用的,用来追踪某个Product
(租户)的耗时,但是需要配置额外的trace agent
,比如Zipkin
,Jaeger
, elasticsearch
等服务,我没配置过,所以只过一下代码主流程。
func (m *ModuleTrace) init() error {
// 初始化trace对象
globalTrace, err = trace.NewTrace(conf.Basic.ServiceName, conf.GetTraceConfig())
//
_, err = m.loadRuleData(nil)
// 分别在发现产品后和结束请求后
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.startTrace)
err = cbs.AddFilter(bfe_module.HandleRequestFinish, m.finishTrace)
return nil
}
mod_access
这个是用来配置请求日志的,也略过。
mod_prison
这个模块比较有用,但是官方的配置参数有一些错误,下面是一个正确的配置文件
{
"Version": "20190101000000",
"Config": {
"youerning_product": [{
"Name": "youerning_prison",
"Cond": "req_path_prefix_in(\"/prison\", false)",
"accessSignConf": {
"UseClientIP": false,
"UseSocketIP": false,
"UseConnectID": false,
"UseUrl": false,
"UseHost": false,
"UsePath": false,
"UseHeaders": false,
"UrlRegexp": false,
"Query": [],
"Header": [],
"Cookie": []
},
"action": {
"cmd": "CLOSE",
"params": []
},
"checkPeriod": 1,
"stayPeriod": 10,
"threshold": 2,
"accessDictSize": 1000,
"prisonDictSize": 1000
}]
}
}
官方代码仓库里的配置文件使用的字段是url
, path
等,这些参数不会解析成功。
测试如下:
for i in `seq 1 4`;do curl "http://127.0.0.1:8080/prison" -H "Host: youerning.top" -i ;done
HTTP/1.1 200 OK
Content-Length: 26
Content-Type: text/plain; charset=utf-8
Date: Tue, 05 Sep 2023 03:19:41 GMT
ClusterB Backend1 /prison
HTTP/1.1 200 OK
Content-Length: 26
Content-Type: text/plain; charset=utf-8
Date: Tue, 05 Sep 2023 03:19:42 GMT
ClusterA Backend1 /prison
curl: (52) Empty reply from server
curl: (52) Empty reply from server
可以看到,处理前面两个请求成功之外,其他的请求都失败了,这是因为上面的配置参数是checkPeriod: 1
, threshold:2
, 意思是一秒钟超过两次请求就是直接关闭连接,关闭的持续时间是stayPeriod: 10
, 也就是10秒。
一般来说,我们不会对某个地址直接限流,而是基于客户端IP, 所以我们可以配置clientIP: true
启用基于客户端IP限流的功能。
值得注意的是, bfe
的mod_prison
模块使用的不是漏桶算法,所以checkPeriod
设置的太大可能导致后端服务崩溃,即使是1秒,其实也有可能会崩溃,假设1秒分为5个区间,然后设置的阈值是10,当第一个区间请求来了1,那么第5个区间的最大请求量能到9,而下一秒开始的时候又能最大请求到10,也就说,最坏的情况下一秒的请求是可以超过设置的阈值的,并且接近阈值的两倍!!!。示意图如下
第一秒
[1,2,3,4,5]
1,0,0,0,9
第二秒
[1, 2, 3, 4, 5]
10,0, 0, 0, 0
所以在不自己开发一个其他的限流模块的情况下,不建议这个checkPeriod
设置的太大。
最后来看看代码吧
func (m *ModulePrison) Init() error {
// 1.
confPath := bfe_module.ModConfPath(cr, m.name)
conf, err := ConfLoad(confPath, cr)
m.productConfPath = conf.Basic.ProductRulePath
openDebug = conf.Log.OpenDebug
// 2.
if _, err := m.loadProductRuleTable(nil); err != nil {
return fmt.Errorf("%s.Init():loadProductRuleTable(): %s", m.name, err.Error())
}
// 3.
err = cbs.AddFilter(bfe_module.HandleFoundProduct, m.prisonHandler)
return nil
}
func (m *ModulePrison) prisonHandler(req *bfe_basic.Request) (
int, *bfe_http.Response) {
// 4.
product := bfe_basic.GlobalProduct
ret, res := m.processProductRules(req, product)
if ret != bfe_module.BfeHandlerGoOn {
return ret, res
}
// 5.
product = req.Route.Product
ret, res = m.processProductRules(req, product)
return ret, res
}
func (m *ModulePrison) processProductRules(req *bfe_basic.Request, product string) (int, *bfe_http.Response) {
rules, ok := m.productTable.getRules(product)
if !ok {
if openDebug {
log.Logger.Debug("product[%s] without prison rules, pass", product)
}
return bfe_module.BfeHandlerGoOn, nil
}
// 6.
return m.processRules(req, rules)
}
func (m *ModulePrison) processRules(req *bfe_basic.Request, rules *prisonRules) (int, *bfe_http.Response) {
for _, rule := range rules.ruleList {
if !rule.cond.Match(req) {
continue
}
// 7.
if !rule.recordAndCheck(req) {
continue
}
// 如果被屏蔽了,就根据设置的action进行返回
switch rule.action.Cmd {
case action.ActionClose:
req.ErrCode = ErrPrison
return bfe_module.BfeHandlerClose, nil
case action.ActionFinish:
req.ErrCode = ErrPrison
return bfe_module.BfeHandlerFinish, nil
default:
rule.action.Do(req)
}
}
return bfe_module.BfeHandlerGoOn, nil
}
func (r *prisonRule) recordAndCheck(req *bfe_basic.Request) bool {
// 8.
sign, err := r.accessSigner.Sign(r.condStr, req)
// 9.
if deny := r.shouldDeny(sign, req); deny {
return deny
}
// 10
r.recordAccess(sign)
return r.shouldDeny(sign, req)
}
代码分解如下:
- 加载配置文件
- 加载数据文件,并生成规则列表
- 增加回调函数,因为这个模块是以
Product
租户为维度,所以放在找到Product
租户之后,倒是没有问题,但是限流感觉放在HandleAccept
阶段效率更高。 - 先尝试全局规则,没有的话
- 尝试匹配对应
Product
租户对应的规则 - 找到对应规则并开始匹配规则
- 记录并检查
- 获得一个标识用户的签名
- 用这个签名作为KEY来检查是否已经被屏蔽
- 没有屏蔽的话就增加计数再次检查一下。
下面看看签名的逻辑
func (s *AccessSigner) Sign(label string, req *bfe_basic.Request) (AccessSign, error) {
// 1.从请求中生成要作为签名的数据
data, err := s.prepareData(label, req)
// 2. 基于上一步的数据使用md5算法计算出一个唯一标识
return AccessSign(md5.Sum(data)), nil
}
func (s *AccessSigner) prepareData(label string, req *bfe_basic.Request) ([]byte, error) {
var buf bytes.Buffer
// label就是配置的Cond字段的值
buildKeyValue(&buf, "label", label)
// 如果启用clientIP签名,就将数据追加到这个buf里面
if s.UseClientIP {
if req.ClientAddr == nil {
return nil, errors.New("request without client ip")
}
buildKeyValue(&buf, "clientIP", req.ClientAddr.IP.String())
}
// 和clientIP类似
if s.UseUrl {
buildKeyValue(&buf, "url", req.HttpRequest.RequestURI)
}
// 省略其他字段的设置
return buf.Bytes(), nil
}
从上面的代码知道,如果我们的accessSignConf
不启用任何其他标识的话,那么就是作用整个规则,也就是说不同的用户都受制于这个限流规则,一般情况下我们会至少加个UseClientIP
, 然后可以根据自己的应用需求增加其他的字段标识用户。
当我们得到了标识请求的签名,就可以基于这个KEY来判断用户是否可以继续访问了。
func (r *prisonRule) recordAndCheck(req *bfe_basic.Request) bool {
sign, err := r.accessSigner.Sign(r.condStr, req)
if deny := r.shouldDeny(sign, req); deny {
return deny
}
r.recordAccess(sign)
return r.shouldDeny(sign, req)
}
func (r *prisonRule) shouldDeny(sign AccessSign, req *bfe_basic.Request) bool {
// 1.
freeTimeNs, ok := r.prisonDict.Get(sign)
if !ok {
return false
}
// 2.
if time.Now().UnixNano() < freeTimeNs.(int64) {
prisonInfo := &PrisonInfo{
PrisonType: ModPrison,
PrisonName: r.name,
FreeTime: time.Unix(0, freeTimeNs.(int64)),
IsExpired: false,
Action: r.action.Cmd,
}
req.SetContext(ReqCtxPrisonInfo, prisonInfo)
return true
}
// remove prison record if expired
// 3.
r.prisonDict.Del(sign)
return false
}
func (r *prisonRule) recordAccess(sign AccessSign) {
var f *AccessCounter
// 4.
value, ok := r.accessDict.Get(sign)
if !ok {
f = NewAccessCounter()
r.accessDict.Add(sign, f)
} else {
f = value.(*AccessCounter)
}
// 5.
if block, restTimeNs := f.IncAndCheck(r.checkPeriodNs, r.threshold); block {
freeTimeNs := r.stayPeriodNs + restTimeNs + time.Now().UnixNano()
r.prisonDict.Add(sign, freeTimeNs)
r.accessDict.Del(sign)
}
}
func (c *AccessCounter) IncAndCheck(checkPeriodNs int64, threshold int32) (bool, int64) {
// 6.
now := time.Now().UnixNano()
stime := atomic.LoadInt64(&c.startTime)
if stime+checkPeriodNs < now { // reset count
c.reset()
}
// 7.
count := atomic.AddInt32(&c.count, 1)
// 8.
stime = atomic.LoadInt64(&c.startTime)
return count > threshold, stime + checkPeriodNs - now
}
代码分解如下:
- 查看是否已经在屏蔽的列表里面,如果没有直接返回false
- 检查对应的记录的释放时间, 如果小于要释放的时间点,说明还没解禁,那么返回true,说明需要屏蔽
- 如果到了这里,说明该记录已经到了该释放的时间点,所以删除对应的记录
- 尝试记录新来的请求
- 增加计数并检查是否达到阈值,如果block为true则增加到屏蔽列表中, 并从访问列表中删除。
- 检查记录的最开始访问时间是否已经超过检查时间间隔,如果是,则重新计数
- 增加计数,**注意: 为了高并发需要使用
atomic.AddInt32
**方法,这样能保证原子性。 - 检查计数器,计算到下一个间隔点的剩余时间,用于设置释放时间。
mod_prison
有两个LRUCache
对象, accessDict
用于存储所有请求信息,prisonDict
用于存储被限流的信息。
为什么用LRUCache
这样的缓存对象呢?
因为随着请求的不断积累,内存会无限膨胀,所以需要限制记录的数据大小,官方代码仓库的配置是1000, 即"accessDictSize": 1000,"prisonDictSize": 1000
两个配置参数,大家可以根据自己的实际情况配置,一个AccessCounter
占用内存14字节, 1000 * 14
差不多13KB 左右, 如果算上底层存储的interface{}
, list.Element
的抽象可能差不多56字节一条记录,10MB内存可以记录差不多1872457条记录,所以配置的时候可以适当设置大一点,不用担心。
参考链接
- https://www.bfe-networks.net/zh_cn/development/module/bfe_callback/
- https://www.bfe-networks.net/zh_cn/condition/condition_grammar/
总结
很多模块都用到了两个通用的部分Cond
和Action
, 前者用于匹配请求或者响应中的条件,在BFE
的官方文档称之为原语,这个需要查阅官方文档或者源代码,大多数原语的命名都是比较直观的,所以这个不会过于深入,Action
则是一个动作,当条件匹配之后可以执行,比如拒绝。
还有就是很多模块都支持global
的设置,即对所有产品生效,这个做全局配置很有用。