常見的限流算法與實現



限流的實現

常見的限流算法:



限流是對某一時間窗口內的請求數進行限制,保持系統(tǒng)的可用性和穩(wěn)定性,防止因流量暴增而導致的系統(tǒng)運行緩慢或宕機。

常見的限流算法有三種:

計數器限流(固定窗口)

原理:

時間線劃分為多個獨立且固定大小窗口;

落在每一個時間窗口內的請求就將計數器加1;

如果計數器超過了限流閾值,則后續(xù)落在該窗口的請求都會被拒絕。但時間達到下一個時間窗口時,計數器會被重置為0。



案例:

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: FixedWindow
 * @Description: 固定窗口算法
 * @Version 1.0
 **/
public class FixedWindow {

    /**
     * 閾值
     */
    private static Integer QPS = 2;
    
    /**
     * 時間窗口(毫秒)
     */
    private static long TIME_WINDOWS = 1000;
    
    /**
     * 計數器
     */
    private static AtomicInteger REQ_COUNT = new AtomicInteger();

    /**
     * 窗口開始時間
     */
    private static long START_TIME = System.currentTimeMillis();

    public synchronized static boolean tryAcquire() {
        //超時窗口
        if ((System.currentTimeMillis() - START_TIME) > TIME_WINDOWS) {
            REQ_COUNT.set(0);
            START_TIME = System.currentTimeMillis();
        }
        return REQ_COUNT.incrementAndGet() <= QPS;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(250);
            LocalTime now = LocalTime.now();
            if (!tryAcquire()) {
                System.out.println(now + " 被限流");
            } else {
                System.out.println(now + " 做點什么");
            }
        }
    }
}

問題:

雖然我們限制了 QPS 為 2,但是當遇到時間窗口的臨界突變時,如 1s 中的后 500 ms 和第 2s 的前 500ms 時,雖然是加起來是 1s 時間,卻可以被請求 4 次。



滑動窗口

滑動窗口算法是對固定窗口算法的改進



原理:

將單位時間劃分為多個區(qū)間,一般都是均分為多個小的時間段;

每一個區(qū)間內都有一個計數器,有一個請求落在該區(qū)間內,則該區(qū)間內的計數器就會加一;

每過一個時間段,時間窗口就會往右滑動一格,拋棄最老的一個區(qū)間,并納入新的一個區(qū)間;

計算整個時間窗口內的請求總數時會累加所有的時間片段內的計數器,計數總和超過了限制數量,則本窗口內所有的請求都被丟棄。



上圖的示例中,每 500ms 滑動一次窗口,可以發(fā)現窗口滑動的間隔越短,時間窗口的臨界突變問題發(fā)生的概率也就越小,不過只要有時間窗口的存在,還是有可能發(fā)生時間窗口的臨界突變問題。

代碼案例:

package com.example.studyproject.algorithm;

import lombok.Data;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: SlidingWindow
 * @Description: 滑動窗口
 * @Version 1.0
 **/
public class SlidingWindow {
    /**
     * 閾值
     */
    private int qps = 2;
    /**
     * 時間窗口總大?。ê撩耄?br>     */
    private long windowSize = 1000;
    /**
     * 多少個子窗口
     */
    private Integer windowCount = 10;
    /**
     * 窗口列表
     */
    private WindowInfo[] windowArray = new WindowInfo[windowCount];

