常見(jiàn)的限流算法與實(shí)現(xiàn)



限流的實(shí)現(xiàn)

常見(jiàn)的限流算法:



限流是對(duì)某一時(shí)間窗口內(nèi)的請(qǐng)求數(shù)進(jìn)行限制,保持系統(tǒng)的可用性和穩(wěn)定性,防止因流量暴增而導(dǎo)致的系統(tǒng)運(yùn)行緩慢或宕機(jī)。

常見(jiàn)的限流算法有三種:

計(jì)數(shù)器限流(固定窗口)

原理:

時(shí)間線劃分為多個(gè)獨(dú)立且固定大小窗口;

落在每一個(gè)時(shí)間窗口內(nèi)的請(qǐng)求就將計(jì)數(shù)器加1;

如果計(jì)數(shù)器超過(guò)了限流閾值,則后續(xù)落在該窗口的請(qǐng)求都會(huì)被拒絕。但時(shí)間達(dá)到下一個(gè)時(shí)間窗口時(shí),計(jì)數(shù)器會(huì)被重置為0。



案例:

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: FixedWindow
 * @Description: 固定窗口算法
 * @Version 1.0
 **/
public class FixedWindow {

    /**
     * 閾值
     */
    private static Integer QPS = 2;
    
    /**
     * 時(shí)間窗口(毫秒)
     */
    private static long TIME_WINDOWS = 1000;
    
    /**
     * 計(jì)數(shù)器
     */
    private static AtomicInteger REQ_COUNT = new AtomicInteger();

    /**
     * 窗口開(kāi)始時(shí)間
     */
    private static long START_TIME = System.currentTimeMillis();

    public synchronized static boolean tryAcquire() {
        //超時(shí)窗口
        if ((System.currentTimeMillis() - START_TIME) > TIME_WINDOWS) {
            REQ_COUNT.set(0);
            START_TIME = System.currentTimeMillis();
        }
        return REQ_COUNT.incrementAndGet() <= QPS;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(250);
            LocalTime now = LocalTime.now();
            if (!tryAcquire()) {
                System.out.println(now + " 被限流");
            } else {
                System.out.println(now + " 做點(diǎn)什么");
            }
        }
    }
}

問(wèn)題:

雖然我們限制了 QPS 為 2,但是當(dāng)遇到時(shí)間窗口的臨界突變時(shí),如 1s 中的后 500 ms 和第 2s 的前 500ms 時(shí),雖然是加起來(lái)是 1s 時(shí)間,卻可以被請(qǐng)求 4 次。



滑動(dòng)窗口

滑動(dòng)窗口算法是對(duì)固定窗口算法的改進(jìn)



原理:

將單位時(shí)間劃分為多個(gè)區(qū)間,一般都是均分為多個(gè)小的時(shí)間段;

每一個(gè)區(qū)間內(nèi)都有一個(gè)計(jì)數(shù)器,有一個(gè)請(qǐng)求落在該區(qū)間內(nèi),則該區(qū)間內(nèi)的計(jì)數(shù)器就會(huì)加一;

每過(guò)一個(gè)時(shí)間段,時(shí)間窗口就會(huì)往右滑動(dòng)一格,拋棄最老的一個(gè)區(qū)間,并納入新的一個(gè)區(qū)間;

計(jì)算整個(gè)時(shí)間窗口內(nèi)的請(qǐng)求總數(shù)時(shí)會(huì)累加所有的時(shí)間片段內(nèi)的計(jì)數(shù)器,計(jì)數(shù)總和超過(guò)了限制數(shù)量,則本窗口內(nèi)所有的請(qǐng)求都被丟棄。



上圖的示例中,每 500ms 滑動(dòng)一次窗口,可以發(fā)現(xiàn)窗口滑動(dòng)的間隔越短,時(shí)間窗口的臨界突變問(wèn)題發(fā)生的概率也就越小,不過(guò)只要有時(shí)間窗口的存在,還是有可能發(fā)生時(shí)間窗口的臨界突變問(wèn)題。

代碼案例:

package com.example.studyproject.algorithm;

import lombok.Data;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: SlidingWindow
 * @Description: 滑動(dòng)窗口
 * @Version 1.0
 **/
public class SlidingWindow {
    /**
     * 閾值
     */
    private int qps = 2;
    /**
     * 時(shí)間窗口總大小(毫秒)
     */
    private long windowSize = 1000;
    /**
     * 多少個(gè)子窗口
     */
    private Integer windowCount = 10;
    /**
     * 窗口列表
     */
    private WindowInfo[] windowArray = new WindowInfo[windowCount];

