专业编程基础技术教程

网站首页 > 基础教程 正文

“流量整形”干货!限流限频技术方案总结

ccvgpt 2024-10-12 13:48:10 基础教程 8 ℃

作者:国利鹏,腾讯CSIG后台开发工程师

限流限频的技术方案和探讨

  • 常见方案:
  • 简单计数器
  • 滑动窗口
  • 漏桶
  • 令牌桶 ...
  • 单机
  • 分布式

思考


首先,先引入一个 "流量整形" 的概念

“流量整形”干货!限流限频技术方案总结

是一种控制网络数据包传输的技术,通过控制数据速率使数据较为均匀发送。流量整形可以一定程度减少网络拥塞,并减弱突发流量带来的影响

流量整形的作用

流量控制在网络传输中是一个常用的概念,它用于调整网络包的发送数据。然而,从系统稳定性角度考虑,在处理请求的速度上,也有非常多的讲究。任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。

流量整形作为一个调配器,可以根据需要把随机的请求调整成合适的形状,如下图所示:

流量控制的角度

  • 资源的调用关系,例如资源的调用链路,资源和资源之间的关系
  • 运行指标,例如 QPS、线程池、系统负载等

控制的效果,例如直接限流、排队等

而限流限频,简单来说就是实现流量整形一种方式,不同算法也只是实现方法的大同小异

简单计数器

基本方法,单位时间内累加计数器,超过计数器允许值就拦截请求,每次到单位时间就重置计数器

分布式经常使用 redis 来解决

产生问题: redis 具有原子性,但是其他操作(多条语句处理)并不具备原子性

key := getRedisKey()
if redis.Exist(key) {
   // 当代码运行到 A 位置,缓存过期,那么 IncrBy 会产生一个永久的缓存,导致限频失败
count := redis.IncrBy(key, 1) // A
   if count > limit {
    return false
   } 
} else {
   // 当多个请求同时到达 B 位置缓存被重置导致缓存与实际效果有偏差
   redis.SetEx(key, duration, 1) // B 
}
return true

解决方案:

  • redis multi,lua

**代价: **实现细节麻烦,编码应该注重逻辑,减少控制

  • 利用 redis 自身的特性,将判断是否超限的简化为一条,利用 redis 自身的特性

**代价: ** redis 存储的翻倍使用,均匀请求时,key 存活时间是 duration 的两倍

10:00 - 10:05 // 五分钟内都只有这个 key
10:00 - 10:05 10:05-10:10 // 五分种开始和结束访问两次,key 一共存活了 10 min

限频粒度较大,会产生毛刺
// 将限频的时间点加入 key
key = key + (now + duration)
redis.Expire(key, duration) // 只是用来清理残留 key,是否原子性都不影响限频逻辑
if (redis.IncrBy(key, 1) > limit) { // 实际业务只要留有 incr 就可保证原子性
   return false
}

单机计数器(go 原生) github: https://github.com/afex/hystrix-go/blob/master/hystrix/rolling/rolling.go


滑动窗口

思路和计数器基本一致,只是将限频的时间区进一步细分向前逐格子滑动

产生问题:

  • 滑动窗口的问题在于用存储换精度,限流值较大时会消耗大量存储空间,且需要 Multi/Lua 的保障原子性以及 Pipeline 提升效率
  • 限流需求中存在一个滑动的时间窗口
  • 性能问题(事务) 因为这几个连续的操作都是针对同一个 key,使用 pipeline 可以提升 redis 存取效率,但是要记录时间 窗口内所有的行为记录,如果这个量很大,比如 1min 内不能超过 10w, 100w 这种量级的参数,那么他就不适合当作限频方案,因为会消耗大量空间

解决方案:

  • 使用 redis 事务
  • 空间换精度
// 集合的成员为时间戳
// 配合 redis 事务

import (
   "xxx/log"
   "github.com/go-redis/redis"
   "strconv"
   "time"
)

func IsActionAllow(key string, seconds time.Duration, maxCount int) (bool, error) {
   var funcName = "IsActionAllow"

   // 限频单位时间(毫秒)
   period := seconds.Milliseconds()

   // 当前毫秒
   milliSecond := time.Now().UnixNano() / 1e6

   // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
   pipeline := RedClient.TxPipeline()
   pipeline.ZRemRangeByScore(key, strconv.Itoa(0), strconv.FormatInt(milliSecond- period * 1000, 10))
   zRange := pipeline.ZRange(key, 0, -1) // 获取窗口数据
   // score 值比较重要,Member 值没有特殊意义
   z := redis.Z{
    Score:  float64(milliSecond),
    Member: milliSecond,
   }
   pipeline.ZAdd(key, z)
   // 如果 key 是冷数据,滑动窗口内行为是空记录,则 zset 从内存中移除
   pipeline.Expire(key, seconds)

   if _, err := pipeline.Exec(); err != nil {
    log.Err(funcName + "fail").Error(err).Str("key", key)
    return false, err
   } else {
    count := zRange.Val()
    if len(count) > maxCount {
     return false, nil
    } else {
     return true, nil
    }
   }
}

