Spring Cloud Gateway实现分布式限流和熔断降级

小伙伴们,你们好呀!我是老寇!一起学习学习gateway限流和熔断降级

一、限流

思考:为啥需要限流?

在一个流量特别大的业务场景中,如果不进行限流,会造成系统宕机,当大批量的请求到达后端服务时,会造成资源耗尽【CPU、内存、线程、网络带宽、数据库连接等是有限的】,进而拖垮系统。

1.常见限流算法

  • 漏桶算法
  • 令牌桶算法

1.1漏桶算法(不推荐)

Spring Cloud Gateway实现分布式限流和熔断降级

1.1.1.原理

将请求缓存到一个队列中,然后以固定的速度处理,从而达到限流的目的

1.1.2.实现

将请求装到一个桶中,桶的容量为固定的一个值,当桶装满之后,就会将请求丢弃掉,桶底部有一个洞,以固定的速率流出。

1.1.3.举例

桶的容量为1W,有10W并发请求,最多只能将1W请求放入桶中,其余请求全部丢弃,以固定的速度处理请求

1.1.4.缺点

处理突发流量效率低(处理请求的速度不变,效率很低)

1.2.令牌桶算法(推荐)

Spring Cloud Gateway实现分布式限流和熔断降级

1.2.1.原理

将请求放在一个缓冲队列中,拿到令牌后才能进行处理

1.2.2.实现

装令牌的桶大小固定,当令牌装满后,则不能将令牌放入其中;每次请求都会到桶中拿取一个令牌才能放行,没有令牌时即丢弃请求/继续放入缓存队列中等待

1.2.3.举例

桶的容量为10w个,生产1w个/s,有10W的并发请求,以每秒10W个/s速度处理,随着桶中的令牌很快用完,速度又慢慢降下来啦,而生产令牌的速度趋于一致1w个/s

1.2.4.缺点

处理突发流量提供了系统性能,但是对系统造成了一定的压力,桶的大小不合理,甚至会压垮系统(处理1亿的并发请求,将桶的大小设置为1,这个系统一下就凉凉啦)

2.网关限流(Spring Cloud Gateway + Redis实战)

2.1.pom.xml配置

        <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-data-redis-reactive</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.cloud</groupId>             <artifactId>spring-cloud-starter-gateway</artifactId>             <exclusions>                 <exclusion>                     <groupId>org.springframework.boot</groupId>                     <artifactId>spring-boot-starter-web</artifactId>                 </exclusion>             </exclusions>         </dependency>     <dependency>         <groupId>org.apache.httpcomponents</groupId>         <artifactId>httpclient</artifactId>     </dependency> 

2.2.yaml配置

spring:   application:     name: laokou-gateway   cloud:     gateway:       routes:         - id: LAOKOU-SSO-DEMO           uri: lb://laokou-sso-demo           predicates:           - Path=/sso/**           filters:           - StripPrefix=1           - name: RequestRateLimiter #请求数限流,名字不能乱打             args:               key-resolver: "#{@ipKeyResolver}"               redis-rate-limiter.replenishRate: 1 #生成令牌速率-设为1方便测试               redis-rate-limiter.burstCapacity: 1 #令牌桶容量-设置1方便测试   redis:     database: 0     cluster:       nodes: x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005,x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005     password: laokou #密码     timeout: 6000ms #连接超时时长(毫秒)     jedis:       pool:         max-active: -1 #连接池最大连接数(使用负值表示无极限)         max-wait: -1ms #连接池最大阻塞等待时间(使用负值表示没有限制)         max-idle: 10 #连接池最大空闲连接         min-idle: 5 #连接池最小空间连接 

2.3.创建bean

