31. 干货系列从零用Rust编写正反向代理,HTTP限流的实现(limit_req)

wmproxy

wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子

项目地址

国内: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

HTTP限流

HTTP限流是在HTTP请求处理过程中,对请求进行限制的一种技术手段。其目的是防止系统过载,保护系统的稳定性和可用性。HTTP限流可以基于不同的策略和方法,例如基于时间窗口、令牌桶、漏桶等。

常见的HTTP限流方法包括:

  • 基于时间窗口:这种方法将一段时间划分为若干个时间窗口,每个时间窗口内只允许一定数量的请求通过。例如,每秒只允许20个请求通过。
  • 令牌桶:令牌桶算法允许突发流量,只要有令牌就可以处理请求,当没有令牌时,请求就被拒绝。这种方法适用于处理突发流量的情况。和时间窗口结合时,如果当前时间段已经有20个请求,此时触发令牌桶brust,将当前的流量进行延时处理。
  • 漏桶:漏桶算法不允许突发流量,无论何时都只能按照一定的速率处理请求。这种方法适用于处理稳定流量的情况。

在进行HTTP限流时,需要考虑系统的实际情况和需求,选择合适的限流策略和方法。同时,还需要对系统的性能和负载进行充分的测试和评估,以确保系统的稳定性和可用性。

方案选择

在此项目中,选择的是基于时间窗口及令牌桶做组合使用进行限制,以下做个例子,配置

limit="rate=10r/s brust=10" 

效果将是每秒钟限制10条请求,可以允许突发的10个令牌桶做一秒的延时,在下一秒允许通行。

sequenceDiagram participant C participant S C->>S: 第一秒请求数据10条 Note right of S: 当前记录请求10条 S->>C: 返回成功 C->>S: 第一秒继续请求数据1条 Note right of S: 当前记录请求10条+1条令牌桶 S->>C: 延时一秒再进行后续处理返回成功 C->>S: 第一秒继续请求数据9条 Note right of S: 当前记录请求10条+10条令牌桶 S->>C: 延时一秒再进行后续处理返回成功 C->>S: 第一秒继续请求数据(1条-N条) Note right of S: 当前记录当前秒已满,返回拒绝 S->>C: 直接返回错误,频率过快,已超时 C-->>S: 第二秒请求数据1条 Note right of S: 清除第一秒的记录10条<br>10条令牌桶转化第二秒10条记录<br>故当前为请求数10条+1条令牌桶 S->>C: 延时一秒过时行后续处理返回成功 C-->>S: 第三秒请求数据1条 Note right of S: 清除第二秒的记录10条<br>1条令牌桶转化第三秒1条记录<br>故当前为请求数1条 S->>C: 直接返回成功

以上是时序加令牌的请求数据和返回情况

限流配置

类似于nginx中的limit_req配置,分为limit_req_zonelimit_req两部分,可分为两个类,一个为zone,一个为关联到zone名称的具体项目

#[derive(Debug, Clone)] pub struct LimitReqZone {     /// 键值的匹配方式     pub key: String,     /// IP个数     pub limit: u64,     /// 周期内可以通行的数据     pub nums: u64,     /// 每个周期的时间     pub per: Duration, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct LimitReq {     zone: String,     burst: u64, } 

然后在http的根目录下配置当前的zone空间,为一个HashMap结构,可以配置多种zone结构

#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HttpConfig {     // ...     #[serde_as(as = "HashMap<_, DisplayFromStr>")]     #[serde(default = "HashMap::new")]     pub limit_req_zone: HashMap<String, LimitReqZone>, }  #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct CommonConfig {     // ...     #[serde_as(as = "Option<DisplayFromStr>")]     pub limit_req: Option<LimitReq>, } 

因为并不是任何的请求都要进行限流,所以此处为Option,如果子级未配置,父级有配置,子级将会应用父级的配置。

以下展示在toml格式的配置

# 反向代理相关,七层协议为http及https [http]  [http.limit_req_zone] limit = "{client_ip} limit=10m rate=10r/s" less = "{client_ip} limit=10m rate=10r/min"   # 反向代理中的具体服务,可配置多个多组 [[http.server]] bind_addr = "0.0.0.0:82" server_name = "soft.wm-proxy.com" limit_req = "zone=limit brust=10"  # 按请求路径进行rule匹配,可匹配method,看具体的处理的内容如文件服务或者负载均衡 [[http.server.location]] limit_req = "zone=less brust=1" rule = "/root" file_server = { browse = true }  [[http.server.location]] rule = "/api" file_server = { browse = true } 