漏桶算法

相当于注水和漏水过程: 以一定的速率流出水,以任意的速率流入水,水超过桶流量则丢弃

产生问题:

没有办法有效的使用网络资源,对处理突发的特性流量缺乏效率

一般都使用漏桶和令牌桶结合

令牌桶算法

除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输

异同

  • 前者主要控制数据注入网络的速率,平滑网络上突发流量,突发流量可以被整性为稳定流量

后者除了控制发送到网络的数据数目,还允许突发流量的发送

  • 前者是强行限制传输速率

后者除了限制数据的平均传输速率外,还允许一定程度上的突发传输(只要令牌通中有令牌,就允许突发传输数据直到用户配置的门槛,适用于突发的特性流量)

方案

Java

guava 包(https://github.com/google/guava)

go

基于 go 的令牌桶算法

  1. 三种策略: Allow,Wait,Reserve
  2. 动态调整桶大小和速率
  3. 自定义增加 token 的时间间隔
  • Allow 返回 true/false
// rateAllow()

func rateAllow() {
 limiter := rate.NewLimiter(10, 100)

 for i := 0; i < 20; i++ {
  if limiter.AllowN(time.Now(), 25) {
   fmt.Printf("%03d OK %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
  } else {
   fmt.Printf("%03d Err %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
  }
  time.Sleep(500 * time.Millisecond)
 }
}
  • wait 获取不到则会阻塞,直到获取成功,或者 timeout
func rateWait() {
 limiter := rate.NewLimiter(1, 5)
 ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 defer cancel()

 for i := 0; ; i++ {
  fmt.Printf("%03d %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))

  err := limiter.WaitN(ctx, 2)
  if err != nil {
   fmt.Printf("timeout: %s\n", err.Error())
   return
  }
 }

 fmt.Println("main")
}
  • reserve 返回还需要多久才能获取到的时间 (自己进行主动 sleep /或者返还令牌)
func rateReserve() {
 limiter := rate.NewLimiter(3, 5)

 // 动态修改桶大小
 limiter.SetBurst(100)
 // 动态修改生成令牌速率
 // limiter.SetLimit(1)

 ctx, cancel := context.WithTimeout(context.Background(), time.Second * 5)
 defer cancel()
 for i := 0; ; i++ {
  fmt.Printf("%3d %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
  reserveN := limiter.ReserveN(time.Now(), 4)
  if !reserveN.OK() {
   // 返回异常
   fmt.Println("Not allowed to act! Did you remember to set lim.burst to be  0?")
  }
  delay := reserveN.Delay()
  fmt.Println("sleep delay ", delay)
  time.Sleep(delay)

  select {
  case <-ctx.Done():
   fmt.Println("timeout, quit")
   return
  default:
  }
  // TODO 业务
 }

 fmt.Println("main")
}

其他方案

redis-cell

漏桶限流功能已经再 Redis4.0 的 Redis-Cell 支持,lua 脚本可以很方便的进行漏桶分布式限流

github rate by redis

第三方库 github.com/wallstreetcn/rate 实现的分布式令牌桶算法

目前项目中引入使用

  • 修改客户端为集群版
  • 解决集群模式下进行多密钥操作时候出现的问题(CROSSSLOT Keys in request don't hash to the same slot) **原因: ** 实现代码中,EvalSha 操时候,密钥再同一节点,但是却未将密钥哈希至相同的槽中 使用哈希标签将密钥强制放入同一哈希槽

sentinel go

Alibaba 开源项目 Sentinel Go https://github.com/alibaba/Sentinel

2020 年,推出 Sentinel Go 版本

Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。

  • 优势 秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据整合较多业界开源库,例如与 Spring Cloud、Dubbo、gRPC 的整合。提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等

以资源作为维度进行限流,例如接口,URL,甚至服务

同时支持流控规则中的 limitApp 字段用于根据调用来源进行流量控制,无法支持海量的 limitApp。

  • 流量控制策略 流量控制器中 Token 计算分为两种 Direct表示直接使用规则中的 Threshold 表示当前统计周期内的最大Token数量 WarmUp表示通过预热的方式计算当前统计周期内的最大Token数量,预热的计算方式会根据规则中的字段 WarmUpPeriodSec 和 WarmUpColdFactor 来决定预热的曲线
  • 流控算法 Reject:表示如果当前统计周期内,统计结构统计的请求数超过了阈值,就直接拒绝。Throttling:表示匀速排队的统计策略。它的中心思想是,以固定的间隔时间让请求通过。当请求到来的时候,如果当前请求距离上个通过的请求通过的时间间隔不小于预设值,则让当前请求通过;否则,计算当前请求的预期通过时间,如果该请求的预期通过时间小于规则预设的 timeout 时间,则该请求会等待直到预设时间到来通过(排队等待处理);若预期的通过时间超出最大排队时长,则直接拒接这个请求。 匀速排队方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法

**使用: **

  • 开始 对 Sentinel 进行相关配置并进行初始化 import (
    sentinel
    "github.com/sentinel-group/sentinel-golang/api"
    )

    func initSentinel() {
    err := sentinel.InitWithLogDir(confPath, logDir)
    if err != nil {
    // 初始化 Sentinel 失败
    }
    }
    埋点(资源定义)埋点 API 位于 api 包中: Entry(resource string, opts …Option) (base.SentinelEntry, base.BlockError) 其中 resource 代表埋点资源名, opts 代表埋点配置。目前支持以下埋点配置: WithTrafficType(entryType base.TrafficType):标记该埋点资源的流量类型,其中 Inbound 代表入口流量,Outbound 代表出口流量。若不指定,默认为 Outbound WithResourceType(resourceType base.ResourceType):标记该埋点资源的分类 WithAcquireCount(acquireCount uint32):标记每次触发该埋点计为几次调用(可以理解为 batch count)。若不指定,默认为 1 WithArgs(args …interface{}):埋点携带的参数列表,为热点参数统计预留 entry import (
    sentinel
    "github.com/sentinel-group/sentinel-golang/api"
    )

    // Entry 方法用于埋点
    e, b := sentinel.Entry(
    "your-resource-name", sentinel.WithTrafficType(base.Inbound))
    if b != nil {
    // 请求被流控,可以从 BlockError 中获取限流详情
    }
    else {
    // 请求可以通过,在此处编写您的业务逻辑
    // 务必保证业务逻辑结束后 Exit
    e.Exit()
    }
    规则配置 _, err := flow.LoadRules([]*flow.FlowRule{
    {
    ID:
    666,
    Resource:
    "some-resource",
    MetricType: flow.QPS,
    Count:
    10,
    ControlBehavior: flow.Reject,
    },
    })
    if err != nil {
    // 加载规则失败,进行相关处理
    }
  • 案例 package main
    import (
    "fmt"
    sentinel
    "github.com/alibaba/sentinel-golang/api"
    "github.com/alibaba/sentinel-golang/core/flow"
    "github.com/alibaba/sentinel-golang/util"
    "log"
    "math/rand"
    "time"
    )
    func main() {
    // 务必先进行初始化
    err := sentinel.InitDefault()
    if err != nil {
    log.Fatal(err)
    }
    // 配置一条限流规则
    _, err = flow.LoadRules([]*flow.Rule{
    {
    Resource:
    "some-test",
    Threshold:
    2,
    TokenCalculateStrategy: flow.Direct,
    ControlBehavior: flow.Reject,
    },
    })
    if err != nil {
    fmt.Println(err)
    return
    }
    ch :=
    make(chan struct{})
    for i := 0; i < 10; i++ {
    go func() {
    //for {
    // 埋点逻辑,埋点资源名为 some-test
    e, b := sentinel.Entry(
    "some-test")
    if b != nil {
    // 请求被拒绝,在此处进行处理
    fmt.Println(util.CurrentTimeMillis(),
    "block")
    time.Sleep(time.Duration(rand.Uint64() %
    10) * time.Millisecond)
    }
    else {
    // 请求允许通过,此处编写业务逻辑
    fmt.Println(util.CurrentTimeMillis(),
    "Passed")
    time.Sleep(time.Duration(rand.Uint64() %
    10) * time.Millisecond)
    // 务必保证业务结束后调用 Exit
    e.Exit()
    }
    //}
    }()
    }
    <-ch
    }
    日志

结论: 配置规则 Threshold 为 2

通过一次性创建 10 个协程并发进行资源埋点

可以看到日志中成功数目请求为 2,其他的请求全都返回 blockError

思考

  1. 对于业务来说,限频只是兜底,一般敏感性的业务再调用过程中,限频服务出现问题时候,可以根据具体的业务类型判断是统一限制或是放行,或者增加备选限频方案 redis 使用多个限流服务(主备复制) 等措施

并且限频服务出现问题时,一定要做好业务异常的通知和上报,避 免出现由于未限频而导致的损失。

  1. 业务中的限频维度复杂,所以限频的 key 设定时候一定要带有 applicationId, mobile, interface 等描述业务场景,根据这些维度做好限频
  2. 限频应该作为基础服务,提供给应用接口使用,做好控制和业务的剥离

最近发表
标签列表