    public SlidingWindow(int qps) {
        this.qps = qps;
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < windowArray.length; i++) {
            windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
        }
    }

    /**
     * 1. 計算當前時間窗口
     * 2. 更新當前窗口計數 & 重置過期窗口計數
     * 3. 當前 QPS 是否超過限制
     * @return 是否被限流
     */
    public synchronized boolean tryAcquire() {
        long currentTimeMillis = System.currentTimeMillis();
        // 1. 計算當前時間窗口
        int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));
        // 2.  更新當前窗口計數 & 重置過期窗口計數
        int sum = 0;
        for (int i = 0; i < windowArray.length; i++) {
            WindowInfo windowInfo = windowArray[i];
            if ((currentTimeMillis - windowInfo.getTime()) > windowSize) {
                windowInfo.getNumber().set(0);
                windowInfo.setTime(currentTimeMillis);
            }
            if (currentIndex == i && windowInfo.getNumber().get() < qps) {
                windowInfo.getNumber().incrementAndGet();
            }
            sum = sum + windowInfo.getNumber().get();
        }
        // 3. 當前 QPS 是否超過限制
        return sum <= qps;
    }

    @Data
    private class WindowInfo {
        // 窗口開始時間
        private Long time;
        // 計數器
        private AtomicInteger number;

        public WindowInfo(long time, AtomicInteger number) {
            this.time = time;
            this.number = number;
        }
        // get...set...
    }

    public static void main(String[] args) throws InterruptedException {
        int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;
        System.out.println(String.format("當前QPS限制為:%d,當前測試次數:%d,間隔:%dms,預計成功次數:%d", qps, count, sleep, success));
        success = 0;
        SlidingWindow myRateLimiter = new SlidingWindow(qps);
        for (int i = 0; i < count; i++) {
            Thread.sleep(sleep);
            if (myRateLimiter.tryAcquire()) {
                success++;
                if (success % qps == 0) {
                    System.out.println(LocalTime.now() + ": success, ");
                } else {
                    System.out.print(LocalTime.now() + ": success, ");
                }
            } else {
                System.out.println(LocalTime.now() + ": fail");
            }
        }
        System.out.println();
        System.out.println("實際測試成功次數:" + success);
    }
}
輸出結果:






已連接到目標 VM, 地址: ''127.0.0.1:50101',傳輸: '套接字''
當前QPS限制為:2,當前測試次數:20,間隔:300ms,預計成功次數:12
14:20:38.833: success, 14:20:39.142: success,
14:20:39.455: success, 14:20:39.766: success,
14:20:40.077: fail
14:20:40.377: fail
14:20:40.678: success, 14:20:40.992: success,
14:20:41.307: fail
14:20:41.621: fail
14:20:41.922: success, 14:20:42.229: success,
14:20:42.539: fail
14:20:42.840: fail
14:20:43.140: success, 14:20:43.455: success,
14:20:43.756: fail
14:20:44.070: fail
14:20:44.386: success, 14:20:44.687: success,

實際測試成功次數:12
與目標 VM 斷開連接, 地址為: ''127.0.0.1:50101',傳輸: '套接字''

進程已結束,退出代碼0

漏桶算法



漏桶算法思路很簡單,我們把水比作是請求,漏桶比作是系統(tǒng)處理能力極限,水先進入到漏桶里,漏桶里的水按一定速率流出,當流出的速率小于流入的速率時,由于漏桶容量有限,后續(xù)進入的水直接溢出(拒絕請求),以此實現限流。

由介紹可以知道,漏桶模式中的消費處理總是能以恒定的速度進行,可以很好的保護自身系統(tǒng)不被突如其來的流量沖垮;但是這也是漏桶模式的缺點,假設 QPS 為 2,同時 2 個請求進來,2 個請求并不能同時進行處理響應,因為每 1s / 2= 500ms 只能處理一個請求。

代碼案例:

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName: LeakyBucket
 * @Description: 漏桶算法
 * @Author: Ze WANG
 * @Date: 2022/9/26
 * @Version 1.0
 **/
public class LeakyBucket {

    /**
     * 水桶的大小
     */
    private final int bucket;

    /**
     * qps,水露出的速度
     */
    private int qps;

    /**
     * 當前水量
     */
    private long water;

    private long timeStamp = System.currentTimeMillis();

    public LeakyBucket(int bucket, int qps) {
        this.bucket = bucket;
        this.qps = qps;
    }