这样子就可以实现api不同的进行不同的限速方案,可以实现更好的通用效果。

配置解析

  • LimitReqZone解析
    需要将"{client_ip} limit=10m rate=10r/s"转成LimitReqZone结构,此处我们用的是FromStr接口,用空格分割,第一个字段为key,后续用=做分割,得取相应的值
impl FromStr for LimitReqZone {     type Err = ProxyError;      fn from_str(s: &str) -> Result<Self, Self::Err> {         let v = s.split(" ").collect::<Vec<&str>>();         let key = v[0].to_string();         let mut limit = 0;         let mut nums = 0;         let mut per = Duration::new(0, 0);         for idx in 1..v.len() {             let key_value = v[idx].split("=").map(|k| k.trim()).collect::<Vec<&str>>();             if key_value.len() <= 1 {                 return Err(ProxyError::Extension("未知的LimitReq"));             }             match key_value[0] {                 "limit" => {                     let s = ConfigSize::from_str(key_value[1])?;                     limit = s.0;                 }                 "rate" => {                     let rate_key = key_value[1]                         .split("/")                         .map(|k| k.trim())                         .collect::<Vec<&str>>();                     if rate_key.len() == 1 {                         return Err(ProxyError::Extension("未知的LimitReq"));                     }                      let rate = rate_key[0].trim_end_matches("r");                     nums = rate                         .parse::<u64>()                         .map_err(|_e| ProxyError::Extension("parse error"))?;                                          let s = ConfigDuration::from_str(rate_key[1])?;                     per = s.0;                 }                 _ => {                     return Err(ProxyError::Extension("未知的LimitReq"));                 }             }         }          Ok(LimitReqZone::new(key, limit, nums, per))     } } 
  • LimitReq解析

需要将"zone=less brust=1"转成LimitReq结构,此处我们用的是FromStr接口,用空格分割,将每个值用=做分割,得取相应的值

impl FromStr for LimitReq {     type Err = ProxyError;     fn from_str(s: &str) -> Result<Self, Self::Err> {         let v = s.split(" ").collect::<Vec<&str>>();         let mut zone = String::new();         let mut brust = 0;         for idx in 0..v.len() {             let key_value = v[idx].split("=").map(|k| k.trim()).collect::<Vec<&str>>();             if key_value.len() <= 1 {                 return Err(ProxyError::Extension("未知的LimitReq"));             }             match key_value[0] {                 "zone" => {                     zone = key_value[1].to_string();                 }                 "brust" => {                     brust = key_value[1]                         .parse::<u64>()                         .map_err(|_e| ProxyError::Extension("parse error"))?;                 }                 _ => {                     return Err(ProxyError::Extension("未知的LimitReq"));                 }             }         }          Ok(LimitReq::new(zone, brust))     } } 

限制实现

首先我们配置一个静态可访问的全局变量,因为所有的线程操作都需要汇总到此时判定是否合格

每个命名空间里,都将存储不超过规格数据的IP,如果超过将直接返回失败

pub struct LimitReqData {     /// 记录所有的ip数据的限制情况     ips: HashMap<String, InnerLimit>,     /// IP个数     limit: u64,     /// 周期内可以通行的数据     nums: u64,     /// 每个周期的时间     per: Duration,     /// 最后清理IP的时间     last_remove: Instant, } 

全局静态数据

lazy_static! {     static ref GLOABL_LIMIT_REQ: RwLock<HashMap<&'static str, LimitReqData>> =         RwLock::new(HashMap::new()); }  

返回结果

#[derive(Debug)] pub enum LimitResult {     Ok,     Refuse,     Delay(Duration), } 

所以的判断是否通过,我们将通过以下函数返回相应的结果,从而使外部的函数可以进行相应的处理。

impl LimitReqData {     pub fn recv_new_req(key: &str, ip: &String, burst: u64) -> ProtResult<LimitResult> {         let mut write = GLOBAL_LIMIT_REQ             .write()             .map_err(|_| ProtError::Extension("unlock error"))?;         if !write.contains_key(&*key) {             return Ok(LimitResult::Ok);         }         write.get_mut(key).unwrap().inner_recv_new_req(ip, burst)     } } 

小结

我们通过全局共享数据,需要加锁获取该数据,来判定整体的KEY的流量情况,可能是IP,可能是IP+Cookie等,来灵活的针对用户限流还是针对IP限流或者其它的业务情况进行合理的安排。

点击 [关注][在看][点赞] 是对作者最大的支持

发表评论

评论已关闭。

相关文章