    public SlidingWindow(int qps) {
        this.qps = qps;
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < windowArray.length; i++) {
            windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
        }
    }

    /**
     * 1. 計(jì)算當(dāng)前時(shí)間窗口
     * 2. 更新當(dāng)前窗口計(jì)數(shù) & 重置過(guò)期窗口計(jì)數(shù)
     * 3. 當(dāng)前 QPS 是否超過(guò)限制
     * @return 是否被限流
     */
    public synchronized boolean tryAcquire() {
        long currentTimeMillis = System.currentTimeMillis();
        // 1. 計(jì)算當(dāng)前時(shí)間窗口
        int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));
        // 2.  更新當(dāng)前窗口計(jì)數(shù) & 重置過(guò)期窗口計(jì)數(shù)
        int sum = 0;
        for (int i = 0; i < windowArray.length; i++) {
            WindowInfo windowInfo = windowArray[i];
            if ((currentTimeMillis - windowInfo.getTime()) > windowSize) {
                windowInfo.getNumber().set(0);
                windowInfo.setTime(currentTimeMillis);
            }
            if (currentIndex == i && windowInfo.getNumber().get() < qps) {
                windowInfo.getNumber().incrementAndGet();
            }
            sum = sum + windowInfo.getNumber().get();
        }
        // 3. 當(dāng)前 QPS 是否超過(guò)限制
        return sum <= qps;
    }

    @Data
    private class WindowInfo {
        // 窗口開(kāi)始時(shí)間
        private Long time;
        // 計(jì)數(shù)器
        private AtomicInteger number;

        public WindowInfo(long time, AtomicInteger number) {
            this.time = time;
            this.number = number;
        }
        // get...set...
    }

    public static void main(String[] args) throws InterruptedException {
        int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;
        System.out.println(String.format("當(dāng)前QPS限制為:%d,當(dāng)前測(cè)試次數(shù):%d,間隔:%dms,預(yù)計(jì)成功次數(shù):%d", qps, count, sleep, success));
        success = 0;
        SlidingWindow myRateLimiter = new SlidingWindow(qps);
        for (int i = 0; i < count; i++) {
            Thread.sleep(sleep);
            if (myRateLimiter.tryAcquire()) {
                success++;
                if (success % qps == 0) {
                    System.out.println(LocalTime.now() + ": success, ");
                } else {
                    System.out.print(LocalTime.now() + ": success, ");
                }
            } else {
                System.out.println(LocalTime.now() + ": fail");
            }
        }
        System.out.println();
        System.out.println("實(shí)際測(cè)試成功次數(shù):" + success);
    }
}
輸出結(jié)果:






已連接到目標(biāo) VM, 地址: ''127.0.0.1:50101',傳輸: '套接字''
當(dāng)前QPS限制為:2,當(dāng)前測(cè)試次數(shù):20,間隔:300ms,預(yù)計(jì)成功次數(shù):12
14:20:38.833: success, 14:20:39.142: success,
14:20:39.455: success, 14:20:39.766: success,
14:20:40.077: fail
14:20:40.377: fail
14:20:40.678: success, 14:20:40.992: success,
14:20:41.307: fail
14:20:41.621: fail
14:20:41.922: success, 14:20:42.229: success,
14:20:42.539: fail
14:20:42.840: fail
14:20:43.140: success, 14:20:43.455: success,
14:20:43.756: fail
14:20:44.070: fail
14:20:44.386: success, 14:20:44.687: success,

實(shí)際測(cè)試成功次數(shù):12
與目標(biāo) VM 斷開(kāi)連接, 地址為: ''127.0.0.1:50101',傳輸: '套接字''

進(jìn)程已結(jié)束,退出代碼0

漏桶算法



漏桶算法思路很簡(jiǎn)單,我們把水比作是請(qǐng)求,漏桶比作是系統(tǒng)處理能力極限,水先進(jìn)入到漏桶里,漏桶里的水按一定速率流出,當(dāng)流出的速率小于流入的速率時(shí),由于漏桶容量有限,后續(xù)進(jìn)入的水直接溢出(拒絕請(qǐng)求),以此實(shí)現(xiàn)限流。

由介紹可以知道,漏桶模式中的消費(fèi)處理總是能以恒定的速度進(jìn)行,可以很好的保護(hù)自身系統(tǒng)不被突如其來(lái)的流量沖垮;但是這也是漏桶模式的缺點(diǎn),假設(shè) QPS 為 2,同時(shí) 2 個(gè)請(qǐng)求進(jìn)來(lái),2 個(gè)請(qǐng)求并不能同時(shí)進(jìn)行處理響應(yīng),因?yàn)槊?1s / 2= 500ms 只能處理一個(gè)請(qǐng)求。

代碼案例:

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName: LeakyBucket
 * @Description: 漏桶算法
 * @Author: Ze WANG
 * @Date: 2022/9/26
 * @Version 1.0
 **/
public class LeakyBucket {

    /**
     * 水桶的大小
     */
    private final int bucket;

    /**
     * qps,水露出的速度
     */
    private int qps;

    /**
     * 當(dāng)前水量
     */
    private long water;

    private long timeStamp = System.currentTimeMillis();

    public LeakyBucket(int bucket, int qps) {
        this.bucket = bucket;
        this.qps = qps;
    }