    /**
     * 桶是否已經滿了
     * @return true未滿
     */
    public boolean tryAcquire(){
        //1.計算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp)/1000;
        water = Math.max(0,water-timeGap*qps);
        timeStamp = now;

        // 如果未滿,放行
        if(water< bucket){
            water += 1;
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        ExecutorService singleThread = Executors.newSingleThreadExecutor();

        LeakyBucket rateLimiter = new LeakyBucket(20, 2);
        // 存儲流量的隊列
        Queue<Integer> queue = new LinkedList<>();
        // 模擬請求  不確定速率注水
        singleThread.execute(() -> {
            int count = 0;
            while (true) {
                count++;
                boolean flag = rateLimiter.tryAcquire();
                if (flag) {
                    queue.offer(count);
                    System.out.println(count + "--------流量被放行--------");
                } else {
                    System.out.println(count + "流量被限制");
                }
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 模擬處理請求 固定速率漏水
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            if (!queue.isEmpty()) {
                System.out.println(queue.poll() + "被處理");
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        // 保證主線程不會退出
        while (true) {
            Thread.sleep(10000);
        }
    }


}

令牌桶算法



令牌桶算法的原理也比較簡單,我們可以理解成醫(yī)院的掛號看病,只有拿到號以后才可以進行診病。

系統(tǒng)會維護一個令牌(token)桶,以一個恒定的速度往桶里放入令牌(token),這時如果有請求進來想要被處理,則需要先從桶里獲取一個令牌(token),當桶里沒有令牌(token)可取時,則該請求將被拒絕服務。令牌桶算法通過控制桶的容量、發(fā)放令牌的速率,來達到對請求的限制。

原理:

令牌桶的實現思路類似于生產者和消費之間的關系。

系統(tǒng)服務作為生產者,按照指定頻率向桶(容器)中添加令牌,如 QPS 為 2,每 500ms 向桶中添加一個令牌,如果桶中令牌數量達到閾值,則不再添加。

請求執(zhí)行作為消費者,每個請求都需要去桶中拿取一個令牌,取到令牌則繼續(xù)執(zhí)行;如果桶中無令牌可取,就觸發(fā)拒絕策略,可以是超時等待,也可以是直接拒絕本次請求,由此達到限流目的。

思考:

1s / 閾值(QPS) = 令牌添加時間間隔。

桶的容量等于限流的閾值,令牌數量達到閾值時,不再添加。

可以適應流量突發(fā),N 個請求到來只需要從桶中獲取 N 個令牌就可以繼續(xù)處理。

有啟動過程,令牌桶啟動時桶中無令牌,然后按照令牌添加時間間隔添加令牌,若啟動時就有閾值數量的請求過來,會因為桶中沒有足夠的令牌而觸發(fā)拒絕策略,不過如 RateLimiter 限流工具已經優(yōu)化了這類問題。

代碼案例

使用Google封裝的令牌桶RateLimiter   

/**
     * 代碼中限制 QPS 為 2,也就是每隔 500ms 生成一個令牌,但是程序每隔 250ms 獲取一次令牌,所以兩次獲取中只有一次會成功。
     */
    public static void main(String[] args) throws InterruptedException {
        RateLimiter rateLimiter = RateLimiter.create(2);

        for (int i = 0; i < 10; i++) {
            String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
            System.out.println(time + ":" + rateLimiter.tryAcquire());
            Thread.sleep(250);
        }
    }

限流的實現

限流有很多種方法實現:

基于Guava工具類實現限流

基于AOP實現限流

基于Redis實現限流(適用于分布式)

使用Redisson實現限流(適用于分布式)

Sentinel限流(適用于分布式)

Nginx、Gateway限流.....(適用于分布式)



作者:碼出宇宙

歡迎關注微信公眾號 :碼出宇宙

掃描添加好友邀你進技術交流群,加我時注明【姓名+公司(學校)+職位】