@Configuration public class RequestRateLimiterConfig {      @Bean(value = "ipKeyResolver")     public KeyResolver ipKeyResolver(RemoteAddressResolver remoteAddressResolver) {     	return exchange -> Mono.just(remoteAddressResolver.resolve(exchange).getAddress().getHostAddress());     }      @Bean     public RemoteAddressResolver remoteAddressResolver() {     	// 远程地址解析器     	return XForwardedRemoteAddressResolver.trustAll();     }  } 

3.测试限流(编写java并发测试)

@Slf4j public class HttpUtil { public static void apiConcurrent(String url,Map<String,String> params) {         Integer count = 200;         //创建线程池         ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());         //同步工具         CountDownLatch latch = new CountDownLatch(count);         Map<String,String> dataMap = new HashMap<>(1);         dataMap.put("authorize","XXXXXXX");         for (int i = 0; i < count; i++) {             pool.execute(() -> {                 try {                     //访问网关的API接口                     HttpUtil.doGet("http://localhost:1234/sso/laokou-demo/user",dataMap);                 } catch (IOException e) {                     e.printStackTrace();                 }finally {                     latch.countDown();                 }             });         }         try {             latch.await();         } catch (InterruptedException e) {             e.printStackTrace();         }     }   public static String doGet(String url, Map<String, String> params) throws IOException {         //创建HttpClient对象         CloseableHttpClient httpClient = HttpClients.createDefault();         String resultString = "";         CloseableHttpResponse response = null;         try {             //创建uri             URIBuilder builder = new URIBuilder(url);             if (!params.isEmpty()) {                 for (Map.Entry<String, String> entry : params.entrySet()) {                     builder.addParameter(entry.getKey(), entry.getValue());                 }             }             URI uri = builder.build();             //创建http GET请求             HttpGet httpGet = new HttpGet(uri);             List<NameValuePair> paramList = new ArrayList<>();             RequestBuilder requestBuilder = RequestBuilder.get().setUri(new URI(url));             requestBuilder.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));             httpGet.setHeader(new BasicHeader("Content-Type", "application/json;charset=UTF-8"));             httpGet.setHeader(new BasicHeader("Accept", "*/*;charset=utf-8"));             //执行请求             response = httpClient.execute(httpGet);             //判断返回状态是否是200             if (response.getStatusLine().getStatusCode() == 200) {                 resultString = EntityUtils.toString(response.getEntity(), "UTF-8");             }         } catch (Exception e) {             log.info("调用失败:{}",e);         } finally {             if (response != null) {                 response.close();             }             httpClient.close();         }         log.info("打印:{}",resultString);         return resultString;     } } 

Spring Cloud Gateway实现分布式限流和熔断降级
Spring Cloud Gateway实现分布式限流和熔断降级

说明这个网关限流配置是没有问题的

4.源码查看

Spring Cloud Gateway RequestRateLimiter GatewayFilter Factory文档地址

工厂 RequestRateLimiter GatewayFilter使用一个RateLimiter实现来判断当前请求是否被允许继续。如果不允许,HTTP 429 - Too Many Requests则返回默认状态。

4.1.查看 RequestRateLimiterGatewayFilterFactory

	@Override 	public GatewayFilter apply(Config config) { 		KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver); 		RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter); 		boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey); 		HttpStatusHolder emptyKeyStatus = HttpStatusHolder 				.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));   		return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> { 			if (EMPTY_KEY.equals(key)) { 				if (denyEmpty) { 					setResponseStatus(exchange, emptyKeyStatus); 					return exchange.getResponse().setComplete(); 				} 				return chain.filter(exchange); 			} 			String routeId = config.getRouteId(); 			if (routeId == null) { 				Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); 				routeId = route.getId(); 			}                  // 执行限流 			return limiter.isAllowed(routeId, key).flatMap(response -> {   				for (Map.Entry<String, String> header : response.getHeaders().entrySet()) { 					exchange.getResponse().getHeaders().add(header.getKey(), header.getValue()); 				}   				if (response.isAllowed()) { 					return chain.filter(exchange); 				}   				setResponseStatus(exchange, config.getStatusCode()); 				return exchange.getResponse().setComplete(); 			}); 		}); 	} 