    /**
     * 桶是否已經(jīng)滿了
     * @return true未滿
     */
    public boolean tryAcquire(){
        //1.計(jì)算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp)/1000;
        water = Math.max(0,water-timeGap*qps);
        timeStamp = now;

        // 如果未滿,放行
        if(water< bucket){
            water += 1;
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        ExecutorService singleThread = Executors.newSingleThreadExecutor();

        LeakyBucket rateLimiter = new LeakyBucket(20, 2);
        // 存儲(chǔ)流量的隊(duì)列
        Queue<Integer> queue = new LinkedList<>();
        // 模擬請(qǐng)求  不確定速率注水
        singleThread.execute(() -> {
            int count = 0;
            while (true) {
                count++;
                boolean flag = rateLimiter.tryAcquire();
                if (flag) {
                    queue.offer(count);
                    System.out.println(count + "--------流量被放行--------");
                } else {
                    System.out.println(count + "流量被限制");
                }
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 模擬處理請(qǐng)求 固定速率漏水
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            if (!queue.isEmpty()) {
                System.out.println(queue.poll() + "被處理");
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        // 保證主線程不會(huì)退出
        while (true) {
            Thread.sleep(10000);
        }
    }


}

令牌桶算法



令牌桶算法的原理也比較簡(jiǎn)單,我們可以理解成醫(yī)院的掛號(hào)看病,只有拿到號(hào)以后才可以進(jìn)行診病。

系統(tǒng)會(huì)維護(hù)一個(gè)令牌(token)桶,以一個(gè)恒定的速度往桶里放入令牌(token),這時(shí)如果有請(qǐng)求進(jìn)來(lái)想要被處理,則需要先從桶里獲取一個(gè)令牌(token),當(dāng)桶里沒(méi)有令牌(token)可取時(shí),則該請(qǐng)求將被拒絕服務(wù)。令牌桶算法通過(guò)控制桶的容量、發(fā)放令牌的速率,來(lái)達(dá)到對(duì)請(qǐng)求的限制。

原理:

令牌桶的實(shí)現(xiàn)思路類似于生產(chǎn)者和消費(fèi)之間的關(guān)系。

系統(tǒng)服務(wù)作為生產(chǎn)者,按照指定頻率向桶(容器)中添加令牌,如 QPS 為 2,每 500ms 向桶中添加一個(gè)令牌,如果桶中令牌數(shù)量達(dá)到閾值,則不再添加。

請(qǐng)求執(zhí)行作為消費(fèi)者,每個(gè)請(qǐng)求都需要去桶中拿取一個(gè)令牌,取到令牌則繼續(xù)執(zhí)行;如果桶中無(wú)令牌可取,就觸發(fā)拒絕策略,可以是超時(shí)等待,也可以是直接拒絕本次請(qǐng)求,由此達(dá)到限流目的。

思考:

1s / 閾值(QPS) = 令牌添加時(shí)間間隔。

桶的容量等于限流的閾值,令牌數(shù)量達(dá)到閾值時(shí),不再添加。

可以適應(yīng)流量突發(fā),N 個(gè)請(qǐng)求到來(lái)只需要從桶中獲取 N 個(gè)令牌就可以繼續(xù)處理。

有啟動(dòng)過(guò)程,令牌桶啟動(dòng)時(shí)桶中無(wú)令牌,然后按照令牌添加時(shí)間間隔添加令牌,若啟動(dòng)時(shí)就有閾值數(shù)量的請(qǐng)求過(guò)來(lái),會(huì)因?yàn)橥爸袥](méi)有足夠的令牌而觸發(fā)拒絕策略,不過(guò)如 RateLimiter 限流工具已經(jīng)優(yōu)化了這類問(wèn)題。

代碼案例

使用Google封裝的令牌桶RateLimiter   

/**
     * 代碼中限制 QPS 為 2,也就是每隔 500ms 生成一個(gè)令牌,但是程序每隔 250ms 獲取一次令牌,所以兩次獲取中只有一次會(huì)成功。
     */
    public static void main(String[] args) throws InterruptedException {
        RateLimiter rateLimiter = RateLimiter.create(2);

        for (int i = 0; i < 10; i++) {
            String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
            System.out.println(time + ":" + rateLimiter.tryAcquire());
            Thread.sleep(250);
        }
    }

限流的實(shí)現(xiàn)

限流有很多種方法實(shí)現(xiàn):

基于Guava工具類實(shí)現(xiàn)限流

基于AOP實(shí)現(xiàn)限流

基于Redis實(shí)現(xiàn)限流(適用于分布式)

使用Redisson實(shí)現(xiàn)限流(適用于分布式)

Sentinel限流(適用于分布式)

Nginx、Gateway限流.....(適用于分布式)



作者:碼出宇宙

歡迎關(guān)注微信公眾號(hào) :碼出宇宙

掃描添加好友邀你進(jìn)技術(shù)交流群,加我時(shí)注明【姓名+公司(學(xué)校)+職位】