常見(jiàn)的限流算法與實(shí)現(xiàn)
限流的實(shí)現(xiàn)
常見(jiàn)的限流算法:
限流是對(duì)某一時(shí)間窗口內(nèi)的請(qǐng)求數(shù)進(jìn)行限制,保持系統(tǒng)的可用性和穩(wěn)定性,防止因流量暴增而導(dǎo)致的系統(tǒng)運(yùn)行緩慢或宕機(jī)。
常見(jiàn)的限流算法有三種:
計(jì)數(shù)器限流(固定窗口)
原理:
時(shí)間線劃分為多個(gè)獨(dú)立且固定大小窗口;
落在每一個(gè)時(shí)間窗口內(nèi)的請(qǐng)求就將計(jì)數(shù)器加1;
如果計(jì)數(shù)器超過(guò)了限流閾值,則后續(xù)落在該窗口的請(qǐng)求都會(huì)被拒絕。但時(shí)間達(dá)到下一個(gè)時(shí)間窗口時(shí),計(jì)數(shù)器會(huì)被重置為0。
案例:
package com.example.studyproject.algorithm;
import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName: FixedWindow
* @Description: 固定窗口算法
* @Version 1.0
**/
public class FixedWindow {
/**
* 閾值
*/
private static Integer QPS = 2;
/**
* 時(shí)間窗口(毫秒)
*/
private static long TIME_WINDOWS = 1000;
/**
* 計(jì)數(shù)器
*/
private static AtomicInteger REQ_COUNT = new AtomicInteger();
/**
* 窗口開(kāi)始時(shí)間
*/
private static long START_TIME = System.currentTimeMillis();
public synchronized static boolean tryAcquire() {
//超時(shí)窗口
if ((System.currentTimeMillis() - START_TIME) > TIME_WINDOWS) {
REQ_COUNT.set(0);
START_TIME = System.currentTimeMillis();
}
return REQ_COUNT.incrementAndGet() <= QPS;
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
Thread.sleep(250);
LocalTime now = LocalTime.now();
if (!tryAcquire()) {
System.out.println(now + " 被限流");
} else {
System.out.println(now + " 做點(diǎn)什么");
}
}
}
}
問(wèn)題:
雖然我們限制了 QPS 為 2,但是當(dāng)遇到時(shí)間窗口的臨界突變時(shí),如 1s 中的后 500 ms 和第 2s 的前 500ms 時(shí),雖然是加起來(lái)是 1s 時(shí)間,卻可以被請(qǐng)求 4 次。
滑動(dòng)窗口
滑動(dòng)窗口算法是對(duì)固定窗口算法的改進(jìn)
原理:
將單位時(shí)間劃分為多個(gè)區(qū)間,一般都是均分為多個(gè)小的時(shí)間段;
每一個(gè)區(qū)間內(nèi)都有一個(gè)計(jì)數(shù)器,有一個(gè)請(qǐng)求落在該區(qū)間內(nèi),則該區(qū)間內(nèi)的計(jì)數(shù)器就會(huì)加一;
每過(guò)一個(gè)時(shí)間段,時(shí)間窗口就會(huì)往右滑動(dòng)一格,拋棄最老的一個(gè)區(qū)間,并納入新的一個(gè)區(qū)間;
計(jì)算整個(gè)時(shí)間窗口內(nèi)的請(qǐng)求總數(shù)時(shí)會(huì)累加所有的時(shí)間片段內(nèi)的計(jì)數(shù)器,計(jì)數(shù)總和超過(guò)了限制數(shù)量,則本窗口內(nèi)所有的請(qǐng)求都被丟棄。
上圖的示例中,每 500ms 滑動(dòng)一次窗口,可以發(fā)現(xiàn)窗口滑動(dòng)的間隔越短,時(shí)間窗口的臨界突變問(wèn)題發(fā)生的概率也就越小,不過(guò)只要有時(shí)間窗口的存在,還是有可能發(fā)生時(shí)間窗口的臨界突變問(wèn)題。
代碼案例:
package com.example.studyproject.algorithm;
import lombok.Data;
import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName: SlidingWindow
* @Description: 滑動(dòng)窗口
* @Version 1.0
**/
public class SlidingWindow {
/**
* 閾值
*/
private int qps = 2;
/**
* 時(shí)間窗口總大小(毫秒)
*/
private long windowSize = 1000;
/**
* 多少個(gè)子窗口
*/
private Integer windowCount = 10;
/**
* 窗口列表
*/
private WindowInfo[] windowArray = new WindowInfo[windowCount];
public SlidingWindow(int qps) {
this.qps = qps;
long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < windowArray.length; i++) {
windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
}
}
/**
* 1. 計(jì)算當(dāng)前時(shí)間窗口
* 2. 更新當(dāng)前窗口計(jì)數(shù) & 重置過(guò)期窗口計(jì)數(shù)
* 3. 當(dāng)前 QPS 是否超過(guò)限制
* @return 是否被限流
*/
public synchronized boolean tryAcquire() {
long currentTimeMillis = System.currentTimeMillis();
// 1. 計(jì)算當(dāng)前時(shí)間窗口
int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));
// 2. 更新當(dāng)前窗口計(jì)數(shù) & 重置過(guò)期窗口計(jì)數(shù)
int sum = 0;
for (int i = 0; i < windowArray.length; i++) {
WindowInfo windowInfo = windowArray[i];
if ((currentTimeMillis - windowInfo.getTime()) > windowSize) {
windowInfo.getNumber().set(0);
windowInfo.setTime(currentTimeMillis);
}
if (currentIndex == i && windowInfo.getNumber().get() < qps) {
windowInfo.getNumber().incrementAndGet();
}
sum = sum + windowInfo.getNumber().get();
}
// 3. 當(dāng)前 QPS 是否超過(guò)限制
return sum <= qps;
}
@Data
private class WindowInfo {
// 窗口開(kāi)始時(shí)間
private Long time;
// 計(jì)數(shù)器
private AtomicInteger number;
public WindowInfo(long time, AtomicInteger number) {
this.time = time;
this.number = number;
}
// get...set...
}
public static void main(String[] args) throws InterruptedException {
int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;
System.out.println(String.format("當(dāng)前QPS限制為:%d,當(dāng)前測(cè)試次數(shù):%d,間隔:%dms,預(yù)計(jì)成功次數(shù):%d", qps, count, sleep, success));
success = 0;
SlidingWindow myRateLimiter = new SlidingWindow(qps);
for (int i = 0; i < count; i++) {
Thread.sleep(sleep);
if (myRateLimiter.tryAcquire()) {
success++;
if (success % qps == 0) {
System.out.println(LocalTime.now() + ": success, ");
} else {
System.out.print(LocalTime.now() + ": success, ");
}
} else {
System.out.println(LocalTime.now() + ": fail");
}
}
System.out.println();
System.out.println("實(shí)際測(cè)試成功次數(shù):" + success);
}
}
輸出結(jié)果:
已連接到目標(biāo) VM, 地址: ''127.0.0.1:50101',傳輸: '套接字''
當(dāng)前QPS限制為:2,當(dāng)前測(cè)試次數(shù):20,間隔:300ms,預(yù)計(jì)成功次數(shù):12
14:20:38.833: success, 14:20:39.142: success,
14:20:39.455: success, 14:20:39.766: success,
14:20:40.077: fail
14:20:40.377: fail
14:20:40.678: success, 14:20:40.992: success,
14:20:41.307: fail
14:20:41.621: fail
14:20:41.922: success, 14:20:42.229: success,
14:20:42.539: fail
14:20:42.840: fail
14:20:43.140: success, 14:20:43.455: success,
14:20:43.756: fail
14:20:44.070: fail
14:20:44.386: success, 14:20:44.687: success,
實(shí)際測(cè)試成功次數(shù):12
與目標(biāo) VM 斷開(kāi)連接, 地址為: ''127.0.0.1:50101',傳輸: '套接字''
進(jìn)程已結(jié)束,退出代碼0
漏桶算法
漏桶算法思路很簡(jiǎn)單,我們把水比作是請(qǐng)求,漏桶比作是系統(tǒng)處理能力極限,水先進(jìn)入到漏桶里,漏桶里的水按一定速率流出,當(dāng)流出的速率小于流入的速率時(shí),由于漏桶容量有限,后續(xù)進(jìn)入的水直接溢出(拒絕請(qǐng)求),以此實(shí)現(xiàn)限流。
由介紹可以知道,漏桶模式中的消費(fèi)處理總是能以恒定的速度進(jìn)行,可以很好的保護(hù)自身系統(tǒng)不被突如其來(lái)的流量沖垮;但是這也是漏桶模式的缺點(diǎn),假設(shè) QPS 為 2,同時(shí) 2 個(gè)請(qǐng)求進(jìn)來(lái),2 個(gè)請(qǐng)求并不能同時(shí)進(jìn)行處理響應(yīng),因?yàn)槊?1s / 2= 500ms 只能處理一個(gè)請(qǐng)求。
代碼案例:
package com.example.studyproject.algorithm;
import java.time.LocalTime;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @ClassName: LeakyBucket
* @Description: 漏桶算法
* @Author: Ze WANG
* @Date: 2022/9/26
* @Version 1.0
**/
public class LeakyBucket {
/**
* 水桶的大小
*/
private final int bucket;
/**
* qps,水露出的速度
*/
private int qps;
/**
* 當(dāng)前水量
*/
private long water;
private long timeStamp = System.currentTimeMillis();
public LeakyBucket(int bucket, int qps) {
this.bucket = bucket;
this.qps = qps;
}
/**
* 桶是否已經(jīng)滿了
* @return true未滿
*/
public boolean tryAcquire(){
//1.計(jì)算剩余水量
long now = System.currentTimeMillis();
long timeGap = (now - timeStamp)/1000;
water = Math.max(0,water-timeGap*qps);
timeStamp = now;
// 如果未滿,放行
if(water< bucket){
water += 1;
return true;
}
return false;
}
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
ExecutorService singleThread = Executors.newSingleThreadExecutor();
LeakyBucket rateLimiter = new LeakyBucket(20, 2);
// 存儲(chǔ)流量的隊(duì)列
Queue<Integer> queue = new LinkedList<>();
// 模擬請(qǐng)求 不確定速率注水
singleThread.execute(() -> {
int count = 0;
while (true) {
count++;
boolean flag = rateLimiter.tryAcquire();
if (flag) {
queue.offer(count);
System.out.println(count + "--------流量被放行--------");
} else {
System.out.println(count + "流量被限制");
}
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 模擬處理請(qǐng)求 固定速率漏水
scheduledExecutorService.scheduleAtFixedRate(() -> {
if (!queue.isEmpty()) {
System.out.println(queue.poll() + "被處理");
}
}, 0, 100, TimeUnit.MILLISECONDS);
// 保證主線程不會(huì)退出
while (true) {
Thread.sleep(10000);
}
}
}
令牌桶算法
令牌桶算法的原理也比較簡(jiǎn)單,我們可以理解成醫(yī)院的掛號(hào)看病,只有拿到號(hào)以后才可以進(jìn)行診病。
系統(tǒng)會(huì)維護(hù)一個(gè)令牌(token)桶,以一個(gè)恒定的速度往桶里放入令牌(token),這時(shí)如果有請(qǐng)求進(jìn)來(lái)想要被處理,則需要先從桶里獲取一個(gè)令牌(token),當(dāng)桶里沒(méi)有令牌(token)可取時(shí),則該請(qǐng)求將被拒絕服務(wù)。令牌桶算法通過(guò)控制桶的容量、發(fā)放令牌的速率,來(lái)達(dá)到對(duì)請(qǐng)求的限制。
原理:
令牌桶的實(shí)現(xiàn)思路類似于生產(chǎn)者和消費(fèi)之間的關(guān)系。
系統(tǒng)服務(wù)作為生產(chǎn)者,按照指定頻率向桶(容器)中添加令牌,如 QPS 為 2,每 500ms 向桶中添加一個(gè)令牌,如果桶中令牌數(shù)量達(dá)到閾值,則不再添加。
請(qǐng)求執(zhí)行作為消費(fèi)者,每個(gè)請(qǐng)求都需要去桶中拿取一個(gè)令牌,取到令牌則繼續(xù)執(zhí)行;如果桶中無(wú)令牌可取,就觸發(fā)拒絕策略,可以是超時(shí)等待,也可以是直接拒絕本次請(qǐng)求,由此達(dá)到限流目的。
思考:
1s / 閾值(QPS) = 令牌添加時(shí)間間隔。
桶的容量等于限流的閾值,令牌數(shù)量達(dá)到閾值時(shí),不再添加。
可以適應(yīng)流量突發(fā),N 個(gè)請(qǐng)求到來(lái)只需要從桶中獲取 N 個(gè)令牌就可以繼續(xù)處理。
有啟動(dòng)過(guò)程,令牌桶啟動(dòng)時(shí)桶中無(wú)令牌,然后按照令牌添加時(shí)間間隔添加令牌,若啟動(dòng)時(shí)就有閾值數(shù)量的請(qǐng)求過(guò)來(lái),會(huì)因?yàn)橥爸袥](méi)有足夠的令牌而觸發(fā)拒絕策略,不過(guò)如 RateLimiter 限流工具已經(jīng)優(yōu)化了這類問(wèn)題。
代碼案例
使用Google封裝的令牌桶RateLimiter
/**
* 代碼中限制 QPS 為 2,也就是每隔 500ms 生成一個(gè)令牌,但是程序每隔 250ms 獲取一次令牌,所以兩次獲取中只有一次會(huì)成功。
*/
public static void main(String[] args) throws InterruptedException {
RateLimiter rateLimiter = RateLimiter.create(2);
for (int i = 0; i < 10; i++) {
String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
System.out.println(time + ":" + rateLimiter.tryAcquire());
Thread.sleep(250);
}
}
限流的實(shí)現(xiàn)
限流有很多種方法實(shí)現(xiàn):
基于Guava工具類實(shí)現(xiàn)限流
基于AOP實(shí)現(xiàn)限流
基于Redis實(shí)現(xiàn)限流(適用于分布式)
使用Redisson實(shí)現(xiàn)限流(適用于分布式)
Sentinel限流(適用于分布式)
Nginx、Gateway限流.....(適用于分布式)
作者:碼出宇宙
歡迎關(guān)注微信公眾號(hào) :碼出宇宙
掃描添加好友邀你進(jìn)技術(shù)交流群,加我時(shí)注明【姓名+公司(學(xué)校)+職位】