4.2.查看 RedisRateLimiter

	@Override 	@SuppressWarnings("unchecked") 	public Mono<Response> isAllowed(String routeId, String id) { 		if (!this.initialized.get()) { 			throw new IllegalStateException("RedisRateLimiter is not initialized"); 		}         // 这里如何加载配置?请思考 		Config routeConfig = loadConfiguration(routeId);         // 令牌桶每秒产生令牌数量 		int replenishRate = routeConfig.getReplenishRate();         // 令牌桶容量 		int burstCapacity = routeConfig.getBurstCapacity();         // 请求消耗的令牌数 		int requestedTokens = routeConfig.getRequestedTokens(); 		try {                   // 键 			List<String> keys = getKeys(id);                   // 参数 			List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", "", requestedTokens + ""); 			// 调用lua脚本 			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); 			return flux.onErrorResume(throwable -> { 				if (log.isDebugEnabled()) { 					log.debug("Error calling rate limiter lua", throwable); 				} 				return Flux.just(Arrays.asList(1L, -1L)); 			}).reduce(new ArrayList<Long>(), (longs, l) -> { 				longs.addAll(l); 				return longs; 			}).map(results -> {                           // 判断是否等于1,1表示允许通过,0表示不允许通过 				boolean allowed = results.get(0) == 1L; 				Long tokensLeft = results.get(1); 				Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); 				if (log.isDebugEnabled()) { 					log.debug("response: " + response); 				} 				return response; 			}); 		} 		catch (Exception e) { 			log.error("Error determining if user allowed from redis", e); 		} 		return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); 	}   	static List<String> getKeys(String id) { 		String prefix = "request_rate_limiter.{" + id; 		String tokenKey = prefix + "}.tokens"; 		String timestampKey = prefix + "}.timestamp"; 		return Arrays.asList(tokenKey, timestampKey); 	} 

思考:redis限流配置是如何加载?

其实就是监听动态路由的事件并把配置存起来

Spring Cloud Gateway实现分布式限流和熔断降级

4.3.重点来了,令牌桶 /META-INF/scripts/request_rate_limiter.lua 脚本剖析

-- User Request Rate Limiter filter -- See https://stripe.com/blog/rate-limiters -- See https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L11-L34   -- 令牌桶算法工作原理 -- 1.系统以恒定速率往桶里面放入令牌 -- 2.请求需要被处理,则需要从桶里面获取一个令牌 -- 3.如果桶里面没有令牌可获取,则可以选择等待或直接拒绝并返回   -- 令牌桶算法工作流程 -- 1.计算填满令牌桶所需要的时间(填充时间 = 桶容量 / 速率) -- 2.设置存储数据的TTL(过期时间),为填充时间的两倍(存储时间 = 填充时间 * 2) -- 3.从Redis获取当前令牌的剩余数量和上一次调用的时间戳 -- 4.计算距离上一次调用的时间间隔(时间间隔 = 当前时间 - 上一次调用时间) -- 5.计算填充的令牌数量(填充令牌数量 = 时间间隔 * 速率)【前提:桶容量是固定的,不存在无限制的填充】 -- 6.判断是否有足够多的令牌满足请求【 (填充令牌数量 + 剩余令牌数量) >= 请求数量 && (填充令牌数量 + 剩余令牌数量) <= 桶容量 】 -- 7.如果请求被允许,则从桶里面取出相应数据的令牌 -- 8.如果TTL为正,则更新Redis键中的令牌和时间戳 -- 9.返回两个两个参数(allowed_num:请求被允许标志。1允许,0不允许)、(new_tokens:填充令牌后剩余的令牌数据)   -- 随机写入 redis.replicate_commands()   -- 令牌桶Key -> 存储当前可用令牌的数量(剩余令牌数量) local tokens_key = KEYS[1]   -- 时间戳Key -> 存储上次令牌刷新的时间戳 local timestamp_key = KEYS[2]   -- 令牌填充速率 local rate = tonumber(ARGV[1])   -- 令牌桶容量 local capacity = tonumber(ARGV[2])   -- 当前时间 local now = tonumber(ARGV[3])   -- 请求数量 local requested = tonumber(ARGV[4])   -- 填满令牌桶所需要的时间 local fill_time = capacity / rate   -- 设置key的过期时间(填满令牌桶所需时间的2倍) local ttl = math.floor(fill_time * 2)   -- 判断当前时间,为空则从redis获取 if now == nil then     now = redis.call('TIME')[1] end   -- 获取当前令牌的剩余数量 local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then     last_tokens = capacity end   -- 获取上一次调用的时间戳 local last_refreshed = tonumber(redis.call('get', timestamp_key)) if last_refreshed == nil then     last_refreshed = 0 end   -- 计算距离上一次调用的时间间隔 local delta = math.max(0, now - last_refreshed)   -- 当前的令牌数量(剩余 + 填充 <= 桶容量) local now_tokens = math.min(capacity, last_refreshed + (rate * delta))   -- 判断是否有足够多的令牌满足请求 local allowed = now_tokens >= requested   -- 定义当前令牌的剩余数量 local new_tokens = now_tokens   -- 定义被允许标志 local allowed_num = 0 if allowed then     new_tokens = now_tokens - requested     -- 允许访问     allowed_num = 1 end   -- ttl > 0,将当前令牌的剩余数量和当前时间戳存入redis if ttl > 0 then     redis.call('setex', tokens_key, ttl, new_tokens)     redis.call('setex', timestamp_key, ttl, now) end   -- 返回参数 return { allowed_num, new_tokens } 

4.4.查看 GatewayRedisAutoConfiguration 脚本初始化

	@Bean 	@SuppressWarnings("unchecked") 	public RedisScript redisRequestRateLimiterScript() { 		DefaultRedisScript redisScript = new DefaultRedisScript<>(); 		redisScript.setScriptSource(                           // 根据指定路径获取lua脚本来初始化配置 				new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua"))); 		redisScript.setResultType(List.class); 		return redisScript; 	}   	@Bean 	@ConditionalOnMissingBean 	public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate, 			@Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript, 			ConfigurationService configurationService) { 		return new RedisRateLimiter(redisTemplate, redisScript, configurationService); 	} 

思考:请求限流过滤器是如何开启?

1.通过yaml配置开启

spring:   cloud:     gateway:       server:         webflux:           filter:             request-rate-limiter:               enabled: true 

2.GatewayAutoConfiguration自动注入bean

@Bean @ConditionalOnBean({ RateLimiter.class, KeyResolver.class }) @ConditionalOnEnabledFilter public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter,        KeyResolver resolver) {     return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver); } 

重点来了,真正加载这个bean的是 @ConditionalOnEnabledFilter 注解进行判断

@Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.TYPE, ElementType.METHOD }) @Documented @Conditional(OnEnabledFilter.class) public @interface ConditionalOnEnabledFilter {      // 这里value是用来指定满足条件的某些类,换一句话说,就是这些类都加载或注入到ioc容器,这个注解修饰的自动装配类才会生效     Class<? extends GatewayFilterFactory<?>> value() default OnEnabledFilter.DefaultValue.class;  } 

我们继续跟进代码,查看@Conditional(OnEnabledFilter.class)

众所周知,@Conditional可以用来加载满足条件的bean,所以,我们分析一下OnEnabledFilter

public class OnEnabledFilter extends OnEnabledComponent<GatewayFilterFactory<?>> {} 

我分析它的父类,这里有你想要的答案!

public abstract class OnEnabledComponent<T> extends SpringBootCondition implements ConfigurationCondition {      private static final String PREFIX = "spring.cloud.gateway.server.webflux.";      private static final String SUFFIX = ".enabled";      private ConditionOutcome determineOutcome(Class<? extends T> componentClass, PropertyResolver resolver) {        // 拼接完整名称        // 例如 => spring.cloud.gateway.server.webflux.request-rate-limiter.enabled        String key = PREFIX + normalizeComponentName(componentClass) + SUFFIX;        ConditionMessage.Builder messageBuilder = forCondition(annotationClass().getName(), componentClass.getName());        if ("false".equalsIgnoreCase(resolver.getProperty(key))) {           // 不满足条件不加载bean           return ConditionOutcome.noMatch(messageBuilder.because("bean is not available"));        }        // 满足条件加载bean        return ConditionOutcome.match();     } } 

5.优化限流响应[使用全限定类名直接覆盖类]

小伙伴们,有没有发现,这个这个响应体封装的不太好,因此,我们来自定义吧,我们直接覆盖类,代码修改如下

@Getter @ConfigurationProperties("spring.cloud.gateway.server.webflux.filter.request-rate-limiter") public class RequestRateLimiterGatewayFilterFactory        extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {      private static final String EMPTY_KEY = "____EMPTY_KEY__";      private final RateLimiter<?> defaultRateLimiter;      private final KeyResolver defaultKeyResolver;      /**      * Switch to deny requests if the Key Resolver returns an empty key, defaults to true.      */     @Setter     private boolean denyEmptyKey = true;      /** HttpStatus to return when denyEmptyKey is true, defaults to FORBIDDEN. */     @Setter     private String emptyKeyStatusCode = HttpStatus.FORBIDDEN.name();      public RequestRateLimiterGatewayFilterFactory(RateLimiter<?> defaultRateLimiter, KeyResolver defaultKeyResolver) {        super(Config.class);        this.defaultRateLimiter = defaultRateLimiter;        this.defaultKeyResolver = defaultKeyResolver;     }      @Override     public GatewayFilter apply(Config config) {        KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);        RateLimiter<?> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);        boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);        HttpStatusHolder emptyKeyStatus = HttpStatusHolder           .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));        return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {           if (EMPTY_KEY.equals(key)) {              if (denyEmpty) {                 setResponseStatus(exchange, emptyKeyStatus);                 return exchange.getResponse().setComplete();              }              return chain.filter(exchange);           }           String routeId = config.getRouteId();           if (routeId == null) {              Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);              Assert.notNull(route, "Route is null");              routeId = route.getId();           }           return limiter.isAllowed(routeId, key).flatMap(response -> {              for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {                 exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());              }              if (response.isAllowed()) {                 return chain.filter(exchange);              }              // 主要修改这行              return responseOk(exchange, Result.fail("Too_Many_Requests", "请求太频繁"));           });        });     }          private Mono<Void> responseOk(ServerWebExchange exchange, Object data) {         return responseOk(exchange, JacksonUtils.toJsonStr(data), MediaType.APPLICATION_JSON);     }      private Mono<Void> responseOk(ServerWebExchange exchange, String str, MediaType contentType) {         DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(str.getBytes(StandardCharsets.UTF_8));         ServerHttpResponse response = exchange.getResponse();         response.setStatusCode(HttpStatus.OK);         response.getHeaders().setContentType(contentType);         response.getHeaders().setContentLength(str.getBytes(StandardCharsets.UTF_8).length);         return response.writeWith(Flux.just(buffer));     }      private <T> T getOrDefault(T configValue, T defaultValue) {        return (configValue != null) ? configValue : defaultValue;     }      public static class Config implements HasRouteId {         @Getter        private KeyResolver keyResolver;         @Getter        private RateLimiter<?> rateLimiter;         @Getter        private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS;         @Getter        private Boolean denyEmptyKey;         @Getter        private String emptyKeyStatus;         private String routeId;         public Config setKeyResolver(KeyResolver keyResolver) {           this.keyResolver = keyResolver;           return this;        }         public Config setRateLimiter(RateLimiter<?> rateLimiter) {           this.rateLimiter = rateLimiter;           return this;        }         public Config setStatusCode(HttpStatus statusCode) {           this.statusCode = statusCode;           return this;        }         public Config setDenyEmptyKey(Boolean denyEmptyKey) {           this.denyEmptyKey = denyEmptyKey;           return this;        }         public Config setEmptyKeyStatus(String emptyKeyStatus) {           this.emptyKeyStatus = emptyKeyStatus;           return this;        }         @Override        public void setRouteId(String routeId) {           this.routeId = routeId;        }         @Override        public String getRouteId() {           return this.routeId;        }      }  } 

二、熔断降级

思考:为什么需要熔断降级?

当某个服务发生故障时(超时,响应慢,宕机),上游服务无法及时获取响应,进而也导致故障,出现服务雪崩【服务雪崩是指故障像滚雪球一样沿着调用链向上游扩展,进而导致整个系统瘫痪】

熔断降级的目标就是在故障发生时,快速隔离问题服务【快速失败,防止资源耗尽】,保护系统资源不被耗尽,防止故障扩散,保护核心业务可用性。

1.技术选型

1.1.熔断降级框架选型对比表

对比维度 Hystrix (Netflix) Sentinel (Alibaba) Resilience4j
当前状态 ❌ 停止更新 (维护模式) ✅ 持续更新 ✅ 持续更新
熔断机制 滑动窗口计数 响应时间/异常比例/QPS 错误率/响应时间阈值
流量控制 ❌ 仅基础隔离 ✅ QPS/并发数/热点参数/集群流控 ✅ RateLimiter
隔离策略 线程池(开销大)/信号量 并发线程数(无线程池开销) 信号量/Bulkhead
降级能力 Fallback 方法 Fallback + 系统规则自适应 Fallback + 自定义组合策略
实时监控 ✅ Hystrix Dashboard ✅ 原生控制台(可视化动态规则) ❌ 需整合 Prometheus/Grafana
动态配置 ❌ 依赖 Archaius ✅ 控制台实时推送 ✅ 需编码实现(如Spring Cloud Config)
生态集成 ✅ Spring Cloud Netflix ✅ Spring Cloud Alibaba/多语言网关 ✅ Spring Boot/响应式编程
性能开销 高(线程池隔离) 低(无额外线程) 极低(纯函数式)
适用场景 遗留系统维护 高并发控制/秒杀/热点防护 云原生/轻量级微服务
推荐指数 ⭐⭐ (不推荐新项目) ⭐⭐⭐⭐⭐ (Java高并发首选) ⭐⭐⭐⭐⭐ (云原生/响应式首选)

1.2选型决策指南

需求场景 推荐方案 原因
电商秒杀/API高频调用管控 ✅ Sentinel 精细流量控制+热点防护+实时看板
Kubernetes云原生微服务 ✅ Resilience4j 轻量化+无缝集成Prometheus+响应式支持
Spring Cloud Netflix旧系统 ⚠️ Hystrix 兼容现存代码(短期过渡)
多语言混合架构(如Go+Java) ✅ Sentinel 通过Sidecar代理支持非Java服务
响应式编程(WebFlux) ✅ Resilience4j 原生Reactive API支持

2.Resilience4j使用

Resilience4j官方文档

Resilience4j 可以看作是 Hystrix 的替代品,Resilience4j支持 熔断器单机限流

Resilience4j 是一个专为函数式编程设计的轻量级容错库。Resilience4j 提供高阶函数(装饰器),可通过断路器、速率限制器、重试或隔离功能增强任何函数式接口、lambda 表达式或方法引用。您可以在任何函数式接口、lambda 表达式或方法引用上堆叠多个装饰器。这样做的好处是,您可以只选择所需的装饰器,而无需考虑其他因素。

2.1.网关熔断降级(Spring Cloud Gateway + Resilience4j实战)

2.1.1.pom依赖
<dependency>   <groupId>org.springframework.cloud</groupId>   <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId> </dependency> 

2.1.2.yaml配置

spring:   application:     name: laokou-gateway   cloud:     gateway:        server:         webflux:           routes:             - id: LAOKOU-SSO-DEMO               uri: lb://laokou-sso-demo               predicates:               - Path=/sso/**               filters:               - name: CircuitBreaker                 args:                   name: default                   fallbackUri: "forward:/fallback"           filter:             circuit-breaker:               enabled: true 

2.1.3.CircuitBreakerConfig配置

/**  * @author laokou  */ @Configuration public class CircuitBreakerConfig {      @Bean     public RouterFunction<ServerResponse> routerFunction() {        return RouterFunctions.route(              RequestPredicates.path("/fallback").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),              (request) -> ServerResponse.status(HttpStatus.SC_OK)                 .contentType(MediaType.APPLICATION_JSON)                 .body(BodyInserters.fromValue(Result.fail("Service_Unavailable", "服务正在维护"))));     }      @Bean     public Customizer<ReactiveResilience4JCircuitBreakerFactory> reactiveResilience4JCircuitBreakerFactoryCustomizer() {        return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)           // 3秒后超时时间           .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(3)).build())           .circuitBreakerConfig(io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.ofDefaults())           .build());     }  } 

我是老寇,我们下次再见啦!

发表评论

评论已关闭。

相关文章